timequeue

package module
v0.0.0-...-38a64ca Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 7, 2016 License: MIT Imports: 4 Imported by: 1

README

timequeue

timequeue provides a TimeQueue type that releases arbitrary messages at given time.Times.

Status

Build Status Coverage Status Go Report Card

Documentation and Usage

Full documentation and examples can be found at GoDoc

Documentation

Overview

Package timequeue provides the TimeQueue type that is a queue of Messages. Each Message contains a time.Time that determines the time at which the Message should be released from the queue. Message types also have a Data field of type interface{} that should be used as the payload of the Message. TimeQueue is safe for use by multiple go-routines.

Messages need only be pushed to the queue, and then when their time passes, they will be sent on the channel returned by Messages(). See below for examples.

TimeQueue uses a single go-routine, spawned from Start() that returns from Stop(), that processes the Messages as their times pass. When a Message is pushed to the queue, the earliest Message in the queue is used to determine the next time the running go-routine should wake. The running go-routine knows when to wake because the earliest time is used to make a channel via time.After(). Receiving on that channel wakes the running go-routine if a call to Stop() has not happened prior. Upon waking, that Message is removed from the queue and released on the channel returned from Messages(). Then the newest remaining Message is used to determine when to wake, etc. If a Message with a time before any other in the queue is inserted, then that Message is pushed to the front of the queue and released appropriately.

Messages that are "released", i.e. sent on the Messages() channel, are always released from a newly spawned go-routine so that other go-routines are not paused waiting for a receive from Messages().

Messages with the same Time value will be "flood-released" from the same separately spawned go-routine. Additionally, Messages that are pushed with times before time.Now() will immediately be released from the queue.

Example
package main

import (
	"fmt"
	"time"

	"github.com/gogolfing/timequeue"
)

func main() {
	tq := timequeue.New()
	tq.Start()
	//this would normally be a long-running process,
	//and not stop at the return of a function call.
	defer tq.Stop()

	startTime := time.Now()

	tq.Push(startTime, "this will be released immediately")

	//adding Messages in chronological order.
	for i := 1; i <= 4; i++ {
		tq.Push(
			startTime.Add(time.Duration(i)*time.Second),
			fmt.Sprintf("message at second %v", i),
		)
	}
	//adding Messages in reverse chronological order.
	for i := 8; i >= 5; i-- {
		tq.Push(
			startTime.Add(time.Duration(i)*time.Second),
			fmt.Sprintf("message at second %v", i),
		)
	}

	//receive all 9 Messages that were pushed.
	for i := 0; i < 9; i++ {
		message := <-tq.Messages()
		fmt.Println(message.Data)
	}

	fmt.Printf("there are %v messages left in the queue\n", tq.Size())

	endTime := time.Now()
	if endTime.Sub(startTime) > time.Duration(8)*time.Second {
		fmt.Println("releasing all messages took more than 8 seconds")
	} else {
		fmt.Println("releasing all messages took less than 8 seconds")
	}

}
Output:

this will be released immediately
message at second 1
message at second 2
message at second 3
message at second 4
message at second 5
message at second 6
message at second 7
message at second 8
there are 0 messages left in the queue
releasing all messages took more than 8 seconds

Index

Examples

Constants

View Source
const (
	//DefaultCapacity is the default capacity used for Messages() channels in New().
	DefaultCapacity = 1
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Message

type Message struct {
	time.Time
	Data interface{}
	// contains filtered or unexported fields
}

Message is a simple holder struct for a time.Time (the time the Message will be released from the queue) and a Data payload of type interface{}.

A Message is not safe for modification from multiple go-routines. The Time field is used to calculate when the Message should be released from a TimeQueue, and thus changing its value while the Message is still referenced by a TimeQueue could have unknown side-effects. The Data field is never modified by a TimeQueue.

It is up to client code to ensure that Data is always of the same underlying type if that is desired.

func (*Message) String

func (m *Message) String() string

String returns the standard string representation of a struct.

type TimeQueue

type TimeQueue struct {
	// contains filtered or unexported fields
}

TimeQueue is a queue of Messages that releases its Messages when their Time fields pass.

When Messages are pushed to a TimeQueue, the earliest Message is used to determine when the TimeQueue should wake. Upon waking, that earliest Message is "released" from the TimeQueue by being sent on the channel returned by Messages().

Messages may be pushed and popped from a TimeQueue whether or not the TimeQueue is running or not. Start() and Stop() may be called as many times as desired, but Messsages will be released only between calls to Start() and Stop(), i.e. while the TimeQueue is running and IsRunning() returns true.

Calls to Pop(), PopAll(), and PopAllUntil() may be called to remove Messages from a TimeQueue, but this is required for normal use.

One of the New*() functions should be used to create a TimeQueue. A zero-value TimeQueue is not in a valid or working state.

func New

func New() *TimeQueue

New creates a new *TimeQueue with a call to New(DefaultCapacity).

func NewCapacity

func NewCapacity(capacity int) *TimeQueue

NewCapacity creates a new *TimeQueue where the channel returned from Messages() has the capacity given by capacity. The new TimeQueue is in the stopped state and has no Messages in it.

func (*TimeQueue) IsRunning

func (q *TimeQueue) IsRunning() bool

IsRunning returns whether or not q is running. E.g. in between calls to Start() and Stop(). If IsRunning returns true, then it is possible that Messages are being released.

func (*TimeQueue) Messages

func (q *TimeQueue) Messages() <-chan *Message

Messages returns the receive only channel that all Messages are released on. The returned channel will be the same instance on every call, and this value will never be closed.

In order to receive Messages when they are earliest available a go-routine should be spawned to drain the channel of all Messages.

q := timequeue.New()
q.Start()
go func() {
	message := <-q.Messages()
}()
//push Messages to q.

func (*TimeQueue) Peek

func (q *TimeQueue) Peek() (time.Time, interface{})

Peek returns (without removing) the Time and Data fields from the earliest Message in q. If q is empty, then the zero Time and nil are returned.

func (*TimeQueue) PeekMessage

func (q *TimeQueue) PeekMessage() *Message

PeekMessage returns (without removing) the earliest Message in q or nil if q is empty.

func (*TimeQueue) Pop

func (q *TimeQueue) Pop(release bool) *Message

Pop removes and returns the earliest Message in q or nil if q is empty. If release is true, then the Message (if not nil) will also be sent on the channel returned from Messages().

func (*TimeQueue) PopAll

func (q *TimeQueue) PopAll(release bool) []*Message

PopAll removes and returns a slice of all Messages in q. The returned slice will be non-nil but empty if q is itseld empty. If release is true, then all returned Messages will also be sent on the channel returned from Messages().

func (*TimeQueue) PopAllUntil

func (q *TimeQueue) PopAllUntil(until time.Time, release bool) []*Message

PopAllUntil removes and returns a slice of Messages in q with Time fields before, but not equal to, until. If release is true, then all returned Messages will also be sent on the channel returned from Messages().

Example
package main

import (
	"fmt"
	"time"

	"github.com/gogolfing/timequeue"
)

func main() {
	tq := timequeue.New()
	now := time.Now()
	for i := 0; i < 4; i++ {
		tq.Push(now.Add(time.Duration(i)*time.Second), i)
	}
	tq.PopAllUntil(now.Add(time.Duration(2)*time.Second), true)
	for i := 0; i < 2; i++ {
		message := <-tq.Messages()
		fmt.Println(message.Data)
	}
	fmt.Println("messages left:", tq.Size())

}
Output:

0
1
messages left: 2

func (*TimeQueue) Push

func (q *TimeQueue) Push(t time.Time, data interface{}) *Message

Push creates and adds a Message to q with t and data. The created Message is returned.

func (*TimeQueue) Remove

func (q *TimeQueue) Remove(message *Message, release bool) bool

Remove removes message from q. If q is empty, message is nil, or message is not in q, then Remove is a nop and returns false. Returns true or false indicating whether or not message was actually removed from q. If release is true and message was actually removed, then message will also be sent on the channel returned by Messages().

func (*TimeQueue) Size

func (q *TimeQueue) Size() int

Size returns the number of Messages in q. This is the number of Messages that have yet to be released (or waiting to be sent on Messages()) in q. Therefore, there could still be Messages that q has reference to that are waiting to be released or in the Messages() channel buffer.

To obtain the number of total Messages that q still has references to add this value and the length of Messages():

q.Size() + len(q.Messages())

func (*TimeQueue) Start

func (q *TimeQueue) Start()

Start spawns a new go-routine to listen for wake times of Messages and sets the state to running. If q is already running, then Start is a nop.

func (*TimeQueue) Stop

func (q *TimeQueue) Stop()

Stop tells the running go-routine to stop running. This results in no more Messages being released (until a subsequent call to Start()) and the state to be set to not running. If q is already stopped, then Stop is a nop.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL