hh

v0.10.1-0...-af01984
Warning

This package is not in the latest version of its module.

Published: Mar 4, 2016 License: MIT Imports: 18 Imported by: 0

Documentation

Overview

Package hh implements hinted handoff for writes.


Constants

const (
	// DefaultMaxSize is the default maximum size of all hinted handoff queues in bytes.
	DefaultMaxSize = 1024 * 1024 * 1024

	// DefaultMaxAge is the default maximum amount of time that a hinted handoff write
	// can stay in the queue.  After this time, the write will be purged.
	DefaultMaxAge = 7 * 24 * time.Hour

	// DefaultRetryRateLimit is the default rate at which hinted handoffs will be retried.
	// The rate is in bytes per second and applies across all nodes when retried. A
	// value of 0 disables the rate limit.
	DefaultRetryRateLimit = 0

	// DefaultRetryInterval is the default amount of time the system waits before
	// attempting to flush hinted handoff queues. With each failure of a hinted
	// handoff write, this retry interval increases exponentially until it reaches
	// the maximum.
	DefaultRetryInterval = time.Second

	// DefaultRetryMaxInterval is the maximum the hinted handoff retry interval
	// will ever be.
	DefaultRetryMaxInterval = time.Minute

	// DefaultPurgeInterval is the amount of time the system waits before attempting
	// to purge hinted handoff data due to age or inactive nodes.
	DefaultPurgeInterval = time.Hour
)

Variables

var (
	ErrNotOpen     = fmt.Errorf("queue not open")
	ErrQueueFull   = fmt.Errorf("queue is full")
	ErrSegmentFull = fmt.Errorf("segment is full")
)

Possible errors returned by a hinted handoff queue.

var ErrHintedHandoffDisabled = fmt.Errorf("hinted handoff disabled")

ErrHintedHandoffDisabled is returned when attempting to use a disabled hinted handoff service.

Functions

func NewRateLimiter

func NewRateLimiter(limit int64) *limiter

NewRateLimiter returns a new limiter configured to restrict a process to limit units per second, where limit is the maximum amount that can be used per second. The limit should be > 0; a limit <= 0 disables rate limiting.

Types

type Config

type Config struct {
	Enabled          bool          `toml:"enabled"`
	Dir              string        `toml:"dir"`
	MaxSize          int64         `toml:"max-size"`
	MaxAge           toml.Duration `toml:"max-age"`
	RetryRateLimit   int64         `toml:"retry-rate-limit"`
	RetryInterval    toml.Duration `toml:"retry-interval"`
	RetryMaxInterval toml.Duration `toml:"retry-max-interval"`
	PurgeInterval    toml.Duration `toml:"purge-interval"`
}

Config is a hinted handoff configuration.

func NewConfig

func NewConfig() Config

NewConfig returns a new Config.

func (*Config) Validate

func (c *Config) Validate() error

type NodeProcessor

type NodeProcessor struct {
	PurgeInterval    time.Duration // Interval between periodic purge checks
	RetryInterval    time.Duration // Interval between periodic write-to-node attempts.
	RetryMaxInterval time.Duration // Max interval between periodic write-to-node attempts.
	MaxSize          int64         // Maximum size an underlying queue can get.
	MaxAge           time.Duration // Maximum age queue data can get before purging.
	RetryRateLimit   int64         // Limits the rate data is sent to node.

	Logger *log.Logger
	// contains filtered or unexported fields
}

NodeProcessor encapsulates a queue of hinted-handoff data for a node, and the transmission of the data to the node.

func NewNodeProcessor

func NewNodeProcessor(nodeID uint64, dir string, w shardWriter, m metaClient) *NodeProcessor

NewNodeProcessor returns a new NodeProcessor for the given node, using dir for the hinted-handoff data.

func (*NodeProcessor) Active

func (n *NodeProcessor) Active() (bool, error)

Active returns whether this node processor is for a currently active node.

func (*NodeProcessor) Close

func (n *NodeProcessor) Close() error

Close closes the NodeProcessor, terminating all data transmission to the node. Once closed, it will not accept hinted-handoff data.

func (*NodeProcessor) Head

func (n *NodeProcessor) Head() string

Head returns the head of the processor's queue.

func (*NodeProcessor) LastModified

func (n *NodeProcessor) LastModified() (time.Time, error)

LastModified returns the time the NodeProcessor last received hinted-handoff data.

func (*NodeProcessor) Open

func (n *NodeProcessor) Open() error

Open opens the NodeProcessor. It will read and write data present in dir, and start transmitting data to the node. A NodeProcessor must be opened before it can accept hinted data.

func (*NodeProcessor) Purge

func (n *NodeProcessor) Purge() error

Purge deletes all hinted-handoff data under management by a NodeProcessor. The NodeProcessor should be in the closed state before calling this function.
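The ordering requirement (close first, then purge) can be sketched as below. The purger interface and fakeProcessor are hypothetical stand-ins so the example runs without the package; the fake's refusal to purge while open is an assumption about how the precondition could be enforced, not documented behavior of NodeProcessor.

```go
package main

import (
	"errors"
	"fmt"
)

// purger lists the two methods this sketch relies on. *NodeProcessor
// has methods with these signatures.
type purger interface {
	Close() error
	Purge() error
}

// fakeProcessor is a stand-in used only for illustration.
type fakeProcessor struct {
	open   bool
	purged bool
}

func (p *fakeProcessor) Close() error {
	p.open = false
	return nil
}

// Purge refuses to run while the processor is open. This check is an
// assumption made for the sketch.
func (p *fakeProcessor) Purge() error {
	if p.open {
		return errors.New("processor still open")
	}
	p.purged = true
	return nil
}

// closeAndPurge closes p first and purges only if the close succeeded.
func closeAndPurge(p purger) error {
	if err := p.Close(); err != nil {
		return err
	}
	return p.Purge()
}

func main() {
	p := &fakeProcessor{open: true}
	fmt.Println(closeAndPurge(p), p.purged)
}
```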

func (*NodeProcessor) SendWrite

func (n *NodeProcessor) SendWrite() (int, error)

SendWrite attempts to send the current block of hinted data to the target node. If successful, it returns the number of bytes sent and advances to the next block. Otherwise, it returns EOF when there is no more data or the node is inactive.
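A caller draining a queue would typically loop on SendWrite until it reports EOF. The sketch below assumes that EOF means io.EOF, which this page does not state explicitly; writeSender, fakeSender, and drain are hypothetical names introduced so the example is self-contained.

```go
package main

import (
	"fmt"
	"io"
)

// writeSender captures the one method this sketch needs. *NodeProcessor
// has a method with this signature.
type writeSender interface {
	SendWrite() (int, error)
}

// fakeSender is a stand-in used only for illustration. Each element of
// blocks is the size in bytes of one queued block.
type fakeSender struct {
	blocks []int
}

func (f *fakeSender) SendWrite() (int, error) {
	if len(f.blocks) == 0 {
		return 0, io.EOF
	}
	n := f.blocks[0]
	f.blocks = f.blocks[1:]
	return n, nil
}

// drain calls SendWrite until it reports io.EOF and returns the total
// number of bytes sent. Any other error stops the loop and is returned.
func drain(s writeSender) (int, error) {
	total := 0
	for {
		n, err := s.SendWrite()
		if err == io.EOF {
			return total, nil
		}
		if err != nil {
			return total, err
		}
		total += n
	}
}

func main() {
	total, err := drain(&fakeSender{blocks: []int{100, 250, 50}})
	fmt.Println(total, err)
}
```

Because EOF is also returned when the node is inactive, a real caller cannot tell from this result alone whether the queue is empty; it would need to consult Active or retry later.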

func (*NodeProcessor) Tail

func (n *NodeProcessor) Tail() string

Tail returns the tail of the processor's queue.

func (*NodeProcessor) WriteShard

func (n *NodeProcessor) WriteShard(shardID uint64, points []models.Point) error

WriteShard writes hinted-handoff data for the given shard and node. Since it may manipulate hinted-handoff queues, and be called concurrently, it takes a lock during queue access.

type Service

type Service struct {
	Logger *log.Logger

	MetaClient metaClient

	Monitor interface {
		RegisterDiagnosticsClient(name string, client diagnostics.Client)
		DeregisterDiagnosticsClient(name string)
	}
	// contains filtered or unexported fields
}

Service represents a hinted handoff service.

func NewService

func NewService(c Config, w shardWriter, m metaClient) *Service

NewService returns a new instance of Service.

func (*Service) Close

func (s *Service) Close() error

Close closes the hinted handoff service.

func (*Service) Diagnostics

func (s *Service) Diagnostics() (*diagnostics.Diagnostics, error)

Diagnostics returns diagnostic information.

func (*Service) Open

func (s *Service) Open() error

Open opens the hinted handoff service.

func (*Service) SetLogger

func (s *Service) SetLogger(l *log.Logger)

SetLogger sets the internal logger to the logger passed in.

func (*Service) WriteShard

func (s *Service) WriteShard(shardID, ownerID uint64, points []models.Point) error

WriteShard queues the points write for shardID, destined for node ownerID, on the hinted handoff queue.
