公司项目用到了MySQL和ES,ES主要用来查询,但是之前每次都是更新MySQL后同步更新ES,由于ES没有更新的方法,需要每次都查询更新,导致每次耗时都很长.考虑将更新ES的操作异步处理掉,然后找到了canal这么一个开源项目.
项目地址:https://github.com/alibaba/canal
废话不多说直接开干.

GitHub下载源码

image
adapter:增加客户端数据落地的适配及启动功能(支持HBase等)
admin:canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作
deployer:这个就相当于canal的服务端,启动它才可以在客户端接收数据库变更信息
我们这里只需要下载deployer就可以了

修改配置

需要配置的数据库信息要是主库,并且mysql用户需要有读取binlog的权限

  • 修改 /config/canal.properties
    image-1665640091463
    image-1665640130554
    image-1665640625151
    image-1665640575546
    canal.properties具体参数说明参考官方资料 aliyun-RDS-QuickStart

  • 修改 /canal/example/instance.properties
    image-1665640489329
    image-1665640454376
    canal.instance.filter.regex 配置需要监听的表
    canal.instance.filter.black.regex 监听黑名单
    canal.instance.filter.field 配置需要放入MQ消息体中的字段或者整行注释掉,将所有字段信息放入MQ,因为业务中需要多张表的信息一起写入ES,所以我只拿id去反查数据库组装数据后存ES
    instance.properties具体参数说明参考官方资料 Canal Kafka RocketMQ QuickStart

阿里云的RocketMQ在配置分区时如果配置canal.mq.partitionsNum不生效,
可以尝试配置canal.mq.enableDynamicQueuePartition为true,开启动态获取MQ服务端的分区数
参考issue:https://github.com/alibaba/canal/issues/4400 这个坑把我整了好久.

启动

执行脚本 startup.sh
可以看看 /logs/canal 和 /logs/example下的日志,看看canal是不是正常启动
我启动的时候,报了一个异常

Caused by: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 1 DESC: the topic[binlog_prod_yaomaitong] is batch send , not support.
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
at org.apache.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:671) ~[na:na]
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:467) ~[na:na]
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:449) ~[na:na]
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:403) ~[na:na]
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:831) ~[na:na]
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:981) ~[na:na]
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:963) ~[na:na]
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:800) ~[na:na]
at com.alibaba.otter.canal.connector.rocketmq.producer.CanalRocketMQProducer.sendMessage(CanalRocketMQProducer.java:303) ~[na:na]

原因是阿里云的RocketMQ不支持批处理
需要修改源码 CanalRocketMQProducer
image-1665645607957
将框内的send方法改为下面的for循环发送消息

参考issue: https://github.com/alibaba/canal/issues/3304
然后重新打包项目
或者用我修改后重新打包的 点我下载

MQ消费者代码

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Message;
import com.yiwise.es.sync.constant.SqlTypeConstant;
import com.yiwise.scrm.common.rocketmq.core.ConsumerConfig;
import com.yiwise.scrm.common.rocketmq.core.MQListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Objects;

/**
 * Description: 消费cannal放进MQ的消息
 * Create DateTime: 2022/4/24 11:37
 *
 * @author zhangchangsheng01@gmail.com
 */
@Slf4j
@ConsumerConfig(topic = "${rocket.mq.topic.sync.customer}", group = "${rocket.mq.group.consumer.sync.customer}", consumeThreadNums = 100)
@Component
public class SyncCustomerConsumerListenerTest extends MQListener {



    /**
     * 消费MQ的信息,根据MQ解析的SQL信息获取表名,数据变化的主键id
     *
     * @param message
     * @author zhangchangsheng01@gmail.com
     * @date 2022/4/24 14:00
     */
    @Override
    public boolean consume(Message message) {
        log.info("======================从MQ获取信息:[{}]", message);
        MDC.put("MDC_LOG_ID", message.getMsgID());
        // 解析MQ的信息,监听的binlog信息
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("MQ body转String:{}", body);
        try {
            JSONObject jsonObject = JSON.parseObject(body);
            // 表名
            String tableName = jsonObject.getString("table");
            String sqlType = jsonObject.getString("type");
            // 主键id
            JSONArray array = jsonObject.getJSONArray("data");
            if (StringUtils.equals(sqlType, SqlTypeConstant.QUERY) || StringUtils.isBlank(tableName) || Objects.isNull(array)) {
                return true;
            }
            // 这里的data取出来是数据库记录,因为我在instance.properties 中只往MQ发送了每行记录的id,所以这里取出来是个id的集合
            log.info("表名:{},操作类型:{},主键id:{}", tableName, sqlType, array);
        } catch (Exception e) {
            log.error("消费MQ失败", e);
        }
        return true;
    }

}

因为我需要使用多消费者消费,所以这里将消费者部署到了两台机器,在阿里云MQ的控制台可以看到两个客户端连接实例
image-1665648003071

image-1665648455126

image-1665648512016
可以看到两台机器的消费者都在进行消费,到这里就算是大功告成了.

主要踩了两个坑,一个是canal1.1.4可以正常启动,但是MQ只能单消费者消费,有次晚上看到MQ堆积了一些消息,想着多个消费者一起消费,避免消息积压,但是坑爹的1.1.4版本配置canal.mq.partitionsNum并没有生效,反复看官方文档,网上找资料都没有很详细的介绍,大多数都是把官网的配置参数介绍复制粘贴到自己的文章,后面在issue里找到了相关的解决方案.但是1.1.4版本没有canal.mq.enableDynamicQueuePartition配置,所以升级到了1.1.6版本,升级1.1.6版本后由于阿里云的MQ不支持批量处理所以报错了,因为很快搜索到这个issue,所以花了几个小时下源码,改源码,重新打包.