nrelay

package module
v1.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2022 License: Apache-2.0 Imports: 8 Imported by: 0

README

nats-relay

Apache License GoDoc Go Report Card Releases

Simple low-latency NATS relay(replication) server.

Relays (replicates) one or more topics from one NATS server to another NATS server.

nats-relay usage

Configuration

Configuration is done using a YAML file:

primary:   "nats://primary-natsd.local:4222/"
secondary: "nats://secondary-natsd.local:4222/"
nats:      "nats://localhost:4222/"
topic:
  "foo.>":
    worker: 2
  "bar.*":
    worker: 2
  "baz.1.>":
    worker: 2
  "baz.2.>":
    worker: 2

Wildcard ('>' or '*') topics can be specified.

see more examples

Embedding

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/octu0/chanque"
	"github.com/octu0/nats-relay"
)

func main() {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	serverConf := nrelay.DefaultServerConfig()
	relayConf := nrelay.RelayConfig{
		PrimaryUrl:   "nats://primary-natsd.local:4222",
		SecondaryUrl: "nats://secondary-natsd.local:4222",
		NatsUrl:      "nats://localhost:4222",
		Topics: nrelay.Topics(
			nrelay.Topic("foo.>", nrelay.WorkerNum(2)),
			nrelay.Topic("bar.*", nrelay.WorkerNum(2)),
			nrelay.Topic("baz.1.>", nrelay.WorkerNum(2)),
			nrelay.Topic("baz.2.>", nrelay.WorkerNum(2)),
		),
	}
	executor := chanque.NewExecutor(10, 100)
	logger := log.New(os.Stdout, "nrelay ", log.Ldate|log.Ltime|log.Lshortfile)

	svr := nrelay.NewDefaultServer(
		nrelay.ServerOptRelayConfig(relayConf),
		nrelay.ServerOptExecutor(executor),
		nrelay.ServerOptLogger(logger),
		nrelay.ServerOptNatsOptions(
			nats.PingInterval(500*time.Millisecond),
			nats.ReconnectBufSize(16*1024*1024),
			nats.CustomDialer(...),
		),
	)
	svr.Run(ctx)
}

Build

Build requires Go version 1.16+ installed.

$ go version

Run `make pkg` to build and package binaries for Linux and Darwin (macOS).

$ git clone https://github.com/octu0/nats-relay
$ make pkg

Help

NAME:
   nats-relay

USAGE:
   nats-relay [global options] command [command options] [arguments...]

VERSION:
   1.7.0

COMMANDS:
     relay    run relay server
     help, h  Shows a list of commands or help for one command

GLOBAL OPTIONS:
   --debug, -d    debug mode [$NRELAY_DEBUG]
   --verbose, -V  verbose. more message [$NRELAY_VERBOSE]
   --help, -h     show help
   --version, -v  print the version
subcommand: relay
NAME:
   nats-relay relay - run relay server

USAGE:
   nats-relay relay [command options] [arguments...]

OPTIONS:
   --yaml value, -c value  relay configuration yaml file path (default: "./relay.yaml") [$NRELAY_RELAY_YAML]
   --pool-min value        goroutine pool min size (default: 100) [$NRELAY_POOL_MIN]
   --pool-max value        goroutine pool max size (default: 1000) [$NRELAY_POOL_MAX]

License

Apache License 2.0, see LICENSE file for details.

Documentation

Index

Constants

View Source
const (
	AppName string = "nats-relay"
	Version string = "1.7.0"
	UA      string = AppName + "/" + Version
)

Variables

This section is empty.

Functions

func PrefixSize added in v1.6.0

func PrefixSize(size int) topicOptionFunc

func Topic added in v1.6.0

func Topic(name string, funcs ...topicOptionFunc) *topicNameOptionTuple

func Topics added in v1.6.0

func Topics(topics ...*topicNameOptionTuple) map[string]RelayClientConfig

func WorkerNum added in v1.6.0

func WorkerNum(num int) topicOptionFunc

Types

type DefaultServer added in v1.7.0

type DefaultServer struct {
	// contains filtered or unexported fields
}

func NewDefaultServer added in v1.7.0

func NewDefaultServer(funcs ...ServerOptFunc) *DefaultServer

func (*DefaultServer) Run added in v1.7.0

func (s *DefaultServer) Run(ctx context.Context) error

type Destination added in v1.7.0

type Destination interface {
	Open(num int) error
	Close() error
	Workers() []chanque.Worker
}

type MultipleSource added in v1.7.0

type MultipleSource struct {
	// contains filtered or unexported fields
}

func NewMultipleSource added in v1.7.0

func NewMultipleSource(urls []string, natsOpts []nats.Option, logger *log.Logger) *MultipleSource

func (*MultipleSource) Close added in v1.7.0

func (s *MultipleSource) Close() error

func (*MultipleSource) Open added in v1.7.0

func (s *MultipleSource) Open() error

func (*MultipleSource) Subscribe added in v1.7.0

func (s *MultipleSource) Subscribe(topic string, prefixSize int, workers []chanque.Worker) error

func (*MultipleSource) Unsubscribe added in v1.7.0

func (s *MultipleSource) Unsubscribe() error

type MultipleSourceSingleDestinationRelay added in v1.7.0

type MultipleSourceSingleDestinationRelay struct {
	// contains filtered or unexported fields
}

func NewMultipleSourceSingleDestinationRelay added in v1.7.0

func NewMultipleSourceSingleDestinationRelay(topic string, src Source, dst Destination, prefix, num int, logger *log.Logger) *MultipleSourceSingleDestinationRelay

func (*MultipleSourceSingleDestinationRelay) Run added in v1.7.0

type Relay added in v1.7.0

type Relay interface {
	Run(context.Context) error
}

type RelayClientConfig

type RelayClientConfig struct {
	WorkerNum  int `yaml:"worker"`
	PrefixSize int `yaml:"prefix"`
}

type RelayConfig

type RelayConfig struct {
	PrimaryUrl   string                       `yaml:"primary"`
	SecondaryUrl string                       `yaml:"secondary"`
	NatsUrl      string                       `yaml:"nats"`
	Topics       map[string]RelayClientConfig `yaml:"topic"`
}

relay.yaml
----------
primary: "nats://master1.example.com:4222/"
secondary: "nats://master2.example.com:4222/"
nats: "nats://localhost:4222/"
topic:
  "foo.>":
    worker: 2
  "bar.>":
    worker: 2

type Server added in v1.6.0

type Server interface {
	Run(context.Context) error
}

type ServerOptFunc added in v1.7.0

type ServerOptFunc func(*serverOpt)

func ServerOptExecutor added in v1.7.0

func ServerOptExecutor(executor *chanque.Executor) ServerOptFunc

func ServerOptLogger added in v1.7.0

func ServerOptLogger(logger *log.Logger) ServerOptFunc

func ServerOptNatsOptions added in v1.7.0

func ServerOptNatsOptions(natsOpts ...nats.Option) ServerOptFunc

func ServerOptRelayConfig added in v1.7.0

func ServerOptRelayConfig(conf RelayConfig) ServerOptFunc

type SingleDestination added in v1.7.0

type SingleDestination struct {
	// contains filtered or unexported fields
}

func NewSingleDestination added in v1.7.0

func NewSingleDestination(executor *chanque.Executor, url string, natsOpts []nats.Option, logger *log.Logger) *SingleDestination

func (*SingleDestination) Close added in v1.7.0

func (d *SingleDestination) Close() error

func (*SingleDestination) Open added in v1.7.0

func (d *SingleDestination) Open(num int) error

func (*SingleDestination) Workers added in v1.7.0

func (d *SingleDestination) Workers() []chanque.Worker

type Source added in v1.7.0

type Source interface {
	Open() error
	Close() error
	Subscribe(topic string, prefixSize int, workers []chanque.Worker) error
	Unsubscribe() error
}

Directories

Path Synopsis
cli

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL