twopc

package
v0.0.0-...-9789875
Published: Mar 10, 2021 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package twopc provides a simple but complete library implementing the two-phase commit protocol for transactions.

First, set up the static topology for two-phase commit: one Coordinator and several Participants. Participant.RegisterCaller binds a user-defined Caller to an identifier.

A transaction consists of an initial logic and a subsequent logic. The subsequent logic is made up of several parts (each named a TxnPart), and each part is executed on a specific Participant.

The initial logic decides whether the txn continues. Only if the txn continues are the TxnParts submitted to the Participants and then executed. Every TxnPart has a Caller, which finally returns an errCode. The errCode reveals whether that part of the transaction failed or succeeded. If any part fails, all the TxnParts should roll back their state, which is the essence of the two-phase commit protocol.
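
The all-or-nothing rule can be illustrated with a small single-process sketch. It does not use this package: the `part` type, `runParts`, and `transfer` are hypothetical names, there is no RPC, and the parts run one after another rather than on separate Participants. It only shows that a failure in one part undoes the work of the others.

```go
package main

import "fmt"

// part is a simplified stand-in for a TxnPart's Caller: it does its work
// and returns an error code plus a function that undoes that work.
type part func() (errCode int, rollback func())

// runParts executes every part in one process (no RPC). If any part
// reports a non-zero code, every collected rollback is run.
func runParts(parts []part) int {
	combined := 0
	var rollbacks []func()
	for _, p := range parts {
		code, rb := p()
		combined |= code
		if rb != nil {
			rollbacks = append(rollbacks, rb)
		}
	}
	if combined != 0 {
		for _, rb := range rollbacks {
			rb()
		}
	}
	return combined
}

// transfer builds the two parts of a hypothetical money transfer.
func transfer(balances map[string]int, from, to string, amount int) []part {
	debit := func() (int, func()) {
		if balances[from] < amount {
			return 1, nil // nothing changed, nothing to undo
		}
		balances[from] -= amount
		return 0, func() { balances[from] += amount }
	}
	credit := func() (int, func()) {
		if _, ok := balances[to]; !ok {
			return 2, nil // unknown account, nothing to undo
		}
		balances[to] += amount
		return 0, func() { balances[to] -= amount }
	}
	return []part{debit, credit}
}

func main() {
	balances := map[string]int{"alice": 100, "bob": 0}

	// Both parts succeed, so the changes stay.
	fmt.Println(runParts(transfer(balances, "alice", "bob", 30)), balances)

	// The credit fails, so the debit is rolled back.
	fmt.Println(runParts(transfer(balances, "alice", "carol", 30)), balances)
}
```

In the second call the debit succeeds first and is then undone, so the balances end up unchanged.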

The error code finally given to the user is the result of an OR operation over the error codes of all the TxnParts. So the error code of every TxnPart should be 0 (success) or greater than 0. Users can define their own error codes for various situations.
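
A minimal sketch of the OR rule follows. It assumes the OR is bitwise. `combineErrCodes` and the three error constants are hypothetical and not part of this package. Giving each failure its own bit is one possible convention that keeps the individual causes visible in the combined code.

```go
package main

import "fmt"

// Hypothetical user-defined error codes. Each one uses its own bit so
// that the OR-ed result still shows which failures occurred.
const (
	ErrInsufficientFunds = 1 << iota // 1
	ErrAccountLocked                 // 2
	ErrUnknownAccount                // 4
)

// combineErrCodes ORs together the error codes of all TxnParts.
// It mirrors the rule described above; it is not the library's code.
func combineErrCodes(codes ...int) int {
	result := 0
	for _, c := range codes {
		result |= c
	}
	return result
}

func main() {
	fmt.Println(combineErrCodes(0, 0, 0))                                    // 0
	fmt.Println(combineErrCodes(0, ErrInsufficientFunds, ErrUnknownAccount)) // 5
}
```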

Coordinator and Participant are the roles of the two-phase commit protocol. CommitTxn and AbortTxn are mutually exclusive, which is the essence of the two-phase commit protocol. So take care when an AbortTxn caused by a timeout races with a simultaneous CommitTxn. Also, each of the two functions must be invoked at most once.
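
One common way to make a commit and an abort mutually exclusive is a single compare-and-swap on the decision state. The sketch below assumes that approach. The `decision` type and its methods are hypothetical, and the package may guard the decision differently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Hypothetical decision states, used only in this sketch.
const (
	undecided int32 = iota
	committed
	aborted
)

// decision records the final outcome of one transaction.
type decision struct {
	state int32
}

// tryCommit moves undecided -> committed and reports whether it won.
func (d *decision) tryCommit() bool {
	return atomic.CompareAndSwapInt32(&d.state, undecided, committed)
}

// tryAbort moves undecided -> aborted and reports whether it won.
func (d *decision) tryAbort() bool {
	return atomic.CompareAndSwapInt32(&d.state, undecided, aborted)
}

// race runs a commit and an abort concurrently and returns how many of
// the two transitions succeeded.
func race() int32 {
	var d decision
	var wins int32
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		if d.tryCommit() {
			atomic.AddInt32(&wins, 1)
		}
	}()
	go func() {
		defer wg.Done()
		if d.tryAbort() {
			atomic.AddInt32(&wins, 1)
		}
	}()
	wg.Wait()
	return wins
}

func main() {
	fmt.Println("successful transitions:", race()) // always 1
}
```

Whichever goroutine runs first wins, and the loser sees a state that is no longer undecided, so exactly one outcome is recorded and it is recorded once.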

Fault-tolerance model

We assume that at least one participant is OK. So the timeout can be measured from the moment the first StatePrepared is received.
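
The timing rule can be sketched with an explicit clock, which keeps the example deterministic. `prepareTimer` and `simulate` are hypothetical names, and the package may well use real timers instead.

```go
package main

import (
	"fmt"
	"time"
)

// prepareTimer tracks the timeout window of one transaction.
type prepareTimer struct {
	timeout  time.Duration
	started  bool
	deadline time.Time
}

// onPrepared is called for every Prepared message; only the first one
// starts the clock.
func (t *prepareTimer) onPrepared(now time.Time) {
	if !t.started {
		t.started = true
		t.deadline = now.Add(t.timeout)
	}
}

// expired reports whether the window has elapsed. Before the first
// Prepared message it is always false.
func (t *prepareTimer) expired(now time.Time) bool {
	return t.started && now.After(t.deadline)
}

// simulate feeds two Prepared messages into a 500 ms timer and samples
// it before any message, inside the window, and after the window.
func simulate() (before, inside, after bool) {
	start := time.Date(2021, 3, 10, 0, 0, 0, 0, time.UTC)
	t := &prepareTimer{timeout: 500 * time.Millisecond}

	before = t.expired(start.Add(time.Hour)) // no Prepared yet
	t.onPrepared(start)
	t.onPrepared(start.Add(400 * time.Millisecond)) // does not restart the clock
	inside = t.expired(start.Add(450 * time.Millisecond))
	after = t.expired(start.Add(600 * time.Millisecond))
	return
}

func main() {
	fmt.Println(simulate()) // false false true
}
```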

We assume that there is no fault in the Coordinator, including communication errors (timeout, socket close, etc.) as well as process and power failures.

State of part of transaction

The initial state is StateTxnPartWorking, entered when the Participant receives a TxnPart submitted by the Coordinator. StateTxnPartWorking changes to StateTxnPartPrepared if the TxnPart is processed successfully; otherwise it changes to StateTxnPartAborted. Only when the Coordinator informs the Participant that the corresponding Txn can commit, i.e. after it has received all the Prepared msgs, can the TxnPart change to StateTxnPartCommitted.
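
These transitions can be written as a small function. The state constants are local copies of the ones declared in this package. The event names and `next` are hypothetical. The Prepared to Aborted transition is an assumption inferred from the existence of the Participant.Abort RPC; it is not stated in the paragraph above.

```go
package main

import "fmt"

// Local copies of the TxnPart states declared in this package.
const (
	StateTxnPartWorking = iota
	StateTxnPartPrepared
	StateTxnPartAborted
	StateTxnPartCommitted
)

// Hypothetical events, named only for this sketch.
const (
	evExecOK     = iota // the Caller returned errCode 0
	evExecFailed        // the Caller returned a non-zero errCode
	evCommit            // the Coordinator called Commit
	evAbort             // the Coordinator called Abort (assumed transition)
)

// next returns the state that follows the given event, and false if the
// event is not valid in the current state.
func next(state, event int) (int, bool) {
	switch {
	case state == StateTxnPartWorking && event == evExecOK:
		return StateTxnPartPrepared, true
	case state == StateTxnPartWorking && event == evExecFailed:
		return StateTxnPartAborted, true
	case state == StateTxnPartPrepared && event == evCommit:
		return StateTxnPartCommitted, true
	case state == StateTxnPartPrepared && event == evAbort:
		return StateTxnPartAborted, true
	}
	return state, false
}

func main() {
	s, ok := next(StateTxnPartWorking, evExecOK)
	fmt.Println(s, ok) // 1 true
	s, ok = next(s, evCommit)
	fmt.Println(s, ok) // 3 true
	s, ok = next(StateTxnPartWorking, evCommit)
	fmt.Println(s, ok) // 0 false
}
```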

State of transaction

The initial state is StateTxnCreated. StateTxnCreated changes to StateTxnInit only if the initial function of the Txn executes successfully. StateTxnInit changes to StateTxnPreparing if the first received msg is Prepared, or to StateTxnAborted if the first received msg is Aborted. StateTxnPreparing changes to StateTxnAborted on receiving any Aborted msg, or on a timeout after the first Prepared msg. StateTxnPreparing changes to StateTxnCommitted once all Prepared msgs have been received.
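
The allowed transitions can also be kept as a lookup table. The constants below are local copies of the Txn states declared in this package. `allowed` and `canTransition` are hypothetical. The Created to Aborted entry is an assumption based on the Txn.Start description, which says the Txn is aborted immediately when the initial function fails.

```go
package main

import "fmt"

// Local copies of the Txn states declared in this package.
const (
	StateTxnCreated = iota
	StateTxnInit
	StateTxnPreparing
	StateTxnCommitted
	StateTxnAborted
)

// allowed lists, for each state, the states it may change to.
// Committed and Aborted are final and therefore have no entry.
var allowed = map[int][]int{
	StateTxnCreated:   {StateTxnInit, StateTxnAborted}, // Aborted here is assumed
	StateTxnInit:      {StateTxnPreparing, StateTxnAborted},
	StateTxnPreparing: {StateTxnCommitted, StateTxnAborted},
}

// canTransition reports whether a Txn may move from one state to another.
func canTransition(from, to int) bool {
	for _, s := range allowed[from] {
		if s == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(StateTxnCreated, StateTxnInit))        // true
	fmt.Println(canTransition(StateTxnInit, StateTxnCommitted))      // false
	fmt.Println(canTransition(StateTxnPreparing, StateTxnCommitted)) // true
	fmt.Println(canTransition(StateTxnCommitted, StateTxnAborted))   // false
}
```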

Reference

Consensus on Transaction Commit. https://www.microsoft.com/en-us/research/publication/consensus-on-transaction-commit/.

Constants

const (
	// Created state in NewTxn.
	StateTxnCreated = iota
	// Initial function is executed successfully.
	StateTxnInit
	// The first msg from Participant for a Txn is Prepared msg.
	StateTxnPreparing
	// All TxnParts have been informed of the Prepared msgs.
	StateTxnCommitted
	// Received one or more Aborted msgs, or timeout after receiving the
	// first Prepared msg.
	StateTxnAborted
)

States of Txn.

const (
	// Participant received the TxnPart before executing it.
	StateTxnPartWorking = iota
	// TxnPart is executed successfully.
	StateTxnPartPrepared
	// TxnPart is executed with an error.
	StateTxnPartAborted
	// TxnPart is informed that all TxnParts have reached the Prepared state.
	StateTxnPartCommitted
)

States of TxnPart.

const (
	ErrTxnTimeout   = -1
	ErrTxnUserAbort = -2
)

Special error codes for Txn. They are less than 0. 0 means success. User-defined error codes must be greater than 0.
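
A caller that receives the final error code might distinguish the three ranges as sketched below. The two constants are local copies of the ones above, and `describe` is a hypothetical helper that is not part of this package.

```go
package main

import "fmt"

// Local copies of the special error codes declared in this package.
const (
	ErrTxnTimeout   = -1
	ErrTxnUserAbort = -2
)

// describe turns a final error code into a short label.
func describe(code int) string {
	switch {
	case code == 0:
		return "success"
	case code == ErrTxnTimeout:
		return "timeout"
	case code == ErrTxnUserAbort:
		return "aborted by user"
	case code > 0:
		return "user-defined failure"
	default:
		return "unknown"
	}
}

func main() {
	for _, code := range []int{0, -1, -2, 4} {
		fmt.Printf("%d: %s\n", code, describe(code))
	}
}
```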

const CoordClientMaxSizeForOnePpt = 100

CoordClientMaxSizeForOnePpt is the maximum number of connections in the pool from the Coordinator to a Participant.

const DefaultPptPoolSize = 5

DefaultPptPoolSize is the maximum number of connections in the pool from the Participant to the Coordinator.

Variables

This section is empty.

Functions

This section is empty.

Types

type AbortArgs

type AbortArgs struct {
	TxnPartID string
}

AbortArgs is the arg for Abort RPC call.

type AbortReply

type AbortReply struct{}

AbortReply is the reply for Abort RPC call.

type AbortedArgs

type AbortedArgs struct {
	TxnID      string
	TxnPartIdx int
	ErrCode    int
}

AbortedArgs is the arg for InformAborted RPC call.

type AbortedReply

type AbortedReply struct{}

AbortedReply is the reply for InformAborted RPC call.

type CallFunc

type CallFunc func(initRet interface{}) (errCode int, rb Rollbacker)

CallFunc is a Caller. Users can use a function of this type directly as a Caller.

func (CallFunc) Call

func (f CallFunc) Call(initRet interface{}) (errCode int, rb Rollbacker)

Call makes CallFunc implement the Caller interface.

type Caller

type Caller interface {
	Call(initRet interface{}) (errCode int, rb Rollbacker)
}

Caller represents one part of transaction logic executed by the specific Participant. It's implemented by the library user and registered in the Participant by RegisterCaller function.

ErrCode decides whether the Participant sends back the StatePrepared or the StateAborted msg. It must be 0 (success) or greater than 0. Rb is the rollbacker, which is executed when errCode is not 0, i.e. when the txn should be aborted and the txnPart should roll back its state.

InitRet is the return value of initial function of the txn. It is filled in the Coordinator before the txnPart is submitted to the Participant. It could be nil.
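
As an illustration, a user-defined Caller might look like the sketch below. To compile on its own it redeclares Rollbacker, RollbackFunc, and Caller locally with the signatures documented on this page. The body of Rollback, the `reserveStock` type, and `ErrOutOfStock` are assumptions made for the example.

```go
package main

import "fmt"

// Local copies of the types documented in this package.
type Rollbacker interface {
	Rollback()
}

type RollbackFunc func()

// Rollback simply calls f (assumed body).
func (f RollbackFunc) Rollback() { f() }

type Caller interface {
	Call(initRet interface{}) (errCode int, rb Rollbacker)
}

// ErrOutOfStock is a hypothetical user-defined error code (greater than 0).
const ErrOutOfStock = 1

// reserveStock is a hypothetical Caller that reserves items from a counter.
type reserveStock struct {
	stock *int
}

// Call expects initRet to carry the requested quantity as an int.
func (r reserveStock) Call(initRet interface{}) (int, Rollbacker) {
	qty, ok := initRet.(int)
	if !ok || qty > *r.stock {
		return ErrOutOfStock, RollbackFunc(func() {}) // nothing to undo
	}
	*r.stock -= qty
	return 0, RollbackFunc(func() { *r.stock += qty })
}

func main() {
	stock := 5
	var c Caller = reserveStock{stock: &stock}

	code, rb := c.Call(3)
	fmt.Println(code, stock) // 0 2

	rb.Rollback() // what would run if the txn were aborted later
	fmt.Println(stock) // 5

	code, _ = c.Call(9)
	fmt.Println(code, stock) // 1 5
}
```

The rollbacker captures exactly what the call changed, so running it restores the earlier state without consulting anything else.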

type CommitArgs

type CommitArgs struct {
	TxnPartID string
}

CommitArgs is the arg for Commit RPC call.

type CommitReply

type CommitReply struct {
	TxnPartID string
}

CommitReply is the reply for Commit RPC call.

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator is the manager role of two-phase commit.

func NewCoordinator

func NewCoordinator(network, coord string, ppts []string) *Coordinator

NewCoordinator initializes a Coordinator service. Network can be "tcp" or "unix". Coord is the address the Coordinator listens on. Ppts is the list of addresses that the Participants listen on.

func (*Coordinator) Abort

func (ctr *Coordinator) Abort(txnID string)

Abort lets users actively abort the transaction under some conditions.

func (*Coordinator) InformAborted

func (ctr *Coordinator) InformAborted(args *AbortedArgs, reply *AbortedReply) error

InformAborted is an RPC call invoked by the participant to inform the Coordinator of the aborted state.

func (*Coordinator) InformPrepared

func (ctr *Coordinator) InformPrepared(args *PreparedArgs, reply *PreparedReply) error

InformPrepared is an RPC call invoked by the participant to inform the Coordinator of the prepared state.

func (*Coordinator) Kill

func (ctr *Coordinator) Kill()

Kill tells the coordinator to shut itself down. It is intended for testing.

func (*Coordinator) NewTxn

func (ctr *Coordinator) NewTxn(initFunc TxnInitFunc,
	keyHashFunc KeyHashFunc, timeoutMs int64) *Txn

NewTxn initializes a new Txn. The user-defined initial txn function, the hash function for keys, and the timeout (in milliseconds) must be provided. The coordinator assigns a unique ID to the transaction.

It's thread-safe.

func (*Coordinator) RegisterService

func (ctr *Coordinator) RegisterService(service interface{})

RegisterService registers the rpc calls of the service onto the Coordinator.

func (*Coordinator) StateTxn

func (ctr *Coordinator) StateTxn(txnID *string, reply *TxnState) error

StateTxn is an RPC call that returns the latest state of the transaction.

func (*Coordinator) SyncTxnEnd

func (ctr *Coordinator) SyncTxnEnd(txnID *string, reply *TxnState) error

SyncTxnEnd is an RPC call that waits until the state of the transaction changes to StateTxnAborted or StateTxnCommitted.

type KeyHashFunc

type KeyHashFunc func(key string) uint64

KeyHashFunc is the hash func for distributing the TxnParts.

var DefaultKeyHashFunc KeyHashFunc = func(key string) uint64 {
	var hash uint64 = 0
	for i := 0; i < len(key); i++ {
		hash = 31*hash + uint64(key[i])
	}
	return hash & (1<<63 - 1)
}

DefaultKeyHashFunc is the default KeyHashFunc.
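
To show how such a hash could route a key to a Participant, the sketch below repeats the hash body in a local `keyHash` function and picks an address by taking the hash modulo the number of Participants. The modulo step, `pickParticipant`, and the addresses are assumptions made for illustration. How the package actually maps a hash to a shard is not documented on this page.

```go
package main

import "fmt"

// keyHash repeats the body of DefaultKeyHashFunc shown above.
func keyHash(key string) uint64 {
	var hash uint64
	for i := 0; i < len(key); i++ {
		hash = 31*hash + uint64(key[i])
	}
	return hash & (1<<63 - 1) // clear the top bit
}

// pickParticipant maps a key to one of the addresses by taking the hash
// modulo the number of participants. The modulo step is an assumption
// made for this sketch; ppts must not be empty.
func pickParticipant(key string, ppts []string) string {
	return ppts[keyHash(key)%uint64(len(ppts))]
}

func main() {
	// Hypothetical participant addresses.
	ppts := []string{"localhost:7001", "localhost:7002", "localhost:7003"}

	fmt.Println(keyHash("a"), keyHash("ab")) // 97 3105
	fmt.Println(pickParticipant("a", ppts))  // localhost:7002
	fmt.Println(pickParticipant("ab", ppts)) // localhost:7001
}
```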

type Participant

type Participant struct {
	// contains filtered or unexported fields
}

Participant is the executing role of two-phase commit.

func NewParticipant

func NewParticipant(network, addr, coord string) *Participant

NewParticipant initializes a participant service. Network can be "tcp" or "unix". Coord is the address the Coordinator listens on. Addr is the address this Participant listens on.

func (*Participant) Abort

func (ppt *Participant) Abort(args *AbortArgs, reply *AbortReply) error

Abort is an RPC call invoked by the Coordinator when it decides that the transaction should be aborted, either because of a timeout or because it received an Aborted msg from one or more Participants. It may be called more than once.

func (*Participant) Commit

func (ppt *Participant) Commit(args *CommitArgs, reply *CommitReply) error

Commit is an RPC call invoked by the Coordinator once it has made sure that all the participants have entered the Prepared state. It may be called more than once.

func (*Participant) Kill

func (ppt *Participant) Kill()

Kill tells the peer to shut itself down. It is for testing. Please do not change these two functions.

func (*Participant) RegisterCaller

func (ppt *Participant) RegisterCaller(caller Caller, name string)

RegisterCaller registers a caller with a unique name, which can be used to identify the caller in Txn.AddTxnPart and Txn.BroadcastTxnPart.

func (*Participant) RegisterRPCService

func (ppt *Participant) RegisterRPCService(service interface{})

RegisterRPCService registers the rpc calls of the service onto the Participant.

func (*Participant) SubmitTxnPart

func (ppt *Participant) SubmitTxnPart(tp *TxnPart, reply *struct{}) error

SubmitTxnPart is an RPC call that submits the TxnPart to the participant and executes it. It returns immediately without waiting for the TxnPart to finish executing.

The reply could be nil.

type PreparedArgs

type PreparedArgs struct {
	TxnID      string
	TxnPartIdx int
	ErrCode    int
}

PreparedArgs is the arg for InformPrepared RPC call.

type PreparedReply

type PreparedReply struct{}

PreparedReply is the reply for InformPrepared RPC call.

type RollbackFunc

type RollbackFunc func()

RollbackFunc is a Rollbacker. Users can use a function of this type directly as a Rollbacker.

var BlankRollbackFunc RollbackFunc = func() {}

BlankRollbackFunc is the Rollbacker with blank logic.

func (RollbackFunc) Rollback

func (f RollbackFunc) Rollback()

Rollback makes RollbackFunc implement the Rollbacker interface.

type Rollbacker

type Rollbacker interface {
	Rollback()
}

Rollbacker represents the rollback logic of the txnPart on the specific Participant. It is tied to a specific Caller. If the errCode of the Caller is not 0, the rollbacker is executed to undo the state changes. It's implemented by the library user and returned from the Caller's implementation.

type Txn

type Txn struct {
	ID string
	// contains filtered or unexported fields
}

Txn is the structure for a transaction. It is created by the Coordinator and controlled through its methods.

func (*Txn) AddTxnPart

func (txn *Txn) AddTxnPart(key, callName string)

AddTxnPart adds a TxnPart to the Txn. Key decides which specific Participant will execute the TxnPart. CallName is the name of the function bound to the Participant.

func (*Txn) BroadcastTxnPart

func (txn *Txn) BroadcastTxnPart(callName string)

BroadcastTxnPart adds a TxnPart to the Txn. The TxnPart will be executed on all Participants instead of one specific Participant. CallName is the name of the function bound to the Participant.

It is usually used when we don't know which Participant should execute the TxnPart logic.

func (*Txn) Start

func (txn *Txn) Start(initArgs interface{})

Start begins executing the transaction. It first initializes the Txn with initArgs. If the returned code is 0, the TxnParts are submitted to the corresponding shards on the specific Participants. Otherwise, the Txn is aborted immediately.

type TxnInitFunc

type TxnInitFunc func(args interface{}) (ret interface{}, errCode int)

TxnInitFunc is the initialization that runs before Txn processing. The returned errCode indicates the outcome of the initialization and decides whether the rest of the Txn is processed. If it's 0, processing continues. Otherwise, the txn stops.

var BlankTxnInitFunc TxnInitFunc = func(args interface{}) (ret interface{}, errCode int) {
	ret = nil
	errCode = 0
	return
}

BlankTxnInitFunc is a blank TxnInitFunc that contains no logic and returns 0.
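
A non-blank initial function might validate its arguments and pass a value on to the TxnParts. In the sketch below the TxnInitFunc type is redeclared locally so the example compiles on its own. `validateAmount` and `ErrBadAmount` are hypothetical.

```go
package main

import "fmt"

// TxnInitFunc is a local copy of the type declared in this package.
type TxnInitFunc func(args interface{}) (ret interface{}, errCode int)

// ErrBadAmount is a hypothetical user-defined error code.
const ErrBadAmount = 1

// validateAmount is a hypothetical initial function. It accepts only a
// positive int and hands it on as ret, which would then reach every
// Caller as initRet.
var validateAmount TxnInitFunc = func(args interface{}) (interface{}, int) {
	amount, ok := args.(int)
	if !ok || amount <= 0 {
		return nil, ErrBadAmount
	}
	return amount, 0
}

func main() {
	ret, code := validateAmount(30)
	fmt.Println(ret, code) // 30 0

	ret, code = validateAmount("thirty")
	fmt.Println(ret, code) // <nil> 1
}
```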

type TxnPart

type TxnPart struct {
	// ID of TxnPart.
	ID string

	// ID of the corresponding Txn.
	TxnID string

	// Idx is the index of TxnPart among the parts of Txn.
	Idx int

	// Shard is the index of the shard.
	Shard int

	// Remote address (host:port) of the participant that
	// corresponds to its shard.
	Remote string

	// CallName is the binding name of the function.
	CallName string

	// InitRet is the return value of initFunction of Txn.
	InitRet interface{}
	// contains filtered or unexported fields
}

TxnPart is one part of the transaction. One transaction is made up of several TxnParts. The function named by CallName is executed on the specific participant, and the errCode it returns determines whether the participant sends back a StatePrepared or a StateAborted msg. The rollbacker is executed if the corresponding txn is aborted.

type TxnState

type TxnState struct {
	State   int32
	ErrCode int
}

TxnState is the state of the transaction.
