canal同步MySQL的binlog数据时踩了个大坑
创始人
2024-03-14 21:07:07
0

背景

在同步MySQL数据到ES的场景中,选择了canal组件同步数据。

问题描述

在同步的时候发现canal-adapter中canal-adapter/conf/es7/product.yml 配置文件中sql 语句连表查询的时候会出现无法更新Elasticsearch 中数据的情况,而且日志没有提示异常(idea启动的时候有错误日志),令人百思不得其解。

问题分析

初步估计是内部解析yml的时候出错了,但具体是什么原因只能看源码调试了。

下载源码

GitHub地址

IDEA导入代码

项目结构图如下:

配置文件: application.yml (## 备注代表需要注意和修改的地方)

server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_nullcanal.conf:## 模式mode: kafka #tcp kafka rocketMQ rabbitMQflatMessage: truezookeeperHosts:syncBatchSize: 1000retries: 0timeout:accessKey:secretKey:consumerProperties:# canal tcp consumercanal.tcp.server.host: 127.0.0.1:11111canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:# kafka consumer## kafka地址,ip用内网(容器)ipkafka.bootstrap.servers: 192.168.0.107:9092kafka.enable.auto.commit: falsekafka.auto.commit.interval.ms: 1000kafka.auto.offset.reset: latestkafka.request.timeout.ms: 40000kafka.session.timeout.ms: 30000kafka.isolation.level: read_committedkafka.max.poll.records: 1000# rocketMQ consumerrocketmq.namespace:rocketmq.namesrv.addr: 127.0.0.1:9876rocketmq.batch.size: 1000rocketmq.enable.message.trace: falserocketmq.customized.trace.topic:rocketmq.access.channel:rocketmq.subscribe.filter:# rabbitMQ consumerrabbitmq.host:rabbitmq.virtual.host:rabbitmq.username:rabbitmq.password:rabbitmq.resource.ownerId:## 数据库配置srcDataSources:defaultDS:url: jdbc:mysql://192.168.0.107:3306/test?useUnicode=trueusername: rootpassword: rootcanalAdapters:## Kafka主题名- instance: canal_manager # canal instance Name or mq topic namegroups:- groupId: g1outerAdapters:- name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase## Elasticsearch 配置,canal 1.5后name:es7(用es可能会有问题)- name: es7hosts: http://192.168.0.107:9200 # 127.0.0.1:9200 for rest modeproperties:## 模式restmode: rest # or rest# security.auth: test:123456 #  only used for rest modecluster.name: elasticsearch
#        - name: kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address
复制代码

customer.ym (## 备注代表需要注意和修改的地方)

dataSourceKey: defaultDS
## Kafka主题
destination: canal_manager
groupId: g1
esMapping:## Elasticsearch 索引_index: product## 主键_id: _id_type: _docupsert: true#  relations:#    customer_order:#      name: customer## 正确的sql#sql: "SELECT s.Sales_No _id, s.Sales_Name salesName, r.Pro_No proNo, p.Pro_Type proType FROM p_sales s INNER JOIN p_salespro_rela r ON r.Sales_No = s.Sales_No LEFT JOIN p_pro_info p ON r.Pro_No = p.Pro_No"## 出错的sqlsql: "SELECT s.Sales_No _id, s.Sales_Name salesName, p.Pro_Type proType FROM p_sales s INNER JOIN p_salespro_rela r ON r.Sales_No = s.Sales_No LEFT JOIN p_pro_info p ON r.Pro_No = p.Pro_No"etlCondition: "where p.c_time>={}"commitBatch: 3000
复制代码

启动入口:CanalAdapterApplication 全量同步接口类:CommonRest 请求示例:

// post
http://127.0.0.1:8081/etl/es7/customer.yml
复制代码

查找问题

启动程序后,发现打印了错误日志。

2022-03-18 09:10:56.537 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2022-03-18 09:10:56.742 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ... 
2022-03-18 09:10:56.874 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2022-03-18 09:11:00.028 [main] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed
java.lang.RuntimeException: java.lang.RuntimeException: com.alibaba.druid.sql.parser.ParserExceptionat com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:225) [classes/:na]at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:56) [classes/:na]at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) [classes/:na]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_271]at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_271]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_271]at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_271]at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:365) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:308) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:135) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1694) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:579) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:501) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$1(AbstractBeanFactory.java:353) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.cloud.context.scope.GenericScope$BeanLifecycleWrapper.getBean(GenericScope.java:390) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]at org.springframework.cloud.context.scope.GenericScope.get(GenericScope.java:184) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:350) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1089) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.cloud.context.scope.refresh.RefreshScope.eagerlyInitialize(RefreshScope.java:126) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]at org.springframework.cloud.context.scope.refresh.RefreshScope.start(RefreshScope.java:117) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_271]at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_271]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_271]at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_271]at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:264) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:182) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:144) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:400) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:354) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:888) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.context.support.AbstractApplicationContext.__refresh(AbstractApplicationContext.java:553) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.support.AbstractApplicationContext.jrLockAndRefresh(AbstractApplicationContext.java:40002) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:41008) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:759) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:395) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]at com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication.main(CanalAdapterApplication.java:19) ~[classes/:na]
Caused by: java.lang.RuntimeException: com.alibaba.druid.sql.parser.ParserExceptionat com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:83) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:52) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]... 44 common frames omitted
Caused by: com.alibaba.druid.sql.parser.ParserException: nullat com.alibaba.otter.canal.client.adapter.es.core.config.SqlParser.parse(SqlParser.java:71) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.addSyncConfigToCache(ESAdapter.java:143) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:75) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]... 45 common frames omitted
2022-03-18 09:11:00.046 [main] INFO  c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /Users/Desktop/workspace/canal-canal-1.1.5/client-adapter/launcher/target/canal-adapter/plugin
2022-03-18 09:11:00.101 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: canal_manager-g1 succeed
2022-03-18 09:11:00.101 [Thread-35] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: canal_manager <=============
2022-03-18 09:11:00.101 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2022-03-18 09:11:00.113 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2022-03-18 09:11:00.126 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2022-03-18 09:11:00.128 [Thread-35] INFO  o
复制代码

查看日志异常好像是sql解析异常:java.lang.RuntimeException: com.alibaba.druid.sql.parser.ParserException,但是sql放到Navicat或其他工具中都可以正常执行,可能是canal内部有自己的解析规则。

根据错误日志断点调试一下,发现具体的问题在ES7xAdapter 适配器初始化的时候出错了。异常信息:“关系条件”列必须位于“选择”列中(Relation condition column must in select columns.)。

// ES同步指定sql格式解析 SqlParser.java

/*** 解析sql** @param sql sql* @return 视图对象*/public static SchemaItem parse(String sql) {try {SQLStatementParser parser = new MySqlStatementParser(sql);SQLSelectStatement statement = (SQLSelectStatement) parser.parseStatement();MySqlSelectQueryBlock sqlSelectQueryBlock = (MySqlSelectQueryBlock) statement.getSelect().getQuery();SchemaItem schemaItem = new SchemaItem();schemaItem.setSql(SQLUtils.toMySqlString(sqlSelectQueryBlock));SQLTableSource sqlTableSource = sqlSelectQueryBlock.getFrom();List tableItems = new ArrayList<>();SqlParser.visitSelectTable(schemaItem, sqlTableSource, tableItems, null);tableItems.forEach(tableItem -> schemaItem.getAliasTableItems().put(tableItem.getAlias(), tableItem));List fieldItems = collectSelectQueryFields(sqlSelectQueryBlock);fieldItems.forEach(fieldItem -> schemaItem.getSelectFields().put(fieldItem.getFieldName(), fieldItem));schemaItem.init();if (schemaItem.getAliasTableItems().isEmpty() || schemaItem.getSelectFields().isEmpty()) {throw new ParserException("Parse sql error");}return schemaItem;} catch (Exception e) {throw new ParserException();}}
复制代码

解决方案

根据异常提示和测试,我们知道canal的配置文件sql格式要求连表查询的时候,必须将关联条件也查出来,就是说如果表A和表B的关联字段都要查出来。

// 正确的sql,要将关联的两个表主键都查出来
select a.id, b.id from a INNER JOIN a.id = c.a_id LEFT JOIN c.b_id = b.id// 错误的sql
select a.id from a INNER JOIN a.id = c.a_id LEFT JOIN c.b_id = b.id复制代码

总结

有些问题还是需要源码才能发现的,就像这个情况,日志只提示了sql解析异常,但是看起来又没有问题,只能去看代码逻辑调试,才能发现根本的原因。个人经验总结,如果有不对的地方,请大家指正

相关内容

热门资讯

汽车油箱结构是什么(汽车油箱结... 本篇文章极速百科给大家谈谈汽车油箱结构是什么,以及汽车油箱结构原理图解对应的知识点,希望对各位有所帮...
美国2年期国债收益率上涨15个... 原标题:美国2年期国债收益率上涨15个基点 美国2年期国债收益率上涨15个基...
嵌入式 ADC使用手册完整版 ... 嵌入式 ADC使用手册完整版 (188977万字)💜&#...
重大消息战皇大厅开挂是真的吗... 您好:战皇大厅这款游戏可以开挂,确实是有挂的,需要了解加客服微信【8435338】很多玩家在这款游戏...
盘点十款牵手跑胡子为什么一直... 您好:牵手跑胡子这款游戏可以开挂,确实是有挂的,需要了解加客服微信【8435338】很多玩家在这款游...
senator香烟多少一盒(s... 今天给各位分享senator香烟多少一盒的知识,其中也会对sevebstars香烟进行解释,如果能碰...
终于懂了新荣耀斗牛真的有挂吗... 您好:新荣耀斗牛这款游戏可以开挂,确实是有挂的,需要了解加客服微信8435338】很多玩家在这款游戏...
盘点十款明星麻将到底有没有挂... 您好:明星麻将这款游戏可以开挂,确实是有挂的,需要了解加客服微信【5848499】很多玩家在这款游戏...
总结文章“新道游棋牌有透视挂吗... 您好:新道游棋牌这款游戏可以开挂,确实是有挂的,需要了解加客服微信【7682267】很多玩家在这款游...
终于懂了手机麻将到底有没有挂... 您好:手机麻将这款游戏可以开挂,确实是有挂的,需要了解加客服微信【8435338】很多玩家在这款游戏...