Concurrent Crawler

Code Implementation

The crawler is split into an engine (a sequential SimpleEngine and a ConcurrentEngine), a scheduler, a fetcher, and the zhenai-specific parsers plus a small model package. Each file is listed below.

/crawler/main.go

package main

import (
    "learn/crawler/engine"
    "learn/crawler/scheduler"
    "learn/crawler/zhenai/parser"
)

func main() {
    // Wire the concurrent engine to the queued scheduler with 20 workers.
    e := engine.ConcurrentEngine{
        Scheduler:   &scheduler.QueuedScheduler{},
        WorkerCount: 20,
    }
    e.Run(engine.Request{
        Url:       "http://www.zhenai.com/zhenghun",
        ParseFunc: parser.ParseCityList,
    })
    // To test a single city (Shanghai) instead:
    //e.Run(engine.Request{
    //    Url:       "http://www.zhenai.com/zhenghun/shanghai",
    //    ParseFunc: parser.ParseCity,
    //})
}
/crawler/engine/simple.go

package engine

import (
    "learn/crawler/fetcher"
    "log"
)

// SimpleEngine crawls sequentially, using a slice as a FIFO work queue.
type SimpleEngine struct{}

func (e SimpleEngine) Run(seeds ...Request) {
    var requests []Request
    for _, r := range seeds {
        requests = append(requests, r)
    }
    for len(requests) > 0 {
        r := requests[0]
        requests = requests[1:]
        parseResult, err := worker(r)
        if err != nil {
            continue
        }
        requests = append(requests, parseResult.Requests...)
        for _, item := range parseResult.Items {
            log.Printf("Got item %v", item)
        }
    }
}

// worker fetches one page and runs its parser; it is shared with the
// concurrent engine below.
func worker(r Request) (ParseResult, error) {
    log.Printf("Fetching %s", r.Url)
    body, err := fetcher.Fetch(r.Url)
    if err != nil {
        log.Printf("Fetcher: error fetching url %s: %v", r.Url, err)
        return ParseResult{}, err
    }
    return r.ParseFunc(body), nil
}
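
For contrast with the concurrent engine that follows, SimpleEngine can be driven with the same seed; a minimal sketch (same import paths as main.go above):

package main

import (
    "learn/crawler/engine"
    "learn/crawler/zhenai/parser"
)

// Run the sequential engine on the same seed page.
func main() {
    engine.SimpleEngine{}.Run(engine.Request{
        Url:       "http://www.zhenai.com/zhenghun",
        ParseFunc: parser.ParseCityList,
    })
}
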
/crawler/engine/concurrent.go

package engine

import (
    "log"
)

type ConcurrentEngine struct {
    Scheduler   Scheduler
    WorkerCount int
}

type Scheduler interface {
    ReadyNotifier
    Submit(Request)
    WorkerChan() chan Request
    Run()
}

type ReadyNotifier interface {
    WorkerReady(chan Request)
}

func (e *ConcurrentEngine) Run(seeds ...Request) {
    out := make(chan ParseResult)
    e.Scheduler.Run()
    for i := 0; i < e.WorkerCount; i++ {
        createWork(e.Scheduler.WorkerChan(), out, e.Scheduler)
    }
    for _, r := range seeds {
        e.Scheduler.Submit(r)
    }
    itemCount := 0
    for {
        result := <-out
        for _, item := range result.Items {
            log.Printf("Got item #%d: %v", itemCount, item)
            itemCount++
        }
        // Newly discovered requests go back to the scheduler.
        for _, request := range result.Requests {
            e.Scheduler.Submit(request)
        }
    }
}

func createWork(in chan Request, out chan ParseResult, ready ReadyNotifier) {
    go func() {
        for {
            ready.WorkerReady(in) // announce readiness before blocking for work
            request := <-in
            result, err := worker(request)
            if err != nil {
                continue
            }
            out <- result
        }
    }()
}
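
Each worker created by createWork loops through the same handshake: report ready, block on its own channel until the scheduler sends a request, then push the parse result to out. The handshake can be watched in isolation with a toy notifier (demoNotifier is a hypothetical stand-in for the scheduler, not part of the project):

package main

import "fmt"

// demoNotifier forwards one queued job to every worker that reports ready.
type demoNotifier struct{ jobs chan string }

func (d demoNotifier) WorkerReady(w chan string) {
    go func() { w <- <-d.jobs }()
}

func main() {
    d := demoNotifier{jobs: make(chan string, 2)}
    d.jobs <- "job-1"
    d.jobs <- "job-2"

    in := make(chan string)
    done := make(chan struct{})
    go func() {
        for i := 0; i < 2; i++ {
            d.WorkerReady(in) // step 1: announce this worker is idle
            fmt.Println(<-in) // step 2: block until a job arrives
        }
        close(done)
    }()
    <-done
}
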
/crawler/engine/typers.go

package engine

type Request struct {
    Url       string
    ParseFunc func([]byte) ParseResult
}

type ParseResult struct {
    Requests []Request
    Items    []interface{}
}

// NilParser discards the page body and yields no items or follow-up requests.
func NilParser([]byte) ParseResult {
    return ParseResult{}
}
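
NilParser also makes a convenient stub for exercising worker on its own; an illustrative smoke test (it performs a live HTTP request, so treat it as a sketch rather than a real unit test):

package engine

import "testing"

func TestWorker(t *testing.T) {
    _, err := worker(Request{
        Url:       "http://www.zhenai.com/zhenghun",
        ParseFunc: NilParser,
    })
    if err != nil {
        t.Errorf("worker returned error: %v", err)
    }
}
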
/crawler/fetcher/fetcher.go

package fetcher

import (
    "bufio"
    "fmt"
    "golang.org/x/net/html/charset"
    "golang.org/x/text/encoding"
    "golang.org/x/text/encoding/unicode"
    "golang.org/x/text/transform"
    "io/ioutil"
    "log"
    "net/http"
    "time"
)

// Rate limit: at most one request every 100ms across all workers.
var rateLimiter = time.Tick(100 * time.Millisecond)

func Fetch(url string) ([]byte, error) {
    <-rateLimiter
    client := &http.Client{}
    req, err := http.NewRequest("GET", url, nil)
    if err != nil {
        return nil, err
    }
    // A browser User-Agent so the site does not reject the crawler outright.
    req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36")
    resp, err := client.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("wrong status code: %d", resp.StatusCode)
    }
    // Sniff the page's encoding and convert the body to UTF-8.
    bodyReader := bufio.NewReader(resp.Body)
    e := determineEncoding(bodyReader)
    utf8Reader := transform.NewReader(bodyReader, e.NewDecoder())
    return ioutil.ReadAll(utf8Reader)
}

func determineEncoding(r *bufio.Reader) encoding.Encoding {
    // Pages shorter than 1KB make Peek return an error; fall back to UTF-8.
    bytes, err := r.Peek(1024)
    if err != nil {
        log.Printf("Fetcher error: %v", err)
        return unicode.UTF8
    }
    e, _, _ := charset.DetermineEncoding(bytes, "")
    return e
}
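
Used on its own, the fetcher just returns the UTF-8 body; a minimal sketch:

package main

import (
    "fmt"
    "learn/crawler/fetcher"
)

// Fetch one page and print how many UTF-8 bytes came back.
func main() {
    body, err := fetcher.Fetch("http://www.zhenai.com/zhenghun")
    if err != nil {
        fmt.Println("fetch error:", err)
        return
    }
    fmt.Printf("read %d bytes\n", len(body))
}
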
/crawler/zhenai/parser/citylist.go

package parser

import (
    "learn/crawler/engine"
    "regexp"
)

const cityListRe = `<a href="(http://www.zhenai.com/zhenghun/[0-9a-z]+)" [^>]*>([^<]+)</a>`

func ParseCityList(contents []byte) engine.ParseResult {
    re := regexp.MustCompile(cityListRe)
    matches := re.FindAllSubmatch(contents, -1)
    result := engine.ParseResult{}
    for _, m := range matches {
        // m[1] is the city URL, m[2] the city name.
        result.Items = append(result.Items, "City: "+string(m[2]))
        result.Requests = append(result.Requests, engine.Request{
            Url:       string(m[1]),
            ParseFunc: ParseCity,
        })
    }
    return result
}
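
The two capture groups can be checked in isolation; a small sketch against a hypothetical anchor tag in the shape ParseCityList expects:

package main

import (
    "fmt"
    "regexp"
)

// Same pattern as cityListRe in citylist.go.
const cityListRe = `<a href="(http://www.zhenai.com/zhenghun/[0-9a-z]+)" [^>]*>([^<]+)</a>`

func main() {
    // Hypothetical fragment of a city-list page.
    html := []byte(`<a href="http://www.zhenai.com/zhenghun/shanghai" class="city">上海</a>`)
    re := regexp.MustCompile(cityListRe)
    for _, m := range re.FindAllSubmatch(html, -1) {
        fmt.Printf("url=%s name=%s\n", m[1], m[2]) // m[0] is the full match
    }
}
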
/crawler/zhenai/parser/city.go

package parser

import (
    "learn/crawler/engine"
    "regexp"
)

var (
    profileRe = regexp.MustCompile(`<a href="(http://album.zhenai.com/u/[0-9]+)" [^>]*>([^<]+)</a>`)
    cityUrlRe = regexp.MustCompile(`href="(http://www.zhenai.com/zhenghun/[^"]+)"`)
)

func ParseCity(contents []byte) engine.ParseResult {
    matches := profileRe.FindAllSubmatch(contents, -1)
    result := engine.ParseResult{}
    for _, m := range matches {
        // Redeclare name per iteration so each closure captures its own copy.
        name := string(m[2])
        result.Items = append(result.Items, "User "+name)
        result.Requests = append(result.Requests, engine.Request{
            Url: string(m[1]),
            ParseFunc: func(c []byte) engine.ParseResult {
                return ParseProfile(c, "name:"+name)
            },
        })
    }
    // City pages also link to further /zhenghun pages; crawl those too.
    matches = cityUrlRe.FindAllSubmatch(contents, -1)
    for _, m := range matches {
        result.Requests = append(result.Requests, engine.Request{
            Url:       string(m[1]),
            ParseFunc: ParseCity,
        })
    }
    return result
}
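
One detail worth flagging: name is redeclared inside the loop, so each ParseFunc closure captures its own copy instead of all sharing the final match. The same shadowing idiom in miniature (hypothetical values; before Go 1.22 the range variable itself was shared across iterations):

package main

import "fmt"

func main() {
    var funcs []func()
    for _, w := range []string{"a", "b", "c"} {
        w := w // per-iteration copy, as with name in ParseCity
        funcs = append(funcs, func() { fmt.Println(w) })
    }
    for _, f := range funcs {
        f() // prints a, b, c
    }
}
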
/crawler/zhenai/parser/profile.go

package parser

import (
    "learn/crawler/engine"
    "learn/crawler/model"
    "regexp"
)

// Each profile detail sits in a <div class="m-btn purple"> block.
const all = `<div class="m-btn purple" data-v-8b1eac0c>([^<]+)</div>`

func ParseProfile(contents []byte, name string) engine.ParseResult {
    profile := model.Profile{}
    profile.User = append(profile.User, name)
    re := regexp.MustCompile(all)
    match := re.FindAllSubmatch(contents, -1)
    for _, m := range match {
        profile.User = append(profile.User, string(m[1]))
    }
    return engine.ParseResult{
        Items: []interface{}{profile},
    }
}
/crawler/model/profile.go

package model

// Profile collects a user's name plus the attribute strings scraped
// from the profile page.
type Profile struct {
    User []string
}
/crawler/scheduler/queued.go

package scheduler

import "learn/crawler/engine"

// QueuedScheduler queues requests and idle workers separately and
// matches them one-to-one.
type QueuedScheduler struct {
    requestChan chan engine.Request
    workChan    chan chan engine.Request
}

func (s *QueuedScheduler) WorkerChan() chan engine.Request {
    // Each worker gets its own channel, so the scheduler can choose
    // which worker receives the next request.
    return make(chan engine.Request)
}

func (s *QueuedScheduler) Submit(r engine.Request) {
    s.requestChan <- r
}

func (s *QueuedScheduler) WorkerReady(w chan engine.Request) {
    s.workChan <- w
}

func (s *QueuedScheduler) Run() {
    s.workChan = make(chan chan engine.Request)
    s.requestChan = make(chan engine.Request)
    go func() {
        var requestQ []engine.Request
        var workerQ []chan engine.Request
        for {
            var activeRequest engine.Request
            var activeWorker chan engine.Request
            if len(requestQ) > 0 && len(workerQ) > 0 {
                activeRequest = requestQ[0]
                activeWorker = workerQ[0]
            }
            select {
            case r := <-s.requestChan:
                requestQ = append(requestQ, r)
            case w := <-s.workChan:
                workerQ = append(workerQ, w)
            case activeWorker <- activeRequest:
                // Only reachable when both queues are non-empty;
                // otherwise activeWorker is nil and this case never fires.
                workerQ = workerQ[1:]
                requestQ = requestQ[1:]
            }
        }
    }()
}
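
The third select case leans on a Go guarantee: sending on a nil channel blocks forever, so whenever either queue is empty, activeWorker stays nil and that case is effectively switched off. The same trick in isolation:

package main

import "fmt"

func main() {
    var disabled chan int // nil channel: its send case can never fire
    ready := make(chan int, 1)
    ready <- 42
    select {
    case disabled <- 1:
        fmt.Println("never happens")
    case v := <-ready:
        fmt.Println("got", v) // always the branch taken here
    }
}
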
/crawler/scheduler/simple.go

package scheduler

import "learn/crawler/engine"

// SimpleScheduler pushes every request down one shared worker channel.
type SimpleScheduler struct {
    workerChan chan engine.Request
}

func (s *SimpleScheduler) WorkerChan() chan engine.Request {
    return s.workerChan
}

func (s *SimpleScheduler) WorkerReady(chan engine.Request) {
    // No-op: with a single shared channel there is nothing to match up.
}

func (s *SimpleScheduler) Run() {
    s.workerChan = make(chan engine.Request)
}

func (s *SimpleScheduler) Submit(r engine.Request) {
    // Send from a new goroutine: workerChan is unbuffered, and submitting
    // directly from the engine's collection loop would deadlock once all
    // workers are busy.
    go func() { s.workerChan <- r }()
}
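
Since ConcurrentEngine only depends on the Scheduler interface, this scheduler drops in without touching the engine; a minimal sketch of main.go using it:

package main

import (
    "learn/crawler/engine"
    "learn/crawler/scheduler"
    "learn/crawler/zhenai/parser"
)

func main() {
    e := engine.ConcurrentEngine{
        Scheduler:   &scheduler.SimpleScheduler{},
        WorkerCount: 20,
    }
    e.Run(engine.Request{
        Url:       "http://www.zhenai.com/zhenghun",
        ParseFunc: parser.ParseCityList,
    })
}

All workers then compete on the single shared channel rather than being matched in ready order.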

Full Project

https://gitee.com/FenYiYuan/golang-cpdcrawler.git
