• [生态对接] carbon数据库如何实时抽取数据
      最近遇到个问题,数据上游推送到carbon的数据是实时的,大概5分钟一批。但是carbon数据库不知道怎么才能利用检测工具实时抽取数据到kafka中。有大佬帮忙给个建议吗?
  • [二次开发] 本地环境消费开源kafka,将数据写入mrs kafka
    生产者代码:报错情况:
  • [技术干货] springboot如何配置多kafka【转】
    springboot配置多kafkakafka,说起这个玩意,大家应该都不陌生,不知道啥是kafka的直接去搜索就像MySQL的使用一样,我们在用kafka的时候,也会碰到需要使用多个kafka的情况,比如我从kafkaA的一个topic里消费出数据,进行一次处理,然后我要写入到kafkaB的topic里从网上找了很多配置多kafka的教程,但是都不大好使,后来还是找到了,加上我自己改了点点东西,贴出来和大家分享一下~首先是pom文件,kafka的依赖1234<dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId></dependency>配置文件spring:   kafka:     listener:       concurrency: 10     one:       # kafka地址       bootstrap-servers: 192.168.217.117:9092       producer:         # 生产者每次发送消息的时间间隔(毫秒)         linger-ms: 5000         # 单条消息最大值(字节)         max-request-size: 16384         # 每次批量发送消息的数量         batch-size: 16384         # 缓存区大小         buffer-memory: 33554432         # 队列         topic: test       consumer:         # 是否自动提交         enable-auto-commit: true         # 队列         topic: topic         # group ID         group-id: consumer     two:       # kafka地址       bootstrap-servers: 192.168.217.160:9092       producer:         # 生产者每次发送消息的时间间隔(毫秒)         linger-ms: 100         # 单条消息最大值(单位为字节,且大小指的是经过序列化后的大小)         max-request-size: 16384         # 每次批量发送消息的数量         batch-size: 16384         # 缓存区大小         buffer-memory: 33554432         # 队列         topic: test       consumer:         # 是否自动提交         enable-auto-commit: true         # 队列         topic: test         # group ID         group-id: consumer有了配置文件当然就要读取配置文件了先读取第一个kafka@EnableKafka @Configuration public class KafkaConfigOne {       @Value("${spring.kafka.one.bootstrap-servers}")     private String bootstrapServers;     @Value("${spring.kafka.one.consumer.enable-auto-commit}")     private boolean enableAutoCommit;     @Value("${spring.kafka.one.consumer.group-id}")     private String groupId;     @Value("${spring.kafka.one.producer.linger-ms}")     private Integer lingerMs;     @Value("${spring.kafka.one.producer.max-request-size}")     private Integer maxRequestSize;     @Value("${spring.kafka.one.producer.batch-size}")     private Integer batchSize;     @Value("${spring.kafka.one.producer.buffer-memory}")     private Integer bufferMemory;         @Bean     public KafkaTemplate<String, String> kafkaOneTemplate() {         return new KafkaTemplate<>(producerFactory());     }       @Bean     KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaOneContainerFactory() {         ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();         factory.setConsumerFactory(consumerFactory());         factory.setConcurrency(3);         factory.getContainerProperties().setPollTimeout(3000);         return factory;     }       private ProducerFactory<String, String> producerFactory() {         return new DefaultKafkaProducerFactory<>(producerConfigs());     }       public ConsumerFactory<Integer, String> consumerFactory() {         return new DefaultKafkaConsumerFactory<>(consumerConfigs());     }       private Map<String, Object> producerConfigs() {         Map<String, Object> props = new HashMap<>();         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);         props.put(ProducerConfig.LINGER_MS_CONFIG,lingerMs);         props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);         props.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);         return props;     }       private Map<String, Object> consumerConfigs() {         Map<String, Object> props = new HashMap<>();         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);         props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);         return props;     } } 接着读取第二个kafka@EnableKafka @Configuration public class KafkaConfigTwo {       @Value("${spring.kafka.two.bootstrap-servers}")     private String bootstrapServers;     @Value("${spring.kafka.two.consumer.enable-auto-commit}")     private boolean enableAutoCommit;     @Value("${spring.kafka.two.consumer.group-id}")     private String groupId;     @Value("${spring.kafka.two.producer.linger-ms}")     private Integer lingerMs;     @Value("${spring.kafka.two.producer.max-request-size}")     private Integer maxRequestSize;     @Value("${spring.kafka.two.producer.batch-size}")     private Integer batchSize;     @Value("${spring.kafka.two.producer.buffer-memory}")     private Integer bufferMemory;         @Bean     public KafkaTemplate<String, String> kafkaTwoTemplate() {         return new KafkaTemplate<>(producerFactory());     }       @Bean     KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {         ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();         factory.setConsumerFactory(consumerFactory());         factory.setConcurrency(3);         factory.getContainerProperties().setPollTimeout(3000);         return factory;     }       private ProducerFactory<String, String> producerFactory() {         return new DefaultKafkaProducerFactory<>(producerConfigs());     }       public ConsumerFactory<Integer, String> consumerFactory() {         return new DefaultKafkaConsumerFactory<>(consumerConfigs());     }       private Map<String, Object> producerConfigs() {         Map<String, Object> props = new HashMap<>();         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);         props.put(ProducerConfig.LINGER_MS_CONFIG,lingerMs);         props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);         props.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);         return props;     }       private Map<String, Object> consumerConfigs() {         Map<String, Object> props = new HashMap<>();         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);         props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);         return props;     } } 使用:12345     @Autowired    @Qualifier("kafkaTwoTemplate")    private KafkaTemplate kafkaTwoTemplate;  kafkaTwoTemplate.send("topic", "message");直接注入使用就可以,但是,在这儿注意,因为配置了多个kafka,所以需要进行区分,此处我使用@Autowired和@Qualifier连用,大家也可以使用@DataSource,这个就是多数据源的注解而已,无所谓,按照个人习惯进行使用就OK当然,还有消费者1234@KafkaListener(topics = {"#{'${spring.kafka.two.consumer.topic}'}"}, containerFactory = "kafkaTwoContainerFactory")   public void listenerTwo (String data) {       System.out.println(data);   }
  • [运维管理] FusionInsight HD 6513 在线升级 FusionInsight HD 6517版本 需要多长时间?怎么评估的?
    FusionInsight HD 6513 在线升级 FusionInsight HD 6517版本  需要多长时间?怎么评估的?
  • [运维管理] FusionInsight HD 6513升级 FusionInsight HD 6517版本,是否支持部分组件(如kafka 、zookeeper)在线升级,其他组件离线升级?
    FusionInsight HD 6513升级 FusionInsight HD 6517版本,是否支持部分组件在线升级,其他组件离线升级?
  • [问题求助] flinksql消费DRS过来的kafka消息,关键字怎么处理
    kafka消息格式,参考cid:link_0带有table、sql、old等关键字,flinksql建表时sql校验不通过,这种情况要如何处理
  • [技术干货] 架构必备能力——kafka的选型对比及应用场景-转载
     一、Kafka的模型与优势 1. Kafka 模型 在Kafka中,有几个主要的概念需要了解,包括broke、topic、partition等  Broker Broker是Kafka集群中的一个节点,可以理解为是一个Kafka实例,一个Kafka集群由多个Broker组成。Broker负责存储数据,处理客户端的请求,以及协调分布式环境下的各种工作。  Topic Topic是Kafka中消息的基本单位,相当于消息的分类。每个Topic包含了若干个消息,这些消息被存储在Broker中的一个或多个Partition中。一个Topic可以有多个Partition,同时每个Partition只能属于一个Topic。Topic名称是一个字符串,通常表示Topic所代表的业务或功能,例如"order"、"log"等等。  Partition Partition是Kafka中的一个概念,表示一个物理上的存储单元。一个Topic可以被分割成多个Partition,每个Partition可以存储一定数量的消息。Partition中的消息是有序的,每个消息都有一个唯一的编号,称为Offset。Offset是Partition中消息的唯一标识符,客户端可以根据Offset从Partition中读取消息。  当然,Partition 其实也分种类,有着主备关系。如上图,Partition1 在 Broker1中是主分区(Leader)用实线表示。在其他分区中也有Partition1 ,但都是备份(follower),用虚线表示。  2. Kafka 优势 不难看出,Kafka的设计很像分布式文件系统,因为天然就是要多个Broker节点,所以具有很大的吞吐能力。再加上可选数量的备份,配合以高效的数据存储,使得其有很强的性能,我们可以总结一下Kafka的几项优势   高吞吐率 Kafka的高吞吐率是其最为突出的优势。在Kafka的设计中,每个分区都有多个副本,如果需要,每个副本都可以独立地对外提供服务。这种设计使得Kafka能够轻松地扩展到数千个节点,从而实现高吞吐率的数据传输。此外,Kafka支持批量消息传送,可以将小消息合并为一个大的批处理消息,从而减少网络传输的开销。  可靠性高 Kafka的分布式设计和多副本机制可以保证数据的高可靠性。每个分区都有多个副本,一旦某个副本出现故障,其他副本会自动接管服务。此外,Kafka支持消息的持久化存储,即使出现消息传输中断或节点崩溃,也可以在节点恢复后重新传输数据。  灵活性高 Kafka的灵活性也是其优势之一。Kafka不仅能够作为消息中间件,还可以作为日志收集和数据处理的平台。此外,Kafka的存储模型很灵活,支持多种不同的数据类型和格式,可以自定义消息格式和处理逻辑。  当然,除了性能优异,Kafka 的生态系统也很丰富,有多种不同的消费者和生产者客户端,支持多种编程语言,例如Java、Python和Go等。此外,Kafka还提供了Kafka Connect和Kafka Streams API,可以将Kafka与不同的外部系统集成,并且支持实时数据处理和流式计算。  二、Kafka与竞争对手的区别 1. 与RabbitMQ相比 在之前的文章《消息队列选型——为什么选择RabbitMQ》 中,其实我们已经对Kafka RabbitMQ进行了一些对比,这里我把当时的对比表格再放出来:   RabbitMQ是一个流行的AMQP消息代理,可提供很好的消息传递性能,还可以在高可靠性和事务性方面提供更好的支持。然而,相对于Kafka,RabbitMQ在可扩展性和处理大量数据时的性能方面不够强大。但是,对于许多大数据应用程序来说,Kafka的可扩展性和性能优势使其成为更好的选择。  2. 与ActiveMQ相比 ActiveMQ是Apache旗下的分布式消息代理,可提供良好的Java集成和可靠性。  对比项    ActiveMQ    Kafka 应用场景    应用于企业内部的消息传递、集成、异步通信等    应用于大规模数据处理、流式计算等 消息存储模式    消息被发送到队列或主题,存储在磁盘上    消息以分区的方式存储在Kafka集群的磁盘上 消息消费    消息被消费后会被删除    消息被消费后不会立即删除,而是根据设置的保留时间保留在磁盘上 吞吐量    吞吐量相对较低    吞吐量相对较高 可扩展性    相对较差    相对较好 消息保证    支持消息事务,可保证消息的可靠性    支持至少一次消息传递,不保证消息的可靠性 消息顺序保证    支持消息顺序保证    支持基于分区的消息顺序保证 管理维护    相对较简单    相对较复杂 生态系统    生态系统相对较完善    生态系统相对较单一 开发难度    开发难度相对较大    开发难度相对较低 消息传递方式    传递方式基于TCP协议    传递方式基于TCP协议,支持Zero-copy技术 消息过滤器    支持类SQL语言的消息过滤器    不支持消息过滤器 消息分发机制    消费者需要轮询服务器获取消息    消息通过推模式由服务器主动分发给消费者 消息重复消费问题    相对较少    相对较多 但是,相对于Kafka,ActiveMQ 在处理大量数据时的性能不足,并且在滞后和可扩展性方面也存在问题,这意味着,Kafka 在高性能、大规模数据处理时,具备很强的优势。  3. 与RocketMQ相比 Kafka和RocketMQ都是流行的分布式消息队列系统,它们都可以用于数据传输和处理,他们的一些特征对比如下  特性    Kafka    RocketMQ 适用场景    大规模实时数据处理,高吞吐量,低延迟    大规模分布式消息传递和处理 数据模型    基于日志的消息传递模型,消息有序    基于 JMS 的消息传递模型,支持消息批量发送 存储方式    消息使用队列存储,副本机制保证数据可靠性    消息使用主题存储,支持多种存储方式 分区设计    分布式分区,水平扩展容易    分布式分区,支持水平、竖直扩展 性能表现    高吞吐量,低延迟,处理大数据流效果更佳    处理高并发、大数据流效果更佳 可靠性    通过多个副本保证数据可靠性,并具有良好的容错性    基于分布式架构,具有较强的可靠性和容错性 社区支持    开源社区支持广泛,文档丰富,插件可扩展    独立开源社区支持,文档和插件相对较少 总的来说,RocketMQ在性能方面与Kafka相当。至于社区的话,两者现在都是Apache软件基金会的顶级项目,Kafka最初是由LinkedIn公司开发的,而RocketMQ最初是由阿里巴巴公司开发的,但是贡献给了Apache软件基金会稍微晚一些,所以相对活跃度低一些,但其在国内应用很广泛。  4. 与Pulsar对比 Apache Pulsar和Apache Kafka都是可扩展、可靠的流式数据平台。它们都具有高可用性、高并发性和高吞吐量,并支持分布式订阅和发布,他们的一些对比如下:  对比项    Apache pulsar    Kafka 发布时间    2017年    2011年 语言    Java    Scala 群集模式    多租户    无多租户 可伸缩性    低延迟和高容量    可扩展性极高 事务    支持    不支持 消息顺序    有序    有序 多语言客户端    支持    支持 跨数据中心复制    支持    支持 批量发送    支持    支持 多租户安全    支持    不支持 社区支持    相对较新,但增长迅速    相对成熟的社区支持 性能    Pulsar在延迟、吞吐量和可伸缩性方面表现出色,特别是在多租户和跨数据中心复制方面。    Kafka在吞吐量和可伸缩性方面表现出色,是一个可靠而高效的消息传递系统。 总的来说,Apache pulsar和Kafka都是高性能分布式消息传递系统,用于实时数据传输。它们都具有不同的功能和性能特点。Apache pulsar具有更多的功能,例如异地复制、多租户设计等,但Kafka具有更高的性能和更成熟的社区支持。  三、 Kafka的典型应用场景 1. 常用场景 消息队列 Kafka可以作为传统消息队列的替代方案。它可以快速传输大量消息,保持消息的可靠性和顺序性,并允许多个消费者读取消息,尽管在MQ功能性上的特点稍逊一筹,相比其他MQ插件,Kafka拥有更好的可扩展性和吞吐量。  日志收集 Kafka可以作为日志收集的理想平台。由于其可靠性和可扩展性,Kafka可以在数百个服务器上实时收集日志,这些日志可以进行后续处理和分析。Kafka的高效处理能力使其成为收集实时日志的最佳选择。我们在《日志搞不定?手把手教你如何使用Log4j2 》 里也提到可以配置Appenders将日志传输至Kafka服务器。 相比其他MQ插件,Kafka预设了这种场景,使用更容易,并且能够处理更高的数据量和更快的数据传输速率。  流处理 Kafka的流处理功能使其成为构建实时处理系统的首选平台。它可以让开发人员通过处理无限流来自动触发和响应事件,并可以在流中使用各种数据处理步骤。与其他MQ插件相比,Kafka使用分布式流处理,可以处理大量的数据并提供更高的可靠性。  事件驱动 Kafka可以作为事件驱动架构的后端,帮助处理大量的事件数据,包括用户行为数据、交易数据、日志数据等。相比其他MQ插件,Kafka拥有更好的扩展性和容错性。  2. 案例分析 场景: 一个大型电商网站需要实时监控用户的购买行为,以便及时调整商品推荐策略和优惠活动,提高用户购买率。这个网站有数千万的用户和数百万个商品,每秒钟会产生成千上万的购买行为事件,如何高效地收集、处理和分析这些数据,是一个非常具有挑战性的问题。  解决方案: 使用Kafka来搭建一个实时数据处理系统,主要包含以下组件:  1.数据收集:在电商网站的应用程序中,使用Kafka的Producer API将用户的购买行为数据发送到Kafka的Topic中。  2.数据处理:在Kafka的消费者端,运行一个或多个消费者进程来处理数据。消费者进程可以使用Kafka Connect将数据写入到NoSQL数据库、Hadoop集群等数据存储系统中。在处理数据时,消费者需要注意以下几个关键点:  保证数据的可靠性:使用Kafka的消息确认机制来保证数据不会丢失或重复处理。 支持分布式处理:使用Kafka的分区机制来实现高效的水平扩展,并避免单点故障的影响。 时间戳管理:在处理数据时,需要记录数据到Kafka中的时间戳来确保正确性。 3.数据分析:使用实时流处理工具,如Apache Storm、JStorm或Apache Flink,对数据进行实时分析和处理,并输出结果到实时报表和仪表盘中。在使用这些工具时,需要注意以下几个关键点:  窗口机制:使用窗口机制来控制处理数据的时间段,以便对数据进行聚合、分析和统计。 数据源管理:与Kafka相同,实时流处理工具也需要支持分布式处理,并且可以通过Kafka Connect来实现数据源的管理。 处理结果数据的可视化:使用可视化工具,如Grafana、Kibana等,将处理结果可视化,并输出到实时报表和仪表盘中,便于业务人员和技术人员了解实时数据变化。 总结 经过上述的讲解,我们不难知道Kafka的应用场景非常广泛,你可以只把他当MQ组件,也可以使用它进行日志传输或流处理。它的特点也非常鲜明,就是强大的吞吐量、扩展性和可靠性。当然它与传统MQ组件对比,它在复杂场景下的使用会比较麻烦。但其在大数据领域应用广泛,比如经常作为 Hadoop 的数据源,将数据传输到 Hadoop 中进行存储和处理。  当然,在实际选型中我们往往要考虑更多问题,除了明确需求和场景,还要考虑已用的技术栈情况、开发语言支持、版本更新情况。并没有哪一种框架是万金油。而对于一些要求比较单薄的场景,可能许多的框架都可以满足要求,那么易用和易维护就会成为选型的关键 ———————————————— 版权声明:本文为CSDN博主「战斧」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/u011709538/article/details/133791167 
  • [问题求助] 救助-java解析KAKFKA从DRS获取的AVRO数据反序列化问题
    问题描述:公司最近使用使用华为kafka,通过DRS获取ORACLE数据库日志文件并且使用ARVO格式序列化到kafka里的topic里,最后通过工具插入目标数据库中。问题点:现要通过java获取topic里的数据,并且反序列化ARVO格式成JSON可识别语句。目前已经获取到topic数据,但是反序列化出错,(使用的是官网给的反序列化demo和schem)kafka 消费者获取数据代码:出错代码行:报错信息:烦请大佬帮忙!!!
  • [技术干货] 最经典的一道JAVA面试题,谈谈你对Kafka零拷贝原理的理解
    最近一位3年工作经验的小伙伴去某厂面试,被问到这样一个问题,说:”请你简单说一下Kafka的零拷贝原理“。然后,这位小伙伴突然愣住了,什么是零拷贝,零拷贝跟Kafka有关系吗?那么今天,我给大家来聊一聊我对Kafka零拷贝原理的理解。1、什么是零拷贝?在实际应用中,如果我们需要把磁盘中的某个文件内容发送到远程服务器上,它必须要经过几个拷贝的过程,如图所示:1、从磁盘中读取目标文件内容拷贝到内核缓冲区2、CPU控制器再把内核缓冲区的数据赋值到用户空间的缓冲区中3、接着在应用程序中,调用write()方法,把用户空间缓冲区中的数据拷贝到内核下的Socket Buffer中。4、最后,把在内核模式下的SocketBuffer中的数据赋值到网卡缓冲区(NIC Buffer)5、网卡缓冲区再把数据传输到目标服务器上在这个过程中我们可以发现,数据从磁盘到最终发送出去,要经历4次拷贝,而在这四次拷贝过程中,有两次拷贝是浪费的,分别是:1、从内核空间赋值到用户空间2、从用户空间再次复制到内核空间除此之外,由于用户空间和内核空间的切换会带来CPU的上线文切换,对于CPU性能也会造成性能影响。而零拷贝,就是把这两次多于的拷贝省略掉,应用程序可以直接把磁盘中的数据从内核中直接传输给Socket,而不需要再经过应用程序所在的用户空间,如下图所示零拷贝通过DMA(Direct Memory Access)技术把文件内容复制到内核空间中的Read Buffer,接着把包含数据位置和长度信息的文件描述符加载到Socket Buffer中,DMA引擎直接可以把数据从内核空间中传递给网卡设备。在这个流程中,数据只经历了两次拷贝就发送到了网卡中,并且减少了2次cpu的上下文切换,对于效率有非常大的提高。2、Kafka零拷贝?所以,所谓零拷贝,并不是完全没有数据赋值,只是相对于用户空间来说,不再需要进行数据拷贝。对于前面说的整个流程来说,零拷贝只是减少了不必要的拷贝次数而已。在程序中实现零拷贝的方式有三种:1、在Linux中,零拷贝技术依赖于底层的sendfile()方法实现2、在Java中,FileChannal.transferTo() 方法的底层调用的就是 sendfile() 方法。3、MMAP 文件映射机制。它的原理是,将磁盘文件映射到内存, 用户通过修改内存就能修改磁盘文件。使用这种方式可以获取很大的I/O提升,省去了用户空间到内核空间复制的开销。以上就是我对于Kafka中零拷贝原理的理解本次的面试题涉及到一些计算机底层的原理,大家在平时的业务开发过程中也很少关注。但我想提醒大家,如果要在技术路上走得更远,还是要多关注一些底层的实现原理,把基础打牢固。
  • [最佳实践] Python实现kafka生产和实时消费
     python实现kafka生产消息 python3环境准备 1. 安装python依赖 pip3 install confluent-kafka pip3 install kafka-python 2.  python代码 from confluent_kafka import Producer # Kafka 服务器地址 bootstrap_servers = 'x.x.x.x:21005, x.x.x.x:21005, x.x.x.x:21005' # 创建生产者配置 producer_config = {     'bootstrap.servers': bootstrap_servers } # 创建生产者实例 producer = Producer(producer_config) # 主题和消息列表 topic = 'your_topic' messages = ['Message 1', 'Message 2', 'Message 3'] # 批量发送消息到 Kafka for message in messages:     producer.produce(topic, message.encode('utf-8')) 刷新生产者缓冲区 producer.flush()  关闭生产者实例 producer.flush() producer.poll(0) # 等待传递所有消息 producer.flush() 4. 执行命令 ./python3 /opt/sandbox/python_kafkaproducer.py  python实现kafka消费消息 1.  python代码 from kafka import KafkaConsumer from kafka import KafkaProducer from kafka.errors import KafkaError import sys def start_consumer(consumertopicname): consumer = KafkaConsumer(consumertopicname, group_id='my-group', bootstrap_servers='x.x.x.x:21005,x.x.x.x:21005,x.x.x.x:21005') for message in consumer:     print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,                                           message.offset, message.key,                                           message.value))  if name == "main": if len(sys.argv) < 1: print("usage:python3 pythonkafka.py consumertopicname producertopicname") exit(-1)  consumertopicname=sys.argv[1]  start_consumer(consumertopicname)  2. 执行命令 ./python3 /opt/sandbox/test_spark.py your_topic`(参数填写需要消费的topic) 注: bootstrap_servers填写节点加端口,非安全为21005  
  • [技术干货] Kafka存储机制笔记分享
           Kafka通过topic来分主题存放数据,主题内有分区,分区可以有多个副本,分区的内部还细分为若干个segment。所谓的分区其实就是在Kafka对应存储目录下创建的文件夹,文件夹的名字是主题名加上分区编号。①Topic       一类消息,消息存放的目录即主题,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。②Partition       topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列③Segment       所谓的segment其实就是在分区对应的文件夹下产生的文件。一个分区会被划分成大小相等的若干segment,这样一方面保证了分区的数据被划分到多个文件中保证不会产生体积过大的文件;另-方面可以基于这些segment文件进行历史数据的删除,提高效率。一个segment又由一个log和一个index文件组成。④Log       Log由多个Segment文件组成,接收到的新消息永远是以追加的方式于Segment文件中,Segment的文件个数随着数据量的累积而增加,每个消息有自增编号,这种只追加不修改的方式避免了变更前的查询消耗。⑤index       Index文件仅记录固定消息量的索引编号范围,Kafka在查询时,先从Index中定位到小范围的索引编号区间,再去Log中在小范围的数据块中查询具体数据,此索引区间的查询方式称为:稀疏索引。二、原理概念1、持久化       kafka使用文件存储消息(append only log),这就直接决定kafka在性能上严重依赖文件系统的本身特性。且无论任何OS下,对文件系统本身的优化是非常艰难的。文件缓存/直接内存映射等是常用的手段。因为kafka是对日志文件进行append操作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数。对于kafka而言,较高性能的磁盘,将会带来更加直接的性能提升。2、性能       除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题。kafka并没有提供太多高超的技巧;对于producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker;对于consumer端也是一样,批量fetch多条消息。不过消息量的大小可以通过配置文件来指定。对于Kafka broker端,似乎有个Send file系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换(这里涉及到"磁盘IO数据"、"内核内存"、"进程内存"、"网络缓冲区",多者之间的数据copy)。3、Topic模型       其他JMS实现,消息消费的位置是有provider保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态。这就要求JMS broker需要太多额外的工作。在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的。当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset。由此可见,consumer客户端也很轻量级。4、负载均衡       kafka集群中的任何一个broker,都可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息。消息由producer直接通过socket发送到broker,中间不会经过任何"路由层"。       异步发送,将多条消息暂且在客户端buffer起来,并将他们批量发送到broker;小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率;不过这也有一定的隐患,比如当producer失效时,那些尚未发送的消息将会丢失。
  • [其他] Kafka高可用原理
    Kafka的多副本冗余设计不管是传统的基于关系型数据库设计的系统,还是分布式的如 zookeeper 、 redis 、 Kafka 、 HDFS等等,实现高可用的办法通常是采用冗余设计,通过冗余来解决节点宕机不可用问题。 首先简单了解Kafka的几个概念:Broker (节点):Kafka服务节点,简单来说一个 Broker 就是一台Kafka服务器,一个物理节点。Topic (主题):在Kafka中消息以主题为单位进行归类,每个主题都有一个 Topic Name ,生产者根据Topic Name将消息发送到特定的Topic,消费者则同样根据Topic Name从对应的Topic进行消费。Partition (分区):Topic (主题)是消息归类的一个单位,但每一个主题还能再细分为一个或多个 Partition (分区),一个分区只能属于一个主题。主题和分区都是逻辑上的概念,举个例子,消息1和消息2都发送到主题1,它们可能进入同一个分区也可能进入不同的分区(所以同一个主题下的不同分区包含的消息是不同的),之后便会发送到分区对应的Broker节点上。Offset (偏移量):分区可以看作是一个只进不出的队列(Kafka只保证一个分区内的消息是有序的),消息会往这个队列的尾部追加,每个消息进入分区后都会有一个偏移量,标识该消息在该分区中的位置,消费者要消费该消息就是通过偏移量来识别。在Kafka 0.8版本以前,是没有多副本冗余机制的,一旦一个节点挂掉,那么这个节点上的所有 Partition的数据就无法再被消费。这就等于发送到Topic的有一部分数据丢失了。在0.8版本后引入副本记者则很好地解决宕机后数据丢失的问题。副本是以 Topic 中每个 Partition的数据为单位,每个Partition的数据会同步到其他物理节点上,形成多个副本。每个 Partition 的副本都包括一个 Leader 副本和多个 Follower副本,Leader由所有的副本共同选举得出,其他副本则都为Follower副本。在生产者写或者消费者读的时候,都只会与Leader打交道,在写入数据后Follower就会来拉取数据进行数据同步。多少个副本才算够用?副本肯定越多越能保证Kafka的高可用,但越多的副本意味着网络、磁盘资源的消耗更多,性能会有所下降,通常来说副本数为3即可保证高可用,极端情况下将 replication-factor 参数调大即可。Follower和Lead之间没有完全同步怎么办?Follower和Leader之间并不是完全同步,但也不是完全异步,而是采用一种 ISR机制( In-Sync Replica)。每个Leader会动态维护一个ISR列表,该列表里存储的是和Leader基本同步的Follower。如果有Follower由于网络、GC等原因而没有向Leader发起拉取数据请求,此时Follower相对于Leader是不同步的,则会被踢出ISR列表。所以说,ISR列表中的Follower都是跟得上Leader的副本。一个节点宕机后Leader的选举规则是什么?分布式相关的选举规则有很多,像Zookeeper的 Zab 、 Raft 、 Viewstamped Replication 、微软的 PacificA 等。而Kafka的Leader选举思路很简单,基于我们上述提到的 ISR列表,当宕机后会从所有副本中顺序查找,如果查找到的副本在ISR列表中,则当选为Leader。另外还要保证前任Leader已经是退位状态了,否则会出现脑裂情况(有两个Leader)。怎么保证?Kafka通过设置了一个controller来保证只有一个Leader。Ack参数决定了可靠程度另外,这里补充一个面试考Kafka高可用必备知识点:request.required.asks 参数。Asks这个参数是生产者客户端的重要配置,发送消息的时候就可设置这个参数。该参数有三个值可配置:0、1、All 。第一种是设为0意思是生产者把消息发送出去之后,之后这消息是死是活咱就不管了,有那么点发后即忘的意思,说出去的话就不负责了。不负责自然这消息就有可能丢失,那就把可用性也丢失了。第二种是设为1意思是生产者把消息发送出去之后,这消息只要顺利传达给了Leader,其他Follower有没有同步就无所谓了。存在一种情况,Leader刚收到了消息,Follower还没来得及同步Broker就宕机了,但生产者已经认为消息发送成功了,那么此时消息就丢失了。注意,设为1是Kafka的默认配置可见Kafka的默认配置也不是那么高可用,而是对高可用和高吞吐量做了权衡折中。第三种是设为All(或者-1)意思是生产者把消息发送出去之后,不仅Leader要接收到,ISR列表中的Follower也要同步到,生产者才会任务消息发送成功。进一步思考, Asks=All 就不会出现丢失消息的情况吗?答案是否。当ISR列表只剩Leader的情况下, Asks=All 相当于 Asks=1 ,这种情况下如果节点宕机了,还能保证数据不丢失吗?因此只有在 Asks=All并且有ISR中有两个副本的情况下才能保证数据不丢失。Kafka 的一个节点宕机后为什么不可用?我在开发测试环境配置的 Broker 节点数是3, Topic 是副本数为3, Partition 数为6, Asks参数为1。解决问题当三个节点中某个节点宕机后,集群首先会怎么做?没错,正如我们上面所说的,集群发现有Partition的Leader失效了,这个时候就要从ISR列表中重新选举Leader。如果ISR列表为空是不是就不可用了?并不会,而是从Partition存活的副本中选择一个作为Leader,不过这就有潜在的数据丢失的隐患了。所以,只要将Topic副本个数设置为和Broker个数一样,Kafka的多副本冗余设计是可以保证高可用的,不会出现一宕机就不可用的情况(不过需要注意的是Kafka有一个保护策略,当一半以上的节点不可用时Kafka就会停止)。那仔细一想,Kafka上是不是有副本个数为1的Topic?问题出在了 __consumer_offset 上, __consumer_offset 是一个Kafka自动创建的 Topic,用来存储消费者消费的 offset (偏移量)信息,默认 Partition数为50。而就是这个Topic,它的默认副本数为1。如果所有的 Partition 都存在于同一台机器上,那就是很明显的单点故障了!当将存储 __consumer_offset 的Partition的Broker给Kill后,会发现所有的消费者都停止消费了。这个问题怎么解决?第一点需要将 __consumer_offset 删除,注意这个Topic时Kafka内置的Topic,无法用命令删除,我是通过将 logs 删了来实现删除。第二点需要通过设置 offsets.topic.replication.factor 为3来将 __consumer_offset 的副本数改为3。通过将 __consumer_offset 也做副本冗余后来解决某个节点宕机后消费者的消费问题。参考:cid:link_0
  • [热门活动] 【华为云开年采购季】研发工具、Redis、漏洞扫描等新客1.98元起,下单抽FreeBuds耳机!
     #华为云开年采购季#15+云原生产品全场低至3折,新客1.98元起! -Redis低至6折,Kafka、RabbitMQ、RocketMQ等低至5.5折,3年5折! -数字资产链新客119元起,华为链新客19.8元起! -软件开发生产线CodeArts,新客试用1.98元起! -漏洞扫描、二进制成分分析、APP合规检测6.6折起! -Serverless应用引擎CAE,新客试用109元起! -ROMAConnect集成平台,打通新老应用,试用19.8元起! 下单抽FreeBuds无线耳机!更多产品优惠戳:http://t.cn/A6a7JhLQ 活动有效期至2023年3月31日,上云正当时!>点击这里,马上进入活动专场<
  • [技术干货] kafka设置通过外网ip访问的教程
    概述Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。1、kafka 安装安装JDKtar xvf jdk1.8.0_231.tar.gz  -C /usr/local && cd   /usr/local ln -sv jdk1.8.0_231 jdk vim /etc/profile.d/java.sh    JAVA_HOME=/usr/local/jdk    PATH=$JAVA_HOME/bin:$PATHzookeeper安装(或使用kafka自带的)vim /usr/local/kafka/zookeeper/conf/zoo.cfg     tickTime=2000   initLimit=10   syncLimit=5   dataDir=/data/zookeeper   clientPort=2181   maxClientCnxns=0   # 集群版的zookeeper添加如下配置   # server.1=ip1:2888:3888   # server.2=ip2:2888:3888   # server.3=ip3:28888:3888下载kafka和安装kakfawget https://archive.apache.org/dist/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz tar xvf kafka_2.11-0.10.2.1.tgz -C /usr/local && cd   /usr/local ln -sv kafka_2.11-0.10.2.1.tgz kafka修改kafka启动内存vim /usr/local/kafka/bin/kafka-server-start.sh  export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"kafka启动和停止/usr/local/kafka/bin/zookeeper-server-start.sh -deamon /usr/local/kafka/conf/zookeeper.properties /usr/local/kafka/bin/kafka-server-start.sh -deamon /usr/local/kafka/conf/server.properties /usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/conf/server.properties /usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/conf/zookeeper.properties单独安装kafka的启停方式/usr/local/zookeeper/bin/zkServer.sh stop|stop2、 kafka 设置外网访问(如有需要提供外网访问)前提条件需要一个解析到内网ip地址的域名,内网环境也可以设置/etc/hosts参数设置host.name=kafka.test.com(对应的域名解析需要解到内网ip)高版本已弃用。低版本0.10.2.1可以用, 仅当listeners属性未配置时被使用,已用listeners属性代替。表示broker的hostnameadvertised.listeners=PLAINTEXT://kafka.test.com:9092(高版本用,替代host.name,设置了advertised.listeners不用设置host.name)注册到zookeeper上并提供给客户端的监听器,如果没有配置则使用listeners。advertised.host.name(不需要设置,仅作参考)已弃用。仅当advertised.listeners或者listeners属性未配置时被使用。官网建议使用advertised.listenerslisteners(不需要设置,仅作参考)需要监听的URL和协议,如:PLAINTEXT://myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093。如果未指定该配置,则使用java.net.InetAddress.getCanonicalHostName()函数的的返回值修改上broker的/etc/hosts文件[内网ip]    kafka.test.com修改外网访问服务器上的/etc/hosts文件[外网ip]    kafka.test.com3、kafka消费调试生产者/usr/local/kafka/bin/kafka-console-producer.sh --broker-list IP:9092 --topic TOPIC消费者/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server IP:9092 --topic TOPIC--from-beginning --max-messages 1 /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 外网IP:9092 --topic TOPIC --from-beginning --max-messages 14、logstash调试output {  stdout { codec => rubydebug { metadata => true }  } }5、logstash无法消费kakfa日志的问题排查a、topics_pattern 通配问题".* ","."一定不能少      topics_pattern=>"prefix-.*" b、filter中匹配规则,注意要能匹配到kafka中topic,不同的filebeat和不同的logstash版本对应的topic元数据可能不太一样,这点需要注意      if [type] =~ "prefix-*" {          grok { match =>["[type]","^prefix-(?<index_name>)"] }       }      if [kafka][topic] =~ "prefix-*" {        grok { match => [ "[kafka][topic]", "^prefix-(?<index_name>.*$)" ]}      }       if [@metadata][topic] =~ "prefix-*" {          grok { match =>["[@metadata][topic]","^prefix-(?<index_name>)"] }       }      if [@metadata][kafka][topic] =~ "prefix-*" {        grok { match => [ "[@metadata][kafka][topic]", "^prefix-(?<index_name>.*$)" ]}      }原文链接:cid:link_0zixun/5909.html
  • [技术干货] 基于K-means(K均值)聚类算法的图像特征分割研究-含Matlab代码-转载
     一、引言 图像分割是一个跨学科的研究方向,涉及人工智能、机器学习、模式识别等。随着计算机技术的不断发展,图像分割的应用领域越来越广泛,特别是在农业、军事、遥感气象、医疗保健以及智能交通等领域有着重要的应用价值。目前,图像分割技术主要有基于阈值、基于边缘、基于聚类以及基于神经网络的方法。在诸多技术中,聚类法是最有效的方法之一,主要有 K-means 聚类、模糊 C-means 聚类、密度山峰聚类以及减法聚类等。 K-means 聚类方法语义明确、结构简单、计算速度快,是图像分割技术中最常用的聚类算法。随着研究的深入,国内外学者在基于 K-means 聚类法图像分割方面做了大量研究,并取得了丰富的研究成果。 KEEGAN 等人提出了一种基于多通道的图像分割方法,允许用户结合自己的信息通道,利用逻辑框架定义多目标函数来实现图像分割。 PHAM 等人针对图像分割中灰度不均匀的问题,提出了改进 K-means 算法中的目标函数来处理图像分割中的不均匀性,提高了图像分割的精度。 WALVOORT 等人选择均方最短距离作为目标函数,使用 K-means 聚类法使其最小化,结果表明在合理的计算范围内这种算法得到的效果最优。陈科尹等人提出基于统计直方图 K-means 聚类的水稻冠层图像分割方法,分别与 K-means、K-means++、 k-mc2、 afk-mc2 共 4 种常用的均值聚类水稻冠层图像特征像素提取方法进行对比,结果表明基于统计直方图K-means 聚类算法均优于以上 4 种聚类方法。王爱莲等人探讨 K-means 算法在图像分割时在 RGB 和 YUV 颜色空间的分割结果,结果表明使用 YUV 混合模型比单一YUV 颜色空间的分割效果更佳。郎成洪等人在医学领域利用 K-means++ 聚类算法进行区域分类,减少了错误的局部极小值。乔雪等人在马铃薯病虫害图像提取中采用 K-means 的图像分割方法,能够准确、完整地将目标病虫害色斑从彩色图像中提取出来,在农业病虫害治理方面具有较好的应用价值。  ✳️ 二、K-means 聚类算法原理 K-means 聚类算法是将原始数据集划分为 k 个不相交的样本数据组。首先,随机选取 k 个初始聚类中心,计算其他数据组到初始聚类中心的欧几里德距离。根据最邻近原则,将距离最小的数据组分配至与其距离最小的聚类中心对应的数据组。其次,分配完成后重新计算每个新的聚类中心。最后,计算每个聚类中心和每个数据组之间的新欧几里德距离,重复迭代直至到达确定的阈值后停止计算。利用基于 K-means聚类算法分割图像时,根据图像像素点的特征进行聚类。设一幅图像的分辨率为 x*y,该图像聚类为 k 个数据组,设 P(x,y)为输入像素, Ck为聚类中心,计算原理如下。  (1)合理选择 k 个初始聚类中心;  (2)根据式(1)计算图像中心和每个像素之间的欧氏距离 d;   (3)根据距离 d 将所有像素分配至最近的中心;  (4)在所有像素分配后,根据式(2)重新计算新的聚类中心;   (5)重复迭代计算,直至满足确定的阈值后停止计算;  (6)返回图像分割结果。  ✳️ 三、图像聚类分割实例 通过Matlab算法实现了基于K-means(K均值)聚类算法的图像分割,结果如下图所示:当使用3个聚类时,小猫能显著从背景中分割出来,但小猫内部白色毛发并未进行有效分割,当使用5~6个聚类时,小猫白色肚皮可进行有效分割。效果较为显著。   图1 基于K-means聚类图像分割结果 ✳️ 四、参考文献 [1] 高樱萍 , 宋丹 , 王雅静 , 等 . 一种改进的 K-means 聚类服装图像分割算法 [J]. 湖南工程学院学报 ( 自然科学版 ),2021,31(2):54-59. [2] 李立军 , 张晓光 . 基于动态粒子群优化与 K-means 聚类的图像分割算法 [J]. 现代电子技术 ,2018,41(10):164-168. [3]KEEGAN M S,SANDBERG B,CHAN T F.A multiphase logic framework for multichannel image segmentation[J].InverseProblems and Imaging,2012,6(1):95-110. [4]PHAM D L,PRINCE J L.An adaptive fuzzy c-means algorithm for image segmentation in the presence of intensity inhomogeneities[J].Pattern Recognition Letters,1999,20(1):57-68. [5]WALVOORT D,BRUS D J,GRUIJTER J.An R package for spatial coverage sampling and random sampling from compactgeographical strata by k-means[J].Computers & Geosciences, 2010,36(10):1261-1267.  ✳️ 五、Matlab代码获取 上述Matlab代码,可私信博主获取。 ———————————————— 版权声明:本文为CSDN博主「matlab科研中心」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/m0_70745318/article/details/127952210