• [介绍/入门] Kafka开发指南搭建开发环境之配置说明!
    操作步骤[*](可选)配置DNS。 公网无法直接访问DMS Kafka,必须要先创建ECS,在ECS上才可以访问DMS Kafka。新创建的ECS不需要再配置DNS,如果使用已有的ECS需要配置DNS。 [*]华北区的DNS IP:100.125.1.250 [*]华东区的DNS IP:100.125.17.29 [*]华南区的DNS IP:100.125.1.250 修改/etc/resolv.conf文件,增加内网域名服务器配置,在/etc/resolv.conf文件的第一行增加如下行: nameserver 100.125.1.250 [*]编辑dms_kafka_client_jaas.conf文件,配置 access_key,secret_key和project_id。 DMS Kafka API基于access_key,secret_key和projectID鉴权,配置dms_kafka_client_jaas.conf,内容如下: [code]KafkaClient { com.huawei.middleware.kafka.sasl.client.KafkaLoginModule required access_key="XXXXXX" secret_key="XXXXXX" project_id="XXXXXX"; };[/code]注意:把XX替换为服务账号的access_key,secret_key和project_id。 如果需要访问其他租户授权的队列,则需要配置授权者的Project ID,即配置target_project_id为授权者的Project ID。 [code]KafkaClient { com.huawei.middleware.kafka.sasl.client.KafkaLoginModule required access_key="XXXXXX" secret_key="XXXXXX" project_id="XXXXXX" target_project_id=""; };[/code] [*]配置启用SASL,(“/path”需修改为实际路径,如下两种选择一种即可)。 [list=a] [*]使用JVM参数设置,进程启动参数增加。-Djava.security.auth.login.config=/path/kafka_client_jaas.conf [*]在代码中设置参数(需要保证在 Kafka Producer 和 Consumer 启动之前)。System.setProperty("java.security.auth.login.config", "/path/kafka_client_jaas.conf"); [*]在consumer.properties增加如下行。 connections.max.idle.ms=30000 [*]配置文件consumer.properties/producer.properties主要参数说明。 表1 主要参数说明参数 说明值bootstrap.serversDMS服务端的地址,配置为IP或者域名。ECS访问: [*]中国华北区1:dms-kafka.cn-north-1.myhuaweicloud.com:37000 [*]中国华东区2 :dms-kafka.cn-east-2.myhuaweicloud.com:37000 [*]中国华南区1 :dms-kafka.cn-south-1.myhuaweicloud.com:37000 公网访问:dms-kafka.cn-north-1.myhuaweicloud.com:37003ssl.truststore.location证书的路径。/path/client.truststore.jks(注意:修改为自己的路径)ssl.truststore.password证书的密码。dms@kafkasecurity.protocol安全协议。SASL_SSLsasl.mechanism服务名称。DMS(注意:必须全大写)Kafka 其它参数请参看Kafka 官网说明。 [*]为调试运行Kafka,可修改log4j.properties,打开kafka debug日志: [code]log4j.rootLogger=DEBUG, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n log4j.logger.org.apache.kafka.clients=DEBUG log4j.logger.kafka=INFO, stdout log4j.additivity.kafka=false log4j.logger.org.apache.kafka=DEBUG, stdout log4j.additivity.org.apache.kafka=false[/code] [*]开始编写代码,API接口请参考Kafka 官网说明。
  • [介绍/入门] Kafka开发指南搭建开发环境之创建工程!
    本帖最后由 云彩飞扬 于 2018-3-21 15:05 编辑本章节以新建名为“kafkademo”的Maven Project为例。操作步骤[*]下载Demo包。 [list=a] [*]登录DMS服务管理控制台,访问API使用向导。 [*]选择“KAFKA API”。 [*]单击“下载”。下载DmsKafkaDemo.zip压缩包,即为Demo包。 [*]下载SDK包,即DMS Kafka sasl包。 下载地址:http://static.huaweicloud.com/upload/files/dms/dmskafkasasl.zip,下载后解压,目录如下: [*]client.truststore.jks:客户端证书 [*]dms.kafka.sasl.client-1.0.0.jar: SASL包 [*]dms_kafka_client_jaas.conf:客户端配置文件 也可以从Demo包中获取DMS Kafka sasl包,获取地址:\DmsKafkaDemo\dist\libs\dms.kafka.sasl.client-1.0.0.jar。[*]打开Eclipse,建议使用4.6以上版本,新建一个Maven Project,工程名自定义,这里以kafkademo为例。 [*]单击“Finish”。 [*]导入DMS Kafka sasl的jar包。 [list=a] [*]右键单击新建的工程“kafkademo”,新建一个Folder,命名为libs。 [*]将dms.kafka.sasl.client-1.0.0.jar复制到libs目录下。 [*]在pom.xml文件中增加如下行,将DMS Kafka sasl的jar包导入Maven仓库。[code] dms kafka.sasl.client 1.0.0 system ${project.basedir}/libs/dms.kafka.sasl.client-1.0.0.jar org.apache.kafka kafka-clients 0.10.2.1 org.slf4j slf4j-api 1.7.7 org.slf4j slf4j-log4j12 1.7.7 log4j log4j 1.2.17 [/code] [*]保存“pom.xml”。 [*]如果没有其他依赖包,请参考“DmsKafkaDemo示例"。
  • [介绍/入门] Kafka开发指南搭建开发环境之准备环境!
    本帖最后由 云彩飞扬 于 2018-3-21 15:02 编辑 开发环境Eclipse:Eclipse 3.6.0及以上版本,可至Eclipse官方网站下载。JDK:Java Development Kit 1.8.111及以上版本,可至Oracle官方下载页面下载。Maven:Apache Maven 3.0.3及以上版本。获取Kafka Topic和消费组的ID使用SDK接口访问DMS服务,需要先在Web Console创建队列和消费组,步骤如下:[*]登录管理控制台。 [*]单击“服务列表”,选择“应用服务 > 分布式消息服务”,进入分布式消息服务信息页面。 [*]单击左侧菜单栏的“队列管理”。 [*]在“队列管理”页面,单击页面上方的“创建队列”。 [*]填写队列名称和描述信息。 表1 参数说明 参数说明当前区域表示当前创建队列的区域。队列名称队列的名称,必须唯一。 DMS为您自动生成了队列名称,您可以根据需要修改,队列名称只能包含a~z,A~Z,0-9,-,_,长度是[1,64]。 创建队列后不能修改名称。队列类型选择“Kafka队列”。队列模式支持高吞吐和高可靠两种模式。 默认值:高吞吐。 高吞吐:消息副本异步落盘,具有较高的性能。 高可靠:消息多副本同步落盘,保证消息的可靠性。消息保留时长(小时)仅Kafka队列才有该参数。 指定kafka队列的消息保存时间,超过该时长的消息将会被删除,删除的消息无法被消费。 取值范围:1-72,必须为整数。 默认值:72小时。描述(可选)队列描述不能包含,长度是[0,160]。图1 创建Kafka队列 [*]单击“确定”。创建队列完成。 [*]单击队列名称,显示队列详情,获取Kafka Topic,如图2所示。 图2 获取Kafka Topic [*]单击“创建消费组”。进入“创建消费组”页面。 [*]填写消费组的名称。 DMS为您自动生成了消费组名称,您可以根据需要修改,消费组的名称只能包含a~z,A~Z,0~9,-,_,长度是[1,32]。同个队列的消费组名称不能重复。 [*]单击“确定”。创建消费组完成,选择已创建的消费组,获取消费组ID,如图3所示。 图3 获取消费组ID 获取project id在调用接口的时候,需要填入项目编号project_id,所以需要先在管理控制台上获取到项目编号,步骤如下:[*]注册并登录管理控制台。 [*]单击用户名,在下拉列表中单击“我的凭证”。 [*]在“我的凭证”页面的“项目列表”中查看项目ID。 图4 获取项目ID 获取AK/SK[*]注册并登录管理控制台。 [*]单击用户名,在下拉列表中单击“我的凭证”。 [*]在“我的凭证”页,单击“管理访问密钥”页签。 [*]单击“新增访问密钥”,进入“新增访问密钥”页面。 [*]输入当前用户的登录密码。 [*]通过邮箱或者手机进行验证,输入对应的验证码。 [*]单击“确定”,下载访问密钥。 说明:为防止访问密钥泄露,建议您将其保存到安全的位置。 [*]保存文件到本地,从下载的credentials.csv文件中获取“Access Key Id”和“Secret Access Key”。获取区域和EndpointRegion域名说明中国华北区1dms-kafka.cn-north-1.myhuaweicloud.com:37000在华为云(ECS)上搭建开发环境,使用该endpoint访问DMS服务。中国华东区2dms-kafka.cn-east-2.myhuaweicloud.com:37000中国华南区1dms-kafka.cn-south-1.myhuaweicloud.com:37000中国华北区1dms-kafka.cn-north-1.myhuaweicloud.com:37003在本地(公网)搭建开发环境,使用该endpoint访问DMS服务。环境信息汇总表2 环境信息汇总 类型项目收集的信息(以下为示例,请根据实际情况替换)弹性云服务器弹性IP114.115.141.228用户名huaweicloud密码password分布式消息服务队列名称my-kafka-queue队列ID4df89da6-ede4-4072-93e0-28dc6e866299队列类型Kafka队列Kafka Topick-bd67aaead60940d688b872c31bdc653b-4df89da6-ede4-4072-93e0-28dc6e866299消费组名称my-consumer-group消费组IDg-7ec0caac-01fb-4f91-a4f2-0a9dd48f8af7访问密钥AK (Access Key Id)VAODAIIJGPUAYTJRRLODSK (Secret Access Key)ZHN49c6bpwDiQvPqKJ5CxutJxqc04Glt9xSzxYWi项目ID所属区域中国华北区1项目cn-north-1项目IDbd67aaead60940d688b872c31bdc653b区域和Endpoint区域名称中国华北区1Endpointdms-kafka.cn-north-1.myhuaweicloud.com:37000DNSDNS服务器IP华北:100.125.1.250 华东:100.125.17.29 华南:100.125.1.250
  • [介绍/入门] 分布式消息服务之Kafka API快速入门编译示例源码
    本帖最后由 云彩飞扬 于 2018-3-20 10:42 编辑[*]下载并解压DmsKafkaDemo示例工程代码。 [*]打开Eclipse,通过“File -> Import…”,“Select an import source”选择“Exiting Projects Into Workspace”,选择DmsKafkaDemo解压目录,导入示例工程代码。 [*]选择“Project -> Build Project”,执行编译构建。 [*]鼠标右键单击DmsKafkaDemo项目,在弹出的上下文菜单中选择“Export…”,选择“Java -> JAR file”,填写待生成的JAR文件保存位置和名称,导出JAR文件。 [*]使用新生成的JAR文件替换弹性云服务器DmsKafkaDemo/dist/libs目录下的dms.kafka.demo.jar文件,按照“运行示例工程”执行验证。
  • [介绍/入门] 分布式消息服务之Kafka API快速入门运行示例工程!
    本帖最后由 云彩飞扬 于 2018-3-20 10:39 编辑[*]登录弹性云服务器。说明:用户可以直接在192网段的ECS虚拟机上运行。 [*]安装Java JDK或JRE,并配置JAVA_HOME与PATH环境变量,使用执行用户在用户家目录下修改.bash_profile,添加如下行。 export JAVA_HOME=/opt/java/jdk1.8.0_151export PATH=$JAVA_HOME/bin:$PATH执行source .bash_profile命令使修改生效。 说明:ECS虚拟机默认自带的JDK可能不符合要求,例如OpenJDK,需要配置为Oracle的JDK,可至Oracle官方下载页面下载Java Development Kit 1.8.111及以上版本。 [*]使用root用户编辑/etc/resolv.conf文件,配置DNS服务器地址。 在第一行增加如下:nameserver 100.125.1.250 [*]下载DmsKafkaDemo示例工程代码。 [*]解压DmsKafkaDemo.zip压缩包。 $ unzip DmsKafkaDemo.zip [*]进入DmsKafkaDemo/dist目录,该目录下包含预编译好的二进制文件和执行脚本。 $ cd DmsKafkaDemo/dist [*]编辑配置文件config/dms_kafka_client_jaas.conf,设置access_key、secret_key和project_id。 $ vim config/dms_kafka_client_jaas.conf设置内容如下(其中标红内容需要替换为实际值):KafkaClient { com.huawei.middleware.kafka.sasl.client.KafkaLoginModule required access_key=VAODAIIJGPUAYTJRRLOD secret_key=ZHN49c6bpwDiQvPqKJ5CxutJxqc04Glt9xSzxYWi project_id=bd67aaead60940d688b872c31bdc653b;}; [*]编辑配置文件config/producer.properties,设置topic和bootstrap.servers。 $ vim config/producer.properties设置内容如下(其中标红内容需要替换为实际值):topic=k-bd67aaead60940d688b872c31bdc653b-4df89da6-ede4-4072-93e0-28dc6e866299bootstrap.servers=dms-kafka.cn-north-1.myhuaweicloud.com:37000ssl.truststore.password=dms@kafkaacks=allretries=1batch.size=16384buffer.memory=33554432key.serializer=org.apache.kafka.common.serialization.StringSerializervalue.serializer=org.apache.kafka.common.serialization.StringSerializersecurity.protocol=SASL_SSLsasl.mechanism=DMS [*]编辑配置文件config/consumer.properties,设置topic、bootstrap.servers和group.id。 $ vim config/consumer.properties设置内容如下(其中标红内容需要替换为实际值):topic=k-bd67aaead60940d688b872c31bdc653b-4df89da6-ede4-4072-93e0-28dc6e866299bootstrap.servers=dms-kafka.cn-north-1.myhuaweicloud.com:37000group.id=g-7ec0caac-01fb-4f91-a4f2-0a9dd48f8af7ssl.truststore.password=dms@kafkasecurity.protocol=SASL_SSLsasl.mechanism=DMSkey.deserializer=org.apache.kafka.common.serialization.StringDeserializervalue.deserializer=org.apache.kafka.common.serialization.StringDeserializerauto.offset.reset=earliestenable.auto.commit=false [*]运行生产消息示例。 $ bash produce.sh [*]运行消费消息示例。 $ bash consume.sh
  • [介绍/入门] 分布式消息服务之Kafka API快速入门环境准备
    本帖最后由 云彩飞扬 于 2018-3-20 10:22 编辑 开发环境Eclipse:Eclipse 3.6.0及以上版本,可至Eclipse官方网站下载。JDK:Java Development Kit 1.8.111及以上版本,可至Oracle官方下载页面下载。Maven:Apache Maven 3.0.3及以上版本。创建Kafka队列通过浏览器登录华为云,在分布式消息服务控制台中创建队列,队列类型选择“Kafka队列”。请记录Kafka Topic ID,后续访问需要使用。如果已创建过Kafka队列,可以跳过此步骤。创建消费组通过浏览器登录华为云,在分布式消息服务控制台中选择上文创建的Kafka队列,为该队列创建一个消费组。请记录消费组ID,后续访问需要使用。如果已为该Kafka队列创建过消费组,可以跳过此步骤。创建访问密钥通过浏览器登录华为云,选择“我的凭证”菜单,单击“管理访问密钥”标签,新增一组访问密钥,保存credentials.csv文件到本地。从credentials.csv文件中可以提取“Access Key Id”和“Secret Access Key”。如果之前已创建过访问密钥,可以跳过此步骤。获取项目ID通过浏览器登录华为云,选择“我的凭证”菜单,单击“项目列表”标签,根据所属区域记录对应的项目ID。获取区域和EndpointRegion域名说明中国华北区1dms-kafka.cn-north-1.myhuaweicloud.com:37000在华为云(ECS)上运行Demo示例,使用该endpoint访问DMS服务。中国华东区2dms-kafka.cn-east-2.myhuaweicloud.com:37000中国华南区1dms-kafka.cn-south-1.myhuaweicloud.com:37000中国华北区1dms-kafka.cn-north-1.myhuaweicloud.com:37003在本地(公网)运行Demo示例,使用该endpoint访问DMS服务。申请弹性云服务器说明:在华为云(ECS)上运行DMS Demo示例,需要先申请弹性云服务器。 通过浏览器登录华为云,在弹性云服务器控制台中申请创建一台Linux弹性云服务器,并绑定弹性IP。请记录弹性IP地址、用户名和密码,后续访问需要使用。如果已申请过华为云弹性云服务器,可以跳过此步骤。说明:弹性IP用于登录弹性云服务器,以及上传文件等。 环境信息汇总表1 环境信息汇总类型项目收集的信息(以下为示例,请根据实际情况替换)弹性云服务器弹性IP114.115.141.228用户名huaweicloud密码password分布式消息服务队列名称my-kafka-queue队列ID4df89da6-ede4-4072-93e0-28dc6e866299队列类型Kafka队列Kafka Topick-bd67aaead60940d688b872c31bdc653b-4df89da6-ede4-4072-93e0-28dc6e866299消费组名称my-consumer-group消费组IDg-7ec0caac-01fb-4f91-a4f2-0a9dd48f8af7访问密钥AK (Access Key Id)VAODAIIJGPUAYTJRRLODSK (Secret Access Key)ZHN49c6bpwDiQvPqKJ5CxutJxqc04Glt9xSzxYWi项目ID所属区域中国华北区1项目cn-north-1项目IDbd67aaead60940d688b872c31bdc653b区域和Endpoint区域名称中国华北区1Endpointdms-kafka.cn-north-1.myhuaweicloud.comDNSDNS服务器IP华北:100.125.1.250华东:100.125.17.29华南:100.125.1.250
  • [介绍/入门] 分布式消息服务之Kafka开发指南概述
    Kafka API介绍DMS支持开源的Kafka接口,第三方应用程序直接使用原生Kafka client来接入DMS即可实现开源Kafka的消息处理能力。使用约束DMS Kafka一般可达到几千TPS(即每秒可处理的消息数),如果需要上万甚至10万TPS,请单独提交工单或联系客服进行特殊处理。Kafka client推荐使用0.10.2.1版本。使用Kafka SDK接口生产消息时,单条消息最大限制为10MB。使用WebConsole生产消息到Kafka队列时,最大限制依然是512KB。
  • 【云中间件锦囊妙计】巧用参数调优为客户解决时延高的难题(上篇)
    客户简介 中软独家中标税务核心征管系统,全国34个省国/地税。电子税务局15省格局。 大数据国家税务总局局点,中国软件电子税务局技术路径:核心征管 + 纳税服务 业务应用分布式上云改造。业务难题12226 如上图所示是模拟客户的业务网页构建的一个并发访问模型。用户在页面点击从而产生一个HTTP请求,这个请求发送到业务生产进程,就会启动一个投递线程(DeliverThread)调用Kafka的SDK接口,并发送3条消息到DMS(分布式消息服务),每条消息大小3k,需要等待3条消息都被处理完成后才会返回请求响应⑧。当消息达到DMS后,业务消费进程调用Kafka的消费接口把消息取出来,然后将每条消息放到一个响应线程(Response Thread)中进行处理,响应线程处理完后,通过HTTP请求通知投递线程,投递线程收到响应后返回回复响应。 100并发访问时延500ms,未达成用户业务要求客户提出了明确的要求:每1个两核的ECS要能够支撑并发访问量100,每条消息端到端的时延范围是几十毫秒,即从生产者发送开始到接收到消费者响应的时间。客户实测在使用了DMS的Kafka 队列后,并发访问量为100时时延高达到500ms左右,甚至出现达到秒级的时延,远未达到客户提出的业务诉求。相比较而言,客户在Pod区使用的是自己搭建的原生Kafka,在并发访问量为100时测试到的时延大约只有10~20ms左右。那么问题来了,在并发访问量相同的条件下,DMS的Kafka 队列与Pod区自建的原生Kafka相比为什么时延会有这么大的差异呢?我们DMS的架构师 Mr. Peng对这个时延难题进行了一系列分析后完美解决了这个客户难题,下面就让我们来看看他的心路历程。难题剖析根据模拟的客户业务模型,Mr. Peng在华为云类生产环境上也构造了一个测试程序,同样模拟构造了100的并发访问量,通过测试发现,类生产环境上压测得到的时延平均时间在60ms左右。类生产上的时延数值跟客户在真实生产环境上测到的时延差距这么大,这是怎么回事呢?问题变得扑朔迷离起来。Mr. Peng当机立断,决定就在华为云现网上运行构造的测试程序,来看看到底是什么原因。同时,在客户的ECS服务器上,也部署了相同的测试程序,模拟构建了100的并发量,得到如下的时延结果对比表:调优前时延现网时延(ms)类生产时延(ms)100并发500ms ~ 4000ms40ms ~ 80 ms1并发31ms6msPing测试0.9ms ~ 1.2ms0.3ms ~ 0.4ms表1 华为云现网与类生产环境时延对比表 从时延对比表的结果看来,Mr. Peng发现,即使在相同的并发压力下,华为云现网的时延比类生产差很多。Mr.Peng意识到,现在有2个问题需要分析:为什么华为云现网的时延会比类生产差?DMS的Kafka队列时延比原生自建的Kafka队列时延表现差的问题怎么解决?Mr. Peng陷入了沉思之中。 欲知后事如何,请看下【云中间件锦囊妙计】巧用参数调优为客户解决时延高的难题(中篇)。云中间件锦囊妙计系列专题旨在为开发者、用户和对云中间件技术有兴趣的同学提供一系列的客户案例、技术干货、疑难杂症解决思路的分享,欢迎关注我们的小伙伴在下方留言,共同学习和分享,请大家多多支持哦~
  • 【技术分享】DMS分布式消息服务——Kafka队列怎么创建和使用!
    本帖最后由 云彩飞扬 于 2018-1-30 15:53 编辑大概步骤9975 具体步骤 准备环境 步骤 9982 ①登录华为云控制台。选择“计算> 弹性云服务器”,创建一台弹性云服务器(ECS),建议选择windows服务器。 ②下载和安装开发工具:Eclipse3.6.0以上版本,JDK1.8.111以上版本。说明9984 ①ECS需要绑定一个弹性IP,具备外网访问权限,用于安装Eclipse和JDK。 ②如果用户已有ECS,可重复使用,无需多次创建。 创建Kafka队列步骤99859986 ①控制台选择“应用服务> 分布式消息服务”。单击右上角“创建队列”。 ②填写队列名称,选择队列类型为“Kafka队列”,选择队列模式。单击“确定”,创建队列。 说明 ①确认当前区域为您应用服务部署的区域,如果不一致,请单击控制台左上角的区域图标进行切换。 创建消费组步骤 ①队列创建完成后,单击队列名称,进入队列详情页。 ②单击“创建消费组”,填写消费组名称,完成消费组创建。 9987说明 ①队列详情页的上方显示了Kafka Topic,下方显示了消费组ID。在生产与消费消息时使用这几项识别Kafka队列以及消费组。 ②一个队列默认可以创建3个消费组,队列和消费组的模式帮助服务架构解耦。获取IAM认证信息 获取IAM认证信息步骤 ①单击控制台右上角的用户名,进入用户中心,在“账号管理> 基本信息”中找到“管理我的凭证”。 ②在“我的凭证”页中获取项目ID以及访问密钥(AK/SK),用于Kafka队列Demo示例工程配置。 说明99889989 ①分布式消息服务使用华为云IAM服务进行身份识别,保证队列和消息的安全。 ②AK/SK密钥对如果已申请,可以使用原有AK/SK。 搭建工程环境步骤9993 ①登录ECS,下载Demo示例包(含SDK)。 ②导入工程,选择“Exiting Projects Into Workspace”方式导入。 ③将获取到的队列Topic、消费组ID、项目ID以及AK/SK等信息替换到配置文件中。参数配置说明参考 说明9990 ①工程包含了DMS Kafka的SDK文件以及demo示例代码。 ②需要对弹性云服务器添加域名服务器地址,帮助解析DMS Kafka的Endpoint地址。 编译和管理队列消息步骤9991 ①编译工程,也可设置为Build Automatically。 ②运行DMSKafkaProducerDemo类,生产消息。 ③运行DMSKafkaconsumerDemo类,从消费组消费消息。说明9992 ①在控制台可以查看队列和消费组的消息数变化。 ②Demo工程可以正常生产和消费消息,则说明队列、消费组以及本地环境配置均正确。开发者可以参考Demo进行开发和对接分布式消息服务。
  • [其他] MRS服务的Kafka节点磁盘占用率达到100%导致Broker实例状态为“出错”恢复方法
    本帖最后由 落地成盒 于 2018-1-25 14:32 编辑 现象描述当Kafka配置的数据目录所在的磁盘占用率达到100%后,该节点的Broker实例出现“出错”状态,导致当前节点无法提供Kafka服务。 可能原因 [*]Topic的Partition划分不合理,导致个别磁盘占用率达到100%。 [*]本地磁盘上还存在除Kafka外的其他数据,导致磁盘占用率达到100%。 定位思路 [*]定位磁盘占用率达到100%的根因。 [*]删除或移动数据到其他空闲磁盘,重启Broker。 [*]按照“ALM-38001 Kafka磁盘容量不足”告警指导彻底解决磁盘容量不足问题。详见《MapReduce服务用户指南》5.7.70章节。 处理步骤 [*]登录MRS Manager,单击“服务管理 > Kafka > 实例”,将健康状态为“出错”的Broker实例停止并记录实例所在节点的管理IP地址以及对应的“broker.id”,该值可通过单击角色名称,在“实例配置”页面中设置“参数类别”为“全部”,搜索“broker.id”参数获取。 [*]使用PuTTY工具以root用户登录记录的管理IP地址,并执行df -lh命令,查看磁盘占用率为100%的挂载目录,例如“/srv/BigData/kafka/data1”。 [*]进入该目录,执行du -sh *命令,查看该目录下各文件夹的大小。查看是否存在除“kafka-logs”目录外的其他文件,并判断是否可以删除或者迁移。 [*]是,删除或者迁移相关数据,然后执行步骤 8。 [*]否,执行步骤 4。 [*]进入“kafka-logs”目录,执行du -sh *命令,选择一个待移动的Partition文件夹,其名称命名规则为“Topic名称-Partition标识”,记录Topic及Partition。 [*]修改“kafka-logs”目录下的“recovery-point-offset-checkpoint”和“replication-offset-checkpoint”文件(两个文件做同样的修改)。 [*]减少文件中第二行的数字(若移出多个目录,则减少的数字为移出的目录个数)。 [*]删除待移出的Partition所在的行(行结构为“Topic名称 Partition标识 Offset”,删除前先将该行数据保存,后续此内容还要添加到目的目录下的同名文件中)。 [*]修改目的数据目录下(例如/“/srv/BigData/kafka/data2/kafka-logs”)的“recovery-point-offset-checkpoint”和“replication-offset-checkpoint”文件(两个文件做同样的修改)。 [*]增加文件中第二行的数字(若移入多个Partition目录,则增加的数字为移入的Partition目录个数)。 [*]添加待移入的Partition行到文件末尾(行结构为“Topic名称 Partition标识 Offset”,直接复制步骤 5中保存的行数据即可)。 [*]移动数据,将待移动的Partition文件夹移动到目的目录下,移动完成后执行chown omm:wheel -R Partition目录命令修改Partition目录属组。 [*]登录MRS Manager,单击“服务管理 > Kafka > 实例”,启动停止的Broker实例。 [*]等待5-10分钟后查看Broker实例的健康状态是否为“良好”。 参考信息《MapReduce服务用户指南》5.7.70章节 ALM-38001 Kafka磁盘容量不足。
  • [教程指导] Kafka-Manager编译&安装&启动
    本帖最后由 那人好像一条狗 于 2018-6-8 11:52 编辑Kafka-Manager是由Yahoo公司开源的一款基于Web的Kafka管理工具,可以对Kafka集群中的Broker、Topic、Partition、Consumer等进行监控查看和管理。 1 获取Kafka-Manager编译包 Kafka-Manager的官网上只提供了源码包,并没有提供编译包,因此获取Kafka-Manager编译包有两种途径: (1)直接从网上获取其他人自己编译好的Kafka-Manager编译包; (2)从官网下载Kafka-Manager源码包,然后自己手动编译。 此种方式见本文最后,非开发人员可以不关注。 1.1 直接获取已有Kafka-Manager编译包 由于Kafka-Manager是利用SBT进行编译构建的,这个过程由于网络等限制比较麻烦和耗时。可以直接从网上获取别人已经编译好的Kafka-Manager包,这样就直接省去了1.2整个章节中的工作。 Kafka-Manager编译包可以直接从下面这个链接获取: http://blog.wolfogre.com/posts/kafka-manager-download/ 下载完成后,直接到本文中的《2 Kafka-Manager部署运行部分》 2 Kafka-Manager部署运行 由于当前MRS集群中根据是否开启了Kerberos认证分成非安全集群和安全集群两种模式。2.1 非安全模式Kafka集群(未开启Kerberos认证)2.1.1 修改Kafka JMX监控的相关配置 由于当前集群中在安装配置Kafka的监控时,默认的只允许Kafka服务所在的本节点访问当前节点的JMX服务;这样导致运行在其他节点上的Kafka-Manager无法访问JMX服务,从而无法获取相关的监控信息。所以需要进行以下配置修改: 到Manager的管理界面上,修改Kafka服务的服务配置,如下图所示: 将KAFKA_JMX_IP配置项修改为${BROKER_IP} 然后再保存配置,重启Kafka服务,以便让修改的配置生效。 2.1.2 修改Kafka-Manager中关于访问JMX服务的配置 说明: MRS 1.7.1以前版本需要做2.1.2步骤,1.7.1不需要 由于当前集群中的Kafka服务在开启JMX服务时,JMX服务的访问地址的后缀为kafka,Kafka-Manager中的访问JMX服务的地址的后缀为jmxrmi,因此需要修改Kafka-Manager中的访问地址的后缀。 修改kafka-manager-master/app/kafka/manager/jmx/KafkaJMX.scala中的doWithConnection方法中的urlString,即第36行【限Kafka-Manager 2.11-1.3.3.17版本】。 原UrlString:[code]val urlString = s"service:jmx:rmi:///jndi/rmi://$jmxHost:$jmxPort/jmxrmi"[/code] 修改为: [code]val urlString = s"service:jmx:rmi:///jndi/rmi://$jmxHost:$jmxPort/kafka"[/code] 2.1.3 将前面编译生成的kafka-manager-1.3.3.17.zip包进行解压,比如存放在/opt目录下2.1.4 修改Kafka-Manager的配置1、修改Kafka-Manager中关于Kafka集群对应的Zookeeper集群的配置以及与consumer相关的配置:[code]cd /opt/kafka-manager-1.3.3.17 vi conf/application.conf kafka-manager.zkhosts="IP1:24002" vi conf/application.conf kafka-manager.consumer.properties.file=/opt/kafka-manager-1.3.3.17/conf/consumer.properties #该consumer.properties在kafka-manager-1.3.3.17/conf目录中,最好写全路径[/code]2、修改/opt/kafka-manager-1.3.3.17/conf/consumer.properties文件:[code]cd /opt/kafka-manager-1.3.3.17 vi /conf/consumer.properties [/code]3、修改Kafka-Manager相关日志文件的存放路径[code]cd /opt/kafka-manager-1.3.3.17[/code] 修改conf/logback.xml和conf/logger.xml文件 将和两个地方修改为你期望的日志存放路径。 2.1.5 启动Kafka-Manager[code]cd /opt/kafka-manager-1.3.3.17 nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9000 &[/code]如果需要使用JMX Polling,在启动之前先执行以下命令 export JMX_PORT=21006 在浏览器中输入IP1:9000,若出现以下界面则说明Kafka-Manager启动成功。 2.2 安全模式Kafka集群(开启Kerberos认证) 2.2.1 修改Kafka JMX监控的相关配置 由于当前集群中在安装配置Kafka的监控时,默认的只允许Kafka服务所在的本节点访问当前节点的JMX服务;这样导致运行在其他节点上的Kafka-Manager无法访问JMX服务,从而无法获取相关的监控信息。1、到Manager的管理界面上,修改Kafka服务的服务配置,如下图所示: 将KAFKA_JMX_IP配置项修改为${BROKER_IP} 然后再保存配置,重启Kafka服务,以便让修改的配置生效。 2.2.2 修改Kafka-Manager中关于访问JMX服务的配置 说明: MRS 1.7.1以前版本需要做2.2.2步骤,1.7.1不需要 由于当前集群中的Kafka服务在开启JMX服务时,JMX服务的访问地址的后缀为kafka,Kafka-Manager中的访问JMX服务的地址的后缀为jmxrmi,因此需要修改Kafka-Manager中的访问地址的后缀。 修改kafka-manager-master/app/kafka/manager/jmx/KafkaJMX.scala中的doWithConnection方法中的urlString,即第36行【限Kafka-Manager 2.11-1.3.3.17版本】。 原UrlString:[code]val urlString = s"service:jmx:rmi:///jndi/rmi://$jmxHost:$jmxPort/jmxrmi"[/code] 修改为:[code]val urlString = s"service:jmx:rmi:///jndi/rmi://$jmxHost:$jmxPort/kafka"[/code] 2.2.7 启动Kafka-Manager 在启动kafka manager之前先做以下几件事情 1. 修改kafka服务端,将allow.everyone.if.no.acl.found配置成true。原因是Kafka Manager会调用SimpleConsumer API去获取每个consumer的offset,新SimpleConsumer的API不支持认证,即需要开启不认证也能消费。 2. 导出环境变量 export JAVA_OPTS="-Djava.security.auth.login.config=/opt/kafka-manager-1.3.3.17/conf/jaas.conf -Dzookeeper.server.principal=zookeeper/hadoop.hadoop.com -Djava.security.krb5.conf=/opt/huawei/Bigdata/MRS_1.7.0/etc/1_4_KerberosClient/kdc.conf -Dzookeeper.request.timeout=120000 -Dkerberos.domain.name=hadoop.hadoop.com" 注意将路径和kerberos.domain.name替换成实际值 [code]/opt/kafka-manager-1.3.3.17 nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9000[/code] 在浏览器中输入IP1:9000,若出现以下界面则说明Kafka-Manager启动成功。 附录: 利用源码手动编译 Kafka-Manager的编译打包是基于SBT的,因此在编译Kafka-Manager要先预先准备以及Java JDK与SBT。1.2.1 配置Java JDK 这里要求JDK的版本为JDK 1.8。具体的JDK的安装与配置,比较常见,这里就不赘述。1.2.2 安装SBT 到SBT官网:http://www.scala-sbt.org/download.html 下载SBT压缩包。将下载的SBT压缩包进行解压并配置SBT环境变量。【目前先讨论下Windows环境下的编译】。 本文主要是针对Windows操作系统下的SBT的安装,至于Linux操作系统下SBT的安装可自行搜索安装。1. Windows环境: 假设解压的SBT放在D:\sbt目录。 添加SBT_HOME环境变量,[code]SBT_HOME ="D:\sbt"[/code] 并在Path中追加[code];%SBT_HOME%\bin;[/code] 然后通过在命令行窗口中输入sbt,如果sbt被识别则说明已经安装配置成功。1.2.3 配置SBT 由于在利用SBT工具编译项目时,需要下载很多SBT工程编译时依赖的jar包等资源,由于SBT默认的资源都是在国外,由于网络原因可以会很慢甚至无法连接,所以为了能更快速地下载这些资源,可以对SBT进行一些配置。1、 先配置SBT的本地资源存放目录,以便在下载一次后可以为之后的编译使用。 打开“SBT安装目录:\conf\sbtconfig.txt”文件,在文件最后追加[code]-Dsbt.boot.directory=D:/boot/ -Dsbt.global.base=D:/.sbt -Dsbt.ivy.home=D:/.ivy2[/code]2、 在“SBT安装目录:\conf\”目录下,新建文件repo.properties文件,将以下内容复制到文件中。[code][repositories] local store_1:http://repo.typesafe.com/typesafe/ivy-releases/ store_2:http://repo2.maven.org/maven2/ typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly typesafe2: http://repo.typesafe.com/typesafe/releases/ sbt-plugin: http://repo.scala-sbt.org/scalasbt/sbt-plugin-releases/ sonatype: http://oss.sonatype.org/content/repositories/snapshots uk_maven: http://uk.maven.org/maven2/ ibibli: http://mirrors.ibiblio.org/maven2/ repo2: http://repo2.maven.org/maven2/ aliyun: http://maven.aliyun.com/nexus/content/groups/public comp-maven:http://mvnrepository.com/artifact/ store_cn:http://maven.oschina.net/content/groups/public/ store_mir:http://mirrors.ibiblio.org/maven2/ store_0:http://maven.net.cn/content/groups/public/ sbt-releases-repo: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext] sbt-plugins-repo: http://repo.scala-sbt.org/scalasbt/sbt-plugin-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext] maven-central: http://repo1.maven.org/maven2/[/code]3、 打开“SBT安装目录:\conf\sbtconfig.txt”文件,在文件最后追加:[code]-Dsbt.repository.config= SBT安装目录:/conf/repo.properties[/code]4、 如果依旧有一些repo无法访问,可以考虑为SBT设置代理,由于走HTTPS时,由于证书问题,还是走HTTP较好。 打开“SBT安装目录:\conf\sbtconfig.txt”文件,在文件最后追加[code]-Dsbt.repository.secure=false #配置项告知SBT使用HTTP而非HTTPS来更新和下载artifacts -Dhttp.proxyHost= proxy host -Dhttp.proxyPort= proxy port -Dhttp.proxyUser=用户名 -Dhttp.proxyPassword=密码[/code] 1.2.4 利用SBT编译Kafka-Manager1、下载Kafka-Manager工程 到https://github.com/yahoo/kafka-manager界面上直接下载Kafka-Manager工程。2、将步骤1中下载的kafka-manager-master.zip文件进行解压,并进入kafka-manager-master目录,如:D:\kafka-manager-master。[code]cd D:\kafka-manager-master sbt clean dist[/code] 该步骤由于需要下载很多的jar包等资源文件,比较耗时,需要耐心等待。 如果最后出现如下输出,则说明Kafka-manager编译成功。 3、到D:\kafka-manager-master\target\universal目录中找到编译好的kafka-manager包即kafka-manager-1.3.3.17.zip。
  • [技术干货] Kafka是个奇葩!——Linkin论文学习笔记
    摘要: 实时数据流是现在互联网公司、甚至拥有大规模数据的传统企业的主要模式, 实时数据(Real-time Activity Data)就是那些非交易,不需要秒级响应的数据, 但在后续的分析中产生极大作用,例如广播、排序、个性化推荐、运营监控、报表等 Linkin的Kafka主要就是为了处理这种性质的数据设计的。 本文是对Linkin的论文的学习笔记,欢迎大家吐槽! Kafka是啥?##是个消息中间件吗?那和市面上其他一堆堆的中间件例如ActiveMQ, RabbitMQ有什么区别?答案只有一个:Kafka是个集群的消息中间件+存储,一个节点可以存储几T的数据!为啥一个中间件需要存储数据呢?慢慢道来……原来,对于Linkin这样的互联网企业来说,用户和网站上产生的数据有三种: [*]需要实时响应的交易数据,用户提交一个表单,输入一段内容,这种数据最后是存放在关系数据库(Oracle, MySQL)中的,有些需要事务支持。 [*]活动流数据,准实时的,例如页面访问量、用户行为、搜索情况,这些数据可以产生啥?广播、排序、个性化推荐、运营监控等。这种数据一般是前端服务器先写文件,然后通过批量的方式把文件倒到Hadoop这种大数据分析器里面慢慢整。 [*]各个层面程序产生的日志,例如httpd的日志、tomcat的日志、其他各种程序产生的日志。码农专用,这种数据一个是用来监控报警,还有就是用来做分析。 Linkin的牛逼之处,就在于他们发现了原先2,3的数据处理方式有问题,对于2而言,原来动辄一两个钟头批处理一次的方式已经不行了,用户在一次购买完之后最好马上就能看到相关的推荐。而对于3而言,传统的syslog模式等也不好用,而且很多情况下2和3用的是同一批数据,只是数据消费者不一样。这2种数据的特点是: [*]准实时,不需要秒级响应,分钟级别即可。 [*]数据量巨大,是交易数据的10倍以上。 [*]数据消费者众多,例如评级、投票、排序、个性化推荐、安全、运营监控、程序监控、后期报表等 于是,Linkin就自己开发了一套系统,专门用来处理这种性质的数据,这就是Kafka那么,在整个实践过程中Linkin做了什么样的设计,解决了什么问题?首先看下数据流动图:5817多数据中心怎么管理数据:5818集群本身的架构图5819Kafka内部架构图,分为数据产生者(Producer),数据中间者(Broker),数据消费者(Consumer)显然,这是一个集群的发布/订阅系统,有如下几个特点 [*]生产者是推数据(Push),消费者是拉数据(Pull)。存在数据复用,在Linkin平均生产1条消息会被消费5.5次。 [*]数据生产者和数据消费者的速度不对等,所以要把数据沉淀在Kafka内慢慢处理,Linkin一般在集群内放7天的数据。 [*]性能上追求高吞吐,保证一定的延时性之内。这方面做了大量优化,包括没有全局hash,批量发送,跨数据中心压缩等等。 [*]容错性上使用的“至少传输一次”的语义。不保证强一次,但避免最多传一次的情况。 [*]集群中数据分区,保证单个数据消费者可以读到某话题(topic)的某子话题(例如某用户的数据)的所有数据,避免全局读数据 [*]数据规范性,所有数据分为数百个话题,然后在数据的源头——生产者(Producer)这边就用Schema来规范数据,这种理念使得后期的数据传输、序列化、压缩、消费都有了统一的规范,同时也解决了这个领域非常麻烦的数据版本不兼容问题——生产者一改代码,消费者就抓瞎。 [*]用于监控,这个系统的威力在于,前面所有生产系统的数据流向,通过这个系统都能关联起来,用于日常的运营也好,用于数据审计,用于运维级别的监控也好都是神器啊! To be continued...##所以,Kafka的设计基本上目前这个领域的唯一选择。我也看了很多其他实现,包括:5820 [*]数据采集组件 [*]数据传输组件 [*]数据实时计算/索引/搜索组件 [*]数据存储/持久化组件 [*]数据展示/查询/报警界面组件 从数据传输这块的设计理念来说,Kafka是最为先进的,在目前的各种实现中,我猜测可以和Kafka一战的也就只有Splunk了后面我会分析一下这个软件的设计和实现欲知后事如何,且听下回分解 ~~主要参考文章日志:每个软件工程师都应该知道的有关实时数据的统一概念 —— 这篇比较抽象,高屋建瓴,理论先行Building LinkedIn’s Real-time Activity Data Pipeline —— 实践层的论文,把做事情的前因后果都写明白了分布式发布订阅消息系统 Kafka 架构设计 —— 落地设计次要参考文章《分布式发布订阅消息系统 Kafka 架构设计》《StreamBase简介》《Yahoo! s4和Twitter storm的粗略比较》《最火爆的开源流式系统Storm vs 新星Samza》《架构之淘宝实时数据传输平台: TimeTunnel介绍》《Graylog2 简介》《logstash 还是不行》《日志收集以及分析:Splunk 》《LogStash日志分析系统》《LogStash,使日志管理更简单》《logstash VS splunk》《个性化离线实时分析系统pora》《日志:每个软件工程师都应该知道的有关实时数据的统一概念》《基于Flume的美团日志收集系统(二)改进和优化》《基于Flume的美团日志收集系统(一)架构和设计》《对互联网海量数据实时计算的理解》《流式日志系统启示录》《flume-ng+Kafka+Storm+HDFS 实时系统搭建》
  • [其他] 开源confluent-kafka-go连接华为MRS的安全集群
    本帖最后由 yd_55568363 于 2017-11-20 14:41 编辑1 问题描述: 开源confluent-kafka-go连接华为MRS的安全集群失败。 具体原因:confluent-kafka-go依赖的库librdkafka默认将broker所在hostname作为了server princple的一部分来使用,导致认证失败。 2 修改思路: 增加domainName参数用于认证,允许客户端设置。 2.1 librdkafka具体修改步骤: 源码地址:https://github.com/edenhill/librdkafka src/rdkafka_conf.c 增加sasl.kerberos.service.name配置项 "Kerberos principal name that Kafka runs as.", .sdef = "kafka" }, { _RK_GLOBAL, "sasl.kerberos.principal", _RK_C_STR, _RK(sasl.principal), "This client´s Kerberos principal name.", .sdef = "kafkaclient" }, + { _RK_GLOBAL, "sasl.kerberos.domain.name", _RK_C_STR, + _RK(sasl.domain_name), + "This cluster´s Kerberos domain name.", + .sdef = "hadoop.hadoop.com" }, #ifndef _MSC_VER { _RK_GLOBAL, "sasl.kerberos.kinit.cmd", _RK_C_STR, _RK(sasl.kinit_cmd), "Full kerberos kinit command string, %{config.prop.name} is replaced " "by corresponding config object value, %{broker.name} returns the " "broker´s hostname.", - .sdef = "kinit -S \"%{sasl.kerberos.service.name}/%{broker.name}\" " + .sdef = "kinit -S \"%{sasl.kerberos.service.name}/%{sasl.kerberos.domain.name}\" " "-k -t \"%{sasl.kerberos.keytab}\" %{sasl.kerberos.principal}" }, { _RK_GLOBAL, "sasl.kerberos.keytab", _RK_C_STR, _RK(sasl.keytab), "Path to Kerberos keytab file. Uses system default if not set." "**NOTE**: This is not automatically used but must be added to the " "template in sasl.kerberos.kinit.cmd as " src/rdkafka_conf.h增加domain_name字段 --- src\rdkafka_conf.h 2017-10-17 11:20:56.000000000 +0800 +++ src\rdkafka_conf.h 2017-10-25 16:26:34.000000000 +0800 @@ -118,12 +118,13 @@ struct { const struct rd_kafka_sasl_provider *provider; char *principal; char *mechanisms; char *service_name; + char *domain_name; char *kinit_cmd; char *keytab; int relogin_min_time; char *username; char *password; #if WITH_SASL_SCRAM src/rdkafka_sasl_cyrus.c 将hostname替换成domainName --- src\rdkafka_sasl.c 2017-10-17 11:20:56.000000000 +0800 +++ src\rdkafka_sasl.c 2017-10-25 16:09:38.000000000 +0800 @@ -192,13 +192,14 @@ rk->rk_conf.sasl.mechanisms, rk->rk_conf.api_version_request ? "" : ": try api.version.request=true"); return -1; } - rd_strdupa(&hostname, rktrans->rktrans_rkb->rkb_nodename); + //rd_strdupa(&hostname, rktrans->rktrans_rkb->rkb_nodename); + rd_strdupa(&hostname, rk->rk_conf.sasl.domain_name); if ((t = strchr(hostname, ´:´))) *t = ´\0´; /* remove ":port" */ 3 重新编译librdkafka 具体步骤参考https://github.com/edenhill/librdkafka/tree/v0.11.1 要求:已安装libsasl2-dev 编译: ./configure make make install 4 客户端使用 增加如下配置项: "security.protocol": "SASL_PLAINTEXT", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.keytab": "/opt/nemon/user.keytab", ----需要换成路径 "sasl.kerberos.principal": "nemon@HADOOP.COM",----需要换成实际用户名 "sasl.kerberos.domain.name": "hadoop.hadoop.com",--需要换成实际domainName 其中前四项为开源已有参数。Keytab可通过manager界面下载。系统设置—用户管理—更多—下载认证凭据。 domainName如何获取: domain的命名规则为:hadoop. toLowerCase(realm)。假设集群的域名(default_realm)为HUAWEI.COM时,domain的值为hadoop.huawei.com。 可通过manager界面服务管理—KrbServer—服务配置—参数类别(全部配置)—域 ,查看default_realm。 示例代码consumer_example.go(非安全版VS安全版): 运行效果:略