Documentation ¶
Index ¶
- Constants
- func GetNotificationID(bucketName, objectPrefix string) string
- type CallbackFn
- type EventBus
- type MemEventBus
- func (e *MemEventBus) Publish(channel string, arg interface{}, globally bool)
- func (e *MemEventBus) PublishStorageEvent(evtData *StorageEvent)
- func (e *MemEventBus) RegisterStorageEvents(bucketName string, objectPrefix string, objectRegEx *regexp.Regexp, ...) (string, error)
- func (e *MemEventBus) SubscribeAsync(channel string, callback CallbackFn)
- type NotificationsMap
- type StorageEvent
Constants ¶
const (
	// SYN_STORAGE_EVENT is the event type for synthetic storage events that are sent via the
	// PublishStorageEvent function.
	SYN_STORAGE_EVENT = "eventbus:synthetic-storage-event"
)
Variables ¶
This section is empty.
Functions ¶
func GetNotificationID ¶
func GetNotificationID(bucketName, objectPrefix string) string
GetNotificationID returns a string that combines a bucket name and an object prefix. The string identifies a server-side subscription to storage events.
Types ¶
type CallbackFn ¶
type CallbackFn func(data interface{})
CallbackFn defines the signature of the callback functions used by the EventBus interface.
type EventBus ¶
type EventBus interface {
	// Publish sends the given data to all functions that have
	// registered for the given channel. Each callback function is
	// called on a separate go-routine.
	// globally indicates whether the event should be distributed across machines
	// if the event bus implementation supports this. It is ignored otherwise.
	// If the message cannot be sent for some reason an error will be logged.
	Publish(channelID string, data interface{}, globally bool)

	// SubscribeAsync registers a callback function for the given
	// channel. It is assumed that the subscriber and publisher know what
	// types are sent on each channel.
	SubscribeAsync(channelID string, callback CallbackFn)

	// RegisterStorageEvents registers to receive storage events for the given
	// bucket.
	//   bucketName - global name of the target bucket
	//   objectPrefix - filter objects (server side) that have this prefix.
	//   objectRegEx - only include objects where the name matches this regular
	//                 expression (can be nil). Client side filtering.
	//   client - Google storage client that has permission to create a
	//            pubsub based event subscription for the given bucket.
	//
	// Returns: channel ID to use in the SubscribeAsync call to receive events
	//          for this combination of (bucketName, objectPrefix, objectRegEx), e.g.
	//
	//    chanID, err := eventBus.RegisterStorageEvents("bucket-name", "tests", regexp.MustCompile(`\.json$`), client)
	//    // ... handle err ...
	//    eventBus.SubscribeAsync(chanID, func(data interface{}) {
	//       storageEvtData := data.(*eventbus.StorageEvent)
	//       // ... handle the storage event ...
	//    })
	//
	// Note: objectPrefix filters events on the server side, i.e. events for
	//       objects without that prefix never cause a PubSub event to be fired.
	//       objectRegEx filters events on the client side by matching against
	//       an object's name, e.g. ".*\.json$" would only include JSON files.
	//       Currently it is implied that the GCS event type is always
	//       storage.ObjectFinalizeEvent which indicates that an object was created.
	//
	RegisterStorageEvents(bucketName string, objectPrefix string, objectRegEx *regexp.Regexp, client *storage.Client) (string, error)

	// PublishStorageEvent publishes a synthetic storage event that is handled by
	// registered storage event handlers. All storage events are global.
	PublishStorageEvent(evtData *StorageEvent)
}
EventBus defines an interface for a generic event bus that allows sending arbitrary data on multiple channels.
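The publish/subscribe contract described above can be illustrated with a minimal in-process sketch. It is not the package's MemEventBus: the names sketchBus, newSketchBus and Wait are hypothetical, and the Wait helper exists only so that the example can block until the asynchronous callbacks have run. As in the interface comment, each callback runs on its own goroutine and the subscriber is expected to know the type sent on the channel.

```go
package main

import (
	"fmt"
	"sync"
)

// callbackFn mirrors the documented CallbackFn signature.
type callbackFn func(data interface{})

// sketchBus is a hypothetical, simplified in-process bus. It only
// illustrates the Publish/SubscribeAsync contract.
type sketchBus struct {
	mu       sync.Mutex
	handlers map[string][]callbackFn
	wg       sync.WaitGroup // tracks running callbacks (sketch only)
}

func newSketchBus() *sketchBus {
	return &sketchBus{handlers: map[string][]callbackFn{}}
}

// SubscribeAsync registers callback for the given channel.
func (b *sketchBus) SubscribeAsync(channel string, callback callbackFn) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers[channel] = append(b.handlers[channel], callback)
}

// Publish calls every callback registered for channel on its own goroutine.
// The globally flag is ignored because this sketch is in-process only.
func (b *sketchBus) Publish(channel string, data interface{}, globally bool) {
	b.mu.Lock()
	// Copy the slice so callbacks run without holding the lock.
	callbacks := append([]callbackFn(nil), b.handlers[channel]...)
	b.mu.Unlock()
	for _, cb := range callbacks {
		b.wg.Add(1)
		go func(cb callbackFn) {
			defer b.wg.Done()
			cb(data)
		}(cb)
	}
}

// Wait blocks until all callbacks started so far have returned.
func (b *sketchBus) Wait() { b.wg.Wait() }

func main() {
	bus := newSketchBus()
	bus.SubscribeAsync("greetings", func(data interface{}) {
		// Publisher and subscriber agree that this channel carries strings.
		fmt.Println("received:", data.(string))
	})
	bus.Publish("greetings", "hello", false)
	bus.Wait()
}
```

Copying the handler slice before starting the goroutines keeps the lock from being held while user callbacks execute, so a callback may itself subscribe or publish without deadlocking.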
type MemEventBus ¶
type MemEventBus struct {
// contains filtered or unexported fields
}
MemEventBus implements the EventBus interface for an in-process event bus.
func (*MemEventBus) Publish ¶
func (e *MemEventBus) Publish(channel string, arg interface{}, globally bool)
Publish implements the EventBus interface.
func (*MemEventBus) PublishStorageEvent ¶
func (e *MemEventBus) PublishStorageEvent(evtData *StorageEvent)
PublishStorageEvent implements the EventBus interface.
func (*MemEventBus) RegisterStorageEvents ¶
func (e *MemEventBus) RegisterStorageEvents(bucketName string, objectPrefix string, objectRegEx *regexp.Regexp, client *storage.Client) (string, error)
RegisterStorageEvents implements the EventBus interface.
func (*MemEventBus) SubscribeAsync ¶
func (e *MemEventBus) SubscribeAsync(channel string, callback CallbackFn)
SubscribeAsync implements the EventBus interface.
type NotificationsMap ¶
type NotificationsMap struct {
// contains filtered or unexported fields
}
NotificationsMap is a helper type that keeps track of storage events. It is intended to be used by the MemEventBus and distEventBus (see the gevent package) implementations of EventBus.
It assumes that storage events mainly consist of buckets and objects and related meta data.
It uses the notion of a 'notification ID' which is a combination of a bucket and object prefix to keep track of server side storage events. Regular expressions are used to filter storage events on the client side. A channel ID (as defined by the EventBus interface) is a prefixed combination of the notification ID and a regular expression. For each notification ID (= a server side subscription to storage events) there can be an arbitrary number of regular expressions.
NotificationsMap keeps track of the notification IDs and the associated regular expressions. It can then be used to match storage events against notification IDs (= subscriptions) and their regular expressions.
func NewNotificationsMap ¶
func NewNotificationsMap() *NotificationsMap
NewNotificationsMap creates a new instance of NotificationsMap.
func (*NotificationsMap) Add ¶
func (n *NotificationsMap) Add(notifyID string, objectRegEx *regexp.Regexp) string
Add adds a notification to the map that consists of a notification ID (created via GetNotificationID) and a regular expression. The regex can be nil. If not nil, it will be used for client side filtering of object IDs that are delivered by events. It returns a channel ID that should be used as the return value of the RegisterStorageEvents(...) method that called Add(...) in the first place.
func (*NotificationsMap) Matches ¶
func (n *NotificationsMap) Matches(bucketID, objectID string) []string
Matches checks whether the given bucketID and objectID are in the recorded list of notifications and the regular expressions associated with them. It returns the channel IDs that match the found events.
func (*NotificationsMap) MatchesByID ¶
func (n *NotificationsMap) MatchesByID(notificationID, objectID string) []string
MatchesByID assumes that the given objectID matches the object prefix encoded in the notification ID. This is usually the case when the objectID was delivered as a PubSub event together with the notificationID (see the gevent package as an example). It will then check whether the objectID matches the regular expressions associated with the notification ID. It returns a list of channel IDs to which events should be sent.
type StorageEvent ¶
type StorageEvent struct {
	// GCSEventType is the event type supplied by GCS.
	// See https://cloud.google.com/storage/docs/pubsub-notifications#events
	GCSEventType string

	// BucketID is the name of the bucket that created the event.
	BucketID string

	// ObjectID is the name/path of the object that triggered the event.
	ObjectID string

	// The generation number of the object that was overwritten by the object
	// that this notification pertains to. This attribute only appears in
	// OBJECT_FINALIZE events in the case of an overwrite.
	OverwroteGeneration string

	// MD5 is the MD5 hash of the object as a hex encoded string.
	MD5 string

	// TimeStamp is the time of the last update in Unix time (seconds since the epoch).
	TimeStamp int64
}
StorageEvent is the type of object that is published by GCS storage events. Note: These events need to be registered with RegisterStorageEvents.
func NewStorageEvent ¶
func NewStorageEvent(bucketID, objectID string, lastUpdated int64, md5 string) *StorageEvent
NewStorageEvent is a convenience function to create a new StorageEvent. Currently all instances have storage.ObjectFinalizeEvent as GCSEventType, which indicates that a new object was created.
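A sketch of how the arguments to NewStorageEvent might be prepared is shown below. To stay self-contained it uses a local copy of the documented struct (storageEvent) and a stand-in constructor (newStorageEvent) rather than the real package; the helper md5Hex and the constant objectFinalize are hypothetical names, with the constant assumed to stand in for storage.ObjectFinalizeEvent.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"time"
)

// storageEvent is a local copy of the documented StorageEvent fields so that
// the sketch compiles without the eventbus package.
type storageEvent struct {
	GCSEventType        string
	BucketID            string
	ObjectID            string
	OverwroteGeneration string
	MD5                 string
	TimeStamp           int64
}

// objectFinalize stands in for storage.ObjectFinalizeEvent (assumption).
const objectFinalize = "OBJECT_FINALIZE"

// newStorageEvent mirrors the documented NewStorageEvent signature.
func newStorageEvent(bucketID, objectID string, lastUpdated int64, hash string) *storageEvent {
	return &storageEvent{
		GCSEventType: objectFinalize,
		BucketID:     bucketID,
		ObjectID:     objectID,
		MD5:          hash,
		TimeStamp:    lastUpdated,
	}
}

// md5Hex returns the MD5 hash of content as a hex encoded string, the format
// the MD5 field is documented to hold.
func md5Hex(content []byte) string {
	sum := md5.Sum(content)
	return hex.EncodeToString(sum[:])
}

func main() {
	content := []byte(`{"status": "ok"}`)
	// TimeStamp is expressed in seconds since the epoch, hence Unix().
	evt := newStorageEvent("bucket-name", "tests/result.json", time.Now().Unix(), md5Hex(content))
	fmt.Printf("%+v\n", *evt)
}
```

With the real package, an event built this way would be passed to PublishStorageEvent to simulate a GCS notification, for example in tests.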