dtf

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 31, 2024 License: Apache-2.0 Imports: 8 Imported by: 0

README

pterergate-dtf

Pterergate-dtf (Pterergate Distributed Task Framework, PDTF) is a high-performance distributed task framework that supports scheduling thousands of running tasks in parallel, deployed in a cluster consisting of tens of thousands of nodes.

Go GoDoc Go Report Card GitHub

GitHub last commit (branch) GitHub commit activity (branch)

Install

go get github.com/danenmao/pterergate-dtf

Requirement

  1. MySQL

    PDTF uses a MySQL table tbl_task to store the information of created tasks. Users should provide a MySQL server, and create this table in a database.

    See the Usage section to learn more.

  2. Redis

    PDTF uses Redis frequently to store various kinds of intermediate data. Users should provide a Redis server.

    See the Usage section to learn more.

Get Started

Read the Get Started wiki to learn how to use PDTF.

Usage

  1. Implement ITaskGenerator, ITaskExecutor, ITaskSchedulerCallback and ITaskCollectorCallback. Users can perform their business logic in these interfaces.

    // implement taskmodel.ITaskGenerator
    type SampleGenerator struct{}
    
    // implement taskmodel.ITaskExecutor
    type SampleExecutor struct{}
    
    // implement taskmodel.ITaskSchedulerCallback
    type SampleSchedulerCallback struct{}
    
    // implement taskmodel.ITaskCollectorCallback
    type SampleCollectorCallback struct{}
    
  2. Implement a task plugin.

    // implement taskplugin.ITaskPlugin
    type SamplePlugin struct{
        PluginBody taskmodel.PluginBody
        PluginConf taskmodel.PluginConf
    }
    
    func (p * SamplePlugin) GetPluginConf(conf *taskmodel.PluginConf) error{
        *conf = p.PluginConf
        return nil
    }
    
    func  (p * SamplePlugin) GetPluginBody(body *taskmodel.PluginBody) error{
        *body = p.PluginBody
        return nil
    }
    
    var plugin = SamplePlugin{
        PluginBody: taskmodel.PluginBody{
            Generator: &SampleGenerator{},
            Executor: &SampleExecutor{},
            SchedulerCallback: &SampleSchedulerCallback{},
            CollectorCallback: &SampleCollectorCallback{},
        },
        PluginConf: taskmodel.PluginConf{
            IterationMode: taskmodel.IterationMode_No,
            TaskTypeTimeout: time.Hour,
        },
    }
    
  3. Register a task type.

    const SampleTaskType = 1
    register := taskplugin.TaskPluginRegistration{
        TaskType: SampleTaskType,
        Name: "SampleTaskType",
        Description: "a sample task type",
        PluginFactoryFn: func(p *taskplugin.ITaskPlugin) error{
            *p = &plugin
            return nil
        },
    }
    
    err := dtf.RegisterTaskType(&register)
    
  4. Start the required dtf services.

    // start the task manager service
    err := dtf.StartService(
        dtfdef.ServiceRole_Manager, 
        dtf.WithMySQL(&extconfig.MySQLAddress{
            Name:"mysql", Type:"mysql", Protocol:"tcp", Address:"192.168.1.101:3306", Username:"servera", Password:"*", DB:"db_task",
        }),
        dtf.WithRedis(&extconfig.RedisAddress{
            Name:"redis", Type:"tcp", Address:"192.168.1.100:6380", Password:"*", DB:"0",
        }),
        dtf.WithMongoDB(&extconfig.MongoAddress{
            Address:"", Username:"", Password:"", Database:"", ReplicaSet:"",
        }),
    )
    
    // start the task generator service
    err := dtf.StartService(
        dtfdef.ServiceRole_Generator, 
        dtf.WithMySQL(&extconfig.MySQLAddress{...}),
        dtf.WithRedis(&extconfig.RedisAddress{...}),
    )
    
    // start the task scheduler service
    err := dtf.StartService(
        dtfdef.ServiceRole_Scheduler, 
        dtf.WithMySQL(&extconfig.MySQLAddress{...}),
        dtf.WithRedis(&extconfig.RedisAddress{...}),
        dtf.WithExecutor(serversupport.ExecutorInvoker{...}.GetInvoker()),
    )
    
    // define the executor server
    executorSvr := serversupport.ExecutorServer{...}
    
    // start the executor service
    err := dtf.StartService(
        dtfdef.ServiceRole_Executor, 
        dtf.WithMySQL(&extconfig.MySQLAddress{...}),
        dtf.WithRedis(&extconfig.RedisAddress{...}),
        dtf.WithRegisterExecutorHandler(executorSvr.GetRegister()),
        dtf.WithCollector(serversupport.CollectorInvoker{...}.GetInvoker()),
    )
    
    // start the executor server
    executorSvr.StartServer()
    
    // define the collector server
    collectorSvr := serversupport.CollectorServer{...}
    
    // start the collector service
    err := dtf.StartService(
        dtfdef.ServiceRole_Collector, 
        dtf.WithMySQL(&extconfig.MySQLAddress{...}),
        dtf.WithRedis(&extconfig.RedisAddress{...}),
        dtf.WithRegisterCollectorHandler(collectorSvr.GetRegister()),
    )
    
    // start the collector server
    collectorSvr.StartServer()
    
  5. Create a task to perform your business operations.

    taskParam := taskmodel.TaskParam{
        ...
    }
    
    taskId, err := dtf.CreateTask(
        SampleTaskType,
        &taskParam,
    )
    
  6. Wait for the running services to exit.

    dtf.Join()
    

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CancelTask

func CancelTask(taskId taskmodel.TaskIdType) error

cancel a running task

func CreateTask

func CreateTask(taskType uint32, param *taskmodel.TaskParam) (taskmodel.TaskIdType, error)

create a task

func GetTaskStatus

func GetTaskStatus(taskId taskmodel.TaskIdType, status *taskmodel.TaskStatusData) error

retrieve the task status

func Join

func Join() error

wait for the service to stop

func NotifyStop

func NotifyStop() error

notify to stop the service

func PauseTask

func PauseTask(taskId taskmodel.TaskIdType) error

pause a running task

func RegisterTaskType

func RegisterTaskType(register *taskplugin.TaskPluginRegistration) error

register a task type plugin

func ResumeTask

func ResumeTask(taskId taskmodel.TaskIdType) error

resume a paused task

func StartService

func StartService(role dtfdef.ServiceRole, opts ...ServiceOption) error

start the specified service

func Version

func Version() string

Types

type ServiceOption

type ServiceOption func(config *dtfdef.ServiceConfig)

ServiceOption is used to set the service configuration.

func WithCollector

func WithCollector(collector taskmodel.CollectorInvoker) ServiceOption

func WithExecutor

func WithExecutor(executor taskmodel.ExecutorInvoker) ServiceOption

func WithMongoDB

func WithMongoDB(mongo *extconfig.MongoAddress) ServiceOption

func WithMySQL

func WithMySQL(mysql *extconfig.MySQLAddress) ServiceOption

func WithPrestop

func WithPrestop(d time.Duration) ServiceOption

func WithRedis

func WithRedis(redis *extconfig.RedisAddress) ServiceOption

func WithRegisterExecutorHandler

func WithRegisterExecutorHandler(register taskmodel.RegisterExecutorRequestHandler) ServiceOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL