• [二次开发] 【FI hd 6513】【开源flink 华为kafka】 开源flink连接华为kafka问题
    【功能模块】 环境:HD(6.5.1.3,x86_64) Kafka-2(2.11-1.1.0) 已开启安全认证 开源flink 1.14.2 、flink-kafka-connector-1.14.2【操作步骤&问题现象】1、需要使用开源flink连接HD(6.5.1.3,x86_64)的kafka2、已经将flink kafka connector中开源kafka-client替换为kafka-clients-2.4.0-h0.cbu.mrs.313.r103、在flink-conf.yml 中配置了认证信息#认证参数java.security.auth.login.config: /data/abc/jaas.confsecurity.kerberos.login.keytab: /data/abc/user.keytabsecurity.kerberos.login.principal: abc@EXAMPLE.COMsecurity.kerberos.login.contexts: Client,KafkaClientsecurity.kerberos.login.use-ticket-cache: false其中jaas.conf如下样例KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="/data/abc/user.keytab"principal="abc@EXAMPLE.COM"useTicketCache=falsestoreKey=truedebug=true;};Client {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab=" /data/abc/user.keytab"principal="abc@EXAMPLE.COM"useTicketCache=falsestoreKey=truedebug=true;};4、代码中kafka消费者除了基本配置也添加了如下配置sasl.kerberos.service.name=kafkasecurity.protocol=SASL_PLAINTEXTsasl.mechanism=GSSAPIkerberos.domain.name=hadoop.example.com5、开源flink提交以yarn per job 方式提交【flink、yarn都是开源版本,无安全认证,只有kafka是FI hd的】【截图信息】我想确认一下hd 6513的kafka是否支持高版本flink连接,如果支持我应该做如何调整?【日志信息】(可选,上传日志内容或者附件)
  • [知识分享] 5种kafka消费端性能优化方法
    【摘要】 kafka消费端性能优化主要从下面几个方面优化:1.接口使用方面优化:旧版本highlevel-consumer:偏移量信息存储在zookeeper,最大消费线程数与分区数量相同,不推荐旧版本simpleconsumer:自行选择存储偏移量的方式,可以实现多线程消费单分区,若无特殊的性能要求,不推荐新版本highlevel-consumer:偏移量信息存储在kafka指定的topic中,默认...本文分享自华为云社区《FusionInsight HD&MRS:kafka消费端性能优化方法》,作者: 穿夹克的坏猴子 。kafka消费端性能优化主要从下面几个方面优化:1.接口使用方面优化:旧版本highlevel-consumer:偏移量信息存储在zookeeper,最大消费线程数与分区数量相同,不推荐旧版本simpleconsumer:自行选择存储偏移量的方式,可以实现多线程消费单分区,若无特殊的性能要求,不推荐新版本highlevel-consumer:偏移量信息存储在kafka指定的topic中,默认情况下最大消费线程数与分区数量相同,可以实现多线程消费单分区,推荐2.参数调优(以下参数需根据现网环境评估调至合适的值):2.1 旧版本消费者(kafka old API)参数调优fetch.message.max.bytes:该参数为一次性从kafka集群中获取的数据块大小。在升级到651版本后这个参数需要调大,否则容易出现获取数据限制的报错。建议调整大小不小于kafka的服务端参数message.max.bytes。注意如何确认为旧版本:如果生产者的配置方式包含如下这些配置,则为旧版本:group.id/zookeeper.connect2.2 新版本参数(kafka new API)参数调优max.poll.records:意味消费者一次poll()操作,能够获取的最大数据量,调整这个值能提升吞吐量,于此同时也需要同步提升max.poll.interval.ms的参数大小fetch.max.bytes:意味server端可返回给consumer的最大数据大小,增加可以提升吞吐量,但是在客户端和服务端网络延迟比较大的环境下,建议可以减小该值,防止业务处理数据超时。heartbeat.interval.ms:消费超时时间,consumer与kafka之间的超时时间,该参数不能超过session.timeout.ms,通常设置为session.timeout.ms的三分之一,默认值:3000max.partition.fetch.bytes:限制每个consumer发起fetch请求时候,读到数据(record)的限制,设置过大,consumer本地缓存的数据就会越多,可能影响内存的使用,默认值:1048576fetch.max.bytes:server端可返回给consumer的最大数据大小,数值可大于max.partition.fetch.bytes,一般设置为默认值即可,默认值:52428800session.timeout.ms:使用consumer组管理offset时,consumer与broker之间的心跳超时时间,如果consumer消费数据的频率非常低,建议增大这个参数值,默认值:10000auto.offset.reset:消费过程中无法找到数据消费到的offset位置,所选择的消费策略,earliest:从头开始消费,可能会消费到重复数据,latest:从数据末尾开始消费,可能会丢失数据。默认值:earlistmax.poll.interval.ms:消费者在每一轮poll() (拉取数据之间的最大时间延迟),如果此超时时间期满之前poll()没有被再次调用,则消费者被视为失败,并且分组将触发rebalance,以便将分区重新分配给别的成员。如果,再两次poll之间需要添加过多复杂的,耗时的逻辑,需要延长这个时间,默认值:300smax.poll.records:消费者一次poll()操作,能够获取的最大数据量,增加这个参数值,会增加一次性拉取数据的数据量,确保拉取数据的时间,至少在max.poll.interval.ms规定的范围之内,默认值:5002.3 Simpleconsumer参数调优simpleconsumer在初始化阶段需要传一个fetchsize的参数,比如:consumer=new SimpleConsumer(leaderBroker,a_port,100000,64*1024,clientName)中64*1024,该参数表示simpleconsumer一次性获取的数据大小,如果该值过大则可能会导致request时间过长,使用过程中应该降低这个值,保证消费频率,使用SimpleConsumer的核心需求是:多线程消费单个分区,以达到提升性能的要求,如果没有这样需求,不建议使用这个这种消费方式3.消费端频繁rebalance导致性能下降调优:3.1因业务处理能力不足导致的:session.timout.ms控制心跳超时时间。heartbeat.interval.ms控制心跳发送频率,建议该值不超过session.timout.ms的三分之一。max.poll.interval.ms控制每次poll的间隔,时间=获取数据的时间+处理数据的时间,如果max.poll.records设定的值在max.poll.interval.ms指定的时间内没有处理完成会触发rebalance,这里给出一个相对较为合理的配置,建议在预计的处理时间的基础上再加1分钟。max.poll.records 每个批次处理的数据条数,默认为500条。如果处理能力较低,建议可以减小这个值。3.2 非正常消费者频繁的访问kafka集群导致频繁rebalance:收集kafka-request.log,查看异常的topic有哪些客户端节点在消费,cat kafka-request.* | grep “topic=topicName” | grep “apikey=FETCH” | awk –F’from connection’ ‘{print $2}’ | awk –F’;’  ‘{print $1}’ | awk –F’-’ ‘{print $2}’ | awk –F’:’ ‘{print $1}’ | sort | uniq –c | sort -nr ,找出不应该产生消费行为的节点,停止异常节点上消费者4.版本引发性能下降优化FI 8.0.2版本之前kafka SimpleAclAuthorizer鉴权异常导致性能下降,8.0.2版本在使用非安全端口(21005或者9092端口)时会出现集群性能下降的问题,表现:kafka-root.log中出现大量ExitcodeException:id:Default#Principal:no such user报错解决办法:升级到FI 8023以上版本临时规避办法:业务侧使用21007端口访问kafka,去掉鉴权插件即allow.everyone.if.no.acl.found=true,将以下kafka服务端配置置为空:authorizer.class.name=5.FI 6513~6516版本的内核问题引发的性能异常6513版本在kafka引入社区的的lazy index功能后,在新的segment创建的过程中可能会导致并发创建失败的问题,常见的报错(server.log中)如以下两种类型:(1)java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code(2)java.lang.IllegalArgumentException: requirement failed: Attempt to append to a full index当出现以上两种类型的报错的时候可以断定是版本问题导致,问题预警如:https://support.huawei.com/enterprise/zh/bulletins-product/ENEWS2000007844;解决方案:升级到6517版本以上版本或者打入紧急补丁:https://support.huawei.com/enterprise/zh/cloud-computing/fusioninsight-hd-pid-21110924/software/251482609?idAbsPath=fixnode01%7C7919749%7C7941815%7C19942925%7C250430185%7C21110924;临时规避方案:重启异常的broker实例
  • [中间件类] Kafka的kerberos认证又失败了
    (An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating SASL token received from the Kafka Broker. This may be caused by Java's being unable to resolve the Kafka Broker's hostname correctly. You may want to try to adding '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your client's JVMFLAGS environment. Users must configure FQDN of kafka brokers when authenticating using SASL and `socketChannel.socket().getInetAddress().getHostName()` must match the hostname in `principal/hostname@realm` Kafka Client will go to AUTHENTICATION_FAILED state.)Caused by: org.ietf.jgss.GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)
  • [中间件类] Kafka的kerberos认证
    【功能模块】System.setProperty("java.security.auth.login.config", jaasPath);这个系统配置,无法通过,并且我的jaas.conf文件 放在linux目录下,是可以访问到的,程序已经测试过了,什么原因,我使用的是华为云MRS服务2.1.x版本的样例工程下载地址为:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-2.1。这个示例代码。
  • [技术干货] 事件中心学习之——微软Azure
    Azure 事件中心是大数据流式处理平台和事件引入服务。 它可以每秒接收和处理数百万个事件。 可以使用任何实时分析提供程序或批处理/存储适配器转换和存储发送到事件中心的数据。以下方案是可以使用事件中心的部分方案:异常情况检测(欺诈/离群值)应用程序日志记录分析管道,例如点击流实时仪表板存档数据事务处理用户遥测数据处理设备遥测数据流式处理为何使用事件中心?仅当能够轻松处理并且能够从数据源获取即时见解时,数据才有价值。 事件中心提供低延迟、可无缝集成的分布式流处理平台,并在 Azure 的内部和外部提供数据和分析服务,用于构建完整的大数据管道。事件中心充当事件管道的“前门”,在解决方案体系结构中通常称作“事件引入器” 。 事件引入器是位于事件发布者与事件使用者之间的组件或服务,可以将事件流的生成与这些事件的使用分离开来。 事件中心提供统一的流式处理平台和时间保留缓冲区,将事件生成者与事件使用者分离开来。以下部分介绍 Azure 事件中心服务的重要功能:完全托管的 PaaS事件中心是完全托管的平台即服务 (PaaS),其配置或管理开销极低,因此你可以专注于业务解决方案。 Apache Kafka 生态系统的事件中心提供 PaaS Kafka 体验,使用户无需管理、配置或运行群集。支持实时处理和批处理实时引入、缓冲、存储和处理流,以获取可行的见解。 事件中心使用分区的使用者模型,可让多个应用程序同时处理流,并允许控制处理速度。 Azure 事件中心还能与 Azure Functions 集成,以构成无服务器体系结构。捕获事件数据在 Azure Blob 存储或 Azure Data Lake Storage 中近乎实时地捕获数据,以进行长期保留或微批处理。 可以基于用于派生实时分析的同一个流实现此行为。 设置捕获极其简单。 无需管理费用即可运行它,并且可以使用事件中心吞吐量单位或处理单位自动对它进行缩放。 使用事件中心可以专注于数据处理而不是数据捕获。可缩放使用事件中心可以从 MB 量级的数据流着手,然后逐步扩展到 GB 甚至 TB 量级的处理。 自动扩充功能是用于根据用量需求扩展吞吐量单位数或处理单位数的众多选项之一。丰富的生态系统借助基于行业标准 AMQP 1.0 协议并提供各种语言(.NET、Java、Python、JavaScript)的广泛生态系统,可以轻松地从事件中心开始处理流。 所有支持的客户端语言提供低级别集成。 该生态系统还为你提供了与 Azure 服务(如 Azure 流分析和 Azure Functions)的无缝集成,使你能够构建无服务器体系结构。用于 Apache Kafka 的事件中心Apache Kafka 生态系统的事件中心还可让 Apache Kafka(1.0 和更高版本)客户端和应用程序与事件中心通信。 无需设置、配置或管理你自己的 Kafka 和 Zookeeper 群集,也不需要使用某些不是 Azure 原生的 Kafka 即服务产品/服务。事件中心高级层和专用层事件中心高级层能够满足高端流式处理需求,即在托管的多租户 PaaS 环境中要求更高的性能、更好的隔离性、可预测的延迟和最小的干扰。 在标准产品/服务的所有功能之上,高级层提供了多种额外功能,例如动态分区纵向扩展、延长保持和客户管理的密钥。 有关详细信息,请参阅事件中心高级层。事件中心专用层提供单租户部署来满足客户的最苛刻流式处理需求。 此单租户套餐提供有保障的 99.99% SLA,只能在专用定价层上使用。 事件中心群集每秒能够引入数百万个事件,且提供有保障的容量和亚秒级的延迟。 在专用群集中创建的命名空间和事件中心不仅仅包括高级产品/服务的所有功能。 有关详细信息,请参阅事件中心专用层。如需更多详细信息,请参阅事件中心层之间的比较。Azure Stack Hub 上的事件中心使用 Azure Stack Hub 上的事件中心可以实现混合云方案。 支持使用基于流式处理和事件的解决方案进行本地处理和 Azure 云处理。 无论方案是混合(联网)还是离线的,解决方案都支持大规模的事件/流处理。 方案仅受事件中心群集大小的约束,但你可以根据需要预配群集大小。(Azure Stack Hub 和 Azure 上的)事件中心版本提供高度的功能奇偶一致性。 这种奇偶一致性意味着 SDK、示例、PowerShell、CLI 和门户提供类似的体验(差异很小)。有关详细信息,请参阅 Azure Stack Hub 上的事件中心概述。重要的体系结构组件事件中心包含以下关键组件:事件生成者:向事件中心发送数据的所有实体。 事件发布者可以使用 HTTPS、AMQP 1.0 或 Apache Kafka(1.0 和更高版本)发布事件。分区:每个使用者只读取消息流的特定子集或分区。使用者组:整个事件中心的视图(状态、位置或偏移量)。 通过使用者组来使用应用程序时,每个应用程序都有事件流的单独视图。 使用者根据自身的步调和情况独立读取流。吞吐量单位(标准层)或处理单位(高级层)或容量单位(专用):预先购买的容量单位,用于控制事件中心的吞吐量容量。事件接收者:从事件中心读取事件数据的所有实体。 所有事件中心使用者通过 AMQP 1.0 会话进行连接。 事件中心服务在事件变得可用时通过会话来提供事件。 所有 Kafka 使用者都通过 Kafka 协议 1.0 及更高版本进行连接。下图显示了事件中心流处理体系结构:后续步骤要开始使用事件中心,请参阅“发送和接收事件”教程 :.NET CoreJavaPythonJavaScriptGoC(仅发送)Apache Storm(仅接收)
  • [问题求助] 【datatool产品】【flinksql功能】为什么找不到topic,我们消息组件是kafka,现在显示的是Rocketmq
    【功能模块】【操作步骤&问题现象】1、配置flinksql2、点击运行【截图信息】【日志信息】(可选,上传日志内容或者附件)WARN  RocketmqClient                                               [] - can not find default topic, autoCreateTopicEnable is false
  • [技术交流] Kafka也能实现消息延时了?
    【摘要】 Kafka作为一个中间件组件,具有与其他中间件组件通用的功能 (异步处理、系统解耦、流量削峰、日志处理), 但在某些特殊的功能方面,每个中间件拥有其独特的特性,其中Kafka作为一个具有高吞吐、高性能的中间件, 它也有其不足的地方,在某些应用场景下面要求中间件实现消息延时的功能,但Kafka本身是不具备这种能力的。 此DEMO项目实现了每条消息实现自定义的延时,详细内容可阅读文章进行了解。《目录》背景开发环境云服务介绍方案设计方案简述方案架构图时序图代码参数指南代码实现结果反馈1、背景Kafka是一个拥有高吞吐、可持久化、可水平扩展,支持流式数据处理等多种特性的分布式消息流处理中间件,采用分布式消息发布与订阅机制,在日志收集、流式数据传输、在线/离线系统分析、实时监控等领域有广泛的应用,Kafka它虽有以上这么多的应用场景和优点,但也具备其缺陷,比如在延时消息场景下,Kafka就不具备这种能力,因此希望能在保存Kafka特有能力的情况下给Kafka扩充一个具有能处理延时消息场景的能力。2、开发环境![开发环境.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20226/24/1656061394394889826.png)3、云服务介绍分布式消息服务Kafka版: 华为云分布式消息服务Kafka版是一款基于开源社区版Kafka提供的消息队列服务,向用户提供计算、存储和带宽资源独占式的Kafka专享实例。使用华为云分布式消息服务Kafka版,资源按需申请,按需配置Topic的分区与副本数量,即买即用,您将有更多精力专注于业务快速开发,不用考虑部署和运维。4、方案设计i、方案简述此方案实现,需要借助两个Topic来进行实现,一个Topic用于及时接收生产者们所产生的消息,另一个Topic则用于消费者拉取消息进行消费。另外在这两个Topic之间加上一个队列用于做延时的逻辑判断,如果消息满足了延时的条件,则将队列中的消息生产至我们的消费者需要拉取的Topic中。ii、方案架构图Kafka消息延时方案架构图![Kafka消息延时架构图.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20226/27/1656294689128363924.png) Kafka消息延时实现思路 1. 生产者将生产消息存入topic_delay主题中进行存储。 2. 将topic_delay主题中的所有消息拉取至ConcurrentLinkedQueue队列中。 3. 取值判断是否满足延时要求。 a. 如果满足延时要求,则将消息生产至topic_out主题中,并将queue队列中的值移除。 b. 如果不满足延时要求,则等待自定义时间后重试判断。 4. 消费者最终从topic_out主题中拉取消息进行消费。iii、方案时序图Kafka消息延时方案时序图 ![Kafka消息延时时序图.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20226/27/1656298968188429219.png)5、代码参数指南本项目中起到延时作用的类Delay.java其余类为官方提供用于测试生产和消费消息, 如需使用官方测试的使用的生产消费代码相关配置介绍可以参考https://support.huaweicloud.com/devg-kafka/how-to-connect-kafka.html 。 如需使用自己配置的生产者消费者,只配置Delay.java中的参数即可。 #### Delay.java参数详情 1. delay:自定义延时时间,单位ms。 2. topic_delay变量:用于临时存储消息的topic名称。 3. topic_out变量:用于消费者拉取消息消费的topic名称。 4. 关于消费者和生产者配置可按需配置,可参考Kafka官方文档:https://kafka.apache.org/documentation/#producerconfigs6、代码实现实现代码可参考Kafka消息延时 ``` package com.dms.delay; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.time.Duration; import java.util.Arrays; import java.util.Date; import java.util.Properties; import java.util.concurrent.ConcurrentLinkedQueue; /** * Hello world! * */ public class Delay { //缓存队列 public static ConcurrentLinkedQueue<ConsumerRecord<String, String>> link = new ConcurrentLinkedQueue(); //延迟时间(20秒),可根据需要设置延迟大小 public static long delay = 20000L; /** *入口 * @param args */ public static void main( String[] args ) { //延时主题(用于控制延时缓冲) String topic_delay = "topic_delay"; //输出主题(直接供消费者消费) String topic_out = "topic_out"; /* 消费线程 */ new Thread(new Runnable() { @Override public void run() { //消费者配置。请根据需要自行设置Kafka配置 Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //创建消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //指定消费主题 consumer.subscribe(Arrays.asList(topic_delay)); while (true) { //轮询消费 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10)); //遍历当前轮询批次拉取到的消息 for (ConsumerRecord<String, String> record : records){ System.out.println(record); //将消息添加到缓存队列 link.add(record); } } } }).start(); /* 生产线程 */ new Thread(new Runnable() { @Override public void run() { //生产者配置(请根据需求自行配置) Properties props = new Properties(); props.put("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092"); props.put("linger.ms", 1); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //创建生产者 Producer<String, String> producer = new KafkaProducer<>(props); //持续从缓存队列中获取消息 while(true){ //如果缓存队列为空则放缓取值速度 if(link.isEmpty()){ try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } continue; } //获取缓存队列栈顶消息 ConsumerRecord<String, String> record = link.peek(); //获取该消息时间戳 long timestamp = record.timestamp(); Date now = new Date(); long nowTime = now.getTime(); if(timestamp+ Delay.delay <nowTime){ //获取消息值 String value = record.value(); //生产者发送消息到输出主题 producer.send(new ProducerRecord<String, String>(topic_out, "",value)); //从缓存队列中移除该消息 link.poll(); }else { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }).start(); } } ```7、结果反馈![延时结果.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20226/27/1656299315693660978.png)
  • [技术干货] 如何构建本机的flink安全模式的开发环境,读取kafka数据
    本地idea如何构建本机的开发环境,flink读取sasl_plaintext的kafka?然后本机idea启动,直接消费kafka数据?在flink提交任务前加了if (LoginUtil.isSecurityModel()) { try { LOG.info("Securitymode start."); //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号 LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE); } catch (IOException e) { LOG.error("Security prepare failure."); LOG.error("The IOException occured : {}.", e); return; } LOG.info("Security prepare success.");}这部分认证,但是这个好像并没有提交到本地的flinkclient中。
  • [最佳实践] 创建Flink OpenSource作业从Kafka读取数据写入到Elasticsearch
    场景描述本示例场景对汽车驾驶的实时数据信息进行分析,将满足特定条件的数据结果进行汇总。数据的输入源为Kafka,结果输出到DWS中。例如,如下样例输入数据:{"car_id":"3027", "car_owner":"lilei", "car_age":"7", "average_speed":"76", "total_miles":"15000"} {"car_id":"3028", "car_owner":"hanmeimei", "car_age":"6", "average_speed":"92", "total_miles":"17000"} {"car_id":"3029", "car_owner":"zhangsan", "car_age":"10", "average_speed":"81", "total_miles":"230000"}预期输出:average_speed <= 90 和 total_miles <= 200000的车辆,即:{"car_id":"3027", "car_owner":"lilei", "car_age":"7", "average_speed":"76", "total_miles":"15000"}前提条件已创建DMS Kafka实例,具体步骤可参考:DMS Kafka入门指引。注意:创建DMS Kafka实例时,不能开启Kafka SASL_SSL。已创建Elasticsearch类型的CSS集群。具体创建CSS集群的操作可以参考创建CSS集群。本示例创建的CSS集群版本为:7.6.2,集群为非安全集群。整体作业开发流程整体作业开发流程参考图1。图1 作业开发流程步骤1:创建队列:创建DLI作业运行的队列。步骤2:创建Kafka的Topic:创建Kafka生产消费数据的Topic。步骤3:创建Elasticsearch搜索索引:创建Elasticsearch搜索索引用于接收结果数据。步骤4:创建增强型跨源连接:DLI上创建连接Kafka和CSS的跨源连接,打通网络。步骤5:运行作业:DLI上创建和运行Flink OpenSource作业。步骤6:发送数据和查询结果:Kafka上发送流数据,在CSS上查看运行结果。步骤1:创建队列登录DLI管理控制台,在左侧导航栏单击“资源管理 > 队列管理”,可进入队列管理页面。在队列管理界面,单击界面右上角的“购买队列”。在“购买队列”界面,填写具体的队列配置参数,具体参数填写参考如下。计费模式:选择“包年/包月”或“按需计费”。本示例选择“按需计费”。区域和项目:保持默认值即可。名称:填写具体的队列名称。说明:新建的队列名称,名称只能包含数字、英文字母和下划线,但不能是纯数字,且不能以下划线开头。长度限制:1~128个字符。队列名称不区分大小写,系统会自动转换为小写。类型:队列类型选择“通用队列”。“按需计费”时需要勾选“专属资源模式”。AZ策略、CPU架构、规格:保持默认即可。企业项目:当前选择为“default”。高级选项:选择“自定义”。网段:配置队列网段。例如,当前配置为10.0.0.0/16。注意:队列的网段不能和DMS Kafka、RDS MySQL实例的子网网段有重合,否则后续创建跨源连接会失败。其他参数根据需要选择和配置。图2 创建队列参数配置完成后,单击“立即购买”,确认配置信息无误后,单击“提交”完成队列创建。步骤2:创建Kafka的Topic在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。单击“Topic管理 > 创建Topic”,创建一个Topic。Topic配置参数如下:Topic名称。本示例输入为:testkafkatopic。分区数:1。副本数:1其他参数保持默认即可。步骤3:创建Elasticsearch搜索索引登录CSS管理控制台,选择“集群管理 > Elasticsearch”。在集群管理界面,在已创建的CSS集群的“操作”列,单击“Kibana”访问集群。在Kibana的左侧导航中选择“Dev Tools”,进入到Console界面。在Console界面,执行如下命令创建索引“shoporders”。PUT /shoporders { "settings": { "number_of_shards": 1 }, "mappings": { "properties": { "order_id": { "type": "text" }, "order_channel": { "type": "text" }, "order_time": { "type": "text" }, "pay_amount": { "type": "double" }, "real_pay": { "type": "double" }, "pay_time": { "type": "text" }, "user_id": { "type": "text" }, "user_name": { "type": "text" }, "area_id": { "type": "text" } } } }步骤4:创建增强型跨源连接创建DLI连接Kafka的增强型跨源连接在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。在“连接信息”中获取该Kafka的“内网连接地址”,在“基本信息”的“网络”中获取获取该实例的“虚拟私有云”和“子网”信息,方便后续操作步骤使用。单击“网络”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPV4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。连接名称:设置具体的增强型跨源名称。本示例输入为:dli_kafka。弹性资源池:选择步骤1:创建队列中已经创建的队列。虚拟私有云:选择Kafka的虚拟私有云。子网:选择Kafka的子网。其他参数可以根据需要选择配置。参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中添加的队列,在操作列,单击“更多 > 测试地址连通性”。在“测试连通性”界面,根据中获取的Kafka连接信息,地址栏输入“Kafka内网地址:Kafka数据库端口”,单击“测试”测试DLI到Kafka网络是否可达。创建DLI连接CSS的增强型跨源连接在CSS管理控制台,选择“集群管理”,单击已创建的CSS集群名称,进入到CSS的基本信息页面。在“基本信息”中获取CSS的“内网访问地址”、“虚拟私有云”和“子网”信息,方便后续操作步骤使用。单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPV4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。说明:本示例默认Kafka和CSS实例分别在两个VPC和子网下,所以要分别创建增强型跨源连接打通网络。如果Kafka和CSS实例属于同一VPC和子网下,则创建增强型跨源一次即可,4和5不需要再执行。在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。连接名称:设置具体的增强型跨源名称。本示例输入为:dli_css。弹性资源池:选择步骤1:创建队列中已经创建的队列。虚拟私有云:选择CSS的虚拟私有云。子网:选择CSS的子网。其他参数可以根据需要选择配置。参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中添加的队列,在操作列,单击“更多 > 测试地址连通性”。在“测试连通性”界面,根据2获取的CSS连接信息,地址栏输入“CSS内网地址:CSS内网端口”,单击“测试”测试DLI到CSS网络是否可达。步骤5:运行作业在DLI管理控制台,单击“作业管理 > Flink作业”,在Flink作业管理界面,单击“创建作业”。在创建队列界面,类型选择“Flink OpenSource SQL”,名称填写为:FlinkKafkaES。单击“确定”,跳转到Flink作业编辑界面。在Flink OpenSource SQL作业编辑界面,配置如下参数。所属队列:选择步骤1:创建队列中创建的队列。Flink版本:选择1.12。保存作业日志:勾选。OBS桶:选择保存作业日志的OBS桶,根据提示进行OBS桶权限授权。开启Checkpoint:勾选。Flink作业编辑框中输入具体的作业SQL,本示例作业参考如下。SQL中加粗的参数需要根据实际情况修改。说明:本示例使用的Flink版本为1.12,故Flink OpenSource SQL语法也是1.12。本示例数据源是Kafka,写入结果数据到Elasticsearch,故请参考Flink OpenSource SQL 1.12创建Kafka源表和Flink OpenSource SQL 1.12创建Elasticsearch结果表。CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) with ( "connector" = "kafka", "properties.bootstrap.servers" = "10.128.0.120:9092,10.128.0.89:9092,10.128.0.83:9092",--替换为kafka的内网连接地址和端口 "properties.group.id" = "click", "topic" = "testkafkatopic", --创建的Kafka Topic "format" = "json", "scan.startup.mode" = "latest-offset" ); CREATE TABLE elasticsearchSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = '192.168.168.125:9200', --替换为CSS集群的内网地址和端口 'index' = 'shoporders' --创建的Elasticsearch搜索引擎 ); --将Kafka数据写入到Elasticsearch索引中 insert into elasticsearchSink select * from kafkaSource;单击“语义校验”确保SQL语义校验成功。单击“保存”,保存作业。单击“启动”,启动作业,确认作业参数信息,单击“立即启动”开始执行作业。等待作业运行状态变为“运行中”。步骤6:发送数据和查询结果使用Kafka客户端向步骤2:创建Kafka的Topic中的Topic发送数据,模拟实时数据流。Kafka生产和发送数据的方法请参考:DMS - 连接实例生产消费信息。发送样例数据如下:{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0002", "user_name":"Jason", "area_id":"330106"}发送成功后,在CSS集群的Kibana中执行下述语句并查看相应结果:GET shoporders/_search查询结果返回如下:{ "took" : 0, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 2, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "shoporders", "_type" : "_doc", "_id" : "6fswzIAByVjqg3_qAyM1", "_score" : 1.0, "_source" : { "order_id" : "202103241000000001", "order_channel" : "webShop", "order_time" : "2021-03-24 10:00:00", "pay_amount" : 100.0, "real_pay" : 100.0, "pay_time" : "2021-03-24 10:02:03", "user_id" : "0001", "user_name" : "Alice", "area_id" : "330106" } }, { "_index" : "shoporders", "_type" : "_doc", "_id" : "6vs1zIAByVjqg3_qyyPp", "_score" : 1.0, "_source" : { "order_id" : "202103241606060001", "order_channel" : "appShop", "order_time" : "2021-03-24 16:06:06", "pay_amount" : 200.0, "real_pay" : 180.0, "pay_time" : "2021-03-24 16:10:06", "user_id" : "0002", "user_name" : "Jason", "area_id" : "330106" } } ] } }
  • [最佳实践] 创建Flink OpenSource作业从Kafka读取数据写入到DWS
    场景描述该场景为对汽车驾驶的实时数据信息进行分析,将满足特定条件的数据结果进行汇总。数据的输入源为Kafka,结果输出到DWS中。例如,如下样例输入数据:{"car_id":"3027", "car_owner":"lilei", "car_age":"7", "average_speed":"76", "total_miles":"15000"} {"car_id":"3028", "car_owner":"hanmeimei", "car_age":"6", "average_speed":"92", "total_miles":"17000"} {"car_id":"3029", "car_owner":"zhangsan", "car_age":"10", "average_speed":"81", "total_miles":"230000"}预期输出:average_speed <= 90 和 total_miles <= 200000的车辆,即:{"car_id":"3027", "car_owner":"lilei", "car_age":"7", "average_speed":"76", "total_miles":"15000"}前提条件已创建DMS Kafka实例,具体步骤可参考:创建Kafka实例。注意:创建DMS Kafka实例时,不能开启Kafka SASL_SSL。已创建DWS实例,具体创建DWS集群的操作可以参考创建DWS集群。本示例创建的DWS集群版本为:8.1.1.205。整体作业开发流程整体作业开发流程参考图1。图1 作业开发流程步骤1:创建队列:创建DLI作业运行的队列。步骤2:创建Kafka的Topic:创建Kafka生产消费数据的Topic。步骤3:创建DWS数据库和表:创建DWS数据库和表信息。步骤4:创建增强型跨源连接:DLI上创建连接Kafka和DWS的跨源连接,打通网络。步骤5:运行作业:DLI上创建和运行Flink OpenSource作业。步骤6:发送数据和查询结果:Kafka上发送流数据,在RDS上查看运行结果。步骤1:创建队列登录DLI管理控制台,在左侧导航栏单击“资源管理 > 队列管理”,可进入队列管理页面。在队列管理界面,单击界面右上角的“购买队列”。在“购买队列”界面,填写具体的队列配置参数,具体参数填写参考如下。计费模式:选择“包年/包月”或“按需计费”。本示例选择“按需计费”。区域和项目:保持默认值即可。名称:填写具体的队列名称。说明:新建的队列名称,名称只能包含数字、英文字母和下划线,但不能是纯数字,且不能以下划线开头。长度限制:1~128个字符。队列名称不区分大小写,系统会自动转换为小写。类型:队列类型选择“通用队列”。“按需计费”时需要勾选“专属资源模式”。AZ策略、CPU架构、规格:保持默认即可。企业项目:当前选择为“default”。高级选项:选择“自定义”。网段:配置队列网段。例如,当前配置为10.0.0.0/16。注意:队列的网段不能和DMS Kafka、RDS MySQL实例的子网网段有重合,否则后续创建跨源连接会失败。其他参数根据需要选择和配置。图2 创建队列参数配置完成后,单击“立即购买”,确认配置信息无误后,单击“提交”完成队列创建。步骤2:创建Kafka的Topic在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。单击“Topic管理 > 创建Topic”,创建一个Topic。Topic配置参数如下:Topic名称。本示例输入为:testkafkatopic。分区数:1。副本数:1其他参数保持默认即可。步骤3:创建DWS数据库和表参考使用gsql命令行客户端连接DWS集群连接已创建的DWS集群。执行以下命令连接DWS集群的默认数据库“gaussdb”:gsql -d gaussdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -rgaussdb:DWS集群默认数据库。DWS集群连接地址:请参见获取集群连接地址进行获取。如果通过公网地址连接,请指定为集群“公网访问地址”或“公网访问域名”,如果通过内网地址连接,请指定为集群“内网访问地址”或“内网访问域名”。如果通过弹性负载均衡连接,请指定为“弹性负载均衡地址”。dbadmin:创建集群时设置的默认管理员用户名。-W:默认管理员用户的密码。在命令行窗口输入以下命令创建数据库“testdwsdb”。CREATE DATABASE testdwsdb;执行以下命令,退出gaussdb数据库,连接新创建的数据库“testdwsdb”。\q gsql -d testdwsdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r执行以下命令创建表。create schema test; set current_schema= test; drop table if exists qualified_cars; CREATE TABLE qualified_cars ( car_id VARCHAR, car_owner VARCHAR, car_age INTEGER , average_speed FLOAT8, total_miles FLOAT8 );步骤4:创建增强型跨源连接创建DLI连接Kafka的增强型跨源连接在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。在“连接信息”中获取该Kafka的“内网连接地址”,在“基本信息”的“网络”中获取获取该实例的“虚拟私有云”和“子网”信息,方便后续操作步骤使用。单击“网络”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPV4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。连接名称:设置具体的增强型跨源名称。本示例输入为:dli_kafka。弹性资源池:选择步骤1:创建队列中已经创建的队列名。虚拟私有云:选择Kafka的虚拟私有云。子网:选择Kafka的子网。其他参数可以根据需要选择配置。参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。在“测试连通性”界面,根据2中获取的Kafka连接信息,地址栏输入“Kafka内网地址:Kafka数据库端口”,单击“测试”测试DLI到Kafka网络是否可达。创建DLI连接DWS的增强型跨源连接在DWS管理控制台,选择“集群管理”,单击已创建的DWS集群名称,进入到DWS的基本信息页面。在“基本信息”的“数据库属性”中获取该实例的“内网IP”、“端口”,“基本信息”页面的“网络”中获取“虚拟私有云”和“子网”信息,方便后续操作步骤使用。单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPV4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。说明:本示例默认Kafka和DWS实例分别在两个VPC和子网下,所以要分别创建增强型跨源连接打通网络。如果Kafka和DWS实例属于同一VPC和子网下,则创建增强型跨源一次即可,4和5不需要再执行。在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。连接名称:设置具体的增强型跨源名称。本示例输入为:dli_dws。弹性资源池:选择步骤1:创建队列中已经创建的队列名。虚拟私有云:选择DWS的虚拟私有云。子网:选择DWS的子网。其他参数可以根据需要选择配置。参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。在“测试连通性”界面,根据2中获取的DWS连接信息,地址栏输入“DWS内网IP:DWS端口”,单击“测试”测试DLI到DWS网络是否可达。步骤5:运行作业在DLI管理控制台,单击“作业管理 > Flink作业”,在Flink作业管理界面,单击“创建作业”。在创建队列界面,类型选择“Flink OpenSource SQL”,名称填写为:FlinkKafkaDWS。单击“确定”,跳转到Flink作业编辑界面。在Flink OpenSource SQL作业编辑界面,配置如下参数。所属队列:选择步骤1:创建队列中创建的队列。Flink版本:选择1.12。保存作业日志:勾选。OBS桶:选择保存作业日志的OBS桶,根据提示进行OBS桶权限授权。开启Checkpoint:勾选。Flink作业编辑框中输入具体的作业SQL,本示例作业参考如下。SQL中加粗的参数需要根据实际情况修改。说明:本示例使用的Flink版本为1.12,故Flink OpenSource SQL语法也是1.12。本示例数据源是Kafka,写入结果数据到DWS,故请参考Flink OpenSource SQL 1.12创建Kafka源表和Flink OpenSource SQL 1.12创建DWS结果表(RDS连接)。create table car_infos( car_id STRING, car_owner STRING, car_age INT, average_speed DOUBLE, total_miles DOUBLE ) with ( "connector" = "kafka", "properties.bootstrap.servers" = "10.128.0.120:9092,10.128.0.89:9092,10.128.0.83:9092",--替换为kafka的内网连接地址和端口 "properties.group.id" = "click", "topic" = "testkafkatopic",--创建的Kafka的Topic "format" = "json", "scan.startup.mode" = "latest-offset" ); create table qualified_cars ( car_id STRING, car_owner STRING, car_age INT, average_speed DOUBLE, total_miles DOUBLE ) WITH ( 'connector' = 'gaussdb', 'driver' = 'com.huawei.gauss200.jdbc.Driver', 'url' = 'jdbc:gaussdb://192.168.168.16:8000/testdwsdb', ---192.168.168.16:8000替换为DWS的内网IP和端口,testdwsdb为创建的DWS数据库名 'table-name' = 'test\".\"qualified_cars', ---test为创建的DWS表的schema,qualified_cars为对应的DWS表名 'username' = 'xxxx',--替换为DWS实例的用户名 'password' = 'xxxx',--替换为DWS实例的用户密码 'write.mode' = 'insert' ); /** 将合格车辆信息输出 **/ INSERT INTO qualified_cars SELECT * FROM car_infos where average_speed <= 90 and total_miles <= 200000;单击“语义校验”确保SQL语义校验成功。单击“保存”,保存作业。单击“启动”,启动作业,确认作业参数信息,单击“立即启动”开始执行作业。等待作业运行状态变为“运行中”。步骤6:发送数据和查询结果使用Kafka客户端向步骤2:创建Kafka的Topic中的Topic发送数据,模拟实时数据流。Kafka生产和发送数据的方法请参考:DMS - 连接实例生产消费信息。发送样例数据如下:{"car_id":"3027", "car_owner":"lilei", "car_age":"7", "average_speed":"76", "total_miles":"15000"} {"car_id":"3028", "car_owner":"hanmeimei", "car_age":"6", "average_speed":"92", "total_miles":"17000"} {"car_id":"3029", "car_owner":"zhangsan", "car_age":"10", "average_speed":"81", "total_miles":"230000"}参考使用gsql命令行客户端连接DWS集群连接已创建的DWS集群。执行以下命令连接DWS集群的默认数据库“testdwsdb”:gsql -d testdwsdb -h DWS集群连接地址 -U dbadmin -p 8000 -W password -r查询DWS的表数据。select * from test.qualified_cars;查询结果参考如下:car_id car_owner car_age average_speed total_miles 3027 lilei 7 76.0 15000.0
  • [最佳实践] 创建Flink OpenSource作业从Kafka读取数据写入到RDS
    场景描述该场景为根据商品的实时点击量,获取每小时内点击量最高的3个商品及其相关信息,数据的输入源为Kafka,结果输出到RDS中。例如,如下样例输入数据:{"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:01:00", "product_id":"0002", "product_name":"name1"} {"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 08:02:00", "product_id":"0002", "product_name":"name1"} {"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 08:06:00", "product_id":"0004", "product_name":"name2"} {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:10:00", "product_id":"0003", "product_name":"name3"} {"user_id":"0003", "user_name":"Cindy", "event_time":"2021-03-24 08:15:00", "product_id":"0005", "product_name":"name4"} {"user_id":"0003", "user_name":"Cindy", "event_time":"2021-03-24 08:16:00", "product_id":"0005", "product_name":"name4"} {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:56:00", "product_id":"0004", "product_name":"name2"} {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 09:05:00", "product_id":"0005", "product_name":"name4"} {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 09:10:00", "product_id":"0006", "product_name":"name5"} {"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 09:13:00", "product_id":"0006", "product_name":"name5"}预期输出:2021-03-24 08:00:00 - 2021-03-24 08:59:59,0002,name1,2 2021-03-24 08:00:00 - 2021-03-24 08:59:59,0004,name2,2 2021-03-24 08:00:00 - 2021-03-24 08:59:59,0005,name4,2 2021-03-24 09:00:00 - 2021-03-24 09:59:59,0006,name5,2 2021-03-24 09:00:00 - 2021-03-24 09:59:59,0005,name4,1前提条件已创建DMS Kafka实例,具体步骤可参考:创建Kafka实例。注意:创建DMS Kafka实例时,不能开启Kafka SASL_SSL。已创建RDS MySQL实例,具体步骤可参考:RDS MySQL快速入门。本示例创建的RDS MySQL数据库版本选择为:8.0。整体作业开发流程整体作业开发流程参考图1。图1 作业开发流程步骤1:创建队列:创建DLI作业运行的队列。步骤2:创建Kafka的Topic:创建Kafka生产消费数据的Topic。步骤3:创建RDS数据库和表:创建RDS MySQL数据库和表信息。步骤4:创建增强型跨源连接:DLI上创建连接Kafka和RDS的跨源连接,打通网络。步骤5:运行作业:DLI上创建和运行Flink OpenSource作业。步骤6:发送数据和查询结果:Kafka上发送流数据,在RDS上查看运行结果。步骤1:创建队列登录DLI管理控制台,在左侧导航栏单击“资源管理 > 队列管理”,可进入队列管理页面。在队列管理界面,单击界面右上角的“购买队列”。在“购买队列”界面,填写具体的队列配置参数,具体参数填写参考如下。计费模式:选择“包年/包月”或“按需计费”。本示例选择“按需计费”。区域和项目:保持默认值即可。名称:填写具体的队列名称。说明:新建的队列名称,名称只能包含数字、英文字母和下划线,但不能是纯数字,且不能以下划线开头。长度限制:1~128个字符。队列名称不区分大小写,系统会自动转换为小写。类型:队列类型选择“通用队列”。“按需计费”时需要勾选“专属资源模式”。AZ策略、CPU架构、规格:保持默认即可。企业项目:当前选择为“default”。高级选项:选择“自定义”。网段:配置队列网段。例如,当前配置为10.0.0.0/16。注意:队列的网段不能和DMS Kafka、RDS MySQL实例的子网网段有重合,否则后续创建跨源连接会失败。其他参数根据需要选择和配置。图2 创建队列参数配置完成后,单击“立即购买”,确认配置信息无误后,单击“提交”完成队列创建。步骤2:创建Kafka的Topic登录Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka实例名称,进入到Kafka实例的基本信息页面。单击“Topic管理 > 创建Topic”,创建一个Topic。Topic配置参数如下:Topic名称。本示例输入为:testkafkatopic。分区数:1。副本数:1其他参数保持默认即可。步骤3:创建RDS数据库和表登录RDS管理控制台,在“实例管理”界面,选择已创建的RDS MySQL实例,选择操作列的“更多 > 登录”,进入数据管理服务实例登录界面。输入实例登录的用户名和密码。单击“登录”,即可进入RDS MySQL数据库并进行管理。在数据库实例界面,单击“新建数据库”,数据库名定义为:testrdsdb,字符集保持默认即可。在已创建的数据库的操作列,单击“SQL查询”,输入以下创建表语句,创建RDS MySQL表。CREATE TABLE clicktop ( `range_time` VARCHAR(64) NOT NULL, `product_id` VARCHAR(32) NOT NULL, `product_name` VARCHAR(32), `event_count` VARCHAR(32), PRIMARY KEY (`range_time`,`product_id`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4;步骤4:创建增强型跨源连接创建DLI连接Kafka的增强型跨源连接在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。在“连接信息”中获取该Kafka的“内网连接地址”,在“基本信息”的“网络”中获取该实例的“虚拟私有云”和“子网”信息,方便后续操作步骤使用。单击“网络”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPV4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。连接名称:设置具体的增强型跨源名称。本示例输入为:dli_kafka。弹性资源池:选择步骤1:创建队列中已经创建的队列名称。虚拟私有云:选择Kafka的虚拟私有云。子网:选择Kafka的子网。其他参数可以根据需要选择配置。参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为“已激活”后可以进行后续步骤。单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。在“测试连通性”界面,根据2中获取的Kafka连接信息,地址栏输入“Kafka内网地址:Kafka数据库端口”,单击“测试”测试DLI到Kafka网络是否可达。创建DLI连接RDS的增强型跨源连接在RDS管理控制台,选择“实例管理”,单击对应的RDS实例名称,进入到RDS的基本信息页面。在“基本信息”的“连接信息”中获取该实例的“内网地址”、“数据库端口”、“虚拟私有云”和“子网”信息,方便后续操作步骤使用。单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPV4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。说明:本示例默认Kafka和RDS实例分别在两个VPC和子网下,所以要分别创建增强型跨源连接打通网络。如果Kafka和RDS实例属于同一VPC和子网下,则创建增强型跨源一次即可,4和5不需要再执行。在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。连接名称:设置具体的增强型跨源名称。本示例输入为:dli_rds。弹性资源池:选择步骤1:创建队列中已经创建的队列名称。虚拟私有云:选择RDS的虚拟私有云。子网:选择RDS的子网。其他参数可以根据需要选择配置。参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。在“测试连通性”界面,根据2中获取的RDS连接信息,地址栏输入“RDS内网地址:RDS数据库端口”,单击“测试”测试DLI到RDS网络是否可达。步骤5:运行作业在DLI管理控制台,单击“作业管理 > Flink作业”,在Flink作业管理界面,单击“创建作业”。在创建作业界面,作业类型选择“Flink OpenSource SQL”,名称填写为:FlinkKafkaRds。单击“确定”,跳转到Flink作业编辑界面。在Flink OpenSource SQL作业编辑界面,配置如下参数。所属队列:选择步骤1:创建队列中创建的队列。Flink版本:选择1.12。保存作业日志:勾选。OBS桶:选择保存作业日志的OBS桶,根据提示进行OBS桶权限授权。开启Checkpoint:勾选。Flink作业编辑框中输入具体的作业SQL,本示例作业参考如下。SQL中加粗的参数需要根据实际情况修改。说明:本示例使用的Flink版本为1.12,故Flink OpenSource SQL语法也是1.12。本示例数据源是Kafka,写入结果数据到RDS,故请参考Flink OpenSource SQL 1.12创建Kafka源表和Flink OpenSource SQL 1.12创建JDBC结果表(RDS连接)。create table click_product( user_id string, --点击用户的id user_name string, --用户名称 event_time string, --点击时间 product_id string, --商品id product_name string --商品名称 ) with ( "connector" = "kafka", "properties.bootstrap.servers" = "10.128.0.120:9092,10.128.0.89:9092,10.128.0.83:9092",--替换为kafka的内网连接地址和端口 "properties.group.id" = "click", "topic" = "testkafkatopic",--创建的Kafka Topic名称 "format" = "json", "scan.startup.mode" = "latest-offset" ); --结果表 create table top_product ( range_time string, --计算的时间范围 product_id string, --商品id product_name string, --商品名称 event_count bigint, --点击次数 primary key (range_time, product_id) not enforced ) with ( "connector" = "jdbc", "url" = "jdbc:mysql://192.168.12.148:3306/testrdsdb",--testrdsdb为创建的RDS的数据库名,IP和端口替换为RDS MySQL的实例IP和端口 "table-name" = "clicktop", "username" = "xxxxx", --替换为RDS MySQL的实例的用户名 "password" = "xxxxx", --替换为RDS MySQL的实例的用户密码 "sink.buffer-flush.max-rows" = "1000", "sink.buffer-flush.interval" = "1s" ); create view current_event_view as select product_id, product_name, count(1) as click_count, concat(substring(event_time, 1, 13), ":00:00") as min_event_time, concat(substring(event_time, 1, 13), ":59:59") as max_event_time from click_product group by substring (event_time, 1, 13), product_id, product_name; insert into top_product select concat(min_event_time, " - ", max_event_time) as range_time, product_id, product_name, click_count from ( select *, row_number() over (partition by min_event_time order by click_count desc) as row_num from current_event_view ) where row_num <= 3单击“语义校验”确保SQL语义校验成功。单击“保存”,保存作业。单击“启动”,启动作业,确认作业参数信息,单击“立即启动”开始执行作业。等待作业运行状态变为“运行中”。步骤6:发送数据和查询结果使用Kafka客户端向步骤2:创建Kafka的Topic中的Topic发送数据,模拟实时数据流。Kafka生产和发送数据的方法请参考:DMS - 连接实例生产消费信息。发送样例数据如下:{"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:01:00", "product_id":"0002", "product_name":"name1"} {"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 08:02:00", "product_id":"0002", "product_name":"name1"} {"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 08:06:00", "product_id":"0004", "product_name":"name2"} {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:10:00", "product_id":"0003", "product_name":"name3"} {"user_id":"0003", "user_name":"Cindy", "event_time":"2021-03-24 08:15:00", "product_id":"0005", "product_name":"name4"} {"user_id":"0003", "user_name":"Cindy", "event_time":"2021-03-24 08:16:00", "product_id":"0005", "product_name":"name4"} {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:56:00", "product_id":"0004", "product_name":"name2"} {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 09:05:00", "product_id":"0005", "product_name":"name4"} {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 09:10:00", "product_id":"0006", "product_name":"name5"} {"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 09:13:00", "product_id":"0006", "product_name":"name5"}登录RDS控制台,单击RDS数据库实例,单击创建的数据库名,如“testrdsdb”,在创建的表“clicktop”所在行的“操作”列,单击“SQL查询”,输入以下查询语句。select * from `clicktop`;在“SQL查询”界面,单击“执行SQL”,查看RDS表数据已写入成功。
  • [最佳实践] 迁移Kafka数据至DLI
    本文为您介绍如何通过CDM数据同步功能,迁移MRS Kafka数据至DLI。其他DMS Kafka或者Apache Kafka等服务数据,均可以通过CDM与DLI进行双向同步。前提条件已创建DLI的SQL队列。创建DLI队列的操作可以参考创建DLI队列。注意:创建DLI队列时队列类型需要选择为“SQL队列”。已创建包含Kafka组件的MRS安全集群。具体创建MRS集群的操作可以参考创建MRS集群。本示例创建的MRS集群版本为:MRS 3.1.0。本示例创建的MRS集群开启了Kerberos认证。已创建CDM迁移集群。创建CDM集群的操作可以参考创建CDM集群。创建CDM集群完成后,还需要提交工单开通迁移MRS Kafka数据到DLI的白名单,否则在CDM配置迁移作业时目的端配置查找不到DLI。说明:如果目标数据源为云下的数据库,则需要通过公网或者专线打通网络。通过公网互通时,需确保CDM集群已绑定EIP、CDM云上安全组出方向放通云下数据源所在的主机、数据源所在的主机可以访问公网且防火墙规则已开放连接端口。数据源为云上的MRS、DWS时,网络互通需满足如下条件:i. CDM集群与云上服务处于不同区域的情况下,需要通过公网或者专线打通网络。通过公网互通时,需确保CDM集群已绑定EIP,数据源所在的主机可以访问公网且防火墙规则已开放连接端口。ii. CDM集群与云上服务同区域情况下,同虚拟私有云、同子网、同安全组的不同实例默认网络互通;如果同虚拟私有云但是子网或安全组不同,还需配置路由规则及安全组规则。配置路由规则请参见如何配置路由规则章节,配置安全组规则请参见如何配置安全组规则章节。iii. 此外,您还必须确保该云服务的实例与CDM集群所属的企业项目必须相同,如果不同,需要修改工作空间的企业项目。本示例CDM集群的虚拟私有云、子网以及安全组和创建的MRS集群保持一致。步骤一:数据准备MRS集群上创建Kafka的Topic并且向Topic发送消息。参考访问MRS Manager登录MRS Manager。在MRS Manager上,选择“系统 > 权限 > 用户”,单击“添加用户”,在添加用户页面分别配置如下参数。用户名:自定义的用户名。当前示例输入为:testuser2。用户类型:当前选择为“人机”。密码和确认密码:输入当前用户名对应的密码。用户组和主组:选择kafkaadmin。角色:选择Manager_viewer角色。图1 MRS Manager上创建Kafka用户在MRS Manager上,选择“集群 > 待操作的集群名称 > 服务 > ZooKeeper > 实例”,获取ZooKeeper角色实例的IP地址,为后续步骤做准备。在MRS Manager上,选择“集群 > 待操作的集群名称 > 服务 > kafka > 实例”,获取kafka角色实例的IP地址,为后续步骤做准备。参考安装MRS客户端下载并安装Kafka客户端。例如,当前Kafka客户端安装在MRS主机节点的“/opt/kafkaclient”目录上。以root用户进入客户端安装目录下。例如:cd /opt/kafkaclient执行以下命令配置环境变量。source bigdata_env因为当前集群启用了Kerberos认证,则需要执行以下命令进行安全认证。认证用户为2中创建的用户。kinit 2中创建的用户名例如,kinit testuser2执行以下命令创建名字为kafkatopic的Kafka Topic。kafka-topics.sh --create --zookeeper ZooKeeper角色实例所在节点IP地址1:2181,ZooKeeper角色实例所在节点IP地址2:2181,ZooKeeper角色实例所在节点IP地址3:2181/kafka --replication-factor 1 --partitions 1 --topic kafkatopic上述命令中的“ZooKeeper角色实例所在节点IP地址”即为3中获取的ZooKeeper实例IP。执行以下命令向kafkatopic发送消息。kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址1:21007,Kafka角色实例所在节点的IP地址2:21007,Kafka角色实例所在节点的IP地址3:21007 --topic kafkatopic --producer.config /opt/kafkaclient/Kafka/kafka/config/producer.properties上述命令中的“Kafka角色实例所在节点的IP地址”即为4中获取的Kafka实例IP。发送测试消息内容如下:{"PageViews":5, "UserID":"4324182021466249494", "Duration":146,"Sign":-1}在DLI上创建数据库和表。登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列。在编辑器中输入以下语句创建数据库,例如当前创建迁移后的DLI数据库testdb。详细的DLI创建数据库的语法可以参考创建DLI数据库。create database testdb;创建数据库下的表。详细的DLI建表语法可以参考创建DLI表。CREATE TABLE testdlitable(value STRING);步骤二:数据迁移配置CDM数据源连接。配置源端MRS Kafka的数据源连接。登录CDM控制台,选择“集群管理”,选择已创建的CDM集群,在操作列选择“作业管理”。在作业管理界面,选择“连接管理”,单击“新建连接”,连接器类型选择“MRS Kafka”,单击“下一步”。图2 创建MRS Kafka数据源配置源端MRS Kafka的数据源连接,具体参数配置如下。表1 MRS Kafka数据源配置参数值名称自定义MRS Kafka数据源名称。例如当前配置为“source_kafka”。Manager IP单击输入框旁边的“选择”按钮,选择当前MRS Kafka集群即可自动关联出来Manager IP。用户名在2中创建的MRS Kafka用户名。密码对应MRS Kafka用户名的密码。认证类型如果当前MRS集群为普通集群则选择为SIMPLE,如果是MRS集群启用了Kerberos安全认证则选择为KERBEROS。本示例选择为:KERBEROS。更多参数的详细说明可以参考CDM上配置Kafka连接。图3 CDM配置MRS Kafka数据源连接单击“保存”完成MRS Kafka数据源配置。配置目的端DLI的数据源连接。登录CDM控制台,选择“集群管理”,选择已创建的CDM集群,在操作列选择“作业管理”。在作业管理界面,选择“连接管理”,单击“新建连接”,连接器类型选择“数据湖探索(DLI)”,单击“下一步”。图4 创建DLI数据源连接配置目的端DLI数据源连接连接参数。具体参数配置可以参考在CDM上配置DLI连接。图5 配置DLI数据源连接参数配置完成后,单击“保存”完成DLI数据源配置。创建CDM迁移作业。登录CDM控制台,选择“集群管理”,选择已创建的CDM集群,在操作列选择“作业管理”。在“作业管理”界面,选择“表/文件迁移”,单击“新建作业”。在新建作业界面,配置当前作业配置信息,具体参数参考如下:图6 新建CDM作业作业配置作业名称:自定义数据迁移的作业名称。例如,当前定义为:test。源端作业配置,具体参考如下:表2 源端作业配置参数名参数值源连接名称选择1.a中已创建的数据源名称。Topics选择MRS Kafka待迁移的Topic名称,支持单个或多个Topic。当前示例为:kafkatopic。数据格式根据实际情况选择当前消息格式。本示例选择为:CDC(DRS_JSON),以DRS_JSON格式解析源数据。偏移量参数从Kafka拉取数据时的初始偏移量。本示例当前选择为:最新。最新:最大偏移量,即拉取最新的数据。最早:最小偏移量,即拉取最早的数据。已提交:拉取已提交的数据。时间范围:拉取时间范围内的数据。是否持久运行用户自定义是否永久运行。当前示例选择为:否。拉取数据超时时间持续拉取数据多长时间超时,单位分钟。当前示例配置为:15。等待时间可选参数,超出等待时间还是无法读取到数据,则不再读取数据,单位秒。当前示例不配置该参数。消费组ID用户指定消费组ID。当前使用MRS Kafka默认的消息组ID:“example-group1”。其他参数的详细配置说明可以参考:CDM配置Kafka源端参数。目的端作业配置,具体参考如下:表3 目的端作业配置参数名参数值目的连接名称选择1.b已创建的DLI数据源连接。资源队列选择已创建的DLI SQL类型的队列。数据库名称选择DLI下已创建的数据库。当前示例为在DLI上创建数据库和表中创建的数据库名,即为“testdb”。表名选择DLI下已创建的表名。当前示例为在DLI上创建数据库和表中创建的表名,即为“testdlitable”。导入前清空数据选择导入前是否清空目的表的数据。当前示例选择为“否”。如果设置为是,任务启动前会清除目标表中数据。详细的参数配置可以参考:CDM配置DLI目的端参数。单击“下一步”,进入到字段映射界面,CDM会自动匹配源和目的字段。如果字段映射顺序不匹配,可通过拖拽字段调整。如果选择在目的端自动创建类型,这里还需要配置每个类型的字段类型、字段名称。CDM支持迁移过程中转换字段内容,详细请参见字段转换。图7 字段映射单击“下一步”配置任务参数,一般情况下全部保持默认即可。该步骤用户可以配置如下可选功能:作业失败重试:如果作业执行失败,可选择是否自动重试,这里保持默认值“不重试”。作业分组:选择作业所属的分组,默认分组为“DEFAULT”。在CDM“作业管理”界面,支持作业分组显示、按组批量启动作业、按分组导出作业等操作。是否定时执行:如果需要配置作业定时自动执行,请参见配置定时任务。这里保持默认值“否”。抽取并发数:设置同时执行的抽取任务数。这里保持默认值“1”。是否写入脏数据:如果需要将作业执行过程中处理失败的数据、或者被清洗过滤掉的数据写入OBS中,以便后面查看,可通过该参数配置,写入脏数据前需要先配置好OBS连接。这里保持默认值“否”即可,不记录脏数据。单击“保存并运行”,回到作业管理界面,在作业管理界面可查看作业执行进度和结果。图8 迁移作业进度和结果查询步骤三:结果查询CDM迁移作业运行完成后,再登录到DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择已1已创建的数据库,执行DLI表查询语句,查询Kafka数据是否已成功迁移到DLI的“testdlitable”表中。select * from testdlitable;
  • [问题求助] arm架构有没有kafka的二进制包
    【功能模块】【操作步骤&问题现象】1、2、【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [二次开发] kafka接口调用样例中,创建机机账号测试时,身份认证报错,报错信息如下
    【功能模块】参考kafka接口调用样例培训视频指导配置【操作步骤&问题现象】1、客户端配置文件已加入resources,机机账号凭证文件也已加入resources2、代码中对应的变量值也已同步修改,启动Producer报身份认证错误,错误详情如下【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [知识分享] 6张图为你分析Kafka Producer 消息缓存模型
    本文分享自华为云社区《[图解Kafka Producer 消息缓存模型](https://bbs.huaweicloud.com/blogs/337169?utm_source=csdn&utm_medium=bbs-ex&utm_campaign=other&utm_content=content)》,作者:石臻臻的杂货铺。 在阅读本文之前, 希望你可以思考一下下面几个问题, 带着问题去阅读文章会获得更好的效果。 1. 发送消息的时候, 当Broker挂掉了,消息体还能写入到消息缓存中吗? 2. 当消息还存储在缓存中的时候, 假如Producer客户端挂掉了,消息是不是就丢失了? 3. 当最新的ProducerBatch还有空余的内存,但是接下来的一条消息很大,不足以加上上一个Batch中,会怎么办呢? 4. 那么创建ProducerBatch的时候,应该分配多少的内存呢? # 什么是消息累加器RecordAccumulator kafka为了提高Producer客户端的发送吞吐量和提高性能,选择了将消息暂时缓存起来,等到满足一定的条件, 再进行批量发送, 这样可以减少网络请求,提高吞吐量。 而缓存这个消息的就是RecordAccumulator类。 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20223/25/1648176965633248324.png) 上图就是整个消息存放的缓存模型,我们接下来一个个来讲解。 # 消息缓存模型 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20223/25/1648176986967712158.png) 上图表示的就是 消息缓存的模型, 生产的消息就是暂时存放在这个里面。 1. 每条消息,我们按照TopicPartition维度,把他们放在不同的Deque 队列里面。 TopicPartition相同,会在相同Deque 的里面。 2. ProducerBatch : 表示同一个批次的消息, 消息真正发送到Broker端的时候都是按照批次来发送的, 这个批次可能包含一条或者多条消息。 3. 如果没有找到消息对应的ProducerBatch队列, 则创建一个队列。 4. 找到ProducerBatch队列队尾的Batch,发现Batch还可以塞下这条消息,则将消息直接塞到这个Batch中 5. 找到ProducerBatch队列队尾的Batch,发现Batch中剩余内存,不够塞下这条消息,则会创建新的Batch 6. 当消息发送成功之后, Batch会被释放掉。 **ProducerBatch的内存大小** >那么创建ProducerBatch的时候,应该分配多少的内存呢? 先说结论: 当消息预估内存大于batch.size的时候,则按照消息预估内存创建, 否则按照batch.size的大小创建(默认16k). 我们来看一段代码,这段代码就是在创建ProducerBatch的时候预估内存的大小 RecordAccumulator#append /** * 公众号: 石臻臻的杂货铺 * 微信:szzdzhp001 **/ // 找到 batch.size 和 这条消息在batch中的总内存大小的 最大值 int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); // 申请内存 buffer = free.allocate(size, maxTimeToBlock); 1. 假设当前生产了一条消息为M, 刚好消息M找不到可以存放消息的ProducerBatch(不存在或者满了),那么这个时候就需要创建一个新的ProducerBatch了 2. 预估消息的大小 跟batch.size 默认大小16384(16kb). 对比,取最大值用于申请的内存大小的值。 原文地址:图解Kafka Producer 消息缓存模型 >那么, 这个消息的预估是如何预估的?纯粹的是消息体的大小吗? DefaultRecordBatch#estimateBatchSizeUpperBound 预估需要的Batch大小,是一个预估值,因为没有考虑压缩算法从额外开销 /** * 使用给定的键和值获取只有一条记录的批次大小的上限。 * 这只是一个估计,因为它没有考虑使用的压缩算法的额外开销。 **/ static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) { return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers); } 1. 预估这个消息M的大小 + 一个RECORD_BATCH_OVERHEAD的大小 2. RECORD_BATCH_OVERHEAD是一个Batch里面的一些基本元信息,总共占用了 61B 3. 消息M的大小也并不是单单的只有消息体的大小,总大小=(key,value,headers)的大小+MAX_RECORD_OVERHEAD 4. MAX_RECORD_OVERHEAD :一条消息头最大占用空间, 最大值为21B 也就是说创建一个ProducerBatch,最少就要83B . 比如我发送一条消息 " 1 " , 预估得到的大小是 86B, 跟batch.size(默认16384) 相比取最大值。 那么申请内存的时候取最大值 16384 。 关于Batch的结构和消息的结构,我们回头**单独用一篇文章来讲解**。 # 内存分配 我们都知道RecordAccumulator里面的缓存大小是一开始定义好的, 由buffer.memory控制, 默认33554432 (32M) 当生产的速度大于发送速度的时候,就可能出现Producer写入阻塞。 而且频繁的创建和释放ProducerBatch,会导致频繁GC, 所有kafka中有个缓存池的概念,这个缓存池会被重复使用,但是只有固定( batch.size)的大小才能够使用缓存池。 **PS:以下16k指得是 batch.size的默认值.** 原文地址:图解Kafka Producer 消息缓存模型 **Batch的创建和释放** **1. 内存16K 缓存池中有可用内存** ①. 创建Batch的时候, 会去缓存池中,获取队首的一块内存ByteBuffer 使用。 ②. 消息发送完成,释放Batch, 则会把这个ByteBuffer,放到缓存池的队尾中,并且调用ByteBuffer.clear 清空数据。以便下次重复使用 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20223/25/1648177179179208885.png) **2. 内存16K 缓存池中无可用内存** ①. 创建Batch的时候, 去非缓存池中的内存获取一部分内存用于创建Batch. 注意:这里说的获取内存给Batch, 其实就是让 非缓存池nonPooledAvailableMemory 减少 16K 的内存, 然后Batch正常创建就行了, **不要误以为好像真的发生了内存的转移**。 ②. 消息发送完成,释放Batch, 则会把这个ByteBuffer,放到缓存池的队尾中,并且调用ByteBuffer.clear 清空数据, 以便下次重复使用 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20223/25/1648177198437732586.png) 原文地址:图解Kafka Producer 消息缓存模型 **3. 内存非16K 非缓存池中内存够用** ①. 创建Batch的时候, 去非缓存池(nonPooledAvailableMemory)内存获取一部分内存用于创建Batch. 注意:这里说的获取内存给Batch, 其实就是让 非缓存池(nonPooledAvailableMemory) 减少对应的内存, 然后Batch正常创建就行了, 不要误以为好像真的发生了内存的转移。 ②. 消息发送完成,释放Batch, 纯粹的是在非缓存池(nonPooledAvailableMemory)中加上刚刚释放的Batch内存大小。 当然这个Batch会被GC掉 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20223/25/1648177225759310952.png) **4. 内存非16K 非缓存池内存不够用** ①. 先尝试将 缓存池中的内存一个一个释放到 非缓存池中, 直到非缓存池中的内存够用与创建Batch了 ②. 创建Batch的时候, 去非缓存池(nonPooledAvailableMemory)内存获取一部分内存用于创建Batch. 注意:这里说的获取内存给Batch, 其实就是让 非缓存池(nonPooledAvailableMemory) 减少对应的内存, 然后Batch正常创建就行了, 不要误以为好像真的发生了内存的转移。 ③. 消息发送完成,释放Batch, 纯粹的是在非缓存池(nonPooledAvailableMemory)中加上刚刚释放的Batch内存大小。 当然这个Batch会被GC掉 例如: 下面我们需要创建 48k的batch, 因为超过了16k,所以需要在非缓存池中分配内存, 但是非缓存池中当前可用内存为0 , 分配不了, 这个时候就会尝试去 缓存池里面释放一部分内存到 非缓存池。 释放第一个ByteBuffer(16k) 不够,则继续释放第二个,直到释放了3个之后总共48k,发现内存这时候够了, 再去创建Batch。 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20223/25/1648177250727367208.png) 注意:这里我们涉及到的 非缓存池中的内存分配, 仅仅指的的内存数字的增加和减少。 # 问题和答案 >发送消息的时候, 当Broker挂掉了,消息体还能写入到消息缓存中吗? 当Broker挂掉了,Producer会提示下面的警告⚠️, 但是发送消息过程中 这个消息体还是可以写入到 消息缓存中的,也仅仅是写到到缓存中而已。 WARN [Producer clientId=console-producer] Connection to node 0 (/172.23.164.192:9090) could not be established. Broker may not be available ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20223/25/1648177290305278499.png) >当最新的ProducerBatch还有空余的内存,但是接下来的一条消息很大,不足以加上上一个Batch中,会怎么办呢? 那么会创建新的ProducerBatch。 >那么创建ProducerBatch的时候,应该分配多少的内存呢? 触发创建ProducerBatch的那条消息预估大小大于batch.size ,则以预估内存创建。 否则,以batch.size创建。 还有一个问题供大家思考: **当消息还存储在缓存中的时候, 假如Producer客户端挂掉了,消息是不是就丢失了?**