pgq

Version: v0.0.0-...-a23930f
Published: Nov 15, 2018 License: MIT Imports: 6 Imported by: 0

README

PGQ consumers in Go

A package for writing PGQ consumers in Go.

Installation

go get github.com/sasha-alias/pgq

Description

There are two types of consumers you can create:

  • pgq.Consumer - a single instance consumer
  • pgq.CoopConsumer - a cooperative consumer, running several subconsumers in goroutines

To implement your own event or batch processing, define a handler function with one of the following signatures:

func MyEventHandler(event pgq.Event) error { return nil }
func MyBatchHandler(batch []pgq.Event) error { return nil }

Then create an instance of the appropriate consumer and pass your function to it with the EventHandler or BatchHandler method.

Consumer

package main

import (
    "log"

    "github.com/sasha-alias/pgq"
)

// ProcessEvent is the event handler; it logs every event it receives.
func ProcessEvent(event pgq.Event) error {
    log.Printf("%+v", event)
    return nil
}

func main() {
    // Create the consumer
    consumer, err := pgq.NewConsumer("consumer_name", "queue_name", "postgresql connect string")
    if err != nil {
        log.Fatal(err)
    }
    consumer.EventHandler(ProcessEvent) // Set the event handler defined above
    // Start processing events
    if err := consumer.Work(); err != nil {
        log.Fatal(err)
    }
}

Cooperative consumer

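package main

import (
    "log"

    "github.com/sasha-alias/pgq"
)

// Number of subconsumers to run (example value; choose one that fits your workload)
const numberOfSubconsumers = 4

// ProcessEvent is the event handler; it logs every event it receives.
func ProcessEvent(event pgq.Event) error {
    log.Printf("%+v", event)
    return nil
}

func main() {
    consumer, err := pgq.NewCoopConsumer("consumer_name", numberOfSubconsumers, "queue_name", "postgresql connect string")
    if err != nil {
        log.Fatal(err)
    }
    consumer.EventHandler(ProcessEvent)
    if err := consumer.Work(); err != nil {
        log.Fatal(err)
    }
}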

Documentation

Overview

Package pgq provides a simple framework for writing PGQ consumers. PGQ is an implementation of transactional queues on top of PostgreSQL.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) BatchHandler

func (self *Consumer) BatchHandler(handler func([]Event) error)

Set batch handler

func (*Consumer) BatchId

func (self *Consumer) BatchId() int64

Current batch id

func (*Consumer) CloseBatch

func (self *Consumer) CloseBatch() error

Close current batch

func (*Consumer) ConsumerName

func (self *Consumer) ConsumerName() string

Consumer name

func (*Consumer) EventHandler

func (self *Consumer) EventHandler(handler func(Event) error)

Set event handler

func (*Consumer) QueueName

func (self *Consumer) QueueName() string

Queue name

func (*Consumer) SleepInterval

func (self *Consumer) SleepInterval(interval time.Duration)

Set sleep interval

func (*Consumer) SubconsumerName

func (self *Consumer) SubconsumerName() string

Subconsumer name

func (*Consumer) Subscribe

func (self *Consumer) Subscribe() error

Register consumer

func (*Consumer) Unsubscribe

func (self *Consumer) Unsubscribe() error

Unregister consumer

func (*Consumer) Work

func (self *Consumer) Work() error

Main working loop

type CoopConsumer

type CoopConsumer struct {
	Count int
	// contains filtered or unexported fields
}

Cooperative consumer

func (*CoopConsumer) BatchHandler

func (self *CoopConsumer) BatchHandler(handler func([]Event) error)

Set batch handler

func (*CoopConsumer) BatchId

func (self *CoopConsumer) BatchId() int64

Current batch id

func (*CoopConsumer) CloseBatch

func (self *CoopConsumer) CloseBatch() error

func (*CoopConsumer) ConsumerId

func (self *CoopConsumer) ConsumerId() string

Consumer Id

func (*CoopConsumer) ConsumerName

func (self *CoopConsumer) ConsumerName() string

Consumer name

func (*CoopConsumer) EventHandler

func (self *CoopConsumer) EventHandler(handler func(Event) error)

Set event handler

func (*CoopConsumer) QueueName

func (self *CoopConsumer) QueueName() string

Queue name

func (*CoopConsumer) SleepInterval

func (self *CoopConsumer) SleepInterval(interval time.Duration)

Set sleep interval

func (*CoopConsumer) SubconsumerName

func (self *CoopConsumer) SubconsumerName() string

Subconsumer name

func (*CoopConsumer) Subscribe

func (self *CoopConsumer) Subscribe() error

Register consumer on the queue

func (*CoopConsumer) Unsubscribe

func (self *CoopConsumer) Unsubscribe() error

Unregister consumer from the queue

func (*CoopConsumer) Work

func (self *CoopConsumer) Work() error

Main working loop

type Event

type Event struct {
	Ev_id     int64
	Ev_time   time.Time
	Ev_txid   int64
	Ev_retry  sql.NullInt64
	Ev_type   sql.NullString
	Ev_data   sql.NullString
	Ev_extra1 sql.NullString
	Ev_extra2 sql.NullString
	Ev_extra3 sql.NullString
	Ev_extra4 sql.NullString
	Consumer  IConsumer
}

type IConsumer

type IConsumer interface {
	Subscribe() error
	Unsubscribe() error
	EventHandler(handler func(event Event) error)
	BatchHandler(handler func(batch []Event) error)
	Work() error
	SleepInterval(time.Duration)
	BatchId() int64
	ConsumerName() string
	SubconsumerName() string
	QueueName() string
	CloseBatch() error
}
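
Because every Event carries a Consumer field of type IConsumer, a handler can ask which queue, consumer, and batch an event came from. The sketch below illustrates this with stand-ins: batchInfo is a hypothetical narrow interface containing three of the IConsumer methods, fakeConsumer is a test double, and Event mirrors only the fields used here. It suggests one way handlers might be unit-tested without a database.

```go
package main

import "fmt"

// batchInfo (hypothetical) holds three of the methods declared by IConsumer.
type batchInfo interface {
	BatchId() int64
	ConsumerName() string
	QueueName() string
}

// Event is a local stand-in for pgq.Event with only the fields this sketch needs.
type Event struct {
	Ev_id    int64
	Consumer batchInfo
}

// fakeConsumer is a test double that returns fixed values.
type fakeConsumer struct {
	name, queue string
	batch       int64
}

func (f fakeConsumer) BatchId() int64       { return f.batch }
func (f fakeConsumer) ConsumerName() string { return f.name }
func (f fakeConsumer) QueueName() string    { return f.queue }

// origin builds a log-friendly label describing where an event came from.
func origin(ev Event) string {
	c := ev.Consumer
	return fmt.Sprintf("%s/%s batch %d event %d",
		c.QueueName(), c.ConsumerName(), c.BatchId(), ev.Ev_id)
}

func main() {
	ev := Event{
		Ev_id:    7,
		Consumer: fakeConsumer{name: "billing", queue: "orders", batch: 42},
	}
	fmt.Println(origin(ev)) // prints: orders/billing batch 42 event 7
}
```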

func NewConsumer

func NewConsumer(consumer_name string, queue_name string, connect_string string) (IConsumer, error)

Register consumer in pgq

func NewCoopConsumer

func NewCoopConsumer(consumer_name string, count int, queue_name string, connect_string string) (IConsumer, error)

type Subconsumer

type Subconsumer struct {
	// contains filtered or unexported fields
}

Subconsumer

func (*Subconsumer) BatchHandler

func (self *Subconsumer) BatchHandler(handler func([]Event) error)

Set batch handler

func (*Subconsumer) BatchId

func (self *Subconsumer) BatchId() int64

Get current batch id

func (*Subconsumer) CloseBatch

func (self *Subconsumer) CloseBatch() error

Close current batch

func (*Subconsumer) ConsumerName

func (self *Subconsumer) ConsumerName() string

Consumer name

func (*Subconsumer) EventHandler

func (self *Subconsumer) EventHandler(handler func(Event) error)

Set event handler

func (*Subconsumer) QueueName

func (self *Subconsumer) QueueName() string

Queue name

func (*Subconsumer) SleepInterval

func (self *Subconsumer) SleepInterval(interval time.Duration)

Set sleep interval

func (*Subconsumer) SubconsumerName

func (self *Subconsumer) SubconsumerName() string

Subconsumer name

func (*Subconsumer) Subscribe

func (self *Subconsumer) Subscribe() error

Register subconsumer

func (*Subconsumer) Unsubscribe

func (self *Subconsumer) Unsubscribe() error

Unregister subconsumer

func (*Subconsumer) Work

func (self *Subconsumer) Work() error

Working loop running in goroutine
