messaging

package module
v4.0.0-...-44770d4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 5, 2018 License: BSD-3-Clause Imports: 9 Imported by: 0

Documentation

Overview

Package messaging provides the logic and data structures that the services will need to communicate with each other over AMQP (as implemented by RabbitMQ).

Index

Constants

This section is empty.

Variables

View Source
var (
	// Info level logger. Can be set by other packages. Defaults to writing to
	// os.Stdout.
	Info Logger = log.New(os.Stdout, "", log.Lshortfile)

	// Warn level logger. Can be set by other packages. Defaults to writing to
	// os.Stderr.
	Warn Logger = log.New(os.Stderr, "", log.Lshortfile)

	// Error level logger. Can be set by other packages. Default to writing to
	// os.Stderr.
	Error Logger = log.New(os.Stderr, "", log.Lshortfile)

	//LaunchCommand is the string used in LaunchCo
	LaunchCommand = "LAUNCH"

	//ReindexAllKey is the routing/binding key for full reindex messages.
	ReindexAllKey = "index.all"

	//ReindexTemplatesKey is the routing/binding key for templates reindex messages.
	ReindexTemplatesKey = "index.templates"

	//IncrementalKey is the routing/binding key for incremental updates
	IncrementalKey = "metadata.update"

	//LaunchesKey is the routing/binding key for job launch request messages.
	LaunchesKey = "jobs.launches"

	//UpdatesKey is the routing/binding key for job update messages.
	UpdatesKey = "jobs.updates"

	//StopsKey is the routing/binding key for job stop request messages.
	StopsKey = "jobs.stops"

	//CommandsKey is the routing/binding key for job command messages.
	CommandsKey = "jobs.commands"

	// TimeLimitRequestsKey is the routing/binding key for the job time limit messages.
	TimeLimitRequestsKey = "jobs.timelimits.requests"

	//TimeLimitDeltaKey is the routing/binding key for the job time limit delta messages.
	TimeLimitDeltaKey = "jobs.timelimits.deltas"

	//TimeLimitResponseKey is the routing/binding key for the job time limit
	//response messages.
	TimeLimitResponseKey = "jobs.timelimits.responses"

	//QueuedState is when a job is queued.
	QueuedState JobState = "Queued"

	//SubmittedState is when a job has been submitted.
	SubmittedState JobState = "Submitted"

	//RunningState is when a job is running.
	RunningState JobState = "Running"

	//ImpendingCancellationState is when a job is running but the current step is about
	//to reach its expiration time.
	ImpendingCancellationState JobState = "ImpendingCancellation"

	//SucceededState is when a job has successfully completed the required steps.
	SucceededState JobState = "Completed"

	//FailedState is when a job has failed. Duh.
	FailedState JobState = "Failed"
)

Functions

func StopQueueName

func StopQueueName(invID string) string

StopQueueName returns the formatted queue name for job stop requests. It's based on the passed in string, which is assumed to be the InvocationID for a job, but there's no reason that is required to the case.

func StopRequestKey

func StopRequestKey(invID string) string

StopRequestKey returns the binding key formatted correctly for the jobs exchange based on the InvocationID passed in.

func TimeLimitDeltaQueueName

func TimeLimitDeltaQueueName(invID string) string

TimeLimitDeltaQueueName returns the correctly formatted queue name for time limit delta requests. It's based on the passed in string, which is assumed to be the InvocationID for a job, but there's no reason that is required to be the case.

func TimeLimitDeltaRequestKey

func TimeLimitDeltaRequestKey(invID string) string

TimeLimitDeltaRequestKey returns the binding key formatted correctly for the jobs exchange based on the InvocationID passed in.

func TimeLimitRequestKey

func TimeLimitRequestKey(invID string) string

TimeLimitRequestKey returns the formatted binding key based on the passed in job InvocationID.

func TimeLimitRequestQueueName

func TimeLimitRequestQueueName(invID string) string

TimeLimitRequestQueueName returns the formatted queue name for time limit requests. It is based on the passed in job InvocationID.

func TimeLimitResponsesKey

func TimeLimitResponsesKey(invID string) string

TimeLimitResponsesKey returns the formatted binding key based on the passed in job InvocationID.

func TimeLimitResponsesQueueName

func TimeLimitResponsesQueueName(invID string) string

TimeLimitResponsesQueueName returns the formatted queue name for time limit responses. It is based on the passed in job InvocationID.

Types

type Client

type Client struct {
	Reconnect bool
	// contains filtered or unexported fields
}

Client encapsulates the information needed to interact via AMQP.

func NewClient

func NewClient(uri string, reconnect bool) (*Client, error)

NewClient returns a new *Client. It will block until the connection succeeds.

func (*Client) AddConsumer

func (c *Client) AddConsumer(exchange, exchangeType, queue, key string, handler MessageHandler, prefetchCount int)

AddConsumer adds a consumer with only one binding, which is usually what you need

func (*Client) AddConsumerMulti

func (c *Client) AddConsumerMulti(exchange, exchangeType, queue string, keys []string, handler MessageHandler, prefetchCount int)

AddConsumerMulti adds a consumer to the list of consumers that need to be created each time the client is set up. Note that this just adds the consumers to a list, it doesn't actually start handling messages yet. You need to call Listen() for that.

func (*Client) AddDeletableConsumer

func (c *Client) AddDeletableConsumer(exchange, exchangeType, queue, key string, handler MessageHandler)

AddDeletableConsumer adds a consumer to the list of consumers that need to be created each time the client is set up. Unlike AddConsumer(), the new consumer will have auto-delete set to true and durable set to false. Make sure that Listen() has been called before calling this function. This only supports a single bind key, for now.

func (*Client) Close

func (c *Client) Close()

Close closes the connection to the AMQP broker.

func (*Client) CreateQueue

func (c *Client) CreateQueue(name, exchange, key string, durable, autoDelete bool) (*amqp.Channel, error)

CreateQueue creates a queue with the given name, durability, and auto-delete settings. It then binds it to the given exchange with the provided key. This function does not declare the exchange.

func (*Client) DeleteQueue

func (c *Client) DeleteQueue(name string) error

DeleteQueue deletes the queue with the given name without regards to safety.

func (*Client) Listen

func (c *Client) Listen()

Listen will wait for messages and pass them off to handlers, which run in their own goroutine.

func (*Client) Publish

func (c *Client) Publish(key string, body []byte) error

Publish sends a message to the configured exchange with a routing key set to the value of 'key'.

func (*Client) PublishJobUpdate

func (c *Client) PublishJobUpdate(u *UpdateMessage) error

PublishJobUpdate sends a mess to the configured exchange with a routing key of "jobs.updates"

func (*Client) QueueExists

func (c *Client) QueueExists(name string) (bool, error)

QueueExists returns true if the given queue name exists, false or an error otherwise.

func (*Client) SendStopRequest

func (c *Client) SendStopRequest(invID, user, reason string) error

SendStopRequest sends out a message to the jobs.stops.<invocation_id> topic telling listeners to stop their job.

func (*Client) SendTimeLimitDelta

func (c *Client) SendTimeLimitDelta(invID, delta string) error

SendTimeLimitDelta sends out a message to the jobs.timelimits.deltas.<invocationID> topic containing how the job should adjust its timelimit.

func (*Client) SendTimeLimitRequest

func (c *Client) SendTimeLimitRequest(invID string) error

SendTimeLimitRequest sends out a message to the job on the "jobs.timelimits.requests.<invocationID>" topic. This should trigger the job to emit a TimeLimitResponse.

func (*Client) SendTimeLimitResponse

func (c *Client) SendTimeLimitResponse(invID string, timeRemaining int64) error

SendTimeLimitResponse sends out a message to the jobs.timelimits.responses.<invocationID> topic containing the remaining time for the job.

func (*Client) SetupPublishing

func (c *Client) SetupPublishing(exchange string) error

SetupPublishing initializes the publishing functionality of the client. Call this before calling Publish.

type Command

type Command int

Command is tells the receiver of a JobRequest which action to perform

const (
	//Launch tells the receiver of a JobRequest to launch the job
	Launch Command = iota

	//Stop tells the receiver of a JobRequest to stop a job
	Stop
)

type JobRequest

type JobRequest struct {
	Job     *model.Job
	Command Command
	Message string
	Version int
}

JobRequest is a generic request type for job related requests.

func NewLaunchRequest

func NewLaunchRequest(j *model.Job) *JobRequest

NewLaunchRequest returns a *JobRequest that has been constructed to be a launch request for the provided job.

type JobState

type JobState string

JobState defines a valid state for a job.

type Logger

type Logger interface {
	Print(args ...interface{})
	Printf(format string, args ...interface{})
	Println(args ...interface{})
}

type MessageHandler

type MessageHandler func(amqp.Delivery)

MessageHandler defines a type for amqp.Delivery handlers.

type StatusCode

type StatusCode int

StatusCode defines a valid exit code for a job.

const (
	// Success is the exit code used when the required commands execute correctly.
	Success StatusCode = iota

	// StatusDockerPullFailed is the exit code when a 'docker pull' fails.
	StatusDockerPullFailed

	// StatusDockerCreateFailed is the exit code when a 'docker create' fails.
	StatusDockerCreateFailed

	// StatusInputFailed is the exit code when an input download fails.
	StatusInputFailed

	// StatusStepFailed is the exit code when a step in the job fails.
	StatusStepFailed

	// StatusOutputFailed is the exit code when the output upload fails.
	StatusOutputFailed

	// StatusKilled is the exit code when the job is killed.
	StatusKilled

	// StatusTimeLimit is the exit code when the job is killed due to the time
	// limit being reached.
	StatusTimeLimit

	// StatusBadDuration is the exit code when the job is killed because an
	// unparseable job duration was sent to it.
	StatusBadDuration
)

type StopRequest

type StopRequest struct {
	Reason       string
	Username     string
	Version      int
	InvocationID string
}

StopRequest contains the information needed to stop a job

func NewStopRequest

func NewStopRequest() *StopRequest

NewStopRequest returns a *JobRequest that has been constructed to be a stop request for a running job.

type TimeLimitDelta

type TimeLimitDelta struct {
	InvocationID string
	Delta        string
}

TimeLimitDelta is the message that is sent to get road-runner to change its time limit. The 'Delta' field contains a string in Go's Duration string format. More info on the format is available here: https://golang.org/pkg/time/#ParseDuration

type TimeLimitRequest

type TimeLimitRequest struct {
	InvocationID string
}

TimeLimitRequest is the message that is sent to road-runner to get it to broadcast its current time limit.

type TimeLimitResponse

type TimeLimitResponse struct {
	InvocationID          string
	MillisecondsRemaining int64
}

TimeLimitResponse is the message that is sent by road-runner in response to a TimeLimitRequest. It contains the current time limit from road-runner.

type UpdateMessage

type UpdateMessage struct {
	Job     *model.Job
	Version int
	State   JobState
	Message string
	SentOn  string // Should be the milliseconds since the epoch
	Sender  string // Should be the hostname of the box sending the message.
}

UpdateMessage contains the information needed to broadcast a change in state for a job.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL