streamdal

package module
v0.0.86
Warning

This package is not in the latest version of its module.

Published: Feb 7, 2024 License: Apache-2.0 Imports: 26 Imported by: 1

README

Streamdal Go SDK

Documentation

See https://docs.streamdal.com

Installation
go get github.com/streamdal/go-sdk
Example Usage
package main

import (
	"context"
	"fmt"
	"log"

	streamdal "github.com/streamdal/go-sdk"
)

func main() {
	sc, err := streamdal.New(&streamdal.Config{
		// Address of the streamdal server
		ServerURL: "streamdal-server.svc.cluster.local:8082",

		// Token used for authenticating with the streamdal server
		ServerToken: "1234",

		// Identify _this_ application/service
		ServiceName: "billing-svc",
	})
	if err != nil {
		log.Fatalf("unable to create streamdal client: %s", err)
	}

	resp := sc.Process(context.Background(), &streamdal.ProcessRequest{
		OperationType: streamdal.OperationTypeConsumer,
		OperationName: "new-order-topic",
		ComponentName: "kafka",
		Data:          []byte(`{"object": {"field": true}}`),
	})

	// Check if the .Process() call completed
	if resp.Status != streamdal.ExecStatusError {
		fmt.Println("Successfully processed payload")
	}

	// Or you can inspect each individual pipeline & step result
	for _, pipeline := range resp.PipelineStatus {
		fmt.Printf("Inspecting '%d' steps in pipeline '%s'...\n", len(pipeline.StepStatus), pipeline.Name)

		for _, step := range pipeline.StepStatus {
			fmt.Printf("Step '%s' status: '%s'\n", step.Name, step.Status)
		}
	}
}

Configuration

All configuration can be passed via streamdal.Config{}. Some values can be set via environment variables in order to support 12-Factor and usage of this SDK inside shims where streamdal.Config{} cannot be set.

| Config Parameter | Environment Variable | Description | Default |
|---|---|---|---|
| ServerURL | STREAMDAL_URL | URL pointing to your instance of streamdal server's gRPC API. Ex: localhost:8082 | empty |
| ServerToken | STREAMDAL_TOKEN | API token set in streamdal server | empty |
| ServiceName | STREAMDAL_SERVICE_NAME | Identifies this service in the streamdal console | empty |
| PipelineTimeout | STREAMDAL_PIPELINE_TIMEOUT | Maximum time a pipeline can run before giving up | 100ms |
| StepTimeout | STREAMDAL_STEP_TIMEOUT | Maximum time a pipeline step can run before giving up | 10ms |
| DryRun | STREAMDAL_DRY_RUN | If true, no data will be modified | false |
| Logger | - | An optional custom logger | |
| ClientType | - | 1 = ClientTypeSDK, 2 = ClientTypeShim | ClientTypeSDK |
| ShutdownCtx | - | Your application's main context which will receive shutdown signals | |
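The timeout settings fall back to documented defaults (100ms per pipeline, 10ms per step) when nothing is configured. The following is a minimal sketch of that fallback pattern, not the SDK's own code. The helpers `parseTimeout` and `durationFromEnv` are hypothetical, and the assumption that the environment variables hold Go duration strings such as `250ms` is ours.

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// parseTimeout converts a Go duration string such as "250ms" into a
// time.Duration. It returns fallback when raw is empty, malformed, or not
// positive. Hypothetical helper for illustration; not part of the SDK.
func parseTimeout(raw string, fallback time.Duration) time.Duration {
	if raw == "" {
		return fallback
	}
	d, err := time.ParseDuration(raw)
	if err != nil || d <= 0 {
		return fallback
	}
	return d
}

// durationFromEnv reads the named environment variable and parses it with
// parseTimeout, so an unset variable yields the fallback.
func durationFromEnv(name string, fallback time.Duration) time.Duration {
	return parseTimeout(os.Getenv(name), fallback)
}

func main() {
	// Defaults taken from the configuration table: 100ms and 10ms.
	pipelineTimeout := durationFromEnv("STREAMDAL_PIPELINE_TIMEOUT", 100*time.Millisecond)
	stepTimeout := durationFromEnv("STREAMDAL_STEP_TIMEOUT", 10*time.Millisecond)

	fmt.Println("pipeline timeout:", pipelineTimeout)
	fmt.Println("step timeout:", stepTimeout)
}
```

Values resolved this way could then be assigned to the PipelineTimeout and StepTimeout fields of streamdal.Config{}.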
Metrics

Metrics are published to Streamdal server and are available in Prometheus format at http://streamdal_server_url:8081/metrics

| Metric | Description | Labels |
|---|---|---|
| streamdal_counter_consume_bytes | Number of bytes consumed by the client | service, component_name, operation_name, pipeline_id, pipeline_name |
| streamdal_counter_consume_errors | Number of errors encountered while consuming payloads | service, component_name, operation_name, pipeline_id, pipeline_name |
| streamdal_counter_consume_processed | Number of payloads processed by the client | service, component_name, operation_name, pipeline_id, pipeline_name |
| streamdal_counter_produce_bytes | Number of bytes produced by the client | service, component_name, operation_name, pipeline_id, pipeline_name |
| streamdal_counter_produce_errors | Number of errors encountered while producing payloads | service, component_name, operation_name, pipeline_id, pipeline_name |
| streamdal_counter_produce_processed | Number of payloads processed by the client | service, component_name, operation_name, pipeline_id, pipeline_name |
| streamdal_counter_notify | Number of notifications sent to the server | service, component_name, operation_name, pipeline_id, pipeline_name |
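Because the metrics are exposed in Prometheus text format, a counter can be totalled across all of its label sets with a small parser. The sketch below is illustrative only. `sumCounter` is a hypothetical helper, and `samplePayload` is made-up sample data (including the label values), not real server output. In practice the payload would be the response body fetched from the /metrics endpoint.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// samplePayload is invented example data in Prometheus text format.
const samplePayload = `# TYPE streamdal_counter_consume_bytes counter
streamdal_counter_consume_bytes{service="billing-svc",component_name="kafka",operation_name="new-order-topic",pipeline_id="p1",pipeline_name="example-a"} 1024
streamdal_counter_consume_bytes{service="billing-svc",component_name="kafka",operation_name="new-order-topic",pipeline_id="p2",pipeline_name="example-b"} 512
streamdal_counter_consume_errors{service="billing-svc",component_name="kafka",operation_name="new-order-topic",pipeline_id="p1",pipeline_name="example-a"} 3
`

// sumCounter adds up every sample of the named metric across all label
// sets. Comment lines, blank lines, and unparsable samples are skipped.
func sumCounter(payload, metric string) float64 {
	var total float64
	sc := bufio.NewScanner(strings.NewReader(payload))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		// The metric name ends at the first '{' or whitespace.
		name := line
		if i := strings.IndexAny(line, "{ \t"); i >= 0 {
			name = line[:i]
		}
		if name != metric {
			continue
		}
		// Drop the optional {labels} section; the value follows it.
		rest := line[len(name):]
		if strings.HasPrefix(rest, "{") {
			end := strings.LastIndex(rest, "}")
			if end < 0 {
				continue
			}
			rest = rest[end+1:]
		}
		fields := strings.Fields(rest)
		if len(fields) == 0 {
			continue
		}
		if v, err := strconv.ParseFloat(fields[0], 64); err == nil {
			total += v
		}
	}
	return total
}

func main() {
	fmt.Println("bytes consumed:", sumCounter(samplePayload, "streamdal_counter_consume_bytes"))
	fmt.Println("consume errors:", sumCounter(samplePayload, "streamdal_counter_consume_errors"))
}
```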

Documentation

Overview

Package streamdal is a library for running Streamdal data pipelines against data. This package is designed to be included in Go message bus libraries. The only public method is Process(), which is used to run pipelines against data.

Use of this package requires a running instance of a streamdal server. The server can be downloaded at https://github.com/streamdal/streamdal/tree/main/apps/server

The following environment variables must be set:
- STREAMDAL_URL: The address of the Streamdal server
- STREAMDAL_TOKEN: The token to use when connecting to the Streamdal server
- STREAMDAL_SERVICE_NAME: The name of the service to identify it in the streamdal console

Optional parameters:
- STREAMDAL_DRY_RUN: If true, rule hits will only be logged; no failure modes will be run
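An application relying on these variables may want to verify them before starting. The following is a minimal sketch of such a pre-flight check. `missingVars` and `requiredVars` are hypothetical names introduced here and are not part of the SDK. The lookup function is injected so the check can be exercised without touching the real environment.

```go
package main

import (
	"fmt"
	"os"
)

// requiredVars lists the variables the package overview says must be set.
var requiredVars = []string{
	"STREAMDAL_URL",
	"STREAMDAL_TOKEN",
	"STREAMDAL_SERVICE_NAME",
}

// missingVars returns, in order, every name in required for which lookup
// reports no value or an empty value. Hypothetical helper for illustration.
func missingVars(required []string, lookup func(string) (string, bool)) []string {
	var missing []string
	for _, name := range required {
		if v, ok := lookup(name); !ok || v == "" {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	if missing := missingVars(requiredVars, os.LookupEnv); len(missing) > 0 {
		fmt.Println("missing required environment variables:", missing)
		return
	}
	fmt.Println("all required streamdal environment variables are set")
}
```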

Constants

const (
	// DefaultPipelineTimeoutDurationStr is the default timeout for a pipeline execution
	DefaultPipelineTimeoutDurationStr = "100ms"

	// DefaultStepTimeoutDurationStr is the default timeout for a single step.
	DefaultStepTimeoutDurationStr = "10ms"

	// ReconnectSleep determines the length of time to wait between reconnect attempts to streamdal server
	ReconnectSleep = time.Second * 5

	// MaxWASMPayloadSize is the maximum size of data that can be sent to the WASM module
	MaxWASMPayloadSize = 1024 * 1024 // 1Mi

	// ClientTypeSDK & ClientTypeShim are referenced by shims and SDKs to indicate
	// in what context this SDK is being used.
	ClientTypeSDK  ClientType = 1
	ClientTypeShim ClientType = 2

	// OperationTypeConsumer and OperationTypeProducer are used to indicate the
	// type of operation the Process() call is performing.
	OperationTypeConsumer OperationType = 1
	OperationTypeProducer OperationType = 2

	AbortAllStr     = "aborted all pipelines"
	AbortCurrentStr = "aborted current pipeline"
	AbortNoneStr    = "no abort condition"

	// ExecStatusTrue & ExecStatusFalse & ExecStatusError are used to indicate
	// the execution status of a step.
	ExecStatusTrue  = protos.ExecStatus_EXEC_STATUS_TRUE
	ExecStatusFalse = protos.ExecStatus_EXEC_STATUS_FALSE
	ExecStatusError = protos.ExecStatus_EXEC_STATUS_ERROR
)
const (
	// NumTailWorkers is the number of tail workers to start for each tail request
	// The workers are responsible for reading from the tail channel and streaming
	// TailResponse messages to the server
	NumTailWorkers = 2

	// MinTailResponseIntervalMS is how often we send a TailResponse to the server
	// If this rate is exceeded, we will drop messages rather than flooding the server
	// This is an int to avoid a .Milliseconds() call
	MinTailResponseIntervalMS = 10
)
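The comment on MinTailResponseIntervalMS describes dropping messages that arrive faster than the minimum interval. A minimal sketch of that idea follows. The `throttle` type and its `shouldSend` method are hypothetical and are not the library's Tail.ShouldSend implementation. Timestamps are passed in as integer milliseconds so the behavior is deterministic.

```go
package main

import (
	"fmt"
	"time"
)

// minIntervalMS mirrors the documented MinTailResponseIntervalMS value.
const minIntervalMS = 10

// throttle remembers when the last message was let through.
// It is not safe for concurrent use; callers sharing one throttle between
// goroutines would need to add a mutex.
type throttle struct {
	lastSentMS int64
	sent       bool
}

// shouldSend reports whether a message arriving at nowMS may be sent.
// Messages arriving less than minIntervalMS after the previous accepted
// message are rejected, so the caller can drop them.
func (t *throttle) shouldSend(nowMS int64) bool {
	if t.sent && nowMS-t.lastSentMS < minIntervalMS {
		return false
	}
	t.lastSentMS = nowMS
	t.sent = true
	return true
}

func main() {
	var t throttle
	start := time.Now().UnixMilli()

	// Simulated arrival offsets, in milliseconds after start.
	for _, offset := range []int64{0, 3, 9, 10, 25} {
		fmt.Printf("t+%dms send=%v\n", offset, t.shouldSend(start+offset))
	}
}
```

Keeping the interval as a plain integer, as the constant's comment notes, avoids a .Milliseconds() conversion on every check.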

Variables

var (
	ErrEmptyConfig          = errors.New("config cannot be empty")
	ErrEmptyServiceName     = errors.New("data source cannot be empty")
	ErrEmptyOperationName   = errors.New("operation name cannot be empty")
	ErrInvalidOperationType = errors.New("operation type must be set to either OperationTypeConsumer or OperationTypeProducer")
	ErrEmptyComponentName   = errors.New("component name cannot be empty")
	ErrMissingShutdownCtx   = errors.New("shutdown context cannot be nil")
	ErrEmptyCommand         = errors.New("command cannot be empty")
	ErrEmptyProcessRequest  = errors.New("process request cannot be empty")

	// ErrMaxPayloadSizeExceeded is returned when the payload is bigger than MaxWASMPayloadSize
	ErrMaxPayloadSizeExceeded = fmt.Errorf("payload size exceeds maximum of '%d' bytes", MaxWASMPayloadSize)

	// ErrPipelineTimeout is returned when a pipeline exceeds the configured timeout
	ErrPipelineTimeout = errors.New("pipeline timeout exceeded")
)
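ErrMaxPayloadSizeExceeded is documented as being returned when a payload is bigger than MaxWASMPayloadSize. The sketch below shows how such a guard could look. It uses local lowercase copies of the constant and error, and `checkPayloadSize` is a hypothetical function, not the library's internal check.

```go
package main

import (
	"errors"
	"fmt"
)

// maxPayloadSize mirrors the documented MaxWASMPayloadSize (1 MiB).
const maxPayloadSize = 1024 * 1024

// errMaxPayloadSizeExceeded is a local stand-in for the library's
// ErrMaxPayloadSizeExceeded.
var errMaxPayloadSizeExceeded = fmt.Errorf("payload size exceeds maximum of '%d' bytes", maxPayloadSize)

// checkPayloadSize rejects payloads strictly larger than maxPayloadSize,
// following the "bigger than" wording of the documented error.
func checkPayloadSize(data []byte) error {
	if len(data) > maxPayloadSize {
		return errMaxPayloadSizeExceeded
	}
	return nil
}

func main() {
	small := []byte(`{"object": {"field": true}}`)
	large := make([]byte, maxPayloadSize+1)

	fmt.Println("small payload error:", checkPayloadSize(small))

	err := checkPayloadSize(large)
	fmt.Println("large payload rejected:", errors.Is(err, errMaxPayloadSizeExceeded))
}
```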
var (
	ErrPipelineNotPaused = errors.New("pipeline not paused")
	ErrPipelineNotActive = errors.New("pipeline not active or does not exist")
)

Functions

This section is empty.

Types

type Audience

type Audience struct {
	ComponentName string
	OperationType OperationType
	OperationName string
}

Audience is used to announce an audience to the Streamdal server on library initialization. We use this to avoid end users having to import our protos.

type ClientType

type ClientType int

ClientType is used to indicate if this library is being used by a shim or directly (as an SDK)

type Config

type Config struct {
	// ServerURL is the hostname and port for the gRPC API of Streamdal Server
	// If this value is left empty, the library will not attempt to connect to the server
	// and New() will return nil
	ServerURL string

	// ServerToken is the authentication token for the gRPC API of the Streamdal server
	// If this value is left empty, the library will not attempt to connect to the server
	// and New() will return nil
	ServerToken string

	// ServiceName is the name that this library will identify as in the UI. Required
	ServiceName string

	// PipelineTimeout defines how long this library will allow a pipeline to
	// run. Optional; default: 100ms
	PipelineTimeout time.Duration

	// StepTimeout defines how long this library will allow a single step to run.
	// Optional; default: 10ms
	StepTimeout time.Duration

	// IgnoreStartupError defines how to handle an error on initial startup via
	// New(). If left as false, failure to complete startup (such as bad auth)
	// will cause New() to return an error. If true, the library will block and
	// continue trying to initialize. You may want to adjust this if you want
	// your application to behave a certain way on startup when the server
	// is unavailable. Optional; default: false
	IgnoreStartupError bool

	// If specified, library will connect to the server but won't apply any
	// pipelines. Optional; default: false
	DryRun bool

	// ShutdownCtx is a context that the library will listen to for cancellation
	// notices. Optional; default: nil
	ShutdownCtx context.Context

	// Logger is a logger you can inject (such as logrus) to allow this library
	// to log output. Optional; default: nil
	Logger logger.Logger

	// Audiences is a list of audiences you can specify at registration time.
	// This is useful if you know your audiences in advance and want to populate
	// service groups in the Streamdal UI _before_ your code executes any .Process()
	// calls. Optional; default: nil
	Audiences []*Audience

	// ClientType specifies whether this instance of the SDK is used in a shim library or
	// as a standalone SDK. This information is used for both debug info and to
	// help the library determine whether ServerURL and ServerToken should be
	// optional or required. Optional; default: ClientTypeSDK
	ClientType ClientType
}
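The IgnoreStartupError comment describes two startup behaviors: fail immediately, or block and keep trying. The sketch below models that choice with a generic retry loop. `connectWithRetry`, `errUnavailable`, and the simulated `flaky` connection are all hypothetical. The library's actual startup logic is not shown on this page. The 10ms sleep is chosen only to keep the example fast, whereas the documented ReconnectSleep is 5 seconds.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errUnavailable simulates a server that cannot be reached yet.
var errUnavailable = errors.New("server unavailable")

// connectWithRetry calls connect once. If that fails and ignoreStartupError
// is false, the error is returned immediately. If ignoreStartupError is
// true, it blocks and retries every sleep interval until connect succeeds
// or done is closed, in which case the last error is returned.
func connectWithRetry(done <-chan struct{}, connect func() error, sleep time.Duration, ignoreStartupError bool) error {
	for {
		err := connect()
		if err == nil {
			return nil
		}
		if !ignoreStartupError {
			return err
		}
		select {
		case <-done:
			return err
		case <-time.After(sleep):
		}
	}
}

func main() {
	// Stand-in for an application's shutdown context.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	attempts := 0
	flaky := func() error {
		attempts++
		if attempts < 3 {
			return errUnavailable
		}
		return nil
	}

	err := connectWithRetry(ctx.Done(), flaky, 10*time.Millisecond, true)
	fmt.Println("attempts:", attempts, "err:", err)
}
```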

type IStreamdal

type IStreamdal interface {
	// Process is used to run data pipelines against data
	Process(ctx context.Context, req *ProcessRequest) *ProcessResponse
}

type OperationType

type OperationType int

OperationType is used to indicate if the operation is a consumer or a producer

type ProcessRequest

type ProcessRequest struct {
	ComponentName string
	OperationType OperationType
	OperationName string
	Data          []byte
}

ProcessRequest is used to maintain a consistent API for the Process() call
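The exported error variables suggest the kinds of checks a ProcessRequest has to pass. The following sketch validates a request against those conditions using local copies of the documented types and errors. `validateRequest` and the order of its checks are assumptions made for illustration, not the library's actual validation code.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented types, so the example is self-contained.
type OperationType int

const (
	OperationTypeConsumer OperationType = 1
	OperationTypeProducer OperationType = 2
)

type ProcessRequest struct {
	ComponentName string
	OperationType OperationType
	OperationName string
	Data          []byte
}

// Local copies of the documented error values.
var (
	ErrEmptyProcessRequest  = errors.New("process request cannot be empty")
	ErrEmptyComponentName   = errors.New("component name cannot be empty")
	ErrEmptyOperationName   = errors.New("operation name cannot be empty")
	ErrInvalidOperationType = errors.New("operation type must be set to either OperationTypeConsumer or OperationTypeProducer")
)

// validateRequest returns the first problem found in req, or nil if the
// request is well-formed. The check order is an assumption.
func validateRequest(req *ProcessRequest) error {
	if req == nil {
		return ErrEmptyProcessRequest
	}
	if req.ComponentName == "" {
		return ErrEmptyComponentName
	}
	if req.OperationName == "" {
		return ErrEmptyOperationName
	}
	if req.OperationType != OperationTypeConsumer && req.OperationType != OperationTypeProducer {
		return ErrInvalidOperationType
	}
	return nil
}

func main() {
	good := &ProcessRequest{
		ComponentName: "kafka",
		OperationType: OperationTypeConsumer,
		OperationName: "new-order-topic",
		Data:          []byte(`{"object": {"field": true}}`),
	}
	fmt.Println("valid request:", validateRequest(good))
	fmt.Println("zero-value request:", validateRequest(&ProcessRequest{}))
}
```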

type ProcessResponse

type ProcessResponse protos.SDKResponse

ProcessResponse is the response struct from a Process() call

type Streamdal

type Streamdal struct {
	// contains filtered or unexported fields
}

Streamdal is the main struct for this library

func New

func New(cfg *Config) (*Streamdal, error)

func (*Streamdal) Process

func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) *ProcessResponse

type Tail

type Tail struct {
	Request    *protos.Command
	CancelFunc context.CancelFunc
	// contains filtered or unexported fields
}

func (*Tail) ShipResponse

func (t *Tail) ShipResponse(tr *protos.TailResponse)

func (*Tail) ShouldSend added in v0.0.78

func (t *Tail) ShouldSend() bool

Directories

| Path | Synopsis |
|---|---|
| helper | Package helper contains WASM-related helper functions and methods. |
| hostfunc | Package hostfunc contains host function methods. |
| kv | |
| kvfakes | Code generated by counterfeiter. |
| loggerfakes | Code generated by counterfeiter. |
| metrics | Package metrics is responsible for tracking and publishing metrics to the Streamdal server. |
| metricsfakes | Code generated by counterfeiter. |
| server | Package server is a wrapper for the Client gRPC API. |
| serverfakes | Code generated by counterfeiter. |
