• [问题求助] 【Kafka产品】Apache Kylin跑sample-streaming.sh报错求助
    【功能模块】kafka【操作步骤&问题现象】参考apache kylin对接文档,做sample-streaming.sh用例的时候在kylin.log遇到报错【截图信息】【日志信息】(可选,上传日志内容或者附件)2020-12-16 15:23:18,010 ERROR [http-nio-7070-exec-5] controller.CubeController:434 : Timeout expired while fetching topic metadataorg.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata2020-12-16 15:23:18,014 ERROR [http-nio-7070-exec-5] controller.BasicController:63 :org.apache.kylin.rest.exception.InternalErrorException: Timeout expired while fetching topic metadata        at org.apache.kylin.rest.controller.CubeController.buildInternal(CubeController.java:435)        at org.apache.kylin.rest.controller.CubeController.rebuild2(CubeController.java:418)
  • [赋能学习] Flink 1.10通过TableAPI读取kafka案例
    Flink 1.10 读取安全模式kafka Demo ``` public class FromKafkaToFile { public static void main(String[] args) throws Exception { EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv,fsSettings); fsTableEnv.connect( new Kafka().version("0.10").topic("flinkTest") .property("bootstrap.servers","8.5.167.1:21007") .property("security.protocol","SASL_PLAINTEXT") .property("sasl.kerberos.service.name","kafka") .property("zookeeper.connect","8.5.167.1:24002/kafka") ).withFormat( new Csv() ).withSchema(new Schema() .field("id", DataTypes.INT()) .field("name", DataTypes.STRING()) ) .createTemporaryTable("kafkaTable"); Table table = fsTableEnv.from("kafkaTable"); TupleTypeInfo tupleTypeInfo = new TupleTypeInfo(BasicTypeInfo.INT_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO); fsTableEnv.toAppendStream(table, tupleTypeInfo).print(); fsTableEnv.execute("KafkaToPrint"); fsEnv.execute(); } } ``` 发送测试数据到topic ![image.png](https://bbs-img-cbc-cn.obs.cn-north-1.myhuaweicloud.com/data/attachment/forum/202012/15/19260991dovd6bwgrzvkbg.png) 查看TaskManager输出 ![image.png](https://bbs-img-cbc-cn.obs.cn-north-1.myhuaweicloud.com/data/attachment/forum/202012/15/192717kibnavlinkbdcc4t.png)
  • [技术干货] kafka+zookeeper单机部署及验证
    软件介绍:ZooKeeper是一个分布式协调服务,它的主要作用是为分布式系统提供一致性服务,提供的功能包括:配置维护、命名服务、分布式同步、组服务等。Kafka的运行依赖ZooKeeper。安装环境:本文主要写下单机形式的kafka+ zookeeper部署和使用。使用1台华为云服务器,配置信息如下:型号:4vCPUs | 16GB | kc1.xlarge.4操作系统:CentOS 8.0 64bit with ARM安装步骤:1 安装openjdkjava -version 查看当前环境是否安装,如果环境已安装就跳过该步骤 如果没有安装,则依次执行命令:安装:yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel修改配置文件vim /etc/profile,在文档末尾添加:JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk   --路径按照实际情况填写PATH=$JAVA_HOME/bin:$PATHexport JAVA_HOME PATH,后保存退出使生效source /etc/profile2 安装mavenmvn -v 查看当前环境是否安装,如果环境已安装就跳过该步骤 如果没有安装,则依次执行命令:获取:wget https://archive.apache.org/dist/maven/maven-3/3.5.4/binaries/apache-maven-3.5.4-bin.tar.gz解压:tar -zxf apache-maven-3.5.4-bin.tar.gz修改配置文件vim /etc/profile,在文档末尾添加:MAVEN_HOME=/home/apache-maven-3.5.4  --路径按照实际情况填写PATH=$MAVEN_HOME/bin:$JAVA_HOME/bin:$PATHexport MAVEN_HOME JAVA_HOME PATH,后保存退出使生效source /etc/profile修改maven配置,修改Maven配置文件中的:本地仓路径、远程仓等配置文件路径在解压的conf目录下,我的是: /home/apache-maven-3.5.4/conf/settings.xml<mirror> <id>huaweimaven</id> <name>huawei maven</name> <url>https://mirrors.huaweicloud.com/repository/maven/</url> <mirrorOf>central</mirrorOf></mirror>3 安装gradle下载安装包:wget https://downloads.gradle.org/distributions/gradle-4.10-bin.zip解压:unzip gradle-4.10-bin.zip修改配置文件vim /etc/profile,在文档末尾添加:export GRADLE_HOME=/home/gradle-4.10   --路径按照实际情况填写export PATH=$GRADLE_HOME/bin:$PATH,后保存退出使生效source /etc/profile4 安装zookeeper①下载安装包:wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz本次使用的zookeeper3.4.12版本②解压并进入目录tar zxvf zookeeper-3.4.12.tar.gz        cd zookeeper-3.4.12③修改配置文件cp conf/zoo_sample.cfg conf/zoo.cfgvim conf/zoo.cfg,进行修改dataDir=/tmp/zookeeper/datadataLogDir=/tmp/zookeeper/log④修改配置文件vim /etc/profile,在文档末尾添加:export ZOOKEEPER_INSTALL=/home/zookeeper-3.4.12/   --路径按照实际情况填写export PATH=$PATH:$ZOOKEEPER_INSTALL/bin,后保存退出使生效source /etc/profile⑤启动验证zookeeper:    启动:sh zkServer.sh start    查看状态:sh zkServer.sh status    停止:sh  zkServer.sh stop5 源码编译kafka①下载安装包,并将安装包上传至服务器自定义目录。本次使用的kafka2.1.0版本下载地址为:https://archive.apache.org/dist/kafka/2.1.0/kafka-2.1.0-src.tgz②解压并进入目录tar -zxvf kafka-2.1.0-src.tgz         cd kafka-2.1.0-src③修改build文件vim build.gradle在repositories标签下内容添加多个maven仓库maven {     url "https://mirrors.huaweicloud.com/kunpeng/maven"   }   maven {     url "https://mirrors.huaweicloud.com/repository/maven"   }   maven {     url "https://plugins.gradle.org/m2/"   }   mavenCentral()④进行编译gradle -g /$UserHome/gradleRepository releaseTarGz -info⑤编译成功,会在对应目录:/home/kafka-2.1.0-src/core/build/distributions/     生成.tgz包6 配置kafka①进入生成.tgz的目录,cd /home/kafka-2.1.0-src/core/build/distributions/②解压并进入目录,tar -zxvf  kafka_2.11-2.1.0.tgz    cd /home/kafka_2.11-2.1.0/config③修改配置文件,vim server.properties  增加,host.name=hostname   zookeeper.connect=localhost:2181 listeners=PLAINTEXT://:9092④进入kafka安装目录,cd /home/kafka_2.11-2.1.0  启动,bin/zookeeper-server-start.sh config/zookeeper.properties &bin/kafka-server-start.sh config/server.properties &⑤检查进程启动情况,jps当Kafka、QuorumPeerMain两个进程启动成功,整个Kafka服务启动完成测试验证①创建topicbin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test②生产数据bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test③获取数据bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test,也可以一次性消费之前所有的消息:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning④启动一个生产者,启动一个消费者,在生产者控制台输入字符串,可以看到消费者控制台进行消费并打印效果展示如图,生产者:消费者:
  • [赋能学习] 使用python连接有kerberos认证的kafka指导
    ## 一、测试环境说明 FusionInsight集群:版本8.0.2 开启安全模式 客户端系统类型:CentOS Linux release 7.6.1810 x86_64位 集群和客户端均位于无法连接互联网的内网环境 ## 二、配置步骤 **本章节所有步骤在客户端节点执行** ### 2.1 配置yum源 已有可用yum源的可忽略2.1章节 1. 上传系统安装镜像文件 CentOS-7.6-x86_64-DVD-1810.iso 到客户端节点,例如/opt目录 2. 将镜像文件挂载到/media目录 ``` mount -o loop /opt/CentOS-7.6-x86_64-DVD-1810.iso /media ``` 3. 配置/etc/yum.repos.d/目录下的repo文件,保证该目录下只有一个以.repo结尾的文件,其内容如下 ``` [base] name=file baseurl=file:///media enabled=1 gpgcheck=0 ``` 4. 配置完毕后检查下yum源,如图执行yum repolist命令 ![image.png](https://bbs-img-cbc-cn.obs.cn-north-1.myhuaweicloud.com/data/attachment/forum/202012/10/164449p1ncagerkbfi3tjv.png) ### 2.2 安装python3 从python官网下载python3源码包,本例中下载的链接为 https://www.python.org/ftp/python/3.7.8/Python-3.7.8.tgz 将获取的Python-3.7.8.tgz上传到/opt目录下,安装命令如下 ``` yum -y install zlib-devel bzip2-devel openssl-devel openssl-static ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libcap-devel xz-devel libffi-devel lzma gcc cd /opt rm -rf /tmp/Python-3.7.8 tar -zxvf Python-3.7.8.tgz -C /tmp mv /tmp/Python-3.7.8 /usr/local/python-3.7 cd /usr/local/python-3.7/ ./configure --prefix=/usr/local/sbin/python-3.7 make && make install ln -sv /usr/local/sbin/python-3.7/bin/python3 /usr/bin/python3 ln -sv /usr/local/sbin/python-3.7/bin/pip3 /usr/bin/pip3 ``` ### 2.3 安装kafka-python **说明:安装该依赖之前切记不可执行 FusionInsight 客户端环境变量,即不能执行 source /opt/client/bigdata_env** 附件中的安装包都是从pypi官网下载,可直接使用,或者到pypi下载 ***<u>切记:如下依赖包的安装顺序必须是 six > decorator > gssapi > kafka-python</u>*** 1. 获取依赖包,pypi下载链接如下 six https://pypi.org/project/six/#files decorator https://pypi.org/project/decorator/#files gssapi https://pypi.org/project/gssapi/#files kafka-python https://pypi.org/project/kafka-python/#files 2. 安装依赖six ``` tar -zxvf six-1.15.0.tar.gz cd six-1.15.0 python3 setup.py install ``` 3. 安装decorator ``` tar -zxvf decorator-4.4.2.tar.gz cd decorator-4.4.2 python3 setup.py install ``` 4. 安装gssapi ``` tar -zxvf gssapi-1.6.11.tar.gz cd gssapi-1.6.11 python3 setup.py install ``` 5. 安装kafka-python ``` tar -zxvf kafka-python-2.0.2.tar.gz cd kafka-python-2.0.2 python3 setup.py install ``` ### 2.4 安装FusionInsight客户端 本步骤略,客户端安装目录自定义,例如/opt/client ## 三、验证 ### 3.1 执行kerberos认证 参考命令 ``` source /opt/client/bigdata_env kinit -k -t user.keytab developuser ``` ### 3.2 创建python测试脚本,内容参考如下 注意bootstrap_servers和sasl_kerberos_domain_name根据实际环境进行修改 ``` #!/usr/bin/env python3 # coding=utf-8 import time from kafka import KafkaProducer from kafka import KafkaConsumer def kafka_python_producer_main(): producer = KafkaProducer(bootstrap_servers='10.162.26.137:21007', security_protocol='SASL_PLAINTEXT', sasl_mechanism='GSSAPI', sasl_kerberos_service_name='kafka', sasl_kerberos_domain_name='hadoop.hadoop.com', sasl_plain_username='KafkaClient') producer.send('testTopic3', 'kafka python test'.encode('utf-8')) producer.flush() producer.close() print('producer done') def kafka_python_consumer_main(): consumer = KafkaConsumer('testTopic3', bootstrap_servers='10.162.26.137:21007', group_id='kafka-test-20191014', auto_offset_reset='earliest', security_protocol='SASL_PLAINTEXT', sasl_mechanism='GSSAPI', sasl_kerberos_service_name='kafka', sasl_kerberos_domain_name='hadoop.hadoop.com', sasl_plain_username='KafkaClient') for msg in consumer: print(msg.value) print(msg.partition) print('consumer done') if __name__ == '__main__': kafka_python_producer_main() time.sleep(1) kafka_python_consumer_main() ``` 3.3 执行测试脚本,查看执行结果 ![image.png](https://bbs-img-cbc-cn.obs.cn-north-1.myhuaweicloud.com/data/attachment/forum/202012/10/164828mnus548bykwullyl.png) 通过kafka自带脚本验证结果: ![image.png](https://bbs-img-cbc-cn.obs.cn-north-1.myhuaweicloud.com/data/attachment/forum/202012/10/164955yustuehfqtuti1to.png) ## 更多kafka-python使用参考: https://kafka-python.readthedocs.io/en/master/usage.html
  • [问题求助] 是否有RabbitMQ和KAFKA的宣传胶片
    是否有RabbitMQ和KAFKA的宣传胶片
  • [问题求助] 大数据架构实验课:综合实验5-实时检索-NoNodes Available问题
    【操作步骤&问题现象】对应实验指导书82页左右,编写ElasticSearch工具类,在测试时就跑不通了。目前遇到问题为“NoNodes Available”【截图信息】【日志信息】(可选,上传日志内容或者附件)疑似问题原因1:端口配置问题网络端口显示为9200,配置为9300。不过网上有说就是按tcp接口9300来的。也有说9301,都进行过修改,目前无效。2:ZK连接地址问题指导书上未指明KafKa地址还是Zookeeper地址。进入控制台后发现实验要求购买的分析集群只有Zookeeper组件,没有Kafka组件。于是先按Zookeeper地址填写的。但是问题在于,本实验是实时实时检索实验,我觉得此处应填写流式集群配置的Kafka组件地址。在购买的时候也应该买混合集群。
  • [赋能学习] MRS3.0.2版本 二次开发(18/27): Flink读写Kafka样例
    # 视频在本帖二楼 ## 一、Flink简介 Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。 ## 二、Flink客户端准备,可参考MRS二次开发(15/27)Flink构造DataStream样例,有详细的视频介绍 **Kerberos认证** 从服务器上下载用户keytab,并将keytab放到Flink客户端所在主机的某个文件夹下。 修改客户端配置文件/opt/client/Flink/flink/conf/flink-conf.yaml内容: ``` keytab路径。例如:security.kerberos.login.keytab: /opt/testclient/flinkuser.keytab principal名。例如:security.kerberos.login.principal: flinkuser 对于HA模式,如果配置了ZooKeeper,还需要设置ZK kerberos认证相关的配置。配置如下: zookeeper.sasl.disable: false security.kerberos.login.contexts: Client 如果用户对于Kafka client和Kafka broker之间也需要做kerberos认证,配置如下: security.kerberos.login.contexts: Client,KafkaClient ``` **Security Cookie认证** 1. 用户需要获取SSL证书,放置到Flink客户端中。 参考 拷贝其中代码生成generate_keystore.sh脚本,放置在Flink客户端的bin目录下。 在客户端目录下执行source bigdata_env。 执行命令“sh generate_keystore.sh ”即可,例如“sh generate_keystore.sh 123456",会在Flink客户端的conf目录下生成flink.keystore,flink.truststore文件。 在Flink客户端同级目录下新建ssl目录,例如“/opt/testclient/Flink/flink/ssl”,将生成的flink.keystore,flink.truststore文件拷贝到ssl目录中。 1. 获取证书后,在Flink客户端的conf目录下配置文件“flink-conf.yaml”中将以下配置项进行赋值。 将配置项“security.ssl.keystore”设置为keystore文件的相对路径,例如“ssl/flink.keystore” 将配置项“security.ssl.truststore”设置为truststore文件的相对路径,例如“ssl/flink.truststore” 将配置项“security.cookie”设置为一串密码,该密码可以是一串随机规则密码,可以取默认 使用Manager明文加密API进行获取密文:curl -k -i -u 用户名:密码 -X POST -HContent-type:application/json -d '{"plainText":""}' '';其中要与签发证书时使用的密码一致,x.x.x.x为集群Manager的浮动IP。 将配置项“security.ssl.key-password”、“security.ssl.keystore-password”和“security.ssl.truststore-password”设置为如上指令获取到的密文。 2. 打开“Security Cookie”开关,配置“security.enable: true”,查看“security cookie”是否已配置成功。 #### 加密传输 配置SSL传输,用户主要在客户端的“flink-conf.yaml”文件中做如下配置: 1. 打开SSL开关和设置SSL加密算法,配置如下: security.ssl.enabled:true akka.ssl.enabled: true blob.service.ssl.enabled: true taskmanager.data.ssl.enabled: true security.ssl.algorithms: TLS_DHE_RSA_WITH_AES_128_CBC_SHA256,TLS_DHE_DSS_WITH_AES_128_CBC_SHA256 2. 配置keystore或truststore文件路径为相对路径时,Flink Client执行命令的目录需要可以直接访问该相对路径。 在Flink的CLI yarn-session.sh命令中增加“-t”选项来传输keystore和truststore文件到各个执行节点。例如“./bin/yarn-session.sh **-t** ssl/ ” 在Flink run命令中增加“-yt”选项来传输keystore和truststore文件到各个执行节点。例如“./bin/flink run **-yt** ssl/ -ys 3 -yn 3 -m yarn-cluster -c com.huawei.SocketWindowWordCount ../lib/flink-eg-1.0.jar --hostname r3-d3 --port 9000” 3. 配置keystore或truststore文件路径为绝对路径时,需要在Flink Client以及Yarn各个节点的该绝对路径上放置keystore或truststore文件。 执行命令中不需要使用“-t”或“-yt”来传输keystore和truststore文件。 4. 将客户端安装节点的业务ip和manager界面浮动ip追加到jobmanager.web.allow-access-address配置中用“,”隔开 ## 三、样例背景 假定用户有某个网站周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Flink的DataStream应用程序实现如下功能: 实时统计总计网购时间超过2个小时的女性网民信息。周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志。该日志文件在该样例程序中的data目录下获取。 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 log2.txt:周日网民停留日志。该日志文件在该样例程序中的data目录下获取。 LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 ## 四、样例调试 前提:Linux环境有安装集群客户端,环境准备,参考第一课 数据规划: Flink样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户),并从Kafka组件接收数据。确保集群安装完成,包括HDFS、Yarn、Flink和Kafka。 1. 创建Topic。 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true” 用户使用Linux命令行创建topic,执行命令前需要使用kinit命令进行人机认证,如:kinit flinkuser。 在kafka客户端目录执行如下指令,ZooKeeper集群信息以自己的为准: ``` bin/kafka-topics.sh --create --zookeeper 10.244.231.91:24002,10.244.230.245:24002,10.244.230.229:24002/kafka --partitions 5 --replication-factor 1 --topic topic3 ``` 2. Kerberos认证配置 修改flink客户端flink-conf.yaml如下配置: security.kerberos.login.keytab: /opt/testclient/user.keytab security.kerberos.login.principal: flinkuser security.kerberos.login.contexts: Client,KafkaClient security.kerberos.login.use-ticket-cache: false 调试步骤: 1. 比对“本地时间和Linux机器时间”与集群时间误都不能超过5分钟 2. 检查linux环境的JDK版本为1.8 3. 配置linux环境的/etc/hosts文件 4. 检查 C:\Windows\System32\drivers\etc\hosts文件中是否包含所有集群节点的域名IP映射信息 5. 在IDEA打开样例代码的FlinkKafkaJavaExample目录,检查SDK配置 6. 默认自动加载依赖,如未加载,则打开后选中pom.xml文件,右键点击“Add As Maven Project”后等待项目自动将依赖下载完毕 7. 在IDEA主页面,选择“File > Project Structures...”进入“Project Structure”页面。 8. 在“Project Structure”页面,选择“Artifacts”,单击“+”并选择“JAR > Empty”。 9. 根据实际情况设置Jar包的名称、类型以及输出路径。 10. 选择“Put into Output Root”。然后单击“Apply”。 11. 在IDEA主页面,选择“Build > Build Artifacts...”。在弹出的菜单中构建我们的jar包 12. 从IDEA项目out目录下的获取到Jar包,拷贝到Flink客户端目录,如“/opt/testclient/Flink/flink”。 13. 将如下包拷到"/opt/testclient/Flink/flink/lib"目录下,若已存在则忽略。 - kafka-clients-2.4.0-hw-ei-302002.jar - flink-connector-kafka_2.11-1.10.0-hw-ei-302002.jar - flink-connector-kafka-base_2.11-1.10.0-hw-ei-302002.jar - flink-dist_2.11-1.10.0-hw-ei-302002.jar 14. 在Linux环境中运行Flink应用程序,需要先启动Flink集群。在Flink客户端下执行yarn session命令,启动flink集群。 例如我们使用ssl相对路径指令为“bin/yarn-session.sh **-t** ssl/ -jm 1024 -tm 1024” 15. 在终端另开一个窗口,进入Flink客户端目录,调用bin/flink run脚本运行代码,例如: a. 生产数据的执行命令启动程序: ``` bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/testclient/Flink/flink/FlinkKafkaJavaExample.jar --topic topic3 -bootstrap.servers 10.244.230.213:21007,10.244.231.48:21007,10.244.231.130:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop_arm_802.com ``` b. 消费数据 ``` bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/testclient/Flink/flink/FlinkKafkaJavaExample.jar --topic topic3 -bootstrap.servers 10.244.230.213:21007,10.244.231.48:21007,10.244.231.130:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop_arm_802.com ``` 16. 使用Flink Web页面查看Flink应用程序运行情况。 ## 五、问题互动渠道 FusonInsight 论坛入口 https://bbs.huaweicloud.com/forum/forum-1103-1.html
  • [技术干货] 【转载】解惑“高深”的Kafka时间轮原理,原来也就这么回事!
    【摘要】 Kafka时间轮是Kafka实现高效的延时任务的基础,它模拟了现实生活中的钟表对时间的表示方式,同时,时间轮的方式并不仅限于Kafka,它是一种通用的时间表示方式,本文主要介绍Kafka中的时间轮原理。Kafka中存在一些定时任务(DelayedOperation),如DelayedFetch、DelayedProduce、DelayedHeartbeat等,在Kafka中,定时任务的添加、轮转、执行、消亡等是通过时间轮来实现的。(时间轮并不是Kafka独有的设计,而是一种通用的实现方式,Netty中也有用到时间轮的方式)1. 时间轮是什么参考网上的两张图(摘自 https://blog.csdn.net/u013256816/article/details/80697456)这两张图就比较清楚的说明了Kafka时间轮的结构了:类似现实中的钟表,由多个环形数组组成,每个环形数组包含20个时间单位,表示一个时间维度(一轮),如:第一层时间轮,数组中的每个元素代表1ms,一圈就是20ms,当延迟时间大于20ms时,就“进位”到第二层时间轮,第二层中,每“一格”表示20ms,依此类推…对于一个延迟任务,大体包含三个过程:进入时间轮、降级和到期执行。进入时间轮1. 根据延迟时间计算对应的时间轮“层次”(如钟表中的“小时级”还是“分钟级”还是“秒级”,实际上是一个不断“升级”的过程,直到找到合适的“层次”)2. 计算在该轮中的位置,并插入该位置(每个bucket是一个双向链表,可能包含多个延迟任务,这也是时间轮提高效率的一大原因,后面会提到)3. 若该bucket是首次插入,需要将该bucket加入DelayQueue中(DelayQueue的引入是为了解决“空推进”,后面会提到)降级当时间“推进”到某个bucket时,说明该bucket中的任务在当前时间轮中的时间已经走完,需要进行“降级”,即进入更小粒度的时间轮中,reinsert的过程和进入时间轮是类似的到期执行1. 在reinsert的过程中,若发现已经到期,则执行这些任务整体过程大致如下:2. 时间的“推进”一种直观的想法是,像现实中的钟表一样,“一格一格”地走,这样就需要有一个线程一直不停的执行,而大多数情况下,时间轮中的bucket大部分是空的,指针的“推进”就没有实质作用,因此,为了减少这种“空推进”,Kafka引入了DelayQueue,以bucket为单位入队,每当有bucket到期,即queue.poll能拿到结果时,才进行时间的“推进”,减少了 ExpiredOperationReaper 线程空转的开销。3. 为什么要用时间轮用到延迟任务时,比较直接的想法是DelayQueue、ScheduledThreadPoolExecutor 这些,而时间轮相比之下,最大的优势是在时间复杂度上:时间复杂度对比:因此,理论上,当任务较多时,TimingWheel的时间性能优势会更明显总结一下Kafka时间轮性能高的几个主要原因:(1)时间轮的结构+双向列表bucket,使得插入操作可以达到O(1)的时间复杂度(2)Bucket的设计让多个任务“合并”,使得同一个bucket的多次插入只需要在delayQueue中入队一次,同时减少了delayQueue中元素数量,堆的深度也减小,delayqueue的插入和弹出操作开销也更小
  • [全栈开发者] 【学习笔记】springboot使用kafka收发消息
    springboot使用kafka收发消息参考:https://www.cnblogs.com/lixianguo/p/13254915.html实现实体分离的序列化和反序列化操作springboot里建立模组maven分别建立consumer和producer以及common公共模块对consumer和producer都建立springboot启动类common的application-common.yml全局配置spring:  kafka:    #kafka配置    bootstrap-servers: localhost:9092    producer:      retries: 0      # 每次批量发送消息的数量      batch-size: 16384      buffer-memory: 33554432      # 指定消息key和消息体的编解码方式      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer    consumer:      # 指定默认消费者group id      group-id: test-consumer-group      auto-offset-reset: earliest      enable-auto-commit: true      auto-commit-interval: 5000      # 指定消息key和消息体的编解码方式      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer    #自己定义的主题名称,在微服务中使用Value注解注入调用,如果kafka中没有该主题,则会自动创建    template:      default-topic: springkafka  #    topic:#      userTopic: springkafka特别注意配置producer的application.ymlserver:  port: 8088spring:  application:    name: kafka-producer  profiles:    active: common  kafka:    consumer:      auto-offset-reset: latest      enable-auto-commit: true      group-id: consumer      bootstrap-servers: ${spring.kafka.producer.bootstrap-servers}    producer:      bootstrap-servers: localhost:9092配置consumer的application.ymlserver:  port: 8089spring:  application:    name: kafka-consumer  profiles:    active: common  kafka:    consumer:      auto-offset-reset: latest      enable-auto-commit: true      group-id: consumer      bootstrap-servers: ${spring.kafka.producer.bootstrap-servers}    producer:      bootstrap-servers: localhost:9092消息分发客户端通过kafka直接进行消息接收,从而实现消息的分发  参考Spring Cloud Data Flow 的Message类及定义内容。  所有消息都会带有ClassName,如前面文件中的数据表,这明确表达了这个消息是什么对象,后续的用户操作是什么也很明确。RESTFul规范中,可以返回用户可以选择的所有下一步操作(Actions),增加灵活性。批量获取数据分发配置详解批量并发:配置文件:#kafka配置信息kafka:  producer:    bootstrap-servers: 10.161.11.222:6667,10.161.11.223:6667,10.161.11.224:6667    batch-size: 16785                                   #一次最多发送数据量    retries: 1                                          #发送失败后的重复发送次数    buffer-memory: 33554432                             #32M批处理缓冲区    linger: 1  consumer:    bootstrap-servers: 10.161.11.222:6667,10.161.11.223:6667,10.161.11.224:6667    auto-offset-reset: latest                           #最早未被消费的offset earliest    max-poll-records: 3100                              #批量消费一次最大拉取的数据量    enable-auto-commit: false                           #是否开启自动提交    auto-commit-interval: 1000                          #自动提交的间隔时间    session-timeout: 20000                              #连接超时时间    max-poll-interval: 15000                            #手动提交设置与poll的心跳数,如果消息队列中没有消息,等待毫秒后,调用poll()方法。如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。    max-partition-fetch-bytes: 15728640                 #设置拉取数据的大小,15M  listener:    batch-listener: true                                #是否开启批量消费,true表示批量消费    concurrencys: 3,6                                   #设置消费的线程数    poll-timeout: 1500                                  #只限自动提交,有关配置earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 Java代码(并发、批量获取)Kafka消费者配置类 批量获取关键代码: ①factory.setBatchListener(true); ②propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,50); 并发获取关键代码: factory.setConcurrency(concurrency);
  • [Java] 导致kafka运行变慢的情况
    cpu性能瓶颈磁盘读写瓶颈网络瓶颈
  • [技术干货] 启用LVM特性的MRS Kafka集群磁盘在线扩容,业务不中断
    MRS 1.8.5及以后的版本,都支持在流式节点上开启LVM特性。LVM特性能有效防止kafka多磁盘场景下topic中因为数据不均导致某个partition流量特别大进而磁盘被写爆。同时开启LVM以后可以做到不重启系统、服务或组件的情况下实现磁盘平滑扩容,保证业务的连续性。详情请点击博文链接:https://bbs.huaweicloud.com/blogs/174810
  • [Java] kafka的两种数据保留的策略
    按照过期时间保留按照存储的消息大小保留
  • [热门活动] 华为云高校开发者青年班第五期_14天探索云原生,玩转Redis、Kafka
    亲爱的同学们~华为云高校开发者青年班活动第五期正式上线!亮点:本期课程以学习小组形式开展,先报名后组队哦!往下翻了解详情~ 华为云开发者青年班,面向高校学生,顶级专家带队,助你提升求职竞争力!本期活动:华为云高校开发者青年班,学、练、赛三部曲,带你探索云原生,轻松玩转Redis、Kafka报名流程:https://developer.huaweicloud.com/hero/thread-71447-1-1.html1、点击上方网址链接报名活动2、加入开发者青年班活动交流群,获取最新活动态。3、点击下方链接预约本期课程>>点击预约课程获取报名奖励 进入页面点击加入课题即可预约成功,课程内容活动开始后将陆续开放,完成活动报名+课程预约后,即可获得华为云定制水杯或华为云三合一数据线奖励组队说明:本期课程将采用组队学习的方式,以 5 人学习小队作为基础单位,统一进行报名、考核和评奖。每个小队需选出 1 位队长,负责统筹小组名单、发起讨论、组织观看直播、答疑、督促队员完成打卡等各环节任务,以确保小组学习效率和质量。而在最后的优秀成绩评奖环节中,优秀队长也将获得一份特别礼物哦。>>点击查看组队详情  快来召唤你的队友吧!!! 往下看还有更多惊喜福利为什么要加入华为云开发者青年班???在这里,你将:有机会获得万元实物奖励以及码豆奖励有机会免费领取华为云云服务器、Redis、Kafka资源代金券有机会得到华为云高级技术专家的指导与答疑;有机会获得华为2020HC大会门票,带着作品参加华为年度科技盛会——全联接大会,面向全球开发者和数万名华为客户展示作品;有机会成为华为云学生精英,直通HERO高校联盟盟主队,享受更多权益; 活动内容简介:通过本期青年班,让你14天从入门到实践,掌握华为云分布式缓存服务Redis(DCS)和华为云分布式消息服务Kafka,循序渐进一站式学习加实操,加深对知识点的认知。还有详细的实验手册,更能让同学们在操作时得心应手。第一周的课程结合华为云分布式缓存服务Redis(DCS),从入门到实践应用,循序渐进一站式学习。5节精品课,囊括DCS入门、proxy集群和原生集群、大key热key在线分析、读写分离、在线迁移等内容,让你系统性掌握分布式缓存服务Redis。第二周的课程结合华为云分布式消息服务Kafka,从基础原理入门到实践操作,循序渐进一站式学习。5节精品课,囊括创建topic,生产消息、消费消息,编写生产/消费代码,Kafka服务架构机制、常用工具使用等内容,让你系统性掌握Kafka。每日内容 全程专家指导8月14日-9月6日招募期活动宣传招募期,大家可以邀请身边同学报名组队学习哦~8月31日开班直播华为云带你探索云原生,轻松玩转Redis、Kafka9月1日直播课华为云高级专家讲解Redis课程任务+发放资源代金券9月1日-9月6日《Redis全景实践课》学习课程  第1章—分布式缓存服务Redis入门及特性介绍  第2章—proxy集群和原生集群  第3章—大key、热key在线分析  第4章—集群读写分离和主从读写分离  第5章—在线迁移(全量和增量)9月7日直播课华为云高级专家讲解本周课程任务+发放资源代金券9月7日-9月13日《kafka全景实践课》学习课程  第1章—Kafka入门基础知识  第2章—Kafka生产机制与实践  第3章—Kafka消费机制与实践  第4章—Kafka服务架构与机制  第5章—Kafka常用工具介绍9月14日-9月18日毕业测验开发者青年班——结业考试结业证书发放【留下实战记录】 每日课程或实践任务,均在华为云开发者青年班HERO联盟中报名打卡并进行跟帖反馈指定任务截图【资源说明】本次活动为免费活动,活动中涉及全部资源,直播开课后会将免费资源以代金券形式发到大家的华为云账号中。注意:发放资源代金券前同学们需要先完成实名认证+学生认证,否则无法正常领取哦!实名认证入口:https://account.huaweicloud.com/usercenter/#/accountindex/realNameAuth学生认证入口:https://account.huaweicloud.com/usercenter/#/accountindex/studentAuth特别提醒:本次活动以每天发放任务的形式在活动交流群内发布,同学们需在规定时间内完成学习任务并成功打卡,坚持打卡的同学才有机会获得结业证书哦~ 活动奖项:最佳成绩奖:第一名小组5人:华为云定制机械键盘+华为云官网个人展示机会,组长额外获得40000码豆奖励+华为HC大会门票+HERO高校联盟文化衫+HERO高校联盟棒球帽第二名小组5人:华为云定制机械键盘+华为云官网个人展示机会,组长额外获得20000码豆奖励+华为HC大会门票+HERO高校联盟文化衫+HERO高校联盟棒球帽第三名小组5人:华为云定制鼠标+华为云官网个人展示机会,组长额外获得10000码豆奖励+华为HC大会门票+HERO高校联盟文化衫+HERO高校联盟棒球帽注:坚持完成每日打卡,并完成结业考试,考试分数前三名小组同学均可获得奖励。优秀个人奖:华为云定制无线鼠标+华为云官网个人展示机会注:优秀个人奖会根据活动宣传、群内互动答疑、课程学习、任务打卡等环节积极主动的同学表现进行发放哦直播互动奖:华为云三合一数据线注:本次活动中有三场直播,在直播间参与互动问答的同学可获得项目完成奖:华为云开发者青年班结业证书注:完成本次开发者青年班全部课程并通过测验的同学均可获得华为云开发者青年班结业证书活动参与奖:报名即可获得华为云定制水杯/华为云三合一数据线二选一+云服务器、Redis、Kafka免费资源代金券!活动邀请奖:邀请15人,可获得华为云定制三合一数据线+荣耀手环4 Running版邀请20人,华为云定制文件收纳包+荣耀魔方蓝牙音箱邀请30人,可获得华为云定制折叠双肩包+华为智能体脂秤 WiFi版 白色邀请40人,华为云高级定制书包+罗技MK470键鼠套装 无线全尺寸键鼠套装 白色 带无线2.4G接收器邀请50人,华为云定制键盘+华为云高级定制书包+罗技G502 Hero主宰者RGB炫光游戏鼠标 全线升级Hero引擎 16000DPI 黑色邀请排名奖:参与活动邀请同学还可有机会获得邀请排名奖励哦,详细如下:邀请人数第1名,可获得HUAWEI nova 7 5G 前置3200万像素高清自拍 后置6400万变焦四摄 麒麟985 5G SoC 芯片 8GB+128GB 全网通版(绮境森林)价值:2999元 邀请人数第2~3名,可获得荣耀平板6 10.1英寸 4GB+128GB WiFi价值:1699元 邀请人数第4`6名,HUAWEI FreeBuds 3 无线耳机(陶瓷白)价值:899元>>点我获取个人专属活动邀请链接 
  • [问题求助] 使用DMS Kafka优化消费者poll demo跑不起来
    Kafka 后台可以登录进去这是哪配置问题
  • [教程] 【开发者最佳实践挑战】第4关任务:使用DMS Kafka优化消费者poll
    【重要公告】《开发者最佳实践挑战营》第八期活动已结束打卡名单及邀请名单已公示见:FAQ帖>>>注:统计截止2020年12月31日24:00前打卡数据为保证您顺利领取活动奖品,请全通关和获得邀请TOP奖的伙伴提前填写下方奖品收货信息链接。填写截止时间为1月8日,如您没有填写,视为放弃奖励填写地址请戳我>>欢迎参加华为云“开发者最佳实践挑战营”第八期这是本次挑战营的第4关,坚持闯关成功有机会获12900码豆/机械键盘/背包等精美好礼!本期活动截止12月31日。注意:参与闯关前,请确保已报名加入活动群并领取实践资源,如未入群请添加小助手微信(hwpaas01),回复“最佳实践”进入学习交流群!点击这里了解活动详情>>  | 点击这里查看活动FAQ>>为避免无法发放码豆,请从未登录过会员中心的用户需提前登陆下DevCloud会员中心在DMS提供的原生Kafka SDK中,消费者可以自定义拉取消息的时长,如果需要长时间的拉取消息,只需要把poll(long)方法的参数设置合适的值即可。但是这样的长连接可能会对客户端和服务端造成一定的压力,特别是分区数较多且每个消费者开启多个线程的情况下。Kafka队列含有多个分区,消费组中有多个消费者同时进行消费,每个线程均为长连接。当队列中消息较少或者没有时,连接不断开,所有消费者不间断地拉取消息,这样造成了一定的资源浪费。 (1)领取实践资源:点击这里免费领取1个月Kafka体验规格实例,可用区3/5已售罄,请选择2。提示:实践活动提供的免费Kafka实例没有开SASL,在配置时需做一定修改,见→FAQ第12条。(2)最佳实践指南:点击查看实践指南>>(3)视频操作演示:点击查看实践视频演示>>任务一:创建DMS Kafka实例,查看实例详情并截图,截图需包含右上角华为云账号名,并按回帖格式要求在本帖中回帖;任务二:根据实践指南完成实践操作(可查看实践视频演示),将代码截图,将运行结果截图,并按回帖格式要求在本帖中回帖;注:已参加过上一期在本帖已闯关完成打卡的用户参与第八期通本关将不在发放通关奖励1.【通关、参与奖励&规则】点击这里填写报名登记表,完成提交可获100码豆(已提交的可忽略)① 参与奖励:每关完成创建实例可获100码豆,共8关最高可获得800码豆;② 通关奖励:通过2关赠送:2000码豆(可选关)通过4关赠送:4000码豆(可选关)通过6关赠送:6000码豆(可选关)通过8关赠送:8000码豆(全通关)取通关最高数发放码豆,全通关可额外获得旅行本(套装)1本(限量100本取最先全通关者发放)和“开发者最佳实践挑战达人”荣誉证书;全通关用户点击填写问卷,以便证书发放;2.【邀请奖励】:每邀请1位好友并成功完成1关或多关实践并打卡,可额外获得1000码豆(最高可获得3000码豆);3.【邀请TOP榜奖励】:邀请好友并完成1关或多关实践并打卡,邀请榜前3可获得机械键盘(雷柏)1个,邀请榜4-10名可获得华为云定制背包1个(如有并列取最先邀请完成打卡者)4.【分享奖励】:点击这里进入分享活动(最高可获1000码豆)>>>5.【邀请有礼】:点击这里进入邀请好友报名活动瓜分30万码豆,(最高可获得66666码豆)>>>活动结束后5个工作日内公示通关名单,15个工作日内发放码豆奖励; 码豆有什么用?码豆为虚拟货币可用于在华为云码豆会员中心-兑换商城中进行实物/虚拟礼品的兑换(礼品会不定期更换)兑换礼品点击查看:华为云专属码豆商城>>> 请务必按照以下格式要求进行回帖,否则无法计算奖励:1、华为云账号:xxx(即右上角的字母数字组合ID)2、邀请人华为云账号:xxx(即右上角的字母数字组合ID,如无就不填)3、实践感想:(如对课程内容、产品体验的建议或感想等)4、体验任务截图:(打卡样例图)注:华为云账号请勿填错,如填错码豆无法发放到账。实践截图:至少包含(a)实例详情截图、(b)代码截图、(c)运行结果截图三张截图。【快速传送门】第1关任务:基于API网关的电话号码归属地查询第2关任务:使用函数工作流服务为图片打水印第3关任务:使用Redis实现排行榜功能第4关任务:使用DMS Kafka优化消费者poll第5关任务:使用CPTS进行电商网站性能测试第6关任务:基于ServiceStage的天气预报应用部署第7关任务:使用CloudIDE在云端环境开发AI交付实操第8关任务:使用ModelArts实现零代码美食分类模型开发>>点击进入:开发者最佳实践挑战营第八期活动页>>点击进入:活动FAQ帖>>点击进入:邀请有礼,瓜分30万码豆,最高可获66666码豆!>>点击进入:分享任务,获码豆换好礼!