broker

package
v0.9.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 26, 2021 License: AGPL-3.0 Imports: 20 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Broker

type Broker struct {
	Metadata     MetadataService
	Preservation PreservationService
	// contains filtered or unexported fields
}

Broker is a RDSS client using the SQS and SNS services.

Messages are received from sqsQueueMainURL and sent to an interna channel (messages). The channel is unbuffered so the receiver controls how often we are going to receive from SQS. However, the current processor is unbounded, i.e. processMessage is launched on a new goroutine for each message received.

The message processor will:

* Extract, unmarshal and validate the message payload.

* Reject messages that have been received before.

* Run the designated handler and capture the returned error.

In case of errors, messages are sent to the {Invalid,Error} Message Queue according to the behaviour described in the RDSS API specification.

Messages are deleted from SQS as soon as they're processed. This includes cases where the processing have failed, e.g. validation or handler error. The visibility timeout is set by the SQS queue owner under the assumption that the underlying preservation system is capable to process the requests within the window given (the maximum is 12 hours).

Potential improvements:

* Create a limited number of processors to avoid bursting.

  • Increase throughput: sqs.DeleteMessageBatch, multiple consumers, etc... Low priority since we don't expect many messages.
  • Handlers could take a long time to complete. Do we need cancellation? What are we doing when we exceed the visibility timeout? Is the adapter accountable?

func New

func New(
	logger logrus.FieldLogger, validator message.Validator,
	sqsClient sqsiface.SQSAPI, sqsQueueMainURL string,
	snsClient snsiface.SNSAPI, snsTopicMainARN, snsTopicInvalidARN, snsTopicErrorARN string,
	dynamodbClient dynamodbiface.DynamoDBAPI, dynamodbTable string,
	incomingMessages prometheus.Counter) *Broker

New returns a usable Broker.

Example
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/JiscSD/rdss-archivematica-channel-adapter/broker"
	"github.com/JiscSD/rdss-archivematica-channel-adapter/broker/message"
	"github.com/JiscSD/rdss-archivematica-channel-adapter/internal/testutil"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
	"github.com/aws/aws-sdk-go/service/sns"
	"github.com/aws/aws-sdk-go/service/sns/snsiface"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
)

type sqsMock struct {
	sqsiface.SQSAPI
	count int
}

func (m *sqsMock) DeleteMessageWithContext(aws.Context, *sqs.DeleteMessageInput, ...request.Option) (*sqs.DeleteMessageOutput, error) {
	return &sqs.DeleteMessageOutput{}, nil
}

func (m *sqsMock) ReceiveMessageWithContext(aws.Context, *sqs.ReceiveMessageInput, ...request.Option) (*sqs.ReceiveMessageOutput, error) {
	m.count++
	switch m.count {
	case 1:
		blob := testutil.MustSpecFixture("../message-api-spec/messages/example_message.json")
		return &sqs.ReceiveMessageOutput{
			Messages: []*sqs.Message{
				&sqs.Message{
					Body: aws.String(string(blob)),
				},
			},
		}, nil
	default:
		// When this method is called again.
		time.Sleep(time.Millisecond * 1)
		return &sqs.ReceiveMessageOutput{
			Messages: []*sqs.Message{},
		}, nil
	}
}

type snsMock struct {
	snsiface.SNSAPI
}

func (m *snsMock) PublishWithContext(aws.Context, *sns.PublishInput, ...request.Option) (*sns.PublishOutput, error) {
	return &sns.PublishOutput{}, nil
}

type dynaMock struct {
	dynamodbiface.DynamoDBAPI
}

func (m *dynaMock) GetItem(input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) {
	return &dynamodb.GetItemOutput{}, nil
}

func (m *dynaMock) PutItem(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) {
	return &dynamodb.PutItemOutput{}, nil
}

func main() {
	// Create the broker client.
	b := broker.New(
		logrus.StandardLogger(),
		&message.NoOpValidatorImpl{},
		&sqsMock{},
		"http://localhost:4576/queue/main",
		&snsMock{},
		"arn:aws:sns:us-east-1:123456789012:main",
		"arn:aws:sns:us-east-1:123456789012:invalid",
		"arn:aws:sns:us-east-1:123456789012:error",
		&dynaMock{},
		"local_data_repository",
		prometheus.NewCounter(prometheus.CounterOpts{}),
	)

	var wg sync.WaitGroup
	wg.Add(1)

	// We can subscribe a handler for a particular message type. The handler
	// is executed by the broker as soon as the message is received.
	b.Subscribe(message.MessageTypeEnum_MetadataCreate, func(m *message.Message) error {
		defer wg.Done()
		fmt.Println("[MetadataCreate] Message received!")
		return nil
	})

	// Run the broker client.
	go b.Run()

	// We can use the broker client to publish messages too.
	b.Metadata.Create(context.TODO(), &message.MetadataCreateRequest{})

	// Stop the broker client - but not until our handler runs.
	wg.Wait()
	b.Stop()

}
Output:

[MetadataCreate] Message received!

func (*Broker) Request

func (b *Broker) Request(_ context.Context, msg *message.Message) error

Request sends a fire-and-forget request to RDSS.

func (*Broker) RequestResponse

func (b *Broker) RequestResponse(context.Context, *message.Message) (*message.Message, error)

RequestResponse sends a request and waits until a response is received.

func (*Broker) Run

func (b *Broker) Run()

Run starts the processing.

func (*Broker) Stop

func (b *Broker) Stop()

Stop blocks until the broker terminates.

func (*Broker) Subscribe

func (s *Broker) Subscribe(t message.MessageTypeEnum, h MessageHandler)

Subscribe a handler to a specific message type.

type MessageHandler

type MessageHandler func(msg *message.Message) error

MessageHandler is a function supplied by message subscribers.

type MetadataService

MetadataService generates Metadata-type messages.

type MetadataServiceOp

type MetadataServiceOp struct {
	// contains filtered or unexported fields
}

MetadataServiceOp implements MetadataService.

func (*MetadataServiceOp) Create

Create publishes a MetadataCreate message.

func (*MetadataServiceOp) Delete

Delete publishes a MetadataDelete message.

func (*MetadataServiceOp) Read

Read publishes a MetadataRead message.

func (*MetadataServiceOp) Update

Update publishes a MetadataUpdate message.

type PreservationService

type PreservationService interface {
	Event(context.Context, *message.PreservationEventRequest) error
}

PreservationService publishes Preservation-type messages.

type PreservationServiceOp

type PreservationServiceOp struct {
	// contains filtered or unexported fields
}

PreservationServiceOp implements PreservationService.

func (*PreservationServiceOp) Event

Event publishes a PreservationEvent message.

Directories

Path Synopsis
Package message provide types and functions to work with the RDSS API.
Package message provide types and functions to work with the RDSS API.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL