worker

package
v3.1.11+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 3, 2021 License: MIT Imports: 31 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var InvalidMessageArray = "array must be of the form [jobId, appName, users]"

InvalidMessageArray is the string returned when the message array of the process batch worker is not valid

Functions

func BuildMessageFromTemplate added in v0.5.0

func BuildMessageFromTemplate(template model.Template, context map[string]interface{}) (string, error)

BuildMessageFromTemplate build a message using a template and the context

func BuildTopicName added in v0.5.0

func BuildTopicName(appName, service, topicTemplate string) string

BuildTopicName builds a topic name based in appName, service and a template

func CompressUsers

func CompressUsers(users *[]User) (string, error)

CompressUsers compresses users payload for enqueuing the message

func GetPushDBTableName added in v0.5.0

func GetPushDBTableName(appName, service string) string

GetPushDBTableName get the table name using appName and service

func GetTimeOffsetFromUTCInSeconds added in v0.5.0

func GetTimeOffsetFromUTCInSeconds(tz string, l zap.Logger) (int, error)

GetTimeOffsetFromUTCInSeconds returns the offset in seconds from UTC for tz

func GetWhereClauseFromFilters added in v0.5.0

func GetWhereClauseFromFilters(filters map[string]interface{}) string

GetWhereClauseFromFilters returns a string cointaining the where clause to use in the query

func IsUserIDValid added in v1.0.0

func IsUserIDValid(userID string) bool

IsUserIDValid tests whether a userID is valid or not

func RandomElementFromSlice

func RandomElementFromSlice(elements []string) string

RandomElementFromSlice gets a random element from a slice

Types

type Batch added in v0.5.0

type Batch struct {
	UserIds *[]string
	PageID  int
}

Batch is a struct that helps tracking processes pages

type BatchPart

type BatchPart struct {
	Start      int
	Size       int
	TotalParts int
	TotalSize  int
	Part       int
	Job        model.Job
}

BatchPart hold the information of a batch

type BatchWorkerMessage added in v0.5.0

type BatchWorkerMessage struct {
	JobID   uuid.UUID
	AppName string
	Users   []User
}

BatchWorkerMessage is the batch worker message struct

func ParseProcessBatchWorkerMessageArray added in v0.5.0

func ParseProcessBatchWorkerMessageArray(arr []interface{}) (*BatchWorkerMessage, error)

ParseProcessBatchWorkerMessageArray parses the message array of the process batch worker

type CSVSplitWorker

type CSVSplitWorker struct {
	Workers *Worker
	Logger  zap.Logger
}

CSVSplitWorker is the CSVSplitWorker struct

func NewCSVSplitWorker

func NewCSVSplitWorker(workers *Worker) *CSVSplitWorker

NewCSVSplitWorker gets a new CSVSplitWorker

func (*CSVSplitWorker) Process

func (b *CSVSplitWorker) Process(message *workers.Msg)

Process processes the messages sent to batch worker queue

type CreateBatchesWorker added in v0.5.0

type CreateBatchesWorker struct {
	Workers *Worker
	Logger  zap.Logger
}

CreateBatchesWorker is the CreateBatchesWorker struct

func NewCreateBatchesWorker added in v0.5.0

func NewCreateBatchesWorker(workers *Worker) *CreateBatchesWorker

NewCreateBatchesWorker gets a new CreateBatchesWorker

func (*CreateBatchesWorker) Process added in v0.5.0

func (b *CreateBatchesWorker) Process(message *workers.Msg)

Process processes the messages sent to batch worker queue

func (*CreateBatchesWorker) ReadFromCSV

func (b *CreateBatchesWorker) ReadFromCSV(buffer *[]byte, job *model.Job) []string

ReadFromCSV reads CSV from S3 and return correspondent array of strings

type DBPage added in v0.5.0

type DBPage struct {
	Page       int
	SmallestID string
	BiggestID  string
	Last       bool
}

DBPage is a struct that helps create batches from filters jobs

type DirectPartMsg

type DirectPartMsg struct {
	SmallestSeqID uint64 // not in the interval
	BiggestSeqID  uint64 // in the interval
	JobUUID       uuid.UUID
}

DirectPartMsg saves information about a block to process

type DirectWorker

type DirectWorker struct {
	Logger  zap.Logger
	Workers *Worker
}

DirectWorker is the DirectWorker struct

func NewDirectWorker

func NewDirectWorker(workers *Worker) *DirectWorker

NewDirectWorker gets a new DirectWorker

func (*DirectWorker) Process

func (b *DirectWorker) Process(message *workers.Msg)

Process processes the messages sent to batch worker queue and send them to kafka

type JobCompletedWorker

type JobCompletedWorker struct {
	Workers *Worker
	Logger  zap.Logger
}

JobCompletedWorker is the JobCompletedWorker struct

func NewJobCompletedWorker

func NewJobCompletedWorker(workers *Worker) *JobCompletedWorker

NewJobCompletedWorker gets a new JobCompletedWorker

func (*JobCompletedWorker) Process

func (b *JobCompletedWorker) Process(message *workers.Msg)

Process processes the messages sent to worker queue

type ProcessBatchWorker

type ProcessBatchWorker struct {
	Logger  zap.Logger
	Workers *Worker
}

ProcessBatchWorker is the ProcessBatchWorker struct

func NewProcessBatchWorker added in v0.5.0

func NewProcessBatchWorker(workers *Worker) *ProcessBatchWorker

NewProcessBatchWorker gets a new ProcessBatchWorker

func (*ProcessBatchWorker) Process added in v0.5.0

func (b *ProcessBatchWorker) Process(message *workers.Msg)

Process processes the messages sent to batch worker queue and send them to kafka

type ResumeJobWorker added in v0.5.0

type ResumeJobWorker struct {
	Logger  zap.Logger
	Workers *Worker
}

ResumeJobWorker is the CreateBatchesUsingFiltersWorker struct

func NewResumeJobWorker added in v0.5.0

func NewResumeJobWorker(workers *Worker) *ResumeJobWorker

NewResumeJobWorker gets a new ResumeJobWorker

func (*ResumeJobWorker) Process added in v0.5.0

func (b *ResumeJobWorker) Process(message *workers.Msg)

Process processes the messages sent to worker queue

type SentBatches added in v0.5.0

type SentBatches struct {
	NumBatches  int
	TotalTokens int
}

SentBatches is a struct that helps tracking sent batches

type StageStatus

type StageStatus struct {
	JobID       string
	Stage       string
	StageKey    string
	Description string
	Client      *redis.Client

	MaxProgress     int
	CurrentProgress int
	Completed       bool

	SubStageStatus []*StageStatus
}

StageStatus holds information about a stage from a worker pipeline in Redis

func NewStageStatus

func NewStageStatus(client *redis.Client,
	jobID, stage, description string,
	maxProgress int) (*StageStatus, error)

NewStageStatus returns a new StageStatus instance

func (*StageStatus) IncrProgress

func (s *StageStatus) IncrProgress() error

IncrProgress increments the StageStatus progress by 1 unit

func (*StageStatus) NewSubStage

func (s *StageStatus) NewSubStage(
	description string,
	maxProgress int,
) (*StageStatus, error)

NewSubStage creates a new StageStatus from a previous one and add it to its SubStages list

type User added in v0.5.0

type User struct {
	UserID string `json:"user_id,omitempty" sql:"user_id"`
	Token  string `json:"token,omitempty" sql:"token"`
	Locale string `json:"locale,omitempty" sql:"locale"`
	Region string `json:"region,omitempty" sql:"region"`
	Tz     string `json:"tz,omitempty" sql:"tz"`
}

User is the struct that will keep users before sending them to send batches worker

type Worker

type Worker struct {
	Logger                    zap.Logger
	PushDB                    interfaces.DB
	MarathonDB                interfaces.DB
	Config                    *viper.Viper
	DBPageSize                int
	S3Client                  interfaces.S3
	PageProcessingConcurrency int
	Statsd                    *statsd.Client
	RedisClient               *redis.Client
	ConfigPath                string
	SendgridClient            *extensions.SendgridClient
	Kafka                     interfaces.PushProducer
}

Worker is the struct that will configure workers

func NewWorker added in v0.5.0

func NewWorker(l zap.Logger, configPath string) *Worker

NewWorker returns a configured worker

func (*Worker) CreateBatchesJob added in v0.5.0

func (w *Worker) CreateBatchesJob(part *BatchPart) (string, error)

CreateBatchesJob creates a new CreateBatchesWorker job

func (*Worker) CreateCSVSplitJob

func (w *Worker) CreateCSVSplitJob(job *model.Job) (string, error)

CreateCSVSplitJob creates a new CSVSplitWorker job

func (*Worker) CreateDirectBatchesJob

func (w *Worker) CreateDirectBatchesJob(job *model.Job) error

CreateDirectBatchesJob schedules a new DirectWorker job

func (*Worker) CreateProcessBatchJob added in v0.5.0

func (w *Worker) CreateProcessBatchJob(jobID string, appName string, users *[]User) (string, error)

CreateProcessBatchJob creates a new ProcessBatchWorker job

func (*Worker) CreateResumeJob added in v0.5.0

func (w *Worker) CreateResumeJob(jobID *[]string) (string, error)

CreateResumeJob creates a new ResumeJobWorker job

func (*Worker) GetJob

func (w *Worker) GetJob(jobID uuid.UUID) (*model.Job, error)

GetJob get a job from the db

func (*Worker) ScheduleCSVSplitJob

func (w *Worker) ScheduleCSVSplitJob(job *model.Job, at int64) (string, error)

ScheduleCSVSplitJob schedules a new CSVSplitWorker job

func (*Worker) ScheduleCreateBatchesFromFiltersJob added in v0.5.0

func (w *Worker) ScheduleCreateBatchesFromFiltersJob(jobID *[]string, at int64) (string, error)

ScheduleCreateBatchesFromFiltersJob schedules a new CreateBatchesWorker job

func (*Worker) ScheduleCreateBatchesJob added in v0.5.0

func (w *Worker) ScheduleCreateBatchesJob(jobID *[]string, at int64) (string, error)

ScheduleCreateBatchesJob schedules a new CreateBatchesWorker job

func (*Worker) ScheduleDirectBatchesJob

func (w *Worker) ScheduleDirectBatchesJob(job *model.Job, at int64) error

ScheduleDirectBatchesJob schedules a new DirectWorker job

func (*Worker) ScheduleJobCompletedJob

func (w *Worker) ScheduleJobCompletedJob(jobID string, at int64) (string, error)

ScheduleJobCompletedJob schedules a new JobCompletedWorker job

func (*Worker) ScheduleProcessBatchJob added in v0.5.0

func (w *Worker) ScheduleProcessBatchJob(jobID string, appName string, users *[]User, at int64) (string, error)

ScheduleProcessBatchJob schedules a new ProcessBatchWorker job

func (*Worker) SendControlGroupToRedis

func (w *Worker) SendControlGroupToRedis(job *model.Job, ids []string)

SendControlGroupToRedis send a sequency of users ids to redis

func (*Worker) Start

func (w *Worker) Start()

Start starts the worker

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL