client

package
v0.0.0-...-5e8aad0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2022 License: Apache-2.0 Imports: 20 Imported by: 5

Documentation

Overview

Package client contains basic utilities to exchange native protocol frames with compatible endpoints.

The main type in this package is CqlClient, a simple CQL client that can be used to test any CQL-compatible backend.

Please note that code in this package is intended mostly to help driver implementors test their libraries; it should not be used in production.

Index

Constants

View Source
const (
	DefaultConnectTimeout = time.Second * 5
	DefaultReadTimeout    = time.Second * 12
)
View Source
const (
	DefaultMaxInFlight = 1024
	DefaultMaxPending  = 10
)
View Source
const (
	DefaultAcceptTimeout = time.Second * 60
	DefaultIdleTimeout   = time.Hour
)
View Source
const (
	ServerStateNotStarted = int32(iota)
	ServerStateRunning    = int32(iota)
	ServerStateClosed     = int32(iota)
)
View Source
const DefaultMaxConnections = 128
View Source
const ManagedStreamId int16 = 0

Variables

This section is empty.

Functions

func PerformHandshake

func PerformHandshake(clientConn *CqlClientConnection, serverConn *CqlServerConnection, version primitive.ProtocolVersion, streamId int16) error

PerformHandshake performs a handshake between the given client and server connections, using the provided protocol version. The handshake will use stream id 1, unless the client connection is in managed mode.

Types

type AuthCredentials

type AuthCredentials struct {
	Username string
	Password string
}

AuthCredentials encapsulates a username and a password to use with plain-text authenticators.

func (AuthCredentials) Copy

func (c AuthCredentials) Copy() *AuthCredentials

func (*AuthCredentials) Marshal

func (c *AuthCredentials) Marshal() []byte

Marshal serializes the current credentials to an authentication token with the expected format for PasswordAuthenticator.

func (*AuthCredentials) String

func (c *AuthCredentials) String() string

func (*AuthCredentials) Unmarshal

func (c *AuthCredentials) Unmarshal(token []byte) error

Unmarshal deserializes an authentication token with the expected format for PasswordAuthenticator into the current AuthCredentials.

type CqlClient

type CqlClient struct {
	// The remote contact point address to connect to.
	RemoteAddress string
	// The AuthCredentials for authenticated servers. If nil, no authentication will be used.
	Credentials *AuthCredentials
	// The compression to use; if unspecified, no compression will be used.
	Compression primitive.Compression
	// The maximum number of in-flight requests to apply for each connection created with Connect. Must be strictly
	// positive.
	MaxInFlight int
	// The maximum number of pending responses awaiting delivery to store per request. Must be strictly positive.
	// This is only useful when using continuous paging, a feature specific to DataStax Enterprise.
	MaxPending int
	// The timeout to apply when establishing new connections.
	ConnectTimeout time.Duration
	// The timeout to apply when waiting for incoming responses.
	ReadTimeout time.Duration
	// An optional list of handlers to handle incoming events.
	EventHandlers []EventHandler
	// TLSConfig is the TLS configuration to use.
	TLSConfig *tls.Config
}

CqlClient is a client for Cassandra-compatible backends. It is preferable to create CqlClient instances using the constructor function NewCqlClient. Once the client is created and properly configured, use Connect or ConnectAndInit to establish new connections to the server.

func NewCqlClient

func NewCqlClient(remoteAddress string, credentials *AuthCredentials) *CqlClient

NewCqlClient Creates a new CqlClient with default options. Leave credentials nil to opt out from authentication.

func (*CqlClient) Connect

func (client *CqlClient) Connect(ctx context.Context) (*CqlClientConnection, error)

Connect establishes a new TCP connection to the client's remote address. Set ctx to context.Background if no parent context exists. The returned CqlClientConnection is ready to use, but one must initialize it manually, for example by calling CqlClientConnection.InitiateHandshake. Alternatively, use ConnectAndInit to get a fully-initialized connection.

func (*CqlClient) ConnectAndInit

func (client *CqlClient) ConnectAndInit(
	ctx context.Context,
	version primitive.ProtocolVersion,
	streamId int16,
) (*CqlClientConnection, error)

ConnectAndInit establishes a new TCP connection to the server, then initiates a handshake procedure using the specified protocol version. The CqlClientConnection connection will be fully initialized when this method returns. Use stream id zero to activate automatic stream id management. Set ctx to context.Background if no parent context exists.

func (*CqlClient) String

func (client *CqlClient) String() string

type CqlClientConnection

type CqlClientConnection struct {
	// contains filtered or unexported fields
}

CqlClientConnection encapsulates a TCP client connection to a remote Cassandra-compatible backend. CqlClientConnection instances should be created by calling CqlClient.Connect or CqlClient.ConnectAndInit.

func (*CqlClientConnection) Close

func (c *CqlClientConnection) Close() (err error)

func (*CqlClientConnection) Credentials

func (c *CqlClientConnection) Credentials() *AuthCredentials

Credentials returns a copy of the connection's AuthCredentials, if any, or nil if no authentication was configured.

func (*CqlClientConnection) EventChannel

func (c *CqlClientConnection) EventChannel() EventChannel

EventChannel returns a channel for listening to incoming events received on this connection. This channel will be closed when the connection is closed. If this connection has already been closed, this method returns nil.

func (*CqlClientConnection) InitiateHandshake

func (c *CqlClientConnection) InitiateHandshake(version primitive.ProtocolVersion, streamId int16) (err error)

InitiateHandshake initiates the handshake procedure to initialize the client connection, using the given protocol version. The handshake will use authentication if the connection was created with auth credentials; otherwise it will proceed without authentication. Use stream id zero to activate automatic stream id management.

func (*CqlClientConnection) IsClosed

func (c *CqlClientConnection) IsClosed() bool

func (*CqlClientConnection) LocalAddr

func (c *CqlClientConnection) LocalAddr() net.Addr

LocalAddr returns the connection's local address (that is, the client address).

func (*CqlClientConnection) NewStartupRequest

func (c *CqlClientConnection) NewStartupRequest(version primitive.ProtocolVersion, streamId int16) (*frame.Frame, error)

NewStartupRequest is a convenience method to create a new STARTUP request frame. The compression option will be automatically set to the appropriate compression algorithm, depending on whether the connection was configured to use a compressor. Use stream id zero to activate automatic stream id management.

func (*CqlClientConnection) Receive

Receive is a convenience method that takes an InFlightRequest obtained through Send and waits until the next response frame is received, or an error occurs, whichever happens first. If the in-flight request is completed already without returning more frames, this method return a nil frame and a nil error.

func (*CqlClientConnection) ReceiveEvent

func (c *CqlClientConnection) ReceiveEvent() (*frame.Frame, error)

ReceiveEvent waits until an event frame is received, or the configured read timeout is triggered, or the connection is closed, whichever happens first. Returns the event frame, if any.

func (*CqlClientConnection) RemoteAddr

func (c *CqlClientConnection) RemoteAddr() net.Addr

RemoteAddr returns the connection's remote address (that is, the server address).

func (*CqlClientConnection) Send

Send sends the given request frame and returns a receive channel that can be used to receive response frames and errors matching the request's stream id. The channel will be closed after receiving the last frame, or if the configured read timeout is triggered, or if the connection itself is closed, whichever happens first. Stream id management: if the frame's stream id is ManagedStreamId (0), it is assumed that the frame's stream id is to be automatically assigned by the connection upon write. Users are free to choose between managed stream ids or manually assigned ones, but it is not recommended mixing managed stream ids with non-managed ones on the same connection.

func (*CqlClientConnection) SendAndReceive

func (c *CqlClientConnection) SendAndReceive(f *frame.Frame) (*frame.Frame, error)

SendAndReceive is a convenience method chaining a call to Send to a call to Receive.

func (*CqlClientConnection) String

func (c *CqlClientConnection) String() string

type CqlServer

type CqlServer struct {
	// ListenAddress is the address to listen to.
	ListenAddress string
	// Credentials is the AuthCredentials to use. If nil, no authentication will be used; otherwise, clients will be
	// required to authenticate with plain-text auth using the same credentials.
	Credentials *AuthCredentials
	// MaxConnections is the maximum number of open client connections to accept. Must be strictly positive.
	MaxConnections int
	// MaxInFlight is the maximum number of in-flight requests to apply for each connection created with Accept. Must
	// be strictly positive.
	MaxInFlight int
	// AcceptTimeout is the timeout to apply when accepting new connections.
	AcceptTimeout time.Duration
	// IdleTimeout is the timeout to apply for closing idle connections.
	IdleTimeout time.Duration
	// RequestHandlers is an optional list of handlers to handle incoming requests.
	RequestHandlers []RequestHandler
	// RequestRawHandlers is an optional list of handlers to handle incoming requests and return a response in a byte slice format.
	RequestRawHandlers []RawRequestHandler
	// TLSConfig is the TLS configuration to use.
	TLSConfig *tls.Config
	// contains filtered or unexported fields
}

CqlServer is a minimalistic server stub that can be used to mimic CQL-compatible backends. It is preferable to create CqlServer instances using the constructor function NewCqlServer. Once the server is properly created and configured, use Start to start the server, then call Accept or AcceptAny to accept incoming client connections.

func NewCqlServer

func NewCqlServer(listenAddress string, credentials *AuthCredentials) *CqlServer

NewCqlServer creates a new CqlServer with default options. Leave credentials nil to opt out from authentication.

func (*CqlServer) Accept

func (server *CqlServer) Accept(client *CqlClientConnection) (*CqlServerConnection, error)

Accept waits until the given client address is accepted, the configured timeout is triggered, or the server is closed, whichever happens first.

func (*CqlServer) AcceptAny

func (server *CqlServer) AcceptAny() (*CqlServerConnection, error)

AcceptAny waits until any client is accepted, the configured timeout is triggered, or the server is closed, whichever happens first. This method is useful when the client is not known in advance.

func (*CqlServer) AllAcceptedClients

func (server *CqlServer) AllAcceptedClients() ([]*CqlServerConnection, error)

AllAcceptedClients returns a list of all the currently active server connections.

func (*CqlServer) Bind

Bind is a convenience method to connect a CqlClient to this CqlServer. The returned connections will be open, but not initialized (i.e., no handshake performed). The server must be started prior to calling this method.

func (*CqlServer) BindAndInit

func (server *CqlServer) BindAndInit(
	client *CqlClient,
	ctx context.Context,
	version primitive.ProtocolVersion,
	streamId int16,
) (*CqlClientConnection, *CqlServerConnection, error)

BindAndInit is a convenience method to connect a CqlClient to this CqlServer. The returned connections will be open and initialized (i.e., handshake is already performed). The server must be started prior to calling this method. Use stream id zero to activate automatic stream id management.

func (*CqlServer) Close

func (server *CqlServer) Close() (err error)

func (*CqlServer) IsClosed

func (server *CqlServer) IsClosed() bool

func (*CqlServer) IsNotStarted

func (server *CqlServer) IsNotStarted() bool

func (*CqlServer) IsRunning

func (server *CqlServer) IsRunning() bool

func (*CqlServer) Start

func (server *CqlServer) Start(ctx context.Context) (err error)

Start starts the server and binds to its listen address. This method must be called before calling Accept. Set ctx to context.Background if no parent context exists.

func (*CqlServer) String

func (server *CqlServer) String() string

type CqlServerConnection

type CqlServerConnection struct {
	// contains filtered or unexported fields
}

CqlServerConnection encapsulates a TCP server connection to a remote CQL client. CqlServerConnection instances should be created by calling CqlServer.Accept or CqlServer.Bind.

func (*CqlServerConnection) AcceptHandshake

func (c *CqlServerConnection) AcceptHandshake() (err error)

AcceptHandshake Listens for a client STARTUP request and proceeds with the server-side handshake procedure. Authentication will be required if the connection was created with auth credentials; otherwise the handshake will proceed without authentication. This method is intended for use when server-side handshake should be triggered manually. For automatic server-side handshake, consider using HandshakeHandler instead.

func (*CqlServerConnection) Close

func (c *CqlServerConnection) Close() (err error)

func (*CqlServerConnection) Credentials

func (c *CqlServerConnection) Credentials() *AuthCredentials

Credentials Returns a copy of the connection's AuthCredentials, if any, or nil if no authentication was configured.

func (*CqlServerConnection) GetConn

func (c *CqlServerConnection) GetConn() net.Conn

func (*CqlServerConnection) IsClosed

func (c *CqlServerConnection) IsClosed() bool

func (*CqlServerConnection) LocalAddr

func (c *CqlServerConnection) LocalAddr() net.Addr

LocalAddr Returns the connection's local address (that is, the client address).

func (*CqlServerConnection) Receive

func (c *CqlServerConnection) Receive() (*frame.Frame, error)

Receive waits until the next request frame is received, or the configured idle timeout is triggered, or the connection itself is closed, whichever happens first.

func (*CqlServerConnection) RemoteAddr

func (c *CqlServerConnection) RemoteAddr() net.Addr

RemoteAddr Returns the connection's remote address (that is, the server address).

func (*CqlServerConnection) Send

func (c *CqlServerConnection) Send(f *frame.Frame) error

Send sends the given response frame.

func (*CqlServerConnection) SendRaw

func (c *CqlServerConnection) SendRaw(rawResponse []byte) error

SendRaw sends the given response frame (already encoded).

func (*CqlServerConnection) String

func (c *CqlServerConnection) String() string

type EventChannel

type EventChannel <-chan *frame.Frame

EventChannel is a receive-only channel for incoming events. A receive channel can be obtained through CqlClientConnection.EventChannel.

type EventHandler

type EventHandler func(event *frame.Frame, conn *CqlClientConnection)

EventHandler An event handler is a callback function that gets invoked whenever a CqlClientConnection receives an incoming event.

type InFlightRequest

type InFlightRequest interface {

	// StreamId is the in-flight request stream id.
	StreamId() int16

	// Incoming returns a channel to receive incoming frames for this in-flight request. Typically the channel will
	// only ever emit one single frame, except when using continuous paging (DataStax Enterprise only).
	// The returned channel is never nil. It is closed after receiving the last frame, or if an error occurs
	// (typically a timeout), whichever happens first; when the channel is closed, IsDone returns true.
	// If the channel is closed because of an error, Err will return that error, otherwise it will return nil.
	// Successive calls to Incoming return the same channel.
	Incoming() <-chan *frame.Frame

	// IsDone returns true if Incoming is closed, and false otherwise.
	IsDone() bool

	// Err returns nil if Incoming is not yet closed.
	// If Incoming is closed, Err returns either nil if the channel was closed normally, or a non-nil error explaining
	// why the channel was closed abnormally.
	// After Err returns a non-nil error, successive calls to Err return the same error.
	Err() error
}

InFlightRequest is an in-flight request sent through CqlClientConnection.Send.

type PlainTextAuthenticator

type PlainTextAuthenticator struct {
	Credentials *AuthCredentials
}

A simple authenticator to perform plain-text authentications for CQL clients.

func (*PlainTextAuthenticator) EvaluateChallenge

func (a *PlainTextAuthenticator) EvaluateChallenge(challenge []byte) ([]byte, error)

func (*PlainTextAuthenticator) InitialResponse

func (a *PlainTextAuthenticator) InitialResponse(authenticator string) ([]byte, error)

type RawRequestHandler

type RawRequestHandler func(request *frame.Frame, conn *CqlServerConnection, ctx RequestHandlerContext) (encodedResponse []byte)

RawRequestHandler is similar to RequestHandler but returns an already encoded response in byte slice format, this can be used to return responses that the embedded codecs can't encode

type RequestHandler

type RequestHandler func(request *frame.Frame, conn *CqlServerConnection, ctx RequestHandlerContext) (response *frame.Frame)

RequestHandler is a callback function that gets invoked whenever a CqlServerConnection receives an incoming frame. The handler function should inspect the request frame and determine if it can handle the response for it. If so, it should return a non-nil response frame. When that happens, no further handlers will be tried for the incoming request. If a handler returns nil, it is assumed that it was not able to handle the request, in which case another handler, if any, may be tried.

var HandshakeHandler RequestHandler = func(request *frame.Frame, conn *CqlServerConnection, ctx RequestHandlerContext) (response *frame.Frame) {
	if ctx.GetAttribute(handshakeStateKey) == handshakeStateDone {
		return
	}
	version := request.Header.Version
	id := request.Header.StreamId
	switch msg := request.Body.Message.(type) {
	case *message.Options:
		log.Debug().Msgf("%v: [handshake handler]: intercepted OPTIONS before STARTUP", conn)
		response = frame.NewFrame(version, id, &message.Supported{})
	case *message.Startup:
		if conn.Credentials() == nil {
			ctx.PutAttribute(handshakeStateKey, handshakeStateDone)
			log.Info().Msgf("%v: [handshake handler]: handshake successful", conn)
			response = frame.NewFrame(version, id, &message.Ready{})
		} else {
			ctx.PutAttribute(handshakeStateKey, handshakeStateStarted)
			response = frame.NewFrame(version, id, &message.Authenticate{Authenticator: "org.apache.cassandra.auth.PasswordAuthenticator"})
		}
	case *message.AuthResponse:
		if ctx.GetAttribute(handshakeStateKey) == handshakeStateStarted {
			userCredentials := &AuthCredentials{}
			if err := userCredentials.Unmarshal(msg.Token); err == nil {
				serverCredentials := conn.Credentials()
				if userCredentials.Username == serverCredentials.Username &&
					userCredentials.Password == serverCredentials.Password {
					log.Info().Msgf("%v: [handshake handler]: handshake successful", conn)
					response = frame.NewFrame(version, id, &message.AuthSuccess{})
				} else {
					log.Error().Msgf("%v: [handshake handler]: authentication error: invalid credentials", conn)
					response = frame.NewFrame(version, id, &message.AuthenticationError{ErrorMessage: "invalid credentials"})
				}
				ctx.PutAttribute(handshakeStateKey, handshakeStateDone)
			}
		} else {
			ctx.PutAttribute(handshakeStateKey, handshakeStateDone)
			log.Error().Msgf("%v: [handshake handler]: expected STARTUP, got AUTH_RESPONSE", conn)
			response = frame.NewFrame(version, id, &message.ProtocolError{ErrorMessage: "handshake failed"})
		}
	default:
		ctx.PutAttribute(handshakeStateKey, handshakeStateDone)
		log.Error().Msgf("%v: [handshake handler]: expected OPTIONS, STARTUP or AUTH_RESPONSE, got %v", conn, msg)
		response = frame.NewFrame(version, id, &message.ProtocolError{ErrorMessage: "handshake failed"})
	}
	return
}

HandshakeHandler is a RequestHandler to handle server-side handshakes. This is an alternative to CqlServerConnection.AcceptHandshake to make the server connection automatically handle all incoming handshake attempts.

var HeartbeatHandler RequestHandler = func(request *frame.Frame, conn *CqlServerConnection, _ RequestHandlerContext) (response *frame.Frame) {
	if _, ok := request.Body.Message.(*message.Options); ok {
		log.Debug().Msgf("%v: [heartbeat handler]: received heartbeat probe", conn)
		response = frame.NewFrame(request.Header.Version, request.Header.StreamId, &message.Supported{})
	}
	return
}

A RequestHandler to handle server-side heartbeats. This handler assumes that every OPTIONS request is a heartbeat probe and replies with a SUPPORTED response.

var RegisterHandler RequestHandler = func(request *frame.Frame, conn *CqlServerConnection, _ RequestHandlerContext) (response *frame.Frame) {
	if register, ok := request.Body.Message.(*message.Register); ok {
		log.Debug().Msgf("%v: [register handler]: received REGISTER: %v", conn, register.EventTypes)
		response = frame.NewFrame(request.Header.Version, request.Header.StreamId, &message.Ready{})
	}
	return
}

A RequestHandler to handle USE requests. This handler intercepts REGISTER requests and replies with READY.

func NewCompositeRequestHandler

func NewCompositeRequestHandler(handlers ...RequestHandler) RequestHandler

Creates a new composite RequestHandler combining many child handlers together. The child handlers are invoked in order. Registering a composite handler is functionally equivalent to the individual registration of its child handlers, but allows to ensure that handlers that are supposed to work together are all registered and invoked in proper order.

func NewDriverConnectionInitializationHandler

func NewDriverConnectionInitializationHandler(cluster string, datacenter string, onKeyspaceSet func(string)) RequestHandler

A RequestHandler to fully initialize a connection initiated by a DataStax driver. This handler intercepts all the requests that a driver typically issues when opening a new connection and / or probing for its liveness: - Heartbeats - Handshake, including with plain-text authentication if configured - USE queries - REGISTER requests - Queries targeting system.local and system.peers tables

func NewPreparedStatementHandler

func NewPreparedStatementHandler(
	query string,
	variables *message.VariablesMetadata,
	columns *message.RowsMetadata,
	rows func(options *message.QueryOptions) message.RowSet,
) RequestHandler

A RequestHandler to handle PREPARE and EXECUTE requests for the given query string, effectively emulating the behavior of a statement being prepared, then executed. When a PREPARE request targets the query string, it is intercepted and the handler replies with a PreparedResult. The prepared id is simply the query string bytes, and the metadata is the metadata provided to the function. When an EXECUTE request targets the same query string: - If the request was previously prepared, returns a Rows RESULT response; the actual data returned is produced by invoking the provided rows factory function, which allows the result to be customized according to the bound variables provided with the EXECUTE message. - If the request was not prepared, returns an Unprepared ERROR response.

func NewSetKeyspaceHandler

func NewSetKeyspaceHandler(onKeyspaceSet func(string)) RequestHandler

A RequestHandler to handle USE queries. This handler intercepts QUERY requests with a USE statement and replies with a message.SetKeyspaceResult. The provided callback function will be invoked with the new keyspace.

func NewSystemTablesHandler

func NewSystemTablesHandler(cluster string, datacenter string) RequestHandler

Creates a new RequestHandler to handle queries to system tables (system.local and system.peers).

type RequestHandlerContext

type RequestHandlerContext interface {
	// PutAttribute puts the given value in this context under the given key name.
	// Will override any previously-stored value under that key.
	PutAttribute(name string, value interface{})
	// GetAttribute retrieves the value stored in this context under the given key name.
	// Returns nil if nil is stored, or if the key does not exist.
	GetAttribute(name string) interface{}
}

RequestHandlerContext is the RequestHandler invocation context. Each invocation of a given RequestHandler will be passed one instance of a RequestHandlerContext, that remains the same between invocations. This allows handlers to become stateful if required.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL