tunnel

package
v2.0.0-...-acbaf60 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 4, 2023 License: GPL-3.0 Imports: 24 Imported by: 1

Documentation

Index

Constants

View Source
const (
	FILE_MAGIC_NUMBER    uint64 = 0xeeeeeeeeee201314
	FILE_PROTOCOL_NUMBER uint32 = 1
	BLOCK_HEADER_SIZE           = 20
)
View Source
const (
	BatchSize = 64
	TableName = "mongoshake_mock.table"
)
View Source
const (
	MagicNumber    = 0xCAFE
	CurrentVersion = 0x01
	HeaderLen      = 12
)

Network packet structure

[ Big-endian ]
Header (12 Bytes)
Body (n Bytes)

[ Header structure ]
-----------------------------------------------------------------------------------
|    magic(2B)    |  version(1B)  |  type(1B)  |  crc32(4B) |  length(4B)  |
-----------------------------------------------------------------------------------
|     0xCAFE      |     0x01      |    0x01    |   0xFFFFF  |     4096     |
-----------------------------------------------------------------------------------

[ PacketWrite payload ]
-------------------------------------------------------------------------------------------------------------------------------------------------
|    cksum(4B)    |  tag(4B)  |  shard(4B)  |  compress(4B) |  number(4B)  |  len(4B)  |  log([]byte)  |  len(4B)  |  log([]byte)  |
-------------------------------------------------------------------------------------------------------------------------------------------------

[ PacketGetACK payload ]
--------------|
|    (zero)    |
--------------|

[ PacketReturnACK payload ]
------------------
|    ack(4B)    |
------------------
View Source
const (
	PacketIncomplete uint8 = 0x00
	PacketGetACK     uint8 = 0x01
	PacketWrite      uint8 = 0x02
	PacketReturnACK  uint8 = 0x3

	UndefinedPacketType uint8 = 0x4
)
View Source
const (
	TransferChannel = iota
	RecvAckChannel
	TotalQueueNum
)
View Source
const (
	MsgNormal         = 0x00000000
	MsgRetransmission = 0x00000001
	MsgProbe          = 0x00000010
	MsgResident       = 0x00000100
	MsgPersistent     = 0x00001000
	MsgStorageBackend = 0x00010000
)
View Source
const (
	ReplyOK                     int64 = 0
	ReplyError                  int64 = -1
	ReplyNetworkOpFail          int64 = -2
	ReplyNetworkTimeout         int64 = -3
	ReplyRetransmission         int64 = -4
	ReplyServerFault            int64 = -5
	ReplyChecksumInvalid        int64 = -6
	ReplyCompressorNotSupported int64 = -7
	ReplyDecompressInvalid            = -8
)
View Source
const InitialStageChecking = false
View Source
const NetworkDefaultTimeout = 60 * time.Second
View Source
const (
	OPEN_FILE_FLAGS = os.O_CREATE | os.O_RDWR | os.O_TRUNC
)

Variables

This section is empty.

Functions

This section is empty.

Types

type DataFile

type DataFile struct {
	// contains filtered or unexported fields
}

func (*DataFile) ReadHeader

func (dataFile *DataFile) ReadHeader() *FileHeader

func (*DataFile) WriteHeader

func (dataFile *DataFile) WriteHeader()

type DirectWriter

type DirectWriter struct {
	RemoteAddrs   []string
	ReplayerId    uint32 // equal to worker-id
	BatchExecutor *executor.BatchGroupExecutor
}

func (*DirectWriter) AckRequired

func (writer *DirectWriter) AckRequired() bool

func (*DirectWriter) Name

func (writer *DirectWriter) Name() string

func (*DirectWriter) ParsedLogsRequired

func (writer *DirectWriter) ParsedLogsRequired() bool

func (*DirectWriter) Prepare

func (writer *DirectWriter) Prepare() bool

func (*DirectWriter) Send

func (writer *DirectWriter) Send(message *WMessage) int64

type FakeGenerator

type FakeGenerator struct {
	// contains filtered or unexported fields
}

type FileHeader

type FileHeader struct {
	Magic    uint64
	Protocol uint32
	Checksum uint32
	Reserved [16]byte
}

File structure:

|----- Header ------|------ OplogBlock ------|------ OplogBlock --------| ......
|<--- 32 bytes ---->|

type FileReader

type FileReader struct {
	File string
	// contains filtered or unexported fields
}
func (tunnel *FileReader) Link(relativeReplayer []Replayer) error

type FileWriter

type FileWriter struct {
	// local file folder path
	Local string
	// contains filtered or unexported fields
}

func (*FileWriter) AckRequired

func (tunnel *FileWriter) AckRequired() bool

func (*FileWriter) Name

func (tunnel *FileWriter) Name() string

func (*FileWriter) ParsedLogsRequired

func (tunnel *FileWriter) ParsedLogsRequired() bool

func (*FileWriter) Prepare

func (tunnel *FileWriter) Prepare() bool

func (*FileWriter) Send

func (tunnel *FileWriter) Send(message *WMessage) int64

func (*FileWriter) SyncToDisk

func (tunnel *FileWriter) SyncToDisk()

type KafkaReader

type KafkaReader struct {
	// contains filtered or unexported fields
}
func (tunnel *KafkaReader) Link(replayer []Replayer) error

type KafkaWriter

type KafkaWriter struct {
	RemoteAddr  string
	PartitionId int // write to which partition
	// contains filtered or unexported fields
}

func (*KafkaWriter) AckRequired

func (tunnel *KafkaWriter) AckRequired() bool

KafkaWriter.AckRequired() is always false, return 0 directly

func (*KafkaWriter) Name

func (tunnel *KafkaWriter) Name() string

func (*KafkaWriter) ParsedLogsRequired

func (tunnel *KafkaWriter) ParsedLogsRequired() bool

func (*KafkaWriter) Prepare

func (tunnel *KafkaWriter) Prepare() bool

func (*KafkaWriter) Send

func (tunnel *KafkaWriter) Send(message *WMessage) int64

func (*KafkaWriter) String

func (tunnel *KafkaWriter) String() string

type ListenSocket

type ListenSocket struct {
	// contains filtered or unexported fields
}

type MockReader

type MockReader struct {
	// contains filtered or unexported fields
}
func (tunnel *MockReader) Link(replayer []Replayer) error

type MockWriter

type MockWriter struct {
}

func (*MockWriter) AckRequired

func (tunnel *MockWriter) AckRequired() bool

func (*MockWriter) Name

func (tunnel *MockWriter) Name() string

func (*MockWriter) ParsedLogsRequired

func (tunnel *MockWriter) ParsedLogsRequired() bool

func (*MockWriter) Prepare

func (tunnel *MockWriter) Prepare() bool

func (*MockWriter) Send

func (tunnel *MockWriter) Send(message *WMessage) int64

type Packet

type Packet struct {
	// contains filtered or unexported fields
}

func NewPacketV1

func NewPacketV1(packetType uint8, payload []byte) *Packet

func (*Packet) String

func (packet *Packet) String() string

type RPCReader

type RPCReader struct {
	// contains filtered or unexported fields
}
func (tunnel *RPCReader) Link(replayers []Replayer) (err error)

type RPCWriter

type RPCWriter struct {
	RemoteAddr string
	// contains filtered or unexported fields
}

func (*RPCWriter) AckRequired

func (tunnel *RPCWriter) AckRequired() bool

func (*RPCWriter) Name

func (tunnel *RPCWriter) Name() string

func (*RPCWriter) ParsedLogsRequired

func (tunnel *RPCWriter) ParsedLogsRequired() bool

func (*RPCWriter) Prepare

func (tunnel *RPCWriter) Prepare() bool

func (*RPCWriter) Send

func (tunnel *RPCWriter) Send(message *WMessage) int64

type Reader

type Reader interface {
	/**
	 * Bridge between the tunnel reader and the aggregator (replayer)
	 *
	 */
	Link(aggregate []Replayer) error
}

type ReaderFactory

type ReaderFactory struct {
	Name string
}

func (*ReaderFactory) Create

func (factory *ReaderFactory) Create(address string) Reader

Create creates the specific tunnel reader selected by the tunnel name, passing it the connection address or other useful metadata.

type Replayer

type Replayer interface {
	/**
	 * Replay oplog entry with batched Oplog
	 *
	 */
	Sync(message *TMessage, completion func()) int64

	/**
	 * Ack offset value
	 *
	 */
	GetAcked() int64
}

type TCPReader

type TCPReader struct {
	// contains filtered or unexported fields
}
func (reader *TCPReader) Link(replayer []Replayer) (err error)

type TCPWriter

type TCPWriter struct {
	RemoteAddr string
	// contains filtered or unexported fields
}

func (*TCPWriter) AckRequired

func (writer *TCPWriter) AckRequired() bool

func (*TCPWriter) Name

func (tunnel *TCPWriter) Name() string

func (*TCPWriter) ParsedLogsRequired

func (writer *TCPWriter) ParsedLogsRequired() bool

func (*TCPWriter) Prepare

func (writer *TCPWriter) Prepare() bool

func (*TCPWriter) Send

func (writer *TCPWriter) Send(message *WMessage) int64

type TMessage

type TMessage struct {
	Checksum uint32
	Tag      uint32
	Shard    uint32
	Compress uint32
	RawLogs  [][]byte
}

func (*TMessage) ApproximateSize

func (msg *TMessage) ApproximateSize() uint64

func (*TMessage) Crc32

func (msg *TMessage) Crc32() uint32

func (*TMessage) FromBytes

func (msg *TMessage) FromBytes(buf []byte, order binary.ByteOrder)

func (*TMessage) String

func (msg *TMessage) String() string

func (*TMessage) ToBytes

func (msg *TMessage) ToBytes(order binary.ByteOrder) []byte

type TcpSocket

type TcpSocket struct {
	// contains filtered or unexported fields
}

type TunnelRPC

type TunnelRPC struct {
}

func (*TunnelRPC) Transfer

func (rpc *TunnelRPC) Transfer(message *TMessage, response *int64) error

type WMessage

type WMessage struct {
	*TMessage                      // whole raw log
	ParsedLogs []*oplog.PartialLog // parsed log
}

WMessage wraps TMessage.

type Writer

type Writer interface {
	/**
	 * Indicates whether this tunnel cares about the ACK feedback value.
	 * For example, RPC_TUNNEL (ack required is true) is asynchronous and
	 * needs the peer receiver to have completely consumed the log entries;
	 * the reserved log entries can be dropped only once the log entry
	 * ACK is confirmed
	 */
	AckRequired() bool

	/**
	 * prepare stage of the tunnel, such as creating the network connection or
	 * performing other initialization before the Send() invocation.
	 * returns true on success or false on failure
	 */
	Prepare() bool

	/**
	 * write the real tunnel message to the tunnel.
	 *
	 * returns the correct ACK offset value as a positive number. if AckRequired is set,
	 * this ACK offset is used to purge buffered oplogs. Otherwise the upper layer uses
	 * the max oplog ts as the ACK offset and discards the returned value (ACK offset).
	 * a negative return value indicates an error
	 */
	Send(message *WMessage) int64

	/**
	 * whether the tunnel needs parsed logs or raw logs
	 */
	ParsedLogsRequired() bool

	/*
	 * tunnel name
	 */
	Name() string
}

type WriterFactory

type WriterFactory struct {
	Name string
}

func (*WriterFactory) Create

func (factory *WriterFactory) Create(address []string, workerId uint32) Writer

Create creates the specific tunnel writer selected by the tunnel name, passing it the connection addresses or other useful metadata.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL