• java消费DRS同步到kafka中的数据(mysql的binlog日志),出现编码问题
    根据帖子https://bbs.huaweicloud.com/blogs/172085 解析avro数据,获取中文字段编码问题
  • [问题求助] kafka消费组状态一直是EMPTY
    kafka消费组状态一直是EMPTY,但是flink有真实消费成功,并且监控中消费组的消息堆积是有起伏的
  • [二次开发] 本地环境消费开源kafka,将数据写入mrs kafka
    生产者代码:报错情况:
  • [技术干货] RocketMQ与Kafka实践上对比
    前言RocketMQ与Kafka 是我们最常用的消息中间件,那么它们之间有什么区别呢?本文带你一起来研究下这个问题!首先,淘宝内部的交易系统使用了淘宝自主研发的Notify消息中间件,使用Mysql作为消息存储媒介,可完全水平扩容,为了进一步降低成本,我们认为存储部分可以进一步优化,2011年初,Linkin开源了Kafka这个优秀的消息中间件,淘宝中间件团队在对Kafka做过充分Review之后,Kafka无限消息堆积,高效的持久化速度吸引了我们,但是同时发现这个消息系统主要定位于日志传输,对于使用在淘宝交易、订单、充值等场景下还有诸多特性不满足,为此我们重新用Java语言编写了RocketMQ,定位于非日志的可靠消息传输(日志场景也OK),目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。数据可靠性RocketMQ支持异步实时刷盘,同步刷盘,同步Replication,异步ReplicationKafka使用异步刷盘方式,异步Replication总结:RocketMQ的同步刷盘在单机可靠性上比Kafka更高,不会因为操作系统Crash,导致数据丢失。同时同步Replication也比Kafka异步Replication更可靠,数据完全无单点。另外Kafka的Replication以topic为单位,支持主机宕机,备机自动切换,但是这里有个问题,由于是异步Replication,那么切换后会有数据丢失,同时Leader如果重启后,会与已经存在的Leader产生数据冲突。开源版本的RocketMQ不支持Master宕机,Slave自动切换为Master,阿里云版本的RocketMQ支持自动切换特性。性能对比Kafka单机写入TPS约在百万条/秒,消息大小10个字节RocketMQ单机写入TPS单实例约7万条/秒,单机部署3个Broker,可以跑到最高12万条/秒,消息大小10个字节总结:Kafka的TPS跑到单机百万,主要是由于Producer端将多个小消息合并,批量发向Broker。RocketMQ为什么没有这么做?Producer通常使用Java语言,缓存过多消息,GC是个很严重的问题Producer调用发送消息接口,消息未发送到Broker,向业务返回成功,此时Producer宕机,会导致消息丢失,业务出错Producer通常为分布式系统,且每台机器都是多线程发送,我们认为线上的系统单个Producer每秒产生的数据量有限,不可能上万。缓存的功能完全可以由上层业务完成。单机支持队列Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长RocketMQ单机支持最高5万个队列,Load不会发生明显变化队列多有什么好处?单机可以创建更多Topic,因为每个Topic都是由一批队列组成Consumer的集群规模和队列数成正比,队列越多,Consumer集群可以越大消息投递实时性Kafka使用短轮询方式,实时性取决于轮询间隔时间RocketMQ使用长轮询,同Push方式实时性一致,消息的投递延时通常在几个毫秒。消息失败重试Kafka消费失败不支持重试RocketMQ消费失败支持定时重试,每次重试间隔时间顺延总结:例如充值类应用,当前时刻调用运营商网关,充值失败,可能是对方压力过多,稍后在调用就会成功,如支付宝到银行扣款也是类似需求。这里的重试需要可靠的重试,即失败重试的消息不因为Consumer宕机导致丢失。更多面试题推荐:公众号Java精选,回复java面试,获取最新面试资料。严格的消息顺序Kafka支持消息顺序,但是一台Broker宕机后,就会产生消息乱序RocketMQ支持严格的消息顺序,在顺序消息场景下,一台Broker宕机后,发送消息会失败,但是不会乱序Mysql Binlog分发需要严格的消息顺序定时消息Kafka不支持定时消息RocketMQ支持两类定时消息开源版本RocketMQ仅支持定时Level阿里云ONS支持定时Level,以及指定的毫秒级别的延时时间分布式事务消息Kafka不支持分布式事务消息阿里云ONS支持分布式定时消息,未来开源版本的RocketMQ也有计划支持分布式事务消息消息查询Kafka不支持消息查询RocketMQ支持根据Message Id查询消息,也支持根据消息内容查询消息(发送消息时指定一个Message Key,任意字符串,例如指定为订单Id)总结:消息查询对于定位消息丢失问题非常有帮助,例如某个订单处理失败,是消息没收到还是收到处理出错了。推荐程序员摸鱼地址:https://www.yoodb.com/slack-off/home.html消息回溯Kafka理论上可以按照Offset来回溯消息RocketMQ支持按照时间来回溯消息,精度毫秒,例如从一天之前的某时某分某秒开始重新消费消息总结:典型业务场景如consumer做订单分析,但是由于程序逻辑或者依赖的系统发生故障等原因,导致今天消费的消息全部无效,需要重新从昨天零点开始消费,那么以时间为起点的消息重放功能对于业务非常有帮助。消息并行度Kafka的消费并行度依赖Topic配置的分区数,如分区数为10,那么最多10台机器来并行消费(每台机器只能开启一个线程),或者一台机器消费(10个线程并行消费)。即消费并行度和分区数一致。RocketMQ消费并行度分两种情况顺序消费方式并行度同Kafka完全一致乱序方式并行度取决于Consumer的线程数,如Topic配置10个队列,10台机器消费,每台机器100个线程,那么并行度为1000。消息轨迹Kafka不支持消息轨迹阿里云ONS支持消息轨迹开发语言友好度Kafka采用Scala编写RocketMQ采用Java语言编写Broker端消息过滤Kafka不支持Broker端的消息过滤RocketMQ支持两种Broker端消息过滤方式根据Message Tag来过滤,相当于子topic概念向服务器上传一段Java代码,可以对消息做任意形式的过滤,甚至可以做Message Body的过滤拆分。消息堆砌能力理论上Kafka要比RocketMQ的堆积能力更强,不过RocketMQ单机也可以支持亿级的消息堆积能力,我们认为这个堆积能力已经完全可以满足业务需求。开源社区活跃度Kafka社区更新较慢RocketMQ的github社区有250个个人、公司用户登记了联系方式,QQ群超过1000人。商业支持Kafka原开发团队成立新公司,目前暂没有相关产品看到RocketMQ在阿里云上已经开放公测近半年,目前以云服务形式免费供大家商用,并向用户承诺99.99%的可靠性,同时彻底解决了用户自己搭建MQ产品的运维复杂性问题成熟度Kafka在日志领域比较成熟RocketMQ在阿里集团内部有大量的应用在使用,每天都产生海量的消息,并且顺利支持了多次天猫双十一海量消息考验,是数据削峰填谷的利器。
  • [问题求助] 如何使用java将OPC(HDA、UA、DA) 中的数据放入kafka中?
    如何使用java将OPC(HDA、UA、DA) 中的数据放入kafka中?
  • [技术干货] 最经典的一道JAVA面试题,谈谈你对Kafka零拷贝原理的理解
    最近一位3年工作经验的小伙伴去某厂面试,被问到这样一个问题,说:”请你简单说一下Kafka的零拷贝原理“。然后,这位小伙伴突然愣住了,什么是零拷贝,零拷贝跟Kafka有关系吗?那么今天,我给大家来聊一聊我对Kafka零拷贝原理的理解。1、什么是零拷贝?在实际应用中,如果我们需要把磁盘中的某个文件内容发送到远程服务器上,它必须要经过几个拷贝的过程,如图所示:1、从磁盘中读取目标文件内容拷贝到内核缓冲区2、CPU控制器再把内核缓冲区的数据赋值到用户空间的缓冲区中3、接着在应用程序中,调用write()方法,把用户空间缓冲区中的数据拷贝到内核下的Socket Buffer中。4、最后,把在内核模式下的SocketBuffer中的数据赋值到网卡缓冲区(NIC Buffer)5、网卡缓冲区再把数据传输到目标服务器上在这个过程中我们可以发现,数据从磁盘到最终发送出去,要经历4次拷贝,而在这四次拷贝过程中,有两次拷贝是浪费的,分别是:1、从内核空间赋值到用户空间2、从用户空间再次复制到内核空间除此之外,由于用户空间和内核空间的切换会带来CPU的上线文切换,对于CPU性能也会造成性能影响。而零拷贝,就是把这两次多于的拷贝省略掉,应用程序可以直接把磁盘中的数据从内核中直接传输给Socket,而不需要再经过应用程序所在的用户空间,如下图所示零拷贝通过DMA(Direct Memory Access)技术把文件内容复制到内核空间中的Read Buffer,接着把包含数据位置和长度信息的文件描述符加载到Socket Buffer中,DMA引擎直接可以把数据从内核空间中传递给网卡设备。在这个流程中,数据只经历了两次拷贝就发送到了网卡中,并且减少了2次cpu的上下文切换,对于效率有非常大的提高。2、Kafka零拷贝?所以,所谓零拷贝,并不是完全没有数据赋值,只是相对于用户空间来说,不再需要进行数据拷贝。对于前面说的整个流程来说,零拷贝只是减少了不必要的拷贝次数而已。在程序中实现零拷贝的方式有三种:1、在Linux中,零拷贝技术依赖于底层的sendfile()方法实现2、在Java中,FileChannal.transferTo() 方法的底层调用的就是 sendfile() 方法。3、MMAP 文件映射机制。它的原理是,将磁盘文件映射到内存, 用户通过修改内存就能修改磁盘文件。使用这种方式可以获取很大的I/O提升,省去了用户空间到内核空间复制的开销。以上就是我对于Kafka中零拷贝原理的理解本次的面试题涉及到一些计算机底层的原理,大家在平时的业务开发过程中也很少关注。但我想提醒大家,如果要在技术路上走得更远,还是要多关注一些底层的实现原理,把基础打牢固。
  • [二次开发] 基于 MRS3.0.2版本 二次开发(12/27): Kafka接口调用样例 开发程序打包运行报错
    依据MRS3.0.2版本 二次开发(12/27): Kafka接口调用样例中的样例代码进行了二次开发。业务程序在idea界面运行正常,消息接收及发送功能都是正常的。但将程序打为jar包后运行(包括在项目out目录下直接通过cmd窗口运行/在linux中运行)时就会报错Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set。经过多次打包重试、修改业务代码直至最终仅使用最初始代码逻辑运行结果还是一样。求助:这个项目如需打为jar包需要执行什么操作才可以正常执行LoginUtil中创建、设置jaas.conf文件的逻辑?
  • [热门活动] 【华为云618】研发工具、Redis、漏洞扫描等新客3.96元起,下单抽FreeBuds耳机!
    #实惠更实用,跑赢新开局 #华为云618 15+款云原生2.0产品助力应用现代化! Redis、Kafka、RockeMQ、研发工具等新客试用3.96元起 老客户也有专属优惠!下单抽FreeBuds耳机! 更多产品优惠戳:http://t.cn/A6a7JhLQ #华为云 #618促销季
  • [数据编排] 你好,我想问一下GDE体验环境是不是不可以使用数据编排功能
    我点进去数据编排首页他报这个错误,请问一下是什么原因啊?
  • [二次开发] Flink接收Kafka消息实时入库出现问题
    按照教程《MRS3.0.2版本 二次开发(18/27): Flink读写Kafka样例》及样例代码FlinkKafkaJavaExample工程调试成功基础消息收发功能,然后在样例代码基础上加入Flink组件使用JdbcSink方式实时消费数据写入dws数据库的业务部分。加入此步骤业务后重新运行程序,启动发送程序WriteIntoKafka发送编辑好的json字符串,启动接收程序ReadFromKafka接收消费消息。此时发生报错"No route to host (Host unreachable)",程序终止,数据无法写入库中。数据库连接已经过验证没有问题。(将数据库写入代码取出addSink方法,直接在main方法中建立connection及preparedStateMent语句操作数据库完全正常,只要将数据库操作业务代码放入addSink方法中就会报如上错误)
  • [热门活动] 【华为云12·12】全场中间件、软件开发、漏洞扫描等服务新客1.98元起,老客户低至7折,下单抽FreeBuds耳机!
    #华为云12·12#全场DevOps与中间件服务3折起,新用户1.98元起,老用户续订7折起!-Redis、Kafka、RabbitMQ、RocketMQ等低至5.5折,3年5折!-数字资产链7折起,区块链新客试用19.8元起!-软件开发生产线CodeArts,新客试用1.98元起!-漏洞扫描、二进制成分分析、APP合规检测6.6折起!下单抽FreeBuds无线耳机!戳连接:http://t.cn/A6a7JhLQ活动有效期至2022年12月31日,上云正当时!>点击这里,马上进入活动专场<
  • [最佳实践] 关于kafka的Producer的粘性分区的坑
    问题背景:某集群集群做了kafka集群切换zk服务变更。在变更期间,停止多个kafka业务(主要是Flink流作业)30分钟左右。变更结束后,恢复kafka业务,其中一个Flink作业写Kafka不稳定,运行几分钟后其中一个partition写入超时,导致作业失败。               问题分析:从报错堆栈上来看,数据向topic所在节点发送数据过程中出现了超时,也就是producer端与kafka端连接超时。触发报错的场景一般有两种原因:(1)网络问题:发送数据的客户端到服务端之间存在一定网络延迟,导致发送失败。通过检测网络质量,这个场景能够排除。(2)Kafka服务端异常。分区所在的kafka节点本身存在异常,导致数据发送超时。例如:磁盘、CPU等硬件资源使用过载会出现处理能力下降等。   在登录这个异常的broker节点后,通过磁盘io命令查看磁盘使用率,发现磁盘io长时间处于了90%以上。                 2. 磁盘io长时间处于100%,与节点的数据流量异常有关。对比异常的broker节点和其它的broker节点,发现数据流量较之前增长明显。并且这个节点上的分区大小相比较于其它分区大了将近10倍。                                                             3. 通常出现这种情况时,往往有如下几种场景:     (1)数据带了key而导致的数据倾斜。例如如下写法:                           通过排查业务侧的代码。数据中并没有带有key值,因此该假设不成立。       (2)分区倾斜:业务的数据量大,但是分区数量少。Topic的分区数均衡200分区,每个分区数量不一致。如果分区倾斜每个分区中的数据量应该是一致的。不会出现这种现象。故不成立。      4. 通过观查,出现问题的节点只有一个。也就是说,只要topic的分区在异常的broker节点上,这些分区的数据量就会异常。如果停止了这个异常节点,流量会迁移到另外一个节点上。原因分析Producer的粘性分区特性先来了解一下2.4版本以后引进的粘性特性:https://cwiki.apache.org/confluence/display/KAFKA/KIP-480:+Sticky+Partitioner关于粘性的介绍可以看下这篇文章:cid:link_0划重点:     在6.5.1版本(1.1.0版本)之前,如果数据没有key默认的分区散列算法如下:                                                                                   图一:原始的分区散列算法每条数据会随机选择一个分区数据进入分区所在的Deque队列,deque队列中以batch为单位进行数据缓存,每个batch大小默认为16384bytes(由生产者参数batch.size决定)当满足batch大小满足条件或者超过ling.ms设定时间时,触发数据发送。651版本(kafka为2.4版本)以后,数据不带key的默认发送场景为粘性发送。             ​​​​​​​                                      图二:粘性分区散列算法(1)随即挑选一个可用分区(如果leader不为-1或者none均为可用分区,被选择过的分区在下次选择时候不再作为候选分区)。见代码:               (2) 当至少将分区填满或者达到linger.ms上限后,发送整个分区的数据。         根据上述说明,当设置了linger.ms就意味着要等到到达linger.ms设置的限定时间或者batch.size后才能发送数据。     1. 使用原始的发送方法。数据均匀散列到各个分区,batch.size很难填满,此时就必须要等待到达linger.ms设定的时间限制。在到达时间后,topic的所有分区同时发送请求,例如图一中的topic有三个分区,等待时间超过linger.ms后才会发送请求。    2.  使用粘性分区发送。数据会集中发送到一个分区,这个分区会写满一个batch才会选择另外一个分区。如果在linger.ms设定的时间内写满,那么就会体现发送这个batch的数据,并且在同一时间只产生一个请求。    通过比对,粘性分区从吞吐率和资源使用上都有一定程度的优化。但是粘性设计仍然存在一定的缺陷。见粘性优化方案:KAFKA-10888二,粘性分区的问题​​​​​​​     回到问题中,为什么粘性会带来数据倾斜。上文提到如果数据的发送依赖于linger.ms和batch.size两个参数。在默认情况下linger.ms会配置为0,也就是立即发送。这样每个分区中的数据难达到batch.size的大小就会立即发送。       生产者中有一个参数能够限制生产者最大的请求数量:max.in.flight.requests.per.connection 该参数能够限制生产者与一个broker的链接上最大的请求数量,也就是说当生产者与broker建立一个常链接后,这个链接上能够持有的最多未通过acks确认的发送请求最大数。默认值为5。假设5个链接全部被占用,那么生产者中的数据将的缓存起来,当有可用的链接时。缓存中的数据将以batch的形式发出去。      在有可用发送线程的情况,如下图:                             Producerbatch能够及时发送到kafka的broker节点,并且由于linger.ms设置为0,batch.Size不会写满就会发送。如果kafka的broker节点出现性能问题,例如CPU、磁盘IO、网络等问题导致节点响应慢,就会出现大批量的batch挤压,多数batch都会被填满。如下图:                          此时,这样就会产生这样的现象:1.无异常的节点batch无法写满,发送的量少,分区中的数据量少。2. 异常节点由于响应慢,请求池被占用完,大量的数据挤压,每个batch的数据全部写满。分区中的数据会越来越多。最后所达到的现象就是。每个分区的数据量差异变大。                      ​​​​​​​​​​​​​​更严重的是,如果这个现象一旦出现,性能差的节点会成为短板节点,很难自行恢复,并且性可能会越来越差。解决方案:通过修改分区散列算法能够规避这个问题。(1)Kafka生产者原生API:将散列算法修改为RoundRobin 随机算法。如下配置                         初始化properties时加入配置:"partitioner.class",并且修改value为"org.apache.kafka.clients.producer.RoundRobinPartitioner"(2)如果使用的是Flink作为生产者。如果配置了下图中的内容将使用粘性分区。              可以将上图的红框内容替换为Optional.of(new FlinkFixedPartitioner<>())。
  • [最佳实践] 开启kafka-request.log日志并分析每个请求阶段的耗时
    一、 Kafka request日志打开和关闭方式打开:集群 -> Kafka -> 配置 -> 全部配置 -> kafka.request.log.level配置改为DEBUG后保存。2. 关闭:集群 -> Kafka -> 配置 -> 全部配置 -> kafka.request.log.level配置改为INFO后保存。保存后不用重启Kafka实例,一般建议开5到10分钟。二、 Kafka request分析Kafka request原理 Kafka请求处理模型如下图所示:Kafka server端启动一个Acceptor线程,负责监听socket新的连接请求、注册OP_ACCEPT事件。Acceptor将监听到的新连接以轮询方式发送给ProcessorProcessor注册OP_READ事件,把接收到的请求放到request队列KafkaRequestHandler从Request队列中获取请求交给kafkaApi处理KakfaApi通过其handle()方法执行响应的处理逻辑KakfaApi处理完后将Response放回到Request对应的Processor的ResponseQueue中。processor从response队列中取出待发送的response发送给客户端。Request请求监控指标Request类中request处理流程如下图所示:如上图所示,流程可分为六个阶段:请求被processor放入Request队列等待处理:此过程处理速度与num.io.threads配置有关。IO线程在本地节点处理:处理速度与本节点CPU、IO有关。IO线程在远端节点处理:处理速度与远端节点CPU、IO有关,也与节点间的网络有关。限流等待:与限流设置有关。Response请求放在response队列中等待返回:此过程处理速度与num.network.threads配置有关。Response请求被发送给客户端:处理速度与服务端和客户端的网络有关。以上六个阶段对应的request日志中的指标信息是:requestQueueTime: 请求在request队列中等待时间,此过程耗时多可增大num.io.threads来缓解。localTime: 请求在本节点处理耗时,此过程耗时多可检查节点上的CPU和IO使用率是否高及IO线程数配置是否过小。remoteTime: 请求在其他节点处理耗时,此过程耗时多需检查节点间的网络是否有延迟、对端节点的CPU和IO使用率是否高。throttleTime: 限流的时间。responseQueueTime: 请求在response队列中等待时间此过程耗时多可增大num.network.threads来缓解。sendTime: processor从response队列中获取处理结果并发送给客户端的时间,此过程耗时多说明服务端与客户端网络存在较大延迟。除了上述指标外,request日志中还涉及以下指标:totalTime:请求处理总时间。securityProtocol: 请求的协议类型。principal:User: 请求的用户。clientId:生产和消费如果不设置clientId,Kafka会自动生成clientid,命名为producer-X和consumer-X,副本数据同步的clientId为broker-X-fetcher-X。apiKey:表示所调用的请求。生产和消费问题排查常看的请求有生产请求PRODUCE,消费请求FETCH,元数据请求METADATA,提交offset请求OFFSET_COMMIT。生产请求request日志说明生产请求示例:2020-12-07 11:14:52,861 | DEBUG | [data-plane-kafka-network-thread-1-ListenerName(PLAINTEXT)-PLAINTEXT-5] | Completed request:RequestHeader(apiKey=PRODUCE, apiVersion=5, clientId=producer-1, correlationId=14) -- {acks=1,timeout=30000,numPartitions=1},response:{responses=[{topic=test,partition_responses=[{partition=1,error_code=0,base_offset=4032196,log_append_time=-1,log_start_offset=0}]}],throttle_time_ms=0} from connection 100.112.22.201:21005-10.144.116.89:53748-1;totalTime:0.525,requestQueueTime:0.059,localTime:0.385,remoteTime:0.0,throttleTime:0.074,responseQueueTime:0.036,sendTime:0.056,securityProtocol:PLAINTEXT,principal:User:Default#Principal,listener:PLAINTEXT | kafka.request.logger (RequestChannel.scala:209)生产请求查找关键字:apiKey=PRODUCE、topic=topicName和生产端IP。无论生产端发送数据是否成功,request日志中都有相应的日志信息。Responses信息中的error_code如果为0表示生产段发送数据成功,error_code如果为非0值表示生产端发送数据失败,例如error_code为10,表示生产端发送数据的大小大于服务端设置的消息最大值。三、消费请求request日志说明消费请求示例:2020-12-07 17:41:42,143 | DEBUG | [data-plane-kafka-network-thread-1-ListenerName(PLAINTEXT)-PLAINTEXT-3] | Completed request:RequestHeader(apiKey=FETCH, apiVersion=7, clientId=consumer-1, correlationId=7) -- {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=0,epoch=0,topics=[{topic=test,partitions=[{partition=1,fetch_offset=1500,log_start_offset=-1,max_bytes=1048576},{partition=3,fetch_offset=0,log_start_offset=-1,max_bytes=1048576}]}],forgetten_topics_data=[]},response:{throttle_time_ms=0,error_code=0,session_id=758584167,responses=[{topic=test,partition_responses=[{partition_header={partition=1,error_code=0,high_watermark=5884956,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=org.apache.kafka.common.record.FileRecords@40f12818},{partition_header={partition=3,error_code=0,high_watermark=0,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[]}]}]} from connection 100.112.22.201:21005-10.144.116.89:57910-3;totalTime:499.237,requestQueueTime:0.09,localTime:0.549,remoteTime:0.0,throttleTime:0.276,responseQueueTime:0.033,sendTime:498.585,securityProtocol:PLAINTEXT,principal:User:Default#Principal,listener:PLAINTEXT | kafka.request.logger (RequestChannel.scala:209)消费请求查找关键字:apiKey=FETCH、clientId和消费端IP,topic=topicName也可作为查找的依据,但是topics中的内容有时为空,可能查找不到。topics中的内容为空是因为其内容已被缓存在session链路两侧。无论消费是否成功,request日志中都有相应的日志信息。Responses信息中的error_code如果为0表示消费成功,error_code如果为非0值表示消费失败,例如error_code为3,表示服务端没有这个topic partition。
  • [最佳实践] Kafka集群性能的检测方式
    生产性能测试脚本分析使用客户端脚本kafka-producer-perf-test.sh能够测试当前集群的生产的性能基线。如下图:使用方法(以6.5.1版本为例)如下:./kafka-producer-perf-test.sh --topic topicName --num-records 1000000 --record-size 1000 --throughput 20000 --producer.config ../config/producer.properties --print-metrics效果图如下:参数解读:--topic topic名称--num-records 要发送的数据条数--record-size 要发送的单条数据大小--throughput 测试速率显示,建议现网测试的时候如果有生产业务在跑,建议合理设置该值。--producer.config 生产者配置。-print-metrics 打印测试的统计信息。该脚本的测试场景主要用于检测集群的性能基线、检查网络带宽等。注意:在检测性能基线时需要停止业务并且将—throughput 设置为-1。在检测网络带宽时,需要在集群内和集群外的客户端节点分别部署FI的客户端。并对比性能的测试结果。禁止使用该脚本向生产使用的topic中发送数据。消费性能测试脚本分析使用客户端脚本kafka-consumer-perf-test.sh能够测试当前集群的消费的性能基线。使用方法如下:sh kafka-consumer-perf-test.sh --group id1 --topic test --messages 1000 --broker-list kafka业务IP:21007 --consumer.config ../config/consumer.properties –threads 2 --print-metrics--group 为消费用的消费者组,注意一定不要与生产使用的消费者组重复。--topic topic名称--messages 需要消费的消息条数--broker-list broker的业务ip列表–threads 测试使用的消费并行度--print-metrics 是否输出统计信息。该脚本的测试场景主要用于检测集群的性能基线。注意:当使用该脚本测试现网消费性能过程中,groupid需要自己随机指定,禁止使用业务所使用的groupid。在检测网络带宽时,需要在集群内和集群外的客户端节点分别部署FI的客户端。并对比性能的测试结果。注意:如果命令在FusionInsight HD集群内部(例如在主OMS节点部署了kafka客户端)使用以上脚本测试出的性能若不稳定且持续在几M到几十M之间转换并伴随发送超时,则可以认为kafka集群性能异常。如下图如果命令在FusionInsight HD集群外部(非FI集群内部节点)的客户端节点执行性能低于90M但是FI集群内部性能远大于90M,则认为客户端到服务端时间存在带宽问题。
  • [最佳实践] 检测网络异常的通用方式
    图:client为业务的客户端节点(生产数据、消费数据的节点上图为例,client节点与kafka集群的broker-3节点可能存在网络问题,那么需要用以下的手段进行检测:网络延迟检测:从client节点向broker-3节点发送ping包然后查看前30次的延时,看一下是否有网络抖动,如下命令:ping –s 5000 broker-3IP延时在20ms以内则认为正常。2. 带宽检测:网络延迟没有问题并不代表带宽是正常的,通常带宽的检查方法使用scp命令就能够看出带宽的大小。流程如下:找一个1G左右的压缩文件(不要使用文件夹)执行scp命令,将文件传送到broker-3节点的/tmp目录下,命令如下:scp 文件root@broker-3 ip:/tmp/  执行结果例如如下:如上结果,每秒传送性能在107M左右,如果这个性能低于80M则认为带宽不足或者带宽占用量大。可定性为网络质量问题。
总条数:25 到第
上滑加载中