• [技术干货] window下搭建kafka运行环境
    背景Kafka作为流式处理的消息中间件,应用场景极其广泛。那么在使用过程中,遇到不明所以的报错信息,无法很好的定位问题根因。此时,若本地有一个套kafka的运行环境,就可以进行本地代码调试。一方面,能够快速定位问题根因;另一方面,通过调试kafka源码,可以很好地熟悉kafka代码背后的原理。因此,一套本地kafka运行环境对kafka运维人员是必不可少的。搭建流程 搭建步骤软件版本:jdk1.8.xscala:2.11.12gradle:3.1zookeeper:3.5.1kafka:1.1.0环境:windows单机,一个zookeeper实例,一个kafka实例一、      安装前准备1.    安装jdk去JDK官网下载jdk,然后在window上安装,此处略。2.    安装scala从Scala官网地址http://www.scala-lang.org/download/下载Scala的windows安装包,验证:3.    安装gradle从gradle官网https://gradle.org/releases/下载gradle安装包,如下图所示安装完毕后,验证: 二、      安装启动zookeeper1.    下载安装包此处直接去官网下载,使用的是 zookeeper-3.5.1-alpha,解压后zookeeper家目录记为{zookeeperHome}变量2.    修改配置a)         打开{zookeeperHome}\conf,复制zoo_sample.cfg重命名成zoo.cfgb)         编辑zoo.cfg,修改dataDir为【dataDir=/zookeeper-3.5.2-alpha/data】,样例如下 tickTime=2000initLimit=10syncLimit=5dataDir=\datadataLogDir=\logclientPort=2181server.1=localhost:2287:3387同时,在dataDir目录下新建myid文件,传入一个整数值c)         添加环境变量   ZOOKEEPER_HOME          {zookeeperHome}   Path 在现有的值后面添加     ;%ZOOKEEPER_HOME%\bin; 3.    启动zookeeper打开cmd,切换到bin目录下,执行(该cmd窗口不能关闭)zkServer.cmd 若显示 JAVA_HOME is incorrectly set,则 {zookeeperHome}\bin目录下的zkEnv.cmd中第一行添加set JAVA_HOME="C:\Program Files\Java\jdk1.8.0_131" 三、      安装启动kafka1.    下载安装包下载源码包(去kafka官网下载)在build.gradle中增加国内maven库2.    修改配置a)         解压后,执行: b)   修改server.properties修改log.dirs为本地目录地址 3.    启动kafka 方式一:idea上调试  方式二:cmd启动i.      打开cmd,切换到kafka家目录C:\kafka-1.1.1-src>ii.      输入gradle命令,执行C:\kafka-1.1.1-src>gradleiii.      执行完成后,执行以下命令C:\kafka-1.1.1-src>gradlew jariv.      启动kafkaC:\kafka-1.1.1-src>.\bin\windows\kafka-server-start.bat .\config\server.properties附录安装过程采坑指南:1,直接点击运行,可能出现问题:java.lang.NoClassDefFoundError: org/apache/log4j/or/RendererMap添加slf4j.jar和log4j.jar添加到classpath: 
  • 使用Flume消费kafka topic数据并存储到HBase
    操作场景:    Flume消费kafka数据存储到HBase中。前提条件:    已创建混合集群或者流式和分析集群(集群间网络互通,如果开启kerberos,则需配置跨集群互信https://support.huaweicloud.com/usermanual-mrs/mrs_01_0354.html)。操作步骤:    1. 从HBase客户端拷贝配置文件hbase-site.xml到Flume server所在节点(流式core节点)的配置目录 "/opt/Bigdata/MRS_x.x.x/1_x_Flume/etc/" 下。通常可以在分析集群Master节点HBase客户端安装目录如“/opt/client/HBase/hbase/conf/”下找到hbase-site.xml文件。 拷贝完成后文件需要修改文件属组为omm:wheel。    2. 从HBase集群下载用户的认证凭据。a. 在MRS Manager,单击“系统设置”。b. 在“权限配置”区域,单击“用户管理”。c. 在用户列表中选择需要的用户,单击后面的“更多”下载用户凭据。d. 解压下载的用户凭据文件,获取krb5.conf和user.keytab文件。    3. 将上一步获得的krb5.conf和user.keytab拷贝到Flume server所在节点(流式core节点)的配置目录 "/opt/Bigdata/MRS_x.x.x/1_x_Flume/etc/" 下。文件属组需要修改为omm:wheel。    4. 修改配置文件jaas.conf,文件所在目录 "/opt/Bigdata/MRS_x.x.x/1_x_Flume/etc/" 。        修改参数“keyTab”定义的用户认证文件完整路径即步骤3中保存用户认证文件的目录。    5. 修改配置文件flume-env.sh,文件所在目录 "/opt/Bigdata/MRS_x.x.x/1_x_Flume/etc/" 。                在 “-XX:+UseCMSCompactAtFullCollection”后面,增加以下内容:-Djava.security.krb5.conf=/opt/Bigdata/MRS_x.x.x/1_x_Flume/etc/krb5.conf -Djava.security.auth.login.config=/opt/Bigdata/MRS_x.x.x/1_x_Flume/etc/jaas.conf -Dzookeeper.request.timeout=120000                例如:"-XX:+UseCMSCompactAtFullCollection -Djava.security.krb5.conf=/opt/Bigdata/MRS_2.1.0/1_6_Flume/etc/krb5.conf -Djava.security.auth.login.config=/opt/Bigdata/MRS_2.1.0/1_6_Flume/etc/jaas.conf -Dzookeeper.request.timeout=120000"                请根据实际情况,修改配置文件路径,然后保存并退出配置文件。    6. 将HBase集群的“/etc/hosts”文件中host匹配相关内容添加到Flume server节点的“/etc/hosts”文件。    7. 重启该节点的flume实例。    8. 修改Flume配置文件“properties.properties”。                vi /opt/Bigdata/MRS_x.x.x/1_x_Flume/etc/properties.properties                将以下内容保存到文件“properties.properties”中:             server.sources = kafka_source         server.channels = flume         server.sinks = hbase                         server.sources.kafka_source.channels = flume         server.sources.kafka_source.type = org.apache.flume.source.kafka.KafkaSource         server.sources.kafka_source.topics = flume_test         server.sources.kafka_source.groupId = flume_group         server.sources.kafka_source.batchSize = 1000         server.sources.kafka_source.kafka.security.protocol  = SASL_PLAINTEXT         server.sources.kafka_source.kafka.kerberos.domain.name = hadoop.XXX.com         server.sources.kafka_source.kafka.bootstrap.servers=XXX.XXX.XXX.XXX:21007,XXX.XXX.XXX.XXX:21007,XXX.XXX.XXX.XXX:21007                  server.channels.flume.type = memory         server.channels.flume.capacity=100000         server.channels.flume.transactionCapacity=10000         server.channels.flume.channelfullcount = 10         server.channels.flume.keep-alive = 3          server.channels.flume.byteCapacityBufferPercentage = 20                  server.sinks.hbase.channel = flume         server.sinks.hbase.type = hbase         server.sinks.hbase.table = hbase_name         server.sinks.hbase.columnFamily= info         server.sinks.hbase.batchSize = 1000         server.sinks.hbase.kerberosPrincipal = admin         server.sinks.hbase.kerberosKeytab = /opt/Bigdata/MRS_x.x.x/1_x_Flume/etc/user.keytab         请根据实际情况,修改以下参数,然后保存并退出。         kafka.bootstrap.servers         默认情况下,安全集群对应端口21007,普通集群对应端口9092。         kafka.security.protocol         安全集群请配置为SASL_PLAINTEXT,普通集群请配置为PLAINTEXT。         kafka.kerberos.domain.name         普通集群无需配置此参数。安全集群对应此参数的值为Kafka集群中“kerberos.domain.name”对应的值。具体可到Borker实例所在节点上查看“/opt/Bigdata/MRS_Current/1_X_Broker/etc/server.properties”文件中配置项“kerberos.domain.name”对应的值,仅安全集群需要配置,其中“X”为随机生成的数字,请根据实际情况修改。         kerberosPrincipal         安全集群需要配置,用户名         kerberosKeytab                安全集群需要配置,用户认证文件,需要写绝对路径。
  • [问题求助] kafka是可以批量读取数据,但是flink是一条一条处理的,应该也可以一条一条提交吧?
    如题
  • [问题求助] 分布式消息服务DMS和分布式消息服务Kafka是什么关系?
    分布式消息服务DMS和分布式消息服务Kafka是什么关系?
  • [介绍/入门] 分布式消息服务DMS和分布式消息服务Kafka是什么关系?
    分布式消息服务DMS和分布式消息服务Kafka是什么关系?
  • [技术干货] [大数据] 鲲鹏 kafka_2.11-0.10.1.1移植指引
    1 简介Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。本次移植以Kafka_2.11-0.10.1.1为例进行说明。官方链接:https://kafka.apache.org/类别:应用程序语言:Java 2 环境类别子项版本获取地址(方法)硬件CPUHi1616iBMC网络Ethernet-10GEiBMC存储SATA 4TiBMC内存xxG xxxMHziBMCOSLinx6.0.90cat /etc/os-releasePRETTY_NAME="Linx GNU/Linux 6.0.90   (stretch)"NAME="Linx GNU/Linux"VERSION_ID="9"VERSION="9 (stretch)"ID=LinxHOME_URL="http://www.linx-info.com/"Kernel4.19.0cat /proc/versionLinux version   4.19.0-0.bpo.1-linx-security-arm64 (linx-kernel@linx-info.com) (gcc version   6.3.0 20170516 (Debian 6.3.0-18+deb9u1)) #1 SMP Linx 4.19.12-1~bpo9+1linx4   (2019-06-05)软件GCC7.3.0Linx OS 自带OpenJDK1.8.0_181Linx OS 自带MySQL5.7.27参考下面安装说明 3 依赖安装3.1  Linx OS 安装3.2  挂载Linx OS 镜像3.3  安将Linx OS 镜像作为Linx 的下载镜像源3.4  安装ant编译软件3.5  lz4-java-1.3.0编译和安装3.6  rocksdb-5.7.3编译和安装3.7  gradle-5.4.1编译和安装3.8  Scala-2.11.11编译和安装3.9  gradle-scoverage-3.1.3编译和安装3.10  shadow-5.0.0编译和安装3.1 Linx OS 安装从Linx 接口人获取OS 镜像:Linx-6.0.90-20190802-arm64-DVD-1.iso。支持物理机安装。支持虚拟机qemu-kvm 安装。说明:安装过程网卡故障排除,参考章节7。3.2 挂载Linx OS 镜像1、下载OS 镜像,部署到指定的目录(如/home/root/):Linx:~/kafka_2.11-0.10.1.1#   ls /home/root/Linx-6.0.90-20190802-arm64-DVD-1.iso 2、部署Linx OS 镜像到/media/cdrom/目录下:Linx:~/kafka_2.11-0.10.1.1#mkdir   /media/cdromLinx:~/kafka_2.11-0.10.1.1#mount  -o loop   /home/root/rhel-server-7.3-x86_64-dvd.iso /media/cdrom/home/root/Linx-6.0.90-20190802-arm64-DVD-1.iso   on /media/cdrom 3、确认部署是否成功:Linx:~/kafka_2.11-0.10.1.1#   mount |grep Linx-6.0.90-20190802-arm64-DVD-1.iso/home/root/Linx-6.0.90-20190802-arm64-DVD-1.iso   on /media/cdrom type iso9660   (ro,relatime,nojoliet,check=s,map=n,blocksize=2048) 3.3 安将Linx OS 镜像作为Linx 的下载镜像源1、配置镜像源:Linx:~/kafka_2.11-0.10.1.1# apt-cdrom -m   -d=/media/cdrom addUsing CD-ROM mount point /media/cdrom/Identifying...   [d8aeb8c8dac04a340b1b991de1a2052b-2]Scanning disc for index files...Found 2 package indexes, 0 source   indexes, 0 translation indexes and 0 signaturesThis disc is called:'Linx GNU/Linux 6.0.90-20190802 _Stretch_   - Unofficial arm64 DVD Binary-1 20190802-08:17'Reading Package Indexes... DoneWriting new source listSource list entries for this disc are:deb cdrom:[Linx GNU/Linux 6.0.90-20190802   _Stretch_ - Unofficial arm64 DVD Binary-1 20190802-08:17]/ stretch contrib   mainRepeat this process for the rest of the   CDs in your set. 3.4 安装ant编译软件ant  是一个将软件编译、测试、部署等步骤联系在一起加以自动化的一个工具,大多用于Java环境中的软件开发。1、下载ant开源软件包wget   http://mirror.bit.edu.cn/apache//ant/binaries/apache-ant-1.9.14-bin.tar.gz 2、解压apache-ant-1.9.14-bin.tar.gztar zxvf apache-ant-1.9.14-bin.tar.gz 3、进入解压路径cd apache-ant-1.9.14/ 4、将ANT_HOME配置到/etc/profile环境变量中vi /etc/proflile 5、使ANT_HOME环境变量生效source /etc/proflile 说明: ant 编译过程故障问题,参考:9.1 章节。3.5 lz4-java-1.3.0编译和安装1、下载源码包并解压到指定的目录:wget https://github.com/lz4/lz4-java/archive/1.3.0.zipunzip 1.3.0.zip 2、适配ARM64,修改编译参数,增加编译选项"-fsigned-char":vim build.xml 3、修改版本号。修改版本控制文件"ivy.xml",修改revision字段的值为:1.3.04、执行编译命令:ant 编译结果: 输出的jar包位于dist目录下:./dist/lz4-1.3.0.jar5、运行:将lz4-1.3.0.jar 替换替换kafka-0.10.1.1 目录下libs对应的库lz4-1.3.0.jar3.6 rocksdb-5.7.3编译和安装kafka-0.10.1.1默认依赖rocksdb-4.9.0。但是rocksdb-4.9.0,在aarch64存在编译错误,官方通过高版本来解决此问题。因此直接升级rocksdb依赖库版本为5.7.3。1、下载源码并解压:wget   https://github.com/facebook/rocksdb/archive/v5.7.3.tar.gzMD5   值如下:root@HMW:~/facebook_rocksdb/tmp$   md5sum v5.7.3.tar.gz539d606dc532ebc2e823a62a064e6be8  v5.7.3.tar.gz mv v5.7.3.tar.gz  rocksdb-5.7.3.tar.gztar -zxvf rocksdb-5.7.3.tar.gz 2、修改代码和编译参数,修改Makefile:# Set the default DEBUG_LEVEL to 1DEBUG_LEVEL?=0 CFLAGS += $(WARNING_FLAGS) -I.   -I./include $(PLATFORM_CCFLAGS) $(OPT) -fsigned-char   -I /usr/lib/jvm/java-8-openjdk-arm64/include -I   /usr/lib/jvm/java-8-openjdk-arm64/include/linuxCXXFLAGS += $(WARNING_FLAGS) -I.   -I./include $(PLATFORM_CXXFLAGS) $(OPT) -Woverloaded-virtual   -Wnon-virtual-dtor -Wno-missing-field-initializers   -fsigned-char -std=c++11 -I /usr/lib/jvm/java-8-openjdk-arm64/include -I   /usr/lib/jvm/java-8-openjdk-arm64/include/linux 3、编译:make rocksdbjava -j8 编译结果,编译出的jar包位于: ./java/target/rocksdbjni-5.7.3-linux64.jar。参考:Linx:/home/root/rocksdb-5.7.3/java/target#   ls -altotal 154128drwxr-xr-x  5 root root      4096 Aug 26 11:49 .drwxrwxr-x 10 root root      4096 Aug 25 16:53 ..drwxr-xr-x  3 root root      4096 Aug 25 16:43 apidocsdrwxr-xr-x  3 root root      4096 Aug 25 16:43 classes-rwxr-xr-x  1 root root 117951728 Aug 25 16:55   librocksdbjni-linux64.so-rw-r--r--  1 root root    39851398 Aug 25 16:56 rocksdbjni-5.7.3-linux64.jardrwxr-xr-x  3 root root      4096 Aug 25 16:44 test-classes 说明:rocksdbjni-5.7.3-linux64.jar 包内的librocksdbjni-linux64.so包含符号信息,可通过strip命令去掉。重新制作jar包。4、运行:编译输出的rocksdbjni-5.7.3-linux64.jar 替换kafka-0.10.1.1 目录下libs对应的库rocksdbjni-4.9.0.jar3.7 gradle-5.4.1编译和安装Gradle是一个基于Apache Ant和Apache Maven概念的项目自动化建构工具。它使用一种基于Groovy的特定领域语言(DSL)来声明项目设置,抛弃了基于XML的各种繁琐配置。面向Java应用为主。当前其支持的语言限于Java、Groovy和Scala,计划未来将支持更多的语言。1、下载并安装Gradle包:wget -c https://downloads.gradle.org/distributions/gradle-5.4.1-bin.zipunzip gradle-5.4.1-bin.zip -d /usr/local 2、配置路径:export GRADLE_HOME=/usr/local/gradle-5.4.1export PATH=$PATH:$GRADLE_HOME/bin 3.8 Scala-2.11.11编译和安装1、下载并安装Scala-2.11.11:wget -c   https://downloads.lightbend.com/scala/2.11.11/scala-2.11.11.tgztar -zxvf scala-2.11.11.tgz -d /usr/local 2、配置路径:export SCALA_HOME=/usr/local/scla-2.11.11export PATH=$PATH:$SCALA_HOME/bin 3.9 gradle-scoverage-3.1.3编译和安装1、下载并安装gradle-scoverage-3.1.3:wget -c   https://github.com/scoverage/gradle-scoverage/archive/3.1.3.tar.gz -O   gradle-scoverage-3.1.3.tar.gztar -zxvf gradle-scoverage-3.1.3.tar.gz 2、执行编译:cd gradle-scoverage-3.1.3gradle assemble 3.10 shadow-5.0.0编译和安装1、下载并安装shadow-5.0.0:wget -c   https://github.com/johnrengelman/shadow/archive/5.0.0.tar.gz -O   shadow-5.0.0.tar.gztar -zxvf shadow-5.0.0.tar.gz 2、执行编译:cd shadow-5.0.0gradle assemble 4 kafka-0.10.1.1编译和安装1、创建gradle本地仓库目录:mkdir -p /root/gradleRepository 2、下载kafka-0.11.0.0源码包并解压:https://archive.apache.org/dist/kafka/0.10.1.1/kafka-0.10.1.1-src.tgztar -zxvf kafka-0.10.1.1-src.tgzcd kafka-0.10.1.1-src 3、修改配置文件:vim build.gradle dependencies {// For Apache Rat   plugin to ignore non-Git filesclasspath   "org.ajoberstar:grgit:1.5.0"classpath   'com.github.ben-manes:gradle-versions-plugin:0.12.0'classpath 'org.scoverage:gradle-scoverage:3.1.3'classpath   'com.github.jengelman.gradle.plugins:shadow:5.0.0'} 将上述版本号修改成对应组件的版本号。此处使用的是gradle-scoverage-3.1.3 和 shadow-5.0.0修改gradle.properties文件中的scalaVersion版本号gradle.properties:20:scalaVersion=2.11.11 通过文件dependencies.gradle,修改依赖的rocksdb版本号(4.9.0 ->5.7.3 )gradle/dependencies.gradle:41:  rocksDB: "5.7.3" 4、执行编译:gradle -g /root/gradleRepository cleangradle -g /root/gradleRepository releaseTarGz -info 编译结果如下:Linx:/home/root/kafka-0.10.1.1-src/core/build/distributions#   ls -altotal 40048drwxr-xr-x  3 root root     4096 Aug 26 12:46 .drwxr-xr-x 11 root   root     4096 Aug 25 18:40 ..-rw-r--r--  1 root root   727217 Aug 25 18:41   kafka_2.11-0.10.1.1-site-docs.tgz-rw-r--r--  1 root root 40266922 Aug 25 18:41 kafka_2.11-0.10.1.1.tgz 5、组件包重新打包/home/root/kafka-0.10.1.1-src/core/build/distributionstar -zxvf   kafka_2.11-0.10.1.1.tgz Linx:~/kafka_2.11-0.10.1.1#   ls  ./libsaopalliance-repackaged-2.4.0-b34.jar       javassist-3.18.2-GA.jar                     jetty-security-9.2.15.v20160210.jar     kafka-tools-0.10.1.1.jarargparse4j-0.5.0.jar                         javax.annotation-api-1.2.jar                jetty-server-9.2.15.v20160210.jar       log4j-1.2.17.jarconnect-api-0.10.1.1.jar                   javax.inject-1.jar                        jetty-servlet-9.2.15.v20160210.jar    lz4-1.3.0.jarconnect-file-0.10.1.1.jar                    javax.inject-2.4.0-b34.jar                  jetty-servlets-9.2.15.v20160210.jar     metrics-core-2.2.0.jarconnect-json-0.10.1.1.jar                  javax.servlet-api-3.1.0.jar                 jetty-util-9.2.15.v20160210.jar         osgi-resource-locator-1.0.1.jarconnect-runtime-0.10.1.1.jar               javax.ws.rs-api-2.0.1.jar                 jopt-simple-4.9.jar                   reflections-0.9.10.jarguava-18.0.jar                               jersey-client-2.22.2.jar                    kafka_2.11-0.10.1.1.jar                 rocksdbjni-5.7.3.jarhk2-api-2.4.0-b34.jar                        jersey-common-2.22.2.jar                    kafka_2.11-0.10.1.1-javadoc.jar         scala-library-2.11.11.jarhk2-locator-2.4.0-b34.jar                    jersey-container-servlet-2.22.2.jar       kafka_2.11-0.10.1.1-scaladoc.jar      scala-parser-combinators_2.11-1.0.4.jarhk2-utils-2.4.0-b34.jar                    jersey-container-servlet-core-2.22.2.jar  kafka_2.11-0.10.1.1-sources.jar       slf4j-api-1.7.21.jarjackson-annotations-2.6.0.jar              jersey-guava-2.22.2.jar                     kafka_2.11-0.10.1.1-test.jar            slf4j-log4j12-1.7.21.jarjackson-core-2.6.3.jar                     jersey-media-jaxb-2.22.2.jar                kafka_2.11-0.10.1.1-test-sources.jar    snappy-java-1.1.2.6.jarjackson-databind-2.6.3.jar                   jersey-server-2.22.2.jar                    kafka-clients-0.10.1.1.jar              tmpjackson-jaxrs-base-2.6.3.jar                 jetty-continuation-9.2.15.v20160210.jar   kafka-log4j-appender-0.10.1.1.jar     validation-api-1.1.0.Final.jarjackson-jaxrs-json-provider-2.6.3.jar      jetty-http-9.2.15.v20160210.jar           kafka-streams-0.10.1.1.jar            zkclient-0.9.jarjackson-module-jaxb-annotations-2.6.3.jar  jetty-io-9.2.15.v20160210.jar               kafka-streams-examples-0.10.1.1.jar     zookeeper-3.4.8.jar 使用3.5 章节 lz4-1.3.0.jar 和3.6 章节rocksdbjni-5.7.3.jar 替换上面红色标注的jar包。并重新压缩kafka_2.11-0.10.1.1.tgz。则最终安装包为kafka_2.11-0.10.1.1.tgz。 5 移植结果分析 使用鲲鹏开发套件重新检查kafka_2.11-0.10.1.1.tgz是否还包含依赖X86的so文件。如果还有依赖的x86架构的so文件,则需要继续完成对应jar包的编译,直到无依赖的x86架构的so文件。鲲鹏开发套件获取地址: https://bbs.huaweicloud.com/forum/thread-34057-1-1.html  6 功能验证1、解压kafka_2.11-0.10.1.1.tgzcd ~tar -zxvf    kafka_2.11-0.10.1.1.tgz 2、修改配置文件server.propertiescd ~/kafka_2.11-0.10.1.1/configvim server.properties 根据本地情况,修改hostname:host.name=hostnamezookeeper.connect=hostname:2181listeners=PLAINTEXT://hostname:9092 kafka详细配置可以参考官网指导https://kafka.apache.org/documentation/。3、启动Kafkal   新建Console,执行如下命令:bin/zookeeper-server-start.sh   config/zookeeper.properties l   新建Console,执行如下命令:bin/kafka-server-start.sh config/server.properties l   新建Console,检查进程启动情况:jsp 当Kafka,QuorunPeerMain两个进程启动成功,整个Kafka服务启动完成。下面通过Kafka的Topic,生产者,消费者进行消息验证。4、新建Console,创建Topicbin/kafka-topics.sh --create --zookeeper   XX.XX.XX.XX:2181 --replication-factor 1 --partitions 1 --topic test 5、新建Console,启动生产者bin/kafka-console-producer.sh --broker-list   XX.XX.XX.XX:9092 --topic test 6、新建Console,启动消费者./bin/kafka-console-consumer.sh --zookeeper   XX.XX.XX.XX:2181 --topic test --from-beginning 在生产者Console输入消息,则可以在消费者Console中收到由生产者发布的消息。则kafka基本功能验证完成。说明:验证过程中,启动kafka消费者失败故障排除,参考:7.3 章节 消费者或生产者启动失败故障排除 7 故障排除7.1  OS 安装故障排除7.2  ant 编译过程故障排除7.3  kafka 消费者或生产者启动失败故障排除7.1 OS 安装故障排除1、在Linx OS 安装完成后,会发现驱动存在,但是找不到网卡设备。需要修改内核启动参数,如下图:2、从Linx 接口人获取最新有效的序列号:linx-serial。3、通过如下配置文件进行修改:/etc/default/grub 4、通过下面命令使grub配置文件修改生效:update-grub 7.2 ant 编译过程故障排除1、缺少ivy*.jar包1)、出现如下错误提示问题根因缺少对应的ivy*.jar 包。2)、本例中执行下面的命令:ant ivy-bootstrap 下载ivy-2.2.0.jar。2、ivy*.jar 包版本过低,导致下载失败。错误提示如下:install-bnd:[ivy:cachepath] :: Ivy 2.2.0 -   20100923230623 :: http://ant.apache.org/ivy/   ::[ivy:cachepath] :: loading settings ::   url =   jar:file:/root/.ant/lib/ivy-2.2.0.jar!/org/apache/ivy/core/settings/ivysettings.xml[ivy:cachepath] :: resolving dependencies   :: biz.aQute#bnd-caller;working[ivy:cachepath] confs: [default][ivy:cachepath] You probably access the   destination server through a proxy server that is not well configured.[ivy:cachepath] You probably access the   destination server through a proxy server that is not well configured.[ivy:cachepath] :: resolution report ::   resolve 150ms :: artifacts dl 0ms---------------------------------------------------------------------| | modules || artifacts || conf | number| search|dwnlded|evicted||   number|dwnlded|---------------------------------------------------------------------| default | 1 | 0 | 0 | 0 || 0 | 0 |---------------------------------------------------------------------[ivy:cachepath][ivy:cachepath] :: problems summary ::[ivy:cachepath] :::: WARNINGS[ivy:cachepath] Host repo1.maven.org not   found. url=http://repo1.maven.org/maven2/biz/aQute/bnd/1.50.0/bnd-1.50.0.pom[ivy:cachepath] Host repo1.maven.org not   found. url=http://repo1.maven.org/maven2/biz/aQute/bnd/1.50.0/bnd-1.50.0.jar[ivy:cachepath] module not found:   biz.aQute#bnd;1.50.0[ivy:cachepath] ==== local: tried[ivy:cachepath]   /root/.ivy2/local/biz.aQute/bnd/1.50.0/ivys/ivy.xml[ivy:cachepath] -- artifact   biz.aQute#bnd;1.50.0!bnd.jar:[ivy:cachepath]   /root/.ivy2/local/biz.aQute/bnd/1.50.0/jars/bnd.jar[ivy:cachepath] ==== shared: tried[ivy:cachepath]   /root/.ivy2/shared/biz.aQute/bnd/1.50.0/ivys/ivy.xml[ivy:cachepath] -- artifact   biz.aQute#bnd;1.50.0!bnd.jar:[ivy:cachepath]   /root/.ivy2/shared/biz.aQute/bnd/1.50.0/jars/bnd.jar[ivy:cachepath] ==== public: tried[ivy:cachepath] http://repo1.maven.org/maven2/biz/aQute/bnd/1.50.0/bnd-1.50.0.pom[ivy:cachepath] -- artifact   biz.aQute#bnd;1.50.0!bnd.jar:[ivy:cachepath] http://repo1.maven.org/maven2/biz/aQute/bnd/1.50.0/bnd-1.50.0.jar[ivy:cachepath]   ::::::::::::::::::::::::::::::::::::::::::::::[ivy:cachepath] :: UNRESOLVED   DEPENDENCIES ::[ivy:cachepath]   ::::::::::::::::::::::::::::::::::::::::::::::[ivy:cachepath] :: biz.aQute#bnd;1.50.0:   not found[ivy:cachepath]   ::::::::::::::::::::::::::::::::::::::::::::::[ivy:cachepath][ivy:cachepath] :: USE VERBOSE OR DEBUG   MESSAGE LEVEL FOR MORE DETAILSBUILD FAILED/home/dir/dir_lz4-java_1.3.0/lz4-java-1.3.0/build.xml:87:   impossible to resolve dependencies:resolve failed - see   output for details ivy*.jar 低版本替换成高版本。如:/root/.ant/lib/ivy-2.2.0.jar 替换成ivy-2.4.0.jar7.3 kafka 消费者或生产者启动失败故障排除1、出现如下错误提示:root@Linx:~/kafka_2.11-0.10.1.1#   bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginningUsing the ConsoleConsumer with old consumer is   deprecated and will be removed in a future major release. Consider using the   new consumer by passing [bootstrap-server] instead of [zookeepe                   r].[2019-08-28 10:40:56,432] WARN Fetching topic metadata   with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,Linx,9092)]   failed (kafka.client.ClientUtils$)java.nio.channels.ClosedChannelExceptionat   kafka.network.BlockingChannel.send(BlockingChannel.scala:110)at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)at   kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)at   kafka.producer.SyncProducer.send(SyncProducer.scala:124)at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)at   kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)at   kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)at   kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)[2019-08-28 10:40:56,437] WARN   [console-consumer-46411_Linx-1567006855849-6e1e43c8-leader-finder-thread],   Failed to find leader for Set(test-0)   (kafka.consumer.ConsumerFetcherManager$Lead                   erFinderThread)kafka.common.KafkaException: fetching topic metadata   for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(0,Linx,9092))]   failedat   kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)at   kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)at   kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)Caused by: java.nio.channels.ClosedChannelExceptionat kafka.network.BlockingChannel.send(BlockingChannel.scala:110)at   kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)at   kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)at   kafka.producer.SyncProducer.send(SyncProducer.scala:124)at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)。 启动失败的原因分析:上面红色标注,是一个hostname。此hostname 不能被系统正常解析。解决办法如下:l   修改配置:vim ./config/server.propertieslisteners=PLAINTEXT://localhost:9092 l   主机名Linx配置有效的IP:【来自转载】 
  • [技术干货] Kafka-2.11-1.1.0移植编译指导--中标麒麟7.5
    1简介Apache Kafka是一个开源流处理软件平台,用Scala和Java编写。该项目旨在提供统一、高吞吐量、低延迟的平台,用于处理实时数据馈送。它的存储层本质上是一个大规模可扩展设计为分布式事务日志的发布/订阅消息队列。官方链接:https://kafka.apache.org/类别:开源流处理平台语言:Scala/Java 2环境类别子项版本获取地址(方法) 硬件CPUKunpeng 920iBMC网络Ethernet-10GEiBMC存储SATA 4TiBMC内存xxG xxxMHziBMCOSNeoKylin7.5cat /etc/neokylin-releaseKernel4.14.0cat /proc/version 软件GCC4.8.5gcc -vOpenJDK 1.8.0_191参考下面安装说明Kafka2.11-1.1.0参考下面安装说明3依赖安装3.1安装OpenJDK下载并安装到指定目录(如/opt/tools/installed):wget https://github.com/AdoptOpenJDK/openjdk8-binaries/releases/download/jdk8u191-b12/OpenJDK8U-jdk_aarch64_linux_hotspot_8u191b12.tar.gztar -zxf OpenJDK8U-jdk_aarch64_linux_hotspot_8u191b12.tar.gzmv jdk8u191-b12   /opt/tools/installed/【注】:使用系统自带Openjdk软件包可以通过执行“yum -y install java-1.8.0*”进行安装然后参考如下步骤配置JAVA_HOME环境变量;配置java环境变量,在/etc/profile文件末尾处增加下面的代码:JAVA_HOME=/opt/tools/installed/jdk8u191-b12PATH=$JAVA_HOME/bin:$PATHexport JAVA_HOME PATH运行下面命令,使修改的环境变量生效:source /etc/profile3.2安装GCC等依赖项挂载OS镜像:mount -o loop   /home/NeoKylin/nsV7Update5-adv-lic-build05-aarch64.iso /mnt/NeoKylin/修改/etc/yum.repos.d/ns7-adv.repo文件,配置yum本地源:[ns7-adv-os]name=NeoKylin Linux Advanced Server 7 - Osbaseurl= file:///mnt/NeoKylingpgcheck=0gpgkey=file:///mnt/NeoKylin/RPM-GPG-KEY-neokylin-releaseenabled=1        运行下面的命令,使yum源配置生效:yum clean allyum makecacheyum安装GCC等相关依赖:sudo yum install -y snappy snappy-devel autoconf automake libtool   git gcc gcc-c++ make cmake openssl openssl-devel ncurses-devel zlib   zlib-devel bzip2 bzip2-devel bzip2-libs readline readline-devel bison zip   unzip tar tcl3.3安装gradle编译软件1、下载grade开源软件包wget   https://downloads.gradle.org/distributions/gradle-5.4.1-bin.zip2、解压gradle-5.4.1-bin.zipunzip gradle-5.4.1-bin.zip3、进入解压路径cd gradle-5.4.1/        4、将GRADLE_HOME配置到/etc/profile环境变量中vi /etc/proflile5、使GRADLE_HOME环境变量生效source /etc/proflile 3.4安装编译gradle-scoverage1、下载gradle-scoverage开源软件包wget   https://github.com/scoverage/gradle-scoverage/archive/3.1.3.tar.gz2、解压3.1.3.tar.gztar zxvf 3.1.3.tar.gz3、进入解压路径cd gradle-scoverage-3.1.3/        4、使用gradle编译gradle-scoverage-3.1.3gradle assemble 3.5安装编译shadow1、下载shadow开源软件包wget https://github.com/johnrengelman/shadow/archive/5.0.0.tar.gz2、解压5.0.0.tar.gztar zxvf 5.0.0.tar.gz3、进入解压路径cd shadow-5.0.0/        4、使用gradle编译shadow-5.0.0gradle assemble 3.6安装scala软件1、下载scala安装软件包wget   https://downloads.lightbend.com/scala/2.11.11/scala-2.11.11.tgz2、解压scala-2.11.11.tgztar zxvf scala-2.11.11.tgz -d /usr/local3、将SCALA_HOME配置到/etc/profile环境变量中vi /etc/proflile增加如下内容export   SCALA_HOME=/usr/local/scla-2.11.11export PATH=$PATH:$SCALA_HOME/bin5、使SCALA_HOME环境变量生效source /etc/proflile 3.7对gcc、g++和c++增加-fsigned-char选项1、对gcc增加-fsigned-char选项1)使用which gcc命令寻找gcc所在路径(一般位于/usr/bin/gcc)which gcc2)、更改gcc的名字(比如改成gcc-arm)mv /usr/bin/gcc   /usr/bin/gcc-arm3)、进入gcc所在目录执行vi gcc,并填入如下内容保存:#! /bin/sh /usr/bin/gcc-arm -fsigned-char "$@"cd /usr/bin/vi gcc4)、执行chmod +x gcc给脚本添加执行权限chmod +x gcc2、对g++增加-fsigned-char选项1)使用which g++命令寻找g++所在路径(一般位于/usr/bin/g++)which g++2)、更改g++的名字(比如改成g++-arm)mv /usr/bin/g++   /usr/bin/g++-arm3)、进入g++所在目录执行vi g++,并填入如下内容保存:#! /bin/sh /usr/bin/g++-arm -fsigned-char "$@"cd /usr/bin/vi g++4)、执行chmod +x g++给脚本添加执行权限chmod +x g++3、对c++增加-fsigned-char选项1)使用which c++命令寻找g++所在路径(一般位于/usr/bin/c++)which c++2)、更改c++的名字(比如改成c++-arm)mv /usr/bin/c++   /usr/bin/c++-arm3)、进入c++所在目录执行vi c++,并填入如下内容保存:#! /bin/sh /usr/bin/c++-arm -fsigned-char "$@"cd /usr/bin/vi c++4)、执行chmod +x c++给脚本添加执行权限chmod +x c++ 4移植分析使用checkSo工具(获取地址:https://bbs.huaweicloud.com/forum/thread-22679-1-1.html)检查Kafka-2.11-1.1.0对应x86的安装包(kafka_2.11-1.1.0.tgz)或源码包是否有依赖x86的so文件,通过检查Kafka-2.11-1.1.0对应x86安装包发现有依赖的“librocksdbjni32.so,rocksdbjni-5.7.3.jar”、“librocksdbjni64.so,rocksdbjni-5.7.3.jar”和“librocksdbjnile.so,rocksdbjni-5.7.3.jar”:需要先编译rocksdbjni-5.7.3.jar使其中包含的librocksdbjni32.so、librocksdbjni64.so和librocksdbjnile.so使能arm,编译过程见 “5 依赖库编译”。5依赖库编译1、编译rocksdbjni-5.7.3.jar1)下载源码并解压wget   https://codeload.github.com/facebook/rocksdb/zip/v5.7.3mv   v5.7.3 rocksdbjni-5.7.3.zipunzip   rocksdbjni-5.7.3.zip2)、进入解压目录cd rocksdb-5.7.3/        3)、修改Makefile将DEBUG_LEVEL设为0,并添加-fsigned-char选项4)、编译rocksdbjavaPORTABLE=1   make rocksdbjava -j8        5)、进入编译结果路径cd   java/target/6)、重命名rocksdbjni-5.7.3-linux64.jar并将其拷贝到本地gradle仓cp   rocksdbjni-5.7.3-linux64.jar rocksdbjni-5.7.3.jarcp rocksdbjni-5.7.3.jar   /gradleRepository/caches/modules-2/files-2.1/org.rocksdb/rocksdbjni/5.7.3/421b44ad957a2b6cce5adedc204db551831b553d/ 6编译安装1、从github网站下载源码并解压wget   https://archive.apache.org/dist/kafka/1.1.0/kafka-1.1.0-src.tgztar   zxvf kafka-1.1.0-src.tgz2、进入解压目录cd   kafka-1.1.0-src/        3、修改build.gradlevi build.gradle把gradle-scoverage、shadow的版本修改成当前使用的版本,注释红框内注释行,并添加    classpath   'org.scoverage:gradle-scoverage:3.1.3'    classpath   'com.github.jengelman.gradle.plugins:shadow:5.0.0'4、执行gradle -g /$UserHome/gradleRepository cleangradle   -g /$UserHome/gradleRepository clean5、执行gradle -g /$UserHome/gradleRepository releaseTarGz -infogradle   -g /$UserHome/gradleRepository releaseTarGz -info6、查看编译的文件cd   core/build/distributions/ 7验证1、编译验证使用checkSo工具(获取地址:https://bbs.huaweicloud.com/forum/thread-22679-1-1.html)检查编译后的kafka_2.11-1.1.0.tgz是否不再含有依赖的x86架构的so文件,检查方法参考工具中的 《CheckSo使用说明.docx》,如果还有依赖的x86架构的so文件,则需要继续完成对应jar包的编译,直到无依赖的x86架构的so文件后,再次编译该组件并通过checkSo工具检查确认,编译后的组件包不再有依赖的x86架构的so文件则表明编译成功。2、功能验证参考https://www.huaweicloud.com/kunpeng/software/kafka.html8参考信息1、https://www.huaweicloud.com/kunpeng/software/kafka.html2、http://kafka.apache.org/ 9FAQ暂无。【来自转载https://bbs.huaweicloud.com/forum/thread-41082-1-1.html】 
  • [Atlas300] kafka模块在atlas的engine中无法执行
    如图,同样一段源码生成的so函数库封装了KCF和kafka的功能,使用自己的单元测试工具可以成功调用并执行完毕,而把该模块链接到atlas在host端的engine后配置完全一样的配置文件,却无法完成kafka接口的初始化,整个graph无法成功建立并执行,请问这是什么原因以及如何处理
  • [中间件] 【华为云鲲鹏云服务最佳实践】【工具篇】第087期Kafka-2.12安装配置指南
    1、简介   Apache Kafka是一个开源流处理软件平台,用Scala和Java编写。该项目旨在提供统一、高吞吐量、低延迟的平台,用于处理实时数据馈送。它的存储层本质上是一个大规模可扩展设计为分布式事务日志的发布/订阅消息队列。2、基础环境类别子项版本获取地址(方法)华为云虚拟机KC1(920)--OSCentOS7.6Kernel4.14软件包 Zookeeper3.4.9http://archive.apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gzkafka2.12http://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgzjava1.8.03、依赖安装yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel4、组件安装下载并解压组件wet http://archive.apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gzwet http://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz 配置cp -rp zoo_sample.cfg zoo.cfg启动服务./bin/zkServer.sh start 查看运行状态./bin/zkServer.sh status  kafka启动bin/kafka-server-start.sh config/server.properties &netstat -tunlp |egrep "(2181|9092)"kafka 停止服务bin/kafka-server-stop.sh5、系统配置    无6、测试 重新启动kafka服务   创建生产者:bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test561   创建消费者: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test561 --from-beginning   测试内容:发送消息                    测试内容:接收消息              7、参考资料   无8、FAQQ:执行./bin/kafka-console-consumer.sh --zookeeper 172.17.0.11:2181 --topic test561 --from-beginning创建消费者报错 A:bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.11:9092 --topic test561 --from-beginning
  • 【中间件最佳实践挑战】第4关任务:使用DMS Kafka优化消费者poll
    欢迎参加华为云“中间件最佳实践挑战营”!这是本次挑战营的第4关,坚持闯关成功有机会获第五期好礼:码豆/荣耀手环/华为背包等,全通关还有大奖!本期活动截止5月24日。注意:参与闯关前,请确保已报名加入活动群并领取实践资源,如未入群请添加小助手微信(zhongjianjianxiaoge),回复“中间件”报名入群!点击这里了解活动详情>>  | 点击这里查看活动FAQ>>一、  场景介绍在DMS提供的原生Kafka SDK中,消费者可以自定义拉取消息的时长,如果需要长时间的拉取消息,只需要把poll(long)方法的参数设置合适的值即可。但是这样的长连接可能会对客户端和服务端造成一定的压力,特别是分区数较多且每个消费者开启多个线程的情况下。 Kafka队列含有多个分区,消费组中有多个消费者同时进行消费,每个线程均为长连接。当队列中消息较少或者没有时,连接不断开,所有消费者不间断地拉取消息,这样造成了一定的资源浪费。 二、  实践指南(1)领取实践资源:点击这里免费领取1个月Kafka体验规格实例,可用区3/5已售罄,请选择2。提示:实践活动提供的免费Kafka实例没有开SASL,在配置时需做一定修改,见→FAQ第12条。(2)最佳实践指南:https://support.huaweicloud.com/bestpractice-dms/dms-bp-0312001.html(3)视频操作演示:https://education.huaweicloud.com:8443/courses/course-v1:HuaweiX+CBUCNXP021+Self-paced/courseware/062fe309dc964326b06b7e5505fe5e4a/89d814d93c544301a6cb62db0914cc51/(4)新手入门教学:《Kafka全景实践课》三、  闯关任务任务一:创建DMS Kafka实例,查看实例详情并截图,截图需包含右上角华为云账号名,并按回帖格式要求在本帖中回帖;奖励:100码豆(可用于兑换DevCloud会员中心超多奖品); 任务二:根据实践指南完成实践操作,将代码截图,将运行结果截图,并按回帖格式要求在本帖中回帖;奖励:本期闯1关可参与抽取“荣耀手环4 Running版”;闯2关可参与抽取“华为背包”;详见FAQ评奖规则 四、  回帖格式请务必按照以下格式要求进行回帖,否则无法计算奖励:华为云账号名:XXX(即右上角的字母数字组合ID)微信昵称:XXX实践感想:XXX实践截图:至少包含(a)实例详情截图、(b)代码截图、(c)运行结果截图三张截图。附各关卡快速入口:第1关任务:基于API网关的电话号码归属地查询第2关任务:使用函数工作流服务为图片打水印第3关任务:使用Redis实现排行榜功能第4关任务:使用DMS Kafka优化消费者poll第5关任务:使用CPTS进行电商网站性能测试
  • [分享交流] Kafka 组件的介绍
    Kafka 组件的介绍Kafka定义Kafka 是一个高吞吐、分布式、基于发布订阅A的消息系统,利用Kafka技术可在廉价PC Server上搭建起大规模消息系统。Kafka应用场景简介Kafka和其他组件比较,具有消息持久化、高吞吐、实时等特性,适用于离线和实时的消息消费,如网站活性跟踪、聚合统计系统运营数据(监控数据)、日志收集等大量数据的数据收集场景。 Kafka拓扑结构图一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干Broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举Leader,以及在Consumer发生变化时进行rebalance。Producer使用push模式将消息发布到Broker,Consumer使用pull模式从Broker订阅并消费消息。Broker:Kafka集群包含一个或多个服务实例,这些服务实例被称为Broker。Producer:负责发布消息到Kafka Broker。Consumer:消息消费者,从Kafka Broker读取消息的客户端。Kafka Topics每条发布到Kafka的消息都有一个类别,这个类别被称为Topic,也可以理解为一个存储消息的队列。例如:天气作为一个Topic,每天的温度消息就可以存储在“天气”这个队列里。图片中的蓝色框为Kafka的一个Topic,即可以理解为一个队列,每个格子代表一条消息。生产者产生的消息逐条放到Topic的末尾。消费者从左至右顺序读取消息,使用Offset来记录读取的位置。Kafka Partition每个Topic 都有一个或者多个Partitions构成。每个Partition都是有序且不可变的消息队列。引入Partition机制,保证了Kafka的高吞吐能力。每个topic被分成多个partition(区),每个partition在存储层面对应一个log文件,log文件中记录了所有的消息数据。引入Partition机制,保证了Kafka的高吞吐能力,因为Topic的多个Partition分布在不同的Kafka节点上,这样一来多个客户端(Producer和Consumer)就可以并发访问不同的节点对一个Topic进行消息的读写。 
  • [学习Kafka-第3天] Kafka消费机制实践
    Java代码:package dms.kafka.demo; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class KafkaConsumerDemo {     public static void main(String[] args) {         if (args.length != 3) {             throw new IllegalArgumentException("usage: dms.kafka.demo.KafkaProducerDemo bootstrap-servers topic-name group-name.");         }         Properties props = new Properties();         props.put("bootstrap.servers", args[0]);         props.put("group.id", args[2]);         props.put("enable.auto.commit", "true");         props.put("auto.offset.reset", "earliest");         props.put("auto.commit.interval.ms", "1000");         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");         KafkaConsumer consumer = new KafkaConsumer(props);         consumer.subscribe(Arrays.asList(args[1]));         while (true) {             ConsumerRecords records = consumer.poll(200);             for (ConsumerRecord record : records){                 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());             }         }     } }1.打包。点击打包,生成demo-2.jar2.上传服务器:进入linux服务器,进入Kafka的libs目录下使用sz命令,把demo-2.jar传入到这个目录。3.生产消息使用cd..命令回退到上级目录。执行命令(请替换自己的ip,端口号,topic名):java -cp .:./libs/* dms.kafka.demo.KafkaProducerDemo 192.168.236.49:9092 topic-1393626260执行结果如下(红框里根据个人不同替换):4.消费消息执行命令(请替换自己的ip,端口号,topic名)::java -cp .:./libs/* dms.kafka.demo.KafkaConsumerDemo 192.168.236.49:9092 topic-1393626260 test-grp结果如下(红框里根据个人不同替换):到此,今天的课程完成。附件里是今天的代码jar包(因为不支持直接上传jar后缀文件,只能压缩打包后传的。)。
  • 【FAQ】Kafka全景实践课程FAQ文档--群内提问前请查询此文档
    Hello!欢迎参加Kafka全景实践课!这是一篇帮助帖,我们会将每节课的常见问题和难点疑点记录在帖子中。为减少群内打扰,节省大家的时间,请遇到问题后先查阅本帖内容,如没有对应解答,再在用户群中提问哦! 【常用链接】(1)每日课程更新链接:https://education.huaweicloud.com:8443/courses/course-v1:HuaweiX+CBUCNXP017+Self-paced/about?isAuth=0&cfrom=hwc备注:8月12日-16日每日上午10点发布当天课程内容(2)打卡链接:https://w.url.cn/s/AX2jKda 备注:打卡时间为每日10:00-24:00,需根据操作指导上传对应截图【积分公示】本期课程活动(2019年8月12日-16日)已结束,积分及抽奖资格公示见下方附件PDF。特别说明:由于有的同学一天打了多次卡,有的打卡没有上传截图,导致计算时识别到没有截图记为无成绩。先将这部分同学补充如下,且附件PDF名单已修正:
  • [其他] 启用LVM特性的Kafka集群磁盘在线扩容
    MRS 1.8.5及以后的版本,都支持在流式节点上开启LVM特性。LVM特性能有效防止kafka多磁盘场景下因为单topic流量特别大时导致某个磁盘被写爆。同时开启LVM以后可以做到不重启系统、服务或组件的情况下实现磁盘平滑扩容,保证业务的连续性。下面我就介绍一下如何在开启了LVM的节点上实现Kafka的磁盘扩容操作。1.        购买云硬盘并挂载。a)    登录管理控制台。b)     选择“存储 > 云硬盘”。进入云硬盘页面。c)     单击“购买磁盘”,创建云硬盘。关于创建云硬盘的详细操作,请参见云硬盘用户指南。d)     在云硬盘列表,找到新购买的云硬盘,单击“挂载”。e)     选择云硬盘待挂载的云服务器,该云服务器必须与云硬盘位于同一个可用分区,通过下拉列表选择“挂载点”。2.        以root用户登录弹性云服务器。3.        执行如下命令,查看磁盘并记录新添加设备名称。如“/dev/vdc”fdisk -l | grep /dev/vd | grep -v vda4.        执行如下命令,将新挂载的磁盘创建为物理卷。pvcreate /dev/vdc5.        执行如下命令,查询卷组名称。vgdisplay6.        执行如下命令,添加物理卷到卷组中,对卷组进行扩容。vgextend vg_group /dev/vdc7.        执行如下命令,查询逻辑卷路径lvdisplay8.        执行如下命令,扩展逻辑卷的容量lvextend -L +99GB /dev/mapper/vg_group-core9.        执行如下命令,扩展文件系统的容量。到此,单个kafka节点的磁盘扩容完成。resize2fs /dev/mapper/vg_group-core 10.        重复以上步骤,对所有kafka节点进行磁盘扩容。
  • 【限时免费】华为云专家带你5节课玩转Kafka!赢蓝牙音箱、书籍好礼!
     想学习当下热门的分布式消息系统Kafka?华为云中间件专家Hleecs倾囊相授5节课带你零基础入门,从入门到实战轻松玩转Kafka!扫码关注“中间件小哥”公众号,回复“Kafka”即可免费报名!本期课程结合华为云分布式消息服务Kafka,从基础原理入门到实践操作,循序渐进一站式学习。5节实战精品课,囊括创建topic,生产消息、消费消息,编写生产/消费代码,Kafka服务架构机制、常用工具使用等内容,让你系统性掌握Kafka。 参与打卡任务,还有机会获得50元京东卡、Kafka书籍、华为蓝牙音箱等超值奖品,知识+好礼双丰收!   活动过程如遇大量用户参与,会导致暂时无法添加助手微信号,大家可以稍等一段时间后再试。活动中有任何疑问,请添加智能应用平台小助手(微信:zhongjianjianxiaoge)咨询另外,对本次活动有任何想法和建议,欢迎在评论区回帖哦~ 
总条数:177 到第
上滑加载中