partition

package
v0.0.0-...-4919d5a
Published: Oct 28, 2019 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package partition provides utilities to partition requests to a cluster.

Every server in the cluster receives requests meant for any partition, but these requests are then routed to the right partition, where they are handled.

Each server in the cluster also serves traffic from other servers destined for its own partition.

Usage:

Every server in the cluster creates a router at initialization time:

endpointRegistry := partition.WithEndpointRegistry(...)
router, err := partition.New(ctx, "ownIP:2222", handler, endpointRegistry)

The endpoint registry keeps track of all the servers in the cluster and their local addresses for inter-server communication.

The handler is the server's own implementation of requests routed to it by other servers.

When an external request comes in, the server hashes the request and then uses the router to execute it on the right server in its cluster:

resp, err := router.Run(ctx, hash, request)

Note that this call may be executed on another server in the cluster, or on the local server itself if the hash maps to the local server.
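The package leaves the choice of hash function to the caller. The following is a minimal sketch of one way to derive the uint64 hash from a request key, assuming the standard library FNV-1a hash is acceptable for the workload; hashKey and the "user-N" keys are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashKey maps an application-level key (hypothetical: a user ID) to a
// uint64 that could be passed as the hash argument of Run.
func hashKey(key string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return h.Sum64()
}

func main() {
	// The same key always yields the same hash, so requests for one key
	// are consistently mapped to the same partition.
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("%s -> %d\n", key, hashKey(key))
	}
}
```

Any deterministic, well-distributed 64-bit hash would serve the same purpose; FNV-1a is used here only because it ships with the standard library.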

The effectiveness of this strategy depends on how uniform the hash is and on the mapping strategy. By default, hashes are mapped to specific servers in the cluster using a highest-random-weight algorithm, which can be overridden using the WithPicker option.

Inter-server communication uses RPC by default; an alternative mechanism can be configured using the WithNetwork option.

The requests and responses are expected to be byte slices. For stronger types, protobufs can be used to serialize structures, or see the runner package (https://godoc.org/github.com/tvastar/cluster/pkg/partition/runner) for a solution using gob-encoding and reflection.
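As an illustration of carrying typed values over the byte-slice interface, here is a minimal sketch that round-trips a struct through encoding/gob. It does not use the runner package; getRequest, encode and decode are hypothetical helpers introduced for this example only.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// getRequest is a hypothetical typed request; it is not part of the package.
type getRequest struct {
	Key   string
	Limit int
}

// encode serializes v into a byte slice suitable as the input of Run.
func encode(v interface{}) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decode deserializes data into the value that v points to.
func decode(data []byte, v interface{}) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(v)
}

func main() {
	payload, err := encode(getRequest{Key: "user-1", Limit: 10})
	if err != nil {
		panic(err)
	}

	// A handler on the receiving server would decode the payload again.
	var req getRequest
	if err := decode(payload, &req); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", req)
}
```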

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/alicebob/miniredis"
	"github.com/tvastar/cluster/pkg/partition"
)

func main() {
	minir, err := miniredis.Run()
	if err != nil {
		panic(err)
	}
	defer minir.Close()

	opt1 := partition.WithEndpointRegistry(partition.NewRedisRegistry(minir.Addr(), "prefix_"))
	opt2 := partition.WithEndpointRegistry(partition.NewRedisRegistry(minir.Addr(), "prefix_"))

	ctx := context.Background()
	h1 := handler(1)
	h2 := handler(2)

	part1, err := partition.New(ctx, ":2222", h1, opt1)
	if err != nil {
		panic(err)
	}
	defer part1.Close()

	part2, err := partition.New(ctx, ":2223", h2, opt2)
	if err != nil {
		panic(err)
	}
	defer part2.Close()

	// wait for both endpoints to be registered
	time.Sleep(100 * time.Millisecond)

	if _, err := part1.Run(ctx, 555, []byte("hello")); err != nil {
		panic(err)
	}

	if _, err := part2.Run(ctx, 555, []byte("hello")); err != nil {
		panic(err)
	}

	if _, err := part1.Run(ctx, 22222, []byte("hello")); err != nil {
		panic(err)
	}

	if _, err := part2.Run(ctx, 22222, []byte("hello")); err != nil {
		panic(err)
	}

}

type handler int

func (h handler) Run(ctx context.Context, hash uint64, input []byte) ([]byte, error) {
	fmt.Printf("[%d] Run(%d, %s)\n", int(h), hash, string(input))
	return input, nil
}

func (h handler) Close() error {
	return nil
}
Output:

[1] Run(555, hello)
[1] Run(555, hello)
[2] Run(22222, hello)
[2] Run(22222, hello)

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewHashRing

func NewHashRing() func(ctx context.Context, list []string, hash uint64) string

NewHashRing returns a picker which uses a consistent hashing scheme.

The scheme is implemented using a hash ring of crc32 hashes.
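To make the idea concrete, the following is a rough sketch of a crc32 hash ring with the same picker signature. It is not the package's implementation: ringPick and newRingPicker are hypothetical names, each endpoint gets a single point on the ring (no virtual nodes), the ring is rebuilt on every call, and returning "" for an empty list is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"hash/crc32"
	"sort"
	"strconv"
)

// ringPick places every endpoint on a ring at the crc32 of its address and
// returns the first endpoint at or after the request's position, wrapping
// around to the start of the ring when no such endpoint exists.
func ringPick(list []string, hash uint64) string {
	if len(list) == 0 {
		return "" // assumption: no endpoint available
	}
	type point struct {
		pos  uint32
		addr string
	}
	ring := make([]point, 0, len(list))
	for _, addr := range list {
		ring = append(ring, point{crc32.ChecksumIEEE([]byte(addr)), addr})
	}
	sort.Slice(ring, func(i, j int) bool {
		if ring[i].pos != ring[j].pos {
			return ring[i].pos < ring[j].pos
		}
		return ring[i].addr < ring[j].addr // break ties deterministically
	})

	// Map the 64-bit request hash onto the 32-bit ring.
	target := crc32.ChecksumIEEE([]byte(strconv.FormatUint(hash, 10)))
	idx := sort.Search(len(ring), func(i int) bool { return ring[i].pos >= target })
	if idx == len(ring) {
		idx = 0 // wrap around
	}
	return ring[idx].addr
}

// newRingPicker adapts ringPick to the picker signature used by the package.
func newRingPicker() func(ctx context.Context, list []string, hash uint64) string {
	return func(_ context.Context, list []string, hash uint64) string {
		return ringPick(list, hash)
	}
}

func main() {
	pick := newRingPicker()
	list := []string{"10.0.0.1:2222", "10.0.0.2:2222", "10.0.0.3:2222"}
	for _, hash := range []uint64{555, 22222} {
		fmt.Println(hash, "->", pick(context.Background(), list, hash))
	}
}
```

The useful property of a ring is that removing an endpoint only remaps the hashes that were assigned to that endpoint; all other hashes keep their owner.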

func NewPicker

func NewPicker() func(ctx context.Context, list []string, hash uint64) string

NewPicker returns a picker which uses a highest random weight algorithm.

See https://en.wikipedia.org/wiki/Rendezvous_hashing
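For readers unfamiliar with the technique, here is a minimal sketch of rendezvous (highest random weight) hashing with the same picker signature. It is an illustration under stated assumptions rather than the package's code: hrwPick, weight and newHRWPicker are hypothetical names, the weight function (FNV-1a over the address followed by the hash) is an arbitrary choice, and returning "" for an empty list is an assumption.

```go
package main

import (
	"context"
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// weight computes a pseudo-random score for the (endpoint, request hash) pair.
func weight(addr string, hash uint64) uint64 {
	h := fnv.New64a()
	h.Write([]byte(addr))
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], hash)
	h.Write(b[:])
	return h.Sum64()
}

// hrwPick returns the endpoint with the highest weight for the given hash.
// Ties are broken by address so the result does not depend on list order.
func hrwPick(list []string, hash uint64) string {
	best, bestWeight := "", uint64(0)
	for i, addr := range list {
		w := weight(addr, hash)
		if i == 0 || w > bestWeight || (w == bestWeight && addr < best) {
			best, bestWeight = addr, w
		}
	}
	return best
}

// newHRWPicker adapts hrwPick to the picker signature used by the package.
func newHRWPicker() func(ctx context.Context, list []string, hash uint64) string {
	return func(_ context.Context, list []string, hash uint64) string {
		return hrwPick(list, hash)
	}
}

func main() {
	pick := newHRWPicker()
	list := []string{"10.0.0.1:2222", "10.0.0.2:2222", "10.0.0.3:2222"}
	for _, hash := range []uint64{555, 22222} {
		fmt.Println(hash, "->", pick(context.Background(), list, hash))
	}
}
```

Because every server scores the same list with the same function, all servers agree on the owner of a hash without coordinating, provided they see the same endpoint list.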

Types

type EndpointRegistry

type EndpointRegistry interface {
	RegisterEndpoint(ctx context.Context, addr string) (io.Closer, error)
	ListEndpoints(ctx context.Context, refresh bool) ([]string, error)
}

EndpointRegistry manages the live list of endpoints in a cluster.

The partition package does not cache the results, so any requirement to cache this should be handled by the EndpointRegistry implementation.

func NewRedisRegistry

func NewRedisRegistry(addr string, prefix string) EndpointRegistry

NewRedisRegistry returns a new registry based on Redis.

Note that this fetches the endpoints from Redis on every call. A wrapper implementation can easily override this behaviour by caching the results of the last call to ListEndpoints.
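A caching wrapper of the kind described could look roughly like the sketch below. The EndpointRegistry interface is copied locally so the example compiles on its own; cachingRegistry, countingRegistry, noopCloser and demo are hypothetical names. The sketch assumes that refresh=true means "bypass any cache", which the documentation does not spell out, and the time-to-live policy is likewise an assumption.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"
)

// EndpointRegistry mirrors the documented interface (local copy for the sketch).
type EndpointRegistry interface {
	RegisterEndpoint(ctx context.Context, addr string) (io.Closer, error)
	ListEndpoints(ctx context.Context, refresh bool) ([]string, error)
}

// cachingRegistry wraps another registry and reuses the last ListEndpoints
// result for up to ttl. RegisterEndpoint is passed through via embedding.
type cachingRegistry struct {
	EndpointRegistry
	ttl time.Duration

	mu      sync.Mutex
	list    []string
	fetched time.Time
}

func (c *cachingRegistry) ListEndpoints(ctx context.Context, refresh bool) ([]string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	// Assumption: refresh=true asks for a fresh list, so the cache is skipped.
	if !refresh && !c.fetched.IsZero() && time.Since(c.fetched) < c.ttl {
		return c.list, nil
	}
	list, err := c.EndpointRegistry.ListEndpoints(ctx, refresh)
	if err != nil {
		return nil, err
	}
	c.list, c.fetched = list, time.Now()
	return list, nil
}

type noopCloser struct{}

func (noopCloser) Close() error { return nil }

// countingRegistry is a stand-in backend that counts how often it is queried.
type countingRegistry struct{ calls int }

func (r *countingRegistry) RegisterEndpoint(context.Context, string) (io.Closer, error) {
	return noopCloser{}, nil
}

func (r *countingRegistry) ListEndpoints(context.Context, bool) ([]string, error) {
	r.calls++
	return []string{"10.0.0.1:2222"}, nil
}

// demo lists endpoints three times from cache and once with refresh,
// returning how many calls reached the backend.
func demo() (int, error) {
	backend := &countingRegistry{}
	reg := &cachingRegistry{EndpointRegistry: backend, ttl: time.Minute}
	ctx := context.Background()
	for i := 0; i < 3; i++ {
		if _, err := reg.ListEndpoints(ctx, false); err != nil {
			return 0, err
		}
	}
	if _, err := reg.ListEndpoints(ctx, true); err != nil {
		return 0, err
	}
	return backend.calls, nil
}

func main() {
	calls, err := demo()
	fmt.Println("backend calls:", calls, "err:", err)
}
```

A stale cache delays the point at which servers notice membership changes, so the ttl trades Redis load against how quickly routing converges.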

type IncorrectPartitionError

type IncorrectPartitionError struct{}

IncorrectPartitionError is a transient error that happens when requests end up on the wrong partition.
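Since the error is described as transient, a caller might retry the request. The sketch below shows one way to do that with errors.As and a short linear backoff. It rests on several assumptions: the types are local copies (the error text is a placeholder), runWithRetry, flakyRunner and demo are hypothetical names, and the documentation does not state whether the error reaches remote callers with its type intact.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Local copies of the documented types so the sketch compiles on its own.
type Runner interface {
	Run(ctx context.Context, hash uint64, input []byte) ([]byte, error)
}

type IncorrectPartitionError struct{}

func (IncorrectPartitionError) Error() string { return "incorrect partition" } // placeholder text

// runWithRetry retries only when the failure is an IncorrectPartitionError;
// any other error, or success, is returned immediately.
func runWithRetry(ctx context.Context, r Runner, hash uint64, input []byte, attempts int) ([]byte, error) {
	var err error
	for i := 0; i < attempts; i++ {
		var out []byte
		out, err = r.Run(ctx, hash, input)
		var wrong IncorrectPartitionError
		if err == nil || !errors.As(err, &wrong) {
			return out, err
		}
		// Back off briefly so the endpoint list has a chance to settle.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(time.Duration(i+1) * 10 * time.Millisecond):
		}
	}
	return nil, err
}

// flakyRunner fails with IncorrectPartitionError a fixed number of times
// and then echoes its input.
type flakyRunner struct {
	failures int
	calls    int
}

func (f *flakyRunner) Run(_ context.Context, _ uint64, input []byte) ([]byte, error) {
	f.calls++
	if f.calls <= f.failures {
		return nil, IncorrectPartitionError{}
	}
	return input, nil
}

func demo() (string, int, error) {
	r := &flakyRunner{failures: 2}
	out, err := runWithRetry(context.Background(), r, 555, []byte("hello"), 5)
	return string(out), r.calls, err
}

func main() {
	out, calls, err := demo()
	fmt.Println(out, calls, err)
}
```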

func (IncorrectPartitionError) Error

func (e IncorrectPartitionError) Error() string

Error returns the error string.

type Network

type Network interface {
	DialClient(ctx context.Context, addr string) (RunCloser, error)
	RegisterServer(ctx context.Context, addr string, handler Runner) (io.Closer, error)
}

Network implements the communication network between servers in the cluster.
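To show what an alternative implementation involves, here is a sketch of an in-process Network that could be handy in tests. The interfaces are copied locally from this page so the example compiles on its own; memNetwork, memClient, closerFunc, echo and demo are hypothetical names and not part of the package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
)

// Local copies of the documented interfaces.
type Runner interface {
	Run(ctx context.Context, hash uint64, input []byte) ([]byte, error)
}

type RunCloser interface {
	Runner
	io.Closer
}

type Network interface {
	DialClient(ctx context.Context, addr string) (RunCloser, error)
	RegisterServer(ctx context.Context, addr string, handler Runner) (io.Closer, error)
}

// memNetwork routes calls between handlers registered in the same process.
type memNetwork struct {
	mu      sync.RWMutex
	servers map[string]Runner
}

func newMemNetwork() *memNetwork { return &memNetwork{servers: map[string]Runner{}} }

type closerFunc func() error

func (f closerFunc) Close() error { return f() }

func (n *memNetwork) RegisterServer(_ context.Context, addr string, handler Runner) (io.Closer, error) {
	n.mu.Lock()
	defer n.mu.Unlock()
	if _, ok := n.servers[addr]; ok {
		return nil, errors.New("address already registered: " + addr)
	}
	n.servers[addr] = handler
	// Closing the returned value unregisters the server.
	return closerFunc(func() error {
		n.mu.Lock()
		defer n.mu.Unlock()
		delete(n.servers, addr)
		return nil
	}), nil
}

// memClient looks up the handler on every call, so it notices unregistration.
type memClient struct {
	nw   *memNetwork
	addr string
}

func (c memClient) Run(ctx context.Context, hash uint64, input []byte) ([]byte, error) {
	c.nw.mu.RLock()
	h, ok := c.nw.servers[c.addr]
	c.nw.mu.RUnlock()
	if !ok {
		return nil, errors.New("no server at " + c.addr)
	}
	return h.Run(ctx, hash, input)
}

func (c memClient) Close() error { return nil }

func (n *memNetwork) DialClient(_ context.Context, addr string) (RunCloser, error) {
	return memClient{nw: n, addr: addr}, nil
}

type echo struct{}

func (echo) Run(_ context.Context, _ uint64, input []byte) ([]byte, error) { return input, nil }

func demo() (string, error) {
	var nw Network = newMemNetwork()
	ctx := context.Background()
	srv, err := nw.RegisterServer(ctx, ":2222", echo{})
	if err != nil {
		return "", err
	}
	defer srv.Close()
	client, err := nw.DialClient(ctx, ":2222")
	if err != nil {
		return "", err
	}
	defer client.Close()
	out, err := client.Run(ctx, 555, []byte("hello"))
	return string(out), err
}

func main() {
	fmt.Println(demo())
}
```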

func NewRPCNetwork

func NewRPCNetwork(server *grpc.Server) Network

NewRPCNetwork creates a new network setup.

If a grpc.Server is not provided, one is automatically created.

type Option

type Option func(c *config)

Option configures the partitioning algorithm.

func WithEndpointRegistry

func WithEndpointRegistry(r EndpointRegistry) Option

WithEndpointRegistry specifies how endpoint discovery and registration happen.

There is no default endpoint registry.

RedisRegistry implements a Redis-based endpoint registry.

func WithNetwork

func WithNetwork(nw Network) Option

WithNetwork provides a network implementation for servers in the cluster to route requests to each other.

The default mechanism is to use RPC (via NewRPCNetwork).

func WithPicker

func WithPicker(picker func(ctx context.Context, list []string, hash uint64) string) Option

WithPicker specifies how to pick endpoints based on the hash.

The default is the highest random weight algorithm (via NewPicker()).
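Any function with the picker signature can be supplied. As a deliberately simple illustration, the sketch below picks by taking the hash modulo the number of endpoints over a sorted copy of the list. modPick and modPicker are hypothetical names, and returning "" for an empty list is an assumption. Such a function would be passed as partition.WithPicker(modPicker).

```go
package main

import (
	"context"
	"fmt"
	"sort"
)

// modPick sorts a copy of the list (so all servers agree regardless of the
// order they received it in) and indexes it with hash modulo its length.
func modPick(list []string, hash uint64) string {
	if len(list) == 0 {
		return "" // assumption: no endpoint available
	}
	sorted := append([]string(nil), list...)
	sort.Strings(sorted)
	return sorted[hash%uint64(len(sorted))]
}

// modPicker has the signature expected by WithPicker.
func modPicker(_ context.Context, list []string, hash uint64) string {
	return modPick(list, hash)
}

func main() {
	list := []string{"10.0.0.2:2222", "10.0.0.1:2222", "10.0.0.3:2222"}
	for _, hash := range []uint64{0, 1, 2, 3} {
		fmt.Println(hash, "->", modPicker(context.Background(), list, hash))
	}
}
```

Modulo picking remaps most hashes whenever the cluster size changes, which is the main reason schemes such as highest random weight or a hash ring are usually preferred.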

type RunCloser

type RunCloser interface {
	Runner
	io.Closer
}

RunCloser combines Runner and io.Closer.

func New

func New(ctx context.Context, addr string, handler Runner, opts ...Option) (RunCloser, error)

New returns a RunCloser which routes requests to specific endpoints based on the hash provided with each request.

This automatically adds the provided address to the cluster. The provided handler is used to serve requests meant for the local server.

Defaults are used for Picker and Network but EndpointRegistry must be specified -- no defaults are used for it.

type Runner

type Runner interface {
	Run(ctx context.Context, hash uint64, input []byte) ([]byte, error)
}

Runner executes a single request with the specified hash.
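Because Runner has a single method, a plain function can be adapted to it in the same way http.HandlerFunc adapts functions to http.Handler. The sketch below assumes nothing beyond the interface shown above, which is copied locally; RunnerFunc, upper and demo are hypothetical names that the package does not provide.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
)

// Runner mirrors the documented interface (local copy for the sketch).
type Runner interface {
	Run(ctx context.Context, hash uint64, input []byte) ([]byte, error)
}

// RunnerFunc lets an ordinary function satisfy Runner.
type RunnerFunc func(ctx context.Context, hash uint64, input []byte) ([]byte, error)

// Run calls the underlying function.
func (f RunnerFunc) Run(ctx context.Context, hash uint64, input []byte) ([]byte, error) {
	return f(ctx, hash, input)
}

// upper returns a handler that upper-cases its input.
func upper() Runner {
	return RunnerFunc(func(_ context.Context, _ uint64, input []byte) ([]byte, error) {
		return bytes.ToUpper(input), nil
	})
}

func demo() (string, error) {
	out, err := upper().Run(context.Background(), 555, []byte("hello"))
	return string(out), err
}

func main() {
	fmt.Println(demo())
}
```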

Directories

Path            Synopsis
internal/rpc    Package rpc implements partition.Network using rpc
runner          Package runner implements a multiplexer on top of the partition package.
