• [问题求助] 想问一下 华为版 fi 的flink 重点构建的 代码 DataStream,有没有开源GitHub , 有分支开源可以看的
    想问一下  华为版 fi 的flink  重点构建的 代码 ,有没有开源GitHub , 有分支开源可以看的代码吗? Flink在当前版本中重点构建如下特性:DataStreamCheckpoint窗口Job Pipeline配置表其他特性继承开源社区,不做增强,具体请参 :https://ci.apache.org/projects/flink/flink-docs-release-1.12/。
  • [问题求助] 我们要调 flink sql 例子, huaweicloud-mrs-example有好几个分支,怎么知道用哪个分支?
    【功能模块】我们要调 flink sql 例子, huaweicloud-mrs-example有好几个分支,怎么知道用哪个分支?【操作步骤&问题现象】1、 导入了   https://github.com/huaweicloud/huaweicloud-mrs-example/branches   的   mrs-3.0.2Updated last mont   分支 Compar2、   测试     flink  run  -m  yarn-cluster   --class com.huawei.bigdata.flink.examples.JavaStreamSqlExample   ./../tmp/s.jar测试  com.huawei.bigdata.flink.examples.JavaStreamSqlExample的 例子,不通过  【截图信息】【日志信息】(可选,上传日志内容或者附件)bin/flink run --class com.huawei.bigdata.flink.examples.JavaStreamSqlExample <path of StreamSqlExample jar>*********************************************************************************java.lang.NoClassDefFoundError: org/apache/flink/table/factories/StreamTableSourceFactory        at java.lang.ClassLoader.defineClass1(Native Method)        at java.lang.ClassLoader.defineClass(ClassLoader.java:763)        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)        at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)        at java.net.URLClassLoader.access$100(URLClassLoader.java:74)        at java.net.URLClassLoader$1.run(URLClassLoader.java:369)        at java.net.URLClassLoader$1.run(URLClassLoader.java:363)        at java.security.AccessController.doPrivileged(Native Method)        at java.net.URLClassLoader.findClass(URLClassLoader.java:362)        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)        at java.lang.ClassLoader.defineClass1(Native Method)        at java.lang.ClassLoader.defineClass(ClassLoader.java:763)        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)        at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)        at java.net.URLClassLoader.access$100(URLClassLoader.java:74)        at java.net.URLClassLoader$1.run(URLClassLoader.java:369)        at java.net.URLClassLoader$1.run(URLClassLoader.java:363)        at java.security.AccessController.doPrivileged(Native Method)        at java.net.URLClassLoader.findClass(URLClassLoader.java:362)        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)        at java.lang.ClassLoader.loadClass(ClassLoader.java:411)        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)        at java.lang.Class.forName0(Native Method)        at java.lang.Class.forName(Class.java:348)        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)        at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)        at java.util.ServiceLoader$1.next(ServiceLoader.java:480)        at java.util.Iterator.forEachRemaining(Iterator.java:116)        at org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)        at org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:170)        at org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125)        at org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48)        at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:143)        at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:118)        at org.apache.flink.table.api.java.StreamTableEnvironment.create(StreamTableEnvironment.java:112)        at com.huawei.bigdata.flink.examples.JavaStreamSqlExample.main(JavaStreamSqlExample.java:32)        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)        at java.lang.reflect.Method.invoke(Method.java:498)        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:430)        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:814)        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:288)        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1051)        at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1127)        at java.security.AccessController.doPrivileged(Native Method)        at javax.security.auth.Subject.doAs(Subject.java:422)        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1127)Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  • [问题求助] FI 的 flink 1.7版本的 flink sql 推荐使用什么方法, 推荐用哪个工程做StreamSQLExampl
    FI  的 flink 1.7版本的 flink  sql  推荐使用什么方法, 推荐用哪个工程做StreamSQLExample.,是否直接从 GitHub 官网的 flink 下载 源码, 直接整 StreamSQLExample  ???
  • [问题求助] 编译flink 源码 打算,运行 StreamSQLExample ,不通过
    编译flink 源码 打算,运行 StreamSQLExample ,不通过JPS incremental annotation processing is disabled. Compilation results on partial recompilation may be inaccurate. Use build process "jps.track.ap.dependencies" VM flag to enable/disable incremental annotation processing environmentjava: 警告: 源发行版 11 需要目标发行版 11
  • [问题求助] Flink SQL&gt; SELECT &apos;Hello World&apos;; 最简单的 sql测试都不行?
    Flink SQL> SELECT 'Hello World';>[ERROR] Could not execute SQL statement. Reason:java.lang.NoSuchMethodError: scala.collection.JavaConversions$.deprecated$u0020seqAsJavaList(Lscala/collection/Seq;)Ljava/util/List;
  • [问题求助] DLI flink中可以写解密过程对数据解密么 怎么操作
    数据直接接入的dis,是加密好的,再用dli flink去处理dis的数据时怎么写入解密过程 求一个详细的解决方案。
  • [问题求助] 调fi flink 的 ,sql-client.sh 报错了 NoSuchFieldError: PYFILES_OPTIO
    添加了jar包之后,还是报错了  NoSuchFieldError: PYFILES_OPTION/FusionInsight/client/HDFS/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]Exception in thread "main" java.lang.NoSuchFieldError: PYFILES_OPTION        at org.apache.flink.table.client.cli.CliOptionsParser.getEmbeddedModeClientOptions(CliOptionsParser.java:156)        at org.apache.flink.table.client.cli.CliOptionsParser.<clinit>(CliOptionsParser.java:139)        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:195)
  • [问题求助] [ERROR] Flink SQL Client JAR file &apos;flink-sql-client*.jar&apos;这个报错是?
     接着上一个还是   FusionInsight/client/Flink的版本[ERROR] Flink SQL Client JAR file 'flink-sql-client*.jar'这个报错是?./bin/sql-client.sh embedded[ERROR] Flink SQL Client JAR file 'flink-sql-client*.jar' neither found in classpath nor /opt directory should be located in /xxxx/xxxFusionInsightxxx/xxclient/Flink/flink/opt.还要额外放入jar包.  才能跑起来吗?   
  • [问题求助] 华为版本 fi的 flink 直接运行start-cluster.sh 报错了, 是不是不用直接启动?
      华为版本 fi的 flink    直接运行start-cluster.sh 报错了, 是不是不用直接启动?       ./start-cluster.shNo masters file. Please specify masters in 'conf/masters'.默认的路径下载有flink配置  /srv/BigData/data1/FusionInsight/client/Flink/flink/bin    ,启动./start-cluster.sh 报错了  No masters file. Please specify masters in 'conf/masters'.求支援!
  • [赋能学习] MRS3.0.2版本 二次开发(18/27): Flink读写Kafka样例
    # 视频在本帖二楼 ## 一、Flink简介 Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。 ## 二、Flink客户端准备,可参考MRS二次开发(15/27)Flink构造DataStream样例,有详细的视频介绍 **Kerberos认证** 从服务器上下载用户keytab,并将keytab放到Flink客户端所在主机的某个文件夹下。 修改客户端配置文件/opt/client/Flink/flink/conf/flink-conf.yaml内容: ``` keytab路径。例如:security.kerberos.login.keytab: /opt/testclient/flinkuser.keytab principal名。例如:security.kerberos.login.principal: flinkuser 对于HA模式,如果配置了ZooKeeper,还需要设置ZK kerberos认证相关的配置。配置如下: zookeeper.sasl.disable: false security.kerberos.login.contexts: Client 如果用户对于Kafka client和Kafka broker之间也需要做kerberos认证,配置如下: security.kerberos.login.contexts: Client,KafkaClient ``` **Security Cookie认证** 1. 用户需要获取SSL证书,放置到Flink客户端中。 参考 拷贝其中代码生成generate_keystore.sh脚本,放置在Flink客户端的bin目录下。 在客户端目录下执行source bigdata_env。 执行命令“sh generate_keystore.sh ”即可,例如“sh generate_keystore.sh 123456",会在Flink客户端的conf目录下生成flink.keystore,flink.truststore文件。 在Flink客户端同级目录下新建ssl目录,例如“/opt/testclient/Flink/flink/ssl”,将生成的flink.keystore,flink.truststore文件拷贝到ssl目录中。 1. 获取证书后,在Flink客户端的conf目录下配置文件“flink-conf.yaml”中将以下配置项进行赋值。 将配置项“security.ssl.keystore”设置为keystore文件的相对路径,例如“ssl/flink.keystore” 将配置项“security.ssl.truststore”设置为truststore文件的相对路径,例如“ssl/flink.truststore” 将配置项“security.cookie”设置为一串密码,该密码可以是一串随机规则密码,可以取默认 使用Manager明文加密API进行获取密文:curl -k -i -u 用户名:密码 -X POST -HContent-type:application/json -d '{"plainText":""}' '';其中要与签发证书时使用的密码一致,x.x.x.x为集群Manager的浮动IP。 将配置项“security.ssl.key-password”、“security.ssl.keystore-password”和“security.ssl.truststore-password”设置为如上指令获取到的密文。 2. 打开“Security Cookie”开关,配置“security.enable: true”,查看“security cookie”是否已配置成功。 #### 加密传输 配置SSL传输,用户主要在客户端的“flink-conf.yaml”文件中做如下配置: 1. 打开SSL开关和设置SSL加密算法,配置如下: security.ssl.enabled:true akka.ssl.enabled: true blob.service.ssl.enabled: true taskmanager.data.ssl.enabled: true security.ssl.algorithms: TLS_DHE_RSA_WITH_AES_128_CBC_SHA256,TLS_DHE_DSS_WITH_AES_128_CBC_SHA256 2. 配置keystore或truststore文件路径为相对路径时,Flink Client执行命令的目录需要可以直接访问该相对路径。 在Flink的CLI yarn-session.sh命令中增加“-t”选项来传输keystore和truststore文件到各个执行节点。例如“./bin/yarn-session.sh **-t** ssl/ ” 在Flink run命令中增加“-yt”选项来传输keystore和truststore文件到各个执行节点。例如“./bin/flink run **-yt** ssl/ -ys 3 -yn 3 -m yarn-cluster -c com.huawei.SocketWindowWordCount ../lib/flink-eg-1.0.jar --hostname r3-d3 --port 9000” 3. 配置keystore或truststore文件路径为绝对路径时,需要在Flink Client以及Yarn各个节点的该绝对路径上放置keystore或truststore文件。 执行命令中不需要使用“-t”或“-yt”来传输keystore和truststore文件。 4. 将客户端安装节点的业务ip和manager界面浮动ip追加到jobmanager.web.allow-access-address配置中用“,”隔开 ## 三、样例背景 假定用户有某个网站周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Flink的DataStream应用程序实现如下功能: 实时统计总计网购时间超过2个小时的女性网民信息。周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志。该日志文件在该样例程序中的data目录下获取。 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 log2.txt:周日网民停留日志。该日志文件在该样例程序中的data目录下获取。 LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 ## 四、样例调试 前提:Linux环境有安装集群客户端,环境准备,参考第一课 数据规划: Flink样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户),并从Kafka组件接收数据。确保集群安装完成,包括HDFS、Yarn、Flink和Kafka。 1. 创建Topic。 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true” 用户使用Linux命令行创建topic,执行命令前需要使用kinit命令进行人机认证,如:kinit flinkuser。 在kafka客户端目录执行如下指令,ZooKeeper集群信息以自己的为准: ``` bin/kafka-topics.sh --create --zookeeper 10.244.231.91:24002,10.244.230.245:24002,10.244.230.229:24002/kafka --partitions 5 --replication-factor 1 --topic topic3 ``` 2. Kerberos认证配置 修改flink客户端flink-conf.yaml如下配置: security.kerberos.login.keytab: /opt/testclient/user.keytab security.kerberos.login.principal: flinkuser security.kerberos.login.contexts: Client,KafkaClient security.kerberos.login.use-ticket-cache: false 调试步骤: 1. 比对“本地时间和Linux机器时间”与集群时间误都不能超过5分钟 2. 检查linux环境的JDK版本为1.8 3. 配置linux环境的/etc/hosts文件 4. 检查 C:\Windows\System32\drivers\etc\hosts文件中是否包含所有集群节点的域名IP映射信息 5. 在IDEA打开样例代码的FlinkKafkaJavaExample目录,检查SDK配置 6. 默认自动加载依赖,如未加载,则打开后选中pom.xml文件,右键点击“Add As Maven Project”后等待项目自动将依赖下载完毕 7. 在IDEA主页面,选择“File > Project Structures...”进入“Project Structure”页面。 8. 在“Project Structure”页面,选择“Artifacts”,单击“+”并选择“JAR > Empty”。 9. 根据实际情况设置Jar包的名称、类型以及输出路径。 10. 选择“Put into Output Root”。然后单击“Apply”。 11. 在IDEA主页面,选择“Build > Build Artifacts...”。在弹出的菜单中构建我们的jar包 12. 从IDEA项目out目录下的获取到Jar包,拷贝到Flink客户端目录,如“/opt/testclient/Flink/flink”。 13. 将如下包拷到"/opt/testclient/Flink/flink/lib"目录下,若已存在则忽略。 - kafka-clients-2.4.0-hw-ei-302002.jar - flink-connector-kafka_2.11-1.10.0-hw-ei-302002.jar - flink-connector-kafka-base_2.11-1.10.0-hw-ei-302002.jar - flink-dist_2.11-1.10.0-hw-ei-302002.jar 14. 在Linux环境中运行Flink应用程序,需要先启动Flink集群。在Flink客户端下执行yarn session命令,启动flink集群。 例如我们使用ssl相对路径指令为“bin/yarn-session.sh **-t** ssl/ -jm 1024 -tm 1024” 15. 在终端另开一个窗口,进入Flink客户端目录,调用bin/flink run脚本运行代码,例如: a. 生产数据的执行命令启动程序: ``` bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/testclient/Flink/flink/FlinkKafkaJavaExample.jar --topic topic3 -bootstrap.servers 10.244.230.213:21007,10.244.231.48:21007,10.244.231.130:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop_arm_802.com ``` b. 消费数据 ``` bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/testclient/Flink/flink/FlinkKafkaJavaExample.jar --topic topic3 -bootstrap.servers 10.244.230.213:21007,10.244.231.48:21007,10.244.231.130:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop_arm_802.com ``` 16. 使用Flink Web页面查看Flink应用程序运行情况。 ## 五、问题互动渠道 FusonInsight 论坛入口 https://bbs.huaweicloud.com/forum/forum-1103-1.html
总条数:140 到第
上滑加载中