pubsub

package module
v0.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2024 License: Apache-2.0 Imports: 5 Imported by: 0

README

PubSub

Go Reference Bitbucket Pipelines

Installation

go get bitbucket.org/amotus/pubsub

Usage

package main

import "bitbucket.org/amotus/pubsub"

func main() {
	kafka = kafka.NewPubSub("redpanda:9092", "kafka-consumer-name-example", "topic")
	
	go kafka.Listen()
	
	topicChanOut := kafka.Subscribe("topic")

	go func() {
		for msg := range topicChanOut {
			onKafkaMessage(msg)
		}
	}()
}


func onKafkaMessage(msg kafka.KafkaMessage) {
	log.Println("received message", msg.Topic, msg.Payload)
}

TODO

  • Update readme to include Blocking-pubsub example

License

Licensed under Apache License, Version 2.0 LICENSE.

Documentation

Overview

Package pubsub provide the implementation of a PubSub pattern with channels

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Consume

func Consume[Input, Output any](ctx context.Context,
	input <-chan Input,
	output chan<- Output,
	transformer MessageTransformer[Input, Output],
	gate channel.MessageFilter[Output])

func Forward

func Forward[Message1, Message2 any](subscriber <-chan Message1, broadcaster PubSub[Message2], transform func(message Message1) Message2)

Types

type MessageTransformer

type MessageTransformer[IN any, OUT any] interface {
	Transform(message IN) OUT
}

type PubSub

type PubSub[Message any] interface {
	// Publish a message to a topic
	Publish(message Message) error

	// Subscribe to a topic
	Subscribe(filter channel.MessageFilter[Message]) (<-chan Message, error)

	// Unsubscribe from a topic
	Unsubscribe(<-chan Message) error
}

func NewBlockingPubSub

func NewBlockingPubSub[Message any]() PubSub[Message]

func NewPubSub

func NewPubSub[Message any]() PubSub[Message]

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL