es_writer

Version: v0.18.4
Published: Jul 7, 2022 License: MIT Imports: 19 Imported by: 0

README

Elastic Search writer

Problems

Conflicts arise easily when multiple services write data to the same Elastic Search server.

To avoid this problem, a service should publish a message to RabbitMQ instead of writing to ES directly. ES-Writer then acts as the single actor that connects to Elastic Search.

With this convention, services don't need to know the credentials of the Elastic Search server.

It is also easy to set up clustered Elastic Search servers without magic.

Usage

Queueing

To write data to the ES server, your service publishes messages to the GO1 RabbitMQ with the following properties:

  • Exchange: $exchange
  • Key: $routingKey
  • Payload: OBJECT
    • Examples:
      • fixtures/indices/indices-create.json
      • fixtures/indices/indices-drop.json
      • fixtures/portal/portal-index.json
      • fixtures/portal/portal-update.json
      • fixtures/portal/portal-update-by-query.json
      • fixtures/portal/portal-delete.json
      • fixtures/portal/portal-delete-by-query.json

Reference: https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-bulk.html

Consuming
/path/to/es-writer
    -serviceName            SERVICE_NAME<STRING=es-writer>
    -url                    RABBITMQ_URL<STRING=amqp://go1:go1@127.0.0.1:5672/>
    -kind                   RABBITMQ_KIND<STRING=topic>
    -exchange               RABBITMQ_EXCHANGE<STRING=events>
    -routing-key            RABBITMQ_ROUTING_KEY<STRING=es.writer.go1>
    -consumer-name          RABBITMQ_CONSUMER_NAME<STRING=es-writer>
    -queue-name             RABBITMQ_QUEUE_NAME<STRING=es-writer>
    -url-contains           URL_CONTAINS<STRING>     # example: `award:`
    -url-not-contains       URL_NOT_CONTAINS<STRING> # example: `award:`
    -prefetch-count         RABBITMQ_PREFETCH_COUNT<INT=50>
    -es-url                 ELASTIC_SEARCH_URL<STRING=http://127.0.0.1:9200/?sniff=false>
    -refresh                true # optional; allowed values: true, wait_for; default is false
    -admin-port             ADMIN_PORT<STRING=:8001>
    -debug                  DEBUG<BOOL=true> # default is false
    -single-active-consumer SINGLE_ACTIVE_CONSUMER<BOOL=true> # default is false; see https://www.rabbitmq.com/consumers.html#single-active-consumer
    -bulk-timeout           BULK_TIMEOUT=5m
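
The listing above appears to pair each flag with an environment variable name, a type, and a default value. As a rough illustration of that pattern only, the sketch below wires a few of those options with Go's standard `flag` package. The `consumerOptions` type and the `envOr` and `parseConsumerOptions` helpers are hypothetical names introduced here, and the precedence shown (flag, then environment variable, then default) is an assumption rather than es-writer's confirmed behaviour.

```go
package main

import (
	"flag"
	"fmt"
	"os"
	"strconv"
)

// consumerOptions holds an illustrative subset of the options listed above.
// It is a hypothetical type, not part of the es_writer package.
type consumerOptions struct {
	URL           string
	Exchange      string
	RoutingKey    string
	PrefetchCount int
	EsURL         string
}

// envOr returns the value of the environment variable key,
// or fallback when the variable is unset or empty.
func envOr(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

// parseConsumerOptions parses args, using environment variables
// (and then the documented defaults) for flags that are not given.
// The flag-over-environment precedence is an assumption.
func parseConsumerOptions(args []string) (consumerOptions, error) {
	var o consumerOptions

	prefetch, err := strconv.Atoi(envOr("RABBITMQ_PREFETCH_COUNT", "50"))
	if err != nil {
		return o, fmt.Errorf("RABBITMQ_PREFETCH_COUNT is not an integer: %w", err)
	}

	fs := flag.NewFlagSet("es-writer", flag.ContinueOnError)
	fs.StringVar(&o.URL, "url", envOr("RABBITMQ_URL", "amqp://go1:go1@127.0.0.1:5672/"), "RabbitMQ URL")
	fs.StringVar(&o.Exchange, "exchange", envOr("RABBITMQ_EXCHANGE", "events"), "RabbitMQ exchange")
	fs.StringVar(&o.RoutingKey, "routing-key", envOr("RABBITMQ_ROUTING_KEY", "es.writer.go1"), "RabbitMQ routing key")
	fs.IntVar(&o.PrefetchCount, "prefetch-count", prefetch, "RabbitMQ prefetch count")
	fs.StringVar(&o.EsURL, "es-url", envOr("ELASTIC_SEARCH_URL", "http://127.0.0.1:9200/?sniff=false"), "Elastic Search URL")

	if err := fs.Parse(args); err != nil {
		return o, err
	}
	return o, nil
}

func main() {
	o, err := parseConsumerOptions([]string{"-exchange", "events", "-prefetch-count", "10"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", o)
}
```

Using a dedicated `flag.FlagSet` rather than the global one keeps the parsing function easy to call from tests with different argument lists.
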
Notes

Limit the following kinds of requests, because they cannot be sent in bulk:

  • /_update_by_query
  • /_delete_by_query
  • PUT /index_name
  • DELETE /index_name
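
To make the list above concrete, here is a minimal sketch of how a producer might check whether a request falls into one of these non-bulkable categories before publishing it. The `isBulkable` function is a hypothetical helper written for this illustration; it is not taken from es-writer, and its rule for index-level requests (a `PUT` or `DELETE` whose path has a single segment) is an assumption derived from the list.

```go
package main

import (
	"fmt"
	"strings"
)

// isBulkable reports whether a request looks like one that can be
// batched, based only on the four non-bulkable kinds listed above.
func isBulkable(method, uri string) bool {
	// Ignore any query string and surrounding slashes.
	path := uri
	if i := strings.IndexByte(path, '?'); i >= 0 {
		path = path[:i]
	}
	path = strings.Trim(path, "/")

	// /_update_by_query and /_delete_by_query are not bulkable.
	if strings.HasSuffix(path, "_update_by_query") || strings.HasSuffix(path, "_delete_by_query") {
		return false
	}

	// PUT /index_name and DELETE /index_name (a single path segment)
	// create or drop a whole index and are not bulkable either.
	method = strings.ToUpper(method)
	if (method == "PUT" || method == "DELETE") && path != "" && !strings.Contains(path, "/") {
		return false
	}
	return true
}

func main() {
	fmt.Println(isBulkable("POST", "/my_index/portal/_update_by_query"))
	fmt.Println(isBulkable("PUT", "/my_index"))
	fmt.Println(isBulkable("POST", "/my_index/portal/111/_update"))
}
```

The index name `my_index` and the document paths are placeholders chosen for the example.
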

Test locally
docker run -d -p 9200:9200                --rm --name es       elasticsearch:5-alpine
docker run -d -p 5672:5672 -p 15672:15672 --rm --name rabbitmq rabbitmq:3-management
go test -race -v ./...

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewClientFromConfig added in v0.17.10

func NewClientFromConfig(cfg *config.Config, disableKeepAlive bool) (*elastic.Client, error)

Types

type App

type App struct {
	// contains filtered or unexported fields
}

func (*App) Run

func (this *App) Run(ctx context.Context, container Configuration) error

type Configuration added in v0.18.4

type Configuration struct {
	Url           *string
	Kind          *string
	Exchange      *string
	RoutingKey    *string
	PrefetchCount *int
	PrefetchSize  *int
	TickInterval  *time.Duration
	QueueName     *string
	UrlContain    *string
	UrlNotContain *string
	ConsumerName  *string
	EsUrl         *string
	AdminPort     *string
	Debug         *bool
	Refresh       *string
	DataDog       DataDogConfig

	Stop                 chan bool
	SingleActiveConsumer *bool
	BulkTimeoutString    *string
	// contains filtered or unexported fields
}
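
The `BulkTimeoutString` field carries the bulk timeout as text (the flag listing shows `5m`). A plausible way to turn such a value into a duration is `time.ParseDuration` from the standard library, sketched below. The `parseBulkTimeout` helper is hypothetical, and falling back to five minutes when the value is missing is an assumption based on the flag listing, not confirmed package behaviour.

```go
package main

import (
	"fmt"
	"time"
)

// defaultBulkTimeout mirrors the 5m value shown in the flag listing.
// Treating it as the fallback is an assumption.
const defaultBulkTimeout = 5 * time.Minute

// parseBulkTimeout converts a textual timeout such as "5m" or "90s"
// into a time.Duration. A nil or empty value yields the fallback.
func parseBulkTimeout(s *string) (time.Duration, error) {
	if s == nil || *s == "" {
		return defaultBulkTimeout, nil
	}
	d, err := time.ParseDuration(*s)
	if err != nil {
		return 0, fmt.Errorf("invalid bulk timeout %q: %w", *s, err)
	}
	if d <= 0 {
		return 0, fmt.Errorf("bulk timeout must be positive, got %s", d)
	}
	return d, nil
}

func main() {
	v := "90s"
	d, err := parseBulkTimeout(&v)
	fmt.Println(d, err)
}
```

Accepting a `*string` matches the pointer-typed fields of `Configuration`, which is the shape returned by helpers such as `flag.String`.
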

func NewConfiguration added in v0.18.4

func NewConfiguration(logger *zap.Logger) Configuration

func (*Configuration) App added in v0.18.4

func (this *Configuration) App() (*App, error, chan bool)

type DataDogConfig

type DataDogConfig struct {
	Host        string
	Port        string
	ServiceName string
	Env         string
}

type PushCallback

type PushCallback func(context.Context, amqp.Delivery) (err error, ack bool, buff bool, flush bool)

type RabbitMqInput

type RabbitMqInput struct {
	// contains filtered or unexported fields
}
