Documentation ¶
Overview ¶
Package partition provides utilities to partition requests to a cluster.
Every server in the cluster receives requests meant for any partition, but these requests are then routed to the right partition, where they are handled.
Each server in the cluster also serves traffic from other servers destined for its own partition.
Usage:
Every server in the cluster creates a router at initialization time:
endpointRegistry := partition.WithEndpointRegistry(...)
router, err := partition.New(ctx, "ownIP:2222", handler, endpointRegistry)
The endpoint registry keeps track of all the servers in the cluster and their local addresses for inter-server communication.
The handler is the server's own implementation of requests routed to it by other servers.
When an external request comes in, the server would hash the request and then use the router to execute it on the right server in its cluster:
resp, err := router.Run(ctx, hash, request)
Note that this call would end up being executed on another server in the cluster (or on the local server itself if this hash is mapped to the local server).
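The text above leaves the choice of hash function to the caller. As a minimal sketch, assuming each request carries a string key that identifies its partition, a 64-bit FNV-1a hash from the standard library is one option. The helper name hashKey is hypothetical and not part of the partition package.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashKey is a hypothetical helper (not part of the partition package).
// It maps a request key to the uint64 hash that router.Run expects.
func hashKey(key string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return h.Sum64()
}

func main() {
	// The same key always produces the same hash, so it always
	// routes to the same server for a given cluster membership.
	fmt.Println(hashKey("user-42") == hashKey("user-42")) // prints "true"
}
```

The resulting value would be passed as the hash argument of router.Run.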
The effectiveness of this strategy depends on how uniform the hash is and the mapping strategy. The default mechanism to map hashes to specific servers in the cluster is to use a highest-random-weight algorithm which can be overridden using the WithPicker option.
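To illustrate the idea behind highest-random-weight (rendezvous) hashing, here is a minimal sketch. It is not the package's implementation: the function names, the use of FNV-1a for weights, and the plain-function shape are all assumptions made for illustration, since the picker signature is not shown on this page.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// weight scores one (endpoint, hash) pair. FNV-1a is an assumption here;
// any well-mixed hash of the pair would serve.
func weight(endpoint string, hash uint64) uint64 {
	h := fnv.New64a()
	h.Write([]byte(endpoint))
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], hash)
	h.Write(buf[:])
	return h.Sum64()
}

// pickHRW returns the endpoint with the highest weight for the given hash,
// or "" when the list is empty. Ties are broken by the smaller endpoint
// string so the result does not depend on list order.
func pickHRW(endpoints []string, hash uint64) string {
	best, bestWeight := "", uint64(0)
	for i, ep := range endpoints {
		w := weight(ep, hash)
		if i == 0 || w > bestWeight || (w == bestWeight && ep < best) {
			best, bestWeight = ep, w
		}
	}
	return best
}

func main() {
	endpoints := []string{"10.0.0.1:2222", "10.0.0.2:2222", "10.0.0.3:2222"}
	fmt.Println(pickHRW(endpoints, 555))
}
```

A useful property of this scheme is that removing an endpoint only remaps the hashes that endpoint was winning; every other hash keeps its server.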
Inter-server communication uses RPC by default; an alternate mechanism can be supplied using the WithNetwork option.
The requests and responses are expected to be byte slices. For stronger types, protobufs can be used to serialize structures, or see the runner package (https://godoc.org/github.com/tvastar/cluster/pkg/partition/runner) for a solution using gob encoding and reflection.
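Because requests and responses travel as byte slices, a typed request has to be serialized on the way in and deserialized in the handler. The sketch below does this with encoding/gob directly. It does not show how the runner package works; greetRequest, encode, and decode are hypothetical names used only for illustration.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// greetRequest is a hypothetical typed request.
type greetRequest struct {
	Name  string
	Count int
}

// encode serializes v into the byte slice form a Run call would carry.
func encode(v interface{}) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decode fills v (a pointer) from bytes produced by encode.
func decode(data []byte, v interface{}) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(v)
}

func main() {
	payload, err := encode(greetRequest{Name: "hello", Count: 3})
	if err != nil {
		panic(err)
	}
	var got greetRequest
	if err := decode(payload, &got); err != nil {
		panic(err)
	}
	fmt.Println(got.Name, got.Count) // prints "hello 3"
}
```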
Example ¶
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/alicebob/miniredis"
	"github.com/tvastar/cluster/pkg/partition"
)

func main() {
	minir, err := miniredis.Run()
	if err != nil {
		panic(err)
	}
	defer minir.Close()

	opt1 := partition.WithEndpointRegistry(partition.NewRedisRegistry(minir.Addr(), "prefix_"))
	opt2 := partition.WithEndpointRegistry(partition.NewRedisRegistry(minir.Addr(), "prefix_"))
	ctx := context.Background()
	h1 := handler(1)
	h2 := handler(2)

	part1, err := partition.New(ctx, ":2222", h1, opt1)
	if err != nil {
		panic(err)
	}
	defer part1.Close()

	part2, err := partition.New(ctx, ":2223", h2, opt2)
	if err != nil {
		panic(err)
	}
	defer part2.Close()

	// wait for both endpoints to be registered
	time.Sleep(100 * time.Millisecond)

	if _, err := part1.Run(ctx, 555, []byte("hello")); err != nil {
		panic(err)
	}
	if _, err := part2.Run(ctx, 555, []byte("hello")); err != nil {
		panic(err)
	}
	if _, err := part1.Run(ctx, 22222, []byte("hello")); err != nil {
		panic(err)
	}
	if _, err := part2.Run(ctx, 22222, []byte("hello")); err != nil {
		panic(err)
	}
}

type handler int

func (h handler) Run(ctx context.Context, hash uint64, input []byte) ([]byte, error) {
	fmt.Printf("[%d] Run(%d, %s)\n", int(h), hash, string(input))
	return input, nil
}

func (h handler) Close() error {
	return nil
}
Output:

[1] Run(555, hello)
[1] Run(555, hello)
[2] Run(22222, hello)
[2] Run(22222, hello)
Functions ¶
func NewHashRing ¶
NewHashRing returns a picker which uses a consistent hashing scheme.
The scheme is implemented using a hash ring of crc32 hashes.
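For readers unfamiliar with hash rings, the following is a minimal sketch of consistent hashing over crc32 values. It is not the code behind NewHashRing: the ring type, newRing, pick, and the use of several virtual points per endpoint are assumptions for illustration.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
	"sort"
)

// ring is a hypothetical consistent-hash ring of crc32 points.
type ring struct {
	points []uint32          // sorted positions on the ring
	owner  map[uint32]string // position -> endpoint
}

// newRing places `replicas` virtual points on the ring for each endpoint.
func newRing(endpoints []string, replicas int) *ring {
	r := &ring{owner: map[uint32]string{}}
	for _, ep := range endpoints {
		for i := 0; i < replicas; i++ {
			p := crc32.ChecksumIEEE([]byte(fmt.Sprintf("%s#%d", ep, i)))
			if _, taken := r.owner[p]; taken {
				continue // rare collision: keep the existing point
			}
			r.owner[p] = ep
			r.points = append(r.points, p)
		}
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

// pick maps the request hash onto the ring and returns the endpoint that
// owns the first point at or after it, wrapping around at the end.
func (r *ring) pick(hash uint64) string {
	if len(r.points) == 0 {
		return ""
	}
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], hash)
	h := crc32.ChecksumIEEE(buf[:])
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0 // wrap around to the start of the ring
	}
	return r.owner[r.points[i]]
}

func main() {
	r := newRing([]string{"10.0.0.1:2222", "10.0.0.2:2222"}, 16)
	fmt.Println(r.pick(555))
}
```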
Types ¶
type EndpointRegistry ¶
type EndpointRegistry interface {
	RegisterEndpoint(ctx context.Context, addr string) (io.Closer, error)
	ListEndpoints(ctx context.Context, refresh bool) ([]string, error)
}
EndpointRegistry manages the live list of endpoints in a cluster.
The partition package does not cache the results, so any caching requirement should be handled by the EndpointRegistry implementation.
func NewRedisRegistry ¶
func NewRedisRegistry(addr string, prefix string) EndpointRegistry
NewRedisRegistry returns a new registry based on Redis.
Note that this fetches the endpoints from Redis on every call. A wrapper implementation can easily override this behaviour by caching the results of the last call to ListEndpoints.
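The caching wrapper suggested above could look roughly like the sketch below. The EndpointRegistry interface is copied locally so the example compiles on its own; cachingRegistry, its ttl field, and the fake registry are hypothetical. The sketch also assumes that refresh=true means "bypass any cache", which this page does not state explicitly.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"
)

// EndpointRegistry is a local copy of the interface documented above.
type EndpointRegistry interface {
	RegisterEndpoint(ctx context.Context, addr string) (io.Closer, error)
	ListEndpoints(ctx context.Context, refresh bool) ([]string, error)
}

// cachingRegistry (hypothetical) remembers the last ListEndpoints result
// for ttl. RegisterEndpoint passes straight through via embedding.
type cachingRegistry struct {
	EndpointRegistry
	ttl time.Duration

	mu      sync.Mutex
	valid   bool
	cached  []string
	fetched time.Time
}

func (c *cachingRegistry) ListEndpoints(ctx context.Context, refresh bool) ([]string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !refresh && c.valid && time.Since(c.fetched) < c.ttl {
		return c.cached, nil // callers must not modify the shared slice
	}
	eps, err := c.EndpointRegistry.ListEndpoints(ctx, refresh)
	if err != nil {
		return nil, err
	}
	c.cached, c.fetched, c.valid = eps, time.Now(), true
	return eps, nil
}

// fakeRegistry counts ListEndpoints calls; it stands in for a real registry.
type fakeRegistry struct{ calls int }

type nopCloser struct{}

func (nopCloser) Close() error { return nil }

func (f *fakeRegistry) RegisterEndpoint(ctx context.Context, addr string) (io.Closer, error) {
	return nopCloser{}, nil
}

func (f *fakeRegistry) ListEndpoints(ctx context.Context, refresh bool) ([]string, error) {
	f.calls++
	return []string{":2222", ":2223"}, nil
}

// demoCaching returns the inner call count after two cached reads and
// after one forced refresh.
func demoCaching() (afterCached, afterRefresh int) {
	inner := &fakeRegistry{}
	reg := &cachingRegistry{EndpointRegistry: inner, ttl: time.Minute}
	ctx := context.Background()
	reg.ListEndpoints(ctx, false) // fetches from the inner registry
	reg.ListEndpoints(ctx, false) // served from the cache
	afterCached = inner.calls
	reg.ListEndpoints(ctx, true) // forced refresh
	return afterCached, inner.calls
}

func main() {
	fmt.Println(demoCaching()) // prints "1 2"
}
```

A wrapper like this would be passed to WithEndpointRegistry in place of the registry it wraps.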
type IncorrectPartitionError ¶
type IncorrectPartitionError struct{}
IncorrectPartitionError is a transient error that happens when requests end up on the wrong partition.
func (IncorrectPartitionError) Error ¶
func (e IncorrectPartitionError) Error() string
Error returns the error string.
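Since IncorrectPartitionError is described as transient, one way a caller might react is to retry a bounded number of times. The sketch below is an assumption-heavy illustration: it uses a local stand-in for the error type (with a made-up error string), a hypothetical runWithRetry helper, and it assumes the error reaches the caller in a form that errors.As can match, which this page does not confirm.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// IncorrectPartitionError is a local stand-in for the package's type.
type IncorrectPartitionError struct{}

func (e IncorrectPartitionError) Error() string { return "incorrect partition" }

// runWithRetry (hypothetical) retries run while it fails with
// IncorrectPartitionError, up to attempts times, pausing delay in between.
// Any other error is returned immediately.
func runWithRetry(ctx context.Context, attempts int, delay time.Duration,
	run func(context.Context) ([]byte, error)) ([]byte, error) {
	if attempts < 1 {
		attempts = 1
	}
	var lastErr error
	for i := 0; i < attempts; i++ {
		resp, err := run(ctx)
		if err == nil {
			return resp, nil
		}
		var wrong IncorrectPartitionError
		if !errors.As(err, &wrong) {
			return nil, err // not transient: give up right away
		}
		lastErr = err
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(delay):
		}
	}
	return nil, lastErr
}

// demoRetry simulates a call that lands on the wrong partition twice
// and then succeeds.
func demoRetry() (string, int) {
	calls := 0
	resp, err := runWithRetry(context.Background(), 5, time.Millisecond,
		func(ctx context.Context) ([]byte, error) {
			calls++
			if calls < 3 {
				return nil, fmt.Errorf("attempt %d: %w", calls, IncorrectPartitionError{})
			}
			return []byte("ok"), nil
		})
	if err != nil {
		return err.Error(), calls
	}
	return string(resp), calls
}

func main() {
	fmt.Println(demoRetry()) // prints "ok 3"
}
```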
type Network ¶
type Network interface {
	DialClient(ctx context.Context, addr string) (RunCloser, error)
	RegisterServer(ctx context.Context, addr string, handler Runner) (io.Closer, error)
}
Network implements the communication network between servers in the cluster.
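To show what a custom Network might involve, here is a sketch of an in-memory implementation of the kind that could be useful in tests. Runner and RunCloser are local stand-ins: the Runner signature is inferred from the handler in the example above, and memNetwork, memClient, and closerFunc are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
)

// Runner and RunCloser are local stand-ins for the package's types.
type Runner interface {
	Run(ctx context.Context, hash uint64, input []byte) ([]byte, error)
}

type RunCloser interface {
	Runner
	io.Closer
}

// memNetwork (hypothetical) routes calls between servers in one process.
type memNetwork struct {
	mu      sync.Mutex
	servers map[string]Runner
}

func newMemNetwork() *memNetwork {
	return &memNetwork{servers: map[string]Runner{}}
}

// RegisterServer makes handler reachable at addr until the closer is closed.
func (n *memNetwork) RegisterServer(ctx context.Context, addr string, handler Runner) (io.Closer, error) {
	n.mu.Lock()
	defer n.mu.Unlock()
	if _, ok := n.servers[addr]; ok {
		return nil, errors.New("address already registered: " + addr)
	}
	n.servers[addr] = handler
	return closerFunc(func() error {
		n.mu.Lock()
		defer n.mu.Unlock()
		delete(n.servers, addr)
		return nil
	}), nil
}

// DialClient returns a client that looks up the server on every call.
func (n *memNetwork) DialClient(ctx context.Context, addr string) (RunCloser, error) {
	return &memClient{nw: n, addr: addr}, nil
}

type memClient struct {
	nw   *memNetwork
	addr string
}

func (c *memClient) Run(ctx context.Context, hash uint64, input []byte) ([]byte, error) {
	c.nw.mu.Lock()
	h, ok := c.nw.servers[c.addr]
	c.nw.mu.Unlock()
	if !ok {
		return nil, errors.New("no server at " + c.addr)
	}
	return h.Run(ctx, hash, input)
}

func (c *memClient) Close() error { return nil }

type closerFunc func() error

func (f closerFunc) Close() error { return f() }

// echo is a trivial handler that returns its input.
type echo struct{}

func (echo) Run(ctx context.Context, hash uint64, input []byte) ([]byte, error) {
	return input, nil
}

// demoMemNetwork calls a registered server, then calls again after the
// server has been closed and reports whether that second call failed.
func demoMemNetwork() (string, bool) {
	ctx := context.Background()
	nw := newMemNetwork()
	server, _ := nw.RegisterServer(ctx, "a:1", echo{})
	client, _ := nw.DialClient(ctx, "a:1")
	defer client.Close()
	resp, _ := client.Run(ctx, 555, []byte("hello"))
	server.Close()
	_, err := client.Run(ctx, 555, []byte("hello"))
	return string(resp), err != nil
}

func main() {
	fmt.Println(demoMemNetwork()) // prints "hello true"
}
```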
func NewRPCNetwork ¶
NewRPCNetwork creates a new network setup.
If a grpc.Server is not provided, one is automatically created.
type Option ¶
type Option func(c *config)
Option configures the partitioning algorithm.
func WithEndpointRegistry ¶
func WithEndpointRegistry(r EndpointRegistry) Option
WithEndpointRegistry specifies how the endpoints discovery/registration happens.
There is no default endpoint registry.
RedisRegistry implements a Redis-based endpoint registry.
func WithNetwork ¶
WithNetwork provides a network implementation for servers in the cluster to route requests to each other.
The default mechanism is to use RPC (via NewRPCNetwork).
type RunCloser ¶
RunCloser combines Runner and io.Closer.
func New ¶
New returns a RunCloser which targets requests to specific endpoints based on the hash provided to the request.
This automatically adds the provided address to the cluster. The provided handler is used to serve requests meant for the local server.
Defaults are used for Picker and Network but EndpointRegistry must be specified -- no defaults are used for it.