• [数据编排] 虚拟kafka数据源如何配置
  • [维护宝典] 如何根据strace信息定位异常连接的客户端
    问题背景描述:某局点某Kafka节点文件句柄数很高,导致此节点上Kafka的请求失败,lsof -p kafkapid发现Kafka进程有大量socket泄漏,如下图所示:问题分析步骤:1. 执行ls -l /proc/kafkapid/fd >> fd.txt命令查看Kafka进程的文件描述符2. 执行trace -T -tt -f -F output.txt -p kafkapid命令打出kafka进程的调用信息3. 几分钟后再次执行执行ls -l /proc/kafkapid/fd >> fd1.txt命令查看Kafka进程的文件描述符4. 对比1和3步骤中的文件描述符,查找增加的socket连接,多查找几个增加socket连接,根据socket对应的id号从2步骤中获取的信息中查看对应的id连接调用信息5. 如果查找socket连接对应的客户端,可以通过accept查找,如下图所示此查询查询到对应的socketfd对应的所有的客户端连接,多次查找后可对比客户端连接来进一步确认客户端的范围。6. 通过accept查找的原因是socket通信中accept函数获取客户端的连接:int accept(int sockfd,struct sockaddr * addr,socklen_t * addrlen);sockfd的参数为listen()函数返回的监听套接字;addr是一个传出参数,表示客户端的地址,该参数设置为NULL时,表示不关心客户端的地址。addrlen为一个传入传出参数,传入时为函数调用时提供参数addr的长度,传出时为客户端地址结构体的实际长度;accept的返回值也是一个套接字,该套接字用于与本次通信的客户端进行数据交互。
  • [问题求助] 线下kafka如何对接mrs的kafka?
    大数据的项目。想把客户线下kafka的消息传递到mrs的kafka中。这个可以如何实现?
  • [维护宝典] Kafka使用 kafka-consumer-groups.sh --list查询不到消费者组
    问题描述:FusionInsight C80版本,通过Flink提交消费Kafka的任务,消费Kafka的消费者组信息通过kafka-consumer-groups.sh --list命令查询不到,但是通过kafka-consumer-groups.sh --describe可以查询到信息。问题分析:1、C80版本kafka-consumer-groups.sh命令使用list查询groupid源码分析如下:a. kafka-consumer-groups.sh命令的入口是执行ConsumerGroupCommand中的main函数b. ConsumerGroupCommand中的main函数执行时根据使用的是--zookeeper还是--bootstrap-server判断consumerGroupService是ZkConsumerGroupService还是KafkaConsumerGroupService。如果命令中有--list,就执行listGroups().foreach(println(_))c. KafkaConsumerGroupService中listGroups()的实现是通过adminClient获取所有的consumerGroup,源码如下:d. AdminClient中获取listAllConsumerGroupsFlattened的实现是获取所有的group组并过滤出所有协议是consumer的group组,源码如下:2. 提交一个flink作业,groupid设置为“testgroup”,使用Kafka中的AdminClient来获取listAllGroupsFlattened的信息,执行结果如下:如上图所示:执行结果中GroupOverview中testgroup对应的协议为空,而不是consumer,所以,使用list查询不到结果。3. Flink作业消费kafka时,GroupOverview中groupid对应的协议为空的原因是:flink不是直接调用的kafka consumer client端的消费接口,而是通过自己的逻辑去消费、只用kafka中的__consumer_offsets保存数据。4. 此问题在651版本已经解决,651版本中增加的协议为空的判断逻辑,源码如下所示:
  • [维护宝典] FusionInsight Kafka创建Topic失败原因及解决方案
    一、 无权限创建Topic会报错“ERROR org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /config/topics/topicName。解决办法是使用具体kafkaadmin权限的用户。二、 Quota限制创建Topic会报错“Nodes number exceed quota limit for /config/topics/topicName”。解决办法是:检查Kafka全部配置中Quota配置quota.number是否是250000,如果不是改为此值。同步zk配置。FusionInsight Manager页面“集群->Zookeeper->更多->同步配置”,同步配置操作每5分钟执行一次,最多等5分钟,zk客户端执行listquota /kafka查看quota的count值是否为250000。zk客户端执行操作方法如下:进入FI客户端(例如/opt/hadoopclient)执行命令source bigdata_env执行命令kinit 用户名(普通模式跳过)执行sh -server zk业务IP:24002三、 副本个数大于正常节点数创建Topic会报错“ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: X larger than available brokers: X”解决办法是:解决异常节点问题或减少副本数。四、 客户端版本低客户端是低版本而服务端是高版本时,创建Topic可能不会报错但是查看topic的describe信息时leader为none。原因是zk上报没有权限访问“/brokers/topics/topicName”目录,此目录下无法创建partitions目录。解决办法是:使用与服务端对应的客户端版本。
  • [问题求助] DMS Kafka能力上能匹配Confluent 5.2.2版本吗?
    华为云DMS Kafka能否在能力上匹配下Confluent
  • [二次开发] 【flink产品】【flink功能】flink跨集群访问开启Kerberos认证的kafka
    【功能模块】FI集群的flink 是开启的Kerberos认证 再flink-conf.yml中配置过的。现在需要用flink消费另外一个集群中开启Kerberos认证的kafka  拿到了kafka集群的的认证文件 krb5.conf jaas.conf user.keytab。【操作步骤&问题现象】1、整个flink应用是需要自己集群的认证 ,怎么在一个flink程序中认证另外一个kafka ,将其当作该Flink应用的source源,然后处理呢?拜托各位大佬提供一下处理思路!
  • [问题求助] DLI Flink作业如何动态感知Kafka分区的变化
    我使用Flink Opensource SQL, 采用Flink 1.10版本。初期Flink作业规划的Kafka的分区数partition设置过小或过大,后期需要更改Kafka区分数,很麻烦。如何不停止作业实现自动感知?
  • [其他] FI651 filebeat对接安全模式kafka
    再使用kafka的过程中,用到filebeat采集日志到kfka,非安全模式的kafka开源直接使用,但是安全模式的kafka,尝试使用类似logstash对接安全模式kafka,未能连通,请问filebeat对接安全的kafka需要如何配置?
  • [问题求助] 除了kafka,有没有什么服务能无缝迁移rocketmq业务
    除了kafka,有没有什么服务能无缝迁移rocketmq业务
  • [维护宝典] FusionInsight Kafka删除Topic失败原因及解决方案
    一、无权限创建Topic会报错“ERROR kafka.admin.AdminOperationException: Error while deleting topic topicName解决办法是使用具有kafkaadmin权限的用户。二、“topic.enable”配置为falseFusionInsight Manager页面“集群->Kafka->配置->全部配置”查看Kafka服务端delete.topic.enable参数配置。如果参数设置为false,改为true后保存并重启kafka服务。三、节点异常或节点磁盘下线或数据目录异常1. 节点异常FusionInsight Manager页面“集群->Kafka->实例”查看各个实例状态是否是良好。2. 磁盘下线FusionInsight Manager有没有“数据目录状态异常”的告警。Topic副本所在节点server.log日志中搜“offline”关键字和“checkpoint file”关键字查看磁盘是否下线或因checkpoint文件问题没有上线。搜“cannot allocate memory”关键字查看是否内存不足导致。搜“No space left”关键字查看是否磁盘写满。注意:出现“数据目录状态异常”的告警后,只有重启告警节点才可以使磁盘重新上线。3. 数据目录权限异常节点上Kafka数据目录(一般是“/srv/BigData/kafka/dataX/kafka-logs”)目录权限是否正常。四、删除后又自动创建TopicFusionInsight Manager页面“集群->Kafka”查看controller所在主机节点,此节点上Kafka日志目录中查看controller.log,如果日志中有“Deletion of topic topicName successfully completed”和“New topics: [Set(topicName)]”说明topic又被自动创建。五、Controller未执行删除如果controller.log日志中只有“Starting topic deletion”日志而未打印具体的执行成功或失败的日志,可尝试切controller,步骤如下:1. 进FI客户端(例如/opt/hadoopclient)2. 执行zkCli.sh -server zk业务IP:24002/kafka3. 执行get /controller4. 若获取到controller信息,执行deleteall /controller和get /controller。
  • [数据编排] datafactory: 虚拟kafka数据源如何配置
    【功能模块】数据源【操作步骤&问题现象】
  • [技术讨论] 鲲鹏-kafka-2.13.2-2.6.0移植
    环境要求虚拟机服务器 KVM Virtual Machine 虚拟机配置:aarch64架构 、 8G内存 、 4核 、50G磁盘空间 虚拟机操作系统: Linux version 4.19.90-2009.3.0.0045.up1.uel20.aarch64 (abuild@armbuild-02) (gcc version 7.3.0 (GCC)) #1 SMP Sun Oct 11 16:12:59 UTC 2020 软件版本:kafka-2.6.0 版本获取方式https://archive.apache.org/dist/kafka/2.6.0/kafka-2.6.0-src.tgz 参考编译指南和问题Kafka2.1.0编译指南 https://support.huaweicloud.com/prtg-apache-kunpengbds/kunpengbds_02_0008.html 在UOS上按照kafks2.10编译指南编译2.1.0可行,编译2.6.0不通过。 2.6.0编译执行如下:gradle  ./gradlew clean releaseTarGz 编译2.6.0问题如下: Could not find method scala()  Gradle需要6.0以上的版本才有该方法,配置中是6.5版本。5.6-6.0可以注释掉build.gradle  447行处的代码,gradle版本小于5.6编译不通过。   下载地址: https://downloads.gradle.org/distributions/gradle-6.5-bin.zip加入华为源后出现编译错误按照移植指南加入源后 编译时出现如下错误: 删除加的华为源,clean后重新构建./gradlew clean 编译完成后还有x86依赖    从华为源下载编译好的aarch64的jar包https://mirrors.huaweicloud.com/kunpeng/maven/org/rocksdb/rocksdbjni/5.18.4/rocksdbjni-5.18.4.jar 找到本地仓库中的rocksdbjni-5.18.4.jar 包并且替换 find / -name rocksdbjni-5.18.4.jar  重新编译就可以了。
  • [运维管理] HD651的flink如何收集应用运行日志到kafka
    【求助】 1.任务提交到集群是使用的配置是集群上的flink/conf/下的日志配置文件 2.如何在任务提交时指定自定义的配置文件?
  • [二次开发] canal采集binlog日志到kafka报 Failed to update metadata after 60000ms。
    【功能模块】【操作步骤&问题现象】1报错信息如下:2、【截图信息】【日志信息】(可选,上传日志内容或者附件)
总条数:181 到第
上滑加载中