eventbus

package
v0.0.0-...-03d6fc4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 23, 2019 License: BSD-3-Clause Imports: 5 Imported by: 0

Documentation

Index

Constants

View Source
const (

	// SYN_STORAGE_EVENT is the event type for synthetic storage events that are sent via the
	// PublishStorageEvent function.
	SYN_STORAGE_EVENT = "eventbus:synthetic-storage-event"
)

Variables

This section is empty.

Functions

func GetNotificationID

func GetNotificationID(bucketName, objectPrefix string) string

GetNotificationID returns a string that is a combination of a bucket and an object prefix representing a server-side subscription to storage events.

Types

type CallbackFn

type CallbackFn func(data interface{})

CallbackFn defines the signature of all callback functions used for callbacks by the EventBus interface.

type EventBus

type EventBus interface {
	// Publish sends the given data to all functions that have
	// registered for the given channel. Each callback function is
	// called on a separate go-routine.
	// globally indicates whether the event should distributed across machines
	// if the event bus implementation support this. It is ignored otherwise.
	// If the message cannot be sent for some reason an error will be logged.
	Publish(channelID string, data interface{}, globally bool)

	// SubscribeAsync allows to register a callback function for the given
	// channel. It is assumed that the subscriber and publisher know what
	// types are sent on each channel.
	SubscribeAsync(channelID string, callback CallbackFn)

	// RegisterStorageEvents registers to receive storage events for the given
	// bucket.
	//  bucketName - global name of the target bucket
	//  objectPrefix - filter objects (server side) that have this prefix.
	//  objectRegEx - only include objects where the name matches this regular
	//                expression (can be nil). Client side filtering.
	//  client - Google storage client that has permission to create a
	//           pubsub based event subscription for the given bucket.
	//
	// Returns: channel ID to use in the SubscribeAsync call to receive events
	//          for this combination of (bucketName, objectPrefix, objectRegEx), e.g.
	//
	//    chanID := RegisterStorageEvents("bucket-name", "tests", regexp.MustCompile(`\.json$`))
	//    eventBus.SubscribeAsync(chanID, func(data interface{}) {
	//       storageEvtData := data.(*eventbus.StorageEvent)
	//
	//        ... handle the storage event ...
	//    })
	//
	//
	// Note: objectPrefix filters events on the server side, i.e. they never reach
	//       cause a PubSub event to be fired. objectRegEx filter events on the
	//       client side by matching against an objects name,
	//       e.g. ".*\.json$" would only include JSON files.
	//       Currently it is implied that the GCS event type is always
	//       storage.ObjectFinalizeEvent which indicates that an object was created.
	//
	RegisterStorageEvents(bucketName string, objectPrefix string, objectRegEx *regexp.Regexp, client *storage.Client) (string, error)

	// PublishStorageEvent publishes a synthetic storage event that is handled by
	// registered storage event handlers. All storage events are global.
	PublishStorageEvent(evtData *StorageEvent)
}

EventBus defines an interface for a generic event that allows to send arbitrary data on multiple channels.

func New

func New() EventBus

New returns a new in-process event bus that can used to notify different components about events.

type MemEventBus

type MemEventBus struct {
	// contains filtered or unexported fields
}

MemEventBus implement the EventBus interface for an in-process event bus.

func (*MemEventBus) Publish

func (e *MemEventBus) Publish(channel string, arg interface{}, globally bool)

Publish implements the EventBus interface.

func (*MemEventBus) PublishStorageEvent

func (e *MemEventBus) PublishStorageEvent(evtData *StorageEvent)

PublishStorageEvent implements the EventBus interface.

func (*MemEventBus) RegisterStorageEvents

func (e *MemEventBus) RegisterStorageEvents(bucketName string, objectPrefix string, objectRegEx *regexp.Regexp, client *storage.Client) (string, error)

RegisterStorageEvent implements the EventBus interface.

func (*MemEventBus) SubscribeAsync

func (e *MemEventBus) SubscribeAsync(channel string, callback CallbackFn)

SubscribeAsync implements the EventBus interface.

type NotificationsMap

type NotificationsMap struct {
	// contains filtered or unexported fields
}

NotificationsMap is a helper type that keep track of storage events. It is intended to be used by the MemEventBus and distEventBus (see gevent package) implementations of EventBus

It assumes that storage events mainly consist of buckets and objects and related meta data.

It uses the notion of a 'notification ID' which is a combination of a bucket and object prefix to keep track of server side storage events. Regular expressions are used to filter storage events on the client side. A channel ID (as defined by the EventBus interface) is a prefixed combination of the notification ID and a regular expression. For each notification ID (= a server side subscription to storage events) there can be an arbitrary number of regular expressions.

NotificationsMap keeps track of the notification IDs and the associated regular expressions. It can then be used to match storage events against notification IDs (= subscriptions) and their regular expressions.

func NewNotificationsMap

func NewNotificationsMap() *NotificationsMap

NewNotifications creates a new instance of NotificationsMap

func (*NotificationsMap) Add

func (n *NotificationsMap) Add(notifyID string, objectRegEx *regexp.Regexp) string

Add adds a notification to the map that consists of a notification id (created via GetNotificationID) and regular expression. The regex can be nil. If not nil, it will be used for client side filtering of object IDs that are delivered by events. It returns a channelID that should be used as the return value of the RegisterStorageEvents(...) method that called Add(...) in the first place.

func (*NotificationsMap) Matches

func (n *NotificationsMap) Matches(bucketID, objectID string) []string

Matches checks whether the given bucketID and objectID are in the recorded list of notifications and the regular expressions associated with them. It returns the channel IDs that match the found events.

func (*NotificationsMap) MatchesByID

func (n *NotificationsMap) MatchesByID(notificationID, objectID string) []string

MatchesByID assumes that the given objectID matches the object prefix encoded in notification ID. This is usually the case when the objectID was delivered as a PubSub event together with the notificationID (see the gevent package as an example). It will then check whether the objectID matches the regular expressions associated with the notification id. It returns a list of channel IDs to which events should be sent.

type StorageEvent

type StorageEvent struct {
	// GCSEventType is the event type supplied by GCS.
	// See https://cloud.google.com/storage/docs/pubsub-notifications#events
	GCSEventType string

	// BucketID is the name of the bucket that create the event.
	BucketID string

	// ObjectID is the name/path of the object that triggered the event.
	ObjectID string

	// The generation number of the object that was overwritten by the object
	// that this notification pertains to. This attribute only appears in
	// OBJECT_FINALIZE events in the case of an overwrite.
	OverwroteGeneration string

	// MD5 is the MD5 hash of the object as a hex encoded string.
	MD5 string

	// TimeStamp is the time of the last update in Unix time (seconds since the epoch).
	TimeStamp int64
}

StorageEvent is the type of object that is published by GCS storage events. Note: These events need to be registered with RegisterStorageEvents.

func NewStorageEvent

func NewStorageEvent(bucketID, objectID string, lastUpdated int64, md5 string) *StorageEvent

NewStorageEvent is a convenience method to create a new StorageEvent. Currently all instances have storage.ObjectFinalizeEvent as GCSEventType. This indicates a new object being created.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL