#!/usr/bin/env python3
"""
多线程+asyncio端口扫描器
结合了异步IO和多线程技术,实现高效的端口扫描功能
"""
import asyncio
import socket
import threading
import time
import argparse
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple, Dict
class AsyncPortScanner:
"""
异步端口扫描器类
使用asyncio和ThreadPoolExecutor实现高效的并发端口扫描
"""
def __init__(self, target: str, start_port: int, end_port: int,
max_threads: int = 100, timeout: float = 1.0):
"""
初始化端口扫描器
Args:
target: 目标IP地址或域名
start_port: 起始端口号
end_port: 结束端口号
max_threads: 最大线程数
timeout: 连接超时时间(秒)
"""
self.target = target
self.start_port = start_port
self.end_port = end_port
self.max_threads = max_threads
self.timeout = timeout
self.open_ports = [] # 存储发现的开放端口
self.lock = threading.Lock() # 线程锁,保护共享资源
self.total_ports = end_port - start_port + 1
self.scanned_ports = 0
async def scan_port(self, port: int) -> bool:
"""
异步扫描单个端口
Args:
port: 要扫描的端口号
Returns:
bool: 端口是否开放
"""
try:
# 使用asyncio的run_in_executor在线程池中执行同步的socket操作
# 这样可以在不阻塞事件循环的情况下执行耗时的网络IO操作
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=1) as executor:
result = await loop.run_in_executor(
executor,
self._sync_connect,
port
)
return result
except Exception as e:
return False
def _sync_connect(self, port: int) -> bool:
"""
同步连接测试方法
在线程池中执行,测试指定端口是否开放
Args:
port: 要测试的端口号
Returns:
bool: 连接是否成功
"""
try:
# 创建TCP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
# connect_ex()方法返回0表示连接成功
result = sock.connect_ex((self.target, port))
sock.close()
return result == 0
except Exception:
return False
async def scan_port_range_chunk(self, start: int, end: int) -> List[int]:
"""
扫描端口范围块
将大范围的端口分成小块进行并发扫描
Args:
start: 起始端口
end: 结束端口
Returns:
List[int]: 该端口范围内的开放端口列表
"""
tasks = []
# 为每个端口创建异步任务
for port in range(start, end + 1):
task = asyncio.create_task(self.scan_port_with_update(port))
tasks.append(task)
# 并发执行所有端口扫描任务
results = await asyncio.gather(*tasks)
# 筛选出开放的端口
open_ports = [start + i for i, is_open in enumerate(results) if is_open]
return open_ports
async def scan_port_with_update(self, port: int) -> bool:
"""
扫描端口并更新进度显示
在扫描过程中实时更新进度条和发现的开放端口
Args:
port: 要扫描的端口号
Returns:
bool: 端口是否开放
"""
is_open = await self.scan_port(port)
# 使用线程锁保护共享变量的更新
with self.lock:
self.scanned_ports += 1
progress = (self.scanned_ports / self.total_ports) * 100
print(f"\r扫描进度: {self.scanned_ports}/{self.total_ports} ({progress:.1f}%)", end="")
# 如果发现开放端口,立即显示
if is_open:
with self.lock:
self.open_ports.append(port)
print(f"\n[+] 发现开放端口: {port}")
return is_open
async def run_scan(self) -> List[int]:
"""
运行完整的端口扫描流程
Returns:
List[int]: 所有发现的开放端口列表
"""
print(f"开始扫描 {self.target} 的端口 {self.start_port}-{self.end_port}")
print(f"使用 {self.max_threads} 个线程,超时时间: {self.timeout}秒")
start_time = time.time()
# 将端口范围分成多个块,每个块由一个线程处理
# 这种分块策略可以更好地利用系统资源
chunk_size = max(1, self.total_ports // self.max_threads)
tasks = []
# 为每个端口块创建异步任务
for i in range(self.start_port, self.end_port + 1, chunk_size):
chunk_end = min(i + chunk_size - 1, self.end_port)
task = asyncio.create_task(self.scan_port_range_chunk(i, chunk_end))
tasks.append(task)
# 等待所有端口块扫描完成
chunk_results = await asyncio.gather(*tasks)
# 合并所有块的结果
for chunk_ports in chunk_results:
self.open_ports.extend(chunk_ports)
end_time = time.time()
elapsed_time = end_time - start_time
# 显示扫描统计信息
print(f"\n\n扫描完成!")
print(f"扫描耗时: {elapsed_time:.2f}秒")
print(f"发现 {len(self.open_ports)} 个开放端口")
return sorted(self.open_ports)
class PortScannerService:
"""
多目标扫描服务类
管理多个目标的并发扫描任务
"""
def __init__(self):
"""初始化扫描服务"""
self.scan_results: Dict[str, List[int]] = {} # 存储所有目标的扫描结果
async def scan_multiple_targets(self, targets: List[str],
start_port: int, end_port: int,
max_threads: int = 100) -> Dict[str, List[int]]:
"""
并发扫描多个目标
Args:
targets: 目标IP或域名列表
start_port: 起始端口
end_port: 结束端口
max_threads: 每个目标的最大线程数
Returns:
Dict[str, List[int]]: 每个目标对应的开放端口字典
"""
print(f"开始扫描 {len(targets)} 个目标...")
# 为每个目标创建独立的扫描器实例
tasks = []
for target in targets:
scanner = AsyncPortScanner(target, start_port, end_port, max_threads)
task = asyncio.create_task(scanner.run_scan())
tasks.append((target, task))
# 并发执行所有扫描任务
for target, task in tasks:
try:
open_ports = await task
self.scan_results[target] = open_ports
except Exception as e:
print(f"扫描 {target} 时出错: {e}")
self.scan_results[target] = []
return self.scan_results
def display_results(self):
"""
格式化显示所有目标的扫描结果
提供清晰的汇总报告
"""
print("\n" + "="*50)
print("扫描结果汇总:")
print("="*50)
for target, ports in self.scan_results.items():
print(f"\n目标: {target}")
if ports:
print(f"开放端口 ({len(ports)}个): {', '.join(map(str, ports))}")
else:
print("未发现开放端口")
async def main():
"""
主函数
处理命令行参数并启动端口扫描
"""
# 创建命令行参数解析器
parser = argparse.ArgumentParser(description="多线程+asyncio端口扫描器")
parser.add_argument("target", help="目标IP或域名")
parser.add_argument("-s", "--start", type=int, default=1, help="起始端口 (默认: 1)")
parser.add_argument("-e", "--end", type=int, default=1024, help="结束端口 (默认: 1024)")
parser.add_argument("-t", "--threads", type=int, default=100, help="最大线程数 (默认: 100)")
parser.add_argument("--timeout", type=float, default=1.0, help="连接超时时间(秒) (默认: 1.0)")
# 解析命令行参数
args = parser.parse_args()
# 创建并配置扫描器
scanner = AsyncPortScanner(
target=args.target,
start_port=args.start,
end_port=args.end,
max_threads=args.threads,
timeout=args.timeout
)
# 运行扫描
open_ports = await scanner.run_scan()
# 显示最终结果
if open_ports:
print(f"\n{args.target} 的开放端口: {', '.join(map(str, open_ports))}")
else:
print(f"\n{args.target} 没有发现开放端口")
if __name__ == "__main__":
# 启动异步主程序
asyncio.run(main())
#!/usr/bin/env python3
import asyncio
from port_scanner import PortScannerService
async def demo():
"""演示多目标扫描功能"""
service = PortScannerService()
# 扫描多个目标
targets = ["127.0.0.1", "localhost"]
start_port = 1
end_port = 1000
max_threads = 50
print(f"开始扫描 {len(targets)} 个目标的端口 {start_port}-{end_port}")
results = await service.scan_multiple_targets(
targets=targets,
start_port=start_port,
end_port=end_port,
max_threads=max_threads
)
# 显示结果
service.display_results()
if __name__ == "__main__":
asyncio.run(demo())