Documentation ¶
Index ¶
- Constants
- Variables
- func RandomJid() string
- func RssKb() int64
- type Batch
- type BatchStatus
- type Client
- func (c *Client) Ack(jid string) error
- func (c *Client) BatchCommit(bid string) error
- func (c *Client) BatchNew(def *Batch) (*Batch, error)
- func (c *Client) BatchOpen(bid string) (*Batch, error)
- func (c *Client) BatchStatus(bid string) (*BatchStatus, error)
- func (c *Client) Beat(args ...string) (string, error)
- func (c *Client) Clear(name Structure) error
- func (c *Client) Close() error
- func (c *Client) Discard(name Structure, filter JobFilter) error
- func (c *Client) Fail(jid string, err error, backtrace []byte) error
- func (c *Client) Fetch(q ...string) (*Job, error)
- func (c *Client) Flush() error
- func (c *Client) Generic(cmdline string) (string, error)
- func (c *Client) Info() (map[string]interface{}, error)
- func (c *Client) Kill(name Structure, filter JobFilter) error
- func (c *Client) PauseQueues(names ...string) error
- func (c *Client) Push(job *Job) error
- func (c *Client) PushBulk(jobs []*Job) (map[string]string, error)
- func (c *Client) QueueSizes() (map[string]uint64, error)
- func (c *Client) RemoveQueues(names ...string) error
- func (c *Client) Requeue(name Structure, filter JobFilter) error
- func (c *Client) ResumeQueues(names ...string) error
- func (c *Client) TrackGet(jid string) (*JobTrack, error)
- func (c *Client) TrackSet(jid string, percent int, desc string, reserveUntil *time.Time) error
- type ClientData
- type Dialer
- type Failure
- type Job
- func (j *Job) GetCustom(name string) (interface{}, bool)
- func (j *Job) SetCustom(name string, value interface{}) *Job
- func (j *Job) SetExpiresAt(expiresAt time.Time) *Job
- func (j *Job) SetExpiresIn(expiresIn time.Duration) *Job
- func (j *Job) SetUniqueFor(secs uint) *Job
- func (j *Job) SetUniqueness(until UniqueUntil) *Job
- type JobFilter
- type JobTrack
- type MutateClient
- type Operation
- type Pool
- type ProtocolError
- type Server
- type Structure
- type UniqueUntil
Constants ¶
const (
	// This is the protocol version supported by this client.
	// The server might be running an older or newer version.
	ExpectedProtocolVersion = 2
)
Variables ¶
var (
	ErrBatchAlreadyCommitted = fmt.Errorf("batch has already been committed, must reopen")
	ErrBatchNotOpen          = fmt.Errorf("batch must be opened before it can be used")
)
var (
	// Set this to a non-empty value in a consumer process
	// e.g. see how faktory_worker_go sets this.
	RandomProcessWid = ""
	Labels           = []string{"golang"}
)
var (
	Name    = "Faktory"
	Version = "1.8.0"
)
var (
	RetryPolicyDefault        = 25
	RetryPolicyEmphemeral     = 0
	RetryPolicyDirectToMorgue = -1
)
var (
	Everything = JobFilter{
		Regexp: "*",
	}
)
Functions ¶
Types ¶
type Batch ¶
type Batch struct {
	// Unique identifier for each batch.
	// NB: the caller should not set this, it is generated
	// by Faktory when the batch is persisted to Redis.
	Bid string `json:"bid"`

	ParentBid   string `json:"parent_bid,omitempty"`
	Description string `json:"description,omitempty"`
	Success     *Job   `json:"success,omitempty"`
	Complete    *Job   `json:"complete,omitempty"`
	// contains filtered or unexported fields
}
func NewBatch ¶
Allocate a new Batch. Caller must set one or more callbacks and push one or more jobs in the batch.
b := faktory.NewBatch(cl)
b.Success = faktory.NewJob("MySuccessCallback", 12345)
b.Jobs(func() error {
	return b.Push(...)
})
func (*Batch) Commit ¶
Commit any pushed jobs in the batch to Redis so they can fire callbacks. A Batch object can only be committed once. You must use client.BatchOpen to get a new copy if you want to commit more jobs.
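The commit-once rule can be pictured as a simple guard. This is a minimal sketch, assuming a hypothetical stand-in type `sketchBatch` and a locally declared error that only mirrors the message of ErrBatchAlreadyCommitted; it is not the library's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// errAlreadyCommitted is a local stand-in that mirrors the message of
// ErrBatchAlreadyCommitted; it is not the library's variable.
var errAlreadyCommitted = errors.New("batch has already been committed, must reopen")

// sketchBatch is a hypothetical type used only to illustrate the rule.
type sketchBatch struct {
	committed bool
}

// commit succeeds once; later calls report that the batch must be reopened.
func (b *sketchBatch) commit() error {
	if b.committed {
		return errAlreadyCommitted
	}
	b.committed = true
	return nil
}

func main() {
	b := &sketchBatch{}
	fmt.Println("first commit:", b.commit())
	fmt.Println("second commit:", b.commit())
}
```

With the real client, the way past the second error is to obtain a fresh copy of the batch through client.BatchOpen, as described above.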
type BatchStatus ¶
type BatchStatus struct {
	Bid         string `json:"bid"`
	ParentBid   string `json:"parent_bid,omitempty"`
	Description string `json:"description,omitempty"`
	CreatedAt   string `json:"created_at"`
	Total       int64  `json:"total"`
	Pending     int64  `json:"pending"`
	Failed      int64  `json:"failed"`

	// "" if pending,
	// "1" if callback enqueued,
	// "2" if callback finished successfully
	CompleteState string `json:"complete_st"`
	SuccessState  string `json:"success_st"`
}
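The CompleteState and SuccessState strings are terse, so a caller may want to translate them before logging. A minimal sketch, assuming a hypothetical helper named `describeCallbackState`; the three meanings come from the field comment above, and anything else is reported as unknown.

```go
package main

import "fmt"

// describeCallbackState translates a CompleteState or SuccessState value
// into the meaning given in the BatchStatus field comment.
func describeCallbackState(st string) string {
	switch st {
	case "":
		return "pending"
	case "1":
		return "callback enqueued"
	case "2":
		return "callback finished successfully"
	default:
		// Values other than "", "1" and "2" are not documented.
		return "unknown state " + st
	}
}

func main() {
	for _, st := range []string{"", "1", "2"} {
		fmt.Printf("%q: %s\n", st, describeCallbackState(st))
	}
}
```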
type Client ¶
type Client struct {
	Location string
	Options  *ClientData
	// contains filtered or unexported fields
}
The Client structure represents a thread-unsafe connection to a Faktory server. It is recommended to use a connection pool of Clients in a multi-threaded process. See faktory_worker_go's internal connection pool for an example.
func Dial ¶
Dial connects to the remote faktory server with a Dialer reflecting the value of srv.Network; i.e., a *tls.Dialer if "tcp+tls" and a *net.Dialer if not.
client.Dial(client.Localhost, "topsecret")
func DialWithDialer ¶
DialWithDialer connects to the Faktory server using the given dialer.
func Open ¶
Open connects to a Faktory server based on environment variable conventions:
- Use FAKTORY_PROVIDER to point to a custom URL variable.
- Use FAKTORY_URL as a catch-all default.
Use the URL to configure any necessary password:
tcp://:mypassword@localhost:7419
By default Open assumes localhost with no password which is appropriate for local development.
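The lookup order described above can be sketched with the standard library alone. This is an illustration, not the library's code: `resolveURL`, `hostAndPassword` and `assumedDefaultURL` are hypothetical names, the default URL (localhost on the port from the example) is an assumption, and so is treating a FAKTORY_PROVIDER that names an unset variable as an error.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
)

// assumedDefaultURL is an assumption of this sketch: localhost, no password,
// and the port shown in the example URL.
const assumedDefaultURL = "tcp://localhost:7419"

// resolveURL applies the environment conventions. lookup has the same
// signature as os.LookupEnv so the logic can run without touching the
// real environment.
func resolveURL(lookup func(string) (string, bool)) (string, error) {
	// FAKTORY_PROVIDER holds the NAME of the variable that holds the URL.
	if name, ok := lookup("FAKTORY_PROVIDER"); ok {
		if val, ok := lookup(name); ok {
			return val, nil
		}
		return "", fmt.Errorf("FAKTORY_PROVIDER names %q, which is not set", name)
	}
	// FAKTORY_URL is the catch-all default.
	if val, ok := lookup("FAKTORY_URL"); ok {
		return val, nil
	}
	return assumedDefaultURL, nil
}

// hostAndPassword extracts the host:port and the optional password.
func hostAndPassword(raw string) (host, password string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", err
	}
	if u.User != nil {
		password, _ = u.User.Password()
	}
	return u.Host, password, nil
}

func main() {
	raw, err := resolveURL(os.LookupEnv)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	host, password, err := hostAndPassword(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("host=%s password set=%t\n", host, password != "")
}
```

Passing the lookup function as a parameter keeps the sketch easy to test with a plain map.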
func OpenWithDialer ¶
OpenWithDialer connects to a Faktory server following the same conventions as Open but instead uses dialer as the transport.
func (*Client) BatchCommit ¶
func (*Client) BatchStatus ¶
func (c *Client) BatchStatus(bid string) (*BatchStatus, error)
func (*Client) Beat ¶
The first arg to Beat allows a worker process to report its current lifecycle state to Faktory. All worker processes must follow the same basic lifecycle:
(startup) -> "" -> "quiet" -> "terminate"
Quiet allows the process to finish its current work without fetching any new work. Terminate means the process should exit within X seconds, usually ~30 seconds.
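A worker can model this lifecycle as a small state machine. The sketch below uses hypothetical helpers (`canFetch`, `nextState`) and assumes that states only ever move forward, so a quiet process never goes back to fetching; it does not call the Faktory client.

```go
package main

import "fmt"

// lifecycleRank orders the states of the lifecycle: "" (running),
// then "quiet", then "terminate".
var lifecycleRank = map[string]int{"": 0, "quiet": 1, "terminate": 2}

// canFetch reports whether a worker in the given state may fetch new work.
// Only the running state ("") fetches.
func canFetch(state string) bool {
	return state == ""
}

// nextState applies a requested state change. Assumption of this sketch:
// unknown states and backward moves are ignored.
func nextState(current, requested string) string {
	cur, okCur := lifecycleRank[current]
	req, okReq := lifecycleRank[requested]
	if !okCur || !okReq || req <= cur {
		return current
	}
	return requested
}

func main() {
	state := ""
	for _, requested := range []string{"quiet", "", "terminate"} {
		state = nextState(state, requested)
		fmt.Printf("requested %q, now %q, may fetch: %t\n", requested, state, canFetch(state))
	}
}
```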
func (*Client) Fail ¶
Fail notifies Faktory that a job failed with the given error. If backtrace is non-nil, it is assumed to be the output from runtime/debug.Stack().
func (*Client) PauseQueues ¶
List queues explicitly or use "*" to pause all known queues
func (*Client) QueueSizes ¶ added in v1.5.2
func (*Client) RemoveQueues ¶ added in v1.6.2
List queues explicitly or use "*" to remove all known queues
func (*Client) ResumeQueues ¶
List queues explicitly or use "*" to resume all known queues
type ClientData ¶
type ClientData struct {
	Hostname string   `json:"hostname"`
	Wid      string   `json:"wid"`
	Pid      int      `json:"pid"`
	Labels   []string `json:"labels"`

	// this can be used by proxies to route the connection.
	// it is ignored by Faktory.
	Username string `json:"username"`

	// Hash is hex(sha256(password + nonce))
	PasswordHash string `json:"pwdhash"`

	// The protocol version used by this client.
	// The server can reject this connection if the version will not work
	// The server advertises its protocol version in the HI.
	Version int `json:"v"`
}
ClientData is serialized to JSON and sent with the HELLO command. PasswordHash is required if the server is not listening on localhost. The WID (worker id) must be random and unique for each worker process. It can be a UUID, etc. Non-worker processes should leave WID empty.
The other elements can be useful for debugging and are displayed on the Busy tab.
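The PasswordHash formula in the field comment, hex(sha256(password + nonce)), can be computed with the standard library. This is a single-round sketch of exactly that formula; `passwordHash` is a hypothetical helper, and any further hashing rounds the server handshake may ask for are not covered here.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// passwordHash computes hex(sha256(password + nonce)), the formula given in
// the ClientData field comment. One round only.
func passwordHash(password, nonce string) string {
	sum := sha256.Sum256([]byte(password + nonce))
	return hex.EncodeToString(sum[:])
}

func main() {
	// "s3cret" and "abc123" are made-up values for illustration.
	fmt.Println(passwordHash("s3cret", "abc123"))
}
```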
type Failure ¶
type Failure struct {
	RetryCount     int      `json:"retry_count"`
	RetryRemaining int      `json:"remaining"`
	FailedAt       string   `json:"failed_at"`
	NextAt         string   `json:"next_at,omitempty"`
	ErrorMessage   string   `json:"message,omitempty"`
	ErrorType      string   `json:"errtype,omitempty"`
	Backtrace      []string `json:"backtrace,omitempty"`
}
type Job ¶
type Job struct {
	// required
	Jid   string        `json:"jid"`
	Queue string        `json:"queue"`
	Type  string        `json:"jobtype"`
	Args  []interface{} `json:"args"`

	// optional
	CreatedAt  string                 `json:"created_at,omitempty"`
	EnqueuedAt string                 `json:"enqueued_at,omitempty"`
	At         string                 `json:"at,omitempty"`
	ReserveFor int                    `json:"reserve_for,omitempty"`
	Retry      *int                   `json:"retry"`
	Backtrace  int                    `json:"backtrace,omitempty"`
	Failure    *Failure               `json:"failure,omitempty"`
	Custom     map[string]interface{} `json:"custom,omitempty"`
}
func NewJob ¶
Clients should use this constructor to build a Job, not allocate a bare struct directly.
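To see how the Job struct tags shape the JSON payload, the sketch below marshals a local stand-in, `sketchJob`, that copies a subset of the documented fields and tags. It is built by hand only because it is not the library type; with the real Job, use the constructor as advised above. Note that `retry` has no omitempty, so a nil pointer is emitted as null, while empty optional fields such as `at` are dropped.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sketchJob mirrors a subset of the Job fields and tags from the struct
// definition. It is a local stand-in, not the library type.
type sketchJob struct {
	Jid    string                 `json:"jid"`
	Queue  string                 `json:"queue"`
	Type   string                 `json:"jobtype"`
	Args   []interface{}          `json:"args"`
	At     string                 `json:"at,omitempty"`
	Retry  *int                   `json:"retry"`
	Custom map[string]interface{} `json:"custom,omitempty"`
}

// encode returns the JSON form of the job.
func encode(j sketchJob) (string, error) {
	b, err := json.Marshal(j)
	return string(b), err
}

func main() {
	retries := 3 // illustrative value
	j := sketchJob{
		Jid:   "abc123", // made-up identifier
		Queue: "default",
		Type:  "SomeJob",
		Args:  []interface{}{1, "two"},
		Retry: &retries,
	}
	s, err := encode(j)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s)
}
```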
func (*Job) SetCustom ¶
Set custom metadata for this job. Faktory reserves all element names starting with "_" for internal use, e.g. SetCustom("_txid", "12345")
func (*Job) SetExpiresAt ¶
Configure the TTL for this job. After this point in time, the job will be discarded rather than executed.
func (*Job) SetUniqueFor ¶
Configure this job to be unique for secs seconds or until the job has been successfully processed.
func (*Job) SetUniqueness ¶
func (j *Job) SetUniqueness(until UniqueUntil) *Job
Configure the uniqueness deadline for this job. Legal values are:
- "success" - the job will be considered unique until it has been successfully processed or the unique_for TTL has passed. This is the default value.
- "start" - the job will be considered unique until it starts processing. Retries may lead to multiple copies of the job running.
type JobFilter ¶
type JobFilter struct {
	Jids    []string `json:"jids,omitempty"`
	Regexp  string   `json:"regexp,omitempty"`
	Jobtype string   `json:"jobtype,omitempty"`
}
func Matching ¶
This is a generic pattern match across the entire job JSON payload. Be very careful that you don't accidentally match some unintended part of the payload.
NB: your pattern should have * on each side. The pattern is passed directly to Redis.
Example: discard any job retries whose payload contains the special word "uid:12345":
client.Discard(faktory.Retries, faktory.Matching("*uid:12345*"))
See the Redis SCAN documentation for pattern matching examples. https://redis.io/commands/scan
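The advice to put * on each side follows from how glob patterns anchor at both ends of the string. The sketch below is a simplified, byte-based stand-in for Redis pattern matching that supports only `*` and `?`; `globMatch` is a hypothetical helper and does not reproduce the full Redis syntax (character classes, escapes).

```go
package main

import "fmt"

// globMatch reports whether s matches pattern, where '*' matches any run of
// bytes (including none) and '?' matches exactly one byte. The whole string
// must match, which is why a bare word without '*' rarely matches a payload.
func globMatch(pattern, s string) bool {
	p, i := 0, 0
	star, mark := -1, 0 // position of the last '*' and the input index it covers
	for i < len(s) {
		switch {
		case p < len(pattern) && pattern[p] == '*':
			star, mark = p, i
			p++
		case p < len(pattern) && (pattern[p] == '?' || pattern[p] == s[i]):
			p++
			i++
		case star >= 0:
			// Backtrack: let the last '*' absorb one more byte.
			mark++
			p, i = star+1, mark
		default:
			return false
		}
	}
	// Any trailing '*' can match the empty remainder.
	for p < len(pattern) && pattern[p] == '*' {
		p++
	}
	return p == len(pattern)
}

func main() {
	payload := `{"jid":"abc","args":["uid:12345"]}` // made-up job payload
	fmt.Println(globMatch("*uid:12345*", payload))
	fmt.Println(globMatch("uid:12345", payload))
}
```

The first call matches because the wildcards absorb the rest of the JSON; the second does not, because the pattern would have to equal the entire payload.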
func OfType ¶
Matches jobs based on the exact Jobtype. This is pretty fast because it devolves to Matching(`"jobtype":"$ARG"`) and matches within Redis.
func WithJids ¶
Match jobs with the given JIDs. Warning: O(m*n), very slow because it has to pull every job into Faktory and check the JID against the list.
If you pass in a single JID, it will devolve to matching within Redis and perform much faster. For that reason, it might be better to handle one JID at a time.
type MutateClient ¶
type MutateClient interface {
	// Move the given jobs from structure to the Dead set.
	// Faktory will not touch them anymore but you can still see them in the Web UI.
	//
	// Kill(Retries, OfType("DataSyncJob").WithJids("abc", "123"))
	Kill(name Structure, filter JobFilter) error

	// Move the given jobs to their associated queue so they can be immediately
	// picked up and processed.
	Requeue(name Structure, filter JobFilter) error

	// Throw away the given jobs, e.g. if you want to delete all jobs named "QuickbooksSyncJob"
	//
	// Discard(Dead, OfType("QuickbooksSyncJob"))
	Discard(name Structure, filter JobFilter) error

	// Empty the entire given structure, e.g. if you want to clear all retries.
	// This is very fast as it is special cased by Faktory.
	Clear(name Structure) error
}
Commands which allow you to perform admin tasks on various Faktory structures. These are NOT designed to be used in business logic but rather for maintenance, data repair, migration, etc. They can have poor scalability or performance edge cases.
Generally these operations are O(n) or worse. They will get slower as your data gets bigger.
type Pool ¶
func NewPool ¶
NewPool creates a new Pool object through which multiple clients will be managed on your behalf.
Call Get() to retrieve a client instance and Put() to return it to the pool. If you do not call Put(), the connection will be leaked, and the pool will stop working once it hits capacity.
Do NOT call Close() on the client, as the lifecycle is managed internally.
The dialer that clients in this pool use is determined by the URI scheme in FAKTORY_PROVIDER.
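The warning about leaked connections is easy to reproduce with a toy pool. The sketch below is a generic fixed-capacity pool built on a buffered channel; `sketchPool`, `sketchConn` and `errExhausted` are hypothetical names, and the choice to fail instead of blocking when empty is made only so the leak is visible. The real Pool may behave differently at capacity.

```go
package main

import (
	"errors"
	"fmt"
)

// sketchConn stands in for a client connection; hypothetical.
type sketchConn struct{ id int }

// sketchPool is a fixed-capacity pool backed by a buffered channel.
type sketchPool struct {
	idle chan *sketchConn
}

var errExhausted = errors.New("pool exhausted: every connection is checked out")

// newSketchPool fills the pool with capacity idle connections.
func newSketchPool(capacity int) *sketchPool {
	p := &sketchPool{idle: make(chan *sketchConn, capacity)}
	for i := 0; i < capacity; i++ {
		p.idle <- &sketchConn{id: i}
	}
	return p
}

// get hands out an idle connection, or fails right away when none is left.
func (p *sketchPool) get() (*sketchConn, error) {
	select {
	case c := <-p.idle:
		return c, nil
	default:
		return nil, errExhausted
	}
}

// put returns a connection; extras beyond capacity are dropped.
func (p *sketchPool) put(c *sketchConn) {
	select {
	case p.idle <- c:
	default:
	}
}

func main() {
	p := newSketchPool(1)
	c, _ := p.get()
	_, err := p.get() // the only connection is still checked out
	fmt.Println("while leaked:", err)
	p.put(c)
	_, err = p.get()
	fmt.Println("after put:", err)
}
```

In application code, pairing each checkout with a deferred return is the usual way to avoid the leak.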
func NewPoolWithDialer ¶
NewPoolWithDialer creates a new Pool object similar to NewPool but clients will use the provided dialer instead of default ones.
type ProtocolError ¶
type ProtocolError struct {
	// contains filtered or unexported fields
}
func (*ProtocolError) Error ¶
func (pe *ProtocolError) Error() string
type Server ¶
type Server struct {
	Network  string
	Address  string
	Username string
	Password string
	Timeout  time.Duration
	TLS      *tls.Config
}
func DefaultServer ¶
func DefaultServer() *Server
func (*Server) OpenWithDialer ¶
OpenWithDialer creates a *Client with the dialer.
func (*Server) ReadFromEnv ¶
type UniqueUntil ¶
type UniqueUntil string
const (
	UntilSuccess UniqueUntil = "success" // default
	UntilStart   UniqueUntil = "start"
)