eventmaster

package module
v0.0.0-...-9440c28 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 16, 2017 License: MIT Imports: 32 Imported by: 0

README

eventmaster

Many events happen in production, such as deploys, service restarts, and scale-up operations. Right now these events happen, but we have no centralized place to track them, which means we have no centralized place to search them. This project implements a service that addresses this by:

  • Creating an event store
  • Providing an API for sending/firing events
  • Offering potential integrations into other services for searching etc. (e.g. Grafana annotations)
  • Working with other pieces of infrastructure to expose meaningful events

Setup

Dependencies
  • Go tool (tested at v1.9.x)
  • Cassandra v3.10
Get Code
$ git clone \
    git@github.com:ContextLogic/eventmaster.git \
    $GOPATH/src/github.com/ContextLogic/eventmaster
Building

Dependencies are currently fetched using dep. These are set up as dependencies to the default make target, so running:

make

will emit $GOPATH/bin/eventmaster after fetching and compiling dependencies.

Running

After running make, eventmaster can be called directly:

$ $GOPATH/bin/eventmaster

Given no flags, eventmaster runs with sane defaults.

Configuration

Various aspects of eventmaster can be controlled through a collection of flags and options in a json config file. This will be consolidated when issue #32 is resolved.

The config provided in etc/eventmaster.json encodes the default values assumed by eventmaster if run without specifying a config file (-c). If these settings do not work for your service, please modify etc/eventmaster.json to specify values such as the addresses of the Cassandra cluster along with other database options.

Of note: if "cassandra_config":"service_name" is non-empty, eventmaster currently uses service lookup to find the IPs of the Cassandra cluster.

For example, the port of the eventmaster server can be configured using the --port option. If an alternate Cassandra address needs to be specified, adjust an eventmaster.json file and tell eventmaster to use it by providing the -c flag.

Open the eventmaster UI in your browser (default: http://localhost:50052). The UI can be used to create and query topics, data centers, and events.

Database Setup

Execute schema.cql on your Cassandra cluster. This will set up the event_master keyspace and event, temp_event, event_topic, and event_dc tables.

$ cqlsh -f schema.cql

The class and replication_factor can be modified as long as the consistency attribute is configured accordingly in the eventmaster.json used by eventmaster -c eventmaster.json.

Tests

Tests can be run (using the go tool) by calling:

$ make test

REST API

This is documented in the documentation subdirectory.

Documentation

Index

Constants

This section is empty.

Variables

var Git = "unset"

Git stores the commit used during build.

var Version = "unset"

Version maps to tagged releases of the software.

Functions

func PrintVersions

func PrintVersions()

PrintVersions writes version information for the running binary to stdout.

Types

type Annotation

type Annotation struct {
	// Name must match in the request and response
	Name string `json:"name"`

	Datasource string `json:"datasource"`
	IconColor  string `json:"iconColor"`
	Enable     bool   `json:"enable"`
	ShowLine   bool   `json:"showLine"`
	Query      string `json:"query"`
}

Annotation is the object passed by Grafana when it fetches annotations.

http://docs.grafana.org/plugins/developing/datasources/#annotation-query

type AnnotationQuery

type AnnotationQuery struct {
	Topic string `json:"topic"`
	DC    string `json:"dc"`
}

AnnotationQuery is a collection of possible filters for a Grafana annotation request.

These values are set in gear icon > annotations > edit > Query and can have such useful values as:

{"topic": "$topic", "dc": "$dc"}

or if you do not want filtering leave it blank.
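As a minimal sketch of how such a query string could be decoded, the block below re-declares AnnotationQuery locally so it runs on its own. The helper parseAnnotationQuery and the values "deploy" and "us-west" are hypothetical and stand in for whatever Grafana substitutes for $topic and $dc. It is not taken from the eventmaster source.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// AnnotationQuery is a local copy of the documented struct.
type AnnotationQuery struct {
	Topic string `json:"topic"`
	DC    string `json:"dc"`
}

// parseAnnotationQuery is a hypothetical helper. A blank query is treated
// as "no filtering" and yields the zero value without an error.
func parseAnnotationQuery(raw string) (AnnotationQuery, error) {
	var q AnnotationQuery
	if strings.TrimSpace(raw) == "" {
		return q, nil
	}
	if err := json.Unmarshal([]byte(raw), &q); err != nil {
		return q, fmt.Errorf("parse annotation query: %w", err)
	}
	return q, nil
}

func main() {
	// Hypothetical values, as they might look after variable substitution.
	q, err := parseAnnotationQuery(`{"topic": "deploy", "dc": "us-west"}`)
	if err != nil {
		panic(err)
	}
	fmt.Printf("topic=%q dc=%q\n", q.Topic, q.DC)
}
```

Treating a blank string as the zero value keeps the "leave it blank" case from surfacing as a JSON error.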

type AnnotationResponse

type AnnotationResponse struct {
	// The original annotation sent from Grafana.
	Annotation Annotation `json:"annotation"`
	// Time since UNIX Epoch in milliseconds. (required)
	Time int64 `json:"time"`
	// The title for the annotation tooltip. (required)
	Title string `json:"title"`
	// Tags for the annotation. (optional)
	Tags string `json:"tags"`
	// Text for the annotation. (optional)
	Text string `json:"text"`
}

AnnotationResponse contains all the information needed to render an annotation event.

https://github.com/grafana/simple-json-datasource#annotation-api

func FromEvent

func FromEvent(store topicNamer, ev *Event) (AnnotationResponse, error)

FromEvent creates an AnnotationResponse, with formatted text, from an Event.

type AnnotationsReq

type AnnotationsReq struct {
	Range      Range      `json:"range"`
	Annotation Annotation `json:"annotation"`
}

AnnotationsReq encodes the information provided by Grafana in its requests.

type AssetTemplate

type AssetTemplate struct {
	sync.RWMutex
	Asset func(name string) ([]byte, error)
	// contains filtered or unexported fields
}

AssetTemplate pulls templates from an assetfs.

func NewAssetTemplate

func NewAssetTemplate(asset func(string) ([]byte, error)) *AssetTemplate

NewAssetTemplate returns a ready-to-use AssetTemplate.

func (*AssetTemplate) Get

func (at *AssetTemplate) Get(name string) (*template.Template, error)

Get attempts to merge the requested name+.html with base.html found at root.

type CassandraConfig

type CassandraConfig struct {
	Addrs       []string `json:"addrs"`
	Keyspace    string   `json:"keyspace"`
	Consistency string   `json:"consistency"`
	Timeout     string   `json:"timeout"`
	ServiceName string   `json:"service_name"`
}

CassandraConfig defines the Cassandra-specific section of the eventmaster configuration file.
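A rough sketch of decoding this section is shown below. CassandraConfig is re-declared locally. The wrapper type fileConfig, the helper names, and every value in the sample JSON are assumptions made for illustration. Only the "cassandra_config" key and the service_name behavior come from the README above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CassandraConfig is a local copy of the documented struct.
type CassandraConfig struct {
	Addrs       []string `json:"addrs"`
	Keyspace    string   `json:"keyspace"`
	Consistency string   `json:"consistency"`
	Timeout     string   `json:"timeout"`
	ServiceName string   `json:"service_name"`
}

// fileConfig is a hypothetical wrapper for the top level of the config file.
type fileConfig struct {
	Cassandra CassandraConfig `json:"cassandra_config"`
}

// sample holds made-up values; it is not the shipped etc/eventmaster.json.
const sample = `{
  "cassandra_config": {
    "addrs": ["127.0.0.1:9042"],
    "keyspace": "event_master",
    "consistency": "one",
    "timeout": "5s",
    "service_name": ""
  }
}`

// loadConfig decodes the Cassandra section from raw JSON.
func loadConfig(raw []byte) (CassandraConfig, error) {
	var fc fileConfig
	if err := json.Unmarshal(raw, &fc); err != nil {
		return CassandraConfig{}, err
	}
	return fc.Cassandra, nil
}

// usesServiceLookup reports whether a non-empty service_name is set,
// which the README says switches eventmaster to service lookup.
func usesServiceLookup(c CassandraConfig) bool {
	return c.ServiceName != ""
}

func main() {
	cfg, err := loadConfig([]byte(sample))
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.Addrs, cfg.Keyspace, usesServiceLookup(cfg))
}
```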

type CassandraStore

type CassandraStore struct {
	// contains filtered or unexported fields
}

CassandraStore is an implementation of DataStore that is backed by Cassandra.

func NewCassandraStore

func NewCassandraStore(c CassandraConfig) (*CassandraStore, error)

NewCassandraStore returns a working CassandraStore, or an error.

func (*CassandraStore) AddDC

func (c *CassandraStore) AddDC(dc DC) error

AddDC inserts dc into the event_dc table.

func (*CassandraStore) AddEvent

func (c *CassandraStore) AddEvent(evt *Event) error

AddEvent takes an *Event and stores it in Cassandra.

func (*CassandraStore) AddTopic

func (c *CassandraStore) AddTopic(t RawTopic) error

AddTopic inserts t into event_topic.

func (*CassandraStore) CloseSession

func (c *CassandraStore) CloseSession()

CloseSession closes the underlying session.

func (*CassandraStore) DeleteTopic

func (c *CassandraStore) DeleteTopic(id string) error

DeleteTopic removes the topic with the given id.

func (*CassandraStore) Find

func (c *CassandraStore) Find(q *eventmaster.Query, topicIDs []string, dcIDs []string) (Events, error)

Find searches using the Query, and filters topicIDs and dcIDs.

func (*CassandraStore) FindByID

func (c *CassandraStore) FindByID(id string, includeData bool) (*Event, error)

FindByID searches cassandra for an event by its id.

If includeData is true

func (*CassandraStore) FindIDs

func (c *CassandraStore) FindIDs(q *eventmaster.TimeQuery, stream HandleEvent) error

FindIDs traverses the temporal space defined by q day by day and calls the stream function with each event ID found.

func (*CassandraStore) GetDCs

func (c *CassandraStore) GetDCs() ([]DC, error)

GetDCs returns all entries from the event_dc table.

func (*CassandraStore) GetTopics

func (c *CassandraStore) GetTopics() ([]Topic, error)

GetTopics returns all topics.

func (*CassandraStore) UpdateDC

func (c *CassandraStore) UpdateDC(id string, newName string) error

UpdateDC replaces the name for a given DC by id.

func (*CassandraStore) UpdateTopic

func (c *CassandraStore) UpdateTopic(t RawTopic) error

UpdateTopic performs a CQL update with t against the event_topic table.

type DC

type DC struct {
	ID   string `json:"dc_id"`
	Name string `json:"dc_name"`
}

DC represents a datacenter.

type DataStore

type DataStore interface {
	AddEvent(*Event) error
	Find(q *eventmaster.Query, topicIDs []string, dcIDs []string) (Events, error)
	FindByID(string, bool) (*Event, error)
	FindIDs(*eventmaster.TimeQuery, HandleEvent) error
	GetTopics() ([]Topic, error)
	AddTopic(RawTopic) error
	UpdateTopic(RawTopic) error
	DeleteTopic(string) error
	GetDCs() ([]DC, error)
	AddDC(DC) error
	UpdateDC(string, string) error
	CloseSession()
}

DataStore defines the interface needed to be used as a backing store for eventmaster.

A few examples include CassandraStore and MockDataStore.

type Disk

type Disk struct {
	Root string
}

Disk keeps track of the location of the root directory of the template directory.

func (Disk) Get

func (d Disk) Get(name string) (*template.Template, error)

Get attempts to merge the requested name+.html with base.html found at root.

type Event

type Event struct {
	EventID       string                 `json:"event_id"`
	ParentEventID string                 `json:"parent_event_id"`
	EventTime     int64                  `json:"event_time"`
	DCID          string                 `json:"dc_id"`
	TopicID       string                 `json:"topic_id"`
	Tags          []string               `json:"tag_set"`
	Host          string                 `json:"host"`
	TargetHosts   []string               `json:"target_host_set"`
	User          string                 `json:"user"`
	Data          map[string]interface{} `json:"data"`
	ReceivedTime  int64                  `json:"received_time"`
}

Event is the representation of an event across the DataStore boundary.

type EventResult

type EventResult struct {
	EventID       string                 `json:"event_id"`
	ParentEventID string                 `json:"parent_event_id"`
	EventTime     int64                  `json:"event_time"`
	DC            string                 `json:"dc"`
	TopicName     string                 `json:"topic_name"`
	Tags          []string               `json:"tag_set"`
	Host          string                 `json:"host"`
	TargetHosts   []string               `json:"target_host_set"`
	User          string                 `json:"user"`
	Data          map[string]interface{} `json:"data"`
	ReceivedTime  int64                  `json:"received_time"`
}

EventResult is the json-serializable version of an Event.

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

EventStore is the in-memory cache of lookups between various pieces of information, such as topic id <-> topic name.

func NewEventStore

func NewEventStore(ds DataStore) (*EventStore, error)

NewEventStore initializes an EventStore.

func (*EventStore) AddDC

func (es *EventStore) AddDC(dc *eventmaster.DC) (string, error)

AddDC stores dc, returning the ID and an error if there was one.

func (*EventStore) AddEvent

func (es *EventStore) AddEvent(event *UnaddedEvent) (string, error)

AddEvent stores event in the DataStore.

func (*EventStore) AddTopic

func (es *EventStore) AddTopic(topic Topic) (string, error)

AddTopic adds topic to the DataStore.

func (*EventStore) CloseSession

func (es *EventStore) CloseSession()

CloseSession closes the underlying DataStore session.

func (*EventStore) DeleteTopic

func (es *EventStore) DeleteTopic(deleteReq *eventmaster.DeleteTopicRequest) error

DeleteTopic removes the Topic with the name in deleteReq.

func (*EventStore) Find

func (es *EventStore) Find(q *eventmaster.Query) (Events, error)

Find performs validation and sorting around calling the underlying DataStore.

func (*EventStore) FindByID

func (es *EventStore) FindByID(id string) (*Event, error)

FindByID gets an Event from the DataStore and updates defaults.

func (*EventStore) FindIDs

func (es *EventStore) FindIDs(q *eventmaster.TimeQuery, h HandleEvent) error

FindIDs validates input and calls h on all found Events using the underlying DataStore.

func (*EventStore) GetDCs

func (es *EventStore) GetDCs() ([]DC, error)

GetDCs returns all stored datacenters.

func (*EventStore) GetTopics

func (es *EventStore) GetTopics() ([]Topic, error)

GetTopics retrieves all topics from the DataStore.

func (*EventStore) Update

func (es *EventStore) Update() error

Update reconstitutes internal memory caches with information in the DataStore.

func (*EventStore) UpdateDC

func (es *EventStore) UpdateDC(updateReq *eventmaster.UpdateDCRequest) (string, error)

UpdateDC validates updateReq, stores in both the DataStore and in-memory cache.

func (*EventStore) UpdateTopic

func (es *EventStore) UpdateTopic(oldName string, td Topic) (string, error)

UpdateTopic stores

type Events

type Events []*Event

Events is shorthand for a sortable slice of events.

func (Events) Len

func (evts Events) Len() int

func (Events) Less

func (evts Events) Less(i, j int) bool

func (Events) Swap

func (evts Events) Swap(i, j int)
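Len, Less, and Swap together satisfy sort.Interface. The sketch below re-implements them on a trimmed local Event type to show how such a slice can be passed to sort.Sort. Ordering by EventTime ascending is an assumption: the listing above does not say which field or direction the real Less uses. The helper sortedIDs is hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// Event is a trimmed local stand-in for the documented Event type.
type Event struct {
	EventID   string
	EventTime int64
}

// Events is a sortable slice of events.
type Events []*Event

func (evts Events) Len() int { return len(evts) }

// Less orders by EventTime ascending. This ordering is an assumption.
func (evts Events) Less(i, j int) bool { return evts[i].EventTime < evts[j].EventTime }

func (evts Events) Swap(i, j int) { evts[i], evts[j] = evts[j], evts[i] }

// sortedIDs is a hypothetical helper that sorts evts in place and
// returns the event IDs in the resulting order.
func sortedIDs(evts Events) []string {
	sort.Sort(evts)
	ids := make([]string, 0, len(evts))
	for _, e := range evts {
		ids = append(ids, e.EventID)
	}
	return ids
}

func main() {
	evts := Events{
		{EventID: "c", EventTime: 30},
		{EventID: "a", EventTime: 10},
		{EventID: "b", EventTime: 20},
	}
	fmt.Println(sortedIDs(evts))
}
```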

type Flags

type Flags struct {
	Port int `long:"port" default:"50052" description:"Port for EventMaster gRPC + HTTP API"`

	ConfigFile string `short:"c" long:"config" description:"location of configuration file"`

	RsyslogServer bool `short:"r" long:"rsyslog_server" description:"Flag to start TCP rsyslog server"`
	RsyslogPort   int  `long:"rsyslog_port" default:"50053" description:"Port for rsyslog clients to send logs to"`

	CAFile   string `long:"ca_file" description:"PEM encoded CA's certificate file path"`
	CertFile string `long:"cert_file" description:"PEM encoded certificate file path"`
	KeyFile  string `long:"key_file" description:"PEM encoded private key file path"`

	StaticFiles string `short:"s" long:"static" description:"location of static files to use (instead of embedded files)"`
	Templates   string `short:"t" long:"templates" description:"location of template files to use (instead of embedded)"`
}

Flags is parsed to populate the flags for command eventmaster.

type GRPCServer

type GRPCServer struct {
	// contains filtered or unexported fields
}

GRPCServer implements gRPC endpoints.

func NewGRPCServer

func NewGRPCServer(config *Flags, s *EventStore) *GRPCServer

NewGRPCServer returns a populated GRPCServer.

func (*GRPCServer) AddDC

AddDC is the gRPC version of adding a datacenter.

func (*GRPCServer) AddEvent

AddEvent adds an event to the datastore.

func (*GRPCServer) AddTopic

AddTopic is the gRPC version of AddTopic.

func (*GRPCServer) DeleteTopic

DeleteTopic is the gRPC version of DeleteTopic.

func (*GRPCServer) GetDCs

GetDCs is the gRPC version of getting all datacenters.

func (*GRPCServer) GetEventByID

func (s *GRPCServer) GetEventByID(ctx context.Context, id *eventmaster.EventID) (*eventmaster.Event, error)

GetEventByID returns an event by id.

func (*GRPCServer) GetEventIDs

GetEventIDs returns all event ids.

func (*GRPCServer) GetEvents

GetEvents returns all Events.

func (*GRPCServer) GetTopics

GetTopics is the gRPC call that returns all topics.

func (*GRPCServer) Healthcheck

Healthcheck is the gRPC health endpoint.

func (*GRPCServer) UpdateDC

UpdateDC is the gRPC version of updating a datacenter.

func (*GRPCServer) UpdateTopic

UpdateTopic is the gRPC version of updating a topic.

type GetEventPageData

type GetEventPageData struct {
	Topics []Topic
	Query  *eventmaster.Query
}

GetEventPageData stores information rendered in the query form template.

type HandleEvent

type HandleEvent func(eventID string) error

HandleEvent defines a function for interacting with a stream of events one at a time.
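To illustrate the callback shape, here is a small sketch in which a handler collects IDs as they are streamed. The producer streamIDs is a hypothetical stand-in for a FindIDs-style method, and the behavior of stopping at the first handler error is an assumption, not something this page confirms about eventmaster.

```go
package main

import (
	"errors"
	"fmt"
)

// HandleEvent is a local copy of the documented function type.
type HandleEvent func(eventID string) error

// errLimit is a hypothetical sentinel used to stop streaming early.
var errLimit = errors.New("limit reached")

// streamIDs is a hypothetical producer: it calls h once per ID and
// stops at the first error the handler returns.
func streamIDs(ids []string, h HandleEvent) error {
	for _, id := range ids {
		if err := h(id); err != nil {
			return err
		}
	}
	return nil
}

// collectUpTo returns a handler that appends IDs to dst and returns
// errLimit once max IDs have already been collected.
func collectUpTo(dst *[]string, max int) HandleEvent {
	return func(eventID string) error {
		if len(*dst) >= max {
			return errLimit
		}
		*dst = append(*dst, eventID)
		return nil
	}
}

func main() {
	var got []string
	err := streamIDs([]string{"a", "b", "c"}, collectUpTo(&got, 2))
	fmt.Println(got, err)
}
```

Passing a closure lets the caller keep its own state (here, the collected slice) without the producer knowing about it.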

type LogParser

type LogParser func(int64, string, string, string, string) *UnaddedEvent

LogParser defines a function that can be used to log an event.

It allows us to key on event topic and provide custom implementations.

type Pair

type Pair struct {
	// contains filtered or unexported fields
}

Pair represents a single string key to empty interface value mapping.

type Range

type Range struct {
	From time.Time `json:"from"`
	To   time.Time `json:"to"`
}

Range specifies the time range the request is valid for.
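Because From and To are time.Time values, encoding/json decodes them from RFC 3339 strings. The sketch below decodes a made-up request body into local copies of Range, Annotation, and AnnotationsReq. The helper decodeReq and the sample payload are hypothetical and only approximate what Grafana might send.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Local copies of the documented structs.
type Range struct {
	From time.Time `json:"from"`
	To   time.Time `json:"to"`
}

type Annotation struct {
	Name       string `json:"name"`
	Datasource string `json:"datasource"`
	IconColor  string `json:"iconColor"`
	Enable     bool   `json:"enable"`
	ShowLine   bool   `json:"showLine"`
	Query      string `json:"query"`
}

type AnnotationsReq struct {
	Range      Range      `json:"range"`
	Annotation Annotation `json:"annotation"`
}

// sampleReq is a made-up request body used only for illustration.
const sampleReq = `{
  "range": {"from": "2017-10-16T00:00:00.000Z", "to": "2017-10-16T06:00:00.000Z"},
  "annotation": {"name": "deploys", "datasource": "eventmaster", "enable": true, "query": ""}
}`

// decodeReq is a hypothetical helper that parses a request body.
func decodeReq(raw string) (AnnotationsReq, error) {
	var req AnnotationsReq
	err := json.Unmarshal([]byte(raw), &req)
	return req, err
}

func main() {
	req, err := decodeReq(sampleReq)
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Annotation.Name, req.Range.To.Sub(req.Range.From))
}
```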

type RawTopic

type RawTopic struct {
	ID     string
	Name   string
	Schema string
}

RawTopic is a Topic but with an unparsed Schema.

type RsyslogServer

type RsyslogServer struct {
	// contains filtered or unexported fields
}

RsyslogServer implements an rsyslog endpoint.

func NewRsyslogServer

func NewRsyslogServer(s *EventStore, tlsConfig *tls.Config, port int) (*RsyslogServer, error)

NewRsyslogServer returns a populated rsyslog server.

func (*RsyslogServer) AcceptLogs

func (s *RsyslogServer) AcceptLogs()

AcceptLogs kicks off a goroutine that listens for connections and dispatches log requests.

func (*RsyslogServer) Stop

func (s *RsyslogServer) Stop() error

Stop terminates the underlying network connection.

type SearchResult

type SearchResult struct {
	Results []*EventResult `json:"results"`
}

SearchResult groups a slice of EventResult for http responses.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server implements http.Handler for the eventmaster http server.

func NewServer

func NewServer(store *EventStore, static, templates string) *Server

NewServer returns a ready-to-use Server that uses store, and the appropriate static and templates facilities.

If static or templates are non-empty then files are served from those locations (useful for development). Otherwise the server uses embedded static assets.

func (*Server) HandleCreatePage

func (s *Server) HandleCreatePage(w http.ResponseWriter, r *http.Request, _ httprouter.Params)

HandleCreatePage deals with creating an event.

func (*Server) HandleDCPage

func (s *Server) HandleDCPage(w http.ResponseWriter, r *http.Request, _ httprouter.Params)

HandleDCPage renders the datacenter page.

func (*Server) HandleGetEventPage

func (s *Server) HandleGetEventPage(w http.ResponseWriter, r *http.Request, ps httprouter.Params)

HandleGetEventPage renders all recent events to html.

func (*Server) HandleMainPage

func (s *Server) HandleMainPage(w http.ResponseWriter, r *http.Request, _ httprouter.Params)

HandleMainPage redirects to /event.

func (*Server) HandleTopicPage

func (s *Server) HandleTopicPage(w http.ResponseWriter, r *http.Request, _ httprouter.Params)

HandleTopicPage renders the topic page.

func (*Server) ServeHTTP

func (srv *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)

ServeHTTP dispatches to the underlying router.

type StatusRecorder

type StatusRecorder struct {
	http.ResponseWriter
	// contains filtered or unexported fields
}

StatusRecorder is a simple http status recorder.

func NewStatusRecorder

func NewStatusRecorder(w http.ResponseWriter) *StatusRecorder

NewStatusRecorder returns an initialized StatusRecorder, with 200 as the default status.

func (*StatusRecorder) Status

func (sr *StatusRecorder) Status() int

Status returns the cached http status value.

func (*StatusRecorder) WriteHeader

func (sr *StatusRecorder) WriteHeader(status int)

WriteHeader caches the status, then calls the underlying ResponseWriter.

type TemplateGetter

type TemplateGetter interface {
	Get(name string) (*template.Template, error)
}

TemplateGetter defines what needs to be implemented to be able to fetch templates for use by a Site.

type TemplateRequest

type TemplateRequest struct {
	Target string `json:"target"`
}

TemplateRequest is used for parsing Grafana requests for template variable names.

For example, at the current time Target can be either "dc" or "topic".

type Topic

type Topic struct {
	ID     string                 `json:"topic_id"`
	Name   string                 `json:"topic_name"`
	Schema map[string]interface{} `json:"data_schema"`
}

Topic represents a topic.

type UnaddedEvent

type UnaddedEvent struct {
	ParentEventID string                 `json:"parent_event_id"`
	EventTime     int64                  `json:"event_time"`
	DC            string                 `json:"dc"`
	TopicName     string                 `json:"topic_name"`
	Tags          []string               `json:"tag_set"`
	Host          string                 `json:"host"`
	TargetHosts   []string               `json:"target_host_set"`
	User          string                 `json:"user"`
	Data          map[string]interface{} `json:"data"`
}

UnaddedEvent is an internal structure that hasn't yet been augmented.

See augmentEvent below.

Directories

Path Synopsis
cmd
Package jh implements a shim layer between functions that return values and errors and the httprouter.Handle function signature.
