nsq

package
v0.0.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2023 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrInvalidMessageHandler returned when the message handler doesn't implement the underlying interface
	ErrInvalidMessageHandler = errors.New("invalid message handler provided")
)

Functions

func NewNsqHandler

func NewNsqHandler(ctx context.Context, n *Nsq, topic string, handler broker.Handler) *nsqHandler

NewNsqHandler creates a new nsq message Handler

func NewNsqSubscriber

func NewNsqSubscriber(n *Nsq, topic string, opts ...broker.SubscribeOption) *subscriber

NewNsqSubscriber returns a new subscriber instance for NSQ subscription

func WithChannelName

func WithChannelName(name string) broker.SubscribeOption

WithChannelName defines a channel name for the subscriber

Types

type Config

type Config struct {
	Producer Producer
	Lookupd  Lookupd
}

Config holds the NSQ broker configuration (producer and lookupd settings)

func NewConfig

func NewConfig() *Config

NewConfig returns the parsed config for nsq from env

func (*Config) Channel

func (c *Config) Channel(topic string) string

Channel returns the channel name for the topic

func (*Config) NSQConfig

func (c *Config) NSQConfig() *nsq.Config

NSQConfig returns a new *nsq.Config

func (*Config) UnmarshalEnv

func (c *Config) UnmarshalEnv(es env.EnvSet) error

UnmarshalEnv unmarshals an env.EnvSet into Config

type Initializer

type Initializer struct {
	// contains filtered or unexported fields
}

func NewInitializer

func NewInitializer(n *Nsq) *Initializer

NewInitializer returns a new NSQ Initializer

func (*Initializer) AddDependency

func (i *Initializer) AddDependency(dep interface{}) error

AddDependency adds necessary service components as dependencies

func (*Initializer) CanRun

func (i *Initializer) CanRun() bool

CanRun returns true if the component has anything to Run

func (*Initializer) CanStop

func (i *Initializer) CanStop() bool

CanStop returns true if the component has anything to Stop

func (*Initializer) Dependencies

func (i *Initializer) Dependencies() []string

Dependencies returns the string names of service components that are required as dependencies for this component

func (*Initializer) Run

func (i *Initializer) Run(ctx context.Context) error

Run starts the service component

func (*Initializer) Stop

func (i *Initializer) Stop(ctx context.Context) error

Stop stops the running service component

type Lookupd

type Lookupd struct {
	Host string `env:"BROKER_NSQ_LOOKUPD_HOST,default=127.0.0.1"`
	Port string `env:"BROKER_NSQ_LOOKUPD_PORT,default=4161"`
}

Lookupd holds config for NSQLookupd

func (*Lookupd) Address

func (l *Lookupd) Address() string

Address returns the formatted address for the nsq lookupd

type Nsq

type Nsq struct {
	Producer  *nsq.Producer
	Consumers map[string]*nsq.Consumer
	*Config
	// contains filtered or unexported fields
}

Nsq holds our broker instance

func NewNsq

func NewNsq() *Nsq

NewNsq returns a new instance of NSQ

func (*Nsq) HasInitializer

func (n *Nsq) HasInitializer() bool

func (*Nsq) Initializer

func (n *Nsq) Initializer() component.Initializer

func (*Nsq) Logger

func (n *Nsq) Logger() logger.Logger

Logger returns the initialized logger instance

func (*Nsq) Publish

func (n *Nsq) Publish(ctx context.Context, topic string, message interface{}, opts ...broker.PublishOption) error

Publish publishes the topic message

func (*Nsq) String

func (n *Nsq) String() string

func (*Nsq) Subscribe

func (n *Nsq) Subscribe(ctx context.Context, topic string, handler broker.Handler, opts ...broker.SubscribeOption) error

Subscribe subscribes to the given topic

func (*Nsq) Unsubscribe

func (n *Nsq) Unsubscribe(topic string) error

Unsubscribe method is not applicable

type Producer

type Producer struct {
	Host string `env:"BROKER_PRODUCER_HOST,default=127.0.0.1"`
	Port string `env:"BROKER_PRODUCER_PORT,default=4150"`
}

Producer holds config for NSQ producer

func (*Producer) Address

func (p *Producer) Address() string

Address returns the formatted address for the producer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL