通过分页、线程池、代理池等技术,快速爬取链家网近4万条在售二手房信息,速度可达 10000 条 / 5 分钟。

通过对二手房作数据分析,得到北上广深等(新)一线城市四地房价的纵向比较,同时对各个城市各个区的房价做横向对比,并将对比结果可视化出来。

主要用到的库或模块包括

  • Requests
  • PyQuery
  • ThreadPoolExecutor
  • JSON
  • Matplotlib
  • PyEcharts

环境:

  • Widnows10
  • Python3.5
  • Pycharm2018

数据抓取

爬虫架构设计

通过分析链家网的 URL ,不难发现,每一个城市的链家网的基本格式是:

城市名简拼 + ”.lianjia.com“

所以整个爬虫最外层应该是遍历一个保存城市简拼的列表,拼接得到一个个起始 URL,根据这些 URL 爬取对应城市的链家网。

针对每一个城市的链家网而言,首先得到该城市在售二手房的总套数,由于每一页显示的套数是 30,由总套数整除以30再加上1可以得到总页数,但是由于最大可浏览页数为 100,所以我们这里得加个判断,如果总页数大于 100 的话,令总页数等于 100。

分析具体城市的链家网每一页的 URL, 以北京为例,我们可以发现第 N 页的 URL 是:

bj.lianjia.com/ershoufang/pg{N},由此我们可以通过以下代码来得到每一页的 URL:

for i in range(total_page):
    page_url = "bj.lianjia.com/ershoufang/pg{}".format(i+1)

本来得到每一页的 URL 后,我们可以得到该页上 30 套房的房价信息和详情页 URL,但是页面上没有房子所在区的信息。

我们只能再向下请求访问详情页 URL,从而提取出我们想要的所有数据。

综上所述,我们可以将整个框架从上往下分为四层,如下图所示:

在这里插入图片描述
基于上述思路,在写代码的时候,可以分层从上往下实现,方便调试。

第一层 & 第二层:获取总套数

根据城市简拼得到起始 URL,并得到总套数,为分页做准备。

def get_list_page_url(city):

    start_url = "https://{}.lianjia.com/ershoufang".format(city)
    headers =  {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
    }
    try:
        response = requests.get(start_url, headers=headers)
        # print(response.status_code, response.text)
        doc = pq(response.text)
        total_num =  int(doc(".resultDes .total span").text())
        total_page = total_num // 30 + 1
        # 只能访问到前一百页
        if total_page > 100:
            total_page = 100

        page_url_list = list()

        for i in range(total_page):
            url = start_url + "/pg" + str(i + 1) + "/"
            page_url_list.append(url)
            #print(url)
        return page_url_list

    except:
        print("获取总套数出错,请确认起始URL是否正确")
        return None

第三层:根据起始 URL 得到分页 URL

def get_detail_page_url(page_url):
    global detail_list
    headers =  {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
        'Referer': 'https://bj.lianjia.com/ershoufang'
    }

    try:
        response = requests.get(page_url,headers=headers,timeout=3)
        doc = pq(response.text)
        i = 0
        detail_urls = list()
        for item in doc(".sellListContent li").items():
            i += 1
            print(i)
            if i == 31:
                break
            child_item = item(".noresultRecommend")
            if child_item == None:
                i -= 1
            detail_url = child_item.attr("href")
            detail_urls.append(detail_url)
        return detail_urls
    except:
        print("获取列表页" + page_url + "出错")

第四层

本层做的是具体解析,解析使用的是 PyQuery 库,支持 CSS 选择器且比 Beautiful Soup 方便。仅仅需要下面几行代码就帮助我们获得了目标数据:

response = requests.get(url=detail_url, headers=headers, proxies=proxies)
#detail_url 是得到的详情页 URL
detail_dict = dict()
doc = pq(response.text)
unit_price = doc(".unitPriceValue").text()
unit_price = unit_price[0:unit_price.index("元")]
title = doc("h1").text()
area = doc(".areaName .info a").eq(0).text().strip()
url = detail_url

多线程爬取

p = ThreadPoolExecutor(30)
for page_url in page_url_list:
    p.submit(get_detail_page_url, page_url).add_done_callback(detail_page_parser)
p.shutdown()

IP 代理池
下载后新开一个 Pycharm 视窗运行该项目,然后我们可以用下面的方式来获取可用的代理 IP:

def get_valid_ip():
    url = "http://localhost:5000/get"
    try:
        ip = requests.get(url).text
        return ip
    except:
        print("请先运行代理池")

然后通过参数设置使用代理 IP:

proxies = {
    "http": "http://" + get_valid_ip(),
}
response = requests.get(url=detail_url, headers=headers, proxies=proxies)

数据保存
采用 JSON文件形式保存数据,每个城市保存一个 JSON 文件,文件名为该城市简拼。

def save_data(data,filename):
    with open(filename+".json", 'w', encoding="utf-8") as f:
        f.write(json.dumps(data, indent=2, ensure_ascii=False))

稍等一会儿,所有数据就保存在本地了:
在这里插入图片描述
在这里插入图片描述

数据分析

数据整合

在这里做一些求同地区房价最大值、最小值、平均值,以及数据格式统一化的工作:

def split_data():
    global region_data
    region_data = dict()
    for region in dic_data.keys():
        # 最大值、最小值、平均值
        region_data[region] = {"max":dic_data[region][0],"min":dic_data[region][0],"average":0}
        for per_price in dic_data[region]:
            if per_price > region_data[region]["max"]:
                region_data[region]["max"] = per_price
            if per_price < region_data[region]["min"]:
                region_data[region]["min"] = per_price
            region_data[region]["average"] += per_price
        region_data[region]["average"] /= len(dic_data[region])
        # 保留两位小数
        region_data[region]["average"] = round(region_data[region]["average"],2)

数据可视化

将分析结果通过 Matplotlib 直观的体现出来,该部分的代码如下:

def data_viewer():
    label_list = region_data.keys()  # 横坐标刻度显示值
    max = []
    min = []
    average = []
    for label in label_list:
        max.append(region_data[label].get("max"))
        min.append(region_data[label].get("min"))
        average.append(region_data[label].get("average"))
    x = range(len(max))
    """
    绘制条形图
    left: 长条形中点横坐标
    height: 长条形高度
    width: 长条形宽度,默认值0
    .8
    label: 为后面设置legend准备
    """
    rects1 = plt.bar(x=x, height=max, width=0.25, alpha=0.8, color='red', label="最大值")
    rects2 = plt.bar(x=[i + 0.25 for i in x], height=average, width=0.25, color='green', label="平均值")
    rects3 = plt.bar(x=[i + 0.5 for i in x], height=min, width=0.25, color='blue', label="最小值")
    #plt.ylim(0, 50) # y轴取值范围
    plt.ylabel("房价/元")
    """
    设置x轴刻度显示值
    参数一:中点坐标
    参数二:显示值
    """
    plt.xticks([index + 0.2 for index in x], label_list)
    plt.xlabel("地区")
    plt.legend()
    for rect in rects1:
        height = rect.get_height()
        plt.text(rect.get_x() + rect.get_width() / 2, height+1, str(height), ha="center", va="bottom")
    for rect in rects2:
        height = rect.get_height()
        plt.text(rect.get_x() + rect.get_width() / 2, height + 1, str(height), ha="center", va="bottom")
    for rect in rects3:
        height = rect.get_height()
        plt.text(rect.get_x() + rect.get_width() / 2, height + 1, str(height), ha="center", va="bottom")
    plt.show()

结果如下:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
限于篇幅,其他城市的图就不放了。

再来看全国主要一线城市二手房房价有序条形图:
在这里插入图片描述
可以看出,北京、上海、深圳的房价大致在同一水平线,而厦门位于第四,广州在第六,最后看一下房价地域图:
在这里插入图片描述

最终代码

import requests

from concurrent.futures import ThreadPoolExecutor

from pyquery import PyQuery as pq

import json

import threading


import time
'''
更多Python学习资料以及源码教程资料,可以在群1136201545免费获取
'''

def get_list_page_url(city):

    start_url = "https://{}.lianjia.com/ershoufang".format(city)
    headers =  {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
    }
    try:
        response = requests.get(start_url, headers=headers)
        # print(response.status_code, response.text)
        doc = pq(response.text)
        total_num =  int(doc(".resultDes .total span").text())
        total_page = total_num // 30 + 1
        # 只能访问到前一百页
        if total_page > 100:
            total_page = 100

        page_url_list = list()

        for i in range(total_page):
            url = start_url + "/pg" + str(i + 1) + "/"
            page_url_list.append(url)
            #print(url)
        return page_url_list

    except:
        print("获取总套数出错,请确认起始URL是否正确")
        return None


detail_list = list()

# 需要先在本地开启代理池
# 代理池仓库: https://github.com/Python3WebSpider/ProxyPool
def get_valid_ip():
    url = "http://localhost:5000/get"
    try:
        ip = requests.get(url).text
        return ip
    except:
        print("请先运行代理池")


def get_detail_page_url(page_url):
    global detail_list
    headers =  {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
        'Referer': 'https://bj.lianjia.com/ershoufang'
    }

    try:
        response = requests.get(page_url,headers=headers,timeout=3)
        doc = pq(response.text)
        # broswer.get(page_url)
        # print(page_url)
        # doc = pq(broswer.page_source)
        i = 0
        detail_urls = list()
        for item in doc(".sellListContent li").items():
            i += 1
            if i == 31:
                break
            child_item = item(".noresultRecommend")
            if child_item == None:
                i -= 1
            detail_url = child_item.attr("href")
            detail_urls.append(detail_url)
        return detail_urls
    except:
        print("获取列表页" + page_url + "出错")

lock = threading.Lock()

def detail_page_parser(res):
    global detail_list
    detail_urls = res.result()
    if not detail_urls:
        print("detail url 为空")
        return None
    headers =  {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
        'Referer': 'https://bj.lianjia.com/ershoufang'
    }
    for detail_url in detail_urls:
        try:
            response = requests.get(url=detail_url, headers=headers,timeout=3)
            #print(response.status_code)
            detail_dict = dict()
            doc = pq(response.text)
            unit_price = doc(".unitPriceValue").text()
            unit_price = unit_price[0:unit_price.index("元")]
            title = doc("h1").text()
            area = doc(".areaName .info a").eq(0).text().strip()
            url = detail_url
            detail_dict["title"] = title
            detail_dict["area"] = area
            detail_dict["price"] = unit_price
            detail_dict["url"] = url

            detail_list.append(detail_dict)

            print(unit_price, title, area)

        except:
            print("获取详情页出错,换ip重试")
            proxies = {
                "http": "http://" + get_valid_ip(),
            }
            try:
                response = requests.get(url=detail_url, headers=headers, proxies=proxies)
                #print(response.status_code)
                detail_dict = dict()
                doc = pq(response.text)
                unit_price = doc(".unitPriceValue").text()
                unit_price = unit_price[0:unit_price.index("元")]
                title = doc("h1").text()
                area = doc(".areaName .info a").eq(0).text().strip()
                url = detail_url
                # 已下架的还会爬取,但是没有价格
                if len(unit_price)>0:
                    detail_dict["title"] = title
                    detail_dict["area"] = area
                    detail_dict["price"] = unit_price
                    detail_dict["url"] = url

                    detail_list.append(detail_dict)

                    print(unit_price, title, area)
            except:
                print("重试失败...")


def save_data(data,filename):
    with open(filename+".json", 'w', encoding="utf-8") as f:
        f.write(json.dumps(data, indent=2, ensure_ascii=False))

def main():
    # cq,cs,nj,dl,wh,cc
    city_list = ['nj']
    for city in city_list:
        page_url_list = get_list_page_url(city)

        # pool = threadpool.ThreadPool(20)
        # requests = threadpool.makeRequests(page_and_detail_parser, page_url_list)
        # [pool.putRequest(req) for req in requests]
        # pool.wait()

        p = ThreadPoolExecutor(30)

        for page_url in page_url_list:
            p.submit(get_detail_page_url, page_url).add_done_callback(detail_page_parser)
        # 这里的回调函数拿到的是一个对象。
        # 先把返回的res得到一个结果。即在前面加上一个res.result(),这个结果就是get_detail_page_url的返回
        p.shutdown()

        print(detail_list)

        save_data(detail_list, city)

        detail_list.clear()

if __name__ == '__main__':
    old = time.time()
    main()
    new  = time.time()
    delta_time = new - old
    print("程序共运行{}s".format(delta_time))

本文转载:CSDN博客