transcoder

package module
v0.0.0-...-1774433 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2021 License: MIT Imports: 14 Imported by: 0

README

transcoder

Golang video transcode workers to run FFMPEG commands using os/exec. Created with multiple server instances in mind.

Optionally uses rmq (Redis message queue) as a distributed worker job queue: https://github.com/adjust/rmq

Creating a new job

Jobs must have a valid Preset to run. JobParams (map[string]string) can be attached before submission to replace {{placeholder}} strings in a Preset.

	preset := &transcoder.Preset{
		Path: "ffmpeg",
		Args: []string{"-y", "-progress", "-", "-nostats", "-i", "{{input}}", "{{output}}"},
	}

	params := map[string]string{
		"input":  "input.mp4",
		"output": "output.mp4",
	}

	job := transcoder.NewJob(preset, params)

Getting jobs from an rmq.Queue

A NewDirector in the queue package creates a worker pool and subscribes to a rmq.Queue

	redisClient := redis.NewUniversalClient(&redis.UniversalOptions{
		Addrs: []string{"localhost:6379"},
	})

	jobUpdatesChan := make(chan *transcoder.JobStatus, 100)
	director, err := queue.NewDirector(QueueName, WorkerNum, redisClient, jobUpdatesChan)
	if err != nil {
		log.Fatalf("Could not create director %v", err)
	}

Creating a standalone worker pool

If you don't want to use an rmq.Queue, you can instead create your own worker pool.

        // Queue to submit new jobs
	jobQueue := make(chan *transcoder.Job, 100)

	// Chan for status updates from workers (alternatively, can pass nil)
	jobUpdatesChan := make(chan *transcoder.JobStatus, 100)
	for i := 0; i < WorkerNum; i++ {
		transcoder.NewWorker(jobQueue, jobUpdatesChan)
	}

	// Read updates
	go func() {
		for update := range jobUpdatesChan {
			log.Printf("%v Status: %s %s", update.Job.ID, update.Status, update.Message)
		}
	}()

    ...

	// Send new jobs to the pool
	newJob := &transcoder.Job{Preset: &transcoder.Preset{}}
	jobQueue <- newJob

	// Block until job finishes
	newJob.Wait()

Examples

Example servers, workers, and cli in /cmd

Standalone http server

cmd/server Listen for incoming JobSubmission requests and submit them to the worker pool. Handles running, killing, and getting job status.

CLI

cmd/cli Serially run many jobs directly using a single Preset. flag args are used for JobParams input and output.

CLI with pool

cmd/cli_pool Batch many ffmpeg jobs using a single Preset. flag args are used for JobParams input and output.

Server with rmq.Queue and DB saves

cmd/rmq_server Listen for incoming JobSubmission requests and add them to rmq.Queue to via queue.Director. Saves any JobStatus messages to the db

Worker with rmq.Queue and DB saves

cmd/rmq_worker_db Listen to rmq.Queue to via queue.Director and run consumed jobs. Saves any JobStatus messages to the db

Worker with rmq.Queue

cmd/rmq_worker Listen to rmq.Queue to via queue.Director and run consumed jobs.

Documentation

Index

Constants

View Source
const (
	JobStatusSubmitted  = "submitted"
	JobStatusInProgress = "inProgress"
	JobStatusDone       = "done"
	JobStatusFailed     = "failed"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Job

type Job struct {
	ID            uuid.UUID `json:"id" gorm:"type:uuid;index"`
	CreatedAt     time.Time `json:"createdAt"`
	Status        string    `json:"status" gorm:"index"`
	PresetID      uuid.UUID `json:"presetId"`
	Preset        *Preset   `json:"preset,omitempty" gorm:"-"`
	Params        JobParams `json:"params" gorm:"type:jsonb"`
	CommandOutput string    `json:"commandOutput" gorm:"type:text"`
	// contains filtered or unexported fields
}

func NewJob

func NewJob(preset *Preset, params JobParams) *Job

NewJob - create new job with filled defaults

func (*Job) Done

func (job *Job) Done() <-chan struct{}

Done - channel that blocks until process has finished

func (*Job) Err

func (job *Job) Err() error

Err - exec related errors. nil until process exits

func (*Job) ErrOutput

func (job *Job) ErrOutput() []string

ErrOutput - return any messages collected from stderr

func (*Job) Info

func (job *Job) Info() string

InfoString - return json string of all collected info from exec.Cmd process

func (*Job) Kill

func (job *Job) Kill() error

Kill a running process

func (*Job) Output

func (job *Job) Output() []string

Output - return any messages collected from stdin

func (*Job) Reset

func (job *Job) Reset()

Reset - reset job to pre-run state

func (*Job) Run

func (job *Job) Run() error

Run - execute job cmd and collect output will block until job has exited

func (*Job) Wait

func (job *Job) Wait()

Wait - block until job has finished

type JobParams

type JobParams map[string]string

JobParams - Custom map[string]string for postgres jsob compatibility

func (*JobParams) Scan

func (jp *JobParams) Scan(value interface{}) error

Scan - allow retrieving of jsonb -> JobParams

func (JobParams) Value

func (jp JobParams) Value() (driver.Value, error)

Value - allow saving JobParams as jsonb

type JobStatus

type JobStatus struct {
	Status  string `json:"status"`
	Message string `json:"message,omitempty"`
	Job     *Job   `json:"job"`
}

type Preset

type Preset struct {
	ID            uuid.UUID  `json:"id"`
	Description   string     `json:"description"`
	Path          string     `json:"path"` // Executable path
	PresetGroupID *uuid.UUID `json:"presetGroupId,omitempty"`

	// Arguments to pass to executable
	// Any arguments that should be replaced by job Params should be delimited by "{{" and "}}"
	// Example: {{input}} will get replaced if "input" is present in job.Params map
	Args []string `json:"args"`
}

Preset - A single command + args to exec

type PresetGroup

type PresetGroup struct {
	ID      uuid.UUID `json:"id"`
	Presets []*Preset
}

type Worker

type Worker struct {
	Name string
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(jobQueue chan *Job, jobUpdatesChan chan *JobStatus) *Worker

NewWorker create new worker and start consuming jobQueue

Directories

Path Synopsis
cmd
cli

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL