Documentation ¶
Index ¶
- Variables
- type Config
- type DefaultRotator
- type Discollector
- func (d *Discollector) GetScrape(ctx context.Context, id ulid.ULID) (*Scrape, error)
- func (d *Discollector) LaunchScrape(pluginName string, cfg *Config) error
- func (d *Discollector) ListScrapes(ctx context.Context) ([]*Scrape, error)
- func (d *Discollector) Shutdown(ctx context.Context)
- func (d *Discollector) Start(workers int) error
- func (d *Discollector) StartScrape(ctx context.Context, pluginName string, config *Config) (string, error)
- type ErrorReporter
- type Handler
- type HandlerOpts
- type HandlerResponse
- type Limiter
- type MemMetastore
- type MemQueue
- func (mq *MemQueue) Finish(ctx context.Context, taskID ulid.ULID) error
- func (mq *MemQueue) Pop(ctx context.Context) (*QueuedTask, error)
- func (mq *MemQueue) Push(ctx context.Context, tasks []*QueuedTask) error
- func (mq *MemQueue) Status(ctx context.Context, scrapeID ulid.ULID) (*ScrapeStatus, error)
- type Metastore
- type NilLimiter
- type OptionFn
- type Plugin
- type Queue
- type QueuedTask
- type RateLimit
- type Registry
- type ReporterOpts
- type Reservation
- type Rotator
- type Schedule
- type ScheduleStore
- type Scheduler
- type Scrape
- type ScrapeStatus
- type StdoutReporter
- type StdoutWriter
- type Task
- type Worker
- type WorkerErr
- type Writer
Constants ¶
This section is empty.
Variables ¶
var (
	ErrPluginUnregistered = errors.New("discollect: plugin not registered")
	ErrHandlerNotFound    = errors.New("discollect: handler not found for route")
)
var (
ErrRateLimitExceeded = errors.New("discollect: rate limit exceeded")
)
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	// Name is a friendly identifier for this config
	Name string
	// Entrypoints is used to start a scrape
	Entrypoints []string
	// DynamicEntry specifies whether this config was created dynamically
	// or is a preset
	DynamicEntry bool
	// ComputedEntry computes the entrypoints when Entrypoints is nil
	// (e.g. in a time-based delta)
	ComputedEntry func(ctx context.Context, cfg *Config) error
	// Type names the kind of scrape; a Plugin can have many types of
	// scrapes, commonly "full" and "delta"
	Type string
	// Since is used to convey delta information
	Since time.Time
	// Countries is a list of countries this scrape can be executed from,
	// in two-letter ISO-3166-2 form; nil if unused
	Countries []string
}
Config is a specific configuration of a given plugin
type DefaultRotator ¶
type DefaultRotator struct {
// contains filtered or unexported fields
}
DefaultRotator is a no-op rotator that does no proxy rotation
func NewDefaultRotator ¶
func NewDefaultRotator() *DefaultRotator
NewDefaultRotator provisions a new default rotator
type Discollector ¶
type Discollector struct {
// contains filtered or unexported fields
}
A Discollector ties every element of Discollect together
func (*Discollector) LaunchScrape ¶
func (d *Discollector) LaunchScrape(pluginName string, cfg *Config) error
LaunchScrape starts a scrape run
func (*Discollector) ListScrapes ¶
func (d *Discollector) ListScrapes(ctx context.Context) ([]*Scrape, error)
ListScrapes lists all currently running scrapes
func (*Discollector) Shutdown ¶
func (d *Discollector) Shutdown(ctx context.Context)
Shutdown spins down all the workers after allowing them to finish their current tasks
func (*Discollector) Start ¶
func (d *Discollector) Start(workers int) error
Start starts the scraping loops
func (*Discollector) StartScrape ¶
func (d *Discollector) StartScrape(ctx context.Context, pluginName string, config *Config) (string, error)
StartScrape launches a new scrape
type ErrorReporter ¶
type ErrorReporter interface {
Report(ctx context.Context, ro *ReporterOpts, err error)
}
An ErrorReporter is used to forward faulty handler runs to a semi-permanent sink for later analysis. Generally this is a service such as Sentry or Bugsnag, but it may also be a simpler DB backend, like Postgres. An ErrorReporter should discard any calls with err == nil
type Handler ¶
type Handler func(ctx context.Context, ho *HandlerOpts, t *Task) *HandlerResponse
A Handler can handle an individual Task
type HandlerOpts ¶
type HandlerOpts struct {
	Config *Config

	// RouteParams are capture groups from the Route regexp
	RouteParams []string

	Client *http.Client
}
HandlerOpts are passed to a Handler
type HandlerResponse ¶
A HandlerResponse is returned from a Handler
func ErrorResponse ¶
func ErrorResponse(err error) *HandlerResponse
ErrorResponse is a helper for returning an error from a Handler
func Response ¶
func Response(facts []interface{}, tasks ...*Task) *HandlerResponse
Response is shorthand for a successful response
type Limiter ¶
type Limiter interface {
	// Reserve returns a Reservation that indicates how long the caller must
	// wait before the event happens. The Limiter takes this Reservation into
	// account when allowing future events.
	Reserve(rl *RateLimit, url string, scrapeID ulid.ULID) (Reservation, error)
}
A Limiter is used for per-site and per-config rate limits. It is abstracted into an interface so that distributed rate limiting is practical
type MemMetastore ¶
type MemMetastore struct{}
MemMetastore is a metastore that only stores information in memory
func (MemMetastore) StartScrape ¶
func (MemMetastore) StartScrape(ctx context.Context, pluginName string, cfg *Config) (ulid.ULID, error)
StartScrape creates an id and starts a scrape in memory
type MemQueue ¶
type MemQueue struct {
// contains filtered or unexported fields
}
A MemQueue is a super simple Queue backed by an array and a mutex
func (*MemQueue) Pop ¶
func (mq *MemQueue) Pop(ctx context.Context) (*QueuedTask, error)
Pop pops a single task off the left side of the array
type Metastore ¶
type Metastore interface {
	// StartScrape attempts to start the scrape, returning the new scrape ID
	// and a nil error if the scrape is able to be started
	StartScrape(ctx context.Context, pluginName string, cfg *Config) (id ulid.ULID, err error)

	EndScrape(ctx context.Context, id string, datums, tasks int) error
}
A Metastore is used to store the history of all scrape runs
type NilLimiter ¶
type NilLimiter struct{}
A NilLimiter is a Limiter that doesn't restrict anything
func (*NilLimiter) Reserve ¶
func (*NilLimiter) Reserve(rl *RateLimit, url string, scrapeID ulid.ULID) (Reservation, error)
Reserve returns a dummy reservation that always waits one second
type OptionFn ¶
type OptionFn func(d *Discollector) error
An OptionFn is used to pass options to a Discollector
func WithErrorReporter ¶
func WithErrorReporter(er ErrorReporter) OptionFn
WithErrorReporter sets the ErrorReporter for the Discollector
func WithLimiter ¶
WithLimiter sets the Limiter for the Discollector
func WithMetastore ¶
WithMetastore sets the Metastore for the Discollector
func WithRotator ¶
WithRotator sets the Rotator for the Discollector
func WithWriter ¶
WithWriter sets the Writer for the Discollector
type Plugin ¶
type Plugin struct {
	Name     string
	Schedule *Schedule
	Configs  []*Config

	// RateLimit is set per-plugin
	RateLimit *RateLimit

	// A ConfigValidator is used to validate dynamically loaded configs
	ConfigValidator func(*Config) error

	Routes map[string]Handler
}
A Plugin is capable of running scrapes, ideally of a common type or against a single site
type Queue ¶
type Queue interface {
	Pop(ctx context.Context) (*QueuedTask, error)
	Push(ctx context.Context, tasks []*QueuedTask) error
	Finish(ctx context.Context, taskID ulid.ULID) error
	Status(ctx context.Context, scrapeID ulid.ULID) (*ScrapeStatus, error)
}
A Queue is used to submit and retrieve individual tasks
type QueuedTask ¶
type QueuedTask struct {
	// set by the TaskQueue
	TaskID   ulid.ULID `json:"task_id"`
	ScrapeID ulid.ULID `json:"scrape_id"`
	QueuedAt time.Time `json:"queued_at"`
	Config   *Config   `json:"config"`
	Plugin   string    `json:"plugin"`
	Retries  int       `json:"retries"`
	Task     *Task     `json:"task"`
}
A QueuedTask is the struct for a task that goes on the Queue
type RateLimit ¶
type RateLimit struct {
	// PerIP is the rate, in requests per second, at which a single IP
	// can make requests
	PerIP float64
	// PerScrape is the rate, in requests per second, at which the entire
	// scrape can operate
	PerScrape float64
	// PerDomain is the rate, in requests per second, per domain, using
	// the public suffix list to differentiate domains
	PerDomain float64
}
RateLimit is a wrapper struct around a variety of per-config rate limits
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
A Registry stores and indexes all available plugins
func NewRegistry ¶
NewRegistry indexes a list of plugins and precomputes the routing table
type ReporterOpts ¶
ReporterOpts is used to attach additional information to an error
type Reservation ¶
type Reservation interface {
	// Cancel indicates that the reservation holder will not perform the
	// reserved action and reverses the effects of this Reservation on the rate
	// limit as much as possible, considering that other reservations may have
	// already been made.
	Cancel()

	// OK returns whether the limiter can provide the requested number of tokens
	// within the maximum wait time. If OK is false, Delay returns InfDuration,
	// and Cancel does nothing.
	OK() bool

	// Delay returns the duration for which the reservation holder must wait
	// before taking the reserved action. Zero duration means act immediately.
	// InfDuration means the limiter cannot grant the tokens requested in this
	// Reservation within the maximum wait time.
	Delay() time.Duration
}
A Reservation holds information about events that are permitted by a Limiter to happen after a delay. A Reservation may be canceled, which may enable the Limiter to permit additional events.
type Rotator ¶
Rotator is a proxy rotator interface capable of rotating and rate limiting between many IPs. TODO(fortytw2): this interface is totally wrong; it needs rate limits in it
type ScheduleStore ¶
type ScheduleStore interface {
	// ConfigToStart returns a *Config of a scrape that needs to be started
	// and the plugin to start it on
	ConfigToStart(context.Context) (string, *Config, error)

	// UpsertSchedule creates a schedule out of the given config and cron syntax
	// if it doesn't already exist
	UpsertSchedule(context.Context, *Schedule) error
}
A ScheduleStore is used to store and manage schedules of configs that need to be run periodically
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
A Scheduler initiates new scrapes according to plugin-level schedules
type Scrape ¶
type Scrape struct {
	ID             ulid.ULID `json:"id"`
	PluginName     string    `json:"plugin"`
	EnqueuedTasks  int       `json:"enqueued_tasks"`
	CompletedTasks int       `json:"completed_tasks"`
}
A Scrape is a human-readable representation of a scrape
type ScrapeStatus ¶
type ScrapeStatus struct {
	TotalTasks     int `json:"total_tasks,omitempty"`
	CompletedTasks int `json:"completed_tasks,omitempty"`
	RetriedTasks   int `json:"retried_tasks,omitempty"`
}
ScrapeStatus is returned from a Queue with information about a specific scrape
type StdoutReporter ¶
type StdoutReporter struct{}
StdoutReporter writes all errors to Stdout
func (StdoutReporter) Report ¶
func (StdoutReporter) Report(_ context.Context, ro *ReporterOpts, err error)
Report prints out the error
type StdoutWriter ¶
type StdoutWriter struct{}
StdoutWriter prints to stdout via fmt.Printf
func (*StdoutWriter) Close ¶
func (sw *StdoutWriter) Close() error
Close is a no-op function so the StdoutWriter works
type Task ¶
type Task struct {
	URL string `json:"url"`
	// Extra can be used to send information from a parent task to its children
	Extra map[string]json.RawMessage `json:"extra,omitempty"`
	// Timeout is the timeout a single task should have attached to it;
	// defaults to 15s
	Timeout time.Duration
}
A Task generally maps to a single HTTP request, but sometimes more than one may be made
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
A Worker is a single-threaded worker that pulls one task from the queue at a time and processes it to completion
type WorkerErr ¶
type WorkerErr struct { QueuedTask *QueuedTask Errors []error }
WorkerErr carries errors from a task