rplx

v0.4.5
Published: Sep 22, 2020 License: MIT Imports: 19 Imported by: 0

README

RPLX

Pronounced as Replix

Go library for multi-master replication of integer variables with TTL support

todo

Usage examples

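// Requires the standard "net" package and this module imported as rplx.
func main() {
	// Pass the provider function itself, not the result of calling it.
	r := rplx.New(
		rplx.WithRemoteNodesProvider(remoteNodes),
	)

	// Listen for sync messages from remote nodes.
	ln, err := net.Listen("tcp4", "127.0.0.1:3001")
	if err != nil {
		panic(err)
	}

	if err := r.StartReplicationServer(ln); err != nil {
		panic(err)
	}
}

// remoteNodes returns the remote nodes to replicate with.
func remoteNodes() []*rplx.RemoteNodeOption {
	nodes := make([]*rplx.RemoteNodeOption, 0)

	// DefaultRemoteNodeOption already returns a pointer.
	nodes = append(nodes, rplx.DefaultRemoteNodeOption("127.0.0.1:3002"))

	return nodes
}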

See also the example in the test folder.

Metrics

Creating an Rplx instance with the WithMetrics() option registers the following Prometheus metrics.

Name | Type | Description
rplx_variables_got | Counter Vector | Number of variables received; label: 'remote_node_id'
rplx_variables_sent | Counter Vector | Number of variables sent; label: 'remote_node_id'
rplx_variables_sent_response_codes | Counter Vector | Response codes received when sending variables; labels: 'remote_node_id', 'code'
rplx_variables_sent_duration | Histogram Vector | Duration of Sync requests; labels: 'remote_node_id', 'code'

Metrics from the package github.com/grpc-ecosystem/go-grpc-prometheus are also included.

Public API

Get

Get(name string) (int64, error)

Returns the variable's value, or an error if the variable has expired or does not exist.

Errors:

  • ErrVariableNotExists
  • ErrVariableExpired
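
A caller will often want to treat "missing" and "expired" the same way and only surface unexpected errors. The sketch below shows one way to do that with errors.Is. It is self-contained, so the sentinel errors, the getter interface, and fakeStore are local stand-ins invented for the example; real code would use rplx.ErrVariableNotExists, rplx.ErrVariableExpired, and an *rplx.Rplx value instead.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the sentinel errors listed above (assumption: real code
// compares against rplx.ErrVariableNotExists and rplx.ErrVariableExpired).
var (
	ErrVariableNotExists = errors.New("variable not exists")
	ErrVariableExpired   = errors.New("variable expired")
)

// getter is a hypothetical interface matching the documented Get signature.
type getter interface {
	Get(name string) (int64, error)
}

// valueOrZero treats a missing or expired variable as zero and passes
// any other error through to the caller.
func valueOrZero(g getter, name string) (int64, error) {
	v, err := g.Get(name)
	switch {
	case err == nil:
		return v, nil
	case errors.Is(err, ErrVariableNotExists), errors.Is(err, ErrVariableExpired):
		return 0, nil
	default:
		return 0, err
	}
}

// fakeStore is a hypothetical in-memory stand-in used only for this example.
type fakeStore map[string]int64

func (s fakeStore) Get(name string) (int64, error) {
	v, ok := s[name]
	if !ok {
		return 0, ErrVariableNotExists
	}
	return v, nil
}

func main() {
	s := fakeStore{"hits": 3}
	fmt.Println(valueOrZero(s, "hits"))
	fmt.Println(valueOrZero(s, "missing"))
}
```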

Delete

Delete(name string) error

Deletes the variable.

Errors:

  • ErrVariableNotExists

In fact, this method sets the variable's TTL to one second before the current time, sends that update to replication, and removes the variable from the local cache.
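
The three steps described above can be sketched as follows. This is not the library's code: the variable and store types and the replicate hook are hypothetical, chosen only to show the order of operations (expire in the past, replicate, drop locally).

```go
package main

import (
	"fmt"
	"time"
)

// variable is a hypothetical local record: a value plus an absolute expiry.
type variable struct {
	value int64
	ttl   time.Time
}

// store is a hypothetical stand-in for a node's local cache.
type store struct {
	vars      map[string]variable
	replicate func(name string, v variable) // hypothetical replication hook
}

// deleteVar expires the variable one second in the past, hands that state
// to replication, then removes it from the local map.
func (s *store) deleteVar(name string, now time.Time) bool {
	v, ok := s.vars[name]
	if !ok {
		return false
	}
	v.ttl = now.Add(-time.Second)
	s.replicate(name, v)
	delete(s.vars, name)
	return true
}

// runDeleteExample deletes one variable and reports what happened.
func runDeleteExample() (deleted bool, remaining int, sentExpired bool) {
	now := time.Now()
	var sent []variable
	s := &store{
		vars:      map[string]variable{"hits": {value: 3, ttl: now.Add(time.Hour)}},
		replicate: func(_ string, v variable) { sent = append(sent, v) },
	}
	deleted = s.deleteVar("hits", now)
	return deleted, len(s.vars), len(sent) == 1 && sent[0].ttl.Before(now)
}

func main() {
	fmt.Println(runDeleteExample())
}
```

Replicating an already-expired TTL rather than a separate "delete" message means remote nodes can handle the removal through their normal expiry path.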

UpdateTTL

UpdateTTL(name string, ttl time.Time) error

Updates the TTL for the variable.

Errors:

  • ErrVariableNotExists
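
Note that the ttl parameter is a time.Time, so it is an absolute expiry moment rather than a duration. A minimal sketch of converting "expire in five minutes" into that form is shown below; ttlStore and errVariableNotExists are hypothetical stand-ins that only mirror the documented signature and error.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errVariableNotExists stands in for rplx.ErrVariableNotExists.
var errVariableNotExists = errors.New("variable not exists")

// ttlStore is a hypothetical stand-in mapping variable name to expiry time.
type ttlStore map[string]time.Time

// UpdateTTL mirrors the documented signature: ttl is an absolute time.
func (s ttlStore) UpdateTTL(name string, ttl time.Time) error {
	if _, ok := s[name]; !ok {
		return errVariableNotExists
	}
	s[name] = ttl
	return nil
}

// extend moves the expiry of name to d after now.
func extend(s ttlStore, name string, now time.Time, d time.Duration) error {
	return s.UpdateTTL(name, now.Add(d))
}

// ttlExample extends one existing variable and tries one unknown variable.
func ttlExample() (extended bool, missingErr bool) {
	now := time.Date(2020, 9, 22, 0, 0, 0, 0, time.UTC)
	s := ttlStore{"session": now}
	err := extend(s, "session", now, 5*time.Minute)
	extended = err == nil && s["session"].Equal(now.Add(5*time.Minute))
	missingErr = errors.Is(extend(s, "unknown", now, time.Minute), errVariableNotExists)
	return extended, missingErr
}

func main() {
	fmt.Println(ttlExample())
}
```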

Upsert

Upsert(name string, delta int64) int64

Changes the variable's value by the provided delta, or creates the variable if it does not exist. Returns the new value.
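
Because Upsert takes a delta rather than an absolute value, the same call covers creation, increment, and decrement. The sketch below illustrates those semantics with a hypothetical local counters type; it says nothing about how the library stores or replicates values.

```go
package main

import (
	"fmt"
	"sync"
)

// counters is a hypothetical, single-process stand-in for delta-based upserts.
type counters struct {
	mu   sync.Mutex
	vars map[string]int64
}

// Upsert applies delta and returns the new value. A missing key starts
// from zero, so the same statement also creates the variable.
func (c *counters) Upsert(name string, delta int64) int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.vars[name] += delta
	return c.vars[name]
}

// upsertExample creates, decrements, and creates again.
func upsertExample() (first, second, third int64) {
	c := &counters{vars: make(map[string]int64)}
	first = c.Upsert("hits", 5)   // creates "hits"
	second = c.Upsert("hits", -2) // negative deltas decrement
	third = c.Upsert("misses", 1) // creates "misses"
	return first, second, third
}

func main() {
	fmt.Println(upsertExample())
}
```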

All

All() (notExpired map[string]int64, expired map[string]int64)

Returns two maps, each keyed by variable name with the variable's value as the map value.

The first return value contains variables that have not expired. The second contains variables that have expired but have not yet been garbage collected.
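
The split into two maps can be pictured as below. The entry type and the all function are hypothetical, and the sketch assumes every variable carries an explicit expiry time and counts as expired once that time is no longer in the future; the library's own rule may differ.

```go
package main

import (
	"fmt"
	"time"
)

// entry is a hypothetical record: a value plus an absolute expiry.
type entry struct {
	value int64
	ttl   time.Time
}

// all splits vars into not-expired and expired maps relative to now.
func all(vars map[string]entry, now time.Time) (notExpired, expired map[string]int64) {
	notExpired = make(map[string]int64)
	expired = make(map[string]int64)
	for name, e := range vars {
		if e.ttl.After(now) {
			notExpired[name] = e.value
		} else {
			expired[name] = e.value
		}
	}
	return notExpired, expired
}

// allExample builds one live and one expired variable and splits them.
func allExample() (live, dead int, hits int64) {
	now := time.Date(2020, 9, 22, 12, 0, 0, 0, time.UTC)
	vars := map[string]entry{
		"hits":    {value: 7, ttl: now.Add(time.Hour)},
		"expired": {value: 2, ttl: now.Add(-time.Minute)},
	}
	notExpired, expired := all(vars, now)
	return len(notExpired), len(expired), notExpired["hits"]
}

func main() {
	fmt.Println(allExample())
}
```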

Run integration tests

docker-compose up -d
docker build -t client -f ./test/client/Dockerfile .
docker run --rm --net host client
docker-compose down -v

Additional
  • README.RUS.md

Documentation

Overview

Package test is a generated GoMock package.

Index

Constants

This section is empty.

Variables

var (
	// ErrVariableNotExists is returned if the variable does not exist
	ErrVariableNotExists = errors.New("variable not exists")
	// ErrVariableExpired is returned if the variable has expired
	ErrVariableExpired = errors.New("variable expired")
)

Functions

func RegisterReplicatorServer

func RegisterReplicatorServer(s *grpc.Server, srv ReplicatorServer)

Types

type HelloRequest added in v0.2.0

type HelloRequest struct {
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*HelloRequest) Descriptor added in v0.2.0

func (*HelloRequest) Descriptor() ([]byte, []int)

func (*HelloRequest) ProtoMessage added in v0.2.0

func (*HelloRequest) ProtoMessage()

func (*HelloRequest) Reset added in v0.2.0

func (m *HelloRequest) Reset()

func (*HelloRequest) String added in v0.2.0

func (m *HelloRequest) String() string

func (*HelloRequest) XXX_DiscardUnknown added in v0.2.0

func (m *HelloRequest) XXX_DiscardUnknown()

func (*HelloRequest) XXX_Marshal added in v0.2.0

func (m *HelloRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*HelloRequest) XXX_Merge added in v0.2.0

func (dst *HelloRequest) XXX_Merge(src proto.Message)

func (*HelloRequest) XXX_Size added in v0.2.0

func (m *HelloRequest) XXX_Size() int

func (*HelloRequest) XXX_Unmarshal added in v0.2.0

func (m *HelloRequest) XXX_Unmarshal(b []byte) error

type HelloResponse added in v0.2.0

type HelloResponse struct {
	ID                   string   `protobuf:"bytes,1,opt,name=ID,proto3" json:"ID,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*HelloResponse) Descriptor added in v0.2.0

func (*HelloResponse) Descriptor() ([]byte, []int)

func (*HelloResponse) GetID added in v0.2.0

func (m *HelloResponse) GetID() string

func (*HelloResponse) ProtoMessage added in v0.2.0

func (*HelloResponse) ProtoMessage()

func (*HelloResponse) Reset added in v0.2.0

func (m *HelloResponse) Reset()

func (*HelloResponse) String added in v0.2.0

func (m *HelloResponse) String() string

func (*HelloResponse) XXX_DiscardUnknown added in v0.2.0

func (m *HelloResponse) XXX_DiscardUnknown()

func (*HelloResponse) XXX_Marshal added in v0.2.0

func (m *HelloResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*HelloResponse) XXX_Merge added in v0.2.0

func (dst *HelloResponse) XXX_Merge(src proto.Message)

func (*HelloResponse) XXX_Size added in v0.2.0

func (m *HelloResponse) XXX_Size() int

func (*HelloResponse) XXX_Unmarshal added in v0.2.0

func (m *HelloResponse) XXX_Unmarshal(b []byte) error

type MockReplicatorClient added in v0.2.11

type MockReplicatorClient struct {
	// contains filtered or unexported fields
}

MockReplicatorClient is a mock of ReplicatorClient interface

func NewMockReplicatorClient added in v0.2.11

func NewMockReplicatorClient(ctrl *gomock.Controller) *MockReplicatorClient

NewMockReplicatorClient creates a new mock instance

func (*MockReplicatorClient) EXPECT added in v0.2.11

EXPECT returns an object that allows the caller to indicate expected use

func (*MockReplicatorClient) Hello added in v0.2.11

Hello mocks base method

func (*MockReplicatorClient) Sync added in v0.2.11

Sync mocks base method

type MockReplicatorClientMockRecorder added in v0.2.11

type MockReplicatorClientMockRecorder struct {
	// contains filtered or unexported fields
}

MockReplicatorClientMockRecorder is the mock recorder for MockReplicatorClient

func (*MockReplicatorClientMockRecorder) Hello added in v0.2.11

func (mr *MockReplicatorClientMockRecorder) Hello(ctx, in interface{}, opts ...interface{}) *gomock.Call

Hello indicates an expected call of Hello

func (*MockReplicatorClientMockRecorder) Sync added in v0.2.11

func (mr *MockReplicatorClientMockRecorder) Sync(ctx, in interface{}, opts ...interface{}) *gomock.Call

Sync indicates an expected call of Sync

type MockReplicatorServer added in v0.2.11

type MockReplicatorServer struct {
	// contains filtered or unexported fields
}

MockReplicatorServer is a mock of ReplicatorServer interface

func NewMockReplicatorServer added in v0.2.11

func NewMockReplicatorServer(ctrl *gomock.Controller) *MockReplicatorServer

NewMockReplicatorServer creates a new mock instance

func (*MockReplicatorServer) EXPECT added in v0.2.11

EXPECT returns an object that allows the caller to indicate expected use

func (*MockReplicatorServer) Hello added in v0.2.11

Hello mocks base method

func (*MockReplicatorServer) Sync added in v0.2.11

Sync mocks base method

type MockReplicatorServerMockRecorder added in v0.2.11

type MockReplicatorServerMockRecorder struct {
	// contains filtered or unexported fields
}

MockReplicatorServerMockRecorder is the mock recorder for MockReplicatorServer

func (*MockReplicatorServerMockRecorder) Hello added in v0.2.11

func (mr *MockReplicatorServerMockRecorder) Hello(arg0, arg1 interface{}) *gomock.Call

Hello indicates an expected call of Hello

func (*MockReplicatorServerMockRecorder) Sync added in v0.2.11

func (mr *MockReplicatorServerMockRecorder) Sync(arg0, arg1 interface{}) *gomock.Call

Sync indicates an expected call of Sync

type Option

type Option func(*Rplx)

Option describes an Rplx option.
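
Option follows the functional options pattern: each With... function returns a closure that mutates the object under construction, and the constructor applies them in order. The sketch below shows the pattern with a hypothetical node type; the field names and default values are invented for illustration and are not the library's.

```go
package main

import (
	"fmt"
	"time"
)

// node is a hypothetical configurable object standing in for Rplx.
type node struct {
	id         string
	gcInterval time.Duration
	readOnly   bool
}

// option mirrors the shape of Option: a function that mutates the target.
type option func(*node)

func withNodeID(id string) option {
	return func(n *node) { n.id = id }
}

func withGCInterval(d time.Duration) option {
	return func(n *node) { n.gcInterval = d }
}

func withReadOnly() option {
	return func(n *node) { n.readOnly = true }
}

// newNode starts from defaults (hypothetical values) and applies each
// option in the order given, so later options override earlier ones.
func newNode(opts ...option) *node {
	n := &node{id: "default", gcInterval: time.Minute}
	for _, o := range opts {
		o(n)
	}
	return n
}

func main() {
	n := newNode(withNodeID("node-a"), withReadOnly())
	fmt.Println(n.id, n.gcInterval, n.readOnly)
}
```

The benefit of this shape is that new settings can be added later without changing the constructor's signature.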

func WithGCInterval

func WithGCInterval(interval time.Duration) Option

WithGCInterval sets the garbage collection interval.

func WithLogger

func WithLogger(logger *zap.Logger) Option

WithLogger specifies the logger.

func WithMetrics added in v0.4.0

func WithMetrics() Option

WithMetrics enables registration of Prometheus metrics.

func WithNodeID added in v0.2.0

func WithNodeID(nodeID string) Option

WithNodeID specifies the Rplx node name.

func WithReadOnly added in v0.1.0

func WithReadOnly() Option

WithReadOnly sets read-only mode.

func WithRemoteNodesCheckInterval added in v0.2.8

func WithRemoteNodesCheckInterval(interval time.Duration) Option

WithRemoteNodesCheckInterval sets the interval for checking remote nodes.

func WithRemoteNodesProvider added in v0.2.8

func WithRemoteNodesProvider(provider RemoteNodesProvider) Option

WithRemoteNodesProvider sets the provider of remote nodes.

func WithReplicationChanCap

func WithReplicationChanCap(c int) Option

WithReplicationChanCap sets the replication channel capacity.

type RemoteNodeOption added in v0.2.8

type RemoteNodeOption struct {
	Addr               string
	DialOpts           []grpc.DialOption
	SyncInterval       time.Duration
	MaxBufferSize      int
	ConnectionInterval time.Duration
	WaitSyncCount      int
}

RemoteNodeOption describes the options for a remote node, as returned by a RemoteNodesProvider.

func DefaultRemoteNodeOption added in v0.2.8

func DefaultRemoteNodeOption(addr string) *RemoteNodeOption

DefaultRemoteNodeOption returns the default RemoteNodeOption for the provided address.

type RemoteNodesProvider added in v0.2.8

type RemoteNodesProvider func() []*RemoteNodeOption

RemoteNodesProvider is the type of a function that is called automatically and returns information about remote nodes.
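
Since the provider is a function rather than a fixed list, it can return a different set of nodes each time it is called. The sketch below shows one way to back a provider with a mutable, lock-protected registry. It is self-contained: remoteNodeOption and remoteNodesProvider are local stand-ins that mirror only the Addr field and the function shape, and registry is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
)

// remoteNodeOption mirrors only the Addr field of RemoteNodeOption.
type remoteNodeOption struct {
	Addr string
}

// remoteNodesProvider mirrors the shape of RemoteNodesProvider.
type remoteNodesProvider func() []*remoteNodeOption

// registry is a hypothetical, concurrency-safe list of peer addresses.
type registry struct {
	mu    sync.RWMutex
	addrs []string
}

// set replaces the known peer addresses.
func (r *registry) set(addrs ...string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.addrs = append([]string(nil), addrs...)
}

// provider returns a function that snapshots the current addresses
// every time it is called.
func (r *registry) provider() remoteNodesProvider {
	return func() []*remoteNodeOption {
		r.mu.RLock()
		defer r.mu.RUnlock()
		nodes := make([]*remoteNodeOption, 0, len(r.addrs))
		for _, a := range r.addrs {
			nodes = append(nodes, &remoteNodeOption{Addr: a})
		}
		return nodes
	}
}

func main() {
	r := &registry{}
	p := r.provider()
	fmt.Println(len(p()))
	r.set("127.0.0.1:3002", "127.0.0.1:3003")
	fmt.Println(len(p()))
}
```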

type ReplicatorClient

type ReplicatorClient interface {
	Hello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error)
	Sync(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (*SyncResponse, error)
}

ReplicatorClient is the client API for Replicator service.

For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.

func NewReplicatorClient

func NewReplicatorClient(cc *grpc.ClientConn) ReplicatorClient

type ReplicatorServer

type ReplicatorServer interface {
	Hello(context.Context, *HelloRequest) (*HelloResponse, error)
	Sync(context.Context, *SyncRequest) (*SyncResponse, error)
}

ReplicatorServer is the server API for Replicator service.

type Rplx

type Rplx struct {
	// contains filtered or unexported fields
}

Rplx describes the main Rplx object.

func New

func New(opts ...Option) *Rplx

New creates a new Rplx.

func (*Rplx) All added in v0.2.4

func (rplx *Rplx) All() (notExpired map[string]int64, expired map[string]int64)

All returns all variable values. The first return value holds variables that have not expired; the second holds variables that have expired but have not yet been garbage collected.

func (*Rplx) Delete

func (rplx *Rplx) Delete(name string) error

Delete sets the variable's TTL to one second before the current time, sends the variable to replication (updating the TTL on clients), and removes the variable from the rplx.variables map.

func (*Rplx) Get

func (rplx *Rplx) Get(name string) (int64, error)

Get returns the variable's value, or an error if the variable does not exist or has expired. If the variable has expired, it is removed from the rplx.variables map.

func (*Rplx) Hello added in v0.2.0

func (rplx *Rplx) Hello(ctx context.Context, req *HelloRequest) (*HelloResponse, error)

Hello is the implementation of the gRPC method that handles a Hello request.

func (*Rplx) StartReplicationServer

func (rplx *Rplx) StartReplicationServer(ln net.Listener, grpcOptions ...grpc.ServerOption) error

StartReplicationServer starts the gRPC server that receives sync messages from remote nodes.

func (*Rplx) Stop added in v0.2.10

func (rplx *Rplx) Stop()

Stop stops Rplx.

func (*Rplx) Sync

func (rplx *Rplx) Sync(ctx context.Context, req *SyncRequest) (*SyncResponse, error)

Sync is the gRPC method invoked on an incoming sync message.

func (*Rplx) UpdateTTL

func (rplx *Rplx) UpdateTTL(name string, ttl time.Time) error

UpdateTTL updates the TTL for the variable, or returns an error if the variable does not exist.

func (*Rplx) Upsert

func (rplx *Rplx) Upsert(name string, delta int64) int64

Upsert changes the variable by delta, or creates the variable if it does not exist. It returns the new value.

func (*Rplx) VariablePartsCount added in v0.4.2

func (rplx *Rplx) VariablePartsCount(name string) (int, error)

VariablePartsCount returns the number of remote node parts for the variable.

type SyncNodeValue

type SyncNodeValue struct {
	Value                int64    `protobuf:"varint,1,opt,name=Value,proto3" json:"Value,omitempty"`
	Version              int64    `protobuf:"varint,2,opt,name=Version,proto3" json:"Version,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*SyncNodeValue) Descriptor

func (*SyncNodeValue) Descriptor() ([]byte, []int)

func (*SyncNodeValue) GetValue

func (m *SyncNodeValue) GetValue() int64

func (*SyncNodeValue) GetVersion added in v0.2.0

func (m *SyncNodeValue) GetVersion() int64

func (*SyncNodeValue) ProtoMessage

func (*SyncNodeValue) ProtoMessage()

func (*SyncNodeValue) Reset

func (m *SyncNodeValue) Reset()

func (*SyncNodeValue) String

func (m *SyncNodeValue) String() string

func (*SyncNodeValue) XXX_DiscardUnknown

func (m *SyncNodeValue) XXX_DiscardUnknown()

func (*SyncNodeValue) XXX_Marshal

func (m *SyncNodeValue) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*SyncNodeValue) XXX_Merge

func (dst *SyncNodeValue) XXX_Merge(src proto.Message)

func (*SyncNodeValue) XXX_Size

func (m *SyncNodeValue) XXX_Size() int

func (*SyncNodeValue) XXX_Unmarshal

func (m *SyncNodeValue) XXX_Unmarshal(b []byte) error

type SyncRequest

type SyncRequest struct {
	NodeID string `protobuf:"bytes,1,opt,name=NodeID,proto3" json:"NodeID,omitempty"`
	// map key - variable name
	Variables            map[string]*SyncVariable `` /* 159-byte string literal not displayed */
	XXX_NoUnkeyedLiteral struct{}                 `json:"-"`
	XXX_unrecognized     []byte                   `json:"-"`
	XXX_sizecache        int32                    `json:"-"`
}

func (*SyncRequest) Descriptor

func (*SyncRequest) Descriptor() ([]byte, []int)

func (*SyncRequest) GetNodeID

func (m *SyncRequest) GetNodeID() string

func (*SyncRequest) GetVariables

func (m *SyncRequest) GetVariables() map[string]*SyncVariable

func (*SyncRequest) ProtoMessage

func (*SyncRequest) ProtoMessage()

func (*SyncRequest) Reset

func (m *SyncRequest) Reset()

func (*SyncRequest) String

func (m *SyncRequest) String() string

func (*SyncRequest) XXX_DiscardUnknown

func (m *SyncRequest) XXX_DiscardUnknown()

func (*SyncRequest) XXX_Marshal

func (m *SyncRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*SyncRequest) XXX_Merge

func (dst *SyncRequest) XXX_Merge(src proto.Message)

func (*SyncRequest) XXX_Size

func (m *SyncRequest) XXX_Size() int

func (*SyncRequest) XXX_Unmarshal

func (m *SyncRequest) XXX_Unmarshal(b []byte) error

type SyncResponse

type SyncResponse struct {
	Code                 int64    `protobuf:"varint,1,opt,name=Code,proto3" json:"Code,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*SyncResponse) Descriptor

func (*SyncResponse) Descriptor() ([]byte, []int)

func (*SyncResponse) GetCode

func (m *SyncResponse) GetCode() int64

func (*SyncResponse) ProtoMessage

func (*SyncResponse) ProtoMessage()

func (*SyncResponse) Reset

func (m *SyncResponse) Reset()

func (*SyncResponse) String

func (m *SyncResponse) String() string

func (*SyncResponse) XXX_DiscardUnknown

func (m *SyncResponse) XXX_DiscardUnknown()

func (*SyncResponse) XXX_Marshal

func (m *SyncResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*SyncResponse) XXX_Merge

func (dst *SyncResponse) XXX_Merge(src proto.Message)

func (*SyncResponse) XXX_Size

func (m *SyncResponse) XXX_Size() int

func (*SyncResponse) XXX_Unmarshal

func (m *SyncResponse) XXX_Unmarshal(b []byte) error

type SyncVariable

type SyncVariable struct {
	// map key - nodeID
	NodesValues          map[string]*SyncNodeValue `` /* 163-byte string literal not displayed */
	TTL                  int64                     `protobuf:"varint,2,opt,name=TTL,proto3" json:"TTL,omitempty"`
	TTLVersion           int64                     `protobuf:"varint,3,opt,name=TTLVersion,proto3" json:"TTLVersion,omitempty"`
	XXX_NoUnkeyedLiteral struct{}                  `json:"-"`
	XXX_unrecognized     []byte                    `json:"-"`
	XXX_sizecache        int32                     `json:"-"`
}
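
The struct keeps one SyncNodeValue per node ID, each with its own Version. How the library merges these is not documented on this page. The sketch below is only an illustration under two stated assumptions: for each node the entry with the higher Version wins, and a variable's total is the sum of its per-node values. The nodeValue type and the merge and total functions are hypothetical.

```go
package main

import "fmt"

// nodeValue mirrors the Value and Version fields of SyncNodeValue.
type nodeValue struct {
	Value   int64
	Version int64
}

// merge copies incoming entries into local when the node is unknown or
// the incoming version is newer (assumption: higher Version wins).
func merge(local, incoming map[string]nodeValue) {
	for id, in := range incoming {
		if cur, ok := local[id]; !ok || in.Version > cur.Version {
			local[id] = in
		}
	}
}

// total sums the per-node parts (assumption: the variable's value is
// the sum of all node parts).
func total(parts map[string]nodeValue) int64 {
	var sum int64
	for _, p := range parts {
		sum += p.Value
	}
	return sum
}

func main() {
	local := map[string]nodeValue{
		"a": {Value: 5, Version: 2},
		"b": {Value: 1, Version: 1},
	}
	incoming := map[string]nodeValue{
		"a": {Value: 3, Version: 1}, // older than local, ignored
		"b": {Value: 4, Version: 3}, // newer, replaces local
		"c": {Value: 2, Version: 1}, // unknown node, added
	}
	merge(local, incoming)
	fmt.Println(len(local), total(local))
}
```

Under these assumptions a merge is idempotent and order-independent, which is the property a multi-master counter needs so that nodes converge regardless of message order.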

func (*SyncVariable) Descriptor

func (*SyncVariable) Descriptor() ([]byte, []int)

func (*SyncVariable) GetNodesValues

func (m *SyncVariable) GetNodesValues() map[string]*SyncNodeValue

func (*SyncVariable) GetTTL

func (m *SyncVariable) GetTTL() int64

func (*SyncVariable) GetTTLVersion added in v0.2.0

func (m *SyncVariable) GetTTLVersion() int64

func (*SyncVariable) ProtoMessage

func (*SyncVariable) ProtoMessage()

func (*SyncVariable) Reset

func (m *SyncVariable) Reset()

func (*SyncVariable) String

func (m *SyncVariable) String() string

func (*SyncVariable) XXX_DiscardUnknown

func (m *SyncVariable) XXX_DiscardUnknown()

func (*SyncVariable) XXX_Marshal

func (m *SyncVariable) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*SyncVariable) XXX_Merge

func (dst *SyncVariable) XXX_Merge(src proto.Message)

func (*SyncVariable) XXX_Size

func (m *SyncVariable) XXX_Size() int

func (*SyncVariable) XXX_Unmarshal

func (m *SyncVariable) XXX_Unmarshal(b []byte) error

Directories

Path Synopsis
test
