nsq

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 24, 2022 License: MIT Imports: 8 Imported by: 3

README

NSQ

Run Testing Go Report Card codecov

NSQ (a realtime distributed messaging platform) as a backend for the Queue package.

screen

Setup

start the NSQ lookupd

nsqlookupd

start the NSQ server

nsqd --lookupd-tcp-address=localhost:4160

start the NSQ admin dashboard

nsqadmin --lookupd-http-address localhost:4161

Testing

go test -v ./...

Example

package main

import (
  "context"
  "encoding/json"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/nsq"
  "github.com/golang-queue/queue"
)

// job is the example payload carried through the queue. It is serialized
// as JSON by Bytes.
type job struct {
  Message string
}

// Bytes returns the JSON encoding of the job. It panics if encoding fails.
func (j *job) Bytes() []byte {
  encoded, err := json.Marshal(j)
  if err == nil {
    return encoded
  }
  panic(err)
}

// main enqueues taskN jobs through a queue backed by the NSQ worker and
// prints each message that the worker's run func hands back on rets.
func main() {
  taskN := 100
  // Buffered with room for one result per task.
  rets := make(chan string, taskN)

  // define the worker
  w := nsq.NewWorker(
    nsq.WithAddr("127.0.0.1:4150"),
    nsq.WithTopic("example"),
    nsq.WithChannel("foobar"),
    // concurrent job number
    nsq.WithMaxInFlight(10),
    nsq.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
      // Decode into a value, not a *job: json.Unmarshal sets a pointer
      // target to nil for a JSON "null" payload, which would make
      // v.Message panic with a nil-pointer dereference.
      var v job
      if err := json.Unmarshal(m.Bytes(), &v); err != nil {
        return err
      }
      rets <- v.Message
      return nil
    }),
  )

  // define the queue
  q := queue.NewPool(
    10,
    queue.WithWorker(w),
  )
  defer q.Release()

  // assign tasks in queue
  for i := 0; i < taskN; i++ {
    // i is passed as an argument so each goroutine gets its own copy.
    go func(i int) {
      if err := q.Queue(&job{
        Message: fmt.Sprintf("handle the job: %d", i+1),
      }); err != nil {
        // log.Fatal exits the process immediately, so the deferred
        // q.Release() does not run on this path.
        log.Fatal(err)
      }
    }(i)
  }

  // wait until all tasks done: receive exactly taskN results, pausing
  // 50ms after printing each one
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option interface {
	Apply(*Options)
}

An Option configures a Worker.

func WithAddr

func WithAddr(addr string) Option

WithAddr sets the address of the NSQ server.

func WithChannel

func WithChannel(channel string) Option

WithChannel sets the NSQ channel.

func WithLogger

func WithLogger(l queue.Logger) Option

WithLogger sets a custom logger.

func WithMaxInFlight

func WithMaxInFlight(num int) Option

WithMaxInFlight sets the maximum number of messages to allow in flight (concurrency knob).

func WithRunFunc

func WithRunFunc(fn func(context.Context, core.QueuedMessage) error) Option

WithRunFunc sets the run func of the queue.

func WithTopic

func WithTopic(topic string) Option

WithTopic sets the NSQ topic.

type OptionFunc added in v0.1.0

type OptionFunc func(*Options)

OptionFunc is a function that configures a queue.

func (OptionFunc) Apply added in v0.1.0

func (f OptionFunc) Apply(option *Options)

Apply calls f(option)

type Options added in v0.1.0

type Options struct {
	// contains filtered or unexported fields
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker for NSQ

func NewWorker

func NewWorker(opts ...Option) *Worker

NewWorker creates a new Worker configured with the given options.

func (*Worker) Queue

func (w *Worker) Queue(job core.QueuedMessage) error

Queue sends the job to the queue.

func (*Worker) Request added in v0.1.0

func (w *Worker) Request() (core.QueuedMessage, error)

Request fetches a new task from the queue.

func (*Worker) Run

func (w *Worker) Run(task core.QueuedMessage) error

Run starts the worker.

func (*Worker) Shutdown

func (w *Worker) Shutdown() error

Shutdown shuts down the worker.

func (*Worker) Stats added in v0.1.0

func (w *Worker) Stats() *nsq.ConsumerStats

Stats retrieves the current connection and message statistics for a Consumer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL