Package transport v1.3.19
Warning

This package is not in the latest version of its module.

Published: Jun 6, 2023; License: MIT; Imports: 30; Imported by: 0

README

Package transport provides streaming object-based transport over HTTP for massive intra-AIS data transfers. AIStore utilizes this package for cluster-wide (aka "global") rebalancing, distributed merge-sort, and more.

Build

The package includes build-time support for two alternative HTTP clients, summarized below:

Client     Reference                     Build Tag   Default
net/http   golang.org/pkg/net/http       nethttp     no
fasthttp   github.com/valyala/fasthttp   n/a         yes

To test with net/http, run:

$ go test -v -tags=nethttp

or, the same, with logs redirected to STDERR:

$ go test -v -logtostderr=true -tags=nethttp

For more examples, see the Testing section below.

Description

A stream (or, more exactly, a transport.Stream) asynchronously transfers objects between two HTTP endpoints. The objects, in turn, are defined by their headers (transport.Header) and their readers (io.ReadCloser).

A stream preserves ordering. Objects posted for sending are completed in exactly the order in which they were posted (completions are covered below), and they are delivered to the receiver in that same order.

Stream
  • Description: A point-to-point flow over HTTP in which a single HTTP request (and, therefore, a single TCP session) transfers multiple objects.
  • Example: transport.NewStream(client, "http://example.com", nil) creates a stream between the local client and the example.com host.

Object
  • Description: Any io.ReadCloser accompanied by a transport header that specifies, in part, the object's size and the object's (bucket, name) at the destination.
  • Example: transport.Header{"abc", "X", nil, 1024*1024} specifies a 1MB object that will be named abc/X at the destination.

Object Attributes
  • Description: Objects usually have attributes such as size, access time, checksum, and version. When an object is sent, these attributes often need to be sent with it so the receiver can update the object's metadata.
  • Example: transport.ObjectAttrs{Atime: time.Now(), Size: 13, CksumType: "xxhash", Chksum: "s0m3ck5um", Version: "2"}

Object Header
  • Description: A transport.Header structure. Besides the bucket name, object name, and object size, it carries an arbitrary (opaque) sequence of bytes, for instance a JSON message.
  • Example: transport.Header{"abracadabra", "p/q/s", false, []byte{'1', '2', '3'}, transport.ObjectAttrs{Size: 13}} describes a 13-byte object whose header carries a non-nil, application-specific opaque field.

Receive callback
  • Description: A function with the signature Receive func(http.ResponseWriter, transport.Header, io.Reader). The receive callback must be registered before the very first object is transferred over the stream (see the next entry). Note the last parameter, io.Reader. Behind this reading interface is a special reader type that, among other things, respects object boundaries. Each callback invocation therefore corresponds to exactly one transferred and received object. The object header is delivered to the receiving endpoint through the same callback.

Registering receive callback
  • Description: An API that establishes the one-to-one correspondence between the stream sender and the stream receiver.
  • Example: To register the same receive callback foo with two HTTP endpoints named "ep1" and "ep2", call transport.Register("n1", "ep1", foo) and transport.Register("n1", "ep2", foo). Here n1 is an HTTP request multiplexer ("muxer") that corresponds to one of the documented networking options (see README, section Networking). The transport then calls foo() separately for each endpoint: the "ep1" stream goes to the "ep1" endpoint, and the "ep2" stream goes to "ep2". Per-endpoint callbacks are also supported. To register an endpoint with a different HTTP request multiplexer, pass a different network name: transport.Register("different-network", "ep1", foo).

Object-has-been-sent callback (not to be confused with the Receive callback above)
  • Description: A function or method with the signature SendCallback func(Header, io.ReadCloser, error). Header and io.ReadCloser describe the object that was transmitted, and error is the send error, or nil on success.
  • Example: This callback can optionally be defined a) per stream (via the NewStream constructor), b) per object being sent (for instance, to support batch semantics), or both. The object callback overrides the per-stream one: when the object callback is defined (i.e., non-nil), the stream callback is skipped.
  • BEWARE: The latency of this callback adds to the latency of the entire stream operation on the send side. User implementations must therefore avoid taking extra locks and making system calls, and should return as soon as possible.

Header-only objects
  • Description: Header-only (data-less) objects are supported. When there is no data to send (that is, when transport.Header.Dsize is zero), no reader (io.ReadCloser) is required, and the corresponding argument to Send() can be nil.
  • Example: Header-only objects can implement an L6 control plane over streams, where the header's Opaque field carries the entire payload of the control message.

Stream bundle
  • Description: A higher-level (cluster-level) API that aggregates multiple streams. It broadcasts object replicas to all or some of the established cluster nodes, aggregates completions, and preserves FIFO ordering.
  • Example: transport.NewStreamBundle(smap, si, client, &transport.SBArgs{Network: cmn.NetworkPublic, Trname: "path-name", Extra: &extra, Ntype: cluster.Targets, ManualResync: false, Multiplier: 4})

Closing and completions

In streams, the sending pipeline is a pair (SQ, SCQ). SQ is the send queue, implemented as a Go channel. SCQ is the send completion queue, implemented as a second Go channel. Together, SQ and SCQ form a FIFO for the ordering of transmitted objects.

Once an object is put on the wire, its completion is queued and eventually processed by the completion handler. Handling a completion always includes closing the object reader.

To reiterate: the object reader is always closed by the code that handles send completions. When a callback (SendCallback) is provided (i.e., non-nil), the reader is closed right after the callback returns.

Note as well that for every transmission of every object there's always a completion. This holds true in all cases including network errors that may cause sudden and instant termination of the underlying stream(s).

Commented example

path, err := transport.Register("n1", "ep1", testReceive) // register the receive callback with HTTP endpoint "ep1" on the "n1" network
if err != nil {
	return err
}
client := &http.Client{Transport: &http.Transport{}} // create a default HTTP client
url := "http://example.com/" + path                  // combine the hostname with the path returned by Register() above

// open a stream (to the HTTP endpoint identified by the url) with a burst of 10 and the ability to cancel at any time
// ("burst" is the number of objects the caller may post for sending without experiencing any back-pressure)
ctx, cancel := context.WithCancel(context.Background())
stream := transport.NewStream(client, url, &transport.Extra{Burst: 10, Ctx: ctx})

// NOTE: constructing a transport stream does not necessarily establish a TCP connection.
// The connection is established only when the very first object is posted for sending.
// The underlying HTTP/TCP session also terminates after a (configurable) period of inactivity
// (`Extra.IdleTimeout`) and is re-established when (and if) traffic picks up again.

for {
	hdr := transport.Header{...} // next object header
	object := ...                // next object reader, e.g. os.Open("some file")
	// send the object asynchronously (the 3rd arg is an optional "object-has-been-sent" callback)
	stream.Send(hdr, object, nil)
	...
}
stream.Fin() // gracefully close the stream (always, except after the stream has been canceled via cancel())

Registering HTTP endpoint

On the receiving side, each network contains multiple HTTP endpoints, whereby each HTTP endpoint, in turn, may have zero or more stream sessions. In effect, there are two nested many-to-many relationships whereby you may have multiple logical networks, each containing multiple named transports, etc.

The following:

path, err := transport.Register("public", "myapp", mycallback)

adds a transport endpoint named "myapp" to the "public" network (that must already exist), and then registers a user callback with the latter.

The last argument, user-defined callback, must have the following typedef:

Receive func(w http.ResponseWriter, hdr Header, object io.Reader, err error)

The callback is invoked once per received object. A single stream may transfer any number of objects, potentially an unlimited number. The callback is also always invoked when an error occurs.

Back to the registration. On the HTTP receiving side, the call to Register translates as:

mux.HandleFunc(path, mycallback)

where mux is a mux.ServeMux (a fork of the net/http ServeMux) that corresponds to the named network ("public", in this example), and path is a URL path ending with "/myapp".

On the wire

On the wire, each transmitted object will have the layout:

[header length] [header fields including object name and size] [object bytes]

The object size must be known upfront; this is a current limitation.

A stream (the Stream type) carries a sequence of objects of arbitrary sizes and contents, and overall looks as follows:

object1 = ([header1], [data1]), object2 = ([header2], [data2]), etc.

Stream termination is denoted by a special marker in the data-size field of the header:

header = [object size=7fffffffffffffff]

Transport statistics

The API that queries runtime statistics includes:

  • on the send side: func (s *Stream) GetStats() (stats Stats)
  • on the receive side: func GetNetworkStats(network string) (netstats map[string]EndpointStats, err error)

Statistics themselves include the following metrics:

Stats struct {
	Num     int64   // number of transferred objects
	Size    int64   // transferred size, in bytes
	Offset  int64   // stream offset, in bytes
	IdleDur int64   // the time stream was idle since the previous GetStats call
	TotlDur int64   // total time since the previous GetStats
	IdlePct float64 // idle time %
}

On the receive side, the EndpointStats map contains all the transport.Stats structures indexed by (unique) stream IDs for the currently active streams.
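Here is a sketch of how a receiver might aggregate the per-session entries of an EndpointStats map. The Stats type is a local copy that holds only a subset of the counters. The IdlePct formula (100 * IdleDur / TotlDur) is an assumption inferred from the field comments, not taken from the source.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Stats is a local copy of a subset of the transport stream counters (illustration only).
type Stats struct {
	Num    atomic.Int64 // number of transferred objects
	Size   atomic.Int64 // transferred size, in bytes
	Offset atomic.Int64 // stream offset, in bytes
}

// EndpointStats indexes per-session Stats by (unique) session ID.
type EndpointStats map[uint64]*Stats

// totals sums the counters of all currently active sessions of one endpoint.
func totals(eps EndpointStats) (num, size int64) {
	for _, s := range eps {
		num += s.Num.Load()
		size += s.Size.Load()
	}
	return num, size
}

// idlePct assumes IdlePct = 100 * IdleDur / TotlDur (0 when no time has elapsed).
func idlePct(idleDur, totlDur int64) float64 {
	if totlDur <= 0 {
		return 0
	}
	return 100 * float64(idleDur) / float64(totlDur)
}

func main() {
	eps := EndpointStats{1: &Stats{}, 2: &Stats{}}
	eps[1].Num.Add(3)
	eps[1].Size.Add(3000)
	eps[2].Num.Add(1)
	eps[2].Size.Add(500)
	num, size := totals(eps)
	fmt.Println(num, size, idlePct(250, 1000))
}
```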

For usage examples and details, please see tests in the package directory.

Stream Bundle

Stream bundle (transport.StreamBundle) in this package is motivated by the need to broadcast and multicast continuously over a set of long-lived TCP sessions. The scenarios in storage clustering include intra-cluster replication and erasure coding, rebalancing (upon target-added and target-removed events) and MapReduce-generated flows, and more.

In each specific case, a given clustered node needs to maintain control and/or data flows between itself and multiple other clustered nodes, and each flow may transfer large numbers of control and data objects, or parts thereof.

The provided implementation aggregates transport streams. A stream (or, a transport.Stream) asynchronously transfers objects between two HTTP endpoints, whereby an object is defined as a combination of transport.Header and an (io.ReadCloser) interface. The latter may have a variety of well-known implementations: file, byte array, scatter-gather list of buffers, etc.

The important distinction, though, is that while transport streams are devoid of any clustering "awareness", a stream bundle is fully integrated with a cluster. Internally, the implementation utilizes cluster-level abstractions, such as a node (cluster.Snode), a cluster map (cluster.Smap), and more.

The provided API includes a StreamBundle constructor that establishes, in one shot, streams between the local node and (a) all storage targets, (b) all gateways, or (c) all nodes in the cluster:

type SBArgs struct {
	Network      string // network, one of `cmn.KnownNetworks`
	Trname       string // transport endpoint name
	Extra        *Extra // additional stream control parameters
	Ntype        int    // destination type: all targets, ..., all nodes
	ManualResync bool   // if false, establishes/removes connections with new/old nodes when a new Smap is received
	Multiplier   int    // number of streams per destination, with subsequent round-robin selection
}

NewStreamBundle(
	sowner cluster.Sowner, // Smap (cluster map) owner interface
	lsnode *cluster.Snode, // local node
	cl *http.Client,       // http client
	sbArgs *SBArgs,        // additional stream bundle arguments
)
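The Multiplier field implies several streams per destination, with round-robin selection among them. Below is a minimal sketch of that selection. The type robin and the functions pick and newBundle are hypothetical, and streams are represented by string labels instead of real stream objects.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// robin holds the Multiplier streams opened to one destination and picks
// among them in round-robin order. Streams are plain labels here; in a real
// bundle they would be stream objects.
type robin struct {
	streams []string
	next    atomic.Uint64
}

// pick returns the next stream; safe for concurrent callers.
func (r *robin) pick() string {
	i := r.next.Add(1) - 1
	return r.streams[i%uint64(len(r.streams))]
}

// newBundle opens `multiplier` streams to each destination node ID.
func newBundle(dests []string, multiplier int) map[string]*robin {
	b := make(map[string]*robin, len(dests))
	for _, d := range dests {
		r := &robin{}
		for i := 0; i < multiplier; i++ {
			r.streams = append(r.streams, fmt.Sprintf("%s#%d", d, i))
		}
		b[d] = r
	}
	return b
}

func main() {
	b := newBundle([]string{"t1", "t2"}, 3)
	for i := 0; i < 4; i++ {
		fmt.Println(b["t1"].pick()) // t1#0, t1#1, t1#2, t1#0
	}
}
```

An atomic counter per destination keeps selection lock-free. Because each destination has its own counter, traffic to one node does not skew the rotation for another.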

A note on connection establishment and termination

  • For each of the individual transport streams in a bundle, constructing a stream (transport.Stream) does not necessarily entail establishing TCP connection. Actual connection establishment is delayed until arrival (via Send or SendV) of the very first object.
  • The underlying HTTP/TCP session will also terminate after a (configurable) period of inactivity, only to be re-established when (and if) the traffic picks up again.

API

The two main API methods are Send and SendV:

  • to broadcast via all established streams, use SendV() and omit the last argument;
  • otherwise, use SendV() with the destinations specified as a comma-separated list, or
  • use Send() with a list of nodes on the receive side.

Other APIs include Close, which terminates all contained streams either gracefully or instantaneously, and more.

Finally, there are two important facts to remember:

  • When streaming an object to multiple destinations, StreamBundle may call reader.Open() multiple times as well. For N object replicas (or N identical notifications) over N streams, the original reader (provided via Send or SendV - see above) will get reopened (N-1) times.

  • The completion callback (transport.SendCallback), if provided, is called only once per object, no matter how many replicas of the object are sent to multiple destinations. The callback is invoked by the completion handler of the very last object replica (for more on completion handling, see Closing and completions above).

Testing

  • Run all tests while redirecting glog to STDERR:
      $ go test -v -logtostderr=true
  • Run tests matching "Multi" with debug-enabled assertions and glog level=1 (non-verbose):
      $ AIS_DEBUG=transport=1 go test -v -run=Multi -tags=debug
  • Same as above, with super-verbose glog:
      $ AIS_DEBUG=transport=4 go test -v -run=Multi -tags=debug -logtostderr=true
  • Use the nethttp build tag to run with net/http, e.g.:
      $ go test -v -tags=nethttp
  • The same with fasthttp (the current default):
      $ go test -v

For more examples, please see tests in the package directory.

Environment

Environment Variable   Description
AIS_DEBUG              Enable inline assertions and verbose tracing (e.g., AIS_DEBUG=transport=1)
AIS_STREAM_BURST_NUM   Maximum number of objects the caller may post for sending without experiencing any back-pressure
AIS_STREAM_DRY_RUN     If enabled, read and immediately discard all data (useful for evaluating client-side throughput)

Documentation

Overview

Package transport provides streaming object-based transport over HTTP for continuous intra-cluster communications (see README for details and usage example).

  • Copyright (c) 2018-2022, NVIDIA CORPORATION. All rights reserved.

Example (Headers)
package main

import (
	"encoding/binary"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"sync"

	"github.com/artashesbalabekyan/aistore/api/apc"
	"github.com/artashesbalabekyan/aistore/cmn"
	"github.com/artashesbalabekyan/aistore/cmn/cos"
	"github.com/artashesbalabekyan/aistore/memsys"
	"github.com/artashesbalabekyan/aistore/transport"
)

const (
	lorem = `Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut
labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.`
	duis = `Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur.
Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.`
	et = `Et harum quidem rerum facilis est et expedita distinctio. Nam libero tempore, cum soluta nobis est
eligendi optio, cumque nihil impedit, quo minus id, quod maxime placeat, facere possimus, omnis voluptas assumenda est, omnis dolor repellendus.`
	temporibus = `Temporibus autem quibusdam et aut officiis debitis aut rerum necessitatibus saepe eveniet,
ut et voluptates repudiandae sint et molestiae non-recusandae.`
)

func main() {
	f := func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil {
			panic(err)
		}
		if len(body) == 0 {
			return
		}
		var (
			hdr       transport.ObjHdr
			hlen, off int
		)
		for {
			hlen = int(binary.BigEndian.Uint64(body[off:]))
			off += 16 // hlen and hlen-checksum
			hdr = transport.ExtObjHeader(body[off:], hlen)

			if transport.ReservedOpcode(hdr.Opcode) {
				break
			}

			fmt.Printf("%+v (%d)\n", hdr, hlen)
			off += hlen + int(hdr.ObjAttrs.Size)
		}
	}

	ts := httptest.NewServer(http.HandlerFunc(f))
	defer ts.Close()

	httpclient := transport.NewIntraDataClient()
	stream := transport.NewObjStream(httpclient, ts.URL, cos.GenTie(), nil)

	sendText(stream, lorem, duis)
	stream.Fin()

}

func sendText(stream *transport.Stream, txt1, txt2 string) {
	var wg sync.WaitGroup
	cb := func(transport.ObjHdr, io.ReadCloser, any, error) {
		wg.Done()
	}
	sgl1 := memsys.PageMM().NewSGL(0)
	sgl1.Write([]byte(txt1))
	hdr := transport.ObjHdr{
		Bck: cmn.Bck{
			Name:     "abc",
			Provider: apc.AWS,
			Ns:       cmn.Ns{UUID: "uuid", Name: "namespace"},
		},
		ObjName: "X",
		ObjAttrs: cmn.ObjAttrs{
			Size:  sgl1.Size(),
			Atime: 663346294,
			Cksum: cos.NewCksum(cos.ChecksumXXHash, "h1"),
			Ver:   "1",
		},
		Opaque: nil,
	}
	wg.Add(1)
	stream.Send(&transport.Obj{Hdr: hdr, Reader: sgl1, Callback: cb})
	wg.Wait()

	sgl2 := memsys.PageMM().NewSGL(0)
	sgl2.Write([]byte(txt2))
	hdr = transport.ObjHdr{
		Bck: cmn.Bck{
			Name:     "abracadabra",
			Provider: apc.AIS,
			Ns:       cmn.NsGlobal,
		},
		ObjName: "p/q/s",
		ObjAttrs: cmn.ObjAttrs{
			Size:  sgl2.Size(),
			Atime: 663346294,
			Cksum: cos.NewCksum(cos.ChecksumXXHash, "h2"),
			Ver:   "222222222222222222222222",
		},
		Opaque: []byte{'1', '2', '3'},
	}
	hdr.ObjAttrs.SetCustomMD(cos.StrKVs{"xx": "11", "yy": "22"})
	wg.Add(1)
	stream.Send(&transport.Obj{Hdr: hdr, Reader: sgl2, Callback: cb})
	wg.Wait()
}
Output:

{Bck:s3://@uuid#namespace/abc ObjName:X SID: Opaque:[] ObjAttrs:{Cksum:xxhash[h1] CustomMD:map[] Ver:1 Atime:663346294 Size:231} Opcode:0} (69)
{Bck:ais://abracadabra ObjName:p/q/s SID: Opaque:[49 50 51] ObjAttrs:{Cksum:xxhash[h2] CustomMD:map[xx:11 yy:22] Ver:222222222222222222222222 Atime:663346294 Size:213} Opcode:0} (110)
Example (Msg)
receive := func(msg transport.Msg, err error) error {
	if !transport.ReservedOpcode(msg.Opcode) {
		fmt.Printf("%s...\n", string(msg.Body[:16]))
	}
	return nil
}

ts := httptest.NewServer(msgmux)
defer ts.Close()

trname := "dummy-msg"
err := transport.HandleMsgStream(trname, receive)
if err != nil {
	fmt.Println(err)
	return
}
httpclient := transport.NewIntraDataClient()
url := ts.URL + transport.MsgURLPath(trname)
stream := transport.NewMsgStream(httpclient, url, cos.GenTie())

stream.Send(&transport.Msg{Body: []byte(lorem)})
stream.Send(&transport.Msg{Body: []byte(duis)})
stream.Send(&transport.Msg{Body: []byte(et)})
stream.Send(&transport.Msg{Body: []byte(temporibus)})

stream.Fin()
Output:

Lorem ipsum dolo...
Duis aute irure ...
Et harum quidem ...
Temporibus autem...
Example (Obj)
package main

import (
	"fmt"
	"io"
	"net/http/httptest"
	"sync"
	"time"

	"github.com/artashesbalabekyan/aistore/3rdparty/golang/mux"
	"github.com/artashesbalabekyan/aistore/api/apc"
	"github.com/artashesbalabekyan/aistore/cmn"
	"github.com/artashesbalabekyan/aistore/cmn/cos"
	"github.com/artashesbalabekyan/aistore/memsys"
	"github.com/artashesbalabekyan/aistore/transport"
)

const (
	lorem = `Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut
labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.`
	duis = `Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur.
Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.`
	et = `Et harum quidem rerum facilis est et expedita distinctio. Nam libero tempore, cum soluta nobis est
eligendi optio, cumque nihil impedit, quo minus id, quod maxime placeat, facere possimus, omnis voluptas assumenda est, omnis dolor repellendus.`
	temporibus = `Temporibus autem quibusdam et aut officiis debitis aut rerum necessitatibus saepe eveniet,
ut et voluptates repudiandae sint et molestiae non-recusandae.`
)

var objmux *mux.ServeMux

func sendText(stream *transport.Stream, txt1, txt2 string) {
	var wg sync.WaitGroup
	cb := func(transport.ObjHdr, io.ReadCloser, any, error) {
		wg.Done()
	}
	sgl1 := memsys.PageMM().NewSGL(0)
	sgl1.Write([]byte(txt1))
	hdr := transport.ObjHdr{
		Bck: cmn.Bck{
			Name:     "abc",
			Provider: apc.AWS,
			Ns:       cmn.Ns{UUID: "uuid", Name: "namespace"},
		},
		ObjName: "X",
		ObjAttrs: cmn.ObjAttrs{
			Size:  sgl1.Size(),
			Atime: 663346294,
			Cksum: cos.NewCksum(cos.ChecksumXXHash, "h1"),
			Ver:   "1",
		},
		Opaque: nil,
	}
	wg.Add(1)
	stream.Send(&transport.Obj{Hdr: hdr, Reader: sgl1, Callback: cb})
	wg.Wait()

	sgl2 := memsys.PageMM().NewSGL(0)
	sgl2.Write([]byte(txt2))
	hdr = transport.ObjHdr{
		Bck: cmn.Bck{
			Name:     "abracadabra",
			Provider: apc.AIS,
			Ns:       cmn.NsGlobal,
		},
		ObjName: "p/q/s",
		ObjAttrs: cmn.ObjAttrs{
			Size:  sgl2.Size(),
			Atime: 663346294,
			Cksum: cos.NewCksum(cos.ChecksumXXHash, "h2"),
			Ver:   "222222222222222222222222",
		},
		Opaque: []byte{'1', '2', '3'},
	}
	hdr.ObjAttrs.SetCustomMD(cos.StrKVs{"xx": "11", "yy": "22"})
	wg.Add(1)
	stream.Send(&transport.Obj{Hdr: hdr, Reader: sgl2, Callback: cb})
	wg.Wait()
}

func main() {
	receive := func(hdr transport.ObjHdr, objReader io.Reader, err error) error {
		cos.Assert(err == nil)
		object, err := io.ReadAll(objReader)
		if err != nil {
			panic(err)
		}
		if int64(len(object)) != hdr.ObjAttrs.Size {
			panic(fmt.Sprintf("size %d != %d", len(object), hdr.ObjAttrs.Size))
		}
		fmt.Printf("%s...\n", string(object[:16]))
		return nil
	}
	ts := httptest.NewServer(objmux)
	defer ts.Close()
	trname := "dummy-obj"
	err := transport.HandleObjStream(trname, receive)
	if err != nil {
		fmt.Println(err)
		return
	}
	httpclient := transport.NewIntraDataClient()
	stream := transport.NewObjStream(httpclient, ts.URL+transport.ObjURLPath(trname), cos.GenTie(), nil)
	sendText(stream, lorem, duis)
	sendText(stream, et, temporibus)
	stream.Fin()

}
Output:

Lorem ipsum dolo...
Duis aute irure ...
Et harum quidem ...
Temporibus autem...


Constants

const (
	OutObjCount = "stream.out.n"
	OutObjSize  = "stream.out.size"
	InObjCount  = "stream.in.n"
	InObjSize   = "stream.in.size"
)
const (
	SizeUnknown = -1
)

Variables

This section is empty.

Functions

func DrainAndFreeReader

func DrainAndFreeReader(r io.Reader)

DrainAndFreeReader: 1) reads and discards all the data from `r` - the `objReader`; 2) frees this objReader back to the `recvPool`. As such, this function is intended for usage only and exclusively by `transport.RecvObj` implementations.

func FreeRecv

func FreeRecv(object io.Reader)

func GetStats

func GetStats() (netstats map[string]EndpointStats, err error)

func HandleMsgStream

func HandleMsgStream(trname string, rxMsg RecvMsg) error

func HandleObjStream

func HandleObjStream(trname string, rxObj RecvObj) error

func IsErrDuplicateTrname

func IsErrDuplicateTrname(e error) bool

func MsgURLPath

func MsgURLPath(trname string) string

func ObjURLPath

func ObjURLPath(trname string) string

func ReservedOpcode

func ReservedOpcode(opc int) bool

func RxAnyStream

func RxAnyStream(w http.ResponseWriter, r *http.Request)

main Rx objects

func UID2SessID

func UID2SessID(uid uint64) (xxh, sessID uint64)

func Unhandle

func Unhandle(trname string) (err error)

Types

type Client

type Client interface {
	Do(req *fasthttp.Request, resp *fasthttp.Response) error
}

func NewIntraDataClient

func NewIntraDataClient() Client

intra-cluster networking: fasthttp client

type EndpointStats

type EndpointStats map[uint64]*Stats // all stats for a given (network, trname) endpoint indexed by session ID

type ErrDuplicateTrname

type ErrDuplicateTrname struct {
	// contains filtered or unexported fields
}

private types

func (*ErrDuplicateTrname) Error

func (e *ErrDuplicateTrname) Error() string

type Extra

type Extra struct {
	Callback     ObjSentCB     // typical usage: to free SGLs, close files, etc.
	MMSA         *memsys.MMSA  // compression-related buffering
	Config       *cmn.Config   // (to optimize-out GCO.Get())
	Compression  string        // see CompressAlways, etc. enum
	SenderID     string        // e.g., xaction ID (optional)
	IdleTeardown time.Duration // when exceeded, causes PUT to terminate (and to renew upon the very next send)
	SizePDU      int32         // NOTE: 0(zero): no PDUs; must be below maxSizePDU; unknown size _requires_ PDUs
	MaxHdrSize   int32         // overrides `dfltMaxHdr` if specified
}

advanced usage: additional stream control

func (*Extra) Compressed

func (extra *Extra) Compressed() bool

func (*Extra) UsePDU

func (extra *Extra) UsePDU() bool

type Msg

type Msg struct {
	SID    string
	Body   []byte
	Opcode int
}

func ExtMsg

func ExtMsg(body []byte, hlen int) (msg Msg)

func (*Msg) IsHeaderOnly

func (*Msg) IsHeaderOnly() bool

func (*Msg) String

func (msg *Msg) String() string

type MsgStream

type MsgStream struct {
	// contains filtered or unexported fields
}

message stream & private types

func NewMsgStream

func NewMsgStream(client Client, dstURL, dstID string) (s *MsgStream)

func (*MsgStream) Abort

func (s *MsgStream) Abort()

func (*MsgStream) Fin

func (s *MsgStream) Fin()

func (*MsgStream) GetStats

func (s *MsgStream) GetStats() (stats Stats)

func (*MsgStream) ID

func (s *MsgStream) ID() (string, int64)

func (*MsgStream) IsTerminated

func (s *MsgStream) IsTerminated() bool

func (*MsgStream) Read

func (s *MsgStream) Read(b []byte) (n int, err error)

func (*MsgStream) Send

func (s *MsgStream) Send(msg *Msg) (err error)

func (*MsgStream) Stop

func (s *MsgStream) Stop()

func (*MsgStream) String

func (s *MsgStream) String() string

func (*MsgStream) TermInfo

func (s *MsgStream) TermInfo() (reason string, err error)

func (*MsgStream) URL

func (s *MsgStream) URL() string

type Obj

type Obj struct {
	Reader   io.ReadCloser // reader (to read the object, and close when done)
	CmplArg  any           // optional context passed to the ObjSentCB callback
	Callback ObjSentCB     // called when the last byte is sent _or_ when the stream terminates (see term.reason)

	Hdr ObjHdr
	// contains filtered or unexported fields
}

object to transmit

func AllocSend

func AllocSend() (obj *Obj)

func (*Obj) IsHeaderOnly

func (obj *Obj) IsHeaderOnly() bool

func (*Obj) IsUnsized

func (obj *Obj) IsUnsized() bool

func (*Obj) SetPrc

func (obj *Obj) SetPrc(n int)

func (*Obj) Size

func (obj *Obj) Size() int64

func (*Obj) String

func (obj *Obj) String() string

type ObjHdr

type ObjHdr struct {
	Bck      cmn.Bck
	ObjName  string
	SID      string       // sender node ID
	Opaque   []byte       // custom control (optional)
	ObjAttrs cmn.ObjAttrs // attributes/metadata of the object that's being transmitted
	Opcode   int          // (see reserved range above)
}

object header

func ExtObjHeader

func ExtObjHeader(body []byte, hlen int) (hdr ObjHdr)

func (*ObjHdr) Cname

func (hdr *ObjHdr) Cname() string

func (*ObjHdr) IsHeaderOnly

func (hdr *ObjHdr) IsHeaderOnly() bool

func (*ObjHdr) IsUnsized

func (hdr *ObjHdr) IsUnsized() bool

func (*ObjHdr) ObjSize

func (hdr *ObjHdr) ObjSize() int64

type ObjSentCB

type ObjSentCB func(ObjHdr, io.ReadCloser, any, error)

An object-sent callback with this signature can optionally be defined a) per stream (via the NewObjStream constructor; see the Extra struct above), b) per object being sent (for instance, to support call-per-batch semantics), or both. The object callback "overrides" the per-stream one: when the object callback is defined (i.e., non-nil), the stream callback is skipped. NOTE: if defined, the callback executes asynchronously with respect to the sending part.
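The override rule amounts to a simple selection at completion time. Below is a minimal sketch, with sentCB and effectiveCB as hypothetical simplifications of ObjSentCB and the completion logic.

```go
package main

import "fmt"

// sentCB is a simplified stand-in for ObjSentCB: (object name, send error).
type sentCB func(name string, err error)

// effectiveCB returns the callback the completion handler would invoke:
// the per-object callback if non-nil, otherwise the per-stream one (possibly nil).
func effectiveCB(objCB, streamCB sentCB) sentCB {
	if objCB != nil {
		return objCB
	}
	return streamCB
}

func main() {
	streamCB := func(name string, err error) { fmt.Println("stream callback:", name, err) }
	objCB := func(name string, err error) { fmt.Println("object callback:", name, err) }

	effectiveCB(objCB, streamCB)("abc/X", nil) // object callback wins
	effectiveCB(nil, streamCB)("p/q/s", nil)   // falls back to the stream callback
	if cb := effectiveCB(nil, nil); cb == nil {
		fmt.Println("no callback: nothing to invoke")
	}
}
```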

type RecvMsg

type RecvMsg func(msg Msg, err error) error

type RecvObj

type RecvObj func(hdr ObjHdr, object io.Reader, err error) error

Rx callbacks

type Stats

type Stats struct {
	Num            atomic.Int64 // number of transferred objects including zero size (header-only) objects
	Size           atomic.Int64 // transferred object size (does not include transport headers)
	Offset         atomic.Int64 // stream offset, in bytes
	CompressedSize atomic.Int64 // compressed size (NOTE: converges to the actual compressed size over time)
}

stream stats

func (*Stats) CompressionRatio

func (stats *Stats) CompressionRatio() float64

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

object stream & private types

func NewObjStream

func NewObjStream(client Client, dstURL, dstID string, extra *Extra) (s *Stream)

func (*Stream) Abort

func (s *Stream) Abort()

func (*Stream) Fin

func (s *Stream) Fin()

func (*Stream) GetStats

func (s *Stream) GetStats() (stats Stats)

func (*Stream) ID

func (s *Stream) ID() (string, int64)

func (*Stream) IsTerminated

func (s *Stream) IsTerminated() bool

func (*Stream) Read

func (s *Stream) Read(b []byte) (n int, err error)

as io.Reader

func (*Stream) Send

func (s *Stream) Send(obj *Obj) (err error)

Asynchronously send an object (transport.Obj) defined by its header and its reader.

The sending pipeline is implemented as a pair (SQ, SCQ) where the former is a send queue realized as workCh, and the latter is a send completion queue (cmplCh). Together SQ and SCQ form a FIFO.

  • Header-only objects are supported. When there is no data to send (that is, when the header's Dsize field is zero), no reader is required, and the corresponding argument to Send() can be nil.
  • The object reader is *always* closed, whether Send() succeeds or fails. On success, if a send-completion (ObjSentCB) callback is provided (i.e., non-nil), doCmpl() closes the reader.
  • Optional reference counting is also done in doCmpl, so that the ObjSentCB is called only when the refcount (if provided, i.e., non-nil) reaches zero.
  • Every transmission of every object always produces a doCmpl() completion, including its refcounting and reader-closing. This holds in all cases, including network errors that suddenly terminate the underlying stream(s).

func (*Stream) Stop

func (s *Stream) Stop()

func (*Stream) String

func (s *Stream) String() string

func (*Stream) TermInfo

func (s *Stream) TermInfo() (reason string, err error)

func (*Stream) URL

func (s *Stream) URL() string

type StreamCollector

type StreamCollector struct{}

stream collector

func Init

func Init(st cos.StatsUpdater, config *cmn.Config) *StreamCollector

func (*StreamCollector) Name

func (*StreamCollector) Name() string

func (*StreamCollector) Run

func (sc *StreamCollector) Run() (err error)

func (*StreamCollector) Stop

func (sc *StreamCollector) Stop(err error)

Directories

Path Synopsis
Package bundle provides multi-streaming transport with the functionality to dynamically (un)register receive endpoints, establish long-lived flows, and more.
