server

package
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2023 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Overview

Package server is the base server for the pmap event ingestion.

Index

Constants

View Source
const (
	MetadataKeyGitHubCommit               = "github-commit"
	MetadataKeyGitHubRepo                 = "github-repo"
	MetadataKeyWorkflow                   = "github-workflow"
	MetadataKeyWorkflowSha                = "github-workflow-sha"
	MetadataKeyWorkflowTriggeredTimestamp = "github-workflow-triggered-timestamp"
	MetadataKeyWorkflowRunID              = "github-run-id"
	MetadataKeyWorkflowRunAttempt         = "github-run-attempt"
	GCSPathSeparatorKey                   = "/gh-prefix/"
)

These are metadatas for GCS objects that were uploaded. These customs keys are defined in snapshot-file-change and snapshot-file-copy workflow. https://github.com/abcxyz/pmap/blob/main/.github/workflows/snapshot-file-change.yml#L74-L78

View Source
const (
	MaxTopicDataBytes      = 10_000_000
	MaxTopicAttrValueBytes = 1024
)
View Source
const (
	// AttrKeyProcessErr is the attribute key for process error.
	AttrKeyProcessErr = "ProcessErr"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type EventHandler

type EventHandler[T any, P ProtoWrapper[T]] struct {
	// contains filtered or unexported fields
}

EventHandler retrieves GCS objects upon receiving GCS notifications via Pub/Sub, calls a list of processors to process the objects, and lastly passes the objects downstream. The successMessenger handles successfully processed objects, the failureMessenger handles failure events.

The GCS object could be any proto message type. But an instance of Handler can only handle one type of proto message.

func NewHandler

func NewHandler[T any, P ProtoWrapper[T]](ctx context.Context, ps []Processor[P], successMessenger Messenger, opts ...Option) (*EventHandler[T, P], error)

Create a new Handler with the given processors, successMessenger, and handler options. failureMessenger will default to NoopMessenger if not provided.

// Assume you have processor to handle structpb.Struct.
type MyProcessor struct {}
func (p *MyProcessor) Process(context.Context, *structpb.Struct) error { return nil }
// You can create a handler for that type of processors.
h := NewHandler(ctx, []Processor[*structpb.Struct]{&MyProcessor{}}, msgr, opts...)

func (*EventHandler[T, P]) HTTPHandler

func (h *EventHandler[T, P]) HTTPHandler() http.Handler

HTTPHandler provides an http.Handler that accepts GCS notifications in HTTP requests and calls [Handle] to handle the events.

Object's metadata change will be included in payload of the notification.

func (*EventHandler[T, P]) Handle

func (h *EventHandler[T, P]) Handle(ctx context.Context, m pubsub.Message) error

Handle retrieves a GCS object with the given GCS notification, processes the object with the list of processors, and passes it downstream.

type HandlerConfig

type HandlerConfig struct {
	Port           string `env:"PORT,default=8080"`
	ProjectID      string `env:"PROJECT_ID,required"`
	SuccessTopicID string `env:"PMAP_SUCCESS_TOPIC_ID,required"`
	// FailureTopicID is optional for policy service
	FailureTopicID string `env:"PMAP_FAILURE_TOPIC_ID"`
}

HandlerConfig defines the set over environment variables required for running this application.

func (*HandlerConfig) ToFlags

func (cfg *HandlerConfig) ToFlags(set *cli.FlagSet) *cli.FlagSet

ToFlags binds the config to the give cli.FlagSet and returns it.

func (*HandlerConfig) Validate

func (cfg *HandlerConfig) Validate() error

Validate validates the handler config after load.

type HandlerOpts

type HandlerOpts struct {
	// contains filtered or unexported fields
}

HandlerOpts available when creating an EventHandler such as GCS storage client and Messenger for failure events.

type MappingHandlerConfig added in v0.0.3

type MappingHandlerConfig struct {
	// DefaultResourceScope is the default resource scope to search resources.
	// This is only used for global resources such as GCS bucket.
	// Please make sure the Service Account used in the Cloud Run service for
	// Data Mapping is granted the 'roles/cloudasset.viewer' to the corresponding
	// scope level.
	DefaultResourceScope string `env:"PMAP_MAPPING_DEFAULT_RESOURCE_SCOPE,required"`
	HandlerConfig
}

MappingConfig defines the environment variables required for running mapping service.

func (*MappingHandlerConfig) ToFlags added in v0.0.3

func (cfg *MappingHandlerConfig) ToFlags(set *cli.FlagSet) *cli.FlagSet

func (*MappingHandlerConfig) Validate added in v0.0.3

func (cfg *MappingHandlerConfig) Validate() (retErr error)

ValidateMappingConfig validates the handler config for mapping service after load.

type Messenger

type Messenger interface {
	Send(context.Context, []byte, map[string]string) error
}

An interface for sending pmap event downstream.

type NoopMessenger

type NoopMessenger struct{}

NoopMessenger is a no-op implementation of Messenger interface.

func (*NoopMessenger) Send

func (m *NoopMessenger) Send(_ context.Context, _ []byte, _ map[string]string) error

type Option

type Option func(context.Context, *HandlerOpts) (*HandlerOpts, error)

Define your option to change HandlerOpts.

func WithFailureMessenger

func WithFailureMessenger(msger Messenger) Option

WithFailureMessenger returns an option to set the Messenger for unsuccessfully processed pmap event when creating an EventHandler.

func WithStorageClient

func WithStorageClient(client *storage.Client) Option

WithStorageClient returns an option to set the GCS storage client when creating an EventHandler.

type Processor

type Processor[P proto.Message] interface {
	Process(context.Context, P) error
}

A generic interface for processing proto messages. Any type that processes proto can implement this interface.

For example, someProcessor implements Process(context.Context, *structpb.Struct) is of type Processor[*structpb.Struct].

type ProtoWrapper

type ProtoWrapper[T any] interface {
	proto.Message
	*T
}

Wrap the proto message interface. This helps to use generics to initialize proto messages without knowing their types.

type PubSubMessage

type PubSubMessage struct {
	Message struct {
		Data       []byte            `json:"data,omitempty"`
		Attributes map[string]string `json:"attributes"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

PubSubMessage is the payload of a [Pub/Sub message].

GCS objects' custom metadata will be included in [Data]. [Attributes]: includes bucketID and objectID info. [Pub/Sub message]: https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage [Data]: https://cloud.google.com/storage/docs/json_api/v1/objects#resource-representations

type PubSubMessenger

type PubSubMessenger struct {
	// contains filtered or unexported fields
}

PubSubMessenger implements the Messenger interface for Google Cloud PubSub.

func NewPubSubMessenger

func NewPubSubMessenger(topic *pubsub.Topic) *PubSubMessenger

NewPubSubMessenger creates a new instance of the PubSubMessenger.

func (*PubSubMessenger) Send

func (p *PubSubMessenger) Send(ctx context.Context, data []byte, attr map[string]string) error

type StoppableProcessor

type StoppableProcessor[P proto.Message] interface {
	Stop() error
}

StoppableProcessor is the interface to processors that are stoppable.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL