多线程+asyncio端口扫描器

#!/usr/bin/env python3
"""
多线程+asyncio端口扫描器
结合了异步IO和多线程技术,实现高效的端口扫描功能
"""

import asyncio
import socket
import threading
import time
import argparse
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple, Dict


class AsyncPortScanner:
    """
    异步端口扫描器类
    使用asyncio和ThreadPoolExecutor实现高效的并发端口扫描
    """
    
    def __init__(self, target: str, start_port: int, end_port: int, 
                 max_threads: int = 100, timeout: float = 1.0):
        """
        初始化端口扫描器
        
        Args:
            target: 目标IP地址或域名
            start_port: 起始端口号
            end_port: 结束端口号
            max_threads: 最大线程数
            timeout: 连接超时时间(秒)
        """
        self.target = target
        self.start_port = start_port
        self.end_port = end_port
        self.max_threads = max_threads
        self.timeout = timeout
        self.open_ports = []  # 存储发现的开放端口
        self.lock = threading.Lock()  # 线程锁,保护共享资源
        self.total_ports = end_port - start_port + 1
        self.scanned_ports = 0
        
    async def scan_port(self, port: int) -> bool:
        """
        异步扫描单个端口
        
        Args:
            port: 要扫描的端口号
            
        Returns:
            bool: 端口是否开放
        """
        try:
            # 使用asyncio的run_in_executor在线程池中执行同步的socket操作
            # 这样可以在不阻塞事件循环的情况下执行耗时的网络IO操作
            loop = asyncio.get_event_loop()
            with ThreadPoolExecutor(max_workers=1) as executor:
                result = await loop.run_in_executor(
                    executor, 
                    self._sync_connect, 
                    port
                )
                return result
        except Exception as e:
            return False
    
    def _sync_connect(self, port: int) -> bool:
        """
        同步连接测试方法
        在线程池中执行,测试指定端口是否开放
        
        Args:
            port: 要测试的端口号
            
        Returns:
            bool: 连接是否成功
        """
        try:
            # 创建TCP socket
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.timeout)
            # connect_ex()方法返回0表示连接成功
            result = sock.connect_ex((self.target, port))
            sock.close()
            return result == 0
        except Exception:
            return False
    
    async def scan_port_range_chunk(self, start: int, end: int) -> List[int]:
        """
        扫描端口范围块
        将大范围的端口分成小块进行并发扫描
        
        Args:
            start: 起始端口
            end: 结束端口
            
        Returns:
            List[int]: 该端口范围内的开放端口列表
        """
        tasks = []
        # 为每个端口创建异步任务
        for port in range(start, end + 1):
            task = asyncio.create_task(self.scan_port_with_update(port))
            tasks.append(task)
        
        # 并发执行所有端口扫描任务
        results = await asyncio.gather(*tasks)
        # 筛选出开放的端口
        open_ports = [start + i for i, is_open in enumerate(results) if is_open]
        return open_ports
    
    async def scan_port_with_update(self, port: int) -> bool:
        """
        扫描端口并更新进度显示
        在扫描过程中实时更新进度条和发现的开放端口
        
        Args:
            port: 要扫描的端口号
            
        Returns:
            bool: 端口是否开放
        """
        is_open = await self.scan_port(port)
        
        # 使用线程锁保护共享变量的更新
        with self.lock:
            self.scanned_ports += 1
            progress = (self.scanned_ports / self.total_ports) * 100
            print(f"\r扫描进度: {self.scanned_ports}/{self.total_ports} ({progress:.1f}%)", end="")
        
        # 如果发现开放端口,立即显示
        if is_open:
            with self.lock:
                self.open_ports.append(port)
                print(f"\n[+] 发现开放端口: {port}")
        
        return is_open
    
    async def run_scan(self) -> List[int]:
        """
        运行完整的端口扫描流程
        
        Returns:
            List[int]: 所有发现的开放端口列表
        """
        print(f"开始扫描 {self.target} 的端口 {self.start_port}-{self.end_port}")
        print(f"使用 {self.max_threads} 个线程,超时时间: {self.timeout}秒")
        start_time = time.time()
        
        # 将端口范围分成多个块,每个块由一个线程处理
        # 这种分块策略可以更好地利用系统资源
        chunk_size = max(1, self.total_ports // self.max_threads)
        tasks = []
        
        # 为每个端口块创建异步任务
        for i in range(self.start_port, self.end_port + 1, chunk_size):
            chunk_end = min(i + chunk_size - 1, self.end_port)
            task = asyncio.create_task(self.scan_port_range_chunk(i, chunk_end))
            tasks.append(task)
        
        # 等待所有端口块扫描完成
        chunk_results = await asyncio.gather(*tasks)
        
        # 合并所有块的结果
        for chunk_ports in chunk_results:
            self.open_ports.extend(chunk_ports)
        
        end_time = time.time()
        elapsed_time = end_time - start_time
        
        # 显示扫描统计信息
        print(f"\n\n扫描完成!")
        print(f"扫描耗时: {elapsed_time:.2f}秒")
        print(f"发现 {len(self.open_ports)} 个开放端口")
        
        return sorted(self.open_ports)


class PortScannerService:
    """
    多目标扫描服务类
    管理多个目标的并发扫描任务
    """
    
    def __init__(self):
        """初始化扫描服务"""
        self.scan_results: Dict[str, List[int]] = {}  # 存储所有目标的扫描结果
    
    async def scan_multiple_targets(self, targets: List[str], 
                                   start_port: int, end_port: int,
                                   max_threads: int = 100) -> Dict[str, List[int]]:
        """
        并发扫描多个目标
        
        Args:
            targets: 目标IP或域名列表
            start_port: 起始端口
            end_port: 结束端口
            max_threads: 每个目标的最大线程数
            
        Returns:
            Dict[str, List[int]]: 每个目标对应的开放端口字典
        """
        print(f"开始扫描 {len(targets)} 个目标...")
        
        # 为每个目标创建独立的扫描器实例
        tasks = []
        for target in targets:
            scanner = AsyncPortScanner(target, start_port, end_port, max_threads)
            task = asyncio.create_task(scanner.run_scan())
            tasks.append((target, task))
        
        # 并发执行所有扫描任务
        for target, task in tasks:
            try:
                open_ports = await task
                self.scan_results[target] = open_ports
            except Exception as e:
                print(f"扫描 {target} 时出错: {e}")
                self.scan_results[target] = []
        
        return self.scan_results
    
    def display_results(self):
        """
        格式化显示所有目标的扫描结果
        提供清晰的汇总报告
        """
        print("\n" + "="*50)
        print("扫描结果汇总:")
        print("="*50)
        
        for target, ports in self.scan_results.items():
            print(f"\n目标: {target}")
            if ports:
                print(f"开放端口 ({len(ports)}个): {', '.join(map(str, ports))}")
            else:
                print("未发现开放端口")


async def main():
    """
    主函数
    处理命令行参数并启动端口扫描
    """
    # 创建命令行参数解析器
    parser = argparse.ArgumentParser(description="多线程+asyncio端口扫描器")
    parser.add_argument("target", help="目标IP或域名")
    parser.add_argument("-s", "--start", type=int, default=1, help="起始端口 (默认: 1)")
    parser.add_argument("-e", "--end", type=int, default=1024, help="结束端口 (默认: 1024)")
    parser.add_argument("-t", "--threads", type=int, default=100, help="最大线程数 (默认: 100)")
    parser.add_argument("--timeout", type=float, default=1.0, help="连接超时时间(秒) (默认: 1.0)")
    
    # 解析命令行参数
    args = parser.parse_args()
    
    # 创建并配置扫描器
    scanner = AsyncPortScanner(
        target=args.target,
        start_port=args.start,
        end_port=args.end,
        max_threads=args.threads,
        timeout=args.timeout
    )
    
    # 运行扫描
    open_ports = await scanner.run_scan()
    
    # 显示最终结果
    if open_ports:
        print(f"\n{args.target} 的开放端口: {', '.join(map(str, open_ports))}")
    else:
        print(f"\n{args.target} 没有发现开放端口")


if __name__ == "__main__":
    # 启动异步主程序
    asyncio.run(main())

 

#!/usr/bin/env python3
import asyncio
from port_scanner import PortScannerService


async def demo():
    """演示多目标扫描功能"""
    service = PortScannerService()
    
    # 扫描多个目标
    targets = ["127.0.0.1", "localhost"]
    start_port = 1
    end_port = 1000
    max_threads = 50
    
    print(f"开始扫描 {len(targets)} 个目标的端口 {start_port}-{end_port}")
    
    results = await service.scan_multiple_targets(
        targets=targets,
        start_port=start_port,
        end_port=end_port,
        max_threads=max_threads
    )
    
    # 显示结果
    service.display_results()


if __name__ == "__main__":
    asyncio.run(demo())

 

posted @ 2025-11-27 17:18  RichardShi  阅读(4)  评论(0)    收藏  举报