Documentation ¶
Index ¶
- Constants
- Variables
- func Uint32(value uint32) []byte
- func Uint64(value uint64) []byte
- type Auth
- type Box
- type BoxOptions
- type Bytes
- type Call
- type Connection
- func (conn *Connection) Close()
- func (conn *Connection) Exec(ctx context.Context, q Query) *Result
- func (conn *Connection) Execute(q Query) ([][]interface{}, error)
- func (conn *Connection) GetPrimaryKeyFields(space interface{}) ([]int, bool)
- func (conn *Connection) IsClosed() (bool, error)
- func (conn *Connection) String() string
- type ConnectionError
- type Connector
- type ContextError
- type Delete
- type Error
- type Eval
- type Greeting
- type Insert
- type IprotoServer
- type Iterator
- type Join
- type OnShutdownCallback
- type OpAdd
- type OpAssign
- type OpBitAND
- type OpBitOR
- type OpBitXOR
- type OpDelete
- type OpInsert
- type OpSplice
- type OpSub
- type Operator
- type Options
- type Packet
- type PacketIterator
- type Ping
- type Query
- type QueryError
- type QueryHandler
- type Replace
- type ReplicaSet
- type Result
- type Select
- type Slave
- func (s *Slave) Attach(out ...chan *Packet) (it PacketIterator, err error)
- func (s *Slave) Close() error
- func (s *Slave) Err() error
- func (s *Slave) HasNext() bool
- func (s *Slave) IsInReplicaSet() bool
- func (s *Slave) Join() (err error)
- func (s *Slave) JoinWithSnap(out ...chan *Packet) (it PacketIterator, err error)
- func (s *Slave) LastSnapVClock() (VectorClock, error)
- func (s *Slave) Next() (*Packet, error)
- func (s *Slave) Packet() *Packet
- func (s *Slave) Subscribe(lsns ...int64) (it PacketIterator, err error)
- type Subscribe
- type Tuple
- type Update
- type Upsert
- type VClock
- type VectorClock
Examples ¶
Constants ¶
const ( OKRequest = 0 SelectRequest = 1 InsertRequest = 2 ReplaceRequest = 3 UpdateRequest = 4 DeleteRequest = 5 CallRequest = 6 AuthRequest = 7 EvalRequest = 8 UpsertRequest = 9 PingRequest = 64 JoinCommand = 65 SubscribeRequest = 66 ErrorFlag = 0x8000 )
const ( KeyCode = 0x00 KeySync = 0x01 KeyInstanceID = 0x02 KeyLSN = 0x03 KeyTimestamp = 0x04 KeySchemaID = 0x05 KeySpaceNo = 0x10 KeyIndexNo = 0x11 KeyLimit = 0x12 KeyOffset = 0x13 KeyIterator = 0x14 KeyKey = 0x20 KeyTuple = 0x21 KeyFunctionName = 0x22 KeyUserName = 0x23 KeyInstanceUUID = 0x24 KeyReplicaSetUUID = 0x25 KeyVClock = 0x26 KeyExpression = 0x27 KeyDefTuple = 0x28 KeyData = 0x30 KeyError = 0x31 )
const ( // https://github.com/fl00r/go-tarantool-1.6/issues/2 IterEq = uint8(0) // key == x ASC order IterReq = uint8(1) // key == x DESC order IterAll = uint8(2) // all tuples IterLt = uint8(3) // key < x IterLe = uint8(4) // key <= x IterGe = uint8(5) // key >= x IterGt = uint8(6) // key > x IterBitsAllSet = uint8(7) // all bits from x are set in key IterBitsAnySet = uint8(8) // at least one x's bit is set IterBitsAllNotSet = uint8(9) // all bits are not set )
const ( OkCode = 0 PacketLengthBytes = 5 SchemaKeyClusterUUID = "cluster" ReplicaSetMaxSize = 32 VClockMax = ReplicaSetMaxSize UUIDStrLength = 36 )
const ( SpaceSchema = 272 SpaceSpace = 280 ViewSpace = 281 SpaceIndex = 288 ViewIndex = 289 SpaceFunc = 296 SpaceUser = 304 SpacePriv = 312 SpaceCluster = 320 SpaceSystemMax = 511 )
const ( ErrUnknown = iota // Unknown error ErrIllegalParams = iota // Illegal parameters, %s ErrMemoryIssue = iota // Failed to allocate %u bytes in %s for %s ErrTupleFound = iota // Duplicate key exists in unique index '%s' in space '%s' ErrTupleNotFound = iota // Tuple doesn't exist in index '%s' in space '%s' ErrUnsupported = iota // %s does not support %s ErrNonmaster = iota // Can't modify data on a replication slave. My master is: %s ErrReadonly = iota // Can't modify data because this server is in read-only mode. ErrInjection = iota // Error injection '%s' ErrCreateSpace = iota // Failed to create space '%s': %s ErrSpaceExists = iota // Space '%s' already exists ErrDropSpace = iota // Can't drop space '%s': %s ErrAlterSpace = iota // Can't modify space '%s': %s ErrIndexType = iota // Unsupported index type supplied for index '%s' in space '%s' ErrModifyIndex = iota // Can't create or modify index '%s' in space '%s': %s ErrLastDrop = iota // Can't drop the primary key in a system space, space '%s' ErrTupleFormatLimit = iota // Tuple format limit reached: %u ErrDropPrimaryKey = iota // Can't drop primary key in space '%s' while secondary keys exist ErrKeyPartType = iota // Supplied key type of part %u does not match index part type: expected %s ErrExactMatch = iota // Invalid key part count in an exact match (expected %u, got %u) ErrInvalidMsgpack = iota // Invalid MsgPack - %s ErrProcRet = iota // msgpack.encode: can not encode Lua type '%s' ErrTupleNotArray = iota // Tuple/Key must be MsgPack array ErrFieldType = iota // Tuple field %u type does not match one required by operation: expected %s ErrFieldTypeMismatch = iota // Ambiguous field type in index '%s', key part %u. Requested type is %s but the field has previously been defined as %s ErrSplice = iota // SPLICE error on field %u: %s ErrArgType = iota // Argument type in operation '%c' on field %u does not match field type: expected a %s ErrTupleIsTooLong = iota // Tuple is too long %u ErrUnknownUpdateOp = iota // Unknown UPDATE operation ErrUpdateField = iota // Field %u UPDATE error: %s ErrFiberStack = iota // Can not create a new fiber: recursion limit reached ErrKeyPartCount = iota // Invalid key part count (expected [0..%u], got %u) ErrProcLua = iota // %s ErrNoSuchProc = iota // Procedure '%.*s' is not defined ErrNoSuchTrigger = iota // Trigger is not found ErrNoSuchIndex = iota // No index #%u is defined in space '%s' ErrNoSuchSpace = iota // Space '%s' does not exist ErrNoSuchField = iota // Field %d was not found in the tuple ErrSpaceFieldCount = iota // Tuple field count %u does not match space '%s' field count %u ErrIndexFieldCount = iota // Tuple field count %u is less than required by a defined index (expected %u) ErrWalIo = iota // Failed to write to disk ErrMoreThanOneTuple = iota // More than one tuple found by get() ErrAccessDenied = iota // %s access denied for user '%s' ErrCreateUser = iota // Failed to create user '%s': %s ErrDropUser = iota // Failed to drop user '%s': %s ErrNoSuchUser = iota // User '%s' is not found ErrUserExists = iota // User '%s' already exists ErrPasswordMismatch = iota // Incorrect password supplied for user '%s' ErrUnknownRequestType = iota // Unknown request type %u ErrUnknownSchemaObject = iota // Unknown object type '%s' ErrCreateFunction = iota // Failed to create function '%s': %s ErrNoSuchFunction = iota // Function '%s' does not exist ErrFunctionExists = iota // Function '%s' already exists ErrFunctionAccessDenied = iota // %s access denied for user '%s' to function '%s' ErrFunctionMax = iota // A limit on the total number of functions has been reached: %u ErrSpaceAccessDenied = iota // %s access denied for user '%s' to space '%s' ErrUserMax = iota // A limit on the total number of users has been reached: %u ErrNoSuchEngine = iota // Space engine '%s' does not exist ErrReloadCfg = iota // Can't set option '%s' dynamically ErrCfg = iota // Incorrect value for option '%s': %s ErrSophia = iota // %s ErrLocalServerIsNotActive = iota // Local server is not active ErrUnknownServer = iota // Server %s is not registered with the cluster ErrClusterIDMismatch = iota // Cluster id of the replica %s doesn't match cluster id of the master %s ErrInvalidUUID = iota // Invalid UUID: %s ErrClusterIDIsRo = iota // Can't reset cluster id: it is already assigned ErrReserved66 = iota // Reserved66 ErrServerIDIsReserved = iota // Can't initialize server id with a reserved value %u ErrInvalidOrder = iota // Invalid LSN order for server %u: previous LSN = %llu, new lsn = %llu ErrMissingRequestField = iota // Missing mandatory field '%s' in request ErrIdentifier = iota // Invalid identifier '%s' (expected letters, digits or an underscore) ErrDropFunction = iota // Can't drop function %u: %s ErrIteratorType = iota // Unknown iterator type '%s' ErrReplicaMax = iota // Replica count limit reached: %u ErrInvalidXlog = iota // Failed to read xlog: %lld ErrInvalidXlogName = iota // Invalid xlog name: expected %lld got %lld ErrInvalidXlogOrder = iota // Invalid xlog order: %lld and %lld ErrNoConnection = iota // Connection is not established ErrTimeout = iota // Timeout exceeded ErrActiveTransaction = iota // Operation is not permitted when there is an active transaction ErrNoActiveTransaction = iota // Operation is not permitted when there is no active transaction ErrCrossEngineTransaction = iota // A multi-statement transaction can not use multiple storage engines ErrNoSuchRole = iota // Role '%s' is not found ErrRoleExists = iota // Role '%s' already exists ErrCreateRole = iota // Failed to create role '%s': %s ErrIndexExists = iota // Index '%s' already exists ErrTupleRefOverflow = iota // Tuple reference counter overflow ErrRoleLoop = iota // Granting role '%s' to role '%s' would create a loop ErrGrant = iota // Incorrect grant arguments: %s ErrPrivGranted = iota // User '%s' already has %s access on %s '%s' ErrRoleGranted = iota // User '%s' already has role '%s' ErrPrivNotGranted = iota // User '%s' does not have %s access on %s '%s' ErrRoleNotGranted = iota // User '%s' does not have role '%s' ErrMissingSnapshot = iota // Can't find snapshot ErrCantUpdatePrimaryKey = iota // Attempt to modify a tuple field which is part of index '%s' in space '%s' ErrUpdateIntegerOverflow = iota // Integer overflow when performing '%c' operation on field %u ErrGuestUserPassword = iota // Setting password for guest user has no effect ErrTransactionConflict = iota // Transaction has been aborted by conflict ErrUnsupportedRolePriv = iota // Unsupported role privilege '%s' ErrLoadFunction = iota // Failed to dynamically load function '%s': %s ErrFunctionLanguage = iota // Unsupported language '%s' specified for function '%s' ErrRtreeRect = iota // RTree: %s must be an array with %u (point) or %u (rectangle/box) numeric coordinates ErrProcC = iota // ??? ErrUnknownRtreeIndexDistanceType = iota //Unknown RTREE index distance type %s ErrProtocol = iota // %s ErrUpsertUniqueSecondaryKey = iota // Space %s has a unique secondary index and does not support UPSERT ErrWrongIndexRecord = iota // Wrong record in _index space: got {%s}, expected {%s} ErrWrongIndexParts = iota // Wrong index parts (field %u): %s; expected field1 id (number), field1 type (string), ... ErrWrongIndexOptions = iota // Wrong index options (field %u): %s ErrWrongSchemaVaersion = iota // Wrong schema version, current: %d, in request: %u ErrSlabAllocMax = iota // Failed to allocate %u bytes for tuple in the slab allocator: tuple is too large. Check 'slab_alloc_maximal' configuration option. )
Tarantool server error codes
const ( DefaultIndex = "primary" DefaultLimit = 100 DefaultReaderBufSize = 128 * 1024 DefaultWriterBufSize = 128 * 1024 )
const (
GreetingSize = 128
)
const (
ServerIdent = "Tarantool 1.6.8 (Binary)"
)
Variables ¶
var ( ErrEmptyDefaultSpace = errors.New("zero-length default space or unnecessary slash in dsn.path") ErrSyncFailed = errors.New("SYNC failed") )
var ( // ErrNotSupported is returned when an unimplemented query type or operation is encountered. ErrNotSupported = NewQueryError("not supported yet") // ErrNotInReplicaSet means that join operation can not be performed on a replica set due to missing parameters. ErrNotInReplicaSet = NewQueryError("Full Replica Set params hasn't been set") // ErrBadResult means that query result was of invalid type or length. ErrBadResult = NewQueryError("invalid result") // ErrVectorClock is returns in case of bad manipulation with vector clock. ErrVectorClock = NewQueryError("vclock manipulation") // ErrUnknown is returns when ErrorCode isn't OK but Error is nil in Result. ErrUnknownError = NewQueryError("unknown error") )
var (
ErrPortAlreadyInUse = errors.New("Port already in use")
)
Functions ¶
Types ¶
type Box ¶
type Box struct { Root string WorkDir string Port uint Listen string // contains filtered or unexported fields }
Box is tarantool instance. For start/stop tarantool in tests
type BoxOptions ¶
type Connection ¶
type Connection struct { Greeting *Greeting // contains filtered or unexported fields }
func Connect ¶
func Connect(dsnString string, options *Options) (conn *Connection, err error)
Connect to tarantool instance with options. Returned Connection could be used to execute queries.
func (*Connection) Close ¶
func (conn *Connection) Close()
func (*Connection) Execute ¶
func (conn *Connection) Execute(q Query) ([][]interface{}, error)
func (*Connection) GetPrimaryKeyFields ¶
func (conn *Connection) GetPrimaryKeyFields(space interface{}) ([]int, bool)
func (*Connection) IsClosed ¶
func (conn *Connection) IsClosed() (bool, error)
func (*Connection) String ¶
func (conn *Connection) String() string
type ConnectionError ¶
type ConnectionError struct {
// contains filtered or unexported fields
}
func ConnectionClosedError ¶
func ConnectionClosedError(con *Connection) *ConnectionError
func NewConnectionError ¶
func NewConnectionError(con *Connection, message string) *ConnectionError
func (*ConnectionError) Error ¶
func (e *ConnectionError) Error() string
func (*ConnectionError) Temporary ¶
func (e *ConnectionError) Temporary() bool
type ContextError ¶
type ContextError struct {
// contains filtered or unexported fields
}
func NewContextError ¶
func NewContextError(ctx context.Context, con *Connection, message string) *ContextError
func (*ContextError) ContextErr ¶
func (e *ContextError) ContextErr() error
func (*ContextError) Error ¶
func (e *ContextError) Error() string
func (*ContextError) Temporary ¶
func (e *ContextError) Temporary() bool
type Delete ¶
type Delete struct { Space interface{} Index interface{} Key interface{} KeyTuple []interface{} }
type Eval ¶
type Eval struct { Expression string Tuple []interface{} }
Eval query
type IprotoServer ¶
func NewIprotoServer ¶
func NewIprotoServer(uuid string, handler QueryHandler, onShutdown OnShutdownCallback) *IprotoServer
func (*IprotoServer) Accept ¶
func (s *IprotoServer) Accept(conn net.Conn)
func (*IprotoServer) CheckAuth ¶
func (s *IprotoServer) CheckAuth(hash []byte, password string) bool
func (*IprotoServer) Shutdown ¶
func (s *IprotoServer) Shutdown() error
type Join ¶
type Join struct {
UUID string
}
Join is the JOIN command
type OnShutdownCallback ¶
type OnShutdownCallback func(err error)
type Packet ¶
type PacketIterator ¶
PacketIterator is a wrapper around Slave provided iteration over new Packets functionality.
type QueryError ¶
type QueryError struct {
// contains filtered or unexported fields
}
func NewQueryError ¶
func NewQueryError(message string) *QueryError
func (*QueryError) Error ¶
func (e *QueryError) Error() string
func (*QueryError) Temporary ¶
func (e *QueryError) Temporary() bool
type ReplicaSet ¶
type ReplicaSet struct { UUID string Instances []string // Instances is read-only set of the instances uuid }
ReplicaSet is used to store params of the Replica Set.
func (*ReplicaSet) Has ¶
func (rs *ReplicaSet) Has(id uint32) bool
Has ReplicaSet specified instance?
func (*ReplicaSet) SetInstance ¶
func (rs *ReplicaSet) SetInstance(id uint32, uuid string) bool
SetInstance uuid in instance set.
type Select ¶
type Slave ¶
type Slave struct { UUID string VClock VectorClock ReplicaSet ReplicaSet // contains filtered or unexported fields }
Slave connects to Tarantool 1.6 instance and subscribes for changes. Tarantool instance acting as a master sees Slave like any replica in replication set. Slave can't be used concurrently, route responses from returned channel instead.
Example (SubscribeExisted) ¶
package main import ( "log" "strings" tnt16 "github.com/viciious/go-tarantool" ) func main() { // Subscribe for master's changes synchronously // new slave instance connects to provided dsn instantly s, err := tnt16.NewSlave("127.0.0.1:8000", tnt16.Options{ User: "username", Password: "password", // UUID of the instance in replica set. Required UUID: "7c025e42-2394-11e7-aacf-0242ac110002", // UUID of the Replica Set. Required ReplicaSetUUID: "3b39c6a4-f2da-4d81-a43b-103e5b1c16a1"}) if err != nil { log.Printf("Tnt Slave creating error:%v", err) return } // always close slave to preserve socket descriptor defer s.Close() // let's start from the beginning var lsn int64 = 0 it, err := s.Subscribe(lsn) if err != nil { log.Printf("Tnt Slave subscribing error:%v", err) return } // print snapshot var p *tnt16.Packet var hr = strings.Repeat("-", 80) // iterate over master's changes permanently for { p, err = it.Next() if err != nil { log.Printf("Tnt Slave iterating error:%v", err) return } log.Println(p) log.Println(hr) } }
Output:
Example (SubscribeNew) ¶
package main import ( "log" "strings" tnt16 "github.com/viciious/go-tarantool" ) func main() { // Silently join slave to Replica Set and consume master's changes synchronously // new slave instance connects to provided dsn instantly s, err := tnt16.NewSlave("username:password@127.0.0.1:8000") if err != nil { log.Printf("Tnt Slave creating error:%v", err) return } // always close slave to preserve socket descriptor defer s.Close() // let's start from the beginning it, err := s.Attach() if err != nil { log.Printf("Tnt Slave subscribing error:%v", err) return } // print snapshot var p *tnt16.Packet var hr = strings.Repeat("-", 80) // iterate over master's changes permanently for { p, err = it.Next() if err != nil { log.Printf("Tnt Slave iterating error:%v", err) return } log.Println(p) log.Println(hr) } }
Output:
func NewSlave ¶
NewSlave instance with tarantool master uri. URI is parsed by url package and therefore should contains any scheme supported by net.Dial.
func (*Slave) Attach ¶
func (s *Slave) Attach(out ...chan *Packet) (it PacketIterator, err error)
Attach Slave to Replica Set and subscribe for the new(!) DML requests. Use out chan for asynchronous packet receiving or synchronous PacketIterator otherwise. If you need all requests in chan use JoinWithSnap(chan) and then s.Subscribe(s.VClock[1:]...).
Example (Async) ¶
package main import ( "log" "strings" "sync" tnt16 "github.com/viciious/go-tarantool" ) func main() { // Silently join slave to Replica Set and consume master's changes asynchronously // new slave instance connects to provided dsn instantly s, err := tnt16.NewSlave("username:password@127.0.0.1:8000") if err != nil { log.Printf("Tnt Slave creating error:%v", err) return } // always close slave to preserve socket descriptor defer s.Close() // chan for snapshot's packets xlogChan := make(chan *tnt16.Packet, 128) wg := &sync.WaitGroup{} // run xlog printer before subscribing command wg.Add(1) go func(in <-chan *tnt16.Packet, wg *sync.WaitGroup) { defer wg.Done() var hr = strings.Repeat("-", 80) for p := range in { log.Println(hr) switch q := p.Request.(type) { case *tnt16.Insert: switch q.Space { case tnt16.SpaceIndex, tnt16.SpaceSpace: // short default format log.Printf("Insert LSN:%v, Space:%v InstanceID:%v\n", p.LSN, q.Space, p.InstanceID) default: log.Printf("%v", p) } default: log.Printf("%v", p) } } }(xlogChan, wg) // let's start from the beginning _, err = s.Attach(xlogChan) if err != nil { log.Printf("Tnt Slave subscribing error:%v", err) return } // consume master's changes permanently wg.Wait() }
Output:
Example (Sync) ¶
package main import ( "log" "strings" tnt16 "github.com/viciious/go-tarantool" ) func main() { // Silently join slave to Replica Set and consume master's changes synchronously // new slave instance connects to provided dsn instantly s, err := tnt16.NewSlave("username:password@127.0.0.1:8000") if err != nil { log.Printf("Tnt Slave creating error:%v", err) return } // always close slave to preserve socket descriptor defer s.Close() // let's start from the beginning it, err := s.Attach() if err != nil { log.Printf("Tnt Slave subscribing error:%v", err) return } // print snapshot var p *tnt16.Packet var hr = strings.Repeat("-", 80) // consume master's changes permanently for { p, err = it.Next() if err != nil { log.Printf("Tnt Slave consuming error:%v", err) return } log.Println(hr) switch q := p.Request.(type) { case *tnt16.Insert: switch q.Space { case tnt16.SpaceIndex, tnt16.SpaceSpace: // short default format log.Printf("Insert LSN:%v, Space:%v InstanceID:%v\n", p.LSN, q.Space, p.InstanceID) default: log.Printf("%v", p) } default: log.Printf("%v", p) } } }
Output:
func (*Slave) IsInReplicaSet ¶
IsInReplicaSet checks whether Slave has Replica Set params or not.
func (*Slave) Join ¶
Join the Replica Set using Master instance.
Example ¶
package main import ( "log" tnt16 "github.com/viciious/go-tarantool" ) func main() { // Silently join slave to Replica Set // new slave instance connects to provided dsn instantly s, err := tnt16.NewSlave("username:password@127.0.0.1:8000") if err != nil { log.Printf("Tnt Slave creating error:%v", err) return } // always close slave to preserve socket descriptor defer s.Close() if err = s.Join(); err != nil { log.Printf("Tnt Slave joining error:%v", err) return } log.Printf("UUID=%#v Replica Set UUID=%#v\n", s.UUID, s.ReplicaSet.UUID) }
Output:
func (*Slave) JoinWithSnap ¶
func (s *Slave) JoinWithSnap(out ...chan *Packet) (it PacketIterator, err error)
JoinWithSnap the Replica Set using Master instance. Snapshot logs is available through the given out channel or returned PacketIterator. (In truth, Slave itself is returned in PacketIterator wrapper)
Example (Async) ¶
package main import ( "log" "strings" "sync" tnt16 "github.com/viciious/go-tarantool" ) func main() { // Join slave to Replica Set with iterating snapshot asynchronously // new slave instance connects to provided dsn instantly s, err := tnt16.NewSlave("username:password@127.0.0.1:8000") if err != nil { log.Printf("Tnt Slave creating error:%v", err) return } // always close slave to preserve socket descriptor defer s.Close() // chan for snapshot's packets snapChan := make(chan *tnt16.Packet, 128) wg := &sync.WaitGroup{} // run snapshot printer before join command wg.Add(1) go func(in <-chan *tnt16.Packet, wg *sync.WaitGroup) { defer wg.Done() var hr = strings.Repeat("-", 80) for p := range in { log.Println(hr) switch q := p.Request.(type) { case *tnt16.Insert: switch q.Space { case tnt16.SpaceIndex, tnt16.SpaceSpace: // short default format log.Printf("Insert LSN:%v, Space:%v InstanceID:%v\n", p.LSN, q.Space, p.InstanceID) default: log.Printf("%v", p) } default: log.Printf("%v", p) } } }(snapChan, wg) _, err = s.JoinWithSnap(snapChan) if err != nil { log.Printf("Tnt Slave joining error:%v", err) return } wg.Wait() log.Printf("UUID=%#v Replica Set UUID=%#v\n", s.UUID, s.ReplicaSet.UUID) }
Output:
Example (Sync) ¶
package main import ( "log" "strings" tnt16 "github.com/viciious/go-tarantool" ) func main() { // Join slave to Replica Set with iterating snapshot synchronously // new slave instance connects to provided dsn instantly s, err := tnt16.NewSlave("username:password@127.0.0.1:8000") if err != nil { log.Printf("Tnt Slave creating error:%v", err) return } // always close slave to preserve socket descriptor defer s.Close() // skip returned iterator; will be using self bufio.scanner-style iterator instead _, err = s.JoinWithSnap() if err != nil { log.Printf("Tnt Slave joining error:%v", err) return } // print snapshot var p *tnt16.Packet var hr = strings.Repeat("-", 80) for s.HasNext() { p = s.Packet() // print request log.Println(hr) switch q := p.Request.(type) { case *tnt16.Insert: switch q.Space { case tnt16.SpaceIndex, tnt16.SpaceSpace: // short default format log.Printf("Insert LSN:%v, Space:%v InstanceID:%v\n", p.LSN, q.Space, p.InstanceID) default: log.Printf("%v", p) } default: log.Printf("%v", p) } } // always checks for errors after iteration cycle if s.Err() != nil { log.Printf("Tnt Slave joining error:%v", err) return } log.Printf("UUID=%#v Replica Set UUID=%#v\n", s.UUID, s.ReplicaSet.UUID) }
Output:
func (*Slave) LastSnapVClock ¶
func (s *Slave) LastSnapVClock() (VectorClock, error)
func (*Slave) Subscribe ¶
func (s *Slave) Subscribe(lsns ...int64) (it PacketIterator, err error)
Subscribe for DML requests (insert, update, delete, replace, upsert) since vector clock. Variadic lsn is start vector clock. Each lsn is one clock in vector (sequentially). One lsn is enough for master-slave replica set. Replica Set and self UUID should be set before call subscribe. Use options in New or Join for it. Subscribe sends requests asynchronously to out channel specified or use synchronous PacketIterator otherwise.
Example (Async) ¶
package main import ( "log" "strings" tnt16 "github.com/viciious/go-tarantool" ) func main() { // Subscribe for master's changes asynchronously // new slave instance connects to provided dsn instantly s, err := tnt16.NewSlave("127.0.0.1:8000", tnt16.Options{ User: "username", Password: "password", // UUID of the instance in replica set. Required UUID: "7c025e42-2394-11e7-aacf-0242ac110002", // UUID of the Replica Set. Required ReplicaSetUUID: "3b39c6a4-f2da-4d81-a43b-103e5b1c16a1"}) if err != nil { log.Printf("Tnt Slave creating error:%v", err) return } // always close slave to preserve socket descriptor defer s.Close() // chan for snapshot's packets xlogChan := make(chan *tnt16.Packet, 128) // run xlog printer before subscribing command go func(in <-chan *tnt16.Packet) { var hr = strings.Repeat("-", 80) for p := range in { log.Println(hr) switch q := p.Request.(type) { case *tnt16.Insert: switch q.Space { case tnt16.SpaceIndex, tnt16.SpaceSpace: // short default format log.Printf("Insert LSN:%v, Space:%v InstanceID:%v\n", p.LSN, q.Space, p.InstanceID) default: log.Printf("%v", p) } default: log.Printf("%v", p) } } }(xlogChan) // let's start from the beginning var lsn int64 = 0 it, err := s.Subscribe(lsn) if err != nil { log.Printf("Tnt Slave subscribing error:%v", err) return } // consume requests infinitely var p *tnt16.Packet for { p, err = it.Next() if err != nil { close(xlogChan) log.Printf("Tnt Slave consuming error:%v", err) return } xlogChan <- p } }
Output:
Example (Sync) ¶
package main import ( "log" "strings" tnt16 "github.com/viciious/go-tarantool" ) func main() { // Subscribe for master's changes synchronously // new slave instance connects to provided dsn instantly s, err := tnt16.NewSlave("127.0.0.1:8000", tnt16.Options{ User: "username", Password: "password", // UUID of the instance in replica set. Required UUID: "7c025e42-2394-11e7-aacf-0242ac110002", // UUID of the Replica Set. Required ReplicaSetUUID: "3b39c6a4-f2da-4d81-a43b-103e5b1c16a1"}) if err != nil { log.Printf("Tnt Slave creating error:%v", err) return } // always close slave to preserve socket descriptor defer s.Close() // let's start from the beginning var lsn int64 = 0 it, err := s.Subscribe(lsn) if err != nil { log.Printf("Tnt Slave subscribing error:%v", err) return } // print snapshot var p *tnt16.Packet var hr = strings.Repeat("-", 80) // consume master's changes permanently for { p, err = it.Next() if err != nil { log.Printf("Tnt Slave consuming error:%v", err) return } log.Println(hr) switch q := p.Request.(type) { case *tnt16.Insert: switch q.Space { case tnt16.SpaceIndex, tnt16.SpaceSpace: // short default format log.Printf("Insert LSN:%v, Space:%v InstanceID:%v\n", p.LSN, q.Space, p.InstanceID) default: log.Printf("%v", p) } default: log.Printf("%v", p) } } }
Output:
type Subscribe ¶
type Subscribe struct { UUID string ReplicaSetUUID string VClock VectorClock }
Subscribe is the SUBSCRIBE command
type Update ¶
type Update struct { Space interface{} Index interface{} Key interface{} KeyTuple []interface{} Set []Operator }
type VClock ¶
type VClock struct { RequestID uint64 // RequestID is SYNC field; InstanceID uint32 VClock VectorClock }
VClock response (in OK). Similar to Result struct
func (*VClock) UnmarshalBinary ¶
UnmarshalBinary implements encoding.BinaryUnmarshaler
type VectorClock ¶
type VectorClock []int64
VectorClock is used to store logical clocks (direct dependency clock implementation). Zero index is always reserved for internal use. You can get any lsn indexing VectorClock by instance ID directly (without any index offset). One can count instances in vector just using built-in len function.
func NewVectorClock ¶
func NewVectorClock(lsns ...int64) VectorClock
NewVectorClockFrom returns VectorClock with clocks equal to the given lsn elements sequentially. Empty VectorClock would be returned if no lsn elements is given.
Source Files ¶
- auth.go
- box.go
- call.go
- connection.go
- connector.go
- const.go
- defaults.go
- delete.go
- error.go
- eval.go
- execute.go
- insert.go
- iterator.go
- join.go
- operator.go
- pack.go
- pack_data.go
- packet.go
- packet_pool.go
- ping.go
- replace.go
- request_map.go
- result.go
- select.go
- server.go
- slave.go
- subscribe.go
- tnt.go
- tuple.go
- update.go
- upsert.go
- vclock.go