Carbo

Carbo is a framework that provides Gophers with an easy way to build data pipelines.

Building an application around a data pipeline comes with multiple headaches: controlling concurrency, scalability, back pressure, and so on. This framework is built to deal with these issues without much effort.

Why Carbo?

As far as I know, there are many great frameworks for orchestrating tasks and their dependencies in complex workflows, and such tools also provide ways to monitor the status of tasks.

But there are cases where configuring a cluster and running a large number of tasks on it is overkill.

Carbo fits such cases. It is a pure Go implementation that helps you run small tasks within a single process with easy control of concurrency.

Additionally, Carbo provides an easy way to feed data from one process to another over gRPC, which offers enough scalability in many cases.

Documentation

https://pkg.go.dev/github.com/hiroara/carbo

Exposing / pulling data through gRPC

As described above, Carbo provides an easy way to feed data from one process to another with gRPC.

This way, you can, for example, separate a data pipeline into a CPU-intensive part and an IO-intensive part running as different processes, each with its own concurrency limit.

Additionally, this means Carbo doesn't force you to stick to the framework itself, or even to Go, thanks to gRPC being a language-agnostic RPC protocol.

For example, you can pull data from a Go process that uses Carbo with grpcurl for debugging.

Or you can write another program, for example in Python, to pull data via gRPC. This is convenient when you want to write a fast data pipeline in Go and feed its output into Python to build a machine-learning model with scikit-learn.

Documentation

Overview

Carbo provides building blocks to compose a concurrent data processing pipeline.

Carbo's core component is task.Task, which represents a process that takes inputs and produces outputs.

You can build an entire data pipeline from a number of tasks.

Although it is possible to define a task directly with task.FromFn, it is recommended to use the sub-types of task.Task: source, pipe, and sink.

Sub-types of task.Task

A data pipeline is built with three sub-types of task.Task: source, pipe and sink.

The basic form of a data pipeline should look like: source -> pipe -> ... -> pipe -> sink

Source

A source is a special type of task.Task that takes an empty input, which is closed immediately after the data pipeline starts.

This is used as the entry point of a data pipeline. For example, source.FromSlice takes a slice as an argument and then produces each element of the slice as an output.
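
For illustration, here is a minimal sketch using source.FromSlice (the same calls appear in the runnable examples below):

	// Emit "a", "b", and "c" as the pipeline's initial elements.
	ss := source.FromSlice([]string{"a", "b", "c"})
	srcTask := ss.AsTask() // convert it to a task.Task so it can be connected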

Pipe

A pipe is a component similar to task.Task, except that the output channel passed to it is closed automatically after its process finishes.

This is used to convert inputs into outputs. For example, pipe.Map processes inputs one by one and produces corresponding outputs. This can be used when each input corresponds to exactly one output.
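
For example, a pipe that doubles each input string (the same call appears in the Flow example below):

	// Produce exactly one output for each input.
	double := pipe.Map(func(ctx context.Context, s string) (string, error) {
		return s + s, nil
	})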

Sink

A sink is a special type of task.Task that takes an empty output. It is the opposite of a source.

This is used as the last component of a data pipeline. For example, sink.ToSlice takes a pointer to a slice and appends the elements it receives as inputs to that slice.
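
A brief sketch based on the description above (note: the exact sink.ToSlice signature is assumed here, not taken from the examples below):

	// Append every element received from the upstream task to results.
	var results []string
	sk := sink.ToSlice(&results)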

Connecting tasks

Tasks can be connected with task.Connect when the type of the upstream task's output matches the type of the downstream task's input.

task.Connect also returns a task.Task, so connections can be chained. An entire data pipeline can be built by connecting multiple tasks in this way.
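
Continuing the sketches above, the source, pipe, and sink can be chained like this (the trailing argument of task.Connect is passed as 1, following the runnable examples below):

	ds := task.Connect(srcTask, double.AsTask(), 1) // source -> pipe
	pr := task.Connect(ds, sk.AsTask(), 1)          // pipe -> sink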

Running a data pipeline

Carbo provides a Flow component, which wraps a task that takes an empty input and an empty output. Such a task is typically built as a chain of tasks that starts from a source and ends with a sink.

Example (Flow)

Build a flow and run it directly.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hiroara/carbo/flow"
	"github.com/hiroara/carbo/pipe"
	"github.com/hiroara/carbo/sink"
	"github.com/hiroara/carbo/source"
	"github.com/hiroara/carbo/task"
)

func main() {
	// Source: emit "a", "b", and "c".
	ss := source.FromSlice([]string{"a", "b", "c"})
	// Pipe: double each string.
	ds := task.Connect(
		ss.AsTask(),
		pipe.Map(func(ctx context.Context, s string) (string, error) {
			return s + s, nil
		}).AsTask(),
		1,
	)
	// Sink: print each element.
	pr := task.Connect(
		ds,
		sink.ElementWise(func(ctx context.Context, s string) error {
			fmt.Println(s)
			return nil
		}).AsTask(),
		1,
	)

	// Wrap the chained task as a flow and run it.
	err := flow.FromTask(pr).Run(context.Background())
	if err != nil {
		log.Fatal(err)
	}
}
Output:

aa
bb
cc
Example (FlowFactory)

Define a flow factory function to build a flow with a config struct, and run the flow.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hiroara/carbo/flow"
	"github.com/hiroara/carbo/sink"
	"github.com/hiroara/carbo/source"
	"github.com/hiroara/carbo/task"
)

type MyConfig struct {
	StringField string `yaml:"string_field"`
	IntField    int    `yaml:"int_field"`
}

func main() {
	fac := func(cfg *MyConfig) (*flow.Flow, error) {
		ss := source.FromSlice([]string{cfg.StringField})
		pr := task.Connect(
			ss.AsTask(),
			sink.ElementWise(func(ctx context.Context, s string) error {
				fmt.Println(s)
				return nil
			}).AsTask(),
			1,
		)
		return flow.FromTask(pr), nil
	}

	err := flow.RunWithConfig(context.Background(), fac, "testdata/config.yaml")
	if err != nil {
		log.Fatal(err)
	}
}
Output:

value-from-string-field
Example (Registry)

Define multiple flow factories, register them in a registry, and run a flow. This is useful for building an executable that takes a subcommand.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hiroara/carbo/flow"
	"github.com/hiroara/carbo/registry"
	"github.com/hiroara/carbo/sink"
	"github.com/hiroara/carbo/source"
	"github.com/hiroara/carbo/task"
)

type MyConfig struct {
	StringField string `yaml:"string_field"`
	IntField    int    `yaml:"int_field"`
}

func main() {
	fac1 := func() (*flow.Flow, error) {
		ss := source.FromSlice([]string{"item1"})
		pr := task.Connect(
			ss.AsTask(),
			sink.ElementWise(func(ctx context.Context, s string) error {
				fmt.Println(s)
				return nil
			}).AsTask(),
			1,
		)
		return flow.FromTask(pr), nil
	}
	fac2 := func(cfg *MyConfig) (*flow.Flow, error) {
		ss := source.FromSlice([]int{cfg.IntField})
		pr := task.Connect(
			ss.AsTask(),
			sink.ElementWise(func(ctx context.Context, i int) error {
				fmt.Println(i)
				return nil
			}).AsTask(),
			1,
		)
		return flow.FromTask(pr), nil
	}

	r := registry.New()
	r.Register("flow1", flow.NewFactory(fac1))
	r.Register("flow2", flow.NewFactoryWithConfig(fac2, "testdata/config.yaml"))
	err := r.Run(context.Background(), "flow2")
	if err != nil {
		log.Fatal(err)
	}
}
Output:

100

Directories

Path Synopsis
Package cache provides a way to define caching behavior used in a data pipeline.
store
Package defines the Store interface for cache.
Package implements parsing a YAML file as a configuration struct.
Package provides a struct-embeddable implementation of Defer that is required by the task.Task interface.
Package defines a Flow type that represents an entire data pipeline.
internal
Package defines Spec, which is a type to define marshaling behavior.
Package defines Pipe, which is a type of task.
Package defines Registry, which is a place to register flows.
Package defines Sink, which is a type of task.
Package defines Source, which is a type of task.
Package defines the Task interface, which is a core component to build a data pipeline.
Package implements utilities to execute tasks.
