package transports

v1.4.1
Published: Aug 31, 2023 License: MIT Imports: 30 Imported by: 9

Documentation

Index

Constants

This section is empty.

Variables

var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }

Functions

func CreateTopicIfDoesNotExist

func CreateTopicIfDoesNotExist(brokerAddr, topic string, numPartitions int32, configEntries map[string]*string) error
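
A minimal usage sketch; the broker address, topic name, retention setting, and import path below are illustrative assumptions:

package main

import (
	"log"

	"github.com/openrelayxyz/cardinal-streams/transports" // assumed import path
)

func main() {
	// Create the topic with 6 partitions if the broker does not already have it.
	// retention.ms is an ordinary Kafka topic config entry; the value is illustrative.
	retention := "86400000"
	err := transports.CreateTopicIfDoesNotExist("localhost:9092", "cardinal-blocks", 6, map[string]*string{
		"retention.ms": &retention,
	})
	if err != nil {
		log.Fatalf("topic setup failed: %v", err)
	}
}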

func ParseKafkaURL

func ParseKafkaURL(brokerURL string) ([]string, *sarama.Config)
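
A short sketch; the kafka:// scheme and the multi-host form are assumptions about the accepted URL syntax:

package main

import (
	"log"

	"github.com/openrelayxyz/cardinal-streams/transports" // assumed import path
)

func main() {
	// Split a broker URL into a broker list and a sarama configuration.
	brokers, config := transports.ParseKafkaURL("kafka://broker1:9092,broker2:9092")
	log.Printf("brokers: %v, kafka version: %s", brokers, config.Version)
}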

func ResumptionForTimestamp added in v0.0.16

func ResumptionForTimestamp(brokerParams []BrokerParams, timestamp int64) ([]byte, error)
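
A sketch building a resumption token for a point in time; millisecond units for the timestamp, and all parameter values, are assumptions:

package main

import (
	"log"
	"time"

	"github.com/openrelayxyz/cardinal-streams/transports" // assumed import path
)

func main() {
	params := []transports.BrokerParams{{
		URL:          "kafka://localhost:9092", // hypothetical broker
		DefaultTopic: "cardinal",
		Topics:       []string{"cardinal"},
	}}
	// Build a resumption token corresponding to one hour ago.
	resumption, err := transports.ResumptionForTimestamp(params, time.Now().Add(-time.Hour).UnixMilli())
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("resumption token: %x", resumption)
}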

Types

type BrokerParams added in v0.0.13

type BrokerParams struct {
	URL          string   `yaml:"URL"`
	DefaultTopic string   `yaml:"DefaultTopic"`
	Topics       []string `yaml:"Topics"`
	Rollback     int64    `yaml:"Rollback"`
}
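
Because the fields carry yaml tags, broker lists can be loaded from configuration. A sketch using gopkg.in/yaml.v2, with illustrative values and an assumed import path for this package:

package main

import (
	"log"

	"gopkg.in/yaml.v2"

	"github.com/openrelayxyz/cardinal-streams/transports" // assumed import path
)

func main() {
	raw := []byte(`
- URL: kafka://localhost:9092
  DefaultTopic: cardinal
  Topics: [cardinal, cardinal-state]
  Rollback: 5000
`)
	var params []transports.BrokerParams
	if err := yaml.Unmarshal(raw, &params); err != nil {
		log.Fatal(err)
	}
	log.Printf("loaded %d broker(s)", len(params))
}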

type Consumer

type Consumer interface {
	ProducerCounter
	// Start sets up communication with the broker and begins processing
	// messages. If you want to ensure receipt of 100% of messages, you should
	// call Start() only after setting up subscriptions with Subscribe()
	Start() error
	// Subscribe enables subscribing to either ordered chain updates or unordered
	// pending batches. Calling Subscribe on a chan *ChainUpdate will return a
	// subscription for ordered chain updates. Calling Subscribe on a
	// chan *PendingBatch will return a subscription for unordered pending batches.
	Subscribe(ch interface{}) types.Subscription
	// SubscribeReorg subscribes to information about large chain reorgs.
	SubscribeReorg(ch chan<- map[int64]types.Hash) types.Subscription
	// Close shuts down the transport layer, which in turn will cause
	// subscriptions to stop producing messages.
	Close()
	Ready() <-chan struct{}
	WhyNotReady(types.Hash) string
}

Consumer can be used to receive messages over a transport layer.
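
A sketch of the subscribe-before-Start pattern described above. It assumes ChainUpdate is provided by the companion delivery package and that types.Subscription exposes Unsubscribe; import paths and parameter values are placeholders:

package main

import (
	"log"
	"math/big"

	"github.com/openrelayxyz/cardinal-streams/delivery"   // assumed import path
	"github.com/openrelayxyz/cardinal-streams/transports" // assumed import path
	"github.com/openrelayxyz/cardinal-types"              // assumed import path
)

func main() {
	consumer, err := transports.ResolveConsumer(
		"kafka://localhost:9092", "cardinal", []string{"cardinal"},
		nil,          // resumption: nil to start fresh (assumption)
		5000,         // rollback
		0,            // lastNumber
		types.Hash{}, // lastHash
		new(big.Int), // lastWeight
		128,          // reorgThreshold
		nil,          // trackedPrefixes: track everything (assumption)
		nil,          // whitelist
	)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Subscribe before Start() to ensure receipt of all messages.
	ch := make(chan *delivery.ChainUpdate, 64)
	sub := consumer.Subscribe(ch)
	defer sub.Unsubscribe()

	if err := consumer.Start(); err != nil {
		log.Fatal(err)
	}
	<-consumer.Ready() // block until the consumer reports ready

	for update := range ch {
		_ = update // process ordered chain updates here
	}
}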

func NewKafkaConsumer

func NewKafkaConsumer(brokerURL, defaultTopic string, topics []string, resumption []byte, rollback, lastNumber int64, lastHash types.Hash, lastWeight *big.Int, reorgThreshold int64, trackedPrefixes []*regexp.Regexp, whitelist map[uint64]types.Hash) (Consumer, error)

NewKafkaConsumer provides a transports.Consumer that pulls messages from a Kafka broker.

func NewNullConsumer

func NewNullConsumer() Consumer

func ResolveConsumer added in v0.0.13

func ResolveConsumer(brokerURL, defaultTopic string, topics []string, resumption []byte, rollback, lastNumber int64, lastHash types.Hash, lastWeight *big.Int, reorgThreshold int64, trackedPrefixes []*regexp.Regexp, whitelist map[uint64]types.Hash) (Consumer, error)

ResolveConsumer returns a Consumer appropriate for the given broker URL, using the same parameters as NewKafkaConsumer.

func ResolveMuxConsumer added in v0.0.13

func ResolveMuxConsumer(brokerParams []BrokerParams, resumption []byte, lastNumber int64, lastHash types.Hash, lastWeight *big.Int, reorgThreshold int64, trackedPrefixes []*regexp.Regexp, whitelist map[uint64]types.Hash) (Consumer, error)

ResolveMuxConsumer takes a list of broker configurations and returns a single Consumer that combines messages from all of the specified brokers.

type KafkaConsumer

type KafkaConsumer struct {
	// contains filtered or unexported fields
}

func (*KafkaConsumer) Close

func (kc *KafkaConsumer) Close()

func (*KafkaConsumer) ProducerCount added in v1.3.0

func (kc *KafkaConsumer) ProducerCount(d time.Duration) uint

func (*KafkaConsumer) Ready

func (kc *KafkaConsumer) Ready() <-chan struct{}

func (*KafkaConsumer) Start

func (kc *KafkaConsumer) Start() error

func (*KafkaConsumer) Subscribe

func (kc *KafkaConsumer) Subscribe(ch interface{}) types.Subscription

func (*KafkaConsumer) SubscribeReorg

func (kc *KafkaConsumer) SubscribeReorg(ch chan<- map[int64]types.Hash) types.Subscription

func (*KafkaConsumer) WhyNotReady

func (kc *KafkaConsumer) WhyNotReady(hash types.Hash) string

type KafkaProducer

type KafkaProducer struct {
	// contains filtered or unexported fields
}

func (*KafkaProducer) AddBlock

func (kp *KafkaProducer) AddBlock(number int64, hash, parentHash types.Hash, weight *big.Int, updates map[string][]byte, deletes map[string]struct{}, batches map[string]types.Hash) error

func (*KafkaProducer) LatestBlockFromFeed

func (kp *KafkaProducer) LatestBlockFromFeed() (int64, error)

LatestBlockFromFeed scans the feed the producer is configured for and finds the latest block number. This should be used once on startup, and is intended to allow producers to sync to a particular block before they begin emitting messages. Producers should start emitting when they reach this number, to avoid skipped blocks (which will halt consumers). Producer applications should provide some kind of override, resuming at a block specified by an operator, in case messages need to start on the correct side of a reorg while the feed has messages from a longer but invalid chain.
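
A sketch of the startup sequence this implies; the broker URL and empty schema are placeholders, as is the import path:

package main

import (
	"log"

	"github.com/openrelayxyz/cardinal-streams/transports" // assumed import path
)

func main() {
	producer, err := transports.NewKafkaProducer("kafka://localhost:9092", "cardinal", map[string]string{})
	if err != nil {
		log.Fatal(err)
	}
	// Find the newest block already on the feed so we can resume without gaps.
	target, err := producer.LatestBlockFromFeed()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("syncing local chain to block %d before emitting", target)
	// ... sync to target, then call producer.AddBlock from that height onward,
	// or resume from an operator-specified block if recovering from a bad reorg.
}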

func (*KafkaProducer) ProducerCount added in v1.3.0

func (kp *KafkaProducer) ProducerCount(d time.Duration) uint

func (*KafkaProducer) PurgeReplayCache added in v1.4.0

func (kp *KafkaProducer) PurgeReplayCache()

func (*KafkaProducer) Reorg

func (kp *KafkaProducer) Reorg(number int64, hash types.Hash) (func(), error)

func (*KafkaProducer) SendBatch

func (kp *KafkaProducer) SendBatch(batchid types.Hash, delete []string, update map[string][]byte) error

func (*KafkaProducer) SetHealth added in v1.3.0

func (kp *KafkaProducer) SetHealth(b bool)

type KafkaResumptionMessage

type KafkaResumptionMessage struct {
	// contains filtered or unexported fields
}

func (*KafkaResumptionMessage) Key

func (m *KafkaResumptionMessage) Key() []byte

func (*KafkaResumptionMessage) Offset

func (m *KafkaResumptionMessage) Offset() int64

func (*KafkaResumptionMessage) Source

func (m *KafkaResumptionMessage) Source() string

func (*KafkaResumptionMessage) Time

func (m *KafkaResumptionMessage) Time() time.Time

func (*KafkaResumptionMessage) Value

func (m *KafkaResumptionMessage) Value() []byte

type Producer

type Producer interface {
	ProducerCounter
	LatestBlockFromFeed() (int64, error)
	// AddBlock will send information about a block over the transport layer.
	AddBlock(number int64, hash, parentHash types.Hash, weight *big.Int, updates map[string][]byte, deletes map[string]struct{}, batches map[string]types.Hash) error
	// SendBatch will send information about batches over the transport layer.
	// Batches should correspond to batches indicated in a previous AddBlock call
	SendBatch(batchid types.Hash, delete []string, update map[string][]byte) error
	// Reorg will send information about large chain reorgs over the transport
	// layer. The "done" function returned by the Reorg() method should be called
	// after all blocks and batches for a given reorg have been sent to the
	// producer.
	Reorg(number int64, hash types.Hash) (func(), error)
	// SetHealth allows producers to mark that they are in an unhealthy state and not currently producing
	SetHealth(bool)

	// Producers track which blocks they have seen to avoid replaying them. If applications
	// intend to replay blocks, they should call this function first.
	PurgeReplayCache()
}

Producer can be used to send block metadata over a messaging transport.
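
A schematic producer flow: announce a block that references a batch, deliver the batch, and call the returned done function after replaying a reorg. Hashes, keys, and payloads are placeholders, and import paths are assumptions:

package main

import (
	"log"
	"math/big"

	"github.com/openrelayxyz/cardinal-streams/transports" // assumed import path
	"github.com/openrelayxyz/cardinal-types"              // assumed import path
)

func main() {
	producer, err := transports.NewKafkaProducer("kafka://localhost:9092", "cardinal", map[string]string{})
	if err != nil {
		log.Fatal(err)
	}

	batchID := types.Hash{0x01} // placeholder batch hash
	// Announce the block, referencing a batch to be delivered separately.
	err = producer.AddBlock(
		100, types.Hash{0x02}, types.Hash{0x03}, big.NewInt(100),
		map[string][]byte{"/blocks/100": []byte("payload")}, // updates (placeholder key)
		map[string]struct{}{},                               // deletes
		map[string]types.Hash{"/batches/": batchID},         // batches (placeholder key)
	)
	if err != nil {
		log.Fatal(err)
	}
	// Deliver the batch contents referenced in the AddBlock call above.
	if err := producer.SendBatch(batchID, nil, map[string][]byte{"/batches/x": []byte("v")}); err != nil {
		log.Fatal(err)
	}

	// For a large reorg: signal it, resend blocks and batches, then call done().
	done, err := producer.Reorg(95, types.Hash{0x04})
	if err != nil {
		log.Fatal(err)
	}
	// ... AddBlock / SendBatch for the reorged range ...
	done()
}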

func NewKafkaProducer

func NewKafkaProducer(brokerURL, defaultTopic string, schema map[string]string) (Producer, error)

func NewWebsocketProducer added in v0.0.38

func NewWebsocketProducer(wsurl string, resumer StreamsResumption) (Producer, error)

func ResolveMuxProducer added in v0.0.38

func ResolveMuxProducer(brokerParams []ProducerBrokerParams, resumer StreamsResumption) (Producer, error)

func ResolveProducer added in v0.0.13

func ResolveProducer(brokerURL, defaultTopic string, schema map[string]string) (Producer, error)

func ResolveProducerWithResumer added in v0.0.38

func ResolveProducerWithResumer(brokerURL, defaultTopic string, schema map[string]string, resumer StreamsResumption) (Producer, error)

type ProducerBrokerParams added in v0.0.38

type ProducerBrokerParams struct {
	URL          string            `yaml:"URL"`
	DefaultTopic string            `yaml:"DefaultTopic"`
	Schema       map[string]string `yaml:"Schema"`
}

type ProducerCounter added in v1.3.0

type ProducerCounter interface {
	ProducerCount(time.Duration) uint
}

type StreamsResumption added in v0.0.38

type StreamsResumption interface {
	// BlocksFrom produces PendingBatches. This stream does not deal with
	// subbatches, so the PendingBatches must include all values. BlocksFrom
	// should watch for context.Done() and stop producing blocks if the context
	// finishes before all blocks have been sent.
	BlocksFrom(ctx context.Context, block uint64, hash types.Hash) (chan *delivery.PendingBatch, error)
	GetBlock(ctx context.Context, block uint64) *delivery.PendingBatch
}
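
A minimal implementing sketch; the type and its storage lookup are hypothetical, and import paths are assumptions:

package resume

import (
	"context"

	"github.com/openrelayxyz/cardinal-streams/delivery" // assumed import path
	"github.com/openrelayxyz/cardinal-types"            // assumed import path
)

// storeResumer is a hypothetical StreamsResumption backed by local storage.
type storeResumer struct{}

func (r *storeResumer) BlocksFrom(ctx context.Context, block uint64, hash types.Hash) (chan *delivery.PendingBatch, error) {
	ch := make(chan *delivery.PendingBatch)
	go func() {
		defer close(ch)
		for n := block; ; n++ {
			pb := r.GetBlock(ctx, n)
			if pb == nil {
				return // no more blocks available
			}
			select {
			case ch <- pb:
			case <-ctx.Done():
				return // stop producing if the context finishes first
			}
		}
	}()
	return ch, nil
}

func (r *storeResumer) GetBlock(ctx context.Context, block uint64) *delivery.PendingBatch {
	// Look the block up in local storage; omitted in this sketch.
	return nil
}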

type TransportBatch added in v0.0.38

type TransportBatch struct {
	Number     hexutil.Uint64           `json:"number"`
	Weight     *hexutil.Big             `json:"weight"`
	Hash       types.Hash               `json:"hash"`
	ParentHash types.Hash               `json:"parent"`
	Values     map[string]hexutil.Bytes `json:"values"`
	Deletes    []string                 `json:"deletes"`
	Batches    map[string]types.Hash    `json:"batches"`
}

func (*TransportBatch) ToPendingBatch added in v0.0.38

func (tb *TransportBatch) ToPendingBatch() *delivery.PendingBatch
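
A small sketch constructing a TransportBatch and converting it to a PendingBatch; all values are placeholders, and the hexutil and types import paths are assumptions:

package main

import (
	"log"
	"math/big"

	"github.com/openrelayxyz/cardinal-streams/transports" // assumed import path
	"github.com/openrelayxyz/cardinal-types"              // assumed import path
	"github.com/openrelayxyz/cardinal-types/hexutil"      // assumed import path
)

func main() {
	tb := &transports.TransportBatch{
		Number:     hexutil.Uint64(100),
		Weight:     (*hexutil.Big)(big.NewInt(100)),
		Hash:       types.Hash{0x02}, // placeholder hashes
		ParentHash: types.Hash{0x01},
		Values:     map[string]hexutil.Bytes{"/blocks/100": hexutil.Bytes("payload")},
		Deletes:    []string{},
		Batches:    map[string]types.Hash{},
	}
	pb := tb.ToPendingBatch()
	log.Printf("pending batch: %v", pb)
}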

type XDGSCRAMClient

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)
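
XDGSCRAMClient satisfies sarama's SCRAMClient interface, so it can be paired with the SHA256/SHA512 generators above for SASL/SCRAM authentication. A sketch, with placeholder credentials and an assumed sarama import path:

package kafkaauth

import (
	"github.com/Shopify/sarama" // assumed to match this package's sarama dependency

	"github.com/openrelayxyz/cardinal-streams/transports" // assumed import path
)

// newSCRAMConfig returns a sarama config that authenticates with SCRAM-SHA-512.
func newSCRAMConfig(user, pass string) *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.Net.SASL.Enable = true
	cfg.Net.SASL.User = user
	cfg.Net.SASL.Password = pass
	cfg.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
	cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
		return &transports.XDGSCRAMClient{HashGeneratorFcn: transports.SHA512}
	}
	return cfg
}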
