transfer

package
v2.3.1 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 14, 2023 License: MIT Imports: 28 Imported by: 0

Documentation

Overview

Package transfer implements wire transfer with the datanodes.


Constants

This section is empty.

Variables

var ErrEndOfBlock = errors.New("end of block")
var ErrInvalidSeqno = errors.New("invalid ack sequence number")

Functions

This section is empty.

Types

type BlockReader

type BlockReader struct {
	// ClientName is the unique ID used by the NamenodeConnection to locate the
	// block.
	ClientName string
	// Block is the block location provided by the namenode.
	Block *hdfs.LocatedBlockProto
	// Offset is the current read offset in the block.
	Offset int64
	// UseDatanodeHostname specifies whether the datanodes should be connected to
	// via their hostnames (if true) or IP addresses (if false).
	UseDatanodeHostname bool
	// DialFunc is used to connect to the datanodes. If nil, then
	// (&net.Dialer{}).DialContext is used.
	DialFunc func(ctx context.Context, network, addr string) (net.Conn, error)
	// contains filtered or unexported fields
}

BlockReader implements io.ReadCloser, for reading a block. It abstracts over reading from multiple datanodes, in order to be robust to connection failures, timeouts, and other shenanigans.

func (*BlockReader) Close

func (br *BlockReader) Close() error

Close implements io.Closer.

func (*BlockReader) Read

func (br *BlockReader) Read(b []byte) (int, error)

Read implements io.Reader.

If a failure (such as a disconnect) occurs while reading, the BlockReader fails over to another datanode and continues reading transparently. If all the datanodes fail, the error from the most recent attempt is returned.

Any datanode failures are recorded in a global cache, so subsequent reads, even reads of different blocks, give the failed datanodes lower priority.
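The failover behavior described for Read can be sketched with plain io.Reader values. This is an illustration under stated assumptions, not the package's implementation: the source type, failoverReader, and demo are hypothetical, each source stands in for "connect to one datanode at this offset", and the global failure cache is not modeled.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
	"testing/iotest"
)

// source opens a stream positioned at offset. It stands in for connecting to
// one datanode; the type is hypothetical.
type source func(offset int64) (io.Reader, error)

// failoverReader tries each source in order and resumes at the current offset
// when one of them fails mid-stream.
type failoverReader struct {
	sources []source
	next    int       // index of the next source to try
	cur     io.Reader // currently open stream, or nil
	offset  int64     // bytes delivered to the caller so far
	lastErr error     // error from the most recent failed attempt
}

func (r *failoverReader) Read(b []byte) (int, error) {
	for {
		if r.cur == nil {
			if r.next >= len(r.sources) {
				if r.lastErr == nil {
					r.lastErr = errors.New("no sources configured")
				}
				return 0, r.lastErr // every source has failed
			}
			cur, err := r.sources[r.next](r.offset)
			r.next++
			if err != nil {
				r.lastErr = err
				continue
			}
			r.cur = cur
		}
		n, err := r.cur.Read(b)
		r.offset += int64(n)
		if err == nil || err == io.EOF {
			return n, err
		}
		// The stream failed: drop it and remember why.
		r.cur, r.lastErr = nil, err
		if n > 0 {
			return n, nil // deliver what arrived; the next call fails over
		}
	}
}

// demo reads through one flaky and one healthy source.
func demo() (string, error) {
	const data = "hello, datanode"
	// flaky is first in the list, so it is only ever opened at offset 0.
	// It serves five bytes and then reports an error.
	flaky := func(int64) (io.Reader, error) {
		broken := iotest.ErrReader(errors.New("connection reset"))
		return io.MultiReader(strings.NewReader(data[:5]), broken), nil
	}
	healthy := func(offset int64) (io.Reader, error) {
		return strings.NewReader(data[offset:]), nil
	}
	r := &failoverReader{sources: []source{flaky, healthy}}
	out, err := io.ReadAll(r)
	return string(out), err
}

func main() {
	out, err := demo()
	fmt.Printf("%q %v\n", out, err)
}
```

The caller sees one uninterrupted stream; the switch between sources happens inside Read, which is what "transparently" means in the description above.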

func (*BlockReader) SetDeadline

func (br *BlockReader) SetDeadline(t time.Time) error

SetDeadline sets the deadline for future Read calls. A zero value for t means Read will not time out.

func (*BlockReader) Skip

func (br *BlockReader) Skip(off int64) error

Skip attempts to discard bytes in the stream in order to skip forward. This is an optimization for the case where the amount to skip is very small. It returns an error if the skip was not attempted at all (because the BlockReader isn't connected, or the offset is out of bounds or too far ahead) or if the skip failed for some other reason.

type BlockWriter

type BlockWriter struct {
	// ClientName is the unique ID used by the NamenodeConnection to initialize
	// the block.
	ClientName string
	// Block is the block location provided by the namenode.
	Block *hdfs.LocatedBlockProto
	// BlockSize is the target size of the new block (or the existing one, if
	// appending). This represents the configured value, not the actual number
	// of bytes currently in the block.
	BlockSize int64
	// Offset is the current write offset in the block.
	Offset int64
	// Append indicates whether this is an append operation on an existing block.
	Append bool
	// UseDatanodeHostname indicates whether the datanodes will be connected to
	// via hostname (if true) or IP address (if false).
	UseDatanodeHostname bool
	// DialFunc is used to connect to the datanodes. If nil, then
	// (&net.Dialer{}).DialContext is used.
	DialFunc func(ctx context.Context, network, addr string) (net.Conn, error)
	// contains filtered or unexported fields
}

BlockWriter implements io.WriteCloser for writing a block to a datanode. Given a block location, it handles pipeline construction and failures, including communicating with the namenode if need be.

func (*BlockWriter) Close

func (bw *BlockWriter) Close() error

Close implements io.Closer. It flushes any unwritten packets out to the datanode, and sends a final packet indicating the end of the block. The block must still be finalized with the namenode.

func (*BlockWriter) Flush

func (bw *BlockWriter) Flush() error

Flush flushes any unwritten packets out to the datanode.
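The difference between Flush and Close can be illustrated with a buffered writer: Flush pushes out what is pending, while Close also appends a final marker. This is a sketch built on bufio, assuming a made-up endMarker in place of the real end-of-block packet; bufferedBlock and demo are hypothetical and do not reflect the wire format.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// endMarker is a hypothetical stand-in for the final end-of-block packet.
const endMarker = "<END>"

// bufferedBlock buffers writes until Flush or Close is called.
type bufferedBlock struct {
	w *bufio.Writer
}

func (b *bufferedBlock) Write(p []byte) (int, error) { return b.w.Write(p) }

// Flush pushes any buffered bytes to the underlying writer.
func (b *bufferedBlock) Flush() error { return b.w.Flush() }

// Close appends the end marker and then flushes everything.
func (b *bufferedBlock) Close() error {
	if _, err := b.w.WriteString(endMarker); err != nil {
		return err
	}
	return b.w.Flush()
}

// demo reports how many bytes reached the sink before the flush, and the sink
// contents after Flush and after Close.
func demo() (before int, afterFlush, final string, err error) {
	var sink bytes.Buffer
	bb := &bufferedBlock{w: bufio.NewWriter(&sink)}

	if _, err = bb.Write([]byte("abc")); err != nil {
		return
	}
	before = sink.Len() // still buffered, nothing in the sink yet

	if err = bb.Flush(); err != nil {
		return
	}
	afterFlush = sink.String()

	err = bb.Close()
	final = sink.String()
	return
}

func main() {
	fmt.Println(demo())
}
```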

func (*BlockWriter) SetDeadline

func (bw *BlockWriter) SetDeadline(t time.Time) error

SetDeadline sets the deadline for future Write, Flush, and Close calls. A zero value for t means those calls will not time out.

func (*BlockWriter) Write

func (bw *BlockWriter) Write(b []byte) (int, error)

Write implements io.Writer.

Unlike BlockReader, BlockWriter currently has no ability to recover from write failures (timeouts, datanode failure, etc.). Once it returns an error from Write or Close, it may be in an invalid state.

This will hopefully be fixed in a future release.
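Because a writer may be in an invalid state after its first error, callers may want a guard that stops using it from that point on. The sketch below shows one such caller-side pattern; stickyWriter and failAfter are hypothetical types and are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// stickyWriter is a hypothetical caller-side guard: after the first error from
// the wrapped writer, every later Write returns that same error without
// touching the wrapped writer again.
type stickyWriter struct {
	w   io.Writer
	err error
}

func (s *stickyWriter) Write(p []byte) (int, error) {
	if s.err != nil {
		return 0, s.err
	}
	n, err := s.w.Write(p)
	if err != nil {
		s.err = err
	}
	return n, err
}

// failAfter accepts a fixed number of writes, then fails. It counts how many
// times it was called so the guard's effect is visible.
type failAfter struct {
	remaining int
	calls     int
}

func (f *failAfter) Write(p []byte) (int, error) {
	f.calls++
	if f.remaining == 0 {
		return 0, errors.New("datanode unavailable")
	}
	f.remaining--
	return len(p), nil
}

// demo writes three times and reports how often the inner writer was called.
func demo() int {
	inner := &failAfter{remaining: 1}
	sw := &stickyWriter{w: inner}
	for i := 0; i < 3; i++ {
		sw.Write([]byte("data")) // the second call fails, the third is blocked
	}
	return inner.calls
}

func main() {
	fmt.Println("inner writer calls:", demo())
}
```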

type ChecksumReader

type ChecksumReader struct {
	// Block is the block location provided by the namenode.
	Block *hdfs.LocatedBlockProto
	// UseDatanodeHostname specifies whether the datanodes should be connected to
	// via their hostnames (if true) or IP addresses (if false).
	UseDatanodeHostname bool
	// DialFunc is used to connect to the datanodes. If nil, then
	// (&net.Dialer{}).DialContext is used.
	DialFunc func(ctx context.Context, network, addr string) (net.Conn, error)
	// contains filtered or unexported fields
}

ChecksumReader provides an interface for reading the "MD5CRC32" checksums of individual blocks. It abstracts over reading from multiple datanodes, in order to be robust to failures.

func (*ChecksumReader) ReadChecksum

func (cr *ChecksumReader) ReadChecksum() ([]byte, error)

ReadChecksum returns the checksum of the block.
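Since ReadChecksum returns raw bytes, a typical use is comparing two checksums for equality. The sketch below assumes only the documented signature; the checksummer interface, the fixedChecksum fake, and the byte values are hypothetical, and no real checksum is computed.

```go
package main

import (
	"bytes"
	"encoding/hex"
	"fmt"
)

// checksummer matches the documented ReadChecksum signature.
type checksummer interface {
	ReadChecksum() ([]byte, error)
}

// fixedChecksum is a fake that returns a preset value.
type fixedChecksum []byte

func (f fixedChecksum) ReadChecksum() ([]byte, error) { return []byte(f), nil }

// sameChecksum reads both checksums and reports whether they are byte-equal.
func sameChecksum(a, b checksummer) (bool, error) {
	sumA, err := a.ReadChecksum()
	if err != nil {
		return false, err
	}
	sumB, err := b.ReadChecksum()
	if err != nil {
		return false, err
	}
	return bytes.Equal(sumA, sumB), nil
}

func main() {
	first := fixedChecksum{0xde, 0xad, 0xbe, 0xef}
	second := fixedChecksum{0xde, 0xad, 0xbe, 0xef}

	same, err := sameChecksum(first, second)
	fmt.Println(hex.EncodeToString(first), same, err)
}
```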

func (*ChecksumReader) SetDeadline

func (cr *ChecksumReader) SetDeadline(t time.Time) error

SetDeadline sets the deadline for future ReadChecksum calls. A zero value for t means ReadChecksum will not time out.

type SaslDialer

type SaslDialer struct {
	DialFunc                  func(ctx context.Context, network, addr string) (net.Conn, error)
	Key                       *hdfs.DataEncryptionKeyProto
	Token                     *hadoop.TokenProto
	EnforceQop                string
	SkipSaslOnPrivilegedPorts bool
}

SaslDialer dials using the underlying DialFunc, then negotiates authentication with the datanode. The resulting Conn implements whatever data protection level is specified by the server, whether it be wire encryption or integrity checks.

func (*SaslDialer) DialContext

func (d *SaslDialer) DialContext(ctx context.Context, network, addr string) (net.Conn, error)
