client

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 11, 2024 License: MIT Imports: 16 Imported by: 0

README

说明

文件中的查询语句样例的测试代码在 TestSemanticSegmentInstance() 中

客户端分为两部分:

  • memcache/memcache.go : 包含修改的 Set() 、Get() , 用于和cache系统交互,存入和读取字节数组

  • v2/client.go : influxdb 1.x 客户端,包含查询等功能,添加了生成语义段(semantic segment)、合并多个查询结果、客户端和cache的数据交互(influx Response结果类型和memcache字节数组之间的类型转换)的功能

各函数的详细用法参考 v2/client_test.go

向Go项目中导入客户端包
import (
    "github.com/InfluxDB-client/memcache"
    "github.com/InfluxDB-client/v2"
)

两部分的包名分别为 clientmemcache, 调用方法如下:

query := client.NewQuery("queryString", "databaseName", "ns")
mc := memcache.New("localhost:11213")
常量和全局变量修改

在 /v2/client.go 的开头 line-36

// 连接数据库
var c, err = NewHTTPClient(HTTPConfig{
    Addr: "http://10.170.48.244:8086",
    //Username: username,
    //Password: password,
})

// 连接cache
var mc = memcache.New("localhost:11213")

// 数据库中所有表的tag和field
var TagKV = GetTagKV(c, MyDB)
var Fields = GetFieldKeys(c, MyDB)

// 结果转换成字节数组时string类型占用字节数
const STRINGBYTELENGTH = 25

// 数据库名称
const (
    MyDB     = "NOAA_water_database"
    username = "root"
    password = "12345678"
)
连接数据库:
c, err := client.NewHTTPClient(client.HTTPConfig{
    Addr: "http://localhost:8086",
    //Username: username,
    //Password: password,
})
if err != nil {
    log.Fatal(err)
}

需要先在服务器启动InfluxDB,在终端执行以下命令

$influxd

如果想在终端进行查询,执行influx进入InfluxDB shell;输入use databaseName选择要用的数据库;输入SELECT查询语句

从InfluxDB查询
query := client.NewQuery(queryString, MyDB, "ns")	//构造查询
resp, err := c.Query(query)	//查询结果
if err != nil {
    t.Errorf(err.Error())
}

NewQuery()的三个参数分别为:查询语句字符串、数据库名称、时间精确度

时间精确度为空时返回结果的时间戳是字符串,指定为"ns" 或 "s" 等精度时,时间戳是int64

连接cache系统
mc := memcache.New("localhost:11213")

常用 11211 端口,不过测试代码里用的是 11213

client的Set()

由memcache的Set()封装而来,传入查询语句和创建的连接,由Set()进行查询并把结果处理成字节数组存入cache client.go line-806

func Set(queryString string, c Client, mc *memcache.Client) error 
memcache 的 Set() 和 Get()
semanticSegment := SemanticSegment(queryString, resp)	//用作set key的语义段
startTime, endTime := GetResponseTimeRange(resp)	//查询结果的时间范围
respCacheByte := resp.ToByteArray(tt.queryString)	//数据库查询结果转化为字节数组
tableNumbers := int64(len(resp.Results[0].Series))	//表的数量

err = mc.Set(&memcache.Item{Key: semanticSegment, Value: respCacheByte, Time_start: startTime, Time_end: endTime, NumOfTables: tableNumbers})

valueBytes, _, err := mc.Get(semanticSegment, startTime, endTime) // 接收cache返回的字节数组

Get(key string, start_time int64, end_time int64) (itemValues []byte, item *Item, err error) : memcache.go line-355 -> getFromAddr line-411 -> parseGetResponse line-547 (从cache读取数据的核心部分)

for { //每次读入一行,直到 Reader 为空
    line, err := r.ReadBytes('\n') //从查寻结果中读取一行, 换行符对应的字节码是 10, 如果数据中有 int64 类型的 10,读取时会把数字错误当作换行符,不过对结果的字节数组没有影响,不用处理
    if err != nil {
       return err
    }
    if bytes.Equal(line, resultEnd) { // get 命令查询结束后会在数据末尾添加 "END", 如果读到 END,说明数据读取完毕
       return nil
    }
    it := new(Item)
    it.Value = line // 读取value的值
    if err != nil {
       it.Value = nil
       return err
    }
    cb(it)
    *itemValues = append(*itemValues, it.Value...)
}

Set(item *Item) error : memcache.go line-638 -> (*Client).set line-642 -> populateOne() line-708

_, err = fmt.Fprintf(rw, "%s %s %d %d %d\r\n", //用item的基本信息构建命令的第一行,存入rw buffer 如:set user 0 0 3
    verb, item.Key, len(item.Value), item.Time_start, item.Time_end) // set key len st et  memcache的set格式暂时是这样
//_, err = fmt.Fprintf(rw, "%s %s %d %d %d\r\n",
//  verb, item.Key, item.Time_start, item.Time_end, item.NumOfTables)  //最终格式是这样的(大概),最后一个参数是结果中表的数量
客户端查询结果的结构(Response)
[	-	-	-	>	Response

​	Results[]	{	-	-	-	-	->	根据 statement_id 区别;InfluxDB不再支持一次执行多条查询,所以只有一个Results[0]

​							statement_id	int	(从 0 开始计数)

​							Series[]	[	-	-	->	series[0]	(查询结果的表,比如用 GROUP BY 时会有多个Series,根据 tags 值的数量)

​													name			string

​													tags			map[string]string	-	-	-	->	不用 GROUP BY 时为空

​													columns			[]string

​													Values{		  	[] [] interface{}

​																		field_value[0]

​																		......

​															}

​													partial			bool

​										]

​										[	-	-	->	series[1]
													......
​										]

​										.......

​							message[]	{	(正常为  “[]” )

​													Level		string

​													Text	  	string		 

​										}

​							Error	( 正常没有 )

​				}

​	Error	(正常没有)	

]

示例:

[    
	{   0   -   -   -   -   -   -   -   -   -   ->  statement_id        
		[   -   -   -   -   -   -   -   -   -   ->  series            
			{   h2o_pH  -   -   -   -   -   -   ->  measurement name                
				map[location:coyote_creek]  -   ->  GROUPE BY tags                
				[time pH]   -   -   -   -   -   ->  columns                
				[   -   -   -   -   -   -   -   ->  values                    
					[2019-08-17T00:00:00Z 7]                     
					[2019-08-17T00:06:00Z 8]                     
					[2019-08-17T00:12:00Z 8]                     
					[2019-08-17T00:18:00Z 7]                     
					[2019-08-17T00:24:00Z 7]                
				]                 
				false   -   -   -   -   -   -   ->  partial             
			}             
			{   h2o_pH                  
				map[location:santa_monica]  -   ->  GROUPE BY tags                
				[time pH]                 
				[                    
					[2019-08-17T00:00:00Z 6]                     
                    [2019-08-17T00:06:00Z 6]                     
                    [2019-08-17T00:12:00Z 6]                     
                    [2019-08-17T00:18:00Z 7]                     
                    [2019-08-17T00:24:00Z 8]                
				]                 
				false            
			}       
		]         
		[]  -   -   -   -   -   -   -   -   -   ->  message    
	}
]
获取语义段(semantic segment)
// v2/client.go line-1166
func SemanticSegment(queryString string, response *Response) string 

根据查询语句和数据库返回数据组成字段,用作存入cache的key {SM}#{SF}#{SP}#{SG}

ss := SemanticSegment(queryString, response)

结果:测试代码 v2/client_test.go line-1678

"{(h2o_quality.location=coyote_creek,h2o_quality.randtag=2)(h2o_quality.location=coyote_creek,h2o_quality.randtag=3)(h2o_quality.location=santa_monica,h2o_quality.randtag=2)(h2o_quality.location=santa_monica,h2o_quality.randtag=3)}#{time[int64],index[int64]}#{(index>=50[int64])}#{max,12m}"
合并查询结果
// v2/client.go line-796
func Merge(precision string, resps ...*Response) []*Response 

precision:允许合并的时间误差精度 "h"/"ns"/"us" 等

resps ... : 要合并的多个查询结果,可以整合成一个数组,也可以直接挨个传入

返回合并后的结果数组

先按照结果数据的起止时间排序,然后遍历合并

用法:详细过程看测试代码 v2/client_test.go line-2923

merged := Merge("h", resps...)
Response转化为字节数组
// v2/client.go line-1714
func (resp *Response) ToByteArray(queryString string) []byte

格式:

"{(h2o_quality.location=coyote_creek,h2o_quality.randtag=1)}#{time[int64],index[int64],location[string],randtag[string]}#{(location='coyote_creek'[string])}#{empty,empty} [0 0 0 0 0 0 0 198]\r\n" +
    "[1566086760000000000 66 coyote_creek 1]\r\n" +
    "[1566087480000000000 91 coyote_creek 1]\r\n" +
    "[1566087840000000000 29 coyote_creek 1]\r\n" +
"{(h2o_quality.location=coyote_creek,h2o_quality.randtag=2)}#{time[int64],index[int64],location[string],randtag[string]}#{(location='coyote_creek'[string])}#{empty,empty} [0 0 0 0 0 0 0 66]\r\n" +
    "[1566087120000000000 78 coyote_creek 2]\r\n" +
"{(h2o_quality.location=coyote_creek,h2o_quality.randtag=3)}#{time[int64],index[int64],location[string],randtag[string]}#{(location='coyote_creek'[string])}#{empty,empty} [0 0 0 0 0 0 0 132]\r\n" +
    "[1566086400000000000 85 coyote_creek 3]\r\n" +
    "[1566088200000000000 75 coyote_creek 3]\r\n"

转换成字节数组的数据之间暂时加上了换行符,如果不用的话把下面一行代码注释掉:

/* 如果传入cache的数据之间不需要换行,就把这一行注释掉 */
result = append(result, []byte("\r\n")...) // 每条数据之后换行

测试代码 v2/client_test.go line-3561

字节数组转换成Response
// v2/client.go line-1765
func ByteArrayToResponse(byteArray []byte) *Response 

如果返回的字节数组的数据之间没有换行,把下面一行注释掉:

/* 如果cache传回的数据之间不需要换行符,把这一行注释掉 */
index += 2 // 跳过每行数据之间的换行符CRLF,处理下一行数据

测试代码 v2/client_test.go line-3636

Documentation

Overview

Package client implements a now-deprecated client for InfluxDB; use github.com/influxdata/influxdb1-client/v2 instead.

Index

Examples

Constants

View Source
const (
	// DefaultHost is the default host used to connect to an InfluxDB instance
	DefaultHost = "localhost"

	// DefaultPort is the default port used to connect to an InfluxDB instance
	DefaultPort = 8086

	// DefaultTimeout is the default connection timeout used to connect to an InfluxDB instance
	DefaultTimeout = 0
)
View Source
const (
	// ConsistencyOne requires at least one data node acknowledged a write.
	ConsistencyOne = "one"

	// ConsistencyAll requires all data nodes to acknowledge a write.
	ConsistencyAll = "all"

	// ConsistencyQuorum requires a quorum of data nodes to acknowledge a write.
	ConsistencyQuorum = "quorum"

	// ConsistencyAny allows for hinted hand off, potentially no write happened yet.
	ConsistencyAny = "any"
)

Variables

This section is empty.

Functions

func EpochToTime

func EpochToTime(epoch int64, precision string) (time.Time, error)

EpochToTime takes a unix epoch time and uses precision to return back a time.Time

func ParseConnectionString

func ParseConnectionString(path string, ssl bool) (url.URL, error)

ParseConnectionString will parse a string to create a valid connection URL

func SetPrecision

func SetPrecision(t time.Time, precision string) time.Time

SetPrecision will round a time to the specified precision

Types

type BatchPoints

type BatchPoints struct {
	Points           []Point           `json:"points,omitempty"`
	Database         string            `json:"database,omitempty"`
	RetentionPolicy  string            `json:"retentionPolicy,omitempty"`
	Tags             map[string]string `json:"tags,omitempty"`
	Time             time.Time         `json:"time,omitempty"`
	Precision        string            `json:"precision,omitempty"`
	WriteConsistency string            `json:"-"`
}

BatchPoints is used to send batched data in a single write. Database and Points are required If no retention policy is specified, it will use the databases default retention policy. If tags are specified, they will be "merged" with all points. If a point already has that tag, it will be ignored. If time is specified, it will be applied to any point with an empty time. Precision can be specified if the time is in epoch format (integer). Valid values for Precision are n, u, ms, s, m, and h

func (*BatchPoints) UnmarshalJSON

func (bp *BatchPoints) UnmarshalJSON(b []byte) error

UnmarshalJSON decodes the data into the BatchPoints struct

type ChunkedResponse

type ChunkedResponse struct {
	// contains filtered or unexported fields
}

ChunkedResponse represents a response from the server that uses chunking to stream the output.

func NewChunkedResponse

func NewChunkedResponse(r io.Reader) *ChunkedResponse

NewChunkedResponse reads a stream and produces responses from the stream.

func (*ChunkedResponse) NextResponse

func (r *ChunkedResponse) NextResponse() (*Response, error)

NextResponse reads the next line of the stream and returns a response.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is used to make calls to the server.

func NewClient

func NewClient(c Config) (*Client, error)

NewClient will instantiate and return a connected client to issue commands to the server.

Example
package main

import (
	"fmt"
	"log"
	"net/url"
	"os"

	client "github.com/influxdata/influxdb1-client"
)

func main() {
	host, err := url.Parse(fmt.Sprintf("http://%s:%d", "localhost", 8086))
	if err != nil {
		log.Fatal(err)
	}

	// NOTE: this assumes you've setup a user and have setup shell env variables,
	// namely INFLUX_USER/INFLUX_PWD. If not just omit Username/Password below.
	conf := client.Config{
		URL:      *host,
		Username: os.Getenv("INFLUX_USER"),
		Password: os.Getenv("INFLUX_PWD"),
	}
	con, err := client.NewClient(conf)
	if err != nil {
		log.Fatal(err)
	}
	log.Println("Connection", con)
}
Output:

func (*Client) Addr

func (c *Client) Addr() string

Addr provides the current url as a string of the server the client is connected to.

func (*Client) Ping

func (c *Client) Ping() (time.Duration, string, error)

Ping will check to see if the server is up Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.

Example
package main

import (
	"fmt"
	"log"
	"net/url"

	client "github.com/influxdata/influxdb1-client"
)

func main() {
	host, err := url.Parse(fmt.Sprintf("http://%s:%d", "localhost", 8086))
	if err != nil {
		log.Fatal(err)
	}
	con, err := client.NewClient(client.Config{URL: *host})
	if err != nil {
		log.Fatal(err)
	}

	dur, ver, err := con.Ping()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("Happy as a hippo! %v, %s", dur, ver)
}
Output:

func (*Client) Query

func (c *Client) Query(q Query) (*Response, error)

Query sends a command to the server and returns the Response

Example
package main

import (
	"fmt"
	"log"
	"net/url"

	client "github.com/influxdata/influxdb1-client"
)

func main() {
	host, err := url.Parse(fmt.Sprintf("http://%s:%d", "localhost", 8086))
	if err != nil {
		log.Fatal(err)
	}
	con, err := client.NewClient(client.Config{URL: *host})
	if err != nil {
		log.Fatal(err)
	}

	q := client.Query{
		Command:  "select count(value) from shapes",
		Database: "square_holes",
	}
	if response, err := con.Query(q); err == nil && response.Error() == nil {
		log.Println(response.Results)
	}
}
Output:

func (*Client) QueryContext

func (c *Client) QueryContext(ctx context.Context, q Query) (*Response, error)

QueryContext sends a command to the server and returns the Response It uses a context that can be cancelled by the command line client

func (*Client) SetAuth

func (c *Client) SetAuth(u, p string)

SetAuth will update the username and passwords

func (*Client) SetPrecision

func (c *Client) SetPrecision(precision string)

SetPrecision will update the precision

func (*Client) Write

func (c *Client) Write(bp BatchPoints) (*Response, error)

Write takes BatchPoints and allows for writing of multiple points with defaults If successful, error is nil and Response is nil If an error occurs, Response may contain additional information if populated.

Example
package main

import (
	"fmt"
	"log"
	"math/rand"
	"net/url"
	"strconv"
	"time"

	client "github.com/influxdata/influxdb1-client"
)

func main() {
	host, err := url.Parse(fmt.Sprintf("http://%s:%d", "localhost", 8086))
	if err != nil {
		log.Fatal(err)
	}
	con, err := client.NewClient(client.Config{URL: *host})
	if err != nil {
		log.Fatal(err)
	}

	var (
		shapes     = []string{"circle", "rectangle", "square", "triangle"}
		colors     = []string{"red", "blue", "green"}
		sampleSize = 1000
		pts        = make([]client.Point, sampleSize)
	)

	rand.Seed(42)
	for i := 0; i < sampleSize; i++ {
		pts[i] = client.Point{
			Measurement: "shapes",
			Tags: map[string]string{
				"color": strconv.Itoa(rand.Intn(len(colors))),
				"shape": strconv.Itoa(rand.Intn(len(shapes))),
			},
			Fields: map[string]interface{}{
				"value": rand.Intn(sampleSize),
			},
			Time:      time.Now(),
			Precision: "s",
		}
	}

	bps := client.BatchPoints{
		Points:          pts,
		Database:        "BumbeBeeTuna",
		RetentionPolicy: "default",
	}
	_, err = con.Write(bps)
	if err != nil {
		log.Fatal(err)
	}
}
Output:

func (*Client) WriteLineProtocol

func (c *Client) WriteLineProtocol(data, database, retentionPolicy, precision, writeConsistency string) (*Response, error)

WriteLineProtocol takes a string with line returns to delimit each write If successful, error is nil and Response is nil If an error occurs, Response may contain additional information if populated.

type Config

type Config struct {
	URL              url.URL
	UnixSocket       string
	Username         string
	Password         string
	UserAgent        string
	Timeout          time.Duration
	Precision        string
	WriteConsistency string
	UnsafeSsl        bool
	Proxy            func(req *http.Request) (*url.URL, error)
	TLS              *tls.Config
}

Config is used to specify what server to connect to. URL: The URL of the server connecting to. Username/Password are optional. They will be passed via basic auth if provided. UserAgent: If not provided, will default "InfluxDBClient", Timeout: If not provided, will default to 0 (no timeout)

func NewConfig

func NewConfig() Config

NewConfig will create a config to be used in connecting to the client

type Message

type Message struct {
	Level string `json:"level,omitempty"`
	Text  string `json:"text,omitempty"`
}

Message represents a user message.

type Point

type Point struct {
	Measurement string
	Tags        map[string]string
	Time        time.Time
	Fields      map[string]interface{}
	Precision   string
	Raw         string
}

Point defines the fields that will be written to the database Measurement, Time, and Fields are required Precision can be specified if the time is in epoch format (integer). Valid values for Precision are n, u, ms, s, m, and h

func (*Point) MarshalJSON

func (p *Point) MarshalJSON() ([]byte, error)

MarshalJSON will format the time in RFC3339Nano Precision is also ignored as it is only used for writing, not reading Or another way to say it is we always send back in nanosecond precision

func (*Point) MarshalString

func (p *Point) MarshalString() string

MarshalString renders string representation of a Point with specified precision. The default precision is nanoseconds.

func (*Point) UnmarshalJSON

func (p *Point) UnmarshalJSON(b []byte) error

UnmarshalJSON decodes the data into the Point struct

type Query

type Query struct {
	Command  string
	Database string

	// RetentionPolicy tells the server which retention policy to use by default.
	// This option is only effective when querying a server of version 1.6.0 or later.
	RetentionPolicy string

	// Chunked tells the server to send back chunked responses. This places
	// less load on the server by sending back chunks of the response rather
	// than waiting for the entire response all at once.
	Chunked bool

	// ChunkSize sets the maximum number of rows that will be returned per
	// chunk. Chunks are either divided based on their series or if they hit
	// the chunk size limit.
	//
	// Chunked must be set to true for this option to be used.
	ChunkSize int

	// NodeID sets the data node to use for the query results. This option only
	// has any effect in the enterprise version of the software where there can be
	// more than one data node and is primarily useful for analyzing differences in
	// data. The default behavior is to automatically select the appropriate data
	// nodes to retrieve all of the data. On a database where the number of data nodes
	// is greater than the replication factor, it is expected that setting this option
	// will only retrieve partial data.
	NodeID int
}

Query is used to send a command to the server. Both Command and Database are required.

type Response

type Response struct {
	Results []Result
	Err     error
}

Response represents a list of statement results.

func (*Response) Error

func (r *Response) Error() error

Error returns the first error from any statement. Returns nil if no errors occurred on any statements.

func (*Response) MarshalJSON

func (r *Response) MarshalJSON() ([]byte, error)

MarshalJSON encodes the response into JSON.

func (*Response) UnmarshalJSON

func (r *Response) UnmarshalJSON(b []byte) error

UnmarshalJSON decodes the data into the Response struct

type Result

type Result struct {
	Series   []models.Row
	Messages []*Message
	Err      error
}

Result represents a resultset returned from a single statement.

func (*Result) MarshalJSON

func (r *Result) MarshalJSON() ([]byte, error)

MarshalJSON encodes the result into JSON.

func (*Result) UnmarshalJSON

func (r *Result) UnmarshalJSON(b []byte) error

UnmarshalJSON decodes the data into the Result struct

Directories

Path Synopsis
Package memcache provides a client for the memcached cache server.
Package memcache provides a client for the memcached cache server.
Package models implements basic objects used throughout the TICK stack.
Package models implements basic objects used throughout the TICK stack.
pkg
escape
Package escape contains utilities for escaping parts of InfluxQL and InfluxDB line protocol.
Package escape contains utilities for escaping parts of InfluxQL and InfluxDB line protocol.
Package client (v2) is the current official Go client for InfluxDB.
Package client (v2) is the current official Go client for InfluxDB.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL