• [openEuler] 基于openEuler-20.03-LTS-SP1安装部署Kafka 2.2.0【单机版】
    一、操作系统和软件版本介绍1.操作系统为openEuler 20.03 (LTS-SP1)   可用如下命令查询:   cat /etc/os-release 2.软件版本   Kafka版本为:Kafka 2.2.0 3.JDK依赖   Kafka 是用Scala 语言开发的,运行在JVM上,因此在安装Kafka 之前需要先安装JDK。openEuler 20.03 (LTS-SP1) 默认没有安装JDK环境,需要自己自行安装。   这里安装JDK环境做简单说明,请开发者自行安装,我这里安装的是 openjdk version "1.8.0_242"。   yum install java    输入 y   查看JDK版本   java -version二、详细安装步骤1.在根目录创建data目录   mkdir data2.进入data目录并下载Kafka 2.2.0  cd data   wget https://archive.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz3.解压并进入到解压后的目录tar -zvxf kafka_2.12-2.2.0.tgz   cd kafka_2.12-2.2.04.启动zookeeper服务和kafka服务,可以在命令的结尾加个&符号,这样服务就可以在后台运行   ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &   ./bin/kafka-server-start.sh ./config/server.properties &     三、验证1.创建一个叫"euler"的topic,它只有一个分区,一个副本:   ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic euler2.发送消息,运行producer并在控制台中输一些消息,这些消息将被发送到服务端   ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic euler3.在另一个终端开启consumer,可以读取到刚才发出的消息并输出   ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic euler 至此所有操作已完成,请指正。
  • [维护宝典] 如何根据strace信息定位异常连接的客户端
    问题背景描述:某局点某Kafka节点文件句柄数很高,导致此节点上Kafka的请求失败,lsof -p kafkapid发现Kafka进程有大量socket泄漏,如下图所示:问题分析步骤:1. 执行ls -l /proc/kafkapid/fd >> fd.txt命令查看Kafka进程的文件描述符2. 执行trace -T -tt -f -F output.txt -p kafkapid命令打出kafka进程的调用信息3. 几分钟后再次执行执行ls -l /proc/kafkapid/fd >> fd1.txt命令查看Kafka进程的文件描述符4. 对比1和3步骤中的文件描述符,查找增加的socket连接,多查找几个增加socket连接,根据socket对应的id号从2步骤中获取的信息中查看对应的id连接调用信息5. 如果查找socket连接对应的客户端,可以通过accept查找,如下图所示此查询查询到对应的socketfd对应的所有的客户端连接,多次查找后可对比客户端连接来进一步确认客户端的范围。6. 通过accept查找的原因是socket通信中accept函数获取客户端的连接:int accept(int sockfd,struct sockaddr * addr,socklen_t * addrlen);sockfd的参数为listen()函数返回的监听套接字;addr是一个传出参数,表示客户端的地址,该参数设置为NULL时,表示不关心客户端的地址。addrlen为一个传入传出参数,传入时为函数调用时提供参数addr的长度,传出时为客户端地址结构体的实际长度;accept的返回值也是一个套接字,该套接字用于与本次通信的客户端进行数据交互。
  • [问题求助] 线下kafka如何对接mrs的kafka?
    大数据的项目。想把客户线下kafka的消息传递到mrs的kafka中。这个可以如何实现?
  • [维护宝典] Kafka使用 kafka-consumer-groups.sh --list查询不到消费者组
    问题描述:FusionInsight C80版本,通过Flink提交消费Kafka的任务,消费Kafka的消费者组信息通过kafka-consumer-groups.sh --list命令查询不到,但是通过kafka-consumer-groups.sh --describe可以查询到信息。问题分析:1、C80版本kafka-consumer-groups.sh命令使用list查询groupid源码分析如下:a. kafka-consumer-groups.sh命令的入口是执行ConsumerGroupCommand中的main函数b. ConsumerGroupCommand中的main函数执行时根据使用的是--zookeeper还是--bootstrap-server判断consumerGroupService是ZkConsumerGroupService还是KafkaConsumerGroupService。如果命令中有--list,就执行listGroups().foreach(println(_))c. KafkaConsumerGroupService中listGroups()的实现是通过adminClient获取所有的consumerGroup,源码如下:d. AdminClient中获取listAllConsumerGroupsFlattened的实现是获取所有的group组并过滤出所有协议是consumer的group组,源码如下:2. 提交一个flink作业,groupid设置为“testgroup”,使用Kafka中的AdminClient来获取listAllGroupsFlattened的信息,执行结果如下:如上图所示:执行结果中GroupOverview中testgroup对应的协议为空,而不是consumer,所以,使用list查询不到结果。3. Flink作业消费kafka时,GroupOverview中groupid对应的协议为空的原因是:flink不是直接调用的kafka consumer client端的消费接口,而是通过自己的逻辑去消费、只用kafka中的__consumer_offsets保存数据。4. 此问题在651版本已经解决,651版本中增加的协议为空的判断逻辑,源码如下所示:
  • [维护宝典] FusionInsight Kafka创建Topic失败原因及解决方案
    一、 无权限创建Topic会报错“ERROR org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /config/topics/topicName。解决办法是使用具体kafkaadmin权限的用户。二、 Quota限制创建Topic会报错“Nodes number exceed quota limit for /config/topics/topicName”。解决办法是:检查Kafka全部配置中Quota配置quota.number是否是250000,如果不是改为此值。同步zk配置。FusionInsight Manager页面“集群->Zookeeper->更多->同步配置”,同步配置操作每5分钟执行一次,最多等5分钟,zk客户端执行listquota /kafka查看quota的count值是否为250000。zk客户端执行操作方法如下:进入FI客户端(例如/opt/hadoopclient)执行命令source bigdata_env执行命令kinit 用户名(普通模式跳过)执行sh -server zk业务IP:24002三、 副本个数大于正常节点数创建Topic会报错“ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: X larger than available brokers: X”解决办法是:解决异常节点问题或减少副本数。四、 客户端版本低客户端是低版本而服务端是高版本时,创建Topic可能不会报错但是查看topic的describe信息时leader为none。原因是zk上报没有权限访问“/brokers/topics/topicName”目录,此目录下无法创建partitions目录。解决办法是:使用与服务端对应的客户端版本。
  • [问题求助] DMS Kafka能力上能匹配Confluent 5.2.2版本吗?
    华为云DMS Kafka能否在能力上匹配下Confluent
  • [二次开发] 【flink产品】【flink功能】flink跨集群访问开启Kerberos认证的kafka
    【功能模块】FI集群的flink 是开启的Kerberos认证 再flink-conf.yml中配置过的。现在需要用flink消费另外一个集群中开启Kerberos认证的kafka  拿到了kafka集群的的认证文件 krb5.conf jaas.conf user.keytab。【操作步骤&问题现象】1、整个flink应用是需要自己集群的认证 ,怎么在一个flink程序中认证另外一个kafka ,将其当作该Flink应用的source源,然后处理呢?拜托各位大佬提供一下处理思路!
  • [问题求助] DLI Flink作业如何动态感知Kafka分区的变化
    我使用Flink Opensource SQL, 采用Flink 1.10版本。初期Flink作业规划的Kafka的分区数partition设置过小或过大,后期需要更改Kafka区分数,很麻烦。如何不停止作业实现自动感知?
  • [其他] FI651 filebeat对接安全模式kafka
    再使用kafka的过程中,用到filebeat采集日志到kfka,非安全模式的kafka开源直接使用,但是安全模式的kafka,尝试使用类似logstash对接安全模式kafka,未能连通,请问filebeat对接安全的kafka需要如何配置?
  • [问题求助] 除了kafka,有没有什么服务能无缝迁移rocketmq业务
    除了kafka,有没有什么服务能无缝迁移rocketmq业务
  • [维护宝典] FusionInsight Kafka删除Topic失败原因及解决方案
    一、无权限创建Topic会报错“ERROR kafka.admin.AdminOperationException: Error while deleting topic topicName解决办法是使用具有kafkaadmin权限的用户。二、“topic.enable”配置为falseFusionInsight Manager页面“集群->Kafka->配置->全部配置”查看Kafka服务端delete.topic.enable参数配置。如果参数设置为false,改为true后保存并重启kafka服务。三、节点异常或节点磁盘下线或数据目录异常1. 节点异常FusionInsight Manager页面“集群->Kafka->实例”查看各个实例状态是否是良好。2. 磁盘下线FusionInsight Manager有没有“数据目录状态异常”的告警。Topic副本所在节点server.log日志中搜“offline”关键字和“checkpoint file”关键字查看磁盘是否下线或因checkpoint文件问题没有上线。搜“cannot allocate memory”关键字查看是否内存不足导致。搜“No space left”关键字查看是否磁盘写满。注意:出现“数据目录状态异常”的告警后,只有重启告警节点才可以使磁盘重新上线。3. 数据目录权限异常节点上Kafka数据目录(一般是“/srv/BigData/kafka/dataX/kafka-logs”)目录权限是否正常。四、删除后又自动创建TopicFusionInsight Manager页面“集群->Kafka”查看controller所在主机节点,此节点上Kafka日志目录中查看controller.log,如果日志中有“Deletion of topic topicName successfully completed”和“New topics: [Set(topicName)]”说明topic又被自动创建。五、Controller未执行删除如果controller.log日志中只有“Starting topic deletion”日志而未打印具体的执行成功或失败的日志,可尝试切controller,步骤如下:1. 进FI客户端(例如/opt/hadoopclient)2. 执行zkCli.sh -server zk业务IP:24002/kafka3. 执行get /controller4. 若获取到controller信息,执行deleteall /controller和get /controller。
  • [运维管理] HD651的flink如何收集应用运行日志到kafka
    【求助】 1.任务提交到集群是使用的配置是集群上的flink/conf/下的日志配置文件 2.如何在任务提交时指定自定义的配置文件?
  • [二次开发] canal采集binlog日志到kafka报 Failed to update metadata after 60000ms。
    【功能模块】【操作步骤&问题现象】1报错信息如下:2、【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [二次开发] 使用开源canal采集binlog日志到kafka,说是server not found in kerberos databas
    【功能模块】【操作步骤&问题现象】1、具体报错信息如下:2、【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • 华为云kafka全景实践
    登录ECS控制台登录ECS控制台,找到已购买的云服务器并登录点击远程登录,(用户名:root)输入密码(密码不显示,但可正常输入),即可登录ECS下载Kafka,并上传至ECS执行命令下载Kafka:wget https://repo.huaweicloud.com/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz解压:tar zxvf kafka_2.12-1.1.0.tgzroot@ecs-kc1-large-2-linux-20200906181836:~# wget https://repo.huaweicloud.com/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz--2020-09-15 10:18:47--  https://repo.huaweicloud.com/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgzResolving repo.huaweicloud.com (repo.huaweicloud.com)... 61.147.106.222, 59.56.28.88, 125.77.150.15, ...Connecting to repo.huaweicloud.com (repo.huaweicloud.com)|61.147.106.222|:443... connected.HTTP request sent, awaiting response... 200 OKLength: 50326212 (48M) [application/octet-stream]Saving to: ‘kafka_2.12-1.1.0.tgz’kafka_2.12-1.1.0.tgz                            100%[====================================================================================================>]  47.99M  2.80MB/s    in 17s     2020-09-15 10:19:04 (2.81 MB/s) - ‘kafka_2.12-1.1.0.tgz’ saved [50326212/50326212]root@ecs-kc1-large-2-linux-20200906181836:~# tar zxvf kafka_2.12-1.1.0.tgzkafka_2.12-1.1.0/...kafka_2.12-1.1.0/libs/kafka-streams-examples-1.1.0.jarroot@ecs-kc1-large-2-linux-20200906181836:~#在Kafka实例详情页,获取Kafka broker地址创建topic登录ECS执行生产脚本脚本目录是:/pathtokafka/kafka_2.12-1.1.0/bin/kafka-console-producer.sh执行命令:bin/kafka-console-producer.sh --broker-list 192.168.0.186:9092 --topic topic-1628361972root@ecs-kc1-large-2-linux-20200906181836:~# lltotal 49196drwx------  7 root root     4096 Sep 15 10:22 ./drwxr-xr-x 25 root root     4096 Sep 15 10:15 ../-rw-------  1 root root        0 Dec  2  2019 .bash_history-rw-r--r--  1 root root     3106 Apr  9  2018 .bashrcdrwx------  2 root root     4096 Nov 26  2019 .cache/drwx------  3 root root     4096 Nov 26  2019 .gnupg/drwxr-xr-x  6 root root     4096 Mar 24  2018 kafka_2.12-1.1.0/-rw-r--r--  1 root root 50326212 Mar 28  2018 kafka_2.12-1.1.0.tgzdrwxr-xr-x  2 root root     4096 Dec  2  2019 .oracle_jre_usage/-rw-r--r--  1 root root      148 Aug 17  2015 .profiledrwx------  2 root root     4096 Sep 15 10:15 .ssh/-rw-------  1 root root     4864 Dec  2  2019 .viminfo-rw-------  1 root root       82 Sep 15 10:16 .Xauthorityroot@ecs-kc1-large-2-linux-20200906181836:~# cd kafka_2.12-1.1.0/root@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0# bin/kafka-console-producer.sh --broker-list 192.168.0.186:9092 --topic topic-1628361972>>hengji>2020.9.15>10.37>shanghai ^H^H>exit>–broker-list:kafka broker地址,多个已逗号隔开–topic:指定生产消息的topic(在3中创建输入任意字符后回车,即生产消息登录ECS执行消费脚本脚本目录是:/pathtokafka/kafka_2.12-1.1.0/bin/kafka-console-consumer.sh执行命令: bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.186:9092 --topic topic-1628361972 --group test --consumer-property enable.auto.commit=true --from-beginningroot@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0# bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.186:9092 --topic topic-1628361972 --group test --consumer-property enable.auto.commit=true --from-beginning hengjishanghai 2020.9.15exit10.37–bootstrap-server:kafka broker地址,多个已逗号隔开–consumer-property enable.auto.commit=true,设置消费自动提交消费进度–topic:指定消费消息的topic(在3中创建)–group:指定消费组名称,可自定义在生产窗口输入消息,在消费窗口会显示消息生产代码demo为了方便操作,包已经打包好了:地址:链接: https://download.csdn.net/download/weixin_43572042/12845314.百度网盘: https://crazyhengji.blog.csdn.net/article/details/108636026.package dms.kafka.demo;import java.util.Properties;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;public class KafkaProducerDemo{        public static void main(String[] args) throws InterruptedException, ExecutionException        {                if (args.length != 2)                {                        throw new IllegalArgumentException("usage: dms.kafka.demo.KafkaProducerDemo bootstrap-servers topic-name.");                    }                Properties props = new Properties();                props.put("bootstrap.servers", args[0]);                props.put("acks", "all");                props.put("retries", 0);                props.put("batch.size", 16384);                props.put("linger.ms", 1);                props.put("buffer.memory", 33554432);                props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");                props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");                Producer<String, String> producer = new KafkaProducer<>(props);                for (int i = 0; i < 100; i++)                {                        Future<RecordMetadata> result =                                    producer.send(new ProducerRecord<String, String>(args[1], Integer.toString(i), Integer.toString(i)));                        RecordMetadata rm = result.get();            System.out.println("topic: " + rm.topic() + ", partition: " + rm.partition() + ", offset: " + rm.offset());                }                producer.close();        }}将生产代码打包,上传至Kafka libs目录root@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0# cd libs/root@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0/libs# pwd/root/kafka_2.12-1.1.0/libsroot@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0/libs# lltotal 48976drwxr-xr-x 2 root root     4096 Sep 15 10:48 ./drwxr-xr-x 6 root root     4096 Mar 24  2018 ../-rw-r--r-- 1 root root    14768 Aug 14  2017 aopalliance-repackaged-2.5.0-b32.jar-rw-r--r-- 1 root root    90347 Aug 14  2017 argparse4j-0.7.0.jar-rw-r--r-- 1 root root   479881 Aug 14  2017 commons-lang3-3.5.jar-rw-r--r-- 1 root root    89496 Mar 24  2018 connect-api-1.1.0.jar-rw-r--r-- 1 root root    19542 Mar 24  2018 connect-file-1.1.0.jar-rw-r--r-- 1 root root    44928 Mar 24  2018 connect-json-1.1.0.jar-rw-r--r-- 1 root root   408225 Mar 24  2018 connect-runtime-1.1.0.jar-rw-r--r-- 1 root root    88980 Mar 24  2018 connect-transforms-1.1.0.jar-rw-r--r-- 1 root root     3225 Sep 15 10:48 dms.kafka.demo.jar......执行生产命令:java -cp .:./libs/* dms.kafka.demo.KafkaProducerDemo 192.168.0.186:9092 topic-1628361972root@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0# java -cp .:./libs/* dms.kafka.demo.KafkaProducerDemo 192.168.0.186:9092 topic-1628361972topic: topic-1628361972, partition: 2, offset: 2topic: topic-1628361972, partition: 0, offset: 2...topic: topic-1628361972, partition: 0, offset: 45topic: topic-1628361972, partition: 0, offset: 46消费代码demo为了方便操作,包已经打包好了:地址:链接: https://download.csdn.net/download/weixin_43572042/12845314.百度网盘: https://crazyhengji.blog.csdn.net/article/details/108636026.package dms.kafka.demo;import java.util.Arrays; import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;public class KafkaConsumerDemo {        public static void main(String[] args)        {                if (args.length != 3)                {                        throw new IllegalArgumentException("usage: dms.kafka.demo.KafkaProducerDemo bootstrap-servers topic-name groupname.");                }                Properties props = new Properties();                props.put("bootstrap.servers", args[0]);                props.put("group.id", args[2]);                props.put("enable.auto.commit", "true");               props.put("auto.offset.reset", "earliest");            props.put("auto.commit.interval.ms", "1000");           props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");           KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);            consumer.subscribe(Arrays.asList(args[1]));              while (true)                {                       ConsumerRecords<String, String> records = consumer.poll(200);               for (ConsumerRecord<String, String> record : records)                          System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());            }       }}将消费代码打包,上传至Kafka libs目录root@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0# cd libs/root@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0/libs# pwd/root/kafka_2.12-1.1.0/libsroot@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0/libs# lltotal 48976drwxr-xr-x 2 root root     4096 Sep 15 10:48 ./drwxr-xr-x 6 root root     4096 Mar 24  2018 ../-rw-r--r-- 1 root root    14768 Aug 14  2017 aopalliance-repackaged-2.5.0-b32.jar-rw-r--r-- 1 root root    90347 Aug 14  2017 argparse4j-0.7.0.jar-rw-r--r-- 1 root root   479881 Aug 14  2017 commons-lang3-3.5.jar-rw-r--r-- 1 root root    89496 Mar 24  2018 connect-api-1.1.0.jar-rw-r--r-- 1 root root    19542 Mar 24  2018 connect-file-1.1.0.jar-rw-r--r-- 1 root root    44928 Mar 24  2018 connect-json-1.1.0.jar-rw-r--r-- 1 root root   408225 Mar 24  2018 connect-runtime-1.1.0.jar-rw-r--r-- 1 root root    88980 Mar 24  2018 connect-transforms-1.1.0.jar-rw-r--r-- 1 root root     3225 Sep 15 10:48 dms.kafka.demo.jar......执行消费命令:java -cp .:./libs/* dms.kafka.demo.KafkaConsumerDemo 192.168.0.186:9092 topic-1628361972 testroot@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0# java -cp .:./libs/* dms.kafka.demo.KafkaConsumerDemo 192.168.0.186:9092 topic-1628361972 testoffset = 2, key = 1, value = 1offset = 3, key = 5, value = 5...offset = 33, key = 94, value = 94offset = 34, key = 95, value = 95登录ECS执行消费组信息查询脚本目录是:/pathtokafka/kafka_2.12-1.1.0/bin/ kafka-consumer-groups.sh(1)查询消费组列表bin/kafka-consumer-groups.sh --bootstrap-server 192.168.0.186:9092 -listroot@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0# bin/kafka-consumer-groups.sh --bootstrap-server 192.168.0.186:9092 -listNote: This will not show information about old Zookeeper-based consumers.test(2)查询消费组详情bin/kafka-consumer-groups.sh --bootstrap-server 192.168.0.186:9092 --describe --group testroot@ecs-kc1-large-2-linux-20200906181836:~/kafka_2.12-1.1.0# bin/kafka-consumer-groups.sh --bootstrap-server 192.168.0.186:9092 --describe --group testNote: This will not show information about old Zookeeper-based consumers.Consumer group 'test' has no active members.TOPIC            PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-IDtopic-1628361972 0          47              47              0               -               -               -topic-1628361972 2          24              24              0               -               -               -topic-1628361972 1          35              35              0               -               -               -TOPIC: 当前消费组消费的topicPARTITION:消费的分区CURRENT-OFFSET:分区的消费进度LOG-END-OFFSET:最新的消息offsetLAG:可消费消息数,为0代表该分区的消息都已消费完
总条数:158 到第
上滑加载中