Documentation ¶
Index ¶
- Variables
- func NewAsyncBatchConsumer(zookeeperConn, consumerGroup, topic string, headOffset int64, ...) (*consumergroup.ConsumerGroup, error)
- func StartAsyncWorker(workerNum int, quit chan bool, finished chan bool)
- func StartAsyncWorkers(numWorkers int, quit chan bool, finished chan bool)
- type AsyncBatchItem
- type BatchClient
- type BatchItem
- func (batchItem BatchItem) Do(request *http.Request) (BatchResponseItem, error)
- func (batchItem BatchItem) InternalURL() (string, error)
- func (batchItem BatchItem) NewExternalRequest(identityID string) (*http.Request, error)
- func (batchItem BatchItem) NewInternalRequest(identityID string) (*http.Request, error)
- func (batchItem BatchItem) NewRequest(identityID string) (*http.Request, error)
- func (batchItem BatchItem) RequestItem(identityID string) (BatchResponseItem, error)
- func (batchItem BatchItem) RequestItemAsync(response chan interface{}, identityID string)
- type BatchItems
- type BatchResponse
- type BatchResponseItem
Constants ¶
This section is empty.
Variables ¶
var ASYNC_EXPIRE int = 1000
var CONSUMERGROUP string = "default"
var GetAsyncBatchConsumer = func() (*consumergroup.ConsumerGroup, error) { return NewAsyncBatchConsumer(ZOOKEEPER, CONSUMERGROUP, TOPIC, HEAD_OFFSETS, RESET_OFFSETS) }
Get the kafka consumer for asynchronous batch requests
var GetAsyncBatchProducer = func() (sarama.SyncProducer, error) { return NewAsyncBatchProducer(ZOOKEEPER) }
Get the kafka producer for asynchronous batch requests
var GetAsyncJobRedis = func() *redis.Client { return NewRedisClient(REDIS_HOST, REDIS_PORT, REDIS_PW, strconv.Itoa(REDIS_DB)) }
Get a redis instance for the async jobs
var GetRequestClient = func() BatchClient { return &http.Client{} }
Get the client to use for the http requests
var HEAD_OFFSETS int64 = 0
var HostMap map[string]string
Contains the mapping for internal services
var NewAsyncBatchProducer = func(zookeeperConn string) (sarama.SyncProducer, error) { zookeeper, err := kazoo.NewKazooFromConnectionString(zookeeperConn, kazoo.NewConfig()) if err != nil { log.Fatalln("An error occurred connecting to zookeeper", err) return nil, err } config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max = 3 config.Producer.Retry.Backoff = 100 * time.Millisecond brokerList, err := zookeeper.BrokerList() if err != nil { log.Fatalln("An error occurred getting broker list from zookeeper", err) return nil, err } producer, err := sarama.NewSyncProducer(brokerList, config) if err != nil { log.Fatalln("Failed to start Sarama producer:", err) return nil, err } return producer, nil }
NewAsyncBatchProducer gets a new producer instance for producing batch item messages to kafka Can be overridden for mocking kafka
var NewRedisClient = func(host, port, password, db string) *redis.Client { return redis.NewTCPClient(&redis.Options{ Addr: fmt.Sprintf("%s:%s", REDIS_HOST, REDIS_PORT), Password: REDIS_PW, DB: int64(REDIS_DB), }) }
Used to retrieve a new Redis client
var REDIS_DB int = 0
var REDIS_HOST string = "default"
var REDIS_PORT string = "default"
var REDIS_PW string = "default"
var RESET_OFFSETS bool = false
var TOPIC string = "default"
var WORKER_SLEEP int = 100
var ZOOKEEPER string = "default"
Functions ¶
func NewAsyncBatchConsumer ¶
func NewAsyncBatchConsumer(zookeeperConn, consumerGroup, topic string, headOffset int64, resetOffsets bool) (*consumergroup.ConsumerGroup, error)
creates the task consumer for the kafka queues
func StartAsyncWorker ¶
Starts a new background worker task to process asynchronous batch items from kafka/redis
func StartAsyncWorkers ¶
Starts a bunch of background worker tasks to process asynchronous batch items from kafka/redis
Types ¶
type AsyncBatchItem ¶
type AsyncBatchItem struct { RequestID string `json:"requestId"` Index int64 `json:"idx"` Item BatchItem `json:"item"` IdentityID string `json:"identityId"` }
Struct used to write to kafka an asynchronous batch item request
type BatchClient ¶
Interface for the http method "Do", useful for mocking requests/responses
type BatchItem ¶
type BatchItem struct { Method string `json:"method"` URL string `json:"url"` Body interface{} `json:"body"` Headers map[string]string `json:"headers"` }
A single batch item request
func (BatchItem) Do ¶
func (batchItem BatchItem) Do(request *http.Request) (BatchResponseItem, error)
Make a request for this batch item
func (BatchItem) InternalURL ¶
Get the URL to hit for an internal request batch item
func (BatchItem) NewExternalRequest ¶
Create a request for this external request batch item
func (BatchItem) NewInternalRequest ¶
Create a request for this internal request batch item
func (BatchItem) NewRequest ¶
Create a request for this request batch item
func (BatchItem) RequestItem ¶
func (batchItem BatchItem) RequestItem(identityID string) (BatchResponseItem, error)
Request a single item from the BatchItems. Meant to be used asynchronously using a channel.
func (BatchItem) RequestItemAsync ¶
Request a single item from the BatchItems. Meant to be used asynchronously using a channel.
type BatchItems ¶
type BatchItems []BatchItem
func (BatchItems) MakeError ¶
func (batchItems BatchItems) MakeError(code int, err error) BatchResponseItem
Create an error message for the batch item
func (BatchItems) RunBatch ¶
func (batchItems BatchItems) RunBatch(identityID string) BatchResponse
Runs all of the jobs in this list of batch items
func (BatchItems) RunBatchAsync ¶
func (batchItems BatchItems) RunBatchAsync(identityID string) (string, error)
Runs all of the jobs in this list of batch items
type BatchResponse ¶
type BatchResponse []BatchResponseItem
The list of responses for all of the batch item requests
func RetrieveAsyncResponse ¶
func RetrieveAsyncResponse(requestID string) (BatchResponse, error)
Get a response for an async request
type BatchResponseItem ¶
type BatchResponseItem struct { Code int `json:"code"` Body interface{} `json:"body"` Headers map[string]string `json:"headers"` }
The response for a single item in a batch