白筱汐

想都是问题,做都是答案

0%

分布式锁的实现

介绍

分布式锁是一种用于协调分布式系统中多个进程或线程之间访问共享资源的机制。在分布式环境中,由于多个进程或服务可能并发地访问共享资源,如果没有一种机制来同步它们的访问,可能会引发数据不一致或竞态条件等问题。

分布式锁可以保证在同一时间只有一个进程或服务能够获得对共享资源的访问权,从而避免了并发访问带来的问题。

本文将介绍三种分布式锁的实现方式:

  1. mysql悲观锁
  2. mysql乐观锁
  3. redis分布式锁

Mysql悲观锁

当我们要对数据库中的一条数据进行修改的时候,为了避免同时被其他人修改,最好的办法就是直接对该数据进行加锁以防止并发。这种借助数据库锁机制在修改数据之前锁定,再修改的方式被称为悲观并发控制(PCC)。

这也就是这种锁被称为 “悲观” 的原因,它会以悲观的态度去对待并发的数据控制,认为共享数据被并发修改的可能性较高,在修改之前先去加锁。在实现效率上,处理加锁的过程会让数据库产生额外的开销,降低并发度,同时,还可能会有死锁的可能。

悲观锁的实现,依赖于数据库提供的锁机制(行级锁、表级锁)。它的工作流程可以总结如下:

  • 对数据操作之前,尝试获取锁
  • 获取锁成功,对数据进行修改、提交事务,最后释放锁
  • 获取锁失败,则锁正在被占用,等待或抛出异常

在 MySQL 中使用悲观锁,必须关闭 MySQL 的自动提交(MySQL 默认使用自动提交模式,即执行 INSERT、UPDATE、DELETE 时,结果自动生效)

1
2
3
4
-- 关闭自动提交
SET autocommit = 0;
-- 校验自动提交是否关闭
SHOW VARIABLES LIKE 'autocommit';

MySQL 提供的悲观锁实现方式是:SELECT … FOR UPDATE

1
2
-- 通过悲观锁语法锁住 id 为 1 的记录
SELECT * FROM goods WHERE id = 1 FOR UPDATE;

SELECT … FOR UPDATE 只允许一个事务获取到锁,其他的事务只能等待或者超时
但是,在使用悲观锁的时候,一定需要注意使用索引,否则行锁将会上升为表锁,引起系统问题

go伪代码事例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 开启事务
tx := db.Begin()

// 通过悲观锁语法锁住 id 为 1 的记录
tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where(&Goods{ID:1}).First(&user)

// 业务操作
...

// 遇到错误时回滚事务
tx.Rollback()

// 提交事务
tx.Commit()

Mysql乐观锁

乐观锁其实是对 CAS(compare-and-swap)的实现:在做修改时,检查当前的环境是否与预定义的一致,如果一致则可以提交;否则,重试或抛异常。

可以想办法加入一个不会重复修改的值数据来作为版本号,即 version 参数,version 只能增加,不能减少。乐观锁在每次执行数据修改时,都需要去比对 version,如果一致,则更新数据的同时,也要更新 version。

1
2
3
4
5
6
7
8
-- 给 worker 表添加 version 列
ALTER TABLE `worker` ADD COLUMN `version` BIGINT(20) NOT NULL DEFAULT '0' COMMENT '乐观锁版本号';
-- 读取数据,记录 version 的值
SELECT * FROM worker WHERE id = 1;
-- 比对 version 是否符合预期,更新数据和 version
UPDATE worker SET salary = 2000, version = version + 1 WHERE id = 1 AND version = 0;
-- 再次读取数据,校验是否符合预期
SELECT * FROM worker WHERE id = 1;

乐观锁同样可以使用时间戳来实现,一样的道理.

go伪代码事例

1
2
3
4
5
6
7
8
9
10
11
for {
// 强制更新 Stock、Version字段,并时Version + 1
result := tx.Model(&model.Inventory{}).Select("Stocks", "Version").Where("goods = ? and version = ?", goodInfo.GoodsId, inv.Version).Updates(model.Inventory{Stocks: inv.Stocks, Version: inv.Version + 1})
if result.RowAffected == 0 {
// 更新失败重试
log.Printf()("更新失败")
} else {
// 退出循环
break
}
}

Redis分布式锁

Redis分布式锁是一种使用Redis实现的分布式协调机制,用于在分布式系统中控制多个节点对共享资源的并发访问。它通过利用Redis的原子性操作和特定的指令来实现同步和互斥,确保在任意时刻只有一个进程或线程能够持有锁.

  1. 锁的获取:在尝试获取锁时,可以使用Redis的SETNX命令(SET if Not eXists)设置一个特定的键值对,如果键不存在则成功获取锁。这一步是一个原子操作,确保只有一个客户端能够成功创建该键值对,从而获得锁.
  2. 锁的过期时间:为了避免死锁和进程崩溃导致的锁无法释放,需要为锁设置合理的过期时间。可以使用Redis的EXPIRE命令为键设置一个过期时间,在超过该时间后,Redis会自动删除该键,实现锁的自动释放。
  3. 锁的释放:当持有锁的进程完成对共享资源的操作后,需要显式地释放锁。可以通过使用Redis的DEL命令删除键来释放锁。释放锁的过程也需要是一个原子操作,以确保在同一时间只有一个进程持有锁。
  4. 避免误删其他进程的锁:为了避免出现误删其他进程的锁的情况,可以为每个客户端生成一个唯一的标识符作为锁的值,并且在释放锁时检查该标识符是否匹配,以确保只有获得锁的客户端才能释放锁。
  5. 防止锁过期时间过短导致的问题:为了避免锁的过期时间过短导致其他进程过早地获取了锁,可以使用轮询机制在锁即将过期时进行续约。具体做法是在获取锁后,为该锁设置一个适当的续约时间,确保在共享资源操作完成前不会过期。

需要强调的是,Redis分布式锁虽然在很多场景下是有效的,但并不是完全解决所有分布式环境中的同步问题的银弹。

使用插件redsync可以帮助我们快速实现基于go语言的Redis分布式锁。

以下是官方案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v9"
goredislib "github.com/redis/go-redis/v9"
)

func main() {
// 连接redis
client := goredislib.NewClient(&goredislib.Options{
Addr: "localhost:6379",
})
pool := goredis.NewPool(client) // 新建连接池

// 创建一个redsync实例,用来获取互斥锁
rs := redsync.New(pool)

// 创建命名互斥锁
mutexname := "my-global-mutex"
mutex := rs.NewMutex(mutexname)

//获取给定互斥对象的锁。在此操作成功后,没有其他人可以获得相同的锁(相同的互斥锁名称),直到我们解锁它
if err := mutex.Lock(); err != nil {
panic(err)
}

// 你的代码逻辑

// 释放锁,以便其他进程或线程可以获得锁
if ok, err := mutex.Unlock(); !ok || err != nil {
panic("unlock failed")
}
}

我们已经完成了基于Redis分布式锁的基本操作,但是仍有一些问题需要处理。比如:redis锁没有设置过期时间,也没有错误处理,另外在分布式系统中,可能有多个应用基于类似商品id做为分布式锁的名称,会出现锁重名现象。因为,我们可以使用uuid + goodsId 来生产唯一标识。

优化之后的代码大概如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
client := goredislib.NewClient(&goredislib.Options{
Addr: "localhost:6379",
})
pool := goredis.NewPool(client) // 新建连接池

// 创建一个redsync实例,用来获取互斥锁
rs := redsync.New(pool)

// 创建命名互斥锁
mutexname := "<uuid + goodsId>"
mutex := rs.NewMutex(mutexname)
// 尝试获取锁,最多等待 2 秒
if err := mutex.LockEx(time.Second * 2); err != nil {
fmt.Println("未能获得锁:", err)
return
}
defer func() {
// 通过 recover 捕获异常,确保释放锁
if r := recover(); r != nil {
fmt.Println("发生异常:", r)
}
mutex.Unlock()
}()