predator

package module
v0.1.32 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 10, 2021 License: Apache-2.0 Imports: 30 Imported by: 0

README

predator / 掠食者

基于 fasthttp 开发的高性能爬虫框架

使用

下面是一个示例,基本包含了当前已完成的所有功能,使用方法可以参考注释。

1 创建一个 Crawler
import "github.com/thep0y/predator"


func main() {
	crawler := predator.NewCrawler(
		predator.WithUserAgent("Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:90.0) Gecko/20100101 Firefox/90.0"),
		predator.WithCookies(map[string]string{"JSESSIONID": cookie}),
		predator.WithProxy(ip), // 或者使用代理池 predator.WithProxyPool([]string)
	)
}

创建Crawler时有一些可选项用来功能增强。所有可选项参考predator/options.go

2 发送 Get 请求
crawler.Get("http://www.baidu.com")

对请求和响应的处理参考的是 colly,我觉得 colly 的处理方式非常舒服。

// BeforeRequest 可以在发送请求前,对请求进行一些修补
crawler.BeforeRequest(func(r *predator.Request) {
	headers := map[string]string{
		"Accept":           "*/*",
		"Accept-Language":  "zh-CN",
		"Accept-Encoding":  "gzip, deflate",
		"X-Requested-With": "XMLHttpRequest",
		"Origin":           "http://example.com",
	}

	r.SetHeaders(headers)
  
	// 请求和响应之间的上下文传递,上下文见下面的上下文示例
	r.Ctx.Put("id", 10)
	r.Ctx.Put("name", "tom")
})

crawler.AfterResponse(func(r *predator.Response) {
	// 从请求发送的上下文中取值
	id := r.Ctx.GetAny("id").(int)
	name := r.Ctx.Get("name")
	
	// 对于 json 响应,建议使用 gjson 进行处理
	body := gjson.ParseBytes(r.Body)
	amount := body.Get("amount").Int()
	types := body.Get("types").Array()
})

// 请求语句要在 BeforeRequest 和 AfterResponse 后面调用
crawler.Get("http://www.baidu.com")
3 发送 Post 请求

与 Get 请求有一点不同,通常每个 Post 的请求的参数是不同的,而这些参数都在请求体中,在BeforeRequest中处理请求体虽然可以,但绝非最佳选择,所以在构造 Post 请求时,可以直接传入上下文,用以解决与响应的信息传递。

// BeforeRequest 可以在发送请求前,对请求进行一些修补
crawler.BeforeRequest(func(r *predator.Request) {
	headers := map[string]string{
		"Accept":           "*/*",
		"Accept-Language":  "zh-CN",
		"Accept-Encoding":  "gzip, deflate",
		"X-Requested-With": "XMLHttpRequest",
		"Origin":           "http://example.com",
	}

	r.SetHeaders(headers)
})

crawler.AfterResponse(func(r *predator.Response) {
	// 从请求发送的上下文中取值
	id := r.Ctx.GetAny("id").(int)
	name := r.Ctx.Get("name")
	
	// 对于 json 响应,建议使用 gjson 进行处理
	body := gjson.ParseBytes(r.Body)
	amount := body.Get("amount").Int()
	types := body.Get("types").Array()
})


body := map[string]string{"foo": "bar"}

// 在 Post 请求中,应该将关键参数用这种方式放进上下文
ctx, _ := context.AcquireCtx()
ctx.Put("id", 10)
ctx.Put("name", "tom")

crawler.Post("http://www.baidu.com", body, ctx)

如果不需要传入上下文,可以直接用nil代替:

crawler.Post("http://www.baidu.com", body, nil)
4 发送 multipart/form-data 请求

multipart/form-data方法需要使用专门的PostMultipart方法,示例可能较长,这里不便书写。

使用方法请参考示例:https://github.com/thep0y/predator/blob/main/example/multipart/main.go

5 上下文

上下文是一个接口,我实现了两种上下文:

  • ReadOp:基于sync.Map实现,适用于读取上下文较多的场景
  • WriteOp:用map实现,适用于读写频率相差不大或写多于读的场景,这是默认采用的上下文

爬虫中如果遇到了读远多于写时就应该换ReadOp了,如下代码所示:

ctx, err := AcquireCtx(context.ReadOp)
6 处理 HTML

爬虫的结果大体可分为两种,一是 HTML 响应,另一种是 JSON 格式的响应。

与 JSON 相比,HTML 需要更多的代码处理。

本框架对 HTML 处理进行了一些函数封装,能方便地通过 css selector 进行元素的查找,可以提取元素中的属性和文本等。

crawl := NewCrawler()

crawl.ParseHTML("body", func(he *html.HTMLElement) {
	// 元素内部 HTML
	h, err := he.InnerHTML()
	// 元素整体 HTML
	h, err := he.OuterHTML()
	// 元素内的文本(包括子元素的文本)
	he.Text()
	// 元素的属性
	he.Attr("class")
	// 第一个匹配的子元素
	he.FirstChild("p")
	// 最后一个匹配的子元素
	he.LastChild("p")
	// 第 2 个匹配的子元素
	he.Child("p", 2)
	// 第一个匹配的子元素的属性
	he.ChildAttr("p", "class")
	// 所有匹配到的子元素的属性切片
	he.ChildrenAttr("p", "class")
}
7 异步 / 多协程请求
c := NewCrawler(
	// 使用此 option 时自动使用指定数量的协程池发出请求,不使用此 option 则默认使用同步方式请求
	// 设置的数量不宜过少,也不宜过多,请自行测试设置不同数量时的效率
	WithConcurrency(30),
)

c.AfterResponse(func(r *predator.Response) {
	// handle response
})

for i := 0; i < 10; i++ {
	c.Post(ts.URL+"/post", map[string]string{
		"id": fmt.Sprint(i + 1),
	}, nil)
}

c.Wait()
8 使用缓存

默认情况下,缓存是不启用的,所有的请求都直接放行。

已经实现的缓存:

  • MySQL
  • PostgreSQL
  • Redis
  • SQLite3

缓存接口中有一个方法Compressed(yes bool)用来压缩响应的,毕竟有时,响应长度非常长,直接保存到数据库中会影响插入和查询时的性能。

这四个接口的使用方法示例:

// MySQL
c := NewCrawler(
	WithCache(&cache.MySQLCache{
		Host:     "127.0.0.1",
		Port:     "3306",
		Database: "predator",
		Username: "root",
		Password: "123456",
	}, false), // false 为关闭压缩,true 为开启压缩,下同
)

// PostgreSQL
c := NewCrawler(
	WithCache(&cache.PostgreSQLCache{
		Host:     "127.0.0.1",
		Port:     "54322",
		Database: "predator",
		Username: "postgres",
		Password: "123456",
	}, false),
)

// Redis
c := NewCrawler(
	WithCache(&cache.RedisCache{
		Addr: "localhost:6379",
	}, true),
)

// SQLite3
c := NewCrawler(
	WithCache(&cache.SQLiteCache{
		URI: uri,  // uri 为数据库存放的位置,尽量加上后缀名 .sqlite
	}, true),
)
// 也可以使用默认值。WithCache 的第一个为 nil 时,
// 默认使用 SQLite 作为缓存,且会将缓存保存在当前
// 目录下的 predator-cache.sqlite 中
c := NewCrawler(WithCache(nil, true))
9 代理

支持 HTTP 代理和 Socks5 代理。

使用代理时需要加上协议,如:

WithProxyPool([]string{"http://ip:port", "socks5://ip:port"})
10 日志

日志使用的是流行日志库zerolog

默认情况下,日志是不开启的,需要手动开启。

WithLogger选项需要填入一个参数*predator.LogOp,当填入nil时,默认会以INFO等级从终端美化输出。

	crawler := predator.NewCrawler(
		predator.WithLogger(nil),
	)

predator.LogOp对外公开四个方法:

  • SetLevel:设置日志等级。等级可选:DEBUGINFOWARNINGERRORFATAL

    logOp := new(predator.LogOp)
    // 设置为 INFO
    logOp.SetLevel(log.INFO)
    
  • ToConsole:美化输出到终端。

  • ToFile:JSON 格式输出到文件。

  • ToConsoleAndFile:既美化输出到终端,同时以 JSON 格式输出到文件。

日志的完整示例:

import "github.com/thep0y/predator/log"

func main() {
	logOp := new(predator.LogOp)
	logOp.SetLevel(log.INFO)
	logOp.ToConsoleAndFile("test.log")

	crawler := predator.NewCrawler(
		predator.WithLogger(logOp),
	)
}
11 关于 JSON

本来想着封装一个 JSON 包用来快速处理 JSON 响应,但是想了一两天也没想出个好办法来,因为我能想到的,gjson都已经解决了。

对于 JSON 响应,能用gjson处理就不要老想着反序列化了。对于爬虫而言,反序列化是不明智的选择。

当然,如果你确实有反序列化的需求,也不要用标准库,使用封装的 JSON 包中的序列化和反序列化方法比标准库性能高。

import "github.com/thep0y/predator/json"

json.Marshal()
json.Unmarshal()
json.UnmarshalFromString()

对付 JSON 响应,当前足够用了。

目标
  • 完成对失败响应的重新请求,直到重试了传入的重试次数时才算最终请求失败
  • 识别因代理失效而造成的请求失败。当使用代理池时,代理池中剔除此代理;代理池为空时,终止整个爬虫程序
    • 考虑到使用代理必然是因为不想将本地 ip 暴露给目标网站或服务器,所以在使用代理后,当所有代理都失效时,不再继续发出请求
  • HTML 页面解析。方便定位查找元素
  • json 扩展,用来处理、筛选 json 响应的数据,原生 json 库不适合用在爬虫上
    • 暂时没想到如何封装便捷好用的 json ,当前 json 包中只能算是使用示例
  • 协程池,实现在多协程时对每个 goroutine 的复用,避免重复创建
  • 定义缓存接口,并完成一种或多种缓存。因为临时缓存在爬虫中并不实用,所以 predator 采用持久化缓存。
    • 默认使用 sqlite3 进行缓存,可以使用已实现的其他缓存数据库,也可以自己实现缓存接口
    • 可用缓存存储有 SQLite3、MySQL、PostgreSQL、Redis
    • 因为采用持久化缓存,所以不实现以内存作为缓存,如果需要请自行根据缓存接口实现
  • 数据库管理接口,用来保存爬虫数据,并完成一种或多种数据库的管理
    • SQL 数据库接口已实现了,NoSQL 接口与 SQL 差别较大,就不实现了,如果有使用 NoSQL 的需求,请自己实现
    • 数据库接口没有封装在 Crawler 方法中,根据需要使用,一般场景下够用,复杂场景中仍然需要自己重写数据库管理
  • 添加日志
    • 可能还不完善
  • RequestResponse的请求体Body添加池管理,减少 GC 次数
    • body 本身就是[]byte,作为引用类型,只要不删除引用关系,其内存就不会被回收
    • 将原求就不是nil的 body 截断为 body[:0] 即可,不需要使用池来管理
  • 增加对 robots.txt 的判断,默认遵守 robots.txt 规则,但可以选择忽略
  • 声明一个代理api处理方法,参数为一个整型,可以请求代理池中代理的数量返回代理切片,形成代理池。后续可以每次请求一个代理,用于实时补全代理池。这个方法需用户自行实现。

Documentation

Index

Constants

View Source
const (
	RUNNING = 1
	STOPED  = 0
)

running status

Variables

View Source
var (
	InvalidProxyError    = errors.New("the proxy ip should contain the protocol")
	UnknownProtocolError = errors.New("only support http and socks5 protocol")
	ProxyExpiredError    = errors.New("the proxy ip has expired")
	OnlyOneProxyIPError  = errors.New("unable to delete the only proxy ip")
	UnkownProxyIPError   = errors.New("proxy is unkown")
	EmptyProxyPoolError  = errors.New("after deleting the invalid proxy, the current proxy ip pool is empty")
	NoCacheSet           = errors.New("No cache set")
)
View Source
var (
	// return if pool size <= 0
	ErrInvalidPoolCap = errors.New("invalid pool cap")
	// put task but pool already closed
	ErrPoolAlreadyClosed = errors.New("pool already closed")
)

errors

View Source
var (
	IncorrectResponse = errors.New("the response status code is not 200 or 201")
)

Functions

func ReleaseRequest

func ReleaseRequest(req *Request)

ReleaseRequest returns req acquired via AcquireRequest to request pool.

It is forbidden accessing req and/or its' members after returning it to request pool.

func ReleaseResponse

func ReleaseResponse(resp *Response)

ReleaseResponse returns resp acquired via AcquireResponse to response pool.

It is forbidden accessing resp and/or its' members after returning it to response pool.

Types

type AcquireProxies

type AcquireProxies func(n int) []string

可以从一些代理网站的 api 中请求指定数量的代理 ip

type Crawler

type Crawler struct {

	// UserAgent is the User-Agent string used by HTTP requests
	UserAgent string

	// 在多协程中这个上下文管理可以用来退出或取消多个协程
	Context context.Context
	// contains filtered or unexported fields
}

Crawler is the provider of crawlers

func NewCrawler

func NewCrawler(opts ...CrawlerOption) *Crawler

NewCrawler creates a new Crawler instance with some CrawlerOptions

func (*Crawler) AfterResponse

func (c *Crawler) AfterResponse(f HandleResponse)

AfterResponse is used to process the response, this method should be used for the response body in non-html format

func (*Crawler) BeforeRequest

func (c *Crawler) BeforeRequest(f HandleRequest)

BeforeRequest used to process requests, such as setting headers, passing context, etc.

func (*Crawler) ClearCache

func (c *Crawler) ClearCache() error

ClearCache will clear all cache

func (*Crawler) DialWithProxy

func (c *Crawler) DialWithProxy() fasthttp.DialFunc

func (*Crawler) DialWithProxyAndTimeout

func (c *Crawler) DialWithProxyAndTimeout(timeout time.Duration) fasthttp.DialFunc

func (*Crawler) Error

func (c *Crawler) Error(err error)

func (*Crawler) Get

func (c *Crawler) Get(URL string) error

Get is used to send GET requests

func (*Crawler) ParseHTML

func (c *Crawler) ParseHTML(selector string, f HandleHTML)

ParseHTML can parse html to find the data you need, and process the data

func (*Crawler) Post

func (c *Crawler) Post(URL string, requestData map[string]string, ctx pctx.Context) error

Post is used to send POST requests

func (*Crawler) PostJSON

func (c *Crawler) PostJSON(URL string, requestData map[string]interface{}, ctx pctx.Context) error

PostJSON is used to send a POST request body in json format

func (*Crawler) PostMultipart

func (c *Crawler) PostMultipart(URL string, form *MultipartForm, ctx pctx.Context) error

PostMultipart

func (Crawler) ProxyPoolAmount

func (c Crawler) ProxyPoolAmount() int

ProxyPoolAmount returns the number of proxies in the proxy pool

func (*Crawler) Wait

func (c *Crawler) Wait()

Wait waits for the end of all concurrent tasks

type CrawlerOption

type CrawlerOption func(*Crawler)

func WithCache

func WithCache(cc cache.Cache, compressed bool, cacheFileds ...string) CrawlerOption

WithCache 使用缓存,可以选择是否压缩缓存的响应。 使用缓存时,如果发出的是 POST 请求,最好传入能 代表请求体的唯一性的缓存字段,可以是零个、一个或多个。

注意:当不传入缓存字段时,将会默认采用整个请求体作为 缓存标识,但由于 map 无序,同一个请求体生成的 key 很 难保证相同,所以可能会有同一个请求缓存多次,或者无法 从缓存中读取已请求过的请求的响应的情况出现。

func WithConcurrency

func WithConcurrency(count uint64) CrawlerOption

WithConcurrency 使用并发,参数为要创建的协程池数量

func WithCookies

func WithCookies(cookies map[string]string) CrawlerOption

func WithLogger

func WithLogger(lop *LogOp) CrawlerOption

func WithProxy

func WithProxy(proxyURL string) CrawlerOption

WithProxy 使用一个代理

func WithProxyPool

func WithProxyPool(proxyURLs []string) CrawlerOption

WithProxyPool 使用一个代理池

func WithRawCookie

func WithRawCookie(cookie string) CrawlerOption

func WithRetry

func WithRetry(count uint32, cond RetryConditions) CrawlerOption

WithRetry 请求失败时重试多少次,什么条件的响应是请求失败

func WithUserAgent

func WithUserAgent(ua string) CrawlerOption

type CustomRandomBoundary

type CustomRandomBoundary func() string

CustomRandomBoundary generates a custom boundary

type HTMLParser

type HTMLParser struct {
	Selector string
	Handle   HandleHTML
}

HTMLParser is used to parse html

type HandleHTML

type HandleHTML func(he *html.HTMLElement)

HandleHTML is used to process html

type HandleRequest

type HandleRequest func(r *Request)

HandleRequest is used to patch the request

type HandleResponse

type HandleResponse func(r *Response)

HandleResponse is used to handle the response

type LogOp

type LogOp struct {
	Out io.Writer
	// contains filtered or unexported fields
}

func (*LogOp) SetLevel

func (l *LogOp) SetLevel(level zerolog.Level)

func (*LogOp) ToConsole

func (l *LogOp) ToConsole()

func (*LogOp) ToConsoleAndFile

func (l *LogOp) ToConsoleAndFile(filepath string) error

func (*LogOp) ToFile

func (l *LogOp) ToFile(filepath string) error

type MultipartForm

type MultipartForm struct {
	// contains filtered or unexported fields
}

MultipartForm 请求体的构造

func NewMultipartForm

func NewMultipartForm(dash string, f CustomRandomBoundary) *MultipartForm

func (*MultipartForm) AppendFile

func (mf *MultipartForm) AppendFile(name, filePath string) error

func (*MultipartForm) AppendString

func (mf *MultipartForm) AppendString(name, value string)

func (*MultipartForm) Boundary

func (mf *MultipartForm) Boundary() string

Boundary returns the Writer's boundary.

func (*MultipartForm) Bytes

func (mf *MultipartForm) Bytes() []byte

func (*MultipartForm) FormDataContentType

func (mf *MultipartForm) FormDataContentType() string

FormDataContentType returns the Content-Type for an HTTP multipart/form-data with this Writer's Boundary.

type Pool

type Pool struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Pool task pool

func NewPool

func NewPool(capacity uint64) (*Pool, error)

NewPool init pool

func (*Pool) Close

func (p *Pool) Close()

Close close pool graceful

func (*Pool) GetCap

func (p *Pool) GetCap() uint64

GetCap get capacity

func (*Pool) GetRunningWorkers

func (p *Pool) GetRunningWorkers() uint64

GetRunningWorkers get running workers

func (*Pool) Put

func (p *Pool) Put(task *Task) error

Put put a task to pool

type Request

type Request struct {
	// 访问的链接
	URL string
	// 请求方法
	Method string
	// 请求头
	Headers *fasthttp.RequestHeader
	// 请求和响应之间共享的上下文
	Ctx pctx.Context
	// 请求体
	Body []byte

	// 唯一标识
	ID uint32
	// contains filtered or unexported fields
}

func AcquireRequest

func AcquireRequest() *Request

AcquireRequest returns an empty Request instance from request pool.

The returned Request instance may be passed to ReleaseRequest when it is no longer needed. This allows Request recycling, reduces GC pressure and usually improves performance.

func (*Request) Abort

func (r *Request) Abort()

func (Request) Get

func (r Request) Get(u string) error

func (Request) Hash

func (r Request) Hash() (string, error)

func (*Request) New

func (r *Request) New(method, URL string, body []byte) *Request

New 使用原始请求的上下文创建一个新的请求

func (Request) NumberOfRetries

func (r Request) NumberOfRetries() uint32

func (Request) Post

func (r Request) Post(URL string, requestData map[string]string, ctx pctx.Context) error

func (*Request) Reset

func (r *Request) Reset()

func (*Request) SetContentType

func (r *Request) SetContentType(contentType string)

func (*Request) SetHeaders

func (r *Request) SetHeaders(headers map[string]string)

type Response

type Response struct {
	// 响应状态码
	StatusCode int
	// 二进制请求体
	Body []byte
	// 请求和响应之间共享的上下文
	Ctx ctx.Context `json:"-"`
	// 响应对应的请求
	Request *Request `json:"-"`
	// 响应头
	Headers fasthttp.ResponseHeader
	// 是否从缓存中取得的响应
	FromCache bool
}

func AcquireResponse

func AcquireResponse() *Response

AcquireResponse returns an empty Response instance from response pool.

The returned Response instance may be passed to ReleaseResponse when it is no longer needed. This allows Response recycling, reduces GC pressure and usually improves performance.

func (*Response) ContentType

func (r *Response) ContentType() string

func (*Response) GetSetCookie

func (r *Response) GetSetCookie() string

func (Response) Marshal

func (r Response) Marshal() ([]byte, error)

func (*Response) Reset

func (r *Response) Reset()

func (*Response) Save

func (r *Response) Save(fileName string) error

Save writes response body to disk

func (*Response) String

func (r *Response) String() string

type RetryConditions

type RetryConditions func(r Response) bool

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task task to-do

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL