0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

MySQL事务与锁机制详解

马哥Linux运维 来源:马哥Linux运维 2026-01-27 10:33 次阅读
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

一、概述

1.1 背景介绍

在我担任某互联网金融平台SRE期间,曾遇到过一次严重的线上事故:凌晨3点,监控系统疯狂告警,数据库活跃连接数从平时的200飙升到2000,大量请求超时。紧急排查后发现,一个批量更新任务与在线交易产生了死锁,导致数据库连接被占满。

这次事故持续了40分钟,影响了上万名用户的交易。事后复盘发现,问题根源是开发团队对MySQL事务和锁机制理解不足,写出了容易产生死锁的代码。

从那以后,我花了大量时间研究MySQL的事务和锁机制,并总结出一套完整的排查和预防方法。本文将系统性地讲解MySQL事务的ACID特性、锁的工作原理,以及死锁的排查和解决方案。

1.2 技术特点

MySQL InnoDB存储引擎的事务和锁机制具有以下特点:

ACID事务特性

Atomicity(原子性):事务是不可分割的工作单位

Consistency(一致性):事务执行前后数据保持一致

Isolation(隔离性):并发事务之间相互隔离

Durability(持久性):事务提交后数据永久保存

多粒度锁机制

行锁:锁定单行记录,并发度高

间隙锁:锁定索引间隙,防止幻读

表锁:锁定整张表,开销小但并发度低

意向锁:表级锁,用于协调行锁和表锁

MVCC多版本并发控制

读不阻塞写,写不阻塞读

通过undo log实现一致性读

支持多种隔离级别

1.3 适用场景

场景类型 隔离级别 锁策略 典型应用
高并发读写 READ COMMITTED 最小化锁范围 电商订单
金融交易 REPEATABLE READ 行锁+间隙锁 转账、支付
报表统计 READ COMMITTED 快照读 数据分析
库存扣减 REPEATABLE READ SELECT FOR UPDATE 秒杀系统
批量更新 READ COMMITTED 分批提交 数据迁移

1.4 环境要求

组件 版本要求 说明
MySQL 8.0.35+ / 8.4 LTS 本文基于8.0.35版本
操作系统 Rocky Linux 9 / Ubuntu 24.04 推荐Rocky Linux 9
存储引擎 InnoDB 必须使用InnoDB
内存 16GB+ 足够的缓冲池空间

关键配置要求:

-- 查看InnoDB相关配置
SHOWVARIABLESLIKE'innodb%';

-- 关键配置
innodb_buffer_pool_size = 8G -- 缓冲池大小
innodb_lock_wait_timeout = 50 -- 锁等待超时(秒)
innodb_deadlock_detect = ON -- 开启死锁检测
innodb_print_all_deadlocks = ON -- 打印所有死锁信息
transaction_isolation = REPEATABLE-READ -- 默认隔离级别

二、详细步骤

2.1 准备工作

2.1.1 ACID特性深入理解

原子性(Atomicity)

事务中的所有操作要么全部成功,要么全部失败回滚。MySQL通过undo log实现原子性。

-- 原子性示例:转账操作
STARTTRANSACTION;

-- 操作1:扣减转出账户余额
UPDATEaccountsSETbalance = balance -1000WHEREuser_id =1;

-- 操作2:增加转入账户余额
UPDATEaccountsSETbalance = balance +1000WHEREuser_id =2;

-- 如果两个操作都成功,提交事务
COMMIT;

-- 如果任一操作失败,回滚事务
-- ROLLBACK;

-- 原子性保证:
-- 1. 要么两个账户都更新成功
-- 2. 要么两个账户都保持原状
-- 不会出现钱扣了但没有到账的情况

一致性(Consistency)

事务执行前后,数据库从一个一致状态转换到另一个一致状态。

-- 一致性示例:确保总金额不变
-- 假设系统中只有两个账户,总金额应该始终为10000

-- 事务前检查
SELECTSUM(balance)FROMaccounts; -- 结果:10000

STARTTRANSACTION;
UPDATEaccountsSETbalance = balance -1000WHEREuser_id =1;
UPDATEaccountsSETbalance = balance +1000WHEREuser_id =2;
COMMIT;

-- 事务后检查
SELECTSUM(balance)FROMaccounts; -- 结果仍然:10000

-- 一致性由应用程序和数据库约束共同保证
-- 比如:CHECK约束、外键约束、触发器等

隔离性(Isolation)

并发执行的事务之间相互隔离,一个事务的中间状态对其他事务不可见。

-- 隔离性示例:并发读写

-- 会话1
STARTTRANSACTION;
UPDATEproductsSETstock = stock -1WHEREid=1;
-- 此时还未提交

-- 会话2
SELECTstockFROMproductsWHEREid=1;
-- 根据隔离级别,可能看到更新前或更新后的值

-- MySQL默认使用REPEATABLE READ隔离级别
-- 会话2看到的是事务开始时的快照,即更新前的值

持久性(Durability)

事务一旦提交,其结果就是永久性的,即使系统崩溃也不会丢失。

-- 持久性由redo log保证
-- 事务提交时,redo log会刷入磁盘

-- 相关配置
SHOWVARIABLESLIKE'innodb_flush_log_at_trx_commit';

-- innodb_flush_log_at_trx_commit = 1(默认)
-- 每次事务提交都将redo log刷入磁盘
-- 最安全但性能略低

-- innodb_flush_log_at_trx_commit = 2
-- 每次提交写入OS缓存,每秒刷盘
-- 性能好,但断电可能丢失1秒数据

-- innodb_flush_log_at_trx_commit = 0
-- 每秒写入OS缓存并刷盘
-- 性能最好,但可能丢失1秒数据

2.1.2 事务隔离级别

MySQL支持四种隔离级别,解决不同的并发问题:

隔离级别 脏读 不可重复读 幻读 性能
READ UNCOMMITTED 可能 可能 可能 最高
READ COMMITTED 不可能 可能 可能
REPEATABLE READ 不可能 不可能 InnoDB防止
SERIALIZABLE 不可能 不可能 不可能 最低

-- 查看当前隔离级别
SELECT@@transaction_isolation;
-- 或
SHOWVARIABLESLIKE'transaction_isolation';

-- 设置会话隔离级别
SETSESSIONTRANSACTIONISOLATIONLEVELREADCOMMITTED;

-- 设置全局隔离级别(需要重连生效)
SETGLOBALTRANSACTIONISOLATIONLEVELREADCOMMITTED;

-- 在配置文件中设置
-- [mysqld]
-- transaction-isolation = READ-COMMITTED

脏读演示

-- 会话1(设置为READ UNCOMMITTED)
SETSESSIONTRANSACTIONISOLATIONLEVELREADUNCOMMITTED;
STARTTRANSACTION;

-- 会话2
STARTTRANSACTION;
UPDATEaccountsSETbalance =500WHEREuser_id =1;
-- 未提交

-- 会话1
SELECTbalanceFROMaccountsWHEREuser_id =1;
-- 结果:500(读到了未提交的数据,即脏读)

-- 会话2
ROLLBACK; -- 回滚

-- 会话1再次查询
SELECTbalanceFROMaccountsWHEREuser_id =1;
-- 结果可能是原来的值,之前读到的500是"脏数据"

不可重复读演示

-- 会话1(READ COMMITTED级别)
SETSESSIONTRANSACTIONISOLATIONLEVELREADCOMMITTED;
STARTTRANSACTION;

SELECTbalanceFROMaccountsWHEREuser_id =1;
-- 结果:1000

-- 会话2
UPDATEaccountsSETbalance =500WHEREuser_id =1;
COMMIT;

-- 会话1再次查询
SELECTbalanceFROMaccountsWHEREuser_id =1;
-- 结果:500(同一事务内两次读取结果不同,即不可重复读)

COMMIT;

幻读演示

-- 会话1(即使REPEATABLE READ也可能有幻读场景)
STARTTRANSACTION;

SELECTCOUNT(*)FROMordersWHEREuser_id =1;
-- 结果:10

-- 会话2
INSERTINTOorders (user_id, amount)VALUES(1,100);
COMMIT;

-- 会话1使用当前读
SELECTCOUNT(*)FROMordersWHEREuser_id =1FORUPDATE;
-- 结果:11(看到了新插入的行,即幻读)

-- 注意:InnoDB的REPEATABLE READ通过间隙锁很大程度上防止了幻读
-- 但在某些边界情况下仍可能发生

2.1.3 创建测试环境

-- 创建测试数据库
CREATEDATABASEIFNOTEXISTSlock_demo;
USElock_demo;

-- 创建账户表
CREATETABLEaccounts (
 idBIGINTUNSIGNEDAUTO_INCREMENT PRIMARYKEY,
  user_idBIGINTUNSIGNEDNOTNULL,
  balanceDECIMAL(15,2)NOTNULLDEFAULT0.00,
 versionINTUNSIGNEDNOTNULLDEFAULT0, -- 乐观锁版本号
  created_at DATETIMEDEFAULTCURRENT_TIMESTAMP,
  updated_at DATETIMEDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,
 UNIQUEKEYuk_user_id (user_id)
)ENGINE=InnoDB;

-- 创建订单表
CREATETABLEorders (
 idBIGINTUNSIGNEDAUTO_INCREMENT PRIMARYKEY,
  order_noVARCHAR(32)NOTNULL,
  user_idBIGINTUNSIGNEDNOTNULL,
  amountDECIMAL(10,2)NOTNULL,
 statusTINYINTDEFAULT0,
  created_at DATETIMEDEFAULTCURRENT_TIMESTAMP,
 UNIQUEKEYuk_order_no (order_no),
 INDEXidx_user_id (user_id),
 INDEXidx_status (status),
 INDEXidx_user_status (user_id,status)
)ENGINE=InnoDB;

-- 创建库存表
CREATETABLEinventory (
 idBIGINTUNSIGNEDAUTO_INCREMENT PRIMARYKEY,
  product_idBIGINTUNSIGNEDNOTNULL,
  stockINTUNSIGNEDNOTNULLDEFAULT0,
 versionINTUNSIGNEDNOTNULLDEFAULT0,
 UNIQUEKEYuk_product_id (product_id)
)ENGINE=InnoDB;

-- 插入测试数据
INSERTINTOaccounts (user_id, balance)VALUES
(1,10000.00), (2,5000.00), (3,3000.00);

INSERTINTOinventory (product_id, stock)VALUES
(1001,100), (1002,200), (1003,50);

-- 生成订单测试数据
INSERTINTOorders (order_no, user_id, amount,status)
SELECT
 CONCAT('ORD',LPAD(seq,10,'0')),
 FLOOR(RAND() *3) +1,
 ROUND(RAND() *1000,2),
 FLOOR(RAND() *5)
FROM(
 SELECT@row:= @row+1asseqFROM
  (SELECT0UNIONSELECT1UNIONSELECT2UNIONSELECT3UNIONSELECT4
  UNIONSELECT5UNIONSELECT6UNIONSELECT7UNIONSELECT8UNIONSELECT9) t1,
  (SELECT0UNIONSELECT1UNIONSELECT2UNIONSELECT3UNIONSELECT4
  UNIONSELECT5UNIONSELECT6UNIONSELECT7UNIONSELECT8UNIONSELECT9) t2,
  (SELECT0UNIONSELECT1UNIONSELECT2UNIONSELECT3UNIONSELECT4
  UNIONSELECT5UNIONSELECT6UNIONSELECT7UNIONSELECT8UNIONSELECT9) t3,
  (SELECT@row:=0) r
) seq_table;

2.2 核心配置

2.2.1 InnoDB锁类型详解

1. 共享锁(S锁)和排他锁(X锁)

-- 共享锁(S锁):允许其他事务读,但不允许写
SELECT*FROMaccountsWHEREuser_id =1LOCKINSHAREMODE;
-- MySQL 8.0 新语法
SELECT*FROMaccountsWHEREuser_id =1FORSHARE;

-- 排他锁(X锁):不允许其他事务读写(当前读除外)
SELECT*FROMaccountsWHEREuser_id =1FORUPDATE;

-- 锁兼容性矩阵
-- |   | S锁 | X锁 |
-- | S锁 | 兼容 | 冲突 |
-- | X锁 | 冲突 | 冲突 |

2. 意向锁(IS/IX锁)

-- 意向锁是表级锁,用于表明事务稍后会在表中的行上加什么类型的锁
-- 意向共享锁(IS):事务准备给数据行加共享锁
-- 意向排他锁(IX):事务准备给数据行加排他锁

-- 查看意向锁
SELECT*FROMperformance_schema.data_locksWHERELOCK_TYPE ='TABLE';

-- 意向锁的作用:
-- 加表锁时,不需要遍历每一行来检查是否有行锁
-- 只需检查意向锁即可

-- 兼容性矩阵:
-- |   | IS | IX | S | X |
-- | IS | 兼容 | 兼容 | 兼容 | 冲突 |
-- | IX | 兼容 | 兼容 | 冲突 | 冲突 |
-- | S  | 兼容 | 冲突 | 兼容 | 冲突 |
-- | X  | 冲突 | 冲突 | 冲突 | 冲突 |

3. 记录锁(Record Lock)

-- 记录锁锁定索引记录
-- 如果表没有索引,InnoDB会创建隐藏的聚簇索引,并使用该索引进行记录锁定

STARTTRANSACTION;
-- 锁定id=1的记录
SELECT*FROMaccountsWHEREid=1FORUPDATE;
-- 此时其他事务无法修改id=1的行

-- 查看记录锁
SELECT*FROMperformance_schema.data_locks
WHERELOCK_TYPE ='RECORD'ANDLOCK_MODE ='X,REC_NOT_GAP';

4. 间隙锁(Gap Lock)

-- 间隙锁锁定索引记录之间的间隙,防止其他事务插入
-- 只在REPEATABLE READ及以上隔离级别生效

-- 假设accounts表中有id: 1, 5, 10
STARTTRANSACTION;
SELECT*FROMaccountsWHEREidBETWEEN3AND7FORUPDATE;
-- 这会锁定(1,5)和(5,10)的间隙

-- 其他事务无法在这些间隙中插入新记录
-- INSERT INTO accounts (id, user_id, balance) VALUES (3, 3, 1000); -- 会等待

-- 查看间隙锁
SELECT*FROMperformance_schema.data_locks
WHERELOCK_TYPE ='RECORD'ANDLOCK_MODE ='X,GAP';

5. 临键锁(Next-Key Lock)

-- 临键锁 = 记录锁 + 间隙锁
-- 锁定一个索引记录及其前面的间隙

-- 假设有id: 1, 5, 10
STARTTRANSACTION;
SELECT*FROMaccountsWHEREid=5FORUPDATE;
-- 在REPEATABLE READ级别,这会锁定:
-- 1. 记录id=5
-- 2. 间隙(1,5)

-- 临键锁是InnoDB默认的锁类型,用于防止幻读

6. 插入意向锁(Insert Intention Lock)

-- 插入意向锁是一种特殊的间隙锁
-- 多个事务可以同时获取同一间隙的插入意向锁(只要插入位置不同)

-- 会话1
STARTTRANSACTION;
INSERTINTOaccounts (id, user_id, balance)VALUES(3,3,1000);
-- 获取(1,5)间隙的插入意向锁,插入id=3

-- 会话2
STARTTRANSACTION;
INSERTINTOaccounts (id, user_id, balance)VALUES(4,4,2000);
-- 也可以获取(1,5)间隙的插入意向锁,插入id=4
-- 两个插入可以并发执行,因为插入位置不冲突

2.2.2 锁监控配置

-- 开启锁监控
SETGLOBALinnodb_status_output =ON;
SETGLOBALinnodb_status_output_locks =ON;

-- 查看InnoDB状态(包含锁信息)
SHOWENGINEINNODBSTATUSG

-- 使用performance_schema监控锁
-- data_locks:当前持有的锁
SELECT*FROMperformance_schema.data_locks;

-- data_lock_waits:锁等待关系
SELECT*FROMperformance_schema.data_lock_waits;

-- 查看等待锁的事务
SELECT
  r.trx_idASwaiting_trx_id,
  r.trx_mysql_thread_idASwaiting_thread,
  r.trx_queryASwaiting_query,
  b.trx_idASblocking_trx_id,
  b.trx_mysql_thread_idASblocking_thread,
  b.trx_queryASblocking_query
FROMperformance_schema.data_lock_waits w
INNERJOINinformation_schema.innodb_trx bONb.trx_id = w.BLOCKING_ENGINE_TRANSACTION_ID
INNERJOINinformation_schema.innodb_trx rONr.trx_id = w.REQUESTING_ENGINE_TRANSACTION_ID;

2.2.3 死锁检测配置

-- 开启死锁检测(默认开启)
SETGLOBALinnodb_deadlock_detect =ON;

-- 设置锁等待超时时间
SETGLOBALinnodb_lock_wait_timeout =50; -- 默认50秒

-- 打印所有死锁信息到错误日志
SETGLOBALinnodb_print_all_deadlocks =ON;

-- 配置文件设置
-- [mysqld]
-- innodb_deadlock_detect = ON
-- innodb_lock_wait_timeout = 10
-- innodb_print_all_deadlocks = ON

2.3 启动和验证

2.3.1 验证锁机制

-- 测试记录锁
-- 会话1
STARTTRANSACTION;
SELECT*FROMaccountsWHEREid=1FORUPDATE;
-- 不提交,保持锁定

-- 会话2
STARTTRANSACTION;
-- 尝试更新同一行
UPDATEaccountsSETbalance = balance +100WHEREid=1;
-- 此语句会等待,因为id=1被会话1锁定

-- 会话1
COMMIT; -- 提交后会话2的更新才会执行

-- 查看锁等待情况
SELECT*FROMperformance_schema.data_lock_waits;

2.3.2 验证死锁检测

-- 构造死锁场景
-- 会话1
STARTTRANSACTION;
UPDATEaccountsSETbalance = balance -100WHEREid=1;

-- 会话2
STARTTRANSACTION;
UPDATEaccountsSETbalance = balance -100WHEREid=2;

-- 会话1
UPDATEaccountsSETbalance = balance +100WHEREid=2;
-- 等待会话2释放id=2的锁

-- 会话2
UPDATEaccountsSETbalance = balance +100WHEREid=1;
-- 等待会话1释放id=1的锁
-- 此时发生死锁!

-- MySQL会检测到死锁,回滚其中一个事务
-- ERROR 1213 (40001): Deadlock found when trying to get lock; try restarting transaction

-- 查看最近的死锁信息
SHOWENGINEINNODBSTATUSG
-- 找到"LATEST DETECTED DEADLOCK"部分

三、示例代码和配置

3.1 完整配置示例

3.1.1 死锁案例分析

案例1:相反顺序更新

-- 最常见的死锁场景:两个事务以相反顺序更新行

-- 事务1:先更新A,再更新B
STARTTRANSACTION;
UPDATEaccountsSETbalance = balance -100WHEREuser_id =1; -- 锁定user_id=1
-- 等待...
UPDATEaccountsSETbalance = balance +100WHEREuser_id =2; -- 需要锁定user_id=2

-- 事务2:先更新B,再更新A
STARTTRANSACTION;
UPDATEaccountsSETbalance = balance -50WHEREuser_id =2; -- 锁定user_id=2
-- 等待...
UPDATEaccountsSETbalance = balance +50WHEREuser_id =1; -- 需要锁定user_id=1

-- 死锁!事务1持有A等待B,事务2持有B等待A

-- 解决方案:固定更新顺序
-- 始终按user_id升序或降序更新
STARTTRANSACTION;
-- 方法1:应用层排序
UPDATEaccountsSETbalance = balance -100WHEREuser_id =1;
UPDATEaccountsSETbalance = balance +100WHEREuser_id =2;
COMMIT;

案例2:间隙锁死锁

-- 间隙锁导致的死锁

-- 表中有id: 1, 10, 20
-- 事务1
STARTTRANSACTION;
SELECT*FROMaccountsWHEREid=5FORUPDATE; -- 锁定间隙(1,10)
-- 等待...

-- 事务2
STARTTRANSACTION;
SELECT*FROMaccountsWHEREid=15FORUPDATE; -- 锁定间隙(10,20)
INSERTINTOaccounts (id, user_id, balance)VALUES(7,7,1000); -- 等待事务1

-- 事务1
INSERTINTOaccounts (id, user_id, balance)VALUES(12,12,2000); -- 等待事务2
-- 死锁!

-- 解决方案:
-- 1. 降低隔离级别到READ COMMITTED(不使用间隙锁)
-- 2. 使用唯一索引精确匹配,避免间隙锁
-- 3. 减少锁定范围

案例3:唯一键冲突死锁

-- 唯一键冲突可能导致死锁

-- 表中已有 order_no = 'ORD001'
-- 事务1
STARTTRANSACTION;
INSERTINTOorders (order_no, user_id, amount)VALUES('ORD002',1,100);

-- 事务2
STARTTRANSACTION;
INSERTINTOorders (order_no, user_id, amount)VALUES('ORD002',2,200);
-- 唯一键冲突,等待事务1

-- 事务3
STARTTRANSACTION;
INSERTINTOorders (order_no, user_id, amount)VALUES('ORD002',3,300);
-- 也等待

-- 事务1回滚
ROLLBACK;
-- 事务2和事务3可能死锁,因为它们都在等待锁

-- 解决方案:
-- 1. 使用INSERT ... ON DUPLICATE KEY UPDATE
-- 2. 使用INSERT IGNORE
-- 3. 先查询再插入(在应用层处理)

3.1.2 悲观锁实现

/**
* 悲观锁实现转账功能
* 使用SELECT FOR UPDATE锁定记录
*/
@Service
@Transactional
publicclassTransferService{

 @Autowired
 privateJdbcTemplate jdbcTemplate;

 /**
  * 转账 - 悲观锁实现
  * 关键:按固定顺序获取锁,避免死锁
  */
 publicvoidtransfer(Long fromUserId, Long toUserId, BigDecimal amount){
   // 按user_id排序,确保获取锁的顺序一致
    Long firstUserId = Math.min(fromUserId, toUserId);
    Long secondUserId = Math.max(fromUserId, toUserId);

   try{
     // 按顺序锁定账户
      BigDecimal firstBalance = lockAndGetBalance(firstUserId);
      BigDecimal secondBalance = lockAndGetBalance(secondUserId);

     // 确定转出和转入账户的余额
      BigDecimal fromBalance = fromUserId.equals(firstUserId) ? firstBalance : secondBalance;
      BigDecimal toBalance = fromUserId.equals(firstUserId) ? secondBalance : firstBalance;

     // 检查余额
     if(fromBalance.compareTo(amount) < 0) {
                thrownew RuntimeException("余额不足");
            }

            // 执行转账
            updateBalance(fromUserId, fromBalance.subtract(amount));
            updateBalance(toUserId, toBalance.add(amount));

        } catch (Exception e) {
            // 异常时事务自动回滚
            thrownew RuntimeException("转账失败: " + e.getMessage(), e);
        }
    }

    private BigDecimal lockAndGetBalance(Long userId) {
        // SELECT FOR UPDATE 锁定记录
        String sql = "SELECT balance FROM accounts WHERE user_id = ? FOR UPDATE";
        return jdbcTemplate.queryForObject(sql, BigDecimal.class, userId);
    }

    private void updateBalance(Long userId, BigDecimal newBalance) {
        String sql = "UPDATE accounts SET balance = ? WHERE user_id = ?";
        jdbcTemplate.update(sql, newBalance, userId);
    }
}

3.1.3 乐观锁实现

/**
* 乐观锁实现库存扣减
* 使用版本号或CAS机制
*/
@Service
publicclassInventoryService{

 @Autowired
 privateJdbcTemplate jdbcTemplate;

 /**
  * 扣减库存 - 乐观锁实现
  *@returntrue 成功,false 失败(库存不足或版本冲突)
  */
 publicbooleandecreaseStock(Long productId,intquantity){
   intmaxRetries =3;

   for(inti =0; i < maxRetries; i++) {
            // 查询当前库存和版本号
            String selectSql = "SELECT stock, version FROM inventory WHERE product_id = ?";
            Map result = jdbcTemplate.queryForMap(selectSql, productId);

     intcurrentStock = (Integer) result.get("stock");
     intcurrentVersion = (Integer) result.get("version");

     // 检查库存
     if(currentStock < quantity) {
                returnfalse;  // 库存不足
            }

            // 使用版本号进行CAS更新
            String updateSql = """
                UPDATE inventory
                SET stock = stock - ?, version = version + 1
                WHERE product_id = ? AND version = ?
            """;

            int affected = jdbcTemplate.update(updateSql, quantity, productId, currentVersion);

            if (affected >0) {
       returntrue; // 更新成功
      }

     // 版本冲突,重试
     try{
        Thread.sleep(10+ (long)(Math.random() *50)); // 随机延迟
      }catch(InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }

   returnfalse; // 重试次数用尽
  }

 /**
  * 扣减库存 - 使用行级条件更新(更简洁的乐观锁)
  */
 publicbooleandecreaseStockSimple(Long productId,intquantity){
    String sql ="""
      UPDATE inventory
      SET stock = stock - ?
      WHERE product_id = ? AND stock >= ?
    """;

   intaffected = jdbcTemplate.update(sql, quantity, productId, quantity);
   returnaffected >0;
  }
}

3.1.4 分布式锁实现

/**
* 基于Redis的分布式锁实现
* 解决跨实例的并发问题
*/
@Component
publicclassDistributedLock{

 @Autowired
 privateStringRedisTemplate redisTemplate;

 privatestaticfinallongDEFAULT_EXPIRE_TIME =30000; // 30秒

 /**
  * 获取锁
  *@paramlockKey 锁的key
  *@paramrequestId 请求标识(用于释放锁时验证)
  *@paramexpireTime 过期时间(毫秒)
  */
 publicbooleantryLock(String lockKey, String requestId,longexpireTime){
    Boolean success = redisTemplate.opsForValue().setIfAbsent(
      lockKey,
      requestId,
      expireTime,
      TimeUnit.MILLISECONDS
    );
   returnBoolean.TRUE.equals(success);
  }

 /**
  * 释放锁
  * 使用Lua脚本保证原子性
  */
 publicbooleanunlock(String lockKey, String requestId){
    String script ="""
      if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
      else
        return 0
      end
    """;

    Long result = redisTemplate.execute(
     newDefaultRedisScript<>(script, Long.class),
     Collections.singletonList(lockKey),
     requestId
    );

   returnLong.valueOf(1).equals(result);
  }

 /**
  * 带自动续期的锁
  * 使用watchdog机制
  */
 publicbooleantryLockWithWatchdog(String lockKey, String requestId){
   booleanlocked = tryLock(lockKey, requestId, DEFAULT_EXPIRE_TIME);

   if(locked) {
     // 启动watchdog线程,定期续期
      startWatchdog(lockKey, requestId);
    }

   returnlocked;
  }

 privatevoidstartWatchdog(String lockKey, String requestId){
    Thread watchdog =newThread(() -> {
     while(!Thread.currentThread().isInterrupted()) {
       try{
          Thread.sleep(DEFAULT_EXPIRE_TIME /3); // 每10秒续期一次

         // 续期
          String script ="""
            if redis.call('get', KEYS[1]) == ARGV[1] then
              return redis.call('pexpire', KEYS[1], ARGV[2])
            else
              return 0
            end
          """;

          Long result = redisTemplate.execute(
           newDefaultRedisScript<>(script, Long.class),
           Collections.singletonList(lockKey),
           requestId,
           String.valueOf(DEFAULT_EXPIRE_TIME)
          );

         if(!Long.valueOf(1).equals(result)) {
           break; // 锁已被释放或被其他进程获取
          }
        }catch(InterruptedException e) {
          Thread.currentThread().interrupt();
         break;
        }
      }
    });

    watchdog.setDaemon(true);
    watchdog.start();
  }
}

/**
* 使用分布式锁的示例
*/
@Service
publicclassOrderService{

 @Autowired
 privateDistributedLock distributedLock;

 @Autowired
 privateInventoryService inventoryService;

 /**
  * 创建订单 - 使用分布式锁保证幂等性
  */
 publicOrdercreateOrder(String orderNo, Long productId,intquantity){
    String lockKey ="lock"+ orderNo;
    String requestId = UUID.randomUUID().toString();

   try{
     // 获取分布式锁
     if(!distributedLock.tryLock(lockKey, requestId,30000)) {
       thrownewRuntimeException("获取锁失败,请稍后重试");
      }

     // 检查订单是否已存在(幂等性检查)
      Order existingOrder = orderMapper.findByOrderNo(orderNo);
     if(existingOrder !=null) {
       returnexistingOrder; // 返回已存在的订单
      }

     // 扣减库存
     if(!inventoryService.decreaseStock(productId, quantity)) {
       thrownewRuntimeException("库存不足");
      }

     // 创建订单
      Order order =newOrder();
      order.setOrderNo(orderNo);
      order.setProductId(productId);
      order.setQuantity(quantity);
      orderMapper.insert(order);

     returnorder;

    }finally{
     // 释放锁
      distributedLock.unlock(lockKey, requestId);
    }
  }
}

3.2 实际应用案例

3.2.1 秒杀系统防超卖

/**
* 秒杀系统防超卖方案
*/
@Service
publicclassSeckillService{

 @Autowired
 privateRedisTemplate redisTemplate;

 @Autowired
 privateJdbcTemplate jdbcTemplate;

 /**
  * 方案1:Redis预扣库存 + 异步入库
  */
 publicSeckillResultseckillWithRedis(Long userId, Long productId){
    String stockKey ="seckill"+ productId;
    String orderKey ="seckill"+ productId;

   // 1. 检查是否已购买(防止重复购买)
    Boolean isMember = redisTemplate.opsForSet().isMember(orderKey, userId);
   if(Boolean.TRUE.equals(isMember)) {
     returnSeckillResult.fail("您已参与过此活动");
    }

   // 2. 预扣库存(原子操作)
    Long stock = redisTemplate.opsForValue().decrement(stockKey);
   if(stock ==null|| stock < 0) {
            // 库存不足,恢复
            redisTemplate.opsForValue().increment(stockKey);
            return SeckillResult.fail("商品已售罄");
        }

        try {
            // 3. 记录用户已购买
            redisTemplate.opsForSet().add(orderKey, userId);

            // 4. 发送消息到MQ,异步创建订单
            OrderMessage message = new OrderMessage(userId, productId, 1);
            rabbitTemplate.convertAndSend("seckill.exchange", "seckill.order", message);

            return SeckillResult.success("秒杀成功,订单创建中");

        } catch (Exception e) {
            // 异常时恢复库存
            redisTemplate.opsForValue().increment(stockKey);
            redisTemplate.opsForSet().remove(orderKey, userId);
            return SeckillResult.fail("系统繁忙,请稍后重试");
        }
    }

    /**
     * 方案2:数据库行级锁
     * 适用于库存量大、并发相对较低的场景
     */
    @Transactional
    public SeckillResult seckillWithDbLock(Long userId, Long productId, int quantity) {
        // 1. 查询库存(加锁)
        String selectSql = """
            SELECT stock FROM inventory WHERE product_id = ? FOR UPDATE
        """;
        Integer stock = jdbcTemplate.queryForObject(selectSql, Integer.class, productId);

        if (stock == null || stock < quantity) {
            return SeckillResult.fail("库存不足");
        }

        // 2. 扣减库存
        String updateSql = "UPDATE inventory SET stock = stock - ? WHERE product_id = ?";
        jdbcTemplate.update(updateSql, quantity, productId);

        // 3. 创建订单
        String insertSql = """
            INSERT INTO orders (order_no, user_id, product_id, quantity, status)
            VALUES (?, ?, ?, ?, 1)
        """;
        String orderNo = generateOrderNo();
        jdbcTemplate.update(insertSql, orderNo, userId, productId, quantity);

        return SeckillResult.success(orderNo);
    }

    /**
     * 方案3:乐观锁 + 限制重试次数
     */
    public SeckillResult seckillWithOptimisticLock(Long userId, Long productId, int quantity) {
        int maxRetries = 3;

        for (int i = 0; i < maxRetries; i++) {
            // 使用乐观锁扣减库存
            String sql = """
                UPDATE inventory
                SET stock = stock - ?, version = version + 1
                WHERE product_id = ? AND stock >= ?
      """;

     intaffected = jdbcTemplate.update(sql, quantity, productId, quantity);

     if(affected >0) {
       // 扣减成功,创建订单
        String orderNo = createOrder(userId, productId, quantity);
       returnSeckillResult.success(orderNo);
      }

     // 可能是库存不足或版本冲突,检查库存
      Integer stock = jdbcTemplate.queryForObject(
       "SELECT stock FROM inventory WHERE product_id = ?",
        Integer.class,productId
      );

     if(stock ==null|| stock < quantity) {
                return SeckillResult.fail("库存不足");
            }

            // 版本冲突,短暂等待后重试
            try {
                Thread.sleep(10 + (long)(Math.random() * 30));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }

        return SeckillResult.fail("系统繁忙,请稍后重试");
    }
}

3.2.2 死锁自动检测和告警

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MySQL死锁监控和告警脚本
"""

importpymysql
importtime
importjson
importrequests
fromdatetimeimportdatetime


classDeadlockMonitor:
 """死锁监控器"""

 def__init__(self, host, user, password, database, alert_webhook=None):
    self.conn_params = {
     'host': host,
     'user': user,
     'password': password,
     'database': database,
     'charset':'utf8mb4'
    }
    self.alert_webhook = alert_webhook
    self.last_deadlock_info =None

 defget_innodb_status(self):
   """获取InnoDB状态"""
    conn = pymysql.connect(**self.conn_params)
   try:
     withconn.cursor()ascursor:
        cursor.execute("SHOW ENGINE INNODB STATUS")
        result = cursor.fetchone()
       returnresult[2]ifresultelseNone
   finally:
      conn.close()

 defparse_deadlock(self, status):
   """解析死锁信息"""
   ifnotstatus:
     returnNone

    lines = status.split('
')
    in_deadlock_section =False
    deadlock_info = []
    current_section = []

   forlineinlines:
     if'LATEST DETECTED DEADLOCK'inline:
        in_deadlock_section =True
       continue

     ifin_deadlock_section:
       ifline.startswith('---')and'TRANSACTION'notinline:
         ifcurrent_section:
            deadlock_info.append('
'.join(current_section))
            current_section = []
         continue

       if'WE ROLL BACK'inline:
          current_section.append(line)
          deadlock_info.append('
'.join(current_section))
         break

        current_section.append(line)

   return'

'.join(deadlock_info)ifdeadlock_infoelseNone

 defget_lock_waits(self):
   """获取当前锁等待情况"""
    conn = pymysql.connect(**self.conn_params)
   try:
     withconn.cursor(pymysql.cursors.DictCursor)ascursor:
        sql ="""
          SELECT
            r.trx_id AS waiting_trx_id,
            r.trx_mysql_thread_id AS waiting_thread,
            TIMESTAMPDIFF(SECOND, r.trx_wait_started, NOW()) AS wait_seconds,
            r.trx_query AS waiting_query,
            b.trx_id AS blocking_trx_id,
            b.trx_mysql_thread_id AS blocking_thread,
            b.trx_query AS blocking_query
          FROM performance_schema.data_lock_waits w
          INNER JOIN information_schema.innodb_trx b
            ON b.trx_id = w.BLOCKING_ENGINE_TRANSACTION_ID
          INNER JOIN information_schema.innodb_trx r
            ON r.trx_id = w.REQUESTING_ENGINE_TRANSACTION_ID
        """
        cursor.execute(sql)
       returncursor.fetchall()
   finally:
      conn.close()

 defalert(self, title, content):
   """发送告警"""
    print(f"[ALERT]{title}")
    print(content)

   ifself.alert_webhook:
     try:
        payload = {
         'msgtype':'markdown',
         'markdown': {
           'title': title,
           'text':f"##{title}

{content}"
          }
        }
        requests.post(self.alert_webhook, json=payload, timeout=5)
     exceptExceptionase:
        print(f"发送告警失败:{e}")

 defcheck_deadlock(self):
   """检查死锁"""
    status = self.get_innodb_status()
    deadlock_info = self.parse_deadlock(status)

   ifdeadlock_infoanddeadlock_info != self.last_deadlock_info:
      self.last_deadlock_info = deadlock_info
      self.alert(
       "MySQL检测到死锁",
       f"**时间**:{datetime.now()}

**详情**:
```
{deadlock_info[:2000]}
```"
      )
     returnTrue
   returnFalse

 defcheck_lock_waits(self, threshold_seconds=30):
   """检查长时间锁等待"""
    lock_waits = self.get_lock_waits()

   forwaitinlock_waits:
     ifwait['wait_seconds']andwait['wait_seconds'] > threshold_seconds:
        self.alert(
         "MySQL锁等待超时",
         f"**等待时间**:{wait['wait_seconds']}秒
"
         f"**等待线程**:{wait['waiting_thread']}
"
         f"**等待SQL**:{wait['waiting_query']}
"
         f"**阻塞线程**:{wait['blocking_thread']}
"
         f"**阻塞SQL**:{wait['blocking_query']}"
        )

 defrun(self, interval=10):
   """运行监控"""
    print(f"死锁监控已启动,检查间隔:{interval}秒")

   whileTrue:
     try:
        self.check_deadlock()
        self.check_lock_waits(threshold_seconds=30)
     exceptExceptionase:
        print(f"监控异常:{e}")

      time.sleep(interval)


if__name__ =='__main__':
  monitor = DeadlockMonitor(
    host='192.168.1.11',
    user='monitor',
    password='password',
    database='lock_demo',
    alert_webhook='https://your-webhook-url.com'
  )
  monitor.run()

3.2.3 事务超时和慢事务监控

-- 查询运行时间超过指定秒数的事务
SELECT
  trx_id,
  trx_mysql_thread_idASthread_id,
  trx_state,
  trx_started,
 TIMESTAMPDIFF(SECOND, trx_started,NOW())ASrunning_seconds,
  trx_rows_locked,
  trx_rows_modified,
  trx_lock_structs,
  trx_query
FROMinformation_schema.innodb_trx
WHERETIMESTAMPDIFF(SECOND, trx_started,NOW()) >60
ORDERBYrunning_secondsDESC;

-- 查询持有锁最多的事务
SELECT
  trx_id,
  trx_mysql_thread_id,
  trx_rows_locked,
  trx_lock_structs,
  trx_tables_locked,
  trx_query
FROMinformation_schema.innodb_trx
ORDERBYtrx_rows_lockedDESC
LIMIT10;

-- 查询锁定行数最多的表
SELECT
  object_schema,
  object_name,
 COUNT(*)aslock_count
FROMperformance_schema.data_locks
WHERElock_type ='RECORD'
GROUPBYobject_schema, object_name
ORDERBYlock_countDESC;

-- 创建慢事务告警存储过程
DELIMITER //

CREATEPROCEDUREcheck_slow_transactions(INthreshold_secondsINT)
BEGIN
 DECLAREdoneINTDEFAULTFALSE;
 DECLAREv_trx_idVARCHAR(100);
 DECLAREv_thread_idBIGINT;
 DECLAREv_running_secondsINT;
 DECLAREv_queryTEXT;

 DECLAREcurCURSORFOR
   SELECT
      trx_id,
      trx_mysql_thread_id,
     TIMESTAMPDIFF(SECOND, trx_started,NOW()),
      trx_query
   FROMinformation_schema.innodb_trx
   WHERETIMESTAMPDIFF(SECOND, trx_started,NOW()) > threshold_seconds;

 DECLARECONTINUEHANDLERFORNOTFOUNDSETdone =TRUE;

 -- 创建告警日志表
 CREATETABLEIFNOTEXISTSslow_transaction_log (
   idBIGINTAUTO_INCREMENT PRIMARYKEY,
    trx_idVARCHAR(100),
    thread_idBIGINT,
    running_secondsINT,
   queryTEXT,
    logged_at DATETIMEDEFAULTCURRENT_TIMESTAMP
  );

  OPEN cur;

  read_loop: LOOP
    FETCH cur INTO v_trx_id, v_thread_id, v_running_seconds, v_query;
    IF done THEN
      LEAVE read_loop;
   ENDIF;

   -- 记录慢事务
   INSERTINTOslow_transaction_log (trx_id, thread_id, running_seconds,query)
   VALUES(v_trx_id, v_thread_id, v_running_seconds, v_query);

 ENDLOOP;

  CLOSE cur;
END//

DELIMITER ;

-- 使用Event定期检查
CREATEEVENTIFNOTEXISTScheck_slow_transactions_event
ONSCHEDULE EVERY1MINUTE
DOCALLcheck_slow_transactions(60);

四、最佳实践和注意事项

4.1 最佳实践

4.1.1 事务设计原则

-- 1. 事务尽量短小
-- 差:大事务
STARTTRANSACTION;
-- 处理100万条记录
UPDATEordersSETstatus=1WHEREcreated_at < '2024-01-01';  -- 锁定大量行
COMMIT;

-- 好:分批处理
DELIMITER //
CREATEPROCEDURE batch_update_orders()
BEGIN
    DECLARE affected_rows INTDEFAULT1;
    DECLARE batch_size INTDEFAULT1000;

    WHILE affected_rows > 0DO
   STARTTRANSACTION;

   UPDATEorders
   SETstatus=1
   WHEREcreated_at < '2024-01-01'ANDstatus = 0
        LIMIT batch_size;

        SET affected_rows = ROW_COUNT();
        COMMIT;

        -- 短暂暂停,避免长时间占用资源
        DOSLEEP(0.1);
    ENDWHILE;
END //
DELIMITER ;

-- 2. 避免在事务中进行耗时操作
-- 差:事务中调用外部接口
STARTTRANSACTION;
INSERTINTO orders (...) VALUES (...);
-- 调用支付接口(可能需要几秒)
-- 长时间持有锁
COMMIT;

-- 好:先准备数据,再开启事务
-- 准备阶段(无事务)
-- 调用支付接口,获取结果

STARTTRANSACTION;
INSERTINTO orders (...) VALUES (...);  -- 快速完成
INSERTINTO payments (...) VALUES (...);
COMMIT;

-- 3. 按固定顺序访问资源
-- 统一按主键升序访问,避免死锁

4.1.2 锁优化策略

-- 1. 尽量使用索引访问数据
-- 差:无索引导致锁表
UPDATEordersSETstatus=1WHEREorder_date ='2024-01-01';
-- 如果order_date没有索引,可能锁定大量行

-- 好:有索引时锁定范围精确
CREATEINDEXidx_order_dateONorders(order_date);
UPDATEordersSETstatus=1WHEREorder_date ='2024-01-01';

-- 2. 减少锁定范围
-- 差:锁定所有匹配的行
SELECT*FROMordersWHEREuser_id =1FORUPDATE;

-- 好:只锁定需要的行
SELECT*FROMordersWHEREuser_id =1ANDstatus=0FORUPDATE;

-- 3. 合理使用锁模式
-- 只读场景使用共享锁
SELECT*FROMordersWHEREid=1LOCKINSHAREMODE;

-- 需要修改时才使用排他锁
SELECT*FROMordersWHEREid=1FORUPDATE;

-- 4. 避免锁升级
-- 差:从共享锁升级到排他锁可能导致死锁
SELECT*FROMordersWHEREid=1LOCKINSHAREMODE;
-- 后续需要更新...
UPDATEordersSETstatus=1WHEREid=1; -- 可能死锁

-- 好:直接使用排他锁
SELECT*FROMordersWHEREid=1FORUPDATE;
UPDATEordersSETstatus=1WHEREid=1;

4.1.3 隔离级别选择

-- 不同场景的隔离级别推荐

-- 1. 高并发读写场景:READ COMMITTED
-- 优点:锁范围小,不使用间隙锁
-- 缺点:可能出现不可重复读
SETSESSIONTRANSACTIONISOLATIONLEVELREADCOMMITTED;

-- 2. 金融交易场景:REPEATABLE READ(默认)
-- 优点:一致性读,防止幻读
-- 缺点:间隙锁可能导致更多死锁
SETSESSIONTRANSACTIONISOLATIONLEVELREPEATABLEREAD;

-- 3. 报表查询场景:使用一致性快照
STARTTRANSACTIONWITHCONSISTENTSNAPSHOT;
SELECT*FROMordersWHERE...;
-- 读取的是事务开始时的快照,不会被其他事务影响
COMMIT;

-- 4. 批量导入场景:可以临时使用READ UNCOMMITTED
SETSESSIONTRANSACTIONISOLATIONLEVELREADUNCOMMITTED;
-- 导入完成后恢复
SETSESSIONTRANSACTIONISOLATIONLEVELREPEATABLEREAD;

4.2 注意事项

4.2.1 配置注意

配置项 建议值 说明
innodb_lock_wait_timeout 10-50 锁等待超时,根据业务调整
innodb_deadlock_detect ON 开启死锁检测
innodb_print_all_deadlocks ON 记录所有死锁到错误日志
transaction_isolation READ-COMMITTED / REPEATABLE-READ 根据场景选择
innodb_rollback_on_timeout OFF 超时时只回滚当前语句,不回滚整个事务
autocommit ON 默认开启自动提交

4.2.2 常见错误

错误类型 错误信息 原因分析 解决方案
死锁 Deadlock found 循环等待 固定访问顺序
锁超时 Lock wait timeout exceeded 持锁时间过长 减小事务,增加超时
事务太大 Transaction too large 修改行数过多 分批处理
表锁 Table lock wait 无索引导致表锁 添加适当索引
间隙锁冲突 Conflict on gap lock 并发插入同一间隙 降低隔离级别

4.2.3 死锁预防清单

开发阶段:
-固定访问顺序:多表操作按表名或主键排序
-减小事务范围:只在必要时开启事务
-使用低隔离级别:非必要不用REPEATABLEREAD
-添加必要索引:避免全表扫描锁定

部署阶段:
-开启死锁检测:innodb_deadlock_detect=ON
-设置合理超时:innodb_lock_wait_timeout=10
-记录死锁日志:innodb_print_all_deadlocks=ON
-配置监控告警:死锁次数、锁等待时间

运维阶段:
-定期分析死锁:查看SHOWENGINEINNODBSTATUS
-监控长事务:超过60秒的事务告警
-监控锁等待:等待超过10秒告警
-分析慢查询:优化持锁时间长的SQL

五、故障排查和监控

5.1 故障排查

5.1.1 死锁分析

-- 查看最近的死锁信息
SHOWENGINEINNODBSTATUSG

-- 输出中的关键部分:
-- LATEST DETECTED DEADLOCK
-- ------------------------
-- 2024-01-01 1000 0x7f...
-- *** (1) TRANSACTION:
-- TRANSACTION 12345, ACTIVE 1 sec starting index read
-- mysql tables in use 1, locked 1
-- LOCK WAIT 3 lock struct(s), heap size 1136, 2 row lock(s)
-- MySQL thread id 100, OS thread handle 123, query id 456 192.168.1.10 app_user updating
-- UPDATE accounts SET balance = balance - 100 WHERE user_id = 1
--
-- *** (1) WAITING FOR THIS LOCK TO BE GRANTED:
-- RECORD LOCKS space id 123 page no 3 n bits 72 index uk_user_id of table `db`.`accounts`
-- trx id 12345 lock_mode X locks rec but not gap waiting
--
-- *** (2) TRANSACTION:
-- TRANSACTION 12346, ACTIVE 1 sec starting index read
-- ...
--
-- *** (2) HOLDS THE LOCK(S):
-- RECORD LOCKS space id 123 page no 3 n bits 72 index uk_user_id of table `db`.`accounts`
-- ...
--
-- *** (2) WAITING FOR THIS LOCK TO BE GRANTED:
-- ...
--
-- *** WE ROLL BACK TRANSACTION (1)

-- 分析步骤:
-- 1. 找到两个事务的SQL
-- 2. 分析锁等待关系
-- 3. 确定死锁原因
-- 4. 制定解决方案

死锁日志解读

# 从错误日志中提取死锁信息
grep -A 100"LATEST DETECTED DEADLOCK"/var/log/mysql/error.log | head -100

# 使用pt-deadlock-logger记录死锁
pt-deadlock-logger --host=localhost --user=root --password=xxx 
  --dest h=localhost,D=monitor,t=deadlocks 
  --run-time=1h

5.1.2 锁等待分析

-- 查看当前锁等待
SELECT
  waiting.trx_idASwaiting_trx_id,
  waiting.trx_mysql_thread_idASwaiting_thread,
  waiting.trx_queryASwaiting_query,
 TIMESTAMPDIFF(SECOND, waiting.trx_wait_started,NOW())ASwaiting_seconds,
  blocking.trx_idASblocking_trx_id,
  blocking.trx_mysql_thread_idASblocking_thread,
  blocking.trx_queryASblocking_query,
 TIMESTAMPDIFF(SECOND, blocking.trx_started,NOW())ASblocking_duration
FROMinformation_schema.innodb_trx waiting
INNERJOINperformance_schema.data_lock_waits dlw
 ONwaiting.trx_id = dlw.REQUESTING_ENGINE_TRANSACTION_ID
INNERJOINinformation_schema.innodb_trx blocking
 ONblocking.trx_id = dlw.BLOCKING_ENGINE_TRANSACTION_ID;

-- 查看锁的详细信息
SELECT
  dl.ENGINE_LOCK_ID,
  dl.ENGINE_TRANSACTION_ID,
  dl.OBJECT_SCHEMA,
  dl.OBJECT_NAME,
  dl.INDEX_NAME,
  dl.LOCK_TYPE,
  dl.LOCK_MODE,
  dl.LOCK_STATUS,
  dl.LOCK_DATA
FROMperformance_schema.data_locks dl;

-- 终止阻塞事务(谨慎使用)
-- 先确认阻塞线程ID
KILL12345;

5.1.3 长事务分析

-- 查找运行时间最长的事务
SELECT
  trx_id,
  trx_mysql_thread_idASthread_id,
  trx_state,
  trx_started,
 NOW() - trx_startedASrunning_time,
  trx_rows_locked,
  trx_rows_modified,
  trx_tables_in_use,
  trx_tables_locked,
  trx_query
FROMinformation_schema.innodb_trx
ORDERBYtrx_startedASC;

-- 查看事务对应的连接信息
SELECT
  t.trx_id,
  t.trx_mysql_thread_id,
  p.user,
  p.host,
  p.db,
  p.command,
  p.time,
  p.state,
  t.trx_query
FROMinformation_schema.innodb_trx t
INNERJOINinformation_schema.processlist p
 ONt.trx_mysql_thread_id = p.id;

-- 查看事务的undo日志量(判断回滚代价)
SELECT
  trx_id,
  trx_undo_record_size,
  trx_undo_record_size /1024/1024ASundo_mb
FROMinformation_schema.innodb_trx
WHEREtrx_undo_record_size >0;

5.2 性能监控

5.2.1 关键指标

-- InnoDB锁相关指标
SHOWGLOBALSTATUSLIKE'Innodb_row_lock%';
-- Innodb_row_lock_current_waits: 当前等待锁的数量
-- Innodb_row_lock_time: 总锁等待时间(毫秒)
-- Innodb_row_lock_time_avg: 平均锁等待时间
-- Innodb_row_lock_time_max: 最大锁等待时间
-- Innodb_row_lock_waits: 总锁等待次数

-- 死锁次数
SHOWGLOBALSTATUSLIKE'Innodb_deadlocks';

-- 锁内存使用
SHOWGLOBALSTATUSLIKE'Innodb_row_lock_memory';

-- 计算锁争用率
SELECT
  (SELECTVARIABLE_VALUEFROMperformance_schema.global_statusWHEREVARIABLE_NAME ='Innodb_row_lock_waits') /
  (SELECTVARIABLE_VALUEFROMperformance_schema.global_statusWHEREVARIABLE_NAME ='Questions') *100
ASlock_contention_percent;

5.2.2 监控指标表

指标类别 指标名称 含义 告警阈值
死锁 Innodb_deadlocks 死锁累计次数 增长率 > 1/min
锁等待 Innodb_row_lock_waits 锁等待累计次数 增长率 > 10/sec
锁时间 Innodb_row_lock_time_avg 平均锁等待时间(ms) > 1000
当前等待 Innodb_row_lock_current_waits 当前等待锁数量 > 10
长事务 运行超过60秒的事务 长事务数量 > 0
锁表 Tables_locks_waited 表锁等待次数 > 0

5.2.3 Prometheus告警规则

groups:
-name:mysql-lock-alerts
 rules:
  -alert:MySQLDeadlocks
   expr:increase(mysql_global_status_innodb_deadlocks[5m])>0
   for:1m
   labels:
    severity:warning
   annotations:
    summary:"MySQL发生死锁"
    description:"{{ $labels.instance }}在过去5分钟内发生{{ $value }}次死锁"

  -alert:MySQLHighLockWaits
   expr:rate(mysql_global_status_innodb_row_lock_waits[5m])>10
   for:5m
   labels:
    severity:warning
   annotations:
    summary:"MySQL锁等待频繁"
    description:"{{ $labels.instance }}锁等待率为{{ $value }}/秒"

  -alert:MySQLLongLockWait
   expr:mysql_global_status_innodb_row_lock_time_avg>1000
   for:5m
   labels:
    severity:warning
   annotations:
    summary:"MySQL锁等待时间过长"
    description:"{{ $labels.instance }}平均锁等待时间{{ $value }}ms"

  -alert:MySQLLongTransaction
   expr:mysql_info_schema_innodb_trx_running_seconds>60
   for:1m
   labels:
    severity:critical
   annotations:
    summary:"MySQL存在长事务"
    description:"{{ $labels.instance }}存在运行超过60秒的事务"

5.3 备份与恢复

5.3.1 事务日志备份

#!/bin/bash
# MySQL binlog备份脚本

BACKUP_DIR="/data/backup/binlog"
MYSQL_USER="backup"
MYSQL_PASS="password"
MYSQL_HOST="localhost"
RETENTION_DAYS=7

mkdir -p$BACKUP_DIR

# 获取当前binlog文件列表
mysql -u$MYSQL_USER-p$MYSQL_PASS-h$MYSQL_HOST-e"SHOW BINARY LOGS;"| 
  tail -n +2 | awk'{print $1}'|whilereadbinlog;do

 # 复制binlog到备份目录
 if[ ! -f"$BACKUP_DIR/$binlog"];then
    mysqlbinlog -u$MYSQL_USER-p$MYSQL_PASS-h$MYSQL_HOST
      --read-from-remote-server$binlog>$BACKUP_DIR/$binlog.sql
    gzip$BACKUP_DIR/$binlog.sql
   echo"备份$binlog完成"
 fi
done

# 清理过期备份
find$BACKUP_DIR-name"*.sql.gz"-mtime +$RETENTION_DAYS-delete

echo"Binlog备份完成"

5.3.2 死锁恢复流程

-- 死锁后的恢复步骤

-- 1. 确认事务状态
SELECT
  trx_id,
  trx_state,
  trx_started,
  trx_mysql_thread_id,
  trx_query
FROMinformation_schema.innodb_trx;

-- 2. 如果事务被回滚,应用程序需要重试
-- 检查应用程序的重试逻辑

-- 3. 如果需要手动回滚
ROLLBACK;

-- 4. 检查数据一致性
-- 根据业务逻辑验证数据

-- 5. 分析死锁原因
SHOWENGINEINNODBSTATUSG
-- 找到LATEST DETECTED DEADLOCK部分

-- 6. 记录和上报
-- 将死锁信息记录到监控系统

六、总结

6.1 技术要点回顾

MySQL事务与锁机制的核心要点:

1. ACID特性

原子性:通过undo log实现

一致性:通过约束和应用逻辑保证

隔离性:通过锁和MVCC实现

持久性:通过redo log保证

2. 锁类型

行锁:记录锁、间隙锁、临键锁

表锁:意向锁、MDL锁

锁模式:共享锁、排他锁

3. 死锁处理

预防:固定访问顺序、减小事务

检测:innodb_deadlock_detect

恢复:自动回滚一个事务

4. 最佳实践

事务尽量短小

按固定顺序访问资源

合理选择隔离级别

使用合适的锁策略

6.2 进阶学习方向

方向 内容 推荐资源
MVCC原理 版本链、ReadView机制 《MySQL技术内幕:InnoDB存储引擎》
算法 B+树锁定协议 MySQL源码
分布式事务 XA、TCC、SAGA Seata框架文档
死锁检测算法 等待图、超时检测 数据库系统概论
性能调优 锁粒度优化 Percona博客

6.3 参考资料

MySQL官方文档:https://dev.mysql.com/doc/

《MySQL技术内幕:InnoDB存储引擎》第2版

《高性能MySQL》第4版

Percona Blog:https://www.percona.com/blog/

附录

A. 命令速查表

命令 说明 示例
START TRANSACTION 开始事务 START TRANSACTION;
COMMIT 提交事务 COMMIT;
ROLLBACK 回滚事务 ROLLBACK;
SELECT ... FOR UPDATE 排他锁查询 SELECT * FROM t WHERE id=1 FOR UPDATE;
SELECT ... FOR SHARE 共享锁查询 SELECT * FROM t WHERE id=1 FOR SHARE;
SHOW ENGINE INNODB STATUS 查看InnoDB状态 SHOW ENGINE INNODB STATUSG
KILL 终止连接 KILL 12345;

B. 配置参数详解

参数 默认值 说明 建议
transaction_isolation REPEATABLE-READ 默认隔离级别 根据场景选择
innodb_lock_wait_timeout 50 锁等待超时(秒) 10-30
innodb_deadlock_detect ON 死锁检测开关 ON
innodb_print_all_deadlocks OFF 记录所有死锁 ON
innodb_rollback_on_timeout OFF 超时回滚整个事务 OFF
autocommit ON 自动提交 ON

C. 术语表

术语 英文 说明
脏读 Dirty Read 读取未提交的数据
不可重复读 Non-Repeatable Read 同一事务两次读取结果不同
幻读 Phantom Read 查询结果集行数变化
死锁 Deadlock 循环等待锁
间隙锁 Gap Lock 锁定索引间隙
临键锁 Next-Key Lock 记录锁+间隙锁
MVCC Multi-Version Concurrency Control 多版本并发控制
Undo Log - 回滚日志
Redo Log - 重做日志
两阶段锁 Two-Phase Locking 加锁和解锁分两阶段

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 互联网
    +关注

    关注

    55

    文章

    11357

    浏览量

    110715
  • 数据库
    +关注

    关注

    7

    文章

    4092

    浏览量

    68676
  • MySQL
    +关注

    关注

    1

    文章

    938

    浏览量

    29851

原文标题:MySQL事务与锁机制详解:ACID原理与死锁排查

文章出处:【微信号:magedu-Linux,微信公众号:马哥Linux运维】欢迎添加关注!文章转载请注明出处。

收藏 人收藏
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

    评论

    相关推荐
    热点推荐

    基于MySQL机制

    在数据库系统中,为了保证数据的一致性和并发控制,机制发挥着至关重要的作用。尤其在关系型数据库MySQL中,其独特的机制设计更是赢得了许多
    的头像 发表于 09-30 11:16 1650次阅读

    详解Mysql数据库InnoDB存储引擎事务

    关于Mysql数据库InnoDB存储引擎事务的一点理解
    发表于 05-13 10:11

    MySQL的索引、事务、视图介绍

    MySQL--索引、事务、视图
    发表于 06-15 07:05

    MySQL死锁原因排查技巧详解

    在查询相关资料和咨询jameszhou后,知道了这个实际和innodb 引擎的写机制有关,innodb执行写事务操作时,实际是先取得索引中该行的行(即使该表上没有任何索引,那么innodb会在后台创建一个隐藏的聚集主键索引),
    发表于 10-19 16:38 5239次阅读
    <b class='flag-5'>MySQL</b>死锁原因排查技巧<b class='flag-5'>详解</b>

    事务深度遍历过程详解

    )。 在一个事务中定义USER标签的name作为模式可索引的属性,然后使用分离的事务实际设置一个真实用户的值: 许多数据库管理系统使用机制来管理对同一个数据库的同时访问。Neo4j
    发表于 12-11 12:34 1470次阅读

    Oracle核心技术之事务

    是Oracle数据库引擎用来同步多个用户,同时对同一个数据块访问的一种机制可以消除多用户操作同一个资源产生的隐患。本章重点讨论有关事务的概念。
    发表于 03-26 10:24 3次下载

    MySQL事务的四大隔离级别详解

    之前分析一个死锁问题,发现自己对数据库隔离级别理解还不够深入,所以趁着这几天假期,整理一下MySQL事务的四大隔离级别相关知识,希望对大家有帮助~ 事务 什么是事务
    的头像 发表于 11-27 16:07 3269次阅读

    MySQL中的高级内容详解

    MySQL 进阶!!! 本文思维导图如下。 事务控制和锁定语句 我们知道,MyISAM 和 MEMORY 存储引擎支持表级锁定(table-level locking),InnoDB 存储引擎支持行级锁定
    的头像 发表于 03-11 16:55 2938次阅读
    <b class='flag-5'>MySQL</b>中的高级内容<b class='flag-5'>详解</b>

    关于Mysql的20道问题详解

    1.什么Mysql事务事务的四大特性?事务带来的什么问题? Mysql事务的隔离级别分为四
    的头像 发表于 10-26 09:56 1977次阅读
    关于<b class='flag-5'>Mysql</b>的20道问题<b class='flag-5'>详解</b>

    数据库的机制真正的原理

    MySQL数据库中,为了解决并发问题,引入了很多的机制,很多时候,数据库的是在有数据库操作的过程中自动添加的。所以,这就导致很多程序员经常会忽略数据库的
    的头像 发表于 11-12 09:33 3081次阅读

    MySQL事务隔离级别要实际解决的问题

    MySQL 是支持多事务并发执行的。否则来一个事务处理一个请求,处理一个人请求的时候,其它事务都等着,那估计都没人敢用MySQL作为数据库,
    的头像 发表于 11-17 17:00 3482次阅读
    <b class='flag-5'>MySQL</b><b class='flag-5'>事务</b>隔离级别要实际解决的问题

    一文彻底搞懂MySQL究竟的啥1

    MySQL系列文章已经鸽了挺久了,最近赶紧挤了挤时间,和大家聊一聊MySQL。 只要学计算机,「``」永远是一个绕不过的话题。
    的头像 发表于 03-03 10:12 1162次阅读
    一文彻底搞懂<b class='flag-5'>MySQL</b><b class='flag-5'>锁</b>究竟<b class='flag-5'>锁</b>的啥1

    一文彻底搞懂MySQL究竟的啥2

    MySQL系列文章已经鸽了挺久了,最近赶紧挤了挤时间,和大家聊一聊MySQL。 只要学计算机,「``」永远是一个绕不过的话题。
    的头像 发表于 03-03 10:13 1109次阅读
    一文彻底搞懂<b class='flag-5'>MySQL</b><b class='flag-5'>锁</b>究竟<b class='flag-5'>锁</b>的啥2

    MYSQL事务的底层原理详解

    事务的实现机制上,MySQL 采用的是 WAL:Write-ahead logging,预写式日志,机制来实现的。
    的头像 发表于 11-15 10:10 1348次阅读
    <b class='flag-5'>MYSQL</b><b class='flag-5'>事务</b>的底层原理<b class='flag-5'>详解</b>

    阿里二面:了解MySQL事务底层原理吗

    MySQL 是如何来解决脏写这种问题的?没错,就是MySQL 在开启一个事务的时候,他会将某条记录和事务做一个绑定。这个其实和 JV
    的头像 发表于 01-18 16:34 971次阅读
    阿里二面:了解<b class='flag-5'>MySQL</b><b class='flag-5'>事务</b>底层原理吗