iterators

package
v0.1.1
Published: Nov 6, 2023 License: Apache-2.0 Imports: 6 Imported by: 4

Documentation

Overview

Package iterators provides generic iterators: constructs that are iterated over by calling Next() in a loop, then calling an Item() (T, error) method to retrieve the current value.

When iteration is complete, the iterator resources should be relinquished using Close().
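
The Next(), Item(), Close() contract can be sketched without importing the package. The itemIterator interface and the sliceSource type below are local stand-ins written for this illustration only, not the package's own types. The sketch shows a consumer loop that stops at the first error and always closes the iterator.

```go
package main

import (
	"errors"
	"fmt"
)

// itemIterator is a local stand-in for the Next/Item/Close contract.
type itemIterator[T any] interface {
	Next() bool
	Item() (T, error)
	Close() error
}

// sliceSource is a hypothetical in-memory iterator used only for this sketch.
type sliceSource[T any] struct {
	rows   []T
	pos    int // number of items consumed so far
	closed bool
}

func (s *sliceSource[T]) Next() bool {
	if s.closed || s.pos >= len(s.rows) {
		return false
	}
	s.pos++

	return true
}

func (s *sliceSource[T]) Item() (T, error) {
	var zero T
	if s.pos == 0 || s.pos > len(s.rows) {
		return zero, errors.New("item requested before a successful call to Next")
	}

	return s.rows[s.pos-1], nil
}

func (s *sliceSource[T]) Close() error {
	s.closed = true

	return nil
}

// drain loops over the iterator, stops at the first error and always closes it.
func drain[T any](it itemIterator[T]) (items []T, err error) {
	defer func() {
		// report the Close error only if nothing else failed
		if cerr := it.Close(); err == nil {
			err = cerr
		}
	}()

	for it.Next() {
		item, ierr := it.Item()
		if ierr != nil {
			return items, ierr
		}
		items = append(items, item)
	}

	return items, nil
}

func main() {
	items, err := drain[string](&sliceSource[string]{rows: []string{"a", "b", "c"}})
	fmt.Println(items, err) // prints: [a b c] <nil>
}
```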

Types

type ChanIterator

type ChanIterator[T any] struct {
	// contains filtered or unexported fields
}

ChanIterator is a channel-based iterator that may be used to run a collection of StructIterators in parallel.

Because it works asynchronously, it does not preserve the order of items and is not suitable for collecting ordered items.

The ChanIterator is goroutine-safe and may be iterated by several concurrent goroutines.

Items are pumped from a collection of input StructIterators, and may then be consumed by one or several goroutines reading from the ChanIterator using Next() and Item().

Item() may return io.EOF if the iterator is done producing records (e.g. some other consumer reached the end of the stream).

WithChanFanInBuffers may be used to pre-fetch from input iterators asynchronously.

Methods Collect() and CollectPtr() can't be used by concurrent goroutines and are protected against such misuse.

Example
baseIterators := []iterators.StructIterator[SampleStruct]{
	iterators.NewSliceIterator[SampleStruct](testSlice()),
	iterators.NewSliceIterator[SampleStruct](testSlice()),
	iterators.NewSliceIterator[SampleStruct](testSlice()),
}

group, ctx := errgroup.WithContext(context.Background())
iterator := iterators.NewChanIterator[SampleStruct](ctx, baseIterators)
defer func() {
	_ = iterator.Close()
}()
var mx sync.Mutex
items := make(SortableStructs, 0, 6)
latch := make(chan struct{})

// In this example, we iterate in parallel:
// 3 producer iterators and 3 consumer iterators are running in parallel against
// a single inner channel.
for i := 0; i < 3; i++ {
	group.Go(func() error {
		<-latch
		count := 0
		defer func() {
			fmt.Fprintf(os.Stderr, "goroutine count: %d\n", count) // stderr doesn't count for example asserted output
		}()

		for iterator.Next() {
			item, err := iterator.Item()
			if err != nil {
				if errors.Is(err, io.EOF) {
					return nil
				}

				return err
			}
			mx.Lock()
			items = append(items, item)
			mx.Unlock()
			count++
		}

		return nil
	})
}
close(latch)
if err := group.Wait(); err != nil {
	fmt.Printf("an error occurred: %v", err)
}

sort.Sort(items)
fmt.Printf("count: %d\n", len(items))
fmt.Printf("items: %v\n", items)
Output:

count: 6
items: [{1 x}, {1 x}, {1 x}, {2 y}, {2 y}, {2 y}]

func NewChanIterator

func NewChanIterator[T any](ctx context.Context, iterators []StructIterator[T], opts ...ChanIteratorOption) *ChanIterator[T]

NewChanIterator builds a ChanIterator and starts the goroutines pumping items from the input iterators.

All goroutines are terminated and input iterators closed if the context is cancelled.

func (*ChanIterator[T]) Close

func (d *ChanIterator[T]) Close() error

func (*ChanIterator[T]) Collect

func (d *ChanIterator[T]) Collect() ([]T, error)

func (*ChanIterator[T]) CollectPtr

func (d *ChanIterator[T]) CollectPtr() ([]*T, error)

func (*ChanIterator[T]) Item

func (d *ChanIterator[T]) Item() (T, error)

func (*ChanIterator[T]) Next

func (d *ChanIterator[T]) Next() bool
Example
baseIterators := []iterators.StructIterator[SampleStruct]{
	iterators.NewSliceIterator[SampleStruct](testSlice()),
	iterators.NewSliceIterator[SampleStruct](testSlice()),
}
count := 0

iterator := iterators.NewChanIterator[SampleStruct](context.Background(), baseIterators)
defer func() {
	_ = iterator.Close()
}()
items := make(SortableStructs, 0, 4)

for iterator.Next() {
	item, err := iterator.Item()
	if err != nil {
		if !errors.Is(err, io.EOF) {
			fmt.Printf("err: %v\n", err)
		}

		break
	}
	count++
	items = append(items, item)
}

sort.Sort(items)
fmt.Printf("count: %d\n", count)
fmt.Printf("items: %v\n", items)
Output:

count: 4
items: [{1 x}, {1 x}, {2 y}, {2 y}]

type ChanIteratorOption

type ChanIteratorOption func(*chanIteratorOptions)

ChanIteratorOption provides options to the ChanIterator.

func WithChanFanInBuffers

func WithChanFanInBuffers(n int) ChanIteratorOption

WithChanFanInBuffers allocates buffers to fan-in the input results.

The default value is the number of underlying iterators.

func WithChanPreallocatedItems

func WithChanPreallocatedItems(n int) ChanIteratorOption

WithChanPreallocatedItems preallocates n items in the returned slice when using the Collect and CollectPtr methods.
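
ChanIteratorOption has the shape of a functional option: a function that mutates an unexported options struct. The sketch below shows how such options are typically resolved. The settings struct, its fields, and the resolve helper are hypothetical names chosen for illustration; only the default for fan-in buffers (the number of input iterators) comes from the documentation above.

```go
package main

import "fmt"

// settings is a hypothetical options holder (the package's own struct is unexported).
type settings struct {
	fanInBuffers      int
	preallocatedItems int
}

// option mutates settings, following the functional options pattern.
type option func(*settings)

func withFanInBuffers(n int) option {
	return func(s *settings) { s.fanInBuffers = n }
}

func withPreallocatedItems(n int) option {
	return func(s *settings) { s.preallocatedItems = n }
}

// resolve sets defaults first, then applies each option in order.
func resolve(inputs int, opts ...option) settings {
	s := settings{fanInBuffers: inputs} // default: one buffer per input iterator
	for _, apply := range opts {
		apply(&s)
	}

	return s
}

func main() {
	fmt.Printf("%+v\n", resolve(3))
	fmt.Printf("%+v\n", resolve(3, withFanInBuffers(16), withPreallocatedItems(100)))
}
```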

type Iterator

type Iterator interface {
	Next() bool
	Close() error
}

Iterator knows how to iterate over a collection.

type IteratorContext

type IteratorContext struct {
	Iterated int
}

func GetIteratorContext

func GetIteratorContext(ctx context.Context) *IteratorContext

GetIteratorContext allows the retrieval of the context of the iterator from within a transformer.

type RowsIterator

type RowsIterator[R ScannableIterator, T any] struct {
	// contains filtered or unexported fields
}

RowsIterator transforms a ScannableIterator of type R (e.g. a DB cursor such as sqlx.Rows) into a StructIterator with target type T.

Rows iterated over R are scanned into structs of type T.

Notice that the rows iterator is not goroutine-safe and should not be iterated concurrently.

func NewRowsIterator

func NewRowsIterator[R ScannableIterator, T any](rows R, opts ...RowsIteratorOption) *RowsIterator[R, T]

NewRowsIterator makes a StructIterator[T] from a ScannableIterator.

func (*RowsIterator[R, T]) Close

func (ri *RowsIterator[R, T]) Close() error

func (*RowsIterator[R, T]) Collect

func (ri *RowsIterator[R, T]) Collect() ([]T, error)
Example
package main

import (
	"fmt"
	"log"

	"github.com/fredbi/go-patterns/iterators"
	"github.com/fredbi/go-patterns/iterators/internal/testdb"
)

func main() {
	dbName := testdb.UniqueDBName()

	db, err := testdb.CreateDBAndData(dbName)
	if err != nil {
		log.Fatalf("could not create test DB: %v", err)
	}

	rows, err := testdb.OpenDBCursor(db)
	if err != nil {
		log.Fatalf("could not query DB: %v", err)
	}

	iterator := iterators.NewSqlxIterator[testdb.DummyRow](rows)
	items, err := iterator.Collect()
	if err != nil {
		fmt.Printf("err: %v\n", err)
	}

	fmt.Printf("items: %#v\n", items)
	fmt.Printf("count: %d\n", len(items))

}
Output:

items: []testdb.DummyRow{testdb.DummyRow{A:1, B:"x"}, testdb.DummyRow{A:2, B:"y"}}
count: 2

func (*RowsIterator[R, T]) CollectPtr

func (ri *RowsIterator[R, T]) CollectPtr() ([]*T, error)
Example
package main

import (
	"fmt"
	"log"

	"github.com/fredbi/go-patterns/iterators"
	"github.com/fredbi/go-patterns/iterators/internal/testdb"
)

func main() {
	dbName := testdb.UniqueDBName()

	db, err := testdb.CreateDBAndData(dbName)
	if err != nil {
		log.Fatalf("could not create test DB: %v", err)
	}

	rows, err := testdb.OpenDBCursor(db)
	if err != nil {
		log.Fatalf("could not query DB: %v", err)
	}

	iterator := iterators.NewSqlxIterator[testdb.DummyRow](rows)
	items, err := iterator.CollectPtr()
	if err != nil {
		fmt.Printf("err: %v\n", err)
	}

	fmt.Printf("count: %d\n", len(items))

}
Output:

count: 2

func (*RowsIterator[R, T]) Item

func (ri *RowsIterator[R, T]) Item() (T, error)

func (*RowsIterator[R, T]) Next

func (ri *RowsIterator[R, T]) Next() bool
Example
package main

import (
	"fmt"
	"log"

	"github.com/fredbi/go-patterns/iterators"
	"github.com/fredbi/go-patterns/iterators/internal/testdb"
	"github.com/jmoiron/sqlx"
)

func main() {
	dbName := testdb.UniqueDBName()

	// create a DB and fill a table with some data
	db, err := testdb.CreateDBAndData(dbName)
	if err != nil {
		log.Fatalf("could not create test DB: %v", err)
	}

	// open a cursor selecting over the test data
	rows, err := testdb.OpenDBCursor(db)
	if err != nil {
		log.Fatalf("could not query DB: %v", err)
	}

	iterator := iterators.NewRowsIterator[*sqlx.Rows, testdb.DummyRow](rows)
	defer func() {
		_ = iterator.Close()
	}()
	count := 0

	for iterator.Next() {
		item, err := iterator.Item()
		if err != nil {
			fmt.Printf("err: %v\n", err)
		}
		count++
		fmt.Printf("item: %#v\n", item)
	}

	fmt.Printf("count: %d\n", count)

}
Output:

item: testdb.DummyRow{A:1, B:"x"}
item: testdb.DummyRow{A:2, B:"y"}
count: 2

type RowsIteratorOption

type RowsIteratorOption func(*rowsIteratorOptions)

RowsIteratorOption provides options to the RowsIterator.

func WithRowsPreallocatedItems

func WithRowsPreallocatedItems(n int) RowsIteratorOption

WithRowsPreallocatedItems preallocates n items in the returned slice when using the Collect and CollectPtr methods.

The default value is 1000.

type ScannableIterator

type ScannableIterator interface {
	Iterator
	StructScan(interface{}) error
}

ScannableIterator is an iterator over DB records that can be scanned.
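
Any type with this method set qualifies, which makes it possible to substitute an in-memory cursor for a database cursor in tests. The sketch below is illustrative only: scannable is a local copy of the method set, while fakeRows, row and scanAll are hypothetical names that do not exist in the package.

```go
package main

import (
	"errors"
	"fmt"
)

// scannable is a local copy of the ScannableIterator method set.
type scannable interface {
	Next() bool
	Close() error
	StructScan(interface{}) error
}

type row struct {
	A int
	B string
}

// fakeRows is a hypothetical in-memory cursor that only knows how to scan into *row.
type fakeRows struct {
	data []row
	pos  int // number of rows consumed so far
}

func (f *fakeRows) Next() bool {
	if f.pos >= len(f.data) {
		return false
	}
	f.pos++

	return true
}

func (f *fakeRows) Close() error { return nil }

func (f *fakeRows) StructScan(dest interface{}) error {
	target, ok := dest.(*row)
	if !ok {
		return fmt.Errorf("unsupported destination type %T", dest)
	}
	if f.pos == 0 || f.pos > len(f.data) {
		return errors.New("no current row")
	}
	*target = f.data[f.pos-1]

	return nil
}

// scanAll scans every remaining row into a slice, then closes the cursor.
func scanAll(rows scannable) ([]row, error) {
	defer func() { _ = rows.Close() }()

	var out []row
	for rows.Next() {
		var r row
		if err := rows.StructScan(&r); err != nil {
			return nil, err
		}
		out = append(out, r)
	}

	return out, nil
}

func main() {
	items, err := scanAll(&fakeRows{data: []row{{A: 1, B: "x"}, {A: 2, B: "y"}}})
	fmt.Printf("%+v %v\n", items, err)
}
```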

type SliceIterator

type SliceIterator[T any] struct {
	// contains filtered or unexported fields
}

SliceIterator constructs an iterator based on a slice of items.

This very simple iterator is mainly used for testing.

func NewSliceIterator

func NewSliceIterator[T any](rows []T) *SliceIterator[T]

NewSliceIterator constructs a SliceIterator from a slice of items (rows).

func (*SliceIterator[T]) Close

func (si *SliceIterator[T]) Close() error

func (*SliceIterator[T]) Collect

func (si *SliceIterator[T]) Collect() ([]T, error)
Example
iterator := iterators.NewSliceIterator[SampleStruct](testSlice())
items, err := iterator.Collect()
if err != nil {
	fmt.Printf("err: %v\n", err)
}

fmt.Printf("items: %#v\n", items)
fmt.Printf("count: %d\n", len(items))
Output:

items: []iterators_test.SampleStruct{iterators_test.SampleStruct{A:1, B:"x"}, iterators_test.SampleStruct{A:2, B:"y"}}
count: 2

func (*SliceIterator[T]) CollectPtr

func (si *SliceIterator[T]) CollectPtr() ([]*T, error)
Example
iterator := iterators.NewSliceIterator[SampleStruct](testSlice())
items, err := iterator.CollectPtr()
if err != nil {
	fmt.Printf("err: %v\n", err)
}

fmt.Printf("count: %d\n", len(items))
Output:

count: 2

func (*SliceIterator[T]) Item

func (si *SliceIterator[T]) Item() (T, error)

func (*SliceIterator[T]) Next

func (si *SliceIterator[T]) Next() bool
Example
iterator := iterators.NewSliceIterator[SampleStruct](testSlice())
defer func() {
	_ = iterator.Close()
}()
count := 0

for iterator.Next() {
	item, err := iterator.Item()
	if err != nil {
		fmt.Printf("err: %v\n", err)
	}
	count++
	fmt.Printf("item: %#v\n", item)
}

fmt.Printf("count: %d\n", count)
Output:

item: iterators_test.SampleStruct{A:1, B:"x"}
item: iterators_test.SampleStruct{A:2, B:"y"}
count: 2

type SqlxIterator

type SqlxIterator[T any] struct {
	*RowsIterator[*sqlx.Rows, T]
}

SqlxIterator is a shorthand for RowsIterator[*sqlx.Rows, T].

func NewSqlxIterator

func NewSqlxIterator[T any](rows *sqlx.Rows, opts ...RowsIteratorOption) *SqlxIterator[T]

NewSqlxIterator makes a SqlxIterator[T] producing items of type T from a github.com/jmoiron/sqlx.Rows cursor.

type StructIterator

type StructIterator[T any] interface {
	Iterator

	// Item returns the current iterated item.
	//
	// Next() must have been called at least once.
	Item() (T, error)

	// Collect returns all items in one slice, then closes the iterator
	Collect() ([]T, error)

	// CollectPtr returns all items in one slice of pointers, then closes the iterator
	CollectPtr() ([]*T, error)
}

StructIterator is an iterator that delivers items of some type T.
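
The difference between Collect and CollectPtr can be sketched with two generic helpers written over a local stand-in interface. The names source, counter, collect and collectPtr are hypothetical, and the prealloc parameter plays the role that the preallocation options play in the package. This illustrates the documented behavior (collect everything, then close); it is not the package's implementation.

```go
package main

import "fmt"

// source is a local stand-in for the subset of StructIterator used here.
type source[T any] interface {
	Next() bool
	Item() (T, error)
	Close() error
}

// collect returns all items in one slice, then closes the iterator.
func collect[T any](it source[T], prealloc int) ([]T, error) {
	defer func() { _ = it.Close() }()

	items := make([]T, 0, prealloc)
	for it.Next() {
		item, err := it.Item()
		if err != nil {
			return nil, err
		}
		items = append(items, item)
	}

	return items, nil
}

// collectPtr returns all items as a slice of pointers, then closes the iterator.
func collectPtr[T any](it source[T], prealloc int) ([]*T, error) {
	defer func() { _ = it.Close() }()

	items := make([]*T, 0, prealloc)
	for it.Next() {
		item, err := it.Item() // a fresh variable at each iteration
		if err != nil {
			return nil, err
		}
		items = append(items, &item)
	}

	return items, nil
}

// counter is a hypothetical iterator yielding 1..max.
type counter struct{ n, max int }

func (c *counter) Next() bool {
	if c.n >= c.max {
		return false
	}
	c.n++

	return true
}

func (c *counter) Item() (int, error) { return c.n, nil }
func (c *counter) Close() error       { return nil }

func main() {
	values, _ := collect[int](&counter{max: 3}, 4)
	fmt.Println(values) // prints: [1 2 3]

	pointers, _ := collectPtr[int](&counter{max: 3}, 4)
	fmt.Println(len(pointers), *pointers[2]) // prints: 3 3
}
```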

type TransformIterator

type TransformIterator[S, T any] struct {
	StructIterator[S]
	// contains filtered or unexported fields
}

TransformIterator wraps a StructIterator[S] and converts each input item of type S into type T at every call to Item().

Example
// this example interrupts the iterations after 1 iteration, using
// a transformer and the iterator's context
baseIterator := iterators.NewSliceIterator[SampleStruct](testSlice())

transformer := func(ctx context.Context, in SampleStruct) (SampleStruct, error) {
	ictx := iterators.GetIteratorContext(ctx)
	var index int

	// this retrieves the current iterated count from the context
	if ictx != nil {
		index = ictx.Iterated
	}
	if index > 1 {
		return SampleStruct{}, io.EOF
	}

	return in, nil
}

iterator := iterators.NewTransformIterator[SampleStruct, SampleStruct](context.Background(), baseIterator, transformer)
defer func() {
	_ = iterator.Close()
}()
count := 0

for iterator.Next() {
	item, err := iterator.Item()
	if err != nil {
		if errors.Is(err, io.EOF) {
			break
		}

		fmt.Printf("err: %v\n", err)
		break
	}

	count++
	fmt.Printf("item: %#v\n", item)
}

fmt.Printf("count: %d\n", count)
Output:

item: iterators_test.SampleStruct{A:1, B:"x"}
count: 1

func NewTransformIterator

func NewTransformIterator[S, T any](ctx context.Context, iterator StructIterator[S], transformer TransformerCtx[S, T], opts ...RowsIteratorOption) *TransformIterator[S, T]

NewTransformIterator makes a StructIterator[T] from a StructIterator[S] and a transformer function.

The parent context provided allows the transformer to know about the current context of the iterator.

This is useful if the transformation depends on the currently iterated step.

The transformer may also perform side effects, e.g. logging or collecting stats or traces.

func (*TransformIterator[S, T]) Collect

func (rt *TransformIterator[S, T]) Collect() ([]T, error)

func (*TransformIterator[S, T]) CollectPtr

func (rt *TransformIterator[S, T]) CollectPtr() ([]*T, error)

func (*TransformIterator[S, T]) Item

func (rt *TransformIterator[S, T]) Item() (T, error)

func (*TransformIterator[S, T]) Next

func (rt *TransformIterator[S, T]) Next() bool
Example
baseIterator := iterators.NewSliceIterator[SampleStruct](testSlice())

transformer := func(ctx context.Context, in SampleStruct) (TransformedStruct, error) {
	ictx := iterators.GetIteratorContext(ctx)
	var index int

	// this retrieves the current iterated count from the context
	if ictx != nil {
		index = ictx.Iterated
	}

	fmt.Printf("transforming iteration %d\n", index)

	return TransformedStruct{
		X: in.A + index,
		Y: strings.Repeat(in.B, 2),
	}, nil
}

iterator := iterators.NewTransformIterator[SampleStruct, TransformedStruct](context.Background(), baseIterator, transformer)
defer func() {
	_ = iterator.Close()
}()
count := 0

for iterator.Next() {
	item, err := iterator.Item()
	if err != nil {
		fmt.Printf("err: %v\n", err)
	}
	count++
	fmt.Printf("item: %#v\n", item)
}

fmt.Printf("count: %d\n", count)
Output:

transforming iteration 1
item: iterators_test.TransformedStruct{X:2, Y:"xx"}
transforming iteration 2
item: iterators_test.TransformedStruct{X:4, Y:"yy"}
count: 2

type TransformerCtx

type TransformerCtx[S, T any] func(context.Context, S) (T, error)

TransformerCtx converts a struct of type S into one of type T, with a context about the state of the iterator.

Directories

Path Synopsis
internal/testdb
Package testdb exposes internal utilities for testing against a postgres database.
