-
Kafka修改配置有两个命令:kafka-topics.sh和kafka-configs.sh。kafka-topics.sh主要是修改单个topic的配置,支持修改的配置参数见附录一;kafka-configs.sh支持修改topic、broker、user和client级别的配置,支持修改的配置参数见附录二。一、 kafka-topics.sh1. kafka-topics.sh命令增加/修改配置命令如下:kafka-topics.sh --alter --topic <topicName> --zookeeper <ZK业务IP:24002/kafka> --config <name=value>2. kafka-topics.sh命令查看配置命令如下:kafka-topics.sh --describe --topic <topicName> --zookeeper <ZK业务IP:24002/kafka>修改后的配置会出现在Configs中。3. kafka-topics.sh命令删除配置命令如下:kafka-topics.sh --alter --topic <topicName> --zookeeper <ZK业务IP:24002/kafka> --delete-config <name>配置删除后,修改的配置也会从Configs中删除。二、 kafka-configs.sh1. kafka-configs.sh命令增加/修改topic级别配置命令如下:kafka-configs.sh --alter --zookeeper <ZK业务IP:24002/kafka> --add-config <name=value> --entity-name <topicName > --entity-type topicsTopic级别配置的修改,查看是否成功,依然可以使用kafka-topics.sh --describe --topic <topicName> --zookeeper <ZK业务IP:24002/kafka>命令查看Configs中是否有修改的配置。2. kafka-configs.sh命令删除topic级别配置命令如下:kafka-configs.sh --alter --zookeeper <ZK业务IP:24002/kafka> --delete-config <name=value> --entity-name <topicName > --entity-type topicsTopic级别配置的删除,查看是否成功,依然可以使用kafka-topics.sh --describe --topic <topicName> --zookeeper <ZK业务IP:24002/kafka>命令查看Configs中是否还存在删除的配置。3. kafka-configs.sh命令修改broker级别配置命令如下:修改单个broker的配置命令如下:kafka-configs.sh --alter --bootstrap-server <kafka业务IP:port> --add-config <name=value> --entity-name <brokerID> --entity-type brokers --command-config <property file>,其中property file中security.protocol对应协议必须与使用的端口对应(使用21005端口可以不加command-config参数)。修改所有broker的配置命令如下:kafka-configs.sh --alter --bootstrap-server <kafka业务IP:port> --add-config <name=value> --entity-default --entity-type brokers --command-config <property file>,其中property file中security.protocol对应协议必须与使用的端口对应(使用21005端口可以不加command-config参数)。注意:当config中对应的name是broker端不存在的配置时命令也可以执行成功,但是查看配置时对应的配置为null。4. kafka-configs.sh命令查看broker级别配置命令如下:查看单个broker的修改配置命令如下:kafka-configs.sh -- describe --bootstrap-server <kafka业务IP:port> --entity-name <brokerID> --entity-type brokers --command-config <property file>,其中property file中security.protocol对应协议必须与使用的端口对应(使用21005端口可以不加command-config参数)。查看所有broker的修改配置命令如下:kafka-configs.sh -- describe --bootstrap-server <kafka业务IP:port> --entity-default --entity-type brokers --command-config <property file>,其中property file中security.protocol对应协议必须与使用的端口对应(使用21005端口可以不加command-config参数)。5. kafka-configs.sh命令删除broker级别配置命令如下:删除单个broker的配置命令如下:kafka-configs.sh --alter --bootstrap-server <kafka业务IP:port> --delete-config <name> --entity-name <brokerID> --entity-type brokers --command-config <property file>,其中property file中security.protocol对应协议必须与使用的端口对应(使用21005端口可以不加command-config参数)。删除所有broker的配置命令如下:kafka-configs.sh --alter --bootstrap-server <kafka业务IP:port> --delete-config <name> --entity-default --entity-type brokers --command-config <property file>,其中property file中security.protocol对应协议必须与使用的端口对应(使用21005端口可以不加command-config参数)。附录一:kafka-topics.sh支持的topic级别的配置修改如下:cleanup.policycompression.typedelete.retention.msfile.delete.delay.msflush.messagesflush.msfollower.replication.throttled.replicasindex.interval.bytesleader.replication.throttled.replicaslog.partition.strategymax.message.bytesmessage.format.versionmessage.timestamp.difference.max.msmessage.timestamp.typemin.cleanable.dirty.ratiomin.compaction.lag.msmin.insync.replicaspreallocateretention.bytesretention.mssegment.bytessegment.index.bytessegment.jitter.mssegment.msunclean.leader.election.enable附录二:kafka-configs.sh支持的topic级别的配置修改如下:cleanup.policycompression.typedelete.retention.msfile.delete.delay.msflush.messagesflush.msfollower.replication.throttled.replicasindex.interval.bytesleader.replication.throttled.replicaslog.partition.strategymax.message.bytesmessage.format.versionmessage.timestamp.difference.max.msmessage.timestamp.typemin.cleanable.dirty.ratiomin.compaction.lag.msmin.insync.replicaspreallocateretention.bytesretention.mssegment.bytessegment.index.bytessegment.jitter.mssegment.msunclean.leader.election.enablekafka-configs.sh支持的broker级别的配置修改如下:log.message.timestamp.typessl.client.authlog.retention.mssasl.kerberos.ticket.renew.window.factorlog.preallocatelog.index.size.max.bytesssl.truststore.typessl.keymanager.algorithmlog.cleaner.io.buffer.load.factorssl.key.passwordbackground.threadslog.retention.bytesssl.trustmanager.algorithmlog.segment.byteslog.cleaner.delete.retention.mslog.segment.delete.delay.msmin.insync.replicasssl.keystore.locationssl.cipher.suiteslog.roll.jitter.mslog.cleaner.backoff.mssasl.jaas.configprincipal.builder.classlog.flush.interval.mslog.cleaner.dedupe.buffer.sizelog.flush.interval.messagesadvertised.listenersnum.io.threadslistener.security.protocol.mapsasl.enabled.mechanismsssl.truststore.passwordlistenersmetric.reportersssl.protocolsasl.kerberos.ticket.renew.jitterssl.keystore.passwordsasl.mechanism.inter.broker.protocollog.cleanup.policysasl.kerberos.principal.to.local.rulessasl.kerberos.min.time.before.reloginnum.recovery.threads.per.data.dirlog.cleaner.io.max.bytes.per.secondlog.roll.msssl.endpoint.identification.algorithmunclean.leader.election.enablemessage.max.byteslog.cleaner.threadslog.cleaner.io.buffer.sizesasl.kerberos.service.namessl.providerfollower.replication.throttled.ratelog.index.interval.byteslog.cleaner.min.compaction.lag.mslog.message.timestamp.difference.max.msssl.enabled.protocolslog.cleaner.min.cleanable.ratioreplica.alter.log.dirs.io.max.bytes.per.secondssl.keystore.typessl.secure.random.implementationssl.truststore.locationsasl.kerberos.kinit.cmdleader.replication.throttled.ratenum.network.threadscompression.typenum.replica.fetcherskafka-configs.sh支持的user级别的配置修改如下:request_percentageproducer_byte_rateSCRAM-SHA-256SCRAM-SHA-512consumer_byte_ratekafka-configs.sh支持的client级别的配置修改如下:request_percentageproducer_byte_rateconsumer_byte_ratekafka-configs.sh支持的topics_limit级别的配置修改如下:producer_byte_rateconsumer_byte_rate说明1:附录一和附录二中的配置是651X版本支持的参数,651X之前版本可以具体执行kafka-topics.sh和kafka-config.sh命令查看--config和--add-config中的参数说明。说明2:使用kafka-config.sh修改num.io.threads、num.network.threads、num.replica.fetchers时,只能在原设置的基础上2倍以内扩大或者1/2倍以内缩小,否则会执行报错。例如,FI中num.io.threads默认为8,直接修改为24时报错如下:若把num.io.threads改为24,执行命令两次,先把值改为16,再改为24。
-
【功能模块】【操作步骤&问题现象】1、2、【截图信息】【日志信息】(可选,上传日志内容或者附件)报错信息如下: The program finished with the following exception:org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 9e3c78adac04823062e5328231935728) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:700) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:219) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:932) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1005) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1737) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1005)Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 9e3c78adac04823062e5328231935728) at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1651) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1633) at com.huawei.bigdata.flink.examples.ReadFromKafka.main(ReadFromKafka.java:59) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) ... 11 moreCaused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 9e3c78adac04823062e5328231935728) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110) ... 19 moreCaused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
beautiful idea
发表于2021-03-24 15:09:01
2021-03-24 15:09:01
最后回复
beautiful idea
2021-03-29 18:09:21
11600 7 -
一、 节点故障或磁盘下线1. Kafka节点服务异常通过FusionInsight Manager页面,选择“集群->Kafka”,查看当前Kafka集群当前状态,状态是否是良好;如果状态不是良好,说明Kafka服务异常。2. 磁盘下线通过FusionInsight Manager页面查看告警信息,是否有“Kafka数据目录状态异常”的告警,如果有,server.log日志分析告警产生原因,重启数据目录异常节点。二、 网络异常检查网络是否正常方法如下:使用ping -s 15000 IP命令查看是否有网络丢包,如果网络延迟大于5ms,说明网络延迟高。使用scp从客户端传输一个大文件到kafka服务侧节点,查看传输速率。一般,未完全同步的分区总数监控曲线一直在变化或者通过kafka-topics.sh --zookeeper zk业务IP:24002/kafka --describe --under-replicated-partitions命令查询处理的未完全同步的分区一直在变化,网络原因的可能性比较大。三、 副本线程下线1. 查看未完全同步分区所在节点执行kafka-topics.sh --zookeeper zk业务IP:24002/kafka --describe --under-replicated-partitions命令查看所有未完全同步的分区,对比Replicas和ISR列表中的节点,判断未同步的分区是否集中在一个或者几个节点上。例如,下图中ISR列表中都是3未同步。 2. 查看节点信息执行kafka-broker-info.sh --zookeeper zk业务IP:24002/kafka查看未同步分区所在节点。3. Kafka配置页面查看“num.replica.ers”的配置大小。4. 切omm用户打jstack信息根据第二步获取的节点IP,连上此节点,su - omm切omm用户,jps |grep Kafka获取Kafka进程的pid,执行“jstack -l kafkapid >>文件名”,然后cat jstack文件|grep ReplicaFetcherThread-查看同步线程。注意:一个副本同步线程的格式如:ReplicaFetcherThread-tid-bid,tid代表一个线程编号,如果配置多个(由服务端配置参数num.replica.ers决定),这个线程号会从0开始依次增加;bid表示leader所在节点的broker-id,如果两个broker之间存在主副本关系,比如:leader在broker-5上,follower在broker-6上,那么broker-6至少会有一个线程名为ReplicaFetcherThread-0-5;如果num.replica.ers配置为2,broker-6 可能会存在两个副本同步线程ReplicaFetcherThread-0-5,ReplicaFetcherThread-1-5。一个节点理论上能够出现的副本同步线程个数为num.replica.ers *(集群broker实例个数 - 1)。但并不是num.replica.ers配置多少就一定会出现这么多个副本同步线程,线程的数量会根据当前broker节点的分区总数来决定,如果节点上面的副本数很少,kafka会判定分区少而没有必要启动这么多副本同步线程,则副本同步线程个数可能少于num.replica.ers *(broker实例个数 - 1)。通常建议num.replica.ers值配置为3即可,太大会影响CPU的使用率)。5. 查看分区同步线程是否下线。如果未查询到同步线程,或者num.replica.ers配置为1且同步线程个数小于(broker实例个数 - 1)的值,或者没有查找此节点到未同步的leader节点的线程,则有同步线程下线。具体可根据未完全同步分区开始上升的时间去server.log日志中排查原因,可搜索“Stopped”、“Shutdown”等关键字查找“ReplicaFetcherThread-”开头的线程是否异常下线。6. 副本线程下线相关BUG1) java.nio.BufferOverflowException2) unexpected offset in append to3) Attempt to append to a full index4) Trying to roll a new log segment for partition以上BUG均在6518及之后版本解决。四、 节点异常退服节点退服前未将退服节点上的分区迁移至未退服节点,具体查看方式是执行kafka-topics.sh --zookeeper zk业务IP:24002/kafka --describe --under-replicated-partitions命令查看所有未完全同步的分区,对比Replicas和ISR列表中的节点,判断未同步的分区是否集中在一个或者几个节点上,如果是,执行kafka-broker-info.sh --zookeeper zk业务IP:24002/kafka查看节点是否在此列表中,如果没有,确认之前是否退服过节点,如果是,可以确定是退服节点导致。解决办法是迁移退服节点上的分区到正常节点。五、 Kafka节点压力大iostat -d -x 1查看CPU参数idle和IO参数util,idle值越高,CPU越空闲,util值越高,IO使用率越高。如果idle值小于20%,top -Hp kafkaPid查看CPU使用率高的线程,打jstack日志分析具体的原因;如果util值大于80%,查看磁盘对应的读写速率、await和svctm的大小判断对IO影响大的原因,如果读写速率比较大,排查kafka读写请求和延时,如果awati远大于svctm,IO队列太长,应用响应时间很慢。Kafka的数据流量监控中在未同步分区出现时有没有陡增,字节输入输出流量是否达到网络阈值。
-
我们使用kafka时,有时候会遇到发送数据失败的情况,其原因及解决方案如下:1. Kafka topic leader为-1Kafka客户端执行如下命令查看topic的leader信息:kafka-topics.sh --describe --zookeeper zk业务IP:24002/kafka如果leader为-1,需查看Replicas中的副本节点是否正常,查看命令如下:kafka-broker-info.sh --zookeeper zk业务IP:24002/kafka命令中可以查询到brokerid,说明节点kafka服务正常如果leader为-1的分区对应的Replicas中节点都不正常,需要先恢复异常节点kafka服务。如果都正常但ISR列表中无节点信息,或者ISR列表中的节点不正常,需要查看Kafka服务端的unclean.leader.election.enable参数是否为true或者topic端是否配置此参数为true。如果都不为true,需要对此topic修改配置为true。Kafka服务端的unclean.leader.election.enable参数配置查看方式如下: Kafka topic端unclean.leader.election.enable参数配置查看方式如下:kafka-topics.sh --describe --zookeeper zk业务IP:24002/kafka --topic topicName如果Config中无unclean.leader.election.enable信息,则与服务端配置一致。如果有,则此配置优先级高于服务端配置。修改topic端此配置的方式如下:kafka-topics.sh --alter --topic topicName --zookeeper zk业务IP:24002/kafka --config unclean.leader.election.enable=true2. DNS配置导致执行 vi /etc/resolv.conf,如果有“nameserver X.X.X.X”,把此内容注释掉3. 网络异常生产端节点ping服务端IP,ping -s 15000 IP,如果延迟高于5ms,说明网络延迟过高,也可通过长ping来判断是否有网络丢包。4. CPU或者IO过高也可能导致连接失败iostat -d -x 1查看CPU参数idle和IO参数util,idle值越高,CPU越空闲,util值越高,IO使用率越高。如果idle值小于20%,top -Hp kafkaPid查看CPU使用率高的线程,打jstack日志分析具体的原因;如果util值大于80%,查看磁盘对应的读写速率、await和svctm的大小判断对IO影响大的原因,如果读写速率比较大,排查kafka读写请求和延时,如果awati远大于svctm,IO队列太长,应用响应时间很慢。5. 磁盘坏道或者其他原因也可能导致连接失败如果server.log日志中有大量“java.io.IOException: Connection to IP:21007 (id: 2 rack: null) failed”日志,查看IP节点操作系统日志中是否有“Sense Key : Medium Error”信息,如果有,说明出现磁盘坏道,需修复或更换磁盘。另外,还需要打此节点的jstack信息,排查是否有阻塞或者死锁。如果是C80版本,且jstack信息中有如下信息,需打死锁补丁:6. 如果报“TimeoutException”或偶尔发送失败,可先调大request.timeout.ms,并查看服务端num.io.threads和num.network.threads是否可以优化(这两个参数一般调整为节点磁盘个数的倍数)。根据发送的数据量的大小也可适当调整batch.size、buffer.memory、linger.ms的大小。如果发送的数据量很大且可容忍一定时延,也可以考虑开启压缩,compression.type指定压缩方式,可配置为“gzip”、“snappy”或“lz4”。7. ssh卡住ssh -v -p 端口号 异常节点ip如果如上图所示卡住,解决方式是将GSSAPIAuthentication修改为no8. 如果是集群外客户端生产发送失败,还可以通过集群内客户端测试下生产是否成功,进一步减小排查方向。
-
【操作步骤&问题现象】kafka节点扩容后执行重分布,如果节点存在系统盘慢盘是否有影响?
-
【功能模块】ranger给kafka指定权限策略【操作步骤&问题现象】1、创建新用户,不添加角色和用户组,该用户可以访问kafka所有topic2、给用户添加角色或者配置用户组,以及添加相应的权限策略后,权限策略不生效【截图信息】问题一:创建一个新用户,正常应该是没有权限访问kafka的topic,但是却可以访问问题二:使用ranger添加禁止策略后还是策略不生效,还是可以访问kafka的topic【也是用rangeradmin用户添加过,依然不行,云服务器线下安装的FusionInsight 版本号8.0.2】添加的策略【日志信息】(可选,上传日志内容或者附件)无日志信息
-
我们使用kafka时,有时候会遇到发送数据失败的情况,其原因及解决方案如下:1. Kafka topic leader为-1Kafka客户端执行如下命令查看topic的leader信息:kafka-topics.sh --describe --zookeeper zk业务IP:24002/kafka如果leader为-1,需查看Replicas中的副本节点是否正常,查看命令如下:kafka-broker-info.sh --zookeeper zk业务IP:24002/kafka命令中可以查询到brokerid,说明节点kafka服务正常如果leader为-1的分区对应的Replicas中节点都不正常,需要先恢复异常节点kafka服务。如果都正常但ISR列表中无节点信息,或者ISR列表中的节点不正常,需要查看Kafka服务端的unclean.leader.election.enable参数是否为true或者topic端是否配置此参数为true。如果都不为true,需要对此topic修改配置为true。Kafka服务端的unclean.leader.election.enable参数配置查看方式如下: Kafka topic端unclean.leader.election.enable参数配置查看方式如下:kafka-topics.sh --describe --zookeeper zk业务IP:24002/kafka --topic topicName如果Config中无unclean.leader.election.enable信息,则与服务端配置一致。如果有,则此配置优先级高于服务端配置。修改topic端此配置的方式如下:kafka-topics.sh --alter --topic topicName --zookeeper zk业务IP:24002/kafka --config unclean.leader.election.enable=true2. DNS配置导致执行 vi /etc/resolv.conf,如果有“nameserver X.X.X.X”,把此内容注释掉3. 网络异常生产端节点ping服务端IP,ping -s 15000 IP,如果延迟高于5ms,说明网络延迟过高,也可通过长ping来判断是否有网络丢包。4. CPU或者IO过高也可能导致连接失败iostat -d -x 1查看CPU参数idle和IO参数util,idle值越高,CPU越空闲,util值越高,IO使用率越高。如果idle值小于20%,top -Hp kafkaPid查看CPU使用率高的线程,打jstack日志分析具体的原因;如果util值大于80%,查看磁盘对应的读写速率、await和svctm的大小判断对IO影响大的原因,如果读写速率比较大,排查kafka读写请求和延时,如果awati远大于svctm,IO队列太长,应用响应时间很慢。5. 磁盘坏道或者其他原因也可能导致连接失败如果server.log日志中有大量“java.io.IOException: Connection to IP:21007 (id: 2 rack: null) failed”日志,查看IP节点操作系统日志中是否有“Sense Key : Medium Error”信息,如果有,说明出现磁盘坏道,需修复或更换磁盘。另外,还需要打此节点的jstack信息,排查是否有阻塞或者死锁。如果是C80版本,且jstack信息中有如下信息,需打死锁补丁:6. 如果报“TimeoutException”或偶尔发送失败,可先调大request.timeout.ms,并查看服务端num.io.threads和num.network.threads是否可以优化(这两个参数一般调整为节点磁盘个数的倍数)。根据发送的数据量的大小也可适当调整batch.size、buffer.memory、linger.ms的大小。如果发送的数据量很大且可容忍一定时延,也可以考虑开启压缩,compression.type指定压缩方式,可配置为“gzip”、“snappy”或“lz4”。7. ssh卡住ssh -v -p 端口号 异常节点ip如果如上图所示卡住,解决方式是将GSSAPIAuthentication修改为no8. 如果是集群外客户端生产发送失败,还可以通过集群内客户端测试下生产是否成功,进一步减小排查方向。
-
在使用Kafka时,我们有时候会遇到生产时连接Kafka失败的情况,原因及解决办法如下:1. Kafka服务异常通过FusionInsight Manager页面,选择“集群->Kafka”,查看当前Kafka集群当前状态,状态是否是良好;如果状态不是良好,说明Kafka服务异常。2. 生产命令错误如果使用21005端口,命令如下:kafka-console-producer.sh --broker-list Kafka业务IP:21005 --topic topicName如果使用21007端口,命令如下:kafka-console-producer.sh --broker-list Kafka业务IP:21005 --topic topicName -- --producer.config ../config/producer.properties其中,producer.properties中的security.protocol必须是SASL_PLAINTEXT。如果使用21007端口,security.protocol必须是SASL_PLAINTEXT,Kafka服务端配置ssl.mode.enable必须是true。3. 无权限如果使用21005端口生产,查看“allow.everyone.if.no.acl.found”参数是否为“ture”,如果不是true,修改为true后保存重启。如果使用21007端口生产,如果连接提示中有“TOPIC_AUTHORIZATION_FAILED”或“Not authorized to access topic”信息,说明无消费的topic的权限。执行klist查看用户名,执行id -Gn 用户名,确定该用户名的属组,如果id -Gn命令执行失败,可能sssd服务异常,可使用“systemctl start sssd”重启进程。如果属组中无kafkaadmin或supergroup或kafka,说明用户没有配置kafka角色,需添加kafka角色信息;如果只有kafka,需要查看该用户是否有topic的生产权限,查看命令如下:kafka-acls.sh --authorizer-properties zookeeper.connect=zk业务IP:24002/kafka --list --topic topicName如果没有配置,需添加此用户权限信息或此用户加入kafkaadmin组。添加生产权限信息命令如下:kafka-acls.sh --authorizer-properties zookeeper.connect= zk业务IP:24002/kafka --topic topicName --producer --add --allow-principal User:用户名4. Kafka配置中version配置错误通过FusionInsight Manager页面,选择“集群->Kafka->配置->全部配置”中搜“version”,查看如下version配置: 651X版本对应的是1.1-IV0,C80对应版本是0.11.0-IV2,C70对应版本是0.10-IV1。如果配置不对,修改为对应配置后保存并重启Kafka所有实例。5. Kafka端口不可用执行netstat -anp|grep 端口号查看端口是否有监听或者连接,如果有,在生产的客户端执行ssh -v -p 21005 IP,查看连接是否成功,如果报“Connection timeout”说明客户端不可访问服务端的端口,服务端需对客户端开放访问端口权限。6. 网络异常检查网络是否正常方法如下:Ø 使用ping命令查看是否有网络丢包,ping -s 15000 IP,如果网络延迟大于5ms,说明网络延迟高。Ø 使用scp从客户端传输一个大文件到kafka服务侧节点,查看传输速率。7. 如果是集群外客户端连接失败,还可以通过集群内客户端测试下生产是否成功,进一步减小排查方向。
-
一、 收集Kafka实例和分区信息1. 执行kafka-broker-info.sh --zookeeper zk业务IP:24002/kafka 命令获取Kafka实例信息2. 执行kafka-topics.sh --describe --zookeeper zk业务IP:24002/kafka --unavailable-partitions 命令获取Kafka未同步分区3. 对比Replicas和ISR列表中的信息,确定哪些节点的分区未同步,确定未同步的节点正常与否。例如,如下案例中,对比Replicas和ISR列表中的信息发现3节点没有同步,而正常的kafka实例中没有3。 二、 准备迁移方案根据步骤一中的未完全同步的分区即需要迁移的分区,把需要迁移的所有分区写入一个新的json文件,文件中replicas中的信息需要根据步骤一中第2步查询结果中的Replicas把退服节点的brokerid改为正常节点的brokerid(即步骤一中第1步查询到的brokerid),其他不变,格式如下所示:{"version":1,"partitions":[{"topic":"test1","partition":0"replicas":[1,2]},{"topic":"test1","partition":1"replicas":[1,2]},{"topic":"test1","partition":5,"replicas":[2,1]},{"topic":"test1","partition":3,"replicas":[2,1]}]}如果涉及的分区比较多,也可按如下方式生成迁移的json文件。1. 根据步骤一中未完全同步的分区中的topic信息编写迁移topic的json文件,例如json文件名为generate.json,格式如下:{"topics":[{"topic": "test1"},{"topic": "test2"}], "version":1}2. 执行命令kafka-reassign-partitions.sh --zookeeper zk业务IP:24002/kafka --topics-to-move-json-file generate.json文件路径 --broker-list 1,2 --generate 生成迁移方案,其中,broker-list中对应参数是所有正常的broker实例的id,即步骤一中1获取的所有brokerid,不同brokerid间用逗号隔开。3. 把上一步得到的“Proposed partition reassignment configuration”的json文件写入迁移的json文件中,例如move.json,此json文件中只保留需要迁移的topic和partition的信息。如下图所示,红色方框内即迁移的json文件信息。三、 执行迁移执行kafka-reassign-partitions.sh --zookeeper zk业务IP:24002/kafka --reassignment-json-file move.json文件路径 --execute 执行迁移四、 查看迁移进度执行kafka-reassign-partitions.sh --zookeeper zk业务IP:24002/kafka --reassignment-json-file move.json文件路径 --verify 查看迁移进度,当所有partition都提示completed successfully时说明迁移完成。五、 查看是否所有分区都同步执行kafka-topics.sh --describe --zookeeper zk业务IP:24002/kafka --unavailable-partitions 命令获取Kafka未同步分区,如果结果为空,说明所有分区都同步,操作完成;如果有leader为-1,按步骤六执行。六、 Leader为-1的解决办法1. Kafka配置中查看unclean.leader.election.enable配置,如果配置为true,只执行下面步骤2即可;如果配置为false,执行下面步骤3、4、5。2. 切controller:进zookeeper客户端,执行 zkCli.sh -server zk业务IP:24002/kafka进入zk,执行deleteall /controller 切controller。3. Kafka客户端修改参数配置:kafka-topics.sh --alter --topic topicName --zookeeper zk业务IP:24002/kafka --config unclean.leader.election.enable=true,其中,topicName是leader为-1的topic。4. 按步骤2切controller。5. 查看leader为-1的分区leader是否正常,正常后在kafka客户端执行kafka-topics.sh --alter --topic topicName --zookeeper 100.112.22.233:24002/kafka --delete-config unclean.leader.election.enable。
-
配置好了,启动就报错Error: VM option 'UseG1GC' is experimental and must be enabled via -XX:+UnlockExperimentalVMOptions.Error: Could not create the Java Virtual Machine.Error: A fatal exception has occurred. Program will exit.求解???
-
【功能模块】bin/flink run --class com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin /opt/Flink_test/flink-examples-1.0.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:21005运行测试 例子 com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin运行不通过 【操作步骤&问题现象】1、2、【截图信息】【日志信息】(可选,上传日志内容或者附件)--sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei******************************************************************************************<topic> is the kafka topic name<bootstrap.servers> is the ip:port list of brokers******************************************************************************************java.lang.NoClassDefFoundError: org/apache/kafka/clients/producer/Callback at com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin.main(WriteIntoKafka4SQLJoin.java:53) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:430) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:814) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:288) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1051) at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1127) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1127)Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.producer.Callback at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 18 more
-
## 背景 在搭建Kafka集群时,通常会按照机架部署机器。Kafka在新建topic时,为了提高集群的可靠性, 会将同一分区的不同副本分布在不同的机架内,这样即使有一个机架上的机器宕掉,也不会影响服务的可用性和稳定性 如下图:集群中有三个机架,每个机架内部3个节点,在这个集群内创建了3分区3副本的topic  ## 生产/消费模型  - 生产模型 - 各个Producer客户端会连接对应分区的 **leader** 进行数据生产, 对应分区的follower会定期从leader处同步数据 - 消费模型 - 各个Consumer客户端连接对应分区的 **leader** 进行数据消费 ## 上述生产/消费模型弊端 - 生产和消费的客户端对接的全部都是**leader**分区,当客户端较多时,会给集群造成较大压力 - 同一个分区的follower副本,仅作为数据备份,不对外提供服务 - 当跨机架间消费客户端增多时,会产生大量的机架间网络传输,给网络造成压力 ## 就近消费特性  - 打破follower副本不能对外提供服务的禁锢,允许消费者从follower副本处拉取数据, 优点如下: - 从follower副本消费数据,可大大降低leader副本节点压力 - 就近从同一机架内节点内消费数据,可大大降低集群内机架间网络数据传输量 - 降低客户端消费延迟 ## 如何开启就近消费特性 - 客户端配置 - 启动服务时,replica.selector.class = org.apache.kafka.common.replica.RackAwareReplicaSelector - 客户端配置 - 启动Consumer客户端时,配置 client.rack = /xxxx/xxxx
推荐直播
-
HDC深度解读系列 - Serverless与MCP融合创新,构建AI应用全新智能中枢2025/08/20 周三 16:30-18:00
张昆鹏 HCDG北京核心组代表
HDC2025期间,华为云展示了Serverless与MCP融合创新的解决方案,本期访谈直播,由华为云开发者专家(HCDE)兼华为云开发者社区组织HCDG北京核心组代表张鹏先生主持,华为云PaaS服务产品部 Serverless总监Ewen为大家深度解读华为云Serverless与MCP如何融合构建AI应用全新智能中枢
回顾中 -
关于RISC-V生态发展的思考2025/09/02 周二 17:00-18:00
中国科学院计算技术研究所副所长包云岗教授
中科院包云岗老师将在本次直播中,探讨处理器生态的关键要素及其联系,分享过去几年推动RISC-V生态建设实践过程中的经验与教训。
回顾中 -
一键搞定华为云万级资源,3步轻松管理企业成本2025/09/09 周二 15:00-16:00
阿言 华为云交易产品经理
本直播重点介绍如何一键续费万级资源,3步轻松管理成本,帮助提升日常管理效率!
回顾中
热门标签