client

package
v0.0.0-...-60b9055 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2019 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ACTIVE = iota //ACTIVE status
	CLOSED        //CLOSED status
)

pool state

View Source
const (
	//DefaultPoolSize is 10
	DefaultPoolSize = 10
	//DefaultDialTimeout 5s
	DefaultDialTimeout = time.Second * 5
	//DefaultPoolTimeout 5s
	DefaultPoolTimeout = time.Second * 5
	//DefaultConnPerSecond 500
	DefaultConnPerSecond = 500
)

Variables

View Source
var (
	//ErrParamsInvalid means params invalid, please check
	ErrParamsInvalid = errors.New("invalid params")
	//ErrMsgTypeError is msg type error
	ErrMsgTypeError = errors.New("message type error")
)
View Source
var (
	//ErrSubscribeTimeout error
	ErrSubscribeTimeout = errors.New("subscribe topic time out")
)
View Source
var (
	//ErrTimeout error
	ErrTimeout = errors.New("get conn time out")
)

Functions

This section is empty.

Types

type Conn

type Conn struct {
	C net.Conn
	// contains filtered or unexported fields
}

Conn instance

func (*Conn) Close

func (c *Conn) Close() error

Close wraps the Close method of the underlying net.Conn.

func (*Conn) Read

func (c *Conn) Read(b []byte) (n int, err error)

Read wraps the Read method of the underlying net.Conn.

func (*Conn) Write

func (c *Conn) Write(b []byte) (n int, err error)

Write wraps the Write method of the underlying net.Conn.

type ConnPool

type ConnPool struct {
	Mu sync.Mutex
	// contains filtered or unexported fields
}

ConnPool is the definition of a connection pool.

func NewConnPool

func NewConnPool(poolSize int, dialTimeout, poolTimeout time.Duration, addr string, rate int64) *ConnPool

NewConnPool creates a new connection pool

func NewDefaultConnPool

func NewDefaultConnPool(addr string) *ConnPool

NewDefaultConnPool creates a new connection pool using default params.

func (*ConnPool) Close

func (p *ConnPool) Close()

Close the conn pool

func (*ConnPool) Get

func (p *ConnPool) Get() (*Conn, error)

Get returns an idle connection if idle is not empty; otherwise it creates a new one.

func (*ConnPool) Put

func (p *ConnPool) Put(c *Conn)

Put puts the conn back into idle.

func (*ConnPool) Remove

func (p *ConnPool) Remove(c *Conn)

Remove the conn

func (*ConnPool) Size

func (p *ConnPool) Size() int

Size returns the size of the connection pool.

type Consumer

type Consumer struct {
	Pool *ConnPool
	// contains filtered or unexported fields
}

Consumer instance

func NewConsumer

func NewConsumer(addr string) *Consumer

NewConsumer creates a new Consumer

func (*Consumer) Close

func (c *Consumer) Close()

Close the consumer

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(topic string, filter string, f func(...*msg.Message) error) error

Subscribe subscribes to topic with filter, using the default handler.

func (*Consumer) SubscribeWithCount

func (c *Consumer) SubscribeWithCount(topic string, filter string, count int64, f func(...*msg.Message) error) error

SubscribeWithCount subscribes to topic with filter and count, using the default handler.

func (*Consumer) SubscribeWithCountAndHandler

func (c *Consumer) SubscribeWithCountAndHandler(topic string, filter string, count int64, h Handler) error

SubscribeWithCountAndHandler subscribes to topic with filter and count, using the given handler.

func (*Consumer) SubscribeWithHandler

func (c *Consumer) SubscribeWithHandler(topic string, filter string, h Handler) error

SubscribeWithHandler subscribes to topic with filter, using the given handler.

func (*Consumer) UnSubscribe

func (c *Consumer) UnSubscribe(topic string) error

UnSubscribe unsubscribes from topic.

type DefaultHandler

type DefaultHandler struct {
	// contains filtered or unexported fields
}

DefaultHandler is the default handler, used by Subscribe method.

func (*DefaultHandler) After

func (d *DefaultHandler) After()

After method

func (*DefaultHandler) Before

func (d *DefaultHandler) Before()

Before method

func (*DefaultHandler) Handle

func (d *DefaultHandler) Handle(msgs ...*msg.Message) error

Handle method

type Handler

type Handler interface {
	//Before method
	Before()
	//Handle handles messages; when its return value is not nil, the run loop aborts and After() is then invoked.
	Handle(...*msg.Message) error
	//After method
	After()
}

Handler defines the functions that handle messages

type Producer

type Producer struct {
	Pool *ConnPool
}

Producer instance

func NewProducer

func NewProducer(addr string) *Producer

NewProducer creates a new producer

func (*Producer) BatchPublish

func (p *Producer) BatchPublish(m ...*msg.Message) error

BatchPublish publishes msgs in a batch; all the messages must have MessageType_Publish.

func (*Producer) Close

func (p *Producer) Close()

Close the producer

func (*Producer) PublishDirect

func (p *Producer) PublishDirect(topic string, filter string, body []byte) error

PublishDirect publishes body to topic with filter: only one subscriber, Ack needed.

func (*Producer) PublishDirectPersist

func (p *Producer) PublishDirectPersist(topic string, filter string, body []byte) error

PublishDirectPersist publishes body to topic with filter: persisted, only one subscriber, Ack needed.

func (*Producer) PublishFanout

func (p *Producer) PublishFanout(topic string, filter string, body []byte) error

PublishFanout publishes body to topic with filter: all subscribers, Ack needed.

func (*Producer) PublishFanoutPersist

func (p *Producer) PublishFanoutPersist(topic string, filter string, body []byte) error

PublishFanoutPersist publishes body to topic with filter: persisted, all subscribers, Ack needed.

func (*Producer) PushDirect

func (p *Producer) PushDirect(topic string, filter string, body []byte) error

PushDirect publishes body to topic with filter: only one subscriber, no Ack.

func (*Producer) PushDirectPersist

func (p *Producer) PushDirectPersist(topic string, filter string, body []byte) error

PushDirectPersist publishes body to topic with filter: persisted, only one subscriber, no Ack.

func (*Producer) PushFanout

func (p *Producer) PushFanout(topic string, filter string, body []byte) error

PushFanout publishes body to topic with filter: all subscribers, no Ack.

func (*Producer) PushFanoutPersist

func (p *Producer) PushFanoutPersist(topic string, filter string, body []byte) error

PushFanoutPersist publishes body to topic with filter: persisted, all subscribers, no Ack.

func (*Producer) WaitAck

func (p *Producer) WaitAck(c *Conn) error

WaitAck blocks until an ack arrives.

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter is based on the token bucket algorithm. It supports both blocking and non-blocking acquisition, and it is safe for concurrent use.

func NewRateLimiter

func NewRateLimiter(rate int64, unit time.Duration) *RateLimiter

NewRateLimiter creates a new rate limiter.

func (*RateLimiter) Acquire

func (r *RateLimiter) Acquire()

Acquire blocks until the limiter is available.

func (*RateLimiter) AcquireCount

func (r *RateLimiter) AcquireCount(count int64)

AcquireCount takes multiple tokens (count) at once.

func (*RateLimiter) TryAcquire

func (r *RateLimiter) TryAcquire() bool

TryAcquire is non-blocking; when the acquire fails, it returns false.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL