discollect

package
v0.0.0-...-9b45353
Published: Sep 30, 2018 License: MIT Imports: 22 Imported by: 0

Documentation

Index

Constants

const (
	FullScrape  = "full_scrape"
	DeltaScrape = "delta_scrape"
)

Variables

var (
	ErrPluginUnregistered         = errors.New("discollect: plugin not registered")
	ErrHandlerNotFound            = errors.New("discollect: handler not found for route")
	ErrNoValidPluginForEntrypoint = errors.New("discollect: no plugin found for entrypoint")
)
var ErrCompletedScrape = errors.New("completed scrape")
var (
	// ErrRateLimitExceeded is thrown when the rate limit is exceeded
	ErrRateLimitExceeded = errors.New("discollect: rate limit exceeded")
)

Functions

func DownloadImages

func DownloadImages(textIn string, c *http.Client, fs FileStore) (string, error)

DownloadImages takes an HTML string and rewrites every image tag to point at a freshly downloaded copy stored in the FileStore. TODO(fortytw2): resize to a few standard widths, error handling...
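A minimal usage sketch (the import path github.com/fortytw2/hydrocarbon/discollect is assumed in this and the sketches below; the paths and URLs are hypothetical):

package discollect_test

import (
	"fmt"
	"log"
	"net/http"

	dc "github.com/fortytw2/hydrocarbon/discollect"
)

func ExampleDownloadImages() {
	// LocalFS satisfies FileStore; both paths are hypothetical.
	fs, err := dc.NewLocalFS("/var/lib/hydrocarbon/images", "/static/")
	if err != nil {
		log.Fatal(err)
	}

	html := `<p>hi <img src="https://blog.example.com/pic.png"></p>`
	out, err := dc.DownloadImages(html, http.DefaultClient, fs)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(out) // the img src now points at the stored copy
}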

Types

type Config

type Config struct {
	// friendly identifier for this config
	Type string
	// Entrypoints is used to start a scrape
	Entrypoints []string
	// Since is used to convey delta information
	Since time.Time
	// Countries is a list of countries this scrape can be executed from
	// in two-letter, ISO-3166-2 form
	// nil if unused
	Countries []string
}

Config is a specific configuration of a given plugin
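A sketch of the two config flavors, matching the FullScrape and DeltaScrape constants above; the entrypoint URL and countries are hypothetical:

package discollect_test

import (
	"time"

	dc "github.com/fortytw2/hydrocarbon/discollect"
)

// a full scrape walks everything; a delta scrape uses Since to pick
// up only what has changed since the last run.
var (
	full = &dc.Config{
		Type:        dc.FullScrape,
		Entrypoints: []string{"https://blog.example.com/"},
		Countries:   []string{"US", "DE"},
	}
	delta = &dc.Config{
		Type:        dc.DeltaScrape,
		Entrypoints: []string{"https://blog.example.com/"},
		Since:       time.Now().Add(-24 * time.Hour),
	}
)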

func (*Config) Scan

func (c *Config) Scan(src interface{}) error

Scan implements sql.Scanner for Config

func (*Config) Value

func (c *Config) Value() (driver.Value, error)

Value implements driver.Valuer for Config

type DefaultRotator

type DefaultRotator struct {
	// contains filtered or unexported fields
}

DefaultRotator is a no-op rotator that does no proxy rotation

func NewDefaultRotator

func NewDefaultRotator() *DefaultRotator

NewDefaultRotator provisions a new default rotator

func (*DefaultRotator) Get

func (dr *DefaultRotator) Get(_ *Config) (*http.Client, error)

Get returns a standard http client

type Discollector

type Discollector struct {
	// contains filtered or unexported fields
}

A Discollector ties every element of Discollect together

func New

func New(opts ...OptionFn) (*Discollector, error)

New returns a new Discollector

func (*Discollector) GetPlugin

func (d *Discollector) GetPlugin(name string) (*Plugin, error)

GetPlugin returns the plugin with the given name

func (*Discollector) ListPlugins

func (d *Discollector) ListPlugins() []string

ListPlugins lists all registered plugins

func (*Discollector) PluginForEntrypoint

func (d *Discollector) PluginForEntrypoint(url string, blacklist []string) (*Plugin, *HandlerOpts, error)

PluginForEntrypoint returns the first plugin that matches the given entrypoint

func (*Discollector) Shutdown

func (d *Discollector) Shutdown(ctx context.Context)

Shutdown spins down all the workers after allowing them to finish their current tasks

func (*Discollector) Start

func (d *Discollector) Start(workers int) error

Start starts the scraping loops

type ErrorReporter

type ErrorReporter interface {
	Report(ctx context.Context, ro *ReporterOpts, err error)
}

An ErrorReporter is used to forward faulty handler runs to a semi-permanent sink for later analysis. Generally this is a service such as Sentry or Bugsnag, but it may also be a simpler DB backend, like Postgres. An ErrorReporter should discard any calls with err == nil
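A sketch of a hypothetical ErrorReporter that writes to the standard logger; a real implementation might forward to Sentry or Postgres instead:

package discollect_test

import (
	"context"
	"log"

	dc "github.com/fortytw2/hydrocarbon/discollect"
)

// logReporter is a hypothetical ErrorReporter backed by the standard
// logger.
type logReporter struct{}

func (logReporter) Report(_ context.Context, ro *dc.ReporterOpts, err error) {
	if err == nil {
		return // ErrorReporters must discard nil errors
	}
	log.Printf("scrape=%v plugin=%s url=%s: %v", ro.ScrapeID, ro.Plugin, ro.URL, err)
}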

type FileStore

type FileStore interface {
	Put(fileName string, contents []byte) (string, error)
}

A FileStore shoves files somewhere and returns a link at which they can be retrieved
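A sketch of a hypothetical in-memory FileStore, handy for tests; the returned URL scheme is made up:

package discollect_test

import (
	"crypto/sha256"
	"encoding/hex"
	"sync"
)

// memFS is a hypothetical in-memory FileStore: it keys contents by
// filename and returns a fake content-addressed URL.
type memFS struct {
	mu    sync.Mutex
	files map[string][]byte
}

func (m *memFS) Put(fileName string, contents []byte) (string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.files == nil {
		m.files = make(map[string][]byte)
	}
	m.files[fileName] = contents
	sum := sha256.Sum256(contents)
	return "https://files.example.com/" + hex.EncodeToString(sum[:]), nil
}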

type Handler

type Handler func(ctx context.Context, ho *HandlerOpts, t *Task) *HandlerResponse

A Handler can handle an individual Task

type HandlerOpts

type HandlerOpts struct {
	Config *Config
	// RouteParams are Capture Groups from the Route regexp
	RouteParams []string

	FileStore FileStore

	Client *http.Client
}

HandlerOpts are passed to a Handler

type HandlerResponse

type HandlerResponse struct {
	Tasks  []*Task
	Facts  []interface{}
	Errors []error
}

A HandlerResponse is returned from a Handler

func ErrorResponse

func ErrorResponse(err error) *HandlerResponse

ErrorResponse is a helper for returning an error from a Handler

func NilResponse

func NilResponse() *HandlerResponse

NilResponse returns an empty HandlerResponse

func Response

func Response(facts []interface{}, tasks ...*Task) *HandlerResponse

Response is shorthand for a successful response
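A sketch of a complete Handler built from these helpers: it fetches the task URL with the client handed to it in HandlerOpts and emits the raw body as a single fact. A real handler would parse the body and likely enqueue follow-up Tasks:

package discollect_test

import (
	"context"
	"io/ioutil"
	"net/http"

	dc "github.com/fortytw2/hydrocarbon/discollect"
)

// articleHandler is a hypothetical Handler for individual pages.
func articleHandler(ctx context.Context, ho *dc.HandlerOpts, t *dc.Task) *dc.HandlerResponse {
	req, err := http.NewRequest(http.MethodGet, t.URL, nil)
	if err != nil {
		return dc.ErrorResponse(err)
	}
	resp, err := ho.Client.Do(req.WithContext(ctx))
	if err != nil {
		return dc.ErrorResponse(err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return dc.ErrorResponse(err)
	}
	return dc.Response([]interface{}{string(body)})
}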

type Limiter

type Limiter interface {
	// Reserve returns a Reservation that indicates how long the caller
	// must wait before the event can happen. The Limiter takes this
	// Reservation into account when allowing future events.
	Reserve(rl *RateLimit, url string, scrapeID uuid.UUID) (Reservation, error)
}

A Limiter is used for per-site and per-config rate limits. It is abstracted into an interface so that distributed rate limiting is practical

type LocalFS

type LocalFS struct {
	// contains filtered or unexported fields
}

LocalFS is a FileStore implementation backed by the filesystem, as well as an http.Handler that serves the stored images back up

func NewLocalFS

func NewLocalFS(path, staticPath string) (*LocalFS, error)

NewLocalFS creates a LocalFS set up to save files to path and serve from staticPath

func (*LocalFS) Put

func (lf *LocalFS) Put(fileName string, contents []byte) (string, error)

Put writes the file to disk after hashing it

func (*LocalFS) ServeHTTP

func (lf *LocalFS) ServeHTTP(w http.ResponseWriter, r *http.Request) error

ServeHTTP implements hydrocarbon.ErrorHandler

type MemMetastore

type MemMetastore struct{}

MemMetastore is a Metastore that only stores information in memory. TODO: allow this to function again.

type MemQueue

type MemQueue struct {
	// contains filtered or unexported fields
}

A MemQueue is a super simple Queue backed by an array and a mutex

func NewMemQueue

func NewMemQueue() *MemQueue

NewMemQueue makes a new purely in-memory queue

func (*MemQueue) CompleteScrape

func (mq *MemQueue) CompleteScrape(ctx context.Context, scrapeID uuid.UUID) error

func (*MemQueue) Error

func (mq *MemQueue) Error(ctx context.Context, qt *QueuedTask) error

func (*MemQueue) Finish

func (mq *MemQueue) Finish(ctx context.Context, qt *QueuedTask) error

Finish is a no-op for the MemQueue

func (*MemQueue) Pop

func (mq *MemQueue) Pop(ctx context.Context) (*QueuedTask, error)

Pop pops a single task off the left side of the array

func (*MemQueue) Push

func (mq *MemQueue) Push(ctx context.Context, tasks []*QueuedTask) error

Push appends tasks to the right side of the array

func (*MemQueue) Status

func (mq *MemQueue) Status(ctx context.Context, scrapeID uuid.UUID) (*ScrapeStatus, error)

Status returns the status for a given scrape

type Metastore

type Metastore interface {
	// StartScrapes selects a number of currently STOPPED scrapes, moves them to
	// RUNNING and returns their details
	StartScrapes(ctx context.Context, limit int) ([]*Scrape, error)

	// ListScrapes is used to list and filter scrapes, for both session resumption
	// and UI purposes
	ListScrapes(ctx context.Context, statusFilter string, limit, offset int) ([]*Scrape, error)

	// FindMissingSchedules adds scrapes that should be run to the future set
	FindMissingSchedules(ctx context.Context, limit int) ([]*ScheduleRequest, error)
	InsertSchedule(context.Context, *ScheduleRequest, []*ScrapeSchedule) error

	// EndScrape marks a scrape as SUCCESS and records the number of datums and
	// tasks returned
	EndScrape(ctx context.Context, id uuid.UUID, datums, retries, tasks int) error
	// ErrorScrape marks a scrape as ERRORED and adds the error to its list
	ErrorScrape(ctx context.Context, id uuid.UUID, err error) error
}

A Metastore is used to store the history of all scrape runs and enough meta information to allow session resumption on restart of hydrocarbon
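A sketch of a do-nothing Metastore, useful as a skeleton for a real Postgres-backed implementation; the uuid import path is an assumption and should match whichever uuid package the module uses:

package discollect_test

import (
	"context"

	dc "github.com/fortytw2/hydrocarbon/discollect"
	uuid "github.com/satori/go.uuid" // assumed; match the module's uuid package
)

// nopMetastore is a hypothetical no-op Metastore.
type nopMetastore struct{}

func (nopMetastore) StartScrapes(ctx context.Context, limit int) ([]*dc.Scrape, error) {
	return nil, nil
}

func (nopMetastore) ListScrapes(ctx context.Context, statusFilter string, limit, offset int) ([]*dc.Scrape, error) {
	return nil, nil
}

func (nopMetastore) FindMissingSchedules(ctx context.Context, limit int) ([]*dc.ScheduleRequest, error) {
	return nil, nil
}

func (nopMetastore) InsertSchedule(context.Context, *dc.ScheduleRequest, []*dc.ScrapeSchedule) error {
	return nil
}

func (nopMetastore) EndScrape(ctx context.Context, id uuid.UUID, datums, retries, tasks int) error {
	return nil
}

func (nopMetastore) ErrorScrape(ctx context.Context, id uuid.UUID, err error) error {
	return nil
}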

type NilLimiter

type NilLimiter struct{}

A NilLimiter is a Limiter that doesn't restrict anything

func (*NilLimiter) Reserve

func (*NilLimiter) Reserve(rl *RateLimit, url string, scrapeID uuid.UUID) (Reservation, error)

Reserve returns a dummy reservation that always waits one second

type OptionFn

type OptionFn func(d *Discollector) error

An OptionFn is used to pass options to a Discollector

func WithErrorReporter

func WithErrorReporter(er ErrorReporter) OptionFn

WithErrorReporter sets the ErrorReporter for the Discollector

func WithFileStore

func WithFileStore(fs FileStore) OptionFn

WithFileStore sets the FileStore for the Discollector

func WithLimiter

func WithLimiter(l Limiter) OptionFn

WithLimiter sets the Limiter for the Discollector

func WithMetastore

func WithMetastore(ms Metastore) OptionFn

WithMetastore sets the Metastore for the Discollector

func WithPlugins

func WithPlugins(p ...*Plugin) OptionFn

WithPlugins registers a list of plugins

func WithQueue

func WithQueue(q Queue) OptionFn

WithQueue sets the Queue for the Discollector

func WithRotator

func WithRotator(ro Rotator) OptionFn

WithRotator sets the Rotator for the Discollector

func WithWriter

func WithWriter(w Writer) OptionFn

WithWriter sets the Writer for the Discollector
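Putting the options together, a sketch of wiring a Discollector entirely from the in-memory pieces in this package (examplePlugin is defined in the Plugin sketch below; a Metastore would be wired in the same way via WithMetastore):

package discollect_test

import (
	"context"
	"log"
	"time"

	dc "github.com/fortytw2/hydrocarbon/discollect"
)

func Example_wiring() {
	d, err := dc.New(
		dc.WithPlugins(examplePlugin),
		dc.WithQueue(dc.NewMemQueue()),
		dc.WithWriter(&dc.StdoutWriter{}),
		dc.WithErrorReporter(dc.StdoutReporter{}),
		dc.WithRotator(dc.NewDefaultRotator()),
		dc.WithLimiter(&dc.NilLimiter{}),
		dc.WithFileStore(dc.NewStubFS()),
	)
	if err != nil {
		log.Fatal(err)
	}

	// Start blocks while the worker loops run.
	go func() {
		if err := d.Start(4); err != nil {
			log.Println(err)
		}
	}()

	// later: let in-flight tasks finish before exiting.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	d.Shutdown(ctx)
}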

type Plugin

type Plugin struct {
	Name    string
	Configs []*Config

	// RateLimit is set per-plugin
	RateLimit *RateLimit

	// Entrypoints is a list of valid entrypoint patterns for this plugin.
	// It can simply be `.*`, especially if entrypoints merit further
	// validation via the ConfigCreator.
	// These are compiled into regexps at boot.
	Entrypoints []string

	// A ConfigCreator is used to validate submitted entrypoints and convert
	// them into a fully valid config as well as returning the normalized title
	ConfigCreator func(url string, ho *HandlerOpts) (string, *Config, error)

	// the Scheduler looks into the past and tells the future
	Scheduler func(*ScheduleRequest) ([]*ScrapeSchedule, error)

	// map of regexp to Handler
	Routes map[string]Handler
}

A Plugin is capable of running scrapes, ideally of a common type or against a single site
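A sketch of a complete Plugin for a single hypothetical site, reusing articleHandler from the Handler sketch above; the routes are regexps whose capture groups arrive in HandlerOpts.RouteParams:

package discollect_test

import (
	"context"
	"errors"

	dc "github.com/fortytw2/hydrocarbon/discollect"
)

// examplePlugin is a hypothetical Plugin scoped to one site.
var examplePlugin = &dc.Plugin{
	Name:        "example-blog",
	Entrypoints: []string{`https://blog\.example\.com/.*`},
	RateLimit: &dc.RateLimit{
		PerIP:     1,
		PerScrape: 2,
		PerDomain: 2,
	},
	ConfigCreator: func(url string, ho *dc.HandlerOpts) (string, *dc.Config, error) {
		if url == "" {
			return "", nil, errors.New("empty entrypoint")
		}
		// a real ConfigCreator would fetch the URL via ho.Client to
		// validate it and extract the real title.
		return "Example Blog", &dc.Config{
			Type:        dc.FullScrape,
			Entrypoints: []string{url},
		}, nil
	},
	Scheduler: dc.DefaultScheduler,
	Routes: map[string]dc.Handler{
		`https://blog\.example\.com/post/(.*)`: articleHandler,
		`https://blog\.example\.com/?$`:        indexHandler,
	},
}

// indexHandler is a hypothetical index route that fans out to posts.
func indexHandler(ctx context.Context, ho *dc.HandlerOpts, t *dc.Task) *dc.HandlerResponse {
	return dc.Response(nil, &dc.Task{URL: "https://blog.example.com/post/hello"})
}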

type Queue

type Queue interface {
	Pop(ctx context.Context) (*QueuedTask, error)
	Push(ctx context.Context, tasks []*QueuedTask) error

	Finish(ctx context.Context, qt *QueuedTask) error
	Error(ctx context.Context, qt *QueuedTask) error

	Status(ctx context.Context, scrapeID uuid.UUID) (*ScrapeStatus, error)

	CompleteScrape(ctx context.Context, scrapeID uuid.UUID) error
}

A Queue is used to submit and retrieve individual tasks

type QueuedTask

type QueuedTask struct {
	// set by the TaskQueue
	TaskID   uuid.UUID `json:"task_id"`
	ScrapeID uuid.UUID `json:"scrape_id"`

	QueuedAt time.Time `json:"queued_at"`
	Config   *Config   `json:"config"`
	Plugin   string    `json:"plugin"`
	Retries  int       `json:"retries"`

	Task *Task `json:"task"`
}

A QueuedTask is the struct for a task that goes on the Queue

type RateLimit

type RateLimit struct {
	// PerIP is the maximum number of requests per second from a single IP
	PerIP float64
	// PerScrape is the maximum number of requests per second across the entire scrape
	PerScrape float64
	// PerDomain is the maximum number of requests per second per domain,
	// using the public suffix list to differentiate domains
	PerDomain float64
}

RateLimit is a wrapper struct around a variety of per-config rate limits

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

A Registry stores and indexes all available plugins

func NewRegistry

func NewRegistry(plugins []*Plugin) (*Registry, error)

NewRegistry indexes a list of plugins and precomputes the routing table

func (*Registry) Get

func (r *Registry) Get(name string) (*Plugin, error)

Get returns a plugin by name

func (*Registry) HandlerFor

func (r *Registry) HandlerFor(pluginName string, rawURL string) (Handler, []string, error)

HandlerFor is the core "router" used to point Tasks to an individual Handler

func (*Registry) PluginFor

func (r *Registry) PluginFor(entrypointURL string, blacklistNames []string) (*Plugin, []string, error)

PluginFor returns the first plugin whose entrypoint patterns match entrypointURL, along with the matched capture groups, skipping any plugins named in blacklistNames

type ReporterOpts

type ReporterOpts struct {
	ScrapeID uuid.UUID
	Plugin   string
	URL      string
}

ReporterOpts is used to attach additional information to an error

type Reservation

type Reservation interface {
	// Cancel indicates that the reservation holder will not perform the
	// reserved action and reverses the effects of this Reservation on the rate
	// limit as much as possible, considering that other reservations may have
	// already been made.
	Cancel()
	// OK returns whether the limiter can provide the requested number of tokens
	// within the maximum wait time. If OK is false, Delay returns InfDuration,
	// and Cancel does nothing.
	OK() bool
	// Delay returns the duration for which the reservation holder must wait
	// before taking the reserved action. Zero duration means act immediately.
	// InfDuration means the limiter cannot grant the tokens requested in this
	// Reservation within the maximum wait time.
	Delay() time.Duration
}

A Reservation holds information about events that are permitted by a Limiter to happen after a delay. A Reservation may be canceled, which may enable the Limiter to permit additional events.

type Resolver

type Resolver struct {
	// contains filtered or unexported fields
}

Resolver watches for scrapes that should be marked complete.

func (*Resolver) Start

func (r *Resolver) Start()

Start launches the resolver, which marks scrapes as complete

func (*Resolver) Stop

func (r *Resolver) Stop()

Stop gracefully stops the resolver and blocks until it has shut down

type Rotator

type Rotator interface {
	Get(c *Config) (*http.Client, error)
}

Rotator is a proxy rotator interface capable of rotating and rate limiting between many IPs. TODO(fortytw2): this interface is totally wrong, needs rate limits in it
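A sketch of a hypothetical round-robin Rotator over a fixed proxy list (it assumes at least one proxy and, per the TODO above, leaves rate limiting to the Limiter):

package discollect_test

import (
	"net/http"
	"net/url"
	"sync/atomic"

	dc "github.com/fortytw2/hydrocarbon/discollect"
)

// roundRobinRotator is a hypothetical Rotator that cycles through a
// fixed, non-empty list of proxy URLs.
type roundRobinRotator struct {
	proxies []*url.URL
	n       uint64
}

func (rr *roundRobinRotator) Get(_ *dc.Config) (*http.Client, error) {
	i := atomic.AddUint64(&rr.n, 1) % uint64(len(rr.proxies))
	return &http.Client{
		Transport: &http.Transport{Proxy: http.ProxyURL(rr.proxies[i])},
	}, nil
}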

type ScheduleRequest

type ScheduleRequest struct {
	Plugin        string
	FeedID        uuid.UUID
	LatestScrapes []*Scrape
	LatestDatums  interface{}
}

A ScheduleRequest is used to ask for future schedules

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

A Scheduler initiates new scrapes according to plugin-level schedules

func (*Scheduler) Start

func (s *Scheduler) Start()

Start launches the scheduler

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop gracefully stops the scheduler and blocks until it has shut down

type Scrape

type Scrape struct {
	ID     uuid.UUID `json:"id"`
	FeedID uuid.UUID `json:"feed_id"`

	CreatedAt        time.Time `json:"created_at"`
	ScheduledStartAt time.Time `json:"scheduled_start_at"`
	StartedAt        time.Time `json:"started_at"`
	EndedAt          time.Time `json:"ended_at"`

	State  string   `json:"state"`
	Errors []string `json:"errors"`

	TotalDatums  int `json:"total_datums"`
	TotalRetries int `json:"total_retries"`
	TotalTasks   int `json:"total_tasks"`

	Plugin string  `json:"plugin"`
	Config *Config `json:"config"`
}

type ScrapeSchedule

type ScrapeSchedule struct {
	Config           *Config
	ScheduledStartAt time.Time
}

A ScrapeSchedule describes a single future scrape: the Config to run and when to start it

func DefaultScheduler

func DefaultScheduler(sr *ScheduleRequest) ([]*ScrapeSchedule, error)

DefaultScheduler uses a simple heuristic to predict when to next scrape.
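A sketch of a custom Scheduler func: schedule one delta scrape an hour after the last scrape ended, falling back to DefaultScheduler when there is no history:

package discollect_test

import (
	"time"

	dc "github.com/fortytw2/hydrocarbon/discollect"
)

// hourlyDelta is a hypothetical Scheduler func for a Plugin.
func hourlyDelta(sr *dc.ScheduleRequest) ([]*dc.ScrapeSchedule, error) {
	if len(sr.LatestScrapes) == 0 {
		return dc.DefaultScheduler(sr)
	}
	last := sr.LatestScrapes[0]
	return []*dc.ScrapeSchedule{{
		Config: &dc.Config{
			Type:        dc.DeltaScrape,
			Entrypoints: last.Config.Entrypoints,
			Since:       last.EndedAt,
		},
		ScheduledStartAt: last.EndedAt.Add(time.Hour),
	}}, nil
}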

func NeverSchedule

func NeverSchedule(sr *ScheduleRequest) ([]*ScrapeSchedule, error)

NeverSchedule simply never schedules another scrape

type ScrapeStatus

type ScrapeStatus struct {
	TotalTasks     int `json:"total_tasks,omitempty"`
	InFlightTasks  int `json:"in_flight_tasks,omitempty"`
	CompletedTasks int `json:"completed_tasks,omitempty"`
	RetriedTasks   int `json:"retried_tasks,omitempty"`
}

ScrapeStatus is returned from a Queue with information about a specific scrape

type StdoutReporter

type StdoutReporter struct{}

StdoutReporter writes all errors to Stdout

func (StdoutReporter) Report

func (StdoutReporter) Report(_ context.Context, ro *ReporterOpts, err error)

Report prints out the error

type StdoutWriter

type StdoutWriter struct{}

StdoutWriter writes each datum to stdout via fmt.Printf

func (*StdoutWriter) Close

func (sw *StdoutWriter) Close() error

Close is a no-op so that StdoutWriter satisfies the Writer interface

func (*StdoutWriter) Write

func (sw *StdoutWriter) Write(ctx context.Context, _ uuid.UUID, f interface{}) error

Write prints the datum to stdout using %+v formatting

type StubFS

type StubFS struct {
	URL string
}

func NewStubFS

func NewStubFS() *StubFS

NewStubFS is used only for testing and doesn't actually do anything

func (*StubFS) Put

func (sf *StubFS) Put(fileName string, contents []byte) (string, error)

type Task

type Task struct {
	URL string `json:"url"`
	// Extra can be used to send information from a parent task to its children
	Extra map[string]json.RawMessage `json:"extra,omitempty"`
	// Timeout is the timeout a single task should have attached to it
	// defaults to 15s
	Timeout time.Duration
}

A Task generally maps to a single HTTP request, but sometimes more than one may be made
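A sketch of a parent handler building a child Task, passing context through Extra and overriding the default timeout; the field name and URL are hypothetical:

package discollect_test

import (
	"encoding/json"
	"time"

	dc "github.com/fortytw2/hydrocarbon/discollect"
)

// childTask shows how a parent passes data to its children: encode
// it as JSON and stash it under a key in Extra for the child to decode.
func childTask(parentID string) *dc.Task {
	raw, _ := json.Marshal(parentID)
	return &dc.Task{
		URL:     "https://blog.example.com/post/" + parentID,
		Extra:   map[string]json.RawMessage{"parent_id": raw},
		Timeout: 30 * time.Second, // override the 15s default
	}
}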

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

A Worker is a single-threaded worker that pulls a single task from the queue at a time and processes it to completion

func NewWorker

func NewWorker(r *Registry, ro Rotator, l Limiter, q Queue, fs FileStore, w Writer, er ErrorReporter) *Worker

NewWorker provisions a new worker

func (*Worker) Start

func (w *Worker) Start(wg *sync.WaitGroup)

Start launches the worker

func (*Worker) Stop

func (w *Worker) Stop()

Stop initiates stop and then blocks until shutdown is complete

type Writer

type Writer interface {
	Write(ctx context.Context, scrapeID uuid.UUID, f interface{}) error
	io.Closer
}

A Writer is able to process and output datums retrieved by Discollect plugins
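A sketch of a hypothetical Writer that appends each datum as a line of JSON, tagged with the scrape it came from; the uuid import path is again an assumption:

package discollect_test

import (
	"context"
	"encoding/json"
	"os"

	uuid "github.com/satori/go.uuid" // assumed; match the module's uuid package
)

// jsonlWriter is a hypothetical Writer that emits one JSON object per
// line to an already-open file.
type jsonlWriter struct {
	f *os.File
}

func (w *jsonlWriter) Write(_ context.Context, scrapeID uuid.UUID, f interface{}) error {
	return json.NewEncoder(w.f).Encode(map[string]interface{}{
		"scrape_id": scrapeID.String(),
		"datum":     f,
	})
}

func (w *jsonlWriter) Close() error { return w.f.Close() }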

Directories

Path Synopsis
package redis implements a lightweight queue on top of RPOPLPUSH for hydrocarbon to use
