foundation

v0.0.0-...-fd49e9c
Warning

This package is not in the latest version of its module.

Published: Apr 25, 2024 | License: MIT | Imports: 46 | Imported by: 5

README

Foundation

Early Development Notice: Foundation is currently in an early development stage. While you're welcome to explore and experiment, it's not yet ready for production use.

🔍 Overview

Foundation is a Go-based microservices framework designed to help developers write scalable, resilient, and highly available applications with ease. By providing a cohesive set of well-chosen tools and features, Foundation aims to minimize boilerplate and allow developers to focus on writing business logic.

Foundation is built upon several proven technologies including:

  • gRPC: A high-performance, open-source RPC framework.
  • gRPC Gateway: A gRPC to JSON reverse proxy.
  • Protocol Buffers: A language-neutral extensible mechanism for serializing structured data, used for gRPC and Kafka message serialization.
  • Kafka: A powerful distributed streaming platform.
  • OAuth 2.0: An industry-standard authorization framework.
  • PostgreSQL: A robust open-source relational database system.
  • WebSockets: Enabling real-time, bi-directional, and full-duplex communication channels over TCP connections.

⭐ Key Features

  • 🌉 Running Modes: Adapt Foundation to cater to diverse operational requirements:
    • Gateway Mode: Facilitate the exposure of gRPC services as HTTP endpoints, leveraging gRPC Gateway. This mode acts as a bridge, allowing HTTP clients to communicate with your gRPC microservices transparently.
    • gRPC Mode: Operate as a standard gRPC server, enabling high-performance RPC communication, ideal for microservices interaction.
    • HTTP Mode: Deploy as a traditional HTTP server, offering a more general-purpose approach for serving web requests.
    • Spin Worker Mode: This is your background worker, designed to continuously execute tasks. It offers configurability in terms of processing functions and the interval between task iterations.
    • Jobs Worker Mode: A mode to run background jobs with Gocraft Work. Supports scheduled jobs, retrying, and concurrency.
    • Events Worker Mode: Building on the Spin Worker Mode, this variant is tailored for Kafka. It ingests messages from Kafka topics and triggers associated Go function handlers.
    • Job Mode: Best suited for one-off operations. Think of tasks like initializing your database, running migrations, or seeding initial data.
    • Cable gRPC Mode: Function as an AnyCable-compatible gRPC server, ideal for real-time WebSocket functionalities without sacrificing scalability.
    • Cable Courier Mode: This mode specializes in reading events from Kafka and then broadcasting them to Redis, readying the events for AnyCable processing. Yeah, it would be much better if we could just use Kafka directly, but AnyCable doesn't support it.
    • Outbox Courier Mode: A mode to run a Kafka producer that reads messages from the database and publishes them to Kafka. This is useful for implementing the transactional outbox pattern.
  • 📬 Transactional Outbox: Implement the transactional outbox pattern for transactional message publishing to Kafka.
  • ✏️ Unified Logging: Conveniently log with colors during development and structured logging in production using logrus.
  • 🔍 Tracing: Trace and log your requests in a structured format with OpenTracing.
  • 📊 Metrics: Collect and expose service metrics to Prometheus.
  • 💓 Health Check: Provide Kubernetes with the health status of your service.
  • 🔐 (m)TLS: TLS authentication for Kafka and mTLS for gRPC.
  • ⏳ Graceful Shutdown: Ensure clean shutdown on SIGTERM signal reception.
  • 🛠️ Helpers: A variety of helpers for common tasks.
  • 🖥️ CLI: A CLI tool to help you get started and manage your project.
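
As an illustration of the Graceful Shutdown feature listed above, here is a minimal sketch of how a Go service can stop cleanly on SIGTERM using only the standard library. The names `runUntilSignal`, `blockUntilDone`, and `demoRun` are hypothetical and are not part of Foundation; how Foundation itself wires signal handling is not shown in this document.

```go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"
	"time"
)

// runUntilSignal runs serve with a context that is cancelled when the process
// receives SIGTERM (or when parent is done), then always runs cleanup.
func runUntilSignal(parent context.Context, serve func(context.Context) error, cleanup func()) error {
	ctx, stop := signal.NotifyContext(parent, syscall.SIGTERM)
	defer stop()
	defer cleanup()
	return serve(ctx)
}

// blockUntilDone is a placeholder workload that waits for cancellation.
func blockUntilDone(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// demoRun uses a timeout instead of a real signal so the sketch terminates.
func demoRun(timeout time.Duration) (cleaned bool, err error) {
	parent, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	err = runUntilSignal(parent, blockUntilDone, func() { cleaned = true })
	return cleaned, err
}

func main() {
	cleaned, err := demoRun(50 * time.Millisecond)
	fmt.Println("cleanup ran:", cleaned, "exit reason:", err)
}
```

In a real service the parent context would be `context.Background()` and the cleanup function would stop components such as database and Kafka connections.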

🔌 Integrations

Foundation comes with built-in support for:

  • PostgreSQL: Easily connect to a PostgreSQL database.
  • Dotenv: Load environment variables from .env files.
  • ORY Hydra: Authenticate users on a gateway with ORY Hydra.
  • gRPC Gateway: Expose gRPC services as JSON endpoints.
  • Kafka: Produce and consume messages with Kafka (via kafka-go).
  • AnyCable: Implement real-time WebSocket functionalities with AnyCable.
  • Gocraft Work: Run background jobs with Gocraft Work.
  • Sentry: Report errors to Sentry.

🚀 Getting Started

Currently, the best way to get started is by exploring the examples directory. There is an example application called clubchat that demonstrates how to use Foundation to create a simple event-driven microservices application.

🖥️ CLI Tool

To install the CLI tool, run:

go install github.com/foundation-go/foundation/cmd/foundation@main

There are several commands available:

foundation completion # Generate shell completion scripts (prints to stdout)
foundation db:migrate # Run database migrations
foundation db:rollback # Rollback database migrations
foundation start # Start the service (you will be prompted to choose a service to start)
foundation test # Run tests
foundation new # Create a new application (`--app`) or service (`--service`)

You can also run foundation without any arguments to see a list of available commands, or run foundation <command> --help to see the available options for a specific command.

🤝 Contributing

We're always looking for contributions from the community! If you've found a bug, have a suggestion, or want to add a new feature, feel free to open an issue or submit a pull request.

📜 License

Foundation is released under the MIT License.

Documentation

Constants

const (
	OutboxDefaultBatchSize = 100
	OutboxDefaultInterval  = time.Second * 1
)

const (
	// Alphabet is the default alphabet used for string generation.
	Alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"
)

const (
	// GatewayDefaultTimeout is the default timeout for downstream services requests.
	GatewayDefaultTimeout = 30 * time.Second
)

const (
	MetricsServerComponentName = "metrics-server"
)

const (
	SpinWorkerDefaultInterval = 5 * time.Millisecond
)

const Version = "0.2.1"

Variables

This section is empty.

Functions

func AddSuffix

func AddSuffix(s string, suffix string) string

AddSuffix adds a suffix to a string, if it doesn't already have it.
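
The documented behavior can be sketched with the standard library as follows. This is a local stand-in named `addSuffix`, written from the description above rather than copied from the package source.

```go
package main

import (
	"fmt"
	"strings"
)

// addSuffix appends suffix to s unless s already ends with it.
func addSuffix(s, suffix string) string {
	if strings.HasSuffix(s, suffix) {
		return s
	}
	return s + suffix
}

func main() {
	fmt.Println(addSuffix("chats", "_test"))      // chats_test
	fmt.Println(addSuffix("chats_test", "_test")) // chats_test
}
```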

func CableDefaultErrorResolver

func CableDefaultErrorResolver(_ context.Context, event *Event, _ proto.Message) (string, error)

CableDefaultErrorResolver is a default resolver for errors that returns a stream name based on the user ID in the event headers.

func Clone

func Clone(obj interface{}) interface{}

Clone clones an object.

func ExtractHostAndPort

func ExtractHostAndPort(URL string) (string, error)

func GenerateRandomString

func GenerateRandomString(n int) string

GenerateRandomString generates a random string of length n. Panics if an error occurs.
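
A function with this contract could look like the sketch below. It assumes characters are drawn from the Alphabet constant using crypto/rand; whether the package uses that random source is an assumption, and `randomString` is a hypothetical local name.

```go
package main

import (
	"crypto/rand"
	"fmt"
	"math/big"
)

// alphabet mirrors the Alphabet constant documented above.
const alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"

// randomString returns n characters picked uniformly from alphabet.
// It panics if the random source fails, matching the documented contract.
func randomString(n int) string {
	out := make([]byte, n)
	limit := big.NewInt(int64(len(alphabet)))
	for i := range out {
		idx, err := rand.Int(rand.Reader, limit)
		if err != nil {
			panic(err)
		}
		out[i] = alphabet[idx.Int64()]
	}
	return string(out)
}

func main() {
	fmt.Println(len(randomString(16))) // 16
}
```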

func GetEnvOrBool

func GetEnvOrBool(key string, defaultValue bool) bool

GetEnvOrBool returns the value of the environment variable named by the key argument, or defaultValue if there is no such variable set or it is empty.

func GetEnvOrFloat

func GetEnvOrFloat(key string, defaultValue float64) float64

GetEnvOrFloat returns the value of the environment variable named by the key argument, or defaultValue if there is no such variable set or it is empty.

func GetEnvOrInt

func GetEnvOrInt(key string, defaultValue int) int

GetEnvOrInt returns the value of the environment variable named by the key argument, or defaultValue if there is no such variable set or it is empty.

func GetEnvOrString

func GetEnvOrString(key string, defaultValue string) string

GetEnvOrString returns the value of the environment variable named by the key argument, or defaultValue if there is no such variable set or it is empty.
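
The GetEnvOr* helpers share one fallback rule: an unset or empty variable yields the default. The sketch below illustrates that rule for the integer case. The names `parseIntOr` and `getEnvOrInt` are local stand-ins, and falling back to the default on an unparsable value is an assumption, since the documentation above does not say what happens in that case.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// parseIntOr returns def when raw is empty or (assumption) not a valid integer.
func parseIntOr(raw string, def int) int {
	if raw == "" {
		return def
	}
	v, err := strconv.Atoi(raw)
	if err != nil {
		return def
	}
	return v
}

// getEnvOrInt reads key from the environment and applies the fallback rule.
func getEnvOrInt(key string, def int) int {
	return parseIntOr(os.Getenv(key), def)
}

func main() {
	os.Setenv("SKETCH_POOL", "10") // hypothetical variable name
	fmt.Println(getEnvOrInt("SKETCH_POOL", 5)) // 10
	fmt.Println(getEnvOrInt("SKETCH_UNSET_VARIABLE", 5))
}
```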

func IsDevelopmentEnv

func IsDevelopmentEnv() bool

IsDevelopmentEnv returns true if the service is running in development mode.

func IsProductionEnv

func IsProductionEnv() bool

IsProductionEnv returns true if the service is running in production mode.

func IsTestEnv

func IsTestEnv() bool

IsTestEnv returns true if the service is running in test mode.

func NewMessageFromEvent

func NewMessageFromEvent(event *Event) (*kafka.Message, error)

NewMessageFromEvent creates a new Kafka message from a Foundation Outbox event

func ProtoNameToTopic

func ProtoNameToTopic(protoName string) string

TODO: extract these functions to a more appropriate place

func ProtoToName

func ProtoToName(msg proto.Message) string

func ProtoToTopic

func ProtoToTopic(msg proto.Message) string

Types

type CableCourier

type CableCourier struct {
	*EventsWorker

	Options *CableCourierOptions
}

CableCourier is a mode in which events are received from Kafka and published to Redis PubSub channels for AnyCable.

func InitCableCourier

func InitCableCourier(name string) *CableCourier

InitCableCourier initializes a new CableCourier.

func (*CableCourier) Start

func (c *CableCourier) Start(opts *CableCourierOptions)

Start runs a cable_courier worker using the given CableCourierOptions.

type CableCourierOptions

type CableCourierOptions struct {
	// Resolvers map protocol names to lists of CableMessageResolvers.
	Resolvers map[proto.Message][]CableMessageResolver
	// RedisChannel is the name of the Redis PubSub channel to publish events to.
	RedisChannel string
}

CableCourierOptions contains configuration options to instantiate a CableCourier. It maps proto messages to their corresponding message resolvers.

func (*CableCourierOptions) EventHandlers

func (opts *CableCourierOptions) EventHandlers(s *Service) map[proto.Message][]EventHandler

EventHandlers takes the resolvers defined in CableCourierOptions and wraps them into event handlers.

type CableCourierResolvers

type CableCourierResolvers map[proto.Message][]CableMessageResolver

CableCourierResolvers maps proto.Message types to their corresponding resolvers.

type CableGRPC

type CableGRPC struct {
	*Service

	Options *CableGRPCOptions
}

CableGRPC is a Foundation service in AnyCable gRPC Server mode.

func InitCableGRPC

func InitCableGRPC(name string) *CableGRPC

InitCableGRPC initializes a Foundation service in AnyCable gRPC Server mode.

func (*CableGRPC) ServiceFunc

func (s *CableGRPC) ServiceFunc(ctx context.Context) error

func (*CableGRPC) Start

func (s *CableGRPC) Start(opts *CableGRPCOptions)

Start runs the Foundation service as an AnyCable-compatible gRPC server.

type CableGRPCOptions

type CableGRPCOptions struct {
	// GRPCServerOptions are the gRPC server options to use.
	GRPCServerOptions []grpc.ServerOption

	// StartComponentsOptions are the options to start the components.
	StartComponentsOptions []StartComponentsOption

	// Channels are the channels to use.
	Channels map[string]cablegrpc.Channel

	// WithAuthentication enables authentication.
	WithAuthentication bool
	// AuthenticationFunc is the function to use for authentication.
	AuthenticationFunc cablegrpc.AuthenticationFunc
}

CableGRPCOptions are the options to start a Foundation service in AnyCable gRPC Server mode.

func NewCableGRPCOptions

func NewCableGRPCOptions() *CableGRPCOptions

type CableMessageEventHandler

type CableMessageEventHandler struct {
	// Resolver is used to resolve the stream name for a given event.
	Resolver     CableMessageResolver
	Logger       *logrus.Entry
	Service      *Service
	RedisChannel string
}

CableMessageEventHandler is a concrete implementation of EventHandler that uses a CableMessageResolver to handle events.

func (*CableMessageEventHandler) Handle

Handle uses the associated CableMessageResolver to determine the appropriate stream for the event and broadcasts the message to that stream.

type CableMessageResolver

type CableMessageResolver func(context.Context, *Event, proto.Message) (string, error)

CableMessageResolver is a function that resolves the stream name for a given event.

type Component

type Component interface {
	// Health returns the health of the component
	Health() error
	// Name returns the name of the component
	Name() string
	// Start runs the component
	Start() error
	// Stop stops the component
	Stop() error
}

Component describes an interface for all components in the Foundation framework. This could be an external service, a database, a cache, etc.
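
To make the contract concrete, here is a minimal sketch of a type that satisfies this interface. The interface is repeated locally so the example compiles on its own, and `memoryCache` is a hypothetical component invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Component mirrors the interface shown above.
type Component interface {
	Health() error
	Name() string
	Start() error
	Stop() error
}

// memoryCache is a hypothetical component that only tracks whether it runs.
type memoryCache struct {
	running bool
}

func (c *memoryCache) Name() string { return "memory-cache" }

func (c *memoryCache) Start() error {
	c.running = true
	return nil
}

func (c *memoryCache) Stop() error {
	c.running = false
	return nil
}

// Health reports an error whenever the component is not running.
func (c *memoryCache) Health() error {
	if !c.running {
		return errors.New("memory-cache: not running")
	}
	return nil
}

func main() {
	var c Component = &memoryCache{}
	fmt.Println(c.Name(), "healthy before start:", c.Health() == nil)
	if err := c.Start(); err != nil {
		fmt.Println("start failed:", err)
		return
	}
	fmt.Println(c.Name(), "healthy after start:", c.Health() == nil)
}
```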

type Config

type Config struct {
	Database     *DatabaseConfig
	EventsWorker *EventsWorkerConfig
	GRPC         *GRPCConfig
	Kafka        *KafkaConfig
	Metrics      *MetricsConfig
	Outbox       *OutboxConfig
	Redis        *RedisConfig
	Sentry       *SentryConfig
	JobsEnqueuer *JobsEnqueuerConfig
}

Config represents the configuration of a Service.

func NewConfig

func NewConfig() *Config

NewConfig returns a new Config with values populated from environment variables.

type DatabaseConfig

type DatabaseConfig struct {
	Enabled bool
	Pool    int
	URL     string
}

DatabaseConfig represents the configuration of a PostgreSQL database.

type Env

type Env string

Env represents the service environment name (development, production, etc).

const (
	EnvDevelopment Env = "development"
	EnvProduction  Env = "production"
	EnvTest        Env = "test"
)

func FoundationEnv

func FoundationEnv() Env

FoundationEnv returns the service environment name.

type ErrorHandlingStrategy

type ErrorHandlingStrategy int

ErrorHandlingStrategy defines the EventsWorker behavior when an error occurs while handling an event.

const (
	// Default strategy: commit the message and skip the event
	IgnoreError ErrorHandlingStrategy = iota

	// ShutdownOnError stops the worker on error
	ShutdownOnError
)
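
The difference between the two strategies can be sketched with a simplified loop. The types and the `consume` function below are local stand-ins and not the EventsWorker implementation; message commits and error delivery are left out.

```go
package main

import (
	"errors"
	"fmt"
)

type strategy int

const (
	ignoreError     strategy = iota // skip the failed event and continue
	shutdownOnError                 // stop at the first failed event
)

var errBad = errors.New("bad event")

// consume calls handle for each message and reports how many were visited.
func consume(msgs []string, handle func(string) error, s strategy) (int, error) {
	for i, m := range msgs {
		if err := handle(m); err != nil && s == shutdownOnError {
			return i + 1, err
		}
	}
	return len(msgs), nil
}

// failOnB is a hypothetical handler that rejects the message "b".
func failOnB(m string) error {
	if m == "b" {
		return errBad
	}
	return nil
}

func main() {
	msgs := []string{"a", "b", "c"}
	fmt.Println(consume(msgs, failOnB, ignoreError))     // 3 <nil>
	fmt.Println(consume(msgs, failOnB, shutdownOnError)) // 2 bad event
}
```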

type Event

type Event struct {
	Topic     string
	Key       string
	Payload   []byte
	ProtoName string
	Headers   map[string]string
	CreatedAt time.Time
}

Event represents an event to be published to the outbox

func NewEventFromProto

func NewEventFromProto(msg proto.Message, key string, headers map[string]string) (*Event, ferr.FoundationError)

NewEventFromProto creates a new event from a protobuf message

func (*Event) Unmarshal

func (e *Event) Unmarshal(msg proto.Message) ferr.FoundationError

Unmarshal unmarshals the event payload into a protobuf message

type EventHandler

type EventHandler interface {
	Handle(context.Context, *Event, proto.Message) ([]*Event, ferr.FoundationError)
}

EventHandler represents an event handler

type EventsWorker

type EventsWorker struct {
	*SpinWorker
	// contains filtered or unexported fields
}

func InitEventsWorker

func InitEventsWorker(name string) *EventsWorker

func (*EventsWorker) Start

func (w *EventsWorker) Start(opts *EventsWorkerOptions)

Start runs the worker that handles events

type EventsWorkerConfig

type EventsWorkerConfig struct {
	// ErrorsTopic is the name of the Kafka topic to which errors from the
	// events worker handlers should be published.
	ErrorsTopic string

	// DeliverErrors determines whether errors from events worker handlers
	// should be published to the errors topic (and thus, delivered
	// to originator, aka user) or not.
	DeliverErrors bool
}

EventsWorkerConfig represents the configuration of an event bus.

type EventsWorkerOptions

type EventsWorkerOptions struct {
	Handlers               map[proto.Message][]EventHandler
	Topics                 []string
	ModeName               string
	ErrorHandlingStrategy  ErrorHandlingStrategy
	StartComponentsOptions []StartComponentsOption
}

EventsWorkerOptions represents the options for starting an events worker

func (*EventsWorkerOptions) GetTopics

func (opts *EventsWorkerOptions) GetTopics() []string

func (*EventsWorkerOptions) ProtoNamesToMessages

func (opts *EventsWorkerOptions) ProtoNamesToMessages() map[string]proto.Message

type GRPCConfig

type GRPCConfig struct {
	TLSDir string
}

GRPCConfig represents the configuration of a gRPC server.

type GRPCServer

type GRPCServer struct {
	*Service

	Options *GRPCServerOptions
}

GRPCServer represents a gRPC server mode Foundation service.

func InitGRPCServer

func InitGRPCServer(name string) *GRPCServer

InitGRPCServer initializes a new Foundation service in gRPC Server mode.

func (*GRPCServer) ServiceFunc

func (s *GRPCServer) ServiceFunc(ctx context.Context) error

func (*GRPCServer) Start

func (s *GRPCServer) Start(opts *GRPCServerOptions)

Start initializes the Foundation service in gRPC server mode.

type GRPCServerOptions

type GRPCServerOptions struct {
	// RegisterFunc is a function that registers the gRPC server implementation.
	RegisterFunc func(s *grpc.Server)

	// GRPCServerOptions are the gRPC server options to use.
	GRPCServerOptions []grpc.ServerOption

	// StartComponentsOptions are the options to start the components.
	StartComponentsOptions []StartComponentsOption
}

GRPCServerOptions are the options to start a Foundation service in gRPC Server mode.

func NewGRPCServerOptions

func NewGRPCServerOptions() *GRPCServerOptions

type Gateway

type Gateway struct {
	*Service

	Options *GatewayOptions
}

Gateway represents a gateway mode Foundation service.

func InitGateway

func InitGateway(name string) *Gateway

InitGateway initializes a new Foundation service in Gateway mode.

func (*Gateway) ServiceFunc

func (s *Gateway) ServiceFunc(ctx context.Context) error

func (*Gateway) Start

func (s *Gateway) Start(opts *GatewayOptions)

Start runs the Foundation gateway.

type GatewayOptions

type GatewayOptions struct {
	// Services to register with the gateway
	Services []*gateway.Service
	// Timeout for downstream services requests (default: 30 seconds, if constructed with `NewGatewayOptions`)
	Timeout time.Duration
	// AuthenticationDetailsMiddleware is a middleware that populates the request context with authentication details.
	AuthenticationDetailsMiddleware func(http.Handler) http.Handler
	// WithAuthentication enables authentication for the gateway.
	WithAuthentication bool
	// AuthenticationExcept is a list of paths that should not be authenticated.
	AuthenticationExcept []string
	// Middleware is a list of middleware to apply to the gateway. The middleware is applied in the order it is defined.
	Middleware []func(http.Handler) http.Handler
	// StartComponentsOptions are the options to start the components.
	StartComponentsOptions []StartComponentsOption
	// CORSOptions are the options for CORS.
	CORSOptions *gateway.CORSOptions
}

GatewayOptions represents the options for starting the Foundation gateway.

func NewGatewayOptions

func NewGatewayOptions() *GatewayOptions

NewGatewayOptions returns a new GatewayOptions with default values.

type HTTPServer

type HTTPServer struct {
	*Service

	Options *HTTPServerOptions
}

HTTPServer represents an HTTP Server mode Foundation service.

func InitHTTPServer

func InitHTTPServer(name string) *HTTPServer

InitHTTPServer initializes a new Foundation service in HTTP Server mode.

func (*HTTPServer) ServiceFunc

func (s *HTTPServer) ServiceFunc(ctx context.Context) error

func (*HTTPServer) Start

func (s *HTTPServer) Start(opts *HTTPServerOptions)

Start runs the Foundation service in HTTP Server mode.

type HTTPServerOptions

type HTTPServerOptions struct {
	// Handler is the HTTP handler to use.
	Handler http.Handler

	// StartComponentsOptions are the options to start the components.
	StartComponentsOptions []StartComponentsOption
}

HTTPServerOptions are the options to start a Foundation service in HTTP Server mode.

func NewHTTPServerOptions

func NewHTTPServerOptions() *HTTPServerOptions

type JobOptions

type JobOptions struct {
	Handler  func(job *work.Job) error
	Schedule string
	Options  *work.JobOptions
}

type JobsEnqueuerConfig

type JobsEnqueuerConfig struct {
	Enabled   bool
	URL       string
	Pool      int
	Namespace string
}

JobsEnqueuerConfig represents the configuration of a jobs enqueuer.

type JobsWorker

type JobsWorker struct {
	*Service

	Options *JobsWorkerOptions
}

func InitJobsWorker

func InitJobsWorker(name string) *JobsWorker

func (*JobsWorker) LoggingMiddleware

func (w *JobsWorker) LoggingMiddleware(job *work.Job, next work.NextMiddlewareFunc) error

func (*JobsWorker) ServiceFunc

func (w *JobsWorker) ServiceFunc(ctx context.Context) error

func (*JobsWorker) Start

func (w *JobsWorker) Start(opts *JobsWorkerOptions)

Start runs the worker that handles jobs

type JobsWorkerOptions

type JobsWorkerOptions struct {
	// Jobs are the jobs to handle, together with their options
	Jobs map[string]JobOptions
	// Middlewares are the middlewares to use for all jobs
	Middlewares []func(job *work.Job, next work.NextMiddlewareFunc) error
	// Namespace is the redis namespace to use for the jobs
	Namespace string
	// Concurrency is the number of concurrent jobs to run
	Concurrency int
	// StartComponentsOptions are the options to start the components.
	StartComponentsOptions []StartComponentsOption
}

JobsWorkerOptions represents the options for starting a jobs worker

func NewJobsWorkerOptions

func NewJobsWorkerOptions() *JobsWorkerOptions

type KafkaConfig

type KafkaConfig struct {
	Brokers  []string
	Consumer *KafkaConsumerConfig
	Producer *KafkaProducerConfig
	TLSDir   string
}

KafkaConfig represents the configuration of a Kafka client.

type KafkaConsumerConfig

type KafkaConsumerConfig struct {
	Enabled bool
	Topics  []string
}

KafkaConsumerConfig represents the configuration of a Kafka consumer.

type KafkaProducerConfig

type KafkaProducerConfig struct {
	Enabled bool
}

KafkaProducerConfig represents the configuration of a Kafka producer.

type MetricsConfig

type MetricsConfig struct {
	Enabled bool
	Port    int
}

MetricsConfig represents the configuration of a metrics server.

type MetricsServerComponent

type MetricsServerComponent struct {
	// contains filtered or unexported fields
}

func (*MetricsServerComponent) Health

func (c *MetricsServerComponent) Health() error

Health implements the Component interface.

func (*MetricsServerComponent) Name

func (c *MetricsServerComponent) Name() string

Name implements the Component interface.

func (*MetricsServerComponent) Start

func (c *MetricsServerComponent) Start() error

Start implements the Component interface.

func (*MetricsServerComponent) Stop

func (c *MetricsServerComponent) Stop() error

Stop implements the Component interface.

type MetricsServerComponentOption

type MetricsServerComponentOption func(*MetricsServerComponent)

func WithMetricsServerHealthHandler

func WithMetricsServerHealthHandler(handler http.HandlerFunc) MetricsServerComponentOption

WithMetricsServerHealthHandler sets the health handler for the MetricsServer component.

func WithMetricsServerLogger

func WithMetricsServerLogger(logger *logrus.Entry) MetricsServerComponentOption

WithMetricsServerLogger sets the logger for the MetricsServer component.

func WithMetricsServerPort

func WithMetricsServerPort(port int) MetricsServerComponentOption

WithMetricsServerPort sets the port for the MetricsServer component.

type OutboxConfig

type OutboxConfig struct {
	Enabled bool
}

OutboxConfig represents the configuration of an outbox.

type OutboxCourier

type OutboxCourier struct {
	*SpinWorker
}

func InitOutboxCourier

func InitOutboxCourier(name string) *OutboxCourier

func (*OutboxCourier) Start

func (o *OutboxCourier) Start(outboxOpts *OutboxCourierOptions)

Start runs the outbox courier

type OutboxCourierOptions

type OutboxCourierOptions struct {
	Interval               time.Duration
	BatchSize              int32
	ModeName               string
	StartComponentsOptions []StartComponentsOption
}

OutboxCourierOptions represents the options for starting an outbox courier
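
One courier iteration can be pictured as: read a batch of outbox events in creation order, publish each one, then delete everything up to the last delivered ID. The sketch below simulates that cycle in memory. All types and functions in it are hypothetical stand-ins; the real courier reads from PostgreSQL and writes to Kafka, and its exact failure handling is not described in this document.

```go
package main

import "fmt"

type outboxEvent struct {
	ID      int64
	Payload string
}

// outbox is an in-memory stand-in for the outbox table, kept in creation order.
type outbox struct {
	events []outboxEvent
}

// list returns up to limit events in creation order.
func (o *outbox) list(limit int) []outboxEvent {
	if limit < 0 {
		limit = 0
	}
	if limit > len(o.events) {
		limit = len(o.events)
	}
	return append([]outboxEvent(nil), o.events[:limit]...)
}

// deleteUpTo removes every event whose ID is less than or equal to maxID.
func (o *outbox) deleteUpTo(maxID int64) {
	kept := o.events[:0]
	for _, e := range o.events {
		if e.ID > maxID {
			kept = append(kept, e)
		}
	}
	o.events = kept
}

// deliverBatch publishes one batch and deletes it only if every publish
// succeeded (an assumption made for this sketch).
func deliverBatch(o *outbox, batchSize int, publish func(outboxEvent) error) (int, error) {
	batch := o.list(batchSize)
	if len(batch) == 0 {
		return 0, nil
	}
	for _, e := range batch {
		if err := publish(e); err != nil {
			return 0, err
		}
	}
	o.deleteUpTo(batch[len(batch)-1].ID)
	return len(batch), nil
}

func main() {
	o := &outbox{events: []outboxEvent{{1, "a"}, {2, "b"}, {3, "c"}}}
	n, err := deliverBatch(o, 2, func(e outboxEvent) error {
		fmt.Println("publish", e.ID, e.Payload)
		return nil
	})
	fmt.Println(n, err, len(o.events)) // 2 <nil> 1
}
```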

func NewOutboxCourierOptions

func NewOutboxCourierOptions() *OutboxCourierOptions

type RedisConfig

type RedisConfig struct {
	Enabled bool
	URL     string
}

RedisConfig represents the configuration of a Redis client.

type SentryConfig

type SentryConfig struct {
	DSN     string
	Enabled bool
}

SentryConfig represents the configuration of a Sentry client.

type Service

type Service struct {
	Name       string
	Config     *Config
	Components []Component
	ModeName   string

	Logger *logrus.Entry
	// contains filtered or unexported fields
}

Service represents a single microservice - part of the bigger Foundation-based application, which implements an isolated domain of the application logic.

func Init

func Init(name string) *Service

Init initializes the Foundation service.

func (*Service) CommitMessage

func (s *Service) CommitMessage(ctx context.Context, msg kafka.Message) ferr.FoundationError

CommitMessage tries to commit a Kafka message using the service's KafkaConsumer. If the commit operation fails, it retries up to three times with a one-second pause between retries. If all attempts fail, the function returns the last occurred error.
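
The retry behavior described above follows a common pattern, sketched here with a generic helper. The `retry` function and `errTransient` are hypothetical names, and the sketch assumes "up to three times" means three attempts in total; the package may count the first attempt separately.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTransient = errors.New("transient commit failure")

// retry calls op up to attempts times, pausing between failed attempts,
// and returns the last error if every attempt fails.
func retry(attempts int, pause time.Duration, op func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(pause)
		}
	}
	return err
}

func main() {
	calls := 0
	err := retry(3, time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errTransient // fail the first two attempts
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}
```

CommitMessage uses a one-second pause; the sketch takes the pause as a parameter so that it can be shortened in tests.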

func (*Service) DeleteOutboxEvents

func (s *Service) DeleteOutboxEvents(ctx context.Context, tx *sql.Tx, maxID int64) ferr.FoundationError

DeleteOutboxEvents deletes outbox events up to (and including) the given ID.

func (*Service) GetComponent

func (s *Service) GetComponent(name string) Component

GetComponent returns the component with the given name.

func (*Service) GetJobsEnqueuer

func (s *Service) GetJobsEnqueuer() *work.Enqueuer

func (*Service) GetKafkaConsumer

func (s *Service) GetKafkaConsumer() *kafka.Reader

func (*Service) GetKafkaProducer

func (s *Service) GetKafkaProducer() *kafka.Writer

func (*Service) GetPostgreSQL

func (s *Service) GetPostgreSQL() *sql.DB

func (*Service) GetRedis

func (s *Service) GetRedis() *redis.Client

func (*Service) HandleError

func (s *Service) HandleError(err ferr.FoundationError, prefix string)

func (*Service) ListOutboxEvents

func (s *Service) ListOutboxEvents(ctx context.Context, tx *sql.Tx, limit int32) ([]outboxrepo.FoundationOutboxEvent, ferr.FoundationError)

ListOutboxEvents returns a list of outbox events in the order they were created.

func (*Service) NewAndPublishEvent

func (s *Service) NewAndPublishEvent(ctx context.Context, msg proto.Message, key string, headers map[string]string, tx *sql.Tx) ferr.FoundationError

NewAndPublishEvent creates a new event and publishes it to the outbox within a transaction

func (*Service) PublishEvent

func (s *Service) PublishEvent(ctx context.Context, event *Event, tx *sql.Tx) ferr.FoundationError

PublishEvent publishes an event to the outbox, starting a new transaction, or straight to the Kafka topic if `OUTBOX_ENABLED` is not set.

func (*Service) Start

func (s *Service) Start(opts *StartOptions)

Start runs the Foundation service.

func (*Service) StartComponents

func (s *Service) StartComponents(opts ...StartComponentsOption) error

StartComponents starts the default Foundation service components.

func (*Service) StopComponents

func (s *Service) StopComponents()

StopComponents stops the default Foundation service components.

func (*Service) WithTransaction

func (s *Service) WithTransaction(ctx context.Context, f func(tx *sql.Tx) ([]*Event, ferr.FoundationError)) ferr.FoundationError

WithTransaction executes the given function in a transaction. If the function returns events, they will be published.

type SpinWorker

type SpinWorker struct {
	*Service

	Options *SpinWorkerOptions
}

SpinWorker is a type of Foundation service.

func InitSpinWorker

func InitSpinWorker(name string) *SpinWorker

InitSpinWorker initializes a new Foundation service in worker mode.

func (*SpinWorker) ServiceFunc

func (sw *SpinWorker) ServiceFunc(ctx context.Context) error

ServiceFunc is the default service function for a worker.

func (*SpinWorker) Start

func (sw *SpinWorker) Start(opts *SpinWorkerOptions)

Start runs the Foundation worker

type SpinWorkerOptions

type SpinWorkerOptions struct {
	// ProcessFunc is the function to execute in the loop iteration.
	ProcessFunc func(ctx context.Context) ferr.FoundationError

	// Interval is the interval to run the iteration function. If function execution took less time than the interval,
	// the worker will sleep for the remaining time of the interval. Otherwise, the function will be executed again
	// immediately. Default: 5ms, if constructed with NewSpinWorkerOptions().
	Interval time.Duration

	// ModeName is the name of the worker mode. It will be used in the startup log message. Default: "spin_worker".
	// Meant to be used in custom modes based on the `spin_worker` mode.
	ModeName string

	StartComponentsOptions []StartComponentsOption
}

SpinWorkerOptions are the options to start a Foundation service in worker mode.
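
The Interval rule documented in the struct comment (sleep for the remainder of the interval, or run again immediately when an iteration overruns) can be sketched as follows. The `remaining` and `spin` functions are hypothetical stand-ins; in particular, stopping the loop on the first error is an assumption of this sketch and may differ from how the worker treats errors.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// remaining returns how long to sleep after an iteration that took elapsed.
func remaining(interval, elapsed time.Duration) time.Duration {
	if elapsed >= interval {
		return 0 // the iteration overran: run again immediately
	}
	return interval - elapsed
}

// spin runs process repeatedly until ctx is done or process fails.
func spin(ctx context.Context, interval time.Duration, process func(context.Context) error) error {
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		start := time.Now()
		if err := process(ctx); err != nil {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(remaining(interval, time.Since(start))):
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)
	defer cancel()

	iterations := 0
	err := spin(ctx, 5*time.Millisecond, func(context.Context) error {
		iterations++
		return nil
	})
	fmt.Println("iterations:", iterations, "stopped with:", err)
}
```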

func NewSpinWorkerOptions

func NewSpinWorkerOptions() *SpinWorkerOptions

NewSpinWorkerOptions returns a new SpinWorkerOptions instance with default values.

type StartComponentsOption

type StartComponentsOption func(*Service)

StartComponentsOption is an option to `StartComponents`.
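
StartComponentsOption follows Go's functional options pattern: each option is a function that mutates the service before its components start. The sketch below shows the pattern with local stand-in types. The lowercase `service`, `startOption`, and `with...` names are hypothetical, and which fields the real options set is an assumption based on their descriptions.

```go
package main

import "fmt"

// kafkaConsumerConfig and service are simplified stand-ins for the
// configuration that the real options modify.
type kafkaConsumerConfig struct {
	Enabled bool
	Topics  []string
}

type service struct {
	redisEnabled bool
	consumer     kafkaConsumerConfig
}

// startOption mirrors the shape of StartComponentsOption.
type startOption func(*service)

func withRedis() startOption {
	return func(s *service) { s.redisEnabled = true }
}

func withKafkaConsumer() startOption {
	return func(s *service) { s.consumer.Enabled = true }
}

func withKafkaConsumerTopics(topics ...string) startOption {
	return func(s *service) { s.consumer.Topics = topics }
}

// applyOptions runs every option against the service, in order.
func applyOptions(s *service, opts ...startOption) {
	for _, opt := range opts {
		opt(s)
	}
}

func main() {
	s := &service{}
	applyOptions(s, withRedis(), withKafkaConsumer(), withKafkaConsumerTopics("chats.messages"))
	fmt.Println(s.redisEnabled, s.consumer.Enabled, s.consumer.Topics) // true true [chats.messages]
}
```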

func WithJobsEnqueuer

func WithJobsEnqueuer() StartComponentsOption

WithJobsEnqueuer sets the jobs enqueuer enabled flag.

func WithKafkaConsumer

func WithKafkaConsumer() StartComponentsOption

WithKafkaConsumer sets the Kafka consumer enabled flag.

func WithKafkaConsumerTopics

func WithKafkaConsumerTopics(topics ...string) StartComponentsOption

WithKafkaConsumerTopics sets the Kafka consumer topics.

func WithKafkaProducer

func WithKafkaProducer() StartComponentsOption

WithKafkaProducer sets the Kafka producer enabled flag.

func WithOutbox

func WithOutbox() StartComponentsOption

WithOutbox sets the outbox enabled flag.

func WithRedis

func WithRedis() StartComponentsOption

WithRedis sets the redis enabled flag.

type StartOptions

type StartOptions struct {
	ModeName               string
	StartComponentsOptions []StartComponentsOption
	ServiceFunc            func(ctx context.Context) error
}

Directories

Path Synopsis
cable
cmd
examples
clubchat/protos/chats
Package chats is a reverse proxy.
internal
