大家好,我是小小明。

今天我们使用python调用几个网络操作相关的命令,并基于此做出些小玩具。

学习计划:

  1. 通过ipconfig /all 命令获取局域网所在的网段
  2. 通过arp -d *命令清空当前所有的arp映射表
  3. 循环遍历当前网段所有可能的ip与其ping一遍建立arp映射表
  4. 通过arp -a命令读取缓存的映射表获取所有与本机连接的设备的Mac地址。

计划实现案例:

  1. 读取当前局域网网段下所有连接设备的IP地址和Mac地址。
  2. 分析设备的上下线。

CMD命令

获取本机ip地址和网段

通过ipconfig /all 命令获取本机ip和局域网所在的网段:

C:\Windows\system32>ipconfig /all

Windows IP 配置

   主机名  . . . . . . . . . . . . . : DESKTOP-IS8QJHF
   主 DNS 后缀 . . . . . . . . . . . :
   节点类型  . . . . . . . . . . . . : 混合
   IP 路由已启用 . . . . . . . . . . : 否
   WINS 代理已启用 . . . . . . . . . : 否

以太网适配器 以太网:

   连接特定的 DNS 后缀 . . . . . . . :
   描述. . . . . . . . . . . . . . . : Realtek PCIe GbE Family Controller
   物理地址. . . . . . . . . . . . . : F0-2F-74-82-0F-B0
   DHCP 已启用 . . . . . . . . . . . : 是
   自动配置已启用. . . . . . . . . . : 是
   本地链接 IPv6 地址. . . . . . . . : fe80::4453:3c08:3332:32de%5(首选)
   IPv4 地址 . . . . . . . . . . . . : 192.168.3.31(首选)
   子网掩码  . . . . . . . . . . . . : 255.255.255.0
   获得租约的时间  . . . . . . . . . : 2021年7月3日 8:55:32
   租约过期的时间  . . . . . . . . . : 2021年7月3日 20:55:31
   默认网关. . . . . . . . . . . . . : 192.168.3.1
   DHCP 服务器 . . . . . . . . . . . : 192.168.3.1
   DHCPv6 IAID . . . . . . . . . . . : 82849652
   DHCPv6 客户端 DUID  . . . . . . . : 00-01-00-01-27-EC-F1-A5-F0-2F-74-82-0F-B0
   DNS 服务器  . . . . . . . . . . . : 101.226.4.6
                                       114.114.114.114
   TCPIP 上的 NetBIOS  . . . . . . . : 已启用

清空arp映射表

通过arp -d *命令清空当前所有的arp映射表:

C:\Windows\system32>arp -d *

C:\Windows\system32>arp -a

接口: 192.168.3.31 --- 0x5
  Internet 地址         物理地址              类型
  192.168.3.1           c0-b8-e6-4b-82-cd     动态
  224.0.0.22            01-00-5e-00-00-16     静态
  239.11.20.1           01-00-5e-0b-14-01     静态

C:\Windows\system32>arp -a

接口: 192.168.3.31 --- 0x5
  Internet 地址         物理地址              类型
  192.168.3.1           c0-b8-e6-4b-82-cd     动态
  192.168.3.22          3c-7c-3f-80-07-d0     动态
  224.0.0.22            01-00-5e-00-00-16     静态
  239.11.20.1           01-00-5e-0b-14-01     静态

可以看到清空后马上查到的地址不一样。

Ping局域网所有可能的设备

在命令行中我们可以通过如下命令实现:

for /L %i IN (1,1,254) DO ping -w 1 -n 1 192.168.3.%i

192.168.3改成前面查询到的网段。

不过在Python中,我们只需使用python自己的for循环即可。

上述命令执行过程中:

image-20210703150700223

查看当前在线的设备

在将上面命令执行过一遍过,ARP命令便能够查看当前网段内在线的设备:

image-20210703150908516

经实际测试大概只有几秒的延迟便能够感知设备的上线(刚联网的设备会向该网段发送广播),但无法自动感知到设备的下线,除非主动Ping目标ip发现ping不通,目标ip才会从arp表中删除。

Python实现

下面,我们用Python执行这些命令:

  1. 通过ipconfig /all 命令获取局域网所在的网段
  2. 通过arp -d *命令清空当前所有的arp映射表
  3. 循环遍历当前网段所有可能的ip与其ping一遍建立arp映射表
  4. 通过arp -a命令读取缓存的映射表获取所有与本机连接的设备的Mac地址。

获取局域网所在的网段

以下代码根据系统ipconfig /all命令返回的结果进行文本处理:

with os.popen("ipconfig /all") as res:
    for line in res:
        line = line.strip()
        if line.startswith("IPv4"):
            ipv4 = map(int, re.findall("(\d+)\.(\d+)\.(\d+)\.(\d+)", line)[0])
        elif line.startswith("子网掩码"):
            mask = map(int, re.findall("(\d+)\.(\d+)\.(\d+)\.(\d+)", line)[0])
            break
net_segment = ".".join([str(i & j) for i, j in zip(ipv4, mask)]).strip(".0")
net_segment
'192.168.3'

注意:若你的系统执行返回的结果与前面cmd命令的结果不一样,则需要根据实际情况修改代码。

当前我们实际只会使用最后一个位置作为网段,并不需要考虑子网掩码,所有可以简化代码:

with os.popen("ipconfig /all") as res:
    for line in res:
        line = line.strip()
        if line.startswith("IPv4"):
            net_segment = re.findall("(\d+\.\d+\.\d+)\.\d+", line)[0]
            break
net_segment
'192.168.3'

清空映射表

这步必须具有管理员权限,否则会执行失败:

import os

os.system("arp -d *")

Ping局域网所有可能的设备

for i in range(1, 255):
    os.system(f"ping -w 1 -n 1 {net_segment}.{i}")

不过上述命令耗时2分钟:

image-20210703160851106

感觉太慢了,我们考虑使用多线程并发执行或多进程并行执行实现加速。

对于多进程,经测试在开启5个线程时耗时是30秒,8个线程是28秒:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=5) as executor:
    for i in range(1, 255):
        executor.submit(os.system, f"ping -w 1 -n 1 {net_segment}.{i}")

继续增大线程,并不能明显加快执行速度。

试试多进程:

from concurrent.futures import ProcessPoolExecutor
import psutil

# 逻辑cpu个数
count = psutil.cpu_count()
with ProcessPoolExecutor(count) as executor:
    for i in range(1, 255):
        executor.submit(os.system, f"ping -w 1 -n 1 {net_segment}.{i}")

结果耗时31秒,设置4倍的进程数也仅仅只能提升到25秒,与多线程的速度相差不大,不如直接多线程。

结论:使用多线程设置5-8个线程最佳。能由2分钟提升到30秒,直到慢慢达到性能瓶颈。

获取当前在线设备ip和mac地址

通过解析arp命令的结果即可:

header = None
with os.popen("arp -a") as res:
    for line in res:
        line = line.strip()
        if not line or line.startswith("接口"):
            continue
        if header is None:
            header = re.split(" {2,}", line.strip())
            break
    df = pd.read_csv(res, sep=" {2,}", names=header, header=0, engine='python')

print(df.shape)
df.head()

获得36条连接数据:

image-20210703163955852

封装代码

下面我们封装一下上述操作:

import os
import re
import time
from concurrent.futures import ThreadPoolExecutor

import pandas as pd


def get_net_segment():
    with os.popen("arp -a") as res:
        for line in res:
            line = line.strip()
            if line.startswith("接口"):
                net_segment = re.findall(
                    "(\d+\.\d+\.\d+)\.\d+", line)[0]
                break
    return net_segment


def clean_arp():
    os.system("arp -d *")


def ping_net_segment_all(net_segment):
    # for i in range(1, 255):
    #     os.system(f"ping -w 1 -n 1 {net_segment}.{i}")
    with ThreadPoolExecutor(max_workers=4) as executor:
        for i in range(1, 255):
            executor.submit(os.popen, f"ping -w 1 -n 1 {net_segment}.{i}")


def get_arp_ip_mac():
    header = None
    with os.popen("arp -a") as res:
        for line in res:
            line = line.strip()
            if not line or line.startswith("接口"):
                continue
            if header is None:
                header = re.split(" {2,}", line.strip())
                break
        df = pd.read_csv(res, sep=" {2,}",
                         names=header, header=0, engine='python')
    return df

有了这些方法,我们完成最后的目标:

分析局域网设备上下线

思路

  1. 程序启动时,扫描本网段所有可能的ip尝试建立连接。
  2. 获取在线ip地址列表
  3. 每隔10秒尝试ping一次之前在线的ip,获取下线状态
  4. 重新获取IP地址列表与前一次对比,获取设备的上下线状态

为了第3条方便实现,我们实现一个方法:

def ping_ip_list(ips, max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_tasks = []
        for ip in ips:
            future_tasks.append(executor.submit(os.popen, f"ping -w 1 -n 1 {ip}"))
        wait(future_tasks, return_when=ALL_COMPLETED)

逻辑代码

if __name__ == '__main__':
    # 是否进行初始扫描
    init_search = False
    if init_search:
        print("正在扫描当前网段所有ip,预计耗时1分钟....")
        ping_net_segment_all(get_net_segment())

    last = None
    while 1:
        df = get_arp_ip_mac()
        df = df.loc[df.类型 == "动态", ["Internet 地址", "物理地址"]]
        if last is None:
            print("当前在线的设备:")
            print(df)
        else:
            online = df.loc[~df.物理地址.isin(last.物理地址)]
            if online.shape[0] > 0:
                print("新上线设备:")
                print(online)
            offline = last[~last.物理地址.isin(df.物理地址)]
            if offline.shape[0] > 0:
                print("刚下线设备:")
                print(offline)
        time.sleep(5)
        ping_ip_list(df["Internet 地址"].values)
        last = df

完整代码

import os
import re
import time
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

import pandas as pd


def get_net_segment():
    with os.popen("arp -a") as res:
        for line in res:
            line = line.strip()
            if line.startswith("接口"):
                net_segment = re.findall(
                    "(\d+\.\d+\.\d+)\.\d+", line)[0]
                break
    return net_segment


def ping_net_segment_all(net_segment):
    # for i in range(1, 255):
    #     os.system(f"ping -w 1 -n 1 {net_segment}.{i}")
    with ThreadPoolExecutor(max_workers=4) as executor:
        for i in range(1, 255):
            executor.submit(os.popen, f"ping -w 1 -n 1 {net_segment}.{i}")


def get_arp_ip_mac():
    header = None
    with os.popen("arp -a") as res:
        for line in res:
            line = line.strip()
            if not line or line.startswith("接口"):
                continue
            if header is None:
                header = re.split(" {2,}", line.strip())
                break
        df = pd.read_csv(res, sep=" {2,}",
                         names=header, header=0, engine='python')
    return df


def ping_ip_list(ips, max_workers=4):
    print("正在扫描在线列表")
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_tasks = []
        for ip in ips:
            future_tasks.append(executor.submit(os.popen, f"ping -w 1 -n 1 {ip}"))
        wait(future_tasks, return_when=ALL_COMPLETED)


if __name__ == '__main__':
    # 是否进行初始扫描
    init_search = False
    if init_search:
        print("正在扫描当前网段所有ip,预计耗时1分钟....")
        ping_net_segment_all(get_net_segment())

    last = None
    while 1:
        df = get_arp_ip_mac()
        df = df.loc[df.类型 == "动态", ["Internet 地址", "物理地址"]]
        if last is None:
            print("当前在线的设备:")
            print(df)
        else:
            online = df.loc[~df.物理地址.isin(last.物理地址)]
            if online.shape[0] > 0:
                print("新上线设备:")
                print(online)
            offline = last[~last.物理地址.isin(df.物理地址)]
            if offline.shape[0] > 0:
                print("刚下线设备:")
                print(offline)
        time.sleep(5)
        ping_ip_list(df["Internet 地址"].values)
        last = df

结果示例:

当前在线的设备:
     Internet 地址               物理地址
0    192.168.3.3  3c-7c-3f-83-e2-7c
1   192.168.3.10  3c-7c-3f-80-08-1b
2   192.168.3.25  f0-2f-74-82-15-7e
3   192.168.3.26  f0-2f-74-82-15-a2
4   192.168.3.28  f0-2f-74-82-15-38
5   192.168.3.29  f0-2f-74-82-15-d0
6   192.168.3.32  f0-2f-74-82-15-3b
7   192.168.3.33  f0-2f-74-82-15-56
8   192.168.3.39  a8-5e-45-16-79-99
9  192.168.3.225  30-24-a9-5a-eb-82
新上线设备:
    Internet 地址               物理地址
9  192.168.3.52  3c-7c-3f-c2-cd-cb
刚下线设备:
    Internet 地址               物理地址
9  192.168.3.52  3c-7c-3f-c2-cd-cb

本文转载:CSDN博客