设为首页 加入收藏

TOP

Flume案例:实时采集python爬取的豆瓣最新电影
2019-05-03 14:09:46 】 浏览:139
Tags:Flume 案例 实时 采集 python 最新电影
版权声明: https://blog.csdn.net/isyslab/article/details/84109832

首先,让我们看一下本案例的背景:通过python爬虫抓取豆瓣最新上映的电影信息,抓取的信息通过flume传输到HDFS中。python的版本是3.6,flume的版本是1.8。

Python 爬虫程序讲解

(1)编写网页爬虫程序,首先要对网页进行访问,python中使用的urllib库,代码如下:

from urllib import request
    resp = request.urlopen('https://movie.douban.com/nowplaying/wuhan/')
    html_data = resp.read().decode('utf-8')

其中https://movie.douban.com/nowplaying/wuhan/ 是豆瓣最新上映的电影页面,可以在浏览器中输入该网址进行查看。html_data是字符串类型的变量,里面存放了网页的html代码。
(2)对html代码进行解析,提取我们需要的数据。在python中使用BeautifulSoup库进行html代码的解析。(注:如果没有安装此库,则使用pip3 install beautifulsoup4进行安装即可!)
BeautifulSoup使用的格式如下:

BeautifulSoup(html,"html.parser")

第一个参数为需要提取数据的html,第二个参数是指定解析器。
(3)打开我们爬取网页的html代码,查看我们需要的数据所在html标签,如下图所示。
在这里插入图片描述
从上图中可以看出从div id=”nowplaying“标签开始是我们想要的数据,里面有电影的名称、评分、主演等信息。所以相应的代码编写如下:

 from bs4 import BeautifulSoup as bs
    soup = bs(html_data, 'html.parser')    
    nowplaying_movie = soup.find_all('div', id='nowplaying')
    nowplaying_movie_list = nowplaying_movie[0].find_all('li', class_='list-item')
    nowplaying_list = []    
    for item in nowplaying_movie_list:        
   	        nowplaying_dict = {}        
            nowplaying_dict['id'] = item['data-subject']
            nowplaying_dict['title'] = item['data-title']
   	        nowplaying_dict['score'] = item['data-score']
   	        nowplaying_dict['region'] = item['data-region']
   	        nowplaying_dict['director'] = item['data-director']
   	        nowplaying_dict['votecount'] = item['data-votecount']       
   	        nowplaying_list.append(nowplaying_dict)
     return nowplaying_list

(4)存储电影信息到CSV文件,相应的代码编写如下:

  csv_file = open("/home/hadoop/data/flume-test/movielist/movielist.csv","w",newline = '')
    writer = csv.writer(csv_file)
    writer.writerow(['id','title','score','region','director','votecount'])
    commentList = [] 
    NowPlayingMovie_list = getNowPlayingMovie_list()
    for i in range(len(NowPlayingMovie_list)):    
          NowPlayingMovie_list = getNowPlayingMovie_list()
          for i in range(len(NowPlayingMovie_list)):    
               writer.writerow([NowPlayingMovie_list[i]['id'],NowPlayingMovie_list[i]['title'],NowPlayingMovie_list[i]['score'],NowPlayingMovie_list[i]['region'],NowPlayingMovie_list[i]['director'],NowPlayingMovie_list[i]['votecount']])
    csv_file.close()

运行爬虫程序, 执行如下命令,将抓取的电影信息放在/home/hadoop/data/flume-test/movielist目录下的movielist.csv文件中,运行结果如图所示。

[hadoop@master movielist]$ python3 scrape-douban.py

[hadoop@master movielist]$ cat movielist.csv

在这里插入图片描述

Flume 配置文件

了解了flume 的运行机制后,接下来我们看一下本实例的flume配置文件。添加如下信息到flume.conf文件:

指定Agent的组件名称

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 指定Flume source(要监听的路径)
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/data/flume-test
a1.sources.r1.fileHeader = true
# 指定Flume sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/tmp/flume-movies/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 指定Flume channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 绑定source和sink到channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

保存配置文件后, 启动flume agent,启动效果如图所示。

[hadoop@master ~]$ cd /home/hadoop/software/apache-flume-1.8.0-bin

[hadoop@master apache-flume-1.8.0-bin]$ bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name a1 -Dflume.root.logger=INFO,console

在这里插入图片描述
登陆到HDFS,查询flume采集的电影信息,执行如下命令,查询结果如图所示。

[hadoop@master ~]$hadoop fs -ls /tmp/flume-movies

在这里插入图片描述
查询子文件的具体内容,执行如下命令,查询结果如下所示。

[hadoop@master ~]$hadoop fs -cat /tmp/flume-movies/20180427/events.xxxxxx

在这里插入图片描述

2018.11.15
dhp

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇flume 1.7 TailDir source重复获.. 下一篇Flume (五) Channel Selectors

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目