hstream

package
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 31, 2023 License: Apache-2.0 Imports: 26 Imported by: 3

Documentation

Index

Constants

View Source
const (
	MAX_BATCH_ACKIDS    = 100
	ACK_COLLECT_TIMEOUT = time.Second * 5
)
View Source
const (
	DefaultBatchProducerFlushTimeout = 100 * time.Millisecond
	DefaultMaxBatchRecordsSize       = 1024 * 1024 // 1MB
	DefaultBatchRecordsCount         = 100
)
View Source
const DEFAULT_SHARDMAP_DEGREE = 32

Variables

View Source
var (
	ForceDelete     = false
	IgnoreNoneExist = false
)

Functions

func CompressionTypeFromPb added in v0.3.0

func CompressionTypeFromPb(c hstreampb.CompressionType) (tp compression.CompressionType)

func CompressionTypeToPb added in v0.3.0

func CompressionTypeToPb(c compression.CompressionType) (tp hstreampb.CompressionType)

func EnableForceDelete added in v0.3.0

func EnableForceDelete()

func EnableIgnoreNoneExist added in v0.3.0

func EnableIgnoreNoneExist()

func HStreamRecordToPb added in v0.1.1

func HStreamRecordToPb(r Record.HStreamRecord) (*hstreampb.HStreamRecord, error)

func ReceivedRecordFromPb added in v0.1.1

func ReceivedRecordFromPb(record *hstreampb.HStreamRecord, rid *hstreampb.RecordId) (Record.ReceivedRecord, error)

func RecordHeaderFromPb added in v0.1.1

func RecordHeaderFromPb(pb *hstreampb.HStreamRecordHeader) (Record.RecordHeader, error)

func RecordIdFromPb added in v0.1.1

func RecordIdFromPb(pb *hstreampb.RecordId) Record.RecordId

func RecordIdToPb added in v0.1.1

func RecordIdToPb(r Record.RecordId) *hstreampb.RecordId

func RecordTypeFromPb added in v0.1.1

func RecordTypeToPb added in v0.1.1

func RecordTypeToPb(r Record.RecordType) (flag hstreampb.HStreamRecordHeader_Flag)

func ShardToPb added in v0.2.0

func ShardToPb(shard *Shard) *hstreampb.Shard

func StatsIntervalsToPb added in v0.1.2

func StatsIntervalsToPb(intervals []int32) *hstreampb.StatsIntervalVals

func SubscriptionOffsetToPb added in v0.2.0

func SubscriptionOffsetToPb(offset SubscriptionOffset) hstreampb.SpecialOffset

Types

type AppendResult

type AppendResult interface {
	// Ready returns once the append request has completed, yielding the
	// record ID on success or an error if the append fails.
	Ready() (Record.RecordId, error)
}

AppendResult is a handle for retrieving the result of an append operation.

type BatchProducer

type BatchProducer struct {
	// contains filtered or unexported fields
}

BatchProducer is a producer that can batch write multiple records to the specified stream.

func (*BatchProducer) Append

func (p *BatchProducer) Append(record Record.HStreamRecord) AppendResult

Append writes a record to the specified stream as part of a batch. This is an asynchronous method: background goroutines are responsible for collecting the batched records and sending the data to the server when the trigger conditions are met.

func (*BatchProducer) Stop

func (p *BatchProducer) Stop()

Stop will stop the BatchProducer.

type ConnectorStatsType added in v0.3.2

type ConnectorStatsType int
const (
	ConnectorDeliveredInRecords ConnectorStatsType = iota + 1
	ConnectorDeliveredInBytes
)

func ConnStatsTypeFromPb added in v0.3.2

func ConnStatsTypeFromPb(tp *hstreampb.StatType_ConnStat) (t ConnectorStatsType)

func (ConnectorStatsType) String added in v0.3.2

func (c ConnectorStatsType) String() string

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer consumes records from a specific subscription.

func NewConsumer

func NewConsumer(client *HStreamClient, subId string, consumerName string) *Consumer

func (*Consumer) StartFetch

func (c *Consumer) StartFetch() chan FetchRecords

StartFetch consumes data from the specified subscription. This method is an asynchronous method that allows the user to retrieve the return value from the result channel when consumption is complete.

func (*Consumer) Stop

func (c *Consumer) Stop()

Stop will stop the consumer.

type DeleteStreamOpts added in v0.3.0

type DeleteStreamOpts func()

DeleteStreamOpts is the option for DeleteStream.

type FetchRecords added in v0.1.2

type FetchRecords struct {
	Result []*FetchResult
	Err    error
}

FetchRecords encapsulates the results of StreamingFetch. StreamingFetch may consume one or more pieces of data at a time.

type FetchResult

type FetchResult struct {
	Record.ReceivedRecord
	// contains filtered or unexported fields
}

func (*FetchResult) Ack

func (f *FetchResult) Ack()

type HStreamClient

type HStreamClient struct {
	client.Client
}

HStreamClient is the client for the HStreamDB service.

func NewHStreamClient

func NewHStreamClient(address string, tlsOps ...TLSOps) (*HStreamClient, error)

func (*HStreamClient) AdminRequest added in v0.1.2

func (c *HStreamClient) AdminRequest(cmd string) (string, error)

AdminRequest sends an admin command request to a random server in the cluster.

func (*HStreamClient) AdminRequestToServer added in v0.1.4

func (c *HStreamClient) AdminRequestToServer(addr, cmd string) (string, error)

AdminRequestToServer sends an admin command request to the server at the given address.

func (*HStreamClient) CheckExist

func (c *HStreamClient) CheckExist(subId string) (bool, error)

CheckExist will send a CheckExistRPC to the server and wait for response.

func (*HStreamClient) CreateStream

func (c *HStreamClient) CreateStream(streamName string, opts ...StreamOpts) error

CreateStream will send a CreateStreamRPC to HStreamDB server and wait for response.

func (*HStreamClient) CreateSubscription

func (c *HStreamClient) CreateSubscription(subId string, streamName string, opts ...SubscriptionOpts) error

CreateSubscription will send a CreateSubscriptionRPC to HStreamDB server and wait for response.

func (*HStreamClient) DeleteShardReader added in v0.3.0

func (c *HStreamClient) DeleteShardReader(shardId uint64, readerId string) error

DeleteShardReader deletes the specified shard reader.

func (*HStreamClient) DeleteStream

func (c *HStreamClient) DeleteStream(streamName string, opts ...DeleteStreamOpts) error

DeleteStream will send a DeleteStreamRPC to HStreamDB server and wait for response.

func (*HStreamClient) DeleteSubscription

func (c *HStreamClient) DeleteSubscription(subId string, force bool) error

DeleteSubscription will send a DeleteSubscriptionRPC to the server and wait for response.

func (*HStreamClient) GetStatsRequest added in v0.3.2

func (c *HStreamClient) GetStatsRequest(addr string, statsTypes []StatType) ([]StatResult, error)

func (*HStreamClient) ListShards added in v0.2.0

func (c *HStreamClient) ListShards(streamName string) ([]Shard, error)

ListShards will send a ListShardsRPC to HStreamDB server and wait for response.

func (*HStreamClient) ListStreams

func (c *HStreamClient) ListStreams() ([]Stream, error)

ListStreams will send a ListStreamsRPC to HStreamDB server and wait for response.

func (*HStreamClient) ListSubscriptions

func (c *HStreamClient) ListSubscriptions() ([]Subscription, error)

ListSubscriptions will send a ListSubscriptionsRPC to the server and wait for response.

func (*HStreamClient) LookupShard added in v0.2.0

func (c *HStreamClient) LookupShard(shardId uint64) (string, error)

LookupShard will send a LookupShardRPC to HStreamDB server and wait for response.

func (*HStreamClient) NewBatchProducer

func (c *HStreamClient) NewBatchProducer(streamName string, opts ...ProducerOpt) (*BatchProducer, error)

NewBatchProducer creates a BatchProducer for the specified stream.

func (*HStreamClient) NewConsumer

func (c *HStreamClient) NewConsumer(consumerName, subId string) *Consumer

NewConsumer will create a new Consumer for the specific subscription.

func (*HStreamClient) NewProducer

func (c *HStreamClient) NewProducer(streamName string) (*Producer, error)

NewProducer creates a Producer for the specified stream.

func (*HStreamClient) NewShardReader added in v0.2.0

func (c *HStreamClient) NewShardReader(streamName string, readerId string, shardId uint64, opts ...ShardReaderOpts) (*ShardReader, error)

NewShardReader creates a ShardReader to read data from the specified shard.

func (*HStreamClient) StreamStatsAllRequest added in v0.1.2

func (c *HStreamClient) StreamStatsAllRequest(method string, intervals []int32) (map[string]*Stats, error)

func (*HStreamClient) StreamStatsRequest added in v0.1.2

func (c *HStreamClient) StreamStatsRequest(method, streamName string, intervals []int32) (*Stats, error)

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer writes a single record at a time to the specified stream.

func (*Producer) Append

func (p *Producer) Append(record Record.HStreamRecord) AppendResult

Append will write a single record to the specified stream. This is a synchronous method.

func (*Producer) Stop

func (p *Producer) Stop()

type ProducerOpt

type ProducerOpt func(producer *BatchProducer)

ProducerOpt is the option for the BatchProducer.

func TimeOut

func TimeOut(timeOut int) ProducerOpt

TimeOut sets the time trigger, in milliseconds, after which the BatchProducer flushes data to the server. If timeOut <= 0, flushing is never triggered by timeout.

func WithBatch added in v0.1.2

func WithBatch(batchSize int, maxBytes uint64) ProducerOpt

WithBatch sets the batch-size trigger for the BatchProducer; batchSize must be greater than 0.

func WithCompression added in v0.3.0

func WithCompression(compressionType compression.CompressionType) ProducerOpt

WithCompression sets the compression algorithm.

func WithFlowControl added in v0.1.2

func WithFlowControl(maxBytes uint64) ProducerOpt

WithFlowControl sets the flow control for the BatchProducer. The maxBytes parameter indicates the maximum number of bytes of data that have not yet been appended successfully, including both data already sent to the server and data not yet sent. Setting maxBytes == 0 disables flow control.

type QueryStatsType added in v0.3.2

type QueryStatsType int
const (
	QueryTotalInputRecords QueryStatsType = iota + 1
	QueryTotalOutputRecords
	QueryTotalExcuteErrors
)

func QueryStatsTypeFromPb added in v0.3.2

func QueryStatsTypeFromPb(tp *hstreampb.StatType_QueryStat) (t QueryStatsType)

func (QueryStatsType) String added in v0.3.2

func (q QueryStatsType) String() string

type Shard added in v0.2.0

type Shard struct {
	ShardId      uint64
	StreamName   string
	StartHashKey string
	EndHashKey   string
}

func ShardFromPb added in v0.2.0

func ShardFromPb(pbShard *hstreampb.Shard) Shard

func (*Shard) Less added in v0.2.0

func (s *Shard) Less(other btree.Item) bool

FIXME: maybe need to use object pool here

type ShardMap added in v0.2.0

type ShardMap struct {
	// contains filtered or unexported fields
}

func NewShardMap added in v0.2.0

func NewShardMap(degree int) *ShardMap

func (*ShardMap) Ascend added in v0.2.0

func (m *ShardMap) Ascend() []Shard

func (*ShardMap) Clear added in v0.2.0

func (m *ShardMap) Clear()

func (*ShardMap) Delete added in v0.2.0

func (m *ShardMap) Delete(shard *Shard)

func (*ShardMap) FindLessOrEqual added in v0.2.0

func (m *ShardMap) FindLessOrEqual(key string) *Shard

func (*ShardMap) ReplaceOrInsert added in v0.2.0

func (m *ShardMap) ReplaceOrInsert(shard *Shard)

type ShardOffset added in v0.2.0

type ShardOffset interface {
	// contains filtered or unexported methods
}

ShardOffset is used to specify a specific offset for the shardReader.

var (
	// EarliestShardOffset specifies that the data is read from the start of the shard
	EarliestShardOffset ShardOffset = earliestShardOffset{}
	// LatestShardOffset specifies that the data is read from the current tail of the shard
	LatestShardOffset ShardOffset = latestShardOffset{}
)

func NewRecordOffset added in v0.2.0

func NewRecordOffset(recordId Record.RecordId) ShardOffset

NewRecordOffset creates a ShardOffset that refers to the given record ID within a shard.

type ShardReader added in v0.2.0

type ShardReader struct {
	// contains filtered or unexported fields
}

ShardReader is used to read data from the specified shard

func (*ShardReader) Close added in v0.3.0

func (s *ShardReader) Close()

func (*ShardReader) Read added in v0.2.0

Read reads records from the shard.

type ShardReaderOpts added in v0.2.0

type ShardReaderOpts func(reader *ShardReader)

ShardReaderOpts is the option for the ShardReader.

func WithMaxRecords added in v0.3.0

func WithMaxRecords(cnt uint32) ShardReaderOpts

WithMaxRecords is used to specify the maximum number of records that can be read in a single read.

func WithReaderTimeout added in v0.2.0

func WithReaderTimeout(timeout uint32) ShardReaderOpts

WithReaderTimeout is used to specify read timeout.

func WithShardOffset added in v0.2.0

func WithShardOffset(offset ShardOffset) ShardReaderOpts

WithShardOffset is used to specify the read offset of the reader.

type StatError added in v0.3.2

type StatError struct {
	Type    StatType
	Message string
}

func StatErrorFromPb added in v0.3.2

func StatErrorFromPb(pb *hstreampb.StatError) StatError

type StatResult added in v0.3.2

type StatResult interface {
	// contains filtered or unexported methods
}

type StatType added in v0.3.2

type StatType interface {
	String() string
	// contains filtered or unexported methods
}

func StatsTypeFromPb added in v0.3.2

func StatsTypeFromPb(tp *hstreampb.StatType) StatType

type StatValue added in v0.3.2

type StatValue struct {
	Type  StatType
	Value map[string]int64
}

func StatValueFromPb added in v0.3.2

func StatValueFromPb(pb *hstreampb.StatValue) StatValue

type Stats added in v0.1.2

type Stats struct {
	Values []float64
}

func StatsFromPb added in v0.1.2

func StatsFromPb(stats *hstreampb.StatsDoubleVals) (*Stats, error)

type Stream

type Stream struct {
	StreamName        string
	ReplicationFactor uint32
	// BacklogDuration == 0 means backlog is disabled
	BacklogDuration uint32
	ShardCount      uint32
	CreationTime    time.Time
}

func StreamFromPb added in v0.1.1

func StreamFromPb(pb *hstreampb.Stream) Stream

func (*Stream) StreamToPb added in v0.1.1

func (s *Stream) StreamToPb() *hstreampb.Stream

type StreamOpts added in v0.1.1

type StreamOpts func(stream *Stream)

StreamOpts is the option for the Stream.

func EnableBacklog added in v0.1.1

func EnableBacklog(backlogDuration uint32) StreamOpts

EnableBacklog sets the backlog duration in seconds for the stream.

func WithReplicationFactor added in v0.1.1

func WithReplicationFactor(replicationFactor uint32) StreamOpts

WithReplicationFactor sets the replication factor of the stream.

func WithShardCount added in v0.2.0

func WithShardCount(shardCnt uint32) StreamOpts

WithShardCount sets the number of shards in the stream.

type StreamStatsType added in v0.3.1

type StreamStatsType int
const (
	StreamAppendInBytes StreamStatsType = iota + 1
	StreamAppendInRecords
	StreamAppendTotal
	StreamAppendFailed
)

func StreamStatsTypeFromPb added in v0.3.2

func StreamStatsTypeFromPb(tp *hstreampb.StatType_StreamStat) (t StreamStatsType)

func (StreamStatsType) String added in v0.3.1

func (s StreamStatsType) String() string

type Subscription

type Subscription struct {
	SubscriptionId    string
	StreamName        string
	AckTimeoutSeconds int32
	MaxUnackedRecords int32
	Offset            SubscriptionOffset
	CreationTime      time.Time
}

func SubscriptionFromPb added in v0.1.1

func SubscriptionFromPb(pb *hstreampb.Subscription) Subscription

func (*Subscription) SubscriptionToPb added in v0.1.1

func (s *Subscription) SubscriptionToPb() *hstreampb.Subscription

type SubscriptionOffset added in v0.2.0

type SubscriptionOffset uint8
const (
	EARLIEST SubscriptionOffset = iota + 1
	LATEST
)

func SubscriptionOffsetFromPb added in v0.2.0

func SubscriptionOffsetFromPb(offset hstreampb.SpecialOffset) SubscriptionOffset

type SubscriptionOpts added in v0.2.0

type SubscriptionOpts func(sub *Subscription)

SubscriptionOpts is the option for the Subscription.

func WithAckTimeout added in v0.2.0

func WithAckTimeout(timeout int32) SubscriptionOpts

WithAckTimeout sets the ack timeout in seconds.

func WithMaxUnackedRecords added in v0.2.0

func WithMaxUnackedRecords(cnt int32) SubscriptionOpts

WithMaxUnackedRecords sets the max unacked records. If the number of records that have not been acked reaches the set value, the server will stop pushing more records to the client.

func WithOffset added in v0.2.0

func WithOffset(offset SubscriptionOffset) SubscriptionOpts

WithOffset sets the subscription offset.

type SubscriptionStatsType added in v0.3.1

type SubscriptionStatsType int
const (
	SubSendOutBytes SubscriptionStatsType = iota + 1
	SubSendOutRecords
	SubSendOutRecordsFailed
	SubResendRecords
	SubResendRecordsFailed
	ReceivedAcks
	SubRequestMessages
	SubResponseMessages
)

func SubStatsTypeFromPb added in v0.3.2

func SubStatsTypeFromPb(tp *hstreampb.StatType_SubStat) (t SubscriptionStatsType)

func (SubscriptionStatsType) String added in v0.3.1

func (s SubscriptionStatsType) String() string

type TLSOps added in v0.3.0

type TLSOps func(cfg *security.TLSAuth)

TLSOps sets TLS configuration options.

func WithCaCert added in v0.3.0

func WithCaCert(ca string) TLSOps

WithCaCert sets the CA certificate path.

func WithClientCert added in v0.3.0

func WithClientCert(cert string) TLSOps

WithClientCert sets the client certificate path.

func WithClientKey added in v0.3.0

func WithClientKey(key string) TLSOps

WithClientKey sets the client key path.

type ViewStatsType added in v0.3.2

type ViewStatsType int
const (
	ViewTotalExecuteQueries ViewStatsType = iota + 1
)

func ViewStatsTypeFromPb added in v0.3.2

func ViewStatsTypeFromPb(tp *hstreampb.StatType_ViewStat) (t ViewStatsType)

func (ViewStatsType) String added in v0.3.2

func (v ViewStatsType) String() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL