0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

跨机房ES同步实战

OSC开源社区 来源:OSCHINA 社区 作者:京东云开发者-谢泽 2022-12-13 15:10 次阅读
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

背景众所周知单个机房在出现不可抗拒的问题(如断电、断网等因素)时,会导致无法正常提供服务,会对业务造成潜在的损失。所以在协同办公领域,一种可以基于同城或异地多活机制的高可用设计,在保障数据一致性的同时,能够最大程度降低由于机房的仅单点可用所导致的潜在高可用问题,最大程度上保障业务的用户体验,降低单点问题对业务造成的潜在损失显得尤为重要。同城双活,对于生产的高可用保障,重大的意义和价值是不可言喻的。表面上同城双活只是简单的部署了一套生产环境而已,但是在架构上,这个改变的影响是巨大的,无状态应用的高可用管理、请求流量的管理、版本发布的管理、网络架构的管理等,其提升的架构复杂度巨大。结合真实的协同办公产品:京办(为北京市政府提供协同办公服务的综合性平台)生产环境面对的复杂的政务网络以及京办同城双活架构演进的案例,给大家介绍下京办持续改进、分阶段演进过程中的一些思考和实践经验的总结。本文仅针对 ES 集群在跨机房同步过程中的方案和经验进行介绍和总结。

架构

1.部署 Logstash 在金山云机房上,Logstash 启动多个实例(按不同的类型分类,提高同步效率),并且和金山云机房的 ES 集群在相同的 VPC2.Logstash 需要配置大网访问权限,保证 Logstash 和 ES 原集群和目标集群互通。3.数据迁移可以全量迁移和增量迁移,首次迁移都是全量迁移后续的增加数据选择增量迁移。4.增量迁移需要改造增加识别的增量数据的标识,具体方法后续进行介绍。ab134d5c-7a83-11ed-8abf-dac502259ad0.png原理

Logstash 工作原理

ab315e3c-7a83-11ed-8abf-dac502259ad0.pngLogstash 分为三个部分 input 、filter、ouput:1.input 处理接收数据,数据可以来源 ES,日志文件,kafka 等通道.2.filter 对数据进行过滤,清洗。3.ouput 输出数据到目标设备,可以输出到 ES,kafka,文件等。

增量同步原理

1. 对于 T 时刻的数据,先使用 Logstash 将 T 以前的所有数据迁移到有孚机房京东云 ES,假设用时∆T2. 对于 T 到 T+∆T 的增量数据,再次使用 logstash 将数据导入到有孚机房京东云的 ES 集群3. 重复上述步骤 2,直到∆T 足够小,此时将业务切换到华为云,最后完成新增数据的迁移适用范围:ES 的数据中带有时间戳或者其他能够区分新旧数据的标签

流程

ab61eb2e-7a83-11ed-8abf-dac502259ad0.png

准备工作

1.创建 ECS 和安装 JDK 忽略,自行安装即可2.下载对应版本的 Logstash,尽量选择与 Elasticsearch 版本一致,或接近的版本安装即可https://www.elastic.co/cn/downloads/logstash

1)源码下载直接解压安装包,开箱即用

2)修改对内存使用,logstash 默认的堆内存是 1G,根据 ECS 集群选择合适的内存,可以加快集群数据的迁移效率。

ab81d11e-7a83-11ed-8abf-dac502259ad0.png

3. 迁移索引

Logstash 会帮助用户自动创建索引,但是自动创建的索引和用户本身的索引会有些许差异,导致最终数据的搜索格式不一致,一般索引需要手动创建,保证索引的数据完全一致。

以下提供创建索引的 python 脚本,用户可以使用该脚本创建需要的索引。

create_mapping.py 文件是同步索引的 python 脚本,config.yaml 是集群地址配置文件。

注:使用该脚本需要安装相关依赖

yum install -y PyYAML
yum install -y python-requests

拷贝以下代码保存为 create_mapping.py:

import yaml
import requests
import json
import getopt
import sys

defhelp():
    print
    """
    usage:
    -h/--help print this help.
    -c/--config config file path, default is config.yaml
    
    example:  
    python create_mapping.py -c config.yaml 
    """
defprocess_mapping(index_mapping, dest_index):
    print(index_mapping)
    # remove unnecessary keys
    del index_mapping["settings"]["index"]["provided_name"]
    del index_mapping["settings"]["index"]["uuid"]
    del index_mapping["settings"]["index"]["creation_date"]
    del index_mapping["settings"]["index"]["version"]

    # check alias
    aliases = index_mapping["aliases"]
    for alias inlist(aliases.keys()):
        if alias == dest_index:
            print(
                "source index "+ dest_index +" alias "+ alias +" is the same as dest_index name, will remove this alias.")
            del index_mapping["aliases"][alias]
    if index_mapping["settings"]["index"].has_key("lifecycle"):
        lifecycle = index_mapping["settings"]["index"]["lifecycle"]
        opendistro ={"opendistro":{"index_state_management":
                                         {"policy_id": lifecycle["name"],
                                          "rollover_alias": lifecycle["rollover_alias"]}}}
        index_mapping["settings"].update(opendistro)
        # index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]
        del index_mapping["settings"]["index"]["lifecycle"]
    print(index_mapping)
    return index_mapping
defput_mapping_to_target(url, mapping, source_index, dest_auth=None):
    headers ={'Content-Type':'application/json'}
    create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth)
    if create_resp.status_code !=200:
        print(
            "create index "+ url +" failed with response: "+str(create_resp)+", source index is "+ source_index)
        print(create_resp.text)
        withopen(source_index +".json","w")as f:
            json.dump(mapping, f)
defmain():
    config_yaml ="config.yaml"
    opts, args = getopt.getopt(sys.argv[1:],'-h-c:',['help','config='])
    for opt_name, opt_value in opts:
        if opt_name in('-h','--help'):
            help()
            exit()
        if opt_name in('-c','--config'):
            config_yaml = opt_value

    config_file =open(config_yaml)
    config = yaml.load(config_file)
    source = config["source"]
    source_user = config["source_user"]
    source_passwd = config["source_passwd"]
    source_auth =None
    if source_user !="":
        source_auth =(source_user, source_passwd)
    dest = config["destination"]
    dest_user = config["destination_user"]
    dest_passwd = config["destination_passwd"]
    dest_auth =None
    if dest_user !="":
        dest_auth =(dest_user, dest_passwd)
    print(source_auth)
    print(dest_auth)

    # only deal with mapping list
    if config["only_mapping"]:
        for source_index, dest_index in config["mapping"].iteritems():
            print("start to process source index"+ source_index +", target index: "+ dest_index)
            source_url = source +"/"+ source_index
            response = requests.get(source_url, auth=source_auth)
            if response.status_code !=200:
                print("*** get ElasticSearch message failed. resp statusCode:"+str(
                    response.status_code)+" response is "+ response.text)
                continue
            mapping = response.json()
            index_mapping = process_mapping(mapping[source_index], dest_index)

            dest_url = dest +"/"+ dest_index
            put_mapping_to_target(dest_url, index_mapping, source_index, dest_auth)
            print("process source index "+ source_index +" to target index "+ dest_index +" successed.")
    else:
        # get all indices
        response = requests.get(source +"/_alias", auth=source_auth)
        if response.status_code !=200:
            print("*** get all index failed. resp statusCode:"+str(
                response.status_code)+" response is "+ response.text)
            exit()
        all_index = response.json()
        for index inlist(all_index.keys()):
            if"."in index:
                continue
            print("start to process source index"+ index)
            source_url = source +"/"+ index
            index_response = requests.get(source_url, auth=source_auth)
            if index_response.status_code !=200:
                print("*** get ElasticSearch message failed. resp statusCode:"+str(
                    index_response.status_code)+" response is "+ index_response.text)
                continue
            mapping = index_response.json()

            dest_index = index
            if index in config["mapping"].keys():
                dest_index = config["mapping"][index]
            index_mapping = process_mapping(mapping[index], dest_index)

            dest_url = dest +"/"+ dest_index
            put_mapping_to_target(dest_url, index_mapping, index, dest_auth)
            print("process source index "+ index +" to target index "+ dest_index +" successed.")

if __name__ =='__main__':
    main()

配置文件保存为 config.yaml:

# 源端ES集群地址,加上http://
source: http://ip:port
source_user: "username"
source_passwd: "password"
# 目的端ES集群地址,加上http://
destination: http://ip:port
destination_user: "username"
destination_passwd: "password"

# 是否只处理这个文件中mapping地址的索引
# 如果设置成true,则只会将下面的mapping中的索引获取到并在目的端创建
# 如果设置成false,则会取源端集群的所有索引,除去(.kibana)
# 并且将索引名称与下面的mapping匹配,如果匹配到使用mapping的value作为目的端的索引名称
# 如果匹配不到,则使用源端原始的索引名称
only_mapping: true

# 要迁移的索引,key为源端的索引名字,value为目的端的索引名字
mapping:
    source_index: dest_index

以上代码和配置文件准备完成,直接执行 python create_mapping.py 即可完成索引同步。

索引同步完成可以取目标集群的 kibana 上查看或者执行 curl 查看索引迁移情况:

GET _cat/indices?v

ab95c520-7a83-11ed-8abf-dac502259ad0.png

全量迁移Logstash 配置位于 config 目录下。用户可以参考配置修改 Logstash 配置文件,为了保证迁移数据的准确性,一般建议建立多组 Logstash,分批次迁移数据,每个 Logstash 迁移部分数据。配置集群间迁移配置参考:abc4a7aa-7a83-11ed-8abf-dac502259ad0.png



input{
    elasticsearch{
        # 源端地址
        hosts =>  ["ip1:port1","ip2:port2"]
        # 安全集群配置登录用户名密码
        user => "username"
        password => "password"
        # 需要迁移的索引列表,以逗号分隔,支持通配符
        index => "a_*,b_*"
        # 以下三项保持默认即可,包含线程数和迁移数据大小和logstash jvm配置相关
        docinfo=>true
        slices => 10
        size => 2000
        scroll => "60m"
    }
}

filter {
  # 去掉一些logstash自己加的字段
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}

output{
    elasticsearch{
        # 目的端es地址
        hosts => ["http://ip:port"]
        # 安全集群配置登录用户名密码
        user => "username"
        password => "password"
 # 目的端索引名称,以下配置为和源端保持一致
        index => "%{[@metadata][_index]}"
        # 目的端索引type,以下配置为和源端保持一致
        document_type => "%{[@metadata][_type]}"
        # 目标端数据的_id,如果不需要保留原_id,可以删除以下这行,删除后性能会更好
        document_id => "%{[@metadata][_id]}"
        ilm_enabled => false
        manage_template => false
    }

    # 调试信息,正式迁移去掉
    stdout { codec => rubydebug { metadata => true }}
}

增量迁移

预处理:

1.@timestamp在 elasticsearch2.0.0beta 版本后弃用

https://www.elastic.co/guide/en/elasticsearch/reference/2.4/mapping-timestamp-field.html

2. 本次对于京办从金山云机房迁移到京东有孚机房,所涉及到的业务领域多,各个业务线中所代表新增记录的时间戳字段不统一,所涉及到的兼容工作量大,于是考虑通过 elasticsearch 中预处理功能 pipeline 进行预处理添加统一增量标记字段:gmt_created_at,以减少迁移工作的复杂度(各自业务线可自行评估是否需要此步骤)。

PUT _ingest/pipeline/gmt_created_at
{
  "description":"Adds gmt_created_at timestamp to documents",
  "processors":[
    {
      "set":{
        "field":"_source.gmt_created_at",
        "value":"{{_ingest.timestamp}}"
      }
    }
  ]
}

3. 检查 pipeline 是否生效

GET _ingest/pipeline/*

4. 各个 index 设置对应 settings 增加 pipeline 为默认预处理

PUT index_xxxx/_settings
{
  "settings": {
    "index.default_pipeline": "gmt_created_at"
  }
}

5. 检查新增 settings 是否生效

GET index_xxxx/_settings

ac0e3f46-7a83-11ed-8abf-dac502259ad0.png

增量迁移脚本

schedule-migrate.conf

index:可以使用通配符的方式

query: 增量同步的 DSL,统一 gmt_create_at 为增量同步的特殊标记

schedule: 每分钟同步一把,"* * * * *"

input {
elasticsearch {
        hosts =>["ip:port"]
        # 安全集群配置登录用户名密码
        user =>"username"
        password =>"password"
        index =>"index_*"
        query =>'{"query":{"range":{"gmt_create_at":{"gte":"now-1m","lte":"now/m"}}}}'
        size =>5000
        scroll =>"5m"
        docinfo =>true
        schedule =>"* * * * *"
      }
}
filter {
     mutate {
      remove_field =>["source", "@version"]
   }
}
output {
    elasticsearch {
        # 目的端es地址
        hosts =>["http://ip:port"]
        # 安全集群配置登录用户名密码
        user =>"username"
        password =>"password"
        index =>"%{[@metadata][_index]}"
        document_type =>"%{[@metadata][_type]}"
        document_id =>"%{[@metadata][_id]}"
        ilm_enabled =>false
        manage_template =>false
    }

# 调试信息,正式迁移去掉
stdout { codec => rubydebug { metadata =>true}}
}

问题:

mapping 中存在 join 父子类型的字段,直接迁移报 400 异常ac4a1f70-7a83-11ed-8abf-dac502259ad0.png  
[2022-09-20T20:02:16,404][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, 
:action=>["index", {:_id=>"xxx", :_index=>"xxx", :_type=>"joywork_t_work", :routing=>nil}, #], 
:response=>{"index"=>{"_index"=>"xxx", "_type"=>"xxx", "_id"=>"xxx", "status"=>400, 
"error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse", 
"caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"[routing] is missing for join field [task_user]"}}}}}

解决方法:

https://discuss.elastic.co/t/an-routing-missing-exception-is-obtained-when-reindex-sets-the-routing-value/155140https://github.com/elastic/elasticsearch/issues/26183

结合业务特征,通过在 filter 中加入小量的 ruby 代码,将_routing 的值取出来,放回 logstah event 中,由此问题得以解决。

示例:

ac725e7c-7a83-11ed-8abf-dac502259ad0.png


审核编辑 :李倩


声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 数据
    +关注

    关注

    8

    文章

    7349

    浏览量

    95029
  • 数据迁移
    +关注

    关注

    0

    文章

    96

    浏览量

    7300

原文标题:跨机房 ES 同步实战

文章出处:【微信号:OSC开源社区,微信公众号:OSC开源社区】欢迎添加关注!文章转载请注明出处。

收藏 人收藏
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

    评论

    相关推荐
    热点推荐

    机房UPS电源维护全攻略:延长使用寿命的实用方法,优比施工程师经验分享

    的更换成本和业务风险。如何通过科学维护让机房UPS电源寿命更长?本文结合优比施工业级UPS在多个行业机房实战经验,总结出可落地的维护方案。一、机房UPS电源的“
    的头像 发表于 04-21 10:04 90次阅读
    <b class='flag-5'>机房</b>UPS电源维护全攻略:延长使用寿命的实用方法,优比施工程师经验分享

    AI大模型微调企业项目实战

    自主可控大模型:企业微调实战课,筑牢未来 AI 底座 在人工智能席卷全球商业版图的今天,企业对大模型(LLM)的态度已经从“新奇观望”转变为“全面拥抱”。然而,随着应用层面的不断深入,一个严峻
    发表于 04-16 18:48

    # 1MHz同步降压 - 升压转换器DC797实战指南

    1MHz同步降压 - 升压转换器DC797实战指南 在电子设备的供电设计中,高效、稳定且体积小巧的电源转换器永远是工程师追求的目标。今天给大家分享一下Demonstration Circuit
    的头像 发表于 04-01 13:50 220次阅读

    ES7243E+ES8311音频录制与播放电路资料

    本电路为一款低成本音频录制与播放参考电路,含sch原理图和pcb板图。ES7243E接2路模拟麦克风实现音频采集,ES8311接1路功放实现音频播放,适合用于语音对话类的电子玩具或在线语音通讯设备
    发表于 02-04 17:18 4次下载

    深度解析ES8389/ES8390/音频芯片Linux驱动(Linux6.1内核)

          在嵌入式音频开发中,顺芯( Everest ) ES8389/ES8390 是一款高集成度的音频 Codec 芯片,广泛应用于智能音箱、车载终端、便携设备等场景。本文
    的头像 发表于 02-02 11:37 2494次阅读
    深度解析<b class='flag-5'>ES</b>8389/<b class='flag-5'>ES</b>8390/音频芯片Linux驱动(Linux6.1内核)

    机房项目中的时间系统:从忽视到谨慎的十年体会

    机房项目中的时间系统:从忽视到谨慎的十年体会 做系统集成、机房项目这些年,我对“时间同步”这个基础环节的看法,变化其实挺大的。 刚入行那会儿,时间同步在方案里几乎没什么存在感。常见做法
    的头像 发表于 01-20 13:13 270次阅读

    机房如何搬迁?有哪些步骤与规范?

    很多朋友提到机房搬迁,最近有不少的朋友做到这方面的项目。随着企业、单位的不断深入发展,现有的涉及办公、生产场所已经不能满足,新的厂房及办公大楼逐渐筹建,那么机房搬迁或机房改建服务应运而生,而
    的头像 发表于 12-19 10:42 572次阅读
    <b class='flag-5'>机房</b>如何搬迁?有哪些步骤与规范?

    基于PXIe总线的多板卡通道同步机制研究

    PXIe背板资源,对多板卡同步的基本原理和常见实现方案进行论述。机箱、机柜的分布式时间同步问题将在后续文章中展开。
    的头像 发表于 12-18 09:35 633次阅读
    基于PXIe总线的多板卡通道<b class='flag-5'>同步</b>机制研究

    屏蔽机房建设图解,与非屏蔽机房有何区别?

    有不少朋友问到关于屏蔽机房与一般的机房有何区别,本期,为了方便大家更详细的了解关于屏蔽机房建设,我们可以通过这个实际图纸来详细了解。 一、普通机房安装图 普通
    的头像 发表于 12-17 09:50 721次阅读
    屏蔽<b class='flag-5'>机房</b>建设图解,与非屏蔽<b class='flag-5'>机房</b>有何区别?

    蔚来全新ES8掉头能有多敏捷

    随着全新ES8媒体试驾及用户试驾的持续进行,有许多朋友表示:“全新ES8作为中国最大的纯电SUV,开起来却没有传统大车的笨重感”。这正是智能敏捷掉头功能的功劳,也是蔚来全栈自研能力的显现。全新ES8掘金行动第三期,一起来看看全新
    的头像 发表于 09-22 11:48 1362次阅读

    手机板 layout 走线分割问题

    初学习layout时,都在说信号线不可分割,但是在工作中为了成本不能分割似乎也非绝对。 在后续工作中,分割的基础都是相邻层有一面完整的GND参考,分割发生在相邻的另外一层。 但
    发表于 09-16 14:56

    BASiC_BSRD-2503-ES01数据手册

    BASiC_BSRD-2503-ES01
    发表于 09-01 16:25 9次下载

    淘宝API平台数据同步,多店管理一屏搞定!

    同步中枢 通过淘宝开放平台API构建数据中台,实现: # 示例:平台库存同步核心逻辑import requestsdef sync_inventory(item_id, platforms
    的头像 发表于 07-30 14:41 760次阅读
    淘宝API<b class='flag-5'>跨</b>平台数据<b class='flag-5'>同步</b>,多店管理一屏搞定!

    黑芝麻智能域时间同步技术:消除多域计算单元的时钟信任鸿沟

    上海2025年7月21日 /美通社/ -- 本文围绕域时间同步技术展开,作为智能汽车 "感知-决策-执行 -交互" 全链路的时间基准,文章介绍了 PTP、gPTP、CAN 等主流同步技术及特点
    的头像 发表于 07-22 09:17 678次阅读
    黑芝麻智能<b class='flag-5'>跨</b>域时间<b class='flag-5'>同步</b>技术:消除多域计算单元的时钟信任鸿沟

    鸿蒙5开发宝藏案例分享---线程性能优化指南

    发现鸿蒙宝藏:线程序列化性能优化实战指南 大家好呀!今天在翻鸿蒙文档时挖到一个超级实用的工具—— DevEco Profiler的序列化检测功能 !平时用<span class
    发表于 06-12 17:13