通过分页、线程池、代理池等技术,快速爬取链家网近4万条在售二手房信息,速度可达 10000 条 / 5 分钟。
通过对二手房作数据分析,得到北上广深等(新)一线城市四地房价的纵向比较,同时对各个城市各个区的房价做横向对比,并将对比结果可视化出来。
主要用到的库或模块包括
- Requests
- PyQuery
- ThreadPoolExecutor
- JSON
- Matplotlib
- PyEcharts
环境:
- Widnows10
- Python3.5
- Pycharm2018
数据抓取
爬虫架构设计
通过分析链家网的 URL ,不难发现,每一个城市的链家网的基本格式是:
城市名简拼 + ”.lianjia.com“
所以整个爬虫最外层应该是遍历一个保存城市简拼的列表,拼接得到一个个起始 URL,根据这些 URL 爬取对应城市的链家网。
针对每一个城市的链家网而言,首先得到该城市在售二手房的总套数,由于每一页显示的套数是 30,由总套数整除以30再加上1可以得到总页数,但是由于最大可浏览页数为 100,所以我们这里得加个判断,如果总页数大于 100 的话,令总页数等于 100。
分析具体城市的链家网每一页的 URL, 以北京为例,我们可以发现第 N 页的 URL 是:
bj.lianjia.com/ershoufang/pg{N},由此我们可以通过以下代码来得到每一页的 URL:
for i in range(total_page):
page_url = "bj.lianjia.com/ershoufang/pg{}".format(i+1)
本来得到每一页的 URL 后,我们可以得到该页上 30 套房的房价信息和详情页 URL,但是页面上没有房子所在区的信息。
我们只能再向下请求访问详情页 URL,从而提取出我们想要的所有数据。
综上所述,我们可以将整个框架从上往下分为四层,如下图所示:
基于上述思路,在写代码的时候,可以分层从上往下实现,方便调试。
第一层 & 第二层:获取总套数
根据城市简拼得到起始 URL,并得到总套数,为分页做准备。
def get_list_page_url(city):
start_url = "https://{}.lianjia.com/ershoufang".format(city)
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
}
try:
response = requests.get(start_url, headers=headers)
# print(response.status_code, response.text)
doc = pq(response.text)
total_num = int(doc(".resultDes .total span").text())
total_page = total_num // 30 + 1
# 只能访问到前一百页
if total_page > 100:
total_page = 100
page_url_list = list()
for i in range(total_page):
url = start_url + "/pg" + str(i + 1) + "/"
page_url_list.append(url)
#print(url)
return page_url_list
except:
print("获取总套数出错,请确认起始URL是否正确")
return None
第三层:根据起始 URL 得到分页 URL
def get_detail_page_url(page_url):
global detail_list
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
'Referer': 'https://bj.lianjia.com/ershoufang'
}
try:
response = requests.get(page_url,headers=headers,timeout=3)
doc = pq(response.text)
i = 0
detail_urls = list()
for item in doc(".sellListContent li").items():
i += 1
print(i)
if i == 31:
break
child_item = item(".noresultRecommend")
if child_item == None:
i -= 1
detail_url = child_item.attr("href")
detail_urls.append(detail_url)
return detail_urls
except:
print("获取列表页" + page_url + "出错")
第四层
本层做的是具体解析,解析使用的是 PyQuery 库,支持 CSS 选择器且比 Beautiful Soup 方便。仅仅需要下面几行代码就帮助我们获得了目标数据:
response = requests.get(url=detail_url, headers=headers, proxies=proxies)
#detail_url 是得到的详情页 URL
detail_dict = dict()
doc = pq(response.text)
unit_price = doc(".unitPriceValue").text()
unit_price = unit_price[0:unit_price.index("元")]
title = doc("h1").text()
area = doc(".areaName .info a").eq(0).text().strip()
url = detail_url
多线程爬取
p = ThreadPoolExecutor(30)
for page_url in page_url_list:
p.submit(get_detail_page_url, page_url).add_done_callback(detail_page_parser)
p.shutdown()
IP 代理池
下载后新开一个 Pycharm 视窗运行该项目,然后我们可以用下面的方式来获取可用的代理 IP:
def get_valid_ip():
url = "http://localhost:5000/get"
try:
ip = requests.get(url).text
return ip
except:
print("请先运行代理池")
然后通过参数设置使用代理 IP:
proxies = {
"http": "http://" + get_valid_ip(),
}
response = requests.get(url=detail_url, headers=headers, proxies=proxies)
数据保存
采用 JSON文件形式保存数据,每个城市保存一个 JSON 文件,文件名为该城市简拼。
def save_data(data,filename):
with open(filename+".json", 'w', encoding="utf-8") as f:
f.write(json.dumps(data, indent=2, ensure_ascii=False))
稍等一会儿,所有数据就保存在本地了:
数据分析
数据整合
在这里做一些求同地区房价最大值、最小值、平均值,以及数据格式统一化的工作:
def split_data():
global region_data
region_data = dict()
for region in dic_data.keys():
# 最大值、最小值、平均值
region_data[region] = {"max":dic_data[region][0],"min":dic_data[region][0],"average":0}
for per_price in dic_data[region]:
if per_price > region_data[region]["max"]:
region_data[region]["max"] = per_price
if per_price < region_data[region]["min"]:
region_data[region]["min"] = per_price
region_data[region]["average"] += per_price
region_data[region]["average"] /= len(dic_data[region])
# 保留两位小数
region_data[region]["average"] = round(region_data[region]["average"],2)
数据可视化
将分析结果通过 Matplotlib 直观的体现出来,该部分的代码如下:
def data_viewer():
label_list = region_data.keys() # 横坐标刻度显示值
max = []
min = []
average = []
for label in label_list:
max.append(region_data[label].get("max"))
min.append(region_data[label].get("min"))
average.append(region_data[label].get("average"))
x = range(len(max))
"""
绘制条形图
left: 长条形中点横坐标
height: 长条形高度
width: 长条形宽度,默认值0
.8
label: 为后面设置legend准备
"""
rects1 = plt.bar(x=x, height=max, width=0.25, alpha=0.8, color='red', label="最大值")
rects2 = plt.bar(x=[i + 0.25 for i in x], height=average, width=0.25, color='green', label="平均值")
rects3 = plt.bar(x=[i + 0.5 for i in x], height=min, width=0.25, color='blue', label="最小值")
#plt.ylim(0, 50) # y轴取值范围
plt.ylabel("房价/元")
"""
设置x轴刻度显示值
参数一:中点坐标
参数二:显示值
"""
plt.xticks([index + 0.2 for index in x], label_list)
plt.xlabel("地区")
plt.legend()
for rect in rects1:
height = rect.get_height()
plt.text(rect.get_x() + rect.get_width() / 2, height+1, str(height), ha="center", va="bottom")
for rect in rects2:
height = rect.get_height()
plt.text(rect.get_x() + rect.get_width() / 2, height + 1, str(height), ha="center", va="bottom")
for rect in rects3:
height = rect.get_height()
plt.text(rect.get_x() + rect.get_width() / 2, height + 1, str(height), ha="center", va="bottom")
plt.show()
结果如下:
限于篇幅,其他城市的图就不放了。
再来看全国主要一线城市二手房房价有序条形图:
可以看出,北京、上海、深圳的房价大致在同一水平线,而厦门位于第四,广州在第六,最后看一下房价地域图:
最终代码
import requests
from concurrent.futures import ThreadPoolExecutor
from pyquery import PyQuery as pq
import json
import threading
import time
'''
更多Python学习资料以及源码教程资料,可以在群1136201545免费获取
'''
def get_list_page_url(city):
start_url = "https://{}.lianjia.com/ershoufang".format(city)
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
}
try:
response = requests.get(start_url, headers=headers)
# print(response.status_code, response.text)
doc = pq(response.text)
total_num = int(doc(".resultDes .total span").text())
total_page = total_num // 30 + 1
# 只能访问到前一百页
if total_page > 100:
total_page = 100
page_url_list = list()
for i in range(total_page):
url = start_url + "/pg" + str(i + 1) + "/"
page_url_list.append(url)
#print(url)
return page_url_list
except:
print("获取总套数出错,请确认起始URL是否正确")
return None
detail_list = list()
# 需要先在本地开启代理池
# 代理池仓库: https://github.com/Python3WebSpider/ProxyPool
def get_valid_ip():
url = "http://localhost:5000/get"
try:
ip = requests.get(url).text
return ip
except:
print("请先运行代理池")
def get_detail_page_url(page_url):
global detail_list
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
'Referer': 'https://bj.lianjia.com/ershoufang'
}
try:
response = requests.get(page_url,headers=headers,timeout=3)
doc = pq(response.text)
# broswer.get(page_url)
# print(page_url)
# doc = pq(broswer.page_source)
i = 0
detail_urls = list()
for item in doc(".sellListContent li").items():
i += 1
if i == 31:
break
child_item = item(".noresultRecommend")
if child_item == None:
i -= 1
detail_url = child_item.attr("href")
detail_urls.append(detail_url)
return detail_urls
except:
print("获取列表页" + page_url + "出错")
lock = threading.Lock()
def detail_page_parser(res):
global detail_list
detail_urls = res.result()
if not detail_urls:
print("detail url 为空")
return None
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
'Referer': 'https://bj.lianjia.com/ershoufang'
}
for detail_url in detail_urls:
try:
response = requests.get(url=detail_url, headers=headers,timeout=3)
#print(response.status_code)
detail_dict = dict()
doc = pq(response.text)
unit_price = doc(".unitPriceValue").text()
unit_price = unit_price[0:unit_price.index("元")]
title = doc("h1").text()
area = doc(".areaName .info a").eq(0).text().strip()
url = detail_url
detail_dict["title"] = title
detail_dict["area"] = area
detail_dict["price"] = unit_price
detail_dict["url"] = url
detail_list.append(detail_dict)
print(unit_price, title, area)
except:
print("获取详情页出错,换ip重试")
proxies = {
"http": "http://" + get_valid_ip(),
}
try:
response = requests.get(url=detail_url, headers=headers, proxies=proxies)
#print(response.status_code)
detail_dict = dict()
doc = pq(response.text)
unit_price = doc(".unitPriceValue").text()
unit_price = unit_price[0:unit_price.index("元")]
title = doc("h1").text()
area = doc(".areaName .info a").eq(0).text().strip()
url = detail_url
# 已下架的还会爬取,但是没有价格
if len(unit_price)>0:
detail_dict["title"] = title
detail_dict["area"] = area
detail_dict["price"] = unit_price
detail_dict["url"] = url
detail_list.append(detail_dict)
print(unit_price, title, area)
except:
print("重试失败...")
def save_data(data,filename):
with open(filename+".json", 'w', encoding="utf-8") as f:
f.write(json.dumps(data, indent=2, ensure_ascii=False))
def main():
# cq,cs,nj,dl,wh,cc
city_list = ['nj']
for city in city_list:
page_url_list = get_list_page_url(city)
# pool = threadpool.ThreadPool(20)
# requests = threadpool.makeRequests(page_and_detail_parser, page_url_list)
# [pool.putRequest(req) for req in requests]
# pool.wait()
p = ThreadPoolExecutor(30)
for page_url in page_url_list:
p.submit(get_detail_page_url, page_url).add_done_callback(detail_page_parser)
# 这里的回调函数拿到的是一个对象。
# 先把返回的res得到一个结果。即在前面加上一个res.result(),这个结果就是get_detail_page_url的返回
p.shutdown()
print(detail_list)
save_data(detail_list, city)
detail_list.clear()
if __name__ == '__main__':
old = time.time()
main()
new = time.time()
delta_time = new - old
print("程序共运行{}s".format(delta_time))