pkservices

package
v0.0.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 21, 2021 License: MIT Imports: 19 Imported by: 0

Documentation

Overview

A lightweight framework for managing the lifetime of multiple services and their resource dependencies (db connector, etc).

Index

Examples

Constants

View Source
const DefaultGrpcAddress = "0.0.0.0:50051"

DefaultGrpcAddress is the default address the gRPC server will listen on.

Variables

View Source
var NewPingClient = protogen.NewPingClient

NewPingClient is an alias to protogen.NewPingClient

View Source
var RegisterPingServer = protogen.RegisterPingServer

RegisterPingServer is an alias to protogen.RegisterPingServer

Functions

This section is empty.

Types

type GenericService

type GenericService interface {
	Service
	// Run is called to run the main service. Run should not return until an
	// irrecoverable error occurs or runCtx is cancelled.
	//
	// runCtx being cancelled indicates that the service should begin to shut down
	// gracefully.
	//
	// When shutdownCtx is cancelled, the manager will stop waiting for Run to return
	// and continue with the shutdown process.
	//
	// After Run returns, the resourcesCtx passed to Setup will be cancelled.
	Run(runCtx context.Context, shutdownCtx context.Context) error
}

GenericService is a generic framework for registering non-gRPC services with the manager, so multiple services can be run in parallel by the same manager.

type GrpcService

type GrpcService interface {
	Service
	// RegisterOnServer is invoked before Service.Setup to allow the service to
	// register itself with the gRPC server.
	//
	// RegisterOnServer need only call the protoc-generated registration function.
	RegisterOnServer(server *grpc.Server)
}

GrpcService is a Service with methods to support running a gRPC service.

gRPC server options are passed to the manager, which is responsible for the spin-up, lifetime, and shutdown of the server hosting any GrpcService values.

The service just needs to expose a generic server registration method that calls the specific protoc generated registration method for that service.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages the lifetime of the service.

Example (Basic)

Run a basic gRPC service with a manager.

package main

import (
	"context"
	"fmt"
	"github.com/golang/protobuf/ptypes/empty"
	"github.com/peake100/gRPEAKEC-go/pkclients"
	"github.com/peake100/gRPEAKEC-go/pkservices"
	"github.com/peake100/gRPEAKEC-go/pkservices/protogen"
	"github.com/peake100/gRPEAKEC-go/pktesting"
	"github.com/rs/zerolog"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/types/known/emptypb"
	"sync"
)

// ExampleService is a basic implementation of PingServer that the manager can use
// to test connectivity to the server.
type ExampleService struct {
}

// Id implements Service and returns "ExampleService", the human-readable id the
// manager uses for this service in logs and error context.
func (ping ExampleService) Id() string {
	return "ExampleService"
}

// Setup implements Service. It starts two stand-in "resources" (think db connector,
// message broker, etc.), each living in its own goroutine until resourcesCtx is
// cancelled.
func (ping ExampleService) Setup(
	resourcesCtx context.Context,
	resourcesReleased *sync.WaitGroup,
	shutdownCtx context.Context,
	logger zerolog.Logger,
) error {
	// holdResource is the lifetime of a single resource: announce that it started,
	// block until resourcesCtx is cancelled, announce the release, and finally
	// decrement the WaitGroup so the manager knows the resource is gone.
	holdResource := func(startedMsg string, releasedMsg string) {
		defer resourcesReleased.Done()
		logger.Info().Msg(startedMsg)
		<-resourcesCtx.Done()
		logger.Info().Msg(releasedMsg)
	}

	// Every resource increments the WaitGroup before its goroutine is launched.
	resourcesReleased.Add(1)
	go holdResource("Resource 1 starting", "Resource 1 released")

	// Register and launch a second resource the same way.
	resourcesReleased.Add(1)
	go holdResource("Resource 2 starting", "Resource 2 released")

	// Returning hands control back to the manager, which moves on to the run stage.
	return nil
}

// RegisterOnServer implements GrpcService. It registers this service as the
// PingServer implementation on the manager's gRPC server by calling the
// protoc-generated registration function.
func (ping ExampleService) RegisterOnServer(server *grpc.Server) {
	protogen.RegisterPingServer(server, ping)
}

// Ping implements PingServer. It prints "PING RECEIVED!" to stdout and returns the
// received empty message to the caller unchanged, with a nil error.
func (ping ExampleService) Ping(
	ctx context.Context, msg *empty.Empty,
) (*empty.Empty, error) {
	fmt.Println("PING RECEIVED!")
	return msg, nil
}

// Run a basic gRPC service with a manager: start the manager in the background,
// wait for the server to come up, send a single ping, then shut the manager down
// and report any error it returned.
func main() {
	// Our manager options.
	managerOpts := pkservices.NewManagerOpts().
		// We are adding our own implementation of Ping, so we don't need to register
		// the default one.
		WithGrpcPingService(false)

	// Create a new manager to manage our service. ExampleService implements
	// pkservices.PingServer.
	manager := pkservices.NewManager(managerOpts, ExampleService{})

	// Run the manager in its own goroutine and return errors to our error channel.
	errChan := make(chan error)
	go func() {
		defer close(errChan)
		errChan <- manager.Run()
	}()

	// Make a gRPC client connection to ping the service.
	clientConn, err := grpc.Dial(pkservices.DefaultGrpcAddress, grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	// Close the connection when main returns so its resources are not leaked.
	defer clientConn.Close()

	// Create a new client to interact with our server.
	pingClient := pkservices.NewPingClient(clientConn)

	// Wait for the server to be serving requests.
	ctx, cancel := pktesting.New3SecondCtx()
	defer cancel()
	err = pkclients.WaitForGrpcServer(ctx, clientConn)
	if err != nil {
		panic(fmt.Errorf("server did not respond: %w", err))
	}

	// Send a ping. The earlier defer already captured the first cancel func, so
	// reusing the variable names here does not drop that cleanup.
	ctx, cancel = pktesting.New3SecondCtx()
	defer cancel()
	_, err = pingClient.Ping(ctx, new(emptypb.Empty))
	if err != nil {
		panic(fmt.Errorf("error on ping: %w", err))
	}

	// Start shutdown.
	manager.StartShutdown()

	// Grab our error from the error channel (blocks until the manager is shut down).
	err = <-errChan
	if err != nil {
		panic(fmt.Errorf("error from manager: %w", err))
	}

	// Exit.

}
Output:

PING RECEIVED!

func NewManager

func NewManager(opts *ManagerOpts, services ...Service) *Manager

NewManager creates a new Manager to run the given Service values with the passed ManagerOpts. If opts is nil, NewManagerOpts will be used to generate default options.

func (*Manager) Reset

func (manager *Manager) Reset()

Reset resets the manager for a new run.

func (*Manager) Run

func (manager *Manager) Run() error

Run runs the manager and all its services / resources. Run blocks until the manager has fully shut down.

func (*Manager) StartShutdown

func (manager *Manager) StartShutdown()

StartShutdown begins shutdown of the manager. Can be called multiple times. This method returns immediately rather than blocking until the manager shuts down.

func (*Manager) Test

func (manager *Manager) Test(t *testing.T) ManagerTesting

Test returns a test harness with helper methods for testing the manager.

func (*Manager) WaitForShutdown

func (manager *Manager) WaitForShutdown()

WaitForShutdown blocks until the manager is fully shutdown.

type ManagerError

type ManagerError struct {
	// SetupErr is an error that was returned during setup of the services. This error
	// will be a ServicesErrors if the cause of the error was one or more services.
	SetupErr error
	// RunErr is an error that occurred during running of one or more services. This
	// error will be a ServicesErrors if the cause of the error was one or more
	// services.
	RunErr error
	// ShutdownErr is an error that occurred during the shutdown of one or more
	// services.
	ShutdownErr error
}

ManagerError is returned from Manager.Run, and returns errors from the various stages of the run.

func (ManagerError) Error

func (err ManagerError) Error() string

Error implements builtins.error

type ManagerOpts

type ManagerOpts struct {
	// contains filtered or unexported fields
}

ManagerOpts are the options for running a manager

func NewManagerOpts

func NewManagerOpts() *ManagerOpts

NewManagerOpts creates a new *ManagerOpts value with default options set.

func (*ManagerOpts) WithErrNotFoundMiddleware added in v0.0.5

func (opts *ManagerOpts) WithErrNotFoundMiddleware() *ManagerOpts

WithErrNotFoundMiddleware adds pkerr.NewErrNotFoundMiddleware to the unary middlewares.

Default: false.

func (*ManagerOpts) WithErrorGenerator added in v0.0.4

func (opts *ManagerOpts) WithErrorGenerator(
	errGen *pkerr.ErrorGenerator,
) *ManagerOpts

WithErrorGenerator adds error-handling middleware from the passed generator to the list of gRPC server options.

Test clients will have corresponding interceptors added to them as well.

Default: nil

func (*ManagerOpts) WithGrpcLogging added in v0.0.4

func (opts *ManagerOpts) WithGrpcLogging(
	logRPCLevel zerolog.Level,
	logReqLevel zerolog.Level,
	logRespLevel zerolog.Level,
	logErrors bool,
	errorTrace bool,
) *ManagerOpts

WithGrpcLogging will add logging middleware to the grpc server using pkmiddleware.NewLoggingMiddleware.

Default:

logRPCLevel: zerolog.DebugLevel

logReqLevel: zerolog.Disabled + 1 (won't be logged)

logRespLevel: zerolog.Disabled + 1 (won't be logged)

logErrors: true

errorTrace: true

func (*ManagerOpts) WithGrpcPingService

func (opts *ManagerOpts) WithGrpcPingService(addService bool) *ManagerOpts

WithGrpcPingService adds a protogen.PingServer service to the server that can be used to test if the server is taking requests.

If true, the service will be added regardless of whether there are any other gRPC services the manager is handling.

Default: true.

func (*ManagerOpts) WithGrpcServerAddress

func (opts *ManagerOpts) WithGrpcServerAddress(address string) *ManagerOpts

WithGrpcServerAddress sets the address the gRPC server should handle rpc calls on.

Default: DefaultGrpcAddress ('0.0.0.0:50051').

func (*ManagerOpts) WithGrpcServerOpts

func (opts *ManagerOpts) WithGrpcServerOpts(
	grpcOpts ...grpc.ServerOption,
) *ManagerOpts

WithGrpcServerOpts stores server opts to create the gRPC server with. When called multiple times, new grpc.ServerOption values are added to list, rather than replacing values from the previous call.

Default: nil.

func (*ManagerOpts) WithGrpcStreamServerMiddleware added in v0.0.4

func (opts *ManagerOpts) WithGrpcStreamServerMiddleware(
	middleware ...pkmiddleware.StreamServerMiddleware,
)

WithGrpcStreamServerMiddleware adds stream server middlewares for the gRPC server.

Default: nil.

func (*ManagerOpts) WithGrpcUnaryServerMiddleware added in v0.0.4

func (opts *ManagerOpts) WithGrpcUnaryServerMiddleware(
	middleware ...pkmiddleware.UnaryServerMiddleware,
) *ManagerOpts

WithGrpcUnaryServerMiddleware adds unary server middlewares for the gRPC server.

func (*ManagerOpts) WithLogger

func (opts *ManagerOpts) WithLogger(logger zerolog.Logger) *ManagerOpts

WithLogger sets a zerolog.Logger to use for the manager.

Default: Global Logger.

func (*ManagerOpts) WithMaxShutdownDuration

func (opts *ManagerOpts) WithMaxShutdownDuration(max time.Duration) *ManagerOpts

WithMaxShutdownDuration sets the maximum duration the manager should allow for the services and resources to shut down gracefully before cancelling the shutdown context and force-exiting.

func (*ManagerOpts) WithoutGrpcLogging added in v0.0.4

func (opts *ManagerOpts) WithoutGrpcLogging() *ManagerOpts

WithoutGrpcLogging keeps the logging middleware from being added to the gRPC server.

type ManagerTesting

type ManagerTesting struct {
	// contains filtered or unexported fields
}

ManagerTesting exposes testing methods for testing a service manager. These methods are not safe to use in production code.

func (ManagerTesting) GrpcClientConn

func (tester ManagerTesting) GrpcClientConn(
	cleanup bool, opts ...grpc.DialOption,
) *grpc.ClientConn

GrpcClientConn generates a grpc.ClientConn with the passed opts connected to the gRPC server address in the manager options.

If Dialing the server results in an error, an error will be added to the test and t.FailNow() is called to exit the test immediately.

If cleanup is set to true then a cleanup function will be registered that closes the connection on test completion.

If an error generator was passed to the manager for server interceptors, the corresponding client interceptors will be added to the grpc.ClientConn.

func (ManagerTesting) GrpcPingClient

func (tester ManagerTesting) GrpcPingClient(
	cleanup bool, opts ...grpc.DialOption,
) PingClient

GrpcPingClient returns a PingClient connected to the PingServer running on the gRPC server. If ManagerOpts.WithGrpcPingService was set to false, an error will be logged to the test and t.FailNow() will be called.

If cleanup is set to true then a cleanup function will be registered that closes the underlying clientConn on test completion.

func (ManagerTesting) PingGrpcServer

func (tester ManagerTesting) PingGrpcServer(ctx context.Context)

PingGrpcServer will continually ping the gRPC server's PingServer.Ping method until a connection is established or the passed context times out. All errors will be ignored and ping will be tried again on failure.

If this method returns, the server is up and running and ready to take requests.

If ctx expires, ctx.Err() will be logged and FailNow() called on the test.

If ctx is nil, a default 3-second context will be used.

func (ManagerTesting) SendSignal

func (tester ManagerTesting) SendSignal(osSignal os.Signal)

SendSignal sends an os.Signal to the Manager to test shutdown via host signal.

func (ManagerTesting) Services

func (tester ManagerTesting) Services() []Service

Services returns the list of registered services. This method is for testing purposes only and returned values should not be modified unless you know what you are doing!

type PanicError

type PanicError = pkerr.PanicError

PanicError is a type alias for pkerr.PanicError.

type PingClient

type PingClient = protogen.PingClient

PingClient is a type alias to protogen.PingClient

type PingServer

type PingServer = protogen.PingServer

PingServer is a type alias to protogen.PingServer

type Service

type Service interface {
	// Id should return a unique, but human readable id for the service. This id is
	// used for both logging and ServiceError context. If two services share the same
	// id, the manager will return an error.
	Id() string

	// Setup is called before Run to set up any resources the service requires.
	//
	// resourcesCtx will be cancelled AFTER Run returns to signal that the main
	// process has finished, and it is safe to release resources.
	//
	// resourcesReleased should be incremented and decremented by individual resources,
	// and the Manager will block on it until the context passed to Shutdown cancels.
	//
	// shutdownCtx is cancelled when the graceful shutdown limit has been reached and
	// resources should be released immediately for a forced shutdown.
	//
	// logger is a zerolog.Logger with a
	// zerolog.Logger.WithString("SERVICE", [Service.Id()]) entry already on it.
	Setup(
		resourcesCtx context.Context,
		resourcesReleased *sync.WaitGroup,
		shutdownCtx context.Context,
		logger zerolog.Logger,
	) error
}

Service is an interface that a service must implement to be run by the Manager

type ServiceError

type ServiceError struct {
	// ServiceId is the return of the service's Service.Id method.
	ServiceId string
	// Err is the error the service returned.
	Err error
}

ServiceError reports an error from a specific service.

func (ServiceError) Error

func (err ServiceError) Error() string

Error implements builtins.error.

func (ServiceError) Unwrap

func (err ServiceError) Unwrap() error

Unwrap implements xerrors.Wrapper and returns the underlying error.

type ServicesErrors

type ServicesErrors struct {
	// Errs holds the individual errors collected from the services.
	Errs []error
}

ServicesErrors stores errors from multiple services as a single error.

func (ServicesErrors) Error

func (err ServicesErrors) Error() string

Error implements builtins.error, and reports the number of errors.

Directories

Path Synopsis
For internal use.
For internal use.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL