nats rpc 支持reactor 模式调用

以前简单说过nats rpc 框架的处理,实际上基于reactor 模式已经是当前比较重要的玩法了,尤其是spring boot 周边的

参考定义

@RpcClient(
    serviceEndpoint = "tenantauthservicev4",
    serviceName = "dalong",
    servicePrefix = "global"
)
public  interface MyRpc {
    Mono<DemoMessage> echoDemo(DemoMessage demoMessage, Headers headers);
}

使用

类似openfeign 模式

@Bean

public RpcServiceReactorClient rpcServiceReactorClient(ObjectMapper objectMapper, Connection connection) {
    return RpcServiceReactorClient.builder()
            .objectMapper(objectMapper)
            .connection(connection)
            .build();
}

@Bean
public MyRpc myRpc(RpcServiceReactorClient client) {
    return client.target(MyRpc.class);
}

调用

var res = myRpc.echoDemo(msg, null).block();

myRpc.echoDemo(msg, null)
    .subscribe(demoMessage -> {
        System.out.println("Received response: " + demoMessage);
        long end = System.currentTimeMillis();
        System.out.println("Time taken: " + (end - start) + " ms");
    });

RpcServiceReactorClient 内部处理

就是动态代理, 以下只提供核心部分,实际可以查看github 源码,内部使用了

Type returnType = method.getGenericReturnType();
if (!(returnType instanceof ParameterizedType)) {
    throw new IllegalStateException("Return type must be parameterized (Mono<T>)");
}
ParameterizedType parameterizedType = (ParameterizedType) returnType;
Type innerType = parameterizedType.getActualTypeArguments()[0];
JavaType javaType = objectMapper.getTypeFactory()
        .constructType(innerType);
if (returnType == Void.TYPE) {
    // 为void类型,不需要等待响应,直接发送消息
    return null;
}
CompletableFuture<Message> msg;

if (headers != null) {
    msg = nats.requestWithTimeout(subject, headers, req);
} else {
    msg = nats.requestWithTimeout(subject, req);
}
Mono<Object> monoResult = Mono.fromFuture(msg)
        // 放在弹性线程池上执行,以避免阻塞事件循环
        .subscribeOn(Schedulers.boundedElastic())
        .flatMap(data -> {
            Object value = null;
            try {
                value = objectMapper.readValue(data.getData(), javaType);
                return Mono.just(value);
            } catch (IOException e) {
               return Mono.error(e);
            }
        });
return  monoResult;

说明

以上只是一个简单说明,实际可以查看完整源码

参考资料

https://github.com/rongfengliang/nats-rpc-springboot-starter

posted on 2026-02-02 08:00  荣锋亮  阅读(4)  评论(0)    收藏  举报

导航