thriftutils

package module
v0.0.0-...-f6b0c2a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 5, 2023 License: Apache-2.0 Imports: 10 Imported by: 0

README

Thrift utils

This is a small wrapper library around a thrift.TProcessor that emits a textual representation of Thrift function calls, along with their input and output. Additionally, it provides a small TClient that allows retrying on network errors.

Installing

go get github.com/ozkatz/thrift-utils

Using (logging wrapper)

import (
    "github.com/apache/thrift/lib/go/thrift"
    tutil "github.com/ozkatz/thrift-utils"
    ...
)

// existing processor
processor := hello_world.NewHelloWorldProcessor(myHelloWorldHandler{})

// Let's wrap it with a proxy:
processor = tutil.Log(processor, func(c *tutil.Call) {
    logLine := logger.WithFields(logging.Fields{
        "name": c.Name,
        "input": c.Input,
        "output": c.Output,
        "took_ms": c.Duration.Milliseconds(),
    })

    if c.Err != nil {
        logLine.WithError(c.Err).Error("error while running thrift call")
    } else {
        logLine.Debug("thrift call executed successfully")
    }   
})

// and use it as normal:
server := thrift.NewTSimpleServer4(processor, transport, transportFactory, protocolFactory)
server.Serve()

Using (client wrapper that retries on socket errors)

This is mainly useful when the upstream server is restarted from time to time, which breaks the existing TCP connection; the retrying client reconnects instead of failing the call.

import (
    "github.com/apache/thrift/lib/go/thrift"
    tutil "github.com/ozkatz/thrift-utils"
    ...
)


func connectUpstream(addr string) (*hello_world.ThriftHelloWorldClient, error) {
	connectFn := func() (thrift.TClient, error) {
		cfg := &thrift.TConfiguration{}
		transport := thrift.NewTSocketConf(addr, cfg)
		if err := transport.Open(); err != nil {
			return nil, err
		}
		protocolFactory := thrift.NewTBinaryProtocolFactoryConf(cfg)
		return thrift.NewTStandardClient(
			protocolFactory.GetProtocol(transport),
			protocolFactory.GetProtocol(transport)), nil
	}
    
	client, err := tutil.NewRertryingClient(
		connectFn,
		tutil.RetryOnNetError,
		tutil.DefaultExponentialBackoff)
	if err != nil {
		return nil, err
	}
	return hello_world.NewThriftHelloWorldClient(client), nil
}

func RunProxyServer(ctx context.Context, upstream, addr string) error {
	logging.SetLevel("debug")
	logging.SetOutputs([]string{"-"}, 0, 0)
	transportFactory := thrift.NewTBufferedTransportFactory(8192)
	protocolFactory := thrift.NewTBinaryProtocolFactoryConf(nil)
	transport, err := thrift.NewTServerSocket(addr)
	if err != nil {
		return err
	}

	client, err := connectUpstream(upstream)
	if err != nil {
		return err
	}

	processor := hello_world.NewThriftHelloWorldProcessor(client)
	server := thrift.NewTSimpleServer4(processor, transport, transportFactory, protocolFactory)

	logging.FromContext(ctx).WithFields(logging.Fields{"addr": addr}).Info("starting Thrift proxy server")
	return server.Serve()
}

Documentation

Index

Constants

View Source
const (
	MESSAGE thriftType = iota
	MAP
	LIST
	SET
	STRUCT
	FIELD
	STRING
	I16
	I32
	I64
	DOUBLE
	UUID
	BINARY
	BYTE
	BOOL
)

Variables

View Source
var (
	DefaultExponentialBackoff = NewExponentialBackoff(time.Millisecond*50, 16, 2.0)
)
View Source
var (
	ErrRetriesExhausted = errors.New("retries exhausted")
)

Functions

func Log

func Log(processor thrift.TProcessor, emitter EmitterFn) thrift.TProcessor

func RetryOnNetError

func RetryOnNetError(err error) bool

func StdoutLogger

func StdoutLogger(c *Call)

Types

type BackoffHandler

type BackoffHandler interface {
	BackOff(attempt int) bool
}

type Call

type Call struct {
	Name     string
	Input    string
	Output   string
	Duration time.Duration
	Err      thrift.TException
}

type EmitterFn

type EmitterFn func(c *Call)

type ExponentialBackoff

type ExponentialBackoff struct {
	// contains filtered or unexported fields
}

func NewExponentialBackoff

func NewExponentialBackoff(startDuration time.Duration, maxAttempts int, base float64) *ExponentialBackoff

func (*ExponentialBackoff) BackOff

func (e *ExponentialBackoff) BackOff(attempt int) bool

type LoggingProtocol

type LoggingProtocol struct {
	// contains filtered or unexported fields
}

func NewLoggingProtocol

func NewLoggingProtocol(wraps thrift.TProtocol) *LoggingProtocol

func (*LoggingProtocol) Flush

func (l *LoggingProtocol) Flush(ctx context.Context) (err error)

func (*LoggingProtocol) ReadBinary

func (l *LoggingProtocol) ReadBinary(ctx context.Context) (value []byte, err error)

func (*LoggingProtocol) ReadBool

func (l *LoggingProtocol) ReadBool(ctx context.Context) (value bool, err error)

func (*LoggingProtocol) ReadByte

func (l *LoggingProtocol) ReadByte(ctx context.Context) (value int8, err error)

func (*LoggingProtocol) ReadDouble

func (l *LoggingProtocol) ReadDouble(ctx context.Context) (value float64, err error)

func (*LoggingProtocol) ReadFieldBegin

func (l *LoggingProtocol) ReadFieldBegin(ctx context.Context) (name string, typeId thrift.TType, id int16, err error)

func (*LoggingProtocol) ReadFieldEnd

func (l *LoggingProtocol) ReadFieldEnd(ctx context.Context) error

func (*LoggingProtocol) ReadI16

func (l *LoggingProtocol) ReadI16(ctx context.Context) (value int16, err error)

func (*LoggingProtocol) ReadI32

func (l *LoggingProtocol) ReadI32(ctx context.Context) (value int32, err error)

func (*LoggingProtocol) ReadI64

func (l *LoggingProtocol) ReadI64(ctx context.Context) (value int64, err error)

func (*LoggingProtocol) ReadListBegin

func (l *LoggingProtocol) ReadListBegin(ctx context.Context) (elemType thrift.TType, size int, err error)

func (*LoggingProtocol) ReadListEnd

func (l *LoggingProtocol) ReadListEnd(ctx context.Context) error

func (*LoggingProtocol) ReadMapBegin

func (l *LoggingProtocol) ReadMapBegin(ctx context.Context) (keyType thrift.TType, valueType thrift.TType, size int, err error)

func (*LoggingProtocol) ReadMapEnd

func (l *LoggingProtocol) ReadMapEnd(ctx context.Context) error

func (*LoggingProtocol) ReadMessageBegin

func (l *LoggingProtocol) ReadMessageBegin(ctx context.Context) (name string, typeId thrift.TMessageType, seqid int32, err error)

readers!

func (*LoggingProtocol) ReadMessageEnd

func (l *LoggingProtocol) ReadMessageEnd(ctx context.Context) error

func (*LoggingProtocol) ReadSetBegin

func (l *LoggingProtocol) ReadSetBegin(ctx context.Context) (elemType thrift.TType, size int, err error)

func (*LoggingProtocol) ReadSetEnd

func (l *LoggingProtocol) ReadSetEnd(ctx context.Context) error

func (*LoggingProtocol) ReadString

func (l *LoggingProtocol) ReadString(ctx context.Context) (value string, err error)

func (*LoggingProtocol) ReadStructBegin

func (l *LoggingProtocol) ReadStructBegin(ctx context.Context) (name string, err error)

func (*LoggingProtocol) ReadStructEnd

func (l *LoggingProtocol) ReadStructEnd(ctx context.Context) error

func (*LoggingProtocol) ReadUUID

func (l *LoggingProtocol) ReadUUID(ctx context.Context) (value thrift.Tuuid, err error)

func (*LoggingProtocol) Skip

func (l *LoggingProtocol) Skip(ctx context.Context, fieldType thrift.TType) (err error)

func (*LoggingProtocol) String

func (l *LoggingProtocol) String() string

func (*LoggingProtocol) Transport

func (l *LoggingProtocol) Transport() thrift.TTransport

func (*LoggingProtocol) WriteBinary

func (l *LoggingProtocol) WriteBinary(ctx context.Context, value []byte) error

func (*LoggingProtocol) WriteBool

func (l *LoggingProtocol) WriteBool(ctx context.Context, value bool) error

func (*LoggingProtocol) WriteByte

func (l *LoggingProtocol) WriteByte(ctx context.Context, value int8) error

func (*LoggingProtocol) WriteDouble

func (l *LoggingProtocol) WriteDouble(ctx context.Context, value float64) error

func (*LoggingProtocol) WriteFieldBegin

func (l *LoggingProtocol) WriteFieldBegin(ctx context.Context, name string, typeId thrift.TType, id int16) error

func (*LoggingProtocol) WriteFieldEnd

func (l *LoggingProtocol) WriteFieldEnd(ctx context.Context) error

func (*LoggingProtocol) WriteFieldStop

func (l *LoggingProtocol) WriteFieldStop(ctx context.Context) error

func (*LoggingProtocol) WriteI16

func (l *LoggingProtocol) WriteI16(ctx context.Context, value int16) error

func (*LoggingProtocol) WriteI32

func (l *LoggingProtocol) WriteI32(ctx context.Context, value int32) error

func (*LoggingProtocol) WriteI64

func (l *LoggingProtocol) WriteI64(ctx context.Context, value int64) error

func (*LoggingProtocol) WriteListBegin

func (l *LoggingProtocol) WriteListBegin(ctx context.Context, elemType thrift.TType, size int) error

func (*LoggingProtocol) WriteListEnd

func (l *LoggingProtocol) WriteListEnd(ctx context.Context) error

func (*LoggingProtocol) WriteMapBegin

func (l *LoggingProtocol) WriteMapBegin(ctx context.Context, keyType thrift.TType, valueType thrift.TType, size int) error

func (*LoggingProtocol) WriteMapEnd

func (l *LoggingProtocol) WriteMapEnd(ctx context.Context) error

func (*LoggingProtocol) WriteMessageBegin

func (l *LoggingProtocol) WriteMessageBegin(ctx context.Context, name string, typeId thrift.TMessageType, seqid int32) error

func (*LoggingProtocol) WriteMessageEnd

func (l *LoggingProtocol) WriteMessageEnd(ctx context.Context) error

func (*LoggingProtocol) WriteSetBegin

func (l *LoggingProtocol) WriteSetBegin(ctx context.Context, elemType thrift.TType, size int) error

func (*LoggingProtocol) WriteSetEnd

func (l *LoggingProtocol) WriteSetEnd(ctx context.Context) error

func (*LoggingProtocol) WriteString

func (l *LoggingProtocol) WriteString(ctx context.Context, value string) error

func (*LoggingProtocol) WriteStructBegin

func (l *LoggingProtocol) WriteStructBegin(ctx context.Context, name string) error

func (*LoggingProtocol) WriteStructEnd

func (l *LoggingProtocol) WriteStructEnd(ctx context.Context) error

func (*LoggingProtocol) WriteUUID

func (l *LoggingProtocol) WriteUUID(ctx context.Context, value thrift.Tuuid) error

type RertryingClient

type RertryingClient struct {
	// contains filtered or unexported fields
}

func NewRertryingClient

func NewRertryingClient(fn RetyClientFn, shoudRetryFn ShouldRetryFn, backoffHandler BackoffHandler) (*RertryingClient, error)

func (*RertryingClient) Call

func (c *RertryingClient) Call(ctx context.Context, method string, args thrift.TStruct, result thrift.TStruct) (thrift.ResponseMeta, error)

Call implements thrift.TClient.

type RetyClientFn

type RetyClientFn func() (thrift.TClient, error)

type ShouldRetryFn

type ShouldRetryFn func(error) bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL