datastreamer

package
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 21, 2024 License: AGPL-3.0, AGPL-3.0-or-later Imports: 13 Imported by: 12

Documentation

Index

Constants

View Source
const (
	// PageHeaderSize is the size in bytes of the header page (4 KB).
	PageHeaderSize = 4 * 1024
	// PageDataSize is the size in bytes of one data page (1 MB).
	PageDataSize = 1 << 20

	// PtPadding is the packet type used for padding.
	PtPadding = 0x00
	// PtHeader is the packet type used only for the header page.
	PtHeader = 0x01
	// PtData is the packet type of a data entry.
	PtData = 0x02
	// PtDataRsp is the packet type of a command response that carries data.
	PtDataRsp = 0xfe
	// PtResult is the packet type of a client command result; it is not
	// stored/present in the file.
	PtResult = 0xff

	// EtBookmark is the entry type used for bookmarks.
	EtBookmark = 0xb0

	// FixedSizeFileEntry is the fixed size in bytes of a data file entry,
	// excluding its variable-length data (1+4+4+8 = 17).
	// NOTE(review): presumably packet type, length, entry type and entry
	// number, in that order — confirm against the encoder.
	FixedSizeFileEntry = 1 + 4 + 4 + 8
	// FixedSizeResultEntry is the fixed size in bytes of a result entry
	// (1+4+4 = 9).
	FixedSizeResultEntry = 1 + 4 + 4
)
View Source
// EntryTypeNotFound is the entry type value used for CmdEntry/CmdBookmark
// when the requested entry/bookmark is not found. Its value is the maximum
// uint32 (math.MaxUint32).
const EntryTypeNotFound = math.MaxUint32

EntryTypeNotFound is the entry type value for CmdEntry/CmdBookmark when the entry/bookmark is not found

Variables

View Source
var (
	// ErrInvalidCommand is returned when the command is invalid
	ErrInvalidCommand = fmt.Errorf("invalid command")
	// ErrResultCommandError is returned when a command result reports an error
	// NOTE(review): the previous comment duplicated ErrInvalidCommand's text; confirm the exact condition against the callers
	ErrResultCommandError = fmt.Errorf("result command error")
	// ErrNilConnection is returned when the connection is nil
	ErrNilConnection = fmt.Errorf("nil connection")
	// ErrReadingDataEntry is returned when there is an error reading a data entry
	ErrReadingDataEntry = fmt.Errorf("error reading data entry")
	// ErrReadingResultEntry is returned when there is an error reading a result entry
	ErrReadingResultEntry = fmt.Errorf("error reading result entry")
	// ErrGettingHeaderInfo is returned when there is an error getting the header info
	ErrGettingHeaderInfo = fmt.Errorf("error getting header info")
	// ErrInvalidBinaryHeader is returned when the binary header is invalid
	ErrInvalidBinaryHeader = fmt.Errorf("invalid binary header info")
	// ErrInvalidFileMissingHeaderPage is returned when the file is invalid because the header page is missing
	ErrInvalidFileMissingHeaderPage = fmt.Errorf("invalid file, missing header page")
	// ErrBadFileSizeCutDataPage is returned when the file size is bad because a data page is cut
	ErrBadFileSizeCutDataPage = fmt.Errorf("bad file size, cut data page")
	// ErrBadFileFormat is returned when the file format is bad
	ErrBadFileFormat = fmt.Errorf("bad file format")
	// ErrInvalidHeaderBadPacketType is returned when the header is invalid because of a bad packet type
	ErrInvalidHeaderBadPacketType = fmt.Errorf("invalid header, bad packet type")
	// ErrInvalidHeaderBadHeaderLength is returned when the header is invalid because of a bad header length
	ErrInvalidHeaderBadHeaderLength = fmt.Errorf("invalid header, bad header length")
	// ErrInvalidHeaderBadStreamType is returned when the header is invalid because of a bad stream type
	ErrInvalidHeaderBadStreamType = fmt.Errorf("invalid header, bad stream type")
	// ErrInvalidBinaryEntry is returned when the binary entry is invalid
	ErrInvalidBinaryEntry = fmt.Errorf("invalid binary entry")
	// ErrDecodingBinaryDataEntry is returned when there is an error decoding a binary data entry
	ErrDecodingBinaryDataEntry = fmt.Errorf("error decoding binary data entry")
	// ErrExpectingPacketTypeData is returned when a packet of type data was expected
	ErrExpectingPacketTypeData = fmt.Errorf("expecting packet type data")
	// ErrDecodingLengthDataEntry is returned when there is an error decoding the length of a data entry
	ErrDecodingLengthDataEntry = fmt.Errorf("error decoding length data entry")
	// ErrPageNotStartingWithEntryData is returned when the page does not start with entry data
	ErrPageNotStartingWithEntryData = fmt.Errorf("page not starting with entry data")
	// ErrCurrentPositionOutsideDataPage is returned when the current position is outside the data page
	ErrCurrentPositionOutsideDataPage = fmt.Errorf("current position outside data page")
	// ErrEntryNotFound is returned when the entry is not found
	ErrEntryNotFound = fmt.Errorf("entry not found")
	// ErrInvalidEntryNumberNotCommittedInFile is returned when the entry number is invalid because it is not committed in the file
	ErrInvalidEntryNumberNotCommittedInFile = fmt.Errorf("invalid entry number, not committed in the file")
	// ErrEntryNumberMismatch is returned when the entry number doesn't match
	ErrEntryNumberMismatch = fmt.Errorf("entry number doesn't match")
	// ErrUpdateEntryTypeNotAllowed is returned when an update would change the entry to a different entry type
	ErrUpdateEntryTypeNotAllowed = fmt.Errorf("update entry to a different entry type not allowed")
	// ErrUpdateEntryDifferentSize is returned when an update would change the entry to a different size
	ErrUpdateEntryDifferentSize = fmt.Errorf("update entry to a different size not allowed")
	// ErrAtomicOpNotAllowed is returned when an atomic operation is not allowed because the server is not started
	ErrAtomicOpNotAllowed = fmt.Errorf("atomicop not allowed, server is not started")
	// ErrStartAtomicOpNotAllowed is returned when starting an atomic operation is not allowed because one is already started
	ErrStartAtomicOpNotAllowed = fmt.Errorf("start atomicop not allowed, atomicop already started")
	// ErrAddEntryNotAllowed is returned when adding an entry is not allowed because the atomic operation is not started
	ErrAddEntryNotAllowed = fmt.Errorf("add entry not allowed, atomicop is not started")
	// ErrCommitNotAllowed is returned when the commit is not allowed because the atomic operation is not in started state
	ErrCommitNotAllowed = fmt.Errorf("commit not allowed, atomicop not in started state")
	// ErrRollbackNotAllowed is returned when the rollback is not allowed because the atomic operation is not in started state
	ErrRollbackNotAllowed = fmt.Errorf("rollback not allowed, atomicop not in started state")
	// ErrInvalidEntryNumber is returned when the entry number is invalid because it doesn't exist
	ErrInvalidEntryNumber = fmt.Errorf("invalid entry number, doesn't exist")
	// ErrUpdateNotAllowed is returned when the update is not allowed because the entry is in the current atomic operation
	ErrUpdateNotAllowed = fmt.Errorf("update not allowed, it's in current atomic operation")
	// ErrClientAlreadyStarted is returned when the client is already started
	ErrClientAlreadyStarted = fmt.Errorf("client already started")
	// ErrClientAlreadyStopped is returned when the client is already stopped
	ErrClientAlreadyStopped = fmt.Errorf("client already stopped")
	// ErrHeaderCommandNotAllowed is returned when the header command is not allowed
	ErrHeaderCommandNotAllowed = fmt.Errorf("header command not allowed")
	// ErrEntryCommandNotAllowed is returned when the entry command is not allowed
	ErrEntryCommandNotAllowed = fmt.Errorf("entry command not allowed")
	// ErrStartCommandInvalidParamFromEntry is returned when the start command has an invalid "from entry" parameter
	ErrStartCommandInvalidParamFromEntry = fmt.Errorf("start command invalid param from entry")
	// ErrStartBookmarkInvalidParamFromBookmark is returned when the start bookmark command has an invalid "from bookmark" parameter
	ErrStartBookmarkInvalidParamFromBookmark = fmt.Errorf("start bookmark invalid param from bookmark")
	// ErrInvalidBinaryResultEntry is returned when the binary result entry is invalid
	ErrInvalidBinaryResultEntry = fmt.Errorf("invalid binary result entry")
	// ErrDecodingBinaryResultEntry is returned when there is an error decoding a binary result entry
	ErrDecodingBinaryResultEntry = fmt.Errorf("error decoding binary result entry")
	// ErrTruncateNotAllowed is returned when truncating is not allowed because an atomic operation is in progress
	ErrTruncateNotAllowed = fmt.Errorf("truncate not allowed, atomic operation in progress")
	// ErrBookmarkCommandNotAllowed is returned when the bookmark command is not allowed
	ErrBookmarkCommandNotAllowed = fmt.Errorf("bookmark command not allowed")
	// ErrExecCommandNotAllowed is returned when executing a TCP command is not allowed because the client is not started
	ErrExecCommandNotAllowed = fmt.Errorf("execute command not allowed, client is not started")
	// ErrBookmarkNotFound is returned when the bookmark is not found
	ErrBookmarkNotFound = fmt.Errorf("bookmark not found")
	// ErrBookmarkMaxLength is returned when the bookmark length exceeds the maximum length
	ErrBookmarkMaxLength = fmt.Errorf("bookmark max length")
	// ErrInvalidBookmarkRange is returned when the bookmark range is invalid
	ErrInvalidBookmarkRange = fmt.Errorf("invalid bookmark range")
)
View Source
var (
	// StrClientStatus maps each client status to its description
	// (the entries are elided in this rendering)
	StrClientStatus = map[ClientStatus]string{
					// contains filtered or unexported fields
	}

	// StrCommand maps each TCP command to its description
	StrCommand = map[Command]string{
		CmdStart:         "Start",
		CmdStop:          "Stop",
		CmdHeader:        "Header",
		CmdStartBookmark: "StartBookmark",
		CmdEntry:         "Entry",
		CmdBookmark:      "Bookmark",
	}

	// StrCommandErrors maps each TCP command error code to its description
	StrCommandErrors = map[CommandError]string{
		CmdErrOK:              "OK",
		CmdErrAlreadyStarted:  "Already started",
		CmdErrAlreadyStopped:  "Already stopped",
		CmdErrBadFromEntry:    "Bad from entry",
		CmdErrBadFromBookmark: "Bad from bookmark",
		CmdErrInvalidCommand:  "Invalid command",
	}
)

Functions

func PrintHeaderEntry added in v0.1.9

func PrintHeaderEntry(e HeaderEntry, title string)

PrintHeaderEntry prints file header information

func PrintReceivedEntry added in v0.1.0

func PrintReceivedEntry(e *FileEntry, c *StreamClient, s *StreamServer) error

PrintReceivedEntry prints received entry (default callback function)

func PrintResultEntry

func PrintResultEntry(e ResultEntry)

PrintResultEntry prints result entry type

Types

type AOStatus

type AOStatus uint64

AOStatus type for the atomic operation internal states

type ClientStatus

type ClientStatus uint64

ClientStatus type for the status of the client

type Command

type Command uint64

Command type for the TCP client commands

// TCP client commands. Values are sequential and start at 1 (iota + 1).
const (
	CmdStart         Command = iota + 1 // CmdStart is the TCP client command to start streaming from an entry
	CmdStop                             // CmdStop is the TCP client command to stop streaming
	CmdHeader                           // CmdHeader is the TCP client command to get the header
	CmdStartBookmark                    // CmdStartBookmark is the TCP client command to start streaming from a bookmark
	CmdEntry                            // CmdEntry is the TCP client command to get an entry
	CmdBookmark                         // CmdBookmark is the TCP client command to get a bookmark
)

func (Command) IsACommand added in v0.1.4

func (c Command) IsACommand() bool

IsACommand checks if a command is a valid command

type CommandError

type CommandError uint32

CommandError type for the command responses

// Command response error codes. Values 0 to 4 are assigned sequentially by
// iota; CmdErrInvalidCommand is explicitly set to 9, so 5 to 8 are unassigned.
const (
	CmdErrOK              CommandError = iota // CmdErrOK means no error
	CmdErrAlreadyStarted                      // CmdErrAlreadyStarted means the client is already started
	CmdErrAlreadyStopped                      // CmdErrAlreadyStopped means the client is already stopped
	CmdErrBadFromEntry                        // CmdErrBadFromEntry means the starting entry number is invalid
	CmdErrBadFromBookmark                     // CmdErrBadFromBookmark means the starting bookmark is invalid
	CmdErrInvalidCommand  CommandError = 9    // CmdErrInvalidCommand means the command is invalid/unknown
)

type Config

// Config holds the configuration for the datastreamer server.
type Config struct {
	// Port to listen on
	Port uint16 `mapstructure:"Port"`
	// Filename of the binary data file
	Filename string `mapstructure:"Filename"`
	// Log is the logging configuration
	Log log.Config `mapstructure:"Log"`
}

Config type for datastreamer server

type EntryType

type EntryType uint32

EntryType type for the entry event types

type FileEntry

// FileEntry represents a data file entry.
type FileEntry struct {
	Length uint32    // Total length of the entry (17 bytes + length(data))
	Type   EntryType // 0xb0:Bookmark, 1:Event1, 2:Event2,...
	Number uint64    // Entry number (sequential starting with 0)
	Data   []byte    // Entry data; its length is counted in Length
	// contains filtered or unexported fields
}

FileEntry type for a data file entry

func DecodeBinaryToFileEntry

func DecodeBinaryToFileEntry(b []byte) (FileEntry, error)

DecodeBinaryToFileEntry decodes from binary bytes slice to file entry type

func (FileEntry) Encode added in v0.2.2

func (e FileEntry) Encode() []byte

Encode encodes the file entry to binary bytes

type HeaderEntry

// HeaderEntry represents a header entry of the stream file.
type HeaderEntry struct {
	Version  uint8  // Stream file version
	SystemID uint64 // System identifier (e.g. ChainID)

	TotalLength  uint64 // Total bytes used in the file
	TotalEntries uint64 // Total number of data entries (packet type PtData)
	// contains filtered or unexported fields
}

HeaderEntry type for a header entry

type ProcessEntryFunc added in v0.1.0

type ProcessEntryFunc func(*FileEntry, *StreamClient, *StreamServer) error

ProcessEntryFunc type of the callback function to process the received entry

type ResultEntry

type ResultEntry struct {
	// contains filtered or unexported fields
}

ResultEntry type for a result entry

func DecodeBinaryToResultEntry

func DecodeBinaryToResultEntry(b []byte) (ResultEntry, error)

DecodeBinaryToResultEntry decodes from binary bytes slice to a result entry type

type StreamBookmark added in v0.0.9

type StreamBookmark struct {
	// contains filtered or unexported fields
}

StreamBookmark type to manage index of bookmarks

func NewBookmark added in v0.1.9

func NewBookmark(fn string) (*StreamBookmark, error)

NewBookmark creates bookmark struct and opens or creates the bookmark database

func (*StreamBookmark) AddBookmark added in v0.0.9

func (b *StreamBookmark) AddBookmark(bookmark []byte, entryNum uint64) error

AddBookmark inserts or updates a bookmark

func (*StreamBookmark) GetBookmark added in v0.0.9

func (b *StreamBookmark) GetBookmark(bookmark []byte) (uint64, error)

GetBookmark gets a bookmark value

func (*StreamBookmark) PrintDump added in v0.0.9

func (b *StreamBookmark) PrintDump() error

PrintDump prints all bookmarks stored in the database

type StreamClient

// StreamClient manages a data stream client.
type StreamClient struct {
	Id string // Client identifier
	// contains filtered or unexported fields
}

StreamClient type to manage a data stream client

func NewClient

func NewClient(server string, streamType StreamType) (*StreamClient, error)

NewClient creates a new data stream client

func NewClientWithLogsConfig added in v0.2.1

func NewClientWithLogsConfig(server string, streamType StreamType, logsConfig log.Config) (*StreamClient, error)

NewClientWithLogsConfig creates a new data stream client with logs configuration

func (*StreamClient) ExecCommandGetBookmark added in v0.2.0

func (c *StreamClient) ExecCommandGetBookmark(fromBookmark []byte) (FileEntry, error)

ExecCommandGetBookmark executes client TCP command to get a bookmark

func (*StreamClient) ExecCommandGetEntry added in v0.2.0

func (c *StreamClient) ExecCommandGetEntry(fromEntry uint64) (FileEntry, error)

ExecCommandGetEntry executes client TCP command to get an entry

func (*StreamClient) ExecCommandGetHeader added in v0.2.0

func (c *StreamClient) ExecCommandGetHeader() (HeaderEntry, error)

ExecCommandGetHeader executes client TCP command to get the header

func (*StreamClient) ExecCommandStart added in v0.2.0

func (c *StreamClient) ExecCommandStart(fromEntry uint64) error

ExecCommandStart executes client TCP command to start streaming from entry

func (*StreamClient) ExecCommandStartBookmark added in v0.2.0

func (c *StreamClient) ExecCommandStartBookmark(fromBookmark []byte) error

ExecCommandStartBookmark executes client TCP command to start streaming from bookmark

func (*StreamClient) ExecCommandStop added in v0.2.0

func (c *StreamClient) ExecCommandStop() error

ExecCommandStop executes client TCP command to stop streaming

func (*StreamClient) GetFromStream added in v0.2.0

func (c *StreamClient) GetFromStream() uint64

GetFromStream returns the streaming start entry number from the latest start command executed

func (*StreamClient) GetTotalEntries added in v0.2.0

func (c *StreamClient) GetTotalEntries() uint64

GetTotalEntries returns total entries number from the latest header command executed

func (*StreamClient) SetProcessEntryFunc added in v0.1.0

func (c *StreamClient) SetProcessEntryFunc(f ProcessEntryFunc)

SetProcessEntryFunc sets the callback function to process entry

func (*StreamClient) Start

func (c *StreamClient) Start() error

Start connects to the data stream server and starts getting data from the server

type StreamFile

type StreamFile struct {
	// contains filtered or unexported fields
}

StreamFile type to manage a binary stream file

func NewStreamFile added in v0.1.9

func NewStreamFile(fn string, version uint8, systemID uint64, st StreamType) (*StreamFile, error)

NewStreamFile creates stream file struct and opens or creates the stream binary data file

func (*StreamFile) AddFileEntry

func (f *StreamFile) AddFileEntry(e FileEntry) error

AddFileEntry writes new data entry to the data stream file

type StreamRelay added in v0.1.0

type StreamRelay struct {
	// contains filtered or unexported fields
}

StreamRelay type to manage a data stream relay

func NewRelay added in v0.1.0

func NewRelay(server string, port uint16, version uint8, systemID uint64, streamType StreamType, fileName string, cfg *log.Config) (*StreamRelay, error)

NewRelay creates a new data stream relay

func (*StreamRelay) Start added in v0.1.0

func (r *StreamRelay) Start() error

Start connects and syncs with master server then opens access to relay clients

type StreamServer

type StreamServer struct {
	// contains filtered or unexported fields
}

StreamServer type to manage a data stream server

func NewServer added in v0.1.0

func NewServer(port uint16, version uint8, systemID uint64, streamType StreamType, fileName string, cfg *log.Config) (*StreamServer, error)

NewServer creates a new data stream server

func (*StreamServer) AddStreamBookmark added in v0.0.9

func (s *StreamServer) AddStreamBookmark(bookmark []byte) (uint64, error)

AddStreamBookmark adds a new bookmark in the current atomic operation

func (*StreamServer) AddStreamEntry

func (s *StreamServer) AddStreamEntry(etype EntryType, data []byte) (uint64, error)

AddStreamEntry adds a new entry in the current atomic operation

func (*StreamServer) BookmarkPrintDump added in v0.0.9

func (s *StreamServer) BookmarkPrintDump()

BookmarkPrintDump prints all bookmarks

func (*StreamServer) CommitAtomicOp

func (s *StreamServer) CommitAtomicOp() error

CommitAtomicOp commits the current atomic operation and streams it to the clients

func (*StreamServer) GetBookmark added in v0.0.9

func (s *StreamServer) GetBookmark(bookmark []byte) (uint64, error)

GetBookmark returns the entry number pointed by the bookmark

func (*StreamServer) GetDataBetweenBookmarks added in v0.1.19

func (s *StreamServer) GetDataBetweenBookmarks(bookmarkFrom []byte, bookmarkTo []byte) ([]byte, error)

GetDataBetweenBookmarks returns the data between two bookmarks

func (*StreamServer) GetEntry added in v0.0.7

func (s *StreamServer) GetEntry(entryNum uint64) (FileEntry, error)

GetEntry searches in the stream file and returns the data for the requested entry

func (*StreamServer) GetFirstEventAfterBookmark added in v0.0.9

func (s *StreamServer) GetFirstEventAfterBookmark(bookmark []byte) (FileEntry, error)

GetFirstEventAfterBookmark searches in the stream file by bookmark and returns the first event entry data

func (*StreamServer) GetHeader added in v0.0.7

func (s *StreamServer) GetHeader() HeaderEntry

GetHeader returns the current committed header

func (*StreamServer) RollbackAtomicOp

func (s *StreamServer) RollbackAtomicOp() error

RollbackAtomicOp cancels the current atomic operation and rolls back the changes

func (*StreamServer) Start

func (s *StreamServer) Start() error

Start opens access to TCP clients and starts broadcasting

func (*StreamServer) StartAtomicOp

func (s *StreamServer) StartAtomicOp() error

StartAtomicOp starts a new atomic operation

func (*StreamServer) TruncateFile added in v0.1.4

func (s *StreamServer) TruncateFile(entryNum uint64) error

TruncateFile truncates stream data file from an entry number onwards

func (*StreamServer) UpdateEntryData added in v0.1.4

func (s *StreamServer) UpdateEntryData(entryNum uint64, etype EntryType, data []byte) error

UpdateEntryData updates the internal data of an entry

type StreamType

type StreamType uint64

StreamType type for the stream types

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL