-
【功能模块】bin/flink run --class com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin /opt/Flink_test/flink-examples-1.0.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:21005运行测试 例子 com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin运行不通过 【操作步骤&问题现象】1、2、【截图信息】【日志信息】(可选,上传日志内容或者附件)--sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei******************************************************************************************<topic> is the kafka topic name<bootstrap.servers> is the ip:port list of brokers******************************************************************************************java.lang.NoClassDefFoundError: org/apache/kafka/clients/producer/Callback at com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin.main(WriteIntoKafka4SQLJoin.java:53) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:430) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:814) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:288) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1051) at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1127) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1127)Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.producer.Callback at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 18 more
-
想问一下 华为版 fi 的flink 重点构建的 代码 ,有没有开源GitHub , 有分支开源可以看的代码吗? Flink在当前版本中重点构建如下特性:DataStreamCheckpoint窗口Job Pipeline配置表其他特性继承开源社区,不做增强,具体请参 :https://ci.apache.org/projects/flink/flink-docs-release-1.12/。
-
【功能模块】我们要调 flink sql 例子, huaweicloud-mrs-example有好几个分支,怎么知道用哪个分支?【操作步骤&问题现象】1、 导入了 https://github.com/huaweicloud/huaweicloud-mrs-example/branches 的 mrs-3.0.2Updated last mont 分支 Compar2、 测试 flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.JavaStreamSqlExample ./../tmp/s.jar测试 com.huawei.bigdata.flink.examples.JavaStreamSqlExample的 例子,不通过 【截图信息】【日志信息】(可选,上传日志内容或者附件)bin/flink run --class com.huawei.bigdata.flink.examples.JavaStreamSqlExample <path of StreamSqlExample jar>*********************************************************************************java.lang.NoClassDefFoundError: org/apache/flink/table/factories/StreamTableSourceFactory at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) at java.net.URLClassLoader.access$100(URLClassLoader.java:74) at java.net.URLClassLoader$1.run(URLClassLoader.java:369) at java.net.URLClassLoader$1.run(URLClassLoader.java:363) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:362) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) at java.net.URLClassLoader.access$100(URLClassLoader.java:74) at java.net.URLClassLoader$1.run(URLClassLoader.java:369) at java.net.URLClassLoader$1.run(URLClassLoader.java:363) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:362) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) at java.lang.ClassLoader.loadClass(ClassLoader.java:411) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370) at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) at java.util.ServiceLoader$1.next(ServiceLoader.java:480) at java.util.Iterator.forEachRemaining(Iterator.java:116) at org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214) at org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:170) at org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125) at org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48) at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:143) at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:118) at org.apache.flink.table.api.java.StreamTableEnvironment.create(StreamTableEnvironment.java:112) at com.huawei.bigdata.flink.examples.JavaStreamSqlExample.main(JavaStreamSqlExample.java:32) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:430) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:814) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:288) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1051) at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1127) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1127)Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
-
FI 的 flink 1.7版本的 flink sql 推荐使用什么方法, 推荐用哪个工程做StreamSQLExample.,是否直接从 GitHub 官网的 flink 下载 源码, 直接整 StreamSQLExample ???
-
编译flink 源码 打算,运行 StreamSQLExample ,不通过JPS incremental annotation processing is disabled. Compilation results on partial recompilation may be inaccurate. Use build process "jps.track.ap.dependencies" VM flag to enable/disable incremental annotation processing environmentjava: 警告: 源发行版 11 需要目标发行版 11
-
Flink SQL> SELECT 'Hello World';>[ERROR] Could not execute SQL statement. Reason:java.lang.NoSuchMethodError: scala.collection.JavaConversions$.deprecated$u0020seqAsJavaList(Lscala/collection/Seq;)Ljava/util/List;
-
添加了jar包之后,还是报错了 NoSuchFieldError: PYFILES_OPTION/FusionInsight/client/HDFS/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]Exception in thread "main" java.lang.NoSuchFieldError: PYFILES_OPTION at org.apache.flink.table.client.cli.CliOptionsParser.getEmbeddedModeClientOptions(CliOptionsParser.java:156) at org.apache.flink.table.client.cli.CliOptionsParser.<clinit>(CliOptionsParser.java:139) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:195)
-
接着上一个还是 FusionInsight/client/Flink的版本[ERROR] Flink SQL Client JAR file 'flink-sql-client*.jar'这个报错是?./bin/sql-client.sh embedded[ERROR] Flink SQL Client JAR file 'flink-sql-client*.jar' neither found in classpath nor /opt directory should be located in /xxxx/xxxFusionInsightxxx/xxclient/Flink/flink/opt.还要额外放入jar包. 才能跑起来吗?
-
华为版本 fi的 flink 直接运行start-cluster.sh 报错了, 是不是不用直接启动? ./start-cluster.shNo masters file. Please specify masters in 'conf/masters'.默认的路径下载有flink配置 /srv/BigData/data1/FusionInsight/client/Flink/flink/bin ,启动./start-cluster.sh 报错了 No masters file. Please specify masters in 'conf/masters'.求支援!
-
# 视频在本帖二楼 ## 一、Flink简介 Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。 ## 二、Flink客户端准备,可参考MRS二次开发(15/27)Flink构造DataStream样例,有详细的视频介绍 **Kerberos认证** 从服务器上下载用户keytab,并将keytab放到Flink客户端所在主机的某个文件夹下。 修改客户端配置文件/opt/client/Flink/flink/conf/flink-conf.yaml内容: ``` keytab路径。例如:security.kerberos.login.keytab: /opt/testclient/flinkuser.keytab principal名。例如:security.kerberos.login.principal: flinkuser 对于HA模式,如果配置了ZooKeeper,还需要设置ZK kerberos认证相关的配置。配置如下: zookeeper.sasl.disable: false security.kerberos.login.contexts: Client 如果用户对于Kafka client和Kafka broker之间也需要做kerberos认证,配置如下: security.kerberos.login.contexts: Client,KafkaClient ``` **Security Cookie认证** 1. 用户需要获取SSL证书,放置到Flink客户端中。 参考 拷贝其中代码生成generate_keystore.sh脚本,放置在Flink客户端的bin目录下。 在客户端目录下执行source bigdata_env。 执行命令“sh generate_keystore.sh ”即可,例如“sh generate_keystore.sh 123456",会在Flink客户端的conf目录下生成flink.keystore,flink.truststore文件。 在Flink客户端同级目录下新建ssl目录,例如“/opt/testclient/Flink/flink/ssl”,将生成的flink.keystore,flink.truststore文件拷贝到ssl目录中。 1. 获取证书后,在Flink客户端的conf目录下配置文件“flink-conf.yaml”中将以下配置项进行赋值。 将配置项“security.ssl.keystore”设置为keystore文件的相对路径,例如“ssl/flink.keystore” 将配置项“security.ssl.truststore”设置为truststore文件的相对路径,例如“ssl/flink.truststore” 将配置项“security.cookie”设置为一串密码,该密码可以是一串随机规则密码,可以取默认 使用Manager明文加密API进行获取密文:curl -k -i -u 用户名:密码 -X POST -HContent-type:application/json -d '{"plainText":""}' '';其中要与签发证书时使用的密码一致,x.x.x.x为集群Manager的浮动IP。 将配置项“security.ssl.key-password”、“security.ssl.keystore-password”和“security.ssl.truststore-password”设置为如上指令获取到的密文。 2. 打开“Security Cookie”开关,配置“security.enable: true”,查看“security cookie”是否已配置成功。 #### 加密传输 配置SSL传输,用户主要在客户端的“flink-conf.yaml”文件中做如下配置: 1. 打开SSL开关和设置SSL加密算法,配置如下: security.ssl.enabled:true akka.ssl.enabled: true blob.service.ssl.enabled: true taskmanager.data.ssl.enabled: true security.ssl.algorithms: TLS_DHE_RSA_WITH_AES_128_CBC_SHA256,TLS_DHE_DSS_WITH_AES_128_CBC_SHA256 2. 配置keystore或truststore文件路径为相对路径时,Flink Client执行命令的目录需要可以直接访问该相对路径。 在Flink的CLI yarn-session.sh命令中增加“-t”选项来传输keystore和truststore文件到各个执行节点。例如“./bin/yarn-session.sh **-t** ssl/ ” 在Flink run命令中增加“-yt”选项来传输keystore和truststore文件到各个执行节点。例如“./bin/flink run **-yt** ssl/ -ys 3 -yn 3 -m yarn-cluster -c com.huawei.SocketWindowWordCount ../lib/flink-eg-1.0.jar --hostname r3-d3 --port 9000” 3. 配置keystore或truststore文件路径为绝对路径时,需要在Flink Client以及Yarn各个节点的该绝对路径上放置keystore或truststore文件。 执行命令中不需要使用“-t”或“-yt”来传输keystore和truststore文件。 4. 将客户端安装节点的业务ip和manager界面浮动ip追加到jobmanager.web.allow-access-address配置中用“,”隔开 ## 三、样例背景 假定用户有某个网站周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Flink的DataStream应用程序实现如下功能: 实时统计总计网购时间超过2个小时的女性网民信息。周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志。该日志文件在该样例程序中的data目录下获取。 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 log2.txt:周日网民停留日志。该日志文件在该样例程序中的data目录下获取。 LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 ## 四、样例调试 前提:Linux环境有安装集群客户端,环境准备,参考第一课 数据规划: Flink样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户),并从Kafka组件接收数据。确保集群安装完成,包括HDFS、Yarn、Flink和Kafka。 1. 创建Topic。 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true” 用户使用Linux命令行创建topic,执行命令前需要使用kinit命令进行人机认证,如:kinit flinkuser。 在kafka客户端目录执行如下指令,ZooKeeper集群信息以自己的为准: ``` bin/kafka-topics.sh --create --zookeeper 10.244.231.91:24002,10.244.230.245:24002,10.244.230.229:24002/kafka --partitions 5 --replication-factor 1 --topic topic3 ``` 2. Kerberos认证配置 修改flink客户端flink-conf.yaml如下配置: security.kerberos.login.keytab: /opt/testclient/user.keytab security.kerberos.login.principal: flinkuser security.kerberos.login.contexts: Client,KafkaClient security.kerberos.login.use-ticket-cache: false 调试步骤: 1. 比对“本地时间和Linux机器时间”与集群时间误都不能超过5分钟 2. 检查linux环境的JDK版本为1.8 3. 配置linux环境的/etc/hosts文件 4. 检查 C:\Windows\System32\drivers\etc\hosts文件中是否包含所有集群节点的域名IP映射信息 5. 在IDEA打开样例代码的FlinkKafkaJavaExample目录,检查SDK配置 6. 默认自动加载依赖,如未加载,则打开后选中pom.xml文件,右键点击“Add As Maven Project”后等待项目自动将依赖下载完毕 7. 在IDEA主页面,选择“File > Project Structures...”进入“Project Structure”页面。 8. 在“Project Structure”页面,选择“Artifacts”,单击“+”并选择“JAR > Empty”。 9. 根据实际情况设置Jar包的名称、类型以及输出路径。 10. 选择“Put into Output Root”。然后单击“Apply”。 11. 在IDEA主页面,选择“Build > Build Artifacts...”。在弹出的菜单中构建我们的jar包 12. 从IDEA项目out目录下的获取到Jar包,拷贝到Flink客户端目录,如“/opt/testclient/Flink/flink”。 13. 将如下包拷到"/opt/testclient/Flink/flink/lib"目录下,若已存在则忽略。 - kafka-clients-2.4.0-hw-ei-302002.jar - flink-connector-kafka_2.11-1.10.0-hw-ei-302002.jar - flink-connector-kafka-base_2.11-1.10.0-hw-ei-302002.jar - flink-dist_2.11-1.10.0-hw-ei-302002.jar 14. 在Linux环境中运行Flink应用程序,需要先启动Flink集群。在Flink客户端下执行yarn session命令,启动flink集群。 例如我们使用ssl相对路径指令为“bin/yarn-session.sh **-t** ssl/ -jm 1024 -tm 1024” 15. 在终端另开一个窗口,进入Flink客户端目录,调用bin/flink run脚本运行代码,例如: a. 生产数据的执行命令启动程序: ``` bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/testclient/Flink/flink/FlinkKafkaJavaExample.jar --topic topic3 -bootstrap.servers 10.244.230.213:21007,10.244.231.48:21007,10.244.231.130:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop_arm_802.com ``` b. 消费数据 ``` bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/testclient/Flink/flink/FlinkKafkaJavaExample.jar --topic topic3 -bootstrap.servers 10.244.230.213:21007,10.244.231.48:21007,10.244.231.130:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop_arm_802.com ``` 16. 使用Flink Web页面查看Flink应用程序运行情况。 ## 五、问题互动渠道 FusonInsight 论坛入口 https://bbs.huaweicloud.com/forum/forum-1103-1.html
上滑加载中
推荐直播
-
OpenHarmony应用开发之网络数据请求与数据解析
2025/01/16 周四 19:00-20:30
华为开发者布道师、南京师范大学泰州学院副教授,硕士研究生导师,开放原子教育银牌认证讲师
科技浪潮中,鸿蒙生态强势崛起,OpenHarmony开启智能终端无限可能。当下,其原生应用开发适配潜力巨大,终端设备已广泛融入生活各场景,从家居到办公、穿戴至车载。 现在,机会敲门!我们的直播聚焦OpenHarmony关键的网络数据请求与解析,抛开晦涩理论,用真实案例带你掌握数据访问接口,轻松应对复杂网络请求、精准解析Json与Xml数据。参与直播,为开发鸿蒙App夯实基础,抢占科技新高地,别错过!
回顾中 -
Ascend C高层API设计原理与实现系列
2025/01/17 周五 15:30-17:00
Ascend C 技术专家
以LayerNorm算子开发为例,讲解开箱即用的Ascend C高层API
回顾中
热门标签