跳到主要内容

发布订阅

基础概念

  • 发布者:发送消息到指定主题
  • 订阅者:订阅主题并处理消息
  • 主题:消息分类标识,支持正则表达式匹配
  • 消费组(GroupID):协同消费机制,同组内只有一个消费者处理消息

重要说明

发布订阅:使用具体主题名称

zhub.subscribe("user-login", message -> { ... });
zhub.publish("user-login", "用户登录");

消费组机制:同组内消息只被一个消费者处理

// 消费者A和B属于同一组,消息只会被其中一个处理
ZHubClient consumerA = new ZHubClient("127.0.0.1:1216", "order-group", "app-a", "token");
ZHubClient consumerB = new ZHubClient("127.0.0.1:1216", "order-group", "app-b", "token");

// 消费者C属于不同组,会收到所有消息
ZHubClient consumerC = new ZHubClient("127.0.0.1:1216", "notification-group", "app-c", "token");

权限配置:支持正则表达式

users:
- id: 1
username: "user-service"
reads: ["user.*"] # 匹配所有 user.* 主题
writes: ["user.*"]

协同消费

消费组机制

工作原理:同组内只有一个消费者处理消息,不同组独立消费

// 负载均衡:同组内竞争消费
ZHubClient service1 = new ZHubClient("127.0.0.1:1216", "order-group", "app-1", "token");
ZHubClient service2 = new ZHubClient("127.0.0.1:1216", "order-group", "app-2", "token");
// 只有其中一个处理消息

// 消息广播:不同组都收到消息
ZHubClient notify1 = new ZHubClient("127.0.0.1:1216", "notify-group-1", "notify-1", "token");
ZHubClient notify2 = new ZHubClient("127.0.0.1:1216", "notify-group-2", "notify-2", "token");
// 两个都会收到消息

应用场景

  • 负载均衡:同组内多个服务竞争处理
  • 消息广播:不同组都接收消息
  • 数据去重:同组内确保只处理一次

基础使用

1. 基础发布订阅

字符串消息

// 订阅
zhub.subscribe("user-login", message -> {
System.out.println("用户登录: " + message);
});

// 发布
zhub.publish("user-login", "用户ID: 12345");

类型化消息

// 定义消息类型
public class UserLoginEvent {
private String userId;
private String username;
private long loginTime;
// getter/setter...
}

// 订阅类型化消息(使用TypeToken)
zhub.subscribe("user-login", new TypeToken<UserLoginEvent>(){}, event -> {
System.out.println("用户: " + event.getUsername());
});

// 发布类型化消息
UserLoginEvent event = new UserLoginEvent("12345", "张三", System.currentTimeMillis());
zhub.publish("user-login", event);

基础类型

// 整数消息(使用IType)
zhub.subscribe("user-count", IType.INT, count -> {
System.out.println("用户数: " + count);
});
zhub.publish("user-count", 100);

// Map 消息(使用IType)
zhub.subscribe("user-info", IType.MAP, info -> {
System.out.println("用户信息: " + info);
});
Map<String, String> userInfo = Map.of("userId", "12345", "username", "张三");
zhub.publish("user-info", userInfo);

2. 多主题订阅

分别订阅

zhub.subscribe("user-profile", message -> {
System.out.println("用户资料: " + message);
});
zhub.subscribe("user-login", message -> {
System.out.println("用户登录: " + message);
});

逗号分隔订阅

// 同时订阅多个主题
zhub.subscribe("user-profile,user-login,user-logout", message -> {
System.out.println("用户消息: " + message);
});

// 类型化消息(使用TypeToken)
zhub.subscribe("order-create,order-payment", new TypeToken<OrderEvent>(){}, event -> {
System.out.println("订单事件: " + event.getOrderId());
});

3. 广播消息

// 广播给所有客户端
zhub.broadcast("topic-abc", "hello!");

高级功能

1. 延时消息

// 延时5分钟后发送消息
zhub.delay("reminder-email", "发送提醒邮件", 5 * 60 * 1000);

2. 消息过滤

zhub.subscribe("user-notification", message -> {
if (message.contains("VIP")) {
System.out.println("VIP用户消息: " + message);
}
});

3. 批量处理

private final List<String> messageBatch = new ArrayList<>();

zhub.subscribe("data-sync", message -> {
synchronized (messageBatch) {
messageBatch.add(message);
if (messageBatch.size() >= 100) {
processBatch(new ArrayList<>(messageBatch));
messageBatch.clear();
}
}
});

最佳实践

Topic 命名user-profile-updateorder-payment-success

异常处理

zhub.subscribe("critical-task", message -> {
try {
processTask(message);
} catch (Exception e) {
logger.error("处理失败", e);
}
});

消息特性

  • 顺序保证:单个主题内消息严格按发送顺序处理
  • 非持久化:默认不持久化,重启后消息丢失
  • 高性能:内存处理,支持高并发消息传递
  • 容量限制:服务端通道容量500条,满时消息会被丢弃

注意事项

  • 避免创建大量对象
  • 注意消息积压:服务端通道容量500条,满时消息会被丢弃