gitea: code.gitea.io/gitea/modules/queue

package queue

import "code.gitea.io/gitea/modules/queue"

Package Files

bytefifo.go helper.go manager.go queue.go queue_bytefifo.go queue_channel.go queue_disk.go queue_disk_channel.go queue_redis.go queue_wrapped.go setting.go unique_queue.go unique_queue_channel.go unique_queue_disk.go unique_queue_disk_channel.go unique_queue_redis.go unique_queue_wrapped.go workerpool.go

Variables

var ErrAlreadyInQueue = fmt.Errorf("already in queue")

ErrAlreadyInQueue is returned when trying to push data to the queue that is already in the queue

func IsErrInvalidConfiguration Uses

func IsErrInvalidConfiguration(err error) bool

IsErrInvalidConfiguration checks if an error is an ErrInvalidConfiguration
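As a rough illustration of this kind of check, the sketch below defines a local stand-in error type and tests for it with errors.As. The names errInvalidConfiguration and reason are hypothetical (the real type's fields are unexported), and whether the package itself uses errors.As or a plain type assertion is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalidConfiguration is a local stand-in for ErrInvalidConfiguration.
// The reason field is hypothetical; the real fields are unexported.
type errInvalidConfiguration struct {
	reason string
}

func (e errInvalidConfiguration) Error() string {
	return "invalid queue configuration: " + e.reason
}

// isErrInvalidConfiguration reports whether err is, or wraps,
// an errInvalidConfiguration.
func isErrInvalidConfiguration(err error) bool {
	var target errInvalidConfiguration
	return errors.As(err, &target)
}

func main() {
	wrapped := fmt.Errorf("creating queue: %w", errInvalidConfiguration{reason: "missing name"})
	fmt.Println(isErrInvalidConfiguration(wrapped))
	fmt.Println(isErrInvalidConfiguration(errors.New("some other failure")))
}
```

A caller could use such a check to decide whether to fall back to a different queue type instead of failing outright.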

func RegisteredTypesAsString Uses

func RegisteredTypesAsString() []string

RegisteredTypesAsString provides the list of registered types of queues

type ByteFIFO Uses

type ByteFIFO interface {
    // Len returns the length of the fifo
    Len() int64
    // PushFunc pushes data to the end of the fifo and calls the callback if it is added
    PushFunc(data []byte, fn func() error) error
    // Pop pops data from the start of the fifo
    Pop() ([]byte, error)
    // Close this fifo
    Close() error
}

ByteFIFO defines a FIFO that takes a byte array
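To make the contract concrete, here is a minimal in-memory sketch that satisfies the ByteFIFO interface. The memoryFIFO type is hypothetical and not part of the package. It assumes that the callback runs before the data is appended, that a callback error aborts the push, and that Pop returns nil on an empty fifo.

```go
package main

import (
	"fmt"
	"sync"
)

// ByteFIFO mirrors the interface documented above.
type ByteFIFO interface {
	Len() int64
	PushFunc(data []byte, fn func() error) error
	Pop() ([]byte, error)
	Close() error
}

// memoryFIFO is a hypothetical slice-backed ByteFIFO used only for illustration.
type memoryFIFO struct {
	mu    sync.Mutex
	items [][]byte
}

// Compile-time check that memoryFIFO satisfies ByteFIFO.
var _ ByteFIFO = (*memoryFIFO)(nil)

// Len returns the number of items currently held.
func (f *memoryFIFO) Len() int64 {
	f.mu.Lock()
	defer f.mu.Unlock()
	return int64(len(f.items))
}

// PushFunc runs fn (if any) and appends a copy of data when fn succeeds.
// Assumption: a callback error aborts the push.
func (f *memoryFIFO) PushFunc(data []byte, fn func() error) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	if fn != nil {
		if err := fn(); err != nil {
			return err
		}
	}
	f.items = append(f.items, append([]byte(nil), data...))
	return nil
}

// Pop removes and returns the oldest item, or nil when the fifo is empty.
func (f *memoryFIFO) Pop() ([]byte, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if len(f.items) == 0 {
		return nil, nil
	}
	head := f.items[0]
	f.items = f.items[1:]
	return head, nil
}

// Close has nothing to release for an in-memory fifo.
func (f *memoryFIFO) Close() error { return nil }

func main() {
	fifo := &memoryFIFO{}
	_ = fifo.PushFunc([]byte("first"), func() error {
		fmt.Println("callback ran")
		return nil
	})
	_ = fifo.PushFunc([]byte("second"), nil)
	head, _ := fifo.Pop()
	fmt.Println(string(head), fifo.Len())
}
```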

type ByteFIFOQueue Uses

type ByteFIFOQueue struct {
    *WorkerPool
    // contains filtered or unexported fields
}

ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool

func NewByteFIFOQueue Uses

func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOQueue, error)

NewByteFIFOQueue creates a new ByteFIFOQueue

func (*ByteFIFOQueue) IsEmpty Uses

func (q *ByteFIFOQueue) IsEmpty() bool

IsEmpty checks if the queue is empty

func (*ByteFIFOQueue) Name Uses

func (q *ByteFIFOQueue) Name() string

Name returns the name of this queue

func (*ByteFIFOQueue) Push Uses

func (q *ByteFIFOQueue) Push(data Data) error

Push pushes data to the fifo

func (*ByteFIFOQueue) PushFunc Uses

func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error

PushFunc pushes data to the fifo

func (*ByteFIFOQueue) Run Uses

func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func()))

Run runs the bytefifo queue

func (*ByteFIFOQueue) Shutdown Uses

func (q *ByteFIFOQueue) Shutdown()

Shutdown processing from this queue

func (*ByteFIFOQueue) Terminate Uses

func (q *ByteFIFOQueue) Terminate()

Terminate this queue and close the queue

type ByteFIFOQueueConfiguration Uses

type ByteFIFOQueueConfiguration struct {
    WorkerPoolConfiguration
    Workers int
    Name    string
}

ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue

type ByteFIFOUniqueQueue Uses

type ByteFIFOUniqueQueue struct {
    ByteFIFOQueue
}

ByteFIFOUniqueQueue represents a UniqueQueue formed from a UniqueByteFIFO

func NewByteFIFOUniqueQueue Uses

func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFunc, cfg, exemplar interface{}) (*ByteFIFOUniqueQueue, error)

NewByteFIFOUniqueQueue creates a new ByteFIFOUniqueQueue

func (*ByteFIFOUniqueQueue) Has Uses

func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error)

Has checks if the provided data is in the queue

type ChannelQueue Uses

type ChannelQueue struct {
    *WorkerPool
    // contains filtered or unexported fields
}

ChannelQueue implements Queue

A channel queue is not persistable and does not shut down or terminate cleanly. It is basically a very thin wrapper around a WorkerPool.

func (*ChannelQueue) Name Uses

func (q *ChannelQueue) Name() string

Name returns the name of this queue

func (*ChannelQueue) Push Uses

func (q *ChannelQueue) Push(data Data) error

Push will push data into the queue

func (*ChannelQueue) Run Uses

func (q *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func()))

Run starts to run the queue

type ChannelQueueConfiguration Uses

type ChannelQueueConfiguration struct {
    WorkerPoolConfiguration
    Workers int
    Name    string
}

ChannelQueueConfiguration is the configuration for a ChannelQueue

type ChannelUniqueQueue Uses

type ChannelUniqueQueue struct {
    *WorkerPool
    // contains filtered or unexported fields
}

ChannelUniqueQueue implements UniqueQueue

It is basically a thin wrapper around a WorkerPool but keeps a store of what has been pushed within a table.

Please note that this Queue does not guarantee that a particular task cannot be processed twice or more at the same time. Uniqueness is only guaranteed whilst the task is waiting in the queue.
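The caveat above can be sketched with a small stand-in: a channel plus a table of waiting identifiers. The uniqueChan type and its take method are hypothetical and only model the described behaviour. The table entry is cleared as soon as an item leaves the channel, so the same identifier can be queued again while the first copy is still being processed.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errAlreadyInQueue plays the role of ErrAlreadyInQueue in this sketch.
var errAlreadyInQueue = errors.New("already in queue")

// uniqueChan is a hypothetical channel with a table of waiting identifiers.
type uniqueChan struct {
	mu    sync.Mutex
	table map[string]bool
	ch    chan string
}

func newUniqueChan(size int) *uniqueChan {
	return &uniqueChan{table: map[string]bool{}, ch: make(chan string, size)}
}

// Push queues id unless it is already waiting.
func (q *uniqueChan) Push(id string) error {
	q.mu.Lock()
	if q.table[id] {
		q.mu.Unlock()
		return errAlreadyInQueue
	}
	q.table[id] = true
	q.mu.Unlock()
	q.ch <- id
	return nil
}

// Has reports whether id is currently waiting in the queue.
func (q *uniqueChan) Has(id string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.table[id]
}

// take removes the next id from the channel and clears its table entry,
// which is the moment uniqueness stops being enforced for that id.
func (q *uniqueChan) take() string {
	id := <-q.ch
	q.mu.Lock()
	delete(q.table, id)
	q.mu.Unlock()
	return id
}

func main() {
	q := newUniqueChan(4)
	fmt.Println(q.Push("repo-1")) // accepted
	fmt.Println(q.Push("repo-1")) // rejected while waiting
	id := q.take()                // a worker starts processing repo-1
	fmt.Println(id, q.Has(id))
	fmt.Println(q.Push("repo-1")) // accepted again during processing
}
```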

func (*ChannelUniqueQueue) Has Uses

func (q *ChannelUniqueQueue) Has(data Data) (bool, error)

Has checks if the data is in the queue

func (*ChannelUniqueQueue) Name Uses

func (q *ChannelUniqueQueue) Name() string

Name returns the name of this queue

func (*ChannelUniqueQueue) Push Uses

func (q *ChannelUniqueQueue) Push(data Data) error

Push will push data into the queue if the data is not already in the queue

func (*ChannelUniqueQueue) PushFunc Uses

func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error

PushFunc will push data into the queue

func (*ChannelUniqueQueue) Run Uses

func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func()))

Run starts to run the queue

type ChannelUniqueQueueConfiguration Uses

type ChannelUniqueQueueConfiguration ChannelQueueConfiguration

ChannelUniqueQueueConfiguration is the configuration for a ChannelUniqueQueue

type Data Uses

type Data interface{}

Data defines a type of queueable data

type DummyByteFIFO Uses

type DummyByteFIFO struct{}

DummyByteFIFO represents a dummy fifo

func (*DummyByteFIFO) Close Uses

func (*DummyByteFIFO) Close() error

Close returns nil

func (*DummyByteFIFO) Len Uses

func (*DummyByteFIFO) Len() int64

Len is always 0

func (*DummyByteFIFO) Pop Uses

func (*DummyByteFIFO) Pop() ([]byte, error)

Pop returns nil

func (*DummyByteFIFO) PushFunc Uses

func (*DummyByteFIFO) PushFunc(data []byte, fn func() error) error

PushFunc returns nil

type DummyQueue Uses

type DummyQueue struct {
}

DummyQueue represents an empty queue

func (*DummyQueue) Flush Uses

func (*DummyQueue) Flush(time.Duration) error

Flush always returns nil

func (*DummyQueue) FlushWithContext Uses

func (*DummyQueue) FlushWithContext(context.Context) error

FlushWithContext always returns nil

func (*DummyQueue) Has Uses

func (*DummyQueue) Has(Data) (bool, error)

Has always returns false as this queue never does anything

func (*DummyQueue) IsEmpty Uses

func (*DummyQueue) IsEmpty() bool

IsEmpty asserts that the queue is empty

func (*DummyQueue) Push Uses

func (*DummyQueue) Push(Data) error

Push fakes a push of data to the queue

func (*DummyQueue) PushFunc Uses

func (*DummyQueue) PushFunc(Data, func() error) error

PushFunc fakes a push of data to the queue with a function. The function is never run.

func (*DummyQueue) Run Uses

func (*DummyQueue) Run(_, _ func(context.Context, func()))

Run does nothing

type DummyUniqueByteFIFO Uses

type DummyUniqueByteFIFO struct {
    DummyByteFIFO
}

DummyUniqueByteFIFO represents a dummy unique fifo

func (*DummyUniqueByteFIFO) Has Uses

func (*DummyUniqueByteFIFO) Has([]byte) (bool, error)

Has always returns false

type ErrInvalidConfiguration Uses

type ErrInvalidConfiguration struct {
    // contains filtered or unexported fields
}

ErrInvalidConfiguration is returned when there is invalid configuration for a queue

func (ErrInvalidConfiguration) Error Uses

func (err ErrInvalidConfiguration) Error() string

type Flushable Uses

type Flushable interface {
    // Flush will add a flush worker to the pool - the worker should be autoregistered with the manager
    Flush(time.Duration) error
    // FlushWithContext is very similar to Flush
    // NB: The worker will not be registered with the manager.
    FlushWithContext(ctx context.Context) error
    // IsEmpty will return true if the managed pool is empty and has no work
    IsEmpty() bool
}

Flushable represents a pool or queue that is flushable
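One plausible way to relate the two flush methods is to build Flush on top of FlushWithContext by deriving a context with a timeout. The sketch below does that for a hypothetical chanPool type. It leaves out registering the flush worker with the manager, and the real implementation may be structured differently.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// chanPool is a hypothetical pool holding queued work in a buffered channel.
type chanPool struct {
	data   chan int
	handle func(int)
}

// FlushWithContext handles queued items until the channel is empty
// or the context is done, whichever comes first.
func (p *chanPool) FlushWithContext(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		select {
		case d := <-p.data:
			p.handle(d)
		default:
			return nil
		}
	}
}

// Flush wraps FlushWithContext in a timeout.
func (p *chanPool) Flush(timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return p.FlushWithContext(ctx)
}

// IsEmpty reports whether any work is still waiting.
func (p *chanPool) IsEmpty() bool { return len(p.data) == 0 }

func main() {
	sum := 0
	p := &chanPool{data: make(chan int, 3), handle: func(n int) { sum += n }}
	p.data <- 1
	p.data <- 2
	err := p.Flush(time.Second)
	fmt.Println(err, sum, p.IsEmpty())
}
```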

type HandlerFunc Uses

type HandlerFunc func(...Data)

HandlerFunc is a function that takes a variable amount of data and processes it
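Because the handler is variadic, it can be called with a single item or a whole batch. The following sketch redeclares Data and HandlerFunc locally. The sumHandler helper is hypothetical and only shows the type assertion a handler typically needs before using each item.

```go
package main

import "fmt"

// Data and HandlerFunc mirror the types documented above.
type Data interface{}
type HandlerFunc func(...Data)

// sumHandler returns a HandlerFunc that adds every int in a batch to total
// and silently skips values of any other type.
func sumHandler(total *int) HandlerFunc {
	return func(batch ...Data) {
		for _, d := range batch {
			if n, ok := d.(int); ok {
				*total += n
			}
		}
	}
}

func main() {
	total := 0
	handle := sumHandler(&total)

	handle(1)       // a single item
	handle(2, 3, 4) // a batch passed as separate arguments

	batch := []Data{5, 6}
	handle(batch...) // a batch passed as a slice

	fmt.Println(total)
}
```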

type Immediate Uses

type Immediate struct {
    // contains filtered or unexported fields
}

Immediate represents a direct execution queue

func (*Immediate) Flush Uses

func (*Immediate) Flush(time.Duration) error

Flush always returns nil

func (*Immediate) FlushWithContext Uses

func (*Immediate) FlushWithContext(context.Context) error

FlushWithContext always returns nil

func (*Immediate) Has Uses

func (*Immediate) Has(Data) (bool, error)

Has always returns false as this queue never does anything

func (*Immediate) IsEmpty Uses

func (*Immediate) IsEmpty() bool

IsEmpty asserts that the queue is empty

func (*Immediate) Push Uses

func (q *Immediate) Push(data Data) error

Push fakes a push of data to the queue

func (*Immediate) PushFunc Uses

func (q *Immediate) PushFunc(data Data, f func() error) error

PushFunc fakes a push of data to the queue with a function. The function is never run.

func (*Immediate) Run Uses

func (*Immediate) Run(_, _ func(context.Context, func()))

Run does nothing

type LevelQueue Uses

type LevelQueue struct {
    *ByteFIFOQueue
}

LevelQueue implements a disk library queue

type LevelQueueByteFIFO Uses

type LevelQueueByteFIFO struct {
    // contains filtered or unexported fields
}

LevelQueueByteFIFO represents a ByteFIFO formed from a LevelQueue

func NewLevelQueueByteFIFO Uses

func NewLevelQueueByteFIFO(connection, prefix string) (*LevelQueueByteFIFO, error)

NewLevelQueueByteFIFO creates a ByteFIFO formed from a LevelQueue

func (*LevelQueueByteFIFO) Close Uses

func (fifo *LevelQueueByteFIFO) Close() error

Close this fifo

func (*LevelQueueByteFIFO) Len Uses

func (fifo *LevelQueueByteFIFO) Len() int64

Len returns the length of the fifo

func (*LevelQueueByteFIFO) Pop Uses

func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error)

Pop pops data from the start of the fifo

func (*LevelQueueByteFIFO) PushFunc Uses

func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error

PushFunc will push data into the fifo

type LevelQueueConfiguration Uses

type LevelQueueConfiguration struct {
    ByteFIFOQueueConfiguration
    DataDir          string
    ConnectionString string
    QueueName        string
}

LevelQueueConfiguration is the configuration for a LevelQueue

type LevelUniqueQueue Uses

type LevelUniqueQueue struct {
    *ByteFIFOUniqueQueue
}

LevelUniqueQueue implements a disk library queue

type LevelUniqueQueueByteFIFO Uses

type LevelUniqueQueueByteFIFO struct {
    // contains filtered or unexported fields
}

LevelUniqueQueueByteFIFO represents a ByteFIFO formed from a LevelUniqueQueue

func NewLevelUniqueQueueByteFIFO Uses

func NewLevelUniqueQueueByteFIFO(connection, prefix string) (*LevelUniqueQueueByteFIFO, error)

NewLevelUniqueQueueByteFIFO creates a new ByteFIFO formed from a LevelUniqueQueue

func (*LevelUniqueQueueByteFIFO) Close Uses

func (fifo *LevelUniqueQueueByteFIFO) Close() error

Close this fifo

func (*LevelUniqueQueueByteFIFO) Has Uses

func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error)

Has returns whether the fifo contains this data

func (*LevelUniqueQueueByteFIFO) Len Uses

func (fifo *LevelUniqueQueueByteFIFO) Len() int64

Len returns the length of the fifo

func (*LevelUniqueQueueByteFIFO) Pop Uses

func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error)

Pop pops data from the start of the fifo

func (*LevelUniqueQueueByteFIFO) PushFunc Uses

func (fifo *LevelUniqueQueueByteFIFO) PushFunc(data []byte, fn func() error) error

PushFunc pushes data to the end of the fifo and calls the callback if it is added

type LevelUniqueQueueConfiguration Uses

type LevelUniqueQueueConfiguration struct {
    ByteFIFOQueueConfiguration
    DataDir          string
    ConnectionString string
    QueueName        string
}

LevelUniqueQueueConfiguration is the configuration for a LevelUniqueQueue

type ManagedPool Uses

type ManagedPool interface {
    // AddWorkers adds a number of workers as a group to the pool with the provided timeout. A CancelFunc is provided to cancel the group
    AddWorkers(number int, timeout time.Duration) context.CancelFunc
    // NumberOfWorkers returns the total number of workers in the pool
    NumberOfWorkers() int
    // MaxNumberOfWorkers returns the maximum number of workers the pool can dynamically grow to
    MaxNumberOfWorkers() int
    // SetMaxNumberOfWorkers sets the maximum number of workers the pool can dynamically grow to
    SetMaxNumberOfWorkers(int)
    // BoostTimeout returns the current timeout for worker groups created during a boost
    BoostTimeout() time.Duration
    // BlockTimeout returns the timeout the internal channel can block for before a boost would occur
    BlockTimeout() time.Duration
    // BoostWorkers returns the number of workers to be created during a boost
    BoostWorkers() int
    // SetPoolSettings sets the user updatable settings for the pool
    SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
}

ManagedPool is a simple interface to get certain details from a worker pool

type ManagedQueue Uses

type ManagedQueue struct {
    QID           int64
    Type          Type
    Name          string
    Configuration interface{}
    ExemplarType  string
    Managed       interface{}

    PoolWorkers map[int64]*PoolWorkers
    // contains filtered or unexported fields
}

ManagedQueue represents a working queue with a Pool of workers.

Although a ManagedQueue should really represent a Queue this does not necessarily have to be the case. This could be used to describe any queue.WorkerPool.

func (*ManagedQueue) AddWorkers Uses

func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc

AddWorkers adds workers to the queue if it has registered an add worker function

func (*ManagedQueue) BlockTimeout Uses

func (q *ManagedQueue) BlockTimeout() time.Duration

BlockTimeout returns the timeout until the next boost

func (*ManagedQueue) BoostTimeout Uses

func (q *ManagedQueue) BoostTimeout() time.Duration

BoostTimeout returns the timeout of the next boost

func (*ManagedQueue) BoostWorkers Uses

func (q *ManagedQueue) BoostWorkers() int

BoostWorkers returns the number of workers for a boost

func (*ManagedQueue) CancelWorkers Uses

func (q *ManagedQueue) CancelWorkers(pid int64)

CancelWorkers cancels pooled workers with pid

func (*ManagedQueue) Flush Uses

func (q *ManagedQueue) Flush(timeout time.Duration) error

Flush flushes the queue with a timeout

func (*ManagedQueue) IsEmpty Uses

func (q *ManagedQueue) IsEmpty() bool

IsEmpty returns true if the queue is empty

func (*ManagedQueue) MaxNumberOfWorkers Uses

func (q *ManagedQueue) MaxNumberOfWorkers() int

MaxNumberOfWorkers returns the maximum number of workers for the pool

func (*ManagedQueue) NumberOfWorkers Uses

func (q *ManagedQueue) NumberOfWorkers() int

NumberOfWorkers returns the number of workers in the queue

func (*ManagedQueue) RegisterWorkers Uses

func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc, isFlusher bool) int64

RegisterWorkers registers workers to this queue

func (*ManagedQueue) RemoveWorkers Uses

func (q *ManagedQueue) RemoveWorkers(pid int64)

RemoveWorkers deletes pooled workers with pid

func (*ManagedQueue) SetPoolSettings Uses

func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)

SetPoolSettings sets the settable boost values

func (*ManagedQueue) Workers Uses

func (q *ManagedQueue) Workers() []*PoolWorkers

Workers returns the pool workers

type ManagedQueueList Uses

type ManagedQueueList []*ManagedQueue

ManagedQueueList implements the sort.Interface

func (ManagedQueueList) Len Uses

func (l ManagedQueueList) Len() int

func (ManagedQueueList) Less Uses

func (l ManagedQueueList) Less(i, j int) bool

func (ManagedQueueList) Swap Uses

func (l ManagedQueueList) Swap(i, j int)
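For readers unfamiliar with sort.Interface, the sketch below shows the three methods on a trimmed-down stand-in type. Ordering by Name is an assumption made for the example, since the comparison used by the real Less is not documented here. The managedQueue type is likewise a hypothetical reduction of ManagedQueue.

```go
package main

import (
	"fmt"
	"sort"
)

// managedQueue is a hypothetical, trimmed-down stand-in for ManagedQueue.
type managedQueue struct {
	QID  int64
	Name string
}

// managedQueueList implements sort.Interface.
type managedQueueList []*managedQueue

func (l managedQueueList) Len() int { return len(l) }

// Less orders by Name; this ordering is an assumption for the sketch.
func (l managedQueueList) Less(i, j int) bool { return l[i].Name < l[j].Name }

func (l managedQueueList) Swap(i, j int) { l[i], l[j] = l[j], l[i] }

// sortedNames sorts the list in place and returns the names in order.
func sortedNames(list managedQueueList) []string {
	sort.Sort(list)
	names := make([]string, 0, len(list))
	for _, q := range list {
		names = append(names, q.Name)
	}
	return names
}

func main() {
	list := managedQueueList{
		{QID: 1, Name: "notification-service"},
		{QID: 2, Name: "issue_indexer"},
		{QID: 3, Name: "mail"},
	}
	fmt.Println(sortedNames(list))
}
```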

type Manager Uses

type Manager struct {
    Queues map[int64]*ManagedQueue
    // contains filtered or unexported fields
}

Manager is a queue manager

func GetManager Uses

func GetManager() *Manager

GetManager returns a Manager and initializes one as singleton if there's none yet

func (*Manager) Add Uses

func (m *Manager) Add(managed interface{},
    t Type,
    configuration,
    exemplar interface{}) int64

Add adds a queue to this manager

func (*Manager) FlushAll Uses

func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error

FlushAll flushes all the flushable queues attached to this manager

func (*Manager) GetManagedQueue Uses

func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue

GetManagedQueue by qid

func (*Manager) ManagedQueues Uses

func (m *Manager) ManagedQueues() []*ManagedQueue

ManagedQueues returns the managed queues

func (*Manager) Remove Uses

func (m *Manager) Remove(qid int64)

Remove a queue from the Manager

type Mappable Uses

type Mappable interface {
    MapTo(v interface{}) error
}

Mappable represents an interface that can MapTo another interface

type Named Uses

type Named interface {
    Name() string
}

Named represents a queue with a name

type NewQueueFunc Uses

type NewQueueFunc func(handler HandlerFunc, config interface{}, exemplar interface{}) (Queue, error)

NewQueueFunc is a function that creates a queue

type PersistableChannelQueue Uses

type PersistableChannelQueue struct {
    // contains filtered or unexported fields
}

PersistableChannelQueue wraps a channel queue and level queue together. The disk level queue will be used to store data at shutdown and terminate, and will be restored on start up.
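The persist-on-shutdown, restore-on-startup idea can be sketched without any disk access. In the example below, diskStore is an in-memory stand-in for the level queue, and persistableQueue, start and shutdown are hypothetical names. It only illustrates the hand-over between the two stores and omits batching and workers.

```go
package main

import "fmt"

// diskStore is an in-memory stand-in for the on-disk level queue.
type diskStore struct{ items []string }

// persistableQueue keeps work in a channel while running.
type persistableQueue struct {
	ch   chan string
	disk *diskStore
}

// start builds the queue and restores anything persisted by a previous shutdown.
func start(disk *diskStore, size int) *persistableQueue {
	q := &persistableQueue{ch: make(chan string, size+len(disk.items)), disk: disk}
	for _, item := range disk.items {
		q.ch <- item
	}
	disk.items = nil
	return q
}

// push adds an item to the in-memory channel.
func (q *persistableQueue) push(item string) { q.ch <- item }

// shutdown moves everything still waiting in the channel to the store.
func (q *persistableQueue) shutdown() {
	for {
		select {
		case item := <-q.ch:
			q.disk.items = append(q.disk.items, item)
		default:
			return
		}
	}
}

func main() {
	disk := &diskStore{}

	first := start(disk, 4)
	first.push("job-1")
	first.push("job-2")
	first.shutdown()
	fmt.Println("persisted:", disk.items)

	second := start(disk, 4)
	fmt.Println("restored:", <-second.ch, <-second.ch)
}
```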

func (*PersistableChannelQueue) Flush Uses

func (q *PersistableChannelQueue) Flush(timeout time.Duration) error

Flush flushes the queue and blocks till the queue is empty

func (*PersistableChannelQueue) FlushWithContext Uses

func (q *PersistableChannelQueue) FlushWithContext(ctx context.Context) error

FlushWithContext flushes the queue and blocks till the queue is empty

func (*PersistableChannelQueue) IsEmpty Uses

func (q *PersistableChannelQueue) IsEmpty() bool

IsEmpty checks if a queue is empty

func (*PersistableChannelQueue) Name Uses

func (q *PersistableChannelQueue) Name() string

Name returns the name of this queue

func (*PersistableChannelQueue) Push Uses

func (q *PersistableChannelQueue) Push(data Data) error

Push will push the data to the queue

func (*PersistableChannelQueue) Run Uses

func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func()))

Run starts to run the queue

func (*PersistableChannelQueue) Shutdown Uses

func (q *PersistableChannelQueue) Shutdown()

Shutdown processing this queue

func (*PersistableChannelQueue) Terminate Uses

func (q *PersistableChannelQueue) Terminate()

Terminate this queue and close the queue

type PersistableChannelQueueConfiguration Uses

type PersistableChannelQueueConfiguration struct {
    Name         string
    DataDir      string
    BatchLength  int
    QueueLength  int
    Timeout      time.Duration
    MaxAttempts  int
    Workers      int
    MaxWorkers   int
    BlockTimeout time.Duration
    BoostTimeout time.Duration
    BoostWorkers int
}

PersistableChannelQueueConfiguration is the configuration for a PersistableChannelQueue

type PersistableChannelUniqueQueue Uses

type PersistableChannelUniqueQueue struct {
    *ChannelUniqueQueue
    // contains filtered or unexported fields
}

PersistableChannelUniqueQueue wraps a channel queue and level queue together

Please note that this Queue does not guarantee that a particular task cannot be processed twice or more at the same time. Uniqueness is only guaranteed whilst the task is waiting in the queue.

func (*PersistableChannelUniqueQueue) Flush Uses

func (q *PersistableChannelUniqueQueue) Flush(timeout time.Duration) error

Flush flushes the queue

func (*PersistableChannelUniqueQueue) Has Uses

func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error)

Has will test if the queue has the data

func (*PersistableChannelUniqueQueue) Name Uses

func (q *PersistableChannelUniqueQueue) Name() string

Name returns the name of this queue

func (*PersistableChannelUniqueQueue) Push Uses

func (q *PersistableChannelUniqueQueue) Push(data Data) error

Push will push the data to the queue

func (*PersistableChannelUniqueQueue) PushFunc Uses

func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) error

PushFunc will push the data to the queue

func (*PersistableChannelUniqueQueue) Run Uses

func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func()))

Run starts to run the queue

func (*PersistableChannelUniqueQueue) Shutdown Uses

func (q *PersistableChannelUniqueQueue) Shutdown()

Shutdown processing this queue

func (*PersistableChannelUniqueQueue) Terminate Uses

func (q *PersistableChannelUniqueQueue) Terminate()

Terminate this queue and close the queue

type PersistableChannelUniqueQueueConfiguration Uses

type PersistableChannelUniqueQueueConfiguration struct {
    Name         string
    DataDir      string
    BatchLength  int
    QueueLength  int
    Timeout      time.Duration
    MaxAttempts  int
    Workers      int
    MaxWorkers   int
    BlockTimeout time.Duration
    BoostTimeout time.Duration
    BoostWorkers int
}

PersistableChannelUniqueQueueConfiguration is the configuration for a PersistableChannelUniqueQueue

type PoolWorkers Uses

type PoolWorkers struct {
    PID        int64
    Workers    int
    Start      time.Time
    Timeout    time.Time
    HasTimeout bool
    Cancel     context.CancelFunc
    IsFlusher  bool
}

PoolWorkers represents a group of workers working on a queue

type PoolWorkersList Uses

type PoolWorkersList []*PoolWorkers

PoolWorkersList implements the sort.Interface for PoolWorkers

func (PoolWorkersList) Len Uses

func (l PoolWorkersList) Len() int

func (PoolWorkersList) Less Uses

func (l PoolWorkersList) Less(i, j int) bool

func (PoolWorkersList) Swap Uses

func (l PoolWorkersList) Swap(i, j int)

type Queue Uses

type Queue interface {
    Flushable
    Run(atShutdown, atTerminate func(context.Context, func()))
    Push(Data) error
}

Queue defines an interface of a queue-like item

Queues will handle their own contents in the Run method
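The two parameters of Run have the shape of registration functions: each takes a context and a callback. A plausible reading, sketched below, is that a queue uses them to register its own shutdown and terminate hooks with whatever manages the process lifecycle. The lifecycle and demoQueue types are hypothetical, and the exact way the real queues use these parameters is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// lifecycle is a hypothetical stand-in for whatever manages process shutdown.
type lifecycle struct {
	shutdown  []func()
	terminate []func()
}

// atShutdown and atTerminate match the parameter type that Run expects.
func (l *lifecycle) atShutdown(_ context.Context, fn func()) {
	l.shutdown = append(l.shutdown, fn)
}

func (l *lifecycle) atTerminate(_ context.Context, fn func()) {
	l.terminate = append(l.terminate, fn)
}

// stop runs the shutdown callbacks first, then the terminate callbacks.
func (l *lifecycle) stop() {
	for _, fn := range l.shutdown {
		fn()
	}
	for _, fn := range l.terminate {
		fn()
	}
}

// demoQueue records lifecycle events instead of doing real work.
type demoQueue struct{ events []string }

// Run registers the queue's own hooks and then starts "running".
func (q *demoQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
	atShutdown(context.Background(), q.Shutdown)
	atTerminate(context.Background(), q.Terminate)
	q.events = append(q.events, "running")
}

func (q *demoQueue) Shutdown()  { q.events = append(q.events, "shutdown") }
func (q *demoQueue) Terminate() { q.events = append(q.events, "terminate") }

func main() {
	l := &lifecycle{}
	q := &demoQueue{}
	q.Run(l.atShutdown, l.atTerminate)
	l.stop()
	fmt.Println(q.events)
}
```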

func CreateQueue Uses

func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue

CreateQueue for name with provided handler and exemplar

func NewChannelQueue Uses

func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)

NewChannelQueue creates a memory channel queue

func NewChannelUniqueQueue Uses

func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)

NewChannelUniqueQueue creates a memory channel unique queue

func NewDummyQueue Uses

func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, error)

NewDummyQueue creates a new DummyQueue

func NewImmediate Uses

func NewImmediate(handler HandlerFunc, opts, exemplar interface{}) (Queue, error)

NewImmediate creates a new false queue that executes the handler function on push

func NewLevelQueue Uses

func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)

NewLevelQueue creates a ledis local queue

func NewLevelUniqueQueue Uses

func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)

NewLevelUniqueQueue creates a ledis local queue

Please note that this Queue does not guarantee that a particular task cannot be processed twice or more at the same time. Uniqueness is only guaranteed whilst the task is waiting in the queue.

func NewPersistableChannelQueue Uses

func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)

NewPersistableChannelQueue creates a wrapped batched channel queue with a persistable level queue backend used when shutting down. This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate.

func NewPersistableChannelUniqueQueue Uses

func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)

NewPersistableChannelUniqueQueue creates a wrapped batched channel queue with a persistable level queue backend used when shutting down. This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate.

func NewQueue Uses

func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error)

NewQueue takes a queue Type, HandlerFunc, some options and possibly an exemplar and returns a Queue or an error
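A constructor keyed by Type suggests a registry that maps each Type to a NewQueueFunc. The sketch below shows that pattern with locally redeclared types. The registry variable, the trimmed Queue interface (Push only) and the immediate stand-in are all assumptions for illustration, not the package's actual internals.

```go
package main

import (
	"fmt"
	"sort"
)

// Local mirrors of the documented types.
type Data interface{}
type HandlerFunc func(...Data)
type Type string

// Queue is trimmed to Push for this sketch.
type Queue interface{ Push(Data) error }

type NewQueueFunc func(handler HandlerFunc, config interface{}, exemplar interface{}) (Queue, error)

// immediate runs the handler as soon as data is pushed.
type immediate struct{ handler HandlerFunc }

func (q *immediate) Push(d Data) error {
	q.handler(d)
	return nil
}

func newImmediate(handler HandlerFunc, _, _ interface{}) (Queue, error) {
	return &immediate{handler: handler}, nil
}

// registry is a hypothetical lookup table from Type to constructor.
var registry = map[Type]NewQueueFunc{"immediate": newImmediate}

// newQueue looks up the constructor for t and calls it.
func newQueue(t Type, handler HandlerFunc, opts, exemplar interface{}) (Queue, error) {
	fn, ok := registry[t]
	if !ok {
		return nil, fmt.Errorf("unsupported queue type: %v", t)
	}
	return fn(handler, opts, exemplar)
}

// registeredTypes lists the registry keys in a stable order.
func registeredTypes() []Type {
	types := make([]Type, 0, len(registry))
	for t := range registry {
		types = append(types, t)
	}
	sort.Slice(types, func(i, j int) bool { return types[i] < types[j] })
	return types
}

func main() {
	q, err := newQueue("immediate", func(d ...Data) { fmt.Println("handled", d) }, nil, nil)
	if err != nil {
		fmt.Println(err)
		return
	}
	_ = q.Push("task")
	_, err = newQueue("no-such-type", nil, nil, nil)
	fmt.Println(registeredTypes(), err)
}
```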

func NewRedisQueue Uses

func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)

NewRedisQueue creates a single redis or cluster redis queue

func NewRedisUniqueQueue Uses

func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)

NewRedisUniqueQueue creates a single redis or cluster redis unique queue.

Please note that this Queue does not guarantee that a particular task cannot be processed twice or more at the same time. Uniqueness is only guaranteed whilst the task is waiting in the queue.

func NewWrappedQueue Uses

func NewWrappedQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)

NewWrappedQueue will attempt to create a queue of the provided type. If there is a problem creating this queue, it will instead create a WrappedQueue with delayed startup of the queue and a channel which will be redirected to the queue.

func NewWrappedUniqueQueue Uses

func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)

NewWrappedUniqueQueue will attempt to create a unique queue of the provided type. If there is a problem creating this queue, it will instead create a WrappedUniqueQueue with delayed startup of the queue and a channel which will be redirected to the queue.

Please note that this Queue does not guarantee that a particular task cannot be processed twice or more at the same time. Uniqueness is only guaranteed whilst the task is waiting in the queue.

type RedisByteFIFO Uses

type RedisByteFIFO struct {
    // contains filtered or unexported fields
}

RedisByteFIFO represents a ByteFIFO formed from a redisClient

func NewRedisByteFIFO Uses

func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error)

NewRedisByteFIFO creates a ByteFIFO formed from a redisClient

func (*RedisByteFIFO) Close Uses

func (fifo *RedisByteFIFO) Close() error

Close this fifo

func (*RedisByteFIFO) Len Uses

func (fifo *RedisByteFIFO) Len() int64

Len returns the length of the fifo

func (*RedisByteFIFO) Pop Uses

func (fifo *RedisByteFIFO) Pop() ([]byte, error)

Pop pops data from the start of the fifo

func (*RedisByteFIFO) PushFunc Uses

func (fifo *RedisByteFIFO) PushFunc(data []byte, fn func() error) error

PushFunc pushes data to the end of the fifo and calls the callback if it is added

type RedisByteFIFOConfiguration Uses

type RedisByteFIFOConfiguration struct {
    ConnectionString string
    QueueName        string
}

RedisByteFIFOConfiguration is the configuration for the RedisByteFIFO

type RedisQueue Uses

type RedisQueue struct {
    *ByteFIFOQueue
}

RedisQueue is a redis queue

type RedisQueueConfiguration Uses

type RedisQueueConfiguration struct {
    ByteFIFOQueueConfiguration
    RedisByteFIFOConfiguration
}

RedisQueueConfiguration is the configuration for the redis queue

type RedisUniqueByteFIFO Uses

type RedisUniqueByteFIFO struct {
    RedisByteFIFO
    // contains filtered or unexported fields
}

RedisUniqueByteFIFO represents a UniqueByteFIFO formed from a redisClient

func NewRedisUniqueByteFIFO Uses

func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniqueByteFIFO, error)

NewRedisUniqueByteFIFO creates a UniqueByteFIFO formed from a redisClient

func (*RedisUniqueByteFIFO) Has Uses

func (fifo *RedisUniqueByteFIFO) Has(data []byte) (bool, error)

Has returns whether the fifo contains this data

func (*RedisUniqueByteFIFO) Pop Uses

func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error)

Pop pops data from the start of the fifo

func (*RedisUniqueByteFIFO) PushFunc Uses

func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error

PushFunc pushes data to the end of the fifo and calls the callback if it is added

type RedisUniqueByteFIFOConfiguration Uses

type RedisUniqueByteFIFOConfiguration struct {
    RedisByteFIFOConfiguration
    SetName string
}

RedisUniqueByteFIFOConfiguration is the configuration for the RedisUniqueByteFIFO

type RedisUniqueQueue Uses

type RedisUniqueQueue struct {
    *ByteFIFOUniqueQueue
}

RedisUniqueQueue is a redis unique queue

type RedisUniqueQueueConfiguration Uses

type RedisUniqueQueueConfiguration struct {
    ByteFIFOQueueConfiguration
    RedisUniqueByteFIFOConfiguration
}

RedisUniqueQueueConfiguration is the configuration for the redis unique queue

type Shutdownable Uses

type Shutdownable interface {
    Shutdown()
    Terminate()
}

Shutdownable represents a queue that can be shutdown

type Type Uses

type Type string

Type is a type of Queue

const ChannelQueueType Type = "channel"

ChannelQueueType is the type for channel queue

const ChannelUniqueQueueType Type = "unique-channel"

ChannelUniqueQueueType is the type for channel unique queue

const DummyQueueType Type = "dummy"

DummyQueueType is the type for the dummy queue

const ImmediateType Type = "immediate"

ImmediateType is the type to execute the function when push

const LevelQueueType Type = "level"

LevelQueueType is the type for level queue

const LevelUniqueQueueType Type = "unique-level"

LevelUniqueQueueType is the type for level unique queue

const PersistableChannelQueueType Type = "persistable-channel"

PersistableChannelQueueType is the type for persistable queue

const PersistableChannelUniqueQueueType Type = "unique-persistable-channel"

PersistableChannelUniqueQueueType is the type for persistable unique queue

const RedisQueueType Type = "redis"

RedisQueueType is the type for redis queue

const RedisUniqueQueueType Type = "unique-redis"

RedisUniqueQueueType is the type for redis unique queue

const WrappedQueueType Type = "wrapped"

WrappedQueueType is the type for a wrapped delayed starting queue

const WrappedUniqueQueueType Type = "unique-wrapped"

WrappedUniqueQueueType is the type for a wrapped delayed starting unique queue

func RegisteredTypes Uses

func RegisteredTypes() []Type

RegisteredTypes provides the list of registered types of queues

type UniqueByteFIFO Uses

type UniqueByteFIFO interface {
    ByteFIFO
    // Has returns whether the fifo contains this data
    Has(data []byte) (bool, error)
}

UniqueByteFIFO defines a FIFO that keeps its contents unique
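One common way to build such a structure is to pair an ordered list with a membership set, so that Has and duplicate detection do not require scanning the list. The sketch below does this in memory. The setFIFO type is hypothetical, is not goroutine-safe, and returning an "already in queue" error on a duplicate push is an assumption about how a duplicate is reported.

```go
package main

import (
	"errors"
	"fmt"
)

// errAlreadyInQueue plays the role of ErrAlreadyInQueue in this sketch.
var errAlreadyInQueue = errors.New("already in queue")

// ByteFIFO and UniqueByteFIFO mirror the documented interfaces.
type ByteFIFO interface {
	Len() int64
	PushFunc(data []byte, fn func() error) error
	Pop() ([]byte, error)
	Close() error
}

type UniqueByteFIFO interface {
	ByteFIFO
	Has(data []byte) (bool, error)
}

// setFIFO is a hypothetical list-plus-set implementation (not goroutine-safe).
type setFIFO struct {
	order   [][]byte
	members map[string]struct{}
}

var _ UniqueByteFIFO = (*setFIFO)(nil)

func newSetFIFO() *setFIFO { return &setFIFO{members: map[string]struct{}{}} }

func (f *setFIFO) Len() int64 { return int64(len(f.order)) }

// Has reports whether data is currently waiting in the fifo.
func (f *setFIFO) Has(data []byte) (bool, error) {
	_, ok := f.members[string(data)]
	return ok, nil
}

// PushFunc rejects duplicates, then runs fn (if any) and appends the data.
func (f *setFIFO) PushFunc(data []byte, fn func() error) error {
	if _, ok := f.members[string(data)]; ok {
		return errAlreadyInQueue
	}
	if fn != nil {
		if err := fn(); err != nil {
			return err
		}
	}
	f.members[string(data)] = struct{}{}
	f.order = append(f.order, append([]byte(nil), data...))
	return nil
}

// Pop removes the oldest item from both the list and the set.
func (f *setFIFO) Pop() ([]byte, error) {
	if len(f.order) == 0 {
		return nil, nil
	}
	head := f.order[0]
	f.order = f.order[1:]
	delete(f.members, string(head))
	return head, nil
}

func (f *setFIFO) Close() error { return nil }

func main() {
	fifo := newSetFIFO()
	fmt.Println(fifo.PushFunc([]byte("42"), nil))
	fmt.Println(fifo.PushFunc([]byte("42"), nil))
	head, _ := fifo.Pop()
	has, _ := fifo.Has(head)
	fmt.Println(string(head), has)
}
```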

type UniqueQueue Uses

type UniqueQueue interface {
    Queue
    PushFunc(Data, func() error) error
    Has(Data) (bool, error)
}

UniqueQueue defines a queue which guarantees that only one instance of the same data is in the queue. Instances with the same identity will be discarded if there is already one in the line.

This queue is particularly useful for preventing duplicated tasks with the same purpose. Please note that this does not guarantee that a particular task cannot be processed twice or more at the same time. Uniqueness is only guaranteed whilst the task is waiting in the queue.

Users of this queue should be careful to push only the identifier of the data.
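The advice to push only an identifier matters because uniqueness is judged on the queued value as a whole. The sketch below assumes the value is compared by its serialized form (JSON is used here as an assumption). The uniqueSet and repoTask names are hypothetical. Two tasks for the same repository that differ in an incidental field are both accepted, while the bare identifier is deduplicated.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// uniqueSet is a hypothetical stand-in for a unique queue's table of
// waiting items, keyed by the serialized form of each value.
type uniqueSet map[string]struct{}

// add reports whether v was newly added; false means an identical
// serialized value is already waiting.
func (s uniqueSet) add(v interface{}) (bool, error) {
	bs, err := json.Marshal(v)
	if err != nil {
		return false, err
	}
	key := string(bs)
	if _, ok := s[key]; ok {
		return false, nil
	}
	s[key] = struct{}{}
	return true, nil
}

// repoTask carries an incidental field that changes on every push.
type repoTask struct {
	RepoID   int64
	QueuedAt int64
}

func main() {
	whole := uniqueSet{}
	a, _ := whole.add(repoTask{RepoID: 7, QueuedAt: 100})
	b, _ := whole.add(repoTask{RepoID: 7, QueuedAt: 101})
	fmt.Println("whole structs accepted:", a, b)

	ids := uniqueSet{}
	c, _ := ids.add(int64(7))
	d, _ := ids.add(int64(7))
	fmt.Println("identifiers accepted:", c, d)
}
```

In the identifier-only variant the handler would load the current state of the item when it runs, so a single queued entry covers any number of intermediate changes.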

func CreateUniqueQueue Uses

func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) UniqueQueue

CreateUniqueQueue for name with provided handler and exemplar

type WorkerPool Uses

type WorkerPool struct {
    // contains filtered or unexported fields
}

WorkerPool represents a dynamically growable worker pool for a provided handler function. It has an internal channel which it uses to detect if there is a block, and it will grow and shrink in response to demand as per configuration.
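
The grow-on-block idea can be sketched in a few lines. The following is a simplified illustration under stated assumptions, not this package's implementation: toyPool, push, addWorkers and demo are hypothetical names, and the rule "boost when a send has been blocked for blockTimeout, never beyond maxWorkers" is one plausible reading of the description above.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// toyPool is a hypothetical, simplified pool; it is not Gitea's WorkerPool.
type toyPool struct {
	data         chan int
	handle       func(int)
	blockTimeout time.Duration // how long a send may block before boosting
	boostTimeout time.Duration // lifetime of boosted workers
	boostWorkers int
	maxWorkers   int32
	workers      int32
	wg           sync.WaitGroup
}

// addWorkers starts n workers that exit when ctx is done or the channel is closed.
func (p *toyPool) addWorkers(ctx context.Context, n int) {
	for i := 0; i < n; i++ {
		p.wg.Add(1)
		atomic.AddInt32(&p.workers, 1)
		go func() {
			defer p.wg.Done()
			defer atomic.AddInt32(&p.workers, -1)
			for {
				select {
				case <-ctx.Done():
					return
				case v, ok := <-p.data:
					if !ok {
						return
					}
					p.handle(v)
				}
			}
		}()
	}
}

// push sends v. If the send blocks longer than blockTimeout it adds
// temporary workers (bounded by maxWorkers) and then completes the send.
func (p *toyPool) push(v int) (boosted bool) {
	timer := time.NewTimer(p.blockTimeout)
	defer timer.Stop()
	select {
	case p.data <- v:
		return false
	case <-timer.C:
	}
	n := p.boostWorkers
	if room := int(p.maxWorkers - atomic.LoadInt32(&p.workers)); n > room {
		n = room
	}
	if n > 0 {
		ctx, cancel := context.WithCancel(context.Background())
		time.AfterFunc(p.boostTimeout, cancel) // boosted workers retire later
		p.addWorkers(ctx, n)
		boosted = true
	}
	p.data <- v
	return boosted
}

// demo pushes three items at one worker whose handler is stuck until released.
func demo() (boosts int, handled int32) {
	release := make(chan struct{})
	var count int32
	p := &toyPool{
		data:         make(chan int, 1),
		handle:       func(int) { <-release; atomic.AddInt32(&count, 1) },
		blockTimeout: 5 * time.Millisecond,
		boostTimeout: time.Second,
		boostWorkers: 2,
		maxWorkers:   4,
	}
	p.addWorkers(context.Background(), 1)
	for i := 0; i < 3; i++ {
		if p.push(i) {
			boosts++
		}
	}
	close(release)
	close(p.data)
	p.wg.Wait()
	return boosts, atomic.LoadInt32(&count)
}

func main() {
	boosts, handled := demo()
	fmt.Println("boosts:", boosts, "handled:", handled)
}
```

In the demo the single base worker is stuck in its handler, so the channel buffer fills and a later push blocks until the timer fires and extra workers are added.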

func NewWorkerPool Uses

func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPool

NewWorkerPool creates a new worker pool

func (*WorkerPool) AddWorkers Uses

func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc

AddWorkers adds workers to the pool - this allows the number of workers to go above the limit

func (*WorkerPool) BlockTimeout Uses

func (p *WorkerPool) BlockTimeout() time.Duration

BlockTimeout returns the timeout until the next boost

func (*WorkerPool) BoostTimeout Uses

func (p *WorkerPool) BoostTimeout() time.Duration

BoostTimeout returns the timeout of the next boost

func (*WorkerPool) BoostWorkers Uses

func (p *WorkerPool) BoostWorkers() int

BoostWorkers returns the number of workers for a boost

func (*WorkerPool) CleanUp Uses

func (p *WorkerPool) CleanUp(ctx context.Context)

CleanUp will drain the remaining contents of the channel. This should be called after the AddWorkers context is closed.

func (*WorkerPool) Flush Uses

func (p *WorkerPool) Flush(timeout time.Duration) error

Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager

func (*WorkerPool) FlushWithContext Uses

func (p *WorkerPool) FlushWithContext(ctx context.Context) error

FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty. NB: The worker will not be registered with the manager.

func (*WorkerPool) IsEmpty Uses

func (p *WorkerPool) IsEmpty() bool

IsEmpty returns true if the worker queue is empty

func (*WorkerPool) MaxNumberOfWorkers Uses

func (p *WorkerPool) MaxNumberOfWorkers() int

MaxNumberOfWorkers returns the maximum number of workers automatically added to the pool

func (*WorkerPool) NumberOfWorkers Uses

func (p *WorkerPool) NumberOfWorkers() int

NumberOfWorkers returns the number of current workers in the pool

func (*WorkerPool) Push Uses

func (p *WorkerPool) Push(data Data)

Push pushes the data to the internal channel

func (*WorkerPool) SetMaxNumberOfWorkers Uses

func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int)

SetMaxNumberOfWorkers sets the maximum number of workers automatically added to the pool. Changing this number will not change the number of current workers but will change the limit for future additions.

func (*WorkerPool) SetPoolSettings Uses

func (p *WorkerPool) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)

SetPoolSettings sets the settable boost values

func (*WorkerPool) Wait Uses

func (p *WorkerPool) Wait()

Wait for WorkerPool to finish

type WorkerPoolConfiguration Uses

type WorkerPoolConfiguration struct {
    QueueLength  int
    BatchLength  int
    BlockTimeout time.Duration
    BoostTimeout time.Duration
    BoostWorkers int
    MaxWorkers   int
}

WorkerPoolConfiguration is the basic configuration for a WorkerPool

type WrappedQueue Uses

type WrappedQueue struct {
    // contains filtered or unexported fields
}

WrappedQueue wraps a delayed starting queue

func (*WrappedQueue) Flush Uses

func (q *WrappedQueue) Flush(timeout time.Duration) error

Flush flushes the queue and blocks until the queue is empty

func (*WrappedQueue) FlushWithContext Uses

func (q *WrappedQueue) FlushWithContext(ctx context.Context) error

FlushWithContext implements the final part of Flushable

func (*WrappedQueue) IsEmpty Uses

func (q *WrappedQueue) IsEmpty() bool

IsEmpty checks whether the queue is empty

func (*WrappedQueue) Name Uses

func (q *WrappedQueue) Name() string

Name returns the name of the queue

func (*WrappedQueue) Push Uses

func (q *WrappedQueue) Push(data Data) error

Push will push the data to the internal channel checking it against the exemplar
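
As a rough illustration of what checking data against an exemplar can mean, the sketch below compares dynamic types with the standard reflect package. The helper name matchesExemplar and the task type are hypothetical, and this page does not state that the package performs the check in exactly this way.

```go
package main

import (
	"fmt"
	"reflect"
)

// matchesExemplar reports whether data could be stored in a variable of the
// exemplar's type. A nil value on either side never matches.
func matchesExemplar(data, exemplar interface{}) bool {
	if data == nil || exemplar == nil {
		return false
	}
	return reflect.TypeOf(data).AssignableTo(reflect.TypeOf(exemplar))
}

// task is a hypothetical payload type used only for this illustration.
type task struct{ RepoID int64 }

func main() {
	exemplar := &task{}
	fmt.Println(matchesExemplar(&task{RepoID: 1}, exemplar)) // true
	fmt.Println(matchesExemplar("repo-1", exemplar))         // false
	fmt.Println(matchesExemplar(nil, exemplar))              // false
}
```

Rejecting mismatched data at push time surfaces a type error to the caller instead of to a handler running later in a worker.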

func (*WrappedQueue) Run Uses

func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func()))

Run starts to run the queue and attempts to create the internal queue

func (*WrappedQueue) Shutdown Uses

func (q *WrappedQueue) Shutdown()

Shutdown this queue and stop processing

func (*WrappedQueue) Terminate Uses

func (q *WrappedQueue) Terminate()

Terminate this queue and close the queue

type WrappedQueueConfiguration Uses

type WrappedQueueConfiguration struct {
    Underlying  Type
    Timeout     time.Duration
    MaxAttempts int
    Config      interface{}
    QueueLength int
    Name        string
}

WrappedQueueConfiguration is the configuration for a WrappedQueue

type WrappedUniqueQueue Uses

type WrappedUniqueQueue struct {
    *WrappedQueue
    // contains filtered or unexported fields
}

WrappedUniqueQueue wraps a delayed starting unique queue

func (*WrappedUniqueQueue) Has Uses

func (q *WrappedUniqueQueue) Has(data Data) (bool, error)

Has checks if the data is in the queue

func (*WrappedUniqueQueue) IsEmpty Uses

func (q *WrappedUniqueQueue) IsEmpty() bool

IsEmpty checks whether the queue is empty

func (*WrappedUniqueQueue) Push Uses

func (q *WrappedUniqueQueue) Push(data Data) error

Push will push the data to the internal channel checking it against the exemplar

func (*WrappedUniqueQueue) PushFunc Uses

func (q *WrappedUniqueQueue) PushFunc(data Data, fn func() error) error

PushFunc will push the data to the internal channel checking it against the exemplar

type WrappedUniqueQueueConfiguration Uses

type WrappedUniqueQueueConfiguration struct {
    Underlying  Type
    Timeout     time.Duration
    MaxAttempts int
    Config      interface{}
    QueueLength int
    Name        string
}

WrappedUniqueQueueConfiguration is the configuration for a WrappedUniqueQueue

Package queue imports 15 packages and is imported by 29 packages. Updated 2021-01-22.