发布订阅
基础概念
- 发布者:发送消息到指定主题
- 订阅者:订阅主题并处理消息
- 主题:消息分类标识,支持正则表达式匹配
- 消费组(GroupID):协同消费机制,同组内只有一个消费者处理消息
重要说明
发布订阅:使用具体主题名称
zhub.subscribe("user-login", message -> { ... });
zhub.publish("user-login", "用户登录");
消费组机制:同组内消息只被一个消费者处理
// 消费者A和B属于同一组,消息只会被其中一个处理
ZHubClient consumerA = new ZHubClient("127.0.0.1:1216", "order-group", "app-a", "token");
ZHubClient consumerB = new ZHubClient("127.0.0.1:1216", "order-group", "app-b", "token");
// 消费者C属于不同组,会收到所有消息
ZHubClient consumerC = new ZHubClient("127.0.0.1:1216", "notification-group", "app-c", "token");
权限配置:支持正则表达式
users:
- id: 1
username: "user-service"
reads: ["user.*"] # 匹配所有 user.* 主题
writes: ["user.*"]
协同消费
消费组机制
工作原理:同组内只有一个消费者处理消息,不同组独立消费
// 负载均衡:同组内竞争消费
ZHubClient service1 = new ZHubClient("127.0.0.1:1216", "order-group", "app-1", "token");
ZHubClient service2 = new ZHubClient("127.0.0.1:1216", "order-group", "app-2", "token");
// 只有其中一个处理消息
// 消息广播:不同组都收到消息
ZHubClient notify1 = new ZHubClient("127.0.0.1:1216", "notify-group-1", "notify-1", "token");
ZHubClient notify2 = new ZHubClient("127.0.0.1:1216", "notify-group-2", "notify-2", "token");
// 两个都会收到消息
应用场景:
- 负载均衡:同组内多个服务竞争处理
- 消息广播:不同组都接收消息
- 数据去重:同组内确保只处理一次
基础使用
1. 基础发布订阅
字符串消息:
// 订阅
zhub.subscribe("user-login", message -> {
System.out.println("用户登录: " + message);
});
// 发布
zhub.publish("user-login", "用户ID: 12345");
类型化消息:
// 定义消息类型
public class UserLoginEvent {
private String userId;
private String username;
private long loginTime;
// getter/setter...
}
// 订阅类型化消息(使用TypeToken)
zhub.subscribe("user-login", new TypeToken<UserLoginEvent>(){}, event -> {
System.out.println("用户: " + event.getUsername());
});
// 发布类型化消息
UserLoginEvent event = new UserLoginEvent("12345", "张三", System.currentTimeMillis());
zhub.publish("user-login", event);
基础类型:
// 整数消息(使用IType)
zhub.subscribe("user-count", IType.INT, count -> {
System.out.println("用户数: " + count);
});
zhub.publish("user-count", 100);
// Map 消息(使用IType)
zhub.subscribe("user-info", IType.MAP, info -> {
System.out.println("用户信息: " + info);
});
Map<String, String> userInfo = Map.of("userId", "12345", "username", "张三");
zhub.publish("user-info", userInfo);
2. 多主题订阅
分别订阅:
zhub.subscribe("user-profile", message -> {
System.out.println("用户资料: " + message);
});
zhub.subscribe("user-login", message -> {
System.out.println("用户登录: " + message);
});
逗号分隔订阅:
// 同时订阅多个主题
zhub.subscribe("user-profile,user-login,user-logout", message -> {
System.out.println("用户消息: " + message);
});
// 类型化消息(使用TypeToken)
zhub.subscribe("order-create,order-payment", new TypeToken<OrderEvent>(){}, event -> {
System.out.println("订单事件: " + event.getOrderId());
});
3. 广播消息
// 广播给所有客户端
zhub.broadcast("topic-abc", "hello!");
高级功能
1. 延时消息
// 延时5分钟后发送消息
zhub.delay("reminder-email", "发送提醒邮件", 5 * 60 * 1000);
2. 消息过滤
zhub.subscribe("user-notification", message -> {
if (message.contains("VIP")) {
System.out.println("VIP用户消息: " + message);
}
});
3. 批量处理
private final List<String> messageBatch = new ArrayList<>();
zhub.subscribe("data-sync", message -> {
synchronized (messageBatch) {
messageBatch.add(message);
if (messageBatch.size() >= 100) {
processBatch(new ArrayList<>(messageBatch));
messageBatch.clear();
}
}
});
最佳实践
Topic 命名:user-profile-update
、order-payment-success
异常处理:
zhub.subscribe("critical-task", message -> {
try {
processTask(message);
} catch (Exception e) {
logger.error("处理失败", e);
}
});
消息特性
- 顺序保证:单个主题内消息严格按发送顺序处理
- 非持久化:默认不持久化,重启后消息丢失
- 高性能:内存处理,支持高并发消息传递
- 容量限制:服务端通道容量500条,满时消息会被丢弃
注意事项
- 避免创建大量对象
- 注意消息积压:服务端通道容量500条,满时消息会被丢弃