kafka

package
v0.0.0-...-171658a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 31, 2017 License: MIT Imports: 4 Imported by: 0

README

kafka

improve kafka's reactivity

  • Producer

    The observer watches the Observable and publishes directly to the broker, so there is no need to subscribe to the stream.

      obv := stream.NewObserver(nil)
      p := NewProducerStream([]string{"localhost:9092"}, obv)
      p.AfterSend = func(msg *sarama.ProducerMessage) {
      	log.Print(msg)
      }
      p.ErrFrom = func(err error) {
      	log.Print(err)
      }
      p.Handler.AtComplete = func() {
      	log.Print("end produce")
      }
    
      p.Target = func() {
      	for i := 0; i <= 10; i++ {
      		msg := &sarama.ProducerMessage{
      			Topic: "test",
      			Key:   sarama.StringEncoder(strconv.Itoa(i)),
      			Value: sarama.StringEncoder(strconv.Itoa(i)),
      		}
    
      		select {
      		case <-p.AfterCancel():
      			return
      		default:
      			p.Send(msg)
      		}
      	}
      	p.OnComplete()
      }
    
      p.Publish(nil)
    
  • Consumer

    The consumer has no observer of its own; it acts as the observer itself and watches the broker.

      h := DefaultConsumHandler()
      h.AtError = func(err error) {
      	log.Print(err)
      }
    
      c := NewConsumerGroup("test_group", []string{"127.0.0.1:9092"}, []string{"test"}, h)
    
      var count int
      c.Subscribe(func(msg *sarama.ConsumerMessage) {
      	log.Printf("offset: %d\n", msg.Offset)
      	if count == 10 {
      		_ = c.Consumer.CommitOffsets()
      		c.Cancel()
      	}
      	count++
      })
    

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumHandler

type ConsumHandler struct {
	AtSubscribe ConsumedMessageHandler
	AtError     stream.ErrHandler
	AtNotified  NotificationHandler
}

ConsumHandler consum 이벤트에 맞춰 실행될 핸들러

func DefaultConsumHandler

func DefaultConsumHandler() *ConsumHandler

DefaultConsumHandler 기본 핸들러 생성

type ConsumedMessageHandler

type ConsumedMessageHandler func(*sarama.ConsumerMessage)

ConsumedMessageHandler consumer가 메세지를 받을때, 실행된다.

type ConsumerGroup

type ConsumerGroup struct {
	Consumer *cluster.Consumer
	Handler  ConsumHandler
	// contains filtered or unexported fields
}

ConsumerGroup sarama cluster를 이용한 consumer

func NewConsumerGroup

func NewConsumerGroup(groupName string, addrs, topics []string, handler *ConsumHandler) *ConsumerGroup

NewConsumerGroup consumer group 생성

func (*ConsumerGroup) Cancel

func (cg *ConsumerGroup) Cancel()

Cancel 구독을 취소시킨다.

func (*ConsumerGroup) Subscribe

func (cg *ConsumerGroup) Subscribe(call ConsumedMessageHandler)

Subscribe broker로 부터 메세지를 구독한다.

type NotificationHandler

type NotificationHandler func(*cluster.Notification)

NotificationHandler consumer가 broker로 부터 알림을 받을때, 실행

type ProceedMessageHandler

type ProceedMessageHandler func(*sarama.ProducerMessage)

ProceedMessageHandler 메세지가 broker로 전달된 후 실행되는 핸들러

type ProducerStream

type ProducerStream struct {
	Producer  sarama.AsyncProducer
	AfterSend ProceedMessageHandler
	ErrFrom   stream.ErrHandler
	*stream.Observer
	// contains filtered or unexported fields
}

ProducerStream sarama AsyncProducer를 이용한 stream 구조체

func NewProducerStream

func NewProducerStream(addrs []string, obv *stream.Observer) *ProducerStream

NewProducerStream stream 생성

func (*ProducerStream) Publish

func (ps *ProducerStream) Publish(target func())

Publish Observable의 데이터를 구독한 후 broker로 메세지를 전송한다.

func (*ProducerStream) Send

func (ps *ProducerStream) Send(msg *sarama.ProducerMessage)

Send ProducerMessage를 Subscribable에 전달

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL