pubsub

package
v0.0.0-...-35a4376 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2024 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ContentTypeHeader = "Content-Type"
	TimestampHeader   = "timestamp"
)

Variables

This section is empty.

Functions

func NewRecover

func NewRecover(logger log.Logger) func()

Types

type AckableMessage

type AckableMessage struct {
	// contains filtered or unexported fields
}

func NewAckableMessage

func NewAckableMessage(
	message *Message,
	span trace.Span,
	ack, nak func() error,
	nakWithDelay func(time.Duration) error,
) *AckableMessage

func (*AckableMessage) Ack

func (m *AckableMessage) Ack() error

func (*AckableMessage) Message

func (m *AckableMessage) Message() *Message

func (*AckableMessage) Nak

func (m *AckableMessage) Nak() error

func (*AckableMessage) NakWithDelay

func (m *AckableMessage) NakWithDelay(delay time.Duration) error

func (*AckableMessage) Span

func (m *AckableMessage) Span() trace.Span

type AckableMessages

type AckableMessages []*AckableMessage

func (AckableMessages) AckMessages

func (mgs AckableMessages) AckMessages() error

func (AckableMessages) NakMessages

func (mgs AckableMessages) NakMessages() error

func (AckableMessages) NakMessagesWithDelay

func (mgs AckableMessages) NakMessagesWithDelay(delay time.Duration) error

type Acknowledger

type Acknowledger func(*AckableMessage) error

func Ack

func Ack() Acknowledger

func Nack

func Nack() Acknowledger

func NackWithDelay

func NackWithDelay(d time.Duration) Acknowledger

type Config

type Config struct {
	Protocol Protocol  `json:"protocol"`
	URLs     []string  `json:"url"`
	TLS      TLSConfig `json:"tls"`
}

func NewConfig

func NewConfig() *Config

func (Config) ValidateConfigFields

func (c Config) ValidateConfigFields() error

type GenericPublisherToSubject

type GenericPublisherToSubject[T any] interface {
	Publish(ctx context.Context, data ...T) error
}

func NewGenericPublisherToSubject

func NewGenericPublisherToSubject[T any](
	publisher Publisher,
	encoder encoding.Encoder,
	subject string,
	extraHeaders map[string]string,
) GenericPublisherToSubject[T]

type GenericSubscriberToSubject

type GenericSubscriberToSubject[T any] interface {
	Subscribe(ctx context.Context, handler func(context.Context, T) Acknowledger) (Unsubscribe, error)
}

func NewGenericSubscriberToSubject

func NewGenericSubscriberToSubject[T any](
	subscriber Subscriber,
	decoder encoding.GenericDecoder[T],
	subject,
	consumer string,
) GenericSubscriberToSubject[T]

type Message

type Message struct {
	// contains filtered or unexported fields
}

func NewMessage

func NewMessage(ctx context.Context, contentType encoding.ContentType, payload []byte, opts ...MessageOption) *Message

func (*Message) ID

func (m *Message) ID() string

func (*Message) Meta

func (m *Message) Meta() map[string]string

func (*Message) Payload

func (m *Message) Payload() []byte

type MessageOption

type MessageOption func(*Message)

func WithMessageID

func WithMessageID(id string) MessageOption

func WithMessageMeta

func WithMessageMeta(key, value string) MessageOption

func WithMessageMetas

func WithMessageMetas(metas map[string]string) MessageOption

type Messages

type Messages []*Message

func NewMessages

func NewMessages(msgs ...*Message) Messages

type MigrationInfo

type MigrationInfo interface {
	MigrationID() string
}

type Migrator

type Migrator interface {
	Migrate(ctx context.Context, migrations []MigrationInfo) error
	IsMigrated(ctx context.Context, migrations []MigrationInfo) (bool, error)
}

type Protocol

type Protocol string
const (
	KafkaProtocol         Protocol = "kafka"
	AmqpProtocol          Protocol = "amqp"
	NatsJetStreamProtocol Protocol = "nats_js"
)

type Publisher

type Publisher interface {
	Publish(ctx context.Context, subject string, msgs ...*Message) error
}

type Subscriber

type Subscriber interface {
	Subscribe(
		ctx context.Context,
		subject,
		consumer string,
		handler func(context.Context, *Message) Acknowledger,
	) (Unsubscribe, error)
}

type TLSConfig

type TLSConfig struct {
	Enabled        bool   `yaml:"enabled"`
	RootCAsFile    string `yaml:"root_cas_file"`
	ClientCertFile string `yaml:"client_cert_file"`
	ClientKeyFile  string `yaml:"client_key_file"`
}

func (TLSConfig) TLSConfig

func (c TLSConfig) TLSConfig() (*tls.Config, error)

type Unsubscribe

type Unsubscribe func() error

Directories

Path Synopsis
internal
nats
js

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL