streaming

package
v0.0.0-...-d386c04 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2024 License: BSD-3-Clause Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNotConnected        = errors.New("INFRASTRUCTURE.STREAMING.NOT_CONNECTED.ERROR")
	ErrAlreadyConnected    = errors.New("INFRASTRUCTURE.STREAMING.ALREADY_CONNECTED.ERROR")
	ErrSubNotConnected     = errors.New("INFRASTRUCTURE.STREAMING.SUBSCRIBER.NOT_CONNECTED.ERROR")
	ErrSubAlreadyConnected = errors.New("INFRASTRUCTURE.STREAMING.SUBSCRIBER.ALREADY_CONNECTED.ERROR")
	ErrSubTerminiated      = errors.New("INFRASTRUCTURE.STREAMING.SUBSCRIBER.TERMINATED.ERROR")
	ErrSubAckFail          = errors.New("INFRASTRUCTURE.STREAMING.SUBSCRIBER.ACK_FAIL.ERROR")
	ErrSubNakFail          = errors.New("INFRASTRUCTURE.STREAMING.SUBSCRIBER.NAK_FAIL.ERROR")
)
View Source
var (
	MetaId               = "KANTHOR_META_ID"
	HeaderTelemetryTrace = "x-telemtry-trace"
)

Functions

func NatsMsgFromEvent

func NatsMsgFromEvent(subject string, event *Event) *natscore.Msg

func NewNatsConn

func NewNatsConn(uri string, logger logging.Logger) (*natscore.Conn, error)

Types

type Config

type Config struct {
	Name       string           `json:"name" yaml:"name" mapstructure:"name"`
	Uri        string           `json:"uri" yaml:"uri" mapstructure:"uri"`
	Nats       NatsConfig       `json:"nats" yaml:"nats" mapstructure:"nats"`
	Publisher  PublisherConfig  `json:"publisher" yaml:"publisher" mapstructure:"publisher"`
	Subscriber SubscriberConfig `json:"subscriber" yaml:"subscriber" mapstructure:"subscriber"`
}

func (*Config) Validate

func (conf *Config) Validate() error

type Event

type Event struct {
	Subject string `json:"subject"`

	Id       string            `json:"id"`
	Data     []byte            `json:"data"`
	Metadata map[string]string `json:"metadata"`
}

func NatsMsgToEvent

func NatsMsgToEvent(msg *natscore.Msg) *Event

func (*Event) String

func (e *Event) String() string

func (*Event) Validate

func (e *Event) Validate() error

type NatsConfig

type NatsConfig struct {
	Replicas int `json:"replicas" yaml:"replicas" mapstructure:"replicas"`
	Limits   struct {
		Size     int64 `json:"size" yaml:"size" mapstructure:"size"`
		MsgSize  int32 `json:"msg_size" yaml:"msg_size" mapstructure:"msg_size"`
		MsgCount int64 `json:"msg_count" yaml:"msg_count" mapstructure:"msg_count"`
		MsgAge   int64 `json:"msg_age" yaml:"msg_age" mapstructure:"msg_age"`
	} `json:"limits" yaml:"limits" mapstructure:"limits"`
}

func (*NatsConfig) Validate

func (conf *NatsConfig) Validate() error

type NatsPublisher

type NatsPublisher struct {
	// contains filtered or unexported fields
}

func (*NatsPublisher) Name

func (publisher *NatsPublisher) Name() string

func (*NatsPublisher) Pub

func (publisher *NatsPublisher) Pub(ctx context.Context, events map[string]*Event) map[string]error

type NatsSubscriber

type NatsSubscriber struct {
	// contains filtered or unexported fields
}

func (*NatsSubscriber) Connect

func (subscriber *NatsSubscriber) Connect(ctx context.Context) error

func (*NatsSubscriber) Disconnect

func (subscriber *NatsSubscriber) Disconnect(ctx context.Context) error

func (*NatsSubscriber) Liveness

func (subscriber *NatsSubscriber) Liveness() error

func (*NatsSubscriber) Name

func (subscriber *NatsSubscriber) Name() string

func (*NatsSubscriber) Readiness

func (subscriber *NatsSubscriber) Readiness() error

func (*NatsSubscriber) Sub

func (subscriber *NatsSubscriber) Sub(ctx context.Context, topic string, handler SubHandler) error

type Publisher

type Publisher interface {
	Name() string
	Pub(ctx context.Context, events map[string]*Event) map[string]error
}

type PublisherConfig

type PublisherConfig struct {
	RateLimit int `json:"rate_limit" yaml:"rate_limit" mapstructure:"rate_limit"`
}

func (*PublisherConfig) Validate

func (conf *PublisherConfig) Validate() error

type Stream

type Stream interface {
	patterns.Connectable
	Publisher(name string) Publisher
	Subscriber(name string) Subscriber
}

func New

func New(conf *Config, logger logging.Logger) (Stream, error)

func NewNats

func NewNats(conf *Config, logger logging.Logger) (Stream, error)

type SubHandler

type SubHandler func(ctx context.Context, events map[string]*Event) map[string]error

type Subscriber

type Subscriber interface {
	patterns.Connectable
	Name() string
	Sub(ctx context.Context, topic string, handler SubHandler) error
}

type SubscriberConfig

type SubscriberConfig struct {
	// MaxRetry is how many times we should try to re-deliver message if we get any error
	MaxRetry    int `json:"max_retry" yaml:"max_retry" mapstructure:"max_retry"`
	Concurrency int `json:"concurrency" yaml:"concurrency" mapstructure:"concurrency"`
	Throughput  int `json:"throughput" yaml:"throughput" mapstructure:"throughput"`
}

func (*SubscriberConfig) Validate

func (conf *SubscriberConfig) Validate() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL