eventbus

package module
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 22, 2021 License: Apache-2.0 Imports: 24 Imported by: 1

README

概述

事件总线。使用事件驱动的方式进行业务解耦。
建议以集群方式运行,这样可以支持多语言编程环境。

获取

go get github.com/aacfactory/eventbus

使用

本地

// Build a local (in-process) event bus.
eb := eventbus.NewEventbus()
// Register handlers: the types and handler functions used by this example follow.

// Arg is the example payload sent to the "reply" address. The handlers in this
// example decode it from the event body with encoding/json.
type Arg struct {
    // Id is an example identifier; the handlers only print it.
    Id       string    `json:"id,omitempty"`
    // Num is validated by HandlerReply, which rejects values below zero.
    Num      int       `json:"num,omitempty"`
    // Datetime is set to time.Now() by the example sender.
    Datetime time.Time `json:"datetime,omitempty"`
}

// Result is the example reply payload returned by HandlerReply and read back
// by the requester through ReplyFuture.Get.
type Result struct {
    // Value is the reply content; HandlerReply always sets it to "result".
    Value string `json:"value,omitempty"`
}

// HandlerReply is an example EventHandler that produces a reply.
//
// It decodes the event body as JSON into an Arg, prints the event head and the
// decoded argument, and then validates the argument:
//   - a body that cannot be decoded yields a wrapped decoding error;
//   - a negative Num yields an invalid-argument error;
//   - otherwise a *Result with Value "result" is returned.
func HandlerReply(event eventbus.Event) (result interface{}, err error) {
    arg := &Arg{}
    // Do not validate a zero-valued Arg when the body is malformed: report the
    // decoding failure to the requester instead.
    if decodeErr := json.Unmarshal(event.Body(), arg); decodeErr != nil {
        err = fmt.Errorf("decode reply argument: %w", decodeErr)
        return
    }
    fmt.Println("handle reply", event.Head(), arg)
    if arg.Num < 0 {
        err = errors.InvalidArgumentErrorWithDetails("bad number", "num", "less than 0")
        return
    }
    result = &Result{
        Value: "result",
    }
    return
}

// HandlerVoid is an example EventHandler that produces no reply.
//
// It decodes the event body as JSON into an Arg and prints the event head and
// the decoded argument. A body that cannot be decoded yields a wrapped
// decoding error; otherwise both result and err are nil.
func HandlerVoid(event eventbus.Event) (result interface{}, err error) {
    arg := &Arg{}
    // Surface malformed bodies instead of printing a zero-valued Arg.
    if decodeErr := json.Unmarshal(event.Body(), arg); decodeErr != nil {
        err = fmt.Errorf("decode void argument: %w", decodeErr)
        return
    }
    fmt.Println("handle void", event.Head(), arg)
    return
}

_ = eb.RegisterHandler("void", HandlerVoid)
_ = eb.RegisterHandler("reply", HandlerReply)
// 启动 (启动必须晚于挂载处理器)
eb.Start(context.TODO())
// 执行
options := eventbus.NewDeliveryOptions()
options.Add("h1", "1")
options.Add("h2", "2")

sendErr := eb.Send("reply", &Arg{
    Id:       "id",
    Num:      10,
    Datetime: time.Now(),
}, options)

if sendErr != nil {
fmt.Println("send failed", sendErr)
}

for i := 0; i < 2; i++ {
    rf := eb.Request("reply", &Arg{
        Id:       "id",
        Num:      i - 1,
        Datetime: time.Now(),
    }, options)
    result := &Result{}
    requestErr := rf.Get(result)
    if requestErr != nil {
    	fmt.Println("request failed", requestErr)
    } else {
        fmt.Println("request succeed", result)
    }
}
// 优雅的关闭
eb.Close(context.TODO())

集群

TCP + 地址发现 模式

当前没有实现服务发现,请使用 aacfactory/cluster,或自行实现。

// Options for the TCP-based cluster event bus.
// NOTE(review): the ClusterEventbusOption type listing declares Meta as
// cluster.ServiceMeta and TLS as cluster.ServiceTLS, while this example uses
// &eventbus.EndpointMeta{} and &eventbus.EndpointTLS{} — confirm which form
// matches the module version in use.
options := eventbus.ClusterEventbusOption{
    Host:                       "0.0.0.0", // actual listen address
    Port:                       9090, // actual listen port
    PublicHost:                 "127.0.0.1", // registered address; if empty, the listen address is used
    PublicPort:                 0, // registered port; if empty, the listen port is used
    Meta:                       &eventbus.EndpointMeta{}, // registration metadata
    Tags:                       nil, // tags, typically used for versioning and runtime isolation
    TLS:                        &eventbus.EndpointTLS{}, // TLS configuration
    EventChanCap:               64, // capacity of the event chan
    Workers:                    8,  // number of worker goroutines; defaults to twice the CPU count
}
// Service discovery: Foo is a placeholder for a discovery implementation,
// which this package does not provide.
discovery := Foo{}
// Create the cluster event bus.
bus, err = eventbus.NewClusterEventbus(discovery, options)
if err != nil {
    return
}
// From here on, usage is the same as with the local Eventbus.


NATS 模式
// Options for the NATS-backed event bus.
// NOTE(review): the server address, username and password below look like
// example values — replace them with your own and avoid committing real
// credentials.
options := eventbus.NatsEventbusOption{
        Name:                 "A", // NATS client name
        Servers:              []string{"nats://120.55.167.188:14222"},
        Username:             "ruser",
        Password:             "T0pS3cr3t",
        MaxReconnects:        10,
        ReconnectWaitSecond:  3,
        RetryOnFailedConnect: true,
        EventChanCap:         64,
}
// Service discovery: Foo is a placeholder for a discovery implementation.
discovery := Foo{}
// Create the NATS event bus.
bus, err = eventbus.NewNatsEventbus(discovery, options)
if err != nil {
    return
}
// From here on, usage is the same as with the local Eventbus.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewEventbusWithOption

func NewEventbusWithOption(option LocaledEventbusOption) (eb *localedEventbus)

Types

type ClusterEventbusOption

// ClusterEventbusOption configures the event bus created by NewClusterEventbus.
// Field descriptions follow the README example for the cluster mode; fields
// the README does not describe carry a review note instead.
type ClusterEventbusOption struct {
	// Host is the address the bus actually listens on.
	Host                     string              `json:"host,omitempty"`
	// Port is the port the bus actually listens on.
	Port                     int                 `json:"port,omitempty"`
	// PublicHost is the address to register; when empty the listen address is used.
	PublicHost               string              `json:"publicHost,omitempty"`
	// PublicPort is the port to register; when empty the listen port is used.
	PublicPort               int                 `json:"publicPort,omitempty"`
	// Meta is the metadata to register.
	Meta                     cluster.ServiceMeta `json:"meta,omitempty"`
	// Tags are typically used for versioning and runtime isolation.
	Tags                     []string            `json:"tags,omitempty"`
	// TLS is the TLS configuration.
	TLS                      cluster.ServiceTLS  `json:"tls,omitempty"`
	// EventChanCap is the capacity of the event chan.
	EventChanCap             int                 `json:"eventChanCap,omitempty"`
	// NOTE(review): the three Workers* tuning fields below are not described in
	// the README; presumably they bound worker idle time, per-command timeout and
	// command buffer size — confirm against the implementation.
	WorkersMaxIdleTime       time.Duration       `json:"workersMaxIdleTime,omitempty"`
	WorkersCommandTimeout    time.Duration       `json:"workersCommandTimeout,omitempty"`
	WorkersCommandBufferSize int                 `json:"workersCommandBufferSize,omitempty"`
	// Workers is the number of worker goroutines; the README states the default
	// is twice the CPU count.
	Workers                  int                 `json:"workers,omitempty"`
}

type DeliveryOptions

// DeliveryOptions is the per-delivery option set accepted by Eventbus.Send and
// Eventbus.Request. It exposes the same accessors as EventHead plus AddTag.
// Create one with NewDeliveryOptions.
//
// NOTE(review): only the signatures are visible here; the per-method notes
// below are inferred from names and signatures — confirm against the
// implementation.
type DeliveryOptions interface {
	// Add presumably appends value to the values stored under key.
	Add(key string, value string)
	// Put presumably sets the values stored under key to value.
	Put(key string, value []string)
	// Get returns a single value for key and whether one was found.
	Get(key string) (string, bool)
	// Keys returns the keys present.
	Keys() []string
	// Empty reports whether there are no entries.
	Empty() bool
	// Values returns all values for key and whether the key was found.
	Values(key string) ([]string, bool)
	// Remove presumably deletes key and its values.
	Remove(key string)
	// AddTag adds one or more tags to the delivery; presumably matched against
	// the tags given at handler registration — TODO confirm.
	AddTag(tags ...string)
}

func NewDeliveryOptions

func NewDeliveryOptions() DeliveryOptions

type Event added in v1.0.1

// Event is the value passed to an EventHandler.
type Event interface {
	// Head returns the head (key/value entries) of the event.
	Head() EventHead
	// Body returns the raw payload bytes; the README handlers decode it with
	// encoding/json.
	Body() []byte
}

type EventHandler

// EventHandler processes one Event. It returns the reply value (nil when there
// is nothing to reply) or an error. Handlers are attached to an address with
// Eventbus.RegisterHandler or Eventbus.RegisterLocalHandler.
type EventHandler func(event Event) (result interface{}, err error)

type EventHead added in v1.0.1

// EventHead is the key/value head of an Event, where each key may hold
// several string values. MultiMap declares every method listed here.
//
// NOTE(review): only the signatures are visible here; the per-method notes
// below are inferred from names and signatures — confirm against the
// implementation.
type EventHead interface {
	// Add presumably appends value to the values stored under key.
	Add(key string, value string)
	// Put presumably sets the values stored under key to value.
	Put(key string, value []string)
	// Get returns a single value for key and whether one was found.
	Get(key string) (string, bool)
	// Keys returns the keys present.
	Keys() []string
	// Empty reports whether there are no entries.
	Empty() bool
	// Values returns all values for key and whether the key was found.
	Values(key string) ([]string, bool)
	// Remove presumably deletes key and its values.
	Remove(key string)
}

type Eventbus

// Eventbus is the event bus API shared by the local, cluster and NATS
// variants (see NewEventbus, NewClusterEventbus and NewNatsEventbus).
type Eventbus interface {
	// Send delivers v to address and returns only an error; no reply value is
	// handed back to the caller.
	Send(address string, v interface{}, options ...DeliveryOptions) (err error)
	// Request delivers v to address and returns a ReplyFuture from which the
	// handler's reply, or its error, is obtained.
	Request(address string, v interface{}, options ...DeliveryOptions) (reply ReplyFuture)
	// RegisterHandler attaches handler to address, optionally with tags.
	// Per the README, handlers must be registered before Start is called.
	RegisterHandler(address string, handler EventHandler, tags ...string) (err error)
	// RegisterLocalHandler attaches handler to address, optionally with tags.
	// NOTE(review): presumably the handler is not advertised to other cluster
	// members — confirm against the implementation.
	RegisterLocalHandler(address string, handler EventHandler, tags ...string) (err error)
	// Start starts the bus; per the README it must be called after the
	// handlers have been registered.
	Start(context context.Context)
	// Close shuts the bus down; the README describes this as a graceful close.
	Close(context context.Context)
}

func NewClusterEventbus

func NewClusterEventbus(discovery cluster.ServiceDiscovery, option ClusterEventbusOption) (bus Eventbus, err error)

func NewEventbus

func NewEventbus() Eventbus

func NewNatsEventbus added in v1.1.0

func NewNatsEventbus(discovery cluster.ServiceDiscovery, option NatsEventbusOption) (bus Eventbus, err error)

type LocaledEventbusOption

// LocaledEventbusOption configures the local event bus created by
// NewEventbusWithOption.
type LocaledEventbusOption struct {
	// EventChanCap is the capacity of the event chan (as described for the
	// same-named field in the README's cluster example).
	EventChanCap             int           `json:"eventChanCap,omitempty"`
	// NOTE(review): the three Workers* tuning fields below are not described in
	// the README; presumably they bound worker idle time, per-command timeout and
	// command buffer size — confirm against the implementation.
	WorkersMaxIdleTime       time.Duration `json:"workersMaxIdleTime,omitempty"`
	WorkersCommandTimeout    time.Duration `json:"workersCommandTimeout,omitempty"`
	WorkersCommandBufferSize int           `json:"workersCommandBufferSize,omitempty"`
	// Workers is the number of worker goroutines (as described for the
	// same-named field in the README's cluster example).
	Workers                  int           `json:"workers,omitempty"`
}

type MultiMap

// MultiMap maps a string key to a list of string values. Its method set (Add,
// Empty, Get, Keys, Merge, Put, Remove, Values) covers every method declared
// by EventHead.
type MultiMap map[string][]string

func (MultiMap) Add

func (h MultiMap) Add(key string, value string)

func (MultiMap) Empty

func (h MultiMap) Empty() bool

func (MultiMap) Get

func (h MultiMap) Get(key string) (string, bool)

func (MultiMap) Keys

func (h MultiMap) Keys() []string

func (MultiMap) Merge

func (h MultiMap) Merge(o ...MultiMap)

func (MultiMap) Put

func (h MultiMap) Put(key string, value []string)

func (MultiMap) Remove

func (h MultiMap) Remove(key string)

func (MultiMap) Values

func (h MultiMap) Values(key string) ([]string, bool)

type NatsEventbusOption added in v1.1.0

// NatsEventbusOption configures the event bus created by NewNatsEventbus.
// Field descriptions follow the README examples; fields the README does not
// describe carry a review note instead.
type NatsEventbusOption struct {
	// Name is the NATS client name.
	Name                     string              `json:"name,omitempty"`
	// Servers lists the NATS server URLs (the README uses the nats:// scheme).
	Servers                  []string            `json:"servers,omitempty"`
	// Username and Password are the credentials given in the README example.
	Username                 string              `json:"username,omitempty"`
	Password                 string              `json:"password,omitempty"`
	// NOTE(review): the three reconnect fields below are not described in the
	// README; presumably they map onto the NATS client's reconnect settings,
	// with ReconnectWaitSecond expressed in seconds — confirm.
	MaxReconnects            int                 `json:"maxReconnects,omitempty"`
	ReconnectWaitSecond      int                 `json:"reconnectWaitSecond,omitempty"`
	RetryOnFailedConnect     bool                `json:"retryOnFailedConnect,omitempty"`
	// Meta is the metadata to register (as described for the cluster options).
	Meta                     cluster.ServiceMeta `json:"meta,omitempty"`
	// Tags are typically used for versioning and runtime isolation.
	Tags                     []string            `json:"tags,omitempty"`
	// TLS is the TLS configuration.
	TLS                      cluster.ServiceTLS  `json:"tls,omitempty"`
	// EventChanCap is the capacity of the event chan.
	EventChanCap             int                 `json:"eventChanCap,omitempty"`
	// NOTE(review): the three Workers* tuning fields below are not described in
	// the README; presumably they bound worker idle time, per-command timeout and
	// command buffer size — confirm against the implementation.
	WorkersMaxIdleTime       time.Duration       `json:"workersMaxIdleTime,omitempty"`
	WorkersCommandTimeout    time.Duration       `json:"workersCommandTimeout,omitempty"`
	WorkersCommandBufferSize int                 `json:"workersCommandBufferSize,omitempty"`
	// Workers is the number of worker goroutines.
	Workers                  int                 `json:"workers,omitempty"`
}

type ReplyFuture

// ReplyFuture is the handle returned by Eventbus.Request for obtaining the
// reply.
type ReplyFuture interface {
	// Get stores the reply into v and returns an error when the request failed.
	// In the README, v is a pointer to the expected result type (e.g. *Result).
	// NOTE(review): presumably Get blocks until the reply arrives — confirm.
	Get(v interface{}) (err error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL