Documentation ¶
Overview ¶
Event-driven architecture is a software architecture and model for application design. With an event-driven system, the capture, communication, processing, and persistence of events are the core structure of the solution. This differs from a traditional request-driven model.
Features:
- broadcast events via RabbitMQ
- automatic reconnect pattern for RabbitMQ
- fault tolerance on panic
Publisher Example:
package main

import (
	"log"
	"os"

	"github.com/ramoozorg/event-driven/rabbitmq"
)

// person is the sample event payload published by this example.
type person struct {
	Name string `bson:"name" json:"name"`
	Age  int    `bson:"age" json:"age"`
}

// main opens a connection, declares the topic exchange "exchange1" with two
// bound queues, and publishes two sample events.
func main() {
	chSignal := make(chan os.Signal)
	conn, err := rabbitmq.NewConnection("test", &rabbitmq.Options{
		UriAddress:      "amqp://guest:guest@localhost:5672",
		DurableExchange: true,
		AutoAck:         true,
		ExclusiveQueue:  false,
	}, chSignal)
	if err != nil {
		panic(err)
	}
	// Close the connection on every exit path; deferred calls also run while
	// one of the panics below unwinds main.
	defer func() {
		if cerr := conn.Close(); cerr != nil {
			log.Printf("closing rabbitmq connection: %v", cerr)
		}
	}()

	if err := conn.ExchangeDeclare("exchange1", rabbitmq.TOPIC); err != nil {
		panic(err)
	}
	// queue1 is bound with routing keys "rk" and "rk2"; queue2 with "rk3".
	if err := conn.DeclarePublisherQueue("queue1", "exchange1", "rk", "rk2"); err != nil {
		panic(err)
	}
	if err := conn.DeclarePublisherQueue("queue2", "exchange1", "rk3"); err != nil {
		panic(err)
	}
	if err := NewEventPublish(conn); err != nil {
		panic(err)
	}
}

// NewEventPublish publishes two person events to "exchange1": one with routing
// key "rk" and one with routing key "rk3". It returns the first publish error
// encountered, or nil when both events were published.
func NewEventPublish(conn *rabbitmq.Connection) error {
	p := person{Name: "rs", Age: 22}
	q := person{Name: "reza", Age: 23}
	if err := conn.Publish("exchange1", "rk", p, rabbitmq.PublishingOptions{}); err != nil {
		return err
	}
	if err := conn.Publish("exchange1", "rk3", q, rabbitmq.PublishingOptions{}); err != nil {
		return err
	}
	return nil
}
Consumer Example:
package main

import (
	"fmt"

	"github.com/ramoozorg/event-driven/rabbitmq"
	"go.mongodb.org/mongo-driver/bson"
)

// person is the sample event payload decoded by this example.
type person struct {
	Name string `bson:"name" json:"name"`
	Age  int    `bson:"age" json:"age"`
}

// main opens a connection, declares the topic exchange "exchange1", registers
// eventHandler for two queues, and starts consuming.
func main() {
	// Nothing ever sends on done, so the receive at the end of main blocks
	// forever and keeps the process alive.
	done := make(chan bool, 1)
	conn, err := rabbitmq.NewConnection("test", &rabbitmq.Options{
		UriAddress:      rabbitmq.CreateURIAddress("guest", "guest", "localhost:5672", ""),
		DurableExchange: true,
		AutoAck:         true,
		ExclusiveQueue:  false,
	}, nil)
	if err != nil {
		panic(err)
	}
	if err := conn.ExchangeDeclare("exchange1", rabbitmq.TOPIC); err != nil {
		panic(err)
	}
	// queue1 is bound with routing keys "rk" and "rk2"; queue2 with "rk3".
	if err := conn.DeclareConsumerQueue(eventHandler, "queue1", "exchange1", "rk", "rk2"); err != nil {
		panic(err)
	}
	if err := conn.DeclareConsumerQueue(eventHandler, "queue2", "exchange1", "rk3"); err != nil {
		panic(err)
	}
	if err := conn.Consume(); err != nil {
		panic(err)
	}
	<-done
}

// eventHandler decodes the delivery body as a BSON-encoded person and prints
// it together with the exchange, queue, and routing key it arrived on.
// A body that cannot be decoded is reported and skipped instead of being
// printed as a zero-value person.
func eventHandler(queue string, delivery rabbitmq.Delivery) {
	var p person
	if err := bson.Unmarshal(delivery.Body, &p); err != nil {
		fmt.Printf("Event from exchange %v queue %v routingKey %v could not be decoded: %v\n", delivery.Exchange, queue, delivery.RoutingKey, err)
		return
	}
	fmt.Printf("New Event from exchange %v queue %v routingKey %v with body %v received\n", delivery.Exchange, queue, delivery.RoutingKey, p)
}
```
Index ¶
- Constants
- Variables
- func CreateURIAddress(username, password, address, vhost string) string
- func RegisterEncoder(encType string, enc Encoder)
- type Connection
- func (c *Connection) Close() error
- func (c *Connection) Consume() error
- func (c *Connection) DeclareConsumerQueue(eventHandler EventHandler, queue, exchange string, routingKey ...string) error
- func (c *Connection) DeclarePublisherQueue(queue, exchange string, routingKey ...string) error
- func (c *Connection) ExchangeDeclare(exchange string, kind Kind) error
- func (c *Connection) GetExchangeList() []string
- func (c *Connection) GetQueueList() map[string]EventHandler
- func (c *Connection) IsConnected() bool
- func (c *Connection) Publish(exchange, routingKey string, body interface{}, ...) error
- type Delivery
- type EncodedConn
- type Encoder
- type EventHandler
- type Headers
- type Kind
- type Options
- type PublishingOptions
Constants ¶
// Names of the encoder types.
// NOTE(review): presumably these are the encType values accepted by
// RegisterEncoder, EncoderForType and NewEncodedConn — confirm.
const (
	JSON_ENCODER = "json" // name of the JSON encoder type
	BSON_ENCODER = "bson" // name of the BSON encoder type
)
Variables ¶
// Sentinel errors exposed by the package. Each comment restates the error's
// own message; the identifier spellings are part of the public API.
var (
	// SERVICE_NAME_ERROR reports an empty service name.
	SERVICE_NAME_ERROR = errors.New("service name is empty")
	// URI_ADDRESS_ERROR reports an invalid URI address.
	URI_ADDRESS_ERROR = errors.New("uri address is invalid, please enter amqp://guest:guest@localhost:5672 for example")
	// ROUTING_KEYS_EMPTY_ERROR reports that no routing keys were given.
	ROUTING_KEYS_EMPTY_ERROR = errors.New("routing keys is empty")
	// CONNECTION_CLOSED_ERROR reports that the RabbitMQ connection is closed.
	CONNECTION_CLOSED_ERROR = errors.New("rabbitMQ connection closed, try to reconnect")
	// NIL_CCONECTION_ERROR reports a nil RabbitMQ connection.
	NIL_CCONECTION_ERROR = errors.New("nil rabbitmq connection")
	// EXCHANGE_ALREADY_EXISTS_ERROR reports an exchange that is already declared.
	EXCHANGE_ALREADY_EXISTS_ERROR = errors.New("exchange already declared")
	// QUEUE_ALREADY_EXISTS_ERROR reports a queue that is already declared.
	QUEUE_ALREADY_EXISTS_ERROR = errors.New("queue already declared")
	// EXHCNAGE_NOT_FOUND_ERROR reports an exchange that has not been declared.
	EXHCNAGE_NOT_FOUND_ERROR = errors.New("exchange not declare")
)
Functions ¶
func CreateURIAddress ¶
CreateURIAddress creates a URI address from the given username, password, address, and vhost.
func RegisterEncoder ¶
RegisterEncoder will register the encType with the given Encoder. Useful for customization.
Types ¶
type Connection ¶
// Connection is the structure of an amqp event connection.
type Connection struct {
	// NOTE(review): presumably the service name passed to NewConnection — confirm.
	ServiceCallerName string
	// ConnOpt points to the Options associated with this connection.
	ConnOpt *Options
	// contains filtered or unexported fields
}
Connection is the structure of amqp event connection
func NewConnection ¶
NewConnection creates a RabbitMQ connection object.
func (*Connection) Consume ¶
func (c *Connection) Consume() error
Consume consumes the events from the queues and passes them as a map of chan amqp.Delivery.
func (*Connection) DeclareConsumerQueue ¶
func (c *Connection) DeclareConsumerQueue(eventHandler EventHandler, queue, exchange string, routingKey ...string) error
DeclareConsumerQueue declares a new queue and binds it to the exchange with the given routing keys.
func (*Connection) DeclarePublisherQueue ¶
func (c *Connection) DeclarePublisherQueue(queue, exchange string, routingKey ...string) error
DeclarePublisherQueue declares a new queue and binds it to the exchange with the given routing keys.
func (*Connection) ExchangeDeclare ¶
func (c *Connection) ExchangeDeclare(exchange string, kind Kind) error
ExchangeDeclare declares a new exchange of the given kind (direct, topic, fanout, headers).
func (*Connection) GetExchangeList ¶
func (c *Connection) GetExchangeList() []string
GetExchangeList returns the list of exchanges.
func (*Connection) GetQueueList ¶
func (c *Connection) GetQueueList() map[string]EventHandler
GetQueueList returns the list of queues with their handlers.
func (*Connection) IsConnected ¶
func (c *Connection) IsConnected() bool
IsConnected reports whether the RabbitMQ client is connected.
func (*Connection) Publish ¶
func (c *Connection) Publish(exchange, routingKey string, body interface{}, publishOptions PublishingOptions) error
Publish publishes a request to the amqp queue
type Delivery ¶
func (Delivery) Ack ¶ added in v0.0.5
Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery.
type EncodedConn ¶
// EncodedConn pairs a Connection with an Encoder.
type EncodedConn struct {
	Conn *Connection // the wrapped connection
	Enc  Encoder     // the encoder paired with Conn
}
func NewEncodedConn ¶
func NewEncodedConn(c *Connection, encType string) (*EncodedConn, error)
NewEncodedConn will wrap an existing Connection and utilize the appropriate registered encoder
type Encoder ¶
// Encoder is the interface for all registered encoders.
type Encoder interface {
	// Encode takes msg and returns a byte slice or an error.
	Encode(msg interface{}) ([]byte, error)
	// Decode takes data and a destination vPtr and returns an error.
	// NOTE(review): presumably vPtr must be a pointer that receives the decoded value — confirm.
	Decode(data []byte, vPtr interface{}) error
}
Encoder is the interface for all registered encoders.
func EncoderForType ¶
EncoderForType will return the registered Encoder for the encType.
type EventHandler ¶
type Kind ¶
// Kind is the exchange type.
type Kind int

const (
	// DIRECT: an event goes to the queues whose binding key exactly matches the routing key of the event.
	DIRECT Kind = iota
	// FANOUT exchanges can be useful when the same event needs to be sent to one or more queues with consumers who may process the same event in different ways.
	FANOUT
	// TOPIC exchange is similar to direct exchange, but the routing is done according to the routing pattern. Instead of using a fixed routing key, it uses wildcards.
	TOPIC
	// HEADERS exchange routes events based on arguments containing headers and optional values. It uses the event header attributes for routing.
	HEADERS
)
type Options ¶
// Options holds the settings for a new RabbitMQ connection.
type Options struct {
	UriAddress      string // UriAddress of rabbitmq, amqp://user:password@x.x.x.x:port
	DurableExchange bool
	AutoAck         bool
	AutoDelete      bool
	NoWait          bool
	ExclusiveQueue  bool
}
Options for new connection of rabbitmq
type PublishingOptions ¶
// PublishingOptions holds the options for a published event.
type PublishingOptions struct {
	Headers Headers // rabbitMQ event headers

	// Properties
	ContentType     string    // MIME content type
	ContentEncoding string    // MIME content encoding
	DeliveryMode    uint8     // Transient (0 or 1) or Persistent (2)
	Priority        uint8     // 0 to 9
	CorrelationId   string    // correlation identifier
	ReplyTo         string    // address to reply to (ex: RPC)
	Expiration      string    // event expiration spec
	MessageId       string    // event identifier
	Timestamp       time.Time // event timestamp
	Type            string    // event type name
	UserId          string    // creating user id - ex: "guest"
	AppId           string    // creating application id
}
PublishingOptions holds the options for a published event.