hedwig

v1.0.5
Published: Jul 15, 2019 License: Apache-2.0 Imports: 28 Imported by: 0

README

Hedwig Library for Go

Hedwig is an inter-service communication bus that works on AWS SQS/SNS, while keeping things simple and straightforward. It uses JSON Schema draft v4 for validation, so all incoming and outgoing messages are validated against a pre-defined schema.

Hedwig allows separation of concerns between consumers and publishers so your services are loosely coupled, and the contract is enforced by the schema validation. Hedwig may also be used to build asynchronous APIs.

For intra-service messaging, see Taskhawk.

Fan Out

Hedwig utilizes SNS for fan-out configuration. A publisher publishes messages on a topic. This message may be received by zero or more consumers. The publisher need not be aware of the consuming application. There are a variety of messages that may be published as such, but they generally fall into two buckets:

  • Asynchronous API Requests: Hedwig may be used to call APIs asynchronously. The contract is enforced by your infrastructure by connecting SNS topics to SQS queues, and the payload is validated using the schema you define. The response is delivered using a separate message if required.
  • Notifications: The most common use case is to notify other services/apps that may be interested in events. For example, your User Management app can publish a user.created message notification to all your apps. As publishers and consumers are loosely coupled, this separation of concerns is very effective in ensuring a stable ecosystem.

Provisioning

Hedwig works on SQS and SNS as backing queues. Before you can publish/consume messages, you need to provision the required infra. This may be done manually, or, preferably, using Terraform. Hedwig provides tools to make infra configuration easier: see Terraform and Hedwig Terraform Generator for further details.

Quick Start

First, install the library:

go get github.com/Automatic/hedwig-go

Create a JSON-schema and save as schema.json:


    {
        "id": "https://hedwig.automatic.com/schema#",
        "$schema": "http://json-schema.org/draft-04/schema",
        "schemas": {
            "email.send": {
                "1.*": {
                    "description": "Request to send email",
                    "type": "object",
                    "required": [
                        "to",
                        "subject"
                    ],
                    "properties": {
                        "to": {
                            "type": "string",
                            "pattern": "^\\S+@\\S+$"
                        },
                        "subject": {
                            "type": "string",
                            "minLength": 2
                        }
                    }
                }
            }
        }
    }
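
To make the constraints in this schema concrete, here is a minimal sketch that applies the same rules by hand using only the Go standard library. It is not how Hedwig validates messages (the library uses a JSON Schema validator); `checkSendEmail` is a hypothetical helper written for illustration only.

```go
package main

import (
	"encoding/json"
	"fmt"
	"regexp"
	"unicode/utf8"
)

// emailPattern mirrors the "pattern" keyword of the "to" property.
var emailPattern = regexp.MustCompile(`^\S+@\S+$`)

// checkSendEmail is a hypothetical helper, not part of Hedwig. It checks a
// payload against the same rules the email.send schema declares.
func checkSendEmail(raw []byte) error {
	var fields map[string]interface{}
	if err := json.Unmarshal(raw, &fields); err != nil {
		return fmt.Errorf("payload is not a JSON object: %v", err)
	}
	// "required": ["to", "subject"]
	for _, name := range []string{"to", "subject"} {
		if _, ok := fields[name]; !ok {
			return fmt.Errorf("missing required property %q", name)
		}
	}
	to, ok := fields["to"].(string)
	if !ok || !emailPattern.MatchString(to) {
		return fmt.Errorf("property %q must be a string matching %s", "to", emailPattern)
	}
	subject, ok := fields["subject"].(string)
	// JSON Schema's minLength counts characters, not bytes.
	if !ok || utf8.RuneCountInString(subject) < 2 {
		return fmt.Errorf("property %q must be a string of at least 2 characters", "subject")
	}
	return nil
}

func main() {
	fmt.Println(checkSendEmail([]byte(`{"to": "a@example.com", "subject": "Hi"}`)))
	fmt.Println(checkSendEmail([]byte(`{"to": "a@example.com"}`)))
}
```

The first payload satisfies every rule; the second is rejected because `subject` is missing.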

Next, set up a few configuration settings:

    validator, err := hedwig.NewMessageValidator("schema.json")
    if err != nil {
        panic("Failed to create validator")
    }
    settings := &hedwig.Settings{
        AWSAccessKey:              <YOUR AWS KEY>,
        AWSAccountID:              <YOUR AWS ACCOUNT ID>,
        AWSRegion:                 <YOUR AWS REGION>,
        AWSSecretKey:              <YOUR AWS SECRET KEY>,
        AWSSessionToken:           <YOUR AWS SESSION TOKEN>,
        CallbackRegistry:          hedwig.NewCallbackRegistry(),
        Publisher:                 "MYAPP",
        QueueName:                 "DEV-MYAPP",
        MessageRouting:            map[hedwig.MessageRouteKey]string{
            hedwig.MessageRouteKey{
                MessageType:         "email.send",
                MessageMajorVersion: 1,
            }: "send_email",
        },
        Validator:                 validator,
    }

These configuration settings will be passed into the library for initialization.
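
The MessageRouting setting is a map keyed by a struct of message type and major version. The sketch below shows how such a lookup could work for a "major.minor" schema version. It is an illustration under assumptions: `routeKey` only mirrors the shape of `hedwig.MessageRouteKey`, and `topicFor` is a hypothetical helper, not a library function.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// routeKey mirrors the shape of hedwig.MessageRouteKey for illustration.
type routeKey struct {
	MessageType         string
	MessageMajorVersion int
}

// topicFor is a hypothetical helper. It extracts the major version from a
// "major.minor" version string and looks up the topic for that route.
func topicFor(routing map[routeKey]string, msgType, version string) (string, error) {
	majorPart := strings.SplitN(version, ".", 2)[0]
	major, err := strconv.Atoi(majorPart)
	if err != nil {
		return "", fmt.Errorf("invalid version %q: %v", version, err)
	}
	key := routeKey{MessageType: msgType, MessageMajorVersion: major}
	topic, ok := routing[key]
	if !ok {
		return "", fmt.Errorf("no route for %s v%d", msgType, major)
	}
	return topic, nil
}

func main() {
	routing := map[routeKey]string{
		{MessageType: "email.send", MessageMajorVersion: 1}: "send_email",
	}
	fmt.Println(topicFor(routing, "email.send", "1.0"))
	fmt.Println(topicFor(routing, "email.send", "2.0"))
}
```

Because the key contains only the major version, every 1.x message resolves to the same topic, while a 2.x message needs its own entry.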

Next, define the models associated with the schemas. Each model needs a factory function that returns a pointer to an empty instance.

    type SendEmail struct {
        Subject string `json:"subject"`
        To      string `json:"to"`
    }

    // Factory that returns empty struct
    func NewSendEmailData() interface{} { return new(SendEmail) }

Then, simply define your topic handler and register the handler:

    // Handler
    func HandleSendEmail(ctx context.Context, msg *hedwig.Message) error {
        // Send email
        return nil
    }

    // Register handler
    cbk := hedwig.CallbackKey{
        MessageType:         "email.send",
        MessageMajorVersion: 1,
    }
    settings.CallbackRegistry.RegisterCallback(cbk, HandleSendEmail, NewSendEmailData)

Initialize the publisher:

    sessionCache := hedwig.NewAWSSessionsCache()
    publisher := hedwig.NewPublisher(sessionCache, settings)

And finally, send a message:

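    // Example payload; it must satisfy the email.send schema
    data := &SendEmail{To: "user@example.com", Subject: "Hello"}
    headers := map[string]string{}
    msg, err := hedwig.NewMessage(settings, "email.send", "1.0", headers, data)
    if err != nil {
        return err
    }
    // Publish returns an error, so propagate it to the caller
    return publisher.Publish(context.Background(), msg)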

Development

Prerequisites

Install go1.11.x

Getting Started

$ cd ${GOPATH}/src/github.com/Automatic/hedwig-go
$ go build

Running Tests

$ make test

Getting Help

We use GitHub issues for tracking bugs and feature requests.

  • If you think you have found a bug, please open an issue.

Release notes

Current version: v1.0.1-dev

v1.0.0
  • Initial version

Documentation

Index

Constants

const (
	FormatVersionV1 = "1.0"

	FormatCurrentVersion = FormatVersionV1
)

Message format versions

Variables

var ErrRetry = errors.New("Retry error")

ErrRetry should cause the task to retry, but not treat the retry as an error

Functions

func NewLambdaHandler

func NewLambdaHandler(consumer ILambdaConsumer) lambda.Handler

NewLambdaHandler returns a new lambda Handler that can be started like so:

func main() {
    lambda.StartHandler(NewLambdaHandler(consumer))
}

If you want to add additional error handling (e.g. catching panics), you can always use your own Handler and call LambdaHandler.Invoke.

Types

type AWSSessionsCache

type AWSSessionsCache struct {
	// contains filtered or unexported fields
}

AWSSessionsCache is a cache that holds sessions

func NewAWSSessionsCache

func NewAWSSessionsCache() *AWSSessionsCache

NewAWSSessionsCache creates a new session cache

func (*AWSSessionsCache) GetSession

func (c *AWSSessionsCache) GetSession(settings *Settings) *session.Session

GetSession retrieves a session if it is cached, otherwise creates one

type CallbackFunction

type CallbackFunction func(context.Context, *Message) error

CallbackFunction is the function signature for a hedwig callback function

type CallbackKey

type CallbackKey struct {
	// Message type
	MessageType string
	// Message major version
	MessageMajorVersion int
}

CallbackKey is a key identifying a hedwig callback

type CallbackRegistry

type CallbackRegistry struct {
	// contains filtered or unexported fields
}

CallbackRegistry maps hedwig messages to callback functions and message data factories

func NewCallbackRegistry

func NewCallbackRegistry() *CallbackRegistry

NewCallbackRegistry creates a callback registry

func (*CallbackRegistry) RegisterCallback

func (cr *CallbackRegistry) RegisterCallback(cbk CallbackKey, cbf CallbackFunction, newData NewData)

RegisterCallback registers the given callback function to the given message type and message major version. Required for consumers. An error will be returned if an incoming message is missing a callback.

type GetLoggerFunc

type GetLoggerFunc func(ctx context.Context) Logger

GetLoggerFunc returns the logger object

func LogrusGetLoggerFunc

func LogrusGetLoggerFunc(fn func(ctx context.Context) *logrus.Entry) GetLoggerFunc

type ILambdaConsumer

type ILambdaConsumer interface {
	// HandleLambdaEvent processes hedwig messages for the provided message types for Lambda apps
	HandleLambdaEvent(ctx context.Context, snsEvent events.SNSEvent) error
}

ILambdaConsumer represents a lambda event consumer

func NewLambdaConsumer

func NewLambdaConsumer(sessionCache *AWSSessionsCache, settings *Settings) ILambdaConsumer

NewLambdaConsumer creates a new consumer object used for lambda apps

type IMessageValidator

type IMessageValidator interface {
	SchemaRoot() string
	Validate(message *Message) error
}

IMessageValidator handles validating Hedwig messages

func NewMessageValidator

func NewMessageValidator(schemaFilePath string) (IMessageValidator, error)

NewMessageValidator creates a new validator from the given file

func NewMessageValidatorFromBytes

func NewMessageValidatorFromBytes(schemaFile []byte) (IMessageValidator, error)

NewMessageValidatorFromBytes creates a new validator from the bytes of a schema file

type IPublisher

type IPublisher interface {
	Publish(ctx context.Context, message *Message) error
}

IPublisher handles all publish related functions

func NewPublisher

func NewPublisher(sessionCache *AWSSessionsCache, settings *Settings) IPublisher

NewPublisher creates a new Publisher

type IQueueConsumer

type IQueueConsumer interface {
	// ListenForMessages starts a hedwig listener for the provided message types
	//
	// This function never returns by default. Possible shutdown methods:
	// 1. Cancel the context - returns immediately.
	// 2. Set a deadline on the context of less than 10 seconds - returns after processing current messages.
	// 3. Run for limited number of loops by setting LoopCount on the request - returns after running loop a finite
	// number of times
	ListenForMessages(ctx context.Context, request *ListenRequest) error
}

IQueueConsumer represents a hedwig queue consumer

func NewQueueConsumer

func NewQueueConsumer(sessionCache *AWSSessionsCache, settings *Settings) IQueueConsumer

NewQueueConsumer creates a new consumer object used for a queue

type JSONTime

type JSONTime time.Time

JSONTime is just a wrapper around time that serializes time to epoch in milliseconds

func (JSONTime) MarshalJSON

func (t JSONTime) MarshalJSON() ([]byte, error)

MarshalJSON changes time to epoch in milliseconds

func (*JSONTime) UnmarshalJSON

func (t *JSONTime) UnmarshalJSON(b []byte) error

UnmarshalJSON changes time from epoch in milliseconds

type LambdaHandler

type LambdaHandler struct {
	// contains filtered or unexported fields
}

LambdaHandler implements the lambda.Handler interface

func (*LambdaHandler) Invoke

func (handler *LambdaHandler) Invoke(ctx context.Context, payload []byte) ([]byte, error)

type LambdaRequest

type LambdaRequest struct {
	// Context for request
	Context context.Context
	// SNS record for this request
	EventRecord *events.SNSEventRecord
}

LambdaRequest contains request objects for a lambda

type ListenRequest

type ListenRequest struct {
	NumMessages        uint32 // default 1
	VisibilityTimeoutS uint32 // defaults to queue configuration
	LoopCount          uint32 // defaults to infinite loops
}

ListenRequest represents a request to listen for messages

type Logger

type Logger interface {
	// Error logs an error with a message. `fields` can be used as additional metadata for structured logging.
	// You can generally expect one of these fields to be available: message_sqs_id, message_sns_id.
	// By default fields are logged as a map using fmt.Sprintf
	Error(err error, message string, fields LoggingFields)

	// Warn logs a warn level log with a message. `fields` param works the same as `Error`.
	Warn(err error, message string, fields LoggingFields)

	// Info logs an info level log with a message. `fields` param works the same as `Error`.
	Info(message string, fields LoggingFields)

	// Debug logs a debug level log with a message. `fields` param works the same as `Error`.
	Debug(message string, fields LoggingFields)
}

Logger represents a logging interface that this library expects

type LoggingFields

type LoggingFields map[string]interface{}

type Message

type Message struct {
	Data          interface{} `json:"data"`
	FormatVersion string      `json:"format_version"`
	ID            string      `json:"id"`
	Metadata      *metadata   `json:"metadata"`
	Schema        string      `json:"schema"`

	DataSchemaVersion *semver.Version `json:"-"`
	// contains filtered or unexported fields
}

Message model for hedwig messages.

func NewMessage

func NewMessage(settings *Settings, dataType string, dataSchemaVersion string, headers map[string]string, data interface{}) (*Message, error)

NewMessage creates new Hedwig messages based off of message type and schema version

func (*Message) DataJSONString

func (m *Message) DataJSONString() (string, error)

DataJSONString returns a JSON string representation of the Message's data

func (*Message) JSONString

func (m *Message) JSONString() (string, error)

JSONString returns a string representation of Message

func (*Message) UnmarshalJSON

func (m *Message) UnmarshalJSON(b []byte) error

UnmarshalJSON deserializes the message from JSON

type MessageDefaultHeadersHook

type MessageDefaultHeadersHook func(ctx context.Context, message *Message) map[string]string

MessageDefaultHeadersHook is called to return default headers per message

type MessageRouteKey

type MessageRouteKey struct {
	// Message type
	MessageType string
	// Message major version
	MessageMajorVersion int
}

MessageRouteKey is a key identifying a message route

type NewData

type NewData func() interface{}

NewData is a function that returns a pointer to struct type that a hedwig message data should conform to
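
The reason the factory must return a pointer is that JSON decoding needs an addressable target. The sketch below demonstrates this with the standard library. `newData` only mirrors the shape of `hedwig.NewData`, and `decode` is a hypothetical helper, not the library's deserialization code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sendEmail is the data model for email.send messages.
type sendEmail struct {
	Subject string `json:"subject"`
	To      string `json:"to"`
}

// newData mirrors the shape of hedwig.NewData for illustration.
type newData func() interface{}

// decode is a hypothetical helper. It asks the factory for an empty value
// and fills it from the raw JSON payload.
func decode(raw []byte, factory newData) (interface{}, error) {
	target := factory()
	if err := json.Unmarshal(raw, target); err != nil {
		return nil, fmt.Errorf("decode data: %v", err)
	}
	return target, nil
}

func main() {
	raw := []byte(`{"to": "a@example.com", "subject": "Hi"}`)

	// A pointer-returning factory yields a populated *sendEmail.
	data, err := decode(raw, func() interface{} { return new(sendEmail) })
	fmt.Println(data, err)

	// A value-returning factory fails: json.Unmarshal requires a pointer.
	_, err = decode(raw, func() interface{} { return sendEmail{} })
	fmt.Println(err)
}
```

A handler can then type-assert the decoded value back to the concrete pointer type, for example `data.(*sendEmail)`.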

type PreDeserializeHook

type PreDeserializeHook func(ctx *context.Context, messageData *string) error

PreDeserializeHook is called after a message has been deserialized from JSON, but before a Message is created and validated. This hook may be used to modify the format over the wire.

type PreProcessHookLambda

type PreProcessHookLambda func(r *LambdaRequest) error

PreProcessHookLambda is called on an SNS event before any processing happens for a lambda. This hook may be used to perform initializations such as setting up a global request ID based on message headers.

type PreProcessHookSQS

type PreProcessHookSQS func(r *SQSRequest) error

PreProcessHookSQS is called on a message before any processing happens for an SQS queue. This hook may be used to perform initializations such as setting up a global request ID based on message headers.

type PreSerializeHook

type PreSerializeHook func(ctx *context.Context, messageData *string) error

PreSerializeHook is called before a message is serialized to JSON. This hook may be used to modify the format over the wire.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher handles hedwig publishing for Automatic

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, message *Message) error

Publish a message on Hedwig

type SQSRequest

type SQSRequest struct {
	// Context for request
	Context context.Context
	// SQS message for this request
	QueueMessage *sqs.Message
}

SQSRequest contains request objects for a SQS handler

type Settings

type Settings struct {
	// AWS Region
	AWSRegion string
	// AWS account id
	AWSAccountID string
	// AWS access key
	AWSAccessKey string
	// AWS secret key
	AWSSecretKey string
	// AWS session token that represents temporary credentials (e.g. for a Lambda app)
	AWSSessionToken string

	// AWS read timeout for publisher
	AWSReadTimeoutS time.Duration // optional; default: 2 seconds

	// AWS debug request error logs toggle
	AWSDebugRequestLogEnabled bool

	// CallbackRegistry contains callbacks and message data factories by message type and message version
	CallbackRegistry *CallbackRegistry

	// GetLogger is a function that takes the context object and returns a logger. This may be used to plug in
	// your desired logger library. Defaults to using std library.
	// Convenience structs are provided for popular libraries: LogrusGetLoggerFunc
	GetLogger GetLoggerFunc

	// Returns default headers for a message before a message is published. This will apply to ALL messages.
	// Can be used to inject custom headers (i.e. request id).
	MessageDefaultHeadersHook MessageDefaultHeadersHook

	// Maps message type and major version to topic names
	//   <message type>, <message version> => topic name
	// An entry is required for every message type that the app wants to consume or publish. It is
	// recommended that major versions of a message be published on separate topics.
	MessageRouting map[MessageRouteKey]string

	// Hedwig pre process hook called before any processing is done on message
	PreProcessHookLambda PreProcessHookLambda // optional
	PreProcessHookSQS    PreProcessHookSQS    // optional

	// Hedwig hook called before a message is serialized to JSON
	PreSerializeHook PreSerializeHook // optional

	// Hedwig hook called before a message has been deserialized into a Message struct
	PreDeserializeHook PreDeserializeHook // optional

	// Publisher name
	Publisher string

	// Hedwig queue name. Exclude the `HEDWIG-` prefix
	QueueName string

	// ShutdownTimeout is the time the app has to shut down before being brutally killed
	ShutdownTimeout time.Duration // optional; defaults to 10s

	// Message validator using JSON schema for validation. Additional JSON schema formats may be added.
	// Please see github.com/santhosh-tekuri/jsonschema for more details.
	Validator IMessageValidator
}

Settings for Hedwig
