members

package module
v0.0.0-...-99de08e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 20, 2023 License: MIT Imports: 17 Imported by: 4

README

members

An extension for HashiCorp memberlist.

Usage

go get github.com/tsingsun/members
package main

import (
	"context"
	"flag"
	"github.com/tsingsun/members"
	"github.com/vmihailenco/msgpack/v5"
	"strings"
)

// Command-line flags for the example program:
//   - peers: comma-separated list of peers to join; empty (the default) means
//     no known peers are configured.
//   - address: host:port string, default ":4001". NOTE(review): this flag is
//     not referenced anywhere else in the example.
var (
	peers   = flag.String("peers", "", "comma separated list of peers")
	address = flag.String("address", ":4001", "http host:port")
)

// main parses the command-line flags, creates a peer, joins using the
// configured known peers, and registers the order shard on that peer.
func main() {
	// Parse flags here rather than in init(): package init runs before the
	// testing package registers its own flags, so parsing in init makes
	// `go test` fail with "flag provided but not defined".
	flag.Parse()

	var ms []string
	if len(*peers) > 0 {
		ms = strings.Split(*peers, ",")
	}
	group, err := members.NewPeer()
	if err != nil {
		panic(err)
	}
	group.Options.KnownPeers = ms
	if err = group.Join(context.Background()); err != nil {
		panic(err)
	}
	// OrderHandler implements the members.Shard interface.
	orderhdl := &OrderHandler{
		ShardId: "order",
	}
	sd, err := group.AddShard(orderhdl)
	if err != nil {
		panic(err)
	}
	// Give the handler the Spreader returned by AddShard so Receive can broadcast.
	orderhdl.Spreader = sd
}

// OrderHandler is the example shard handler for orders. It provides the
// Name, MarshalBinary and Merge methods required by members.Shard.
type OrderHandler struct {
	// ShardId is set to "order" by main.
	// NOTE(review): Name returns a hard-coded "order" and does not read this field.
	ShardId  string
	// Spreader is the value returned by Peer.AddShard; Receive broadcasts through it.
	Spreader members.Spreader
	// Orders is not read or written by any method in this example;
	// presumably it holds the shard's order data — confirm.
	Orders   []string
}

// Name reports the identifier of this shard; the example always uses "order".
func (OrderHandler) Name() string {
	const shardName = "order"
	return shardName
}

// MarshalBinary marshals the shard data into binary form so it can be synced
// to other nodes. It is a placeholder in this example and panics until
// implemented.
func (OrderHandler) MarshalBinary() ([]byte, error) {
	panic("implement me")
}

// Merge merges data produced by a remote node's MarshalBinary. The Shard
// should be able to dedupe the data. It is a placeholder in this example and
// panics until implemented.
func (OrderHandler) Merge(b []byte) error {
	panic("implement me")
}

// Receive encodes ord as a one-element msgpack string array and broadcasts
// the encoded bytes through the handler's Spreader. It returns the encoding
// error, if any, otherwise the result of Broadcast.
func (o OrderHandler) Receive(ord string) error {
	batch := []string{ord}
	encoded, encErr := msgpack.Marshal(batch)
	if encErr != nil {
		return encErr
	}
	return o.Spreader.Broadcast(encoded)
}

For a complete program, please see the example module.

Note: The best practice is to use an array of objects and version control to sync the data.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func OversizedMessage

func OversizedMessage(b []byte, size int) bool

OversizedMessage indicates whether or not the byte payload should be sent via TCP.

Types

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel is a channel for communication between shard holding nodes.

func NewChannel

func NewChannel(shardName string, peer *Peer) (*Channel, error)

NewChannel creates a new channel for the given shard.

func (*Channel) Broadcast

func (c *Channel) Broadcast(msg []byte) error

Broadcast sends a message to all nodes in the channel.

func (*Channel) Stop

func (c *Channel) Stop(_ context.Context) error

Stop stops the channel.

type NoopSpreader

type NoopSpreader struct{}

func (*NoopSpreader) Broadcast

func (n *NoopSpreader) Broadcast([]byte) error

type Option

type Option func(*Options)

func WithConfiguration

func WithConfiguration(cnf *conf.Configuration) Option

type Options

type Options struct {
	ID         string                   `yaml:"id" json:"id"`
	Cnf        *conf.Configuration      `yaml:"-" json:"-"`
	KnownPeers []string                 `yaml:"known" json:"known"`
	Event      memberlist.EventDelegate `yaml:"event" json:"event"`
	Delegate   memberlist.Delegate      `yaml:"delegate" json:"delegate"`
	JoinTTL    time.Duration            `yaml:"joinTTL" json:"joinTTL"`
	JoinRetry  int                      `yaml:"joinRetry" json:"joinRetry"`
}

type Payload

type Payload struct {
	Key  string
	Data []byte
}

type Peer

type Peer struct {
	Options
	// contains filtered or unexported fields
}

Peer is a memberlist Node wrapper

func NewPeer

func NewPeer(opts ...Option) (*Peer, error)

func (*Peer) AddShard

func (p *Peer) AddShard(sd Shard) (Spreader, error)

func (*Peer) Address

func (p *Peer) Address() string

func (*Peer) Join

func (p *Peer) Join(ctx context.Context) error

func (*Peer) MemberCount

func (p *Peer) MemberCount() int

func (*Peer) OthersNodes

func (p *Peer) OthersNodes() []*memberlist.Node

func (*Peer) ReliableMsgHandle

func (p *Peer) ReliableMsgHandle(ctx context.Context) error

func (*Peer) SendReliable

func (p *Peer) SendReliable(b []byte)

func (*Peer) Start

func (p *Peer) Start(ctx context.Context) error

func (*Peer) Stop

func (p *Peer) Stop(ctx context.Context) error

type Shard

type Shard interface {
	// Name returns the name of the shard, which is used to identify the shard.
	Name() string
	// MarshalBinary marshals the shard data into binary form so it can be synced to other nodes.
	MarshalBinary() ([]byte, error)
	// Merge merges data produced by a remote node's MarshalBinary. The Shard should be able to dedupe the data.
	Merge(b []byte) error
}

Shard is a handler for a specific kind of distributed data.

type Spreader

type Spreader interface {
	// Broadcast sends a message to all nodes in the cluster.
	Broadcast([]byte) error
}

Spreader is an interface for transporting messages to other nodes in the cluster.

Directories

Path Synopsis
example module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL