eventstream

v3.1.4
Published: May 6, 2021 License: Apache-2.0 Imports: 19 Imported by: 0

README

eventstream-go-sdk

Go SDK for integrating with AccelByte's event stream

Usage

Install
go get -u github.com/AccelByte/eventstream-go-sdk/v3
Importing
eventstream "github.com/AccelByte/eventstream-go-sdk/v3"

To create a new event stream client, use this function:

client, err := eventstream.NewClient(prefix, stream, brokers, config)

NewClient takes four parameters:

  • prefix : Topic prefix supplied by the client (string)
  • stream : Stream name, e.g. kafka, stdout, or none (string)
  • brokers : List of Kafka brokers (slice of strings)
  • config : Custom broker configuration supplied by the client. Optional; only the first argument is used. (variadic *BrokerConfig)

Supported Stream

The event stream currently supports the following streams:

Kafka Stream

Publish events to, and subscribe to events from, a Kafka stream.

Currently compatible with Go 1.12+ and Kafka versions 0.10.1.0 through 2.1.0.

To create a Kafka stream client, pass kafka as the stream parameter.

Custom Configuration

The SDK supports the following custom configuration options for the Kafka stream:

  • DialTimeout : Timeout for connecting to the Kafka broker. Default: 10 seconds (time.Duration)
  • ReadTimeout : Timeout for consuming a topic from the Kafka broker. Default: 10 seconds (time.Duration)
  • WriteTimeout : Timeout for publishing an event to the Kafka broker. Default: 10 seconds (time.Duration)
  • LogMode : eventstream logs at one of the following levels: info, warn, debug, error, or off. Default: off (string)
  • StrictValidation : If set to true, eventstream enables strict validation of event fields. Default: false (boolean)
config := &eventstream.BrokerConfig{
	LogMode:          eventstream.InfoLevel,
	StrictValidation: true,
	DialTimeout:      1 * time.Second,
	ReadTimeout:      1 * time.Second,
	WriteTimeout:     1 * time.Second,
}
Stdout Stream

This stream is intended for testing. It prints each event to stdout and should not be used in production, since it produces unnecessary log output.

To create a stdout stream client, pass stdout as the stream parameter.

Blackhole

Use this when the client does not want the service to send event data anywhere.

To create a blackhole client, pass none as the stream parameter.
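
To make the stream names concrete, here is a minimal sketch of how a factory might dispatch on the stream parameter. It is not the SDK's implementation: publisher, stdoutPublisher, blackholePublisher, and newPublisher are hypothetical names, the Kafka case is left out so the example needs only the standard library, and returning an error for an unknown stream name is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// publisher is a hypothetical, simplified stand-in for the Client interface.
type publisher interface {
	Publish(event string) error
}

// stdoutPublisher prints every event, like the stdout stream.
type stdoutPublisher struct{}

func (stdoutPublisher) Publish(event string) error {
	fmt.Println(event)
	return nil
}

// blackholePublisher silently drops every event, like the none stream.
type blackholePublisher struct{}

func (blackholePublisher) Publish(string) error { return nil }

// newPublisher selects an implementation by stream name.
// Rejecting unknown names is an assumption made for this sketch.
func newPublisher(stream string) (publisher, error) {
	switch stream {
	case "stdout":
		return stdoutPublisher{}, nil
	case "none":
		return blackholePublisher{}, nil
	default:
		return nil, errors.New("unsupported stream: " + stream)
	}
}

func main() {
	for _, name := range []string{"stdout", "none"} {
		p, err := newPublisher(name)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		_ = p.Publish("event sent via " + name) // only stdout prints anything
	}
}
```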

Publish Subscribe Event

Publish

Publishes (sends) an event into the stream. A client can publish an event to a single topic or to multiple topics. Publishing to a Kafka stream is retried with exponential backoff (at most 3 times).

To publish an event, use this function:

err := client.Publish(
	eventstream.NewPublish().
		Topic(topicName).
		EventName(eventName).
		Namespace(namespace).
		ClientID(clientID).
		UserID(userID).
		SessionID(sessionID).
		TraceID(traceID).
		SpanContext(spanContext).
		Context(ctx).
		EventID(eventID).                   // int
		EventType(eventType).               // int
		EventLevel(eventLevel).             // int
		ServiceName(serviceName).           // string
		ClientIDs(clientIDs).               // []string
		TargetUserIDs(targetUserIDs).       // []string
		TargetNamespace(targetNamespace).   // string
		Privacy(privacy).                   // bool
		AdditionalFields(additionalFields). // map[string]interface{}
		Version(version).
		Payload(payload).
		ErrorCallback(func(event *eventstream.Event, err error) {}))
Parameters
  • Topic : List of topics / channels. (variadic string - alphaNumeric(256) - Required)
  • EventName : Event name. (string - alphaNumeric(256) - Required)
  • Namespace : Event namespace. (string - alphaNumeric(256) - Required)
  • ClientID : Publisher client ID. (string - UUID v4 without hyphens)
  • UserID : Publisher user ID. (string - UUID v4 without hyphens)
  • SessionID : Publisher session ID. (string - UUID v4 without hyphens)
  • TraceID : Trace ID. (string - UUID v4 without hyphens)
  • SpanContext : OpenTracing Jaeger Span Context. (string - optional)
  • Context : Go context. (context.Context - default: context.Background())
  • Version : Version of the schema. (integer - default: 1)
  • EventID : Event ID. Backward compatibility. (integer)
  • EventType : Event type. Backward compatibility. (integer)
  • EventLevel : Event level. Backward compatibility. (integer)
  • ServiceName : Service name. Backward compatibility. (string)
  • ClientIDs : List of client IDs. Backward compatibility. ([]string - UUID v4 without hyphens)
  • TargetUserIDs : List of target user IDs. Backward compatibility. ([]string - UUID v4 without hyphens)
  • TargetNamespace : Target namespace. Backward compatibility. (string)
  • Privacy : Privacy. Backward compatibility. (bool)
  • AdditionalFields : Additional fields. Backward compatibility. (map[string]interface{})
  • Payload : Additional attributes. (map[string]interface{})
  • ErrorCallback : Callback function invoked when an event fails to publish. (func(event *Event, err error))
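
Several parameters above expect a "UUID v4 without hyphens". The sketch below shows one way to generate and check such a value using only the standard library. newUUID4NoHyphens and uuid4NoHyphens are hypothetical names, and the exact rule the SDK applies under StrictValidation is not documented here, so the pattern is an assumption based on the RFC 4122 version 4 layout.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"regexp"
)

// uuid4NoHyphens matches 32 lowercase hex characters with the version
// nibble set to 4 and an RFC 4122 variant nibble (8, 9, a, or b).
var uuid4NoHyphens = regexp.MustCompile(`^[0-9a-f]{12}4[0-9a-f]{3}[89ab][0-9a-f]{15}$`)

// newUUID4NoHyphens returns a random version 4 UUID as 32 hex characters.
func newUUID4NoHyphens() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // set version 4
	b[8] = (b[8] & 0x3f) | 0x80 // set RFC 4122 variant
	return hex.EncodeToString(b[:]), nil
}

func main() {
	id, err := newUUID4NoHyphens()
	if err != nil {
		panic(err)
	}
	fmt.Println(id, uuid4NoHyphens.MatchString(id))
}
```

A value produced this way could be passed to TraceID, ClientID, UserID, or SessionID.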
Subscribe

To subscribe to events from a specific topic in the stream, the client registers a callback function that is executed each time an event is received. Each callback targets a specific topic and event name.

To subscribe to an event, use this function:

err := client.Register(
	eventstream.NewSubscribe().
		Topic(topicName).
		EventName(eventName).
		GroupID(groupID).
		Offset(offset).
		Context(ctx).
		Callback(func(ctx context.Context, event *eventstream.Event, err error) error { return nil }))
Unsubscribe

To unsubscribe from a topic, cancel the context that was passed to Register:

ctx, cancel := context.WithCancel(context.Background())

err := client.Register(
    eventstream.NewSubscribe().
        Topic(topicName).
        EventName(eventName).
        GroupID(groupID).
        Context(ctx).
        Callback(func(ctx context.Context, event *eventstream.Event, err error) error {
            if ctx.Err() != nil {
                // unsubscribed
                return nil
            }

            return nil
        }))

cancel() // cancel context to unsubscribe
Parameters
  • Topic : Subscribed topic. (string - alphaNumeric(256) - Required)
  • EventName : Event name. (string - alphaNumeric(256) - Required)
  • Namespace : Event namespace. (string - alphaNumeric(256) - Required)
  • GroupID : Message broker group / queue ID. (string - alphaNumeric(256) - default: *)
  • Context : Go context. (context.Context - default: context.Background())
  • Callback : Callback function invoked when an event is received. (func(ctx context.Context, event *Event, err error) error - Required)
  • Offset : Offset (position) inside the topic from which processing begins. (int64 - Optional - default: -1 (the tail))

The callback function receives 3 parameters:

  • ctx is the context used to check whether the consumer has unsubscribed.
  • event is the object that holds the event message.
  • err is an error that occurred while consuming the message.

The callback function returns 1 result (error):

  • return nil to commit the event (mark it as processed)
  • return any error to retry processing (the worker is selected randomly)

Event Message

An event message is the set of event information that a client publishes or consumes.

Event message format:

  • id : Event ID (string - UUID v4 without Hyphens)
  • name : Event name (string)
  • namespace : Event namespace (string)
  • traceId : Trace ID (string - UUID v4 without Hyphens)
  • spanContext : Opentracing Jaeger Span Context (string - optional)
  • clientId : Publisher client ID (string - UUID v4 without Hyphens)
  • userId : Publisher user ID (string - UUID v4 without Hyphens)
  • sessionId : Publisher session ID (string - UUID v4 without Hyphens)
  • timestamp : Event time (time.Time)
  • event_id : Event id. backward compatibility. (integer)
  • event_type : Event type. backward compatibility. (integer)
  • event_level : Event level. backward compatibility. (integer)
  • service : Service name. backward compatibility. (string)
  • client_ids : Client IDs. backward compatibility. ([]string - UUID v4 without Hyphens)
  • target_user_ids : Target user IDs. backward compatibility. ([]string - UUID v4 without Hyphens)
  • target_namespace : Target namespace. backward compatibility. (string)
  • privacy : Privacy. backward compatibility. (bool)
  • additional_fields : Set of data / objects provided by the producer. Each item has its own key for a specific purpose. Backward compatibility. (map[string]interface{})
  • version : Event schema version (integer)
  • payload : Set of data / objects provided by the producer. Each item has its own key for a specific purpose. (map[string]interface{})

SpanContext usage

  • Create a Jaeger Span Context from an event
	import "github.com/AccelByte/go-restful-plugins/v3/pkg/jaeger"

	spanContextString := event.SpanContext
	span, ctx := jaeger.ChildSpanFromRemoteSpan(rootCtx, "service-name.operation-name", spanContextString)
  • Put an existing Jaeger Span Context into an event
	import "github.com/AccelByte/go-restful-plugins/v3/pkg/jaeger"

	spanContextString := jaeger.GetSpanContextString(span)

	err := client.Publish(
		eventstream.NewPublish().
			Topic(topicName).
			SpanContext(spanContextString).
			// ...
License
Copyright © 2020, AccelByte Inc. Released under the Apache License, Version 2.0

Documentation

Constants

const (
	OffLevel   = "off"
	InfoLevel  = "info"
	DebugLevel = "debug"
	WarnLevel  = "warn"
	ErrorLevel = "error"
)

log level

const (
	DefaultSSLCertPath = "/etc/ssl/certs/ca-certificates.crt" // Alpine certificate path

)

Variables

This section is empty.

Functions

func GetTLSCertFromFile

func GetTLSCertFromFile(path string) (*tls.Certificate, error)

GetTLSCertFromFile reads file, divides into key and certificates

Types

type BlackholeClient

type BlackholeClient struct{}

BlackholeClient satisfies the publisher for mocking

func (*BlackholeClient) Publish

func (client *BlackholeClient) Publish(publishBuilder *PublishBuilder) error

func (*BlackholeClient) Register

func (client *BlackholeClient) Register(subscribeBuilder *SubscribeBuilder) error

type BrokerConfig

type BrokerConfig struct {
	LogMode          string
	StrictValidation bool
	CACertFile       string
	DialTimeout      time.Duration
	ReadTimeout      time.Duration
	WriteTimeout     time.Duration
}

BrokerConfig is custom configuration for message broker

type Client

type Client interface {
	Publish(publishBuilder *PublishBuilder) error
	Register(subscribeBuilder *SubscribeBuilder) error
}

Client is an interface for event stream functionality

func NewClient

func NewClient(prefix, stream string, brokers []string, config ...*BrokerConfig) (Client, error)

type ConfigWrapper

type ConfigWrapper struct {
	Writer *kafka.Writer
	// contains filtered or unexported fields
}

type Event

type Event struct {
	ID               string                 `json:"id"`
	EventName        string                 `json:"name"`
	Namespace        string                 `json:"namespace"`
	ClientID         string                 `json:"clientId"`
	TraceID          string                 `json:"traceId"`
	SpanContext      string                 `json:"spanContext"`
	UserID           string                 `json:"userId"`
	SessionID        string                 `json:"sessionId"`
	Timestamp        string                 `json:"timestamp"`
	Version          int                    `json:"version"`
	EventID          int                    `json:"event_id"`
	EventType        int                    `json:"event_type"`
	EventLevel       int                    `json:"event_level"`
	ServiceName      string                 `json:"service"`
	ClientIDs        []string               `json:"client_ids"`
	TargetUserIDs    []string               `json:"target_user_ids"`
	TargetNamespace  string                 `json:"target_namespace"`
	Privacy          bool                   `json:"privacy"`
	Topic            string                 `json:"topic"`
	AdditionalFields map[string]interface{} `json:"additional_fields,omitempty"`
	Payload          map[string]interface{} `json:"payload"`
}

Event defines the structure of event

func ConstructEvent

func ConstructEvent(publishBuilder *PublishBuilder) (kafka.Message, *Event, error)

ConstructEvent construct event message

type KafkaClient

type KafkaClient struct {
	// contains filtered or unexported fields
}

KafkaClient wraps client's functionality for Kafka

func (*KafkaClient) Publish

func (client *KafkaClient) Publish(publishBuilder *PublishBuilder) error

Publish send event to single or multiple topic with exponential backoff retry

func (*KafkaClient) Register

func (client *KafkaClient) Register(subscribeBuilder *SubscribeBuilder) error

Register registers a callback function and then subscribes to the topic

type PublishBuilder

type PublishBuilder struct {
	// contains filtered or unexported fields
}

PublishBuilder defines the structure of message which is sent through message broker

func NewPublish

func NewPublish() *PublishBuilder

NewPublish create new PublishBuilder instance
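
The PublishBuilder methods listed below all return *PublishBuilder, which is what allows calls to be chained. A minimal stand-alone sketch of that pattern follows. publishBuilder and newPublish are hypothetical local types, only three setters are shown, the default version of 1 follows the README, and appending topics across calls is an assumption.

```go
package main

import "fmt"

// publishBuilder is a hypothetical, reduced imitation of PublishBuilder.
type publishBuilder struct {
	topics    []string
	eventName string
	version   int
}

// newPublish returns a builder with the documented default schema version.
func newPublish() *publishBuilder {
	return &publishBuilder{version: 1}
}

// Topic accepts one or more topic names (variadic) and returns the builder.
func (p *publishBuilder) Topic(topics ...string) *publishBuilder {
	p.topics = append(p.topics, topics...)
	return p
}

// EventName sets the event name and returns the builder for chaining.
func (p *publishBuilder) EventName(name string) *publishBuilder {
	p.eventName = name
	return p
}

// Version overrides the schema version and returns the builder.
func (p *publishBuilder) Version(v int) *publishBuilder {
	p.version = v
	return p
}

func main() {
	b := newPublish().
		Topic("topicA", "topicB").
		EventName("userLoggedIn")
	fmt.Println(b.topics, b.eventName, b.version)
}
```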

func (*PublishBuilder) AdditionalFields

func (p *PublishBuilder) AdditionalFields(additionalFields map[string]interface{}) *PublishBuilder

AdditionalFields set AdditionalFields of publisher event

func (*PublishBuilder) ClientID

func (p *PublishBuilder) ClientID(clientID string) *PublishBuilder

ClientID set clientID of publisher event

func (*PublishBuilder) ClientIDs

func (p *PublishBuilder) ClientIDs(clientIDs []string) *PublishBuilder

ClientIDs set clientIDs of publisher event

func (*PublishBuilder) Context

func (p *PublishBuilder) Context(ctx context.Context) *PublishBuilder

Context define client context when publish event. default: context.Background()

func (*PublishBuilder) ErrorCallback

func (p *PublishBuilder) ErrorCallback(errorCallback func(event *Event, err error)) *PublishBuilder

ErrorCallback function to handle the event when failed to publish

func (*PublishBuilder) EventID

func (p *PublishBuilder) EventID(eventID int) *PublishBuilder

EventID set eventID of publisher event

func (*PublishBuilder) EventLevel

func (p *PublishBuilder) EventLevel(eventLevel int) *PublishBuilder

EventLevel set eventLevel of publisher event

func (*PublishBuilder) EventName

func (p *PublishBuilder) EventName(eventName string) *PublishBuilder

EventName set name of published event

func (*PublishBuilder) EventType

func (p *PublishBuilder) EventType(eventType int) *PublishBuilder

EventType set eventType of publisher event

func (*PublishBuilder) Namespace

func (p *PublishBuilder) Namespace(namespace string) *PublishBuilder

Namespace set namespace of published event

func (*PublishBuilder) Payload

func (p *PublishBuilder) Payload(payload map[string]interface{}) *PublishBuilder

Payload is a event payload that will be published

func (*PublishBuilder) Privacy

func (p *PublishBuilder) Privacy(privacy bool) *PublishBuilder

Privacy set privacy of publisher event

func (*PublishBuilder) ServiceName

func (p *PublishBuilder) ServiceName(serviceName string) *PublishBuilder

ServiceName set serviceName of publisher event

func (*PublishBuilder) SessionID

func (p *PublishBuilder) SessionID(sessionID string) *PublishBuilder

SessionID set sessionID of publisher event

func (*PublishBuilder) SpanContext

func (p *PublishBuilder) SpanContext(spanID string) *PublishBuilder

SpanContext set jaeger spanContext of publisher event

func (*PublishBuilder) TargetNamespace

func (p *PublishBuilder) TargetNamespace(targetNamespace string) *PublishBuilder

TargetNamespace set targetNamespace of publisher event

func (*PublishBuilder) TargetUserIDs

func (p *PublishBuilder) TargetUserIDs(targetUserIDs []string) *PublishBuilder

TargetUserIDs set targetUserIDs of publisher event

func (*PublishBuilder) Topic

func (p *PublishBuilder) Topic(topics ...string) *PublishBuilder

Topic set channel / topic name

func (*PublishBuilder) TraceID

func (p *PublishBuilder) TraceID(traceID string) *PublishBuilder

TraceID set traceID of publisher event

func (*PublishBuilder) UserID

func (p *PublishBuilder) UserID(userID string) *PublishBuilder

UserID set userID of publisher event

func (*PublishBuilder) Version

func (p *PublishBuilder) Version(version int) *PublishBuilder

Version set event schema version

type StdoutClient

type StdoutClient struct {
	// contains filtered or unexported fields
}

StdoutClient satisfies the publisher for mocking

func (*StdoutClient) Publish

func (client *StdoutClient) Publish(publishBuilder *PublishBuilder) error

Publish print event to console

func (*StdoutClient) Register

func (client *StdoutClient) Register(subscribeBuilder *SubscribeBuilder) error

Register print event to console

type SubscribeBuilder

type SubscribeBuilder struct {
	// contains filtered or unexported fields
}

SubscribeBuilder defines the structure of message which is sent through message broker

func NewSubscribe

func NewSubscribe() *SubscribeBuilder

NewSubscribe create new SubscribeBuilder instance

func (*SubscribeBuilder) Callback

func (s *SubscribeBuilder) Callback(
	callback func(ctx context.Context, event *Event, err error) error,
) *SubscribeBuilder

Callback to do when the event received

func (*SubscribeBuilder) Context

func (s *SubscribeBuilder) Context(ctx context.Context) *SubscribeBuilder

Context define client context when subscribe event. default: context.Background()

func (*SubscribeBuilder) EventName

func (s *SubscribeBuilder) EventName(eventName string) *SubscribeBuilder

EventName set event name that will be subscribe

func (*SubscribeBuilder) GroupID

func (s *SubscribeBuilder) GroupID(groupID string) *SubscribeBuilder

GroupID set subscriber groupID or queue group name

func (*SubscribeBuilder) Offset

func (s *SubscribeBuilder) Offset(offset int64) *SubscribeBuilder

Offset set Offset of the event to start

func (*SubscribeBuilder) Topic

func (s *SubscribeBuilder) Topic(topic string) *SubscribeBuilder

Topic set topic that will be subscribe
