一、概述
1.1 背景介绍
在我担任某互联网金融平台SRE期间,曾遇到过一次严重的线上事故:凌晨3点,监控系统疯狂告警,数据库活跃连接数从平时的200飙升到2000,大量请求超时。紧急排查后发现,一个批量更新任务与在线交易产生了死锁,导致数据库连接被占满。
这次事故持续了40分钟,影响了上万名用户的交易。事后复盘发现,问题根源是开发团队对MySQL事务和锁机制理解不足,写出了容易产生死锁的代码。
从那以后,我花了大量时间研究MySQL的事务和锁机制,并总结出一套完整的排查和预防方法。本文将系统性地讲解MySQL事务的ACID特性、锁的工作原理,以及死锁的排查和解决方案。
1.2 技术特点
MySQL InnoDB存储引擎的事务和锁机制具有以下特点:
ACID事务特性
Atomicity(原子性):事务是不可分割的工作单位
Consistency(一致性):事务执行前后数据保持一致
Isolation(隔离性):并发事务之间相互隔离
Durability(持久性):事务提交后数据永久保存
多粒度锁机制
行锁:锁定单行记录,并发度高
间隙锁:锁定索引间隙,防止幻读
表锁:锁定整张表,开销小但并发度低
意向锁:表级锁,用于协调行锁和表锁
MVCC多版本并发控制
读不阻塞写,写不阻塞读
通过undo log实现一致性读
支持多种隔离级别
1.3 适用场景
| 场景类型 | 隔离级别 | 锁策略 | 典型应用 |
|---|---|---|---|
| 高并发读写 | READ COMMITTED | 最小化锁范围 | 电商订单 |
| 金融交易 | REPEATABLE READ | 行锁+间隙锁 | 转账、支付 |
| 报表统计 | READ COMMITTED | 快照读 | 数据分析 |
| 库存扣减 | REPEATABLE READ | SELECT FOR UPDATE | 秒杀系统 |
| 批量更新 | READ COMMITTED | 分批提交 | 数据迁移 |
1.4 环境要求
| 组件 | 版本要求 | 说明 |
|---|---|---|
| MySQL | 8.0.35+ / 8.4 LTS | 本文基于8.0.35版本 |
| 操作系统 | Rocky Linux 9 / Ubuntu 24.04 | 推荐Rocky Linux 9 |
| 存储引擎 | InnoDB | 必须使用InnoDB |
| 内存 | 16GB+ | 足够的缓冲池空间 |
关键配置要求:
-- 查看InnoDB相关配置 SHOWVARIABLESLIKE'innodb%'; -- 关键配置 innodb_buffer_pool_size = 8G -- 缓冲池大小 innodb_lock_wait_timeout = 50 -- 锁等待超时(秒) innodb_deadlock_detect = ON -- 开启死锁检测 innodb_print_all_deadlocks = ON -- 打印所有死锁信息 transaction_isolation = REPEATABLE-READ -- 默认隔离级别
二、详细步骤
2.1 准备工作
2.1.1 ACID特性深入理解
原子性(Atomicity)
事务中的所有操作要么全部成功,要么全部失败回滚。MySQL通过undo log实现原子性。
-- 原子性示例:转账操作 STARTTRANSACTION; -- 操作1:扣减转出账户余额 UPDATEaccountsSETbalance = balance -1000WHEREuser_id =1; -- 操作2:增加转入账户余额 UPDATEaccountsSETbalance = balance +1000WHEREuser_id =2; -- 如果两个操作都成功,提交事务 COMMIT; -- 如果任一操作失败,回滚事务 -- ROLLBACK; -- 原子性保证: -- 1. 要么两个账户都更新成功 -- 2. 要么两个账户都保持原状 -- 不会出现钱扣了但没有到账的情况
一致性(Consistency)
事务执行前后,数据库从一个一致状态转换到另一个一致状态。
-- 一致性示例:确保总金额不变 -- 假设系统中只有两个账户,总金额应该始终为10000 -- 事务前检查 SELECTSUM(balance)FROMaccounts; -- 结果:10000 STARTTRANSACTION; UPDATEaccountsSETbalance = balance -1000WHEREuser_id =1; UPDATEaccountsSETbalance = balance +1000WHEREuser_id =2; COMMIT; -- 事务后检查 SELECTSUM(balance)FROMaccounts; -- 结果仍然:10000 -- 一致性由应用程序和数据库约束共同保证 -- 比如:CHECK约束、外键约束、触发器等
隔离性(Isolation)
并发执行的事务之间相互隔离,一个事务的中间状态对其他事务不可见。
-- 隔离性示例:并发读写 -- 会话1 STARTTRANSACTION; UPDATEproductsSETstock = stock -1WHEREid=1; -- 此时还未提交 -- 会话2 SELECTstockFROMproductsWHEREid=1; -- 根据隔离级别,可能看到更新前或更新后的值 -- MySQL默认使用REPEATABLE READ隔离级别 -- 会话2看到的是事务开始时的快照,即更新前的值
持久性(Durability)
事务一旦提交,其结果就是永久性的,即使系统崩溃也不会丢失。
-- 持久性由redo log保证 -- 事务提交时,redo log会刷入磁盘 -- 相关配置 SHOWVARIABLESLIKE'innodb_flush_log_at_trx_commit'; -- innodb_flush_log_at_trx_commit = 1(默认) -- 每次事务提交都将redo log刷入磁盘 -- 最安全但性能略低 -- innodb_flush_log_at_trx_commit = 2 -- 每次提交写入OS缓存,每秒刷盘 -- 性能好,但断电可能丢失1秒数据 -- innodb_flush_log_at_trx_commit = 0 -- 每秒写入OS缓存并刷盘 -- 性能最好,但可能丢失1秒数据
2.1.2 事务隔离级别
MySQL支持四种隔离级别,解决不同的并发问题:
| 隔离级别 | 脏读 | 不可重复读 | 幻读 | 性能 |
|---|---|---|---|---|
| READ UNCOMMITTED | 可能 | 可能 | 可能 | 最高 |
| READ COMMITTED | 不可能 | 可能 | 可能 | 高 |
| REPEATABLE READ | 不可能 | 不可能 | InnoDB防止 | 中 |
| SERIALIZABLE | 不可能 | 不可能 | 不可能 | 最低 |
-- 查看当前隔离级别 SELECT@@transaction_isolation; -- 或 SHOWVARIABLESLIKE'transaction_isolation'; -- 设置会话隔离级别 SETSESSIONTRANSACTIONISOLATIONLEVELREADCOMMITTED; -- 设置全局隔离级别(需要重连生效) SETGLOBALTRANSACTIONISOLATIONLEVELREADCOMMITTED; -- 在配置文件中设置 -- [mysqld] -- transaction-isolation = READ-COMMITTED
脏读演示
-- 会话1(设置为READ UNCOMMITTED) SETSESSIONTRANSACTIONISOLATIONLEVELREADUNCOMMITTED; STARTTRANSACTION; -- 会话2 STARTTRANSACTION; UPDATEaccountsSETbalance =500WHEREuser_id =1; -- 未提交 -- 会话1 SELECTbalanceFROMaccountsWHEREuser_id =1; -- 结果:500(读到了未提交的数据,即脏读) -- 会话2 ROLLBACK; -- 回滚 -- 会话1再次查询 SELECTbalanceFROMaccountsWHEREuser_id =1; -- 结果可能是原来的值,之前读到的500是"脏数据"
不可重复读演示
-- 会话1(READ COMMITTED级别) SETSESSIONTRANSACTIONISOLATIONLEVELREADCOMMITTED; STARTTRANSACTION; SELECTbalanceFROMaccountsWHEREuser_id =1; -- 结果:1000 -- 会话2 UPDATEaccountsSETbalance =500WHEREuser_id =1; COMMIT; -- 会话1再次查询 SELECTbalanceFROMaccountsWHEREuser_id =1; -- 结果:500(同一事务内两次读取结果不同,即不可重复读) COMMIT;
幻读演示
-- 会话1(即使REPEATABLE READ也可能有幻读场景) STARTTRANSACTION; SELECTCOUNT(*)FROMordersWHEREuser_id =1; -- 结果:10 -- 会话2 INSERTINTOorders (user_id, amount)VALUES(1,100); COMMIT; -- 会话1使用当前读 SELECTCOUNT(*)FROMordersWHEREuser_id =1FORUPDATE; -- 结果:11(看到了新插入的行,即幻读) -- 注意:InnoDB的REPEATABLE READ通过间隙锁很大程度上防止了幻读 -- 但在某些边界情况下仍可能发生
2.1.3 创建测试环境
-- 创建测试数据库 CREATEDATABASEIFNOTEXISTSlock_demo; USElock_demo; -- 创建账户表 CREATETABLEaccounts ( idBIGINTUNSIGNEDAUTO_INCREMENT PRIMARYKEY, user_idBIGINTUNSIGNEDNOTNULL, balanceDECIMAL(15,2)NOTNULLDEFAULT0.00, versionINTUNSIGNEDNOTNULLDEFAULT0, -- 乐观锁版本号 created_at DATETIMEDEFAULTCURRENT_TIMESTAMP, updated_at DATETIMEDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP, UNIQUEKEYuk_user_id (user_id) )ENGINE=InnoDB; -- 创建订单表 CREATETABLEorders ( idBIGINTUNSIGNEDAUTO_INCREMENT PRIMARYKEY, order_noVARCHAR(32)NOTNULL, user_idBIGINTUNSIGNEDNOTNULL, amountDECIMAL(10,2)NOTNULL, statusTINYINTDEFAULT0, created_at DATETIMEDEFAULTCURRENT_TIMESTAMP, UNIQUEKEYuk_order_no (order_no), INDEXidx_user_id (user_id), INDEXidx_status (status), INDEXidx_user_status (user_id,status) )ENGINE=InnoDB; -- 创建库存表 CREATETABLEinventory ( idBIGINTUNSIGNEDAUTO_INCREMENT PRIMARYKEY, product_idBIGINTUNSIGNEDNOTNULL, stockINTUNSIGNEDNOTNULLDEFAULT0, versionINTUNSIGNEDNOTNULLDEFAULT0, UNIQUEKEYuk_product_id (product_id) )ENGINE=InnoDB; -- 插入测试数据 INSERTINTOaccounts (user_id, balance)VALUES (1,10000.00), (2,5000.00), (3,3000.00); INSERTINTOinventory (product_id, stock)VALUES (1001,100), (1002,200), (1003,50); -- 生成订单测试数据 INSERTINTOorders (order_no, user_id, amount,status) SELECT CONCAT('ORD',LPAD(seq,10,'0')), FLOOR(RAND() *3) +1, ROUND(RAND() *1000,2), FLOOR(RAND() *5) FROM( SELECT@row:= @row+1asseqFROM (SELECT0UNIONSELECT1UNIONSELECT2UNIONSELECT3UNIONSELECT4 UNIONSELECT5UNIONSELECT6UNIONSELECT7UNIONSELECT8UNIONSELECT9) t1, (SELECT0UNIONSELECT1UNIONSELECT2UNIONSELECT3UNIONSELECT4 UNIONSELECT5UNIONSELECT6UNIONSELECT7UNIONSELECT8UNIONSELECT9) t2, (SELECT0UNIONSELECT1UNIONSELECT2UNIONSELECT3UNIONSELECT4 UNIONSELECT5UNIONSELECT6UNIONSELECT7UNIONSELECT8UNIONSELECT9) t3, (SELECT@row:=0) r ) seq_table;
2.2 核心配置
2.2.1 InnoDB锁类型详解
1. 共享锁(S锁)和排他锁(X锁)
-- 共享锁(S锁):允许其他事务读,但不允许写 SELECT*FROMaccountsWHEREuser_id =1LOCKINSHAREMODE; -- MySQL 8.0 新语法 SELECT*FROMaccountsWHEREuser_id =1FORSHARE; -- 排他锁(X锁):不允许其他事务读写(当前读除外) SELECT*FROMaccountsWHEREuser_id =1FORUPDATE; -- 锁兼容性矩阵 -- | | S锁 | X锁 | -- | S锁 | 兼容 | 冲突 | -- | X锁 | 冲突 | 冲突 |
2. 意向锁(IS/IX锁)
-- 意向锁是表级锁,用于表明事务稍后会在表中的行上加什么类型的锁 -- 意向共享锁(IS):事务准备给数据行加共享锁 -- 意向排他锁(IX):事务准备给数据行加排他锁 -- 查看意向锁 SELECT*FROMperformance_schema.data_locksWHERELOCK_TYPE ='TABLE'; -- 意向锁的作用: -- 加表锁时,不需要遍历每一行来检查是否有行锁 -- 只需检查意向锁即可 -- 兼容性矩阵: -- | | IS | IX | S | X | -- | IS | 兼容 | 兼容 | 兼容 | 冲突 | -- | IX | 兼容 | 兼容 | 冲突 | 冲突 | -- | S | 兼容 | 冲突 | 兼容 | 冲突 | -- | X | 冲突 | 冲突 | 冲突 | 冲突 |
3. 记录锁(Record Lock)
-- 记录锁锁定索引记录 -- 如果表没有索引,InnoDB会创建隐藏的聚簇索引,并使用该索引进行记录锁定 STARTTRANSACTION; -- 锁定id=1的记录 SELECT*FROMaccountsWHEREid=1FORUPDATE; -- 此时其他事务无法修改id=1的行 -- 查看记录锁 SELECT*FROMperformance_schema.data_locks WHERELOCK_TYPE ='RECORD'ANDLOCK_MODE ='X,REC_NOT_GAP';
4. 间隙锁(Gap Lock)
-- 间隙锁锁定索引记录之间的间隙,防止其他事务插入 -- 只在REPEATABLE READ及以上隔离级别生效 -- 假设accounts表中有id: 1, 5, 10 STARTTRANSACTION; SELECT*FROMaccountsWHEREidBETWEEN3AND7FORUPDATE; -- 这会锁定(1,5)和(5,10)的间隙 -- 其他事务无法在这些间隙中插入新记录 -- INSERT INTO accounts (id, user_id, balance) VALUES (3, 3, 1000); -- 会等待 -- 查看间隙锁 SELECT*FROMperformance_schema.data_locks WHERELOCK_TYPE ='RECORD'ANDLOCK_MODE ='X,GAP';
5. 临键锁(Next-Key Lock)
-- 临键锁 = 记录锁 + 间隙锁 -- 锁定一个索引记录及其前面的间隙 -- 假设有id: 1, 5, 10 STARTTRANSACTION; SELECT*FROMaccountsWHEREid=5FORUPDATE; -- 在REPEATABLE READ级别,这会锁定: -- 1. 记录id=5 -- 2. 间隙(1,5) -- 临键锁是InnoDB默认的锁类型,用于防止幻读
6. 插入意向锁(Insert Intention Lock)
-- 插入意向锁是一种特殊的间隙锁 -- 多个事务可以同时获取同一间隙的插入意向锁(只要插入位置不同) -- 会话1 STARTTRANSACTION; INSERTINTOaccounts (id, user_id, balance)VALUES(3,3,1000); -- 获取(1,5)间隙的插入意向锁,插入id=3 -- 会话2 STARTTRANSACTION; INSERTINTOaccounts (id, user_id, balance)VALUES(4,4,2000); -- 也可以获取(1,5)间隙的插入意向锁,插入id=4 -- 两个插入可以并发执行,因为插入位置不冲突
2.2.2 锁监控配置
-- 开启锁监控 SETGLOBALinnodb_status_output =ON; SETGLOBALinnodb_status_output_locks =ON; -- 查看InnoDB状态(包含锁信息) SHOWENGINEINNODBSTATUSG -- 使用performance_schema监控锁 -- data_locks:当前持有的锁 SELECT*FROMperformance_schema.data_locks; -- data_lock_waits:锁等待关系 SELECT*FROMperformance_schema.data_lock_waits; -- 查看等待锁的事务 SELECT r.trx_idASwaiting_trx_id, r.trx_mysql_thread_idASwaiting_thread, r.trx_queryASwaiting_query, b.trx_idASblocking_trx_id, b.trx_mysql_thread_idASblocking_thread, b.trx_queryASblocking_query FROMperformance_schema.data_lock_waits w INNERJOINinformation_schema.innodb_trx bONb.trx_id = w.BLOCKING_ENGINE_TRANSACTION_ID INNERJOINinformation_schema.innodb_trx rONr.trx_id = w.REQUESTING_ENGINE_TRANSACTION_ID;
2.2.3 死锁检测配置
-- 开启死锁检测(默认开启) SETGLOBALinnodb_deadlock_detect =ON; -- 设置锁等待超时时间 SETGLOBALinnodb_lock_wait_timeout =50; -- 默认50秒 -- 打印所有死锁信息到错误日志 SETGLOBALinnodb_print_all_deadlocks =ON; -- 配置文件设置 -- [mysqld] -- innodb_deadlock_detect = ON -- innodb_lock_wait_timeout = 10 -- innodb_print_all_deadlocks = ON
2.3 启动和验证
2.3.1 验证锁机制
-- 测试记录锁 -- 会话1 STARTTRANSACTION; SELECT*FROMaccountsWHEREid=1FORUPDATE; -- 不提交,保持锁定 -- 会话2 STARTTRANSACTION; -- 尝试更新同一行 UPDATEaccountsSETbalance = balance +100WHEREid=1; -- 此语句会等待,因为id=1被会话1锁定 -- 会话1 COMMIT; -- 提交后会话2的更新才会执行 -- 查看锁等待情况 SELECT*FROMperformance_schema.data_lock_waits;
2.3.2 验证死锁检测
-- 构造死锁场景 -- 会话1 STARTTRANSACTION; UPDATEaccountsSETbalance = balance -100WHEREid=1; -- 会话2 STARTTRANSACTION; UPDATEaccountsSETbalance = balance -100WHEREid=2; -- 会话1 UPDATEaccountsSETbalance = balance +100WHEREid=2; -- 等待会话2释放id=2的锁 -- 会话2 UPDATEaccountsSETbalance = balance +100WHEREid=1; -- 等待会话1释放id=1的锁 -- 此时发生死锁! -- MySQL会检测到死锁,回滚其中一个事务 -- ERROR 1213 (40001): Deadlock found when trying to get lock; try restarting transaction -- 查看最近的死锁信息 SHOWENGINEINNODBSTATUSG -- 找到"LATEST DETECTED DEADLOCK"部分
三、示例代码和配置
3.1 完整配置示例
3.1.1 死锁案例分析
案例1:相反顺序更新
-- 最常见的死锁场景:两个事务以相反顺序更新行 -- 事务1:先更新A,再更新B STARTTRANSACTION; UPDATEaccountsSETbalance = balance -100WHEREuser_id =1; -- 锁定user_id=1 -- 等待... UPDATEaccountsSETbalance = balance +100WHEREuser_id =2; -- 需要锁定user_id=2 -- 事务2:先更新B,再更新A STARTTRANSACTION; UPDATEaccountsSETbalance = balance -50WHEREuser_id =2; -- 锁定user_id=2 -- 等待... UPDATEaccountsSETbalance = balance +50WHEREuser_id =1; -- 需要锁定user_id=1 -- 死锁!事务1持有A等待B,事务2持有B等待A -- 解决方案:固定更新顺序 -- 始终按user_id升序或降序更新 STARTTRANSACTION; -- 方法1:应用层排序 UPDATEaccountsSETbalance = balance -100WHEREuser_id =1; UPDATEaccountsSETbalance = balance +100WHEREuser_id =2; COMMIT;
案例2:间隙锁死锁
-- 间隙锁导致的死锁 -- 表中有id: 1, 10, 20 -- 事务1 STARTTRANSACTION; SELECT*FROMaccountsWHEREid=5FORUPDATE; -- 锁定间隙(1,10) -- 等待... -- 事务2 STARTTRANSACTION; SELECT*FROMaccountsWHEREid=15FORUPDATE; -- 锁定间隙(10,20) INSERTINTOaccounts (id, user_id, balance)VALUES(7,7,1000); -- 等待事务1 -- 事务1 INSERTINTOaccounts (id, user_id, balance)VALUES(12,12,2000); -- 等待事务2 -- 死锁! -- 解决方案: -- 1. 降低隔离级别到READ COMMITTED(不使用间隙锁) -- 2. 使用唯一索引精确匹配,避免间隙锁 -- 3. 减少锁定范围
案例3:唯一键冲突死锁
-- 唯一键冲突可能导致死锁
-- 表中已有 order_no = 'ORD001'
-- 事务1
STARTTRANSACTION;
INSERTINTOorders (order_no, user_id, amount)VALUES('ORD002',1,100);
-- 事务2
STARTTRANSACTION;
INSERTINTOorders (order_no, user_id, amount)VALUES('ORD002',2,200);
-- 唯一键冲突,等待事务1
-- 事务3
STARTTRANSACTION;
INSERTINTOorders (order_no, user_id, amount)VALUES('ORD002',3,300);
-- 也等待
-- 事务1回滚
ROLLBACK;
-- 事务2和事务3可能死锁,因为它们都在等待锁
-- 解决方案:
-- 1. 使用INSERT ... ON DUPLICATE KEY UPDATE
-- 2. 使用INSERT IGNORE
-- 3. 先查询再插入(在应用层处理)
3.1.2 悲观锁实现
/**
* 悲观锁实现转账功能
* 使用SELECT FOR UPDATE锁定记录
*/
@Service
@Transactional
publicclassTransferService{
@Autowired
privateJdbcTemplate jdbcTemplate;
/**
* 转账 - 悲观锁实现
* 关键:按固定顺序获取锁,避免死锁
*/
publicvoidtransfer(Long fromUserId, Long toUserId, BigDecimal amount){
// 按user_id排序,确保获取锁的顺序一致
Long firstUserId = Math.min(fromUserId, toUserId);
Long secondUserId = Math.max(fromUserId, toUserId);
try{
// 按顺序锁定账户
BigDecimal firstBalance = lockAndGetBalance(firstUserId);
BigDecimal secondBalance = lockAndGetBalance(secondUserId);
// 确定转出和转入账户的余额
BigDecimal fromBalance = fromUserId.equals(firstUserId) ? firstBalance : secondBalance;
BigDecimal toBalance = fromUserId.equals(firstUserId) ? secondBalance : firstBalance;
// 检查余额
if(fromBalance.compareTo(amount) < 0) {
thrownew RuntimeException("余额不足");
}
// 执行转账
updateBalance(fromUserId, fromBalance.subtract(amount));
updateBalance(toUserId, toBalance.add(amount));
} catch (Exception e) {
// 异常时事务自动回滚
thrownew RuntimeException("转账失败: " + e.getMessage(), e);
}
}
private BigDecimal lockAndGetBalance(Long userId) {
// SELECT FOR UPDATE 锁定记录
String sql = "SELECT balance FROM accounts WHERE user_id = ? FOR UPDATE";
return jdbcTemplate.queryForObject(sql, BigDecimal.class, userId);
}
private void updateBalance(Long userId, BigDecimal newBalance) {
String sql = "UPDATE accounts SET balance = ? WHERE user_id = ?";
jdbcTemplate.update(sql, newBalance, userId);
}
}
3.1.3 乐观锁实现
/**
* 乐观锁实现库存扣减
* 使用版本号或CAS机制
*/
@Service
publicclassInventoryService{
@Autowired
privateJdbcTemplate jdbcTemplate;
/**
* 扣减库存 - 乐观锁实现
*@returntrue 成功,false 失败(库存不足或版本冲突)
*/
publicbooleandecreaseStock(Long productId,intquantity){
intmaxRetries =3;
for(inti =0; i < maxRetries; i++) {
// 查询当前库存和版本号
String selectSql = "SELECT stock, version FROM inventory WHERE product_id = ?";
Map result = jdbcTemplate.queryForMap(selectSql, productId);
intcurrentStock = (Integer) result.get("stock");
intcurrentVersion = (Integer) result.get("version");
// 检查库存
if(currentStock < quantity) {
returnfalse; // 库存不足
}
// 使用版本号进行CAS更新
String updateSql = """
UPDATE inventory
SET stock = stock - ?, version = version + 1
WHERE product_id = ? AND version = ?
""";
int affected = jdbcTemplate.update(updateSql, quantity, productId, currentVersion);
if (affected >0) {
returntrue; // 更新成功
}
// 版本冲突,重试
try{
Thread.sleep(10+ (long)(Math.random() *50)); // 随机延迟
}catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
returnfalse; // 重试次数用尽
}
/**
* 扣减库存 - 使用行级条件更新(更简洁的乐观锁)
*/
publicbooleandecreaseStockSimple(Long productId,intquantity){
String sql ="""
UPDATE inventory
SET stock = stock - ?
WHERE product_id = ? AND stock >= ?
""";
intaffected = jdbcTemplate.update(sql, quantity, productId, quantity);
returnaffected >0;
}
}
3.1.4 分布式锁实现
/** * 基于Redis的分布式锁实现 * 解决跨实例的并发问题 */ @Component publicclassDistributedLock{ @Autowired privateStringRedisTemplate redisTemplate; privatestaticfinallongDEFAULT_EXPIRE_TIME =30000; // 30秒 /** * 获取锁 *@paramlockKey 锁的key *@paramrequestId 请求标识(用于释放锁时验证) *@paramexpireTime 过期时间(毫秒) */ publicbooleantryLock(String lockKey, String requestId,longexpireTime){ Boolean success = redisTemplate.opsForValue().setIfAbsent( lockKey, requestId, expireTime, TimeUnit.MILLISECONDS ); returnBoolean.TRUE.equals(success); } /** * 释放锁 * 使用Lua脚本保证原子性 */ publicbooleanunlock(String lockKey, String requestId){ String script =""" if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end """; Long result = redisTemplate.execute( newDefaultRedisScript<>(script, Long.class), Collections.singletonList(lockKey), requestId ); returnLong.valueOf(1).equals(result); } /** * 带自动续期的锁 * 使用watchdog机制 */ publicbooleantryLockWithWatchdog(String lockKey, String requestId){ booleanlocked = tryLock(lockKey, requestId, DEFAULT_EXPIRE_TIME); if(locked) { // 启动watchdog线程,定期续期 startWatchdog(lockKey, requestId); } returnlocked; } privatevoidstartWatchdog(String lockKey, String requestId){ Thread watchdog =newThread(() -> { while(!Thread.currentThread().isInterrupted()) { try{ Thread.sleep(DEFAULT_EXPIRE_TIME /3); // 每10秒续期一次 // 续期 String script =""" if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('pexpire', KEYS[1], ARGV[2]) else return 0 end """; Long result = redisTemplate.execute( newDefaultRedisScript<>(script, Long.class), Collections.singletonList(lockKey), requestId, String.valueOf(DEFAULT_EXPIRE_TIME) ); if(!Long.valueOf(1).equals(result)) { break; // 锁已被释放或被其他进程获取 } }catch(InterruptedException e) { Thread.currentThread().interrupt(); break; } } }); watchdog.setDaemon(true); watchdog.start(); } } /** * 使用分布式锁的示例 */ @Service publicclassOrderService{ @Autowired privateDistributedLock distributedLock; @Autowired privateInventoryService inventoryService; /** * 创建订单 - 使用分布式锁保证幂等性 */ publicOrdercreateOrder(String orderNo, Long productId,intquantity){ String lockKey ="lock"+ orderNo; String requestId = UUID.randomUUID().toString(); try{ // 获取分布式锁 if(!distributedLock.tryLock(lockKey, requestId,30000)) { thrownewRuntimeException("获取锁失败,请稍后重试"); } // 检查订单是否已存在(幂等性检查) Order existingOrder = orderMapper.findByOrderNo(orderNo); if(existingOrder !=null) { returnexistingOrder; // 返回已存在的订单 } // 扣减库存 if(!inventoryService.decreaseStock(productId, quantity)) { thrownewRuntimeException("库存不足"); } // 创建订单 Order order =newOrder(); order.setOrderNo(orderNo); order.setProductId(productId); order.setQuantity(quantity); orderMapper.insert(order); returnorder; }finally{ // 释放锁 distributedLock.unlock(lockKey, requestId); } } }
3.2 实际应用案例
3.2.1 秒杀系统防超卖
/**
* 秒杀系统防超卖方案
*/
@Service
publicclassSeckillService{
@Autowired
privateRedisTemplate redisTemplate;
@Autowired
privateJdbcTemplate jdbcTemplate;
/**
* 方案1:Redis预扣库存 + 异步入库
*/
publicSeckillResultseckillWithRedis(Long userId, Long productId){
String stockKey ="seckill"+ productId;
String orderKey ="seckill"+ productId;
// 1. 检查是否已购买(防止重复购买)
Boolean isMember = redisTemplate.opsForSet().isMember(orderKey, userId);
if(Boolean.TRUE.equals(isMember)) {
returnSeckillResult.fail("您已参与过此活动");
}
// 2. 预扣库存(原子操作)
Long stock = redisTemplate.opsForValue().decrement(stockKey);
if(stock ==null|| stock < 0) {
// 库存不足,恢复
redisTemplate.opsForValue().increment(stockKey);
return SeckillResult.fail("商品已售罄");
}
try {
// 3. 记录用户已购买
redisTemplate.opsForSet().add(orderKey, userId);
// 4. 发送消息到MQ,异步创建订单
OrderMessage message = new OrderMessage(userId, productId, 1);
rabbitTemplate.convertAndSend("seckill.exchange", "seckill.order", message);
return SeckillResult.success("秒杀成功,订单创建中");
} catch (Exception e) {
// 异常时恢复库存
redisTemplate.opsForValue().increment(stockKey);
redisTemplate.opsForSet().remove(orderKey, userId);
return SeckillResult.fail("系统繁忙,请稍后重试");
}
}
/**
* 方案2:数据库行级锁
* 适用于库存量大、并发相对较低的场景
*/
@Transactional
public SeckillResult seckillWithDbLock(Long userId, Long productId, int quantity) {
// 1. 查询库存(加锁)
String selectSql = """
SELECT stock FROM inventory WHERE product_id = ? FOR UPDATE
""";
Integer stock = jdbcTemplate.queryForObject(selectSql, Integer.class, productId);
if (stock == null || stock < quantity) {
return SeckillResult.fail("库存不足");
}
// 2. 扣减库存
String updateSql = "UPDATE inventory SET stock = stock - ? WHERE product_id = ?";
jdbcTemplate.update(updateSql, quantity, productId);
// 3. 创建订单
String insertSql = """
INSERT INTO orders (order_no, user_id, product_id, quantity, status)
VALUES (?, ?, ?, ?, 1)
""";
String orderNo = generateOrderNo();
jdbcTemplate.update(insertSql, orderNo, userId, productId, quantity);
return SeckillResult.success(orderNo);
}
/**
* 方案3:乐观锁 + 限制重试次数
*/
public SeckillResult seckillWithOptimisticLock(Long userId, Long productId, int quantity) {
int maxRetries = 3;
for (int i = 0; i < maxRetries; i++) {
// 使用乐观锁扣减库存
String sql = """
UPDATE inventory
SET stock = stock - ?, version = version + 1
WHERE product_id = ? AND stock >= ?
""";
intaffected = jdbcTemplate.update(sql, quantity, productId, quantity);
if(affected >0) {
// 扣减成功,创建订单
String orderNo = createOrder(userId, productId, quantity);
returnSeckillResult.success(orderNo);
}
// 可能是库存不足或版本冲突,检查库存
Integer stock = jdbcTemplate.queryForObject(
"SELECT stock FROM inventory WHERE product_id = ?",
Integer.class,productId
);
if(stock ==null|| stock < quantity) {
return SeckillResult.fail("库存不足");
}
// 版本冲突,短暂等待后重试
try {
Thread.sleep(10 + (long)(Math.random() * 30));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
return SeckillResult.fail("系统繁忙,请稍后重试");
}
}
3.2.2 死锁自动检测和告警
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ MySQL死锁监控和告警脚本 """ importpymysql importtime importjson importrequests fromdatetimeimportdatetime classDeadlockMonitor: """死锁监控器""" def__init__(self, host, user, password, database, alert_webhook=None): self.conn_params = { 'host': host, 'user': user, 'password': password, 'database': database, 'charset':'utf8mb4' } self.alert_webhook = alert_webhook self.last_deadlock_info =None defget_innodb_status(self): """获取InnoDB状态""" conn = pymysql.connect(**self.conn_params) try: withconn.cursor()ascursor: cursor.execute("SHOW ENGINE INNODB STATUS") result = cursor.fetchone() returnresult[2]ifresultelseNone finally: conn.close() defparse_deadlock(self, status): """解析死锁信息""" ifnotstatus: returnNone lines = status.split(' ') in_deadlock_section =False deadlock_info = [] current_section = [] forlineinlines: if'LATEST DETECTED DEADLOCK'inline: in_deadlock_section =True continue ifin_deadlock_section: ifline.startswith('---')and'TRANSACTION'notinline: ifcurrent_section: deadlock_info.append(' '.join(current_section)) current_section = [] continue if'WE ROLL BACK'inline: current_section.append(line) deadlock_info.append(' '.join(current_section)) break current_section.append(line) return' '.join(deadlock_info)ifdeadlock_infoelseNone defget_lock_waits(self): """获取当前锁等待情况""" conn = pymysql.connect(**self.conn_params) try: withconn.cursor(pymysql.cursors.DictCursor)ascursor: sql =""" SELECT r.trx_id AS waiting_trx_id, r.trx_mysql_thread_id AS waiting_thread, TIMESTAMPDIFF(SECOND, r.trx_wait_started, NOW()) AS wait_seconds, r.trx_query AS waiting_query, b.trx_id AS blocking_trx_id, b.trx_mysql_thread_id AS blocking_thread, b.trx_query AS blocking_query FROM performance_schema.data_lock_waits w INNER JOIN information_schema.innodb_trx b ON b.trx_id = w.BLOCKING_ENGINE_TRANSACTION_ID INNER JOIN information_schema.innodb_trx r ON r.trx_id = w.REQUESTING_ENGINE_TRANSACTION_ID """ cursor.execute(sql) returncursor.fetchall() finally: conn.close() defalert(self, title, content): """发送告警""" print(f"[ALERT]{title}") print(content) ifself.alert_webhook: try: payload = { 'msgtype':'markdown', 'markdown': { 'title': title, 'text':f"##{title} {content}" } } requests.post(self.alert_webhook, json=payload, timeout=5) exceptExceptionase: print(f"发送告警失败:{e}") defcheck_deadlock(self): """检查死锁""" status = self.get_innodb_status() deadlock_info = self.parse_deadlock(status) ifdeadlock_infoanddeadlock_info != self.last_deadlock_info: self.last_deadlock_info = deadlock_info self.alert( "MySQL检测到死锁", f"**时间**:{datetime.now()} **详情**: ``` {deadlock_info[:2000]} ```" ) returnTrue returnFalse defcheck_lock_waits(self, threshold_seconds=30): """检查长时间锁等待""" lock_waits = self.get_lock_waits() forwaitinlock_waits: ifwait['wait_seconds']andwait['wait_seconds'] > threshold_seconds: self.alert( "MySQL锁等待超时", f"**等待时间**:{wait['wait_seconds']}秒 " f"**等待线程**:{wait['waiting_thread']} " f"**等待SQL**:{wait['waiting_query']} " f"**阻塞线程**:{wait['blocking_thread']} " f"**阻塞SQL**:{wait['blocking_query']}" ) defrun(self, interval=10): """运行监控""" print(f"死锁监控已启动,检查间隔:{interval}秒") whileTrue: try: self.check_deadlock() self.check_lock_waits(threshold_seconds=30) exceptExceptionase: print(f"监控异常:{e}") time.sleep(interval) if__name__ =='__main__': monitor = DeadlockMonitor( host='192.168.1.11', user='monitor', password='password', database='lock_demo', alert_webhook='https://your-webhook-url.com' ) monitor.run()
3.2.3 事务超时和慢事务监控
-- 查询运行时间超过指定秒数的事务 SELECT trx_id, trx_mysql_thread_idASthread_id, trx_state, trx_started, TIMESTAMPDIFF(SECOND, trx_started,NOW())ASrunning_seconds, trx_rows_locked, trx_rows_modified, trx_lock_structs, trx_query FROMinformation_schema.innodb_trx WHERETIMESTAMPDIFF(SECOND, trx_started,NOW()) >60 ORDERBYrunning_secondsDESC; -- 查询持有锁最多的事务 SELECT trx_id, trx_mysql_thread_id, trx_rows_locked, trx_lock_structs, trx_tables_locked, trx_query FROMinformation_schema.innodb_trx ORDERBYtrx_rows_lockedDESC LIMIT10; -- 查询锁定行数最多的表 SELECT object_schema, object_name, COUNT(*)aslock_count FROMperformance_schema.data_locks WHERElock_type ='RECORD' GROUPBYobject_schema, object_name ORDERBYlock_countDESC; -- 创建慢事务告警存储过程 DELIMITER // CREATEPROCEDUREcheck_slow_transactions(INthreshold_secondsINT) BEGIN DECLAREdoneINTDEFAULTFALSE; DECLAREv_trx_idVARCHAR(100); DECLAREv_thread_idBIGINT; DECLAREv_running_secondsINT; DECLAREv_queryTEXT; DECLAREcurCURSORFOR SELECT trx_id, trx_mysql_thread_id, TIMESTAMPDIFF(SECOND, trx_started,NOW()), trx_query FROMinformation_schema.innodb_trx WHERETIMESTAMPDIFF(SECOND, trx_started,NOW()) > threshold_seconds; DECLARECONTINUEHANDLERFORNOTFOUNDSETdone =TRUE; -- 创建告警日志表 CREATETABLEIFNOTEXISTSslow_transaction_log ( idBIGINTAUTO_INCREMENT PRIMARYKEY, trx_idVARCHAR(100), thread_idBIGINT, running_secondsINT, queryTEXT, logged_at DATETIMEDEFAULTCURRENT_TIMESTAMP ); OPEN cur; read_loop: LOOP FETCH cur INTO v_trx_id, v_thread_id, v_running_seconds, v_query; IF done THEN LEAVE read_loop; ENDIF; -- 记录慢事务 INSERTINTOslow_transaction_log (trx_id, thread_id, running_seconds,query) VALUES(v_trx_id, v_thread_id, v_running_seconds, v_query); ENDLOOP; CLOSE cur; END// DELIMITER ; -- 使用Event定期检查 CREATEEVENTIFNOTEXISTScheck_slow_transactions_event ONSCHEDULE EVERY1MINUTE DOCALLcheck_slow_transactions(60);
四、最佳实践和注意事项
4.1 最佳实践
4.1.1 事务设计原则
-- 1. 事务尽量短小 -- 差:大事务 STARTTRANSACTION; -- 处理100万条记录 UPDATEordersSETstatus=1WHEREcreated_at < '2024-01-01'; -- 锁定大量行 COMMIT; -- 好:分批处理 DELIMITER // CREATEPROCEDURE batch_update_orders() BEGIN DECLARE affected_rows INTDEFAULT1; DECLARE batch_size INTDEFAULT1000; WHILE affected_rows > 0DO STARTTRANSACTION; UPDATEorders SETstatus=1 WHEREcreated_at < '2024-01-01'ANDstatus = 0 LIMIT batch_size; SET affected_rows = ROW_COUNT(); COMMIT; -- 短暂暂停,避免长时间占用资源 DOSLEEP(0.1); ENDWHILE; END // DELIMITER ; -- 2. 避免在事务中进行耗时操作 -- 差:事务中调用外部接口 STARTTRANSACTION; INSERTINTO orders (...) VALUES (...); -- 调用支付接口(可能需要几秒) -- 长时间持有锁 COMMIT; -- 好:先准备数据,再开启事务 -- 准备阶段(无事务) -- 调用支付接口,获取结果 STARTTRANSACTION; INSERTINTO orders (...) VALUES (...); -- 快速完成 INSERTINTO payments (...) VALUES (...); COMMIT; -- 3. 按固定顺序访问资源 -- 统一按主键升序访问,避免死锁
4.1.2 锁优化策略
-- 1. 尽量使用索引访问数据 -- 差:无索引导致锁表 UPDATEordersSETstatus=1WHEREorder_date ='2024-01-01'; -- 如果order_date没有索引,可能锁定大量行 -- 好:有索引时锁定范围精确 CREATEINDEXidx_order_dateONorders(order_date); UPDATEordersSETstatus=1WHEREorder_date ='2024-01-01'; -- 2. 减少锁定范围 -- 差:锁定所有匹配的行 SELECT*FROMordersWHEREuser_id =1FORUPDATE; -- 好:只锁定需要的行 SELECT*FROMordersWHEREuser_id =1ANDstatus=0FORUPDATE; -- 3. 合理使用锁模式 -- 只读场景使用共享锁 SELECT*FROMordersWHEREid=1LOCKINSHAREMODE; -- 需要修改时才使用排他锁 SELECT*FROMordersWHEREid=1FORUPDATE; -- 4. 避免锁升级 -- 差:从共享锁升级到排他锁可能导致死锁 SELECT*FROMordersWHEREid=1LOCKINSHAREMODE; -- 后续需要更新... UPDATEordersSETstatus=1WHEREid=1; -- 可能死锁 -- 好:直接使用排他锁 SELECT*FROMordersWHEREid=1FORUPDATE; UPDATEordersSETstatus=1WHEREid=1;
4.1.3 隔离级别选择
-- 不同场景的隔离级别推荐 -- 1. 高并发读写场景:READ COMMITTED -- 优点:锁范围小,不使用间隙锁 -- 缺点:可能出现不可重复读 SETSESSIONTRANSACTIONISOLATIONLEVELREADCOMMITTED; -- 2. 金融交易场景:REPEATABLE READ(默认) -- 优点:一致性读,防止幻读 -- 缺点:间隙锁可能导致更多死锁 SETSESSIONTRANSACTIONISOLATIONLEVELREPEATABLEREAD; -- 3. 报表查询场景:使用一致性快照 STARTTRANSACTIONWITHCONSISTENTSNAPSHOT; SELECT*FROMordersWHERE...; -- 读取的是事务开始时的快照,不会被其他事务影响 COMMIT; -- 4. 批量导入场景:可以临时使用READ UNCOMMITTED SETSESSIONTRANSACTIONISOLATIONLEVELREADUNCOMMITTED; -- 导入完成后恢复 SETSESSIONTRANSACTIONISOLATIONLEVELREPEATABLEREAD;
4.2 注意事项
4.2.1 配置注意
| 配置项 | 建议值 | 说明 |
|---|---|---|
| innodb_lock_wait_timeout | 10-50 | 锁等待超时,根据业务调整 |
| innodb_deadlock_detect | ON | 开启死锁检测 |
| innodb_print_all_deadlocks | ON | 记录所有死锁到错误日志 |
| transaction_isolation | READ-COMMITTED / REPEATABLE-READ | 根据场景选择 |
| innodb_rollback_on_timeout | OFF | 超时时只回滚当前语句,不回滚整个事务 |
| autocommit | ON | 默认开启自动提交 |
4.2.2 常见错误
| 错误类型 | 错误信息 | 原因分析 | 解决方案 |
|---|---|---|---|
| 死锁 | Deadlock found | 循环等待 | 固定访问顺序 |
| 锁超时 | Lock wait timeout exceeded | 持锁时间过长 | 减小事务,增加超时 |
| 事务太大 | Transaction too large | 修改行数过多 | 分批处理 |
| 表锁 | Table lock wait | 无索引导致表锁 | 添加适当索引 |
| 间隙锁冲突 | Conflict on gap lock | 并发插入同一间隙 | 降低隔离级别 |
4.2.3 死锁预防清单
开发阶段: -固定访问顺序:多表操作按表名或主键排序 -减小事务范围:只在必要时开启事务 -使用低隔离级别:非必要不用REPEATABLEREAD -添加必要索引:避免全表扫描锁定 部署阶段: -开启死锁检测:innodb_deadlock_detect=ON -设置合理超时:innodb_lock_wait_timeout=10 -记录死锁日志:innodb_print_all_deadlocks=ON -配置监控告警:死锁次数、锁等待时间 运维阶段: -定期分析死锁:查看SHOWENGINEINNODBSTATUS -监控长事务:超过60秒的事务告警 -监控锁等待:等待超过10秒告警 -分析慢查询:优化持锁时间长的SQL
五、故障排查和监控
5.1 故障排查
5.1.1 死锁分析
-- 查看最近的死锁信息 SHOWENGINEINNODBSTATUSG -- 输出中的关键部分: -- LATEST DETECTED DEADLOCK -- ------------------------ -- 2024-01-01 1000 0x7f... -- *** (1) TRANSACTION: -- TRANSACTION 12345, ACTIVE 1 sec starting index read -- mysql tables in use 1, locked 1 -- LOCK WAIT 3 lock struct(s), heap size 1136, 2 row lock(s) -- MySQL thread id 100, OS thread handle 123, query id 456 192.168.1.10 app_user updating -- UPDATE accounts SET balance = balance - 100 WHERE user_id = 1 -- -- *** (1) WAITING FOR THIS LOCK TO BE GRANTED: -- RECORD LOCKS space id 123 page no 3 n bits 72 index uk_user_id of table `db`.`accounts` -- trx id 12345 lock_mode X locks rec but not gap waiting -- -- *** (2) TRANSACTION: -- TRANSACTION 12346, ACTIVE 1 sec starting index read -- ... -- -- *** (2) HOLDS THE LOCK(S): -- RECORD LOCKS space id 123 page no 3 n bits 72 index uk_user_id of table `db`.`accounts` -- ... -- -- *** (2) WAITING FOR THIS LOCK TO BE GRANTED: -- ... -- -- *** WE ROLL BACK TRANSACTION (1) -- 分析步骤: -- 1. 找到两个事务的SQL -- 2. 分析锁等待关系 -- 3. 确定死锁原因 -- 4. 制定解决方案
死锁日志解读
# 从错误日志中提取死锁信息 grep -A 100"LATEST DETECTED DEADLOCK"/var/log/mysql/error.log | head -100 # 使用pt-deadlock-logger记录死锁 pt-deadlock-logger --host=localhost --user=root --password=xxx --dest h=localhost,D=monitor,t=deadlocks --run-time=1h
5.1.2 锁等待分析
-- 查看当前锁等待 SELECT waiting.trx_idASwaiting_trx_id, waiting.trx_mysql_thread_idASwaiting_thread, waiting.trx_queryASwaiting_query, TIMESTAMPDIFF(SECOND, waiting.trx_wait_started,NOW())ASwaiting_seconds, blocking.trx_idASblocking_trx_id, blocking.trx_mysql_thread_idASblocking_thread, blocking.trx_queryASblocking_query, TIMESTAMPDIFF(SECOND, blocking.trx_started,NOW())ASblocking_duration FROMinformation_schema.innodb_trx waiting INNERJOINperformance_schema.data_lock_waits dlw ONwaiting.trx_id = dlw.REQUESTING_ENGINE_TRANSACTION_ID INNERJOINinformation_schema.innodb_trx blocking ONblocking.trx_id = dlw.BLOCKING_ENGINE_TRANSACTION_ID; -- 查看锁的详细信息 SELECT dl.ENGINE_LOCK_ID, dl.ENGINE_TRANSACTION_ID, dl.OBJECT_SCHEMA, dl.OBJECT_NAME, dl.INDEX_NAME, dl.LOCK_TYPE, dl.LOCK_MODE, dl.LOCK_STATUS, dl.LOCK_DATA FROMperformance_schema.data_locks dl; -- 终止阻塞事务(谨慎使用) -- 先确认阻塞线程ID KILL12345;
5.1.3 长事务分析
-- 查找运行时间最长的事务 SELECT trx_id, trx_mysql_thread_idASthread_id, trx_state, trx_started, NOW() - trx_startedASrunning_time, trx_rows_locked, trx_rows_modified, trx_tables_in_use, trx_tables_locked, trx_query FROMinformation_schema.innodb_trx ORDERBYtrx_startedASC; -- 查看事务对应的连接信息 SELECT t.trx_id, t.trx_mysql_thread_id, p.user, p.host, p.db, p.command, p.time, p.state, t.trx_query FROMinformation_schema.innodb_trx t INNERJOINinformation_schema.processlist p ONt.trx_mysql_thread_id = p.id; -- 查看事务的undo日志量(判断回滚代价) SELECT trx_id, trx_undo_record_size, trx_undo_record_size /1024/1024ASundo_mb FROMinformation_schema.innodb_trx WHEREtrx_undo_record_size >0;
5.2 性能监控
5.2.1 关键指标
-- InnoDB锁相关指标 SHOWGLOBALSTATUSLIKE'Innodb_row_lock%'; -- Innodb_row_lock_current_waits: 当前等待锁的数量 -- Innodb_row_lock_time: 总锁等待时间(毫秒) -- Innodb_row_lock_time_avg: 平均锁等待时间 -- Innodb_row_lock_time_max: 最大锁等待时间 -- Innodb_row_lock_waits: 总锁等待次数 -- 死锁次数 SHOWGLOBALSTATUSLIKE'Innodb_deadlocks'; -- 锁内存使用 SHOWGLOBALSTATUSLIKE'Innodb_row_lock_memory'; -- 计算锁争用率 SELECT (SELECTVARIABLE_VALUEFROMperformance_schema.global_statusWHEREVARIABLE_NAME ='Innodb_row_lock_waits') / (SELECTVARIABLE_VALUEFROMperformance_schema.global_statusWHEREVARIABLE_NAME ='Questions') *100 ASlock_contention_percent;
5.2.2 监控指标表
| 指标类别 | 指标名称 | 含义 | 告警阈值 |
|---|---|---|---|
| 死锁 | Innodb_deadlocks | 死锁累计次数 | 增长率 > 1/min |
| 锁等待 | Innodb_row_lock_waits | 锁等待累计次数 | 增长率 > 10/sec |
| 锁时间 | Innodb_row_lock_time_avg | 平均锁等待时间(ms) | > 1000 |
| 当前等待 | Innodb_row_lock_current_waits | 当前等待锁数量 | > 10 |
| 长事务 | 运行超过60秒的事务 | 长事务数量 | > 0 |
| 锁表 | Tables_locks_waited | 表锁等待次数 | > 0 |
5.2.3 Prometheus告警规则
groups:
-name:mysql-lock-alerts
rules:
-alert:MySQLDeadlocks
expr:increase(mysql_global_status_innodb_deadlocks[5m])>0
for:1m
labels:
severity:warning
annotations:
summary:"MySQL发生死锁"
description:"{{ $labels.instance }}在过去5分钟内发生{{ $value }}次死锁"
-alert:MySQLHighLockWaits
expr:rate(mysql_global_status_innodb_row_lock_waits[5m])>10
for:5m
labels:
severity:warning
annotations:
summary:"MySQL锁等待频繁"
description:"{{ $labels.instance }}锁等待率为{{ $value }}/秒"
-alert:MySQLLongLockWait
expr:mysql_global_status_innodb_row_lock_time_avg>1000
for:5m
labels:
severity:warning
annotations:
summary:"MySQL锁等待时间过长"
description:"{{ $labels.instance }}平均锁等待时间{{ $value }}ms"
-alert:MySQLLongTransaction
expr:mysql_info_schema_innodb_trx_running_seconds>60
for:1m
labels:
severity:critical
annotations:
summary:"MySQL存在长事务"
description:"{{ $labels.instance }}存在运行超过60秒的事务"
5.3 备份与恢复
5.3.1 事务日志备份
#!/bin/bash
# MySQL binlog备份脚本
BACKUP_DIR="/data/backup/binlog"
MYSQL_USER="backup"
MYSQL_PASS="password"
MYSQL_HOST="localhost"
RETENTION_DAYS=7
mkdir -p$BACKUP_DIR
# 获取当前binlog文件列表
mysql -u$MYSQL_USER-p$MYSQL_PASS-h$MYSQL_HOST-e"SHOW BINARY LOGS;"|
tail -n +2 | awk'{print $1}'|whilereadbinlog;do
# 复制binlog到备份目录
if[ ! -f"$BACKUP_DIR/$binlog"];then
mysqlbinlog -u$MYSQL_USER-p$MYSQL_PASS-h$MYSQL_HOST
--read-from-remote-server$binlog>$BACKUP_DIR/$binlog.sql
gzip$BACKUP_DIR/$binlog.sql
echo"备份$binlog完成"
fi
done
# 清理过期备份
find$BACKUP_DIR-name"*.sql.gz"-mtime +$RETENTION_DAYS-delete
echo"Binlog备份完成"
5.3.2 死锁恢复流程
-- 死锁后的恢复步骤 -- 1. 确认事务状态 SELECT trx_id, trx_state, trx_started, trx_mysql_thread_id, trx_query FROMinformation_schema.innodb_trx; -- 2. 如果事务被回滚,应用程序需要重试 -- 检查应用程序的重试逻辑 -- 3. 如果需要手动回滚 ROLLBACK; -- 4. 检查数据一致性 -- 根据业务逻辑验证数据 -- 5. 分析死锁原因 SHOWENGINEINNODBSTATUSG -- 找到LATEST DETECTED DEADLOCK部分 -- 6. 记录和上报 -- 将死锁信息记录到监控系统
六、总结
6.1 技术要点回顾
MySQL事务与锁机制的核心要点:
1. ACID特性
原子性:通过undo log实现
一致性:通过约束和应用逻辑保证
隔离性:通过锁和MVCC实现
持久性:通过redo log保证
2. 锁类型
行锁:记录锁、间隙锁、临键锁
表锁:意向锁、MDL锁
锁模式:共享锁、排他锁
3. 死锁处理
预防:固定访问顺序、减小事务
检测:innodb_deadlock_detect
恢复:自动回滚一个事务
4. 最佳实践
事务尽量短小
按固定顺序访问资源
合理选择隔离级别
使用合适的锁策略
6.2 进阶学习方向
| 方向 | 内容 | 推荐资源 |
|---|---|---|
| MVCC原理 | 版本链、ReadView机制 | 《MySQL技术内幕:InnoDB存储引擎》 |
| 锁算法 | B+树锁定协议 | MySQL源码 |
| 分布式事务 | XA、TCC、SAGA | Seata框架文档 |
| 死锁检测算法 | 等待图、超时检测 | 数据库系统概论 |
| 性能调优 | 锁粒度优化 | Percona博客 |
6.3 参考资料
MySQL官方文档:https://dev.mysql.com/doc/
《MySQL技术内幕:InnoDB存储引擎》第2版
《高性能MySQL》第4版
Percona Blog:https://www.percona.com/blog/
附录
A. 命令速查表
| 命令 | 说明 | 示例 |
|---|---|---|
| START TRANSACTION | 开始事务 | START TRANSACTION; |
| COMMIT | 提交事务 | COMMIT; |
| ROLLBACK | 回滚事务 | ROLLBACK; |
| SELECT ... FOR UPDATE | 排他锁查询 | SELECT * FROM t WHERE id=1 FOR UPDATE; |
| SELECT ... FOR SHARE | 共享锁查询 | SELECT * FROM t WHERE id=1 FOR SHARE; |
| SHOW ENGINE INNODB STATUS | 查看InnoDB状态 | SHOW ENGINE INNODB STATUSG |
| KILL | 终止连接 | KILL 12345; |
B. 配置参数详解
| 参数 | 默认值 | 说明 | 建议 |
|---|---|---|---|
| transaction_isolation | REPEATABLE-READ | 默认隔离级别 | 根据场景选择 |
| innodb_lock_wait_timeout | 50 | 锁等待超时(秒) | 10-30 |
| innodb_deadlock_detect | ON | 死锁检测开关 | ON |
| innodb_print_all_deadlocks | OFF | 记录所有死锁 | ON |
| innodb_rollback_on_timeout | OFF | 超时回滚整个事务 | OFF |
| autocommit | ON | 自动提交 | ON |
C. 术语表
| 术语 | 英文 | 说明 |
|---|---|---|
| 脏读 | Dirty Read | 读取未提交的数据 |
| 不可重复读 | Non-Repeatable Read | 同一事务两次读取结果不同 |
| 幻读 | Phantom Read | 查询结果集行数变化 |
| 死锁 | Deadlock | 循环等待锁 |
| 间隙锁 | Gap Lock | 锁定索引间隙 |
| 临键锁 | Next-Key Lock | 记录锁+间隙锁 |
| MVCC | Multi-Version Concurrency Control | 多版本并发控制 |
| Undo Log | - | 回滚日志 |
| Redo Log | - | 重做日志 |
| 两阶段锁 | Two-Phase Locking | 加锁和解锁分两阶段 |
-
互联网
+关注
关注
55文章
11357浏览量
110715 -
数据库
+关注
关注
7文章
4092浏览量
68676 -
MySQL
+关注
关注
1文章
938浏览量
29851
原文标题:MySQL事务与锁机制详解:ACID原理与死锁排查
文章出处:【微信号:magedu-Linux,微信公众号:马哥Linux运维】欢迎添加关注!文章转载请注明出处。
发布评论请先 登录
基于MySQL的锁机制
MySQL死锁原因排查技巧详解
事务深度遍历过程详解
Oracle核心技术之事务和锁
MySQL事务的四大隔离级别详解
MySQL中的高级内容详解
MySQL事务与锁机制详解
评论