LangGraph4j Study Series (6): Parallel Workflows

Continuing from the previous post, this one looks at how to implement a parallel workflow.

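[Figure: the parallel workflow graph, in which node-1 fans out to node-2 and node-3, and both lead to END]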

The graph in the figure above is easy to build in code, as shown below.

Core code

public static StateGraph<AgentState> getParallelGraph() throws GraphStateException {
    return new StateGraph<>(AgentState::new)
            .addNode("node-1", node_async(new Node1Action()))
            .addNode("node-2", node_async(new Node2Action()))
            .addNode("node-3", node_async(new Node3Action()))
            .addEdge(START, "node-1")
            .addEdge("node-1", "node-2")
            .addEdge("node-1", "node-3")
            .addEdge("node-2", GraphDefinition.END)
            .addEdge("node-3", GraphDefinition.END);
}

 

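Performance issue

In the graph, node-2 and node-3 look as if they run in parallel, but do they really? To find out, add a sleep to the apply() method of node-2 and node-3: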
public class Node2Action implements NodeAction<AgentState> {
    @Override
    public Map<String, Object> apply(AgentState state) throws Exception {
        System.out.println("current Node: node-2");
        Thread.sleep(1000);
        return Map.of("myData", "node2-my-value",
                "node2Key", "node2-value");
    }
}

  

public class Node3Action implements NodeAction<AgentState> {
    @Override
    public Map<String, Object> apply(AgentState state) throws Exception {
        System.out.println("current Node: node-3");
        Thread.sleep(1000);
        return Map.of("myData", "node3-my-value",
                "node3Key", "node3-value");
    }
}

Then, in node-1, record a start timestamp:

public class Node1Action implements NodeAction<AgentState> {
    
    @Override
    public Map<String, Object> apply(AgentState state) throws Exception {
        System.out.println("current Node: node-1");
        Thread.sleep(1000);
        return Map.of(
                "myData", "node1-my-value",
                "node1Key", "node1-value",
                // record the start time
                "start", System.currentTimeMillis());
    }
    
}

  

getParallelGraph().compile()
        .invoke(Map.of("test", "test-init-value"))
        .ifPresent(c -> {
            long start = (long) c.data().getOrDefault("start", 0L);
            System.out.println(c.data());
            long end = System.currentTimeMillis();
            System.out.println((end - start) + "ms");
        });

 

Output

current Node: node-1
current Node: node-2
current Node: node-3
{node1Key=node1-value, start=1770719927373, test=test-init-value, node2Key=node2-value, node3Key=node3-value, myData=node3-my-value}
2017ms

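Line 5 of the output shows that the run took just over 2 seconds, so node-2 and node-3 were clearly executed one after the other.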
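One plausible explanation, stated here as an assumption rather than something the post confirms: wrapping a blocking action in a future does not by itself move the work to another thread. The plain-JDK sketch below uses a hypothetical helper, completedOnCallerThread, that runs the task on the calling thread and only then returns an already completed CompletableFuture. The names slowStep and runTwoBranches are also made up for illustration, and nothing here calls LangGraph4j.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class SequentialAsyncSketch {

    // Hypothetical helper: returns a CompletableFuture, but the task has
    // already run on the calling thread by the time the future is returned.
    static <T> CompletableFuture<T> completedOnCallerThread(Supplier<T> task) {
        CompletableFuture<T> future = new CompletableFuture<>();
        try {
            future.complete(task.get());
        } catch (RuntimeException e) {
            future.completeExceptionally(e);
        }
        return future;
    }

    // Stand-in for a node action that blocks for the given time.
    static String slowStep(String name, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name + "-value";
    }

    // Starts two "async" branches, waits for both, and returns the elapsed milliseconds.
    static long runTwoBranches(long millisPerStep) {
        long start = System.currentTimeMillis();
        CompletableFuture<String> branch2 =
                completedOnCallerThread(() -> slowStep("node2", millisPerStep));
        CompletableFuture<String> branch3 =
                completedOnCallerThread(() -> slowStep("node3", millisPerStep));
        CompletableFuture.allOf(branch2, branch3).join();
        return System.currentTimeMillis() - start;
    }

    public static void main(String[] args) {
        // Each branch blocks the caller in turn, so the total is roughly the sum.
        System.out.println(runTwoBranches(1000) + "ms");
    }
}
```

Because the second branch cannot start until the first has finished, the elapsed time is about the sum of the two sleeps, which matches the shape of the measurement above.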
 

Speeding up with multiple threads

LangGraph4j lets you specify a thread pool manually to get truly concurrent execution.
StateGraph<AgentState> graphNoThreadPool = getParallelGraph();

ExecutorService executorService = Executors.newFixedThreadPool(2);
RunnableConfig rc = RunnableConfig.builder()
        // run node-2 and node-3 in parallel after node-1 (using the thread pool)
        .addParallelNodeExecutor("node-1", executorService)
        .build();
graphNoThreadPool.compile()
        .invoke(Map.of("test", "test-init-value"), rc) // pass the custom RunnableConfig when invoking
        .ifPresent(c -> {
            long start = (long) c.data().getOrDefault("start", 0L);
            System.out.println(c.data());
            long end = System.currentTimeMillis();
            System.out.println((end - start) + "ms");
            // remember to shut down the thread pool
            executorService.shutdown();
        });

  

Output
current Node: node-1
current Node: node-2
current Node: node-3
{node1Key=node1-value, start=1770722528938, test=test-init-value, node2Key=node2-value, node3Key=node3-value, myData=node3-my-value}
1015ms


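That is clearly much faster: about 1 second instead of 2. On JDK 25 (virtual threads have been a final feature since JDK 21) you can also use virtual threads: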

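// assumed declaration (not in the original snippet): a fresh graph for this run
StateGraph<AgentState> graphNoThreadPool2 = getParallelGraph();

ExecutorService virtualThreadPerTaskExecutor = Executors.newVirtualThreadPerTaskExecutor();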
RunnableConfig rc2 = RunnableConfig.builder()
        // run node-2 and node-3 in parallel after node-1 (using a virtual-thread executor)
        .addParallelNodeExecutor("node-1", virtualThreadPerTaskExecutor)
        .build();
graphNoThreadPool2.compile()
        .invoke(Map.of("test", "test-init-value"), rc2)
        .ifPresent(c -> {
            long start = (long) c.data().getOrDefault("start", 0L);
            System.out.println(c.data());
            long end = System.currentTimeMillis();
            System.out.println((end - start) + "ms");
        });
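The effect of handing the two branches to an executor can be reproduced with the JDK alone. The sketch below is not LangGraph4j code: slowStep and runTwoBranches are hypothetical names, and CompletableFuture.supplyAsync stands in for whatever the library does internally. It also shuts the pool down in a finally block, so the executor is released even if the run throws.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FanOutJoinSketch {

    // Stand-in for a node action that blocks for the given time.
    static String slowStep(String name, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name + "-value";
    }

    // Submits both branches to the executor, waits for both,
    // and returns the elapsed milliseconds.
    static long runTwoBranches(ExecutorService executor, long millisPerStep) {
        long start = System.currentTimeMillis();
        CompletableFuture<String> branch2 =
                CompletableFuture.supplyAsync(() -> slowStep("node2", millisPerStep), executor);
        CompletableFuture<String> branch3 =
                CompletableFuture.supplyAsync(() -> slowStep("node3", millisPerStep), executor);
        CompletableFuture.allOf(branch2, branch3).join();
        return System.currentTimeMillis() - start;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // With two worker threads the branches overlap, so the total
            // is close to one step rather than the sum of both.
            System.out.println(runTwoBranches(executor, 1000) + "ms");
        } finally {
            // Shut down even if the run fails, then wait briefly for the workers.
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
```

Any ExecutorService can be passed to runTwoBranches. On JDK 21 or later that includes the one returned by Executors.newVirtualThreadPerTaskExecutor().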

  

Source code for this article: langgraph4j-study/src/main/java/org/bsc/langgraph4j/agent/_07_parallel at main · yjmyzz/langgraph4j-study · GitHub

posted @ 2026-03-01 18:53  菩提树下的杨过