binding

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2020 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package binding defines interfaces for protocol bindings.

NOTE: Most applications that emit or consume events should use the ../client package, which provides a simpler API to the underlying binding.

The interfaces in this package provide extra encoding and protocol information to allow efficient forwarding and end-to-end reliable delivery between a Receiver and a Sender belonging to different bindings. This is useful for intermediary applications that route or forward events, but not necessary for most "endpoint" applications that emit or consume events.

Protocol Bindings

A protocol binding implements at least Message, Sender and Receiver, and usually Encoder.

Receiver: receives protocol messages and wraps them to implement the Message interface.

Message: interface that defines the visitors for an encoded event in structured mode, binary mode or event mode. A method is provided to read the Encoding of the message

Sender: converts arbitrary Message implementations to a protocol-specific form and sends them. A protocol Sender should preserve the spec-version and structured/binary mode of sent messages as far as possible. This package provides generic Sender wrappers to pre-process messages into a specific spec-version or structured/binary mode when the user requires that.

Message and ExactlyOnceMessage provide methods to allow acknowledgments to propagate when a reliable messages is forwarded from a Receiver to a Sender. QoS 0 (unreliable), 1 (at-least-once) and 2 (exactly-once) are supported.

Intermediaries

Intermediaries can forward Messages from a Receiver to a Sender without knowledge of the underlying protocols. The Message interface allows structured messages to be forwarded without decoding and re-encoding. It also allows any Message to be fully decoded and examined as needed.

Example (Implementing)

Example of implementing a transport including a simple message type, and a transport sender and receiver.

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"io"
	"io/ioutil"

	"github.com/cloudevents/sdk-go/pkg/binding"
	"github.com/cloudevents/sdk-go/pkg/binding/format"
	"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
)

// ExMessage is a json.RawMessage, a byte slice containing a JSON encoded event.
// It implements binding.MockStructuredMessage
//
// Note: a good binding implementation should provide an easy way to convert
// between the Message implementation and the "native" message format.
// In this case it's as simple as:
//
//	native = ExMessage(impl)
//	impl = json.RawMessage(native)
//
// For example in a HTTP binding it should be easy to convert between
// the HTTP binding.Message implementation and net/http.Request and
// Response types.  There are no interfaces for this conversion as it
// requires the use of unknown types.
type ExMessage json.RawMessage

func (m ExMessage) GetParent() binding.Message {
	return nil
}

func (m ExMessage) Encoding() binding.Encoding {
	return binding.EncodingStructured
}

func (m ExMessage) Structured(ctx context.Context, b binding.StructuredEncoder) error {
	return b.SetStructuredEvent(ctx, format.JSON, bytes.NewReader(m))
}

func (m ExMessage) Binary(context.Context, binding.BinaryEncoder) error {
	return binding.ErrNotBinary
}

func (m ExMessage) Finish(error) error { return nil }

var _ binding.Message = (*ExMessage)(nil)

// ExSender sends by writing JSON encoded events to an io.Writer
// ExSender supports transcoding
// ExSender implements directly StructuredEncoder & EventEncoder
type ExSender struct {
	encoder      *json.Encoder
	transformers binding.TransformerFactories
}

func NewExSender(w io.Writer, factories ...binding.TransformerFactory) binding.Sender {
	return &ExSender{encoder: json.NewEncoder(w), transformers: factories}
}

func (s *ExSender) Send(ctx context.Context, m binding.Message) error {
	// Encode tries the various encodings, starting with provided root encoder factories.
	// If a sender doesn't support a specific encoding, a null root encoder factory could be provided.
	_, err := binding.Encode(
		ctx,
		m,
		s,
		nil,
		s.transformers,
	)

	return err
}

func (s *ExSender) SetStructuredEvent(ctx context.Context, f format.Format, event io.Reader) error {
	if f == format.JSON {
		b, err := ioutil.ReadAll(event)
		if err != nil {
			return err
		}
		return s.encoder.Encode(json.RawMessage(b))
	} else {
		return binding.ErrNotStructured
	}
}

func (s *ExSender) Close(context.Context) error { return nil }

var _ binding.Sender = (*ExSender)(nil)
var _ binding.StructuredEncoder = (*ExSender)(nil)

// ExReceiver receives by reading JSON encoded events from an io.Reader
type ExReceiver struct{ decoder *json.Decoder }

func NewExReceiver(r io.Reader) binding.Receiver { return &ExReceiver{json.NewDecoder(r)} }

func (r *ExReceiver) Receive(context.Context) (binding.Message, error) {
	var rm json.RawMessage
	err := r.decoder.Decode(&rm) // This is just a byte copy.
	return ExMessage(rm), err
}
func (r *ExReceiver) Close(context.Context) error { return nil }

// NewExTransport returns a transport.Transport which is implemented by
// an ExSender and an ExReceiver
func NewExTransport(r io.Reader, w io.Writer) transport.Transport {
	return binding.NewTransportAdapter(NewExSender(w), NewExReceiver(r), []func(ctx context.Context) context.Context{})
}

// Example of implementing a transport including a simple message type,
// and a transport sender and receiver.
func main() {}
Output:

Example (Using)

This example shows how to use a transport in sender, receiver, and intermediary processes.

The sender and receiver use the client.Client API to send and receive messages. the transport. Only the intermediary example actually uses the transport APIs for efficiency and reliability in forwarding events.

package main

import (
	"context"
	"fmt"
	"io"
	"strconv"

	"github.com/cloudevents/sdk-go/pkg/cloudevents"
	"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
)

const count = 3 // Example ends after this many events.

// The sender uses the cloudevents.Client API, not the transport APIs directly.
func runSender(w io.Writer) error {
	c, err := client.New(NewExTransport(nil, w), client.WithoutTracePropagation())
	if err != nil {
		return err
	}
	for i := 0; i < count; i++ {
		e := cloudevents.New()
		e.SetType("example.com/event")
		e.SetSource("example.com/source")
		e.SetID(strconv.Itoa(i))
		if err := e.SetData(fmt.Sprintf("hello %d", i)); err != nil {
			return err
		}
		if _, _, err := c.Send(context.TODO(), e); err != nil {
			return err
		}
	}
	return nil
}

// The receiver uses the cloudevents.Client API, not the transport APIs directly.
func runReceiver(r io.Reader) error {
	i := 0
	process := func(e cloudevents.Event) error {
		fmt.Printf("%s\n", e)
		i++
		if i == count {
			return io.EOF
		}
		return nil
	}
	c, err := client.New(NewExTransport(r, nil), client.WithoutTracePropagation())
	if err != nil {
		return err
	}
	return c.StartReceiver(context.TODO(), process)
}

// The intermediary receives events and forwards them to another
// process using ExReceiver and ExSender directly.
//
// By forwarding a transport.Message instead of a cloudevents.Event,
// it allows the transports to avoid un-necessary decoding of
// structured events, and to exchange delivery status between reliable
// transports. Even transports using different protocols can ensure
// reliable delivery.
func runIntermediary(r io.Reader, w io.WriteCloser) error {
	defer w.Close()
	for {
		receiver := NewExReceiver(r)
		sender := NewExSender(w)
		for i := 0; i < count; i++ {
			if m, err := receiver.Receive(context.TODO()); err != nil {
				return err
			} else if err := sender.Send(context.TODO(), m); err != nil {
				return err
			}
		}
	}
}

// This example shows how to use a transport in sender, receiver,
// and intermediary processes.
//
// The sender and receiver use the client.Client API to send and
// receive messages.  the transport.  Only the intermediary example
// actually uses the transport APIs for efficiency and reliability in
// forwarding events.
func main() {
	r1, w1 := io.Pipe() // The sender-to-intermediary pipe
	r2, w2 := io.Pipe() // The intermediary-to-receiver pipe

	done := make(chan error)
	go func() { done <- runReceiver(r2) }()
	go func() { done <- runIntermediary(r1, w2) }()
	go func() { done <- runSender(w1) }()
	for i := 0; i < 2; i++ {
		if err := <-done; err != nil && err != io.EOF {
			fmt.Println(err)
		}
	}

}
Output:

Validation: valid
Context Attributes,
  specversion: 1.0
  type: example.com/event
  source: example.com/source
  id: 0
Data,
  "hello 0"

Validation: valid
Context Attributes,
  specversion: 1.0
  type: example.com/event
  source: example.com/source
  id: 1
Data,
  "hello 1"

Validation: valid
Context Attributes,
  specversion: 1.0
  type: example.com/event
  source: example.com/source
  id: 2
Data,
  "hello 2"

Index

Examples

Constants

View Source
const (
	SKIP_DIRECT_STRUCTURED_ENCODING = "SKIP_DIRECT_STRUCTURED_ENCODING"
	SKIP_DIRECT_BINARY_ENCODING     = "SKIP_DIRECT_BINARY_ENCODING"
	PREFERRED_EVENT_ENCODING        = "PREFERRED_EVENT_ENCODING"
)

Variables

View Source
var ErrCannotConvertToEvent = errors.New("cannot convert message to event")
View Source
var ErrNotBinary = errors.New("message is not in binary mode")

ErrNotBinary returned by Message.Binary for non-binary messages.

View Source
var ErrNotStructured = errors.New("message is not in structured mode")

ErrNotStructured returned by Message.Structured for non-structured messages.

View Source
var ErrUnknownEncoding = errors.New("unknown Message encoding")

Error to specify that or the Message is not an event or it is encoded with an unknown encoding

Functions

func EventContextToBinaryEncoder added in v1.1.0

func EventContextToBinaryEncoder(c cloudevents.EventContext, b BinaryEncoder) (err error)

func GetOrDefaultFromCtx added in v1.1.0

func GetOrDefaultFromCtx(ctx context.Context, key string, def interface{}) interface{}

func WithForceBinary added in v1.1.0

func WithForceBinary(ctx context.Context) context.Context

Force binary encoding during the encoding process

func WithForceStructured added in v1.1.0

func WithForceStructured(ctx context.Context) context.Context

Force structured encoding during the encoding process

func WithPreferredEventEncoding added in v1.1.0

func WithPreferredEventEncoding(ctx context.Context, enc Encoding) context.Context

Define the preferred encoding from event to message during the encoding process

func WithSkipDirectBinaryEncoding added in v1.1.0

func WithSkipDirectBinaryEncoding(ctx context.Context, skip bool) context.Context

Skip direct binary to binary encoding during the encoding process

func WithSkipDirectStructuredEncoding added in v1.1.0

func WithSkipDirectStructuredEncoding(ctx context.Context, skip bool) context.Context

Skip direct structured to structured encoding during the encoding process

Types

type BinaryEncoder added in v0.10.0

type BinaryEncoder interface {
	// Method invoked at the beginning of the visit. Useful to perform initial memory allocations
	Start(ctx context.Context) error

	// Set a standard attribute.
	//
	// The value can either be the correct golang type for the attribute, or a canonical
	// string encoding. See package cloudevents/types
	SetAttribute(attribute spec.Attribute, value interface{}) error

	// Set an extension attribute.
	//
	// The value can either be the correct golang type for the attribute, or a canonical
	// string encoding. See package cloudevents/types
	SetExtension(name string, value interface{}) error

	// SetData receives an io.Reader for the data attribute.
	// io.Reader could be empty, meaning that message payload is empty
	SetData(data io.Reader) error

	// End method is invoked only after the whole encoding process ends successfully.
	// If it fails, it's never invoked. It can be used to finalize the message.
	End() error
}

BinaryEncoder is used to visit a binary Message and generate a new representation.

Protocols that supports binary encoding should implement this interface to implement direct binary -> binary transfer and event -> binary.

Start() and End() methods are invoked every time this BinaryEncoder implementation is used to visit a Message

type BindingTransport added in v1.1.0

type BindingTransport struct {
	Sender                  Sender
	Receiver                Receiver
	SenderContextDecorators []func(context.Context) context.Context
	// contains filtered or unexported fields
}

BindingTransport implements transport.Transport using a Sender and Receiver.

func NewTransportAdapter added in v1.1.0

func NewTransportAdapter(s Sender, r Receiver, senderContextDecorators []func(context.Context) context.Context) *BindingTransport

func (*BindingTransport) HasConverter added in v1.1.0

func (t *BindingTransport) HasConverter() bool

func (*BindingTransport) HasTracePropagation added in v1.1.0

func (t *BindingTransport) HasTracePropagation() bool

func (*BindingTransport) Send added in v1.1.0

func (*BindingTransport) SetConverter added in v1.1.0

func (t *BindingTransport) SetConverter(transport.Converter)

func (*BindingTransport) SetReceiver added in v1.1.0

func (t *BindingTransport) SetReceiver(r transport.Receiver)

func (*BindingTransport) StartReceiver added in v1.1.0

func (t *BindingTransport) StartReceiver(ctx context.Context) error

type ChanReceiver added in v0.10.0

type ChanReceiver <-chan Message

ChanReceiver implements Receiver by receiving from a channel.

func (ChanReceiver) Close added in v0.10.0

func (r ChanReceiver) Close(ctx context.Context) error

func (ChanReceiver) Receive added in v0.10.0

func (r ChanReceiver) Receive(ctx context.Context) (Message, error)

type ChanSender added in v0.10.0

type ChanSender chan<- Message

ChanSender implements Sender by sending on a channel.

func (ChanSender) Close added in v0.10.0

func (s ChanSender) Close(ctx context.Context) (err error)

func (ChanSender) Send added in v0.10.0

func (s ChanSender) Send(ctx context.Context, m Message) (err error)

type Closer added in v0.10.0

type Closer interface {
	Close(ctx context.Context) error
}

Closer is the common interface for things that can be closed

type Encoding added in v1.1.0

type Encoding int

Encoding enum specifies the type of encodings supported by binding interfaces

const (
	// Binary encoding as specified in https://github.com/cloudevents/spec/blob/master/spec.md#message
	EncodingBinary Encoding = iota
	// Structured encoding as specified in https://github.com/cloudevents/spec/blob/master/spec.md#message
	EncodingStructured
	// Message is an instance of EventMessage or it contains it nested (through MessageWrapper)
	EncodingEvent
	// When the encoding is unknown (which means that the message is a non-event)
	EncodingUnknown
)

func Encode added in v1.1.0

func Encode(
	ctx context.Context,
	message Message,
	structuredEncoder StructuredEncoder,
	binaryEncoder BinaryEncoder,
	transformers TransformerFactories,
) (Encoding, error)

This is the full algorithm to encode a Message using transformers: 1. It first tries direct encoding using RunEncoders 2. If no direct encoding is possible, it goes through ToEvent to generate an event representation 3. Using the encoders previously defined You can tweak the encoding process using the context decorators WithForceStructured, WithForceStructured, etc. This function guarantees that transformers are invoked only one time during the encoding process. Returns: * EncodingStructured, nil if message was structured and correctly translated to Event * EncodingBinary, nil if message was binary and correctly translated to Event * EncodingStructured, err if message was structured but error happened during translation * BinaryEncoding, err if message was binary but error happened during translation * EncodingUnknown, ErrUnknownEncoding if message is not recognized

func RunDirectEncoding added in v1.1.0

func RunDirectEncoding(
	ctx context.Context,
	message Message,
	structuredEncoder StructuredEncoder,
	binaryEncoder BinaryEncoder,
	factories TransformerFactories,
) (Encoding, error)

Invokes the encoders. createRootStructuredEncoder and createRootBinaryEncoder could be null if the protocol doesn't support it

Returns: * EncodingStructured, nil if message was structured and correctly translated to Event * EncodingBinary, nil if message was binary and correctly translated to Event * EncodingStructured, err if message was structured but error happened during translation * BinaryEncoding, err if message was binary but error happened during translation * EncodingUnknown, ErrUnknownEncoding if message is not recognized

func ToEvent added in v0.11.0

func ToEvent(ctx context.Context, message Message, transformers ...TransformerFactory) (e ce.Event, encoding Encoding, err error)

Translates a Message with a valid Structured or Binary representation to an Event The TransformerFactories **aren't invoked** during the transformation to event, but after the event instance is generated

type EventMessage

type EventMessage ce.Event

EventMessage type-converts a cloudevents.Event object to implement Message. This allows local cloudevents.Event objects to be sent directly via Sender.Send()

s.Send(ctx, binding.EventMessage(e))

func (EventMessage) Binary added in v0.11.0

func (m EventMessage) Binary(ctx context.Context, b BinaryEncoder) (err error)

func (EventMessage) Encoding added in v1.1.0

func (m EventMessage) Encoding() Encoding

func (EventMessage) Finish

func (EventMessage) Finish(error) error

func (EventMessage) GetParent added in v1.1.0

func (m EventMessage) GetParent() Message

func (*EventMessage) SetEvent added in v1.1.0

func (m *EventMessage) SetEvent(e ce.Event) error

func (EventMessage) Structured

func (m EventMessage) Structured(ctx context.Context, builder StructuredEncoder) error

type EventTransformer added in v1.1.0

type EventTransformer func(*ce.Event) error

type ExactlyOnceMessage

type ExactlyOnceMessage interface {
	Message

	// Received is called by a forwarding QoS2 Sender when it gets
	// acknowledgment of receipt (e.g. AMQP 'accept' or MQTT PUBREC)
	//
	// The receiver must call settle(nil) when it get's the ack-of-ack
	// (e.g. AMQP 'settle' or MQTT PUBCOMP) or settle(err) if the
	// transfer fails.
	//
	// Finally the Sender calls Finish() to indicate the message can be
	// discarded.
	//
	// If sending fails, or if the sender does not support QoS 2, then
	// Finish() may be called without any call to Received()
	Received(settle func(error))
}

ExactlyOnceMessage is implemented by received Messages that support QoS 2. Only transports that support QoS 2 need to implement or use this interface.

type Message

type Message interface {
	// Return the type of the message Encoding.
	// The encoding should be preferably computed when the message is constructed.
	Encoding() Encoding

	// Structured transfers a structured-mode event to a StructuredEncoder.
	// Returns ErrNotStructured if message is not in structured mode.
	//
	// Returns a different err if something wrong happened while trying to read the structured event
	// In this case, the caller must Finish the message with appropriate error
	//
	// This allows Senders to avoid re-encoding messages that are
	// already in suitable structured form.
	Structured(context.Context, StructuredEncoder) error

	// Binary transfers a binary-mode event to an BinaryEncoder.
	// Returns ErrNotBinary if message is not in binary mode.
	//
	// Returns a different err if something wrong happened while trying to read the binary event
	// In this case, the caller must Finish the message with appropriate error
	//
	// Allows Senders to forward a binary message without allocating an
	// intermediate Event.
	Binary(context.Context, BinaryEncoder) error

	// Finish *must* be called when message from a Receiver can be forgotten by
	// the receiver. Sender.Send() calls Finish() when the message is sent.  A QoS
	// 1 sender should not call Finish() until it gets an acknowledgment of
	// receipt on the underlying transport.  For QoS 2 see ExactlyOnceMessage.
	//
	// Passing a non-nil err indicates sending or processing failed.
	// A non-nil return indicates that the message was not accepted
	// by the receivers peer.
	Finish(error) error
}

Message is the interface to a binding-specific message containing an event.

Reliable Delivery

There are 3 reliable qualities of service for messages:

0/at-most-once/unreliable: messages can be dropped silently.

1/at-least-once: messages are not dropped without signaling an error to the sender, but they may be duplicated in the event of a re-send.

2/exactly-once: messages are never dropped (without error) or duplicated, as long as both sending and receiving ends maintain some binding-specific delivery state. Whether this is persisted depends on the configuration of the binding implementations.

The Message interface supports QoS 0 and 1, the ExactlyOnceMessage interface supports QoS 2

The Structured and Binary methods provide optional optimized transfer of an event to a Sender, they may not be implemented by all Message instances. A Sender should try each method of interest and fall back to ToEvent() if none are supported.

func WithFinish added in v0.10.0

func WithFinish(m Message, finish func(error)) Message

WithFinish returns a wrapper for m that calls finish() and m.Finish() in its Finish(). Allows code to be notified when a message is Finished.

type MessageWrapper added in v1.1.0

type MessageWrapper interface {
	Message

	// Method to get the wrapped message
	GetWrappedMessage() Message
}

Message Wrapper interface is used to walk through a decorated Message and unwrap it.

type ReceiveCloser added in v0.10.0

type ReceiveCloser interface {
	Receiver
	Closer
}

ReceiveCloser is a Receiver that can be closed.

type Receiver

type Receiver interface {
	// Receive blocks till a message is received or ctx expires.
	//
	// A non-nil error means the receiver is closed.
	// io.EOF means it closed cleanly, any other value indicates an error.
	Receive(ctx context.Context) (Message, error)
}

Receiver receives messages.

type Requester added in v0.10.0

type Requester interface {
	// Request sends m like Sender.Send() but also arranges to receive a response.
	// The returned Receiver is used to receive the response.
	Request(ctx context.Context, m Message) (Receiver, error)
}

Requester sends a message and receives a response

Optional interface that may be implemented by protocols that support request/response correlation.

type SendCloser added in v0.10.0

type SendCloser interface {
	Sender
	Closer
}

SendCloser is a Sender that can be closed.

type Sender

type Sender interface {
	// Send a message.
	//
	// Send returns when the "outbound" message has been sent. The Sender may
	// still be expecting acknowledgment or holding other state for the message.
	//
	// m.Finish() is called when sending is finished: expected acknowledgments (or
	// errors) have been received, the Sender is no longer holding any state for
	// the message. m.Finish() may be called during or after Send().
	//
	// To support optimized forwading of structured-mode messages, Send()
	// should use the encoding returned by m.Structured() if there is one.
	// Otherwise m.Event() can be encoded as per the binding's rules.
	Send(ctx context.Context, m Message) error
}

Sender sends messages.

type StructuredEncoder added in v0.11.0

type StructuredEncoder interface {
	// Event receives an io.Reader for the whole event.
	SetStructuredEvent(ctx context.Context, format format.Format, event io.Reader) error
}

StructuredEncoder is used to visit a structured Message and generate a new representation.

Protocols that supports structured encoding should implement this interface to implement direct structured -> structured transfer and event -> binary.

type TransformerFactories added in v0.11.0

type TransformerFactories []TransformerFactory

Utility type alias to manage multiple TransformerFactory

func (TransformerFactories) BinaryTransformer added in v0.11.0

func (t TransformerFactories) BinaryTransformer(encoder BinaryEncoder) BinaryEncoder

func (TransformerFactories) EventTransformer added in v0.11.0

func (t TransformerFactories) EventTransformer() EventTransformer

func (TransformerFactories) StructuredTransformer added in v0.11.0

func (t TransformerFactories) StructuredTransformer(encoder StructuredEncoder) StructuredEncoder

type TransformerFactory added in v0.11.0

type TransformerFactory interface {
	// Can return nil if the transformation doesn't support structured encoding directly
	StructuredTransformer(encoder StructuredEncoder) StructuredEncoder

	// Can return nil if the transformation doesn't support binary encoding directly
	BinaryTransformer(encoder BinaryEncoder) BinaryEncoder

	// Can return nil if the transformation doesn't support events
	EventTransformer() EventTransformer
}

Implements a transformation process while transferring the event from the Message implementation to the provided encoder

A transformer could optionally not provide an implementation for binary and/or structured encodings, returning nil to the respective factory method.

Directories

Path Synopsis
Package format formats structured events.
Package format formats structured events.
Package spec provides spec-version metadata.
Package spec provides spec-version metadata.
Package test contains test data and generic tests for testing bindings.
Package test contains test data and generic tests for testing bindings.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL