cloudtasks

package module
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 6, 2024 License: MIT Imports: 24 Imported by: 0

README

cloudtasks

Go Reference

Google Cloud Tasks integration with Cloud Run apps.

Install

go get github.com/altipla-consulting/cloudtasks

Usage

Declare queues

To set up the queue system with its corresponding models and APIs, you'll need to call the code from your application's main function:

func main() {
  if err := models.ConnectQueues(); err != nil {
    log.Fatal(err)
  }

  ...
}

In the models/queues.go file:

var (
  QueueFoo cloudtasks.Queue
  QueueBar cloudtasks.Queue
)

func ConnectQueues() error {
  // PROJECT_HASH should be replaced by your Cloud Run project hash.
  // For example if you have URLs like "https://foo-service-9omj3qcv6b-ew.a.run.app/" the hash will be "9omj3qcv6b".
  QueueFoo = cloudtasks.NewQueue("PROJECT_HASH", "foo")
  QueueBar = cloudtasks.NewQueue("PROJECT_HASH", "bar")

  // It could also come from a global setting constant.
  QueueGlobal = cloudtasks.NewQueue(config.ProjectHash, "global")

  return nil
}

Queues manage task consumption rates and the maximum number of concurrent tasks. These are pre-configured in Google Cloud.

A single queue can efficiently process multiple task types. It's recommended to use one queue for similar rate requirements instead of creating multiple queues, as this approach minimizes runtime overhead.

Defining tasks

Define background tasks within the same package that will execute them. By convention, task functions are named with the suffix xxFn, indicating they're background functions. The first argument in cloudtasks.Func is a unique string for debugging purposes. Declaring two tasks with the same name will trigger a panic.

var fooFn = cloudtasks.Func("foo", func(ctx context.Context, task *cloudtasks.Task) error {
  var arg int64
  if err := task.Read(&arg); err != nil {
    return errors.Trace(err)
  }

  // ... task content

  return nil
})

Ensure global task declarations occur during initialization, typically in the init() function.

Task Invocation

Invoke tasks by calling the previously defined functions:

func FooHandler(...) {
  // ... other code

  var arg int64
  if err := fooFn.Call(r.Context(), models.QueueFoo, arg); err != nil {
    return errors.Trace(err)
  }

  // ... other code
}

Tasks can accept any JSON-serializable data as arguments, excluding structs with private fields or methods. This flexibility allows for various data types, including basic types like numbers and strings, or more complex structures needed for task execution.

External task invocation

Invoke tasks of external applications with our helper that has retry and authorization built-in:

func FooHandler(w http.ResponseWriter, r *http.Request) error {
  // ... other code

  task := &cloudtasks.ExternalTask{
    URL: "https://foo-service-9omj3qcv6b-ew.a.run.app/myurl",
    Payload: arg,
  }
  if err := models.QueueFoo.SendExternal(r.Context(), task); err != nil {
    return errors.Trace(err)
  }

  // ... other code
}

Contributing

You can make pull requests or create issues in GitHub. Any code you send should be formatted using make gofmt.

License

MIT License

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrCannotSendTask = errors.New("cloudtasks: cannot send task")

ErrCannotSendTask is returned from a Send call when the task has been correctly prepared but cannot be sent to the remote service. It can give you an opportunity to call the underlying implementation directly instead.

Functions

This section is empty.

Types

type ExternalTask added in v0.2.0

type ExternalTask struct {
	URL     string
	Payload any
}

ExternalTask should be filled with the data of the task to call in an external Cloud Run application.

func (*ExternalTask) LogValue added in v0.3.0

func (task *ExternalTask) LogValue() slog.Value

LogValue implements slog.LogValuer.

type Function

type Function struct {
	// contains filtered or unexported fields
}

Function is a stored task implementation.

func Func

func Func(key string, h Handler) *Function

Func builds and registers a new task implementation.

func (*Function) Call

func (f *Function) Call(ctx context.Context, queue Queue, payload interface{}, opts ...TaskOption) error

Call builds a task invocation and directly sends it individually to the queue.

If you are going to send multiple tasks at the same time is more efficient to build all of them with Task() first and then send them in batches with queue.SendTasks(). If sending a single task this function will be similar in performance to the batch method described before.

func (*Function) CallTask

func (f *Function) CallTask(ctx context.Context, queue Queue, payload interface{}, opts ...TaskOption) (*Task, error)

CallTask builds a task invocation, sends it and returns the built task.

func (*Function) Task

func (f *Function) Task(payload interface{}, opts ...TaskOption) (*Task, error)

Task builds a task invocation to the function. You can later send the task in batches using queue.SendTasks() or directly invoke Call() to make both things at the same time.

The payload can be a proto.Message or any other kind of interface that can be serialized to JSON. It would then be read on the task.

func (*Function) TestCall added in v0.4.0

func (f *Function) TestCall(t *testing.T, payload interface{}) error

TestCall makes a direct call to the handler with the payload as incoming payload. It requires a testing argument to be sure it is only used in tests.

type Handler

type Handler func(ctx context.Context, task *Task) error

Handler should be implemented by any task function.

type Queue

type Queue interface {
	// Send a new task to the queue.
	Send(ctx context.Context, task *Task) error

	// SendExternal sends a new task to an external URL.
	SendExternal(ctx context.Context, task *ExternalTask) error
}

Queue abstract any remote or local system that can execute a task.

func NewQueue

func NewQueue(runProjectHash string, name string, opts ...QueueOption) Queue

NewQueue initializes a new queue. It needs: - The Cloud Run project hash. For example if you have URLs like "https://foo-service-9omj3qcv6b-ew.a.run.app/" the hash will be "9omj3qcv6b". - The queue name.

type QueueOption

type QueueOption func(*gcloudQueue)

QueueOption configures queues when creating them.

func WithRegion

func WithRegion(region string) QueueOption

WithRegion configures a custom region for the queue. By default it will use the region of the Cloud Run service.

type Task

type Task struct {

	// Read only. The current retry count.
	Retries int64
	// contains filtered or unexported fields
}

Task is a task sent or receieved from a queue.

func (*Task) LogValue added in v0.3.0

func (task *Task) LogValue() slog.Value

LogValue implements slog.LogValuer.

func (*Task) Name

func (task *Task) Name() string

Name returns the name of the task. By default it is autogenerated.

func (*Task) Read

func (task *Task) Read(dest interface{}) error

Read unmarshals the task payload into the provided destination.

type TaskOption

type TaskOption func(task *Task)

TaskOption configures tasks when creating them.

func WithName

func WithName(name string) TaskOption

WithName configures a custom name for the task. By default it will be autogenerated. A custom name could be problematic with tombstones (task names that can't be repeated) and concurrency controls, so assign it with care and read Google Cloud Tasks documentation before using it.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL