scylla

package module
v0.0.0-...-ce81923
Warning

This package is not in the latest version of its module.

Published: Oct 13, 2022 License: Apache-2.0 Imports: 6 Imported by: 0

README

ScyllaDB Go Driver

This is a high-performance client-side driver for ScyllaDB written in pure Go.

Note: this driver is currently in alpha. Bug reports and pull requests are welcome!

Installation

go get github.com/scylladb/scylla-go-driver

Examples

ctx := context.Background()

cfg := scylla.DefaultSessionConfig("exampleks", "192.168.100.100")
session, err := scylla.NewSession(ctx, cfg)
if err != nil {
	return err
}
defer session.Close()

requestCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

q, err := session.Prepare(requestCtx, "SELECT id, name FROM exampleks.names WHERE id=?")
if err != nil {
	return err
}

res, err := q.BindInt64(0, 64).Exec(requestCtx)
if err != nil {
	return err
}
fmt.Println(res.Rows)

Please see the full example program for more information.

All examples are available in the examples directory.

Features and Roadmap

The driver supports the following:

  • Session and query context support
  • Token-aware routing
  • Shard-aware routing (specific to ScyllaDB)
  • Prepared statements
  • Query paging
  • CQL binary protocol version 4
  • Configurable load balancing policies
  • Configurable retry policies
  • TLS support
  • Authentication support
  • Compression (LZ4 and Snappy algorithms)

Ongoing efforts:

  • Gocql drop-in replacement
  • More tests
  • More benchmarks

Missing features:

  • Cassandra support
  • Batch statements
  • Full CQL Events Support
  • Support for all CQL types (Generic binding)
  • Speculative Execution
  • CQL tracing
  • Automatic node status updating
  • Caching prepared statements
  • Non-default keyspace token-aware query routing

Supported Go Versions

Our driver's minimum supported Go version is 1.18.

Reference Documentation

License

This project is licensed under Apache License, Version 2.0

Documentation

Overview

Package scylla implements an efficient shard-aware driver for ScyllaDB.

Connecting to the cluster

Pass a keyspace and a list of initial node IP addresses to DefaultSessionConfig to create a new cluster configuration:

cfg := scylla.DefaultSessionConfig("keyspace", "192.168.1.1", "192.168.1.2", "192.168.1.3")

The port can be specified as part of the address; the above is equivalent to:

cfg := scylla.DefaultSessionConfig("keyspace", "192.168.1.1:9042", "192.168.1.2:9042", "192.168.1.3:9042")

It is recommended to use the IP address configured as broadcast_address or listen_address in the Scylla configuration, not a domain name. Events from Scylla carry the configured IP address, and the driver uses that address to index connected hosts.
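The two rules above (an optional port, and IP literals rather than domain names) can be checked before building the config. The following is a hypothetical helper, not part of the driver: withDefaultPort and isIPLiteral are illustrative names, and 9042 is assumed as the default CQL port, as in the equivalent example earlier.

```go
package main

import (
	"fmt"
	"net"
)

// defaultCQLPort is the standard CQL native transport port.
const defaultCQLPort = "9042"

// withDefaultPort returns addr unchanged if it already has a port and
// otherwise appends the default CQL port. net.JoinHostPort adds the
// brackets an IPv6 literal needs.
func withDefaultPort(addr string) string {
	if _, _, err := net.SplitHostPort(addr); err == nil {
		return addr
	}
	return net.JoinHostPort(addr, defaultCQLPort)
}

// isIPLiteral reports whether the host part of addr is an IP address
// rather than a domain name.
func isIPLiteral(addr string) bool {
	host := addr
	if h, _, err := net.SplitHostPort(addr); err == nil {
		host = h
	}
	return net.ParseIP(host) != nil
}

func main() {
	for _, a := range []string{"192.168.1.1", "192.168.1.2:9042", "::1", "node1.example.com"} {
		fmt.Println(withDefaultPort(a), isIPLiteral(a))
	}
}
```

A domain name such as node1.example.com is accepted by withDefaultPort but flagged by isIPLiteral, which is the case the recommendation warns about.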

Then you can customize more options (see SessionConfig):

cfg.Keyspace = "example"
cfg.Consistency = scylla.QUORUM

When ready, create a session from the configuration and a context.Context. Once the context is done, the session closes automatically: it stops sending requests and stops opening new connections.

Always Close the session when you are done with it, unless you are certain the context will be done:

session, err := scylla.NewSession(context.Background(), cfg)
if err != nil {
	return err
}
defer session.Close()

Authentication

The CQL protocol uses a SASL-based authentication mechanism, which consists of an exchange of server challenges and client responses. The details of the exchanged messages depend on the authenticator used.

Currently, the driver supports only the default password authenticator, which can be used like this:

cfg := scylla.DefaultSessionConfig("keyspace", "192.168.1.1", "192.168.1.2", "192.168.1.3")
cfg.Username = "user"
cfg.Password = "password"
session, err := scylla.NewSession(context.Background(), cfg)
if err != nil {
	return err
}
defer session.Close()
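For the default password authenticator, the client's response is a SASL PLAIN token: an empty authorization identity, then the username and the password, each preceded by a NUL byte. The sketch below builds that token; plainAuthToken is a hypothetical name, and it assumes the server runs the standard PasswordAuthenticator. The driver builds this token for you from Username and Password.

```go
package main

import "fmt"

// plainAuthToken builds a SASL PLAIN response: "\x00" + username +
// "\x00" + password (the leading empty field is the authorization identity).
func plainAuthToken(username, password string) []byte {
	token := make([]byte, 0, 2+len(username)+len(password))
	token = append(token, 0)
	token = append(token, username...)
	token = append(token, 0)
	token = append(token, password...)
	return token
}

func main() {
	fmt.Printf("%q\n", plainAuthToken("user", "password"))
}
```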

Transport layer security

Traffic between the client and the server can be secured with TLS. To enable it, set TLSConfig in the session config to your tls.Config.

For example:

cfg := scylla.DefaultSessionConfig("keyspace", "192.168.1.1", "192.168.1.2", "192.168.1.3")
cfg.TLSConfig = &tls.Config{
	// ... your TLS settings ...
}
session, err := scylla.NewSession(context.Background(), cfg)
if err != nil {
	return err
}
defer session.Close()
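One common way to fill in the tls.Config above is to trust a private CA loaded from PEM data. This is a sketch using only crypto/tls and crypto/x509; newTLSConfig is a hypothetical helper, the TLS 1.2 minimum is a choice rather than a driver requirement, and serverName should be whatever name your node certificates are issued for.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// newTLSConfig returns a client TLS config that trusts the CA certificates
// in caPEM and verifies the server certificate against serverName.
func newTLSConfig(caPEM []byte, serverName string) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no valid CA certificates found in PEM data")
	}
	return &tls.Config{
		RootCAs:    pool,
		ServerName: serverName,
		MinVersion: tls.VersionTLS12,
	}, nil
}

func main() {
	// With real data you would read the CA file, e.g. os.ReadFile("ca.pem").
	_, err := newTLSConfig([]byte("not a certificate"), "node1.example.com")
	fmt.Println(err)
}
```

The resulting *tls.Config is what would be assigned to cfg.TLSConfig.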

Data-center awareness and query routing

The driver by default will route prepared queries to nodes that hold data replicas based on partition key, and non-prepared queries in a round-robin fashion.

To route queries to the local DC first, use TokenAwareDCAwarePolicy. For example, if the datacenter you primarily want to connect to is called dc1 (as configured in the database):

cfg := scylla.DefaultSessionConfig("keyspace", "192.168.1.1", "192.168.1.2", "192.168.1.3")
cfg.HostSelectionPolicy = NewTokenAwareDCAwarePolicy("dc1")

The driver can only use token-aware routing for queries where all partition key columns are query parameters. For example, instead of

session.Query("select value from mytable where pk1 = 'abc' AND pk2 = ?")

use

session.Query("select value from mytable where pk1 = ? AND pk2 = ?")

Executing queries

Create queries with Session.Query. Query values can be reused between different executions but must not be modified while the query is executing.

To execute a query, use Query.Exec:

q := session.Query("INSERT INTO tweet (timeline, id, text) VALUES ('me', ?, 'hello world')")
_, err := q.BindInt64(0, 2022).Exec(ctx)
if err != nil {
	return err
}

Result rows can be read like this:

q := session.Query("SELECT name FROM names WHERE pk=?")
result, err := q.BindInt64(0, 2022).Exec(ctx)
if err != nil {
	return err
}
fmt.Println(len(result.Rows))
if len(result.Rows) > 0 {
	fmt.Println(result.Rows[0][0].AsText())
}

See Example for a complete program.

Prepared statements

The driver can prepare DML queries (SELECT/INSERT/UPDATE/DELETE/BATCH statements). The CQL protocol does not support preparing other query types.

Executing multiple queries concurrently

Session is safe to use from multiple goroutines, so to execute multiple queries concurrently, execute them from several worker goroutines. The driver provides a synchronous-looking API (as recommended for Go APIs), while queries are executed asynchronously at the protocol level.

Paging

The driver supports paging of results with automatic prefetch of 1 page, see Query.PageSize and Query.Iter.

It is also possible to control paging manually with Query.PageState. Manual paging is useful if you want to store the page state externally, for example in a URL to allow users to browse pages of a result. You might want to sign or encrypt the paging state when exposing it externally, since it contains data from primary keys.
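A sketch of signing paging state before putting it in a URL, using HMAC-SHA256 from the standard library. signPageState and verifyPageState are hypothetical helpers, and the token format (base64url state, a dot, base64url tag) is an arbitrary choice. Signing only detects tampering: the state stays readable, so encrypt it as well if primary key data must stay secret.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/base64"
	"errors"
	"fmt"
	"strings"
)

// signPageState encodes state for a URL and appends an HMAC-SHA256 tag.
func signPageState(key, state []byte) string {
	mac := hmac.New(sha256.New, key)
	mac.Write(state)
	enc := base64.RawURLEncoding // its alphabet contains no '.'
	return enc.EncodeToString(state) + "." + enc.EncodeToString(mac.Sum(nil))
}

// verifyPageState decodes a token from signPageState and rejects it if the
// tag does not match.
func verifyPageState(key []byte, token string) ([]byte, error) {
	data, tag, ok := strings.Cut(token, ".")
	if !ok {
		return nil, errors.New("malformed page token")
	}
	enc := base64.RawURLEncoding
	state, err := enc.DecodeString(data)
	if err != nil {
		return nil, err
	}
	got, err := enc.DecodeString(tag)
	if err != nil {
		return nil, err
	}
	mac := hmac.New(sha256.New, key)
	mac.Write(state)
	if !hmac.Equal(got, mac.Sum(nil)) {
		return nil, errors.New("page token signature mismatch")
	}
	return state, nil
}

func main() {
	key := []byte("server-side secret")
	token := signPageState(key, []byte{0x01, 0x02, 0x03})
	state, err := verifyPageState(key, token)
	fmt.Println(token, state, err)
}
```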

Paging state is specific to the CQL protocol version and the exact query used. It is meant as opaque state that should not be modified. If you send paging state from a different query or protocol version, the behavior is undefined (you might get unexpected results or an error from the server). For example, do not send paging state returned by a node using protocol version 3 to a node using protocol version 4. Also, when using protocol version 4, paging state is incompatible between Cassandra 2.2 and 3.0 (https://issues.apache.org/jira/browse/CASSANDRA-10880).

The driver does not check whether the paging state comes from the same protocol version or statement. You might want to validate it yourself, as this could be a problem if you store paging state externally. For example, if you store paging state in a URL, the URLs might break when you upgrade your cluster.

Call Query.PageState(nil) to fetch just the first page of the query results. To get the next page, pass the page state returned in Result.PageState by Query.Exec to Query.PageState of a subsequent query. If Result.PageState has zero length, there are no more pages available (or an error occurred).

Using too low a PageSize hurts performance; a value below 100 is probably too low. Although Scylla currently returns exactly PageSize items per page (except for the last page), the protocol authors explicitly reserved the right to return fewer or more items in a page for performance reasons, so don't rely on a page having exactly PageSize items.

Retries

Queries can be marked as idempotent. Marking a query as idempotent tells the driver that the query can be executed multiple times without changing its result. Non-idempotent queries are eligible for neither retries nor speculative execution.

Idempotent queries are retried in case of errors based on the configured RetryPolicy.

Custom policies

If you need a custom RetryPolicy or HostSelectionPolicy, see the transport package documentation.

Index

Constants

const (
	// ErrCodeServer indicates unexpected error on server-side.
	// See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1051-L1052
	ErrCodeServer frame.ErrorCode = 0x0000

	// ErrCodeProtocol indicates a protocol violation by some client message.
	// See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1053-L1055
	ErrCodeProtocol frame.ErrorCode = 0x000A

	// ErrCodeCredentials indicates missing required authentication.
	// See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1056-L1059
	ErrCodeCredentials frame.ErrorCode = 0x0100

	// ErrCodeUnavailable indicates unavailable error.
	// See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1060-L1070
	ErrCodeUnavailable frame.ErrorCode = 0x1000

	// ErrCodeOverloaded returned in case of request on overloaded node coordinator.
	// See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1071-L1072
	ErrCodeOverloaded frame.ErrorCode = 0x1001

	// ErrCodeBootstrapping returned from the coordinator node in bootstrapping phase.
	// See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1073-L1074
	ErrCodeBootstrapping frame.ErrorCode = 0x1002

	// ErrCodeTruncate indicates truncation exception.
	// See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1075
	ErrCodeTruncate frame.ErrorCode = 0x1003

	// ErrCodeWriteTimeout returned in case of timeout during the request write.
	// See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1076-L1107
	ErrCodeWriteTimeout frame.ErrorCode = 0x1100

	// ErrCodeReadTimeout returned in case of timeout during the request read.
	// See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1108-L1124
	ErrCodeReadTimeout frame.ErrorCode = 0x1200

	// ErrCodeReadFailure indicates request read error which is not covered by ErrCodeReadTimeout.
	// See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1125-L1139
	ErrCodeReadFailure frame.ErrorCode = 0x1300

	// ErrCodeFunctionFailure indicates an error in user-defined function.
	// See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1140-L1146
	ErrCodeFunctionFailure frame.ErrorCode = 0x1400

	// ErrCodeWriteFailure indicates request write error which is not covered by ErrCodeWriteTimeout.
	// See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1147-L1180
	ErrCodeWriteFailure frame.ErrorCode = 0x1500

	// ErrCodeSyntax indicates the syntax error in the query.
	// See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1182
	ErrCodeSyntax frame.ErrorCode = 0x2000

	// ErrCodeUnauthorized indicates access rights violation by user on performed operation.
	// See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1183-L1184
	ErrCodeUnauthorized frame.ErrorCode = 0x2100

	// ErrCodeInvalid indicates invalid query error which is not covered by ErrCodeSyntax.
	// See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1185
	ErrCodeInvalid frame.ErrorCode = 0x2200

	// ErrCodeConfig indicates the configuration error.
	// See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1186
	ErrCodeConfig frame.ErrorCode = 0x2300

	// ErrCodeAlreadyExists is returned for the requests creating the existing keyspace/table.
	// See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1187-L1196
	ErrCodeAlreadyExists frame.ErrorCode = 0x2400

	// ErrCodeUnprepared returned from the host for prepared statement which is unknown.
	// See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1197-L1200
	ErrCodeUnprepared frame.ErrorCode = 0x2500
)

See CQL Binary Protocol v4, section 9 for more details. https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1046-L1200
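A retry decision often starts from the error code. The sketch below groups some of the codes above into "usually transient" (a temporary cluster condition) and everything else (a problem with the request itself). The grouping is a heuristic assumption, not something defined by the protocol spec or the driver, and it is still only safe to act on for idempotent queries. ErrorCode here is a local mirror of frame.ErrorCode so the sketch is self-contained.

```go
package main

import "fmt"

// ErrorCode mirrors frame.ErrorCode; the values are copied from the
// constants above.
type ErrorCode uint16

const (
	ErrCodeUnavailable   ErrorCode = 0x1000
	ErrCodeOverloaded    ErrorCode = 0x1001
	ErrCodeBootstrapping ErrorCode = 0x1002
	ErrCodeWriteTimeout  ErrorCode = 0x1100
	ErrCodeReadTimeout   ErrorCode = 0x1200
	ErrCodeSyntax        ErrorCode = 0x2000
	ErrCodeInvalid       ErrorCode = 0x2200
)

// transient reports whether a code usually reflects a temporary condition
// that may go away on retry, possibly on another node.
func transient(code ErrorCode) bool {
	switch code {
	case ErrCodeUnavailable, ErrCodeOverloaded, ErrCodeBootstrapping,
		ErrCodeWriteTimeout, ErrCodeReadTimeout:
		return true
	default:
		return false
	}
}

func main() {
	for _, c := range []ErrorCode{ErrCodeReadTimeout, ErrCodeSyntax} {
		fmt.Printf("0x%04X transient=%v\n", uint16(c), transient(c))
	}
}
```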

Variables

var (
	ErrClosedIter = fmt.Errorf("iter is closed")
	ErrNoMoreRows = fmt.Errorf("no more rows left")
)
var (
	ErrNoHosts   = fmt.Errorf("error in session config: no hosts given")
	ErrEventType = fmt.Errorf("error in session config: invalid event\npossible events:\n" +
		"TopologyChange EventType = \"TOPOLOGY_CHANGE\"\n" +
		"StatusChange   EventType = \"STATUS_CHANGE\"\n" +
		"SchemaChange   EventType = \"SCHEMA_CHANGE\"")
	ErrConsistency = fmt.Errorf("error in session config: invalid consistency\npossible consistencies are:\n" +
		"ANY         Consistency = 0x0000\n" +
		"ONE         Consistency = 0x0001\n" +
		"TWO         Consistency = 0x0002\n" +
		"THREE       Consistency = 0x0003\n" +
		"QUORUM      Consistency = 0x0004\n" +
		"ALL         Consistency = 0x0005\n" +
		"LOCALQUORUM Consistency = 0x0006\n" +
		"EACHQUORUM  Consistency = 0x0007\n" +
		"SERIAL      Consistency = 0x0008\n" +
		"LOCALSERIAL Consistency = 0x0009\n" +
		"LOCALONE    Consistency = 0x000A")
	ErrNoConnection = fmt.Errorf("no connection to execute the query on")
)
var ErrNoQueryResults = fmt.Errorf("no query results to be fetched")

Functions

This section is empty.

Types

type Compression

type Compression = frame.Compression
var (
	Snappy Compression = frame.Snappy
	Lz4    Compression = frame.Lz4
)

type Consistency

type Consistency = uint16
const (
	ANY         Consistency = 0x0000
	ONE         Consistency = 0x0001
	TWO         Consistency = 0x0002
	THREE       Consistency = 0x0003
	QUORUM      Consistency = 0x0004
	ALL         Consistency = 0x0005
	LOCALQUORUM Consistency = 0x0006
	EACHQUORUM  Consistency = 0x0007
	SERIAL      Consistency = 0x0008
	LOCALSERIAL Consistency = 0x0009
	LOCALONE    Consistency = 0x000A
)
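The QUORUM levels require a strict majority of replicas, floor(RF/2) + 1. For QUORUM, RF is the sum of replication factors across all datacenters. For LOCALQUORUM it is the local datacenter's RF, and EACHQUORUM needs a majority in every datacenter. ONE, TWO and THREE need that fixed number of replicas. A short sketch of the arithmetic (quorum is an illustrative helper, not a driver function):

```go
package main

import "fmt"

// quorum returns how many replicas must respond for a QUORUM-style
// consistency level with replication factor rf: floor(rf/2) + 1.
func quorum(rf int) int { return rf/2 + 1 }

func main() {
	for rf := 1; rf <= 5; rf++ {
		q := quorum(rf)
		fmt.Printf("RF=%d: QUORUM needs %d replicas, tolerates %d down\n", rf, q, rf-q)
	}
}
```

Note that RF=4 tolerates no more failures than RF=3 at QUORUM, which is one reason odd replication factors are common.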

type EventType

type EventType = string
const (
	TopologyChange EventType = "TOPOLOGY_CHANGE"
	StatusChange   EventType = "STATUS_CHANGE"
	SchemaChange   EventType = "SCHEMA_CHANGE"
)

type Iter

type Iter struct {
	// contains filtered or unexported fields
}

Iter is used to execute paged queries, prefetching the next page. Close should be called when it goes out of use to release resources assigned to it.

func (*Iter) Close

func (it *Iter) Close()

Close releases the resources assigned to the Iter. Using the Iter after it has been closed results in an error.

func (*Iter) Next

func (it *Iter) Next() (frame.Row, error)

Next returns the next row of the query result, potentially executing the query.

type Query

type Query struct {
	// contains filtered or unexported fields
}

func (*Query) AsyncExec

func (q *Query) AsyncExec(ctx context.Context)

func (*Query) BindInt64

func (q *Query) BindInt64(pos int, v int64) *Query

func (*Query) Compression

func (q *Query) Compression() bool

func (*Query) Exec

func (q *Query) Exec(ctx context.Context) (Result, error)

func (*Query) Fetch

func (q *Query) Fetch() (Result, error)

Fetch returns results in the same order they were queried.

func (*Query) Idempotent

func (q *Query) Idempotent() bool

func (*Query) Iter

func (q *Query) Iter(ctx context.Context) Iter

Iter returns an iterator that can be used to read paged queries row by row.

func (*Query) PageSize

func (q *Query) PageSize() int32

func (*Query) SetCompression

func (q *Query) SetCompression(v bool)

func (*Query) SetIdempotent

func (q *Query) SetIdempotent(v bool)

func (*Query) SetPageSize

func (q *Query) SetPageSize(v int32)

type Result

type Result transport.QueryResult

type Session

type Session struct {
	// contains filtered or unexported fields
}

func NewSession

func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error)

func (*Session) AwaitSchemaAgreement

func (s *Session) AwaitSchemaAgreement(ctx context.Context, timeout time.Duration) error

AwaitSchemaAgreement waits for schema agreement, checking it once every SchemaAgreementInterval specified in SessionConfig. If the schema is not in agreement after the timeout duration, or once the context is done, it returns an error.

func (*Session) CheckSchemaAgreement

func (s *Session) CheckSchemaAgreement(ctx context.Context) (bool, error)

func (*Session) Close

func (s *Session) Close()

func (*Session) NewTokenAwareDCAwarePolicy

func (s *Session) NewTokenAwareDCAwarePolicy(localDC string) transport.HostSelectionPolicy

func (*Session) NewTokenAwarePolicy

func (s *Session) NewTokenAwarePolicy() transport.HostSelectionPolicy

func (*Session) Prepare

func (s *Session) Prepare(ctx context.Context, content string) (Query, error)

Prepare prepares a DML query on all cluster nodes. It returns successfully if the prepare succeeds on at least one node.

func (*Session) Query

func (s *Session) Query(content string) Query

type SessionConfig

type SessionConfig struct {
	Hosts  []string
	Events []EventType
	// Default: TokenAwarePolicy.
	HostSelectionPolicy transport.HostSelectionPolicy
	// Default: DefaultRetryPolicy.
	RetryPolicy transport.RetryPolicy

	// Default: 200 milliseconds.
	SchemaAgreementInterval time.Duration
	// Controls the timeout for the automatic wait for schema agreement after sending a schema-altering statement.
	// If less or equal to 0, the automatic schema agreement is disabled.
	// Default: 60 seconds.
	AutoAwaitSchemaAgreementTimeout time.Duration

	transport.ConnConfig
}

func DefaultSessionConfig

func DefaultSessionConfig(keyspace string, hosts ...string) SessionConfig

func (SessionConfig) Clone

func (cfg SessionConfig) Clone() SessionConfig

func (*SessionConfig) Validate

func (cfg *SessionConfig) Validate() error

Directories

Path Synopsis
experiments
Package transport implements networking and routing for the Scylla Go driver.
