mql

package module
v0.0.0-...-b19f329 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2023 License: MIT Imports: 9 Imported by: 0

README

mql

A light, persistant embeddable message queue

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func AssertEqual

func AssertEqual(t *testing.T, want, have any)

func AssertInRange

func AssertInRange[T constraints.Ordered](t *testing.T, val, lower, upper T)

func AssertNoErr

func AssertNoErr(t *testing.T, err error, format string, args ...any)

func PanicOnErr

func PanicOnErr(err error, format string, args ...any)

Types

type Message

type Message struct {
	Topic Topic
	Index int
	Data  []byte
}

type Queue

type Queue struct {
	// contains filtered or unexported fields
}
Example
store, err := NewSqliteStore(":memory:")
PanicOnErr(err, "init sqlite store")
defer store.Close()
queue := NewQueue(store)

var topic Topic = "topic_1"
clientID := "client_1"

//consume
wg := sync.WaitGroup{}
wg.Add(1)
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		default:
			msgs, err := queue.ReadContext(ctx, clientID, topic, 2, 50*time.Millisecond)
			PanicOnErr(err, "read")
			if len(msgs) == 0 {
				continue
			}
			for _, m := range msgs {
				fmt.Println(m.Index, string(m.Data))
			}
			err = queue.CommitContext(ctx, clientID, topic, msgs[len(msgs)-1].Index)
		}
	}
}(ctx)

//produce
chunkSize := 5
chunks := 10
for i := 0; i < chunks; i++ {
	msgs := make([][]byte, chunkSize)
	for j := 0; j < chunkSize; j++ {
		msgs[j] = []byte(fmt.Sprintf("message_%03d", i*chunkSize+j))
	}
	queue.WriteContext(ctx, topic, msgs...)
	<-time.After(100 * time.Millisecond)
}
cancel()
wg.Wait()
Output:

func NewQueue

func NewQueue(store Store) *Queue

func (*Queue) CommitContext

func (q *Queue) CommitContext(ctx context.Context, clientID string, topic Topic, idx int) error

func (*Queue) ReadContext

func (q *Queue) ReadContext(ctx context.Context, clientID string, topic Topic, limit int, wait time.Duration) ([]Message, error)

func (*Queue) WriteContext

func (q *Queue) WriteContext(ctx context.Context, topic Topic, msgs ...[]byte) error

type SqliteStore

type SqliteStore struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewSqliteStore

func NewSqliteStore(dsn string) (*SqliteStore, error)

func (*SqliteStore) AppendContext

func (s *SqliteStore) AppendContext(ctx context.Context, topic Topic, msgs ...[]byte) error

func (*SqliteStore) Close

func (s *SqliteStore) Close()

func (*SqliteStore) CommitContext

func (s *SqliteStore) CommitContext(ctx context.Context, clientID string, topic Topic, idx int) error

func (*SqliteStore) FetchNextContext

func (s *SqliteStore) FetchNextContext(ctx context.Context, clientID string, topic Topic, limit int) ([]Message, error)

type Store

type Store interface {
	AppendContext(ctx context.Context, topic Topic, msgs ...[]byte) error
	CommitContext(ctx context.Context, clientID string, topic Topic, idx int) error
	FetchNextContext(ctx context.Context, clientID string, topic Topic, limit int) ([]Message, error)
}

type Topic

type Topic string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL