Documentation ¶
Index ¶
- Constants
- func PartitionShardingName(partitionId, shardingId int) string
- func ShardingId(partitionId, partitionShards, sharding int) int
- type Invoker
- type Partition
- type PartitionChannel
- type PartitionRedis
- type PartitionRedisConfig
- func NewPartitionRedisConfig(clientOptions *redis.Options, partitionShards, partitionId, sharding int) *PartitionRedisConfig
- func NewPartitionRedisConfigs(clientOptions []*redis.Options, partitionId int) []*PartitionRedisConfig
- func NewSchedulerPartitionRedisConfigs(clientOptions []*redis.Options, partitionSize int) []*PartitionRedisConfig
- type PartitionRules
- type PartitionScheduler
- type PartitionSchedulerRules
- type Scheduler
- type SinglePartitionWorker
- type SinglePartitionWorkerConfig
- type Task
- type TaskInvoker
- type Worker
- type WorkerTask
Constants ¶
const ( StartPartition int = 1 StartSharding int = 1 )
Variables ¶
This section is empty.
Functions ¶
func PartitionShardingName ¶
func ShardingId ¶
Returns the shardingId Returns value range: [StartSharding, Size * PartitionShards] Note: The sharding parameter is greater than or equal to the constant StartSharding
Types ¶
type Invoker ¶
type Invoker interface {
Call(task WorkerTask) (err error)
}
type Partition ¶
type Partition struct { Id int // Size: preallocate partition sizes Size int // PartitionShards: number of shards per partition PartitionShards int // contains filtered or unexported fields }
func NewPartitions ¶
func NewPartitions(size int, clients []*PartitionRedis) *Partition
Initialize all partitions. Configure for the scheduler,redis client required. clients: redis client for each sharding
func NewSinglePartition ¶
Initialize single partition. Worker configuration only,no redis client required. Id: partition id Size: preallocate partition sizes PartitionShards: number of shards per partition
type PartitionChannel ¶
type PartitionChannel struct { Channel chan WorkerTask // contains filtered or unexported fields }
func NewPartitionChannel ¶
func NewPartitionChannel(pr *PartitionRedis, sleep time.Duration, logger *log.Logger) *PartitionChannel
func (*PartitionChannel) Run ¶
func (c *PartitionChannel) Run()
type PartitionRedis ¶
type PartitionRedis struct {
// contains filtered or unexported fields
}
-----------------------------------------
func NewPartitionRedis ¶
func NewPartitionRedis(config *PartitionRedisConfig) *PartitionRedis
func NewPartitionRedisSlice ¶
func NewPartitionRedisSlice(configs []*PartitionRedisConfig) []*PartitionRedis
func (*PartitionRedis) Node ¶
func (p *PartitionRedis) Node() string
type PartitionRedisConfig ¶
type PartitionRedisConfig struct { ClientOptions *redis.Options // PartitionId is greater than or equal to the constant StartPartition PartitionId int // ShardingId is greater than or equal to the constant StartSharding ShardingId int }
-----------------------------------------
func NewPartitionRedisConfig ¶
func NewPartitionRedisConfig(clientOptions *redis.Options, partitionShards, partitionId, sharding int) *PartitionRedisConfig
ClientOptions: Redis configuration of sharding PartitionShards: number of shards per partition PartitionId range: [StartPartition, ...] Sharding range: [StartSharding, ...]
func NewPartitionRedisConfigs ¶
func NewPartitionRedisConfigs(clientOptions []*redis.Options, partitionId int) []*PartitionRedisConfig
func NewSchedulerPartitionRedisConfigs ¶
func NewSchedulerPartitionRedisConfigs(clientOptions []*redis.Options, partitionSize int) []*PartitionRedisConfig
func (*PartitionRedisConfig) String ¶
func (p *PartitionRedisConfig) String() string
type PartitionRules ¶
type PartitionScheduler ¶
type PartitionScheduler struct {
// contains filtered or unexported fields
}
func NewPartitionScheduler ¶
func NewPartitionScheduler(partitions *Partition, partitionRules PartitionRules, logger *log.Logger) *PartitionScheduler
func (*PartitionScheduler) GetTaskCountdown ¶
func (r *PartitionScheduler) GetTaskCountdown(task *Task) (duration time.Duration, err error)
func (*PartitionScheduler) RemoveTask ¶
func (r *PartitionScheduler) RemoveTask(task *Task) (err error)
func (*PartitionScheduler) ScheduleTask ¶
func (r *PartitionScheduler) ScheduleTask(task *Task, duration time.Duration) (err error)
type PartitionSchedulerRules ¶
func (*PartitionSchedulerRules) Partitioning ¶
func (p *PartitionSchedulerRules) Partitioning(task *Task) (int, int)
type SinglePartitionWorker ¶
type SinglePartitionWorker struct { Partition *Partition PartitionChannels []*PartitionChannel TaskInvoker Invoker Logger *log.Logger }
func NewSinglePartitionWorker ¶
func NewSinglePartitionWorker(config *SinglePartitionWorkerConfig) (w *SinglePartitionWorker, err error)
func (*SinglePartitionWorker) Run ¶
func (w *SinglePartitionWorker) Run()
func (*SinglePartitionWorker) RunFunc ¶
func (s *SinglePartitionWorker) RunFunc(f func())
type TaskInvoker ¶
func (*TaskInvoker) Call ¶
func (i *TaskInvoker) Call(workerTask WorkerTask) (err error)
type WorkerTask ¶
type WorkerTask struct {
TaskId string
}
func NewWorkerTask ¶
func NewWorkerTask(taskId string) *WorkerTask
func (*WorkerTask) Deserialization ¶
func (t *WorkerTask) Deserialization() (task *Task, err error)