mq

package
v0.1.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 25, 2020 License: Apache-2.0 Imports: 14 Imported by: 0

README

package main

import (
    "github.com/joshqu1985/fireman/configor"
    "github.com/joshqu1985/fireman/mq"
)

type Config struct {
    Mq       mq.Config
}

var (
    Conf Config
)

func init() {
    if err := configor.Load("./configs/conf.toml", &Conf); err != nil {
        panic(err)
    }
}

func main() {
    // 生产
    producer := mq.NewSyncProducer(Conf.Mq)
    producer.Send(ctx, "A", &mq.Message{
        Body: []byte("what'up test"),
    })

    // 消费
    consumer := mq.NewConsumer(Conf.Mq)
    consumer.Recv("A", h1)
    consumer.Recv("B", h2)
}

func h1(ctx context.Context, e mq.Event) error {
    fmt.Println(tracing.GetRequestId(ctx), e.Topic(), string(e.Message().Body))
    return nil
}

func h2(ctx context.Context, e mq.Event) error {
    fmt.Println(tracing.GetRequestId(ctx), e.Topic(), string(e.Message().Body))
    return nil
}
#conf.toml
[mq]
    mq_name   = "kafka"
    endpoints = ["localhost:9092"]
    group     = "feed"
    [[mq.topics]]
        name     = "A"
        topic    = "test"
    [[mq.topics]]
        name     = "B"
        topic    = "hello"

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func StartKafkaConsumerSpan

func StartKafkaConsumerSpan(tracer opentracing.Tracer, headers []*sarama.RecordHeader, topic string) (opentracing.Span, context.Context)

func StartKafkaProducerSpan

func StartKafkaProducerSpan(ctx context.Context, topic string) (opentracing.Span, []sarama.RecordHeader)

Types

type AsyncProducer

type AsyncProducer interface {
	Send(ctx context.Context, topicName string, msg *Message) error
	Close() error
}

func NewAsyncProducer

func NewAsyncProducer(conf Config) AsyncProducer

type Config

type Config struct {
	MqName    string        `toml:"mq_name"`
	Endpoints []string      `toml:"endpoints"`
	Instance  string        `toml:"instance"`
	AccessKey string        `toml:"access_key"`
	SecretKey string        `toml:"secret_key"`
	Group     string        `toml:"group"`
	Topics    []TopicConfig `toml:"topics"`
}

type Consumer

type Consumer interface {
	Recv(topicName string, h Handler) error
	Start() error
	Close() error
}

func NewConsumer

func NewConsumer(conf Config) Consumer

type Event

type Event interface {
	Topic() string
	Message() *Message
	Ack() error
}

type GroupHandler

type GroupHandler struct {
	Handler       Handler
	ConsumerGroup sarama.ConsumerGroup
	TopicName     string
}

func (*GroupHandler) Cleanup

func (*GroupHandler) ConsumeClaim

func (*GroupHandler) Setup

type Handler

type Handler func(ctx context.Context, event Event) error

type KafkaAsyncProducer

type KafkaAsyncProducer struct {
	// contains filtered or unexported fields
}

func NewKafkaAsyncProducer

func NewKafkaAsyncProducer(conf Config) (*KafkaAsyncProducer, error)

func (*KafkaAsyncProducer) Close

func (p *KafkaAsyncProducer) Close() error

func (*KafkaAsyncProducer) Send

func (p *KafkaAsyncProducer) Send(ctx context.Context, name string, msg *Message) error

type KafkaConsumer

type KafkaConsumer struct {
	// contains filtered or unexported fields
}

func NewKafkaConsumer

func NewKafkaConsumer(conf Config) (*KafkaConsumer, error)

func (*KafkaConsumer) Close

func (c *KafkaConsumer) Close() error

func (*KafkaConsumer) Recv

func (c *KafkaConsumer) Recv(name string, h Handler) error

func (*KafkaConsumer) Start

func (c *KafkaConsumer) Start() error

type KafkaConsumerGroup

type KafkaConsumerGroup struct {
	Topic         string
	ConsumerGroup sarama.ConsumerGroup
}

type KafkaEvent

type KafkaEvent struct {
	// contains filtered or unexported fields
}

func (*KafkaEvent) Ack

func (k *KafkaEvent) Ack() error

func (*KafkaEvent) Message

func (k *KafkaEvent) Message() *Message

func (*KafkaEvent) Topic

func (k *KafkaEvent) Topic() string

type KafkaSyncProducer

type KafkaSyncProducer struct {
	// contains filtered or unexported fields
}

func NewKafkaSyncProducer

func NewKafkaSyncProducer(conf Config) (*KafkaSyncProducer, error)

func (*KafkaSyncProducer) Close

func (p *KafkaSyncProducer) Close() error

func (*KafkaSyncProducer) Send

func (p *KafkaSyncProducer) Send(ctx context.Context, name string, msg *Message) error

type Message

type Message struct {
	Header map[string]string
	Body   []byte
}

type RocketAsyncProducer

type RocketAsyncProducer struct {
	// contains filtered or unexported fields
}

func NewRocketAsyncProducer

func NewRocketAsyncProducer(conf Config) (*RocketAsyncProducer, error)

func (*RocketAsyncProducer) Close

func (p *RocketAsyncProducer) Close() error

func (*RocketAsyncProducer) Send

func (p *RocketAsyncProducer) Send(ctx context.Context, name string, msg *Message) error

type RocketConsumer

type RocketConsumer struct {
	// contains filtered or unexported fields
}

func NewRocketConsumer

func NewRocketConsumer(conf Config) (*RocketConsumer, error)

func (*RocketConsumer) Close

func (c *RocketConsumer) Close() error

func (*RocketConsumer) Recv

func (c *RocketConsumer) Recv(name string, h Handler) error

func (*RocketConsumer) Start

func (c *RocketConsumer) Start() error

type RocketEvent

type RocketEvent struct {
	// contains filtered or unexported fields
}

func (*RocketEvent) Ack

func (r *RocketEvent) Ack() error

func (*RocketEvent) Message

func (r *RocketEvent) Message() *Message

func (*RocketEvent) Topic

func (r *RocketEvent) Topic() string

type RocketSyncProducer

type RocketSyncProducer struct {
	// contains filtered or unexported fields
}

func NewRocketSyncProducer

func NewRocketSyncProducer(conf Config) (*RocketSyncProducer, error)

func (*RocketSyncProducer) Close

func (p *RocketSyncProducer) Close() error

func (*RocketSyncProducer) Send

func (p *RocketSyncProducer) Send(ctx context.Context, name string, msg *Message) error

type SyncProducer

type SyncProducer interface {
	Send(ctx context.Context, topicName string, msg *Message) error
	Close() error
}

func NewSyncProducer

func NewSyncProducer(conf Config) SyncProducer

type TopicConfig

type TopicConfig struct {
	Name  string `toml:"name"`
	Topic string `toml:"topic"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL