tubemq-client-go

module
v0.0.0-...-065392d Latest Latest
Published: May 13, 2024 License: Apache-2.0, BSD-3-Clause, MIT

README


TubeMQ Go Client Library

Goal

This project is a pure-Go client library for TubeMQ that does not depend on the TubeMQ C++ library. Production is not supported yet.

Requirements

  • Go 1.11+

Usage

Import the client and config packages:

import "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/client"
import "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/config"

Import the log package if you need logging:

import "github.com/apache/inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-go/log"

Create a Consumer by parsing address:

cfg, err := config.ParseAddress("127.0.0.1:8099?topic=test_1&group=test_group")
if err != nil {
	// fmt.Errorf only builds an error value; print the failure instead.
	fmt.Printf("failed to parse address: %s\n", err.Error())
	panic(err)
}
c, err := client.NewConsumer(cfg)
if err != nil {
	fmt.Printf("new consumer error: %s\n", err.Error())
	panic(err)
}

defer c.Close()

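cr, err := c.GetMessage()
if err != nil {
	fmt.Printf("get message error: %s\n", err.Error())
	panic(err)
}
// Messages are not confirmed automatically; you must confirm them yourself.
_, err = c.Confirm(cr.ConfirmContext, true)
if err != nil {
	fmt.Printf("confirm error: %s\n", err.Error())
	panic(err)
}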

for _, msg := range cr.Messages {
	fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID, string(msg.Data))
}
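
The address string passed to config.ParseAddress above has the shape host:port?key=value&key=value. As a rough illustration of that shape only, the sketch below splits such a string with the standard library. splitAddress is a hypothetical helper written for this README, not part of tubemq-client-go, and it makes no claim about how config.ParseAddress is implemented or which keys it accepts.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// splitAddress is a hypothetical helper (not a tubemq-client-go API).
// It splits "host:port?key=value&..." into the master address and its options.
func splitAddress(addr string) (master string, opts url.Values, err error) {
	parts := strings.SplitN(addr, "?", 2)
	if parts[0] == "" {
		return "", nil, fmt.Errorf("address %q has no master host:port", addr)
	}
	if len(parts) == 1 {
		// No "?" present: there are no options to parse.
		return parts[0], url.Values{}, nil
	}
	opts, err = url.ParseQuery(parts[1])
	if err != nil {
		return "", nil, fmt.Errorf("invalid options in %q: %v", addr, err)
	}
	return parts[0], opts, nil
}

func main() {
	master, opts, err := splitAddress("127.0.0.1:8099?topic=test_1&group=test_group")
	if err != nil {
		panic(err)
	}
	fmt.Println(master, opts.Get("topic"), opts.Get("group"))
	// prints: 127.0.0.1:8099 test_1 test_group
}
```

In real code, pass the address to config.ParseAddress directly; the sketch only shows which part of the string names the master and which part carries the topic and group.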

Create a Consumer by constructing a config:

topicFilters := map[string][]string{"topic1": {"filter1", "filter2"}, "topic2": {"filter3", "filter4"}}
partitionOffset := map[string]int64{"181895251:topic1:1": 0, "181895251:topic2:2": 10}
cfg := config.New(
	config.WithMasters("127.0.0.1:8099"),
	config.WithGroup("group"),
	// For topic filters
	config.WithTopicFilters(topicFilters),
	// For bound consume
	config.WithBoundConsume("ss", 1, true, partitionOffset),
)
c, err := client.NewConsumer(cfg)
if err != nil {
	// fmt.Errorf only builds an error value; print the failure instead.
	fmt.Printf("new consumer error: %s\n", err.Error())
	panic(err)
}

defer c.Close()

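cr, err := c.GetMessage()
if err != nil {
	fmt.Printf("get message error: %s\n", err.Error())
	panic(err)
}
// Messages are not confirmed automatically; you must confirm them yourself.
_, err = c.Confirm(cr.ConfirmContext, true)
if err != nil {
	fmt.Printf("confirm error: %s\n", err.Error())
	panic(err)
}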

for _, msg := range cr.Messages {
	fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID, string(msg.Data))
}
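
The config.New(config.WithMasters(...), config.WithGroup(...)) call above has the look of Go's functional options pattern. For readers unfamiliar with it, here is a minimal, self-contained sketch of that pattern. Config, Option, New and the With* helpers below are local stand-ins invented for illustration; they are not the types or implementation of the config package, and the fields are assumptions.

```go
package main

import "fmt"

// Config is a hypothetical stand-in, not the real config.Config.
type Config struct {
	Masters      string
	Group        string
	TopicFilters map[string][]string
}

// Option mutates a Config; each With* helper returns one.
type Option func(*Config)

func WithMasters(addr string) Option {
	return func(c *Config) { c.Masters = addr }
}

func WithGroup(group string) Option {
	return func(c *Config) { c.Group = group }
}

func WithTopicFilters(filters map[string][]string) Option {
	return func(c *Config) { c.TopicFilters = filters }
}

// New builds a Config and applies the options in the order given,
// so a later option overrides an earlier one that sets the same field.
func New(opts ...Option) *Config {
	c := &Config{}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	cfg := New(
		WithMasters("127.0.0.1:8099"),
		WithGroup("group"),
		WithTopicFilters(map[string][]string{"topic1": {"filter1", "filter2"}}),
	)
	fmt.Println(cfg.Masters, cfg.Group, len(cfg.TopicFilters["topic1"]))
	// prints: 127.0.0.1:8099 group 2
}
```

The appeal of this style is that callers name only the settings they care about, and new settings can be added later without changing the signature of New.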

Example

Consuming from multiple goroutines is also supported. For more detailed examples, see Go Client Examples.

Directories

Path          Synopsis
client        Package client defines the api and information which can be exposed to user.
codec         Package codec defines the encoding and decoding logic between TubeMQ.
config        Package config defines all the TubeMQ configuration options.
errs          Package errs defines the TubeMQ error codes and TubeMQ error msg.
flowctrl      Package flowctrl defines the rule and handle logic of flow control.
log           Package log defines the logger for the sdk.
metadata      Package metadata defines all the metadata of the TubeMQ broker and producer.
multiplexing  Package multiplexing defines the multiplex connection pool for sending request and receiving response.
remote        Package remote defines the remote data which is returned from TubeMQ.
rpc           Package rpc encapsulates all the rpc request to TubeMQ.
selector      Package selector defines the route selector which is responsible for service discovery.
sub           Package sub defines the subscription information of a client.
transport     Package transport defines the network communication layer which is responsible for encoding the rpc request and decoding the response from TubeMQ.
util          Package util defines the constants and helper functions.
