-
【操作步骤&问题现象】kafka节点扩容后执行重分布,如果节点存在系统盘慢盘是否有影响?
-
【功能模块】ranger给kafka指定权限策略【操作步骤&问题现象】1、创建新用户,不添加角色和用户组,该用户可以访问kafka所有topic2、给用户添加角色或者配置用户组,以及添加相应的权限策略后,权限策略不生效【截图信息】问题一:创建一个新用户,正常应该是没有权限访问kafka的topic,但是却可以访问问题二:使用ranger添加禁止策略后还是策略不生效,还是可以访问kafka的topic【也是用rangeradmin用户添加过,依然不行,云服务器线下安装的FusionInsight 版本号8.0.2】添加的策略【日志信息】(可选,上传日志内容或者附件)无日志信息
-
我们使用kafka时,有时候会遇到发送数据失败的情况,其原因及解决方案如下:1. Kafka topic leader为-1Kafka客户端执行如下命令查看topic的leader信息:kafka-topics.sh --describe --zookeeper zk业务IP:24002/kafka如果leader为-1,需查看Replicas中的副本节点是否正常,查看命令如下:kafka-broker-info.sh --zookeeper zk业务IP:24002/kafka命令中可以查询到brokerid,说明节点kafka服务正常如果leader为-1的分区对应的Replicas中节点都不正常,需要先恢复异常节点kafka服务。如果都正常但ISR列表中无节点信息,或者ISR列表中的节点不正常,需要查看Kafka服务端的unclean.leader.election.enable参数是否为true或者topic端是否配置此参数为true。如果都不为true,需要对此topic修改配置为true。Kafka服务端的unclean.leader.election.enable参数配置查看方式如下: Kafka topic端unclean.leader.election.enable参数配置查看方式如下:kafka-topics.sh --describe --zookeeper zk业务IP:24002/kafka --topic topicName如果Config中无unclean.leader.election.enable信息,则与服务端配置一致。如果有,则此配置优先级高于服务端配置。修改topic端此配置的方式如下:kafka-topics.sh --alter --topic topicName --zookeeper zk业务IP:24002/kafka --config unclean.leader.election.enable=true2. DNS配置导致执行 vi /etc/resolv.conf,如果有“nameserver X.X.X.X”,把此内容注释掉3. 网络异常生产端节点ping服务端IP,ping -s 15000 IP,如果延迟高于5ms,说明网络延迟过高,也可通过长ping来判断是否有网络丢包。4. CPU或者IO过高也可能导致连接失败iostat -d -x 1查看CPU参数idle和IO参数util,idle值越高,CPU越空闲,util值越高,IO使用率越高。如果idle值小于20%,top -Hp kafkaPid查看CPU使用率高的线程,打jstack日志分析具体的原因;如果util值大于80%,查看磁盘对应的读写速率、await和svctm的大小判断对IO影响大的原因,如果读写速率比较大,排查kafka读写请求和延时,如果awati远大于svctm,IO队列太长,应用响应时间很慢。5. 磁盘坏道或者其他原因也可能导致连接失败如果server.log日志中有大量“java.io.IOException: Connection to IP:21007 (id: 2 rack: null) failed”日志,查看IP节点操作系统日志中是否有“Sense Key : Medium Error”信息,如果有,说明出现磁盘坏道,需修复或更换磁盘。另外,还需要打此节点的jstack信息,排查是否有阻塞或者死锁。如果是C80版本,且jstack信息中有如下信息,需打死锁补丁:6. 如果报“TimeoutException”或偶尔发送失败,可先调大request.timeout.ms,并查看服务端num.io.threads和num.network.threads是否可以优化(这两个参数一般调整为节点磁盘个数的倍数)。根据发送的数据量的大小也可适当调整batch.size、buffer.memory、linger.ms的大小。如果发送的数据量很大且可容忍一定时延,也可以考虑开启压缩,compression.type指定压缩方式,可配置为“gzip”、“snappy”或“lz4”。7. ssh卡住ssh -v -p 端口号 异常节点ip如果如上图所示卡住,解决方式是将GSSAPIAuthentication修改为no8. 如果是集群外客户端生产发送失败,还可以通过集群内客户端测试下生产是否成功,进一步减小排查方向。
-
在使用Kafka时,我们有时候会遇到生产时连接Kafka失败的情况,原因及解决办法如下:1. Kafka服务异常通过FusionInsight Manager页面,选择“集群->Kafka”,查看当前Kafka集群当前状态,状态是否是良好;如果状态不是良好,说明Kafka服务异常。2. 生产命令错误如果使用21005端口,命令如下:kafka-console-producer.sh --broker-list Kafka业务IP:21005 --topic topicName如果使用21007端口,命令如下:kafka-console-producer.sh --broker-list Kafka业务IP:21005 --topic topicName -- --producer.config ../config/producer.properties其中,producer.properties中的security.protocol必须是SASL_PLAINTEXT。如果使用21007端口,security.protocol必须是SASL_PLAINTEXT,Kafka服务端配置ssl.mode.enable必须是true。3. 无权限如果使用21005端口生产,查看“allow.everyone.if.no.acl.found”参数是否为“ture”,如果不是true,修改为true后保存重启。如果使用21007端口生产,如果连接提示中有“TOPIC_AUTHORIZATION_FAILED”或“Not authorized to access topic”信息,说明无消费的topic的权限。执行klist查看用户名,执行id -Gn 用户名,确定该用户名的属组,如果id -Gn命令执行失败,可能sssd服务异常,可使用“systemctl start sssd”重启进程。如果属组中无kafkaadmin或supergroup或kafka,说明用户没有配置kafka角色,需添加kafka角色信息;如果只有kafka,需要查看该用户是否有topic的生产权限,查看命令如下:kafka-acls.sh --authorizer-properties zookeeper.connect=zk业务IP:24002/kafka --list --topic topicName如果没有配置,需添加此用户权限信息或此用户加入kafkaadmin组。添加生产权限信息命令如下:kafka-acls.sh --authorizer-properties zookeeper.connect= zk业务IP:24002/kafka --topic topicName --producer --add --allow-principal User:用户名4. Kafka配置中version配置错误通过FusionInsight Manager页面,选择“集群->Kafka->配置->全部配置”中搜“version”,查看如下version配置: 651X版本对应的是1.1-IV0,C80对应版本是0.11.0-IV2,C70对应版本是0.10-IV1。如果配置不对,修改为对应配置后保存并重启Kafka所有实例。5. Kafka端口不可用执行netstat -anp|grep 端口号查看端口是否有监听或者连接,如果有,在生产的客户端执行ssh -v -p 21005 IP,查看连接是否成功,如果报“Connection timeout”说明客户端不可访问服务端的端口,服务端需对客户端开放访问端口权限。6. 网络异常检查网络是否正常方法如下:Ø 使用ping命令查看是否有网络丢包,ping -s 15000 IP,如果网络延迟大于5ms,说明网络延迟高。Ø 使用scp从客户端传输一个大文件到kafka服务侧节点,查看传输速率。7. 如果是集群外客户端连接失败,还可以通过集群内客户端测试下生产是否成功,进一步减小排查方向。
-
一、 收集Kafka实例和分区信息1. 执行kafka-broker-info.sh --zookeeper zk业务IP:24002/kafka 命令获取Kafka实例信息2. 执行kafka-topics.sh --describe --zookeeper zk业务IP:24002/kafka --unavailable-partitions 命令获取Kafka未同步分区3. 对比Replicas和ISR列表中的信息,确定哪些节点的分区未同步,确定未同步的节点正常与否。例如,如下案例中,对比Replicas和ISR列表中的信息发现3节点没有同步,而正常的kafka实例中没有3。 二、 准备迁移方案根据步骤一中的未完全同步的分区即需要迁移的分区,把需要迁移的所有分区写入一个新的json文件,文件中replicas中的信息需要根据步骤一中第2步查询结果中的Replicas把退服节点的brokerid改为正常节点的brokerid(即步骤一中第1步查询到的brokerid),其他不变,格式如下所示:{"version":1,"partitions":[{"topic":"test1","partition":0"replicas":[1,2]},{"topic":"test1","partition":1"replicas":[1,2]},{"topic":"test1","partition":5,"replicas":[2,1]},{"topic":"test1","partition":3,"replicas":[2,1]}]}如果涉及的分区比较多,也可按如下方式生成迁移的json文件。1. 根据步骤一中未完全同步的分区中的topic信息编写迁移topic的json文件,例如json文件名为generate.json,格式如下:{"topics":[{"topic": "test1"},{"topic": "test2"}], "version":1}2. 执行命令kafka-reassign-partitions.sh --zookeeper zk业务IP:24002/kafka --topics-to-move-json-file generate.json文件路径 --broker-list 1,2 --generate 生成迁移方案,其中,broker-list中对应参数是所有正常的broker实例的id,即步骤一中1获取的所有brokerid,不同brokerid间用逗号隔开。3. 把上一步得到的“Proposed partition reassignment configuration”的json文件写入迁移的json文件中,例如move.json,此json文件中只保留需要迁移的topic和partition的信息。如下图所示,红色方框内即迁移的json文件信息。三、 执行迁移执行kafka-reassign-partitions.sh --zookeeper zk业务IP:24002/kafka --reassignment-json-file move.json文件路径 --execute 执行迁移四、 查看迁移进度执行kafka-reassign-partitions.sh --zookeeper zk业务IP:24002/kafka --reassignment-json-file move.json文件路径 --verify 查看迁移进度,当所有partition都提示completed successfully时说明迁移完成。五、 查看是否所有分区都同步执行kafka-topics.sh --describe --zookeeper zk业务IP:24002/kafka --unavailable-partitions 命令获取Kafka未同步分区,如果结果为空,说明所有分区都同步,操作完成;如果有leader为-1,按步骤六执行。六、 Leader为-1的解决办法1. Kafka配置中查看unclean.leader.election.enable配置,如果配置为true,只执行下面步骤2即可;如果配置为false,执行下面步骤3、4、5。2. 切controller:进zookeeper客户端,执行 zkCli.sh -server zk业务IP:24002/kafka进入zk,执行deleteall /controller 切controller。3. Kafka客户端修改参数配置:kafka-topics.sh --alter --topic topicName --zookeeper zk业务IP:24002/kafka --config unclean.leader.election.enable=true,其中,topicName是leader为-1的topic。4. 按步骤2切controller。5. 查看leader为-1的分区leader是否正常,正常后在kafka客户端执行kafka-topics.sh --alter --topic topicName --zookeeper 100.112.22.233:24002/kafka --delete-config unclean.leader.election.enable。
-
配置好了,启动就报错Error: VM option 'UseG1GC' is experimental and must be enabled via -XX:+UnlockExperimentalVMOptions.Error: Could not create the Java Virtual Machine.Error: A fatal exception has occurred. Program will exit.求解???
-
【功能模块】bin/flink run --class com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin /opt/Flink_test/flink-examples-1.0.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:21005运行测试 例子 com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin运行不通过 【操作步骤&问题现象】1、2、【截图信息】【日志信息】(可选,上传日志内容或者附件)--sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei******************************************************************************************<topic> is the kafka topic name<bootstrap.servers> is the ip:port list of brokers******************************************************************************************java.lang.NoClassDefFoundError: org/apache/kafka/clients/producer/Callback at com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin.main(WriteIntoKafka4SQLJoin.java:53) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:430) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:814) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:288) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1051) at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1127) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1127)Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.producer.Callback at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 18 more
-
## 背景 在搭建Kafka集群时,通常会按照机架部署机器。Kafka在新建topic时,为了提高集群的可靠性, 会将同一分区的不同副本分布在不同的机架内,这样即使有一个机架上的机器宕掉,也不会影响服务的可用性和稳定性 如下图:集群中有三个机架,每个机架内部3个节点,在这个集群内创建了3分区3副本的topic  ## 生产/消费模型  - 生产模型 - 各个Producer客户端会连接对应分区的 **leader** 进行数据生产, 对应分区的follower会定期从leader处同步数据 - 消费模型 - 各个Consumer客户端连接对应分区的 **leader** 进行数据消费 ## 上述生产/消费模型弊端 - 生产和消费的客户端对接的全部都是**leader**分区,当客户端较多时,会给集群造成较大压力 - 同一个分区的follower副本,仅作为数据备份,不对外提供服务 - 当跨机架间消费客户端增多时,会产生大量的机架间网络传输,给网络造成压力 ## 就近消费特性  - 打破follower副本不能对外提供服务的禁锢,允许消费者从follower副本处拉取数据, 优点如下: - 从follower副本消费数据,可大大降低leader副本节点压力 - 就近从同一机架内节点内消费数据,可大大降低集群内机架间网络数据传输量 - 降低客户端消费延迟 ## 如何开启就近消费特性 - 客户端配置 - 启动服务时,replica.selector.class = org.apache.kafka.common.replica.RackAwareReplicaSelector - 客户端配置 - 启动Consumer客户端时,配置 client.rack = /xxxx/xxxx
-
操作场景Flume采集 kafka 内容导入到 hbase前提条件已创建启用Kerberos认证的流集群。已在日志生成节点安装Flume客户端,请参见安装Flume客户端。已配置网络,使日志生成节点与流集群互通。操作步骤(1) 从HDFS客户端拷贝配置文件core-site.xml,hdfs-site.xml到Flume Client的配置目录 " /opt/FlumeClient/fusioninsight-flume-1.6.0/conf" 下。通常可以在分析集群Master节点HDFS客户端安装目录如“/opt/client/HDFS/hadoop/etc/hadoop/”下找到core-site.xml,hdfs-site.xml文件。(2) 从MRS集群下载用户的认证凭据。①在MRS Manager,单击“系统设置”。 ②在“权限配置”区域,单击“用户管理”。 ③在用户列表中选择需要的用户,单击后面的“更多”下载用户凭据。 ④解压下载的用户凭据文件,获取krb5.conf和user.keytab文件。(3) 将上一步获得的krb5.conf和user.keytab拷贝到Flume Client所在节点的配置目录 " /opt/FlumeClient/fusioninsight-flume-1.X.X/conf" 下。(4)修改配置文件jaas.conf, ① 将/opt/Bigdata/MRS_x.x.x/1_x_Flume/etc/" 的jaas.conf文件拷贝到/opt/FlumeClient/fusioninsight-flume-1.X.XS/conf下② 修改jaas.conf文件中的参数principal 与keyTabPrincipal: 可以kinit 用户名进行认证查看参数keyTab: 定义的用户认证文件完整路径即步骤(1)中保存user.keytab认证文件的目录 (5) 修改配置文件flume-env.sh,文件所在目录 /opt/FlumeClient/fusioninsight-flume-1.6.0/conf。在 “-XX:+UseCMSCompactAtFullCollection”后面,增加以下内容:-Djava.security.krb5.conf=/opt/FlumeClient/fusioninsight-flume-1.6.0/conf /krb5.conf -Djava.security.auth.login.config=/opt/FlumeClient/fusioninsight-flume-1.6.0/conf /jaas.conf -Dzookeeper.request.timeout=120000krb5.conf与jaas.conf根据实际情况,修改配置文件路径,然后保存并退出配置文件。(6)假设Flume客户端安装路径为“/opt/FlumeClient”,执行以下命令,重启Flume客户端:cd /opt/FlumeClient/fusioninsight-flume-1.6.0/bin./flume-manage.sh restart(7)执行以下命令,修改Flume客户端配置文件“properties.properties”。vi Flume客户端安装目录/fusioninsight-flume-1.6.0/conf/properties.properties将以下内容保存到文件中:client.sources = obsclient.channels = flumeclient.sinks = hdfsclient.sources.obs.type = org.apache.flume.source.kafka.KafkaSourceclient.sources.obs.monTime = 0client.sources.obs.nodatatime = 0client.sources.obs.batchSize = 1client.sources.obs.batchDurationMillis = 1client.sources.obs.keepTopicInHeader = falseclient.sources.obs.keepPartitionInHeader = falseclient.sources.obs.kafka.bootstrap.servers =xxxxxxxx:21007client.sources.obs.kafka.consumer.group.id = consumer_groupclient.sources.obs.kafka.topics = topic01client.sources.obs.kafka.security.protocol = SASL_PLAINTEXTclient.sources.obs.kafka.kerberos.domain.name = hadoop.xxx.comclient.sources.obs.channels = flumeclient.channels.flume.type = memoryclient.channels.flume.capacity = 10000client.channels.flume.transactionCapacity = 1000client.channels.flume.channelfullcount = 10client.channels.flume.keep-alive = 3client.channels.flume.byteCapacity =client.channels.flume.byteCapacityBufferPercentage = 20client.sinks.hdfs.type = hdfsclient.sinks.hdfs.hdfs.path = hdfs://hacluster/tmp/ldpclient.sinks.hdfs.montime =client.sinks.hdfs.hdfs.filePrefix = over_%{basename}client.sinks.hdfs.hdfs.fileSuffix =client.sinks.hdfs.hdfs.inUsePrefix =client.sinks.hdfs.hdfs.inUseSuffix = .tmpclient.sinks.hdfs.hdfs.idleTimeout = 0client.sinks.hdfs.hdfs.inUseSuffix = .tmpclient.sinks.hdfs.hdfs.idleTimeout = 0client.sinks.hdfs.hdfs.batchSize = 1000client.sinks.hdfs.hdfs.codeC =client.sinks.hdfs.hdfs.fileType = DataStreamclient.sinks.hdfs.hdfs.maxOpenFiles = 5000client.sinks.hdfs.hdfs.writeFormat = Writableclient.sinks.hdfs.hdfs.callTimeout = 10000client.sinks.hdfs.hdfs.threadsPoolSize = 10client.sinks.hdfs.hdfs.rollTimerPoolSize = 1client.sinks.hdfs.hdfs.kerberosPrincipal = ldpclient.sinks.hdfs.hdfs.kerberosKeytab = /opt/FlumeClient/fusioninsight-flume-1.6.0/conf/user.keytabclient.sinks.hdfs.hdfs.round = falseclient.sinks.hdfs.hdfs.roundUnit = secondclient.sinks.hdfs.hdfs.useLocalTimeStamp = trueclient.sinks.hdfs.hdfs.failcount = 10client.sinks.hdfs.hdfs.fileCloseByEndEvent = falseclient.sinks.hdfs.hdfs.rollInterval = 30client.sinks.hdfs.hdfs.rollSize = 1024client.sinks.hdfs.hdfs.rollCount = 10client.sinks.hdfs.hdfs.batchCallTimeout = 0client.sinks.hdfs.serializer.appendNewline = trueclient.sinks.hdfs.channel = flume----------------------------------------------------------------client.sources.obs.kafka.kerberos.domain.name 配置可以从hosts文件中找到hadoop开头的
-
各位大佬好,有没有知道kafka如何释放,删除?我的删除功能是灰色的不能用
-
【功能模块】kafka【操作步骤&问题现象】参考apache kylin对接文档,做sample-streaming.sh用例的时候在kylin.log遇到报错【截图信息】【日志信息】(可选,上传日志内容或者附件)2020-12-16 15:23:18,010 ERROR [http-nio-7070-exec-5] controller.CubeController:434 : Timeout expired while fetching topic metadataorg.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata2020-12-16 15:23:18,014 ERROR [http-nio-7070-exec-5] controller.BasicController:63 :org.apache.kylin.rest.exception.InternalErrorException: Timeout expired while fetching topic metadata at org.apache.kylin.rest.controller.CubeController.buildInternal(CubeController.java:435) at org.apache.kylin.rest.controller.CubeController.rebuild2(CubeController.java:418)
-
Flink 1.10 读取安全模式kafka Demo ``` public class FromKafkaToFile { public static void main(String[] args) throws Exception { EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv,fsSettings); fsTableEnv.connect( new Kafka().version("0.10").topic("flinkTest") .property("bootstrap.servers","8.5.167.1:21007") .property("security.protocol","SASL_PLAINTEXT") .property("sasl.kerberos.service.name","kafka") .property("zookeeper.connect","8.5.167.1:24002/kafka") ).withFormat( new Csv() ).withSchema(new Schema() .field("id", DataTypes.INT()) .field("name", DataTypes.STRING()) ) .createTemporaryTable("kafkaTable"); Table table = fsTableEnv.from("kafkaTable"); TupleTypeInfo tupleTypeInfo = new TupleTypeInfo(BasicTypeInfo.INT_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO); fsTableEnv.toAppendStream(table, tupleTypeInfo).print(); fsTableEnv.execute("KafkaToPrint"); fsEnv.execute(); } } ``` 发送测试数据到topic  查看TaskManager输出 
-
## 一、测试环境说明 FusionInsight集群:版本8.0.2 开启安全模式 客户端系统类型:CentOS Linux release 7.6.1810 x86_64位 集群和客户端均位于无法连接互联网的内网环境 ## 二、配置步骤 **本章节所有步骤在客户端节点执行** ### 2.1 配置yum源 已有可用yum源的可忽略2.1章节 1. 上传系统安装镜像文件 CentOS-7.6-x86_64-DVD-1810.iso 到客户端节点,例如/opt目录 2. 将镜像文件挂载到/media目录 ``` mount -o loop /opt/CentOS-7.6-x86_64-DVD-1810.iso /media ``` 3. 配置/etc/yum.repos.d/目录下的repo文件,保证该目录下只有一个以.repo结尾的文件,其内容如下 ``` [base] name=file baseurl=file:///media enabled=1 gpgcheck=0 ``` 4. 配置完毕后检查下yum源,如图执行yum repolist命令  ### 2.2 安装python3 从python官网下载python3源码包,本例中下载的链接为 https://www.python.org/ftp/python/3.7.8/Python-3.7.8.tgz 将获取的Python-3.7.8.tgz上传到/opt目录下,安装命令如下 ``` yum -y install zlib-devel bzip2-devel openssl-devel openssl-static ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libcap-devel xz-devel libffi-devel lzma gcc cd /opt rm -rf /tmp/Python-3.7.8 tar -zxvf Python-3.7.8.tgz -C /tmp mv /tmp/Python-3.7.8 /usr/local/python-3.7 cd /usr/local/python-3.7/ ./configure --prefix=/usr/local/sbin/python-3.7 make && make install ln -sv /usr/local/sbin/python-3.7/bin/python3 /usr/bin/python3 ln -sv /usr/local/sbin/python-3.7/bin/pip3 /usr/bin/pip3 ``` ### 2.3 安装kafka-python **说明:安装该依赖之前切记不可执行 FusionInsight 客户端环境变量,即不能执行 source /opt/client/bigdata_env** 附件中的安装包都是从pypi官网下载,可直接使用,或者到pypi下载 ***<u>切记:如下依赖包的安装顺序必须是 six > decorator > gssapi > kafka-python</u>*** 1. 获取依赖包,pypi下载链接如下 six https://pypi.org/project/six/#files decorator https://pypi.org/project/decorator/#files gssapi https://pypi.org/project/gssapi/#files kafka-python https://pypi.org/project/kafka-python/#files 2. 安装依赖six ``` tar -zxvf six-1.15.0.tar.gz cd six-1.15.0 python3 setup.py install ``` 3. 安装decorator ``` tar -zxvf decorator-4.4.2.tar.gz cd decorator-4.4.2 python3 setup.py install ``` 4. 安装gssapi ``` tar -zxvf gssapi-1.6.11.tar.gz cd gssapi-1.6.11 python3 setup.py install ``` 5. 安装kafka-python ``` tar -zxvf kafka-python-2.0.2.tar.gz cd kafka-python-2.0.2 python3 setup.py install ``` ### 2.4 安装FusionInsight客户端 本步骤略,客户端安装目录自定义,例如/opt/client ## 三、验证 ### 3.1 执行kerberos认证 参考命令 ``` source /opt/client/bigdata_env kinit -k -t user.keytab developuser ``` ### 3.2 创建python测试脚本,内容参考如下 注意bootstrap_servers和sasl_kerberos_domain_name根据实际环境进行修改 ``` #!/usr/bin/env python3 # coding=utf-8 import time from kafka import KafkaProducer from kafka import KafkaConsumer def kafka_python_producer_main(): producer = KafkaProducer(bootstrap_servers='10.162.26.137:21007', security_protocol='SASL_PLAINTEXT', sasl_mechanism='GSSAPI', sasl_kerberos_service_name='kafka', sasl_kerberos_domain_name='hadoop.hadoop.com', sasl_plain_username='KafkaClient') producer.send('testTopic3', 'kafka python test'.encode('utf-8')) producer.flush() producer.close() print('producer done') def kafka_python_consumer_main(): consumer = KafkaConsumer('testTopic3', bootstrap_servers='10.162.26.137:21007', group_id='kafka-test-20191014', auto_offset_reset='earliest', security_protocol='SASL_PLAINTEXT', sasl_mechanism='GSSAPI', sasl_kerberos_service_name='kafka', sasl_kerberos_domain_name='hadoop.hadoop.com', sasl_plain_username='KafkaClient') for msg in consumer: print(msg.value) print(msg.partition) print('consumer done') if __name__ == '__main__': kafka_python_producer_main() time.sleep(1) kafka_python_consumer_main() ``` 3.3 执行测试脚本,查看执行结果  通过kafka自带脚本验证结果:  ## 更多kafka-python使用参考: https://kafka-python.readthedocs.io/en/master/usage.html
上滑加载中
推荐直播
-
HDC深度解读系列 - Serverless与MCP融合创新,构建AI应用全新智能中枢2025/08/20 周三 16:30-18:00
张昆鹏 HCDG北京核心组代表
HDC2025期间,华为云展示了Serverless与MCP融合创新的解决方案,本期访谈直播,由华为云开发者专家(HCDE)兼华为云开发者社区组织HCDG北京核心组代表张鹏先生主持,华为云PaaS服务产品部 Serverless总监Ewen为大家深度解读华为云Serverless与MCP如何融合构建AI应用全新智能中枢
回顾中 -
关于RISC-V生态发展的思考2025/09/02 周二 17:00-18:00
中国科学院计算技术研究所副所长包云岗教授
中科院包云岗老师将在本次直播中,探讨处理器生态的关键要素及其联系,分享过去几年推动RISC-V生态建设实践过程中的经验与教训。
回顾中 -
一键搞定华为云万级资源,3步轻松管理企业成本2025/09/09 周二 15:00-16:00
阿言 华为云交易产品经理
本直播重点介绍如何一键续费万级资源,3步轻松管理成本,帮助提升日常管理效率!
回顾中
热门标签