91在线观看视频-91在线观看视频-91在线观看免费视频-91在线观看免费-欧美第二页-欧美第1页

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

跨機房ES同步實戰

OSC開源社區 ? 來源:OSCHINA 社區 ? 作者:京東云開發者-謝澤 ? 2022-12-13 15:10 ? 次閱讀
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

背景眾所周知單個機房在出現不可抗拒的問題(如斷電、斷網等因素)時,會導致無法正常提供服務,會對業務造成潛在的損失。所以在協同辦公領域,一種可以基于同城或異地多活機制的高可用設計,在保障數據一致性的同時,能夠最大程度降低由于機房的僅單點可用所導致的潛在高可用問題,最大程度上保障業務的用戶體驗,降低單點問題對業務造成的潛在損失顯得尤為重要。同城雙活,對于生產的高可用保障,重大的意義和價值是不可言喻的。表面上同城雙活只是簡單的部署了一套生產環境而已,但是在架構上,這個改變的影響是巨大的,無狀態應用的高可用管理、請求流量的管理、版本發布的管理、網絡架構的管理等,其提升的架構復雜度巨大。結合真實的協同辦公產品:京辦(為北京市政府提供協同辦公服務的綜合性平臺)生產環境面對的復雜的政務網絡以及京辦同城雙活架構演進的案例,給大家介紹下京辦持續改進、分階段演進過程中的一些思考和實踐經驗的總結。本文僅針對 ES 集群在跨機房同步過程中的方案和經驗進行介紹和總結。

架構

1.部署 Logstash 在金山云機房上,Logstash 啟動多個實例(按不同的類型分類,提高同步效率),并且和金山云機房的 ES 集群在相同的 VPC2.Logstash 需要配置大網訪問權限,保證 Logstash 和 ES 原集群和目標集群互通。3.數據遷移可以全量遷移和增量遷移,首次遷移都是全量遷移后續的增加數據選擇增量遷移。4.增量遷移需要改造增加識別的增量數據的標識,具體方法后續進行介紹。ab134d5c-7a83-11ed-8abf-dac502259ad0.png?原理

Logstash 工作原理

ab315e3c-7a83-11ed-8abf-dac502259ad0.png??Logstash 分為三個部分 input 、filter、ouput:1.input 處理接收數據,數據可以來源 ES,日志文件,kafka 等通道.2.filter 對數據進行過濾,清洗。3.ouput 輸出數據到目標設備,可以輸出到 ES,kafka,文件等。

增量同步原理

1. 對于 T 時刻的數據,先使用 Logstash 將 T 以前的所有數據遷移到有孚機房京東云 ES,假設用時?T2. 對于 T 到 T+?T 的增量數據,再次使用 logstash 將數據導入到有孚機房京東云的 ES 集群3. 重復上述步驟 2,直到?T 足夠小,此時將業務切換到華為云,最后完成新增數據的遷移適用范圍:ES 的數據中帶有時間戳或者其他能夠區分新舊數據的標簽

流程

?ab61eb2e-7a83-11ed-8abf-dac502259ad0.png?

準備工作

1.創建 ECS 和安裝 JDK 忽略,自行安裝即可2.下載對應版本的 Logstash,盡量選擇與 Elasticsearch 版本一致,或接近的版本安裝即可https://www.elastic.co/cn/downloads/logstash

1)源碼下載直接解壓安裝包,開箱即用

2)修改對內存使用,logstash 默認的堆內存是 1G,根據 ECS 集群選擇合適的內存,可以加快集群數據的遷移效率。

ab81d11e-7a83-11ed-8abf-dac502259ad0.png

?3. 遷移索引

Logstash 會幫助用戶自動創建索引,但是自動創建的索引和用戶本身的索引會有些許差異,導致最終數據的搜索格式不一致,一般索引需要手動創建,保證索引的數據完全一致。

以下提供創建索引的 python 腳本,用戶可以使用該腳本創建需要的索引。

create_mapping.py 文件是同步索引的 python 腳本,config.yaml 是集群地址配置文件。

注:使用該腳本需要安裝相關依賴

yum install -y PyYAML
yum install -y python-requests

拷貝以下代碼保存為 create_mapping.py:

import yaml
import requests
import json
import getopt
import sys

defhelp():
    print
    """
    usage:
    -h/--help print this help.
    -c/--config config file path, default is config.yaml
    
    example:  
    python create_mapping.py -c config.yaml 
    """
defprocess_mapping(index_mapping, dest_index):
    print(index_mapping)
    # remove unnecessary keys
    del index_mapping["settings"]["index"]["provided_name"]
    del index_mapping["settings"]["index"]["uuid"]
    del index_mapping["settings"]["index"]["creation_date"]
    del index_mapping["settings"]["index"]["version"]

    # check alias
    aliases = index_mapping["aliases"]
    for alias inlist(aliases.keys()):
        if alias == dest_index:
            print(
                "source index "+ dest_index +" alias "+ alias +" is the same as dest_index name, will remove this alias.")
            del index_mapping["aliases"][alias]
    if index_mapping["settings"]["index"].has_key("lifecycle"):
        lifecycle = index_mapping["settings"]["index"]["lifecycle"]
        opendistro ={"opendistro":{"index_state_management":
                                         {"policy_id": lifecycle["name"],
                                          "rollover_alias": lifecycle["rollover_alias"]}}}
        index_mapping["settings"].update(opendistro)
        # index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]
        del index_mapping["settings"]["index"]["lifecycle"]
    print(index_mapping)
    return index_mapping
defput_mapping_to_target(url, mapping, source_index, dest_auth=None):
    headers ={'Content-Type':'application/json'}
    create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth)
    if create_resp.status_code !=200:
        print(
            "create index "+ url +" failed with response: "+str(create_resp)+", source index is "+ source_index)
        print(create_resp.text)
        withopen(source_index +".json","w")as f:
            json.dump(mapping, f)
defmain():
    config_yaml ="config.yaml"
    opts, args = getopt.getopt(sys.argv[1:],'-h-c:',['help','config='])
    for opt_name, opt_value in opts:
        if opt_name in('-h','--help'):
            help()
            exit()
        if opt_name in('-c','--config'):
            config_yaml = opt_value

    config_file =open(config_yaml)
    config = yaml.load(config_file)
    source = config["source"]
    source_user = config["source_user"]
    source_passwd = config["source_passwd"]
    source_auth =None
    if source_user !="":
        source_auth =(source_user, source_passwd)
    dest = config["destination"]
    dest_user = config["destination_user"]
    dest_passwd = config["destination_passwd"]
    dest_auth =None
    if dest_user !="":
        dest_auth =(dest_user, dest_passwd)
    print(source_auth)
    print(dest_auth)

    # only deal with mapping list
    if config["only_mapping"]:
        for source_index, dest_index in config["mapping"].iteritems():
            print("start to process source index"+ source_index +", target index: "+ dest_index)
            source_url = source +"/"+ source_index
            response = requests.get(source_url, auth=source_auth)
            if response.status_code !=200:
                print("*** get ElasticSearch message failed. resp statusCode:"+str(
                    response.status_code)+" response is "+ response.text)
                continue
            mapping = response.json()
            index_mapping = process_mapping(mapping[source_index], dest_index)

            dest_url = dest +"/"+ dest_index
            put_mapping_to_target(dest_url, index_mapping, source_index, dest_auth)
            print("process source index "+ source_index +" to target index "+ dest_index +" successed.")
    else:
        # get all indices
        response = requests.get(source +"/_alias", auth=source_auth)
        if response.status_code !=200:
            print("*** get all index failed. resp statusCode:"+str(
                response.status_code)+" response is "+ response.text)
            exit()
        all_index = response.json()
        for index inlist(all_index.keys()):
            if"."in index:
                continue
            print("start to process source index"+ index)
            source_url = source +"/"+ index
            index_response = requests.get(source_url, auth=source_auth)
            if index_response.status_code !=200:
                print("*** get ElasticSearch message failed. resp statusCode:"+str(
                    index_response.status_code)+" response is "+ index_response.text)
                continue
            mapping = index_response.json()

            dest_index = index
            if index in config["mapping"].keys():
                dest_index = config["mapping"][index]
            index_mapping = process_mapping(mapping[index], dest_index)

            dest_url = dest +"/"+ dest_index
            put_mapping_to_target(dest_url, index_mapping, index, dest_auth)
            print("process source index "+ index +" to target index "+ dest_index +" successed.")

if __name__ =='__main__':
    main()

配置文件保存為 config.yaml:

# 源端ES集群地址,加上http://
source: http://ip:port
source_user: "username"
source_passwd: "password"
# 目的端ES集群地址,加上http://
destination: http://ip:port
destination_user: "username"
destination_passwd: "password"

# 是否只處理這個文件中mapping地址的索引
# 如果設置成true,則只會將下面的mapping中的索引獲取到并在目的端創建
# 如果設置成false,則會取源端集群的所有索引,除去(.kibana)
# 并且將索引名稱與下面的mapping匹配,如果匹配到使用mapping的value作為目的端的索引名稱
# 如果匹配不到,則使用源端原始的索引名稱
only_mapping: true

# 要遷移的索引,key為源端的索引名字,value為目的端的索引名字
mapping:
    source_index: dest_index

以上代碼和配置文件準備完成,直接執行 python create_mapping.py 即可完成索引同步。

索引同步完成可以取目標集群的 kibana 上查看或者執行 curl 查看索引遷移情況:

GET _cat/indices?v

?ab95c520-7a83-11ed-8abf-dac502259ad0.png

??全量遷移Logstash 配置位于 config 目錄下。用戶可以參考配置修改 Logstash 配置文件,為了保證遷移數據的準確性,一般建議建立多組 Logstash,分批次遷移數據,每個 Logstash 遷移部分數據。配置集群間遷移配置參考:abc4a7aa-7a83-11ed-8abf-dac502259ad0.png

?

input{
    elasticsearch{
        # 源端地址
        hosts =>  ["ip1:port1","ip2:port2"]
        # 安全集群配置登錄用戶名密碼
        user => "username"
        password => "password"
        # 需要遷移的索引列表,以逗號分隔,支持通配符
        index => "a_*,b_*"
        # 以下三項保持默認即可,包含線程數和遷移數據大小和logstash jvm配置相關
        docinfo=>true
        slices => 10
        size => 2000
        scroll => "60m"
    }
}

filter {
  # 去掉一些logstash自己加的字段
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}

output{
    elasticsearch{
        # 目的端es地址
        hosts => ["http://ip:port"]
        # 安全集群配置登錄用戶名密碼
        user => "username"
        password => "password"
 # 目的端索引名稱,以下配置為和源端保持一致
        index => "%{[@metadata][_index]}"
        # 目的端索引type,以下配置為和源端保持一致
        document_type => "%{[@metadata][_type]}"
        # 目標端數據的_id,如果不需要保留原_id,可以刪除以下這行,刪除后性能會更好
        document_id => "%{[@metadata][_id]}"
        ilm_enabled => false
        manage_template => false
    }

    # 調試信息,正式遷移去掉
    stdout { codec => rubydebug { metadata => true }}
}

增量遷移

預處理:

1.@timestamp在 elasticsearch2.0.0beta 版本后棄用

https://www.elastic.co/guide/en/elasticsearch/reference/2.4/mapping-timestamp-field.html

2. 本次對于京辦從金山云機房遷移到京東有孚機房,所涉及到的業務領域多,各個業務線中所代表新增記錄的時間戳字段不統一,所涉及到的兼容工作量大,于是考慮通過 elasticsearch 中預處理功能 pipeline 進行預處理添加統一增量標記字段:gmt_created_at,以減少遷移工作的復雜度(各自業務線可自行評估是否需要此步驟)。

PUT _ingest/pipeline/gmt_created_at
{
  "description":"Adds gmt_created_at timestamp to documents",
  "processors":[
    {
      "set":{
        "field":"_source.gmt_created_at",
        "value":"{{_ingest.timestamp}}"
      }
    }
  ]
}

3. 檢查 pipeline 是否生效

GET _ingest/pipeline/*

4. 各個 index 設置對應 settings 增加 pipeline 為默認預處理

PUT index_xxxx/_settings
{
  "settings": {
    "index.default_pipeline": "gmt_created_at"
  }
}

5. 檢查新增 settings 是否生效

GET index_xxxx/_settings

?ac0e3f46-7a83-11ed-8abf-dac502259ad0.png

??增量遷移腳本

schedule-migrate.conf

index:可以使用通配符的方式

query: 增量同步的 DSL,統一 gmt_create_at 為增量同步的特殊標記

schedule: 每分鐘同步一把,"* * * * *"

input {
elasticsearch {
        hosts =>["ip:port"]
        # 安全集群配置登錄用戶名密碼
        user =>"username"
        password =>"password"
        index =>"index_*"
        query =>'{"query":{"range":{"gmt_create_at":{"gte":"now-1m","lte":"now/m"}}}}'
        size =>5000
        scroll =>"5m"
        docinfo =>true
        schedule =>"* * * * *"
      }
}
filter {
     mutate {
      remove_field =>["source", "@version"]
   }
}
output {
    elasticsearch {
        # 目的端es地址
        hosts =>["http://ip:port"]
        # 安全集群配置登錄用戶名密碼
        user =>"username"
        password =>"password"
        index =>"%{[@metadata][_index]}"
        document_type =>"%{[@metadata][_type]}"
        document_id =>"%{[@metadata][_id]}"
        ilm_enabled =>false
        manage_template =>false
    }

# 調試信息,正式遷移去掉
stdout { codec => rubydebug { metadata =>true}}
}

問題:

mapping 中存在 join 父子類型的字段,直接遷移報 400 異常?ac4a1f70-7a83-11ed-8abf-dac502259ad0.png ?
[2022-09-20T20:02:16,404][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, 
:action=>["index", {:_id=>"xxx", :_index=>"xxx", :_type=>"joywork_t_work", :routing=>nil}, #], 
:response=>{"index"=>{"_index"=>"xxx", "_type"=>"xxx", "_id"=>"xxx", "status"=>400, 
"error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse", 
"caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"[routing] is missing for join field [task_user]"}}}}}

解決方法:

https://discuss.elastic.co/t/an-routing-missing-exception-is-obtained-when-reindex-sets-the-routing-value/155140https://github.com/elastic/elasticsearch/issues/26183

結合業務特征,通過在 filter 中加入小量的 ruby 代碼,將_routing 的值取出來,放回 logstah event 中,由此問題得以解決。

示例:

ac725e7c-7a83-11ed-8abf-dac502259ad0.png


審核編輯 :李倩


聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 數據
    +關注

    關注

    8

    文章

    7257

    瀏覽量

    91942
  • 數據遷移
    +關注

    關注

    0

    文章

    84

    瀏覽量

    7115

原文標題:跨機房 ES 同步實戰

文章出處:【微信號:OSC開源社區,微信公眾號:OSC開源社區】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

    評論

    相關推薦
    熱點推薦

    機房托管費詳細分析

    機房托管費是一個復雜而多變的話題,它受到多種因素的影響,以下是對機房托管費用的詳細分析,主機推薦小編為您整理發布機房托管費詳細分析。
    的頭像 發表于 02-28 09:48 ?476次閱讀

    機房精密空調故障?排查步驟看這!

    機房精密空調作為維持機房環境穩定的關鍵設備,其故障排查工作至關重要。下面聊一下排查機房精密空調故障的詳細步驟。
    的頭像 發表于 02-17 15:48 ?569次閱讀
    <b class='flag-5'>機房</b>精密空調故障?排查步驟看這!

    機房施工—機房吊頂與靜電地板怎樣安裝?

    為了滿足機房的高效、穩定運行,吊頂和靜電地板的安裝成為機房建設的關鍵環節。下面聊一下機房吊頂與靜電地板的安裝施工方案。 吊頂安裝: 1、材料選擇:選用輕鋼龍骨、石膏板等符合國家標準的材料,確保
    的頭像 發表于 02-07 19:00 ?343次閱讀
    <b class='flag-5'>機房</b>施工—<b class='flag-5'>機房</b>吊頂與靜電地板怎樣安裝?

    機房空調—機房送風與回風設計常見問題和解決方法

    機房送風與回風設計是確保機房穩定運行的重要環節。然而,在實際設計和應用中,常常會遇到一些問題。下面聊一下機房送風與回風設計常見問題。 一、送風系統設計常見問題 1、送風口布局不合理
    的頭像 發表于 02-07 10:37 ?619次閱讀
    <b class='flag-5'>機房</b>空調—<b class='flag-5'>機房</b>送風與回風設計常見問題和解決方法

    ES7P2131/2124 ES32H0403觸控SDK使用說明

    電子發燒友網站提供《ES7P2131/2124 ES32H0403觸控SDK使用說明.pdf》資料免費下載
    發表于 01-16 16:12 ?0次下載
    <b class='flag-5'>ES</b>7P2131/2124 <b class='flag-5'>ES</b>32H0403觸控SDK使用說明

    ES-DEV-ES7W8020DB用戶指南

    電子發燒友網站提供《ES-DEV-ES7W8020DB用戶指南.pdf》資料免費下載
    發表于 01-16 15:31 ?0次下載
    <b class='flag-5'>ES-DEV-ES</b>7W8020DB用戶指南

    ES-DEV-ES32W0030DB用戶指南

    電子發燒友網站提供《ES-DEV-ES32W0030DB用戶指南.pdf》資料免費下載
    發表于 01-16 15:30 ?0次下載
    <b class='flag-5'>ES-DEV-ES</b>32W0030DB用戶指南

    ES-PDS-ES32F3696LX-V1.1原理圖

    電子發燒友網站提供《ES-PDS-ES32F3696LX-V1.1原理圖.pdf》資料免費下載
    發表于 01-16 15:20 ?0次下載
    <b class='flag-5'>ES-PDS-ES</b>32F3696LX-V1.1原理圖

    ES-PDS-ES32F0100DB1原理圖

    電子發燒友網站提供《ES-PDS-ES32F0100DB1原理圖.pdf》資料免費下載
    發表于 01-16 15:15 ?0次下載
    <b class='flag-5'>ES-PDS-ES</b>32F0100DB1原理圖

    機房精密空調安裝指南

    1、確認精密空調型號和規格是否符合機房需求,確保精密空調能夠滿足機房的制冷、除濕、加濕等需求。
    的頭像 發表于 10-25 17:44 ?817次閱讀
    <b class='flag-5'>機房</b>精密空調安裝指南

    動環監控主機應用機房

    信息化高速發展的時代,數據中心作為信息社會的“心臟”,其穩定運行與高效管理直接關系到各行各業的發展。隨著云計算、大數據、物聯網等技術的不斷融合與創新,智慧機房的概念應運而生,動環監控主機的引入,正是
    的頭像 發表于 09-14 15:59 ?644次閱讀

    機房監控,機房監控系統百科

    一、引言 隨著信息技術的飛速發展,數據中心和機房作為支撐企業運營和存儲關鍵數據的基礎設施,其重要性日益凸顯。機房環境的穩定性、安全性及設備的運行狀態直接影響到企業的業務連續性和數據安全性。因此,建立
    的頭像 發表于 09-02 14:32 ?831次閱讀

    機房監控,機房監控百科

    機房監控是現代數據中心管理不可或缺的一部分,它直接關系到系統的穩定運行、數據的安全保護以及故障的快速響應。一個完善的機房監控系統能夠實時監測機房內的環境參數、設備狀態及安全情況,確保數據中心高效、可靠地運行。以下是一篇關于
    的頭像 發表于 08-22 17:34 ?655次閱讀

    機房托管費用貴嗎?機房托管要考慮哪些因素?

     機房托管費用受多種因素影響,包括地理位置、設備規模、服務水平、安全性要求等。不同配置和服務質量的托管價格差異較大,一般1U服務器托管費用一年在2000到5000元之間。Rak部落為您整理發布機房托管費用的差異,希望對您選擇機房
    的頭像 發表于 08-16 11:34 ?985次閱讀

    機房托管的好處

    機房托管提供的專業級服務和便利性,是其受到企業青睞的重要原因。下面將詳細探討機房托管的多重優勢,并了解這種服務模式如何幫助企業提升運營效率和安全保障。Rak部落為您整理發布機房托管的好處。
    的頭像 發表于 08-08 10:08 ?551次閱讀
    主站蜘蛛池模板: 上一篇26p国模 | 久久久夜夜夜 | 亚洲精品久久久久久婷婷 | 免费一级成人毛片 | 7777sq国产精品| 亚洲二区视频 | 亚洲a人片在线观看网址 | 国内在线观看精品免费视频 | 夜夜操夜夜摸 | 色性网| 天堂8资源在线官网资源 | 亚洲欧美在线一区二区 | 久操伊人 | 天天射天天爱天天干 | 久久国产综合 | xxx69欧美| 日韩一级片在线观看 | 永久免费观看午夜视频在线 | 黄色大片免费观看 | 色一乱一伦一区一直爽 | 看亚洲a级一级毛片 | 午夜视频在线观看一区二区 | 一级特黄aaa大片在线观看 | 人人干天天操 | 日本一区二区三区在线 视频观看免费 | 成人三级毛片 | 国产女人视频免费观看 | 久久久久久久国产 | h小视频在线观看网 | 永久黄网站色视频免费观看 | 久碰香蕉精品视频在线观看 | 天天插在线视频 | 亚洲国产精品乱码一区二区三区 | 日本一区高清视频 | 日日操夜夜骑 | 深夜视频在线 | 伊人精品久久久大香线蕉99 | 亚洲第一页视频 | 最近高清免费观看视频 | 日本一区不卡在线观看 | 麻豆三级在线播放 |