chronomq

package
v1.3.0 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 10, 2020 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Constants

const (

	// DefaultMaxCFSize is the maximum number of entries the filter is expected to hold.
	// It creates a Cuckoo filter with a fixed space overhead of 512 MB on a 64-bit machine.
	// With an average job size of 1 KB, that is 500*1000_000_000/1024/1024/1024, or about 465.66 GB,
	// of job payload storage alone, so this upper bound should be enough.
	DefaultMaxCFSize uint = 500 * 1000 * 1000

	// TestMaxCFSize value should be used for testing
	TestMaxCFSize uint = 10000
)
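
The sizing estimate in the comment above can be reproduced with a few lines of Go. This is only a sketch of the arithmetic: payloadGiB is a hypothetical helper, not part of chronomq, and the 1000-byte average job size is the assumption that the comment's figure implies.

```go
package main

import "fmt"

// payloadGiB is a hypothetical helper (not part of chronomq) that converts
// an entry count and an average job size in bytes into GiB of payload.
func payloadGiB(entries, avgJobBytes uint64) float64 {
	return float64(entries*avgJobBytes) / (1024 * 1024 * 1024)
}

func main() {
	// Mirrors the value of DefaultMaxCFSize shown above.
	const defaultMaxCFSize = 500 * 1000 * 1000
	// Assumption: "1Kb" in the comment means 1000 bytes per job.
	fmt.Printf("%.2f GiB\n", payloadGiB(defaultMaxCFSize, 1000))
}
```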

Variables

var ErrJobOutOfSpokeBounds = errors.New("The offered job is outside the bounds of this spoke ")

ErrJobOutOfSpokeBounds is returned when an attempt is made to add a job to a spoke that should not contain it, that is, when the job's trigger time is outside the spoke bounds.

Functions

func NextID

func NextID() uint64

NextID generates the next integer ID. IDs increase monotonically.
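
A monotonically increasing ID generator is commonly built on an atomic counter. The sketch below illustrates that general technique only; nextID and counter are hypothetical names, and nothing here is taken from chronomq's actual implementation of NextID.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// counter backs the hypothetical nextID below; it is not chronomq's state.
var counter uint64

// nextID returns a strictly increasing id and is safe for concurrent use.
func nextID() uint64 {
	return atomic.AddUint64(&counter, 1)
}

func main() {
	a, b := nextID(), nextID()
	fmt.Println(b > a)
}
```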

Types

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub is a time ordered collection of spokes

func NewHub

func NewHub(opts *HubOpts) *Hub

NewHub creates a new hub in which adjacent spokes lie on boundaries of the SpokeSpan duration given in opts.

func (*Hub) AddJobLocked

func (h *Hub) AddJobLocked(j *Job) error

AddJobLocked adds a job to this hub. The hub should never reject a job; this method panics if that happens.

func (*Hub) CancelJobLocked

func (h *Hub) CancelJobLocked(jobID string) (*Job, error)

CancelJobLocked cancels a job if found. Calls are no-ops for unknown jobs.

func (*Hub) GetNJobs

func (h *Hub) GetNJobs(n int) chan *Job

GetNJobs returns up to n jobs, or fewer if fewer jobs are available. It does not return a consistent snapshot of jobs but provides a best-effort view.
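
A function that hands back "up to n" items on a channel is usually consumed with a range loop. The sketch below shows that pattern with a hypothetical producer, firstN, over plain strings. It assumes the producer closes the channel once it is done; the documentation above does not say whether GetNJobs does so.

```go
package main

import "fmt"

// firstN is a hypothetical producer: it sends at most n items from src on a
// buffered channel and then closes it, so callers can simply range over it.
func firstN(src []string, n int) chan string {
	if n > len(src) {
		n = len(src)
	}
	if n < 0 {
		n = 0
	}
	out := make(chan string, n)
	for _, s := range src[:n] {
		out <- s
	}
	close(out)
	return out
}

func main() {
	// Asking for 5 items when only 2 exist yields 2.
	for id := range firstN([]string{"a", "b"}, 5) {
		fmt.Println(id)
	}
}
```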

func (*Hub) NextLocked

func (h *Hub) NextLocked() *Job

NextLocked returns the next job that is ready now or returns nil.

func (*Hub) PersistLocked

func (h *Hub) PersistLocked() chan error

PersistLocked locks the hub and starts persisting data to disk

func (*Hub) Prune

func (h *Hub) Prune() int

Prune clears spokes that are expired and have no jobs. It returns the number of spokes pruned.

func (*Hub) Restore

func (h *Hub) Restore() error

Restore loads any jobs saved to disk at the given path

func (*Hub) Stats

func (h *Hub) Stats() stats.Snapshot

Stats returns a snapshot of the hub's current stats

func (*Hub) StatusLocked

func (h *Hub) StatusLocked()

StatusLocked prints the state of the spokes of this hub

func (*Hub) StatusPrinter

func (h *Hub) StatusPrinter()

StatusPrinter starts a status printer that prints hub stats over some time interval

func (*Hub) Stop

func (h *Hub) Stop(persist bool)

Stop stops the hub gracefully. If persist is true, it persists all jobs to disk for later recovery.

type HubOpts

type HubOpts struct {
	Persister      persistence.Persister // persister to store/restore from disk
	AttemptRestore bool                  // If true, hub will try to restore from disk on start
	SpokeSpan      time.Duration         // How wide should the spokes be
	MaxCFSize      uint                  // Max size of the Cuckoo Filter
}

HubOpts defines customizations for Hub initialization

type Job

type Job struct {
	// contains filtered or unexported fields
}

Job is the basic unit of work in chronomq

func NewJob

func NewJob(id string, triggerAt time.Time, b []byte) *Job

NewJob creates a new chronomq job

func NewJobAutoID

func NewJobAutoID(triggerAt time.Time, b []byte) *Job

NewJobAutoID creates a job with an automatically assigned job ID

func (*Job) AsBound

func (j *Job) AsBound(spokeSpan time.Duration) temporal.Bound

AsBound returns temporal.Bound for a hypothetical spoke that should hold this job

func (*Job) AsPriorityItem

func (j *Job) AsPriorityItem() *queue.Item

AsPriorityItem returns this job as a prioritizable item

func (*Job) AsTemporalState

func (j *Job) AsTemporalState() temporal.State

AsTemporalState returns the job's temporal classification at the point in time

func (*Job) Body

func (j *Job) Body() []byte

Body returns the body of the job

func (*Job) GobDecode

func (j *Job) GobDecode(data []byte) error

GobDecode decodes a job from a binary buffer

func (*Job) GobEncode

func (j *Job) GobEncode() (data []byte, err error)

GobEncode encodes a job into a binary buffer

func (*Job) ID

func (j *Job) ID() string

ID returns the id of the job

func (*Job) IsReady

func (j *Job) IsReady() bool

IsReady returns true if job is ready to be worked on

func (*Job) SetOpts

func (j *Job) SetOpts(pri int32, ttr time.Duration)

SetOpts sets job options

func (*Job) SizeOf

func (j *Job) SizeOf() uint64

SizeOf returns the memory allocated for this job in bytes, including the size of the actual body payload plus the fixed overhead costs. It implements the monitor.Sizeable interface.

func (*Job) TriggerAt

func (j *Job) TriggerAt() time.Time

TriggerAt returns the job's trigger time

type Spoke

type Spoke struct {
	temporal.Bound
	// contains filtered or unexported fields
}

Spoke is a time bound chain of jobs

func NewSpoke

func NewSpoke(start, end time.Time) *Spoke

NewSpoke creates a new spoke to hold jobs

func NewSpokeFromNow

func NewSpokeFromNow(duration time.Duration) *Spoke

NewSpokeFromNow creates a new spoke to hold jobs that starts now and ends after the given duration

func (*Spoke) AddJobLocked

func (s *Spoke) AddJobLocked(j *Job) error

AddJobLocked submits a job to the spoke. If the spoke cannot take responsibility for this job, it returns an error (see ErrJobOutOfSpokeBounds); otherwise it returns nil.

func (*Spoke) AsPriorityItem

func (s *Spoke) AsPriorityItem() *queue.Item

AsPriorityItem returns a spoke as a prioritizable Item

func (*Spoke) AsTemporalState

func (s *Spoke) AsTemporalState() temporal.State

AsTemporalState returns the spoke's temporal classification at the point in time

func (*Spoke) CancelJobLocked

func (s *Spoke) CancelJobLocked(id string) (*Job, error)

CancelJobLocked will try to delete a job that hasn't been consumed yet

func (*Spoke) GetLocker

func (s *Spoke) GetLocker() sync.Locker

GetLocker returns the spoke as a sync.Locker interface

func (*Spoke) ID

func (s *Spoke) ID() uuid.UUID

ID returns the id of this spoke

func (*Spoke) IsJobInBounds

func (s *Spoke) IsJobInBounds(j *Job) bool

IsJobInBounds returns true if this job's trigger time is temporally bounded by this spoke

func (*Spoke) JobAtIdx

func (s *Spoke) JobAtIdx(i int) *Job

JobAtIdx returns the job at index i stored by this spoke

func (*Spoke) Lock

func (s *Spoke) Lock()

Lock this spoke

func (*Spoke) NextLocked

func (s *Spoke) NextLocked() *Job

NextLocked returns the next ready job

func (*Spoke) OwnsJobLocked

func (s *Spoke) OwnsJobLocked(id string) bool

OwnsJobLocked returns true if a job by given id is owned by this spoke

func (*Spoke) PendingJobsLen

func (s *Spoke) PendingJobsLen() int

PendingJobsLen returns the number of jobs remaining in this spoke

func (*Spoke) PersistLocked

func (s *Spoke) PersistLocked(p persistence.Persister) chan error

PersistLocked persists all jobs in this spoke

func (*Spoke) Unlock

func (s *Spoke) Unlock()

Unlock this spoke
