• [知识分享] 大数据开发,Hadoop Spark太重?你试试esProc SPL
    【摘要】 随着大数据时代的来临,数据量不断增长,传统小机上跑数据库的模式扩容困难且成本高昂,难以支撑业务发展。很多用户开始转向分布式计算路线,用多台廉价的PC服务器组成集群来完成大数据计算任务。Hadoop/Spark就是其中重要的软件技术,由于开源免费而广受欢迎。经过多年的应用和发展,Hadoop已经被广泛接受,不仅直接应用于数据计算,还发展出很多基于它的新数据库,比如Hive、Impala等。 H...本文分享自华为云社区《Hadoop Spark太重,esProc SPL很轻》,作者:石臻臻的杂货铺。随着大数据时代的来临,数据量不断增长,传统小机上跑数据库的模式扩容困难且成本高昂,难以支撑业务发展。很多用户开始转向分布式计算路线,用多台廉价的PC服务器组成集群来完成大数据计算任务。Hadoop/Spark就是其中重要的软件技术,由于开源免费而广受欢迎。经过多年的应用和发展,Hadoop已经被广泛接受,不仅直接应用于数据计算,还发展出很多基于它的新数据库,比如Hive、Impala等。Hadoop/Spark之重Hadoop的设计目标是成百上千台节点的集群,为此,开发者实现了很多复杂、沉重的功能模块。但是,除了一些互联网巨头企业、国家级通信运营商和大型银行外,大多数场景的数据量并没有那么巨大。结果,经常能看到只有几个到十几个节点的Hadoop集群。由于目标和现实的错位,对很多用户来讲,Hadoop成了一个在技术、应用和成本上都很沉重的产品。技术之重如果真的有几千台计算机组成的集群,是不可能依靠手工个性化管理的。试想,将这些计算机罗列出来,运维人员看都看不过来,更别说管理和分配任务了。再说,这么多机器,难免会不断出现各种故障,怎么保证计算任务顺利执行?Hadoop/Spark的开发者为了解决这些问题,编写了大量代码,用于实现自动化节点管理、任务分配和强容错功能。但是,这些功能本身就要占用很多计算资源(CPU、内存和硬盘等),如果用到几台到十几台节点的集群上,就太过沉重了。集群本来就不大,Hadoop还要占用相当一部分的资源,非常不划算。不仅如此,Hadoop产品线很长,要把这些模块都放在一个平台上运行,还要梳理好各个产品之间的相互依赖性,就不得不实现一个包罗万象的复杂架构。虽然大多数场景只用其中一两个产品,也必须接受这个复杂、沉重的平台。后来出现的Spark弥补了Hadoop对内存利用的不足,技术上是不是可以变轻呢?很遗憾,Spark走向了另一个极端,从理论模型上就只考虑内存计算了。特别是Spark 中的 RDD 采用了 immutable 机制,在每个计算步骤后都会复制出新的 RDD,造成内存和 CPU 的大量占用和浪费,离开大内存甚至无法运行,所以技术上还是很重。使用之重Hadoop技术上太过复杂,也就意味着安装和运维会很麻烦。集群只有几台计算机时,却不得不使用为几千台节点集群设计的节点管理、任务分配和容错功能。可想而知,安装、配置、调试都很困难,日常运行的维护、管理工作也不容易。即使克服这些困难让Hadoop运行起来了,编写大数据计算代码时还会面临更大的麻烦。Hadoop编程的核心框架是MapReduce,程序员要编写并行程序,只要写 Map 和 Reduce 动作即可,用来解决求和、计数等简单问题也确实有效。但是,遇到复杂一些的业务逻辑,用MapReduce编程就会变得非常困难。例如,业务计算中很常见的JOIN计算,就很难用MapReduce实现。再比如,很多和次序有关的运算实现起来也很困难。Spark的Scala语言具备一定的结构化数据计算能力,是不是能简单一些呢?很可惜,Scala使用难度很大,难学更难精。遇到复杂一些的运算逻辑,Scala也很难写出来。MapReduce、Scala都这么难,所以Hadoop/Spark计算语法开始回归SQL语言。Hive可以将SQL转化为MapReduce所以很受欢迎,Spark SQL的应用也比Scala广泛的多。但是,用SQL做一些常规查询还算简单,用于处理多步骤过程计算或次序相关运算还是非常麻烦,要写很复杂的UDF。而且,许多计算场景虽然勉强能用SQL实现,但是计算速度却很不理想,也很难进行性能调优。成本之重虽然 Hadoop 软件本身开源免费,但它技术复杂、使用困难,会带来高昂的综合成本。前面说过,Hadoop自身会占用过多的CPU、内存和硬盘,而Spark需要大内存支撑才能正常运行。所以不得不为Hadoop/Spark采购更高配置的服务器,要增加硬件支出。Hadoop/Spark使用困难,就需要投入更多的人力去完成安装、运维,保证Hadoop/Spark的正常运转;还要投入更多的开发人员,编程实现各种复杂的业务计算,要增加人力资源成本。由于使用过于困难,很多用户不得不采购商业公司的收费版本Hadoop/Spark,价格相当可观,会大幅增加软件采购成本。既然Hadoop如此沉重,为什么还有很多用户会选择它呢?答案很简单:暂时找不到别的选择,也只有Hadoop勉强可用,好歹知名度高一些。如此一来,用户就只能安装、配置Hadoop的重型应用,并忍受Hadoop本身对计算资源的大量消耗。小规模集群的服务器数量本来就不多,Hadoop又浪费了不少,小马拉大车,最后运行的效果可想而知。花了大价钱采购、费事费力的使用Hadoop,实际计算的性能却不理想。就没有别的选择了?轻量级的选择开源的esProc SPL是轻量级大数据计算引擎,采用了全新的实现技术,可以做到技术轻、使用简单、成本低。技术轻本文开头说过,越来越大的数据量让传统数据库撑不住,所以用户只能转向分布式计算技术。而数据库之所以撑不住,是因为SQL难以实现高速算法,大数据运算性能只能指望数据库的优化引擎,遇到复杂计算时,优化引擎又常常无能为力。所以,我们应该想办法设计更高效的算法,而不是一味地追求分布式计算。按照这个思路,SPL提供了众多高性能算法(有许多是业界首创)以及高效的存储方案,同等硬件环境下可以获得远超过数据库的运算性能。安装在单机上的SPL就可以完成很多大数据计算任务,架构比集群简单很多,从技术上自然就轻的多了。SPL的高性能算法有下面这些:对于数据量更大的情况,SPL实现了轻量级集群计算功能。这一功能的设计目标是几台到十几台节点的集群,采用了与Hadoop完全不同的实现方法。SPL集群不提供复杂沉重的自动化管理功能,而是允许对每个节点进行个性化配置。程序员可以根据数据特征和计算目标来决定各节点存储什么样的数据,完成哪些计算。这样做,不仅大大降低了架构复杂度,也是提升性能的重要手段。以订单分析为例,订单表很大,要通过产品号字段与较小的产品表主键做关联,再按照产品供应商分组汇总订单金额。SPL集群可以很容易的将订单表分段存放在各个节点的硬盘上,再将较小的产品表读入每个节点的内存中。计算时,每个节点仅对本机上的订单分段和产品数据做关联、分组汇总,可以缩短总计算时间;再将结果传输到一个节点上做二次汇总。由于传输的是第一次汇总的结果,数据量小、网络传输时间较短。总体来说,这个方案可以获得最佳性能,虽然程序员需要做一些更细致的工作,但对于小规模集群来说,增加的工作量并不大。SPL也不提供超强的容错能力,不会像Hadoop那样,在有节点故障的情况下,还要保证任何一个任务都会执行成功。实际上,大多数计算任务的执行时间都在几个小时以内,而几台、十几台机器的集群一般都能做到较长时间正常运行,不会这么频繁的出故障。即使偶尔出现节点故障导致任务执行失败,再重新计算一遍也可以接受,毕竟这种情况不会经常发生。所以,SPL的容错能力只是保证有少数节点故障的时候,整个集群还能继续工作并接受新任务(包括重算的任务),这就大大降低了SPL集群的复杂度。在内存计算方面,SPL没有使用Spark RDD的 immutable机制,而是采用了指针式复用机制,利用地址(指针)访问内存,在数据结构没有改变的情况下,直接用原数据的地址形成结果集,不必每个计算都将数据复制一遍,仅仅多保存一个地址(指针),可以同时减少 CPU 和内存的消耗,运行起来要比Spark轻很多了。并且,SPL改进了当前的外存计算算法体系,降低了复杂度并扩大了适应范围,可以做到内外存计算结合,充分提升计算性能的同时,还不像Spark那样依赖大内存。使用简单SPL采用轻量级技术,自然更容易安装、配置和运行维护。SPL不仅可以作为独立服务器使用,还很容易集成到需要高性能计算的应用中,比如即时查询系统,只要引入几个jar包即可。Hadoop则很难集成,只能在边上作为一个数据源运行。有些临时性数据需要随时进行处理,则可使用SPL的桌面集成开发环境可视化地计算,快速得到结果。如果要安装部署Hadoop,那么等环境搭建好时临时数据任务已经过期了。前面展示的众多SPL高性能算法,也能让大数据计算编程变得简单。程序员可以在较短时间内掌握这些算法函数,学习成本相对较低。而且,使用这些现成的函数很容易实现各种复杂的计算需求,不仅比MapReduce/Scala简单,比SQL也简单很多。比如,以电商网站常见的漏斗分析为例,用SQL实现三步漏斗的代码大致如下:with e1 as ( select gid,1 as step1,min(etime) as t1 from T where etime>= to_date('2021-01-10', 'yyyy-MM-dd') and etime<to_date('2021-01-25', 'yyyy-MM-dd') and eventtype='eventtype1' and … group by 1),with e2 as ( select gid,1 as step2,min(e1.t1) as t1,min(e2.etime) as t2 from T as e2 inner join e1 on e2.gid = e1.gid where e2.etime>= to_date('2021-01-10', 'yyyy-MM-dd') and e2.etime<to_date('2021-01-25', 'yyyy-MM-dd') and e2.etime > t1 and e2.etime < t1 + 7 and eventtype='eventtype2' and … group by 1),with e3 as ( select gid,1 as step3,min(e2.t1) as t1,min(e3.etime) as t3 from T as e3 inner join e2 on e3.gid = e2.gid where e3.etime>= to_date('2021-01-10', 'yyyy-MM-dd') and e3.etime<to_date('2021-01-25', 'yyyy-MM-dd') and e3.etime > t2 and e3.etime < t1 + 7 and eventtype='eventtype3' and … group by 1)select sum(step1) as step1, sum(step2) as step2, sum(step3) as step3from e1 left join e2 on e1.gid = e2.gid left join e3 on e2.gid = e3.gidSQL写出来要三十多行,理解起来有相当的难度。如果用MapReduce/Scala来写,会更加困难。即使是用SQL实现,写出来的这段代码和漏斗的步骤数量相关,每增加一步就要再增加一段子查询。相比之下,SPL 就简单得多,处理任意步骤数都是下面这样简洁的代码:AB1=["etype1","etype2","etype3"]=file("event.ctx").open()2=B1.cursor(id,etime,etype;etime>=date("2021-01-10") && etime<date("2021-01-25") && A1.contain(etype) && …)3=A2.group(id).(~.sort(etime))=A3.new(~.select@1(etype==A1(1)):first,~:all).select(first)4=B3.(A1.(t=if(#==1,t1=first.etime,if(t,all.select@1(etype==A1.~ && etime>t && etime<t1+7).etime, null))))5=A4.groups(;count(~(1)):STEP1,count(~(2)):STEP2,count(~(3)):STEP3)SPL集群计算的代码也非常简单,比如前面提到的订单分析计算,具体要求是:大订单表分段存储在4个节点上,小产品表则加载到每个节点的内存中,两表关联之后要按照产品供应商分组汇总订单金额。用SPL写出来大致是下面这样:AB1["192.168.0.101:8281","192.168.0.102:8281",…, "192.168.0.104:8281"]2fork to(4);A1=file("product.ctx").open().import()3>env(PRODUCT,B2)4=memory(A1,PRODUCT)5=file("orders.ctx":to(4),A1).open().cursor(p_id,quantity)6=A5.switch(p_id,A4)7=A7.groups(p_id.vendor;sum(p_id.price*quantity))这段代码执行时,任务管理(内存加载、任务拆分、合并等)所需要的计算资源,远远小于关联和分组汇总计算的消耗。如此轻便的任务管理功能,可以在任意节点、甚至是集成开发环境IDE上执行。成本低与Hadoop相同,SPL也是开源软件,不同的是SPL不仅软件免费,综合成本也非常低。SPL安装、配置、运维很容易,可以大大降低支持人员的人力资源成本。同时,由于SPL降低了大数据计算编程的难度,程序员很容易实现各种复杂的计算,开发效率显著提高,也就节省了程序员的人力资源成本。而且,由于SPL技术体系非常轻,平台自身占用的CPU、内存和硬盘很少,可以让更多的资源用于业务计算,能大幅提高硬件利用率。SPL也不像Spark那样依赖大内存,总体来说,大大减少了硬件采购成本。SPL既轻且快SPL技术轻、自身消耗小,而且还提供了众多高性能算法,所以,在几个到几十个节点的集群,甚至单机的情况下,比Hadoop/Spark有更好的性能表现。案例1:某电商漏斗分析计算。Spark:6节点,每节点4CPU核,平均计算时间:25秒。SPL:单机,8线程计算,平均计算时间可达10秒。代码量仅有Spark Scala的一半。案例2:某大型银行用户画像分析。Hadoop上某OLAP服务器:虚拟机100CPU核,计算时间:120秒。SPL:虚拟机12CPU核,计算时间:仅4秒。性能提高250倍。案例3:某商业银行的手机银行APP,活期明细查询,数据量大且高并发。基于Hadoop的某商用数据仓库:高并发时无法达到秒级的响应速度,只好换用6台ES集群。SPL单机:达到6台ES集群同样的并发和响应能力。总结来说,Hadoop/Spark是源自头部互联网企业的重型解决方案,适合需要有超大规模集群的巨大企业。很多场景的数据虽然也不少,但小集群甚至无集群就足够处理,远没多到这些巨大企业的规模,也没有那么多的硬件设备和维护人员。这种情况下,轻量级的大数据计算引擎SPL是首选,投入很低的成本,就可以做到技术轻、使用简便,而且还能提高开发效率、达到更高的性能。SPL资料SPL下载SPL源代码
  • [二次开发] spark客户端提交代码报错Unable to obtain password from user 找不到密码
    【操作步骤&问题现象】1、按照文档配置完了对应的配置文件,下载user.keytab 配置到指定路径2、提交代码时出现了如下异常在线等急,感谢大佬们的相助【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [问题求助] IDEA本地连接用Spark连接HIVE的问题,求助!!~
    首先:集群规模是健康的。连接的点是集群外。使用hive的beeline和spark的spark-beeline都能正常连接,和操作。但是用IDEA spark开发就出现问题了代码如下:报错如下:2022-05-27 23:29:21,610 [main] ERROR [org.apache.thrift.transport.TSaslTransport] - SASL negotiation failurejavax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]    at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)    at org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)。。。Caused by: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)    at sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)。。。2022-05-27 23:29:21,631 [main] ERROR [org.apache.thrift.transport.TSaslTransport] - SASL negotiation failurejavax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]    at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)    at org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)。。。Exception in thread "main" org.apache.spark.sql.AnalysisException: java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;    at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:107)    at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:215)。。。Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)    at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:185)。。Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1523)    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:86)。。。Caused by: java.lang.reflect.InvocationTargetException    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)。。。Caused by: MetaException(message:Could not connect to meta store using any of the URIs provided. Most recent failure: org.apache.thrift.transport.TTransportException: GSS initiate failed    at org.apache.thrift.transport.TSaslTransport.sendAndThrowMessage(TSaslTransport.java:232)    at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:316)    at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)。。。。
  • [生态对接] spark 读取卡夫卡数据 提交到yarn一直报错连接异常失败 代码和参数都配置正确
    【功能模块】spark 读取kafka提交到yarn之后异常,一直报连接错误 【截图信息】 
  • [生态对接] spark提交yarn idea执行正常,提交yarn报错找不到主类
    【功能模块】spark 代码,rdd提交时报错【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [优秀实践] Spark使能KAEzip压缩项目实践
    首先非常感谢华为鲲鹏能给我们这个机会参与到华为鲲鹏生态的建设中来,同时也非常荣幸能跟随我的导师朱常鹏一起参与到这个项目中来。我将会从以下三个方面来介绍我们的项目项目背景项目方案心得体会项目背景随着大数据时代的到来,存储和计算大体量数据的需求越来越多的在企业中涌现,作为Hadoop大数据生态圈中目前离线计算性能最优越的大数据计算引擎Spark越来越多的被企业使用,为此,提升Spark在大数据场景下的性能成为当下大数据技术圈中的热点问题。在本项目中,通过对Spark的扩展,使之能够支持新型压缩算法KAEzip,KAEzip是华为新开发出的一种内置于华为鲲鹏920 CPU的高效压缩算法。相比较传统的zlib压缩算法,其压缩/解压缩性能均有较大幅度的提升。如果能够让Spark支持KAEzip压缩算法,将会较大程度上提升Spark在大数据场景下的性能,从而能够提升企业的效益。 项目方案本方案难点是需要了解掌握Spark中的压缩组件与jdk中的zlib组件和操作系统中的libz.so动态库之间的相互关系。Jdk自实现了用于zlib解压缩的动态库libzip.so,操作系统中也自带了一个zlib解压缩的动态库:libz.so,SparkSQL在读写文件时调用的是操作系统中的libz.so中的解压缩算法,其他解压缩场景如broadcast,shuffle,checkpoint调用的是jdk自带的libzip.so中的解压缩算法。Spark中的Spark core包和Spark SQL包负责压缩算法的选择与调用。在这两个包中,分别使用特征CompressionCodec和CompressionCodecs支持不同类型的压缩算法。基于特征的设计使得Spark在其生命周期内更易扩展。因此本方案主要的设计思路:实现Spark中的特征CompressionCodec和CompressionCodecs,使Spark支持KAEzip压缩算法;将Spark中对KAEzip中的压缩方法(deflate)和解压缩方法(inflate)的调用,重定向到操作系统中的libkaezip.so动态库(自实现),最终实现Spark对KAEzip的支持。具体思路如图1所示。Spark SQL提供了spark.read/spark.write等方法对本地文件或hdfs上的文件进行读写操作。根据不同的读写对象,这些可以进一步细分。比如spark.read可以细分为spark.read.text/spark.read.orc/sparkread.parquet等等,其中每个方法都可以设置不同的压缩算法,实现对文件的压缩和解压缩。为了让这些方法支持KAEzip压缩算法。我们的扩展可分为三个部分。首先,扩展CompressionCodecs特征,使Spark能够识别kaezip关键字,并映射到指定类上。该类实现了CompressionCodecs特征中规定的压缩/解压缩方法。因此对CompressionCodecs特征中的压缩/解压缩的调用,对最终被重定向到该类中的相应方法。其次,对ORC和parquet进行扩展。它们并不属于Spark或Hadoop,而是两个相互独立的Apache项目。它们目前并不支持KAEzip压缩算法,因此需要扩展,使其能够识别zlib和kaezip关键字,并将对应的关键字映射到指定的类上。最后,指定类对CompressionCodecs特征中规定的压缩/解压缩的实现依赖libkaezip.so动态库中提供的相应方法。因此需要根据需要动态的激活或者去活该动态库,确保只有在KAEzip压缩算法被调用时才被激活。具体过程如下图所示。 除了文件的读写操作外,很多Spark应用程序也可以选择压缩算法,对执行过程中产生的数据进行压缩和解压缩。这一功能由Spark core实现。因此扩展Spark core是本方案的第二重点。具体实现方案如图3所示。首先,创建两个新类KaeInputStream和KaeOutputStream,它们分别调用KaeInflaterInputStream和KaeDeflateOutputStream类中的相关方法,实现CompressionCodeC中的相应方法,实现对数据流的压缩和解压缩。进一步,KaeInflaterInputStream和KaeDeflateOutputStream类都包含一个字节数组,用于临时存储数据流,然后分别调用类KaeInflater和类KaeDeflater,实现对数据流的压缩和解压缩。通过这六个类对Spark core进行扩展,实现Spark对KAEzip的支持。最后,KaeInflater和KaeDeflater并未真正实现压缩和解压方法,而是通过JNI方式调用到操作系统中的libz.so动态库中提供的inflate()方法和deflater()方法。为此,我们还需创建libnativezip.so动态库,作为链接KaeInflater和KaeDeflater和libz.so之间的桥梁。 KaeInflater和KaeDeflater中申明和使用的native方法由libnativezip.so中的相应方法负责实现,而它们的实现会调用到libz.so中的相应方法。最终构建起KaeInflater和KaeDeflater和libz.so之间的桥梁。通过动态地替换libz.so为libkaezip.so即可实现Spark支持KAEzip压缩算法。 在方案实现中主要有以下重难点:在OpenEuler系统中搭建大数据环境,需要对以下组件进行适配:JDK,Hadoop(ARM版),Spark;通过Makefile编译libkaezip.so动态库,使其能够调用到OpenEuler中的libz.so;Java通过JNI与C/C++动态库进行交互,在Java层面调用libkaezip.so;软链接的切换;Spark CompressionCodec模块源码修改,扩展KAEzip压缩功能;OpenEuler测试环境搭建,并构建测试数据(1G~50G);测试脚本编写并测试心得体会通过参与本次的研究项目,一方面我学到很多技术:Spark压缩机制,JNI函数调用方法,压缩格式文件的写入写出等等,但更多的是思维方式的思考和转变,对于此我主要有以下两点心得体会。 错误的代价是最大的,如果项目中出现了错误,那么建立在错误上的工作都白费了,因此做项目需要做好检查和校验机制,确保前进的每一小步都是正确的,这样才是最快的。我们在一开始犯了一个严重的错误,将Spark依赖的libz.so算法库错误的认为是Spark源码自身携带的,这个结论最后被证实是错误的,这个错误间接导致了我们将近20%的工作白费,极大的影响项目的进度。 思考问题要从最基本的粒度进行思考,当我们对于一个问题并不了解时,我们应当从问题的最基本的流程和最微小的组成部分着手进行研究,我们的Spark在开始并不能支持分区块数据进行合并,总是会丢弃掉其他的分区,经过我们研究发现,KAEzip格式的压缩文件最后会有一个文件终止符,当多个分区的文件进行合并后,将会舍弃掉第一个分区文件终止符后的数据,如果我们没有从压缩文件格式的角度出发进行研究,我们是无法发现这个问题的。在此再次感谢鲲鹏众智让我能够有机会参与本次项目,最后衷心希望KAEzip压缩算法性能能越来越强大,鲲鹏920 CPU能够突破西方的技术封锁,走出国门,真正实现CPU中国化!!   重庆理工大学-大数据实验室-尹博文,指导老师:朱常鹏老师
  • [优秀实践] Spark使能KAEzip压缩项目实践
    首先非常有幸能够参与到鲲鹏Spark使能KAEzip项目,本文我将从以下几个方面进行介绍:项目背景项目方案心得体会项目背景随着云计算与大数据技术的不断发展,Hadoop和Spark等大数据处理平台越来越广泛地被企业用于大数据存储、处理与分析。为此,如何提高它们处理数据的性能已经成为大数据领域一个新的研究重点和难点。KAEzip是一种内置于华为鲲鹏920 CPU的高效压缩算法。相比较传统的zlib压缩算法,其压缩/解压缩性能均有较大幅度的提升。但是当前Apache Spark仅支持lz4,snappy和zstd等常见的压缩算法,无法有效支持KAEzip。因此本项目的研究核心就是如何有效地扩展Spark,使其支持KAEzip压缩算法。项目方案本方案难点是需要了解掌握Spark中的压缩组件与jdk中的zlib组件和操作系统中的libz.so动态库之间的相互关系。Jdk自实现了用于zlib解压缩的动态库libzip.so,操作系统中也自带了一个zlib解压缩的动态库:libz.so,SparkSQL在读写文件时调用的是操作系统中的libz.so中的解压缩算法,其他解压缩场景如broadcast,shuffle,checkpoint调用的是jdk自带的libzip.so中的解压缩算法。Spark中的Spark core包和Spark SQL包负责压缩算法的选择与调用。在这两个包中,分别使用特征CompressionCodec和CompressionCodecs支持不同类型的压缩算法。基于特征的设计使得Spark在其生命周期内更易扩展。因此本方案主要的设计思路:实现Spark中的特征CompressionCodec和CompressionCodecs,使Spark支持KAEzip压缩算法;将Spark中对KAEzip中的压缩方法(deflate)和解压缩方法(inflate)的调用,重定向到操作系统中的libkaezip.so动态库(自实现),最终实现Spark对KAEzip的支持。具体思路如图1所示。        Spark SQL提供了spark.read/spark.write等方法对本地文件或hdfs上的文件进行读写操作。根据不同的读写对象,这些可以进一步细分。比如spark.read可以细分为spark.read.text/spark.read.orc/sparkread.parquet等等,其中每个方法都可以设置不同的压缩算法,实现对文件的压缩和解压缩。为了让这些方法支持KAEzip压缩算法。我们的扩展可分为三个部分。首先,扩展CompressionCodecs特征,使Spark能够识别kaezip关键字,并映射到指定类上。该类实现了CompressionCodecs特征中规定的压缩/解压缩方法。因此对CompressionCodecs特征中的压缩/解压缩的调用,对最终被重定向到该类中的相应方法。其次,对ORC和parquet进行扩展。它们并不属于Spark或Hadoop,而是两个相互独立的Apache项目。它们目前并不支持KAEzip压缩算法,因此需要扩展,使其能够识别zlib和kaezip关键字,并将对应的关键字映射到指定的类上。最后,指定类对CompressionCodecs特征中规定的压缩/解压缩的实现依赖libkaezip.so动态库中提供的相应方法。因此需要根据需要动态的激活或者去活该动态库,确保只有在KAEzip压缩算法被调用时才被激活。具体过程如下图所示。除了文件的读写操作外,很多Spark应用程序也可以选择压缩算法,对执行过程中产生的数据进行压缩和解压缩。这一功能由Spark core实现。因此扩展Spark core是本方案的第二重点。具体实现方案如图3所示。首先,创建两个新类KaeInputStream和KaeOutputStream,它们分别调用KaeInflaterInputStream和KaeDeflateOutputStream类中的相关方法,实现CompressionCodeC中的相应方法,实现对数据流的压缩和解压缩。进一步,KaeInflaterInputStream和KaeDeflateOutputStream类都包含一个字节数组,用于临时存储数据流,然后分别调用类KaeInflater和类KaeDeflater,实现对数据流的压缩和解压缩。通过这六个类对Spark core进行扩展,实现Spark对kaezip的支持。最后,KaeInflater和KaeDeflater并未真正实现压缩和解压方法,而是通过JNI方式调用到操作系统中的libz.so动态库中提供的inflate()方法和deflater()方法。为此,我们还需创建libnativezip.so动态库,作为链接KaeInflater和KaeDeflater和libz.so之间的桥梁。 KaeInflater和KaeDeflater中申明和使用的native方法由libnativezip.so中的相应方法负责实现,而它们的实现会调用到libz.so中的相应方法。最终构建起KaeInflater和KaeDeflater和libz.so之间的桥梁。通过动态地替换libz.so为libkaezip.so即可实现Spark支持KAEzip压缩算法。在方案实现中主要有以下重难点:在OpenEuler系统中搭建大数据环境:JDK,Hadoop(ARM版),Spark;通过Makefile编译libkaezip.so动态库,使其能够调用到OpenEuler中的libz.so;Java通过JNI与C/C++动态库进行交互,在Java层面调用libkaezip.so;软链接的切换;Spark CompressionCodec模块源码修改,扩展KAEzip压缩功能;OpenEuler测试环境搭建,并构建测试数据(1G~50G);测试脚本编写并测试心得体会参与本次鲲鹏使Spark能KAEzip压缩项目,让我学到了很多:对Spark压缩过程更加了解了,对Spark源码理解更加深入了,了解了Java JNI以及MakeFile编译过程,动态库的调用方式等等。但收获更多的是思维方式,在做一个项目之前最重要的就是项目方案的设计,不要拿到项目就为了追求速度一股脑的去做,多画点时间在方案设计上,对后面项目实现部分会有很大的帮助。要通过实验验证自己的想法,相信直觉不如相信数据,就比如一开始我们以为Java中native调用的是动态库中的方法,但我们误以为这个动态库是在操作系统中的动态库,但经过实验验证这个动态库是Java自带的动态库。重要文档,重要结论,重要问题随时做好记录,可能后面就会忘记。我们在实现了Spark使能KAEzip压缩的基础上,同时对KAEzip压缩算法做出了一些扩展,如在Java层面通过加入Buffer提高KAEzip解压缩性能,同时还发现了Zlib&KAEzip压缩算法并不支持连续的块压缩(将两个文件同时读到内存中并合并,压缩后的结果只有第一个文件的内容,这是因为Zlib压缩后有一个文件结尾标识符),我们也将此问题反馈给了合作方,在源码方面进一步解决该问题。在这次项目中真的学到了很多,再次感谢鲲鹏众智让我能够有机会参与本次项目,最后希望KAEzip压缩算法能够越做越好,鲲鹏920 CPU能够走出国门,真正实现CPU国产化!!
  • [问题求助] 【基于鲲鹏应用使能套件实现Spark算法优化】【预置环境】实验第一步弹性云服务没有权限
    【功能模块】【操作步骤&问题现象】1、登录之后,点击弹性云服务ECS显示没有权限,要求联系安全管理员【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [最佳实践] 使用IntelliJ IDEA Java创建和使用Spark UDTF
    操作场景DLI支持用户使用Hive UDTF(User-Defined Table-Generating Functions)自定义表值函数,UDTF用于解决一进多出业务场景,即其输入与输出是一对多的关系,读入一行数据,输出多个值。约束限制在DLI Console上执行UDTF相关操作时,需要使用自建的SQL队列。跨账号使用UDTF时,除了创建UDTF函数的用户,其他用户如果需要使用时,需要先进行授权才可使用对应的UDTF函数。授权操作参考如下:登录DLI管理控制台,选择“ 数据管理 > 程序包管理”页面,选择对应的UDTF Jar包,单击“操作”列中的“权限管理”,进入权限管理页面,单击右上角“授权”,勾选对应权限。自定义函数中引用static类或接口时,必须要加上“try catch”异常捕获,否则可能会造成包冲突,导致函数功能异常。环境准备在进行UDTF开发前,请准备以下开发环境。表1 UDTF开发环境准备项说明操作系统Windows系统,支持Windows7以上版本。安装JDKJDK使用1.8版本。安装和配置IntelliJ IDEAIntelliJ IDEA为进行应用开发的工具,版本要求使用2019.1或其他兼容版本。安装Maven开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。开发流程DLI下UDTF函数开发流程参考如下:图1 UDTF开发流程表2 开发流程说明序号阶段操作界面说明1新建Maven工程,配置pom文件IntelliJ IDEA参考操作步骤说明,编写UDTF函数代码。2编写UDTF函数代码3调试,编译代码并导出Jar包4上传Jar包到OBSOBS控制台将生成的UDTF函数Jar包文件上传到OBS目录下。5创建DLI的UDTF函数DLI控制台在DLI控制台的SQL作业管理界面创建使用的UDTF函数。6验证和使用DLI的UDTF函数DLI控制台在DLI作业中使用创建的UDTF函数。操作步骤新建Maven工程,配置pom文件。以下通过IntelliJ IDEA 2020.2工具操作演示。打开IntelliJ IDEA,选择“File > New > Project”。图2 新建Project选择Maven,Project SDK选择1.8,单击“Next”。定义样例工程名和配置样例工程存储路径,单击“Finish”完成工程创建。在pom.xml文件中添加如下配置。<dependencies> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>1.2.1</version> </dependency></dependencies>图3 pom文件中添加配置在工程路径的“src > main > java”文件夹上鼠标右键,选择“New > Package”,新建Package和类文件。Package根据需要定义,本示例定义为:“com.huawei.demo”,完成后回车。在包路径下新建Java Class文件,本示例定义为:UDTFSplit。编写UDTF函数代码。完整样例代码请参考样例代码。UDTF的类需要继承“org.apache.hadoop.hive.ql.udf.generic.GenericUDTF”,实现initialize,process,close三个方法。UDTF首先会调用initialize方法,此方法返回UDTF的返回行的信息,如,返回个数,类型等。初始化完成后,会调用process方法,真正处理在process函数中,在process中,每一次forward()调用产生一行。如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。public void process(Object[] args) throws HiveException { // TODO Auto-generated method stub if(args.length == 0){ return; } String input = args[0].toString(); if(StringUtils.isEmpty(input)){ return; } String[] test = input.split(";"); for (int i = 0; i < test.length; i++) { try { String[] result = test.split(":"); forward(result); } catch (Exception e) { continue; } } }最后调用close方法,对需要清理的方法进行清理。编写调试完成代码后,通过IntelliJ IDEA工具编译代码并导出Jar包。单击工具右侧的“Maven”,参考下图分别单击“clean”、“compile”对代码进行编译。编译成功后,单击“package”对代码进行打包。图4 编译打包打包成功后,生成的Jar包会放到target目录下,以备后用。本示例将会生成到:“D:\MyUDTF\target”下名为“MyUDTF-1.0-SNAPSHOT.jar”。登录OBS控制台,将生成的Jar包文件上传到OBS路径下。说明:Jar包文件上传的OBS桶所在的区域需与DLI的队列区域相同,不可跨区域执行操作。(可选)可以将Jar包文件上传到DLI的程序包管理中,方便后续统一管理。登录DLI管理控制台,单击“数据管理 > 程序包管理”。在“程序包管理”页面,单击右上角的“创建”创建程序包。在“创建程序包”对话框,配置以下参数。包类型:选择“JAR”。OBS路径:程序包所在的OBS路径。分组设置和组名称根据情况选择设置,方便后续识别和管理程序包。单击“确定”,完成创建程序包。创建DLI的UDTF函数。登录DLI管理控制台,单击“SQL编辑器”,执行引擎选择“spark”,选择已创建的SQL队列和数据库。图5 选择队列和数据库在SQL编辑区域输入下列命令创建UDTF函数,单击“执行”提交创建。CREATE FUNCTION mytestsplit AS 'com.huawei.demo.UDTFSplit' using jar 'obs://dli-test-obs01/MyUDTF-1.0-SNAPSHOT.jar';重启原有SQL队列,使得创建的UDTF函数生效。登录数据湖探索管理控制台,选择“队列管理”,在对应“SQL队列”类型作业的“操作”列,单击“重启”。在“重启队列”界面,选择“确定”完成队列重启。验证和使用创建的UDTF函数。在查询语句中使用6中创建的UDTF函数,如:select mytestsplit('abc:123\;efd:567\;utf:890');图6 执行结果(可选)删除UDTF函数。如果不再使用该Function,可执行以下语句删除UDTF函数:Drop FUNCTION mytestsplit;样例代码UDTFSplit.java完整的样例代码参考如下所示:package com.huawei.demo;import java.util.ArrayList;import org.apache.commons.lang.StringUtils;import org.apache.hadoop.hive.ql.exec.UDFArgumentException;import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;import org.apache.hadoop.hive.ql.metadata.HiveException;import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;public class UDTFSplit extends GenericUDTF { @Override public void close() throws HiveException { // TODO Auto-generated method stub } @Override public void process(Object[] args) throws HiveException { // TODO Auto-generated method stub if(args.length == 0){ return; } String input = args[0].toString(); if(StringUtils.isEmpty(input)){ return; } String[] test = input.split(";"); for (int i = 0; i < test.length; i++) { try { String[] result = test.split(":"); forward(result); } catch (Exception e) { continue; } } } @Override public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException { if (args.length != 1) { throw new UDFArgumentLengthException("ExplodeMap takes only one argument"); } if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentException("ExplodeMap takes string as a parameter"); } ArrayList<String> fieldNames = new ArrayList<String>(); ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(); fieldNames.add("col1"); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); fieldNames.add("col2"); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); }}
  • [最佳实践] 使用IntelliJ IDEA Java创建和使用Spark UDF
    操作场景DLI支持用户使用Hive UDF(User Defined Function,用户定义函数)进行数据查询等操作,UDF只对单行数据产生作用,适用于一进一出的场景。约束限制在DLI Console上执行UDF相关操作时,需要使用自建的SQL队列。跨账号使用UDF时,除了创建UDF函数的用户,其他用户如果需要使用时,需要先进行授权才可使用对应的UDF函数。授权操作参考如下:登录DLI管理控制台,选择“ 数据管理 > 程序包管理”页面,选择对应的UDF Jar包,单击“操作”列中的“权限管理”,进入权限管理页面,单击右上角“授权”,勾选对应权限。自定义函数中引用static类或接口时,必须要加上“try catch”异常捕获,否则可能会造成包冲突,导致函数功能异常。环境准备在进行UDF开发前,请准备以下开发环境。表1 UDF开发环境准备项说明操作系统Windows系统,支持Windows7以上版本。安装JDKJDK使用1.8版本。安装和配置IntelliJ IDEAIntelliJ IDEA为进行应用开发的工具,版本要求使用2019.1或其他兼容版本。安装Maven开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。开发流程DLI下UDF函数开发流程参考如下:图1 开发流程表2 开发流程说明序号阶段操作界面说明1新建Maven工程,配置pom文件IntelliJ IDEA参考操作步骤说明,编写UDF函数代码。2编写UDF函数代码3调试,编译代码并导出Jar包4上传Jar包到OBSOBS控制台将生成的UDF函数Jar包文件上传到OBS目录下。5创建DLI的UDF函数DLI控制台在DLI控制台的SQL作业管理界面创建使用的UDF函数。6验证和使用DLI的UDF函数DLI控制台在DLI作业中使用创建的UDF函数。操作步骤新建Maven工程,配置pom文件。以下通过IntelliJ IDEA 2020.2工具操作演示。打开IntelliJ IDEA,选择“File > New > Project”。图2 新建Project选择Maven,Project SDK选择1.8,单击“Next”。定义样例工程名和配置样例工程存储路径,单击“Finish”完成工程创建。在pom.xml文件中添加如下配置。<dependencies> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>1.2.1</version> </dependency></dependencies>图3 pom文件中添加配置在工程路径的“src > main > java”文件夹上鼠标右键,选择“New > Package”,新建Package和类文件。Package根据需要定义,本示例定义为:“com.huawei.demo”,完成后回车。在包路径下新建Java Class文件,本示例定义为:SumUdfDemo。编写UDF函数代码。UDF函数实现,主要注意以下几点:自定义UDF需要继承org.apache.hadoop.hive.ql.UDF。需要实现evaluate函数,evaluate函数支持重载。详细UDF函数实现,可以参考如下样例代码:package com.huawei.demo;import org.apache.hadoop.hive.ql.exec.UDF; public class SumUdfDemo extends UDF { public int evaluate(int a, int b) { return a + b; } }编写调试完成代码后,通过IntelliJ IDEA工具编译代码并导出Jar包。单击工具右侧的“Maven”,参考下图分别单击“clean”、“compile”对代码进行编译。编译成功后,单击“package”对代码进行打包。打包成功后,生成的Jar包会放到target目录下,以备后用。本示例将会生成到:“D:\DLITest\MyUDF\target”下名为“MyUDF-1.0-SNAPSHOT.jar”。登录OBS控制台,将生成的Jar包文件上传到OBS路径下。说明:Jar包文件上传的OBS桶所在的区域需与DLI的队列区域相同,不可跨区域执行操作。(可选)可以将Jar包文件上传到DLI的程序包管理中,方便后续统一管理。登录DLI管理控制台,单击“数据管理 > 程序包管理”。在“程序包管理”页面,单击右上角的“创建”创建程序包。在“创建程序包”对话框,配置以下参数。包类型:选择“JAR”。OBS路径:程序包所在的OBS路径。分组设置和组名称根据情况选择设置,方便后续识别和管理程序包。单击“确定”,完成创建程序包。创建UDF函数。登录DLI管理控制台,单击“SQL编辑器”,执行引擎选择“spark”,选择已创建的SQL队列和数据库。图4 选择队列和数据库在SQL编辑区域输入下列命令创建UDF函数,单击“执行”提交创建。CREATE FUNCTION TestSumUDF AS 'com.huawei.demo.SumUdfDemo' using jar 'obs://dli-test-obs01/MyUDF-1.0-SNAPSHOT.jar';重启原有SQL队列,使得创建的Function生效。登录数据湖探索管理控制台,选择“队列管理”,在对应“SQL队列”类型作业的“操作”列,单击“重启”。在“重启队列”界面,选择“确定”完成队列重启。使用UDF函数。在查询语句中使用6中创建的UDF函数:select TestSumUDF(1,2);图5 执行结果(可选)删除UDF函数。如果不再使用UDF函数,可执行以下语句删除该函数:Drop FUNCTION TestSumUDF;
  • [最佳实践] 使用Spark Jar作业读取和查询OBS数据
    操作场景DLI完全兼容开源的Apache Spark,支持用户开发应用程序代码来进行作业数据的导入、查询以及分析处理。本示例从编写Spark程序代码读取和查询OBS数据、编译打包到提交Spark Jar作业等完整的操作步骤说明来帮助您在DLI上进行作业开发。环境准备在进行Spark Jar作业开发前,请准备以下开发环境。表1 Spark Jar作业开发环境准备项说明操作系统Windows系统,支持Windows7以上版本。安装JDKJDK使用1.8版本。安装和配置IntelliJ IDEAIntelliJ IDEA为进行应用开发的工具,版本要求使用2019.1或其他兼容版本。安装Maven开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。开发流程DLI进行Spark Jar作业开发流程参考如下:图1 Spark Jar作业开发流程表2 开发流程说明序号阶段操作界面说明1创建DLI通用队列DLI控制台创建作业运行的DLI队列。2上传数据到OBS桶OBS控制台将测试数据上传到OBS桶下。3新建Maven工程,配置pom文件IntelliJ IDEA参考样例代码说明,编写程序代码读取OBS数据。4编写程序代码5调试,编译代码并导出Jar包6上传Jar包到OBS和DLIOBS控制台将生成的Spark Jar包文件上传到OBS目录下和DLI程序包中。7创建Spark Jar作业DLI控制台在DLI控制台创建Spark Jar作业并提交运行作业。8查看作业运行结果DLI控制台查看作业运行状态和作业运行日志。步骤1:创建DLI通用队列第一次提交Spark作业,需要先创建队列,例如创建名为“sparktest”的队列,队列类型选择为“通用队列”。在DLI管理控制台的左侧导航栏中,选择“队列管理”。单击“队列管理”页面右上角“购买队列”进行创建队列。创建名为“sparktest”的队列,队列类型选择为“通用队列”。创建队列详细介绍请参考创建队列。单击“立即购买”,确认配置。配置确认无误,单击“提交”完成队列创建。步骤2:上传数据到OBS桶根据如下数据,创建people.json文件。{"name":"Michael"}{"name":"Andy", "age":30}{"name":"Justin", "age":19}进入OBS管理控制台,在“桶列表”下,单击已创建的OBS桶名称,本示例桶名为“dli-test-obs01”,进入“概览”页面。单击左侧列表中的“对象”,选择“上传对象”,将people.json文件上传到OBS桶根目录下。在OBS桶根目录下,单击“新建文件夹”,创建名为“result”的文件夹。单击“result”的文件夹,在“result”下单击“新建文件夹”,创建名为“parquet”的文件夹。步骤3:新建Maven工程,配置pom依赖以下通过IntelliJ IDEA 2020.2工具操作演示。打开IntelliJ IDEA,选择“File > New > Project”。图2 新建Project选择Maven,Project SDK选择1.8,单击“Next”。定义样例工程名和配置样例工程存储路径,单击“Finish”完成工程创建。如上图所示,本示例创建Maven工程名为:SparkJarObs,Maven工程路径为:“D:\DLITest\SparkJarObs”。在pom.xml文件中添加如下配置。<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.2</version> </dependency></dependencies>图3 修改pom.xml文件在工程路径的“src > main > java”文件夹上鼠标右键,选择“New > Package”,新建Package和类文件。Package根据需要定义,本示例定义为:“com.huawei.dli.demo”,完成后回车。在包路径下新建Java Class文件,本示例定义为:SparkDemoObs。步骤4:编写代码编写SparkDemoObs程序读取OBS桶下的1的“people.json”文件,并创建和查询临时表“people”。完整的样例请参考完整样例代码参考,样例代码分段说明如下:导入依赖的包。import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.SaveMode;import org.apache.spark.sql.SparkSession;import static org.apache.spark.sql.functions.col;通过当前帐号的AK和SK创建SparkSession会话spark 。SparkSession spark = SparkSession .builder() .config("spark.hadoop.fs.obs.access.key", "xxx") .config("spark.hadoop.fs.obs.secret.key", "yyy") .appName("java_spark_demo") .getOrCreate();"spark.hadoop.fs.obs.access.key"参数对应的值"xxx"需要替换为帐号的AK值。"spark.hadoop.fs.obs.secret.key"参数对应的值“yyy”需要替换为帐号的SK值。AK和SK值获取请参考:如何获取AK和SK。读取OBS桶中的“people.json”文件数据。其中“dli-test-obs01”为演示的OBS桶名,请根据实际的OBS桶名替换。Dataset<Row> df = spark.read().json("obs://dli-test-obs01/people.json");df.printSchema();通过创建临时表“people”读取文件数据。df.createOrReplaceTempView("people");查询表“people”数据。Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");sqlDF.show();将表“people”数据以parquet格式输出到OBS桶的“result/parquet”目录下。sqlDF.write().mode(SaveMode.Overwrite).parquet("obs://dli-test-obs01/result/parquet");spark.read().parquet("obs://dli-test-obs01/result/parquet").show();关闭SparkSession会话spark。spark.stop();步骤5:调试、编译代码并导出Jar包单击IntelliJ IDEA工具右侧的“Maven”,参考下图分别单击“clean”、“compile”对代码进行编译。编译成功后,单击“package”对代码进行打包。打包成功后,生成的Jar包会放到target目录下,以备后用。本示例将会生成到:“D:\DLITest\SparkJarObs\target”下名为“SparkJarObs-1.0-SNAPSHOT.jar”。步骤6:上传Jar包到OBS和DLI下登录OBS控制台,将生成的“SparkJarObs-1.0-SNAPSHOT.jar”Jar包文件上传到OBS路径下。将Jar包文件上传到DLI的程序包管理中,方便后续统一管理。登录DLI管理控制台,单击“数据管理 > 程序包管理”。在“程序包管理”页面,单击右上角的“创建”创建程序包。在“创建程序包”对话框,配置以下参数。包类型:选择“JAR”。OBS路径:程序包所在的OBS路径。分组设置和组名称根据情况选择设置,方便后续识别和管理程序包。单击“确定”,完成创建程序包。步骤7:创建Spark Jar作业登录DLI控制台,单击“作业管理 > Spark作业”。在“Spark作业”管理界面,单击“创建作业”。在作业创建界面,配置对应作业运行参数。具体说明如下:所属队列:选择已创建的DLI通用队列。例如当前选择步骤1:创建DLI通用队列创建的通用队列“sparktest”。作业名称(--name):自定义Spark Jar作业运行的名称。当前定义为:SparkTestObs。应用程序:选择步骤6:上传Jar包到OBS和DLI下中上传到DLI程序包。例如当前选择为:“SparkJarObs-1.0-SNAPSHOT.jar”。主类:格式为:程序包名+类名。例如当前为:com.huawei.dli.demo.SparkDemoObs。其他参数可暂不选择,想了解更多Spark Jar作业提交说明可以参考创建Spark作业。图4 创建Spark Jar作业单击“执行”,提交该Spark Jar作业。在Spark作业管理界面显示已提交的作业运行状态。步骤8:查看作业运行结果在Spark作业管理界面显示已提交的作业运行状态。初始状态显示为“启动中”。如果作业运行成功则作业状态显示为“已成功”,单击“操作”列“更多”下的“Driver日志”,显示当前作业运行的日志。图5 “Driver日志”中的作业执行日志如果作业运行成功,本示例进入OBS桶下的“result/parquet”目录,查看已生成预期的parquet文件。如果作业运行失败,单击“操作”列“更多”下的“Driver日志”,显示具体的报错日志信息,根据报错信息定位问题原因。例如,如下截图信息因为创建Spark Jar作业时主类名没有包含包路径,报找不到类名“SparkDemoObs”。可以在“操作”列,单击“编辑”,修改“主类”参数为正确的:com.huawei.dli.demo.SparkDemoObs,单击“执行”重新运行该作业即可。后续指引如果您想通过Spark Jar作业访问其他数据源,请参考《使用Spark作业跨源访问数据源》。如果您想通过Spark Jar作业在DLI创建数据库和表,请参考《使用Spark作业访问DLI元数据》。完整样例代码参考package com.huawei.dli.demo;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.SaveMode;import org.apache.spark.sql.SparkSession;import static org.apache.spark.sql.functions.col;public class SparkDemoObs { public static void main(String[] args) { SparkSession spark = SparkSession .builder() .config("spark.hadoop.fs.obs.access.key", "xxx") .config("spark.hadoop.fs.obs.secret.key", "yyy") .appName("java_spark_demo") .getOrCreate(); // can also be used --conf to set the ak sk when submit the app // test json data: // {"name":"Michael"} // {"name":"Andy", "age":30} // {"name":"Justin", "age":19} Dataset<Row> df = spark.read().json("obs://dli-test-obs01/people.json"); df.printSchema(); // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Displays the content of the DataFrame to stdout df.show(); // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ // Select only the "name" column df.select("name").show(); // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // Select people older than 21 df.filter(col("age").gt(21)).show(); // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ // Count people by age df.groupBy("age").count().show(); // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+ // Register the DataFrame as a SQL temporary view df.createOrReplaceTempView("people"); Dataset<Row> sqlDF = spark.sql("SELECT * FROM people"); sqlDF.show(); // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ sqlDF.write().mode(SaveMode.Overwrite).parquet("obs://dli-test-obs01/result/parquet"); spark.read().parquet("obs://dli-test-obs01/result/parquet").show(); spark.stop(); }}
  • [其他] MapReduce 使用hue提交自己的spark jar 包不能执行成功,
    【功能模块】【操作步骤&问题现象】1、请问有具体的配置操作吗?2、产品文档的介绍太粗糙了【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [认证交流] 论微认证《大数据板块》大满贯
    什么是微认证?          华为云微认证是基于线上学习与在线实践,快速获得场景化技能提升的认证。微认证清单 - 大数据1、基于Spark实现车主驾驶行为分析未来城市交通是智能交通融合的场景,车与周围环境是一个紧密联系的实体,通过Spark数据分析,会让您加快了解智能汽车领域。基于Spark实现车主驾驶行为分析_华为云培训中心-华为云 (huaweicloud.com)2、球星薪酬决定性因素分析随着大数据、云计算的发展,数据规模也随之扩大,也更加关注数据的存放、处理以及分析。利用数据仓库服务,带您探索球星薪酬影响的决定性因素。薪酬数据大数据分析技术学习认证_球星薪酬决定性因素分析微认证_华为云学院-华为云 (huaweicloud.com)3、基于流计算的双十一大屏开发案例面对每天大量的实时数据,及时、高效的处理这些数据显得十分必要。本课程主要介绍如何搭建一个可视化大屏,为企业提供精准、高效的支持。可视化大屏大数据分析技术学习认证_基于流计算的双十一大屏开发案例微认证_华为云学院-华为云 (huaweicloud.com)4、使用DLI Flink SQL进行电商实时业务数据分析开发电商通常有web,小程序等多种接入方式,为掌握其实时变化,需统计各平台的实时访问量、订单数等,从而针对性地调整营销策略。使用DLI Flink SQL进行电商实时业务数据分析开发_华为云培训中心-华为云 (huaweicloud.com)5、逃杀游戏数据分析随着电竞行业的火热发展,用户数据分析成为急需解决的问题。借助大数据平台服务进行数据分析,能妥善处理海量的用户数据,帮助游戏厂商和俱乐部进行更好的战略决策。大数据分析游戏电竞行业技术学习认证_逃杀游戏数据分析微认证_华为云学院-华为云 (huaweicloud.com)6、黑色星期五消费者行为研究大数据时代消费者行为复杂多样,通过对消费者行为进行数据分析,找寻其中的变化规律,对用户进行定位进而优化销售方式。用户消费行为分析技术学习认证_黑色星期五消费者行为研究微认证_华为云学院-华为云 (huaweicloud.com)7、网站消费者行为分析大数据时代背景下,用户消费数据暗藏许多商机。通过网站用户消费行为分析实践,了解华为云大数据产品的使用方法,帮助商户发掘潜在客户。消费行为大数据分析技术学习认证_网站消费者行为分析微认证_华为云学院-华为云 (huaweicloud.com)8、外卖红包推送策略及菜品推荐随着外卖业务快速增长,如何实现客户、商家的共赢?本课程借助华为大数据方案进行客户画像,实现外卖红包推送策略及菜品推荐。大数据分析智能推荐技术学习认证_外卖红包推送策略及菜品推荐微认证_华为云学院-华为云 (huaweicloud.com)9、车联网大数据驾驶行为分析作为智能交通的基础,车联网的应用预示着工业技术,交通效率,出行方式的重大改变。微认证为您揭秘车联网大数据背后的密码,实现科学高效的车队管理。无人驾驶数据分析技术学习认证_车联网大数据驾驶行为分析微认证_华为云学院-华为云 (huaweicloud.com)
  • [认证交流] 微认证 - 大数据板块 -《使用DLI Flink SQL进行电商实时业务数据分析开发》 - 学习分享
    什么是微认证?          华为云微认证是基于线上学习与在线实践,快速获得场景化技能提升的认证。微认证清单 - 大数据使用DLI Flink SQL进行电商实时业务数据分析开发     课程简介:电商通常有web,小程序等多种接入方式,为掌握其实时变化,需统计各平台的实时访问量、订单数等,从而针对性地调整营销策略。     课程结构:电商实时业务应用场景介绍8认识电商常用的实时业务特点及应用电商实业业务对应大数据技术组件的原理47了解实现电商网站数据实时计算的相关大数据技术特性及原理华为云实时流计算Flink及解决方案7掌握华为云实时流计算Flink及解决方案及相应应用华为云实战案例15掌握华为云实时流计算Flink验流程及开发思路     1、电商实时业务应用场景介绍          电商从2009年发展至今,当前线上购物无疑是最火热的购物方式,而电商平台则又可以以多种方式接入,例如通过web方式访问、通过app的方式访问、通过微信小程序的方式访问等等。电商平台则需要每天统计各平台的实时访问数据量、订单数、访问人数等等          指标,从而能在显示大屏上实时展示相关数据,方便及时了解数据变化,有针对性地调整营销策略。这些高效快捷地统计指标是如何获得的呢?这是我们这次课程及实验所需要理解学习的          当前有很多电商的大数据平台会将每个商品的订单信息实时写入Kafka中,这些信息包括订单ID、订单生成的渠道(即web方式、app方式等)、订单时间、订单金额、折扣后实际支付金额、支付时间、用户ID、用户姓名、订单地区ID等信息。          针对业务场景,我们在大数据分析业务需要做的,就是根据当前可以获取到的业务数据,实时统计每种渠道的相关指标,输出存储到数据库中,并进行大屏展示。     2、电商实时业务对应大数据技术组件的原理        (1)流计算                概述         流式计算就像汽车过收费站,每一个车在通过闸口时都要收费。流式计算中每个实时产生的数据都要被实时的处理。        流计算秉承一个基本理念,即数据的价值随着时间的流逝而降低,如用户点击流。因此,当事件出现时就应该立即进行处理,而不是缓存起来进行离线处理。为了及时处理流数据,就需要一个低延迟、可扩展、高可靠的处理引擎。                应用场景                主要框架       Kafka        Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeepert办调的分布式日志系统。       主要应用场景是:日志收集系统和消息系统。        分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。大部分的消息系统选用发布-订阅模式。       Kafka就是一种发布-订阅模式。 Spark                Spark简介          2009年诞生于美国加州大学伯克利分校AMP实验室。          Apache Spark是一种基于内存的快速、通用、可扩展的大数据计算引擎。          Spark 是一站式解决方案,集批处理(Spark Core )、实时流处理(Spark Streaming )、交互式查询(Spark SQL )、图计算(GraphX )与机器学习(MLLib )于一体。                  Spark应用场景           批处理可用于ETL (抽取、转换、加载)。          机器学习可用于自动判断淘宝的买家评论是好评还是差评。          交互式分析可用于查询Hive数据仓库。          流处理可用于页面点击流分析,推荐系统,舆情分析等实时业务。                 Spark架构                Spark特点                       SparkStreaming          Spark Streaming的基本原理是将实时输入数据流以时间片(秒级)为单位进行拆分,然后经Spark引擎以类似批处理的方式处理每个时间片数据。        Flink华为云数据可视化DLI          产品概述数据湖探索(Data Lake Insight,简称DLI)是完全兼容Apache Spark和Apache Flink生态,实现批流一体的Serverless大数据计算分析服务。DLI支持多模引擎,企业仅需使用SQL或程序就可轻松完成异构数据源的批处理、流处理、内存计算、机器学习等,挖掘和探索数据价值。          特点          应用场景:电商行业数据可视化          概述            广义:指一切能够把抽象、枯燥或难以理解的内容,包括看似毫无意义的数据、信息、知识等以一种容易理解的视觉方式展示出来的技术。            狭义:利用计算机图形学和图像处理技术,将数据转换为图形或图像在屏幕上显示出来,并进行各种交互处理的理论、方法和技术。          发展          工具        华为云数据可视化DLV          概述         数据可视化(Data Lake Visualization,简称DLV)是一站式数据可视化开发平台,适配云上云下多种数据源,提供丰富多样的2D、3D可视化组件,采用拖搜式自由布局。          特点          应用场景:某企业安全态势感知     3、华为云实时流计算Flink及解决方案        基于实时流计算的可视化解决方案        解决方案应用场景之智慧城市          智慧城市是通过对大量实时数据的监控、采集和处理,为复杂问题做出快速响应。智慧城市涉及范围很广,智慧城市建设主要包括政务、交通、企业、民生等方面。         解决方案应用场景之实时推荐          根据用户行为数据(包含历史数据和实时数据),通过构建的推荐模型对用户行为秒级调整并生成对应的推荐列表,分钟级更新候选集。          实时推荐主要包括广告推荐、商品推荐、视频推荐、游戏推荐等。     动手实验:         流程介绍 实验单独学习链接:华为云原生大数据serverless服务DLI_在线课程_华为云开发者学堂_云计算培训-华为云 (huaweicloud.com)
  • [认证交流] 微认证 - 大数据板块 -《基于Spark实现车主驾驶行为分析》 - 学习分享
    什么是微认证?          华为云微认证是基于线上学习与在线实践,快速获得场景化技能提升的认证。微认证清单 - 大数据          前景概述:该课程的考试内容部分来自于 微认证课程《车联网大数据驾驶行为分析》,因为这是该课程的早期前身版本,车联网的概述介绍等基于Spark实现车主驾驶行为分析     课程简介:未来城市交通是智能交通融合的场景,车与周围环境是一个紧密联系的实体,基于此背景,我们使用华为云MRS服务中的Spark组件来分析统计指定时间内,车主急加速、急刹车、空挡滑行、超速、疲劳驾驶等违法行为的次数。结合实际的案例,能够让我们更好的掌握Spark及MRS的使用。     课程结构:车联网的背景及案例4了解车联网的背景及应用场景华为车联网常用EI服务介绍31了解华为车联网EI服务,包括OBS和MRSSpark车主驾驶行为分析实验介绍18掌握MRS服务的使用,学习Spark程序的执行过程1、车联网的背景及案例      汽车技术重大变革的历程     技术革命引爆出行方式变革,智能、网联成就智慧出行    未来场景驱动汽车行业数字化转型            典型应用1 - 车辆监控及历史信息的统计分析     典型应用2 - 站点、班线、任务、区域管理2、华为车联网常用EI服务介绍     对象存储服务 OBS(基础服务)          一个基于对象的存储服务,为客户提供海量、安全、高可靠、低成本的数据存储能力,使用时无需考虑容量限制,并且提供多种存储类型供选择,满足客户各类业务场景诉求        业务模型           每个租户在OBS只能创建100给桶(所以区域桶个数之和不超过100),桶的名字在OBS系统内唯一,如果租户准备使用的桶名已经被其他租户使用需要更换桶名使用。         可靠性        数据可靠性保证机制           冗余分片分别放在不同服务器中,小于冗余个数的服务器故障业务完全不受影响; EC算法替代3副本,存储利用率从33%提升到80%+; 一个对象会被拆分为2048~4096个条带,单对象理论峰值带宽:2400 MB/s ~ 4800 MB/s。           多AZ冗余算法使磁盘利用率达到55%,同时支持1个AZ完全故障 为了减少AZ间恢复流量,每个AZ有2份AZ内冗余数据,AZ内故障两给服务器,数据恢复能在AZ内完成。           服务器或者硬盘故障时,后台会以2 TB/s的速度触发重构,确保数据在最短时间内恢复到正常状态。         存算分离方案      MapReduce服务 MRS           为客户提供Hudi、ClickHouse、Spark、Flink、Kafka、HBase等Hadoop生态的高性能大数据组件,支持数据湖、数据仓库、BI、AI融合等能力。          MRS同时支持混合云和公有云两种形态:            混合云版本,一个架构实现离线、实时、逻辑三种数据湖,以云原生架构助力客户智能升级;            公有云版本,协助客户快速构建低成本、灵活开放、安全可靠的一站式大数据平台。        架构图        产品优势     Spark        Spark简介          2009年诞生于美国加州大学伯克利分校AMP实验室。          Apache Spark是一种基于内存的快速、通用、可扩展的大数据计算引擎。          Spark 是一站式解决方案,集批处理(Spark Core )、实时流处理(Spark Streaming )、交互式查询(Spark SQL )、图计算(GraphX )与机器学习(MLLib )于一体。         Spark应用场景           批处理可用于ETL (抽取、转换、加载)。          机器学习可用于自动判断淘宝的买家评论是好评还是差评。          交互式分析可用于查询Hive数据仓库。          流处理可用于页面点击流分析,推荐系统,舆情分析等实时业务。         Spark架构        Spark特点        SparkSQL          Spark SQL是Spark中用于结构化数据处理的模块。          在Spark应用中,可以无缝的使用SQL语句亦或是DataFrame API对结构化数据进行查询。         SparkStreaming          Spark Streaming的基本原理是将实时输入数据流以时间片(秒级)为单位进行拆分,然后经Spark引擎以类似批处理的方式处理每个时间片数据。动手实验:实验避坑分享:论 《基于Spark实现车主驾驶行为分析》 实验避坑,100%完成_华为云开发者学堂_华为云论坛 (huaweicloud.com)下一期预告:使用DLI Flink SQL进行电商实时业务数据分析开发
总条数:134 到第
上滑加载中