-
9月23日至25日,华为全联接2021在线上正式开幕,在“华为云FusionInsight智能数据湖打造千行百业数据底座”专题演讲中,华为云FusionInsight技术专家陈祥发表了“华为云FusionInsight智能数据湖版本新能力解读”演讲。华为云FusionInsight发布智能数据湖8.1.0新版本,提供湖仓一体的解决方案,兼顾历史与未来。在新版本中:MRS云原生数据湖提供三湖一集市能力,提供数据全链路实时分析,让供数时效从T+1走向T+0;通过IoTDB工业物联网数据库,云边端协同轻松构建时序数据集市;在新版本中,更是提供两地三中心等三种高可用容灾方案,确保关键业务的连续性。DWS云数据仓库推出高性能、高扩展、高可用特点的新一代全场景云数据仓库,一套内核一套架构同时支持标准数仓、实时数仓和云数仓。DGC数据湖治理中心提供一站式数据开发与治理工具,还增强了其一键数据服务和数据分层分级安全功能。华为云FusionInsight深耕大数据10年+,持续创新引领大数据技术发展,把复杂留给自己,把简单留给伙伴,携手800+合作伙伴,服务于全球60+国家和地区3000+政企客户,已广泛应用于政府、金融、运营商、大企业等行业。线上观看链接:https://live.huawei.com/huaweiconnect/meeting/cn/9404.html 。
-
2021年9月23-25日,华为全联接2021以“深耕数字化”为主题,在线上隆重举办。当前,数据已是第5种生产要素,如何发挥数据要素价值已是共同话题。本次将由华为云FusionInsight携手客户和伙伴,率先揭秘FusionInsight 8.1.0版本的实时数据湖、容灾、时序数据库新能力,工商银行讲述携手FusionInsight共建大数据体系;更有中国顶级高校-清华大学,携手顶级大数据厂商-华为云,共同打造全球大数据社区顶级项目-时序数据库IoTDB的创新故事,分享基于IoTDB成果的中国软件创新体系实践经验。数据要素价值和产业生态离不开众多伙伴,本次还邀请到华傲数据、东华博泰、永洪BI各行业伙伴,共同分享各行业的大数据联合解决方案,更多精彩请关注华为云全连接2021官网和FusionInsight专题演讲,地址:https://live.huawei.com/huaweiconnect/meeting/cn/9404.html 。
-
# 华为FusionInsight MRS实战 - 使用FlinkSQL处理数据并使用redis做实时展示 ## 场景说明 【需求】计算最近1小时各个账户交易总金额。 【分析】将账户交易数据接入Kafka中,通过Flink计算过去1小时各个账户的交易总金额,将计算结果写入Redis中。做实时大屏展示。 【实现】通过FlinkSQL的滚动窗口计算 数据流图  ## 操作步骤 - 登录华为FusionInisght MRS Flink WebUI  - 在作业管理选择新建作业创建一个FlinkSQL任务  - 编辑如下Flink SQL语句 ``` CREATE TABLE kafka_source ( account varchar(10), cost int, ts AS PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = 'redisdemo', 'properties.bootstrap.servers' = '172.16.9.117:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE SINK STREAM redis_sink( account varchar, costs int, PRIMARY KEY(account) ) WITH ( 'isSafeMode' = 'true', 'clusterAddress' = '172.16.9.117:22404,172.16.9.118:22404,172.16.9.113:22404', 'redistype' = 'String', 'type' = 'Redis', 'isSSLMode' = 'false' ); INSERT INTO redis_sink SELECT account, SUM(cost) FROM kafka_source GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), --为了快算看到计算结果使用10s窗口 account; ``` - 点击语义校验,确保语义校验通过  - 启动该Flink SQL任务  - 使用kafka客户端插入测试数据 ``` {"account": "A1","cost":"11"} {"account": "A1","cost":"22"} {"account": "A2","cost":"33"} {"account": "A3","cost":"44"} ```  注意: 因为flink窗口时间为10秒,并且redis是key value数据库,数据会根据主键覆盖,所以需要在10s内将数据全部输入 - 登录redis客户端查看结果: `redis-cli -c -h 172.16.9.117 -p 22404` 
-
## 背景 Apache Hudi 是目前最流行的实时数据湖解决方案之一,能够使大数据平台支持数据事务性,解决了数据从外部数据源实时入湖的痛点问题。 Apache Flink 作为目前最流行的流计算框架,在流式计算场景有天然的优势,当前,Flink 社区也在积极拥抱 Hudi 社区 Hudi 和 Fink 在 0.8.0 版本做了大量的集成工作[1]。核心的工作包括: - 实现了新的 Flink Streaming Writer - 支持 Flink SQL API - 支持 batch 和 streaming 的模式 Reader 本文介绍如何使用华为自研CDC工具CDL,结合自研的可视化开发平台Flink WebUI, 使用Flink SQL API 的方式实现数据的实时入湖。 数据流向图:  ## 前提条件 根据之前学习材料已掌握 - CDL已完成同步任务的设计,并运行 - 华为Flink WebUI的使用 - Flink SQL解析嵌套Json的方法 ## 操作步骤 - 登录华为FusionInisght MRS Flink WebUI  - 在作业管理选择新建作业创建一个FlinkSQL任务  - 编辑如下Flink SQL语句 注意check point必须打开  ``` CREATE TABLE huditableout_source( payload ROW < `TIMESTAMP` BIGINT, `data` ROW < uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT> >, ts as payload.`TIMESTAMP`, uid as payload.`data`.uid, uname as payload.`data`.uname, age as payload.`data`.age, sex as payload.`data`.sex, mostlike as payload.`data`.mostlike, lastview as payload.`data`.lastview, totalcost as payload.`data`.totalcost ) WITH( 'connector' = 'kafka', 'topic' = 'huditableout', 'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007', 'properties.group.id' = 'example', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com' ); CREATE TABLE huditableout( ts BIGINT, uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT )PARTITIONED BY (sex) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/huditest/huditableout', 'table.type' = 'MERGE_ON_READ', 'hoodie.datasource.write.recordkey.field' = 'uid', 'write.precombine.field' = 'ts', 'compaction.async.enabled' = 'false', 'read.streaming.enabled' = 'true', 'read.streaming.check-interval' = '10' ); insert into huditableout select ts, uid, uname, age, sex, mostlike, lastview, totalcost from huditableout_source; ``` 说明:hudi表创建时将 read.streaming.enabled 设置为 true,表明通过 streaming 的方式读取表数据, read.streaming.check-interval 指定了 source 监控新的 commits 的间隔为 10s, table.type 设置表类型为 MERGE_ON_READ,目前只有 MERGE_ON_READ 表支持 streaming 读。 - 点击语义校验,确保语义校验通过  - 启动该Flink SQL任务  - 在源MySQL库插入数据 ``` insert into hudi.hudisource values (1,"韦浩",52,"女","加减法","漫画",23142); insert into hudi.hudisource values (2,"汪烨霖",67,"男","手游","联盟",27211); insert into hudi.hudisource values (3,"谭昊焱",26,"女","查询网","全集观看",6280); insert into hudi.hudisource values (4,"顾明",34,"男","地图","模板",16771); insert into hudi.hudisource values (5,"白修杰",61,"女","快递单号","手游",24577); insert into hudi.hudisource values (6,"郑雨泽",50,"男","板","植物",5988); insert into hudi.hudisource values (7,"戴鹤轩",37,"女","网站大全","模板",30925); insert into hudi.hudisource values (8,"邹鸿煊",69,"男","官网","阅读网",14020); insert into hudi.hudisource values (9,"沈烨霖",67,"女","客户端","图",34704); insert into hudi.hudisource values (10,"丁彬",55,"男","百度","网站大全",30885); ``` - 检查hdfs路径是否有文件写入  - 参考如下命令在客户端将hdfs对应路径的hudi表写入hive `run_hive_sync_tool.sh --partitioned-by sex --base-path hdfs://hacluster//tmp/huditest/huditableout/ --table huditableout --partition-value-extractor org.apache.hudi.hive.MultiPartKeysValueExtractor --support-timestamp `  - 登录hive客户端检查hive表  - 下面检查数据新增以及更新的同步情况 在源MySQL库插入数据 insert into hudi.hudisource values (11,"蒋语堂",38,"女","图","播放器",28732);  对应hive表同步情况  在源MySQL库对数据进行更改: UPDATE hudi.hudisource SET uname='Anne Marie333' WHERE uid=11;  对应hive表同步情况  引用链接 [1] 集成工作: https://issues.apache.org/jira/browse/HUDI-1521
-
# 华为FusionInsight MRS FlinkSQL 复杂嵌套Json解析最佳实践 ## 背景说明 随着流计算的发展,挑战不再仅限于数据量和计算量,业务变得越来越复杂,开发者可能是资深的大数据从业者、初学 Java 的爱好者,或是不懂代码的数据分析者。如何提高开发者的效率,降低流计算的门槛,对推广实时计算非常重要。 SQL 是数据处理中使用最广泛的语言,它允许用户简明扼要地展示其业务逻辑。Flink 作为流批一体的计算引擎,致力于提供一套 SQL 支持全部应用场景,Flink SQL 的实现也完全遵循 ANSI SQL 标准。之前,用户可能需要编写上百行业务代码,使用 SQL 后,可能只需要几行 SQL 就可以轻松搞定。 本文介绍如何使用华为FusionInsight MRS FlinkServer服务进行界面化的FlinkSQL编辑,从而处理复杂的嵌套Json格式 ## Json内容 下面以cdl新增数据的json为例 ``` { "schema":{ "type":"struct", "fields":[ { "type":"string", "optional":false, "field":"DATA_STORE" }, { "type":"string", "optional":false, "field":"SEG_OWNER" }, { "type":"string", "optional":false, "field":"TABLE_NAME" }, { "type":"int64", "optional":false, "name":"org.apache.kafka.connect.data.Timestamp", "version":1, "field":"TIMESTAMP" }, { "type":"string", "optional":false, "field":"OPERATION" }, { "type":"string", "optional":true, "field":"LOB_COLUMNS" }, { "type":"struct", "fields":[ { "type":"array", "items":{ "type":"struct", "fields":[ { "type":"string", "optional":false, "field":"name" }, { "type":"string", "optional":true, "field":"value" } ], "optional":false }, "optional":false, "field":"properties" } ], "optional":false, "name":"transaction", "field":"transaction" }, { "type":"struct", "fields":[ { "type":"int64", "optional":false, "field":"uid" } ], "optional":true, "name":"unique", "field":"unique" }, { "type":"struct", "fields":[ { "type":"int64", "optional":false, "field":"uid" }, { "type":"string", "optional":true, "default":"", "field":"uname" }, { "type":"int64", "optional":true, "field":"age" }, { "type":"string", "optional":true, "field":"sex" }, { "type":"string", "optional":true, "field":"mostlike" }, { "type":"string", "optional":true, "field":"lastview" }, { "type":"int64", "optional":true, "field":"totalcost" } ], "optional":true, "name":"data", "field":"data" }, { "type":"struct", "fields":[ ], "optional":true, "name":"EMPTY", "field":"before" }, { "type":"string", "optional":true, "field":"HEARTBEAT_IDENTIFIER" } ], "optional":false, "name":"hudi.hudisource" }, "payload":{ "DATA_STORE":"MYSQL", "SEG_OWNER":"hudi", "TABLE_NAME":"hudisource", "TIMESTAMP":1631070742000, "OPERATION":"INSERT", "LOB_COLUMNS":"", "transaction":{ "properties":[ { "name":"file", "value":"mysql-bin.000005" }, { "name":"pos", "value":"32307" }, { "name":"gtid", "value":"" } ] }, "unique":{ "uid":11 }, "data":{ "uid":11, "uname":"蒋语堂", "age":38, "sex":"女", "mostlike":"图", "lastview":"播放器", "totalcost":28732 }, "before":null, "HEARTBEAT_IDENTIFIER":"998d66cc-1405-40e2-bbdc-41f2adf40724" } } ``` 上面的数据信息为复杂的json嵌套结构,包含了 Map、Array、Row 等类型, 对于这样的复杂格式需要有一种高效的方式进行解析,下面介绍如何实现。 ## 华为FusionInsight MRS Flink WebUI介绍 Flink WebUI提供基于Web的可视化开发平台,用户只需要编写SQL即可开发作业,极大降低作业开发门槛。同时通过作业平台能力开放,支持业务人员自行编写SQL开发作业来快速应对需求,大大减少Flink作业开发工作量。 Flink WebUI主要有以下特点: - 企业级可视化运维:运维管理界面化、作业监控、作业开发Flink SQL标准化等。 - 快速建立集群连接:通过集群连接功能配置访问一个集群,需要客户端配置、用户认证密钥文件。 - 快速建立数据连接:通过数据连接功能配置访问一个组件。创建“数据连接类型”为“HDFS”类型时需创建集群连接,其他数据连接类型的“认证类型”为“KERBEROS”需创建集群连接,“认证类型”为“SIMPLE”不需创建集群连接。 - 可视化开发平台:支持自定义输入/输出映射表,满足不同输入来源、不同输出目标端的需求。 - 图形化作业管理:简单易用。 下面介绍如何使用Flink WebUI开发FlinkSQL DDL语句解析出有效信息 ### 操作步骤 - 登录华为FusionInisght MRS Flink WebUI  - 在作业管理选择新建作业创建一个FlinkSQL任务  - 编辑Flink SQL语句 SQL说明:创建两张kafka流表,起作用为从kafka源端读取cdl对应topic,解析出需要的字段。并将结果写入另外一个kafka topic 1. Json 中的每个 {} 都需要用 Row 类型来表示 2. Json 中的每个 [] 都需要用 Arrary 类型来表示 3. 数组的下标是从 1 开始的不是 0 如下面 SQL 中的 \`schema\`.\`fields\`[1].type 4. 关键字在任何地方都需要加反引号 如上面 SQL 中的 \`type\` 5. select 语句中的字段类型和顺序一定要和结果表的字段类型和顺序保持一致 6. 可使用flink函数比如LOCALTIMESTAMP为获取flink系统时间  ``` CREATE TABLE huditableout_source( `schema` ROW `fields` ARRAY ROW> >, payload ROW `TIMESTAMP` BIGINT, `data` ROW uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT> >, type1 as `schema`.`fields`[1].type, optional1 as `schema`.`fields`[1].optional, field1 as `schema`.`fields`[1].field, type2 as `schema`.`fields`[2].type, optional2 as `schema`.`fields`[2].optional, field2 as `schema`.`fields`[2].field, ts as payload.`TIMESTAMP`, uid as payload.`data`.uid, uname as payload.`data`.uname, age as payload.`data`.age, sex as payload.`data`.sex, mostlike as payload.`data`.mostlike, lastview as payload.`data`.lastview, totalcost as payload.`data`.totalcost, localts as LOCALTIMESTAMP ) WITH( 'connector' = 'kafka', 'topic' = 'huditableout', 'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007', 'properties.group.id' = 'example', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com' ); CREATE TABLE huditableout( type1 VARCHAR(32), optional1 BOOLEAN, field1 VARCHAR(32), type2 VARCHAR(32), optional2 BOOLEAN, field2 VARCHAR(32), ts BIGINT, uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT, localts TIMESTAMP ) WITH( 'connector' = 'kafka', 'topic' = 'huditableout2', 'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007', 'properties.group.id' = 'example', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com' ); insert into huditableout select type1, optional1, field1, type2, optional2, field2, ts, uid, uname, age, sex, mostlike, lastview, totalcost, localts from huditableout_source; ``` - 点击语义校验,确保语义校验通过  - 启动该Flink SQL任务  - 检查结果 源端kafka 数据  目标端kafka 数据 
-
# 华为FusionInsight MRS CDL使用指南 ## 说明 CDL是一种简单、高效的数据实时集成服务,能够从各种OLTP数据库中抓取Data Change事件,然后推送至Kafka中,最后由Sink Connector消费Topic中的数据并导入到大数据生态软件应用中,从而实现数据的实时入湖。 CDL服务包含了两个重要的角色:CDLConnector和CDLService。CDLConnector是具体执行数据抓取任务的实例,CDLService是负责管理和创建任务的实例。 本此实践介绍以mysql作为数据源进行数据抓取 ## 前提条件 - MRS集群已安装CDL服务。 - MySQL数据库需要开启mysql的bin log功能(默认情况下是开启的)。 查看MySQL是否开启bin log: 使用工具或者命令行连接MySQL数据库(本示例使用navicat工具连接),执行show variables like 'log_%'命令查看。 例如在navicat工具选择"File > New Query"新建查询,输入如下SQL命令,单击"Run"在结果中"log_bin"显示为"ON"则表示开启成功。 `show variables like 'log_%'`  ## 工具准备 现在cdl只能使用rest api的方式进行命令提交,所以需要提前安装工具进行调试。本文使用VSCode工具。  完成之后安装rest client插件:  完成之后创建一个cdl.http的文件进行编辑:  ## 创建CDL任务 CDL任务创建的流程图如下所示:  说明:需要先创建一个MySQL link, 在创建一个Kafka link, 然后再创建一个CDL同步任务并启动。 MySQL link部分rest请求代码 ``` @hostname = 172.16.9.113 @port = 21495 @host = {{hostname}}:{{port}} @bootstrap = "172.16.9.113:21007" @bootstrap_normal = "172.16.9.113:21005" @mysql_host = "172.16.2.118" @mysql_port = "3306" @mysql_database = "hudi" @mysql_user = "root" @mysql_password = "Huawei@123" ### get links get https://{{host}}/api/v1/cdl/link ### mysql link validate post https://{{host}}/api/v1/cdl/link?validate=true content-type: application/json { "name": "MySQL_link", //link名,全局唯一,不能重复 "description":"MySQL connection", //link描述 "link-type":"mysql", //link的类型 "enabled":"true", "link-config-values": { "inputs": [ { "name": "host", "value": {{mysql_host}} }, //数据库安装节点的ip { "name": "port", "value": {{mysql_port}} },//数据库监听的端口 { "name": "database.name", "value": {{mysql_database}} }, //连接的数据库名 { "name": "user", "value": {{mysql_user}} }, //用户 { "name": "password","value": {{mysql_password}} } ,//密码 { "name":"schema", "value": {{mysql_database}}}//同数据库名 ] } } ### mysql link create post https://{{host}}/api/v1/cdl/link content-type: application/json { "name": "MySQL_link", //link名,全局唯一,不能重复 "description":"MySQL connection", //link描述 "link-type":"mysql", //link的类型 "enabled":"true", "link-config-values": { "inputs": [ { "name": "host", "value": {{mysql_host}} }, //数据库安装节点的ip { "name": "port", "value": {{mysql_port}} },//数据库监听的端口 { "name": "database.name", "value": {{mysql_database}} }, //连接的数据库名 { "name": "user", "value": {{mysql_user}} }, //用户 { "name": "password","value": {{mysql_password}} } ,//密码 { "name":"schema", "value": {{mysql_database}}}//同数据库名 ] } } ### mysql link update put https://{{host}}/api/v1/cdl/link/MySQL_link content-type: application/json { "name": "MySQL_link", //link名,全局唯一,不能重复 "description":"MySQL connection", //link描述 "link-type":"mysql", //link的类型 "enabled":"true", "link-config-values": { "inputs": [ { "name": "host", "value": {{mysql_host}} }, //数据库安装节点的ip { "name": "port", "value": {{mysql_port}} },//数据库监听的端口 { "name": "database.name", "value": {{mysql_database}} }, //连接的数据库名 { "name": "user", "value": {{mysql_user}} }, //用户 { "name": "password","value": {{mysql_password}} } ,//密码 { "name":"schema", "value": {{mysql_database}}}//同数据库名 ] } } ``` Kafka link部分rest请求代码 ``` ### get links get https://{{host}}/api/v1/cdl/link ### kafka link validate post https://{{host}}/api/v1/cdl/link?validate=true content-type: application/json { "name": "kafka_link", "description":"test kafka link", "link-type":"kafka", "enabled":"true", "link-config-values": { "inputs": [ { "name": "bootstrap.servers", "value": "172.16.9.113:21007" }, { "name": "sasl.kerberos.service.name", "value": "kafka" }, { "name": "security.protocol","value": "SASL_PLAINTEXT" }//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT ] } } ### kafka link create post https://{{host}}/api/v1/cdl/link content-type: application/json { "name": "kafka_link", "description":"test kafka link", "link-type":"kafka", "enabled":"true", "link-config-values": { "inputs": [ { "name": "bootstrap.servers", "value": "172.16.9.113:21007" }, { "name": "sasl.kerberos.service.name", "value": "kafka" }, { "name": "security.protocol","value": "SASL_PLAINTEXT" }//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT ] } } ### kafka link update put https://{{host}}/api/v1/cdl/link/kafka_link content-type: application/json { "name": "kafka_link", "description":"test kafka link", "link-type":"kafka", "enabled":"true", "link-config-values": { "inputs": [ { "name": "bootstrap.servers", "value": "172.16.9.113:21007" }, { "name": "sasl.kerberos.service.name", "value": "kafka" }, { "name": "security.protocol","value": "SASL_PLAINTEXT" }//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT ] } } ``` CDL任务命令部分rest请求代码 ``` @hostname = 172.16.9.113 @port = 21495 @host = {{hostname}}:{{port}} @bootstrap = "172.16.9.113:21007" @bootstrap_normal = "172.16.9.113:21005" @mysql_host = "172.16.2.118" @mysql_port = "3306" @mysql_database = "hudi" @mysql_user = "root" @mysql_password = "Huawei@123" ### create job post https://{{host}}/api/v1/cdl/job content-type: application/json { "job_type": "CDL_JOB", //job类型,目前只支持CDL_JOB这一种 "name": "mysql_to_kafka", //job名称 "description":"mysql_to_kafka", //job描述 "from-link-name": "MySQL_link", //数据源Link "to-link-name": "kafka_link", //目标源Link "from-config-values": { "inputs": [ {"name" : "connector.class", "value" : "com.huawei.cdc.connect.mysql.MysqlSourceConnector"}, {"name" : "schema", "value" : "hudi"}, {"name" : "db.name.alias", "value" : "hudi"}, {"name" : "whitelist", "value" : "hudisource"}, {"name" : "tables", "value" : "hudisource"}, {"name" : "tasks.max", "value" : "10"}, {"name" : "mode", "value" : "insert,update,delete"}, {"name" : "parse.dml.data", "value" : "true"}, {"name" : "schema.auto.creation", "value" : "false"}, {"name" : "errors.tolerance", "value" : "all"}, {"name" : "multiple.topic.partitions.enable", "value" : "false"}, {"name" : "topic.table.mapping", "value" : "[ {\"topicName\":\"huditableout\", \"tableName\":\"hudisource\"} ]" }, {"name" : "producer.override.security.protocol", "value" : "SASL_PLAINTEXT"},//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT {"name" : "consumer.override.security.protocol", "value" : "SASL_PLAINTEXT"}//安全模式为SASL_PLAINTEXT,普通模式为PLAINTEXT ] }, "to-config-values": {"inputs": []}, "job-config-values": { "inputs": [ {"name" : "global.topic", "value" : "demo"} ] } } ### get all job get https://{{host}}/api/v1/cdl/job ### submit job put https://{{host}}/api/v1/cdl/job/mysql_to_kafka/start ### get job status get https://{{host}}/api/v1/cdl/submissions?jobName=mysql_to_kafka ### stop job put https://{{host}}/api/v1/cdl/job/mysql_to_kafka/submissions/13/stop ### delete job DELETE https://{{host}}/api/v1/cdl/job/mysql_to_kafka ``` ## 场景验证 生产库MySQL原始数据如下:  提交CDL任务之后  增加操作: insert into hudi.hudisource values (11,"蒋语堂",38,"女","图","播放器",28732); 对应kafka消息体:  更改操作: UPDATE hudi.hudisource SET uname='Anne Marie333' WHERE uid=11; 对应kafka消息体:  删除操作:delete from hudi.hudisource where uid=11; 对应kafka消息体: 
-
【功能模块】关于kafka节点部署台数规划【操作步骤&问题现象】1、某局点kafka broker节点部署了4个,在开会讨论中,客户领导说kafka节点必须保持奇数,否则有台broker会浪费,而且leader选举时也会有问题2、但实际情况4台broker节点都进行了存储和使用,并没有出现客户说的问题,而且产品文档中也说明了,broker节点最少三台,并没有说必须保持奇数。只是zookeeper需要保持奇数。客户想让说明,问什么开源的需要保持奇数,但FusionInsight HD 没有这个要求,麻烦大佬帮忙解释下。【截图信息】【日志信息】(可选,上传日志内容或者附件)
-
1.华为云MRS二次开发介绍二次开发赋能视频汇总:https://bbs.huaweicloud.com/forum/thread-90936-1-1.html学习材料《FusionInsight MRS二次开发样例.pdf》2.华为云MRS技术对接介绍FusionInsight MRS生态地图:https://fusioninsight.github.io/ecosystem/zh-hans/学习材料:《FusionInsight MRS技术生态介绍v3.pdf》3.华为云MRS运维调优介绍运维汇总FusionInsight MRS运维HDFS/Hive问题定位解决https://bbs.huaweicloud.com/videos/103220FusionInsight MRS运维HBase/Spark问题定位解决https://bbs.huaweicloud.com/videos/103222FusionInsight MRS运维ES问题定位解决https://bbs.huaweicloud.com/videos/103221调优汇总FusionInsight MRS Hive调优https://bbs.huaweicloud.com/videos/103825FusionInsight MRS Spark调优https://bbs.huaweicloud.com/videos/103830FusionInsight MRS HBase调优https://bbs.huaweicloud.com/videos/103824FusionInsight MRS ES调优https://bbs.huaweicloud.com/videos/103822FusionInsight MRS Kafka调优https://bbs.huaweicloud.com/videos/103827FusionInsight MRS Solr调优https://bbs.huaweicloud.com/videos/103829
-
1. 华为云原生数据湖MRS基线方案介绍学习材料《FusionInsight MRS云原生数据湖基线方案--离线数据湖.pdf》《FusionInsight MRS云原生数据湖基线方案--实时数据湖.pdf》《FusionInsight MRS云原生数据湖基线方案--逻辑数据湖.pdf》《FusionInsight MRS云原生数据湖基线方案--专题集市.pdf》2. FusionInsight MRS Hudi最佳实践视频介绍参考博文Hudi最佳实践材料材料链接华为MRS基于Hudi和HetuEngine构建实时数据湖最佳实践https://bbs.huaweicloud.com/blogs/290858华为FusionInsight MRS实战 - Hudi实时入湖之DeltaStreamer工具最佳实践https://bbs.huaweicloud.com/blogs/2893153. FusionInisght MRS CDL最佳实践CDL最佳实践材料材料链接华为FusionInsight MRS CDL使用指南https://bbs.huaweicloud.com/forum/forum.php?mod=viewthread&tid=152937华为FusionInsight MRS CDL最新版本使用指南https://bbs.huaweicloud.com/forum/thread-167340-1-1.html华为MRS CDL最新版本使用指南 - hudi实时入湖实战https://bbs.huaweicloud.com/forum/thread-167671-1-1.html4. FusionInisght MRS Flink最佳实践Flink最佳实践材料材料链接华为FusionInsight MRS Flink客户端配置https://bbs.huaweicloud.com/forum/thread-175741-1-1.html华为FusionInsight MRS Flink SQL-Client客户端配置https://bbs.huaweicloud.com/forum/thread-176103-1-1.html华为FusionInsight MRS FlinkSQL 复杂嵌套Json解析最佳实践https://bbs.huaweicloud.com/forum/forum.php?mod=viewthread&tid=153494华为FusionInsight MRS实战 - 使用CDL, FlinkSQL以及Hudi实时入湖https://bbs.huaweicloud.com/forum/forum.php?mod=viewthread&tid=153823华为FusionInsight MRS实战 - 使用FlinkSQL处理数据并使用redis做实时展示https://bbs.huaweicloud.com/forum/forum.php?mod=viewthread&tid=154299华为FusionInsight MRS实战 - Flink增强特性之可视化开发平台FlinkSever开发学习https://bbs.huaweicloud.com/forum/forum.php?mod=viewthread&tid=161992华为FusionInsight MRS实战 - FlinkSQL从kafka写入hivehttps://bbs.huaweicloud.com/forum/thread-173950-1-1.html华为FusionInsight MRS实战 - 使用Flink SQL-Client连接hivehttps://bbs.huaweicloud.com/forum/thread-176154-1-1.html华为FusionInsight MRS实战 - Flink CDC特性学习https://bbs.huaweicloud.com/forum/thread-176113-1-1.html5. FusionInsight MRS HetuEngine介绍及最佳实践HetuEngine专场学习直播回放:https://www.huaweicloud.com/about/live/HetuEngine.html视频介绍HetuEngine动手实践材料材料链接HetuEngine学习1-创建HBase数据源并且构建样例表 https://bbs.huaweicloud.com/forum/thread-147626-1-1.htmlHetuEngine学习2-创建hive样例数据并且和hbase做跨源融合分析https://bbs.huaweicloud.com/forum/thread-147719-1-1.htmlHetuEngine学习3-创建dws数据源并和hive做跨仓融合分析https://bbs.huaweicloud.com/forum/thread-147732-1-1.htmlHetuEngine学习4-Jmeter压测工具使用之HetuEngine压力测试https://bbs.huaweicloud.com/forum/thread-141244-1-1.html6. FusionInsight MRS ClickHouse动手实践视频介绍ClickHouse动手实践材料材料链接MRS Clickhouse 学习01-如何创建复制表以及分布式表并导入数据https://bbs.huaweicloud.com/forum/thread-148243-1-1.html7. FusionInsight MRS Manager Rest接口学习华为FusionInsight MRS实战 - Manager rest接口基础学习https://bbs.huaweicloud.com/forum/thread-175716-1-1.html华为FusionInsight MRS实战 - Manager rest接口进阶学习https://bbs.huaweicloud.com/forum/thread-175718-1-1.html8. 常见问题答疑openlookeng官网:https://openlookeng.io/
-
【功能模块】【操作步骤&问题现象】 500节点的HD 集群,各种GC参数设置多大合适?主要涉及Namenode、DataNode、MetaStore、Spark2x等等【截图信息】【日志信息】(可选,上传日志内容或者附件)
-
1.华为云原生数据湖MRS关键技术介绍材料:《华为云Stack 8.0.3 FusionInsight MRS云原生数据湖 技术主打.pdf》课程视频: 2.优势poc用例最佳实践2.1. MRS多租户介绍及操作实践多租户特性介绍:多租户实操材料材料连接MRS多租户学习1-资源共享和抢占https://bbs.huaweicloud.com/forum/thread-147066-1-1.htmlMRS多租户学习2-用户权重配置以及资源抢占https://bbs.huaweicloud.com/forum/thread-147420-1-1.htmlMRS多租户学习3-资源池配置及使用https://bbs.huaweicloud.com/forum/thread-147441-1-1.html2.2 MRS TPC-DS 测试工具操作实践MRS TPC-DS 测试工具学习材料:https://bbs.huaweicloud.com/forum/forum.php?mod=viewthread&tid=146814&page=1&authorid=&replytype=&extra=#pid1286986 2.3 MRS Ranger介绍及操作实践Ranger特性学习材料:Ranger动手操作视频MRS ranger操作视频 ElasticSearchhttps://v.qq.com/x/page/m3259j8lzqe.htmlMRS Ranger操作视频 HetuEnginehttps://v.qq.com/x/page/s3259i43cs7.htmlMRS Ranger操作视频 Hivehttps://v.qq.com/x/page/g3259fwpihf.htmlMRS Ranger操作视频 Kafkahttps://v.qq.com/x/page/g3259dahl8q.htmlMRS Ranger操作视频 Spark2xhttps://v.qq.com/x/page/h3259de5aul.htmlMRS Ranger操作视频 Yarnhttps://v.qq.com/x/page/j3259jt9q0m.htmlMRS Ranger操作视频 HDFShttps://v.qq.com/x/page/c3261lhxugz.html3.常见问题答疑安装问题答疑:安装问题论坛帖:https://bbs.huaweicloud.com/forum/thread-146731-1-1.html https://bbs.huaweicloud.com/forum/thread-146999-1-1.html HetuEngine相比Hive查询加速问题HetuEngine使用如下特点保证计算查询的快速1. MPP架构2. 计算下推3. 预先启动4. 资源自己管理(spark是交给yarn管理)5. 动态过滤6. 小表广播
-
1.华为云原生数据湖MRS关键技术介绍2.优势poc用例最佳实践2.1. MRS多租户介绍及操作实践2.2 MRS TPC-DS 测试工具操作实践2.3 MRS Ranger介绍及操作实践
-
【功能模块】 FusionInsight MRS 安装安装报错。在192上操作时报错【操作步骤&问题现象】也可以在192上具体查看下。对外浮动IP om_float_ip = 可以随意填吗?还是要填写真是的? 这三台机器上有设置浮动IP吗?还是操作时自行设置?对外时钟NTP服务器 可为空吗?PPT和文档上的内容很多,能不能针对这个3台服务器,整理一个可以具体操作的安装步骤文档?这样更方便上级操作。【截图信息】【日志信息】(可选,上传日志内容或者附件)
-
## 准备条件 1.集群已经安装完成并正常运行。 2.已经在集群Manager平台上为测试任务添加了一个人机用户,属组为hive、hadoop,supergroup,主组为supergroup。假设用户名为developuser,用户认证成功。 ``` cd /opt/hadoopclient/ source bigdata_env kinit developuser ``` 3.集群中Hive服务及依赖服务正常。 4.Yarn上提交任务的资源配置参数可以根据实际环境情况做调整。 修改Yarn配置: yarn.nodemanager.resource.cpu-vcores 可分配给container的CPU核数。 yarn.nodemanager.resource.memory-mb 表示该节点上YARN可使用的物理内存总量,默认为8192,单位MB。建议配置成节点物理内存总量的75%-90%。若该节点有其他业务的常驻进程,请降低此参数值给该进程预留足够运行资源。 客户端软件已经正确安装到客户端节点上。 ## 开始使用 1.获取mrs-test-demo.zip测试工具,请登录support.huawei.com直接搜索包名,注意对应版本,这里选用3.1.1版本。将工具包解压,选择hive_tpcds_tools_performence,上传到安装客户端的服务器/opt目录下。工具具体路径为 /opt/mrs-test-demo/basic-pack/hive_tpcds_tools_performence 2.修改权限(注意权限修改切勿将整个opt目录下的权限修改掉) ``` cd /opt/mrs-test-demo chmod -R 770 * ``` 3.准备tpcds造数工具,登录路径/opt/mrs-test-demo/basic-pack/hive_tpcds_tools_performence/tpcds-gen使用idea工具打开该路径的源码,编译工程,得到tpcds-gen-1.0-SNAPSHOT.jar, 存放路径为/opt/mrs-test-demo/basic-pack/hive_tpcds_tools_performence/tpcds-gen/target。 并且检查该路径下是否有lib目录包,里面为其他的依赖工具。 4.进入客户端安装目录,初始化环境变量。 ``` source /opt/hadoopclient/bigdata_env kinit developuser ``` 5.执行造数据脚本,造数据过程中需要等待的具体时间由当前环境的配置决定(数据量可以自行调整根据环境需求,本次执行2G。)。 ``` cd /opt/mrs-test-demo/basic-pack/hive_tpcds_tools_performence sh tpcds-setup-hive.sh 2 orc /opt/hadoopclient/ developuser 321@iewauH /tmp/hivedata ```  6.查询hdfs上生成的数据及需要测试的Hive表数。 ``` hdfs dfs -du -h /user/hive/warehouse/tpcds_bin_partitioned_orc_2.db/ ```  7.进入到sqlAll目录下将需要的sql复制到sample-queries-tpcds目录下,如备注中的基线指标sql。 ``` cd /opt/mrs-test-demo/basic-pack/hive_tpcds_tools_performence/sqlAll cp query41.sql query43.sql ../sample-queries-tpcds ``` 8.在/opt/mrs-test-demo/basic-pack/hive_tpcds_tools_performence目录下执行运行脚本。 ``` ./tpcds-run-hive.sh 2 orc /opt/hadoopclient/ developuser 321@iewauH ```  9.时间统计脚本统计时间sql_time.sh,脚本内容如下 ``` #!/bin/bash BASE_DIR=$1 num=0.0 for logfile in ${BASE_DIR}/*.log do result=`grep -Rns "selected (" $logfile | tail -1 | grep -Eo '[(](.*)[) seconds]' | grep -Eo '[0-9]+[.]*[0-9]+'` if [ -z $result ]; then continue; fi echo "****${logfile##*/} ${result}s****" num=$(echo "$num + $result"|bc) done echo "total cost time:${num}s" ``` 使用如下启动脚本 `sh sql_time.sh /opt/mrs-test-demo/basic-pack/hive_tpcds_tools_performence/log/tpcds_bin_partitioned_orc_2/querylog` 
-
下载最新的认证文件后依旧认证失败,修过过服务器时间后提示时钟未同步,求解决方案。
上滑加载中
推荐直播
-
华为开发者空间玩转DeepSeek
2025/03/13 周四 19:00-20:30
马欣 山东商业职业技术学院云计算专业讲师,山东大学、山东建筑大学等多所本科学校学生校外指导老师
同学们,想知道如何利用华为开发者空间部署自己的DeepSeek模型吗?想了解如何用DeepSeek在云主机上探索好玩的应用吗?想探讨如何利用DeepSeek在自己的专有云主机上辅助编程吗?让我们来一场云和AI的盛宴。
即将直播 -
华为云Metastudio×DeepSeek与RAG检索优化分享
2025/03/14 周五 16:00-17:30
大海 华为云学堂技术讲师 Cocl 华为云学堂技术讲师
本次直播将带来DeepSeek数字人解决方案,以及如何使用Embedding与Rerank实现检索优化实践,为开发者与企业提供参考,助力场景落地。
去报名
热门标签