thriftbp

package
v0.9.16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2024 License: BSD-3-Clause Imports: 34 Imported by: 0

Documentation

Overview

Package thriftbp provides Baseplate specific thrift related helpers.

Clients

On the client side, this package provides a middleware framework for thrift.TClient to allow you to automatically run code before and after making a Thrift call. It also includes middleware implementations to wrap each call in a Thrift client span as well as a function that most services can use as the "golden path" for setting up a Thrift client pool.

Servers

On the server side, this package provides middleware implementations for EdgeRequestContext handling and tracing propagation according to Baseplate spec.

Example (ClientPool)

This example demonstrates a typical use case of thriftbp pool in microservice code with custom middleware.

package main

import (
	"context"
	"time"

	"github.com/apache/thrift/lib/go/thrift"

	"github.com/reddit/baseplate.go/log"
	"github.com/reddit/baseplate.go/thriftbp"
)

// In real code these should be coming from either config file or flags instead.
const (
	remoteAddr     = "host:port"
	connectTimeout = time.Millisecond * 5
	socketTimeout  = time.Millisecond * 15

	initialConnections = 50
	maxConnections     = 100

	clientTTL = time.Minute * 5

	poolGaugeInterval = time.Second * 10
)

// BEGIN THRIFT GENERATED CODE SECTION
//
// In real code this section should be from thrift generated code instead,
// but for this example we just define some placeholders here.

type MyEndpointRequest struct{}

type MyEndpointResponse struct{}

type MyService interface {
	MyEndpoint(ctx context.Context, req *MyEndpointRequest) (*MyEndpointResponse, error)
}

func NewMyServiceClient(_ thrift.TClient) MyService {
	// In real code this certainly won't return nil.
	return nil
}

// END THRIFT GENERATED CODE SECTION

// MyServiceClientFactory is a thin wrapper around the ClientPool to help
// avoiding the reuse of the concrete client.
type MyServiceClientFactory struct {
	pool thriftbp.ClientPool
}

func (f MyServiceClientFactory) Client() MyService {
	return NewMyServiceClient(f.pool.TClient())
}

func LoggingMiddleware(next thrift.TClient) thrift.TClient {
	return thrift.WrappedTClient{
		Wrapped: func(ctx context.Context, method string, args, result thrift.TStruct) (thrift.ResponseMeta, error) {
			log.Infof("pre: %s", method)
			log.Infof("args: %#v", args)
			defer func() {
				log.Infof("after: %s", method)
			}()

			return next.Call(ctx, method, args, result)
		},
	}
}

// This example demonstrates a typical use case of thriftbp pool in
// microservice code with custom middleware.
func main() {
	pool, err := thriftbp.NewBaseplateClientPool(
		thriftbp.ClientPoolConfig{
			ServiceSlug:        "my-service",
			Addr:               remoteAddr,
			InitialConnections: initialConnections,
			MaxConnections:     maxConnections,
			MaxConnectionAge:   clientTTL,
			ConnectTimeout:     connectTimeout,
			SocketTimeout:      socketTimeout,
			ReportPoolStats:    true,
			PoolGaugeInterval:  poolGaugeInterval,
		},
		LoggingMiddleware,
	)
	if err != nil {
		panic(err)
	}
	defer pool.Close()
	clientFactory := MyServiceClientFactory{
		pool: pool,
	}

	// NOTE: client returned here should be treated as disposable and never shared
	// between goroutines.
	client := clientFactory.Client()
	if _, err = client.MyEndpoint(context.Background(), &MyEndpointRequest{}); err != nil {
		panic(err)
	}
}
Output:

Index

Examples

Constants

View Source
const DefaultMaxConnectionAge = time.Minute * 5

DefaultMaxConnectionAge is the default max age for a Thrift client connection.

View Source
const DefaultMaxConnectionAgeJitter = 0.1

DefaultMaxConnectionAgeJitter is the default jitter to MaxConnectionAge for a Thrift client connection.

View Source
const DefaultPoolGaugeInterval = time.Second * 10

DefaultPoolGaugeInterval is the fallback value to be used when ClientPoolConfig.PoolGaugeInterval <= 0.

Deprecated: Prometheus gauges are auto scraped.

View Source
const MonitorClientWrappedSlugSuffix = transport.WithRetrySlugSuffix

MonitorClientWrappedSlugSuffix is a suffix to be added to the service slug arg of MonitorClient function, in order to distinguish from the spans that are the raw client calls.

The MonitorClient with this suffix will have span operation names like:

service-with-retry.endpointName

Which groups all retries of the same client call together, while the MonitorClient without this suffix will have span operation names like:

service.endpointName
View Source
const ThriftHostnameHeader = "thrift-hostname"

Variables

View Source
var (
	ErrConfigMissingServiceSlug = errors.New("`ServiceSlug` cannot be empty")
	ErrConfigMissingAddr        = errors.New("`Addr` cannot be empty")
	ErrConfigInvalidConnections = errors.New("`InitialConnections` cannot be bigger than `MaxConnections`")
)

ClientPoolConfig errors are returned if the configuration validation fails.

HeadersToForward are the headers that should always be forwarded to upstream thrift servers, to be used in thrift.TSimpleServer.SetForwardHeaders.

Functions

func AbandonCanceledRequests deprecated added in v0.5.0

func AbandonCanceledRequests(name string, next thrift.TProcessorFunction) thrift.TProcessorFunction

AbandonCanceledRequests transforms context.Canceled errors into thrift.ErrAbandonRequest errors.

When using thrift compiler version >=v0.14.0, the context object will be canceled after the client closes the connection, and returning thrift.ErrAbandonRequest as the error helps the server to not try to write the error back to the client, but close the connection directly.

Deprecated: The checking of ErrAbandonRequest happens before all the middlewares so doing this in a middleware will have no effect. Please do the context.Canceled -> thrift.ErrAbandonRequest translation in your service's endpoint handlers instead.

func AddClientHeader added in v0.7.2

func AddClientHeader(ctx context.Context, key, value string) context.Context

AddClientHeader adds a key-value pair to thrift client's headers.

It takes care of setting the header in context (overwrite previous value if any), and also adding the header to the write header list.

func ApplyBaseplate added in v0.2.1

func ApplyBaseplate(bp baseplate.Baseplate, server *thrift.TSimpleServer) baseplate.Server

ApplyBaseplate returns the given TSimpleServer as a baseplate Server with the given Baseplate.

You generally don't need to use this, instead use NewBaseplateServer, which will take care of this for you.

func AttachEdgeRequestContext

func AttachEdgeRequestContext(ctx context.Context, ecImpl ecinterface.Interface) context.Context

AttachEdgeRequestContext returns a context that has the header of the edge context attached to ctx object set to forward using the "Edge-Request" header on any Thrift calls made with that context object.

func BaseplateDefaultClientMiddlewares

func BaseplateDefaultClientMiddlewares(args DefaultClientMiddlewareArgs) []thrift.ClientMiddleware

BaseplateDefaultClientMiddlewares returns the default client middlewares that should be used by a baseplate service.

Currently they are (in order):

1. ForwardEdgeRequestContext.

2. SetClientName(clientName)

3. MonitorClient with MonitorClientWrappedSlugSuffix - This creates the spans from the view of the client that group all retries into a single, wrapped span.

4. PrometheusClientMiddleware with MonitorClientWrappedSlugSuffix - This creates the prometheus client metrics from the view of the client that group all retries into a single operation.

5. Retry(retryOptions) - If retryOptions is empty/nil, default to only retry.Attempts(1), this will not actually retry any calls but your client is configured to set retry logic per-call using retrybp.WithOptions.

6. FailureRatioBreaker - Only if BreakerConfig is non-nil.

7. MonitorClient - This creates the spans of the raw client calls.

8. PrometheusClientMiddleware

9. BaseplateErrorWrapper

10. thrift.ExtractIDLExceptionClientMiddleware

11. SetDeadlineBudget

func BaseplateDefaultProcessorMiddlewares

func BaseplateDefaultProcessorMiddlewares(args DefaultProcessorMiddlewaresArgs) []thrift.ProcessorMiddleware

BaseplateDefaultProcessorMiddlewares returns the default processor middlewares that should be used by a baseplate Thrift service.

Currently they are (in order):

1. ExtractDeadlineBudget

2. InjectServerSpan

3. InjectEdgeContext

4. ReportPayloadSizeMetrics

5. PrometheusServerMiddleware

func BaseplateErrorFilter added in v0.4.0

func BaseplateErrorFilter(codes ...int32) retrybp.Filter

BaseplateErrorFilter returns true if the given error is a baseplate.Error and returns one of the given codes and false if it is a baseplate.Error but does not return one of the given codes otherwise it calls the next filter in the chain.

func BaseplateErrorWrapper added in v0.8.2

func BaseplateErrorWrapper(next thrift.TClient) thrift.TClient

BaseplateErrorWrapper is a client middleware that calls WrapBaseplateError to wrap the error returned by the next client call.

func CreateThriftContextFromSpan

func CreateThriftContextFromSpan(ctx context.Context, span *tracing.Span) context.Context

CreateThriftContextFromSpan injects span info into a context object that can be used in thrift client code. If you are using a client pool created using thriftbp.NewBaseplateClientPool, all of your thrift calls will already be call this automatically, so there is no need to use it directly.

Caller should first create a client child-span for the thrift call as usual, then use that span and the parent context object with this call, then use the returned context object in the thrift call. Something like:

span, clientCtx := opentracing.StartSpanFromContext(
  ctx,
  "myCall",
  tracing.SpanTypeOption{Type: tracing.SpanTypeClient},
)
clientCtx = thriftbp.CreateThriftContextFromSpan(clientCtx, tracing.AsSpan(span))
result, err := client.MyCall(clientCtx, arg1, arg2)
span.FinishWithOptions(tracing.FinishOptions{
  Ctx: clientCtx,
  Err: err,
}.Convert())

func ExtractDeadlineBudget

func ExtractDeadlineBudget(name string, next thrift.TProcessorFunction) thrift.TProcessorFunction

ExtractDeadlineBudget is the server middleware implementing Phase 1 of Baseplate deadline propagation.

It only sets the timeout if the passed in deadline is at least 1ms.

func ForwardEdgeRequestContext

func ForwardEdgeRequestContext(ecImpl ecinterface.Interface) thrift.ClientMiddleware

ForwardEdgeRequestContext forwards the EdgeRequestContext set on the context object to the Thrift service being called if one is set.

If you are using a thrift ClientPool created by NewBaseplateClientPool, this will be included automatically and should not be passed in as a ClientMiddleware to NewBaseplateClientPool.

func IDLExceptionSuppressor added in v0.7.2

func IDLExceptionSuppressor(err error) bool

IDLExceptionSuppressor is an errorsbp.Suppressor implementation that returns true on errors from exceptions defined in thrift IDL files.

Note that if the exception is baseplate.Error, this function will NOT suppress it if the code is in range [500, 600).

func InitializeEdgeContext

func InitializeEdgeContext(ctx context.Context, impl ecinterface.Interface) context.Context

InitializeEdgeContext sets an edge request context created from the Thrift headers set on the context onto the context and configures Thrift to forward the edge requent context header on any Thrift calls made by the server.

func InjectEdgeContext

func InjectEdgeContext(impl ecinterface.Interface) thrift.ProcessorMiddleware

InjectEdgeContext returns a ProcessorMiddleware that injects an edge request context created from the Thrift headers set on the context into the `next` thrift.TProcessorFunction.

Note, this depends on the edge context headers already being set on the context object. These should be automatically injected by your thrift.TSimpleServer.

func InjectServerSpan

func InjectServerSpan(suppressor errorsbp.Suppressor) thrift.ProcessorMiddleware

InjectServerSpan implements thrift.ProcessorMiddleware and injects a server span into the `next` context.

Starts the server span before calling the `next` TProcessorFunction and stops the span after it finishes. If the function returns an error that's not suppressed by the suppressor, that will be passed to span.Stop.

Please note that if suppressor passed in is nil, it will be changed to IDLExceptionSuppressor instead. Please use errorsbp.SuppressNone explicitly instead if that's what's wanted.

If "User-Agent" (HeaderUserAgent) THeader is set, the created server span will also have "peer.service" (tracing.TagKeyPeerService) tag set to its value.

Note, the span will be created according to tracing related headers already being set on the context object. These should be automatically injected by your thrift.TSimpleServer.

func Merge

func Merge(processors ...thrift.TProcessor) thrift.TProcessor

Merge merges together multiple processors into the first one.

It's useful when the server needs to support more than one separated thrift file.

It's kind of like thrift's TMultiplexedProcessor. The key difference is that TMultiplexedProcessor requires the client to also use TMultiplexedProtocol, while here the client doesn't need any special handling.

func MonitorClient

func MonitorClient(args MonitorClientArgs) thrift.ClientMiddleware

MonitorClient is a ClientMiddleware that wraps the inner thrift.TClient.Call in a thrift client span.

If you are using a thrift ClientPool created by NewBaseplateClientPool, this will be included automatically and should not be passed in as a ClientMiddleware to NewBaseplateClientPool.

Example

This example illustrates what thriftbp.MonitorClient does specifically and the details of how thriftbp.WrapClient works, a typical service will not write code like this and will instead be creating a ClientPool using thriftbp.NewBaseplateClientPool.

// variables should be properly initialized in production code
var (
	transport thrift.TTransport
	factory   thrift.TProtocolFactory
)
// Use MonitoredClient to wrap a standard thrift client
//
// The TClient returned by thrift.WrapClient *could* be shareable across
// multiple goroutines. For example, the one create here using protocol
// factory, or the one created via
// thriftbp.NewBaseplateClientPool/NewCustomClientPool.
shareableClient := thrift.WrapClient(
	thrift.NewTStandardClient(
		factory.GetProtocol(transport),
		factory.GetProtocol(transport),
	),
	thriftbp.MonitorClient(thriftbp.MonitorClientArgs{
		ServiceSlug: "service",
	}),
)
// Create an actual service client
//
// The actual client is NOT shareable between multiple goroutines and you
// should always create one on top of the shareable TClient for each call.
client := baseplate.NewBaseplateServiceV2Client(shareableClient)
// Create a context with a server span
_, ctx := opentracing.StartSpanFromContext(
	context.Background(),
	"test",
	tracing.SpanTypeOption{Type: tracing.SpanTypeServer},
)
// Calls should be automatically wrapped using client spans
healthy, err := client.IsHealthy(
	// The default middleware does not automatically retry requests but does set
	// up the retry middleware so individual requests can be configured to retry
	// using retrybp.WithOptions.
	retrybp.WithOptions(
		ctx,
		// This call will make at most 2 attempts, that is the initial attempt and
		// a single retry.
		retry.Attempts(2),
		// Apply the thriftbp default retry filters as well as NetworkErrorFilter
		// to retry networking errors.
		//
		// NetworkErrorFilter should only be used for requests that are safe to
		// repeat, such as reads or idempotent requests.
		retrybp.Filters(
			thriftbp.WithDefaultRetryFilters(retrybp.NetworkErrorFilter)...,
		),
	),
	&baseplate.IsHealthyRequest{
		Probe: baseplate.IsHealthyProbePtr(baseplate.IsHealthyProbe_READINESS),
	},
)
log.Debug("%v, %s", healthy, err)
Output:

func NewBaseplateServer

func NewBaseplateServer(
	bp baseplate.Baseplate,
	cfg ServerConfig,
) (baseplate.Server, error)

NewBaseplateServer returns a new Thrift implementation of a Baseplate server with the given config.

Example

This example demonstrates what a typical main function should look like for a Baseplate thrift service.

// In real code this MUST be replaced by the factory from the actual implementation.
var ecFactory ecinterface.Factory

var cfg baseplate.Config
if err := baseplate.ParseConfigYAML(&cfg); err != nil {
	panic(err)
}
ctx, bp, err := baseplate.New(context.Background(), baseplate.NewArgs{
	Config:             cfg,
	EdgeContextFactory: ecFactory,
})
if err != nil {
	panic(err)
}
defer bp.Close()

// In real prod code, you should define your thrift endpoints and create this
// handler instead.
var handler bpgen.BaseplateServiceV2
processor := bpgen.NewBaseplateServiceV2Processor(handler)

server, err := thriftbp.NewBaseplateServer(bp, thriftbp.ServerConfig{
	Processor: processor,
})
if err != nil {
	log.Fatal(err)
}

go thriftbp.ServeAdmin()
log.Info(baseplate.Serve(ctx, baseplate.ServeArgs{Server: server}))
Output:

func NewServer

func NewServer(cfg ServerConfig) (*thrift.TSimpleServer, error)

NewServer returns a thrift.TSimpleServer using the THeader transport and protocol to serve the given TProcessor which is wrapped with the given ProcessorMiddlewares.

func PrometheusClientMiddleware added in v0.9.2

func PrometheusClientMiddleware(remoteServerSlug string) thrift.ClientMiddleware

PrometheusClientMiddleware returns middleware to track Prometheus metrics specific to the Thrift client.

It emits the following prometheus metrics:

* thrift_client_active_requests gauge with labels:

  • thrift_method: the method of the endpoint called
  • thrift_client_name: an arbitray short string representing the backend the client is connecting to, the remoteServerSlug arg

* thrift_client_latency_seconds histogram with labels above plus:

  • thrift_success: "true" if err == nil, "false" otherwise

* thrift_client_requests_total counter with all labels above plus:

  • thrift_exception_type: the human-readable exception type, e.g. baseplate.Error, etc
  • thrift_baseplate_status: the numeric status code from a baseplate.Error as a string if present (e.g. 404), or the empty string
  • thrift_baseplate_status_code: the human-readable status code, e.g. NOT_FOUND, or the empty string

func PrometheusServerMiddleware added in v0.9.2

func PrometheusServerMiddleware(method string, next thrift.TProcessorFunction) thrift.TProcessorFunction

PrometheusServerMiddleware returns middleware to track Prometheus metrics specific to the Thrift service.

It emits the following prometheus metrics:

* thrift_server_active_requests gauge with labels:

  • thrift_method: the method of the endpoint called

* thrift_server_latency_seconds histogram with labels above plus:

  • thrift_success: "true" if err == nil, "false" otherwise

* thrift_server_requests_total counter with all labels above plus:

  • thrift_exception_type: the human-readable exception type, e.g. baseplate.Error, etc
  • thrift_baseplate_status: the numeric status code from a baseplate.Error as a string if present (e.g. 404), or the empty string
  • thrift_baseplate_status_code: the human-readable status code, e.g. NOT_FOUND, or the empty string

func RecoverPanic deprecated added in v0.9.0

func RecoverPanic(name string, next thrift.TProcessorFunction) thrift.TProcessorFunction

RecoverPanic recovers from panics raised in the TProccessorFunction chain, logs them, and records a metric indicating that the endpoint recovered from a panic.

Deprecated: This is always added as the last server middleware (with a different name) to ensure that other server middlewares get the correct error if panic happened.

func ReportPayloadSizeMetrics added in v0.7.1

func ReportPayloadSizeMetrics(_ float64) thrift.ProcessorMiddleware

ReportPayloadSizeMetrics returns a ProcessorMiddleware that reports metrics (histograms) of request and response payload sizes in bytes.

This middleware only works on sampled requests with the given sample rate, but the histograms it reports are overriding global histogram sample rate with 100% sample, to avoid double sampling. Although the overhead it adds is minimal, the sample rate passed in shouldn't be set too high (e.g. 0.01/1% is probably a good sample rate to use).

It does not count the bytes on the wire directly, but reconstructs the request/response with the same thrift protocol. As a result, the numbers it reports are not exact numbers, but should be good enough to show the overall trend and ballpark numbers.

It also only supports THeaderProtocol. If the request is not in THeaderProtocol it does nothing no matter what the sample rate is.

For endpoint named "myEndpoint", it reports histograms at:

- payload.size.myEndpoint.request

- payload.size.myEndpoint.response

func Retry added in v0.3.0

func Retry(defaults ...retry.Option) thrift.ClientMiddleware

Retry returns a thrift.ClientMiddleware that can be used to automatically retry thrift requests.

func ServeAdmin added in v0.9.5

func ServeAdmin()

ServeAdmin starts a blocking HTTP server for internal functions:

metrics       - serve /metrics for prometheus
profiling     - serve /debug/pprof for profiling, ref: https://pkg.go.dev/net/http/pprof

Default server address is admin.Addr.

This function blocks, so it should be run as its own goroutine.

func SetClientName added in v0.9.0

func SetClientName(clientName string) thrift.ClientMiddleware

SetClientName sets the "User-Agent" (HeaderUserAgent) thrift THeader on the requests.

If clientName is empty, no "User-Agent" header will be sent.

func SetDeadlineBudget

func SetDeadlineBudget(next thrift.TClient) thrift.TClient

SetDeadlineBudget is the client middleware implementing Phase 1 of Baseplate deadline propogation.

func StartSpanFromThriftContext

func StartSpanFromThriftContext(ctx context.Context, name string) (context.Context, *tracing.Span)

StartSpanFromThriftContext creates a server span from thrift context object.

This span would usually be used as the span of the whole thrift endpoint handler, and the parent of the child-spans.

Caller should pass in the context object they got from thrift library, which would have all the required headers already injected.

Please note that "Sampled" header is default to false according to baseplate spec, so if the context object doesn't have headers injected correctly, this span (and all its child-spans) will never be sampled, unless debug flag was set explicitly later.

If any of the tracing related thrift header is present but malformed, it will be ignored. The error will also be logged if InitGlobalTracer was last called with a non-nil logger. Absent tracing related headers are always silently ignored.

func WithDefaultRetryFilters added in v0.3.0

func WithDefaultRetryFilters(filters ...retrybp.Filter) []retrybp.Filter

WithDefaultRetryFilters returns a list of retrybp.Filters by appending the given filters to the "default" retry filters:

1. RetryableErrorFilter - handle errors already provided retryable information, this includes clientpool.ErrExhausted

2. ContextErrorFilter - do not retry on context cancellation/timeout.

func WithDefaultRetryableCodes added in v0.4.0

func WithDefaultRetryableCodes(codes ...int32) []int32

WithDefaultRetryableCodes returns a list including the given error codes and the default retryable error codes:

1. TOO_EARLY

2. TOO_MANY_REQUESTS

3. SERVICE_UNAVAILABLE

func WrapBaseplateError added in v0.8.2

func WrapBaseplateError(e error) error

WrapBaseplateError wraps *baseplate.Error into errors with better error message, and can be unwrapped to the original *baseplate.Error.

If e is not *baseplate.Error it will be returned as-is.

For logging this wrapping is auto applied as long as you initialize zap logger from log package so this is not needed.

Types

type AddressGenerator

type AddressGenerator func() (string, error)

AddressGenerator defines a function that returns the address of a thrift service.

Services should generally not have to use AddressGenerators directly, instead you should use NewBaseplateClientPool which uses the default AddressGenerator for a typical Baseplate Thrift Client.

func SingleAddressGenerator

func SingleAddressGenerator(addr string) AddressGenerator

SingleAddressGenerator returns an AddressGenerator that always returns addr.

Services should generally not have to use SingleAddressGenerator directly, instead you should use NewBaseplateClientPool which uses the default AddressGenerator for a typical Baseplate Thrift Client.

type BaseplateClientPoolConfig added in v0.7.1

type BaseplateClientPoolConfig ClientPoolConfig

BaseplateClientPoolConfig provides a more concrete Validate method tailored to validating baseplate service confgiurations.

func (BaseplateClientPoolConfig) Validate added in v0.7.1

func (c BaseplateClientPoolConfig) Validate() error

Validate checks the BaseplateClientPoolConfig for any missing or erroneous values.

This method is designated to be used when passing a configuration to NewBaseplateClientPool, for NewCustomClientPool other constraints apply.

type Client

type Client interface {
	clientpool.Client
	thrift.TClient
}

Client is a client object that implements both the clientpool.Client and thrift.TCLient interfaces.

This allows it to be managed by a clientpool.Pool and be passed to a thrift client as the base thrift.TClient.

type ClientPool

type ClientPool interface {
	// The returned TClient implements TClient by grabbing a Client from its pool
	// and releasing that Client after its Call method completes.
	//
	// A typical example of how to use the pool is like this:
	//
	//     // service.NewServiceClient comes from thrift compiled go code.
	//     // client you got here should be treated as disposable and never be
	//     // shared between goroutines.
	//     client := service.NewServiceClient(pool.TClient())
	//     resp, err := client.MyEndpoint(ctx, req)
	//
	// Or you can create a "client factory" for the service you want to call:
	//
	//    type ServiceClientFactory struct {
	//        pool thriftbp.ClientPool
	//    }
	//
	//    // service.Service and service.NewServiceClient are from thrift compiled
	//    // go code.
	//    func (f ServiceClientFactory) Client service.Service {
	//        return service.NewServiceClient(f.pool.TClient())
	//    }
	//
	//    client := factory.Client()
	//    resp, err := client.MyEndpoint(ctx, req)
	//
	// If the call fails to get a client from the pool, it will return PoolError.
	// You can check the error returned using:
	//
	//     var poolErr thriftbp.PoolError
	//     if errors.As(err, &poolErr) {
	//       // It's unable to get a client from the pool
	//     } else {
	//       // It's error from the actual thrift call
	//     }
	//
	// If the error is not of type PoolError that means it's returned by the call
	// from the actual client.
	//
	// If the call fails to release the client back to the pool,
	// it will log the error on error level but not return it to the caller.
	// It also increases thriftbp_client_pool_release_errors_total counter.
	TClient() thrift.TClient

	// Passthrough APIs from clientpool.Pool:
	io.Closer
	IsExhausted() bool
}

ClientPool defines an object that implements thrift.TClient using a pool of Client objects.

A ClientPool is safe to be shared across different goroutines, but a concrete thrift client created on top of it is not. A concrete thrift client is the one you created from thrift compiled go code, for example baseplate.NewBaseplateServiceV2Client(pool). You need to create a concrete thrift client for each of your goroutines, but they can share the same ClientPool underneath.

func NewBaseplateClientPool

func NewBaseplateClientPool(cfg ClientPoolConfig, middlewares ...thrift.ClientMiddleware) (ClientPool, error)

NewBaseplateClientPool calls NewBaseplateClientPoolWithContext with background context. It should not be used with RequiredInitialConnections > 0.

func NewBaseplateClientPoolWithContext added in v0.9.12

func NewBaseplateClientPoolWithContext(ctx context.Context, cfg ClientPoolConfig, middlewares ...thrift.ClientMiddleware) (ClientPool, error)

NewBaseplateClientPoolWithContext returns a standard ClientPool wrapped with the BaseplateDefaultClientMiddlewares plus any additional client middlewares passed into this function.

It always uses SingleAddressGenerator with the server address configured in cfg, and THeader+TCompact as the protocol factory.

If you have RequiredInitialConnections > 0, ctx passed in controls the timeout of retries to hit required initial connections. Having a ctx without timeout with a downed upstream could cause this function to be blocked forever.

func NewCustomClientPool

func NewCustomClientPool(
	cfg ClientPoolConfig,
	genAddr AddressGenerator,
	protoFactory thrift.TProtocolFactory,
	middlewares ...thrift.ClientMiddleware,
) (ClientPool, error)

NewCustomClientPool calls NewCustomClientPoolWithContext with background context. It should not be used with RequiredInitialConnections > 0.

func NewCustomClientPoolWithContext added in v0.9.12

func NewCustomClientPoolWithContext(
	ctx context.Context,
	cfg ClientPoolConfig,
	genAddr AddressGenerator,
	protoFactory thrift.TProtocolFactory,
	middlewares ...thrift.ClientMiddleware,
) (ClientPool, error)

NewCustomClientPoolWithContext creates a ClientPool that uses a custom AddressGenerator and TProtocolFactory wrapped with the given middleware.

Most services will want to just use NewBaseplateClientPoolWithContext, this has been provided to support services that have non-standard and/or legacy needs.

If you have RequiredInitialConnections > 0, ctx passed in controls the timeout of retries to hit required initial connections. Having a ctx without timeout with a downed upstream could cause this function to be blocked forever.

type ClientPoolConfig

type ClientPoolConfig struct {
	// ServiceSlug is a short identifier for the thrift service you are creating
	// clients for.  The preferred convention is to take the service's name,
	// remove the 'Service' prefix, if present, and convert from camel case to
	// all lower case, hyphen separated.
	//
	// Examples:
	//
	//     AuthenticationService -> authentication
	//     ImageUploadService -> image-upload
	ServiceSlug string `yaml:"serviceSlug"`

	// Addr is the address of a thrift service.  Addr must be in the format
	// "${host}:${port}"
	Addr string `yaml:"addr"`

	// InitialConnections is the desired inital number of thrift connections
	// created by the client pool.
	//
	// If an error occurred when we try to establish the initial N connections for
	// the pool, we log the errors on warning level,
	// then return the pool with the <N connections we already established.
	//
	// If that's unacceptable, then RequiredInitialConnections can be used to set
	// the hard requirement of minimal initial connections to be established.
	// Note that enabling this can cause cascading failures during an outage,
	// so it shall only be used for extreme circumstances.
	InitialConnections         int `yaml:"initialConnections"`
	RequiredInitialConnections int `yaml:"requiredInitialConnections"`
	// Deprecated: InitialConnectionsFallback is always true and setting it to
	// false won't do anything.
	InitialConnectionsFallback bool `yaml:"initialConnectionsFallback"`
	// Deprecated: Individual connection errors during initialization is always
	// logged via zap logger on warning level.
	InitialConnectionsFallbackLogger log.Wrapper `yaml:"initialConnectionsFallbackLogger"`

	// MaxConnections is the maximum number of thrift connections the client
	// pool can maintain.
	MaxConnections int `yaml:"maxConnections"`

	// MaxConnectionAge is the maximum duration that a pooled connection will be
	// kept before closing in favor of a new one.
	//
	// If this is not set, the default duration is 5 minutes
	// (see DefaultMaxConnectionAge).
	//
	// To disable this and keep connections in the pool indefinetly, set this to
	// a negative value.
	//
	// MaxConnectionAgeJitter is the ratio of random jitter +/- on top of
	// MaxConnectionAge. Default to 10% (see DefaultMaxConnectionAgeJitter).
	// For example, when MaxConnectionAge is 5min and MaxConnectionAgeJitter is
	// 10%, the TTL of the clients would be in range of (4:30, 5:30).
	//
	// When this is enabled, there will be one additional goroutine per connection
	// in the pool to do background housekeeping (to replace the expired
	// connections). We emit thriftbp_ttlclient_connection_housekeeping_total
	// counter with thrift_success tag to provide observalibility into the
	// background housekeeping.
	//
	// Due to a Go runtime bug [1], if you use a very small MaxConnectionAge or a
	// jitter very close to 1, the background housekeeping could cause excessive
	// CPU overhead.
	//
	// [1]: https://github.com/golang/go/issues/27707
	MaxConnectionAge       time.Duration `yaml:"maxConnectionAge"`
	MaxConnectionAgeJitter *float64      `yaml:"maxConnectionAgeJitter"`

	// ConnectTimeout and SocketTimeout are timeouts used by the underlying
	// thrift.TSocket.
	//
	// In most cases, you would want ConnectTimeout to be short, because if you
	// have problem connecting to the upstream you want to fail fast.
	//
	// For SocketTimeout, the value you should set depends on whether you set a
	// deadline to the context object to the client call functions or not.
	// If ALL your client calls will have a context object with a deadline
	// attached, then it's recommended to set SocketTimeout to a short value,
	// as this is the max overhead the client call will take over the set
	// deadline, in case the server is not-responding.
	// But if you don't always have a deadline attached to your client calls,
	// then SocketTimeout needs to be at least your upstream service's p99 latency
	// SLA. If it's shorter than that you are gonna close connections and fail
	// requests prematurely.
	//
	// It's recommended to make sure all your client call context objects have a
	// deadline set, and set SocketTimeout to a short value. For example:
	//
	//     clientCtx, cancel := context.WithTimeout(ctx, myCallTimeout)
	//     defer cancel()
	//     resp, err := client.MyCall(clientCtx, args)
	//
	// For both values, <=0 would mean no timeout.
	// In most cases you would want to set timeouts for both.
	ConnectTimeout time.Duration `yaml:"connectTimeout"`
	SocketTimeout  time.Duration `yaml:"socketTimeout"`

	// Any tags that should be applied to metrics logged by the ClientPool.
	// This includes the optional pool stats.
	//
	// Deprecated: We no longer emit any statsd metrics so this has no effect.
	MetricsTags metricsbp.Tags `yaml:"metricsTags"`

	// DefaultRetryOptions is the list of retry.Options to apply as the defaults
	// for the Retry middleware.
	//
	// This is optional, if it is not set, we will use a single option,
	// retry.Attempts(1).  This sets up the retry middleware but does not
	// automatically retry any requests.  You can set retry behavior per-call by
	// using retrybp.WithOptions.
	DefaultRetryOptions []retry.Option

	// ReportPoolStats signals to the ClientPool that it should report
	// statistics on the underlying clientpool.Pool in a background
	// goroutine.  If this is set to false, the reporting goroutine will
	// not be started and it will not report pool stats.
	//
	// It reports:
	// - the number of active clients to a gauge named
	//   "${ServiceSlug}.pool-active-connections".
	// - the number of allocated clients to a gauge named
	//   "${ServiceSlug}.pool-allocated-clients".
	//
	// The reporting goroutine is cancelled when the global metrics client
	// context is Done.
	//
	// Deprecated: The statsd metrics are deprecated and the prometheus metrics
	// are always reported.
	ReportPoolStats bool `yaml:"reportPoolStats"`

	// PoolGaugeInterval indicates how often we should update the active
	// connections gauge when collecting pool stats.
	//
	// When PoolGaugeInterval <= 0 and ReportPoolStats is true,
	// DefaultPoolGaugeInterval will be used instead.
	//
	// Deprecated: Not used any more. Prometheus gauges are auto scraped.
	PoolGaugeInterval time.Duration `yaml:"poolGaugeInterval"`

	// Suppress some of the errors returned by the server before sending them to
	// the client span.
	//
	// See MonitorClientArgs.ErrorSpanSuppressor for more details.
	//
	// This is optional. If it's not set IDLExceptionSuppressor will be used.
	ErrorSpanSuppressor errorsbp.Suppressor

	// When BreakerConfig is non-nil,
	// a breakerbp.FailureRatioBreaker will be created for the pool,
	// and its middleware will be set for the pool.
	BreakerConfig *breakerbp.Config `yaml:"breakerConfig"`

	// The edge context implementation. Optional.
	//
	// If it's not set, the global one from ecinterface.Get will be used instead.
	EdgeContextImpl ecinterface.Interface

	// The name for the server to identify this client,
	// via the "User-Agent" (HeaderUserAgent) THeader.
	//
	// Optional. If this is empty, no "User-Agent" header will be sent.
	ClientName string `yaml:"clientName"`

	// The hostname to add as a "thrift-hostname" header.
	//
	// Optional. If empty, no "thrift-hostname" header will be sent.
	ThriftHostnameHeader string `yaml:"thriftHostnameHeader"`
}

ClientPoolConfig is the configuration struct for creating a new ClientPool.

func (ClientPoolConfig) ToTConfiguration added in v0.8.0

func (c ClientPoolConfig) ToTConfiguration() *thrift.TConfiguration

ToTConfiguration generates *thrift.TConfiguration from this config.

Note that it always set THeaderProtocolID to thrift.THeaderProtocolCompact, even though that's not part of the ClientPoolConfig. To override this behavior, change the value from the returned TConfiguration.

func (ClientPoolConfig) Validate added in v0.7.1

func (c ClientPoolConfig) Validate() error

Validate checks ClientPoolConfig for any missing or erroneous values.

If this is the configuration for a baseplate service BaseplateClientPoolConfig(c).Validate should be used instead.

type CountedTServerTransport deprecated added in v0.9.3

type CountedTServerTransport struct {
	thrift.TServerTransport
}

CountedTServerTransport is a wrapper around thrift.TServerTransport that emits a gauge of the number of client connections.

Deprecated: This is deprecated with statsd metrics.

func (*CountedTServerTransport) Accept added in v0.9.3

Accept implements thrift.TServerTransport by retruning a thrift.TTransport that counts the number of client connections.

type DefaultClientMiddlewareArgs added in v0.3.0

type DefaultClientMiddlewareArgs struct {
	// ServiceSlug is a short identifier for the thrift service you are creating
	// clients for.  The preferred convention is to take the service's name,
	// remove the 'Service' prefix, if present, and convert from camel case to
	// all lower case, hyphen separated.
	//
	// Examples:
	//
	//     AuthenticationService -> authentication
	//     ImageUploadService -> image-upload
	ServiceSlug string

	// RetryOptions is the list of retry.Options to apply as the defaults for the
	// Retry middleware.
	//
	// This is optional, if it is not set, we will use a single option,
	// retry.Attempts(1).  This sets up the retry middleware but does not
	// automatically retry any requests.  You can set retry behavior per-call by
	// using retrybp.WithOptions.
	RetryOptions []retry.Option

	// Suppress some of the errors returned by the server before sending them to
	// the client span.
	//
	// See MonitorClientArgs.ErrorSpanSuppressor for more details.
	//
	// This is optional. If it's not set IDLExceptionSuppressor will be used.
	ErrorSpanSuppressor errorsbp.Suppressor

	// When BreakerConfig is non-nil,
	// a breakerbp.FailureRatioBreaker will be created for the pool,
	// and its middleware will be set for the pool.
	BreakerConfig *breakerbp.Config

	// The edge context implementation. Optional.
	//
	// If it's not set, the global one from ecinterface.Get will be used instead.
	EdgeContextImpl ecinterface.Interface

	// The name for the server to identify this client,
	// via the "User-Agent" (HeaderUserAgent) THeader.
	//
	// Optional. If this is empty, no "User-Agent" header will be sent.
	ClientName string
}

DefaultClientMiddlewareArgs is the arg struct for BaseplateDefaultClientMiddlewares.

type DefaultProcessorMiddlewaresArgs added in v0.4.0

type DefaultProcessorMiddlewaresArgs struct {
	// Suppress some of the errors returned by the server before sending them to
	// the server span.
	//
	// Based on Baseplate spec, the errors defined in your thrift IDL are not
	// treated as errors, and should be suppressed here. So in most cases that's
	// what the service developer should implement as the Suppressor here.
	//
	// Note that this suppressor only affects the errors send to the span. It
	// won't affect the errors returned to the client.
	//
	// This is optional. If it's not set IDLExceptionSuppressor will be used.
	ErrorSpanSuppressor errorsbp.Suppressor

	// Report the payload size metrics with this sample rate.
	//
	// This is optional. If it's not set none of the requests will be sampled.
	ReportPayloadSizeMetricsSampleRate float64

	// The edge context implementation. Optional.
	//
	// If it's not set, the global one from ecinterface.Get will be used instead.
	EdgeContextImpl ecinterface.Interface
}

DefaultProcessorMiddlewaresArgs are the args to be passed into BaseplateDefaultProcessorMiddlewares function to create default processor middlewares.

type MonitorClientArgs added in v0.4.0

type MonitorClientArgs struct {
	// The slug string of the service.
	//
	// Note that if this is the MonitorClient before retry,
	// ServiceSlug should also come with MonitorClientWrappedSlugSuffix.
	ServiceSlug string

	// Suppress some of the errors returned by the server before sending them to
	// the client span.
	//
	// Based on Baseplate spec, the errors defined in the server's thrift IDL are
	// not treated as errors, and should be suppressed here. So in most cases
	// that's what should be implemented as the Suppressor here.
	//
	// Note that this suppressor only affects the errors send to the span. It
	// won't affect the errors returned to the caller of the client function.
	//
	// This is optional. If it's not set IDLExceptionSuppressor will be used.
	ErrorSpanSuppressor errorsbp.Suppressor
}

MonitorClientArgs are the args to be passed into MonitorClient function.

type PoolError added in v0.2.0

type PoolError struct {
	// Cause is the inner error wrapped by PoolError.
	Cause error
}

PoolError is returned by ClientPool.TClient.Call when it fails to get a client from its pool.

func (PoolError) Error added in v0.2.0

func (err PoolError) Error() string

func (PoolError) Unwrap added in v0.2.0

func (err PoolError) Unwrap() error

type ServerConfig

type ServerConfig struct {
	// Required, used by both NewServer and NewBaseplateServer.
	//
	// This is the thrift processor implementation to handle endpoints.
	Processor thrift.TProcessor

	// Optional, used by both NewServer and NewBaseplateServer.
	//
	// For NewServer, this defines all the middlewares to wrap the server with.
	// For NewBaseplateServer, this only defines the middlewares in addition to
	// (and after) BaseplateDefaultProcessorMiddlewares.
	Middlewares []thrift.ProcessorMiddleware

	// Optional, used only by NewServer.
	//
	// A log wrapper that is used by the TSimpleServer.
	Logger thrift.Logger

	// Optional, used only by NewBaseplateServer.
	//
	// Please refer to the documentation of
	// DefaultProcessorMiddlewaresArgs.ErrorSpanSuppressor for more details
	// regarding how it is used.
	ErrorSpanSuppressor errorsbp.Suppressor

	// Optional, used only by NewBaseplateServer.
	//
	// Report the payload size metrics with this sample rate.
	// If not set none of the requests will be sampled.
	ReportPayloadSizeMetricsSampleRate float64

	// Optional, used by NewBaseplateServer and NewServer.
	//
	// Report the number of clients connected to the server as a runtime gauge
	// with metric name of 'thrift.connections'
	//
	// Deprecated: This feature is removed.
	ReportConnectionCount bool

	// Optional, used only by NewServer.
	// In NewBaseplateServer the address set in bp.Config() will be used instead.
	//
	// The endpoint address of your thrift service.
	//
	// This is ignored if Socket is non-nil.
	Addr string

	// Deprecated: No-op for now, will be removed in a future release.
	Timeout time.Duration

	// Optional, This duration is used to set both the read and write idle timeouts
	// for the thrift.TServerSocket used by the baseplate server.
	//
	// This is an experimental configuration and is subject to change or deprecation
	// without notice. When using NewBaseplateServer, setting a socket timeout will
	// also override the default thrift server logger to one that emits metrics
	// instead of logs in the event of a socket disconnect. A zero value means I/O
	// read or write operations will not time out.
	SocketTimeout time.Duration

	// Optional, used only by NewServer.
	// In NewBaseplateServer the address and timeout set in bp.Config() will be
	// used instead.
	//
	// You can choose to set Socket instead of Addr.
	Socket *thrift.TServerSocket
}

ServerConfig is the arg struct for both NewServer and NewBaseplateServer.

Some of the fields are only used by NewServer and some of them are only used by NewBaseplateServer. Please refer to the documentation for each field to see how is it used.

Directories

Path Synopsis
Package thrifttest contains objects and utility methods to aid with testing code using Thrift clients and servers.
Package thrifttest contains objects and utility methods to aid with testing code using Thrift clients and servers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL