Documentation ¶
Index ¶
- Variables
- func NewDiscoverer(url, groupID string, lookupInterval time.Duration, log logger.Logger) (discoverer.Discoverer, error)
- type Callback
- type Client
- type Message
- type Option
- func WithAddColumns(cols map[string]string) Option
- func WithBatchingMaxMessages(n int) Option
- func WithBatchingMaxPublishDelay(t time.Duration) Option
- func WithBatchingMaxSize(n int) Option
- func WithBlockIfQueueIsFull(b bool) Option
- func WithBufferPool(bp bufferpool.BufferPool) Option
- func WithBufferPoolSize(n int) Option
- func WithBytePool(bp bufferpool.BytePool) Option
- func WithBytePoolSize(n int) Option
- func WithBytePoolWidth(n int) Option
- func WithConnTimeout(t time.Duration) Option
- func WithGroupID(g string) Option
- func WithLogger(log logger.Logger) Option
- func WithMaxPendingMessages(n int) Option
- func WithMaxRetries(n int) Option
- func WithMetricsName(name string) Option
- func WithMetricsRegistry(reg prometheus.Registerer) Option
- func WithReadBufferSize(n int) Option
- func WithSendTimeout(t time.Duration) Option
- func WithSocketRecvBufferSize(n int) Option
- func WithSocketSendBufferSize(n int) Option
- func WithURL(u string) Option
- func WithUpdateInterval(u time.Duration) Option
- func WithWorkerNum(n int) Option
- func WithWriteBufferSize(n int) Option
- type Options
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidGroupID = errors.New("invalid group ID") ErrInvalidURL = errors.New("invalid URL") ErrNoEndpoint = errors.New("service has no endpoints") ErrNoAvailableWorker = errors.New("no available worker") )
variables
var (
// DefaultURL is the default Manager URL for discovering the DataProxy cluster
DefaultURL = "http://127.0.0.1:8083/inlong/manager/openapi/dataproxy/getIpList"
)
Functions ¶
func NewDiscoverer ¶
func NewDiscoverer(url, groupID string, lookupInterval time.Duration, log logger.Logger) (discoverer.Discoverer, error)
NewDiscoverer news a DataProxy discoverer
Types ¶
type Client ¶
type Client interface { // Send sends a message and wait for the result. Send(ctx context.Context, msg Message) error // SendAsync sends a message asynchronously, when the message is sent or timeout, the callback will be called. SendAsync(ctx context.Context, msg Message, callback Callback) // Close flushes all the message to the server and wait for the results or timeout, then close the producer. Close() }
Client is the interface of a DataProxy client
type Message ¶
type Message struct { GroupID string // InLong group ID StreamID string // InLong stream ID Payload []byte // the content of the message Headers map[string]string // message headers, won't be sent to the server right now MetaData interface{} // any data you want, won't be sent to the server, but you can get it in the callback }
Message is the message to send
type Option ¶
type Option func(*Options)
Option is the Options helper.
func WithAddColumns ¶
WithAddColumns sets AddColumns
func WithBatchingMaxMessages ¶
WithBatchingMaxMessages sets BatchingMaxMessages
func WithBatchingMaxPublishDelay ¶
WithBatchingMaxPublishDelay sets BatchingMaxPublishDelay
func WithBatchingMaxSize ¶
WithBatchingMaxSize sets BatchingMaxSize
func WithBlockIfQueueIsFull ¶
WithBlockIfQueueIsFull sets BlockIfQueueIsFull
func WithBufferPool ¶
func WithBufferPool(bp bufferpool.BufferPool) Option
WithBufferPool sets BufferPool
func WithBufferPoolSize ¶
WithBufferPoolSize sets BufferPoolSize
func WithConnTimeout ¶
WithConnTimeout sets ConnTimeout
func WithMaxPendingMessages ¶
WithMaxPendingMessages sets MaxPendingMessages
func WithMetricsRegistry ¶
func WithMetricsRegistry(reg prometheus.Registerer) Option
WithMetricsRegistry sets Logger
func WithReadBufferSize ¶
WithReadBufferSize sets ReadBufferSize
func WithSendTimeout ¶
WithSendTimeout sets SendTimeout
func WithSocketRecvBufferSize ¶
WithSocketRecvBufferSize sets SocketRecvBufferSize
func WithSocketSendBufferSize ¶
WithSocketSendBufferSize sets SocketSendBufferSize
func WithUpdateInterval ¶
WithUpdateInterval sets UpdateInterval
func WithWriteBufferSize ¶
WithWriteBufferSize sets WriteBufferSize
type Options ¶
type Options struct { GroupID string // InLong group ID URL string // the Manager URL for discovering the DataProxy cluster UpdateInterval time.Duration // interval to refresh the endpoint list, default: 5m ConnTimeout time.Duration // connection timeout: default: 3000ms WriteBufferSize int // write buffer size in bytes, default: 8M ReadBufferSize int // read buffer size in bytes, default: 1M SocketSendBufferSize int // socket send buffer size in bytes, default: 8M SocketRecvBufferSize int // socket receive buffer size in bytes, default: 1M BufferPool bufferpool.BufferPool // encoding/decoding buffer pool, if not given, SDK will init a new one BytePool bufferpool.BytePool // encoding/decoding byte pool, if not given, SDK will init a new one BufferPoolSize int // buffer pool size, default: 409600 BytePoolSize int // byte pool size, default: 409600 BytePoolWidth int // byte pool width, default: equals to BatchingMaxSize Logger logger.Logger // debug logger, default: stdout MetricsName string // the unique metrics name of this SDK, used to isolate metrics in the case that more than 1 client are initialized in one process MetricsRegistry prometheus.Registerer // metrics registry, default: prometheus.DefaultRegisterer WorkerNum int // worker number, default: 8 SendTimeout time.Duration // send timeout, default: 30000ms MaxRetries int // max retry count, default: 2 BatchingMaxPublishDelay time.Duration // the time period within which the messages sent will be batched, default: 20ms BatchingMaxMessages int // the maximum number of messages permitted in a batch, default: 50 BatchingMaxSize int // the maximum number of bytes permitted in a batch, default: 40K MaxPendingMessages int // the max size of the queue holding the messages pending to receive an acknowledgment from the broker, default: 204800 BlockIfQueueIsFull bool // whether Send and SendAsync block if producer's message queue is full, default: false AddColumns map[string]string // addition columns to add to the message, for example: __addcol1__worldid=xxx&__addcol2__ip=yyy, all the message will be added 2 more columns with worldid=xxx and ip=yyy // contains filtered or unexported fields }
Options is the DataProxy go client configs
func (*Options) ValidateAndSetDefault ¶
ValidateAndSetDefault validates an options and set up the default values