• [问题求助] FusionInsight_HD_8.2.0.1产品,在Flink SQL客户端中select 'hello'报错KeeperErrorCode = ConnectionLoss for /flink_base/flink
    1.在flink sql client中执行sql  直接报错[ERROR] Could not execute SQL statement. Reason: org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /flink_base/flink 2.而且进入zookeeper中查询也是报错,求解求解[omm@192-168-0-82 zookeeper]$ pwd /opt/huawei/Bigdata/FusionInsight_HD_8.2.0.1/install/FusionInsight-Zookeeper-3.6.3/zookeeper [omm@192-168-0-82 zookeeper]$ bin/zkCli.sh -server 192.168.0.82:24002 Connecting to 192.168.0.82:24002 Welcome to ZooKeeper! JLine support is enabled  WATCHER::  WatchedEvent state:SyncConnected type:None path:null [zk: 192.168.0.82:24002(CONNECTING) 0] ls / KeeperErrorCode = Session closed because client failed to authenticate for / [zk: 192.168.0.82:24002(CONNECTED) 1] WATCHER::  WatchedEvent state:Disconnected type:None path:null  WATCHER::  WatchedEvent state:SyncConnected type:None path:null  WATCHER::  WatchedEvent state:Disconnected type:None path:null 后面是一直循环WATCHER:,flink-conf.yaml中的部分设置如下 flink.security.enable: true flinkserver.alarm.cert.skip: true flinkserver.host.ip: fs.output.always-create-directory: false fs.overwrite-files: false heartbeat.interval: 10000 heartbeat.timeout: 120000 high-availability.job.delay: 10 s high-availability.storageDir: hdfs://hacluster/flink/recovery high-availability.zookeeper.client.acl: creator high-availability.zookeeper.client.connection-timeout: 90000 high-availability.zookeeper.client.max-retry-attempts: 5 high-availability.zookeeper.client.retry-wait: 5000 high-availability.zookeeper.client.session-timeout: 90000 high-availability.zookeeper.client.tolerate-suspended-connections: true high-availability.zookeeper.path.root: /flink high-availability.zookeeper.path.under.quota: /flink_base high-availability.zookeeper.quorum: 192.168.0.82:24002,192.168.0.81:24002,192.168.0.80:24002 high-availability.zookeeper.quota.enabled: true high-availability: zookeeper yarn.application-attempts: 5 yarn.application-master.port: 32586-32650 yarn.heap-cutoff-min: 384 yarn.heap-cutoff-ratio: 0.25 yarn.heartbeat-delay: 5 yarn.heartbeat.container-request-interval: 500 yarn.maximum-failed-containers: 5 yarn.per-job-cluster.include-user-jar: ORDER zk.ssl.enabled: false zookeeper.clientPort.quorum: 192.168.0.82:24002,192.168.0.81:24002,192.168.0.80:24002 zookeeper.root.acl: OPEN zookeeper.sasl.disable: false zookeeper.sasl.login-context-name: Client zookeeper.sasl.service-name: zookeeper zookeeper.secureClientPort.quorum: 192.168.0.82:24002,192.168.0.81:24002,192.168.0.80:24002 
  • [问题求助] flinksql消费DRS过来的kafka消息,关键字怎么处理
    kafka消息格式,参考cid:link_0带有table、sql、old等关键字,flinksql建表时sql校验不通过,这种情况要如何处理
  • [技术干货] Flink(一)-基本概念【转】
    这篇文章主要按照以下思路,简单的交流一下Flink的基本概念和用途。自知资历尚浅,见闻有限,如有纰漏还望指正![点击并拖拽以移动]Flink 简介在当前的互联网用户,设备,服务等激增的时代下,其产生的数据量已不可同日而语了。各种业务场景都会有着大量的数据产生,如何对这些数据进行有效地处理是很多企业需要考虑的问题。以往我们所熟知的Map Reduce,Storm,Spark等框架可能在某些场景下已经没法完全地满足用户的需求,或者是实现需求所付出的代价,无论是代码量或者架构的复杂程度可能都没法满足预期的需求。新场景的出现催产出新的技术,Flink即为实时流的处理提供了新的选择。Apache Flink就是近些年来在社区中比较活跃的分布式处理框架,加上阿里在中国的推广,相信它在未来的竞争中会更具优势。 Flink的产生背景不过多介绍,感兴趣的可以Google一下。Flink相对简单的编程模型加上其高吞吐、低延迟、高性能以及支持exactly-once语义的特性,让它在工业生产中较为出众。相信正如很多博客资料等写的那样"Flink将会成为企业内部主流的数据处理框架,最终成为下一代大数据处理标准。" 2. Flink 架构中的服务类型下面是从Flink官网截取的一张架构图:[点击并拖拽以移动]在Flink运行时涉及到的进程主要有以下两个: JobManager:主要负责调度task,协调checkpoint已经错误恢复等。当客户端将打包好的任务提交到JobManager之后,JobManager就会根据注册的TaskManager资源信息将任务分配给有资源的TaskManager,然后启动运行任务。TaskManger从JobManager获取task信息,然后使用slot资源运行task; TaskManager:执行数据流的task,一个task通过设置并行度,可能会有多个subtask。 每个TaskManager都是作为一个独立的JVM进程运行的。他主要负责在独立的线程执行的operator。其中能执行多少个operator取决于每个taskManager指定的slots数量。Task slot是Flink中最小的资源单位。假如一个taskManager有3个slot,他就会给每个slot分配1/3的内存资源,目前slot不会对cpu进行隔离。同一个taskManager中的slot会共享网络资源和心跳信息。当然在Flink中并不是一个slot只可以执行一个task,在某些情况下,一个slot中也可能执行多个task,如下:[点击并拖拽以移动]一般情况下,flink都是默认允许共用slot的,即便不是相同的task,只要都是来同一个job即可。共享slot的好处有以下两点:当Job的最高并行度正好和flink集群的slot数量相等时,则不需要计算总的task数量。例如,最高并行度是6时,则只需要6个slot,各个subtask都可以共享这6个slot; 2. 共享slot可以优化资源管理。如下图,非资源密集型subtask source/map在不共享slot时会占用6个slot,而在共享的情况下,可以保证其他的资源密集型subtask也能使用这6个slot,保证了资源分配。[点击并拖拽以移动]Flink中的数据Flink中的数据主要分为两类:有界数据流(Bounded streams)和无界数据流(Unbounded streams)。 3.1 无界数据流顾名思义,无界数据流就是指有始无终的数据,数据一旦开始生成就会持续不断的产生新的数据,即数据没有时间边界。无界数据流需要持续不断地处理。 3.2 有界数据流相对而言,有界数据流就是指输入的数据有始有终。例如数据可能是一分钟或者一天的交易数据等等。处理这种有界数据流的方式也被称之为批处理:[点击并拖拽以移动]需要注意的是,我们一般所说的数据流是指数据集,而流数据则是指数据流中的数据。 4. Flink中的编程模型 4.1 编程模型在Flink,编程模型的抽象层级主要分为以下4种,越往下抽象度越低,编程越复杂,灵活度越高。[点击并拖拽以移动]这里先不一一介绍,后续会做详细说明。这4层中,一般用于开发的是第三层,即DataStrem/DataSetAPI。用户可以使用DataStream API处理无界数据流,使用DataSet API处理有界数据流。同时这两个API都提供了各种各样的接口来处理数据。例如常见的map、filter、flatMap等等,而且支持python,scala,java等编程语言,后面的demo主要以scala为主。 4.2 程序结构与其他的分布式处理引擎类似,Flink也遵循着一定的程序架构。下面以常见的WordCount为例:val env = ExecutionEnvironment.getExecutionEnvironment// get input data val text = env.readTextFile("/path/to/file")val counts = text.flatMap { _.toLowerCase.split("\W+") filter { .nonEmpty } } .map { (, 1) } .groupBy(0) .sum(1)counts.writeAsCsv(outputPath, "\n", " ")[点击并拖拽以移动]下面我们分解一下这个程序。第一步,我们需要获取一个ExecutionEnvironment(如果是实时数据流的话我们需要创建一个StreamExecutionEnvironment)。这个对象可以设置执行的一些参数以及添加数据源。所以在程序的main方法中我们都要通过类似下面的语句获取到这个对象:val env = ExecutionEnvironment.getExecutionEnvironment[点击并拖拽以移动]第二步,我们需要为这个应用添加数据源。这个程序中是通过读取文本文件的方式获取数据。在实际开发中我们的数据源可能有很多中,例如kafka,ES等等,Flink官方也提供了很多的connector以减少我们的开发时间。一般都是都通addSource方法添加的,这里是从文本读入,所以调用了readTextFile方法。当然我们也可以通过实现接口来自定义source。val text = env.readTextFile("/path/to/file")第三步,我们需要定义一系列的operator来对数据进行处理。我们可以调用Flink API中已经提供的算子,也可以通过实现不同的Function来实现自己的算子,这个我们会在后面讨论。这里我们只需要了解一般的程序结构即可。val counts = text.flatMap { _.toLowerCase.split("\W+") filter { .nonEmpty } } .map { (, 1) } .groupBy(0) .sum(1)[点击并拖拽以移动]上面的就是先对输入的数据进行分割,然后转换成(word,count)这样的Tuple,接着通过第一个字段进行分组,最后sum第二个字段进行聚合。第四步,数据处理完成之后,我们还要为它指定数据的存储。我们可以从外部系统导入数据,亦可以将处理完的数据导入到外部系统,这个过程称为Sink。同Connector类似,Flink官方提供了很多的Sink供用户使用,用户也可以通过实现接口自定义Sink。counts.writeAsCsv(outputPath, "\n", " ")[点击并拖拽以移动] 小结:以上,通过简单的介绍来了解Flink中的一些基本概念及编程方式。后面会对每个细节进行更为详尽地分析。Flink(一)-基本概念转自知乎:cid:link_0
  • [基础组件] MRS 812 FlinkSQL代码对接集群组件
    背景:为了方便业务开发人员开发FlinkSQL作业、提交FlinkJar作业,以及方便运维人员对Flink作业的管理,FusionInsight MRS研发了FlinkServer可视化开发平台。由于版本迭代问题,MRS812暂不支持FlinkSQL对接Elasticsearch以及客户端提交flinksql任务,MRS 820版本的FlinkSQL也跟MRS812版本的不尽相同, 某局点反馈 公网maven仓库缺少flink sql 对接 es、hbase、kafka、redis、hbase的包,以及flink sql对接这些组件的相关开发指导。当前MRS 820版本的相关包3月中旬已经提供上外部maven仓库,后续过来MRS 812版本也需要flinksql 对接这些组件。为了整合812版本中Flink配置问题以及支持对接组件问题,现给出flink-sql-demo进行客户端代码提交。在flink开发技术支持下 ,整理的MRS 812 flinksql对接es、hbase、kafka、redis、hbase 组件样例,具体对接指导见文件。介绍:该代码适用于flinksql对接es、hbase、kafka、redis、hbase 各组件,对应的sql在config目录下,执行对应组件需修改目录sqlPath的对应sql文件。public class FlinkSQLExecutor { public static void main(String[] args) throws IOException { System.out.println("-------------------- begin init ----------------------"); final String sqlPath = ParameterTool.fromArgs(args).get("sql", "config/redisSink.sql"); final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, bsSettings); StatementSet statementSet = tableEnv.createStatementSet(); String sqlStr = FileUtils.readFileToString(FileUtils.getFile(sqlPath), "utf-8"); String[] sqlArr = sqlStr.split(";"); for (String sql : sqlArr) { sql = sql.trim(); if (sql.toLowerCase(Locale.ROOT).startsWith("create")) { System.out.println("----------------------------------------------\nexecuteSql=\n" + sql); tableEnv.executeSql(sql); } else if (sql.toLowerCase(Locale.ROOT).startsWith("insert")) { System.out.println("----------------------------------------------\ninsert=\n" + sql); statementSet.addInsertSql(sql); } } System.out.println("---------------------- begin exec sql --------------------------"); statementSet.execute(); }}操作步骤:  在windows环境中执行打包,获取Target目录下的 flink-sql-8.1.2-312005.jar,上传在/opt/client/Flink/flink/下,所有使用sql均上传到/opt/client/Flink/flink/config/目录下。在linux客户端配置flink客户端,并将服务端flink的lib目录补充包,上传测试jar和sql文件。启动yarn-session.sh,修改sql文件为本集群的IP,执行 flink-sql-8.1.2-312005.jar验证代码执行flink sql。具体详细步骤见附件。
  • [问题求助] Flink 连接 Es Kerberos认证问题
    使用Flink 向 Es写数据,在Sink 端如何做 Kerberos 认证 ?
  • Flink经典维护案例集合
    Flink全部案例集合见维护宝典:https://support.huawei.com/hedex/hdx.do?docid=EDOC1100222546&lang=zh&idPath=22658044|22662728|22666212|22396131  (FusionInsight HD&MRS租户面集群故障案例(6.5.X-8.X)->维护故障类->Flink->常见故障)经典案例分类序号案例出现频次现网经典案例1.1关于table.exec.state.ttl参数的生效机制★★客户端安装常见问题2.1Flink客户端配置方法(维护宝典:维护类故障(维护类故障(6.5.X-8.X)>Flink>FAQ>下载并配置Flink客户端)★★★★★2.2升级后任务提交失败,报zk节点权限不足(维护宝典:维护类故障(6.5.X-8.X)>Flink>任务提交常见故障>FusionInsight HD大版本从6.5.1升级到8.x版本后任务提交失败)★★★★★2.3开启ssl后,正确的提交方式(维护宝典:维护类故障(6.5.X-8.X)>Flink>任务提交常见故障>创建Flink集群时执行yarn-session.sh命令失败)★★★★★ 
  • [最佳实践] 关于table.exec.state.ttl参数的生效机制
    Flink的table.exec.state.ttl参数说明:Flink SQL 新手有可能犯的错误,其中之一就是忘记设置空闲状态保留时间导致状态爆 炸。列举两个场景:➢ FlinkSQL 的 regular join(inner、left、right),左右表的数据都会一直保存在 状态里,不会清理!要么设置 TTL,要么使用 FlinkSQL 的 interval join。➢ 使用 Top-N 语法进行去重,重复数据的出现一般都位于特定区间内(例如一小时 或一天内),过了这段时间之后,对应的状态就不再需要了。Flink SQL 可以指定空闲状态(即未更新的状态)被保留的最小时间,当状态中某个 key 对应的状态未更新的时间达到阈值时,该条状态被自动清理:基于811版本测试,测试SQL的代码如下: EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance();         builder.inStreamingMode();         builder.useBlinkPlanner();         EnvironmentSettings settings = builder.build();         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);          StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);          env.enableCheckpointing(6000L);       env.setStateBackend(new RocksDBStateBackend("hdfs:///xxxx));//启用hdfs状态后端       env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);       env.getCheckpointConfig().setCheckpointTimeout(600000L);       env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);       env.getCheckpointConfig().setFailOnCheckpointingErrors(false);       env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);          Configuration configuration = tableEnv.getConfig().getConfiguration();         configuration.setString("table.exec.state.ttl","10000");          String sqlTable_1 =                 "CREATE TABLE source1 (\n" +                 "  name varchar(10),\n" +                 "  vaa varchar(10),\n" +                 "  ts AS PROCTIME()\n" +                 ") WITH (\n" +                 "  'connector' = 'kafka',\n" +                 "  'topic' = 'user_source1',\n" +                 "  'properties.bootstrap.servers' = 'kafka:21005',\n" +                 "  'properties.group.id' = 'testGroup',\n" +                 "  'scan.startup.mode' = 'latest-offset',\n" +                 "  'format' = 'csv'\n" +                 ")";          String SqlTable_2 = "" +                 "CREATE TABLE source2 (\n" +                 "  name varchar(10),\n" +                 "  vaa varchar(10),\n" +                 "  ts AS PROCTIME()\n" +                 ") WITH (\n" +                 "  'connector' = 'kafka',\n" +                 "  'topic' = 'user_source2',\n" +                 "  'properties.bootstrap.servers' = 'kafka:21005',\n" +                 "  'properties.group.id' = 'testGroup',\n" +                 "  'scan.startup.mode' = 'latest-offset',\n" +                 "  'format' = 'csv'\n" +                 ")";          String inputTable="create table p (\n" +                 "  name1 varchar(10),\n" +                 "  name2 varchar(10),\n" +                 "  vaa1 varchar(10),\n" +                 "  vaa2 varchar(10)\n" +                 ") with ('connector' = 'print')";          String sql = "insert into\n" +                 "  p\n" +                 "select\n" +                 "  source1.name,\n" +                 "  source2.name,\n" +                 "  source1.vaa,\n" +                 "  source2.vaa\n" +                 "FROM\n" +                 "  source1\n" +                 "  join source2 on source1.name = source2.name";          //创建表1         tableEnv.executeSql(sqlTable_1);         //创建表2         tableEnv.executeSql(SqlTable_2);         //创建输出表         tableEnv.executeSql(inputTable);         //执行结果         tableEnv.executeSql(sql); 执行sql的代码片段如下:在这个用例中我们使用的TTL时间参数为10s失效。当同时输入:topic:user_source1 的数据 zs,aaa   topic:user_source2的数据zs,bbb 结果如下:但是输入:topic:user_source1 的数据ls,aaa 间隔大于10s后输入 topic:user_source2的数据ls,bbb未出现结果:从测试结果来看:(1)常规联接是最通用的联接类型,其中任何新记录或对联接任一侧的更改都是可见的,并且会影响整个联接结果。例如左边有一条新记录,当product id 相等时,它会与右边所有以前和以后的记录合并。SELECT * FROM OrdersINNER JOIN ProductON Orders.productId = Product.id对于流式查询,常规连接的语法是最灵活的,并且允许任何类型的更新(插入、更新、删除)输入表。但是,此操作具有重要的操作含义:它需要将连接输入的两侧永远保持在 Flink 状态。因此,计算查询结果所需的状态可能会无限增长,具体取决于所有输入表和中间连接结果的不同输入行的数量。(2)如果设置了TTL的时间后,过了TTL时间后,之前的状态数据会被删除。
  • [问题求助] MRS 3.1.0普通集群怎么安装开源的flink组件
    MRS 3.1.0普通集群怎么安装开源的flink组件,求助各位大神
  • [二次开发] 【FI hd 6513】【开源flink 华为kafka】 开源flink连接华为kafka问题
    【功能模块】 环境:HD(6.5.1.3,x86_64) Kafka-2(2.11-1.1.0) 已开启安全认证 开源flink 1.14.2 、flink-kafka-connector-1.14.2【操作步骤&问题现象】1、需要使用开源flink连接HD(6.5.1.3,x86_64)的kafka2、已经将flink kafka connector中开源kafka-client替换为kafka-clients-2.4.0-h0.cbu.mrs.313.r103、在flink-conf.yml 中配置了认证信息#认证参数java.security.auth.login.config: /data/abc/jaas.confsecurity.kerberos.login.keytab: /data/abc/user.keytabsecurity.kerberos.login.principal: abc@EXAMPLE.COMsecurity.kerberos.login.contexts: Client,KafkaClientsecurity.kerberos.login.use-ticket-cache: false其中jaas.conf如下样例KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="/data/abc/user.keytab"principal="abc@EXAMPLE.COM"useTicketCache=falsestoreKey=truedebug=true;};Client {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab=" /data/abc/user.keytab"principal="abc@EXAMPLE.COM"useTicketCache=falsestoreKey=truedebug=true;};4、代码中kafka消费者除了基本配置也添加了如下配置sasl.kerberos.service.name=kafkasecurity.protocol=SASL_PLAINTEXTsasl.mechanism=GSSAPIkerberos.domain.name=hadoop.example.com5、开源flink提交以yarn per job 方式提交【flink、yarn都是开源版本,无安全认证,只有kafka是FI hd的】【截图信息】我想确认一下hd 6513的kafka是否支持高版本flink连接,如果支持我应该做如何调整?【日志信息】(可选,上传日志内容或者附件)
  • [二次开发] Flink的JDBCsink,batchintervalMs和BatchSize参数问题
    Flink的JDBCsink,batchintervalMs和BatchSize参数,JdbcExecutionOptions.Builer()  .withBatchIntervalMS(1000)  .withBatchSize(500)  .withMaxRetries(10)  .build()batchintervalMs和BatchSize参数,提交的时候,是或的关系,还是且的关系?
  • [开发应用] MRS的Flink连接DWS,报错
    DWS版本:8.1.1MRS:3.0.2Flink:1.12.0Flink读取kafka数据,sink到dws里面。程序正常运行12小时左右就报错。但是超时的参数都配置的没问题。statement_timeout=0;session_timeout=0;这俩参数都没问题。但是不知道为什么就会中断了?
  • [二次开发] Flink Password verification failed
    认证文件都是按照步骤获取的。这个错误,能定位是哪个认证步骤出问题了吗?
  • [知识分享] MRS离线数据分析:通过Flink作业处理OBS数据
    【摘要】 MRS支持在大数据存储容量大、计算资源需要弹性扩展的场景下,用户将数据存储在OBS服务中,使用MRS集群仅做数据计算处理的存算分离模式。 本文将向您介绍如何在MRS集群中运行Flink作业来处理OBS中存储的数据。本文分享自华为云社区《【云小课】EI第47课 MRS离线数据分析-通过Flink作业处理OBS数据》,作者:Hello EI 。MRS支持在大数据存储容量大、计算资源需要弹性扩展的场景下,用户将数据存储在OBS服务中,使用MRS集群仅做数据计算处理的存算分离模式。Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。本文将向您介绍如何在MRS集群中运行Flink作业来处理OBS中存储的数据。Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。在本示例中,我们使用MRS集群内置的Flink WordCount作业程序,来分析OBS文件系统中保存的源数据,以统计源数据中的单词出现次数。当然您也可以获取MRS服务样例代码工程,参考Flink开发指南开发其他Flink流作业程序。本案例基本操作流程如下所示:创建MRS集群创建并购买一个包含有Flink组件的MRS集群,详情请参见购买自定义集群。本文以购买MRS 3.1.0版本的集群为例,集群未开启Kerberos认证。在本示例中,由于我们要分析处理OBS文件系统中的数据,因此在集群的高级配置参数中要为MRS集群绑定IAM权限委托,使得集群内组件能够对接OBS并具有对应文件系统目录的操作权限。您可以直接选择系统默认的“MRS_ECS_DEFAULT_AGENCY”,也可以自行创建其他具有OBS文件系统操作权限的自定义委托。集群购买成功后,在MRS集群的任一节点内,使用omm用户安装集群客户端,具体操作可参考安装并使用集群客户端。例如客户端安装目录为“/opt/client”。准备测试数据在创建Flink作业进行数据分析前,我们需要在提前准备待分析的测试数据,并将该数据上传至OBS文件系统中。本地创建一个“mrs_flink_test.txt”文件,例如文件内容如下:This is a test demo for MRS Flink. Flink is a unified computing framework that supports both batch processing and stream processing. It provides a stream data processing engine that supports data distribution and parallel computing.在云服务列表中选择“存储 > 对象存储服务”,登录OBS管理控制台。单击“并行文件系统”,创建一个并行文件系统,并上传测试数据文件。例如创建的文件系统名称为“mrs-demo-data”,单击系统名称,在“文件”页面中,新建一个文件夹“flink”,上传测试数据至该目录中。则本示例的测试数据完整路径为“obs://mrs-demo-data/flink/mrs_flink_test.txt”。上传数据分析应用程序。使用管理台界面直接提交作业时,将已开发好的Flink应用程序jar文件也可以上传至OBS文件系统中,或者MRS集群内的HDFS文件系统中。本示例中我们使用MRS集群内置的Flink WordCount样例程序,可从MRS集群的客户端安装目录中获取,即“/opt/client/Flink/flink/examples/batch/WordCount.jar”。将“WordCount.jar”上传至“mrs-demo-data/program”目录下。创建并运行Flink作业方式1:在控制台界面在线提交作业。登录MRS管理控制台,单击MRS集群名称,进入集群详情页面。在集群详情页的“概览”页签,单击“IAM用户同步”右侧的“单击同步”进行IAM用户同步。单击“作业管理”,进入“作业管理”页签。单击“添加”,添加一个Flink作业。作业类型:Flink作业名称:自定义,例如flink_obs_test。执行程序路径:本示例使用Flink客户端的WordCount程序为例。运行程序参数:使用默认值。执行程序参数:设置应用程序的输入参数,“input”为待分析的测试数据,“output”为结果输出文件。例如本示例中,我们设置为“--input obs://mrs-demo-data/flink/mrs_flink_test.txt --output obs://mrs-demo-data/flink/output”。服务配置参数:使用默认值即可,如需手动配置作业相关参数,可参考运行Flink作业。确认作业配置信息后,单击“确定”,完成作业的新增,并等待运行完成。方式2:通过集群客户端提交作业。使用root用户登录集群客户端节点,进入客户端安装目录。su - omm cd /opt/client source bigdata_env执行以下命令验证集群是否可以访问OBS。hdfs dfs -ls obs://mrs-demo-data/flink提交Flink作业,指定源文件数据进行消费。flink run -m yarn-cluster /opt/client/Flink/flink/examples/batch/WordCount.jar --input obs://mrs-demo-data/flink/mrs_flink_test.txt --output obs://mrs-demo/data/flink/output2执行后结果类似如下:... Cluster started: Yarn cluster with application id application_1654672374562_0011 Job has been submitted with JobID a89b561de5d0298cb2ba01fbc30338bc Program execution finished Job with JobID a89b561de5d0298cb2ba01fbc30338bc has finished. Job Runtime: 1200 ms查看作业执行结果作业提交成功后,登录MRS集群的FusionInsight Manager界面,选择“集群 > 服务 > Yarn”。单击“ResourceManager WebUI”后的链接进入Yarn Web UI界面,在Applications页面查看当前Yarn作业的详细运行情况及运行日志。等待作业运行完成后,在OBS文件系统中指定的结果输出文件中可查看数据分析输出的结果。下载“output”文件到本地并打开,可查看输出的分析结果。a 3 and 2 batch 1 both 1 computing 2 data 2 demo 1 distribution 1 engine 1 flink 2 for 1 framework 1 is 2 it 1 mrs 1 parallel 1 processing 3 provides 1 stream 2 supports 2 test 1 that 2 this 1 unified 1使用集群客户端命令行提交作业时,若不指定输出目录,在作业运行界面也可直接查看数据分析结果。Job with JobID xxx has finished. Job Runtime: xxx ms Accumulator Results: - e6209f96ffa423974f8c7043821814e9 (java.util.ArrayList) [31 elements] (a,3) (and,2) (batch,1) (both,1) (computing,2) (data,2) (demo,1) (distribution,1) (engine,1) (flink,2) (for,1) (framework,1) (is,2) (it,1) (mrs,1) (parallel,1) (processing,3) (provides,1) (stream,2) (supports,2) (test,1) (that,2) (this,1) (unified,1)
  • [环境搭建] flink提交任务运行失败
    按照用户文档提交flink的样例程序jar包。窗口日志显示:【Login successful for user flink_dev using keytab file user.keytab】说明已经登录认证成功。但是后面报错信息,依次是:【org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'Flink Streaming Job'.】【Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'Flink Streaming Job'.】【Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.】【Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Unhandled error in ZooKeeperLeaderRetrievalDriver:Background exception was not retry-able or retry gave up】【Caused by: org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /flink_base/flink/default】尤其是最后一个报错:【Caused by: org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /flink_base/flink/default】是不是zookeeper连接不上?和zookeeper的认证有问题了?
  • [环境搭建] Flink的准备安全认证问题
    ①这个flink-conf.yaml在客户端配置后,是不是自己开发的jar包里面是不能有这个配置文件的?不然会影响客户端的这个文件?②这里的 security.kerberos.login.contexts:Client,里面的Client是固定参数吗?不用动这个值是吧?③security.kerberos.login.contexts:的值 Client,KafkaClien是不是固定值?不用动这个值是吧?