最近不是正好在研究 canal 嘛,剛巧前兩天看了一篇關(guān)于解決緩存與數(shù)據(jù)庫一致性問題的文章,里邊提到了一種解決方案是結(jié)合 canal 來操作的,所以阿Q就想趁熱打鐵,手動(dòng)來實(shí)現(xiàn)一下。
架構(gòu)
文中提到的思想是:
- 采用先更新數(shù)據(jù)庫,后刪除緩存的方式來解決并發(fā)引發(fā)的一致性問題;
- 采用異步重試的方式來保證“更新數(shù)據(jù)庫、刪除緩存”這兩步都能執(zhí)行成功;
- 可以采用訂閱變更日志的方式來清除 Redis 中的緩存;
基于這種思想,阿Q腦海中搭建了以下架構(gòu)
- APP 從 Redis 中查詢信息,將數(shù)據(jù)的更新寫入 MySQL 數(shù)據(jù)庫中;
- Canal 向 MySQL 發(fā)送 dump 協(xié)議,接收 binlog 推送的數(shù)據(jù);
- Canal 將接收到的數(shù)據(jù)投遞給 MQ 消息隊(duì)列;
- MQ 消息隊(duì)列消費(fèi)消息,同時(shí)刪除 Redis 中對(duì)應(yīng)數(shù)據(jù)的緩存;
環(huán)境準(zhǔn)備
這篇文章中有 canal 的安裝教程以及對(duì) mysql 的相關(guān)配置:canal安裝
考慮到我們服務(wù)器之前安裝過 RabbitMQ ,所以我們就用 RabbitMQ 來充當(dāng)消息隊(duì)列吧。
Canal 配置
修改 conf/canal.properties
配置
# 指定模式
canal.serverMode = rabbitMQ
# 指定實(shí)例,多個(gè)實(shí)例使用逗號(hào)分隔: canal.destinations = example1,example2
canal.destinations = example
# rabbitmq 服務(wù)端 ip
rabbitmq.host = 127.0.0.1
# rabbitmq 虛擬主機(jī)
rabbitmq.virtual.host = /
# rabbitmq 交換機(jī)
rabbitmq.exchange = xxx
# rabbitmq 用戶名
rabbitmq.username = xxx
# rabbitmq 密碼
rabbitmq.password = xxx
rabbitmq.deliveryMode =
修改實(shí)例配置文件 conf/example/instance.properties
#配置 slaveId,自定義,不等于 mysql 的 server Id 即可
canal.instance.mysql.slaveId=10
# 數(shù)據(jù)庫地址:配置自己的ip和端口
canal.instance.master.address=ip:port
# 數(shù)據(jù)庫用戶名和密碼
canal.instance.dbUsername=xxx
canal.instance.dbPassword=xxx
# 指定庫和表
canal.instance.filter.regex=.*\\\\..* // 這里的 .* 表示 canal.instance.master.address 下面的所有數(shù)據(jù)庫
# mq config
# rabbitmq 的 routing key
canal.mq.topic=xxx
然后重啟 canal 服務(wù)。
數(shù)據(jù)庫
建表語句
CREATE TABLE `product_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`price` decimal(10,4) DEFAULT NULL,
`create_date` datetime DEFAULT NULL,
`update_date` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8
數(shù)據(jù)初始化
INSERT INTO cheetah.product_info
(id, name, price, create_date, update_date)
VALUES(1, '從你的全世界路過', 14.0000, '2020-11-21 21:26:12', '2021-03-27 22:17:39');
INSERT INTO cheetah.product_info
(id, name, price, create_date, update_date)
VALUES(2, '喬布斯傳', 25.0000, '2020-11-21 21:26:42', '2021-03-27 22:17:42');
INSERT INTO cheetah.product_info
(id, name, price, create_date, update_date)
VALUES(3, 'java開發(fā)', 87.0000, '2021-03-27 22:43:31', '2021-03-27 22:43:34');
實(shí)戰(zhàn)
項(xiàng)目引入的依賴比較多,為了不占用過多的篇幅,大家可以在公眾號(hào)【阿Q說代碼】后臺(tái)回復(fù)“canal”獲取項(xiàng)目源碼!
MySQL 和 Redis 的相關(guān)配置在此不再贅述,有不懂的可以私聊阿Q:qingqing-4132;
RabbitMQ 配置
@Configuration
public class RabbitMQConfig {
public static final String CANAL_QUEUE = "canal_queue";//隊(duì)列
public static final String DIRECT_EXCHANGE = "canal";//交換機(jī),要與canal中配置的相同
public static final String ROUTING_KEY = "routingkey";//routing-key,要與canal中配置的相同
/**
* 定義隊(duì)列
**/
@Bean
public Queue canalQueue(){
return new Queue(CANAL_QUEUE,true);
}
/**
* 定義直連交換機(jī)
**/
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DIRECT_EXCHANGE);
}
/**
* 隊(duì)列和交換機(jī)綁定
**/
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(canalQueue()).to(directExchange()).with(ROUTING_KEY);
}
}
商品信息入緩存
/**
* 獲取商品信息:
* 先從緩存中查,如果不存在再去數(shù)據(jù)庫中查,然后將數(shù)據(jù)保存到緩存中
* @param productInfoId
* @return
*/
@Override
public ProductInfo findProductInfo(Long productInfoId) {
//1.從緩存中獲取商品信息
Object object = redisTemplate.opsForValue().get(REDIS_PRODUCT_KEY + productInfoId);
if(ObjectUtil.isNotEmpty(object)){
return (ProductInfo)object;
}
//2.如果緩存中不存在,從數(shù)據(jù)庫獲取信息
ProductInfo productInfo = this.baseMapper.selectById(productInfoId);
if(productInfo != null){
//3.將商品信息緩存
redisTemplate.opsForValue().set(REDIS_PRODUCT_KEY+productInfoId, productInfo,
REDIS_PRODUCT_KEY_EXPIRE, TimeUnit.SECONDS);
return productInfo;
}
return null;
}
執(zhí)行方法后,查看 Redis 客戶端是否有數(shù)據(jù)存入
更新數(shù)據(jù)入MQ
/**
* 更新商品信息
* @param productInfo
* @return
*/
@PostMapping("/update")
public AjaxResult update(@RequestBody ProductInfo productInfo){
productInfoService.updateById(productInfo);
return AjaxResult.success();
}
當(dāng)我執(zhí)行完 update 方法的時(shí)候,去RabbitMQ Management
查看,發(fā)現(xiàn)并沒有消息進(jìn)入隊(duì)列。
問題描述
通過排查之后我在服務(wù)器中 canal 下的 /usr/local/logs/example/example.log
文件里發(fā)現(xiàn)了問題所在。
原因就是meta.dat
中保存的位點(diǎn)信息和數(shù)據(jù)庫的位點(diǎn)信息不一致導(dǎo)致 canal 抓取不到數(shù)據(jù)庫的動(dòng)作。
于是我找到 canal 的 conf/example/instance.properties
實(shí)例配置文件,發(fā)現(xiàn)沒有將canal.instance.master.address=127.0.0.1:3306
設(shè)置成自己的數(shù)據(jù)庫地址。
解決方案
- 先停止 canal 服務(wù)的運(yùn)行;
- 刪除
meta.dat
文件; - 再重啟 canal,問題解決;
再次執(zhí)行 update 方法,會(huì)發(fā)現(xiàn) RabbitMQ Management
中已經(jīng)有我們想要的數(shù)據(jù)了。
MQ接收數(shù)據(jù)
編寫 RabbitMQ 消費(fèi)代碼的邏輯
@RabbitListener(queues = "canal_queue")//監(jiān)聽隊(duì)列名稱
public void getMsg(Message message, Channel channel, String msg) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("消費(fèi)的隊(duì)列消息來自:" + message.getMessageProperties().getConsumerQueue());
//刪除reids中對(duì)應(yīng)的key
ProductInfoDetail productInfoDetail = JSON.parseObject(msg, ProductInfoDetail.class);
log.info("庫名:"+ productInfoDetail.getDatabase());
log.info("表名: "+ productInfoDetail.getTable());
if(productInfoDetail!=null && productInfoDetail.getData()!=null){
List
當(dāng)我們?cè)俅握{(diào)用 update
接口時(shí),控制臺(tái)會(huì)打印以下信息
從圖中打印的信息可以看出就是我們的庫和表以及消息隊(duì)列,Redis 客戶端中緩存的信息也被刪除了。
拓展
看到這,你肯定會(huì)問:RabbitMQ 是閱后即焚的機(jī)制,它確認(rèn)消息被消費(fèi)者消費(fèi)后會(huì)立刻刪除,如果此時(shí)我們的業(yè)務(wù)還沒有跑完,沒來的及刪除 Redis 中的緩存就宕機(jī)了,豈不是緩存一直都得不到更新了嗎?
首先我們要明確的是 RabbitMQ 是通過消費(fèi)者回執(zhí)來確認(rèn)消費(fèi)者是否成功處理消息的,即消費(fèi)者獲取消息后,應(yīng)該向 RabbitMQ 發(fā)送 ACK 回執(zhí),表明自己已經(jīng)處理消息了。
為了不讓上述問題出現(xiàn),消費(fèi)者返回 ACK 回執(zhí)的時(shí)機(jī)就顯得非常重要了, 而 SpringAMQP 也為我們提供了三種可選的確認(rèn)模式:
- manual:手動(dòng) ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用 api 發(fā)送 ack;
- auto:自動(dòng) ack ,由 spring 監(jiān)測(cè) listener 代碼是否出現(xiàn)異常,沒有異常則返回 ack,拋出異常則返回 nack;
- none:關(guān)閉 ack,MQ 假定消費(fèi)者獲取消息后會(huì)成功處理,因此消息投遞后立即被刪除;
由此可知在 none 模式下消息投遞最不可靠,可能會(huì)丟失消息;在默認(rèn)的 auto 模式下如果出現(xiàn)服務(wù)器宕機(jī)的情況也是會(huì)丟失消息的,本次實(shí)戰(zhàn)中,阿Q為了防止消息丟失采用的是 manual 這種模式,配置信息如下:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual #開啟手動(dòng)確認(rèn)
所以在代碼中也就出現(xiàn)了
//用于肯定確認(rèn)
channel.basicAck(deliveryTag, true);
//用于否定確認(rèn)
channel.basicReject(deliveryTag ,true);
當(dāng)然此種模式雖然不會(huì)丟失消息,但是會(huì)導(dǎo)致效率變低。
-
緩存
+關(guān)注
關(guān)注
1文章
242瀏覽量
26771 -
數(shù)據(jù)庫
+關(guān)注
關(guān)注
7文章
3852瀏覽量
64740 -
MySQL
+關(guān)注
關(guān)注
1文章
831瀏覽量
26762
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論