natsmq

package
v1.14.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 24, 2022 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewConn

func NewConn(c *Config) (*nats.Conn, error)

func NewEncodedConn

func NewEncodedConn(c *Config) (*nats.EncodedConn, error)

Types

type Config added in v0.6.0

type Config struct {
	ClientId   string             `json:"client_id" yaml:"client_id"`
	ClusterId  string             `json:"clusterId" yaml:"cluster_id"`
	BrokerId   string             `json:"brokerId" yaml:"broker_id"`
	Broker     string             `json:"brokers" yaml:"broker"`
	User       string             `json:"user" yaml:"user"`
	Password   string             `json:"password" yaml:"password"`
	Token      string             `json:"token" yaml:"token"`
	Channels   map[string]string  `json:"channels" yaml:"channels"`
	AckTimeout int64              `json:"ack_timeout" yaml:"ack_timeout"`
	TLS        SSL                `json:"tls" yaml:"tls"`
	NatsConn   []nats.Option      `json:"-" yaml:"-"`
	StanConn   []stan.Option      `json:"-" yaml:"-"`
	Logger     *zap.SugaredLogger `json:"-" yaml:"-"`
	Tracer     opentracing.Tracer `json:"-" yaml:"-"`
}

Config represent stan.Conn and nats.Conn connection parameters

func (*Config) GetNatsUserInfo added in v0.6.0

func (c *Config) GetNatsUserInfo() nats.Option

type MessageInfo added in v0.9.0

type MessageInfo struct {
	Nuid      string `json:"nuid" yaml:"nuid"`
	Channel   string `json:"channel" yaml:"channel"`
	Sequence  uint64 `json:"sequence" yaml:"sequence"`
	Timestamp int64  `json:"timestamp" yaml:"timestamp"`
}

type NatsSub

type NatsSub struct {
	// contains filtered or unexported fields
}

func NewSubscription

func NewSubscription(c *NatsSubOpts) (*NatsSub, error)

func (*NatsSub) Errors

func (ns *NatsSub) Errors() <-chan error

func (*NatsSub) Messages

func (ns *NatsSub) Messages() <-chan *nats.Msg

func (*NatsSub) StartConsumption

func (ns *NatsSub) StartConsumption(ctx context.Context, handler NatsSubHandler)

func (*NatsSub) Stop

func (ns *NatsSub) Stop() error

type NatsSubHandler added in v0.7.0

type NatsSubHandler func(ctx context.Context, data []byte, reply string) error

type NatsSubOpts added in v0.8.0

type NatsSubOpts struct {
	Sub     nats.SubscriptionType `json:"-" yaml:"-"`
	Subject string                `json:"subject" yaml:"subject"`
	*Config
}

type SSL added in v0.6.0

type SSL struct {
	Enabled  bool   `json:"enabled" yaml:"enabled"`
	CaPath   string `json:"ca" yaml:"ca"`
	KeyPath  string `json:"key" yaml:"key"`
	CertPath string `json:"cert" yaml:"cert"`
	Verify   bool   `json:"verify" yaml:"verify"`
	AuthType string `json:"auth_type" yaml:"auth_type"`
}

SSL contains tls connection options

func (*SSL) Load added in v0.8.0

func (s *SSL) Load() (*tls.Config, error)

type StanConn

type StanConn struct {
	// contains filtered or unexported fields
}

func NewStanConn

func NewStanConn(c *Config) (*StanConn, error)

func (*StanConn) DefaultAckHandler added in v0.1.5

func (sc *StanConn) DefaultAckHandler(nid string, err error)

func (*StanConn) SendAsyncMessage

func (sc *StanConn) SendAsyncMessage(channel string, data interface{})

func (*StanConn) SendMessage

func (sc *StanConn) SendMessage(channel string, data interface{})

func (*StanConn) Stop

func (sc *StanConn) Stop()

type StanSub

type StanSub struct {
	// contains filtered or unexported fields
}

func NewChanSub

func NewChanSub(c *StanSubOpts) (*StanSub, error)

NewChanSub creates connection with channel-named clientID creating subscription with a whole service lifetime context

func (*StanSub) Errors

func (ns *StanSub) Errors() <-chan error

func (*StanSub) Messages

func (ns *StanSub) Messages() <-chan *stan.Msg

func (*StanSub) StartConsumption

func (ns *StanSub) StartConsumption(ctx context.Context, handler StanSubHandler)

func (*StanSub) Stop

func (ns *StanSub) Stop()

type StanSubHandler added in v0.7.0

type StanSubHandler func(ctx context.Context, data []byte, info *MessageInfo) error

type StanSubOpts added in v0.8.0

type StanSubOpts struct {
	Opts    []stan.SubscriptionOption `json:"-" yaml:"-"`
	Channel string                    `json:"channel" yaml:"channel"`
	*Config
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL