• [技术干货] 开源工作流引擎Azkaban在MRS中的实践[转载]
    环境输入实践版本:Apache azkaban 4.0.0 (以单机版举例,集群版本配置过程类似),MRS 3.1.0 普通集群。Azkaban 插件地址Azkaban 官网Azkaban 源码地址安装azkaban-solo-serverAzkaban不提供二进制包,需要用户下载源码编译打包,获取到“azkaban-solo-server.zip”和“azkaban-db.zip”。环境准备。在华为云购买Linux弹性云服务器ECS,用于安装运行MRS集群客户端和Azkaban,并绑定弹性公网IP。在弹性云服务器ECS上安装运行MRS集群客户端,例如安装目录为“/opt/client”。准备数据表,参考MySQL教程。安装MySQL并授予本机访问权限。注意:Azkaban 4.0.0版本默认适配MySQL 5.1.28版本。创建Azkaban数据库,解压“azkaban-db.zip”获取“create-all-sql-*.sql”,并初始化。上传安装包并解压上传“azkaban-solo-server.zip”至“/opt/azkaban”目录执行以下命令解压并删除安装包unzip azkaban-solo-server.ziprm -f unzip azkaban-solo-server.zip修改配置文件“azkaban-solo-server/conf/azkaban.properties”配置端口根据实际情况修改,“jetty.port”和“mysql.port”端口号可使用默认值jetty.port=8081 database.type=mysql mysql.port=3306 mysql.host=x.x.x.x mysql.database=azkaban mysql.user=xxx mysql.password=xxx启动azkaban-solo-serversource /opt/client/bigdata_envcd /opt/azkaban/azkaban-solo-serversh bin/start-solo.sh访问Azkaban WEB UI在浏览器输入“http://ECS弹性IP:port”网址,进入Azkaban WebUI登录界面,输入用户信息登录Azkaban服务。说明默认端口(port):8081;用户名/密码:azkaban/azkaban;用户账号配置文件: /opt/azkaban/azkaban-solo-server/conf/azkaban-users.xmlazkaban-hdfs-viewer plugin配置指导连接HDFS需要用户下载源码编译获取“az-hdfs-viewer.zip”,并已完成安装azkaban-solo-server。环境准备配置Azkaban用户,添加supergroup用户组授予访问HDFS权限在HDFS的配置文件“core-stie.xml”中增加Azkaban代理用户a. 登录Manager页面,选择“集群 > 服务 > HDFS > 配置 > 全部配置 > HDFS(服务) > 自定义”b. 在参数文件“core-site.xml”中添加如下配置项:名称值hadoop.proxyuser.azkaban.groups*hadoop.proxyuser.azkaban.hosts*c. 配置完成后,单击左上角“保存”d. 选择“概览 > 更多 > 重启服务”输入密码后重启HDFS服务上传安装包并解压上传“az-hdfs-viewer.zip”至“/opt/azkaban/azkaban-solo-server/plugins/viewer”目录执行以下命令解压并删除安装包unzip az-hdfs-viewer.ziprm -f az-hdfs-viewer.zip重命名解压后的文件名为"hdfs"mv az-hdfs-viewer hdfs修改并保存配置文件修改"azkaban-solo-server/plugins/viewer/hdfs/conf/plugin.properties"文件中的代理用户为步骤1中配置的Azkaban代理用户。修改"execute-as-user"的存放目录为Azkaban安装目录,如"opt/azkaban/azkaban-solo-server"。viewer.name=HDFS viewer.path=hdfs viewer.order=1 viewer.hidden=false viewer.external.classpaths=extlib/* viewer.servlet.class=azkaban.viewer.hdfs.HdfsBrowserServlet hadoop.security.manager.class=azkaban.security.HadoopSecurityManager_H_2_0 azkaban.should.proxy=false proxy.user=azkaban // mrs集群中配置的azkaban代理用户名 allow.group.proxy=true file.max.lines=1000 #Specifying the error message we want user to get when they don't have permissionsviewer.access_denied_message=The folder you are trying to access is protected. execute.as.user=false // execute-as-user存放目录 azkaban.native.lib=/opt/azkaban/azkaban-solo-server若不存在该文件需手动创建并配置以上内容拷贝HDFS插件所需包至"/opt/azkaban/azkaban-solo-server/extlib"目录cp /opt/client/HDFS/hadoop/share/hadoop/hdfs/*.jar /opt/azkaban/azkaban-solo-server/extlibcp /opt/client/HDFS/hadoop/share/hadoop/client/hadoop-client-api-3.1.1-mrs-2.0.jar /opt/azkaban/azkaban-solo-server/extlibcp /opt/client/HDFS/hadoop/share/hadoop/common/*.jar /opt/azkaban/azkaban-solo-server/extlib不同MRS版本所需Hadoop相关版本不同,通过find /opt/client查询目检查目录结构目录结构应当为:- azkaban-solo-server - bin - conf - extlib (hadoop相关插件第三方包) - lib - logs - plugins - jobtypes(job插件目录) - commonprivate.properties - hive - plugin.properties - private.properties - hadoopJava - plugin.properties - private.properties - viewer - hdfs - conf - plugin.properties - lib (az-hdfs-viewer.zip解压后的lib) - temp - web重启Azkaban-solo-server服务cd /opt/azkaban/azkaban-solo-serversh bin/shutdown-solo.shsh bin/start-solo.sh访问HDFS Browser在浏览器输入“http://ECS弹性IP:8081”网址,进入Azkaban WebUI登录界面,输入用户信息登录Azkaban服务单击"HDFS"plugins-jobtypes hadoop-job 部署运行安装azkaban-solo-server完成后,再部署验证hadoop-job环境准备获取"azkaban-plugins-3.0.0.zip"压缩包编译获取azkaban提供的hadoopjava-wordcount实例程序包“az-hadoop-jobtype-plugin.jar”上传插件配置文件解压"azkaban-plugins-3.0.0.zip" 获取"azkaban-plugins-3.0.0\plugins\jobtype\jobtypes"下"hadoopJava"文件夹将“hadoopJava”文件夹上传至“/plugin”目录。目录不存在则需新建修改配置文件"azkaban-solo-server/plugins/jobtypes/commonprivate.properties"# set execute-as-user execute.as.user=false hadoop.security.manager.class=azkaban.security.HadoopSecurityManager_H_2_0 azkaban.should.proxy=false obtain.binary.token=false proxy.user=azkaban // MRS集群中配置的Azkaban代理用户名 allow.group.proxy=true // execute-as-user存放目录 azkaban.native.lib=/opt/azkaban/azkaban-solo-server # hadoop hadoop.home=/opt/client/HDFS/hadoop //opt/client为MRS集群客户端安装目录 hive.home=/opt/client/Hive/Beeline spark.home=/opt/client/Spark/spark hadoop.classpath=${hadoop.home}/etc/hadoop,${hadoop.home}/share/hadoop/common/*,${hadoop.home}/share/hadoop/common/lib/*,${hadoop.home}/share/hadoop/hdfs/*,${hadoop.home}/share/hadoop/hdfs/lib/*,${hadoop.home}/share/hadoop/yarn/*,${hadoop.home}/share/hadoop/yarn/lib/*,${hadoop.home}/share/hadoop/mapreduce/*,${hadoop.home}/share/hadoop/mapreduce/lib/* jobtype.global.classpath=${hadoop.home}/etc/hadoop,${hadoop.home}/share/hadoop/common/*,${hadoop.home}/share/hadoop/common/lib/*,${hadoop.home}/share/hadoop/hdfs/*,${hadoop.home}/share/hadoop/hdfs/lib/*,${hadoop.home}/share/hadoop/yarn/*,${hadoop.home}/share/hadoop/yarn/lib/*,${hadoop.home}/share/hadoop/mapreduce/*,${hadoop.home}/share/hadoop/mapreduce/lib/*示例程序验证准备测试数据"input.txt"文件,文件内容可参考如下格式进行自定义,存放路径如"/opt/input.txt"Ross male 33 3674 Julie male 42 2019 Gloria female 45 3567 Carol female 36 2813通过HDFS客户端将测试数据"input.txt"上传至"hdfs /tmp/azkaban_test"a. 以客户端安装用户,登录安装客户端的节点b. 执行以下命令,切换到客户端安装目录 cd /opt/clientc. 执行以下命令配置环境变量 source bigdata_envd. 执行HDFS Shell命令上传文件 hdfs dfs -put /opt/input.txt /tmp/azkaban_test用户在本地编写并保存“wordcount.job”文件,内容如下type=hadoopJava job.extend=false job.class=azkaban.jobtype.examples.java.WordCount classpath=./lib/*,/opt/azkaban-solo-server-0.1.0-SNAPSHOT/lib/* force.output.overwrite=true input.path=/tmp/azkaban_test output.path=/tmp/azkaban_test_out在浏览器输入“http://ECS弹性IP:port”网址,进入Azkaban WebUI登录界面,输入用户信息登录Azkaban服务,提交job运行验证Spark command job—参考客户端命令spark任务有两种运行方式,一种是command方式,另一种是spark jobtype方式。Command方式:需要指定spark_home为/opt/client/Spark/spark/在MRS集群客户端节点可以通过echo $SPARK_HOME获取实际Spark安装地址。设置azkanban所在ECS全局环境变量,添加source {MRS客户端}后需要重启azkaban才可生效jobtype方式:参考plugins-jobtypes hadoop-job 部署运行。链接:https://bbs.huaweicloud.com/blogs/352933
  • [其他问题] FI5.6.1 提交spark任务,任务中连接HDFS,Authentication failure.Check your
    【功能模块】提交spark任务,任务中连接HDFS【操作步骤&问题现象】1、python3 提交spark任务(已完成kerberos认证),任务中通过hdfs.client 建立hdfs连接2、提交任务时已完成kerberos认证,任务中连接hdfs报错信息Authentication failure.Check your credentials.3、spark任务中连接其它组件是不是就不需要做kerberos认证?那认证失败输入哪种认证?4、实际hdfs操作通过hdfs web api方式提交请求,将文件上传到hdfs的hive目录中user/hive/warehouse【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [问题求助] DIS添加转储MRS任务时填写HDFS路径显示无效的
    而实际这个路径在集群界面的文件管理里是存在的,求大佬帮助这里DIS转储到MRS的HDFS路径怎么填
  • [数据集成 FDI] 【roma产品】【数据集成功能】api接口导入数据到hive各个表映射到hdfs路径如何配置
    【功能模块】【操作步骤&问题现象】1、接入数据源中大数据存储模块,hive、fi hive、mrs hive 、fi hdfs、mrs hdfs这几个存储方式有什么区别吗2、api接口导入数据到hive 各个表映射到hdfs路径如何配置?【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [管理与监控] hadoop命令行查看任务统计和日志
    web接口有时候配置到业务面,管理面网络接入查看web不方便,可以用下列命令查看统计信息和日志查看任务信息:hadoop job -ist all或者用:yarn application -ist 加上app状态yarn application -list -appStates FINISHED查看任务信息:yarn application -status application_1624729753990_0005查看详细日志,含container日志,信息比较全,太长就不贴了: yarn logs -applicationId application_1647314342108_0002查找APP的 apptempt ID及am的container IDyarn applicationattempt -list application_1647314342108_0002查看appattempt 状态:yarn applicationattempt -status appattempt_1624729753990_0005_000001mapred命令,主要用于查看统计信息#查询job IDmapred job -list all#查询job统计mapred job -status <jobid>查看job状态统计mapred job -history <jobid>统计各种counter还有成功失败的task,worse task统计:
  • [问题求助] 【MRS】【CDL组件】从Oracle抓取数据到HDFS失败
    【功能模块】从Oracle抓取数据到HDFS【操作步骤&问题现象】按照产品文档的“常见CDL作业示例”-“从Oracle抓取数据到HDFS”章节,不能实现数据抓取。【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [沙箱纠错] 基于鲲鹏应用使能套件进行Hadoop调优 - 无法登陆BMS,如WRF实验一样
  • [知识分享] HDFS 细粒度锁优化,FusionInsight MRS有妙招
    本文分享自华为云社区《[FusionInsight MRS HDFS 细粒度锁优化实践](https://bbs.huaweicloud.com/blogs/353362?utm_source=csdn&utm_medium=bbs-ex&utm_campaign=ei&utm_content=content)》,作者:pippo。# 背景HDFS依赖NameNode作为其元数据服务。NameNode将整个命名空间信息保存在内存中提供服务。读取请求(getBlockLocations、listStatus、getFileInfo)等从内存中获取信息。写请求(mkdir、create、addBlock)更新内存状态,并将日志事务写入到日志服务(QJM)。HDFS NameNode的性能决定了整个Hadoop集群的可扩展性。命名空间性能的改进对于进一步扩展Hadoop集群至关重要。- Apache HDFS 整体架构如下:!(https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20225/27/1653615824071691620.png)- Apache HDFS 交互信息如下:!(https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20225/27/1653615845153231628.png)# 痛点HDFS NameNode的写操作的性能受全局命名空间系统锁的限制。每个写操作都会获取锁并保留锁,直到该操作执行完成。这样可以防止写入操作的并发执行,即使它们是完全独立的,例如命名空间中的对象不相交部分。# 什么是Fine Grained Locking(FGL)FGL【细粒度锁】的主要目的是通过在独立命名空间分区上用多个并发锁替换全局锁,允许写入操作的并发。# 当前状态HDFS设计思路为一次写,多次读。读操作使用共享锁,写操作使用独占锁。由于HDFS NameNode元数据被设计为单个内存空间中的命名空间树,因此树的任何级别的写操作都会阻塞其它写操作,直到当前写操作完成。虽然写是一次,但是当涉及大量并发读/写操作时,这就会影响整体性能。在HDFS NameNode中,内存中的元数据有三种不同的数据结构:- INodeMap: inodeid -> INode- BlocksMap: blockid -> Blocks- DataNodeMap: datanodeId -> DataNodeInfoINodeMap结构中包含inodeid到INode的映射,在整个Namespace目录树种存在两种不同类型的INode数据结构:INodeDirectory和INodeFile。其中INodeDirectory标识的是目录树中的目录,INodeFile标识的是目录树中的文件。BlocksMap结构中包含blockid到BlockInfo的映射。每一个INodeFile都会包含数量不同的Block,具体数量由文件大小以及每个Block大小来决定,这些Block按照所在文件的先后顺序组成BlockInfo数组,BlockInfo维护的是Block的元数据;通过blockid可以快速定位Block。DataNodeMap结果包含datanodeid到DataNodeInfo的映射。当集群启动过程中,通过机架感知逐步建立起整个集群的机架拓扑结构,一般在NameNode的生命周期内不会发生大变化。通过INodeMap和BlocksMap共同标识存储在HDFS中的每个文件及其块的信息。随着文件数量的增加,此数据结构大小也会随之增加,并对单个全局锁的性能产生很大影响。下面我们采用简单的文件目录树结构来演示现有的单一全局锁在文件系统的缺点。!(https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20225/27/1653615920936272267.png)HDFS NameNode 内存目录树结构如上图所示,/D11/D21/D31/F2 和 /D12/D24/D38/F16是不相交的文件,即有不同的父节点和祖父节点。可以看到F2和F16是两个独立的文件,对其中一个文件的任何操作都不应该影响另一个文件。# 设计如前所述,HDFS NameNode将文件信息和元数据结构在内存中保存为一个目录树结构。当修改任意两个独立的文件时,第二次操作需要等到第一次操作完成并释放锁。释放锁以后,只有第二个操作获取锁后才能继续修改文件系统。类似的,后续操作也会阻塞,直到第二次操作释放锁。在下面的例子中,我们考虑2个文件并发写入(创建、删除、追加。。。)操作。F2和F16是文件系统下的2个独立文件(具有不同的父节点和祖父节点)。在将内容追加到F2时,F16也可以同时进行修改。但是由于整个目录树全局对象锁,对F16的操作必须等对F2的操作完成后才能执行。代替全局锁,可以将锁分布在一组名为“分区”的文件中,每个分区都可以有自己的锁。现在F2属于分区-1,F16属于分区-2。F2文件操作可以通过获取分区-1的锁来进行修改,F16文件操作可以通过获取分区-2的锁来进行修改。和以前一样,需要先获取全局锁,然后搜索每个文件属于哪个分区。找到分区后,获取分区锁并释放全局锁。因此全局锁并不会完全被删除。相反,通过减少全局锁时间跨度,一旦释放全局锁,则其它写操作可以获取全局锁并继续获取分区锁来进行文件操作。!(https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20225/27/1653615954378433039.png)分区的数量如何决定?如果有效的定义分区从而获得更高的吞吐量?默认情况下,分区大小为65K,溢出系数为1.8。一旦分区达到溢出条件,将会创建新分区并加入到分区列表中。理想情况下,可以拥有等于NameNode可用CPU核数的分区数,过多的分区数量将会使得CPU过载,而过少的分区数量无法充分利用CPU。# 实现引入新的数据结构-PartitionedGSet,它保存命名空间创建的所有分区信息。PartitionEntry是一个分区的对象结构。LatchLock是新引入的锁,用于控制两级锁--顶层锁和子锁。# PartitionedGSetPartitionedGSet是一个两级层次结构。第一层RangeMap定义了INode的范围,并将它们映射到相应的分区中。分区构成了层次结构的第二级,每个分区存储属于指定范围的INode信息。为了根据键值查找INode,需要首先在RangeMap中找到对应键值的范围,然后在对应的RangeSet,使用哈希值获取到对应的INode。!(https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20225/27/1653615992450186918.png)HDFS NameNode 两级层次结构RangeGSet的容量有一定的阈值。当达到阈值后,将创建新的RangeGSet。空的或者未充分利用的RangeGSet由后台RangeMonitor守护程序来进行垃圾回收。HDFS NameNode启动时,根据镜像中的INode数量计算合理的初始分区数。同时还需要考虑CPU核数,因为将分区数量提高到远超CPU核数并不会增加系统的并行性。- 动态分区:分区的大小有限,可以像平衡树一样可以进行分裂和合并。- 单个分区:只有一个分区,且只有一个与之相对应的锁,并且应和全局锁类似。这适用于小型集群或写入负载比较轻的集群。- 静态分区:有一个固定的RangeMap,不添加或者合并现有分区。这适用于分区均匀增长的文件系统。而且这将消除锁定RangeMap的要求,允许并行使用锁。# Latch LockRangeMap与RangeGSet分别有单独的锁。Latch Lock是一种锁模式,其中首先获取RangeMap的锁,以查找与给定INode键对应的范围,然后获取与分区对应的RangeGSet的锁,同时释放RangeMap锁。这样针对任何其它范围的下一个操作都可以开始并发执行。在RangeMap上持有锁类似于全局锁。目录删除、重命名、递归创建目录等几个操作可能需要锁定多个RangeGSet。这要确保当前HDFS语义所要求的操作的原子性。例如,如果重命名将文件从一个目录移动到另一个目录,则必须锁定包含文件、源和目标目录的RangeMap,以便使重命名成为原子。此锁定模式的一个理想优化是允许某些操作的Latch Lock与其他操作的全局锁结合使用。# INode KeysHDFS中的每个目录和文件都有一个唯一的INode,即使文件被重命名或者移动到其它位置,该INode会保持不变。INode键是以文件INode本身结尾,前面包含父INode的固定长度序列。Key Definition: key(f) = selfId是文件的INodeId,pId是父目录的INodeId,ppId是父目录的父目录的INodeId。INode键的这种表达不仅保证了同级,同时也保证了表亲(相同祖父节点)在大多数情况下被分区到相同的范围中。这些键基于INodeId而非文件名,允许简单的文件和目录进行重命名,称为就地重命名,而无需重新进行分区。# 效果经过测试验证使用和不使用FGL功能性能,在主要写入操作情况下,吞吐量平均提高了25%左右。## 详细性能对比使用Hadoop NN Benchmarking工具(NNThroughputBenchmark)来验证NameNode的性能。每个写入API验证并观察到平均25%的性能提升。有很少一部分轻微或者没有提升的API,分析并发现这些API均是轻量级API,因此没有太大的提升。NNThroughputBenchmark是用于NameNode性能基准测试工具。该工具提供了非常基本的API调用,比如创建文件,创建目录、删除。在这个基础上进行了增强,从而能够支持所有写入API,并能够捕获使用和不使用FGL的版本的性能数据。用于测试的数据集:线程数 1000、文件数 1000000、每个目录文件数 40。# 写入调用频率高的API!(https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20225/27/1653616077867487794.png)# 其它内部写API!(https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20225/27/1653616090511168604.png)# 常用读取API:通过完整的FGL实现,读取API也有很好的性能提升。!(https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20225/27/1653616112137858409.png)# 运行基准测试工具的命令:./hadoop org.apache.hadoop.hdfs.server.namenode.NNThroughputBenchmark -fs file:/// -op create -threads 200 -files 1000000 -filesPerDir 40 –close./hadoop org.apache.hadoop.hdfs.server.namenode.NNThroughputBenchmark -fs hdfs:x.x.x.x:dddd/hacluster -op create -threads 200 -files 1000000 -filesPerDir 40 -close# 参考与FGL相关的社区讨论Hadoop Meetup Jan 2019 — HDFS Scalability and Consistent Reads from Standby Node, which covers Three-Stage Scalability Plan. Slides 21–25社区中跟踪与NameNode可扩展性相关的其它JiraHDFS-5453. Support fine grain locking in FSNamesystemHDFS-5477. Block manager as a serviceHDFS-8286. Scaling out the namespace using KV storeHDFS-14703. Namenode Fine Grained Locking (design inspired us to implement it fully)# 总结华为云FusionInsight MRS云原生数据湖为政企客户提供湖仓一体、云原生的数据湖解决方案,构建一个架构可持续演进的离线、实时、逻辑三种数据湖,支撑政企客户全量数据的实时分析、离线分析、交互查询、实时检索、多模分析、数据仓库、数据接入和治理等大数据应用场景。华为云FusionInsight MRS通过FGL对HDFS NameNode锁机制进行优化,有效提升了NameNode的读写吞吐量,从而能够支持更多数据,更多业务请求访问,从而更好的支撑政企客户高效用数,业务洞见更准,价值兑现更快。
  • [干货汇总] 【大数据系列】不care工具,在大数据平台中Hive能自动处理SQL
    本文分享自华为云社区《[Hive执行原理](https://bbs.huaweicloud.com/blogs/348195?utm_source=csdn&utm_medium=bbs-ex&utm_campaign=other&utm_content=content)》,作者: JavaEdge 。MapReduce简化了大数据编程的难度,使得大数据计算不再是高不可攀的技术圣殿,普通工程师也能使用MapReduce开发大数据程序。但是对于经常需要进行大数据计算的人,比如从事研究商业智能(BI)的数据分析师来说,他们通常使用SQL进行大数据分析和统计,MapReduce编程还是有一定的门槛。而且如果每次统计和分析都开发相应的MapReduce程序,成本也确实太高了。有没有更简单的办法,可以直接将SQL运行在大数据平台?先看如何用MapReduce实现SQL数据分析。# MapReduce实现SQL的原理常见的一条SQL分析语句,MapReduce如何编程实现?```mysql` SELECT pageid, age, count(1) FROM pv_users GROUP BY pageid, age;````统计分析语句,统计不同年龄用户访问不同网页的兴趣偏好,具体数据输入和执行结果:!(https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20224/22/1650609311443266186.png)- 左边,要分析的数据表- 右边,分析结果把左表相同的行求和,即得右表,类似WordCount计算。该SQL的MapReduce的计算过程,按MapReduce编程模型- map函数的输入K和V,主要看V V就是左表中每行的数据,如- map函数的输出就是以输入的V作为K,V统一设为1 比如map函数的输出经shuffle后,相同的K及其对应的V被放在一起组成一个,作为输入交给reduce函数处理。比如被map函数输出两次,那么到了reduce这里,就变成输入,这里的K是,V集合是。在reduce函数内部,V集合里所有的数字被相加,然后输出。所以reduce的输出就是!(https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20224/22/1650609380761298262.png)如此,一条SQL就被MapReduce计算好了。在数据仓库中,SQL是最常用的分析工具,既然一条SQL可以通过MapReduce程序实现,那有无工具能自动将SQL生成MapReduce代码?这样数据分析师只要输入SQL,即可自动生成MapReduce可执行的代码,然后提交Hadoop执行。这就是Hadoop大数据仓库Hive。# Hive架构Hive能直接处理我们输入的SQL(Hive SQL语法和数据库标准SQL略不同),调用MapReduce计算框架完成数据分析操作。!(https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20224/22/1650609395656925319.png)通过Hive Client(Hive的命令行工具,JDBC等)向Hive提交SQL命令:- 若为DDL,Hive会通过执行引擎Driver将数据表的信息记录在Metastore元数据组件,该组件通常用一个关系数据库实现,记录表名、字段名、字段类型、关联HDFS文件路径等这些数据库的元信息- 若为DQL,Driver就会将该语句提交给自己的编译器Compiler进行语法分析、语法解析、语法优化等一系列操作,最后生成一个MapReduce执行计划。然后根据执行计划生成一个MapReduce的作业,提交给Hadoop MapReduce计算框架处理。对一个简单的SQL命令:```mysql SELECT * FROM status_updates WHERE status LIKE ‘michael jackson’;```其对应的Hive执行计划:!(https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20224/22/1650609437583417627.png)Hive内部预置了很多函数,Hive执行计划就是根据SQL语句生成这些函数的DAG(有向无环图),然后封装进MapReduce的map、reduce函数。该案例中的map函数调用了三个Hive内置函数TableScanOperator、FilterOperator、FileOutputOperator,就完成了map计算,而且无需reduce函数。# Hive如何实现join操作除了简单的聚合(group by)、过滤(where),Hive还能执行连接(join on)操作。pv_users表的数据在实际中无法直接得到,因为pageid数据来自用户访问日志,每个用户进行一次页面浏览,就会生成一条访问记录,保存在page_view表中。而age年龄信息则记录在用户表user。!(https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20224/22/1650609484669408457.png)这两张表都有一个相同的字段userid,据该字段可连接两张表,生成前面例子的pv_users表:```mysql SELECT pv.pageid, u.age FROM page_view pv JOIN user u ON (pv.userid = u.userid);```该SQL命令也能转化为MapReduce计算,连接过程如下:!(https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20224/22/1650609508775654849.png)join的MapReduce计算过程和前面的group by稍有不同,因为join涉及两张表,来自两个文件(夹),所以需要在map输出的时候进行标记,比如来自第一张表的输出Value就记录为,这里的1表示数据来自第一张表。这样经过shuffle以后,相同的Key被输入到同一个reduce函数,就可以根据表的标记对Value数据求笛卡尔积,用第一张表的每条记录和第二张表的每条记录连接,输出就是join的结果。所以打开Hive源码,看join相关代码,会看到一个两层for循环,对来自两张表的记录进行连接操作。# 总结开发无需经常编写MapReduce程序,因为网站最主要的大数据处理就是SQL分析,因此Hive在大数据应用很重要。随Hive普及,我们对在Hadoop上执行SQL的需求越强,对大数据SQL的应用场景也多样化起来,于是又开发了各种大数据SQL引擎。Cloudera开发了Impala,运行在HDFS上的MPP架构的SQL引擎。和MapReduce启动Map和Reduce两种执行进程,将计算过程分成两个阶段进行计算不同,Impala在所有DataNode服务器上部署相同的Impalad进程,多个Impalad进程相互协作,共同完成SQL计算。在一些统计场景中,Impala可做到ms级计算速度。后来Spark诞生,也推出自己的SQL引擎Shark,即Spark SQL,将SQL语句解析成Spark的执行计划,在Spark上执行。由于Spark比MapReduce快很多,Spark SQL也相应比Hive快很多,并且随着Spark的普及,Spark SQL也逐渐被人们接受。后来Hive推出了Hive on Spark,将Hive的执行计划转换成Spark的计算模型。我们还希望在NoSQL执行SQL,毕竟SQL发展几十年,积累庞大用户,很多人习惯用SQL解决问题。于是Saleforce推出了Phoenix,一个执行在HBase上的SQL引擎。这些SQL引擎只支持类SQL语法,并不能像数据库那样支持标准SQL,特别是数据仓库领域几乎必然会用到嵌套查询SQL:在where条件里面嵌套select子查询,但几乎所有的大数据SQL引擎都不支持。然而习惯于传统数据库的使用者希望大数据也能支持标准SQL。回到Hive。Hive本身的技术架构其实并没有什么创新,数据库相关的技术和架构已经非常成熟,只要将这些技术架构应用到MapReduce上就得到了Hadoop大数据仓库Hive。但是想到将两种技术嫁接到一起,却是极具创新性的,通过嫁接产生出的Hive极大降低大数据的应用门槛,也使Hadoop得到普及。>参考- https://learning.oreilly.com/library/view/hadoop-the-definitive/9781491901687/ch17.html#TheMetastore
  • [问题求助] 安装apache-kylin-3.1.2-bin-hadoop3,启动服务后,访问WEB UI 7070端口,报错404
    各组件版本如下:
  • [技术干货] Spark & Hive 云原生改造在智领云的应用
    引 言随着 Kubernetes 越来越成熟,使用者越来越多,大数据应用上云的需求也越来越迫切。原有的大数据资源管理器 Yarn 很难做到所有应用资源统一控制,完全隔离,带来的主机应用和大数据计算应用互相抢占资源,由此导致的计算任务时间可能经常性抖动,多租户应用互相影响。而在 Kubernetes 上,此类问题天然解决。所有的应用都以 Pod 形式在 Kubernetes 平台上统一管理,在规范的 namespace 管理下,不存在不受控的应用来进行资源抢占。同时,我们可以轻松地实现多租户,资源配额,运行审计计费等功能。在此背景下,客户原有的大数据计算任务如何无缝迁移到 Kubernetes 上,大数据应用如何进行云原生改造,都成了我们最迫切要解决的问题。本篇文章主要针对这两类问题来进行探讨。Spark 和 Hive 在智领云业务场景中的使用Spark 和 Hive 是大数据平台中很常用的两个计算引擎。Hive 本身是一个基于 HDFS 的类 SQL 数据库,其 HQL 语句的底层执行引擎可以使用基于 Yarn 的 MapReduce,也可以使用 Spark。Spark 在 2021 年 3 月推出的 3.1 版本中实现了对 Kubernetes 支持的 GA(general availability,意味着生产级的支持)。在这个版本中,Spark 允许用户从命令行(spark-submit)上提交 Spark 任务到 Kubernetes 集群中。但是目前的 Hive 版本中,如果底层使用 Spark,还只能提交任务到 Yarn,而不能支持将 Hive 查询提交到 Kubernetes。除了从命令行提交 Spark 任务,智领云使用 Hive 和 Spark 相关的业务还有三个场景:第一个、用户创建 Hive 作业进行数据 ETL 任务开发或数仓分层计算工作流创建。第二个、用户应用程序(例如,批处理调度系统)提交 pySpark 文件或者 Spark jar 包到平台,进行数据计算。第三个、用户使用 Jupyterlab Spark kernel 来访问 Hive/Mysql/Hdfs/Glusterfs 等数据源文件进行数据探索、机器学习及人工智能算法开发。这三种场景下的共同点都是需要将计算任务转换成 Spark job 在集群中调度运行(Hive 的缺省引擎已经从 MR 变成了 Spark)。在每个场景下,我们都在 Spark on Kubernetes 的基础上提供了相应的解决方案:在 Kubernetes 平台上使用 Hive,我们使用了 Hive on Spark on Kubernetes。对于 Spark 作业,我们集成了 Spark On Kubernetes Operator。JupyterLab 中使用 Spark, 我们集成了SparkMagicKernel 和 Livy。每种集成方式,都有其独有的优势和缺陷及其适用场景,下面我们来一一讲解。Hive On Spark On Kubernetes在类 SQL 数据库中,Hive 是很多 Hadoop 生态系统缺省的选择。虽然 Spark 也提供了 SparkSQL 来支持 SQL 查询,但是 Hive 使用的 HQL 和 SparkSQL 在语法支持上还是存在比较大的差异,对于大型的数据仓库项目,用户可能积累了几千个 Hive 任务,如果此时想要快速地迁移到 Kubernetes 上,那么使用 SparkSQL 存在很大的迁移成本和风险。但是如果我们只更改 Hive 的底层执行引擎,改成 Spark on Kubernetes,那么,我们就能让客户的 Hive HQL 应用,无需修改地快速迁移到 Kubernetes 上。但是由于 Spark on Kubernetes 是最近才达到 GA 状态,和不同的 Hive 版本,Kubernetes 版本,以及很多相关大数据组件之间的适配还没有成熟,我们需要对现有的版本之间进行一些适配才能平滑运行 Hive On Spark On Kubernetes。首先,我们必须要确定 Hive、Spark、Hadoop(HDFS)以及相关的 Kerberos、Ranger 的版本。对于 Spark 版本的选择,我们开发选型的时候最新版为 3.1.1,目前 Spark 最新版已经到了 3.2.1,Spark 选取最新版本即可,最新版本能够增强对 Kubernetes 的支持。而 Hive 版本选取的主要原则,即查看 Spark-client 模块的代码是否支持 Kubernetes,如果不支持是否容易改造以便支持。而 4.0.0 版本开始,spark-client 模块重构了代码结构,增加了 SparkClient 的抽象类,有该结构支持,增加对 Kubernetes 的支持就容易了很多。这里还有一种选择,就是 Hive 选用 3.1.2 的稳定版本,然后把最新的 master 分支的 spark-client 模块代码 cherry pick 到 3.1.2 版本即可。在 Hive 代码中主要改造内容是增加 KubernetesSubmitSparkClient,主要内容是构造 SparkSubmit 向 Kubernetes 提交 Spark 任务的各种参数,包括和 Hive 中 RPC server 通信的配置,提交 Spark 作业后,Spark driver pod 启动后会连接 HiveServer2 中的 RPC server,连接成功后,HiveServer2 会发送相应的 Spark job 到 Spark driver 来进行计算。而 Spark 代码的改动,主要是修改 Spark 中的 hiveShim 模块,增加对 Hive 4.0.0 的支持。Hive On Spark 在智领云的数据平台,主要作为 Hive 作业/工作流以及 Hue 查询工具的底层执行引擎:调度系统通过 Beeline 来连接 HiveServer2,Hue 通过 JDBC 连接 HiveServer2,客户端发送用户的 SQL 语句到 HiveServer2。HiveServer2 解析完成 SQL 后,会生成一系列的 HQL taskplan,对于这些 HQL 的执行,HiveServer2 会启动一个 RPC server,SparkSubmit 会带上 RPC server 参数,启动一个 Spark Driver Pod 来和 HiveServer2 进行 RPC 通信,这个 Spark Driver Pod 的主要功能就是接收 HiveServer2 发送过来的 SQL Job 进行计算,计算完成后,将结果返回给 HiveServer2 中运行的 RPC server。在 Kubernetes 平台,SparkSubmit 客户端和 Kubernetes APIServer 通信,Kubernetes 在接收到 Spark 任务请求后,会调用 Scheduler 组件启动 Spark Driver Pod, Spark Driver 在启动完成后,会发送启动 Executor 请求给 Kubernetes APIServer, Kubernetes 再启动 Spark Executor Pod, Spark Driver 和 Executor 建立连接,完成整个 Spark 集群的创建。整体架构如下图所示:权限控制方面,我们使用 Ranger 来完成授权和鉴权操作,使用 Kerberos 来完成认证操作。对于 Ranger 鉴权插件, Hive 和 Spark 都有相应的解决方案。Hive 直接通过 Hive Ranger 插件和 Ranger 服务来通信,完成鉴权操作,Spark 则通过 Spark Authorizer 插件再调用 Hive Ranger 插件来完成鉴权。在 Hive On Spark 模式下,我们使用 Spark 对 Kerberos 的支持来完成用户身份认证操作,通过 Hive Ranger 插件来完成鉴权操作。Spark on Kubernetes OperatorSpark on Kubernetes Operator 项目是 Google 非官方推出的 Spark On Kubernetes 解决方案。它的内部实现是基于 Spark 官方的 Spark On Kubernetes 解决方案之上,更多的利用了 Kubernetes 特性,来增强在 Kubernetes 上使用 Spark 计算引擎的易用性和灵活性以及性能的提升。它本质上是一个 Kubernetes Operator,所以在该解决方案下,用户提交 Spark 作业只需要通过 Yaml 文件即可,并且可以定制 Kubernetes Schedule。比如,可以配置使用华为提供的针对大数据领域优化过的 Volcano 调度引擎。在智领云平台上,Spark on Kubernetes Operator 承载了用户提交 Jar 包或者 pySpark 文件类型的所有 Spark/Spark-streaming 作业的底层调度引擎。在 Spark OnKubernetes Operator 成熟之后,Hive on Spark 底层未来也可以增加 Spark On Kubernetes Operator 运行模式的支持,仅仅只需要在 spark-client 模块中增加KubernetesOperatorSparkClient 抽象类的支持即可。Spark Operator 方案也存在一个弊端,就是 Spark 作业配置 Yaml 的高度复杂化,该 Yaml 需要配置 Spark 作业的所有信息,包括Driver/Executor 的资源控制,包括 Spark 的镜像版本和调度算法。普通用户不需要关注这些配置。在此问题下,我们模仿 Apache Livy 的 API 增加了一个 Spark On Kubernetes Operator Server。该服务负责管理 Spark On Kubernetes Operator Job,提供创建/更新/删除 Job 接口,提供查询 Job 状态及日志请求。用户只需要配置少量Spark Job 参数,后台服务会根据参数完成 Spark Job Yaml 文件渲染,提交到 Kubernetes 集群。在权限控制这一块,我们可以使用 Spark 相关配置结合 Spark Operator 对 Kerberos 的支持来实现。对 Ranger Hive 插件的支持,我们可以使用 Spark Authorizer 插件来转接适配,不过该插件版本较老,我们需要修改其 POM 文件和相关代码来使其可以支持 Spark 3.1.1 版本。在 Spark Operator 模式下, Spark 作业的相关配置都在 Yaml 中配置,我们可以利用 Spark Operator 对 Sidecar 的支持来完成 Spark Operator 对 Ranger Hive 插件的支持。主要方法就是 Spark 3.1.1 版本的原生镜像不变,将 Ranger 相关的 Jars 通过 Sidecar 共享目录共享给 Spark 主 Container,并配置相关 ClassPath 参数,使 Spark 能够找到 Ranger 和 Spark Authorizer 相关 Jar 包。JupyterLab On KubernetesJupyterLab 作为数据科学家首选的 IDE,在数据及人工智能领域应用非常广泛。在智领云平台,我们的主要改造是打通JupyterLab 和我们的调度平台的互相访问,增加 Spark 读写 Hive / HDFS 的支持。这个场景和前两个场景的主要区别在于 JupyterLab Kernel 和 Spark Driver Pod 之间可能有持续的交互,而不是 run to finish。其次,在 UI 界面下的任务需要无需修改的在后台(测试或生产环境下)运行。在此需求之下,我们主要做了几点改动:选取了 SparkMagic Kernel 支持了用户编写测试 Spark 代码。改造 JupyterLab Server 代码,允许用户直接点击开启当前 Spark 任务的 4040 调试页面 UI。改造 JupyterLab Client 代码,允许用户可以直接在 JupyterLab Notebook 内直接引用系统或者用户自定义变量,并能够在调度和调试时生效。增加了 JupyterLab 调度 Worker,使调度平台可以直接调度运行用户的 ipynb 类型的 Notebook 文件。增加 JupyterLab Python 环境管理,允许 JupyterLab 在重启后保持其之前设置的 Python 环境。SparkMagic Kernel 执行 Spark 任务是利用 Apache Livy 服务来实现任务的提交以及交互Session 的维护。Apache Livy 目前版本对 Kubernetes 并不支持,我们需要添加 Kubernetes client 和状态查询的支持。Apache Livy 实现的对 Kubernetes 的支持实际上是和 Hive on Spark 模式类似,都是创建 RPC Server,然后调用 SparkSubmit 提交 Spark 任务和 RPC Server 通信,来完成 SQL 任务的交互。下图展示了整个流程的架构。在此种模式下,Hive 的权限控制配置和 Spark Operator 类似,都是使用 Spark Authorizer 和 Hive Ranger 插件来实现。未来在智领云平台,我们使用了存储和计算分离的方案,在计算层使用 Spark on Kubernetes 作为主要的计算引擎,底层可以采用 HDFS 兼容现有系统,也可以采用其它支持 HDFS 接口的云原生存储。这样的架构,加上对 Hive 等传统 Hadoop 生态的云原生改造,可以在最大程度的支持现有系统的同时逐步迁移到纯云原生的体系架构下,无缝集成新的大数据和人工智能系统。而基础架构即代码(Infra as Code)方式的使用, CI / CD 全链路的支持,为类似 DataOps,DataMesh 的新型数据应用开发运维范式提供了清晰可行的技术架构支持。而由此带来的业务开发效率的提升,业务管理运维效能的提升,都是质的变化。未来可期。转载于CSDN微信公众号
  • [技术干货] Apache-Hadoop-3.1.1移植指南
    1 简介Hadoop是一个开源的分布式存储及计算框架,被广泛用于海量数据的存储及处理,可以以可靠、高效、可伸缩的方式进行数据处理,开发语言为Java。 2 环境信息硬件版本软件版本服务器Taishan服务器CentOS7.6CPU鲲鹏920OS kernel4.14.0-115磁盘分区无JDK1.7.0_191网络可访问外网GCC4.8.5  Maven3.5.4  Protobuf2.5.03 配置编译环境3.1安装GCC1、 挂载OS镜像将OS镜像上传至服务器/root目录下mount -o loop CentOS-7-aarch64-Everything-1810.iso /mnt2、配置yum本地源cd /etc/yum.repos.d/mkdir /bakmv * /bak3、编辑配置文件:vim local.repo内容增加如下:[local]name=localbaseurl=file:///mntenable=1gpgcheck=0 执行下面使yum源生效:yum clean allyum makecache 4、安装GCC相关软件yum -y install gcc.aarch64 gcc-c++.aarch64 gcc-gfortran.aarch64 libgcc.aarch645、解决-fsigned-char问题(修改gcc) 使用command -v gcc(或者which gcc)寻找gcc所在路径 更改gcc的名字(例如改成gcc-impl)mv /usr/bin/gcc /usr/bin/gcc-impl执行 vim /usr/bin/gcc填如如下内容:#!bin/sh/usr/bin/gcc-impl –fsigned-char “$@” 执行如下命令给脚本添加权限chmod +x /usr/bin/gcc 使用gcc --version查看版本3.2安装OpenJDK1、下载并安装到指定目录wget https://github.com/AdoptOpenJDK/openjdk8-binaries/releases/download/jdk8u252-b09/OpenJDK8U-jdk_aarch64_linux_hotspot_8u252b09.tar.gztar -zxf OpenJDK8U-jdk_aarch64_linux_hotspot_8u252b09.tar.gzmkdir -p /opt/tools/installed/mv jdk8u252-b09 /opt/tools/installed/2、配置环境变量vim /etc/profile添加如下:export JAVA_HOME=/opt/tools/installed/jdk8u252-b09export PATH=$JAVA_HOME/bin:$PATH使环境变量生效:source /etc/profile3、查看java版本java -version 3.3安装Maven1、下载并安装到指定目录(如/opt/tools/installed)wget https://archive.apache.org/dist/maven/maven-3/3.5.4/binaries/apache-maven-3.5.4-bin.tar.gztar -zxf apache-maven-3.5.4-bin.tar.gzmkdir -p /opt/tools/installed/mv apache-maven-3.5.4 /opt/tools/installed/ 2、修改环境变量vim /etc/profile添加如下内容:export MAVEN_HOME=/opt/tools/installed/apache-maven-3.5.4export PATH=$MAVEN_HOME/bin:$PATH3、使环境变量生效source /etc/profile检查配置是否生效mvn -v4、修改Maven配置文件中的本地仓路径、远程仓等。配置文件:vim /opt/tools/installed/apache-maven-3.5.4/conf/settings.xml本地仓库默认在”~/.m2/“目录下,如果想修改成指定目录,则修改localRepository标签,如果没有特殊需求,可不必修改该参数。远程仓库配置修改成如下:<mirror> <id>huaweimaven</id> <name>huawei maven</name> <url>https://mirrors.huaweicloud.com/repository/maven/</url> <mirrorOf>central</mirrorOf></mirror>如果不能连接外网,需要配置代理: <proxies> <proxy> <id>optional</id> <active>true</active> <protocol>http</protocol> <username>用户名</username> <password>密码</password> <host>代理服务器网址</host> <port>代理服务器端口</port> <nonProxyHosts>local.net|some.host.com</nonProxyHosts> </proxy></proxies>根据实际情况来配置。3.4安装cmakeHadoop编译需要3.12及以上版本,CentOS自带版本较低,需要升级。1、下载并解压安装包wget https://cmake.org/files/v3.12/cmake-3.12.4.tar.gztar -xvf cmake-3.15.2.tar.gz2、编译安装tar -xvf cmake-3.15.2.tar.gzcd cmake-3.15.2/./bootstrapmake -j8make install3.5安装protocyum install -y protobuf protobuf-devel 3.6安装SnappySnappy是一个可选的、也是很常用的压缩库,可添加编译选项打包到hadoop软件包中。安装基本库yum install openssl-devel zlib-devel automake libtool下载Snappy依赖包:wget https://github.com/google/snappy/releases/tag/1.1.7 解压后进入文件夹 修改txt文件开启动态链接库编译vim CMakeLists.txt将该选项从“OFF”改为“ON”:option(BUILD_SHARED_LIBS "Build shared libraries(DLLs)." ON)创建编译目录并执行编译。编译hadoopmkdir buildcd buildcmake ../make install 3.7执行移植分析1、使用devkit进行扫描分析,如有依赖需要先编译相关依赖。4 编译hadoop 下载Hadoop源码包:wget https://archive.apache.org/dist/hadoop/common/hadoop-3.1.1/hadoop-3.1.1-src.tar.gz 2、解压后进入目录,修改pom.xml       添加鲲鹏maven仓库: <repository> <id>kunpengmaven</id> <name>kunpeng maven</name> <url>https://mirrors.huaweicloud.com/kunpeng/maven</url> </repository>       添加插件仓库:(pluginRepositories和repositories的节点级别一样、可能需要手动创建) <pluginRepositories> <pluginRepository> <id>huaweicloud-plugin</id> <url>http://mirrors.huaweicloud.com/repository/maven</url> <snapshots> <enabled>true</enabled> </snapshots> </pluginRepository></pluginRepositories>3、执行编译mvn package -DskipTests -Pdist,native -Dtar -Dsnappy.lib=/usr/local/lib64 –Dbundle.snappy -Dmaven.javadoc.skip=true 4、编译成功编译成功后,将在源码下的“hadoop-dist/target/”目录生成tar.gz包 5、检查native,支持snappy 5 FAQ:如果是OpenEuler,在使用本地yum源安装依赖的时候会有缺依赖相关报错,这是因为iso文件中缺包导致,可以使用OpenEuler的官方源进行挂载:repo.openeuler.org挂载方式与本地yum源基本相同,仅修改如下:baseuel=https://repo.openeuler.org/openEuler-20.03-LTS/everything/aarch_64 执行yum clean all   yum makecahce创建完成。
  • [问题求助] 补丁包合入问题
    打patch失败
  • [知识分享] 【大数据系列】大数据集群被窃取数据怎么办?透明加密可以一试
    >摘要:传统大数据集群中,用户数据明文保存在HDFS中,集群的维护人员或者恶意攻击者可在OS层面绕过HDFS的权限控制机制或者窃取磁盘直接访问用户数据。本文分享自华为云社区[《FusionInsight MRS透明加密方案》](https://bbs.huaweicloud.com/blogs/307406?utm_source=zhihu&utm_medium=bbs-ex&utm_campaign=ei&utm_content=content),作者: 一枚核桃 。# 概述传统大数据集群中,用户数据明文保存在HDFS中,集群的维护人员或者恶意攻击者可在OS层面绕过HDFS的权限控制机制或者窃取磁盘直接访问用户数据。FusionInsight MRS引入了Hadoop KMS服务并进行增强,通过对接第三方KMS,可实现数据的透明加密,保障用户数据安全。!(https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202201/14/104621ua76d2mu1zio5bq2.png)- HDFS支持透明加密,Hive、HBase等在HDFS保存数据的上层组件也将通过HDFS加密保护,加密密钥通过HadoopKMS从第三方KMS获取。- 对于Kafka、Redis等业务数据直接持久化存储到本地磁盘的组件,通过基于LUKS的分区加密机制保护用户数据安全。# HDFS透明加密!(https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202201/14/104641tyfhpifcfjcwx6im.png)- HDFS透明加密支持AES、SM4/CTR/NOPADDING加密算法,Hive、HBase使用HDFS透明加密做数据加密保护。SM4加密算法由A-LAB基于OpenSSL提供。- 加密使用的密钥从集群内的KMS服务获取,KMS服务支持基于Hadoop KMS REST API对接第三方KMS。- 一套FusionInsight Manager内部署一个KMS服务,KMS服务到第三方KMS使用公私钥认证,每个KMS服务在第三方KMS对应拥有一个CLK。- 在CLK下可以申请多个EZK,与HDFS上的加密区对应,用于加密数据加密密钥,EZK在第三方KMS中持久化保存。- DEK由第三方KMS生成,通过EZK加密后持久化保存到NameNode中,使用的时候使用EZK解密。- CLK和EZK两层密钥可以轮转。CLK作为每个集群的根密钥,在集群侧不感知,轮转完全由第三方KMS控制管理。EZK可通过FI KMS管理,轮转在FI KMS可控制管理,同时第三方KMS管理员拥有KMS内密钥的管理能力,也可以做EZK的轮转。# LUKS分区加密对于Kafka、Redis等业务数据直接持久化存储到本地磁盘的组件,FusionInsight集群支持基于LUKS的分区加密进行敏感信息保护。FusionInsight安装过程的脚本工具使用Linux统一密钥设置(Linux Unified Key Setup,简称LUKS)分区加密方案,该方案加密分区时会在集群每个节点生成或者从第三方KMS获取访问密钥,用于加密数据密钥,以保护数据密钥安全性。磁盘分区加密后,重启操作系统或者更换磁盘场景下,系统能够自动获取密钥并挂载或创建新的加密分区。
  • [知识分享] 【大数据系列】MapReduce 示例:减少 Hadoop MapReduce 中的侧连接
    >摘要:在排序和reducer 阶段,reduce 侧连接过程会产生巨大的网络I/O 流量,在这个阶段,相同键的值被聚集在一起。本文分享自华为云社区《MapReduce 示例:减少 Hadoop MapReduce 中的侧连接》,作者:Donglian Lin。在这篇博客中,将使用 MapReduce 示例向您解释如何在 Hadoop MapReduce 中执行缩减侧连接。在这里,我假设您已经熟悉 MapReduce 框架并知道如何编写基本的 MapReduce 程序。本博客中讨论的主题如下:- 什么是加入?- MapReduce 中的连接- 什么是 Reduce 侧连接?- 减少侧连接的 MapReduce 示例- 结论# 什么是联接?join操作用于基于外键将两个或多个数据库表合并。通常,公司在其数据库中为客户和交易 记录维护单独的表 。而且,很多时候这些公司需要使用这些单独表格中的数据生成分析报告。因此,他们使用公共列(外键)(如客户 ID 等)对这些单独的表执行连接操作,以生成组合表。然后,他们分析这个组合表以获得所需的分析报告。# MapReduce 中的连接就像 SQL join 一样,我们也可以在 MapReduce 中对不同的数据集进行 join 操作。MapReduce 中有两种类型的连接操作:- **Map Side Join:顾名思义**,join操作是在map阶段本身进行的。因此,在 map side join 中,mapper 执行 join 并且每个 map 的输入都必须根据键进行分区和排序。- **减少副加入**:顾名思义,在减少侧加入,减速是 负责执行连接操作。由于排序和改组阶段将具有相同键的值发送到同一个 reducer,因此它比 map side join 相对简单和容易实现,因此,默认情况下,数据是为我们组织的。现在,让我们详细了解reduce side join。# 什么是减少侧连接?!(https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202112/03/102111pxtwvyu6hbc8t4gu.png)如前所述,reduce side join 是在reducer 阶段执行join 操作的过程。基本上,reduce side join 以下列方式发生:- Mapper 根据公共列或连接键读取要组合的输入数据。- 映射器处理输入并向输入添加标签以区分属于不同来源或数据集或数据库的输入。- 映射器输出中间键值对,其中键只是连接键。- 在排序和改组阶段之后,会为减速器生成一个键和值列表。- 现在,reducer 将列表中存在的值与键连接起来,以给出最终的聚合输出。# 减少边连接的 MapReduce 示例假设我有两个单独的运动场数据集:- **cust_details**: 它包含客户的详细信息。- **transaction_details**: 包含客户的交易记录。使用这两个数据集,我想知道每个客户的生命周期价值。在 这样做时,我将需要以下东西:- 此人的姓名以及该人访问的频率。- 他/她购买设备所花费的总金额。!(https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202112/03/10221328drd0eb1ovwpuyl.png)上图只是向您展示了我们将对其执行reduce side join 操作的两个数据集的schema。单击下面的按钮下载包含此 MapReduce 示例的源代码和输入文件的整个项目:在将上面的 MapReduce 示例项目在 reduce 端加入 Eclipse 时,请记住以下几点:- 输入文件位于项目的 input_files 目录中。将这些加载到您的 HDFS 中。- 不要忘记根据您的系统或VM构建Hadoop Reference Jars的路径(存在于reduce side join项目lib目录中)。现在,让我们了解在这个 MapReduce 示例中的 map 和 reduce 阶段内部发生了什么关于reduce side join:## 1. 地图阶段:我将为两个数据集中的每一个设置一个单独的映射器,即一个映射器用于 cust_details 输入,另一个用于 transaction_details 输入。**cust_details 的映射器:** public static class CustsMapper extends Mapper { public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String record = value.toString(); String[] parts = record.split(","); context.write(new Text(parts[0]), new Text("cust " + parts[1])); } }- 我将一次读取一个元组的输入。- 然后,我将令牌化在元组的每个字并用的名字一起取卡斯特ID个人Ø ñ 。- Ť ħ È Ç乌斯ID将是我的键值对键,我的映射器将最终生成。- 我还将添加一个标签“ Ç乌斯” ,以表明该输入元组是cust_details类型。- 因此,我的 cust_details 映射器将生成以下中间键值对:**键 - 值对:[客户 ID,客户名称]**例如:[4000001,Ç乌斯 克里斯蒂娜],[4000002,卡斯特佩奇]等**transaction_details 的映射器:** public static class TxnsMapper extends Mapper { public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String record = value.toString(); String[] parts = record.split(","); context.write(new Text(parts[2]), new Text("tnxn " + parts[3])); } }- 就像 cust_details 的映射器一样,我将在这里遵循类似的步骤。但是,会有一些差异: - 我将获取金额值而不是人名。 - 在这种情况下,我们将使用“tnxn”作为标签。- 因此,客户 ID 将是映射器最终生成的键值对的我的键。- 最后,transaction_details 映射器的输出将采用以下格式:**键值对:[客户 ID,tnxn 金额]**示例: [4000001, tnxn 40.33]、[4000002, tnxn 198.44] 等。## 2. 排序和洗牌阶段排序和改组阶段将生成与每个键对应的值的数组列表。换句话说,它将中间键值对中每个唯一键对应的所有值放在一起。排序和改组阶段的输出将采用以下格式:**键 - 值列表:**- {cust ID1 – [(cust name1), (tnxn amount1), (tnxn amount2), (tnxn amount3),.....]}- {客户 ID2 – [(客户名称 2), (tnxn amount1), (tnxn amount2), (tnxn amount3),.....]}- ……**例子:**- {4000001 – [(cust kristina), (tnxn 40.33), (tnxn 47.05),…]};- {4000002 – [(cust paige), (tnxn 198.44), (tnxn 5.58),…]};- ……现在,框架将为每个唯一的连接键(cust id)和相应的值列表调用 reduce() 方法(reduce(Text key, Iterable values, Context context))。 然后,reducer 将对相应值列表中存在的值执行连接操作,以最终计算所需的输出。因此,执行的reducer 任务的数量将等于唯一客户ID 的数量。现在让我们了解在这个 MapReduce 示例中,reducer 如何执行连接操作。## 3.减速器阶段如果您还记得,执行这种减少侧连接操作的主要目标是找出特定客户访问综合体育馆的次数以及该客户在不同运动上花费的总金额。因此,我的最终输出应采用以下格式:**Key – Value 对:[客户姓名] (Key) – [总金额,访问频率] (Value)****减速机代码:** public static class ReduceJoinReducer extends Reducer { public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { String name = ""; double total = 0.0; int count = 0; for (Text t : values) { String parts[] = t.toString().split(" "); if (parts[0].equals("tnxn")) { count++; total += Float.parseFloat(parts[1]); } else if (parts[0].equals("cust")) { name = parts[1]; } } String str = String.format("%d %f", count, total); context.write(new Text(name), new Text(str)); } }因此,将在每个减速器中采取以下步骤来实现所需的输出:- 在每个减速器中,我都会有一个键和值列表,其中键只是客户 ID。值列表将具有来自两个数据集的输入,即来自 transaction_details 的金额和来自 cust_details 的名称。- 现在,我将遍历 reducer 中的值列表中存在的值。- 然后,我将拆分值列表并检查该值是 transaction_details 类型还是 cust_details 类型。- 如果是transaction_details类型,我将执行以下步骤: - 我将计数器值加一来计算这个人的访问频率。 - 我将累积更新金额值以计算该人花费的总金额。- 另一方面,如果值是 cust_details 类型,我会将它存储在一个字符串变量中。稍后,我会将名称指定为我的输出键值对中的键。- 最后,我将在我的 HDFS 的输出文件夹中写入输出键值对。因此,我的减速器将生成的最终输出如下:**克里斯蒂娜,651.05 8****佩奇,706.97 6**…..而且,我们上面所做的整个过程在 MapReduce 中称为Reduce Side Join。**源代码:**上面的减少侧连接的 MapReduce 示例的源代码如下: import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class ReduceJoin { public static class CustsMapper extends Mapper { public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String record = value.toString(); String[] parts = record.split(","); context.write(new Text(parts[0]), new Text("cust " + parts[1])); } } public static class TxnsMapper extends Mapper { public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String record = value.toString(); String[] parts = record.split(","); context.write(new Text(parts[2]), new Text("tnxn " + parts[3])); } } public static class ReduceJoinReducer extends Reducer { public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { String name = ""; double total = 0.0; int count = 0; for (Text t : values) { String parts[] = t.toString().split(" "); if (parts[0].equals("tnxn")) { count++; total += Float.parseFloat(parts[1]); } else if (parts[0].equals("cust")) { name = parts[1]; } } String str = String.format("%d %f", count, total); context.write(new Text(name), new Text(str)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "Reduce-side join"); job.setJarByClass(ReduceJoin.class); job.setReducerClass(ReduceJoinReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); MultipleInputs.addInputPath(job, new Path(args[0]),TextInputFormat.class, CustsMapper.class); MultipleInputs.addInputPath(job, new Path(args[1]),TextInputFormat.class, TxnsMapper.class); Path outputPath = new Path(args[2]); FileOutputFormat.setOutputPath(job, outputPath); outputPath.getFileSystem(conf).delete(outputPath); System.exit(job.waitForCompletion(true) ? 0 : 1); } }**运行这个程序**最后,在reduce side join上运行上述MapReduce示例程序的命令 如下:hadoop jar reducejoin.jar ReduceJoin /sample/input/cust_details /sample/input/transaction_details /sample/output# 结论:在排序和reducer 阶段,reduce 侧连接过程会产生巨大的网络I/O 流量,在这个阶段,相同键的值被聚集在一起。因此,如果您有大量具有数百万个值的不同数据集,您很可能会遇到 OutOfMemory 异常,即您的 RAM 已满,因此溢出。在我看来,使用reduce side join的优点是:- 这很容易实现,因为我们利用 MapReduce 框架中的内置排序和改组算法,该算法组合相同键的值并将其发送到同一个减速器。- 在reduce side join 中,您的输入不需要遵循任何严格的格式,因此您也可以对非结构化数据执行连接操作。一般来说,人们更喜欢 Apache Hive,它是 Hadoop 生态系统的一部分,来执行连接操作。因此,如果您来自 SQL 背景,则无需担心编写 MapReduce Java 代码来执行连接操作。您可以使用 Hive 作为替代方案。
总条数:127 到第
上滑加载中