server

package
v3.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 27, 2021 License: Apache-2.0 Imports: 42 Imported by: 3

Documentation

Overview

Package server is an interface for a micro server

Index

Constants

This section is empty.

Variables

View Source
var (
	DefaultContentType = "application/protobuf"

	DefaultCodecs = map[string]codec.NewCodec{
		"application/grpc":         grpc.NewCodec,
		"application/grpc+json":    grpc.NewCodec,
		"application/grpc+proto":   grpc.NewCodec,
		"application/json":         json.NewCodec,
		"application/json-rpc":     jsonrpc.NewCodec,
		"application/protobuf":     proto.NewCodec,
		"application/proto-rpc":    protorpc.NewCodec,
		"application/octet-stream": raw.NewCodec,
	}
)
View Source
var (
	DefaultAddress                 = ":0"
	DefaultName                    = "go.micro.server"
	DefaultVersion                 = "latest"
	DefaultId                      = uuid.New().String()
	DefaultServer           Server = newRpcServer()
	DefaultRouter                  = newRpcRouter()
	DefaultRegisterCheck           = func(context.Context) error { return nil }
	DefaultRegisterInterval        = time.Second * 30
	DefaultRegisterTTL             = time.Second * 90

	// NewServer creates a new server
	NewServer func(...Option) Server = newRpcServer
)

Functions

func Handle

func Handle(h Handler) error

Handle registers a handler interface with the default server to handle inbound requests

func Init

func Init(opt ...Option)

Init initialises the default server with options passed in

func NewContext

func NewContext(ctx context.Context, s Server) context.Context

func NewRouter

func NewRouter() *router

NewRouter returns a new router

func Run

func Run() error

Run starts the default server and waits for a kill signal before exiting. Also registers/deregisters the server

func Start

func Start() error

Start starts the default server

func Stop

func Stop() error

Stop stops the default server

func String

func String() string

String returns name of Server implementation

func Subscribe

func Subscribe(s Subscriber) error

Subscribe registers a subscriber interface with the default server which subscribes to specified topic with the broker

Types

type Handler

type Handler interface {
	Name() string
	Handler() interface{}
	Endpoints() []*registry.Endpoint
	Options() HandlerOptions
}

Handler interface represents a request handler. It's generated by passing any type of public concrete object with endpoints into server.NewHandler. Most will pass in a struct.

Example:

type Greeter struct {}

func (g *Greeter) Hello(context, request, response) error {
        return nil
}

func NewHandler

func NewHandler(h interface{}, opts ...HandlerOption) Handler

NewHandler creates a new handler interface using the default server Handlers are required to be a public object with public endpoints. Call to a service endpoint such as Foo.Bar expects the type:

type Foo struct {}
func (f *Foo) Bar(ctx, req, rsp) error {
	return nil
}

type HandlerFunc

type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error

HandlerFunc represents a single method of a handler. It's used primarily for the wrappers. What's handed to the actual method is the concrete request and response types.

type HandlerOption

type HandlerOption func(*HandlerOptions)

func EndpointMetadata

func EndpointMetadata(name string, md map[string]string) HandlerOption

EndpointMetadata is a Handler option that allows metadata to be added to individual endpoints.

func InternalHandler

func InternalHandler(b bool) HandlerOption

Internal Handler options specifies that a handler is not advertised to the discovery system. In the future this may also limit request to the internal network or authorised user.

type HandlerOptions

type HandlerOptions struct {
	Internal bool
	Metadata map[string]map[string]string
}

type HandlerWrapper

type HandlerWrapper func(HandlerFunc) HandlerFunc

HandlerWrapper wraps the HandlerFunc and returns the equivalent

type Message

type Message interface {
	// Topic of the message
	Topic() string
	// The decoded payload value
	Payload() interface{}
	// The content type of the payload
	ContentType() string
	// The raw headers of the message
	Header() map[string]string
	// The raw body of the message
	Body() []byte
	// Codec used to decode the message
	Codec() codec.Reader
}

Message is an async message interface

type Option

type Option func(*Options)

func Address

func Address(a string) Option

Address to bind to - host:port

func Advertise(a string) Option

The address to advertise for discovery - host:port

func Broker

func Broker(b broker.Broker) Option

Broker to use for pub/sub

func Codec

func Codec(contentType string, c codec.NewCodec) Option

Codec to use to encode/decode requests for a given content type

func Context

func Context(ctx context.Context) Option

Context specifies a context for the service. Can be used to signal shutdown of the service Can be used for extra option values.

func Id

func Id(id string) Option

Unique server id

func Metadata

func Metadata(md map[string]string) Option

Metadata associated with the server

func Name

func Name(n string) Option

Server name

func RegisterCheck

func RegisterCheck(fn func(context.Context) error) Option

RegisterCheck run func before registry service

func RegisterInterval

func RegisterInterval(t time.Duration) Option

Register the service with at interval

func RegisterTTL

func RegisterTTL(t time.Duration) Option

Register the service with a TTL

func Registry

func Registry(r registry.Registry) Option

Registry used for discovery

func TLSConfig

func TLSConfig(t *tls.Config) Option

TLSConfig specifies a *tls.Config

func Tracer

func Tracer(t trace.Tracer) Option

Tracer mechanism for distributed tracking

func Transport

func Transport(t transport.Transport) Option

Transport mechanism for communication e.g http, rabbitmq, etc

func Version

func Version(v string) Option

Version of the service

func Wait

func Wait(wg *sync.WaitGroup) Option

Wait tells the server to wait for requests to finish before exiting If `wg` is nil, server only wait for completion of rpc handler. For user need finer grained control, pass a concrete `wg` here, server will wait against it on stop.

func WithRouter

func WithRouter(r Router) Option

WithRouter sets the request router

func WrapHandler

func WrapHandler(w HandlerWrapper) Option

Adds a handler Wrapper to a list of options passed into the server

func WrapSubscriber

func WrapSubscriber(w SubscriberWrapper) Option

Adds a subscriber Wrapper to a list of options passed into the server

type Options

type Options struct {
	Codecs       map[string]codec.NewCodec
	Broker       broker.Broker
	Registry     registry.Registry
	Tracer       trace.Tracer
	Transport    transport.Transport
	Metadata     map[string]string
	Name         string
	Address      string
	Advertise    string
	Id           string
	Version      string
	HdlrWrappers []HandlerWrapper
	SubWrappers  []SubscriberWrapper

	// RegisterCheck runs a check function before registering the service
	RegisterCheck func(context.Context) error
	// The register expiry time
	RegisterTTL time.Duration
	// The interval on which to register
	RegisterInterval time.Duration

	// The router for requests
	Router Router

	// TLSConfig specifies tls.Config for secure serving
	TLSConfig *tls.Config

	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context
}

func DefaultOptions

func DefaultOptions() Options

DefaultOptions returns config options for the default service

type Request

type Request interface {
	// Service name requested
	Service() string
	// The action requested
	Method() string
	// Endpoint name requested
	Endpoint() string
	// Content type provided
	ContentType() string
	// Header of the request
	Header() map[string]string
	// Body is the initial decoded value
	Body() interface{}
	// Read the undecoded request body
	Read() ([]byte, error)
	// The encoded message stream
	Codec() codec.Reader
	// Indicates whether its a stream
	Stream() bool
}

Request is a synchronous request interface

type Response

type Response interface {
	// Encoded writer
	Codec() codec.Writer
	// Write the header
	WriteHeader(map[string]string)
	// write a response directly to the client
	Write([]byte) error
}

Response is the response writer for unencoded messages

type Router

type Router interface {
	// ProcessMessage processes a message
	ProcessMessage(context.Context, Message) error
	// ServeRequest processes a request to completion
	ServeRequest(context.Context, Request, Response) error
}

Router handle serving messages

type Server

type Server interface {
	// Initialise options
	Init(...Option) error
	// Retrieve the options
	Options() Options
	// Register a handler
	Handle(Handler) error
	// Create a new handler
	NewHandler(interface{}, ...HandlerOption) Handler
	// Create a new subscriber
	NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
	// Register a subscriber
	Subscribe(Subscriber) error
	// Start the server
	Start() error
	// Stop the server
	Stop() error
	// Server implementation
	String() string
}

Server is a simple micro server abstraction

func FromContext

func FromContext(ctx context.Context) (Server, bool)

type Stream

type Stream interface {
	Context() context.Context
	Request() Request
	Send(interface{}) error
	Recv(interface{}) error
	Error() error
	Close() error
}

Stream represents a stream established with a client. A stream can be bidirectional which is indicated by the request. The last error will be left in Error(). EOF indicates end of the stream.

type StreamWrapper

type StreamWrapper func(Stream) Stream

StreamWrapper wraps a Stream interface and returns the equivalent. Because streams exist for the lifetime of a method invocation this is a convenient way to wrap a Stream as its in use for trace, monitoring, metrics, etc.

type Subscriber

type Subscriber interface {
	Topic() string
	Subscriber() interface{}
	Endpoints() []*registry.Endpoint
	Options() SubscriberOptions
}

Subscriber interface represents a subscription to a given topic using a specific subscriber function or object with endpoints. It mirrors the handler in its behaviour.

func NewSubscriber

func NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber

NewSubscriber creates a new subscriber interface with the given topic and handler using the default server

type SubscriberFunc

type SubscriberFunc func(ctx context.Context, msg Message) error

SubscriberFunc represents a single method of a subscriber. It's used primarily for the wrappers. What's handed to the actual method is the concrete publication message.

type SubscriberOption

type SubscriberOption func(*SubscriberOptions)

func DisableAutoAck

func DisableAutoAck() SubscriberOption

DisableAutoAck will disable auto acking of messages after they have been handled.

func InternalSubscriber

func InternalSubscriber(b bool) SubscriberOption

Internal Subscriber options specifies that a subscriber is not advertised to the discovery system.

func SubscriberContext

func SubscriberContext(ctx context.Context) SubscriberOption

SubscriberContext set context options to allow broker SubscriberOption passed

func SubscriberQueue

func SubscriberQueue(n string) SubscriberOption

Shared queue name distributed messages across subscribers

type SubscriberOptions

type SubscriberOptions struct {
	// AutoAck defaults to true. When a handler returns
	// with a nil error the message is acked.
	AutoAck  bool
	Queue    string
	Internal bool
	Context  context.Context
}

func NewSubscriberOptions

func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions

type SubscriberWrapper

type SubscriberWrapper func(SubscriberFunc) SubscriberFunc

SubscriberWrapper wraps the SubscriberFunc and returns the equivalent

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL