AI Agent框架探秘:拆解 OpenHands(11)--- Runtime主要组件
AI Agent框架探秘:拆解 OpenHands(11)--- Runtime主要组件
0x00 概要
本篇继续对 runtime 的解读,主要介绍 插件、执行系统和环境这三个组件。
因为本系列借鉴的文章过多,可能在参考文献中有遗漏的文章,如果有,还请大家指出。
0x01 三大组件
本篇要介绍的几个组件如下:
- ActionExecutor:在 Runtime 中执行动作的核心组件
- ActionExecutor 初始化时会根据配置加载指定的插件。插件注册到 ActionExecutor 的插件字典。
- 当接收到动作请求时,ActionExecutor 会调用相应的方法执行动作。
- 对于浏览动作,ActionExecutor 会使用 BrowserEnv 来处理。
- 如果涉及插件,ActionExecutor 会通过插件系统处理
- AgentSkillsPlugin:提供智能体技能功能的插件
- AgentSkillsPlugin 是一个插件,继承自 Plugin 基类。
- Runtime 初始化时,插件会被加载到插件字典中。 插件通过 PluginRequirement 机制被注册到系统中。
- 特定动作触发时调用相应插件功能。
- BrowserEnv:浏览器环境封装,使用 BrowserGym 库。
- ActionExecutor 在初始化时根据配置决定是否启用浏览器环境。
- 当需要执行浏览相关的动作时,ActionExecutor 会调用 BrowserEnv 的方法。
- BrowserEnv 运行在一个独立的多进程环境中。
0x02 数据流
Runtime 的数据流如下:
- Runtime 会发起动作请求 → ActionExecutor.run_action()
- ActionExecutor 根据动作类型调用相应的处理方法;
- 如果涉及插件,通过插件系统处理;
- 如果涉及浏览器,调用 BrowserEnv 处理;
- 返回观察结果给智能体。

0x03 插件系统
Runtime会遇到如下问题:新增模块(如自定义工具、新 LLM 模型)时,需修改核心代码,扩展性差;多任务并发执行时,模块间交互频繁,易出现性能瓶颈;框架部署与运维复杂,难以适配不同环境(本地、云端、边缘端)。
因此,业界大多采用微服务架构或插件化设计,模块间通过标准化接口通信,新增功能只需开发插件并注册。
3.1 sandbox_plugins
sandbox_plugins 在 OpenHands 的 CodeActAgent 中起到了关键作用,主要用于定义和配置代理在沙箱环境中可以使用的工具和功能。这些插件是代理能够与环境交互并完成任务的基础工具集。
sandbox_plugins 的定义和作用
在 CodeActAgent 类中,sandbox_plugins 是一个类属性,定义了代理在沙箱环境中需要的插件:
sandbox_plugins: list[PluginRequirement] = [
AgentSkillsRequirement(),
JupyterRequirement(),
]
这些插件为代理提供了在沙箱环境中执行任务所需的工具和功能。
具体插件功能
AgentSkillsRequirement 和 JupyterRequirement 是两个插件需求类。
- AgentSkillsRequirement:提供了一系列 Python 函数和工具,使代理能够执行各种操作,包括文件操作、目录浏览、代码执行等基本技能。需要在 JupyterRequirement 之前初始化,因为 Jupyter 需要使用这些函数。
- JupyterRequirement:提供了交互式 Python 解释器环境,允许代理执行 Python 代码,依赖于 AgentSkillsRequirement 提供的函数。
插件在系统中的使用
从代码中可以看出,这些插件在多个地方被使用:
- 在 Runtime 初始化时:
# 在 agent_session.py 中
self.runtime = runtime_cls(
plugins=agent.sandbox_plugins,
)
- 在 Runtime 中设置插件:
# 在 base.py 中
self.plugins = copy.deepcopy(plugins) if plugins is not None and len(plugins) > 0 else []
这些插件为代理提供了以下能力:
- 执行 Bash 命令:通过 AgentSkills 中的命令执行功能
- 执行 Python 代码:通过 Jupyter 插件提供 IPython 环境
- 文件系统操作:读取、写入、编辑文件
- 目录浏览:查看和导航文件系统
- 其他实用工具:各种辅助函数和工具
我们接下来具体分析基类Plugin,AgentSkillsRequirement 和 JupyterPlugin
3.2 Plugin 基类
class Plugin:
"""Base class for a plugin.
This will be initialized by the runtime client, which will run inside docker.
"""
name: str
@abstractmethod
async def initialize(self, username: str) -> None:
"""Initialize the plugin."""
pass
@abstractmethod
async def run(self, action: Action) -> Observation:
"""Run the plugin for a given action."""
pass
@dataclass
class PluginRequirement:
"""Requirement for a plugin."""
name: str
插件为:
ALL_PLUGINS = {
'jupyter': JupyterPlugin,
'agent_skills': AgentSkillsPlugin,
'vscode': VSCodePlugin,
}
3.3 JupyterPlugin
JupyterPlugin 是 OpenHands 框架中的 Jupyter 内核插件,基于 Plugin 基类实现,核心职责是启动 Jupyter Kernel Gateway(内核网关)服务,提供 IPython 代码单元格的异步执行能力,支持代码运行、输出捕获(文本 / 图片)及 Python 解释器路径获取,是框架中集成交互式数据分析、代码调试等 Jupyter 相关功能的核心组件。
核心特色
- 跨平台适配:兼容 Windows、Linux、macOS 系统,针对不同系统采用差异化的进程启动方式(Windows 用
subprocess.Popen,类 Unix 用asyncio.create_subprocess_shell)。 - 灵活的运行时支持:区分本地运行时(LocalRuntime)与非本地运行时,适配不同部署场景(如沙箱环境、本地开发环境),自动处理工作目录与环境变量配置。
- 端口自动分配:在 40000-49999 端口范围内自动查找可用 TCP 端口,避免端口冲突。
- 异步代码执行:基于
JupyterKernel封装异步代码执行逻辑,支持超时控制,能捕获文本输出与图片 URL 等结构化结果。 - 环境隔离与兼容:通过
micromamba虚拟环境或本地环境变量确保依赖一致性,支持 Poetry 项目的路径配置,适配 OpenHands 框架的工程化部署。
流程图

代码
@dataclass
class JupyterRequirement(PluginRequirement):
"""Jupyter插件的依赖声明类,用于框架识别插件依赖。"""
name: str = 'jupyter' # 依赖名称,固定为'jupyter'
class JupyterPlugin(Plugin):
"""Jupyter插件,提供Jupyter Kernel Gateway启动与IPython代码执行能力。"""
name: str = 'jupyter' # 插件名称,固定为'jupyter'
kernel_gateway_port: int # Jupyter Kernel Gateway服务端口
kernel_id: str # Jupyter内核ID
gateway_process: asyncio.subprocess.Process | subprocess.Popen # 内核网关进程对象
python_interpreter_path: str # Python解释器路径
async def initialize(
self, username: str, kernel_id: str = 'openhands-default'
) -> None:
"""初始化Jupyter插件,启动Kernel Gateway服务,配置运行环境。
参数:
username: 执行用户名称(非本地运行时使用)
kernel_id: Jupyter内核ID(默认:openhands-default)
"""
# 在40000-49999端口范围内查找可用TCP端口,避免冲突
self.kernel_gateway_port = find_available_tcp_port(40000, 49999)
self.kernel_id = kernel_id
# 判断是否为本地运行时(通过环境变量LOCAL_RUNTIME_MODE标记)
is_local_runtime = os.environ.get('LOCAL_RUNTIME_MODE') == '1'
# 判断是否为Windows系统
is_windows = sys.platform == 'win32'
if not is_local_runtime:
# 非本地运行时:配置用户切换前缀与Poetry虚拟环境
# 若启用SU_TO_USER,则添加"su - 用户名 -s "前缀(切换用户执行命令)
prefix = f'su - {username} -s ' if SU_TO_USER else ''
# 命令前缀:切换到代码仓库目录,配置环境变量,使用micromamba虚拟环境
poetry_prefix = (
'cd /openhands/code\n'
'export POETRY_VIRTUALENVS_PATH=/openhands/poetry;\n'
'export PYTHONPATH=/openhands/code:$PYTHONPATH;\n'
'export MAMBA_ROOT_PREFIX=/openhands/micromamba;\n'
'/openhands/micromamba/bin/micromamba run -n openhands '
)
else:
# 本地运行时:无需用户切换,直接使用本地环境
prefix = ''
# 从环境变量获取代码仓库路径(本地运行时必须配置)
code_repo_path = os.environ.get('OPENHANDS_REPO_PATH')
if not code_repo_path:
raise ValueError(
'OPENHANDS_REPO_PATH environment variable is not set. '
'This is required for the jupyter plugin to work with LocalRuntime.'
)
# 命令前缀:切换到代码仓库目录(本地环境依赖PATH确保环境正确)
poetry_prefix = f'cd {code_repo_path}\n'
if is_windows:
# Windows系统:构建CMD格式的启动命令
jupyter_launch_command = (
f'cd /d "{code_repo_path}" && ' # 切换到代码仓库目录(/d参数支持跨盘符切换)
f'"{sys.executable}" -m jupyter kernelgateway ' # 启动Jupyter Kernel Gateway
'--KernelGatewayApp.ip=0.0.0.0 ' # 绑定所有网络接口
f'--KernelGatewayApp.port={self.kernel_gateway_port}' # 指定端口
)
# Windows系统使用同步subprocess.Popen启动进程(asyncio在Windows有兼容性限制)
self.gateway_process = subprocess.Popen( # type: ignore[ASYNC101] # noqa: ASYNC101
jupyter_launch_command,
stdout=subprocess.PIPE, # 捕获标准输出
stderr=subprocess.STDOUT, # 标准错误重定向到标准输出
shell=True, # 使用shell执行命令
text=True, # 输出以文本模式返回
)
# Windows系统同步等待Kernel Gateway启动(读取输出直到包含'at'字符,标识服务就绪)
output = ''
while should_continue():
if self.gateway_process.stdout is None:
time.sleep(1) # 无输出时等待1秒
continue
line = self.gateway_process.stdout.readline() # 读取一行输出
if not line:
time.sleep(1)
continue
output += line
if 'at' in line: # 服务启动成功的标识(输出含"at",如"Listening at...")
break
time.sleep(1)
else:
# 类Unix系统(Linux/macOS):构建Bash格式的启动命令
jupyter_launch_command = (
f"{prefix}/bin/bash << 'EOF'\n" # 切换到bash执行,EOF避免变量解析
f'{poetry_prefix}' # 环境配置前缀(虚拟环境/工作目录)
f'"{sys.executable}" -m jupyter kernelgateway ' # 启动Kernel Gateway
'--KernelGatewayApp.ip=0.0.0.0 ' # 绑定所有网络接口
f'--KernelGatewayApp.port={self.kernel_gateway_port}\n' # 指定端口
'EOF'
)
# 类Unix系统使用asyncio创建异步子进程(避免阻塞事件循环)
self.gateway_process = await asyncio.create_subprocess_shell(
jupyter_launch_command,
stderr=asyncio.subprocess.STDOUT, # 标准错误重定向到标准输出
stdout=asyncio.subprocess.PIPE, # 捕获标准输出
)
# 异步等待Kernel Gateway启动(读取输出直到包含'at'字符)
output = ''
while should_continue() and self.gateway_process.stdout is not None:
line_bytes = await self.gateway_process.stdout.readline() # 异步读取一行输出
line = line_bytes.decode('utf-8') # 字节转字符串
output += line
if 'at' in line:
break
await asyncio.sleep(1) # 等待1秒
# 执行测试代码,获取当前Python解释器路径(验证环境正确性)
_obs = await self.run(
IPythonRunCellAction(code='import sys; print(sys.executable)')
)
self.python_interpreter_path = _obs.content.strip() # 提取并保存解释器路径
async def _run(self, action: Action) -> IPythonRunCellObservation:
"""内部方法:在Jupyter内核中执行代码单元格。
参数:
action: 待执行的动作(必须是IPythonRunCellAction类型)
返回:
IPythonRunCellObservation: 代码执行结果的观察值(含文本内容、图片URL等)
"""
# 校验动作类型:仅支持IPythonRunCellAction
if not isinstance(action, IPythonRunCellAction):
raise ValueError(
f'Jupyter plugin only supports IPythonRunCellAction, but got {action}'
)
# 初始化JupyterKernel(若未初始化)
if not hasattr(self, 'kernel'):
self.kernel = JupyterKernel(
f'localhost:{self.kernel_gateway_port}', # 内核网关地址(本地+端口)
self.kernel_id # 内核ID
)
# 若内核未初始化,执行初始化(建立连接)
if not self.kernel.initialized:
await self.kernel.initialize()
# 异步执行代码,支持超时控制(超时时间从action获取)
output = await self.kernel.execute(action.code, timeout=action.timeout)
# 从结构化输出中提取文本内容与图片URL
text_content = output.get('text', '') # 文本输出(stdout/stderr)
image_urls = output.get('images', []) # 图片URL列表(如matplotlib绘图结果)
# 返回封装后的观察结果
return IPythonRunCellObservation(
content=text_content, # 文本内容
code=action.code, # 执行的代码
image_urls=image_urls if image_urls else None, # 图片URL(无则为None)
)
async def run(self, action: Action) -> IPythonRunCellObservation:
"""公开接口:执行IPython代码动作,返回观察结果。
参数:
action: 待执行的IPythonRunCellAction动作
返回:
IPythonRunCellObservation: 代码执行结果
"""
# 调用内部_run方法执行代码,返回结果
obs = await self._run(action)
return obs
3.4 AgentSkillsPlugin
功能概述
AgentSkillsPlugin 是 OpenHands 框架中管理智能体技能(Agent Skills)的核心插件,负责整合文件操作(file_ops)、文件读取(file_reader)、代码仓库操作(repo_ops)等基础技能模块,通过动态导入机制将分散的技能函数统一暴露给框架,同时提供插件依赖声明与文档自动生成能力,是智能体获取文件处理、仓库管理等核心操作能力的关键组件。
class AgentSkillsPlugin(Plugin):
name: str = 'agent_skills'
async def initialize(self, username: str) -> None:
"""Initialize the plugin."""
pass
async def run(self, action: Action) -> Observation:
"""Run the plugin for a given action."""
raise NotImplementedError('AgentSkillsPlugin does not support run method')
核心特色
- 模块化技能整合:通过动态导入机制,将
file_ops、file_reader、repo_ops等独立模块的技能函数统一聚合,简化框架对技能的调用与管理。 - 自动文档生成:扫描所有导入的技能函数,提取函数签名与文档字符串(
__doc__),自动生成标准化文档,提升开发可维护性。 - 柔性依赖处理:对
repo_ops模块采用可选导入策略,导入失败时仅跳过该模块,不影响其他技能的正常使用,增强插件兼容性。 - 极简初始化设计:插件初始化逻辑为空实现,无需额外配置,聚焦于技能函数的聚合与暴露,降低使用门槛。
- 明确的接口约束:禁用
run方法(抛出未实现异常),明确该插件的核心作用是技能聚合而非直接执行动作,避免误用。
AgentSkillsRequirement
AgentSkillsRequirement 是一个插件需求类,它定义了代理在沙箱环境中运行所需的基本技能集合,这些技能主要通过 Python 函数的形式提供,使代理能够执行各种操作:
- 为代理提供了与文件系统交互的基本能力
- 提供了执行命令和脚本的工具
- 为其他高级插件(如 Jupyter)提供了基础函数支持
- 确保代理能够在沙箱环境中完成大多数常见的开发任务
AgentSkillsRequirement 的主要功能如下:
- 文件系统操作
- 提供读取、写入、编辑文件的能力
- 支持目录浏览和文件管理操作
- 允许代理查看和操作工作区中的文件
- 命令执行
- 提供执行 shell 命令的能力
- 允许代理在沙箱环境中运行 bash 命令
- 支持与操作系统交互的各种操作
- 工具函数集合
- 提供一系列实用的 Python 函数
- 这些函数可以被其他插件(如 Jupyter)使用
- 包括各种辅助功能,如字符串处理、数据操作等
在 CodeActAgent 中,AgentSkillsRequirement 被定义在 sandbox_plugins 列表中:
sandbox_plugins: list[PluginRequirement] = [
AgentSkillsRequirement(),
JupyterRequirement(),
]
AgentSkillsRequirement 与其他组件的关系:
- 与 JupyterRequirement 的关系
- AgentSkillsRequirement 必须在 JupyterRequirement 之前初始化
- AgentSkillsRequirement 提供的 Python 函数会被 Jupyter 环境使用
- 这种顺序确保了 Jupyter 可以访问所有必要的工具函数
- 与 Runtime 的关系
- 在 LocalRuntime 和其他运行时环境中,这些插件会被加载和初始化
总的来说,AgentSkillsRequirement 是代理在 OpenHands 环境中执行任务的基础,它提供了一套核心函数,使代理能够与文件系统、命令行和运行环境进行交互。
框架注册与技能发现
OpenHands 框架通过「插件注册机制」识别 AgentSkillsPlugin,并自动发现其聚合的所有具体 Skill 操作,步骤如下:
- 插件注册与依赖声明
AgentSkillsPlugin 继承自框架的 Plugin 基类,通过 AgentSkillsRequirement 声明依赖,框架启动时会自动扫描并加载该插件:
@dataclass
class AgentSkillsRequirement(PluginRequirement):
name: str = "agent_skills" # 插件依赖名称,与插件名一致
documentation: str = agentskills.DOCUMENTATION
class AgentSkillsPlugin(Plugin):
name: str = "agent_skills" # 插件名称,框架通过该名称识别
- 框架解析技能清单
框架加载 AgentSkillsPlugin 后,会读取其 __all__ 变量和全局命名空间,提取所有 Skill 函数的关键信息:
- 函数名(如
create_file):作为 Skill 的唯一标识; - 函数签名(参数、返回值):通过
inspect.signature解析,用于智能体构造调用参数; - 文档字符串(
__doc__):自动生成技能文档,供智能体参考使用。
- 技能全局注册
框架将解析后的 Skill 信息注册到「全局技能注册表」中,形成 key-value 映射(key:Skill 函数名,value:Skill 函数对象 + 元数据),使智能体可通过函数名快速查找并调用对应 Skill。
- 智能体调用具体 Skill 操作
智能体(Agent)通过框架提供的接口,从「全局技能注册表」中获取 AgentSkillsPlugin 聚合的具体 Skill,并触发执行
logger.debug('Initializing AgentSkills')
if 'agent_skills' in self.plugins and 'jupyter' in self.plugins:
obs = await self.run_ipython(
IPythonRunCellAction(
code='from openhands.runtime.plugins.agent_skills.agentskills import *\n'
)
)
logger.debug(f'AgentSkills initialized: {obs}')
流程图

代码
@dataclass
class AgentSkillsRequirement(PluginRequirement):
"""AgentSkillsPlugin 的依赖声明类,用于框架识别插件依赖。"""
name: str = 'agent_skills' # 依赖名称,固定为'agent_skills'
documentation: str = agentskills.DOCUMENTATION # 依赖文档(来自agentskills模块)
class AgentSkillsPlugin(Plugin):
"""智能体技能插件,负责聚合各类基础技能函数(文件操作、仓库操作等)。"""
name: str = 'agent_skills' # 插件名称,固定为'agent_skills'
async def initialize(self, username: str) -> None:
"""初始化插件(空实现,无需额外配置)。"""
pass
async def run(self, action: Action) -> Observation:
"""执行插件动作(禁用该方法)。
该插件的核心作用是聚合技能函数,而非直接执行动作,因此抛出未实现异常。
"""
raise NotImplementedError('AgentSkillsPlugin does not support run method')
# 动态导入file_ops模块的所有技能函数,添加到当前模块全局命名空间
import_functions(
module=file_ops, # 源模块:文件操作模块(如创建/删除/修改文件)
function_names=file_ops.__all__, # 导入的函数列表(file_ops定义的所有公开函数)
target_globals=globals() # 目标命名空间:当前模块全局变量
)
# 动态导入file_reader模块的所有技能函数,添加到当前模块全局命名空间
import_functions(
module=file_reader, # 源模块:文件读取模块(如读取文本文件、解析JSON等)
function_names=file_reader.__all__, # 导入的函数列表
target_globals=globals()
)
# 初始化__all__列表,包含已导入的所有技能函数(供外部模块导入)
__all__ = file_ops.__all__ + file_reader.__all__
# 可选导入repo_ops模块(代码仓库操作,如Git克隆、提交等)
try:
from openhands.runtime.plugins.agent_skills import repo_ops
# 动态导入repo_ops模块的所有技能函数
import_functions(
module=repo_ops,
function_names=repo_ops.__all__,
target_globals=globals()
)
# 将repo_ops的技能函数添加到__all__
__all__ += repo_ops.__all__
except ImportError:
# 若repo_ops模块不可用(如未安装依赖),则跳过导入,不影响其他功能
pass
# 自动生成所有技能函数的标准化文档
DOCUMENTATION = ''
for func_name in __all__:
# 从全局命名空间获取技能函数
func = globals()[func_name]
# 获取函数的文档字符串
cur_doc = func.__doc__
# 清理文档字符串:去除空行、统一去除每行缩进
cur_doc = '\n'.join(filter(None, map(lambda x: x.strip(), cur_doc.split('\n'))))
# 格式化文档:每行添加4个空格缩进,保证格式统一
cur_doc = '\n'.join(map(lambda x: ' ' * 4 + x, cur_doc.split('\n')))
# 提取函数签名(函数名+参数列表)
fn_signature = f'{func.__name__}' + str(signature(func))
# 将函数签名与格式化文档添加到总文档
DOCUMENTATION += f'{fn_signature}:\n{cur_doc}\n\n'
# 单独添加file_editor技能函数(特殊处理,未包含在上述模块中)
from openhands.runtime.plugins.agent_skills.file_editor import file_editor # noqa: E402
__all__ += ['file_editor'] # 将file_editor添加到__all__,供外部导入
0x04 执行系统
ActionExecutor 是 OpenHands 框架中运行于 Docker 沙箱内的核心动作执行组件,负责接收并执行来自后端的各类动作(如命令行执行、IPython 代码运行、浏览器操作等),生成对应的观察结果(Observation),同时管理插件生命周期、用户环境、工作目录及资源监控,是连接后端指令与沙箱执行环境的关键桥梁。
4.1 调用
在 OpenHands 项目中,ActionExecutor 类的调用与使用流程总结
- 服务端:ActionExecutor 在 action_execution_server.py 中作为独立服务运行。
- 客户端:各种运行时实现(如 LocalRuntime)通过 HTTP 请求与 ActionExecutor 通信。
执行步骤:
- Runtime(实际就是ActionExecutionClient的派生类)直接或间接调用 execute_action() 方法;
- class RemoteRuntime(ActionExecutionClient)
- class LocalRuntime(ActionExecutionClient)
- class DockerRuntime(ActionExecutionClient)
- 通过 HTTP POST 请求发送到 /execute_action 端点;
- ActionExecutor 接收请求并执行相应操作;
- 将观察结果返回给客户端。
这种架构把操作执行与主应用分离,提供了更好的隔离性和安全性。
4.2action_execution_client.py
action_execution_client.py 文件包含ActionExecutionClient类,它实现了运行时接口。它是一个抽象实现,意味着仍需要通过具体实现来扩展才能使用。
ActionExecutionClient 通过HTTP调用与action_execution_server 交互以实际执行运行时操作。
ActionExecutionClient 在各种运行时实现中被使用。例如,在 LocalRuntime 中,操作会通过客户端发送到 ActionExecutor。
4.3 action_execution_server.py
ActionExecutor 在 openhands/runtime/action_execution_server.py 文件中充当核心组件。该文件将其实例化,并作为 FastAPI 应用程序的核心执行器。即,action_executor.py 文件包含ActionExecutor类,负责通过/execute_action HTTP端点接收的动作。它在HTTP响应中返回观察结果。
核心特色如下
- 沙箱内环境管理:初始化用户权限与工作目录,支持 Windows/Linux 跨平台环境适配,确保执行隔离性与安全性。
- 插件化架构:支持加载 VSCode、Jupyter 等各类插件,通过统一接口管理插件初始化与调用,灵活扩展执行能力。
- 多环境协同:集成 Bash/Windows PowerShell 命令行环境、浏览器环境(BrowserEnv)、Jupyter 交互式环境,满足多样化动作执行需求。
- 异步初始化优化:浏览器环境采用异步初始化,避免阻塞主流程,提升启动效率。
- 资源与状态监控:支持内存上限配置与内存监控,同步 Bash 与 Jupyter 工作目录,确保执行上下文一致性。
- 异常处理与兼容性:针对 Windows 系统、插件缺失等场景做特殊兼容处理,抛出明确异常并记录日志。
具体会:
在main中启动
if __name__ == '__main__':
logger.debug(f'Starting action execution API on port {args.port}')
# When LOG_JSON=1, provide a JSON log config to Uvicorn so error/access logs are structured
log_config = None
if os.getenv('LOG_JSON', '0') in ('1', 'true', 'True'):
log_config = get_uvicorn_json_log_config()
run(app, host='0.0.0.0', port=args.port, log_config=log_config, use_colors=False)
在 lifespan 函数中初始化
@asynccontextmanager
async def lifespan(app: FastAPI):
global client, mcp_proxy_manager
logger.info('Initializing ActionExecutor...')
client = ActionExecutor(
plugins_to_load,
work_dir=args.working_dir,
username=args.username,
user_id=args.user_id,
enable_browser=args.enable_browser,
browsergym_eval_env=args.browsergym_eval_env,
)
await client.ainit()
logger.info('ActionExecutor initialized.')
# Check if we're on Windows
is_windows = sys.platform == 'win32'
# Initialize and mount MCP Proxy Manager (skip on Windows)
if is_windows:
logger.info('Skipping MCP Proxy initialization on Windows')
mcp_proxy_manager = None
else:
logger.info('Initializing MCP Proxy Manager...')
# Create a MCP Proxy Manager
mcp_proxy_manager = MCPProxyManager(
auth_enabled=bool(SESSION_API_KEY),
api_key=SESSION_API_KEY,
logger_level=logger.getEffectiveLevel(),
)
mcp_proxy_manager.initialize()
# Mount the proxy to the app
allowed_origins = ['*']
try:
await mcp_proxy_manager.mount_to_app(app, allowed_origins)
except Exception as e:
logger.error(f'Error mounting MCP Proxy: {e}', exc_info=True)
raise RuntimeError(f'Cannot mount MCP Proxy: {e}')
yield
# Clean up & release the resources
logger.info('Shutting down MCP Proxy Manager...')
if mcp_proxy_manager:
del mcp_proxy_manager
mcp_proxy_manager = None
else:
logger.info('MCP Proxy Manager instance not found for shutdown.')
logger.info('Closing ActionExecutor...')
if client:
try:
client.close()
logger.info('ActionExecutor closed successfully.')
except Exception as e:
logger.error(f'Error closing ActionExecutor: {e}', exc_info=True)
else:
logger.info('ActionExecutor instance not found for closing.')
logger.info('Shutdown complete.')
在 /execute_action 端点中被调用
@app.post('/execute_action')
async def execute_action(action_request: ActionRequest):
assert client is not None
try:
action = event_from_dict(action_request.action)
if not isinstance(action, Action):
raise HTTPException(status_code=400, detail='Invalid action type')
client.last_execution_time = time.time()
observation = await client.run_action(action)
return event_to_dict(observation)
except Exception as e:
logger.exception(f'Error while running /execute_action: {str(e)}')
raise HTTPException(
status_code=500,
detail=f'Internal server error: {str(e)}',
)
finally:
update_last_execution_time()
4.4 流程图

4.5 代码
ActionExecutor类的关键特性:
- 初始化用户环境和bash shell
- 插件管理和初始化
- 执行各种操作类型(bash命令、IPython单元、文件操作、浏览)
- 与BrowserEnv集成以进行网络交互
class ActionExecutor:
"""动作执行器(ActionExecutor)运行于 Docker 沙箱内,
负责执行从 OpenHands 后端接收的动作,并生成对应的观察结果(Observation)。
"""
def __init__(
self,
plugins_to_load: list[Plugin],
work_dir: str,
username: str,
user_id: int,
enable_browser: bool,
browsergym_eval_env: str | None,
) -> None:
"""初始化动作执行器,配置执行环境、插件列表、用户信息等核心参数。
参数:
plugins_to_load: 待加载的插件列表
work_dir: 初始工作目录路径
username: 执行用户名称
user_id: 执行用户ID
enable_browser: 是否启用浏览器环境
browsergym_eval_env: BrowserGym 评估环境名称(可选,启用浏览器时生效)
"""
# 待加载的插件列表
self.plugins_to_load = plugins_to_load
# 初始工作目录(沙箱内路径)
self._initial_cwd = work_dir
# 执行用户名称
self.username = username
# 执行用户ID
self.user_id = user_id
# 初始化用户与工作目录(设置用户权限、创建工作目录,返回更新后的用户ID)
_updated_user_id = init_user_and_working_directory(
username=username, user_id=self.user_id, initial_cwd=work_dir
)
if _updated_user_id is not None:
self.user_id = _updated_user_id # 更新为实际生效的用户ID
# 命令行会话(支持 Bash 或 Windows PowerShell)
self.bash_session: BashSession | 'WindowsPowershellSession' | None = None # type: ignore[name-defined]
# 异步锁,确保动作执行的线程安全
self.lock = asyncio.Lock()
# 已加载的插件字典(key: 插件名称,value: 插件实例)
self.plugins: dict[str, Plugin] = {}
# 文件编辑器实例(基于工作目录根路径初始化)
self.file_editor = OHEditor(workspace_root=self._initial_cwd)
# 是否启用浏览器环境
self.enable_browser = enable_browser
# 浏览器环境实例(BrowserEnv)
self.browser: BrowserEnv | None = None
# 浏览器异步初始化任务(避免阻塞主流程)
self.browser_init_task: asyncio.Task | None = None
# BrowserGym 评估环境名称
self.browsergym_eval_env = browsergym_eval_env
# 合法性校验:未启用浏览器时,不允许设置 BrowserGym 评估环境
if (not self.enable_browser) and self.browsergym_eval_env:
raise BrowserUnavailableException(
'Browser environment is not enabled in config, but browsergym_eval_env is set'
)
# 记录启动时间与最后执行时间
self.start_time = time.time()
self.last_execution_time = self.start_time
# 初始化完成标记
self._initialized = False
# 已下载文件列表
self.downloaded_files: list[str] = []
# 下载目录路径(沙箱内)
self.downloads_directory = '/workspace/.downloads'
# 内存上限配置(从环境变量读取,可选)
self.max_memory_gb: int | None = None
if _override_max_memory_gb := os.environ.get('RUNTIME_MAX_MEMORY_GB', None):
self.max_memory_gb = int(_override_max_memory_gb)
else:
logger.info('No max memory limit set, using all available system memory')
# 初始化内存监控(根据环境变量决定是否启用)
self.memory_monitor = MemoryMonitor(
enable=os.environ.get('RUNTIME_MEMORY_MONITOR', 'False').lower()
in ['true', '1', 'yes']
)
self.memory_monitor.start_monitoring() # 启动内存监控
async def _init_browser_async(self):
"""异步初始化浏览器环境(避免阻塞主流程)。"""
if not self.enable_browser:
logger.info('Browser environment is not enabled in config')
return
# Windows 系统不支持浏览器环境,输出警告
if sys.platform == 'win32':
logger.warning('Browser environment not supported on windows')
return
try:
# 初始化浏览器环境(传入评估环境名称)
self.browser = BrowserEnv(self.browsergym_eval_env)
logger.debug('Browser initialized asynchronously')
except Exception as e:
logger.exception(f'Failed to initialize browser: {e}')
self.browser = None # 初始化失败则置空
async def _init_plugin(self, plugin: Plugin):
"""初始化单个插件,并注册到插件字典。
参数:
plugin: 待初始化的插件实例
"""
assert self.bash_session is not None, "命令行会话未初始化,无法加载插件"
# VSCode 插件特殊处理:需要 Runtime ID 用于 Gateway API 的路径路由
if isinstance(plugin, VSCodePlugin):
runtime_id = os.environ.get('RUNTIME_ID') # 从环境变量获取 Runtime ID
await plugin.initialize(self.username, runtime_id=runtime_id)
else:
# 其他插件直接传入用户名初始化
await plugin.initialize(self.username)
# 将初始化后的插件注册到字典
self.plugins[plugin.name] = plugin
logger.debug(f'Initializing plugin: {plugin.name}')
# Jupyter 插件特殊处理:同步命令行工作目录到 Jupyter 环境
if isinstance(plugin, JupyterPlugin):
# Windows 路径转义(将反斜杠替换为正斜杠)
cwd = self.bash_session.cwd.replace('\\', '/')
# 执行 IPython 代码,切换工作目录
await self.run_ipython(
IPythonRunCellAction(code=f'import os; os.chdir(r"{cwd}")')
)
async def run_ipython(self, action: IPythonRunCellAction) -> Observation:
"""执行 IPython 代码动作,返回执行结果观察值。
参数:
action: IPython 代码执行动作(包含待执行代码、是否补充额外信息等)
返回:
IPythonRunCellObservation: 代码执行结果的观察值(含输出内容、状态等)
"""
assert self.bash_session is not None, "命令行会话未初始化,无法执行 IPython 动作"
# 检查 Jupyter 插件是否已加载
if 'jupyter' in self.plugins:
_jupyter_plugin: JupyterPlugin = self.plugins['jupyter'] # 类型断言
# 同步 Bash 与 Jupyter 的工作目录(确保执行上下文一致)
jupyter_cwd = getattr(self, '_jupyter_cwd', None) # 获取当前 Jupyter 工作目录
if self.bash_session.cwd != jupyter_cwd:
# Windows 路径转义
cwd = self.bash_session.cwd.replace('\\', '/')
# 生成切换工作目录的代码
reset_jupyter_cwd_code = f'import os; os.chdir("{cwd}")'
_aux_action = IPythonRunCellAction(code=reset_jupyter_cwd_code)
# 执行工作目录切换
_reset_obs: IPythonRunCellObservation = await _jupyter_plugin.run(
_aux_action
)
self._jupyter_cwd = self.bash_session.cwd # 更新 Jupyter 工作目录缓存
# 执行目标 IPython 代码
obs: IPythonRunCellObservation = await _jupyter_plugin.run(action)
obs.content = obs.content.rstrip() # 去除输出内容末尾的空白字符
# 若需要补充额外信息,添加工作目录与 Python 解释器路径
if action.include_extra:
obs.content += (
f'\n[Jupyter current working directory: {self.bash_session.cwd}]'
)
obs.content += f'\n[Jupyter Python interpreter: {_jupyter_plugin.python_interpreter_path}]'
return obs
else:
# 未加载 Jupyter 插件时抛出异常
raise RuntimeError(
'JupyterRequirement not found. Unable to run IPython action.'
)
0x05 环境
我们以 BrowserEnv 为例来看看环境如何实现。
BrowserEnv 是 OpenHands 框架中浏览器环境的核心封装类,负责创建独立的浏览器进程(基于 Playwright + BrowserGym),提供浏览器操作的标准化接口(如执行动作、检查存活状态、关闭环境),支持普通网页交互与评估场景(如 WebArena、MiniWoB),并通过进程间通信(Pipe)实现与主程序的高效数据传输,是框架中处理网页交互、数据抓取等浏览器相关任务的核心组件。
5.1 调用
ActionExecutor 有成员变量 self.browser: BrowserEnv
class ActionExecutor:
"""ActionExecutor is running inside docker sandbox.
It is responsible for executing actions received from OpenHands backend and producing observations.
"""
def __init__(
self,
plugins_to_load: list[Plugin],
work_dir: str,
username: str,
user_id: int,
enable_browser: bool,
browsergym_eval_env: str | None,
) -> None:
self.browser: BrowserEnv | None = None
async def _init_browser_async(self):
"""Initialize the browser asynchronously."""
if not self.enable_browser:
logger.info('Browser environment is not enabled in config')
return
if sys.platform == 'win32':
logger.warning('Browser environment not supported on windows')
return
logger.debug('Initializing browser asynchronously')
try:
self.browser = BrowserEnv(self.browsergym_eval_env)
logger.debug('Browser initialized asynchronously')
except Exception as e:
logger.exception(f'Failed to initialize browser: {e}')
self.browser = None
使用如下代码对浏览器环境进行使用。
async def browse(self, action: BrowseURLAction) -> Observation:
if self.browser is None:
return ErrorObservation(
'Browser functionality is not supported or disabled.'
)
await self._ensure_browser_ready()
return await browse(action, self.browser, self.initial_cwd)
async def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
if self.browser is None:
return ErrorObservation(
'Browser functionality is not supported or disabled.'
)
await self._ensure_browser_ready()
browser_observation = await browse(action, self.browser, self.initial_cwd)
if not browser_observation.error:
return browser_observation
else:
curr_files = os.listdir(self.downloads_directory)
new_download = False
for file in curr_files:
if file not in self.downloaded_files:
new_download = True
self.downloaded_files.append(file)
break
if not new_download:
return browser_observation
else:
# A new file is downloaded in self.downloads_directory, shift file to /workspace
src_path = os.path.join(
self.downloads_directory, self.downloaded_files[-1]
)
# Guess extension of file using puremagic and add it to tgt_path file name
file_ext = ''
try:
guesses = puremagic.magic_file(src_path)
if len(guesses) > 0:
ext = guesses[0].extension.strip()
if len(ext) > 0:
file_ext = ext
except Exception as _:
pass
tgt_path = os.path.join(
'/workspace', f'file_{len(self.downloaded_files)}{file_ext}'
)
shutil.copy(src_path, tgt_path)
file_download_obs = FileDownloadObservation(
content=f'Execution of the previous action {action.browser_actions} resulted in a file download. The downloaded file is saved at location: {tgt_path}',
file_path=tgt_path,
)
return file_download_obs
5.2 核心特色
- 双模式支持:兼容普通开放式浏览器场景(空白页启动,支持自由网页交互)与评估场景(集成 BrowserGym 生态,支持 WebArena、MiniWoB 等标准化任务评估)。
- 进程隔离设计:通过多进程(
multiprocessing)创建独立浏览器进程,避免与主程序相互影响,提升稳定性与安全性。 - 鲁棒的初始化与销毁:内置重试机制(最多 5 次)处理浏览器启动失败,通过
atexit注册自动关闭函数,确保资源释放。 - 结构化数据处理:将浏览器 DOM 转换为文本、截图转为 Base64 编码,保证观察结果(Observation)的可序列化与可传输性。
- 下载与缓存支持:配置浏览器下载路径,支持文件下载功能,适配网页数据抓取等场景。
- 评估专用功能:评估模式下自动注册 BrowserGym 任务、记录目标(Goal)与奖励(Reward),方便后续评估分析。
5.3 流程图

5.4 代码
初始化BrowserEnv时会spawn一个进程来启动浏览器,以便大模型在需要外部资料时通过工具获取,用到了强化学习库gymnasium来操作浏览器。
import gymnasium as gym
class BrowserEnv:
def __init__(self, browsergym_eval_env: str | None = None):
"""初始化浏览器环境,创建独立进程与通信管道,支持普通交互与评估模式。
参数:
browsergym_eval_env: BrowserGym 评估环境名称(如 "browsergym/webarena"),
传入则启用评估模式,否则为普通开放式浏览器环境。
"""
# 初始化HTML转文本转换器(用于提取网页文本内容)
self.html_text_converter = self.get_html_text_converter()
# 评估模式开关(是否启用BrowserGym评估)
self.eval_mode = False
# 评估目录(用于存储评估相关数据)
self.eval_dir = ''
# 评估模式配置:必须传入评估环境名称
self.browsergym_eval_env = browsergym_eval_env
self.eval_mode = bool(browsergym_eval_env) # 有评估环境则启用评估模式
# 初始化浏览器环境进程:设置多进程启动方式为"spawn"(跨平台兼容)
multiprocessing.set_start_method('spawn', force=True)
# 创建进程间通信管道(双向通信:浏览器端 <-> 代理端)
self.browser_side, self.agent_side = multiprocessing.Pipe()
# 启动浏览器环境(带重试机制)
self.init_browser()
# 注册进程退出时的自动关闭函数(确保资源释放)
atexit.register(self.close)
def get_html_text_converter(self) -> html2text.HTML2Text:
"""创建并配置HTML转文本转换器,定义网页内容的处理规则。"""
html_text_converter = html2text.HTML2Text()
# 配置规则:不忽略链接(保留链接文本),忽略图片
html_text_converter.ignore_links = False
html_text_converter.ignore_images = True
# 图片使用alt文本替代(提升文本可读性)
html_text_converter.images_to_alt = True
# 禁用自动文本换行(保持原始网页文本结构)
html_text_converter.body_width = 0
return html_text_converter
@retry(
wait=wait_fixed(1), # 重试间隔1秒
stop=stop_after_attempt(5) | stop_if_should_exit(), # 最多重试5次或进程退出时停止
retry=retry_if_exception_type(BrowserInitException), # 仅对浏览器初始化异常重试
)
def init_browser(self) -> None:
"""启动浏览器进程,失败则重试(最多5次),最终失败抛出异常。"""
logger.debug('Starting browser env...')
try:
# 创建浏览器进程,目标函数为browser_process(独立进程中运行)
self.process = multiprocessing.Process(target=self.browser_process)
self.process.start()
except Exception as e:
logger.error(f'Failed to start browser process: {e}')
raise # 抛出异常触发重试
# 检查浏览器进程是否存活(超时200秒)
if not self.check_alive(timeout=200):
self.close() # 进程未存活则关闭资源
raise BrowserInitException('Failed to start browser environment.')
def browser_process(self) -> None:
"""浏览器进程核心逻辑:初始化BrowserGym环境,处理动作请求,返回观察结果。"""
if self.eval_mode:
# 评估模式:初始化BrowserGym评估环境
assert self.browsergym_eval_env is not None
logger.info('Initializing browser env for web browsing evaluation.')
# 补全评估环境名称前缀(确保符合BrowserGym规范)
if not self.browsergym_eval_env.startswith('browsergym/'):
self.browsergym_eval_env = 'browsergym/' + self.browsergym_eval_env
# 根据评估环境类型导入对应的BrowserGym任务(注册为gym环境)
if 'visualwebarena' in self.browsergym_eval_env:
import browsergym.visualwebarena # noqa F401 注册visualwebarena任务
import nltk
nltk.download('punkt_tab') # 下载NLTK依赖数据
elif 'webarena' in self.browsergym_eval_env:
import browsergym.webarena # noqa F401 注册webarena任务
elif 'miniwob' in self.browsergym_eval_env:
import browsergym.miniwob # noqa F401 注册miniwob任务
else:
raise ValueError(
f'Unsupported browsergym eval env: {self.browsergym_eval_env}'
)
# 创建评估环境(标记所有元素,超时100000秒)
env = gym.make(self.browsergym_eval_env, tags_to_mark='all', timeout=100000)
else:
# 普通模式:创建开放式浏览器环境
env = gym.make(
'browsergym/openended', # 开放式任务类型
task_kwargs={'start_url': 'about:blank', 'goal': 'PLACEHOLDER_GOAL'}, # 空白页启动
wait_for_user_message=False, # 不等待用户消息
headless=True, # 无头模式(无GUI界面,节省资源)
disable_env_checker=True, # 禁用环境检查(提升启动速度)
tags_to_mark='all', # 标记所有DOM元素(便于交互)
timeout=100000, # 超时时间
pw_context_kwargs={'accept_downloads': True}, # 允许文件下载
pw_chromium_kwargs={'downloads_path': '/workspace/.downloads/'}, # 下载文件保存路径
)
# 重置环境,获取初始观察结果与信息
obs, info = env.reset()
logger.info('Successfully called env.reset')
# 评估模式专用:记录目标与图片URL(用于后续评估)
self.eval_goal = None
self.goal_image_urls = []
self.eval_rewards: list[float] = []
if self.eval_mode:
self.eval_goal = obs['goal']
# 处理目标对象中的文本与图片URL
if 'goal_object' in obs:
obs['goal_object'] = list(obs['goal_object'])
if len(obs['goal_object']) > 0:
self.eval_goal = obs['goal_object'][0]['text'] # 取第一个目标文本
# 收集目标中的图片URL
for message in obs['goal_object']:
if message['type'] == 'image_url':
image_src = message['image_url']
if isinstance(image_src, dict):
image_src = image_src['url'] # 处理嵌套URL格式
self.goal_image_urls.append(image_src)
logger.debug(f'Browsing goal: {self.eval_goal}')
logger.info('Browser env started.')
# 循环处理请求(进程退出时终止)
while should_continue():
try:
# 检查是否有来自代理端的请求(超时0.01秒,非阻塞)
if self.browser_side.poll(timeout=0.01):
unique_request_id, action_data = self.browser_side.recv()
# 处理关闭请求:关闭环境并退出进程
if unique_request_id == 'SHUTDOWN':
logger.debug('SHUTDOWN recv, shutting down browser env...')
env.close()
return
# 处理存活检查请求:返回ALIVE状态
elif unique_request_id == 'IS_ALIVE':
self.browser_side.send(('ALIVE', None))
continue
# 评估模式专用请求:获取目标信息
if action_data['action'] == BROWSER_EVAL_GET_GOAL_ACTION:
self.browser_side.send(
(
unique_request_id,
{
'text_content': self.eval_goal, # 目标文本
'image_content': self.goal_image_urls, # 目标图片URL列表
},
)
)
continue
# 评估模式专用请求:获取奖励列表
elif action_data['action'] == BROWSER_EVAL_GET_REWARDS_ACTION:
self.browser_side.send(
(
unique_request_id,
{'text_content': json.dumps(self.eval_rewards)}, # 奖励列表JSON字符串
)
)
continue
# 处理普通浏览器动作请求
action = action_data['action']
# 执行动作,获取结果(观察值、奖励、终止状态等)
obs, reward, terminated, truncated, info = env.step(action)
# 评估模式:记录奖励
if self.eval_mode:
self.eval_rewards.append(reward)
# 处理网页文本内容:DOM对象转字符串后转为纯文本
html_str = flatten_dom_to_str(obs['dom_object'])
obs['text_content'] = self.html_text_converter.handle(html_str)
# 处理观察结果序列化(确保可通过管道传输)
# 1. 标记元素截图转为Base64 URL
obs['set_of_marks'] = image_to_png_base64_url(
overlay_som(
obs['screenshot'], obs.get('extra_element_properties', {})
),
add_data_prefix=True,
)
# 2. 网页截图转为Base64 URL
obs['screenshot'] = image_to_png_base64_url(
obs['screenshot'], add_data_prefix=True
)
# 3. 转换为Python原生类型(numpy数组转普通整数)
obs['active_page_index'] = obs['active_page_index'].item()
obs['elapsed_time'] = obs['elapsed_time'].item()
# 将结果发送回代理端
self.browser_side.send((unique_request_id, obs))
except KeyboardInterrupt:
logger.debug('Browser env process interrupted by user.')
try:
env.close()
except Exception:
pass
return
def step(self, action_str: str, timeout: float = 120) -> dict:
"""在浏览器环境中执行动作,返回序列化的观察结果。
参数:
action_str: 待执行的浏览器动作(如点击、输入、导航等)
timeout: 超时时间(默认120秒)
返回:
dict: 浏览器观察结果(含网页文本、截图、元素信息等)
"""
# 生成唯一请求ID(用于匹配响应)
unique_request_id = str(uuid.uuid4())
# 发送动作请求到浏览器进程
self.agent_side.send((unique_request_id, {'action': action_str}))
start_time = time.time()
# 循环等待响应
while True:
# 检查进程是否退出或超时
if should_exit() or time.time() - start_time > timeout:
raise TimeoutError('Browser environment took too long to respond.')
# # 检查是否有响应(非阻塞,超时0.01秒)
if self.agent_side.poll(timeout=0.01):
response_id, obs = self.agent_side.recv()
# 匹配请求ID,返回对应的观察结果
if response_id == unique_request_id:
return dict(obs)
def check_alive(self, timeout: float = 60) -> bool:
"""检查浏览器进程是否存活。
参数:
timeout: 超时时间(默认60秒)
返回:
bool: 存活返回True,否则返回False
"""
# 发送存活检查请求
self.agent_side.send(('IS_ALIVE', None))
# 等待响应(超时时间内)
if self.agent_side.poll(timeout=timeout):
response_id, _ = self.agent_side.recv()
# 响应ID为"ALIVE"表示进程存活
if response_id == 'ALIVE':
return True
logger.debug(f'Browser env is not alive. Response ID: {response_id}')
return False
def close(self) -> None:
"""关闭浏览器环境,释放进程与通信资源。"""
# 若进程已终止,直接返回
if not hasattr(self, 'process') or not self.process.is_alive():
return
try:
# 发送关闭请求到浏览器进程
self.agent_side.send(('SHUTDOWN', None))
# 等待进程终止(最多5秒)
self.process.join(5)
# 若进程仍存活,强制终止
if self.process.is_alive():
logger.error(
'Browser process did not terminate, forcefully terminating...'
)
self.process.terminate()
self.process.join(5)
# 若仍存活,强制杀死进程
if self.process.is_alive():
self.process.kill()
self.process.join(5)
# 关闭通信管道
self.agent_side.close()
self.browser_side.close()
except Exception as e:
logger.error(f'Encountered an error when closing browser env: {e}')
0xFF 参考
https://docs.all-hands.dev/openhands/usage/architecture/runtime
浙公网安备 33010602011771号