• [问题求助] flink 自定义sink 将数据写入es和hbase 本地环境可以正常运行,提交yarn找不到配置文件
    异常代码配置文件yarn报错
  • [问题求助] 高可用Flink集群和DLI的flink版本问题
    华为云上搭建的高可用Flink集群最新版本是 flink1.18.0,jdk是21DLI山的最新flink只是 flink1.15,jdk每看到这两个有啥区别吗?DLI的flink版本为什么不和flink集群的保持一致呢?那我在本地基于flink1.18开发的程序,打包后是不是就不能提交到DLI上执行了?
  • [问题求助] Flink可以使用update语句吗
    请问Flink可以使用update语句吗,我这边使用update启动不了
  • [技术干货] Flink-CDC解析(第47天)-转载
    前言 本文主要概述了Flink-CDC.  1. CDC 概述 1.1 什么是CDC? CDC是(Change Data Capture 变更数据获取)的简称 ,在广义的概念上,只要是能捕获数据变更的技术,都可以称之为 CDC。 核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 CDC 技术的应用场景非常广泛:  数据同步:用于数据备份,容灾; 数据分发:一个数据源分发给多个下游系统; 数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是非常重要的数据源。 1.2 CDC的实现机制 CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种:  1) 基于主动查询的 CDC: 用户通常会在数据源表的某个字段中,保存上次更新的时间戳或版本号等信息,然后下游通过不断的查询和与上次的记录做对比,来确定数据是否有变动,是否需要同步。 特点:  离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据;  无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;  持续的频繁查询对数据库的压力较大。  不保障实时性,基于离线调度存在天然的延迟。  2) 基于事件接收CDC: 可以通过触发器(Trigger)或者日志(例如 Transaction log、Binary log、Write-ahead log 等)来实现。当数据源表发生变动时,会通过附加在表上的触发器或者 binlog 等途径,将操作记录下来。下游可以通过数据库底层的协议,订阅并消费这些事件,然后对数据库变动记录做重放,从而实现同步。  实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;  保障数据一致性,因为 binlog 文件包含了所有历史变更明细;  保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。 基于查询的CDC 基于Binlog的CDC  经过以上对比,可以发现基于日志CDC 有以下这几种优势:  能够捕获所有数据的变化,捕获完整的变更记录。在异地容灾,数据备份等场景中得到广泛应用,如果是基于查询的 CDC 有可能导致两次查询的中间一部分数据丢失  每次 DML 操作均有记录无需像查询 CDC 这样发起全表扫描进行过滤,拥有更高的效率和性能,具有低延迟,不增加数据库负载的优势  无需入侵业务,业务解耦,无需更改业务模型 1.3 常见的开源 CDC 方案  对比全量同步能力:  基于查询或者日志的 CDC 方案基本都支持,除了 Canal(仅支持增量)。  对比全量 + 增量同步的能力,只有 Flink CDC、Debezium、Oracle Goldengate 支持较好。  对比增量同步能力:  基于日志的方式,可以很好的做到增量同步;  而基于查询的方式是很难做到增量同步的。  从架构角度去看: 该表将架构分为单机和分布式,这里的分布式架构不单纯体现在数据读取能力的水平扩展上,更重要的是在大数据场景下分布式系统接入能力。例如 Flink CDC 的数据入湖或者入仓的时候,下游通常是分布式的系统,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接入分布式系统能力上看,Flink CDC 的架构能够很好地接入此类系统。  在数据转换 / 数据清洗能力上: 当数据进入到 CDC 工具的时候是否能较方便的对数据做一些过滤或者清洗,甚至聚合。  在 Flink CDC 上操作相当简单,可以通过 Flink SQL 去操作这些数据;  DataX、Debezium 等则需要通过脚本或者模板去做,所以用户的使用门槛会比较高。  在生态扩展方面: 这里指的是下游的一些数据库或者数据源的支持。Flink CDC 下游有丰富的 Connector,例如写入到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常见的一些系统,也支持各种自定义 connector。 ————————————————                              版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。                          原文链接:https://blog.csdn.net/syhiiu/article/details/140697517 
  • [问题求助] 新建Flink作业接收topic消息插入启动失败
    sql如下:-- 统一接入数据源 CREATE TABLE mqs_res_datalog (   detail STRING,   operateSource STRING,   operateTarget STRING,   operateType STRING,   applicationName STRING,   userId STRING ) WITH (   'connector' = 'kafka-mqs',   'properties.group.id' = 'flink',   'properties.sasl.mechanism' = 'PLAIN',   'properties.security.protocol' = 'SASL_SSL',   'properties.ssl.endpoint.identification.algorithm' = '',   'properties.auto.offset.reset' = 'latest',   'scan.startup.mode' = 'latest-offset',   'format' = 'json',   'json.ignore-parse-errors' = 'true',   'parameter' = '-namesrvUrls {{G_kafka_namesrv_urls}} -appId {{G_app_id_ronghe}} -appSecret {{G_topic_T_M_WRITELOG}} -topic {{G_app_secret_ronghe}} -mappings applicationName=applicationName;detail=detail;operateSource=operateSource;operateTarget=operateTarget;operateType=operateType;userId=userId' ); -- 临时表1: 过滤其他系统消息,实时流数据类型转换接收中间流 CREATE VIEW trans_msg_src (   detail,   operateSource,   operateTarget,   operateType,   applicationName,   userId ) AS SELECT   detail,   operateSource,   operateTarget,   operateType,   applicationName,   userId FROM   mqs_res_datalog;    CREATE TABLE dwi_env_monitoring_indicator_sensor (   Timestamp DATETIME,   UserID INT,   FloorCode VARCHAR(255),   BuildingCode VARCHAR(255),   X DECIMAL(10, 6),   Y DECIMAL(10, 6),   genre VARCHAR(255),   RegionValidityCheck BOOLEAN,   IsCardSeparated BOOLEAN ) WITH (   'connector' = 'openGauss',   'driver' = 'org.postgresql.Driver',   'url' = '{{G_dws_jdbc_url_bpa}}',   'username' = '{{G_dws_username_bpa}}',   'password' = '{{G_dws_password_bpa}}',   'table-name' = 'dwi_env_Personnel_trajectory' ); INSERT INTO   dwi_env_monitoring_indicator_sensor (     Timestamp,     UserID,     FloorCode,     BuildingCode,     X,     Y,     genre,     RegionValidityCheck,     IsCardSeparated   ) SELECT   Timestamp,   UserID,   FloorCode,   BuildingCode,   X,   Y,   genre,   RegionValidityCheck,   IsCardSeparated FROM   trans_msg_src; 
  • [问题求助] MRS中使用Flinksql,使用redis流表,将kafka数据写入redis的问题
    流表配置如下,使用hash结构:flinksql语句,简单统计时间和数量:所得结果:问题:想要使用begin_time作为hash的key,sum作为value该如何配置
  • [问题求助] MRS 创建FlinkServer数据连接kafka 报未知错误
    MRS是安全模式,kakfa集群把Ranger鉴权停了也连不上,测试报未知错误,但是kafka在客户端中是可以正常使用的。
  • [技术干货] flink基本知识
     flink支持两种划分窗口的方式(time和count)  如果根据时间划分窗口,那么它就是一个time-window  如果根据数据划分窗口,那么它就是一个count-window  flink支持窗口的两个重要属性(size和interval)  如果size=interval,那么就会形成tumbling-window(无重叠数据)  如果size>interval,那么就会形成sliding-window(有重叠数据)  如果size<interval,那么这种窗口将会丢失数据。比如每5秒钟,统计过去3秒的通过路口汽车的数据,将会漏掉2秒钟的数据。  ————————————————                              本文来自微信公众号:五分钟学大数据,转载请在公众号后台获取作者微信进行授权                          原文链接:https://blog.csdn.net/helloHbulie/article/details/120333974 
  • [二次开发] 本地环境消费开源kafka,将数据写入mrs kafka
    生产者代码:报错情况:
  • [问题求助] mrs 提交任务到yarn报错 Could not load service provider for table factories
    mrs 提交任务到yarn报错   Could not load service provider for table factories
  • [问题求助] 请问如何使用mrs flinksql与oceanbase对接
    能否使用flink-sql-connector-oceanbase-cdc 进行连接,请问mrs flink是否支持这么做,如果支持,我该把jar放入那个位置才能生效
  • [问题求助] mrs是否支持 flinkserver 对接oceanbase
    flink sql:CREATE TABLE ob_tbl1 (     col1 INT,     col2 VARCHAR(20),     col3 INT)     WITH ('connector' = 'oceanbase-cdc',     'scan.startup.mode' = 'initial',     'tenant-name' = 'mq_t1',     'username' = 'root@mq_t1',     'password' = 'pswd',     'database-name' = 'test_ob_to_kafka',     'table-name' = 'tbl1',     'hostname' = '10.10.101.64',     'port' = '2881',     'rootserver-list' = '10.10.101.64:2882:2881',     'logproxy.host' = '10.10.101.64',     'logproxy.port' = '2983');
  • [开发应用] DWS支持实时流数据读取吗?
    用flink实时读取dws数据,结果执行一会显示任务成功,不能流式执行?难道dws不支持吗?
  • [开发应用] DWS能作为实时数据的数仓分层吗?就是不仅能实时写入,还能实时读取,处理后再实时写入
    DWS能作为实时数据的数仓分层吗?就是不仅能实时写入,还能实时读取,处理后再实时写入
  • [问题求助] FusionInsight_HD_8.2.0.1产品,在Flink SQL客户端中select 'hello'报错KeeperErrorCode = ConnectionLoss for /flink_base/flink
    flinkSQL client中select 还是报错的,请帮忙指点下,哪里有问题?谢谢org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$SessionClosedRequireAuthException: KeeperErrorCode = Session closed because client failed to authenticate for /flink_base/flink或者org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /flink_base/flinkzookeeper已经启动,192.168.0.82:24002 ,而且zookeeper中的ACL权限已经设置,但是在设置配额失败[zk: 192.168.0.82:24002(CONNECTED) 5] setquota -n 1000000 /flink_base/flink Insufficient permission : /flink_base/flink tail -f /home/dmp/app/ficlient/Flink/flink/log/flink-root-sql-client-192-168-0-85.log  中的日志如下flink-conf.yaml中的全部配置如下akka.ask.timeout: 120 s akka.client-socket-worker-pool.pool-size-factor: 1.0 akka.client-socket-worker-pool.pool-size-max: 2 akka.client-socket-worker-pool.pool-size-min: 1 akka.framesize: 10485760b akka.log.lifecycle.events: false akka.lookup.timeout: 30 s akka.server-socket-worker-pool.pool-size-factor: 1.0 akka.server-socket-worker-pool.pool-size-max: 2 akka.server-socket-worker-pool.pool-size-min: 1 akka.ssl.enabled: true akka.startup-timeout: 10 s akka.tcp.timeout: 60 s akka.throughput: 15 blob.fetch.backlog: 1000 blob.fetch.num-concurrent: 50 blob.fetch.retries: 50 blob.server.port: 32456-32520 blob.service.ssl.enabled: true classloader.check-leaked-classloader: false classloader.resolve-order: child-first client.rpc.port: 32651-32720 client.timeout: 120 s compiler.delimited-informat.max-line-samples: 10 compiler.delimited-informat.max-sample-len: 2097152 compiler.delimited-informat.min-line-samples: 2 env.hadoop.conf.dir: /home/dmp/app/ficlient/Flink/flink/conf env.java.opts.client: -Djava.io.tmpdir=/home/dmp/app/ficlient/Flink/tmp env.java.opts.jobmanager: -Djava.security.krb5.conf=/opt/huawei/Bigdata/common/runtime/krb5.conf -Djava.io.tmpdir=${PWD}/tmp -Des.security.indication=true env.java.opts.taskmanager: -Djava.security.krb5.conf=/opt/huawei/Bigdata/common/runtime/krb5.conf -Djava.io.tmpdir=${PWD}/tmp -Des.security.indication=true env.java.opts: -Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=20M -Djdk.tls.ephemeralDHKeySize=3072 -Djava.library.path=${HADOOP_COMMON_HOME}/lib/native -Djava.net.preferIPv4Stack=true -Djava.net.preferIPv6Addresses=false -Dbeetle.application.home.path=/opt/huawei/Bigdata/common/runtime/security/config -Dwcc.configuration.path=/opt/huawei/Bigdata/common/runtime/security/config -Dscc.configuration.path=/opt/huawei/Bigdata/common/runtime/securityforscc/config -Dscc.bigdata.common=/opt/huawei/Bigdata/common/runtime env.yarn.conf.dir: /home/dmp/app/ficlient/Flink/flink/conf flink.security.enable: true flinkserver.alarm.cert.skip: true flinkserver.host.ip: fs.output.always-create-directory: false fs.overwrite-files: false heartbeat.interval: 10000 heartbeat.timeout: 120000 high-availability.job.delay: 10 s high-availability.storageDir: hdfs://hacluster/flink/recovery high-availability.zookeeper.client.acl: creator high-availability.zookeeper.client.connection-timeout: 90000 high-availability.zookeeper.client.max-retry-attempts: 5 high-availability.zookeeper.client.retry-wait: 5000 high-availability.zookeeper.client.session-timeout: 90000 high-availability.zookeeper.client.tolerate-suspended-connections: true high-availability.zookeeper.path.root: /flink high-availability.zookeeper.path.under.quota: /flink_base high-availability.zookeeper.quorum: 192.168.0.82:24002,192.168.0.81:24002,192.168.0.80:24002 high-availability.zookeeper.quota.enabled: true high-availability: zookeeper job.alarm.enable: true jobmanager.heap.size: 1024mb jobmanager.web.403-redirect-url: https://192.168.0.82:28443/web/pages/error/403.html jobmanager.web.404-redirect-url: https://192.168.0.82:28443/web/pages/error/404.html jobmanager.web.415-redirect-url: https://192.168.0.82:28443/web/pages/error/415.html jobmanager.web.500-redirect-url: https://192.168.0.82:28443/web/pages/error/500.html jobmanager.web.access-control-allow-origin: * jobmanager.web.accesslog.enable: true jobmanager.web.allow-access-address: * jobmanager.web.backpressure.cleanup-interval: 600000 jobmanager.web.backpressure.delay-between-samples: 50 jobmanager.web.backpressure.num-samples: 100 jobmanager.web.backpressure.refresh-interval: 60000 jobmanager.web.cache-directive: no-store jobmanager.web.checkpoints.disable: false jobmanager.web.checkpoints.history: 10 jobmanager.web.expires-time: 0 jobmanager.web.history: 5 jobmanager.web.logout-timer: 600000 jobmanager.web.pragma-value: no-cache jobmanager.web.refresh-interval: 3000 jobmanager.web.ssl.enabled: false jobmanager.web.x-frame-options: DENY library-cache-manager.cleanup.interval: 3600 metrics.internal.query-service.port: 28844-28943 metrics.reporter.alarm.factory.class: com.huawei.mrs.flink.alarm.FlinkAlarmReporterFactory metrics.reporter.alarm.interval: 30 s metrics.reporter.alarm.job.alarm.checkpoint.consecutive.failures.num: 5 metrics.reporter.alarm.job.alarm.failure.restart.rate: 80 metrics.reporter.alarm.job.alarm.task.backpressure.duration: 180 s metrics.reporter: alarm nettyconnector.message.delimiter: $_ nettyconnector.registerserver.topic.storage: /flink/nettyconnector nettyconnector.sinkserver.port.range: 28444-28843 nettyconnector.ssl.enabled: false parallelism.default: 1 query.client.network-threads: 0 query.proxy.network-threads: 0 query.proxy.ports: 32541-32560 query.proxy.query-threads: 0 query.server.network-threads: 0 query.server.ports: 32521-32540 query.server.query-threads: 0 resourcemanager.taskmanager-timeout: 300000 rest.await-leader-timeout: 30000 rest.bind-port: 32261-32325 rest.client.max-content-length: 104857600 rest.connection-timeout: 15000 rest.idleness-timeout: 300000 rest.retry.delay: 3000 rest.retry.max-attempts: 20 rest.server.max-content-length: 104857600 rest.server.numThreads: 4 restart-strategy.failure-rate.delay: 10 s restart-strategy.failure-rate.failure-rate-interval: 60 s restart-strategy.failure-rate.max-failures-per-interval: 1 restart-strategy.fixed-delay.attempts: 3 restart-strategy.fixed-delay.delay: 10 s restart-strategy: none security.cookie: 9477298cd52a3e409ed0bc570bdc795179fcc7c301a1225e22f47fe0a3db47c2 security.enable: true security.kerberos.login.contexts: Client,KafkaClient security.kerberos.login.keytab: security.kerberos.login.principal: security.kerberos.login.use-ticket-cache: true security.networkwide.listen.restrict: true security.ssl.algorithms: TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 security.ssl.enabled: false security.ssl.encrypt.enabled: false security.ssl.key-password: Bapuser@9000 security.ssl.keystore-password: Bapuser@9000 security.ssl.keystore: ssl/flink.keystore security.ssl.protocol: TLSv1.2 security.ssl.rest.enabled: false security.ssl.truststore-password: Bapuser@9000 security.ssl.truststore: ssl/flink.truststore security.ssl.verify-hostname: false slot.idle.timeout: 50000 slot.request.timeout: 300000 state.backend.fs.checkpointdir: hdfs://hacluster/flink/checkpoints state.backend.fs.memory-threshold: 20kb state.backend.incremental: true state.backend: rocksdb state.savepoints.dir: hdfs://hacluster/flink/savepoint task.cancellation.interval: 30000 task.cancellation.timeout: 180000 taskmanager.data.port: 32391-32455 taskmanager.data.ssl.enabled: false taskmanager.debug.memory.logIntervalMs: 0 taskmanager.debug.memory.startLogThread: false taskmanager.heap.size: 1024mb taskmanager.initial-registration-pause: 500 ms taskmanager.max-registration-pause: 30 s taskmanager.maxRegistrationDuration: 5 min taskmanager.memory.fraction: 0.7 taskmanager.memory.off-heap: false taskmanager.memory.preallocate: false taskmanager.memory.segment-size: 32768 taskmanager.network.detailed-metrics: false taskmanager.network.memory.buffers-per-channel: 2 taskmanager.network.memory.floating-buffers-per-gate: 8 taskmanager.network.memory.fraction: 0.1 taskmanager.network.memory.max: 1gb taskmanager.network.memory.min: 64mb taskmanager.network.netty.client.connectTimeoutSec: 300 taskmanager.network.netty.client.numThreads: -1 taskmanager.network.netty.num-arenas: -1 taskmanager.network.netty.sendReceiveBufferSize: 4096 taskmanager.network.netty.server.backlog: 0 taskmanager.network.netty.server.numThreads: -1 taskmanager.network.netty.transport: nio taskmanager.network.numberOfBuffers: 2048 taskmanager.network.request-backoff.initial: 100 taskmanager.network.request-backoff.max: 10000 taskmanager.numberOfTaskSlots: 1 taskmanager.refused-registration-pause: 10 s taskmanager.registration.timeout: 5 min taskmanager.rpc.port: 32326-32390 taskmanager.runtime.hashjoin-bloom-filters: false taskmanager.runtime.max-fan: 128 taskmanager.runtime.sort-spilling-threshold: 0.8 use.path.filesystem: true use.smarterleaderlatch: true web.submit.enable: false web.timeout: 10000 yarn.application-attempt-failures-validity-interval: 600000 yarn.application-attempts: 5 yarn.application-master.port: 32586-32650 yarn.heap-cutoff-min: 384 yarn.heap-cutoff-ratio: 0.25 yarn.heartbeat-delay: 5 yarn.heartbeat.container-request-interval: 500 yarn.maximum-failed-containers: 5 yarn.per-job-cluster.include-user-jar: ORDER zk.ssl.enabled: false zookeeper.clientPort.quorum: 192.168.0.82:24002,192.168.0.81:24002,192.168.0.80:24002 zookeeper.root.acl: OPEN zookeeper.sasl.disable: false zookeeper.sasl.login-context-name: Client zookeeper.sasl.service-name: zookeeper zookeeper.secureClientPort.quorum: 192.168.0.82:24002,192.168.0.81:24002,192.168.0.80:24002 
总条数:141 到第
上滑加载中