drift

package module
v0.0.0-...-9477886 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 1, 2019 License: MIT Imports: 17 Imported by: 0

README

Godocs Go Report Card Open Source Helpers Release

# drift NSQ Producer/Consumer integration to drift your request smoothly. Add/Kill consumer over http request on any topic. Publish new request over http on any nsqd.

STEPS TO RUN drift

  1. install nsq
  2. go get github.com/mayur-tolexo/drift
  3. cd $GOPATH/src/github.com/mayur-tolexo/drift
  4. install godep
  5. godep restore
  6. go run example/drift.go
  7. [in new tab] cd $GOPATH/src/github.com/mayur-tolexo/drift
  8. go run nsqlookup/nsqlookup.go
  9. [in new tab] cd $GOPATH/src/github.com/mayur-tolexo/drift
  10. go run nsqd/nsqd.go --lookupd-tcp-address=127.0.0.1:4160
  11. [in new tab] cd $GOPATH/src/github.com/mayur-tolexo/drift
  12. go run example/producer.go
  13. add new consumer as mention below
  14. start admin as mention below
  15. open http://127.0.0.1:4171/ in browser

START ADMIN

POST localhost:1500/drift/v1/start/admin/

{
  "lookup_http_address": ["127.0.0.1:4161"],
  "http_address":"localhost:4171",
  "user": ["drift-user"],
  "acl_http_header": "admin-user"
}

STOP ADMIN

GET localhost:1500/drift/v1/stop/admin/

ADD NEW CONSUMER

POST localhost:1500/drift/v1/add/consumer/

{
  "lookup_http_address": [
    "127.0.0.1:4161"
  ],
  "topic_detail": [
    {
      "topic": "elastic",
      "channel": "v2.1",
      "count": 1
    },
    {
      "topic": "elastic",
      "channel": "v6.2",
      "count": 2
    }
  ],
  "max_in_flight": 200
}

COUNT CONSUMERS OF A TOPIC ON SPECIFIC CHANNEL

GET localhost:1500/drift/v1/consumer/?topic=elastic&channel=v2.1

COUNT ALL CONSUMERS OF A TOPIC

GET localhost:1500/drift/v1/consumer/?topic=elastic

KILL CONSUMER

POST localhost:1500/drift/v1/kill/consumer/

{
  "topic": "elastic",
  "channel": "v2.1",
  "count":1
}

PUBLISH REQUEST

POST localhost:1500/drift/v1/pub/request/

{
  "nsqd_tcp_address": ["127.0.0.1:4150"],
  "topic": "elastic",
  "data": "This is a test over http"
}

Example

//printIT : function which consumer will call to execute
func printIT(value ...interface{}) error {
  fmt.Println(value)
  return nil
}

func main() {
  d := drift.NewConsumer(printIT)
  d.Start(1500)
}

Handler

handler is a function to which the consumer will call.

FUNCTION DEFINATION:
func(value ...interface{}) error

Here PrintIT is a handler function. Define your own handler and pass it in the drift and you are ready to go.

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func StartAdmin

func StartAdmin(lookupHTTPAddr []string, httpAddrs string)

StartAdmin will start admin

Types

type AddAdmin

type AddAdmin struct {
	AdminUser                []string `json:"user"`
	HTTPAddrs                string   `json:"http_address"`
	LookupHTTPAddr           []string `json:"lookup_http_address"`
	NsqDTCPAddrs             []string `json:"nsqd_tcp_address"`
	ACLHTTPHeader            string   `json:"acl_http_header"`
	NotificationHTTPEndpoint string   `json:"notification_http_endpoint"`
}

AddAdmin is the add admin request

type AddConstumer

type AddConstumer struct {
	LookupHTTPAddr []string    `json:"lookup_http_address"`
	NsqDTCPAddrs   []string    `json:"nsqd_tcp_address"`
	Topic          []TopicData `json:"topic_detail"`
	MaxInFlight    int         `json:"max_in_flight"`
	StartAdmin     bool        `json:"start_admin"`
}

AddConstumer is the request format of add consumer api

type Admin

type Admin struct {
	Topic   string `json:"topic"`
	Channel string `json:"channel"`
	Action  string `json:"action"`
}

Admin is the request format of admin api to permorm action. allowed actions are - create/empty/delete/pause/unpause

type Drift

type Drift struct {
	Server aqua.RestServer
	// contains filtered or unexported fields
}

Drift have the consumer/publisher model

func NewConsumer

func NewConsumer(jobHandler JobHandler) *Drift

NewConsumer will create new consumer

Example

New consumer created with handel to call by the consumer. This will start new server to receive request over HTTP

package main

import (
	"fmt"

	"github.com/mayur-tolexo/drift"
)

func printIT(value ...interface{}) error {
	fmt.Println("In 1st Print", value)
	return nil
}

func printIT2(value ...interface{}) error {
	fmt.Println("In 2nd Print", value)
	return nil
}

func printIT3(value ...interface{}) error {
	fmt.Println("In 3rd Print", value)
	return nil
}

// New consumer created with handel to call by the consumer.
// This will start new server to receive request over HTTP
func main() {
	//Default handler is printIT
	d := drift.NewConsumer(printIT)

	// This will map a new handeler with specified topic's channel
	d.AddChanelHandler("elastic", "v6.2", printIT2)

	// This will map a new handeler with all channels of the specified topic.
	// If a channelHandler is already mapped with any channel of the specified topic then that handler will be called
	// and in rest of the channel this handler will be called.
	d.AddTopicHandler("elastic", printIT3)

	//port assign here is 1500
	d.Start(1500)
}
Output:

func NewPub

func NewPub(nsqDTCPAddrs []string) *Drift

NewPub will create new publisher

Example

new pub created to publish message to nsqd

msg := flag.String("msg", "Hi this is a test", "Message to broadcast")
flag.Parse()
nsqdTCPAddrs := []string{"127.0.0.1:4150"}
d := drift.NewPub(nsqdTCPAddrs)
topic := "elastic"
if resp, err := d.Publish(topic, *msg); err == nil {
	fmt.Println(resp)
} else {
	fmt.Println(err.Error())
}
Output:

func (*Drift) AddChanelHandler

func (d *Drift) AddChanelHandler(topic, channel string, jobHandler JobHandler)

AddChanelHandler will add a new handler with the channel of given topic

func (*Drift) AddConsumer

func (d *Drift) AddConsumer(payload AddConstumer) (data interface{}, err error)

AddConsumer will process add consumer request

func (*Drift) AddTopicHandler

func (d *Drift) AddTopicHandler(topic string, jobHandler JobHandler)

AddTopicHandler will add a new handler with the given topic

func (*Drift) Publish

func (d *Drift) Publish(topic string, data interface{}) (resp interface{}, err error)

Publish will broadcast the data to the nsqd

func (*Drift) SetLogger

func (d *Drift) SetLogger(l *log.Logger, lvl LogLevel)

SetLogger logger

func (*Drift) Start

func (d *Drift) Start(port int)

Start will start the drift server

type JobHandler

type JobHandler func(value ...interface{}) error

JobHandler function which will be called

type KillConsumer

type KillConsumer struct {
	Topic   string `json:"topic"`
	Channel string `json:"channel"`
	Count   int    `json:"count"`
}

KillConsumer is the request format of kill consumer api

type LogLevel

type LogLevel int

LogLevel specifies the severity of a given log message

const (
	LogLevelDebug LogLevel = iota
	LogLevelInfo
	LogLevelWarning
	LogLevelError
)

Log levels

type Publish

type Publish struct {
	NsqDTCPAddrs []string    `json:"nsqd_tcp_address"`
	Topic        string      `json:"topic"`
	Data         interface{} `json:"data"`
}

Publish is the request format of publish request api

type TopicData

type TopicData struct {
	Topic   string `json:"topic"`
	Channel string `json:"channel"`
	Count   int    `json:"count"`
}

TopicData will contail the topic details

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL