flow

package
v0.13.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2021 License: MIT Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Constant errkit.Error values; each carries a fixed human-readable message.
	ErrConvertKafkaMessage   = errkit.Error("Convert KafkaMessage Failed")
	ErrStore                 = errkit.Error("Store Failed")
	ErrElasticsearchClient   = errkit.Error("Elasticsearch Client Failed")
	ErrConsumerWorker        = errkit.Error("Consumer Worker Failed")
	ErrMakeKafkaAdmin        = errkit.Error("Make kafka admin failed")
	ErrMakeNewTopicWorker    = errkit.Error("Make new topic worker failed")
	ErrSpawnWorkerOnNewTopic = errkit.Error("Spawn worker on new topic failed")
	// NOTE(review): the message says "Span" while the identifier says "Spawn" —
	// looks like a typo; confirm nothing matches on the text before changing it.
	ErrSpawnWorker           = errkit.Error("Span worker failed")
	ErrHaltWorker            = errkit.Error("Consumer Worker Halted")

	// PrefixEventGroupID is a plain string constant; presumably prepended to a
	// consumer group ID for the new-topic event worker — TODO confirm.
	PrefixEventGroupID = "nte"
)
View Source
const (
	// Constant errkit.Error values naming the two parse failures, one for JSON
	// and one for Protobuf input.
	JsonParseError  = errkit.Error("JSON Parse Error")
	ProtoParseError = errkit.Error("Protobuf Parse Error")
)
View Source
const (
	// Constant errkit.Error values whose messages cover Kafka producer setup,
	// Kafka connection retries, and gRPC / REST reverse-proxy serving.
	ErrMakeSyncProducer       = errkit.Error("Make sync producer failed")
	ErrKafkaRetryLimitReached = errkit.Error("Error connecting to kafka, retry limit reached")
	ErrInitGrpc               = errkit.Error("Failed to listen to gRPC address")
	ErrRegisterGrpc           = errkit.Error("Error registering gRPC server endpoint into reverse proxy")
	ErrReverseProxy           = errkit.Error("Error serving REST reverse proxy")
)
View Source
const (
	// DEFAULT_ELASTIC_DOCUMENT_TYPE is the string "_doc"; presumably the document
	// type used when writing to Elasticsearch — TODO confirm against the callers.
	// NOTE(review): the name does not follow Go's MixedCaps convention, but it is
	// exported, so renaming it would break importers.
	DEFAULT_ELASTIC_DOCUMENT_TYPE = "_doc"
)
View Source
const (
	// RetrieveMessageFailedError is a constant errkit.Error with the message
	// "Retrieve message failed".
	RetrieveMessageFailedError = errkit.Error("Retrieve message failed")
)

Variables

This section is empty.

Functions

func Contains

func Contains(s []string, e string) bool

func ConvertKafkaMessageToTimber

func ConvertKafkaMessageToTimber(message *sarama.ConsumerMessage) (timber pb.Timber, err error)

func ConvertTimberToEsDocumentString added in v0.13.0

func ConvertTimberToEsDocumentString(timber pb.Timber, m *jsonpb.Marshaler) string

func ConvertTimberToKafkaMessage

func ConvertTimberToKafkaMessage(timber *pb.Timber, topic string) *sarama.ProducerMessage

func GetApplicationSecretCollection

func GetApplicationSecretCollection() []string

func InstruApplicationSecret

func InstruApplicationSecret(appSecret string)

func NewDummyKafkaFactory

func NewDummyKafkaFactory() *dummyKafkaFactory

func NewDummyRateLimiter

func NewDummyRateLimiter() *dummyRateLimiter

func NewElastic

func NewElastic(retrierFunc *ElasticRetrier, esConfig esConfig, urls []string, elasticUsername string, elasticPassword string) (client elasticClient, err error)

func NewEsConfig

func NewEsConfig(indexMethod string, bulkSize int, flushMs time.Duration, printTPS bool) esConfig

Types

type ApplicationSecretCollection

// ApplicationSecretCollection has no exported fields in this listing; its
// contents are hidden by the documentation renderer.
type ApplicationSecretCollection struct {
	// contains filtered or unexported fields
}

type BaritoConsumerService

// BaritoConsumerService is the interface returned by NewBaritoConsumerService.
type BaritoConsumerService interface {
	// Start returns an error value; Close returns nothing.
	Start() error
	Close()
	// WorkerMap returns ConsumerWorker values keyed by string
	// (presumably the topic name — TODO confirm).
	WorkerMap() map[string]ConsumerWorker
	// NewTopicEventWorker returns a single ConsumerWorker
	// (presumably the worker watching new-topic events — TODO confirm).
	NewTopicEventWorker() ConsumerWorker
}

func NewBaritoConsumerService

func NewBaritoConsumerService(params map[string]interface{}) BaritoConsumerService

type ClusterConsumer

// ClusterConsumer declares the consumer methods as an interface so that a
// substitute for cluster.Consumer can be supplied in tests. NewConsumerWorker
// accepts one and KafkaFactory.MakeClusterConsumer returns one.
type ClusterConsumer interface {
	// Messages, Notifications and Errors each return a receive-only channel.
	Messages() <-chan *sarama.ConsumerMessage
	Notifications() <-chan *cluster.Notification
	Errors() <-chan error
	// MarkOffset takes a consumed message plus a metadata string and returns nothing.
	MarkOffset(msg *sarama.ConsumerMessage, metadata string)
	Close() error
}

Interfacing cluster.Consumer for testing purposes

type ConsumerWorker

// ConsumerWorker is the interface returned by NewConsumerWorker, which builds
// it from a name and a ClusterConsumer.
type ConsumerWorker interface {
	// Lifecycle methods; none of them return a value.
	Start()
	Stop()
	Halt()
	// IsStart returns a bool (presumably whether the worker is running — TODO confirm).
	IsStart() bool
	// Each On* method takes a function whose argument is an error, a consumed
	// message, or a cluster notification respectively; presumably it is stored
	// as the callback for that event — TODO confirm.
	OnError(f func(error))
	OnSuccess(f func(*sarama.ConsumerMessage))
	OnNotification(f func(*cluster.Notification))
}

func NewConsumerWorker

func NewConsumerWorker(name string, consumer ClusterConsumer) ConsumerWorker

type ELasticTestHandler

// ELasticTestHandler has a ServeHTTP(http.ResponseWriter, *http.Request) method
// on its pointer receiver, so *ELasticTestHandler can be used as an http.Handler.
type ELasticTestHandler struct {
	// Integer fields, presumably the HTTP status codes returned for the
	// exists / create / post requests — TODO confirm against ServeHTTP.
	ExistAPIStatus  int
	CreateAPIStatus int
	PostAPIStatus   int
	// ResponseBody is a raw byte slice (presumably the body written back — TODO confirm).
	ResponseBody    []byte
	// CustomHandler has the same signature as an http.HandlerFunc; presumably it
	// overrides the default handling when set — TODO confirm.
	CustomHandler   func(w http.ResponseWriter, r *http.Request)
}

func (*ELasticTestHandler) ServeHTTP

func (handler *ELasticTestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)

type Elastic

// Elastic is an interface with a store operation, a failure hook and a client
// (re)creation method.
type Elastic interface {
	// OnFailure takes a function that receives a *pb.Timber.
	OnFailure(f func(*pb.Timber))
	// Store takes a context and a pb.Timber by value and returns an error.
	Store(ctx context.Context, timber pb.Timber) error
	// NewClient takes no arguments and returns nothing.
	NewClient()
}

type ElasticRetrier

// ElasticRetrier has no exported fields in this listing. NewElasticRetrier
// builds one from a time.Duration, an int and an error callback, and the type
// has a Retry method.
type ElasticRetrier struct {
	// contains filtered or unexported fields
}

func NewElasticRetrier

func NewElasticRetrier(t time.Duration, n int, f func(err error)) *ElasticRetrier

func (*ElasticRetrier) Retry

func (r *ElasticRetrier) Retry(ctx context.Context, retry int, req *http.Request, resp *http.Response, err error) (time.Duration, bool, error)

type KafkaAdmin

// KafkaAdmin is the interface returned by NewKafkaAdmin and by
// KafkaFactory.MakeKafkaAdmin.
type KafkaAdmin interface {
	// RefreshTopics returns an error value.
	RefreshTopics() error
	// SetTopics, Topics and AddTopic write and read a list of topic names.
	SetTopics([]string)
	Topics() []string
	AddTopic(topic string)
	// Exist returns a bool for the given topic name.
	Exist(topic string) bool
	// CreateTopic takes the topic name, a partition count and a replication factor.
	CreateTopic(topic string, numPartitions int32, replicationFactor int16) error
	Close()
}

func NewKafkaAdmin

func NewKafkaAdmin(client sarama.Client) (admin KafkaAdmin, err error)

type KafkaFactory

// KafkaFactory is the interface returned by NewKafkaFactory. Each method returns
// one Kafka-related value together with an error.
type KafkaFactory interface {
	MakeKafkaAdmin() (admin KafkaAdmin, err error)
	// MakeClusterConsumer takes a group ID, a topic and an initial offset.
	MakeClusterConsumer(groupID, topic string, initialOffset int64) (worker ClusterConsumer, err error)
	MakeSyncProducer() (producer sarama.SyncProducer, err error)
}

func NewKafkaFactory

func NewKafkaFactory(brokers []string, config *sarama.Config) KafkaFactory

type LeakyBucket

// LeakyBucket has no exported fields in this listing. NewLeakyBucket creates one
// from an int32 maximum, and its methods are IsFull, Max, Refill, Take, Token
// and UpdateMax.
type LeakyBucket struct {
	// contains filtered or unexported fields
}

func NewLeakyBucket

func NewLeakyBucket(max int32) *LeakyBucket

func (*LeakyBucket) IsFull

func (b *LeakyBucket) IsFull() bool

func (*LeakyBucket) Max

func (b *LeakyBucket) Max() int32

func (*LeakyBucket) Refill

func (l *LeakyBucket) Refill()

func (*LeakyBucket) Take

func (l *LeakyBucket) Take(count int) bool

func (*LeakyBucket) Token

func (b *LeakyBucket) Token() int32

func (*LeakyBucket) UpdateMax

func (b *LeakyBucket) UpdateMax(newMax int32)

type ProducerService added in v0.13.0

// ProducerService is the interface returned by NewProducerService.
type ProducerService interface {
	// Embeds every method of pb.ProducerServer.
	pb.ProducerServer
	// Start and LaunchREST each return an error value; Close returns nothing.
	Start() error
	LaunchREST() error
	Close()
}

func NewProducerService added in v0.13.0

func NewProducerService(params map[string]interface{}) ProducerService

type RateLimiter

// RateLimiter is the interface returned by NewRateLimiter. It holds LeakyBucket
// values addressed by topic name.
type RateLimiter interface {
	// IsHitLimit takes a topic, a count and a max-token value whose name suggests
	// it applies when the topic has no bucket yet — TODO confirm.
	IsHitLimit(topic string, count int, maxTokenIfNotExist int32) bool
	Start()
	Stop()
	// IsStart returns a bool (presumably whether the limiter is running — TODO confirm).
	IsStart() bool
	// PutBucket takes a topic and a bucket; Bucket returns a *LeakyBucket for a topic.
	PutBucket(topic string, bucket *LeakyBucket)
	Bucket(topic string) *LeakyBucket
}

func NewRateLimiter

func NewRateLimiter(duration int) RateLimiter

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL