thriftset

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2023 License: MIT Imports: 5 Imported by: 0

README

go.serversets/thriftset Build Status Godoc Reference

Package thriftset provides "least active request" balancing over a set of endpoints provided by go.serversets. Connections are kept in a pool and reused as needed.

Usage

zookeepers := []string{"10.0.1.0", "10.0.5.0", "10.0.9.0"}
watch, err := serversets.New(serversets.Production, "service_name", zookeepers).Watch()
if err != nil {
	// This will be a problem connecting to Zookeeper
	log.Fatalf("Registration error: %v", err)
}

ts := thriftset.New(watch)

// or using a fixed set of servers, to just use the loadbalancing
ts := thriftset.New(fixedset.New([]string{"host1:1234", "host2:1234"}))

service := New(ts)

resp, err := service.Find(123)
log.Printf("%v %v", resp, err)

The Service object is a helpful way to wrap the thrift interface into something nicer. It handles the "checking out" or connections and returning them. This thriftset package returns a connection to the endpoint with the least amount of of checked out connections.

// A Service object wraps the thrift interface with helper methods to fetch/update data.
type Service struct {
	set              *thriftset.ThriftSet
	protocolFactory  thrift.TProtocolFactory
	transportFactory thrift.TTransportFactory
}

func New(set *thriftset.ThriftSet) (*Service) {
	return &Service{
		set:              set,
		protocolFactory:  thrift.NewTBinaryProtocolFactory(),
		transportFactory: thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory()),
	}
}

func (s *Service) Close() error {
	return s.set.Close()
}

func (s *Service) getClient() (*thriftset.Conn, *sampleservice.SampleServiceClient, error) {
	conn, err := s.set.GetConn()
	if err != nil {
		return nil, nil, err
	}

	if conn.Client == nil {
		transport := s.transportFactory.GetTransport(conn.Socket)
		err = transport.Open()
		if err != nil {
			return nil, nil, err
		}

		// The client factory is cached on the client
		conn.Client = sampleservice.NewSampleServiceClientFactory(transport, s.protocolFactory)
	}

	return conn, conn.Client.(*sampleservice.SampleServiceClient), nil
}

func (s *Service) releaseClient(conn *thriftset.Conn, err error) error {
	// do some sort of check to see if this error should close the connection or not.
	// ie. is it network related, or just a thrift exception
	if fatalError(err) {
		conn.Close()
		return err
	}

	return conn.Release() // returns the connection back to the pool
}

func (s *Service) Find(id int64) (*Sample, error) {
	req := sampleservice.NewGetRequest()
	req.ID = id

	conn, client, err := s.getClient()
	if err != nil {
		return nil, err
	}

	resp, err := client.Get(req)
	err = s.releaseClient(conn, err)
	if err != nil {
		return nil, err
	}

	// convert the thrift object into a "clearer" go type.
	// Maybe add some validation.
	return &Sample{
		ID: id
		Metadata: resp.Metadata,
	}, nil
}

Potential Improvements and Contributing

It'd be nice to mark an endpoint as down if it returns too many errors. If you'd like, submit a pull request.

Documentation

Index

Constants

View Source
const (
	DefaultMaxIdlePerHost   = 10
	DefaultMaxActivePerHost = 10
	DefaultTimeout          = time.Second
	DefaultIdleTimeout      = 5 * time.Minute
)

Default values for ThriftSet parameters

Variables

View Source
var (
	// ErrNoEndpoints is returned when no endpoints are configured or available.
	ErrNoEndpoints = errors.New("thriftset: no endpoints configured or available")

	// ErrGetOnClosedSet is returned when requesting a connection from a closed thrift set.
	ErrGetOnClosedSet = errors.New("thriftset: get on closed set")

	// ErrGetOnClosedEndpoint is returned by endpoint.GetConn if the endpoint
	// has been closed because it was removed from the watch.
	// This error is retryable.
	ErrGetOnClosedEndpoint = errors.New("endpoint closed")
)

Functions

This section is empty.

Types

type Conn

type Conn struct {
	Socket *thrift.TSocket // used to create new clients
	Client interface{}     // a place to cache thrift clients.
	// contains filtered or unexported fields
}

Conn is the connection returned by the pool.

func (*Conn) Close

func (c *Conn) Close() error

Close does not put this connection back into the pool. This should be called if there is some sort of problem with the connection.

func (*Conn) Release

func (c *Conn) Release() error

Release puts the connection back in the pool and allows others to use it.

type ThriftSet

type ThriftSet struct {
	LastEvent  time.Time
	EventCount int
	// contains filtered or unexported fields
}

ThriftSet defines a set of thift connections. It loadbalances over the set of hosts using the "least active connections" strategy.

func New

func New(watch Watcher) *ThriftSet

New creates a new ThriftSet with default parameters.

func (*ThriftSet) Close

func (ts *ThriftSet) Close() error

Close releases the resources used by the set. ie. closes all connections. There may still be connections in flight when this function returns.

func (*ThriftSet) Event

func (ts *ThriftSet) Event() <-chan struct{}

Event returns the event channel. This channel will get an object whenever something changes with the list of endpoints. Mostly used for testing as this will trigger after all the watch events handling completes.

func (*ThriftSet) GetConn

func (ts *ThriftSet) GetConn() (*Conn, error)

GetConn will create a connection or return one from the idle list. It will use a host from the Watcher with the least ammount of active connections.

func (*ThriftSet) IdleTimeout

func (ts *ThriftSet) IdleTimeout() time.Duration

IdleTimeout returns the timeout for connections to live in the idle pool. This is part of the endpoints.Pooler interface.

func (*ThriftSet) IsClosed

func (ts *ThriftSet) IsClosed() bool

IsClosed returns true if the set has been closed. There still might be active connections in flight but they will be closed as they are released.

func (*ThriftSet) MaxActivePerHost

func (ts *ThriftSet) MaxActivePerHost() int

MaxActivePerHost returna the max active connections for a given host. This is part of the endpoints.Pooler interface.

func (*ThriftSet) MaxIdlePerHost

func (ts *ThriftSet) MaxIdlePerHost() int

MaxIdlePerHost returns the max number of idle connections to keep in the pool. This is part of the endpoints.Pooler interface.

func (*ThriftSet) OpenConn

func (ts *ThriftSet) OpenConn(hostPort string) (io.Closer, error)

OpenConn creats a new thrift socket for the host. This is used by the endpoints.Set to create connections for a given endpoint. This is part of the endpoints.Pooler interface.

func (*ThriftSet) SetIdleTimeout

func (ts *ThriftSet) SetIdleTimeout(t time.Duration)

SetIdleTimeout sets the amount of time a connection can live in the idle pool.

func (*ThriftSet) SetMaxActivePerHost

func (ts *ThriftSet) SetMaxActivePerHost(max int)

SetMaxActivePerHost sets the max number of active connections to a given endpoint.

func (*ThriftSet) SetMaxIdlePerHost

func (ts *ThriftSet) SetMaxIdlePerHost(max int)

SetMaxIdlePerHost sets the max number of connections in the idle pool.

func (*ThriftSet) SetTimeout

func (ts *ThriftSet) SetTimeout(t time.Duration)

SetTimeout sets the thrift request timeout for new connections in the pool. This should be set at startup, as existing/live connections will not be updated with these new values.

func (*ThriftSet) Timeout

func (ts *ThriftSet) Timeout() time.Duration

Timeout is the max length for a given request to the thrift service.

type Watcher

type Watcher interface {
	Endpoints() []string
	Event() <-chan struct{}
	IsClosed() bool
}

A Watcher represents how a serverset.Watch is used so we can use the zookeeper kind or a fixed set or stub it out for testing.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL