worker

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 29, 2020 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PathCheckpoint = "chkpnt"

	// PatternPathJobTopicData : {jobID}/{topic}
	PatternPathJobTopicData = "%s/%s"
)
View Source
const (
	SpaceDefaultWorker = "daemon-data"
)

Variables

This section is empty.

Functions

func NewFactoryRegistry

func NewFactoryRegistry() (factory *factoryRegistry)

NewFactoryRegistry creates a factoryRegistry.

Types

type BaseProxy added in v0.0.2

type BaseProxy struct {
	// contains filtered or unexported fields
}

func NewBaseProxy added in v0.0.2

func NewBaseProxy(job job.Job, helper *Helper) *BaseProxy

func (BaseProxy) CollectAndSubscribe added in v0.0.2

func (b BaseProxy) CollectAndSubscribe(topic string, from string, handler DataHandler) (CancelSubs, error)

func (BaseProxy) DeleteData added in v0.0.2

func (b BaseProxy) DeleteData(topic string, rowID string) error

func (BaseProxy) GetCheckpoint added in v0.0.2

func (b BaseProxy) GetCheckpoint(checkpoint interface{}) error

func (BaseProxy) GetData added in v0.0.2

func (b BaseProxy) GetData(topic string, rowID string) (data []byte, err error)

func (BaseProxy) GetDataList added in v0.0.2

func (b BaseProxy) GetDataList(topic string, handler DataHandler) error

func (BaseProxy) GetDataListRange added in v0.0.2

func (b BaseProxy) GetDataListRange(topic string, from string, end string, handler DataHandler) error

func (BaseProxy) GetJob added in v0.0.2

func (b BaseProxy) GetJob() job.Job

func (BaseProxy) GetObject added in v0.0.2

func (b BaseProxy) GetObject(topic string, rowID string, ptr interface{}) error

func (BaseProxy) SubscribeTx added in v0.0.2

func (b BaseProxy) SubscribeTx(topic string, from string, handler DataHandler) (CancelSubs, error)

type CancelSubs added in v0.0.2

type CancelSubs func()

type CancelTxSubs added in v0.0.2

type CancelTxSubs struct {
	// contains filtered or unexported fields
}

type DataHandler

type DataHandler func(jobID string, topic string, rowID string, value []byte) bool

type DataPath

type DataPath struct {
	JobID string
	Topic string
}

func ParseDataPath

func ParseDataPath(path string) (dataPath DataPath, err error)

func ParseDataPathBytes

func ParseDataPathBytes(path []byte) (dataPath DataPath, err error)

func (DataPath) String

func (path DataPath) String() string

type Factory

type Factory interface {
	Name() string
	Space() string
	NewWorker(helper *Helper) (Worker, error)
}

Factory ..

type Helper

type Helper struct {
	// contains filtered or unexported fields
}

Helper ..

func NewHelper

func NewHelper(space string, config common.DaemonConfig, logger log.Logger, job job.Job,
	dao Repository, proxyProvider ProxyProvider) *Helper

NewHelper ..

func (*Helper) Config

func (helper *Helper) Config() common.DaemonConfig

Config get node's config

func (*Helper) Debug

func (helper *Helper) Debug(msg string, keyvals ...interface{})

Debug logs a debug message.

func (*Helper) DeleteData

func (helper *Helper) DeleteData(topic string, rowID string) error

DeleteData ..

func (*Helper) Error

func (helper *Helper) Error(msg string, keyvals ...interface{})

Error logs an error message.

func (*Helper) GetCheckpoint

func (helper *Helper) GetCheckpoint(checkpoint interface{}) error

GetCheckpoint ..

func (*Helper) GetData

func (helper *Helper) GetData(topic string, rowID string) (data []byte, err error)

GetData ..

func (*Helper) GetDataList

func (helper *Helper) GetDataList(topic string, handler DataHandler) error

GetDataList ..

func (*Helper) GetDataListRange added in v0.0.2

func (helper *Helper) GetDataListRange(topic string, from string, end string, handler DataHandler) error

GetDataListRange ..

func (*Helper) GetObject

func (helper *Helper) GetObject(topic string, rowID string, ptr interface{}) error

GetObject ..

func (*Helper) GetRepository

func (helper *Helper) GetRepository() Repository

GetRepository gets the worker's Repository.

func (*Helper) ID

func (helper *Helper) ID() string

ID get worker's id

func (*Helper) Info

func (helper *Helper) Info(msg string, keyvals ...interface{})

Info log info

func (*Helper) Job

func (helper *Helper) Job() job.Job

Job get worker's Job

func (*Helper) NewWorkerProxy added in v0.0.2

func (helper *Helper) NewWorkerProxy(jobID string) (Proxy, error)

NewWorkerProxy ..

func (*Helper) PutCheckpoint

func (helper *Helper) PutCheckpoint(checkpoint interface{}) error

PutCheckpoint ..

func (*Helper) PutData

func (helper *Helper) PutData(topic string, rowID string, data []byte) error

PutData ..

func (*Helper) PutDataFullPath

func (helper *Helper) PutDataFullPath(fullPath string, data []byte) error

PutDataFullPath ..

func (*Helper) PutObject

func (helper *Helper) PutObject(topic string, rowID string, data interface{}) error

PutObject ..

func (*Helper) Space

func (helper *Helper) Space() string

Space gets the worker's space.

func (*Helper) SubscribeTx added in v0.0.2

func (helper *Helper) SubscribeTx(topic string, from string, handler DataHandler) (CancelSubs, error)

type Manager

type Manager struct {
	common.Context
	// contains filtered or unexported fields
}

Manager manager for jobs

func NewManager

func NewManager(context common.Context, spaceRegistry types.SpaceRegistry, proxyProvider ProxyProvider) *Manager

NewManager ..

func (*Manager) ContainsWorker

func (manager *Manager) ContainsWorker(id string) bool

ContainsWorker reports whether a worker with the given id is registered.

func (*Manager) GetRepository

func (manager *Manager) GetRepository() Repository

func (*Manager) GetWorker

func (manager *Manager) GetWorker(id string) Worker

GetWorker get worker for id

func (*Manager) GetWorkerNames added in v0.0.2

func (manager *Manager) GetWorkerNames() []string

func (*Manager) GetWorkers added in v0.0.2

func (manager *Manager) GetWorkers() []Worker

func (*Manager) NewWorkerProxy added in v0.0.2

func (manager *Manager) NewWorkerProxy(job job.Job) (proxy Proxy, err error)

func (*Manager) RegisterWorkerFactory

func (manager *Manager) RegisterWorkerFactory(factory Factory) error

func (*Manager) SetJobs

func (manager *Manager) SetJobs(jobs []job.Job)

SetJobs ...

func (*Manager) Start

func (manager *Manager) Start()

type Proxy added in v0.0.2

type Proxy interface {
	GetJob() job.Job
	GetCheckpoint(checkpoint interface{}) error
	GetData(topic string, rowID string) (data []byte, err error)
	GetObject(topic string, rowID string, data interface{}) error
	GetDataList(topic string, handler DataHandler) error
	GetDataListRange(topic string, from string, end string, handler DataHandler) error
	DeleteData(topic string, rowID string) error
	SubscribeTx(topic string, from string, handler DataHandler) (CancelSubs, error)
	CollectAndSubscribe(topic string, from string, handler DataHandler) (CancelSubs, error)
}

Proxy for worker

type ProxyProvider added in v0.0.2

type ProxyProvider interface {
	NewWorkerProxy(jobID string) (Proxy, error)
}

type Repository

type Repository interface {
	PutCheckpoint(jobID string, checkpoint interface{}) error
	GetCheckpoint(jobID string, checkpoint interface{}) error
	//CurrentBlockNumber() (block int64)
	PutData(space string, jobID string, topic string, rowID string, data []byte) error
	PutObject(space string, jobID string, topic string, rowID string, data interface{}) error
	PutDataSync(space string, jobID string, topic string, rowID string, data []byte) error
	PutObjectSync(space string, jobID string, topic string, rowID string, data interface{}) error
	GetData(space string, jobID string, topic string, rowID string) (data []byte, err error)
	GetObject(space string, jobID string, topic string, rowID string, data interface{}) error
	DeleteData(space string, jobID string, topic string, rowID string) error
	GetDataWithTopic(space string, jobID string, topic string, handler DataHandler) error
	GetDataWithTopicRange(space string, jobID string, topic string, from string, end string, handler DataHandler) error
	// SubscribeTx async subscription
	SubscribeTx(space string, jobID string, topic string, from string, handler DataHandler) (CancelSubs, error)

	PutDataFullPath(space string, path string, data []byte) error
	PutObjectFullPath(space string, path string, data interface{}) error
	GetDataFullPath(space string, path string) (data []byte, err error)
	GetObjectFullPath(space string, path string, data interface{}) (err error)
	DeleteDataFullPath(space string, path string) error
}

func NewRepository

func NewRepository(config common.DaemonConfig, logger log.Logger, client types.Client) Repository

type Worker

type Worker interface {
	ID() string
	Start() error
	Stop() error
	IsStarted() bool
}

Worker ..

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL