Skip to content

一、 执行流程

二、 源码分析

java
/**
 * Copyright (c) 2013-2024 Nikita Koksharov
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.redisson;

import io.netty.util.Timeout;
import org.redisson.api.RFuture;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.pubsub.LockPubSub;

import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Distributed implementation of {@link java.util.concurrent.locks.Lock}
 * Implements reentrant lock.<br>
 * Lock will be removed automatically if client disconnects.
 * <p>
 * Implements <b>non-fair</b> locking, so it doesn't guarantee acquisition order.
 *
 * @author Nikita Koksharov
 *
 */
public class RedissonLock extends RedissonBaseLock {

    // ....
  
    /**
     * Blocks the calling thread until the lock has been acquired.
     * <p>
     * One acquisition attempt is made immediately. If it does not succeed, the
     * thread subscribes via {@code subscribe(threadId)} and then repeatedly
     * retries the acquisition, waiting on the subscription entry's latch
     * between attempts. The subscription is released on every exit path of
     * the retry loop.
     *
     * @param leaseTime     lease time forwarded unchanged to {@code tryAcquire}
     * @param unit          time unit of {@code leaseTime}
     * @param interruptibly when {@code true}, the interruptible wait variants
     *                      are used and an interruption is rethrown at once
     * @throws InterruptedException if a wait is interrupted in interruptible
     *         mode; in non-interruptible mode the first interruption of a timed
     *         wait is absorbed and the wait is repeated once, but an
     *         interruption of that repeated wait still propagates
     */
    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        final long threadId = Thread.currentThread().getId();

        // Step 1: try to take the lock; the result is the TTL of the current holder's lock.
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // Step 2: a null TTL means the lock was acquired.
        if (ttl == null) {
            return;
        }

        // Step 3: the lock is held elsewhere, so subscribe before entering the retry loop.
        CompletableFuture<RedissonLockEntry> subscription = subscribe(threadId);
        pubSub.timeout(subscription);
        RedissonLockEntry entry = interruptibly
                ? commandExecutor.getInterrupted(subscription)
                : commandExecutor.get(subscription);

        try {
            // Step 4: keep retrying until an attempt reports success (null TTL).
            while ((ttl = tryAcquire(-1, leaseTime, unit, threadId)) != null) {
                if (ttl < 0) {
                    // Negative TTL: wait on the latch without a timeout.
                    if (interruptibly) {
                        entry.getLatch().acquire();
                    } else {
                        entry.getLatch().acquireUninterruptibly();
                    }
                    continue;
                }

                // Non-negative TTL: wait on the latch for at most that many milliseconds.
                try {
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    // Non-interruptible mode: repeat the timed wait once.
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            }
        } finally {
            // Step 5: always drop the subscription.
            unsubscribe(entry, threadId);
        }
    }
    
   //...
}
    1. 尝试获取锁,获取锁的失效时间
    • 1.1 失效时间 > 0, 尝试获取锁,当前使用Lua 脚本
    • 1.2 失效时间 < 0, 使用 watch_dog 超时时间(默认30秒)作为锁的过期时间, 并由 watch_dog 每隔10秒(超时时间的1/3)自动续期, 每次将过期时间重置为30秒
    2. 加锁失败之后, 不断尝试进行加锁,