Documentation ¶
Index ¶
- Constants
- func BoolHeader(msg *amqp.Delivery, param string, def bool) bool
- func FloatHeader(msg *amqp.Delivery, param string, def float64) float64
- func IntHeader(msg *amqp.Delivery, param string, def int64) int64
- func SignalContext(parent context.Context) context.Context
- func StringHeader(msg *amqp.Delivery, param string, def string) string
- type BrokerConfig
- func (bc *BrokerConfig) Context(ctx context.Context) *BrokerConfig
- func (bc *BrokerConfig) Interval(tm time.Duration) *BrokerConfig
- func (bc *BrokerConfig) Logger(logger Logger) *BrokerConfig
- func (bc *BrokerConfig) OnExpired(handler DefaultSinkExpiredHandler) *BrokerConfig
- func (bc *BrokerConfig) OnTooMuchRetries(threshold int64, handler DefaultSinkExpiredHandler) *BrokerConfig
- func (bc *BrokerConfig) Retries(num int) *BrokerConfig
- func (bc *BrokerConfig) Start() *Server
- func (bc *BrokerConfig) StdLogger(prefix string) *BrokerConfig
- func (bc *BrokerConfig) Timeout(tm time.Duration) *BrokerConfig
- func (bc *BrokerConfig) Verbose(verbose bool) *BrokerConfig
- type DefaultSinkExpiredHandler
- type Exchange
- func (exc *Exchange) Attr(name string, value interface{}) *Exchange
- func (exc *Exchange) Direct(name string) *Exchange
- func (exc *Exchange) Fanout(name string) *Exchange
- func (exc *Exchange) Handler(obj SimpleHandler) *Server
- func (exc *Exchange) HandlerFunc(fn SinkHandlerFunc) *Server
- func (exc *Exchange) Key(routingKeys ...string) *Exchange
- func (exc *Exchange) Topic(name string) *Exchange
- func (exc *Exchange) Transact(fn TransactionHandler) *Server
- func (exc *Exchange) TransactFunc(fn TransactionHandlerFunc) *Server
- type Logger
- type Message
- func (msg *Message) Bytes(content []byte) *Message
- func (msg *Message) ContentType(mime string) *Message
- func (msg *Message) Exchange(name string) *Message
- func (msg *Message) Header(name string, data interface{}) *Message
- func (msg *Message) ID(id string) *Message
- func (msg *Message) JSON(obj interface{}) *Message
- func (msg *Message) JSONContent(data []byte) *Message
- func (msg *Message) Key(name string) *Message
- func (ms *Message) Publish(ctx context.Context) (<-chan struct{}, error)
- func (ms *Message) PublishWait(ctx context.Context) error
- func (msg *Message) Raw() *amqp.Publishing
- func (msg *Message) Reply(correlationId, queueName string) *Message
- func (msg *Message) ReplyTo(correlationId, queueName string) *Message
- func (ms *Message) Send() <-chan struct{}
- func (ms *Message) SendContext(ctx context.Context) error
- func (msg *Message) String(content string) *Message
- func (msg *Message) TTL(tm time.Duration) *Message
- func (msg *Message) Time(stamp time.Time) *Message
- func (ms *Message) TrySend() error
- func (msg *Message) Type(contentType string) *Message
- type RPC
- type ReQueueConfig
- type ReadHandlerFunc
- type ReceiverHandler
- type Reply
- type Requeue
- type SenderHandler
- type Server
- type SimpleHandler
- type SinkConfig
- func (snk *SinkConfig) Attr(name string, value interface{}) *SinkConfig
- func (snk *SinkConfig) DeadLetter(exchange, routingKey string) *SinkConfig
- func (snk *SinkConfig) Direct(name string) *Exchange
- func (snk *SinkConfig) Fanout(name string) *Exchange
- func (snk *SinkConfig) Handler(obj SimpleHandler) *Server
- func (snk *SinkConfig) HandlerFunc(fn SinkHandlerFunc) *Server
- func (snk *SinkConfig) KeepDead() *SinkConfig
- func (snk *SinkConfig) Lazy() *SinkConfig
- func (snk *SinkConfig) ManualAck() *SinkConfig
- func (snk *SinkConfig) Name(name string) *SinkConfig
- func (snk *SinkConfig) OnExpired(handler SinkExpiredHandlerFunc) *SinkConfig
- func (snk *SinkConfig) OnTooMuchRetries(threshold int64, handler SinkExpiredHandlerFunc) *SinkConfig
- func (snk *SinkConfig) Ready(fn ReadHandlerFunc) *SinkConfig
- func (snk *SinkConfig) Requeue(interval time.Duration) *SinkConfig
- func (snk *SinkConfig) Retries(count int) *SinkConfig
- func (snk *SinkConfig) Topic(name string) *Exchange
- func (snk *SinkConfig) Transact(fn TransactionHandler) *Server
- func (snk *SinkConfig) TransactFunc(fn TransactionHandlerFunc) *Server
- func (snk *SinkConfig) Use(handler ReceiverHandler) *SinkConfig
- func (snk *SinkConfig) Validate(certFile string) *SinkConfig
- func (snk *SinkConfig) Verbose(verbose bool) *SinkConfig
- type SinkExpiredHandlerFunc
- type SinkHandlerFunc
- type StateHandler
- type TransactionHandler
- type TransactionHandlerFunc
- type Writer
- type WriterConfig
- func (wc *WriterConfig) Create() *Writer
- func (wc *WriterConfig) DefaultDirect(name string) *WriterConfig
- func (wc *WriterConfig) DefaultFanout(name string) *WriterConfig
- func (wc *WriterConfig) DefaultKey(routingKey string) *WriterConfig
- func (wc *WriterConfig) DefaultTopic(name string) *WriterConfig
- func (wc *WriterConfig) Overflow(maxQueueSize int, handler func(int)) *WriterConfig
- func (wc *WriterConfig) Sign(privateFile string) *WriterConfig
- func (wc *WriterConfig) Use(handler SenderHandler) *WriterConfig
Constants ¶
const DefaultSignatureHeader = "X-Signature"
const LastErrorHeader = "x-fluent-amqp-last-message-error"
Variables ¶
This section is empty.
Functions ¶
func FloatHeader ¶ added in v0.0.11
Types ¶
type BrokerConfig ¶
type BrokerConfig struct {
// contains filtered or unexported fields
}
func Broker ¶
func Broker(urls ...string) *BrokerConfig
func (*BrokerConfig) Context ¶
func (bc *BrokerConfig) Context(ctx context.Context) *BrokerConfig
func (*BrokerConfig) Interval ¶
func (bc *BrokerConfig) Interval(tm time.Duration) *BrokerConfig
func (*BrokerConfig) Logger ¶
func (bc *BrokerConfig) Logger(logger Logger) *BrokerConfig
func (*BrokerConfig) OnExpired ¶ added in v0.0.11
func (bc *BrokerConfig) OnExpired(handler DefaultSinkExpiredHandler) *BrokerConfig
Default handler for Expired handler in a sink
func (*BrokerConfig) OnTooMuchRetries ¶ added in v0.0.11
func (bc *BrokerConfig) OnTooMuchRetries(threshold int64, handler DefaultSinkExpiredHandler) *BrokerConfig
Default handler for TooMuchRetries handler in a sink
func (*BrokerConfig) Retries ¶ added in v0.0.11
func (bc *BrokerConfig) Retries(num int) *BrokerConfig
Default (can be changed in a Sink) maximum retries for sink with TransactHandlers. Negative value means no limit. Default is 10.
func (*BrokerConfig) Start ¶
func (bc *BrokerConfig) Start() *Server
func (*BrokerConfig) StdLogger ¶ added in v0.0.2
func (bc *BrokerConfig) StdLogger(prefix string) *BrokerConfig
func (*BrokerConfig) Timeout ¶
func (bc *BrokerConfig) Timeout(tm time.Duration) *BrokerConfig
func (*BrokerConfig) Verbose ¶ added in v0.1.30
func (bc *BrokerConfig) Verbose(verbose bool) *BrokerConfig
type DefaultSinkExpiredHandler ¶ added in v0.0.11
type DefaultSinkExpiredHandler func(ctx context.Context, sinkName string, msg amqp.Delivery, retries int64) bool
Handler for sinks to catch expired messages (see SinkConfig OnExpired and OnTooMuchRetries)
type Exchange ¶
type Exchange struct {
// contains filtered or unexported fields
}
func (*Exchange) Handler ¶ added in v0.0.10
func (exc *Exchange) Handler(obj SimpleHandler) *Server
func (*Exchange) HandlerFunc ¶
func (exc *Exchange) HandlerFunc(fn SinkHandlerFunc) *Server
func (*Exchange) Transact ¶
func (exc *Exchange) Transact(fn TransactionHandler) *Server
func (*Exchange) TransactFunc ¶
func (exc *Exchange) TransactFunc(fn TransactionHandlerFunc) *Server
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
func (*Message) ContentType ¶ added in v0.0.11
func (*Message) JSONContent ¶ added in v0.0.11
func (*Message) PublishWait ¶ added in v0.0.10
func (*Message) Raw ¶ added in v0.0.11
func (msg *Message) Raw() *amqp.Publishing
func (*Message) SendContext ¶ added in v0.0.9
type RPC ¶ added in v0.0.11
type RPC struct {
// contains filtered or unexported fields
}
type ReQueueConfig ¶
type ReQueueConfig struct {
// contains filtered or unexported fields
}
func (*ReQueueConfig) Create ¶
func (rq *ReQueueConfig) Create() Requeue
func (*ReQueueConfig) Queue ¶
func (rq *ReQueueConfig) Queue(name string) *ReQueueConfig
func (*ReQueueConfig) Timeout ¶
func (rq *ReQueueConfig) Timeout(tm time.Duration) *ReQueueConfig
type ReadHandlerFunc ¶ added in v0.1.34
type ReadHandlerFunc func() error
type ReceiverHandler ¶
func NewCertValidator ¶
func NewCertValidator(cert []byte, header string, log Logger) (ReceiverHandler, error)
Creates new handler that validates messages against signature header. Important! application MUST drop duplicated (by message id) messages by it self or it's possible just to resend same messages multiple times.
func NewCertValidatorFromFile ¶
func NewCertValidatorFromFile(certFile string, header string, log Logger) (ReceiverHandler, error)
Creates new handler (see NewCertValidator) with key from public ASN.1 DER certificate. Certificate should contain --CERTIFICATE-- section
type SenderHandler ¶
type SenderHandler interface {
Handle(msg *amqp.Publishing) bool
}
func NewSigner ¶
func NewSigner(privateKey []byte, header string) (SenderHandler, error)
Create new PKS#1 1.5 SHA512 signer handler
func NewSignerFromFile ¶
func NewSignerFromFile(privateKeyFile string, header string) (SenderHandler, error)
Load private key from PKCS#8 file and create new PKS#1 1.5 SHA512 signer handler. File should contains --PRIVATE KEY-- section
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server keeps broker configuration and all declared objects (queues, exchanges and else) for re-declare after restart
func (*Server) Publisher ¶
func (brk *Server) Publisher() *WriterConfig
Publisher creates new AMQP producer
func (*Server) Requeue ¶
func (brk *Server) Requeue(originalQueue string) *ReQueueConfig
Requeue creates new queue for requeue-ing. New name constructed by default as [originalQueue]/requeue. For example: original queue is SAMPLE, than requeue queue is SAMPLE/requeue
func (*Server) Sink ¶
func (brk *Server) Sink(queueName string) *SinkConfig
Sink creates new AMQP consumer with optional queue name. If queue name is empty - autogenerated one will be used without persistence. Max retries by default is 10
func (*Server) WaitToFinish ¶
func (brk *Server) WaitToFinish()
Wait to finish blocks thread un=til all allocated resources become freed
type SimpleHandler ¶ added in v0.0.10
type SinkConfig ¶
type SinkConfig struct {
// contains filtered or unexported fields
}
func (*SinkConfig) Attr ¶
func (snk *SinkConfig) Attr(name string, value interface{}) *SinkConfig
func (*SinkConfig) DeadLetter ¶
func (snk *SinkConfig) DeadLetter(exchange, routingKey string) *SinkConfig
func (*SinkConfig) Direct ¶
func (snk *SinkConfig) Direct(name string) *Exchange
func (*SinkConfig) Fanout ¶
func (snk *SinkConfig) Fanout(name string) *Exchange
func (*SinkConfig) Handler ¶ added in v0.0.10
func (snk *SinkConfig) Handler(obj SimpleHandler) *Server
func (*SinkConfig) HandlerFunc ¶ added in v0.0.2
func (snk *SinkConfig) HandlerFunc(fn SinkHandlerFunc) *Server
func (*SinkConfig) KeepDead ¶ added in v0.1.25
func (snk *SinkConfig) KeepDead() *SinkConfig
func (*SinkConfig) Lazy ¶
func (snk *SinkConfig) Lazy() *SinkConfig
func (*SinkConfig) ManualAck ¶
func (snk *SinkConfig) ManualAck() *SinkConfig
func (*SinkConfig) Name ¶ added in v0.0.11
func (snk *SinkConfig) Name(name string) *SinkConfig
Name of sink. Just a tag fo future identity (like in default handlers)
func (*SinkConfig) OnExpired ¶ added in v0.0.11
func (snk *SinkConfig) OnExpired(handler SinkExpiredHandlerFunc) *SinkConfig
Handler for expired (reached retries limit) messages
func (*SinkConfig) OnTooMuchRetries ¶ added in v0.0.11
func (snk *SinkConfig) OnTooMuchRetries(threshold int64, handler SinkExpiredHandlerFunc) *SinkConfig
Handler for messages that reached defined threshold retires limit. Threshold can be less (in case of positive retries limit) or equal to retries limit and will executes before OnExpired (if applicable). The handler will be invoked each times after threshold limit. If handler returns false, message is dropped. In case when threshold is same as retries, requires at least one (from OnExpired or from OnTooMuschRetries) false to drop message.
func (*SinkConfig) Ready ¶ added in v0.1.34
func (snk *SinkConfig) Ready(fn ReadHandlerFunc) *SinkConfig
Add handler to catch when channel is ready
func (*SinkConfig) Requeue ¶ added in v0.0.5
func (snk *SinkConfig) Requeue(interval time.Duration) *SinkConfig
func (*SinkConfig) Retries ¶
func (snk *SinkConfig) Retries(count int) *SinkConfig
func (*SinkConfig) Topic ¶
func (snk *SinkConfig) Topic(name string) *Exchange
func (*SinkConfig) Transact ¶ added in v0.0.2
func (snk *SinkConfig) Transact(fn TransactionHandler) *Server
func (*SinkConfig) TransactFunc ¶ added in v0.0.2
func (snk *SinkConfig) TransactFunc(fn TransactionHandlerFunc) *Server
func (*SinkConfig) Use ¶
func (snk *SinkConfig) Use(handler ReceiverHandler) *SinkConfig
func (*SinkConfig) Validate ¶
func (snk *SinkConfig) Validate(certFile string) *SinkConfig
func (*SinkConfig) Verbose ¶ added in v0.1.30
func (snk *SinkConfig) Verbose(verbose bool) *SinkConfig
type SinkExpiredHandlerFunc ¶ added in v0.0.11
Handler for messages that reached retries amount. False return means request to drop message
type StateHandler ¶
type TransactionHandler ¶
type TransactionHandlerFunc ¶
type WriterConfig ¶
type WriterConfig struct {
// contains filtered or unexported fields
}
func (*WriterConfig) Create ¶
func (wc *WriterConfig) Create() *Writer
func (*WriterConfig) DefaultDirect ¶
func (wc *WriterConfig) DefaultDirect(name string) *WriterConfig
func (*WriterConfig) DefaultFanout ¶
func (wc *WriterConfig) DefaultFanout(name string) *WriterConfig
func (*WriterConfig) DefaultKey ¶
func (wc *WriterConfig) DefaultKey(routingKey string) *WriterConfig
func (*WriterConfig) DefaultTopic ¶
func (wc *WriterConfig) DefaultTopic(name string) *WriterConfig
func (*WriterConfig) Overflow ¶ added in v0.1.33
func (wc *WriterConfig) Overflow(maxQueueSize int, handler func(int)) *WriterConfig
func (*WriterConfig) Sign ¶
func (wc *WriterConfig) Sign(privateFile string) *WriterConfig
Sign body and add signature to DefaultSignatureHeader header. Panic if private key couldn't be read
func (*WriterConfig) Use ¶
func (wc *WriterConfig) Use(handler SenderHandler) *WriterConfig