cassandra

package module
v0.0.0-...-4f4c708
Published: Mar 25, 2024 License: Apache-2.0 Imports: 8 Imported by: 0

README

Conduit Connector for Cassandra

Conduit destination connector for Cassandra.

How to build?

Run make build to build the connector.

Testing

Run make test to run all the unit tests. Run make test-integration to run the integration tests.

The Docker compose file at test/docker-compose.yml can be used to run the required resource locally.

Destination

This destination connector pushes data from upstream resources to Cassandra via Conduit.

It converts each received record into a CQL (Cassandra Query Language) statement and executes that statement against Cassandra using gocql. A record with the operation create or snapshot becomes an INSERT query, a record with the operation update becomes an UPDATE query, and a record with the operation delete becomes a DELETE query.
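The operation-to-statement mapping described above can be sketched as a small switch. This is a minimal illustration, not the connector's code: the function name statementKind and the use of plain strings for operations are assumptions made for the example.

```go
package main

import "fmt"

// statementKind maps a record operation name to the kind of CQL statement
// the README describes. The name and the string-typed operation are
// hypothetical; the connector itself works with sdk.Record values.
func statementKind(operation string) (string, error) {
	switch operation {
	case "create", "snapshot":
		return "INSERT", nil
	case "update":
		return "UPDATE", nil
	case "delete":
		return "DELETE", nil
	default:
		return "", fmt.Errorf("unsupported operation %q", operation)
	}
}
```

Returning an error for unknown operations is a design choice of this sketch; the README does not say how the connector treats them.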

Make sure that the destination table the connector writes to has the same schema as the payload of the received records. For example, if the payload looks like this:

{
  "id": 1,
  "name": "john",
  "full_time": true,
  "salary": 1000.1,
  "age": 25
}

then you should have a Cassandra table that looks like:

CREATE TABLE table_name (id int, name text, full_time boolean, salary double, age int, PRIMARY KEY (id));
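The reason the table schema has to match the payload is that each payload field ends up as a column in the generated statement. The following sketch shows one way a structured payload could be turned into a parameterized INSERT. The helper buildInsert is hypothetical and is not the connector's BuildInsertQuery; it only assumes that payload keys are used as column names.

```go
package main

import (
	"sort"
	"strings"
)

// buildInsert is a hypothetical helper that turns a structured payload into
// a parameterized INSERT statement and its bind values. Every payload key is
// assumed to be a column name in the target table.
func buildInsert(table string, payload map[string]any) (string, []any) {
	columns := make([]string, 0, len(payload))
	for name := range payload {
		columns = append(columns, name)
	}
	// Map iteration order is random; sort so the statement is stable.
	sort.Strings(columns)

	placeholders := make([]string, len(columns))
	values := make([]any, len(columns))
	for i, name := range columns {
		placeholders[i] = "?"
		values[i] = payload[name]
	}

	stmt := "INSERT INTO " + table + " (" + strings.Join(columns, ", ") +
		") VALUES (" + strings.Join(placeholders, ", ") + ")"
	return stmt, values
}
```

With gocql, a statement and values of this shape are typically passed to session.Query(stmt, values...).Exec(). A payload key with no matching column would make Cassandra reject the statement.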

Configuration

name | description | required | default value
nodes | Comma separated list of Cassandra nodes' addresses (at least one), ex: 127.0.0.1:9042,127.0.0.2:8080. | true |
keyspace | The keyspace name that has the table (similar to a database in a relational database system). | true |
table | The table name to write data into. | true |
auth.mechanism | Authentication mechanism used by Cassandra, use basic for password auth, and none if auth is off. | false | none
auth.basic.username | Username, required only if basic auth mechanism is used. | false |
auth.basic.password | Password, required only if basic auth mechanism is used. | false |

Table name

If a record contains a cassandra.table property in its metadata, it is written to that table; otherwise the connector falls back to the table configured in its settings. Thus, a single destination connector can write to multiple tables.
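The fallback rule above amounts to a single metadata lookup. A minimal sketch, assuming the metadata is a map[string]string and that an empty cassandra.table value should be treated as absent (the README does not specify that case); resolveTable is a hypothetical name:

```go
package main

// resolveTable returns the table named by the "cassandra.table" metadata
// key, or the configured table when the key is missing. Treating an empty
// value as missing is an assumption of this sketch.
func resolveTable(metadata map[string]string, configured string) string {
	if table, ok := metadata["cassandra.table"]; ok && table != "" {
		return table
	}
	return configured
}
```

For example, a record whose metadata sets cassandra.table to managers would be written to managers even when the connector is configured with table: employees.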

Example pipeline configuration file

   pipelines:
     cassandra-pipeline:
       status: running
       name: example-pipeline
       description: write data into Cassandra.
       connectors:
         postgres-con:
           type: source
           plugin: builtin:postgres # you can use any other source connector, this is just an example.
           name: postgres-source
           settings:
             url: postgresql://username:pass@127.0.0.1:5432/mydb # postgresql://{username}:{password}@{host}:{port}/{database}
             table: employees
             orderingColumn: id
         cassandra-con:
           type: destination
           plugin: standalone:cassandra
           name: cassandra-dest
           settings:
             nodes: 127.0.0.1:9042 # {host}:{port}
             keyspace: company
             table: employees
       processors:
         proc1:
           type: parsejsonpayload # postgres produces a raw payload that is JSON formatted; this processor converts it into structured data.

Build your Cassandra connector, then place the connector binary in the connectors directory relative to Conduit; see the Conduit connectors documentation for more details. Also see the Pipeline Configuration Files docs for details on how to run this pipeline.

Known Issues & Limitations

  • Supports only the structured data format for the key and payload. If your data is raw but JSON formatted, you can use Conduit's built-in processors parsejsonpayload or parsejsonkey to parse it into structured data.
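To illustrate what parsing raw JSON into structured data means here, the sketch below decodes raw bytes into a field map using the standard library. It is not the parsejsonpayload processor's implementation; parseRawJSON is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// parseRawJSON decodes raw JSON bytes into a map of field names to values,
// which is the shape a structured payload takes. Hypothetical helper, shown
// only to illustrate the raw-to-structured conversion.
func parseRawJSON(raw []byte) (map[string]any, error) {
	var structured map[string]any
	if err := json.Unmarshal(raw, &structured); err != nil {
		return nil, fmt.Errorf("payload is not a JSON object: %w", err)
	}
	return structured, nil
}
```

Note that encoding/json decodes every JSON number into float64 when the target is a map[string]any, so an integer column such as id may need an explicit conversion before it is bound to a statement.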

Planned work

  • Support raw data formats for keys and payloads.

Documentation

Index

Constants

const (
	AuthMechanismBasic = "basic"
	AuthMechanismNone  = "none"
)

Variables

var Connector = sdk.Connector{
	NewSpecification: Specification,
	NewSource:        nil,
	NewDestination:   NewDestination,
}

Connector combines all constructors for each plugin in one struct.

Functions

func NewDestination

func NewDestination() sdk.Destination

func Specification

func Specification() sdk.Specification

Specification returns the connector's specification.

Types

type Destination

type Destination struct {
	sdk.UnimplementedDestination
	// contains filtered or unexported fields
}

func (*Destination) Configure

func (d *Destination) Configure(ctx context.Context, cfg map[string]string) error

func (*Destination) Open

func (d *Destination) Open(ctx context.Context) error

func (*Destination) Parameters

func (d *Destination) Parameters() map[string]sdk.Parameter

func (*Destination) Teardown

func (d *Destination) Teardown(context.Context) error

func (*Destination) Write

func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error)
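The (int, error) return of Write suggests a batch contract in which the integer reports how many records were written before any failure. The sketch below shows that pattern with plain strings and a caller-supplied exec function. It is an assumption about the shape of such a loop, not the connector's implementation; writeAll and failOn are hypothetical names.

```go
package main

import "fmt"

// writeAll applies exec to each statement in order and stops at the first
// failure, returning how many statements succeeded before it.
func writeAll(statements []string, exec func(stmt string) error) (int, error) {
	for i, stmt := range statements {
		if err := exec(stmt); err != nil {
			return i, fmt.Errorf("statement %d failed: %w", i, err)
		}
	}
	return len(statements), nil
}

// failOn returns an exec function that rejects one specific statement.
// It stands in for a real database call in this sketch.
func failOn(bad string) func(string) error {
	return func(stmt string) error {
		if stmt == bad {
			return fmt.Errorf("rejected %q", stmt)
		}
		return nil
	}
}
```

Reporting the count of successful writes lets the caller tell which records in the batch still need to be retried.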

type DestinationConfig

type DestinationConfig struct {
	// The keyspace name that has the table (similar to a database in a relational database system).
	Keyspace string `json:"keyspace" validate:"required"`
	// The table name.
	Table string `json:"table" validate:"required"`
	// Comma separated list of Cassandra nodes' addresses (at least one), ex: 127.0.0.1:9042,127.0.0.2:8080
	Nodes []string `json:"nodes" validate:"required"`
	// Authentication mechanism used by Cassandra.
	AuthMechanism string `json:"auth.mechanism" validate:"inclusion=none|basic" default:"none"`
	// Username, only if basic auth is used.
	AuthUsername string `json:"auth.basic.username"`
	// Password, only if basic auth is used.
	AuthPassword string `json:"auth.basic.password"`
}

func (DestinationConfig) Parameters

func (DestinationConfig) Parameters() map[string]sdk.Parameter
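The DestinationConfig fields above imply a few rules: nodes is a comma-separated list with at least one entry, auth.mechanism defaults to none, and the username and password matter only for basic auth. The sketch below applies those rules to a raw settings map. It is a simplified stand-in that omits keyspace and table, not the validation the connector or the SDK performs; exampleConfig and parseExampleConfig are hypothetical names.

```go
package main

import (
	"errors"
	"strings"
)

// exampleConfig mirrors a subset of the documented settings. Hypothetical
// type used only for this sketch.
type exampleConfig struct {
	Nodes         []string
	AuthMechanism string
	AuthUsername  string
	AuthPassword  string
}

// parseExampleConfig splits the nodes list, applies the documented default
// for auth.mechanism, and checks the basic auth credentials.
func parseExampleConfig(cfg map[string]string) (exampleConfig, error) {
	out := exampleConfig{AuthMechanism: "none"}

	for _, node := range strings.Split(cfg["nodes"], ",") {
		if node = strings.TrimSpace(node); node != "" {
			out.Nodes = append(out.Nodes, node)
		}
	}
	if len(out.Nodes) == 0 {
		return out, errors.New("nodes: at least one address is required")
	}

	if mechanism, ok := cfg["auth.mechanism"]; ok {
		out.AuthMechanism = mechanism
	}
	switch out.AuthMechanism {
	case "none":
		// No credentials needed.
	case "basic":
		out.AuthUsername = cfg["auth.basic.username"]
		out.AuthPassword = cfg["auth.basic.password"]
		if out.AuthUsername == "" || out.AuthPassword == "" {
			return out, errors.New("basic auth requires auth.basic.username and auth.basic.password")
		}
	default:
		return out, errors.New("auth.mechanism must be none or basic")
	}
	return out, nil
}
```

Rejecting basic auth without credentials is stricter than the struct tags shown above, which mark neither field as required; the README's configuration table is the basis for that check.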

type QueryBuilder

type QueryBuilder struct{}

QueryBuilder builds a CQL query statement and its values from a record.

func (*QueryBuilder) BuildDeleteQuery

func (q *QueryBuilder) BuildDeleteQuery(rec sdk.Record, table string) (string, []interface{})

BuildDeleteQuery takes a record, and returns the delete query statement and values representing that record.

func (*QueryBuilder) BuildInsertQuery

func (q *QueryBuilder) BuildInsertQuery(rec sdk.Record, table string) (string, []interface{})

BuildInsertQuery takes a record, and returns the insert query statement and values representing that record.

func (*QueryBuilder) BuildUpdateQuery

func (q *QueryBuilder) BuildUpdateQuery(rec sdk.Record, table string) (string, []interface{})

BuildUpdateQuery takes a record, and returns the update query statement and values representing that record.
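As a rough picture of what update and delete statements built from a record could look like, the sketch below derives the WHERE clause from the record key and the SET clause from the remaining payload fields. It is not the QueryBuilder implementation: buildUpdate, buildDelete and sortedAssignments are hypothetical helpers, and the assumption that the key holds exactly the primary key columns is mine.

```go
package main

import (
	"sort"
	"strings"
)

// sortedAssignments renders "column = ?" fragments in column-name order and
// returns the bind values in the same order.
func sortedAssignments(fields map[string]any) ([]string, []any) {
	names := make([]string, 0, len(fields))
	for name := range fields {
		names = append(names, name)
	}
	sort.Strings(names)

	parts := make([]string, len(names))
	values := make([]any, len(names))
	for i, name := range names {
		parts[i] = name + " = ?"
		values[i] = fields[name]
	}
	return parts, values
}

// buildUpdate sets every payload column that is not part of the key and
// filters on the key columns. CQL does not allow primary key columns in SET.
func buildUpdate(table string, key, payload map[string]any) (string, []any) {
	setFields := make(map[string]any, len(payload))
	for name, value := range payload {
		if _, isKey := key[name]; !isKey {
			setFields[name] = value
		}
	}
	set, setValues := sortedAssignments(setFields)
	where, whereValues := sortedAssignments(key)

	stmt := "UPDATE " + table + " SET " + strings.Join(set, ", ") +
		" WHERE " + strings.Join(where, " AND ")
	return stmt, append(setValues, whereValues...)
}

// buildDelete removes the row identified by the key columns.
func buildDelete(table string, key map[string]any) (string, []any) {
	where, values := sortedAssignments(key)
	return "DELETE FROM " + table + " WHERE " + strings.Join(where, " AND "), values
}
```

The bind values are ordered to match the placeholders: SET values first, then WHERE values.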

Directories

Path Synopsis
cmd
