-
客户目前的程序是基于flink1.13开发的,版本都是适配1.13的,如果迁移到DLI ,flink是1.15的,那么程序依赖connector是否能正常使用呢?
-
请问DLI Spark 作业支持读写华为云GeminiDB Cassandra 数据源吗?
-
为啥datatool的flink 写入表中会多出[""]
-
DLI解耦计算和存储负载为什么能提高性能?总所周知,网络传输数据延迟很大,远不如数据在单机上要来的快。那为啥分离存储和技术在不同主机,反而能提高性能呢
-
1.资产轨迹Flink脚本中的时间加8小时逻辑描述2.为什么要加8小时,以致于轨迹时间不是当前时间,而是加8小时后的时间
-
需要将dws数据源A中的多张表join以后得结果集,写入另一个dws数据源中,有什么好办法?不想在源dws数据源中创建很多表,想将多表join后查询结果直接写入另一个dws数据源中
-
对于集成后在贴原层的数据,含有加密的数据,在有密钥的情况下如何进行解密,平台支持吗?求助大佬~
-
[问题求助] MRS 3.1.2-LTS.3有Kerberos认证,flink在停止任务时候报zk keeperErrorCode = Session closed because client failed to authenticate for停止任务命令:flink stop d173e065c36b4385f2d386fb6e480274 -p obs://ddos/AIAE/savepoint/tianmen/ -yid application_1676631574356_0854报错截图 keeperErrorCode = Session closed because client failed to authenticate for /flink_base/flink zk截图
-
kafka消息正文-- key:`yd_test`.`test`, -- value:{ -- "mysqlType":{"name":"char","id":"int","age":"int"}, -- "id":606, -- "es":1662693580000, -- "ts":1662693580897, -- "database":"yd_test", -- "table":"test", -- "type":"INSERT", -- "isDdl":false, -- "sql":"", -- "sqlType":{"name":1,"id":4,"age":4}, -- "data":[{"name":"yd","id":"4","age":"95"}], -- "old":null, -- "pkNames":null -- }需要指定data的id字段为upsert-kafka源表的主键,怎么指定呢?经测试,我这样指定不对!--kafka源表 CREATE table test2( data ARRAY>, PRIMARY KEY (id) ) with ( "connector" = "upsert-kafka", "properties.bootstrap.servers" = "",--kafka的内网连接地址和端口 "properties.group.id" = "", "topic" = "",--Kafka的Topic --'key.format' = 'json', --'value.format' = 'json', "scan.startup.mode" = "latest-offset" );
-
IDEA 上的database 连接数据湖
-
云服务公共资源服务开通服务官网云服务社区入门材料赋能&产品文档等DGC大数据领域公共资源:1、大数据福利专场 0元试用 - 数据域主力产品0元试用https://activity.huaweicloud.com/Date-free.html2、微信公众号:智能数据湖微信号:ei-datalake 1、免费注册-[教程]DGC免费实例购买流程2.0https://bbs.huaweicloud.com/forum/thread-193738-1-1.html华为云-数据湖治理中心DGC-服务官网https://www.huaweicloud.com/product/dayu.html云社区 -EI企业智能数据湖治理中心DGChttps://bbs.huaweicloud.com/forum/forum-890-1.html1、快速入门:提供3个入门示例场景https://support.huaweicloud.com/qs-dgc/dgc_04_0021.html2、数据湖治理中心 DGC> 视频:入门准备https://support.huaweicloud.com/dgc_video/index.html1、DGC官方使用帮助文档:DGC的每个功能提供详细指导https://support.huaweicloud.com/dgc/index.html2、DGC 赋能视频:数据湖治理中心(DGC)伙伴赋能课程https://education.huaweicloud.com/courses/course-v1:HuaweiX+CBUCNXE133+Self-paced/about3、华为伙伴暨开发者大会2022数据治理生产线,加速构建企业数据资产视频回看:https://live.huawei.com/HPDC/meeting/cn/10741.htmlMRS1、云原生数据湖MRS集群开通https://support.huaweicloud.com/qs-mrs/mrs_09_0010.html华为云-云原生数据湖MRS-服务官网https://www.huaweicloud.com/product/mrs.html云社区 -云原生数据湖MRShttps://bbs.huaweicloud.com/forum/forum-612-1.html云原生数据湖MRS> 视频:入门介绍、操作&二次开发指导https://support.huaweicloud.com/mrs_video/index.html1、云原生数据湖MRS帮助文档:MRS的每个功能提供详细指导https://support.huaweicloud.com/mrs/index.html2、云原生数据湖MRS最佳实践https://support.huaweicloud.com/bestpractice-mrs/mrs_05_0023.htmlDLI免费注册-[教程]DLI免费实例购买流程2.0https://bbs.huaweicloud.com/forumreview/thread-193899-1-1.html华为云-数据湖探索 DLI-服务官网https://www.huaweicloud.com/product/dli.html云社区 -数据湖探索 DLIhttps://bbs.huaweicloud.com/forum/forum-599-1.html1、快速入门:使用DLI SQL分析OBS数据https://support.huaweicloud.com/bestpractice-dli/dli_05_0044.html2、数据湖探索 DLI> 视频:入门准备https://support.huaweicloud.com/dli_video/index.html1、DLI官方使用帮助文档:DLI的每个功能提供详细指导https://support.huaweicloud.com/wtsnew-dli/index.html2、DLI 赋能视频:数据湖探索(DLI)伙伴赋能课程https://education.huaweicloud.com/courses/course-v1:HuaweiX+CBUCNXE100+Self-paced/about
-
1、 账号注册a) 在免费试用页面 点击跳转购买页,找到数据湖探索DLI b) 点击立即购买c) 输入手机号,验证码及密码,点击注册d) 在新的窗口中勾选阅读并同意,点击开通e) 注册成功2、 实名认证a) 微信扫描上图中的二维码完成实名认证b) 认证成功截图如下: 3、 购买DLI免费实例a) 认证成功后,点击立即购买,进入新页面,按照下图进行购买0元购买“扫描数据量套餐包” b) 点击立即购买按钮,跳转下一页,点击去支付c) 在新页面的折扣中选择“数据湖探索服务DLI 0折”,并确认付款d) 支付成功,步骤结束。 PS:点击下方的返回数据湖探索控制台,根据 快速入门 指引进行初次的DLI探索,使用DLI默认的 default 队列进行海量数据分析计算,真正按照执行的单条SQL扫描数据量计费(可从本次购买的扫描量套餐包中抵扣),不使用不花钱。Welcome to DLI !
-
本文主要分析并介绍传统实时数据湖解决方案、华为云DLI实时数据湖解决方案和相关的客户案例。云服务介绍:DLI Flink:是完全兼容Flink开源生态,提供基于Flink 1.12开源深度优化的流处理服务DWS:兼容PostgreSQL/Oracle生态的分析型数据库服务RDS:提供基于MySQL/PostgreSQL/SQL Server的在线关系型云数据库服务CSS: 提供托管的分布式搜索能力,兼容Elasticsearch、Kibana、Cerebro等软件 1. 传统实时数据湖解决方案采用 Debezium 订阅 MySQL 的 Binlog 传输到 Kafka,后端是由计算程序从 Kafka 里消费,最后将数据写入到其他存储。Kafka 消息队列做消峰、解耦,经Flink进行流式ETL后,再将数据写入到目的端。目的端可以是各种DB,数据湖,实时数仓和离线数仓。 但是该方案也有明显缺陷:部署难度大:方案中数据采集链路长,额外增加了组件部署、运维成本,技术门槛较高,无法做到可视化操作。数据重复:云原生Debezium可能存在消息重复。数据一致性:由于Kafka中接收的消息存在重复,导致Flink sink端在保证数据的一致性上存在难度。 2. 华为云现有实时数据湖解决方案与业界传统的实时数据湖解决方案相比,华为云现有实时数据湖解决方案采用 DRS 数据复制服务订阅 MySQL 的 Binlog 传输到 分布式消息服务DMS,再由DLI Flink从 DMS 里消费,最后将数据写入到其他存储。同时,支持的源端较丰富,DLI Flink除了支持开源存储引擎外,同时对于华为云云服务等进行了深度优化,能够更好的支撑客户实时数据分析和同步。 方案优势:操作可视化:全程可视化操作,上手门槛低支持源端丰富: DRS支持对应的数据库种类繁多,包括Mysql、PostgreSQL、Oracle、GaussDB、MongoDB等。低时延:在金融、科技等一系列对数据处理实时性方面有非常高要求的行业,至关重要数据一致性:DLI Flink结合DMS确保数据最终一致性。 3. Flink CDC介绍在 DLI Flink 1.12 版本中引入了 CDC 机制,CDC 的全称是 Change Data Capture,用于捕捉数据库表的增删改查操作,是目前成熟的同步数据库变更方案。Flink CDC Connectors 是 Apache Flink 的一组源连接器,是可以从 MySQL、PostgreSQL 数据直接读取全量数据和增量数据的 Source Connectors。 4. 华为云DLI实时数据湖解决方案方案优势:流程简便:省去了传统CDC,采集数据库binlog到Kafka的过程,将采集、计算都在DLI flink中完成。全程可视化操作,上手门槛低,集成进入DGC作业调度,与Spark离线、openLooKeen交互式作业数据复用、元数据统一降低成本、减少数据冗余:与传统华为云实时大数据方案相比,数据库与DLI flink链路中间,可以省去DRS数据复制服务于DMS kafka分布式消息服务的成本,并减少了数据冗余。缩短时延:在金融、科技等一系列对数据处理实时性方面有非常高要求的行业,至关重要 5. 实时日志分析解决方案介绍一句话应用场景:在复杂业务场景下,存在海量繁杂的日志信息,需要拥有能够实时更新的强大全文信息检测能力。 CSS云搜索服务,是一个基于Elasticsearch且完全托管的在线分布式搜索服务。能为用户提供结构化、非结构化文本的多条件检索、统计、BI报表功能。5.1 实时日志分析解决方案方案优势:无需Flink本身提供CDC变更数据捕获功能。历史、变更数据统一输出到DMS/Kafka中,DLI Flink再订阅DMS/Kafka进行实时消费。此外,DRS支持对应的数据库种类繁多,包括Mysql、PostgreSQL、Oracle、GaussDB、MongoDB等。 5.2 进阶CDC实时日志分析解决方案方案优势:Flink 支持读取CDC源表,会先读取数据库的历史全量数据,并平滑切换到Binlog读取上,保证数据的完整读取。链路简洁、时延降低、节省总体成本。 6. 实时数仓解决方案介绍一句话应用场景:该方案旨在为客户分析师团队提供可实时追加更新数据的秒级增强型简单ETL+实时BI分析的场景化解决方案。 DWS数据仓库服务是一种基于华为云基础架构和平台的在线数据处理数据库,能够为各行业提供有竞争力的PB级海量大数据分析能力。6.1 实时数仓解决方案方案优势:无需Flink本身提供CDC变更数据捕获功能。历史、变更数据统一输出到DMS/Kafka中,DLI Flink再订阅DMS/Kafka进行实时消费。此外,DRS支持对应的数据库种类繁多,包括Mysql、PostgreSQL、Oracle、GaussDB、MongoDB等。 6.2 进阶CDC实时数仓解决方案方案优势:Flink 支持读取CDC源表,会先读取数据库的历史全量数据,并平滑切换到Binlog读取上,保证数据的完整读取。链路简洁、时延降低、节省总体成本。 7. 当前客户实时解决方案介绍目前点触科技、爱库存、能链、富米等多家NA客户均采用该实时解决方案方案价值:免运维:Serverless云服务,无需客户运维,聚焦业务开发,降低人力投入成本,优化基础IT设施易开发:全链路可视化开发,DLI Flink提供简单易用的Flink SQL开发和调试能力,轻松掌握实时数据湖统一引擎:DLI Flink统一提供引擎运维、升级能力,全兼容开源Flink能力 客户诉求:运维难度高:自建Kafka、Flink等自运维难度高,需要投入额外人力进行组件升级维护开发效率低:开源Flink不具备可视化开发界面,作业开发依赖命令行+jar,管理复杂实时引擎混乱:自建实时引擎版本较多,存在历史遗留问题,多版本管理和升级都比较复杂维表管理混乱:维表存放位置较多,没有很好的统一,导致数据源连接多,维护复杂
-
操作场景DLI支持用户编写代码创建Spark作业来创建数据库、创建DLI表或OBS表和插入表数据等操作。本示例完整的演示通过编写java代码、使用Spark作业创建数据库、创建表和插入表数据的详细操作,帮助您在DLI上进行作业开发。约束限制不支持的场景:在SQL作业中创建了数据库(database),编写程序代码指定在该数据库下创建表。例如在DLI的SQL编辑器中的某SQL队列下,创建了数据库testdb。后续通过编写程序代码在testdb下创建表testTable,编译打包后提交的Spark Jar作业则会运行失败。不支持创建加密的DLI表,即不支持创建DLI表时设置encryption=true。例如,如下建表语句不支持:CREATE TABLE tb1(id int) using parquet options(encryption=true)支持的场景在SQL作业中创建数据库(database),表(table) , 通过SQL或Spark程序作业读取插入数据。在Spark程序作业中创建数据库(database),表(table), 通过SQL或Spark程序作业读取插入数据。环境准备在进行Spark 作业访问DLI元数据开发前,请准备以下开发环境。表1 Spark Jar作业开发环境准备项说明操作系统Windows系统,支持Windows7以上版本。安装JDKJDK使用1.8版本。安装和配置IntelliJ IDEAIntelliJ IDEA为进行应用开发的工具,版本要求使用2019.1或其他兼容版本。安装Maven开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。开发流程DLI进行Spark作业访问DLI元数据开发流程参考如下:图1 Spark作业访问DLI元数据开发流程表2 开发流程说明序号阶段操作界面说明1创建DLI通用队列DLI控制台创建作业运行的DLI队列。2OBS桶文件配置OBS控制台如果是创建OBS表,则需要上传文件数据到OBS桶下。配置Spark创建表的元数据信息“spark.sql.warehouse.dir”的存储路径。3新建Maven工程,配置pom文件IntelliJ IDEA参考样例代码说明,编写程序代码创建DLI表或OBS表。4编写程序代码5调试,编译代码并导出Jar包6上传Jar包到OBS和DLIOBS控制台将生成的Spark Jar包文件上传到OBS目录下和DLI程序包中。7创建Spark Jar作业DLI控制台在DLI控制台创建Spark Jar作业并提交运行作业。8查看作业运行结果DLI控制台查看作业运行状态和作业运行日志。步骤1:创建DLI通用队列第一次提交Spark作业,需要先创建队列,例如创建名为“sparktest”的队列,队列类型选择为“通用队列”。在DLI管理控制台的左侧导航栏中,选择“队列管理”。单击“队列管理”页面右上角“购买队列”进行创建队列。创建名为“sparktest”的队列,队列类型选择为“通用队列”。创建队列详细介绍请参考创建队列。单击“立即购买”,确认配置。配置确认无误,单击“提交”完成队列创建。步骤2:OBS桶文件配置如果需要创建OBS表,则需要先上传数据到OBS桶目录下。本次演示的样例代码创建了OBS表,测试数据内容参考如下示例,创建名为的testdata.csv文件。12,Michael 27,Andy 30,Justin进入OBS管理控制台,在“桶列表”下,单击已创建的OBS桶名称,本示例桶名为“dli-test-obs01”,进入“概览”页面。单击左侧列表中的“对象”,选择“上传对象”,将testdata.csv文件上传到OBS桶根目录下。在OBS桶根目录下,单击“新建文件夹”,创建名为“warehousepath”的文件夹。该文件夹路径用来存储Spark创建表的元数据信息“spark.sql.warehouse.dir”。步骤3:新建Maven工程,配置pom依赖以下通过IntelliJ IDEA 2020.2工具操作演示。打开IntelliJ IDEA,选择“File > New > Project”。图2 新建Project选择Maven,Project SDK选择1.8,单击“Next”。定义样例工程名和配置样例工程存储路径,单击“Finish”完成工程创建。如上图所示,本示例创建Maven工程名为:SparkJarMetadata,Maven工程路径为:“D:\DLITest\SparkJarMetadata”。在pom.xml文件中添加如下配置。<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.2</version> </dependency> </dependencies>图3 修改pom.xml文件在工程路径的“src > main > java”文件夹上鼠标右键,选择“New > Package”,新建Package和类文件。Package根据需要定义,本示例定义为:“com.huawei.dli.demo”,完成后回车。在包路径下新建Java Class文件,本示例定义为:DliCatalogTest。步骤4:编写代码编写DliCatalogTest程序创建数据库、DLI表和OBS表。完整的样例请参考Java样例代码,样例代码分段说明如下:导入依赖的包。import org.apache.spark.sql.SparkSession;创建SparkSession会话。创建SparkSession会话时需要指定Spark参数:"spark.sql.session.state.builder"、"spark.sql.catalog.class"和"spark.sql.extensions",按照样例配置即可。SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.CarbonInternalExtensions"+","+"org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate();创建数据库。如下样例代码演示,创建名为test_sparkapp的数据库。spark.sql("create database if not exists test_sparkapp").collect();创建DLI表并插入测试数据。spark.sql("drop table if exists test_sparkapp.dli_testtable").collect(); spark.sql("create table test_sparkapp.dli_testtable(id INT, name STRING)").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (123,'jason')").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (456,'merry')").collect();创建OBS表。如下示例中的OBS路径需要根据步骤2:OBS桶文件配置中的实际数据路径修改。spark.sql("drop table if exists test_sparkapp.dli_testobstable").collect(); spark.sql("create table test_sparkapp.dli_testobstable(age INT, name STRING) using csv options (path 'obs://dli-test-obs01/testdata.csv')").collect();关闭SparkSession会话spark。spark.stop();步骤5:调试、编译代码并导出Jar包单击IntelliJ IDEA工具右侧的“Maven”,参考下图分别单击“clean”、“compile”对代码进行编译。编译成功后,单击“package”对代码进行打包。打包成功后,生成的Jar包会放到target目录下,以备后用。本示例将会生成到:“D:\DLITest\SparkJarMetadata\target”下名为“SparkJarMetadata-1.0-SNAPSHOT.jar”。步骤6:上传Jar包到OBS和DLI下登录OBS控制台,将生成的“SparkJarMetadata-1.0-SNAPSHOT.jar”Jar包文件上传到OBS路径下。将Jar包文件上传到DLI的程序包管理中,方便后续统一管理。登录DLI管理控制台,单击“数据管理 > 程序包管理”。在“程序包管理”页面,单击右上角的“创建”创建程序包。在“创建程序包”对话框,配置以下参数。包类型:选择“JAR”。OBS路径:程序包所在的OBS路径。分组设置和组名称根据情况选择设置,方便后续识别和管理程序包。图4 创建程序包单击“确定”,完成创建程序包。步骤7:创建Spark Jar作业登录DLI控制台,单击“作业管理 > Spark作业”。在“Spark作业”管理界面,单击“创建作业”。在作业创建界面,配置对应作业运行参数。具体说明如下:表3 Spark Jar作业参数填写参数名参数值所属队列选择已创建的DLI通用队列。例如当前选择步骤1:创建DLI通用队列创建的通用队列“sparktest”。作业名称(--name)自定义Spark Jar作业运行的名称。当前定义为:SparkTestMeta。应用程序选择步骤6:上传Jar包到OBS和DLI下中上传到DLI程序包。例如当前选择为:“SparkJarObs-1.0-SNAPSHOT.jar”。主类格式为:程序包名+类名。例如当前为:com.huawei.dli.demo.DliCatalogTest。Spark参数(--conf)spark.dli.metaAccess.enable=truespark.sql.warehouse.dir=obs://dli-test-obs01/warehousepath说明:spark.sql.warehouse.dir参数的OBS路径为步骤2:OBS桶文件配置中配置创建。访问元数据选择:是其他参数保持默认值即可。图5 创建Spark Jar作业单击“执行”,提交该Spark Jar作业。在Spark作业管理界面显示已提交的作业运行状态。查看作业运行结果在Spark作业管理界面显示已提交的作业运行状态。初始状态显示为“启动中”。如果作业运行成功则作业状态显示为“已成功”,通过以下操作查看创建的数据库和表。可以在DLI控制台,左侧导航栏,单击“SQL编辑器”。在“数据库”中已显示创建的数据库“test_sparkapp”。图6 查看创建的数据库双击数据库名,可以在数据库下查看已创建成功的DLI和OBS表。图7 查看表双击DLI表名dli_testtable,单击“执行”查询DLI表数据。图8 查询DLI表数据注释掉DLI表查询语句,双击OBS表名dli_testobstable,单击“执行”查询OBS表数据。图9 查询OBS表数据如果作业运行失败则作业状态显示为“已失败”,单击“操作”列“更多”下的“Driver日志”,显示当前作业运行的日志,分析报错原因。图10 查看Driver日志原因定位解决后,可以在作业“操作”列,单击“编辑”,修改作业相关参数后,单击“执行”重新运行该作业即可。后续指引如果您想通过Spark Jar作业访问其他数据源,请参考《使用Spark作业跨源访问数据源》。创建DLI表的语法请参考创建DLI表,创建OBS表的语法请参考创建OBS表。如果是通过API接口调用提交该作业请参考以下操作说明:调用创建批处理作业接口,参考以下请求参数说明。详细的API参数说明请参考《数据湖探索API参考》>《创建批处理作业》。将请求参数中的“catalog_name”参数设置为“dli”。conf 中需要增加"spark.dli.metaAccess.enable":"true"。如果需要执行DDL,则还要在conf中配置"spark.sql.warehouse.dir": "obs://bucket/warehousepath"。完整的API请求参数可以参考如下示例说明。{ "queue":"citest", "file":"SparkJarMetadata-1.0-SNAPSHOT.jar", "className":"DliCatalogTest", "conf":{"spark.sql.warehouse.dir": "obs://bucket/warehousepath", "spark.dli.metaAccess.enable":"true"}, "sc_type":"A", "executorCores":1, "numExecutors":6, "executorMemory":"4G", "driverCores":2, "driverMemory":"7G", "catalog_name": "dli" }Java样例代码本示例操作步骤采用Java进行编码,具体完整的样例代码参考如下:package com.huawei.dli.demo; import org.apache.spark.sql.SparkSession; public class DliCatalogTest { public static void main(String[] args) { SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.CarbonInternalExtensions"+","+"org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate(); spark.sql("create database if not exists test_sparkapp").collect(); spark.sql("drop table if exists test_sparkapp.dli_testtable").collect(); spark.sql("create table test_sparkapp.dli_testtable(id INT, name STRING)").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (123,'jason')").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (456,'merry')").collect(); spark.sql("drop table if exists test_sparkapp.dli_testobstable").collect(); spark.sql("create table test_sparkapp.dli_testobstable(age INT, name STRING) using csv options (path 'obs://dli-test-obs01/testdata.csv')").collect(); spark.stop(); } }scala样例代码scala样例代码object DliCatalogTest { def main(args:Array[String]): Unit = { val sql = args(0) val runDdl = Try(args(1).toBoolean).getOrElse(true) System.out.println(s"sql is $sql runDdl is $runDdl") val sparkConf = new SparkConf(true) sparkConf .set("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .set("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") sparkConf.setAppName("dlicatalogtester") val spark = SparkSession.builder .config(sparkConf) .enableHiveSupport() .config("spark.sql.extensions", Seq("org.apache.spark.sql.CarbonInternalExtensions", "org.apache.spark.sql.DliSparkExtension").mkString(",")) .appName("SparkTest") .getOrCreate() System.out.println("catalog is " + spark.sessionState.catalog.toString) if (runDdl) { val df = spark.sql(sql).collect() } else { spark.sql(sql).show() } spark.close() } }Python样例代码Python样例代码#!/usr/bin/python # -*- coding: UTF-8 -*- from __future__ import print_function import sys from pyspark.sql import SparkSession if __name__ == "__main__": url = sys.argv[1] creatTbl = "CREATE TABLE test_sparkapp.dli_rds USING JDBC OPTIONS ('url'='jdbc:mysql://%s'," \ "'driver'='com.mysql.jdbc.Driver','dbtable'='test.test'," \ " 'passwdauth' = 'DatasourceRDSTest_pwd','encryption' = 'true')" % url spark = SparkSession \ .builder \ .enableHiveSupport() \ .config("spark.sql.session.state.builder","org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") \ .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") \ .config("spark.sql.extensions",','.join(["org.apache.spark.sql.CarbonInternalExtensions","org.apache.spark.sql.DliSparkExtension"])) \ .appName("python Spark test catalog") \ .getOrCreate() spark.sql("CREATE database if not exists test_sparkapp").collect() spark.sql("drop table if exists test_sparkapp.dli_rds").collect() spark.sql(creatTbl).collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("insert into table test_sparkapp.dli_rds select 12,'aaa'").collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("insert overwrite table test_sparkapp.dli_rds select 1111,'asasasa'").collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("drop table test_sparkapp.dli_rds").collect() spark.stop()
上滑加载中
推荐直播
-
华为AI技术发展与挑战:集成需求分析的实战指南
2024/11/26 周二 18:20-20:20
Alex 华为云学堂技术讲师
本期直播将综合讨论华为AI技术的发展现状,技术挑战,并深入探讨华为AI应用开发过程中的需求分析过程,从理论到实践帮助开发者快速掌握华为AI应用集成需求的框架和方法。
去报名 -
华为云DataArts+DWS助力企业数据治理一站式解决方案及应用实践
2024/11/27 周三 16:30-18:00
Walter.chi 华为云数据治理DTSE技术布道师
想知道数据治理项目中,数据主题域如何合理划分?数据标准及主数据标准如何制定?数仓分层模型如何合理规划?华为云DataArts+DWS助力企业数据治理项目一站式解决方案和应用实践告诉您答案!本期将从数据趋势、数据治理方案、数据治理规划及落地,案例分享四个方面来助力企业数据治理项目合理咨询规划及顺利实施。
去报名
热门标签