Documentation ¶
Overview ¶
Package circle is a Go interface to the libcircle distributed-queue API. libcircle is available from https://github.com/hpc/libcircle.
Example ¶
This is an example of a complete program that uses the circle package. It uses the low-level API (i.e., CallbackCreate, CallbackProcess, and Begin instead of ChannelBegin) and shows how to set various package options.
// This file demonstrates how to use the circle package.

package main

import (
	"fmt"
	"github.com/lanl/circle"
)

var rank int // Our process's MPI rank

// createWork creates 10 units of "work" -- strings listing a rank and
// item number.
func createWork(q circle.Handle) {
	for i := 0; i < 10; i++ {
		work := fmt.Sprintf("RANK %d, ITEM %d", rank, i+1)
		if ok := q.Enqueue(work); !ok {
			panic("Enqueue")
		}
	}
}

// doWork processes one unit of "work" by dequeueing and outputting a string.
func doWork(q circle.Handle) {
	work, ok := q.Dequeue()
	if !ok {
		panic("Dequeue")
	}
	fmt.Printf("Rank %d is dequeueing %v\n", rank, work)
}

// This is an example of a complete program that uses the circle
// package. It uses the low-level API (i.e., CallbackCreate,
// CallbackProcess, and Begin instead of ChannelBegin) and shows how
// to set various package options.
func main() {
	// Initialize libcircle.
	rank = circle.Initialize()
	defer circle.Finalize()
	circle.EnableLogging(circle.LogErr) // Make libcircle a little quieter than normal.

	// Contrast the output when the following is uncommented (and
	// multiple MPI processes are used).
	//
	// circle.SetOptions(circle.CreateGlobal)

	// Create and execute some work.
	circle.CallbackCreate(createWork)
	circle.CallbackProcess(doWork)
	circle.Begin()
}
Index ¶
- Constants
- func Abort()
- func Begin()
- func CallbackCreate(cb Callback)
- func CallbackProcess(cb Callback)
- func ChannelBegin() (putWork chan<- string, getWork <-chan string)
- func Checkpoint()
- func EnableLogging(ll LogLevel)
- func Finalize()
- func Initialize() (rank int)
- func ReadRestarts()
- func SetOptions(options Flag)
- func Wtime() float64
- type Callback
- type Flag
- type Handle
- type LogLevel
Constants ¶
const (
	SplitRandom  = Flag(1 << iota) // Split work randomly.
	SplitEqual                     // Split work evenly.
	CreateGlobal                   // Call the creation callback on all processes.
	DefaultFlags = SplitEqual      // Default behavior is the same as SplitEqual (split work evenly).
)
These constants can be ORed together to produce a Flag.
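As a small illustration of combining flags, the sketch below mirrors the Flag declarations locally so that it runs without libcircle or MPI. In a real program the combined value would be passed to circle.SetOptions. The helper has is hypothetical and not part of the package.

```go
package main

import "fmt"

// Flag and the constants below are local copies of the package's
// declarations, reproduced so that this sketch runs without libcircle.
type Flag int32

const (
	SplitRandom  = Flag(1 << iota) // 1
	SplitEqual                     // 2
	CreateGlobal                   // 4
)

// has reports whether every bit of want is set in f.
// It is a hypothetical helper, not part of the circle package.
func has(f, want Flag) bool {
	return f&want == want
}

func main() {
	// With the real package this would read:
	//   circle.SetOptions(circle.SplitRandom | circle.CreateGlobal)
	opts := SplitRandom | CreateGlobal
	fmt.Println(has(opts, SplitRandom), has(opts, SplitEqual), has(opts, CreateGlobal))
}
```

Because each constant occupies its own bit, the flags can be combined and tested independently.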
const (
	LogFatal = LogLevel(iota) // Output only fatal errors.
	LogErr                    // Output the above plus nonfatal errors.
	LogWarn                   // Output all of the above plus warnings.
	LogInfo                   // Output all of the above plus informational messages.
	LogDbg                    // Output all of the above plus internal debug messages.
)
These constants define the various LogLevel values.
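The comments on the constants suggest that the levels are cumulative: each level outputs everything the previous one does, plus one more class of message. The sketch below models that reading with local copies of the constants. The underlying type int and the helper emitted are assumptions made for illustration, not part of the package.

```go
package main

import "fmt"

// LogLevel and its constants are local copies for illustration. The
// underlying type is an assumption; the package may use a different one.
type LogLevel int

const (
	LogFatal = LogLevel(iota)
	LogErr
	LogWarn
	LogInfo
	LogDbg
)

// emitted reports whether a message of severity msg would be output when
// the verbosity is set to limit, assuming cumulative levels.
// It is a hypothetical helper, not part of the circle package.
func emitted(msg, limit LogLevel) bool {
	return msg <= limit
}

func main() {
	limit := LogErr // Corresponds to circle.EnableLogging(circle.LogErr).
	fmt.Println(emitted(LogFatal, limit), emitted(LogErr, limit), emitted(LogWarn, limit))
}
```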
const MaxWorkItemLength int = C.CIRCLE_MAX_STRING_LEN - 1
MaxWorkItemLength is the maximum length of a work-item string.
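A caller may want to check a work item against this limit before enqueueing it. The following is a minimal sketch under two assumptions: the limit counts bytes, and the value 4095 is a placeholder standing in for circle.MaxWorkItemLength. The names maxWorkItemLength, errTooLong, and checkItem are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// maxWorkItemLength stands in for circle.MaxWorkItemLength. The value 4095
// is a placeholder chosen for this sketch, not taken from libcircle.
const maxWorkItemLength = 4095

var errTooLong = errors.New("work item exceeds the maximum length")

// checkItem returns an error if item is too long to be enqueued,
// assuming the limit is measured in bytes.
func checkItem(item string) error {
	if len(item) > maxWorkItemLength {
		return errTooLong
	}
	return nil
}

func main() {
	fmt.Println(checkItem("RANK 0, ITEM 1"))
	fmt.Println(checkItem(strings.Repeat("x", maxWorkItemLength+1)))
}
```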
Variables ¶
This section is empty.
Functions ¶
func Abort ¶
func Abort()
Abort makes each rank dump a checkpoint file (as the Checkpoint function does) and exit.
func Begin ¶
func Begin()
Begin creates and executes work based on the user-provided callback functions.
func CallbackCreate ¶
func CallbackCreate(cb Callback)
CallbackCreate specifies a user-provided callback that will enqueue work when asked.
func CallbackProcess ¶
func CallbackProcess(cb Callback)
CallbackProcess specifies a user-provided callback that will dequeue and perform work when asked. Note that the callback is allowed to call Enqueue to enqueue additional work if desired.
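To illustrate a processing callback that enqueues further work, the sketch below uses a local copy of the Handle interface and a hypothetical in-memory queue (memQueue) so that it runs in a single process without libcircle. The run function only approximates, on one process, the loop that Begin drives across ranks.

```go
package main

import "fmt"

// Handle is a local copy of the package's Handle interface.
type Handle interface {
	Enqueue(string) bool
	Dequeue() (string, bool)
	LocalQueueSize() uint32
}

// memQueue is a hypothetical in-memory stand-in for a libcircle queue.
type memQueue struct{ items []string }

func (q *memQueue) Enqueue(s string) bool {
	q.items = append(q.items, s)
	return true
}

func (q *memQueue) Dequeue() (string, bool) {
	if len(q.items) == 0 {
		return "", false
	}
	s := q.items[len(q.items)-1]
	q.items = q.items[:len(q.items)-1]
	return s, true
}

func (q *memQueue) LocalQueueSize() uint32 { return uint32(len(q.items)) }

// processed records the work items handled so far.
var processed []string

// doWork dequeues one item and, while the item is shorter than three
// bytes, enqueues two longer items derived from it.
func doWork(q Handle) {
	work, ok := q.Dequeue()
	if !ok {
		return
	}
	processed = append(processed, work)
	if len(work) < 3 {
		q.Enqueue(work + "a")
		q.Enqueue(work + "b")
	}
}

// run seeds the queue and calls doWork until the queue is empty.
// It returns the number of items processed.
func run(q Handle, seed string) int {
	processed = nil
	q.Enqueue(seed)
	for q.LocalQueueSize() > 0 {
		doWork(q)
	}
	return len(processed)
}

func main() {
	fmt.Println(run(&memQueue{}, "x")) // 1 + 2 + 4 items: prints 7
}
```

With the real package, doWork would take a circle.Handle and be registered with circle.CallbackProcess.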
func ChannelBegin ¶
func ChannelBegin() (putWork chan<- string, getWork <-chan string)
ChannelBegin replaces CallbackCreate, CallbackProcess, and Begin with a channel-based interface. The caller is expected to write work into putWork, close the channel, then read work from getWork. An implication is that no new work can be created after the initial set of work is written into putWork. Use the lower-level API (CallbackCreate, CallbackProcess, and Begin) if workers need to be able to enqueue new work.
Example ¶
This example demonstrates how to use ChannelBegin to enqueue a set of Point objects and then have remote workers dequeue and "process" (in this case, print) them.
// This file demonstrates how to use the higher-level, channel,
// interface to the circle package.

package main

import (
	"encoding/json"
	"fmt"
	"github.com/lanl/circle"
	"log"
)

// A Point contains x and y coordinates.
type Point struct {
	X, Y float64
}

// String implements the fmt.Stringer interface for pretty-printed output.
func (pt Point) String() string {
	return fmt.Sprintf("(%5.2f, %5.2f)", pt.X, pt.Y)
}

// Demonstrate how to use ChannelBegin to enqueue a bunch of Point
// objects then have remote workers dequeue and "process" (in this
// case, print) them.
func main() {
	// Initialize libcircle.
	rank := circle.Initialize()
	defer circle.Finalize()

	// Create a pair of channels for writing work into the queue
	// and reading work from the queue.
	toQ, fromQ := circle.ChannelBegin()

	// Process 0 writes a bunch of Points into the queue.
	if rank == 0 {
		for j := 0; j < 5; j++ {
			for i := 0; i < 5; i++ {
				pt := Point{X: float64(i) * 1.23, Y: float64(j) * 4.56}
				enc, err := json.Marshal(pt)
				if err != nil {
					log.Fatalln(err)
				}
				toQ <- string(enc)
			}
		}
		close(toQ)
	}

	// All processes read Points from the queue and output them.
	for work := range fromQ {
		var pt Point
		if err := json.Unmarshal([]byte(work), &pt); err != nil {
			log.Fatalln(err)
		}
		fmt.Printf("Dequeueing %s\n", pt)
	}
}
func Checkpoint ¶
func Checkpoint()
Checkpoint makes each rank dump a checkpoint file of the form "circle<rank>.txt".
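For tooling that needs to locate these files, a name can be derived from the rank. This is a minimal sketch that assumes the rank appears as a plain decimal number with no padding. The helper checkpointName is hypothetical and not part of the package.

```go
package main

import "fmt"

// checkpointName returns the file name a given rank's checkpoint is
// assumed to use, following the "circle<rank>.txt" form. Unpadded decimal
// formatting of the rank is an assumption.
func checkpointName(rank int) string {
	return fmt.Sprintf("circle%d.txt", rank)
}

func main() {
	for rank := 0; rank < 3; rank++ {
		fmt.Println(checkpointName(rank))
	}
}
```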
func EnableLogging ¶
func EnableLogging(ll LogLevel)
EnableLogging sets libcircle's output verbosity.
func Initialize ¶
func Initialize() (rank int)
Initialize initializes libcircle and returns the current MPI rank.
func ReadRestarts ¶
func ReadRestarts()
ReadRestarts initializes the libcircle queues from the restart files produced by the Checkpoint function.
func SetOptions ¶
func SetOptions(options Flag)
SetOptions sets libcircle's global behavior according to the inclusive-or of a set of flags.
Types ¶
type Callback ¶
type Callback func(Handle)
A Callback is a user-provided function that libcircle will invoke as necessary.
type Flag ¶
type Flag int32
A Flag is passed to SetOptions and controls libcircle's global behavior.
type Handle ¶
type Handle interface {
	Enqueue(string) bool     // Enqueue a user-defined work item. Return a success code.
	Dequeue() (string, bool) // Dequeue and return a user-defined work item plus a success code.
	LocalQueueSize() uint32  // Number of entries currently in the local queue
}
A Handle provides an interface to enqueue and dequeue libcircle work items.