machine

package module
v2.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2021 License: Apache-2.0 Imports: 8 Imported by: 0

README

Machine GoDoc

concurrency

import "github.com/autom8ter/machine/v2"

Machine is a zero dependency library for highly concurrent Go applications. It is inspired by errgroup.Group with extra bells & whistles:

  • In memory Publish Subscribe for asynchronously broadcasting & consuming messages in memory
  • Asynchronous worker groups similar to errgroup.Group
  • Throttled max active goroutine count
  • Asynchronous error handling(see WithErrorHandler to override default error handler)
  • Asynchronous cron jobs- Cron()

Use Cases

Machine is meant to be completely agnostic and dependency free- its use cases are expected to be emergent. Really, it can be used anywhere goroutines are used.

Highly concurrent and/or asynchronous applications include:

  • gRPC streaming servers

  • websocket servers

  • pubsub servers

  • reverse proxies

  • cron jobs

  • custom database/cache

  • ETL pipelines

  • log sink

  • filesystem walker

  • code generation

// Machine is an interface for highly asynchronous Go applications
type Machine interface {
	// Publish synchronously publishes the Message
	Publish(ctx context.Context, msg Message)
	// Subscribe synchronously subscribes to messages on a given channel,  executing the given HandlerFunc UNTIL the context cancels OR false is returned by the HandlerFunc.
	// Glob matching IS supported for subscribing to multiple channels at once.
	Subscribe(ctx context.Context, channel string, handler MessageHandlerFunc, opts ...SubscriptionOpt)
	// Go asynchronously executes the given Func
	Go(ctx context.Context, fn Func)
	// Cron asynchronously executes the given function on a timed interval UNTIL the context cancels OR false is returned by the CronFunc
	Cron(ctx context.Context, interval time.Duration, fn CronFunc)
	// Loop asynchronously executes the given function repeatedly UNTIL the context cancels OR false is returned by the LoopFunc
	Loop(ctx context.Context, fn LoopFunc)
	// Wait blocks until all active async functions(Loop, Go, Cron) exit
	Wait()
	// Close blocks until all active routine's exit(calls Wait) then closes all active subscriptions
	Close()
}

Example

        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  	defer cancel()
  	var (
  		m       = machine.New()
  		results []string
  		mu      sync.RWMutex
  	)
  	defer m.Close()
  
  	m.Go(ctx, func(ctx context.Context) error {
  		m.Subscribe(ctx, "accounting.*", func(ctx context.Context, msg machine.Message) (bool, error) {
  			mu.Lock()
  			results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.GetChannel(), msg.GetBody()))
  			mu.Unlock()
  			return true, nil
  		})
  		return nil
  	})
  	m.Go(ctx, func(ctx context.Context) error {
  		m.Subscribe(ctx, "engineering.*", func(ctx context.Context, msg machine.Message) (bool, error) {
  			mu.Lock()
  			results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.GetChannel(), msg.GetBody()))
  			mu.Unlock()
  			return true, nil
  		})
  		return nil
  	})
  	m.Go(ctx, func(ctx context.Context) error {
  		m.Subscribe(ctx, "human_resources.*", func(ctx context.Context, msg machine.Message) (bool, error) {
  			mu.Lock()
  			results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.GetChannel(), msg.GetBody()))
  			mu.Unlock()
  			return true, nil
  		})
  		return nil
  	})
  	m.Go(ctx, func(ctx context.Context) error {
  		m.Subscribe(ctx, "*", func(ctx context.Context, msg machine.Message) (bool, error) {
  			mu.Lock()
  			results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.GetChannel(), msg.GetBody()))
  			mu.Unlock()
  			return true, nil
  		})
  		return nil
  	})
  	<-time.After(1 * time.Second)
  	m.Publish(ctx, machine.Msg{
  		Channel: "human_resources.chat_room6",
  		Body:    "hello world human resources",
  	})
  	m.Publish(ctx, machine.Msg{
  		Channel: "accounting.chat_room2",
  		Body:    "hello world accounting",
  	})
  	m.Publish(ctx, machine.Msg{
  		Channel: "engineering.chat_room1",
  		Body:    "hello world engineering",
  	})
  	m.Wait()
  	sort.Strings(results)
  	for _, res := range results {
  		fmt.Print(res)
  	}
  	// Output:
  	//(accounting.chat_room2) received msg: hello world accounting
  	//(accounting.chat_room2) received msg: hello world accounting
  	//(engineering.chat_room1) received msg: hello world engineering
  	//(engineering.chat_room1) received msg: hello world engineering
  	//(human_resources.chat_room6) received msg: hello world human resources
  	//(human_resources.chat_room6) received msg: hello world human resources
Extended Examples

All examples are < 500 lines of code(excluding code generation)

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CronFunc added in v2.0.2

type CronFunc func(ctx context.Context) (bool, error)

CronFunc is a first class function that is asynchronously executed on a timed interval. Return false to indicate that the cron should end

type Func

type Func func(ctx context.Context) error

Func is a first class function that is asynchronously executed.

type LoopFunc added in v2.0.4

type LoopFunc func(ctx context.Context) (bool, error)

LoopFunc is a first class function that is asynchronously executed over and over again. Return false to indicate that the loop should end

type Machine

type Machine interface {
	// Publish synchronously publishes the Message
	Publish(ctx context.Context, msg Message)
	// Subscribe synchronously subscribes to messages on a given channel,  executing the given HandlerFunc UNTIL the context cancels OR false is returned by the HandlerFunc.
	// Glob matching IS supported for subscribing to multiple channels at once.
	Subscribe(ctx context.Context, channel string, handler MessageHandlerFunc, opts ...SubscriptionOpt)
	// Go asynchronously executes the given Func
	Go(ctx context.Context, fn Func)
	// Cron asynchronously executes the given function on a timed interval UNTIL the context cancels OR false is returned by the CronFunc
	Cron(ctx context.Context, interval time.Duration, fn CronFunc)
	// Loop asynchronously executes the given function repeatedly UNTIL the context cancels OR false is returned by the LoopFunc
	Loop(ctx context.Context, fn LoopFunc)
	// Wait blocks until all active async functions(Loop, Go, Cron) exit
	Wait()
	// Close blocks until all active routine's exit(calls Wait) then closes all active subscriptions
	Close()
}

Machine is an interface for highly asynchronous Go applications

func New

func New(opts ...Opt) Machine

New creates a new Machine instance with the given options(if present)

Example
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var (
	m       = machine.New()
	results []string
	mu      sync.RWMutex
)
defer m.Close()

m.Go(ctx, func(ctx context.Context) error {
	m.Subscribe(ctx, "accounting.*", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.GetChannel(), msg.GetBody()))
		mu.Unlock()
		return true, nil
	})
	return nil
})
m.Go(ctx, func(ctx context.Context) error {
	m.Subscribe(ctx, "engineering.*", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.GetChannel(), msg.GetBody()))
		mu.Unlock()
		return true, nil
	})
	return nil
})
m.Go(ctx, func(ctx context.Context) error {
	m.Subscribe(ctx, "human_resources.*", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.GetChannel(), msg.GetBody()))
		mu.Unlock()
		return true, nil
	})
	return nil
})
m.Go(ctx, func(ctx context.Context) error {
	m.Subscribe(ctx, "*", func(ctx context.Context, msg machine.Message) (bool, error) {
		mu.Lock()
		results = append(results, fmt.Sprintf("(%s) received msg: %v\n", msg.GetChannel(), msg.GetBody()))
		mu.Unlock()
		return true, nil
	})
	return nil
})
<-time.After(1 * time.Second)
m.Publish(ctx, machine.Msg{
	Channel: "human_resources.chat_room6",
	Body:    "hello world human resources",
})
m.Publish(ctx, machine.Msg{
	Channel: "accounting.chat_room2",
	Body:    "hello world accounting",
})
m.Publish(ctx, machine.Msg{
	Channel: "engineering.chat_room1",
	Body:    "hello world engineering",
})
m.Wait()
sort.Strings(results)
for _, res := range results {
	fmt.Print(res)
}
Output:

(accounting.chat_room2) received msg: hello world accounting
(accounting.chat_room2) received msg: hello world accounting
(engineering.chat_room1) received msg: hello world engineering
(engineering.chat_room1) received msg: hello world engineering
(human_resources.chat_room6) received msg: hello world human resources
(human_resources.chat_room6) received msg: hello world human resources

type Message

type Message interface {
	GetChannel() string
	GetBody() interface{}
}

type MessageFilterFunc added in v2.0.2

type MessageFilterFunc func(ctx context.Context, msg Message) (bool, error)

Filter is a first class function to filter out messages before they reach a subscriptions primary Handler Return true to indicate that a message passes the filter

type MessageHandlerFunc added in v2.0.2

type MessageHandlerFunc func(ctx context.Context, msg Message) (bool, error)

Handler is a first class that is executed against the inbound message in a subscription. Return false to indicate that the subscription should end

type Msg

type Msg struct {
	Channel string
	Body    interface{}
}

func (Msg) GetBody

func (m Msg) GetBody() interface{}

func (Msg) GetChannel

func (m Msg) GetChannel() string

type Opt

type Opt func(o *Options)

Opt configures a machine instance

func WithErrHandler

func WithErrHandler(errHandler func(err error)) Opt

WithErrHandler overrides the default machine error handler

func WithThrottledRoutines

func WithThrottledRoutines(max int) Opt

WithThrottledRoutines throttles the max number of active routine's spawned by the Machine.

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options holds config options for a machine instance

type SubscriptionOpt added in v2.0.2

type SubscriptionOpt func(options *SubscriptionOptions)

SubscriptionOpt configures a subscription

func WithFilter

func WithFilter(filter MessageFilterFunc) SubscriptionOpt

WithFilter is a subscription option that filters messages

type SubscriptionOptions added in v2.0.2

type SubscriptionOptions struct {
	// contains filtered or unexported fields
}

SubscriptionOptions holds config options for a subscription

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL