pfunc

package module
v0.1.2-alpha
Published: Jun 30, 2023 License: Apache-2.0, BSD-3-Clause Imports: 21 Imported by: 8

README


! PFUNC IS ALPHA !

PFunc is a Pulsar Function SDK for Go.

Pulsar is a distributed pub-sub messaging platform with a very flexible messaging model and an intuitive client API.

PFunc was created to add some features and to experiment with a more ergonomic API, while also serving as a proof-of-concept for the official SDK. The goal is not to compete with the official SDK, but to help improve it and, potentially, to serve as a replacement later on.

Status

PFunc is in early pre-release and is not considered production-ready. The API is still in active development and will likely have several breaking changes before beta, and may include more before v1.0.0. If you want to experiment with it, pinning a version is highly recommended until v1.0.0.

Please check the milestones for more info on progress to release.

Planned Features

  • Streamlined Messaging API
    • Return one or more messages for multiple topics
    • Support for message properties
    • Support for message key (with automatic defaults)
  • Improved access to context, and user config
  • Support for slog as a logging interface
  • Better crash reporting (initialize logging as soon as possible and report panics when possible)
  • Java SDK Parity
  • New Features
    • Support for the Go 1.19 soft memory limit
    • Expanded Standard Metrics (messages output, output percentage)
    • Extended User Metrics (more than just an overloaded histogram)
    • Continuous Profiling Reporting (w/ optional reporting on a topic)

Versioning

TBD, but will follow semver conventions first, pulsar major versioning second, if possible.

Usage

TODO

License

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0

Documentation

Index

Constants

const (
	PulsarFunctionMetricsPrefix = "pulsar_function_"

	TotalSuccessfullyProcessed = "processed_successfully_total"
	TotalSystemExceptions      = "system_exceptions_total"
	TotalUserExceptions        = "user_exceptions_total"
	ProcessLatencyMs           = "process_latency_ms"
	LastInvocation             = "last_invocation"
	TotalReceived              = "received_total"

	TotalSuccessfullyProcessed1min = "processed_successfully_total_1min"
	TotalSystemExceptions1min      = "system_exceptions_total_1min"
	TotalUserExceptions1min        = "user_exceptions_total_1min"
	ProcessLatencyMs1min           = "process_latency_ms_1min"
	TotalReceived1min              = "received_total_1min"

	UserMetric = "user_metric"
)
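The short metric names above are presumably combined with PulsarFunctionMetricsPrefix to form the names that are actually exported. The sketch below illustrates that composition under this assumption. The constants are local copies so the example compiles on its own, and fullMetricName is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// Local copies of constants from the listing above, so this sketch
// compiles without importing the package.
const (
	PulsarFunctionMetricsPrefix = "pulsar_function_"
	TotalReceived               = "received_total"
	UserMetric                  = "user_metric"
)

// fullMetricName is a hypothetical helper. It assumes the exported
// metric name is the prefix followed by the short name.
func fullMetricName(short string) string {
	return PulsarFunctionMetricsPrefix + short
}

func main() {
	fmt.Println(fullMetricName(TotalReceived))
	fmt.Println(fullMetricName(UserMetric))
}
```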

Variables

This section is empty.

Functions

func GetUserConfMap

func GetUserConfMap() map[string]any

GetUserConfMap provides a means to access the pulsar function's user config map before initializing the pulsar function

func GetUserConfValue

func GetUserConfValue(key string) any

GetUserConfValue provides access to a user configuration value before initializing the pulsar function
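Because the user config is exposed as map[string]any and single values as any, callers need a type assertion before using a value. The sketch below shows one way to do that safely with a fallback. confString is a hypothetical helper and the map literal stands in for what GetUserConfMap might return; neither is part of the package.

```go
package main

import "fmt"

// confString is a hypothetical helper. It reads a string value from a
// user config map and falls back to def when the key is missing or the
// stored value is not a string.
func confString(conf map[string]any, key, def string) string {
	if s, ok := conf[key].(string); ok {
		return s
	}
	return def
}

func main() {
	// Stand-in for a user config map; the keys and values are made up.
	conf := map[string]any{"greeting": "hello", "retries": 3.0}

	fmt.Println(confString(conf, "greeting", "hi"))
	fmt.Println(confString(conf, "retries", "n/a"))
	fmt.Println(confString(conf, "missing", "n/a"))
}
```

The comma-ok form of the type assertion avoids a panic when the value has an unexpected type, which is likely for config that arrives as decoded JSON.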

func NewContext

func NewContext(parent context.Context, fc *FunctionContext) context.Context

NewContext returns a new Context that carries the value fc.

func Start

func Start(funcName any)

Rules ("handler" refers to the value passed as funcName):

  • handler must be a function.
  • handler may take between 0 and 2 arguments.
  • if there are two arguments, the first argument must satisfy the "context.Context" interface.
  • handler may return between 0 and 2 values.
  • if there are two return values, the second must be an error.
  • if there is one return value, it must be an error.

Valid function signatures:

func ()
func () error
func (input) error
func () (output, error)
func (input) (output, error)
func (context.Context) error
func (context.Context, input) error
func (context.Context) (output, error)
func (context.Context, input) (output, error)

Where "input" and "output" are types compatible with the "encoding/json" standard library package. See https://golang.org/pkg/encoding/json/#Unmarshal for how deserialization behaves.
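The signature rules above can be checked with the reflect package. The following is a minimal sketch of such a check, not the package's actual implementation: validateHandler is a hypothetical name, and it only covers the rules as listed (it does not inspect whether input and output types are JSON-compatible).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"reflect"
)

var (
	ctxType = reflect.TypeOf((*context.Context)(nil)).Elem()
	errType = reflect.TypeOf((*error)(nil)).Elem()
)

// validateHandler is a hypothetical check that mirrors the listed rules.
func validateHandler(handler any) error {
	if handler == nil {
		return errors.New("handler is nil")
	}
	t := reflect.TypeOf(handler)
	if t.Kind() != reflect.Func {
		return fmt.Errorf("handler kind %s is not func", t.Kind())
	}
	if t.NumIn() > 2 {
		return fmt.Errorf("handler takes %d arguments, want at most 2", t.NumIn())
	}
	if t.NumIn() == 2 && !t.In(0).Implements(ctxType) {
		return errors.New("with two arguments, the first must satisfy context.Context")
	}
	switch t.NumOut() {
	case 0:
		// No return values: nothing to check.
	case 1:
		if !t.Out(0).Implements(errType) {
			return errors.New("a single return value must be an error")
		}
	case 2:
		if !t.Out(1).Implements(errType) {
			return errors.New("the second return value must be an error")
		}
	default:
		return fmt.Errorf("handler returns %d values, want at most 2", t.NumOut())
	}
	return nil
}

func main() {
	ok := func(ctx context.Context, in string) (string, error) { return in, nil }
	fmt.Println(validateHandler(ok))
	fmt.Println(validateHandler(func() string { return "" }))
	fmt.Println(validateHandler(42))
}
```

Checking against the interface types (rather than comparing type names) means a handler whose first parameter is any type implementing context.Context is accepted, which matches the wording "must satisfy".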

Types

type FunctionContext

type FunctionContext struct {
	// contains filtered or unexported fields
}

FunctionContext provides contextual information to the executing function. Details such as the ID of the message being handled, the topic name of the message, and the function's operating constraints can be accessed by the executing function.

func FromContext

func FromContext(ctx context.Context) (*FunctionContext, bool)

FromContext returns the FunctionContext stored in ctx, if any.
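NewContext and FromContext follow the usual Go pattern for carrying a value through a context.Context. The sketch below illustrates that pattern with stand-in names: funcInfo replaces *FunctionContext, and ctxKey, newContext, fromContext and roundTrip are all hypothetical. How the package stores the value internally is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// funcInfo is a stand-in for *FunctionContext.
type funcInfo struct{ name string }

// ctxKey is an unexported key type, so keys defined in other packages
// cannot collide with it.
type ctxKey struct{}

// newContext returns a copy of parent that carries fi.
func newContext(parent context.Context, fi *funcInfo) context.Context {
	return context.WithValue(parent, ctxKey{}, fi)
}

// fromContext returns the value stored by newContext, if any.
func fromContext(ctx context.Context) (*funcInfo, bool) {
	fi, ok := ctx.Value(ctxKey{}).(*funcInfo)
	return fi, ok
}

// roundTrip stores a value and reads it back, then also reports whether
// a lookup on a context without the value succeeds.
func roundTrip(name string) (got string, found, foundOnEmpty bool) {
	ctx := newContext(context.Background(), &funcInfo{name: name})
	if fi, ok := fromContext(ctx); ok {
		got, found = fi.name, true
	}
	_, foundOnEmpty = fromContext(context.Background())
	return got, found, foundOnEmpty
}

func main() {
	fmt.Println(roundTrip("demo"))
}
```

Inside a handler of the form func(context.Context, input) (output, error), the boolean result is what tells the caller whether the context actually carries function information.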

func NewFuncContext

func NewFuncContext() *FunctionContext

NewFuncContext returns a new Function context

func (*FunctionContext) GetClusterName

func (c *FunctionContext) GetClusterName() string

GetClusterName returns the name of the cluster the pulsar function is running in

func (*FunctionContext) GetCurrentRecord

func (c *FunctionContext) GetCurrentRecord() pulsar.Message

GetCurrentRecord gets the current message from the function context

func (*FunctionContext) GetExpectedHealthCheckInterval

func (c *FunctionContext) GetExpectedHealthCheckInterval() int32

GetExpectedHealthCheckInterval returns the expected time between health checks in seconds

func (*FunctionContext) GetExpectedHealthCheckIntervalAsDuration

func (c *FunctionContext) GetExpectedHealthCheckIntervalAsDuration() time.Duration

GetExpectedHealthCheckIntervalAsDuration returns the expected time between health checks as a time.Duration.
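The relationship between the two accessors above is a unit conversion from whole seconds to time.Duration. A minimal sketch of that conversion follows. secondsToDuration is a hypothetical helper, and that the package converts in exactly this way is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// secondsToDuration converts a whole number of seconds, as returned by
// an int32 accessor, into a time.Duration.
func secondsToDuration(secs int32) time.Duration {
	return time.Duration(secs) * time.Second
}

func main() {
	fmt.Println(secondsToDuration(30))
}
```

Working with a time.Duration avoids passing a bare integer to APIs such as time.NewTicker, where a plain 30 would mean 30 nanoseconds.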

func (*FunctionContext) GetFuncID

func (c *FunctionContext) GetFuncID() string

GetFuncID returns the id of the pulsar function

func (*FunctionContext) GetFuncName

func (c *FunctionContext) GetFuncName() string

GetFuncName returns the name given to the pulsar function

func (*FunctionContext) GetFuncNamespace

func (c *FunctionContext) GetFuncNamespace() string

GetFuncNamespace returns the namespace the pulsar function belongs to

func (*FunctionContext) GetFuncTenant

func (c *FunctionContext) GetFuncTenant() string

GetFuncTenant returns the tenant the pulsar function belongs to

func (*FunctionContext) GetFuncVersion

func (c *FunctionContext) GetFuncVersion() string

GetFuncVersion returns the version of the pulsar function

func (*FunctionContext) GetInputTopics

func (c *FunctionContext) GetInputTopics() []string

GetInputTopics returns a list of all input topics the pulsar function has been invoked on

func (*FunctionContext) GetInstanceID

func (c *FunctionContext) GetInstanceID() int

GetInstanceID returns the id of the instance that invokes the running pulsar function.

func (*FunctionContext) GetMaxIdleTime

func (c *FunctionContext) GetMaxIdleTime() int64

GetMaxIdleTime returns the amount of time the pulsar function has to respond to the most recent health check before it is considered to be failing.

func (*FunctionContext) GetMetricsPort

func (c *FunctionContext) GetMetricsPort() int

GetMetricsPort returns the port the pulsar function metrics listen on

func (*FunctionContext) GetOutputTopic

func (c *FunctionContext) GetOutputTopic() string

GetOutputTopic returns the output topic the pulsar function was invoked on

func (*FunctionContext) GetPort

func (c *FunctionContext) GetPort() int

GetPort returns the port the pulsar function communicates on

func (*FunctionContext) GetTenantAndNamespace

func (c *FunctionContext) GetTenantAndNamespace() string

GetTenantAndNamespace returns the tenant and namespace the pulsar function belongs to in the format of `<tenant>/<namespace>`

func (*FunctionContext) GetTenantAndNamespaceAndName

func (c *FunctionContext) GetTenantAndNamespaceAndName() string

GetTenantAndNamespaceAndName returns the full name of the pulsar function in the format of `<tenant>/<namespace>/<function name>`
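The two formats above are slash-separated compositions of the tenant, namespace, and function name. The sketch below builds and splits the three-part form. joinFullName and splitFullName are hypothetical helpers and the sample values are made up.

```go
package main

import (
	"fmt"
	"strings"
)

// joinFullName builds "<tenant>/<namespace>/<function name>".
func joinFullName(tenant, namespace, name string) string {
	return tenant + "/" + namespace + "/" + name
}

// splitFullName reverses joinFullName. ok is false unless the input has
// exactly three non-empty parts.
func splitFullName(full string) (tenant, namespace, name string, ok bool) {
	parts := strings.Split(full, "/")
	if len(parts) != 3 || parts[0] == "" || parts[1] == "" || parts[2] == "" {
		return "", "", "", false
	}
	return parts[0], parts[1], parts[2], true
}

func main() {
	full := joinFullName("public", "default", "exclaim")
	fmt.Println(full)

	tenant, namespace, name, ok := splitFullName(full)
	fmt.Println(tenant, namespace, name, ok)
}
```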

func (*FunctionContext) GetUserConfMap

func (c *FunctionContext) GetUserConfMap() map[string]any

GetUserConfMap returns the pulsar function's user configuration map

func (*FunctionContext) GetUserConfValue

func (c *FunctionContext) GetUserConfValue(key string) any

GetUserConfValue returns the value of a key from the pulsar function's user configuration map

func (*FunctionContext) NewOutputMessage

func (c *FunctionContext) NewOutputMessage(topicName string) pulsar.Producer

NewOutputMessage returns a pulsar.Producer for sending an output message to the given topic. topicName is the name of the topic for the output message.

func (*FunctionContext) RecordMetric

func (c *FunctionContext) RecordMetric(metricName string, metricValue float64)

RecordMetric records an observation to the user_metric summary with the provided value

func (*FunctionContext) SetCurrentRecord

func (c *FunctionContext) SetCurrentRecord(record pulsar.Message)

SetCurrentRecord sets the current message in the function context. It is called for each message before the handler function is executed.

type InstanceControlServicer

type InstanceControlServicer struct {
	fn.UnimplementedInstanceControlServer
	// contains filtered or unexported fields
}

func (*InstanceControlServicer) GetAndResetMetrics

func (icServicer *InstanceControlServicer) GetAndResetMetrics(
	ctx context.Context, req *empty.Empty,
) (*fn.MetricsData, error)

func (*InstanceControlServicer) GetFunctionStatus

func (icServicer *InstanceControlServicer) GetFunctionStatus(
	ctx context.Context, req *empty.Empty,
) (*fn.FunctionStatus, error)

func (*InstanceControlServicer) GetMetrics

func (icServicer *InstanceControlServicer) GetMetrics(
	ctx context.Context, req *empty.Empty,
) (*fn.MetricsData, error)

func (*InstanceControlServicer) HealthCheck

func (icServicer *InstanceControlServicer) HealthCheck(
	ctx context.Context, req *empty.Empty,
) (*fn.HealthCheckResult, error)

func (*InstanceControlServicer) ResetMetrics

func (icServicer *InstanceControlServicer) ResetMetrics(
	ctx context.Context, req *empty.Empty,
) (*empty.Empty, error)

type LatestException

type LatestException struct {
	// contains filtered or unexported fields
}

type LogAppender

type LogAppender struct {
	// contains filtered or unexported fields
}

func NewLogAppender

func NewLogAppender(client pulsar.Client, logTopic, fqn string) *LogAppender

func (*LogAppender) Append

func (la *LogAppender) Append(logByte []byte)

func (*LogAppender) GetName

func (la *LogAppender) GetName() string

func (*LogAppender) Start

func (la *LogAppender) Start() error

func (*LogAppender) Stop

func (la *LogAppender) Stop()

type MetricsServicer

type MetricsServicer struct {
	// contains filtered or unexported fields
}

func NewMetricsServicer

func NewMetricsServicer(goInstance *goInstance) *MetricsServicer

type StatWithLabelValues

type StatWithLabelValues struct {
	// contains filtered or unexported fields
}

Be sure to use the constructor method: NewStatWithLabelValues

func NewStatWithLabelValues

func NewStatWithLabelValues(metricsLabels ...string) StatWithLabelValues

type TopicName

type TopicName struct {
	Domain    string
	Namespace string
	Name      string
	Partition int
}

TopicName is a struct that holds the components of a topic name.

func ParseTopicName

func ParseTopicName(topic string) (*TopicName, error)

ParseTopicName parses the given topic name and returns a TopicName.

func (*TopicName) NameWithoutPartition

func (tn *TopicName) NameWithoutPartition() string

NameWithoutPartition returns the topic name, sans the partition portion
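Pulsar names the partitions of a partitioned topic by appending "-partition-N" to the topic name. The sketch below shows how that suffix could be separated from a topic name. splitPartition is a hypothetical helper, and using -1 for "no partition" is an assumption rather than documented behavior of this package.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

const partitionSuffix = "-partition-"

// splitPartition is a hypothetical helper. It returns the topic name
// without its partition suffix and the partition index. When there is
// no valid suffix it returns the name unchanged and -1.
func splitPartition(topic string) (string, int) {
	i := strings.LastIndex(topic, partitionSuffix)
	if i < 0 {
		return topic, -1
	}
	n, err := strconv.Atoi(topic[i+len(partitionSuffix):])
	if err != nil || n < 0 {
		return topic, -1
	}
	return topic[:i], n
}

func main() {
	fmt.Println(splitPartition("persistent://public/default/orders-partition-3"))
	fmt.Println(splitPartition("persistent://public/default/orders"))
}
```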

Directories

Path Synopsis
examples module
internal
pb/pulsar/fn
Package fn provides the protocol buffer messages that Pulsar uses for the client/broker wire protocol.
