persistencepg

package module
v0.0.0-...-9ca7211 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2024 License: MIT Imports: 14 Imported by: 0

README

protoactor-go-persistence-pg

Go package with persistence provider for Proto Actor (Go) based on PostgreSQL.

using pgx v5

Usage

package main

import (
	"context"
	
	"github.com/asynkron/protoactor-go/actor"
	"github.com/asynkron/protoactor-go/persistence"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/ytake/protoactor-go-persistence-pg"
)

type Actor struct {
	persistence.Mixin
}

func (a *Actor) Receive(ctx actor.Context) {
	// example
}

func main() {

	conf, _ := pgxpool.ParseConfig("postgres://postgres:postgres@localhost:5432/sample?sslmode=disable&pool_max_conns=10")

	system := actor.NewActorSystem()
	ctx := context.Background()
	conn, _ := pgxpool.NewWithConfig(ctx, conf)
	provider, _ := persistencepg.New(ctx, 3, persistencepg.NewTable(), conn, system.Logger())

	props := actor.PropsFromProducer(func() actor.Actor { return &Actor{} },
		actor.WithReceiverMiddleware(persistence.Using(provider)))

	pid, _ := system.Root.SpawnNamed(props, "persistent")
}

Default table schema

use ulid as id(varchar(26)) and json as payload

CREATE TABLE journals
(
    id              VARCHAR(26) NOT NULL,
    payload         JSONB NOT NULL,
    sequence_number BIGINT,
    actor_name      VARCHAR(255),
    created_at      TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    UNIQUE (id),
    UNIQUE (actor_name, sequence_number)
);

CREATE TABLE snapshots
(
    id              VARCHAR(26) NOT NULL,
    payload         JSONB NOT NULL,
    sequence_number BIGINT,
    actor_name      VARCHAR(255),
    created_at      TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    UNIQUE (id),
    UNIQUE (actor_name, sequence_number)
);

change table name

use the interface to change the table name.

for journal table and snapshot table.

// Schemaer is the interface that wraps the basic methods for a schema.
type Schemaer interface {
    // JournalTableName returns the name of the journal table.
    JournalTableName() string
    // SnapshotTableName returns the name of the snapshot table.
    SnapshotTableName() string
    // ID returns the name of the id column.
    ID() string
    // Payload returns the name of the payload column.
    Payload() string
    // ActorName returns the name of the actor name column.
    ActorName() string
    // SequenceNumber returns the name of the sequence number column.
    SequenceNumber() string
    // Created returns the name of the created at column.
    Created() string
    // CreateTable returns the sql statement to create the table.
    CreateTable() []string
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DefaultSchema

type DefaultSchema struct {
	// contains filtered or unexported fields
}

DefaultSchema is the default implementation of the Schemaer interface.

func NewTable

func NewTable() *DefaultSchema

func (*DefaultSchema) ActorName

func (d *DefaultSchema) ActorName() string

ActorName returns the name of the actor name column.

func (*DefaultSchema) CreateTable

func (d *DefaultSchema) CreateTable() []string

CreateTable returns the sql statement to create the table.

func (*DefaultSchema) Created

func (d *DefaultSchema) Created() string

Created returns the name of the created at column.

func (*DefaultSchema) ID

func (d *DefaultSchema) ID() string

ID returns the name of the id column.

func (*DefaultSchema) JournalTableName

func (d *DefaultSchema) JournalTableName() string

JournalTableName returns the name of the journal table.

func (*DefaultSchema) Payload

func (d *DefaultSchema) Payload() string

Payload returns the name of the payload column.

func (*DefaultSchema) SequenceNumber

func (d *DefaultSchema) SequenceNumber() string

SequenceNumber returns the name of the sequence number column.

func (*DefaultSchema) SnapshotTableName

func (d *DefaultSchema) SnapshotTableName() string

SnapshotTableName returns the name of the snapshot table.

func (*DefaultSchema) WithJournalTable

func (d *DefaultSchema) WithJournalTable(name string) *DefaultSchema

WithJournalTable sets the name of the journal table.

func (*DefaultSchema) WithSnapshotTable

func (d *DefaultSchema) WithSnapshotTable(name string) *DefaultSchema

WithSnapshotTable sets the name of the snapshot table.

type Provider

type Provider struct {
	// contains filtered or unexported fields
}

Provider is the abstraction used for persistence

func New

func New(ctx context.Context, snapshotInterval int, table Schemaer, db *pgxpool.Pool, logger *slog.Logger) (*Provider, error)

New creates a new pg provider

func (*Provider) DeleteEvents

func (provider *Provider) DeleteEvents(_ string, _ int)

DeleteEvents removes all events from the provider

func (*Provider) DeleteSnapshots

func (provider *Provider) DeleteSnapshots(_ string, _ int)

func (*Provider) GetEvents

func (provider *Provider) GetEvents(actorName string, eventIndexStart int, eventIndexEnd int, callback func(e interface{}))

GetEvents retrieves events from the provider eventIndexEnd 0 means max see https://github.com/asynkron/protoactor-go/blob/dev/persistence/plugin.go#L65

func (*Provider) GetSnapshot

func (provider *Provider) GetSnapshot(actorName string) (snapshot interface{}, eventIndex int, ok bool)

func (*Provider) GetSnapshotInterval

func (provider *Provider) GetSnapshotInterval() int

func (*Provider) GetState

func (provider *Provider) GetState() persistence.ProviderState

func (*Provider) PersistEvent

func (provider *Provider) PersistEvent(actorName string, eventIndex int, snapshot proto.Message)

func (*Provider) PersistSnapshot

func (provider *Provider) PersistSnapshot(actorName string, eventIndex int, snapshot proto.Message)

func (*Provider) Restart

func (provider *Provider) Restart()

type Schemaer

type Schemaer interface {
	// JournalTableName returns the name of the journal table.
	JournalTableName() string
	// SnapshotTableName returns the name of the snapshot table.
	SnapshotTableName() string
	// ID returns the name of the id column.
	ID() string
	// Payload returns the name of the payload column.
	Payload() string
	// ActorName returns the name of the actor name column.
	ActorName() string
	// SequenceNumber returns the name of the sequence number column.
	SequenceNumber() string
	// Created returns the name of the created at column.
	Created() string
	// CreateTable returns the sql statement to create the table.
	CreateTable() []string
}

Schemaer is the interface that wraps the basic methods for a schema.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL