ckp

package
v0.0.0-...-64b8ef3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 23, 2019 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
// Storage backend identifiers. Config.Storage is documented as holding
// "file" or "zookeeper"; presumably these constants are the values it is
// matched against (TODO confirm against NewCkpManager).
const (
	// StorageType_File is declared with the explicit type string.
	StorageType_File      string = "file"
	// StorageType_Zookeeper carries no explicit type, so it is an untyped
	// string constant (unlike StorageType_File).
	StorageType_Zookeeper        = "zookeeper"
	// StorageType_Mock is likewise an untyped string constant.
	// NOTE(review): presumably selects MockStorage — verify.
	StorageType_Mock             = "mock"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Checkpoint

// Checkpoint is a Progress plus a free-form string-to-string context map.
// Progress is embedded, so its fields are promoted onto Checkpoint.
type Checkpoint struct {
	Progress
	// Ctx holds string key/value pairs and carries the JSON key "ctx".
	// The Get*Ctx/Set*Ctx methods accept typed values (string, int64, uint64)
	// for a key; how they encode those values into this map is not visible here.
	Ctx map[string]string `json:"ctx"`
}

func NewCheckpoint

func NewCheckpoint() *Checkpoint

func (*Checkpoint) GetIntCtx

func (o *Checkpoint) GetIntCtx(key string, defValue int64) int64

func (*Checkpoint) GetStringCtx

func (o *Checkpoint) GetStringCtx(key string, defValue string) string

func (*Checkpoint) GetUintCtx

func (o *Checkpoint) GetUintCtx(key string, defValue uint64) uint64

func (*Checkpoint) SetIntCtx

func (o *Checkpoint) SetIntCtx(key string, value int64) *Checkpoint

func (*Checkpoint) SetProgress

func (o *Checkpoint) SetProgress(p prog.Progress) *Checkpoint

func (*Checkpoint) SetStringCtx

func (o *Checkpoint) SetStringCtx(key string, value string) *Checkpoint

func (*Checkpoint) SetUintCtx

func (o *Checkpoint) SetUintCtx(key string, value uint64) *Checkpoint

type Checkpointer

// Checkpointer is implemented by any component that can report a Checkpoint.
// Implementations are handed to CkpManager.RegisterCheckpointer under a name.
type Checkpointer interface {
	// Checkpoint returns the implementation's checkpoint.
	// NOTE(review): whether a nil return is permitted is not specified here.
	Checkpoint() *Checkpoint
}

type CkpManager

// CkpManager is built by NewCkpManager from a Config; Checkpointers are
// registered with it by name through RegisterCheckpointer.
//
// It embeds sync.Mutex, so Lock and Unlock are promoted into its method set,
// and a CkpManager must not be copied after first use.
type CkpManager struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewCkpManager

func NewCkpManager(cfg *Config) (*CkpManager, error)

func (*CkpManager) Close

func (o *CkpManager) Close() error

func (*CkpManager) Err

func (o *CkpManager) Err() <-chan error

func (*CkpManager) GetCheckpoint

func (o *CkpManager) GetCheckpoint(name string) *Checkpoint

func (*CkpManager) GetMinProgress

func (o *CkpManager) GetMinProgress() prog.Progress

func (*CkpManager) RegisterCheckpointer

func (o *CkpManager) RegisterCheckpointer(name string, ckper Checkpointer) error

func (*CkpManager) SetAlignedProgress

func (o *CkpManager) SetAlignedProgress(p prog.Progress) error

func (*CkpManager) Start

func (o *CkpManager) Start() error

func (*CkpManager) WaitUntil

func (o *CkpManager) WaitUntil(ctx context.Context, progress prog.Progress) error

WaitUntil waits for all sinks to reach the specified progress.

type CkpStorage

// CkpStorage is the persistence interface for checkpoint data, which it
// treats as an opaque byte slice. *FileStorage, *ZookeeperStorage and
// *MockStorage each declare methods with these signatures.
type CkpStorage interface {
	// Save syncs data into storage.
	Save(data []byte) error
	// Load reads data from storage. It returns nil data when the content read
	// is empty and no error occurred.
	Load() ([]byte, error)
	// Close closes the storage.
	// NOTE(review): whether Save/Load may be called after Close is not stated.
	Close() error
}

type Config

// Config holds the settings passed to NewCkpManager. NewDefaultConfig
// returns a pre-populated instance; its default values are not visible here.
type Config struct {
	// Storage is the storage type used to store checkpoint data; it may be
	// "file" or "zookeeper". (A StorageType_Mock constant also exists.)
	Storage string

	// Interval is the interval at which data is saved.
	// NOTE(review): the unit is not stated here — confirm before relying on it.
	Interval int

	// Dir is the file path used to store data.
	// Presumably only consulted for the file storage type — verify.
	Dir string

	// ZkHosts and ZkPath are the ZooKeeper details used to store data.
	// Presumably only consulted for the zookeeper storage type — verify.
	ZkHosts string
	ZkPath  string
}

func NewDefaultConfig

func NewDefaultConfig() *Config

type Data

// Data is a JSON-tagged record bundling a timestamp, an optional aligned
// progress and a map of checkpoints.
// NOTE(review): presumably this is the payload passed to CkpStorage.Save — confirm.
type Data struct {
	// Time carries the JSON key "time".
	// Presumably the moment the record was produced — TODO confirm.
	Time            time.Time              `json:"time"`
	// AlignedProgress is left out of the JSON output when nil (omitempty).
	AlignedProgress *Progress              `json:"aligned_progress,omitempty"`
	// Ckps maps a name to a Checkpoint. Presumably the names are the ones
	// given to CkpManager.RegisterCheckpointer — verify.
	Ckps            map[string]*Checkpoint `json:"ckps"`
}

type FileStorage

// FileStorage stores data in two files: Save writes to one of the two files
// alternately, and Load reads both and returns the newer one.
// It is created with NewFileStorage(path); *FileStorage declares the
// Save/Load/Close methods required by CkpStorage.
type FileStorage struct {
	// contains filtered or unexported fields
}

func NewFileStorage

func NewFileStorage(path string) (*FileStorage, error)

func (*FileStorage) Close

func (o *FileStorage) Close() error

func (*FileStorage) Load

func (o *FileStorage) Load() ([]byte, error)

Load reads data from both files and returns the newer one. It returns nil data when the content read is empty and no error occurred.

func (*FileStorage) Save

func (o *FileStorage) Save(data []byte) error

Save syncs data into one of the two files, alternating between them.

type MockCheckpointer

// MockCheckpointer is created by NewMockCheckpointer from a prog.Progress.
// *MockCheckpointer declares Checkpoint() *Checkpoint and therefore
// satisfies Checkpointer.
// NOTE(review): presumably intended for tests — confirm.
type MockCheckpointer struct {
	// contains filtered or unexported fields
}

func NewMockCheckpointer

func NewMockCheckpointer(p prog.Progress) *MockCheckpointer

func (*MockCheckpointer) Checkpoint

func (o *MockCheckpointer) Checkpoint() *Checkpoint

type MockStorage

// MockStorage is created by NewMockStorage. *MockStorage declares the
// Save/Load/Close methods required by CkpStorage.
// NOTE(review): presumably keeps data in memory for tests — confirm.
//
// It embeds sync.Mutex, so Lock and Unlock are promoted into its method set,
// and a MockStorage must not be copied after first use.
type MockStorage struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewMockStorage

func NewMockStorage() *MockStorage

func (*MockStorage) Close

func (o *MockStorage) Close() error

func (*MockStorage) Load

func (o *MockStorage) Load() ([]byte, error)

func (*MockStorage) Save

func (o *MockStorage) Save(data []byte) error

type Progress

// Progress is a JSON-tagged position record. Its GetProgress and SetProgress
// methods return and accept a prog.Progress respectively.
// NOTE(review): the field names suggest MySQL/MariaDB replication coordinates
// (binlog file name and offset, GTID set) — confirm against package prog.
type Progress struct {
	// Flavor carries the JSON key "flavor".
	Flavor   string `json:"flavor"`
	// ServerID carries the JSON key "server_id".
	ServerID uint32 `json:"server_id"`
	// Name carries the JSON key "name".
	Name     string `json:"name"`
	// Pos carries the JSON key "pos"; it is a 32-bit unsigned value.
	Pos      uint32 `json:"pos"`
	// Gset carries the JSON key "gset".
	Gset     string `json:"gset"`
}

func (*Progress) GetProgress

func (o *Progress) GetProgress() prog.Progress

func (*Progress) SetProgress

func (o *Progress) SetProgress(p prog.Progress)

type ZookeeperStorage

// ZookeeperStorage is created by NewZookeeperStorage from a hosts string and
// a path. *ZookeeperStorage declares the Save/Load/Close methods required by
// CkpStorage.
// NOTE(review): the expected format of hosts (e.g. separator) is not shown here.
type ZookeeperStorage struct {
	// contains filtered or unexported fields
}

func NewZookeeperStorage

func NewZookeeperStorage(hosts string, path string) (*ZookeeperStorage, error)

func (*ZookeeperStorage) Close

func (o *ZookeeperStorage) Close() error

func (*ZookeeperStorage) Load

func (o *ZookeeperStorage) Load() ([]byte, error)

func (*ZookeeperStorage) Save

func (o *ZookeeperStorage) Save(data []byte) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL