observer6

package
v0.0.0-...-4450389 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 4, 2019 License: MIT Imports: 19 Imported by: 0

Documentation

Overview

Package `observer6` contains `Observer` to watch for registry and repo events and trigger actions on a `Processor`, specifically `nogfsostad.Processor`. It:

  • triggers shadow repo initialization;
  • enables repos to receive gRPCs.

`RegistryObserver` uses entity activities to process events. It uses one activity per entity, that is registry, repo, and repo workflow. Activities watch the events streams forever. The registry activity starts repo activities for repos that are below the observed prefixes. The repo activities start workflow activities as needed and wait for their completion before continuing to process repo events.

The move-repo workflow is a special case. If the old location was below an observed prefix, the repo activity runs the release workflow part and then quits. The registry activity starts a new repo activity for the acquire workflow part and to continue watching the event stream.

Per-repo activities are serialized using a chain of `done -> dep` channels. See `DepChainMap`. Serialization is currently only relevant if the move-repo workflow release and acquire parts both run in the same server.

Activities distinguish two phases:

  1. initial loading until the event stream will block;
  2. watching the event stream, processing event per event.

Desired effects of the two-phase approach are:

  • Initial loading directly enables repos that are already initialized without revisiting the actual initialization work.
  • Initial loading skips initializing repos with stored errors.
  • Initial loading naturally supports multiple events that may toggle some state. It will take action only based on the final state. Example: A repo could in the future toggle between renaming and not renaming. If the final state is renaming, it will remain disabled. If the final state is not renaming, it will be enabled.
  • The initial batch processing of all events is separated from the ongoing one-by-one processing of new events.

Differently than `observer4`, `observer6` uses `grpclazy.Engine` to manage the gRPC streams for the activities.

Differently than `observer5`, `observer6` runs an unbounded number of activities in order to concurrently watch all repos that are below the observed prefixed.

`observer6` uses `grpclazy.Engine` for all activities. `grpclazy.Engine` uses a single gRPC stream to receive broadcast signals about new events. For each activity that has new events, `grpclazy.Engine` starts a goroutine and opens a separate gRPC stream to pass the available events to the activity. The activity processes the available events and returns to the `grpclazy.Engine`, which puts the activity to sleep until the next signal that there are new events.

Index

Constants

View Source
const ConfigCronInvervalSeconds = 1
View Source
const ConfigErrorMessageTruncateLength = 120

`ConfigErrorMessageTruncateLength` limits the length of error messages when storing them on a repo. Longer messages are truncated. The limit must be smaller than the maximum length that nogfsoregd accepts.

View Source
const ConfigMaxRepoStreams = 10
View Source
const ConfigMaxRetryWeak = 5
View Source
const ConfigMaxStreams = 20

We use a general activity stream limit and a separate limit for repo activity streams. Registry and workflow activities never block. But repo activities may block waiting for workflow activities to complete. Separate limits should avoid deadlock. If we used a single limiter, blocked repo activities could consume all available streams waiting for workflows. But workflow progress would be prevented without available streams.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Registries  []string
	Prefixes    []string
	Conn        *grpc.ClientConn
	SysRPCCreds credentials.PerRPCCredentials
	Initializer Initializer
	Processor   Processor
}

type DepChainMap

type DepChainMap map[uuid.I]chan struct{}

`DepChainMap` is used to serialize activities per repo using a chain of `done -> dep` channels. The upstream activity closes `done` to unlock the downstream activity's `dep`.

func NewDepMap

func NewDepMap() DepChainMap

func (DepChainMap) Gc

func (m DepChainMap) Gc() int

func (DepChainMap) Next

func (m DepChainMap) Next(id uuid.I) DepChainNode

type DepChainNode

type DepChainNode struct {
	Dep  <-chan struct{}
	Done chan<- struct{}
}

type Initializer

type Initializer interface {
	GetRepo(ctx context.Context, repoId uuid.I) (*nogfsostad.RepoInfo, error)
	InitRepo(ctx context.Context, repoId uuid.I) (*nogfsostad.RepoInfo, error)
	EnableGitlab(ctx context.Context, repoId uuid.I) (*nogfsostad.RepoInfo, error)
	MoveRepo(
		ctx context.Context,
		repoId uuid.I,
		oldHostPath string,
		oldShadowPath string,
		newHostPath string,
	) (newShadowPath string, err error)
}

type Logger

type Logger interface {
	Infow(msg string, kv ...interface{})
	Warnw(msg string, kv ...interface{})
	Errorw(msg string, kv ...interface{})
}

type Observer

type Observer struct {
	// contains filtered or unexported fields
}

func New

func New(lg Logger, cfg *Config) *Observer

func (*Observer) Watch

func (o *Observer) Watch(ctx context.Context) error

type Processor

type Processor interface {
	EnableRepo4(ctx context.Context, inf *nogfsostad.RepoInfo) error
	DisableRepo4(ctx context.Context, repoId uuid.I) error
	FreezeRepo(
		ctx context.Context, repoId uuid.I, author nogfsostad.GitUser,
	) error
	UnfreezeRepo(
		ctx context.Context, repoId uuid.I, author nogfsostad.GitUser,
	) error
}

type RegistryObserver

type RegistryObserver struct {
	// contains filtered or unexported fields
}

func (*RegistryObserver) ProcessRegistryEvents

func (o *RegistryObserver) ProcessRegistryEvents(
	ctx context.Context,
	registry string,
	tail ulid.I,
	stream pb.Registry_EventsClient,
) (ulid.I, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL