discollect

v0.0.0-...-b5f264b
Warning

This package is not in the latest version of its module.

Published: Jan 8, 2018 License: MIT Imports: 13 Imported by: 0

README

discollect

dis' collector.

A distributed web scraper inspired by Scrapy.

Architecture

discollect is designed to power structured scrapes with known entrypoints, unlike a general-purpose "web crawler". Every scrape needs an entrypoint: one or more initial URLs that you submit as Tasks. From there, every URL you submit is matched to a Handler, much as a web server matches requests to routes, and each Handler returns more Tasks and datums, or an error.

By changing the entrypoints for a given plugin, we can change the nature of a scrape: the same code that drives a full site scrape can also collect small amounts of priority information. For example, on themoviedb.org, instead of entering at the published list of all movie IDs on the site, we could enter at the page of an individual movie.
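To illustrate how the entrypoint alone decides the scope of a scrape, here is a toy, in-memory sketch. The `site` map stands in for HTTP, `handle` stands in for a plugin's Handlers, and a FIFO slice stands in for the Task Queue; all of these names are hypothetical and no discollect code is involved.

```go
package main

import (
	"fmt"
	"strings"
)

// A hypothetical in-memory "site": the listing page links to every movie
// page, and movie pages link nowhere.
var site = map[string][]string{
	"/movies":  {"/movie/1", "/movie/2", "/movie/3"},
	"/movie/1": nil,
	"/movie/2": nil,
	"/movie/3": nil,
}

// handle plays the role of the plugin's Handlers: a listing page returns
// more tasks, a movie page returns a datum.
func handle(url string) (tasks []string, datums []string) {
	if strings.HasPrefix(url, "/movie/") {
		return nil, []string{"datum for " + url}
	}
	return site[url], nil
}

// scrape drains a FIFO task queue seeded with the entrypoints: pop a task,
// run its handler, collect datums, push the child tasks.
func scrape(entrypoints []string) []string {
	queue := append([]string(nil), entrypoints...)
	var datums []string
	for len(queue) > 0 {
		url := queue[0]
		queue = queue[1:]
		tasks, ds := handle(url)
		datums = append(datums, ds...)
		queue = append(queue, tasks...)
	}
	return datums
}

func main() {
	full := scrape([]string{"/movies"})    // full site scrape
	single := scrape([]string{"/movie/2"}) // one movie, same handlers
	fmt.Println(len(full), len(single))    // 3 1
}
```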

This architecture allows for large-scale, distributed, structured scraping of a variety of websites.

Tasks are stored in the Task Queue, while information and statistics about individual scrape runs are stored in the Metastore. The Metastore also holds detailed error reports for Handlers that return errors during execution and fail again when retried.

Getting Started (Library Use)

discollect is fundamentally a flexible library: in most cases, users will want to implement their own plugins, writers, metastores, and task queues to fit their environment. Out of the box, discollect provides many building blocks to make this as easy as possible. A basic setup may be as simple as:

package main

import (
    "log"

    "github.com/fortytw2/discollect"
    "github.com/fortytw2/discollect/metastore/sqlite"
    "github.com/your-org/plugins"
)

func main() {
    d, err := discollect.New(
        discollect.WithWriter(discollect.NewFileWriter("/output")),
        discollect.WithTaskQueue(discollect.NewInMemTaskQueue()),
        discollect.WithMetastore(sqlite.NewMetastore("discollect.db")),
        discollect.WithErrorReporter(discollect.),
    )
    if err != nil {
        log.Fatal(err)
    }

    // register the plugins this deployment should run
    err = d.RegisterPlugins(
        plugins.NewTMDBPlugin("api-key-here"),
        plugins.NewBlogCrawler(),
    )
    if err != nil {
        log.Fatal(err)
    }

    // parse all CLI options
    d.ParseFlags()

    // handle SIGINT/SIGTERM gracefully (SIGKILL cannot be caught)
    go d.SignalHandler()

    // serve HTTP
    log.Fatal(d.ListenAndServe())
}

discollect also ships with a binary, discollectd, which includes every task queue, metastore, and writer in the core library. If you build plugins entirely in Skylark or Lua and do not need any custom queues, metastores, or writers, this may be a good route to go.

Plugins

Plugins can be written in Go, Skylark, or Lua. Go plugins require recompiling discollect to be deployed, whereas Skylark or Lua plugins only require a reload.

discollect includes a large number of test helper functions to aid you in writing effective, fast tests for your network-based plugins. Read the docs to learn more. (DOC LINK HERE)

discollect also supports HTTP-based remote plugins, which can be scaled out easily and implemented in any language that can run an HTTP server.

Metastores

The metastore is central to a discollect deployment: it collects task and scrape information and handles leader election in a distributed, highly available (HA) environment.

discollect ships with the following metastore implementations:

  • SQLite3 (good for single-node deployments; can also work for worker deployments)
  • PostgreSQL 9.4+ (for single-master, many-worker deployments)

Task Queues

A reliable task queue is the heart of any distributed work system. discollect offers several options:

  • Disk-backed, durable queue (single-node deployments only)
  • Redis 3+ (BRPOPLPUSH based)
  • Beanstalkd
  • Amazon SQS
  • Google Cloud Pub/Sub

Writers

When a Handler returns a datum, discollect can natively:

  • POST the datum, encoded as JSON, to an HTTP endpoint
  • write the datum to a file per scrape run

You can also extend discollect to perform any action with a datum by implementing the simple Writer interface:

// A Writer is able to process and output datums retrieved by Discollect plugins
type Writer interface {
	Write(ctx context.Context, datum interface{}) error
	io.Closer
}

Writers can also be composed using the discollect.NewMultiWriter(w ...Writer) Writer helper function.

Scheduled Scrapes

Every discollect deployment includes a powerful, reliable, cron-based scheduler that integrates tightly with plugin authorship and the built-in alerting capabilities.

Alerting and Monitoring

discollect can monitor internal scrape stability (e.g. the number of tasks and datums per run of the same plugin and config over time) and send alerts to several providers:

  • Slack
  • IRC
  • PagerDuty
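One simple way to flag an unstable run is to compare its datum count with the average of earlier runs of the same plugin and config. The sketch below uses a relative-deviation threshold; this heuristic, the `runStats` type, and the `unstable` function are assumptions for illustration, not discollect's actual alerting logic.

```go
package main

import (
	"fmt"
	"math"
)

// runStats is the per-run history a metastore might keep (hypothetical).
type runStats struct {
	Datums, Tasks int
}

// unstable reports whether the latest run's datum count differs from the
// mean of the previous runs by more than tolerance (0.5 means 50%).
func unstable(history []runStats, latest runStats, tolerance float64) bool {
	if len(history) == 0 {
		return false // nothing to compare against yet
	}
	sum := 0
	for _, r := range history {
		sum += r.Datums
	}
	mean := float64(sum) / float64(len(history))
	if mean == 0 {
		return latest.Datums != 0
	}
	return math.Abs(float64(latest.Datums)-mean)/mean > tolerance
}

func main() {
	history := []runStats{{Datums: 1000}, {Datums: 980}, {Datums: 1020}}
	fmt.Println(unstable(history, runStats{Datums: 990}, 0.5)) // false
	fmt.Println(unstable(history, runStats{Datums: 12}, 0.5))  // true, e.g. a broken selector
}
```

The same check could be applied to task counts; a production monitor would likely also want a minimum history length before alerting.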

Region-Aware Plugins & Proxy Rotation

For many scraping tasks, it's necessary to make requests only from certain IPs.

AJAX Rendering

discollect can use headless Chrome to render the DOM of AJAX sites. In most cases this is unnecessary, since you can emulate the site's XHR requests from within Handlers. To enable it, call discollect.RenderHeadless(r *http.Request) on a request before sending it with client.Do.

License

MIT

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrPluginUnregistered = errors.New("discollect: plugin not registered")
	ErrHandlerNotFound    = errors.New("discollect: handler not found for route")
)
var (
	ErrRateLimitExceeded = errors.New("discollect: rate limit exceeded")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	// friendly identifier for this config
	Name string
	// Entrypoints is used to start a scrape
	Entrypoints []string
	// DynamicEntry specifies whether this config was created dynamically
	// or is a preset
	DynamicEntry bool
	// if Entrypoints is nil, we can compute the entrypoints (e.g. in a time-based delta)
	ComputedEntry func(ctx context.Context, cfg *Config) error
	// A Plugin can have many types of Scrapes
	// common ones are "full" and "delta"
	Type string
	// Since is used to convey delta information
	Since time.Time
	// Countries is a list of countries this scrape can be executed from,
	// as two-letter ISO 3166-1 alpha-2 codes;
	// nil if unused
	Countries []string
}

Config is a specific configuration of a given plugin

type DefaultRotator

type DefaultRotator struct {
	// contains filtered or unexported fields
}

DefaultRotator is a no-op rotator that does no proxy rotation

func NewDefaultRotator

func NewDefaultRotator() *DefaultRotator

NewDefaultRotator provisions a new default rotator

func (*DefaultRotator) Get

func (dr *DefaultRotator) Get(_ *Config) (*http.Client, error)

Get returns a standard http client

type Discollector

type Discollector struct {
	// contains filtered or unexported fields
}

A Discollector ties every element of Discollect together

func New

func New(opts ...OptionFn) (*Discollector, error)

New returns a new Discollector

func (*Discollector) GetScrape

func (d *Discollector) GetScrape(ctx context.Context, id ulid.ULID) (*Scrape, error)

GetScrape returns a currently running scrape by ID

func (*Discollector) LaunchScrape

func (d *Discollector) LaunchScrape(pluginName string, cfg *Config) error

LaunchScrape starts a scrape run

func (*Discollector) ListScrapes

func (d *Discollector) ListScrapes(ctx context.Context) ([]*Scrape, error)

ListScrapes lists all currently running scrapes

func (*Discollector) Shutdown

func (d *Discollector) Shutdown(ctx context.Context)

Shutdown spins down all the workers after allowing them to finish their current tasks

func (*Discollector) Start

func (d *Discollector) Start(workers int) error

Start starts the scraping loops

func (*Discollector) StartScrape

func (d *Discollector) StartScrape(ctx context.Context, pluginName string, config *Config) (string, error)

StartScrape launches a new scrape

type ErrorReporter

type ErrorReporter interface {
	Report(ctx context.Context, ro *ReporterOpts, err error)
}

An ErrorReporter forwards faulty handler runs to a semi-permanent sink for later analysis. Generally this is a service such as Sentry or Bugsnag, but it may also be a simpler DB backend, like Postgres. An ErrorReporter should discard any calls with err == nil.

type Handler

type Handler func(ctx context.Context, ho *HandlerOpts, t *Task) *HandlerResponse

A Handler can handle an individual Task

type HandlerOpts

type HandlerOpts struct {
	Config *Config
	// RouteParams are Capture Groups from the Route regexp
	RouteParams []string
	Client      *http.Client
}

HandlerOpts are passed to a Handler

type HandlerResponse

type HandlerResponse struct {
	Tasks  []*Task
	Facts  []interface{}
	Errors []error
}

A HandlerResponse is returned from a Handler

func ErrorResponse

func ErrorResponse(err error) *HandlerResponse

ErrorResponse is a helper for returning an error from a Handler

func Response

func Response(facts []interface{}, tasks ...*Task) *HandlerResponse

Response is shorthand for a successful response

type Limiter

type Limiter interface {
	// Reserve returns a Reservation that indicates how long the caller must
	// wait before making a request to url. The Limiter takes this
	// Reservation into account when allowing future events.
	Reserve(rl *RateLimit, url string, scrapeID ulid.ULID) (Reservation, error)
}

A Limiter enforces per-site and per-config rate limits. It is abstracted into an interface so that distributed rate limiting is practical.

type MemMetastore

type MemMetastore struct {
}

MemMetastore is a metastore that only stores information in memory

func (MemMetastore) EndScrape

func (MemMetastore) EndScrape(ctx context.Context, id string, datums, tasks int) error

EndScrape records the end of a scrape

func (MemMetastore) StartScrape

func (MemMetastore) StartScrape(ctx context.Context, pluginName string, cfg *Config) (ulid.ULID, error)

StartScrape creates an id and starts a scrape in memory

type MemQueue

type MemQueue struct {
	// contains filtered or unexported fields
}

A MemQueue is a super simple Queue backed by an array and a mutex

func NewMemQueue

func NewMemQueue() *MemQueue

NewMemQueue makes a new purely in-memory queue

func (*MemQueue) Finish

func (mq *MemQueue) Finish(ctx context.Context, taskID ulid.ULID) error

Finish is a no-op for the MemQueue

func (*MemQueue) Pop

func (mq *MemQueue) Pop(ctx context.Context) (*QueuedTask, error)

Pop pops a single task off the left side of the array

func (*MemQueue) Push

func (mq *MemQueue) Push(ctx context.Context, tasks []*QueuedTask) error

Push appends tasks to the right side of the array

func (*MemQueue) Status

func (mq *MemQueue) Status(ctx context.Context, scrapeID ulid.ULID) (*ScrapeStatus, error)

Status returns the status for a given scrape

type Metastore

type Metastore interface {
	// StartScrape attempts to start the scrape, returning the new scrape's ID
	// and a nil error if the scrape is able to be started
	StartScrape(ctx context.Context, pluginName string, cfg *Config) (id ulid.ULID, err error)
	EndScrape(ctx context.Context, id string, datums, tasks int) error
}

A Metastore is used to store the history of all scrape runs

type NilLimiter

type NilLimiter struct{}

A NilLimiter is a Limiter that doesn't restrict anything

func (*NilLimiter) Reserve

func (*NilLimiter) Reserve(rl *RateLimit, url string, scrapeID ulid.ULID) (Reservation, error)

Reserve returns a dummy reservation that always waits one second

type OptionFn

type OptionFn func(d *Discollector) error

An OptionFn is used to pass options to a Discollector

func WithErrorReporter

func WithErrorReporter(er ErrorReporter) OptionFn

WithErrorReporter sets the ErrorReporter for the Discollector

func WithLimiter

func WithLimiter(l Limiter) OptionFn

WithLimiter sets the Limiter for the Discollector

func WithMetastore

func WithMetastore(ms Metastore) OptionFn

WithMetastore sets the Metastore for the Discollector

func WithPlugins

func WithPlugins(p ...*Plugin) OptionFn

WithPlugins registers a list of plugins

func WithQueue

func WithQueue(q Queue) OptionFn

WithQueue sets the Queue for the Discollector

func WithRotator

func WithRotator(ro Rotator) OptionFn

WithRotator sets the Rotator for the Discollector

func WithWriter

func WithWriter(w Writer) OptionFn

WithWriter sets the Writer for the Discollector

type Plugin

type Plugin struct {
	Name     string
	Schedule *Schedule
	Configs  []*Config

	// RateLimit is set per-plugin
	RateLimit *RateLimit

	// A ConfigValidator is used to validate dynamically loaded configs
	ConfigValidator func(*Config) error
	Routes          map[string]Handler
}

A Plugin is capable of running scrapes, ideally of a common type or against a single site

type Queue

type Queue interface {
	Pop(ctx context.Context) (*QueuedTask, error)
	Push(ctx context.Context, tasks []*QueuedTask) error

	Finish(ctx context.Context, taskID ulid.ULID) error

	Status(ctx context.Context, scrapeID ulid.ULID) (*ScrapeStatus, error)
}

A Queue is used to submit and retrieve individual tasks

type QueuedTask

type QueuedTask struct {
	// set by the TaskQueue
	TaskID   ulid.ULID `json:"task_id"`
	ScrapeID ulid.ULID `json:"scrape_id"`

	QueuedAt time.Time `json:"queued_at"`
	Config   *Config   `json:"config"`
	Plugin   string    `json:"plugin"`
	Retries  int       `json:"retries"`

	Task *Task `json:"task"`
}

A QueuedTask is the struct for a task that goes on the Queue

type RateLimit

type RateLimit struct {
	// PerIP is the number of requests per second a single IP can make
	PerIP float64
	// PerScrape is the number of requests per second the entire scrape can make
	PerScrape float64
	// PerDomain is the number of requests per second per domain, using the
	// public suffix list to tell domains apart
	PerDomain float64
}

RateLimit is a wrapper struct around a variety of per-config rate limits

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

A Registry stores and indexes all available plugins

func NewRegistry

func NewRegistry(plugins []*Plugin) (*Registry, error)

NewRegistry indexes a list of plugins and precomputes the routing table

func (*Registry) Get

func (r *Registry) Get(name string) (*Plugin, error)

Get returns a plugin by name

func (*Registry) HandlerFor

func (r *Registry) HandlerFor(pluginName string, rawURL string) (Handler, []string, error)

HandlerFor is the core "router" used to point Tasks to an individual Handler

type ReporterOpts

type ReporterOpts struct {
	ScrapeID ulid.ULID
	Plugin   string
	URL      string
}

ReporterOpts is used to attach additional information to an error

type Reservation

type Reservation interface {
	// Cancel indicates that the reservation holder will not perform the
	// reserved action and reverses the effects of this Reservation on the rate
	// limit as much as possible, considering that other reservations may have
	// already been made.
	Cancel()
	// OK returns whether the limiter can provide the requested number of tokens
	// within the maximum wait time. If OK is false, Delay returns InfDuration,
	// and Cancel does nothing.
	OK() bool
	// Delay returns the duration for which the reservation holder must wait
	// before taking the reserved action. Zero duration means act immediately.
	// InfDuration means the limiter cannot grant the tokens requested in this
	// Reservation within the maximum wait time.
	Delay() time.Duration
}

A Reservation holds information about events that are permitted by a Limiter to happen after a delay. A Reservation may be canceled, which may enable the Limiter to permit additional events.

type Rotator

type Rotator interface {
	Get(c *Config) (*http.Client, error)
}

Rotator is a proxy rotator interface capable of rotating and rate limiting between many IPs. TODO(fortytw2): this interface is totally wrong, needs rate limits in it

type Schedule

type Schedule struct {
	Config string
	Cron   string
}

A Schedule is part of every plugin and defines when it needs to be run

type ScheduleStore

type ScheduleStore interface {
	// ConfigToStart returns a *Config of a scrape that needs to be started
	// and the plugin to start it on
	ConfigToStart(context.Context) (string, *Config, error)

	// UpsertSchedule creates a schedule out of the given config and cron syntax
	// if it doesn't already exist
	UpsertSchedule(context.Context, *Schedule) error
}

A ScheduleStore is used to store and manage schedules of configs that need to be run periodically

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

A Scheduler initiates new scrapes according to plugin-level schedules

func (*Scheduler) Start

func (s *Scheduler) Start()

Start launches the scheduler

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop gracefully stops the scheduler and blocks until it has shut down

type Scrape

type Scrape struct {
	ID             ulid.ULID `json:"id"`
	PluginName     string    `json:"plugin"`
	EnqueuedTasks  int       `json:"enqueued_tasks"`
	CompletedTasks int       `json:"completed_tasks"`
}

A Scrape is a human-readable representation of a scrape

type ScrapeStatus

type ScrapeStatus struct {
	TotalTasks     int `json:"total_tasks,omitempty"`
	CompletedTasks int `json:"completed_tasks,omitempty"`
	RetriedTasks   int `json:"retried_tasks,omitempty"`
}

ScrapeStatus is returned from a Queue with information about a specific scrape

type StdoutReporter

type StdoutReporter struct{}

StdoutReporter writes all errors to Stdout

func (StdoutReporter) Report

func (StdoutReporter) Report(_ context.Context, ro *ReporterOpts, err error)

Report prints out the error

type StdoutWriter

type StdoutWriter struct{}

StdoutWriter writes datums to stdout using fmt.Printf

func (*StdoutWriter) Close

func (sw *StdoutWriter) Close() error

Close is a no-op so that StdoutWriter satisfies the Writer interface

func (*StdoutWriter) Write

func (sw *StdoutWriter) Write(ctx context.Context, f interface{}) error

Write prints the datum to stdout using the %+v verb

type Task

type Task struct {
	URL string `json:"url"`
	// Extra can be used to send information from a parent task to its children
	Extra map[string]json.RawMessage `json:"extra,omitempty"`
	// Timeout is the timeout attached to a single task;
	// it defaults to 15s
	Timeout time.Duration
}

A Task generally maps to a single HTTP request, but sometimes more than one may be made

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

A Worker is a single-threaded worker that pulls one task at a time from the queue and processes it to completion

func NewWorker

func NewWorker(r *Registry, ro Rotator, l Limiter, q Queue, w Writer, er ErrorReporter) *Worker

NewWorker provisions a new worker

func (*Worker) Start

func (w *Worker) Start(wg *sync.WaitGroup)

Start launches the worker

func (*Worker) Stop

func (w *Worker) Stop()

Stop initiates stop and then blocks until shutdown is complete

type WorkerErr

type WorkerErr struct {
	QueuedTask *QueuedTask
	Errors     []error
}

WorkerErr carries errors from a task

func (*WorkerErr) Error

func (we *WorkerErr) Error() string

type Writer

type Writer interface {
	Write(ctx context.Context, f interface{}) error
	io.Closer
}

A Writer is able to process and output datums retrieved by Discollect plugins
