netlog: github.com/ninibe/netlog

package netlog

import "github.com/ninibe/netlog"

Index

Package Files

errors.go integrity_checker.go message.go message_buffer.go netlog.go segment_monitor.go streamer_atomicmap.go topic.go topic_atomicmap.go topicscanner.go topicscanner_atomicmap.go util.go

Variables

var (
    // ErrUnknown is returned when an underlying standard Go error reaches the user.
    ErrUnknown = newErr(http.StatusInternalServerError, "netlog: unkwown error")
    // ErrInvalidDir is returned when the data folder provided does not exist or is not writable.
    ErrInvalidDir = newErr(http.StatusInternalServerError, "netlog: invalid data directory")

    // ErrBadRequest is returned when invalid parameters are received.
    ErrBadRequest = newErr(http.StatusBadRequest, "netlog: bad request")
    // ErrInvalidOffset is returned when the requested offset cannot be parsed into a number.
    ErrInvalidOffset = newErr(http.StatusBadRequest, "netlog: invalid offset")
    // ErrInvalidDuration is returned when a given big duration cannot be parsed.
    ErrInvalidDuration = newErr(http.StatusBadRequest, "netlog: invalid duration")
    // ErrInvalidCompression is returned when the compression type defined is unknown.
    ErrInvalidCompression = newErr(http.StatusBadRequest, "netlog: invalid compression type")
    // ErrTopicExists is returned when trying to create an already existing topic.
    ErrTopicExists = newErr(http.StatusBadRequest, "netlog: topic exists")
    // ErrEndOfTopic is returned when the reader has read all the way until the end of the topic.
    ErrEndOfTopic = newErr(http.StatusNotFound, "netlog: end of topic")
    // ErrTopicNotFound is returned when addressing a non-existing topic.
    ErrTopicNotFound = newErr(http.StatusNotFound, "netlog: topic not found")

    // ErrScannerNotFound is returned when using a non-existing scanner ID.
    ErrScannerNotFound = newErr(http.StatusNotFound, "netlog: scanner not found")
    // ErrOffsetNotFound is returned when the offset is no longer or not yet present in the topic.
    ErrOffsetNotFound = newErr(http.StatusNotFound, "netlog: offset not found")

    // ErrCRC is returned when a message's payload does not match the CRC header.
    ErrCRC = newErr(http.StatusInternalServerError, "netlog: checksum error")
    // ErrBusy is returned when trying to close or delete a topic with readers attached to it.
    ErrBusy = newErr(http.StatusConflict, "netlog: resource busy")
)

func NopWCloser

func NopWCloser(w io.Writer) io.WriteCloser

NopWCloser returns a WriteCloser with a no-op Close method wrapping the provided Writer w.

type BLTopicScanner

type BLTopicScanner struct {
    // contains filtered or unexported fields
}

BLTopicScanner implements TopicScanner reading from BigLog.

func (*BLTopicScanner) Close

func (ts *BLTopicScanner) Close() error

Close implements io.Closer and releases the TopicScanner resources.

func (*BLTopicScanner) ID

func (ts *BLTopicScanner) ID() string

ID returns the ID of the scanner

func (*BLTopicScanner) Info

func (ts *BLTopicScanner) Info() TScannerInfo

Info returns a TScannerInfo struct with the scanner's next offset and the initial offset.

func (*BLTopicScanner) Scan

func (ts *BLTopicScanner) Scan(ctx context.Context) (m Message, offset int64, err error)

Scan advances the Scanner to the next message, returning the message and the offset. Scan blocks when it reaches EOF until more data is available. The user must provide a context to cancel the request when it needs to stop waiting.

type CompressionType

type CompressionType uint8

CompressionType indicates a type of compression for message sets

const (
    // CompressionDefault is used when falling back to the default compression of the system.
    CompressionDefault CompressionType = 0
    // CompressionNone is used by message sets with uncompressed payloads
    CompressionNone CompressionType = 1
    // CompressionGzip is used by message sets with gzipped payloads
    CompressionGzip CompressionType = 2
    // CompressionSnappy is used by message sets with snappy payloads
    CompressionSnappy CompressionType = 3
)

type IntegrityChecker

type IntegrityChecker struct {
    // contains filtered or unexported fields
}

IntegrityChecker is used to check the integrity of an entire topic.

func NewIntegrityChecker

func NewIntegrityChecker(t *Topic, from int64) (*IntegrityChecker, error)

NewIntegrityChecker creates a new integrity checker for a given topic.

func (*IntegrityChecker) Check

func (ic *IntegrityChecker) Check(ctx context.Context) (errors []*IntegrityError)

Check reads all data, collecting the errors it finds, and then returns them. It is recommended to pass a cancellable context since this operation can be slow.

func (*IntegrityChecker) Close

func (ic *IntegrityChecker) Close() error

Close releases the underlying resources.

type IntegrityError

type IntegrityError struct {
    Offset   int64              `json:"offset"`
    ODelta   int                `json:"odelta"`
    Type     IntegrityErrorType `json:"type"`
    Expected string             `json:"expected"`
    Actual   string             `json:"actual"`
}

IntegrityError is the struct with metadata about any integrity error found.

func CheckMessageIntegrity

func CheckMessageIntegrity(m Message, delta int) *IntegrityError

CheckMessageIntegrity checks the integrity of a single message

type IntegrityErrorType

type IntegrityErrorType string

IntegrityErrorType is the category of possible errors in the data.

const (

    // IntegrityChecksumErr is returned when the checksum in the message
    // header doesn't match the checksum recalculated from the payload.
    IntegrityChecksumErr IntegrityErrorType = "checksum"

    // IntegrityLengthErr is returned when the length in the message
    // header doesn't match the length of the payload.
    IntegrityLengthErr IntegrityErrorType = "length"

    // IntegrityUnknownErr is returned when data can not be read because
    // of an underlying error reading the data.
    IntegrityUnknownErr IntegrityErrorType = "unknown"
)
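
To make the three categories concrete, here is a sketch of a checker over a toy message format. The 8-byte header (4-byte big-endian length followed by a 4-byte IEEE CRC32) is an assumption for illustration and is not claimed to be netlog's on-disk layout; `frame` and `check` are hypothetical helpers.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

const headerSize = 8 // hypothetical: 4-byte length + 4-byte CRC32

// frame builds a toy message: length, checksum, then payload.
func frame(payload []byte) []byte {
	buf := make([]byte, headerSize+len(payload))
	binary.BigEndian.PutUint32(buf[0:4], uint32(len(payload)))
	binary.BigEndian.PutUint32(buf[4:8], crc32.ChecksumIEEE(payload))
	copy(buf[headerSize:], payload)
	return buf
}

// check returns "" for a healthy frame, or the category of the first
// problem found, reusing the labels of the constants above. Here
// "unknown" stands in for any frame too short to carry a header.
func check(msg []byte) string {
	if len(msg) < headerSize {
		return "unknown"
	}
	payload := msg[headerSize:]
	if binary.BigEndian.Uint32(msg[0:4]) != uint32(len(payload)) {
		return "length"
	}
	if binary.BigEndian.Uint32(msg[4:8]) != crc32.ChecksumIEEE(payload) {
		return "checksum"
	}
	return ""
}

func main() {
	good := frame([]byte("hello"))
	fmt.Printf("%q\n", check(good))

	flipped := append([]byte(nil), good...)
	flipped[len(flipped)-1] ^= 0xFF // corrupt the last payload byte
	fmt.Printf("%q\n", check(flipped))

	truncated := good[:len(good)-1] // drop one payload byte
	fmt.Printf("%q\n", check(truncated))
}
```

The length is checked before the checksum because a truncated payload would also fail the CRC comparison, and the more specific diagnosis is the more useful one.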

type Message

type Message []byte

Message is the unit of data storage.

func MessageFromPayload

func MessageFromPayload(p []byte) Message

MessageFromPayload returns a message with the appropriate calculated headers from a given data payload.

func MessageSet

func MessageSet(msgs []Message, comp CompressionType) Message

MessageSet returns a new message with a batch of compressed messages as payload. The payload is compressed and the compression header is set accordingly. Please be aware that compression at this level is only meant for batching several messages into a single message-set to increase throughput. MessageSet will panic if a compression type is not provided, since nothing would indicate to streaming clients that further messages are embedded in the payload.

func ReadMessage

func ReadMessage(r io.Reader) (entry Message, err error)

ReadMessage reads a message from r and returns it. If the message is compressed, it does not attempt to unpack the contents.

func Unpack

func Unpack(set Message) ([]Message, error)

Unpack takes a message-set and returns a slice with the component messages.

func (*Message) Bytes

func (m *Message) Bytes() []byte

Bytes returns the entire message cast back to bytes.

func (*Message) CRC32

func (m *Message) CRC32() uint32

CRC32 returns the checksum of the payload.

func (*Message) ChecksumOK

func (m *Message) ChecksumOK() bool

ChecksumOK recalculates the CRC from the payload and compares it with the one stored in the header

func (*Message) CompVer

func (m *Message) CompVer() uint8

CompVer returns the first byte which reflects both compression and format version.

func (*Message) Compression

func (m *Message) Compression() CompressionType

Compression returns the compression encoded in bits 4 to 7 of the header.

func (*Message) PLength

func (m *Message) PLength() uint32

PLength returns the length (bytes) of the payload.

func (*Message) Payload

func (m *Message) Payload() []byte

Payload returns the data bytes.

func (*Message) Size

func (m *Message) Size() int

Size returns the total size in bytes of the message.

func (*Message) Version

func (m *Message) Version() uint8

Version returns the format version encoded in bits 0 to 3 of the header.
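
CompVer, Compression and Version describe two small values sharing one byte. The sketch below shows one way to pack and unpack such a byte, assuming the version sits in the low nibble and the compression type in the high nibble. That bit order is an assumption, and the helper names are hypothetical.

```go
package main

import "fmt"

// compVer packs a compression type into the high nibble and a format
// version into the low nibble of a single byte (assumed layout).
func compVer(compression, version uint8) uint8 {
	return compression<<4 | version&0x0F
}

// version extracts the low four bits.
func version(cv uint8) uint8 { return cv & 0x0F }

// compression extracts the high four bits.
func compression(cv uint8) uint8 { return cv >> 4 }

func main() {
	cv := compVer(2, 1) // e.g. compression type 2 with format version 1
	fmt.Printf("%08b version=%d compression=%d\n", cv, version(cv), compression(cv))
}
```

Sharing a byte this way leaves room for 16 versions and 16 compression types while keeping the header one byte smaller.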

type NLError

type NLError interface {
    Error() string
    String() string
    StatusCode() int
}

NLError is a known NetLog error with an associated status code.

func ExtErr

func ExtErr(err error) NLError

ExtErr maps external errors, mostly BigLog errors, to NetLog errors.

type NetLog

type NetLog struct {
    // contains filtered or unexported fields
}

NetLog is the main struct that serves a set of topics; it usually must be wrapped with an HTTP transport.

func NewNetLog

func NewNetLog(dataDir string, opts ...Option) (nl *NetLog, err error)

NewNetLog creates a new NetLog in a given data folder that must exist and be writable.

func (*NetLog) CreateTopic

func (nl *NetLog) CreateTopic(name string, settings TopicSettings) (t *Topic, err error)

CreateTopic creates a new topic with a given name and default settings.

func (*NetLog) DeleteTopic

func (nl *NetLog) DeleteTopic(name string, force bool) (err error)

DeleteTopic deletes an existing topic by name.

func (*NetLog) Topic

func (nl *NetLog) Topic(name string) (*Topic, error)

Topic returns an existing topic by name.

func (*NetLog) TopicList

func (nl *NetLog) TopicList() []string

TopicList returns the list of existing topic names.

type Option

type Option func(*NetLog)

Option is the type of function used to set internal parameters.

func DefaultTopicSettings

func DefaultTopicSettings(settings TopicSettings) Option

DefaultTopicSettings sets the default topic settings used if no other is defined at creation time.

func MonitorInterval

func MonitorInterval(interval bigduration.BigDuration) Option

MonitorInterval defines the interval at which the segment monitor in charge of splitting and discarding segments runs.

type PersistentTopicScanner

type PersistentTopicScanner struct {
    // contains filtered or unexported fields
}

PersistentTopicScanner synchronizes the underlying scanner state to a given writer

func (*PersistentTopicScanner) Close

func (p *PersistentTopicScanner) Close() error

Close deletes the offset tracking file, closes the offset channel and closes the underlying scanner

func (*PersistentTopicScanner) ID

func (p *PersistentTopicScanner) ID() string

ID returns the ID of the scanner.

func (*PersistentTopicScanner) Info

func (p *PersistentTopicScanner) Info() TScannerInfo

Info returns a TScannerInfo struct with the scanner's next offset and the last scanned one

func (*PersistentTopicScanner) Scan

func (p *PersistentTopicScanner) Scan(ctx context.Context) (m Message, offset int64, err error)

Scan delegates the actual scan to the underlying scanner while updating the last read offset.

type SegmentMonitor

type SegmentMonitor struct {
    // contains filtered or unexported fields
}

SegmentMonitor periodically checks for segments to split or discard at a given interval.

type StreamerAtomicMap

type StreamerAtomicMap struct {
    // contains filtered or unexported fields
}

StreamerAtomicMap is a copy-on-write thread-safe map of pointers to Streamer

func NewStreamerAtomicMap

func NewStreamerAtomicMap() *StreamerAtomicMap

NewStreamerAtomicMap returns a new initialized StreamerAtomicMap

func (*StreamerAtomicMap) Delete

func (am *StreamerAtomicMap) Delete(key string)

Delete removes the pointer to Streamer under key from the map

func (*StreamerAtomicMap) Get

func (am *StreamerAtomicMap) Get(key string) (value *biglog.Streamer, ok bool)

Get returns a pointer to Streamer for a given key

func (*StreamerAtomicMap) GetAll

func (am *StreamerAtomicMap) GetAll() map[string]*biglog.Streamer

GetAll returns the underlying map of pointers to Streamer. This map must NOT be modified; to change the map safely, use the Set and Delete functions and Get the value again.

func (*StreamerAtomicMap) Len

func (am *StreamerAtomicMap) Len() int

Len returns the number of elements in the map

func (*StreamerAtomicMap) Set

func (am *StreamerAtomicMap) Set(key string, value *biglog.Streamer)

Set inserts in the map a pointer to Streamer under a given key
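
The copy-on-write approach shared by the atomic map types on this page can be sketched with sync/atomic and a writer mutex. The `cowMap` type below stores plain ints and is a hypothetical illustration of the technique, not the generated netlog code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// cowMap is a hypothetical copy-on-write map from string to int.
type cowMap struct {
	mu  sync.Mutex   // serializes writers
	val atomic.Value // always holds a map[string]int
}

func newCowMap() *cowMap {
	m := &cowMap{}
	m.val.Store(make(map[string]int))
	return m
}

// Get reads from the current snapshot without locking.
func (m *cowMap) Get(key string) (int, bool) {
	v, ok := m.val.Load().(map[string]int)[key]
	return v, ok
}

// GetAll returns the current snapshot; callers must treat it as read-only.
func (m *cowMap) GetAll() map[string]int {
	return m.val.Load().(map[string]int)
}

// Set copies the snapshot, adds the entry, and publishes the copy.
func (m *cowMap) Set(key string, value int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	old := m.val.Load().(map[string]int)
	next := make(map[string]int, len(old)+1)
	for k, v := range old {
		next[k] = v
	}
	next[key] = value
	m.val.Store(next)
}

// Delete copies the snapshot without the key and publishes the copy.
func (m *cowMap) Delete(key string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	old := m.val.Load().(map[string]int)
	next := make(map[string]int, len(old))
	for k, v := range old {
		if k != key {
			next[k] = v
		}
	}
	m.val.Store(next)
}

func main() {
	m := newCowMap()
	m.Set("a", 1)
	snapshot := m.GetAll() // taken before the next write
	m.Set("b", 2)
	fmt.Println(len(snapshot), len(m.GetAll()))
}
```

Readers never block, at the price of an O(n) copy on every write, so the design suits maps that are read far more often than they change. It also explains why the map returned by GetAll must not be modified: other readers may be holding the same snapshot.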

type TScannerInfo

type TScannerInfo struct {
    ID      string `json:"id"`
    Next    int64  `json:"next"`
    From    int64  `json:"from"`
    Persist bool   `json:"persistent"`
}

TScannerInfo holds the scanner's offset information

type Topic

type Topic struct {
    // contains filtered or unexported fields
}

Topic is a log of linear messages.

func (*Topic) CheckIntegrity

func (t *Topic) CheckIntegrity(ctx context.Context, from int64) ([]*IntegrityError, error)

CheckIntegrity scans the topic and checks for inconsistencies in the data

func (*Topic) CheckSegments

func (t *Topic) CheckSegments() error

CheckSegments is called by the runner and discards or splits segments when conditions are met.

func (*Topic) DeleteScanner

func (t *Topic) DeleteScanner(ID string) (err error)

DeleteScanner removes the scanner from the topic

func (*Topic) DirPath

func (t *Topic) DirPath() string

DirPath returns the absolute path to the folder with the topic's files

func (*Topic) FlushBuffered

func (t *Topic) FlushBuffered() error

FlushBuffered flushes all buffered messages into the BigLog. Notice that the BigLog might have a buffer on its own that this function does not flush, so calling this does not mean the data has been stored on disk.

func (*Topic) Info

func (t *Topic) Info() (i *TopicInfo, err error)

Info provides all public topic information.

func (*Topic) Name

func (t *Topic) Name() string

Name returns the Topic's name, which maps to the folder name

func (*Topic) NewScanner

func (t *Topic) NewScanner(from int64, persist bool) (ts TopicScanner, err error)

NewScanner creates a new scanner starting at offset `from`. If `persist` is true, the scanner and its state will survive server restarts.

func (*Topic) ParseOffset

func (t *Topic) ParseOffset(str string) (int64, error)

ParseOffset converts an offset string into a precise numeric offset:

    'beginning', 'first' or 'oldest' return the lowest available offset in the topic
    'last' or 'latest' return the highest available offset in the topic
    'end' or 'now' return the next offset to be written in the topic
    numeric string values are converted directly to integers
    duration notation like "1day" returns the first offset available since 1 day ago
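
The keyword rules can be sketched as a small resolver. The `bounds` type and `parseOffset` function are hypothetical, the next offset is assumed to be `latest + 1`, and duration notation is left out because it requires per-offset timestamps that this toy does not model.

```go
package main

import (
	"fmt"
	"strconv"
)

// bounds is a hypothetical snapshot of a topic's offsets.
type bounds struct {
	oldest int64 // lowest offset still available
	latest int64 // highest offset written so far
}

// parseOffset resolves keywords and numeric strings to an offset.
func parseOffset(b bounds, str string) (int64, error) {
	switch str {
	case "beginning", "first", "oldest":
		return b.oldest, nil
	case "last", "latest":
		return b.latest, nil
	case "end", "now":
		return b.latest + 1, nil // assumption: offsets are consecutive
	}
	n, err := strconv.ParseInt(str, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid offset %q", str)
	}
	return n, nil
}

func main() {
	b := bounds{oldest: 10, latest: 41}
	for _, s := range []string{"oldest", "latest", "now", "25", "soon"} {
		off, err := parseOffset(b, s)
		fmt.Println(s, off, err)
	}
}
```

Note the difference between 'latest' and 'now': a scanner started at 'latest' re-reads the last stored message, while one started at 'now' only sees messages written afterwards.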

func (*Topic) Payload

func (t *Topic) Payload(offset int64) ([]byte, error)

Payload is a utility method to fetch the payload of a single offset.

func (*Topic) ReadFrom

func (t *Topic) ReadFrom(r io.Reader) (n int64, err error)

ReadFrom reads an entry or stream of entries from r until EOF is reached, and writes the entry/stream into the topic if the entry is valid. The return value n is the number of bytes read. It implements the io.ReaderFrom interface.

func (*Topic) Scanner

func (t *Topic) Scanner(ID string) (ts TopicScanner, err error)

Scanner returns an existing scanner for the topic given an ID, or ErrScannerNotFound if it doesn't exist.

func (*Topic) Sync

func (t *Topic) Sync() error

Sync flushes all data to disk.

func (*Topic) Write

func (t *Topic) Write(p []byte) (n int, err error)

Write implements the io.Writer interface for a Topic.

func (*Topic) WriteN

func (t *Topic) WriteN(p []byte, n int) (written int, err error)

WriteN writes a set of N messages to the Topic

type TopicAtomicMap

type TopicAtomicMap struct {
    // contains filtered or unexported fields
}

TopicAtomicMap is a copy-on-write thread-safe map of pointers to Topic

func NewTopicAtomicMap

func NewTopicAtomicMap() *TopicAtomicMap

NewTopicAtomicMap returns a new initialized TopicAtomicMap

func (*TopicAtomicMap) Delete

func (am *TopicAtomicMap) Delete(key string)

Delete removes the pointer to Topic under key from the map

func (*TopicAtomicMap) Get

func (am *TopicAtomicMap) Get(key string) (value *Topic, ok bool)

Get returns a pointer to Topic for a given key

func (*TopicAtomicMap) GetAll

func (am *TopicAtomicMap) GetAll() map[string]*Topic

GetAll returns the underlying map of pointers to Topic. This map must NOT be modified; to change the map safely, use the Set and Delete functions and Get the value again.

func (*TopicAtomicMap) Len

func (am *TopicAtomicMap) Len() int

Len returns the number of elements in the map

func (*TopicAtomicMap) Set

func (am *TopicAtomicMap) Set(key string, value *Topic)

Set inserts in the map a pointer to Topic under a given key

type TopicInfo

type TopicInfo struct {
    *biglog.Info
    Scanners map[string]TScannerInfo `json:"scanners"`
}

TopicInfo holds the topic information, including information about size, segments, scanners and streamers.

type TopicScanner

type TopicScanner interface {
    ID() string
    Scan(ctx context.Context) (m Message, offset int64, err error)
    Info() TScannerInfo
    Close() error
}

TopicScanner reads the messages in a topic one by one, blocking for a period of time until new data is available. TopicScanners are thread-safe.

func NewTopicScanner

func NewTopicScanner(t *Topic, ID string, from int64, persist bool) (TopicScanner, error)

NewTopicScanner returns a new topic scanner ready to scan starting at offset `from`. If `persist` is true, the scanner and its last position will survive across server restarts.

type TopicScannerAtomicMap

type TopicScannerAtomicMap struct {
    // contains filtered or unexported fields
}

TopicScannerAtomicMap is a copy-on-write thread-safe map of TopicScanner

func NewTopicScannerAtomicMap

func NewTopicScannerAtomicMap() *TopicScannerAtomicMap

NewTopicScannerAtomicMap returns a new initialized TopicScannerAtomicMap

func (*TopicScannerAtomicMap) Delete

func (am *TopicScannerAtomicMap) Delete(key string)

Delete removes the TopicScanner under key from the map

func (*TopicScannerAtomicMap) Get

func (am *TopicScannerAtomicMap) Get(key string) (value TopicScanner, ok bool)

Get returns a TopicScanner for a given key

func (*TopicScannerAtomicMap) GetAll

func (am *TopicScannerAtomicMap) GetAll() map[string]TopicScanner

GetAll returns the underlying map of TopicScanner. This map must NOT be modified; to change the map safely, use the Set and Delete functions and Get the value again.

func (*TopicScannerAtomicMap) Len

func (am *TopicScannerAtomicMap) Len() int

Len returns the number of elements in the map

func (*TopicScannerAtomicMap) Set

func (am *TopicScannerAtomicMap) Set(key string, value TopicScanner)

Set inserts in the map a TopicScanner under a given key

type TopicSettings

type TopicSettings struct {
    // SegAge is the age after which old segments are discarded.
    SegAge bigduration.BigDuration `json:"segment_age,ommitempty"`
    // SegSize is the size at which a new segment should be created.
    SegSize int64 `json:"segment_size,ommitempty"`
    // BatchNumMessages is the maximum number of messages to be batched.
    BatchNumMessages int `json:"batch_num_messages,ommitempty"`
    // BatchInterval is the interval at which batched messages are flushed to disk.
    BatchInterval bigduration.BigDuration `json:"batch_interval,ommitempty"`
    // CompressionType specifies how batches are compressed.
    CompressionType CompressionType `json:"compression_type,ommitempty"`
}

TopicSettings holds the tunable settings of a topic.

Directories

Path    Synopsis
biglog
biglog/example
integration
transport

Package netlog imports 22 packages and is imported by 2 packages. Updated 2018-11-04.