Documentation ¶
Index ¶
- Constants
- type ClientConfig
- type Credentials
- type CredentialsType
- type Operation
- type RetentionPolicy
- type RetentionType
- type RetryWithBackoff
- type ScalePolicy
- type ScaleType
- type SegmentSlice
- type StreamConfiguration
- type StreamManager
- func (manager *StreamManager) Close()
- func (manager *StreamManager) CreateReaderGroup(readerGroup string, scope string, stream string, readFromTail bool) (*StreamReaderGroup, error)
- func (manager *StreamManager) CreateScope(scope string) (bool, error)
- func (manager *StreamManager) CreateStream(streamConfig *StreamConfiguration) (bool, error)
- func (manager *StreamManager) CreateWriter(scope string, stream string) (*StreamWriter, error)
- type StreamReader
- type StreamReaderGroup
- type StreamWriter
Constants ¶
const ( RetentionNone RetentionType = 0 RetentionTime RetentionType = 1 RetentionSize RetentionType = 2 FixedNumSegments ScaleType = 0 ByRateInKbytesPerSec ScaleType = 1 ByRateInEventsPerSec ScaleType = 2 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ClientConfig ¶
type ClientConfig struct { MaxConnectionsInPool uint32 MaxControllerConnections uint32 RetryPolicy *RetryWithBackoff ControllerUri string TransactionTimeout uint64 TlsEnabled bool AuthEnabled bool ReaderWrapperBufferSize uint64 RequestTimeout uint64 Trustcerts string Credentials *Credentials DisableCertVerification bool }
type Credentials ¶
type Credentials struct { Type CredentialsType Username string Password string Token string Path string Json string DisableCertVerification bool }
func NewCredentials ¶
func NewCredentials() *Credentials
type CredentialsType ¶
type CredentialsType int
const ( CredentialsBasic CredentialsType = 0 CredentialsBasicWithToken CredentialsType = 1 CredentialsKeycloak CredentialsType = 2 CredentialsJson CredentialsType = 3 )
type RetentionPolicy ¶
type RetentionPolicy struct { Type RetentionType RetentionParam int64 }
type RetentionType ¶
type RetentionType int
type RetryWithBackoff ¶
type RetryWithBackoff struct { InitialDelay uint64 BackoffCoefficient uint32 MaxAttempt int32 MaxDelay uint64 ExpirationTime int64 }
func NewRetryWithBackoff ¶
func NewRetryWithBackoff() *RetryWithBackoff
New RetryWithBackoff with default config
type ScalePolicy ¶
type SegmentSlice ¶
SegmentSlice represents a slice of a segment that can be read by the user. To get a SegmentSlice, call reader.GetSegmentSlice() To read events from a SegmentSlice, call slice.Next()
func (*SegmentSlice) Close ¶
func (slice *SegmentSlice) Close()
Close the SegmentSlice. Codes in Rust side will release some resources.
func (*SegmentSlice) Next ¶
func (slice *SegmentSlice) Next() ([]byte, error)
Get the next event. If no more event, will return nil.
type StreamConfiguration ¶
type StreamConfiguration struct { Scope string Stream string Scale ScalePolicy Retention RetentionPolicy Tags string }
func NewStreamConfiguration ¶
func NewStreamConfiguration(scope, stream string) *StreamConfiguration
New StreamConfiguration with default config: FixedNumSegments and NoneRetention Policy
type StreamManager ¶
type StreamManager struct {
Manager *C.StreamManager
}
StreamManager is responsible for creating scope, stream, writers and readerGroup.
func NewStreamManager ¶
func NewStreamManager(config *ClientConfig) (*StreamManager, error)
Create a StreamManager with specific clientConfig. It will start a reactor goroutine and waiting for the callback response from rust side. E.g.
config := client.NewClientConfig() manager, err := client.NewStreamManager(config)
func (*StreamManager) Close ¶
func (manager *StreamManager) Close()
Close the StreamManager. Stop the reactor goroutine and release the resources
func (*StreamManager) CreateReaderGroup ¶
func (manager *StreamManager) CreateReaderGroup(readerGroup string, scope string, stream string, readFromTail bool) (*StreamReaderGroup, error)
Create ReaderGroup for given scope/stream. If you want read latest data in the stream, set readFromTail=true, or read data from the very beginning.
func (*StreamManager) CreateScope ¶
func (manager *StreamManager) CreateScope(scope string) (bool, error)
Create scope with a given name. If scope has already exists, return false, else return true. If doesn't success, return error.
func (*StreamManager) CreateStream ¶
func (manager *StreamManager) CreateStream(streamConfig *StreamConfiguration) (bool, error)
Create stream with a given name. If stream has already exists, return false, else return true. If doesn't success, return error.
func (*StreamManager) CreateWriter ¶
func (manager *StreamManager) CreateWriter(scope string, stream string) (*StreamWriter, error)
Create EventStreamWriter with 15MB buffer. TODO: Make buffer size configurable.
type StreamReader ¶
type StreamReader struct {
Reader *C.StreamReader
}
StreamReader is responsible for reading data from given stream.
func (*StreamReader) Close ¶
func (reader *StreamReader) Close()
Close the StreamReader. Codes in Rust side will release related resources.
func (*StreamReader) GetSegmentSlice ¶
func (reader *StreamReader) GetSegmentSlice(timeout time.Duration) (*SegmentSlice, error)
getSegmentSlice reads from a channel and returns a SegmentSlice or an error. It waits for an object from the channel and processes it accordingly. If the object contains a valid slice, it returns a SegmentSlice with the corresponding data. If the object contains an error, it returns an error with the error message extracted from the object.
type StreamReaderGroup ¶
type StreamReaderGroup struct {
ReaderGroup *C.StreamReaderGroup
}
func (*StreamReaderGroup) Close ¶
func (readerGroup *StreamReaderGroup) Close()
func (*StreamReaderGroup) CreateReader ¶
func (readerGroup *StreamReaderGroup) CreateReader(reader string) (*StreamReader, error)
Create StreamReader with a given name.
type StreamWriter ¶
type StreamWriter struct { // Rust binding reference Writer *C.StreamWriter // 15MB writer buffer. If there is no more buffer for new events, user goroutine will wait until there are enough buffer for writing. Sem *semaphore.Weighted Ctx context.Context }
StreamWriter is responsible for writing data to given stream.
func (*StreamWriter) Close ¶
func (writer *StreamWriter) Close()
Close the StreamWriter. Codes in Rust side will release related resources.
func (*StreamWriter) Flush ¶
func (writer *StreamWriter) Flush() error
Wait the all events been writen to pravega
func (*StreamWriter) WriteEvent ¶
func (writer *StreamWriter) WriteEvent(event []byte) error
write event with empty routingKey TODO: write event with random routingKey
func (*StreamWriter) WriteEventByRoutingKey ¶
func (writer *StreamWriter) WriteEventByRoutingKey(routingKey string, event []byte) error
Write event with given routing key. It's an async operation, if no enough buffer for new coming events, user golang will block on Sem.Acquire()