qp

v0.0.0-...-226497a
Warning

This package is not in the latest version of its module.

Published: Aug 14, 2014 License: MIT Imports: 8 Imported by: 0

README

The Go implementation of the QP protocol.

Usage

See the example code for a working example of how to implement Go services using QP.

Publish

Use NewPublisher to create a Publisher and publish events.

// make a transport
transport := redis.NewPubSub("127.0.0.1:6379")
transport.Start()
defer transport.Stop()

// make a publisher
pub := qp.NewPublisher("name", "instanceID", qp.JSON, transport)

// publish messages
pub.Publish("channel1", "Hello QP")
pub.Publish("channel2", "Bonjour QP")
pub.Publish("channel3", "¡hola! QP")

Subscribe

Use NewSubscriber to create a Subscriber and subscribe to events.

// make a transport
transport := redis.NewPubSub("127.0.0.1:6379")
transport.Start()
defer transport.Stop()

// make a subscriber
sub := qp.NewSubscriber(qp.JSON, transport)

// subscribe to messages
sub.SubscribeFunc("channel1", func(event *qp.Event) {
  log.Println(event)
})
sub.SubscribeFunc("channel2", func(event *qp.Event) {
  log.Println(event)
})
sub.SubscribeFunc("channel3", func(event *qp.Event) {
  log.Println(event)
})

Request

Use NewRequester to create a Requester and make requests.

// make a transport
transport := redis.NewDirect("127.0.0.1:6379")
transport.Start()
defer transport.Stop()

// make a requester
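req, err := qp.NewRequester("webserver", "one", qp.JSON, transport)
if err != nil {
  log.Fatalln(err)
}

// issue a request; Issue returns a Future and an error
future, err := req.Issue([]string{"channel1", "channel2", "channel3"}, "some data")
if err != nil {
  log.Fatalln(err)
}

// wait up to 1 second for the response
response, err := future.Response(1 * time.Second)
if err != nil {
  log.Fatalln(err)
}

log.Println(response)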

Responders

Use NewResponder to create a Responder and respond to requests.

// make a transport
transport := redis.NewDirect("127.0.0.1:6379")
transport.Start()
defer transport.Stop()

// make a responder
res := qp.NewResponder("service", "one", qp.JSON, transport)
res.HandleFunc("channel1", func(r *qp.Transaction) *qp.Transaction {
  // do some work and update r.Data
  return r
})
res.HandleFunc("channel2", func(r *qp.Transaction) *qp.Transaction {
  // do some work and update r.Data
  return r
})

Service

A Service is a special Responder that responds to requests on a channel of its own name.

// make a transport
transport := redis.NewDirect("127.0.0.1:6379")
transport.Start()
defer transport.Stop()

qp.ServiceFunc("serviceName", "instance", qp.JSON, transport, func(r *qp.Transaction) *qp.Transaction {
  // provide your service
  return r
})

Documentation

Overview

Package qp implements the "QP" protocol found at qp.github.io.

The QP protocol lets an application communicate over any underlying message queue system without depending on that system's specifics. By using QP, you remove technology-dependent code from your application while gaining additional functionality, such as QP's pipeline concept.

Publish and Subscribe

The pub/sub model is achieved by using the Publisher and Subscriber types, which expose Publish and Subscribe methods respectively.

Request and Response

To make a request and get back a response from a pipeline of handlers, use the Requester type, which offers the Issue method.

To build services that respond to requests, use the Responder type, which exposes the Handle method.

Name and instance ID

Most types require a name and an instance ID. The name describes the type of capability being provided. The instance ID lets many instances of the same type coexist. Every combination of name and instance ID must be unique within a system.

Constants

This section is empty.

Variables

var ErrNotRunning = errors.New("transport is not running")

ErrNotRunning is returned when a method is called on a transport that is not running.

var ErrRunning = errors.New("transport is running")

ErrRunning is returned when a method is called on a transport that is running.

var ErrTimeout = errors.New("timed out")

ErrTimeout represents situations when timeouts have occurred.

var JSON = NewCodec(func(object interface{}) ([]byte, error) {
	return json.Marshal(object)
}, func(data []byte, to interface{}) error {
	return json.Unmarshal(data, to)
})

JSON is a Codec that talks JSON.

Functions

func Service

func Service(name, instanceID string, codec Codec, transport DirectTransport, handler TransactionHandler) error

Service is an endpoint that automatically subscribes to its own name, allowing other endpoints to issue requests to it. Multiple services with the same name will automatically draw upon the same channel, creating implicit load balancing.
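
The implicit load balancing described above can be pictured with a plain Go channel standing in for the shared service channel. This is only a sketch under that assumption: the function distribute and its parameters are hypothetical, and nothing here reflects how a real transport shares work between instances. It shows that when several instances read from one queue, each request is handled by exactly one of them.

```go
package main

import (
	"fmt"
	"sync"
)

// distribute is a hypothetical helper. It starts one worker per instance ID,
// all reading from the same queue, and records which instance handled each
// request. The Go channel is an assumption standing in for the shared
// service channel.
func distribute(instanceIDs []string, requests []string) map[string]string {
	queue := make(chan string)
	handledBy := make(map[string]string)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for _, id := range instanceIDs {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			// Each receive takes one request off the shared queue, so no
			// request is delivered to more than one instance.
			for req := range queue {
				mu.Lock()
				handledBy[req] = id
				mu.Unlock()
			}
		}(id)
	}

	for _, req := range requests {
		queue <- req
	}
	close(queue)
	wg.Wait()
	return handledBy
}

func main() {
	got := distribute([]string{"one", "two"}, []string{"a", "b", "c", "d"})
	// Which instance handles which request varies from run to run.
	fmt.Println(len(got), "requests handled")
}
```

Which instance picks up a given request is not deterministic in this sketch; only the one-handler-per-request property is.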

func ServiceFunc

func ServiceFunc(name, instanceID string, codec Codec, transport DirectTransport, handler TransactionHandlerFunc) error

ServiceFunc creates a service with a TransactionHandlerFunc rather than a TransactionHandler.

func ServiceLogger

func ServiceLogger(name, instanceID string, codec Codec, transport DirectTransport, logger slog.Logger, handler TransactionHandler) error

ServiceLogger does the same thing as Service but also logs to the specified Logger.

func ServiceLoggerFunc

func ServiceLoggerFunc(name, instanceID string, codec Codec, transport DirectTransport, logger slog.Logger, handler TransactionHandlerFunc) error

ServiceLoggerFunc does the same thing as ServiceLogger but takes a TransactionHandlerFunc rather than a TransactionHandler.

Types

type Codec

type Codec interface {
	// Marshal takes an object and creates a byte slice representation
	// of the object in the underlying data format.
	Marshal(object interface{}) ([]byte, error)
	// Unmarshal takes a byte slice of data in the underlying data format
	// and decodes it into the provided object.
	Unmarshal(data []byte, to interface{}) error
}

Codec defines types that can marshal and unmarshal data to and from bytes.

func NewCodec

func NewCodec(marshal func(object interface{}) ([]byte, error), unmarshal func(data []byte, to interface{}) error) Codec

NewCodec makes a new Codec with the specified marshal and unmarshal functions.
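
As an illustration of how a codec other than JSON might be built from a pair of functions, here is a minimal sketch using encoding/gob. The Codec interface and NewCodec are redeclared locally with the documented signatures so the example compiles on its own; funcCodec, Gob and roundTrip are hypothetical names and are not part of this package.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Codec mirrors the documented interface; it is redeclared here only so the
// sketch is self-contained.
type Codec interface {
	Marshal(object interface{}) ([]byte, error)
	Unmarshal(data []byte, to interface{}) error
}

// funcCodec is a hypothetical adapter that pairs two functions into a Codec.
type funcCodec struct {
	marshal   func(object interface{}) ([]byte, error)
	unmarshal func(data []byte, to interface{}) error
}

func (c funcCodec) Marshal(object interface{}) ([]byte, error) { return c.marshal(object) }

func (c funcCodec) Unmarshal(data []byte, to interface{}) error { return c.unmarshal(data, to) }

// NewCodec is a local stand-in with the documented signature.
func NewCodec(marshal func(object interface{}) ([]byte, error), unmarshal func(data []byte, to interface{}) error) Codec {
	return funcCodec{marshal: marshal, unmarshal: unmarshal}
}

// Gob is a hypothetical codec, assembled the same way as the JSON variable.
var Gob = NewCodec(func(object interface{}) ([]byte, error) {
	var buf bytes.Buffer
	err := gob.NewEncoder(&buf).Encode(object)
	return buf.Bytes(), err
}, func(data []byte, to interface{}) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(to)
})

// roundTrip encodes a string with the codec and decodes it again.
func roundTrip(c Codec, in string) (string, error) {
	data, err := c.Marshal(in)
	if err != nil {
		return "", err
	}
	var out string
	err = c.Unmarshal(data, &out)
	return out, err
}

func main() {
	out, err := roundTrip(Gob, "Hello QP")
	fmt.Println(out, err)
}
```

With the real package, the same two function literals would presumably be passed to qp.NewCodec and the result used wherever qp.JSON is accepted.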

type DirectTransport

type DirectTransport interface {
	start.StartStopper
	// Send sends data on the channel.
	Send(channel string, data []byte) error
	// OnMessage binds the handler to the specified channel.
	// Only one handler can be associated with a given channel.
	// Multiple calls to OnMessage with the same channel will replace the previous handler.
	OnMessage(channel string, handler Handler) error
}

DirectTransport represents a transport capable of providing request/response capabilities.
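
To make the one-handler-per-channel rule concrete, the following sketch implements Send and OnMessage over an in-memory map. It is an assumption-laden illustration: memDirect, newMemDirect and errNoHandler are hypothetical names, the embedded start.StartStopper is left out because its methods are not shown here, and delivery is synchronous for simplicity.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Message, Handler and HandlerFunc mirror the documented types; they are
// redeclared here only so the sketch compiles on its own.
type Message struct {
	Source string
	Data   []byte
}

type Handler interface {
	Handle(msg *Message)
}

type HandlerFunc func(msg *Message)

func (f HandlerFunc) Handle(msg *Message) { f(msg) }

// errNoHandler is a hypothetical error for channels with no bound handler.
var errNoHandler = errors.New("no handler bound to channel")

// memDirect is a hypothetical in-memory transport. It omits the embedded
// start.StartStopper, so it only illustrates Send and OnMessage.
type memDirect struct {
	mu       sync.RWMutex
	handlers map[string]Handler
}

func newMemDirect() *memDirect {
	return &memDirect{handlers: make(map[string]Handler)}
}

// OnMessage binds handler to channel, replacing any previous handler.
func (t *memDirect) OnMessage(channel string, handler Handler) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.handlers[channel] = handler
	return nil
}

// Send delivers data synchronously to the handler bound to channel.
func (t *memDirect) Send(channel string, data []byte) error {
	t.mu.RLock()
	h, ok := t.handlers[channel]
	t.mu.RUnlock()
	if !ok {
		return errNoHandler
	}
	h.Handle(&Message{Source: channel, Data: data})
	return nil
}

func main() {
	t := newMemDirect()
	t.OnMessage("channel1", HandlerFunc(func(m *Message) { fmt.Println("first:", string(m.Data)) }))
	// The second call replaces the first handler for the same channel.
	t.OnMessage("channel1", HandlerFunc(func(m *Message) { fmt.Println("second:", string(m.Data)) }))
	fmt.Println(t.Send("channel1", []byte("Hello QP")))
	fmt.Println(t.Send("channel2", []byte("nobody listening")))
}
```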

type Event

type Event struct {
	// From is the instance ID of the sender.
	From string `json:"from"`
	// Data is the payload of the event.
	Data interface{} `json:"data"`
}

Event defines the fields included in an event delivered to subscribers.

type EventHandler

type EventHandler interface {
	Handle(*Event)
}

EventHandler represents types capable of handling Events.

type EventHandlerFunc

type EventHandlerFunc func(*Event)

EventHandlerFunc represents functions capable of handling Events.

func (EventHandlerFunc) Handle

func (f EventHandlerFunc) Handle(r *Event)

Handle calls the EventHandlerFunc in order to handle the specific Event.

type Future

type Future struct {
	// contains filtered or unexported fields
}

Future implements a future for a response object. It allows execution to continue until the response is requested from it, at which point it blocks and waits for the response to come back.

func (*Future) Response

func (r *Future) Response(timeout time.Duration) (*Transaction, error)

Response uses a future mechanism to retrieve the response. Execution continues asynchronously until this method is called, at which point it blocks until the response is available or the timeout is reached. If the Response times out, nil is returned.
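
The blocking-with-timeout behaviour described here is commonly built from a buffered channel and a select statement. The sketch below shows that general technique, not this package's internals: the future type, newFuture and resolve are hypothetical, and returning ErrTimeout alongside the nil Transaction is an assumption based on the documented error variable.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrTimeout mirrors the documented variable.
var ErrTimeout = errors.New("timed out")

// Transaction is reduced to the one field this sketch needs.
type Transaction struct {
	Data interface{}
}

// future is a hypothetical one-shot future backed by a buffered channel.
type future struct {
	ch chan *Transaction
}

func newFuture() *future {
	// Capacity 1 lets resolve return even if nobody is waiting yet.
	return &future{ch: make(chan *Transaction, 1)}
}

// resolve delivers the response. It is meant to be called at most once.
func (f *future) resolve(t *Transaction) { f.ch <- t }

// Response blocks until the response arrives or the timeout elapses.
// Returning ErrTimeout with a nil Transaction is an assumption.
func (f *future) Response(timeout time.Duration) (*Transaction, error) {
	select {
	case t := <-f.ch:
		return t, nil
	case <-time.After(timeout):
		return nil, ErrTimeout
	}
}

func main() {
	f := newFuture()
	go func() {
		time.Sleep(10 * time.Millisecond)
		f.resolve(&Transaction{Data: "pong"})
	}()
	// Other work could happen here before the response is needed.
	t, err := f.Response(time.Second)
	fmt.Println(t.Data, err)

	// A future that is never resolved times out.
	_, err = newFuture().Response(20 * time.Millisecond)
	fmt.Println(err)
}
```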

type Handler

type Handler interface {
	Handle(msg *Message)
}

Handler represents types capable of handling messages from the transports.

type HandlerFunc

type HandlerFunc func(msg *Message)

HandlerFunc represents functions capable of handling messages.

func (HandlerFunc) Handle

func (f HandlerFunc) Handle(msg *Message)

Handle calls the HandlerFunc.

type Message

type Message struct {
	// The channel the Message came from.
	Source string
	// The data of the message.
	Data []byte
}

Message represents a single message of data and its source.

func (*Message) String

func (m *Message) String() string

type PubSubTransport

type PubSubTransport interface {
	start.StartStopper
	// Publish publishes data on the specified channel.
	Publish(channel string, data []byte) error
	// Subscribe binds the handler to the specified channel.
	// Only one handler can be associated with a given channel.
	// Multiple calls to Subscribe with the same channel will replace the previous handler.
	Subscribe(channel string, handler Handler) error
}

PubSubTransport represents a transport capable of providing publish/subscribe capabilities.

type Publisher

type Publisher interface {
	// Publish publishes the object on the specified channel.
	Publish(channel string, obj interface{}) error
}

Publisher represents types capable of publishing events.

func NewPublisher

func NewPublisher(name, instanceID string, codec Codec, transport PubSubTransport) Publisher

NewPublisher makes a new publisher capable of Publishing events.

type RequestID

type RequestID uint64

RequestID represents a unique ID for a Request.

type Requester

type Requester interface {
	// Issue issues the request and returns a Future from which you can
	// get the response.
	// The pipeline may be one or more endpoints. If it is more than one, each will receive
	// the message, in order, and have an opportunity to mutate it before it is dispatched
	// to the next endpoint in the pipeline.
	// The provided object will be serialized and sent as the "data" field in the message.
	Issue(pipeline []string, obj interface{}) (*Future, error)
}

Requester represents a type capable of issuing requests and getting responses.
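
The pipeline behaviour described in the Issue comment can be sketched in-process, with no transport involved. The bookkeeping here is an assumption: each hop removes the first entry of To and appends it to From, which matches the field comments on Transaction but is not confirmed as the exact wire behaviour. runPipeline and tag are hypothetical helpers.

```go
package main

import "fmt"

// Transaction and TransactionHandlerFunc mirror the documented types, minus
// the JSON tags, so the sketch compiles on its own.
type Transaction struct {
	To   []string
	From []string
	ID   uint64
	Data interface{}
}

type TransactionHandlerFunc func(r *Transaction) *Transaction

// runPipeline is a hypothetical in-process stand-in for transport routing.
// It visits each endpoint in order and lets its handler mutate the
// transaction before moving on to the next endpoint.
func runPipeline(handlers map[string]TransactionHandlerFunc, origin string, pipeline []string, data interface{}) *Transaction {
	t := &Transaction{
		To:   append([]string(nil), pipeline...),
		From: []string{origin},
		ID:   1,
		Data: data,
	}
	for len(t.To) > 0 {
		next := t.To[0]
		t.To = t.To[1:]
		t.From = append(t.From, next) // assumption: From records each hop
		if h, ok := handlers[next]; ok {
			if out := h(t); out != nil {
				t = out
			}
		}
	}
	return t
}

// tag returns a handler that appends its label to the payload.
func tag(label string) TransactionHandlerFunc {
	return func(r *Transaction) *Transaction {
		r.Data = fmt.Sprintf("%v|%s", r.Data, label)
		return r
	}
}

func main() {
	handlers := map[string]TransactionHandlerFunc{
		"channel1": tag("channel1"),
		"channel2": tag("channel2"),
	}
	res := runPipeline(handlers, "webserver", []string{"channel1", "channel2"}, "some data")
	fmt.Println(res.Data, res.From)
}
```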

func NewRequester

func NewRequester(name, instanceID string, codec Codec, transport DirectTransport) (Requester, error)

NewRequester makes a new object capable of making requests and handling responses.

func NewRequesterLogger

func NewRequesterLogger(name, instanceID string, codec Codec, transport DirectTransport, logger slog.Logger) (Requester, error)

NewRequesterLogger makes a new object capable of making requests and handling responses with logs going to the specified Logger.

type Responder

type Responder interface {
	// Handle binds a TransactionHandler to the specified channel.
	Handle(channel string, handler TransactionHandler) error
	// HandleFunc binds the specified function to the specified channel.
	HandleFunc(channel string, f TransactionHandlerFunc) error
}

Responder represents types capable of responding to requests.

func NewResponder

func NewResponder(name, instanceID string, codec Codec, transport DirectTransport) Responder

NewResponder makes a new object capable of responding to requests.

func NewResponderLogger

func NewResponderLogger(name, instanceID string, codec Codec, transport DirectTransport, logger slog.Logger) Responder

NewResponderLogger makes a new object capable of responding to requests, which will log errors to the specified Logger.

type Signal

type Signal struct{}

Signal is an empty struct type useful for signalling down channels.

type Subscriber

type Subscriber interface {
	// Subscribe binds the handler to the specified channel.
	Subscribe(channel string, handler EventHandler) error
	// SubscribeFunc binds the EventHandlerFunc to the specified channel.
	SubscribeFunc(channel string, fn EventHandlerFunc) error
}

Subscriber represents types capable of subscribing to events.

func NewSubscriber

func NewSubscriber(codec Codec, transport PubSubTransport) Subscriber

NewSubscriber creates a Subscriber object capable of subscribing to events.

func NewSubscriberLogger

func NewSubscriberLogger(codec Codec, transport PubSubTransport, logger slog.Logger) Subscriber

NewSubscriberLogger creates a Subscriber object capable of subscribing to events, while logging errors to the specified logger.

type Transaction

type Transaction struct {
	// To is an array of destination addresses
	To []string `json:"to"`
	// From is an array of addresses encountered thus far
	From []string `json:"from"`
	// ID is a number identifying this message
	ID RequestID `json:"id"`
	// Data is an arbitrary data payload
	Data interface{} `json:"data"`
}

Transaction defines all the fields and information in the standard qp request object.
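
For orientation, this is how a Transaction with the struct tags shown above serializes under encoding/json. The types are redeclared locally so the sketch runs on its own, and the field values and the encode helper are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RequestID and Transaction are copied from the documented declarations.
type RequestID uint64

type Transaction struct {
	To   []string    `json:"to"`
	From []string    `json:"from"`
	ID   RequestID   `json:"id"`
	Data interface{} `json:"data"`
}

// encode is a hypothetical helper returning the JSON form as a string.
func encode(t Transaction) (string, error) {
	b, err := json.Marshal(t)
	return string(b), err
}

func main() {
	// Illustrative values only.
	s, err := encode(Transaction{
		To:   []string{"channel2"},
		From: []string{"webserver", "channel1"},
		ID:   42,
		Data: "some data",
	})
	fmt.Println(s, err)
	// Prints: {"to":["channel2"],"from":["webserver","channel1"],"id":42,"data":"some data"} <nil>
}
```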

func (*Transaction) Abort

func (r *Transaction) Abort()

Abort clears the To slice indicating that the Transaction should be sent back to the originator.
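
A handler might use Abort to short-circuit a pipeline, for example when validation fails. The sketch below assumes Abort simply sets To to nil, which is one way to clear the slice and may differ from the real implementation. requireData is a hypothetical handler.

```go
package main

import "fmt"

// Transaction mirrors the documented type, minus the ID field and JSON tags.
type Transaction struct {
	To   []string
	From []string
	Data interface{}
}

// Abort is a local stand-in. Setting To to nil is an assumption about how
// the slice is cleared.
func (r *Transaction) Abort() { r.To = nil }

// requireData is a hypothetical handler that aborts when there is no payload,
// so no further endpoints in the pipeline are visited.
func requireData(r *Transaction) *Transaction {
	if r.Data == nil {
		r.Data = "missing payload"
		r.Abort()
	}
	return r
}

func main() {
	bad := requireData(&Transaction{To: []string{"channel2", "channel3"}})
	fmt.Println(len(bad.To), bad.Data)

	ok := requireData(&Transaction{To: []string{"channel2", "channel3"}, Data: "some data"})
	fmt.Println(len(ok.To), ok.Data)
}
```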

type TransactionHandler

type TransactionHandler interface {
	Handle(req *Transaction) *Transaction
}

TransactionHandler represents types capable of handling Transactions.

type TransactionHandlerFunc

type TransactionHandlerFunc func(r *Transaction) *Transaction

TransactionHandlerFunc represents functions capable of handling Transactions.

func (TransactionHandlerFunc) Handle

Handle calls the TransactionHandlerFunc in order to handle the specific Transaction.

Directories

Path Synopsis
codecs
Package example is a real multi-service implementation with an HTTP endpoint (server) that uses Redis transports.
Package inproc provides transports for in-process operations.
Package redis implements the various qp transports.
Package templates contains template code for writing QP components, such as codecs and transports.
