types

package
v1.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 30, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
// Content type identifiers for the Protobuf, FlatBuffers and JSON media types.
// NOTE(review): presumably these are the values accepted as contentType by
// AttributesCodecFor and DataFrameCodecFor — confirm against their implementations.
const (
	ContentTypeProtobuf   = "application/vnd.google.protobuf"
	ContentTypeFlatbuffer = "application/x-flatbuffers"
	ContentTypeJSON       = "application/json"
)
View Source
// Attribute key names, written as dot-separated paths.
// NOTE(review): presumably these are keys of the Attributes map returned by
// Interface.Attributes — confirm against an implementation.
const (
	// Keys under the "meta." prefix.
	Backend                 = "meta.backend"
	MaxPayloadBytes         = "meta.maxPayloadBytes"
	UserIdentifyHeader      = "meta.header.userIdentifyHeader"
	GroupIdentifyHeader     = "meta.header.groupIdentifyHeader"
	// Keys under the "stream." prefix.
	StreamLength            = "stream.length"
	StreamApproximateLength = "stream.approxMaxLength"
)

Attribute keys that every queue service implementation must provide.

Variables

Functions

func Compare

func Compare(o1, o2 Offset) (int, error)

func NewLengthDelimitedFrameReader

func NewLengthDelimitedFrameReader(r io.ReadCloser) io.ReadCloser

NewLengthDelimitedFrameReader returns an io.ReadCloser that decodes length-prefixed frames off of a stream.

The protocol is:

stream: message ...
message: prefix body
prefix: 4 byte uint32 in BigEndian order, denotes length of body
body: bytes (0..prefix)

If the buffer passed to Read is not long enough to contain an entire frame, io.ErrShortBuffer will be returned along with the number of bytes read.

func NewLengthDelimitedFrameWriter

func NewLengthDelimitedFrameWriter(w io.Writer) io.Writer

func WithUser

func WithUser(ctx context.Context, user User) context.Context

WithUser saves User into context.

Types

type Attributes

// Attributes is a map of string keys to string values; it is the type
// returned by Interface.Attributes.
type Attributes map[string]string

type AttributesCodec

type AttributesCodec interface {
	// MediaType returns a string.
	// NOTE(review): presumably the content type this codec handles — confirm.
	MediaType() string

	// AttributesEncoder contributes the Encode method.
	AttributesEncoder

	// AttributesDecoder contributes the Decode method.
	AttributesDecoder
}

AttributesCodec helps to encode Attributes to bytes and to decode Attributes from bytes.

func AttributesCodecFor

func AttributesCodecFor(contentType string) AttributesCodec

type AttributesDecoder

// AttributesDecoder is the decoding half of AttributesCodec.
type AttributesDecoder interface {
	// Decode takes a byte slice and a pointer to an Attributes value and
	// returns an error.
	// NOTE(review): presumably the decoded result is written through the
	// pointer — confirm against implementations.
	Decode([]byte, *Attributes) error
}

type AttributesEncoder

// AttributesEncoder is the encoding half of AttributesCodec.
type AttributesEncoder interface {
	// Encode takes an Attributes value and an io.Writer and returns an error.
	// NOTE(review): presumably the encoded bytes are written to w — confirm.
	// NOTE(review): the parameter name Attributes shadows the type name; a
	// lowercase parameter name would be more idiomatic.
	Encode(Attributes Attributes, w io.Writer) error
}

type DataFrame

// DataFrame is the element type returned by Interface.Get and delivered by
// DataFrameReader.FrameChan.
type DataFrame struct {
	// Data holds raw bytes.
	// NOTE(review): presumably the payload passed to Interface.Put — confirm.
	Data []byte

	// Index is the frame's Index value.
	// NOTE(review): presumably its position in the stream — confirm.
	Index Index

	// Tags holds the frame's string key/value tags.
	Tags Tags

	// Message is a free-form string; its purpose is not visible here — TODO document.
	Message string
}

func (*DataFrame) Empty

func (f *DataFrame) Empty() bool

type DataFrameCodec

type DataFrameCodec interface {
	// MediaType returns a string.
	// NOTE(review): presumably the content type this codec handles — confirm.
	MediaType() string

	// DataFrameEncoder contributes the Encode and EncodeList methods.
	DataFrameEncoder

	// DataFrameDecoder contributes the Decode and DecodeList methods.
	DataFrameDecoder
}

DataFrameCodec helps to encode a DataFrame to bytes and to decode a DataFrame from bytes.

func DataFrameCodecFor

func DataFrameCodecFor(contentType string) DataFrameCodec

type DataFrameDecoder

// DataFrameDecoder is the decoding half of DataFrameCodec.
type DataFrameDecoder interface {
	// Decode decodes a single DataFrame from bytes. It takes a pointer to
	// the destination DataFrame and returns an error.
	Decode([]byte, *DataFrame) error

	// DecodeList attempts to decode a list of DataFrame values from bytes,
	// returning them as a slice together with an error.
	DecodeList([]byte) ([]DataFrame, error)
}

type DataFrameEncoder

// DataFrameEncoder is the encoding half of DataFrameCodec.
type DataFrameEncoder interface {
	// Encode encodes a single DataFrame into bytes. It takes an io.Writer
	// and returns an error.
	// NOTE(review): presumably the bytes are written to w — confirm.
	Encode(frame DataFrame, w io.Writer) error

	// EncodeList attempts to encode a batch of DataFrame values into bytes.
	// It takes an io.Writer and returns an error.
	EncodeList(list []DataFrame, w io.Writer) error
}

type DataFrameReader

// DataFrameReader exposes DataFrame values through a channel. Watcher embeds
// this interface.
type DataFrameReader interface {
	// FrameChan returns a receive-only DataFrame channel.
	FrameChan() <-chan DataFrame
}

type Index

// Index is a uint64-based index type; DataFrame.Index uses it.
type Index uint64

func FromUint64

func FromUint64(i uint64) Index

func FromUint64Slice

func FromUint64Slice(ii []uint64) []Index

func LargestIndex

func LargestIndex(dfs []DataFrame) Index

func (Index) Uint64

func (i Index) Uint64() uint64

type Interface

type Interface interface {
	// End normally emits an 'EOS' symbol so that the queue is ended
	// asynchronously; if force is set to true, the stream is ended directly.
	// Undelivered data will be truncated.
	End(ctx context.Context, force bool) error
	// Truncate truncates the data before the specified index.
	Truncate(ctx context.Context, index uint64) error
	// Put appends new data to the stream and returns an index and an error.
	// NOTE(review): presumably the returned index is that of the appended
	// data — confirm.
	Put(ctx context.Context, data []byte, tags Tags) (index uint64, err error)
	// Get returns data frames from the stream in the queue, starting at index.
	// Param length specifies the expected message count.
	// If timeout is set, this call blocks until length is satisfied or the
	// timeout timer fires.
	// NOTE(review): the role of tags is not described here — TODO document.
	Get(ctx context.Context, index uint64, length int, timeout time.Duration, tags Tags) (dfs []DataFrame, err error)
	// Watch subscribes to the queue service; when a new data frame is appended
	// through the Put method, the watcher emits it through its result channel.
	// Param index specifies the beginning message index of the watch.
	// Param window specifies the largest size the Watcher can transfer at one time.
	// NOTE(review): indexOnly and noAck are not described here — TODO document.
	Watch(ctx context.Context, index uint64, indexOnly bool, noAck bool, window uint64) (Watcher, error)
	// Commit commits indices so that the corresponding messages are marked as consumed.
	// NOTE(review): the effect of del is not described here — TODO document.
	Commit(ctx context.Context, del bool, indexes ...uint64) error
	// Del deletes indices so that the corresponding messages are deleted from the stream.
	Del(ctx context.Context, indexes ...uint64) error
	// Attributes reports the service's own dynamic attributes as key/value pairs.
	Attributes() Attributes
}

Interface is the QueueService interface, the core abstraction of the streaming framework.

type Offset

// Offset is a string-typed offset value. Compare takes two Offsets, and the
// type has IsInf and Uint64 methods.
type Offset string
// Predefined Offset values.
const (
	// OffsetEOS is the literal "eos".
	// NOTE(review): presumably the end-of-stream marker — confirm.
	OffsetEOS Offset = "eos"
)

func (Offset) IsInf

func (o Offset) IsInf() bool

func (Offset) Uint64

func (o Offset) Uint64() (uint64, bool)

type Range

// Range describes an interval over uint64 values. ParseRange produces one
// from a string.
type Range struct {
	// NOTE(review): presumably LeftInclude / RightInclude say whether Begin /
	// End are themselves part of the range, and PositiveInf marks a range
	// with no upper bound — confirm against ParseRange.
	LeftInclude  bool
	RightInclude bool
	PositiveInf  bool

	// Begin and End are the uint64 bounds of the range.
	Begin uint64
	End   uint64
}

func ParseRange

func ParseRange(input string) (Range, error)

func (Range) Empty

func (r Range) Empty() bool

func (Range) String

func (r Range) String() string

type StreamStatus

// StreamStatus is a string-typed status value for a stream.
type StreamStatus string
// Predefined StreamStatus values; each constant's value is the string shown.
const (
	StreamOk     StreamStatus = "OK"
	StreamCancel StreamStatus = "Cancel"
	StreamEnd    StreamStatus = "End"
)

type Tags

// Tags is a map of string keys to string values. It is carried by DataFrame
// and accepted by Interface.Put and Interface.Get.
type Tags map[string]string

func (Tags) Contains

func (t Tags) Contains(t1 Tags) bool

func (Tags) Diff

func (t Tags) Diff(t1 Tags) (add Tags, del Tags, update Tags)

func (Tags) Empty

func (t Tags) Empty() bool

func (Tags) Equals

func (t Tags) Equals(t1 Tags) bool

func (Tags) Get

func (t Tags) Get(key string) string

func (Tags) Has

func (t Tags) Has(key string) bool

func (Tags) Set

func (t Tags) Set(key string, value string)

func (Tags) ToJSON

func (t Tags) ToJSON() string

func (Tags) Validate

func (t Tags) Validate() error

type User

type User interface {
	// Uid returns the user ID as a string.
	Uid() string

	// Gid returns the ID of the user's group as a string.
	Gid() string

	// Token returns the access token for the queue service.
	Token() string
}

User holds the authenticated user's information.

func UserFromContext

func UserFromContext(ctx context.Context) (User, bool)

UserFromContext loads User from context.

type UserAware

// UserAware is implemented by types that can report the User associated with them.
type UserAware interface {
	// User returns the user info.
	User() User
}

type UserWithToken

// UserWithToken is implemented by types that expose an access token. Its
// single method has the same signature as User.Token.
type UserWithToken interface {
	// Token returns the token used to access the backend service.
	Token() string
}

type Watcher

type Watcher interface {
	// Watcher is a kind of DataFrameReader: embedding it contributes the
	// FrameChan method.
	DataFrameReader
	// Close stops the Watcher and closes the channel returned by FrameChan.
	Close()
}

Watcher is the entity following the stream.

type WorkerStatus

// WorkerStatus is a string-typed status value for a worker.
type WorkerStatus string
// Predefined WorkerStatus values; each constant's value is the string shown.
const (
	WorkerRunning WorkerStatus = "Running"
	WorkerStopped WorkerStatus = "Stopped"
	WorkerError   WorkerStatus = "Error"
	WorkerUnknown WorkerStatus = "Unknown"
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL