segment

package module
v0.0.0-...-affc048 Latest
Published: Feb 28, 2024 License: MIT Imports: 20 Imported by: 1

README

Segment

A Segment.io-compatible server written in Go.

Introduction

Segment is a cloud-based analytics platform for tracking events from your application.

It has a well-designed specification that supports the following APIs:

  • Page: what web page are they on?
  • Track: what are they doing?
  • Identify: who is the customer?
  • Alias: what was their past identity?
  • Group: what account or organization are they part of?
  • Screen: what app screen are they on?

This Go library implements endpoints for all of these APIs.

Getting Started

These instructions will get you a copy of the project up and running on your local machine for development and testing purposes. See deployment for notes on how to deploy the project on a live system.

Prerequisites

This library uses the gorilla/mux library to attach HTTP handlers, and Prometheus for monitoring.

go get -u github.com/gorilla/mux
go get -u github.com/prometheus/client_golang

Installing

This library is installed as package segment.

go get -u github.com/brightsparc/segment

Examples

Create a new Segment listener by providing a function that returns a projectId for a given writeKey. For unknown writeKey values, return an empty string to have the endpoint respond with 400 Bad Request. Configure one or more destinations; this example forwards to the Segment cloud and delivers to a Firehose stream.

package main

import (
	"context"
	"log"
	"net/http"

	"github.com/brightsparc/segment"

	"github.com/gorilla/mux"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	projectId := func(writeKey string) string {
		return "xxxx" // TODO: Match this with your writeKey for authorisation
	}
	destinations := []segment.Destination{
		segment.NewForwarder("https://api.segment.io/v1/batch"),
		segment.NewDelivery(&segment.DeliveryConfig{
			StreamRegion: "us-west-2",
			StreamName:   "stream-name", // Will attempt to create if not exists
		}),
	}

	router := mux.NewRouter()
	router.Handle("/metrics", promhttp.Handler()) // prometheus metrics endpoint
	sr := router.PathPrefix("/v1").Subrouter()    // Will create endpoints /v1/batch etc
	seg := segment.NewSegment(projectId, destinations, sr)
	go seg.Run(context.Background())

	log.Println("Listening on :8000")
	log.Fatal(http.ListenAndServe(":8000", router))
}
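
The projectId function in the example above returns a fixed value. A minimal sketch of a table-driven lookup follows, assuming write keys are known up front. The ProjectId type is copied from the package documentation so the block compiles on its own; newStaticLookup and the key values are hypothetical and not part of the library.

```go
package main

import "fmt"

// ProjectId mirrors the package's func type: it maps a writeKey to a project id.
type ProjectId func(writeKey string) string

// newStaticLookup is a hypothetical helper (not part of the library) that
// builds a ProjectId from a fixed table of write keys.
func newStaticLookup(keys map[string]string) ProjectId {
	return func(writeKey string) string {
		// A missing key yields "", which per the README makes the
		// endpoint reject the request with a 400.
		return keys[writeKey]
	}
}

func main() {
	lookup := newStaticLookup(map[string]string{
		"wk_example_123": "project-a", // placeholder values
	})
	fmt.Printf("%q\n", lookup("wk_example_123"))
	fmt.Printf("%q\n", lookup("unknown"))
}
```

The returned function has the same signature as the projectId closure above, so it could be passed to segment.NewSegment in its place.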

Implementation Details

Send messages

The Segment Send method calls Send on each destination in order and returns on the first error. It is recommended to implement a queue, as the Delivery process does; the Forwarder should only be used for testing.

Background process

  • The Segment Run method launches a goroutine for each destination and accepts a context that ends these processes.
  • The Delivery process attempts to connect to a region and stream, and optionally accepts an endpoint for testing. It requires AWS credentials to be set, and exits after 3 failed attempts. Batches of up to 500 messages are sent every 30 seconds by default.
Logging

The Segment type logs to standard error by default; this can be changed through the Logger field or the WithLogger method.

Monitoring

The Prometheus client is enabled and exposes HTTP and delivery metrics.

Authors

License

This project is licensed under the MIT License; see the LICENSE.md file for details.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Delivery

type Delivery struct {
	Logger *log.Logger // Public logger that caller can override
	// contains filtered or unexported fields
}

Delivery is a destination for AWS Firehose

func NewDelivery

func NewDelivery(config *DeliveryConfig) *Delivery

NewDelivery creates a new delivery stream given configuration

func (*Delivery) Connect

func (d *Delivery) Connect() error

Connect connects to firehose and describes or creates stream

func (*Delivery) Process

func (d *Delivery) Process(ctx context.Context) error

Process handles the messages

func (*Delivery) Send

func (d *Delivery) Send(ctx context.Context, message interface{}) error

Send pushes the message onto the queue

func (*Delivery) WithLogger

func (d *Delivery) WithLogger(logger *log.Logger) Destination

WithLogger adds optional logging

type DeliveryConfig

type DeliveryConfig struct {
	StreamEndpoint string        `json:"streamEndpoint,omitempty"`
	StreamRegion   string        `json:"streamRegion"`
	StreamName     string        `json:"streamName"`
	BatchSize      int           `json:"batchSize,omitempty"`
	FlushInterval  time.Duration `json:"flushInterval,omitempty"`
}

DeliveryConfig contains configuration parameters, including an optional endpoint

type Destination

type Destination interface {
	Process(ctx context.Context) error
	Send(ctx context.Context, message interface{}) error
	WithLogger(logger *log.Logger) Destination
}

Destination interface has a blocking Process method, and Send method

type Forwarder

type Forwarder struct {
	Logger *log.Logger // Public logger that caller can override
	// contains filtered or unexported fields
}

Forwarder is a destination that forwards messages to an HTTP endpoint

func NewForwarder

func NewForwarder(endpoint string) *Forwarder

NewForwarder creates a new forwarder given endpoint

func (*Forwarder) Process

func (f *Forwarder) Process(ctx context.Context) error

Process forwards messages

func (*Forwarder) Send

func (f *Forwarder) Send(ctx context.Context, message interface{}) error

Send pushes messages onto queue

func (*Forwarder) WithLogger

func (f *Forwarder) WithLogger(logger *log.Logger) Destination

WithLogger initializes with logger

type ProjectId

type ProjectId func(writeKey string) string

ProjectId is the func definition to return string based on writeKey

type Segment

type Segment struct {
	Logger *log.Logger
	// contains filtered or unexported fields
}

Segment is initialized with projectId and destinations

func NewSegment

func NewSegment(projectId ProjectId, destinations []Destination, router *mux.Router) *Segment

NewSegment creates a new segment handler given project and delivery config

func (*Segment) Run

func (s *Segment) Run(ctx context.Context)

Run this as a goroutine to process the messages, and optionally send updates

func (*Segment) WithLogger

func (s *Segment) WithLogger(logger *log.Logger) *Segment

WithLogger propagates the logger down to destinations

type SegmentBatch

type SegmentBatch struct {
	MessageId string                 `json:"messageId,omitempty"`
	Timestamp time.Time              `json:"timestamp,omitempty"`
	SentAt    time.Time              `json:"sentAt,omitempty"`
	Context   map[string]interface{} `json:"context,omitempty"`
	Messages  []SegmentMessage       `json:"batch"`
}

SegmentBatch contains batch of messages

type SegmentEvent

type SegmentEvent struct {
	WriteKey string `json:"writeKey,omitempty"` // Read, clear, and set projectId
	SegmentMessage
}

SegmentEvent is single message with write key

type SegmentMessage

type SegmentMessage struct {
	MessageId    string                 `json:"messageId"`
	Timestamp    time.Time              `json:"timestamp"`
	SentAt       time.Time              `json:"sentAt,omitempty"`
	ProjectId    string                 `json:"projectId"`
	Type         string                 `json:"type"`
	Context      map[string]interface{} `json:"context,omitempty"` // Duplicate here for batch
	Properties   map[string]interface{} `json:"properties,omitempty"`
	Traits       map[string]interface{} `json:"traits,omitempty"`
	Integrations map[string]interface{} `json:"integrations,omitempty"` // Probably won't use
	AnonymousId  string                 `json:"anonymousId,omitempty"`
	UserId       string                 `json:"userId,omitempty"`
	Event        string                 `json:"event,omitempty"`    // Track only
	Category     string                 `json:"category,omitempty"` // Page only
	Name         string                 `json:"name,omitempty"`     // Page only
}

SegmentMessage contains the fields common to all message types.
