netlog

v0.0.0-...-5d4656b
Published: Oct 27, 2018 License: MPL-2.0 Imports: 22 Imported by: 2

README

NetLog

A lightweight, HTTP-centric, log-based (Kafka-style) message queue.

Alpha software

This is still early software and potentially buggy. To peek at the internals, start with BigLog.

Roadmap
  • low-level log management
  • HTTP transport
  • scanner based pub/sub
  • custom data retention policy
  • persistent scanners
  • batching
  • compression
  • good test coverage
  • proper documentation
  • streaming based pub/sub
  • async replication
  • kinesis-compatible transport
  • gRPC transport
Non-goals
  • Match Kafka's performance.
  • Distributed system.
Getting started

While posting and fetching single messages is very inefficient, it's the simplest way to get started using nothing but curl commands.

# compile server
go get github.com/ninibe/netlog/cmd/netlog

# run server
bin/netlog

# create new topic
curl -XPOST localhost:7200/demo

# post messages
curl -XPOST localhost:7200/demo/payload --data-binary "message number one"
curl -XPOST localhost:7200/demo/payload --data-binary "message number two"
curl -XPOST localhost:7200/demo/payload --data-binary "message number three"

# check topic info
curl localhost:7200/demo

# create scanner
curl -XPOST "localhost:7200/demo/scanner?from=0"

export SC="...UUID RETURNED..."

# start scanning...
curl -XGET "localhost:7200/demo/scan?id=$SC"
# repeat x times ...

# wait 5 seconds for new messages
curl -XGET "localhost:7200/demo/scan?id=$SC&wait=5s"

# wait 5 minutes
curl -XGET "localhost:7200/demo/scan?id=$SC&wait=5m"

# post more messages in another window
curl -XPOST localhost:7200/demo/payload --data-binary "message number four"
curl -XPOST localhost:7200/demo/payload --data-binary "message number five"

# new scanner since 1 minute ago
curl -XPOST "localhost:7200/demo/scanner?from=1m"
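
The same endpoints can be driven from Go with nothing but the standard library. The sketch below is a minimal illustration, not an official client: the helper names (`payloadURL`, `scanURL`, `postPayload`), the content type, and the stub server's `201` response are assumptions made for the example. Only the URL shapes come from the curl commands above.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/url"
	"strings"
)

// payloadURL builds the endpoint used by `curl -XPOST host/topic/payload`.
func payloadURL(base, topic string) string {
	return base + "/" + url.PathEscape(topic) + "/payload"
}

// scanURL builds the scan endpoint; wait (e.g. "5s") may be empty.
func scanURL(base, topic, scannerID, wait string) string {
	q := url.Values{}
	q.Set("id", scannerID)
	if wait != "" {
		q.Set("wait", wait)
	}
	return base + "/" + url.PathEscape(topic) + "/scan?" + q.Encode()
}

// postPayload posts one message and returns the HTTP status code.
func postPayload(c *http.Client, base, topic, msg string) (int, error) {
	resp, err := c.Post(payloadURL(base, topic), "application/octet-stream", strings.NewReader(msg))
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	_, _ = io.Copy(io.Discard, resp.Body) // drain so the connection can be reused
	return resp.StatusCode, nil
}

func main() {
	// Stand-in server so the sketch runs without a NetLog instance.
	// The status it returns is arbitrary, not NetLog's documented behavior.
	stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusCreated)
	}))
	defer stub.Close()

	status, err := postPayload(stub.Client(), stub.URL, "demo", "message number one")
	if err != nil {
		fmt.Println("post failed:", err)
		return
	}
	fmt.Println("status:", status)
	fmt.Println(scanURL(stub.URL, "demo", "SCANNER-ID", "5s"))
}
```

To talk to a real server, replace `stub.URL` with `http://localhost:7200` and `stub.Client()` with `http.DefaultClient`.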

One-line-ish pub/sub
# create new topic
curl -XPOST localhost:7200/pubsubdemo

# get scanner ID with jq
export SCANNER=$(curl -s -XPOST "localhost:7200/pubsubdemo/scanner?from=0&persist=true" | jq -r .id)

# subscribe to the topic
while true; do curl "localhost:7200/pubsubdemo/scan?id=$SCANNER&wait=1h" && echo; done

# IN ANOTHER WINDOW

# publish on the topic
while true; do read -r data; curl localhost:7200/pubsubdemo/payload --data-binary "$data"; done
# write something and hit enter

Contributing

Contributions are more than welcome; check the contributing guidelines first. To ask questions, write to the netlog-dev mailing list.

Documentation

Constants

This section is empty.

Variables

var (
	// ErrUnknown is returned when an underlying standard Go error reaches the user.
	ErrUnknown = newErr(http.StatusInternalServerError, "netlog: unkwown error")
	// ErrInvalidDir is returned when the data folder provided does not exist or is not writable.
	ErrInvalidDir = newErr(http.StatusInternalServerError, "netlog: invalid data directory")

	// ErrBadRequest is returned when invalid parameters are received.
	ErrBadRequest = newErr(http.StatusBadRequest, "netlog: bad request")
	// ErrInvalidOffset is returned when the requested offset cannot be parsed into a number.
	ErrInvalidOffset = newErr(http.StatusBadRequest, "netlog: invalid offset")
	// ErrInvalidDuration is returned when a given big duration cannot be parsed.
	ErrInvalidDuration = newErr(http.StatusBadRequest, "netlog: invalid duration")
	// ErrInvalidCompression is returned when the compression type defined is unknown.
	ErrInvalidCompression = newErr(http.StatusBadRequest, "netlog: invalid compression type")
	// ErrTopicExists is returned when trying to create an already existing topic.
	ErrTopicExists = newErr(http.StatusBadRequest, "netlog: topic exists")
	// ErrEndOfTopic is returned when the reader has read all the way until the end of the topic.
	ErrEndOfTopic = newErr(http.StatusNotFound, "netlog: end of topic")
	// ErrTopicNotFound is returned when addressing a non-existing topic.
	ErrTopicNotFound = newErr(http.StatusNotFound, "netlog: topic not found")

	// ErrScannerNotFound is returned when using a non-existing scanner ID.
	ErrScannerNotFound = newErr(http.StatusNotFound, "netlog: scanner not found")
	// ErrOffsetNotFound is returned when the offset is no longer or not yet present in the topic.
	ErrOffsetNotFound = newErr(http.StatusNotFound, "netlog: offset not found")

	// ErrCRC is returned when a message's payload does not match the CRC header.
	ErrCRC = newErr(http.StatusInternalServerError, "netlog: checksum error")
	// ErrBusy is returned when trying to close or delete a topic with readers attached to it.
	ErrBusy = newErr(http.StatusConflict, "netlog: resource busy")
)

Functions

func NopWCloser

func NopWCloser(w io.Writer) io.WriteCloser

NopWCloser returns a WriteCloser with a no-op Close method wrapping the provided Writer w.
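
A wrapper of this kind is typically a tiny struct that embeds the writer and adds an empty Close. The sketch below shows one plausible shape; `nopWriteCloser`, `nopWCloser`, and `writeAndClose` are hypothetical names for illustration and are not taken from the package source.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// nopWriteCloser adapts an io.Writer into an io.WriteCloser.
type nopWriteCloser struct {
	io.Writer
}

// Close does nothing and always reports success.
func (nopWriteCloser) Close() error { return nil }

// nopWCloser is a hypothetical stand-in for netlog.NopWCloser.
func nopWCloser(w io.Writer) io.WriteCloser {
	return nopWriteCloser{w}
}

// writeAndClose writes s through the wrapper, closes it, and returns
// what reached the underlying buffer.
func writeAndClose(s string) (string, error) {
	var buf bytes.Buffer
	wc := nopWCloser(&buf)
	if _, err := io.WriteString(wc, s); err != nil {
		return "", err
	}
	if err := wc.Close(); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	out, err := writeAndClose("hello")
	fmt.Println(out, err)
}
```

This is useful when an API expects an io.WriteCloser but the destination, such as an in-memory buffer, has nothing to release.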

Types

type BLTopicScanner

type BLTopicScanner struct {
	// contains filtered or unexported fields
}

BLTopicScanner implements TopicScanner reading from BigLog.

func (*BLTopicScanner) Close

func (ts *BLTopicScanner) Close() error

Close implements io.Closer and releases the TopicScanner resources.

func (*BLTopicScanner) ID

func (ts *BLTopicScanner) ID() string

ID returns the ID of the scanner

func (*BLTopicScanner) Info

func (ts *BLTopicScanner) Info() TScannerInfo

Info returns a TScannerInfo struct with the scanner's next offset and the initial offset.

func (*BLTopicScanner) Scan

func (ts *BLTopicScanner) Scan(ctx context.Context) (m Message, offset int64, err error)

Scan advances the Scanner to the next message, returning the message and the offset. Scan blocks when it reaches EOF until more data is available, so the caller must provide a context and cancel it when it needs to stop waiting.
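
The blocking behavior is easiest to see with a bounded context. The sketch below uses a hypothetical `fakeScanner` backed by a channel in place of a real topic, so it runs on its own. Only the `Scan(ctx)` shape mirrors the signature above; unlike the real scanners, the fake is not safe for concurrent use.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fakeScanner is a hypothetical stand-in for a TopicScanner: messages
// arrive on a channel and Scan blocks until one is available.
type fakeScanner struct {
	msgs chan []byte
	next int64
}

// Scan returns the next message and its offset, or ctx.Err() if the
// context ends while waiting.
func (s *fakeScanner) Scan(ctx context.Context) ([]byte, int64, error) {
	select {
	case m := <-s.msgs:
		off := s.next
		s.next++
		return m, off, nil
	case <-ctx.Done():
		return nil, -1, ctx.Err()
	}
}

// scanWithTimeout bounds a single Scan call to ms milliseconds.
func scanWithTimeout(s *fakeScanner, ms int) ([]byte, int64, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ms)*time.Millisecond)
	defer cancel()
	return s.Scan(ctx)
}

// timedOut reports whether err came from an expired deadline.
func timedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	s := &fakeScanner{msgs: make(chan []byte, 1)}
	s.msgs <- []byte("message number one")

	m, off, err := scanWithTimeout(s, 50)
	fmt.Println(string(m), off, err)

	// The fake topic is now drained, so this call blocks until the deadline.
	_, _, err = scanWithTimeout(s, 50)
	fmt.Println("timed out:", timedOut(err))
}
```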

type CompressionType

type CompressionType uint8

CompressionType indicates a type of compression for message sets

const (
	// CompressionDefault is used when falling back to the default compression of the system.
	CompressionDefault CompressionType = 0
	// CompressionNone is used by message sets with uncompressed payloads
	CompressionNone CompressionType = 1
	// CompressionGzip is used by message sets with gzipped payloads
	CompressionGzip CompressionType = 2
	// CompressionSnappy is used by message sets with snappy payloads
	CompressionSnappy CompressionType = 3
)

type IntegrityChecker

type IntegrityChecker struct {
	// contains filtered or unexported fields
}

IntegrityChecker is used to check the integrity of an entire topic.

func NewIntegrityChecker

func NewIntegrityChecker(t *Topic, from int64) (*IntegrityChecker, error)

NewIntegrityChecker creates a new integrity checker for a given topic.

func (*IntegrityChecker) Check

func (ic *IntegrityChecker) Check(ctx context.Context) (errors []*IntegrityError)

Check reads all data, collecting any errors it finds, and then returns them. Passing a cancellable context is recommended since this operation can be slow.

func (*IntegrityChecker) Close

func (ic *IntegrityChecker) Close() error

Close releases the underlying resources.

type IntegrityError

type IntegrityError struct {
	Offset   int64              `json:"offset"`
	ODelta   int                `json:"odelta"`
	Type     IntegrityErrorType `json:"type"`
	Expected string             `json:"expected"`
	Actual   string             `json:"actual"`
}

IntegrityError is the struct with metadata about any integrity error found.

func CheckMessageIntegrity

func CheckMessageIntegrity(m Message, delta int) *IntegrityError

CheckMessageIntegrity checks the integrity of a single message

type IntegrityErrorType

type IntegrityErrorType string

IntegrityErrorType is the category of possible errors in the data.

const (

	// IntegrityChecksumErr is returned when the checksum in the message
	// header doesn't match the checksum recalculated from the payload.
	IntegrityChecksumErr IntegrityErrorType = "checksum"

	// IntegrityLengthErr is returned when the length in the message
	// header doesn't match the length of the payload.
	IntegrityLengthErr IntegrityErrorType = "length"

	// IntegrityUnknownErr is returned when data can not be read because
	// of an underlying error reading the data.
	IntegrityUnknownErr IntegrityErrorType = "unknown"
)

type Message

type Message []byte

Message is the unit of data storage.

func MessageFromPayload

func MessageFromPayload(p []byte) Message

MessageFromPayload returns a message with the appropriate calculated headers from a given data payload.

func MessageSet

func MessageSet(msgs []Message, comp CompressionType) Message

MessageSet returns a new message with a batch of compressed messages as payload. Compression will compress the payload and set the compression header. Be aware that compression at this level is only meant for batching several messages into a single message-set to increase throughput. MessageSet will panic if a compression type is not provided, since nothing would indicate to streaming clients that further messages are embedded in the payload.

func ReadMessage

func ReadMessage(r io.Reader) (entry Message, err error)

ReadMessage reads a message from r and returns it. If the message is compressed, it does not attempt to unpack the contents.

func Unpack

func Unpack(set Message) ([]Message, error)

Unpack takes a message-set and returns a slice with the component messages.
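
The idea of batching several messages into one compressed payload, and unpacking them again, can be sketched with the standard library. The framing below (a 4-byte big-endian length before each message, then gzip over the whole batch) is an assumption chosen for illustration. It is not NetLog's wire format, and `packSet`, `unpackSet`, and `roundTrip` are hypothetical names.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/binary"
	"fmt"
	"io"
)

// packSet frames each message with a 4-byte big-endian length and
// gzips the concatenated result into a single payload.
func packSet(msgs [][]byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	for _, m := range msgs {
		var hdr [4]byte
		binary.BigEndian.PutUint32(hdr[:], uint32(len(m)))
		if _, err := zw.Write(hdr[:]); err != nil {
			return nil, err
		}
		if _, err := zw.Write(m); err != nil {
			return nil, err
		}
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// unpackSet reverses packSet. It trusts the length prefixes, so it is
// only suitable for input produced by packSet.
func unpackSet(set []byte) ([][]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(set))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	var msgs [][]byte
	for {
		var hdr [4]byte
		if _, err := io.ReadFull(zr, hdr[:]); err != nil {
			if err == io.EOF {
				return msgs, nil // clean end of the batch
			}
			return nil, err
		}
		m := make([]byte, binary.BigEndian.Uint32(hdr[:]))
		if _, err := io.ReadFull(zr, m); err != nil {
			return nil, err
		}
		msgs = append(msgs, m)
	}
}

// roundTrip packs and unpacks the given strings.
func roundTrip(in ...string) ([]string, error) {
	msgs := make([][]byte, len(in))
	for i, s := range in {
		msgs[i] = []byte(s)
	}
	set, err := packSet(msgs)
	if err != nil {
		return nil, err
	}
	unpacked, err := unpackSet(set)
	if err != nil {
		return nil, err
	}
	out := make([]string, len(unpacked))
	for i, m := range unpacked {
		out[i] = string(m)
	}
	return out, nil
}

func main() {
	out, err := roundTrip("message number one", "message number two")
	fmt.Println(out, err)
}
```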

func (*Message) Bytes

func (m *Message) Bytes() []byte

Bytes returns the entire message cast back to bytes.

func (*Message) CRC32

func (m *Message) CRC32() uint32

CRC32 returns the checksum of the payload.

func (*Message) ChecksumOK

func (m *Message) ChecksumOK() bool

ChecksumOK recalculates the CRC from the payload and compares it with the one stored in the header
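
A check of this kind amounts to recomputing the CRC over the payload and comparing it with the stored value. The sketch below assumes the IEEE polynomial from `hash/crc32` and a hypothetical `record` type. The documentation above does not say which polynomial or header layout the package uses.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// record pairs a payload with the checksum stored when it was written.
type record struct {
	crc     uint32
	payload []byte
}

// newRecord computes the checksum once, at write time.
func newRecord(p []byte) record {
	return record{crc: crc32.ChecksumIEEE(p), payload: p}
}

// checksumOK recomputes the CRC and compares it with the stored one.
func (r record) checksumOK() bool {
	return crc32.ChecksumIEEE(r.payload) == r.crc
}

// checkBeforeAndAfterFlip reports the check result for an intact
// payload and again after flipping one bit of its first byte.
func checkBeforeAndAfterFlip(s string) (before, after bool) {
	r := newRecord([]byte(s))
	before = r.checksumOK()
	if len(r.payload) == 0 {
		return before, before // nothing to corrupt
	}
	r.payload[0] ^= 0x01
	return before, r.checksumOK()
}

func main() {
	before, after := checkBeforeAndAfterFlip("message number one")
	fmt.Println("intact:", before, "corrupted:", after)
}
```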

func (*Message) CompVer

func (m *Message) CompVer() uint8

CompVer returns the first byte which reflects both compression and format version.

func (*Message) Compression

func (m *Message) Compression() CompressionType

Compression returns the compression encoded in bits 4 to 8 of the header.

func (*Message) PLength

func (m *Message) PLength() uint32

PLength returns the length (bytes) of the payload.

func (*Message) Payload

func (m *Message) Payload() []byte

Payload returns the data bytes.

func (*Message) Size

func (m *Message) Size() int

Size returns the total size in bytes of the message.

func (*Message) Version

func (m *Message) Version() uint8

Version returns the format version encoded in bits 0 to 3 of the header.
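
Packing two small values into one header byte is a matter of masking and shifting. The sketch below assumes bits are counted from the least significant end, so the version sits in the low nibble and the compression type in the high nibble. That ordering is an assumption, and `packCompVer`, `version`, and `compression` are hypothetical helpers rather than the package's own code.

```go
package main

import "fmt"

// packCompVer stores version in the low four bits and compression in
// the high four bits. The nibble order is an assumption of this sketch.
func packCompVer(version, compression uint8) uint8 {
	return (compression&0x0F)<<4 | version&0x0F
}

// version extracts the low nibble.
func version(compVer uint8) uint8 { return compVer & 0x0F }

// compression extracts the high nibble.
func compression(compVer uint8) uint8 { return compVer >> 4 }

func main() {
	b := packCompVer(1, 2) // format version 1, compression type 2
	fmt.Printf("%08b version=%d compression=%d\n", b, version(b), compression(b))
}
```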

type NLError

type NLError interface {
	Error() string
	String() string
	StatusCode() int
}

NLError is a known NetLog error with an associated status code.
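
An error that carries its own status code lets an HTTP transport choose the response status without a lookup table. The sketch below shows one way to satisfy an interface of this shape and to recover the code from a wrapped error. `statusError`, `statusFor`, `wrap`, and the sample variables are hypothetical, and the package's unexported `newErr` may work differently.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// statusError is a hypothetical error type with an HTTP status attached.
type statusError struct {
	code int
	msg  string
}

func (e statusError) Error() string   { return e.msg }
func (e statusError) String() string  { return e.msg }
func (e statusError) StatusCode() int { return e.code }

// Sample errors for the demonstration.
var (
	errTopicNotFound = statusError{http.StatusNotFound, "netlog: topic not found"}
	errPlain         = errors.New("disk on fire")
)

// wrap adds context while keeping the original error reachable.
func wrap(err error) error {
	return fmt.Errorf("handling request: %w", err)
}

// statusFor picks the HTTP status for any error, defaulting to 500
// when no error in the chain exposes a StatusCode method.
func statusFor(err error) int {
	var se interface{ StatusCode() int }
	if errors.As(err, &se) {
		return se.StatusCode()
	}
	return http.StatusInternalServerError
}

func main() {
	fmt.Println(statusFor(errTopicNotFound))
	fmt.Println(statusFor(wrap(errTopicNotFound)))
	fmt.Println(statusFor(errPlain))
}
```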

func ExtErr

func ExtErr(err error) NLError

ExtErr maps external errors, mostly BigLog errors, to NetLog errors.

type NetLog

type NetLog struct {
	// contains filtered or unexported fields
}

NetLog is the main struct that serves a set of topics. It usually must be wrapped with an HTTP transport.

func NewNetLog

func NewNetLog(dataDir string, opts ...Option) (nl *NetLog, err error)

NewNetLog creates a new NetLog in a given data folder that must exist and be writable.

func (*NetLog) CreateTopic

func (nl *NetLog) CreateTopic(name string, settings TopicSettings) (t *Topic, err error)

CreateTopic creates a new topic with a given name and default settings.

func (*NetLog) DeleteTopic

func (nl *NetLog) DeleteTopic(name string, force bool) (err error)

DeleteTopic deletes an existing topic by name.

func (*NetLog) Topic

func (nl *NetLog) Topic(name string) (*Topic, error)

Topic returns an existing topic by name.

func (*NetLog) TopicList

func (nl *NetLog) TopicList() []string

TopicList returns the list of existing topic names.

type Option

type Option func(*NetLog)

Option is the type of function used to set internal parameters.

func DefaultTopicSettings

func DefaultTopicSettings(settings TopicSettings) Option

DefaultTopicSettings sets the default topic settings used if no other is defined at creation time.

func MonitorInterval

func MonitorInterval(interval bigduration.BigDuration) Option

MonitorInterval defines the interval at which the segment monitor, which is in charge of splitting and discarding segments, runs.
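
The `Option` type follows the functional options pattern: each option is a function that mutates the struct being built, and the constructor applies them over its defaults. The sketch below shows the pattern on a hypothetical `server` type. The field names and default values are invented for illustration and do not reflect NetLog's internals.

```go
package main

import "fmt"

// server is a hypothetical struct configured through options.
type server struct {
	monitorSeconds int
	segSize        int64
}

// option mutates a server during construction.
type option func(*server)

// withMonitorSeconds overrides the monitor interval.
func withMonitorSeconds(n int) option {
	return func(s *server) { s.monitorSeconds = n }
}

// withSegSize overrides the segment size.
func withSegSize(n int64) option {
	return func(s *server) { s.segSize = n }
}

// newServer starts from defaults and applies the options in order,
// so later options win over earlier ones.
func newServer(opts ...option) *server {
	s := &server{monitorSeconds: 1, segSize: 1 << 20}
	for _, o := range opts {
		o(s)
	}
	return s
}

func main() {
	s := newServer(withMonitorSeconds(5))
	fmt.Println(s.monitorSeconds, s.segSize)
}
```

The benefit of this design is that new settings can be added later without changing the constructor's signature.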

type PersistentTopicScanner

type PersistentTopicScanner struct {
	// contains filtered or unexported fields
}

PersistentTopicScanner synchronizes the underlying scanner state to a given writer

func (*PersistentTopicScanner) Close

func (p *PersistentTopicScanner) Close() error

Close deletes the offset tracking file, closes the offset channel and closes the underlying scanner

func (*PersistentTopicScanner) ID

ID returns the ID of the scanner

func (*PersistentTopicScanner) Info

Info returns a TScannerInfo struct with the scanner's next offset and the last scanned one

func (*PersistentTopicScanner) Scan

func (p *PersistentTopicScanner) Scan(ctx context.Context) (m Message, offset int64, err error)

Scan offloads the actual scan to the underlying scanner while updating the last read offset

type SegmentMonitor

type SegmentMonitor struct {
	// contains filtered or unexported fields
}

SegmentMonitor periodically checks for segments to split or discard at a given interval.

type StreamerAtomicMap

type StreamerAtomicMap struct {
	// contains filtered or unexported fields
}

StreamerAtomicMap is a copy-on-write thread-safe map of pointers to Streamer

func NewStreamerAtomicMap

func NewStreamerAtomicMap() *StreamerAtomicMap

NewStreamerAtomicMap returns a new initialized StreamerAtomicMap

func (*StreamerAtomicMap) Delete

func (am *StreamerAtomicMap) Delete(key string)

Delete removes the pointer to Streamer under key from the map

func (*StreamerAtomicMap) Get

func (am *StreamerAtomicMap) Get(key string) (value *biglog.Streamer, ok bool)

Get returns a pointer to Streamer for a given key

func (*StreamerAtomicMap) GetAll

func (am *StreamerAtomicMap) GetAll() map[string]*biglog.Streamer

GetAll returns the underlying map of pointers to Streamer. This map must NOT be modified. To change the map safely, use the Set and Delete functions and Get the value again.

func (*StreamerAtomicMap) Len

func (am *StreamerAtomicMap) Len() int

Len returns the number of elements in the map

func (*StreamerAtomicMap) Set

func (am *StreamerAtomicMap) Set(key string, value *biglog.Streamer)

Set inserts in the map a pointer to Streamer under a given key
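
A copy-on-write map keeps reads lock-free by never mutating a published map: writers copy the current snapshot, change the copy, and swap it in atomically. That is also why a snapshot handed out to readers must not be modified. The sketch below is a generic illustration with int values and a hypothetical `cowMap` type; it is not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// cowMap is a copy-on-write map. Readers load an immutable snapshot
// without locking; writers copy it under a mutex and swap it in.
type cowMap struct {
	mu sync.Mutex   // serializes writers
	v  atomic.Value // holds a map[string]int that is never mutated in place
}

func newCowMap() *cowMap {
	m := &cowMap{}
	m.v.Store(map[string]int{})
	return m
}

// snapshot returns the current map; callers must treat it as read-only.
func (m *cowMap) snapshot() map[string]int {
	return m.v.Load().(map[string]int)
}

func (m *cowMap) Get(key string) (int, bool) {
	v, ok := m.snapshot()[key]
	return v, ok
}

func (m *cowMap) Len() int { return len(m.snapshot()) }

// Set copies the snapshot, adds the entry, and publishes the copy.
func (m *cowMap) Set(key string, value int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	old := m.snapshot()
	next := make(map[string]int, len(old)+1)
	for k, v := range old {
		next[k] = v
	}
	next[key] = value
	m.v.Store(next)
}

// Delete copies the snapshot without the key and publishes the copy.
func (m *cowMap) Delete(key string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	old := m.snapshot()
	next := make(map[string]int, len(old))
	for k, v := range old {
		if k != key {
			next[k] = v
		}
	}
	m.v.Store(next)
}

func main() {
	m := newCowMap()
	m.Set("a", 1)
	snap := m.snapshot() // an earlier snapshot, as a reader would hold it
	m.Set("b", 2)
	fmt.Println(len(snap), m.Len()) // the old snapshot is unaffected by the later Set
}
```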

type TScannerInfo

type TScannerInfo struct {
	ID      string `json:"id"`
	Next    int64  `json:"next"`
	From    int64  `json:"from"`
	Persist bool   `json:"persistent"`
}

TScannerInfo holds the scanner's offset information

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic is a log of linear messages.

func (*Topic) CheckIntegrity

func (t *Topic) CheckIntegrity(ctx context.Context, from int64) ([]*IntegrityError, error)

CheckIntegrity scans the topic and checks for inconsistencies in the data

func (*Topic) CheckSegments

func (t *Topic) CheckSegments() error

CheckSegments is called by the runner and discards or splits segments when conditions are met.

func (*Topic) DeleteScanner

func (t *Topic) DeleteScanner(ID string) (err error)

DeleteScanner removes the scanner from the topic

func (*Topic) DirPath

func (t *Topic) DirPath() string

DirPath returns the absolute path to the folder with the topic's files

func (*Topic) FlushBuffered

func (t *Topic) FlushBuffered() error

FlushBuffered flushes all buffered messages into the BigLog. Note that the BigLog might have a buffer of its own that this function does not flush, so calling this does not mean the data has been stored on disk.

func (*Topic) Info

func (t *Topic) Info() (i *TopicInfo, err error)

Info provides all public topic information.

func (*Topic) Name

func (t *Topic) Name() string

Name returns the Topic's name, which maps to the folder name

func (*Topic) NewScanner

func (t *Topic) NewScanner(from int64, persist bool) (ts TopicScanner, err error)

NewScanner creates a new scanner starting at offset `from`. If `persist` is true, the scanner and its state will survive server restarts

func (*Topic) ParseOffset

func (t *Topic) ParseOffset(str string) (int64, error)

ParseOffset converts an offset string into a precise numeric offset:
  • 'beginning', 'first' or 'oldest' return the lowest available offset in the topic
  • 'last' or 'latest' return the highest available offset in the topic
  • 'end' or 'now' return the next offset to be written in the topic
  • numeric string values are directly converted to integer
  • duration notation like "1day" returns the first offset available since 1 day ago
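
The keyword and numeric cases can be sketched as a simple switch followed by integer parsing. The example below is a simplified illustration: `bounds` and `resolveOffset` are hypothetical, the "next offset to be written" is assumed to be `latest + 1`, and duration notation is left out because it requires timestamp lookups in the log.

```go
package main

import (
	"fmt"
	"strconv"
)

// bounds describes the offsets currently available in a hypothetical topic.
type bounds struct {
	oldest, latest int64
}

// resolveOffset maps keywords and numeric strings to an offset.
// Duration notation is not handled in this sketch.
func resolveOffset(s string, b bounds) (int64, error) {
	switch s {
	case "beginning", "first", "oldest":
		return b.oldest, nil
	case "last", "latest":
		return b.latest, nil
	case "end", "now":
		return b.latest + 1, nil // assumed position of the next write
	}
	n, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid offset %q", s)
	}
	return n, nil
}

func main() {
	b := bounds{oldest: 10, latest: 42}
	for _, s := range []string{"first", "latest", "now", "17", "bogus"} {
		off, err := resolveOffset(s, b)
		fmt.Println(s, off, err)
	}
}
```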

func (*Topic) Payload

func (t *Topic) Payload(offset int64) ([]byte, error)

Payload is a utility method to fetch the payload of a single offset.

func (*Topic) ReadFrom

func (t *Topic) ReadFrom(r io.Reader) (n int64, err error)

ReadFrom reads an entry or stream of entries from r until EOF is reached and writes the entry/stream into the topic if the entry is valid. The return value n is the number of bytes read. It implements the io.ReaderFrom interface.

func (*Topic) Scanner

func (t *Topic) Scanner(ID string) (ts TopicScanner, err error)

Scanner returns an existing scanner for the topic given an ID, or ErrScannerNotFound if it doesn't exist.

func (*Topic) Sync

func (t *Topic) Sync() error

Sync flushes all data to disk.

func (*Topic) Write

func (t *Topic) Write(p []byte) (n int, err error)

Write implements the io.Writer interface for a Topic.

func (*Topic) WriteN

func (t *Topic) WriteN(p []byte, n int) (written int, err error)

WriteN writes a set of N messages to the Topic

type TopicAtomicMap

type TopicAtomicMap struct {
	// contains filtered or unexported fields
}

TopicAtomicMap is a copy-on-write thread-safe map of pointers to Topic

func NewTopicAtomicMap

func NewTopicAtomicMap() *TopicAtomicMap

NewTopicAtomicMap returns a new initialized TopicAtomicMap

func (*TopicAtomicMap) Delete

func (am *TopicAtomicMap) Delete(key string)

Delete removes the pointer to Topic under key from the map

func (*TopicAtomicMap) Get

func (am *TopicAtomicMap) Get(key string) (value *Topic, ok bool)

Get returns a pointer to Topic for a given key

func (*TopicAtomicMap) GetAll

func (am *TopicAtomicMap) GetAll() map[string]*Topic

GetAll returns the underlying map of pointers to Topic. This map must NOT be modified. To change the map safely, use the Set and Delete functions and Get the value again.

func (*TopicAtomicMap) Len

func (am *TopicAtomicMap) Len() int

Len returns the number of elements in the map

func (*TopicAtomicMap) Set

func (am *TopicAtomicMap) Set(key string, value *Topic)

Set inserts in the map a pointer to Topic under a given key

type TopicInfo

type TopicInfo struct {
	*biglog.Info
	Scanners map[string]TScannerInfo `json:"scanners"`
}

TopicInfo holds the topic information, including information about size, segments, scanners and streamers

type TopicScanner

type TopicScanner interface {
	ID() string
	Scan(ctx context.Context) (m Message, offset int64, err error)
	Info() TScannerInfo
	Close() error
}

TopicScanner reads the messages in a topic one by one, blocking for a period of time until new data is available. TopicScanners are thread-safe.

func NewTopicScanner

func NewTopicScanner(t *Topic, ID string, from int64, persist bool) (TopicScanner, error)

NewTopicScanner returns a new topic scanner ready to scan starting at offset `from`. If persist is true, the scanner and its last position will survive across server restarts

type TopicScannerAtomicMap

type TopicScannerAtomicMap struct {
	// contains filtered or unexported fields
}

TopicScannerAtomicMap is a copy-on-write thread-safe map of TopicScanner

func NewTopicScannerAtomicMap

func NewTopicScannerAtomicMap() *TopicScannerAtomicMap

NewTopicScannerAtomicMap returns a new initialized TopicScannerAtomicMap

func (*TopicScannerAtomicMap) Delete

func (am *TopicScannerAtomicMap) Delete(key string)

Delete removes the TopicScanner under key from the map

func (*TopicScannerAtomicMap) Get

func (am *TopicScannerAtomicMap) Get(key string) (value TopicScanner, ok bool)

Get returns a TopicScanner for a given key

func (*TopicScannerAtomicMap) GetAll

func (am *TopicScannerAtomicMap) GetAll() map[string]TopicScanner

GetAll returns the underlying map of TopicScanner. This map must NOT be modified. To change the map safely, use the Set and Delete functions and Get the value again.

func (*TopicScannerAtomicMap) Len

func (am *TopicScannerAtomicMap) Len() int

Len returns the number of elements in the map

func (*TopicScannerAtomicMap) Set

func (am *TopicScannerAtomicMap) Set(key string, value TopicScanner)

Set inserts in the map a TopicScanner under a given key

type TopicSettings

type TopicSettings struct {
	// SegAge is the age after which old segments are discarded.
	SegAge bigduration.BigDuration `json:"segment_age,ommitempty"`
	// SegSize is the size at which a new segment should be created.
	SegSize int64 `json:"segment_size,ommitempty"`
	// BatchNumMessages is the maximum number of messages to be batched.
	BatchNumMessages int `json:"batch_num_messages,ommitempty"`
	// BatchInterval is the interval at which batched messages are flushed to disk.
	BatchInterval bigduration.BigDuration `json:"batch_interval,ommitempty"`
	// CompressionType specifies how batches are compressed.
	CompressionType CompressionType `json:"compression_type,ommitempty"`
}

TopicSettings holds the tunable settings of a topic.

Directories

Path Synopsis
cmd