multi

package
v0.0.0-...-0e65dd1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 17, 2015 License: MIT Imports: 29 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func PHashTaskIn

func PHashTaskIn(filepath string) interface{}

func PHashTaskOut

func PHashTaskOut(out interface{}) uint64

Types

type AcquireImage

type AcquireImage struct {
	DownloadTimeout time.Duration
	MaxHeight       int
	MaxWidth        int
	Uploader        Uploader
	Log             logging.Logger
}

func (*AcquireImage) Acquire

func (a *AcquireImage) Acquire(url, filename string) (*AcquiredImage, error)

func (*AcquireImage) Resize

func (a *AcquireImage) Resize(imgR io.Reader, maxWidth, maxHeight int) (io.Reader, int, int, error)

type AcquiredImage

type AcquiredImage struct {
	SourceSize        int
	SourceUrl         string
	SourceContentType string

	DestUrl    string
	DestSize   int64
	DestHeight int
	DestWidth  int
	PHash      uint64
}

type AsyncOutcome

type AsyncOutcome struct {
	Error    error
	Duration time.Duration
	Id       string
}

type Coordinator

type Coordinator struct {
	Success chan *Flow
	Fail    chan *Flow
	// contains filtered or unexported fields
}

Coordinator - runs actions concurrently.

func NewCoordinator

func NewCoordinator(name string, concurrency int, retries int, maxItems int) *Coordinator

NewCoordinator - returns a new Coordinator configured with the given name, concurrency, retries and maxItems.

func (*Coordinator) Act

func (d *Coordinator) Act(flows []*Flow)

func (*Coordinator) Finished

func (d *Coordinator) Finished() <-chan struct{}

func (*Coordinator) From

func (d *Coordinator) From(c *Coordinator)

func (*Coordinator) Run

func (d *Coordinator) Run()

type DataContext

type DataContext struct {
	// contains filtered or unexported fields
}

func NewDataContext

func NewDataContext() *DataContext

func (*DataContext) Get

func (c *DataContext) Get(key string) interface{}

func (*DataContext) Set

func (c *DataContext) Set(key string, value interface{})

type DataFlow

type DataFlow interface {
	Set(name string, value interface{})
	Get(name string) interface{}
}

type FileDownloadInput

type FileDownloadInput struct {
	Url      string
	Filename string
}

type FileDownloadOutput

type FileDownloadOutput struct {
	Path        string
	Size        int64
	ContentType string
}

type FileDownloadTask

type FileDownloadTask struct {
	Folder  string
	Timeout time.Duration
}

func (*FileDownloadTask) Name

func (d *FileDownloadTask) Name() string

func (*FileDownloadTask) Run

func (d *FileDownloadTask) Run(input interface{}) (interface{}, error)

type FileUploadOutput

type FileUploadOutput struct {
	Url      string
	FileSize int64
}

type FileUploadTask

type FileUploadTask struct {
	Uploader Uploader
}

func (*FileUploadTask) Name

func (u *FileUploadTask) Name() string

func (*FileUploadTask) Run

func (u *FileUploadTask) Run(input interface{}) (interface{}, error)

type Flow

type Flow struct {
	Steps map[string]*StepState
	Data  DataFlow
}

func CreateFlows

func CreateFlows(count int) []*Flow

func GatherFailures

func GatherFailures(cs ...*Coordinator) []*Flow

func NewFlow

func NewFlow() *Flow

func (*Flow) NewStep

func (f *Flow) NewStep(name string, action StepAction, state interface{})

func (*Flow) StepsWithErrors

func (f *Flow) StepsWithErrors() []*StepState

type ImageProcessor

type ImageProcessor struct {

	// Uploader - required
	Uploader Uploader

	// LocalPath - defaults to os.TempDir()/downloads
	LocalPath string

	Log logging.Logger

	MaxHeight int
	MaxWidth  int
	// contains filtered or unexported fields
}

func (*ImageProcessor) Completed

func (p *ImageProcessor) Completed() <-chan *ImageProcessorOutput

func (*ImageProcessor) Injest

func (p *ImageProcessor) Injest(url, filename string, ctx *DataContext)

func (*ImageProcessor) Shutdown

func (p *ImageProcessor) Shutdown()

func (*ImageProcessor) Startup

func (p *ImageProcessor) Startup()

type ImageProcessorOutput

type ImageProcessorOutput struct {
	DownloadSize        int64
	DownloadContentType string
	DownloadUrl         string
	PHash               uint64

	FileSize    int64
	Height      int
	Width       int
	ContentType string

	DestinationUrl string
	Error          error
	Context        *DataContext
}

type ImageResizeOutput

type ImageResizeOutput struct {
	FileSize    int64
	Height      int
	Width       int
	ContentType string
	FilePath    string
}

type NoOpUploader

type NoOpUploader struct {
}

func (*NoOpUploader) DestinationUrl

func (w *NoOpUploader) DestinationUrl(filename string) string

func (*NoOpUploader) Writer

func (w *NoOpUploader) Writer(filename string) (io.WriteCloser, error)

type NoOpWriteCloser

type NoOpWriteCloser struct {
}

func (*NoOpWriteCloser) Close

func (n *NoOpWriteCloser) Close() error

func (*NoOpWriteCloser) Write

func (n *NoOpWriteCloser) Write(p []byte) (int, error)

type PHashTask

type PHashTask struct {
	Log logging.Logger
}

func (*PHashTask) Name

func (p *PHashTask) Name() string

func (*PHashTask) Run

func (p *PHashTask) Run(input interface{}) (interface{}, error)

type RecentlyExisted

type RecentlyExisted struct {
	// contains filtered or unexported fields
}

func NewRecentlyExisted

func NewRecentlyExisted(size int) *RecentlyExisted

func (*RecentlyExisted) CheckAndAdd

func (r *RecentlyExisted) CheckAndAdd(t string) bool

type ResizeImageTask

type ResizeImageTask struct {
	MaxWidth  int
	MaxHeight int
}

func (*ResizeImageTask) Name

func (r *ResizeImageTask) Name() string

func (*ResizeImageTask) Run

func (r *ResizeImageTask) Run(input interface{}) (interface{}, error)

type S3Writer

type S3Writer struct {
	BucketName string
	Root       string
}

func (*S3Writer) DestinationUrl

func (w *S3Writer) DestinationUrl(filename string) string

func (*S3Writer) Writer

func (w *S3Writer) Writer(filename string) (io.WriteCloser, error)

type StepAction

type StepAction interface {
	Action(tuple DataFlow, done func())
	Error() error
}

type StepState

type StepState struct {
	Name     string
	Attempts int
	Action   StepAction
	State    interface{}
}

type TaskAction

type TaskAction interface {
	Run(input interface{}) (interface{}, error)
	Name() string
}

type TaskRun

type TaskRun struct {
	Action        TaskAction
	DiscardOutput bool

	Track        stats.BasicMeter
	Log          logging.Logger
	Concurrency  int
	MaxQueuedIn  int
	MaxQueuedOut int
	// contains filtered or unexported fields
}

func (*TaskRun) Add

func (t *TaskRun) Add(todo interface{}, context *DataContext)

Add - will block when the number of queued items reaches MaxQueuedIn.

func (*TaskRun) Completed

func (t *TaskRun) Completed() <-chan *TaskRunOutput

func (*TaskRun) Name

func (t *TaskRun) Name() string

func (*TaskRun) Shutdown

func (t *TaskRun) Shutdown() chan struct{}

Shutdown - begins the shutdown operation. Reading from the returned channel blocks until the shutdown is complete.

func (*TaskRun) Startup

func (t *TaskRun) Startup()

type TaskRunInput

type TaskRunInput struct {
	Input   interface{}
	Context *DataContext
}

type TaskRunOutput

type TaskRunOutput struct {
	Context *DataContext
	// contains filtered or unexported fields
}

func (*TaskRunOutput) Error

func (o *TaskRunOutput) Error() error

func (*TaskRunOutput) Input

func (o *TaskRunOutput) Input() interface{}

func (*TaskRunOutput) Output

func (o *TaskRunOutput) Output() interface{}

func (*TaskRunOutput) Previous

func (o *TaskRunOutput) Previous(name string) *TaskRunResult

type TaskRunResult

type TaskRunResult struct {
	Error  error
	Input  interface{}
	Output interface{}
}

type Uploader

type Uploader interface {
	Writer(string) (io.WriteCloser, error)
	DestinationUrl(string) string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL