rosbag

package module
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2024 License: MIT Imports: 13 Imported by: 3

README

go-rosbag

Utilities for working with ROS bag data in go.

rosbag library

The rosbag library contains utilities for reading and writing bag files. The primary focus is on the bag container format, rather than the ROS message serialization format - although there is a ros1msg parser.

Writing a bag file

Write a bag file like this:


f, err := os.Open("my-bag.bag")
if err != nil {
	panic(err)
}

writer, err := rosbag.NewWriter(f)
if err != nil {
	panic(err)
}

err = writer.WriteConnection(&rosbag.Connection{
	Conn: 1,
	Topic: "/foo",
	Data: rosbag.ConnectionHeader{
		Topic: "/foo",
		Type: "Foo",
		MD5Sum: "abc",
		MessageDefinition: []byte{0x01, 0x02, 0x03},

		// optional fields
		CallerID: &callerID,
		Latching: &latching,
	},
})
if err != nil {
	panic(err)
}

for i := 0; i < 100; i++ {
	err = writer.WriteMessage(&rosbag.Message{
		Conn: 0,
		Time: uint64(i),
		Data: []byte{"hello"},
	})
	if err != nil {
		panic(err)
	}
}

Reading a bag file

Read a bag file like this:


f, err := os.Open("my-bag.bag")
if err != nil {
	panic(err)
}

reader, err := rosbag.NewReader(f)
if err != nil {
	panic(err)
}

it, err := reader.Messages()
if err != nil {
	panic(err)
}

for it.More() {
	connection, message, err := it.Next()
	if err != nil {
		panic(err)
	}

	// handle connection and message
}

Documentation

Index

Constants

View Source
const (
	CompressionNone = "none"
	CompressionLZ4  = "lz4"
	CompressionBZ2  = "bz2"
)

Variables

View Source
var (
	// ErrUnindexedBag indicates a bag has a zero-valued IndexPos in the
	// BagHeader.
	ErrUnindexedBag = errors.New("unindexed bag")
	// ErrInvalidOpHeader indicates an invalid op header was encountered while
	// reading the bag.
	ErrInvalidOpHeader = errors.New("invalid op header")
	// ErrNotImplemented indicates a feature is not yet implemented.
	ErrNotImplemented = errors.New("not implemented")
	// ErrInvalidHeapEntry indicates an invalid heap entry was encountered.
	ErrInvalidHeapEntry = errors.New("invalid heap entry")
	// ErrShortBuffer indicates a buffer was too short to read from.
	ErrShortBuffer = errors.New("short buffer")
	// ErrNotABag indicates a file is not a bag.
	ErrNotABag = errors.New("not a bag")
	// ErrMalformedHeader indicates a malformed header was encountered.
	ErrMalformedHeader = errors.New("malformed header")
	// ErrUnseekableReader indicates a reader is not seekable.
	ErrUnseekableReader = errors.New("unseekable reader")
	// ErrInvalidBool indicates a bool value was not 0 or 1.
	ErrInvalidBool = errors.New("invalid bool")
)
View Source
var Magic = []byte("#ROSBAG V2.0\n")

Magic is the magic number for ROS bag files.

Functions

func GetHeaderValue

func GetHeaderValue(header []byte, key string) ([]byte, error)

GetHeaderValue gets the value of a header in a ROS record header. If there is no header with the given key, it returns ErrKeyNotFound.

Types

type BagHeader

type BagHeader struct {
	IndexPos   uint64 // offset of first record after the chunk section
	ConnCount  uint32 // number of unique connections in the file
	ChunkCount uint32 // number of chunk records in the file
}

The bag header record occurs once in the file as the first record.

func ParseBagHeader

func ParseBagHeader(record []byte) (*BagHeader, error)

ParseBagHeader parses a bag header record.

type ChunkInfo

type ChunkInfo struct {
	ChunkPos  uint64            // offset of the chunk record
	StartTime uint64            // timestamp of earliest message in the chunk
	EndTime   uint64            // timestamp of latest message in the chunk
	Count     uint32            // number of connections in the chunk
	Data      map[uint32]uint32 // map of connID to message count
}

ChunkInfo represents the chunk info record. The "ver" field is omitted, and instead assumed to be 1. A ChunkInfo record is placed in the index section of a ROS bag, to allow a reader to easily locate chunks within the file by offset.

func ParseChunkInfo

func ParseChunkInfo(record []byte) (*ChunkInfo, error)

ParseChunkInfo parses a chunk info record.

type Connection

type Connection struct {
	Conn  uint32           // unique connection ID
	Topic string           // topic on which the messages are stored
	Data  ConnectionHeader // connection header
}

Connection represents the connection record. The data portion contains the "connection header". Two topic fields exist (in the record and connection headers). This is because messages can be written to the bag on a topic different from where they were originally published.

func ParseConnection

func ParseConnection(record []byte) (*Connection, error)

ParseConnection parses a connection record.

type ConnectionHeader

type ConnectionHeader struct {
	Topic             string  // name of the topic the subscriber is connecting to
	Type              string  // message type
	MD5Sum            string  // md5sum of message type
	MessageDefinition []byte  // full text of message definition (output of gendeps --cat)
	CallerID          *string // name of node sending data
	Latching          *bool   // publisher is in latching mode (i.e sends the last value published to new subscribers)
}

ConnectionHeader is the connection header structure described here: http://wiki.ros.org/ROS/Connection%20Header, which makes up the data portion of the connection record.

type ErrHeaderKeyNotFound

type ErrHeaderKeyNotFound struct {
	// contains filtered or unexported fields
}

ErrHeaderKeyNotFound indicates a header key was not found.

func (ErrHeaderKeyNotFound) Error

func (e ErrHeaderKeyNotFound) Error() string

Error implements the error interface.

type ErrUnexpectedOpHeader

type ErrUnexpectedOpHeader struct {
	// contains filtered or unexported fields
}

ErrUnexpectedOpHeader indicates an unexpected op header was encountered when reading the bag.

func (ErrUnexpectedOpHeader) Error

func (e ErrUnexpectedOpHeader) Error() string

Error implements the error interface.

type ErrUnsupportedCompression

type ErrUnsupportedCompression struct {
	// contains filtered or unexported fields
}

ErrUnsupportedCompression indicates a chunk was encountered with an unsupported compression algorithm.

func (ErrUnsupportedCompression) Error

Error implements the error interface.

type IndexData

type IndexData struct {
	Conn  uint32              // connection ID
	Count uint32              // number of messages on *conn* in the preceding chunk
	Data  []MessageIndexEntry // *count* repeating occurrences of timestamps (uint64) and message offsets (uint32)
}

IndexData represents the index data record. The "ver" field is omitted and instead assumed to be 1.

func ParseIndexData

func ParseIndexData(record []byte) (*IndexData, error)

ParseIndexData parses an index data record.

type Info

type Info struct {
	MessageStartTime uint64
	MessageEndTime   uint64
	MessageCount     uint64

	ChunkInfos  []*ChunkInfo
	Connections map[uint32]*Connection
}

Info represents information from the bag index.

func (*Info) ConnectionMessageCounts

func (info *Info) ConnectionMessageCounts() map[uint32]int64

ConnectionMessageCounts returns a map of connection ID to message count across the bag.

type Iterator

type Iterator interface {
	Next() (*Connection, *Message, error)
	More() bool
}

type Message

type Message struct {
	Conn uint32 // ID for connection on which emssage arrived
	Time uint64 // time at which the message was received
	Data []byte // serialized message data in ROS serialization format
}

Message represents the message record. Message records are timestamped byte arrays associated with a connection via "connID". The byte array is expected to be decodable using the message_definition field of the connection record associated with "connID".

func ParseMessage

func ParseMessage(record []byte) (*Message, error)

ParseMessage parses a message data. The returned message data is sliced from the record provided. The caller should take care not to modify that record subsequently.

type MessageIndexEntry

type MessageIndexEntry struct {
	Time   uint64 // time at which the message was recorded
	Offset uint32 // offset of the message, relative to the start of the chunk data that contains it
}

MessageIndexEntry is an entry in the data section of an IndexData record.

type OpCode

type OpCode byte

OpCode is a single-byte opcode identifying a record type. See the ROS bag spec for details.

const (
	// OpError is not in the bag spec. We return it only in cases where an error
	// value is also returned, rendering the opcode useless.
	OpError OpCode = 0x00

	// Bag header record: http://wiki.ros.org/Bags/Format/2.0#Bag_header
	OpBagHeader OpCode = 0x03

	// Chunk record: http://wiki.ros.org/Bags/Format/2.0#Chunk
	OpChunk OpCode = 0x05

	// Connection record: http://wiki.ros.org/Bags/Format/2.0#Connection
	OpConnection OpCode = 0x07

	// Message data record: http://wiki.ros.org/Bags/Format/2.0#Message_data
	OpMessageData OpCode = 0x02

	// Index data record: http://wiki.ros.org/Bags/Format/2.0#Index_data
	OpIndexData OpCode = 0x04

	// Chunk info record: http://wiki.ros.org/Bags/Format/2.0#Chunk_info
	OpChunkInfo OpCode = 0x06
)

func ReadRecord

func ReadRecord(reader io.Reader) (OpCode, []byte, error)

ReadRecord reads a record from a reader. The record slice returned includes the header and data lengths.

func (OpCode) String

func (o OpCode) String() string

String returns a string representation of the opcode for display.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader is a ROS bag reader.

func NewReader

func NewReader(r io.Reader) (*Reader, error)

NewReader returns a new reader. It requires at least a reader, for doing linear reads, but if passed a read seeker can make use of the index.

func (*Reader) Info

func (r *Reader) Info() (*Info, error)

Info returns a structure containing information from the index of the bag.

func (*Reader) Messages

func (r *Reader) Messages(opts ...ScanOption) (Iterator, error)

Messages returns an iterator that performs an indexed read over the bag in timestamp order.

type ScanOption

type ScanOption func(*scanOptions)

func ScanLinear

func ScanLinear(value bool) ScanOption

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer is a writer for ROS bag files.

func NewWriter

func NewWriter(outputWriter io.Writer, opts ...WriterOption) (*Writer, error)

NewWriter constructs a new bag writer. The bag writer implements the ROS bag specification, with chunk compression and indexing.

func (*Writer) Close

func (b *Writer) Close() error

Close the bag file, and if the output writer implements WriteSeeker, also overwrite the bag header record with correct values. If the output writer does not implement write seeker, the resulting index will be structurally correct, but not linked from the file header. This can be repaired by running "rosbag reindex", at the cost of rewriting the bag. A smarter tool could scan the file to locate the index records and update the pointer in place.

func (*Writer) WriteBagHeader

func (b *Writer) WriteBagHeader(bagHeader BagHeader) error

WriteBagHeader writes a bag header record to the output. See http://wiki.ros.org/Bags/Format/2.0#Bag_header for details.

func (*Writer) WriteConnection

func (b *Writer) WriteConnection(conn *Connection) error

WriteConnection writes a connection record to the output. A connection record should be written prior to any messages on that connection. This is _not_ enforced by the library, in order to support writing messages to an existing partial file. See http://wiki.ros.org/Bags/Format/2.0#Connection for additional detail.

func (*Writer) WriteMessage

func (b *Writer) WriteMessage(msg *Message) error

WriteMessage writes a message data record to the bag file. See http://wiki.ros.org/Bags/Format/2.0#Message_data for additional detail.

type WriterOption

type WriterOption func(c *bagWriterConfig)

func SkipHeader

func SkipHeader(skipHeader bool) WriterOption

SkipHeader skips writing the header to the bag. This is useful for appending to an existing partial bag or output stream.

func WithChunksize

func WithChunksize(chunksize int) WriterOption

WithChunksize sets the chunksize for the bag writer. The default is 4MB.

func WithCompression

func WithCompression(compression string) WriterOption

WithCompression sets the chunk compression on the output bag. The default value is "lz4".

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL