• [问题求助] DLI解耦计算和存储负载为什么能提高性能?
    DLI解耦计算和存储负载为什么能提高性能?总所周知,网络传输数据延迟很大,远不如数据在单机上要来的快。那为啥分离存储和技术在不同主机,反而能提高性能呢
  • [问题求助] opcDA协议数据是否可以接入数据湖中?
    opcDA协议数据是否可以接入数据湖中?
  • [问题求助] 【香港启德项目】资产轨迹Flink脚本中的时间加8小时逻辑描述
    1.资产轨迹Flink脚本中的时间加8小时逻辑描述2.为什么要加8小时,以致于轨迹时间不是当前时间,而是加8小时后的时间
  • [问题求助] 如何将一个dws sql节点中sql块的查询结果集写入另一个dws sql节点,两个节点的数据源链接不一样
    需要将dws数据源A中的多张表join以后得结果集,写入另一个dws数据源中,有什么好办法?不想在源dws数据源中创建很多表,想将多表join后查询结果直接写入另一个dws数据源中
  • [技术交流] 数据湖DLI对集成后含有加密的数据,有密钥的情况下,如何解密
    对于集成后在贴原层的数据,含有加密的数据,在有密钥的情况下如何进行解密,平台支持吗?求助大佬~
  • [问题求助] MRS 3.1.2-LTS.3有Kerberos认证,flink在停止任务时候报zk keeperErrorCode = Session closed because client failed to authenticate for
    停止任务命令:flink stop d173e065c36b4385f2d386fb6e480274 -p obs://ddos/AIAE/savepoint/tianmen/ -yid application_1676631574356_0854报错截图 keeperErrorCode = Session closed because client failed to authenticate for /flink_base/flink zk截图
  • [问题求助] FLINK SQL 怎么指定嵌套json的主键
    kafka消息正文-- key:`yd_test`.`test`, -- value:{ -- "mysqlType":{"name":"char","id":"int","age":"int"}, -- "id":606, -- "es":1662693580000, -- "ts":1662693580897, -- "database":"yd_test", -- "table":"test", -- "type":"INSERT", -- "isDdl":false, -- "sql":"", -- "sqlType":{"name":1,"id":4,"age":4}, -- "data":[{"name":"yd","id":"4","age":"95"}], -- "old":null, -- "pkNames":null -- }需要指定data的id字段为upsert-kafka源表的主键,怎么指定呢?经测试,我这样指定不对!--kafka源表 CREATE table test2(   data ARRAY>,   PRIMARY KEY (id) )    with (     "connector" = "upsert-kafka",      "properties.bootstrap.servers" = "",--kafka的内网连接地址和端口     "properties.group.id" = "",     "topic" = "",--Kafka的Topic     --'key.format' = 'json',     --'value.format' = 'json',     "scan.startup.mode" = "latest-offset" );
  • [问题求助] 数据湖是否可以使用远程工具等连接
    IDEA 上的database 连接数据湖
  • [指导教程] 资源汇总:DLI等大数据核心服务-官网&开通&学习材料 -V1.0
    云服务公共资源服务开通服务官网云服务社区入门材料赋能&产品文档等DGC大数据领域公共资源:1、大数据福利专场 0元试用 - 数据域主力产品0元试用https://activity.huaweicloud.com/Date-free.html2、微信公众号:智能数据湖微信号:ei-datalake    1、免费注册-[教程]DGC免费实例购买流程2.0https://bbs.huaweicloud.com/forum/thread-193738-1-1.html华为云-数据湖治理中心DGC-服务官网https://www.huaweicloud.com/product/dayu.html云社区 -EI企业智能数据湖治理中心DGChttps://bbs.huaweicloud.com/forum/forum-890-1.html1、快速入门:提供3个入门示例场景https://support.huaweicloud.com/qs-dgc/dgc_04_0021.html2、数据湖治理中心 DGC> 视频:入门准备https://support.huaweicloud.com/dgc_video/index.html1、DGC官方使用帮助文档:DGC的每个功能提供详细指导https://support.huaweicloud.com/dgc/index.html2、DGC 赋能视频:数据湖治理中心(DGC)伙伴赋能课程https://education.huaweicloud.com/courses/course-v1:HuaweiX+CBUCNXE133+Self-paced/about3、华为伙伴暨开发者大会2022数据治理生产线,加速构建企业数据资产视频回看:https://live.huawei.com/HPDC/meeting/cn/10741.htmlMRS1、云原生数据湖MRS集群开通https://support.huaweicloud.com/qs-mrs/mrs_09_0010.html华为云-云原生数据湖MRS-服务官网https://www.huaweicloud.com/product/mrs.html云社区 -云原生数据湖MRShttps://bbs.huaweicloud.com/forum/forum-612-1.html云原生数据湖MRS> 视频:入门介绍、操作&二次开发指导https://support.huaweicloud.com/mrs_video/index.html1、云原生数据湖MRS帮助文档:MRS的每个功能提供详细指导https://support.huaweicloud.com/mrs/index.html2、云原生数据湖MRS最佳实践https://support.huaweicloud.com/bestpractice-mrs/mrs_05_0023.htmlDLI免费注册-[教程]DLI免费实例购买流程2.0https://bbs.huaweicloud.com/forumreview/thread-193899-1-1.html华为云-数据湖探索 DLI-服务官网https://www.huaweicloud.com/product/dli.html云社区 -数据湖探索 DLIhttps://bbs.huaweicloud.com/forum/forum-599-1.html1、快速入门:使用DLI SQL分析OBS数据https://support.huaweicloud.com/bestpractice-dli/dli_05_0044.html2、数据湖探索 DLI> 视频:入门准备https://support.huaweicloud.com/dli_video/index.html1、DLI官方使用帮助文档:DLI的每个功能提供详细指导https://support.huaweicloud.com/wtsnew-dli/index.html2、DLI 赋能视频:数据湖探索(DLI)伙伴赋能课程https://education.huaweicloud.com/courses/course-v1:HuaweiX+CBUCNXE100+Self-paced/about
  • [技术干货] 数据湖探索(DLI)免费实例购买流程2.0
    ​1、 账号注册a) 在免费试用页面 点击跳转购买页,找到数据湖探索DLI​ ​​​b) 点击立即购买c) 输入手机号,验证码及密码,点击注册​d) 在新的窗口中勾选阅读并同意,点击开通​e) 注册成功​2、 实名认证a) 微信扫描上图中的二维码完成实名认证​​b) 认证成功截图如下: ​3、 购买DLI免费实例a) 认证成功后,点击立即购买,进入新页面,按照下图进行购买0元购买“扫描数据量套餐包”​ ​​​​​b) 点击立即购买按钮,跳转下一页,点击去支付c) 在新页面的折扣中选择“数据湖探索服务DLI  0折”,并确认付款d) 支付成功,步骤结束。 PS:点击下方的返回数据湖探索控制台,根据 快速入门 指引进行初次的DLI探索,使用DLI默认的 default 队列进行海量数据分析计算,真正按照执行的单条SQL扫描数据量计费(可从本次购买的扫描量套餐包中抵扣),不使用不花钱。Welcome to DLI !​
  • [解决方案] 【解决方案】华为云DLI实时数据湖解决方案介绍
    本文主要分析并介绍传统实时数据湖解决方案、华为云DLI实时数据湖解决方案和相关的客户案例。云服务介绍:DLI Flink:是完全兼容Flink开源生态,提供基于Flink 1.12开源深度优化的流处理服务DWS:兼容PostgreSQL/Oracle生态的分析型数据库服务RDS:提供基于MySQL/PostgreSQL/SQL Server的在线关系型云数据库服务CSS: 提供托管的分布式搜索能力,兼容Elasticsearch、Kibana、Cerebro等软件 1.   传统实时数据湖解决方案采用 Debezium 订阅 MySQL 的 Binlog  传输到 Kafka,后端是由计算程序从 Kafka 里消费,最后将数据写入到其他存储。Kafka 消息队列做消峰、解耦,经Flink进行流式ETL后,再将数据写入到目的端。目的端可以是各种DB,数据湖,实时数仓和离线数仓。 但是该方案也有明显缺陷:部署难度大:方案中数据采集链路长,额外增加了组件部署、运维成本,技术门槛较高,无法做到可视化操作。数据重复:云原生Debezium可能存在消息重复。数据一致性:由于Kafka中接收的消息存在重复,导致Flink sink端在保证数据的一致性上存在难度。 2.   华为云现有实时数据湖解决方案与业界传统的实时数据湖解决方案相比,华为云现有实时数据湖解决方案采用 DRS 数据复制服务订阅 MySQL 的 Binlog  传输到 分布式消息服务DMS,再由DLI Flink从 DMS 里消费,最后将数据写入到其他存储。同时,支持的源端较丰富,DLI Flink除了支持开源存储引擎外,同时对于华为云云服务等进行了深度优化,能够更好的支撑客户实时数据分析和同步。 方案优势:操作可视化:全程可视化操作,上手门槛低支持源端丰富: DRS支持对应的数据库种类繁多,包括Mysql、PostgreSQL、Oracle、GaussDB、MongoDB等。低时延:在金融、科技等一系列对数据处理实时性方面有非常高要求的行业,至关重要数据一致性:DLI Flink结合DMS确保数据最终一致性。 3.   Flink CDC介绍在 DLI Flink 1.12 版本中引入了 CDC 机制,CDC 的全称是 Change Data Capture,用于捕捉数据库表的增删改查操作,是目前成熟的同步数据库变更方案。Flink CDC Connectors 是 Apache Flink 的一组源连接器,是可以从 MySQL、PostgreSQL 数据直接读取全量数据和增量数据的 Source Connectors。 4.   华为云DLI实时数据湖解决方案方案优势:流程简便:省去了传统CDC,采集数据库binlog到Kafka的过程,将采集、计算都在DLI flink中完成。全程可视化操作,上手门槛低,集成进入DGC作业调度,与Spark离线、openLooKeen交互式作业数据复用、元数据统一降低成本、减少数据冗余:与传统华为云实时大数据方案相比,数据库与DLI flink链路中间,可以省去DRS数据复制服务于DMS kafka分布式消息服务的成本,并减少了数据冗余。缩短时延:在金融、科技等一系列对数据处理实时性方面有非常高要求的行业,至关重要 5.   实时日志分析解决方案介绍一句话应用场景:在复杂业务场景下,存在海量繁杂的日志信息,需要拥有能够实时更新的强大全文信息检测能力。 CSS云搜索服务,是一个基于Elasticsearch且完全托管的在线分布式搜索服务。能为用户提供结构化、非结构化文本的多条件检索、统计、BI报表功能。5.1 实时日志分析解决方案方案优势:无需Flink本身提供CDC变更数据捕获功能。历史、变更数据统一输出到DMS/Kafka中,DLI Flink再订阅DMS/Kafka进行实时消费。此外,DRS支持对应的数据库种类繁多,包括Mysql、PostgreSQL、Oracle、GaussDB、MongoDB等。 5.2 进阶CDC实时日志分析解决方案方案优势:Flink 支持读取CDC源表,会先读取数据库的历史全量数据,并平滑切换到Binlog读取上,保证数据的完整读取。链路简洁、时延降低、节省总体成本。 6.   实时数仓解决方案介绍一句话应用场景:该方案旨在为客户分析师团队提供可实时追加更新数据的秒级增强型简单ETL+实时BI分析的场景化解决方案。 DWS数据仓库服务是一种基于华为云基础架构和平台的在线数据处理数据库,能够为各行业提供有竞争力的PB级海量大数据分析能力。6.1 实时数仓解决方案方案优势:无需Flink本身提供CDC变更数据捕获功能。历史、变更数据统一输出到DMS/Kafka中,DLI Flink再订阅DMS/Kafka进行实时消费。此外,DRS支持对应的数据库种类繁多,包括Mysql、PostgreSQL、Oracle、GaussDB、MongoDB等。 6.2 进阶CDC实时数仓解决方案方案优势:Flink 支持读取CDC源表,会先读取数据库的历史全量数据,并平滑切换到Binlog读取上,保证数据的完整读取。链路简洁、时延降低、节省总体成本。 7.   当前客户实时解决方案介绍目前点触科技、爱库存、能链、富米等多家NA客户均采用该实时解决方案方案价值:免运维:Serverless云服务,无需客户运维,聚焦业务开发,降低人力投入成本,优化基础IT设施易开发:全链路可视化开发,DLI Flink提供简单易用的Flink SQL开发和调试能力,轻松掌握实时数据湖统一引擎:DLI Flink统一提供引擎运维、升级能力,全兼容开源Flink能力 客户诉求:运维难度高:自建Kafka、Flink等自运维难度高,需要投入额外人力进行组件升级维护开发效率低:开源Flink不具备可视化开发界面,作业开发依赖命令行+jar,管理复杂实时引擎混乱:自建实时引擎版本较多,存在历史遗留问题,多版本管理和升级都比较复杂维表管理混乱:维表存放位置较多,没有很好的统一,导致数据源连接多,维护复杂 
  • [最佳实践] 使用IntelliJ IDEA Java创建和使用DLI表
    操作场景DLI支持用户编写代码创建Spark作业来创建数据库、创建DLI表或OBS表和插入表数据等操作。本示例完整的演示通过编写java代码、使用Spark作业创建数据库、创建表和插入表数据的详细操作,帮助您在DLI上进行作业开发。约束限制不支持的场景:在SQL作业中创建了数据库(database),编写程序代码指定在该数据库下创建表。例如在DLI的SQL编辑器中的某SQL队列下,创建了数据库testdb。后续通过编写程序代码在testdb下创建表testTable,编译打包后提交的Spark Jar作业则会运行失败。不支持创建加密的DLI表,即不支持创建DLI表时设置encryption=true。例如,如下建表语句不支持:CREATE TABLE tb1(id int) using parquet options(encryption=true)支持的场景在SQL作业中创建数据库(database),表(table) , 通过SQL或Spark程序作业读取插入数据。在Spark程序作业中创建数据库(database),表(table), 通过SQL或Spark程序作业读取插入数据。环境准备在进行Spark 作业访问DLI元数据开发前,请准备以下开发环境。表1 Spark Jar作业开发环境准备项说明操作系统Windows系统,支持Windows7以上版本。安装JDKJDK使用1.8版本。安装和配置IntelliJ IDEAIntelliJ IDEA为进行应用开发的工具,版本要求使用2019.1或其他兼容版本。安装Maven开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。开发流程DLI进行Spark作业访问DLI元数据开发流程参考如下:图1 Spark作业访问DLI元数据开发流程表2 开发流程说明序号阶段操作界面说明1创建DLI通用队列DLI控制台创建作业运行的DLI队列。2OBS桶文件配置OBS控制台如果是创建OBS表,则需要上传文件数据到OBS桶下。配置Spark创建表的元数据信息“spark.sql.warehouse.dir”的存储路径。3新建Maven工程,配置pom文件IntelliJ IDEA参考样例代码说明,编写程序代码创建DLI表或OBS表。4编写程序代码5调试,编译代码并导出Jar包6上传Jar包到OBS和DLIOBS控制台将生成的Spark Jar包文件上传到OBS目录下和DLI程序包中。7创建Spark Jar作业DLI控制台在DLI控制台创建Spark Jar作业并提交运行作业。8查看作业运行结果DLI控制台查看作业运行状态和作业运行日志。步骤1:创建DLI通用队列第一次提交Spark作业,需要先创建队列,例如创建名为“sparktest”的队列,队列类型选择为“通用队列”。在DLI管理控制台的左侧导航栏中,选择“队列管理”。单击“队列管理”页面右上角“购买队列”进行创建队列。创建名为“sparktest”的队列,队列类型选择为“通用队列”。创建队列详细介绍请参考创建队列。单击“立即购买”,确认配置。配置确认无误,单击“提交”完成队列创建。步骤2:OBS桶文件配置如果需要创建OBS表,则需要先上传数据到OBS桶目录下。本次演示的样例代码创建了OBS表,测试数据内容参考如下示例,创建名为的testdata.csv文件。12,Michael 27,Andy 30,Justin进入OBS管理控制台,在“桶列表”下,单击已创建的OBS桶名称,本示例桶名为“dli-test-obs01”,进入“概览”页面。单击左侧列表中的“对象”,选择“上传对象”,将testdata.csv文件上传到OBS桶根目录下。在OBS桶根目录下,单击“新建文件夹”,创建名为“warehousepath”的文件夹。该文件夹路径用来存储Spark创建表的元数据信息“spark.sql.warehouse.dir”。步骤3:新建Maven工程,配置pom依赖以下通过IntelliJ IDEA 2020.2工具操作演示。打开IntelliJ IDEA,选择“File > New > Project”。图2 新建Project选择Maven,Project SDK选择1.8,单击“Next”。定义样例工程名和配置样例工程存储路径,单击“Finish”完成工程创建。如上图所示,本示例创建Maven工程名为:SparkJarMetadata,Maven工程路径为:“D:\DLITest\SparkJarMetadata”。在pom.xml文件中添加如下配置。<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.2</version> </dependency> </dependencies>图3 修改pom.xml文件在工程路径的“src > main > java”文件夹上鼠标右键,选择“New > Package”,新建Package和类文件。Package根据需要定义,本示例定义为:“com.huawei.dli.demo”,完成后回车。在包路径下新建Java Class文件,本示例定义为:DliCatalogTest。步骤4:编写代码编写DliCatalogTest程序创建数据库、DLI表和OBS表。完整的样例请参考Java样例代码,样例代码分段说明如下:导入依赖的包。import org.apache.spark.sql.SparkSession;创建SparkSession会话。创建SparkSession会话时需要指定Spark参数:"spark.sql.session.state.builder"、"spark.sql.catalog.class"和"spark.sql.extensions",按照样例配置即可。SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.CarbonInternalExtensions"+","+"org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate();创建数据库。如下样例代码演示,创建名为test_sparkapp的数据库。spark.sql("create database if not exists test_sparkapp").collect();创建DLI表并插入测试数据。spark.sql("drop table if exists test_sparkapp.dli_testtable").collect(); spark.sql("create table test_sparkapp.dli_testtable(id INT, name STRING)").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (123,'jason')").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (456,'merry')").collect();创建OBS表。如下示例中的OBS路径需要根据步骤2:OBS桶文件配置中的实际数据路径修改。spark.sql("drop table if exists test_sparkapp.dli_testobstable").collect(); spark.sql("create table test_sparkapp.dli_testobstable(age INT, name STRING) using csv options (path 'obs://dli-test-obs01/testdata.csv')").collect();关闭SparkSession会话spark。spark.stop();步骤5:调试、编译代码并导出Jar包单击IntelliJ IDEA工具右侧的“Maven”,参考下图分别单击“clean”、“compile”对代码进行编译。编译成功后,单击“package”对代码进行打包。打包成功后,生成的Jar包会放到target目录下,以备后用。本示例将会生成到:“D:\DLITest\SparkJarMetadata\target”下名为“SparkJarMetadata-1.0-SNAPSHOT.jar”。步骤6:上传Jar包到OBS和DLI下登录OBS控制台,将生成的“SparkJarMetadata-1.0-SNAPSHOT.jar”Jar包文件上传到OBS路径下。将Jar包文件上传到DLI的程序包管理中,方便后续统一管理。登录DLI管理控制台,单击“数据管理 > 程序包管理”。在“程序包管理”页面,单击右上角的“创建”创建程序包。在“创建程序包”对话框,配置以下参数。包类型:选择“JAR”。OBS路径:程序包所在的OBS路径。分组设置和组名称根据情况选择设置,方便后续识别和管理程序包。图4 创建程序包单击“确定”,完成创建程序包。步骤7:创建Spark Jar作业登录DLI控制台,单击“作业管理 > Spark作业”。在“Spark作业”管理界面,单击“创建作业”。在作业创建界面,配置对应作业运行参数。具体说明如下:表3 Spark Jar作业参数填写参数名参数值所属队列选择已创建的DLI通用队列。例如当前选择步骤1:创建DLI通用队列创建的通用队列“sparktest”。作业名称(--name)自定义Spark Jar作业运行的名称。当前定义为:SparkTestMeta。应用程序选择步骤6:上传Jar包到OBS和DLI下中上传到DLI程序包。例如当前选择为:“SparkJarObs-1.0-SNAPSHOT.jar”。主类格式为:程序包名+类名。例如当前为:com.huawei.dli.demo.DliCatalogTest。Spark参数(--conf)spark.dli.metaAccess.enable=truespark.sql.warehouse.dir=obs://dli-test-obs01/warehousepath说明:spark.sql.warehouse.dir参数的OBS路径为步骤2:OBS桶文件配置中配置创建。访问元数据选择:是其他参数保持默认值即可。图5 创建Spark Jar作业单击“执行”,提交该Spark Jar作业。在Spark作业管理界面显示已提交的作业运行状态。查看作业运行结果在Spark作业管理界面显示已提交的作业运行状态。初始状态显示为“启动中”。如果作业运行成功则作业状态显示为“已成功”,通过以下操作查看创建的数据库和表。可以在DLI控制台,左侧导航栏,单击“SQL编辑器”。在“数据库”中已显示创建的数据库“test_sparkapp”。图6 查看创建的数据库双击数据库名,可以在数据库下查看已创建成功的DLI和OBS表。图7 查看表双击DLI表名dli_testtable,单击“执行”查询DLI表数据。图8 查询DLI表数据注释掉DLI表查询语句,双击OBS表名dli_testobstable,单击“执行”查询OBS表数据。图9 查询OBS表数据如果作业运行失败则作业状态显示为“已失败”,单击“操作”列“更多”下的“Driver日志”,显示当前作业运行的日志,分析报错原因。图10 查看Driver日志原因定位解决后,可以在作业“操作”列,单击“编辑”,修改作业相关参数后,单击“执行”重新运行该作业即可。后续指引如果您想通过Spark Jar作业访问其他数据源,请参考《使用Spark作业跨源访问数据源》。创建DLI表的语法请参考创建DLI表,创建OBS表的语法请参考创建OBS表。如果是通过API接口调用提交该作业请参考以下操作说明:调用创建批处理作业接口,参考以下请求参数说明。详细的API参数说明请参考《数据湖探索API参考》>《创建批处理作业》。将请求参数中的“catalog_name”参数设置为“dli”。conf 中需要增加"spark.dli.metaAccess.enable":"true"。如果需要执行DDL,则还要在conf中配置"spark.sql.warehouse.dir": "obs://bucket/warehousepath"。完整的API请求参数可以参考如下示例说明。{ "queue":"citest", "file":"SparkJarMetadata-1.0-SNAPSHOT.jar", "className":"DliCatalogTest", "conf":{"spark.sql.warehouse.dir": "obs://bucket/warehousepath", "spark.dli.metaAccess.enable":"true"}, "sc_type":"A", "executorCores":1, "numExecutors":6, "executorMemory":"4G", "driverCores":2, "driverMemory":"7G", "catalog_name": "dli" }Java样例代码本示例操作步骤采用Java进行编码,具体完整的样例代码参考如下:package com.huawei.dli.demo; import org.apache.spark.sql.SparkSession; public class DliCatalogTest { public static void main(String[] args) { SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.CarbonInternalExtensions"+","+"org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate(); spark.sql("create database if not exists test_sparkapp").collect(); spark.sql("drop table if exists test_sparkapp.dli_testtable").collect(); spark.sql("create table test_sparkapp.dli_testtable(id INT, name STRING)").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (123,'jason')").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (456,'merry')").collect(); spark.sql("drop table if exists test_sparkapp.dli_testobstable").collect(); spark.sql("create table test_sparkapp.dli_testobstable(age INT, name STRING) using csv options (path 'obs://dli-test-obs01/testdata.csv')").collect(); spark.stop(); } }scala样例代码scala样例代码object DliCatalogTest { def main(args:Array[String]): Unit = { val sql = args(0) val runDdl = Try(args(1).toBoolean).getOrElse(true) System.out.println(s"sql is $sql runDdl is $runDdl") val sparkConf = new SparkConf(true) sparkConf .set("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .set("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") sparkConf.setAppName("dlicatalogtester") val spark = SparkSession.builder .config(sparkConf) .enableHiveSupport() .config("spark.sql.extensions", Seq("org.apache.spark.sql.CarbonInternalExtensions", "org.apache.spark.sql.DliSparkExtension").mkString(",")) .appName("SparkTest") .getOrCreate() System.out.println("catalog is " + spark.sessionState.catalog.toString) if (runDdl) { val df = spark.sql(sql).collect() } else { spark.sql(sql).show() } spark.close() } }Python样例代码Python样例代码#!/usr/bin/python # -*- coding: UTF-8 -*- from __future__ import print_function import sys from pyspark.sql import SparkSession if __name__ == "__main__": url = sys.argv[1] creatTbl = "CREATE TABLE test_sparkapp.dli_rds USING JDBC OPTIONS ('url'='jdbc:mysql://%s'," \ "'driver'='com.mysql.jdbc.Driver','dbtable'='test.test'," \ " 'passwdauth' = 'DatasourceRDSTest_pwd','encryption' = 'true')" % url spark = SparkSession \ .builder \ .enableHiveSupport() \ .config("spark.sql.session.state.builder","org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") \ .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") \ .config("spark.sql.extensions",','.join(["org.apache.spark.sql.CarbonInternalExtensions","org.apache.spark.sql.DliSparkExtension"])) \ .appName("python Spark test catalog") \ .getOrCreate() spark.sql("CREATE database if not exists test_sparkapp").collect() spark.sql("drop table if exists test_sparkapp.dli_rds").collect() spark.sql(creatTbl).collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("insert into table test_sparkapp.dli_rds select 12,'aaa'").collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("insert overwrite table test_sparkapp.dli_rds select 1111,'asasasa'").collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("drop table test_sparkapp.dli_rds").collect() spark.stop()
  • [最佳实践] 使用DLI分析OBS数据
    DLI支持将数据存储到OBS上,后续再通过创建OBS表即可对OBS上的数据进行分析和处理。本指导中的操作内容包括:创建OBS表、导入OBS表数据、插入和查询OBS表数据等内容来帮助您更好的在DLI上对OBS表数据进行处理。前提条件已创建OBS的桶。具体OBS操作可以参考《对象存储服务控制台指南》。本指导中的OBS桶名都为“dli-test-021”。已创建DLI的SQL队列。创建队列详细介绍请参考创建队列。注意:创建队列时,队列类型必须要选择为:SQL队列。前期准备创建DLI数据库登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列。在SQL编辑器中输入以下语句创建数据库“testdb”。详细的DLI创建数据库的语法可以参考创建DLI数据库。create database testdb;后续章节操作都需要在testdb数据库下进行操作。DataSource和Hive两种语法创建OBS表的区别两种语法创建OBS表主要差异点参见表1。表1 DataSource语法和Hive语法创建OBS表的差异点语法支持的数据类型范围创建分区表时分区字段差异支持的分区数DataSource语法支持orc,parquet,json,csv,carbon,avro类型创建分区表时,分区字段在表名和PARTITIONED BY后都需要指定。具体可以参考DataSource语法创建单分区OBS表。单表分区数最多允许7000个。Hive语法支持TEXTFILE, AVRO, ORC, SEQUENCEFILE, RCFILE, PARQUET, CARBON创建分区表时,指定的分区字段不能出现在表后,只能通过PARTITIONED BY指定分区字段名和类型。具体可以参考Hive语法创建OBS分区表。单表分区数最多允许100000个。创建OBS表的DataSource语法可以参考使用DataSource语法创建OBS表。创建OBS表的Hive语法可以参考使用Hive语法创建OBS表。使用DataSource语法创建OBS表以下通过创建CSV格式的OBS表举例,创建其他数据格式的OBS表方法类似,此处不一一列举。创建OBS非分区表指定OBS数据文件,创建csv格式的OBS表。按照以下文件内容创建“test.csv”文件,并将“test.csv”文件上传到OBS桶“dli-test-021”的根目录下。Jordon,88,23 Kim,87,25 Henry,76,26登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”,执行以下命令创建OBS表。CREATE TABLE testcsvdatasource (name string, id int) USING csv OPTIONS (path "obs://dli-test-021/test.csv");注意:如果是通过指定的数据文件创建的OBS表,后续不支持在DLI通过insert表操作插入数据。OBS文件内容和表数据保持同步。查询已创建的“testcsvdatasource”表数据。select * from testcsvdatasource;查询结果显示如下本地修改原始的OBS表文件“test.csv”,增加一行“Aarn,98,20”数据,重新替换OBS桶目录下的“test.csv”文件。Jordon,88,23 Kim,87,25 Henry,76,26 Aarn,98,20在DLI的SQL编辑器中再次查询“testcsvdatasource”表数据,DLI上可以查询到新增的“Aarn,98,20”数据。select * from testcsvdatasource;指定OBS数据文件目录,创建csv格式的OBS表。指定的OBS数据目录不包含数据文件。在OBS桶“dli-test-021”根目录下创建数据文件目录“data”。登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”。在DLI的“testdb”数据库下创建OBS表“testcsvdata2source”。CREATE TABLE testcsvdata2source (name STRING, score DOUBLE, classNo INT) USING csv OPTIONS (path "obs://dli-test-021/data");通过insert语句插入表数据。insert into testcsvdata2source VALUES('Aarn','98','20');insert作业运行成功后,查询OBS表“testcsvdata2source”数据。select * from testcsvdata2source;在OBS桶的“obs://dli-test-021/data”目录下刷新后查询,生成了csv数据文件,文件内容为insert插入的数据内容。指定的OBS数据目录包含数据文件。在OBS桶“dli-test-021”根目录下创建数据文件目录“data2”。创建如下内容的测试数据文件“test.csv”,并上传文件到“obs://dli-test-021/data2”目录下。Jordon,88,23 Kim,87,25 Henry,76,26登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”。在DLI的“testdb”数据库下创建OBS表“testcsvdata3source”。CREATE TABLE testcsvdata3source (name STRING, score DOUBLE, classNo INT) USING csv OPTIONS (path "obs://dli-test-021/data2");通过insert语句插入表数据。insert into testcsvdata3source VALUES('Aarn','98','20');insert作业运行成功后,查询OBS表“testcsvdata3source”数据。select * from testcsvdata3source;在OBS桶的“obs://dli-test-021/data2”目录下刷新后查询,生成了一个csv数据文件,内容为insert插入的表数据内容。创建OBS分区表创建单分区OBS表在OBS桶“dli-test-021”根目录下创建数据文件目录“data3”。登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”。在DLI的“testdb”数据库下创建以“classNo”列为分区的OBS分区表“testcsvdata4source”,指定OBS目录“obs://dli-test-021/data3”。CREATE TABLE testcsvdata4source (name STRING, score DOUBLE, classNo INT) USING csv OPTIONS (path "obs://dli-test-021/data3") PARTITIONED BY (classNo);在OBS桶的“obs://dli-test-021/data3”目录下创建“classNo=25”的分区目录。根据以下文件内容创建数据文件“test.csv”,并上传到OBS的“obs://dli-test-021/data3/classNo=25”目录下。Jordon,88,25 Kim,87,25 Henry,76,25在SQL编辑器中执行以下命令,导入分区数据到OBS表“testcsvdata4source ”。ALTER TABLE testcsvdata4source ADD PARTITION (classNo = 25) LOCATION 'obs://dli-test-021/data3/classNo=25';查询OBS表“testcsvdata4source ”classNo分区为“25”的数据:select * from testcsvdata4source where classNo = 25;插入如下数据到OBS表“testcsvdata4source ”:insert into testcsvdata4source VALUES('Aarn','98','25'); insert into testcsvdata4source VALUES('Adam','68','24');查询OBS表“testcsvdata4source ”classNo分区为“25”和“24”的数据。注意:分区表在进行查询时where条件中必须携带分区字段,否则会查询失败,报:DLI.0005: There should be at least one partition pruning predicate on partitioned table。select * from testcsvdata4source where classNo = 25;select * from testcsvdata4source where classNo = 24;在OBS桶的“obs://dli-test-021/data3”目录下点击刷新,该目录下生成了对应的分区文件,分别存放新插入的表数据。图1 OBS上classNo分区为“25”文件数据图2 OBS上classNo分区为“24”文件数据创建多分区OBS表在OBS桶“dli-test-021”根目录下创建数据文件目录“data4”。登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”。在“testdb”数据库下创建以“classNo”和“dt”列为分区的OBS分区表“testcsvdata5source”,指定OBS目录“obs://dli-test-021/data4”。CREATE TABLE testcsvdata5source (name STRING, score DOUBLE, classNo INT, dt varchar(16)) USING csv OPTIONS (path "obs://dli-test-021/data4") PARTITIONED BY (classNo,dt);给 testcsvdata5source表插入如下测试数据:insert into testcsvdata5source VALUES('Aarn','98','25','2021-07-27'); insert into testcsvdata5source VALUES('Adam','68','25','2021-07-28');根据classNo分区列查询testcsvdata5source数据。select * from testcsvdata5source where classNo = 25;根据dt分区列查询testcsvdata5source数据。select * from testcsvdata5source where dt like '2021-07%';在OBS桶“obs://dli-test-021/data4”目录下刷新后查询,会生成如下数据文件:文件目录1:obs://dli-test-021/data4/xxxxxx/classNo=25/dt=2021-07-27文件目录2:obs://dli-test-021/data4/xxxxxx/classNo=25/dt=2021-07-28在OBS桶的“obs://dli-test-021/data4”目录下创建“classNo=24”的分区目录,再在“classNo=24”目录下创建子分区目录“dt=2021-07-29”。根据以下文件内容创建数据文件“test.csv”,并上传到OBS的“obs://dli-test-021/data4/classNo=24/dt=2021-07-29”目录下。Jordon,88,24,2021-07-29 Kim,87,24,2021-07-29 Henry,76,24,2021-07-29在SQL编辑器中执行以下命令,导入分区数据到OBS表“testcsvdata5source ”。ALTER TABLE testcsvdata5source ADD PARTITION (classNo = 24,dt='2021-07-29') LOCATION 'obs://dli-test-021/data4/classNo=24/dt=2021-07-29';根据classNo分区列查询testcsvdata5source数据。select * from testcsvdata5source where classNo = 24;根据dt分区列查询所有“2021-07”月的所有数据。select * from testcsvdata5source where dt like '2021-07%';使用Hive语法创建OBS表以下通过创建TEXTFILE格式的OBS表举例,创建其他数据格式的OBS表方法类似,此处不一一列举。创建OBS非分区表在OBS桶的“dli-test-021”根目录下创建数据文件目录“data5”。根据以下文件内容创建数据文件“test.txt”并上传到OBS的“obs://dli-test-021/data5”目录下。Jordon,88,23 Kim,87,25 Henry,76,26登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”。使用Hive语法创建OBS表,指定OBS文件路径为“obs://dli-test-021/data5/test.txt”,行数据分割符为','。CREATE TABLE hiveobstable (name STRING, score DOUBLE, classNo INT) STORED AS TEXTFILE LOCATION 'obs://dli-test-021/data5' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';说明:ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' :表示每行记录通过',' 进行分隔。查询hiveobstable表数据。select * from hiveobstable;插入表数据:insert into hiveobstable VALUES('Aarn','98','25'); insert into hiveobstable VALUES('Adam','68','25');查询表数据:select * from hiveobstable;在OBS桶“obs://dli-test-021/data5”目录下刷新后查询,生成了两个数据文件,分别对应新插入的数据。创建表字段为复杂数据格式的OBS表在OBS桶的“dli-test-021”根目录下创建数据文件目录“data6”。根据以下文件内容创建数据文件“test.txt”并上传到OBS的“obs://dli-test-021/data6”目录下。Jordon,88-22,23:21 Kim,87-22,25:22 Henry,76-22,26:23登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”。使用Hive语法创建OBS表,指定OBS文件路径为“obs://dli-test-021/data6”。CREATE TABLE hiveobstable2 (name STRING, hobbies ARRAY<string>, address map<string,string>) STORED AS TEXTFILE LOCATION 'obs://dli-test-021/data6' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' COLLECTION ITEMS TERMINATED BY '-' MAP KEYS TERMINATED BY ':';说明:ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' :表示每条记录通过',' 进行分隔。COLLECTION ITEMS TERMINATED BY '-':表示第二个字段hobbies是array形式,元素与元素之间通过'-'分隔。MAP KEYS TERMINATED BY ':':表示第三个字段address是k-v形式,每组k-v内部由':'分隔。查询hiveobstable2表数据。select * from hiveobstable2;创建OBS分区表在OBS桶的“dli-test-021”根目录下创建数据文件目录“data7”。登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”。创建以classNo为分区列的OBS分区表,指定OBS路径“obs://dli-test-021/data7”。CREATE TABLE IF NOT EXISTS hiveobstable3(name STRING, score DOUBLE) PARTITIONED BY (classNo INT) STORED AS TEXTFILE LOCATION 'obs://dli-test-021/data7' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';注意:创建Hive语法的OBS分区表时,分区字段只能通过PARTITIONED BY指定,该分区字段不能出现在表名后的字段列表中。如下就是错误的示例:CREATE TABLE IF NOT EXISTS hiveobstable3(name STRING, score DOUBLE, classNo INT) PARTITIONED BY (classNo) STORED AS TEXTFILE LOCATION 'obs://dli-test-021/data7';插入表数据:insert into hiveobstable3 VALUES('Aarn','98','25'); insert into hiveobstable3 VALUES('Adam','68','25');查询表数据:select * from hiveobstable3 where classNo = 25;在OBS桶的“obs://dli-test-021/data7”目录下刷新后查询,新生成了分区目录“classno=25”,该分区目录下文件内容为新插入的表数据。在OBS桶的“obs://dli-test-021/data7”目录下,创建分区目录“classno=24”。根据以下文件内容创建文件“test.txt”,并上传该文件到“obs://dli-test-021/data7/classno=24”目录下。Jordon,88,24 Kim,87,24 Henry,76,24在SQL编辑器中执行以下命令,手工导入分区数据到OBS表“hiveobstable3”。ALTER TABLE hiveobstable3 ADD PARTITION (classNo = 24) LOCATION 'obs://dli-test-021/data7/classNo=24';查询表“hiveobstable3”数据。select * from hiveobstable3 where classNo = 24;常见问题问题一:查询OBS分区表报错,报错信息如下:DLI.0005: There should be at least one partition pruning predicate on partitioned table `xxxx`.`xxxx`.;问题根因:查询OBS分区表时没有携带分区字段。解决方案:查询OBS分区表时,where条件中至少包含一个分区字段。问题二:使用DataSource语法指定OBS文件路径创建OBS表,insert数据到OBS表,显示作业运行失败,报:“DLI.0007: The output path is a file, don't support INSERT...SELECT” 错误。问题示例语句参考如下:CREATE TABLE testcsvdatasource (name string, id int) USING csv OPTIONS (path "obs://dli-test-021/data/test.csv");问题根因:创建OBS表指定的OBS路径为具体文件,导致不能插入数据。例如上述示例中的OBS路径为:"obs://dli-test-021/data/test.csv"。解决方案:使用DataSource语法创建OBS表指定的OBS文件路径改为文件目录即可,后续即可通过insert插入数据。上述示例,建表语句可以修改为:CREATE TABLE testcsvdatasource (name string, id int) USING csv OPTIONS (path "obs://dli-test-021/data");问题三:使用Hive语法创建OBS分区表时,提示语法格式不对。例如,如下使用Hive语法创建以classNo为分区的OBS表:CREATE TABLE IF NOT EXISTS testtable(name STRING, score DOUBLE, classNo INT) PARTITIONED BY (classNo) STORED AS TEXTFILE LOCATION 'obs://dli-test-021/data7';问题根因:使用Hive语法创建OBS分区表时,分区字段不能出现在表名后的字段列表中,只能定义在PARTITIONED BY后。解决方案:使用Hive语法创建OBS分区表时,分区字段指定在PARTITIONED BY后。例如:CREATE TABLE IF NOT EXISTS testtable(name STRING, score DOUBLE) PARTITIONED BY (classNo INT) STORED AS TEXTFILE LOCATION 'obs://dli-test-021/data7';
  • [最佳实践] DLI创建连接各外部数据源的增强型跨源连接
    背景信息DLI在创建运行作业需要连接外部其他数据源,如:DLI连接MRS、RDS、CSS、Kafka、DWS时,需要打通DLI和外部数据源之间的网络。DLI增强型跨源连接,底层采用对等连接的方式打通与目的数据源的vpc网络,通过点对点的方式实现数据互通。创建增强型跨源连接网络不通的问题,可以根据本指导的整体流程和步骤进行排查验证。前提条件已创建DLI队列。创建队列详见创建DLI队列操作指导。注意:队列的计费类型需要为:“包年/包月”或“按需计费”时勾选“专属资源模式”才能创建增强型跨源连接。已创建对应的外部数据源集群。具体对接的外部数据源根据业务自行选择。表1 创建各外部数据源参考服务名参考文档链接RDSRDS MySQL快速入门。DWS创建DWS集群。DMS Kafka创建Kafka实例。注意:创建DMS Kafka实例时,不能开启Kafka SASL_SSL。CSS创建CSS集群。MRS创建MRS集群。注意:绑定跨源的DLI队列网段和其他数据源子网网段不能重合。系统default队列不支持创建跨源连接。整体流程创建增强型跨源连接,整体分为以下操作步骤:图1 增强型跨源连接配置流程步骤1:获取外部数据源的内网IP、端口和安全组各数据源信息获取DMS Kafka在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。在“连接信息”中获取该Kafka的“内网连接地址”,在“基本信息”的“网络”中获取该实例的“虚拟私有云”和“子网”信息。Kafka的基本信息页面,“网络 > 安全组”参数下获取Kafka的安全组。RDS在RDS控制台“实例管理”页面,单击对应实例名称,查看“连接信息”,获取“内网地址”、“虚拟私有云”、“子网”、“数据库端口”和“安全组”信息。CSS在CSS管理控制台,选择“集群管理”,单击已创建的CSS集群名称,进入到CSS的基本信息页面。在“基本信息”中获取CSS的“内网访问地址”、“虚拟私有云”、“子网”和“安全组”信息,方便后续操作步骤使用。DWS在DWS管理控制台,选择“集群管理”,单击已创建的DWS集群名称,进入到DWS的基本信息页面。在“基本信息”的“数据库属性”中获取该实例的“内网IP”、“端口”,“基本信息”页面的“网络”中获取“虚拟私有云”、“子网”和“安全组”信息,方便后续操作步骤使用。MRS HBase以MRS 3.x版本集群为例。登录MRS管理控制台,单击“集群列表 > 现有集群”,单击对应的集群名称,进入到集群概览页面。在集群概览页面“基本信息”中获取“虚拟私有云”、“子网”和“安全组”。因为在创建连接MRS HBase的作业时,需要用到MRS集群的ZooKeeper实例和端口,则还需要获取MRS集群主机节点信息。参考访问MRS Manager登录MRS Manager,在MRS Manager上,选择“集群 > 待操作的集群名称 > 服务 > ZooKeeper > 实例”,根据“主机名称”和“业务IP”获取ZooKeeper的主机信息。在MRS Manager上,选择“集群 > 待操作的集群名称 > 服务 > ZooKeeper > 配置 > 全部配置”,搜索参数“clientPort”,获取“clientPort”的参数值即为ZooKeeper的端口。使用root用户ssh登录任意一个MRS主机节点。具体请参考登录MRS集群节点。执行以下命令获取MRS对应主机节点的hosts信息,复制保存。cat /etc/hosts例如,查询结果参考如下,将内容复制保存,以备后续步骤使用。步骤2:获取DLI队列网段在DLI管理控制台,单击“队列管理”,选择运行作业的队列,单击队列名称旁的按钮,获取队列的网段信息。步骤3:外部数据源的安全组添加放通DLI队列网段的规则进入到对应数据源的管理控制台,参考步骤1:获取外部数据源的内网IP、端口和安全组获取对应数据源的安全组。登录VPC控制台,单击“访问控制 > 安全组”,单击对应的安全组名称,在“入方向规则”中添加放通队列网段的规则。规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填或者填写为步骤1:获取外部数据源的内网IP、端口和安全组获取的数据源的端口,类型:IPV4,源地址为:步骤2:获取DLI队列网段获取的队列网段,单击“确定”完成安全组规则添加。步骤4:创建增强型跨源连接登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。连接名称:设置具体的增强型跨源名称。队列:选择DLI的队列。虚拟私有云:选择步骤1:获取外部数据源的内网IP、端口和安全组获取的外部数据源的虚拟私有云。子网:选择步骤1:获取外部数据源的内网IP、端口和安全组获取的外部数据源的子网。其他参数可以根据需要选择配置。参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。如果是连接MRS HBase,则还需要添加MRS的主机节点信息,具体步骤如下:在“跨源管理 > 增强型跨源”中,在已创建的增强型跨源连接的“操作”列,单击“更多 > 修改主机信息”。在“主机信息”参数中,将步骤1:获取外部数据源的内网IP、端口和安全组中获取到的MRS HBase主机节点信息拷贝追加进去。图2 修改主机信息单击“确定”完成主机信息添加。步骤5:测试网络连通性单击“队列管理”,选择操作的队列,在操作列,单击“更多 > 测试地址连通性”。在“测试连通性”界面,根据步骤1:获取外部数据源的内网IP、端口和安全组中获取的数据源的IP和端口,地址栏输入“数据源内网IP:数据源端口”,单击“测试”测试DLI到外部数据源网络是否可达。说明:MRS HBase在测试网络连通性的时候,使用:ZooKeeperIP地址:ZooKeeper端口,或者,ZooKeeper的主机信息:ZooKeeper端口。
  • [最佳实践] 迁移DWS数据至DLI
    本文为您介绍如何通过CDM数据同步功能,迁移数据仓库服务DWS数据至DLI。前提条件已创建DLI的SQL队列。创建DLI队列的操作可以参考创建DLI队列。注意:创建DLI队列时队列类型需要选择为“SQL队列”。已创建数据仓库服务DWS集群。具体创建DWS集群的操作可以参考创建DWS集群。本示例创建的DWS集群版本为:8.1.1.205。已创建CDM迁移集群。创建CDM集群的操作可以参考创建CDM集群。说明:如果目标数据源为云下的数据库,则需要通过公网或者专线打通网络。通过公网互通时,需确保CDM集群已绑定EIP、CDM云上安全组出方向放通云下数据源所在的主机、数据源所在的主机可以访问公网且防火墙规则已开放连接端口。数据源为云上的DWS、MRS等服务时,网络互通需满足如下条件:i. CDM集群与云上服务处于不同区域的情况下,需要通过公网或者专线打通网络。通过公网互通时,需确保CDM集群已绑定EIP,数据源所在的主机可以访问公网且防火墙规则已开放连接端口。ii. CDM集群与云上服务同区域情况下,同虚拟私有云、同子网、同安全组的不同实例默认网络互通;如果同虚拟私有云但是子网或安全组不同,还需配置路由规则及安全组规则。配置路由规则请参见如何配置路由规则章节,配置安全组规则请参见如何配置安全组规则章节。iii. 此外,您还必须确保该云服务的实例与CDM集群所属的企业项目必须相同,如果不同,需要修改工作空间的企业项目。本示例CDM集群的虚拟私有云、子网以及安全组和DWS集群保持一致。步骤一:数据准备DWS集群上创建数据库和表。参考使用gsql命令行客户端连接DWS集群连接已创建的DWS集群。执行以下命令连接DWS集群的默认数据库“gaussdb”:gsql -d gaussdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -rgaussdb:DWS集群默认数据库。DWS集群连接地址:请参见获取集群连接地址进行获取。如果通过公网地址连接,请指定为集群“公网访问地址”或“公网访问域名”,如果通过内网地址连接,请指定为集群“内网访问地址”或“内网访问域名”。如果通过弹性负载均衡连接,请指定为“弹性负载均衡地址”。dbadmin:创建集群时设置的默认管理员用户名。-W:默认管理员用户的密码。在命令行窗口输入以下命令创建数据库“testdwsdb”。CREATE DATABASE testdwsdb;执行以下命令,退出gaussdb数据库,连接新创建的数据库“testdwsdb”。\q gsql -d testdwsdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r执行以下命令创建表并插入数据。创建表:CREATE TABLE table1(id int, a char(6), b varchar(6),c varchar(6)) ;插入表数据:INSERT INTO table1 VALUES(1,'123','456','789'); INSERT INTO table1 VALUES(2,'abc','efg','hif');查询表数据确认数据插入成功。select * from table1;图1 查询表数据在DLI上创建数据库和表。登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列。在编辑器中输入以下语句创建数据库,例如当前创建迁移后的DLI数据库testdb。详细的DLI创建数据库的语法可以参考创建DLI数据库。create database testdb;在“SQL编辑器”中,数据库选择“testdb”,执行以下建表语句创建数据库下的表。详细的DLI建表语法可以参考创建DLI表。create table tabletest(id INT, name1 string, name2 string, name3 string);步骤二:数据迁移配置CDM数据源连接。创建源端DWS数据库的连接。登录CDM控制台,选择“集群管理”,选择已创建的CDM集群,在操作列选择“作业管理”。在作业管理界面,选择“连接管理”,单击“新建连接”,连接器类型选择“数据仓库服务(DWS)”,单击“下一步”。配置连接DWS的数据源连接参数,具体参数配置如下。表1 DWS数据源配置参数值名称自定义DWS数据源名称。例如当前配置为:source_dws。数据库服务器单击输入框旁边的“选择”按钮,选择当前已创建的DWS集群名称。端口DWS数据库的端口,默认为:8000。数据库名称当前需要迁移的DWS数据库名称。当前示例为DWS集群上创建数据库和表中创建的数据库“testdwsdb”。用户名待连接数据库的用户。该数据库用户需要有数据表的读写权限,以及对元数据的读取权限。本示例使用创建DWS数据库实例的默认管理员用户“dbadmin”。密码对应的DWS数据库用户的密码。图2 CDM配置DWS数据源其他更多参数保持默认即可,如果需要了解更多参数说明,可以参考配置关系数据库连接。单击“保存”完成DWS数据源连接配置。创建目的端DLI数据源的连接。登录CDM控制台,选择“集群管理”,选择已创建的CDM集群,在操作列选择“作业管理”。在作业管理界面,选择“连接管理”,单击“新建连接”,连接器类型选择“数据湖探索(DLI)”,单击“下一步”。图3 创建DLI数据源连接配置目的端DLI数据源连接。具体参数配置可以参考在CDM上配置DLI连接。图4 创建DLI数据源连接配置完成后,单击“保存”完成DLI数据源配置。创建CDM迁移作业。登录CDM控制台,选择“集群管理”,选择已创建的CDM集群,在操作列选择“作业管理”。在“作业管理”界面,选择“表/文件迁移”,单击“新建作业”。在新建作业界面,配置当前作业配置信息,具体参数参考如下:图5 CDM数据迁移作业配置作业名称:自定义数据迁移的作业名称。例如,当前定义为:test。源端作业配置,具体参考如下:表2 源端作业配置参数名参数值源连接名称选择1.a中已创建的数据源名称。使用SQL语句“使用SQL语句”选择“是”时,您可以在这里输入自定义的SQL语句,CDM将根据该语句导出数据。本示例当前选择为“否”。模式或表空间“使用SQL语句”选择“否”时,显示该参数,表示待抽取数据的模式或表空间名称。单击输入框后面的按钮可进入模式选择界面,用户也可以直接输入模式或表空间名称。本示例因为DWS集群上创建数据库和表中没有创建SCHEMA,则本参数为默认的“public”。如果选择界面没有待选择的模式或表空间,请确认对应连接里的帐号是否有元数据查询的权限。说明:说明:该参数支持配置通配符(*),实现导出以某一前缀开头或者以某一后缀结尾的所有数据库。例如:SCHEMA*表示导出所有以“SCHEMA”开头的数据库。*SCHEMA表示导出所有以“SCHEMA”结尾的数据库。*SCHEMA*表示数据库名称中只要有“SCHEMA”字符串,就全部导出。表名待迁移的DWS数据表名。当前为DWS集群上创建数据库和表中的“table1”表。更多详细参数配置请参考配置关系数据库源端参数。目的端作业参数配置,具体参考如下:表3 目的端作业配置参数名参数值目的连接名称选择已创建的DLI数据源连接。资源队列选择已创建的DLI SQL类型的队列。数据库名称选择DLI下已创建的数据库。当前示例为在DLI上创建数据库和表创建的数据库名,即为“testdb”。表名选择DLI下已创建的表名。当前示例为在DLI上创建数据库和表创建的表名,即为“tabletest”。导入前清空数据选择导入前是否清空目的表的数据。当前示例选择为“否”。如果设置为是,任务启动前会清除目标表中数据。详细的参数配置可以参考:CDM配置DLI目的端参数。单击“下一步”,进入到字段映射界面,CDM会自动匹配源和目的字段。如果字段映射顺序不匹配,可通过拖拽字段调整。如果选择在目的端自动创建类型,这里还需要配置每个类型的字段类型、字段名称。CDM支持迁移过程中转换字段内容,详细请参见字段转换。图6 字段映射单击“下一步”配置任务参数,一般情况下全部保持默认即可。该步骤用户可以配置如下可选功能:作业失败重试:如果作业执行失败,可选择是否自动重试,这里保持默认值“不重试”。作业分组:选择作业所属的分组,默认分组为“DEFAULT”。在CDM“作业管理”界面,支持作业分组显示、按组批量启动作业、按分组导出作业等操作。是否定时执行:如果需要配置作业定时自动执行,请参见配置定时任务。这里保持默认值“否”。抽取并发数:设置同时执行的抽取任务数。这里保持默认值“1”。是否写入脏数据:如果需要将作业执行过程中处理失败的数据、或者被清洗过滤掉的数据写入OBS中,以便后面查看,可通过该参数配置,写入脏数据前需要先配置好OBS连接。这里保持默认值“否”即可,不记录脏数据。单击“保存并运行”,回到作业管理界面,在作业管理界面可查看作业执行进度和结果。图7 迁移作业进度和结果查询步骤三:结果查询CDM迁移作业运行完成后,再登录到DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择在DLI上创建数据库和表中已创建的数据库,执行DLI表查询语句,查询DWS表数据是否已成功迁移到DLI的“tabletest”表中。select * from tabletest;图8 查询DLI表数据
总条数:66 到第
上滑加载中