asyncp

package module
v2.2.0
Published: Nov 14, 2022 License: Apache-2.0 Imports: 22 Imported by: 0

README

Asyncp processing framework

License Apache 2.0

A simple framework for building asynchronous task-execution programs.

Pipelines

A pipeline applies a list of tasks sequentially. Pipelines let you split one large processing job into small, simple steps, which keeps complex logic manageable.

A pipeline is atomic: either every task in the pipeline executes successfully, or the pipeline as a whole fails.

Example program

import (
  ...
  "github.com/demdxx/asyncp/v2"
  "github.com/demdxx/asyncp/v2/streams"
  ...
)

func main() {
  taskQueueSub := nats.NewSubscriber(...)
  taskQueuePub := nats.NewPublisher(...)

  // Create new async multiplexer
  mx := asyncp.NewTaskMux(
    // Define the default stream message queue
    asyncp.WithStreamResponsePublisher(taskQueuePub),
    asyncp.WithPanicHandler(...),
    asyncp.WithErrorHandler(...),
    asyncp.WithCluster(...),
  )
  defer func() { _ = mx.Close() }()

  // Create new task handler to download articles by RSS
  mx.Handle("rss", downloadRSSList).
    Then(downloadRSSItem).
    Then(updateRSSArticles)

  // Create new task handler to process video files
  mx.Handle("video", loadVideoForProcessing).
    Then(makeVideoThumbs).
    Then(convertVideoFormat)

  // Send report to user (event contains login and email target)
  mx.Handle("email", pipeline.New(
    `userinfo`, assembleBasicInfo,
    `changes`, assembleAllChangesForUser,
    `template`, assembleEmailHTMLTemplate,
    pipeline.New(
      sendNotification,
      sendSendLogs,
    ),
  )).Then(sendEmailTask)

  // Retranslate all messages to the queue if they can't be processed
  mx.Failver(asyncp.Retranslator(asyncp.DefaultRetranslateCount, taskQueuePub))

  // Alternative:
  // taskQueueSub.Subscribe(context.Background(), mx)
  // taskQueueSub.Listen(context.Background())
  err := streams.ListenAndServe(context.Background(), mx,
    taskQueueSub, "nats://host:2222/group?topics=topicName")
  if err != nil {
    panic(err)
  }
}

Convert task to async executor.

atask := asyncp.WrapAsyncTask(task,
  asyncp.WithWorkerCount(10),
  asyncp.WithWorkerPoolSize(20),
  asyncp.WithRecoverHandler(func(rec any) {
    // Handle the value recovered from a panic in a worker
  }))

// Or

asyncp.FuncTask(assembleBasicInfo).Async()

Cluster mode

The framework supports cluster task processing. Individual servers can be dedicated to specific tasks, such as video processing or Windows-specific work.

To extend the base functionality across the cluster, register a linked task named baseHandlerTaskName>myNewClusterTask in another application that uses the same sync options.

func main() {
  ...
  // After RSS parsing we need to process video files from the links, if any are present
  mx.Handle("rss>videoExtraction", loadVideoForProcessing).
    Then(makeVideoThumbs).
    Then(convertVideoFormat)
  ...
}
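
The parent>child naming convention can be illustrated with a standalone sketch that splits such names and collects the follow-up tasks per parent event. The helpers splitTaskName and followUps are hypothetical and only show the naming scheme; they are not part of asyncp.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// splitTaskName splits "parent>child" into its parts.
// A name without ">" has no parent.
func splitTaskName(name string) (parent, child string) {
	if p, c, ok := strings.Cut(name, ">"); ok {
		return p, c
	}
	return "", name
}

// followUps builds a map from each parent event to the tasks chained after it.
func followUps(names []string) map[string][]string {
	out := map[string][]string{}
	for _, n := range names {
		if parent, child := splitTaskName(n); parent != "" {
			out[parent] = append(out[parent], child)
		}
	}
	// Sort for a stable, readable result.
	for _, v := range out {
		sort.Strings(v)
	}
	return out
}

func main() {
	m := followUps([]string{"rss", "video", "rss>videoExtraction", "rss>notify"})
	fmt.Println(m["rss"])
}
```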

Apmonitor tool

Displays the state of the cluster and the overall state of every task.

Documentation

Index

Constants

const DefaultRetranslateCount = 30

DefaultRetranslateCount is the default number of times an event may be repeated in the pipeline

Variables

var (
	// ErrSkipEvent in case of repeat count exceeds the limit
	ErrSkipEvent = errors.ErrSkipEvent

	// ErrNil in case of empty response
	ErrNil = errors.ErrNil
)
var (
	ErrChanelTaken = errors.New(`chanel has been taken`)
)

Error list...

var (
	// ErrNoSyncInformation is returned when the sync information cannot be accessed
	ErrNoSyncInformation = errors.New("no information for sync")
)
var ErrNullMessageValue = errors.New(`the value message is nil`)

ErrNullMessageValue is returned if the message value is nil

Functions

This section is empty.

Types

type AsyncOption

type AsyncOption func(opt *AsyncOptions)

AsyncOption tunes the options of an async task

func WithRecoverHandler

func WithRecoverHandler(f func(any)) AsyncOption

WithRecoverHandler defines the handler that receives the recovered value

func WithWorkerCount

func WithWorkerCount(count int) AsyncOption

WithWorkerCount changes the number of workers

func WithWorkerPoolSize

func WithWorkerPoolSize(size int) AsyncOption

WithWorkerPoolSize sets the maximum size of the worker pool
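
AsyncOption follows the functional-options pattern: each option is a function that mutates an options struct. The sketch below shows the pattern in isolation. Because the fields of AsyncOptions are unexported, the asyncOptions struct, its field names, and the default values here are hypothetical.

```go
package main

import "fmt"

// asyncOptions is a hypothetical stand-in; the real AsyncOptions fields are unexported.
type asyncOptions struct {
	workerCount int
	poolSize    int
	recoverFn   func(any)
}

// asyncOption mirrors the documented shape: a function that mutates the options.
type asyncOption func(opt *asyncOptions)

func withWorkerCount(n int) asyncOption { return func(o *asyncOptions) { o.workerCount = n } }

func withWorkerPoolSize(n int) asyncOption { return func(o *asyncOptions) { o.poolSize = n } }

func withRecoverHandler(f func(any)) asyncOption { return func(o *asyncOptions) { o.recoverFn = f } }

// buildOptions starts from assumed defaults and applies each option in order,
// so a later option overrides an earlier one.
func buildOptions(opts ...asyncOption) asyncOptions {
	o := asyncOptions{workerCount: 1, poolSize: 1}
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	o := buildOptions(withWorkerCount(10), withWorkerPoolSize(20))
	fmt.Println(o.workerCount, o.poolSize, o.recoverFn == nil)
}
```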

type AsyncOptions

type AsyncOptions struct {
	// contains filtered or unexported fields
}

AsyncOptions contains concurrent execution pool

func (*AsyncOptions) Pool

func (opt *AsyncOptions) Pool(fnk func(any)) *rpool.PoolFunc[any]

Pool of execution

type AsyncTask

type AsyncTask struct {
	// contains filtered or unexported fields
}

AsyncTask processor

func WrapAsyncTask

func WrapAsyncTask(task Task, options ...AsyncOption) *AsyncTask

WrapAsyncTask as async executor

func (*AsyncTask) Close

func (t *AsyncTask) Close() error

Close execution pool and finish handler processing

func (*AsyncTask) Execute

func (t *AsyncTask) Execute(ctx context.Context, event Event, responseWriter ResponseWriter) error

Execute the list of subtasks with input data collection.

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

Cluster synchronizes several processing pools and joins all processing graphs into one cross-service execution map.

func NewCluster

func NewCluster(appName string, options ...ClusterOption) *Cluster

NewCluster graph synchronization

func (*Cluster) AllTaskChains

func (cluster *Cluster) AllTaskChains() map[string][]string

AllTaskChains returns the task chains of all events.

TODO: revise the way task information is represented

func (*Cluster) AllTasks

func (cluster *Cluster) AllTasks() map[string][]string

AllTasks returns all events and links

func (*Cluster) ExecEvent

func (cluster *Cluster) ExecEvent(failover bool, event Event, execTime time.Duration, err error) error

ExecEvent handler after event has been executed

func (*Cluster) ReceiveEvent

func (cluster *Cluster) ReceiveEvent(event Event, err error) error

ReceiveEvent handler before it was executed

func (*Cluster) RegisterApplication

func (cluster *Cluster) RegisterApplication(ctx context.Context, mux *TaskMux) (err error)

RegisterApplication registers the application in the internal storages

func (*Cluster) RunSync

func (cluster *Cluster) RunSync(ctx context.Context)

RunSync of cluster information

func (*Cluster) StopSync

func (cluster *Cluster) StopSync()

StopSync interval processing

func (*Cluster) SyncInfo

func (cluster *Cluster) SyncInfo() error

SyncInfo of the cluster

func (*Cluster) TargetEventsAfter

func (cluster *Cluster) TargetEventsAfter(eventName string) []string

TargetEventsAfter returns list of events to execute after the current event

func (*Cluster) UnregisterApplication

func (cluster *Cluster) UnregisterApplication() (err error)

UnregisterApplication from cluster

type ClusterExt

type ClusterExt interface {
	RegisterApplication(ctx context.Context, mux *TaskMux) error
	UnregisterApplication() error
	ReceiveEvent(event Event, err error) error
	ExecEvent(failover bool, event Event, execTime time.Duration, err error) error
	TargetEventsAfter(eventName string) []string
	AllTasks() map[string][]string
	AllTaskChains() map[string][]string
}

ClusterExt extends functionality of mux

type ClusterOption

type ClusterOption = func(cluster *Cluster)

ClusterOption provides option extractor for the cluster configuration

func ClusterWithHostinfo

func ClusterWithHostinfo(hostIP, hostname string) ClusterOption

ClusterWithHostinfo puts hostname information

func ClusterWithReader

func ClusterWithReader(infoReader monitor.ClusterInfoReader) ClusterOption

ClusterWithReader sets the reader of the cluster info

func ClusterWithStores

func ClusterWithStores(stores ...monitor.MetricUpdater) ClusterOption

ClusterWithStores sets the metric stores used by the cluster

func ClusterWithSyncInterval

func ClusterWithSyncInterval(syncInterval time.Duration) ClusterOption

ClusterWithSyncInterval sets the interval between exec graph refreshes

type ContextWrapperFnk

type ContextWrapperFnk func(ctx context.Context) context.Context

ContextWrapperFnk prepares the execution context

type ErrorHandlerFnk

type ErrorHandlerFnk func(Task, Event, error)

ErrorHandlerFnk for any error response

type Event

type Event interface {
	fmt.Stringer

	// ID of the event
	ID() uuid.UUID

	// Name of the event
	Name() string

	// Payload returns the current message payload
	Payload() Payload

	// Err returns error response object
	Err() error

	// CreatedAt returns the date of the event generation
	CreatedAt() time.Time

	// Mux returns the base executor server
	Mux() *TaskMux

	// SetMux sets a new mux object
	SetMux(*TaskMux)

	// Promise returns the origin promise object
	Promise() Promise

	// SetPromise sets the origin promise object
	SetPromise(Promise)

	// WithName returns new event with new name and current payload and error
	WithName(name string) Event

	// WithPayload returns new event object with extended payload context
	WithPayload(payload any) Event

	// WithError returns new event object with extended error value
	WithError(err error) Event

	// SetComplete marks the event as completed or not
	SetComplete(b bool)

	// IsComplete returns marker of event completion
	IsComplete() bool

	// Counters returns current counter state
	Counters() (sent, retranslated int)

	// After provided event
	After(e Event) Event

	// Repeat event
	Repeat(e Event) Event

	// DoneTasks returns the list of previous event names
	DoneTasks() []string

	// HasDoneTask checks whether the task has been processed
	HasDoneTask(name string) bool

	// Encode event to byte array
	Encode() ([]byte, error)

	// Decode event by the byte array
	Decode(data []byte) error
}

Event provides interface of working with message streams

func WithPayload

func WithPayload(eventName string, data any) Event

WithPayload returns new event object with payload data

type EventAllocator

type EventAllocator interface {
	Decode(msg Message) (Event, error)
	Release(event Event) error
}

EventAllocator provides event allocation objects interface

type FuncTask

type FuncTask func(ctx context.Context, event Event, responseWriter ResponseWriter) error

FuncTask provides implementation of Task interface for function pointer

func ExtFuncTask

func ExtFuncTask(f any) FuncTask

ExtFuncTask wraps function argument with arbitrary input data type

func (FuncTask) Async

func (f FuncTask) Async(options ...AsyncOption) *AsyncTask

Async transforms task to the asynchronous executor

func (FuncTask) Execute

func (f FuncTask) Execute(ctx context.Context, event Event, responseWriter ResponseWriter) error

Execute the list of subtasks with input data collection.

type Message

type Message = notificationcenter.Message

Message is the internal message type

type Option

type Option func(opt *Options)

Option of the task configuration

func WithCluster

func WithCluster(appName string, options ...ClusterOption) Option

WithCluster set option of the cluster synchronizer from options

func WithClusterObject

func WithClusterObject(cluster ClusterExt) Option

WithClusterObject set option of the cluster synchronizer

func WithContextWrapper

func WithContextWrapper(w ContextWrapperFnk) Option

WithContextWrapper puts context wrapper to the Mux option

func WithErrorHandler

func WithErrorHandler(h ErrorHandlerFnk) Option

WithErrorHandler puts error handler to the Mux option

func WithEventAllocator

func WithEventAllocator(eventAllocator EventAllocator) Option

WithEventAllocator set option with event allocator

func WithMainExecContext

func WithMainExecContext(ctx context.Context) Option

WithMainExecContext puts main execution context to the Mux option

func WithMonitor

func WithMonitor(appName, host, hostname string, updater ...monitor.MetricUpdater) Option

WithMonitor set option with monitoring storage

func WithMonitorDefaults

func WithMonitorDefaults(appName string, updater ...monitor.MetricUpdater) Option

WithMonitorDefaults set option with monitoring storage

func WithPanicHandler

func WithPanicHandler(h PanicHandlerFnk) Option

WithPanicHandler puts panic handler to the Mux option

func WithResponseFactory

func WithResponseFactory(responseFactory ResponseWriterFactory) Option

WithResponseFactory set option with stream factory

func WithStreamResponseMap

func WithStreamResponseMap(streams ...any) Option

WithStreamResponseMap set option with stream mapping converted to factory

func WithStreamResponsePublisher

func WithStreamResponsePublisher(publisher Publisher) Option

WithStreamResponsePublisher set option with single stream publisher

type Options

type Options struct {
	MainExecContext context.Context
	PanicHandler    PanicHandlerFnk
	ErrorHandler    ErrorHandlerFnk
	ContextWrapper  ContextWrapperFnk
	ResponseFactory ResponseWriterFactory
	Cluster         ClusterExt
	EventAllocator  EventAllocator
}

Options of the mux server

type PanicHandlerFnk

type PanicHandlerFnk func(Task, Event, any)

PanicHandlerFnk for any panic errors

type Payload

type Payload interface {
	// Encode payload data to the bytes
	Encode() ([]byte, error)

	// Decode payload data into the target
	Decode(target any) error
}

Payload represents interface of working with input data
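
As an illustration, the Payload interface can be satisfied by a type that keeps its data as raw JSON. The sketch below repeats the interface so it compiles on its own; jsonPayload, newJSONPayload, and rssItem are hypothetical names, and JSON is an assumed encoding rather than the one the package necessarily uses.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Payload repeats the documented interface so the sketch compiles on its own.
type Payload interface {
	Encode() ([]byte, error)
	Decode(target any) error
}

// jsonPayload is a hypothetical implementation that keeps the data as raw JSON.
type jsonPayload struct{ raw []byte }

// newJSONPayload marshals any value into a payload.
func newJSONPayload(v any) (Payload, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	return &jsonPayload{raw: raw}, nil
}

// Encode returns the stored JSON bytes.
func (p *jsonPayload) Encode() ([]byte, error) { return p.raw, nil }

// Decode unmarshals the stored JSON into target, which must be a pointer.
func (p *jsonPayload) Decode(target any) error { return json.Unmarshal(p.raw, target) }

type rssItem struct {
	URL   string `json:"url"`
	Title string `json:"title"`
}

func main() {
	p, err := newJSONPayload(rssItem{URL: "https://example.com/feed", Title: "Example"})
	if err != nil {
		panic(err)
	}
	var item rssItem
	if err := p.Decode(&item); err != nil {
		panic(err)
	}
	fmt.Println(item.Title)
}
```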

type Promise

type Promise interface {
	// EventName accepted by the item
	EventName() string

	// TargetEventName returns name of target event
	TargetEventName() []string

	// AfterEventName map event in the event queue
	AfterEventName() string

	// TargetEvent define
	TargetEvent(name string) Promise

	// Then execute the next task if current succeeded
	Then(handler any) Promise

	// ThenEvent which need to execute
	ThenEvent(name string)

	// Parent promise item
	Parent() Promise

	// Task executor interface
	Task() Task

	// IsVirtual promise type
	IsVirtual() bool
}

Promise describes the behaviour of a single task item

type ProxySubscriber

type ProxySubscriber struct {
	// contains filtered or unexported fields
}

ProxySubscriber defines the multiple subscriber object

func (*ProxySubscriber) Close

func (prx *ProxySubscriber) Close() error

Close all proxy subscribers

func (*ProxySubscriber) Listen

func (prx *ProxySubscriber) Listen(ctx context.Context) (err error)

Listen starts processing queue

func (*ProxySubscriber) Subscribe

func (prx *ProxySubscriber) Subscribe(ctx context.Context, h Receiver) error

Subscribe new handler for the stream

type Publisher

type Publisher = notificationcenter.Publisher

Publisher writing interface

func PublisherEventWrapper

func PublisherEventWrapper(eventName string, publisher Publisher) Publisher

PublisherEventWrapper with fixed event name

type Receiver

type Receiver = notificationcenter.Receiver

Receiver defines the processing interface. This handler is used for processing the input messages from the stream

type ResponseHandlerFnk

type ResponseHandlerFnk func(response any) error

ResponseHandlerFnk provides implementation of ResponseWriter interface

func (ResponseHandlerFnk) Release

func (f ResponseHandlerFnk) Release() error

Release releases the response writer stream; for this type it is an empty method

func (ResponseHandlerFnk) RepeatWithResponse

func (f ResponseHandlerFnk) RepeatWithResponse(response any) error

RepeatWithResponse send data into the same stream response

func (ResponseHandlerFnk) WriteResonse

func (f ResponseHandlerFnk) WriteResonse(response any) error

WriteResonse sends data into the stream response
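
A function-backed writer like this is handy in tests, where responses only need to be captured. The standalone sketch below repeats the ResponseWriter interface (including the WriteResonse spelling used by the package) and adapts a function to it. handlerFnk is a hypothetical stand-in, and routing RepeatWithResponse to the same function is an assumption.

```go
package main

import "fmt"

// ResponseWriter repeats the documented interface, method spelling included.
type ResponseWriter interface {
	WriteResonse(response any) error
	RepeatWithResponse(response any) error
	Release() error
}

// handlerFnk is a hypothetical adapter: both write methods call the
// function (an assumption of this sketch) and Release does nothing.
type handlerFnk func(response any) error

func (f handlerFnk) WriteResonse(response any) error { return f(response) }

func (f handlerFnk) RepeatWithResponse(response any) error { return f(response) }

func (f handlerFnk) Release() error { return nil }

func main() {
	var got []any
	var w ResponseWriter = handlerFnk(func(r any) error {
		got = append(got, r)
		return nil
	})
	_ = w.WriteResonse("first")
	_ = w.RepeatWithResponse("again")
	_ = w.Release()
	fmt.Println(len(got))
}
```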

type ResponseWriter

type ResponseWriter interface {
	// WriteResonse sends data into the stream response
	WriteResonse(response any) error

	// RepeatWithResponse send data into the same stream response
	RepeatWithResponse(response any) error

	// Release response writer stream
	Release() error
}

ResponseWriter basic response functionality

type ResponseWriterFactory

type ResponseWriterFactory interface {
	// Borrow response writer by event and task object
	Borrow(ctx context.Context, promise Promise, event Event) ResponseWriter

	// Release response writer object
	Release(w ResponseWriter)
}

ResponseWriterFactory interface to generate new response object

func NewMultistreamResponseFactory

func NewMultistreamResponseFactory(streams ...any) ResponseWriterFactory

NewMultistreamResponseFactory returns implementation with multipublisher support

func NewProxyResponseFactory

func NewProxyResponseFactory() ResponseWriterFactory

NewProxyResponseFactory returns

func NewStreamResponseFactory

func NewStreamResponseFactory(publisher Publisher) ResponseWriterFactory

NewStreamResponseFactory returns

type Stream

type Stream = notificationcenter.Publisher

Stream writing interface

type Subscriber

type Subscriber = notificationcenter.Subscriber

Subscriber defines the interface of subscribing for some event stream processing

func NewProxySubscriber

func NewProxySubscriber(subs ...Subscriber) Subscriber

NewProxySubscriber object from list of subscribers

type Task

type Task interface {
	// Execute the list of subtasks with input data collection.
	// It returns the new data collection which will be used in the next tasks as input params.
	Execute(ctx context.Context, event Event, responseWriter ResponseWriter) error
}

Task describes a single execution unit

func Repeater

func Repeater(repeatMaxCount ...int) Task

Repeater sends the same event to the same set of pipelines

func Retranslator

func Retranslator(repeatMaxCount int, pubs ...Publisher) Task

Retranslator of the event to the stream

func TaskFrom

func TaskFrom(handler any) Task

TaskFrom converts the incoming handler type to the Task interface

type TaskMux

type TaskMux struct {
	// contains filtered or unexported fields
}

TaskMux object which controls the workflow of task execution

func NewTaskMux

func NewTaskMux(options ...Option) *TaskMux

NewTaskMux server object

func (*TaskMux) Close

func (srv *TaskMux) Close() error

Close task schedule and all subtasks

func (*TaskMux) CompleteTasks

func (srv *TaskMux) CompleteTasks(event Event) (totalTasks, completedTasks []string)

CompleteTasks checks the event completion state

func (*TaskMux) ExecuteEvent

func (srv *TaskMux) ExecuteEvent(event Event) error

ExecuteEvent with mux executor

func (*TaskMux) Failver

func (srv *TaskMux) Failver(task any) error

Failver sets the handler used when a received event has an unsupported name

func (*TaskMux) FinishInit

func (srv *TaskMux) FinishInit() error

FinishInit of the task server

func (*TaskMux) Handle

func (srv *TaskMux) Handle(taskName string, handler any) Promise

Handle registers a new task for a specific channel. A task that runs after another task can be defined as "parentTaskName>currentTaskName"

func (*TaskMux) Receive

func (srv *TaskMux) Receive(msg Message) error

Receive defines the processing function

func (*TaskMux) TaskMap

func (srv *TaskMux) TaskMap() map[string][]string

TaskMap returns linked list of events

Directories

Path Synopsis
cmd
example
libs
