types

package
v0.7.0
Warning

This package is not in the latest version of its module.

Published: Apr 28, 2021 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

var (
	// ErrNegativeAcknowledgement is an error representing a negative message acknowledgement
	ErrNegativeAcknowledgement = errors.New("negative acknowledgement")
)

Functions

This section is empty.

Types

type Close

type Close func()

Close represents a closing method

type Consumer

type Consumer interface {
	// Subscribe creates a new subscription that receives the messages
	// consumed from the given topics. This method returns a message
	// channel, or an error if the subscription could not be created.
	// The next message should be requested only once the current message
	// has been processed successfully.
	Subscribe(topics ...Topic) (subscription <-chan *Message, err error)

	// Unsubscribe removes the given channel subscription.
	// An error is returned if the channel could not be unsubscribed.
	Unsubscribe(subscription <-chan *Message) error

	// Close closes the kafka consumer, all topic subscriptions and event channels.
	Close() error
}

Consumer is a message consumer.
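
The subscribe/unsubscribe contract can be illustrated with a minimal in-memory sketch. It does not import this package: `Message` and `Topic` are simplified local stand-ins, and `memConsumer`, `newMemConsumer` and `deliver` are hypothetical names introduced only for this example. The sketch ignores the topic names when delivering and assumes that unsubscribing closes the channel, which the documentation above does not confirm.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Message and Topic are simplified stand-ins for the package types.
type Message struct{ ID string }
type Topic string

// memConsumer is a hypothetical in-memory consumer used for illustration only.
type memConsumer struct {
	mu   sync.Mutex
	subs map[<-chan *Message]chan *Message
}

func newMemConsumer() *memConsumer {
	return &memConsumer{subs: make(map[<-chan *Message]chan *Message)}
}

// Subscribe registers a buffered channel and returns its receive-only side.
func (c *memConsumer) Subscribe(topics ...Topic) (<-chan *Message, error) {
	if len(topics) == 0 {
		return nil, errors.New("no topics given")
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	ch := make(chan *Message, 8)
	c.subs[ch] = ch
	return ch, nil
}

// Unsubscribe closes and forgets the subscription, or reports an unknown channel.
func (c *memConsumer) Unsubscribe(sub <-chan *Message) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	ch, ok := c.subs[sub]
	if !ok {
		return errors.New("unknown subscription")
	}
	close(ch)
	delete(c.subs, sub)
	return nil
}

// deliver fans a message out to every subscription (hypothetical helper;
// it blocks when a subscription buffer is full).
func (c *memConsumer) deliver(m *Message) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, ch := range c.subs {
		ch <- m
	}
}

func main() {
	c := newMemConsumer()
	sub, err := c.Subscribe("orders")
	if err != nil {
		panic(err)
	}
	c.deliver(&Message{ID: "1"})
	fmt.Println((<-sub).ID)
	fmt.Println(c.Unsubscribe(sub))
}
```

Keying the map on the receive-only channel lets `Unsubscribe` look up a subscription using exactly the value the caller received from `Subscribe`.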

type Dialect

type Dialect interface {
	// Consumer returns the dialect consumer
	Consumer() Consumer

	// Producer returns the dialect producer
	Producer() Producer

	// Healthy reports whether the dialect's consumer and producer are healthy and
	// running. This method can be called to check whether the service is up.
	// The health check itself is implemented by the user.
	Healthy() bool

	// Open opens the given dialect so that it starts accepting incoming and outgoing connections.
	Open(topics []Topic) error

	// Close waits until the consumer(s) and producer(s) of the given dialect are closed.
	// If an error occurs, closing is aborted and the error is returned to the user.
	Close() error
}

Dialect represents a commander dialect. A dialect is responsible for the consumption/production of the targeted protocol.

type Handler

type Handler interface {
	Handle(*Message, Writer)
}

Handler is the interface that wraps the Handle method.

type HandlerFunc

type HandlerFunc func(*Message, Writer)

HandlerFunc is a function that handles a message and is given a writer.
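
The listing shows no `Handle` method on `HandlerFunc`, so a plain function may need an adapter before it can be passed where a `Handler` is expected. The following is a sketch under that assumption. The types are local stand-ins mirroring the signatures above, with `Writer` reduced to a single method, and `handlerAdapter` and `asHandler` are hypothetical names that are not part of this package.

```go
package main

import "fmt"

// Local stand-ins that mirror the signatures documented above.
type Message struct{ Action string }

type Writer interface {
	Event(action string, version int8, key []byte, data []byte) (*Message, error)
}

type Handler interface {
	Handle(*Message, Writer)
}

type HandlerFunc func(*Message, Writer)

// handlerAdapter is a hypothetical adapter that gives a HandlerFunc a Handle method.
type handlerAdapter struct{ fn HandlerFunc }

func (a handlerAdapter) Handle(m *Message, w Writer) { a.fn(m, w) }

// asHandler wraps fn so that it can be used wherever a Handler is expected.
func asHandler(fn HandlerFunc) Handler { return handlerAdapter{fn: fn} }

func main() {
	var seen []string
	h := asHandler(func(m *Message, _ Writer) {
		seen = append(seen, m.Action)
	})
	// The writer is unused by this handler, so nil is passed for brevity.
	h.Handle(&Message{Action: "created"}, nil)
	fmt.Println(seen)
}
```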

type Message

type Message struct {
	ID        string    `json:"id"`
	Topic     Topic     `json:"topic"`
	Action    string    `json:"action"`
	Version   Version   `json:"version"`
	Data      []byte    `json:"data"`
	Key       []byte    `json:"key"`
	Timestamp time.Time `json:"timestamp"`
	// contains filtered or unexported fields
}

Message representation

func NewMessage

func NewMessage(action string, version int8, key []byte, data []byte) *Message

NewMessage constructs a new message

func (*Message) Ack

func (message *Message) Ack() bool

Ack marks the message as acknowledged.

func (*Message) Acked

func (message *Message) Acked() <-chan struct{}

Acked returns a channel that gets closed once an acknowledged signal has been sent.

func (*Message) Ctx

func (message *Message) Ctx() context.Context

Ctx returns the message context. This method can safely be called concurrently.

func (*Message) Finally

func (message *Message) Finally() error

Finally returns once the message is resolved. An ErrNegativeAcknowledgement error is returned if the message was negatively acknowledged.

func (*Message) Nack

func (message *Message) Nack() bool

Nack sends a negative acknowledgement.

func (*Message) Nacked

func (message *Message) Nacked() <-chan struct{}

Nacked returns a channel that gets closed once a negative acknowledged signal has been sent.
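
How Ack, Nack, Acked, Nacked and Finally could fit together is sketched below. This is not the package's implementation: `resolver`, `newResolver` and `resolve` are hypothetical names, and the sketch assumes that the boolean returned by Ack and Nack reports whether that call was the one that resolved the message, which the documentation above does not state.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var ErrNegativeAcknowledgement = errors.New("negative acknowledgement")

// resolver is a hypothetical stand-in for the unexported state inside Message.
type resolver struct {
	once sync.Once
	ack  chan struct{}
	nack chan struct{}
}

func newResolver() *resolver {
	return &resolver{ack: make(chan struct{}), nack: make(chan struct{})}
}

// resolve closes ch at most once across Ack and Nack together and reports
// whether this call was the one that resolved the message (an assumption).
func (r *resolver) resolve(ch chan struct{}) bool {
	resolved := false
	r.once.Do(func() {
		close(ch)
		resolved = true
	})
	return resolved
}

func (r *resolver) Ack() bool               { return r.resolve(r.ack) }
func (r *resolver) Nack() bool              { return r.resolve(r.nack) }
func (r *resolver) Acked() <-chan struct{}  { return r.ack }
func (r *resolver) Nacked() <-chan struct{} { return r.nack }

// Finally blocks until the message is resolved one way or the other.
func (r *resolver) Finally() error {
	select {
	case <-r.ack:
		return nil
	case <-r.nack:
		return ErrNegativeAcknowledgement
	}
}

func main() {
	r := newResolver()
	go r.Nack()
	err := r.Finally()
	fmt.Println(errors.Is(err, ErrNegativeAcknowledgement)) // prints "true"
}
```

Sharing a single `sync.Once` between both paths means a message can be resolved only once, so a later Ack cannot override an earlier Nack.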

func (*Message) NewCtx

func (message *Message) NewCtx(ctx context.Context)

NewCtx updates the message context. This method can safely be called concurrently.

func (*Message) NewError

func (message *Message) NewError(action string, err error) *Message

NewError constructs a new error message with the given message as parent.

func (*Message) NewMessage

func (message *Message) NewMessage(action string, version Version, key metadata.Key, data []byte) *Message

NewMessage constructs a new event message with the given message as parent.

func (*Message) Reset

func (message *Message) Reset()

Reset sets up a new async resolver that waits until it is resolved.

func (*Message) Schema

func (message *Message) Schema() interface{}

Schema returns the decoded message schema

type MessageType

type MessageType int8

MessageType represents a message type

const (
	EventMessage MessageType = iota + 1
	CommandMessage
)

Available message types
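
Because the constants start at `iota + 1`, the zero value of `MessageType` matches neither declared type. The short sketch below reproduces the declaration locally to show the resulting values. `isKnown` is a hypothetical helper added for illustration and is not part of this package.

```go
package main

import "fmt"

// MessageType and its constants are copied from the declaration above.
type MessageType int8

const (
	EventMessage MessageType = iota + 1
	CommandMessage
)

// isKnown reports whether t is one of the declared message types
// (hypothetical helper).
func isKnown(t MessageType) bool {
	return t == EventMessage || t == CommandMessage
}

func main() {
	var unset MessageType // zero value, not a declared type
	fmt.Println(EventMessage, CommandMessage, isKnown(unset)) // prints "1 2 false"
}
```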

type Next

type Next func()

Next indicates that the next message can be requested.

type Producer

type Producer interface {
	// Publish produces a message to the given topic
	Publish(message *Message) error

	// Close closes the producer
	Close() error
}

Producer is a message producer.

type Resolved

type Resolved int

Resolved represents a message ack/nack status

const (
	UnkownResolvedStatus Resolved = iota
	ResolvedAck
	ResolvedNack
)

Available Resolved types

type Topic

type Topic interface {
	// Dialect returns the topic Dialect
	Dialect() Dialect
	// Type returns the topic type
	Type() MessageType
	// Mode returns the topic mode
	Mode() TopicMode
	// HasMode checks if the topic has the given topic mode
	HasMode(TopicMode) bool
	// Name returns the topic name
	Name() string
}

Topic represents a subject for a dialect including its consumer/producer mode.

func NewTopic

func NewTopic(name string, dialect Dialect, t MessageType, m TopicMode) Topic

NewTopic constructs a new commander topic for the given name, type, mode and dialect. If no topic mode is defined, the default mode (consume|produce) is assigned to the topic.

type TopicMode

type TopicMode int8

TopicMode represents the mode of the given topic. The mode describes if the given topic is marked for consumption/production/streaming...

const (
	ConsumeMode TopicMode = 1 << iota
	ProduceMode

	DefaultMode = ConsumeMode | ProduceMode
)

Available topic modes
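
The modes are bit flags, so they can be combined with `|` and tested with `&`. The sketch below copies the constants and adds two hypothetical helpers. `hasMode` assumes a mode counts as present when all of its bits are set, and `orDefault` mirrors the fallback documented for NewTopic. Neither helper is taken from the package source.

```go
package main

import "fmt"

// TopicMode and its constants are copied from the declaration above.
type TopicMode int8

const (
	ConsumeMode TopicMode = 1 << iota
	ProduceMode

	DefaultMode = ConsumeMode | ProduceMode
)

// hasMode reports whether every bit of want is set in mode
// (hypothetical helper, one possible reading of HasMode).
func hasMode(mode, want TopicMode) bool {
	return mode&want == want
}

// orDefault returns DefaultMode when no mode has been set
// (hypothetical helper mirroring the documented NewTopic fallback).
func orDefault(mode TopicMode) TopicMode {
	if mode == 0 {
		return DefaultMode
	}
	return mode
}

func main() {
	fmt.Println(ConsumeMode, ProduceMode, DefaultMode) // prints "1 2 3"
	fmt.Println(hasMode(ConsumeMode, ProduceMode))     // prints "false"
	fmt.Println(hasMode(orDefault(0), ProduceMode))    // prints "true"
}
```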

type Version

type Version int8

Version is the message version.

const (
	NullVersion Version = 0
)

Null value of the Version type

func (Version) String

func (version Version) String() string

String returns the version as a string

type Writer

type Writer interface {
	// Event creates and produces a new event to the assigned group.
	Event(action string, version int8, key []byte, data []byte) (*Message, error)

	// Error produces a new error event to the assigned group.
	Error(action string, err error) (*Message, error)

	// Command produces a new command to the assigned group.
	Command(action string, version int8, key []byte, data []byte) (*Message, error)
}

Writer writes events, errors and commands for a given group and message.
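
When unit testing a handler it can help to pass a writer that only records what was written. The sketch below is one possible in-memory implementation of the three methods. `Message` is a simplified local stand-in, `recordingWriter` is a hypothetical name, and storing the error text in `Data` is an assumption rather than the package's encoding.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a simplified stand-in for the package type.
type Message struct {
	Action  string
	Version int8
	Key     []byte
	Data    []byte
}

// recordingWriter is a hypothetical Writer that keeps messages in memory.
type recordingWriter struct {
	events, commands, errs []*Message
}

// Event records a new event message and returns it.
func (w *recordingWriter) Event(action string, version int8, key []byte, data []byte) (*Message, error) {
	m := &Message{Action: action, Version: version, Key: key, Data: data}
	w.events = append(w.events, m)
	return m, nil
}

// Error records an error message; storing the error text as data is an assumption.
func (w *recordingWriter) Error(action string, err error) (*Message, error) {
	m := &Message{Action: action, Data: []byte(err.Error())}
	w.errs = append(w.errs, m)
	return m, nil
}

// Command records a new command message and returns it.
func (w *recordingWriter) Command(action string, version int8, key []byte, data []byte) (*Message, error) {
	m := &Message{Action: action, Version: version, Key: key, Data: data}
	w.commands = append(w.commands, m)
	return m, nil
}

func main() {
	w := &recordingWriter{}
	w.Event("created", 1, []byte("key"), []byte(`{"id":1}`))
	w.Error("created", errors.New("validation failed"))
	fmt.Println(len(w.events), len(w.errs), len(w.commands)) // prints "1 1 0"
}
```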
