server

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2020 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrUnknownTopic = errors.New("unknown topic")
	ErrUnmarshall   = errors.New("unmarshall error")
)
View Source
var (
	ErrIncorrectScheme = errors.New("incorrect scheme")
)
View Source
var (
	ErrNoBrokerSpecified = errors.New("no broker specified")
)

Functions

func NewLogger

func NewLogger() zerolog.Logger

func RegisterSourceFactory added in v1.0.1

func RegisterSourceFactory(scheme string, factory SourceFactory)

RegisterSourceFactory registers a new source factory for the given scheme.

func StartCommand

func StartCommand()

Types

type Configuration

type Configuration struct {
	// The port the server will listen on.
	Port uint16
	// The list of topics/stream names (Kafka/Redis) subscribers can consume.
	Topics []string
	// The server URI used to connect to the stream source (either Kafka or Redis).
	Source string
}

type LoggerFunc

type LoggerFunc func(e *zerolog.Event)

LoggerFunc turns a function into a zerolog LogObjectMarshaler.

func KafkaMessageAsZerologObject

func KafkaMessageAsZerologObject(message kafka.Message) LoggerFunc

KafkaMessageAsZerologObject converts a Kafka message into a LogObjectMarshaler.

func MapAsZerologObject added in v1.0.1

func MapAsZerologObject(m map[string]interface{}) LoggerFunc

MapAsZerologObject converts a map into a LogObjectMarshaler.

func (LoggerFunc) MarshalZerologObject

func (f LoggerFunc) MarshalZerologObject(e *zerolog.Event)

MarshalZerologObject makes the LoggerFunc type a LogObjectMarshaler.

type Resolver

type Resolver struct {
	// contains filtered or unexported fields
}

func NewResolver

func NewResolver(cfg *Configuration, log zerolog.Logger) (*Resolver, error)

func (*Resolver) Event

func (r *Resolver) Event(
	ctx context.Context,
	args *struct {
		On       string
		At       scalar.Offset
		Matching *string
	}) (<-chan *scalar.JSONObject, error)

func (*Resolver) Topics

func (r *Resolver) Topics() []string

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(cfg *Configuration) *Server

func (*Server) Start

func (s *Server) Start()

type Source added in v1.0.1

type Source interface {
	URI() *url.URL
	// NewConsumer returns a new observable consuming messages from this source, from a topic, starting
	// at the provided offset (if supported).
	NewConsumer(ctx context.Context, topic string, offset int64) rxgo.Observable
}

Source specifies types which are able to provide a source of events through an Observable.

func NewSource added in v1.0.1

func NewSource(uri *url.URL) (Source, error)

NewSource returns a new instance of source given the uri. The uri contains all the required information to perform a connection to the source endpoint.

type SourceFactory added in v1.0.1

type SourceFactory func(uri *url.URL) (Source, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL