redischeduler

package module
v0.0.0-...-908dd7b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 1, 2021 License: MIT Imports: 12 Imported by: 0

README

Redischeduler Build Status Coverage Status

Redischeduler is a distributed job scheduler written in Go.

Installation

go get github.com/jeayu/redischeduler

Getting Started

Creating scheduler
  • To create partitions, you need to configure the total partition size and the sharding for each partition
// the total partition size.
partitionSize := 2
// the sharding for each partition.
clientOptions := []*redis.Options{
    // partition 1
    {
        Addr: "localhost:6379",
        DB:   1,
    },
    // partition 2
    {
        Addr: "localhost:6379",
        DB:   1,
    },
}
partitionRedisConfigs := NewSchedulerPartitionRedisConfigs(clientOptions, partitionSize)
clients := NewPartitionRedisSlice(partitionRedisConfigs)
// creating partitions
partitions := NewPartitions(partitionSize, clients)
  • To create scheduler
scheduler := NewPartitionScheduler(partitions, nil, nil)
  • Schedule task
task := NewTask("SayHi", "world")
duration := 3 * time.Second
scheduler.ScheduleTask(task, duration)
Creating worker
  • To create a task worker, you need to define a task invoker and and create the partition in which the worker
partitionId := 1
partitionSize := 2
clientOptions := []*redis.Options{
     // partition 1
    {
        Addr: "localhost:6379",
        DB:   1,
    },
}
worker, err := NewSinglePartitionWorker(&SinglePartitionWorkerConfig{
    PartitionId:          partitionId,
    PartitionSize:        partitionSize,
    PartitionRedisConfig: NewPartitionRedisConfigs(clientOptions, partitionId),
    TaskInvoker: &TaskInvoker{
        Functions: map[string]reflect.Value{
            "SayHi": reflect.ValueOf(sayHi),
            "Say":   reflect.ValueOf(say),
        },
    },
})
  • Running worker
worker.Run()

Documentation

Index

Constants

View Source
const (
	StartPartition int = 1
	StartSharding  int = 1
)

Variables

This section is empty.

Functions

func PartitionShardingName

func PartitionShardingName(partitionId, shardingId int) string

func ShardingId

func ShardingId(partitionId, partitionShards, sharding int) int

Returns the shardingId Returns value range: [StartSharding, Size * PartitionShards] Note: The sharding parameter is greater than or equal to the constant StartSharding

Types

type Invoker

type Invoker interface {
	Call(task WorkerTask) (err error)
}

type Partition

type Partition struct {
	Id int
	// Size: preallocate partition sizes
	Size int
	// PartitionShards: number of shards per partition
	PartitionShards int
	// contains filtered or unexported fields
}

func NewPartitions

func NewPartitions(size int, clients []*PartitionRedis) *Partition

Initialize all partitions. Configure for the scheduler,redis client required. clients: redis client for each sharding

func NewSinglePartition

func NewSinglePartition(id, size, partitionShards int) *Partition

Initialize single partition. Worker configuration only,no redis client required. Id: partition id Size: preallocate partition sizes PartitionShards: number of shards per partition

func (*Partition) Client

func (p *Partition) Client(partitionId, shardingId int) *redis.Client

type PartitionChannel

type PartitionChannel struct {
	Channel chan WorkerTask
	// contains filtered or unexported fields
}

func NewPartitionChannel

func NewPartitionChannel(pr *PartitionRedis, sleep time.Duration, logger *log.Logger) *PartitionChannel

func (*PartitionChannel) Run

func (c *PartitionChannel) Run()

type PartitionRedis

type PartitionRedis struct {
	// contains filtered or unexported fields
}

-----------------------------------------

func NewPartitionRedis

func NewPartitionRedis(config *PartitionRedisConfig) *PartitionRedis

func NewPartitionRedisSlice

func NewPartitionRedisSlice(configs []*PartitionRedisConfig) []*PartitionRedis

func (*PartitionRedis) Node

func (p *PartitionRedis) Node() string

type PartitionRedisConfig

type PartitionRedisConfig struct {
	ClientOptions *redis.Options
	// PartitionId is greater than or equal to the constant StartPartition
	PartitionId int
	// ShardingId is greater than or equal to the constant StartSharding
	ShardingId int
}

-----------------------------------------

func NewPartitionRedisConfig

func NewPartitionRedisConfig(clientOptions *redis.Options, partitionShards, partitionId, sharding int) *PartitionRedisConfig

ClientOptions: Redis configuration of sharding PartitionShards: number of shards per partition PartitionId range: [StartPartition, ...] Sharding range: [StartSharding, ...]

func NewPartitionRedisConfigs

func NewPartitionRedisConfigs(clientOptions []*redis.Options, partitionId int) []*PartitionRedisConfig

func NewSchedulerPartitionRedisConfigs

func NewSchedulerPartitionRedisConfigs(clientOptions []*redis.Options, partitionSize int) []*PartitionRedisConfig

func (*PartitionRedisConfig) String

func (p *PartitionRedisConfig) String() string

type PartitionRules

type PartitionRules interface {
	// Partitioning task
	// Returns the partitionId and shardingId of the task.
	// PartitionId value range: [StartPartition, Size]
	// ShardingId value range: [StartSharding, Size * PartitionShards]
	Partitioning(task *Task) (int, int)
}

type PartitionScheduler

type PartitionScheduler struct {
	// contains filtered or unexported fields
}

func NewPartitionScheduler

func NewPartitionScheduler(partitions *Partition, partitionRules PartitionRules, logger *log.Logger) *PartitionScheduler

func (*PartitionScheduler) GetTaskCountdown

func (r *PartitionScheduler) GetTaskCountdown(task *Task) (duration time.Duration, err error)

func (*PartitionScheduler) RemoveTask

func (r *PartitionScheduler) RemoveTask(task *Task) (err error)

func (*PartitionScheduler) ScheduleTask

func (r *PartitionScheduler) ScheduleTask(task *Task, duration time.Duration) (err error)

type PartitionSchedulerRules

type PartitionSchedulerRules struct {
	TotalShards     int
	PartitionShards int
}

func (*PartitionSchedulerRules) Partitioning

func (p *PartitionSchedulerRules) Partitioning(task *Task) (int, int)

type Scheduler

type Scheduler interface {
	ScheduleTask(task *Task, duration time.Duration) (err error)
	RemoveTask(task *Task) (err error)
	GetTaskCountdown(task *Task) (duration time.Duration, err error)
}

type SinglePartitionWorker

type SinglePartitionWorker struct {
	Partition         *Partition
	PartitionChannels []*PartitionChannel
	TaskInvoker       Invoker
	Logger            *log.Logger
}

func NewSinglePartitionWorker

func NewSinglePartitionWorker(config *SinglePartitionWorkerConfig) (w *SinglePartitionWorker, err error)

func (*SinglePartitionWorker) Run

func (w *SinglePartitionWorker) Run()

func (*SinglePartitionWorker) RunFunc

func (s *SinglePartitionWorker) RunFunc(f func())

type SinglePartitionWorkerConfig

type SinglePartitionWorkerConfig struct {
	PartitionId          int
	PartitionSize        int
	PartitionRedisConfig []*PartitionRedisConfig
	TaskInvoker          Invoker
	Logger               *log.Logger
}

type Task

type Task struct {
	Function string
	Args     []interface{}
}

func NewTask

func NewTask(function string, args ...interface{}) *Task

func (*Task) Serialization

func (t *Task) Serialization() string

type TaskInvoker

type TaskInvoker struct {
	Functions map[string]reflect.Value
}

func (*TaskInvoker) Call

func (i *TaskInvoker) Call(workerTask WorkerTask) (err error)

type Worker

type Worker interface {
	Run()
}

type WorkerTask

type WorkerTask struct {
	TaskId string
}

func NewWorkerTask

func NewWorkerTask(taskId string) *WorkerTask

func (*WorkerTask) Deserialization

func (t *WorkerTask) Deserialization() (task *Task, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL