Documentation ¶
Index ¶
- Constants
- Variables
- func NextID() uint64
- type Hub
- func (h *Hub) AddJobLocked(j *Job) error
- func (h *Hub) CancelJobLocked(jobID string) (*Job, error)
- func (h *Hub) GetNJobs(n int) chan *Job
- func (h *Hub) NextLocked() *Job
- func (h *Hub) PersistLocked() chan error
- func (h *Hub) Prune() int
- func (h *Hub) Restore() error
- func (h *Hub) Stats() stats.Snapshot
- func (h *Hub) StatusLocked()
- func (h *Hub) StatusPrinter()
- func (h *Hub) Stop(persist bool)
- type HubOpts
- type Job
- func (j *Job) AsBound(spokeSpan time.Duration) temporal.Bound
- func (j *Job) AsPriorityItem() *queue.Item
- func (j *Job) AsTemporalState() temporal.State
- func (j *Job) Body() []byte
- func (j *Job) GobDecode(data []byte) error
- func (j *Job) GobEncode() (data []byte, err error)
- func (j *Job) ID() string
- func (j *Job) IsReady() bool
- func (j *Job) SetOpts(pri int32, ttr time.Duration)
- func (j *Job) SizeOf() uint64
- func (j *Job) TriggerAt() time.Time
- type Spoke
- func (s *Spoke) AddJobLocked(j *Job) error
- func (s *Spoke) AsPriorityItem() *queue.Item
- func (s *Spoke) AsTemporalState() temporal.State
- func (s *Spoke) CancelJobLocked(id string) (*Job, error)
- func (s *Spoke) GetLocker() sync.Locker
- func (s *Spoke) ID() uuid.UUID
- func (s *Spoke) IsJobInBounds(j *Job) bool
- func (s *Spoke) JobAtIdx(i int) *Job
- func (s *Spoke) Lock()
- func (s *Spoke) NextLocked() *Job
- func (s *Spoke) OwnsJobLocked(id string) bool
- func (s *Spoke) PendingJobsLen() int
- func (s *Spoke) PersistLocked(p persistence.Persister) chan error
- func (s *Spoke) Unlock()
Constants ¶
const (
	// DefaultMaxCFSize is the max number of entries the filter is expected to hold
	// creates a Cuckoo filter taking 512MB fixed space overhead on a 64 bit machine
	// with an average job size of 1Kb, that is 500*1000_000_000/1024/1024/1024=465.66128730773926 GB of just job payload storage
	// so this upper-bound should be enough
	DefaultMaxCFSize uint = 500 * 1000 * 1000
	// TestMaxCFSize value should be used for testing
	TestMaxCFSize uint = 10000
)
Variables ¶
var ErrJobOutOfSpokeBounds = errors.New("The offered job is outside the bounds of this spoke ")
ErrJobOutOfSpokeBounds is returned when an attempt was made to add a job to a spoke that should not contain it - the job's trigger time is outside the spoke bounds
Functions ¶
Types ¶
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub is a time ordered collection of spokes
func NewHub ¶
NewHub creates a new hub where adjacent spokes lie at the given spokeSpan duration boundary.
func (*Hub) AddJobLocked ¶
AddJobLocked adds a job to this hub. Hub should never reject a job - this method will panic if that happens
func (*Hub) CancelJobLocked ¶
CancelJobLocked cancels a job if found. Calls are noop for unknown jobs
func (*Hub) GetNJobs ¶
GetNJobs returns up to N jobs (or fewer if fewer jobs are available). It does not return a consistent snapshot of jobs but provides a best-effort view
func (*Hub) NextLocked ¶
NextLocked returns the next job that is ready now or returns nil.
func (*Hub) PersistLocked ¶
PersistLocked locks the hub and starts persisting data to disk
func (*Hub) Prune ¶
Prune clears spokes which are expired and have no jobs. It returns the number of spokes pruned
func (*Hub) StatusLocked ¶
func (h *Hub) StatusLocked()
StatusLocked prints the state of the spokes of this hub
func (*Hub) StatusPrinter ¶
func (h *Hub) StatusPrinter()
StatusPrinter starts a status printer that prints hub stats over some time interval
type HubOpts ¶
type HubOpts struct {
	Persister      persistence.Persister // persister to store/restore from disk
	AttemptRestore bool                  // If true, hub will try to restore from disk on start
	SpokeSpan      time.Duration         // How wide should the spokes be
	MaxCFSize      uint                  // Max size of the Cuckoo Filter
}
HubOpts define customizations for Hub initialization
type Job ¶
type Job struct {
// contains filtered or unexported fields
}
Job is the basic unit of work in chronomq
func NewJobAutoID ¶
NewJobAutoID creates a job with a job id assigned automatically
func (*Job) AsBound ¶
AsBound returns temporal.Bound for a hypothetical spoke that should hold this job
func (*Job) AsPriorityItem ¶
AsPriorityItem returns this job as a prioritizable item
func (*Job) AsTemporalState ¶
AsTemporalState returns the job's temporal classification at the point in time
type Spoke ¶
Spoke is a time bound chain of jobs
func NewSpokeFromNow ¶
NewSpokeFromNow creates a new spoke to hold jobs that starts from now and ends at the given duration
func (*Spoke) AddJobLocked ¶
AddJobLocked submits a job to the spoke. If the spoke cannot take responsibility for this job, it returns an error (such as ErrJobOutOfSpokeBounds when the job's trigger time is outside the spoke bounds); otherwise nil is returned
func (*Spoke) AsPriorityItem ¶
AsPriorityItem returns a spoke as a prioritizable Item
func (*Spoke) AsTemporalState ¶
AsTemporalState returns the spoke's temporal classification at the point in time
func (*Spoke) CancelJobLocked ¶
CancelJobLocked will try to delete a job that hasn't been consumed yet
func (*Spoke) IsJobInBounds ¶
IsJobInBounds returns true if this job's trigger time is temporally bounded by this spoke
func (*Spoke) OwnsJobLocked ¶
OwnsJobLocked returns true if a job by given id is owned by this spoke
func (*Spoke) PendingJobsLen ¶
PendingJobsLen returns the number of jobs remaining in this spoke
func (*Spoke) PersistLocked ¶
func (s *Spoke) PersistLocked(p persistence.Persister) chan error
PersistLocked persists all jobs in this spoke