• [二次开发] kafka接口调用样例中,创建机机账号测试时,身份认证报错,报错信息如下
    【功能模块】参考kafka接口调用样例培训视频指导配置【操作步骤&问题现象】1、客户端配置文件已加入resources,机机账号凭证文件也已加入resources2、代码中对应的变量值也已同步修改,启动Producer报身份认证错误,错误详情如下【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [知识分享] 6张图为你分析Kafka Producer 消息缓存模型
    本文分享自华为云社区《[图解Kafka Producer 消息缓存模型](https://bbs.huaweicloud.com/blogs/337169?utm_source=csdn&utm_medium=bbs-ex&utm_campaign=other&utm_content=content)》,作者:石臻臻的杂货铺。 在阅读本文之前, 希望你可以思考一下下面几个问题, 带着问题去阅读文章会获得更好的效果。 1. 发送消息的时候, 当Broker挂掉了,消息体还能写入到消息缓存中吗? 2. 当消息还存储在缓存中的时候, 假如Producer客户端挂掉了,消息是不是就丢失了? 3. 当最新的ProducerBatch还有空余的内存,但是接下来的一条消息很大,不足以加上上一个Batch中,会怎么办呢? 4. 那么创建ProducerBatch的时候,应该分配多少的内存呢? # 什么是消息累加器RecordAccumulator kafka为了提高Producer客户端的发送吞吐量和提高性能,选择了将消息暂时缓存起来,等到满足一定的条件, 再进行批量发送, 这样可以减少网络请求,提高吞吐量。 而缓存这个消息的就是RecordAccumulator类。 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20223/25/1648176965633248324.png) 上图就是整个消息存放的缓存模型,我们接下来一个个来讲解。 # 消息缓存模型 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20223/25/1648176986967712158.png) 上图表示的就是 消息缓存的模型, 生产的消息就是暂时存放在这个里面。 1. 每条消息,我们按照TopicPartition维度,把他们放在不同的Deque 队列里面。 TopicPartition相同,会在相同Deque 的里面。 2. ProducerBatch : 表示同一个批次的消息, 消息真正发送到Broker端的时候都是按照批次来发送的, 这个批次可能包含一条或者多条消息。 3. 如果没有找到消息对应的ProducerBatch队列, 则创建一个队列。 4. 找到ProducerBatch队列队尾的Batch,发现Batch还可以塞下这条消息,则将消息直接塞到这个Batch中 5. 找到ProducerBatch队列队尾的Batch,发现Batch中剩余内存,不够塞下这条消息,则会创建新的Batch 6. 当消息发送成功之后, Batch会被释放掉。 **ProducerBatch的内存大小** >那么创建ProducerBatch的时候,应该分配多少的内存呢? 先说结论: 当消息预估内存大于batch.size的时候,则按照消息预估内存创建, 否则按照batch.size的大小创建(默认16k). 我们来看一段代码,这段代码就是在创建ProducerBatch的时候预估内存的大小 RecordAccumulator#append /** * 公众号: 石臻臻的杂货铺 * 微信:szzdzhp001 **/ // 找到 batch.size 和 这条消息在batch中的总内存大小的 最大值 int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); // 申请内存 buffer = free.allocate(size, maxTimeToBlock); 1. 假设当前生产了一条消息为M, 刚好消息M找不到可以存放消息的ProducerBatch(不存在或者满了),那么这个时候就需要创建一个新的ProducerBatch了 2. 预估消息的大小 跟batch.size 默认大小16384(16kb). 对比,取最大值用于申请的内存大小的值。 原文地址:图解Kafka Producer 消息缓存模型 >那么, 这个消息的预估是如何预估的?纯粹的是消息体的大小吗? DefaultRecordBatch#estimateBatchSizeUpperBound 预估需要的Batch大小,是一个预估值,因为没有考虑压缩算法从额外开销 /** * 使用给定的键和值获取只有一条记录的批次大小的上限。 * 这只是一个估计,因为它没有考虑使用的压缩算法的额外开销。 **/ static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) { return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers); } 1. 预估这个消息M的大小 + 一个RECORD_BATCH_OVERHEAD的大小 2. RECORD_BATCH_OVERHEAD是一个Batch里面的一些基本元信息,总共占用了 61B 3. 消息M的大小也并不是单单的只有消息体的大小,总大小=(key,value,headers)的大小+MAX_RECORD_OVERHEAD 4. MAX_RECORD_OVERHEAD :一条消息头最大占用空间, 最大值为21B 也就是说创建一个ProducerBatch,最少就要83B . 比如我发送一条消息 " 1 " , 预估得到的大小是 86B, 跟batch.size(默认16384) 相比取最大值。 那么申请内存的时候取最大值 16384 。 关于Batch的结构和消息的结构,我们回头**单独用一篇文章来讲解**。 # 内存分配 我们都知道RecordAccumulator里面的缓存大小是一开始定义好的, 由buffer.memory控制, 默认33554432 (32M) 当生产的速度大于发送速度的时候,就可能出现Producer写入阻塞。 而且频繁的创建和释放ProducerBatch,会导致频繁GC, 所有kafka中有个缓存池的概念,这个缓存池会被重复使用,但是只有固定( batch.size)的大小才能够使用缓存池。 **PS:以下16k指得是 batch.size的默认值.** 原文地址:图解Kafka Producer 消息缓存模型 **Batch的创建和释放** **1. 内存16K 缓存池中有可用内存** ①. 创建Batch的时候, 会去缓存池中,获取队首的一块内存ByteBuffer 使用。 ②. 消息发送完成,释放Batch, 则会把这个ByteBuffer,放到缓存池的队尾中,并且调用ByteBuffer.clear 清空数据。以便下次重复使用 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20223/25/1648177179179208885.png) **2. 内存16K 缓存池中无可用内存** ①. 创建Batch的时候, 去非缓存池中的内存获取一部分内存用于创建Batch. 注意:这里说的获取内存给Batch, 其实就是让 非缓存池nonPooledAvailableMemory 减少 16K 的内存, 然后Batch正常创建就行了, **不要误以为好像真的发生了内存的转移**。 ②. 消息发送完成,释放Batch, 则会把这个ByteBuffer,放到缓存池的队尾中,并且调用ByteBuffer.clear 清空数据, 以便下次重复使用 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20223/25/1648177198437732586.png) 原文地址:图解Kafka Producer 消息缓存模型 **3. 内存非16K 非缓存池中内存够用** ①. 创建Batch的时候, 去非缓存池(nonPooledAvailableMemory)内存获取一部分内存用于创建Batch. 注意:这里说的获取内存给Batch, 其实就是让 非缓存池(nonPooledAvailableMemory) 减少对应的内存, 然后Batch正常创建就行了, 不要误以为好像真的发生了内存的转移。 ②. 消息发送完成,释放Batch, 纯粹的是在非缓存池(nonPooledAvailableMemory)中加上刚刚释放的Batch内存大小。 当然这个Batch会被GC掉 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20223/25/1648177225759310952.png) **4. 内存非16K 非缓存池内存不够用** ①. 先尝试将 缓存池中的内存一个一个释放到 非缓存池中, 直到非缓存池中的内存够用与创建Batch了 ②. 创建Batch的时候, 去非缓存池(nonPooledAvailableMemory)内存获取一部分内存用于创建Batch. 注意:这里说的获取内存给Batch, 其实就是让 非缓存池(nonPooledAvailableMemory) 减少对应的内存, 然后Batch正常创建就行了, 不要误以为好像真的发生了内存的转移。 ③. 消息发送完成,释放Batch, 纯粹的是在非缓存池(nonPooledAvailableMemory)中加上刚刚释放的Batch内存大小。 当然这个Batch会被GC掉 例如: 下面我们需要创建 48k的batch, 因为超过了16k,所以需要在非缓存池中分配内存, 但是非缓存池中当前可用内存为0 , 分配不了, 这个时候就会尝试去 缓存池里面释放一部分内存到 非缓存池。 释放第一个ByteBuffer(16k) 不够,则继续释放第二个,直到释放了3个之后总共48k,发现内存这时候够了, 再去创建Batch。 ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20223/25/1648177250727367208.png) 注意:这里我们涉及到的 非缓存池中的内存分配, 仅仅指的的内存数字的增加和减少。 # 问题和答案 >发送消息的时候, 当Broker挂掉了,消息体还能写入到消息缓存中吗? 当Broker挂掉了,Producer会提示下面的警告⚠️, 但是发送消息过程中 这个消息体还是可以写入到 消息缓存中的,也仅仅是写到到缓存中而已。 WARN [Producer clientId=console-producer] Connection to node 0 (/172.23.164.192:9090) could not be established. Broker may not be available ![image.png](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/20223/25/1648177290305278499.png) >当最新的ProducerBatch还有空余的内存,但是接下来的一条消息很大,不足以加上上一个Batch中,会怎么办呢? 那么会创建新的ProducerBatch。 >那么创建ProducerBatch的时候,应该分配多少的内存呢? 触发创建ProducerBatch的那条消息预估大小大于batch.size ,则以预估内存创建。 否则,以batch.size创建。 还有一个问题供大家思考: **当消息还存储在缓存中的时候, 假如Producer客户端挂掉了,消息是不是就丢失了?**
  • [迁移工具] 鲲鹏代码迁移工具实践:重构开源软件atlas软件包实践
    迁移概述背景介绍在国产替代的大背景下,鲲鹏计算平台是一个非常有潜力的产业。为了解决开发者将应用从x86平台向鲲鹏平台移植的过程中遇到的一系列的痛点问题(例如:分析过程投入工作量大,周期长,需反复试错定位,准确率也低下,而且要求移植人员专业技能高等),推出了鲲鹏代码迁移工具(Porting Advisor),帮助开发者加速将x86环境下的应用迁移至鲲鹏平台。本文总结了鲲鹏代码迁移工具(Porting Advisor)的软件包分析和重构的实际使用经验,期望能帮助开发者了解如何使用工具,提高开发者的软件迁移效率。鲲鹏代码迁移工具介绍鲲鹏代码迁移工具(Porting Advisor)支持如下六大功能:分析软件安装包扫描x86平台软件安装包,识别安装包对系统SO的依赖和包内部的SO、JAR依赖,支持的软件安装包格式包括RPM、DEB、JAR、WAR、ZIP、TAR、GZIP。该功能位于工具一级菜单“软件迁移评估”下,工具安装在x86环境和鲲鹏环境下时均可用。分析已安装软件扫描x86环境中用户已安装的软件,识别已安装软件的SO、JAR依赖关系。该功能位于工具一级菜单“软件迁移评估”下,仅当工具安装在x86环境下时可用。分析源代码扫描x86平台软件的C/C++/Fortran/汇编源代码,识别源代码中的SO依赖关系,扫描需要修改的代码行并给出修改建议,根据系统设定的代码修改效率,给出评估的工作量,供领导层进行项目决策。该功能位于工具一级菜单“源码迁移”下,工具安装在x86环境和鲲鹏环境下时均可用。软件包重构对用户提供的x86平台RPM包、DEB包中x86平台相关的so文件、jar包进行替换,重构输出可用于鲲鹏平台的RPM包、DEB包。重构期间需要的鲲鹏版本的so文件、jar包需要用户通过依赖包上传功能在重构任务创建过程中上传。如果这些文件是可以直接从华为云镜像源下载的,并且用户安装鲲鹏代码迁移工具的服务器可联网,则用户创建重构任务时可以授权工具重构期间连接到华为云镜像源进行自动下载。该功能位于工具一级菜单“软件包重构”下,仅当工具安装在鲲鹏环境下时可用。专项软件迁移一级菜单“专项软件功能”下,针对已经完成迁移的部分BoostKit组件,用户可以通过专项软件迁移功能进行重复迁移。迁移过程中的每个执行步骤都是可视的,用户可根据自己的需求定制由工具执行其中的某些步骤而自己手工执行另外一些步骤,从而达到对这些组件的定制化的目的。该功能仅当工具安装在鲲鹏环境下时可用。增强功能一级菜单“增强功能”下提供了64位代码迁移预检、字节对齐检查、弱内存序检查修复三项子功能。64位代码迁移预检功能针对32位老旧代码执行检查动作,从编译器层面识别编译出64位应用时代码中存在的修改点,该功能仅当工具安装在x86环境下时可用。字节对齐检查功能辅助用户检查应用从32位模式改为64位模式时,数据结构定义方面的变化,以便用户优化代码,该功能仅当工具安装在x86环境下时可用。弱内存序检查修复则提供了编译器自动修复工具和静态检查工具两个选项,分别供用户在GCC编译以及工具运行两种模式下使用以修复ARM架构下独有的应用弱内存序问题,该功能仅当工具安装在鲲鹏环境下时可用。环境要求根据各功能的平台依赖性,需要准备如表 鲲鹏平台环境所示的环境。表1 鲲鹏平台环境项目说明服务器TaiShan 200 2280 服务器(等同于其它基于鲲鹏916或者鲲鹏920的服务器)CPU鲲鹏920 96核处理器OSCentOS 7.6安装的工具Porting Advisor 2.2.T3可使用场景分析软件安装包分析源代码软件包重构专项软件迁移弱内存序检查修复前提条件服务器和操作系统正常运行。PC端已经安装SSH远程登录工具。Porting Advisor已在准备好的x86平台环境和鲲鹏平台环境中完成安装并正常运行。待迁移的相关软件包、源代码已准备就绪。迁移计划本文将总结演示重构开源软件atlas软件包kafka-2.4.1-1.el7.noarch.rpm。说明:本次重构动作和验证动作均可通过鲲鹏平台环境完成。重构开源软件atlas软件包kafka-2.4.1-1.el7.noarch.rpm的步骤演示,将有助于读者了解在有x86平台rpm包、deb包需要修改重构为鲲鹏平台的包时,如何才能完成这个迁移过程。将利用Porting Advisor的软件安装包分析功能对获取到的kafka-2.4.1-1.el7.noarch.rpm进行扫描,获取其依赖关系和可迁移性分析结果。根据Porting Advisor的软件安装包分析功能分析得到的kafka-2.4.1-1.el7.noarch.rpm依赖关系去准备重构为鲲鹏平台RPM包时需要的SO库和JAR包。利用准备好的资源包和RPM包,通过Porting Advisor的软件包重构功能,完成鲲鹏版本kafka-2.4.1-1.el7的RPM包重构工作。针对重构得到的鲲鹏版本kafka-2.4.1-1.el7的RPM包进行简单的验证。重构开源软件atlas软件包操作步骤从https://ci.bigtop.apache.org/view/Releases/job/Bigtop-3.0.0/DISTRO=centos-7,PLATFORM=amd64-slave/lastSuccessfulBuild/artifact/output/kafka/noarch/下载获取待使用的rpm包,如图 kafka-2.4.1-1.el7.noarch.rpm所示。 图1 kafka-2.4.1-1.el7.noarch.rpm双击“kafka-2.4.1-1.el7.noarch.rpm”即可获取本例中所需要的软件包。利用Porting Advisor的分析软件包功能完成对该包的扫描分析。勾选“分析软件包”,单击“上传”,上传前面下载到的kafka-2.4.1-1.el7.noarch.rpm,并点中输入框,选择安装包存放路径为/opt/portadv/portadmin/package/kafka-2.4.1-1.el7.noarch.rpm,如图 Porting Advisor所示。         图2 Porting Advisor                 4. 单击“开始分析”,进行分析并得到扫描分析报告,如图 扫描分析报告所示。        图3 扫描分析报告         准备依赖库从图 扫描分析报告1和图 扫描分析报告2提供的依赖库信息看,所依赖的包中,有2个是安装过程中需要的可执行文件,有3个是被扫描的rpm包内包含的jar包。报告中针对rpm包内包含的jar包提供了华为鲲鹏产品官方maven仓库中的下载链接,直接点击下载即可。   图4 扫描分析报告1         图5 扫描分析报告2        重构软件包操作步骤打开软件包重构功能页面后,通过“上传”按钮将待重构的RPM包kafka-2.4.1-1.el7.noarch.rpm上传到服务器,如图 选择待重构软件包所示。         图6 选择待重构软件包              2. 单击“下一步”,进入“配置依赖文件”步骤,选择手工上传依赖文件、勾选“授权访问外部网络获取重构软件包需要的依赖文件”以允许工具在重构过程中自动连接到各OS发行版的         官方网站或者鲲鹏Maven仓库在华为云镜像源上的下载地址下载需要的依赖文件,如图 配置依赖文件所示。       图7 配置依赖文件           3. 单击“下一步”,进入“执行重构”步骤,这里需要再进行最后一次确认才能开始重构操作,如图 重构任务执行中所示。        图1 重构任务执行中              注意:       如果在执行本步骤操作前,服务器上没有安装rpmrebuild,则会遇到如图 rpmrebuild未安装所示的报错。此时用户需要自行下载rpmrebuild安装包,在工具所在安装服务器上安装该       组件,然后重试重构动作。          4. 重构成功后,单击“下载重构软件包”按钮即可下载重构好的软件包atlas-metadata_3_1_0_0_78-1.1.0.3.1.0.0-78.aarch64.rpm,如图7 下载重构软件包所示。      图1 下载重构软件包                  也可以在关闭重构结果窗口后,从历史记录中下载重构结果。验证重构后RPM包kafka安装包在鲲鹏环境安装时,需要依赖bigtop-utils、zookeeper等包,读者在执行本章节验证前,需完成环境搭建工作,在相关的环境依赖具备条件下,kafka的安装只需要通过最普通的RPM包安装命令(rpm –ivh xx.rpm)即可完成。本节重点介绍环境搭建后如何进行功能验证。安装kafka     切换到终端工具     执行以下命令:cd /opt/portadv/portadmin/data/20220310175550/     说明:此路径可以从第4步重构成功后的右下角弹窗中显示的软件包存放路径获取。     rpm –ivh kafka-2.4.1-1.el7.noarch.rpm     启动zookeeper    执行以下命令启动zookeeper:   cd /usr/local/zookeeper/bin   ./zkServer.sh start    功能验证    命令行方式创建一个主题      cd /usr/lib/kafka/bin      kafka-topics.sh --zookeeper localhost:2181 --create --topic sandbox-experiment -partitions 2 --replication-factor 1       启动消费者服务       kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sandbox-experiment       启动生产者服务       kafka-console-producer.sh --broker-list localhost:9092 –topic sandbox-experiment        并发送消息“beginning  hello kafka”       消费者服务收到消息            证明重构后的kafka-2.4.1-1.el7.noarch.rpm软件包,在鲲鹏服务器上课正常安装使用。   -----结束
  • [产品介绍] 【DRS云小课】如何通过DRS实现RDS for MySQL到Kafka的数据同步
    数据复制服务(DRS)是一种易用、稳定、高效、用于数据同步的云服务,本节小课为您介绍,如何通过DRS将RDS for MySQL实例的增量数据同步到分布式消息服务Kafka。使用场景DRS实时同步功能一般用于建立数据同步通道,解决数据共享问题,也可以用于数据流式集成,具有数据转换能力,如库表映射,行列过滤等。本实践中的选择均为测试简化基本操作,仅做参考,实际情况请用户按业务场景选择,更多关于DRS的使用场景请单击这里了解。部署架构本示例中,DRS源数据库为华为云RDS for MySQL,目标端为华为云同Region下的分布式消息服务Kafka,通过VPC网络,将源数据库的增量数据同步到目标端。更多关于DRS的使用场景请单击这里了解。源端RDS for MySQL准备创建RDS for MySQL实例如何创建RDS for MySQL实例,请点击这里查看详细步骤。构造数据1. 登录华为云控制台。2. 单击管理控制台左上角的,选择区域“华南-广州”。3. 单击左侧的服务列表图标,选择“数据库 > 云数据库 RDS”。4. 选择RDS实例,单击实例后的“更多 > 登录”。5. 在弹出的对话框中单击“测试连接”检查。6. 连接成功后单击“登录”。7. 输入实例密码,登录RDS实例。8. 单击“新建数据库”,创建db_test测试库。9. 在db_test库中执行如下语句,创建对应的测试表table3_。CREATE TABLE `db_test`.`table3_` ( `Column1` INT(11) UNSIGNED NOT NULL, `Column2` TIME NULL, `Column3` CHAR NULL, PRIMARY KEY (`Column1`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci;目标端Kafka准备创建Kafka实例1. 登录华为云控制台。2. 单击管理控制台左上角的,选择区域“华南-广州”。3. 单击左侧的服务列表图标,选择“应用中间件 > 分布式消息服务Kafka版”。4. 单击“购买Kafka实例”。5. 选择实例区域和可用区。6. 配置实例名称和实例规格等信息。7. 选择存储空间和容量阈值策略。8. 选择实例所属的VPC和安全组。VPC和安全组已在创建VPC和安全组中准备好。9. 配置实例密码。10. 单击“立即购买”。11. 返回实例列表。当Kafka实例运行状态为“运行中”时,表示实例创建完成。创建Topic1. 在“Kafka专享版”页面,单击Kafka实例的名称。2. 选择“Topic管理”页签,单击“创建Topic”。3. 在弹出的“创建Topic”的对话框中,填写Topic名称和配置信息,单击“确定”,完成创建Topic。创建DRS同步任务本章节介绍创建DRS实例,将RDS for MySQL上的数据库增量同步到Kafka。同步前检查在创建任务前,需要针对同步条件进行手工自检,以确保您的同步任务更加顺畅。本示例中,为RDS for MySQL到Kafka的出云同步,您可以参考出云同步使用须知获取相关信息。操作步骤介绍RDS for MySQL到Kafka增量同步任务的详细操作过程。1. 登录华为云控制台。2. 单击管理控制台左上角的,选择区域“华南-广州”。3. 单击左侧的服务列表图标,选择“数据库 > 数据复制服务 DRS”。4. 选择左侧“实时同步管理”,单击“创建同步任务”。5. 填写同步任务参数:配置同步任务名称。选择需要同步任务的源库、目标数据库以及网络信息。这里的目标库选择源端RDS for MySQL准备创建的RDS实例。企业项目选择“default”。   6. 单击“下一步”。同步实例创建中,大约需要5-10分钟。7. 配置源库信息和目标库数据库密码。配置源库信息。单击“测试连接”。当界面显示“测试成功”时表示连接成功。      选择目标库所在VPC和子网,填写Kafka的IP地址和端口。单击“测试连接”。当界面显示“测试成功”时表示连接成功。      8. 单击“下一步”。9. 选择同步信息、策略、消息格式和对象等,投递到Kafka的消息格式。本次选择如下。表1 同步设置类别设置同步Topic策略集中投递到一个Topic,Topic名称“testTopic”。同步到Kafka partition策略按表名+库名的hash值投递到不同Partition。投递到Kafka的数据格式可选择JSON格式,可参考Kafka消息格式。同步对象同步对象选择db_test下的table3_表。10.单击“下一步”。11. 选择数据加工方式。RDS for MySQL到Kafka数据同步目前只支持列加工,列加工提供列级的查询和过滤能力。12. 单击“下一步”,等待预检查结果。13. 当所有检查都是“通过”时,单击"下一步”。14. 确认同步任务信息正确后,单击“启动任务”。返回DRS实时同步管理,查看同步任务状态。启动中状态一般需要几分钟,请耐心等待。当状态变更为“增量同步”,表示同步任务已启动。说明:目前RDS for MySQL到Kafka仅支持增量同步,任务启动后为增量同步状态。如果创建的任务为全量同步,任务启动后进行全量数据同步,数据同步完成后任务自动结束。如果创建的任务为全量+增量同步,任务启动后先进入全量同步,全量数据同步完成后进入增量同步状态。增量同步会持续性同步增量数据,不会自动结束。确认同步任务执行结果由于本次实践为增量同步模式,DRS任务会将源库的产生的增量数据持续同步至目标库中,直到手动任务结束。下面我们通过在源库RDS for MySQL中插入数据,查看Kafka的接收到的数据来验证同步结果。操作步骤1. 登录华为云控制台。2. 单击管理控制台左上角的,选择区域“华南-广州”。3. 单击左侧的服务列表图标,选择“数据库 > 云数据库 RDS””。4. 单击RDS实例后的“更多 > 登录”。5. 在弹出的对话框中单击“测试连接”检查。6. 连接成功后单击“登录”。7. 输入实例密码,登录RDS实例。8. 在DRS同步对象的db_test.table3_表中,执行如下语句,插入数据。INSERT INTO `db_test`.`table3_` (`Column1`,`Column2`,`Column3`) VALUES(4,'00:00:44','ddd');9. 单击左侧的服务列表图标,选择“应用中间件 > 分布式消息服务Kafka版”。10. 在“Kafka专享版”页面,单击Kafka实例的名称。11. 选择“消息查询”页签,在Kafka对应的Topic中,查看接收到相应的JSON格式数据。12. 结束同步任务。根据业务情况,确认数据已全部同步至目标库,可以结束当前任务。单击“操作”列的“结束”。仔细阅读提示后,单击“是”,结束任务。     
  • [问题求助] 【鲲鹏架构服务器】【taos(涛思数据)、kafka】对应的下载安装方式
    【功能模块】【鲲鹏架构服务器】【taos(涛思数据)、kafka】【操作步骤&问题现象】1、taos(涛思数据)、kafka对应的下载安装包和安装方式2、【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [运维管理] HD 6.5.1.3 版本集群 zookeeper和kafka是否有限制客户端ip访问的方法?kerberos除外
    【操作步骤&问题现象】HD 6.5.1.3 版本集群 zookeeper和kafka是否有限制客户端ip访问的方法?kerberos除外
  • [基础组件] 【MRS3.1.2产品】【CDL组件功能】CDL监控MySQL数据生产到Kafka
    【功能模块】创建CDL作业-MySQL--kafka,任务可以成功运行,且能够监控MySQL新增的数据,生产到Kafka中【操作步骤&问题现象】问题一、①MySQL中insert的时间类型的数据是2021-01-01 00:00:00②生产到Kafak的对应字段的数据变成了2020-12-31T16:00:00Z,时间出现了晚一天现象问题二、①在创建CDL作业的时候,Mysql的配置信息,Schema Auto Create我选择了否②然后消费kafka的数据发现还有大量的Schema信息被生产到Kafka中,希望的是不需要额外大量没有用处的信息
  • [赋能学习] 华为FusionInsight MRS实战 - FlinkSQL从kafka写入hive
    # 华为FusionInsight MRS实战 - FlinkSQL从kafka写入hive ## 背景说明 随着流计算的发展,挑战不再仅限于数据量和计算量,业务变得越来越复杂,开发者可能是资深的大数据从业者、初学 Java 的爱好者,或是不懂代码的数据分析者。如何提高开发者的效率,降低流计算的门槛,对推广实时计算非常重要。 SQL 是数据处理中使用最广泛的语言,它允许用户简明扼要地展示其业务逻辑。Flink 作为流批一体的计算引擎,致力于提供一套 SQL 支持全部应用场景,Flink SQL 的实现也完全遵循 ANSI SQL 标准。之前,用户可能需要编写上百行业务代码,使用 SQL 后,可能只需要几行 SQL 就可以轻松搞定。 本文介绍如何使用华为FusionInsight MRS FlinkServer服务进行界面化的FlinkSQL编辑,从而处理复杂的嵌套Json格式 ## Kafka样例数据 模拟物联网场景的数据 ``` {"device":"Demo1","signal":"60","life":"24","times":"2021-12-20 15:46:37"} {"device":"Demo2","signal":"78","life":"20","times":"2021-12-20 15:46:37"} {"device":"Demo3","signal":"41","life":"6","times":"2021-12-20 15:46:38"} {"device":"Demo4","signal":"71","life":"29","times":"2021-12-20 15:46:38"} {"device":"Demo5","signal":"38","life":"19","times":"2021-12-20 15:46:38"} {"device":"Demo6","signal":"98","life":"10","times":"2021-12-20 15:46:38"} {"device":"Demo7","signal":"80","life":"19","times":"2021-12-20 15:46:38"} {"device":"Demo8","signal":"55","life":"27","times":"2021-12-20 15:46:38"} {"device":"Demo9","signal":"93","life":"13","times":"2021-12-20 15:46:38"} {"device":"Demo10","signal":"46","life":"2","times":"2021-12-20 15:46:38"} {"device":"Demo11","signal":"94","life":"28","times":"2021-12-20 15:46:38"} {"device":"Demo12","signal":"24","life":"26","times":"2021-12-20 15:46:38"} {"device":"Demo13","signal":"64","life":"3","times":"2021-12-20 15:46:38"} {"device":"Demo14","signal":"97","life":"22","times":"2021-12-20 15:46:38"} {"device":"Demo15","signal":"82","life":"13","times":"2021-12-20 15:46:38"} {"device":"Demo16","signal":"2","life":"2","times":"2021-12-20 15:46:38"} {"device":"Demo17","signal":"19","life":"22","times":"2021-12-20 15:46:38"} {"device":"Demo18","signal":"51","life":"22","times":"2021-12-20 15:46:38"} {"device":"Demo19","signal":"1","life":"20","times":"2021-12-20 15:46:38"} {"device":"Demo20","signal":"41","life":"24","times":"2021-12-20 15:46:38"} ``` ## 使用华为MRS Flinkserver对接Hive ### 前提条件 - 集群已安装HDFS、Yarn、Kafka、Flink和Hive等服务。 - 包含Hive服务的客户端已安装,安装路径如:/opt/client。 - Flink支持1.12.2及以上版本,Hive支持3.1.0及以上版本。 - 参考基于用户和角色的鉴权创建一个具有“FlinkServer管理操作权限”的用户用于访问Flink WebUI的用户。 ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202112/20/1736360gbj0jcxg8s5b01r.png) - 参考创建集群连接中的“说明”获取访问Flink WebUI用户的客户端配置文件及用户凭据。 ### 操作步骤 以映射表类型为Kafka对接Hive流程为例。 1. 使用flink_admin访问Flink WebUI,请参考访问Flink WebUI。 2. 新建集群连接,如:flink_hive。 a. 选择“系统管理 > 集群连接管理”,进入集群连接管理页面。 b. 单击“创集集群连接”,在弹出的页面中参考表1填写信息,单击“测试”,测试连接成功后单击“确定”,完成集群连接创建。 表1 创建集群连接信息 | 参数名称 | 参数描述 | 取值样例 | | ---- | ---- | ---- | |集群连接名称|集群连接的名称,只能包含英文字母、数字和下划线,且不能多于100个字符。|flink_hive| |描述|集群连接名称描述信息。|-| |版本|选择集群版本。|MRS 3| |是否安全版本|是,安全集群选择是。需要输入访问用户名和上传用户凭证; 否,非安全集群选择否。|是| |访问用户名|访问用户需要包含访问集群中服务所需要的最小权限。只能包含英文字母、数字和下划线,且不能多于100个字符。“是否安全版本”选择“是”时存在此参数。|flink_admin| |客户端配置文件|集群客户端配置文件,格式为tar。|-| |用户凭据|FusionInsight Manager中用户的认证凭据,格式为tar。“是否安全版本”选择“是”时存在此参数。输入访问用户名后才可上传文件。|flink_admin的用户凭| ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202112/20/173701okrumrzvi0plfwut.png) ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202112/20/173725okb4gjbtuycxpgt0.png) 3. 新建Flink SQL流作业,如:kafka_to_hive。 在作业开发界面进行作业开发,输入如下语句,可以单击上方“语义校验”对输入内容校验。 ``` CREATE TABLE test_kafka ( device varchar, signal varchar, life varchar, times timestamp ) WITH ( 'properties.bootstrap.servers' = '172.16.9.116:21007', 'format' = 'json', 'topic' = 'example-metric1', 'connector' = 'kafka', 'scan.startup.mode' = 'latest-offset', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com' ); CREATE CATALOG myhive WITH ( 'type' = 'hive', 'hive-version' = '3.1.0', 'default-database' = 'default', 'cluster.name' = 'flink_hive' ); use catalog myhive; set table.sql-dialect = hive;create table test_avro_signal_table_orc ( device STRING, signal STRING, life STRING, ts timestamp ) PARTITIONED BY (dy STRING, ho STRING, mi STRING) stored as orc TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern' = '$dy $ho:$mi:00', 'sink.partition-commit.trigger' = 'process-time', 'sink.partition-commit.delay' = '0S', 'sink.partition-commit.policy.kind' = 'metastore,success-file' ); INSERT into test_avro_signal_table_orc SELECT device, signal, life, times, DATE_FORMAT(times, 'yyyy-MM-dd'), DATE_FORMAT(times, 'HH'), DATE_FORMAT(times, 'mm') FROM default_catalog.default_database.test_kafka; ``` ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202112/20/1737526tbsjqu5lhyh0qpm.png) 注意:作业SQL开发完成后,请勾选“运行参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。 4. 启动任务 ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202112/20/173831ksr246oxpzfdjojy.png) 5. 启动kafka生产者插入样例数据 ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202112/20/173856n53f2m49esjuttwn.png) 6. 查看hive数据 ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202112/20/173926jzcxzw96afacb6tg.png)
  • [二次开发] 【大数据Spark2X】sparkstreaming对接kafka程序yarn-cluster提交到华为集群后运行失败,提示入口
    【功能模块】【操作步骤&问题现象】1、sparkstreaming对接kafka程序,本地环境测试正常2、程序打包后通过如下命令提交到yarn上3、提交成功,运行失败,报错:找不到主类【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [赋能学习] 使用pytho接口访问kafka安全模式
    环境准备1.          Manager下载krb5.conf,放到/etc/krb5.conf 目前开源代码不支持自定义krb5.conf路径2.          Manager下载客户端,并安装3.          yum -y install epel-release 测试环境使用CentOS,需要安装EPEL源才能安装pip4.          yum install python-pip 安装python安装管理工具pip5.          pip install kafka-python 需要修改源码 /usr/lib/python2.7/site-packages/kafka/conn.py line 570 将self.host修改为‘hadoop.hadoop.com’。或者直接从git下载最新代码,通过saslkerberosdomain_name参数指定为hadoop.hadoop.com.6.          yum install python-gssapi 安装gssapi库7.          kinit 执行pythong脚本前先通过kinit方式认证8.          执行kafka python脚本。创建topic#!/usr/bin/env bash # cd /opt/client 进入实际客户端安装目录 # source bigdata_env 导入环境变量 # kinit 使用具有KafkaAdmin权限的用户登录 # cd ./Kafka/kafka/bin/ 进入Kafka脚本目录  # 查询当前已有的topic ./kafka-topics.sh --list --zookeeper 172.21.3.101:24002,172.21.3.102:24002,172.21.3.103:24002/kafka  # 如果没有test-topic,则创建。partition数量跟同一组中的consumer的数量保持一致。 # 如果要保证消息按顺序被消费,就只建一个partition。 ./kafka-topics.sh --create --zookeeper 172.21.3.101:24002,172.21.3.102:24002,172.21.3.103:24002/kafka --partitions 3 --replication-factor 2 --topic test-topic  # 给producer赋予向topic中生产数据的权限 ./kafka-acls.sh --authorizer-properties zookeeper.connect=172.21.3.101:24002,172.21.3.102:24002,172.21.3.103:24002/kafka --add --allow-principal User:developuser --producer --topic test-topic  # 给consumer赋予从topic中消费数据的权限 ./kafka-acls.sh --authorizer-properties zookeeper.connect=172.21.3.101:24002,172.21.3.102:24002,172.21.3.103:24002/kafka --add --allow-principal User:developuser --consumer --topic test-topic --group test-groupProducer样例from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=["172.21.3.101:21007"],                          security_protocl="SASL_PLAINTEXT",                          sasl_mechanism="GSSAPI",                          sasl_kerberos_service_name="kafka") for _ in range(100):     response = producer.send("test-topic", b"testmessage")     result = response.get(timeout=50)     print(result)    Consumer样例from kafka import KafkaConsumer consumer = KafkaConsumer("test-topic",                          bootstrap_servers=["172.21.3.101:21007"],                          group_id="test-group",                          enable_auto_commit="true",                          security_protocl="SASL_PLAINTEXT",                          sasl_mechanism="GSSAPI",                          sasl_kerberos_service_name="kafka") for message in consumer:     print(message)
  • [问题求助] 【FI】【Kafka(KCluster)】开启kerberos认证后,代码无法从kafka集群中获取topic
    如题所示,报下面的错误,借鉴网上https://blog.csdn.net/li1987by/article/details/82856873的改法。将kafka-clients-2.6.0.jar 替换为kafka-clients-1.1.0.jar后,改错误消失,但是在哪里可以下载到华为kafka-clients-2.6.0.jar 的jar??我们需要后面spark 任务需要kafka-client2.6
  • [问题求助] 【FI】【Kafka(KCluster)】开启kerberos认证后,代码无法从kafka集群中获取topic
    如题所述,kafka-console-producer.sh & kafka-console-consumer.sh 执行正常。但是代码执行失败, kerberos验证开启debug模式后,日志观察成功。jar引用是华为kafka client libs。kafka版本为:2.11-1.1.0哪位大佬给指点下,万分感谢。----------------------------String krb5 = args[0];String jaasPath = args[1];String broker= args[2];// todoSystem.setProperty("java.security.krb5.conf", krb5);System.setProperty("java.security.auth.login.config", jaasPath);System.setProperty("zookeeper.server.principal", "zookeeper/hadoop.hadoop.com");Properties props = new Properties();props.put("bootstrap.servers", broker);props.put("group.id", "g1");props.put("key.deserializer", StringDeserializer.class.getName());props.put("value.deserializer", ByteBufferDeserializer.class.getName());// todoprops.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "GSSAPI");props.put("sasl.kerberos.service.name", "kafka");// adminclientAdminClient client = AdminClient.create(props);ListTopicsResult listTopics = client.listTopics();Set<String> strings = listTopics.names().get();System.out.println(strings);client.close();// kafkaconsumerKafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics();System.out.println(stringListMap.size());consumer.close();
  • [运维管理] HD 6.5.1.7版本 kafka节点出现单台故障实例后,执行删除topic标记删除问题
    【操作步骤&问题现象】HD 6.5.1.7版本 kafka节点出现单台故障实例后,执行删除topic标记删除问题业务侧自己执行zk 将对应topic信息删除,后创建topic使用时报错元数据不存在在单节点故障情况下是否可以执行,清理zk元数据 addauth krbgroup deleteall /kafka/brokers/topics/topicnamedeleteall /kafka/config/topics/topicname手动清理Kafka所有节点上的该topic的数据文件重启kafka实例 恢复?
  • [行业资讯] Apache Kafka 3.0.0 正式发布
    Apache Kafka 3.0.0正式发布,Apache Kafka是一个分布式流平台,具有四个核心API。本次更新是一个重要的版本更新,其中包括许多新的功能:对Java 8和Scala 2.12的支持被废弃了;Kafka Raft支持元数据主题的快照,以及self-managed quorum方面的其他改进;废弃了消息格式v0和v1;默认情况下为Kafka Producer启用更强的交付保证;增强了Kafka Streams中时间戳同步的语义;修改了Stream的TaskId的公共API;优化了OffsetFetch和FindCoordinator请求等等。
  • [基础组件] 程序中连接kafka时报错且确认配置的bootstrap.servers正确
    【功能模块】kafka【操作步骤&问题现象】1、编写程序,大致逻辑:SparkStreaming读取kafka中的数据,然后写入hbase中2、此demo是华为云上的样例demo,视频地址:https://bbs.huaweicloud.com/forum/thread-90888-1-1.html提交方式使用的是yarn-client【截图信息】【日志信息】(可选,上传日志内容或者附件)一直在重复报地址连不上[Consumer clientId=consumer-testGroup-1, groupId=testGroup] Bootstrap broker 172.31.8.38:21007 (id: -2 rack: null) disconnected | org.apache.kafka.clients.NetworkClient.handleServerDisconnect,详细日志见附件
总条数:181 到第
上滑加载中