Documentation ¶
Overview ¶
Package server is the base server for pmap event ingestion.
Index ¶
Constants ¶
const (
	MetadataKeyGitHubCommit               = "github-commit"
	MetadataKeyGitHubRepo                 = "github-repo"
	MetadataKeyWorkflow                   = "github-workflow"
	MetadataKeyWorkflowSha                = "github-workflow-sha"
	MetadataKeyWorkflowTriggeredTimestamp = "github-workflow-triggered-timestamp"
	MetadataKeyWorkflowRunID              = "github-run-id"
	MetadataKeyWorkflowRunAttempt         = "github-run-attempt"
	GCSPathSeparatorKey                   = "/gh-prefix/"
)
These are the metadata keys for uploaded GCS objects. The custom keys are defined in the snapshot-file-change and snapshot-file-copy workflows. https://github.com/abcxyz/pmap/blob/main/.github/workflows/snapshot-file-change.yml#L74-L78
const (
	MaxTopicDataBytes      = 10_000_000
	MaxTopicAttrValueBytes = 1024
)
const (
// AttrKeyProcessErr is the attribute key for process error.
AttrKeyProcessErr = "ProcessErr"
)
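The limits above cap the size of a message's data and of each attribute value, and AttrKeyProcessErr names the attribute that carries a processing error. The following is a rough sketch of how a caller might stay within those limits; it is not the package's own implementation. The helpers truncateToBytes, failureAttrs, and checkData are hypothetical names introduced here, and truncating the error text is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"unicode/utf8"
)

// Values copied from the constants documented above.
const (
	MaxTopicDataBytes      = 10_000_000
	MaxTopicAttrValueBytes = 1024
	AttrKeyProcessErr      = "ProcessErr"
)

// truncateToBytes (hypothetical) cuts s to at most max bytes without
// splitting a multi-byte UTF-8 character.
func truncateToBytes(s string, max int) string {
	if len(s) <= max {
		return s
	}
	cut := max
	// Back up until the byte at cut starts a rune, so s[:cut] ends cleanly.
	for cut > 0 && !utf8.RuneStart(s[cut]) {
		cut--
	}
	return s[:cut]
}

// failureAttrs (hypothetical) builds attributes for a failed event, keeping
// the error text within the attribute value limit.
func failureAttrs(err error) map[string]string {
	return map[string]string{
		AttrKeyProcessErr: truncateToBytes(err.Error(), MaxTopicAttrValueBytes),
	}
}

// checkData (hypothetical) rejects payloads larger than the data limit.
func checkData(data []byte) error {
	if len(data) > MaxTopicDataBytes {
		return fmt.Errorf("data is %d bytes, limit is %d", len(data), MaxTopicDataBytes)
	}
	return nil
}

func main() {
	attrs := failureAttrs(errors.New("failed to validate resource"))
	fmt.Println(attrs[AttrKeyProcessErr])
	fmt.Println(checkData([]byte("small payload")))
}
```

Truncating on a rune boundary keeps the attribute value valid UTF-8, which matters if error messages contain non-ASCII text.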
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type EventHandler ¶
type EventHandler[T any, P ProtoWrapper[T]] struct {
	// contains filtered or unexported fields
}
EventHandler retrieves GCS objects upon receiving GCS notifications via Pub/Sub, calls a list of processors to process the objects, and lastly passes the objects downstream. The successMessenger handles successfully processed objects, and the failureMessenger handles failure events.
The GCS object can be any proto message type, but a single EventHandler instance can handle only one type of proto message.
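The flow described above (process, then hand off to a success or failure messenger) can be sketched roughly as follows. This is an illustration under assumptions, not the package's code: the messenger interface and its Send method are hypothetical stand-ins because the Messenger method set is not shown in this document, and attaching the error under the "ProcessErr" attribute is a guess based on AttrKeyProcessErr.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// messenger is a hypothetical stand-in for the package's Messenger interface.
type messenger interface {
	Send(ctx context.Context, data []byte, attrs map[string]string) error
}

// recorder is a test double that remembers the payloads it was asked to send.
type recorder struct{ sent []string }

func (r *recorder) Send(_ context.Context, data []byte, _ map[string]string) error {
	r.sent = append(r.sent, string(data))
	return nil
}

var errEmpty = errors.New("empty payload")

// requireNonEmpty is a hypothetical processing step.
func requireNonEmpty(data []byte) error {
	if len(data) == 0 {
		return errEmpty
	}
	return nil
}

// route processes data, then forwards it to the success messenger, or to the
// failure messenger with the error recorded as an attribute (assumption).
func route(ctx context.Context, data []byte, process func([]byte) error, success, failure messenger) error {
	if err := process(data); err != nil {
		return failure.Send(ctx, data, map[string]string{"ProcessErr": err.Error()})
	}
	return success.Send(ctx, data, nil)
}

func main() {
	ok, bad := &recorder{}, &recorder{}
	ctx := context.Background()
	_ = route(ctx, []byte("event"), requireNonEmpty, ok, bad)
	_ = route(ctx, nil, requireNonEmpty, ok, bad)
	fmt.Println(len(ok.sent), len(bad.sent))
}
```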
func NewHandler ¶
func NewHandler[T any, P ProtoWrapper[T]](ctx context.Context, ps []Processor[P], successMessenger Messenger, opts ...Option) (*EventHandler[T, P], error)
NewHandler creates a new EventHandler with the given processors, successMessenger, and handler options. The failureMessenger defaults to NoopMessenger if not provided.
// Assume you have a processor that handles structpb.Struct.
type MyProcessor struct{}

func (p *MyProcessor) Process(context.Context, *structpb.Struct) error { return nil }

// You can create a handler for that type of processor.
// NewHandler returns an error as its second value, so check it.
h, err := NewHandler(ctx, []Processor[*structpb.Struct]{&MyProcessor{}}, msgr, opts...)
if err != nil {
	// Handle the error.
}
func (*EventHandler[T, P]) HTTPHandler ¶
func (h *EventHandler[T, P]) HTTPHandler() http.Handler
HTTPHandler provides an http.Handler that accepts GCS notifications in HTTP requests and calls [Handle] to handle the events.
Changes to an object's metadata are included in the payload of the notification.
func (*EventHandler[T, P]) Handle ¶
Handle retrieves a GCS object with the given GCS notification, processes the object with the list of processors, and passes it downstream.
type HandlerConfig ¶
type HandlerConfig struct {
	Port           string `env:"PORT,default=8080"`
	ProjectID      string `env:"PROJECT_ID,required"`
	SuccessTopicID string `env:"PMAP_SUCCESS_TOPIC_ID,required"`
	// FailureTopicID is optional for policy service
	FailureTopicID string `env:"PMAP_FAILURE_TOPIC_ID"`
}
HandlerConfig defines the set of environment variables required for running this application.
func (*HandlerConfig) ToFlags ¶
func (cfg *HandlerConfig) ToFlags(set *cli.FlagSet) *cli.FlagSet
ToFlags binds the config to the given cli.FlagSet and returns it.
func (*HandlerConfig) Validate ¶
func (cfg *HandlerConfig) Validate() error
Validate validates the handler config after load.
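To make the struct tags on HandlerConfig concrete, here is a minimal standard-library sketch of the same load-then-validate behavior: PORT falls back to 8080, PROJECT_ID and PMAP_SUCCESS_TOPIC_ID are required, and PMAP_FAILURE_TOPIC_ID is optional. It is not how the package loads its config (the env tags are read by a tag-based loader); the config type and loadConfig function are hypothetical, and treating an empty value as unset is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// config mirrors the fields of HandlerConfig without the env tags.
type config struct {
	Port           string
	ProjectID      string
	SuccessTopicID string
	FailureTopicID string
}

// loadConfig (hypothetical) reads the variables through lookup, applies the
// PORT default, and reports every missing required variable at once.
func loadConfig(lookup func(string) (string, bool)) (*config, error) {
	get := func(key, def string) string {
		if v, ok := lookup(key); ok && v != "" {
			return v
		}
		return def
	}
	cfg := &config{
		Port:           get("PORT", "8080"),
		ProjectID:      get("PROJECT_ID", ""),
		SuccessTopicID: get("PMAP_SUCCESS_TOPIC_ID", ""),
		FailureTopicID: get("PMAP_FAILURE_TOPIC_ID", ""),
	}
	var errs []error
	if cfg.ProjectID == "" {
		errs = append(errs, errors.New("PROJECT_ID is required"))
	}
	if cfg.SuccessTopicID == "" {
		errs = append(errs, errors.New("PMAP_SUCCESS_TOPIC_ID is required"))
	}
	if err := errors.Join(errs...); err != nil {
		return nil, fmt.Errorf("invalid config: %w", err)
	}
	return cfg, nil
}

func main() {
	// A map stands in for the process environment; a real program could
	// pass os.LookupEnv instead.
	env := map[string]string{"PROJECT_ID": "my-project", "PMAP_SUCCESS_TOPIC_ID": "my-topic"}
	cfg, err := loadConfig(func(k string) (string, bool) { v, ok := env[k]; return v, ok })
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", *cfg)
}
```

Passing the lookup function in, rather than calling os.LookupEnv directly, keeps the sketch testable without touching the real environment.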
type HandlerOpts ¶
type HandlerOpts struct {
// contains filtered or unexported fields
}
HandlerOpts holds the options available when creating an EventHandler, such as the GCS storage client and the Messenger for failure events.
type MappingHandlerConfig ¶ added in v0.0.3
type MappingHandlerConfig struct {
	// DefaultResourceScope is the default resource scope to search resources.
	// This is only used for global resources such as GCS bucket.
	// Please make sure the Service Account used in the Cloud Run service for
	// Data Mapping is granted the 'roles/cloudasset.viewer' to the corresponding
	// scope level.
	DefaultResourceScope string `env:"PMAP_MAPPING_DEFAULT_RESOURCE_SCOPE,required"`
	HandlerConfig
}
MappingHandlerConfig defines the environment variables required for running the mapping service.
func (*MappingHandlerConfig) ToFlags ¶ added in v0.0.3
func (cfg *MappingHandlerConfig) ToFlags(set *cli.FlagSet) *cli.FlagSet
func (*MappingHandlerConfig) Validate ¶ added in v0.0.3
func (cfg *MappingHandlerConfig) Validate() (retErr error)
Validate validates the handler config for the mapping service after load.
type NoopMessenger ¶
type NoopMessenger struct{}
NoopMessenger is a no-op implementation of Messenger interface.
type Option ¶
type Option func(context.Context, *HandlerOpts) (*HandlerOpts, error)
Option is a function that modifies HandlerOpts. Define your own Option to change how an EventHandler is configured.
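Option follows the functional options pattern. Since the fields of HandlerOpts are unexported, the sketch below uses a hypothetical handlerOpts struct and a hypothetical withFailureTopic option to show the shape of the pattern only; the function type matches the Option signature above, but applying options in order and stopping at the first error is an assumption about how a constructor would use them.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// handlerOpts is a hypothetical stand-in for HandlerOpts.
type handlerOpts struct {
	failureTopic string
}

// option has the same shape as the Option type documented above.
type option func(context.Context, *handlerOpts) (*handlerOpts, error)

// withFailureTopic (hypothetical) returns an option that sets the topic name.
func withFailureTopic(name string) option {
	return func(_ context.Context, o *handlerOpts) (*handlerOpts, error) {
		if name == "" {
			return nil, errors.New("failure topic name is empty")
		}
		o.failureTopic = name
		return o, nil
	}
}

// applyOptions starts from defaults and threads each option through in order,
// stopping at the first error.
func applyOptions(ctx context.Context, opts ...option) (*handlerOpts, error) {
	o := &handlerOpts{failureTopic: "noop"}
	for _, opt := range opts {
		var err error
		if o, err = opt(ctx, o); err != nil {
			return nil, fmt.Errorf("failed to apply option: %w", err)
		}
	}
	return o, nil
}

func main() {
	o, err := applyOptions(context.Background(), withFailureTopic("failures"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(o.failureTopic)
}
```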
func WithFailureMessenger ¶
WithFailureMessenger returns an option to set the Messenger for unsuccessfully processed pmap events when creating an EventHandler.
func WithStorageClient ¶
WithStorageClient returns an option to set the GCS storage client when creating an EventHandler.
type Processor ¶
Processor is a generic interface for processing proto messages. Any type that processes a proto message can implement this interface.
For example, a type someProcessor that implements Process(context.Context, *structpb.Struct) error is of type Processor[*structpb.Struct].
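As an illustration of the interface shape described above, the sketch below declares a local Processor interface with the Process method and runs a list of processors over one message. The record type is a hypothetical stand-in for a proto message such as structpb.Struct, runAll is a hypothetical helper, and stopping at the first error is an assumption rather than documented behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Processor mirrors the documented shape: Process takes a context and a
// message of type P.
type Processor[P any] interface {
	Process(context.Context, P) error
}

// record is a hypothetical stand-in for a proto message.
type record struct{ Name string }

// requireName is a processor for *record, so it satisfies Processor[*record].
type requireName struct{}

func (requireName) Process(_ context.Context, r *record) error {
	if r.Name == "" {
		return errors.New("name is required")
	}
	return nil
}

// runAll (hypothetical) applies each processor in order and stops at the
// first error.
func runAll[P any](ctx context.Context, ps []Processor[P], msg P) error {
	for _, p := range ps {
		if err := p.Process(ctx, msg); err != nil {
			return fmt.Errorf("processor %T failed: %w", p, err)
		}
	}
	return nil
}

func main() {
	ps := []Processor[*record]{requireName{}}
	ctx := context.Background()
	fmt.Println(runAll(ctx, ps, &record{Name: "a"}))
	fmt.Println(runAll(ctx, ps, &record{}))
}
```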
type ProtoWrapper ¶
ProtoWrapper wraps the proto message interface. It lets generic code initialize proto messages without knowing their concrete types.
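The definition of ProtoWrapper is not shown here, so the following is only a sketch of the general Go technique that such a constraint usually relies on: a constraint that requires P to be *T and to implement a message interface, which allows generic code to allocate a new T and use it as P. The message, wrapper, event, and newMessage names are hypothetical, and message stands in for proto.Message.

```go
package main

import "fmt"

// message is a hypothetical stand-in for proto.Message.
type message interface{ Reset() }

// wrapper requires that P is a pointer to T and implements message.
type wrapper[T any] interface {
	*T
	message
}

// event is a hypothetical message type; *event implements message.
type event struct{ ID string }

func (e *event) Reset() { *e = event{} }

// newMessage allocates a zero T and returns it as P, without the caller
// naming the pointer type.
func newMessage[T any, P wrapper[T]]() P {
	return P(new(T))
}

func main() {
	e := newMessage[event]() // P is inferred as *event
	e.ID = "42"
	fmt.Println(e.ID)
	e.Reset()
	fmt.Println(e.ID == "")
}
```

Only T has to be given at the call site; the compiler infers P from the constraint, which is why EventHandler and NewHandler carry both type parameters.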
type PubSubMessage ¶
type PubSubMessage struct {
	Message struct {
		Data       []byte            `json:"data,omitempty"`
		Attributes map[string]string `json:"attributes"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}
PubSubMessage is the payload of a [Pub/Sub message].
The custom metadata of GCS objects is included in [Data]. [Attributes] includes the bucketID and objectID info.
[Pub/Sub message]: https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
[Data]: https://cloud.google.com/storage/docs/json_api/v1/objects#resource-representations
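A short sketch of decoding such a payload with encoding/json, using a copy of the struct above. The decodePush helper and the sample body are hypothetical; the bucketId and objectId attribute names follow the GCS notification format, and the data field is base64 in JSON, which encoding/json decodes into the []byte field automatically.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// PubSubMessage is copied from the type documented above.
type PubSubMessage struct {
	Message struct {
		Data       []byte            `json:"data,omitempty"`
		Attributes map[string]string `json:"attributes"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

// decodePush (hypothetical) parses a push request body into a PubSubMessage.
func decodePush(body []byte) (*PubSubMessage, error) {
	var m PubSubMessage
	if err := json.Unmarshal(body, &m); err != nil {
		return nil, fmt.Errorf("failed to decode Pub/Sub message: %w", err)
	}
	return &m, nil
}

func main() {
	// Hypothetical sample body; "data" is the base64 form of {"name":"o.txt"}.
	body := []byte(`{"message":{"data":"eyJuYW1lIjoiby50eHQifQ==",` +
		`"attributes":{"bucketId":"b","objectId":"o.txt"}},"subscription":"s"}`)
	m, err := decodePush(body)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(m.Message.Data))
	fmt.Println(m.Message.Attributes["bucketId"], m.Message.Attributes["objectId"])
}
```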
type PubSubMessenger ¶
type PubSubMessenger struct {
// contains filtered or unexported fields
}
PubSubMessenger implements the Messenger interface for Google Cloud PubSub.
func NewPubSubMessenger ¶
func NewPubSubMessenger(topic *pubsub.Topic) *PubSubMessenger
NewPubSubMessenger creates a new instance of the PubSubMessenger.
type StoppableProcessor ¶
StoppableProcessor is the interface for processors that can be stopped.