jq

package module
v0.0.0-...-4832d39 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2015 License: MIT Imports: 9 Imported by: 0

README

jq

jq is a simple job queue library in Go.

You can write your own backend queue and queue manager implementations to turn jq into a real distributed job queue :)

jq.NewJq(queueName, queueManager, workerFunc)

jq.Submit(bytes, onReturnValueFunc, onErrorFunc, isSync)

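Usage (note: the test below passes MockWorkerFunc, which is not shown in this snippet; substitute any worker with the WorkerFunc signature, such as workerFunc):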


func workerFunc(input []byte, ret chan<- []byte, done chan<- struct{}, err chan<- error) {
	ret <- []byte("world")
	ret <- []byte("world")
	ret <- []byte("world")
	done <- struct{}{}
}

func TestEnqueue(t *testing.T) {
	jq := NewJq("test_queue", MemQueueManagerFactory(MemQFactory), MockWorkerFunc)
	go jq.DispatchForever()
	jq.Submit([]byte("hello"), func(ret []byte) {
		if !bytes.Equal(ret, []byte("world")) {
			t.Error("error")
		}
	}, nil, true)
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultOpt = JqOptions{
	CocurrentWorkerNum: 10000,
	QueueCheckInterval: 100 * time.Millisecond,
}
View Source
var ErrEmpty = errors.New("queue is empty")
View Source
var ErrNotExists = errors.New("queue not exists")
View Source
var ErrTimeout = errors.New("timeout")

Functions

This section is empty.

Types

type Job

type Job struct {
	Id      string        `json:"id"`
	Data    []byte        `json:"data"`
	Timeout time.Duration `json:"timeout"`
	// contains filtered or unexported fields
}

type Jq

type Jq struct {
	// contains filtered or unexported fields
}

func NewJq

func NewJq(name string, queueMgr QueueManager, workerFunc WorkerFunc) *Jq

func NewJqWithOpt

func NewJqWithOpt(name string, queueMgr QueueManager, workerFunc WorkerFunc, opt JqOptions) *Jq

func NewMemJq

func NewMemJq(name string, workerFunc WorkerFunc) *Jq

func (*Jq) DispatchForever

func (jq *Jq) DispatchForever()

func (*Jq) Submit

func (jq *Jq) Submit(data []byte, onRet func([]byte), onErr func(error), sync bool)

func (*Jq) SubmitWithTimeout

func (jq *Jq) SubmitWithTimeout(data []byte, timeout time.Duration, onRet func([]byte), onErr func(error), sync bool)

type JqOptions

type JqOptions struct {
	QueueCheckInterval time.Duration
	CocurrentWorkerNum int
}

type MemQ

type MemQ struct {
	// contains filtered or unexported fields
}

func (*MemQ) Len

func (q *MemQ) Len() int

func (*MemQ) Name

func (q *MemQ) Name() string

func (*MemQ) Pop

func (q *MemQ) Pop() ([]byte, error)

func (*MemQ) Push

func (q *MemQ) Push(v []byte) error

type MemQueueManager

type MemQueueManager struct {
	// contains filtered or unexported fields
}

func MemQueueManagerFactory

func MemQueueManagerFactory(qfactory QueueFactory) *MemQueueManager

func (*MemQueueManager) Del

func (self *MemQueueManager) Del(name string) error

func (*MemQueueManager) Exists

func (self *MemQueueManager) Exists(name string) (bool, error)

func (*MemQueueManager) Get

func (self *MemQueueManager) Get(name string) (Queue, error)

func (*MemQueueManager) GetOrCreate

func (self *MemQueueManager) GetOrCreate(name string) (Queue, error)

type Msg

type Msg struct {
	Type MsgType `json:"type"`
	Data []byte  `json:"data"`
}

type MsgType

type MsgType int
const (
	MSG_RET MsgType = iota
	MSG_DONE
	MSG_ERR
)

type Queue

type Queue interface {
	Push([]byte) error
	Pop() ([]byte, error)
	Len() int
	Name() string
}

func MemQFactory

func MemQFactory(name string) Queue

func RedisQueueFactory

func RedisQueueFactory(name string) Queue

type QueueFactory

type QueueFactory func(name string) Queue

type QueueManager

type QueueManager interface {
	Exists(name string) (bool, error)
	Get(name string) (Queue, error)
	GetOrCreate(name string) (Queue, error)
	Del(name string) error
}

func RedisQueueManagerFactory

func RedisQueueManagerFactory(qfactory QueueFactory) QueueManager

type QueueManagerFactory

type QueueManagerFactory func(QueueFactory) QueueManager

type RedisQueue

type RedisQueue struct {
	// contains filtered or unexported fields
}

func NewRedisQueue

func NewRedisQueue(name string) *RedisQueue

func (*RedisQueue) Len

func (q *RedisQueue) Len() int

func (*RedisQueue) Name

func (q *RedisQueue) Name() string

func (*RedisQueue) Pop

func (q *RedisQueue) Pop() ([]byte, error)

func (*RedisQueue) Push

func (q *RedisQueue) Push(b []byte) error

type RedisQueueManager

type RedisQueueManager struct {
	// contains filtered or unexported fields
}

func (*RedisQueueManager) Del

func (m *RedisQueueManager) Del(name string) error

func (*RedisQueueManager) Exists

func (m *RedisQueueManager) Exists(name string) (bool, error)

func (*RedisQueueManager) Get

func (m *RedisQueueManager) Get(name string) (Queue, error)

func (*RedisQueueManager) GetOrCreate

func (m *RedisQueueManager) GetOrCreate(name string) (Queue, error)

type WorkerFunc

type WorkerFunc func(input []byte, ret chan<- []byte, done chan<- struct{}, err chan<- error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL