@TOC
简单概述
本系列可能是一个比较长的系列,主要是对《Python3网络爬虫开发实战》前七章的一个内容总结并且熟悉使用一下相关的框架与技术。
任务目标
爬取电影数据网站ssr1.scrape.center/, 此网站无反爬,数据通过服务端渲染,需要爬取的部分为列表页里面的电影数据详情。
任务目标解析
- 爬取ssr1.scrape.center/, 网站的列表页面,通过列表页面的内容获取到需要的URL
- 爬取ssr1.scrape.center/detail/{id}… 网站内的数据详情,需要获取的部分有:
- 电影标题
- 电影图片的url
- 电影上映时间
- 电影分类
- 电影评分
- 剧情简介
- 将内容存放到需要的数据库中
技术选型与爬取
如何爬取
- 电影标题
- 电影图片的url
- 电影上映时间
- 电影分类
- 电影评分
- 剧情简介
如何爬取
这一部分并不是本次探讨的一个重点,对于爬取请求在之前已经使用了requests库与urllib库进行了相关的实现,如果需要参考可以参看前面的两篇文章,里面有相关的内容介绍。
如何解析
BeautifulSoup库
BeautifulSoup库是Python当中进行网页解析常用的一个库,这个库的功能十分完善,并且可以通过链式原则直接获取一些所需要的元素与属性,它的内部可以选择4种不同的解析方式,并且其可以通过css选择器进行直接的选取,可以说相当的好用了。
解析列表页
对于列表页的解析只需要查找到对应类名的元素后将其进行返回,而BeautifulSoup库正好有一个方便的方法,直接使用find_all()
方法查找类名就可以获取到我们所需要的所有url。
# 对网页数据进行解析
def parse_index(html):
soup = BeautifulSoup(html, 'lxml')
# 获取url
items = soup.find_all(class_='name')
# 如果没有值
if not items:
return []
# 存在值
for item in items:
# 对链接进行拼接
detail_url = urljoin(BASE_URL, item['href'])
# 输出生成的链接
logging.info('get detail url %s', detail_url)
# 生成器函数
yield detail_url
解析详情页
详情页部分所需要获取的内容就比较多一些,除去直接利用正则表达式容易提取的上映时间外,其他所要获取的内容均可以利用BeautifulSoup库获取到,但在获取元素后获取元素的子孙节点的操作中就有些麻烦了,可能需要进行嵌套的相关操作进行获取。
# 对详情网页进行解析
def parse_detail(html):
soup = BeautifulSoup(html, 'lxml')
# 标题
name = soup.find(name="h2").string if soup.find(name="h2") else None
# 图片
cover = soup.find(class_='cover')['src'] if soup.find(class_='cover') \
else None
# 分类
categories = []
# 获取分类
categories_ele = soup.find(class_='categories')
# 如果存在
if categories_ele != -1:
# 查找内部节点
for category in categories_ele:
if category.find('span') != -1:
category = category.find('span').string
categories.append(category)
# 简介
drama = soup.select('div.drama p')[0].string.strip() if soup.select('div.drama p') \
else None
# 上映时间
published_at = re.compile('(\d{4}-\d{2}-\d{2}) 上映')
published = re.search(published_at, html).group(1) if re.search(published_at, html) \
else None
# 评分
score = soup.find(class_='score').string.strip() if soup.find(class_='score') \
else None
return {
'name': name,
'cover': cover,
'published': published,
'categories': categories,
'score': score,
'drama': drama
}
parsel库
parsel库的优势之处在于它可以用css选择器的方式选择所需要的元素,也可以通过使用xpath来选择所需要的元素,甚至其还可以直接使用正则表达式进行选择,那么有了这个利器,我们可以在进行选择时,哪个较为方便就利用哪一个进行选择,减轻某些元素在特定情况下不好获取的问题。
解析列表页
由于parsel库可以使用xpath规则,css选择器,正则表达式的方式,这里就可以按自己的熟悉程度进行选择,完成自己需要的部分。
# 获取列表页的数据
def parse_index(html):
selector = Selector(html)
items = selector.css('a.name::attr(href)').getall()
if not items:
return []
for item in items:
# 对链接进行拼接
detail_url = urljoin(BASE_URL, item)
# 输出生成的链接
logging.info('get detail url %s', detail_url)
# 生成器函数
yield detail_url
解析详情页
parsel库的三种模式的优势在此时就显现的淋漓尽致,当css选择器不太好进行操作时,我们就可以试一试xpath,如果xpath不好操作,那么此时我们也可以试一试正则表达式(当然这里的正则表达式我还是沿用的之前写过的正则表达式。
# 获取详情页的数据
def parse_detail(html):
selector = Selector(text=html)
# 标题
name = selector.xpath('//h2[@class="m-b-sm"]/text()').get() if selector.xpath('//h2[@class="m-b-sm"]/text()').get() else None
# 图片
cover = selector.css('img.cover::attr(src)').get() if selector.css('img.cover::attr(src)').get() else None
# 分类
categories = selector.xpath('//button[contains(@class,"category")]/span/text()').getall() \
if selector.xpath('//button[contains(@class,"category")]/span/text()').getall() \
else None
# 简介
drama = selector.xpath('//div[@class="drama"]/p/text()').get().strip() \
if selector.xpath('//div[@class="drama"]/p/text()').get() \
else None
# 出版日期
published_at = re.compile('(\d{4}-\d{2}-\d{2}) 上映')
published = re.search(published_at, html).group(1) if re.search(published_at, html) \
else None
# 分数
score = selector.xpath('//p[contains(@class, "score")]/text()').get().strip() \
if selector.xpath('//p[contains(@class, "score")]/text()').get() \
else None
return {
'name': name,
'cover': cover,
'categories': categories,
'drama': drama,
'published': published,
'score': score
}
如何存储
这里两次抓取的内容我也分别使用了不同的方式进行存储:csv格式存储,以及使用非关系型数据库mongoDB进行存储。
csv格式存储
由于我们返回时采用的是字典格式,那么存储写入csv格式时,我们需要选择以字典的方式进行写入的模式即DictWriter对象,这里需要注意的是,传入的同时我们需要将字典的key也一并传入,否则会出现错误,同时由于需要一个key的标题,因此需要在主函数当中进行一次写入。
# 对获取的内容进行存储
# 本次存储使用csv
def save_data(data_path ,data):
# 初始化字典写入对象
with open(data_path, 'a+', encoding='utf-8') as csvfile:
fieldnames = ['name', 'cover', 'published', 'categories', 'score', 'drama']
# 初始化一个字典写入对象
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writerow(data)
MongoDB存储
mongoDB数据库是一个非关系型的数据库,通过此数据库可以有效的实现对一些非关系型数据内容的存储,同时,由于python有一个pymongo库可以方便的对数据内容进行存储,所以对于此部分的整体实现是比较容易的。
# 进行数据内容的存储
# 使用MongoDB进行数据存储
def save_data(data):
collection.update_one(
{'name': data.get('name')},
{'$set': data},
upsert=True
)
源代码
BeautifulSoup库抓取
import re
import requests
import logging
import csv
from os import makedirs
from os.path import exists
from bs4 import BeautifulSoup
from urllib.parse import urljoin
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s: %(message)s')
# 常量
BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10
RESULT_DIR = 'results'
exists(RESULT_DIR) or makedirs(RESULT_DIR)
# 对网页进行爬取
def scrape_page(url):
logging.info('scraping url: %s ...', url)
try:
# 进行请求
response = requests.get(url=url)
# 请求结果判断
if response.status_code == 200:
# 输出文本内容
return response.text
# 错误情况输出
logging.error('get invaild status code %s while scraping %s',
response.status_code, url)
except requests.RequestException:
# 输出错误情况
logging.error('error occured while scraping %s', url, exc_info=True)
def scrape_index(page):
# 索引页面
index_url = f'{BASE_URL}/page/{page}'
# 进行抓取
return scrape_page(index_url)
def scrape_detail(url):
# 抓取页面内容
return scrape_page(url)
# 对网页数据进行解析
def parse_index(html):
soup = BeautifulSoup(html, 'lxml')
# 获取url
items = soup.find_all(class_='name')
# 如果没有值
if not items:
return []
# 存在值
for item in items:
# 对链接进行拼接
detail_url = urljoin(BASE_URL, item['href'])
# 输出生成的链接
logging.info('get detail url %s', detail_url)
# 生成器函数
yield detail_url
# 对详情网页进行解析
def parse_detail(html):
soup = BeautifulSoup(html, 'lxml')
# 标题
name = soup.find(name="h2").string if soup.find(name="h2") else None
# 图片
cover = soup.find(class_='cover')['src'] if soup.find(class_='cover') \
else None
# 分类
categories = []
# 获取分类
categories_ele = soup.find(class_='categories')
# 如果存在
if categories_ele != -1:
# 查找内部节点
for category in categories_ele:
if category.find('span') != -1:
category = category.find('span').string
categories.append(category)
# 简介
drama = soup.select('div.drama p')[0].string.strip() if soup.select('div.drama p') \
else None
# 上映时间
published_at = re.compile('(\d{4}-\d{2}-\d{2}) 上映')
published = re.search(published_at, html).group(1) if re.search(published_at, html) \
else None
# 评分
score = soup.find(class_='score').string.strip() if soup.find(class_='score') \
else None
return {
'name': name,
'cover': cover,
'published': published,
'categories': categories,
'score': score,
'drama': drama
}
# 对获取的内容进行存储
# 本次存储使用csv
def save_data(data_path ,data):
# 初始化字典写入对象
with open(data_path, 'a+', encoding='utf-8') as csvfile:
fieldnames = ['name', 'cover', 'published', 'categories', 'score', 'drama']
# 初始化一个字典写入对象
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writerow(data)
# 主函数
def main():
# 数据存放地址
data_path = '{0}/movies.csv'.format(RESULT_DIR)
# 初始化一个写入对象
with open(data_path, 'w', encoding='utf-8') as csvfile:
fieldnames = ['name', 'cover', 'published', 'categories', 'score', 'drama']
# 初始化一个字典写入对象
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for page_index in range(1, TOTAL_PAGE + 1):
# 获取列表页
page_html = scrape_index(page=page_index)
# 获取详情页的urls
detail_urls = parse_index(page_html)
for detail_url in detail_urls:
# 抓取详情页内容
detail_html = scrape_detail(detail_url)
# 获取数据
data = parse_detail(detail_html)
# 获取信息
logging.info('get detail data %s', data)
save_data(data=data, data_path=data_path)
if __name__ == '__main__':
main()
parsel库抓取
import re
import logging
import pymongo
from parsel import Selector
from urllib.request import urlopen
from urllib.parse import urljoin
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s: %(message)s')
# 常量
BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10
# MONGO
# 链接地址
MONGO_CLIENT_STRING = 'mongodb://localhost:27017'
# 链接数据库
MONGO_DB = 'spiders'
# 链接集合
MONGO_COLLECTION = 'movies'
# 数据库连接与配置
client = pymongo.MongoClient(MONGO_CLIENT_STRING)
db = client[MONGO_DB]
collection = db[MONGO_COLLECTION]
# 进行页面的抓取
# 构建一个抓取页面的函数
def scrape_page(url):
logging.info('scraping %s ...', url)
# 异常捕获操作
try:
# 进行请求
response = urlopen(url=url)
# 判断请求码
if response.status == 200:
# 输出网页内容
return response.read().decode('utf-8')
# 错误请求
logging.error('get invaild status code %s while scraping %s',
response.status, url)
except:
# 错误请求
logging.error('error occured while scraping %s', url, exc_info=True)
# 获取列表页网页
def scarpe_index(page):
index_url = f'{BASE_URL}/page/{page}'
return scrape_page(index_url)
# 获取详情页网页
def scrape_detail(url):
return scrape_page(url=url)
# 进行页面的解析
# 获取列表页的数据
def parse_index(html):
selector = Selector(html)
items = selector.css('a.name::attr(href)').getall()
if not items:
return []
for item in items:
# 对链接进行拼接
detail_url = urljoin(BASE_URL, item)
# 输出生成的链接
logging.info('get detail url %s', detail_url)
# 生成器函数
yield detail_url
# 获取详情页的数据
def parse_detail(html):
selector = Selector(text=html)
# 标题
name = selector.xpath('//h2[@class="m-b-sm"]/text()').get() if selector.xpath('//h2[@class="m-b-sm"]/text()').get() else None
# 图片
cover = selector.css('img.cover::attr(src)').get() if selector.css('img.cover::attr(src)').get() else None
# 分类
categories = selector.xpath('//button[contains(@class,"category")]/span/text()').getall() \
if selector.xpath('//button[contains(@class,"category")]/span/text()').getall() \
else None
# 简介
drama = selector.xpath('//div[@class="drama"]/p/text()').get().strip() \
if selector.xpath('//div[@class="drama"]/p/text()').get() \
else None
# 出版日期
published_at = re.compile('(\d{4}-\d{2}-\d{2}) 上映')
published = re.search(published_at, html).group(1) if re.search(published_at, html) \
else None
# 分数
score = selector.xpath('//p[contains(@class, "score")]/text()').get().strip() \
if selector.xpath('//p[contains(@class, "score")]/text()').get() \
else None
return {
'name': name,
'cover': cover,
'categories': categories,
'drama': drama,
'published': published,
'score': score
}
# 进行数据内容的存储
# 使用MongoDB进行数据存储
def save_data(data):
collection.update_one(
{'name': data.get('name')},
{'$set': data},
upsert=True
)
# 主函数
def main():
for page in range(1, TOTAL_PAGE + 1):
# 抓取详情页
index_html = scarpe_index(page)
# 获取urls
detail_urls = parse_index(index_html)
# 抓取每个详情页里面的内容
for detail_url in detail_urls:
# 获取详情页内容
detail_html = scrape_detail(detail_url)
# 解析详情页内容获取数据
data = parse_detail(detail_html)
# 将最终的数据输出
logging.info('get detail data %s', data)
# 保存数据
save_data(data=data)
if __name__ == '__main__':
main()
版权信息
import re
import requests
import logging
import csv
from os import makedirs
from os.path import exists
from bs4 import BeautifulSoup
from urllib.parse import urljoin
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s: %(message)s')
# 常量
BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10
RESULT_DIR = 'results'
exists(RESULT_DIR) or makedirs(RESULT_DIR)
# 对网页进行爬取
def scrape_page(url):
logging.info('scraping url: %s ...', url)
try:
# 进行请求
response = requests.get(url=url)
# 请求结果判断
if response.status_code == 200:
# 输出文本内容
return response.text
# 错误情况输出
logging.error('get invaild status code %s while scraping %s',
response.status_code, url)
except requests.RequestException:
# 输出错误情况
logging.error('error occured while scraping %s', url, exc_info=True)
def scrape_index(page):
# 索引页面
index_url = f'{BASE_URL}/page/{page}'
# 进行抓取
return scrape_page(index_url)
def scrape_detail(url):
# 抓取页面内容
return scrape_page(url)
# 对网页数据进行解析
def parse_index(html):
soup = BeautifulSoup(html, 'lxml')
# 获取url
items = soup.find_all(class_='name')
# 如果没有值
if not items:
return []
# 存在值
for item in items:
# 对链接进行拼接
detail_url = urljoin(BASE_URL, item['href'])
# 输出生成的链接
logging.info('get detail url %s', detail_url)
# 生成器函数
yield detail_url
# 对详情网页进行解析
def parse_detail(html):
soup = BeautifulSoup(html, 'lxml')
# 标题
name = soup.find(name="h2").string if soup.find(name="h2") else None
# 图片
cover = soup.find(class_='cover')['src'] if soup.find(class_='cover') \
else None
# 分类
categories = []
# 获取分类
categories_ele = soup.find(class_='categories')
# 如果存在
if categories_ele != -1:
# 查找内部节点
for category in categories_ele:
if category.find('span') != -1:
category = category.find('span').string
categories.append(category)
# 简介
drama = soup.select('div.drama p')[0].string.strip() if soup.select('div.drama p') \
else None
# 上映时间
published_at = re.compile('(\d{4}-\d{2}-\d{2}) 上映')
published = re.search(published_at, html).group(1) if re.search(published_at, html) \
else None
# 评分
score = soup.find(class_='score').string.strip() if soup.find(class_='score') \
else None
return {
'name': name,
'cover': cover,
'published': published,
'categories': categories,
'score': score,
'drama': drama
}
# 对获取的内容进行存储
# 本次存储使用csv
def save_data(data_path ,data):
# 初始化字典写入对象
with open(data_path, 'a+', encoding='utf-8') as csvfile:
fieldnames = ['name', 'cover', 'published', 'categories', 'score', 'drama']
# 初始化一个字典写入对象
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writerow(data)
# 主函数
def main():
# 数据存放地址
data_path = '{0}/movies.csv'.format(RESULT_DIR)
# 初始化一个写入对象
with open(data_path, 'w', encoding='utf-8') as csvfile:
fieldnames = ['name', 'cover', 'published', 'categories', 'score', 'drama']
# 初始化一个字典写入对象
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for page_index in range(1, TOTAL_PAGE + 1):
# 获取列表页
page_html = scrape_index(page=page_index)
# 获取详情页的urls
detail_urls = parse_index(page_html)
for detail_url in detail_urls:
# 抓取详情页内容
detail_html = scrape_detail(detail_url)
# 获取数据
data = parse_detail(detail_html)
# 获取信息
logging.info('get detail data %s', data)
save_data(data=data, data_path=data_path)
if __name__ == '__main__':
main()
import re
import logging
import pymongo
from parsel import Selector
from urllib.request import urlopen
from urllib.parse import urljoin
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s: %(message)s')
# 常量
BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10
# MONGO
# 链接地址
MONGO_CLIENT_STRING = 'mongodb://localhost:27017'
# 链接数据库
MONGO_DB = 'spiders'
# 链接集合
MONGO_COLLECTION = 'movies'
# 数据库连接与配置
client = pymongo.MongoClient(MONGO_CLIENT_STRING)
db = client[MONGO_DB]
collection = db[MONGO_COLLECTION]
# 进行页面的抓取
# 构建一个抓取页面的函数
def scrape_page(url):
logging.info('scraping %s ...', url)
# 异常捕获操作
try:
# 进行请求
response = urlopen(url=url)
# 判断请求码
if response.status == 200:
# 输出网页内容
return response.read().decode('utf-8')
# 错误请求
logging.error('get invaild status code %s while scraping %s',
response.status, url)
except:
# 错误请求
logging.error('error occured while scraping %s', url, exc_info=True)
# 获取列表页网页
def scarpe_index(page):
index_url = f'{BASE_URL}/page/{page}'
return scrape_page(index_url)
# 获取详情页网页
def scrape_detail(url):
return scrape_page(url=url)
# 进行页面的解析
# 获取列表页的数据
def parse_index(html):
selector = Selector(html)
items = selector.css('a.name::attr(href)').getall()
if not items:
return []
for item in items:
# 对链接进行拼接
detail_url = urljoin(BASE_URL, item)
# 输出生成的链接
logging.info('get detail url %s', detail_url)
# 生成器函数
yield detail_url
# 获取详情页的数据
def parse_detail(html):
selector = Selector(text=html)
# 标题
name = selector.xpath('//h2[@class="m-b-sm"]/text()').get() if selector.xpath('//h2[@class="m-b-sm"]/text()').get() else None
# 图片
cover = selector.css('img.cover::attr(src)').get() if selector.css('img.cover::attr(src)').get() else None
# 分类
categories = selector.xpath('//button[contains(@class,"category")]/span/text()').getall() \
if selector.xpath('//button[contains(@class,"category")]/span/text()').getall() \
else None
# 简介
drama = selector.xpath('//div[@class="drama"]/p/text()').get().strip() \
if selector.xpath('//div[@class="drama"]/p/text()').get() \
else None
# 出版日期
published_at = re.compile('(\d{4}-\d{2}-\d{2}) 上映')
published = re.search(published_at, html).group(1) if re.search(published_at, html) \
else None
# 分数
score = selector.xpath('//p[contains(@class, "score")]/text()').get().strip() \
if selector.xpath('//p[contains(@class, "score")]/text()').get() \
else None
return {
'name': name,
'cover': cover,
'categories': categories,
'drama': drama,
'published': published,
'score': score
}
# 进行数据内容的存储
# 使用MongoDB进行数据存储
def save_data(data):
collection.update_one(
{'name': data.get('name')},
{'$set': data},
upsert=True
)
# 主函数
def main():
for page in range(1, TOTAL_PAGE + 1):
# 抓取详情页
index_html = scarpe_index(page)
# 获取urls
detail_urls = parse_index(index_html)
# 抓取每个详情页里面的内容
for detail_url in detail_urls:
# 获取详情页内容
detail_html = scrape_detail(detail_url)
# 解析详情页内容获取数据
data = parse_detail(detail_html)
# 将最终的数据输出
logging.info('get detail data %s', data)
# 保存数据
save_data(data=data)
if __name__ == '__main__':
main()
本文由PorterZhang整理或写作完成
本人的Github: PorterZhang2021
本人的博客地址:PorterZhang
代办报建
本公司承接江浙沪报建代办施工许可证。
联系人:张经理,18321657689(微信同号)。
15条评论
态度决定一切,不错!http://c9pmsm.wztl.net/
听楼主一席话,省我十本书!http://wap.zjchuzhou.com/
楼主内心很强大!http://test.cqyiyou.net/test/
信楼主,得永生!http://tru089.dushidaogou.com
楼上的能详细介绍一下么?http://53z33.shuruiyun.com
这篇文章真是让人受益匪浅!http://xaf9.farhataas.com
收藏了,楼主加油!http://bg36.gzhuanhaow.com
看帖不回帖的人就是耍流氓,我回复了!http://mc0nc7.k3i6.com
不错哦,楼主这是要火的节奏啊!http://8id8x.njhaohai.com
感谢楼主的推荐!http://wii.yyjxsb.com
鉴定完毕!http://ln8.https://www.skypeis.com/
雷锋做好事不留名,都写在帖子里!http://xiaoxiongxc.cn/html/66c98998944.html
很有品味!https://www.telegramnu.com
刚分手,心情不好!https://i4-pc.com
楼主是在找骂么?http://mrs.87520.cn
发表评论