Documentation ¶
Overview ¶
Package scylla implements an efficient shard-aware driver for ScyllaDB.
Connecting to the cluster ¶
Pass a keyspace and a list of initial node IP addresses to DefaultSessionConfig to create a new cluster configuration:
cfg := scylla.DefaultSessionConfig("keyspace", "192.168.1.1", "192.168.1.2", "192.168.1.3")
Port can be specified as part of the address, the above is equivalent to:
cfg := scylla.DefaultSessionConfig("192.168.1.1:9042", "192.168.1.2:9042", "192.168.1.3:9042")
It is recommended to use the value set in the Scylla config for broadcast_address or listen_address, an IP address not a domain name. This is because events from Scylla will use the configured IP address, which is used to index connected hosts.
Then you can customize more options (see SessionConfig):
cfg.Keyspace = "example" cfg.Consistency = scylla.QUORUM
When ready, create a session from the configuration and context.Context, once the context is done session will close automatically, stopping requests from being sent and new connections from being made.
Don't forget to Close the session once you are done with it and not sure context will be done:
session, err := scylla.CreateSession(context.Background(), cfg) if err != nil { return err } defer session.Close()
Authentication ¶
CQL protocol uses a SASL-based authentication mechanism and so consists of an exchange of server challenges and client response pairs. The details of the exchanged messages depend on the authenticator used.
Currently the driver supports only default password authenticator which can be used like this:
cfg := scylla.DefaultSessionConfig("keyspace", "192.168.1.1", "192.168.1.2", "192.168.1.3") cfg.Username = "user" cfg.Password = "password" session, err := scylla.CreateSession(context.Background(), cfg) if err != nil { return err } defer session.Close()
Transport layer security ¶
It is possible to secure traffic between the client and server with TLS, to do so just pass your tls.Config to session config.
For example:
cfg := scylla.DefaultSessionConfig("keyspace", "192.168.1.1", "192.168.1.2", "192.168.1.3") cfg.TLSConfig = &tls.Config{ ... } session, err := scylla.CreateSession(context.Background(), cfg) if err != nil { return err } defer session.Close()
Data-center awareness and query routing ¶
The driver by default will route prepared queries to nodes that hold data replicas based on partition key, and non-prepared queries in a round-robin fashion.
To route queries to local DC first, use TokenAwareDCAwarePolicy. For example, if the datacenter you want to primarily connect is called dc1 (as configured in the database):
cfg := scylla.DefaultSessionConfig("keyspace", "192.168.1.1", "192.168.1.2", "192.168.1.3") cfg.HostSelectionPolicy = NewTokenAwareDCAwarePolicy("dc1")
The driver can only use token-aware routing for queries where all partition key columns are query parameters. For example, instead of
session.Query("select value from mytable where pk1 = 'abc' AND pk2 = ?")
use
session.Query("select value from mytable where pk1 = ? AND pk2 = ?")
Executing queries ¶
Create queries with Session.Query. Query values can be reused between different but must not be modified during executions of the query.
To execute a query use Query.Exec:
q := session.Query(`INSERT INTO tweet (timeline, id, text) VALUES ("me", ?, "hello world")`, _, err := q.BindInt64(0, 2022).Exec(ctx)
Result rows can be read like this
q := session.Query("SELECT name FROM names WHERE pk=?") result, err := q.BindInt64(0, 2022).Exec(ctx) fmt.Println(len(result.Rows)) fmt.Println(result.Rows[0][0].AsText())
See Example for complete example.
Prepared statements ¶
The driver can prepare DML queries (SELECT/INSERT/UPDATE/DELETE/BATCH statements). CQL protocol does not support preparing other query types.
Executing multiple queries concurrently ¶
Session is safe to use from multiple goroutines, so to execute multiple concurrent queries, just execute them from several worker goroutines. Gocql provides synchronously-looking API (as recommended for Go APIs) and the queries are executed asynchronously at the protocol level.
Paging ¶
The driver supports paging of results with automatic prefetch of 1 page, see Query.PageSize and Query.Iter.
It is also possible to control the paging manually with Query.PageState. Manual paging is useful if you want to store the page state externally, for example in a URL to allow users browse pages in a result. You might want to sign/encrypt the paging state when exposing it externally since it contains data from primary keys.
Paging state is specific to the CQL protocol version and the exact query used. It is meant as opaque state that should not be modified. If you send paging state from different query or protocol version, then the behaviour is not defined (you might get unexpected results or an error from the server). For example, do not send paging state returned by node using protocol version 3 to a node using protocol version 4. Also, when using protocol version 4, paging state between Cassandra 2.2 and 3.0 is incompatible (https://issues.apache.org/jira/browse/CASSANDRA-10880).
The driver does not check whether the paging state is from the same protocol version/statement. You might want to validate yourself as this could be a problem if you store paging state externally. For example, if you store paging state in a URL, the URLs might become broken when you upgrade your cluster.
Call Query.PageState(nil) to fetch just the first page of the query results. Pass the page state returned in Result.PageState by Query.Exec to Query.PageState of a subsequent query to get the next page. If the length of slice in Result.PageState is zero, there are no more pages available (or an error occurred).
Using too low values of PageSize will negatively affect performance, a value below 100 is probably too low. While Scylla returns exactly PageSize items (except for last page) in a page currently, the protocol authors explicitly reserved the right to return smaller or larger amount of items in a page for performance reasons, so don't rely on the page having the exact count of items.
Retries ¶
Queries can be marked as idempotent. Marking the query as idempotent tells the driver that the query can be executed multiple times without affecting its result. Non-idempotent queries are not eligible for retrying nor speculative execution.
Idempotent queries are retried in case of errors based on the configured RetryPolicy.
Custom policies ¶
If you need to use a custom Retry or HostSelectionPolicy please see the transport package documentation.
Index ¶
- Constants
- Variables
- type Compression
- type Consistency
- type EventType
- type Iter
- type Query
- func (q *Query) AsyncExec(ctx context.Context)
- func (q *Query) BindInt64(pos int, v int64) *Query
- func (q *Query) Compression() bool
- func (q *Query) Exec(ctx context.Context) (Result, error)
- func (q *Query) Fetch() (Result, error)
- func (q *Query) Idempotent() bool
- func (q *Query) Iter(ctx context.Context) Iter
- func (q *Query) PageSize() int32
- func (q *Query) SetCompression(v bool)
- func (q *Query) SetIdempotent(v bool)
- func (q *Query) SetPageSize(v int32)
- type Result
- type Session
- func (s *Session) AwaitSchemaAgreement(ctx context.Context, timeout time.Duration) error
- func (s *Session) CheckSchemaAgreement(ctx context.Context) (bool, error)
- func (s *Session) Close()
- func (s *Session) NewTokenAwareDCAwarePolicy(localDC string) transport.HostSelectionPolicy
- func (s *Session) NewTokenAwarePolicy() transport.HostSelectionPolicy
- func (s *Session) Prepare(ctx context.Context, content string) (Query, error)
- func (s *Session) Query(content string) Query
- type SessionConfig
Constants ¶
const ( // ErrCodeServer indicates unexpected error on server-side. // See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1051-L1052 ErrCodeServer frame.ErrorCode = 0x0000 // ErrCodeProtocol indicates a protocol violation by some client message. // See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1053-L1055 ErrCodeProtocol frame.ErrorCode = 0x000A // ErrCodeCredentials indicates missing required authentication. // See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1056-L1059 ErrCodeCredentials frame.ErrorCode = 0x0100 // See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1060-L1070 ErrCodeUnavailable frame.ErrorCode = 0x1000 // ErrCodeOverloaded returned in case of request on overloaded node coordinator. // See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1071-L1072 ErrCodeOverloaded frame.ErrorCode = 0x1001 // ErrCodeBootstrapping returned from the coordinator node in bootstrapping phase. // See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1073-L1074 ErrCodeBootstrapping frame.ErrorCode = 0x1002 // ErrCodeTruncate indicates truncation exception. // See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1075 ErrCodeTruncate frame.ErrorCode = 0x1003 // ErrCodeWriteTimeout returned in case of timeout during the request write. // See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1076-L1107 ErrCodeWriteTimeout frame.ErrorCode = 0x1100 // ErrCodeReadTimeout returned in case of timeout during the request read. // See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1108-L1124 ErrCodeReadTimeout frame.ErrorCode = 0x1200 // ErrCodeReadFailure indicates request read error which is not covered by ErrCodeReadTimeout. // See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1125-L1139 ErrCodeReadFailure frame.ErrorCode = 0x1300 // ErrCodeFunctionFailure indicates an error in user-defined function. // See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1140-L1146 ErrCodeFunctionFailure frame.ErrorCode = 0x1400 // ErrCodeWriteFailure indicates request write error which is not covered by ErrCodeWriteTimeout. // See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1147-L1180 ErrCodeWriteFailure frame.ErrorCode = 0x1500 // ErrCodeSyntax indicates the syntax error in the query. // See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1182 ErrCodeSyntax frame.ErrorCode = 0x2000 // See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1183-L1184 ErrCodeUnauthorized frame.ErrorCode = 0x2100 // ErrCodeInvalid indicates invalid query error which is not covered by ErrCodeSyntax. // See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1185 ErrCodeInvalid frame.ErrorCode = 0x2200 // ErrCodeConfig indicates the configuration error. // See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1186 ErrCodeConfig frame.ErrorCode = 0x2300 // ErrCodeAlreadyExists is returned for the requests creating the existing keyspace/table. // See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1187-L1196 ErrCodeAlreadyExists frame.ErrorCode = 0x2400 // ErrCodeUnprepared returned from the host for prepared statement which is unknown. // See https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1197-L1200 ErrCodeUnprepared frame.ErrorCode = 0x2500 )
See CQL Binary Protocol v4, section 9 for more details. https://github.com/apache/cassandra/blob/adcff3f630c0d07d1ba33bf23fcb11a6db1b9af1/doc/native_protocol_v4.spec#L1046-L1200
Variables ¶
var ( ErrClosedIter = fmt.Errorf("iter is closed") ErrNoMoreRows = fmt.Errorf("no more rows left") )
var ( ErrNoHosts = fmt.Errorf("error in session config: no hosts given") ErrEventType = fmt.Errorf("error in session config: invalid event\npossible events:\n" + "TopologyChange EventType = \"TOPOLOGY_CHANGE\"\n" + "StatusChange EventType = \"STATUS_CHANGE\"\n" + "SchemaChange EventType = \"SCHEMA_CHANGE\"") ErrConsistency = fmt.Errorf("error in session config: invalid consistency\npossible consistencies are:\n" + "ANY Consistency = 0x0000\n" + "ONE Consistency = 0x0001\n" + "TWO Consistency = 0x0002\n" + "THREE Consistency = 0x0003\n" + "QUORUM Consistency = 0x0004\n" + "ALL Consistency = 0x0005\n" + "LOCALQUORUM Consistency = 0x0006\n" + "EACHQUORUM Consistency = 0x0007\n" + "SERIAL Consistency = 0x0008\n" + "LOCALSERIAL Consistency = 0x0009\n" + "LOCALONE Consistency = 0x000A") ErrNoConnection = fmt.Errorf("no connection to execute the query on") )
var ErrNoQueryResults = fmt.Errorf("no query results to be fetched")
Functions ¶
This section is empty.
Types ¶
type Compression ¶
type Compression = frame.Compression
var ( Snappy Compression = frame.Snappy Lz4 Compression = frame.Lz4 )
type Consistency ¶
type Consistency = uint16
const ( ANY Consistency = 0x0000 ONE Consistency = 0x0001 TWO Consistency = 0x0002 THREE Consistency = 0x0003 QUORUM Consistency = 0x0004 ALL Consistency = 0x0005 LOCALQUORUM Consistency = 0x0006 EACHQUORUM Consistency = 0x0007 SERIAL Consistency = 0x0008 LOCALSERIAL Consistency = 0x0009 LOCALONE Consistency = 0x000A )
type Iter ¶
type Iter struct {
// contains filtered or unexported fields
}
Iter is used to execute paged queries, prefetching the next page. Close should be called when it goes out of use to release resources assigned to it.
type Query ¶
type Query struct {
// contains filtered or unexported fields
}
func (*Query) Compression ¶
func (*Query) Idempotent ¶
func (*Query) SetCompression ¶
func (*Query) SetIdempotent ¶
func (*Query) SetPageSize ¶
type Result ¶
type Result transport.QueryResult
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
func NewSession ¶
func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error)
func (*Session) AwaitSchemaAgreement ¶
AwaitSchemaAgreement will await for schema agreement checking it once every SchemaAgreementInterval specified in SessionConfig, if schema is not in agreement after timeout duration, or after context is done, it will return an error.
func (*Session) CheckSchemaAgreement ¶
func (*Session) NewTokenAwareDCAwarePolicy ¶
func (s *Session) NewTokenAwareDCAwarePolicy(localDC string) transport.HostSelectionPolicy
func (*Session) NewTokenAwarePolicy ¶
func (s *Session) NewTokenAwarePolicy() transport.HostSelectionPolicy
type SessionConfig ¶
type SessionConfig struct { Hosts []string Events []EventType // Default: DefaultRetryPolicy. HostSelectionPolicy transport.HostSelectionPolicy // Default: TokenAwarePolicy. RetryPolicy transport.RetryPolicy // Default: 200 milliseconds. SchemaAgreementInterval time.Duration // Controls the timeout for the automatic wait for schema agreement after sending a schema-altering statement. // If less or equal to 0, the automatic schema agreement is disabled. // Default: 60 seconds. AutoAwaitSchemaAgreementTimeout time.Duration transport.ConnConfig }
func DefaultSessionConfig ¶
func DefaultSessionConfig(keyspace string, hosts ...string) SessionConfig
func (SessionConfig) Clone ¶
func (cfg SessionConfig) Clone() SessionConfig
func (*SessionConfig) Validate ¶
func (cfg *SessionConfig) Validate() error
Directories ¶
Path | Synopsis |
---|---|
experiments
|
|
Package transport implements networking and routing for the Scylla Go driver.
|
Package transport implements networking and routing for the Scylla Go driver. |