frames

v0.12.0
Published: Jul 23, 2023 License: Apache-2.0 Imports: 19 Imported by: 1

README

V3IO Frames

V3IO Frames ("Frames") is a multi-model open-source data-access library that provides a unified high-performance DataFrame API for working with different types of data sources (backends). The library was developed by Iguazio to simplify working with data in the Iguazio Data Science Platform ("the platform"), but it can be extended to support additional backend types.

In This Document

Client Python API Reference

Overview

Python Version

The current version of Frames supports Python 3.6 and 3.7.

Initialization

To use Frames, you first need to import the v3io_frames Python library. For example:

import v3io_frames as v3f

Then, you need to create and initialize an instance of the Client class; see Client Constructor. You can then use the client methods to perform different data operations on the supported backend types.

Backend Types

All Frames client methods receive a backend parameter for setting the Frames backend type. Frames currently supports the following backend types:

  • nosql | kv — a platform NoSQL (key/value) table.

    Note: The documentation uses the "nosql" alias for the "kv" type; the alias was added in Frames v0.6.10-v0.9.13, and "kv" is still supported for backwards compatibility with earlier releases.

  • stream — a platform data stream [Tech Preview].
  • tsdb — a time-series database (TSDB).
  • csv — a comma-separated-value (CSV) file. This backend type is used only for testing purposes.

Client Methods

The Client class features the following methods for supporting operations on a data collection, such as a NoSQL or TSDB table or a data stream:

  • create — creates a new collection.
  • delete — deletes a collection or specific items of the collection.
  • read — reads data from a collection into pandas DataFrames.
  • write — writes data from pandas DataFrames to a collection.
  • execute — executes a backend-specific command on a collection. Each backend may support multiple commands.
  • history — returns information about requests made to the service.

Note: Some methods or method parameters are backend-specific, as detailed in this reference.

User Authentication

When creating a Frames client, you must provide valid credentials for accessing the backend data, which Frames will use to identify the user. You can do this by using either of the following alternative methods (documented in order of precedence).

  • Provide the authentication credentials in the call to the Client constructor — either by setting the token parameter to a valid authentication token (access key) or by setting the user and password parameters to a username and password. Note that you cannot set the token parameter concurrently with the user and password parameters.

  • Provide the authentication credentials in environment variables — either by setting the V3IO_ACCESS_KEY variable to an authentication token or by setting the V3IO_USERNAME and V3IO_PASSWORD variables to a username and password.

    Note:

    • When V3IO_ACCESS_KEY is defined, V3IO_USERNAME and V3IO_PASSWORD are ignored.
    • When the client constructor is called with authentication parameters (option #1), the authentication-credentials environment variables (if defined) are ignored.
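
The precedence rules above can be summarized in a short sketch. The resolve_credentials helper below is hypothetical (it is not part of v3io_frames); it only models which credentials would take effect, assuming that constructor arguments always win over environment variables, as described above.

```python
import os


def resolve_credentials(token="", user="", password="", env=None):
    """Model the documented precedence; hypothetical helper, not a Frames API."""
    env = os.environ if env is None else env
    if token and (user or password):
        raise ValueError("set either token or user/password, not both")
    # Option 1: constructor arguments override any environment variables.
    if token:
        return {"token": token}
    if user or password:
        if not (user and password):
            raise ValueError("user and password must be set together")
        return {"user": user, "password": password}
    # Option 2: environment variables; the access key wins over user/password.
    if env.get("V3IO_ACCESS_KEY"):
        return {"token": env["V3IO_ACCESS_KEY"]}
    if env.get("V3IO_USERNAME") and env.get("V3IO_PASSWORD"):
        return {"user": env["V3IO_USERNAME"], "password": env["V3IO_PASSWORD"]}
    raise ValueError("no credentials found")
```

Passing a plain dictionary as env makes the rules easy to try out without touching the real process environment.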

Client Constructor

All Frames operations are executed via an object of the Client class.

Syntax
Client(address=""[, data_url=""], container=""[, user="", password="", token=""])

Parameters and Data Members
  • address — The address of the Frames service (framesd). Use the grpc:// prefix for gRPC (default; recommended) or the http:// prefix for HTTP. When running locally on the platform, set this parameter to framesd:8081 to use gRPC (recommended) or to framesd:8080 to use HTTP.

    • Type: str
    • Requirement: Required
  • data_url — A web-API base URL for accessing the backend data. By default, the client uses the data URL that's configured for the Frames service; for the platform backends, this is typically the HTTPS URL of the web-APIs service of the parent tenant.

    • Type: str
    • Requirement: Optional
  • container — The name of the data container that contains the backend data. For example, "bigdata" or "users".

    • Type: str
    • Requirement: Required
  • user — The username of a user with permissions to access the backend data. See User Authentication.

    • Type: str
    • Requirement: Required when neither the token parameter nor the authentication environment variables are set.
      When the user parameter is set, the password parameter must also be set to a matching user password.
  • password — A valid password for the user configured in the user parameter. See User Authentication.

    • Type: str
    • Requirement: Required when the user parameter is set.
  • token — A valid token that allows access to the backend data, such as a platform access key for the platform backends. See User Authentication.

    • Type: str
    • Requirement: Required when neither the user and password parameters nor the authentication environment variables are set.

Return Value

Returns a new Frames Client data object.

Examples

The following examples, for local platform execution, both create a Frames client for accessing data in the "users" container by using the authentication credentials of user "iguazio"; the first example uses token (access-key) authentication while the second example uses username and password authentication (see User Authentication):

import v3io_frames as v3f
client = v3f.Client("framesd:8081", token="e8bd4ca2-537b-4175-bf01-8c74963e90bf", container="users")

import v3io_frames as v3f
client = v3f.Client("framesd:8081", user="iguazio", password="mypass", container="users")

Common Client Method Parameters

All client methods receive the following common parameters; additional, method-specific parameters are described for each method.

  • backend — The backend data type for the operation. See Backend Types.

    • Type: str
    • Requirement: Required
    • Valid Values: "nosql" | "stream" | "tsdb" | "csv" (for testing)
  • table — The relative path to a data collection of the specified backend type in the target data container (as configured for the client object). For example, "mytable" or "/examples/tsdb/my_metrics".

    • Type: str
    • Requirement: Required unless otherwise specified in the method-specific documentation

create Method

Creates a new data collection in the configured client data container, according to the specified backend type.

Note: The create method isn't applicable to the nosql backend, because NoSQL tables in the platform don't need to be created prior to ingestion; when ingesting data into a table that doesn't exist, the table is automatically created.

Syntax
create(backend, table, schema=None, if_exists=FAIL, **kw)

Common create Parameters

All Frames backends that support the create method support the following common parameters:

  • if_exists — Determines whether to raise an error when the specified collection (table) already exists.

    • Type: pb.ErrorOptions enumeration. To use the enumeration, import the frames_pb2 module; for example:

      from v3io_frames import frames_pb2 as fpb
      
    • Requirement: Optional
    • Valid Values: FAIL to raise an error when the specified collection already exists; IGNORE to skip the operation silently in this case
    • Default Value: FAIL
  • schema — A schema for describing unstructured collection data. This parameter is intended to be used only for testing purposes with the csv backend.

    • Type: Backend-specific or None
    • Requirement: Optional
    • Default Value: None
  • kw — This parameter is used for passing a variable-length list of additional keyword (named) arguments. For more information, see the backend-specific method parameters.

    • Type: ** — variable-length keyword arguments list
    • Requirement: Optional

tsdb Backend create Parameters

The following create parameters are specific to the tsdb backend and are passed as keyword arguments via the kw parameter:

  • rate — Metric-samples ingestion rate.

    • Type: str
    • Requirement: Required
    • Valid Values: A string of the format "[0-9]+/[smh]" — where 's' = seconds, 'm' = minutes, and 'h' = hours. For example, "1/s" (one sample per second), "20/m" (20 samples per minute), or "50/h" (50 samples per hour).
  • aggregates — A list of aggregation functions for real-time aggregation during the samples ingestion ("pre-aggregation").

    • Type: str
    • Requirement: Optional
    • Valid Values: A string containing a comma-separated list of supported aggregation functions — avg| count| last| max| min| rate| stddev| stdvar| sum. For example, "count,avg,min,max".
  • aggregation_granularity — Aggregation granularity; applicable when the aggregates parameter is set.

    • Type: str
    • Requirement: Optional
    • Valid Values: A string of the format "[0-9]+[mhd]" — where 'm' = minutes, 'h' = hours, and 'd' = days. For example, "30m" (30 minutes), "2h" (2 hours), or "1d" (1 day).
    • Default Value: "1h" (1 hour)
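
The documented string formats can be checked on the client side before calling create. The following is a minimal sketch that assumes the regular expressions and the aggregation-function list given above; check_tsdb_create_args is a hypothetical helper and not part of the Frames API, and the service may apply additional rules.

```python
import re

RATE_RE = re.compile(r"[0-9]+/[smh]")
GRANULARITY_RE = re.compile(r"[0-9]+[mhd]")
AGGREGATES = {"avg", "count", "last", "max", "min", "rate", "stddev", "stdvar", "sum"}


def check_tsdb_create_args(rate, aggregates="", aggregation_granularity="1h"):
    """Hypothetical helper: validate tsdb create arguments and return them as kwargs."""
    if not RATE_RE.fullmatch(rate):
        raise ValueError(f"invalid rate: {rate!r}")
    kwargs = {"rate": rate}
    names = [name.strip() for name in aggregates.split(",") if name.strip()]
    if names:
        unknown = sorted(set(names) - AGGREGATES)
        if unknown:
            raise ValueError(f"unsupported aggregates: {unknown}")
        # The granularity only matters when aggregates are requested.
        if not GRANULARITY_RE.fullmatch(aggregation_granularity):
            raise ValueError(f"invalid granularity: {aggregation_granularity!r}")
        kwargs["aggregates"] = ",".join(names)
        kwargs["aggregation_granularity"] = aggregation_granularity
    return kwargs
```

The returned dictionary can then be expanded into the call, for example client.create("tsdb", table="mytsdb", **check_tsdb_create_args("10/m")).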

stream Backend create Parameters

The following create parameters are specific to the stream backend and are passed as keyword arguments via the kw parameter:

  • shards — The number of stream shards to create.

    • Type: int
    • Requirement: Optional
    • Default Value: 1
    • Valid Values: A positive integer (>= 1). For example, 100.
  • retention_hours — The stream's retention period, in hours.

    • Type: int
    • Requirement: Optional
    • Default Value: 24
    • Valid Values: A positive integer (>= 1). For example, 2 (2 hours).

create Examples

tsdb Backend
client.create("tsdb", table="mytsdb", rate="10/m")
client.create("tsdb", table="/tsdb/my_metrics", rate="1/s", aggregates="count,avg,min,max", aggregation_granularity="1h")

stream Backend
client.create("stream", table="/mystream", shards=3)
client.create("stream", table="/my_streams/stream1", retention_hours=2)

write Method

Writes data from a DataFrame to a data collection, according to the specified backend type.

Syntax
write(backend, table, dfs, expression='', condition='', labels=None,
    max_rows_in_msg=0, index_cols=None, save_mode='createNewItemsOnly',
    partition_keys=None)

Note: The expression parameter isn't supported in the current release.

Common write Parameters

All Frames backends that support the write method support the following common parameters:

  • dfs — One or more DataFrames containing the data to write.

    • Type: A single DataFrame, a list of DataFrames, or a DataFrames iterator
    • Requirement: Required
  • index_cols — A list of column (attribute) names to be used as index columns for the write operation, regardless of any index-column definitions in the DataFrame. By default, the DataFrame's index columns are used.

    Note: The significance and supported number of index columns is backend specific. For example, the nosql backend supports only a single index column for the primary-key item attribute, while the tsdb backend supports additional index columns for metric labels.

    • Type: []str
    • Requirement: Optional
    • Default Value: None
  • labels — This parameter is currently applicable only to the tsdb backend (although it's available for all backends) and is therefore documented as part of the write method's tsdb backend parameters.

    • Type: dict
    • Requirement: Optional
  • save_mode — This parameter is currently applicable only to the nosql backend, and is therefore documented as part of the write method's nosql backend parameters.

    • Type: str
    • Requirement: Optional
  • max_rows_in_msg — Maximum number of rows to write in each message (write chunk size).

    • Type: int
    • Requirement: Optional
    • Default Value: 0

nosql Backend write Parameters

The following write parameters are specific to the nosql backend:

  • condition — A platform condition expression that defines conditions for performing the write operation.

    • Type: str
    • Requirement: Optional
  • save_mode — Save mode, which determines in which circumstances to write new items to the table.

    • Type: str
    • Requirement: Optional
    • Valid Values:
      • "createNewItemsOnly" — write only new items; don't replace or update any existing table item with the same name (primary-key attribute value) as a written item.
      • "updateItem" — update items; add new items and update the attributes of existing table items.
      • "overwriteItem" — overwrite items; add new items and replace any existing table item with the same name as a written item.
      • "errorIfTableExists" — create a new table only; only write items if the target table doesn't already exist.
      • "overwriteTable" — overwrite the table; replace all existing table items (if any) with the written items.
    • Default Value: "createNewItemsOnly"
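
To make the differences between the save modes concrete, the following sketch models a table as an in-memory dictionary of {item name: attributes}. It is only an illustration of the descriptions above, not the Frames implementation; apply_save_mode is a hypothetical name, and raising an error for "errorIfTableExists" is an assumption based on the mode's name.

```python
def apply_save_mode(table, items, save_mode="createNewItemsOnly"):
    """Return the modeled table contents after a write.

    table: dict of {name: attributes}, or None when the table doesn't exist.
    items: dict of {name: attributes} to write.
    """
    result = {} if table is None else {k: dict(v) for k, v in table.items()}
    if save_mode == "createNewItemsOnly":
        for name, attrs in items.items():
            result.setdefault(name, dict(attrs))  # existing items are left untouched
    elif save_mode == "updateItem":
        for name, attrs in items.items():
            result.setdefault(name, {}).update(attrs)  # merge attributes
    elif save_mode == "overwriteItem":
        for name, attrs in items.items():
            result[name] = dict(attrs)  # replace the whole item
    elif save_mode == "errorIfTableExists":
        if table is not None:
            raise RuntimeError("table already exists")  # assumed behavior
        result = {k: dict(v) for k, v in items.items()}
    elif save_mode == "overwriteTable":
        result = {k: dict(v) for k, v in items.items()}  # drop all previous items
    else:
        raise ValueError(f"unknown save mode: {save_mode!r}")
    return result
```

For example, writing {"tom": {"age": 11}} over an existing item {"tom": {"age": 10, "city": "TLV"}} keeps the item unchanged with "createNewItemsOnly", merges it to {"age": 11, "city": "TLV"} with "updateItem", and replaces it with {"age": 11} with "overwriteItem".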

tsdb Backend write Parameters

The following write parameter descriptions are specific to the tsdb backend:

  • labels — A dictionary of metric labels of the format {<label>: <value>[, <label>: <value>, ...]} to apply to all the DataFrame rows. For example, {"os": "linux", "arch": "x86"}.

    • Type: dict
    • Requirement: Optional
    • Default Value: None

write Examples

nosql Backend
import pandas as pd
data = [["tom", 10, "TLV"], ["nick", 15, "Berlin"], ["juli", 14, "NY"]]
df = pd.DataFrame(data, columns = ["name", "age", "city"])
df.set_index("name", inplace=True)
client.write(backend="nosql", table="mytable", dfs=df, condition="age>14")

tsdb Backend
from datetime import datetime
import pandas as pd
df = pd.DataFrame(data=[[30.1, 12.7]], index=[[datetime.now()], ["1"]],
                  columns=["cpu", "disk"])
df.index.names = ["time", "node"]
client.write(backend="tsdb", table="mytsdb", dfs=df)

stream Backend
import numpy as np
import pandas as pd
df = pd.DataFrame(np.random.rand(9, 3) * 100,
                  columns=["cpu", "mem", "disk"])
client.write("stream", table="mystream", dfs=df)

read Method

Reads data from a data collection to a DataFrame, according to the specified backend type.

Syntax
read(backend, table='', query='', columns=None, filter='', group_by='',
    limit=0, data_format='', row_layout=False, max_rows_in_msg=0, marker='',
    iterator=False, get_raw=False, **kw)

Note: The limit, data_format, row_layout, and marker parameters aren't supported in the current release, and get_raw is for internal use only.

Common read Parameters

All Frames backends that support the read method support the following common parameters:

  • iterator — Set to True to return a pandas DataFrames iterator; False (default) returns a single DataFrame.

    • Type: bool
    • Requirement: Optional
    • Default Value: False
  • filter — A query filter. For example, filter="col1=='my_value'".
    This parameter is currently applicable only to the nosql and tsdb backends, and cannot be used concurrently with the query parameter of the tsdb backend.

    • Type: str
    • Requirement: Optional
  • columns — A list of attributes (columns) to return.
    This parameter is currently applicable only to the nosql and tsdb backends, and cannot be used concurrently with the query parameter of the tsdb backend.

    • Type: []str
    • Requirement: Optional
  • kw — This parameter is used for passing a variable-length list of additional keyword (named) arguments. For more information, see the backend-specific method parameters.

    • Type: ** — variable-length keyword arguments list
    • Requirement: Optional

nosql Backend read Parameters

The following read parameters are specific to the nosql backend:

  • max_rows_in_msg — The maximum number of rows per message.

    • Type: int
    • Requirement: Optional

The following parameters are passed as keyword arguments via the kw parameter:

  • reset_index — Set to True to reset the index column of the returned DataFrame and use the auto-generated pandas range-index column; False (default) sets the index column to the table's primary-key attribute.

    • Type: bool
    • Requirement: Optional
    • Default Value: False
  • sharding_keys [Tech Preview] — A list of specific sharding keys to query, for range-scan formatted tables only.

    • Type: []str
    • Requirement: Optional

tsdb Backend read Parameters

The following read parameters are specific to the tsdb backend:

  • group_by [Tech Preview] — A group-by query string.
    This parameter cannot be used concurrently with the query parameter.

    • Type: str
    • Requirement: Optional
  • query [Tech Preview] — A query string in SQL format.

    Note:

    • When setting the query parameter, you must provide the path to the TSDB table as part of the FROM clause in the query string and not in the read method's table parameter.
    • This parameter cannot be set concurrently with the aggregators, columns, filter, or group_by parameters.
    • Type: str
    • Requirement: Optional

The following parameters are passed as keyword arguments via the kw parameter:

  • start — Start (minimum) time for the read operation.

    • Type: str
    • Requirement: Optional
    • Valid Values: A string containing an RFC 3339 time, a Unix timestamp in milliseconds, a relative time of the format "now" or "now-[0-9]+[mhd]" (where 'm' = minutes, 'h' = hours, and 'd' = days), or 0 for the earliest time. For example: "2016-01-02T15:34:26Z"; "1451748866000"; "now-90m"; "0".
    • Default Value: <end time> - 1h
  • end — End (maximum) time for the read operation.

    • Type: str
    • Requirement: Optional
    • Valid Values: A string containing an RFC 3339 time, a Unix timestamp in milliseconds, a relative time of the format "now" or "now-[0-9]+[mhd]" (where 'm' = minutes, 'h' = hours, and 'd' = days), or 0 for the earliest time. For example: "2018-09-26T14:10:20Z"; "1537971006000"; "now-3h"; "now-7d".
    • Default Value: now
  • step — The query aggregation or downsampling step. The default step is the query's time range, which can be configured via the start and end parameters.

    • Type: str
    • Requirement: Optional
  • aggregators — Aggregation information to return, as a comma-separated list of supported aggregation functions ("aggregators").
    This parameter cannot be used concurrently with the query parameter.

    • Type: str
    • Requirement: Optional
    • Valid Values: The following aggregation functions are supported for over-time aggregation (across each unique label set); for cross-series aggregation (across all metric labels), add "_all" to the end of the function name:
      avg | count | last | max | min | rate | stddev | stdvar | sum
  • aggregation_window [Tech Preview] — Aggregation interval for applying over-time aggregation functions, if set in the aggregators or query parameters.

    • Type: str
    • Requirement: Optional
    • Valid Values: A string of the format "[0-9]+[mhd]" where 'm' = minutes, 'h' = hours, and 'd' = days. For example, "30m" (30 minutes), "2h" (2 hours), or "1d" (1 day).
    • Default Value: The query's aggregation step
  • multi_index — Set to True to display labels as index columns in the read results; False (default) displays only the metric's sample time as an index column.

    • Type: bool
    • Requirement: Optional
    • Default Value: False
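
The time formats accepted by the start and end parameters can be illustrated with a small converter to Unix milliseconds. This is a sketch only: the actual parsing is done by the Frames service, to_unix_ms is a hypothetical helper, and now_ms is passed in explicitly so that the relative formats give reproducible results.

```python
import re
from datetime import datetime

# Milliseconds per documented relative-time unit.
_UNIT_MS = {"m": 60_000, "h": 3_600_000, "d": 86_400_000}


def to_unix_ms(value, now_ms):
    """Convert a documented time string to Unix milliseconds (illustrative only)."""
    if value == "now":
        return now_ms
    match = re.fullmatch(r"now-([0-9]+)([mhd])", value)
    if match:
        return now_ms - int(match.group(1)) * _UNIT_MS[match.group(2)]
    if value.isdigit():
        return int(value)  # already a Unix timestamp in milliseconds
    # RFC 3339: rewrite a trailing "Z" so that older Python versions can parse it.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    return int(datetime.fromisoformat(value).timestamp() * 1000)
```

With this model, "now-90m" resolves to now_ms minus 5,400,000 milliseconds, and "0" resolves to the earliest possible time.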

stream Backend read Parameters

The following read parameters are specific to the stream backend and are passed as keyword arguments via the kw parameter:

  • seek — Seek type.
    When the "seq" or "sequence" seek type is set, you must set the sequence parameter to the desired record sequence number.
    When the "time" seek type is set, you must set the start parameter to the desired seek start time.

    • Type: str
    • Requirement: Required
    • Valid Values: "time" | "seq" | "sequence" | "latest" | "earliest"
  • shard_id — The ID of the stream shard from which to read.

    • Type: str
    • Requirement: Required
    • Valid values: "0" ... "<stream shard count> - 1"
  • sequence — The sequence number of the record from which to start reading.

    • Type: int64
    • Requirement: Required when seek = "seq" or "sequence"
  • start — The earliest record ingestion time from which to start reading.

    • Type: str
    • Requirement: Required when seek = "time"
    • Valid Values: A string containing an RFC 3339 time, a Unix timestamp in milliseconds, a relative time of the format "now" or "now-[0-9]+[mhd]" (where 'm' = minutes, 'h' = hours, and 'd' = days), or 0 for the earliest time. For example: "2016-01-02T15:34:26Z"; "1451748866000"; "now-90m"; "0".
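
The dependencies between the seek type and the sequence and start parameters can be checked before calling read. The following sketch assumes only the rules listed above; stream_read_kwargs is a hypothetical helper and not part of the Frames API.

```python
SEEK_TYPES = {"time", "seq", "sequence", "latest", "earliest"}


def stream_read_kwargs(seek, shard_id, sequence=None, start=None):
    """Hypothetical helper: build the keyword arguments for a stream read."""
    if seek not in SEEK_TYPES:
        raise ValueError(f"invalid seek type: {seek!r}")
    kwargs = {"seek": seek, "shard_id": str(shard_id)}  # shard_id is documented as str
    if seek in ("seq", "sequence"):
        if sequence is None:
            raise ValueError("sequence is required for the seq/sequence seek type")
        kwargs["sequence"] = int(sequence)
    elif seek == "time":
        if not start:
            raise ValueError("start is required for the time seek type")
        kwargs["start"] = start
    return kwargs
```

The result can be expanded into the call, for example client.read(backend="stream", table="mystream", **stream_read_kwargs("time", 0, start="now-1h")).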

Return Value
  • When the value of the iterator parameter is False (default) — returns a single DataFrame.
  • When the value of the iterator parameter is True — returns a DataFrames iterator.

read Examples

nosql Backend
df = client.read(backend="nosql", table="mytable", filter="col1>666")

tsdb Backend
df = client.read("tsdb", table="mytsdb", start="0", multi_index=True)
df = client.read(backend="tsdb", query="select avg(cpu) as cpu, avg(disk) from 'mytsdb' where node='1'", start="now-1d", end="now", step="2h")

stream Backend
df = client.read(backend="stream", table="mystream", seek="latest", shard_id="5")

delete Method

Deletes a data collection or specific collection items, according to the specified backend type.

Syntax
delete(backend, table, filter='', start='', end='', if_missing=FAIL)

Common delete Parameters
  • if_missing — Determines whether to raise an error when the specified collection (table) doesn't exist.

    • Type: pb.ErrorOptions enumeration. To use the enumeration, import the frames_pb2 module; for example:

      from v3io_frames import frames_pb2 as fpb
      
    • Requirement: Optional
    • Valid Values: FAIL to raise an error when the specified collection doesn't exist; IGNORE to ignore this
    • Default Value: FAIL

nosql Backend delete Parameters

The following delete parameters are specific to the nosql backend:

  • filter — A filter expression that identifies specific items to delete.

    • Type: str
    • Requirement: Optional
    • Default Value: "" — delete the entire table and its schema file

tsdb Backend delete Parameters

The following delete parameters are specific to the tsdb backend:

  • start — Start (minimum) time for the delete operation — i.e., delete only items whose data sample time is at or after (>=) the specified start time.

    • Type: str
    • Requirement: Optional
    • Valid Values: A string containing an RFC 3339 time, a Unix timestamp in milliseconds, a relative time of the format "now" or "now-[0-9]+[mhd]" (where 'm' = minutes, 'h' = hours, and 'd' = days), or 0 for the earliest time. For example: "2016-01-02T15:34:26Z"; "1451748866000"; "now-90m"; "0".
    • Default Value: "" when neither start nor end are set — delete the entire table and its schema file (.schema); 0 when end is set
  • end — End (maximum) time for the delete operation — i.e., delete only items whose data sample time is before or at (<=) the specified end time.

    • Type: str
    • Requirement: Optional
    • Valid Values: A string containing an RFC 3339 time, a Unix timestamp in milliseconds, a relative time of the format "now" or "now-[0-9]+[mhd]" (where 'm' = minutes, 'h' = hours, and 'd' = days), or 0 for the earliest time. For example: "2018-09-26T14:10:20Z"; "1537971006000"; "now-3h"; "now-7d".
    • Default Value: "" when neither start nor end are set — delete the entire table and its schema file (.schema); 0 when start is set

Note:

  • When neither the start nor end parameters are set, the entire TSDB table and its schema file are deleted.
  • Only full table partitions within the specified time frame (as determined by the start and end parameters) are deleted. Items within the specified time frames that reside within partitions that begin before the delete start time or end after the delete end time aren't deleted. The partition interval is calculated automatically based on the table's ingestion rate and is stored in the TSDB's partitionerInterval schema field (see the .schema file).
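
The full-partition rule can be illustrated with a small model. The sketch below assumes that each partition covers a fixed interval starting at a known time and that a partition is deleted only when it lies entirely inside the delete time frame; the exact boundary handling in the TSDB backend may differ, and deleted_partitions is a hypothetical helper.

```python
def deleted_partitions(partition_starts, interval, start, end):
    """Return the start times of the partitions that a delete would remove.

    All values use the same time unit (for example, Unix milliseconds).
    A partition is modeled as the range [p, p + interval).
    """
    return [
        p for p in partition_starts
        # Keep partitions that begin before start or end after end.
        if p >= start and p + interval <= end
    ]
```

For partitions that start at 0, 100, and 200 with an interval of 100, a delete from 50 to 300 removes only the partitions that start at 100 and 200; items between 50 and 100 remain, because their partition begins before the delete start time.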

delete Examples

nosql Backend
client.delete(backend="nosql", table="mytable", filter="age > 40")

tsdb Backend
client.delete(backend="tsdb", table="mytsdb", start="now-1d", end="now-5h")

stream Backend
from v3io_frames import frames_pb2 as fpb
client.delete(backend="stream", table="mystream", if_missing=fpb.IGNORE)

execute Method

Extends the basic CRUD functionality of the other client methods via backend-specific commands for performing operations on a data collection.

Note: Currently, no execute commands are available for the tsdb backend.

Syntax
execute(backend, table, command="", args=None)

Common execute Parameters

All Frames backends that support the execute method support the following common parameters:

  • command — The command to execute.

    • Type: str
    • Requirement: Required
    • Valid Values: Backend-specific
  • args — A dictionary of <argument name>: <value> pairs for passing command-specific parameters (arguments).

    • Type: dict
    • Requirement and Valid Values: Backend-specific
    • Default Value: None

nosql Backend execute Commands

The following execute commands are specific to the nosql backend:

  • infer | infer_schema — Infers the data schema of a given NoSQL table and creates a schema file for the table.

    Example:

    client.execute(backend="nosql", table="mytable", command="infer")
    

stream Backend execute Commands

The following execute commands are specific to the stream backend:

  • put — Adds records to a stream shard.

    Example:

    client.execute('stream', table="mystream", command='put',
                   args={'data': '{"cpu": 12.4, "mem": 31.1, "disk": 12.7}',
                         "client_info": "my custom info", "partition_key": "PK1"})
    

history Method

By default, every command that is run with Frames is logged.
The history method returns information about requests made to the service as a pandas DataFrame.

Syntax
history(backend='', container='', table='', user='', action='', min_start_time='', max_start_time='',
    min_duration=0, max_duration=0)

Common history Parameters
  • backend — Filter logs by backend.

    • Type: str
    • Requirement: Optional
  • container — Filter logs by container.

    • Type: str
    • Requirement: Optional
  • table — Filter logs by table.

    • Type: str
    • Requirement: Optional
  • user — Filter logs by the user that executed the command.

    • Type: str
    • Requirement: Optional
  • action — Filter logs by Frames action.

    • Type: str
    • Requirement: Optional
    • Valid Values: "create" | "delete" | "execute" | "read" | "write"
  • min_start_time — The earliest request start time of the desired logs.

    • Type: str
    • Requirement: Optional
    • Valid Values: A string containing an RFC 3339 time, a Unix timestamp in milliseconds, a relative time of the format "now" or "now-[0-9]+[mhd]" (where 'm' = minutes, 'h' = hours, and 'd' = days), or 0 for the earliest time. For example: "2016-01-02T15:34:26Z"; "1451748866000"; "now-90m"; "0".
  • max_start_time — The latest request start time of the desired logs.

    • Type: str
    • Requirement: Optional
    • Valid Values: A string containing an RFC 3339 time, a Unix timestamp in milliseconds, a relative time of the format "now" or "now-[0-9]+[mhd]" (where 'm' = minutes, 'h' = hours, and 'd' = days), or 0 for the earliest time. For example: "2016-01-02T15:34:26Z"; "1451748866000"; "now-90m"; "0".
  • min_duration — The minimum request duration, in milliseconds, of the desired logs.

    • Type: int
    • Requirement: Optional
  • max_duration — The maximum request duration, in milliseconds, of the desired logs.

    • Type: int
    • Requirement: Optional

Return Value

Returns a single DataFrame.

history Examples

df = client.history()
df = client.history(backend='tsdb', action='read')
df = client.history(container='test-0', min_start_time='now-1d', min_duration=50)

Contributing

To contribute to V3IO Frames, you need to be aware of the following:

Components

The following components are required for building Frames code:

  • Go server with support for both the gRPC and HTTP protocols
  • Go client
  • Python client

Development

The core is written in Go. The development is done on the development branch and then released to the master branch.

Before submitting changes, test the code:

  • To execute the Go tests, run make test.
  • To execute the Python tests, run make test-python.

Adding and Changing Dependencies
  • If you add Go dependencies, run make update-go-deps.
  • If you add Python dependencies, update clients/py/Pipfile and run make update-py-deps.

Travis CI

Integration tests are run on Travis CI. See .travis.yml for details.

The following environment variables are defined in the Travis settings:

  • Docker Container Registry (Quay.io)
    • DOCKER_PASSWORD — a password for pushing images to Quay.io.
    • DOCKER_USERNAME — a username for pushing images to Quay.io.
  • Python Package Index (PyPI)
    • V3IO_PYPI_PASSWORD — a password for pushing a new release to PyPI.
    • V3IO_PYPI_USER — a username for pushing a new release to PyPI.
  • Iguazio Data Science Platform
    • V3IO_SESSION — a JSON encoded map with session information for running tests. For example:

      '{"url":"45.39.128.5:8081","container":"mitzi","user":"daffy","password":"rabbit season"}'
      

      Note: Make sure to embed the JSON object within single quotes ('{...}').
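
The single quotes are needed only when the value is typed in a shell. When the variable is set from a script, the JSON string can be generated instead of written by hand, which avoids quoting mistakes. The following sketch uses only the Python standard library and the sample values shown above; it sets the variable for the current process and its child processes only.

```python
import json
import os

# Sample session values taken from the example above.
session = {
    "url": "45.39.128.5:8081",
    "container": "mitzi",
    "user": "daffy",
    "password": "rabbit season",
}

# Compact separators produce the same form as the hand-written example.
encoded = json.dumps(session, separators=(",", ":"))
os.environ["V3IO_SESSION"] = encoded

# Reading the variable back yields the original map.
decoded = json.loads(os.environ["V3IO_SESSION"])
```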

Docker Image

Building the Image

Use the following command to build the Docker image:

make build-docker

Running the Image

Use the following command to run the Docker image:

docker run \
	-v /path/to/config.yaml:/etc/framesd.yaml \
	quay.io/v3io/frames:unstable

LICENSE

Apache 2

Documentation

Overview

Package frames provides an efficient way of moving data from various sources.

The package is composed of an HTTP web server that can serve data from various sources, and of clients in Go and in Python.

Constants

const (
	IgnoreError = pb.ErrorOptions_IGNORE
	FailOnError = pb.ErrorOptions_FAIL
)

Shortcut for fail/ignore

Variables

var (
	BoolType   = DType(pb.DType_BOOLEAN)
	FloatType  = DType(pb.DType_FLOAT)
	IntType    = DType(pb.DType_INTEGER)
	StringType = DType(pb.DType_STRING)
	TimeType   = DType(pb.DType_TIME)
	NullType   = DType(pb.DType_NULL)
)

Possible data types

var (
	// DefaultLogLevel is the default log verbosity
	DefaultLogLevel string
)

var ZeroTime time.Time

ZeroTime is zero value for time

Functions

func MarshalFrame

func MarshalFrame(frame Frame) ([]byte, error)

MarshalFrame serializes a frame to []byte

func NewLogger

func NewLogger(verbose string) (logger.Logger, error)

NewLogger returns a new logger

func SessionFromEnv

func SessionFromEnv() (*pb.Session, error)

SessionFromEnv returns a session from the V3IO_SESSION environment variable (JSON encoded)

Types

type BackendConfig

type BackendConfig struct {
	Type                    string `json:"type"` // v3io, csv, ...
	Name                    string `json:"name"`
	Workers                 int    `json:"workers"`
	UpdateWorkersPerVN      int    `json:"updateWorkersPerVN"`
	V3ioGoWorkers           int    `json:"v3ioGoWorkers"`
	V3ioGoRequestChanLength int    `json:"v3ioGoRequestChanLength"`
	MaxConnections          int    `json:"maxConnections"`
	DialTimeoutSeconds      int    `json:"dialTimeoutSeconds"`
	MaxRecordsInferSchema   int    `json:"maxRecordsInferSchema"`

	// backend specific options
	Options map[string]interface{} `json:"options"`

	// CSV backend
	RootDir string `json:"rootdir,omitempty"`
}

BackendConfig is the default backend configuration

type Client

type Client interface {
	// Read reads data from server
	Read(request *pb.ReadRequest) (FrameIterator, error)
	// Write writes data to server
	Write(request *WriteRequest) (FrameAppender, error)
	// Create creates a table
	Create(request *pb.CreateRequest) error
	// Delete deletes data or table
	Delete(request *pb.DeleteRequest) error
	// Exec executes a command on the backend
	Exec(request *pb.ExecRequest) (Frame, error)
}

Client interface

type Column

type Column interface {
	Len() int                                 // Number of elements
	Name() string                             // Column name
	DType() DType                             // Data type (e.g. IntType, FloatType ...)
	Ints() ([]int64, error)                   // Data as []int64
	IntAt(i int) (int64, error)               // Int value at index i
	Floats() ([]float64, error)               // Data as []float64
	FloatAt(i int) (float64, error)           // Float value at index i
	Strings() []string                        // Data as []string
	StringAt(i int) (string, error)           // String value at index i
	Times() ([]time.Time, error)              // Data as []time.Time
	TimeAt(i int) (time.Time, error)          // time.Time value at index i
	Bools() ([]bool, error)                   // Data as []bool
	BoolAt(i int) (bool, error)               // bool value at index i
	Slice(start int, end int) (Column, error) // Slice of data
	CopyWithName(newName string) Column       // Create a copy of the current column
}

Column is a data column
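
The typed accessors in the interface above return an error alongside the data, which suggests that asking a column for the wrong type fails rather than converting silently. The sketch below shows that pattern with a local `sliceColumn` type; the type, its fields, and the error messages are written for this example and are not the Frames implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// dtype and sliceColumn are local stand-ins written for this example.
type dtype int

const (
	intType dtype = iota
	floatType
)

type sliceColumn struct {
	name   string
	dtype  dtype
	ints   []int64
	floats []float64
}

func (c *sliceColumn) Name() string { return c.name }

// Len reports the number of elements in whichever slice backs the column.
func (c *sliceColumn) Len() int {
	if c.dtype == intType {
		return len(c.ints)
	}
	return len(c.floats)
}

// Ints returns the data only when the column actually holds integers.
func (c *sliceColumn) Ints() ([]int64, error) {
	if c.dtype != intType {
		return nil, errors.New("column " + c.name + " is not an integer column")
	}
	return c.ints, nil
}

// IntAt checks both the column type and the index bounds.
func (c *sliceColumn) IntAt(i int) (int64, error) {
	vals, err := c.Ints()
	if err != nil {
		return 0, err
	}
	if i < 0 || i >= len(vals) {
		return 0, fmt.Errorf("index %d out of range [0, %d)", i, len(vals))
	}
	return vals[i], nil
}

func main() {
	ages := &sliceColumn{name: "age", dtype: intType, ints: []int64{31, 42}}
	v, err := ages.IntAt(1)
	fmt.Println(v, err)

	prices := &sliceColumn{name: "price", dtype: floatType, floats: []float64{1.5}}
	_, err = prices.Ints()
	fmt.Println(err)
}
```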

func NewLabelColumn

func NewLabelColumn(name string, value interface{}, size int) (Column, error)

NewLabelColumn returns a new label column

func NewSliceColumn

func NewSliceColumn(name string, data interface{}) (Column, error)

NewSliceColumn returns a new slice column

type ColumnBuilder

type ColumnBuilder interface {
	Name() string
	Append(value interface{}) error
	At(index int) (interface{}, error)
	Set(index int, value interface{}) error
	Delete(index int) error
	Finish() Column
}

ColumnBuilder is an interface for building columns

func NewLabelColumnBuilder

func NewLabelColumnBuilder(name string, dtype DType, size int) ColumnBuilder

NewLabelColumnBuilder returns a builder for LabelColumn

func NewSliceColumnBuilder

func NewSliceColumnBuilder(name string, dtype DType, size int) ColumnBuilder

NewSliceColumnBuilder returns a builder for SliceColumn

type Config

type Config struct {
	Log            LogConfig `json:"log"`
	DefaultLimit   int       `json:"limit,omitempty"`
	DefaultTimeout int       `json:"timeout,omitempty"`

	// default V3IO connection details
	WebAPIEndpoint string `json:"webApiEndpoint"`
	Container      string `json:"container"`
	Username       string `json:"username,omitempty"`
	Password       string `json:"password,omitempty"`
	SessionKey     string `json:"sessionKey,omitempty"`

	// Number of parallel V3IO worker routines
	Workers            int `json:"workers"`
	UpdateWorkersPerVN int `json:"updateWorkersPerVN"`

	QuerierCacheSize    int `json:"querierCacheSize"`
	TsdbMetricCacheSize int `json:"tsdbMetricCacheSize"`

	// History server related configs
	WriteMonitoringLogsTimeoutSeconds int    `json:writeMonitoringLogsTimeoutSeconds`
	PendingLogsBatchSize              int    `json:pendingLogsBatchSize`
	LogsFolderPath                    string `json:logsFolderPath`
	LogsContainer                     string `json:logsContainer`
	MaxBytesInNginxRequest            int    `json:maxBytesInNginxRequest`
	HistoryFileDurationSpans          string `json:historyFileDurationSpans`
	HistoryFileNum                    int    `json:historyFileNum`
	DisableHistory                    bool   `json:disableHistory`

	Backends []*BackendConfig `json:"backends,omitempty"`

	DisableProfiling bool `json:"disableProfiling,omitempty"`
}

Config is server configuration

func (*Config) InitDefaults

func (c *Config) InitDefaults() error

InitDefaults initializes the defaults for configuration

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration

type CreateRequest

type CreateRequest struct {
	Proto    *pb.CreateRequest
	Password SecretString
	Token    SecretString
}

CreateRequest is a table creation request

func (CreateRequest) ToMap added in v0.7.6

func (createRequest CreateRequest) ToMap() map[string]string

type DType

type DType pb.DType

DType is data type

type DataBackend

type DataBackend interface {
	// TODO: Expose name, type, config ... ?
	Read(request *ReadRequest) (FrameIterator, error)
	Write(request *WriteRequest) (FrameAppender, error) // TODO: use Appender for write streaming
	Create(request *CreateRequest) error
	Delete(request *DeleteRequest) error
	Exec(request *ExecRequest) (Frame, error)
}

DataBackend is an interface for read/write on backend

type Decoder

type Decoder struct {
	// contains filtered or unexported fields
}

Decoder is a message decoder

func NewDecoder

func NewDecoder(r io.Reader) *Decoder

NewDecoder returns a new Decoder

func (*Decoder) Decode

func (d *Decoder) Decode(msg proto.Message) error

Decode decodes message from d.r

type DeleteRequest

type DeleteRequest struct {
	Proto    *pb.DeleteRequest
	Password SecretString
	Token    SecretString
}

DeleteRequest is a deletion request

func (DeleteRequest) ToMap added in v0.7.6

func (deleteRequest DeleteRequest) ToMap() map[string]string

type Encoder

type Encoder struct {
	// contains filtered or unexported fields
}

Encoder is a message encoder

func NewEncoder

func NewEncoder(w io.Writer) *Encoder

NewEncoder returns a new Encoder

func (*Encoder) Encode

func (e *Encoder) Encode(msg proto.Message) error

Encode encodes the message to e.w
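
An encoder/decoder pair that works over an `io.Writer` and an `io.Reader` needs some way to tell where one message ends and the next begins. A common approach is to prefix each message with its length. The sketch below shows that approach on raw byte payloads; the wire layout (8-byte little-endian size), the size cap, and the helper names are assumptions for illustration and are not confirmed to match what `Encoder` and `Decoder` do internally.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// maxMessageSize guards against allocating huge buffers on corrupt input.
const maxMessageSize = 1 << 20

// writeMessage writes an 8-byte little-endian length followed by the payload.
func writeMessage(w io.Writer, payload []byte) error {
	if err := binary.Write(w, binary.LittleEndian, int64(len(payload))); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readMessage reads one length-prefixed payload. It returns io.EOF when the
// stream ends cleanly before a new message starts.
func readMessage(r io.Reader) ([]byte, error) {
	var size int64
	if err := binary.Read(r, binary.LittleEndian, &size); err != nil {
		return nil, err
	}
	if size < 0 || size > maxMessageSize {
		return nil, fmt.Errorf("invalid message size %d", size)
	}
	payload := make([]byte, size)
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

// roundTrip writes all messages to a buffer and reads them back.
func roundTrip(messages ...string) ([]string, error) {
	var buf bytes.Buffer
	for _, m := range messages {
		if err := writeMessage(&buf, []byte(m)); err != nil {
			return nil, err
		}
	}
	var out []string
	for {
		payload, err := readMessage(&buf)
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		out = append(out, string(payload))
	}
}

func main() {
	out, err := roundTrip("hello", "frames")
	fmt.Println(out, err)
}
```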

type ExecRequest

type ExecRequest struct {
	Proto    *pb.ExecRequest
	Password SecretString
	Token    SecretString
}

ExecRequest is an execution request

func (ExecRequest) ToMap added in v0.7.6

func (executeRequest ExecRequest) ToMap() map[string]string

type Frame

type Frame interface {
	Labels() map[string]interface{}          // Label set
	Names() []string                         // Column names
	Indices() []Column                       // Index columns
	Len() int                                // Number of rows
	Column(name string) (Column, error)      // Column by name
	Slice(start int, end int) (Frame, error) // Slice of Frame
	IterRows(includeIndex bool) RowIterator  // Iterate over rows
	IsNull(index int, colName string) bool
	NullValuesMap() []*pb.NullValuesMap
}

Frame is a collection of columns

func NewFrame

func NewFrame(columns []Column, indices []Column, labels map[string]interface{}) (Frame, error)

NewFrame returns a new Frame

func NewFrameFromMap

func NewFrameFromMap(columns map[string]interface{}, indices map[string]interface{}) (Frame, error)

NewFrameFromMap returns a new MapFrame from a map

func NewFrameFromProto

func NewFrameFromProto(msg *pb.Frame) Frame

NewFrameFromProto returns a new frame from a protobuf message

func NewFrameFromRows

func NewFrameFromRows(rows []map[string]interface{}, indices []string, labels map[string]interface{}) (Frame, error)

NewFrameFromRows creates a new frame from rows

func NewFrameWithNullValues

func NewFrameWithNullValues(columns []Column, indices []Column, labels map[string]interface{}, nullValues []*pb.NullValuesMap) (Frame, error)

func UnmarshalFrame

func UnmarshalFrame(data []byte) (Frame, error)

UnmarshalFrame deserializes a frame from []byte

type FrameAppender

type FrameAppender interface {
	Add(frame Frame) error
	WaitForComplete(timeout time.Duration) error
	Close()
}

FrameAppender appends frames

type FrameIterator

type FrameIterator interface {
	Next() bool
	Err() error
	At() Frame
}

FrameIterator iterates over frames
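
The `Next`/`Err`/`At` shape follows a familiar Go iteration pattern: loop while `Next` returns true, read the current item with `At`, and check `Err` once after the loop ends. The sketch below demonstrates that consumption pattern with simplified local types; `frame`, `sliceIterator`, and `totalRows` are hypothetical names written for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// frame and frameIterator are simplified stand-ins for this example.
type frame struct{ rows int }

type frameIterator interface {
	Next() bool
	Err() error
	At() frame
}

// sliceIterator serves frames from memory and can simulate a failure.
type sliceIterator struct {
	frames []frame
	pos    int // number of frames served so far
	failAt int // frame index at which to fail; -1 disables the failure
	err    error
}

func (it *sliceIterator) Next() bool {
	if it.err != nil || it.pos >= len(it.frames) {
		return false
	}
	if it.pos == it.failAt {
		it.err = errors.New("simulated read failure")
		return false
	}
	it.pos++
	return true
}

func (it *sliceIterator) Err() error { return it.err }

// At returns the frame made current by the last successful Next call.
func (it *sliceIterator) At() frame { return it.frames[it.pos-1] }

// totalRows drains the iterator and checks Err once the loop ends.
func totalRows(it frameIterator) (int, error) {
	total := 0
	for it.Next() {
		total += it.At().rows
	}
	return total, it.Err()
}

func main() {
	it := &sliceIterator{frames: []frame{{rows: 2}, {rows: 3}}, failAt: -1}
	total, err := totalRows(it)
	fmt.Println(total, err)
}
```

Checking `Err` only after the loop keeps the loop body free of error handling while still distinguishing a clean end of data from a failure.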

type HistoryRequest added in v0.7.6

type HistoryRequest struct {
	Proto    *pb.HistoryRequest
	Password SecretString
	Token    SecretString
}

HistoryRequest is a history logs request

type JoinStruct

type JoinStruct = pb.JoinStruct

JoinStruct is a join structure

type LogConfig

type LogConfig struct {
	Level string `json:"level,omitempty"`
}

LogConfig is the logging configuration

type Query

type Query struct {
	Table   string
	Columns []string
	Filter  string
	GroupBy string
}

Query is a query structure

func ParseSQL

func ParseSQL(sql string) (*Query, error)

ParseSQL parses an SQL query into a Query struct

type ReadRequest

type ReadRequest struct {
	Proto    *pb.ReadRequest
	Password SecretString
	Token    SecretString
}

ReadRequest is a read/query request

func (ReadRequest) ToMap added in v0.7.6

func (readRequest ReadRequest) ToMap() map[string]string

type RowIterator

type RowIterator interface {
	Next() bool                      // Advance to next row
	Row() map[string]interface{}     // Row as map of name->value
	RowNum() int                     // Current row number
	Indices() map[string]interface{} // MultiIndex as name->value
	Err() error                      // Iteration error
}

RowIterator is an iterator over frame rows
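
Because a frame stores data by column, a row iterator has to assemble each row from the value at the same position in every column. The sketch below shows one way to do that over plain Go slices; `rowIterator`, `newRowIterator`, and `collectRows` are local names written for this example and are not the Frames implementation.

```go
package main

import "fmt"

// rowIterator is a simplified stand-in written for this example. It walks
// column-oriented data and exposes one row at a time as name->value.
type rowIterator struct {
	columns map[string][]interface{}
	numRows int
	rowNum  int // -1 until Next is called for the first time
	row     map[string]interface{}
	err     error
}

// newRowIterator records an error if the columns differ in length.
func newRowIterator(columns map[string][]interface{}) *rowIterator {
	it := &rowIterator{columns: columns, rowNum: -1, numRows: -1}
	for name, values := range columns {
		if it.numRows == -1 {
			it.numRows = len(values)
		} else if len(values) != it.numRows {
			it.err = fmt.Errorf("column %q has %d values, want %d", name, len(values), it.numRows)
		}
	}
	return it
}

// Next advances to the next row and builds its name->value map.
func (it *rowIterator) Next() bool {
	if it.err != nil || it.rowNum+1 >= it.numRows {
		return false
	}
	it.rowNum++
	it.row = make(map[string]interface{}, len(it.columns))
	for name, values := range it.columns {
		it.row[name] = values[it.rowNum]
	}
	return true
}

func (it *rowIterator) Row() map[string]interface{} { return it.row }
func (it *rowIterator) RowNum() int                 { return it.rowNum }
func (it *rowIterator) Err() error                  { return it.err }

// collectRows drains the iterator and reports any iteration error.
func collectRows(it *rowIterator) ([]map[string]interface{}, error) {
	var rows []map[string]interface{}
	for it.Next() {
		rows = append(rows, it.Row())
	}
	return rows, it.Err()
}

func main() {
	it := newRowIterator(map[string][]interface{}{
		"name": {"alice", "bob"},
		"age":  {int64(31), int64(42)},
	})
	for it.Next() {
		fmt.Println(it.RowNum(), it.Row())
	}
	if err := it.Err(); err != nil {
		fmt.Println("error:", err)
	}
}
```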

type SaveMode

type SaveMode int

const (
	ErrorIfTableExists SaveMode = iota
	OverwriteTable
	UpdateItem
	OverwriteItem
	CreateNewItemsOnly
)

func SaveModeFromString

func SaveModeFromString(mode string) (SaveMode, error)

func (SaveMode) GetNginxModeName

func (mode SaveMode) GetNginxModeName() string

func (SaveMode) String

func (mode SaveMode) String() string
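
The `iota` constants together with `String` and `SaveModeFromString` form a typical Go enum with a string round trip. The sketch below shows that pattern on a local `saveMode` type. The string names used here simply mirror the constant names and are an assumption; the exact strings that Frames accepts and returns are not stated in this document.

```go
package main

import "fmt"

// saveMode is a local stand-in for this example.
type saveMode int

const (
	errorIfTableExists saveMode = iota
	overwriteTable
	updateItem
	overwriteItem
	createNewItemsOnly
)

// modeNames is indexed by saveMode value. The names are assumptions that
// mirror the constant names.
var modeNames = []string{
	"errorIfTableExists",
	"overwriteTable",
	"updateItem",
	"overwriteItem",
	"createNewItemsOnly",
}

// String returns the mode name, or a numeric placeholder for unknown values.
func (m saveMode) String() string {
	if m < 0 || int(m) >= len(modeNames) {
		return fmt.Sprintf("saveMode(%d)", int(m))
	}
	return modeNames[m]
}

// saveModeFromString is the inverse of String for known names.
func saveModeFromString(s string) (saveMode, error) {
	for i, name := range modeNames {
		if name == s {
			return saveMode(i), nil
		}
	}
	return 0, fmt.Errorf("unknown save mode %q", s)
}

func main() {
	mode, err := saveModeFromString("updateItem")
	fmt.Println(mode, err)

	_, err = saveModeFromString("bogus")
	fmt.Println(err)
}
```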

type SchemaField

type SchemaField = pb.SchemaField

SchemaField represents a schema field for Avro record.

type SchemaKey

type SchemaKey = pb.SchemaKey

SchemaKey is a schema key

type SecretString

type SecretString struct {
	// contains filtered or unexported fields
}

SecretString hides a string, such as a password, from both plain and JSON logs.

func InitSecretString

func InitSecretString(s string) SecretString

func (SecretString) Get

func (s SecretString) Get() string
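
The fields of `SecretString` are unexported and not shown here, so the following is only one possible way to get the described behavior, not the actual implementation. In this sketch the value sits behind an unexported pointer field, which `encoding/json` skips, and a `String` method (an addition made for this example) masks the value when it is formatted with `fmt`.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// secretString is a local stand-in written for this example.
type secretString struct {
	s *string // unexported, so encoding/json does not serialize it
}

func initSecretString(s string) secretString { return secretString{s: &s} }

// Get returns the underlying value for code that really needs it.
func (s secretString) Get() string {
	if s.s == nil {
		return ""
	}
	return *s.s
}

// String masks the value when it is printed. This method is an assumption
// of the sketch.
func (s secretString) String() string { return "****" }

// request shows a secret embedded in a larger struct that might be logged.
type request struct {
	User     string
	Password secretString
}

// render formats the request the way a plain-text log line might.
func render(r request) string { return fmt.Sprintf("%v", r) }

// toJSON formats the request the way a JSON log line might.
func toJSON(r request) (string, error) {
	data, err := json.Marshal(r)
	return string(data), err
}

// leaks reports whether the output contains the secret.
func leaks(output, secret string) bool { return strings.Contains(output, secret) }

func main() {
	req := request{User: "iguazio", Password: initSecretString("t0ps3cret")}
	fmt.Println(render(req))
	js, _ := toJSON(req)
	fmt.Println(js)
	fmt.Println(len(req.Password.Get()))
}
```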

type Server

type Server interface {
	Start() error
	State() ServerState
	Err() error
}

Server is the Frames server interface

type ServerBase

type ServerBase struct {
	// contains filtered or unexported fields
}

ServerBase has common functionality for servers

func NewServerBase

func NewServerBase() *ServerBase

NewServerBase returns a new server base

func (*ServerBase) Err

func (s *ServerBase) Err() error

Err returns the server error

func (*ServerBase) SetError

func (s *ServerBase) SetError(err error)

SetError sets current error and will change state to ErrorState

func (*ServerBase) SetState

func (s *ServerBase) SetState(state ServerState)

SetState sets the server state

func (*ServerBase) State

func (s *ServerBase) State() ServerState

State returns the server state

type ServerState

type ServerState string

ServerState is the state of a server

const (
	ReadyState   ServerState = "ready"
	RunningState ServerState = "running"
	ErrorState   ServerState = "error"
)

Possible server states
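
The documented contract of `SetError` (it stores the error and moves the server to `ErrorState`) can be sketched as a small state holder. The version below is a local reimplementation for illustration only: the mutex, the initial "ready" state, and the `errListen` value are assumptions, since the internals of `ServerBase` are not shown in this document.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// serverState mirrors the documented state values.
type serverState string

const (
	readyState   serverState = "ready"
	runningState serverState = "running"
	errorState   serverState = "error"
)

// errListen is a sample error used by this example.
var errListen = errors.New("listener failed")

// serverBase is a local stand-in. Guarding the fields with a mutex is an
// assumption of this sketch.
type serverBase struct {
	mu    sync.RWMutex
	state serverState
	err   error
}

// newServerBase starts in the ready state (an assumption of this sketch).
func newServerBase() *serverBase { return &serverBase{state: readyState} }

func (s *serverBase) State() serverState {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.state
}

func (s *serverBase) SetState(state serverState) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.state = state
}

func (s *serverBase) Err() error {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.err
}

// SetError records the error and switches the state to errorState.
func (s *serverBase) SetError(err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.err = err
	s.state = errorState
}

func main() {
	srv := newServerBase()
	srv.SetState(runningState)
	fmt.Println(srv.State(), srv.Err())

	srv.SetError(errListen)
	fmt.Println(srv.State(), srv.Err())
}
```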

type Session

type Session = pb.Session

Session information

func InitSessionDefaults

func InitSessionDefaults(session *Session, framesConfig *Config) *Session

InitSessionDefaults initializes session defaults

func NewSession

func NewSession(url, container, path, user, password, token, id string) (*Session, error)

NewSession will create a new session. It will populate missing values from the V3IO_SESSION environment variable (JSON encoded)

type TableSchema

type TableSchema = pb.TableSchema

TableSchema is a table schema

type VersionRequest added in v0.8.6

type VersionRequest struct {
	Proto    *pb.VersionRequest
	Password SecretString
	Token    SecretString
}

type WriteRequest

type WriteRequest struct {
	Session  *Session
	Password SecretString
	Token    SecretString
	Backend  string // backend name
	Table    string // Table name (path)
	// Data message sent with the write request (in case of a stream multiple messages can follow)
	ImmidiateData Frame
	// Expression template, for update expressions generated from combining columns data with expression
	Expression string
	// Condition template, for update conditions generated from combining columns data with expression
	Condition string
	// Columns to partition the data by
	PartitionKeys []string
	// Will we get more message chunks (in a stream), if not we can complete
	HaveMore bool
	SaveMode SaveMode
}

WriteRequest is a request for writing data. TODO: Unite with protobuf (currently the protobuf message combines both this and a frame message)

func (WriteRequest) ToMap added in v0.7.6

func (writeRequest WriteRequest) ToMap() map[string]string

Directories

Path Synopsis
csv
kv
cmd
