machine

v3.0.0-...-6734c35
Published: Oct 3, 2022 License: Apache-2.0 Imports: 5 Imported by: 0

README

Machine


import "github.com/autom8ter/machine/v3"

Machine is a zero-dependency library for highly concurrent Go applications. It is inspired by errgroup.Group, with extra bells & whistles:

  • In-memory publish/subscribe for asynchronously broadcasting & consuming messages
  • Asynchronous worker groups similar to errgroup.Group
  • Throttled max active goroutine count
  • Asynchronous error handling (see WithErrorHandler to override the default error handler)
  • Asynchronous cron jobs: Cron()

Use Cases

Machine is meant to be completely agnostic and dependency-free; its use cases are expected to be emergent. It can be used anywhere goroutines are used.

Highly concurrent and/or asynchronous applications include:

  • gRPC streaming servers
  • websocket servers
  • pubsub servers
  • reverse proxies
  • cron jobs
  • custom database/cache
  • ETL pipelines
  • log sink
  • filesystem walker
  • code generation

// Machine is an interface for highly asynchronous Go applications
type Machine interface {
	// Publish synchronously publishes the Message
	Publish(ctx context.Context, msg Message)
	// Subscribe synchronously subscribes to messages on a given channel, executing the given MessageHandlerFunc UNTIL the context cancels OR false is returned by the MessageHandlerFunc.
	// Glob matching IS supported for subscribing to multiple channels at once.
	Subscribe(ctx context.Context, channel string, handler MessageHandlerFunc, opts ...SubscriptionOpt)
	// Subscribers returns the total number of subscribers to the given channel
	Subscribers(channel string) int
	// Channels returns the channel names that messages have been sent to
	Channels() []string
	// Go asynchronously executes the given Func
	Go(ctx context.Context, fn Func)
	// Cron asynchronously executes the given function on a timed interval UNTIL the context cancels OR false is returned by the CronFunc
	Cron(ctx context.Context, interval time.Duration, fn CronFunc)
	// Current returns the number of active jobs that are running concurrently
	Current() int
	// Wait blocks until all active async functions (Go, Cron) exit
	Wait()
	// Close blocks until all active routines exit (calls Wait), then closes all active subscriptions
	Close()
}

Example

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var (
	m       = machine.New()
	results []string
	mu      sync.RWMutex
)
defer m.Close()

m.Go(ctx, func(ctx context.Context) error {
	m.Subscribe(ctx, "accounting.*", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
	return nil
})
m.Go(ctx, func(ctx context.Context) error {
	m.Subscribe(ctx, "engineering.*", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
	return nil
})
m.Go(ctx, func(ctx context.Context) error {
	m.Subscribe(ctx, "human_resources.*", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
	return nil
})
m.Go(ctx, func(ctx context.Context) error {
	m.Subscribe(ctx, "*", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
	return nil
})
<-time.After(1 * time.Second)
m.Publish(ctx, machine.Message{
	Channel: "human_resources.chat_room6",
	Body:    "hello world human resources",
})
m.Publish(ctx, machine.Message{
	Channel: "accounting.chat_room2",
	Body:    "hello world accounting",
})
m.Publish(ctx, machine.Message{
	Channel: "engineering.chat_room1",
	Body:    "hello world engineering",
})
m.Wait()
sort.Strings(results)
for _, res := range results {
	fmt.Print(res)
}
// Output:
// (accounting.chat_room2) received msg: hello world accounting
// (accounting.chat_room2) received msg: hello world accounting
// (engineering.chat_room1) received msg: hello world engineering
// (engineering.chat_room1) received msg: hello world engineering
// (human_resources.chat_room6) received msg: hello world human resources
// (human_resources.chat_room6) received msg: hello world human resources

Extended Examples

All examples are under 500 lines of code (excluding code generation).

Documentation

Types

type Func

type Func func(ctx context.Context) error

Func is a first class function that is asynchronously executed.
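
To make the idea of an asynchronously executed Func concrete, here is a minimal standard-library sketch in the spirit of errgroup.Group. The `group` type, its `goFunc` and `wait` methods, and the `demo` helper are hypothetical names chosen for illustration; this is not the Machine implementation, and it assumes that only the first error is kept.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Func mirrors the signature documented above.
type Func func(ctx context.Context) error

// group is a hypothetical, simplified worker group (not the library's code).
type group struct {
	wg      sync.WaitGroup
	errOnce sync.Once
	err     error
}

// goFunc runs fn in its own goroutine and remembers the first non-nil error.
func (g *group) goFunc(ctx context.Context, fn Func) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := fn(ctx); err != nil {
			g.errOnce.Do(func() { g.err = err })
		}
	}()
}

// wait blocks until every started function has returned, then reports the
// first recorded error (or nil if none failed).
func (g *group) wait() error {
	g.wg.Wait()
	return g.err
}

// demo starts one succeeding and one failing job and returns the result of wait.
func demo() error {
	g := &group{}
	ctx := context.Background()
	g.goFunc(ctx, func(ctx context.Context) error { return nil })
	g.goFunc(ctx, func(ctx context.Context) error { return errors.New("job failed") })
	return g.wait()
}

func main() {
	fmt.Println(demo()) // prints "job failed"
}
```

Keeping only the first error mirrors errgroup.Group; a real worker group might instead forward every error to a handler.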

type Machine

type Machine interface {
	// Publish synchronously publishes the Message
	Publish(ctx context.Context, msg Message)
	// Subscribe synchronously subscribes to messages on a given channel, executing the given MessageHandlerFunc UNTIL the context cancels OR false is returned by the MessageHandlerFunc.
	// Glob matching IS supported for subscribing to multiple channels at once.
	Subscribe(ctx context.Context, channel string, handler MessageHandlerFunc, options ...SubscriptionOpt) error
	// Subscribers returns total number of subscribers to the given channel
	Subscribers(channel string) int
	// Subscriptions returns the channel names/patterns that subscribers are listening on
	Subscriptions() []string
	// Go asynchronously executes the given Func
	Go(ctx context.Context, fn Func)
	// Current returns the number of active jobs that are running concurrently
	Current() int
	// Wait blocks until all active async functions (Go) exit
	Wait() error
	// Close blocks until all active routines exit (calls Wait), then closes all active subscriptions
	Close()
}

Machine is an interface for highly asynchronous Go applications
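
The Subscribe comment above says glob matching is supported for channel names. The sketch below shows how a pattern such as "accounting.*" can be checked against a published channel. It uses path.Match from the standard library as an assumed stand-in; the matcher actually used by Machine may follow different rules, and `matches` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"path"
)

// matches reports whether a published channel name satisfies a subscription
// pattern. path.Match is an assumed stand-in for the library's glob matcher;
// malformed patterns are treated as "no match".
func matches(pattern, channel string) bool {
	ok, err := path.Match(pattern, channel)
	return err == nil && ok
}

func main() {
	channel := "accounting.chat_room2"
	for _, pattern := range []string{"accounting.*", "engineering.*", "*"} {
		fmt.Printf("%q matches %q: %v\n", pattern, channel, matches(pattern, channel))
	}
}
```

Under these rules a message on "accounting.chat_room2" reaches both the "accounting.*" and the "*" subscribers, which is consistent with each message appearing twice in the example output.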

func New

func New(opts ...Opt) Machine

New creates a new Machine instance with the given options (if present).

Example
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var (
	m       = machine.New()
	results []string
	mu      sync.RWMutex
)
defer m.Close()

m.Go(ctx, func(ctx context.Context) error {
	m.Subscribe(ctx, "accounting.*", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
	return nil
})
m.Go(ctx, func(ctx context.Context) error {
	m.Subscribe(ctx, "engineering.*", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
	return nil
})
m.Go(ctx, func(ctx context.Context) error {
	m.Subscribe(ctx, "human_resources.*", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
	return nil
})
m.Go(ctx, func(ctx context.Context) error {
	m.Subscribe(ctx, "*", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.Channel, msg.Body))
		mu.Unlock()
		return true, nil
	})
	return nil
})
<-time.After(1 * time.Second)
m.Publish(ctx, machine.Message{
	Channel: "human_resources.chat_room6",
	Body:    "hello world human resources",
})
m.Publish(ctx, machine.Message{
	Channel: "accounting.chat_room2",
	Body:    "hello world accounting",
})
m.Publish(ctx, machine.Message{
	Channel: "engineering.chat_room1",
	Body:    "hello world engineering",
})
m.Wait()
sort.Strings(results)
for _, res := range results {
	fmt.Print(res)
}
Output:

(accounting.chat_room2) received msg: hello world accounting
(accounting.chat_room2) received msg: hello world accounting
(engineering.chat_room1) received msg: hello world engineering
(engineering.chat_room1) received msg: hello world engineering
(human_resources.chat_room6) received msg: hello world human resources
(human_resources.chat_room6) received msg: hello world human resources

type Message

type Message struct {
	Channel string
	Body    interface{}
}
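
Because Body is declared as interface{}, a consumer has to inspect its dynamic type before using it. The following sketch shows one way to do that with a type switch; the `describe` helper and the set of handled types are illustrative assumptions, not part of the package.

```go
package main

import "fmt"

// Message mirrors the struct documented above.
type Message struct {
	Channel string
	Body    interface{}
}

// describe returns a short description of the message based on the dynamic
// type of its body. The handled types are an arbitrary illustrative choice.
func describe(msg Message) string {
	switch body := msg.Body.(type) {
	case string:
		return fmt.Sprintf("%s: text %q", msg.Channel, body)
	case []byte:
		return fmt.Sprintf("%s: %d bytes", msg.Channel, len(body))
	default:
		return fmt.Sprintf("%s: unsupported body type %T", msg.Channel, body)
	}
}

func main() {
	fmt.Println(describe(Message{Channel: "accounting.chat_room2", Body: "hello world accounting"}))
	fmt.Println(describe(Message{Channel: "engineering.chat_room1", Body: 42}))
}
```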

type MessageFilterFunc

type MessageFilterFunc func(ctx context.Context, msg Message) (bool, error)

MessageFilterFunc is a first-class function that filters out messages before they reach a subscription's primary handler. Return true to indicate that a message passes the filter.
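
As a rough illustration of a function with this signature, the sketch below builds a filter that passes only string bodies with a given prefix. The types are re-declared locally so the example is self-contained; `onlyPrefix` and `passing` are hypothetical helpers, and how Machine reacts to a filter error is not shown here.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Message and MessageFilterFunc mirror the declarations documented above.
type Message struct {
	Channel string
	Body    interface{}
}

type MessageFilterFunc func(ctx context.Context, msg Message) (bool, error)

// onlyPrefix is a hypothetical helper that builds a filter passing messages
// whose body is a string starting with prefix.
func onlyPrefix(prefix string) MessageFilterFunc {
	return func(ctx context.Context, msg Message) (bool, error) {
		body, ok := msg.Body.(string)
		if !ok {
			return false, fmt.Errorf("unexpected body type %T", msg.Body)
		}
		return strings.HasPrefix(body, prefix), nil
	}
}

// passing applies the filter to each body and returns the ones that pass.
func passing(prefix string, bodies ...string) []string {
	filter := onlyPrefix(prefix)
	var kept []string
	for _, b := range bodies {
		ok, err := filter(context.Background(), Message{Channel: "demo", Body: b})
		if err == nil && ok {
			kept = append(kept, b)
		}
	}
	return kept
}

func main() {
	fmt.Println(passing("hello", "hello world", "goodbye", "hello again"))
	// prints [hello world hello again]
}
```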

type MessageHandlerFunc

type MessageHandlerFunc func(ctx context.Context, msg Message) (bool, error)

MessageHandlerFunc is a first-class function that is executed against each inbound message in a subscription. Return false to indicate that the subscription should end.
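
The "return false to end the subscription" contract can be pictured as a consume loop. The sketch below is a simplified stand-in built on a plain Go channel: `consume` and `handledBeforeStop` are hypothetical names, and ending the loop on a handler error is an assumption rather than documented behaviour.

```go
package main

import (
	"context"
	"fmt"
)

// Message and MessageHandlerFunc mirror the declarations documented above.
type Message struct {
	Channel string
	Body    interface{}
}

type MessageHandlerFunc func(ctx context.Context, msg Message) (bool, error)

// consume is a hypothetical subscription loop. It passes each inbound message
// to handler and stops when the handler returns false or an error (assumed),
// when the inbox is closed, or when ctx is cancelled.
func consume(ctx context.Context, inbox <-chan Message, handler MessageHandlerFunc) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg, ok := <-inbox:
			if !ok {
				return nil
			}
			keepGoing, err := handler(ctx, msg)
			if err != nil {
				return err
			}
			if !keepGoing {
				return nil
			}
		}
	}
}

// handledBeforeStop queues total messages and uses a handler that asks to stop
// once it has handled limit of them; it returns how many were handled.
func handledBeforeStop(total, limit int) int {
	inbox := make(chan Message, total)
	for i := 0; i < total; i++ {
		inbox <- Message{Channel: "demo", Body: i}
	}
	close(inbox)

	handled := 0
	_ = consume(context.Background(), inbox, func(ctx context.Context, msg Message) (bool, error) {
		handled++
		return handled < limit, nil
	})
	return handled
}

func main() {
	fmt.Println(handledBeforeStop(5, 3)) // prints 3
}
```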

type Opt

type Opt func(o *Options)

Opt configures a machine instance
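
Opt follows the functional options pattern: each option is a function that mutates an options struct, and the constructor applies them in order. Since the fields of Options are unexported, the sketch below uses a hypothetical `options` struct with an invented `maxRoutines` field purely to show the mechanics.

```go
package main

import "fmt"

// options is a hypothetical stand-in for the library's Options struct, whose
// real fields are unexported and not documented here.
type options struct {
	maxRoutines int // hypothetical field; 0 means "no limit" in this sketch
}

// opt has the same shape as Opt: a function that configures the options.
type opt func(o *options)

// withMaxRoutines is a hypothetical option constructor.
func withMaxRoutines(limit int) opt {
	return func(o *options) { o.maxRoutines = limit }
}

// newOptions starts from defaults and applies each option in order.
func newOptions(opts ...opt) options {
	o := options{}
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	fmt.Println(newOptions().maxRoutines)                    // prints 0
	fmt.Println(newOptions(withMaxRoutines(10)).maxRoutines) // prints 10
}
```

The pattern lets New accept any number of options while keeping the zero-argument call valid.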

func WithThrottledRoutines

func WithThrottledRoutines(max int) Opt

WithThrottledRoutines throttles the max number of active routines spawned by the Machine.
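
One common way to cap the number of active goroutines is a buffered channel used as a counting semaphore. The sketch below demonstrates that technique with the standard library only; it is an assumption about how such throttling can work, not a description of Machine's internals, and `peakConcurrency` is a hypothetical helper that expects a positive limit.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// peakConcurrency starts jobs goroutines but lets at most limit of them run at
// once, using a buffered channel as a counting semaphore. It returns the
// highest number of goroutines observed running at the same time.
func peakConcurrency(jobs, limit int) int {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		active int
		peak   int
		sem    = make(chan struct{}, limit)
	)
	for i := 0; i < jobs; i++ {
		sem <- struct{}{} // blocks while limit goroutines are already active
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the job ends

			mu.Lock()
			active++
			if active > peak {
				peak = active
			}
			mu.Unlock()

			time.Sleep(time.Millisecond) // simulated work

			mu.Lock()
			active--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak concurrency:", peakConcurrency(20, 3))
}
```

The observed peak can never exceed the limit, because a goroutine only counts itself as active while it holds a semaphore slot.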

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options holds config options for a machine instance

type SubscriptionOpt

type SubscriptionOpt func(options *SubscriptionOptions)

SubscriptionOpt configures a subscription

func WithFilter

func WithFilter(filter MessageFilterFunc) SubscriptionOpt

WithFilter is a subscription option that filters messages

func WithSubscriptionID

func WithSubscriptionID(id string) SubscriptionOpt

WithSubscriptionID is a subscription option that sets the subscription ID. If multiple consumers share the same subscription ID, each message is delivered to just one of them.
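
The effect of a shared subscription ID can be sketched as grouping consumers by ID and picking one member per group for each message. The `consumer` type, the `recipients` helper, and the round-robin selection policy below are all assumptions made for illustration; the documentation does not state how Machine chooses the receiving consumer.

```go
package main

import "fmt"

// consumer is a hypothetical subscriber record: a name plus the subscription
// ID it registered with.
type consumer struct {
	name           string
	subscriptionID string
}

// recipients picks which consumers receive the message with sequence number
// seq (seq must be non-negative). Consumers sharing a subscription ID form a
// group, and only one member per group is chosen, here by round-robin
// (an assumed policy).
func recipients(consumers []consumer, seq int) []string {
	groups := map[string][]string{}
	var order []string // subscription IDs in first-seen order, for stable output
	for _, c := range consumers {
		if _, seen := groups[c.subscriptionID]; !seen {
			order = append(order, c.subscriptionID)
		}
		groups[c.subscriptionID] = append(groups[c.subscriptionID], c.name)
	}
	var out []string
	for _, id := range order {
		members := groups[id]
		out = append(out, members[seq%len(members)])
	}
	return out
}

func main() {
	consumers := []consumer{
		{name: "worker-a", subscriptionID: "billing"},
		{name: "worker-b", subscriptionID: "billing"},
		{name: "auditor", subscriptionID: "audit"},
	}
	for seq := 0; seq < 3; seq++ {
		fmt.Println(seq, recipients(consumers, seq))
	}
	// prints:
	// 0 [worker-a auditor]
	// 1 [worker-b auditor]
	// 2 [worker-a auditor]
}
```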

type SubscriptionOptions

type SubscriptionOptions struct {
	// contains filtered or unexported fields
}

SubscriptionOptions holds config options for a subscription
