创建连接
支持的框架
普通Java、SpringBoot、Redkale、Golang
- 普通Java项目
- SpringBoot项目
- Redkale项目
- Golang项目
private ZHubClient zhub;
@Before
public void init() {
// 参数:地址、组名、appid(唯一)、token
zhub = new ZHubClient("127.0.0.1:1216", "test-hub", "DEV-LOCAL-001", "token-12345");
}
说明:
appid必须唯一,RPC 消息回复使用此标识groupid协同消费组,同组内只有一个消费者处理消息
1. 依赖
<dependency>
<groupId>dev.zhub</groupId>
<artifactId>zhub-client-spring</artifactId>
<version>0.1.0424.dev</version>
</dependency>
2. 配置 application.yml
zhub:
addr: 127.0.0.1:1216
groupid: spring-boot-app
appid: spring-boot-001 # 必须唯一
auth: token-12345
3. 配置类
@Configuration
public class ZHubConfig {
@Value("${zhub.addr}") private String addr;
@Value("${zhub.groupid}") private String groupId;
@Value("${zhub.appid}") private String appId;
@Value("${zhub.auth}") private String auth;
@Bean
public ZHubClient zhubClient() {
return new ZHubClient(addr, groupId, appId, auth);
}
}
4. 使用
@Service
public class UserService {
@Autowired
private ZHubClient zhub;
public void publishEvent(String userId, String event) {
zhub.publish("user-event", "{\"userId\":\"" + userId + "\",\"event\":\"" + event + "\"}");
}
@PostConstruct
public void init() {
zhub.subscribe("user-notification", message -> {
System.out.println("通知: " + message);
});
}
}
配置文件:
# source.properties
redkale.cluster.zhub[hub].addr = 127.0.0.1:1216
redkale.cluster.zhub[hub].auth = user@pwd123
redkale.cluster.zhub[hub].groupid = test-hub
使用:
@Resource(name = "hub")
protected ZHubClient zhub;
// Golang 连接示例
// TODO: 待补充
使用示例
- 基础订阅
- 多主题订阅
- 发布消息
// 单个主题
zhub.subscribe("user-login", message -> System.out.println("登录: " + message));
// 类型化消息(使用TypeToken)
zhub.subscribe("user-profile", new TypeToken<UserProfile>(){}, profile ->
System.out.println("用户: " + profile.getUsername()));
// 逗号分隔
zhub.subscribe("user-login,user-logout,user-register", message ->
System.out.println("用户操作: " + message));
// 字符串消息
zhub.publish("user-login", "用户ID: 12345");
// 类型化消息
UserProfile profile = new UserProfile("12345", "张三", "zhangsan@example.com");
zhub.publish("user-profile", profile);
性能优化
异步处理
// 避免阻塞消息处理
zhub.subscribe("topic-abc", message -> {
CompletableFuture.runAsync(() -> {
processMessage(message);
});
});
错误处理
zhub.subscribe("topic-abc", message -> {
try {
processMessage(message);
} catch (Exception e) {
logger.error("消息处理失败", e);
}
});