-
异常代码配置文件yarn报错
-
华为云上搭建的高可用Flink集群最新版本是 flink1.18.0,jdk是21DLI山的最新flink只是 flink1.15,jdk每看到这两个有啥区别吗?DLI的flink版本为什么不和flink集群的保持一致呢?那我在本地基于flink1.18开发的程序,打包后是不是就不能提交到DLI上执行了?
-
请问Flink可以使用update语句吗,我这边使用update启动不了
-
前言 本文主要概述了Flink-CDC. 1. CDC 概述 1.1 什么是CDC? CDC是(Change Data Capture 变更数据获取)的简称 ,在广义的概念上,只要是能捕获数据变更的技术,都可以称之为 CDC。 核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 CDC 技术的应用场景非常广泛: 数据同步:用于数据备份,容灾; 数据分发:一个数据源分发给多个下游系统; 数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是非常重要的数据源。 1.2 CDC的实现机制 CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种: 1) 基于主动查询的 CDC: 用户通常会在数据源表的某个字段中,保存上次更新的时间戳或版本号等信息,然后下游通过不断的查询和与上次的记录做对比,来确定数据是否有变动,是否需要同步。 特点: 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据; 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更; 持续的频繁查询对数据库的压力较大。 不保障实时性,基于离线调度存在天然的延迟。 2) 基于事件接收CDC: 可以通过触发器(Trigger)或者日志(例如 Transaction log、Binary log、Write-ahead log 等)来实现。当数据源表发生变动时,会通过附加在表上的触发器或者 binlog 等途径,将操作记录下来。下游可以通过数据库底层的协议,订阅并消费这些事件,然后对数据库变动记录做重放,从而实现同步。 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源; 保障数据一致性,因为 binlog 文件包含了所有历史变更明细; 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。 基于查询的CDC 基于Binlog的CDC 经过以上对比,可以发现基于日志CDC 有以下这几种优势: 能够捕获所有数据的变化,捕获完整的变更记录。在异地容灾,数据备份等场景中得到广泛应用,如果是基于查询的 CDC 有可能导致两次查询的中间一部分数据丢失 每次 DML 操作均有记录无需像查询 CDC 这样发起全表扫描进行过滤,拥有更高的效率和性能,具有低延迟,不增加数据库负载的优势 无需入侵业务,业务解耦,无需更改业务模型 1.3 常见的开源 CDC 方案 对比全量同步能力: 基于查询或者日志的 CDC 方案基本都支持,除了 Canal(仅支持增量)。 对比全量 + 增量同步的能力,只有 Flink CDC、Debezium、Oracle Goldengate 支持较好。 对比增量同步能力: 基于日志的方式,可以很好的做到增量同步; 而基于查询的方式是很难做到增量同步的。 从架构角度去看: 该表将架构分为单机和分布式,这里的分布式架构不单纯体现在数据读取能力的水平扩展上,更重要的是在大数据场景下分布式系统接入能力。例如 Flink CDC 的数据入湖或者入仓的时候,下游通常是分布式的系统,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接入分布式系统能力上看,Flink CDC 的架构能够很好地接入此类系统。 在数据转换 / 数据清洗能力上: 当数据进入到 CDC 工具的时候是否能较方便的对数据做一些过滤或者清洗,甚至聚合。 在 Flink CDC 上操作相当简单,可以通过 Flink SQL 去操作这些数据; DataX、Debezium 等则需要通过脚本或者模板去做,所以用户的使用门槛会比较高。 在生态扩展方面: 这里指的是下游的一些数据库或者数据源的支持。Flink CDC 下游有丰富的 Connector,例如写入到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常见的一些系统,也支持各种自定义 connector。 ———————————————— 版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 原文链接:https://blog.csdn.net/syhiiu/article/details/140697517
-
sql如下:-- 统一接入数据源 CREATE TABLE mqs_res_datalog ( detail STRING, operateSource STRING, operateTarget STRING, operateType STRING, applicationName STRING, userId STRING ) WITH ( 'connector' = 'kafka-mqs', 'properties.group.id' = 'flink', 'properties.sasl.mechanism' = 'PLAIN', 'properties.security.protocol' = 'SASL_SSL', 'properties.ssl.endpoint.identification.algorithm' = '', 'properties.auto.offset.reset' = 'latest', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.ignore-parse-errors' = 'true', 'parameter' = '-namesrvUrls {{G_kafka_namesrv_urls}} -appId {{G_app_id_ronghe}} -appSecret {{G_topic_T_M_WRITELOG}} -topic {{G_app_secret_ronghe}} -mappings applicationName=applicationName;detail=detail;operateSource=operateSource;operateTarget=operateTarget;operateType=operateType;userId=userId' ); -- 临时表1: 过滤其他系统消息,实时流数据类型转换接收中间流 CREATE VIEW trans_msg_src ( detail, operateSource, operateTarget, operateType, applicationName, userId ) AS SELECT detail, operateSource, operateTarget, operateType, applicationName, userId FROM mqs_res_datalog; CREATE TABLE dwi_env_monitoring_indicator_sensor ( Timestamp DATETIME, UserID INT, FloorCode VARCHAR(255), BuildingCode VARCHAR(255), X DECIMAL(10, 6), Y DECIMAL(10, 6), genre VARCHAR(255), RegionValidityCheck BOOLEAN, IsCardSeparated BOOLEAN ) WITH ( 'connector' = 'openGauss', 'driver' = 'org.postgresql.Driver', 'url' = '{{G_dws_jdbc_url_bpa}}', 'username' = '{{G_dws_username_bpa}}', 'password' = '{{G_dws_password_bpa}}', 'table-name' = 'dwi_env_Personnel_trajectory' ); INSERT INTO dwi_env_monitoring_indicator_sensor ( Timestamp, UserID, FloorCode, BuildingCode, X, Y, genre, RegionValidityCheck, IsCardSeparated ) SELECT Timestamp, UserID, FloorCode, BuildingCode, X, Y, genre, RegionValidityCheck, IsCardSeparated FROM trans_msg_src;
-
流表配置如下,使用hash结构:flinksql语句,简单统计时间和数量:所得结果:问题:想要使用begin_time作为hash的key,sum作为value该如何配置
-
MRS是安全模式,kakfa集群把Ranger鉴权停了也连不上,测试报未知错误,但是kafka在客户端中是可以正常使用的。
-
flink支持两种划分窗口的方式(time和count) 如果根据时间划分窗口,那么它就是一个time-window 如果根据数据划分窗口,那么它就是一个count-window flink支持窗口的两个重要属性(size和interval) 如果size=interval,那么就会形成tumbling-window(无重叠数据) 如果size>interval,那么就会形成sliding-window(有重叠数据) 如果size<interval,那么这种窗口将会丢失数据。比如每5秒钟,统计过去3秒的通过路口汽车的数据,将会漏掉2秒钟的数据。 ———————————————— 本文来自微信公众号:五分钟学大数据,转载请在公众号后台获取作者微信进行授权 原文链接:https://blog.csdn.net/helloHbulie/article/details/120333974
-
生产者代码:报错情况:
-
mrs 提交任务到yarn报错 Could not load service provider for table factories
-
能否使用flink-sql-connector-oceanbase-cdc 进行连接,请问mrs flink是否支持这么做,如果支持,我该把jar放入那个位置才能生效
-
flink sql:CREATE TABLE ob_tbl1 ( col1 INT, col2 VARCHAR(20), col3 INT) WITH ('connector' = 'oceanbase-cdc', 'scan.startup.mode' = 'initial', 'tenant-name' = 'mq_t1', 'username' = 'root@mq_t1', 'password' = 'pswd', 'database-name' = 'test_ob_to_kafka', 'table-name' = 'tbl1', 'hostname' = '10.10.101.64', 'port' = '2881', 'rootserver-list' = '10.10.101.64:2882:2881', 'logproxy.host' = '10.10.101.64', 'logproxy.port' = '2983');
-
用flink实时读取dws数据,结果执行一会显示任务成功,不能流式执行?难道dws不支持吗?
-
DWS能作为实时数据的数仓分层吗?就是不仅能实时写入,还能实时读取,处理后再实时写入
-
[问题求助] FusionInsight_HD_8.2.0.1产品,在Flink SQL客户端中select 'hello'报错KeeperErrorCode = ConnectionLoss for /flink_base/flinkflinkSQL client中select 还是报错的,请帮忙指点下,哪里有问题?谢谢org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$SessionClosedRequireAuthException: KeeperErrorCode = Session closed because client failed to authenticate for /flink_base/flink或者org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /flink_base/flinkzookeeper已经启动,192.168.0.82:24002 ,而且zookeeper中的ACL权限已经设置,但是在设置配额失败[zk: 192.168.0.82:24002(CONNECTED) 5] setquota -n 1000000 /flink_base/flink Insufficient permission : /flink_base/flink tail -f /home/dmp/app/ficlient/Flink/flink/log/flink-root-sql-client-192-168-0-85.log 中的日志如下flink-conf.yaml中的全部配置如下akka.ask.timeout: 120 s akka.client-socket-worker-pool.pool-size-factor: 1.0 akka.client-socket-worker-pool.pool-size-max: 2 akka.client-socket-worker-pool.pool-size-min: 1 akka.framesize: 10485760b akka.log.lifecycle.events: false akka.lookup.timeout: 30 s akka.server-socket-worker-pool.pool-size-factor: 1.0 akka.server-socket-worker-pool.pool-size-max: 2 akka.server-socket-worker-pool.pool-size-min: 1 akka.ssl.enabled: true akka.startup-timeout: 10 s akka.tcp.timeout: 60 s akka.throughput: 15 blob.fetch.backlog: 1000 blob.fetch.num-concurrent: 50 blob.fetch.retries: 50 blob.server.port: 32456-32520 blob.service.ssl.enabled: true classloader.check-leaked-classloader: false classloader.resolve-order: child-first client.rpc.port: 32651-32720 client.timeout: 120 s compiler.delimited-informat.max-line-samples: 10 compiler.delimited-informat.max-sample-len: 2097152 compiler.delimited-informat.min-line-samples: 2 env.hadoop.conf.dir: /home/dmp/app/ficlient/Flink/flink/conf env.java.opts.client: -Djava.io.tmpdir=/home/dmp/app/ficlient/Flink/tmp env.java.opts.jobmanager: -Djava.security.krb5.conf=/opt/huawei/Bigdata/common/runtime/krb5.conf -Djava.io.tmpdir=${PWD}/tmp -Des.security.indication=true env.java.opts.taskmanager: -Djava.security.krb5.conf=/opt/huawei/Bigdata/common/runtime/krb5.conf -Djava.io.tmpdir=${PWD}/tmp -Des.security.indication=true env.java.opts: -Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=20M -Djdk.tls.ephemeralDHKeySize=3072 -Djava.library.path=${HADOOP_COMMON_HOME}/lib/native -Djava.net.preferIPv4Stack=true -Djava.net.preferIPv6Addresses=false -Dbeetle.application.home.path=/opt/huawei/Bigdata/common/runtime/security/config -Dwcc.configuration.path=/opt/huawei/Bigdata/common/runtime/security/config -Dscc.configuration.path=/opt/huawei/Bigdata/common/runtime/securityforscc/config -Dscc.bigdata.common=/opt/huawei/Bigdata/common/runtime env.yarn.conf.dir: /home/dmp/app/ficlient/Flink/flink/conf flink.security.enable: true flinkserver.alarm.cert.skip: true flinkserver.host.ip: fs.output.always-create-directory: false fs.overwrite-files: false heartbeat.interval: 10000 heartbeat.timeout: 120000 high-availability.job.delay: 10 s high-availability.storageDir: hdfs://hacluster/flink/recovery high-availability.zookeeper.client.acl: creator high-availability.zookeeper.client.connection-timeout: 90000 high-availability.zookeeper.client.max-retry-attempts: 5 high-availability.zookeeper.client.retry-wait: 5000 high-availability.zookeeper.client.session-timeout: 90000 high-availability.zookeeper.client.tolerate-suspended-connections: true high-availability.zookeeper.path.root: /flink high-availability.zookeeper.path.under.quota: /flink_base high-availability.zookeeper.quorum: 192.168.0.82:24002,192.168.0.81:24002,192.168.0.80:24002 high-availability.zookeeper.quota.enabled: true high-availability: zookeeper job.alarm.enable: true jobmanager.heap.size: 1024mb jobmanager.web.403-redirect-url: https://192.168.0.82:28443/web/pages/error/403.html jobmanager.web.404-redirect-url: https://192.168.0.82:28443/web/pages/error/404.html jobmanager.web.415-redirect-url: https://192.168.0.82:28443/web/pages/error/415.html jobmanager.web.500-redirect-url: https://192.168.0.82:28443/web/pages/error/500.html jobmanager.web.access-control-allow-origin: * jobmanager.web.accesslog.enable: true jobmanager.web.allow-access-address: * jobmanager.web.backpressure.cleanup-interval: 600000 jobmanager.web.backpressure.delay-between-samples: 50 jobmanager.web.backpressure.num-samples: 100 jobmanager.web.backpressure.refresh-interval: 60000 jobmanager.web.cache-directive: no-store jobmanager.web.checkpoints.disable: false jobmanager.web.checkpoints.history: 10 jobmanager.web.expires-time: 0 jobmanager.web.history: 5 jobmanager.web.logout-timer: 600000 jobmanager.web.pragma-value: no-cache jobmanager.web.refresh-interval: 3000 jobmanager.web.ssl.enabled: false jobmanager.web.x-frame-options: DENY library-cache-manager.cleanup.interval: 3600 metrics.internal.query-service.port: 28844-28943 metrics.reporter.alarm.factory.class: com.huawei.mrs.flink.alarm.FlinkAlarmReporterFactory metrics.reporter.alarm.interval: 30 s metrics.reporter.alarm.job.alarm.checkpoint.consecutive.failures.num: 5 metrics.reporter.alarm.job.alarm.failure.restart.rate: 80 metrics.reporter.alarm.job.alarm.task.backpressure.duration: 180 s metrics.reporter: alarm nettyconnector.message.delimiter: $_ nettyconnector.registerserver.topic.storage: /flink/nettyconnector nettyconnector.sinkserver.port.range: 28444-28843 nettyconnector.ssl.enabled: false parallelism.default: 1 query.client.network-threads: 0 query.proxy.network-threads: 0 query.proxy.ports: 32541-32560 query.proxy.query-threads: 0 query.server.network-threads: 0 query.server.ports: 32521-32540 query.server.query-threads: 0 resourcemanager.taskmanager-timeout: 300000 rest.await-leader-timeout: 30000 rest.bind-port: 32261-32325 rest.client.max-content-length: 104857600 rest.connection-timeout: 15000 rest.idleness-timeout: 300000 rest.retry.delay: 3000 rest.retry.max-attempts: 20 rest.server.max-content-length: 104857600 rest.server.numThreads: 4 restart-strategy.failure-rate.delay: 10 s restart-strategy.failure-rate.failure-rate-interval: 60 s restart-strategy.failure-rate.max-failures-per-interval: 1 restart-strategy.fixed-delay.attempts: 3 restart-strategy.fixed-delay.delay: 10 s restart-strategy: none security.cookie: 9477298cd52a3e409ed0bc570bdc795179fcc7c301a1225e22f47fe0a3db47c2 security.enable: true security.kerberos.login.contexts: Client,KafkaClient security.kerberos.login.keytab: security.kerberos.login.principal: security.kerberos.login.use-ticket-cache: true security.networkwide.listen.restrict: true security.ssl.algorithms: TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 security.ssl.enabled: false security.ssl.encrypt.enabled: false security.ssl.key-password: Bapuser@9000 security.ssl.keystore-password: Bapuser@9000 security.ssl.keystore: ssl/flink.keystore security.ssl.protocol: TLSv1.2 security.ssl.rest.enabled: false security.ssl.truststore-password: Bapuser@9000 security.ssl.truststore: ssl/flink.truststore security.ssl.verify-hostname: false slot.idle.timeout: 50000 slot.request.timeout: 300000 state.backend.fs.checkpointdir: hdfs://hacluster/flink/checkpoints state.backend.fs.memory-threshold: 20kb state.backend.incremental: true state.backend: rocksdb state.savepoints.dir: hdfs://hacluster/flink/savepoint task.cancellation.interval: 30000 task.cancellation.timeout: 180000 taskmanager.data.port: 32391-32455 taskmanager.data.ssl.enabled: false taskmanager.debug.memory.logIntervalMs: 0 taskmanager.debug.memory.startLogThread: false taskmanager.heap.size: 1024mb taskmanager.initial-registration-pause: 500 ms taskmanager.max-registration-pause: 30 s taskmanager.maxRegistrationDuration: 5 min taskmanager.memory.fraction: 0.7 taskmanager.memory.off-heap: false taskmanager.memory.preallocate: false taskmanager.memory.segment-size: 32768 taskmanager.network.detailed-metrics: false taskmanager.network.memory.buffers-per-channel: 2 taskmanager.network.memory.floating-buffers-per-gate: 8 taskmanager.network.memory.fraction: 0.1 taskmanager.network.memory.max: 1gb taskmanager.network.memory.min: 64mb taskmanager.network.netty.client.connectTimeoutSec: 300 taskmanager.network.netty.client.numThreads: -1 taskmanager.network.netty.num-arenas: -1 taskmanager.network.netty.sendReceiveBufferSize: 4096 taskmanager.network.netty.server.backlog: 0 taskmanager.network.netty.server.numThreads: -1 taskmanager.network.netty.transport: nio taskmanager.network.numberOfBuffers: 2048 taskmanager.network.request-backoff.initial: 100 taskmanager.network.request-backoff.max: 10000 taskmanager.numberOfTaskSlots: 1 taskmanager.refused-registration-pause: 10 s taskmanager.registration.timeout: 5 min taskmanager.rpc.port: 32326-32390 taskmanager.runtime.hashjoin-bloom-filters: false taskmanager.runtime.max-fan: 128 taskmanager.runtime.sort-spilling-threshold: 0.8 use.path.filesystem: true use.smarterleaderlatch: true web.submit.enable: false web.timeout: 10000 yarn.application-attempt-failures-validity-interval: 600000 yarn.application-attempts: 5 yarn.application-master.port: 32586-32650 yarn.heap-cutoff-min: 384 yarn.heap-cutoff-ratio: 0.25 yarn.heartbeat-delay: 5 yarn.heartbeat.container-request-interval: 500 yarn.maximum-failed-containers: 5 yarn.per-job-cluster.include-user-jar: ORDER zk.ssl.enabled: false zookeeper.clientPort.quorum: 192.168.0.82:24002,192.168.0.81:24002,192.168.0.80:24002 zookeeper.root.acl: OPEN zookeeper.sasl.disable: false zookeeper.sasl.login-context-name: Client zookeeper.sasl.service-name: zookeeper zookeeper.secureClientPort.quorum: 192.168.0.82:24002,192.168.0.81:24002,192.168.0.80:24002
上滑加载中
推荐直播
-
OpenHarmony应用开发之网络数据请求与数据解析
2025/01/16 周四 19:00-20:30
华为开发者布道师、南京师范大学泰州学院副教授,硕士研究生导师,开放原子教育银牌认证讲师
科技浪潮中,鸿蒙生态强势崛起,OpenHarmony开启智能终端无限可能。当下,其原生应用开发适配潜力巨大,终端设备已广泛融入生活各场景,从家居到办公、穿戴至车载。 现在,机会敲门!我们的直播聚焦OpenHarmony关键的网络数据请求与解析,抛开晦涩理论,用真实案例带你掌握数据访问接口,轻松应对复杂网络请求、精准解析Json与Xml数据。参与直播,为开发鸿蒙App夯实基础,抢占科技新高地,别错过!
回顾中 -
Ascend C高层API设计原理与实现系列
2025/01/17 周五 15:30-17:00
Ascend C 技术专家
以LayerNorm算子开发为例,讲解开箱即用的Ascend C高层API
回顾中
热门标签