nsqpb

v0.4.3
Warning

This package is not in the latest version of its module.

Published: Nov 29, 2018 License: GPL-3.0 Imports: 13 Imported by: 7

README

nsqpb

Package for producing / consuming protobuf messages with NSQ.

Protobuf Github
NSQ Messaging
NSQ Go Package

To test against a live queue, run the docker-compose file. It starts one image running nsqd and one running nsqlookupd.

Documentation

Overview

Package nsqpb is a generated GoMock package.

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConvertLogLevel

func ConvertLogLevel(level logrus.Level) nsq.LogLevel

func LookupTopic

func LookupTopic(nsqdLookupHostPort string, topic string) bool

LookupTopic goes to nsqd (at nsqdLookupHostPort) and checks whether topic is supported.

Types

type HandleMessage

type HandleMessage interface {
	UnmarshalAndProcess(msg []byte, done chan int, finish chan int) error
}

HandleMessage is an interface for unmarshalling your messages to a struct or protobuf message, then processing the resulting object. Implementing this interface is how you interact with the nsq channels.
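A minimal sketch of a type that satisfies HandleMessage may make the contract easier to see. The interface is redeclared locally so the example compiles on its own, the `greeting` payload and `greetHandler` type are hypothetical, and JSON stands in for protobuf. The package docs do not describe what `done` and `finish` mean, so signalling on `finish` after processing is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// HandleMessage mirrors the documented interface so this sketch is self-contained.
type HandleMessage interface {
	UnmarshalAndProcess(msg []byte, done chan int, finish chan int) error
}

// greeting is a hypothetical payload; JSON stands in for a protobuf message.
type greeting struct {
	Name string `json:"name"`
}

// greetHandler is a hypothetical handler that records the names it has seen.
type greetHandler struct {
	seen []string
}

// UnmarshalAndProcess decodes the payload, records it, then signals on finish.
// ASSUMPTION: a send on finish marks the message as handled; the docs do not say.
func (h *greetHandler) UnmarshalAndProcess(msg []byte, done chan int, finish chan int) error {
	var g greeting
	if err := json.Unmarshal(msg, &g); err != nil {
		return fmt.Errorf("unmarshal greeting: %w", err)
	}
	h.seen = append(h.seen, g.Name)
	select {
	case finish <- 1:
	default: // nobody is ready to receive; do not block the handler
	}
	return nil
}

// Compile-time check that greetHandler satisfies the interface.
var _ HandleMessage = (*greetHandler)(nil)

func main() {
	h := &greetHandler{}
	finish := make(chan int, 1)
	err := h.UnmarshalAndProcess([]byte(`{"name":"nsq"}`), make(chan int, 1), finish)
	fmt.Println(h.seen, err)
}
```

With the real package, a value like `h` would presumably be assigned to the `Handler` field of a ProtoConsume before consuming.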

type MockProducer

type MockProducer struct {
	// contains filtered or unexported fields
}

MockProducer is a mock of Producer interface

func NewMockProducer

func NewMockProducer(ctrl *gomock.Controller) *MockProducer

NewMockProducer creates a new mock instance

func (*MockProducer) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MockProducer) WriteProto

func (m *MockProducer) WriteProto(message proto.Message, topicName string) error

WriteProto mocks base method

type MockProducerMockRecorder

type MockProducerMockRecorder struct {
	// contains filtered or unexported fields
}

MockProducerMockRecorder is the mock recorder for MockProducer

func (*MockProducerMockRecorder) WriteProto

func (mr *MockProducerMockRecorder) WriteProto(message, topicName interface{}) *gomock.Call

WriteProto indicates an expected call of WriteProto

type NSQLogger

type NSQLogger struct{}

func NewNSQLogger

func NewNSQLogger() (logger NSQLogger, level nsq.LogLevel)

func NewNSQLoggerAtLevel

func NewNSQLoggerAtLevel(lvl logrus.Level) (logger NSQLogger, level nsq.LogLevel)

func (NSQLogger) Output

func (n NSQLogger) Output(_ int, s string) error

type NsqConfig

type NsqConfig struct {
	NsqLookupdIp   string
	NsqdIp         string
	NsqdPort       string
	NsqLookupdPort string
	MaxInFlight    int
	TouchInterval  int
	Timeout        int64 // seconds
}

func DefaultNsqConf

func DefaultNsqConf() *NsqConfig

DefaultNsqConf returns a new NsqConfig struct with default values. It searches the environment variables for the nsqlookupd and nsqd IP addresses, and defaults each to 127.0.0.1 if it is not found.

func (*NsqConfig) LookupDAddress

func (n *NsqConfig) LookupDAddress() string

LookupDAddress returns `<ip>:<port>` of configured nsqlookupd, the format nsq package takes

func (*NsqConfig) NsqDAddress

func (n *NsqConfig) NsqDAddress() string

NsqDAddress returns `<ip>:<port>` of configured nsqd, the format nsq package takes
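As an illustration of the `<ip>:<port>` format, here is a small sketch built on a trimmed local copy of the NsqConfig fields. Using `net.JoinHostPort` is a choice made for this example; the package may simply concatenate the strings. The ports 4150 and 4161 are the stock nsqd TCP and nsqlookupd HTTP ports and are assumed here, not confirmed as this package's defaults.

```go
package main

import (
	"fmt"
	"net"
)

// nsqConfig is a trimmed local copy of the documented NsqConfig fields.
type nsqConfig struct {
	NsqLookupdIp   string
	NsqdIp         string
	NsqdPort       string
	NsqLookupdPort string
}

// lookupDAddress joins the nsqlookupd IP and port as "<ip>:<port>".
func (n *nsqConfig) lookupDAddress() string {
	return net.JoinHostPort(n.NsqLookupdIp, n.NsqLookupdPort)
}

// nsqDAddress joins the nsqd IP and port as "<ip>:<port>".
func (n *nsqConfig) nsqDAddress() string {
	return net.JoinHostPort(n.NsqdIp, n.NsqdPort)
}

func main() {
	conf := &nsqConfig{
		NsqLookupdIp:   "127.0.0.1",
		NsqLookupdPort: "4161", // assumed: stock nsqlookupd HTTP port
		NsqdIp:         "127.0.0.1",
		NsqdPort:       "4150", // assumed: stock nsqd TCP port
	}
	fmt.Println(conf.lookupDAddress())
	fmt.Println(conf.nsqDAddress())
}
```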

type PbProduce

type PbProduce struct {
	Producer *nsq.Producer
	// contains filtered or unexported fields
}

func DefaultProducer

func DefaultProducer() (producer *PbProduce, err error)

DefaultProducer creates an nsq producer using the default config settings (nsqd addresses are read from the environment, with localhost defaults). It also pings its configured nsqd to ensure that the producer has been configured correctly.

func GetInitProducer

func GetInitProducer() *PbProduce

Use GetInitProducer to get a producer instance in your code; the initialization runs only once. You need the global variables once and cachedProducer set in your service, then pass those to this. See sync.Once if this is unclear.
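For readers unfamiliar with sync.Once, the run-once caching pattern the comment refers to looks roughly like this. The `producer` type and `getProducer` function are hypothetical stand-ins; this is not the package's actual implementation of GetInitProducer.

```go
package main

import (
	"fmt"
	"sync"
)

// producer is a hypothetical stand-in for *PbProduce.
type producer struct{ id int }

var (
	once           sync.Once
	cachedProducer *producer
	built          int // counts constructor runs, for demonstration only
)

// getProducer builds the producer on the first call and returns the cached
// instance on every later call, even when called from several goroutines.
func getProducer() *producer {
	once.Do(func() {
		built++
		cachedProducer = &producer{id: built}
	})
	return cachedProducer
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			getProducer()
		}()
	}
	wg.Wait()
	fmt.Println("constructor runs:", built)
}
```

Every caller receives the same pointer, so a connection held by the producer is shared rather than reopened on each call.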

func (*PbProduce) WriteProto

func (p *PbProduce) WriteProto(message proto.Message, topicName string) error

WriteProto writes a protobuf message to the NSQ topic named topicName. It gets the IP of the NSQ daemon from the environment variable `NSQD_IP`, or sets it to 127.0.0.1 if that variable is not set. The NSQ daemon should run alongside any service that produces messages, so this will usually work.

type Producer

type Producer interface {
	WriteProto(message proto.Message, topicName string) error
}

type ProtoConsume

type ProtoConsume struct {
	Handler          HandleMessage
	DecodeConfig     *nsq.Config
	Config           *NsqConfig
	StopChan         chan int
	ConsumerRecovery func()
	MessageRecovery  func(message *nsq.Message)
	// contains filtered or unexported fields
}

ProtoConsume wraps nsq.Message so that code outside the package can just add an UnmarshalProtoFunc that doesn't require touching nsq fields. Just write a function that unmarshals to your proto object and does the work.

func NewDefaultProtoConsume

func NewDefaultProtoConsume() *ProtoConsume

NewDefaultProtoConsume returns a new ProtoConsume object with nsq configuration and nsqpb configuration. It also sets default message recovery and consumer recovery functions.

func (*ProtoConsume) AddTopic

func (p *ProtoConsume) AddTopic(supportedTopic string)

AddTopic adds a supported topic to the set stored on the consumer.

func (*ProtoConsume) ConsumeMessages

func (p *ProtoConsume) ConsumeMessages(topicName string, channelName string) error

ConsumeMessages consumes messages on a given topic / channel in NSQ. The ProtoConsume's UnmarshalProtoFunc is added, with a wrapper, as the handler for the consumer. The IP address of the nsqlookupd instance can be set with the environment variable NSQLOOKUPD_IP, and defaults to 127.0.0.1.

func (*ProtoConsume) DeleteTopic

func (p *ProtoConsume) DeleteTopic(toRemove string)

TODO: does it matter to add a bool for pass/fail

func (*ProtoConsume) GetStats

func (p *ProtoConsume) GetStats() []*nsq.ConsumerStats

func (*ProtoConsume) GetTopics

func (p *ProtoConsume) GetTopics() []string

Retrieves all consumer supported topics

func (*ProtoConsume) NSQProtoConsume

func (p *ProtoConsume) NSQProtoConsume(msg *nsq.Message) error

NSQProtoConsume is a wrapper for `p.Handler.UnmarshalAndProcess` --> `nsq.HandlerFunc`

func (*ProtoConsume) Pause

func (p *ProtoConsume) Pause()

func (*ProtoConsume) UnPause

func (p *ProtoConsume) UnPause()

func (*ProtoConsume) WaitThenConsume added in v0.4.2

func (p *ProtoConsume) WaitThenConsume(topic, channel string, handler HandleMessage, waitInterval int)

WaitThenConsume first ensures that the given topic exists by checking in lookupd (configured in ProtoConsume). If it doesn't exist, it sleeps for waitInterval seconds. If it does exist, it sets the *ProtoConsume's Handler to handler and starts (*ProtoConsume).ConsumeMessages on the given topic and channel. Any errors encountered are logged; this function doesn't return anything.
