cluster

package
v0.0.0-...-45dcbe3 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 20, 2017 License: Apache-2.0 Imports: 17 Imported by: 0

README

GAM Cluster - Virtual Actors (Alpha)

Massively distributed actors for Go

GAM supports the classic actor model also found in Erlang and Akka.
Our cluster support, however, uses a different approach: the Virtual Actor Model.

This is a model where each actor appears to always exist. There is no lifecycle as in the classic actor model. You get a reference to the actor by asking for its ID.

For example:

hello := shared.GetHelloGrain("abc")
res := hello.SayHello(&shared.HelloRequest{Name: "GAM"})

This will ask the cluster where the 'abc' actor is located. If it does not yet exist, it will be created for you.
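
The get-or-create behaviour described above can be sketched in plain Go. This is only an illustration of the idea, not how GAM locates or activates actors across a cluster: the `registry` and `grain` types are hypothetical and everything lives in a single process.

```go
package main

import (
	"fmt"
	"sync"
)

// grain is a hypothetical stand-in for an activated virtual actor.
type grain struct {
	id string
}

// registry hands out grains by ID and activates them on first use.
// A real cluster would also decide which node hosts each grain.
type registry struct {
	mu     sync.Mutex
	grains map[string]*grain
}

func newRegistry() *registry {
	return &registry{grains: make(map[string]*grain)}
}

// get returns the grain for id, creating it if it is not yet active.
func (r *registry) get(id string) *grain {
	r.mu.Lock()
	defer r.mu.Unlock()
	g, ok := r.grains[id]
	if !ok {
		g = &grain{id: id}
		r.grains[id] = g
	}
	return g
}

func main() {
	r := newRegistry()
	a := r.get("abc") // activated on first request
	b := r.get("abc") // the existing activation is reused
	fmt.Println(a == b)
}
```

The caller never creates or destroys a grain explicitly; asking for an ID is the only operation it needs.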

See Microsoft Orleans for more info about the Virtual Actor Model: http://dotnet.github.io/orleans/

How to

Protobuf IDL Definition

Start by defining your messages and grain contracts. You do this by using Protobuf IDL files.

Here is the definition from the /examples/cluster/shared example:

syntax = "proto3";
package shared;

message HelloRequest {
  string name = 1;
}

message HelloResponse {
  string message = 1;
}

message AddRequest {
  double a = 1;
  double b = 2;
}

message AddResponse {
  double result = 1;
}

service Hello {
  rpc SayHello (HelloRequest) returns (HelloResponse) {} 
  rpc Add(AddRequest) returns (AddResponse) {}
}

Once you have this, you can generate your code using the protobuf protoc compiler.

Windows

#generate messages
protoc -I=. -I=%GOPATH%\src --gogoslick_out=. protos.proto
#generate grains 
protoc -I=. -I=%GOPATH%\src --gorleans_out=. protos.proto 

Implementing

Once the messages and contracts have been generated, you can start implementing your own business logic. This is essentially a type which is powered by a GAM actor behind the scenes.

package shared

//a Go struct implementing the Hello interface
type hello struct {
}

func (*hello) SayHello(r *HelloRequest) *HelloResponse {
	return &HelloResponse{Message: "hello " + r.Name}
}

func (*hello) Add(r *AddRequest) *AddResponse {
	return &AddResponse{Result: r.A + r.B}
}

//Register what implementation GAM should use when 
//creating actors for a certain grain type.
func init() {
	//apply DI and setup logic
	HelloFactory(func() Hello { return &hello{} })
}

Seed nodes

func main() {
	cluster.Start("127.0.0.1:7711")
	console.ReadLine()
}

Member nodes

func main() {
	cluster.Start("127.0.0.1:0", "127.0.0.1:7711")

	// get a reference to the virtual actor called "abc" of type Hello
	hello := shared.GetHelloGrain("abc")
	res := hello.SayHello(&shared.HelloRequest{Name: "GAM"})
	log.Printf("Message from grain %v", res.Message)
}

FAQ

Can I use GAM Cluster in production?

GAM Cluster support is in alpha and is therefore not production-ready.

What about performance?

GAM Remoting is able to pass 1 million+ messages per second on a standard dev machine. This is the same infrastructure used in GAM Cluster. GAM Cluster, however, uses an RPC API, meaning it is request/response in nature. If you wait for a response to each call, the throughput will of course be a lot lower. Use async fire-and-forget for performance and request/response for simplicity.
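
The difference between the two calling styles can be sketched with plain Go channels. This does not use the GAM API; the `request` type and the `tell` and `ask` helpers are hypothetical names chosen for illustration.

```go
package main

import "fmt"

// request is a hypothetical message; reply is nil for fire-and-forget sends.
type request struct {
	value int
	reply chan int
}

// startAdder runs a goroutine that sums every value it receives and
// answers only those requests that carry a reply channel.
func startAdder() chan<- request {
	inbox := make(chan request, 1024)
	go func() {
		sum := 0
		for r := range inbox {
			sum += r.value
			if r.reply != nil {
				r.reply <- sum
			}
		}
	}()
	return inbox
}

// tell enqueues a message without waiting for a response.
func tell(inbox chan<- request, v int) {
	inbox <- request{value: v}
}

// ask enqueues a message and blocks until the response arrives.
func ask(inbox chan<- request, v int) int {
	reply := make(chan int, 1)
	inbox <- request{value: v, reply: reply}
	return <-reply
}

func main() {
	inbox := startAdder()
	for i := 1; i <= 3; i++ {
		tell(inbox, i) // returns as soon as the message is queued
	}
	fmt.Println(ask(inbox, 4)) // waits for the running total
}
```

With `tell` the sender can keep the inbox full, while with `ask` each call pays a full round trip before the next one starts, which is why waiting on every response lowers throughput.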

Documentation

Constants

This section is empty.

Variables

var (
	ErrInvalidLengthProtos = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowProtos   = fmt.Errorf("proto: integer overflow")
)

Functions

func Get

func Get(name string, kind string) (*actor.PID, error)

Get returns a PID to a virtual actor.

func SetLogLevel

func SetLogLevel(level log.Level)

SetLogLevel sets the log level for the logger.

SetLogLevel is safe to call concurrently.

func Start

func Start(clusterName, address string, provider ClusterProvider)

Types

type ClusterProvider

type ClusterProvider interface {
	RegisterMember(clusterName string, address string, port int, knownKinds []string) error
	MonitorMemberStatusChanges()
	Shutdown() error
}
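
Start, as documented above, takes a ClusterProvider. To show what satisfying that interface involves, here is a minimal sketch of an in-memory provider. The interface is copied locally so the example is self-contained; `staticProvider` is a hypothetical type, it performs no real discovery, and its behaviour is an assumption rather than a description of any provider shipped with the module.

```go
package main

import (
	"errors"
	"fmt"
)

// ClusterProvider mirrors the interface documented above.
type ClusterProvider interface {
	RegisterMember(clusterName string, address string, port int, knownKinds []string) error
	MonitorMemberStatusChanges()
	Shutdown() error
}

// staticProvider is a hypothetical provider that only records registrations.
type staticProvider struct {
	members map[string][]string // "cluster/address:port" -> kinds
	closed  bool
}

func newStaticProvider() *staticProvider {
	return &staticProvider{members: make(map[string][]string)}
}

// RegisterMember stores the member and a copy of its kinds.
func (p *staticProvider) RegisterMember(clusterName, address string, port int, knownKinds []string) error {
	if p.closed {
		return errors.New("provider is shut down")
	}
	key := fmt.Sprintf("%s/%s:%d", clusterName, address, port)
	p.members[key] = append([]string(nil), knownKinds...)
	return nil
}

// MonitorMemberStatusChanges is a no-op here; a real provider would
// watch its backing store and report membership changes.
func (p *staticProvider) MonitorMemberStatusChanges() {}

// Shutdown marks the provider closed so later registrations fail.
func (p *staticProvider) Shutdown() error {
	p.closed = true
	return nil
}

// Compile-time check that staticProvider satisfies the interface.
var _ ClusterProvider = (*staticProvider)(nil)

func main() {
	p := newStaticProvider()
	err := p.RegisterMember("mycluster", "127.0.0.1", 7711, []string{"Hello"})
	fmt.Println(err, len(p.members))
}
```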

type ClusterTopologyEvent

type ClusterTopologyEvent []*MemberStatus

type Grain

type Grain struct {
	// contains filtered or unexported fields
}

func (*Grain) ID

func (g *Grain) ID() string

func (*Grain) Init

func (g *Grain) Init(id string)
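
Grain exposes only Init and ID, which suggests it is meant to be embedded in an implementation type. The sketch below assumes that usage and assumes the unexported field holds the ID; `Grain` is redefined locally and `counter` is a hypothetical type, so none of this is taken from the package source.

```go
package main

import "fmt"

// Grain is a local stand-in with the same method set as the documented type.
// Storing the ID in an unexported field is an assumption.
type Grain struct {
	id string
}

// ID returns the identity assigned by Init.
func (g *Grain) ID() string { return g.id }

// Init records the identity of this grain.
func (g *Grain) Init(id string) { g.id = id }

// counter is a hypothetical grain implementation that embeds Grain
// to pick up Init and ID without writing them again.
type counter struct {
	Grain
	n int
}

// Increment bumps the counter and reports which grain handled the call.
func (c *counter) Increment() string {
	c.n++
	return fmt.Sprintf("%s=%d", c.ID(), c.n)
}

func main() {
	c := &counter{}
	c.Init("abc")
	fmt.Println(c.Increment())
}
```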

type GrainCallConfig

type GrainCallConfig struct {
	RetryCount int
	Timeout    time.Duration
}

func ApplyGrainCallOptions

func ApplyGrainCallOptions(options []GrainCallOption) *GrainCallConfig

func DefaultGrainCallConfig

func DefaultGrainCallConfig() *GrainCallConfig

type GrainCallOption

type GrainCallOption func(*GrainCallConfig)

func WithRetry

func WithRetry(count int) GrainCallOption

func WithTimeout

func WithTimeout(timeout time.Duration) GrainCallOption
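
GrainCallOption, WithRetry, WithTimeout and ApplyGrainCallOptions have the shape of the functional options pattern. The following self-contained sketch shows how such functions typically fit together. The signatures mirror the ones listed above, but the bodies and the default values are assumptions, not the package's actual implementation.

```go
package main

import (
	"fmt"
	"time"
)

// GrainCallConfig mirrors the documented struct.
type GrainCallConfig struct {
	RetryCount int
	Timeout    time.Duration
}

// GrainCallOption mutates a config in place.
type GrainCallOption func(*GrainCallConfig)

// DefaultGrainCallConfig returns placeholder defaults; the package's
// real defaults are not stated in this document.
func DefaultGrainCallConfig() *GrainCallConfig {
	return &GrainCallConfig{RetryCount: 3, Timeout: 5 * time.Second}
}

// WithRetry returns an option that overrides the retry count.
func WithRetry(count int) GrainCallOption {
	return func(c *GrainCallConfig) { c.RetryCount = count }
}

// WithTimeout returns an option that overrides the call timeout.
func WithTimeout(timeout time.Duration) GrainCallOption {
	return func(c *GrainCallConfig) { c.Timeout = timeout }
}

// ApplyGrainCallOptions starts from the defaults and applies each
// option in order, so later options win.
func ApplyGrainCallOptions(options []GrainCallOption) *GrainCallConfig {
	config := DefaultGrainCallConfig()
	for _, option := range options {
		option(config)
	}
	return config
}

func main() {
	cfg := ApplyGrainCallOptions([]GrainCallOption{
		WithRetry(5),
		WithTimeout(time.Second),
	})
	fmt.Println(cfg.RetryCount, cfg.Timeout) // prints "5 1s"
}
```

The pattern lets a generated grain method accept a variadic list of options, so callers override only the settings they care about.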

type GrainErrorResponse

type GrainErrorResponse struct {
	Err string `protobuf:"bytes,1,opt,name=err,proto3" json:"err,omitempty"`
}

func (*GrainErrorResponse) Descriptor

func (*GrainErrorResponse) Descriptor() ([]byte, []int)

func (*GrainErrorResponse) Equal

func (this *GrainErrorResponse) Equal(that interface{}) bool

func (*GrainErrorResponse) Marshal

func (m *GrainErrorResponse) Marshal() (dAtA []byte, err error)

func (*GrainErrorResponse) MarshalTo

func (m *GrainErrorResponse) MarshalTo(dAtA []byte) (int, error)

func (*GrainErrorResponse) ProtoMessage

func (*GrainErrorResponse) ProtoMessage()

func (*GrainErrorResponse) Reset

func (m *GrainErrorResponse) Reset()

func (*GrainErrorResponse) Size

func (m *GrainErrorResponse) Size() (n int)

func (*GrainErrorResponse) String

func (this *GrainErrorResponse) String() string

func (*GrainErrorResponse) Unmarshal

func (m *GrainErrorResponse) Unmarshal(dAtA []byte) error

type GrainRequest

type GrainRequest struct {
	Method      string `protobuf:"bytes,1,opt,name=method,proto3" json:"method,omitempty"`
	MessageData []byte `protobuf:"bytes,2,opt,name=message_data,json=messageData,proto3" json:"message_data,omitempty"`
}

func (*GrainRequest) Descriptor

func (*GrainRequest) Descriptor() ([]byte, []int)

func (*GrainRequest) Equal

func (this *GrainRequest) Equal(that interface{}) bool

func (*GrainRequest) Marshal

func (m *GrainRequest) Marshal() (dAtA []byte, err error)

func (*GrainRequest) MarshalTo

func (m *GrainRequest) MarshalTo(dAtA []byte) (int, error)

func (*GrainRequest) ProtoMessage

func (*GrainRequest) ProtoMessage()

func (*GrainRequest) Reset

func (m *GrainRequest) Reset()

func (*GrainRequest) Size

func (m *GrainRequest) Size() (n int)

func (*GrainRequest) String

func (this *GrainRequest) String() string

func (*GrainRequest) Unmarshal

func (m *GrainRequest) Unmarshal(dAtA []byte) error

type GrainResponse

type GrainResponse struct {
	MessageData []byte `protobuf:"bytes,1,opt,name=message_data,json=messageData,proto3" json:"message_data,omitempty"`
}

func (*GrainResponse) Descriptor

func (*GrainResponse) Descriptor() ([]byte, []int)

func (*GrainResponse) Equal

func (this *GrainResponse) Equal(that interface{}) bool

func (*GrainResponse) Marshal

func (m *GrainResponse) Marshal() (dAtA []byte, err error)

func (*GrainResponse) MarshalTo

func (m *GrainResponse) MarshalTo(dAtA []byte) (int, error)

func (*GrainResponse) ProtoMessage

func (*GrainResponse) ProtoMessage()

func (*GrainResponse) Reset

func (m *GrainResponse) Reset()

func (*GrainResponse) Size

func (m *GrainResponse) Size() (n int)

func (*GrainResponse) String

func (this *GrainResponse) String() string

func (*GrainResponse) Unmarshal

func (m *GrainResponse) Unmarshal(dAtA []byte) error

type MemberAvailableEvent

type MemberAvailableEvent struct {
	MemberMeta
}

func (*MemberAvailableEvent) MemberStatusEvent

func (*MemberAvailableEvent) MemberStatusEvent()

type MemberByKindRequest

type MemberByKindRequest struct {
	// contains filtered or unexported fields
}

type MemberByKindResponse

type MemberByKindResponse struct {
	// contains filtered or unexported fields
}

type MemberJoinedEvent

type MemberJoinedEvent struct {
	MemberMeta
}

func (*MemberJoinedEvent) MemberStatusEvent

func (*MemberJoinedEvent) MemberStatusEvent()

type MemberLeftEvent

type MemberLeftEvent struct {
	MemberMeta
}

func (*MemberLeftEvent) MemberStatusEvent

func (*MemberLeftEvent) MemberStatusEvent()

type MemberMeta

type MemberMeta struct {
	Host  string
	Port  int
	Kinds []string
}

func (*MemberMeta) GetKinds

func (e *MemberMeta) GetKinds() []string

func (*MemberMeta) Name

func (e *MemberMeta) Name() string

type MemberRejoinedEvent

type MemberRejoinedEvent struct {
	MemberMeta
}

func (*MemberRejoinedEvent) MemberStatusEvent

func (*MemberRejoinedEvent) MemberStatusEvent()

type MemberStatus

type MemberStatus struct {
	MemberID string
	Host     string
	Port     int
	Kinds    []string
	Alive    bool
}

type MemberStatusEvent

type MemberStatusEvent interface {
	MemberStatusEvent()
	GetKinds() []string
}

type MemberUnavailableEvent

type MemberUnavailableEvent struct {
	MemberMeta
}

func (*MemberUnavailableEvent) MemberStatusEvent

func (*MemberUnavailableEvent) MemberStatusEvent()
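
The member event types above all embed MemberMeta and implement MemberStatusEvent, so a consumer can tell them apart with a type switch. The sketch below redefines two of the event types locally to stay self-contained. The `describe` function is hypothetical, and the body of GetKinds is an assumption about what the real method returns.

```go
package main

import "fmt"

// MemberMeta mirrors the documented struct.
type MemberMeta struct {
	Host  string
	Port  int
	Kinds []string
}

// GetKinds is assumed to return the Kinds field.
func (e *MemberMeta) GetKinds() []string { return e.Kinds }

// MemberStatusEvent mirrors the documented interface.
type MemberStatusEvent interface {
	MemberStatusEvent()
	GetKinds() []string
}

type MemberJoinedEvent struct{ MemberMeta }

func (*MemberJoinedEvent) MemberStatusEvent() {}

type MemberLeftEvent struct{ MemberMeta }

func (*MemberLeftEvent) MemberStatusEvent() {}

// describe is a hypothetical handler that branches on the concrete event type.
func describe(ev MemberStatusEvent) string {
	switch e := ev.(type) {
	case *MemberJoinedEvent:
		return fmt.Sprintf("joined %s:%d %v", e.Host, e.Port, e.GetKinds())
	case *MemberLeftEvent:
		return fmt.Sprintf("left %s:%d", e.Host, e.Port)
	default:
		return "unhandled event"
	}
}

func main() {
	meta := MemberMeta{Host: "127.0.0.1", Port: 7711, Kinds: []string{"Hello"}}
	fmt.Println(describe(&MemberJoinedEvent{MemberMeta: meta}))
	fmt.Println(describe(&MemberLeftEvent{MemberMeta: meta}))
}
```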

type TakeOwnership

type TakeOwnership struct {
	Pid  *actor.PID `protobuf:"bytes,1,opt,name=pid" json:"pid,omitempty"`
	Name string     `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
}

func (*TakeOwnership) Descriptor

func (*TakeOwnership) Descriptor() ([]byte, []int)

func (*TakeOwnership) Equal

func (this *TakeOwnership) Equal(that interface{}) bool

func (*TakeOwnership) GetPid

func (m *TakeOwnership) GetPid() *actor.PID

func (*TakeOwnership) Marshal

func (m *TakeOwnership) Marshal() (dAtA []byte, err error)

func (*TakeOwnership) MarshalTo

func (m *TakeOwnership) MarshalTo(dAtA []byte) (int, error)

func (*TakeOwnership) ProtoMessage

func (*TakeOwnership) ProtoMessage()

func (*TakeOwnership) Reset

func (m *TakeOwnership) Reset()

func (*TakeOwnership) Size

func (m *TakeOwnership) Size() (n int)

func (*TakeOwnership) String

func (this *TakeOwnership) String() string

func (*TakeOwnership) Unmarshal

func (m *TakeOwnership) Unmarshal(dAtA []byte) error
