MySQL 自身簡(jiǎn)單、高效、可靠,是又拍云內(nèi)部使用最廣泛的數(shù)據(jù)庫(kù)。但是當(dāng)數(shù)據(jù)量達(dá)到一定程度的時(shí)候,對(duì)整個(gè) MySQL 的操作會(huì)變得非常遲緩。而公司內(nèi)部 robin/logs 表的數(shù)據(jù)量已經(jīng)達(dá)到 800w,后續(xù)又有全文檢索的需求。這個(gè)需求直接在 MySQL上實(shí)施是難以做到的。
原數(shù)據(jù)庫(kù)的同步問(wèn)題
由于傳統(tǒng)的 mysql 數(shù)據(jù)庫(kù)并不擅長(zhǎng)海量數(shù)據(jù)的檢索,當(dāng)數(shù)據(jù)量到達(dá)一定規(guī)模時(shí)(估算單表兩千萬(wàn)左右),查詢和插入的耗時(shí)會(huì)明顯增加。同樣,當(dāng)需要對(duì)這些數(shù)據(jù)進(jìn)行模糊查詢或是數(shù)據(jù)分析時(shí),MySQL作為事務(wù)型關(guān)系數(shù)據(jù)庫(kù)很難提供良好的性能支持。使用適合的數(shù)據(jù)庫(kù)來(lái)實(shí)現(xiàn)模糊查詢是解決這個(gè)問(wèn)題的關(guān)鍵。 但是,切換數(shù)據(jù)庫(kù)會(huì)迎來(lái)兩個(gè)問(wèn)題,一是已有的服務(wù)對(duì)現(xiàn)在的 MySQL重度依賴,二是MySQL的事務(wù)能力和軟件生態(tài)仍然不可替代,直接遷移數(shù)據(jù)庫(kù)的成本過(guò)大。我們綜合考慮了下,決定同時(shí)使用多個(gè)數(shù)據(jù)庫(kù)的方案,不同的數(shù)據(jù)庫(kù)應(yīng)用于不同的使用場(chǎng)景。
而在支持模糊查詢功能的數(shù)據(jù)庫(kù)中,elasticsearch 自然是首選的查詢數(shù)據(jù)庫(kù)。這樣后續(xù)對(duì)業(yè)務(wù)需求的切換也會(huì)非常靈活。 那具體該如何實(shí)現(xiàn)呢?在又拍云以往的項(xiàng)目中,也有遇到相似的問(wèn)題。之前采用的方法是在業(yè)務(wù)中編寫(xiě)代碼,然后同步到 elasticsearch 中。具體是這樣實(shí)施的:每個(gè)系統(tǒng)編寫(xiě)特定的代碼,修改 MySQL數(shù)據(jù)庫(kù)后,再將更新的數(shù)據(jù)直接推送到需要同步的數(shù)據(jù)庫(kù)中,或推送到隊(duì)列由消費(fèi)程序來(lái)寫(xiě)入到數(shù)據(jù)庫(kù)中。 但這個(gè)方案有一些明顯的缺點(diǎn):
系統(tǒng)高耦合,侵入式代碼,使得業(yè)務(wù)邏輯復(fù)雜度增加
方案不通用,每一套同步都需要額外定制,不僅增加業(yè)務(wù)處理時(shí)間,還會(huì)提升軟件復(fù)復(fù)雜度
工作量和復(fù)雜度增加
在業(yè)務(wù)中編寫(xiě)同步方案,雖然在項(xiàng)目早期比較方便,但隨著數(shù)據(jù)量和系統(tǒng)的發(fā)展壯大,往往最后會(huì)成為業(yè)務(wù)的大痛點(diǎn)。
解決思路及方案
調(diào)整架構(gòu)
既然以往的方案有明顯的缺點(diǎn),那我們?nèi)绾蝸?lái)解決它呢??jī)?yōu)秀的解決方案往往是 “通過(guò)架構(gòu)來(lái)解決問(wèn)題“,那么能不能通過(guò)架構(gòu)的思想來(lái)解決問(wèn)題呢? 答案是可以的。我們可以將程序偽裝成 “從數(shù)據(jù)庫(kù)”,主庫(kù)的增量變化會(huì)傳遞到從庫(kù),那這個(gè)偽裝成 “從數(shù)據(jù)庫(kù)” 的程序就能實(shí)時(shí)獲取到數(shù)據(jù)變化,然后將增量的變化推送到消息隊(duì)列 MQ,后續(xù)消費(fèi)者消耗 MQ 的數(shù)據(jù),然后經(jīng)過(guò)處理之后再推送到各自需要的數(shù)據(jù)庫(kù)。 這個(gè)架構(gòu)的核心是通過(guò)監(jiān)聽(tīng) MySQL 的 binlog 來(lái)同步增量數(shù)據(jù),通過(guò)基于 query 的查詢舊表來(lái)同步舊數(shù)據(jù),這就是本文要講的一種異構(gòu)數(shù)據(jù)庫(kù)同步的實(shí)踐。
改進(jìn)數(shù)據(jù)庫(kù)
經(jīng)過(guò)深度的調(diào)研,成功得到了一套異構(gòu)數(shù)據(jù)庫(kù)同步方案,并且成功將公司生產(chǎn)環(huán)境下的 robin/logs 的表同步到了 elasticsearch 上。 首先對(duì) MySQL 開(kāi)啟 binlog,但是由于 maxwell 需要的 binlog_format=row 原本的生產(chǎn)環(huán)境的數(shù)據(jù)庫(kù)不宜修改。這里請(qǐng)教了海楊前輩,他提供了”從庫(kù)聯(lián)級(jí)“的思路,在從庫(kù)中監(jiān)聽(tīng) binlog 繞過(guò)了操作生產(chǎn)環(huán)境重啟主庫(kù)的操作,大大降低了系統(tǒng)風(fēng)險(xiǎn)。 后續(xù)操作比較順利,啟動(dòng) maxwell 監(jiān)聽(tīng)從庫(kù)變化,然后將增量變化推送到 kafka ,最后配置 logstash 消費(fèi) kafka中的數(shù)據(jù)變化事件信息,將結(jié)果推送到 elasticsearch。配置 logstash需要結(jié)合表結(jié)構(gòu),這是整套方案實(shí)施的重點(diǎn)。 這套方案使用到了kafka、maxwell、logstash、elasticsearch。其中 elasticsearch 與 kafka已經(jīng)在生產(chǎn)環(huán)境中有部署,所以無(wú)需單獨(dú)部署維護(hù)。而 logstash 與 maxwell 只需要修改配置文件和啟動(dòng)命令即可快速上線。整套方案的意義不僅在于成本低,而且可以大規(guī)模使用,公司內(nèi)有 MySQL 同步到其它數(shù)據(jù)庫(kù)的需求時(shí),都可以上任。
成果展示前后對(duì)比
- 使用該方案同步和業(yè)務(wù)實(shí)現(xiàn)同步的對(duì)比
- 寫(xiě)入到 elasticsearch 性能對(duì)比 (8核4G內(nèi)存)
項(xiàng)目 | logstash | 業(yè)務(wù)同步 |
寫(xiě)入速度 | 1500 條/s | 200 條/s |
經(jīng)過(guò)對(duì)比測(cè)試,800w 數(shù)據(jù)量全量同步,使用 logstash 寫(xiě)到 elasticsearch,實(shí)際需要大概 3 小時(shí),而舊方案的寫(xiě)入時(shí)間需要 2.5 天。
方案實(shí)施細(xì)節(jié)
接下來(lái),我們來(lái)看看具體是如何實(shí)現(xiàn)的。 本方案無(wú)需編寫(xiě)額外代碼,非侵入式的,實(shí)現(xiàn) MySQL數(shù)據(jù)與 elasticsearch 數(shù)據(jù)庫(kù)的同步。
下列是本次方案需要使用所有的組件:
MySQL
Kafka
Maxwell(監(jiān)聽(tīng) binlog)
Logstash(將數(shù)據(jù)同步給 elasticsearch)
Elasticsearch
1. MySQL配置
本次使用 MySQL 5.5 作示范,其他版本的配置可能稍許不同需要
首先我們需要增加一個(gè)數(shù)據(jù)庫(kù)只讀的用戶,如果已有的可以跳過(guò)。
-- 創(chuàng)建一個(gè) 用戶名為 maxwell 密碼為 xxxxxx 的用戶 CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX'; GRANT ALL ON maxwell.* TO 'maxwell'@'localhost'; GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
開(kāi)啟數(shù)據(jù)庫(kù)的 `binlog`,修改 `mysql` 配置文件,注意 `maxwell` 需要的 `binlog` 格式必須是`row`。
# /etc/mysql/my.cnf [mysqld] # maxwell 需要的 binlog 格式必須是 row binlog_format=row # 指定 server_id 此配置關(guān)系到主從同步需要按情況設(shè)置, # 由于此mysql沒(méi)有開(kāi)啟主從同步,這邊默認(rèn)設(shè)置為 1 server_id=1 # logbin 輸出的文件名, 按需配置 log-bin=master重啟 MySQL 并查看配置是否生效:
sudo systemctl restart mysqld
select @@log_bin; -- 正確結(jié)果是 1 select @@binlog_format; -- 正確結(jié)果是 ROW如果要監(jiān)聽(tīng)的數(shù)據(jù)庫(kù)開(kāi)啟了主從同步,并且不是主數(shù)據(jù)庫(kù),需要再?gòu)臄?shù)據(jù)庫(kù)開(kāi)啟 binlog 聯(lián)級(jí)同步。
# /etc/my.cnf log_slave_updates = 1需要被同步到 elasticsearch 的表結(jié)構(gòu)。
-- robin.logs show create table robin.logs; -- 表結(jié)構(gòu) CREATE TABLE `logs` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `content` text NOT NULL, `user_id` int(11) NOT NULL, `status` enum('SUCCESS','FAILED','PROCESSING') NOT NULL, `type` varchar(20) DEFAULT '', `meta` text, `created_at` bigint(15) NOT NULL, `idx_host` varchar(255) DEFAULT '', `idx_domain_id` int(11) unsigned DEFAULT NULL, `idx_record_value` varchar(255) DEFAULT '', `idx_record_opt` enum('DELETE','ENABLED','DISABLED') DEFAULT NULL, `idx_orig_record_value` varchar(255) DEFAULT '', PRIMARY KEY (`id`), KEY `created_at` (`created_at`) ) ENGINE=InnoDB AUTO_INCREMENT=8170697 DEFAULT CHARSET=utf8
2.Maxwell 配置
本次使用 maxwell-1.39.2 作示范, 確保機(jī)器中包含 java 環(huán)境, 推薦 openjdk11
下載 maxwell 程序
wget https://github.com/zendesk/maxwell/releases/download/v1.39.2/maxwell-1.39.2.tar.gz tar zxvf maxwell-1.39.2.tar.gz **&&** cd maxwell-1.39.2maxwell 使用了兩個(gè)數(shù)據(jù)庫(kù):
一個(gè)是需要被監(jiān)聽(tīng)binlog的數(shù)據(jù)庫(kù)(只需要讀權(quán)限)
另一個(gè)是記錄maxwell服務(wù)狀態(tài)的數(shù)據(jù)庫(kù),當(dāng)前這兩個(gè)數(shù)據(jù)庫(kù)可以是同一個(gè)
重要參數(shù)說(shuō)明:
host 需要監(jiān)聽(tīng)binlog的數(shù)據(jù)庫(kù)地址
port 需要監(jiān)聽(tīng)binlog的數(shù)據(jù)庫(kù)端口
user 需要監(jiān)聽(tīng)binlog的數(shù)據(jù)庫(kù)用戶名
password 需要監(jiān)聽(tīng)binlog的密碼
replication_host 記錄maxwell服務(wù)的數(shù)據(jù)庫(kù)地址
replication_port 記錄maxwell服務(wù)的數(shù)據(jù)庫(kù)端口
replication_user 記錄maxwell服務(wù)的數(shù)據(jù)庫(kù)用戶名
filter 用于監(jiān)聽(tīng)binlog數(shù)據(jù)時(shí)過(guò)濾不需要的數(shù)據(jù)庫(kù)數(shù)據(jù)或指定需要的數(shù)據(jù)庫(kù)
producer 將監(jiān)聽(tīng)到的增量變化數(shù)據(jù)提交給的消費(fèi)者 (如 stdout、kafka)
kafka.bootstrap.servers kafka 服務(wù)地址
kafka_version kafka 版本
kafka_topic 推送到kafka的主題
啟動(dòng) maxwell
注意,如果 kafka 配置了禁止自動(dòng)創(chuàng)建主題,需要先自行在 kafka 上創(chuàng)建主題,kafka_version 需要根據(jù)情況指定, 此次使用了兩張不同的庫(kù)
./bin/maxwell --host=mysql-maxwell.mysql.svc.cluster.fud3 --port=3306 --user=root --password=password --replication_host=192.168.5.38 --replication_port=3306 --replication_user=cloner --replication_password=password --filter='exclude: *.*, include: robin.logs' --producer=kafka --kafka.bootstrap.servers=192.168.30.10:9092 --kafka_topic=maxwell-robinlogs --kafka_version=0.9.0.1
3. 安裝 Logstash
Logstash 包中已經(jīng)包含了 openjdk,無(wú)需額外安裝。
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.5.0-linux-x86_64.tar.gz tar zxvf logstash-8.5.0-linux-x86_64.tar.gz刪除不需要的配置文件。
rm config/logstash.yml修改logstash配置文件,此處語(yǔ)法參考官方文檔(https://www.elastic.co/guide/en/logstash/current/input-plugins.html)。
# config/logstash-sample.conf input { kafka { bootstrap_servers => "192.168.30.10:9092" group_id => "main" topics => ["maxwell-robinlogs"] } } filter { json { source => "message" } # 將maxwell的事件類(lèi)型轉(zhuǎn)化為es的事件類(lèi)型 # 如增加 -> index 修改-> update translate { source => "[type]" target => "[action]" dictionary => { "insert" => "index" "bootstrap-insert" => "index" "update" => "update" "delete" => "delete" } fallback => "unknown" } # 過(guò)濾無(wú)效的數(shù)據(jù) if ([action] == "unknown") { drop {} } # 處理數(shù)據(jù)格式 if [data][idx_host] { mutate { add_field => { "idx_host" => "%{[data][idx_host]}" } } } else { mutate { add_field => { "idx_host" => "" } } } if [data][idx_domain_id] { mutate { add_field => { "idx_domain_id" => "%{[data][idx_domain_id]}" } } } else { mutate { add_field => { "idx_domain_id" => "" } } } if [data][idx_record_value] { mutate { add_field => { "idx_record_value" => "%{[data][idx_record_value]}" } } } else { mutate { add_field => { "idx_record_value" => "" } } } if [data][idx_record_opt] { mutate { add_field => { "idx_record_opt" => "%{[data][idx_record_opt]}" } } } else { mutate { add_field => { "idx_record_opt" => "" } } } if [data][idx_orig_record_value] { mutate { add_field => { "idx_orig_record_value" => "%{[data][idx_orig_record_value]}" } } } else { mutate { add_field => { "idx_orig_record_value" => "" } } } if [data][type] { mutate { replace => { "type" => "%{[data][type]}" } } } else { mutate { replace => { "type" => "" } } } mutate { add_field => { "id" => "%{[data][id]}" "content" => "%{[data][content]}" "user_id" => "%{[data][user_id]}" "status" => "%{[data][status]}" "meta" => "%{[data][meta]}" "created_at" => "%{[data][created_at]}" } remove_field => ["data"] } mutate { convert => { "id" => "integer" "user_id" => "integer" "idx_domain_id" => "integer" "created_at" => "integer" } } # 只提煉需要的字段 mutate { remove_field => [ "message", "original", "@version", "@timestamp", "event", "database", "table", "ts", "xid", "commit", "tags" ] } } output { # 結(jié)果寫(xiě)到es elasticsearch { hosts => ["http://es-zico2.service.upyun:9500"] index => "robin_logs" action => "%{action}" document_id => "%{id}" document_type => "robin_logs" } # 結(jié)果打印到標(biāo)準(zhǔn)輸出 stdout { codec => rubydebug } }執(zhí)行程序:
# 測(cè)試配置文件* bin/logstash -f config/logstash-sample.conf --config.test_and_exit # 啟動(dòng)* bin/logstash -f config/logstash-sample.conf --config.reload.automatic
4. 全量同步
完成啟動(dòng)后,后續(xù)的增量數(shù)據(jù) maxwell 會(huì)自動(dòng)推送給 logstash 最終推送到 elasticsearch ,而之前的舊數(shù)據(jù)可以通過(guò) maxwell 的 bootstrap 來(lái)同步,往下面表中插入一條任務(wù),那么 maxwell 會(huì)自動(dòng)將所有符合條件的 where_clause 的數(shù)據(jù)推送更新。
INSERT INTO maxwell.bootstrap ( database_name, table_name, where_clause, client_id ) values ( 'robin', 'logs', 'id > 1', 'maxwell' );后續(xù)可以在 elasticsearch 檢測(cè)數(shù)據(jù)是否同步完成,可以先查看數(shù)量是否一致,然后抽樣對(duì)比詳細(xì)數(shù)據(jù)。
# 檢測(cè) elasticsearch 中的數(shù)據(jù)量 GET robin_logs/robin_logs/_count
?
審核編輯:劉清
-
數(shù)據(jù)庫(kù)
+關(guān)注
關(guān)注
7文章
3885瀏覽量
65641 -
MySQL
+關(guān)注
關(guān)注
1文章
841瀏覽量
27401 -
Maxwell
+關(guān)注
關(guān)注
4文章
36瀏覽量
13000
原文標(biāo)題:如何高效實(shí)現(xiàn)MySQL與elasticsearch的數(shù)據(jù)同步
文章出處:【微信號(hào):OSC開(kāi)源社區(qū),微信公眾號(hào):OSC開(kāi)源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
數(shù)據(jù)采集到MYSQL和SQLSERVER數(shù)據(jù)庫(kù)可以實(shí)現(xiàn)哪些功能
工業(yè)智能網(wǎng)關(guān)采集能耗數(shù)據(jù)對(duì)接到MySQL數(shù)據(jù)庫(kù)

從Delphi、C++ Builder和Lazarus連接到MySQL數(shù)據(jù)庫(kù)

使用插件將Excel連接到MySQL/MariaDB

適用于MySQL和MariaDB的Python連接器:可靠的MySQL數(shù)據(jù)連接器和數(shù)據(jù)庫(kù)

如何在Linux環(huán)境下高效安裝部署和配置Elasticsearch
MySQL數(shù)據(jù)庫(kù)的安裝

在華為云上通過(guò) Docker 容器部署 Elasticsearch 并進(jìn)行性能評(píng)測(cè)

構(gòu)建數(shù)據(jù)庫(kù)解決方案,基于華為云 Flexus X 實(shí)例容器化 MySQL 主從同步架構(gòu)

構(gòu)建高效搜索解決方案,Elasticsearch & Kibana 的完美結(jié)合

數(shù)據(jù)庫(kù)數(shù)據(jù)恢復(fù)—Mysql數(shù)據(jù)庫(kù)表記錄丟失的數(shù)據(jù)恢復(fù)流程

數(shù)據(jù)庫(kù)數(shù)據(jù)恢復(fù)—MYSQL數(shù)據(jù)庫(kù)ibdata1文件損壞的數(shù)據(jù)恢復(fù)案例
香港云服務(wù)器怎么部署MySQL數(shù)據(jù)庫(kù)?
Elasticsearch 再次開(kāi)源

適用于MySQL的dbForge架構(gòu)比較

評(píng)論