大家好,我是小小明。
今天我们使用python调用几个网络操作相关的命令,并基于此做出些小玩具。
学习计划:
- 通过
ipconfig /all
命令获取局域网所在的网段 - 通过
arp -d *
命令清空当前所有的arp映射表 - 循环遍历当前网段所有可能的ip与其ping一遍建立arp映射表
- 通过arp -a命令读取缓存的映射表获取所有与本机连接的设备的Mac地址。
计划实现案例:
- 读取当前局域网网段下所有连接设备的IP地址和Mac地址。
- 分析设备的上下线。
CMD命令
获取本机ip地址和网段
通过ipconfig /all
命令获取本机ip和局域网所在的网段:
C:\Windows\system32>ipconfig /all
Windows IP 配置
主机名 . . . . . . . . . . . . . : DESKTOP-IS8QJHF
主 DNS 后缀 . . . . . . . . . . . :
节点类型 . . . . . . . . . . . . : 混合
IP 路由已启用 . . . . . . . . . . : 否
WINS 代理已启用 . . . . . . . . . : 否
以太网适配器 以太网:
连接特定的 DNS 后缀 . . . . . . . :
描述. . . . . . . . . . . . . . . : Realtek PCIe GbE Family Controller
物理地址. . . . . . . . . . . . . : F0-2F-74-82-0F-B0
DHCP 已启用 . . . . . . . . . . . : 是
自动配置已启用. . . . . . . . . . : 是
本地链接 IPv6 地址. . . . . . . . : fe80::4453:3c08:3332:32de%5(首选)
IPv4 地址 . . . . . . . . . . . . : 192.168.3.31(首选)
子网掩码 . . . . . . . . . . . . : 255.255.255.0
获得租约的时间 . . . . . . . . . : 2021年7月3日 8:55:32
租约过期的时间 . . . . . . . . . : 2021年7月3日 20:55:31
默认网关. . . . . . . . . . . . . : 192.168.3.1
DHCP 服务器 . . . . . . . . . . . : 192.168.3.1
DHCPv6 IAID . . . . . . . . . . . : 82849652
DHCPv6 客户端 DUID . . . . . . . : 00-01-00-01-27-EC-F1-A5-F0-2F-74-82-0F-B0
DNS 服务器 . . . . . . . . . . . : 101.226.4.6
114.114.114.114
TCPIP 上的 NetBIOS . . . . . . . : 已启用
清空arp映射表
通过arp -d *
命令清空当前所有的arp映射表:
C:\Windows\system32>arp -d *
C:\Windows\system32>arp -a
接口: 192.168.3.31 --- 0x5
Internet 地址 物理地址 类型
192.168.3.1 c0-b8-e6-4b-82-cd 动态
224.0.0.22 01-00-5e-00-00-16 静态
239.11.20.1 01-00-5e-0b-14-01 静态
C:\Windows\system32>arp -a
接口: 192.168.3.31 --- 0x5
Internet 地址 物理地址 类型
192.168.3.1 c0-b8-e6-4b-82-cd 动态
192.168.3.22 3c-7c-3f-80-07-d0 动态
224.0.0.22 01-00-5e-00-00-16 静态
239.11.20.1 01-00-5e-0b-14-01 静态
可以看到清空后马上查到的地址不一样。
Ping局域网所有可能的设备
在命令行中我们可以通过如下命令实现:
for /L %i IN (1,1,254) DO ping -w 1 -n 1 192.168.3.%i
192.168.3改成前面查询到的网段。
不过在Python中,我们只需使用python自己的for循环即可。
上述命令执行过程中:
查看当前在线的设备
在将上面命令执行过一遍过,ARP命令便能够查看当前网段内在线的设备:
经实际测试大概只有几秒的延迟便能够感知设备的上线(刚联网的设备会向该网段发送广播),但无法自动感知到设备的下线,除非主动Ping目标ip发现ping不通,目标ip才会从arp表中删除。
Python实现
下面,我们用Python执行这些命令:
- 通过
ipconfig /all
命令获取局域网所在的网段 - 通过
arp -d *
命令清空当前所有的arp映射表 - 循环遍历当前网段所有可能的ip与其ping一遍建立arp映射表
- 通过arp -a命令读取缓存的映射表获取所有与本机连接的设备的Mac地址。
获取局域网所在的网段
以下代码根据系统ipconfig /all
命令返回的结果进行文本处理:
with os.popen("ipconfig /all") as res:
for line in res:
line = line.strip()
if line.startswith("IPv4"):
ipv4 = map(int, re.findall("(\d+)\.(\d+)\.(\d+)\.(\d+)", line)[0])
elif line.startswith("子网掩码"):
mask = map(int, re.findall("(\d+)\.(\d+)\.(\d+)\.(\d+)", line)[0])
break
net_segment = ".".join([str(i & j) for i, j in zip(ipv4, mask)]).strip(".0")
net_segment
'192.168.3'
注意:若你的系统执行返回的结果与前面cmd命令的结果不一样,则需要根据实际情况修改代码。
当前我们实际只会使用最后一个位置作为网段,并不需要考虑子网掩码,所有可以简化代码:
with os.popen("ipconfig /all") as res:
for line in res:
line = line.strip()
if line.startswith("IPv4"):
net_segment = re.findall("(\d+\.\d+\.\d+)\.\d+", line)[0]
break
net_segment
'192.168.3'
清空映射表
这步必须具有管理员权限,否则会执行失败:
import os
os.system("arp -d *")
Ping局域网所有可能的设备
for i in range(1, 255):
os.system(f"ping -w 1 -n 1 {net_segment}.{i}")
不过上述命令耗时2分钟:
感觉太慢了,我们考虑使用多线程并发执行或多进程并行执行实现加速。
对于多进程,经测试在开启5个线程时耗时是30秒,8个线程是28秒:
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=5) as executor:
for i in range(1, 255):
executor.submit(os.system, f"ping -w 1 -n 1 {net_segment}.{i}")
继续增大线程,并不能明显加快执行速度。
试试多进程:
from concurrent.futures import ProcessPoolExecutor
import psutil
# 逻辑cpu个数
count = psutil.cpu_count()
with ProcessPoolExecutor(count) as executor:
for i in range(1, 255):
executor.submit(os.system, f"ping -w 1 -n 1 {net_segment}.{i}")
结果耗时31秒,设置4倍的进程数也仅仅只能提升到25秒,与多线程的速度相差不大,不如直接多线程。
结论:使用多线程设置5-8个线程最佳。能由2分钟提升到30秒,直到慢慢达到性能瓶颈。
获取当前在线设备ip和mac地址
通过解析arp命令的结果即可:
header = None
with os.popen("arp -a") as res:
for line in res:
line = line.strip()
if not line or line.startswith("接口"):
continue
if header is None:
header = re.split(" {2,}", line.strip())
break
df = pd.read_csv(res, sep=" {2,}", names=header, header=0, engine='python')
print(df.shape)
df.head()
获得36条连接数据:
封装代码
下面我们封装一下上述操作:
import os
import re
import time
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
def get_net_segment():
with os.popen("arp -a") as res:
for line in res:
line = line.strip()
if line.startswith("接口"):
net_segment = re.findall(
"(\d+\.\d+\.\d+)\.\d+", line)[0]
break
return net_segment
def clean_arp():
os.system("arp -d *")
def ping_net_segment_all(net_segment):
# for i in range(1, 255):
# os.system(f"ping -w 1 -n 1 {net_segment}.{i}")
with ThreadPoolExecutor(max_workers=4) as executor:
for i in range(1, 255):
executor.submit(os.popen, f"ping -w 1 -n 1 {net_segment}.{i}")
def get_arp_ip_mac():
header = None
with os.popen("arp -a") as res:
for line in res:
line = line.strip()
if not line or line.startswith("接口"):
continue
if header is None:
header = re.split(" {2,}", line.strip())
break
df = pd.read_csv(res, sep=" {2,}",
names=header, header=0, engine='python')
return df
有了这些方法,我们完成最后的目标:
分析局域网设备上下线
思路
- 程序启动时,扫描本网段所有可能的ip尝试建立连接。
- 获取在线ip地址列表
- 每隔10秒尝试ping一次之前在线的ip,获取下线状态
- 重新获取IP地址列表与前一次对比,获取设备的上下线状态
为了第3条方便实现,我们实现一个方法:
def ping_ip_list(ips, max_workers=4):
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_tasks = []
for ip in ips:
future_tasks.append(executor.submit(os.popen, f"ping -w 1 -n 1 {ip}"))
wait(future_tasks, return_when=ALL_COMPLETED)
逻辑代码
if __name__ == '__main__':
# 是否进行初始扫描
init_search = False
if init_search:
print("正在扫描当前网段所有ip,预计耗时1分钟....")
ping_net_segment_all(get_net_segment())
last = None
while 1:
df = get_arp_ip_mac()
df = df.loc[df.类型 == "动态", ["Internet 地址", "物理地址"]]
if last is None:
print("当前在线的设备:")
print(df)
else:
online = df.loc[~df.物理地址.isin(last.物理地址)]
if online.shape[0] > 0:
print("新上线设备:")
print(online)
offline = last[~last.物理地址.isin(df.物理地址)]
if offline.shape[0] > 0:
print("刚下线设备:")
print(offline)
time.sleep(5)
ping_ip_list(df["Internet 地址"].values)
last = df
完整代码
import os
import re
import time
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
import pandas as pd
def get_net_segment():
with os.popen("arp -a") as res:
for line in res:
line = line.strip()
if line.startswith("接口"):
net_segment = re.findall(
"(\d+\.\d+\.\d+)\.\d+", line)[0]
break
return net_segment
def ping_net_segment_all(net_segment):
# for i in range(1, 255):
# os.system(f"ping -w 1 -n 1 {net_segment}.{i}")
with ThreadPoolExecutor(max_workers=4) as executor:
for i in range(1, 255):
executor.submit(os.popen, f"ping -w 1 -n 1 {net_segment}.{i}")
def get_arp_ip_mac():
header = None
with os.popen("arp -a") as res:
for line in res:
line = line.strip()
if not line or line.startswith("接口"):
continue
if header is None:
header = re.split(" {2,}", line.strip())
break
df = pd.read_csv(res, sep=" {2,}",
names=header, header=0, engine='python')
return df
def ping_ip_list(ips, max_workers=4):
print("正在扫描在线列表")
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_tasks = []
for ip in ips:
future_tasks.append(executor.submit(os.popen, f"ping -w 1 -n 1 {ip}"))
wait(future_tasks, return_when=ALL_COMPLETED)
if __name__ == '__main__':
# 是否进行初始扫描
init_search = False
if init_search:
print("正在扫描当前网段所有ip,预计耗时1分钟....")
ping_net_segment_all(get_net_segment())
last = None
while 1:
df = get_arp_ip_mac()
df = df.loc[df.类型 == "动态", ["Internet 地址", "物理地址"]]
if last is None:
print("当前在线的设备:")
print(df)
else:
online = df.loc[~df.物理地址.isin(last.物理地址)]
if online.shape[0] > 0:
print("新上线设备:")
print(online)
offline = last[~last.物理地址.isin(df.物理地址)]
if offline.shape[0] > 0:
print("刚下线设备:")
print(offline)
time.sleep(5)
ping_ip_list(df["Internet 地址"].values)
last = df
结果示例:
当前在线的设备:
Internet 地址 物理地址
0 192.168.3.3 3c-7c-3f-83-e2-7c
1 192.168.3.10 3c-7c-3f-80-08-1b
2 192.168.3.25 f0-2f-74-82-15-7e
3 192.168.3.26 f0-2f-74-82-15-a2
4 192.168.3.28 f0-2f-74-82-15-38
5 192.168.3.29 f0-2f-74-82-15-d0
6 192.168.3.32 f0-2f-74-82-15-3b
7 192.168.3.33 f0-2f-74-82-15-56
8 192.168.3.39 a8-5e-45-16-79-99
9 192.168.3.225 30-24-a9-5a-eb-82
新上线设备:
Internet 地址 物理地址
9 192.168.3.52 3c-7c-3f-c2-cd-cb
刚下线设备:
Internet 地址 物理地址
9 192.168.3.52 3c-7c-3f-c2-cd-cb