adaptiveservice

Version: v0.12.1 (not the latest version of its module)
Published: Nov 7, 2023 License: MIT Imports: 28 Imported by: 21

README

Adaptiveservice

Adaptiveservice is a message oriented micro service framework.

Hello example

Demo

$ cd examples/hello/

# start server, waiting requests from clients, ctrl+c to exit
$ go run server/helloserver.go

# start client in another terminal
# client discovers the server then sends request and prints the reply
$ go run client/helloclient.go
I am hello server, John

The client can also be started before the server with the same result: it waits in the discovery stage until the server has started and is ready to serve.

Client side programming

The client imports the server's messages definitions:

import msg "github.com/godevsig/adaptiveservice/examples/hello/message"

Then we create a client which discovers the service identified by {"example", "hello"}, where "example" is the publisher of the service and "hello" is the name of the service. When the service is successfully discovered, a non-nil connection towards that service is established.

c := as.NewClient()

conn := <-c.Discover("example", "hello")
if conn == nil {
	fmt.Println(as.ErrServiceNotFound("example", "hello"))
	return
}
defer conn.Close()

Then we can use the connection to send our request msg.HelloRequest, asking "I am John, who are you?" and wait for the reply msg.HelloReply, then we print the reply.Answer.

request := msg.HelloRequest{Who: "John", Question: "who are you"}
var reply msg.HelloReply
if err := conn.SendRecv(request, &reply); err != nil {
	fmt.Println(err)
	return
}
fmt.Println(reply.Answer)

Here the client knows the protocol with the server: msg.HelloReply is the reply type of msg.HelloRequest. The knowledge of the message protocol is "business logic" defined on the server side; clients use that knowledge by importing the "message" package of the server.

Client discovers service by name

c.Discover() discovers the wanted named service in all available scopes, which are:

  • Process scope: client and server run in same process. This scope uses go channel as transport layer, avoids data copy and serialization cost thus can improve performance significantly. See gshellos to make client and server run in process scope.
  • OS scope: client and server run in same OS, which can be same host OS, or same VM, or same container. This scope uses unix socket as transport layer.
  • LAN scope: client and server run in same LAN, using LAN broadcasting to discover services.
  • WAN scope: This scope requires a "root registry" presents in one "public" node in the network that client and server can access. The root registry also setups proxy when necessary for the server if the server's network is invisible by the client.

By default the client waits forever while discovering the wanted service. Client.SetDiscoverTimeout can be used to set a discover timeout.

c.Discover() returns a channel from which user can get connections established to the server. There could be many instances providing the same service in the network.

The connection can be multiplexed to get streams that provide independent contexts to transfer messages. See also gshellos usage
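The channel behaviour described above can be sketched with the standard library alone. In this minimal sketch, `Conn`, `fakeConn` and `discover` are hypothetical stand-ins (not part of adaptiveservice); the only point is that a receive from a channel that is closed on timeout yields nil, and that ranging over the channel visits every delivered connection.

```go
package main

import (
	"fmt"
	"time"
)

// Conn is a hypothetical stand-in for as.Connection.
type Conn interface{ Provider() string }

type fakeConn string

func (c fakeConn) Provider() string { return string(c) }

// discover is a hypothetical stand-in for Client.Discover: it delivers one
// connection per provider, then closes the channel after timeoutMs
// milliseconds, the way a discover timeout would.
func discover(providers []string, timeoutMs int) <-chan Conn {
	ch := make(chan Conn, len(providers))
	for _, p := range providers {
		ch <- fakeConn(p)
	}
	go func() {
		time.Sleep(time.Duration(timeoutMs) * time.Millisecond)
		close(ch)
	}()
	return ch
}

func main() {
	// Nothing found: the receive unblocks when the channel is closed and yields nil.
	if conn := <-discover(nil, 10); conn == nil {
		fmt.Println("service not found")
	}

	// Several providers: range until the channel is closed.
	for conn := range discover([]string{"provider-1", "provider-2"}, 10) {
		fmt.Println("connected to", conn.Provider())
	}
}
```

With the real client, every connection taken from the channel should be closed after use.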

Server side programming

Server needs to define messages it can handle, and the messages it returns to clients as reply:

import (
	"strings"

	as "github.com/godevsig/adaptiveservice"
)

// HelloRequest is the request from clients
type HelloRequest struct {
	Who      string
	Question string
}

// HelloReply is the reply of HelloRequest to clients
type HelloReply struct {
	Answer string
}

// Handle handles msg.
func (msg HelloRequest) Handle(stream as.ContextStream) (reply interface{}) {
	answer := "I don't know"

	question := strings.ToLower(msg.Question)
	switch {
	case strings.Contains(question, "who are you"):
		answer = "I am hello server"
	case strings.Contains(question, "how are you"):
		answer = "I am good"
	}
	return HelloReply{answer + ", " + msg.Who}
}

func init() {
	as.RegisterType(HelloRequest{})
	as.RegisterType(HelloReply{})
}

Here HelloRequest is a known message. A known message has a Handle method, which defines how to process the message sent by clients. In the init function, instances of the known message and its reply message are registered with the Adaptiveservice type system, which uses them to route an incoming request message (HelloRequest here) to its handler.

  • Incoming messages are handled concurrently in a goroutine worker pool.
  • The reply returned by the handler will be delivered to the client if it is non-nil.
  • The reply can also be an error, which will be delivered as err in Recv(msgPtr) err of client.
  • as.ContextStream can be optionally used:
    • Each stream has its own context which can be used to set/get context of the same stream.
    • Inside the message handler, the server can exchange messages with the client. See subsequent message

Server publishes service

The messages are defined in a package, server main package imports examples/hello/message.

import (
	"fmt"

	as "github.com/godevsig/adaptiveservice"
	msg "github.com/godevsig/adaptiveservice/examples/hello/message"
)

func main() {
	s := as.NewServer().SetPublisher("example")

	knownMsgs := []as.KnownMessage{msg.HelloRequest{}}
	if err := s.Publish("hello", knownMsgs); err != nil {
		fmt.Println(err)
		return
	}

	if err := s.Serve(); err != nil { // ctrl+c to exit
		fmt.Println(err)
	}
}

By calling as.NewServer() followed by s.Publish, the server registers the known messages to service {"example", "hello"}, then server enters Serve to service the requests.

See also publish a service

See also a more useful example that has multiplexed streams and implements PUB/SUB pattern: echo example

3 APIs to transfer messages

Messages are server-defined structs or base types in golang. Unlike protobuf, where users define .proto files and compile them into language source code, Adaptiveservice uses plain go files: servers define them, clients import them. We found this makes the code simpler to maintain, but of course it only supports native golang.

  • Send(msg) and Recv(msgPtr): send or receive "one-way" data
  • SendRecv(msg, msgPtr): send and wait for reply from peer

See also send recv and sendrecv

Block diagram

adaptiveservice architecture

Performance

In gshellos based grepo, there is an asbench test that mimics a download test:

c := as.NewClient(as.WithLogger(lg), as.WithScope(Scope)).SetDiscoverTimeout(3)
conn := <-c.Discover(asbench.Publisher, asbench.Service)
if conn == nil {
	return as.ErrServiceNotFound(asbench.Publisher, asbench.Service)
}
defer conn.Close()

var running = true

dld := func() int64 {
	stream := conn.NewStream()
	req := asbench.DownloadRequest{Name: "testdld", ID: int32(1), Size: int32(*size)}
	//fmt.Println("request:", req)
	var rep []byte
	var counter int64
	for running {
		if err := stream.SendRecv(&req, &rep); err != nil {
			panic(err)
		}
		counter++
	}
	//fmt.Println("reply:", rep)
	return counter
}

The performance result is: adaptiveservice performance

Documentation

Overview

Package adaptiveservice is a message oriented micro service framework.

Servers define micro services identified by a name of the form "publisher_service" and publish them to all available scopes: in the same process, then in the same OS, then in the same local network, and finally in the public network, where a public root registry address needs to be configured. In process and OS scope, a service name can only be announced once; a duplicated service name is treated as an error. In network scope, there can be multiple services with the same name; in this case, each service provider publishes the service "publisher_service" along with a unique provider ID.

Clients then discover wanted micro services in a way that the shortest scope comes first. The Discover() API returns a connection channel; reading the channel, the client gets one or more connections, each representing a connection to one of the service providers providing the wanted micro service. The connection can then be used to send/receive messages to/from the service provider.

Connections can be multiplexed on client side: NewStream() API creates a new context in which the messages are transferred independently from other contexts over the same underlying connection. The intention of the multiplexer is to have scalability on client side: users use this mechanism to send parallel request messages towards the same service provider to increase execution concurrency.

On the server side, incoming messages are handled in an auto-scaled worker pool, so the multiplexer used on the client side is not needed on the server side. Servers listen on a different transport for each available scope:

  • process scope: go channels are used
  • OS scope: unix domain socket is used
  • network scope: tcp socket is used

Messages that satisfy the Handle() interface are known messages. Typically the server defines a Handle() method for every message type it can handle; when a known message arrives on one of the transports the server is listening on, it is delivered to one of the workers, which calls the message's Handle(). Clients do not define Handle() methods; they just send and receive messages in a natural synchronous fashion.
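The idea of routing by "has a Handle method" can be illustrated with a plain type assertion. This is only a sketch under stated assumptions: the `KnownMessage` interface below is simplified (the real Handle also receives a ContextStream), and `dispatch` is a hypothetical helper, not the framework's internal worker code.

```go
package main

import (
	"fmt"
	"strings"
)

// KnownMessage is a simplified stand-in for as.KnownMessage; the real
// Handle method also receives a ContextStream.
type KnownMessage interface {
	Handle() (reply interface{})
}

// HelloRequest mirrors the message of the hello example.
type HelloRequest struct{ Who, Question string }

// Handle makes HelloRequest a known message.
func (msg HelloRequest) Handle() interface{} {
	if strings.Contains(strings.ToLower(msg.Question), "who are you") {
		return "I am hello server, " + msg.Who
	}
	return "I don't know, " + msg.Who
}

// dispatch (hypothetical) calls Handle when the decoded message is a known
// message and rejects everything else.
func dispatch(msg interface{}) (interface{}, error) {
	km, ok := msg.(KnownMessage)
	if !ok {
		return nil, fmt.Errorf("unknown message type %T", msg)
	}
	return km.Handle(), nil
}

func main() {
	reply, err := dispatch(HelloRequest{Who: "John", Question: "Who are you"})
	fmt.Println(reply, err)

	_, err = dispatch(42) // an int has no Handle method
	fmt.Println(err)
}
```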

Services that are behind NAT can be auto proxied by the builtin reverseProxy service provided by the daemon server in the local network or by the root registry.

Constants

const (
	// BuiltinPublisher name
	BuiltinPublisher = "builtin"
	// OK can be returned by known messages as reply to indicate
	// everything is OK. Client should use type int to receive it.
	OK = 0
)

const (
	SrvMessageTracing  = "messageTracing"
	MaxTracingSessions = 4096
)

SrvMessageTracing : service messageTracing

const (
	// DefaultQSizePerCore is the default value for qSizePerCore
	DefaultQSizePerCore int = 128
	// DefaultQWeight is the default value for qWeight
	DefaultQWeight int = 8
)

const SrvIPObserver = "IPObserver"

SrvIPObserver : service IPObserver

const SrvLANRegistry = "LANRegistry"

SrvLANRegistry : service LANRegistry

const SrvProviderInfo = "providerInfo"

SrvProviderInfo : service providerInfo

const SrvRegistryInfo = "registryInfo"

SrvRegistryInfo : service registryInfo

const SrvReverseProxy = "reverseProxy"

SrvReverseProxy : service reverseProxy

const SrvServiceLister = "serviceLister"

SrvServiceLister : service serviceLister

Variables

var (
	// ErrServiceNotReachable is an error where the service exists
	// but somehow can not be reached, e.g. the service is behind NAT.
	ErrServiceNotReachable = errors.New("service not reachable")
	// ErrConnReset is an error where the connection was forced closed
	// by peer.
	ErrConnReset = errors.New("connection reset by peer")
	// ErrServerClosed is an error where the server was closed by signal.
	ErrServerClosed = errors.New("server closed by signal")
	// ErrRecvTimeout is an error where no data was received within
	// specified duration.
	ErrRecvTimeout = errors.New("receive timeout")
)

Functions

func ErrServiceNotFound

func ErrServiceNotFound(publisher, service string) error

ErrServiceNotFound returns an error that no wanted service was found

func GetKnownMessageTypes added in v0.10.3

func GetKnownMessageTypes() []string

GetKnownMessageTypes returns all KnownMessage types

func GetRegisteredTypeByName added in v0.10.3

func GetRegisteredTypeByName(name string) reflect.Type

GetRegisteredTypeByName returns a reflect.Type if it has been registered

func NewStreamIO added in v0.9.5

func NewStreamIO(stream Stream) io.ReadWriteCloser

NewStreamIO wraps the stream to be an io.ReadWriteCloser in which Read() is a Stream.Recv() that only receives []byte, and Write() is a Stream.Send() that only sends []byte. Use Read() and Write() in pairs on the client/server peers; do not mix them with Send() or Recv().
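To make the Read/Write mapping concrete, here is a minimal sketch of such a wrapper over a message-oriented stream. Everything in it is an assumption for illustration: `byteStream` is a cut-down stand-in for the Stream interface, `loopback` is a fake in-memory stream, and buffering the unread remainder of a message is one possible design, not necessarily what NewStreamIO does.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// byteStream is a cut-down, hypothetical stand-in for the Stream interface.
type byteStream interface {
	Send(msg interface{}) error
	Recv(msgPtr interface{}) error
	Close()
}

// streamIO adapts a byteStream to io.ReadWriteCloser.
type streamIO struct {
	s    byteStream
	rest []byte // unread remainder of the last received message
}

func (sio *streamIO) Read(p []byte) (int, error) {
	if len(sio.rest) == 0 {
		if err := sio.s.Recv(&sio.rest); err != nil {
			return 0, err
		}
	}
	n := copy(p, sio.rest)
	sio.rest = sio.rest[n:]
	return n, nil
}

func (sio *streamIO) Write(p []byte) (int, error) {
	buf := append([]byte(nil), p...) // io.Writer must not retain p
	if err := sio.s.Send(buf); err != nil {
		return 0, err
	}
	return len(p), nil
}

func (sio *streamIO) Close() error { sio.s.Close(); return nil }

// loopback is a fake stream that hands sent messages back to the receiver.
type loopback struct{ ch chan []byte }

func (l *loopback) Send(msg interface{}) error {
	b, ok := msg.([]byte)
	if !ok {
		return errors.New("loopback: only []byte is supported")
	}
	l.ch <- b
	return nil
}

func (l *loopback) Recv(msgPtr interface{}) error {
	p, ok := msgPtr.(*[]byte)
	if !ok {
		return errors.New("loopback: msgPtr must be *[]byte")
	}
	b, ok := <-l.ch
	if !ok {
		return io.EOF
	}
	*p = b
	return nil
}

func (l *loopback) Close() { close(l.ch) }

// newLoopbackIO returns the wrapper over a fresh fake stream.
func newLoopbackIO() io.ReadWriteCloser {
	return &streamIO{s: &loopback{ch: make(chan []byte, 4)}}
}

func main() {
	rw := newLoopbackIO()
	defer rw.Close()
	rw.Write([]byte("hello"))
	buf := make([]byte, 3) // smaller than the message, so two reads are needed
	for i := 0; i < 2; i++ {
		n, err := rw.Read(buf)
		fmt.Printf("%q %v\n", buf[:n], err)
	}
}
```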

func ReadAllTracedMsg added in v0.10.4

func ReadAllTracedMsg() (string, error)

ReadAllTracedMsg is equivalent to ReadTracedMsg("00000000-0000-0000-0000-000000000000.0")

func ReadTracedMsg added in v0.10.1

func ReadTracedMsg(token string) (string, error)

ReadTracedMsg reads all the collected traced messages by the token returned by TraceMsgByType().

token: should be in the format like 30180061-1044-4b9e-a8ee-174806afe058.0
 Special tokens starting with 00000000-0000-0000-0000-000000000000
 are used to retrieve all records on behalf of all previous tokens.

func RegisterType

func RegisterType(i interface{})

RegisterType registers the type information to the encoding subsystem.

func RegisterTypeNoPanic added in v0.9.24

func RegisterTypeNoPanic(i interface{}) (err error)

RegisterTypeNoPanic is like RegisterType but recovers from panic.

func TraceMsgByName added in v0.10.3

func TraceMsgByName(name string, count uint32) (token string, err error)

TraceMsgByName is like TraceMsgByType but takes the message type name

func TraceMsgByNameWithFilters added in v0.10.5

func TraceMsgByNameWithFilters(name string, count uint32, filters []string) (token string, err error)

TraceMsgByNameWithFilters is like TraceMsgByTypeWithFilters but takes the message type name

func TraceMsgByType added in v0.10.1

func TraceMsgByType(msg any, count uint32) (token string, err error)

TraceMsgByType traces the message type specified by 'msg' 'count' times repeatedly. Each matching of the 'msg' type starts a message tracing session and has a unique session token.

Tracing is type based and always starts from the client side. If a message to be sent by client matches the specified type, it is marked as traced message, a special traced flag will be carried along the entire path across all the service nodes that are involved to handle this message.

When such messages with traced flag are being handled in `Handle(ContextStream) any` on server side, all subsequent messages under the same stream context are related messages. All related messages will also carry the same traced flag and propagate the flag further to next hop and next next hop... All related messages with traced flag will be recorded by built-in service "messageTracing".

A call to TraceMsgByType() with 'count' equals 1 only starts a one time tracing session. The subsequent messages with the same type will not be traced unless another call to TraceMsgByType() is made with the same type. TraceMsgByType can work with different input message types at the same time.

msg: any value with the same type of the message to be traced
count: the maximum number is the value of MaxTracingSessions
 >0: trace count times
  0: stop tracing the specified message type

The returned token has below forms:

if count > 1, for example count = 100
 30180061-1044-4b9e-a8ee-174806afe058.0..99
if count = 1:
 30180061-1044-4b9e-a8ee-174806afe058.0
if count = 0:
 NA
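The three token forms can be reproduced with a small formatting helper. `tokenForm` is a hypothetical function written only to restate the rules above; the session ID is the example value from this documentation, and the real tokens are generated by the library.

```go
package main

import "fmt"

// tokenForm (hypothetical) renders the documented token shape for a given
// session ID and trace count.
func tokenForm(id string, count uint32) string {
	switch {
	case count == 0:
		return "NA" // tracing stopped, no token
	case count == 1:
		return id + ".0" // a single session
	default:
		return fmt.Sprintf("%s.0..%d", id, count-1) // sessions 0 through count-1
	}
}

func main() {
	const id = "30180061-1044-4b9e-a8ee-174806afe058"
	for _, count := range []uint32{100, 1, 0} {
		fmt.Printf("count=%d -> %s\n", count, tokenForm(id, count))
	}
}
```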

func TraceMsgByTypeWithFilters added in v0.10.5

func TraceMsgByTypeWithFilters(msg any, count uint32, filters []string) (token string, err error)

TraceMsgByTypeWithFilters is like TraceMsgByType but takes extra filters. Filter should be in the form of "field=pattern", in which pattern supports simple wildcard. Tracing sessions start only if all filters match their patterns.

func UnTraceMsgAll added in v0.10.4

func UnTraceMsgAll()

UnTraceMsgAll untags all message types that have been tagged by TraceMsgBy* functions

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client uses services.

func NewClient

func NewClient(options ...Option) *Client

NewClient creates a client which discovers services.

func (*Client) Discover

func (c *Client) Discover(publisher, service string, providerIDs ...string) <-chan Connection

Discover discovers the wanted service and returns the connection channel, from which user can get one or more connections. Each connection represents a connection to one of the service providers providing the wanted micro service.

Only one service (identified by publisher name and service name) can exist in ScopeProcess and ScopeOS, but in ScopeLAN and ScopeWAN there can be many systems providing the same service; each system (called a provider) has a unique provider ID.

Use providerIDs to select target providers in ScopeLAN or ScopeWAN. If no provider ID is given, Discover searches scopes by distance, in the order ScopeProcess, ScopeOS, ScopeLAN, ScopeWAN, and returns only one connection towards the found service, which may have been randomly selected if more than one provider was found.

If any of publisher or service or provider ids contains "*", discover will return all currently available connections of the wanted service(s). Make sure to close ALL the connections it returns after use.

func (*Client) SetDeepCopy

func (c *Client) SetDeepCopy() *Client

SetDeepCopy sets the client to deep copy the message before sending it out. This option is only useful for process scope, where by default the server can modify the message that the client owns because server and client share the same process space; this only happens when the message handler in the server has a pointer receiver.

Unexported fields in the message are not copied but set to zero value. Default is zero copy.

func (*Client) SetDiscoverTimeout

func (c *Client) SetDiscoverTimeout(timeout int) *Client

SetDiscoverTimeout sets the wait timeout in seconds of the discover procedure. The discover procedure waits for the wanted service to be available until timeout. The connection channel returned by Discover() will be closed if timeout happens.

> 0 : wait timeout seconds
= 0 : do not wait
< 0 : wait forever

Default -1.

func (*Client) SetProviderSelectionMethod added in v0.12.1

func (c *Client) SetProviderSelectionMethod(method ProviderSelectionMethod) *Client

SetProviderSelectionMethod sets the ProviderSelectionMethod. It can be called multiple times with different ProviderSelectionMethod and the provider selection procedure will follow the calling sequence, e.g.

c.SetProviderSelectionMethod(Capacity).SetProviderSelectionMethod(Latency)

In this case, it will first select the top 50% of the provider(s) with Capacity method. If there are still multiple providers, the Latency method is then applied in turn to also select top 50%. The Random ProviderSelectionMethod is applied at last if needed to ensure that wanted number of provider(s) will be ultimately selected.
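The staged narrowing can be sketched as follows. This is not the library's implementation: the `provider` struct, its `IdleWorkers` and `LatencyMs` metrics, and the rounding rule (half rounded up, so at least one provider always survives) are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// provider carries hypothetical metrics for one service provider.
type provider struct {
	ID          string
	IdleWorkers int // higher is treated as more capacity
	LatencyMs   int // lower is better
}

// topHalf keeps the better half of ps (rounded up) according to better.
func topHalf(ps []provider, better func(a, b provider) bool) []provider {
	sorted := append([]provider(nil), ps...)
	sort.SliceStable(sorted, func(i, j int) bool { return better(sorted[i], sorted[j]) })
	return sorted[:(len(sorted)+1)/2]
}

// selectOne applies Capacity, then Latency, then a random pick.
func selectOne(ps []provider) (provider, bool) {
	if len(ps) == 0 {
		return provider{}, false
	}
	ps = topHalf(ps, func(a, b provider) bool { return a.IdleWorkers > b.IdleWorkers })
	if len(ps) > 1 {
		ps = topHalf(ps, func(a, b provider) bool { return a.LatencyMs < b.LatencyMs })
	}
	return ps[rand.Intn(len(ps))], true
}

func main() {
	candidates := []provider{
		{"a", 8, 30}, {"b", 6, 5}, {"c", 1, 1}, {"d", 2, 2},
	}
	// Capacity keeps a and b; Latency then keeps b.
	p, ok := selectOne(candidates)
	fmt.Println(p.ID, ok)
}
```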

type Connection

type Connection interface {
	// default stream.
	Stream
	// NewStream creates a new stream.
	NewStream() Stream
	// Close closes the connection.
	Close()
}

Connection is the connection between client and server.

type Context

type Context interface {
	// PutVar puts value v to the underlying map overriding the old value of the same type.
	PutVar(v interface{})

	// GetVar gets value that v points to from the underlying map, it panics if v
	// is not a non-nil pointer.
	// The value that v points to will be set to the value in the context if a value
	// of the same type has been put into the map, otherwise the zero value will be set.
	GetVar(v interface{})

	// SetContext sets the context with value v which supposedly is a pointer to
	// an instance of the struct associated to the connection.
	// It panics if v is not a non-nil pointer.
	// It is supposed to be called only once upon a new connection is connected.
	SetContext(v interface{})
	// GetContext gets the context that has been set by SetContext.
	GetContext() interface{}
}

Context represents a context.
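The PutVar/GetVar contract (one value per type, zero value when nothing was stored) can be modelled with a map keyed by reflect.Type. The `typeMap` type below is a hypothetical sketch of that contract, not the library's Context implementation.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// typeMap (hypothetical) stores at most one value per concrete type.
type typeMap struct {
	mu sync.Mutex
	m  map[reflect.Type]interface{}
}

// PutVar stores v, overriding any previous value of the same type.
func (c *typeMap) PutVar(v interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.m == nil {
		c.m = make(map[reflect.Type]interface{})
	}
	c.m[reflect.TypeOf(v)] = v
}

// GetVar fills *v with the stored value of the same type, or with the zero
// value if none was stored. It panics if v is not a non-nil pointer.
func (c *typeMap) GetVar(v interface{}) {
	rv := reflect.ValueOf(v)
	if rv.Kind() != reflect.Ptr || rv.IsNil() {
		panic("GetVar: v must be a non-nil pointer")
	}
	elem := rv.Elem()
	c.mu.Lock()
	defer c.mu.Unlock()
	if stored, ok := c.m[elem.Type()]; ok {
		elem.Set(reflect.ValueOf(stored))
		return
	}
	elem.Set(reflect.Zero(elem.Type()))
}

// userID is a dedicated type so that it gets its own slot in the map.
type userID int

func main() {
	ctx := &typeMap{}
	ctx.PutVar(userID(7))
	ctx.PutVar(userID(9)) // same type: overrides 7

	var id userID
	ctx.GetVar(&id)
	fmt.Println(id)

	name := "stale"
	ctx.GetVar(&name) // no string stored: reset to ""
	fmt.Printf("%q\n", name)
}
```

Because the key is the type, defining a small named type per variable (like `userID` here) avoids collisions between unrelated values of the same base type.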

type ContextStream

type ContextStream interface {
	Context
	Stream
}

ContextStream is a stream with an associated context. Messages from the same stream have the same context, their handlers may be executed concurrently.

type GetObservedIP added in v0.9.5

type GetObservedIP struct{}

GetObservedIP returns the observed IP of the client. The reply is string type.

func (GetObservedIP) Handle added in v0.9.5

func (msg GetObservedIP) Handle(stream ContextStream) (reply interface{})

Handle handles GetObservedIP message.

type HighPriorityMessage

type HighPriorityMessage interface {
	KnownMessage
	IsHighPriority()
}

HighPriorityMessage is high priority KnownMessage.

type KnownMessage

type KnownMessage interface {
	// Handle handles the message.
	// If reply is nil, no message will be sent back.
	// If reply is not nil, the value will be sent back to the stream peer.
	// If reply is error type, the peer's Recv() call will return it as error.
	// Otherwise the reply will be received by the peer's Recv() as normal message.
	//
	// The message may be marshaled or compressed.
	// Remember in golang assignment to interface is also value copy,
	// so return reply as &someStruct whenever possible in your handler implementation.
	//
	// Users can directly use ContextStream to send/receive messages to/from the stream
	// peer(the client) via a private dedicated channel.
	//
	// Use of reply is more like RPC fashion, where clients "call" the Handle() method
	// on the server side as if Handle() were called on clients.
	// Always return reply to client is a good idea since client is waiting for the reply.
	// In the case where only final status is needed, return builtin OK on success or
	// return error type if any error happened, e.g. return errors.New("some error").
	//
	// Use of ContextStream is more like client and server entered in an interactive
	// session, in which several messages are exchanged between client and server.
	//
	// Cares should be taken if you mix the use of reply and ContextStream.
	Handle(stream ContextStream) (reply interface{})
}

KnownMessage represents a message with a handler that knows how to process the message. KnownMessage is normal priority message.

type ListService

type ListService struct {
	TargetScope Scope
	Publisher   string
	Service     string
}

ListService lists all services in specified scopes matching publisher/service name which can be wildcard:

 "*" matches all
"*bar*" matches bar, foobar, or foobarabc
"foo*abc*" matches foobarabc, foobarabc123, or fooabc

The reply is [4][]*ServiceInfo
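The wildcard rules listed above can be checked with a small matcher in which "*" stands for any (possibly empty) run of characters. `wildcardMatch` is a hypothetical helper written for this illustration; the library's own matching code may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// wildcardMatch (hypothetical) reports whether s matches pattern, where "*"
// matches any sequence of characters, including the empty one.
func wildcardMatch(pattern, s string) bool {
	parts := strings.Split(pattern, "*")
	if len(parts) == 1 {
		return pattern == s // no wildcard: exact match
	}
	// The text before the first "*" must be a prefix.
	if !strings.HasPrefix(s, parts[0]) {
		return false
	}
	s = s[len(parts[0]):]
	// Middle pieces must appear in order; take the leftmost occurrence.
	for _, mid := range parts[1 : len(parts)-1] {
		i := strings.Index(s, mid)
		if i < 0 {
			return false
		}
		s = s[i+len(mid):]
	}
	// The text after the last "*" must be a suffix of what remains.
	return strings.HasSuffix(s, parts[len(parts)-1])
}

func main() {
	for _, name := range []string{"bar", "foobar", "foobarabc", "baz"} {
		fmt.Println("*bar*", name, wildcardMatch("*bar*", name))
	}
	for _, name := range []string{"foobarabc", "foobarabc123", "fooabc", "foobar"} {
		fmt.Println("foo*abc*", name, wildcardMatch("foo*abc*", name))
	}
}
```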

func (*ListService) Handle

func (msg *ListService) Handle(stream ContextStream) (reply interface{})

Handle handles ListService message.

type Logger

type Logger interface {
	Debugf(format string, args ...interface{})
	Infof(format string, args ...interface{})
	Warnf(format string, args ...interface{})
	Errorf(format string, args ...interface{})
}

Logger is the logger interface.

type LoggerAll

type LoggerAll struct{}

LoggerAll prints all regardless of loglevel

func (LoggerAll) Debugf

func (LoggerAll) Debugf(format string, args ...interface{})

Debugf is Debugf

func (LoggerAll) Errorf

func (LoggerAll) Errorf(format string, args ...interface{})

Errorf is Errorf

func (LoggerAll) Infof

func (LoggerAll) Infof(format string, args ...interface{})

Infof is Infof

func (LoggerAll) Warnf

func (LoggerAll) Warnf(format string, args ...interface{})

Warnf is Warnf

type LoggerNull

type LoggerNull struct{}

LoggerNull prints no log

func (LoggerNull) Debugf

func (LoggerNull) Debugf(format string, args ...interface{})

Debugf is Debugf

func (LoggerNull) Errorf

func (LoggerNull) Errorf(format string, args ...interface{})

Errorf is Errorf

func (LoggerNull) Infof

func (LoggerNull) Infof(format string, args ...interface{})

Infof is Infof

func (LoggerNull) Warnf

func (LoggerNull) Warnf(format string, args ...interface{})

Warnf is Warnf

type LowPriorityMessage

type LowPriorityMessage interface {
	KnownMessage
	IsLowPriority()
}

LowPriorityMessage is low priority KnownMessage.

type MsgQInfo added in v0.12.1

type MsgQInfo struct {
	NumCPU          int
	ResidentWorkers int
	QueueWeight     int
	QueueLen        int
	QueueSize       int
	BusyWorkerNum   int
	IdleWorkerNum   int
}

MsgQInfo is the running status of the message queue

type Netconn

type Netconn interface {
	// Close closes the connection.
	// Any blocked Read or Write operations will be unblocked and return errors.
	Close() error
	// LocalAddr returns the local network address.
	LocalAddr() net.Addr
	// RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr
}

Netconn is the underlying net connection.

type Option

type Option func(*conf)

Option is option to be set.

func WithLogger

func WithLogger(lg Logger) Option

WithLogger sets the logger.

func WithProviderID

func WithProviderID(id string) Option

WithProviderID sets the provider ID which is used to identify service in the network where there are multiple publisher_service instances found in the registry. Provider ID is usually shared by servers in the same network node.

func WithQsize

func WithQsize(size int) Option

WithQsize sets the transport layer message queue size.

func WithRegistryAddr

func WithRegistryAddr(addr string) Option

WithRegistryAddr sets the registry address in format ip:port.

func WithScope

func WithScope(scope Scope) Option

WithScope sets the publishing or discovering scope, which is an ORed value of ScopeProcess, ScopeOS, ScopeLAN or ScopeWAN. Default is ScopeAll.

type ProviderSelectionMethod added in v0.12.1

type ProviderSelectionMethod uint16

ProviderSelectionMethod is the method used to select the most suitable service provider when the client discovers more than one provider candidate.

Random is the last method used when more than one provider remains after the other ProviderSelectionMethods have been applied.

const (
	// Capacity is the strategy to select the service provider which should
	// be more capable to serve the client. The selected provider
	// has fewer items to process or is under lighter working load
	// or has more workers.
	Capacity ProviderSelectionMethod = iota + 1
	// Latency is the strategy to select the service provider that has faster
	// response time, this usually indicates better networking connection
	// and the service provider is in a healthy responsive condition.
	Latency
	// Invalid is invalid strategy
	Invalid
)

type QueryMsgQInfo added in v0.12.1

type QueryMsgQInfo struct{}

QueryMsgQInfo gets the MsgQInfo

func (QueryMsgQInfo) Handle added in v0.12.1

func (msg QueryMsgQInfo) Handle(stream ContextStream) (reply any)

Handle handles QueryMsgQInfo

type ReqProviderInfo

type ReqProviderInfo struct{}

ReqProviderInfo gets self provider ID, reply with string.

func (*ReqProviderInfo) Handle

func (msg *ReqProviderInfo) Handle(stream ContextStream) (reply interface{})

Handle handles ReqProviderInfo.

type Scope

type Scope uint16

Scope is publishing and discovering scope

const (
	// ScopeProcess is a scope where publishing and discovering services
	// only happen in same process.
	ScopeProcess Scope = 1 << iota
	// ScopeOS is a scope where publishing and discovering services
	// only happen in same OS.
	ScopeOS
	// ScopeLAN is a scope where publishing and discovering services
	// only happen in same local network.
	ScopeLAN
	// ScopeWAN is a scope where publishing and discovering services
	// only happen in same reachable public network.
	ScopeWAN
	// ScopeNetwork is a shortcut for ScopeLAN and ScopeWAN
	ScopeNetwork = ScopeLAN | ScopeWAN
	// ScopeAll includes all scopes, this is the default value if
	// no other Scope specified.
	ScopeAll = ScopeProcess | ScopeOS | ScopeLAN | ScopeWAN
)
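Since Scope is a bit mask, scopes combine with | and are tested with &. The sketch below copies the constants above into a standalone program so it runs without the package; `searchOrder` is a hypothetical helper that lists the enabled scopes from nearest to farthest, following the discovery order described for Discover.

```go
package main

import "fmt"

// Scope and its constants are copied from the declarations above.
type Scope uint16

const (
	ScopeProcess Scope = 1 << iota
	ScopeOS
	ScopeLAN
	ScopeWAN
	ScopeNetwork = ScopeLAN | ScopeWAN
	ScopeAll     = ScopeProcess | ScopeOS | ScopeLAN | ScopeWAN
)

// searchOrder (hypothetical) returns the names of the scopes enabled in s,
// nearest scope first.
func searchOrder(s Scope) []string {
	all := []struct {
		flag Scope
		name string
	}{
		{ScopeProcess, "process"},
		{ScopeOS, "OS"},
		{ScopeLAN, "LAN"},
		{ScopeWAN, "WAN"},
	}
	var order []string
	for _, sc := range all {
		if s&sc.flag != 0 {
			order = append(order, sc.name)
		}
	}
	return order
}

func main() {
	local := ScopeProcess | ScopeOS // e.g. passed to WithScope
	fmt.Println(searchOrder(local))
	fmt.Println(local&ScopeLAN != 0) // is LAN included in local?
	fmt.Println(searchOrder(ScopeNetwork))
	fmt.Println(searchOrder(ScopeAll))
}
```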

type Server

type Server struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Server provides services.

func NewServer

func NewServer(options ...Option) *Server

NewServer creates a server which publishes services.

func (*Server) Close

func (s *Server) Close()

Close triggers the close procedure of the server.

func (*Server) CloseWait added in v0.9.14

func (s *Server) CloseWait()

CloseWait triggers the close procedure and waits until the server is fully closed.

func (*Server) DisableMsgTypeCheck

func (s *Server) DisableMsgTypeCheck() *Server

DisableMsgTypeCheck disables message type checking for incoming messages.

func (*Server) EnableAutoReverseProxy

func (s *Server) EnableAutoReverseProxy() *Server

EnableAutoReverseProxy tries to enable reverse proxy.

func (*Server) EnableIPObserver added in v0.9.5

func (s *Server) EnableIPObserver() *Server

EnableIPObserver enables IP observer service which helps the requesting client to find out its observed IP address.

func (*Server) EnableMessageTracer added in v0.10.1

func (s *Server) EnableMessageTracer() *Server

EnableMessageTracer enables message tracing service which helps to collect local traced messages.

func (*Server) EnableRootRegistry

func (s *Server) EnableRootRegistry() *Server

EnableRootRegistry makes the server become root registry.

func (*Server) EnableServiceLister

func (s *Server) EnableServiceLister() *Server

EnableServiceLister enables lister service which can list available services.

func (*Server) Publish

func (s *Server) Publish(serviceName string, knownMessages []KnownMessage, options ...ServiceOption) error

Publish publishes service to all available scope of Server s. knownMessages are messages that the service can handle, e.g. []KnownMessage{(*PublicStructA)(nil), (*PublicStructB)(nil), ...}, where (*PublicStructA) and (*PublicStructB) are the known messages that have `Handle(stream ContextStream) reply interface{}` method.

Publish panics if serviceName contains "_" or "/" or whitespace.

func (*Server) PublishIn added in v0.9.9

func (s *Server) PublishIn(scope Scope, serviceName string, knownMessages []KnownMessage, options ...ServiceOption) error

PublishIn is like Publish, but with specified scope which should be a subset of the scope of Server s.

func (*Server) Serve

func (s *Server) Serve() error

Serve starts serving.

func (*Server) SetBroadcastPort

func (s *Server) SetBroadcastPort(port string) *Server

SetBroadcastPort sets the broadcast port used by lan registry.

func (*Server) SetPublisher

func (s *Server) SetPublisher(publisherName string) *Server

SetPublisher declares the publisher of the server, which is usually an organization name. The default value of publisherName is "default.org".

SetPublisher panics if publisherName contains "_" or "/" or whitespace.

func (*Server) SetScaleFactors added in v0.9.20

func (s *Server) SetScaleFactors(residentWorkers, qSizePerCore, qWeight int) *Server

SetScaleFactors sets the scale factors to be applied on the internal message queue.

residentWorkers:
  > 0: the number of resident workers.
 <= 0: uses default value. Default is 1.
qSizePerCore:
  > 0: the internal message queue size per core.
 <= 0: uses default value. Default is DefaultQSizePerCore.
qWeight:
  > 0: the weight of the message queue.
    0: uses default value. Default is DefaultQWeight.
  < 0: the number of workers is fixed at residentWorkers.

A Server has one internal message queue, messages received from transport layer are put into the queue, a number of workers get message from the queue and handle it. The size of the message queue is qSizePerCore*core number.

When qWeight >= 0, a worker pool is created and the number of workers in the pool scales automatically. The target number of available workers is periodically adjusted by the formula below:

target available workers = len(message queue)/qWeight + residentWorkers

The worker pool continuously adds/removes workers to follow the target.

Be careful when setting qWeight < 0: this effectively disables the auto-scaling worker pool, so only a fixed number of workers (residentWorkers) is used. Forever blocking may occur in that case, especially when residentWorkers = 1.
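As a worked example of the formula and its defaults, the sketch below computes the queue size and the target worker count. `queueSize` and `targetWorkers` are hypothetical helpers, integer division is an assumption, and the default values are the ones documented above (DefaultQSizePerCore = 128, DefaultQWeight = 8, one resident worker).

```go
package main

import "fmt"

const (
	defaultQSizePerCore = 128 // DefaultQSizePerCore
	defaultQWeight      = 8   // DefaultQWeight
)

// queueSize (hypothetical) is qSizePerCore * number of cores.
func queueSize(qSizePerCore, cores int) int {
	if qSizePerCore <= 0 {
		qSizePerCore = defaultQSizePerCore
	}
	return qSizePerCore * cores
}

// targetWorkers (hypothetical) applies
// target = len(message queue)/qWeight + residentWorkers,
// assuming integer division.
func targetWorkers(queueLen, qWeight, residentWorkers int) int {
	if residentWorkers <= 0 {
		residentWorkers = 1
	}
	if qWeight < 0 {
		return residentWorkers // auto scaling disabled: fixed pool
	}
	if qWeight == 0 {
		qWeight = defaultQWeight
	}
	return queueLen/qWeight + residentWorkers
}

func main() {
	fmt.Println(queueSize(0, 4))          // 128 per core on 4 cores
	fmt.Println(targetWorkers(64, 8, 1))  // 64/8 + 1
	fmt.Println(targetWorkers(7, 8, 1))   // short queue: resident workers only
	fmt.Println(targetWorkers(64, -1, 4)) // fixed at residentWorkers
}
```

A larger qWeight means fewer extra workers for the same backlog, so it trades latency under load for a smaller pool.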

type ServiceInfo

type ServiceInfo struct {
	Publisher  string
	Service    string
	ProviderID string
	Addr       string // "192.168.0.11:12345", "192.168.0.11:12345P" if proxied
}

ServiceInfo is service information.

type ServiceOption

type ServiceOption func(*service)

ServiceOption is option for service.

func OnConnectFunc

func OnConnectFunc(fn func(Netconn) (takeOver bool)) ServiceOption

OnConnectFunc sets a function which is called when a new incoming connection is established. Further message dispatching on this connection will stop if fn returns true, leaving the connection NOT closed; fn should then take over this Netconn and close it when finished.

func OnDisconnectFunc

func OnDisconnectFunc(fn func(Netconn)) ServiceOption

OnDisconnectFunc sets a function which is called when the connection was disconnected.

func OnNewStreamFunc

func OnNewStreamFunc(fn func(Context)) ServiceOption

OnNewStreamFunc sets a function which is called to initialize the context when new incoming stream is accepted.

func OnStreamCloseFunc added in v0.10.1

func OnStreamCloseFunc(fn func(Context)) ServiceOption

OnStreamCloseFunc sets a function which is called when the stream is being closed.

type Stream

type Stream interface {
	// Send sends a message to the stream peer. If msg is an error value, it will
	// be received and returned by peer's Recv() as error.
	Send(msg interface{}) error

	// Recv receives a message from the stream peer and stores it into the value
	// that msgPtr points to.
	//
	// msgPtr can be nil, where user only cares about error, otherwise
	// it panics if msgPtr is not a non-nil pointer.
	Recv(msgPtr interface{}) error

	// SendRecv combines send and receive on the same stream.
	SendRecv(msgSnd interface{}, msgRcvPtr interface{}) error

	// GetNetconn gets the transport connection.
	GetNetconn() Netconn

	// SetRecvTimeout sets the timeout for each Recv(), which waits at least duration
	// d and returns ErrRecvTimeout if no data was received within that duration.
	// A negative or zero duration causes Recv() to wait forever.
	// Default is 0.
	SetRecvTimeout(d time.Duration)
	// Close closes the stream
	Close()
}

Stream is an independent channel multiplexed from the underlying connection.
