跳到主要内容

分布式锁

ZHub 提供了分布式锁功能,支持多客户端环境下的资源互斥访问控制。


功能特性

  • 分布式互斥: 多个客户端实例之间互斥访问共享资源
  • 自动超时: 支持锁超时自动释放,避免死锁
  • 非阻塞获取: 支持尝试获取锁,立即返回结果
  • 自动释放: 支持手动释放锁,确保资源及时释放

基础使用

1. 阻塞式获取锁

// 获取锁,会阻塞直到获取成功或超时
Lock lock = zhub.lock("resource-key", 30); // 30秒超时
try {
if (lock.success()) {
System.out.println("获取到锁,开始处理业务");
// 执行业务逻辑
Thread.sleep(5000); // 模拟业务处理
} else {
System.out.println("获取锁失败");
}
} finally {
lock.unLock(); // 释放锁
}

2. 非阻塞式获取锁

// 尝试获取锁,立即返回结果
Lock lock = zhub.tryLock("resource-key", 30);
if (lock.success()) {
try {
System.out.println("获取到锁,开始处理业务");
// 执行业务逻辑
} finally {
lock.unLock(); // 释放锁
}
} else {
System.out.println("未获取到锁,资源被其他客户端占用");
}

高级用法

1. 锁超时处理

public void processWithTimeout(String resourceKey, int timeoutSeconds) {
Lock lock = zhub.tryLock(resourceKey, timeoutSeconds);
if (!lock.success()) {
System.out.println("资源被占用,请稍后重试");
return;
}

try {
// 业务处理
processResource(resourceKey);
} finally {
lock.unLock();
}
}

2. 锁重试机制

public boolean processWithRetry(String resourceKey, int maxRetries) {
for (int i = 0; i < maxRetries; i++) {
Lock lock = zhub.tryLock(resourceKey, 10);
if (lock.success()) {
try {
// 业务处理
processResource(resourceKey);
return true;
} finally {
lock.unLock();
}
}

// 等待一段时间后重试
try {
Thread.sleep(1000 * (i + 1)); // 递增等待时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
return false;
}

3. 锁监控和统计

public class LockMonitor {
private final ZHubClient zhub;
private final Map<String, LockInfo> lockStats = new ConcurrentHashMap<>();

public void acquireLock(String key, int timeout) {
long startTime = System.currentTimeMillis();
Lock lock = zhub.lock(key, timeout);

if (lock.success()) {
long acquireTime = System.currentTimeMillis() - startTime;
lockStats.put(key, new LockInfo(key, acquireTime, System.currentTimeMillis()));
System.out.println("锁获取成功,耗时: " + acquireTime + "ms");
}
}

public void releaseLock(String key) {
LockInfo info = lockStats.remove(key);
if (info != null) {
long holdTime = System.currentTimeMillis() - info.acquireTime;
System.out.println("锁持有时间: " + holdTime + "ms");
}
}

static class LockInfo {
String key;
long acquireTime;
long acquireDuration;

LockInfo(String key, long acquireDuration, long acquireTime) {
this.key = key;
this.acquireDuration = acquireDuration;
this.acquireTime = acquireTime;
}
}
}

实现原理

服务端实现

ZHub 服务端锁管理结构:

// 锁结构
type Lock struct {
Key string
UUID string
Duration int
Conn *ZConn
CreateTime time.Time
}

// 锁管理
type ZBus struct {
locks map[string][]*Lock // 锁映射表
// ... 其他字段
}

客户端实现

ZHub 客户端锁实现结构:

public class Lock {
protected String name; // 锁名称
protected String uuid; // 锁唯一标识
protected int duration; // 锁持续时间
protected boolean success; // 获取是否成功
protected ZHubClient hubClient; // 客户端引用
}

最佳实践

1. 锁命名规范

// 推荐:使用有意义的锁名称
String lockKey = "user:update:" + userId;
String lockKey = "order:process:" + orderId;
String lockKey = "cache:refresh:" + cacheType;

// 避免:使用无意义的锁名称
String lockKey = "lock1";
String lockKey = "temp";

2. 超时时间设置

// 根据业务处理时间设置合理的超时时间
int shortTaskTimeout = 5; // 短任务:5秒
int mediumTaskTimeout = 30; // 中等任务:30秒
int longTaskTimeout = 300; // 长任务:5分钟

// 避免设置过短或过长的超时时间
int tooShort = 1; // 太短,可能导致获取失败
int tooLong = 3600; // 太长,可能导致资源长时间占用

3. 异常处理

public void safeProcess(String resourceKey) {
Lock lock = null;
try {
lock = zhub.lock(resourceKey, 30);
if (lock.success()) {
// 业务处理
processResource(resourceKey);
}
} catch (Exception e) {
logger.error("处理资源时发生异常", e);
} finally {
if (lock != null && lock.success()) {
try {
lock.unLock();
} catch (Exception e) {
logger.error("释放锁时发生异常", e);
}
}
}
}

4. 性能优化

// 使用 tryLock 避免长时间阻塞
public boolean tryProcess(String resourceKey) {
Lock lock = zhub.tryLock(resourceKey, 5);
if (!lock.success()) {
return false; // 快速失败
}

try {
processResource(resourceKey);
return true;
} finally {
lock.unLock();
}
}

注意事项

1. 死锁预防

  • 避免嵌套锁的使用
  • 设置合理的锁超时时间
  • 确保锁的释放逻辑正确

2. 性能考虑

  • 锁的粒度要适中,不要过细或过粗
  • 避免长时间持有锁
  • 考虑使用非阻塞的 tryLock

3. 错误处理

  • 始终在 finally 块中释放锁
  • 处理锁获取失败的情况
  • 记录锁的使用情况用于监控

监控和调试

1. 锁状态查询

# 通过管理接口查看锁状态
curl http://127.0.0.1:711/_/info

2. 日志监控

// 在客户端代码中添加锁使用日志
logger.info("尝试获取锁: " + resourceKey);
Lock lock = zhub.lock(resourceKey, 30);
if (lock.success()) {
logger.info("锁获取成功: " + resourceKey);
} else {
logger.warn("锁获取失败: " + resourceKey);
}

3. 性能指标

  • 锁获取成功率
  • 锁平均持有时间
  • 锁等待时间
  • 锁竞争频率