Documentation ¶
Index ¶
- Constants
- Variables
- type AsyncOption
- type AsyncOptions
- type AsyncTask
- type Cluster
- func (cluster *Cluster) AllTaskChains() map[string][]string
- func (cluster *Cluster) AllTasks() map[string][]string
- func (cluster *Cluster) ExecEvent(failover bool, event Event, execTime time.Duration, err error) error
- func (cluster *Cluster) ReceiveEvent(event Event, err error) error
- func (cluster *Cluster) RegisterApplication(ctx context.Context, mux *TaskMux) (err error)
- func (cluster *Cluster) RunSync(ctx context.Context)
- func (cluster *Cluster) StopSync()
- func (cluster *Cluster) SyncInfo() error
- func (cluster *Cluster) TargetEventsAfter(eventName string) []string
- func (cluster *Cluster) UnregisterApplication() (err error)
- type ClusterExt
- type ClusterOption
- type ContextWrapperFnk
- type ErrorHandlerFnk
- type Event
- type EventAllocator
- type FuncTask
- type Message
- type Option
- func WithCluster(appName string, options ...ClusterOption) Option
- func WithClusterObject(cluster ClusterExt) Option
- func WithContextWrapper(w ContextWrapperFnk) Option
- func WithErrorHandler(h ErrorHandlerFnk) Option
- func WithEventAllocator(eventAllocator EventAllocator) Option
- func WithMainExecContext(ctx context.Context) Option
- func WithMonitor(appName, host, hostname string, updater ...monitor.MetricUpdater) Option
- func WithMonitorDefaults(appName string, updater ...monitor.MetricUpdater) Option
- func WithPanicHandler(h PanicHandlerFnk) Option
- func WithResponseFactory(responseFactory ResponseWriterFactory) Option
- func WithStreamResponseMap(streams ...any) Option
- func WithStreamResponsePublisher(publisher Publisher) Option
- type Options
- type PanicHandlerFnk
- type Payload
- type Promise
- type ProxySubscriber
- type Publisher
- type Receiver
- type ResponseHandlerFnk
- type ResponseWriter
- type ResponseWriterFactory
- type Stream
- type Subscriber
- type Task
- type TaskMux
- func (srv *TaskMux) Close() error
- func (srv *TaskMux) CompleteTasks(event Event) (totalTasks, completedTasks []string)
- func (srv *TaskMux) ExecuteEvent(event Event) error
- func (srv *TaskMux) Failver(task any) error
- func (srv *TaskMux) FinishInit() error
- func (srv *TaskMux) Handle(taskName string, handler any) Promise
- func (srv *TaskMux) Receive(msg Message) error
- func (srv *TaskMux) TaskMap() map[string][]string
Constants ¶
const DefaultRetranslateCount = 30
DefaultRetranslateCount shows amount of event repeating in the pipeline
Variables ¶
var ( // ErrSkipEvent in case of repeat count exceeds the limit ErrSkipEvent = errors.ErrSkipEvent // ErrNil in case of empty response ErrNil = errors.ErrNil )
var (
ErrChanelTaken = errors.New(`chanel has been taken`)
)
Error list...
var ( // ErrNoSyncInformation in case if cant get access to sync information ErrNoSyncInformation = errors.New("no information for sync") )
var ErrNullMessageValue = errors.New(`the value message is nil`)
ErrNullMessageValue if value is nil
Functions ¶
This section is empty.
Types ¶
type AsyncOption ¶
type AsyncOption func(opt *AsyncOptions)
AsyncOption is a function type used to tune AsyncOptions
func WithRecoverHandler ¶
func WithRecoverHandler(f func(any)) AsyncOption
WithRecoverHandler defines the recover handler
func WithWorkerCount ¶
func WithWorkerCount(count int) AsyncOption
WithWorkerCount changes the number of workers
func WithWorkerPoolSize ¶
func WithWorkerPoolSize(size int) AsyncOption
WithWorkerPoolSize sets the maximum size of the worker pool
type AsyncOptions ¶
type AsyncOptions struct {
// contains filtered or unexported fields
}
AsyncOptions contains concurrent execution pool
func (*AsyncOptions) Pool ¶
func (opt *AsyncOptions) Pool(fnk func(any)) *rpool.PoolFunc[any]
Pool of execution
type AsyncTask ¶
type AsyncTask struct {
// contains filtered or unexported fields
}
AsyncTask processor
func WrapAsyncTask ¶
func WrapAsyncTask(task Task, options ...AsyncOption) *AsyncTask
WrapAsyncTask as async executor
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
Cluster provides synchronization of several processing pools and joins all processing graphs into one cross-service execution map.
func NewCluster ¶
func NewCluster(appName string, options ...ClusterOption) *Cluster
NewCluster graph synchronization
func (*Cluster) AllTaskChains ¶
AllTaskChains returns the task chains of all events. TODO: revise the way task information is represented
func (*Cluster) ExecEvent ¶
func (cluster *Cluster) ExecEvent(failover bool, event Event, execTime time.Duration, err error) error
ExecEvent handler after event has been executed
func (*Cluster) ReceiveEvent ¶
ReceiveEvent handler before it was executed
func (*Cluster) RegisterApplication ¶
RegisterApplication in internal storages
func (*Cluster) TargetEventsAfter ¶
TargetEventsAfter returns list of events to execute after the current event
func (*Cluster) UnregisterApplication ¶
UnregisterApplication from cluster
type ClusterExt ¶
type ClusterExt interface { RegisterApplication(ctx context.Context, mux *TaskMux) error UnregisterApplication() error ReceiveEvent(event Event, err error) error ExecEvent(failover bool, event Event, execTime time.Duration, err error) error TargetEventsAfter(eventName string) []string AllTasks() map[string][]string AllTaskChains() map[string][]string }
ClusterExt extends functionality of mux
type ClusterOption ¶
type ClusterOption = func(cluster *Cluster)
ClusterOption provides option extractor for the cluster configuration
func ClusterWithHostinfo ¶
func ClusterWithHostinfo(hostIP, hostname string) ClusterOption
ClusterWithHostinfo puts hostname information
func ClusterWithReader ¶
func ClusterWithReader(infoReader monitor.ClusterInfoReader) ClusterOption
ClusterWithReader of the cluster info
func ClusterWithStores ¶
func ClusterWithStores(stores ...monitor.MetricUpdater) ClusterOption
ClusterWithStores sets the metric stores (monitor.MetricUpdater) used by the cluster
func ClusterWithSyncInterval ¶
func ClusterWithSyncInterval(syncInterval time.Duration) ClusterOption
ClusterWithSyncInterval sets the interval between exec graph refreshes
type ContextWrapperFnk ¶
ContextWrapperFnk for prepare execution context
type ErrorHandlerFnk ¶
ErrorHandlerFnk for any error response
type Event ¶
type Event interface { fmt.Stringer // ID of the event ID() uuid.UUID // Name of the event Name() string // Payload returns the current message payload Payload() Payload // Err returns error response object Err() error // CreatedAt returns the date of the event generation CreatedAt() time.Time // Mux returns base executer server Mux() *TaskMux // SetMux set new mux object SetMux(*TaskMux) // Mux returns origin promise object Promise() Promise // Promise set origin promise object SetPromise(Promise) // WithName returns new event with new name and current payload and error WithName(name string) Event // WithPayload returns new event object with extended payload context WithPayload(payload any) Event // WithError returns new event object with extended error value WithError(err error) Event // SetComplete marks event as complited or no SetComplete(b bool) // IsComplete returns marker of event completion IsComplete() bool // Counters returns current counter state Counters() (sent, retranslated int) // After provided event After(e Event) Event // Repeat event Repeat(e Event) Event // DoneTasks returns the list of previous event names DoneTasks() []string // HasDoneTask checks is the tasks has been processed HasDoneTask(name string) bool // Encode event to byte array Encode() ([]byte, error) // Decode event by the byte array Decode(data []byte) error }
Event provides interface of working with message streams
func WithPayload ¶
WithPayload returns new event object with payload data
type EventAllocator ¶
EventAllocator provides event allocation objects interface
type FuncTask ¶
type FuncTask func(ctx context.Context, event Event, responseWriter ResponseWriter) error
FuncTask provides implementation of Task interface for function pointer
func ExtFuncTask ¶
ExtFuncTask wraps function argument with arbitrary input data type
func (FuncTask) Async ¶
func (f FuncTask) Async(options ...AsyncOption) *AsyncTask
Async transforms task to the asynchronous executor
type Message ¶
type Message = notificationcenter.Message
Message this is the internal type of message
type Option ¶
type Option func(opt *Options)
Option of the task configuration
func WithCluster ¶
func WithCluster(appName string, options ...ClusterOption) Option
WithCluster set option of the cluster synchronizer from options
func WithClusterObject ¶
func WithClusterObject(cluster ClusterExt) Option
WithClusterObject set option of the cluster synchronizer
func WithContextWrapper ¶
func WithContextWrapper(w ContextWrapperFnk) Option
WithContextWrapper puts context wrapper to the Mux option
func WithErrorHandler ¶
func WithErrorHandler(h ErrorHandlerFnk) Option
WithErrorHandler puts error handler to the Mux option
func WithEventAllocator ¶
func WithEventAllocator(eventAllocator EventAllocator) Option
WithEventAllocator set option with event allocator
func WithMainExecContext ¶
WithMainExecContext puts main execution context to the Mux option
func WithMonitor ¶
func WithMonitor(appName, host, hostname string, updater ...monitor.MetricUpdater) Option
WithMonitor set option with monitoring storage
func WithMonitorDefaults ¶
func WithMonitorDefaults(appName string, updater ...monitor.MetricUpdater) Option
WithMonitorDefaults set option with monitoring storage
func WithPanicHandler ¶
func WithPanicHandler(h PanicHandlerFnk) Option
WithPanicHandler puts panic handler to the Mux option
func WithResponseFactory ¶
func WithResponseFactory(responseFactory ResponseWriterFactory) Option
WithResponseFactory set option with stream factory
func WithStreamResponseMap ¶
WithStreamResponseMap set option with stream mapping converted to factory
func WithStreamResponsePublisher ¶
WithStreamResponsePublisher set option with single stream publisher
type Options ¶
type Options struct { MainExecContext context.Context PanicHandler PanicHandlerFnk ErrorHandler ErrorHandlerFnk ContextWrapper ContextWrapperFnk ResponseFactory ResponseWriterFactory Cluster ClusterExt EventAllocator EventAllocator }
Options of the mux server
type PanicHandlerFnk ¶
PanicHandlerFnk for any panic errors
type Payload ¶
type Payload interface { // Encode payload data to the bytes Encode() ([]byte, error) // Decode payload data into the target Decode(target any) error }
Payload represents interface of working with input data
type Promise ¶
type Promise interface { // EventName accepted by the item EventName() string // TargetEventName returns name of target event TargetEventName() []string // AfterEventName map event in the event queue AfterEventName() string // TargetEvent define TargetEvent(name string) Promise // Then execute the next task if current succeeded Then(handler any) Promise // ThenEvent which need to execute ThenEvent(name string) // Parent promise item Parent() Promise // Task executor interface Task() Task // IsVirtual promise type IsVirtual() bool }
Promise describe the behaviour of Single task item
type ProxySubscriber ¶
type ProxySubscriber struct {
// contains filtered or unexported fields
}
ProxySubscriber defines the multiple subscriber object
func (*ProxySubscriber) Close ¶
func (prx *ProxySubscriber) Close() error
Close all proxy subscribers
type Publisher ¶
type Publisher = notificationcenter.Publisher
Publisher writing interface
func PublisherEventWrapper ¶
PublisherEventWrapper with fixed event name
type Receiver ¶
type Receiver = notificationcenter.Receiver
Receiver defines the processing interface. This handler used for processing of the input messages from the stream
type ResponseHandlerFnk ¶
ResponseHandlerFnk provides implementation of ResponseWriter interface
func (ResponseHandlerFnk) Release ¶
func (f ResponseHandlerFnk) Release() error
Release response writer stream empty method
func (ResponseHandlerFnk) RepeatWithResponse ¶
func (f ResponseHandlerFnk) RepeatWithResponse(response any) error
RepeatWithResponse sends data into the same stream response
func (ResponseHandlerFnk) WriteResonse ¶
func (f ResponseHandlerFnk) WriteResonse(response any) error
WriteResonse sends data into the stream response
type ResponseWriter ¶
type ResponseWriter interface { // WriteResonse sends data into the stream response WriteResonse(response any) error // RepeatWithResponse send data into the same stream response RepeatWithResponse(response any) error // Release response writer stream Release() error }
ResponseWriter basic response functionality
type ResponseWriterFactory ¶
type ResponseWriterFactory interface { // Borrow response writer by event and task object Borrow(ctx context.Context, promise Promise, event Event) ResponseWriter // Release response writer object Release(w ResponseWriter) }
ResponseWriterFactory interface to generate new response object
func NewMultistreamResponseFactory ¶
func NewMultistreamResponseFactory(streams ...any) ResponseWriterFactory
NewMultistreamResponseFactory returns implementation with multipublisher support
func NewProxyResponseFactory ¶
func NewProxyResponseFactory() ResponseWriterFactory
NewProxyResponseFactory returns a proxy implementation of ResponseWriterFactory
func NewStreamResponseFactory ¶
func NewStreamResponseFactory(publisher Publisher) ResponseWriterFactory
NewStreamResponseFactory returns an implementation of ResponseWriterFactory for the given publisher
type Subscriber ¶
type Subscriber = notificationcenter.Subscriber
Subscriber defines the interface of subscribing for some event stream processing
func NewProxySubscriber ¶
func NewProxySubscriber(subs ...Subscriber) Subscriber
NewProxySubscriber object from list of subscribers
type Task ¶
type Task interface { // Execute the list of subtasks with input data collection. // It returns the new data collection which will be used in the next tasks as input params. Execute(ctx context.Context, event Event, responseWriter ResponseWriter) error }
Task describes a single execution unit
func Retranslator ¶
Retranslator of the event to the stream
type TaskMux ¶
type TaskMux struct {
// contains filtered or unexported fields
}
TaskMux object which controls the workflow of task execution
func (*TaskMux) CompleteTasks ¶
CompleteTasks checks the event completion state
func (*TaskMux) ExecuteEvent ¶
ExecuteEvent with mux executor
func (*TaskMux) Handle ¶
Handle registers a new task for the specific channel. A task that runs after another task can be defined as "parentTaskName>currentTaskName"