Documentation ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Machine ¶
type Machine interface {
	// Publish synchronously publishes the Message
	Publish(ctx context.Context, msg Message)
	// Subscribe synchronously subscribes to messages on a given channel, executing the given HandlerFunc UNTIL the context cancels OR false is returned by the HandlerFunc.
	// Glob matching IS supported for subscribing to multiple channels at once.
	Subscribe(ctx context.Context, channel string, handler MessageHandlerFunc, options ...SubscriptionOpt) error
	// Go asynchronously executes the given Func
	Go(ctx context.Context, fn Func)
	// Current returns the number of active jobs that are running concurrently
	Current() int
	// Wait blocks until all active async functions (Go) exit
	Wait() error
	// Close blocks until all active routines exit (calls Wait), then closes all active subscriptions
	Close()
}
Machine is an interface for highly asynchronous Go applications
func New ¶
New creates a new Machine instance with the given options (if present)
Example ¶
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var (
	m       = machine.New()
	results []string
	mu      sync.RWMutex
)
defer m.Close()

m.Go(ctx, func(ctx context.Context) error {
	return m.Subscribe(ctx, "accounting.chat_room2", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
})
m.Go(ctx, func(ctx context.Context) error {
	return m.Subscribe(ctx, "engineering.chat_room1", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
})
m.Go(ctx, func(ctx context.Context) error {
	return m.Subscribe(ctx, "human_resources.chat_room6", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
})
m.Go(ctx, func(ctx context.Context) error {
	return m.Subscribe(ctx, "*", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
})

<-time.After(1 * time.Second)

m.Publish(ctx, machine.Message{
	Channel: "human_resources.chat_room6",
	Body:    "sending message to human resources",
})
m.Publish(ctx, machine.Message{
	Channel: "accounting.chat_room2",
	Body:    "sending message to accounting",
})
m.Publish(ctx, machine.Message{
	Channel: "engineering.chat_room1",
	Body:    "sending message to engineering",
})

if err := m.Wait(); err != nil {
	panic(err)
}
sort.Strings(results)
for _, res := range results {
	fmt.Print(res)
}
Output:

(accounting.chat_room2) received msg: sending message to accounting
(accounting.chat_room2) received msg: sending message to accounting
(engineering.chat_room1) received msg: sending message to engineering
(engineering.chat_room1) received msg: sending message to engineering
(human_resources.chat_room6) received msg: sending message to human resources
(human_resources.chat_room6) received msg: sending message to human resources
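Each message appears twice in the output because it reaches both the subscriber for its exact channel and the "*" subscriber. The sketch below illustrates that kind of glob matching with the standard library's path.Match. This is only a stand-in: the glob rules this package actually applies are not documented here and may differ, and the matches helper is hypothetical.

```go
package main

import (
	"fmt"
	"path"
)

// matches reports whether a channel name satisfies a subscription pattern.
// path.Match is a standard-library stand-in; the package's own glob rules may differ.
func matches(pattern, channel string) bool {
	ok, err := path.Match(pattern, channel)
	return err == nil && ok
}

func main() {
	channels := []string{"accounting.chat_room2", "engineering.chat_room1"}
	patterns := []string{"*", "accounting.*", "engineering.chat_room1"}
	for _, p := range patterns {
		for _, ch := range channels {
			fmt.Printf("%q vs %q: %v\n", p, ch, matches(p, ch))
		}
	}
}
```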
type MessageFilterFunc ¶
Filter is a first-class function that filters out messages before they reach a subscription's primary Handler. Return true to indicate that a message passes the filter
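As an illustration of how a filter sits in front of a handler, here is a minimal sketch with local stand-in types. The filterFunc and handlerFunc signatures are simplified assumptions (the real MessageFilterFunc signature is not shown on this page), and deliver is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
)

// Message mirrors only the two fields used in the package example.
type Message struct {
	Channel string
	Body    interface{}
}

// filterFunc and handlerFunc are simplified, hypothetical stand-ins.
type filterFunc func(msg Message) bool
type handlerFunc func(msg Message)

// deliver runs the handler only for messages that pass the filter
// and returns how many messages were handled.
func deliver(msgs []Message, filter filterFunc, handle handlerFunc) int {
	handled := 0
	for _, m := range msgs {
		if filter != nil && !filter(m) {
			continue // filtered out before reaching the handler
		}
		handle(m)
		handled++
	}
	return handled
}

func main() {
	msgs := []Message{
		{Channel: "accounting.chat_room2", Body: "a"},
		{Channel: "engineering.chat_room1", Body: "b"},
	}
	onlyAccounting := func(m Message) bool {
		return strings.HasPrefix(m.Channel, "accounting.")
	}
	n := deliver(msgs, onlyAccounting, func(m Message) {
		fmt.Println(m.Channel, m.Body)
	})
	fmt.Println("handled:", n)
}
```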
type MessageHandlerFunc ¶
Handler is a first-class function that is executed against each inbound message in a subscription. Return false to indicate that the subscription should end
type Opt ¶
type Opt func(o *Options)
Opt configures a machine instance
func WithThrottledRoutines ¶
WithThrottledRoutines throttles the maximum number of active routines spawned by the Machine.
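One common way to cap concurrent goroutines is a buffered channel used as a semaphore. The sketch below shows that general technique; whether this package throttles the same way is not stated here, and runThrottled is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runThrottled runs every job but never more than max at once.
// It returns the highest concurrency it observed.
func runThrottled(max int, jobs []func()) int64 {
	sem := make(chan struct{}, max)
	var wg sync.WaitGroup
	var active, peak int64
	for _, job := range jobs {
		sem <- struct{}{} // blocks while max jobs are already running
		wg.Add(1)
		go func(job func()) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot for the next job
			n := atomic.AddInt64(&active, 1)
			for { // record a new peak if this is the highest count so far
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			job()
			atomic.AddInt64(&active, -1)
		}(job)
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	jobs := make([]func(), 10)
	for i := range jobs {
		jobs[i] = func() { time.Sleep(10 * time.Millisecond) }
	}
	peak := runThrottled(3, jobs)
	fmt.Println("peak within limit:", peak <= 3)
}
```

Acquiring the slot before starting the goroutine means the submitting loop itself blocks once the limit is reached, which keeps the number of spawned goroutines bounded as well.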
type Options ¶
type Options struct {
// contains filtered or unexported fields
}
Options holds config options for a machine instance
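Opt and Options follow the functional options pattern: each option is a function that mutates a private config struct. The sketch below shows the pattern in isolation. Because the fields of Options are unexported, the maxRoutines field and the withMaxRoutines and newOptions helpers are hypothetical names chosen for illustration.

```go
package main

import "fmt"

// options is a hypothetical stand-in for a config struct with private fields.
type options struct {
	maxRoutines int
}

// opt mutates an options value, mirroring the shape of Opt.
type opt func(o *options)

// withMaxRoutines is a hypothetical option that sets one field.
func withMaxRoutines(n int) opt {
	return func(o *options) { o.maxRoutines = n }
}

// newOptions applies each option in order on top of the zero value.
func newOptions(opts ...opt) options {
	o := options{}
	for _, fn := range opts {
		fn(&o)
	}
	return o
}

func main() {
	fmt.Println(newOptions().maxRoutines)                    // prints 0
	fmt.Println(newOptions(withMaxRoutines(4)).maxRoutines) // prints 4
}
```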
type SubscriptionOpt ¶
type SubscriptionOpt func(options *SubscriptionOptions)
SubscriptionOpt configures a subscription
func WithFilter ¶
func WithFilter(filter MessageFilterFunc) SubscriptionOpt
WithFilter is a subscription option that filters messages
func WithSubscriptionID ¶
func WithSubscriptionID(id string) SubscriptionOpt
WithSubscriptionID is a subscription option that sets the subscription ID. If multiple consumers share the same subscription ID, each message is delivered to just one of those consumers
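The effect of a shared subscription ID can be pictured as grouping consumers by ID and choosing one recipient per group. In the sketch below, the subscriber type and the recipients helper are hypothetical, and picking the first consumer in each group is an assumption; the package may select the recipient differently.

```go
package main

import "fmt"

// subscriber is a hypothetical record pairing a consumer with its subscription ID.
type subscriber struct {
	id   string
	name string
}

// recipients returns the consumers that would receive a single message:
// one per subscription ID (here, simply the first one registered).
func recipients(subs []subscriber) []string {
	seen := map[string]bool{}
	var out []string
	for _, s := range subs {
		if seen[s.id] {
			continue // another consumer with this ID already got the message
		}
		seen[s.id] = true
		out = append(out, s.name)
	}
	return out
}

func main() {
	subs := []subscriber{
		{id: "workers", name: "a"},
		{id: "workers", name: "b"},
		{id: "audit", name: "c"},
	}
	fmt.Println(recipients(subs)) // prints "[a c]"
}
```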
type SubscriptionOptions ¶
type SubscriptionOptions struct {
// contains filtered or unexported fields
}
SubscriptionOptions holds config options for a subscription