mq

Version: v0.0.0-...-7b93327
Published: Oct 12, 2022 License: Apache-2.0 Imports: 2 Imported by: 1

README

go-mq

MQ is a Go API for communicating with messaging queue systems.

Getting Started

To install this library, run:

go get -u github.com/KurioApp/go-mq

Support and Contribution

To contribute to this project, submit a pull request (PR) or file an issue.

Example

MQ is a Go API for communicating with messaging queue systems such as RabbitMQ, Google Pubsub, Redis Pubsub, and Kafka. Currently, only Google Pubsub is supported.

Google Pubsub

To use Google Pubsub as the messaging queue system, follow the example below.

package main

import (
	"context"
	"log"

	mq "github.com/KurioApp/go-mq"
	pubsub "github.com/KurioApp/go-mq/pubsub"
	"google.golang.org/api/option"
)

func main() {
	ctx := context.Background()

	pbClient, err := pubsub.New(ctx, "pubsub-project-id", option.WithCredentialsFile("./credential_file.json"))
	if err != nil {
		log.Fatal(err)
	}

	// Ensure the topic exists
	topic, err := pbClient.EnsureTopic(context.TODO(), "topic-name")
	if err != nil {
		log.Fatal(err)
	}

	// Publish a message to Pubsub
	pbClient.Publish("topic-name", []byte(`{"message": "hello world"}`))

	// Ensure the subscription exists on the topic
	_, err = pbClient.EnsureSubscription(context.TODO(), topic, "subscription-id")
	if err != nil {
		log.Fatal(err)
	}

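	// Subscribe to the subscription. Subscribe is assumed to block while
	// receiving, so it is called directly; started in a goroutine, main
	// would return before any message is handled.
	if err := pbClient.Subscribe(ctx, "subscription-id", &IncomingMsgHandler{}); err != nil {
		log.Fatal(err)
	}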
}

type IncomingMsgHandler struct {
}

// Handle implements the mq.Handler interface.
func (i *IncomingMsgHandler) Handle(m mq.Message) {
	// Handle the incoming message here, then call m.Ack() or m.Nack()
}

Documentation

Overview

Package mq provides a standard way to work with messaging queue systems.

Functions

func ContextWithMaxOutstanding

func ContextWithMaxOutstanding(ctx context.Context, v int) context.Context

ContextWithMaxOutstanding decorates ctx with the maxOutstanding value.

func ContextWithNumWorkers

func ContextWithNumWorkers(ctx context.Context, v int) context.Context

ContextWithNumWorkers decorates ctx with the numWorkers value.

func MaxOutstandingFromContext

func MaxOutstandingFromContext(ctx context.Context) (int, bool)

MaxOutstandingFromContext gets the maxOutstanding value from the context. The boolean result reports whether a value was set.

func NumWorkersFromContext

func NumWorkersFromContext(ctx context.Context) (int, bool)

NumWorkersFromContext gets the numWorkers value from the context. The boolean result reports whether a value was set.

Types

type AckErrNotifier

type AckErrNotifier interface {
	NotifyAckErr(Message, error)
}

AckErrNotifier provides the method NotifyAckErr, which is invoked when Ack fails.

type AckHandler

type AckHandler interface {
	// HandleAck handles the message, which is acked automatically when a nil error is returned.
	HandleAck(Message) error
}

AckHandler handles a message and acks it automatically.

type Handler

type Handler interface {
	Handle(Message)
}

Handler is the message handler.

func AutoAck

func AutoAck(h AckHandler) Handler

AutoAck creates Handler from AckHandler.

AutoAck handles the message: when the AckHandler returns an error it calls m.Nack(), otherwise it calls m.Ack(). To be told when Ack or Nack itself fails, h should also implement AckErrNotifier and NackErrNotifier. Implementing these interfaces is optional.
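The behaviour described for AutoAck can be sketched as follows. To stay runnable without the library, the block declares trimmed local stand-ins for the mq interfaces (Message is reduced to Ack and Nack). The function autoAck is an approximation written from the description above, not the library's implementation, and fakeMsg and failIf are hypothetical test helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the mq interfaces documented on this page.
type Message interface {
	Ack() error
	Nack() error
}

type Handler interface{ Handle(Message) }

type HandlerFunc func(Message)

func (f HandlerFunc) Handle(m Message) { f(m) }

type AckHandler interface{ HandleAck(Message) error }

type AckErrNotifier interface{ NotifyAckErr(Message, error) }

type NackErrNotifier interface{ NotifyNackErr(Message, error) }

// autoAck approximates the documented behaviour of mq.AutoAck: Nack when the
// handler returns an error, Ack otherwise, and report Ack/Nack failures to h
// only if h implements the optional notifier interfaces.
func autoAck(h AckHandler) Handler {
	return HandlerFunc(func(m Message) {
		if err := h.HandleAck(m); err != nil {
			if nerr := m.Nack(); nerr != nil {
				if n, ok := h.(NackErrNotifier); ok {
					n.NotifyNackErr(m, nerr)
				}
			}
			return
		}
		if aerr := m.Ack(); aerr != nil {
			if n, ok := h.(AckErrNotifier); ok {
				n.NotifyAckErr(m, aerr)
			}
		}
	})
}

// fakeMsg is a hypothetical message that records whether it was acked or nacked.
type fakeMsg struct{ state string }

func (m *fakeMsg) Ack() error  { m.state = "acked"; return nil }
func (m *fakeMsg) Nack() error { m.state = "nacked"; return nil }

// failIf is a hypothetical AckHandler that fails when its value is true.
type failIf bool

func (f failIf) HandleAck(Message) error {
	if f {
		return errors.New("processing failed")
	}
	return nil
}

func main() {
	good, bad := &fakeMsg{}, &fakeMsg{}
	autoAck(failIf(false)).Handle(good)
	autoAck(failIf(true)).Handle(bad)
	fmt.Println(good.state, bad.state)
}
```

The type assertions on h are what make the notifier interfaces optional: a handler that implements neither is still a valid AckHandler.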

type HandlerFunc

type HandlerFunc func(Message)

HandlerFunc is the function adapter of Handler.

func (HandlerFunc) Handle

func (f HandlerFunc) Handle(m Message)

Handle invokes f(m).

type Message

type Message interface {
	ID() string
	Body() []byte
	Timestamp() time.Time
	Ack() error
	Nack() error
}

Message represents a message from the server.

Either Ack or Nack should be invoked for every message.

type NackErrNotifier

type NackErrNotifier interface {
	NotifyNackErr(Message, error)
}

NackErrNotifier provides the method NotifyNackErr, which is invoked when Nack fails.

type PublishResult

type PublishResult interface {
	Get(context.Context) (id string, err error)
	Ready() <-chan struct{}
}

PublishResult is the result of a publish.

type Publisher

type Publisher interface {
	Publish(topic string, msg []byte) PublishResult
}

Publisher is the interface that wraps the basic Publish method.

Publish publishes msg to the specified topic.

type Subscriber

type Subscriber interface {
	Subscribe(ctx context.Context, channel string, h Handler) error
}

Subscriber is the interface that wraps the basic Subscribe method.

Subscribe subscribes to a channel to receive messages. Each message is passed to the Handler h, which must call Ack or Nack on it. Leaving a message without Ack or Nack might block receiving until a timeout (the behaviour differs between implementations).

Directories

Path      Synopsis
pubsub    Package pubsub provides Google Pub/Sub related implementation.
