tunnel

package
v0.0.1

Warning: this package is not in the latest version of its module.
Published: Mar 31, 2023 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Overview

Example (Tunnel_download_arrow_simple)
session, err := tunnelIns.CreateDownloadSession(
	"test_new_console_gcc",
	// "upload_sample_arrow",
	"has_struct",
)
if err != nil {
	log.Fatalf("%+v", err)
}

recordCount := session.RecordCount()
println(fmt.Sprintf("record count is %d", recordCount))

reader, err := session.OpenRecordArrowReader(0, 2, nil)
if err != nil {
	log.Fatalf("%+v", err)
}

n := 0
reader.Iterator(func(rec array.Record, err error) {
	if err != nil {
		log.Fatalf("%+v", err)
	}

	for i, col := range rec.Columns() {
		println(fmt.Sprintf("rec[%d][%d]: %v", n, i, col))
	}

	rec.Release()
	n++
})

err = reader.Close()
if err != nil {
	log.Fatalf("%+v", err)
}
Output:

Example (Tunnel_download_arrow_with_partition)
session, err := tunnelIns.CreateDownloadSession(
	"test_new_console_gcc",
	"sale_detail",
	tunnel2.SessionCfg.WithPartitionKey("sale_date='202111',region='hangzhou'"),
)
if err != nil {
	log.Fatalf("%+v", err)
}

recordCount := session.RecordCount()
println(fmt.Sprintf("record count is %d", recordCount))

reader, err := session.OpenRecordArrowReader(
	0, 1000,
	[]string{"shop_name", "total_price"})

if err != nil {
	log.Fatalf("%+v", err)
}

n := 0
reader.Iterator(func(rec array.Record, err error) {
	if err != nil {
		log.Fatalf("%+v", err)
	}

	for i, col := range rec.Columns() {
		println(fmt.Sprintf("rec[%d][%d]: %v", n, i, col))
	}

	rec.Release()
	n++
})

err = reader.Close()
if err != nil {
	log.Fatalf("%+v", err)
}
Output:

Example (Tunnel_download_instance_result)
package main

import (
	"log"

	"github.com/jiuzhiqian/aliyun-odps-go-sdk/odps"
	account2 "github.com/jiuzhiqian/aliyun-odps-go-sdk/odps/account"
	"github.com/jiuzhiqian/aliyun-odps-go-sdk/odps/data"
	"github.com/jiuzhiqian/aliyun-odps-go-sdk/odps/restclient"
	"github.com/jiuzhiqian/aliyun-odps-go-sdk/odps/tunnel"
)

func main() {
	var account = account2.AliyunAccountFromEnv()
	var endpoint = restclient.LoadEndpointFromEnv()
	var odpsIns = odps.NewOdps(account, endpoint)

	projectName := "project_1"
	odpsIns.SetDefaultProjectName(projectName)
	project := odpsIns.DefaultProject()
	tunnelIns, err := tunnel.NewTunnelFromProject(project)

	if err != nil {
		log.Fatalf("%+v", err)
	}

	ins, err := odpsIns.ExecSQl("select * from data_type_demo;")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	err = ins.WaitForSuccess()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	session, err := tunnelIns.CreateInstanceResultDownloadSession(projectName, ins.Id())
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// columnNames := []string {
	//	"ti", "si", "i", "bi", "b", "f", "d", "dc", "vc", "c", "s", "da", "dat", "t", "bl",
	// }

	// set columnNames to nil to get all the columns
	reader, err := session.OpenRecordReader(0, 100, 0, nil)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Read records one by one with Read()
	// record, err := reader.Read()
	// if err != nil && err != io.EOF {
	//	println(err.Error())
	// } else {
	//	for i, n := 0, record.Len(); i < n; i ++ {
	//		f := record.Get(i)
	//		println(f.String())
	//	}
	// }

	// Or iterate over all records with Iterator
	reader.Iterator(func(record data.Record, err error) {
		if err != nil {
			log.Fatalf("%+v", err)
		}

		for i, n := 0, record.Len(); i < n; i++ {
			f := record.Get(i)
			println(f.String())
		}
	})

}
Output:

Example (Tunnel_upload_arrow)
tunnelIns.SetHttpTimeout(10 * time.Second)

session, err := tunnelIns.CreateUploadSession(
	ProjectName,
	"sale_detail",
	tunnel2.SessionCfg.WithPartitionKey("sale_date='202111',region='hangzhou'"),
	// tunnel.SessionCfg.WithSnappyFramedCompressor(),
	// tunnel.SessionCfg.WithDeflateCompressor(tunnel.DeflateLevel.DefaultCompression),
	tunnel2.SessionCfg.WithDefaultDeflateCompressor(),
)
if err != nil {
	log.Fatalf("%+v", err)
}
schema := session.ArrowSchema()

type SaleDetailData struct {
	ShopNames  []string
	CustomIDs  []string
	totalPrice []float64
}

rawData := []SaleDetailData{
	{
		[]string{"sun", "moon", "earth"},
		[]string{"fixed_start1", "satellite1", "planet3"},
		[]float64{10000.032, 200.00, 1500.232},
	},
	{
		[]string{"mars", "venus"},
		[]string{"planet4", "planet2"},
		[]float64{1000.1, 1232.2},
	},
	{
		[]string{"songjiang", "wusong"},
		[]string{"liangshan1", "liangshan2"},
		[]float64{100.13, 232.2},
	},
}

blockIds := make([]int, len(rawData))

writeBlock := func(blockId int, data SaleDetailData) error {
	recordWriter, err := session.OpenRecordArrowWriter(blockId)
	if err != nil {
		return errors.WithStack(err)
	}

	pool := memory.NewGoAllocator()
	recordBuilder := array.NewRecordBuilder(pool, schema)
	defer recordBuilder.Release()

	for i, field := range schema.Fields() {
		fieldBuilder := recordBuilder.Field(i)

		switch field.Name {
		case "shop_name":
			builder := fieldBuilder.(*array.StringBuilder)
			builder.AppendValues(data.ShopNames, nil)
		case "customer_id":
			builder := fieldBuilder.(*array.StringBuilder)
			builder.AppendValues(data.CustomIDs, nil)
		case "total_price":
			builder := fieldBuilder.(*array.Float64Builder)
			builder.AppendValues(data.totalPrice, nil)
		}
	}

	record := recordBuilder.NewRecord()
	defer record.Release()

	err = recordWriter.WriteArrowRecord(record)
	if err != nil {
		return errors.WithStack(err)
	}

	return errors.WithStack(recordWriter.Close())
}

wait := make(chan error, len(rawData))

for i, n := 0, len(rawData); i < n; i++ {
	i := i
	blockIds[i] = i

	go func() {
		err := writeBlock(i, rawData[i])

		wait <- err
	}()
}

for i, n := 0, len(rawData); i < n; i++ {
	e := <-wait
	if e != nil {
		log.Fatalf("%+v", e)
	}
}

err = session.Commit(blockIds)
if err != nil {
	log.Fatalf("%+v", err)
}
Output:

Index

Examples

Constants

const (
	DateTransformVersion        = "v1"
	Version                     = "5"
	DefaultTcpConnectionTimeout = 10 * time.Second
)

const (
	MetaCount    = protowire.Number(33554430) // magic num 2^25-2
	MetaChecksum = protowire.Number(33554431) // magic num 2^25-1
	EndRecord    = protowire.Number(33553408) // magic num 2^25-1024
)

const DefaultChunkSize = 65536

const DeflateName = "deflate"

const SnappyFramedName = "x-snappy-framed"

Variables

var ArrowCrcErr = errors.New("crc value error when get a tunnel arrow stream")

var DeflateLevel = struct {
	NoCompression      int
	BestSpeed          int
	BestCompression    int
	DefaultCompression int
	HuffmanOnly        int
}{
	NoCompression:      flate.NoCompression,
	BestSpeed:          flate.BestSpeed,
	BestCompression:    flate.BestCompression,
	DefaultCompression: flate.DefaultCompression,
	HuffmanOnly:        flate.HuffmanOnly,
}
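
These levels are meant to be passed to SessionCfg.WithDeflateCompressor. A minimal sketch (tunnelIns, projectName and the table name are placeholders):

session, err := tunnelIns.CreateUploadSession(
	projectName,
	"sale_detail",
	tunnel.SessionCfg.WithDeflateCompressor(tunnel.DeflateLevel.BestSpeed),
)
if err != nil {
	log.Fatalf("%+v", err)
}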
var InstanceSessionCfg = struct {
	WithTaskName                 func(string) InstanceOption
	WithQueryId                  func(int) InstanceOption
	WithDefaultDeflateCompressor func() InstanceOption
	WithDeflateCompressor        func(int) InstanceOption
	WithSnappyFramedCompressor   func() InstanceOption
	EnableLimit                  func() InstanceOption
}{
	WithTaskName:                 withTaskName,
	WithQueryId:                  withQueryId,
	WithDefaultDeflateCompressor: _withDefaultDeflateCompressor,
	WithDeflateCompressor:        _withDeflateCompressor,
	WithSnappyFramedCompressor:   _withSnappyFramedCompressor,
	EnableLimit:                  enableLimit,
}
var SessionCfg = struct {
	WithPartitionKey             func(string) Option
	WithSchemaName               func(string) Option
	WithDefaultDeflateCompressor func() Option
	WithDeflateCompressor        func(int) Option
	WithSnappyFramedCompressor   func() Option
	Overwrite                    func() Option
	WithShardId                  func(int) Option
	Async                        func() Option
	WithSlotNum                  func(int) Option
	WithCreatePartition          func() Option
	WithColumns                  func([]string) Option
}{
	WithPartitionKey:             withPartitionKey,
	WithSchemaName:               withSchemaName,
	WithDefaultDeflateCompressor: withDefaultDeflateCompressor,
	WithDeflateCompressor:        withDeflateCompressor,
	WithSnappyFramedCompressor:   withSnappyFramedCompressor,
	Overwrite:                    overWrite,
	WithShardId:                  withShardId,
	Async:                        async,
	WithSlotNum:                  withSlotNum,
	WithCreatePartition:          withCreatePartition,
	WithColumns:                  withColumns,
}

Functions

func Retry

func Retry(f func() error)

func WrapByCompressor

func WrapByCompressor(rc io.ReadCloser, contentEncoding string) io.ReadCloser

Types

type ArrowStreamReader

type ArrowStreamReader struct {
	// contains filtered or unexported fields
}

func NewArrowStreamReader

func NewArrowStreamReader(rc io.ReadCloser) *ArrowStreamReader

func (*ArrowStreamReader) Close

func (ar *ArrowStreamReader) Close() error

func (*ArrowStreamReader) Read

func (ar *ArrowStreamReader) Read(dst []byte) (int, error)

func (*ArrowStreamReader) ReadChunk

func (ar *ArrowStreamReader) ReadChunk() error

type ArrowStreamWriter

type ArrowStreamWriter struct {
	// contains filtered or unexported fields
}

ArrowStreamWriter calculates the crc value per chunk.

func NewArrowStreamWriter

func NewArrowStreamWriter(w io.WriteCloser) *ArrowStreamWriter

func (*ArrowStreamWriter) Close

func (aw *ArrowStreamWriter) Close() error

func (*ArrowStreamWriter) Write

func (aw *ArrowStreamWriter) Write(data []byte) (int, error)

type Compressor

type Compressor interface {
	Name() string
	NewReader(readCloser io.ReadCloser) io.ReadCloser
	NewWriter(writeCloser io.WriteCloser) io.WriteCloser
}
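
Custom codecs can be plugged in by implementing these three methods. A minimal pass-through sketch (whether the tunnel server accepts an "identity" content encoding is an assumption, not confirmed here):

// identity is a no-op Compressor: it reports a name and wraps the
// streams without transforming the bytes.
type identity struct{}

func (identity) Name() string { return "identity" } // assumed encoding name

func (identity) NewReader(rc io.ReadCloser) io.ReadCloser { return rc }

func (identity) NewWriter(wc io.WriteCloser) io.WriteCloser { return wc }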

type Crc32CheckSum

type Crc32CheckSum struct {
	// contains filtered or unexported fields
}

func NewCrc32CheckSum

func NewCrc32CheckSum() Crc32CheckSum

func (*Crc32CheckSum) Reset

func (crc *Crc32CheckSum) Reset()

func (*Crc32CheckSum) Update

func (crc *Crc32CheckSum) Update(data interface{})

Update cannot be called with data of type int, as the size of int differs between 32-bit and 64-bit platforms. In Java the size of int is always 32 bits, so the same int data could generate different crc values in Java and Go.
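
So always feed Update fixed-width values, as in this sketch (which other types Update accepts is not documented here):

crc := tunnel.NewCrc32CheckSum()
crc.Update(int32(42)) // 32 bits on every platform, matches Java's int
crc.Update(int64(7))
println(crc.Value())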

func (*Crc32CheckSum) Value

func (crc *Crc32CheckSum) Value() uint32

type Deflate

type Deflate struct {
	// contains filtered or unexported fields
}

func (Deflate) Name

func (d Deflate) Name() string

func (Deflate) NewReader

func (d Deflate) NewReader(rc io.ReadCloser) io.ReadCloser

func (Deflate) NewWriter

func (d Deflate) NewWriter(wc io.WriteCloser) io.WriteCloser

type DownLoadStatus

type DownLoadStatus int
const (
	DownloadStatusUnknown DownLoadStatus = iota
	DownloadStatusNormal
	DownloadStatusClosed
	DownloadStatusExpired
	DownloadStatusInitiating
)

func DownloadStatusFromStr

func DownloadStatusFromStr(s string) DownLoadStatus

func (DownLoadStatus) String

func (status DownLoadStatus) String() string

type DownloadSession

type DownloadSession struct {
	Id          string
	ProjectName string
	// TODO use schema to get the resource url of a table
	SchemaName string
	TableName  string

	Async      bool
	ShardId    int
	Compressor Compressor
	RestClient restclient.RestClient
	// contains filtered or unexported fields
}

DownloadSession is used to download table data; it can be created by Tunnel. You can use RecordCount to get the total number of records, and create multiple RecordReaders in parallel, splitting the record count among them, to download the data in less time. The RecordArrowReader is the only RecordReader for now.

Underneath each RecordReader is an http connection; when no data flows through it for 300 seconds, the tunnel server will close it.
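
A minimal sketch of such a parallel download, splitting the record count into fixed-size chunks (the chunk size is arbitrary, session comes from CreateDownloadSession, and imports are elided):

total := session.RecordCount()
chunk := 100000 // arbitrary; tune to the table size
var wg sync.WaitGroup
for start := 0; start < total; start += chunk {
	count := chunk
	if start+count > total {
		count = total - start
	}
	wg.Add(1)
	go func(start, count int) {
		defer wg.Done()
		reader, err := session.OpenRecordArrowReader(start, count, nil)
		if err != nil {
			log.Fatalf("%+v", err)
		}
		defer reader.Close()
		reader.Iterator(func(rec array.Record, err error) {
			if err != nil {
				log.Fatalf("%+v", err)
			}
			// consume rec, then free it
			rec.Release()
		})
	}(start, count)
}
wg.Wait()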

func AttachToExistedDownloadSession

func AttachToExistedDownloadSession(
	sessionId, projectName, tableName string,
	restClient restclient.RestClient,
	opts ...Option,
) (*DownloadSession, error)

AttachToExistedDownloadSession gets an existing session by its session id. The opts can be one or more of:

SessionCfg.WithPartitionKey
SessionCfg.WithSchemaName, it doesn't work now
SessionCfg.WithDefaultDeflateCompressor, use the deflate compressor with the default level
SessionCfg.WithDeflateCompressor, use the deflate compressor with a specific level
SessionCfg.WithSnappyFramedCompressor
SessionCfg.Overwrite, overwrite data
SessionCfg.DisableArrow, disable the arrow reader and use the protoc reader instead
SessionCfg.WithShardId, set the shard id of the table
SessionCfg.Async, enable the async mode of the session, which can avoid timeouts when there are many small files
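
For instance (a sketch; sessionId comes from an earlier CreateDownloadSession, restClient from the restclient package):

session, err := tunnel.AttachToExistedDownloadSession(
	sessionId, projectName, "sale_detail",
	restClient,
	tunnel.SessionCfg.WithPartitionKey("sale_date='202111',region='hangzhou'"),
)
if err != nil {
	log.Fatalf("%+v", err)
}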

func CreateDownloadSession

func CreateDownloadSession(
	projectName, tableName string,
	restClient restclient.RestClient,
	opts ...Option,
) (*DownloadSession, error)

CreateDownloadSession creates a new download session before downloading data. The opts can be one or more of:

SessionCfg.WithPartitionKey
SessionCfg.WithSchemaName, it doesn't work now
SessionCfg.WithDefaultDeflateCompressor, use the deflate compressor with the default level
SessionCfg.WithDeflateCompressor, use the deflate compressor with a specific level
SessionCfg.WithSnappyFramedCompressor
SessionCfg.Overwrite, overwrite data
SessionCfg.DisableArrow, disable the arrow reader and use the protoc reader instead
SessionCfg.WithShardId, set the shard id of the table
SessionCfg.Async, enable the async mode of the session, which can avoid timeouts when there are many small files

func (*DownloadSession) ArrowSchema

func (ds *DownloadSession) ArrowSchema() *arrow.Schema

func (*DownloadSession) OpenRecordArrowReader

func (ds *DownloadSession) OpenRecordArrowReader(start, count int, columnNames []string) (*RecordArrowReader, error)

func (*DownloadSession) OpenRecordReader

func (ds *DownloadSession) OpenRecordReader(start, count int, columnNames []string) (*RecordProtocReader, error)

func (*DownloadSession) PartitionKey

func (ds *DownloadSession) PartitionKey() string

func (*DownloadSession) RecordCount

func (ds *DownloadSession) RecordCount() int

func (*DownloadSession) ResourceUrl

func (ds *DownloadSession) ResourceUrl() string

func (*DownloadSession) Schema

func (*DownloadSession) SetPartitionKey

func (ds *DownloadSession) SetPartitionKey(partitionKey string)

func (*DownloadSession) ShouldTransformDate

func (ds *DownloadSession) ShouldTransformDate() bool

func (*DownloadSession) Status

func (ds *DownloadSession) Status() DownLoadStatus

type InstanceOption

type InstanceOption func(cfg *instanceSessionConfig)

An InstanceOption must be created by one of the InstanceSessionCfg.XXX functions.

type InstanceResultDownloadSession

type InstanceResultDownloadSession struct {
	Id            string
	InstanceId    string
	ProjectName   string
	TaskName      string
	QueryId       int
	LimitEnabled  bool
	IsLongPolling bool
	Compressor    Compressor
	RestClient    restclient.RestClient
	// contains filtered or unexported fields
}

func AttachToExistedIRDownloadSession

func AttachToExistedIRDownloadSession(
	downloadId, projectName, instanceId string,
	restClient restclient.RestClient,
	opts ...InstanceOption,
) (*InstanceResultDownloadSession, error)

func CreateInstanceResultDownloadSession

func CreateInstanceResultDownloadSession(
	projectName, instanceId string,
	restClient restclient.RestClient,
	opts ...InstanceOption,
) (*InstanceResultDownloadSession, error)

func (*InstanceResultDownloadSession) OpenRecordReader

func (is *InstanceResultDownloadSession) OpenRecordReader(
	start, count, sizeLimit int,
	columnNames []string,
) (*RecordProtocReader, error)

OpenRecordReader opens a reader for the result of a select statement. The parameter start is the position at which to start reading the result, count is the maximum number of records to read, and sizeLimit is the maximum number of bytes of the result.
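
For example (a sketch; the column names are hypothetical):

// Read up to 50 records starting at record 10, capped at 1 MiB of
// result data, restricted to two columns.
reader, err := session.OpenRecordReader(10, 50, 1<<20, []string{"shop_name", "total_price"})
if err != nil {
	log.Fatalf("%+v", err)
}
defer reader.Close()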

func (*InstanceResultDownloadSession) RecordCount

func (is *InstanceResultDownloadSession) RecordCount() int

func (*InstanceResultDownloadSession) ResourceUrl

func (is *InstanceResultDownloadSession) ResourceUrl() string

func (*InstanceResultDownloadSession) Schema

func (*InstanceResultDownloadSession) ShouldTransformDate

func (is *InstanceResultDownloadSession) ShouldTransformDate() bool

func (*InstanceResultDownloadSession) Status

type Option

type Option func(cfg *sessionConfig)

Option cannot be constructed directly; it must be created by one of the SessionCfg.XXX functions.

type ProtocStreamReader

type ProtocStreamReader struct {
	// contains filtered or unexported fields
}

func NewProtocStreamReader

func NewProtocStreamReader(r io.Reader) *ProtocStreamReader

func (*ProtocStreamReader) ReadBool

func (r *ProtocStreamReader) ReadBool() (bool, error)

func (*ProtocStreamReader) ReadBytes

func (r *ProtocStreamReader) ReadBytes() ([]byte, error)

func (*ProtocStreamReader) ReadFixed32

func (r *ProtocStreamReader) ReadFixed32() (uint32, error)

func (*ProtocStreamReader) ReadFixed64

func (r *ProtocStreamReader) ReadFixed64() (uint64, error)

func (*ProtocStreamReader) ReadFloat32

func (r *ProtocStreamReader) ReadFloat32() (float32, error)

func (*ProtocStreamReader) ReadFloat64

func (r *ProtocStreamReader) ReadFloat64() (float64, error)

func (*ProtocStreamReader) ReadInt32

func (r *ProtocStreamReader) ReadInt32() (int32, error)

func (*ProtocStreamReader) ReadInt64

func (r *ProtocStreamReader) ReadInt64() (int64, error)

func (*ProtocStreamReader) ReadSFixed32

func (r *ProtocStreamReader) ReadSFixed32() (int32, error)

func (*ProtocStreamReader) ReadSFixed64

func (r *ProtocStreamReader) ReadSFixed64() (int64, error)

func (*ProtocStreamReader) ReadSInt32

func (r *ProtocStreamReader) ReadSInt32() (int32, error)

func (*ProtocStreamReader) ReadSInt64

func (r *ProtocStreamReader) ReadSInt64() (int64, error)

func (*ProtocStreamReader) ReadString

func (r *ProtocStreamReader) ReadString() (string, error)

func (*ProtocStreamReader) ReadTag

func (*ProtocStreamReader) ReadUInt32

func (r *ProtocStreamReader) ReadUInt32() (uint32, error)

func (*ProtocStreamReader) ReadUInt64

func (r *ProtocStreamReader) ReadUInt64() (uint64, error)

func (*ProtocStreamReader) ReadVarint

func (r *ProtocStreamReader) ReadVarint() (uint64, error)

type ProtocStreamWriter

type ProtocStreamWriter struct {
	// contains filtered or unexported fields
}

func NewProtocStreamWriter

func NewProtocStreamWriter(w io.Writer) *ProtocStreamWriter

func (*ProtocStreamWriter) WriteBool

func (r *ProtocStreamWriter) WriteBool(val bool) error

func (*ProtocStreamWriter) WriteBytes

func (r *ProtocStreamWriter) WriteBytes(b []byte) error

func (*ProtocStreamWriter) WriteFixed32

func (r *ProtocStreamWriter) WriteFixed32(val uint32) error

func (*ProtocStreamWriter) WriteFixed64

func (r *ProtocStreamWriter) WriteFixed64(val uint64) error

func (*ProtocStreamWriter) WriteFloat32

func (r *ProtocStreamWriter) WriteFloat32(val float32) error

func (*ProtocStreamWriter) WriteFloat64

func (r *ProtocStreamWriter) WriteFloat64(val float64) error

func (*ProtocStreamWriter) WriteInt32

func (r *ProtocStreamWriter) WriteInt32(val int32) error

func (*ProtocStreamWriter) WriteInt64

func (r *ProtocStreamWriter) WriteInt64(val int64) error

func (*ProtocStreamWriter) WriteSInt32

func (r *ProtocStreamWriter) WriteSInt32(val int32) error

func (*ProtocStreamWriter) WriteSInt64

func (r *ProtocStreamWriter) WriteSInt64(val int64) error

func (*ProtocStreamWriter) WriteTag

func (r *ProtocStreamWriter) WriteTag(num protowire.Number, typ protowire.Type) error

func (*ProtocStreamWriter) WriteUInt32

func (r *ProtocStreamWriter) WriteUInt32(val uint32) error

func (*ProtocStreamWriter) WriteUInt64

func (r *ProtocStreamWriter) WriteUInt64(val uint64) error

func (*ProtocStreamWriter) WriteVarint

func (r *ProtocStreamWriter) WriteVarint(v uint64) error

type RecordArrowReader

type RecordArrowReader struct {
	// contains filtered or unexported fields
}

func (*RecordArrowReader) Close

func (r *RecordArrowReader) Close() error

func (*RecordArrowReader) HttpRes

func (r *RecordArrowReader) HttpRes() *http.Response

func (*RecordArrowReader) Iterator

func (r *RecordArrowReader) Iterator(f func(array.Record, error))

func (*RecordArrowReader) Read

func (r *RecordArrowReader) Read() (array.Record, error)

func (*RecordArrowReader) RecordBatchReader

func (r *RecordArrowReader) RecordBatchReader() *ipc.RecordBatchReader

type RecordArrowWriter

type RecordArrowWriter struct {
	// contains filtered or unexported fields
}

func (*RecordArrowWriter) Close

func (writer *RecordArrowWriter) Close() error

func (*RecordArrowWriter) WriteArrowRecord

func (writer *RecordArrowWriter) WriteArrowRecord(record array.Record) error

type RecordPackStreamWriter

type RecordPackStreamWriter struct {
	// contains filtered or unexported fields
}

func (*RecordPackStreamWriter) Append

func (rsw *RecordPackStreamWriter) Append(record data.Record) error

func (*RecordPackStreamWriter) DataSize

func (rsw *RecordPackStreamWriter) DataSize() int64

func (*RecordPackStreamWriter) Flush

func (rsw *RecordPackStreamWriter) Flush(timeout_ ...time.Duration) (string, error)

func (*RecordPackStreamWriter) RecordCount

func (rsw *RecordPackStreamWriter) RecordCount() int64

type RecordProtocReader

type RecordProtocReader struct {
	// contains filtered or unexported fields
}

func (*RecordProtocReader) Close

func (r *RecordProtocReader) Close() error

func (*RecordProtocReader) HttpRes

func (r *RecordProtocReader) HttpRes() *http.Response

func (*RecordProtocReader) Iterator

func (r *RecordProtocReader) Iterator(f func(record data.Record, err error))

func (*RecordProtocReader) Read

func (r *RecordProtocReader) Read() (data.Record, error)

type RecordProtocWriter

type RecordProtocWriter struct {
	// contains filtered or unexported fields
}

func (*RecordProtocWriter) Close

func (r *RecordProtocWriter) Close() error

func (*RecordProtocWriter) Write

func (r *RecordProtocWriter) Write(record data.Record) error

type SnappyFramed

type SnappyFramed int

func (SnappyFramed) Name

func (s SnappyFramed) Name() string

func (SnappyFramed) NewReader

func (s SnappyFramed) NewReader(rc io.ReadCloser) io.ReadCloser

func (SnappyFramed) NewWriter

func (s SnappyFramed) NewWriter(wc io.WriteCloser) io.WriteCloser

type StreamUploadSession

type StreamUploadSession struct {
	ProjectName string
	// TODO use schema to get the resource url of a table
	SchemaName string
	TableName  string

	Compressor      Compressor
	RestClient      restclient.RestClient
	Columns         []string
	P2PMode         bool
	CreatePartition bool
	SlotNum         int
	// contains filtered or unexported fields
}

func CreateStreamUploadSession

func CreateStreamUploadSession(
	projectName, tableName string,
	restClient restclient.RestClient,
	opts ...Option,
) (*StreamUploadSession, error)

CreateStreamUploadSession creates a new stream upload session before uploading data. The opts can be one or more of:

SessionCfg.WithPartitionKey
SessionCfg.WithSchemaName, it doesn't work now
SessionCfg.WithDefaultDeflateCompressor, use the deflate compressor with the default level
SessionCfg.WithDeflateCompressor, use the deflate compressor with a specific level
SessionCfg.WithSnappyFramedCompressor
SessionCfg.WithSlotNum, not yet available externally
SessionCfg.WithCreatePartition, create the partition specified by WithPartitionKey if it does not exist
SessionCfg.WithColumns, TODO: purpose still to be clarified
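
A minimal stream upload sketch (building the data.Record values for the table schema is elided; the request id returned by Flush is ignored):

session, err := tunnelIns.CreateStreamUploadSession(projectName, "sale_detail")
if err != nil {
	log.Fatalf("%+v", err)
}

packWriter := session.OpenRecordPackWriter()
// record is a data.Record built to match session.Schema() (construction elided)
if err := packWriter.Append(record); err != nil {
	log.Fatalf("%+v", err)
}
if _, err := packWriter.Flush(); err != nil {
	log.Fatalf("%+v", err)
}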

func (*StreamUploadSession) OpenRecordPackWriter

func (su *StreamUploadSession) OpenRecordPackWriter() *RecordPackStreamWriter

func (*StreamUploadSession) ResourceUrl

func (su *StreamUploadSession) ResourceUrl() string

func (*StreamUploadSession) Schema

type Tunnel

type Tunnel struct {
	// contains filtered or unexported fields
}

Tunnel is used to upload or download data in ODPS; it can also be used to download the result of a sql query. The span from the beginning of an upload or download to its end is called a session. As some tables are very big, more than one http connection may be used for an upload or download; all of the http connections are created by the session. The timeout of a session is 24 hours.

The typical table upload process is:

1. create a Tunnel
2. create an UploadSession
3. create a RecordWriter and use it to write Record data
4. commit the data

The typical table download process is:

1. create a Tunnel
2. create a DownloadSession
3. create a RecordReader and use it to read out Records
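
Mapped onto the API, the download steps look like this (a sketch; names are placeholders and error handling is abbreviated):

tunnelIns, err := tunnel.NewTunnelFromProject(odpsIns.DefaultProject()) // 1. create a Tunnel
if err != nil {
	log.Fatalf("%+v", err)
}
session, err := tunnelIns.CreateDownloadSession(projectName, tableName) // 2. create a DownloadSession
if err != nil {
	log.Fatalf("%+v", err)
}
reader, err := session.OpenRecordArrowReader(0, session.RecordCount(), nil) // 3. create a RecordReader
if err != nil {
	log.Fatalf("%+v", err)
}
defer reader.Close()
// consume the records via reader.Read or reader.Iterator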

func NewTunnel

func NewTunnel(odpsIns *odps.Odps, endpoint string) Tunnel

func NewTunnelFromProject

func NewTunnelFromProject(project odps.Project) (Tunnel, error)

func (*Tunnel) AttachToExistedDownloadSession

func (t *Tunnel) AttachToExistedDownloadSession(
	projectName, tableName, sessionId string,
	opts ...Option) (*DownloadSession, error)

func (*Tunnel) AttachToExistedUploadSession

func (t *Tunnel) AttachToExistedUploadSession(
	projectName, tableName, sessionId string,
	opts ...Option) (*UploadSession, error)

func (*Tunnel) CreateDownloadSession

func (t *Tunnel) CreateDownloadSession(projectName, tableName string, opts ...Option) (*DownloadSession, error)

func (*Tunnel) CreateInstanceResultDownloadSession

func (t *Tunnel) CreateInstanceResultDownloadSession(
	projectName, instanceId string, opts ...InstanceOption,
) (*InstanceResultDownloadSession, error)

func (*Tunnel) CreateStreamUploadSession

func (t *Tunnel) CreateStreamUploadSession(projectName, tableName string, opts ...Option) (*StreamUploadSession, error)

func (*Tunnel) CreateUploadSession

func (t *Tunnel) CreateUploadSession(projectName, tableName string, opts ...Option) (*UploadSession, error)

func (*Tunnel) HttpTimeout

func (t *Tunnel) HttpTimeout() time.Duration

func (*Tunnel) SetHttpTimeout

func (t *Tunnel) SetHttpTimeout(httpTimeout time.Duration)

func (*Tunnel) SetTcpConnectionTimeout

func (t *Tunnel) SetTcpConnectionTimeout(tcpConnectionTimeout time.Duration)

func (*Tunnel) TcpConnectionTimeout

func (t *Tunnel) TcpConnectionTimeout() time.Duration

type UploadSession

type UploadSession struct {
	Id          string
	ProjectName string
	// TODO use schema to get the resource url of a table
	SchemaName string
	TableName  string

	Overwrite  bool
	Compressor Compressor
	RestClient restclient.RestClient
	// contains filtered or unexported fields
}

UploadSession works like "insert into"; multiple sessions for the same table or partition do not affect each other. The session id is the unique identifier of a session.

UploadSession uses OpenRecordArrowWriter to create a RecordArrowWriter, or OpenRecordWriter to create a RecordProtocWriter, for writing data into a table. Each RecordWriter uses an http connection to transfer data with the tunnel server, and each UploadSession can create multiple RecordWriters, so multiple http connections can be used to upload data in parallel.

A block id must be given when creating a RecordWriter; it is the unique identifier of the writer. The block id can be any number in [0, 20000). A single RecordWriter can write at most 100G of data. If multiple RecordWriters are created with the same block id, the data will be overwritten, and only the data from the writer that calls Close last will be kept.

The timeout of the http connection used by a RecordWriter is 120s; the server will close the connection when no data flows through it for 120 seconds.

The Commit method must be called to notify the server that all data has been uploaded and can be written into the table.

In particular, the partition keys used by a session cannot contain "'". For example, "region=hangzhou" is valid, while "region='hangzhou'" is not. But partition keys like "region='hangzhou'" are the more common format, so to keep users from using the wrong format, the partitionKey of UploadSession is private; it can be set when creating a session or with SetPartitionKey.
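
For example (a sketch):

// Set the partition in the documented, quote-free format.
session.SetPartitionKey("sale_date=202111,region=hangzhou")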

func AttachToExistedUploadSession

func AttachToExistedUploadSession(
	sessionId, projectName, tableName string,
	restClient restclient.RestClient,
	opts ...Option) (*UploadSession, error)

AttachToExistedUploadSession gets an existing session by its session id. The opts can be one or more of:

SessionCfg.WithPartitionKey
SessionCfg.WithSchemaName, it doesn't work now
SessionCfg.WithDefaultDeflateCompressor, use the deflate compressor with the default level
SessionCfg.WithDeflateCompressor, use the deflate compressor with a specific level
SessionCfg.WithSnappyFramedCompressor
SessionCfg.Overwrite, overwrite data
SessionCfg.UseArrow, it is the default config

func CreateUploadSession

func CreateUploadSession(
	projectName, tableName string,
	restClient restclient.RestClient,
	opts ...Option,
) (*UploadSession, error)

CreateUploadSession creates a new upload session before uploading data. The opts can be one or more of:

SessionCfg.WithPartitionKey
SessionCfg.WithSchemaName, it doesn't work now
SessionCfg.WithDefaultDeflateCompressor, use the deflate compressor with the default level
SessionCfg.WithDeflateCompressor, use the deflate compressor with a specific level
SessionCfg.WithSnappyFramedCompressor
SessionCfg.Overwrite, overwrite data

func (*UploadSession) ArrowSchema

func (u *UploadSession) ArrowSchema() *arrow.Schema

func (*UploadSession) BlockIds

func (u *UploadSession) BlockIds() []int

func (*UploadSession) Commit

func (u *UploadSession) Commit(blockIds []int) error

func (*UploadSession) Load

func (u *UploadSession) Load() error

func (*UploadSession) OpenRecordArrowWriter

func (u *UploadSession) OpenRecordArrowWriter(blockId int) (*RecordArrowWriter, error)

func (*UploadSession) OpenRecordWriter

func (u *UploadSession) OpenRecordWriter(blockId int) (*RecordProtocWriter, error)

func (*UploadSession) PartitionKey

func (u *UploadSession) PartitionKey() string

func (*UploadSession) ResourceUrl

func (u *UploadSession) ResourceUrl() string

func (*UploadSession) Schema

func (*UploadSession) SetPartitionKey

func (u *UploadSession) SetPartitionKey(partitionKey string)

func (*UploadSession) ShouldTransform

func (u *UploadSession) ShouldTransform() bool

func (*UploadSession) Status

func (u *UploadSession) Status() UploadStatus

type UploadStatus

type UploadStatus int
const (
	UploadStatusUnknown UploadStatus = iota
	UploadStatusNormal
	UploadStatusClosing
	UploadStatusClosed
	UploadStatusCanceled
	UploadStatusExpired
	UploadStatusCritical
	UploadStatusCommitting
)

func UploadStatusFromStr

func UploadStatusFromStr(s string) UploadStatus

func (UploadStatus) String

func (status UploadStatus) String() string
