protocol

package
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 21, 2020 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const MaxTopicSize = 255

MaxTopicSize represents the maximum topic length. It is 255, as that is the maximum directory name length on Linux.

Variables

View Source
var (
	// ErrNotFound is returned when the log offset is above the logs head, has been
	// deleted, or does not point to a message batch.
	ErrNotFound = errors.New("offset not found")

	// ErrInternal is a server side error. This is an "ERR" response from the
	// server.
	ErrInternal = errors.New("internal server error")

	// ErrInvalid refers to an invalid request.
	ErrInvalid = errors.New("invalid request")

	// ErrMaxTopics means the maximum number of topics has already been
	// created.
	ErrMaxTopics = errors.New("maximum topics allowed")

	// ErrTopicNotAllowed means the requested topic is not allowed.
	ErrTopicNotAllowed = errors.New("topic not allowed")

	// ErrInvalidOffset is returned when a read is attempted from a batch
	// offset that doesn't point to the beginning of a batch protocol message.
	ErrInvalidOffset = errors.New("invalid offset")

	// ErrRespInvalid are the error response bytes sent for invalid requests
	ErrRespInvalid = []byte("invalid request")

	// ErrRespEmptyMessage indicates a write was attempted that included no data
	ErrRespEmptyMessage = []byte("empty message not allowed")

	// ErrRespNoArguments indicates no arguments were supplied
	ErrRespNoArguments = []byte("must supply an argument")

	// ErrRespNotFound indicates the messages could not be found
	ErrRespNotFound = []byte("not found")

	// ErrRespServer indicates an internal server error
	ErrRespServer = []byte("internal error")

	// ErrRespTooLarge indicates a protocol error
	ErrRespTooLarge = []byte("too large")

	// ErrRespMaxTopics is the serialized form of ErrMaxTopics.
	ErrRespMaxTopics = []byte("maximum topics allowed")

	// ErrRespTopicNotAllowed is the serialized form of ErrTopicNotAllowed
	ErrRespTopicNotAllowed = []byte("topic not allowed")
)

Functions

func BatchEnvelopeSize added in v0.1.1

func BatchEnvelopeSize(topic string, body []byte, messages int) int

BatchEnvelopeSize returns the size of a message body including its protocol envelope.

func MessageSize

func MessageSize(bodySize int) int

MessageSize returns the size of the message, including its protocol envelope.

Types

type Batch

type Batch struct {
	Size     int
	Checksum uint32
	Messages int
	// contains filtered or unexported fields
}

Batch represents a collection of Messages. Wire format: BATCH <size> <topic> <checksum> <messages>\r\n<data>. NOTE: there is no trailing newline after the data.

func NewBatch

func NewBatch(conf *config.Config) *Batch

NewBatch returns a new instance of a batch

func (*Batch) Append

func (b *Batch) Append(p []byte) error

Append adds a new message's bytes to the batch

func (*Batch) AppendMessage

func (b *Batch) AppendMessage(m *Message) error

AppendMessage adds a new message to the batch

func (*Batch) Bytes

func (b *Batch) Bytes() []byte

Bytes returns a slice of raw bytes. Used by EventQ to write directly to the log.

func (*Batch) CalcSize

func (b *Batch) CalcSize() int

CalcSize calculates the full byte size of the batch. It is intended to be called to make sure the batch isn't larger than config.MaxBatchSize, so for now it assumes the crc is the longest possible uint32 to avoid calculating the crc for every message.

func (*Batch) Copy

func (b *Batch) Copy() *Batch

Copy returns a newly allocated copy

func (*Batch) Empty

func (b *Batch) Empty() bool

Empty returns true if the batch contains no messages

func (*Batch) FirstOffset

func (b *Batch) FirstOffset() uint64

FirstOffset returns the offset delta of the first message

func (*Batch) FromRequest

func (b *Batch) FromRequest(req *Request) (*Batch, error)

FromRequest parses a request, populating the batch. If validation fails, an error is returned.

func (*Batch) FullSize

func (b *Batch) FullSize() (int, bool)

FullSize returns the full size of the batch if it was previously read. The second return value indicates whether the batch was read or not.

func (*Batch) MessageBytes

func (b *Batch) MessageBytes() []byte

MessageBytes returns a byte slice of the batch of messages.

func (*Batch) ReadFrom

func (b *Batch) ReadFrom(r io.Reader) (int64, error)

ReadFrom implements io.ReaderFrom

func (*Batch) Reset

func (b *Batch) Reset()

Reset puts a batch in an initial state so it can be reused

func (*Batch) SetChecksum

func (b *Batch) SetChecksum()

SetChecksum sets the batch's crc32

func (*Batch) SetTopic

func (b *Batch) SetTopic(topic []byte)

SetTopic sets the topic for a batch.

func (*Batch) String

func (b *Batch) String() string

func (*Batch) Topic

func (b *Batch) Topic() string

Topic returns the topic for the batch.

func (*Batch) TopicSlice

func (b *Batch) TopicSlice() []byte

TopicSlice returns the topic for the batch as a byte slice. The byte slice is not copied.

func (*Batch) Validate

func (b *Batch) Validate() error

Validate checks the batch's checksum. TODO: should add config.MaxMessageSize and config.MaxMessagesPerBatch and check them here, maybe?

func (*Batch) WriteTo

func (b *Batch) WriteTo(w io.Writer) (int64, error)

WriteTo implements io.WriterTo.

type BatchScanner

type BatchScanner struct {
	// contains filtered or unexported fields
}

BatchScanner can be used to scan through a reader, iterating over batches

func NewBatchScanner

func NewBatchScanner(conf *config.Config, r io.Reader) *BatchScanner

NewBatchScanner returns a new instance of *BatchScanner

func (*BatchScanner) Batch

func (s *BatchScanner) Batch() *Batch

Batch returns the current *Batch

func (*BatchScanner) Error

func (s *BatchScanner) Error() error

Error returns the current error

func (*BatchScanner) MaxSize added in v0.1.1

func (s *BatchScanner) MaxSize() int

func (*BatchScanner) Reset

func (s *BatchScanner) Reset(r io.Reader)

Reset sets *BatchScanner to its initial state

func (*BatchScanner) Scan

func (s *BatchScanner) Scan() bool

Scan iterates through the reader, stopping when a batch is read and populating the batch

func (*BatchScanner) Scanned

func (s *BatchScanner) Scanned() int

Scanned returns the number of bytes read

type ClientResponse

type ClientResponse struct {
	// contains filtered or unexported fields
}

ClientResponse is the response clients receive after making a request. There are a few possible responses: "OK\r\n"; "OK <offset> <batches>\r\n"; "BATCH <size> <checksum> <messages>\r\n<data>..."; "MOK <size>\r\n<body>\r\n"; "ERR <reason>\r\n"; "ERR\r\n".

func NewClientBatchResponse

func NewClientBatchResponse(conf *config.Config, off uint64, batches int) *ClientResponse

NewClientBatchResponse returns a successful batch *ClientResponse

func NewClientErrResponse

func NewClientErrResponse(conf *config.Config, err error) *ClientResponse

NewClientErrResponse returns an error response

func NewClientMultiResponse

func NewClientMultiResponse(conf *config.Config, p []byte) *ClientResponse

NewClientMultiResponse returns a successful MOK response

func NewClientOKResponse

func NewClientOKResponse(conf *config.Config) *ClientResponse

NewClientOKResponse returns a successful OK *ClientResponse

func NewClientResponse

func NewClientResponse() *ClientResponse

func NewClientResponseConfig

func NewClientResponseConfig(conf *config.Config) *ClientResponse

NewClientResponseConfig creates a new instance of *ClientResponse

func (*ClientResponse) Batches

func (cr *ClientResponse) Batches() int

Batches returns the number of batches in an OK response

func (*ClientResponse) Error

func (cr *ClientResponse) Error() error

func (*ClientResponse) MultiResp

func (cr *ClientResponse) MultiResp() []byte

MultiResp returns the response's MOK body

func (*ClientResponse) Offset

func (cr *ClientResponse) Offset() uint64

Offset returns the response offset. It will panic if the response type isn't for a batch.

func (*ClientResponse) Ok

func (cr *ClientResponse) Ok() bool

Ok returns true if the request has succeeded

func (*ClientResponse) ReadFrom

func (cr *ClientResponse) ReadFrom(r io.Reader) (int64, error)

ReadFrom implements io.ReaderFrom

func (*ClientResponse) Reset

func (cr *ClientResponse) Reset()

Reset sets ClientResponse to initial values

func (*ClientResponse) SetBatches

func (cr *ClientResponse) SetBatches(n int)

SetBatches sets the number of batches in an OK response

func (*ClientResponse) SetError

func (cr *ClientResponse) SetError(err error)

SetError sets the error on the response

func (*ClientResponse) SetMultiResp

func (cr *ClientResponse) SetMultiResp(p []byte)

SetMultiResp sets the MOK response body

func (*ClientResponse) SetOK

func (cr *ClientResponse) SetOK()

func (*ClientResponse) SetOffset

func (cr *ClientResponse) SetOffset(off uint64)

SetOffset sets the offset number for a batch response

func (*ClientResponse) String

func (cr *ClientResponse) String() string

func (*ClientResponse) WithConfig

func (cr *ClientResponse) WithConfig(conf *config.Config) *ClientResponse

func (*ClientResponse) WriteTo

func (cr *ClientResponse) WriteTo(w io.Writer) (int64, error)

WriteTo implements io.WriterTo

type CloseRequest

type CloseRequest struct {
	// contains filtered or unexported fields
}

CloseRequest is an incoming CLOSE command: CLOSE\r\n

func NewCloseRequest

func NewCloseRequest(conf *config.Config) *CloseRequest

NewCloseRequest returns a new instance of CloseRequest

func (*CloseRequest) FromRequest

func (r *CloseRequest) FromRequest(req *Request) (*CloseRequest, error)

FromRequest parses a request, populating the CloseRequest

func (*CloseRequest) Reset

func (r *CloseRequest) Reset()

Reset sets the CloseRequest to its initial values

func (*CloseRequest) WriteTo

func (r *CloseRequest) WriteTo(w io.Writer) (int64, error)

WriteTo implements io.WriterTo

type CmdType

type CmdType uint8

CmdType is the type for logd commands.

const (

	// CmdBatch is a batch command type.
	CmdBatch CmdType

	// CmdRead is the new read request type
	CmdRead

	// CmdTail is similar to READ, except it always starts from the
	// beginning of the log.
	CmdTail

	// CmdStats returns some internal stats.
	CmdStats

	// CmdClose is a close command type.
	CmdClose

	// CmdConfig is a CONFIG command type
	CmdConfig
)

func (*CmdType) Bytes

func (cmd *CmdType) Bytes() []byte

Bytes returns a byte representation of a command

func (*CmdType) String

func (cmd *CmdType) String() string

type ConfigRequest

type ConfigRequest struct {
	// contains filtered or unexported fields
}

ConfigRequest is an incoming CONFIG command: CONFIG\r\n

func NewConfigRequest

func NewConfigRequest(conf *config.Config) *ConfigRequest

NewConfigRequest returns a new instance of ConfigRequest

func (*ConfigRequest) FromRequest

func (r *ConfigRequest) FromRequest(req *Request) (*ConfigRequest, error)

FromRequest parses a request, populating the ConfigRequest

func (*ConfigRequest) Reset

func (r *ConfigRequest) Reset()

Reset sets the ConfigRequest to its initial values

func (*ConfigRequest) WriteTo

func (r *ConfigRequest) WriteTo(w io.Writer) (int64, error)

type ConfigResponse

type ConfigResponse struct {
	// contains filtered or unexported fields
}

ConfigResponse is a representation of the server-side config which is intended as a client multi ok response.

func NewConfigResponse

func NewConfigResponse(conf *config.Config) *ConfigResponse

func (*ConfigResponse) Config

func (cr *ConfigResponse) Config() *config.Config

Config returns the most recently read config.

func (*ConfigResponse) MultiResponse

func (cr *ConfigResponse) MultiResponse() []byte

MultiResponse returns a server-side MOK response body

func (*ConfigResponse) Parse

func (cr *ConfigResponse) Parse(b []byte) error

Parse reads and returns a config struct from a byte slice

func (*ConfigResponse) ReadFrom

func (cr *ConfigResponse) ReadFrom(r io.Reader) (int64, error)

ReadFrom implements io.ReaderFrom interface.

func (*ConfigResponse) Reset

func (cr *ConfigResponse) Reset()

Reset sets the ConfigResponse to its initial values

func (*ConfigResponse) WriteTo

func (cr *ConfigResponse) WriteTo(w io.Writer) (int64, error)

WriteTo implements io.WriterTo interface.

type Error

type Error string

Error is a client error type

func (Error) Error

func (pe Error) Error() string

type Message

type Message struct {
	Offset uint64 // firstOffset + offsetDelta
	Delta  uint64
	Body   []byte
	Size   int // size of the message, not including \r\n
	// contains filtered or unexported fields
}

Message is a new message type

func NewMessage

func NewMessage(maxSize int) *Message

NewMessage returns a Message. Wire format: MSG <size>\r\n<body>\r\n

func (*Message) BodyBytes

func (m *Message) BodyBytes() []byte

BodyBytes returns the bytes of the message body

func (*Message) Copy

func (m *Message) Copy() *Message

Copy returns a copy of the message. Convenient for clients.

func (*Message) ReadFrom

func (m *Message) ReadFrom(r io.Reader) (int64, error)

ReadFrom implements io.ReaderFrom

func (*Message) Reset

func (m *Message) Reset()

Reset sets the message to its initial value so it can be reused

func (*Message) SetBody

func (m *Message) SetBody(b []byte)

SetBody sets the body of a message

func (*Message) String

func (m *Message) String() string

func (*Message) WriteTo

func (m *Message) WriteTo(w io.Writer) (int64, error)

WriteTo implements io.WriterTo

type Read

type Read struct {
	Offset   uint64
	Messages int
	// contains filtered or unexported fields
}

Read represents a read request: READ <topic> <offset> <messages>\r\n

func NewRead

func NewRead(conf *config.Config) *Read

NewRead returns a new instance of a READ request

func (*Read) FromRequest

func (r *Read) FromRequest(req *Request) (*Read, error)

FromRequest parses a request, populating the Read struct. If validation fails, an error is returned

func (*Read) Reset

func (r *Read) Reset()

Reset puts READ in an initial state so it can be reused

func (*Read) SetTopic

func (r *Read) SetTopic(topic []byte)

SetTopic sets the topic for a READ request.

func (*Read) Topic

func (r *Read) Topic() string

Topic returns the topic for the READ request.

func (*Read) TopicSlice

func (r *Read) TopicSlice() []byte

TopicSlice returns the topic for the READ request as a byte slice. The byte slice is not copied.

func (*Read) Validate

func (r *Read) Validate() error

Validate checks the READ arguments are valid

func (*Read) WriteTo

func (r *Read) WriteTo(w io.Writer) (int64, error)

WriteTo implements io.WriterTo

type Request

type Request struct {
	Name CmdType

	Response *Response
	// contains filtered or unexported fields
}

Request represents a single request / response.

func NewRequest

func NewRequest() *Request

NewRequest returns a new, unconfigured instance of *Request

func NewRequestConfig

func NewRequestConfig(conf *config.Config) *Request

NewRequestConfig returns a new instance of *Request

func (*Request) Bytes

func (req *Request) Bytes() []byte

Bytes returns the raw byte representation of the request

func (*Request) FullSize

func (req *Request) FullSize() int

FullSize returns the total byte size of the request

func (*Request) ReadFrom

func (req *Request) ReadFrom(r io.Reader) (int64, error)

ReadFrom implements io.ReaderFrom

func (*Request) Reset

func (req *Request) Reset()

Reset sets the request to its initial values

func (*Request) Respond

func (req *Request) Respond(resp *Response)

Respond sends a Response over the channel back to the conn goroutine

func (*Request) Responded

func (req *Request) Responded() chan *Response

Responded returns a channel that a response will be passed to. The event handler uses this to pass messages to the conn goroutines.

func (*Request) String

func (req *Request) String() string

func (*Request) Topic

func (req *Request) Topic() string

Topic returns the topic for the request, if any

func (*Request) WithConfig

func (req *Request) WithConfig(conf *config.Config) *Request

WithConfig returns the request instance with the supplied config.

func (*Request) WriteResponse

func (req *Request) WriteResponse(resp *Response, cr *ClientResponse) (int64, error)

WriteResponse is used by the event loop to write a single response. Should be used for all commands except READ. It can be used for the READ response envelope.

type RespType

type RespType uint8

RespType is the response status return type

const (

	// RespOK indicates a successful client request.
	RespOK RespType

	// RespEOF indicates a client's read request has been closed by the
	// server.
	RespEOF

	// RespErr indicates a failed response.
	RespErr

	// RespErrClient indicates a failed response due to client error.
	RespErrClient
)

func (RespType) String

func (resp RespType) String() string

type Response

type Response struct {
	ClientResponse *ClientResponse
	// contains filtered or unexported fields
}

Response is a response the conn can use to send bytes back to the client. It can return bytes as well as *os.File-s.

func NewResponse

func NewResponse() *Response

NewResponse returns a new response.

func NewResponseConfig

func NewResponseConfig(conf *config.Config) *Response

NewResponseConfig returns a new response with configuration.

func NewResponseErr

func NewResponseErr(conf *config.Config, req *Request, err error) (*Response, error)

NewResponseErr returns a new instance of Response and writes it to the request

func (*Response) AddReader

func (r *Response) AddReader(rdr io.ReadCloser) error

AddReader adds a reader for the server to send back over the conn

func (*Response) NumReaders

func (r *Response) NumReaders() int

NumReaders returns the number of io.Readers available

func (*Response) Reset

func (r *Response) Reset()

Reset sets the response to its initial values

func (*Response) ScanReader

func (r *Response) ScanReader() (io.ReadCloser, error)

ScanReader returns the next reader, or io.EOF if they've all been scanned

func (*Response) WithConfig

func (r *Response) WithConfig(conf *config.Config) *Response

type StatsRequest

type StatsRequest struct {
	// contains filtered or unexported fields
}

StatsRequest is an incoming STATS command: STATS\r\n

func NewStatsRequest

func NewStatsRequest(conf *config.Config) *StatsRequest

NewStatsRequest returns a new instance of StatsRequest

func (*StatsRequest) FromRequest

func (r *StatsRequest) FromRequest(req *Request) (*StatsRequest, error)

FromRequest parses a request, populating the StatsRequest

func (*StatsRequest) Reset

func (r *StatsRequest) Reset()

Reset sets the StatsRequest to its initial values

type Tail

type Tail struct {
	Messages int
	// contains filtered or unexported fields
}

Tail represents a TAIL request: TAIL <topic> <messages>\r\n

func NewTail

func NewTail(conf *config.Config) *Tail

NewTail returns a new instance of a TAIL request

func (*Tail) FromRequest

func (t *Tail) FromRequest(req *Request) (*Tail, error)

FromRequest parses a request, populating the Tail struct. If validation fails, an error is returned.

func (*Tail) Reset

func (t *Tail) Reset()

Reset puts TAIL in an initial state so it can be reused

func (*Tail) SetTopic

func (t *Tail) SetTopic(topic []byte)

SetTopic sets the topic of the TAIL request

func (*Tail) Topic

func (t *Tail) Topic() string

Topic returns the topic as a string

func (*Tail) TopicSlice

func (t *Tail) TopicSlice() []byte

TopicSlice returns the topic as a byte slice reference. It is not copied.

func (*Tail) Validate

func (t *Tail) Validate() error

Validate checks the TAIL arguments are valid

func (*Tail) WriteTo

func (t *Tail) WriteTo(w io.Writer) (int64, error)

WriteTo implements io.WriterTo

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL