persistencemysql

package module
v0.0.0-...-bee527b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2024 License: MIT Imports: 11 Imported by: 0

README

protoactor-go-persistence-mysql

A Go package that provides a MySQL-backed persistence provider for Proto Actor (Go).

Usage

package main

import (
	"database/sql"
	"fmt"
	"log"

	"github.com/asynkron/protoactor-go/actor"
	"github.com/asynkron/protoactor-go/persistence"
	"github.com/go-sql-driver/mysql"
	"github.com/ytake/protoactor-go-persistence-mysql"
)

// Actor is the example actor used in this walkthrough. It embeds
// persistence.Mixin and declares no fields of its own.
type Actor struct {
	persistence.Mixin
}

// Receive takes the actor.Context handed to the actor. The body is
// intentionally left empty here; real message handling goes in its place.
func (a *Actor) Receive(ctx actor.Context) {
	// example
}

// main runs the example and terminates with a non-zero exit status if any
// setup step fails.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run opens the database handle, builds the MySQL persistence provider and
// spawns a named actor whose receiver middleware uses that provider.
// It returns the first error encountered, wrapped with the step that failed.
func run() error {
	conf := &mysql.Config{
		// example config
	}
	system := actor.NewActorSystem()

	db, err := sql.Open("mysql", conf.FormatDSN())
	if err != nil {
		return fmt.Errorf("opening database: %w", err)
	}
	// Keeping the work in run (rather than main) lets this deferred Close
	// execute on the error paths as well.
	defer db.Close()

	// The first argument is the snapshot interval.
	provider, err := persistencemysql.New(3, persistencemysql.NewTable(), db, system.Logger())
	if err != nil {
		return fmt.Errorf("creating persistence provider: %w", err)
	}

	props := actor.PropsFromProducer(func() actor.Actor { return &Actor{} },
		actor.WithReceiverMiddleware(persistence.Using(provider)))

	pid, err := system.Root.SpawnNamed(props, "persistent")
	if err != nil {
		return fmt.Errorf("spawning actor: %w", err)
	}
	// pid addresses the spawned actor; it is discarded here only because the
	// example sends no messages (an unused variable would not compile).
	_ = pid

	return nil
}

Default table schema

The default schema uses a ULID (`varchar(26)`) as the `id` column and JSON as the `payload` column.

-- journals: one row per entry, keyed by a 26-character id, with a JSON payload.
-- uidx_names allows at most one row per (actor_name, sequence_number) pair.
CREATE TABLE `journals`
(
    `id`              varchar(26) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
    `payload`         json                                                  NOT NULL,
    -- NOTE(review): sequence_number and actor_name are nullable; MySQL unique
    -- indexes permit multiple NULLs, so uidx_names does not constrain such rows.
    `sequence_number` bigint                                                 DEFAULT NULL,
    `actor_name`      varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL,
    `created_at`      timestamp                                              DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`),
    -- NOTE(review): uidx_id indexes the same single column as the primary key.
    UNIQUE KEY `uidx_id` (`id`),
    UNIQUE KEY `uidx_names` (`actor_name`,`sequence_number`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

-- snapshots: same column layout as the journals table, keyed by a
-- 26-character id, with a JSON payload.
-- uidx_names allows at most one row per (actor_name, sequence_number) pair.
CREATE TABLE `snapshots`
(
    `id`              varchar(26) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
    `payload`         json                                                  NOT NULL,
    -- NOTE(review): sequence_number and actor_name are nullable; MySQL unique
    -- indexes permit multiple NULLs, so uidx_names does not constrain such rows.
    `sequence_number` bigint                                                 DEFAULT NULL,
    `actor_name`      varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL,
    `created_at`      timestamp                                              DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`),
    -- NOTE(review): uidx_id indexes the same single column as the primary key.
    UNIQUE KEY `uidx_id` (`id`),
    UNIQUE KEY `uidx_names` (`actor_name`,`sequence_number`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

Changing the table names

Implement the Schemaer interface below to change the names of the journal table and the snapshot table.

// Schemaer is the interface that wraps the basic methods for a schema:
// the two table names, the five column names, and the DDL that creates them.
type Schemaer interface {
    // JournalTableName returns the name of the journal table.
    JournalTableName() string
    // SnapshotTableName returns the name of the snapshot table.
    SnapshotTableName() string
    // ID returns the name of the id column.
    ID() string
    // Payload returns the name of the payload column.
    Payload() string
    // ActorName returns the name of the actor name column.
    ActorName() string
    // SequenceNumber returns the name of the sequence number column.
    SequenceNumber() string
    // Created returns the name of the created-at column.
    Created() string
    // CreateTable returns the SQL statements used to create the tables,
    // one statement per slice element (presumably one per table — confirm
    // against the implementation).
    CreateTable() []string
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DefaultSchema

type DefaultSchema struct {
	// contains filtered or unexported fields
}

DefaultSchema is the default schema for the mysql provider.

func NewTable

func NewTable() *DefaultSchema

func (*DefaultSchema) ActorName

func (d *DefaultSchema) ActorName() string

ActorName returns the name of the actor name column.

func (*DefaultSchema) CreateTable

func (d *DefaultSchema) CreateTable() []string

CreateTable returns the sql statement to create the table.

func (*DefaultSchema) Created

func (d *DefaultSchema) Created() string

Created returns the name of the created at column.

func (*DefaultSchema) ID

func (d *DefaultSchema) ID() string

ID returns the name of the id column.

func (*DefaultSchema) JournalTableName

func (d *DefaultSchema) JournalTableName() string

JournalTableName returns the name of the journal table.

func (*DefaultSchema) Payload

func (d *DefaultSchema) Payload() string

Payload returns the name of the payload column.

func (*DefaultSchema) SequenceNumber

func (d *DefaultSchema) SequenceNumber() string

SequenceNumber returns the name of the sequence number column.

func (*DefaultSchema) SnapshotTableName

func (d *DefaultSchema) SnapshotTableName() string

SnapshotTableName returns the name of the snapshot table.

func (*DefaultSchema) WithJournalTable

func (d *DefaultSchema) WithJournalTable(name string) *DefaultSchema

WithJournalTable sets the name of the journal table.

func (*DefaultSchema) WithSnapshotTable

func (d *DefaultSchema) WithSnapshotTable(name string) *DefaultSchema

WithSnapshotTable sets the name of the snapshot table.

type Provider

type Provider struct {
	// contains filtered or unexported fields
}

Provider is the abstraction used for persistence

func New

func New(snapshotInterval int, table Schemaer, db *sql.DB, logger *slog.Logger) (*Provider, error)

New creates a new mysql provider

func (*Provider) DeleteEvents

func (provider *Provider) DeleteEvents(_ string, _ int)

DeleteEvents removes all events from the provider

func (*Provider) DeleteSnapshots

func (provider *Provider) DeleteSnapshots(_ string, _ int)

func (*Provider) GetEvents

func (provider *Provider) GetEvents(actorName string, eventIndexStart int, eventIndexEnd int, callback func(e interface{}))

func (*Provider) GetSnapshot

func (provider *Provider) GetSnapshot(actorName string) (snapshot interface{}, eventIndex int, ok bool)

func (*Provider) GetSnapshotInterval

func (provider *Provider) GetSnapshotInterval() int

func (*Provider) GetState

func (provider *Provider) GetState() persistence.ProviderState

func (*Provider) PersistEvent

func (provider *Provider) PersistEvent(actorName string, eventIndex int, snapshot proto.Message)

func (*Provider) PersistSnapshot

func (provider *Provider) PersistSnapshot(actorName string, eventIndex int, snapshot proto.Message)

func (*Provider) Restart

func (provider *Provider) Restart()

type Schemaer

type Schemaer interface {
	// JournalTableName returns the name of the journal table.
	JournalTableName() string
	// SnapshotTableName returns the name of the snapshot table.
	SnapshotTableName() string
	// ID returns the name of the id column.
	ID() string
	// Payload returns the name of the payload column.
	Payload() string
	// ActorName returns the name of the actor name column.
	ActorName() string
	// SequenceNumber returns the name of the sequence number column.
	SequenceNumber() string
	// Created returns the name of the created-at column.
	Created() string
	// CreateTable returns the SQL statements used to create the tables,
	// one statement per slice element (presumably one per table — confirm
	// against the implementation).
	CreateTable() []string
}

Schemaer is the interface that wraps the basic methods for a schema.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL