hammer

package module
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 13, 2022 License: MIT Imports: 10 Imported by: 0

README

hammer

Build Status Go Report Card go.dev reference

Simple webhook system written in golang.

Features

  • GRPC + Rest API.
  • Topics and Subscriptions scheme similar to google pubsub, the message is published in a topic and this topic has several subscriptions sending the same notification to different systems.
  • Payload sent follows the JSON Event Format for CloudEvents - Version 1.0 standard.
  • Control the maximum amount of delivery attempts and delay between these attempts.
  • Locks control of worker deliveries using PostgreSQL SELECT FOR UPDATE SKIP LOCKED.
  • Simplicity, it does the minimum necessary, it will not have authentication/permission scheme among other things, the idea is to use it internally in the cloud and not leave exposed.

Quickstart

Let's start with the basic concepts, we have three main entities that we must know to start:

  • Topic: A named resource to which messages are sent.
  • Subscription: A named resource representing a subscription to specific topic.
  • Message: The data that a publisher sends to a topic and is eventually delivered to subscribers.
Run the server

To run the server it is necessary to have a database available from postgresql, in this example we will consider that we have a database called hammer running in localhost with user and password equal to user.

Docker
docker run --env HAMMER_DATABASE_URL='postgres://user:pass@localhost:5432/hammer?sslmode=disable' quay.io/allisson/hammer migrate # run database migrations
docker run -p 4001:4001 -p 8000:8000 -p 9000:9000 -p 50051:50051 --env HAMMER_DATABASE_URL='postgres://user:pass@localhost:5432/hammer?sslmode=disable' quay.io/allisson/hammer server # run grpc/http/metrics servers
Local
git clone https://github.com/allisson/hammer
cd hammer
cp local.env .env # and edit .env
make run-migrate # create database schema
make run-server # run the server (grpc + http)

We are using curl in the examples below.

Create a new topic
curl -X POST 'http://localhost:8000/v1/topics' \
--header 'Content-Type: application/json' \
--data-raw '{
  "topic": {
    "id": "topic",
    "name": "Topic"
  }
}'
{
  "id": "topic",
  "name": "Topic",
  "created_at": "2021-03-18T11:08:49.678732Z",
  "updated_at": "2021-03-18T11:08:49.678732Z"
}
Create a new subscription

The max_delivery_attempts, delivery_attempt_delay and delivery_attempt_timeout are in seconds.

curl -X POST 'http://localhost:8000/v1/subscriptions' \
--header 'Content-Type: application/json' \
--data-raw '{
	"subscription": {
		"id": "httpbin-post",
		"topic_id": "topic",
		"name": "Httpbin Post",
		"url": "https://httpbin.org/post",
		"secret_token": "my-super-secret-token",
		"max_delivery_attempts": 5,
		"delivery_attempt_delay": 60,
		"delivery_attempt_timeout": 5
	}
}'
{
  "id": "httpbin-post",
  "topic_id": "topic",
  "name": "Httpbin Post",
  "url": "https://httpbin.org/post",
  "secret_token": "my-super-secret-token",
  "max_delivery_attempts": 5,
  "delivery_attempt_delay": 60,
  "delivery_attempt_timeout": 5,
  "created_at": "2021-03-18T11:10:05.855296Z",
  "updated_at": "2021-03-18T11:10:05.855296Z"
}
Create a new message
curl -X POST 'http://localhost:8000/v1/messages' \
--header 'Content-Type: application/json' \
--data-raw '{
	"message": {
		"topic_id": "topic",
		"content_type": "application/json",
		"data": "{\"name\": \"Allisson\"}"
	}
}'
{
  "id": "01F12GF6VAXGNHVXM4YT37N75A",
  "topic_id": "topic",
  "content_type": "application/json",
  "data": "eyJuYW1lIjogIkFsbGlzc29uIn0=",
  "created_at": "2021-03-18T11:10:29.738632Z"
}
Run the worker

The system will send a post request and the server must respond with the following status codes for the delivery to be considered successful: 200, 201, 202 and 204.

Docker
docker run --env HAMMER_DATABASE_URL='postgres://user:pass@localhost:5432/hammer?sslmode=disable' quay.io/allisson/hammer worker
{"level":"info","ts":1616065862.332101,"caller":"service/worker.go:67","msg":"worker-started"}
{"level":"info","ts":1616065863.104438,"caller":"service/worker.go:36","msg":"worker-delivery-attempt-created","id":"01F12GG6NWZR03MW1MFMQDWVVF","delivery_id":"01F12GF6VM4YSX5GW8TM4781EZ","response_status_code":200,"execution_duration":749,"success":true}
Local
make run-worker
go run cmd/worker/main.go
{"level":"info","ts":1616065862.332101,"caller":"service/worker.go:67","msg":"worker-started"}
{"level":"info","ts":1616065863.104438,"caller":"service/worker.go:36","msg":"worker-delivery-attempt-created","id":"01F12GG6NWZR03MW1MFMQDWVVF","delivery_id":"01F12GF6VM4YSX5GW8TM4781EZ","response_status_code":200,"execution_duration":749,"success":true}

Submitted payload (Compatible with JSON Event Format for CloudEvents - Version 1.0):

{
  "data_base64": "eyJuYW1lIjogIkFsbGlzc29uIn0=",
  "datacontenttype": "application/json",
  "id": "01F12GF6VM4YSX5GW8TM4781EZ",
  "messageid": "01F12GF6VAXGNHVXM4YT37N75A",
  "secrettoken": "my-super-secret-token",
  "source": "/v1/messages/01F12GF6VAXGNHVXM4YT37N75A",
  "specversion": "1.0",
  "subscriptionid": "httpbin-post",
  "time": "2021-03-18T11:10:29.748978Z",
  "topicid": "topic",
  "type": "hammer.message.created"
}
Get delivery data
curl -X GET http://localhost:8000/v1/deliveries/01F12GF6VM4YSX5GW8TM4781EZ
{
  "id": "01F12GF6VM4YSX5GW8TM4781EZ",
  "topic_id": "topic",
  "subscription_id": "httpbin-post",
  "message_id": "01F12GF6VAXGNHVXM4YT37N75A",
  "content_type": "application/json",
  "data": "eyJuYW1lIjogIkFsbGlzc29uIn0=",
  "url": "https://httpbin.org/post",
  "secret_token": "my-super-secret-token",
  "max_delivery_attempts": 5,
  "delivery_attempt_delay": 60,
  "delivery_attempt_timeout": 5,
  "scheduled_at": "2021-03-18T11:10:29.748978Z",
  "delivery_attempts": 1,
  "status": "completed",
  "created_at": "2021-03-18T11:10:29.748978Z",
  "updated_at": "2021-03-18T11:11:03.098648Z"
}
Get delivery attempt data

The execution_duration are in milliseconds.

curl -X GET http://localhost:8000/v1/delivery-attempts/01F12GG6NWZR03MW1MFMQDWVVF
{
  "id": "01F12GG6NWZR03MW1MFMQDWVVF",
  "delivery_id": "01F12GF6VM4YSX5GW8TM4781EZ",
  "request": "POST /post HTTP/1.1\r\nHost: httpbin.org\r\nContent-Type: application/json\r\n\r\n{\"specversion\":\"1.0\",\"type\":\"hammer.message.created\",\"source\":\"/v1/messages/01F12GF6VAXGNHVXM4YT37N75A\",\"id\":\"01F12GF6VM4YSX5GW8TM4781EZ\",\"time\":\"2021-03-18T11:10:29.748978Z\",\"secrettoken\":\"my-super-secret-token\",\"messageid\":\"01F12GF6VAXGNHVXM4YT37N75A\",\"subscriptionid\":\"httpbin-post\",\"topicid\":\"topic\",\"datacontenttype\":\"application/json\",\"data_base64\":\"eyJuYW1lIjogIkFsbGlzc29uIn0=\"}",
  "response": "HTTP/2.0 200 OK\r\nContent-Length: 1298\r\nAccess-Control-Allow-Credentials: true\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\nDate: Thu, 18 Mar 2021 11:11:03 GMT\r\nServer: gunicorn/19.9.0\r\n\r\n{\n  \"args\": {}, \n  \"data\": \"{\\\"specversion\\\":\\\"1.0\\\",\\\"type\\\":\\\"hammer.message.created\\\",\\\"source\\\":\\\"/v1/messages/01F12GF6VAXGNHVXM4YT37N75A\\\",\\\"id\\\":\\\"01F12GF6VM4YSX5GW8TM4781EZ\\\",\\\"time\\\":\\\"2021-03-18T11:10:29.748978Z\\\",\\\"secrettoken\\\":\\\"my-super-secret-token\\\",\\\"messageid\\\":\\\"01F12GF6VAXGNHVXM4YT37N75A\\\",\\\"subscriptionid\\\":\\\"httpbin-post\\\",\\\"topicid\\\":\\\"topic\\\",\\\"datacontenttype\\\":\\\"application/json\\\",\\\"data_base64\\\":\\\"eyJuYW1lIjogIkFsbGlzc29uIn0=\\\"}\", \n  \"files\": {}, \n  \"form\": {}, \n  \"headers\": {\n    \"Accept-Encoding\": \"gzip\", \n    \"Content-Length\": \"386\", \n    \"Content-Type\": \"application/json\", \n    \"Host\": \"httpbin.org\", \n    \"User-Agent\": \"Go-http-client/2.0\", \n    \"X-Amzn-Trace-Id\": \"Root=1-60533547-501c866f62e44ea3736dbc0c\"\n  }, \n  \"json\": {\n    \"data_base64\": \"eyJuYW1lIjogIkFsbGlzc29uIn0=\", \n    \"datacontenttype\": \"application/json\", \n    \"id\": \"01F12GF6VM4YSX5GW8TM4781EZ\", \n    \"messageid\": \"01F12GF6VAXGNHVXM4YT37N75A\", \n    \"secrettoken\": \"my-super-secret-token\", \n    \"source\": \"/v1/messages/01F12GF6VAXGNHVXM4YT37N75A\", \n    \"specversion\": \"1.0\", \n    \"subscriptionid\": \"httpbin-post\", \n    \"time\": \"2021-03-18T11:10:29.748978Z\", \n    \"topicid\": \"topic\", \n    \"type\": \"hammer.message.created\"\n  }, \n  \"origin\": \"191.33.94.128\", \n  \"url\": \"https://httpbin.org/post\"\n}\n",
  "response_status_code": 200,
  "execution_duration": 749,
  "success": true,
  "created_at": "2021-03-18T11:11:03.091808Z"
}

Environment variables

All environment variables is defined on file local.env.

How to build docker image

docker build -f Dockerfile -t hammer .

REST API

Default port: 8000

curl --location --request GET 'http://localhost:8000/v1/topics'

To disable the rest api, set the environment variable HAMMER_REST_API_ENABLED to false.

export HAMMER_REST_API_ENABLED='false'

Prometheus metrics

Default port: 4001

curl --location --request GET 'http://localhost:4001/metrics'

To disable prometheus metrics, set the environment variable HAMMER_METRICS_ENABLED to false.

export HAMMER_METRICS_ENABLED='false'

Health check

Default port: 9000

curl --location --request GET 'http://localhost:9000/liveness'
curl --location --request GET 'http://localhost:9000/readiness'

To disable health check, set the environment variable HAMMER_HEALTH_CHECK_ENABLED to false.

export HAMMER_HEALTH_CHECK_ENABLED='false'

Documentation

Index

Constants

View Source
const (
	// DeliveryStatusPending represents the delivery pending status
	DeliveryStatusPending = "pending"
	// DeliveryStatusFailed represents the delivery failed status
	DeliveryStatusFailed = "failed"
	// DeliveryStatusCompleted represents the delivery completed status
	DeliveryStatusCompleted = "completed"
)

Variables

View Source
var (

	// ErrTopicAlreadyExists is used when the topic already exists on repository.
	ErrTopicAlreadyExists = errors.New("topic_already_exists")
	// ErrTopicDoesNotExists is used when the topic does not exists on repository.
	ErrTopicDoesNotExists = errors.New("topic_does_not_exists")
	// ErrSubscriptionAlreadyExists is used when the subscription already exists on repository.
	ErrSubscriptionAlreadyExists = errors.New("subscription_already_exists")
	// ErrSubscriptionDoesNotExists is used when the subscription does not exists on repository.
	ErrSubscriptionDoesNotExists = errors.New("subscription_does_not_exists")
	// ErrMessageDoesNotExists is used when the message does not exists on repository.
	ErrMessageDoesNotExists = errors.New("message_does_not_exists")
	// ErrDeliveryDoesNotExists is used when the delivery does not exists on repository.
	ErrDeliveryDoesNotExists = errors.New("delivery_does_not_exists")
	// ErrDeliveryAttemptDoesNotExists is used when the delivery attempt does not exists on repository.
	ErrDeliveryAttemptDoesNotExists = errors.New("delivery_attempt_does_not_exists")
	// DefaultPaginationLimit represents a default pagination limit on resource list
	DefaultPaginationLimit = env.GetInt("HAMMER_DEFAULT_PAGINATION_LIMIT", 25)
	// MaxPaginationLimit represents the max value for pagination limit on resource list
	MaxPaginationLimit = env.GetInt("HAMMER_MAX_PAGINATION_LIMIT", 50)
	// DefaultSecretTokenLength represents a default length for a random string to secret token if it is not informed
	DefaultSecretTokenLength = env.GetInt("HAMMER_DEFAULT_SECRET_TOKEN_LENGTH", 40)
	// WorkerDatabaseDelay represents a delay for database access by workers
	WorkerDatabaseDelay = env.GetInt("HAMMER_WORKER_DATABASE_DELAY", 5)
)

Functions

func GenerateULID added in v0.6.0

func GenerateULID() (string, error)

GenerateULID returns a new ulid id

Types

type CloudEventPayload

type CloudEventPayload struct {
	SpecVersion     string    `json:"specversion"`
	Type            string    `json:"type"`
	Source          string    `json:"source"`
	ID              string    `json:"id"`
	Time            time.Time `json:"time"`
	SecretToken     string    `json:"secrettoken"`
	MessageID       string    `json:"messageid"`
	SubscriptionID  string    `json:"subscriptionid"`
	TopicID         string    `json:"topicid"`
	DataContentType string    `json:"datacontenttype"`
	DataBase64      string    `json:"data_base64"`
}

CloudEventPayload data

type Delivery

type Delivery struct {
	ID                     string    `json:"id" db:"id"`
	TopicID                string    `json:"topic_id" db:"topic_id"`
	SubscriptionID         string    `json:"subscription_id" db:"subscription_id"`
	MessageID              string    `json:"message_id" db:"message_id"`
	ContentType            string    `json:"content_type" db:"content_type"`
	Data                   string    `json:"data" db:"data"`
	URL                    string    `json:"url" db:"url"`
	SecretToken            string    `json:"secret_token" db:"secret_token"`
	MaxDeliveryAttempts    int       `json:"max_delivery_attempts" db:"max_delivery_attempts"`
	DeliveryAttemptDelay   int       `json:"delivery_attempt_delay" db:"delivery_attempt_delay"`
	DeliveryAttemptTimeout int       `json:"delivery_attempt_timeout" db:"delivery_attempt_timeout"`
	ScheduledAt            time.Time `json:"scheduled_at" db:"scheduled_at"`
	DeliveryAttempts       int       `json:"delivery_attempts" db:"delivery_attempts"`
	Status                 string    `json:"status" db:"status"`
	CreatedAt              time.Time `json:"created_at" db:"created_at"`
	UpdatedAt              time.Time `json:"updated_at" db:"updated_at"`
}

Delivery data

func MakeTestDelivery

func MakeTestDelivery() *Delivery

MakeTestDelivery returns a new Delivery

type DeliveryAttempt

type DeliveryAttempt struct {
	ID                 string    `json:"id" db:"id"`
	DeliveryID         string    `json:"delivery_id" db:"delivery_id"`
	Request            string    `json:"request" db:"request"`
	Response           string    `json:"response" db:"response"`
	ResponseStatusCode int       `json:"response_status_code" db:"response_status_code"`
	ExecutionDuration  int       `json:"execution_duration" db:"execution_duration"`
	Success            bool      `json:"success" db:"success"`
	Error              string    `json:"error" db:"error"`
	CreatedAt          time.Time `json:"created_at" db:"created_at"`
}

DeliveryAttempt data

func MakeTestDeliveryAttempt

func MakeTestDeliveryAttempt() *DeliveryAttempt

MakeTestDeliveryAttempt returns a new DeliveryAttempt

type DeliveryAttemptRepository

type DeliveryAttemptRepository interface {
	Find(ctx context.Context, id string) (*DeliveryAttempt, error)
	FindAll(ctx context.Context, findOptions FindOptions) ([]*DeliveryAttempt, error)
	Store(ctx context.Context, deliveryAttempt *DeliveryAttempt) error
}

DeliveryAttemptRepository interface

type DeliveryAttemptService

type DeliveryAttemptService interface {
	Find(ctx context.Context, id string) (*DeliveryAttempt, error)
	FindAll(ctx context.Context, findOptions FindOptions) ([]*DeliveryAttempt, error)
}

DeliveryAttemptService interface

type DeliveryRepository

type DeliveryRepository interface {
	Find(ctx context.Context, id string) (*Delivery, error)
	FindAll(ctx context.Context, findOptions FindOptions) ([]*Delivery, error)
	Store(ctx context.Context, delivery *Delivery) error
	Dispatch(ctx context.Context) (*DeliveryAttempt, error)
}

DeliveryRepository interface

type DeliveryService

type DeliveryService interface {
	Find(ctx context.Context, id string) (*Delivery, error)
	FindAll(ctx context.Context, findOptions FindOptions) ([]*Delivery, error)
}

DeliveryService interface

type FindFilter

type FindFilter struct {
	FieldName string
	Operator  string
	Value     string
}

FindFilter data

type FindOptions

type FindOptions struct {
	FindFilters    []FindFilter
	FindPagination *FindPagination
	FindOrderBy    *FindOrderBy
}

FindOptions data

type FindOrderBy

type FindOrderBy struct {
	FieldName string
	Order     string
}

FindOrderBy data

type FindPagination

type FindPagination struct {
	Limit  uint
	Offset uint
}

FindPagination data

type Message

type Message struct {
	ID          string    `json:"id" db:"id"`
	TopicID     string    `json:"topic_id" db:"topic_id"`
	ContentType string    `json:"content_type" db:"content_type"`
	Data        string    `json:"data" db:"data"`
	CreatedAt   time.Time `json:"created_at" db:"created_at"`
}

Message data

func MakeTestMessage

func MakeTestMessage() *Message

MakeTestMessage returns a new Message

func (Message) Validate

func (m Message) Validate() error

Validate message

type MessageRepository

type MessageRepository interface {
	Find(ctx context.Context, id string) (*Message, error)
	FindAll(ctx context.Context, findOptions FindOptions) ([]*Message, error)
	Store(ctx context.Context, message *Message) error
	Delete(ctx context.Context, id string) error
}

MessageRepository interface

type MessageService

type MessageService interface {
	Find(ctx context.Context, id string) (*Message, error)
	FindAll(ctx context.Context, findOptions FindOptions) ([]*Message, error)
	Create(ctx context.Context, message *Message) error
	Delete(ctx context.Context, id string) error
}

MessageService interface

type MigrationRepository

type MigrationRepository interface {
	Run(ctx context.Context) error
}

MigrationRepository interface

type MigrationService

type MigrationService interface {
	Run(ctx context.Context) error
}

MigrationService interface

type Subscription

type Subscription struct {
	ID                     string    `json:"id" db:"id"`
	TopicID                string    `json:"topic_id" db:"topic_id"`
	Name                   string    `json:"name" db:"name"`
	URL                    string    `json:"url" db:"url"`
	SecretToken            string    `json:"secret_token" db:"secret_token"`
	MaxDeliveryAttempts    int       `json:"max_delivery_attempts" db:"max_delivery_attempts"`
	DeliveryAttemptDelay   int       `json:"delivery_attempt_delay" db:"delivery_attempt_delay"`
	DeliveryAttemptTimeout int       `json:"delivery_attempt_timeout" db:"delivery_attempt_timeout"`
	CreatedAt              time.Time `json:"created_at" db:"created_at"`
	UpdatedAt              time.Time `json:"updated_at" db:"updated_at"`
}

Subscription data

func MakeTestSubscription

func MakeTestSubscription() *Subscription

MakeTestSubscription returns a new Subscription

func (Subscription) Validate

func (s Subscription) Validate() error

Validate subscription

type SubscriptionRepository

type SubscriptionRepository interface {
	Find(ctx context.Context, id string) (*Subscription, error)
	FindAll(ctx context.Context, findOptions FindOptions) ([]*Subscription, error)
	Store(ctx context.Context, subscription *Subscription) error
	Delete(ctx context.Context, id string) error
}

SubscriptionRepository interface

type SubscriptionService

type SubscriptionService interface {
	Find(ctx context.Context, id string) (*Subscription, error)
	FindAll(ctx context.Context, findOptions FindOptions) ([]*Subscription, error)
	Create(ctx context.Context, subscription *Subscription) error
	Update(ctx context.Context, subscription *Subscription) error
	Delete(ctx context.Context, id string) error
}

SubscriptionService interface

type Topic

type Topic struct {
	ID        string    `json:"id" db:"id"`
	Name      string    `json:"name" db:"name"`
	CreatedAt time.Time `json:"created_at" db:"created_at"`
	UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
}

Topic data

func MakeTestTopic

func MakeTestTopic() *Topic

MakeTestTopic returns a new Topic

func (Topic) Validate

func (t Topic) Validate() error

Validate topic

type TopicRepository

type TopicRepository interface {
	Find(ctx context.Context, id string) (*Topic, error)
	FindAll(ctx context.Context, findOptions FindOptions) ([]*Topic, error)
	Store(ctx context.Context, topic *Topic) error
	Delete(ctx context.Context, id string) error
}

TopicRepository interface

type TopicService

type TopicService interface {
	Find(ctx context.Context, id string) (*Topic, error)
	FindAll(ctx context.Context, findOptions FindOptions) ([]*Topic, error)
	Create(ctx context.Context, topic *Topic) error
	Update(ctx context.Context, topic *Topic) error
	Delete(ctx context.Context, id string) error
}

TopicService interface

type WorkerService

type WorkerService interface {
	Run(ctx context.Context)
	Stop(ctx context.Context)
}

WorkerService interface

Directories

Path Synopsis
api
v1
Package api is a reverse proxy.
Package api is a reverse proxy.
cmd
repository

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL