model

package
v0.0.0-...-f73c795 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2016 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ASYNC_EXPIRE int = 1000
View Source
var CONSUMERGROUP string = "default"

Get the kafka consumer for asynchronous batch requests

View Source
var GetAsyncBatchProducer = func() (sarama.SyncProducer, error) {
	return NewAsyncBatchProducer(ZOOKEEPER)
}

Get the kafka producer for asynchronous batch requests

View Source
var GetAsyncJobRedis = func() *redis.Client {
	return NewRedisClient(REDIS_HOST,
		REDIS_PORT,
		REDIS_PW,
		strconv.Itoa(REDIS_DB))
}

Get a redis instance for the async jobs

View Source
var GetRequestClient = func() BatchClient {
	return &http.Client{}
}

Get the client to use for the http requests

View Source
var HEAD_OFFSETS int64 = 0
View Source
var HostMap map[string]string

Contains the mapping for internal services

View Source
var NewAsyncBatchProducer = func(zookeeperConn string) (sarama.SyncProducer, error) {
	zookeeper, err := kazoo.NewKazooFromConnectionString(zookeeperConn, kazoo.NewConfig())
	if err != nil {
		log.Fatalln("An error occurred connecting to zookeeper", err)
		return nil, err
	}

	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 3
	config.Producer.Retry.Backoff = 100 * time.Millisecond

	brokerList, err := zookeeper.BrokerList()
	if err != nil {
		log.Fatalln("An error occurred getting broker list from zookeeper", err)
		return nil, err
	}

	producer, err := sarama.NewSyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
		return nil, err
	}

	return producer, nil
}

NewAsyncBatchProducer gets a new producer instance for producing batch item messages to kafka Can be overridden for mocking kafka

View Source
var NewRedisClient = func(host, port, password, db string) *redis.Client {
	return redis.NewTCPClient(&redis.Options{
		Addr:     fmt.Sprintf("%s:%s", REDIS_HOST, REDIS_PORT),
		Password: REDIS_PW,
		DB:       int64(REDIS_DB),
	})
}

Used to retrieve a new Redis client

View Source
var REDIS_DB int = 0
View Source
var REDIS_HOST string = "default"
View Source
var REDIS_PORT string = "default"
View Source
var REDIS_PW string = "default"
View Source
var RESET_OFFSETS bool = false
View Source
var TOPIC string = "default"
View Source
var WORKER_SLEEP int = 100
View Source
var ZOOKEEPER string = "default"

Functions

func NewAsyncBatchConsumer

func NewAsyncBatchConsumer(zookeeperConn, consumerGroup, topic string, headOffset int64, resetOffsets bool) (*consumergroup.ConsumerGroup, error)

creates the task consumer for the kafka queues

func StartAsyncWorker

func StartAsyncWorker(workerNum int, quit chan bool, finished chan bool)

Starts a new background worker task to process asynchronous batch items from kafka/redis

func StartAsyncWorkers

func StartAsyncWorkers(numWorkers int, quit chan bool, finished chan bool)

Starts a bunch of background worker tasks to process asynchronous batch items from kafka/redis

Types

type AsyncBatchItem

type AsyncBatchItem struct {
	RequestID  string    `json:"requestId"`
	Index      int64     `json:"idx"`
	Item       BatchItem `json:"item"`
	IdentityID string    `json:"identityId"`
}

Struct used to write to kafka an asynchronous batch item request

type BatchClient

type BatchClient interface {
	Do(req *http.Request) (resp *http.Response, err error)
}

Interface for the http method "Do", useful for mocking requests/responses

type BatchItem

type BatchItem struct {
	Method  string            `json:"method"`
	URL     string            `json:"url"`
	Body    interface{}       `json:"body"`
	Headers map[string]string `json:"headers"`
}

A single batch item request

func (BatchItem) Do

func (batchItem BatchItem) Do(request *http.Request) (BatchResponseItem, error)

Make a request for this batch item

func (BatchItem) InternalURL

func (batchItem BatchItem) InternalURL() (string, error)

Get the URL to hit for an internal request batch item

func (BatchItem) NewExternalRequest

func (batchItem BatchItem) NewExternalRequest(identityID string) (*http.Request, error)

Create a request for this external request batch item

func (BatchItem) NewInternalRequest

func (batchItem BatchItem) NewInternalRequest(identityID string) (*http.Request, error)

Create a request for this internal request batch item

func (BatchItem) NewRequest

func (batchItem BatchItem) NewRequest(identityID string) (*http.Request, error)

Create a request for this request batch item

func (BatchItem) RequestItem

func (batchItem BatchItem) RequestItem(identityID string) (BatchResponseItem, error)

Request a single item from the BatchItems. Meant to be used asynchronously using a channel.

func (BatchItem) RequestItemAsync

func (batchItem BatchItem) RequestItemAsync(response chan interface{}, identityID string)

Request a single item from the BatchItems. Meant to be used asynchronously using a channel.

type BatchItems

type BatchItems []BatchItem

func (BatchItems) MakeError

func (batchItems BatchItems) MakeError(code int, err error) BatchResponseItem

Create an error message for the batch item

func (BatchItems) RunBatch

func (batchItems BatchItems) RunBatch(identityID string) BatchResponse

Runs all of the jobs in this list of batch items

func (BatchItems) RunBatchAsync

func (batchItems BatchItems) RunBatchAsync(identityID string) (string, error)

Runs all of the jobs in this list of batch items

type BatchResponse

type BatchResponse []BatchResponseItem

The list of responses for all of the batch item requests

func RetrieveAsyncResponse

func RetrieveAsyncResponse(requestID string) (BatchResponse, error)

Get a response for an async request

type BatchResponseItem

type BatchResponseItem struct {
	Code    int               `json:"code"`
	Body    interface{}       `json:"body"`
	Headers map[string]string `json:"headers"`
}

The response for a single item in a batch

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL