-
分布式消息服务DMS和分布式消息服务Kafka是什么关系?
-
分布式消息服务DMS和分布式消息服务Kafka是什么关系?
-
如图,同样一段源码生成的so函数库封装了KCF和kafka的功能,使用自己的单元测试工具可以成功调用并执行完毕,而把该模块链接到atlas在host端的engine后配置完全一样的配置文件,却无法完成kafka接口的初始化,整个graph无法成功建立并执行,请问这是什么原因以及如何处理
-
1、简介 Apache Kafka是一个开源流处理软件平台,用Scala和Java编写。该项目旨在提供统一、高吞吐量、低延迟的平台,用于处理实时数据馈送。它的存储层本质上是一个大规模可扩展设计为分布式事务日志的发布/订阅消息队列。2、基础环境类别子项版本获取地址(方法)华为云虚拟机KC1(920)--OSCentOS7.6Kernel4.14软件包 Zookeeper3.4.9http://archive.apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gzkafka2.12http://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgzjava1.8.03、依赖安装yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel4、组件安装下载并解压组件wet http://archive.apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gzwet http://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz 配置cp -rp zoo_sample.cfg zoo.cfg启动服务./bin/zkServer.sh start 查看运行状态./bin/zkServer.sh status kafka启动bin/kafka-server-start.sh config/server.properties &netstat -tunlp |egrep "(2181|9092)"kafka 停止服务bin/kafka-server-stop.sh5、系统配置 无6、测试 重新启动kafka服务 创建生产者:bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test561 创建消费者: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test561 --from-beginning 测试内容:发送消息 测试内容:接收消息 7、参考资料 无8、FAQQ:执行./bin/kafka-console-consumer.sh --zookeeper 172.17.0.11:2181 --topic test561 --from-beginning创建消费者报错 A:bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.11:9092 --topic test561 --from-beginning
-
Kafka 组件的介绍Kafka定义Kafka 是一个高吞吐、分布式、基于发布订阅A的消息系统,利用Kafka技术可在廉价PC Server上搭建起大规模消息系统。Kafka应用场景简介Kafka和其他组件比较,具有消息持久化、高吞吐、实时等特性,适用于离线和实时的消息消费,如网站活性跟踪、聚合统计系统运营数据(监控数据)、日志收集等大量数据的数据收集场景。 Kafka拓扑结构图一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干Broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举Leader,以及在Consumer发生变化时进行rebalance。Producer使用push模式将消息发布到Broker,Consumer使用pull模式从Broker订阅并消费消息。Broker:Kafka集群包含一个或多个服务实例,这些服务实例被称为Broker。Producer:负责发布消息到Kafka Broker。Consumer:消息消费者,从Kafka Broker读取消息的客户端。Kafka Topics每条发布到Kafka的消息都有一个类别,这个类别被称为Topic,也可以理解为一个存储消息的队列。例如:天气作为一个Topic,每天的温度消息就可以存储在“天气”这个队列里。图片中的蓝色框为Kafka的一个Topic,即可以理解为一个队列,每个格子代表一条消息。生产者产生的消息逐条放到Topic的末尾。消费者从左至右顺序读取消息,使用Offset来记录读取的位置。Kafka Partition每个Topic 都有一个或者多个Partitions构成。每个Partition都是有序且不可变的消息队列。引入Partition机制,保证了Kafka的高吞吐能力。每个topic被分成多个partition(区),每个partition在存储层面对应一个log文件,log文件中记录了所有的消息数据。引入Partition机制,保证了Kafka的高吞吐能力,因为Topic的多个Partition分布在不同的Kafka节点上,这样一来多个客户端(Producer和Consumer)就可以并发访问不同的节点对一个Topic进行消息的读写。
-
MRS 1.8.5及以后的版本,都支持在流式节点上开启LVM特性。LVM特性能有效防止kafka多磁盘场景下因为单topic流量特别大时导致某个磁盘被写爆。同时开启LVM以后可以做到不重启系统、服务或组件的情况下实现磁盘平滑扩容,保证业务的连续性。下面我就介绍一下如何在开启了LVM的节点上实现Kafka的磁盘扩容操作。1. 购买云硬盘并挂载。a) 登录管理控制台。b) 选择“存储 > 云硬盘”。进入云硬盘页面。c) 单击“购买磁盘”,创建云硬盘。关于创建云硬盘的详细操作,请参见云硬盘用户指南。d) 在云硬盘列表,找到新购买的云硬盘,单击“挂载”。e) 选择云硬盘待挂载的云服务器,该云服务器必须与云硬盘位于同一个可用分区,通过下拉列表选择“挂载点”。2. 以root用户登录弹性云服务器。3. 执行如下命令,查看磁盘并记录新添加设备名称。如“/dev/vdc”fdisk -l | grep /dev/vd | grep -v vda4. 执行如下命令,将新挂载的磁盘创建为物理卷。pvcreate /dev/vdc5. 执行如下命令,查询卷组名称。vgdisplay6. 执行如下命令,添加物理卷到卷组中,对卷组进行扩容。vgextend vg_group /dev/vdc7. 执行如下命令,查询逻辑卷路径lvdisplay8. 执行如下命令,扩展逻辑卷的容量lvextend -L +99GB /dev/mapper/vg_group-core9. 执行如下命令,扩展文件系统的容量。到此,单个kafka节点的磁盘扩容完成。resize2fs /dev/mapper/vg_group-core 10. 重复以上步骤,对所有kafka节点进行磁盘扩容。
-
1、简介Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。本次移植以Kafka-2.1.0为例进行说明官方链接:https://kafka.apache.org/类别:应用程序2、环境类别子项版本获取地址(方法)ECS规格arm通用计算增强型|c3.xlarge.4|4vCPUs| 14GB---OSCentOS7.5--Kernel4.14--PackageKafka2.1.0https://codeload.github.com/apache/kafka/tar.gz/2.1.0OpenJdkJdk8u191-b12https://github.com/AdoptOpenJDK/openjdk8-binaries/releases/download/jdk8u191-b12/OpenJDK8U-jdk_aarch64_linux_hotspot_8u191b12.tar.gz Gradle4.10https://downloads.gradle.org/distributions/gradle-5.4-bin.zip Scala2.12.0https://downloads.lightbend.com/scala/2.12.0/scala-2.12.0.tgz 3、依赖安装 通用工具Gradle、Jdk8u191-b12、Scala-2.12.0Gradle4.10安装1、下载地址 执行:wget https://downloads.gradle.org/distributions/gradle-5.4-bin.zip2、解压 执行:yum install unzip 执行::unzip -d /home/tools/gradle/gradle-5.4-bin.zip3、设置环境变量 执行:vim /etc/profile 在配置文件最后加上:export GRADLE_HOME=/home/tools/gradle/gradle-5.4export PATH=$ GRADLE _HOME/bin:$PATH 保存后执行: source /etc/profilejdk8u191-b12安装1、下载地址:执行:wget https://github.com/AdoptOpenJDK/openjdk8-binaries/releases/download/jdk8u191-b12/OpenJDK8U-jdk_aarch64_linux_hotspot_8u191b12.tar.gz2、解压 执行:cd /home/tools 执行:tar -zxvf OpenJDK8U-jdk_aarch64_linux_hotspot_8u191b12.tar.gz3、设置环境变量 执行:vim /etc/profileexport JAVA_HOME=/home/tools/jdk8u191-b12export PATH=${JAVA_HOME}/bin:$PATHexport CLASSPATH=.:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar 执行: source /etc/profile Scala-2.12.0安装1、下载地址:wget https://downloads.lightbend.com/scala/2.12.0/scala-2.12.0.tgz2、解压 执行:cd /home/tools 执行:tar -zxvf scala-2.12.0.tgz3、设置环境变量 执行:vim /etc/profileexport SCALA_HOME=/home/tools/scala-2.12.0export PATH=${SCALA_HOME}/bin:$PATH 执行: source /etc/profile 4、组件编译安装4.1 下载源码 从Kafka官网下载kafka-2.1.0的源码, 执行:wget -O kafka-2.1.0.tar.gz https://codeload.github.com/apache/kafka/tar.gz/2.1.04.2 解压 执行:tar -zxvf kafka-2.1.0.tar.gz 进行解压 执行:cd kafka-2.1.0 进入源码目录4.3 执行编译命令执行:mkdir /usr/gradleRepository (gradleRepository为gradle本地仓库目录需要手动创建)执行:gradle -g /usr/gradleRepository clean(注意:此时目录为kafka-2.1.0) 执行:gradle -g /usr /gradleRepository releaseTarGz (按实际情况选择编译命令)4.4 安装部署1、配置 执行:cd kafka-2.1.0 执行:vim config/server.properties #配置zookeeper和主机名 host.name=localhost zookeeper.connect= localhost:21812、配置环境变量 执行:vim /etc/profileexport KAFKA_HOME=/xxxx/kafka-2.1.0export PATH=${KAFKA_HOME}/bin:$PATH 执行: source /etc/profile3、启动Kafka 执行:zookeeper-server-start.sh config/zookeeper.properties & 执行:kafka-server-start.sh config/server.properties &4、检查进程启动情况: 执行:jps 如果kafka进程:Kafka、QuorumPeerMain两个进程启动成功,整个Kafka服务启动完成。 5、测试Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。1、shell_1创建topic: 执行: kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test2、shell_2生产数据: 执行: kafka-console-producer.sh --broker-list localhost:9092 --topic test3、shenll_3消费数据 执行: kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning 在shell_2中的producer生产端可以输入文本,在shell_3中的consumer消费端可以查看到刚刚producer端生产的文本。如果需要了解更多的命令,建议查看kafka官网。6、参考信息 官方链接:https://kafka.apache.org Github链接:https://github.com/apache/kafka Kafka配置可以参考官网指导(https://kafka.apache.org/documentation/#configuration) 7、FAQ 待补充
-
1.配置python3环境安装步骤:# 配置华为云欧拉镜像源wget http://mirrors.myhuaweicloud.com/repo/mirrors_source.sh && sh mirrors_source.sh # 安装必要的工具,用来编译python3yum groupinstall "Development tools" -yyum -y install zlib zlib-develyum -y install bzip2 bzip2-develyum -y install ncurses ncurses-develyum -y install readline readline-develyum -y install openssl openssl-develyum -y install openssl-staticyum -y install xz lzma xz-develyum -y install sqlite sqlite-develyum -y install gdbm gdbm-develyum -y install tk tk-develyum -y install libffi libffi-devel # 下载,解压python3的tgz包,也可以自行去python官网下一个# 推荐使用Python-3.6.X版本wget https://www.python.org/ftp/python/3.6.7/Python-3.6.7.tgztar -zxvf Python-3.6.7.tgzcd Python-3.6.7 # 配置信息及编译安装,安装到/opt/Bigdata/python3目录下,也可以自行指定./configure --prefix=/opt/Bigdata/python3 --enable-shared CFLAGS=-fPICmake && make install #配置变量echo "/opt/Bigdata/python3/lib" >> /etc/ld.so.confldconfig ln -s /opt/Bigdata/python3/bin/python3 /usr/bin/python3ln -s /opt/Bigdata/python3/bin/pip3 /usr/bin/pip3 2.安装kafka-python#kafka-python安装:pip3 install kafka-pythonpip3 install gssapi 3.运行kafka-python#运行步骤:#在安装有mrs客户端的节点上,例如客户端安装在/opt/client下source /opt/client/bigdata_envkinit kafka用户,例如kinit admin,输入密码#执行python3脚本#脚本样例:#producer:from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers=["broker_ip:21007"], security_protocol="SASL_PLAINTEXT", sasl_mechanism="GSSAPI", sasl_kerberos_service_name="kafka", sasl_kerberos_domain_name="hadoop.hadoop.com")for _ in range(100): response = producer.send("test-topic", b"testmessage") result = response.get(timeout=50) print(result) #consumer:from kafka import KafkaConsumer consumer = KafkaConsumer("test-topic", bootstrap_servers=["broker_ip:21007"], group_id="test-group", enable_auto_commit="true", security_protocol="SASL_PLAINTEXT", sasl_mechanism="GSSAPI", sasl_kerberos_service_name="kafka", sasl_kerberos_domain_name="hadoop.hadoop.com")for message in consumer: print(message)
-
1. Kafka manager不发布可运行包,需要自己编译Github地址:https://github.com/yahoo/kafka-manager编译打包命令:./sbt clean dist这里使用的版本为 1.3.3.22编译完成后解压到kafka manager安装目录2. 在运行kafka broker节点执行命令ps –ef | grep kafka.Kafka,获取进程信息中的以下变量值:-Djava.security.auth.login.config=/****/jaas.conf #jaas.conf文件路径-Djava.security.krb5.conf=/****/kdc.conf #kdc配置文件路径-Dkerberos.domain.name=domin_name #krbs domain name打开jaas.conf文件,查看以下配置项的值:keyTab="/***/kafka.keytab" #keytab文件路径将jaas.conf文件、kdc配置文件、keytab文件拷贝到安装kafka manager的机器3. 到安装kafka manager的机器编辑jaas.conf文件将keyTab配置项的值改为本机上keytab文件的路径复制Client模块,粘贴并改名为KafkaClient模块,其他内容不作改动。完成后jaas.conf样例如下:KafkaServer {com.sun.security.auth.module.Krb5LoginModule requireddebug=falsekeyTab="example keytab file path"useTicketCache=falsestoreKey=trueprincipal="example principal "useKeyTab=true;}; Client {com.sun.security.auth.module.Krb5LoginModule requiredstoreKey=trueprincipal="example principal "useTicketCache=falsekeyTab="example keytab file path "debug=falseuseKeyTab=true;}; KafkaClient {com.sun.security.auth.module.Krb5LoginModule requiredstoreKey=trueprincipal="example principal "useTicketCache=falsekeyTab="example keytab file path "debug=falseuseKeyTab=true;};新建consumer.properties文件,内容如下,替换加粗部分:key.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializersecurity.protocol = SASL_PLAINTEXTvalue.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializerkerberos.domain.name = 步骤2中获得的krbs domain namebootstrap.servers = broker地址sasl.kerberos.service.name = kafka4. 到kafka manager安装目录下,修改./conf/application.conf中的以下配置项:kafka-manager.zkhosts #配置为zookeeper 地址,默认有2个重复配置项,只保留1个kafka-manager.consumer.properties.file #配置为步骤3中的consumer.properties路径5. 将./lib目录下的org.apache.kafka.kafka-clients-1.1.0.jar和org.apache.zookeeper.zookeeper-3.4.10.jar替换为mrs发布jar包,可在kafka broker lib目录下获取6. 到kafka manager安装目录,替换以下命令中的加粗部分并执行:./bin/kafka-manager -Dconfig.file=步骤4中修改的application.conf文件路径 -Dhttp.port=kafka manager监听端口 -Dhttp.address=kafka manager监听网卡ip地址 -Djava.security.auth.login.config=步骤3中修改的jaas.con文件路径 -Djava.security.krb5.conf=步骤2中获得的kdc.conf文件路径 -Dzookeeper.request.timeout=120000 -Dkerberos.domain.name=步骤2中获得的krbs domain name7. 在网络连通的节点通过浏览器访问地址:http://ip:port, ip和port为步骤6中启动命令中的kafka manager监听网卡ip地址和kafka manager监听端口
-
如题?
-
尊敬的华为云客户:华为云计划于2018/08/15 00:00:00将分布式消息服务Kafka专享版、RabbitMQ队列正式商用。商用后,Kafka专享版和RabbitMQ队列支持按需和包周期计费,具体价格请届时参考该服务的计费详情页。公测期间体验服务的用户,商用后将会按需计费,为保证业务的连续性,请您及时关注账户,确保账户余额充足;如您体验结束,不再需要使用该服务,建议您尽快删除队列,以免账户扣款。Kafka专享版和RabbitMQ队列商用版本采用更高性能主机,性能强劲。具有多种规格实例供选择,满足用户不同场景需要;提供SSL能力,具备更高的安全性。更多关于分布式消息服务的产品介绍,请您点击了解。如您在使用过程中有宝贵意见,欢迎您拨打华为云服务热线:4000-955-988与我们联系。感谢您对华为云的支持!
-
本帖最后由 石头剪刀布 于 2018-5-29 11:22 编辑操作场景常有用户需要将云下的数据搬到云上MRS服务中,但由于MRS集群的节点均在VPC网络中,外部是无法直接访问的,那如果需要将线下的数据搬到云上MRS服务中,该如何处理呢?本文介绍一种方式,如何通过公网访问MapReduceService的Kafka服务。前提条件l 已创建MRS集群 操作步骤1.1 绑定弹性IP步骤 1 登录华为云控制台,选择“EI企业智能 > MapReduce服务”,点击已经创建的集群名称,进入到集群详情页面。步骤 2 在节点信息列表中,选择任意一台“类型”为“Core”的节点,点击打开该节点,在新打开的页面中选择“弹性IP”,点击“绑定弹性IP”。在弹出框中,选择已创建的弹性IP进行绑定。 15171如果当前没有弹性IP,则需要新创建,新创建的弹性IP建议选择“按流量计费” 15172 步骤 3 对集群中的节点信息列表中的其他两个Core节点重复“步骤2”,即给另外两个Core节点都绑定弹性IP。 1.2 开通安全组步骤 1 登录华为云控制台,选择“EI企业智能 > MapReduce服务”,点击已经创建的集群名称,进入到集群详情页面。步骤 2 在节点信息列表中,选择任意一台“类型”为“Core”的节点,点击打开该节点,在新打开的页面中选择“安全组”,点击已经绑定的安全组链接。 15173 在新打开的页面中,选择“添加规则”,在新弹出的框中按如下进行配置 15174 即允许任意IP地址访问21005端口。注意: MRS 1.7.0及以后的版本21005端口改成了与开源一致,即 9092端口 注意:此示例中直接开放了任意IP访问21005端口,这种做法不安全,在实际的生产环境中,需要限制只有特定的IP才能访问,保证安全性。源IP地址填写 指定的IP 即可。步骤 3 Core节点共用一个安全组规格,不需要重复进行配置。 1.3 修改Kafka配置 步骤 1 登录华为云控制台,选择“EI企业智能 > MapReduce服务”,点击已经创建的集群名称,进入到集群详情页面。 步骤 2 点击“集群管理页面”后面的“点击查看”链接,进入到MRS Manager页面。15175步骤 3 在MRS Manager页面中选择“服务管理 > Kafka > 服务配置”,“参数类别”选择“全部配置”,在左树中找到“Broker > 自定义”,然后在参数列表中添加“advertised.listeners”,值填写“PLAINTEXT://:21005,SSL://:21008”MRS 1.7.0及以后的版本值填写为“PLAINTEXT://:9092,SSL://:9093” 15176 步骤 4 点击“保存配置”,并勾选“重新启动受影响的服务或实例。” 1.4 公网连接 步骤 1 通过步骤2.1中绑定的弹性IP登录各个Core节点,然后执行“hostname”,找到EIP对应的主机名 15177 依次登录找到EIP与主机名的对应列表[code]49.4.xx.xx node-core-ndnIi 49.4.xx.xx1 node-core-AYmJu 49.4.xx.xx2 node-core-ZAgvo[/code] 步骤 2 在Kafka客户端的配置中,bootstrap.servers的地址全部使用主机名[code]bootstrap.servers = node-core-ndnIi:21005,node-core-AYmJu:21005,node-core-ZAgvo:21005 [/code]如果使用的版本为MRS 1.7.0及以后,21005端口修改为9092。 并在本机的/etc/hosts中添加步骤1中的EIP和主机名列表。 步骤 3 以下为使用Flume的KafkaSink的示例[code]client.sources = spoolDir1 client.channels = memo1 client.sinks = kafka client.sources.spoolDir1.type = spooldir client.sources.spoolDir1.spoolDir = /tmp/flumedata/ client.sources.spoolDir1.decodeErrorPolicy = IGNORE client.sources.spoolDir1.deserializer.maxLineLength = 4048210 client.sources.spoolDir1.deserializer.maxBatchLine = 10 client.sources.spoolDir1.deserializer = BufferedLine client.sources.spoolDir1.channels = memo1 client.channels.memo1.type = memory client.channels.memo1.capacity = 10000 client.channels.memo1.transactionCapacity = 1000 client.channels.memo1.channelfullcount = 10 client.channels.memo1.keep-alive = 3 client.channels.memo1.byteCapacity = client.channels.memo1.byteCapacityBufferPercentage = 20 client.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink client.sinks.kafka.kafka.bootstrap.clients = node-core-ndnIi:21005,node-core-AYmJu:21005,node-core-ZAgvo:21005 client.sinks.kafka.kafka.topic = Test client.sinks.kafka.channel = memo1 client.sinks.kafka.kafka.producer.security.protocol = PLAINTEXT[/code]同上,如果使用的版本为MRS 1.7.0及以后,21005端口修改为9092。 特别说明:不建议使用此种方式消费MRSKafka的消息,因为华为云EIP的上行流量不收取费用,而下行流量需要收取费用。
-
操作步骤[*](可选)配置DNS。 公网无法直接访问DMS Kafka,必须要先创建ECS,在ECS上才可以访问DMS Kafka。新创建的ECS不需要再配置DNS,如果使用已有的ECS需要配置DNS。 [*]华北区的DNS IP:100.125.1.250 [*]华东区的DNS IP:100.125.17.29 [*]华南区的DNS IP:100.125.1.250 修改/etc/resolv.conf文件,增加内网域名服务器配置,在/etc/resolv.conf文件的第一行增加如下行: nameserver 100.125.1.250 [*]编辑dms_kafka_client_jaas.conf文件,配置 access_key,secret_key和project_id。 DMS Kafka API基于access_key,secret_key和projectID鉴权,配置dms_kafka_client_jaas.conf,内容如下: [code]KafkaClient { com.huawei.middleware.kafka.sasl.client.KafkaLoginModule required access_key="XXXXXX" secret_key="XXXXXX" project_id="XXXXXX"; };[/code]注意:把XX替换为服务账号的access_key,secret_key和project_id。 如果需要访问其他租户授权的队列,则需要配置授权者的Project ID,即配置target_project_id为授权者的Project ID。 [code]KafkaClient { com.huawei.middleware.kafka.sasl.client.KafkaLoginModule required access_key="XXXXXX" secret_key="XXXXXX" project_id="XXXXXX" target_project_id=""; };[/code] [*]配置启用SASL,(“/path”需修改为实际路径,如下两种选择一种即可)。 [list=a] [*]使用JVM参数设置,进程启动参数增加。-Djava.security.auth.login.config=/path/kafka_client_jaas.conf [*]在代码中设置参数(需要保证在 Kafka Producer 和 Consumer 启动之前)。System.setProperty("java.security.auth.login.config", "/path/kafka_client_jaas.conf"); [*]在consumer.properties增加如下行。 connections.max.idle.ms=30000 [*]配置文件consumer.properties/producer.properties主要参数说明。 表1 主要参数说明参数 说明值bootstrap.serversDMS服务端的地址,配置为IP或者域名。ECS访问: [*]中国华北区1:dms-kafka.cn-north-1.myhuaweicloud.com:37000 [*]中国华东区2 :dms-kafka.cn-east-2.myhuaweicloud.com:37000 [*]中国华南区1 :dms-kafka.cn-south-1.myhuaweicloud.com:37000 公网访问:dms-kafka.cn-north-1.myhuaweicloud.com:37003ssl.truststore.location证书的路径。/path/client.truststore.jks(注意:修改为自己的路径)ssl.truststore.password证书的密码。dms@kafkasecurity.protocol安全协议。SASL_SSLsasl.mechanism服务名称。DMS(注意:必须全大写)Kafka 其它参数请参看Kafka 官网说明。 [*]为调试运行Kafka,可修改log4j.properties,打开kafka debug日志: [code]log4j.rootLogger=DEBUG, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n log4j.logger.org.apache.kafka.clients=DEBUG log4j.logger.kafka=INFO, stdout log4j.additivity.kafka=false log4j.logger.org.apache.kafka=DEBUG, stdout log4j.additivity.org.apache.kafka=false[/code] [*]开始编写代码,API接口请参考Kafka 官网说明。
-
本帖最后由 云彩飞扬 于 2018-3-21 15:05 编辑本章节以新建名为“kafkademo”的Maven Project为例。操作步骤[*]下载Demo包。 [list=a] [*]登录DMS服务管理控制台,访问API使用向导。 [*]选择“KAFKA API”。 [*]单击“下载”。下载DmsKafkaDemo.zip压缩包,即为Demo包。 [*]下载SDK包,即DMS Kafka sasl包。 下载地址:http://static.huaweicloud.com/upload/files/dms/dmskafkasasl.zip,下载后解压,目录如下: [*]client.truststore.jks:客户端证书 [*]dms.kafka.sasl.client-1.0.0.jar: SASL包 [*]dms_kafka_client_jaas.conf:客户端配置文件 也可以从Demo包中获取DMS Kafka sasl包,获取地址:\DmsKafkaDemo\dist\libs\dms.kafka.sasl.client-1.0.0.jar。[*]打开Eclipse,建议使用4.6以上版本,新建一个Maven Project,工程名自定义,这里以kafkademo为例。 [*]单击“Finish”。 [*]导入DMS Kafka sasl的jar包。 [list=a] [*]右键单击新建的工程“kafkademo”,新建一个Folder,命名为libs。 [*]将dms.kafka.sasl.client-1.0.0.jar复制到libs目录下。 [*]在pom.xml文件中增加如下行,将DMS Kafka sasl的jar包导入Maven仓库。[code] dms kafka.sasl.client 1.0.0 system ${project.basedir}/libs/dms.kafka.sasl.client-1.0.0.jar org.apache.kafka kafka-clients 0.10.2.1 org.slf4j slf4j-api 1.7.7 org.slf4j slf4j-log4j12 1.7.7 log4j log4j 1.2.17 [/code] [*]保存“pom.xml”。 [*]如果没有其他依赖包,请参考“DmsKafkaDemo示例"。
-
本帖最后由 云彩飞扬 于 2018-3-21 15:02 编辑 开发环境Eclipse:Eclipse 3.6.0及以上版本,可至Eclipse官方网站下载。JDK:Java Development Kit 1.8.111及以上版本,可至Oracle官方下载页面下载。Maven:Apache Maven 3.0.3及以上版本。获取Kafka Topic和消费组的ID使用SDK接口访问DMS服务,需要先在Web Console创建队列和消费组,步骤如下:[*]登录管理控制台。 [*]单击“服务列表”,选择“应用服务 > 分布式消息服务”,进入分布式消息服务信息页面。 [*]单击左侧菜单栏的“队列管理”。 [*]在“队列管理”页面,单击页面上方的“创建队列”。 [*]填写队列名称和描述信息。 表1 参数说明 参数说明当前区域表示当前创建队列的区域。队列名称队列的名称,必须唯一。 DMS为您自动生成了队列名称,您可以根据需要修改,队列名称只能包含a~z,A~Z,0-9,-,_,长度是[1,64]。 创建队列后不能修改名称。队列类型选择“Kafka队列”。队列模式支持高吞吐和高可靠两种模式。 默认值:高吞吐。 高吞吐:消息副本异步落盘,具有较高的性能。 高可靠:消息多副本同步落盘,保证消息的可靠性。消息保留时长(小时)仅Kafka队列才有该参数。 指定kafka队列的消息保存时间,超过该时长的消息将会被删除,删除的消息无法被消费。 取值范围:1-72,必须为整数。 默认值:72小时。描述(可选)队列描述不能包含,长度是[0,160]。图1 创建Kafka队列 [*]单击“确定”。创建队列完成。 [*]单击队列名称,显示队列详情,获取Kafka Topic,如图2所示。 图2 获取Kafka Topic [*]单击“创建消费组”。进入“创建消费组”页面。 [*]填写消费组的名称。 DMS为您自动生成了消费组名称,您可以根据需要修改,消费组的名称只能包含a~z,A~Z,0~9,-,_,长度是[1,32]。同个队列的消费组名称不能重复。 [*]单击“确定”。创建消费组完成,选择已创建的消费组,获取消费组ID,如图3所示。 图3 获取消费组ID 获取project id在调用接口的时候,需要填入项目编号project_id,所以需要先在管理控制台上获取到项目编号,步骤如下:[*]注册并登录管理控制台。 [*]单击用户名,在下拉列表中单击“我的凭证”。 [*]在“我的凭证”页面的“项目列表”中查看项目ID。 图4 获取项目ID 获取AK/SK[*]注册并登录管理控制台。 [*]单击用户名,在下拉列表中单击“我的凭证”。 [*]在“我的凭证”页,单击“管理访问密钥”页签。 [*]单击“新增访问密钥”,进入“新增访问密钥”页面。 [*]输入当前用户的登录密码。 [*]通过邮箱或者手机进行验证,输入对应的验证码。 [*]单击“确定”,下载访问密钥。 说明:为防止访问密钥泄露,建议您将其保存到安全的位置。 [*]保存文件到本地,从下载的credentials.csv文件中获取“Access Key Id”和“Secret Access Key”。获取区域和EndpointRegion域名说明中国华北区1dms-kafka.cn-north-1.myhuaweicloud.com:37000在华为云(ECS)上搭建开发环境,使用该endpoint访问DMS服务。中国华东区2dms-kafka.cn-east-2.myhuaweicloud.com:37000中国华南区1dms-kafka.cn-south-1.myhuaweicloud.com:37000中国华北区1dms-kafka.cn-north-1.myhuaweicloud.com:37003在本地(公网)搭建开发环境,使用该endpoint访问DMS服务。环境信息汇总表2 环境信息汇总 类型项目收集的信息(以下为示例,请根据实际情况替换)弹性云服务器弹性IP114.115.141.228用户名huaweicloud密码password分布式消息服务队列名称my-kafka-queue队列ID4df89da6-ede4-4072-93e0-28dc6e866299队列类型Kafka队列Kafka Topick-bd67aaead60940d688b872c31bdc653b-4df89da6-ede4-4072-93e0-28dc6e866299消费组名称my-consumer-group消费组IDg-7ec0caac-01fb-4f91-a4f2-0a9dd48f8af7访问密钥AK (Access Key Id)VAODAIIJGPUAYTJRRLODSK (Secret Access Key)ZHN49c6bpwDiQvPqKJ5CxutJxqc04Glt9xSzxYWi项目ID所属区域中国华北区1项目cn-north-1项目IDbd67aaead60940d688b872c31bdc653b区域和Endpoint区域名称中国华北区1Endpointdms-kafka.cn-north-1.myhuaweicloud.com:37000DNSDNS服务器IP华北:100.125.1.250 华东:100.125.17.29 华南:100.125.1.250
上滑加载中
推荐直播
-
HDC深度解读系列 - Serverless与MCP融合创新,构建AI应用全新智能中枢2025/08/20 周三 16:30-18:00
张昆鹏 HCDG北京核心组代表
HDC2025期间,华为云展示了Serverless与MCP融合创新的解决方案,本期访谈直播,由华为云开发者专家(HCDE)兼华为云开发者社区组织HCDG北京核心组代表张鹏先生主持,华为云PaaS服务产品部 Serverless总监Ewen为大家深度解读华为云Serverless与MCP如何融合构建AI应用全新智能中枢
回顾中 -
关于RISC-V生态发展的思考2025/09/02 周二 17:00-18:00
中国科学院计算技术研究所副所长包云岗教授
中科院包云岗老师将在本次直播中,探讨处理器生态的关键要素及其联系,分享过去几年推动RISC-V生态建设实践过程中的经验与教训。
回顾中 -
一键搞定华为云万级资源,3步轻松管理企业成本2025/09/09 周二 15:00-16:00
阿言 华为云交易产品经理
本直播重点介绍如何一键续费万级资源,3步轻松管理成本,帮助提升日常管理效率!
回顾中
热门标签