executer

Version: v0.0.0-...-6a4b313
Published: Jan 24, 2024 License: MIT Imports: 9 Imported by: 0

README

serviceengine

Golang based Consumer-Producer Engine

This library provides a simple in-memory way to produce messages to one or more topics and to register consumers that receive messages from a single topic or from several.

Introduction

Service Engine follows a typical Pub/Sub pattern: a requester sends a Request to a topic and optionally waits for a response, while a consumer registers code that is invoked for each Request and sends the result back to the requester.

The ENGINE instance is created the first time this package is imported.

A typical way to use the library

Usually, a dependent package imports the service engine, which initializes the ENGINE. A Consumer can then be created as shown below:

nc := NewConsumer(func(request Request) {
	log.Printf("message is %+v", request)
	// Do something and prepare a response.
	res := NewResponse(request.GetID(), request.ExternalID)
	request.Respond(*res)
})

Now add this consumer to listen on a topic. (The HasConsumers method reports whether a topic already has a consumer.)

ENGINE.AddConsumer("a newtopic", *nc)

Then, somewhere in your code, create a Request:

nreq := NewRequest("mybadid")
nreq.Message = []byte("some message")

Now send the request for execution:

ENGINE.Execute("a newtopic", *nreq)

The Consumer handler is then called, and it writes its reply to the Response channel inside the Request object.
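The round trip described above can be pictured with plain channels. The following is a minimal, self-contained sketch and not the library's implementation: the lowercase `request` and `response` types and the `newRequest` and `roundTrip` helpers are hypothetical stand-ins, and the sketch assumes each request owns a buffered response channel so the handler does not block when replying.

```go
package main

import "fmt"

// response and request are hypothetical stand-ins for the library's
// Response and Request types; only the fields needed here are modeled.
type response struct {
	ExternalID string
	Message    []byte
}

type request struct {
	ExternalID string
	Message    []byte
	reschan    chan response // assumption: each request carries its own response channel
}

// newRequest creates a request with a buffered response channel of size 1.
func newRequest(externalID string) *request {
	return &request{ExternalID: externalID, reschan: make(chan response, 1)}
}

// Respond sends a response on the request's channel.
func (r *request) Respond(res response) { r.reschan <- res }

// Receive blocks until a response arrives.
func (r *request) Receive() response { return <-r.reschan }

// roundTrip runs handler in its own goroutine and waits for its reply.
func roundTrip(req *request, handler func(*request)) response {
	go handler(req)
	return req.Receive()
}

func main() {
	req := newRequest("mybadid")
	req.Message = []byte("some message")

	res := roundTrip(req, func(r *request) {
		// Reply with the same external ID and payload.
		r.Respond(response{ExternalID: r.ExternalID, Message: r.Message})
	})
	fmt.Printf("%s: %s\n", res.ExternalID, res.Message)
}
```

Because the channel travels with the request, the handler needs no knowledge of who sent it; it only calls Respond.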

The response can be read in two ways. The first is to call Receive on the request:

ress := nreq.Receive()

Alternatively, create a separate Response channel, as in:

var reschan chan Response
//buffer capacity of 10
reschan = make(chan Response, 10)

Then create a Request with this shared channel:

nreq := NewRequestWith("mybadid", reschan)

In that case, a goroutine can listen for all responses, as in:

go func(reschan <-chan Response, resn int) {
	n := 0
	for {
		if n == resn {
			log.Printf("Received %d", n)
			return
		}
		_, ok := <-reschan
		if ok {
			n++
		} else {
			log.Printf("Received %d", n)
			return
		}
	}
}(reschan, sentcount)

This sample goroutine exits either when all sentcount messages have been received or when the Response channel is closed.
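The goroutine above only logs its count, so the caller has no way to wait for it. One possible variation, sketched here with a hypothetical `response` stand-in type and a hypothetical `collect` helper (neither is part of the package), reports the count on a channel so the caller can block until collection finishes:

```go
package main

import "fmt"

// response is a hypothetical stand-in for the library's Response type.
type response struct{ ExternalID string }

// collect reads from reschan until want responses have arrived or the
// channel is closed, then sends the number received on the returned channel.
func collect(reschan <-chan response, want int) <-chan int {
	done := make(chan int, 1)
	go func() {
		n := 0
		for n < want {
			if _, ok := <-reschan; !ok {
				break // channel closed before all responses arrived
			}
			n++
		}
		done <- n
	}()
	return done
}

func main() {
	const sentcount = 3
	reschan := make(chan response, 10) // buffer capacity of 10

	done := collect(reschan, sentcount)
	for i := 0; i < sentcount; i++ {
		reschan <- response{ExternalID: fmt.Sprintf("req-%d", i)}
	}
	fmt.Printf("Received %d\n", <-done)
}
```

Closing the shared channel early still ends the loop, and values already buffered are delivered before the close is observed.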

More complex cases are covered in the test cases.

Documentation

Constants

This section is empty.

Variables

var BUILDJSON []byte

Functions

This section is empty.

Types

type BuildInfo

type BuildInfo struct {
	Service struct {
		Name     string `json:"name"`
		Version  string `json:"version"`
		CommitId string `json:"commitId"`
	} `json:"service"`
	Components []struct {
		Name     string `json:"name"`
		Version  string `json:"version"`
		CommitId string `json:"commitId"`
	} `json:"components"`
}
var BUILDINFO BuildInfo

type Consumer

type Consumer struct {
	Exit chan bool
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(handler func(request Request)) *Consumer

func (Consumer) GetID

func (c Consumer) GetID() uuid.UUID

func (*Consumer) Set

func (c *Consumer) Set(handler func(request Request))

type Engine

type Engine interface {
	Execute(topic string, request Request) error
	AddConsumer(topic string, listener Consumer)
	RemoveConsumer(topic string, consunerid uuid.UUID) bool
	HasConsumers(topic string) bool
	GetConsumers(topic string) []Consumer
	Start() error
	Stop() error
}
var ENGINE Engine
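To make the shape of this interface concrete, here is one way a topic-to-consumers registry could be backed by a map. This is an illustrative sketch only and says nothing about how the package implements Engine: the `registry` and `handler` types are hypothetical, consumer IDs are plain strings rather than uuid.UUID, delivery is synchronous, and returning an error for a topic without consumers is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// handler is a hypothetical stand-in for a consumer callback.
type handler func(msg []byte)

// registry is an illustrative topic-to-consumers map, not the library's engine.
type registry struct {
	mu     sync.RWMutex
	topics map[string]map[string]handler // topic -> consumer ID -> handler
}

func newRegistry() *registry {
	return &registry{topics: make(map[string]map[string]handler)}
}

// AddConsumer registers h under id for the given topic.
func (r *registry) AddConsumer(topic, id string, h handler) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.topics[topic] == nil {
		r.topics[topic] = make(map[string]handler)
	}
	r.topics[topic][id] = h
}

// RemoveConsumer reports whether a consumer with id was removed from topic.
func (r *registry) RemoveConsumer(topic, id string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.topics[topic][id]; !ok {
		return false
	}
	delete(r.topics[topic], id)
	return true
}

// HasConsumers reports whether topic has at least one consumer.
func (r *registry) HasConsumers(topic string) bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.topics[topic]) > 0
}

// Execute delivers msg synchronously to every consumer on topic.
// Assumption: a topic without consumers is reported as an error.
func (r *registry) Execute(topic string, msg []byte) error {
	// Copy the handlers so they run without holding the lock.
	r.mu.RLock()
	handlers := make([]handler, 0, len(r.topics[topic]))
	for _, h := range r.topics[topic] {
		handlers = append(handlers, h)
	}
	r.mu.RUnlock()

	if len(handlers) == 0 {
		return errors.New("no consumers for topic " + topic)
	}
	for _, h := range handlers {
		h(msg)
	}
	return nil
}

func main() {
	reg := newRegistry()
	reg.AddConsumer("a newtopic", "c1", func(msg []byte) { fmt.Printf("got %s\n", msg) })
	if err := reg.Execute("a newtopic", []byte("some message")); err != nil {
		fmt.Println("execute failed:", err)
	}
	fmt.Println("removed:", reg.RemoveConsumer("a newtopic", "c1"))
}
```

Copying the handlers before invoking them lets a handler add or remove consumers without deadlocking on the registry lock.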

func NewEngine

func NewEngine() Engine

type Message

type Message []byte

type MessageBody

type MessageBody struct {
	ExternalID string
	Message    Message
	Status     Status
	// contains filtered or unexported fields
}

func (*MessageBody) AsJSON

func (m *MessageBody) AsJSON(object interface{}) error

func (MessageBody) FromJSONInto

func (m MessageBody) FromJSONInto(object interface{}) error
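The documentation does not describe these two methods. Their names and receivers suggest that AsJSON encodes an object into the Message payload and FromJSONInto decodes the payload into an object, but that reading is an assumption. The sketch below shows that assumed behavior on a hypothetical `body` stand-in type using encoding/json; the `order` type is likewise made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// body is a hypothetical stand-in for MessageBody.
type body struct {
	Message []byte
}

// AsJSON stores the JSON encoding of object in the payload (assumed behavior).
func (b *body) AsJSON(object interface{}) error {
	data, err := json.Marshal(object)
	if err != nil {
		return err
	}
	b.Message = data
	return nil
}

// FromJSONInto decodes the payload into object, which must be a pointer
// (assumed behavior).
func (b body) FromJSONInto(object interface{}) error {
	return json.Unmarshal(b.Message, object)
}

// order is a made-up payload type for the example.
type order struct {
	Item string `json:"item"`
	Qty  int    `json:"qty"`
}

func main() {
	var b body
	if err := b.AsJSON(order{Item: "widget", Qty: 2}); err != nil {
		fmt.Println("encode failed:", err)
		return
	}

	var got order
	if err := b.FromJSONInto(&got); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%s -> %+v\n", b.Message, got)
}
```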

func (MessageBody) GetID

func (m MessageBody) GetID() uuid.UUID

type Request

type Request struct {
	MessageBody
	// contains filtered or unexported fields
}

func NewRequest

func NewRequest(externalID string) *Request

func NewRequestWith

func NewRequestWith(externalID string, reschan chan Response) *Request

func (*Request) GetReceiver

func (req *Request) GetReceiver() <-chan Response

func (*Request) GetResponder

func (req *Request) GetResponder() chan<- Response

func (*Request) Receive

func (req *Request) Receive() Response

func (*Request) Respond

func (req *Request) Respond(res Response)

func (*Request) SetResponder

func (req *Request) SetResponder(reschan chan Response)

type Response

type Response struct {
	MessageBody
}

func NewResponse

func NewResponse(requestid uuid.UUID, externalID string) *Response

type Status

type Status int
const (
	StatusNew       Status = iota
	StatusSent      Status = iota
	StatusReceived  Status = iota
	StatusError     Status = iota
	StatusCompleted Status = iota
)
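Because iota increments once per line of the const block, the five constants take the values 0 through 4 in declaration order. The self-contained sketch below repeats the declaration to show this; `statusName` is a hypothetical helper added for readable output, since the documentation lists no String method on Status.

```go
package main

import "fmt"

type Status int

const (
	StatusNew       Status = iota // 0
	StatusSent      Status = iota // 1
	StatusReceived  Status = iota // 2
	StatusError     Status = iota // 3
	StatusCompleted Status = iota // 4
)

// statusName is a hypothetical helper; it is not part of the package.
func statusName(s Status) string {
	switch s {
	case StatusNew:
		return "new"
	case StatusSent:
		return "sent"
	case StatusReceived:
		return "received"
	case StatusError:
		return "error"
	case StatusCompleted:
		return "completed"
	default:
		return fmt.Sprintf("Status(%d)", int(s))
	}
}

func main() {
	for s := StatusNew; s <= StatusCompleted; s++ {
		fmt.Printf("%d %s\n", int(s), statusName(s))
	}
}
```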
