nats

package module
v0.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 25, 2022 License: MIT Imports: 8 Imported by: 3

README

nats

CodeQL Run Testing codecov Go Report Card

A NATS (Connective Technology for Adaptive Edge & Distributed Systems) backend for the golang-queue Queue package.

Testing

go test -v ./...

Example

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/nats"
  "github.com/golang-queue/queue"
)

// job is the example payload that travels through the queue.
type job struct {
  Message string
}

// Bytes returns the JSON encoding of the job. It panics if encoding
// fails, because the method signature has no way to report an error.
func (j *job) Bytes() []byte {
  payload, err := json.Marshal(j)
  if err == nil {
    return payload
  }
  panic(err)
}

// main wires a NATS-backed worker into a queue, enqueues taskN jobs from
// separate goroutines, and prints each message the run func hands back.
func main() {
  taskN := 100
  // Capacity equals the number of jobs queued below.
  rets := make(chan string, taskN)

  // define the worker
  w := nats.NewWorker(
    nats.WithAddr("127.0.0.1:4222"),
    nats.WithSubj("example"),
    nats.WithQueue("foobar"),
    nats.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
      // Decode into a value, not a nil *job: a JSON "null" payload would
      // leave the pointer nil and make v.Message panic.
      var v job
      if err := json.Unmarshal(m.Bytes(), &v); err != nil {
        return err
      }
      rets <- v.Message
      return nil
    }),
  )

  // define the queue
  q, err := queue.NewQueue(
    queue.WithWorkerCount(10),
    queue.WithWorker(w),
  )
  if err != nil {
    log.Fatal(err)
  }

  // start the workers
  q.Start()

  // assign tasks in queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      if err := q.Queue(&job{
        Message: fmt.Sprintf("handle the job: %d", i+1),
      }); err != nil {
        log.Fatal(err)
      }
    }(i)
  }

  // wait until all tasks are done: one receive per queued job
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }

  // shut down the service and notify all the workers
  q.Release()
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option func(*options)

Option for queue system

func WithAddr

func WithAddr(addr string) Option

WithAddr sets the address of the NATS server.

func WithLogger

func WithLogger(l queue.Logger) Option

WithLogger sets a custom logger.

func WithQueue

func WithQueue(queue string) Option

WithQueue sets the NATS queue name.

func WithRunFunc

func WithRunFunc(fn func(context.Context, core.QueuedMessage) error) Option

WithRunFunc sets the run function of the queue.

func WithSubj

func WithSubj(subj string) Option

WithSubj sets the NATS subject.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker for NATS

func NewWorker

func NewWorker(opts ...Option) *Worker

NewWorker creates a new Worker.

func (*Worker) Queue

func (w *Worker) Queue(job core.QueuedMessage) error

Queue sends a notification to the queue.

func (*Worker) Request added in v0.0.7

func (w *Worker) Request() (core.QueuedMessage, error)

Request a new task

func (*Worker) Run

func (w *Worker) Run(task core.QueuedMessage) error

Run starts the worker.

func (*Worker) Shutdown

func (w *Worker) Shutdown() error

Shutdown worker

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL