0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

Redis Sentinel和Cluster模式如何选择

马哥Linux运维 来源:马哥Linux运维 2025-09-08 09:31 次阅读
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

Redis集群模式选择:Sentinel vs Cluster深度对比实战指南

引言:为什么这个选择如此关键?

在我十年的运维生涯中,见过太多团队在Redis集群方案选择上踩坑。有的团队盲目追求"高大上"的Cluster模式,结果运维复杂度爆表;有的团队死守Sentinel不放,最后扩展性成了瓶颈。今天,我想通过这篇万字长文,把我在生产环境中积累的经验全部分享给你。

记得2019年,我们团队面临一个艰难的抉择:电商大促在即,Redis承载的QPS即将突破50万,是继续优化现有的Sentinel架构,还是彻底迁移到Cluster?这个决策直接关系到大促的成败。最终,通过深入的技术分析和压测验证,我们做出了正确的选择,不仅顺利度过大促,还将系统可用性提升到了99.99%。

这篇文章,我会把所有的技术细节、踩坑经验、最佳实践都分享出来。无论你是正在选型的架构师,还是想深入了解Redis的运维工程师,相信都能从中获得价值。

一、架构本质:理解两种模式的设计哲学

1.1 Redis Sentinel:主从复制的智能守护者

Redis Sentinel本质上是一个分布式监控系统,它并不改变Redis主从复制的基本架构,而是在其上增加了一层智能化的故障检测和自动故障转移机制。

核心设计理念:

简单性优先:保持Redis原有的主从架构不变,只增加监控层

数据完整性:所有数据都在主节点,保证强一致性

运维友好:配置简单,易于理解和维护

让我通过一个真实案例来说明Sentinel的工作原理

# Sentinel配置示例 - sentinel.conf
port 26379
dir/tmp
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 30000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 180000

# 配置解析
# - monitor: 监控名为mymaster的主节点
# - 2: 表示需要2个Sentinel同意才能判定主节点失效(quorum)
# - down-after-milliseconds: 30秒内无响应则认为主观下线
# - parallel-syncs: 故障转移时,同时进行同步的从节点数量
# - failover-timeout: 故障转移超时时间

Sentinel的工作流程深度剖析:

1.主观下线(SDOWN)检测

# 模拟Sentinel的心跳检测逻辑
importtime
importredis

classSentinelMonitor:
 def__init__(self, master_addr, check_interval=1):
   self.master_addr = master_addr
   self.check_interval = check_interval
   self.last_ping_time = time.time()
   self.down_after_ms =30000# 30秒
   
 defcheck_master_health(self):
   try:
      r = redis.Redis(host=self.master_addr[0],
             port=self.master_addr[1],
             socket_timeout=1)
      r.ping()
     self.last_ping_time = time.time()
     return"MASTER_OK"
   except:
     if(time.time() -self.last_ping_time) *1000>self.down_after_ms:
       return"SDOWN"# 主观下线
     return"CHECKING"

2.客观下线(ODOWN)判定

# Sentinel间的协商机制
classSentinelCluster:
 def__init__(self, sentinels, quorum):
   self.sentinels = sentinels
   self.quorum = quorum
   
 defis_master_down(self, master_addr):
    down_votes =0
   forsentinelinself.sentinels:
     ifsentinel.check_master_health() =="SDOWN":
        down_votes +=1
   
   ifdown_votes >=self.quorum:
     return"ODOWN"# 客观下线,触发故障转移
   return"ALIVE"

3.Leader选举与故障转移

Sentinel使用Raft协议的简化版本进行Leader选举。这里是核心流程:

# 故障转移脚本示例
#!/bin/bash

# 步骤1:选举Leader Sentinel
functionelect_leader() {
 localepoch=$(redis-cli -p 26379 sentinel get-master-addr-by-name mymaster | grep epoch)
 localleader_id=$(redis-cli -p 26379 sentinel masters | grep leader-id)
 echo"Current epoch:$epoch, Leader:$leader_id"
}

# 步骤2:选择新的主节点
functionselect_new_master() {
 # 优先级最高的从节点
 # 复制偏移量最大的从节点(数据最新)
 # run_id最小的从节点(启动时间最早)
  redis-cli -p 26379 sentinel slaves mymaster | 
    awk'/slave-priority/{print $2}'| 
   sort-n |head-1
}

# 步骤3:执行故障转移
functionperform_failover() {
 localnew_master=$1
 
 # 将选中的从节点提升为主节点
  redis-cli -h$new_masterslaveof no one
 
 # 将其他从节点重新指向新主节点
 forslavein$(get_other_slaves);do
    redis-cli -h$slaveslaveof$new_master6379
 done
 
 # 更新客户端配置
  update_client_config$new_master
}

1.2 Redis Cluster:分布式哈希的艺术

Redis Cluster是一个完全不同的架构思路,它通过数据分片实现了真正的分布式存储。

核心设计理念:

水平扩展:通过增加节点线性提升容量和性能

中心:没有代理层,客户端直连数据节点

高可用内置:每个主节点都可以有多个从节点

Cluster的槽位机制详解:

Redis Cluster将整个数据空间划分为16384个槽位(slot),每个键通过CRC16算法映射到特定槽位:

# Redis Cluster的槽位计算实现
defkeyHashSlot(key):
 """计算key对应的槽位"""
 # 处理hash tag的情况
  s = key.find('{')
 ifs != -1:
    e = key.find('}', s+1)
   ife != -1ande > s+1:
      key = key[s+1:e]
 
 # CRC16算法
  crc = crc16(key.encode())
 returncrc &0x3FFF# 16383 = 0x3FFF

# 槽位分配示例
classClusterNode:
 def__init__(self, node_id, slots_range):
   self.node_id = node_id
   self.slots = slots_range
   self.data = {}
 
 defis_my_slot(self, slot):
   returnslotinself.slots
 
 defhandle_key(self, key, value=None):
    slot = keyHashSlot(key)
   ifself.is_my_slot(slot):
     ifvalueisnotNone:
       self.data[key] = value
       return"OK"
     returnself.data.get(key)
   else:
     # 返回MOVED错误,告知客户端正确的节点
      correct_node =self.find_node_for_slot(slot)
     returnf"MOVED{slot}{correct_node}"

Cluster的通信协议:Gossip的精妙设计

# Gossip协议实现示例
importrandom
importtime

classGossipProtocol:
 def__init__(self, node_id, all_nodes):
   self.node_id = node_id
   self.all_nodes = all_nodes
   self.node_states = {} # 存储其他节点的状态信息
   self.heartbeat_interval =1# 1秒
   
 defgossip_round(self):
   """执行一轮Gossip通信"""
   # 随机选择节点进行通信
    target_nodes = random.sample(
      [nforninself.all_nodesifn !=self.node_id],
     min(3,len(self.all_nodes)-1) # 每次最多与3个节点通信
    )
   
   fortargetintarget_nodes:
     self.exchange_info(target)
 
 defexchange_info(self, target_node):
   """与目标节点交换信息"""
    my_info = {
     'node_id':self.node_id,
     'timestamp': time.time(),
     'slots':self.get_my_slots(),
     'state':'ok',
     'config_epoch':self.config_epoch
    }
   
   # 发送PING消息
    response =self.send_ping(target_node, my_info)
   
   # 处理PONG响应
   ifresponse:
     self.update_node_state(target_node, response)
     
 defdetect_failure(self):
   """故障检测逻辑"""
    current_time = time.time()
   fornode_id, stateinself.node_states.items():
      last_seen = state.get('last_seen',0)
     ifcurrent_time - last_seen >30: # 30秒未响应
       self.mark_node_as_fail(node_id)

二、性能对比:用数据说话

2.1 基准测试环境搭建

为了公平对比两种模式的性能,我搭建了如下测试环境:

# 测试环境配置
硬件配置:
CPU:IntelXeonGold6248R@3.0GHz(48核)
内存:256GBDDR43200MHz
磁盘:NVMeSSD3.2TB
网络:万兆网卡

软件版本:
Redis:7.0.11
OS:CentOS8.5
Kernel:5.4.0

测试工具:
-redis-benchmark
-memtier_benchmark
-自研压测工具

网络拓扑:
-3个主节点+3个从节点
-客户端与Redis节点同机房
-网络延迟< 0.1ms

2.2 性能测试结果

场景1:单键操作性能对比

# 测试脚本
importtime
importredis
fromredis.sentinelimportSentinel
fromredisclusterimportRedisCluster

defbenchmark_single_key_ops(client, operation_count=1000000):
 """单键操作性能测试"""
  results = {
   'set': [],
   'get': [],
   'incr': [],
   'del': []
  }
 
 # SET操作测试
  start = time.time()
 foriinrange(operation_count):
    client.set(f'key_{i}',f'value_{i}')
  results['set'] = (time.time() - start) / operation_count *1000# ms
 
 # GET操作测试
  start = time.time()
 foriinrange(operation_count):
    client.get(f'key_{i}')
  results['get'] = (time.time() - start) / operation_count *1000
 
 returnresults

# Sentinel模式测试
sentinel = Sentinel([('localhost',26379)])
master = sentinel.master_for('mymaster', socket_timeout=0.1)
sentinel_results = benchmark_single_key_ops(master)

# Cluster模式测试
startup_nodes = [
  {"host":"127.0.0.1","port":"7000"},
  {"host":"127.0.0.1","port":"7001"},
  {"host":"127.0.0.1","port":"7002"}
]
rc = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)
cluster_results = benchmark_single_key_ops(rc)

测试结果数据:

操作类型 Sentinel模式 Cluster模式 性能差异
SET (10万QPS) 0.082ms 0.095ms +15.8%
GET (10万QPS) 0.076ms 0.089ms +17.1%
INCR (10万QPS) 0.079ms 0.091ms +15.2%
Pipeline SET (1000条) 8.2ms 12.6ms +53.7%
MGET (100个key) 0.92ms 3.87ms +320.7%

场景2:批量操作性能对比

# 使用redis-benchmark进行批量操作测试

# Sentinel模式 - Pipeline批量写入
redis-benchmark -h 127.0.0.1 -p 6379 -tset-n 1000000 -P 100 -q
SET: 892857.14 requests per second

# Cluster模式 - Pipeline批量写入(注意:需要同槽位)
redis-benchmark -h 127.0.0.1 -p 7000 -tset-n 1000000 -P 100 -q
SET: 657894.74 requests per second

# 跨槽位批量操作性能测试
foriin{1..10000};do
  redis-cli -c -p 7000eval"
    for i=1,100 do
      redis.call('set', 'key'..math.random(1,1000000), 'value')
    end
  "0
done
# 平均耗时:15.3ms(由于需要多次网络往返)

2.3 内存使用对比

# 内存占用分析脚本
defanalyze_memory_usage():
 """分析两种模式的内存占用"""
 
 # Sentinel模式内存分析
  sentinel_info = {
   'used_memory':'8.5GB',
   'used_memory_rss':'9.2GB',
   'mem_fragmentation_ratio':1.08,
   'overhead': {
     'replication_buffer':'256MB',
     'client_buffer':'128MB',
     'aof_buffer':'64MB'
    }
  }
 
 # Cluster模式内存分析
  cluster_info = {
   'used_memory':'9.8GB', # 相同数据量
   'used_memory_rss':'11.1GB',
   'mem_fragmentation_ratio':1.13,
   'overhead': {
     'cluster_state':'512MB', # 集群状态信息
     'gossip_buffer':'256MB',
     'migration_buffer':'128MB',
     'slots_bitmap':'64MB'
    }
  }
 
 # 内存额外开销对比
  sentinel_overhead =sum(sentinel_info['overhead'].values())
  cluster_overhead =sum(cluster_info['overhead'].values())
 
 print(f"Sentinel额外内存开销:{sentinel_overhead}MB")
 print(f"Cluster额外内存开销:{cluster_overhead}MB")
 print(f"Cluster相比Sentinel多占用:{cluster_overhead - sentinel_overhead}MB")

三、运维复杂度:真实场景的挑战

3.1 部署复杂度对比

Sentinel模式部署实战:

#!/bin/bash
# Sentinel一键部署脚本

# 配置参数
REDIS_VERSION="7.0.11"
MASTER_IP="192.168.1.10"
SLAVE_IPS=("192.168.1.11""192.168.1.12")
SENTINEL_IPS=("192.168.1.20""192.168.1.21""192.168.1.22")

# 部署主节点
functiondeploy_master() {
  ssh$MASTER_IP<< 'EOF'
        # 安装Redis
        wget https://download.redis.io/releases/redis-${REDIS_VERSION}.tar.gz
        tar xzf redis-${REDIS_VERSION}.tar.gz
        cd redis-${REDIS_VERSION}
        make && make install
        
        # 配置主节点
        cat > /etc/redis.conf << 'EOC'
        bind 0.0.0.0
        port 6379
        daemonize yes
        pidfile /var/run/redis.pid
        logfile /var/log/redis.log
        dir /data/redis
        
        # 持久化配置
        save 900 1
        save 300 10
        save 60 10000
        
        # 安全配置
        requirepass yourpassword
        masterauth yourpassword
        
        # 性能优化
        maxmemory 8gb
        maxmemory-policy allkeys-lru
        tcp-backlog 511
        tcp-keepalive 60
EOC
        
        # 启动Redis
        redis-server /etc/redis.conf
EOF
}

# 部署从节点
function deploy_slaves() {
    for slave_ip in "${SLAVE_IPS[@]}"; do
        ssh $slave_ip << EOF
            # 复制主节点配置
            scp $MASTER_IP:/etc/redis.conf /etc/redis.conf
            
            # 添加从节点特定配置
            echo "slaveof $MASTER_IP 6379" >> /etc/redis.conf
      echo "slave-read-only yes" >> /etc/redis.conf
     
      # 启动从节点
      redis-server /etc/redis.conf
EOF
 done
}

# 部署Sentinel节点
functiondeploy_sentinels() {
 forsentinel_ipin"${SENTINEL_IPS[@]}";do
    ssh$sentinel_ip<< EOF
            # 创建Sentinel配置
            cat > /etc/sentinel.conf << 'EOC'
            port 26379
            daemonize yes
            pidfile /var/run/redis-sentinel.pid
            logfile /var/log/redis-sentinel.log
            dir /tmp
            
            # 监控配置
            sentinel monitor mymaster $MASTER_IP 6379 2
            sentinel auth-pass mymaster yourpassword
            sentinel down-after-milliseconds mymaster 30000
            sentinel parallel-syncs mymaster 1
            sentinel failover-timeout mymaster 180000
            
            # 通知脚本
            sentinel notification-script mymaster /usr/local/bin/notify.sh
EOC
            
            # 启动Sentinel
            redis-sentinel /etc/sentinel.conf
EOF
    done
}

# 执行部署
deploy_master
deploy_slaves
deploy_sentinels

echo "Sentinel集群部署完成!"

Cluster模式部署实战:

#!/bin/bash
# Cluster一键部署脚本

# 配置参数
CLUSTER_NODES=("192.168.1.30:7000""192.168.1.31:7001""192.168.1.32:7002"
       "192.168.1.33:7003""192.168.1.34:7004""192.168.1.35:7005")

# 部署所有节点
functiondeploy_cluster_nodes() {
 fornodein"${CLUSTER_NODES[@]}";do
    IFS=':'read-r ip port <<< "$node"
        
        ssh $ip << EOF
            # 创建集群节点配置
            mkdir -p /data/redis-cluster/$port
            cat > /data/redis-cluster/$port/redis.conf << 'EOC'
            port $port
            cluster-enabled yes
            cluster-config-file nodes-$port.conf
            cluster-node-timeout 5000
            appendonly yes
            appendfilename "appendonly-$port.aof"
            dbfilename dump-$port.rdb
            logfile /var/log/redis-$port.log
            daemonize yes
            
            # 集群特定配置
            cluster-require-full-coverage no
            cluster-migration-barrier 1
            cluster-replica-validity-factor 10
            
            # 性能配置
            tcp-backlog 511
            timeout 0
            tcp-keepalive 300
EOC
            
            # 启动节点
            redis-server /data/redis-cluster/$port/redis.conf
EOF
    done
}

# 创建集群
function create_cluster() {
    # 使用redis-cli创建集群
    redis-cli --cluster create 
        192.168.1.30:7000 192.168.1.31:7001 192.168.1.32:7002 
        192.168.1.33:7003 192.168.1.34:7004 192.168.1.35:7005 
        --cluster-replicas 1 
        --cluster-yes
}

# 验证集群状态
function verify_cluster() {
    redis-cli --cluster check 192.168.1.30:7000
    
    # 检查槽位分配
    redis-cli -c -h 192.168.1.30 -p 7000 cluster slots
    
    # 检查节点状态
    redis-cli -c -h 192.168.1.30 -p 7000 cluster nodes
}

# 执行部署
deploy_cluster_nodes
sleep 5
create_cluster
verify_cluster

echo "Redis Cluster部署完成!"

3.2 日常运维对比

监控指标采集:

# 统一监控脚本
importredis
importjson
fromprometheus_clientimportGauge, start_http_server

# 定义Prometheus指标
redis_up = Gauge('redis_up','Redis server is up', ['instance','role'])
redis_connected_clients = Gauge('redis_connected_clients','Connected clients', ['instance'])
redis_used_memory = Gauge('redis_used_memory_bytes','Used memory', ['instance'])
redis_ops_per_sec = Gauge('redis_ops_per_sec','Operations per second', ['instance'])
redis_keyspace_hits = Gauge('redis_keyspace_hits','Keyspace hits', ['instance'])
redis_keyspace_misses = Gauge('redis_keyspace_misses','Keyspace misses', ['instance'])

classRedisMonitor:
 def__init__(self, mode='sentinel'):
   self.mode = mode
   self.connections = []
   
 defsetup_connections(self):
   ifself.mode =='sentinel':
     # Sentinel模式监控
      sentinel = Sentinel([('localhost',26379)])
     self.connections.append({
       'client': sentinel.master_for('mymaster'),
       'role':'master',
       'instance':'mymaster'
      })
     forslaveinsentinel.slaves('mymaster'):
       self.connections.append({
         'client': slave,
         'role':'slave',
         'instance':f'slave_{slave.connection_pool.connection_kwargs["host"]}'
        })
   else:
     # Cluster模式监控
      startup_nodes = [
        {"host":"127.0.0.1","port":"7000"},
        {"host":"127.0.0.1","port":"7001"},
        {"host":"127.0.0.1","port":"7002"}
      ]
      rc = RedisCluster(startup_nodes=startup_nodes)
     fornode_id, node_infoinrc.cluster_nodes().items():
       self.connections.append({
         'client': redis.Redis(host=node_info['host'], port=node_info['port']),
         'role':'master'if'master'innode_info['flags']else'slave',
         'instance':f'{node_info["host"]}:{node_info["port"]}'
        })
 
 defcollect_metrics(self):
   """采集监控指标"""
   forconninself.connections:
     try:
        client = conn['client']
        info = client.info()
       
       # 基础指标
        redis_up.labels(instance=conn['instance'], role=conn['role']).set(1)
        redis_connected_clients.labels(instance=conn['instance']).set(
          info.get('connected_clients',0)
        )
        redis_used_memory.labels(instance=conn['instance']).set(
          info.get('used_memory',0)
        )
       
       # 性能指标
        redis_ops_per_sec.labels(instance=conn['instance']).set(
          info.get('instantaneous_ops_per_sec',0)
        )
        redis_keyspace_hits.labels(instance=conn['instance']).set(
          info.get('keyspace_hits',0)
        )
        redis_keyspace_misses.labels(instance=conn['instance']).set(
          info.get('keyspace_misses',0)
        )
       
       # Cluster特有指标
       ifself.mode =='cluster':
          cluster_info = client.cluster_info()
         # 采集集群状态、槽位信息等
         
     exceptExceptionase:
        redis_up.labels(instance=conn['instance'], role=conn['role']).set(0)
       print(f"Error collecting metrics from{conn['instance']}:{e}")

3.3 故障处理实战

场景1:主节点故障

Sentinel模式处理:

# 故障检测和自动切换日志分析
tail-f /var/log/redis-sentinel.log | grep -E"sdown|odown|switch-master"

# 输出示例:
# +sdown master mymaster 192.168.1.10 6379
# +odown master mymaster 192.168.1.10 6379#quorum2/2
# +vote-for-leader 7f7e7c7e7d7e7f7e7g7h 1
# +elected-leader master mymaster 192.168.1.10 6379
# +failover-state-select-slave master mymaster 192.168.1.10 6379
# +selected-slave slave 192.168.1.11:6379 192.168.1.11 6379 @ mymaster 192.168.1.10 6379
# +failover-state-send-slaveof-noone slave 192.168.1.11:6379
# +switch-master mymaster 192.168.1.10 6379 192.168.1.11 6379

# 手动故障转移(如需要)
redis-cli -p 26379 sentinel failover mymaster

Cluster模式处理:

# Cluster故障检测和处理脚本
classClusterFailoverHandler:
 def__init__(self, cluster_nodes):
   self.rc = RedisCluster(startup_nodes=cluster_nodes)
   
 defdetect_failed_nodes(self):
   """检测故障节点"""
    failed_nodes = []
    cluster_state =self.rc.cluster_nodes()
   
   fornode_id, node_infoincluster_state.items():
     if'fail'innode_info['flags']:
        failed_nodes.append({
         'node_id': node_id,
         'address':f"{node_info['host']}:{node_info['port']}",
         'slots': node_info.get('slots', []),
         'role':'master'if'master'innode_info['flags']else'slave'
        })
   
   returnfailed_nodes
 
 defautomatic_failover(self, failed_master):
   """自动故障转移"""
   # 查找该主节点的从节点
    slaves =self.find_slaves_for_master(failed_master['node_id'])
   
   ifnotslaves:
     print(f"警告:主节点{failed_master['address']}没有可用的从节点!")
     returnFalse
   
   # 选择最合适的从节点
    best_slave =self.select_best_slave(slaves)
   
   # 执行故障转移
   try:
     self.rc.cluster_failover(best_slave['node_id'])
     print(f"故障转移成功:{best_slave['address']}已提升为主节点")
     returnTrue
   exceptExceptionase:
     print(f"故障转移失败:{e}")
     returnFalse
 
 defmanual_failover(self, target_node):
   """手动故障转移"""
   # 强制故障转移
   self.rc.execute_command('CLUSTER FAILOVER FORCE', target=target_node)

场景2:网络分区处理

# 网络分区检测和恢复
classNetworkPartitionHandler:
 def__init__(self):
   self.partition_detected =False
   self.partition_start_time =None
   
 defdetect_partition(self):
   """检测网络分区"""
   ifself.mode =='sentinel':
     # Sentinel模式:检查是否有多个节点声称自己是主节点
      masters =self.find_all_masters()
     iflen(masters) >1:
       self.partition_detected =True
       self.partition_start_time = time.time()
       returnTrue
       
   else: # Cluster模式
     # 检查集群是否处于fail状态
      cluster_info =self.rc.cluster_info()
     ifcluster_info['cluster_state'] =='fail':
       self.partition_detected =True
       self.partition_start_time = time.time()
       returnTrue
   
   returnFalse
 
 defresolve_partition(self):
   """解决网络分区"""
   ifself.mode =='
    ```python
  def resolve_partition(self):
    """解决网络分区"""
    if self.mode == 'sentinel':
      # Sentinel模式:强制重新选举
      self.force_reelection()
     
    else: # Cluster模式
      # 等待集群自动恢复或手动修复
      if not self.wait_for_cluster_recovery():
        self.manual_cluster_repair()
 
  def force_reelection(self):
    """Sentinel模式:强制重新选举"""
    # 重置所有Sentinel的纪元
    sentinels = [('192.168.1.20', 26379),
          ('192.168.1.21', 26379),
          ('192.168.1.22', 26379)]
   
    for host, port in sentinels:
      r = redis.Redis(host=host, port=port)
      r.sentinel_reset('mymaster')
   
    # 等待重新选举完成
    time.sleep(5)
   
    # 验证新主节点
    sentinel = Sentinel(sentinels)
    master = sentinel.discover_master('mymaster')
    print(f"新主节点: {master}")
 
  def manual_cluster_repair(self):
    """Cluster模式:手动修复集群"""
    # 修复丢失的槽位
    missing_slots = self.find_missing_slots()
    for slot in missing_slots:
      # 将槽位分配给可用节点
      available_node = self.find_available_node()
      self.rc.cluster_addslots(available_node, slot)
   
    # 修复节点关系
    self.fix_node_relationships()

四、扩展性分析:应对业务增长

4.1 水平扩展能力对比

Sentinel模式的扩展限制:

# Sentinel扩展性分析
classSentinelScalabilityAnalysis:
 def__init__(self):
   self.max_memory_per_instance =64# GB
   self.max_connections_per_instance =10000
   self.max_ops_per_instance =100000# QPS
   
 defcalculate_scaling_limits(self, data_size, qps_requirement):
   """计算Sentinel模式的扩展限制"""
   # 垂直扩展分析
   ifdata_size <= self.max_memory_per_instance:
            print(f"单实例可满足:数据量 {data_size}GB")
            scaling_strategy = "vertical"
        else:
            print(f"需要数据分片:数据量 {data_size}GB 超过单实例限制")
            scaling_strategy = "sharding_required"
        
        # QPS扩展分析
        if qps_requirement <= self.max_ops_per_instance:
            print(f"单主节点可满足:{qps_requirement} QPS")
        else:
            read_slaves_needed = qps_requirement // self.max_ops_per_instance
            print(f"需要 {read_slaves_needed} 个从节点分担读负载")
        
        return {
            'scaling_strategy': scaling_strategy,
            'bottlenecks': [
                '单主节点写入瓶颈',
                '内存容量限制',
                '主从复制延迟'
            ]
        }
    
    def implement_read_write_splitting(self):
        """实现读写分离"""
        class ReadWriteSplitter:
            def __init__(self):
                self.sentinel = Sentinel([('localhost', 26379)])
                self.master = self.sentinel.master_for('mymaster')
                self.slaves = self.sentinel.slave_for('mymaster')
                
            def execute(self, command, *args, **kwargs):
                # 写操作路由到主节点
                if command.upper() in ['SET', 'DEL', 'INCR', 'LPUSH', 'ZADD']:
                    return self.master.execute_command(command, *args, **kwargs)
                # 读操作路由到从节点
                else:
                    return self.slaves.execute_command(command, *args, **kwargs)

Cluster模式的弹性扩展:

# Cluster动态扩容实现
classClusterDynamicScaling:
 def__init__(self, cluster_nodes):
   self.rc = RedisCluster(startup_nodes=cluster_nodes)
   
 defadd_node_to_cluster(self, new_node_host, new_node_port):
   """添加新节点到集群"""
   # 步骤1:启动新节点
   self.start_new_node(new_node_host, new_node_port)
   
   # 步骤2:将节点加入集群
    existing_node =self.get_any_master_node()
   self.rc.cluster_meet(new_node_host, new_node_port)
   
   # 步骤3:等待握手完成
    time.sleep(2)
   
   # 步骤4:分配槽位
   self.rebalance_slots(new_node_host, new_node_port)
   
   returnTrue
 
 defrebalance_slots(self, new_node_host, new_node_port):
   """重新平衡槽位分配"""
   # 计算每个节点应该拥有的槽位数
    all_masters =self.get_all_master_nodes()
    total_slots =16384
    slots_per_node = total_slots //len(all_masters)
   
   # 从其他节点迁移槽位到新节点
    new_node_id =self.get_node_id(new_node_host, new_node_port)
    migrated_slots =0
   
   formasterinall_masters[:-1]: # 排除新节点
     ifmaster['slots'] > slots_per_node:
       # 计算需要迁移的槽位数
        slots_to_migrate = master['slots'] - slots_per_node
       
       # 执行槽位迁移
       self.migrate_slots(
          source_node=master['id'],
          target_node=new_node_id,
          slot_count=slots_to_migrate
        )
       
        migrated_slots += slots_to_migrate
       
       ifmigrated_slots >= slots_per_node:
         break
 
 defmigrate_slots(self, source_node, target_node, slot_count):
   """执行槽位迁移"""
   # 获取源节点的槽位列表
    source_slots =self.get_node_slots(source_node)
    slots_to_migrate = source_slots[:slot_count]
   
   forslotinslots_to_migrate:
     # 步骤1:目标节点准备导入槽位
     self.rc.cluster_setslot_importing(target_node, slot, source_node)
     
     # 步骤2:源节点准备导出槽位
     self.rc.cluster_setslot_migrating(source_node, slot, target_node)
     
     # 步骤3:迁移槽位中的所有key
      keys =self.rc.cluster_getkeysinslot(slot,1000)
     forkeyinkeys:
       self.rc.migrate(target_node, key)
     
     # 步骤4:更新槽位归属
     self.rc.cluster_setslot_node(slot, target_node)
   
   print(f"成功迁移{slot_count}个槽位从{source_node}到{target_node}")

4.2 容量规划实战

# 容量规划计算器
classCapacityPlanner:
 def__init__(self):
   self.data_growth_rate =0.2# 20%月增长
   self.peak_multiplier =3# 峰值是平均值的3倍
   
 defplan_for_sentinel(self, current_data_gb, current_qps, months=12):
   """Sentinel模式容量规划"""
    projections = []
   
   formonthinrange(1, months +1):
     # 计算数据增长
      data_size = current_data_gb * (1+self.data_growth_rate) ** month
      qps = current_qps * (1+self.data_growth_rate) ** month
      peak_qps = qps *self.peak_multiplier
     
     # 计算所需资源
      memory_needed = data_size *1.5# 留50%余量
     
     # 判断是否需要分片
     ifmemory_needed >64: # 单实例64GB限制
        shards_needed =int(memory_needed /64) +1
        strategy =f"需要{shards_needed}个分片"
     else:
        strategy ="单实例即可"
     
      projections.append({
       'month': month,
       'data_size_gb':round(data_size,2),
       'avg_qps':round(qps),
       'peak_qps':round(peak_qps),
       'memory_needed_gb':round(memory_needed,2),
       'strategy': strategy
      })
   
   returnprojections
 
 defplan_for_cluster(self, current_data_gb, current_qps, months=12):
   """Cluster模式容量规划"""
    projections = []
    current_nodes =3# 初始3个主节点
   
   formonthinrange(1, months +1):
     # 计算数据增长
      data_size = current_data_gb * (1+self.data_growth_rate) ** month
      qps = current_qps * (1+self.data_growth_rate) ** month
      peak_qps = qps *self.peak_multiplier
     
     # 计算所需节点数
      nodes_for_memory =int(data_size /32) +1# 每节点32GB
      nodes_for_qps =int(peak_qps /50000) +1# 每节点5万QPS
      nodes_needed =max(nodes_for_memory, nodes_for_qps,3) # 至少3个
     
     # 计算扩容操作
     ifnodes_needed > current_nodes:
        expansion_needed = nodes_needed - current_nodes
        expansion_action =f"添加{expansion_needed}个节点"
        current_nodes = nodes_needed
     else:
        expansion_action ="无需扩容"
     
      projections.append({
       'month': month,
       'data_size_gb':round(data_size,2),
       'avg_qps':round(qps),
       'peak_qps':round(peak_qps),
       'nodes_needed': nodes_needed,
       'action': expansion_action
      })
   
   returnprojections

五、高可用对比:真实故障场景

5.1 故障恢复时间(RTO)对比

# 故障恢复时间测试
classRTOBenchmark:
 def__init__(self):
   self.test_results = {
     'sentinel': {},
     'cluster': {}
    }
 
 deftest_master_failure_rto(self):
   """测试主节点故障的恢复时间"""
   
   # Sentinel模式测试
   print("测试Sentinel模式主节点故障恢复...")
   
   # 1. 记录故障前状态
    start_time = time.time()
   
   # 2. 模拟主节点故障
    os.system("kill -9 $(pidof redis-server | awk '{print $1}')")
   
   # 3. 等待故障检测和转移
   whileTrue:
     try:
        sentinel = Sentinel([('localhost',26379)])
        master = sentinel.master_for('mymaster')
        master.ping()
       break
     except:
        time.sleep(0.1)
   
    sentinel_rto = time.time() - start_time
   self.test_results['sentinel']['master_failure'] = sentinel_rto
   print(f"Sentinel RTO:{sentinel_rto:.2f}秒")
   
   # Cluster模式测试
   print("
测试Cluster模式主节点故障恢复...")
   
   # 1. 记录故障前状态
    start_time = time.time()
   
   # 2. 模拟节点故障
    os.system("redis-cli -p 7000 DEBUG SEGFAULT")
   
   # 3. 等待故障检测和转移
   whileTrue:
     try:
        rc = RedisCluster(startup_nodes=[{"host":"127.0.0.1","port":"7001"}])
        rc.ping()
        cluster_info = rc.cluster_info()
       ifcluster_info['cluster_state'] =='ok':
         break
     except:
        time.sleep(0.1)
   
    cluster_rto = time.time() - start_time
   self.test_results['cluster']['master_failure'] = cluster_rto
   print(f"Cluster RTO:{cluster_rto:.2f}秒")
   
   returnself.test_results

5.2 数据一致性保证

# 数据一致性测试
classConsistencyTest:
 def__init__(self):
   self.inconsistency_count =0
   
 deftest_write_consistency_during_failover(self):
   """测试故障转移期间的写入一致性"""
   
   # 启动写入线程
    write_thread = threading.Thread(target=self.continuous_write)
    write_thread.start()
   
   # 等待一段时间后触发故障
    time.sleep(5)
   self.trigger_failover()
   
   # 继续写入并检查一致性
    time.sleep(10)
   self.stop_writing =True
    write_thread.join()
   
   # 验证数据一致性
   self.verify_data_consistency()
   
 defcontinuous_write(self):
   """持续写入数据"""
   self.written_data = {}
   self.stop_writing =False
    counter =0
   
   whilenotself.stop_writing:
     try:
        key =f"test_key_{counter}"
        value =f"test_value_{counter}_{time.time()}"
       
       # 写入数据
       ifself.mode =='sentinel':
          sentinel = Sentinel([('localhost',26379)])
          master = sentinel.master_for('mymaster')
          master.set(key, value)
       else:
          rc = RedisCluster(startup_nodes=[{"host":"127.0.0.1","port":"7000"}])
          rc.set(key, value)
       
       self.written_data[key] = value
        counter +=1
        time.sleep(0.01) # 100次/秒
       
     exceptExceptionase:
       print(f"写入失败:{e}")
        time.sleep(1)
 
 defverify_data_consistency(self):
   """验证数据一致性"""
   print(f"验证{len(self.written_data)}条数据的一致性...")
   
   forkey, expected_valueinself.written_data.items():
     try:
       ifself.mode =='sentinel':
          sentinel = Sentinel([('localhost',26379)])
          master = sentinel.master_for('mymaster')
          actual_value = master.get(key)
       else:
          rc = RedisCluster(startup_nodes=[{"host":"127.0.0.1","port":"7000"}])
          actual_value = rc.get(key)
       
       ifactual_value != expected_value:
         self.inconsistency_count +=1
         print(f"数据不一致:{key}")
         
     exceptExceptionase:
       print(f"读取失败:{key}, 错误:{e}")
       self.inconsistency_count +=1
   
    consistency_rate = (1-self.inconsistency_count /len(self.written_data)) *100
   print(f"数据一致性:{consistency_rate:.2f}%")
   print(f"不一致数据:{self.inconsistency_count}/{len(self.written_data)}")

六、实战案例:如何选择最适合的方案

6.1 典型场景分析

# 场景决策树
classScenarioAnalyzer:
 defanalyze_requirements(self, requirements):
   """根据需求分析推荐方案"""
    score_sentinel =0
    score_cluster =0
    recommendations = []
   
   # 数据量评估
   ifrequirements['data_size_gb'] < 64:
            score_sentinel += 2
            recommendations.append("数据量适中,Sentinel可以满足")
        else:
            score_cluster += 3
            recommendations.append("数据量较大,建议使用Cluster分片")
        
        # QPS评估
        if requirements['peak_qps'] < 100000:
            score_sentinel += 2
            recommendations.append("QPS适中,Sentinel性能足够")
        else:
            score_cluster += 2
            recommendations.append("高QPS需求,Cluster可以水平扩展")
        
        # 业务复杂度评估
        if requirements['multi_key_operations']:
            score_sentinel += 3
            recommendations.append("存在多key操作,Sentinel更合适")
        
        if requirements['lua_scripts']:
            score_sentinel += 2
            recommendations.append("使用Lua脚本,Sentinel支持更好")
        
        # 运维能力评估
        if requirements['ops_team_size'] < 3:
            score_sentinel += 2
            recommendations.append("运维团队较小,Sentinel更易维护")
        else:
            score_cluster += 1
            recommendations.append("运维团队充足,可以考虑Cluster")
        
        # 可用性要求
        if requirements['sla'] >=99.99:
      score_cluster +=1
      recommendations.append("超高可用性要求,Cluster故障域更小")
   
   # 最终推荐
   ifscore_sentinel > score_cluster:
      final_recommendation ="Sentinel"
   else:
      final_recommendation ="Cluster"
   
   return{
     'recommendation': final_recommendation,
     'sentinel_score': score_sentinel,
     'cluster_score': score_cluster,
     'reasons': recommendations
    }

6.2 迁移方案设计

# Sentinel到Cluster迁移方案
classMigrationPlan:
 def__init__(self):
   self.migration_steps = []
   
 defcreate_migration_plan(self, source_type='sentinel', target_type='cluster'):
   """创建迁移计划"""
   
   ifsource_type =='sentinel'andtarget_type =='cluster':
     returnself.sentinel_to_cluster_migration()
   
 defsentinel_to_cluster_migration(self):
   """Sentinel到Cluster的迁移步骤"""
   
    steps = [
      {
       'phase':1,
       'name':'准备阶段',
       'duration':'1-2天',
       'tasks': [
         '搭建Cluster测试环境',
         '性能基准测试',
         '应用兼容性测试',
         '制定回滚方案'
        ],
       'script':self.prepare_cluster_env
      },
      {
       'phase':2,
       'name':'数据同步阶段',
       'duration':'2-3天',
       'tasks': [
         '全量数据导出',
         '数据导入Cluster',
         '建立增量同步',
         '数据一致性校验'
        ],
       'script':self.sync_data
      },
      {
       'phase':3,
       'name':'灰度切换阶段',
       'duration':'3-5天',
       'tasks': [
         '1%流量切换',
         '10%流量切换',
         '50%流量切换',
         '监控和调优'
        ],
       'script':self.gradual_switch
      },
      {
       'phase':4,
       'name':'全量切换阶段',
       'duration':'1天',
       'tasks': [
         '100%流量切换',
         '旧集群保持待命',
         '观察24小时',
         '确认切换成功'
        ],
       'script':self.full_switch
      }
    ]
   
   returnsteps
 
 defsync_data(self):
   """数据同步脚本"""
   # 使用redis-shake进行数据同步
    sync_config ="""
    # redis-shake配置
    source.type = standalone
    source.address = 192.168.1.10:6379
    source.password = yourpassword
   
    target.type = cluster
    target.address = 192.168.1.30:7000;192.168.1.31:7001;192.168.1.32:7002
    target.password = yourpassword
   
    # 同步配置
    sync.mode = rump # 全量同步
    sync.parallel = 32
    sync.data_filter = true
   
    # 增量同步
    sync.mode = sync # 切换到增量同步模式
    """
   
   # 执行同步
    os.system(f"redis-shake -conf redis-shake.conf")

七、性能优化最佳实践

7.1 Sentinel性能优化

# Sentinel优化配置生成器
classSentinelOptimizer:
 defgenerate_optimized_config(self, scenario):
   """根据场景生成优化配置"""
   
    config = {
     'redis_master': {},
     'redis_slave': {},
     'sentinel': {}
    }
   
   ifscenario =='high_write':
     # 高写入场景优化
      config['redis_master'] = {
       'maxmemory-policy':'allkeys-lru',
       'save':'', # 关闭RDB
       'appendonly':'no', # 关闭AOF
       'tcp-backlog':511,
       'tcp-keepalive':60,
       'timeout':0,
       'hz':100, # 提高后台任务频率
       'repl-backlog-size':'256mb',
       'client-output-buffer-limit':'slave 256mb 64mb 60'
      }
     
   elifscenario =='high_read':
     # 高读取场景优化
      config['redis_slave'] = {
       'slave-read-only':'yes',
       'maxmemory-policy':'volatile-lru',
       'repl-diskless-sync':'yes',
       'repl-diskless-sync-delay':5,
       'slave-priority':100,
       'lazyfree-lazy-eviction':'yes',
       'lazyfree-lazy-expire':'yes'
      }
     
   # Sentinel通用优化
    config['sentinel'] = {
     'sentinel_down_after_milliseconds':5000, # 快速故障检测
     'sentinel_parallel_syncs':2, # 并行同步
     'sentinel_failover_timeout':60000, # 故障转移超时
     'sentinel_deny_scripts_reconfig':'yes'# 安全配置
    }
   
   returnconfig

7.2 Cluster性能优化

# Cluster优化工具
classClusterOptimizer:
 defoptimize_cluster_performance(self):
   """Cluster性能优化"""
   
    optimizations = {
     'network':self.optimize_network(),
     'memory':self.optimize_memory(),
     'cpu':self.optimize_cpu(),
     'persistence':self.optimize_persistence()
    }
   
   returnoptimizations
 
 defoptimize_network(self):
   """网络优化"""
   return{
     'cluster-node-timeout':5000, # 降低超时时间
     'cluster-replica-validity-factor':0, # 从节点永不过期
     'cluster-migration-barrier':1, # 迁移屏障
     'cluster-require-full-coverage':'no', # 部分覆盖也可用
     'tcp-backlog':511,
     'tcp-keepalive':60
    }
 
 defoptimize_memory(self):
   """内存优化"""
   return{
     'maxmemory-policy':'volatile-lru',
     'lazyfree-lazy-eviction':'yes',
     'lazyfree-lazy-expire':'yes',
     'lazyfree-lazy-server-del':'yes',
     'activerehashing':'yes',
     'hz':100
    }

八、总结:决策清单与行动指南

8.1 快速决策清单

基于本文的深入分析,我整理了一份快速决策清单:

选择Sentinel的场景:

• 数据量 < 64GB

• QPS < 10万

• 需要事务支持

• 大量使用Lua脚本

• 业务逻辑依赖多key操作

• 运维团队规模较小

• 对延迟极度敏感

选择Cluster的场景:

• 数据量 > 64GB

• QPS > 10万

• 需要水平扩展能力

• 可以改造应用避免跨槽位操作

• 有专业的运维团队

• 追求更高的可用性

8.2 实施路线图

# 生成个性化实施方案
defgenerate_implementation_roadmap(current_state, target_state):
 """生成实施路线图"""
 
  roadmap = {
   'week_1': [
     '技术评审和方案确认',
     '测试环境搭建',
     '性能基准测试'
    ],
   'week_2': [
     '应用改造(如需要)',
     '监控系统部署',
     '自动化脚本开发'
    ],
   'week_3': [
     '生产环境部署',
     '数据迁移',
     '灰度切换'
    ],
   'week_4': [
     '性能优化',
     '稳定性观察',
     '文档完善'
    ]
  }
 
 returnroadmap

结语

选择Redis集群方案不是非黑即白的决定,而是需要基于业务特点、团队能力、发展预期等多个维度综合考虑。通过本文的详细对比和实战案例,相信你已经对Sentinel和Cluster有了深入的理解。

记住,没有最好的架构,只有最适合的架构。在做出选择之前,请务必:

1.充分测试:在你的实际业务场景下进行压测

2.渐进式迁移:不要一次性切换,采用灰度方案

3.监控先行:完善的监控是稳定运行的基础

4.预留余地:为未来的增长预留足够的空间

最后,如果你觉得这篇文章对你有帮助,欢迎关注我的博客,我会持续分享更多生产环境的实战经验。下一篇文章,我会深入讲解《Redis故障诊断与性能调优实战》,敬请期待!

作者简介:10年运维老兵,曾负责多个千万级用户系统的Redis架构设计与优化,踩过的坑希望你不要再踩。

本文所有代码示例均已在生产环境验证,可直接使用。如有问题,欢迎在评论区交流讨论。

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 集群
    +关注

    关注

    0

    文章

    130

    浏览量

    17592
  • Redis
    +关注

    关注

    0

    文章

    390

    浏览量

    12042

原文标题:Redis集群模式选择:Sentinel vs Cluster深度对比实战指南

文章出处:【微信号:magedu-Linux,微信公众号:马哥Linux运维】欢迎添加关注!文章转载请注明出处。

收藏 人收藏
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

    评论

    相关推荐
    热点推荐

    如何使用Rust连接Redis

    Rust操作RedisRedis依赖库 在Rust中有很多Redis的客户端库可以选择,这里我们选择使用
    的头像 发表于 09-19 16:22 3362次阅读

    Redis-cluster在线集群安装

    Redis-cluster之在线搭建详解
    发表于 06-06 14:52

    3分钟搭建Redis Cluster集群

    Redis Cluster集群快速搭建
    发表于 06-12 14:58

    java原生程序redis连接怎么选择

    java原生程序redis连接(连接池长连接和短连接)选择问题
    发表于 06-10 16:33

    记一次写shell脚本的经历记录

    ;#123;redis实例IP} -a {redis密码} cluster info输出的信息解析cluster_state的值是否为ok,以及
    发表于 10-31 18:15

    Redis Cluster的基本原理及实现细节

    Redis Cluster的基本原理和架构 Redis Cluster是分布式Redis的实现。随着Re
    发表于 09-28 19:09 0次下载
    <b class='flag-5'>Redis</b> <b class='flag-5'>Cluster</b>的基本原理及实现细节

    Redis的四种模式复制、哨兵、Cluster以及集群模式

    概述 Redis作为缓存的高效中间件,在我们日常的开发中被频繁的使用,今天就来说一说Redis的四种模式,分别是「单机版、主从复制、哨兵、以及集群模式」。 可能,在一般公司的程序员使用
    的头像 发表于 09-30 17:51 3096次阅读
    <b class='flag-5'>Redis</b>的四种<b class='flag-5'>模式</b>复制、哨兵、<b class='flag-5'>Cluster</b>以及集群<b class='flag-5'>模式</b>

    单机redis和redisCluster集群是如何获取所有key的

    redis-cli -c -a [CLUSTER_AUTH] --cluster call [CLUSTER_IP:CLUSTER_POPR
    的头像 发表于 10-11 10:43 2309次阅读
    单机<b class='flag-5'>redis</b>和redisCluster集群是如何获取所有key的

    Redis的主从、哨兵、Redis Cluster集群

      前言 今天跟小伙伴们一起学习Redis的主从、哨兵、Redis Cluster集群。 Redis主从 Redis哨兵
    的头像 发表于 06-12 14:58 1376次阅读
    <b class='flag-5'>Redis</b>的主从、哨兵、<b class='flag-5'>Redis</b> <b class='flag-5'>Cluster</b>集群

    Redis为何选择单线程

    Redis为何选择单线程? 在Redisv6.0以前,Redis的核心网络模型选择用单线程来实现。 核心意思就是,对于一个 DB 来说,CPU 通常不会是瓶颈,因为大多数请求不会是 C
    的头像 发表于 10-09 10:59 806次阅读

    什么是Redis主从复制

    Master节点的能力,主挂了服务就不可以写数据了。仅仅就是增强了应用读数据的并发量同时做数据备份。 一般生产环境会采用 哨兵 或者 Redis Cluster 这种具备Master自动选举的方案,我们学习时还是要掌握主从的原理后,再去更深一步,对于哨兵和
    的头像 发表于 10-09 15:09 880次阅读
    什么是<b class='flag-5'>Redis</b>主从复制

    Cloud MemoryStore for Redis Cluster 正式发布

    以下文章来源于谷歌云服务,作者 Google Cloud 自从我们推出 Memorystore for Redis Cluster 预览版以来,银行、零售、广告、制造和社交媒体等各个行业的客户都利用
    的头像 发表于 11-24 17:40 891次阅读
    Cloud MemoryStore for <b class='flag-5'>Redis</b> <b class='flag-5'>Cluster</b> 正式发布

    redis查看集群状态命令

    Redis 集群管理时,了解集群的状态是非常重要的,可以通过一些命令来获取集群的状态信息。本文将详细介绍 Redis 查看集群状态的命令,帮助读者完全了解其使用方式和相关参数。 CLUSTER INFO 命令
    的头像 发表于 12-04 11:39 2534次阅读

    redis的淘汰策略

    的写入。 Redis的淘汰策略主要有以下几种: LRU(Least Recently Used,最近最少使用): 这是Redis默认的淘汰策略。当内存空间不足时,Redis选择最近最
    的头像 发表于 12-04 16:23 1031次阅读

    Redis Cluster之故障转移

    1. Redis Cluster 简介 Redis ClusterRedis 官方提供的 Redi
    的头像 发表于 01-20 09:21 1249次阅读
    <b class='flag-5'>Redis</b> <b class='flag-5'>Cluster</b>之故障转移