go-queue

module
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 17, 2023 License: Apache-2.0

README

go-queue

Kafka, Beanstalkd, Pulsar Pub/Sub framework. Reference: https://github.com/zeromicro/go-queue

installation

go get -u github.com/chenquan/go-queue

beanstalkd

High available beanstalkd.

consumer example

config.yaml

Name: beanstalkd
Telemetry:
  Name: beanstalkd
  Endpoint: http://localhost:14268/api/traces
  Sampler: 1.0
  Natcher: jaeger
package main

import (
	"context"
	"github.com/chenquan/go-queue/beanstalkd"
	"github.com/zeromicro/go-zero/core/conf"
	"github.com/zeromicro/go-zero/core/logx"
	"github.com/zeromicro/go-zero/core/service"
	"github.com/zeromicro/go-zero/core/stores/redis"
)

func main() {
	var c service.ServiceConf
	conf.MustLoad("config.yaml", &c)

	c.MustSetUp()

	consumer := beanstalkd.NewConsumer(beanstalkd.Conf{
		Beanstalkd: beanstalkd.Beanstalkd{
			Endpoints: []string{
				"localhost:11300",
				"localhost:11300",
			},
			Tube: "tube",
		},
		Redis: redis.RedisConf{
			Host: "localhost:6379",
			Type: redis.NodeType,
		},
	}, beanstalkd.WithHandle(func(ctx context.Context, body []byte) {
		logx.WithContext(ctx).Info(string(body))

	}))
	defer consumer.Stop()
	consumer.Start()
}
producer example
package main

import (
	"context"
	"fmt"
	"github.com/chenquan/go-queue/beanstalkd"
	"strconv"
	"time"
)

func main() {
	producer := beanstalkd.NewProducer(
		beanstalkd.Beanstalkd{
			Tube: "tube",
			Endpoints: []string{
				"localhost:11300",
				"127.0.0.1:11300",
			},
		},
	)

	for i := 1; i < 1005; i++ {
		//_, err := producer.Delay(context.Background(), []byte(strconv.Itoa(i)), time.Second*5)
		//if err != nil {
		//	fmt.Println(err)
		//}
		_, err := producer.Push(context.Background(), nil, []byte(strconv.Itoa(i)), beanstalkd.WithDuration(time.Second*5))
		if err != nil {
			fmt.Println(err)
		}
	}
}

kafka

Kafka Pub/Sub framework

consumer example

config.yaml

Name: kafka
Brokers:
  - 127.0.0.1:19092
  - 127.0.0.1:19092
  - 127.0.0.1:19092
Group: kafka
Topic: kafka
Offset: first
Consumers: 1

Telemetry:
  Name: kq
  Endpoint: http://localhost:14268/api/traces
  Sampler: 1.0
  Natcher: jaeger

example code

package main

import (
	"context"
	"fmt"
	"github.com/chenquan/go-queue/kafka"
	"github.com/chenquan/go-queue/queue"
	"github.com/zeromicro/go-zero/core/logx"
	"github.com/zeromicro/go-zero/core/service"

	"github.com/zeromicro/go-zero/core/conf"
)

func main() {
	var c struct {
		kafka.Conf
		service.ServiceConf
	}

	conf.MustLoad("config.yaml", &c)
	c.MustSetUp()

	q := kafka.MustNewQueue(c.Conf, queue.WithHandle(func(ctx context.Context, k, v []byte) error {
		logx.WithContext(ctx).Info(fmt.Sprintf("=> %s\n", v))
		return nil
	}))
	defer q.Stop()
	q.Start()
}

producer example
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/chenquan/go-queue/kafka"
	"log"
	"math/rand"
	"strconv"
	"time"

	"github.com/zeromicro/go-zero/core/cmdline"
)

type message struct {
	Key     string `json:"key"`
	Value   string `json:"value"`
	Payload string `json:"message"`
}

func main() {
	pusher := kafka.NewPusher([]string{
		"127.0.0.1:19092",
		"127.0.0.1:19092",
		"127.0.0.1:19092",
	}, "kafka")

	ticker := time.NewTicker(time.Millisecond)
	for round := 0; round < 3; round++ {
		<-ticker.C

		count := rand.Intn(100)
		m := message{
			Key:     strconv.FormatInt(time.Now().UnixNano(), 10),
			Value:   fmt.Sprintf("%d,%d", round, count),
			Payload: fmt.Sprintf("%d,%d", round, count),
		}
		body, err := json.Marshal(m)
		if err != nil {
			log.Fatal(err)
		}

		fmt.Println(string(body))
		if _, err := pusher.Push(context.Background(), []byte(strconv.FormatInt(time.Now().UnixNano(), 10)), body); err != nil {
			log.Fatal(err)
		}
	}

	cmdline.EnterToContinue()
}

pulsar

Pulsar Pub/Sub framework

consumer example

config.yaml

Name: pulsar
Brokers:
  - 127.0.0.1:6650
Topic: pulsar
Conns: 2
Processors: 2
SubscriptionName: pulsar

Telemetry:
  Name: pulsar
  Endpoint: http://localhost:14268/api/traces
  Sampler: 1.0
  Natcher: jaeger

consumer code

package main

import (
	"context"
	"fmt"
	"github.com/chenquan/go-queue/pulsar"
	"github.com/chenquan/go-queue/queue"
	"github.com/zeromicro/go-zero/core/logx"
	"github.com/zeromicro/go-zero/core/service"

	"github.com/zeromicro/go-zero/core/conf"
)

func main() {
	var c struct {
		pulsar.Conf
		service.ServiceConf
	}
	conf.MustLoad("config.yaml", &c)
	c.MustSetUp()

	q := pulsar.MustNewQueue(c.Conf, queue.WithHandle(func(ctx context.Context, k, v []byte) error {
		logx.WithContext(ctx).Info(fmt.Sprintf("=> %s\n", v))
		return nil
	}))
	defer q.Stop()
	q.Start()
}

producer code

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/chenquan/go-queue/pulsar"
	"log"
	"math/rand"
	"strconv"
	"time"

	"github.com/zeromicro/go-zero/core/cmdline"
)

type message struct {
	Key     string `json:"key"`
	Value   string `json:"value"`
	Payload string `json:"message"`
}

func main() {
	pusher := pulsar.NewPusher([]string{
		"127.0.0.1:19092",
		"127.0.0.1:19092",
		"127.0.0.1:19092",
	}, "pulsar")

	ticker := time.NewTicker(time.Millisecond)
	for round := 0; round < 3; round++ {
		<-ticker.C

		count := rand.Intn(100)
		m := message{
			Key:     strconv.FormatInt(time.Now().UnixNano(), 10),
			Value:   fmt.Sprintf("%d,%d", round, count),
			Payload: fmt.Sprintf("%d,%d", round, count),
		}
		body, err := json.Marshal(m)
		if err != nil {
			log.Fatal(err)
		}

		fmt.Println(string(body))

		if _, err := pusher.Push(context.Background(), []byte(strconv.FormatInt(time.Now().UnixNano(), 10)), body); err != nil {
			log.Fatal(err)
		}
	}

	cmdline.EnterToContinue()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL