博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Golang实现简单爬虫框架(5)——项目重构与数据存储
阅读量:7207 次
发布时间:2019-06-29

本文共 8690 字,大约阅读时间需要 28 分钟。

前言

在上一篇文章中,我们使用用队列实现了任务调度,接下来首先对两种并发方式做一个同构,使代码统一。然后添加数据存储模块。

注意:本次并发是在上一篇文章简单并发实现的基础上修改,所以没有贴出全部代码,只是贴出部分修改部分,要查看完整项目代码,可以查看上篇文章,或者从github下载

1、项目重构

(1)并发引擎

通过分析我们发现,两种不同调度的区别是每个worker一个channel还是 所有worker共用一个channel,所以我们在接口中定义一个函数WorkerChan(),用来决定这件事,即worker一个channel还是 所有worker共用一个channel。此时ConfigMasterWorkerChan就不再需要了。

在项目文件concurrent.go中我们定义一个任务调度器Scheduler,如下:

// 任务调度器type Scheduler interface {	Submit(request Request) // 提交任务	ConfigMasterWorkerChan(chan Request)	WorkerReady(w chan Request)	Run()}复制代码

但是在简单并发中我们只实现了SubmitConfigMasterWorkerChan接口,而使用队列调度中却实现了接口的所有方法,所有我们同构一下使concurrent.go文件可以适用于两种不同的调度器。

因为在createworker函数中要使用WorkerReady函数,所以要传入一个Scheduler,但是这样显得比较重,我们可以利用接口组合,新建一个接口ReadyNotifier,这样在createworker函数中传入ReadyNotifier即可。

修改后的任务调度如下:

type Scheduler interface {	ReadyNotifier	Submit(request Request) // 提交任务	WorkerChan() chan Request	Run()}type ReadyNotifier interface {	WorkerReady(chan Request)}复制代码

此时创建goroutine修改如下:

// 创建 goroutinefor i := 0; i < e.WorkerCount; i++ {    //任务是每个 worker 一个 channel 还是 所有 worker 共用一个 channel 由WorkerChan 来决定	createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler)}复制代码

修改后的concurrent.go文件如下:

package engineimport (	"log")// 并发引擎type ConcurrendEngine struct {	Scheduler   Scheduler	WorkerCount int}// 任务调度器type Scheduler interface {	ReadyNotifier	Submit(request Request) // 提交任务	WorkerChan() chan Request	Run()}type ReadyNotifier interface {	WorkerReady(chan Request)}func (e *ConcurrendEngine) Run(seeds ...Request) {	out := make(chan ParseResult)	e.Scheduler.Run()	// 创建 goruntine	for i := 0; i < e.WorkerCount; i++ {		// 任务是每个 worker 一个 channel 还是 所有 worker 共用一个 channel 由WorkerChan 来决定		createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler)	}	// engine把请求任务提交给 Scheduler	for _, request := range seeds {		e.Scheduler.Submit(request)	}	itemCount := 0	for {		// 接受 Worker 的解析结果		result := <-out		for _, item := range result.Items {			log.Printf("Got item: #%d: %v\n", itemCount, item)			itemCount++		}		// 然后把 Worker 解析出的 Request 送给 Scheduler		for _, request := range result.Requests {			e.Scheduler.Submit(request)		}	}}func createWorker(in chan Request, out chan ParseResult, ready ReadyNotifier) {	go func() {		for {			ready.WorkerReady(in) // 告诉调度器任务空闲			request := <-in			result, err := worker(request)			if err != nil {				continue			}			out <- result		}	}()}复制代码

(2)简单并发调度器

scheduler/simple.go

package schedulerimport "crawler/engine"type SimpleScheduler struct {	workerChan chan engine.Request}func (s *SimpleScheduler) WorkerChan() chan engine.Request {	// 此时所有 worker 共用同一个 channel,直接返回即可	return s.workerChan}func (s *SimpleScheduler) WorkerReady(w chan engine.Request) {}func (s *SimpleScheduler) Run() {    // 创建出 workchannel	s.workerChan = make(chan engine.Request)}func (s *SimpleScheduler) Submit(request engine.Request) {	// send request down to worker chan	go func() {		s.workerChan <- request	}()}复制代码

(3)队列实现调度器

scheduler/queued.go

添加WorkerChan()的实现即可

package schedulerimport "crawler/engine"// 使用队列来调度任务type QueuedScheduler struct {	requestChan chan engine.Request	workerChan  chan chan engine.Request}func (s *QueuedScheduler) WorkerChan() chan engine.Request {	// 对于队列实现来讲,每个 worker 共用一个 channel	return make(chan engine.Request)}// 提交请求任务到 requestChanfunc (s *QueuedScheduler) Submit(request engine.Request) {	s.requestChan <- request}// 告诉外界有一个 worker 可以接收 requestfunc (s *QueuedScheduler) WorkerReady(w chan engine.Request) {	s.workerChan <- w}func (s *QueuedScheduler) Run() {	s.workerChan = make(chan chan engine.Request)	s.requestChan = make(chan engine.Request)	go func() {		// 创建请求队列和工作队列		var requestQ []engine.Request		var workerQ []chan engine.Request		for {			var activeWorker chan engine.Request			var activeRequest engine.Request			if len(requestQ) > 0 && len(workerQ) > 0 {				activeWorker = workerQ[0]				activeRequest = requestQ[0]			}			select {			case r := <-s.requestChan: // 当 requestChan 收到数据				requestQ = append(requestQ, r)			case w := <-s.workerChan: // 当 workerChan 收到数据				workerQ = append(workerQ, w)			case activeWorker <- activeRequest: // 当请求队列和认读队列都不为空时,给任务队列分配任务				requestQ = requestQ[1:]				workerQ = workerQ[1:]			}		}	}()}复制代码

(4)main函数

经过上述同构,在main函数中如需切换不同调度器,只需要相应的配置即可。

package mainimport (	"crawler/engine"	"crawler/scheduler"	"crawler/zhenai/parser")func main() {	e := engine.ConcurrendEngine{		//Scheduler: &scheduler.QueuedScheduler{},	// 队列实现调度器		Scheduler:   &scheduler.SimpleScheduler{},	// 简单并发调度		WorkerCount: 50,	}	e.Run(engine.Request{		Url:       "http://www.zhenai.com/zhenghun",		ParseFunc: parser.ParseCityList,	})}复制代码

2、数据存储

(1)Mgo的介绍安装

爬取到的数据不能仅仅在控制台打印出来,所以我们还要给爬虫添加数据存储模块。我们本次选择使用mongodb来存储我们的数据。

mgo(音mango)是的驱动,它用基于Go语法的简单API实现了丰富的特性,并经过良好测试。

官方网址:

文档:

首先我们要安装mgo,打开终端,输入下面代码完成安装

go get gopkg.in/mgo.v2复制代码

mgo基本操作都很简单,有数据库操作经验都可以很快上手。

(2)爬虫引擎与数据格式

首先,爬虫引擎获取到数据要把数据发送给数据存储模块,而数据的传递用要用到channel,所以打开concurrent.go文件,在引擎添加ItemChan属性,如下所示:

爬取到数据需要把数据发送到数据存储模块,

package engine// 并发引擎type ConcurrendEngine struct {	Scheduler   Scheduler // 任务调度器	WorkerCount int       // 并发任务数量	ItemChan    chan Item // 数据保存 channel}// ...for {    // 接受 Worker 的解析结果    result := <-out    for _, item := range result.Items {        // 当抓取一组数据后,进行保存        go func(item2 Item) {			e.ItemChan <- item2		}(item)    }	// ...}// ...复制代码

engine/types.go中定义Item类型:

package engine// 请求结构type Request struct {	Url       string // 请求地址	ParseFunc func([]byte) ParseResult}// 解析结果结构type ParseResult struct {	Requests []Request // 解析出的请求	Items    []Item    // 解析出的内容}// 解析出的用户数据格式type Item struct {	Url     string      // 个人信息Url地址	Type    string      // table	Id      string      // Id	Payload interface{} // 详细信息}func NilParseFun([]byte) ParseResult {	return ParseResult{}}复制代码

(3)存储模块的实现

在根目录下创建persist文件夹,然后创建itemsaver.go文件

// persist/itemsaver.gopackage persistimport (	"context"	"crawler/engine"	"errors"	"gopkg.in/mgo.v2"	"gopkg.in/olivere/elastic.v5"	"log")func ItemSaver(index string) (chan engine.Item, error) {	// mongodb connect	session, err := mgo.Dial("localhost:27017")	if err != nil {		panic(err)	}	out := make(chan engine.Item)	go func() {		itemCount := 0		for {			// 接收到发送的 item			item := <-out			log.Printf("Item Saver: got item #%d: %v\n",				itemCount, item)			itemCount++			// Save data in mongodb			err := mongo_save(session, index, item)			if err != nil {				// if have err, ignore it				log.Printf("Item Saver: error, saving item %v: %v",					item, err)			}		}	}()	return out, nil}// 使用 MongoDB 保存数据func mongo_save(session *mgo.Session, dbName string, item engine.Item) error {	if item.Type == "" {		return errors.New("must supply Type")	}	c := session.DB(dbName).C(item.Type)	// 选择要操作的数据库与集合	err := c.Insert(item)		// 插入数据	if err != nil {		log.Fatal(err)	}	return nil}复制代码

(4)存储测试文件

我们把一条数据存入mongodb,然后再取出来,比对读出的数据和写入的数据是否相同

// persist/itemsaver_test.gppackage persistimport (	"crawler/engine"	"crawler/model"	"encoding/json"	"fmt"	"gopkg.in/mgo.v2"	"gopkg.in/mgo.v2/bson"	"log"	"testing")func TestMongoSave(t *testing.T) {	// mongodb connect	session, err := mgo.Dial("localhost:27017")	if err != nil {		panic(err)	}	expected := engine.Item{		Url:  "http://album.zhenai.com/u/1946858930",		Type: "zhenai",		Id:   "1946858930",		Payload: model.Profile{			Name:     "為你垨候",			Gender:   "女士",			Age:      40,			Height:   163,			Weight:   54,			Income:   "5-8千",			Marriage: "未婚",			Address:  "佛山顺德区",		},	}	// 保存数据	err = mongo_save(session, "crawler", expected)	if err != nil {		panic(err)	}	c := session.DB("crawler").C("zhenai")	var result engine.Item    // 查询数据	err = c.Find(bson.M{
"id": "1946858930"}).One(&result) // result 为 Json 类型 if err != nil { log.Fatal(err) } fmt.Printf("%s, %s, %v\n", result.Url, result.Id, result.Payload)}复制代码

(5)parser模块

我们要在parse/profile.go文件中组装好需要保存到数据库的数据格式

// ...result := engine.ParseResult{    Items: []engine.Item{        {            Url:     url,            Type:    "zhenai",            Id:      extractString([]byte(url), idUrlRe),            Payload: profile,        },    },}// ...复制代码

(6)main函数

package mainimport (	"crawler/engine"	"crawler/persist"	"crawler/scheduler"	"crawler/zhenai/parser")func main() {	itemChan, err := persist.ItemSaver()	if err != nil {		panic(err)	}	e := engine.ConcurrendEngine{		//Scheduler: &scheduler.QueuedScheduler{},		Scheduler:   &scheduler.SimpleScheduler{},		WorkerCount: 100,		ItemChan:    itemChan,	}	e.Run(engine.Request{		Url:       "http://www.zhenai.com/zhenghun",		ParseFunc: parser.ParseCityList,	})}复制代码

运行项目,打开mongodb可视化工具,可以看到爬取了54410条数据

3、总结

我们首先把两种并发方式做一个同构,使代码统一,直接在main函数中使用不同的配置就可以切换调度器,简单方便。然后使用Mgo驱动操作数据,添加到mongodb中。内容有点多,很多代码没有完整的展示出来,希望大家可以下载,回滚到对应提交记录查看,效果会更好。 别无所求,只求随手给个star

下篇博客中我们会再当前博客的基础上添加数据展示功能

如果想获取视频资源的,可以在评论区留下邮箱。

如果觉得文章还可以,劳烦大人随手点个赞。。。

转载地址:http://duwym.baihongyu.com/

你可能感兴趣的文章
自学计划
查看>>
dp-01背包问题 (升级)
查看>>
MySQL数据库唯一性设置(unique index)
查看>>
Windows性能计数器(命令行方式)
查看>>
Perl information,doc,module document and FAQ.
查看>>
sql 查询目标数据库中所有的表以其关键信息
查看>>
linux 下安装tomcat
查看>>
集成xadmin源码到项目的正式姿势
查看>>
自定义ViewPager,避免左右滑动时与水平滑动控件冲突
查看>>
javaScript-进阶篇(一)
查看>>
截取地址栏参数
查看>>
Redis介绍及Jedis基础操作
查看>>
20061218: 多个任务管理器
查看>>
WCF 可靠会话
查看>>
vim+makefile入门编辑,编译,差错实例
查看>>
Python之基础练习题
查看>>
AC日记——回文子串 openjudge 1.7 34
查看>>
易买网总结
查看>>
C#导入Excel报错问题。
查看>>
网站前端性能优化
查看>>