pkg

package
v0.0.0-...-4778dc5 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 23, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

const (
	RetentionNone        RetentionType = 0
	RetentionTime        RetentionType = 1
	RetentionSize        RetentionType = 2
	FixedNumSegments     ScaleType     = 0
	ByRateInKbytesPerSec ScaleType     = 1
	ByRateInEventsPerSec ScaleType     = 2
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ClientConfig

type ClientConfig struct {
	MaxConnectionsInPool     uint32
	MaxControllerConnections uint32
	RetryPolicy              *RetryWithBackoff
	ControllerUri            string
	TransactionTimeout       uint64
	TlsEnabled               bool
	AuthEnabled              bool
	ReaderWrapperBufferSize  uint64
	RequestTimeout           uint64
	Trustcerts               string
	Credentials              *Credentials
	DisableCertVerification  bool
}

func NewClientConfig

func NewClientConfig() *ClientConfig

NewClientConfig returns a ClientConfig populated with default values.

type Credentials

type Credentials struct {
	Type                    CredentialsType
	Username                string
	Password                string
	Token                   string
	Path                    string
	Json                    string
	DisableCertVerification bool
}

func NewCredentials

func NewCredentials() *Credentials

type CredentialsType

type CredentialsType int
const (
	CredentialsBasic          CredentialsType = 0
	CredentialsBasicWithToken CredentialsType = 1
	CredentialsKeycloak       CredentialsType = 2
	CredentialsJson           CredentialsType = 3
)

type Operation

type Operation struct {
	Id     int64
	ObjPtr unsafe.Pointer
	ErrPtr unsafe.Pointer
	ErrLen int64
}

type RetentionPolicy

type RetentionPolicy struct {
	Type           RetentionType
	RetentionParam int64
}

type RetentionType

type RetentionType int

type RetryWithBackoff

type RetryWithBackoff struct {
	InitialDelay       uint64
	BackoffCoefficient uint32
	MaxAttempt         int32
	MaxDelay           uint64
	ExpirationTime     int64
}

func NewRetryWithBackoff

func NewRetryWithBackoff() *RetryWithBackoff

NewRetryWithBackoff returns a RetryWithBackoff populated with default values.
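The page does not describe how the RetryWithBackoff fields interact. The sketch below assumes the conventional reading: the delay starts at InitialDelay, is multiplied by BackoffCoefficient after each attempt, is capped at MaxDelay, and stops after MaxAttempt attempts. The `retryPolicy` type is a local stand-in, the values are illustrative rather than the library defaults, and ExpirationTime is left out because its meaning is not documented here.

```go
package main

import "fmt"

// retryPolicy is a hypothetical local mirror of four RetryWithBackoff fields.
type retryPolicy struct {
	InitialDelay       uint64
	BackoffCoefficient uint32
	MaxAttempt         int32
	MaxDelay           uint64
}

// schedule returns the delay before each attempt under the ASSUMED semantics:
// multiply by the coefficient after every attempt, never exceed MaxDelay.
func schedule(p retryPolicy) []uint64 {
	var delays []uint64
	d := p.InitialDelay
	if d > p.MaxDelay {
		d = p.MaxDelay
	}
	c := uint64(p.BackoffCoefficient)
	for i := int32(0); i < p.MaxAttempt; i++ {
		delays = append(delays, d)
		if c != 0 && d > p.MaxDelay/c {
			d = p.MaxDelay // cap, and avoid uint64 overflow in d * c
		} else {
			d *= c
		}
	}
	return delays
}

func main() {
	p := retryPolicy{InitialDelay: 1, BackoffCoefficient: 10, MaxAttempt: 5, MaxDelay: 1000}
	fmt.Println(schedule(p))
}
```

The time unit of the delay fields is not stated on this page, so the sketch treats them as plain numbers.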

type ScalePolicy

type ScalePolicy struct {
	Type        ScaleType
	TargetRate  int32
	ScaleFactor int32
	MinSegments int32
}

type ScaleType

type ScaleType int

type SegmentSlice

type SegmentSlice struct {
	Slice *C.Slice
}

SegmentSlice represents a slice of a segment that can be read by the user. To get a SegmentSlice, call reader.GetSegmentSlice(). To read events from a SegmentSlice, call slice.Next().

func (*SegmentSlice) Close

func (slice *SegmentSlice) Close()

Close closes the SegmentSlice. The Rust side releases the associated resources.

func (*SegmentSlice) Next

func (slice *SegmentSlice) Next() ([]byte, error)

Next returns the next event. If there are no more events, it returns nil.
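The Next and Close contract above suggests a simple drain loop. The following is a minimal sketch, not the library's own code: `eventSource` is a hypothetical interface matching the two documented method signatures (so a *SegmentSlice should satisfy it), and `fakeSlice` is an in-memory stand-in that exists only to make the example runnable without the Rust library.

```go
package main

import "fmt"

// eventSource is a hypothetical interface covering the documented
// SegmentSlice methods Next and Close.
type eventSource interface {
	Next() ([]byte, error)
	Close()
}

// drain reads events until Next returns a nil event or an error.
// It always closes the source before returning.
func drain(src eventSource) ([][]byte, error) {
	defer src.Close()
	var events [][]byte
	for {
		ev, err := src.Next()
		if err != nil {
			return events, err
		}
		if ev == nil {
			return events, nil // no more events in this slice
		}
		events = append(events, ev)
	}
}

// fakeSlice is an in-memory stand-in used only for this sketch.
type fakeSlice struct {
	events [][]byte
	closed bool
}

func (f *fakeSlice) Next() ([]byte, error) {
	if len(f.events) == 0 {
		return nil, nil
	}
	ev := f.events[0]
	f.events = f.events[1:]
	return ev, nil
}

func (f *fakeSlice) Close() { f.closed = true }

func main() {
	src := &fakeSlice{events: [][]byte{[]byte("a"), []byte("b")}}
	events, err := drain(src)
	fmt.Println(len(events), err, src.closed)
}
```

Deferring Close keeps the release of Rust-side resources on every return path, including the error path.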

type StreamConfiguration

type StreamConfiguration struct {
	Scope     string
	Stream    string
	Scale     ScalePolicy
	Retention RetentionPolicy
	Tags      string
}

func NewStreamConfiguration

func NewStreamConfiguration(scope, stream string) *StreamConfiguration

NewStreamConfiguration returns a StreamConfiguration with default settings: a FixedNumSegments scale policy and a RetentionNone retention policy.

type StreamManager

type StreamManager struct {
	Manager *C.StreamManager
}

StreamManager is responsible for creating scopes, streams, writers, and reader groups.

func NewStreamManager

func NewStreamManager(config *ClientConfig) (*StreamManager, error)

NewStreamManager creates a StreamManager from the given ClientConfig. It starts a reactor goroutine that waits for callback responses from the Rust side. For example:

config := client.NewClientConfig()
manager, err := client.NewStreamManager(config)

func (*StreamManager) Close

func (manager *StreamManager) Close()

Close closes the StreamManager, stops the reactor goroutine, and releases the associated resources.

func (*StreamManager) CreateReaderGroup

func (manager *StreamManager) CreateReaderGroup(readerGroup string, scope string, stream string, readFromTail bool) (*StreamReaderGroup, error)

CreateReaderGroup creates a reader group for the given scope and stream. Set readFromTail to true to read only the latest data in the stream; otherwise reading starts from the very beginning.

func (*StreamManager) CreateScope

func (manager *StreamManager) CreateScope(scope string) (bool, error)

CreateScope creates a scope with the given name. It returns false if the scope already exists and true if it was created. If the operation fails, it returns an error.
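Because CreateScope distinguishes "created", "already exists", and "failed", callers usually want to treat the first two as success. The sketch below shows one way to do that. `scopeCreator` is a hypothetical interface matching the documented CreateScope signature, and `fakeManager` is an in-memory stand-in so the example runs without a Pravega controller.

```go
package main

import (
	"errors"
	"fmt"
)

// scopeCreator is a hypothetical interface matching the documented
// CreateScope signature; *StreamManager should satisfy it.
type scopeCreator interface {
	CreateScope(scope string) (bool, error)
}

// ensureScope treats "already exists" as success and wraps real failures.
func ensureScope(m scopeCreator, scope string) (string, error) {
	created, err := m.CreateScope(scope)
	if err != nil {
		return "", fmt.Errorf("create scope %q: %w", scope, err)
	}
	if created {
		return "created", nil
	}
	return "already exists", nil
}

// fakeManager is an in-memory stand-in used only for this sketch.
type fakeManager struct {
	scopes map[string]bool
	fail   bool
}

func (f *fakeManager) CreateScope(scope string) (bool, error) {
	if f.fail {
		return false, errors.New("controller unreachable")
	}
	if f.scopes[scope] {
		return false, nil
	}
	f.scopes[scope] = true
	return true, nil
}

func main() {
	m := &fakeManager{scopes: map[string]bool{}}
	fmt.Println(ensureScope(m, "demo"))
	fmt.Println(ensureScope(m, "demo"))
}
```

CreateStream documents the same three outcomes, so the same handling should apply there with a *StreamConfiguration argument.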

func (*StreamManager) CreateStream

func (manager *StreamManager) CreateStream(streamConfig *StreamConfiguration) (bool, error)

CreateStream creates a stream from the given configuration. It returns false if the stream already exists and true if it was created. If the operation fails, it returns an error.

func (*StreamManager) CreateWriter

func (manager *StreamManager) CreateWriter(scope string, stream string) (*StreamWriter, error)

CreateWriter creates a StreamWriter for the given scope and stream with a 15 MB write buffer. TODO: make the buffer size configurable.

type StreamReader

type StreamReader struct {
	Reader *C.StreamReader
}

StreamReader is responsible for reading data from given stream.

func (*StreamReader) Close

func (reader *StreamReader) Close()

Close closes the StreamReader. The Rust side releases the related resources.

func (*StreamReader) GetSegmentSlice

func (reader *StreamReader) GetSegmentSlice(timeout time.Duration) (*SegmentSlice, error)

GetSegmentSlice reads from a channel and returns a SegmentSlice or an error. It waits for an object on the channel. If the object holds a valid slice, it returns a SegmentSlice wrapping that data. If the object holds an error, it returns an error carrying the message extracted from the object.

type StreamReaderGroup

type StreamReaderGroup struct {
	ReaderGroup *C.StreamReaderGroup
}

func (*StreamReaderGroup) Close

func (readerGroup *StreamReaderGroup) Close()

func (*StreamReaderGroup) CreateReader

func (readerGroup *StreamReaderGroup) CreateReader(reader string) (*StreamReader, error)

CreateReader creates a StreamReader with the given name.

type StreamWriter

type StreamWriter struct {
	// Rust binding reference
	Writer *C.StreamWriter
	// 15 MB writer buffer. If the buffer has no room for a new event, the calling goroutine waits until enough space is free.
	Sem *semaphore.Weighted
	Ctx context.Context
}

StreamWriter is responsible for writing data to given stream.

func (*StreamWriter) Close

func (writer *StreamWriter) Close()

Close closes the StreamWriter. The Rust side releases the related resources.

func (*StreamWriter) Flush

func (writer *StreamWriter) Flush() error

Flush waits until all events have been written to Pravega.
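Since writes are buffered and Flush is what waits for them to reach Pravega, a batch is typically written and then flushed once at the end. The following is a minimal sketch of that pattern. `eventWriter` is a hypothetical interface matching the documented WriteEvent and Flush signatures, and `fakeWriter` is a stand-in that only counts events.

```go
package main

import "fmt"

// eventWriter is a hypothetical interface matching the documented
// StreamWriter methods WriteEvent and Flush.
type eventWriter interface {
	WriteEvent(event []byte) error
	Flush() error
}

// writeAll writes every event, then flushes once so the caller knows
// the whole batch has been handed off before continuing.
func writeAll(w eventWriter, events [][]byte) error {
	for i, ev := range events {
		if err := w.WriteEvent(ev); err != nil {
			return fmt.Errorf("write event %d: %w", i, err)
		}
	}
	return w.Flush()
}

// fakeWriter is a stand-in used only for this sketch: events are
// "pending" until Flush marks them as flushed.
type fakeWriter struct {
	pending int
	flushed int
}

func (f *fakeWriter) WriteEvent(event []byte) error {
	f.pending++
	return nil
}

func (f *fakeWriter) Flush() error {
	f.flushed += f.pending
	f.pending = 0
	return nil
}

func main() {
	w := &fakeWriter{}
	err := writeAll(w, [][]byte{[]byte("e1"), []byte("e2"), []byte("e3")})
	fmt.Println(w.pending, w.flushed, err)
}
```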

func (*StreamWriter) WriteEvent

func (writer *StreamWriter) WriteEvent(event []byte) error

WriteEvent writes an event with an empty routing key. TODO: write the event with a random routing key.

func (*StreamWriter) WriteEventByRoutingKey

func (writer *StreamWriter) WriteEventByRoutingKey(routingKey string, event []byte) error

WriteEventByRoutingKey writes an event with the given routing key. The write is asynchronous. If the buffer does not have enough room for the new event, the calling goroutine blocks on Sem.Acquire().
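The blocking behavior described above is the usual byte-budget backpressure pattern: a writer acquires as many units as the event has bytes and gets them back once the write is acknowledged. The sketch below illustrates the idea with a small standard-library weighted semaphore. It is not the library's implementation (StreamWriter uses a *semaphore.Weighted), and `byteBudget` and its methods are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// byteBudget is a minimal weighted semaphore built on sync.Cond.
// Hypothetical stand-in for the writer's Sem field.
type byteBudget struct {
	mu   sync.Mutex
	cond *sync.Cond
	free int64
}

func newByteBudget(capacity int64) *byteBudget {
	b := &byteBudget{free: capacity}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// acquire blocks until n bytes are free, then reserves them.
// A request larger than the total capacity would block forever.
func (b *byteBudget) acquire(n int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for b.free < n {
		b.cond.Wait()
	}
	b.free -= n
}

// release returns n bytes to the budget and wakes waiting writers.
func (b *byteBudget) release(n int64) {
	b.mu.Lock()
	b.free += n
	b.mu.Unlock()
	b.cond.Broadcast()
}

func main() {
	budget := newByteBudget(8) // room for two 4-byte events at a time
	acks := make(chan int64, 3)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // simulated acknowledgements freeing buffer space
		defer wg.Done()
		for n := range acks {
			budget.release(n)
		}
	}()
	for i := 0; i < 3; i++ {
		budget.acquire(4) // the third write may wait for an ack
		acks <- 4
	}
	close(acks)
	wg.Wait()
	fmt.Println("free bytes after all acks:", budget.free)
}
```

With a fixed budget, a slow downstream naturally slows producers down instead of letting buffered events grow without bound.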
