worker

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 15, 2020 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PathCheckpoint = "chkpnt"

	// PatternPathJobTopicData : data/{jobID}/{topic}
	PatternPathJobTopicData = "data/%s/%s"
)
View Source
const (
	SpaceDefaultWorker = "daemon-data"
)

Variables

This section is empty.

Functions

func NewFactoryRegistry

func NewFactoryRegistry() (factory *factoryRegistry)

NewFactoryRegistry creates a factoryRegistry.

Types

type DataHandler

type DataHandler func(jobID string, topic string, rowID string, value []byte) bool

type DataPath

type DataPath struct {
	JobID string
	Topic string
}

func ParseDataPath

func ParseDataPath(path string) (dataPath DataPath, err error)

func ParseDataPathBytes

func ParseDataPathBytes(path []byte) (dataPath DataPath, err error)

func (DataPath) String

func (path DataPath) String() string

type Factory

type Factory interface {
	Name() string
	Space() string
	NewWorker(helper *Helper) (Worker, error)
}

Factory ..

type Helper

type Helper struct {
	// contains filtered or unexported fields
}

Helper ..

func NewHelper

func NewHelper(space string, config common.DaemonConfig, logger log.Logger, job job.Job, dao Repository) *Helper

NewHelper ..

func (*Helper) Config

func (helper *Helper) Config() common.DaemonConfig

Config get node's config

func (*Helper) Debug

func (helper *Helper) Debug(msg string, keyvals ...interface{})

Debug logs a debug message.

func (*Helper) DeleteData

func (helper *Helper) DeleteData(topic string, rowID string) error

DeleteData ..

func (*Helper) Error

func (helper *Helper) Error(msg string, keyvals ...interface{})

Error logs an error message.

func (*Helper) GetCheckpoint

func (helper *Helper) GetCheckpoint(checkpoint interface{}) error

GetCheckpoint ..

func (*Helper) GetData

func (helper *Helper) GetData(topic string, rowID string) (data []byte, err error)

GetData ..

func (*Helper) GetDataList

func (helper *Helper) GetDataList(topic string, handler DataHandler) error

GetDataList ..

func (*Helper) GetObject

func (helper *Helper) GetObject(topic string, rowID string, data interface{}) error

GetObject ..

func (*Helper) GetRepository

func (helper *Helper) GetRepository() Repository

GetRepository gets the worker's Repository.

func (*Helper) ID

func (helper *Helper) ID() string

ID get worker's id

func (*Helper) Info

func (helper *Helper) Info(msg string, keyvals ...interface{})

Info logs an info message.

func (*Helper) Job

func (helper *Helper) Job() job.Job

Job get worker's Job

func (*Helper) PutCheckpoint

func (helper *Helper) PutCheckpoint(checkpoint interface{}) error

PutCheckpoint ..

func (*Helper) PutData

func (helper *Helper) PutData(topic string, rowID string, data []byte) error

PutData ..

func (*Helper) PutDataFullPath

func (helper *Helper) PutDataFullPath(fullPath string, data []byte) error

PutDataFullPath ..

func (*Helper) PutObject

func (helper *Helper) PutObject(topic string, rowID string, data interface{}) error

PutObject ..

func (*Helper) Space

func (helper *Helper) Space() string

Space gets the worker's space.

type Manager

type Manager struct {
	common.Context
	// contains filtered or unexported fields
}

Manager manager for jobs

func NewManager

func NewManager(context common.Context, spaceRegistry types.SpaceRegistry) *Manager

NewManager ..

func (*Manager) ContainsWorker

func (manager *Manager) ContainsWorker(id string) bool

ContainsWorker reports whether a worker with the given id is registered.

func (*Manager) GetRepository

func (manager *Manager) GetRepository() Repository

func (*Manager) GetWorker

func (manager *Manager) GetWorker(id string) Worker

GetWorker get worker for id

func (*Manager) RegisterWorkerFactory

func (manager *Manager) RegisterWorkerFactory(factory Factory) error

func (*Manager) SetJobs

func (manager *Manager) SetJobs(jobs []job.Job)

SetJobs ...

func (*Manager) Start

func (manager *Manager) Start()

type Repository

type Repository interface {
	PutCheckpoint(jobID string, checkpoint interface{}) error
	GetCheckpoint(jobID string, checkpoint interface{}) error
	PutData(space string, jobID string, topic string, rowID string, data []byte) error
	PutObject(space string, jobID string, topic string, rowID string, data interface{}) error
	GetData(space string, jobID string, topic string, rowID string) (data []byte, err error)
	GetObject(space string, jobID string, topic string, rowID string, data interface{}) error
	DeleteData(space string, jobID string, topic string, rowID string) error
	GetDataWithTopic(space string, jobID string, topic string, handler DataHandler) error

	PutDataFullPath(space string, path string, data []byte) error
	PutObjectFullPath(space string, path string, data interface{}) error
	GetDataFullPath(space string, path string) (data []byte, err error)
	GetObjectFullPath(space string, path string, data interface{}) (err error)
	DeleteDataFullPath(space string, path string) error
}

func NewRepository

func NewRepository(config common.DaemonConfig, logger log.Logger, client types.Client) Repository

type Worker

type Worker interface {
	ID() string
	Start() error
	Stop() error
	IsStarted() bool
}

Worker ..

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL