pubsubetcd

v0.0.0-...-ccef3ff
Published: Jul 19, 2020 License: Apache-2.0 Imports: 8 Imported by: 0

README

pubsub-etcd

pubsub-etcd abuses etcd by turning it into a pubsub system. It is brokerless and uses only etcd to store consumer group state, offsets, etc.

Why?

In all honesty, it was a fun thing to build.

Contributing

The idea is to keep this package brokerless. That means that consumers should be responsible for reporting what kind of state they are in, and ultimately cleaning up the topic and partition they are consuming.

Running

For pubsub-etcd to work, you need an etcd instance up and running. There's a convenience command for this in the _examples folder.

cd _examples && make standalone-etcd

We can now connect to the instance.

etcd, err := clientv3.New(clientv3.Config{
    Endpoints:   []string{"localhost:2379"},
    DialTimeout: 5 * time.Second,
})
if err != nil {
    log.Fatalf("connecting to etcd: %v", err)
}
defer etcd.Close()

Let's create a topic, which is the place where we will store messages. Partitions are there to spread out events for increased throughput (read and write).

name := "my-awesome-topic"
partitions := 3 // Max is 127
myTopic, err := pubsubetcd.CreateTopic(etcd, name, partitions)
if err != nil {
    log.Printf("%v", err)
}

Note that if the topic already exists, we will get an error trying to create it again. In such cases we can just get it instead.

myTopic, _ = pubsubetcd.GetTopic(etcd, name)
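
The create-then-get fallback can be wrapped in a small helper. The sketch below is self-contained and does not talk to etcd: `topic`, `store`, `createTopic`, `getTopic`, `getOrCreate` and both error values are hypothetical stand-ins, not part of pubsubetcd. The upper bound of 127 partitions comes from the comment above; the lower bound of 1 is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// topic is a hypothetical stand-in for pubsubetcd.Topic.
type topic struct {
	Name       string
	Partitions int
}

// store is a hypothetical in-memory replacement for etcd.
type store map[string]topic

var (
	errExists   = errors.New("topic already exists")
	errNotFound = errors.New("topic not found")
)

// createTopic fails when the topic already exists, as described above.
func (s store) createTopic(name string, partitions int) (topic, error) {
	if partitions < 1 || partitions > 127 {
		return topic{}, fmt.Errorf("partitions must be between 1 and 127, got %d", partitions)
	}
	if _, ok := s[name]; ok {
		return topic{}, errExists
	}
	t := topic{Name: name, Partitions: partitions}
	s[name] = t
	return t, nil
}

// getTopic returns a previously created topic.
func (s store) getTopic(name string) (topic, error) {
	t, ok := s[name]
	if !ok {
		return topic{}, errNotFound
	}
	return t, nil
}

// getOrCreate tries to create the topic and falls back to fetching it
// only when creation failed because the topic already exists.
func getOrCreate(s store, name string, partitions int) (topic, error) {
	t, err := s.createTopic(name, partitions)
	if err == nil {
		return t, nil
	}
	if errors.Is(err, errExists) {
		return s.getTopic(name)
	}
	return topic{}, err
}

func main() {
	s := store{}
	first, _ := getOrCreate(s, "my-awesome-topic", 3)
	second, _ := getOrCreate(s, "my-awesome-topic", 5)
	// The second call returns the existing topic, so its partition count is unchanged.
	fmt.Println(first.Partitions, second.Partitions)
}
```

With the real package, the same shape would call pubsubetcd.CreateTopic and pubsubetcd.GetTopic. How the package reports an already existing topic is not documented here, so the `errExists` check is only illustrative.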

Subscribing to a topic means you'll start receiving messages from said topic. In this case, we'll subscribe to all partitions using Subscribe. We are now a consumer of a topic and a set of partitions.

consumerName := "foobar"
subscriptions, err := myTopic.Subscribe(consumerName)
if err != nil {
    log.Printf("%v", err)
}

We get back one subscription for every partition we are now subscribed to. Each subscription gives us a Messages channel on which all events are received. Let's set up a listener on each one to watch for incoming messages.

for _, sub := range subscriptions {
    go func(sub pubsubetcd.Subscription) {
        // Log every message received on this partition's channel.
        for msg := range sub.Messages {
            log.Printf("key: %v, value: %v, offset: %v", msg.Key, msg.Value, msg.Offset)
        }
    }(sub)
}
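
If one goroutine per partition is inconvenient, the per-partition channels can be merged into a single stream. The following is a minimal fan-in sketch using only the standard library: `message` mirrors the fields of pubsubetcd.Message, and `merge` is a hypothetical helper, not part of the package. It assumes the input channels are eventually closed, which this documentation does not confirm for Subscription.Messages.

```go
package main

import (
	"fmt"
	"sync"
)

// message mirrors the fields of pubsubetcd.Message.
type message struct {
	Key    string
	Offset int64
	Value  string
}

// merge forwards messages from every partition channel onto one channel
// and closes that channel once all inputs have been closed.
func merge(partitions ...<-chan message) <-chan message {
	out := make(chan message)
	var wg sync.WaitGroup
	for _, p := range partitions {
		wg.Add(1)
		go func(p <-chan message) {
			defer wg.Done()
			for msg := range p {
				out <- msg
			}
		}(p)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	a := make(chan message, 1)
	b := make(chan message, 1)
	a <- message{Key: "k1", Offset: 0, Value: "hello"}
	b <- message{Key: "k2", Offset: 0, Value: "darkness"}
	close(a)
	close(b)
	// The order across partitions is not deterministic.
	for msg := range merge(a, b) {
		fmt.Printf("key: %v, value: %v, offset: %v\n", msg.Key, msg.Value, msg.Offset)
	}
}
```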

We can send any slice of strings to the topic. The default behaviour is to randomly distribute incoming data across all available partitions. With the call below, we have produced messages to the topic.

success, fail := myTopic.PutBatch([]string{"hello", "darkness", "my", "old", "friend"})
log.Printf("successfully sent %v messages, failed to send %v messages", len(success), len(fail))

Since we've already set up a listener, let's wait for a second to receive everything we sent.

time.Sleep(1 * time.Second)
// Events should show up in stdout.

When you're done consuming messages, make sure to unsubscribe. This will eventually make the topic and its partitions available to other consumers.

for _, sub := range subscriptions {
    log.Printf("Unsubscribing from %v, partition %v", sub.Topic.Name, sub.Partition)
    sub.Unsubscribe()
}

More examples

Check the _examples folder.

cd _examples

# Start an etcd instance locally.
make standalone-etcd

# Run the example you want.

Todo / Done

  • Topics
    • Create
    • Get
    • Update
    • Remove
    • Message ttl
  • Partitions
  • Consumer groups
  • Offset tracking
  • Put / producing messages
    • Put
    • BatchPut
  • Delete
    • Delete
    • BatchDelete
  • Compacting specific topics (probably not possible)

Tests

Tests require a running etcd instance. Running cd _examples && make standalone-etcd starts one for you.

Documentation

Types

type Message

type Message struct {
	Key    string
	Offset int64
	Value  string
}

func NewMessage

func NewMessage(key string, offset int64, value string) Message

type Subscription

type Subscription struct {
	Messages     chan Message
	Shutdown     chan bool
	ConsumerName string
	Topic        Topic
	Partition    int
}

func (*Subscription) CommitOffset

func (s *Subscription) CommitOffset(offset int64) error

func (*Subscription) KeepSubscriptionAlive

func (s *Subscription) KeepSubscriptionAlive() chan bool

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe()

type Topic

type Topic struct {
	Name        TopicName
	Partitions  int
	MaxMessages int64
	TTL         time.Duration
	// contains filtered or unexported fields
}

func CreateTopic

func CreateTopic(etcd *clientv3.Client, name string, partitions int) (Topic, error)

func GetTopic

func GetTopic(etcd *clientv3.Client, name string) (Topic, error)

func (*Topic) AnnounceSubscription

func (t *Topic) AnnounceSubscription(consumerName string, partitionNumber int) error

func (*Topic) GetConsumerOffset

func (t *Topic) GetConsumerOffset(consumerName string, partitionNumber int) (int64, error)

func (Topic) GetName

func (t Topic) GetName() TopicName

func (Topic) GetPartitions

func (t Topic) GetPartitions() int

func (*Topic) GetTopicMetadata

func (t *Topic) GetTopicMetadata(topicName string) error

func (*Topic) Put

func (t *Topic) Put(data string) (string, string)

func (*Topic) PutBatch

func (t *Topic) PutBatch(data []string) ([]string, []string)

PutBatch sends values to a given topic. Returns the successful and failed requests.
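
Because PutBatch returns both slices, a caller can retry only what failed. The sketch below illustrates that under stated assumptions: `putBatchFunc` copies the shape of the PutBatch signature, `putWithRetry` and `flaky` are hypothetical, and the failed slice is assumed to contain the original values so that they can be resent as they are. The documentation does not specify what the returned strings hold.

```go
package main

import "fmt"

// putBatchFunc has the same shape as (*Topic).PutBatch: it takes values
// and returns the successful and the failed ones.
type putBatchFunc func(data []string) (success, fail []string)

// putWithRetry resends failed values for up to maxAttempts calls and returns
// everything that succeeded plus whatever still failed at the end.
func putWithRetry(put putBatchFunc, data []string, maxAttempts int) (sent, failed []string) {
	failed = data
	for attempt := 0; attempt < maxAttempts && len(failed) > 0; attempt++ {
		var ok []string
		ok, failed = put(failed)
		sent = append(sent, ok...)
	}
	return sent, failed
}

func main() {
	calls := 0
	// flaky is a hypothetical producer that rejects every value on its first call.
	flaky := func(data []string) ([]string, []string) {
		calls++
		if calls == 1 {
			return nil, data
		}
		return data, nil
	}
	sent, failed := putWithRetry(flaky, []string{"hello", "darkness"}, 3)
	fmt.Printf("sent %d, failed %d after %d calls\n", len(sent), len(failed), calls)
}
```

With the real package, a method value such as myTopic.PutBatch has a matching function type. Note that blind retries can produce duplicates if a write succeeded but was reported as failed.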

func (*Topic) ScramblePartitions

func (t *Topic) ScramblePartitions() []int

func (*Topic) Subscribe

func (t *Topic) Subscribe(consumerName string) ([]Subscription, error)

func (*Topic) SubscribeToPartition

func (t *Topic) SubscribeToPartition(consumerName string, partitionNumber int, offset int64) (Subscription, error)

type TopicName

type TopicName string

func (TopicName) IsValid

func (tn TopicName) IsValid() bool

func (TopicName) String

func (tn TopicName) String() string

Directories

Path Synopsis
_examples
