msgstream

package
v0.0.0-...-1593278 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 7, 2023 License: Apache-2.0 Imports: 31 Imported by: 8

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultRepackFunc

func DefaultRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error)

DefaultRepackFunc is used to repack messages after hash by primary key

func DeleteRepackFunc

func DeleteRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error)

DeleteRepackFunc is used to repack messages after hash by primary key

func ExtractCtx

func ExtractCtx(msg TsMsg, properties map[string]string) (context.Context, trace.Span)

ExtractCtx extracts trace span from msg.properties. And it will attach some default tags to the span.

func InjectCtx

func InjectCtx(sc context.Context, properties map[string]string)

InjectCtx injects the span from the context into the Pulsar message properties.

func InsertRepackFunc

func InsertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error)

InsertRepackFunc is used to repack messages after hash by primary key

func MsgSpanFromCtx

func MsgSpanFromCtx(ctx context.Context, msg TsMsg) (context.Context, trace.Span)

MsgSpanFromCtx extracts the span from context. And it will attach some default tags to the span.

func NewMqMsgStream

func NewMqMsgStream(ctx context.Context,
	receiveBufSize int64,
	bufSize int64,
	client mqwrapper.Client,
	unmarshal UnmarshalDispatcher) (*mqMsgStream, error)

NewMqMsgStream is used to generate a new mqMsgStream object

func UnsubscribeChannels

func UnsubscribeChannels(ctx context.Context, factory Factory, subName string, channels []string)

UnsubscribeChannels creates a consumer first, and then unsubscribes the channels through msgStream.Close(). TODO: use streamnative pulsarctl.

Types

type BaseMsg

type BaseMsg struct {
	Ctx            context.Context
	BeginTimestamp Timestamp
	EndTimestamp   Timestamp
	HashValues     []uint32
	MsgPosition    *MsgPosition
}

BaseMsg is a basic structure that contains begin timestamp, end timestamp and the position of msgstream

func (*BaseMsg) BeginTs

func (bm *BaseMsg) BeginTs() Timestamp

BeginTs returns the begin timestamp of this message pack

func (*BaseMsg) EndTs

func (bm *BaseMsg) EndTs() Timestamp

EndTs returns the end timestamp of this message pack

func (*BaseMsg) HashKeys

func (bm *BaseMsg) HashKeys() []uint32

HashKeys returns the hash values of this message pack

func (*BaseMsg) Position

func (bm *BaseMsg) Position() *MsgPosition

Position returns the position of this message pack in msgstream

func (*BaseMsg) SetPosition

func (bm *BaseMsg) SetPosition(position *MsgPosition)

SetPosition is used to set position of this message in msgstream

func (*BaseMsg) SetTraceCtx

func (bm *BaseMsg) SetTraceCtx(ctx context.Context)

SetTraceCtx is used to set context for opentracing

func (*BaseMsg) TraceCtx

func (bm *BaseMsg) TraceCtx() context.Context

TraceCtx returns the context of opentracing

type CommonFactory

type CommonFactory struct {
	Newer             func() (mqwrapper.Client, error) // client constructor
	DispatcherFactory ProtoUDFactory
	ReceiveBufSize    int64
	MQBufSize         int64
}

CommonFactory is a Factory for creating message streams with common logic.

It contains a function field named Newer, which is a function that creates an mqwrapper.Client when called.

func (*CommonFactory) NewMsgStream

func (f *CommonFactory) NewMsgStream(ctx context.Context) (ms MsgStream, err error)

NewMsgStream is used to generate a new Msgstream object

func (*CommonFactory) NewMsgStreamDisposer

func (f *CommonFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, string) error

NewMsgStreamDisposer returns a function that can be used to dispose of a message stream. The returned function takes a slice of channel names and a subscription name, and disposes of the message stream associated with those arguments.

func (*CommonFactory) NewQueryMsgStream

func (f *CommonFactory) NewQueryMsgStream(ctx context.Context) (ms MsgStream, err error)

NewQueryMsgStream is used to generate a new QueryMsgstream object

func (*CommonFactory) NewTtMsgStream

func (f *CommonFactory) NewTtMsgStream(ctx context.Context) (ms MsgStream, err error)

NewTtMsgStream is used to generate a new TtMsgstream object

type CreateCollectionMsg

type CreateCollectionMsg struct {
	BaseMsg
	msgpb.CreateCollectionRequest
}

CreateCollectionMsg is a message pack that contains create collection request

func (*CreateCollectionMsg) ID

func (cc *CreateCollectionMsg) ID() UniqueID

ID returns the ID of this message pack

func (*CreateCollectionMsg) Marshal

func (cc *CreateCollectionMsg) Marshal(input TsMsg) (MarshalType, error)

Marshal is used to serialize a message pack to byte array

func (*CreateCollectionMsg) SetID

func (cc *CreateCollectionMsg) SetID(id UniqueID)

SetID sets the ID of this message pack

func (*CreateCollectionMsg) SourceID

func (cc *CreateCollectionMsg) SourceID() int64

SourceID indicates which component generated this message

func (*CreateCollectionMsg) Type

func (cc *CreateCollectionMsg) Type() MsgType

Type returns the type of this message pack

func (*CreateCollectionMsg) Unmarshal

func (cc *CreateCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error)

Unmarshal is used to deserialize a message pack from byte array

type CreatePartitionMsg

type CreatePartitionMsg struct {
	BaseMsg
	msgpb.CreatePartitionRequest
}

CreatePartitionMsg is a message pack that contains create partition request

func (*CreatePartitionMsg) ID

func (cp *CreatePartitionMsg) ID() UniqueID

ID returns the ID of this message pack

func (*CreatePartitionMsg) Marshal

func (cp *CreatePartitionMsg) Marshal(input TsMsg) (MarshalType, error)

Marshal is used to serialize a message pack to byte array

func (*CreatePartitionMsg) SetID

func (cp *CreatePartitionMsg) SetID(id UniqueID)

SetID sets the ID of this message pack

func (*CreatePartitionMsg) SourceID

func (cp *CreatePartitionMsg) SourceID() int64

SourceID indicates which component generated this message

func (*CreatePartitionMsg) Type

func (cp *CreatePartitionMsg) Type() MsgType

Type returns the type of this message pack

func (*CreatePartitionMsg) Unmarshal

func (cp *CreatePartitionMsg) Unmarshal(input MarshalType) (TsMsg, error)

Unmarshal is used to deserialize a message pack from byte array

type DataNodeTtMsg

type DataNodeTtMsg struct {
	BaseMsg
	msgpb.DataNodeTtMsg
}

DataNodeTtMsg is a message pack that contains datanode time tick

func (*DataNodeTtMsg) ID

func (m *DataNodeTtMsg) ID() UniqueID

ID returns the ID of this message pack

func (*DataNodeTtMsg) Marshal

func (m *DataNodeTtMsg) Marshal(input TsMsg) (MarshalType, error)

Marshal is used to serialize a message pack to byte array

func (*DataNodeTtMsg) SetID

func (m *DataNodeTtMsg) SetID(id UniqueID)

SetID sets the ID of this message pack

func (*DataNodeTtMsg) SourceID

func (m *DataNodeTtMsg) SourceID() int64

SourceID indicates which component generated this message

func (*DataNodeTtMsg) Type

func (m *DataNodeTtMsg) Type() MsgType

Type returns the type of this message pack

func (*DataNodeTtMsg) Unmarshal

func (m *DataNodeTtMsg) Unmarshal(input MarshalType) (TsMsg, error)

Unmarshal is used to deserialize a message pack from byte array

type DeleteMsg

type DeleteMsg struct {
	BaseMsg
	msgpb.DeleteRequest
}

DeleteMsg is a message pack that contains delete request

func (*DeleteMsg) CheckAligned

func (dt *DeleteMsg) CheckAligned() error

func (*DeleteMsg) ID

func (dt *DeleteMsg) ID() UniqueID

ID returns the ID of this message pack

func (*DeleteMsg) Marshal

func (dt *DeleteMsg) Marshal(input TsMsg) (MarshalType, error)

Marshal is used to serialize a message pack to byte array

func (*DeleteMsg) SetID

func (dt *DeleteMsg) SetID(id UniqueID)

SetID sets the ID of this message pack

func (*DeleteMsg) SourceID

func (dt *DeleteMsg) SourceID() int64

SourceID indicates which component generated this message

func (*DeleteMsg) Type

func (dt *DeleteMsg) Type() MsgType

Type returns the type of this message pack

func (*DeleteMsg) Unmarshal

func (dt *DeleteMsg) Unmarshal(input MarshalType) (TsMsg, error)

Unmarshal is used to deserialize a message pack from byte array

type DropCollectionMsg

type DropCollectionMsg struct {
	BaseMsg
	msgpb.DropCollectionRequest
}

DropCollectionMsg is a message pack that contains drop collection request

func (*DropCollectionMsg) ID

func (dc *DropCollectionMsg) ID() UniqueID

ID returns the ID of this message pack

func (*DropCollectionMsg) Marshal

func (dc *DropCollectionMsg) Marshal(input TsMsg) (MarshalType, error)

Marshal is used to serialize a message pack to byte array

func (*DropCollectionMsg) SetID

func (dc *DropCollectionMsg) SetID(id UniqueID)

SetID sets the ID of this message pack

func (*DropCollectionMsg) SourceID

func (dc *DropCollectionMsg) SourceID() int64

SourceID indicates which component generated this message

func (*DropCollectionMsg) Type

func (dc *DropCollectionMsg) Type() MsgType

Type returns the type of this message pack

func (*DropCollectionMsg) Unmarshal

func (dc *DropCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error)

Unmarshal is used to deserialize a message pack from byte array

type DropPartitionMsg

type DropPartitionMsg struct {
	BaseMsg
	msgpb.DropPartitionRequest
}

DropPartitionMsg is a message pack that contains drop partition request

func (*DropPartitionMsg) ID

func (dp *DropPartitionMsg) ID() UniqueID

ID returns the ID of this message pack

func (*DropPartitionMsg) Marshal

func (dp *DropPartitionMsg) Marshal(input TsMsg) (MarshalType, error)

Marshal is used to serialize a message pack to byte array

func (*DropPartitionMsg) SetID

func (dp *DropPartitionMsg) SetID(id UniqueID)

SetID sets the ID of this message pack

func (*DropPartitionMsg) SourceID

func (dp *DropPartitionMsg) SourceID() int64

SourceID indicates which component generated this message

func (*DropPartitionMsg) Type

func (dp *DropPartitionMsg) Type() MsgType

Type returns the type of this message pack

func (*DropPartitionMsg) Unmarshal

func (dp *DropPartitionMsg) Unmarshal(input MarshalType) (TsMsg, error)

Unmarshal is used to deserialize a message pack from byte array

type Factory

type Factory interface {
	NewMsgStream(ctx context.Context) (MsgStream, error)
	NewTtMsgStream(ctx context.Context) (MsgStream, error)
	NewQueryMsgStream(ctx context.Context) (MsgStream, error)
	NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

func NewKmsFactory

func NewKmsFactory(config *paramtable.KafkaConfig) Factory

func NewNatsmqFactory

func NewNatsmqFactory() Factory

NewNatsmqFactory creates a new nats-mq factory.

type InsertMsg

type InsertMsg struct {
	BaseMsg
	msgpb.InsertRequest
}

InsertMsg is a message pack that contains insert request

func (*InsertMsg) CheckAligned

func (it *InsertMsg) CheckAligned() error

func (*InsertMsg) ID

func (it *InsertMsg) ID() UniqueID

ID returns the ID of this message pack

func (*InsertMsg) IndexMsg

func (it *InsertMsg) IndexMsg(index int) *InsertMsg

func (*InsertMsg) IndexRequest

func (it *InsertMsg) IndexRequest(index int) msgpb.InsertRequest

func (*InsertMsg) IsColumnBased

func (it *InsertMsg) IsColumnBased() bool

func (*InsertMsg) IsRowBased

func (it *InsertMsg) IsRowBased() bool

func (*InsertMsg) Marshal

func (it *InsertMsg) Marshal(input TsMsg) (MarshalType, error)

Marshal is used to serialize a message pack to byte array

func (*InsertMsg) NRows

func (it *InsertMsg) NRows() uint64

func (*InsertMsg) SetID

func (it *InsertMsg) SetID(id UniqueID)

SetID sets the ID of this message pack

func (*InsertMsg) SourceID

func (it *InsertMsg) SourceID() int64

SourceID indicates which component generated this message

func (*InsertMsg) Type

func (it *InsertMsg) Type() MsgType

Type returns the type of this message pack

func (*InsertMsg) Unmarshal

func (it *InsertMsg) Unmarshal(input MarshalType) (TsMsg, error)

Unmarshal is used to deserialize a message pack from byte array

type IntPrimaryKey

type IntPrimaryKey = typeutil.IntPrimaryKey

IntPrimaryKey is an alias for short

type KmsFactory

type KmsFactory struct {
	ReceiveBufSize int64
	// contains filtered or unexported fields
}

func (*KmsFactory) NewMsgStream

func (f *KmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error)

func (*KmsFactory) NewMsgStreamDisposer

func (f *KmsFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, string) error

func (*KmsFactory) NewQueryMsgStream

func (f *KmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error)

func (*KmsFactory) NewTtMsgStream

func (f *KmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error)

type MarshalType

type MarshalType = interface{}

MarshalType is an empty interface

type MessageID

type MessageID = mqwrapper.MessageID

MessageID is an alias for short

type MockMqFactory

type MockMqFactory struct {
	Factory
	NewMsgStreamFunc func(ctx context.Context) (MsgStream, error)
}

func NewMockMqFactory

func NewMockMqFactory() *MockMqFactory

func (MockMqFactory) NewMsgStream

func (m MockMqFactory) NewMsgStream(ctx context.Context) (MsgStream, error)

type MockMsgStream

type MockMsgStream struct {
	mock.Mock
}

MockMsgStream is an autogenerated mock type for the MsgStream type

func NewMockMsgStream

func NewMockMsgStream(t mockConstructorTestingTNewMockMsgStream) *MockMsgStream

NewMockMsgStream creates a new instance of MockMsgStream. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.

func (*MockMsgStream) AsConsumer

func (_m *MockMsgStream) AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition)

AsConsumer provides a mock function with given fields: channels, subName, position

func (*MockMsgStream) AsProducer

func (_m *MockMsgStream) AsProducer(channels []string)

AsProducer provides a mock function with given fields: channels

func (*MockMsgStream) Broadcast

func (_m *MockMsgStream) Broadcast(_a0 *MsgPack) (map[string][]mqwrapper.MessageID, error)

Broadcast provides a mock function with given fields: _a0

func (*MockMsgStream) Chan

func (_m *MockMsgStream) Chan() <-chan *MsgPack

Chan provides a mock function with given fields:

func (*MockMsgStream) CheckTopicValid

func (_m *MockMsgStream) CheckTopicValid(channel string) error

CheckTopicValid provides a mock function with given fields: channel

func (*MockMsgStream) Close

func (_m *MockMsgStream) Close()

Close provides a mock function with given fields:

func (*MockMsgStream) EXPECT

func (_m *MockMsgStream) EXPECT() *MockMsgStream_Expecter

func (*MockMsgStream) GetLatestMsgID

func (_m *MockMsgStream) GetLatestMsgID(channel string) (mqwrapper.MessageID, error)

GetLatestMsgID provides a mock function with given fields: channel

func (*MockMsgStream) GetProduceChannels

func (_m *MockMsgStream) GetProduceChannels() []string

GetProduceChannels provides a mock function with given fields:

func (*MockMsgStream) Produce

func (_m *MockMsgStream) Produce(_a0 *MsgPack) error

Produce provides a mock function with given fields: _a0

func (*MockMsgStream) Seek

func (_m *MockMsgStream) Seek(offset []*msgpb.MsgPosition) error

Seek provides a mock function with given fields: offset

func (*MockMsgStream) SetRepackFunc

func (_m *MockMsgStream) SetRepackFunc(repackFunc RepackFunc)

SetRepackFunc provides a mock function with given fields: repackFunc

type MockMsgStream_AsConsumer_Call

type MockMsgStream_AsConsumer_Call struct {
	*mock.Call
}

MockMsgStream_AsConsumer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AsConsumer'

func (*MockMsgStream_AsConsumer_Call) Return

func (*MockMsgStream_AsConsumer_Call) Run

func (*MockMsgStream_AsConsumer_Call) RunAndReturn

type MockMsgStream_AsProducer_Call

type MockMsgStream_AsProducer_Call struct {
	*mock.Call
}

MockMsgStream_AsProducer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AsProducer'

func (*MockMsgStream_AsProducer_Call) Return

func (*MockMsgStream_AsProducer_Call) Run

func (*MockMsgStream_AsProducer_Call) RunAndReturn

type MockMsgStream_Broadcast_Call

type MockMsgStream_Broadcast_Call struct {
	*mock.Call
}

MockMsgStream_Broadcast_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Broadcast'

func (*MockMsgStream_Broadcast_Call) Return

func (*MockMsgStream_Broadcast_Call) Run

func (*MockMsgStream_Broadcast_Call) RunAndReturn

type MockMsgStream_Chan_Call

type MockMsgStream_Chan_Call struct {
	*mock.Call
}

MockMsgStream_Chan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Chan'

func (*MockMsgStream_Chan_Call) Return

func (_c *MockMsgStream_Chan_Call) Return(_a0 <-chan *MsgPack) *MockMsgStream_Chan_Call

func (*MockMsgStream_Chan_Call) Run

func (_c *MockMsgStream_Chan_Call) Run(run func()) *MockMsgStream_Chan_Call

func (*MockMsgStream_Chan_Call) RunAndReturn

func (_c *MockMsgStream_Chan_Call) RunAndReturn(run func() <-chan *MsgPack) *MockMsgStream_Chan_Call

type MockMsgStream_CheckTopicValid_Call

type MockMsgStream_CheckTopicValid_Call struct {
	*mock.Call
}

MockMsgStream_CheckTopicValid_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckTopicValid'

func (*MockMsgStream_CheckTopicValid_Call) Return

func (*MockMsgStream_CheckTopicValid_Call) Run

func (*MockMsgStream_CheckTopicValid_Call) RunAndReturn

type MockMsgStream_Close_Call

type MockMsgStream_Close_Call struct {
	*mock.Call
}

MockMsgStream_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'

func (*MockMsgStream_Close_Call) Return

func (*MockMsgStream_Close_Call) Run

func (*MockMsgStream_Close_Call) RunAndReturn

func (_c *MockMsgStream_Close_Call) RunAndReturn(run func()) *MockMsgStream_Close_Call

type MockMsgStream_Expecter

type MockMsgStream_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockMsgStream_Expecter) AsConsumer

func (_e *MockMsgStream_Expecter) AsConsumer(channels interface{}, subName interface{}, position interface{}) *MockMsgStream_AsConsumer_Call

AsConsumer is a helper method to define mock.On call

  • channels []string
  • subName string
  • position mqwrapper.SubscriptionInitialPosition

func (*MockMsgStream_Expecter) AsProducer

func (_e *MockMsgStream_Expecter) AsProducer(channels interface{}) *MockMsgStream_AsProducer_Call

AsProducer is a helper method to define mock.On call

  • channels []string

func (*MockMsgStream_Expecter) Broadcast

func (_e *MockMsgStream_Expecter) Broadcast(_a0 interface{}) *MockMsgStream_Broadcast_Call

Broadcast is a helper method to define mock.On call

  • _a0 *MsgPack

func (*MockMsgStream_Expecter) Chan

Chan is a helper method to define mock.On call

func (*MockMsgStream_Expecter) CheckTopicValid

func (_e *MockMsgStream_Expecter) CheckTopicValid(channel interface{}) *MockMsgStream_CheckTopicValid_Call

CheckTopicValid is a helper method to define mock.On call

  • channel string

func (*MockMsgStream_Expecter) Close

Close is a helper method to define mock.On call

func (*MockMsgStream_Expecter) GetLatestMsgID

func (_e *MockMsgStream_Expecter) GetLatestMsgID(channel interface{}) *MockMsgStream_GetLatestMsgID_Call

GetLatestMsgID is a helper method to define mock.On call

  • channel string

func (*MockMsgStream_Expecter) GetProduceChannels

GetProduceChannels is a helper method to define mock.On call

func (*MockMsgStream_Expecter) Produce

func (_e *MockMsgStream_Expecter) Produce(_a0 interface{}) *MockMsgStream_Produce_Call

Produce is a helper method to define mock.On call

  • _a0 *MsgPack

func (*MockMsgStream_Expecter) Seek

func (_e *MockMsgStream_Expecter) Seek(offset interface{}) *MockMsgStream_Seek_Call

Seek is a helper method to define mock.On call

  • offset []*msgpb.MsgPosition

func (*MockMsgStream_Expecter) SetRepackFunc

func (_e *MockMsgStream_Expecter) SetRepackFunc(repackFunc interface{}) *MockMsgStream_SetRepackFunc_Call

SetRepackFunc is a helper method to define mock.On call

  • repackFunc RepackFunc

type MockMsgStream_GetLatestMsgID_Call

type MockMsgStream_GetLatestMsgID_Call struct {
	*mock.Call
}

MockMsgStream_GetLatestMsgID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestMsgID'

func (*MockMsgStream_GetLatestMsgID_Call) Return

func (*MockMsgStream_GetLatestMsgID_Call) Run

func (*MockMsgStream_GetLatestMsgID_Call) RunAndReturn

type MockMsgStream_GetProduceChannels_Call

type MockMsgStream_GetProduceChannels_Call struct {
	*mock.Call
}

MockMsgStream_GetProduceChannels_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetProduceChannels'

func (*MockMsgStream_GetProduceChannels_Call) Return

func (*MockMsgStream_GetProduceChannels_Call) Run

func (*MockMsgStream_GetProduceChannels_Call) RunAndReturn

type MockMsgStream_Produce_Call

type MockMsgStream_Produce_Call struct {
	*mock.Call
}

MockMsgStream_Produce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Produce'

func (*MockMsgStream_Produce_Call) Return

func (*MockMsgStream_Produce_Call) Run

func (*MockMsgStream_Produce_Call) RunAndReturn

type MockMsgStream_Seek_Call

type MockMsgStream_Seek_Call struct {
	*mock.Call
}

MockMsgStream_Seek_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Seek'

func (*MockMsgStream_Seek_Call) Return

func (*MockMsgStream_Seek_Call) Run

func (_c *MockMsgStream_Seek_Call) Run(run func(offset []*msgpb.MsgPosition)) *MockMsgStream_Seek_Call

func (*MockMsgStream_Seek_Call) RunAndReturn

func (_c *MockMsgStream_Seek_Call) RunAndReturn(run func([]*msgpb.MsgPosition) error) *MockMsgStream_Seek_Call

type MockMsgStream_SetRepackFunc_Call

type MockMsgStream_SetRepackFunc_Call struct {
	*mock.Call
}

MockMsgStream_SetRepackFunc_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetRepackFunc'

func (*MockMsgStream_SetRepackFunc_Call) Return

func (*MockMsgStream_SetRepackFunc_Call) Run

func (*MockMsgStream_SetRepackFunc_Call) RunAndReturn

type MqTtMsgStream

type MqTtMsgStream struct {
	// contains filtered or unexported fields
}

MqTtMsgStream is a msgstream that contains timeticks

func NewMqTtMsgStream

func NewMqTtMsgStream(ctx context.Context,
	receiveBufSize int64,
	bufSize int64,
	client mqwrapper.Client,
	unmarshal UnmarshalDispatcher) (*MqTtMsgStream, error)

NewMqTtMsgStream is used to generate a new MqTtMsgStream object

func (*MqTtMsgStream) AsConsumer

func (ms *MqTtMsgStream) AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition)

AsConsumer subscribes to the channels as a consumer for a MsgStream, using the given initial subscription position.

func (MqTtMsgStream) AsProducer

func (ms MqTtMsgStream) AsProducer(channels []string)

AsProducer creates producers to send messages to the channels

func (MqTtMsgStream) Broadcast

func (ms MqTtMsgStream) Broadcast(msgPack *MsgPack) (map[string][]MessageID, error)

Broadcast broadcasts the msg pack to all producers and returns the corresponding message IDs; the returned message IDs serve as marks.

func (*MqTtMsgStream) Chan

func (ms *MqTtMsgStream) Chan() <-chan *MsgPack

func (MqTtMsgStream) CheckTopicValid

func (ms MqTtMsgStream) CheckTopicValid(channel string) error

func (*MqTtMsgStream) Close

func (ms *MqTtMsgStream) Close()

Close will stop goroutine and free internal producers and consumers

func (MqTtMsgStream) ComputeProduceChannelIndexes

func (ms MqTtMsgStream) ComputeProduceChannelIndexes(tsMsgs []TsMsg) [][]int32

func (MqTtMsgStream) GetLatestMsgID

func (ms MqTtMsgStream) GetLatestMsgID(channel string) (MessageID, error)

func (MqTtMsgStream) GetProduceChannels

func (ms MqTtMsgStream) GetProduceChannels() []string

func (MqTtMsgStream) Produce

func (ms MqTtMsgStream) Produce(msgPack *MsgPack) error

func (*MqTtMsgStream) Seek

func (ms *MqTtMsgStream) Seek(msgPositions []*msgpb.MsgPosition) error

Seek to the specified position

func (MqTtMsgStream) SetRepackFunc

func (ms MqTtMsgStream) SetRepackFunc(repackFunc RepackFunc)

type MsgPack

type MsgPack struct {
	BeginTs        Timestamp
	EndTs          Timestamp
	Msgs           []TsMsg
	StartPositions []*MsgPosition
	EndPositions   []*MsgPosition
}

MsgPack represents a batch of msg in msgstream

type MsgPosition

type MsgPosition = msgpb.MsgPosition

MsgPosition is an alias for short

type MsgStream

type MsgStream interface {
	Close()

	AsProducer(channels []string)
	Produce(*MsgPack) error
	SetRepackFunc(repackFunc RepackFunc)
	GetProduceChannels() []string
	Broadcast(*MsgPack) (map[string][]MessageID, error)

	AsConsumer(channels []string, subName string, position mqwrapper.SubscriptionInitialPosition)
	Chan() <-chan *MsgPack
	Seek(offset []*MsgPosition) error

	GetLatestMsgID(channel string) (MessageID, error)
	CheckTopicValid(channel string) error
}

MsgStream is an interface that can be used to produce and consume message on message queue

type MsgType

type MsgType = commonpb.MsgType

MsgType is an alias of commonpb.MsgType

type PmsFactory

type PmsFactory struct {

	// the following members must be public, so that mapstructure.Decode() can access them
	PulsarAddress    string
	PulsarWebAddress string
	ReceiveBufSize   int64
	PulsarBufSize    int64
	PulsarAuthPlugin string
	PulsarAuthParams string
	PulsarTenant     string
	PulsarNameSpace  string
	// contains filtered or unexported fields
}

PmsFactory is a Pulsar msgstream factory that implements the Factory interface (msgstream.go)

func NewPmsFactory

func NewPmsFactory(config *paramtable.PulsarConfig) *PmsFactory

func (*PmsFactory) NewMsgStream

func (f *PmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error)

NewMsgStream is used to generate a new Msgstream object

func (*PmsFactory) NewMsgStreamDisposer

func (f *PmsFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, string) error

func (*PmsFactory) NewQueryMsgStream

func (f *PmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error)

NewQueryMsgStream is used to generate a new QueryMsgstream object

func (*PmsFactory) NewTtMsgStream

func (f *PmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error)

NewTtMsgStream is used to generate a new TtMsgstream object

type ProtoUDFactory

type ProtoUDFactory struct{}

ProtoUDFactory is a factory to generate ProtoUnmarshalDispatcher object

func (*ProtoUDFactory) NewUnmarshalDispatcher

func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher

NewUnmarshalDispatcher returns a new UnmarshalDispatcher

type ProtoUnmarshalDispatcher

type ProtoUnmarshalDispatcher struct {
	TempMap map[commonpb.MsgType]UnmarshalFunc
}

ProtoUnmarshalDispatcher is an UnmarshalDispatcher which is used for data of proto type

func (*ProtoUnmarshalDispatcher) Unmarshal

func (p *ProtoUnmarshalDispatcher) Unmarshal(input interface{}, msgType commonpb.MsgType) (TsMsg, error)

Unmarshal forwards the unmarshal request to the unmarshal function specified for the given msg type

type RepackFunc

type RepackFunc func(msgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error)

RepackFunc is a function type which is used to repack messages after hashing by primary key

type TimeTickMsg

type TimeTickMsg struct {
	BaseMsg
	msgpb.TimeTickMsg
}

TimeTickMsg is a message pack that contains time tick only

func (*TimeTickMsg) ID

func (tst *TimeTickMsg) ID() UniqueID

ID returns the ID of this message pack

func (*TimeTickMsg) Marshal

func (tst *TimeTickMsg) Marshal(input TsMsg) (MarshalType, error)

Marshal is used to serialize a message pack to byte array

func (*TimeTickMsg) SetID

func (tst *TimeTickMsg) SetID(id UniqueID)

SetID sets the ID of this message pack

func (*TimeTickMsg) SourceID

func (tst *TimeTickMsg) SourceID() int64

SourceID indicates which component generated this message

func (*TimeTickMsg) Type

func (tst *TimeTickMsg) Type() MsgType

Type returns the type of this message pack

func (*TimeTickMsg) Unmarshal

func (tst *TimeTickMsg) Unmarshal(input MarshalType) (TsMsg, error)

Unmarshal is used to deserialize a message pack from byte array

type Timestamp

type Timestamp = typeutil.Timestamp

Timestamp is an alias for short

type TsMsg

type TsMsg interface {
	TraceCtx() context.Context
	SetTraceCtx(ctx context.Context)
	ID() UniqueID
	SetID(id UniqueID)
	BeginTs() Timestamp
	EndTs() Timestamp
	Type() MsgType
	SourceID() int64
	HashKeys() []uint32
	Marshal(TsMsg) (MarshalType, error)
	Unmarshal(MarshalType) (TsMsg, error)
	Position() *MsgPosition
	SetPosition(*MsgPosition)
}

TsMsg provides methods to get begin timestamp and end timestamp of a message pack

type UniqueID

type UniqueID = typeutil.UniqueID

UniqueID is an alias for short

type UnmarshalDispatcher

type UnmarshalDispatcher interface {
	Unmarshal(input interface{}, msgType commonpb.MsgType) (TsMsg, error)
}

UnmarshalDispatcher is an interface that contains the method Unmarshal

type UnmarshalDispatcherFactory

type UnmarshalDispatcherFactory interface {
	NewUnmarshalDispatcher() *UnmarshalDispatcher
}

UnmarshalDispatcherFactory is a factory to generate an object which implements the UnmarshalDispatcher interface

type UnmarshalFunc

type UnmarshalFunc func(interface{}) (TsMsg, error)

UnmarshalFunc is a function type that has been implemented by each Msg

type UpsertMsg

type UpsertMsg struct {
	InsertMsg *InsertMsg
	DeleteMsg *DeleteMsg
}

Upsert: UpsertMsg holds an InsertMsg and a DeleteMsg.

type WastedMockMsgStream

type WastedMockMsgStream struct {
	MsgStream
	AsProducerFunc    func(channels []string)
	BroadcastMarkFunc func(*MsgPack) (map[string][]MessageID, error)
	BroadcastFunc     func(*MsgPack) error
	ChanFunc          func() <-chan *MsgPack
}

func NewWastedMockMsgStream

func NewWastedMockMsgStream() *WastedMockMsgStream

func (WastedMockMsgStream) AsProducer

func (m WastedMockMsgStream) AsProducer(channels []string)

func (WastedMockMsgStream) Broadcast

func (m WastedMockMsgStream) Broadcast(pack *MsgPack) (map[string][]MessageID, error)

func (WastedMockMsgStream) Chan

func (m WastedMockMsgStream) Chan() <-chan *MsgPack

Directories

Path Synopsis
nmq

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL