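nats rpc: support for reactor-style calls
I briefly covered how the nats rpc framework handles requests in an earlier post. In practice, the reactor model is now one of the more important approaches, especially in the Spring Boot ecosystem.
Reference definition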
@RpcClient(
    serviceEndpoint = "tenantauthservicev4",
    serviceName = "dalong",
    servicePrefix = "global"
)
public interface MyRpc {
    Mono<DemoMessage> echoDemo(DemoMessage demoMessage, Headers headers);
}
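Usage
The style is similar to OpenFeign: register the client as a bean, then build a proxy for the annotated interface.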
@Bean
public RpcServiceReactorClient rpcServiceReactorClient(ObjectMapper objectMapper, Connection connection) {
    return RpcServiceReactorClient.builder()
        .objectMapper(objectMapper)
        .connection(connection)
        .build();
}
@Bean
public MyRpc myRpc(RpcServiceReactorClient client) {
    return client.target(MyRpc.class);
}
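To make the OpenFeign-like `target(...)` call less opaque, here is a minimal sketch of how such a factory could build a JDK dynamic proxy from an annotated interface. It is not the starter's code: `DemoRpcClient`, `EchoApi`, and `TargetSketch` are hypothetical names made up for this illustration, and the handler only reports the annotation values instead of sending anything over NATS.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class TargetSketch {

    // Hypothetical stand-in for the starter's @RpcClient annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface DemoRpcClient {
        String serviceEndpoint();
        String serviceName();
        String servicePrefix();
    }

    // Hypothetical API interface; a real one would return Mono<T>.
    @DemoRpcClient(serviceEndpoint = "tenantauthservicev4", serviceName = "dalong", servicePrefix = "global")
    interface EchoApi {
        String describe(String payload);
    }

    // Builds a dynamic proxy for an annotated interface.
    @SuppressWarnings("unchecked")
    static <T> T target(Class<T> api) {
        DemoRpcClient meta = api.getAnnotation(DemoRpcClient.class);
        if (meta == null) {
            throw new IllegalArgumentException(api.getName() + " is not annotated with @DemoRpcClient");
        }
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods inherited from Object must not be treated as RPC calls.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "equals": return proxy == args[0];
                    case "hashCode": return System.identityHashCode(proxy);
                    default: return "proxy for " + api.getSimpleName();
                }
            }
            // A real client would send a request here; this sketch only
            // reports the routing data read from the annotation.
            return "prefix=" + meta.servicePrefix()
                + " name=" + meta.serviceName()
                + " endpoint=" + meta.serviceEndpoint()
                + " method=" + method.getName()
                + " arg=" + args[0];
        };
        return (T) Proxy.newProxyInstance(api.getClassLoader(), new Class<?>[] {api}, handler);
    }

    public static void main(String[] args) {
        EchoApi api = target(EchoApi.class);
        System.out.println(api.describe("hello"));
    }
}
```

The point of the pattern is that callers only ever see the interface; everything about routing lives in the annotation and the handler.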
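Invocation
// msg is a DemoMessage built elsewhere
// Blocking call: waits for the response on the calling thread
var res = myRpc.echoDemo(msg, null).block();
// Non-blocking call: the callback runs when the response arrives
long start = System.currentTimeMillis();
myRpc.echoDemo(msg, null)
    .subscribe(demoMessage -> {
        System.out.println("Received response: " + demoMessage);
        long end = System.currentTimeMillis();
        System.out.println("Time taken: " + (end - start) + " ms");
    });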
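RpcServiceReactorClient internals
It is simply a dynamic proxy. Only the core part is shown below; see the GitHub source for the full implementation. Internally it uses: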
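Type returnType = method.getGenericReturnType();
// Check for void first: void is not a ParameterizedType, so it would
// otherwise be rejected by the Mono<T> check below
if (returnType == Void.TYPE) {
    // void return type: no need to wait for a response, just send the message
    return null;
}
if (!(returnType instanceof ParameterizedType)) {
    throw new IllegalStateException("Return type must be parameterized (Mono<T>)");
}
ParameterizedType parameterizedType = (ParameterizedType) returnType;
// The T in Mono<T> is the type the response body is deserialized into
Type innerType = parameterizedType.getActualTypeArguments()[0];
JavaType javaType = objectMapper.getTypeFactory()
    .constructType(innerType);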
CompletableFuture<Message> msg;
if (headers != null) {
    msg = nats.requestWithTimeout(subject, headers, req);
} else {
    msg = nats.requestWithTimeout(subject, req);
}
Mono<Object> monoResult = Mono.fromFuture(msg)
    // Subscribe on the bounded elastic scheduler to avoid blocking the event loop
    .subscribeOn(Schedulers.boundedElastic())
    .flatMap(data -> {
        try {
            // Deserialize the response body into the T of Mono<T>
            Object value = objectMapper.readValue(data.getData(), javaType);
            return Mono.just(value);
        } catch (IOException e) {
            return Mono.error(e);
        }
    });
return monoResult;
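The return-type handling above can be tried in isolation. The following is a small sketch, not the starter's code: `SampleApi`, `payloadType`, and `ReturnTypeSketch` are hypothetical names, and `CompletableFuture` stands in for `Mono` so that the example needs only the JDK. It shows how the `T` of a generic wrapper is recovered through reflection, and why a void method has to be checked before the parameterized-type check.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ReturnTypeSketch {

    // Hypothetical API; CompletableFuture<T> plays the role of Mono<T>.
    interface SampleApi {
        CompletableFuture<List<String>> listNames();
        CompletableFuture<String> echo(String in);
        void fireAndForget(String in);
    }

    // Returns the type a response should be deserialized into,
    // or null when the method is void and no response is expected.
    static Type payloadType(Method method) {
        Type returnType = method.getGenericReturnType();
        // void.class is a plain Class, not a ParameterizedType,
        // so it must be handled before the instanceof check.
        if (returnType == Void.TYPE) {
            return null;
        }
        if (!(returnType instanceof ParameterizedType)) {
            throw new IllegalStateException(
                "Return type of " + method.getName() + " must be parameterized");
        }
        // For Wrapper<T> the single type argument is T, nested generics included.
        return ((ParameterizedType) returnType).getActualTypeArguments()[0];
    }

    public static void main(String[] args) throws Exception {
        Method list = SampleApi.class.getDeclaredMethod("listNames");
        Method echo = SampleApi.class.getDeclaredMethod("echo", String.class);
        Method fire = SampleApi.class.getDeclaredMethod("fireAndForget", String.class);

        System.out.println(payloadType(list)); // a ParameterizedType for List<String>
        System.out.println(payloadType(echo)); // the String class
        System.out.println(payloadType(fire)); // null, nothing to decode
    }
}
```

Because the resolved type keeps nested generics such as `List<String>`, it can be handed to a type-aware deserializer, which is what the `constructType(innerType)` call in the excerpt does with Jackson.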
Notes
The above is only a brief walkthrough; see the repository below for the complete source.
References
https://github.com/rongfengliang/nats-rpc-springboot-starter