Documentation ¶
Index ¶
- Constants
- Variables
- func BatchEnvelopeSize(topic string, body []byte, messages int) int
- func MessageSize(bodySize int) int
- type Batch
- func (b *Batch) Append(p []byte) error
- func (b *Batch) AppendMessage(m *Message) error
- func (b *Batch) Bytes() []byte
- func (b *Batch) CalcSize() int
- func (b *Batch) Copy() *Batch
- func (b *Batch) Empty() bool
- func (b *Batch) FirstOffset() uint64
- func (b *Batch) FromRequest(req *Request) (*Batch, error)
- func (b *Batch) FullSize() (int, bool)
- func (b *Batch) MessageBytes() []byte
- func (b *Batch) ReadFrom(r io.Reader) (int64, error)
- func (b *Batch) Reset()
- func (b *Batch) SetChecksum()
- func (b *Batch) SetTopic(topic []byte)
- func (b *Batch) String() string
- func (b *Batch) Topic() string
- func (b *Batch) TopicSlice() []byte
- func (b *Batch) Validate() error
- func (b *Batch) WriteTo(w io.Writer) (int64, error)
- type BatchScanner
- type ClientResponse
- func NewClientBatchResponse(conf *config.Config, off uint64, batches int) *ClientResponse
- func NewClientErrResponse(conf *config.Config, err error) *ClientResponse
- func NewClientMultiResponse(conf *config.Config, p []byte) *ClientResponse
- func NewClientOKResponse(conf *config.Config) *ClientResponse
- func NewClientResponse() *ClientResponse
- func NewClientResponseConfig(conf *config.Config) *ClientResponse
- func (cr *ClientResponse) Batches() int
- func (cr *ClientResponse) Error() error
- func (cr *ClientResponse) MultiResp() []byte
- func (cr *ClientResponse) Offset() uint64
- func (cr *ClientResponse) Ok() bool
- func (cr *ClientResponse) ReadFrom(r io.Reader) (int64, error)
- func (cr *ClientResponse) Reset()
- func (cr *ClientResponse) SetBatches(n int)
- func (cr *ClientResponse) SetError(err error)
- func (cr *ClientResponse) SetMultiResp(p []byte)
- func (cr *ClientResponse) SetOK()
- func (cr *ClientResponse) SetOffset(off uint64)
- func (cr *ClientResponse) String() string
- func (cr *ClientResponse) WithConfig(conf *config.Config) *ClientResponse
- func (cr *ClientResponse) WriteTo(w io.Writer) (int64, error)
- type CloseRequest
- type CmdType
- type ConfigRequest
- type ConfigResponse
- func (cr *ConfigResponse) Config() *config.Config
- func (cr *ConfigResponse) MultiResponse() []byte
- func (cr *ConfigResponse) Parse(b []byte) error
- func (cr *ConfigResponse) ReadFrom(r io.Reader) (int64, error)
- func (cr *ConfigResponse) Reset()
- func (cr *ConfigResponse) WriteTo(w io.Writer) (int64, error)
- type Error
- type Message
- type Read
- type Request
- func (req *Request) Bytes() []byte
- func (req *Request) FullSize() int
- func (req *Request) ReadFrom(r io.Reader) (int64, error)
- func (req *Request) Reset()
- func (req *Request) Respond(resp *Response)
- func (req *Request) Responded() chan *Response
- func (req *Request) String() string
- func (req *Request) Topic() string
- func (req *Request) WithConfig(conf *config.Config) *Request
- func (req *Request) WriteResponse(resp *Response, cr *ClientResponse) (int64, error)
- type RespType
- type Response
- type StatsRequest
- type Tail
Constants ¶
const MaxTopicSize = 255
MaxTopicSize represents the maximum topic length. It is 255, as that is the maximum directory length in linux.
Variables ¶
var ( // ErrNotFound is returned when the log offset is above the logs head, has been // deleted, or does not point to a message batch. ErrNotFound = errors.New("offset not found") // ErrInternal is a server side error. This is an "ERR" response from the // server. ErrInternal = errors.New("internal server error") // ErrInvalid refers to an invalid request. ErrInvalid = errors.New("invalid request") // ErrMaxTopics means the maximum number of topics has already been // created. ErrMaxTopics = errors.New("maximum topics allowed") // ErrTopicNotAllowed means the maximum number of topics has already been // created. ErrTopicNotAllowed = errors.New("topic not allowed") // ErrInvalidOffset is returned when a read is attempted from a batch // offset that doesn't point to the beginning of a batch protocol message. ErrInvalidOffset = errors.New("invalid offset") // ErrRespInvalid are the error response bytes sent for invalid requests ErrRespInvalid = []byte("invalid request") // ErrRespEmptyMessage indicates a write was attempted that included no data ErrRespEmptyMessage = []byte("empty message not allowed") // ErrRespNoArguments indicates no arguments were supplied ErrRespNoArguments = []byte("must supply an argument") // ErrRespNotFound indicates the messages could not be found ErrRespNotFound = []byte("not found") // ErrRespServer indicates an internal server error ErrRespServer = []byte("internal error") // ErrRespTooLarge indicates a protocol error ErrRespTooLarge = []byte("too large") // ErrRespMaxTopics is the serialized form of ErrMaxTopics. ErrRespMaxTopics = []byte("maximum topics allowed") // ErrRespTopicNotAllowed is the serialized form of ErrTopicNotAllowed ErrRespTopicNotAllowed = []byte("topic not allowed") )
Functions ¶
func BatchEnvelopeSize ¶ added in v0.1.1
BatchEnvelopeSize returns the size of a message body including its protocol envelope.
func MessageSize ¶
MessageSize returns the size of the message, including protocol
Types ¶
type Batch ¶
type Batch struct { Size int Checksum uint32 Messages int // contains filtered or unexported fields }
Batch represents a collection of Messages BATCH <size> <topic> <checksum> <messages>\r\n<data> NOTE no trailing newline after the data
func (*Batch) AppendMessage ¶
AppendMessage adds a new message to the batch
func (*Batch) Bytes ¶
Bytes returns a slice of raw bytes. Used by EventQ to write directly to the log.
func (*Batch) CalcSize ¶
CalcSize calculates the full byte size of the batch. this is intended to be called to make sure the batch isn't larger than config.MaxBatchSize, so we're assuming the crc is the longest possible uint32 for now to save calculating the crc for every message
func (*Batch) FirstOffset ¶
FirstOffset returns the offset delta of the first message
func (*Batch) FromRequest ¶
FromRequest parses a request, populating the batch. If validation fails, an error is returned.
func (*Batch) FullSize ¶
FullSize returns the full size of the batch if it was previously read. The second return value indicates whether the batch was read or not.
func (*Batch) MessageBytes ¶
MessageBytes returns a byte slice of the batch of messages.
func (*Batch) Reset ¶
func (b *Batch) Reset()
Reset puts a batch in an initial state so it can be reused
func (*Batch) TopicSlice ¶
TopicSlice returns the topic for the batch as a byte slice. The byte slice is not copied.
type BatchScanner ¶
type BatchScanner struct {
// contains filtered or unexported fields
}
BatchScanner can be used to scan through a reader, iterating over batches
func NewBatchScanner ¶
func NewBatchScanner(conf *config.Config, r io.Reader) *BatchScanner
NewBatchScanner returns a new instance of *BatchScanner
func (*BatchScanner) MaxSize ¶ added in v0.1.1
func (s *BatchScanner) MaxSize() int
func (*BatchScanner) Reset ¶
func (s *BatchScanner) Reset(r io.Reader)
Reset sets *BatchScanner to its initial state
func (*BatchScanner) Scan ¶
func (s *BatchScanner) Scan() bool
Scan iterates through the reader, stopping when a batch is read and populating the batch
func (*BatchScanner) Scanned ¶
func (s *BatchScanner) Scanned() int
Scanned returns the number of bytes read
type ClientResponse ¶
type ClientResponse struct {
// contains filtered or unexported fields
}
ClientResponse is the response clients receive after making a request. There are a few possible responses: OK\r\n OK <offset> <batches>\r\n BATCH <size> <checksum> <messages>\r\n<data>... MOK <size>\r\n<body>\r\n ERR <reason>\r\n ERR\r\n
func NewClientBatchResponse ¶
func NewClientBatchResponse(conf *config.Config, off uint64, batches int) *ClientResponse
NewClientBatchResponse returns a successful batch *ClientResponse
func NewClientErrResponse ¶
func NewClientErrResponse(conf *config.Config, err error) *ClientResponse
NewClientErrResponse returns an error response
func NewClientMultiResponse ¶
func NewClientMultiResponse(conf *config.Config, p []byte) *ClientResponse
NewClientMultiResponse returns a successful MOK response
func NewClientOKResponse ¶
func NewClientOKResponse(conf *config.Config) *ClientResponse
NewClientOKResponse returns a successful batch *ClientResponse
func NewClientResponse ¶
func NewClientResponse() *ClientResponse
func NewClientResponseConfig ¶
func NewClientResponseConfig(conf *config.Config) *ClientResponse
NewClientResponseConfig creates a new instance of *ClientResponse
func (*ClientResponse) Batches ¶
func (cr *ClientResponse) Batches() int
Batches returns the number of batches in an OK response
func (*ClientResponse) Error ¶
func (cr *ClientResponse) Error() error
func (*ClientResponse) MultiResp ¶
func (cr *ClientResponse) MultiResp() []byte
MultiResp returns the responses MOK response body
func (*ClientResponse) Offset ¶
func (cr *ClientResponse) Offset() uint64
Offset returns the response offset. It will panic if the response type isn't for a batch.
func (*ClientResponse) Ok ¶
func (cr *ClientResponse) Ok() bool
Ok returns true if the request has succeeded
func (*ClientResponse) ReadFrom ¶
func (cr *ClientResponse) ReadFrom(r io.Reader) (int64, error)
ReadFrom implements io.ReaderFrom
func (*ClientResponse) Reset ¶
func (cr *ClientResponse) Reset()
Reset sets ClientResponse to initial values
func (*ClientResponse) SetBatches ¶
func (cr *ClientResponse) SetBatches(n int)
SetBatches sets the number of batches in an OK response
func (*ClientResponse) SetError ¶
func (cr *ClientResponse) SetError(err error)
SetError sets the error on the response
func (*ClientResponse) SetMultiResp ¶
func (cr *ClientResponse) SetMultiResp(p []byte)
SetMultiResp sets the MOK response body
func (*ClientResponse) SetOK ¶
func (cr *ClientResponse) SetOK()
func (*ClientResponse) SetOffset ¶
func (cr *ClientResponse) SetOffset(off uint64)
SetOffset sets the offset number for a batch response
func (*ClientResponse) String ¶
func (cr *ClientResponse) String() string
func (*ClientResponse) WithConfig ¶
func (cr *ClientResponse) WithConfig(conf *config.Config) *ClientResponse
type CloseRequest ¶
type CloseRequest struct {
// contains filtered or unexported fields
}
CloseRequest is an incoming STATS command CLOSE\r\n
func NewCloseRequest ¶
func NewCloseRequest(conf *config.Config) *CloseRequest
NewCloseRequest returns a new instance of CloseRequest
func (*CloseRequest) FromRequest ¶
func (r *CloseRequest) FromRequest(req *Request) (*CloseRequest, error)
FromRequest parses a request, populating the ReadRequest
func (*CloseRequest) Reset ¶
func (r *CloseRequest) Reset()
Reset sets the CloseRequest to its initial values
type CmdType ¶
type CmdType uint8
CmdType is the type for logd commands.
const ( // CmdBatch is a batch command type. CmdBatch CmdType // CmdRead is the new read request type CmdRead // CmdTail is similar to READ, except it always starts from the // beginning of the log. CmdTail // CmdStats returns some internal stats. CmdStats // CmdClose is a close command type. CmdClose // CmdConfig is a CONFIG command type CmdConfig )
type ConfigRequest ¶
type ConfigRequest struct {
// contains filtered or unexported fields
}
ConfigRequest is an incoming CONFIG command CONFIG\r\n
func NewConfigRequest ¶
func NewConfigRequest(conf *config.Config) *ConfigRequest
NewConfigRequest returns a new instance of ConfigRequest
func (*ConfigRequest) FromRequest ¶
func (r *ConfigRequest) FromRequest(req *Request) (*ConfigRequest, error)
FromRequest parses a request, populating the ReadRequest
func (*ConfigRequest) Reset ¶
func (r *ConfigRequest) Reset()
Reset sets the ConfigRequest to its initial values
type ConfigResponse ¶
type ConfigResponse struct {
// contains filtered or unexported fields
}
ConfigResponse is a representation of the server-side config which is intended as a client multi ok response.
func NewConfigResponse ¶
func NewConfigResponse(conf *config.Config) *ConfigResponse
func (*ConfigResponse) Config ¶
func (cr *ConfigResponse) Config() *config.Config
Config returns the most recently read config.
func (*ConfigResponse) MultiResponse ¶
func (cr *ConfigResponse) MultiResponse() []byte
MultiResponse returns a server-side MOK response body
func (*ConfigResponse) Parse ¶
func (cr *ConfigResponse) Parse(b []byte) error
Parse reads and returns a config struct from a byte slice
func (*ConfigResponse) ReadFrom ¶
func (cr *ConfigResponse) ReadFrom(r io.Reader) (int64, error)
ReadFrom implements io.ReaderFrom interface.
func (*ConfigResponse) Reset ¶
func (cr *ConfigResponse) Reset()
Reset sets the ConfigResponse to its initial values
type Message ¶
type Message struct { Offset uint64 // firstOffset + offsetDelta Delta uint64 Body []byte Size int // size of the message, not including \r\n // contains filtered or unexported fields }
Message is a new message type
func NewMessage ¶
NewMessage returns a Message MSG <size>\r\n<body>\r\n
type Read ¶
Read represents a read request READ <topic> <offset> <messages>\r\n
func (*Read) FromRequest ¶
FromRequest parses a request, populating the Read struct. If validation fails, an error is returned
func (*Read) TopicSlice ¶
TopicSlice returns the topic for the batch as a byte slice. The byte slice is not copied.
type Request ¶
Request represents a single request / response.
func NewRequest ¶
func NewRequest() *Request
NewRequest returns a new, unconfigured instance of *Request
func NewRequestConfig ¶
NewRequestConfig returns a new instance of *Request
func (*Request) Responded ¶
Responded returns a channel that a response will be passed to. the event handler uses this to pass messages to the conn goroutines.
func (*Request) WithConfig ¶
WithConfig returns the request instance with the supplied config.
func (*Request) WriteResponse ¶
func (req *Request) WriteResponse(resp *Response, cr *ClientResponse) (int64, error)
WriteResponse is used by the event loop to write a single response. Should be used for all commands except READ. It can be used for the READ response envelope.
type RespType ¶
type RespType uint8
RespType is the response status return type
const ( // RespOK indicates a successful client request. RespOK RespType // RespEOF indicates a client's read request has been closed by the // server. RespEOF // RespErr indicates a failed response. RespErr // RespErrClient indicates a failed response due to client error. RespErrClient )
type Response ¶
type Response struct { ClientResponse *ClientResponse // contains filtered or unexported fields }
Response is a response the conn can use to send bytes back to the client. can returns bytes as well as *os.File-s
func NewResponseConfig ¶
NewResponseConfig returns a new response with configuration.
func NewResponseErr ¶
NewResponseErr returns a new instance of Response and writes it to the request
func (*Response) AddReader ¶
func (r *Response) AddReader(rdr io.ReadCloser) error
AddReader adds a reader for the server to send back over the conn
func (*Response) NumReaders ¶
NumReaders returns the number of io.Readers available
func (*Response) ScanReader ¶
func (r *Response) ScanReader() (io.ReadCloser, error)
ScanReader returns the next reader, or io.EOF if they've all been scanned
type StatsRequest ¶
type StatsRequest struct {
// contains filtered or unexported fields
}
StatsRequest is an incoming STATS command STATS\r\n
func NewStatsRequest ¶
func NewStatsRequest(conf *config.Config) *StatsRequest
NewStatsRequest returns a new instance of StatsRequest
func (*StatsRequest) FromRequest ¶
func (r *StatsRequest) FromRequest(req *Request) (*StatsRequest, error)
FromRequest parses a request, populating the ReadRequest
func (*StatsRequest) Reset ¶
func (r *StatsRequest) Reset()
Reset sets the StatsRequest to its initial values
type Tail ¶
type Tail struct { Messages int // contains filtered or unexported fields }
Tail represents a TAIL request TAIL <topic> <messages>\r\n
func (*Tail) FromRequest ¶
FromRequest parses a request, populating the Tail struct. If validation fails, an error is returned.
func (*Tail) TopicSlice ¶
TopicSlice returns the topic as a byte slice reference. It is not copied.