tunnel

package
v4.1.2+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2019 License: Apache-2.0 Imports: 30 Imported by: 0

README

Tunnel service

Table Store tunnel service golang sdk.

Install

  • download tunnel client source code
go get github.com/aliyun/aliyun-tablestore-go-sdk/tunnel
  • use dep to install dependencies under tunnel directory
    • install dep
    • dep ensure -v
  • or use go get to install dependencies
go get -u go.uber.org/zap
go get github.com/cenkalti/backoff
go get github.com/golang/protobuf/proto
go get github.com/satori/go.uuid
go get github.com/stretchr/testify/assert
go get github.com/smartystreets/goconvey/convey
go get github.com/golang/mock/gomock
go get gopkg.in/natefinch/lumberjack.v2

Quick Start

  • tunnel type

    • TunnelTypeStream: stream data(增量数据流)
    • TunnelTypeBaseData: full data(全量数据流)
    • TunnelTypeBaseStream: full and stream data(先全量后增量数据流)
  • init tunnel client

    tunnelClient := tunnel.NewTunnelClient(endpoint, instance,
       accessKeyId, accessKeySecret)
  • create new tunnel
    req := &tunnel.CreateTunnelRequest{
       TableName:  "testTable",
       TunnelName: "testTunnel",
       Type:       tunnel.TunnelTypeBaseStream, //base and stream data tunnel
    }
    resp, err := tunnelClient.CreateTunnel(req)
    if err != nil {
       log.Fatal("create test tunnel failed", err)
    }
    log.Println("tunnel id is", resp.TunnelId)
  • get existing tunnel detail information
    req := &tunnel.DescribeTunnelRequest{
       TableName:  "testTable",
       TunnelName: "testTunnel",
    }
    resp, err := tunnelClient.DescribeTunnel(req)
    if err != nil {
       log.Fatal("describe test tunnel failed", err)
    }
    log.Println("tunnel id is", resp.Tunnel.TunnelId)
  • consume tunnel data with callback function
//user-defined callback function
func exampleConsumeFunction(ctx *tunnel.ChannelContext, records []*tunnel.Record) error {
	fmt.Println("user-defined information", ctx.CustomValue)
	for _, rec := range records {
		fmt.Println("tunnel record detail:", rec.String())
	}
	fmt.Println("a round of records consumption finished")
	return nil
}

//set callback to SimpleProcessFactory
workConfig := &tunnel.TunnelWorkerConfig{
   ProcessorFactory: &tunnel.SimpleProcessFactory{
      CustomValue: "user custom interface{} value",
      ProcessFunc: exampleConsumeFunction,
   },
}

//use TunnelDaemon to consume tunnel with specified tunnelId
daemon := tunnel.NewTunnelDaemon(tunnelClient, tunnelId, workConfig)
log.Fatal(daemon.Run())
  • delete tunnel
req := &tunnel.DeleteTunnelRequest {
   TableName: "testTable",
   TunnelName: "testTunnel",
}
_, err := tunnelClient.DeleteTunnel(req)
if err != nil {
   log.Fatal("delete test tunnel failed", err)
}

See the sample directory for more details.

tunnel document

configuration

  • Default TunnelConfig definition
var DefaultTunnelConfig = &TunnelConfig{
      //Max backoff retry duration.
      MaxRetryElapsedTime: 75 * time.Second,
      //HTTP request timeout.
      RequestTimeout:      60 * time.Second,
      //http.DefaultTransport.
      Transport:           http.DefaultTransport,
}
  • TunnelWorkerConfig definition
type TunnelWorkerConfig struct {
   //The heartbeat timeout time of the worker. If zero (unset), the default value is used.
   HeartbeatTimeout  time.Duration
   //The heartbeat interval time of the worker. If zero (unset), the default value is used.
   HeartbeatInterval time.Duration
   //The channel connection dial interface. If nil, the default dialer is used.
   //Usually the default dialer is fine.
   ChannelDialer     ChannelDialer

   //The channel processor creation interface.
   //It's recommended to use the pre-defined SimpleProcessFactory.
   ProcessorFactory ChannelProcessorFactory

   //zap log config. If nil, the DefaultLogConfig is used.
   LogConfig      *zap.Config
   //zap log rotate config. If nil, the DefaultSyncer is used.
   LogWriteSyncer zapcore.WriteSyncer
}

Documentation

Index

Constants

View Source
const (
	ErrCodeParamInvalid      = "OTSParameterInvalid"
	ErrCodeResourceGone      = "OTSResourceGone"
	ErrCodeServerUnavailable = "OTSTunnelServerUnavailable"
	ErrCodeSequenceNotMatch  = "OTSSequenceNumberNotMatch"
	ErrCodeClientError       = "OTSClientError"
	ErrCodeTunnelExpired     = "OTSTunnelExpired"
	ErrCodePermissionDenied  = "OTSPermissionDenied"
	ErrCodeTunnelExist       = "OTSTunnelExist"
)
View Source
const FinishTag = "finished"

Variables

View Source
var (
	DefaultHeartbeatInterval = 30 * time.Second
	DefaultHeartbeatTimeout  = 300 * time.Second
)
View Source
var (
	DefaultChannelSize        = 10
	DefaultCheckpointInterval = 10 * time.Second
)
View Source
var DaemonRandomRetryMs = 10000
View Source
var DefaultLogConfig = zap.Config{
	Level:       zap.NewAtomicLevelAt(zap.InfoLevel),
	Development: false,
	Sampling: &zap.SamplingConfig{
		Initial:    100,
		Thereafter: 100,
	},
	Encoding: "json",
	EncoderConfig: zapcore.EncoderConfig{
		TimeKey:        "ts",
		LevelKey:       "level",
		NameKey:        "logger",
		CallerKey:      "caller",
		MessageKey:     "msg",
		StacktraceKey:  "stacktrace",
		LineEnding:     zapcore.DefaultLineEnding,
		EncodeLevel:    zapcore.LowercaseLevelEncoder,
		EncodeTime:     zapcore.ISO8601TimeEncoder,
		EncodeDuration: zapcore.SecondsDurationEncoder,
		EncodeCaller:   zapcore.ShortCallerEncoder,
	},
}
View Source
var DefaultSyncer = zapcore.AddSync(&lumberjack.Logger{
	Filename:   "tunnelClient.log",
	MaxSize:    512,
	MaxBackups: 5,
	MaxAge:     30,
	Compress:   true,
})
View Source
var DefaultTunnelConfig = &TunnelConfig{
	MaxRetryElapsedTime: 75 * time.Second,
	RequestTimeout:      60 * time.Second,
	Transport:           http.DefaultTransport,
}

Functions

func ExponentialBackoff

func ExponentialBackoff(interval, maxInterval, maxElapsed time.Duration, multiplier, factor float64) *backoff.ExponentialBackOff

func ParseRequestToken

func ParseRequestToken(token string) (*protocol.TokenContentV2, error)

func ReplaceLogCore

func ReplaceLogCore(ws zapcore.WriteSyncer, conf zap.Config) zap.Option

ReplaceLogCore is a hack that replaces the core built from the zap config with one that writes to the given WriteSyncer (such as a lumberjack logger).

Types

type ActionType

type ActionType int
const (
	AT_Put ActionType = iota
	AT_Update
	AT_Delete
)

func ParseActionType

func ParseActionType(pbType *protocol.ActionType) (ActionType, error)

func (ActionType) String

func (t ActionType) String() string

type BatchGetStatusReq

type BatchGetStatusReq struct {
	// contains filtered or unexported fields
}

func NewBatchGetStatusReq

func NewBatchGetStatusReq() *BatchGetStatusReq

type ChannelConn

type ChannelConn interface {
	NotifyStatus(channel *ChannelStatus)
	Closed() bool
	Close()
}

type ChannelContext

type ChannelContext struct {
	TunnelId  string
	ClientId  string
	ChannelId string

	TraceId string

	NextToken string

	CustomValue interface{}
}

func (*ChannelContext) String

func (c *ChannelContext) String() string

type ChannelDialer

type ChannelDialer interface {
	ChannelDial(tunnelId, clientId, channelId, token string, p ChannelProcessor, state *TunnelStateMachine) ChannelConn
}

type ChannelInfo

type ChannelInfo struct {
	ChannelId     string
	ChannelType   string
	ChannelStatus string
	ClientId      string
	ChannelRPO    int64
}

type ChannelProcessor

type ChannelProcessor interface {
	Process(records []*Record, nextToken, traceId string) error
	Shutdown()
}

type ChannelProcessorFactory

type ChannelProcessorFactory interface {
	NewProcessor(tunnelId, clientId, channelId string, checkpointer Checkpointer) ChannelProcessor
}

type ChannelStatus

type ChannelStatus struct {
	ChannelId string
	Version   int64
	Status    protocol.ChannelStatus
}

func ToChannelStatus

func ToChannelStatus(c *protocol.Channel) *ChannelStatus

func (*ChannelStatus) ToPbChannel

func (c *ChannelStatus) ToPbChannel() *protocol.Channel

type Checkpointer

type Checkpointer interface {
	Checkpoint(token string) error
}

type CreateTunnelRequest

type CreateTunnelRequest struct {
	TableName  string
	TunnelName string
	Type       TunnelType
}

type CreateTunnelResponse

type CreateTunnelResponse struct {
	TunnelId string
	ResponseInfo
}

type DefaultTunnelClient

type DefaultTunnelClient struct {
	// contains filtered or unexported fields
}

func (*DefaultTunnelClient) CreateTunnel

func (*DefaultTunnelClient) DeleteTunnel

func (*DefaultTunnelClient) DescribeTunnel

func (*DefaultTunnelClient) ListTunnel

func (*DefaultTunnelClient) NewTunnelWorker

func (c *DefaultTunnelClient) NewTunnelWorker(tunnelId string, workerConfig *TunnelWorkerConfig) (TunnelWorker, error)

type DeleteTunnelRequest

type DeleteTunnelRequest struct {
	TableName  string
	TunnelName string
}

type DeleteTunnelResponse

type DeleteTunnelResponse struct {
	ResponseInfo
}

type DescribeTunnelRequest

type DescribeTunnelRequest struct {
	TableName  string
	TunnelName string
}

type DescribeTunnelResponse

type DescribeTunnelResponse struct {
	TunnelRPO int64
	Tunnel    *TunnelInfo
	Channels  []*ChannelInfo
	ResponseInfo
}

type ListTunnelRequest

type ListTunnelRequest struct {
	TableName string
}

type ListTunnelResponse

type ListTunnelResponse struct {
	Tunnels []*TunnelInfo
	ResponseInfo
}

type MockTunnelMetaApi

type MockTunnelMetaApi struct {
	// contains filtered or unexported fields
}

MockTunnelMetaApi is a mock of TunnelMetaApi interface

func NewMockTunnelMetaApi

func NewMockTunnelMetaApi(ctrl *gomock.Controller) *MockTunnelMetaApi

NewMockTunnelMetaApi creates a new mock instance

func (*MockTunnelMetaApi) CreateTunnel

CreateTunnel mocks base method

func (*MockTunnelMetaApi) DeleteTunnel

DeleteTunnel mocks base method

func (*MockTunnelMetaApi) DescribeTunnel

DescribeTunnel mocks base method

func (*MockTunnelMetaApi) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MockTunnelMetaApi) ListTunnel

ListTunnel mocks base method

func (*MockTunnelMetaApi) NewTunnelWorker

func (m *MockTunnelMetaApi) NewTunnelWorker(tunnelId string, workerConfig *TunnelWorkerConfig) (TunnelWorker, error)

NewTunnelWorker mocks base method

type MockTunnelMetaApiMockRecorder

type MockTunnelMetaApiMockRecorder struct {
	// contains filtered or unexported fields
}

MockTunnelMetaApiMockRecorder is the mock recorder for MockTunnelMetaApi

func (*MockTunnelMetaApiMockRecorder) CreateTunnel

func (mr *MockTunnelMetaApiMockRecorder) CreateTunnel(req interface{}) *gomock.Call

CreateTunnel indicates an expected call of CreateTunnel

func (*MockTunnelMetaApiMockRecorder) DeleteTunnel

func (mr *MockTunnelMetaApiMockRecorder) DeleteTunnel(req interface{}) *gomock.Call

DeleteTunnel indicates an expected call of DeleteTunnel

func (*MockTunnelMetaApiMockRecorder) DescribeTunnel

func (mr *MockTunnelMetaApiMockRecorder) DescribeTunnel(req interface{}) *gomock.Call

DescribeTunnel indicates an expected call of DescribeTunnel

func (*MockTunnelMetaApiMockRecorder) ListTunnel

func (mr *MockTunnelMetaApiMockRecorder) ListTunnel(req interface{}) *gomock.Call

ListTunnel indicates an expected call of ListTunnel

func (*MockTunnelMetaApiMockRecorder) NewTunnelWorker

func (mr *MockTunnelMetaApiMockRecorder) NewTunnelWorker(tunnelId, workerConfig interface{}) *gomock.Call

NewTunnelWorker indicates an expected call of NewTunnelWorker

type MockTunnelWorker

type MockTunnelWorker struct {
	// contains filtered or unexported fields
}

MockTunnelWorker is a mock of TunnelWorker interface

func NewMockTunnelWorker

func NewMockTunnelWorker(ctrl *gomock.Controller) *MockTunnelWorker

NewMockTunnelWorker creates a new mock instance

func (*MockTunnelWorker) ConnectAndWorking

func (m *MockTunnelWorker) ConnectAndWorking() error

ConnectAndWorking mocks base method

func (*MockTunnelWorker) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MockTunnelWorker) Shutdown

func (m *MockTunnelWorker) Shutdown()

Shutdown mocks base method

type MockTunnelWorkerMockRecorder

type MockTunnelWorkerMockRecorder struct {
	// contains filtered or unexported fields
}

MockTunnelWorkerMockRecorder is the mock recorder for MockTunnelWorker

func (*MockTunnelWorkerMockRecorder) ConnectAndWorking

func (mr *MockTunnelWorkerMockRecorder) ConnectAndWorking() *gomock.Call

ConnectAndWorking indicates an expected call of ConnectAndWorking

func (*MockTunnelWorkerMockRecorder) Shutdown

func (mr *MockTunnelWorkerMockRecorder) Shutdown() *gomock.Call

Shutdown indicates an expected call of Shutdown

type MocktunnelDataApi

type MocktunnelDataApi struct {
	// contains filtered or unexported fields
}

MocktunnelDataApi is a mock of tunnelDataApi interface

func NewMocktunnelDataApi

func NewMocktunnelDataApi(ctrl *gomock.Controller) *MocktunnelDataApi

NewMocktunnelDataApi creates a new mock instance

func (*MocktunnelDataApi) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

type MocktunnelDataApiMockRecorder

type MocktunnelDataApiMockRecorder struct {
	// contains filtered or unexported fields
}

MocktunnelDataApiMockRecorder is the mock recorder for MocktunnelDataApi

type PrimaryKey

type PrimaryKey struct {
	PrimaryKeys []*PrimaryKeyColumn
}

type PrimaryKeyColumn

type PrimaryKeyColumn struct {
	ColumnName string
	Value      interface{}
}

type Record

type Record struct {
	Type       ActionType
	Timestamp  int64
	PrimaryKey *PrimaryKey // required
	Columns    []*RecordColumn
}

func DeserializeRecordFromRawBytes

func DeserializeRecordFromRawBytes(data []byte, actionType ActionType) (*Record, error)

func (*Record) String

func (r *Record) String() string

type RecordColumn

type RecordColumn struct {
	Type      RecordColumnType
	Name      *string     // required
	Value     interface{} // optional. present when Type is RCT_Put
	Timestamp *int64      // optional, in msec. present when Type is RCT_Put or RCT_DeleteOneVersion
}

func (*RecordColumn) String

func (c *RecordColumn) String() string

type RecordColumnType

type RecordColumnType int
const (
	RCT_Put RecordColumnType = iota
	RCT_DeleteOneVersion
	RCT_DeleteAllVersions
)

type ResponseInfo

type ResponseInfo struct {
	RequestId string
}

type SimpleProcessFactory

type SimpleProcessFactory struct {
	CustomValue interface{}

	CpInterval time.Duration

	ProcessFunc  func(channelCtx *ChannelContext, records []*Record) error
	ShutdownFunc func(channelCtx *ChannelContext)

	Logger *zap.Logger
}

func (*SimpleProcessFactory) NewProcessor

func (s *SimpleProcessFactory) NewProcessor(tunnelId, clientId, channelId string, checkpointer Checkpointer) ChannelProcessor

type TunnelApi

type TunnelApi struct {
	// contains filtered or unexported fields
}

func NewTunnelApi

func NewTunnelApi(endpoint, instanceName, accessKeyId, accessKeySecret string, conf *TunnelConfig) *TunnelApi

func (*TunnelApi) CreateTunnel

func (api *TunnelApi) CreateTunnel(req *CreateTunnelRequest) (*CreateTunnelResponse, error)

func (*TunnelApi) DeleteTunnel

func (api *TunnelApi) DeleteTunnel(req *DeleteTunnelRequest) (*DeleteTunnelResponse, error)

func (*TunnelApi) DescribeTunnel

func (api *TunnelApi) DescribeTunnel(req *DescribeTunnelRequest) (*DescribeTunnelResponse, error)

func (*TunnelApi) ListTunnel

func (api *TunnelApi) ListTunnel(req *ListTunnelRequest) (*ListTunnelResponse, error)

type TunnelClient

type TunnelClient interface {
	TunnelMetaApi
	NewTunnelWorker(tunnelId string, workerConfig *TunnelWorkerConfig) (TunnelWorker, error)
}

func NewTunnelClient

func NewTunnelClient(endpoint, instanceName, accessId, accessKey string) TunnelClient

func NewTunnelClientWithConfig

func NewTunnelClientWithConfig(endpoint, instanceName, accessId, accessKey string, conf *TunnelConfig) TunnelClient

type TunnelConfig

type TunnelConfig struct {
	MaxRetryElapsedTime time.Duration
	RequestTimeout      time.Duration
	Transport           http.RoundTripper
}

type TunnelError

type TunnelError struct {
	Code      string
	Message   string
	RequestId string
	TunnelId  string
}

func (*TunnelError) Error

func (te *TunnelError) Error() string

func (*TunnelError) Temporary

func (te *TunnelError) Temporary() bool

type TunnelInfo

type TunnelInfo struct {
	TunnelId     string
	TunnelName   string
	TunnelType   string
	TableName    string
	InstanceName string
	StreamId     string
	Stage        string
}

type TunnelMetaApi

type TunnelMetaApi interface {
	CreateTunnel(req *CreateTunnelRequest) (resp *CreateTunnelResponse, err error)
	ListTunnel(req *ListTunnelRequest) (resp *ListTunnelResponse, err error)
	DescribeTunnel(req *DescribeTunnelRequest) (resp *DescribeTunnelResponse, err error)
	DeleteTunnel(req *DeleteTunnelRequest) (resp *DeleteTunnelResponse, err error)
}

type TunnelStateMachine

type TunnelStateMachine struct {
	// contains filtered or unexported fields
}

func NewTunnelStateMachine

func NewTunnelStateMachine(tunnelId, clientId string, dialer ChannelDialer, factory ChannelProcessorFactory, api *TunnelApi, lg *zap.Logger) *TunnelStateMachine

func (*TunnelStateMachine) BatchGetStatus

func (s *TunnelStateMachine) BatchGetStatus(req *BatchGetStatusReq) ([]*protocol.Channel, error)

func (*TunnelStateMachine) BatchUpdateStatus

func (s *TunnelStateMachine) BatchUpdateStatus(batchChannels []*protocol.Channel)

func (*TunnelStateMachine) Close

func (s *TunnelStateMachine) Close()

func (*TunnelStateMachine) UpdateStatus

func (s *TunnelStateMachine) UpdateStatus(channel *ChannelStatus)

type TunnelType

type TunnelType string
const (
	TunnelTypeBaseData   TunnelType = "BaseData"
	TunnelTypeStream     TunnelType = "Stream"
	TunnelTypeBaseStream TunnelType = "BaseAndStream"
)

type TunnelWorker

type TunnelWorker interface {
	ConnectAndWorking() error
	Shutdown()
}

type TunnelWorkerConfig

type TunnelWorkerConfig struct {
	HeartbeatTimeout  time.Duration
	HeartbeatInterval time.Duration
	ChannelDialer     ChannelDialer

	ProcessorFactory ChannelProcessorFactory

	LogConfig      *zap.Config
	LogWriteSyncer zapcore.WriteSyncer
}

type TunnelWorkerDaemon

type TunnelWorkerDaemon struct {
	// contains filtered or unexported fields
}

func NewTunnelDaemon

func NewTunnelDaemon(c TunnelClient, id string, conf *TunnelWorkerConfig) *TunnelWorkerDaemon

func (*TunnelWorkerDaemon) Close

func (d *TunnelWorkerDaemon) Close()

func (*TunnelWorkerDaemon) Run

func (d *TunnelWorkerDaemon) Run() error

Directories

Path Synopsis
Package protocol is a generated protocol buffer package.
Package protocol is a generated protocol buffer package.
sample

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL