Documentation ¶
Index ¶
- Constants
- Variables
- func CompressionTypeFromPb(c hstreampb.CompressionType) (tp compression.CompressionType)
- func CompressionTypeToPb(c compression.CompressionType) (tp hstreampb.CompressionType)
- func EnableForceDelete()
- func EnableIgnoreNoneExist()
- func FromPbHRecord(rid *hstreampb.RecordId, pb *hstreampb.HStreamRecord) (*Record.ReceivedHRecord, error)
- func FromPbRawRecord(rid *hstreampb.RecordId, pb *hstreampb.HStreamRecord) (*Record.ReceivedRawRecord, error)
- func HStreamRecordToPb(r Record.HStreamRecord) (*hstreampb.HStreamRecord, error)
- func ReceivedRecordFromPb(record *hstreampb.HStreamRecord, rid *hstreampb.RecordId) (Record.ReceivedRecord, error)
- func RecordHeaderFromPb(pb *hstreampb.HStreamRecordHeader) (Record.RecordHeader, error)
- func RecordIdFromPb(pb *hstreampb.RecordId) Record.RecordId
- func RecordIdToPb(r Record.RecordId) *hstreampb.RecordId
- func RecordTypeFromPb(pb hstreampb.HStreamRecordHeader_Flag) (Record.RecordType, error)
- func RecordTypeToPb(r Record.RecordType) (flag hstreampb.HStreamRecordHeader_Flag)
- func ShardToPb(shard *Shard) *hstreampb.Shard
- func StatsIntervalsToPb(intervals []int32) *hstreampb.StatsIntervalVals
- func SubscriptionOffsetToPb(offset SubscriptionOffset) hstreampb.SpecialOffset
- type AppendResult
- type BatchProducer
- type ConnectorStatsType
- type Consumer
- type DeleteStreamOpts
- type FetchRecords
- type FetchResult
- type HStreamClient
- func (c *HStreamClient) AdminRequest(cmd string) (string, error)
- func (c *HStreamClient) AdminRequestToServer(addr, cmd string) (string, error)
- func (c *HStreamClient) CheckExist(subId string) (bool, error)
- func (c *HStreamClient) CreateStream(streamName string, opts ...StreamOpts) error
- func (c *HStreamClient) CreateSubscription(subId string, streamName string, opts ...SubscriptionOpts) error
- func (c *HStreamClient) DeleteShardReader(shardId uint64, readerId string) error
- func (c *HStreamClient) DeleteStream(streamName string, opts ...DeleteStreamOpts) error
- func (c *HStreamClient) DeleteSubscription(subId string, force bool) error
- func (c *HStreamClient) GetStatsRequest(addr string, statsTypes []StatType) ([]StatResult, error)
- func (c *HStreamClient) ListShards(streamName string) ([]Shard, error)
- func (c *HStreamClient) ListStreams() ([]Stream, error)
- func (c *HStreamClient) ListSubscriptions() ([]Subscription, error)
- func (c *HStreamClient) LookupShard(shardId uint64) (string, error)
- func (c *HStreamClient) NewBatchProducer(streamName string, opts ...ProducerOpt) (*BatchProducer, error)
- func (c *HStreamClient) NewConsumer(consumerName, subId string) *Consumer
- func (c *HStreamClient) NewProducer(streamName string) (*Producer, error)
- func (c *HStreamClient) NewShardReader(streamName string, readerId string, shardId uint64, opts ...ShardReaderOpts) (*ShardReader, error)
- func (c *HStreamClient) StreamStatsAllRequest(method string, intervals []int32) (map[string]*Stats, error)
- func (c *HStreamClient) StreamStatsRequest(method, streamName string, intervals []int32) (*Stats, error)
- type Producer
- type ProducerOpt
- type QueryStatsType
- type Shard
- type ShardMap
- type ShardOffset
- type ShardReader
- type ShardReaderOpts
- type StatError
- type StatResult
- type StatType
- type StatValue
- type Stats
- type Stream
- type StreamOpts
- type StreamStatsType
- type Subscription
- type SubscriptionOffset
- type SubscriptionOpts
- type SubscriptionStatsType
- type TLSOps
- type ViewStatsType
Constants ¶
const ( MAX_BATCH_ACKIDS = 100 ACK_COLLECT_TIMEOUT = time.Second * 5 )
const ( DefaultBatchProducerFlushTimeout = 100 * time.Millisecond DefaultMaxBatchRecordsSize = 1024 * 1024 // 1MB DefaultBatchRecordsCount = 100 )
const DEFAULT_SHARDMAP_DEGREE = 32
Variables ¶
var ( ForceDelete = false IgnoreNoneExist = false )
Functions ¶
func CompressionTypeFromPb ¶ added in v0.3.0
func CompressionTypeFromPb(c hstreampb.CompressionType) (tp compression.CompressionType)
func CompressionTypeToPb ¶ added in v0.3.0
func CompressionTypeToPb(c compression.CompressionType) (tp hstreampb.CompressionType)
func EnableForceDelete ¶ added in v0.3.0
func EnableForceDelete()
func EnableIgnoreNoneExist ¶ added in v0.3.0
func EnableIgnoreNoneExist()
func FromPbHRecord ¶
func FromPbHRecord(rid *hstreampb.RecordId, pb *hstreampb.HStreamRecord) (*Record.ReceivedHRecord, error)
func FromPbRawRecord ¶
func FromPbRawRecord(rid *hstreampb.RecordId, pb *hstreampb.HStreamRecord) (*Record.ReceivedRawRecord, error)
func HStreamRecordToPb ¶ added in v0.1.1
func HStreamRecordToPb(r Record.HStreamRecord) (*hstreampb.HStreamRecord, error)
func ReceivedRecordFromPb ¶ added in v0.1.1
func ReceivedRecordFromPb(record *hstreampb.HStreamRecord, rid *hstreampb.RecordId) (Record.ReceivedRecord, error)
func RecordHeaderFromPb ¶ added in v0.1.1
func RecordHeaderFromPb(pb *hstreampb.HStreamRecordHeader) (Record.RecordHeader, error)
func RecordTypeFromPb ¶ added in v0.1.1
func RecordTypeFromPb(pb hstreampb.HStreamRecordHeader_Flag) (Record.RecordType, error)
func RecordTypeToPb ¶ added in v0.1.1
func RecordTypeToPb(r Record.RecordType) (flag hstreampb.HStreamRecordHeader_Flag)
func StatsIntervalsToPb ¶ added in v0.1.2
func StatsIntervalsToPb(intervals []int32) *hstreampb.StatsIntervalVals
func SubscriptionOffsetToPb ¶ added in v0.2.0
func SubscriptionOffsetToPb(offset SubscriptionOffset) hstreampb.SpecialOffset
Types ¶
type AppendResult ¶
type AppendResult interface { // Ready will return when the append request return, // or an error if append fails. Ready() (Record.RecordId, error) }
AppendResult is a handler to process the results of append operation.
type BatchProducer ¶
type BatchProducer struct {
// contains filtered or unexported fields
}
BatchProducer is a producer that can batch write multiple records to the specified stream.
func (*BatchProducer) Append ¶
func (p *BatchProducer) Append(record Record.HStreamRecord) AppendResult
Append will write batch records to the specified stream. This is an asynchronous method. The backend goroutines are responsible for collecting the batch records and sending the data to the server when the trigger conditions are met.
type ConnectorStatsType ¶ added in v0.3.2
type ConnectorStatsType int
const ( ConnectorDeliveredInRecords ConnectorStatsType = iota + 1 ConnectorDeliveredInBytes )
func ConnStatsTypeFromPb ¶ added in v0.3.2
func ConnStatsTypeFromPb(tp *hstreampb.StatType_ConnStat) (t ConnectorStatsType)
func (ConnectorStatsType) String ¶ added in v0.3.2
func (c ConnectorStatsType) String() string
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer will consume records from specific subscription.
func NewConsumer ¶
func NewConsumer(client *HStreamClient, subId string, consumerName string) *Consumer
func (*Consumer) StartFetch ¶
func (c *Consumer) StartFetch() chan FetchRecords
StartFetch consumes data from the specified subscription. This method is an asynchronous method that allows the user to retrieve the return value from the result channel when consumption is complete.
type DeleteStreamOpts ¶ added in v0.3.0
type DeleteStreamOpts func()
DeleteStreamOpts is the option for DeleteStream.
type FetchRecords ¶ added in v0.1.2
type FetchRecords struct { Result []*FetchResult Err error }
FetchRecords encapsulates the results of StreamingFetch. StreamingFetch may consume one or more pieces of data at a time.
type FetchResult ¶
type FetchResult struct { Record.ReceivedRecord // contains filtered or unexported fields }
func (*FetchResult) Ack ¶
func (f *FetchResult) Ack()
type HStreamClient ¶
HStreamClient is the client for the HStreamDB service.
func NewHStreamClient ¶
func NewHStreamClient(address string, tlsOps ...TLSOps) (*HStreamClient, error)
func (*HStreamClient) AdminRequest ¶ added in v0.1.2
func (c *HStreamClient) AdminRequest(cmd string) (string, error)
AdminRequest send admin command request to a random server in the cluster
func (*HStreamClient) AdminRequestToServer ¶ added in v0.1.4
func (c *HStreamClient) AdminRequestToServer(addr, cmd string) (string, error)
AdminRequestToServer send admin command request to a specified server at given address
func (*HStreamClient) CheckExist ¶
func (c *HStreamClient) CheckExist(subId string) (bool, error)
CheckExist will send a CheckExistRPC to the server and wait for response.
func (*HStreamClient) CreateStream ¶
func (c *HStreamClient) CreateStream(streamName string, opts ...StreamOpts) error
CreateStream will send a CreateStreamRPC to HStreamDB server and wait for response.
func (*HStreamClient) CreateSubscription ¶
func (c *HStreamClient) CreateSubscription(subId string, streamName string, opts ...SubscriptionOpts) error
CreateSubscription will send a CreateSubscriptionRPC to HStreamDB server and wait for response.
func (*HStreamClient) DeleteShardReader ¶ added in v0.3.0
func (c *HStreamClient) DeleteShardReader(shardId uint64, readerId string) error
DeleteShardReader delete specific shardReader
func (*HStreamClient) DeleteStream ¶
func (c *HStreamClient) DeleteStream(streamName string, opts ...DeleteStreamOpts) error
DeleteStream will send a DeleteStreamRPC to HStreamDB server and wait for response.
func (*HStreamClient) DeleteSubscription ¶
func (c *HStreamClient) DeleteSubscription(subId string, force bool) error
DeleteSubscription will send a DeleteSubscriptionRPC to the server and wait for response.
func (*HStreamClient) GetStatsRequest ¶ added in v0.3.2
func (c *HStreamClient) GetStatsRequest(addr string, statsTypes []StatType) ([]StatResult, error)
func (*HStreamClient) ListShards ¶ added in v0.2.0
func (c *HStreamClient) ListShards(streamName string) ([]Shard, error)
ListShards will send a ListShardsRPC to HStreamDB server and wait for response.
func (*HStreamClient) ListStreams ¶
func (c *HStreamClient) ListStreams() ([]Stream, error)
ListStreams will send a ListStreamsRPC to HStreamDB server and wait for response.
func (*HStreamClient) ListSubscriptions ¶
func (c *HStreamClient) ListSubscriptions() ([]Subscription, error)
ListSubscriptions will send a ListSubscriptionsRPC to the server and wait for response.
func (*HStreamClient) LookupShard ¶ added in v0.2.0
func (c *HStreamClient) LookupShard(shardId uint64) (string, error)
LookupShard will send a LookupShardRPC to HStreamDB server and wait for response.
func (*HStreamClient) NewBatchProducer ¶
func (c *HStreamClient) NewBatchProducer(streamName string, opts ...ProducerOpt) (*BatchProducer, error)
NewBatchProducer will create a BatchProducer for specific stream
func (*HStreamClient) NewConsumer ¶
func (c *HStreamClient) NewConsumer(consumerName, subId string) *Consumer
NewConsumer will create a new Consumer for the specific subscription.
func (*HStreamClient) NewProducer ¶
func (c *HStreamClient) NewProducer(streamName string) (*Producer, error)
NewProducer will create a Producer for specific stream
func (*HStreamClient) NewShardReader ¶ added in v0.2.0
func (c *HStreamClient) NewShardReader(streamName string, readerId string, shardId uint64, opts ...ShardReaderOpts) (*ShardReader, error)
NewShardReader create a shardReader to read data from specific shard
func (*HStreamClient) StreamStatsAllRequest ¶ added in v0.1.2
func (*HStreamClient) StreamStatsRequest ¶ added in v0.1.2
func (c *HStreamClient) StreamStatsRequest(method, streamName string, intervals []int32) (*Stats, error)
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer produce a single piece of data to the specified stream.
func (*Producer) Append ¶
func (p *Producer) Append(record Record.HStreamRecord) AppendResult
Append will write a single record to the specified stream. This is a synchronous method.
type ProducerOpt ¶
type ProducerOpt func(producer *BatchProducer)
ProducerOpt is the option for the BatchProducer.
func TimeOut ¶
func TimeOut(timeOut int) ProducerOpt
TimeOut set millisecond time-trigger for BatchProducer to flush data to server. If timeOut <= 0, which means never trigger by time out.
func WithBatch ¶ added in v0.1.2
func WithBatch(batchSize int, maxBytes uint64) ProducerOpt
WithBatch set the batchSize-trigger for BatchProducer, batchSize must greater than 0
func WithCompression ¶ added in v0.3.0
func WithCompression(compressionType compression.CompressionType) ProducerOpt
WithCompression set the compression algorithm
func WithFlowControl ¶ added in v0.1.2
func WithFlowControl(maxBytes uint64) ProducerOpt
WithFlowControl set the flow control for BatchProducer. The maxBytes parameter indicates the maximum number of bytes of data that have not been appended successfully, including all data that has been sent to the server and all data that has not been sent to the server. maxBytes == 0 will disable the flow control.
type QueryStatsType ¶ added in v0.3.2
type QueryStatsType int
const ( QueryTotalInputRecords QueryStatsType = iota + 1 QueryTotalOutputRecords QueryTotalExcuteErrors )
func QueryStatsTypeFromPb ¶ added in v0.3.2
func QueryStatsTypeFromPb(tp *hstreampb.StatType_QueryStat) (t QueryStatsType)
func (QueryStatsType) String ¶ added in v0.3.2
func (q QueryStatsType) String() string
type Shard ¶ added in v0.2.0
func ShardFromPb ¶ added in v0.2.0
type ShardMap ¶ added in v0.2.0
type ShardMap struct {
// contains filtered or unexported fields
}
func NewShardMap ¶ added in v0.2.0
func (*ShardMap) FindLessOrEqual ¶ added in v0.2.0
func (*ShardMap) ReplaceOrInsert ¶ added in v0.2.0
type ShardOffset ¶ added in v0.2.0
type ShardOffset interface {
// contains filtered or unexported methods
}
ShardOffset is used to specify a specific offset for the shardReader.
var ( // EarliestShardOffset specifies that the data is read from the start of the shard EarliestShardOffset ShardOffset = earliestShardOffset{} // LatestShardOffset specifies that the data is read from the current tail of the shard LatestShardOffset ShardOffset = latestShardOffset{} )
func NewRecordOffset ¶ added in v0.2.0
func NewRecordOffset(recordId Record.RecordId) ShardOffset
NewRecordOffset create a RecordOffset of a shard
type ShardReader ¶ added in v0.2.0
type ShardReader struct {
// contains filtered or unexported fields
}
ShardReader is used to read data from the specified shard
func (*ShardReader) Close ¶ added in v0.3.0
func (s *ShardReader) Close()
func (*ShardReader) Read ¶ added in v0.2.0
func (s *ShardReader) Read(ctx context.Context) ([]Record.ReceivedRecord, error)
Read read records from shard
type ShardReaderOpts ¶ added in v0.2.0
type ShardReaderOpts func(reader *ShardReader)
ShardReaderOpts is the option for the ShardReader.
func WithMaxRecords ¶ added in v0.3.0
func WithMaxRecords(cnt uint32) ShardReaderOpts
WithMaxRecords is used to specify the maximum number of records that can be read in a single read.
func WithReaderTimeout ¶ added in v0.2.0
func WithReaderTimeout(timeout uint32) ShardReaderOpts
WithReaderTimeout is used to specify read timeout.
func WithShardOffset ¶ added in v0.2.0
func WithShardOffset(offset ShardOffset) ShardReaderOpts
WithShardOffset is used to specify the read offset of the reader.
type StatError ¶ added in v0.3.2
func StatErrorFromPb ¶ added in v0.3.2
type StatResult ¶ added in v0.3.2
type StatResult interface {
// contains filtered or unexported methods
}
type StatType ¶ added in v0.3.2
type StatType interface { String() string // contains filtered or unexported methods }
func StatsTypeFromPb ¶ added in v0.3.2
type StatValue ¶ added in v0.3.2
func StatValueFromPb ¶ added in v0.3.2
type Stats ¶ added in v0.1.2
type Stats struct {
Values []float64
}
func StatsFromPb ¶ added in v0.1.2
func StatsFromPb(stats *hstreampb.StatsDoubleVals) (*Stats, error)
type Stream ¶
type Stream struct { StreamName string ReplicationFactor uint32 // backlog duration == 0 means forbidden backlog BacklogDuration uint32 ShardCount uint32 CreationTime time.Time }
func StreamFromPb ¶ added in v0.1.1
func (*Stream) StreamToPb ¶ added in v0.1.1
type StreamOpts ¶ added in v0.1.1
type StreamOpts func(stream *Stream)
StreamOpts is the option for the Stream.
func EnableBacklog ¶ added in v0.1.1
func EnableBacklog(backlogDuration uint32) StreamOpts
EnableBacklog sets the backlog duration in seconds for the stream.
func WithReplicationFactor ¶ added in v0.1.1
func WithReplicationFactor(replicationFactor uint32) StreamOpts
WithReplicationFactor sets the replication factor of the stream.
func WithShardCount ¶ added in v0.2.0
func WithShardCount(shardCnt uint32) StreamOpts
WithShardCount sets the number of shards in the stream.
type StreamStatsType ¶ added in v0.3.1
type StreamStatsType int
const ( StreamAppendInBytes StreamStatsType = iota + 1 StreamAppendInRecords StreamAppendTotal StreamAppendFailed )
func StreamStatsTypeFromPb ¶ added in v0.3.2
func StreamStatsTypeFromPb(tp *hstreampb.StatType_StreamStat) (t StreamStatsType)
func (StreamStatsType) String ¶ added in v0.3.1
func (s StreamStatsType) String() string
type Subscription ¶
type Subscription struct { SubscriptionId string StreamName string AckTimeoutSeconds int32 MaxUnackedRecords int32 Offset SubscriptionOffset CreationTime time.Time }
func SubscriptionFromPb ¶ added in v0.1.1
func SubscriptionFromPb(pb *hstreampb.Subscription) Subscription
func (*Subscription) SubscriptionToPb ¶ added in v0.1.1
func (s *Subscription) SubscriptionToPb() *hstreampb.Subscription
type SubscriptionOffset ¶ added in v0.2.0
type SubscriptionOffset uint8
const ( EARLIEST SubscriptionOffset = iota + 1 LATEST )
func SubscriptionOffsetFromPb ¶ added in v0.2.0
func SubscriptionOffsetFromPb(offset hstreampb.SpecialOffset) SubscriptionOffset
type SubscriptionOpts ¶ added in v0.2.0
type SubscriptionOpts func(sub *Subscription)
SubscriptionOpts is the option for the Subscription.
func WithAckTimeout ¶ added in v0.2.0
func WithAckTimeout(timeout int32) SubscriptionOpts
WithAckTimeout sets the ack timeout in seconds.
func WithMaxUnackedRecords ¶ added in v0.2.0
func WithMaxUnackedRecords(cnt int32) SubscriptionOpts
WithMaxUnackedRecords sets the max unacked records. If the number of records that have not been acked reaches the set value, the server will stop pushing more records to the client.
func WithOffset ¶ added in v0.2.0
func WithOffset(offset SubscriptionOffset) SubscriptionOpts
WithOffset sets the subscription offset.
type SubscriptionStatsType ¶ added in v0.3.1
type SubscriptionStatsType int
const ( SubSendOutBytes SubscriptionStatsType = iota + 1 SubSendOutRecords SubSendOutRecordsFailed SubResendRecords SubResendRecordsFailed ReceivedAcks SubRequestMessages SubResponseMessages )
func SubStatsTypeFromPb ¶ added in v0.3.2
func SubStatsTypeFromPb(tp *hstreampb.StatType_SubStat) (t SubscriptionStatsType)
func (SubscriptionStatsType) String ¶ added in v0.3.1
func (s SubscriptionStatsType) String() string
type TLSOps ¶ added in v0.3.0
TLSOps set tls configurations
func WithClientCert ¶ added in v0.3.0
WithClientCert set client cert path
func WithClientKey ¶ added in v0.3.0
WithClientKey set client key path
type ViewStatsType ¶ added in v0.3.2
type ViewStatsType int
const (
ViewTotalExecuteQueries ViewStatsType = iota + 1
)
func ViewStatsTypeFromPb ¶ added in v0.3.2
func ViewStatsTypeFromPb(tp *hstreampb.StatType_ViewStat) (t ViewStatsType)
func (ViewStatsType) String ¶ added in v0.3.2
func (v ViewStatsType) String() string