observe

package
v0.0.0-...-4450389 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 4, 2019 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Conn       *grpc.ClientConn
	RPCCreds   credentials.PerRPCCredentials
	StateStore StateStore
	Processor  Processor
	Registries []string
	Refs       []string
	Prefixes   []string
	Hosts      []string
}

type FileStateStore

type FileStateStore struct {
	// contains filtered or unexported fields
}

func NewFileStateStore

func NewFileStateStore(dir string) *FileStateStore

func (*FileStateStore) LoadULID

func (s *FileStateStore) LoadULID(name string) (ulid.I, error)

func (*FileStateStore) SaveULID

func (s *FileStateStore) SaveULID(name string, id ulid.I) error

type Logger

type Logger interface {
	Infow(msg string, kv ...interface{})
	Warnw(msg string, kv ...interface{})
	Errorw(msg string, kv ...interface{})
}

type Observer

type Observer struct {
	// contains filtered or unexported fields
}

func NewObserver

func NewObserver(lg Logger, cfg *Config) *Observer

func (*Observer) Watch

func (o *Observer) Watch(ctx context.Context) error

`Watch()` currently processes each event separately. A consequence is that the same repo is usually processed many times when starting from the event epoch. The processor needs to ensure that repeated processing of a repo without changes is sufficiently efficient.

Repeated events could be de-duplicated here, for example by first reading all events until `WillBlock` and gathering a set of repos that need processing, and then the repos only once. But the logic would become more complex. We keep the naive approach unless we observe practical perfomance problems.

type Processor

type Processor interface {
	// `Process()` should handle most processing errors.  It may return
	// errors that are likely due to a shutdown, like `context.Canceled` or
	// termination of a child due to a signal.
	ProcessRepo(ctx context.Context, repo *execute.Repo) error
}

type StateStore

type StateStore interface {
	// `LoadULID()` loads the journal position for `name` from permanent
	// storage.  If successful, it returns either a valid ULID or
	// `ulid.Nil` to indicate that there is no stored state.  It may return
	// errors, such as file I/O problems.
	LoadULID(name string) (ulid.I, error)
	SaveULID(name string, id ulid.I) error
}

`StateStore` is used to preserve journal locations across restarts. Concurrent operations on different keys must be safe.

type VolatileStateStore

type VolatileStateStore struct {
	// contains filtered or unexported fields
}

func NewVolatileStateStore

func NewVolatileStateStore() *VolatileStateStore

func (*VolatileStateStore) LoadULID

func (s *VolatileStateStore) LoadULID(name string) (ulid.I, error)

func (*VolatileStateStore) SaveULID

func (s *VolatileStateStore) SaveULID(name string, id ulid.I) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL