跳到主要内容

RPC 远程调用

rpc-flow.png

使用场景

  • 微服务间通信:用户服务调用订单服务
  • 数据查询服务:统一的数据查询接口
  • 业务逻辑封装:封装复杂业务逻辑供其他模块调用
  • 第三方服务集成:调用外部 API

基础示例

重要:RPC 通讯要求每个客户端使用唯一的 appid,消息回复使用此标识。

基础类型 RPC

服务端

// 订阅 RPC 服务
zhub.rpcSubscribe("rpc-b", IType.STRING, r -> {
String str = r.getValue();
System.out.println("接收到: " + str);
return r.render("处理结果: " + str);
});

客户端

// 调用 RPC 服务
RpcResult<String> result = zhub.rpc("rpc-b", "hello rpc", IType.STRING);
System.out.println("结果: " + result.getResult());

类型化 RPC

定义类型

// 请求类型
public class UserQueryRequest {
private String userId;
private String queryType;
// getter/setter...
}

// 响应类型
public class UserInfoResponse {
private String userId;
private String username;
private String email;
private long lastLoginTime;
// getter/setter...
}

服务端

zhub.rpcSubscribe("user-get-info", new TypeToken<UserQueryRequest>(){}, request -> {
String userId = request.getUserId();
String queryType = request.getQueryType();
return request.render(new UserInfoResponse(userId, "张三", "zhangsan@example.com", System.currentTimeMillis()));
});

客户端

UserQueryRequest request = new UserQueryRequest("12345", "basic");
RpcResult<UserInfoResponse> result = zhub.rpc("user-get-info", request, new TypeToken<UserInfoResponse>(){});

if (result.isSuccess()) {
UserInfoResponse userInfo = result.getResult();
System.out.println("用户: " + userInfo.getUsername());
} else {
System.out.println("调用失败: " + result.getError());
}

多客户端示例

服务端

ZHubClient serviceProvider = new ZHubClient("127.0.0.1:1216", "service-group", "service-provider-001", "token-12345");

// 单个 RPC 服务
serviceProvider.rpcSubscribe("user-get-info", IType.STRING, request -> {
return request.render("用户信息: " + request.getValue());
});

// 多个 RPC 服务(使用TypeToken)
serviceProvider.rpcSubscribe("user-get-info,user-get-profile", new TypeToken<UserQueryRequest>(){}, request -> {
switch (request.getQueryType()) {
case "getInfo":
return request.render(new UserInfoResponse(request.getUserId(), "张三", "zhangsan@example.com", System.currentTimeMillis()));
case "getProfile":
return request.render(new UserProfileResponse(request.getUserId(), "张三", 25, "北京"));
default:
return request.render(new UserInfoResponse(request.getUserId(), "未知用户", "", 0));
}
});

客户端A

ZHubClient clientA = new ZHubClient("127.0.0.1:1216", "client-group", "client-a-001", "token-12345");
RpcResult<String> result = clientA.rpc("user-get-info", "user123", IType.STRING);
System.out.println("结果: " + result.getResult());

客户端B

ZHubClient clientB = new ZHubClient("127.0.0.1:1216", "client-group", "client-b-002", "token-12345");
UserQueryRequest request = new UserQueryRequest("user456", "basic");
RpcResult<UserInfoResponse> result = clientB.rpc("user-get-info", request, new TypeToken<UserInfoResponse>(){});
if (result.isSuccess()) {
System.out.println("用户: " + result.getResult().getUsername());
}

类型化 RPC

使用 TypeToken 构建具体类型

服务端

// 定义请求和响应类型
public class OrderRequest {
private String orderId;
private String queryType;
// getter/setter...
}

public class OrderResponse {
private String orderId;
private String status;
private BigDecimal amount;
// getter/setter...
}

// 订阅类型化 RPC
zhub.rpcSubscribe("order-get-details", new TypeToken<OrderRequest>(){}, request -> {
OrderResponse response = new OrderResponse();
response.setOrderId(request.getOrderId());
response.setStatus("已支付");
response.setAmount(new BigDecimal("99.99"));
return request.render(response);
});

客户端

// 调用类型化 RPC
OrderRequest request = new OrderRequest("ORDER-123", "details");
RpcResult<OrderResponse> result = zhub.rpc("order-get-details", request, new TypeToken<OrderResponse>(){});

if (result.isSuccess()) {
OrderResponse order = result.getResult();
System.out.println("订单状态: " + order.getStatus());
System.out.println("订单金额: " + order.getAmount());
}