frunk

package
v0.0.0-...-d4c7a2f Latest
Warning

This package is not in the latest version of its module.

Published: Nov 20, 2019 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Overview

Package frunk provides a distributed SQLite instance.

Distributed consensus is provided via the Raft algorithm.

Constants

const (
	SequenceRangeSize   = 1000
	SequencePartitions  = 5
	SequencePreretrieve = 50
)

Variables

var (
	// ErrNotLeader is returned when a node attempts to execute a leader-only
	// operation.
	ErrNotLeader = errors.New("not leader")

	// ErrOpenTimeout is returned when the Store does not apply its initial
	// logs within the specified time.
	ErrOpenTimeout = errors.New("timeout waiting for initial logs application")

	// ErrInvalidBackupFormat is returned when the requested backup format
	// is not valid.
	ErrInvalidBackupFormat = errors.New("invalid backup format")

	// ErrTransactionActive is returned when an operation is blocked by an
	// active transaction.
	ErrTransactionActive = errors.New("transaction in progress")

	// ErrDefaultConnection is returned when an attempt is made to delete the
	// default connection.
	ErrDefaultConnection = errors.New("cannot delete default connection")

	// ErrConnectionDoesNotExist is returned when an operation is attempted
	// on a non-existent connection. This can happen if the connection
	// was previously open but is now closed.
	ErrConnectionDoesNotExist = errors.New("connection does not exist")

	// ErrStoreInvalidState is returned when a Store is in an invalid
	// state for the requested operation.
	ErrStoreInvalidState = errors.New("store not in valid state")
)
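Because these are plain sentinel values created with errors.New, callers can test for them with errors.Is, even after another layer has wrapped them. The sketch below is an illustration only: errNotLeader is a local stand-in for ErrNotLeader so the example compiles without the package, and execute and shouldRedirect are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotLeader is a local stand-in for the package's ErrNotLeader.
var errNotLeader = errors.New("not leader")

// execute is a hypothetical write path that fails on a non-leader node and
// wraps the sentinel error with extra context.
func execute(isLeader bool) error {
	if !isLeader {
		return fmt.Errorf("execute: %w", errNotLeader)
	}
	return nil
}

// shouldRedirect reports whether err indicates that the request should be
// retried against the current leader.
func shouldRedirect(err error) bool {
	return errors.Is(err, errNotLeader)
}

func main() {
	err := execute(false)
	fmt.Println(err, shouldRedirect(err))
}
```

Comparing with errors.Is rather than == keeps the check working if an intermediate layer adds context with %w.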

Functions

func JoinAllowed

func JoinAllowed(raftDir string) (bool, error)

JoinAllowed returns whether the config files within raftDir indicate that the node can join a cluster.

func NumPeers

func NumPeers(raftDir string) (int, error)

NumPeers returns the number of peers indicated by the config files within raftDir.

This code makes assumptions about how the Raft module works.

Types

type BackupFormat

type BackupFormat int

BackupFormat represents the backup formats supported by the Store.

const (
	// BackupSQL is a dump of the database in SQL text format.
	BackupSQL BackupFormat = iota

	// BackupBinary is a copy of the SQLite database file.
	BackupBinary
)
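A caller that accepts the backup format as text (for example from a request parameter) needs to map it onto these constants. The following is a minimal sketch under stated assumptions: the type and constants are local copies, errInvalidBackupFormat stands in for ErrInvalidBackupFormat, and parseBackupFormat together with the names "sql" and "binary" are hypothetical, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// BackupFormat and its constants are local copies of the declarations above.
type BackupFormat int

const (
	BackupSQL BackupFormat = iota
	BackupBinary
)

// errInvalidBackupFormat is a local stand-in for ErrInvalidBackupFormat.
var errInvalidBackupFormat = errors.New("invalid backup format")

// parseBackupFormat is a hypothetical helper; the accepted names are
// assumptions made for this sketch.
func parseBackupFormat(s string) (BackupFormat, error) {
	switch s {
	case "sql":
		return BackupSQL, nil
	case "binary":
		return BackupBinary, nil
	default:
		return 0, errInvalidBackupFormat
	}
}

func main() {
	f, err := parseBackupFormat("binary")
	fmt.Println(f, err) // BackupBinary has the value 1 because iota starts at 0
}
```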

type ClusterState

type ClusterState int

ClusterState defines the possible Raft states the current node can be in.

const (
	Leader ClusterState = iota
	Follower
	Candidate
	Shutdown
	Unknown
)

Represents the Raft cluster states
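For logging or status output it can help to give these states readable names. The sketch below uses a local copy of the type; the String method is an assumption added for illustration, since the documentation above does not list one. Note also that the ordering here starts with Leader, whereas the RaftState constants in hashicorp/raft start with Follower, so converting between the two by integer cast would likely give wrong results.

```go
package main

import "fmt"

// ClusterState and its constants are local copies of the declarations above.
type ClusterState int

const (
	Leader ClusterState = iota
	Follower
	Candidate
	Shutdown
	Unknown
)

// String is a hypothetical addition for this sketch; the labels are
// assumptions, not names used by the package.
func (s ClusterState) String() string {
	switch s {
	case Leader:
		return "leader"
	case Follower:
		return "follower"
	case Candidate:
		return "candidate"
	case Shutdown:
		return "shutdown"
	default:
		return "unknown"
	}
}

func main() {
	// Out-of-range values fall through to "unknown".
	fmt.Println(Leader, Follower, ClusterState(42))
}
```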

type Connection

type Connection struct {
	ID uint64 `json:"id,omitempty"` // Connection ID, used as a handle by clients.

	CreatedAt   time.Time     `json:"created_at,omitempty"`
	LastUsedAt  time.Time     `json:"last_used_at,omitempty"`
	IdleTimeout time.Duration `json:"idle_timeout,omitempty"`
	TxTimeout   time.Duration `json:"tx_timeout,omitempty"`

	TxStartedAt time.Time `json:"tx_started_at,omitempty"`
	// contains filtered or unexported fields
}

Connection is a connection to the database.

func NewConnection

func NewConnection(c *sdb.Conn, s *Store, id uint64, it, tt time.Duration) *Connection

NewConnection returns a connection to the database.

func (*Connection) AbortTransaction

func (c *Connection) AbortTransaction() error

AbortTransaction aborts -- rolls back -- any active transaction. Calling code should know exactly what it is doing if it decides to call this function. It can be used to clean up any dangling state that may result from certain error scenarios.

func (*Connection) Close

func (c *Connection) Close() error

Close closes the connection via consensus.

func (*Connection) Execute

func (c *Connection) Execute(ex *ExecuteRequest) (*ExecuteResponse, error)

Execute executes queries that return no rows, but do modify the database.

func (*Connection) ExecuteOrAbort

func (c *Connection) ExecuteOrAbort(ex *ExecuteRequest) (resp *ExecuteResponse, retErr error)

ExecuteOrAbort executes the requests, but aborts any active transaction on the underlying database in the case of any error.

func (*Connection) IdleTimedOut

func (c *Connection) IdleTimedOut() bool

IdleTimedOut reports whether the connection has been inactive for longer than its idle timeout.

func (*Connection) Query

func (c *Connection) Query(qr *QueryRequest) (*QueryResponse, error)

Query executes queries that return rows, and do not modify the database.

func (*Connection) Restore

func (c *Connection) Restore(dbConn *sdb.Conn, s *Store)

Restore prepares a partially ready connection.

func (*Connection) SetLastUsedNow

func (c *Connection) SetLastUsedNow()

SetLastUsedNow marks the connection as being used now.

func (*Connection) Stats

func (c *Connection) Stats() (interface{}, error)

Stats returns the status of the connection.

func (*Connection) String

func (c *Connection) String() string

String implements the Stringer interface on the Connection.

func (*Connection) TransactionActive

func (c *Connection) TransactionActive() bool

TransactionActive returns whether a transaction is active on the connection.

func (*Connection) TxTimedOut

func (c *Connection) TxTimedOut() bool

TxTimedOut reports whether a transaction is open and has seen no activity for longer than the transaction-idle timeout.
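The two timeout checks can be pictured with the exported timing fields of Connection. This is a sketch of one plausible rule, not the package's implementation: the conn type is a local stand-in, and the assumptions are that a zero timeout disables the check and that a zero TxStartedAt means no transaction is open.

```go
package main

import (
	"fmt"
	"time"
)

// conn mirrors only the timing fields of Connection.
type conn struct {
	LastUsedAt  time.Time
	TxStartedAt time.Time
	IdleTimeout time.Duration
	TxTimeout   time.Duration
}

// idleTimedOut assumes that a zero IdleTimeout means "never time out".
func (c *conn) idleTimedOut(now time.Time) bool {
	return c.IdleTimeout > 0 && now.Sub(c.LastUsedAt) > c.IdleTimeout
}

// txTimedOut assumes that a zero TxStartedAt means no transaction is open
// and that a zero TxTimeout disables the check.
func (c *conn) txTimedOut(now time.Time) bool {
	return !c.TxStartedAt.IsZero() && c.TxTimeout > 0 &&
		now.Sub(c.LastUsedAt) > c.TxTimeout
}

// demo evaluates both checks 30 seconds after the last activity.
func demo() (idle, tx bool) {
	base := time.Date(2019, time.November, 20, 0, 0, 0, 0, time.UTC)
	c := &conn{
		LastUsedAt:  base,
		TxStartedAt: base,
		IdleTimeout: 60 * time.Second,
		TxTimeout:   10 * time.Second,
	}
	now := base.Add(30 * time.Second)
	return c.idleTimedOut(now), c.txTimedOut(now)
}

func main() {
	fmt.Println(demo())
}
```

Passing the current time in as a parameter keeps the checks deterministic and easy to test.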

type ConnectionOptions

type ConnectionOptions struct {
	IdleTimeout time.Duration
	TxTimeout   time.Duration
}

ConnectionOptions controls connection behaviour.

type ConsistencyLevel

type ConsistencyLevel int

ConsistencyLevel represents the available read consistency levels.

const (
	None ConsistencyLevel = iota
	Weak
	Strong
)

Represents the available consistency levels.

type DBConfig

type DBConfig struct {
	DSN    string // Any custom DSN
	Memory bool   // Whether the database is in-memory only.
}

DBConfig represents the configuration of the underlying SQLite database.

func NewDBConfig

func NewDBConfig(dsn string, memory bool) *DBConfig

NewDBConfig returns a new DB config instance.

type ExecuteRequest

type ExecuteRequest struct {
	Queries []string
	Timings bool
	Atomic  bool
}

ExecuteRequest represents a query that returns no rows, but does modify the database.

type ExecuteResponse

type ExecuteResponse struct {
	Results []*sdb.Result
	Time    float64
	Raft    RaftResponse
}

ExecuteResponse encapsulates a response to an execute.

type Listener

type Listener interface {
	net.Listener
	Dial(address string, timeout time.Duration) (net.Conn, error)
}

Listener is the interface Raft-compatible network layers should implement.
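One way to satisfy this interface is to embed a standard net.Listener and add a Dial method on top. The sketch below assumes plain TCP; tcpListener, newTCPListener, and roundTrip are hypothetical names, and the interface is repeated locally so the example is self-contained.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// Listener repeats the interface from the documentation above.
type Listener interface {
	net.Listener
	Dial(address string, timeout time.Duration) (net.Conn, error)
}

// tcpListener is a hypothetical implementation backed by plain TCP.
type tcpListener struct {
	net.Listener
}

// Dial opens an outbound TCP connection with the given timeout.
func (l *tcpListener) Dial(address string, timeout time.Duration) (net.Conn, error) {
	return net.DialTimeout("tcp", address, timeout)
}

func newTCPListener(addr string) (Listener, error) {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, err
	}
	return &tcpListener{Listener: ln}, nil
}

// roundTrip dials the listener through its own Dial method and returns any
// error from dialing or accepting.
func roundTrip() error {
	ln, err := newTCPListener("127.0.0.1:0")
	if err != nil {
		return err
	}
	defer ln.Close()

	accepted := make(chan error, 1)
	go func() {
		c, err := ln.Accept()
		if err == nil {
			c.Close()
		}
		accepted <- err
	}()

	c, err := ln.Dial(ln.Addr().String(), time.Second)
	if err != nil {
		return err
	}
	defer c.Close()
	return <-accepted
}

func main() {
	fmt.Println("round trip error:", roundTrip())
}
```

A value built this way could be passed wherever a Listener is expected, such as NewTransport or New.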

type Peer

type Peer struct {
	ID   string
	Addr string
}

type QueryRequest

type QueryRequest struct {
	Queries []string
	Timings bool
	Atomic  bool
	Lvl     ConsistencyLevel
}

QueryRequest represents a query that returns rows, and does not modify the database.

type QueryResponse

type QueryResponse struct {
	Rows []*sdb.Rows
	Time float64
	Raft *RaftResponse
}

QueryResponse encapsulates a response to a query.

type RaftResponse

type RaftResponse struct {
	Index  uint64 `json:"index,omitempty"`
	NodeID string `json:"node_id,omitempty"`
}

RaftResponse is the Raft metadata that will be included with responses, if the associated request modified the Raft log.
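Both fields carry the omitempty option, so a response that carries no Raft metadata encodes as an empty JSON object. The example below uses a local copy of the struct with the same tags; encode is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RaftResponse is a local copy of the struct above, with identical tags.
type RaftResponse struct {
	Index  uint64 `json:"index,omitempty"`
	NodeID string `json:"node_id,omitempty"`
}

// encode marshals r to JSON and returns an empty string on failure.
func encode(r RaftResponse) string {
	b, err := json.Marshal(r)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	fmt.Println(encode(RaftResponse{Index: 7, NodeID: "node1"})) // {"index":7,"node_id":"node1"}
	fmt.Println(encode(RaftResponse{}))                          // {}
}
```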

type SequenceChunk

type SequenceChunk struct {
	Store
	// contains filtered or unexported fields
}

func (*SequenceChunk) GetSequenceIndex

func (sequence *SequenceChunk) GetSequenceIndex() uint64

func (*SequenceChunk) Next

func (sequence *SequenceChunk) Next() (uint64, error)

type SequenceChunkResponse

type SequenceChunkResponse struct {
	SequenceName string
	Start        uint64
	End          uint64
	Offset       uint64
	Count        uint64
}

type Server

type Server struct {
	ID   string `json:"id,omitempty"`
	Addr string `json:"addr,omitempty"`
}

Server represents another node in the cluster.

type Servers

type Servers []*Server

Servers is a slice of Server pointers.

func (Servers) Len

func (s Servers) Len() int

func (Servers) Less

func (s Servers) Less(i, j int) bool

func (Servers) Swap

func (s Servers) Swap(i, j int)
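Len, Less, and Swap together satisfy sort.Interface, which lets a Servers value be passed to sort.Sort. The sketch below uses local copies of the types and assumes that Less compares IDs, which would be consistent with Nodes returning servers sorted by ID ascending; the actual comparison is not shown in this documentation. sortedIDs is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sort"
)

// Server and Servers are local copies of the types above.
type Server struct {
	ID   string
	Addr string
}

type Servers []*Server

func (s Servers) Len() int { return len(s) }

// Less orders by ID; this is an assumption made for the sketch.
func (s Servers) Less(i, j int) bool { return s[i].ID < s[j].ID }

func (s Servers) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

// sortedIDs sorts s in place and returns the IDs in their new order.
func sortedIDs(s Servers) []string {
	sort.Sort(s)
	ids := make([]string, len(s))
	for i, srv := range s {
		ids[i] = srv.ID
	}
	return ids
}

func main() {
	nodes := Servers{
		{ID: "node3", Addr: "10.0.0.3:4002"},
		{ID: "node1", Addr: "10.0.0.1:4002"},
		{ID: "node2", Addr: "10.0.0.2:4002"},
	}
	fmt.Println(sortedIDs(nodes)) // [node1 node2 node3]
}
```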

type Store

type Store struct {
	SnapshotThreshold uint64
	SnapshotInterval  time.Duration
	HeartbeatTimeout  time.Duration
	ApplyTimeout      time.Duration
	// contains filtered or unexported fields
}

Store is a SQLite database, where all changes are made via Raft consensus.

func New

func New(ln Listener, c *StoreConfig) *Store

New returns a new Store.

func (*Store) Addr

func (s *Store) Addr() string

Addr returns the address of the store.

func (*Store) Apply

func (s *Store) Apply(l *raft.Log) interface{}

Apply applies a Raft log entry to the database.

func (*Store) Backup

func (s *Store) Backup(leader bool, fmt BackupFormat, dst io.Writer) error

Backup writes a snapshot of the underlying database to dst.

If leader is true, this operation is performed with a read consistency level equivalent to "weak". Otherwise no guarantees are made about the read consistency level.

func (*Store) Close

func (s *Store) Close(wait bool) error

Close closes the store. If wait is true, waits for a graceful shutdown. Once closed, a Store may not be re-opened.

func (*Store) Connect

func (s *Store) Connect(opt *ConnectionOptions) (*Connection, error)

Connect returns a new connection to the database. Changes made to the database through this connection are applied via the Raft consensus system. The Store must have been opened first. Connect must be called on the leader, or an error will be returned.

Any connection returned by this call is READ_COMMITTED isolated from all other connections, including the connection built in to the Store itself.

func (*Store) Connection

func (s *Store) Connection(id uint64) (*Connection, bool)

Connection returns the connection for the given ID.

func (*Store) DeregisterObserver

func (s *Store) DeregisterObserver(o *raft.Observer)

DeregisterObserver deregisters an observer of Raft events.

func (*Store) Exec

func (s *Store) Exec(query string) (*ExecuteResponse, error)

Exec executes a write-only query against the database.

func (*Store) ExecEx

func (s *Store) ExecEx(query string, atomic bool) (*ExecuteResponse, error)

ExecEx executes a write-only query against the database, optionally as an atomic operation.

func (*Store) ExecuteEx

func (s *Store) ExecuteEx(ex *ExecuteRequest) (*ExecuteResponse, error)

ExecuteEx executes queries that return no rows, but do modify the database. Changes made to the database through this call are applied via the Raft consensus system. The Store must have been opened first. ExecuteEx must be called on the leader, or an error will be returned. The changes are made using the database connection built in to the Store.

func (*Store) ExecuteOrAbort

func (s *Store) ExecuteOrAbort(ex *ExecuteRequest) (resp *ExecuteResponse, retErr error)

ExecuteOrAbort executes the requests, but aborts any active transaction on the underlying database in the case of any error. Any changes are made using the database connection built-in to the Store.

func (*Store) Get

func (s *Store) Get(key []byte) ([]byte, error)

Get returns a byte array for a given key from BoltDB.

func (*Store) GetSequenceChunk

func (s *Store) GetSequenceChunk(sequenceName string) (*SequenceChunkResponse, error)

func (*Store) ID

func (s *Store) ID() string

ID returns the Raft ID of the store.

func (*Store) IsLeader

func (s *Store) IsLeader() bool

IsLeader reports whether the current node is the cluster leader.

func (*Store) Join

func (s *Store) Join(id, addr string, metadata map[string]string) error

Join joins a node, identified by id and located at addr, to this store. The node must be ready to respond to Raft communications at that address.

func (*Store) LeaderAddr

func (s *Store) LeaderAddr() string

LeaderAddr returns the Raft address of the current leader. Returns a blank string if there is no leader.

func (*Store) LeaderID

func (s *Store) LeaderID() (string, error)

LeaderID returns the node ID of the Raft leader. Returns a blank string if there is no leader, or an error.

func (*Store) Metadata

func (s *Store) Metadata(id, key string) string

Metadata returns the value for a given key, for a given node ID.

func (*Store) NextSequenceValueById

func (s *Store) NextSequenceValueById(sequenceName string) (uint64, error)

func (*Store) Nodes

func (s *Store) Nodes() ([]*Server, error)

Nodes returns the slice of nodes in the cluster, sorted by ID ascending.

func (*Store) Open

func (s *Store) Open(enableSingle bool, peers ...raft.Server) error

Open opens the store. If enableSingle is set, and there are no existing peers, then this node becomes the first node, and therefore leader, of the cluster.

func (*Store) Path

func (s *Store) Path() string

Path returns the path to the store's storage directory.

func (*Store) Query

func (s *Store) Query(query string) (*QueryResponse, error)

Query allows read-only queries to be issued to the database.

func (*Store) QueryEx

func (s *Store) QueryEx(qr *QueryRequest) (*QueryResponse, error)

QueryEx executes queries that return rows, and do not modify the database. The queries are made using the database connection built-in to the Store. Depending on the read consistency requested, it may or may not need to be called on the leader.

func (*Store) RegisterObserver

func (s *Store) RegisterObserver(o *raft.Observer)

RegisterObserver registers an observer of Raft events.

func (*Store) Remove

func (s *Store) Remove(id string) error

Remove removes a node from the store, specified by ID.

func (*Store) Restore

func (s *Store) Restore(rc io.ReadCloser) error

Restore restores the node to a previous state.

func (*Store) SequenceIndexById

func (s *Store) SequenceIndexById(sequenceName string) (uint64, error)

func (*Store) Set

func (s *Store) Set(key, value []byte) error

Set updates the value of a key-value pair.

func (*Store) SetMetadata

func (s *Store) SetMetadata(md map[string]string) error

SetMetadata adds the metadata md to any existing metadata for this node.

func (*Store) Snapshot

func (s *Store) Snapshot() (raft.FSMSnapshot, error)

Snapshot returns a snapshot of the store. The caller must ensure that no Raft transaction is taking place during this call. Hashicorp Raft guarantees that this function will not be called concurrently with Apply.

func (*Store) State

func (s *Store) State() ClusterState

State returns the current node's Raft state.

func (*Store) Stats

func (s *Store) Stats() (map[string]interface{}, error)

Stats returns stats for the store.

func (*Store) WaitForApplied

func (s *Store) WaitForApplied(timeout time.Duration) error

WaitForApplied waits for all Raft log entries to be applied to the underlying database.

func (*Store) WaitForAppliedIndex

func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error

WaitForAppliedIndex blocks until a given log index has been applied, or the timeout expires.
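Blocking until an index is applied or a timeout expires is commonly done by polling with a ticker alongside a timer. The following is a generic sketch of that pattern, not the package's implementation: waitForIndex, errTimeout, the 10 ms polling interval, and the applied callback are all assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// errTimeout is a hypothetical error for this sketch.
var errTimeout = errors.New("timeout expired")

// waitForIndex polls applied until it reports at least idx, or returns
// errTimeout once the timeout elapses.
func waitForIndex(applied func() uint64, idx uint64, timeout time.Duration) error {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	for {
		if applied() >= idx {
			return nil
		}
		select {
		case <-ticker.C:
			// Poll again on the next loop iteration.
		case <-timer.C:
			return errTimeout
		}
	}
}

// demo simulates a log that applies five entries, then waits for an index
// that is reached and for one that never is.
func demo() (error, error) {
	var applied uint64
	go func() {
		for i := 0; i < 5; i++ {
			time.Sleep(5 * time.Millisecond)
			atomic.AddUint64(&applied, 1)
		}
	}()
	load := func() uint64 { return atomic.LoadUint64(&applied) }

	reached := waitForIndex(load, 5, time.Second)
	missed := waitForIndex(load, 100, 50*time.Millisecond)
	return reached, missed
}

func main() {
	fmt.Println(demo())
}
```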

func (*Store) WaitForLeader

func (s *Store) WaitForLeader(timeout time.Duration) (string, error)

WaitForLeader blocks until a leader is detected, or the timeout expires.

type StoreConfig

type StoreConfig struct {
	DBConf *DBConfig   // The DBConfig object for this Store.
	Dir    string      // The working directory for raft.
	Tn     Transport   // The underlying Transport for raft.
	ID     string      // Node ID.
	Logger *log.Logger // The logger to use to log stuff.
}

StoreConfig represents the configuration of the underlying Store.

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport is the network service provided to Raft, and wraps a Listener.

func NewTransport

func NewTransport(ln Listener) *Transport

NewTransport returns an initialized Transport.

func (*Transport) Accept

func (t *Transport) Accept() (net.Conn, error)

Accept waits for the next connection.

func (*Transport) Addr

func (t *Transport) Addr() net.Addr

Addr returns the binding address of the transport.

func (*Transport) Close

func (t *Transport) Close() error

Close closes the transport.

func (*Transport) Dial

func (t *Transport) Dial(addr raft.ServerAddress, timeout time.Duration) (net.Conn, error)

Dial creates a new network connection.

type TxStateChange

type TxStateChange struct {
	// contains filtered or unexported fields
}

TxStateChange is a helper that detects when the transaction state on a connection changes.

func NewTxStateChange

func NewTxStateChange(c *Connection) *TxStateChange

NewTxStateChange returns an initialized TxStateChange.

func (*TxStateChange) CheckAndSet

func (t *TxStateChange) CheckAndSet()

CheckAndSet records whether a transaction has begun or ended on the connection since the TxStateChange was created. It must be called at most once; a second call panics.
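The behaviour described here (capture the transaction state at creation, compare once afterwards, panic on reuse) can be sketched as follows. Everything in this example is a hypothetical stand-in: conn, txStateChange, and the rule that a started transaction sets txStartedAt while a finished one clears it are assumptions, not the package's code.

```go
package main

import (
	"fmt"
	"time"
)

// conn is a stand-in for Connection with only what the sketch needs.
type conn struct {
	txActive    bool
	txStartedAt time.Time
}

// txStateChange remembers the transaction state at creation time.
type txStateChange struct {
	c      *conn
	before bool // transaction state when the helper was created
	done   bool // set once checkAndSet has run
}

func newTxStateChange(c *conn) *txStateChange {
	return &txStateChange{c: c, before: c.txActive}
}

// checkAndSet updates txStartedAt if the transaction state changed since
// creation. It panics when called a second time.
func (t *txStateChange) checkAndSet() {
	if t.done {
		panic("checkAndSet called more than once")
	}
	t.done = true

	after := t.c.txActive
	if !t.before && after {
		t.c.txStartedAt = time.Now() // a transaction began
	} else if t.before && !after {
		t.c.txStartedAt = time.Time{} // the transaction ended
	}
}

// demo reports whether a started transaction was detected and whether the
// second call panicked.
func demo() (started, panicked bool) {
	c := &conn{}
	t := newTxStateChange(c)
	c.txActive = true // simulate a statement that opened a transaction
	t.checkAndSet()
	started = !c.txStartedAt.IsZero()

	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	t.checkAndSet() // second call panics
	return started, panicked
}

func main() {
	fmt.Println(demo()) // true true
}
```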
