• [性能调优] hive 报错 reflect.InvocationTargetException
    执行 insert into table2 select * from table1报错。表是parquet格式,lzo压缩。
  • [知识分享] 开发一个不需要重写成Hive QL的大数据SQL引擎
    本文分享自华为云社区《​​​​​​​从零开发大数据SQL引擎》,作者:JavaEdge 。 学习大数据技术的核心原理,掌握一些高效的思考和思维方式,构建自己的技术知识体系。 明白了原理,有时甚至不需要学习,顺着原理就可以推导出各种实现细节。 各种知识表象看杂乱无章,若只是学习繁杂知识点,固然自己的知识面是有限的,并且遇到问题的应变能力也很难提高。所以有些高手看起来似乎无所不知,不论谈论起什么技术,都能头头是道,其实并不是他们学习、掌握了所有技术,而是他们是在谈到这个问题时,才开始进行推导,并迅速得出结论。 高手不一定要很资深、经验丰富,把握住技术的核心本质,掌握快速分析推导的能力,能迅速将自己的知识技能推到陌生领域,就是高手。 本系列专注大数据开发需要关注的问题及解决方案。跳出繁杂知识表象,掌握核心原理和思维方式,进而融会贯通各种技术,再通过各种实践训练,成为终极高手。 # 大数据仓库Hive 作为一个成功的大数据仓库,它将SQL语句转换成MapReduce执行过程,并把大数据应用的门槛下降到普通数据分析师和工程师就可以很快上手的地步。 但Hive也有问题,由于它使用自定义Hive QL,对熟悉Oracle等传统数据仓库的分析师有上手难度。特别是很多企业使用传统数据仓库进行数据分析已久,沉淀大量SQL语句,非常庞大也非常复杂。某银行的一条统计报表SQL足足两张A4纸,光是完全理解可能就要花很长时间,再转化成Hive QL更费力,还不说可能引入bug。 开发一款能支持标准数据库SQL的大数据仓库引擎,让那些在Oracle上运行良好的SQL可以直接运行在Hadoop上,而不需要重写成Hive QL。 # Hive处理过程 1. 将输入的Hive QL经过语法解析器转换成Hive抽象语法树(Hive AST) 2. 将Hive AST经过语义分析器转换成MapReduce执行计划 3. 将生成的MapReduce执行计划和Hive执行函数代码提交到Hadoop执行 可见,最简单的,对第一步改造即可。考虑替换Hive语法解析器:能将标准SQL转换成Hive语义分析器能处理的Hive抽象语法树,即红框代替黑框 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20225/6/1651800704680661735.png) 红框内:浅蓝色是个开源的SQL语法解析器,将标准SQL解析成标准SQL抽象语法树(SQL AST),后面深蓝色定制开发的SQL抽象语法树分析与转换器,将SQL AST转换成Hive AST。 那么关键问题就来了: # 标准SQL V.S Hive QL - 语法表达方式,Hive QL语法和标准SQL语法略有不同 - Hive QL支持的语法元素比标准SQL要少很多,比如,数据仓库领域主要的测试集TPC-H所有的SQL语句,Hive都不支持。尤其是Hive不支持复杂嵌套子查询,而数据仓库分析中嵌套子查询几乎无处不在。如下SQL,where条件existes里包含了另一条SQL: ```mysql select o_orderpriority, count(*) as order_count from orders where o_orderdate >= date '[DATE]' and o_orderdate date '[DATE]' + interval '3' month and exists (select * from lineitem where l_orderkey = o_orderkey and l_commitdate l_receiptdate) group by o_orderpriority order by o_orderpriority; ``` 开发支持标准SQL语法的SQL引擎难点,就是**消除复杂嵌套子查询掉**,即让where里不包含select。 SQL理论基础是关系代数,主要操作仅包括:并、差、积、选择、投影。而一个嵌套子查询可等价转换成一个连接(join)操作,如: ```mysql select s_grade from staff where s_city not in ( select p_city from proj where s_empname = p_pname ) ``` 这是个在where条件里嵌套了not in子查询的SQL语句,它可以用left outer join和left semi join进行等价转换,示例如下,这是Panthera自动转换完成得到的等价SQL。这条SQL语句不再包含嵌套子查询, ```mysql select panthera_10.panthera_1 as s_grade from (select panthera_1, panthera_4, panthera_6, s_empname, s_city from (select s_grade as panthera_1, s_city as panthera_4, s_empname as panthera_6, s_empname as s_empname, s_city as s_city from staff) panthera_14 left outer join (select panthera_16.panthera_7 as panthera_7, panthera_16.panthera_8 as panthera_8, panthera_16.panthera_9 as panthera_9, panthera_16.panthera_12 as panthera_12, panthera_16.panthera_13 as panthera_13 from (select panthera_0.panthera_1 as panthera_7, panthera_0.panthera_4 as panthera_8, panthera_0.panthera_6 as panthera_9, panthera_0.s_empname as panthera_12, panthera_0.s_city as panthera_13 from (select s_grade as panthera_1, s_city as panthera_4, s_empname as panthera_6, s_empname, s_city from staff) panthera_0 left semi join (select p_city as panthera_3, p_pname as panthera_5 from proj) panthera_2 on (panthera_0.panthera_4 = panthera_2.panthera_3) and (panthera_0.panthera_6 = panthera_2.panthera_5) where true) panthera_16 group by panthera_16.panthera_7, panthera_16.panthera_8, panthera_16.panthera_9, panthera_16.panthera_12, panthera_16.panthera_13) panthera_15 on ((((panthera_14.panthera_1 => panthera_15.panthera_7) and (panthera_14.panthera_4 => panthera_15.panthera_8)) and (panthera_14.panthera_6 => panthera_15.panthera_9)) and (panthera_14.s_empname => panthera_15.panthera_12)) and (panthera_14.s_city => panthera_15.panthera_13) where ((((panthera_15.panthera_7 is null) and (panthera_15.panthera_8 is null)) and (panthera_15.panthera_9 is null)) and (panthera_15.panthera_12 is null)) and (panthera_15.panthera_13 is null)) panthera_10 ; ``` 通过可视化工具将上面两条SQL的语法树展示出来,是这样的。 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20225/6/1651800986651769761.png) 这是原始的SQL抽象语法树。 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20225/6/1651800995289624147.png) 这是等价转换后的抽象语法树,内容太多被压缩的无法看清,不过你可以感受一下(笑)。 那么,在程序设计上如何实现这样复杂的语法转换呢?当时Panthera项目组合使用了几种经典的设计模式,每个语法点被封装到一个类里去处理,每个类通常不过几十行代码,这样整个程序非常简单、清爽。如果在测试过程中遇到不支持的语法点,只需为这个语法点新增加一个类即可,团队协作与代码维护非常容易。 使用装饰模式的语法等价转换类的构造,Panthera每增加一种新的语法转换能力,只需要开发一个新的Transformer类,然后添加到下面的构造函数代码里即可。 private static SqlASTTransformer tf = new RedundantSelectGroupItemTransformer( new DistinctTransformer( new GroupElementNormalizeTransformer( new PrepareQueryInfoTransformer( new OrderByTransformer( new OrderByFunctionTransformer( new MinusIntersectTransformer( new PrepareQueryInfoTransformer( new UnionTransformer( new Leftsemi2LeftJoinTransformer( new CountAsteriskPositionTransformer( new FilterInwardTransformer( //use leftJoin method to handle not exists for correlated new CrossJoinTransformer( new PrepareQueryInfoTransformer( new SubQUnnestTransformer( new PrepareFilterBlockTransformer( new PrepareQueryInfoTransformer( new TopLevelUnionTransformer( new FilterBlockAdjustTransformer( new PrepareFilterBlockTransformer( new ExpandAsteriskTransformer( new PrepareQueryInfoTransformer( new CrossJoinTransformer( new PrepareQueryInfoTransformer( new ConditionStructTransformer( new MultipleTableSelectTransformer( new WhereConditionOptimizationTransformer( new PrepareQueryInfoTransformer( new InTransformer( new TopLevelUnionTransformer( new MinusIntersectTransformer( new NaturalJoinTransformer( new OrderByNotInSelectListTransformer( new RowNumTransformer( new BetweenTransformer( new UsingTransformer( new SchemaDotTableTransformer( new NothingTransformer()))))))))))))))))))))))))))))))))))))); 而在具体的Transformer类中,则使用组合模式对抽象语法树AST进行遍历,以下为Between语法节点的遍历。我们看到使用组合模式进行树的遍历不需要用递归算法,因为递归的特性已经隐藏在树的结构里面了。 @Override protected void transform(CommonTree tree, TranslateContext context) throws SqlXlateException { tf.transformAST(tree, context); trans(tree, context); } void trans(CommonTree tree, TranslateContext context) { // deep firstly for (int i = 0; i tree.getChildCount(); i++) { trans((CommonTree) (tree.getChild(i)), context); } if (tree.getType() == PantheraExpParser.SQL92_RESERVED_BETWEEN) { transBetween(false, tree, context); } if (tree.getType() == PantheraExpParser.NOT_BETWEEN) { transBetween(true, tree, context); } } 将等价转换后的抽象语法树AST再进一步转换成Hive格式的抽象语法树,就可以交给Hive的语义分析器去处理了,从而也就实现了对标准SQL的支持。 当时Facebook为证明Hive对数据仓库的支持,手工将TPC-H的测试SQL转换成Hive QL,将这些手工Hive QL和Panthera进行对比测试,两者性能各有所长,总体上不相上下,说明Panthera自动进行语法分析和转换的效率还行。 Panthera(ASE)和Facebook手工Hive QL对比测试: ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20225/6/1651801049418221604.png) 标准SQL语法集的语法点很多,007进行各种关系代数等价变形,也不可能适配所有标准SQL语法。 # SQL注入 常见的Web攻击手段,如下图所示,攻击者在HTTP请求中注入恶意SQL命令(drop table users;),服务器用请求参数构造数据库SQL命令时,恶意SQL被一起构造,并在数据库中执行。 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20225/6/1651801064347723171.png) 但JDBC的PrepareStatement可阻止SQL注入攻击,MyBatis之类的ORM框架也可以阻止SQL注入,请从数据库引擎的工作机制解释PrepareStatement和MyBatis的防注入攻击的原理。
  • [问题求助] spark 读hive 写gaussdb 问题求助
    【功能模块】用spark 读hive 写gaussdb代码如下:取hive一条数据测试【操作步骤&问题现象】提交到yarn上 client模式  但是出现如下问题:、请问如何解决?是配置问题 ,还是其它问题?【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [问题求助] spark 读hive 数据处理写入gaussdb 出现问题
    【功能模块】【操作步骤&问题现象】我用spark读Hive数据 然后写入gaussdb时  出现下述问题为了方便测试,取了hive的一条数据,然后写gaussdb  ;submit 提交到yarn集群跑的,client模式。请问是写错了,还是哪里配置的不对?谢谢【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [干货汇总] 【大数据系列】不care工具,在大数据平台中Hive能自动处理SQL
    本文分享自华为云社区《[Hive执行原理](https://bbs.huaweicloud.com/blogs/348195?utm_source=csdn&utm_medium=bbs-ex&utm_campaign=other&utm_content=content)》,作者: JavaEdge 。 MapReduce简化了大数据编程的难度,使得大数据计算不再是高不可攀的技术圣殿,普通工程师也能使用MapReduce开发大数据程序。但是对于经常需要进行大数据计算的人,比如从事研究商业智能(BI)的数据分析师来说,他们通常使用SQL进行大数据分析和统计,MapReduce编程还是有一定的门槛。而且如果每次统计和分析都开发相应的MapReduce程序,成本也确实太高了。 有没有更简单的办法,可以直接将SQL运行在大数据平台? 先看如何用MapReduce实现SQL数据分析。 # MapReduce实现SQL的原理 常见的一条SQL分析语句,MapReduce如何编程实现? ```mysql ` SELECT pageid, age, count(1) FROM pv_users GROUP BY pageid, age;` ``` 统计分析语句,统计不同年龄用户访问不同网页的兴趣偏好,具体数据输入和执行结果: ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20224/22/1650609311443266186.png) - 左边,要分析的数据表 - 右边,分析结果 把左表相同的行求和,即得右表,类似WordCount计算。该SQL的MapReduce的计算过程,按MapReduce编程模型 - map函数的输入K和V,主要看V V就是左表中每行的数据,如1, 25> - map函数的输出就是以输入的V作为K,V统一设为1 比如1, 25>, 1> map函数的输出经shuffle后,相同的K及其对应的V被放在一起组成一个,作为输入交给reduce函数处理。比如2, 25>, 1>被map函数输出两次,那么到了reduce这里,就变成输入2, 25>, 1, 1>>,这里的K是2, 25>,V集合是1, 1>。 在reduce函数内部,V集合里所有的数字被相加,然后输出。所以reduce的输出就是2, 25>, 2> ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20224/22/1650609380761298262.png) 如此,一条SQL就被MapReduce计算好了。 在数据仓库中,SQL是最常用的分析工具,既然一条SQL可以通过MapReduce程序实现,那有无工具能自动将SQL生成MapReduce代码?这样数据分析师只要输入SQL,即可自动生成MapReduce可执行的代码,然后提交Hadoop执行。这就是Hadoop大数据仓库Hive。 # Hive架构 Hive能直接处理我们输入的SQL(Hive SQL语法和数据库标准SQL略不同),调用MapReduce计算框架完成数据分析操作。 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20224/22/1650609395656925319.png) 通过Hive Client(Hive的命令行工具,JDBC等)向Hive提交SQL命令: - 若为DDL,Hive会通过执行引擎Driver将数据表的信息记录在Metastore元数据组件,该组件通常用一个关系数据库实现,记录表名、字段名、字段类型、关联HDFS文件路径等这些数据库的元信息 - 若为DQL,Driver就会将该语句提交给自己的编译器Compiler进行语法分析、语法解析、语法优化等一系列操作,最后生成一个MapReduce执行计划。然后根据执行计划生成一个MapReduce的作业,提交给Hadoop MapReduce计算框架处理。 对一个简单的SQL命令: ```mysql SELECT * FROM status_updates WHERE status LIKE ‘michael jackson’; ``` 其对应的Hive执行计划: ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20224/22/1650609437583417627.png) Hive内部预置了很多函数,Hive执行计划就是根据SQL语句生成这些函数的DAG(有向无环图),然后封装进MapReduce的map、reduce函数。该案例中的map函数调用了三个Hive内置函数TableScanOperator、FilterOperator、FileOutputOperator,就完成了map计算,而且无需reduce函数。 # Hive如何实现join操作 除了简单的聚合(group by)、过滤(where),Hive还能执行连接(join on)操作。 pv_users表的数据在实际中无法直接得到,因为pageid数据来自用户访问日志,每个用户进行一次页面浏览,就会生成一条访问记录,保存在page_view表中。而age年龄信息则记录在用户表user。 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20224/22/1650609484669408457.png) 这两张表都有一个相同的字段userid,据该字段可连接两张表,生成前面例子的pv_users表: ```mysql SELECT pv.pageid, u.age FROM page_view pv JOIN user u ON (pv.userid = u.userid); ``` 该SQL命令也能转化为MapReduce计算,连接过程如下: ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20224/22/1650609508775654849.png) join的MapReduce计算过程和前面的group by稍有不同,因为join涉及两张表,来自两个文件(夹),所以需要在map输出的时候进行标记,比如来自第一张表的输出Value就记录为1, X>,这里的1表示数据来自第一张表。这样经过shuffle以后,相同的Key被输入到同一个reduce函数,就可以根据表的标记对Value数据求笛卡尔积,用第一张表的每条记录和第二张表的每条记录连接,输出就是join的结果。 所以打开Hive源码,看join相关代码,会看到一个两层for循环,对来自两张表的记录进行连接操作。 # 总结 开发无需经常编写MapReduce程序,因为网站最主要的大数据处理就是SQL分析,因此Hive在大数据应用很重要。 随Hive普及,我们对在Hadoop上执行SQL的需求越强,对大数据SQL的应用场景也多样化起来,于是又开发了各种大数据SQL引擎。 Cloudera开发了Impala,运行在HDFS上的MPP架构的SQL引擎。和MapReduce启动Map和Reduce两种执行进程,将计算过程分成两个阶段进行计算不同,Impala在所有DataNode服务器上部署相同的Impalad进程,多个Impalad进程相互协作,共同完成SQL计算。在一些统计场景中,Impala可做到ms级计算速度。 后来Spark诞生,也推出自己的SQL引擎Shark,即Spark SQL,将SQL语句解析成Spark的执行计划,在Spark上执行。由于Spark比MapReduce快很多,Spark SQL也相应比Hive快很多,并且随着Spark的普及,Spark SQL也逐渐被人们接受。后来Hive推出了Hive on Spark,将Hive的执行计划转换成Spark的计算模型。 我们还希望在NoSQL执行SQL,毕竟SQL发展几十年,积累庞大用户,很多人习惯用SQL解决问题。于是Saleforce推出了Phoenix,一个执行在HBase上的SQL引擎。 这些SQL引擎只支持类SQL语法,并不能像数据库那样支持标准SQL,特别是数据仓库领域几乎必然会用到嵌套查询SQL:在where条件里面嵌套select子查询,但几乎所有的大数据SQL引擎都不支持。然而习惯于传统数据库的使用者希望大数据也能支持标准SQL。 回到Hive。Hive本身的技术架构其实并没有什么创新,数据库相关的技术和架构已经非常成熟,只要将这些技术架构应用到MapReduce上就得到了Hadoop大数据仓库Hive。但是想到将两种技术嫁接到一起,却是极具创新性的,通过嫁接产生出的Hive极大降低大数据的应用门槛,也使Hadoop得到普及。 >参考 - https://learning.oreilly.com/library/view/hadoop-the-definitive/9781491901687/ch17.html#TheMetastore
  • [二次开发] 【mrs产品】【hive功能】springboot启动可以登录zk,hive连接执行语句报错
    [Thread-44] DEBUG cn.hsa.ims.engine.bo.impl.DataPreprocessionBOImpl2.genFilterCriteriaTmpTableSql(160) - 生成筛选条件临时表SQL:create table TMP_MDTRT_D_15000020220420143641111001 as select xxx [Thread-44] INFO  cn.hsa.ims.bigdata.odps.HuaweiJdbcClient.submit(58) - Hive的入参:create table TMP_MDTRT_D_15000020220420143641111001 as select xxx[Thread-44] INFO  org.apache.hadoop.hive.conf.HiveConf.findConfigFile(198) - Found configuration file null[Thread-44] WARN  org.apache.hadoop.hive.conf.HiveConf.initialize(5430) - HiveConf of name hive.s3a.locals3.jceks does not exist[Thread-44] INFO  org.apache.hadoop.hive.conf.HiveConf.initTimeZoneIsLocal(5471) - current conf hive.parquet.time.zone.isLocal=true[Thread-44] ERROR com.alibaba.druid.pool.DruidDataSource.init(973) - {dataSource-4} init error
  • [二次开发] 【mrs1】【Hive二次开发】怎么用druid管理hive连接,代码实例中xml和jaas.conf没看出有什么用?
    mrs中hive二次开发,怎么使用druid管理hive连接。代码例子core-site.xml和user.hive.jaas.conf没看出有什么用。JDBCExamplePreLogin比JDBCExample 多了login。登录和不登录有什么区别?不需要写LoginUtil.setJaasConf、LoginUtil.setZookeeperServerPrincipal、LoginUtil.login也可以连接hive用spark引擎执行语句吗?
  • [大数据] Hive 引擎Tez和MR 启用local模式参数
    1.说明  正常在提交任务时,Hive都是提交在yarn上面。启动local模式的目的就是让hive的任务不运行在yarn上面,直接当前的服务器执行任务。2.优点当我们对Hive的源码进行Debug,且代码需要Debug到每个task内部时,如果任务是执行在yarn模式的话,那么是无法打断点的,需要进入local模式才能打断点3.MR当引擎为MR时,需要修改以下参数,可以修改配置文件hive-site.xml,也可以通过set来生效3.1 hive-site.xml<property> <name>hive.exec.mode.local.auto</name> <value>true</value> </property> <property> <name>hive.exec.mode.local.auto.inputbytes.max</name> <value>134217728</value> </property> <property> <name>hive.exec.mode.local.auto.input.files.max</name> <value>10</value> </property>3.2 set模式# 是否开启local模式 set hive.exec.mode.local.auto=true; # 输入最大数据量,默认128MB set hive.exec.mode.local.auto.inputbytes.max=134217728; # 最大任务数 set hive.exec.mode.local.auto.input.files.max=10;4.Tez当引擎为MR时,需要修改tez的配置文件tez-site.xml,如果通过set来执行将不生效4.1 tez-site.xml<property> <name>tez.local.mode</name> <value>true</value> </property> <property> <name>tez.grouping.split-count</name> <value>1</value> </property>说明:tez.grouping.split-count和上面的hive.exec.mode.local.auto.input.files.max类似
  • [问题求助] 数据开发-hive脚本页面如何使用变量
    数据开发-hive脚本页面如何使用变量RT
  • [问题求助] 【MRS产品】【hive功能】报错 return code 2 from org.apache.hadoop.hive.ql
    使用cdm导入10G的数据进入hive,表A,parquest格式,压缩lzo。在hdfs里面生成的是一个10G左右的文件。然后创建格式一模一样的表B,执行语句, insert overwrite table 表B select * from表A,将表A数据插入到表B一份。表B在HDFS的形式是一些256M大小的文件,大约四十多个。现在执行select count(1) from 表A ,结果是:40086390select count(1) from 表B,报错,报错是:Error while processing statement FAILED Execution Error code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask计算引擎是MR。
  • [技术干货] Spark &amp; Hive 云原生改造在智领云的应用
    引 言随着 Kubernetes 越来越成熟,使用者越来越多,大数据应用上云的需求也越来越迫切。原有的大数据资源管理器 Yarn 很难做到所有应用资源统一控制,完全隔离,带来的主机应用和大数据计算应用互相抢占资源,由此导致的计算任务时间可能经常性抖动,多租户应用互相影响。而在 Kubernetes 上,此类问题天然解决。所有的应用都以 Pod 形式在 Kubernetes 平台上统一管理,在规范的 namespace 管理下,不存在不受控的应用来进行资源抢占。同时,我们可以轻松地实现多租户,资源配额,运行审计计费等功能。在此背景下,客户原有的大数据计算任务如何无缝迁移到 Kubernetes 上,大数据应用如何进行云原生改造,都成了我们最迫切要解决的问题。本篇文章主要针对这两类问题来进行探讨。Spark 和 Hive 在智领云业务场景中的使用Spark 和 Hive 是大数据平台中很常用的两个计算引擎。Hive 本身是一个基于 HDFS 的类 SQL 数据库,其 HQL 语句的底层执行引擎可以使用基于 Yarn 的 MapReduce,也可以使用 Spark。Spark 在 2021 年 3 月推出的 3.1 版本中实现了对 Kubernetes 支持的 GA(general availability,意味着生产级的支持)。在这个版本中,Spark 允许用户从命令行(spark-submit)上提交 Spark 任务到 Kubernetes 集群中。但是目前的 Hive 版本中,如果底层使用 Spark,还只能提交任务到 Yarn,而不能支持将 Hive 查询提交到 Kubernetes。除了从命令行提交 Spark 任务,智领云使用 Hive 和 Spark 相关的业务还有三个场景:第一个、用户创建 Hive 作业进行数据 ETL 任务开发或数仓分层计算工作流创建。第二个、用户应用程序(例如,批处理调度系统)提交 pySpark 文件或者 Spark jar 包到平台,进行数据计算。第三个、用户使用 Jupyterlab Spark kernel 来访问 Hive/Mysql/Hdfs/Glusterfs 等数据源文件进行数据探索、机器学习及人工智能算法开发。这三种场景下的共同点都是需要将计算任务转换成 Spark job 在集群中调度运行(Hive 的缺省引擎已经从 MR 变成了 Spark)。在每个场景下,我们都在 Spark on Kubernetes 的基础上提供了相应的解决方案:在 Kubernetes 平台上使用 Hive,我们使用了 Hive on Spark on Kubernetes。对于 Spark 作业,我们集成了 Spark On Kubernetes Operator。JupyterLab 中使用 Spark, 我们集成了SparkMagicKernel 和 Livy。每种集成方式,都有其独有的优势和缺陷及其适用场景,下面我们来一一讲解。Hive On Spark On Kubernetes在类 SQL 数据库中,Hive 是很多 Hadoop 生态系统缺省的选择。虽然 Spark 也提供了 SparkSQL 来支持 SQL 查询,但是 Hive 使用的 HQL 和 SparkSQL 在语法支持上还是存在比较大的差异,对于大型的数据仓库项目,用户可能积累了几千个 Hive 任务,如果此时想要快速地迁移到 Kubernetes 上,那么使用 SparkSQL 存在很大的迁移成本和风险。但是如果我们只更改 Hive 的底层执行引擎,改成 Spark on Kubernetes,那么,我们就能让客户的 Hive HQL 应用,无需修改地快速迁移到 Kubernetes 上。但是由于 Spark on Kubernetes 是最近才达到 GA 状态,和不同的 Hive 版本,Kubernetes 版本,以及很多相关大数据组件之间的适配还没有成熟,我们需要对现有的版本之间进行一些适配才能平滑运行 Hive On Spark On Kubernetes。首先,我们必须要确定 Hive、Spark、Hadoop(HDFS)以及相关的 Kerberos、Ranger 的版本。对于 Spark 版本的选择,我们开发选型的时候最新版为 3.1.1,目前 Spark 最新版已经到了 3.2.1,Spark 选取最新版本即可,最新版本能够增强对 Kubernetes 的支持。而 Hive 版本选取的主要原则,即查看 Spark-client 模块的代码是否支持 Kubernetes,如果不支持是否容易改造以便支持。而 4.0.0 版本开始,spark-client 模块重构了代码结构,增加了 SparkClient 的抽象类,有该结构支持,增加对 Kubernetes 的支持就容易了很多。这里还有一种选择,就是 Hive 选用 3.1.2 的稳定版本,然后把最新的 master 分支的 spark-client 模块代码 cherry pick 到 3.1.2 版本即可。在 Hive 代码中主要改造内容是增加 KubernetesSubmitSparkClient,主要内容是构造 SparkSubmit 向 Kubernetes 提交 Spark 任务的各种参数,包括和 Hive 中 RPC server 通信的配置,提交 Spark 作业后,Spark driver pod 启动后会连接 HiveServer2 中的 RPC server,连接成功后,HiveServer2 会发送相应的 Spark job 到 Spark driver 来进行计算。而 Spark 代码的改动,主要是修改 Spark 中的 hiveShim 模块,增加对 Hive 4.0.0 的支持。Hive On Spark 在智领云的数据平台,主要作为 Hive 作业/工作流以及 Hue 查询工具的底层执行引擎:调度系统通过 Beeline 来连接 HiveServer2,Hue 通过 JDBC 连接 HiveServer2,客户端发送用户的 SQL 语句到 HiveServer2。HiveServer2 解析完成 SQL 后,会生成一系列的 HQL taskplan,对于这些 HQL 的执行,HiveServer2 会启动一个 RPC server,SparkSubmit 会带上 RPC server 参数,启动一个 Spark Driver Pod 来和 HiveServer2 进行 RPC 通信,这个 Spark Driver Pod 的主要功能就是接收 HiveServer2 发送过来的 SQL Job 进行计算,计算完成后,将结果返回给 HiveServer2 中运行的 RPC server。在 Kubernetes 平台,SparkSubmit 客户端和 Kubernetes APIServer 通信,Kubernetes 在接收到 Spark 任务请求后,会调用 Scheduler 组件启动 Spark Driver Pod, Spark Driver 在启动完成后,会发送启动 Executor 请求给 Kubernetes APIServer, Kubernetes 再启动 Spark Executor Pod, Spark Driver 和 Executor 建立连接,完成整个 Spark 集群的创建。整体架构如下图所示:权限控制方面,我们使用 Ranger 来完成授权和鉴权操作,使用 Kerberos 来完成认证操作。对于 Ranger 鉴权插件, Hive 和 Spark 都有相应的解决方案。Hive 直接通过 Hive Ranger 插件和 Ranger 服务来通信,完成鉴权操作,Spark 则通过 Spark Authorizer 插件再调用 Hive Ranger 插件来完成鉴权。在 Hive On Spark 模式下,我们使用 Spark 对 Kerberos 的支持来完成用户身份认证操作,通过 Hive Ranger 插件来完成鉴权操作。Spark on Kubernetes OperatorSpark on Kubernetes Operator 项目是 Google 非官方推出的 Spark On Kubernetes 解决方案。它的内部实现是基于 Spark 官方的 Spark On Kubernetes 解决方案之上,更多的利用了 Kubernetes 特性,来增强在 Kubernetes 上使用 Spark 计算引擎的易用性和灵活性以及性能的提升。它本质上是一个 Kubernetes Operator,所以在该解决方案下,用户提交 Spark 作业只需要通过 Yaml 文件即可,并且可以定制 Kubernetes Schedule。比如,可以配置使用华为提供的针对大数据领域优化过的 Volcano 调度引擎。在智领云平台上,Spark on Kubernetes Operator 承载了用户提交 Jar 包或者 pySpark 文件类型的所有 Spark/Spark-streaming 作业的底层调度引擎。在 Spark OnKubernetes Operator 成熟之后,Hive on Spark 底层未来也可以增加 Spark On Kubernetes Operator 运行模式的支持,仅仅只需要在 spark-client 模块中增加KubernetesOperatorSparkClient 抽象类的支持即可。Spark Operator 方案也存在一个弊端,就是 Spark 作业配置 Yaml 的高度复杂化,该 Yaml 需要配置 Spark 作业的所有信息,包括Driver/Executor 的资源控制,包括 Spark 的镜像版本和调度算法。普通用户不需要关注这些配置。在此问题下,我们模仿 Apache Livy 的 API 增加了一个 Spark On Kubernetes Operator Server。该服务负责管理 Spark On Kubernetes Operator Job,提供创建/更新/删除 Job 接口,提供查询 Job 状态及日志请求。用户只需要配置少量Spark Job 参数,后台服务会根据参数完成 Spark Job Yaml 文件渲染,提交到 Kubernetes 集群。在权限控制这一块,我们可以使用 Spark 相关配置结合 Spark Operator 对 Kerberos 的支持来实现。对 Ranger Hive 插件的支持,我们可以使用 Spark Authorizer 插件来转接适配,不过该插件版本较老,我们需要修改其 POM 文件和相关代码来使其可以支持 Spark 3.1.1 版本。在 Spark Operator 模式下, Spark 作业的相关配置都在 Yaml 中配置,我们可以利用 Spark Operator 对 Sidecar 的支持来完成 Spark Operator 对 Ranger Hive 插件的支持。主要方法就是 Spark 3.1.1 版本的原生镜像不变,将 Ranger 相关的 Jars 通过 Sidecar 共享目录共享给 Spark 主 Container,并配置相关 ClassPath 参数,使 Spark 能够找到 Ranger 和 Spark Authorizer 相关 Jar 包。JupyterLab On KubernetesJupyterLab 作为数据科学家首选的 IDE,在数据及人工智能领域应用非常广泛。在智领云平台,我们的主要改造是打通JupyterLab 和我们的调度平台的互相访问,增加 Spark 读写 Hive / HDFS 的支持。这个场景和前两个场景的主要区别在于 JupyterLab Kernel 和 Spark Driver Pod 之间可能有持续的交互,而不是 run to finish。其次,在 UI 界面下的任务需要无需修改的在后台(测试或生产环境下)运行。在此需求之下,我们主要做了几点改动:选取了 SparkMagic Kernel 支持了用户编写测试 Spark 代码。改造 JupyterLab Server 代码,允许用户直接点击开启当前 Spark 任务的 4040 调试页面 UI。改造 JupyterLab Client 代码,允许用户可以直接在 JupyterLab Notebook 内直接引用系统或者用户自定义变量,并能够在调度和调试时生效。增加了 JupyterLab 调度 Worker,使调度平台可以直接调度运行用户的 ipynb 类型的 Notebook 文件。增加 JupyterLab Python 环境管理,允许 JupyterLab 在重启后保持其之前设置的 Python 环境。SparkMagic Kernel 执行 Spark 任务是利用 Apache Livy 服务来实现任务的提交以及交互Session 的维护。Apache Livy 目前版本对 Kubernetes 并不支持,我们需要添加 Kubernetes client 和状态查询的支持。Apache Livy 实现的对 Kubernetes 的支持实际上是和 Hive on Spark 模式类似,都是创建 RPC Server,然后调用 SparkSubmit 提交 Spark 任务和 RPC Server 通信,来完成 SQL 任务的交互。下图展示了整个流程的架构。在此种模式下,Hive 的权限控制配置和 Spark Operator 类似,都是使用 Spark Authorizer 和 Hive Ranger 插件来实现。未来在智领云平台,我们使用了存储和计算分离的方案,在计算层使用 Spark on Kubernetes 作为主要的计算引擎,底层可以采用 HDFS 兼容现有系统,也可以采用其它支持 HDFS 接口的云原生存储。这样的架构,加上对 Hive 等传统 Hadoop 生态的云原生改造,可以在最大程度的支持现有系统的同时逐步迁移到纯云原生的体系架构下,无缝集成新的大数据和人工智能系统。而基础架构即代码(Infra as Code)方式的使用, CI / CD 全链路的支持,为类似 DataOps,DataMesh 的新型数据应用开发运维范式提供了清晰可行的技术架构支持。而由此带来的业务开发效率的提升,业务管理运维效能的提升,都是质的变化。未来可期。转载于CSDN微信公众号
  • [大数据] Hive Tez 0.10.0版本编译安装 适配3.1以上的hadoop
    1.说明在官网下载的最新的Apache Tez在适配Hadoop3.0以上的版本时会存在一些问题,这边打算自己编译安装来适配2.下载tez github源码下载https://github.com/apache/tez/或者可以直接使用wget命令下载wget https://github.com/apache/tez/archive/rel/release-0.10.0.tar.gz --no-check-certificate3.编译tar -zxvf release-0.10.0.tar.gz解压之后的目录:修改pom.xml的hadoop版本修改适配自己版本的hadoop添加配置可用的Maven仓库源 <repositories> <repository> <id>huaweimaven</id> <name>huawei maven</name> <url>https://mirrors.huaweicloud.com/repository/maven/</url> </repository> </repositories>选择不编译tez-ui模块 不影响功能,编译比较耗时间编译命令:mvn clean install -DskipTests4.安装选择/usr/local作为tez的安装目录将编译好的tez包上传到/usr/local目录上mv tez-0.10.0.tar.gz /usr/local tar -zxvf tez-0.10.0.tar.gz ln -s tez-0.10.0 /usr/local/tez上传tez.tar.gz到hdfs在hdfs上创建目录hdfs dfs -mkdir -p /apps/tez hdfs dfs -put tez.tar.gz /apps/tez新增tez-site.xmlcd /usr/local/tez/conf vim tez-site.xml<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>tez.lib.uris</name> <value>${fs.defaultFS}/apps/tez/tez.tar.gz</value> </property> <property> <name>tez.use.cluster.hadoop-libs</name> <value>false</value> </property> <property> <name>tez.history.logging.service.class</name> <value>org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService</value> </property> </configuration>修改hadoop-env.sh所有节点都需要替换cd /usr/local/hadoop/etc/hadoop vim hadoop-env.shexport TEZ_CONF_DIR=/usr/local/tez/conf/tez-site.xml export TEZ_JARS=/usr/local/tez export HADOOP_CLASSPATH=${HADOOP_CLASSPATH}:${TEZ_CONF_DIR}:${TEZ_JARS}/*:${TEZ_JARS}/lib/*修改yarn-site.xml所有节点都需要替换cd /usr/local/hadoop/etc/hadoop vim yarn-site.xml<property> <name>yarn.timeline-service.enabled</name> <value>true</value> </property> <property> <name>yarn.timeline-service.hostname</name> <value>server1</value> </property> <property> <name>yarn.timeline-service.http-cross-origin.enabled</name> <value>true</value> </property> <property> <name>yarn.resourcemanager.system-metrics-publisher.enabled</name> <value>true</value> </property> <property> <name>yarn.timeline-service.generic-application-history.enabled</name> <value>true</value> </property> <property> <name>yarn.timeline-service.address</name>cd <value>server1:10200</value> </property> <property> <name>yarn.timeline-service.webapp.address</name> <value>server1:8188</value> </property> <property> <name>yarn.timeline-service.webapp.https.address</name> <value>server1:8190</value> </property> <property> <name>yarn.timeline-service.leveldb-timeline-store.path</name> <value>${hadoop.tmp.dir}/yarn/timeline</value> <description>Store file name for leveldb timeline store.</description> </property> <property> <name>yarn.timeline-service.ttl-ms</name> <value>604800000</value> <description>Time to live for timeline store data in milliseconds.</description> </property> <property> <name>yarn.timeline-service.bind-host</name> <value>0.0.0.0</value> </property>
  • [数据集成FDI] 【roma产品】【数据集成功能】api接口导入数据到hive各个表映射到hdfs路径如何配置
    【功能模块】【操作步骤&问题现象】1、接入数据源中大数据存储模块,hive、fi hive、mrs hive 、fi hdfs、mrs hdfs这几个存储方式有什么区别吗2、api接口导入数据到hive 各个表映射到hdfs路径如何配置?【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [生态对接] 伙伴使用MRS HIVE连接时失败
    伙伴使用驱动连接MRS HIVE时连接失败,原因目前定位出是因为驱动生成的服务端pricipal在kdc数据库中不存在,需要根据节点名重新生成principal,求指导
  • [大数据] Hive调优(二) 基于Tez的map和reduce数量控制与调优
    1.概述主要对基于Tez的map数和reduce数测试与调优 2.数据准备(1)表信息本次测试的表和sql都是使用的TPC-DSTez文件存储格式为orc。 表名总数占用空间文件数date_dim73049354.1 K1item480003.0 M1store11812.3 K1store_sales23039641872829.2 G1874 store_sales的表结构: (2)SQL语句测试SQL为SQL2 CPU密集型 3.map数的控制3.1.1 map数控制测试影响map个数的tez参数,可以通过在Hive中使用set的形式来使用参数默认值说明tez.grouping.max-size1073741824(1GB)group分割大小的最大值tez.grouping.min-size16777216(16MB)group分割大小的最小值tez.grouping.split-count未设置group的分割组数 mapreduce.input.fileinputformat.split.maxsize这个参数对Tez中的map数也是有影响的。默认为256MB,但是这次测试不针对这个参数 (1)tez.grouping.max-size测试1:tez.grouping.max-size =1073741824; 1GB共有363个map数 测试2:tez.grouping.max-size =536870912; 512MB共有701个map数 测试3:tez.grouping.max-size =268435456; 256MB共有818个map数 测试1:tez.grouping.max-size =2147483648; 2GB共有363个map数因为2GB大于文件的分割长度 (2)tez.grouping.split-count测试1:tez.grouping.split-count =300;共有363个map数 测试2:tez.grouping.split-count =500;共有749个map数 测试3:tez.grouping.split-count =1000;共有895个map数当设置的值大于原始的895时,tez会直接使用895 测试4:tez.grouping.split-count =200;共有244个map数 3.1.2 map数控制结果(1)tez.grouping.max-sizetez.grouping.max-size值Map数10737418243635368709127012684354568182147483648363 (2)tez.grouping.split-counttez.grouping.split-count值Map数3003635007491000895200244 4.reduce数的控制影响reduce个数的参数参数默认值说明mapred.reduce.tasks-1指定reduce的个数hive.exec.reducers.bytes.per.reducer67108864每个reduce的数据处理量hive.exec.reducers.max1009reduce的最大个数hive.tez.auto.reducer.parallelismtrue是否启动reduce自动并行 前面三个参数的作用和基于MR的相同,故不在做分析,只分析hive.tez.auto.reducer.parallelism参数 (1)hive.tez.auto.reducer.parallelismhive.tez.auto.reducer.parallelism启用之后,Tez会估计数据量大小,设置并行度。在运行时会根据需要调整估计值。测试1:set hive.exec.reducers.max=20;set hive.tez.auto.reducer.parallelism = false; 测试2:set hive.exec.reducers.max=20;set hive.tez.auto.reducer.parallelism = true;从dag syslog中可以看到,开启并行执行之后,Reducer 2从20变成了7。Reducer 3 4 5 6从20变成了5  5.调优流程(1)mapTez内部对map数已经有了很多的优化,需要通过dag的系统日志来分析,在有限的资源内,结合集群的资源来提高并发,集群资源越多,map数就可以设置大一点。 (2)reduce需要根据集群的资源以及map端实际的输出数据量来设置reduce数。 6.总结推荐使用(1)mapl  tez.grouping.split-count结合集群的资源来设置,例如sql2:总共分割895,测试过程中895的性能好。 (2)reducel  推荐reduce数为集群能启动的最大container数的80%,或者小于这个数。l  mapred.reduce.tasks这个参数一般不推荐使用。l  是否开启并行,需要根据实际sql的测试结果来判断。最好开启和关闭并行都测试一下