nats_utils

package module
v0.0.0-...-781ab4c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 13, 2019 License: MIT Imports: 11 Imported by: 0

README

nats_utils

Installation

Run command on you [$GOPATH/src] path:

go get -u github.com/alex60217101990/nats_utils

Usage

Create service:

	natsServer := NewStreamingService(
		SetContext(ctx),
		SetConfigs(&Config{
			Async:                true,
			ClusterID:            &ClusterID,
			MsgChannelBufferSize: &MsgChannelBufferSize,
			MaxPubAcksInflight:   &MaxPubAcksInflight,
			Compress:             true,
			Logger:               nil,
			Options: &nats_streaming.Options{
				Servers:  servers,
				Secure:   false,
				User:     "some_user",
				Password: "some_password",
			},
		}),
	)

Connect with NATS single node or cluster:

    natsServer.Connect()

Publishing:

Run publisher loop:

    go natsServer.RunPublisher() 

Publish message to channel:

    natsServer.PublisherPush("test_channel", []byte(fmt.Sprintf("test_message: %v", t)))

Subscribing:

Usage with handler function:

    natsServer.RunSubscriber(SubscriberConfig{
		Type:        ClassicSubscribe,
		StartType:   DeliverAllAvailable,
		SubMsgChan:  subMsgChan,
		ChannelName: &ChannelName,
		MsgHandler: func(m *stan.Msg) {
			//some action with current message...
		},
		MaxInflight:    &MaxPubAcksInflight,
		AckWaitSeconds: &AckWaitSeconds,
		DurableName:    &DurableName,
    })

Usage with ring buffering channel:

    subMsgChan := channels.NewRingChannel(channels.BufferCap(500))
    defer subMsgChan.Close()
    natsServer.RunSubscriber(SubscriberConfig{
		Type:        ClassicSubscribe,
		StartType:   DeliverAllAvailable,
		SubMsgChan:  subMsgChan,
		ChannelName: &ChannelName,
		MaxInflight:    &MaxPubAcksInflight,
		AckWaitSeconds: &AckWaitSeconds,
		DurableName:    &DurableName,
    })

    for {
		select {
		case m := <-subMsgChan.Out():
			//some action with current message...
		}
	}

License

MIT

Documentation

Index

Constants

View Source
const (
	DefaultMaxPubAcksInflight = 1000
	DefaultMsgChanSize        = 500
)

Variables

This section is empty.

Functions

func SetConfigs

func SetConfigs(conf *Config) func(*NatsStreamingService) error

func SetContext

func SetContext(ctx context.Context) func(*NatsStreamingService) error

func SetPubID

func SetPubID(pubID string) func(*NatsStreamingService) error

Types

type Config

type Config struct {
	Async                bool
	ClusterID            *string
	MsgChannelBufferSize *int
	MaxPubAcksInflight   *int
	Compress             bool
	Logger               *log.Logger
	Options              *nats.Options
}

type MsgEvent

type MsgEvent struct {
	Subject string
	Body    []byte
}

type MsgSubInfo

type MsgSubInfo struct {
	Subject  string    `json:"subject"`
	Queue    *string   `json:"queue,omitempty"`
	Sequence uint      `json:"sequence"`
	Time     time.Time `json:"time"`
}

type NatsStreamingService

type NatsStreamingService struct {
	// contains filtered or unexported fields
}

func NewStreamingService

func NewStreamingService(options ...func(*NatsStreamingService) error) *NatsStreamingService

func (*NatsStreamingService) Close

func (n *NatsStreamingService) Close()

func (*NatsStreamingService) Connect

func (n *NatsStreamingService) Connect()

func (*NatsStreamingService) PublisherPush

func (n *NatsStreamingService) PublisherPush(subject string, data []byte)

func (*NatsStreamingService) RunPublisher

func (n *NatsStreamingService) RunPublisher()

func (*NatsStreamingService) RunSubscriber

func (n *NatsStreamingService) RunSubscriber(config SubscriberConfig) error

type StorageSubscribeLast

type StorageSubscribeLast interface {
	SaveLastMessageInfo(key string, lastMsgInfo MsgSubInfo) error
}

type StreamingService

type StreamingService interface {
	Connect()
	RunPublisher()
	RunSubscriber(config SubscriberConfig) error
	PublisherPush(subject string, data []byte)
	Close()
}

type SubscribeStartType

type SubscribeStartType int
const (
	StartWithLastReceived SubscribeStartType = (1 + iota) // Последнее полученное сообщение
	DeliverAllAvailable                                   // Начало канала
	StartAtSequence                                       // Конкретное сообщение, индексация начинается с 1
	StartAtTime                                           // Конкретное время, когда сообщение пришло на канал
)

func (SubscribeStartType) String

func (t SubscribeStartType) String() string

func (SubscribeStartType) Val

func (t SubscribeStartType) Val() int

type SubscribeType

type SubscribeType int
const (
	ClassicSubscribe SubscribeType = (1 + iota)
	QueueSubscribe
)

func (SubscribeType) String

func (s SubscribeType) String() string

func (SubscribeType) Val

func (s SubscribeType) Val() int

type SubscriberConfig

type SubscriberConfig struct {
	Type                         SubscribeType
	StartType                    SubscribeStartType
	QueueName                    *string
	SubMsgChan                   *channels.RingChannel
	UnsubscribeAfterSubscribeEnd bool
	ChannelName                  *string
	MsgHandler                   func(m *stan.Msg)
	LastMsgStorage               StorageSubscribeLast
	MaxInflight                  *int
	AckWaitSeconds               *int
	DurableName                  *string    // only for 'Durable' subscribes or queues
	StartMsgPosition             *int       // only for 'StartAtSequence' type
	StartTime                    *time.Time // only for 'StartAtTime' type
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL