aq

package
v0.0.0-...-7902219 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Subscriber is a watermill subscriber that reads messages from Oracle Advanced Queue.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrPublisherClosed = errors.New("publisher is closed")
)
View Source
var (
	ErrSubscriberClosed = errors.New("subscriber is closed")
)

Functions

This section is empty.

Types

type AQMessage

type AQMessage []byte

type JSONMarshaler

type JSONMarshaler struct{}

JSONMarshaler uses encoding/json to marshal Watermill messages.

func (JSONMarshaler) Marshal

func (JSONMarshaler) Marshal(msg *message.Message) (AQMessage, error)

Marshal transforms a watermill message into JSON format.

func (JSONMarshaler) Unmarshal

func (JSONMarshaler) Unmarshal(aqMsg AQMessage) (*message.Message, error)

Unmarshal extracts a watermill message from a nats message.

type Marshaler

type Marshaler interface {
	// Marshal transforms a watermill message into NATS wire format.
	Marshal(msg *message.Message) (AQMessage, error)
}

Marshaler provides transport encoding functions

type MarshalerUnmarshaler

type MarshalerUnmarshaler interface {
	Marshaler
	Unmarshaler
}

MarshalerUnmarshaler provides both Marshaler and Unmarshaler implementations

type Message

type Message struct {
	MsgID    string
	Data     []byte
	Metadata map[string]string
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher inserts the Messages as rows into a SQL table..

func NewPublisher

func NewPublisher(db Transactor, config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)

func (*Publisher) Close

func (p *Publisher) Close() error

Close closes the publisher, which means that all the Publish calls called before are finished and no more Publish calls are accepted. Close is blocking until all the ongoing Publish calls have returned.

func (*Publisher) Publish

func (p *Publisher) Publish(topic string, messages ...*message.Message) (err error)

type PublisherConfig

type PublisherConfig struct {
	QueueConsumer  string // Name of the queue subscriber/consumer
	Payload        string // Payload type
	Transformation string // Payload oracle transformation name

	Marshaler Marshaler
}

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(db Transactor, config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)

func (*Subscriber) Close

func (s *Subscriber) Close() error

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

func (*Subscriber) SubscribeInitialize

func (s *Subscriber) SubscribeInitialize(topic string) error

type SubscriberConfig

type SubscriberConfig struct {
	QueueConsumer  string        // Name of the queue subscriber/consumer
	Payload        string        // Payload type
	Transformation string        // Payload oracle transformation name
	QueueWaitTime  time.Duration // Wait time for the queue to return data
	BatchSize      int           // Number of messages to fetch in a single call

	Timeout time.Duration // Timeout for the sql query operation

	Unmarshaler Unmarshaler
}

type Transactor

type Transactor interface {
	BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
	ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
	QueryRowContext(context.Context, string, ...interface{}) *sql.Row
	Close() error
}

type Unmarshaler

type Unmarshaler interface {
	// Unmarshal produces a watermill message from NATS wire format.
	Unmarshal(AQMessage) (*message.Message, error)
}

Unmarshaler provides transport decoding function

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL