server

package
v0.0.0-...-f0f27be Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 4, 2022 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewMemoryStorage

func NewMemoryStorage() *memoryStorage

NewMemoryStorage returns storage backed by slice, Item id is an index in slice

func NewProcessFn

func NewProcessFn(id int, storage Storage, logFile *os.File) func(*message.Any)

NewProcessFn creates new processing function for messages

func NewRWLockedStorage

func NewRWLockedStorage(storage Storage) *rwLockedStorage

NewRWLockedStorage returns storage which adds read/write mutex to upstream storage

Types

type Item

type Item struct {
	K string
	V string
}

Item represents stored item

type MessageChan

type MessageChan chan *message.Any

MessageChan is a channel of messages

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor allows to process incoming messages with given processing function

func NewProcessor

func NewProcessor(messages MessageChan) *Processor

NewProcessor creates new processor

func (*Processor) Run

func (s *Processor) Run(ctx context.Context, processFn func(*message.Any))

Run runs processing, can be stopped with context's cancel function

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader is responsible for reading ordered from SQS and passing it to messages channel

func NewReader

func NewReader(sqsClient *sqs.Client, queueUrl string, messages MessageChan) *Reader

NewReader creates new reader

func (*Reader) Run

func (s *Reader) Run(ctx context.Context, waitTimeSeconds int32)

Run runs reading, can be stopped with context's cancel function

type Storage

type Storage interface {
	// AddItem adds Item to storage
	AddItem(item Item)
	// RemoveItem removes Item from storage
	RemoveItem(key string) error
	// GetItem returns Item with given id from storage
	GetItem(key string) (*Item, error)
	// GetAllItems returns items in storage, new slice created every time
	GetAllItems() []Item
	// Iterate allows to iterato over ordered in storage, can be used for processing which does not involve blocking IO
	Iterate(accept func(Item))
}

Storage defines interface for ordered storage

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL