一、 执行流程
二、 源码分析
java
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import io.netty.util.Timeout;
import org.redisson.api.RFuture;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.pubsub.LockPubSub;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
* Distributed implementation of {@link java.util.concurrent.locks.Lock}
* Implements reentrant lock.<br>
* Lock will be removed automatically if client disconnects.
* <p>
* Implements a <b>non-fair</b> locking so doesn't guarantees an acquire order.
*
* @author Nikita Koksharov
*
*/
public class RedissonLock extends RedissonBaseLock {
// ....
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 第一步: 尝试获取锁,锁的失效时间
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// 第二步: 锁的失效时间空,上锁成功。//
if (ttl == null) {
return;
}
// 第三步: 锁的失效时间存在,订阅(通过信号量)
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
pubSub.timeout(future);
RedissonLockEntry entry;
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
}
try {
// 第四步: 失效时间大于等于0时,不断重试加锁
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
if (ttl == null) {
break;
}
if (ttl >= 0) {
try {
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 第五步: 关闭订阅
unsubscribe(entry, threadId);
}
// get(lockAsync(leaseTime, unit));
}
//...
}
- 尝试获取锁,获取
锁的失效时间
。
- 1.1 失效时间 > 0, 尝试获取锁,当前使用
Lua 脚本
。 - 1.2 失效时间 < 0, 使用
watch_dog
失效时间(默认30秒) 进行过期续期,过期后每次续期10秒
。
- 尝试获取锁,获取
- 加锁失败之后, 不断尝试进行加锁,