aeron-go

module
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2022 License: Apache-2.0

README

Build Status Go Report Card

aeron-go

Implementation of Aeron messaging client in Go.

Architecture, design, and protocol of Aeron can be found here

Usage

Example subscriber can be found here.

Example publication can be found here.

Common

Instantiate Aeron with Context:

ctx := aeron.NewContext().MediaDriverTimeout(time.Second * 10)

a := aeron.Connect(ctx)

Subscribers

Create subscription:

subscription := <-a.AddSubscription("aeron:ipc", 10)

defer subscription.Close()

aeron.AddSubscription() returns a channel, so that the user has the choice of blocking waiting for subscription to register with the driver or do async select poll.

Define callback for message processing:

handler := func(buffer *buffers.Atomic, offset int32, length int32, header *logbuffer.Header) {
    bytes := buffer.GetBytesArray(offset, length)

    fmt.Printf("Received a fragment with payload: %s\n", string(bytes))
}

Poll for messages:

idleStrategy := idlestrategy.Sleeping{time.Millisecond}

for {
    fragmentsRead := subscription.Poll(handler, 10)
    idleStrategy.Idle(fragmentsRead)
}

Publications

Create publication:

publication := <-a.AddPublication("aeron:ipc", 10)

defer publication.Close()

aeron.AddPublication() returns a channel, so that the user has the choice of blocking waiting for publication to register with the driver or do async select poll.

Create Aeron buffer to send the message:

message := fmt.Sprintf("this is a message %d", counter)

srcBuffer := buffers.MakeAtomic(([]byte)(message))

Optionally make sure that there are connected subscriptions:

for !publication.IsConnected() {
    time.Sleep(time.Millisecond * 10)
}

Send the message, by calling publication.Offer

ret := publication.Offer(srcBuffer, 0, int32(len(message)), nil)
switch ret {
case aeron.NotConnected:
    log.Print("not connected yet")
case aeron.BackPressured:
    log.Print("back pressured")
default:
    if ret < 0 {
        log.Print("Unrecognized code: %d", ret)
    } else {
        log.Print("success!")
    }
}

Directories

Path Synopsis
logging
Provides a transition layer from "github.com/op/go-logging" to "go.uber.org/zap" to simply resolve some reentrancy issues in go-logging.
Provides a transition layer from "github.com/op/go-logging" to "go.uber.org/zap" to simply resolve some reentrancy issues in go-logging.
Package archive provides API access to Aeron's archive-media-driver
Package archive provides API access to Aeron's archive-media-driver
codecs
Package codecs contains the archive protocol packet encoding and decoding
Package codecs contains the archive protocol packet encoding and decoding
examples/basic_recording_publisher
An example recorded publisher
An example recorded publisher
examples/basic_replayed_subscriber
An example replayed subscriber
An example replayed subscriber
codecs
Package codecs contains the archive protocol packet encoding and decoding
Package codecs contains the archive protocol packet encoding and decoding

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL