Event stream publisher (ESP)
Synchronizes event stream (ES) aggregates between PostgreSQL and Kafka.
- Transaction-based
- Uses SELECT and NOTIFY
- Works without the WAL journal (full event history is supported)
- Metrics (0.0.0.0:8080/metrics)
First step
Creating a database table
CREATE TABLE IF NOT EXISTS aggregates_published_events
(
    aggregate_id      uuid         NOT NULL,
    aggregate_type    VARCHAR(128) NOT NULL,
    published_version integer,
    PRIMARY KEY (aggregate_id, aggregate_type)
);
Creating a PostgreSQL function
CREATE OR REPLACE FUNCTION publish_aggregates_events() RETURNS TRIGGER AS $$
BEGIN
    -- Payload format: aggregate_id|aggregate_type|event_version
    PERFORM pg_notify('aggregates_events', concat(NEW.aggregate_id, '|', NEW.aggregate_type, '|', NEW.event_version));
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;
Creating a PostgreSQL trigger
CREATE TRIGGER aggregates_events_notify_event
AFTER INSERT ON aggregates_events
FOR EACH ROW EXECUTE PROCEDURE publish_aggregates_events();
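The function above sends each notification as a single string of the form aggregate_id|aggregate_type|event_version. As a minimal sketch of how a listener might decode that payload, the Python below splits it back into typed fields. The names AggregateNotification and parse_notification are hypothetical and are not part of ESP.

```python
from dataclasses import dataclass
from uuid import UUID


@dataclass(frozen=True)
class AggregateNotification:
    """Decoded form of one 'aggregates_events' notification (hypothetical type)."""
    aggregate_id: UUID
    aggregate_type: str
    event_version: int


def parse_notification(payload: str) -> AggregateNotification:
    """Parse a payload built by concat(id, '|', type, '|', version).

    Raises ValueError if the payload does not have three '|'-separated
    parts, or if the id or version cannot be converted.
    """
    # The id is the first field and the version is the last one, so split
    # once from each end; this keeps working even if the aggregate type
    # itself happens to contain a '|'.
    raw_id, rest = payload.split("|", 1)
    aggregate_type, raw_version = rest.rsplit("|", 1)
    return AggregateNotification(
        aggregate_id=UUID(raw_id),
        aggregate_type=aggregate_type,
        event_version=int(raw_version),
    )
```

For example, parse_notification("123e4567-e89b-12d3-a456-426614174000|order|7") yields an object whose aggregate_type is "order" and whose event_version is 7.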
Creating a Kafka topic for each aggregate, with a prefix (default prefix: es.aggregate)
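This README does not spell out how the prefix and the aggregate name are combined into a topic name. The sketch below assumes they are joined with a dot, which is only a guess. Check the topic names ESP actually produces before creating topics from it. The helper topic_for is hypothetical.

```python
DEFAULT_TOPIC_PREFIX = "es.aggregate"  # default prefix stated above


def topic_for(aggregate_type: str, prefix: str = DEFAULT_TOPIC_PREFIX) -> str:
    """Build a per-aggregate topic name.

    ASSUMPTION: prefix and aggregate type are joined with '.'; the real
    naming scheme used by ESP may differ.
    """
    if not aggregate_type:
        raise ValueError("aggregate_type must not be empty")
    return f"{prefix}.{aggregate_type}"
```

Under that assumption, topic_for("order") gives "es.aggregate.order".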
Second step
Creating some domain logic with the ES toolkit...
Third step
Running the ESP synchronizer for each PostgreSQL database
ESP_DB_ADDR=postgresql://postgres:postgres@127.0.0.1:5432/postgres \
ESP_KAFKA_BROKERS=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 \
esp
Environment variables
ESP_DB_ADDR=postgresql://postgres:123456@127.0.0.1:5432/postgres
ESP_KAFKA_BROKERS=127.0.0.1:9091,127.0.0.1:9092,127.0.0.1:9093
ESP_KAFKA_TOPIC_PREFIX=es.aggregate
ESP_LOG_LEVEL=info
ESP_LOG_FORMAT=plain
ESP_STATS_NAME=default
ESP_STATS_HTTP_PORT=8080
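A malformed broker list (for example a doubled colon in host:port) is easy to miss. The Python sketch below is not ESP's own code; it shows one way a wrapper script might read and validate these variables before starting esp. It treats the optional values listed above as fallbacks, which is an assumption; EspConfig and load_config are hypothetical names.

```python
import os
from dataclasses import dataclass
from typing import List, Mapping


@dataclass(frozen=True)
class EspConfig:
    db_addr: str
    kafka_brokers: List[str]
    kafka_topic_prefix: str
    log_level: str
    log_format: str
    stats_name: str
    stats_http_port: int


def load_config(env: Mapping[str, str] = os.environ) -> EspConfig:
    """Read ESP_* variables from env; raise KeyError/ValueError on bad input."""
    # Comma-separated host:port entries; surrounding whitespace is ignored.
    brokers = [b.strip() for b in env["ESP_KAFKA_BROKERS"].split(",") if b.strip()]
    if not brokers:
        raise ValueError("ESP_KAFKA_BROKERS is empty")
    for broker in brokers:
        host, sep, port = broker.rpartition(":")
        # Reject entries such as 'host', ':9092', 'host:abc' or 'host::9092'.
        if not sep or not host or host.endswith(":") or not port.isdigit():
            raise ValueError(f"invalid broker address: {broker!r}")
    return EspConfig(
        db_addr=env["ESP_DB_ADDR"],  # required, no fallback
        kafka_brokers=brokers,
        # ASSUMPTION: the values shown in this README act as defaults.
        kafka_topic_prefix=env.get("ESP_KAFKA_TOPIC_PREFIX", "es.aggregate"),
        log_level=env.get("ESP_LOG_LEVEL", "info"),
        log_format=env.get("ESP_LOG_FORMAT", "plain"),
        stats_name=env.get("ESP_STATS_NAME", "default"),
        stats_http_port=int(env.get("ESP_STATS_HTTP_PORT", "8080")),
    )
```

Calling load_config() with no arguments reads the current process environment; passing a plain dict makes the check easy to run in isolation.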