messaging

package module
v2.1.0+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 4, 2019 License: Apache-2.0 Imports: 16 Imported by: 0

README

go-messaging-lib

Overview

This project provides simple interfaces to interact with Veritone's core eventing system. There are two main patterns currently supported by this library:

  • Pub-Sub
  • Streaming

Depending on the underlying eventing system, user should use the corresponding package to initialize the its client. The supported systems are Kafka and NSQ.

Goals

  • Offers simple setup and just work out of the box.
  • Provides consistent interfaces that should work for various eventing systems (Kafka, RabbitMQ, NSQ, etc.), thus preventing major breaking changes.
  • Provides multiple examples to jump start.
  • Handles edge cases and difficult technical requirements behind the scene.
  • Exposes monitoring statistics with prometheus.

Usage

Please see the instructions

Basic Operations:

Create Producer
message, err := json.Marshal(data)
if err != nil {
    // handle error
}
producer := Producer(topic, kafka.StrategyRoundRobin, "localhost:9092")
msg, err := NewMessage("hash_key", message)
if err != nil {
    // handle error
}
err = producer.Produce(context.TODO(), msg)
if err != nil {
    // handle error
}
err = producer.Close()
if err != nil {
    // handle error
}
Create Consumer
consumer, err = kafka.Consumer("topic_name", "consumer_group_name", "localhost:9092")
if err != nil {
    // handle error
}
queue, err = consumer.Consume(context.TODO(), kafka.ConsumerGroupOption)
if err != nil {
    // handle error
}
for item := range queue {
    log.Printf("Received: (%s) (%#v) (%T)\n", item.Payload(), item.Metadata(), item.Raw())
}

Notes

This repo is still a WIP. It's not yet suitable for production use.

Mock

This package includes mocks of all its interface types (in package mocks) that is useful for testing. Please update it after changing any of the interfaces.

Mockery - A mock code autogenerator for golang

mockery -name=Manager
mockery -name=Producer
...

Documentation

Index

Constants

View Source
const JaegerAgentHostPort = "0.0.0.0:6831"

JaegerAgentHostPort point to the agent deployed with sidecar pattern Deployment should use the default port

Variables

This section is empty.

Functions

func MapToLogger

func MapToLogger(data map[string]string) []zap.Field

func NameFromEvent

func NameFromEvent(payload []byte) string

Types

type Consumer

type Consumer interface {
	Consume(context.Context, OptionCreator) (<-chan Event, error)
	io.Closer
}

Consumer defines functions of a consumer/subscriber

type Event

type Event interface {
	Payload() []byte
	Metadata() map[string]interface{}
	Raw() interface{}
}

type Logger

type Logger = *zap.Logger

func AddLogger

func AddLogger(level string) (Logger, error)

func MustAddLogger

func MustAddLogger(level string) Logger

type Manager

type Manager interface {
	ListTopics(context.Context) (interface{}, error)
	CreateTopics(context.Context, OptionCreator, ...string) error
	DeleteTopics(context.Context, ...string) error
	io.Closer
}

Manager provides basic administrative functions for the messaging system

type Messager

type Messager interface {
	Message() interface{}
}

Messager defines a contract for creating a compatible message type

type OptionCreator

type OptionCreator interface {
	Options() interface{}
}

OptionCreator defines a contract for making metadata/options for consumer

type Producer

type Producer interface {
	Produce(context.Context, Messager, ...Event) error
	io.Closer
}

Producer defines functions of a producer/publisher

type StreamReader

type StreamReader interface {
	io.ReadCloser
}

StreamReader reads stuff from a stream

type StreamWriter

type StreamWriter interface {
	io.WriteCloser
}

StreamWriter writes things to a stream

type Tracer

type Tracer struct {
	opentracing.Tracer
	// contains filtered or unexported fields
}

func AddTracer

func AddTracer(serviceName, env string) (*Tracer, error)

func MustAddTracer

func MustAddTracer(serviceName, env string) *Tracer

func (*Tracer) Decorate

func (t *Tracer) Decorate(msgType interface{}, msgPayload []byte) *events.VtEvent

func (*Tracer) Trace

func (t *Tracer) Trace(to *events.VtEvent, from *events.VtEvent)

Directories

Path Synopsis
example
Code generated by mockery v1.0.0 Code generated by mockery v1.0.0 Code generated by mockery v1.0.0 Code generated by mockery v1.0.0 Code generated by mockery v1.0.0 Code generated by mockery v1.0.0 Code generated by mockery v1.0.0
Code generated by mockery v1.0.0 Code generated by mockery v1.0.0 Code generated by mockery v1.0.0 Code generated by mockery v1.0.0 Code generated by mockery v1.0.0 Code generated by mockery v1.0.0 Code generated by mockery v1.0.0

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL