remote

package
v0.0.0-...-be88ffe Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2022 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Overview

Package remote provides access to actors across a network or other I/O connection.

Index

Constants

This section is empty.

Variables

View Source
var (
	ActorPidRespErr         interface{} = &ActorPidResponse{StatusCode: ResponseStatusCodeERROR.ToInt32()}
	ActorPidRespTimeout     interface{} = &ActorPidResponse{StatusCode: ResponseStatusCodeTIMEOUT.ToInt32()}
	ActorPidRespUnavailable interface{} = &ActorPidResponse{StatusCode: ResponseStatusCodeUNAVAILABLE.ToInt32()}
)
View Source
var (
	ErrInvalidLengthProtos = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowProtos   = fmt.Errorf("proto: integer overflow")
)
View Source
var DefaultSerializerID int32
View Source
var ErrActivatorUnavailable = &ActivatorError{ResponseStatusCodeUNAVAILABLE.ToInt32(), true}

ErrActivatorUnavailable : this error will not panic the Activator. It simply tells Partition this Activator is not available Partition will then find next available Activator to spawn

Functions

func Deserialize

func Deserialize(message []byte, typeName string, serializerID int32) (interface{}, error)

func NewEndpointLazy

func NewEndpointLazy(em *endpointManager, address string) *endpointLazy

func RegisterRemotingServer

func RegisterRemotingServer(s *grpc.Server, srv RemotingServer)

func RegisterSerializer

func RegisterSerializer(serializer Serializer)

func RegisterSerializerAsDefault

func RegisterSerializerAsDefault(serializer Serializer)

func Serialize

func Serialize(message interface{}, serializerID int32) ([]byte, string, error)

func SetLogLevel

func SetLogLevel(level log.Level)

SetLogLevel sets the log level for the logger.

SetLogLevel is safe to call concurrently

Types

type ActivatorError

type ActivatorError struct {
	Code       int32
	DoNotPanic bool
}

func (*ActivatorError) Error

func (e *ActivatorError) Error() string

type ActorPidRequest

type ActorPidRequest struct {
	Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	Kind string `protobuf:"bytes,2,opt,name=kind,proto3" json:"kind,omitempty"`
}

func (*ActorPidRequest) Descriptor

func (*ActorPidRequest) Descriptor() ([]byte, []int)

func (*ActorPidRequest) Equal

func (this *ActorPidRequest) Equal(that interface{}) bool

func (*ActorPidRequest) GetKind

func (m *ActorPidRequest) GetKind() string

func (*ActorPidRequest) GetName

func (m *ActorPidRequest) GetName() string

func (*ActorPidRequest) Marshal

func (m *ActorPidRequest) Marshal() (dAtA []byte, err error)

func (*ActorPidRequest) MarshalTo

func (m *ActorPidRequest) MarshalTo(dAtA []byte) (int, error)

func (*ActorPidRequest) ProtoMessage

func (*ActorPidRequest) ProtoMessage()

func (*ActorPidRequest) Reset

func (m *ActorPidRequest) Reset()

func (*ActorPidRequest) Size

func (m *ActorPidRequest) Size() (n int)

func (*ActorPidRequest) String

func (this *ActorPidRequest) String() string

func (*ActorPidRequest) Unmarshal

func (m *ActorPidRequest) Unmarshal(dAtA []byte) error

func (*ActorPidRequest) XXX_DiscardUnknown

func (m *ActorPidRequest) XXX_DiscardUnknown()

func (*ActorPidRequest) XXX_Marshal

func (m *ActorPidRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ActorPidRequest) XXX_Merge

func (m *ActorPidRequest) XXX_Merge(src proto.Message)

func (*ActorPidRequest) XXX_Size

func (m *ActorPidRequest) XXX_Size() int

func (*ActorPidRequest) XXX_Unmarshal

func (m *ActorPidRequest) XXX_Unmarshal(b []byte) error

type ActorPidResponse

type ActorPidResponse struct {
	Pid        *actor.PID `protobuf:"bytes,1,opt,name=pid,proto3" json:"pid,omitempty"`
	StatusCode int32      `protobuf:"varint,2,opt,name=status_code,json=statusCode,proto3" json:"status_code,omitempty"`
}

func (*ActorPidResponse) Descriptor

func (*ActorPidResponse) Descriptor() ([]byte, []int)

func (*ActorPidResponse) Equal

func (this *ActorPidResponse) Equal(that interface{}) bool

func (*ActorPidResponse) GetPid

func (m *ActorPidResponse) GetPid() *actor.PID

func (*ActorPidResponse) GetStatusCode

func (m *ActorPidResponse) GetStatusCode() int32

func (*ActorPidResponse) Marshal

func (m *ActorPidResponse) Marshal() (dAtA []byte, err error)

func (*ActorPidResponse) MarshalTo

func (m *ActorPidResponse) MarshalTo(dAtA []byte) (int, error)

func (*ActorPidResponse) ProtoMessage

func (*ActorPidResponse) ProtoMessage()

func (*ActorPidResponse) Reset

func (m *ActorPidResponse) Reset()

func (*ActorPidResponse) Size

func (m *ActorPidResponse) Size() (n int)

func (*ActorPidResponse) String

func (this *ActorPidResponse) String() string

func (*ActorPidResponse) Unmarshal

func (m *ActorPidResponse) Unmarshal(dAtA []byte) error

func (*ActorPidResponse) XXX_DiscardUnknown

func (m *ActorPidResponse) XXX_DiscardUnknown()

func (*ActorPidResponse) XXX_Marshal

func (m *ActorPidResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ActorPidResponse) XXX_Merge

func (m *ActorPidResponse) XXX_Merge(src proto.Message)

func (*ActorPidResponse) XXX_Size

func (m *ActorPidResponse) XXX_Size() int

func (*ActorPidResponse) XXX_Unmarshal

func (m *ActorPidResponse) XXX_Unmarshal(b []byte) error

type BlockList

type BlockList struct {
	// contains filtered or unexported fields
}

TODO: document it

func NewBlockList

func NewBlockList() BlockList

func (*BlockList) Block

func (bl *BlockList) Block(memberIDs ...string)

Block adds the given memberID list to the BlockList

func (*BlockList) BlockedMembers

func (bl *BlockList) BlockedMembers() map[string]struct{}

Returns back a copy of the internal blockedMembers map

func (*BlockList) IsBlocked

func (bl *BlockList) IsBlocked(memberID string) bool

IsBlocked returns true if the given memberID string has been ever added to the BlockList

type Config

type Config struct {
	Host                     string
	Port                     int
	AdvertisedHost           string
	ServerOptions            []grpc.ServerOption
	CallOptions              []grpc.CallOption
	DialOptions              []grpc.DialOption
	EndpointWriterBatchSize  int
	EndpointWriterQueueSize  int
	EndpointManagerBatchSize int
	EndpointManagerQueueSize int
	Kinds                    map[string]*actor.Props
}

func Configure

func Configure(host string, port int, kinds ...*Kind) Config

func (Config) Address

func (rc Config) Address() string

func (Config) WithAdvertisedHost

func (rc Config) WithAdvertisedHost(address string) Config

func (Config) WithCallOptions

func (rc Config) WithCallOptions(options ...grpc.CallOption) Config

func (Config) WithDialOptions

func (rc Config) WithDialOptions(options ...grpc.DialOption) Config

func (Config) WithEndpointManagerBatchSize

func (rc Config) WithEndpointManagerBatchSize(batchSize int) Config

func (Config) WithEndpointManagerQueueSize

func (rc Config) WithEndpointManagerQueueSize(queueSize int) Config

func (Config) WithEndpointWriterBatchSize

func (rc Config) WithEndpointWriterBatchSize(batchSize int) Config

func (Config) WithEndpointWriterQueueSize

func (rc Config) WithEndpointWriterQueueSize(queueSize int) Config

func (Config) WithServerOptions

func (rc Config) WithServerOptions(options ...grpc.ServerOption) Config

type ConnectRequest

type ConnectRequest struct {
}

func (*ConnectRequest) Descriptor

func (*ConnectRequest) Descriptor() ([]byte, []int)

func (*ConnectRequest) Equal

func (this *ConnectRequest) Equal(that interface{}) bool

func (*ConnectRequest) Marshal

func (m *ConnectRequest) Marshal() (dAtA []byte, err error)

func (*ConnectRequest) MarshalTo

func (m *ConnectRequest) MarshalTo(dAtA []byte) (int, error)

func (*ConnectRequest) ProtoMessage

func (*ConnectRequest) ProtoMessage()

func (*ConnectRequest) Reset

func (m *ConnectRequest) Reset()

func (*ConnectRequest) Size

func (m *ConnectRequest) Size() (n int)

func (*ConnectRequest) String

func (this *ConnectRequest) String() string

func (*ConnectRequest) Unmarshal

func (m *ConnectRequest) Unmarshal(dAtA []byte) error

func (*ConnectRequest) XXX_DiscardUnknown

func (m *ConnectRequest) XXX_DiscardUnknown()

func (*ConnectRequest) XXX_Marshal

func (m *ConnectRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ConnectRequest) XXX_Merge

func (m *ConnectRequest) XXX_Merge(src proto.Message)

func (*ConnectRequest) XXX_Size

func (m *ConnectRequest) XXX_Size() int

func (*ConnectRequest) XXX_Unmarshal

func (m *ConnectRequest) XXX_Unmarshal(b []byte) error

type ConnectResponse

type ConnectResponse struct {
	DefaultSerializerId int32 `protobuf:"varint,1,opt,name=default_serializer_id,json=defaultSerializerId,proto3" json:"default_serializer_id,omitempty"`
}

func (*ConnectResponse) Descriptor

func (*ConnectResponse) Descriptor() ([]byte, []int)

func (*ConnectResponse) Equal

func (this *ConnectResponse) Equal(that interface{}) bool

func (*ConnectResponse) GetDefaultSerializerId

func (m *ConnectResponse) GetDefaultSerializerId() int32

func (*ConnectResponse) Marshal

func (m *ConnectResponse) Marshal() (dAtA []byte, err error)

func (*ConnectResponse) MarshalTo

func (m *ConnectResponse) MarshalTo(dAtA []byte) (int, error)

func (*ConnectResponse) ProtoMessage

func (*ConnectResponse) ProtoMessage()

func (*ConnectResponse) Reset

func (m *ConnectResponse) Reset()

func (*ConnectResponse) Size

func (m *ConnectResponse) Size() (n int)

func (*ConnectResponse) String

func (this *ConnectResponse) String() string

func (*ConnectResponse) Unmarshal

func (m *ConnectResponse) Unmarshal(dAtA []byte) error

func (*ConnectResponse) XXX_DiscardUnknown

func (m *ConnectResponse) XXX_DiscardUnknown()

func (*ConnectResponse) XXX_Marshal

func (m *ConnectResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ConnectResponse) XXX_Merge

func (m *ConnectResponse) XXX_Merge(src proto.Message)

func (*ConnectResponse) XXX_Size

func (m *ConnectResponse) XXX_Size() int

func (*ConnectResponse) XXX_Unmarshal

func (m *ConnectResponse) XXX_Unmarshal(b []byte) error

type EndpointConnectedEvent

type EndpointConnectedEvent struct {
	Address string
}

type EndpointTerminatedEvent

type EndpointTerminatedEvent struct {
	Address string
}

type JsonMessage

type JsonMessage struct {
	TypeName string
	Json     string
}

type Kind

type Kind struct {
	Kind  string
	Props *actor.Props
}

func NewKind

func NewKind(kind string, props *actor.Props) *Kind

type MessageBatch

type MessageBatch struct {
	TypeNames   []string           `protobuf:"bytes,1,rep,name=type_names,json=typeNames,proto3" json:"type_names,omitempty"`
	TargetNames []string           `protobuf:"bytes,2,rep,name=target_names,json=targetNames,proto3" json:"target_names,omitempty"`
	Envelopes   []*MessageEnvelope `protobuf:"bytes,3,rep,name=envelopes,proto3" json:"envelopes,omitempty"`
}

func (*MessageBatch) Descriptor

func (*MessageBatch) Descriptor() ([]byte, []int)

func (*MessageBatch) Equal

func (this *MessageBatch) Equal(that interface{}) bool

func (*MessageBatch) GetEnvelopes

func (m *MessageBatch) GetEnvelopes() []*MessageEnvelope

func (*MessageBatch) GetTargetNames

func (m *MessageBatch) GetTargetNames() []string

func (*MessageBatch) GetTypeNames

func (m *MessageBatch) GetTypeNames() []string

func (*MessageBatch) Marshal

func (m *MessageBatch) Marshal() (dAtA []byte, err error)

func (*MessageBatch) MarshalTo

func (m *MessageBatch) MarshalTo(dAtA []byte) (int, error)

func (*MessageBatch) ProtoMessage

func (*MessageBatch) ProtoMessage()

func (*MessageBatch) Reset

func (m *MessageBatch) Reset()

func (*MessageBatch) Size

func (m *MessageBatch) Size() (n int)

func (*MessageBatch) String

func (this *MessageBatch) String() string

func (*MessageBatch) Unmarshal

func (m *MessageBatch) Unmarshal(dAtA []byte) error

func (*MessageBatch) XXX_DiscardUnknown

func (m *MessageBatch) XXX_DiscardUnknown()

func (*MessageBatch) XXX_Marshal

func (m *MessageBatch) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*MessageBatch) XXX_Merge

func (m *MessageBatch) XXX_Merge(src proto.Message)

func (*MessageBatch) XXX_Size

func (m *MessageBatch) XXX_Size() int

func (*MessageBatch) XXX_Unmarshal

func (m *MessageBatch) XXX_Unmarshal(b []byte) error

type MessageEnvelope

type MessageEnvelope struct {
	TypeId        int32          `protobuf:"varint,1,opt,name=type_id,json=typeId,proto3" json:"type_id,omitempty"`
	MessageData   []byte         `protobuf:"bytes,2,opt,name=message_data,json=messageData,proto3" json:"message_data,omitempty"`
	Target        int32          `protobuf:"varint,3,opt,name=target,proto3" json:"target,omitempty"`
	Sender        *actor.PID     `protobuf:"bytes,4,opt,name=sender,proto3" json:"sender,omitempty"`
	SerializerId  int32          `protobuf:"varint,5,opt,name=serializer_id,json=serializerId,proto3" json:"serializer_id,omitempty"`
	MessageHeader *MessageHeader `protobuf:"bytes,6,opt,name=message_header,json=messageHeader,proto3" json:"message_header,omitempty"`
}

func (*MessageEnvelope) Descriptor

func (*MessageEnvelope) Descriptor() ([]byte, []int)

func (*MessageEnvelope) Equal

func (this *MessageEnvelope) Equal(that interface{}) bool

func (*MessageEnvelope) GetMessageData

func (m *MessageEnvelope) GetMessageData() []byte

func (*MessageEnvelope) GetMessageHeader

func (m *MessageEnvelope) GetMessageHeader() *MessageHeader

func (*MessageEnvelope) GetSender

func (m *MessageEnvelope) GetSender() *actor.PID

func (*MessageEnvelope) GetSerializerId

func (m *MessageEnvelope) GetSerializerId() int32

func (*MessageEnvelope) GetTarget

func (m *MessageEnvelope) GetTarget() int32

func (*MessageEnvelope) GetTypeId

func (m *MessageEnvelope) GetTypeId() int32

func (*MessageEnvelope) Marshal

func (m *MessageEnvelope) Marshal() (dAtA []byte, err error)

func (*MessageEnvelope) MarshalTo

func (m *MessageEnvelope) MarshalTo(dAtA []byte) (int, error)

func (*MessageEnvelope) ProtoMessage

func (*MessageEnvelope) ProtoMessage()

func (*MessageEnvelope) Reset

func (m *MessageEnvelope) Reset()

func (*MessageEnvelope) Size

func (m *MessageEnvelope) Size() (n int)

func (*MessageEnvelope) String

func (this *MessageEnvelope) String() string

func (*MessageEnvelope) Unmarshal

func (m *MessageEnvelope) Unmarshal(dAtA []byte) error

func (*MessageEnvelope) XXX_DiscardUnknown

func (m *MessageEnvelope) XXX_DiscardUnknown()

func (*MessageEnvelope) XXX_Marshal

func (m *MessageEnvelope) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*MessageEnvelope) XXX_Merge

func (m *MessageEnvelope) XXX_Merge(src proto.Message)

func (*MessageEnvelope) XXX_Size

func (m *MessageEnvelope) XXX_Size() int

func (*MessageEnvelope) XXX_Unmarshal

func (m *MessageEnvelope) XXX_Unmarshal(b []byte) error

type MessageHeader

type MessageHeader struct {
	HeaderData map[string]string `` /* 179-byte string literal not displayed */
}

func (*MessageHeader) Descriptor

func (*MessageHeader) Descriptor() ([]byte, []int)

func (*MessageHeader) Equal

func (this *MessageHeader) Equal(that interface{}) bool

func (*MessageHeader) GetHeaderData

func (m *MessageHeader) GetHeaderData() map[string]string

func (*MessageHeader) Marshal

func (m *MessageHeader) Marshal() (dAtA []byte, err error)

func (*MessageHeader) MarshalTo

func (m *MessageHeader) MarshalTo(dAtA []byte) (int, error)

func (*MessageHeader) ProtoMessage

func (*MessageHeader) ProtoMessage()

func (*MessageHeader) Reset

func (m *MessageHeader) Reset()

func (*MessageHeader) Size

func (m *MessageHeader) Size() (n int)

func (*MessageHeader) String

func (this *MessageHeader) String() string

func (*MessageHeader) Unmarshal

func (m *MessageHeader) Unmarshal(dAtA []byte) error

func (*MessageHeader) XXX_DiscardUnknown

func (m *MessageHeader) XXX_DiscardUnknown()

func (*MessageHeader) XXX_Marshal

func (m *MessageHeader) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*MessageHeader) XXX_Merge

func (m *MessageHeader) XXX_Merge(src proto.Message)

func (*MessageHeader) XXX_Size

func (m *MessageHeader) XXX_Size() int

func (*MessageHeader) XXX_Unmarshal

func (m *MessageHeader) XXX_Unmarshal(b []byte) error

type Ping

type Ping struct{}

Ping is message sent by the actor system to probe an actor is started.

type Pong

type Pong struct{}

Pong is response for ping.

type Remote

type Remote struct {
	// contains filtered or unexported fields
}

func GetRemote

func GetRemote(actorSystem *actor.ActorSystem) *Remote

func NewRemote

func NewRemote(actorSystem *actor.ActorSystem, config Config) *Remote

func (*Remote) ActivatorForAddress

func (r *Remote) ActivatorForAddress(address string) *actor.PID

ActivatorForAddress returns a PID for the activator at the given address

func (*Remote) BlockList

func (r *Remote) BlockList() *BlockList

func (*Remote) GetKnownKinds

func (r *Remote) GetKnownKinds() []string

GetKnownKinds returns a slice of known actor "Kinds"

func (*Remote) Id

func (r *Remote) Id() extensions.ExtensionId

func (*Remote) Register

func (r *Remote) Register(kind string, props *actor.Props)

Register a known actor props by name

func (*Remote) SendMessage

func (r *Remote) SendMessage(pid *actor.PID, header actor.ReadonlyMessageHeader, message interface{}, sender *actor.PID, serializerID int32)

func (*Remote) Shutdown

func (r *Remote) Shutdown(graceful bool)

func (*Remote) Spawn

func (r *Remote) Spawn(address, kind string, timeout time.Duration) (*ActorPidResponse, error)

Spawn spawns a remote actor of a given type at a given address

func (*Remote) SpawnFuture

func (r *Remote) SpawnFuture(address, name, kind string, timeout time.Duration) *actor.Future

SpawnFuture spawns a remote actor and returns a Future that completes once the actor is started

func (*Remote) SpawnNamed

func (r *Remote) SpawnNamed(address, name, kind string, timeout time.Duration) (*ActorPidResponse, error)

SpawnNamed spawns a named remote actor of a given type at a given address

func (*Remote) Start

func (r *Remote) Start()

Start the remote server

type RemotingClient

type RemotingClient interface {
	Connect(ctx context.Context, in *ConnectRequest, opts ...grpc.CallOption) (*ConnectResponse, error)
	Receive(ctx context.Context, opts ...grpc.CallOption) (Remoting_ReceiveClient, error)
}

RemotingClient is the client API for Remoting service.

For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.

func NewRemotingClient

func NewRemotingClient(cc *grpc.ClientConn) RemotingClient

type RemotingServer

type RemotingServer interface {
	Connect(context.Context, *ConnectRequest) (*ConnectResponse, error)
	Receive(Remoting_ReceiveServer) error
}

RemotingServer is the server API for Remoting service.

type Remoting_ReceiveClient

type Remoting_ReceiveClient interface {
	Send(*MessageBatch) error
	Recv() (*Unit, error)
	grpc.ClientStream
}

type Remoting_ReceiveServer

type Remoting_ReceiveServer interface {
	Send(*Unit) error
	Recv() (*MessageBatch, error)
	grpc.ServerStream
}

type ResponseError

type ResponseError struct {
	Code ResponseStatusCode
}

ResponseError is an error type. e.g.:

var err = &ResponseError{1}

func (*ResponseError) Error

func (r *ResponseError) Error() string

type ResponseStatusCode

type ResponseStatusCode int32
const (
	ResponseStatusCodeOK ResponseStatusCode = iota
	ResponseStatusCodeUNAVAILABLE
	ResponseStatusCodeTIMEOUT
	ResponseStatusCodePROCESSNAMEALREADYEXIST
	ResponseStatusCodeERROR
	ResponseStatusCodeDeadLetter
	ResponseStatusCodeMAX // just a boundary.
)

func (ResponseStatusCode) AsError

func (c ResponseStatusCode) AsError() *ResponseError

func (ResponseStatusCode) String

func (c ResponseStatusCode) String() string

func (ResponseStatusCode) ToInt32

func (c ResponseStatusCode) ToInt32() int32

type Serializer

type Serializer interface {
	Serialize(msg interface{}) ([]byte, error)
	Deserialize(typeName string, bytes []byte) (interface{}, error)
	GetTypeName(msg interface{}) (string, error)
}

type Unit

type Unit struct {
}

func (*Unit) Descriptor

func (*Unit) Descriptor() ([]byte, []int)

func (*Unit) Equal

func (this *Unit) Equal(that interface{}) bool

func (*Unit) Marshal

func (m *Unit) Marshal() (dAtA []byte, err error)

func (*Unit) MarshalTo

func (m *Unit) MarshalTo(dAtA []byte) (int, error)

func (*Unit) ProtoMessage

func (*Unit) ProtoMessage()

func (*Unit) Reset

func (m *Unit) Reset()

func (*Unit) Size

func (m *Unit) Size() (n int)

func (*Unit) String

func (this *Unit) String() string

func (*Unit) Unmarshal

func (m *Unit) Unmarshal(dAtA []byte) error

func (*Unit) XXX_DiscardUnknown

func (m *Unit) XXX_DiscardUnknown()

func (*Unit) XXX_Marshal

func (m *Unit) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Unit) XXX_Merge

func (m *Unit) XXX_Merge(src proto.Message)

func (*Unit) XXX_Size

func (m *Unit) XXX_Size() int

func (*Unit) XXX_Unmarshal

func (m *Unit) XXX_Unmarshal(b []byte) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL