nats

package
v0.0.0-...-fbce341 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2022 License: AGPL-3.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultMaxInFlightRequests is a default in-flight request limit of 256.
	// NOTE(review): presumably the default for
	// RpcServerOptions.MaxInFlightRequests — confirm against
	// GetDefaultRpcServerOptions.
	DefaultMaxInFlightRequests = 256
)
View Source
const (
	// ErrNoClientsAvailable is a constant error with the message
	// "no clients available".
	// NOTE(review): the conditions under which it is returned are not visible
	// in this listing — confirm against the callers.
	ErrNoClientsAvailable = errors.ConstError("no clients available")
)

Variables

This section is empty.

Functions

func NewCache

func NewCache[V any](
	localCacheSize int,
	kv KeyValue[V],
	ttl time.Duration,
) cache.Cache

func SubjectName

func SubjectName(keys ...string) string

Types

type CacheEntry

// CacheEntry is declared with no fields.
// NOTE(review): its role is not visible in this listing — confirm where it is used.
type CacheEntry struct{}

type KeyValue

// KeyValue is a key/value store interface whose values are of type T.
//
// NOTE(review): the per-method descriptions below are read off the signatures
// and the similarly named nats.KeyValue methods; confirm them against the
// implementation.
type KeyValue[T any] interface {
	// Delegate returns a nats.KeyValue, presumably the underlying untyped
	// bucket this interface wraps.
	Delegate() nats.KeyValue

	// Bucket returns a string, presumably the bucket name.
	Bucket() string

	// Get returns the entry for key, or an error.
	Get(key string) (KeyValueEntry[T], error)

	// Put stores value under key. The returned uint64 is presumably the
	// revision of the stored value — TODO confirm.
	Put(key string, value T) (uint64, error)

	// Watch returns a KeyWatcher for key, configured by opts.
	Watch(key string, opts ...nats.WatchOpt) (KeyWatcher[T], error)

	// WatchAll returns a KeyWatcher configured by opts; presumably it covers
	// every key in the bucket.
	WatchAll(opts ...nats.WatchOpt) (KeyWatcher[T], error)

	// Delete removes key, returning an error on failure.
	Delete(key string) error
}

func CreateKeyValue

func CreateKeyValue[T any](js nats.JetStreamContext, cfg *nats.KeyValueConfig) (KeyValue[T], error)

func GetKeyValue

func GetKeyValue[T any](js nats.JetStreamContext, bucket string) (KeyValue[T], error)

type KeyValueEntry

// KeyValueEntry is a single entry retrieved from a key/value bucket. The
// stored bytes are available via ValueRaw and, unmarshalled from JSON into T,
// via Value.
type KeyValueEntry[T any] interface {
	// Bucket is the bucket the data was loaded from.
	Bucket() string
	// Key is the key that was retrieved.
	Key() string
	// Value is the retrieved value unmarshalled from json.
	Value() (T, error)
	// ValueRaw is the original []byte value.
	ValueRaw() []byte
	// Revision is a unique sequence for this value.
	Revision() uint64
	// Created is the time the data was put in the bucket.
	Created() time.Time
	// Delta is distance from the latest value.
	Delta() uint64
	// Operation returns Put or Delete or Purge.
	Operation() nats.KeyValueOp
}

type KeyWatcher

// KeyWatcher delivers updates to entries as KeyValueEntry[T] values over a
// channel until it is stopped.
type KeyWatcher[T any] interface {
	// Context returns watcher context optionally provided by nats.Context option.
	Context() context.Context
	// Updates returns a channel to read any updates to entries.
	Updates() <-chan KeyValueEntry[T]
	// Stop will stop this watcher.
	Stop() error
}

type Publisher

// Publisher publishes payloads of type T (see its Publish and PublishAsync
// methods) as well as raw JSON payloads (PublishRaw, PublishAsyncRaw).
type Publisher[T any] struct {
	// Subject is presumably the subject that payloads are published to —
	// TODO confirm against NewPublisher.
	Subject string
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher[T any](
	js nats.JetStreamContext,
	subject string,
	init func(js nats.JetStreamContext) error,
) (*Publisher[T], error)

func (Publisher[T]) Publish

func (p Publisher[T]) Publish(payload T, opts ...nats.PubOpt) (*nats.PubAck, error)

func (Publisher[T]) PublishAsync

func (p Publisher[T]) PublishAsync(payload T, opts ...nats.PubOpt) (nats.PubAckFuture, error)

func (Publisher[T]) PublishAsyncRaw

func (p Publisher[T]) PublishAsyncRaw(payload json.RawMessage, opts ...nats.PubOpt) (nats.PubAckFuture, error)

func (Publisher[T]) PublishRaw

func (p Publisher[T]) PublishRaw(payload json.RawMessage, opts ...nats.PubOpt) (*nats.PubAck, error)

type RouteOpt

// RouteOpt is an alias for a function that modifies a RouteOpts in place and
// may return an error. Router methods accept these as variadic options.
type RouteOpt = func(opts *RouteOpts) error

func CacheRoute

func CacheRoute(cache bool) RouteOpt

type RouteOpts

// RouteOpts holds the settings that RouteOpt functions modify.
type RouteOpts struct {
	// Cache presumably toggles caching for a routed request; see CacheRoute —
	// TODO confirm.
	Cache bool
}

func DefaultRouteOpts

func DefaultRouteOpts() RouteOpts

type Router

// Router handles a jsonrpc.Request and writes the outcome into the supplied
// jsonrpc.Response, with behavior adjustable via RouteOpt options.
//
// NOTE(review): how requests are dispatched is not visible in this listing.
type Router interface {
	// Request handles req and populates resp. timeout presumably bounds how
	// long the call may take — TODO confirm.
	Request(req jsonrpc.Request, resp *jsonrpc.Response, timeout time.Duration, options ...RouteOpt) error

	// RequestWithContext is the context-taking variant of Request: it handles
	// req and populates resp, taking ctx in place of a timeout.
	RequestWithContext(ctx context.Context, req jsonrpc.Request, resp *jsonrpc.Response, options ...RouteOpt) error
}

func NewCachingRouter

func NewCachingRouter(cache cache.Cache, cachePrefix string, router Router) Router

func NewStaticError

func NewStaticError(error jsonrpc.Error) Router

func NewStaticResult

func NewStaticResult(result any) Router

type RpcServer

// RpcServer is the server type created by NewRpcServer and run with
// ListenAndServe.
type RpcServer struct {
	// Options holds the server's RpcServerOptions.
	Options RpcServerOptions
	// contains filtered or unexported fields
}

func NewRpcServer

func NewRpcServer(
	clientId string,
	conn *nats.EncodedConn,
	client *web3.Client,
	options ...RpcServerOption,
) (*RpcServer, error)

func (*RpcServer) Close

func (srv *RpcServer) Close()

func (*RpcServer) ListenAndServe

func (srv *RpcServer) ListenAndServe(
	ctx context.Context,
	subFn func(conn *nats.Conn, msgs chan *nats.Msg) ([]*nats.Subscription, error),
) error

type RpcServerOption

// RpcServerOption is a function that modifies an RpcServerOptions in place and
// may return an error. NewRpcServer accepts these as variadic options.
type RpcServerOption func(*RpcServerOptions) error

func MaxInFlightRequests

func MaxInFlightRequests(max int) RpcServerOption

MaxInFlightRequests returns an RpcServerOption that sets the maximum number of RPC requests that can be awaiting a response from the web3 client.

type RpcServerOptions

// RpcServerOptions holds the configuration of an RpcServer. It is the value
// that RpcServerOption functions modify.
type RpcServerOptions struct {
	// ClientId is the unique node id (public key) for the web3 client.
	ClientId string

	// NetworkId is the ethereum network id that the web3 client is connected to.
	NetworkId uint64

	// ChainId is the ethereum chain id that the web3 client is connected to.
	ChainId uint64

	// MaxInFlightRequests constrains the max number of rpc requests that can be
	// awaiting a response from the web3 client.
	MaxInFlightRequests int
}

func GetDefaultRpcServerOptions

func GetDefaultRpcServerOptions() RpcServerOptions

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL