• [技术干货] Flink的复杂事件处理CEP
    Flink的复杂事件处理CEP复杂事件处理(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的事件关系序列库,并利用过滤、关联、聚合等技术,最终由简单事件产生高级事件,并通过模式规则的方式对重要信息进行跟踪和分析,从实时数据中发掘有价值的信息。复杂事件处理主要应用于防范网络欺诈、设备故障检测、风险规避和智能营销等领域。Flink基于DataStrem API提供了Flink CEP组件栈,专门用于对复杂事件的处理,帮助用户从流式数据中发掘有价值的信息。CEP(Complex Event Processing)就是在无界事件流中检测事件模式,让我们掌握数据中重要的部分。flink CEP是在flink中实现的复杂事件处理库。    1. CEP相关概念1) 配置依赖在使用FlinkCEP组件之前,需要将FlinkCEP的依赖库引入项目工程中。<dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-cep_2.11</artifactId>    <version>1.13.6</version></dependency> 2) 事件定义简单事件:简单事件存在于现实场景中,主要的特点为处理单一事件,事件的定义可以直接观察出来,处理过程中无须关注多个事件之间的关系,能够通过简单的数据处理手段将结果计算出来。复杂事件:相对于简单事件,复杂事件处理的不仅是单一的事件,也处理由多个事件组成的复合事件。复杂事件处理监测分析事件流(Event Streaming),当特定事件发生时来触发某些动作。2. Pattern APIFlink CEP中提供了Pattern API用于对输入流数据的复杂事件规则定义,并从事件流中抽取事件结果。包含四个步骤:1、输入事件流的创建2、Pattern的定义3、Pattern应用在事件流上检测4、选取结果1) 模式定义定义Pattern可以是单次执行模式,也可以是循环执行模式。单次执行模式一次只接受一个事件,循环执行模式可以接收一个或者多个事件。通常情况下,可以通过指定循环次数将单次执行模式变为循环执行模式。每种模式能够将多个条件组合应用到同一事件之上,条件组合可以通过where方法进行叠加。每个Pattern都是通过begin方法定义的。下一步通过Pattern.where()方法在Pattern上指定Condition,只有当Condition满足之后,当前的Pattern才会接受事件。Pattern.<LoginEvent>begin("first").where(new SimpleCondition<LoginEvent>() {    @Override    public boolean filter(LoginEvent value) throws Exception {        return value.getType().equals("fail");   }}) 1.设置循环次数对于已经创建好的Pattern,可以指定循环次数,形成循环执行的Pattern。times:可以通过times指定固定的循环执行次数。//指定循环触发4次start.times(4);//可以执行触发次数范围,让循环执行次数在该范围之内start.times(2, 4);oneOrMore:可以通过oneOrMore方法指定触发一次或多次。// 触发一次或者多次start.oneOrMore();timesOrMore:通过timesOrMore方法可以指定触发固定次数以上,例如执行两次以上。// 触发两次或者多次start.timesOrMore(2); 2.定义条件每个模式都需要指定触发条件,作为事件进入到该模式是否接受的判断依据,当事件中的数值满足了条件时,便进行下一步操作。在FlinkCFP中通过pattern.where()、pattern.or()及pattern.until()方法来为Pattern指定条件,且Pattern条件有Simple Conditions及Combining Conditions等类型。简单条件:Simple Condition继承于Iterative Condition类,其主要根据事件中的字段信息进行判断,决定是否接受该事件。// 把通话成功的事件挑选出来start.where(_.getCallType == "success")组合条件:组合条件是将简单条件进行合并,通常情况下也可以使用where方法进行条件的组合,默认每个条件通过AND逻辑相连。如果需要使用OR逻辑,直接使用or方法连接条件即可。// 把通话成功,或者通话时长大于10秒的事件挑选出来val start = Pattern.begin[StationLog]("start_pattern").where(_.callType=="success").or(_.duration>10)终止条件:如果程序中使用了oneOrMore或者oneOrMore().optional()方法,则必须指定终止条件,否则模式中的规则会一直循环下去,如下终止条件通过until()方法指定。pattern.oneOrMore.until(_.callOut.startsWith("186"))3.模式序列将相互独立的模式进行组合然后形成模式序列。模式序列基本的编写方式和独立模式一致,各个模式之间通过邻近条件进行连接即可,其中有严格邻近、宽松邻近、非确定宽松邻近三种邻近连接条件。 严格邻近:严格邻近条件中,需要所有的事件都按照顺序满足模式条件,不允许忽略任意不满足的模式。   宽松邻近:在宽松邻近条件下,会忽略没有成功匹配模式条件,并不会像严格邻近要求得那么高,可以简单理解为OR的逻辑关系。   2) 模式检测调用 CEP.pattern(),给定输入流和模式,就能得到一个 PatternStream//cep 做模式检测PatternStream<LoginEvent> patternStream = CEP.pattern(ds1.keyBy(value -> value.getUserId()), pattern); 3) 选择结果得到PatternStream类型的数据集后,接下来数据获取都基于PatternStream进行。该数据集中包含了所有的匹配事件。目前在FlinkCEP中提供select和flatSelect两种方法从PatternStream提取事件结果事件。1.通过Select Funciton抽取正常事件可以通过在PatternStream的Select方法中传入自定义Select Funciton完成对匹配事件的转换与输出。其中Select Funciton的输入参数为Map[String, Iterable[IN]],Map中的key为模式序列中的Pattern名称,Value为对应Pattern所接受的事件集合,格式为输入事件的数据类型。def selectFunction(pattern : Map[String, Iterable[IN]]): OUT = {      //获取pattern中的startEvent      val startEvent = pattern.get("start_pattern").get.next        //获取Pattern中middleEvent        val middleEvent = pattern.get("middle").get.next        //返回结果        OUT(startEvent, middleEvent)}  2.通过Flat Select Funciton抽取正常事件Flat Select Funciton和Select Function相似,不过Flat Select Funciton在每次调用可以返回任意数量的结果。因为Flat Select Funciton使用Collector作为返回结果的容器,可以将需要输出的事件都放置在Collector中返回。def flatSelectFn(pattern : Map[String, Iterable[IN]], collector : Collector[OUT]) = {    //获取pattern中startEvent      val startEvent = pattern.get("start_pattern").get.next        //获取Pattern中middleEvent      val middleEvent = pattern.get("middle").get.next        //并根据startEvent的Value数量进行返回      for (i <- 0 to startEvent.getValue) {            collector.collect(OUT(startEvent, middleEvent))     }} 3.通过Select Funciton抽取超时事件如果模式中有within(time),那么就很有可能有超时的数据存在,通过PatternStream. Select方法分别获取超时事件和正常事件。首先需要创建OutputTag来标记超时事件,然后在PatternStream.select方法中使用OutputTag,就可以将超时事件从PatternStream中抽取出来。// 通过CEP.pattern方法创建val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)  //创建OutputTag,并命名为timeout-output  val timeoutTag = OutputTag[String]("timeout-output")  //调用PatternStream select()并指定timeoutTag val result: SingleOutputStreamOperator[NormalEvent] =   patternStream.select(timeoutTag){  //超时事件获取    (pattern: Map[String, Iterable[Event]], timestamp: Long) =>       TimeoutEvent()//返回异常事件  } { //正常事件获取pattern: Map[String, Iterable[Event]] =>      NormalEvent()//返回正常事件  }//调用getSideOutput方法,并指定timeoutTag将超时事件输出val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(timeoutTag) 
  • [问题求助] 开源Flink对接问题
    我们使用开源Flink 1.18.1 ,Flink on YARN 模式,目前作业提交到了MRS集群,但是Yarn Container启动失败(提示是认证的问题)1.MRS HDFS版本如下2.nodemanger日志(提交作业异常时) 如上是提交异常时,nodemanger日志,主要两个问题contaner启动时,聚合日志服务初始化异常(认证问题)contaner启动时,从hdfs获取flink的作业包异常(认证问题) 我的主要问题是,目前作业可以正常submit到yarn,为什么container启动时还会出现认证问题?我看了一下hadoop、flink源码,在我的场景下,flink会在客户端生成hdfs delegation token, 并在提交时发给yarn app context, yarn在初始化容器时会基于token转换为ugi,再和hdfs交互,目前我遇到问题看起来token有问题?无效的?不知道具体原因,或者社区还有其他排查方案吗?
  • [问题求助] MRS 支持使用 flink cdc吗?使用flink datastream 的做数据实时同步
    当前使用MRS 版本 3.5.0—LTS,可否支持使用Flink-cdc实时同步mysql的binlog数据。
  • [问题求助] flink 自定义sink 将数据写入es和hbase 本地环境可以正常运行,提交yarn找不到配置文件
    异常代码配置文件yarn报错
  • [问题求助] 高可用Flink集群和DLI的flink版本问题
    华为云上搭建的高可用Flink集群最新版本是 flink1.18.0,jdk是21DLI山的最新flink只是 flink1.15,jdk每看到这两个有啥区别吗?DLI的flink版本为什么不和flink集群的保持一致呢?那我在本地基于flink1.18开发的程序,打包后是不是就不能提交到DLI上执行了?
  • [问题求助] Flink可以使用update语句吗
    请问Flink可以使用update语句吗,我这边使用update启动不了
  • [技术干货] Flink-CDC解析(第47天)-转载
    前言 本文主要概述了Flink-CDC.  1. CDC 概述 1.1 什么是CDC? CDC是(Change Data Capture 变更数据获取)的简称 ,在广义的概念上,只要是能捕获数据变更的技术,都可以称之为 CDC。 核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 CDC 技术的应用场景非常广泛:  数据同步:用于数据备份,容灾; 数据分发:一个数据源分发给多个下游系统; 数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是非常重要的数据源。 1.2 CDC的实现机制 CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种:  1) 基于主动查询的 CDC: 用户通常会在数据源表的某个字段中,保存上次更新的时间戳或版本号等信息,然后下游通过不断的查询和与上次的记录做对比,来确定数据是否有变动,是否需要同步。 特点:  离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据;  无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;  持续的频繁查询对数据库的压力较大。  不保障实时性,基于离线调度存在天然的延迟。  2) 基于事件接收CDC: 可以通过触发器(Trigger)或者日志(例如 Transaction log、Binary log、Write-ahead log 等)来实现。当数据源表发生变动时,会通过附加在表上的触发器或者 binlog 等途径,将操作记录下来。下游可以通过数据库底层的协议,订阅并消费这些事件,然后对数据库变动记录做重放,从而实现同步。  实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;  保障数据一致性,因为 binlog 文件包含了所有历史变更明细;  保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。 基于查询的CDC 基于Binlog的CDC  经过以上对比,可以发现基于日志CDC 有以下这几种优势:  能够捕获所有数据的变化,捕获完整的变更记录。在异地容灾,数据备份等场景中得到广泛应用,如果是基于查询的 CDC 有可能导致两次查询的中间一部分数据丢失  每次 DML 操作均有记录无需像查询 CDC 这样发起全表扫描进行过滤,拥有更高的效率和性能,具有低延迟,不增加数据库负载的优势  无需入侵业务,业务解耦,无需更改业务模型 1.3 常见的开源 CDC 方案  对比全量同步能力:  基于查询或者日志的 CDC 方案基本都支持,除了 Canal(仅支持增量)。  对比全量 + 增量同步的能力,只有 Flink CDC、Debezium、Oracle Goldengate 支持较好。  对比增量同步能力:  基于日志的方式,可以很好的做到增量同步;  而基于查询的方式是很难做到增量同步的。  从架构角度去看: 该表将架构分为单机和分布式,这里的分布式架构不单纯体现在数据读取能力的水平扩展上,更重要的是在大数据场景下分布式系统接入能力。例如 Flink CDC 的数据入湖或者入仓的时候,下游通常是分布式的系统,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接入分布式系统能力上看,Flink CDC 的架构能够很好地接入此类系统。  在数据转换 / 数据清洗能力上: 当数据进入到 CDC 工具的时候是否能较方便的对数据做一些过滤或者清洗,甚至聚合。  在 Flink CDC 上操作相当简单,可以通过 Flink SQL 去操作这些数据;  DataX、Debezium 等则需要通过脚本或者模板去做,所以用户的使用门槛会比较高。  在生态扩展方面: 这里指的是下游的一些数据库或者数据源的支持。Flink CDC 下游有丰富的 Connector,例如写入到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常见的一些系统,也支持各种自定义 connector。 ————————————————                              版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。                          原文链接:https://blog.csdn.net/syhiiu/article/details/140697517 
  • [问题求助] 新建Flink作业接收topic消息插入启动失败
    sql如下:-- 统一接入数据源 CREATE TABLE mqs_res_datalog (   detail STRING,   operateSource STRING,   operateTarget STRING,   operateType STRING,   applicationName STRING,   userId STRING ) WITH (   'connector' = 'kafka-mqs',   'properties.group.id' = 'flink',   'properties.sasl.mechanism' = 'PLAIN',   'properties.security.protocol' = 'SASL_SSL',   'properties.ssl.endpoint.identification.algorithm' = '',   'properties.auto.offset.reset' = 'latest',   'scan.startup.mode' = 'latest-offset',   'format' = 'json',   'json.ignore-parse-errors' = 'true',   'parameter' = '-namesrvUrls {{G_kafka_namesrv_urls}} -appId {{G_app_id_ronghe}} -appSecret {{G_topic_T_M_WRITELOG}} -topic {{G_app_secret_ronghe}} -mappings applicationName=applicationName;detail=detail;operateSource=operateSource;operateTarget=operateTarget;operateType=operateType;userId=userId' ); -- 临时表1: 过滤其他系统消息,实时流数据类型转换接收中间流 CREATE VIEW trans_msg_src (   detail,   operateSource,   operateTarget,   operateType,   applicationName,   userId ) AS SELECT   detail,   operateSource,   operateTarget,   operateType,   applicationName,   userId FROM   mqs_res_datalog;    CREATE TABLE dwi_env_monitoring_indicator_sensor (   Timestamp DATETIME,   UserID INT,   FloorCode VARCHAR(255),   BuildingCode VARCHAR(255),   X DECIMAL(10, 6),   Y DECIMAL(10, 6),   genre VARCHAR(255),   RegionValidityCheck BOOLEAN,   IsCardSeparated BOOLEAN ) WITH (   'connector' = 'openGauss',   'driver' = 'org.postgresql.Driver',   'url' = '{{G_dws_jdbc_url_bpa}}',   'username' = '{{G_dws_username_bpa}}',   'password' = '{{G_dws_password_bpa}}',   'table-name' = 'dwi_env_Personnel_trajectory' ); INSERT INTO   dwi_env_monitoring_indicator_sensor (     Timestamp,     UserID,     FloorCode,     BuildingCode,     X,     Y,     genre,     RegionValidityCheck,     IsCardSeparated   ) SELECT   Timestamp,   UserID,   FloorCode,   BuildingCode,   X,   Y,   genre,   RegionValidityCheck,   IsCardSeparated FROM   trans_msg_src; 
  • [问题求助] MRS中使用Flinksql,使用redis流表,将kafka数据写入redis的问题
    流表配置如下,使用hash结构:flinksql语句,简单统计时间和数量:所得结果:问题:想要使用begin_time作为hash的key,sum作为value该如何配置
  • [问题求助] MRS 创建FlinkServer数据连接kafka 报未知错误
    MRS是安全模式,kakfa集群把Ranger鉴权停了也连不上,测试报未知错误,但是kafka在客户端中是可以正常使用的。
  • [技术干货] flink基本知识
     flink支持两种划分窗口的方式(time和count)  如果根据时间划分窗口,那么它就是一个time-window  如果根据数据划分窗口,那么它就是一个count-window  flink支持窗口的两个重要属性(size和interval)  如果size=interval,那么就会形成tumbling-window(无重叠数据)  如果size>interval,那么就会形成sliding-window(有重叠数据)  如果size<interval,那么这种窗口将会丢失数据。比如每5秒钟,统计过去3秒的通过路口汽车的数据,将会漏掉2秒钟的数据。  ————————————————                              本文来自微信公众号:五分钟学大数据,转载请在公众号后台获取作者微信进行授权                          原文链接:https://blog.csdn.net/helloHbulie/article/details/120333974 
  • [二次开发] 本地环境消费开源kafka,将数据写入mrs kafka
    生产者代码:报错情况:
  • [问题求助] mrs 提交任务到yarn报错 Could not load service provider for table factories
    mrs 提交任务到yarn报错   Could not load service provider for table factories
  • [问题求助] 请问如何使用mrs flinksql与oceanbase对接
    能否使用flink-sql-connector-oceanbase-cdc 进行连接,请问mrs flink是否支持这么做,如果支持,我该把jar放入那个位置才能生效
  • [问题求助] mrs是否支持 flinkserver 对接oceanbase
    flink sql:CREATE TABLE ob_tbl1 (     col1 INT,     col2 VARCHAR(20),     col3 INT)     WITH ('connector' = 'oceanbase-cdc',     'scan.startup.mode' = 'initial',     'tenant-name' = 'mq_t1',     'username' = 'root@mq_t1',     'password' = 'pswd',     'database-name' = 'test_ob_to_kafka',     'table-name' = 'tbl1',     'hostname' = '10.10.101.64',     'port' = '2881',     'rootserver-list' = '10.10.101.64:2882:2881',     'logproxy.host' = '10.10.101.64',     'logproxy.port' = '2983');