dbevent

package module
v0.0.0-...-e49965d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 9, 2021 License: Apache-2.0 Imports: 8 Imported by: 0

README

Go Database Eventing (Publish event using database)

Test

Go library that helps publish events atomically together with transaction data

Features include:

Running example

Consumer Example

Start mysql server

./start_server.sh
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/pongsatt/go-dbevent"
	"github.com/pongsatt/go-dbevent/driver"
)

// Data represents the example JSON data. It is declared for parity with the
// producer example; the consumer's main below does not decode into it.
type Data struct {
	ID string
}

// main runs the consumer example. It requires a node ID as the first
// command-line argument, connects to the local MySQL test database, registers
// a handler for read group "group1" that prints each event ID, and then blocks
// until SIGINT or SIGTERM is received.
func main() {
	// The node ID is mandatory; fail fast when it is missing.
	if len(os.Args) < 2 {
		panic("nodeID required")
	}
	nodeID := os.Args[1]

	dbConfig := &dbevent.DBConfig{
		Host:     "127.0.0.1",
		Port:     3306,
		DBName:   "testdb",
		User:     "root",
		Password: "my-secret-pw",
	}

	mysqlDriver := driver.NewMySQLEventDriver(dbConfig, &driver.MySQLStoreConfig{NodeID: nodeID})

	// Deferred calls run in LIFO order: the consumer is closed first, then
	// the store.
	eventStore := dbevent.NewStore(mysqlDriver)
	defer eventStore.Close()

	consumer := eventStore.NewConsumer("group1", &dbevent.ConsumerConfig{})
	defer consumer.Close()

	// Returning nil from the handler reports the event as handled successfully.
	consumer.Consume(func(event *dbevent.Event) error {
		fmt.Printf("got event %d\n", event.ID)
		return nil
	})

	fmt.Println("Consumer ready")

	// signal.Notify never blocks when sending, so the channel must be
	// buffered; with an unbuffered channel a signal that arrives before the
	// receive below is ready would be dropped.
	termChan := make(chan os.Signal, 1)
	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

	<-termChan
	fmt.Println("Done")
}

Start consumer

go run example/consumer_highlevel/* node1
Producer Example
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"strconv"
	"time"

	// we need it
	_ "github.com/go-sql-driver/mysql"
	"github.com/google/uuid"
	"github.com/pongsatt/go-dbevent"
	"github.com/pongsatt/go-dbevent/driver"
)

// Data represents the example JSON data that the producer marshals into each
// event's Data field.
type Data struct {
	ID string
}

// main runs the producer example. It provisions the MySQL event driver and
// creates eventNum events, each carrying a freshly generated UUID as its JSON
// payload. eventNum defaults to 1 and may be overridden by the first
// command-line argument.
func main() {
	eventNum := 1

	if len(os.Args) > 1 {
		// Reject a malformed count instead of silently treating it as 0,
		// which would make the program exit without producing anything.
		n, err := strconv.Atoi(os.Args[1])
		if err != nil {
			panic(fmt.Errorf("invalid event count %q: %w", os.Args[1], err))
		}
		eventNum = n
	}

	dbConfig := &dbevent.DBConfig{
		Host:     "127.0.0.1",
		Port:     3306,
		DBName:   "testdb",
		User:     "root",
		Password: "my-secret-pw",
	}

	mysqlDriver := driver.NewMySQLEventDriver(dbConfig, &driver.MySQLStoreConfig{})
	if err := mysqlDriver.Provision(); err != nil {
		panic(err)
	}

	defer mysqlDriver.Close()

	for i := 0; i < eventNum; i++ {
		now := time.Now()

		data := &Data{
			ID: uuid.NewString(),
		}

		// Surface marshalling failures rather than storing an empty payload.
		b, err := json.Marshal(data)
		if err != nil {
			panic(err)
		}

		event := &dbevent.Event{
			Type:          "testtype",
			AggregateType: "aggType",
			AggregateID:   "agg1",
			Data:          b,
			CreatedAt:     &now,
		}

		if err := mysqlDriver.Create(event); err != nil {
			panic(err)
		}
		fmt.Printf("event '%s' produced\n", data.ID)
	}
}

Run producer

go run example/producer/* 1

Consumer output

got event 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewBuilder

func NewBuilder(eventType string) *builder

NewBuilder returns new builder instance

Types

type BackOffConfig

type BackOffConfig struct {
	InitialBackoffMs    int
	BackoffMultiplier   int
	BackoffRandomFactor float32
}

BackOffConfig represents backoff configuration

type Backoff

type Backoff struct {
	// contains filtered or unexported fields
}

Backoff represents exponential backoff

func NewBackoff

func NewBackoff(config *BackOffConfig) *Backoff

NewBackoff creates new backoff object

func (*Backoff) NextBackoffMs

func (backoff *Backoff) NextBackoffMs() int

NextBackoffMs returns the next backoff time in milliseconds

func (*Backoff) ResetSleepBackoff

func (backoff *Backoff) ResetSleepBackoff()

ResetSleepBackoff resets backoff

func (*Backoff) SleepBackoff

func (backoff *Backoff) SleepBackoff()

SleepBackoff sleeps using backoff delay

type Backoffer

type Backoffer interface {
	SleepBackoff()
	ResetSleepBackoff()
}

Backoffer represents backoff algorithm interface

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents consumer

func NewConsumer

func NewConsumer(readGroup string, driver ConsumerDriver, config *ConsumerConfig) *Consumer

NewConsumer creates new consumer

func (*Consumer) Close

func (consumer *Consumer) Close()

Close consumer

func (*Consumer) CloseAndWait

func (consumer *Consumer) CloseAndWait()

CloseAndWait closes the consumer and waits for it to finish

func (*Consumer) Consume

func (consumer *Consumer) Consume(onMessage func(event *Event) error)

Consume subscribes to new event

type ConsumerConfig

type ConsumerConfig struct {
	WaitChangeTimeoutSec int
	BatchSize            int
}

ConsumerConfig represents consumer configuration

type ConsumerDriver

type ConsumerDriver interface {
	Fetch(readGroup string, limit int) ([]*Event, error)
	CommitInTrans(readGroup string, event *Event, handler func() error) error
	WaitChange(timeout time.Duration)
}

ConsumerDriver represents event consumer driver

type DBConfig

type DBConfig struct {
	DSN      string
	Host     string
	Port     int
	DBName   string
	User     string
	Password string
}

DBConfig represents database configuration

func (*DBConfig) ToDSN

func (config *DBConfig) ToDSN() string

ToDSN returns the data source name

type Event

type Event struct {
	ID            uint   `xorm:"pk 'id'"`
	Type          string `xorm:"type" gorm:"not null"`
	AggregateType string `xorm:"aggregate_type"`
	AggregateID   string `xorm:"aggregate_id"`
	Data          JSON
	CreatedAt     *time.Time `xorm:"created_at"`
}

Event represents event data

type JSON

type JSON json.RawMessage

JSON custom data type

func (*JSON) Scan

func (j *JSON) Scan(value interface{}) error

Scan scans a value into JSON; it implements the sql.Scanner interface

func (JSON) Value

func (j JSON) Value() (driver.Value, error)

Value returns the JSON value; it implements the driver.Valuer interface

type MockBackoffer

type MockBackoffer struct {
	mock.Mock
}

MockBackoffer is an autogenerated mock type for the Backoffer type

func (*MockBackoffer) ResetSleepBackoff

func (_m *MockBackoffer) ResetSleepBackoff()

ResetSleepBackoff provides a mock function with given fields:

func (*MockBackoffer) SleepBackoff

func (_m *MockBackoffer) SleepBackoff()

SleepBackoff provides a mock function with given fields:

type MockConsumerDriver

type MockConsumerDriver struct {
	mock.Mock
}

MockConsumerDriver is an autogenerated mock type for the ConsumerDriver type

func (*MockConsumerDriver) CommitInTrans

func (_m *MockConsumerDriver) CommitInTrans(readGroup string, event *Event, handler func() error) error

CommitInTrans provides a mock function with given fields: readGroup, event, handler

func (*MockConsumerDriver) Fetch

func (_m *MockConsumerDriver) Fetch(readGroup string, limit int) ([]*Event, error)

Fetch provides a mock function with given fields: readGroup, limit

func (*MockConsumerDriver) WaitChange

func (_m *MockConsumerDriver) WaitChange(timeout time.Duration)

WaitChange provides a mock function with given fields: timeout

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store represents event store

func NewStore

func NewStore(driver StoreDriver) *Store

NewStore creates new store

func (*Store) Close

func (store *Store) Close() error

Close driver

func (*Store) NewConsumer

func (store *Store) NewConsumer(readGroup string, config *ConsumerConfig) *Consumer

NewConsumer creates new consumer for store

func (*Store) Produce

func (store *Store) Produce(events ...*Event) error

Produce creates new event

type StoreDriver

type StoreDriver interface {
	Provision() error
	Create(events ...*Event) error
	Close() error
	ConsumerDriver
}

StoreDriver represents event store driver

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL