Published: Sep 5, 2016 License: Apache-2.0 Imports: 5 Imported by: 0

README

Ozero


Ozero is a goroutine pool for Go, focused on simplicity.

Goroutine pool

When you create a pool, you can set its size with NewPoolN(), or use the default size (the CPU count) with NewPool(). The interface is designed to be user-friendly: all you have to do is call .SetWorkerFunc() to set the function that handles jobs in the pool. From that moment on you have a ready-to-use pool: .SendJob(data) sends any job to the pool, where it will be processed by the first available goroutine.

Sending jobs

You have two functions to send jobs to the pool:

  • SendJob(data). Sends the job to the pool and returns immediately, no matter how busy the pool is.
  • SendJobSync(data). Sends the job to the pool and waits until a goroutine picks it up and starts working on it.

Usually there is no big difference between the two methods; however, there is one little gotcha you need to know about.

In the following example, the pool behaves unpredictably: some of the jobs sent with .SendJob may not have reached a worker yet when the main goroutine gets to .Close(). As a result, some jobs may go unprocessed because they arrive at a pool that is already closed. Sending jobs to a closed pool does not cause a panic; they are simply ignored.

func main() {
	nThreads := 10

	taskPool := ozero.NewPoolN(nThreads).SetWorkerFunc(func(data interface{}) error {
		x := data.(int)
		log.Printf("Data: %d\n", x)
		time.Sleep(time.Second)
		return nil
	})

	before := time.Now()
	for i := 0; i < 20; i++ {
		taskPool.SendJob(i) // Here you should use .SendJobSync()
	}
	taskPool.Close()

	log.Printf("Elapsed %.2f seconds", time.Since(before).Seconds())
}

In the previous example, using SendJobSync with a pool size of 10 and 20 jobs of 1 second each, the expected total time is 2 seconds.

If you are not going to close the pool, SendJob is the recommended way to send jobs, since it lets you keep sending even when the pool is busy.

Errors

If your WorkerFunc crashes, a new goroutine is spawned to replace it, so you don't have to worry about the pool crashing; everything is built thread-safe for you. To catch these crashes, or the errors your WorkerFunc returns, just call .SetErrorFunc(), and you'll receive the job data along with the error that occurred.

Retrying jobs

Often you want to retry a job if it fails. To do this, you have the following functions available:

  • SetTries(n). Sets the maximum number of times a job is retried if it crashes. Set to zero to retry indefinitely.
  • SetRetryDelay(duration). Sets the time to wait between retries.
  • SetShouldRetryFunc(data, error, retry count). You can prevent a job from being retried for a specified error by implementing this function and returning false. This is useful when a job might fail permanently, as with an HTTP 404 error, or temporarily, as with an HTTP 500 error.

One important note: your error func is only called after all retries have been executed, while your ShouldRetryFunc is called after every error or crash.

A common pattern is to let the WorkerFunc panic on error and let the pool retry the job.

Finally, you can create as many pools as you want!

Complete usage example

package main

import (
	"log"
	"time"

	"github.com/ANPez/Ozero"
)

// downloadOrPanic and types.HTTPError are assumed to be defined elsewhere.

func main() {
	nThreads := 10

	taskPool := ozero.NewPoolN(nThreads).SetWorkerFunc(func(data interface{}) error {
		url := data.(string)
		log.Printf("Downloading URL: %s.", url)
		downloadOrPanic(url)
		log.Printf("Job finished OK")
		return nil
	}).SetErrorFunc(func(data interface{}, err error) {
		log.Printf("Error while processing job in queue: %v", err)
	}).SetShouldRetryFunc(func(data interface{}, err error, retry int) bool {
		switch err := err.(type) {
		case *types.HTTPError:
			// Retry server-side errors (5xx); give up on client errors (4xx).
			return (err.StatusCode < 400) || (err.StatusCode >= 500)
		}
		return true
	}).SetTries(3).SetRetryDelay(time.Second)
	defer taskPool.Close()

	taskPool.SendJobSync("https://example.com/")
}

License

Copyright 2016 Antonio Nicolás Pina

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ErrorFunc

type ErrorFunc func(data interface{}, err error)

ErrorFunc represents an error handling function for panics happening in workers. It receives the data that failed in the operation and the error that occurred. It is only called after all retries have failed.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool represents a thread (goroutine) pool. All of its methods are thread-safe.

func NewPool

func NewPool() *Pool

NewPool creates a new pool with predefined size. By default, it uses the CPU count.

func NewPoolN

func NewPoolN(size int) *Pool

NewPoolN creates a new pool with fixed size.

func (*Pool) Close

func (pool *Pool) Close()

Close closes the pool immediately, waiting for the running tasks to end.

func (*Pool) CloseAsync

func (pool *Pool) CloseAsync()

CloseAsync asynchronously closes the pool and waits for the running tasks to end. It returns immediately.

func (*Pool) GetSize

func (pool *Pool) GetSize() int

GetSize returns the number of threads in the pool.

func (*Pool) SendJob

func (pool *Pool) SendJob(data interface{})

SendJob sends a new job to the pool to be processed by the worker. It returns immediately no matter how busy the pool is.

func (*Pool) SendJobSync

func (pool *Pool) SendJobSync(data interface{})

SendJobSync sends a new job to the pool to be processed by the worker. It waits until a worker gets the job and then returns.

func (*Pool) SetErrorFunc

func (pool *Pool) SetErrorFunc(f ErrorFunc) *Pool

SetErrorFunc sets the function to be executed when a panic occurs in a worker. If nil, nothing gets executed on panic.

func (*Pool) SetRetryDelay

func (pool *Pool) SetRetryDelay(d time.Duration) *Pool

SetRetryDelay sets the delay before a failing function gets retried. The default is to retry immediately.

func (*Pool) SetShouldRetryFunc

func (pool *Pool) SetShouldRetryFunc(f ShouldRetryFunc) *Pool

SetShouldRetryFunc sets a function to be executed when a panic occurs, to determine whether the job should be retried.

func (*Pool) SetTries

func (pool *Pool) SetTries(count int) *Pool

SetTries sets the number of times a failing job gets re-executed before giving up and calling the error function. The default is 1. Set to zero to retry indefinitely.

func (*Pool) SetWorkerFunc

func (pool *Pool) SetWorkerFunc(f WorkerFunc) *Pool

SetWorkerFunc sets the function to be processed when sending jobs to the worker.

type ShouldRetryFunc

type ShouldRetryFunc func(data interface{}, err error, retry int) bool

ShouldRetryFunc represents an error handling function for panics happening in workers. It receives the data that failed in the operation, the error that occurred, and the current retry count (e.g. 0 on the first attempt, 1 on the second). It should return whether the operation has to be retried. It is only called if there are retries remaining.

type WorkerFunc

type WorkerFunc func(interface{}) error

WorkerFunc defines a function that receives a job and processes it.
