broker

package
v0.0.0-...-433abdc Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 17, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
// Worker type identifiers, declared as untyped integer constants.
// NOTE(review): the names suggest concurrent, serial and async worker modes;
// the code that consumes these values is not visible here — confirm usage.
const (
	WorkerTypeConcurrent = 1
	WorkerTypeSerial     = 2
	WorkerTypeAsync      = 3
)

Variables

This section is empty.

Functions

func ExitBrokers

func ExitBrokers()

func InitBroker

func InitBroker()

func SetWorker

func SetWorker(workerId string, worker *Worker)

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func GetOrCreateBroker

func GetOrCreateBroker(topic string) *Broker

func NewBroker

func NewBroker(topic string) *Broker

func (*Broker) Flush

func (b *Broker) Flush(itemList []*Item, dataSize int)

func (*Broker) GetConsumeWorker

func (b *Broker) GetConsumeWorker(topic, group string, workerId string) (*Worker, error)

func (*Broker) Send

func (b *Broker) Send(item *Item) error

func (*Broker) Stop

func (b *Broker) Stop()

type Endian

type Endian struct {
	// contains filtered or unexported fields
}

type IBackendWriter

// IBackendWriter is the interface for backends that receive Items through
// Write and SetFinish. Neither method returns a value, so an implementation
// cannot report a failure to the caller through these signatures.
// NOTE(review): presumably Write stores an item and SetFinish marks it as
// completed — confirm against the implementations.
type IBackendWriter interface {
	Write(item *Item)
	SetFinish(item *Item)
}

type Item

// Item is a single record made of fixed-width numeric metadata fields and a
// variable-length Data payload.
// NOTE(review): field semantics are inferred from names only — confirm against
// the FillIndex/FillData and Marshal2Index/Marshal2Data implementations.
type Item struct {
	Sequence  uint64
	HashCode  uint64
	CreatedAt uint32 // NOTE(review): presumably a Unix timestamp in seconds — confirm
	//Partition uint32
	Offset uint32
	Size   uint32 // NOTE(review): presumably the byte length of Data — confirm

	RetryCount uint32
	DelayType  uint32
	DelayValue uint32
	Priority   int64
	Data       []byte
}

func (*Item) FillData

func (i *Item) FillData(dataBuf []byte) error

func (*Item) FillIndex

func (i *Item) FillIndex(buf []byte) error

func (*Item) Marshal2Data

func (i *Item) Marshal2Data(buf []byte)

func (*Item) Marshal2Index

func (i *Item) Marshal2Index(buf []byte)

type MinHeap

type MinHeap []*MinHeapElement

MinHeap is a priority queue that implements heap.Interface and holds MinHeapElements.

func NewMinHeap

func NewMinHeap() *MinHeap

func (MinHeap) Len

func (mh MinHeap) Len() int

func (MinHeap) Less

func (mh MinHeap) Less(i, j int) bool

func (*MinHeap) PeekEl

func (mh *MinHeap) PeekEl() *MinHeapElement

func (*MinHeap) Pop

func (mh *MinHeap) Pop() interface{}

func (*MinHeap) PopEl

func (mh *MinHeap) PopEl() *MinHeapElement

func (*MinHeap) Push

func (mh *MinHeap) Push(x interface{})

func (*MinHeap) PushEl

func (mh *MinHeap) PushEl(el *MinHeapElement)

func (*MinHeap) RemoveEl

func (mh *MinHeap) RemoveEl(el *MinHeapElement)

func (MinHeap) Swap

func (mh MinHeap) Swap(i, j int)

func (*MinHeap) UpdateEl

func (mh *MinHeap) UpdateEl(el *MinHeapElement, priority int64)

UpdateEl modifies the priority of an element in the queue.

type MinHeapElement

type MinHeapElement struct {
	Value    interface{}
	Priority int64 // The priority of the item in the queue.
	// contains filtered or unexported fields
}

A MinHeapElement is something we manage in a priority queue.

type MsgPayload

// MsgPayload holds a message ID, a raw data payload, and two delay fields.
// MsgId and Data carry protobuf and JSON struct tags; DelayType and DelayValue
// carry no tags.
// NOTE(review): DecodeMsgPayload builds a *MsgPayload from a byte buffer, but
// the wire format is not visible here — confirm how the untagged delay fields
// are populated.
type MsgPayload struct {
	//Ctx                  *MsgPayload_Context `protobuf:"bytes,1,opt,name=ctx" json:"ctx,omitempty"`
	MsgId string `protobuf:"bytes,2,opt,name=msg_id,json=msgId" json:"msg_id,omitempty"`
	Data  []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`

	DelayType  uint32
	DelayValue uint32
}

func DecodeMsgPayload

func DecodeMsgPayload(buf []byte) (*MsgPayload, error)

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func GetWorker

func GetWorker(workerId string) *Worker

func NewWorker

func NewWorker(finishChan *chan uint64) *Worker

func (*Worker) NextItem

func (w *Worker) NextItem(exit *bool) *Item

func (*Worker) ReceiveItem

func (w *Worker) ReceiveItem(msg *Item)

func (*Worker) Stop

func (w *Worker) Stop()

type WorkerGroup

type WorkerGroup struct {
	// contains filtered or unexported fields
}

func NewWorkerGroup

func NewWorkerGroup(topic string, cfg *config.SubCfg) *WorkerGroup

func (*WorkerGroup) GetOrCreateWorker

func (g *WorkerGroup) GetOrCreateWorker(workerId string) *Worker

func (*WorkerGroup) RemoveWorker

func (g *WorkerGroup) RemoveWorker(workerId string)

func (*WorkerGroup) SetLatestFileSequence

func (g *WorkerGroup) SetLatestFileSequence(sequence uint64)

func (*WorkerGroup) Start

func (g *WorkerGroup) Start() error

func (*WorkerGroup) Stop

func (g *WorkerGroup) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL