• [技术干货] 数据结构设计
    定义结构体ThreadSlot保存线程池中每一个线程的信息,包含:线程状态、线程号、线程对应的database oid、线程执行所需的信息StreamProducer,StreamProducer是父线程向子线程传递的唯一结构、线程唤醒所需的锁和条件变量。 定义结构体 StreamThreadPool 表征线程池,其中 size 表示线程池中拟预留的ThreadSlot 个数,ThreadSlot被保存在threadSlots数组中;无锁队列emptyRing用来保存未创建线程ThreadSlot,对应地,idleRing 用来保存空闲的已创建Stream线程的ThreadSlot。定义结构体StreamPool,由 于 Stream线程的初始化信息和database是强相关的,如果不保留 database 相关的信息,那么线程初始化的时间代价仍然较高,所以线程池中的线程复用时,需要满足database信息匹配,所以一个emptyRing 和一个database相匹配,保存在链表PoolListHead中。 
  • [技术干货] idleRing的作用
    Stream 线程池为了高效管理线程的出/入池操作,采用无锁队列实现。定义结构体ThreadSlot 保存线程池中每一个线程的信息,包含:线程状态、线程号、线程对应的database oid、线程执行所需的信息StreamProducer、线程唤醒所需的锁和条件变量。 当线程还未被创建时,初始化一定数量的ThreadSlot数量以预留Stream线程,这些ThreadSlot 被保存在数组threadSlots中 。当 Stream线程执行完毕,需要将Stream线程放置到表征可复用线程的无锁队列,称之为 idleRing;当线程因为超时、异常等原因不再复用,需要退出时,将Stream线程对应的ThreadSlot放置到表征未创建线程的无锁队列,称之为emptyRing。 idleRing 的作用是为了快速获取并复用线程池中的线程,emptyRing的作用是快速获取一个未被使用的ThreadSlot结构,以创建一个新的Stream线程。由于Stream线程的初始化信息和database是强相关的,如果不保留database相关的信息,那么线程初始化的时间代价仍然较高,所以线程池中的线程复用时,需要满足 database 信息匹配。对于设计线程池而言,每一个database都应该对应一个idleRing。 
  • [技术干货] Stream 线程
    Stream 线程是临时线程,随query启动和退出,负责Stream算子的执行,Stream线程初始化和退出都会争抢锁等进程级资源,在 Stream 线程个数无法进一步优化的场景下,需要设计有效方案以减少 Stream 线程初始化和退出的时间代价,将进程初始化耗时稳定在ms级,保障数据库的确定性时延查询。Stream 线程池的核心思想是等 Stream线程执行完计划任务,保留必要且可复用的线程信息,将线程放入线程池中。 线程池中的线程执行过程如上图所示,其具体步骤为: 步骤一:线程信息初始化; 步骤二:线程待唤醒后轻量级初始化(query级初始化); 步骤三:线程任务执行; 步骤四:线程清理; 返回步骤二:继续等待下条query执行。 在返回步骤二时,当线程等待超时、超出线程池容量(最大Stream线程个数)、异常时线程已不可用,需要销毁。 其中步骤一中在线程初始化时,需要执行的操作有:线程创建、创建相关内存上下文、信号处理函数注册、内存追踪信息初始化、初始化GUC选项等操作; 步骤二中在线程轻量级/查询级初始化时,需要执行的操作有恢复GUC参数、初始化BackendParams、重置GUC参数等操作。
  • [技术干货] Pooler 连接清理
    清理Session持有的连接 cache_connection,是否使用pooler连接池缓存连接,默认开; session_timeout,客户端连接空闲超时后报错退出归还连接; enable_force_reuse_connections,事务结束后强制归还连接; conn_recycle_timeout(8.2.1), CN 空闲session超时后归还连接。pg_clean_free_conn视图/函数,清理1/4的空闲连接池连接,CM定期调用;CLEAN CONNECTION 语法,清理对应DB或user的所有空闲连接 clean connection to all for database postgres to USER user1。GaussDB(DWS)分布式架构的Stream 算子作为SQL join操作时频繁发生的执行算子,共存在三种模式:Gather、Redistribute、Broadcast,分别负责CN节点GATHER数据,DN节点REDISTRIBUTE和BROACAST数据。大集群高并发场景下,Stream算子过多可能会导致通信的性能瓶颈,引起性能劣化(2000个stream同时启动,进程初始化耗时从ms级劣化到s级),因此需要尽可能减少Stream算子。但是在某些现场环境下,存在数据倾斜、join查询不包含必要分布键等客观情况,Stream算子无法有效减少,为多表join 场景下的查询时延保障带来挑战。因此 GaussDB(DWS)对于线程初始化->线程任务执行->线程退出执行的流程方面做了stream线程池优化,减少了线程初始化与线程退出所带来的开销。
  • [技术干货] Pooler 连接池复用流程
    对于每起一个线程 session(即连接),都会有一个 PoolAgent 与之对应,即从DatabasePool->NodePool->pool取出来的连接。 cnDef、dnDef:初始化时从pgxc_node中拿到,即cn定义、端口、ip (session级别)dnConn、cnConn:从 databasePools->nodePools-> pool拿真正对应的连接 (query级别)dnHandles、cnHandles:从dnConn、cnConn里边dop出来使用,确保两者是一一对应的状态,当query结束时,只需要close handles就行,cnConn不需要close Pooler 连接池具体的复用流程如下: 1) session 需要连接时,通过DB+USER为key找到正确的pooler连接池,优先从中取走现有连接,如果连接池中没有连接的话,则新建连接; 2) query结束后,CN的postgres线程并不会归还连接,连接可以用于当前session的下一个查询; 3) session结束后,CN的postgres线程会将连接还到对应的pooler,连接对应的DN上的postgres线程并不会退出,处于ReadCommand中,等待复用后CN新的postgres线程发起任务。
  • [技术干货] postgres工作线程
    CN的pooler连接池中会保存与其他CN/DN的连接,每一个连接在对端会对应一个postgres工作线程。 postgres 工作线程是带状态属性的,如database,所以可以认为pooler连接池中的连接也是带属性的。不同属性间的连接是不能复用的,按不同属性切分为pool A/B/C 等连接池。每个连接池中会存有连接往不同节点的空闲连接的数组,提供接口给外部使用或放入连接。 CN上的postgres工作线程在需要连接其他节点时,会创建一个本地agent,尝试从pooler 连接池取跟本线程相同属性的空闲连接,pooler如果没有空闲连接,就会新建一个连接。连接交给 agent 后,可以视为线程私有。在线程退出时,agent 才会将连接还给pooler。 DatabasePool 用于存储空闲连接,根据database,user_name,pgoptions 三者来确定如果通过三者查找到了,那么就取其中对应的nodePools,找不到则新加nodePools 中对应的数据结构为 PGXCNodePool,用于存储本节点与其他每个cn/dn 的连接。
  • [技术干货] pooler连接池详解
    GaussDB(DWS)为MPP型分布式数据库,使用Shared Nothing架构,数据分散存储在各个DN节点,而CN不存储数据,CN作为接收查询的入口,生成的计划会尽量下推到 DN 并行执行以提升性能,此过程中会产生大量的建连操作,使得通信开销变得很大。因此在大数据时代,集群规模越来越大,业务并发越来越高,数据库集群各节点间的通信压力也越来越大。GaussDB(DWS)集群通信技术,在大规模集群中可以承载高并发业务,能够实现高性能分布式通信系统。客户端向CN的监听端口发起连接; CN postmaster主线程accept连接,创建postgres线程并将连接交给此线程处理; 客户端下发query到CN; CN的postgres线程将查询计划下发给其他 CN/DN,查询结果沿原路径返回到客户端; 客户端查询结束,关闭连接; CN上对应的postgres线程销毁退出。CN与DN建连立流程,和客户端与CN建连立流程基本相同。因此为了减少CN与DN建立连接,以及DN进程中postgres线程创建、销毁的开销,CN端实现了Pooler连接池。
  • [技术干货] 分布式死锁
    GaussDB(DWS)的shared nothing结构,使得一条语句可能在不同的节点上执行,在这些节点上都要对操作对象申请锁,且同样存在以不同顺序申请锁的可能,因此便存在分布式死锁的场景如何排查分布式死锁: 先构造一个分布式死锁场景,如下图,session 1 在 CN 1 上开启事务并先查询lock_table1;此时 session 2 在CN 2 上开启事务并查询lock_table1,然后两个会话分别执行truncate表。通过查询分布式死锁视图:select * from pgxc_deadlock order by nodename,dbname,locktype,nspname,relname;CN_5001 的 truncate 语句线程号为:139887210493696;在等待线程号为:139887432832768 的 truncate 语句释放 lock_table1 的 AccessShareLock(事务中 select 语句持有的锁),同时该线程:139887210493696,持有 lock_table1 的AccessExclusiveLock;CN_5004 的 truncate 语句线程号为:139887432832768;在等待线程号为:139887210493696的truncate 语句释放lock_table1的AccessExclusiveLock;同时该线程:139887432832768持有 lock_table1 的 AccessShareLock;这种场景下在不同实例上分布式的等待关系,便形成了分布式死锁。 
  • [技术干货] 自动处理单点死锁
    GaussDB(DWS)会自动处理单点死锁,当单节点死锁发生时,数据库会自动回滚其中一条事务,以消除死锁现象。 VACUUM FULL 与delete select 语句造成的死锁(等同一对象的不同锁);部分业务场景下,存在查询时间窗在白天,而业务跑批删除只能在晚上执行,同样为了保证查询效率降低脏页率,对业务表的VACUUM FULL操作也在晚上,时间窗重合,升锁过程便可能产生死锁; VACUUM FULL语句申请1:ExclusiveLock并持有; delete from 语句申请2:AcessShareLock并持有; vacuum full 升级锁3:AccessExclusiveLock 失败; delete from 升级锁4:RowExclusiveLock失败;两个语句形成死锁。 atler列存表与select max(a)的死锁,两条语句只涉及一张表,但仍旧会产生死锁,列存表有CUdesc表及Delta表,语句在运行时拿锁顺序不同,便可能产生死锁; upsert的死锁现象:行存带主键约束或列存表场景下并发upsert,并发更新重复的数据,且不同事务内部更新的相同数据的顺序不同。
  • [技术干货] 锁相关参数介绍
    lockwait_timeout:控制单个锁的最长等待时间。当申请的锁等待时间超过设定值时,系统会报错,即等锁超时,一般默认值为20min; deadlock_timeout:死锁检测的超时时间,当申请的锁超过该设定值仍未获取到时,触发死锁检测,系统会检查是否产生死锁,一般默认值为1s; update_lockwait_timeout:允许并发更新参数开启时,控制并发更新同一行单个锁的最长等待时间,超过该设定值,会报错,一般默认值为2min; ddl_lock_timeout: 当出现八级表锁冲突的时候生效,当等待获取八级锁的时间超过配置的时间,抛错返回,默认值为0,表示不生效,需用户手动开启(在8.1.3版本及更高版本生效)。 以上参数的单位均为毫秒,请保证deadlock_timeout的值小于lockwait_timeout,否则将不会触发死锁检测。死锁:两个及以上不同的进程实体在运行时因为竞争资源而陷入僵局,除非外力作用,否则双方都无法继续推进;而数据库事务可针对资源按照任意顺序加锁,就有一定几率因不同的加锁顺序而产生死锁; 死锁场景模拟: 锁表顺序不同,常见于存储过程中第一时刻:session 1:先拿到 lock_table2 的 8 级锁,此时 session 2 拿到lock_table1的8级锁;第二时刻:session 1:再尝试申请lock_table1的1级锁; session 2 :尝试申请lock_table2的1级锁;两个会话都持锁并等待对方手里的锁释放。 
  • [技术干货] 锁相关视图
    pg_locks 视图存储各打开事务所持有的锁信息,需关注的字段:locktype(被锁定对象的类型)、relation(被锁定对象关系的 OID)、pid(持锁或等锁的线程 ID)、mode(持锁或等锁模式)、granted(t:持锁,f:等锁);pgxc_lock_conflicts 视图提供集群中有冲突的锁的信息(适合锁冲突现场还在时使用),目前只收集 locktype 为 relation、partition、page、tuple 和transactionid 的锁的信息,需要关注的字段 nodename(被锁定对象节点的名字)、queryid(申请锁的查询ID)、query(申请锁的查询语句)、pid、mode、granted; pgxc_deadlock 视图获取导致分布式死锁产生的锁等待信息,只收集locktype为relation、partition、page、tuple 和 transactionid 的锁等待信息; 通过pgxc_lockwait_detail 和 pgxc_wait_detail 查看锁等待状态,该方法仅适用于8.1.3及以上版本。
  • [运维管理] Plan management绑定计划用例
    问题背景select current_database(), n.nspname,c.relname,0 from pg_class c , pg_namespace n where n.oid = c.relnamespace and (c.relkind = ANY (ARRAY['r'::"char", 'v'::"char", 'f'::"char"])) AND NOT pg_is_other_temp_schema(n.oid) AND (pg_has_role(c.relowner, 'USAGE'::text) OR has_table_privilege(c.oid, 'SELECT, INSERT, UPDATE, DELETE, TRUNCATE, REFERENCES, TRIGGER'::text) OR has_any_column_privilege(c.oid, 'SELECT, INSERT, UPDATE, REFERENCES'::text)) and n.nspname = 'public' order by c.relname limit 20 offset 0;语句执行慢,管理员用户很快,普通用户执行慢发现是使用系统视图时,做了很多用or连接的权限判断:pg_has_role(c.relowner, 'USAGE'::text) OR has_table_privilege(c.oid, 'SELECT, INSERT, UPDATE, DELETE, TRUNCATE, REFERENCES, TRIGGER'::text) OR has_any_column_privilege(c.oid, 'SELECT, INSERT, UPDATE, REFERENCES'::text)由于dabadmin用户pg_has_role总能返回true,因此or之后的条件无需继续判断;而普通用户的or条件需要逐一判断,如果数据库中表个数比较多,最终会导致普通用户比dbadmin需要更长的执行时间。加hint, 走index + nestloop的优化后,时间从156秒优化到24秒。/*+ nestloop(n c) leading((n c)) set global(enable_seqscan  off)*/但是客户代码已上线,无法修改,无法加hint,使用管理员用户又不符合安全要求。可通过配置,使用 Plan management功能进行计划绑定。即根据sql_hash,使语句和outline(hint)进行绑定。版本要求:910及以上Plan management特性基本原理:在CN上,对每个sql生成的计划(除FQS计划或CN轻量化)进行遍历,将计划中的join算子、scan(table scan和index scan)算子、join和agg上的倾斜优化信息、join两端的stream算子以及表的关联顺序提取为outline(即一组hint),并将outline进行保存(dbms_om.sql_outline)。用户可通过topsql分析出哪个计划是优的,并将比较优的计划的outline绑定给该sql。绑定后,该sql再次生成计划时,会通过应用该hint来固定执行计划。除FQS和CN强量化计划外,其他计划都会生成outline。方法步骤:1.设置开启plan management需要后台开启集群参数,无需重启SET planmgmt_options='plan_save_mode_outline,enable_plan_baseline,plan_save_level_topsql';SET enable_planmgmt_backend=on;注:planmgmt_options和enable_planmgmt_backend要保持一致,即开启时,planmgmt_options不为空,enable_planmgmt_backend为on。关闭时,planmgmt_options为空,enable_planmgmt_backend为off。低版本当这两个参数不保持一致有报错风险:prepare gid is xxx and top xid is xxx different transaction,高版本已修复。planmgmt_options参数说明:计划管理配置项,该参数的值由若干个配置项用逗号隔开构成。参数类型:USERSETplan_save_mode_outline,表示从计划中推导outline进行保存。(enable_planmgmt_backend为off时,设置该配置项不会生效。)plan_save_tblnum_n,n为整数,取值范围0~65535。表示语句依赖的表个数大等于n个时,将生成的generic计划进行保存。plan_save_level_topsql,表示把满足topsql条件语句的非FQS的generic计划进行保存。plan_save_level_all,表示把所有非FQS的generic计划进行保存。enable_plan_baseline,表示为语句使用可用的绑定计划。plan_save_level_topsql参数在设置后,会触发修改系统表 dbms_om.sql_outline 表结构修改后增加Distribute By: HASH(outline_name, sql_hash)Location Nodes: ALL DATANODES2.获取要绑定outline的sql_hash方法一:verbose计划中的query summary 会打印sql_hash方法二:从topsql中获取'sql_hash'3.语句调优获取需要绑定的OUTLINE,根据实际情况进行调优加hint调优:/*+ nestloop(n c) leading((n c)) set global(enable_seqscan  off)*/explain (verbose on, blockname on, outline on) select /*+ nestloop(n c) leading((n c)) set global(enable_seqscan  off)*/ current_database(), n.nspname,c.relname,0 from pg_class c , pg_namespace n where n.oid = c.relnamespace and (c.relkind = ANY (ARRAY['r'::"char", 'v'::"char", 'f'::"char"])) AND NOT pg_is_other_temp_schema(n.oid) AND (pg_has_role(c.relowner, 'USAGE'::text) OR has_table_privilege(c.oid, 'SELECT, INSERT, UPDATE, DELETE, TRUNCATE, REFERENCES, TRIGGER'::text) OR has_any_column_privilege(c.oid, 'SELECT, INSERT, UPDATE, REFERENCES'::text)) and n.nspname = 'public' order by c.relname limit 20 offset 0;调优时,如果计划内容不够详细,可以set explain_perf_mode=normal;后打计划,可以看到更详细的计划,该参数默认为pretty。4.获取OUTLINE可手动创建OUTLINE,也可以从dbms_om.sql_outline系统表获取OUTLINE方法一:使用优化后的hint手动生成OUTLINE,打计划时加上(verbose on, blockname on, outline on)选项explain (verbose on, blockname on, outline on) select /*+nestloop(n c) leading((n c)) set global(enable_seqscan  off)*/ current_database(), n.nspname,c.relname,0 from pg_class c , pg_namespace n where n.oid = c.relnamespace and (c.relkind = ANY (ARRAY['r'::"char", 'v'::"char", 'f'::"char"])) AND NOT pg_is_other_temp_schema(n.oid) AND (pg_has_role(c.relowner, 'USAGE'::text) OR has_table_privilege(c.oid, 'SELECT, INSERT, UPDATE, DELETE, TRUNCATE, REFERENCES, TRIGGER'::text) OR has_any_column_privilege(c.oid, 'SELECT, INSERT, UPDATE, REFERENCES'::text)) and  n.nspname = 'yfi.dwd_yfai_ops'  order by c.relname limit 20 offset 0;手动创建:OUTLINE名称要以“outline_”开头,sql_hash需要与被绑定的语句保持一致,USING 后为OUTLINE的具体内容。CREATE OUTLINE outline_name_1 FOR sql_7cd3bd7d2be865a0e315e3fdf29e2c0e USING '/*+       begin_outline_data        Leading[@"sel$1" n@"sel$1" c@"sel$1"]        NestLoop(@"sel$1" n@"sel$1" c@"sel$1")        IndexScan(@"sel$1" n@"sel$1" pg_namespace_nspname_index)        IndexScan(@"sel$1" c@"sel$1" pg_class_relname_nsp_index)       end_outline_data   */';建好后,可以从dbms_om.sql_outline系统表查到SELECT sql_hash, plan_hash, outline_name, outline FROM dbms_om.sql_outline WHERE outline like '%public.t1%public.t2%';方法二:查系统表:用优化后的sql_hash获取outline_name,SELECT sql_hash, plan_hash, outline_name, outline FROM dbms_om.sql_outline where sql_hash like '%xxx%';5.绑定outline使用sql_hash,outline_name进行绑定SELECT pgxc_bind_plan('sql_hash', 'outline_name_1');绑定后,可以查询pg_catalog.pg_plan_baseline查看已绑定的outline。6.验证方法一:客户侧验证,打印执行计划,看是否走了绑定后的计划,或者看topsql中的执行记录使用了绑定的计划:方法二:topsql中的记录:warning提示指定的hint未生效,走了绑定的outline_name_1注:planmgmt_options中的plan_save_level_topsql可在配置完成功后去掉,避免sql_outline表过大,开启plan_save_level_topsql后,满足记录topsql条件的语句,都会往dbms_om.sql_outline表中记录相应的outline。7.解绑使用sql_hash解绑SELECT pgxc_unbind_plan('sql_7cd3bd7d2be865a0e315e3fdf29e2c0e');注:绑定OUTLINE后,手工写的hint有些会不生效,与hint_option 参数有关,在进行hint调优时,建议解绑后再hint。 
  • 从 Lambda 到 Kappa:Flink 在实时数仓中的深度实践
    从 Lambda 到 Kappa:Flink 在实时数仓中的深度实践—— 附 200 行完整代码带你手撸「流式宽表 → ClickHouse → BI」端到端链路00 写在前面:为什么又要聊实时数仓Lambda 架构用批层兜底、流层加速的方案统治了大数据 10 年,却也把“两套代码、两套运维、口径对不齐”写进了教科书。随着 Flink 1.18 正式把存算分离、流批一体写进生产级 Feature,Kappa 架构才真正敢在交易、物流、广告等核心场景“裸奔”。本文想回答三个问题:如何用 Flink SQL 在 30 分钟内搭一条「流式宽表」产线,0 Java 代码;当维表大到 8 TB、更新频率 5 min/次时,怎么做维表 JOIN 才能不打爆内存;如何把 ClickHouse 当“可更新的 Kafka”用,实现毫秒级 OLAP,同时让 BI 工具直接读分布式表。全文 1.2 万字,所有代码在 GitHub 开源(文末地址)。如果你只想跑通 Demo,一条 docker-compose up 即可;如果你想深入 Flink SQL 的 Plan 优化、ClickHouse 的 MergeTree Write-Ahead Log,建议收藏后慢慢读。01 架构总览:一条数据从 Kafka 到 BI 大屏的 5 站地铁站点技术选型为什么选它备注1. 数据采集Kafka 3.7社区版无 license 风险,支持 Exactly-Once三节点,ISR=22. 流式 ETLFlink 1.18流批一体、CDC Source 成熟、SQL 层支持 Temporal JoinTaskManager 16 vCore / 64 GB3. 维度存储Redis 7.2 + Tiered Storage热数据内存、冷数据落盘,支持 5 min 级全量刷新单分片 32 GB,RDB+AOF 双写4. 明细存储ClickHouse 23.8列式、MergeTree 支持 Update/Delete、物化视图秒级刷新三分片两副本,SSD 盘 12 TB5. 可视化Superset 3.0自带 ClickHouse 方言,支持 SQL Lab 拖拽对接 LDAP,行级权限02 环境准备:一条命令拉起全链路git clone https://github.com/yourname/kappa-flink-demo.git cd kappa-flink-demo docker-compose up -dCompose 里已包含:Kafka、Zookeeper、Flink JobManager/TaskManager、ClickHouse、Redis、Superset。自动创建 3 张 Kafka Topic:user_behavior、item_snapshot、order_detail。自动灌入 500 万条脱敏样本,持续以 1 万 QPS 灌流。03 需求拆解:把“订单宽表”做成实时3.1 业务口径主事实:order_detail(订单粒度,每秒 1 万条)维度 1:item_snapshot(商品维表,8000 万条,5 min 全量刷新一次)维度 2:user_behavior(用户实时点击流,用于计算“下单前 30 min 浏览次数”)3.2 技术难点维表太大,无法全量加载到 Flink State;商品维表会物理删除,需要回撤历史订单宽表;浏览次数需要“区间聚合”,且可重复计算。04 Flink SQL:30 分钟 0 Java 完成“流式宽表”4.1 建 Kafka 表CREATE TABLE order_detail ( order_id STRING, user_id BIGINT, item_id BIGINT, price DECIMAL(10,2), ts TIMESTAMP(3), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'order_detail', 'properties.bootstrap.servers' = 'kafka:9092', 'format' = 'debezium-json', 'scan.startup.mode' = 'latest-offset' ); 4.2 建 ClickHouse 结果表(支持 Update)CREATE TABLE order_wide ( order_id String, user_id UInt64, item_id UInt64, price Float64, browse_cnt UInt32, item_name String, update_time DateTime ) WITH ( 'connector' = 'clickhouse', 'url' = 'clickhouse://clickhouse:8123/default', 'table-name'= 'order_wide', 'sink.update-strategy' = 'dedup' -- 按主键 order_id 更新 ); 4.3 维表 JOIN:Redis Async + 缓存穿透降级-- 在 Flink 1.18 里,Temporal Join 语法可以作用在 Lookup Table 上 CREATE TABLE item_dim ( item_id BIGINT, item_name STRING, PRIMARY KEY (item_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'async', -- 异步请求,默认 100 并发 'command' = 'HGET', 'host' = 'redis', 'port' = '6379', 'cache.max-size' = '100000', -- 本地 LRU 'cache.ttl' = '5 min', 'missing-key' = 'blank' -- 维表缺失时补空串,不抛异常 ); 4.4 浏览次数:区间聚合用窗口 TVFCREATE VIEW user_browse AS SELECT user_id, COUNT(*) AS browse_cnt, window_start, window_end FROM TABLE( TUMBLE(TABLE user_behavior, DESCRIPTOR(ts), INTERVAL '30' MINUTE)) GROUP BY user_id, window_start, window_end; 4.5 终极 SQL:组装宽表INSERT INTO order_wide SELECT o.order_id, o.user_id, o.item_id, o.price, COALESCE(b.browse_cnt,0), i.item_name, NOW() FROM order_detail o LEFT JOIN item_dim FOR SYSTEM_TIME AS OF o.ts AS i ON o.item_id = i.item_id LEFT JOIN user_browse FOR SYSTEM_TIME AS OF o.ts AS b ON o.user_id = b.user_id AND o.ts BETWEEN b.window_start AND b.window_end; 4.6 提交作业docker exec -it jobmanager \ ./bin/sql-client.sh -f /opt/flink-sql/order_wide.sql打开 Flink WebUI,可以看到:吞吐量 12 w/s;Redis Lookup Join 99-th 延迟 3 ms;ClickHouse 更新抖动 < 1 s。05 维表 8 TB 优化:把“全量刷新”做成“增量点查”当 item_snapshot 膨胀到 8 TB,5 min 一次全量刷 Redis 已不现实。我们引入「TTL 分层 + BloomFilter 降级」方案:在 MySQL 里开启 Binlog,Flink CDC 把变更流推到 Kafka;Redis 只缓存 7 天热数据,冷数据回源 ClickHouse 维表;在 Flink SQL 里通过 COALESCE(redis, clickhouse) 双路 Lookup,实测缓存命中率 94%,P99 延迟从 900 ms 降到 12 ms。代码片段:CREATE TABLE item_cold_dim ( item_id BIGINT, item_name STRING, PRIMARY KEY (item_id) NOT ENFORCED ) WITH ( 'connector' = 'clickhouse', 'url' = 'clickhouse://clickhouse:8123/dim', 'table-name'= 'item_snapshot', 'lookup.cache.ttl' = '1 hour', 'lookup.max-retries' = '3' ); -- 双路 JOIN 封装成视图 CREATE VIEW item_all AS SELECT COALESCE(r.item_id, c.item_id) AS item_id, COALESCE(r.item_name, c.item_name) AS item_name FROM item_dim r FULL OUTER JOIN item_cold_dim c USING (item_id); 把 4.5 节的 item_dim 直接替换成 item_all,即可实现“热温冷”三级查询。06 ClickHouse 写入调优:让 MergeTree 当 Kafka 用6.1 表结构CREATE TABLE order_wide ( order_id String, user_id UInt64, item_id UInt64, price Float64, browse_cnt UInt32, item_name String, update_time DateTime ) ENGINE = ReplacingMergeTree(update_time) ORDER BY (order_id) PARTITION BY toYYYYMM(update_time); ReplacingMergeTree 保证同 order_id 自动去重;PARTITION BY 月,防止 Part 过多;ORDER BY 用唯一键,提高去重效率。6.2 写入参数在 Flink ClickHouse Connector 里增加:sink.batch-size = 5000 sink.flush-interval = 1s sink.max-retries = 3 sink.write-local = true -- 直接写本地表,绕过 Distributed 引擎测试 16 并发 TaskManager,可稳定 25 w r/s 写入,后台 Merge 压力通过 max_bytes_to_merge_at_max_space_in_pool 调大到 20 GB,CPU 占用 < 30%。07 端到端一致性:EOS 不只是 KafkaFlink 1.18 的 ClickHouse Connector 已支持 两阶段提交(2PC)。打开 checkpoint:execution.checkpointing.interval = 30s execution.checkpointing.mode = EXACTLY_ONCE 并在 ClickHouse 端开启 Atomic 数据库引擎:CREATE DATABASE default ENGINE = Atomic; 当 checkpoint 成功,Flink 会统一 ACK Kafka offset + ClickHouse commit,失败自动回滚。用 sys.checkpoint 表监控:SELECT * FROM sys.checkpoints WHERE job_id = 'order_wide' ORDER BY checkpoint_id DESC LIMIT 1; 端到端“断点续传”实测:kill -9 TaskManager,作业重启后零重复、零丢失。08 BI 对接:Superset 拖拽 ClickHouse 物化视图在 Superset 里添加 ClickHouse 数据源,SQLAlchemy URI 填:clickhousedb://default:@clickhouse:8123/default 建物化视图加速大屏:CREATE MATERIALIZED VIEW mv_order_wide_hour ENGINE = AggregatingMergeTree() PARTITION BY toYYYYMM(hour) ORDER BY (hour, item_name) AS SELECT toStartOfHour(update_time) AS hour, item_name, count() AS order_cnt, sum(price) AS gmv, avg(browse_cnt) AS avg_browse FROM order_wide GROUP BY hour, item_name; Superset 图表 SQL 直接 SELECT * FROM mv_order_wide_hour FINAL,开启 AUTO-REFRESH=30s,大屏即可在 500 ms 内返回。09 生产踩坑小结坑现象根因解法1. Redis 热 KeyCPU 飙到 100%,P99 延迟 2 s某爆款商品被 20 w QPS 查询增加本地 LRU + 随机过期打散2. ClickHouse 写入 Part 爆炸merge 速度跟不上,查询 502Flink 并发太高,每批 500 条就写调大 sink.batch-size=5000,并加 parts_to_delay_insert3. ReplacingMergeTree 去重延迟大屏看到重复 order_id查询没带 FINAL,后台 merge 未完成对 OLAP 查询统一加 _final=1 参数10 展望:当 Flink 成了“实时数仓的 Linux”Flink Table Store 0.9 已发布,LakeHouse 统一格式(Paimon)正在孵化,未来可能不再需要“Kafka+ClickHouse”双栈,一套 Flink SQL 写到 Paimon,湖内支持 Update/Delete,湖外接 Presto/StarRocks 秒级查询。本文 Demo 将持续更新到 Flink 2.0,目标:真正用同一套 SQL,完成“流读、批算、湖存、Serve”闭环。
  • [技术干货] 【FAQ】2025年9月数据库问题汇总
    【问题求助】 GAUSSDB集中式数据库,是否可以实现指定只对SQL中涉及的某些表使用并行提问时间2025-09-19 15:47:34详细描述假设有一条SQL: select * from t1,t2 where t1.id=t2.id是否可以通过某种方法,实现只对其中的t2表开启并行处理?链接地址https://bbs.huaweicloud.com/forum/thread-0251193650445362097-1-1.html回答在 GaussDB 集中式数据库 中,可以通过 表级并行度控制 或 查询提示(Hint) 实现仅对特定表(如 t2)启用并行处理,而其他表(如 t1)保持串行执行。以下是具体方法:方法1:使用查询提示(Hint)强制并行GaussDB 支持通过 PARALLEL 提示指定表的并行度。例如,以下 SQL 仅对 t2 表启用并行扫描(假设并行度为 4),而 t1 表仍按默认方式执行:SELECT /*+ PARALLEL(t2 4) */ * FROM t1, t2 WHERE t1.id = t2.id; 说明:PARALLEL(t2 4) 表示对 t2 表使用 4 个并行工作线程。未指定 t1 的并行度时,默认不启用并行(或按系统配置)。方法2:通过表属性设置默认并行度如果希望长期对 t2 表默认启用并行,可以通过修改表属性实现:-- 设置 t2 表的默认并行度为 4 ALTER TABLE t2 SET (PARALLEL_DEGREE = 4); -- 执行查询(此时 t2 会自动并行,t1 仍串行) SELECT * FROM t1, t2 WHERE t1.id = t2.id; 注意:此方法会影响所有涉及 t2 表的查询,需谨慎使用。方法3:使用执行计划控制通过 EXPLAIN 分析执行计划,确认是否仅对目标表生效:EXPLAIN SELECT /*+ PARALLEL(t2 4) */ * FROM t1, t2 WHERE t1.id = t2.id; 检查输出中 t2 的扫描节点是否显示 Parallel Scan,而 t1 为普通扫描。关键注意事项并行度选择:并行度需根据表大小、系统资源调整,过高的并行度可能导致资源争用。Hint 优先级:查询提示(Hint)会覆盖表属性或系统默认设置。版本兼容性:不同 GaussDB 版本语法可能略有差异,建议参考官方文档。总结通过 查询提示 或 表属性设置,可以精准控制 GaussDB 集中式数据库中特定表的并行执行,而其他表保持串行。推荐优先使用 PARALLEL Hint 实现灵活控制。检查输出中 t2 的扫描节点是否显示 Parallel Scan,而 t1 为普通扫描。关键注意事项并行度选择:并行度需根据表大小、系统资源调整,过高的并行度可能导致资源争用。Hint 优先级:查询提示(Hint)会覆盖表属性或系统默认设置。版本兼容性:不同 GaussDB 版本语法可能略有差异,建议参考官方文档。总结通过 查询提示 或 表属性设置,可以精准控制 GaussDB 集中式数据库中特定表的并行执行,而其他表保持串行。推荐优先使用 PARALLEL Hint 实现灵活控制。【问题求助】 GaussDB监控采集报错:pg_replication_slots表中列"wal_status"不存在 (SQLSTATE 42703)提问时间2025-09-18 16:03:01详细描述我在使用GaussDB时遇到一个监控采集方面的错误,特来求助。我的collector在尝试采集replication_slot指标时失败了,报错信息如下:time=2025-09-17T10:45:23.325+08:00 level=ERROR source=collector.go:207 msg=“collector failed” name=replication_slot duration_seconds=0.6615468 err=“ERROR: Column “wal_status” does not exist. (SQLSTATE 42703)”相关SQL:SELECTslot_name,slot_type,0 AS current_wal_lsn,0 AS confirmed_flush_lsn,active,0,wal_statusFROM pg_replication_slots;错误提示很明确:SQL查询中引用了名为 wal_status的列,但该列在目标表中不存在。我想了解:问题根因:这是否是因为我的GaussDB版本(或特定模式)中,系统视图或系统表的结构与采集工具期望的不一致?replication_slot相关的系统视图究竟是哪个?(例如是pg_replication_slots吗?)这个视图在当前版本的GaussDB中是否不包含 wal_status列?解决方案:对于这类监控指标采集,GaussDB的正确实践是什么?是需要查询不同的系统视图,还是需要启用特定的监控开关或配置?版本差异:wal_status列是否是某些更新版本中才加入的?我当前使用的GaussDB版本可能是什么?任何关于此问题的排查思路、系统视图结构说明或版本兼容性信息都将非常有帮助!感谢!背景信息/补充说明(可选):我使用的GaussDB版本是:gaussdb (GaussDB Kernel 505.2.1 build ff07bff6) compiled at 2024-12-27 09:22:42 commit 10161 last mr 21504 release采集工具是:Prometheus gaussdb_exporter链接地址https://bbs.huaweicloud.com/forum/thread-02127193564980516097-1-1.html回答GaussDB内核版本(如505.2.1)的pg_replication_slots视图​​未包含wal_status列​​,需改用pg_get_replication_slots()函数或升级到支持该列的版本(如507+),建议联系华为云获取适配的监控查询语句。
  • [问题求助] GAUSSDB集中式数据库,是否可以实现指定只对SQL中涉及的某些表使用并行
    假设有一条SQL: select * from t1,t2 where t1.id=t2.id 是否可以通过某种方法,实现只对其中的t2表开启并行处理?
总条数:1630 到第
上滑加载中