• [最佳实践] Python对接clickhouse
    Python对接clickhouseclickhouse通用jdbc端口clickhouse jdbc接口使用HTTP协议,具体对应华为clickhouse端口可以在Manager->clickhouse页面 逻辑集群部分查看针对非加密、加密端口,对接使用的jdbc url有区别,具体如下非加密端口 21426 对应jdbc连接 url为: jdbc:clickhouse://x.x.x.x:21426/default加密端口 21428 对应jdbc连接 url为: jdbc:clickhouse://x.x.x.x:21428/default?ssl=true&sslmode=none其中连接的ip 为 clickhouse balancer实例对应的ip注意:本次使用非加密端口进行对接前提条件安装python3环境,以及需要连接的MRS集群环境 1、 下载python3 源码 编译tar zxvf Python-3.8.0.tgz cd Python-3.8.0 mkdir -p /usr/local/python-3.8.0 ./configure --prefix=/usr/local/python-3.8.0 -enable-optimizations --with-ssl make && make install 编译 ln -s /usr/local/python-3.8.0/bin/python3 /usr/bin/python3 ln -s /usr/local/python-3.8.0/bin/pip3 /usr/bin/pip3 ll /usr/bin/python*使用通用jdbc进行连接安装对应依赖./pip3 install jpype1==1.4.1 ./pip3 install JayDeBeApi==1.2.3Python代码import jaydebeapi import jpype import os conn = jaydebeapi.connect("ru.yandex.clickhouse.ClickHouseDriver","jdbc:clickhouse://x.x.x.x:21426/default",["username","passwd"],jars=['/opt/lyf/lib1/clickhouse-jdbc-0.3.1-h0.cbu.mrs.320.r11.jar','/opt/lyf/lib1/commons-codec-1.15.jar','/opt/lyf/lib1/commons-logging-1.2.jar','/opt/lyf/lib1/httpclient-4.5.13.jar','/opt/lyf/lib1/httpcore-4.4.13.jar','/opt/lyf/lib1/lz4-java-1.7.1.jar','/opt/lyf/lib1/slf4j-api-1.7.36.jar','/opt/lyf/lib1/us-common-1.0.66.jar','/opt/lyf/lib1/bcprov-jdk15on-1.70.jar']) import pandas as pd sql = "Select * From addressbook" df_ck = pd.read_sql(sql, conn) df_ck conn.close()注解: 将所需的lib文件放在对应目录下commons-codec-1.15.jar commons-logging-1.2.jar httpclient-4.5.13.jar httpcore-4.4.13.jar lz4-java-1.7.1.jar slf4j-api-1.7.36.jar bcprov-jdk15on-1.70.jar clickhouse-jdbc-0.3.1-h0.cbu.mrs.320.r11.jar us-common-1.0.66.jar使用clickhouse_connect进行连接安装对应依赖./pip3 install clickhouse_connect==0.6.4 python代码import clickhouse_connect client = clickhouse_connect.get_client(host='x.x.x.x', port=21426, username='username', password='passwd') client.command('show tables') client.command('select * from people')注:参照cid:link_0FAQPython代码执行报错classnotfoundconn = jaydebeapi.connect("ru.yandex.clickhouse.ClickHouseDriver","jdbc:clickhouse:// x.x.x.x:port/default?ssl=true&sslmode=none?user=username&password=passwd!@",jars=['/opt/lyf/lib1/clickhouse-jdbc-0.3.1-h0.cbu.mrs.320.r11.jar'])查看该方法源码 help(jaydebeapi.connect)解决方法: 修改connection的参数设置为conn = jaydebeapi.connect("ru.yandex.clickhouse.ClickHouseDriver","jdbc:clickhouse://x.x.x.x:port/default",["username","passwd"],jars=['/opt/lyf/lib1/clickhouse-jdbc-0.3.1-h0.cbu.mrs.320.r11.jar','/opt/lyf/lib1/commons-codec-1.15.jar','/opt/lyf/lib1/commons-logging-1.2.jar','/opt/lyf/lib1/httpclient-4.5.13.jar','/opt/lyf/lib1/httpcore-4.4.13.jar','/opt/lyf/lib1/lz4-java-1.7.1.jar','/opt/lyf/lib1/slf4j-api-1.7.36.jar','/opt/lyf/lib1/us-common-1.0.66.jar','/opt/lyf/lib1/bcprov-jdk15on-1.70.jar'])python代码执行报错classnotfoundgrep –R ‘x.x.x.x’ 文件夹找到对应的jar包放到对应目录下,在jars参数上加上对应的jar包
  • [赋能学习] MRS3.2.0版本 二次开发: Spark读写Clickhouse样例
    一、Spark简介Spark是分布式批处理框架,提供分析挖掘与迭代式内存计算能力,支持多种语言(Scala/Java/Python)的应用开发。 适用以下场景:数据处理(Data Processing):可以用来快速处理数据,兼具容错性和可扩展性。迭代计算(Iterative Computation):支持迭代计算,有效应对多步的数据处理逻辑。数据挖掘(Data Mining):在海量数据基础上进行复杂的挖掘分析,可支持各种数据挖掘和机器学习算法。流式处理(Streaming Processing):支持秒级延迟的流式处理,可支持多种外部数据源。查询分析(Query Analysis):支持标准SQL查询分析,同时提供DSL(DataFrame), 并支持多种外部输入。二、背景说明在Spark应用中,用户可以通过ClickHouse JDBC的原生接口,以及Spark JDBC驱动,实现对ClickHouse数据库和表的创建、查询、插入等操作。通过ClickHouse JDBC驱动创建数据库和表,并插入数据。然后使用Spark JDBC接口读取ClickHouse表中数据,进行转换处理后再追加写入到ClickHouse表中。主要分为四个部分:创建ClickHouse数据库和表,将数据插入表中。使用Spark JDBC接口读取ClickHouse表中数据。注册临时表,并对表中字段ID进行处理,返回新的数据集。将新的数据集数据追加写入到ClickHouse表中。三、样例调试前提:Linux环境有安装集群客户端比对“本地时间和Linux机器时间”与集群时间误都不能超过5分钟检查linux环境的JDK版本为1.8配置linux环境的/etc/hosts文件检查 C:\Windows\System32\drivers\etc\hosts文件中是否包含所有集群节点的域名IP映射信息在IDEA打开样例代码的SparkOnEsJavaExample目录,检查SDK配置默认自动加载依赖,如未加载,则打开后选中pom.xml文件,右键点击“Add As Maven Project”后等待项目自动将依赖下载完毕在Linux环境新建目录,例如“/opt/spark-on-ck/”在集群中获取clickhouse.jdbc驱动,将驱动放置到spark-on-ck中find / -name clickhouse-jdbc*10、打包样例代码在IDEA主页面,选择“View->Tool Windows->Maven”打开“Maven”工具窗口。在Maven工具窗口,选择clean生命周期,执行Maven构建过程。在Maven工具窗口,选择package生命周期,执行Maven构建过程。从IDEA项目目录下的target文件夹中获取到Jar包,拷贝到Spark运行环境下(即Spark客户端),如“/opt/spark-on-ck”。​​​​​​四、linux环境下运行1、java代码中设置了需传入5个参数,因此执行命令必须进行传入该集群的这5个参数jdbcUrl:jdbc:clickhouse://x.x.x.x:21428/testdb2?ssl=true&sslmode=noneckDBName: ck中的数据库,不需要提前创建ckTableName:ck中创建的表,不需要提前创建userName:集群用户名password:集群用户名密码参考产品文档,进入clickhouse命令为clickhouse client --host ip --user username --password --port 21425 --secure将打包好的jar包上传到spark-on-ck目录下,进入该目录,执行如下命令spark-submit --master yarn --deploy-mode client --jars ./clickhouse-jdbc-0.3.1-h0.cbu.mrs.320.r11.jar --class com.huawei.bigdata.spark.examples.SparkOnClickHouseExample SparkOnClickHouseJavaExample-1.0.jar "jdbc:clickhouse://x.x.x.x:21428/testdb2?ssl=true&sslmode=none" "testdb3" "testlyf" "lyf" "passwd"进入创建的数据库查看创建的表​​​​​​​
  • [基础组件] 【ClickHouse】go 语言驱动包 clickhouse-go
    ​ 背景:xx客户已有业务go语言开发,从kafka读取数据写入ClickHouse,之前版本只有jdbc/http能力进度Update:Go 语言对接我们ClickHouse的 http_port 和 https_port 均调通了。驱动:    cid:link_11.     对接http_port,直接配置成我们的端口,账号密码即可;      当然前提是服务端开这个端口。2.     对接https,增加如下参数其他说明:1.     查看官网 有非官方的驱动工程cid:link_22.     咨询开发以及产品手册,未官方对接过go 问题解决:下载go 相关调试成功1.     构建Go语言环境:cid:link_31)     申请和下载GoLand, GoLand是JetBrains的新商业IDEcid:link_02)     . 下载Go 运行环境并且安装(可以理解JDK 之于 JAVA)           cid:link_43)     GoLand 配置代理(必须),因为类似maven 从外网下载工程依赖包。2.     下载clickhouse-go-2.0.15 导入 GoLand3.     修改并且执行样例。 4.     执行成功5.     驱动自己的官方解释 修改原因参考 tcp_port_secure 端口时,需要加上 如下参数TLS: &tls.Config{ InsecureSkipVerify: true, },其他说明:我们和开源参数说明:开源CK有2组参数 https tcps 以及http tcp我们只有在SSL_NONESSL_BOTH_ENABLE 参数改为true 时(默认值false),才会启动tcp_port和http_port ​
  • [知识分享] Colocate Join :ClickHouse的一种高性能分布式join查询模型
    摘要:本文将介绍业界MPP分布式数据库join查询模型,以及ClickHouse的分布式查询原理解析和Colocate join性能表现。本文分享自华为云社区《ClickHouse一种高性能分布式join查询模型(Colocate Join)》,作者:tiantangniao 。ClickHouse是一款开源的面向联机分析处理的列式数据库,具有极致的压缩率和极速查询性能。ClickHouse支持SQL查询,基于大宽表的聚合分析查询性能非常优异,在特定场景下ClickHouse也具备较优的join性能。本文将介绍业界MPP分布式数据库join查询模型,以及ClickHouse的分布式查询原理解析和Colocate join性能表现。 1. ClickHouse分布式joinClicHouse分布式join通常涉及到左右表为分布式表,分布式执行过程中需要将数据在节点间进行交换,我们将数据在节点间交换的动作在分布式执行计划中称为数据的流动streaming算子,ClickHouse支持的streaming算子有如下三种:Broadcast JoinShuffer JoinColocate Join以上第一种其实是数据广播算子,第二种为数据重分布算子,而第三种join的数据存储在本地不需要进行分布式交换。其实对于ClickHouse来说,说是实现了Shuffle JOIN还比较勉强,其只实现了类Broadcast JOIN类型,ClickHouse当前的分布式join查询框架更多的还是实现了两阶段查询任务(这里不详细讲解,后续几个章节分别进行展开讲解,大家可以细细体会),与ClickHouse相比通常业界MPP数据库分布式join查询框架模型的数据在节点间交换Streaming算子通常为以下几种:第一种Gather算子类似于在ClickHouse中的SQL发起initiator节点,第一阶段在各个节点完成本地join后,会将各节点结果发送给initiator节点进行第二阶段的汇总工作,initiator节点再将结果发送给客户端;第二种为数据广播算子,每个节点将自己拥有的分片数据发送给目标节点,对应到ClickHouse为Broadcast JOIN;第三种为数据重分布算子,数据重分布会将数据按照一定的重分布规则重新发送到对应的目标节点,对应到ClickHouse为Shuffer JOIN;最后一种类型,数据会在本地进行join,对应到ClickHouse为Colocate join,其不需要数据重分布或广播,节点间和网络上无数据交换和传播,此实现方式的join性能也最佳。以下分别将几种join方式在ClickHouse中的实现进行介绍。1.1 Shuffer Join1)有如下分布式Join SQL语句:2)执行过程如下:① 客户端将SQL1发送给集群中一个节点host-0(initiator/coordinator);② host-0节点将任务改写为SQL2查询任务;③ Coordinator节点将SQL2查询任务下发到集群各个节点执行;④ 各节点将SQL2解析为SQL3子查询;⑤ 子查询被下发到所有节点执行;⑥ 子查询执行完成后将结果集返回到协调节点,如:host-j;⑦ 协调节点将各个子结果集汇总为一个结果集;⑧ 协调节点将结果集发送到集群各个节点,同时将SQL4任务下发到各个节点执行;⑨ 各节点在本地将左表的分片和右表子查询结果集进行join计算,然后将结果返回到客户端。3)总结:ClickHouse 普通分布式JOIN查询并未按JOIN KEY去Shuffle数据,而是每个节点全量拉取右表数据跟左表分片进行join计算;如果右表为分布式表,则集群中每个节点会去执行分布式查询,查询会存在一个非常严重的读放大现象。假设集群有N个节点,右表查询会在集群中执行N*N次;ClickHouse 的这种join方式和业界MPP的区别:虽然是叫做Shuffle join/redistribute join,但是从根本来说不是真正的redistribute join,存在查询放大问题,也是性能较差的一种查询方式。1.2 Broadcast Join1)有如下分布式Join SQL语句:2)执行过程如下:① 客户端将SQL1发送给集群中一个节点host-0(initiator/coordinator);② host-0节点将任务改写为SQL2子查询任务;③ Coordinator节点将SQL2子查询任务下发到集群各个节点执行;④ 各子节点任务执行完成之后将结果发回到协调节点;⑤ 协调节点将上一步接收到的结果汇总为结果集;⑥ 协调节点将结果集发送到集群各个节点,同时将SQL3任务下发到各个节点;⑦ 各节点在本地将左表的分片和右表子查询结果集进行join计算,然后将结果及发回到协调节点;⑧ 协调节点将最终结果返回给客户端。3)总结:右表的查询在initiator节点完成后,通过网络发送到其他节点,避免其他节点重复计算,从而避免查询放大问题;GLOBAL JOIN 可以看做一个不完整的Broadcast JOIN实现。如果JOIN的右表数据量较大,就会占用大量网络带宽,导致查询性能降低;ClickHouse的global join方式和业界MPP的区别:ClickHouse会将右表过滤结果汇总到一个节点,然后又发送到所有节点,对单节点内存/磁盘空间占用较大,全量数据发送到所有节点,对网络带宽消耗也较大;而业界MPP数据库每个节点并行的将自己一部分数据广播发到所有节点,之后就可以直接进行下一阶段的本地join动作,多个节点都能并行执行,同时数据也不需要从一个节点发送到所有节点,对网络和单节点磁盘及内存消耗较少。1.3 Colocate Join1)有如下分布式Join SQL语句:2)执行过程如下:① 客户端将SQL1发送给集群中一个节点host-0(initiator/coordinator);② host-0节点将任务改写为SQL2子查询任务;③ Coordinator节点将SQL2子查询任务下发到集群各个节点执行;④ 各子节点任务执行完成之后将结果发回到协调节点;⑤ 协调节点将上一步接收到的结果汇总为结果集返回给客户端。3)总结:由于数据已经进行了预分区/分布,相同的JOIN KEY对应的数据一定存储在同一个计算节点,join计算过程中不会进行跨节点的数据交换工作,所以无需对右表做分布式查询,也能获得正确结果,并且性能较优。2. ClickHouse Colocate join2.1 Colocate JOIN原理:根据“相同JOIN KEY必定相同分片”原理,我们将涉及JOIN计算的表,按JOIN KEY在集群维度作分片。将分布式JOIN转为节点的本地JOIN,极大减少了查询放大问题。按如下操作:   1)将涉及JOIN的表字段按JOIN KEY用同样分片算法进行分片;   2)将JOIN SQL中右表换成相应的本地表名称进行join。2.2 Colocate JOIN性能:数据和用例准备     1)环境:准备2 shard,2副本共4个节点的ClickHouse计算节点集群;     2)用例:分别创建join字段按id % 2(2为shard个数,可根据实际集群环境进行调整)取余数据分布方式(相同id数据分布到同一个节点),以及ROUND ROBIN  (数据随机rand分布)数据分布方式分布式表和本地表,分布式表指定分布方式,本地表为Replicated表,具体用例如下:colocate_join_a_local数据按照2分片(id % 2或哈希取模)进行数据分布;相同分布列的字段key的数据会分布到同一个节点;数据通过分布式表colocate_join_a_dis把数据写入分布到各数据节点。      colocate_join_a_local_rand数据(rand())随机分布;相同分布列的字段key数据会随机分布到各节点;数据通过分布式表colocate_join_a_dis_rand写入进行分布。      结果对比    2.3 Colocate Join场景约束1)数据写入Colocate join场景需要用户在系统建设前提前进行数据规划,数据写入时join的左右表join条件字段需要使用相同哈希算法入库分布,保证join key相同数据写入到同一个计算节点上。如果对数据写入时效性要求不太高的场景,可通过分布式表进行生成数据,生成数据简单快捷,性能较慢;如果对数据写入时效性要求较高的场景,可通过应用/中间件写入数据到local表,中间件需要实现入库数据分布算法,入库性能较好。2)扩缩容扩缩容完成后,需要将全部数据重写/重分布一遍,缺点:耗时长,占用存储可能暂时会翻倍,一种节省空间的方式是:逐个表进行重分布,每个表数据重分布完成后可删除重分布前的数据,避免占用过多存储。将来的改进/增强:重分布过程中支持可写在线,重分布尽量少或不影响写入查询的在线操作,减少重分布过程中对客户业务的影响。3. 总结业界所宣称的ClickHouse只能做大宽表查询,而通过以上分析,与业界MPP执行框架相比,虽然ClickHouse的分布式join类两阶段查询,节点间会有大量数据传输,缺少数据传输及任务的完全并行以及节点间高效的exchange算子实现,但事实上在特定场景下ClickHouse也可以进行高效的join(Broadcast join和Colocate join)查询分析,如果将表结构设计及数据分布的足够好,查询性能也并不会太差:Broadcast join对于大小表关联,需要将小表数据放在右边;Colocate join需要将join key字段使用相同的分布算法,将分布键相同数据分布在同一个计算节点。对于ClickHouse而言,当前优化器能力还较弱,如join场景reorder以及统计信息缺失、基于成本代价估算CBO的优化能力还待增强,用户SQL所写即所得,可能会要求人人都是DBA,人人都要对ClickHouse或数据库有深入的理解及丰富经验才能设计出优秀的数据库结构以及写出较高性能的SQL语句。对于ClickHouse手动挡数据库,在将来我们也会在统计信息、CBO优化器、分布式join模型框架、大大表等多表关联查询以及复杂查询上进行优化增强,以降低用户使用门槛,提升用户使用体验。
  • [技术干货] MySQL到ClickHouse的高速公路-MaterializeMySQL引擎
    ### 引言 熟悉MySQL的朋友应该都知道,MySQL集群主从间数据同步机制十分完善。令人惊喜的是,ClickHouse作为近年来炙手可热的大数据分析引擎也可以挂载为MySQL的从库,作为MySQL的 "协处理器" 面向OLAP场景提供高效数据分析能力。早先的方案比较直截了当,通过第三方插件将所有MySQL上执行的操作进行转化,然后在ClickHouse端逐一回放达到数据同步。终于在2020年下半年,Yandex 公司在 ClickHouse 社区发布了MaterializeMySQL引擎,支持从MySQL全量及增量实时数据同步。MaterializeMySQL引擎目前支持 MySQL 5.6/5.7/8.0 版本,兼容 Delete/Update 语句,及大部分常用的 DDL 操作。 ### 基础概念 - **MySQL & ClickHouse** MySQL一般特指完整的MySQL RDBMS,是开源的关系型数据库管理系统,目前属于Oracle公司。MySQL凭借不断完善的功能以及活跃的开源社区,吸引了越来越多的企业和个人用户。 ClickHouse是由Yandex公司开源的面向OLAP场景的分布式列式数据库。ClickHouse具有实时查询,完整的DBMS及高效数据压缩,支持批量更新及高可用。此外,ClickHouse还较好地兼容SQL语法并拥有开箱即用等诸多优点。 - **Row Store & Column Store** MySQL存储采用的是Row Store,表中数据按照 Row 为逻辑存储单元在存储介质中连续存储。这种存储方式适合随机的增删改查操作,对于按行查询较为友好。但如果选择查询的目标只涉及一行中少数几个属性,Row 存储方式也不得不将所有行全部遍历再筛选出目标属性,当表属性较多时查询效率通常较低。尽管索引以及缓存等优化方案在 OLTP 场景中能够提升一定的效率,但在面对海量数据背景的 OLAP 场景就显得有些力不从心了。 ClickHouse 则采用的是 Column Store,表中数据按照Column为逻辑存储单元在存储介质中连续存储。这种存储方式适合采用 SIMD (Single Instruction Multiple Data) 并发处理数据,尤其在表属性较多时查询效率明显提升。由于列存方式中物理相邻的数据类型通常相同,因此天然适合数据压缩,从而达到极致的数据压缩比。 ![image.png](https://bbs-img-cbc-cn.obs.cn-north-1.myhuaweicloud.com/data/attachment/forum/202101/19/2027086qqtb3kjx8livjuu.png) ### 使用方法 - 部署Master-MySQL 开启BinLog功能:ROW模式 开启GTID模式:解决位点同步时MySQL主从切换问题(BinLog reset导致位点失效) ```shell # my.cnf关键配置 gtid_mode=ON enforce_gtid_consistency=1 binlog_format=ROW ``` - 部署Slave-ClickHouse 获取 [ClickHouse/Master](https://github.com/ClickHouse/ClickHouse) 代码编译安装 推荐使用`GCC-10.2.0`,`CMake 3.15`,`ninja1.9.0`及以上 - 创建Master-MySQL中database及table ```mysql creat databases master_db; use master_db; CREATE TABLE IF NOT EXISTS `runoob_tbl`( `runoob_id` INT UNSIGNED AUTO_INCREMENT, `runoob_` VARCHAR(100) NOT NULL, `runoob_author` VARCHAR(40) NOT NULL, `submission_date` DATE, PRIMARY KEY ( `runoob_id` ) )ENGINE=InnoDB DEFAULT CHARSET=utf8; # 插入几条数据 INSERT INTO runoob_tbl (runoob_, runoob_author, submission_date) VALUES ("MySQL-learning", "Bob", NOW()); INSERT INTO runoob_tbl (runoob_, runoob_author, submission_date) VALUES ("MySQL-learning", "Tim", NOW()); ``` - 创建 Slave-ClickHouse 中 MaterializeMySQL database ```sql # 开启materialize同步功能 SET allow_experimental_database_materialize_mysql=1; # 创建slave库,参数分别是("mysqld服务地址", "待同步库名", "授权账户", "密码") CREATE DATABASE slave_db ENGINE = MaterializeMySQL('192.168.6.39:3306', 'master_db', 'root', '3306123456'); ``` 此时可以看到ClickHouse中已经有从MySQL中同步的数据了: ```mysql DESKTOP:) select * from runoob_tbl; SELECT * FROM runoob_tbl Query id: 6e2b5f3b-0910-4d29-9192-1b985484d7e3 ┌─runoob_id─┬─runoob_title───┬─runoob_author─┬─submission_date─┐ │ 1 │ MySQL-learning │ Bob │ 2021-01-06 │ └───────────┴────────────────┴───────────────┴─────────────────┘ ┌─runoob_id─┬─runoob_title───┬─runoob_author─┬─submission_date─┐ │ 2 │ MySQL-learning │ Tim │ 2021-01-06 │ └───────────┴────────────────┴───────────────┴─────────────────┘ 2 rows in set. Elapsed: 0.056 sec. ``` ### 工作原理 - BinLog Event MySQL中BinLog Event主要包含以下几类: ```mysql 1. MYSQL_QUERY_EVENT    -- DDL 2. MYSQL_WRITE_ROWS_EVENT -- insert 3. MYSQL_UPDATE_ROWS_EVENT -- update 4. MYSQL_DELETE_ROWS_EVENT -- delete ``` 事务提交后,MySQL 将执行过的 SQL 处理 BinLog Event,并持久化到 BinLog 文件 ClickHouse通过消费BinLog达到数据同步,过程中主要考虑3个方面问题: 1、DDL兼容:由于ClickHouse和MySQL的数据类型定义有区别,DDL语句需要做相应转换 2、Delete/Update 支持:引入`_version`字段,控制版本信息 3、Query 过滤:引入`_sign`字段,标记数据有效性 - DDL操作 对比一下MySQL的DDL语句以及在ClickHouse端执行的DDL语句: ```mysql mysql> show create table runoob_tbl\G; *************************** 1. row *************************** Table: runoob_tbl Create Table: CREATE TABLE `runoob_tbl` ( `runoob_id` int unsigned NOT NULL AUTO_INCREMENT, `runoob_` varchar(100) NOT NULL, `runoob_author` varchar(40) NOT NULL, `submission_date` date DEFAULT NULL, PRIMARY KEY (`runoob_id`) ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 1 row in set (0.00 sec) --------------------------------------------------------------- cat /metadata/slave_db/runoob_tbl.sql ATTACH TABLE _ UUID '14dbff59-930e-4aa8-9f20-ccfddaf78077' ( `runoob_id` UInt32, `runoob_` String, `runoob_author` String, `submission_date` Nullable(Date), `_sign` Int8 MATERIALIZED 1, `_version` UInt64 MATERIALIZED 1 ) ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(runoob_id, 4294967) ORDER BY tuple(runoob_id) SETTINGS index_granularity = 8192 ``` 可以看到: 1、在DDL转化时默认增加了2个隐藏字段:_sign(-1删除, 1写入) 和 _version(数据版本) 2、默认将表引擎设置为 ReplacingMergeTree,以 _version 作为 column version 3、原DDL主键字段 runoob_id 作为ClickHouse排序键和分区键 此外还有许多DDL处理,比如增加列、索引等,相应代码在`Parsers/MySQL` 目录下。 - Delete/Update操作 Update: ```mysql # Mysql端: UPDATE runoob_tbl set runoob_author='Mike' where runoob_id=2; mysql> select * from runoob_tbl; +-----------+----------------+---------------+-----------------+ | runoob_id | runoob_title | runoob_author | submission_date | +-----------+----------------+---------------+-----------------+ | 1 | MySQL-learning | Bob | 2021-01-06 | | 2 | MySQL-learning | Mike | 2021-01-06 | +-----------+----------------+---------------+-----------------+ 2 rows in set (0.00 sec) ---------------------------------------------------------------- # ClickHouse端: DESKTOP:) select *, _sign, _version from runoob_tbl order by runoob_id; SELECT *, _sign, _version FROM runoob_tbl ORDER BY runoob_id ASC Query id: c5f4db0a-eff6-4b49-a429-b55230c26301 ┌─runoob_id─┬─runoob_title───┬─runoob_author─┬─submission_date─┬─_sign─┬─_version─┐ │ 1 │ MySQL-learning │ Bob │ 2021-01-06 │ 1 │ 2 │ │ 2 │ MySQL-learning │ Mike │ 2021-01-06 │ 1 │ 4 │ │ 2 │ MySQL-learning │ Tim │ 2021-01-06 │ 1 │ 3 │ └───────────┴────────────────┴───────────────┴─────────────────┴───────┴──────────┘ 3 rows in set. Elapsed: 0.003 sec. ``` 可以看到,ClickHouse数据也实时同步了更新操作。 Delete: ```mysql # Mysql端 mysql> DELETE from runoob_tbl where runoob_id=2; mysql> select * from runoob_tbl; +-----------+----------------+---------------+-----------------+ | runoob_id | runoob_title | runoob_author | submission_date | +-----------+----------------+---------------+-----------------+ | 1 | MySQL-learning | Bob | 2021-01-06 | +-----------+----------------+---------------+-----------------+ 1 row in set (0.00 sec) ---------------------------------------------------------------- # ClickHouse端 DESKTOP:) select *, _sign, _version from runoob_tbl order by runoob_id; SELECT *, _sign, _version FROM runoob_tbl ORDER BY runoob_id ASC Query id: e9cb0574-fcd5-4336-afa3-05f0eb035d97 ┌─runoob_id─┬─runoob_title───┬─runoob_author─┬─submission_date─┬─_sign─┬─_version─┐ │ 1 │ MySQL-learning │ Bob │ 2021-01-06 │ 1 │ 2 │ └───────────┴────────────────┴───────────────┴─────────────────┴───────┴──────────┘ ┌─runoob_id─┬─runoob_title───┬─runoob_author─┬─submission_date─┬─_sign─┬─_version─┐ │ 2 │ MySQL-learning │ Mike │ 2021-01-06 │ -1 │ 5 │ └───────────┴────────────────┴───────────────┴─────────────────┴───────┴──────────┘ ┌─runoob_id─┬─runoob_title───┬─runoob_author─┬─submission_date─┬─_sign─┬─_version─┐ │ 2 │ MySQL-learning │ Mike │ 2021-01-06 │ 1 │ 4 │ │ 2 │ MySQL-learning │ Tim │ 2021-01-06 │ 1 │ 3 │ └───────────┴────────────────┴───────────────┴─────────────────┴───────┴──────────┘ 4 rows in set. Elapsed: 0.002 sec. ``` 可以看到,删除id为2的行只是额外插入了`_sign == -1`的一行记录,并没有真正删掉。 - 日志回放 MySQL 主从间数据同步时Slave节点将 BinLog Event 转换成相应的SQL语句,Slave 模拟 Master 写入。类似地,传统第三方插件沿用了MySQL主从模式的BinLog消费方案,即将 Event 解析后转换成 ClickHouse 兼容的 SQL 语句,然后在 ClickHouse 上执行(回放),但整个执行链路较长,通常性能损耗较大。不同的是,MaterializeMySQL 引擎提供的内部数据解析以及回写方案隐去了三方插件的复杂链路。回放时将 BinLog Event 转换成底层 Block 结构,然后直接写入底层存储引擎,接近于物理复制。此方案可以类比于将 BinLog Event 直接回放到 InnoDB 的 Page 中。 ### **同步策略** - 位点同步 v20.9.1版本前是基于位点同步的,ClickHouse每消费完一批 BinLog Event,就会记录 Event 的位点信息到 `.metadata` 文件: ```mysql [FavonianKong@Wsl[20:42:37]slave_db] $ cat ./.metadata Version: 2 Binlog File: mysql-bin.000003 Binlog Position:355005999 Data Version: 5 ``` 这样当 ClickHouse 再次启动时,它会把 {‘mysql-bin.000003’, 355005999} 二元组通过协议告知 MySQL Server,MySQL 从这个位点开始发送数据: ``` s1> ClickHouse 发送 {‘mysql-bin.000003’, 355005999} 位点信息给 MySQL s2> MySQL 找到本地 mysql-bin.000003 文件并定位到 355005999 偏移位置,读取下一个 Event 发送给 ClickHouse s3> ClickHouse 接收 binlog event 并完成同步操作 s4> ClickHouse 更新 .metadata位点 ``` **存在问题:** 如果MySQL Server是一个集群,通过VIP对外服务,MaterializeMySQL创建 database 时 host 指向的是VIP,当集群主从发生切换后,`{Binlog File, Binlog Position}` 二元组不一定是准确的,因为BinLog可以做reset操作。 ``` s1> ClickHouse 发送 {'mysql-bin.000003’, 355005999} 给集群新主 MySQL s2> 新主 MySQL 发现本地没有 mysql-bin.000003 文件,因为它做过 reset master 操作,binlog 文件是 mysql-bin.000001 s3> 产生错误复制 ``` 为了解决这个问题,v20.9.1版本后上线了 GTID 同步模式,废弃了不安全的位点同步模式。 - GTID同步 GTID模式为每个 event 分配一个全局唯一ID和序号,直接告知 MySQL 这个 GTID 即可,于是`.metadata`变为: ```mysql [FavonianKong@Wsl[21:30:19]slave_db] Version: 2 Binlog File: mysql-bin.000003 Executed GTID: 0857c24e-4755-11eb-888c-00155dfbdec7:1-783 Binlog Position:355005999 Data Version: 5 ``` 其中 `0857c24e-4755-11eb-888c-00155dfbdec7` 是生成 Event的主机`UUID`,`1-783`是已经同步的event区间 于是流程变为: ``` s1> ClickHouse 发送 GTID:0857c24e-4755-11eb-888c-00155dfbdec7:1-783 给 MySQL s2> MySQL 根据 GTID 找到本地位点,读取下一个 Event 发送给 ClickHouse s3> ClickHouse 接收 BinLog Event 并完成同步操作 s4> ClickHouse 更新 .metadata GTID信息 ``` ### 源码分析 - 概述 在最新源码 (v20.13.1.1) 中,ClickHouse 官方对 DatabaseMaterializeMySQL 引擎的相关源码进行了重构,并适配了 GTID 同步模式。ClickHouse 整个项目的入口 `main` 函数在 `/ClickHouse/programs/main.cpp` 文件中,主程序会根据接收指令将任务分发到 `ClickHouse/programs` 目录下的子程序中处理。本次分析主要关注 Server 端 `MaterializeMySQL` 引擎的工作流程。 - 源码目录 与 MaterializeMySQL 相关的主要源码路径: ```c++ ClickHouse/src/databases/MySQL //MaterializeMySQL存储引擎实现 ClickHouse/src/Storages/ //表引擎实现 ClickHouse/src/core/MySQL* //复制相关代码 ClickHouse/src/Interpreters/ //Interpreters实现,SQL的rewrite也在这里处理 ClickHouse/src/Parsers/MySQL //解析部分实现,DDL解析等相关处理在这里 ``` - 服务端主要流程 ClickHouse 使用 POCO 网络库处理网络请求,Client连接的处理逻辑在 ClickHouse/src/Server/*Handler.cpp 的 hander方法里。以TCP为例,除去握手,初始化上下文以及异常处理等相关代码,主要逻辑可以抽象成: ```c++ // ClickHouse/src/Server/TCPHandler.cpp TCPHandler.runImpl() { ... while(true) { ... if (!receivePacket()) //line 184 continue /// Processing Query //line 260 state.io = executeQuery(state.query, *query_context, ...); ... } ``` - 数据同步预处理 Client发送的SQL在executeQuery函数处理,主要逻辑简化如下: ```c++ // ClickHouse/src/Interpreters/executeQuery.cpp static std::tuple executeQueryImpl(...) { ... // line 354,解析器可配置 ast = parseQuery(...); ... // line 503, 根据语法树生成interpreter auto interpreter = InterpreterFactory::get(ast, context, ...); ... // line 525, 执行器interpreter执行后返回结果 res = interpreter->execute(); ... } ``` 主要有三点: 1、解析SQL语句并生成语法树 AST 2、InterpreterFactory 工厂类根据 AST 生成执行器 3、interpreter->execute() 跟进第三点,看看 InterpreterCreateQuery 的 excute() 做了什么: ```c++ // ClickHouse/src/Interpreters/InterpreterCreateQuery.cpp BlockIO InterpreterCreateQuery::execute() { ... // CREATE | ATTACH DATABASE if (!create.database.empty() && create.table.empty()) // line 1133, 当使用MaterializeMySQL时,会走到这里建库 return createDatabase(create); } ``` 这里注释很明显,主要执行 CREATE 或 ATTACH DATABASE,继续跟进 createDatabase() 函数: ```c++ // ClickHouse/src/Interpreters/InterpreterCreateQuery.cpp BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) { ... // line 208, 这里会根据 ASTCreateQuery 参数,从 DatabaseFactory 工厂获取数据库对象 // 具体可以参考 DatabasePtr DatabaseFactory::getImpl() 函数 DatabasePtr database = DatabaseFactory::get(create, metadata_path, ...); ... // line 253, 多态调用,在使用MaterializeMySQL时 // 上方get函数返回的是 DatabaseMaterializeMySQL database->loadStoredObjects(context, ...); } ``` 到这里,相当于将任务分发给DatabaseMaterializeMySQL处理,接着跟踪 loadStoredObjects 函数: ```c++ //ClickHouse/src/Databases/MySQL/DatabaseMaterializeMySQL.cpp template void DatabaseMaterializeMySQL::loadStoredObjects(Context & context, ...) { Base::loadStoredObjects(context, has_force_restore_data_flag, force_attach); try { // line87, 这里启动了materialize的同步线程 materialize_thread.startSynchronization(); started_up = true; } catch (...) ... } ``` 跟进startSynchronization() 绑定的执行函数: ```c++ // ClickHouse/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp void MaterializeMySQLSyncThread::synchronization() { ... // 全量同步在 repareSynchronized() 进行 if (std::optional metadata = prepareSynchronized()) { while (!isCancelled()) { UInt64 max_flush_time = settings->max_flush_data_time; BinlogEventPtr binlog_event = client.readOneBinlogEvent(...); { //增量同步侦听binlog_envent if (binlog_event) onEvent(buffers, binlog_event, *metadata); } } } ... } ``` - 全量同步 MaterializeMySQLSyncThread::prepareSynchronized 负责DDL和全量同步,主要流程简化如下: ```c++ // ClickHouse/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp std::optional MaterializeMySQLSyncThread::prepareSynchronized() { while (!isCancelled()) { ... try { //构造函数内会获取MySQL的状态、MySQL端的建表语句, MaterializeMetadata metadata(connection, ...); // line345, DDL相关转换 metadata.transaction(position, [&]() { cleanOutdatedTables(database_name, global_context); dumpDataForTables(connection, metadata, global_context, ...); }); return metadata; } ... } } ``` ClickHouse作为MySQL从节点,在MaterializeMetadata构造函数中对MySQL端进行了一系列预处理: 1、将打开的表关闭,同时对表加上读锁并启动事务 2、TablesCreateQuery通过SHOW CREATE TABLE 语句获取MySQL端的建表语句 3、获取到建表语句后释放表锁 继续往下走,执行到 metadata.transaction() 函数,该调用传入了匿名函数作为参数,一直跟进该函数会发现最终会执行匿名函数,也就是cleanOutdatedTables以及dumpDataForTables函数,主要看一下 dumpDataForTables 函数: ```c++ // ClickHouse/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp static inline void dumpDataForTables(...) { ... //line293, 这里执行建表语句 tryToExecuteQuery(..., query_context, database_name, comment); } ``` 继续跟踪 tryToExecuteQuery 函数,会调用到 executeQueryImpl() 函数,上文提到过这个函数,但这次我们的上下文信息变了,生成的执行器发生变化,此时会进行 DDL 转化以及 dump table 等操作: ```c++ // ClickHouse/src/Interpreters/executeQuery.cpp static std::tuple executeQueryImpl(...) { ... // line 354,解析器可配置 ast = parseQuery(...); ... // line 503,这里跟之前上下文信息不同,生成interpreter也不同 auto interpreter = InterpreterFactory::get(ast,context, ...); ... // line 525, 执行器interpreter执行后返回结果 res = interpreter->execute(); ... } ``` 此时 InterpreterFactory 返回 InterpreterExternalDDLQuery,跟进去看 execute 函数做了什么: ```c++ // ClickHouse/src/Interpreters/InterpreterExternalDDLQuery.cpp BlockIO InterpreterExternalDDLQuery::execute() { ... if (external_ddl_query.from->name == "MySQL") { #ifdef USE_MYSQL ... // line61, 当全量复制执行DDL时,会执行到这里 else if (...->as()) return MySQLInterpreter::InterpreterMySQLCreateQuery( external_ddl_query.external_ddl, cogetIdentifierName(arguments[0]), getIdentifierName(arguments[1])).execute(); #endif } ... return BlockIO(); } ``` 继续跟进去发现 getIdentifierName(arguments[1])).execute() 会rewrite DDL: ```c++ // ClickHouse/src/Interpreters/MySQL/InterpretersMySQLDDLQuery.cpp ASTs InterpreterCreateImpl::getRewrittenQueries(...) { ... // 检查是否存在primary_key, 没有直接报错 if (primary_keys.empty()) throw Exception("cannot be materialized, no primary keys.", ...); ... // 添加 _sign 和 _version 列. auto sign_column_name = getUniqueColumnName(columns_name_and_type, "_sign"); auto version_column_name = getUniqueColumnName(columns_name_and_type, "_version"); // 这里悄悄把建表引擎修改成了ReplacingMergeTree storage->set(storage->engine, makeASTFunction("ReplacingMergeTree", ...)); ... return ASTs{rewritten_query}; } ``` 完成DDL转换之后就会去执行新的DDL语句,完成建表操作,再回到 dumpDataForTables: ```c++ // ClickHouse/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp static inline void dumpDataForTables(...) { ... //line293, 这里执行建表语句 tryToExecuteQuery(..., query_context, database_name, comment); ... // line29, 这里开始 dump 数据并存放到MySQLBlockInputStream MySQLBlockInputStream input(connection, ...) } ``` - 增量同步 还记得startSynchronization() 绑定的执行函数吗?全量同步分析都是在 prepareSynchronized()进行的,那增量更新呢? ```c++ // ClickHouse/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp void MaterializeMySQLSyncThread::synchronization() { ... // 全量同步在 repareSynchronized() 进行 if (std::optional metadata = prepareSynchronized()) { while (!isCancelled()) { UInt64 max_flush_time = settings->max_flush_data_time; BinlogEventPtr binlog_event = client.readOneBinlogEvent(...); { //增量同步侦听binlog_envent if (binlog_event) onEvent(buffers, binlog_event, *metadata); } } } ... } ``` 可以看到,while 语句里有一个 binlog_event 的侦听函数,用来侦听 MySQL 端 BinLog 日志变化,一旦 MySQL 端执行相关操作,其 BinLog 日志会更新并触发 binlog_event,增量更新主要在这里进行。 ```c++ // ClickHouse/src/Databases/MySQL/MaterializeMySQLSyncThread.cpp void MaterializeMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPtr & receive_event, MaterializeMetadata & metadata) { // 增量同步通过监听binlog event实现,目前支持四种event:MYSQL_WRITE_ROWS_EVENT、 // MYSQL_UPDATE_ROWS_EVENT、MYSQL_DELETE_ROWS_EVENT 和 MYSQL_QUERY_EVENT // 具体的流程可以查找对应的 onHandle 函数, 不在此详细分析 if (receive_event->type() == MYSQL_WRITE_ROWS_EVENT){...} else if (receive_event->type() == MYSQL_UPDATE_ROWS_EVENT){...} else if (receive_event->type() == MYSQL_DELETE_ROWS_EVENT){...} else if (receive_event->type() == MYSQL_QUERY_EVENT){...} else {/* MYSQL_UNHANDLED_EVENT*/} } ``` ### 小结 MaterializeMySQL 引擎是 ClickHouse 官方2020年主推的特性,由于该特性在生产环境中属于刚需且目前刚上线不久,整个模块处于高速迭代的状态,因此有许多待完善的功能。例如复制过程状态查看以及数据的一致性校验等。感兴趣的话可参考Github上的2021-Roadmap,里面会更新一些社区最近得计划。以上内容如有理解错误还请指正。 ### 引用 - ClickHouse社区文档 - ClickHouse社区源码 - MySQL实时复制与实现 - MaterializeMySQL引擎分析