首頁(yè)技術(shù)文章正文

Web前端培訓(xùn):深入理解Redis分布式鎖

更新時(shí)間:2022-11-11 來(lái)源:黑馬程序員 瀏覽量:

IT培訓(xùn)班

  相信很多同學(xué)都聽(tīng)說(shuō)過(guò)分布式鎖,但也僅僅停留在概念的理解上,這篇文章會(huì)從分布式鎖的應(yīng)用場(chǎng)景講起,從實(shí)現(xiàn)的角度上深度剖析redis如何實(shí)現(xiàn)分布式鎖。

  一、超賣(mài)問(wèn)題

  我們先來(lái)看超賣(mài)的概念:

  當(dāng)寶貝庫(kù)存接近0時(shí),如果多個(gè)買(mǎi)家同時(shí)付款購(gòu)買(mǎi)此寶貝,或者店鋪后臺(tái)在架數(shù)量大于倉(cāng)庫(kù)實(shí)際數(shù)量,將會(huì)出現(xiàn)超賣(mài)現(xiàn)象。超賣(mài)現(xiàn)象本質(zhì)上就是買(mǎi)到了比倉(cāng)庫(kù)中數(shù)量更多的寶貝。

  > 本文主要解決超賣(mài)問(wèn)題的第一種,同時(shí)多人購(gòu)買(mǎi)寶貝時(shí),造成超賣(mài)。

  測(cè)試代碼

  那么超賣(mài)問(wèn)題是如何產(chǎn)生的呢?我們準(zhǔn)備一段代碼進(jìn)行測(cè)試:

@Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 第一種實(shí)現(xiàn),進(jìn)程內(nèi)就存在線程安全問(wèn)題
     * 可以只啟動(dòng)一個(gè)進(jìn)程測(cè)試
     */
    @RequestMapping("/deduct_stock1")
    public void deductStock1(){

        String stock = stringRedisTemplate.opsForValue().get("stock");
        int stockNum = Integer.parseInt(stock);
        if(stockNum > 0){
            //設(shè)置庫(kù)存減1
            int realStock = stockNum - 1;
            stringRedisTemplate.opsForValue().set("stock",realStock + "");
            System.out.println("設(shè)置庫(kù)存" + realStock);
        }else{
            System.out.println("庫(kù)存不足");
        }

    }

  這段代碼中,使用redis先獲取庫(kù)存數(shù)量(當(dāng)然實(shí)際場(chǎng)景中不會(huì)只保存一個(gè)全局庫(kù)存數(shù),應(yīng)該根據(jù)每一個(gè)商品單元(sku)保存一份庫(kù)存數(shù))。

 String stock = stringRedisTemplate.opsForValue().get("stock");
 int stockNum = Integer.parseInt(stock);

  接下來(lái),判斷庫(kù)存數(shù)是否大于0:

  - 如果大于0,將庫(kù)存數(shù)減一,通過(guò)set命令,寫(xiě)回redis

  >這里沒(méi)有使用redis的decrement命令,因?yàn)榇嗣钤趓edis單線程模型下是線程安全的,而為了可以模擬線程不安全的情況將其拆成三步操作。

  //設(shè)置庫(kù)存減1
  int realStock = stockNum - 1;
  stringRedisTemplate.opsForValue().set("stock",realStock + "");
  System.out.println("設(shè)置庫(kù)存" + realStock);

  - 如果小于等于0,提示庫(kù)存不足

  JMeter測(cè)試

  通過(guò)JMeter進(jìn)行并發(fā)測(cè)試,看下會(huì)不會(huì)出現(xiàn)超賣(mài)的問(wèn)題:

      1.啟動(dòng)tomcat

  這種情況下,只需要啟動(dòng)一個(gè)tomcat就會(huì)出現(xiàn)超賣(mài)。我們先啟動(dòng)一個(gè)tomcat在8080端口上。

1668135340736_1.jpg

  2.下載JMeter

  Apache JMeter是Apache組織開(kāi)發(fā)的基于Java的壓力測(cè)試工具。

  從官網(wǎng)上下載即可:

  [https://jmeter.apache.org/download_jmeter.cgi](https://links.jianshu.com/go?to=https%3A%2F%2Fjmeter.apache.org%2Fdownload_jmeter.cgi)

  下載完之后解壓,運(yùn)行bin目錄下的jmeter.bat,顯示如下界面:

1668135364378_2.jpg

  如果嫌字體太小,可以選擇放大:

1668135376942_3.jpg

  3.配置JMeter

  在Test Plan上點(diǎn)擊右鍵,創(chuàng)建`線程組(Thread Group)`

1668135398609_4.jpg

  配置一下具體參數(shù):

1668135413660_5.jpg

  - `Number of Threads` 同時(shí)并發(fā)線程數(shù)

  - `Ramp-Up Period(in-seconds)` 代表隔多長(zhǎng)時(shí)間執(zhí)行,0代表同時(shí)并發(fā)。假設(shè)線程數(shù)為100, 估計(jì)的點(diǎn)擊率為每秒10次, 那么估計(jì)的理想ramp-up period 就是 100/10 = 10 秒

  - `Loop Count` 循環(huán)次數(shù)

  > 這里給出500是為了直接測(cè)試并發(fā)500搶,看看能不能正好把500個(gè)貨物搶完。

  添加Http請(qǐng)求:

1668135439553_6.jpg

  添加請(qǐng)求URL:

1668135453144_7.jpg

  添加聚合結(jié)果,用來(lái)顯示整體的運(yùn)行情況:

1668135465198_8.jpg

  到此為止JMeter的配置結(jié)束。

  4.設(shè)置庫(kù)存量

  啟動(dòng)redis-server,使用redis-client連接:

1668135483864_9.jpg

  把庫(kù)存數(shù)設(shè)置為500。

  5.開(kāi)始測(cè)試

  點(diǎn)擊運(yùn)行按鈕,啟動(dòng)測(cè)試:

1668135500642_10.jpg

  首先我們看到聚合報(bào)告里輸出的結(jié)果:

1668135538911_11.jpg

  錯(cuò)誤率0%,樣本數(shù)500,證明500個(gè)請(qǐng)求都已經(jīng)執(zhí)行,但是發(fā)現(xiàn)控制臺(tái)輸出如下:

<img src="assets/12.png" alt="img" style="zoom:67%;" />

    很顯然,一份商品都被賣(mài)了多次,這顯然是不合理的。

  原因分析

  現(xiàn)在我們只啟動(dòng)了一個(gè)tomcat,在單jvm進(jìn)程的情況下,tomcat會(huì)使用線程池接收請(qǐng)求:

1668135598645_13.jpg

  而由于每個(gè)線程可能同時(shí)獲取到庫(kù)存量,所以庫(kù)存量在兩個(gè)線程中顯示的都是500,然后兩個(gè)線程就繼續(xù)進(jìn)行扣減庫(kù)存操作,得出499寫(xiě)回redis中,在這個(gè)過(guò)程中,顯然存在線程安全的問(wèn)題。同一個(gè)商品被賣(mài)出了2份,超賣(mài)問(wèn)題就出現(xiàn)了。

  二、加鎖優(yōu)化

  synchronized鎖

  要保證單jvm中線程安全,最簡(jiǎn)單直接的方式就是添加synchronized關(guān)鍵字,那么這樣行不行呢,我們來(lái)做一個(gè)測(cè)試:

  /**
     * 第二種實(shí)現(xiàn),使用synchronized加鎖
     * 可以只啟動(dòng)一個(gè)進(jìn)程測(cè)試
     */
    @RequestMapping("/deduct_stock2")
    public void deductStock2(){

        synchronized (this){
            String stock = stringRedisTemplate.opsForValue().get("stock");
            int stockNum = Integer.parseInt(stock);
            if(stockNum > 0){
                //設(shè)置庫(kù)存減1
                int realStock = stockNum - 1;
                stringRedisTemplate.opsForValue().set("stock",realStock + "");
                System.out.println("設(shè)置庫(kù)存" + realStock);
            }else{
                System.out.println("庫(kù)存不足");
            }
        }

    }

  在進(jìn)行扣減庫(kù)存前,先通過(guò)synchronized關(guān)鍵字,對(duì)資源加鎖,這樣就只有一個(gè)線程能進(jìn)入到扣減庫(kù)存的代碼塊中。來(lái)測(cè)試一下:

  重置庫(kù)存

set stock 500

  修改接口地址

1668135664179_14.jpg

  測(cè)試

<img src="assets/15.png" alt="img" style="zoom:67%;" />

  可以看到,庫(kù)存被扣減為0,并且沒(méi)有出現(xiàn)超賣(mài)的情況(設(shè)置了500庫(kù)存,并且500個(gè)人搶,正好搶完)。

  但是這種方案顯然是不行的,在生產(chǎn)環(huán)境上如果部署多個(gè)tomcat實(shí)例,那么就會(huì)出現(xiàn)如下情況:

1668135702539_16.jpg

  多個(gè)進(jìn)程無(wú)法共享jvm內(nèi)存中的鎖,所以會(huì)出現(xiàn)多把鎖,這種情況下也會(huì)出現(xiàn)超賣(mài)問(wèn)題。

  三、分布式鎖的實(shí)現(xiàn)

  多Tomcat實(shí)例下的超賣(mài)演示

  接下來(lái)我們演示一下如何在多個(gè)Tomcat情況下,演示超賣(mài)的問(wèn)題:

       1.啟動(dòng)兩個(gè)tomcat服務(wù)

  在IDEA中配置兩個(gè)spring boot的啟動(dòng)項(xiàng),使用vm參數(shù)指定不同的端口號(hào)

  ```undefined

  -Dserver.port=8080

  ```

1668135752623_17.jpg

1668135765972_18.jpg

  2.配置nginx

  編寫(xiě)~/nginx_redis/conf/nginx.conf如下:

user  nginx;
worker_processes  1;

error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;


events {
    worker_connections  1024;
}


http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';


upstream redislock{
     server 192.168.226.1:8080 weight=1;
     server 192.168.226.1:8081 weight=1;
}

 server {
        listen       80;
        server_name  localhost;
    location /{
           root html;
           proxy_pass http://redislock;
    }
}

    access_log  /var/log/nginx/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    keepalive_timeout  65;

    #gzip  on;

    include /etc/nginx/conf.d/*.conf;
}

  > 192.168.226.1這是我宿主機(jī)的IP

  準(zhǔn)備一個(gè)虛擬機(jī)(也可以使用windows下的nginx),使用docker啟動(dòng)nginx:

docker pull nginx
docker run -di -p 10085:80 --name nginx-redis-hc   -v ~/nginx_redis/html:/usr/share/nginx/html   -v ~/nginx_redis/conf/nginx.conf:/etc/nginx/nginx.conf   -v ~/nginx_redis/logs:/var/log/nginx   nginx

  在宿主機(jī)下使用`虛擬機(jī)的IP地址:10085`訪問(wèn)nginx,如果出現(xiàn)如下頁(yè)面就代表成功:

1668135849014_19.jpg

  3.測(cè)試

  修改接口地址為nginx:

1668135863401_20.jpg

  運(yùn)行查看兩個(gè)tomcat的控制臺(tái):

  - tomcat1

<img src="assets/21.png" alt="img" style="zoom:67%;" />

  - tomcat2

<img src="assets/22.png" alt="img" style="zoom:67%;" />

  
       沒(méi)有將庫(kù)存清空,證明存在超賣(mài)問(wèn)題。

  手動(dòng)實(shí)現(xiàn)分布式鎖

  使用redis手動(dòng)實(shí)現(xiàn)分布式鎖,需要用到命令`setnx`。先來(lái)介紹一下setnx:

  SETNX key value[]

  > 可用版本: >= 1.0.0

  >

  > 時(shí)間復(fù)雜度: O(1)

  只在鍵 `key` 不存在的情況下, 將鍵 `key` 的值設(shè)置為 `value` 。

  若鍵 `key` 已經(jīng)存在, 則 `SETNX` 命令不做任何動(dòng)作。

  `SETNX` 是『SET if Not eXists』(如果不存在,則 SET)的簡(jiǎn)寫(xiě)。

  返回值

  命令在設(shè)置成功時(shí)返回 `1` , 設(shè)置失敗時(shí)返回 `0` 。

  代碼示例

redis> EXISTS job                # job 不存在
# job 不存在
(integer) 0

redis> SETNX job "programmer"    # job 設(shè)置成功
(integer) 1

redis> SETNX job "code-farmer"   # 嘗試覆蓋 job ,失敗
(integer) 0

redis> GET job                   # 沒(méi)有被覆蓋

  使用redis構(gòu)建分布式鎖流程如下:

1668135991872_23.jpg

  image.png

  - 線程1申請(qǐng)鎖(`setnx`),拿到了鎖。

  - 線程2申請(qǐng)鎖,由于線程1已經(jīng)擁有了鎖,`setnx`返回0失敗,這一步用戶操作會(huì)失敗。

  - 線程1執(zhí)行扣減庫(kù)存操作并釋放鎖。

  - 線程2再次申請(qǐng)鎖,獲取到鎖并執(zhí)行扣減庫(kù)存,然后釋放鎖。

  > 注意這里線程沒(méi)有拿到鎖,如果不嘗試while(true)重新獲取鎖,這個(gè)操作就直接失敗了。

  代碼實(shí)現(xiàn)

/**
     * 第三種實(shí)現(xiàn),使用redis中的setIfAbsent(setnx命令)實(shí)現(xiàn)分布式鎖
     */
    @RequestMapping("/deduct_stock3")
    public void deductStock3(){

        //在獲取到鎖的時(shí)候,給鎖分配一個(gè)id
        String opId = UUID.randomUUID().toString();
        Boolean stockLock = stringRedisTemplate
                .opsForValue().setIfAbsent("stockLock", opId, Duration.ofSeconds(30);

        if(stockLock){

            try{
                String stock = stringRedisTemplate.opsForValue().get("stock");
                int stockNum = Integer.parseInt(stock);
                if(stockNum > 0){
                    //設(shè)置庫(kù)存減1
                    int realStock = stockNum - 1;
                    stringRedisTemplate.opsForValue().set("stock",realStock + "");
                    System.out.println("設(shè)置庫(kù)存" + realStock);
                }else{
                    System.out.println("庫(kù)存不足");
                }

            }catch(Exception e){
                e.printStackTrace();
            }finally {
                if(opId.equals(stringRedisTemplate
                        .opsForValue().get("stockLock"))){
                    stringRedisTemplate.delete("stockLock");
                }
            }

        }

    }

  測(cè)試略過(guò),這里有幾個(gè)知識(shí)點(diǎn)需要說(shuō)明

  setIfAbsent設(shè)置超時(shí)

  如果setIfAbsent不設(shè)置超時(shí)時(shí)間,假設(shè)線程執(zhí)行業(yè)務(wù)代碼時(shí)間時(shí)死鎖或者其他原因?qū)е麻L(zhǎng)時(shí)間不釋放,那么會(huì)影響其他線程獲取到鎖,這個(gè)時(shí)候整體業(yè)務(wù)就會(huì)出現(xiàn)不可用。

Boolean stockLock = stringRedisTemplate
                .opsForValue().setIfAbsent("stockLock", opId, Duration.ofSeconds(30);

  設(shè)置超時(shí)時(shí)間為30秒,該時(shí)間一般大于業(yè)務(wù)執(zhí)行的最大時(shí)間。

  每次獲取到鎖,設(shè)置唯一ID

  考慮這樣的場(chǎng)景

1668136077380_24.jpg

  - 線程1獲取鎖扣減庫(kù)存,但是由于操作不當(dāng),長(zhǎng)時(shí)間卡住,這樣會(huì)觸發(fā)超時(shí)時(shí)間鎖被釋放。

  - 線程2獲取到鎖,扣減庫(kù)存。

  - 線程1的代碼拋出異常,執(zhí)行finally釋放鎖,但是釋放的是進(jìn)程B的鎖。

  解決方案就是在**加鎖前生成UUID**,釋放的時(shí)候校驗(yàn)UUID是否正確,如果不正確,說(shuō)明加鎖線程不是當(dāng)前線程。

  使用Redisson實(shí)現(xiàn)分布式鎖

  setnx雖好,但是實(shí)現(xiàn)起來(lái)畢竟太過(guò)麻煩,一不小心就可能陷入并發(fā)編程的陷阱中,那么有沒(méi)有更加簡(jiǎn)單的實(shí)現(xiàn)方式呢?答案就是`redisson`。

  > Redisson是架設(shè)在[Redis](https://links.jianshu.com/go?to=http%3A%2F%2Fwww.oschina.net%2Fp%2Fredis)基礎(chǔ)上的一個(gè)Java駐內(nèi)存數(shù)據(jù)網(wǎng)格(In-Memory Data Grid)?!綶Redis官方推薦](https://links.jianshu.com/go?to=http%3A%2F%2Fwww.redis.io%2Fclients)】

  > Redisson在基于NIO的[Netty](https://links.jianshu.com/go?to=http%3A%2F%2Fnetty.io%2F)框架上,充分的利用了Redis鍵值數(shù)據(jù)庫(kù)提供的一系列優(yōu)勢(shì),在Java實(shí)用工具包中常用接口的基礎(chǔ)上,為使用者提供了一系列具有分布式特性的常用工具類。使得原本作為協(xié)調(diào)單機(jī)多線程并發(fā)程序的工具包獲得了協(xié)調(diào)分布式多機(jī)多線程并發(fā)系統(tǒng)的能力,大大降低了設(shè)計(jì)和研發(fā)大規(guī)模分布式系統(tǒng)的難度。同時(shí)結(jié)合各富特色的分布式服務(wù),更進(jìn)一步簡(jiǎn)化了分布式環(huán)境中程序相互之間的協(xié)作。

  總而言之,`redisson`提供了一系列較為完善的工具類,其中就包含了分布式鎖。用`redisson`實(shí)現(xiàn)分布式鎖的流程極為簡(jiǎn)單。

  引入依賴

       <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.14.0</version>
        </dependency>

  創(chuàng)建Redisson實(shí)例

  @Bean
    public RedissonClient redisson(){
        // 1. Create config object
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
//        config.useClusterServers()
//                // use "rediss://" for SSL connection
//                .addNodeAddress("redis://127.0.0.1:7181");
        return Redisson.create(config);
    }

  編寫(xiě)分布式鎖代碼

  @Autowired
    private RedissonClient redissonClient;
    /**
     * 第四種實(shí)現(xiàn),使用redisson實(shí)現(xiàn)
     */
    @RequestMapping("/deduct_stock4")
    public void deductStock4(){

        RLock lock = redissonClient.getLock("redisson:stockLock");
        try{
            //加鎖
            lock.lock();
            String stock = stringRedisTemplate.opsForValue().get("stock");
            int stockNum = Integer.parseInt(stock);
            if(stockNum > 0){
                //設(shè)置庫(kù)存減1
                int realStock = stockNum - 1;
                stringRedisTemplate.opsForValue().set("stock",realStock + "");
                System.out.println("設(shè)置庫(kù)存" + realStock);
            }else{
                System.out.println("庫(kù)存不足");
            }

        }catch(Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }

    }

  其中加鎖代碼基本與進(jìn)程內(nèi)加鎖一致,就不再詳細(xì)解讀,讀者自行實(shí)踐即可。

  Redisson分布式鎖原理

  `Redisson分布式鎖`的主要原理非常簡(jiǎn)單,利用了lua腳本的原子性。

  在分布式環(huán)境下產(chǎn)生并發(fā)問(wèn)題的主要原因是三個(gè)操作并不是原子操作:

  - 獲取庫(kù)存

  - 扣減庫(kù)存

  - 寫(xiě)入庫(kù)存

  那么如果我們把三個(gè)操作合并為一個(gè)操作,在默認(rèn)單線程的Redis中運(yùn)行,是不會(huì)產(chǎn)生并發(fā)問(wèn)題的。源碼如下:

 <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

  這一段源碼中,`redisson`利用了lua腳本的原子性,校驗(yàn)key是否存在,如果不存在就創(chuàng)建key并利用incrby加一操作(這步操作主要是為了實(shí)現(xiàn)可重入性)。`redisson`實(shí)現(xiàn)的分布式鎖具備如下特性:

  - 鎖失效

  - 鎖續(xù)租

  > 執(zhí)行時(shí)間長(zhǎng)的鎖快要到期時(shí)會(huì)自動(dòng)續(xù)租

  - 可重入

  - 操作原子性

  鎖續(xù)租原理

  使用如下代碼進(jìn)行測(cè)試鎖續(xù)租的情況

@Test
void test() throws InterruptedException {
    RLock testlock1111 = redissonClient.getLock("testlock");
    testlock1111.lock();
    try{
        Thread thread = new Thread(() -> {
            while(true){
                Long testlock = redisTemplate.getExpire("testlock");
                System.out.println(LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME) + " ttl:" + testlock);
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        thread.start();
        thread.join();
    }finally {
        if(testlock1111.isHeldByCurrentThread()){
            testlock1111.unlock();
        }
    }

}

  我們會(huì)發(fā)現(xiàn),每隔10秒會(huì)自動(dòng)續(xù)租一次,保證鎖不被釋放。

<img src="assets/25.png" alt="image-20220721153251169" style="zoom:67%;" />

  那么這種續(xù)租的行為是如何實(shí)現(xiàn)的呢?考慮這種情況:如果線程加鎖之后,進(jìn)程宕機(jī),線程無(wú)法執(zhí)行解鎖代碼,那么這個(gè)鎖就無(wú)法得到釋放(注意,不是加鎖線程不允許亂解鎖),為了避免這種情況的發(fā)生,鎖都會(huì)設(shè)置一個(gè)過(guò)期時(shí)間。比如使用**lock**無(wú)參命令會(huì)默認(rèn)設(shè)置30秒的過(guò)期時(shí)間。那么30秒之后呢?如果線程還在工作,自動(dòng)釋放依然會(huì)產(chǎn)生線程安全的問(wèn)題。所以Redisson使用了watch dog看門(mén)狗機(jī)制來(lái)實(shí)現(xiàn)自動(dòng)續(xù)租。

  核心代碼及注釋:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    //lock()無(wú)參方法leaseTime為-1,所以進(jìn)else分值
    if (leaseTime > 0) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        //通過(guò)lua腳本加鎖
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // 異步方法,等到加鎖成功會(huì)回調(diào),第一次加鎖ttlRemaining為空,leaseTime為-1
        if (ttlRemaining == null) {
            if (leaseTime > 0) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                //設(shè)置延時(shí)任務(wù)
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

  接下來(lái)分析scheduleExpirationRenewal的過(guò)程:

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    //創(chuàng)建一個(gè)延遲任務(wù)
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            //執(zhí)行l(wèi)ua腳本進(jìn)行續(xù)租
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                //執(zhí)行l(wèi)ua續(xù)租,鎖還在就續(xù)租,鎖不在返回false就取消續(xù)租的行為
                if (res) {
                    // reschedule itself
                    renewExpiration();
                } else {
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);  //internalLockLeaseTime默認(rèn)值30,所以每10秒會(huì)續(xù)租一次,續(xù)租到30秒
   
    ee.setTimeout(task);
}

  其中,renewExpirationAsync執(zhí)行的lua腳本如下:

protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

  判斷hash中是否存在鎖,如果存在就設(shè)置過(guò)期時(shí)間為30秒,返回1。如果不存在就返回0。

  總結(jié)

  本文介紹了超賣(mài)問(wèn)題產(chǎn)生的原因:操作不具備原子性,同時(shí)提出了集中解決思路。

  - `synchronized鎖`,無(wú)法保證多實(shí)例下的線程安全

  - `setnx`手動(dòng)實(shí)現(xiàn),坑很多、代碼較為復(fù)雜

  - `redisson`實(shí)現(xiàn),能夠保證多實(shí)例下線程安全,代碼簡(jiǎn)單可靠

分享到:
在線咨詢 我要報(bào)名
和我們?cè)诰€交談!