cluster

package
v1.0.0

This package is not in the latest version of its module.

Published: Jan 10, 2021 License: Apache-2.0 Imports: 20 Imported by: 0

README

GAM Cluster - Virtual Actors (Alpha)

Massively distributed actors for Go

GAM supports the classic actor model also found in Erlang and Akka.
Our cluster support, however, uses a different approach: the Virtual Actor Model.

This is a model where each actor appears to always exist. There is no lifecycle as in the classic actor model. You get a reference to the actor by asking for its ID.

For example:

hello := shared.GetHelloGrain("abc")
res := hello.SayHello(&shared.HelloRequest{Name: "GAM"})

This will ask the cluster where the 'abc' actor is located. If it does not yet exist, it will be created for you.
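
The "always exists" behaviour can be pictured as a lookup that creates the actor on first use. The following is a minimal, single-process sketch of that idea only; the `registry` and `grain` types are hypothetical and are not part of GAM, which resolves the location across cluster members.

```go
package main

import (
	"fmt"
	"sync"
)

// grain is a hypothetical stand-in for a virtual actor; it only records its ID.
type grain struct {
	id string
}

// registry activates grains lazily: callers never create or destroy them.
type registry struct {
	mu          sync.Mutex
	grains      map[string]*grain
	activations int // how many grains were created on demand
}

func newRegistry() *registry {
	return &registry{grains: make(map[string]*grain)}
}

// get returns the grain for id, creating it on first use.
func (r *registry) get(id string) *grain {
	r.mu.Lock()
	defer r.mu.Unlock()
	g, ok := r.grains[id]
	if !ok {
		g = &grain{id: id}
		r.grains[id] = g
		r.activations++
	}
	return g
}

func main() {
	r := newRegistry()
	a := r.get("abc") // first lookup activates the grain
	b := r.get("abc") // second lookup reuses it
	fmt.Println(a == b, r.activations) // prints "true 1"
}
```

The caller only ever asks for an ID; whether the grain already existed is invisible to it.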

See Microsoft Orleans for more info about the Virtual Actor Model: http://dotnet.github.io/orleans/

How to

Protobuf IDL Definition

Start by defining your messages and grain contracts. You do this by using Protobuf IDL files.

Here is the definition from the /examples/cluster/shared example:

syntax = "proto3";
package shared;

message HelloRequest {
  string name = 1;
}

message HelloResponse {
  string message = 1;
}

message AddRequest {
  double a = 1;
  double b = 2;
}

message AddResponse {
  double result = 1;
}

service Hello {
  rpc SayHello (HelloRequest) returns (HelloResponse) {} 
  rpc Add(AddRequest) returns (AddResponse) {}
}

Once you have this, you can generate your code using the protobuf protoc compiler.

Windows

#generate messages
protoc -I=. -I=%GOPATH%\src --gogoslick_out=. protos.proto
#generate grains 
protoc -I=. -I=%GOPATH%\src --gorleans_out=. protos.proto 

Implementing

Once the messages and contracts have been generated, you can start implementing your own business logic. This is essentially a type which is powered by a GAM actor behind the scenes.

package shared

//a Go struct implementing the Hello interface
type hello struct {
}

func (*hello) SayHello(r *HelloRequest) *HelloResponse {
	return &HelloResponse{Message: "hello " + r.Name}
}

func (*hello) Add(r *AddRequest) *AddResponse {
	return &AddResponse{Result: r.A + r.B}
}

//Register what implementation GAM should use when 
//creating actors for a certain grain type.
func init() {
	//apply DI and setup logic
	HelloFactory(func() Hello { return &hello{} })
}
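
The generated code is not shown in this README. To try the implementation above in isolation, the sketch below hand-writes minimal stand-ins for the message types, the `Hello` contract, and `HelloFactory`. These stand-ins are assumptions inferred from how the sample uses them; the real generated code also handles serialization and the grain client, and the `helloFactory` variable is hypothetical.

```go
package main

import "fmt"

// Stand-ins for the generated message types (the real ones come from protoc).
type HelloRequest struct{ Name string }
type HelloResponse struct{ Message string }
type AddRequest struct{ A, B float64 }
type AddResponse struct{ Result float64 }

// Hello mirrors the grain contract declared by the `service Hello` block.
type Hello interface {
	SayHello(r *HelloRequest) *HelloResponse
	Add(r *AddRequest) *AddResponse
}

// helloFactory holds the registered constructor (hypothetical storage).
var helloFactory func() Hello

// HelloFactory registers the constructor used when a Hello grain is needed.
func HelloFactory(factory func() Hello) { helloFactory = factory }

// hello is the business logic, identical to the sample above.
type hello struct{}

func (*hello) SayHello(r *HelloRequest) *HelloResponse {
	return &HelloResponse{Message: "hello " + r.Name}
}

func (*hello) Add(r *AddRequest) *AddResponse {
	return &AddResponse{Result: r.A + r.B}
}

func main() {
	HelloFactory(func() Hello { return &hello{} })

	h := helloFactory() // what an activation would do in this sketch
	fmt.Println(h.SayHello(&HelloRequest{Name: "GAM"}).Message)
	fmt.Println(h.Add(&AddRequest{A: 1, B: 2}).Result)
}
```

Registering a factory rather than an instance lets each activation get its own state.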

Seed nodes

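func main() {
	// NOTE: this call does not match the Start(clusterName, address, provider)
	// signature documented below; adjust the arguments to the version you use.
	cluster.Start("127.0.0.1:7711")
	// keep the seed node running until a line is read from stdin
	console.ReadLine()
}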

Member nodes

func main() {
	cluster.Start("127.0.0.1:0", "127.0.0.1:7711")

	// get a reference to the virtual actor called "abc" of type Hello
	hello := shared.GetHelloGrain("abc")
	res := hello.SayHello(&shared.HelloRequest{Name: "GAM"})
	log.Printf("Message from grain %v", res.Message)
}

FAQ

Can I use GAM Cluster in production?

GAM Cluster support is in alpha and is not production ready.

What about performance?

GAM Remoting can pass 1 million+ messages per second on a standard dev machine, and GAM Cluster uses the same infrastructure. GAM Cluster, however, exposes an RPC API, so it is request/response in nature. If you wait for a response to each call, throughput will of course be much lower. Use asynchronous fire-and-forget for performance and request/response for simplicity.
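
The difference between the two styles can be sketched with plain goroutines and channels. This is an illustration of the trade-off only, not GAM code; `message`, `worker`, `tell`, `request`, and `run` are hypothetical names, and no throughput figures are implied.

```go
package main

import "fmt"

// message carries a payload and, for request/response, a reply channel.
type message struct {
	n     int
	reply chan int // nil means fire-and-forget
}

// worker doubles each payload, replies when asked, and reports how many
// messages it processed once the inbox is closed and drained.
func worker(inbox <-chan message, done chan<- int) {
	processed := 0
	for m := range inbox {
		processed++
		if m.reply != nil {
			m.reply <- m.n * 2
		}
	}
	done <- processed
}

// tell enqueues the message and returns without waiting for a result.
func tell(inbox chan<- message, n int) {
	inbox <- message{n: n}
}

// request blocks until the worker answers: one round trip per call.
func request(inbox chan<- message, n int) int {
	reply := make(chan int, 1)
	inbox <- message{n: n, reply: reply}
	return <-reply
}

// run sends `tells` fire-and-forget messages followed by one request, then
// shuts the worker down. It returns the reply and the processed count.
func run(tells int) (reply, processed int) {
	inbox := make(chan message, 16)
	done := make(chan int)
	go worker(inbox, done)
	for i := 0; i < tells; i++ {
		tell(inbox, i) // the sender keeps going; messages are pipelined
	}
	reply = request(inbox, 21) // the sender waits here for the answer
	close(inbox)
	processed = <-done
	return reply, processed
}

func main() {
	fmt.Println(run(100)) // prints "42 101"
}
```

With `tell`, the sender is limited only by how fast it can enqueue; with `request`, every call pays a full round trip before the next one starts.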

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrInvalidLengthProtos = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowProtos   = fmt.Errorf("proto: integer overflow")
)

Functions

func Get

func Get(name string, kind string) (*actor.PID, remote.ResponseStatusCode)

Get returns the PID of the virtual actor with the given name and kind, along with a response status code.

func GetMemberPIDs

func GetMemberPIDs(kind string) actor.PIDSet

GetMemberPIDs returns the PIDs of the members for the specified kind.

func RemoveCache

func RemoveCache(name string)

RemoveCache removes the entry for name from the PID cache.

func Shutdown

func Shutdown(graceful bool)

func Start

func Start(clusterName, address string, provider ClusterProvider)

func StartWithConfig

func StartWithConfig(config *ClusterConfig)

Types

type ClusterConfig

type ClusterConfig struct {
	Name                        string
	Address                     string
	ClusterProvider             ClusterProvider
	RemotingOption              []remote.RemotingOption
	TimeoutTime                 time.Duration
	InitialMemberStatusValue    MemberStatusValue
	MemberStatusValueSerializer MemberStatusValueSerializer
	MemberStrategyBuilder       func(kind string) MemberStrategy
}

func NewClusterConfig

func NewClusterConfig(name string, address string, clusterProvider ClusterProvider) *ClusterConfig

func (*ClusterConfig) WithInitialMemberStatusValue

func (c *ClusterConfig) WithInitialMemberStatusValue(val MemberStatusValue) *ClusterConfig

func (*ClusterConfig) WithMemberStatusValueSerializer

func (c *ClusterConfig) WithMemberStatusValueSerializer(serializer MemberStatusValueSerializer) *ClusterConfig

func (*ClusterConfig) WithMemberStrategyBuilder

func (c *ClusterConfig) WithMemberStrategyBuilder(builder func(kind string) MemberStrategy) *ClusterConfig

func (*ClusterConfig) WithRemotingOption

func (c *ClusterConfig) WithRemotingOption(remotingOption []remote.RemotingOption) *ClusterConfig

func (*ClusterConfig) WithTimeout

func (c *ClusterConfig) WithTimeout(t time.Duration) *ClusterConfig

type ClusterProvider

type ClusterProvider interface {
	RegisterMember(clusterName string, address string, port int, knownKinds []string,
		statusValue MemberStatusValue, serializer MemberStatusValueSerializer) error
	MonitorMemberStatusChanges()
	UpdateMemberStatusValue(statusValue MemberStatusValue) error
	DeregisterMember() error
	Shutdown() error
}

type ClusterTopologyEvent

type ClusterTopologyEvent []*MemberStatus

type Grain

type Grain struct {
	// contains filtered or unexported fields
}

func (*Grain) ID

func (g *Grain) ID() string

func (*Grain) Init

func (g *Grain) Init(id string)

type GrainCallOptions

type GrainCallOptions struct {
	RetryCount  int
	Timeout     time.Duration
	RetryAction func(n int)
}

func DefaultGrainCallOptions

func DefaultGrainCallOptions() *GrainCallOptions

func NewGrainCallOptions

func NewGrainCallOptions() *GrainCallOptions

func (*GrainCallOptions) WithRetry

func (config *GrainCallOptions) WithRetry(count int) *GrainCallOptions

func (*GrainCallOptions) WithRetryAction

func (config *GrainCallOptions) WithRetryAction(act func(i int)) *GrainCallOptions

func (*GrainCallOptions) WithTimeout

func (config *GrainCallOptions) WithTimeout(timeout time.Duration) *GrainCallOptions
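
The fields of GrainCallOptions suggest a bounded retry loop with a per-attempt timeout. How the generated grain clients actually consume these options is not documented here, so the sketch below is an assumption: `callOptions` mirrors the exported fields, while `callWithRetry`, `flakyDemo`, and the error values are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// callOptions mirrors the exported fields of GrainCallOptions.
type callOptions struct {
	RetryCount  int
	Timeout     time.Duration
	RetryAction func(n int)
}

var (
	errTimeout    = errors.New("call timed out")
	errNoAttempts = errors.New("retry count is zero")
)

// callWithRetry makes at most RetryCount attempts (assumed semantics). Each
// attempt is bounded by Timeout, and RetryAction runs after a failed attempt.
func callWithRetry(opts callOptions, call func() (string, error)) (string, error) {
	lastErr := errNoAttempts
	for attempt := 0; attempt < opts.RetryCount; attempt++ {
		type result struct {
			val string
			err error
		}
		ch := make(chan result, 1) // buffered so a late reply never blocks
		go func() {
			v, err := call()
			ch <- result{v, err}
		}()
		select {
		case r := <-ch:
			if r.err == nil {
				return r.val, nil
			}
			lastErr = r.err
		case <-time.After(opts.Timeout):
			lastErr = errTimeout
		}
		if opts.RetryAction != nil {
			opts.RetryAction(attempt) // e.g. log or back off before retrying
		}
	}
	return "", lastErr
}

// flakyDemo calls a function that fails twice before succeeding and reports
// the value, the number of RetryAction invocations, and the final error.
func flakyDemo(retryCount int) (string, int, error) {
	calls, retries := 0, 0
	opts := callOptions{
		RetryCount:  retryCount,
		Timeout:     time.Second,
		RetryAction: func(n int) { retries++ },
	}
	val, err := callWithRetry(opts, func() (string, error) {
		calls++
		if calls < 3 {
			return "", errors.New("transient failure")
		}
		return "ok", nil
	})
	return val, retries, err
}

func main() {
	fmt.Println(flakyDemo(5)) // prints "ok 2 <nil>"
}
```

A RetryAction is a natural place for a backoff sleep, which keeps the retry policy out of the call site.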

type GrainContext

type GrainContext interface {
	// Watch registers the actor as a monitor for the specified PID
	Watch(pid *actor.PID)

	// Unwatch unregisters the actor as a monitor for the specified PID
	Unwatch(pid *actor.PID)

	// Message returns the current message to be processed
	Message() interface{}

	// Sender returns the PID of actor that sent currently processed message
	Sender() *actor.PID

	// Tell sends a message to the given PID
	Tell(pid *actor.PID, message interface{})

	// Request sends a message to the given PID and also provides a Sender PID
	Request(pid *actor.PID, message interface{})

	// RequestFuture sends a message to a given PID and returns a Future
	RequestFuture(pid *actor.PID, message interface{}, timeout time.Duration) *actor.Future

	// Self returns the PID for the current actor
	Self() *actor.PID

	// Spawn starts a new child actor based on props and named with a unique id
	Spawn(props *actor.Props) *actor.PID

	// SpawnPrefix starts a new child actor based on props and named using a prefix followed by a unique id
	SpawnPrefix(props *actor.Props, prefix string) *actor.PID

	// SpawnNamed starts a new child actor based on props and named using the specified name
	//
	// ErrNameExists will be returned if id already exists
	SpawnNamed(props *actor.Props, id string) (*actor.PID, error)

	// Children returns a slice of the actor's children
	Children() []*actor.PID
}

type GrainErrorResponse

type GrainErrorResponse struct {
	Err                  string   `protobuf:"bytes,1,opt,name=err,proto3" json:"err,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*GrainErrorResponse) Descriptor

func (*GrainErrorResponse) Descriptor() ([]byte, []int)

func (*GrainErrorResponse) Equal

func (this *GrainErrorResponse) Equal(that interface{}) bool

func (*GrainErrorResponse) GetErr

func (m *GrainErrorResponse) GetErr() string

func (*GrainErrorResponse) Marshal

func (m *GrainErrorResponse) Marshal() (dAtA []byte, err error)

func (*GrainErrorResponse) MarshalTo

func (m *GrainErrorResponse) MarshalTo(dAtA []byte) (int, error)

func (*GrainErrorResponse) ProtoMessage

func (*GrainErrorResponse) ProtoMessage()

func (*GrainErrorResponse) Reset

func (m *GrainErrorResponse) Reset()

func (*GrainErrorResponse) Size

func (m *GrainErrorResponse) Size() (n int)

func (*GrainErrorResponse) String

func (this *GrainErrorResponse) String() string

func (*GrainErrorResponse) Unmarshal

func (m *GrainErrorResponse) Unmarshal(dAtA []byte) error

func (*GrainErrorResponse) XXX_DiscardUnknown

func (m *GrainErrorResponse) XXX_DiscardUnknown()

func (*GrainErrorResponse) XXX_Marshal

func (m *GrainErrorResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*GrainErrorResponse) XXX_Merge

func (dst *GrainErrorResponse) XXX_Merge(src proto.Message)

func (*GrainErrorResponse) XXX_Size

func (m *GrainErrorResponse) XXX_Size() int

func (*GrainErrorResponse) XXX_Unmarshal

func (m *GrainErrorResponse) XXX_Unmarshal(b []byte) error

type GrainRequest

type GrainRequest struct {
	Method               string   `protobuf:"bytes,1,opt,name=method,proto3" json:"method,omitempty"`
	MessageData          []byte   `protobuf:"bytes,2,opt,name=message_data,json=messageData,proto3" json:"message_data,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*GrainRequest) Descriptor

func (*GrainRequest) Descriptor() ([]byte, []int)

func (*GrainRequest) Equal

func (this *GrainRequest) Equal(that interface{}) bool

func (*GrainRequest) GetMessageData

func (m *GrainRequest) GetMessageData() []byte

func (*GrainRequest) GetMethod

func (m *GrainRequest) GetMethod() string

func (*GrainRequest) Marshal

func (m *GrainRequest) Marshal() (dAtA []byte, err error)

func (*GrainRequest) MarshalTo

func (m *GrainRequest) MarshalTo(dAtA []byte) (int, error)

func (*GrainRequest) ProtoMessage

func (*GrainRequest) ProtoMessage()

func (*GrainRequest) Reset

func (m *GrainRequest) Reset()

func (*GrainRequest) Size

func (m *GrainRequest) Size() (n int)

func (*GrainRequest) String

func (this *GrainRequest) String() string

func (*GrainRequest) Unmarshal

func (m *GrainRequest) Unmarshal(dAtA []byte) error

func (*GrainRequest) XXX_DiscardUnknown

func (m *GrainRequest) XXX_DiscardUnknown()

func (*GrainRequest) XXX_Marshal

func (m *GrainRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*GrainRequest) XXX_Merge

func (dst *GrainRequest) XXX_Merge(src proto.Message)

func (*GrainRequest) XXX_Size

func (m *GrainRequest) XXX_Size() int

func (*GrainRequest) XXX_Unmarshal

func (m *GrainRequest) XXX_Unmarshal(b []byte) error

type GrainResponse

type GrainResponse struct {
	MessageData          []byte   `protobuf:"bytes,1,opt,name=message_data,json=messageData,proto3" json:"message_data,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}
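
GrainRequest, GrainResponse, and GrainErrorResponse together look like an RPC envelope: a method name plus an opaque payload going in, and either a payload or an error string coming out. The sketch below illustrates that shape under stated assumptions: the lowercase types mirror only the exported fields, `encoding/json` stands in for protobuf encoding, and `dispatch` and `callAdd` are hypothetical helpers, not the library's dispatch code.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Stand-ins mirroring the exported fields of the documented types.
type grainRequest struct {
	Method      string
	MessageData []byte
}
type grainResponse struct{ MessageData []byte }
type grainErrorResponse struct{ Err string }

type addRequest struct{ A, B float64 }
type addResponse struct{ Result float64 }

// dispatch decodes the payload for the named method and returns either a
// *grainResponse or a *grainErrorResponse.
func dispatch(req grainRequest) interface{} {
	switch req.Method {
	case "Add":
		var in addRequest
		if err := json.Unmarshal(req.MessageData, &in); err != nil {
			return &grainErrorResponse{Err: err.Error()}
		}
		out, err := json.Marshal(addResponse{Result: in.A + in.B})
		if err != nil {
			return &grainErrorResponse{Err: err.Error()}
		}
		return &grainResponse{MessageData: out}
	default:
		return &grainErrorResponse{Err: "unknown method " + req.Method}
	}
}

// callAdd shows the client side: encode, wrap in an envelope, dispatch, decode.
func callAdd(a, b float64) (float64, error) {
	payload, err := json.Marshal(addRequest{A: a, B: b})
	if err != nil {
		return 0, err
	}
	switch res := dispatch(grainRequest{Method: "Add", MessageData: payload}).(type) {
	case *grainResponse:
		var out addResponse
		if err := json.Unmarshal(res.MessageData, &out); err != nil {
			return 0, err
		}
		return out.Result, nil
	case *grainErrorResponse:
		return 0, errors.New(res.Err)
	default:
		return 0, errors.New("unexpected response type")
	}
}

func main() {
	fmt.Println(callAdd(1, 2))
}
```

Keeping the payload as raw bytes lets one envelope type carry every method of every grain contract.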

func (*GrainResponse) Descriptor

func (*GrainResponse) Descriptor() ([]byte, []int)

func (*GrainResponse) Equal

func (this *GrainResponse) Equal(that interface{}) bool

func (*GrainResponse) GetMessageData

func (m *GrainResponse) GetMessageData() []byte

func (*GrainResponse) Marshal

func (m *GrainResponse) Marshal() (dAtA []byte, err error)

func (*GrainResponse) MarshalTo

func (m *GrainResponse) MarshalTo(dAtA []byte) (int, error)

func (*GrainResponse) ProtoMessage

func (*GrainResponse) ProtoMessage()

func (*GrainResponse) Reset

func (m *GrainResponse) Reset()

func (*GrainResponse) Size

func (m *GrainResponse) Size() (n int)

func (*GrainResponse) String

func (this *GrainResponse) String() string

func (*GrainResponse) Unmarshal

func (m *GrainResponse) Unmarshal(dAtA []byte) error

func (*GrainResponse) XXX_DiscardUnknown

func (m *GrainResponse) XXX_DiscardUnknown()

func (*GrainResponse) XXX_Marshal

func (m *GrainResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*GrainResponse) XXX_Merge

func (dst *GrainResponse) XXX_Merge(src proto.Message)

func (*GrainResponse) XXX_Size

func (m *GrainResponse) XXX_Size() int

func (*GrainResponse) XXX_Unmarshal

func (m *GrainResponse) XXX_Unmarshal(b []byte) error

type MemberAvailableEvent

type MemberAvailableEvent struct {
	MemberMeta
}

func (*MemberAvailableEvent) MemberStatusEvent

func (*MemberAvailableEvent) MemberStatusEvent()

type MemberJoinedEvent

type MemberJoinedEvent struct {
	MemberMeta
}

func (*MemberJoinedEvent) MemberStatusEvent

func (*MemberJoinedEvent) MemberStatusEvent()

type MemberLeftEvent

type MemberLeftEvent struct {
	MemberMeta
}

func (*MemberLeftEvent) MemberStatusEvent

func (*MemberLeftEvent) MemberStatusEvent()

type MemberMeta

type MemberMeta struct {
	Host  string
	Port  int
	Kinds []string
}

func (*MemberMeta) GetKinds

func (e *MemberMeta) GetKinds() []string

func (*MemberMeta) Name

func (e *MemberMeta) Name() string

type MemberRejoinedEvent

type MemberRejoinedEvent struct {
	MemberMeta
}

func (*MemberRejoinedEvent) MemberStatusEvent

func (*MemberRejoinedEvent) MemberStatusEvent()

type MemberStatus

type MemberStatus struct {
	MemberID    string
	Host        string
	Port        int
	Kinds       []string
	Alive       bool
	StatusValue MemberStatusValue
}

func (*MemberStatus) Address

func (m *MemberStatus) Address() string

type MemberStatusEvent

type MemberStatusEvent interface {
	MemberStatusEvent()
	GetKinds() []string
}
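
Member events such as joined and left can be understood as the difference between two topology snapshots. How this package derives its events is not documented here; the sketch below only illustrates the idea. `memberStatus` keeps two of the MemberStatus fields, the `host:port` address format is an assumption, and `diff` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sort"
)

// memberStatus keeps only the MemberStatus fields this sketch needs.
type memberStatus struct {
	Host string
	Port int
}

// address formats the member as host:port (assumed format).
func (m memberStatus) address() string {
	return fmt.Sprintf("%s:%d", m.Host, m.Port)
}

// addresses collects the distinct addresses in a snapshot.
func addresses(members []memberStatus) map[string]bool {
	set := make(map[string]bool, len(members))
	for _, m := range members {
		set[m.address()] = true
	}
	return set
}

// diff compares two snapshots and reports which addresses joined and left,
// each in sorted order.
func diff(prev, curr []memberStatus) (joined, left []string) {
	before, after := addresses(prev), addresses(curr)
	for addr := range after {
		if !before[addr] {
			joined = append(joined, addr)
		}
	}
	for addr := range before {
		if !after[addr] {
			left = append(left, addr)
		}
	}
	sort.Strings(joined)
	sort.Strings(left)
	return joined, left
}

func main() {
	prev := []memberStatus{{"10.0.0.1", 7711}, {"10.0.0.2", 7711}}
	curr := []memberStatus{{"10.0.0.2", 7711}, {"10.0.0.3", 7711}}
	joined, left := diff(prev, curr)
	fmt.Println(joined, left) // prints "[10.0.0.3:7711] [10.0.0.1:7711]"
}
```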

type MemberStatusValue

type MemberStatusValue interface {
	IsSame(val MemberStatusValue) bool
}

type MemberStatusValueSerializer

type MemberStatusValueSerializer interface {
	ToValueBytes(val MemberStatusValue) []byte
	FromValueBytes(val []byte) MemberStatusValue
}
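
A custom status value needs both interfaces: one to compare values and one to turn them into bytes and back. The sketch below redeclares the two interfaces locally, with the signatures documented above, so it compiles on its own. `loadValue` and `loadSerializer` are hypothetical, and the decimal-string encoding is an arbitrary choice.

```go
package main

import (
	"fmt"
	"strconv"
)

// Local copies of the two documented interfaces, so the sketch is standalone.
type MemberStatusValue interface {
	IsSame(val MemberStatusValue) bool
}

type MemberStatusValueSerializer interface {
	ToValueBytes(val MemberStatusValue) []byte
	FromValueBytes(val []byte) MemberStatusValue
}

// loadValue is a hypothetical status value, e.g. a member's current load.
type loadValue int

// IsSame reports whether val is a loadValue with the same number.
func (l loadValue) IsSame(val MemberStatusValue) bool {
	other, ok := val.(loadValue)
	return ok && other == l
}

// loadSerializer encodes a loadValue as its decimal string.
type loadSerializer struct{}

func (loadSerializer) ToValueBytes(val MemberStatusValue) []byte {
	l, ok := val.(loadValue)
	if !ok {
		return nil // not a value this serializer understands
	}
	return []byte(strconv.Itoa(int(l)))
}

func (loadSerializer) FromValueBytes(val []byte) MemberStatusValue {
	n, err := strconv.Atoi(string(val))
	if err != nil {
		return nil // undecodable input yields no value
	}
	return loadValue(n)
}

func main() {
	var s MemberStatusValueSerializer = loadSerializer{}
	raw := s.ToValueBytes(loadValue(7))
	back := s.FromValueBytes(raw)
	fmt.Println(string(raw), back.IsSame(loadValue(7))) // prints "7 true"
}
```

Values like these would be passed through WithInitialMemberStatusValue and WithMemberStatusValueSerializer on ClusterConfig.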

type MemberStrategy

type MemberStrategy interface {
	GetAllMembers() []*MemberStatus
	AddMember(member *MemberStatus)
	UpdateMember(member *MemberStatus)
	RemoveMember(member *MemberStatus)
	GetPartition(key string) string
	GetActivator() string
}

type MemberUnavailableEvent

type MemberUnavailableEvent struct {
	MemberMeta
}

func (*MemberUnavailableEvent) MemberStatusEvent

func (*MemberUnavailableEvent) MemberStatusEvent()

type NilMemberStatusValueSerializer

type NilMemberStatusValueSerializer struct{}

func (*NilMemberStatusValueSerializer) FromValueBytes

func (s *NilMemberStatusValueSerializer) FromValueBytes(val []byte) MemberStatusValue

func (*NilMemberStatusValueSerializer) ToValueBytes

func (s *NilMemberStatusValueSerializer) ToValueBytes(val MemberStatusValue) []byte

type Rendezvous

type Rendezvous struct {
	// contains filtered or unexported fields
}

func NewRendezvous

func NewRendezvous(memberStrategy MemberStrategy) *Rendezvous

func (*Rendezvous) GetByRdv

func (r *Rendezvous) GetByRdv(key string) string

GetByRdv returns the node with the highest score for the given key. If there are no nodes, an empty string is returned.
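
"Highest score for the given key" is the core of rendezvous (highest random weight) hashing: every node is scored against the key and the top scorer owns it. The following is a generic sketch of that technique, not this package's implementation; `score` and `owner` are hypothetical names, and FNV-1a is an arbitrary hash choice.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// score hashes the (node, key) pair; FNV-1a is an arbitrary choice here.
func score(node, key string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(node))
	h.Write([]byte{0}) // separator so ("ab", "c") and ("a", "bc") differ
	h.Write([]byte(key))
	return h.Sum32()
}

// owner returns the node with the highest score for key, or "" if nodes is
// empty. Ties are broken by node name so the result is order-independent.
func owner(nodes []string, key string) string {
	best, bestScore := "", uint32(0)
	for i, n := range nodes {
		s := score(n, key)
		if i == 0 || s > bestScore || (s == bestScore && n < best) {
			best, bestScore = n, s
		}
	}
	return best
}

func main() {
	nodes := []string{"10.0.0.1:7711", "10.0.0.2:7711", "10.0.0.3:7711"}
	fmt.Println(owner(nodes, "abc"))
	fmt.Println(owner(nil, "abc") == "") // prints "true"
}
```

The useful property is stability: removing a node only moves the keys that node owned, because every other key's top scorer is unchanged.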

func (*Rendezvous) UpdateRdv

func (r *Rendezvous) UpdateRdv()

type SimpleRoundRobin

type SimpleRoundRobin struct {
	// contains filtered or unexported fields
}

func NewSimpleRoundRobin

func NewSimpleRoundRobin(memberStrategy MemberStrategy) *SimpleRoundRobin

func (*SimpleRoundRobin) GetByRoundRobin

func (r *SimpleRoundRobin) GetByRoundRobin() string
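
Round-robin selection hands out members in rotation. The sketch below shows the general technique with an atomic counter so concurrent callers do not need a lock. It is not this package's implementation; `roundRobin` and `pick` are hypothetical, and returning an empty string for an empty member list is an assumption.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin rotates through a fixed list of member addresses.
type roundRobin struct {
	next    uint64 // number of picks handed out so far
	members []string
}

// pick returns the members in rotation, or "" when there are none.
func (r *roundRobin) pick() string {
	if len(r.members) == 0 {
		return ""
	}
	n := atomic.AddUint64(&r.next, 1) - 1 // claim the next slot atomically
	return r.members[n%uint64(len(r.members))]
}

func main() {
	rr := &roundRobin{members: []string{"a", "b", "c"}}
	for i := 0; i < 4; i++ {
		fmt.Print(rr.pick(), " ")
	}
	fmt.Println() // the loop above prints "a b c a "
}
```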

type TakeOwnership

type TakeOwnership struct {
	Pid                  *actor.PID `protobuf:"bytes,1,opt,name=pid" json:"pid,omitempty"`
	Name                 string     `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
	XXX_NoUnkeyedLiteral struct{}   `json:"-"`
	XXX_sizecache        int32      `json:"-"`
}

func (*TakeOwnership) Descriptor

func (*TakeOwnership) Descriptor() ([]byte, []int)

func (*TakeOwnership) Equal

func (this *TakeOwnership) Equal(that interface{}) bool

func (*TakeOwnership) GetName

func (m *TakeOwnership) GetName() string

func (*TakeOwnership) GetPid

func (m *TakeOwnership) GetPid() *actor.PID

func (*TakeOwnership) Marshal

func (m *TakeOwnership) Marshal() (dAtA []byte, err error)

func (*TakeOwnership) MarshalTo

func (m *TakeOwnership) MarshalTo(dAtA []byte) (int, error)

func (*TakeOwnership) ProtoMessage

func (*TakeOwnership) ProtoMessage()

func (*TakeOwnership) Reset

func (m *TakeOwnership) Reset()

func (*TakeOwnership) Size

func (m *TakeOwnership) Size() (n int)

func (*TakeOwnership) String

func (this *TakeOwnership) String() string

func (*TakeOwnership) Unmarshal

func (m *TakeOwnership) Unmarshal(dAtA []byte) error

func (*TakeOwnership) XXX_DiscardUnknown

func (m *TakeOwnership) XXX_DiscardUnknown()

func (*TakeOwnership) XXX_Marshal

func (m *TakeOwnership) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*TakeOwnership) XXX_Merge

func (dst *TakeOwnership) XXX_Merge(src proto.Message)

func (*TakeOwnership) XXX_Size

func (m *TakeOwnership) XXX_Size() int

func (*TakeOwnership) XXX_Unmarshal

func (m *TakeOwnership) XXX_Unmarshal(b []byte) error
