grid

package module
v3.2.15 Latest
Published: Mar 29, 2023 License: Apache-2.0 Imports: 27 Imported by: 0

README

grid

Grid is a library for doing distributed processing. Its main goal is to help in scheduling fine-grain stateful computations, which grid calls actors, and sending data between them. Its only service dependency is an Etcd v3 server, used for discovery and coordination. Grid uses gRPC for communication, and sends Protobuf messages.

Example

Below is a basic example of starting your grid application. If a "leader" definition is registered, the leader actor will be started for you when Serve is called. The "leader" actor can be thought of as an entry-point into your distributed application. You don't have to use it, but it is often convenient.

No matter how many processes are participating in the grid, only one leader actor is started per namespace; it is a singleton. The actor named "leader" is also special in that if the process currently running the leader dies, the leader will be started on another peer, provided more than one peer is participating in the grid.

func main() {
    etcd, err := etcdv3.New(...)
    ...

    server, err := grid.NewServer(etcd, grid.ServerCfg{Namespace: "mygrid"})
    ...

    server.RegisterDef("leader", func(_ []byte) (grid.Actor, error) { return &LeaderActor{...}, nil })
    server.RegisterDef("worker", func(_ []byte) (grid.Actor, error) { return &WorkerActor{...}, nil })

    lis, err := net.Listen("tcp", ...)
    ...

    err = server.Serve(lis)
    ...
}

Actor

Anything that implements the Actor interface is an actor. Actors typically represent the central work of your application.

type Actor interface {
    Act(ctx context.Context)
}

Example Actor, Part 1

Below is an actor that starts other actors, which is a typical way of structuring an application with grid. Here the leader actor starts a worker on each peer in the grid. Actors are started by sending an ActorStart message to a peer. Each actor must have a unique name, per namespace. The name is registered in Etcd to make sure that it is unique across all the processes of a grid.

const timeout = 2 * time.Second

type LeaderActor struct {
    client *grid.Client
}

func (a *LeaderActor) Act(ctx context.Context) {
    // Discover participating peers.
    peers, err := a.client.Query(timeout, grid.Peers)
    ...

    for _, peer := range peers {
        // Actor names are unique, registered in etcd.
        // There can never be more than one actor with
        // a given name. When an actor exits or panics
        // its record is removed from etcd.
        start := grid.NewActorStart("worker-for-%v", peer.Peer())
        start.Type = "worker"

        // Start a new actor on the given peer. The message
        // "ActorStart" is special. When sent to the mailbox
        // of a peer, that peer will start an actor based on
        // the definition.
        res, err := a.client.Request(timeout, peer.Name(), start)
        ...
    }

    ...
}

Example Actor, Part 2

An actor will typically need to receive data to work on. This may come from the filesystem or a database, but it can also come from messages sent to a mailbox. Just like actors, a mailbox is unique by name. Etcd is used to register the name and guarantee that only one such mailbox exists.

const size = 10

type WorkerActor struct {
    server *grid.Server
}

func (a *WorkerActor) Act(ctx context.Context) {
    name, err := grid.ContextActorName(ctx)
    ...

    // Listen to a mailbox with the same
    // name as the actor. Mailboxes are
    // created through the server.
    mailbox, err := a.server.NewMailbox(name, size)
    ...
    defer mailbox.Close()

    for {
        select {
        case <-ctx.Done():
            // Stop requested, exit.
            return
        case req := <-mailbox.C():
            switch req.Msg().(type) {
            case PingMsg:
                err := req.Respond(&PongMsg{
                    ...
                })
                ...
            }
        }
    }
}
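
The size argument bounds how many requests a mailbox can hold, and the documented ErrReceiverBusy error is returned when that buffer is full. As a rough illustration only, the sketch below models that behavior with a buffered channel and a non-blocking send, using nothing but the standard library. The names mailbox, newMailbox, deliver, and errReceiverBusy are hypothetical, and this is not grid's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// errReceiverBusy is a hypothetical stand-in for grid.ErrReceiverBusy.
var errReceiverBusy = errors.New("receiver busy")

// mailbox is a hypothetical in-process model of a named, bounded mailbox.
type mailbox struct {
	name string
	c    chan string
}

// newMailbox creates a mailbox whose buffer holds at most size messages.
func newMailbox(name string, size int) *mailbox {
	return &mailbox{name: name, c: make(chan string, size)}
}

// deliver enqueues msg without blocking and reports errReceiverBusy
// when the buffer is already full.
func (m *mailbox) deliver(msg string) error {
	select {
	case m.c <- msg:
		return nil
	default:
		return errReceiverBusy
	}
}

func main() {
	box := newMailbox("worker-0", 2)
	fmt.Println(box.deliver("a")) // <nil>
	fmt.Println(box.deliver("b")) // <nil>
	fmt.Println(box.deliver("c")) // receiver busy
	fmt.Println(<-box.c)          // a
	fmt.Println(box.deliver("c")) // <nil>
}
```

A larger size trades memory for fewer busy errors when the actor falls behind briefly.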

Example Actor, Part 3

Each actor receives a context as a parameter in its Act method. That context is created by the peer that started the actor. The context contains several useful values, which can be extracted using the Context* functions.

func (a *WorkerActor) Act(ctx context.Context) {
    // The ID of the actor in etcd.
    id, err := grid.ContextActorID(ctx)

    // The name of the actor, as given in ActorStart.
    name, err := grid.ContextActorName(ctx)

    // The namespace of the grid this actor is associated with.
    namespace, err := grid.ContextActorNamespace(ctx)
}
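
For readers unfamiliar with carrying values in a context, the following standard-library sketch shows the general pattern that functions like these usually follow: values are stored under an unexported key type and an error is returned when a value is missing. The key names, newActorContext, contextString, and errInvalidContext are all hypothetical; how grid actually stores these values is not shown in this document.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ctxKey is a private key type, so keys cannot collide with other packages.
type ctxKey string

// Hypothetical keys for the three values described above.
const (
	keyActorID   ctxKey = "actor-id"
	keyActorName ctxKey = "actor-name"
	keyNamespace ctxKey = "namespace"
)

// errInvalidContext is a hypothetical stand-in for grid.ErrInvalidContext.
var errInvalidContext = errors.New("invalid context")

// newActorContext plays the role of the peer that starts the actor.
func newActorContext(id, name, namespace string) context.Context {
	ctx := context.Background()
	ctx = context.WithValue(ctx, keyActorID, id)
	ctx = context.WithValue(ctx, keyActorName, name)
	ctx = context.WithValue(ctx, keyNamespace, namespace)
	return ctx
}

// contextString extracts a string value or reports errInvalidContext.
func contextString(ctx context.Context, key ctxKey) (string, error) {
	v, ok := ctx.Value(key).(string)
	if !ok {
		return "", errInvalidContext
	}
	return v, nil
}

func main() {
	ctx := newActorContext("id-1", "worker-0", "mygrid")
	name, err := contextString(ctx, keyActorName)
	fmt.Println(name, err) // worker-0 <nil>

	_, err = contextString(context.Background(), keyActorName)
	fmt.Println(err) // invalid context
}
```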

Example Actor, Part 4

An actor can exit whenever it wants, but it must exit when its context signals done. An actor should always monitor its context's Done channel.

func (a *WorkerActor) Act(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            // Stop requested, clean up and exit.
            return
        case ...
        }
    }
}
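
To make the shutdown contract concrete, here is a small self-contained sketch that runs an actor, cancels its context, and waits for Act to return. The Actor interface is copied locally so the sketch compiles without the grid package; tickActor and runFor are hypothetical helpers, not part of grid.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Actor mirrors the interface shown earlier in this README.
type Actor interface {
	Act(ctx context.Context)
}

// tickActor is a hypothetical actor that counts ticks until stopped.
type tickActor struct {
	ticks   int
	stopped bool
}

func (a *tickActor) Act(ctx context.Context) {
	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// Stop requested, clean up and exit.
			a.stopped = true
			return
		case <-ticker.C:
			a.ticks++
		}
	}
}

// runFor starts the actor, cancels its context after d, and blocks
// until Act has returned.
func runFor(a Actor, d time.Duration) {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		defer close(done)
		a.Act(ctx)
	}()
	time.Sleep(d)
	cancel()
	<-done
}

func main() {
	a := &tickActor{}
	runFor(a, 20*time.Millisecond)
	fmt.Println("stopped:", a.stopped) // stopped: true
}
```

If Act ignored ctx.Done(), runFor would block forever on the done channel, which is why the Done case belongs in every select loop of an actor.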

Example Actor, Part 5

Each actor is registered in etcd. Consequently, each actor's name acts like a mutex. If code requests the actor to start twice, the second request will receive an error indicating that the actor is already registered.

const timeout = 2 * time.Second

type LeaderActor struct {
    client *grid.Client
}

func (a *LeaderActor) Act(ctx context.Context) {
    start := grid.NewActorStart("worker-%d", 0)
    start.Type = "worker"

    // First request to start. Here peer is the name of
    // the peer that should run the actor.
    _, err := a.client.Request(timeout, peer, start)
    ...

    // Second request will fail, if the first succeeded.
    _, err = a.client.Request(timeout, peer, start)
    ...
}
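
The "name as mutex" idea can be illustrated without etcd. The sketch below uses an in-memory map guarded by a sync.Mutex as a stand-in for the etcd-backed registry; registry, register, deregister, and errAlreadyRegistered are hypothetical names chosen to echo the documented ErrAlreadyRegistered error, and a real registry would additionally rely on etcd leases and transactions.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errAlreadyRegistered is a hypothetical stand-in for grid.ErrAlreadyRegistered.
var errAlreadyRegistered = errors.New("already registered")

// registry is an in-memory stand-in for the etcd-backed name registry.
type registry struct {
	mu    sync.Mutex
	names map[string]bool
}

func newRegistry() *registry {
	return &registry{names: make(map[string]bool)}
}

// register claims name, failing if someone else already holds it.
func (r *registry) register(name string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.names[name] {
		return errAlreadyRegistered
	}
	r.names[name] = true
	return nil
}

// deregister releases name, as happens when an actor exits or panics.
func (r *registry) deregister(name string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.names, name)
}

func main() {
	r := newRegistry()
	fmt.Println(r.register("worker-0")) // <nil>
	fmt.Println(r.register("worker-0")) // already registered
	r.deregister("worker-0")
	fmt.Println(r.register("worker-0")) // <nil>
}
```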

Testing

Without running the following setup commands, you'll get a panic because the init function of the package golang.org/x/net/trace will run twice and cause an "already registered" HTTP handler panic.

$ go test
panic: /debug/requests is already registered. You may have two independent copies of golang.org/x/net/trace in your binary, trying to maintain separate state. This may involve a vendored copy of golang.org/x/net/trace.

The workaround is to create a vendor directory:

go mod vendor

Note that vendor/ is included in the .gitignore file.

Kubernetes + Grid

The examples above are meant to give some intuitive sense of what the grid library does. However, what it does not do is:

  1. Package up your configuration and binaries
  2. Start your VMs
  3. Start your processes on those VMs
  4. Autoscale your VMs when resources run low
  5. Reschedule crashed processes
  6. etc...

This is intentional, as other tools already do these things. At the top of our list are Kubernetes and Docker, which between the two perform all of the above.

Grid comes into the picture once you start building out your application logic and need things like coordination and messaging, which grid implements under the hood with Etcd and gRPC, taking care of some boilerplate code for you.

Sending Messages

Sending messages is always done through the client. The client configuration has only one required parameter, the namespace of the grid to connect to. Different namespaces can communicate by simply creating clients for the namespace to which they wish to send messages.

const timeout = 2 * time.Second


func Example() {
    etcd, err := etcdv3.New(...)
    ...

    client, err := grid.NewClient(etcd, grid.ClientCfg{Namespace: "myapp"})
    ...

    res, err := client.Request(timeout, "some-mailbox-name", &MyMsg{
        ...
    })

    ... process the response ...
}

Broadcasting Messages

Broadcasting messages is a way for the client to send messages to a group of actors. There are currently two different strategies for message broadcasting:

  • First-one-wins, where the request context is canceled as soon as one actor responds to the message.
  • Delivery to all actors, which waits for all responses or timeouts.

const (
 timeout = 2 * time.Second
 numRetry = 3
)

func Example() {
 etcd, err := etcdv3.New(...)
 ...

 client, err := grid.NewClient(etcd, grid.ClientCfg{Namespace: "myapp"})
 ...

 grp := grid.NewListGroup("actor-1", "actor-2", "actor-3")

 // Make a request to each actor in the group in parallel, first result
 // back cancels all the other requests.
 first, err := client.Broadcast(timeout, grp.Fastest(), &MyMsg{...})
 ...

 // Deliver to all actors in the group, retry just those that
 // were not successful in the previous try, and fold new
 // results into previous results. The result map is created
 // up front so that Add has somewhere to store results.
 res := grid.BroadcastResult{}
 var tmp grid.BroadcastResult
 retry.X(numRetry, func() bool  {
    tmp, err = client.Broadcast(timeout, grp.ExceptSuccesses(res), &MyMsg{...})
    res.Add(tmp)
    return err != nil
 })
}
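
The retry-and-fold pattern can be hard to picture from the snippet alone, so here is a standard-library sketch of the same idea: keep a result map, resend only to members without a success, and overwrite older entries with newer ones. Everything in it (result, broadcastResult, exceptSuccesses, broadcastWithRetry, errTimeout) is a hypothetical local model, and it sends sequentially for simplicity rather than in parallel.

```go
package main

import (
	"errors"
	"fmt"
)

// errTimeout is a hypothetical per-member failure.
var errTimeout = errors.New("timeout")

// result and broadcastResult are local models of Result and BroadcastResult.
type result struct {
	err error
	val string
}

type broadcastResult map[string]*result

// add folds other into b, overwriting entries that already exist.
func (b broadcastResult) add(other broadcastResult) {
	for k, v := range other {
		b[k] = v
	}
}

// exceptSuccesses returns the members that have no successful result yet.
func exceptSuccesses(members []string, res broadcastResult) []string {
	var out []string
	for _, m := range members {
		if r, ok := res[m]; ok && r.err == nil {
			continue
		}
		out = append(out, m)
	}
	return out
}

// broadcastWithRetry sends to the members still missing a success,
// at most tries times, folding each round into the overall result.
func broadcastWithRetry(members []string, tries int, send func(member string) (string, error)) broadcastResult {
	res := broadcastResult{}
	for i := 0; i < tries; i++ {
		pending := exceptSuccesses(members, res)
		if len(pending) == 0 {
			break
		}
		tmp := broadcastResult{}
		for _, m := range pending {
			val, err := send(m)
			tmp[m] = &result{err: err, val: val}
		}
		res.add(tmp)
	}
	return res
}

func main() {
	calls := map[string]int{}
	// actor-2 fails on its first attempt only.
	send := func(m string) (string, error) {
		calls[m]++
		if m == "actor-2" && calls[m] == 1 {
			return "", errTimeout
		}
		return "ok from " + m, nil
	}
	res := broadcastWithRetry([]string{"actor-1", "actor-2"}, 3, send)
	fmt.Println(res["actor-1"].val, calls["actor-1"]) // ok from actor-1 1
	fmt.Println(res["actor-2"].val, calls["actor-2"]) // ok from actor-2 2
}
```

Members that already succeeded are never contacted again, which keeps retries cheap when only a few actors are slow.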

Registering Messages

Every type of message must be registered before use. Each message must be a Protobuf message. See the Go Protobuf Tutorial for more information, or the example below:

syntax = "proto3";
package msg;

message Person {
    string name = 1;
    string email = 2;
    ...
}

Before using the message it needs to be registered, which can be done inside init functions, the main function, or just before first sending and receiving the message.

func main() {
    grid.Register(msg.Person{})

    ...
}

Documentation

Constants

This section is empty.

Variables

var (
	// ErrInvalidName when name contains invalid character codes.
	ErrInvalidName = errors.New("grid: invalid name")
	// ErrInvalidNamespace when namespace contains invalid
	// character codes.
	ErrInvalidNamespace = errors.New("grid: invalid namespace")
	// ErrInvalidActorType when the actor type contains invalid
	// character codes.
	ErrInvalidActorType = errors.New("grid: invalid actor type")
	// ErrInvalidActorName when the actor name contains invalid
	// character codes.
	ErrInvalidActorName = errors.New("grid: invalid actor name")
	// ErrInvalidMailboxName when a mailbox name contains invalid
	// character codes.
	ErrInvalidMailboxName = errors.New("grid: invalid mailbox name")
)
var (
	// ErrReceiverBusy when the message buffer of a mailbox is
	// full, consider a larger size when creating the mailbox.
	ErrReceiverBusy = errors.New("grid: receiver busy")
	// ErrUnknownMailbox when a message is received by a peer for
	// a mailbox the peer does not serve, likely the mailbox has
	// moved between the time of discovery and the message receive.
	ErrUnknownMailbox = errors.New("grid: unknown mailbox")
	// ErrUnregisteredMailbox when a mailbox name does not exist in
	// the registry, likely it was never created or has died.
	ErrUnregisteredMailbox = errors.New("grid: unregistered mailbox")
	// ErrContextFinished when the context signals done before the
	// request could receive a response from the receiver.
	ErrContextFinished = errors.New("grid: context finished")
	// ErrIncompleteBroadcast when the Broadcast cannot successfully request
	// an actor in the Group
	ErrIncompleteBroadcast = errors.New("grid: incomplete broadcast")
)
var (
	// ErrNilEtcd when the etcd argument is nil.
	ErrNilEtcd = errors.New("grid: nil etcd")
	// ErrNilActor when an actor definition has been registered
	// but returns a nil actor and nil error when creating an actor.
	ErrNilActor = errors.New("grid: nil actor")
	// ErrInvalidContext when a context does not contain
	// the requested values.
	ErrInvalidContext = errors.New("grid: invalid context")
	// ErrDefNotRegistered when an actor type which has never
	// been registered is requested for start.
	ErrDefNotRegistered = errors.New("grid: def not registered")
	// ErrServerNotRunning when an operation which requires the
	// server be running, but is not, is requested.
	ErrServerNotRunning = errors.New("grid: server not running")
	// ErrAlreadyRegistered when a mailbox is created but someone
	// else has already created it.
	ErrAlreadyRegistered = errors.New("grid: already registered")
	// ErrWatchClosedUnexpectedly when a query watch closes before
	// it was requested to close, likely due to some etcd issue.
	ErrWatchClosedUnexpectedly = errors.New("grid: watch closed unexpectedly")
)
var (
	// ErrNilGroup when a method that requires a group is called
	// with a nil group.
	ErrNilGroup = errors.New("grid: nil group")
	// ErrNilClient when a client method is called on a nil client.
	ErrNilClient = errors.New("grid: nil client")
)
var (
	Delivery_Ver_name = map[int32]string{
		0: "V1",
	}
	Delivery_Ver_value = map[string]int32{
		"V1": 0,
	}
)

Enum value maps for Delivery_Ver.

var (
	// ErrAlreadyResponded when respond is called multiple
	// times on a request.
	ErrAlreadyResponded = errors.New("already responded")
)
var Wire_ServiceDesc = grpc.ServiceDesc{
	ServiceName: "grid.wire",
	HandlerType: (*WireServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "Process",
			Handler:    _Wire_Process_Handler,
		},
	},
	Streams:  []grpc.StreamDesc{},
	Metadata: "wire.proto",
}

Wire_ServiceDesc is the grpc.ServiceDesc for Wire service. It's only intended for direct use with grpc.RegisterService, and not to be introspected or modified (even as a copy).

Functions

func ContextActorID

func ContextActorID(c context.Context) (string, error)

ContextActorID returns the ID that is used to register the actor in etcd.

func ContextActorName

func ContextActorName(c context.Context) (string, error)

ContextActorName returns just the actor name, i.e., without the namespace, associated with this context.

func ContextActorNamespace

func ContextActorNamespace(c context.Context) (string, error)

ContextActorNamespace returns the namespace of the grid this actor is associated with.

func Register

func Register(v interface{}) error

Register a message so it may be sent and received. Value v should not be a pointer to a type, but the type itself.

For example:

Register(MyMsg{})    // Correct
Register(&MyMsg{})   // Incorrect
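
One plausible reason for registering the type itself is that a receiver needs to allocate a fresh value of that type before decoding into it. The sketch below illustrates that idea with the reflect package. It is an assumption about the general technique, not a description of what grid does internally: registry, register, newValue, and errPointer are hypothetical, and whether grid's Register returns an error for pointers is not stated in this document.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// errPointer is a hypothetical error for pointer registrations.
var errPointer = errors.New("register the type itself, not a pointer")

// registry maps a type name to its reflect.Type so that a receiver
// can allocate a fresh value to decode into.
type registry map[string]reflect.Type

// register records the type of v, rejecting nil and pointer values.
func (r registry) register(v interface{}) error {
	t := reflect.TypeOf(v)
	if t == nil || t.Kind() == reflect.Ptr {
		return errPointer
	}
	r[t.String()] = t
	return nil
}

// newValue allocates a zero value of the named type and returns a
// pointer to it, ready to be filled in by a decoder.
func (r registry) newValue(name string) (interface{}, bool) {
	t, ok := r[name]
	if !ok {
		return nil, false
	}
	return reflect.New(t).Interface(), true
}

// MyMsg is a hypothetical message type.
type MyMsg struct{ Text string }

func main() {
	r := registry{}
	fmt.Println(r.register(MyMsg{}))  // <nil>
	fmt.Println(r.register(&MyMsg{})) // register the type itself, not a pointer

	v, ok := r.newValue("main.MyMsg")
	fmt.Printf("%T %v\n", v, ok) // *main.MyMsg true
}
```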

func RegisterWireServer

func RegisterWireServer(s grpc.ServiceRegistrar, srv WireServer)

Types

type Ack

type Ack struct {
	// contains filtered or unexported fields
}

func (*Ack) Descriptor deprecated

func (*Ack) Descriptor() ([]byte, []int)

Deprecated: Use Ack.ProtoReflect.Descriptor instead.

func (*Ack) ProtoMessage

func (*Ack) ProtoMessage()

func (*Ack) ProtoReflect added in v3.2.14

func (x *Ack) ProtoReflect() protoreflect.Message

func (*Ack) Reset

func (x *Ack) Reset()

func (*Ack) String

func (x *Ack) String() string

type Actor

type Actor interface {
	Act(c context.Context)
}

Actor that does work.

type ActorStart

type ActorStart struct {
	Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
	Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
	Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
	// contains filtered or unexported fields
}

func NewActorStart

func NewActorStart(name string, v ...interface{}) *ActorStart

NewActorStart message with the name of the actor to start. Its type will be equal to its name unless it's changed:

start := NewActorStart("worker")

Format names can also be used for more complicated names, just remember to override the type:

start := NewActorStart("worker-%d-group-%d", i, j)
start.Type = "worker"

func (*ActorStart) Descriptor deprecated

func (*ActorStart) Descriptor() ([]byte, []int)

Deprecated: Use ActorStart.ProtoReflect.Descriptor instead.

func (*ActorStart) GetData

func (x *ActorStart) GetData() []byte

func (*ActorStart) GetName

func (x *ActorStart) GetName() string

func (*ActorStart) GetType

func (x *ActorStart) GetType() string

func (*ActorStart) ProtoMessage

func (*ActorStart) ProtoMessage()

func (*ActorStart) ProtoReflect added in v3.2.14

func (x *ActorStart) ProtoReflect() protoreflect.Message

func (*ActorStart) Reset

func (x *ActorStart) Reset()

func (*ActorStart) String

func (x *ActorStart) String() string

type BroadcastResult

type BroadcastResult map[string]*Result

BroadcastResult is used to store the results of the Broadcast

func (BroadcastResult) Add

func (b BroadcastResult) Add(other BroadcastResult)

Add combines two BroadcastResults, by overwriting previous results if they exist

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client for grid-actors or non-actors to make requests to grid-actors. The client can be used by multiple go-routines.

func NewClient

func NewClient(etcd *etcdv3.Client, cfg ClientCfg) (*Client, error)

NewClient using the given etcd client and configuration.

func (*Client) Broadcast

func (c *Client) Broadcast(timeout time.Duration, g *Group, msg interface{}) (BroadcastResult, error)

Broadcast a message to all members in a Group

func (*Client) BroadcastC

func (c *Client) BroadcastC(ctx context.Context, g *Group, msg interface{}) (BroadcastResult, error)

BroadcastC (broadcast) a message to all members in a Group. The context can be used to control cancellations or timeouts

func (*Client) Check added in v3.2.14

func (c *Client) Check(ctx context.Context, peer string) (*healthpb.HealthCheckResponse, error)

func (*Client) Close

func (c *Client) Close() error

Close all outbound connections of this client immediately.

func (*Client) Query

func (c *Client) Query(timeout time.Duration, filter EntityType) ([]*QueryEvent, error)

Query in this client's namespace. The filter can be any one of Peers, Actors, or Mailboxes.

func (*Client) QueryC

func (c *Client) QueryC(ctx context.Context, filter EntityType) ([]*QueryEvent, error)

QueryC (query) in this client's namespace. The filter can be any one of Peers, Actors, or Mailboxes. The context can be used to control cancelation or timeouts.

func (*Client) QueryWatch

func (c *Client) QueryWatch(ctx context.Context, filter EntityType) ([]*QueryEvent, <-chan *QueryEvent, error)

QueryWatch monitors the entry and exit of peers, actors, or mailboxes.

Example usage:

client, err := grid.NewClient(...)
...

currentpeers, watch, err := client.QueryWatch(ctx, grid.Peers)
...

for _, peer := range currentpeers {
    // Do work regarding peer.
}

for event := range watch {
    switch event.Type() {
    case grid.WatchError:
        // Error occurred watching peers, deal with error.
    case grid.EntityLost:
        // Existing peer lost, reschedule work on extant peers.
    case grid.EntityFound:
        // New peer found, assign work, get data, reschedule, etc.
    }
}
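
A common use of this pattern is to keep an up-to-date set of live peers. The following sketch shows that bookkeeping with local stand-in types so it runs without a grid or etcd; eventType, event, trackPeers, and errWatch are hypothetical, with the three event constants mirroring WatchError, EntityLost, and EntityFound.

```go
package main

import (
	"errors"
	"fmt"
)

// errWatch is a hypothetical watch failure.
var errWatch = errors.New("watch failed")

// eventType mirrors the three documented event types.
type eventType int

const (
	watchError eventType = iota
	entityLost
	entityFound
)

// event is a local model of a query event.
type event struct {
	typ  eventType
	name string
	err  error
}

// trackPeers applies watch events to the initial peer list and returns
// the live set once the watch channel closes, or the first watch error.
func trackPeers(current []string, watch <-chan event) (map[string]bool, error) {
	live := make(map[string]bool)
	for _, p := range current {
		live[p] = true
	}
	for ev := range watch {
		switch ev.typ {
		case watchError:
			return live, ev.err
		case entityLost:
			delete(live, ev.name)
		case entityFound:
			live[ev.name] = true
		}
	}
	return live, nil
}

func main() {
	watch := make(chan event, 2)
	watch <- event{typ: entityLost, name: "peer-1"}
	watch <- event{typ: entityFound, name: "peer-3"}
	close(watch)

	live, err := trackPeers([]string{"peer-1", "peer-2"}, watch)
	fmt.Println(len(live), live["peer-1"], live["peer-2"], live["peer-3"], err)
	// 2 false true true <nil>
}
```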

func (*Client) Request

func (c *Client) Request(timeout time.Duration, receiver string, msg interface{}) (interface{}, error)

Request a response for the given message.

func (*Client) RequestC

func (c *Client) RequestC(ctx context.Context, receiver string, msg interface{}) (interface{}, error)

RequestC (request) a response for the given message. The context can be used to control cancelation or timeouts.

func (*Client) WaitUntilServing added in v3.2.14

func (c *Client) WaitUntilServing(ctx context.Context, peer string) error

WaitUntilServing blocks until the peer is serving or the context is done. Will retry with exponential backoff.
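
For intuition about "retry with exponential backoff", here is a generic standard-library sketch of the technique: poll a check function, doubling the delay after every failure up to a cap, and give up when the context expires. The function waitUntilServing below is a hypothetical free function, not the grid method, and the delays used by grid are not documented here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errNotServing is a hypothetical "peer is not ready yet" error.
var errNotServing = errors.New("not serving")

// waitUntilServing polls check until it returns nil or timeout elapses.
// The delay between attempts doubles after each failure, up to maxDelay.
func waitUntilServing(timeout, initial, maxDelay time.Duration, check func() error) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	delay := initial
	for {
		if err := check(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
}

func main() {
	attempts := 0
	check := func() error {
		attempts++
		if attempts < 4 {
			return errNotServing
		}
		return nil
	}
	err := waitUntilServing(time.Second, time.Millisecond, 50*time.Millisecond, check)
	fmt.Println(err, attempts) // <nil> 4
}
```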

func (*Client) Watch added in v3.2.14

func (c *Client) Watch(ctx context.Context, peer string) (healthpb.Health_WatchClient, error)

type ClientCfg

type ClientCfg struct {
	// Namespace of grid.
	Namespace string
	// Timeout for communication with etcd, and internal gossip.
	Timeout time.Duration
	// PeersRefreshInterval for polling list of peers in etcd.
	PeersRefreshInterval time.Duration
	// ConnectionsPerPeer sets the number gRPC connections to
	// establish to each remote. Default is max(1, numCPUs/2).
	// More connections allow for more messages per second,
	// but increases the number of file-handles used.
	ConnectionsPerPeer int
	// Logger optionally used for logging, default is to not log.
	Logger Logger
}

ClientCfg where the only required argument is Namespace, other fields with their zero value will receive defaults.
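
As an illustration of how a zero-valued field can pick up its default, the sketch below computes the documented ConnectionsPerPeer default of max(1, numCPUs/2). The helper defaultConnections is hypothetical; only the formula comes from the field comment above.

```go
package main

import (
	"fmt"
	"runtime"
)

// defaultConnections returns configured when it is set, and otherwise
// max(1, numCPUs/2) as described for ConnectionsPerPeer.
func defaultConnections(configured, numCPUs int) int {
	if configured > 0 {
		return configured
	}
	if n := numCPUs / 2; n > 1 {
		return n
	}
	return 1
}

func main() {
	fmt.Println(defaultConnections(0, 8)) // 4
	fmt.Println(defaultConnections(0, 1)) // 1
	fmt.Println(defaultConnections(3, 8)) // 3

	// On the current machine:
	fmt.Println(defaultConnections(0, runtime.NumCPU()))
}
```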

type Delivery

type Delivery struct {
	Ver      Delivery_Ver `protobuf:"varint,1,opt,name=ver,proto3,enum=grid.Delivery_Ver" json:"ver,omitempty"`
	Data     []byte       `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
	TypeName string       `protobuf:"bytes,3,opt,name=typeName,proto3" json:"typeName,omitempty"`
	Receiver string       `protobuf:"bytes,4,opt,name=receiver,proto3" json:"receiver,omitempty"`
	// contains filtered or unexported fields
}

func (*Delivery) Descriptor deprecated

func (*Delivery) Descriptor() ([]byte, []int)

Deprecated: Use Delivery.ProtoReflect.Descriptor instead.

func (*Delivery) GetData

func (x *Delivery) GetData() []byte

func (*Delivery) GetReceiver

func (x *Delivery) GetReceiver() string

func (*Delivery) GetTypeName

func (x *Delivery) GetTypeName() string

func (*Delivery) GetVer

func (x *Delivery) GetVer() Delivery_Ver

func (*Delivery) ProtoMessage

func (*Delivery) ProtoMessage()

func (*Delivery) ProtoReflect added in v3.2.14

func (x *Delivery) ProtoReflect() protoreflect.Message

func (*Delivery) Reset

func (x *Delivery) Reset()

func (*Delivery) String

func (x *Delivery) String() string

type Delivery_Ver

type Delivery_Ver int32
const (
	Delivery_V1 Delivery_Ver = 0
)

func (Delivery_Ver) Descriptor added in v3.2.14

func (Delivery_Ver) Enum added in v3.2.14

func (x Delivery_Ver) Enum() *Delivery_Ver

func (Delivery_Ver) EnumDescriptor deprecated

func (Delivery_Ver) EnumDescriptor() ([]byte, []int)

Deprecated: Use Delivery_Ver.Descriptor instead.

func (Delivery_Ver) Number added in v3.2.14

func (Delivery_Ver) String

func (x Delivery_Ver) String() string

func (Delivery_Ver) Type added in v3.2.14

type EchoMsg

type EchoMsg struct {
	Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
	// contains filtered or unexported fields
}

func (*EchoMsg) Descriptor deprecated

func (*EchoMsg) Descriptor() ([]byte, []int)

Deprecated: Use EchoMsg.ProtoReflect.Descriptor instead.

func (*EchoMsg) GetMsg

func (x *EchoMsg) GetMsg() string

func (*EchoMsg) ProtoMessage

func (*EchoMsg) ProtoMessage()

func (*EchoMsg) ProtoReflect added in v3.2.14

func (x *EchoMsg) ProtoReflect() protoreflect.Message

func (*EchoMsg) Reset

func (x *EchoMsg) Reset()

func (*EchoMsg) String

func (x *EchoMsg) String() string

type EntityType

type EntityType string
const (
	// Peers filter for query.
	Peers EntityType = "peer"
	// Actors filter for query.
	Actors EntityType = "actor"
	// Mailboxes filter for query.
	Mailboxes EntityType = "mailbox"
)

type EventType

type EventType int

EventType categorizing the event.

const (
	WatchError  EventType = 0
	EntityLost  EventType = 1
	EntityFound EventType = 2
)

type GRPCMailbox added in v3.2.14

type GRPCMailbox struct {
	// contains filtered or unexported fields
}

GRPCMailbox for receiving messages.

func (*GRPCMailbox) C added in v3.2.14

func (box *GRPCMailbox) C() <-chan Request

func (*GRPCMailbox) Close added in v3.2.14

func (box *GRPCMailbox) Close() error

Close the mailbox.

func (*GRPCMailbox) Name added in v3.2.14

func (box *GRPCMailbox) Name() string

Name of mailbox, without namespace.

func (*GRPCMailbox) String added in v3.2.14

func (box *GRPCMailbox) String() string

String of mailbox name, with full namespace.

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group defines a group of actors. This struct is primarily used for broadcasting messages to all actors in a Group.

func NewListGroup

func NewListGroup(members ...string) *Group

NewListGroup creates a new Group

func (*Group) ExceptSuccesses

func (g *Group) ExceptSuccesses(res BroadcastResult) *Group

ExceptSuccesses filters out the successful members of the Group

func (*Group) Fastest

func (g *Group) Fastest() *Group

Fastest ensures that the Broadcast returns the BroadcastResult for the fastest member in the Group
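
The first-one-wins strategy can be sketched with goroutines and a shared, cancelable context: every member is contacted in parallel, the first successful reply is returned, and cancellation tells the rest to stop. The code below is a standard-library illustration of that technique only. The names reply, callFunc, demoCall, and fastest are hypothetical and do not reflect how grid implements Fastest.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// reply is a hypothetical response from one member.
type reply struct {
	member string
	val    string
}

// callFunc is a hypothetical per-member request function.
type callFunc func(ctx context.Context, member string) (string, error)

// demoCall answers immediately for quick; every other member waits
// until the request context is canceled.
func demoCall(quick string) callFunc {
	return func(ctx context.Context, member string) (string, error) {
		if member == quick {
			return "pong from " + member, nil
		}
		<-ctx.Done()
		return "", ctx.Err()
	}
}

// fastest contacts every member in parallel and returns the first
// successful reply. Returning cancels the context for the others.
func fastest(timeout time.Duration, members []string, call callFunc) (reply, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// Buffered so that late repliers never block on send.
	replies := make(chan reply, len(members))
	for _, m := range members {
		go func(m string) {
			if val, err := call(ctx, m); err == nil {
				replies <- reply{member: m, val: val}
			}
		}(m)
	}

	select {
	case r := <-replies:
		return r, nil
	case <-ctx.Done():
		return reply{}, ctx.Err()
	}
}

func main() {
	members := []string{"actor-1", "actor-2", "actor-3"}
	r, err := fastest(time.Second, members, demoCall("actor-2"))
	fmt.Println(r.member, r.val, err) // actor-2 pong from actor-2 <nil>
}
```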

func (*Group) Members

func (g *Group) Members() []string

Members returns the members (actors) of the Group

type Logger

type Logger interface {
	Printf(string, ...interface{})
}

Logger hides the logging function Printf behind a simple interface so libraries such as logrus can be used.

type Mailbox

type Mailbox interface {
	C() <-chan Request
	Close() error
}

type MakeActor

type MakeActor func(data []byte) (Actor, error)

MakeActor using the given data to parameterize the making of the actor; the data is optional.

type QueryEvent

type QueryEvent struct {
	// contains filtered or unexported fields
}

QueryEvent indicating that an entity has been discovered, lost, or some error has occurred with the watch.

func NewQueryEvent

func NewQueryEvent(name, peer string, err error, entity EntityType, eventType EventType, annotations []string) *QueryEvent

NewQueryEvent creates a QueryEvent from the given name, peer, error, entity type, event type, and annotations.

func (*QueryEvent) Annotations

func (e *QueryEvent) Annotations() []string

Annotations of named entity. Currently only used by Peers as an option to the grid server.

func (*QueryEvent) Err

func (e *QueryEvent) Err() error

Err caught watching query events. The error is not associated with any particular entity, it's an error with the watch itself or a result of the watch.

func (*QueryEvent) Name

func (e *QueryEvent) Name() string

Name of entity that caused the event. For example, if mailboxes were queried the name is the mailbox name.

func (*QueryEvent) Peer

func (e *QueryEvent) Peer() string

Peer of named entity. For example, if mailboxes were queried then it's the peer the mailbox is running on. If the query was for peers, then methods Name and Peer return the same string.

func (*QueryEvent) String

func (e *QueryEvent) String() string

String representation of query event.

func (*QueryEvent) Type

func (e *QueryEvent) Type() EventType

Type returns the type of the event.

type Request

type Request interface {
	Context() context.Context
	Msg() interface{}
	Ack() error
	Respond(msg interface{}) error
}

Request which must receive an ack or response.

type Result

type Result struct {
	Err error
	Val interface{}
}

Result stores the result of a Request

type Server

type Server struct {
	UnimplementedWireServer
	// contains filtered or unexported fields
}

Server of a grid.

func NewServer

func NewServer(etcd *etcdv3.Client, cfg ServerCfg) (*Server, error)

NewServer for the grid. The namespace must contain only characters in the set: [a-zA-Z0-9-_] and no other.
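
A namespace can be checked against that character set before it is passed to NewServer. The sketch below does so with a regular expression; checkNamespace and errInvalidNamespace are hypothetical helpers, and treating an empty namespace as invalid is an assumption based on Namespace being a required field.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// errInvalidNamespace is a hypothetical stand-in for grid.ErrInvalidNamespace.
var errInvalidNamespace = errors.New("invalid namespace")

// validNamespace matches one or more characters from [a-zA-Z0-9-_].
var validNamespace = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`)

// checkNamespace reports errInvalidNamespace for empty namespaces and
// for namespaces containing any character outside the allowed set.
func checkNamespace(ns string) error {
	if !validNamespace.MatchString(ns) {
		return errInvalidNamespace
	}
	return nil
}

func main() {
	fmt.Println(checkNamespace("mygrid"))    // <nil>
	fmt.Println(checkNamespace("my-grid_2")) // <nil>
	fmt.Println(checkNamespace("my grid"))   // invalid namespace
	fmt.Println(checkNamespace("a/b"))       // invalid namespace
}
```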

func (*Server) Context

func (s *Server) Context() context.Context

Context of the server, when it reports done the server is trying to shutdown. Actors automatically get this context, non-actors using mailboxes bound to this server should monitor this context to know when the server is trying to exit.

func (*Server) Name added in v3.2.14

func (s *Server) Name() string

Name of the server. Only valid after Serve() is called and the registry has started (server's name is the registry's name). Use WaitUntilStarted to block until the name is available.

func (*Server) NewMailbox added in v3.2.14

func (s *Server) NewMailbox(name string, size int) (Mailbox, error)

NewMailbox for requests addressed to name. Size will be the mailbox's channel size.

Example Usage:

mailbox, err := server.NewMailbox("incoming", 10)
...
defer mailbox.Close()

for {
    select {
    case req := <-mailbox.C():
        // Do something with request, and then respond
        // or ack. A response or ack is required.
        switch req.Msg().(type) {
        case HiMsg:
            req.Respond(&HelloMsg{})
        }
    }
}

If the mailbox has already been created, in the calling process or any other process, an error is returned, since only one mailbox can claim a particular name.

Using a mailbox requires that the process creating the mailbox also started a grid Server.

func (*Server) Process

func (s *Server) Process(c context.Context, d *Delivery) (*Delivery, error)

Process a request and return a response. Implements the interface for gRPC definition of the wire service. Consider this a private method.

func (*Server) RegisterDef

func (s *Server) RegisterDef(actorType string, f MakeActor)

RegisterDef of an actor. When an ActorStart message is sent to a peer it will use the registered definitions to make and run the actor. If an actor with actorType "leader" is registered it will be started automatically when the Serve method is called.

func (*Server) Serve

func (s *Server) Serve(lis net.Listener) error

Serve the grid on the listener. The listener address type must be net.TCPAddr, otherwise an error will be returned.

func (*Server) Stop

func (s *Server) Stop()

Stop the server, blocking until all mailboxes registered with this server have called their close method.

func (*Server) WaitUntilStarted added in v3.2.14

func (s *Server) WaitUntilStarted(ctx context.Context) error

WaitUntilStarted waits until the registry has started or until the context is done. This allows users to safely access some runtime-specific parameters (e.g., Name()). There is no guarantee that the gRPC client has started: use Client.WaitUntilServing() for that.

type ServerCfg

type ServerCfg struct {
	// Namespace of grid.
	Namespace string
	// DisallowLeadership to prevent leader from running on a node.
	DisallowLeadership bool
	// Timeout for communication with etcd, and internal gossip.
	Timeout time.Duration
	// LeaseDuration for data in etcd.
	LeaseDuration time.Duration
	// Logger optionally used for logging, default is to not log.
	Logger Logger
	// Annotations optionally used for annotating a grid server with metadata.
	Annotations []string
}

ServerCfg where the only required argument is Namespace, other fields with their zero value will receive defaults.

type UnimplementedWireServer

type UnimplementedWireServer struct {
}

UnimplementedWireServer must be embedded to have forward compatible implementations.

func (UnimplementedWireServer) Process

type UnsafeWireServer added in v3.2.14

type UnsafeWireServer interface {
	// contains filtered or unexported methods
}

UnsafeWireServer may be embedded to opt out of forward compatibility for this service. Use of this interface is not recommended, as added methods to WireServer will result in compilation errors.

type WireClient

type WireClient interface {
	Process(ctx context.Context, in *Delivery, opts ...grpc.CallOption) (*Delivery, error)
}

WireClient is the client API for Wire service.

For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.

func NewWireClient

func NewWireClient(cc grpc.ClientConnInterface) WireClient

type WireServer

type WireServer interface {
	Process(context.Context, *Delivery) (*Delivery, error)
	// contains filtered or unexported methods
}

WireServer is the server API for Wire service. All implementations must embed UnimplementedWireServer for forward compatibility.

Directories

Path Synopsis
examples
