bmw

package module
v0.0.0-...-146c1f4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2018 License: MIT Imports: 11 Imported by: 0

README

Being Micro Worker

SingleBMW

This worker uses a Go channel as its queue and Gin as its API engine.

MultiBMW

This struct can create multiple worker pools on different URIs. Each worker pool has its own message Waiter and Pusher.

Usage

See worker/resource folder.

// main is the usage example: it registers two upload routes on a MultiBMW,
// adds one extra GET endpoint to the underlying gin engine, and starts the
// worker.
//
// loadConfig, listenPort, findKeyQuery, handler and batchHandler are defined
// elsewhere in the example program (see the worker/resource folder).
func main() {
	flag.Parse()
	loadConfig("./", "config")

	// Each route pairs an API path with a queue topic and the Handler that
	// processes jobs for it.
	routes := &bmw.MultiHandlerRoutes{
		Host: listenPort,
		Routes: []bmw.SingleRoute{
			{
				Path:    "service/resource/upload",
				Topic:   "upload",
				Handler: &handler{},
			},
			{
				Path:    "service/resource/upload/batch",
				Topic:   "upload-batch",
				Handler: &batchHandler{},
			},
		},
	}

	// The local is deliberately not called bmw: that name would shadow the
	// imported package for the rest of the function.
	worker := bmw.NewMultiBMW(bmw.GoChannelConfig{}, routes)

	// GetEngine exposes the gin.Engine so extra path handlers can be added
	// next to the job routes.
	worker.GetAPI().GetEngine().GET("service/resource/key", findKeyQuery)

	worker.Start()
}

// handler is the job handler that the usage example registers for the
// "upload" topic. It carries no state.
type handler struct{}

// Handle passes a pointer to its copy of the job to handleRequest and returns
// whatever error that call reports.
func (*handler) Handle(job bmw.RetryJob) error {
	err := handleRequest(&job)
	return err
}

// batchHandler is the job handler that the usage example registers for the
// "upload-batch" topic. It carries no state.
type batchHandler struct{}

// Handle prints the ID of the received job on standard output and always
// returns nil.
func (*batchHandler) Handle(job bmw.RetryJob) error {
	id := job.ID
	fmt.Println(id)
	return nil
}

To send a job: curl localhost:8085/service/resource/upload/batch -d "test"

Documentation

Index

Constants

This section is empty.

Variables

View Source
// MaxRetry is initialised to 3.
// NOTE(review): presumably the maximum number of retries for a job — confirm.
var MaxRetry uint8 = 3

// RetryError is a package-level error value whose message is "need retry".
// NOTE(review): presumably returned by handlers to request a retry — confirm.
var RetryError = errors.New("need retry")

Functions

This section is empty.

Types

type DefaultAPI

type DefaultAPI struct {
	// contains filtered or unexported fields
}

DefaultAPI receives the payload only from the request body, then pushes it into the queue.

func NewDefaultAPI

func NewDefaultAPI(host string, routes []*RouteConfig) *DefaultAPI

func (*DefaultAPI) GetEngine

func (api *DefaultAPI) GetEngine() *gin.Engine

GetEngine returns the gin.Engine so that extra path handlers can be added.

func (*DefaultAPI) StartServe

func (api *DefaultAPI) StartServe() error

type Dispatcher

type Dispatcher struct {
	// A pool of workers channels that are registered with the dispatcher
	WorkerPool chan chan RetryJob
	Config     *DispatcherConfig
	// contains filtered or unexported fields
}

Dispatcher manages the workers

func NewDispatcher

func NewDispatcher(handler Handler, waiter Waiter, config *DispatcherConfig) *Dispatcher

NewDispatcher makes a dispatcher

func (*Dispatcher) Run

func (d *Dispatcher) Run()

Run generates the workers and makes them start working.

type DispatcherConfig

// DispatcherConfig is the configuration passed to NewDispatcher and exposed
// through Dispatcher.Config.
type DispatcherConfig struct {
	MaxWorkers int // presumably the number of workers Run generates — TODO confirm
	WorkerRate int // NOTE(review): meaning is not documented here — confirm against the dispatcher code
}

type GoChannelConfig

// GoChannelConfig is an empty configuration struct. The README example passes
// a GoChannelConfig value as the qConfig argument of NewMultiBMW.
type GoChannelConfig struct {
}

type Handler

// Handler handles jobs. Implementations are attached to routes through the
// Handler field of SingleRoute or HandlerRoute.
type Handler interface {
	// Handle receives one RetryJob by value and returns an error.
	// NOTE(review): returning RetryError presumably requests a retry — confirm.
	Handle(job RetryJob) error
}

Handler handles a RetryJob.

type HandlerRoute

// HandlerRoute describes a single route: the API host and path, the queue
// topic, and the Handler for its jobs. NewSingleBMW takes a *HandlerRoute.
//
// NOTE(review): the package documentation describes this type as likely
// deprecated.
type HandlerRoute struct {
	Host    string // for api
	Path    string // for api
	Topic   string // for queue (pusher, waiter)
	Handler Handler
}

HandlerRoute is likely deprecated

type MultiBMW

type MultiBMW struct {
	// contains filtered or unexported fields
}

func NewMultiBMW

func NewMultiBMW(qConfig interface{}, routes *MultiHandlerRoutes) *MultiBMW

func NewSingleBMW

func NewSingleBMW(qConfig interface{}, route *HandlerRoute) *MultiBMW

func (*MultiBMW) GetAPI

func (b *MultiBMW) GetAPI() *DefaultAPI

GetAPI returns the API.

func (*MultiBMW) Start

func (b *MultiBMW) Start()

type MultiHandlerRoutes

// MultiHandlerRoutes groups several SingleRoute entries under one Host. It is
// the routes argument of NewMultiBMW.
type MultiHandlerRoutes struct {
	Host   string        // the README example assigns its listenPort value here
	Routes []SingleRoute // one entry per path/topic/handler combination
}

type Parser

// Parser parses an HTTP request into a job value.
type Parser interface {
	// Parse takes the request and returns the parsed result as an untyped value.
	Parse(*http.Request) interface{}
}

Parser parses a request into a PayloadJob.

type Publisher

// Publisher publishes the worker's ip:port and status to etcd.
type Publisher interface {
	// Register returns an error.
	// NOTE(review): presumably adds the worker to the registry — confirm.
	Register() error
	// Unregister returns an error.
	// NOTE(review): presumably removes the worker from the registry — confirm.
	Unregister() error
}

Publisher publishes worker ip:port and status to etcd

type Pusher

// Pusher pushes raw data from an API request to the queue.
type Pusher interface {
	// Push takes the raw bytes and returns an error.
	Push([]byte) error
}

Pusher pushes raw data from an API request to the queue.

type PusherWaiter

// PusherWaiter is both a Pusher and a Waiter: it embeds the two interfaces
// and adds no methods of its own.
type PusherWaiter interface {
	Pusher
	Waiter
}

PusherWaiter is both pusher and waiter

type RetryJob

// RetryJob represents a job pushed into the queue. Its fields carry JSON tags
// naming them "id", "retry" and "payload".
type RetryJob struct {
	ID      string `json:"id"`      // job identifier; the README batch handler prints it
	Retry   uint8  `json:"retry"`   // presumably the retry count checked against MaxRetry — TODO confirm
	Payload []byte `json:"payload"` // NOTE(review): presumably the raw request body — confirm
}

RetryJob represents a job pushed into queue

type RouteConfig

type RouteConfig struct {
	// contains filtered or unexported fields
}

type Serializer

// Serializer serializes a job so that it can be pushed into the queue.
type Serializer interface {
	// Serialize takes an untyped value and returns its byte form.
	Serialize(interface{}) []byte
	// Unserialize takes bytes and returns an untyped value.
	// NOTE(review): presumably the inverse of Serialize — confirm.
	Unserialize([]byte) interface{}
}

Serializer serializes the job so that it can be pushed into the queue.

type ServiceInfo

// ServiceInfo represents the information that a Publisher stores at the
// registry. Each field carries a JSON tag giving its serialized name.
type ServiceInfo struct {
	ServiceName string `json:"serviceName"`
	Host        string `json:"host"`
	Privilege   int    `json:"privilege"`
	Key         string `json:"key"`
}

ServiceInfo represents the information that the Publisher will store at the registry.

type SingleRoute

// SingleRoute is the element type of MultiHandlerRoutes.Routes: an API path,
// a queue topic, and the Handler for jobs on that route.
type SingleRoute struct {
	Path    string // for api
	Topic   string // for queue (pusher, waiter)
	Handler Handler
}

type Waiter

// Waiter waits for a job from the queue.
type Waiter interface {
	// Wait returns the job as raw bytes.
	// NOTE(review): presumably blocks until a job is available — confirm.
	Wait() []byte
}

Waiter waits for job from queue

type Worker

type Worker struct {
	ID         string
	WorkerPool chan chan RetryJob
	JobChannel chan RetryJob
	// contains filtered or unexported fields
}

Worker represents the worker that executes the job

func NewWorker

func NewWorker(ID string, workerPool chan chan RetryJob, handler Handler, concurrency uint64, async bool) Worker

func (*Worker) Start

func (w *Worker) Start()

Start method starts the run loop for the worker, listening for a quit channel in case we need to stop it

func (*Worker) Stop

func (w *Worker) Stop()

Stop signals the worker to stop listening for work requests.

Directories

Path Synopsis
Package lib is a generated protocol buffer package.
Package lib is a generated protocol buffer package.
worker
geo

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL