-
python实现kafka生产消息 python3环境准备 1. 安装python依赖 pip3 install confluent-kafka pip3 install kafka-python 2. python代码 from confluent_kafka import Producer # Kafka 服务器地址 bootstrap_servers = 'x.x.x.x:21005, x.x.x.x:21005, x.x.x.x:21005' # 创建生产者配置 producer_config = { 'bootstrap.servers': bootstrap_servers } # 创建生产者实例 producer = Producer(producer_config) # 主题和消息列表 topic = 'your_topic' messages = ['Message 1', 'Message 2', 'Message 3'] # 批量发送消息到 Kafka for message in messages: producer.produce(topic, message.encode('utf-8')) 刷新生产者缓冲区 producer.flush() 关闭生产者实例 producer.flush() producer.poll(0) # 等待传递所有消息 producer.flush() 4. 执行命令 ./python3 /opt/sandbox/python_kafkaproducer.py python实现kafka消费消息 1. python代码 from kafka import KafkaConsumer from kafka import KafkaProducer from kafka.errors import KafkaError import sys def start_consumer(consumertopicname): consumer = KafkaConsumer(consumertopicname, group_id='my-group', bootstrap_servers='x.x.x.x:21005,x.x.x.x:21005,x.x.x.x:21005') for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value)) if name == "main": if len(sys.argv) < 1: print("usage:python3 pythonkafka.py consumertopicname producertopicname") exit(-1) consumertopicname=sys.argv[1] start_consumer(consumertopicname) 2. 执行命令 ./python3 /opt/sandbox/test_spark.py your_topic`(参数填写需要消费的topic) 注: bootstrap_servers填写节点加端口,非安全为21005
-
Kafka通过topic来分主题存放数据,主题内有分区,分区可以有多个副本,分区的内部还细分为若干个segment。所谓的分区其实就是在Kafka对应存储目录下创建的文件夹,文件夹的名字是主题名加上分区编号。①Topic 一类消息,消息存放的目录即主题,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。②Partition topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列③Segment 所谓的segment其实就是在分区对应的文件夹下产生的文件。一个分区会被划分成大小相等的若干segment,这样一方面保证了分区的数据被划分到多个文件中保证不会产生体积过大的文件;另-方面可以基于这些segment文件进行历史数据的删除,提高效率。一个segment又由一个log和一个index文件组成。④Log Log由多个Segment文件组成,接收到的新消息永远是以追加的方式于Segment文件中,Segment的文件个数随着数据量的累积而增加,每个消息有自增编号,这种只追加不修改的方式避免了变更前的查询消耗。⑤index Index文件仅记录固定消息量的索引编号范围,Kafka在查询时,先从Index中定位到小范围的索引编号区间,再去Log中在小范围的数据块中查询具体数据,此索引区间的查询方式称为:稀疏索引。二、原理概念1、持久化 kafka使用文件存储消息(append only log),这就直接决定kafka在性能上严重依赖文件系统的本身特性。且无论任何OS下,对文件系统本身的优化是非常艰难的。文件缓存/直接内存映射等是常用的手段。因为kafka是对日志文件进行append操作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数。对于kafka而言,较高性能的磁盘,将会带来更加直接的性能提升。2、性能 除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题。kafka并没有提供太多高超的技巧;对于producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker;对于consumer端也是一样,批量fetch多条消息。不过消息量的大小可以通过配置文件来指定。对于Kafka broker端,似乎有个Send file系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换(这里涉及到"磁盘IO数据"、"内核内存"、"进程内存"、"网络缓冲区",多者之间的数据copy)。3、Topic模型 其他JMS实现,消息消费的位置是有provider保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态。这就要求JMS broker需要太多额外的工作。在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的。当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset。由此可见,consumer客户端也很轻量级。4、负载均衡 kafka集群中的任何一个broker,都可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息。消息由producer直接通过socket发送到broker,中间不会经过任何"路由层"。 异步发送,将多条消息暂且在客户端buffer起来,并将他们批量发送到broker;小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率;不过这也有一定的隐患,比如当producer失效时,那些尚未发送的消息将会丢失。
-
Kafka的多副本冗余设计不管是传统的基于关系型数据库设计的系统,还是分布式的如 zookeeper 、 redis 、 Kafka 、 HDFS等等,实现高可用的办法通常是采用冗余设计,通过冗余来解决节点宕机不可用问题。 首先简单了解Kafka的几个概念:Broker (节点):Kafka服务节点,简单来说一个 Broker 就是一台Kafka服务器,一个物理节点。Topic (主题):在Kafka中消息以主题为单位进行归类,每个主题都有一个 Topic Name ,生产者根据Topic Name将消息发送到特定的Topic,消费者则同样根据Topic Name从对应的Topic进行消费。Partition (分区):Topic (主题)是消息归类的一个单位,但每一个主题还能再细分为一个或多个 Partition (分区),一个分区只能属于一个主题。主题和分区都是逻辑上的概念,举个例子,消息1和消息2都发送到主题1,它们可能进入同一个分区也可能进入不同的分区(所以同一个主题下的不同分区包含的消息是不同的),之后便会发送到分区对应的Broker节点上。Offset (偏移量):分区可以看作是一个只进不出的队列(Kafka只保证一个分区内的消息是有序的),消息会往这个队列的尾部追加,每个消息进入分区后都会有一个偏移量,标识该消息在该分区中的位置,消费者要消费该消息就是通过偏移量来识别。在Kafka 0.8版本以前,是没有多副本冗余机制的,一旦一个节点挂掉,那么这个节点上的所有 Partition的数据就无法再被消费。这就等于发送到Topic的有一部分数据丢失了。在0.8版本后引入副本记者则很好地解决宕机后数据丢失的问题。副本是以 Topic 中每个 Partition的数据为单位,每个Partition的数据会同步到其他物理节点上,形成多个副本。每个 Partition 的副本都包括一个 Leader 副本和多个 Follower副本,Leader由所有的副本共同选举得出,其他副本则都为Follower副本。在生产者写或者消费者读的时候,都只会与Leader打交道,在写入数据后Follower就会来拉取数据进行数据同步。多少个副本才算够用?副本肯定越多越能保证Kafka的高可用,但越多的副本意味着网络、磁盘资源的消耗更多,性能会有所下降,通常来说副本数为3即可保证高可用,极端情况下将 replication-factor 参数调大即可。Follower和Lead之间没有完全同步怎么办?Follower和Leader之间并不是完全同步,但也不是完全异步,而是采用一种 ISR机制( In-Sync Replica)。每个Leader会动态维护一个ISR列表,该列表里存储的是和Leader基本同步的Follower。如果有Follower由于网络、GC等原因而没有向Leader发起拉取数据请求,此时Follower相对于Leader是不同步的,则会被踢出ISR列表。所以说,ISR列表中的Follower都是跟得上Leader的副本。一个节点宕机后Leader的选举规则是什么?分布式相关的选举规则有很多,像Zookeeper的 Zab 、 Raft 、 Viewstamped Replication 、微软的 PacificA 等。而Kafka的Leader选举思路很简单,基于我们上述提到的 ISR列表,当宕机后会从所有副本中顺序查找,如果查找到的副本在ISR列表中,则当选为Leader。另外还要保证前任Leader已经是退位状态了,否则会出现脑裂情况(有两个Leader)。怎么保证?Kafka通过设置了一个controller来保证只有一个Leader。Ack参数决定了可靠程度另外,这里补充一个面试考Kafka高可用必备知识点:request.required.asks 参数。Asks这个参数是生产者客户端的重要配置,发送消息的时候就可设置这个参数。该参数有三个值可配置:0、1、All 。第一种是设为0意思是生产者把消息发送出去之后,之后这消息是死是活咱就不管了,有那么点发后即忘的意思,说出去的话就不负责了。不负责自然这消息就有可能丢失,那就把可用性也丢失了。第二种是设为1意思是生产者把消息发送出去之后,这消息只要顺利传达给了Leader,其他Follower有没有同步就无所谓了。存在一种情况,Leader刚收到了消息,Follower还没来得及同步Broker就宕机了,但生产者已经认为消息发送成功了,那么此时消息就丢失了。注意,设为1是Kafka的默认配置可见Kafka的默认配置也不是那么高可用,而是对高可用和高吞吐量做了权衡折中。第三种是设为All(或者-1)意思是生产者把消息发送出去之后,不仅Leader要接收到,ISR列表中的Follower也要同步到,生产者才会任务消息发送成功。进一步思考, Asks=All 就不会出现丢失消息的情况吗?答案是否。当ISR列表只剩Leader的情况下, Asks=All 相当于 Asks=1 ,这种情况下如果节点宕机了,还能保证数据不丢失吗?因此只有在 Asks=All并且有ISR中有两个副本的情况下才能保证数据不丢失。Kafka 的一个节点宕机后为什么不可用?我在开发测试环境配置的 Broker 节点数是3, Topic 是副本数为3, Partition 数为6, Asks参数为1。解决问题当三个节点中某个节点宕机后,集群首先会怎么做?没错,正如我们上面所说的,集群发现有Partition的Leader失效了,这个时候就要从ISR列表中重新选举Leader。如果ISR列表为空是不是就不可用了?并不会,而是从Partition存活的副本中选择一个作为Leader,不过这就有潜在的数据丢失的隐患了。所以,只要将Topic副本个数设置为和Broker个数一样,Kafka的多副本冗余设计是可以保证高可用的,不会出现一宕机就不可用的情况(不过需要注意的是Kafka有一个保护策略,当一半以上的节点不可用时Kafka就会停止)。那仔细一想,Kafka上是不是有副本个数为1的Topic?问题出在了 __consumer_offset 上, __consumer_offset 是一个Kafka自动创建的 Topic,用来存储消费者消费的 offset (偏移量)信息,默认 Partition数为50。而就是这个Topic,它的默认副本数为1。如果所有的 Partition 都存在于同一台机器上,那就是很明显的单点故障了!当将存储 __consumer_offset 的Partition的Broker给Kill后,会发现所有的消费者都停止消费了。这个问题怎么解决?第一点需要将 __consumer_offset 删除,注意这个Topic时Kafka内置的Topic,无法用命令删除,我是通过将 logs 删了来实现删除。第二点需要通过设置 offsets.topic.replication.factor 为3来将 __consumer_offset 的副本数改为3。通过将 __consumer_offset 也做副本冗余后来解决某个节点宕机后消费者的消费问题。参考:cid:link_0
-
#华为云开年采购季#15+云原生产品全场低至3折,新客1.98元起! -Redis低至6折,Kafka、RabbitMQ、RocketMQ等低至5.5折,3年5折! -数字资产链新客119元起,华为链新客19.8元起! -软件开发生产线CodeArts,新客试用1.98元起! -漏洞扫描、二进制成分分析、APP合规检测6.6折起! -Serverless应用引擎CAE,新客试用109元起! -ROMAConnect集成平台,打通新老应用,试用19.8元起! 下单抽FreeBuds无线耳机!更多产品优惠戳:http://t.cn/A6a7JhLQ 活动有效期至2023年3月31日,上云正当时!>点击这里,马上进入活动专场<
-
概述Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。1、kafka 安装安装JDKtar xvf jdk1.8.0_231.tar.gz -C /usr/local && cd /usr/local ln -sv jdk1.8.0_231 jdk vim /etc/profile.d/java.sh JAVA_HOME=/usr/local/jdk PATH=$JAVA_HOME/bin:$PATHzookeeper安装(或使用kafka自带的)vim /usr/local/kafka/zookeeper/conf/zoo.cfg tickTime=2000 initLimit=10 syncLimit=5 dataDir=/data/zookeeper clientPort=2181 maxClientCnxns=0 # 集群版的zookeeper添加如下配置 # server.1=ip1:2888:3888 # server.2=ip2:2888:3888 # server.3=ip3:28888:3888下载kafka和安装kakfawget https://archive.apache.org/dist/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz tar xvf kafka_2.11-0.10.2.1.tgz -C /usr/local && cd /usr/local ln -sv kafka_2.11-0.10.2.1.tgz kafka修改kafka启动内存vim /usr/local/kafka/bin/kafka-server-start.sh export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"kafka启动和停止/usr/local/kafka/bin/zookeeper-server-start.sh -deamon /usr/local/kafka/conf/zookeeper.properties /usr/local/kafka/bin/kafka-server-start.sh -deamon /usr/local/kafka/conf/server.properties /usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/conf/server.properties /usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/conf/zookeeper.properties单独安装kafka的启停方式/usr/local/zookeeper/bin/zkServer.sh stop|stop2、 kafka 设置外网访问(如有需要提供外网访问)前提条件需要一个解析到内网ip地址的域名,内网环境也可以设置/etc/hosts参数设置host.name=kafka.test.com(对应的域名解析需要解到内网ip)高版本已弃用。低版本0.10.2.1可以用, 仅当listeners属性未配置时被使用,已用listeners属性代替。表示broker的hostnameadvertised.listeners=PLAINTEXT://kafka.test.com:9092(高版本用,替代host.name,设置了advertised.listeners不用设置host.name)注册到zookeeper上并提供给客户端的监听器,如果没有配置则使用listeners。advertised.host.name(不需要设置,仅作参考)已弃用。仅当advertised.listeners或者listeners属性未配置时被使用。官网建议使用advertised.listenerslisteners(不需要设置,仅作参考)需要监听的URL和协议,如:PLAINTEXT://myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093。如果未指定该配置,则使用java.net.InetAddress.getCanonicalHostName()函数的的返回值修改上broker的/etc/hosts文件[内网ip] kafka.test.com修改外网访问服务器上的/etc/hosts文件[外网ip] kafka.test.com3、kafka消费调试生产者/usr/local/kafka/bin/kafka-console-producer.sh --broker-list IP:9092 --topic TOPIC消费者/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server IP:9092 --topic TOPIC--from-beginning --max-messages 1 /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 外网IP:9092 --topic TOPIC --from-beginning --max-messages 14、logstash调试output { stdout { codec => rubydebug { metadata => true } } }5、logstash无法消费kakfa日志的问题排查a、topics_pattern 通配问题".* ","."一定不能少 topics_pattern=>"prefix-.*" b、filter中匹配规则,注意要能匹配到kafka中topic,不同的filebeat和不同的logstash版本对应的topic元数据可能不太一样,这点需要注意 if [type] =~ "prefix-*" { grok { match =>["[type]","^prefix-(?<index_name>)"] } } if [kafka][topic] =~ "prefix-*" { grok { match => [ "[kafka][topic]", "^prefix-(?<index_name>.*$)" ]} } if [@metadata][topic] =~ "prefix-*" { grok { match =>["[@metadata][topic]","^prefix-(?<index_name>)"] } } if [@metadata][kafka][topic] =~ "prefix-*" { grok { match => [ "[@metadata][kafka][topic]", "^prefix-(?<index_name>.*$)" ]} }原文链接:cid:link_0zixun/5909.html
-
一、引言 图像分割是一个跨学科的研究方向,涉及人工智能、机器学习、模式识别等。随着计算机技术的不断发展,图像分割的应用领域越来越广泛,特别是在农业、军事、遥感气象、医疗保健以及智能交通等领域有着重要的应用价值。目前,图像分割技术主要有基于阈值、基于边缘、基于聚类以及基于神经网络的方法。在诸多技术中,聚类法是最有效的方法之一,主要有 K-means 聚类、模糊 C-means 聚类、密度山峰聚类以及减法聚类等。 K-means 聚类方法语义明确、结构简单、计算速度快,是图像分割技术中最常用的聚类算法。随着研究的深入,国内外学者在基于 K-means 聚类法图像分割方面做了大量研究,并取得了丰富的研究成果。 KEEGAN 等人提出了一种基于多通道的图像分割方法,允许用户结合自己的信息通道,利用逻辑框架定义多目标函数来实现图像分割。 PHAM 等人针对图像分割中灰度不均匀的问题,提出了改进 K-means 算法中的目标函数来处理图像分割中的不均匀性,提高了图像分割的精度。 WALVOORT 等人选择均方最短距离作为目标函数,使用 K-means 聚类法使其最小化,结果表明在合理的计算范围内这种算法得到的效果最优。陈科尹等人提出基于统计直方图 K-means 聚类的水稻冠层图像分割方法,分别与 K-means、K-means++、 k-mc2、 afk-mc2 共 4 种常用的均值聚类水稻冠层图像特征像素提取方法进行对比,结果表明基于统计直方图K-means 聚类算法均优于以上 4 种聚类方法。王爱莲等人探讨 K-means 算法在图像分割时在 RGB 和 YUV 颜色空间的分割结果,结果表明使用 YUV 混合模型比单一YUV 颜色空间的分割效果更佳。郎成洪等人在医学领域利用 K-means++ 聚类算法进行区域分类,减少了错误的局部极小值。乔雪等人在马铃薯病虫害图像提取中采用 K-means 的图像分割方法,能够准确、完整地将目标病虫害色斑从彩色图像中提取出来,在农业病虫害治理方面具有较好的应用价值。 ✳️ 二、K-means 聚类算法原理 K-means 聚类算法是将原始数据集划分为 k 个不相交的样本数据组。首先,随机选取 k 个初始聚类中心,计算其他数据组到初始聚类中心的欧几里德距离。根据最邻近原则,将距离最小的数据组分配至与其距离最小的聚类中心对应的数据组。其次,分配完成后重新计算每个新的聚类中心。最后,计算每个聚类中心和每个数据组之间的新欧几里德距离,重复迭代直至到达确定的阈值后停止计算。利用基于 K-means聚类算法分割图像时,根据图像像素点的特征进行聚类。设一幅图像的分辨率为 x*y,该图像聚类为 k 个数据组,设 P(x,y)为输入像素, Ck为聚类中心,计算原理如下。 (1)合理选择 k 个初始聚类中心; (2)根据式(1)计算图像中心和每个像素之间的欧氏距离 d; (3)根据距离 d 将所有像素分配至最近的中心; (4)在所有像素分配后,根据式(2)重新计算新的聚类中心; (5)重复迭代计算,直至满足确定的阈值后停止计算; (6)返回图像分割结果。 ✳️ 三、图像聚类分割实例 通过Matlab算法实现了基于K-means(K均值)聚类算法的图像分割,结果如下图所示:当使用3个聚类时,小猫能显著从背景中分割出来,但小猫内部白色毛发并未进行有效分割,当使用5~6个聚类时,小猫白色肚皮可进行有效分割。效果较为显著。 图1 基于K-means聚类图像分割结果 ✳️ 四、参考文献 [1] 高樱萍 , 宋丹 , 王雅静 , 等 . 一种改进的 K-means 聚类服装图像分割算法 [J]. 湖南工程学院学报 ( 自然科学版 ),2021,31(2):54-59. [2] 李立军 , 张晓光 . 基于动态粒子群优化与 K-means 聚类的图像分割算法 [J]. 现代电子技术 ,2018,41(10):164-168. [3]KEEGAN M S,SANDBERG B,CHAN T F.A multiphase logic framework for multichannel image segmentation[J].InverseProblems and Imaging,2012,6(1):95-110. [4]PHAM D L,PRINCE J L.An adaptive fuzzy c-means algorithm for image segmentation in the presence of intensity inhomogeneities[J].Pattern Recognition Letters,1999,20(1):57-68. [5]WALVOORT D,BRUS D J,GRUIJTER J.An R package for spatial coverage sampling and random sampling from compactgeographical strata by k-means[J].Computers & Geosciences, 2010,36(10):1261-1267. ✳️ 五、Matlab代码获取 上述Matlab代码,可私信博主获取。 ———————————————— 版权声明:本文为CSDN博主「matlab科研中心」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/m0_70745318/article/details/127952210
-
#华为云11.11# 实惠更实用·“11”都如愿云中间件、DevSecOps、数字资产管理年度钜惠,多重福利来袭!福利一:全场低至3折,新客试用1.98元起!福利二:领万元礼包,15+款产品券后低至5.8折!福利三:下单抽好礼,有机会获FreeBuds耳机!福利四:新购满额送,华为P50 Pro等好礼等你拿!现在上云,一惠到底!活动有效期至2022年11月30日,上云正当时!>点击这里,马上进入活动专场<
-
【功能模块】 环境:HD(6.5.1.3,x86_64) Kafka-2(2.11-1.1.0) 已开启安全认证 开源flink 1.14.2 、flink-kafka-connector-1.14.2【操作步骤&问题现象】1、需要使用开源flink连接HD(6.5.1.3,x86_64)的kafka2、已经将flink kafka connector中开源kafka-client替换为kafka-clients-2.4.0-h0.cbu.mrs.313.r103、在flink-conf.yml 中配置了认证信息#认证参数java.security.auth.login.config: /data/abc/jaas.confsecurity.kerberos.login.keytab: /data/abc/user.keytabsecurity.kerberos.login.principal: abc@EXAMPLE.COMsecurity.kerberos.login.contexts: Client,KafkaClientsecurity.kerberos.login.use-ticket-cache: false其中jaas.conf如下样例KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="/data/abc/user.keytab"principal="abc@EXAMPLE.COM"useTicketCache=falsestoreKey=truedebug=true;};Client {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab=" /data/abc/user.keytab"principal="abc@EXAMPLE.COM"useTicketCache=falsestoreKey=truedebug=true;};4、代码中kafka消费者除了基本配置也添加了如下配置sasl.kerberos.service.name=kafkasecurity.protocol=SASL_PLAINTEXTsasl.mechanism=GSSAPIkerberos.domain.name=hadoop.example.com5、开源flink提交以yarn per job 方式提交【flink、yarn都是开源版本,无安全认证,只有kafka是FI hd的】【截图信息】我想确认一下hd 6513的kafka是否支持高版本flink连接,如果支持我应该做如何调整?【日志信息】(可选,上传日志内容或者附件)
-
【摘要】 kafka消费端性能优化主要从下面几个方面优化:1.接口使用方面优化:旧版本highlevel-consumer:偏移量信息存储在zookeeper,最大消费线程数与分区数量相同,不推荐旧版本simpleconsumer:自行选择存储偏移量的方式,可以实现多线程消费单分区,若无特殊的性能要求,不推荐新版本highlevel-consumer:偏移量信息存储在kafka指定的topic中,默认...本文分享自华为云社区《FusionInsight HD&MRS:kafka消费端性能优化方法》,作者: 穿夹克的坏猴子 。kafka消费端性能优化主要从下面几个方面优化:1.接口使用方面优化:旧版本highlevel-consumer:偏移量信息存储在zookeeper,最大消费线程数与分区数量相同,不推荐旧版本simpleconsumer:自行选择存储偏移量的方式,可以实现多线程消费单分区,若无特殊的性能要求,不推荐新版本highlevel-consumer:偏移量信息存储在kafka指定的topic中,默认情况下最大消费线程数与分区数量相同,可以实现多线程消费单分区,推荐2.参数调优(以下参数需根据现网环境评估调至合适的值):2.1 旧版本消费者(kafka old API)参数调优fetch.message.max.bytes:该参数为一次性从kafka集群中获取的数据块大小。在升级到651版本后这个参数需要调大,否则容易出现获取数据限制的报错。建议调整大小不小于kafka的服务端参数message.max.bytes。注意如何确认为旧版本:如果生产者的配置方式包含如下这些配置,则为旧版本:group.id/zookeeper.connect2.2 新版本参数(kafka new API)参数调优max.poll.records:意味消费者一次poll()操作,能够获取的最大数据量,调整这个值能提升吞吐量,于此同时也需要同步提升max.poll.interval.ms的参数大小fetch.max.bytes:意味server端可返回给consumer的最大数据大小,增加可以提升吞吐量,但是在客户端和服务端网络延迟比较大的环境下,建议可以减小该值,防止业务处理数据超时。heartbeat.interval.ms:消费超时时间,consumer与kafka之间的超时时间,该参数不能超过session.timeout.ms,通常设置为session.timeout.ms的三分之一,默认值:3000max.partition.fetch.bytes:限制每个consumer发起fetch请求时候,读到数据(record)的限制,设置过大,consumer本地缓存的数据就会越多,可能影响内存的使用,默认值:1048576fetch.max.bytes:server端可返回给consumer的最大数据大小,数值可大于max.partition.fetch.bytes,一般设置为默认值即可,默认值:52428800session.timeout.ms:使用consumer组管理offset时,consumer与broker之间的心跳超时时间,如果consumer消费数据的频率非常低,建议增大这个参数值,默认值:10000auto.offset.reset:消费过程中无法找到数据消费到的offset位置,所选择的消费策略,earliest:从头开始消费,可能会消费到重复数据,latest:从数据末尾开始消费,可能会丢失数据。默认值:earlistmax.poll.interval.ms:消费者在每一轮poll() (拉取数据之间的最大时间延迟),如果此超时时间期满之前poll()没有被再次调用,则消费者被视为失败,并且分组将触发rebalance,以便将分区重新分配给别的成员。如果,再两次poll之间需要添加过多复杂的,耗时的逻辑,需要延长这个时间,默认值:300smax.poll.records:消费者一次poll()操作,能够获取的最大数据量,增加这个参数值,会增加一次性拉取数据的数据量,确保拉取数据的时间,至少在max.poll.interval.ms规定的范围之内,默认值:5002.3 Simpleconsumer参数调优simpleconsumer在初始化阶段需要传一个fetchsize的参数,比如:consumer=new SimpleConsumer(leaderBroker,a_port,100000,64*1024,clientName)中64*1024,该参数表示simpleconsumer一次性获取的数据大小,如果该值过大则可能会导致request时间过长,使用过程中应该降低这个值,保证消费频率,使用SimpleConsumer的核心需求是:多线程消费单个分区,以达到提升性能的要求,如果没有这样需求,不建议使用这个这种消费方式3.消费端频繁rebalance导致性能下降调优:3.1因业务处理能力不足导致的:session.timout.ms控制心跳超时时间。heartbeat.interval.ms控制心跳发送频率,建议该值不超过session.timout.ms的三分之一。max.poll.interval.ms控制每次poll的间隔,时间=获取数据的时间+处理数据的时间,如果max.poll.records设定的值在max.poll.interval.ms指定的时间内没有处理完成会触发rebalance,这里给出一个相对较为合理的配置,建议在预计的处理时间的基础上再加1分钟。max.poll.records 每个批次处理的数据条数,默认为500条。如果处理能力较低,建议可以减小这个值。3.2 非正常消费者频繁的访问kafka集群导致频繁rebalance:收集kafka-request.log,查看异常的topic有哪些客户端节点在消费,cat kafka-request.* | grep “topic=topicName” | grep “apikey=FETCH” | awk –F’from connection’ ‘{print $2}’ | awk –F’;’ ‘{print $1}’ | awk –F’-’ ‘{print $2}’ | awk –F’:’ ‘{print $1}’ | sort | uniq –c | sort -nr ,找出不应该产生消费行为的节点,停止异常节点上消费者4.版本引发性能下降优化FI 8.0.2版本之前kafka SimpleAclAuthorizer鉴权异常导致性能下降,8.0.2版本在使用非安全端口(21005或者9092端口)时会出现集群性能下降的问题,表现:kafka-root.log中出现大量ExitcodeException:id:Default#Principal:no such user报错解决办法:升级到FI 8023以上版本临时规避办法:业务侧使用21007端口访问kafka,去掉鉴权插件即allow.everyone.if.no.acl.found=true,将以下kafka服务端配置置为空:authorizer.class.name=5.FI 6513~6516版本的内核问题引发的性能异常6513版本在kafka引入社区的的lazy index功能后,在新的segment创建的过程中可能会导致并发创建失败的问题,常见的报错(server.log中)如以下两种类型:(1)java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code(2)java.lang.IllegalArgumentException: requirement failed: Attempt to append to a full index当出现以上两种类型的报错的时候可以断定是版本问题导致,问题预警如:https://support.huawei.com/enterprise/zh/bulletins-product/ENEWS2000007844;解决方案:升级到6517版本以上版本或者打入紧急补丁:https://support.huawei.com/enterprise/zh/cloud-computing/fusioninsight-hd-pid-21110924/software/251482609?idAbsPath=fixnode01%7C7919749%7C7941815%7C19942925%7C250430185%7C21110924;临时规避方案:重启异常的broker实例
-
(An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)]) occurred when evaluating SASL token received from the Kafka Broker. This may be caused by Java's being unable to resolve the Kafka Broker's hostname correctly. You may want to try to adding '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your client's JVMFLAGS environment. Users must configure FQDN of kafka brokers when authenticating using SASL and `socketChannel.socket().getInetAddress().getHostName()` must match the hostname in `principal/hostname@realm` Kafka Client will go to AUTHENTICATION_FAILED state.)Caused by: org.ietf.jgss.GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - LOOKING_UP_SERVER)
-
【功能模块】System.setProperty("java.security.auth.login.config", jaasPath);这个系统配置,无法通过,并且我的jaas.conf文件 放在linux目录下,是可以访问到的,程序已经测试过了,什么原因,我使用的是华为云MRS服务2.1.x版本的样例工程下载地址为:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-2.1。这个示例代码。
-
Azure 事件中心是大数据流式处理平台和事件引入服务。 它可以每秒接收和处理数百万个事件。 可以使用任何实时分析提供程序或批处理/存储适配器转换和存储发送到事件中心的数据。以下方案是可以使用事件中心的部分方案:异常情况检测(欺诈/离群值)应用程序日志记录分析管道,例如点击流实时仪表板存档数据事务处理用户遥测数据处理设备遥测数据流式处理为何使用事件中心?仅当能够轻松处理并且能够从数据源获取即时见解时,数据才有价值。 事件中心提供低延迟、可无缝集成的分布式流处理平台,并在 Azure 的内部和外部提供数据和分析服务,用于构建完整的大数据管道。事件中心充当事件管道的“前门”,在解决方案体系结构中通常称作“事件引入器” 。 事件引入器是位于事件发布者与事件使用者之间的组件或服务,可以将事件流的生成与这些事件的使用分离开来。 事件中心提供统一的流式处理平台和时间保留缓冲区,将事件生成者与事件使用者分离开来。以下部分介绍 Azure 事件中心服务的重要功能:完全托管的 PaaS事件中心是完全托管的平台即服务 (PaaS),其配置或管理开销极低,因此你可以专注于业务解决方案。 Apache Kafka 生态系统的事件中心提供 PaaS Kafka 体验,使用户无需管理、配置或运行群集。支持实时处理和批处理实时引入、缓冲、存储和处理流,以获取可行的见解。 事件中心使用分区的使用者模型,可让多个应用程序同时处理流,并允许控制处理速度。 Azure 事件中心还能与 Azure Functions 集成,以构成无服务器体系结构。捕获事件数据在 Azure Blob 存储或 Azure Data Lake Storage 中近乎实时地捕获数据,以进行长期保留或微批处理。 可以基于用于派生实时分析的同一个流实现此行为。 设置捕获极其简单。 无需管理费用即可运行它,并且可以使用事件中心吞吐量单位或处理单位自动对它进行缩放。 使用事件中心可以专注于数据处理而不是数据捕获。可缩放使用事件中心可以从 MB 量级的数据流着手,然后逐步扩展到 GB 甚至 TB 量级的处理。 自动扩充功能是用于根据用量需求扩展吞吐量单位数或处理单位数的众多选项之一。丰富的生态系统借助基于行业标准 AMQP 1.0 协议并提供各种语言(.NET、Java、Python、JavaScript)的广泛生态系统,可以轻松地从事件中心开始处理流。 所有支持的客户端语言提供低级别集成。 该生态系统还为你提供了与 Azure 服务(如 Azure 流分析和 Azure Functions)的无缝集成,使你能够构建无服务器体系结构。用于 Apache Kafka 的事件中心Apache Kafka 生态系统的事件中心还可让 Apache Kafka(1.0 和更高版本)客户端和应用程序与事件中心通信。 无需设置、配置或管理你自己的 Kafka 和 Zookeeper 群集,也不需要使用某些不是 Azure 原生的 Kafka 即服务产品/服务。事件中心高级层和专用层事件中心高级层能够满足高端流式处理需求,即在托管的多租户 PaaS 环境中要求更高的性能、更好的隔离性、可预测的延迟和最小的干扰。 在标准产品/服务的所有功能之上,高级层提供了多种额外功能,例如动态分区纵向扩展、延长保持和客户管理的密钥。 有关详细信息,请参阅事件中心高级层。事件中心专用层提供单租户部署来满足客户的最苛刻流式处理需求。 此单租户套餐提供有保障的 99.99% SLA,只能在专用定价层上使用。 事件中心群集每秒能够引入数百万个事件,且提供有保障的容量和亚秒级的延迟。 在专用群集中创建的命名空间和事件中心不仅仅包括高级产品/服务的所有功能。 有关详细信息,请参阅事件中心专用层。如需更多详细信息,请参阅事件中心层之间的比较。Azure Stack Hub 上的事件中心使用 Azure Stack Hub 上的事件中心可以实现混合云方案。 支持使用基于流式处理和事件的解决方案进行本地处理和 Azure 云处理。 无论方案是混合(联网)还是离线的,解决方案都支持大规模的事件/流处理。 方案仅受事件中心群集大小的约束,但你可以根据需要预配群集大小。(Azure Stack Hub 和 Azure 上的)事件中心版本提供高度的功能奇偶一致性。 这种奇偶一致性意味着 SDK、示例、PowerShell、CLI 和门户提供类似的体验(差异很小)。有关详细信息,请参阅 Azure Stack Hub 上的事件中心概述。重要的体系结构组件事件中心包含以下关键组件:事件生成者:向事件中心发送数据的所有实体。 事件发布者可以使用 HTTPS、AMQP 1.0 或 Apache Kafka(1.0 和更高版本)发布事件。分区:每个使用者只读取消息流的特定子集或分区。使用者组:整个事件中心的视图(状态、位置或偏移量)。 通过使用者组来使用应用程序时,每个应用程序都有事件流的单独视图。 使用者根据自身的步调和情况独立读取流。吞吐量单位(标准层)或处理单位(高级层)或容量单位(专用):预先购买的容量单位,用于控制事件中心的吞吐量容量。事件接收者:从事件中心读取事件数据的所有实体。 所有事件中心使用者通过 AMQP 1.0 会话进行连接。 事件中心服务在事件变得可用时通过会话来提供事件。 所有 Kafka 使用者都通过 Kafka 协议 1.0 及更高版本进行连接。下图显示了事件中心流处理体系结构:后续步骤要开始使用事件中心,请参阅“发送和接收事件”教程 :.NET CoreJavaPythonJavaScriptGoC(仅发送)Apache Storm(仅接收)
-
【功能模块】【操作步骤&问题现象】1、配置flinksql2、点击运行【截图信息】【日志信息】(可选,上传日志内容或者附件)WARN RocketmqClient [] - can not find default topic, autoCreateTopicEnable is false
-
【摘要】 Kafka作为一个中间件组件,具有与其他中间件组件通用的功能 (异步处理、系统解耦、流量削峰、日志处理), 但在某些特殊的功能方面,每个中间件拥有其独特的特性,其中Kafka作为一个具有高吞吐、高性能的中间件, 它也有其不足的地方,在某些应用场景下面要求中间件实现消息延时的功能,但Kafka本身是不具备这种能力的。 此DEMO项目实现了每条消息实现自定义的延时,详细内容可阅读文章进行了解。《目录》背景开发环境云服务介绍方案设计方案简述方案架构图时序图代码参数指南代码实现结果反馈1、背景Kafka是一个拥有高吞吐、可持久化、可水平扩展,支持流式数据处理等多种特性的分布式消息流处理中间件,采用分布式消息发布与订阅机制,在日志收集、流式数据传输、在线/离线系统分析、实时监控等领域有广泛的应用,Kafka它虽有以上这么多的应用场景和优点,但也具备其缺陷,比如在延时消息场景下,Kafka就不具备这种能力,因此希望能在保存Kafka特有能力的情况下给Kafka扩充一个具有能处理延时消息场景的能力。2、开发环境3、云服务介绍分布式消息服务Kafka版: 华为云分布式消息服务Kafka版是一款基于开源社区版Kafka提供的消息队列服务,向用户提供计算、存储和带宽资源独占式的Kafka专享实例。使用华为云分布式消息服务Kafka版,资源按需申请,按需配置Topic的分区与副本数量,即买即用,您将有更多精力专注于业务快速开发,不用考虑部署和运维。4、方案设计i、方案简述此方案实现,需要借助两个Topic来进行实现,一个Topic用于及时接收生产者们所产生的消息,另一个Topic则用于消费者拉取消息进行消费。另外在这两个Topic之间加上一个队列用于做延时的逻辑判断,如果消息满足了延时的条件,则将队列中的消息生产至我们的消费者需要拉取的Topic中。ii、方案架构图Kafka消息延时方案架构图 Kafka消息延时实现思路 1. 生产者将生产消息存入topic_delay主题中进行存储。 2. 将topic_delay主题中的所有消息拉取至ConcurrentLinkedQueue队列中。 3. 取值判断是否满足延时要求。 a. 如果满足延时要求,则将消息生产至topic_out主题中,并将queue队列中的值移除。 b. 如果不满足延时要求,则等待自定义时间后重试判断。 4. 消费者最终从topic_out主题中拉取消息进行消费。iii、方案时序图Kafka消息延时方案时序图 5、代码参数指南本项目中起到延时作用的类Delay.java其余类为官方提供用于测试生产和消费消息, 如需使用官方测试的使用的生产消费代码相关配置介绍可以参考https://support.huaweicloud.com/devg-kafka/how-to-connect-kafka.html 。 如需使用自己配置的生产者消费者,只配置Delay.java中的参数即可。 #### Delay.java参数详情 1. delay:自定义延时时间,单位ms。 2. topic_delay变量:用于临时存储消息的topic名称。 3. topic_out变量:用于消费者拉取消息消费的topic名称。 4. 关于消费者和生产者配置可按需配置,可参考Kafka官方文档:https://kafka.apache.org/documentation/#producerconfigs6、代码实现实现代码可参考Kafka消息延时 ``` package com.dms.delay; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.time.Duration; import java.util.Arrays; import java.util.Date; import java.util.Properties; import java.util.concurrent.ConcurrentLinkedQueue; /** * Hello world! * */ public class Delay { //缓存队列 public static ConcurrentLinkedQueue<ConsumerRecord<String, String>> link = new ConcurrentLinkedQueue(); //延迟时间(20秒),可根据需要设置延迟大小 public static long delay = 20000L; /** *入口 * @param args */ public static void main( String[] args ) { //延时主题(用于控制延时缓冲) String topic_delay = "topic_delay"; //输出主题(直接供消费者消费) String topic_out = "topic_out"; /* 消费线程 */ new Thread(new Runnable() { @Override public void run() { //消费者配置。请根据需要自行设置Kafka配置 Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //创建消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //指定消费主题 consumer.subscribe(Arrays.asList(topic_delay)); while (true) { //轮询消费 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10)); //遍历当前轮询批次拉取到的消息 for (ConsumerRecord<String, String> record : records){ System.out.println(record); //将消息添加到缓存队列 link.add(record); } } } }).start(); /* 生产线程 */ new Thread(new Runnable() { @Override public void run() { //生产者配置(请根据需求自行配置) Properties props = new Properties(); props.put("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092"); props.put("linger.ms", 1); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //创建生产者 Producer<String, String> producer = new KafkaProducer<>(props); //持续从缓存队列中获取消息 while(true){ //如果缓存队列为空则放缓取值速度 if(link.isEmpty()){ try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } continue; } //获取缓存队列栈顶消息 ConsumerRecord<String, String> record = link.peek(); //获取该消息时间戳 long timestamp = record.timestamp(); Date now = new Date(); long nowTime = now.getTime(); if(timestamp+ Delay.delay <nowTime){ //获取消息值 String value = record.value(); //生产者发送消息到输出主题 producer.send(new ProducerRecord<String, String>(topic_out, "",value)); //从缓存队列中移除该消息 link.poll(); }else { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }).start(); } } ```7、结果反馈
上滑加载中
推荐直播
-
华为开发者空间玩转DeepSeek
2025/03/13 周四 19:00-20:30
马欣 山东商业职业技术学院云计算专业讲师,山东大学、山东建筑大学等多所本科学校学生校外指导老师
同学们,想知道如何利用华为开发者空间部署自己的DeepSeek模型吗?想了解如何用DeepSeek在云主机上探索好玩的应用吗?想探讨如何利用DeepSeek在自己的专有云主机上辅助编程吗?让我们来一场云和AI的盛宴。
即将直播 -
华为云Metastudio×DeepSeek与RAG检索优化分享
2025/03/14 周五 16:00-17:30
大海 华为云学堂技术讲师 Cocl 华为云学堂技术讲师
本次直播将带来DeepSeek数字人解决方案,以及如何使用Embedding与Rerank实现检索优化实践,为开发者与企业提供参考,助力场景落地。
去报名
热门标签