• 【限时免费】华为云专家带你5节课玩转Kafka!赢蓝牙音箱、书籍好礼!
     想学习当下热门的分布式消息系统Kafka?华为云中间件专家Hleecs倾囊相授5节课带你零基础入门,从入门到实战轻松玩转Kafka!扫码关注“中间件小哥”公众号,回复“Kafka”即可免费报名!本期课程结合华为云分布式消息服务Kafka,从基础原理入门到实践操作,循序渐进一站式学习。5节实战精品课,囊括创建topic,生产消息、消费消息,编写生产/消费代码,Kafka服务架构机制、常用工具使用等内容,让你系统性掌握Kafka。 参与打卡任务,还有机会获得50元京东卡、Kafka书籍、华为蓝牙音箱等超值奖品,知识+好礼双丰收!   活动过程如遇大量用户参与,会导致暂时无法添加助手微信号,大家可以稍等一段时间后再试。活动中有任何疑问,请添加智能应用平台小助手(微信:zhongjianjianxiaoge)咨询另外,对本次活动有任何想法和建议,欢迎在评论区回帖哦~ 
  • [中间件] 【鲲鹏翱翔】消息中间件05-kafka移植安装指南
    1、简介Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。本次移植以Kafka-2.1.0为例进行说明官方链接:https://kafka.apache.org/类别:应用程序2、环境类别子项版本获取地址(方法)ECS规格arm通用计算增强型|c3.xlarge.4|4vCPUs| 14GB---OSCentOS7.5--Kernel4.14--PackageKafka2.1.0https://codeload.github.com/apache/kafka/tar.gz/2.1.0OpenJdkJdk8u191-b12https://github.com/AdoptOpenJDK/openjdk8-binaries/releases/download/jdk8u191-b12/OpenJDK8U-jdk_aarch64_linux_hotspot_8u191b12.tar.gz Gradle4.10https://downloads.gradle.org/distributions/gradle-5.4-bin.zip Scala2.12.0https://downloads.lightbend.com/scala/2.12.0/scala-2.12.0.tgz  3、依赖安装         通用工具Gradle、Jdk8u191-b12、Scala-2.12.0Gradle4.10安装1、下载地址   执行:wget https://downloads.gradle.org/distributions/gradle-5.4-bin.zip2、解压   执行:yum install unzip   执行::unzip -d   /home/tools/gradle/gradle-5.4-bin.zip3、设置环境变量   执行:vim /etc/profile   在配置文件最后加上:export GRADLE_HOME=/home/tools/gradle/gradle-5.4export PATH=$ GRADLE _HOME/bin:$PATH   保存后执行: source /etc/profilejdk8u191-b12安装1、下载地址:执行:wget https://github.com/AdoptOpenJDK/openjdk8-binaries/releases/download/jdk8u191-b12/OpenJDK8U-jdk_aarch64_linux_hotspot_8u191b12.tar.gz2、解压   执行:cd /home/tools   执行:tar -zxvf OpenJDK8U-jdk_aarch64_linux_hotspot_8u191b12.tar.gz3、设置环境变量   执行:vim /etc/profileexport JAVA_HOME=/home/tools/jdk8u191-b12export PATH=${JAVA_HOME}/bin:$PATHexport CLASSPATH=.:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar   执行: source /etc/profile Scala-2.12.0安装1、下载地址:wget https://downloads.lightbend.com/scala/2.12.0/scala-2.12.0.tgz2、解压   执行:cd /home/tools   执行:tar -zxvf scala-2.12.0.tgz3、设置环境变量   执行:vim /etc/profileexport SCALA_HOME=/home/tools/scala-2.12.0export PATH=${SCALA_HOME}/bin:$PATH   执行: source /etc/profile 4、组件编译安装4.1 下载源码    从Kafka官网下载kafka-2.1.0的源码,    执行:wget -O kafka-2.1.0.tar.gz   https://codeload.github.com/apache/kafka/tar.gz/2.1.04.2 解压    执行:tar -zxvf  kafka-2.1.0.tar.gz  进行解压    执行:cd kafka-2.1.0  进入源码目录4.3 执行编译命令执行:mkdir  /usr/gradleRepository  (gradleRepository为gradle本地仓库目录需要手动创建)执行:gradle -g   /usr/gradleRepository clean(注意:此时目录为kafka-2.1.0)    执行:gradle -g /usr /gradleRepository releaseTarGz     (按实际情况选择编译命令)4.4 安装部署1、配置   执行:cd kafka-2.1.0   执行:vim config/server.properties           #配置zookeeper和主机名           host.name=localhost              zookeeper.connect= localhost:21812、配置环境变量   执行:vim /etc/profileexport KAFKA_HOME=/xxxx/kafka-2.1.0export PATH=${KAFKA_HOME}/bin:$PATH   执行: source /etc/profile3、启动Kafka   执行:zookeeper-server-start.sh   config/zookeeper.properties &   执行:kafka-server-start.sh config/server.properties &4、检查进程启动情况:   执行:jps   如果kafka进程:Kafka、QuorumPeerMain两个进程启动成功,整个Kafka服务启动完成。 5、测试Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。1、shell_1创建topic:   执行:   kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor   1 --partitions 1 --topic test2、shell_2生产数据:   执行:   kafka-console-producer.sh --broker-list localhost:9092   --topic test3、shenll_3消费数据   执行:   kafka-console-consumer.sh --bootstrap-server localhost:9092   --topic test --from-beginning   在shell_2中的producer生产端可以输入文本,在shell_3中的consumer消费端可以查看到刚刚producer端生产的文本。如果需要了解更多的命令,建议查看kafka官网。6、参考信息    官方链接:https://kafka.apache.org      Github链接:https://github.com/apache/kafka      Kafka配置可以参考官网指导(https://kafka.apache.org/documentation/#configuration) 7、FAQ     待补充
  • [其他] kafka-python客户端连接开启kerbeors认证的MRS Kafka集群配置指导
    1.配置python3环境安装步骤:# 配置华为云欧拉镜像源wget http://mirrors.myhuaweicloud.com/repo/mirrors_source.sh && sh mirrors_source.sh # 安装必要的工具,用来编译python3yum groupinstall "Development tools" -yyum -y install zlib zlib-develyum -y install bzip2 bzip2-develyum -y install ncurses ncurses-develyum -y install readline readline-develyum -y install openssl openssl-develyum -y install openssl-staticyum -y install xz lzma xz-develyum -y install sqlite sqlite-develyum -y install gdbm gdbm-develyum -y install tk tk-develyum -y install libffi libffi-devel # 下载,解压python3的tgz包,也可以自行去python官网下一个# 推荐使用Python-3.6.X版本wget https://www.python.org/ftp/python/3.6.7/Python-3.6.7.tgztar -zxvf Python-3.6.7.tgzcd Python-3.6.7  # 配置信息及编译安装,安装到/opt/Bigdata/python3目录下,也可以自行指定./configure --prefix=/opt/Bigdata/python3 --enable-shared CFLAGS=-fPICmake && make install #配置变量echo "/opt/Bigdata/python3/lib" >> /etc/ld.so.confldconfig ln -s /opt/Bigdata/python3/bin/python3 /usr/bin/python3ln -s /opt/Bigdata/python3/bin/pip3 /usr/bin/pip3 2.安装kafka-python#kafka-python安装:pip3 install kafka-pythonpip3 install gssapi 3.运行kafka-python#运行步骤:#在安装有mrs客户端的节点上,例如客户端安装在/opt/client下source /opt/client/bigdata_envkinit kafka用户,例如kinit admin,输入密码#执行python3脚本#脚本样例:#producer:from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers=["broker_ip:21007"],                         security_protocol="SASL_PLAINTEXT",                         sasl_mechanism="GSSAPI",                         sasl_kerberos_service_name="kafka",                         sasl_kerberos_domain_name="hadoop.hadoop.com")for _ in range(100):    response = producer.send("test-topic", b"testmessage")    result = response.get(timeout=50)    print(result) #consumer:from kafka import KafkaConsumer                         consumer = KafkaConsumer("test-topic",                         bootstrap_servers=["broker_ip:21007"],                         group_id="test-group",                         enable_auto_commit="true",                         security_protocol="SASL_PLAINTEXT",                         sasl_mechanism="GSSAPI",                         sasl_kerberos_service_name="kafka",                         sasl_kerberos_domain_name="hadoop.hadoop.com")for message in consumer:    print(message)
  • [其他] 开源kafka manager链接开启kerbeors认证的MRS Kafka集群部署配置指南
    1.       Kafka manager不发布可运行包,需要自己编译Github地址:https://github.com/yahoo/kafka-manager编译打包命令:./sbt clean dist这里使用的版本为 1.3.3.22编译完成后解压到kafka manager安装目录2.       在运行kafka broker节点执行命令ps –ef | grep kafka.Kafka,获取进程信息中的以下变量值:-Djava.security.auth.login.config=/****/jaas.conf  #jaas.conf文件路径-Djava.security.krb5.conf=/****/kdc.conf  #kdc配置文件路径-Dkerberos.domain.name=domin_name #krbs domain name打开jaas.conf文件,查看以下配置项的值:keyTab="/***/kafka.keytab"  #keytab文件路径将jaas.conf文件、kdc配置文件、keytab文件拷贝到安装kafka manager的机器3.       到安装kafka manager的机器编辑jaas.conf文件将keyTab配置项的值改为本机上keytab文件的路径复制Client模块,粘贴并改名为KafkaClient模块,其他内容不作改动。完成后jaas.conf样例如下:KafkaServer {com.sun.security.auth.module.Krb5LoginModule requireddebug=falsekeyTab="example keytab file path"useTicketCache=falsestoreKey=trueprincipal="example principal "useKeyTab=true;}; Client {com.sun.security.auth.module.Krb5LoginModule requiredstoreKey=trueprincipal="example principal "useTicketCache=falsekeyTab="example keytab file path "debug=falseuseKeyTab=true;}; KafkaClient {com.sun.security.auth.module.Krb5LoginModule requiredstoreKey=trueprincipal="example principal "useTicketCache=falsekeyTab="example keytab file path "debug=falseuseKeyTab=true;};新建consumer.properties文件,内容如下,替换加粗部分:key.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializersecurity.protocol = SASL_PLAINTEXTvalue.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializerkerberos.domain.name = 步骤2中获得的krbs domain namebootstrap.servers = broker地址sasl.kerberos.service.name = kafka4.       到kafka manager安装目录下,修改./conf/application.conf中的以下配置项:kafka-manager.zkhosts #配置为zookeeper 地址,默认有2个重复配置项,只保留1个kafka-manager.consumer.properties.file #配置为步骤3中的consumer.properties路径5.       将./lib目录下的org.apache.kafka.kafka-clients-1.1.0.jar和org.apache.zookeeper.zookeeper-3.4.10.jar替换为mrs发布jar包,可在kafka broker lib目录下获取6.       到kafka manager安装目录,替换以下命令中的加粗部分并执行:./bin/kafka-manager -Dconfig.file=步骤4中修改的application.conf文件路径 -Dhttp.port=kafka manager监听端口 -Dhttp.address=kafka manager监听网卡ip地址 -Djava.security.auth.login.config=步骤3中修改的jaas.con文件路径 -Djava.security.krb5.conf=步骤2中获得的kdc.conf文件路径 -Dzookeeper.request.timeout=120000 -Dkerberos.domain.name=步骤2中获得的krbs domain name7.       在网络连通的节点通过浏览器访问地址:http://ip:port, ip和port为步骤6中启动命令中的kafka manager监听网卡ip地址和kafka manager监听端口
  • [云早报] kafka 商业公司 Confluent 获 1.25 亿美元融资,估值 25 亿美元(北京时间)1月25日,星期五
    云早报,(北京时间)1月25日,星期五【云头条】Kafka 商业公司 Confluent 获 1.25 亿美元融资,估值 25 亿美元开源方案提供商Confluent主攻事件流(Event Streaming)数据管理领域,提供针对免费开源技术的服务、支持和管理工具。这家公司是多年来增长最快的针对企业市场的初创公司之一。近日,Confluent以25亿美元估值完成1.25亿美元D轮融资。本轮融资由红杉资本领投,参投方还包括以前的投资者Index Ventures和Benchmark。据悉,Confluent的融资总额已达2.06亿美元。Confluent的发展速度相当迅猛。四年前,这家公司还在加州山景城默默无闻地经营着。一年后,这家公司才聘请了第一位销售人员。消息人士称,如今,Confluent的年预订量(预订量包括合同的全部价值)已超过1亿美元。Confluent的数据显示,其预订量在过去一年里增长了3.5倍。【华为云新闻】云上故事集锦 | 发现华为云的『第一性原理』物理学『第一性原理』,同时也是整个自然科学的重要理念和希腊精华思想的体现,巧妙地用一句话揭示了解决世界级难题的基础方法论:简单(simplicity),普适(universality)——用简单的方式,解决复杂的问题(Complexity out of simplicity),我国哲学也称“大道至简”。(查看全文)【互联网新闻】1.微软收购开源公司Citus Data,加码对抗谷歌、亚马逊据美国媒体CNBC报道,微软周四表示,它正在收购一家初创公司Citus Data。该公司已将其称为PostgreSQL的开源数据库软件商业化。交易条款尚未披露。这笔交易可以支撑微软提出支持开源技术的论点,特别是在云端。微软一直继续从受欢迎的自有产权软件如Windows和Office赚钱。在云计算业务方面,微软希望利用开放性来应对谷歌、市场领导者亚马逊等公司的竞争。2.抖音短视频:希望企业之间不要封杀找借口36氪讯,抖音短视频今日发文称,针对用户反馈“微信账户无法登录抖音”,抖音多次与腾讯沟通,均没有得到回复。此外,针对媒体报道提及微信内部人士回应称,通过微信登录授权,用户在微信中的关系链可以被轻松复制到抖音平台,抖音表示,此系谣言。文中称,除非微信主动提供,没有第三方可以获取微信关系链;未经用户同意,抖音不会收集、使用、共享、提供个人信息;希望企业之间不要封杀找借口,更不要污名化被封杀者网友评论:人家那里找借口了?人家压根就没搭理你!3.柔宇科技副总裁称小米折叠屏“公然造假”柔宇科技副总裁樊俊超昨日在朋友圈炮轰小米,称小米双折叠手机是买的别人尚未量产的概念柔性屏幕和概念机,并表示小米宣称的“攻克了柔性折叠屏技术”是公然造假。樊俊超直言,小米是一家没有核心技术的手机组装公司。网友评论:关于小米的话就让你们来说吧~4.拼多多涨近8%,市值再次逼近京东拼多多近日遭遇两大利空,一方面,拼多多被曝重大漏洞,遭用户“薅羊毛”,损失千万。此外,1月22日,拼多多长达半年的股票禁售期结束,拼多多股东面临二级市场出售股票套现机会。然而拼多多股价却并未受太多负面影响,反而在周四迎来大涨,截至收盘,拼多多报28.74美元,涨幅达7.76%,市值318.38亿美元,京东涨0.59%,市值为319.63亿美元。拼多多市值重回300亿美元的同时也几乎与京东市值持平。网友评论:难道是前两天的优惠券bug让大家重拾了信心?5.王欣或将推出新社交产品丸子视频有消息人士透露,王欣@王铁匠 或将推出新的社交产品“丸子视频”,目前,丸子视频官网下方标明为云歌人工智能公司版权所有,但并未披露更多的产品细节。此前,王欣推出以“匿名熟人社交”为自我定位的App,马桶MT。(虎嗅)网友评论:这个马桶MT你用过么?6.微软CEO:面部识别技术需要管制,以免竞争导致丧失底线据外媒报道,微软首席执行官萨提亚·纳德拉表示欢迎政府机构对面部识别技术进行监管,因为人们越来越担心该技术被用于监视人们和侵犯隐私。纳德拉认为,随着面部识别技术变得越来越普遍,科技公司的自我监管可能不足以应对它可能带来的社会威胁。他在达沃斯论坛上说:“我觉得,在市场上,只要有竞争,那么正确使用面部识别技术和错误使用面部识别技术之间就没有区别。”网友评论:只要用就没啥区别了~7.华为5G“秀肌肉”:发布5G芯片,5G折叠屏手机也在路上1月24日,华为在北京举办5G发布会暨2019世界移动大会预沟通会,发布了全球首款5G基站核心芯片——华为天罡,推动自身5G业务在全球范围内的大规模快速部署。目前,华为已经可提供涵盖终端、网络、数据中心的端到端5G自研芯片,支持“全制式、全频谱”网络。华为常务董事、运营商BG总裁丁耘介绍称,华为在过去一年里获得了30个5G的商业合同,5G产品全球发货25000台以上。他还透露,华为会在今年的MWC展会上发布首款商用5G折叠屏手机。网友评论:踏踏实实做技术就好!8.谷歌要求美国高院终止甲骨文Java侵权诉讼据国外媒体报道,1月24日,美国谷歌公司要求美国最高法院终止甲骨文公司(Oracle Corp)提起的一起价值10亿美元的版权诉讼。这起诉讼始于2010年。谷歌敦促高等法院裁定,美国版权法允许其采用甲骨文公司的Java编程语言,以创建安卓操作系统。网友评论:观众都散场了,电影还没结局!9.腾讯公司诉“糗事百科”不正当竞争,索赔50万元因认为“糗事百科”网站发布、传播虚假信息,损害了其商业信誉和商品声誉,深圳市腾讯计算机系统有限公司和腾讯科技(北京)有限公司以不正当竞争纠纷为由,将“糗事百科”运营商友际无限(北京)科技有限公司诉至法院,要求其停止侵权、赔礼道歉,并赔偿损失50万元。日前,海淀法院受理了此案。网友评论:大过年的,不要吵架。10.“阿尔法星际”正式亮相 10比0人类职业选手谷歌旗下人工智能部门DeepMind开发的人工智能(AI)程序“AlphaStar”(阿尔法星际)今日凌晨挑战《星际争霸2》游戏。结果是:名为“AlphaStar”的人工智能在与两位人类职业选手“TLO”和“MANA”的比赛中,均以5比0取胜。AlphaStar跟AlphaGo有些类似,最开始都是通过学习人类选手的Replay(比赛录像)来提升水平。直播中展示的10场比赛都是在一张相同的比赛地图上进行,而且都是神族内战。人工智能的APM(每分钟操作的次数)限制在与人类选手相仿的程度。网友评论:求这两位选手的心理阴影面积?【本周新闻】崩溃!程序员年会加班合并代码,77 万大奖没了。。。想离职(北京时间)1月21日,星期一周鸿祎:智能设备要警惕海豚音攻击(北京时间)1月22日,星期二300亿!北京印发5G产业发展行动方案(北京时间)1月23日,星期三程序员锁死服务器毁掉600万游戏项目?当事人回应:我没那能力(北京时间)1月24日,星期四【更多内容,欢迎访问】http://forum.huaweicloud.com/forum.php?mod=forumdisplay&fid=569&filter=typeid&typeid=266(内容来源于互联网,如侵犯您的合法权益或有其他任何疑问,请联系:huaweicloud.bbs@huawei.com沟通处理。谢谢!)
  • [问题求助] 分布式消息服务 Kafka可以支持公网访问么?
    分布式消息服务 Kafka可以支持公网访问么?公网如何接入的?
  • Kafka 事务特性分析
    特性背景消息事务是指一系列的生产、消费操作可以要么都完成,要么都失败,类似数据库的事务。这个特性在0.10.2的版本是不支持的,从0.11版本开始才支持。华为云DMS率先提供Kafka 1.1.0的专享版服务,支持消息事务特性。         支持事务消息有什么作用?消息事务是实现分布式事务的一种方案,可以确保分布式场景下的数据最终一致性。例如最常用的转账场景,小王 转账到小明,实际操作是小王账户减去相应金额,小明的账户增加相应金额,在分库分表的前提下,2个账户存储在不同的数据库中,这时需要分布式事务才能保证数据库一致性,单个数据库的事务无法保证跨库之间的原子性。如果小王账户先扣钱,再去发送消息到小明所在的数据库去通知增加钱,在没有事务消息的情况下,无论是先扣钱或者先发送通知增加钱,都会有数据不一致的问题,因为无法保证两者的原子性。而有了事务消息,可以保证发送通知与本地事务(扣钱)是一个原子操作,本地事务与发送通知可以同时成功或者同时失败,确保数据一致。除了数据最终一致性外,还实现了消息Exactly once语义。所谓Exactly once语义是消息传递语义中最难实现的一种,包括At most once:最多一次(不会重复,但是可能丢失数据); At least once:至少投递一次(不会丢失,但是会导致重复)和Exactly  once: 刚好一次(不丢不重),也即幂等性。Kafka的幂等性可以保证生产只对一个分区实现Exactl once语义,需要多个分区也实现这个语义,还需要引入消息事务确保原子性。分布式事务介绍当前系统架构主流是分布式架构与微服务架构,在这种架构下数据源不是单一的数据库,业务逻辑往往需要在多个数据库中实现原子操作,单个数据库中的强大的本地事务无法保证多节点原子操作。 此时需要分布式事务来确保数据的一致性。目前使用较多的分布式事务解决方案有几种:1、XA事务:两阶段/三阶段提交XA是由X/Open组织提出的分布式事务的规范。XA规范主要定义了(全局)事务管理器(Transaction Manager)和(局部)资源管理器(Resource Manager)之间的接口。XA接口是双向的系统接口,在事务管理器(Transaction Manager)以及一个或多个资源管理器(Resource Manager)之间形成通信桥梁。实现XA事务的关键是两阶段和三阶段提交协议。两阶段提交协议(Two-phase Commit,2PC)经常被用来实现分布式事务。一般分为协调器C和若干事务参与者Si两种角色,这里的事务参与者就是具体的数据库,协调器可以和事务参与者在一台机器上,如下图二阶段提交协议主要包括由2个阶段:第一个阶段为准备阶段(prepare),第二阶段为提交阶段。准备阶段由事务协调者向事务参与者发送prepare消息,各个参与者处理本地事务但不提交,然后向事务协调者返回事务状态。 提交阶段根据准备阶段各参与者的执行请求,协调者确定事务是提交或者回滚,向各个参与者发送命令。二阶段提交协议主要的问题是在提交执行过程中,所有的参与者都需要听从协调者的统一调度,期间处于阻塞状态而不能从事其他操作,这样效率及其低下。特别是当协调者发出提交通知到部分参与者后宕机,其他参与者就会阻塞。针对二阶段提交存在的问题,三阶段提交协议在prepare与commit阶段之间增加一个pre-commit阶段。Prepare阶段只询问参与者而不做事务,而在pre-commit阶段各个参与者才会执行本地事务但不提交。Commit阶段就是直接提交。这样做可以避免二阶段当协调者迟迟没有发出commit或者rollback通知,参与者在超时后可以自行提交或者回滚,避免阻塞事务(这是因为经过了prepare阶段已经确认了各个参与者是可以执行的,最后第三阶段直接执行即可)。 三阶段提交也存在很多问题,也不能完全保证数据一致,完全一致需要用到Paxos算法。2、TCC补偿性事务解决方案TCC分别对应Try、Confirm和Cancel三种操作,含义如下:- Try:预留业务资源- Confirm:确认执行业务操作,执行事务- Cancel:取消执行业务操作TCC解决了跨应用业务操作的原子性问题,在诸如组合支付、账务拆分场景非常实用。TCC实际上把数据库层的二阶段提交上提到了应用层来实现,对于数据库来说是一阶段提交,规避了数据库层的2PC性能低下问题。TCC需要业务提供使用,开发复杂和成本高。3、 事务消息基于消息中间件的事务消息来完成分布式事务。事务消息可以确保本地执行事务与消息发送是原子的:先发送一条消息到消息中间件,然后执行本地事务,当本地事务成功后再发送提交确认到消息中间件,然后这条消息才能被其他业务消费者所能感知,从而确保原子性。Kafka消息事务基本概念为了支持事务,Kafka 0.11.0版本引入以下概念:1、 事务协调者:类似于消费组负载均衡的协调者,每一个实现事务的生产端都被分配到一个事务协调者(Transaction Coordinator)2、引入一个内部Kafka Topic作为事务Log:类似于消费管理Offset的Topic,事务Topic本身也是持久化的,日志信息记录事务状态信息,由事务协调者写入。3、引入控制消息(Control Messages): 这些消息是客户端产生的并写入到主题的特殊消息,但对于使用者来说不可见。它们是用来让broker告知消费者之前拉取的消息是否被原子性提交。4、引入TransactionId:不同生产实例使用同一个TransactionId表示是同一个事务,可以跨Session的数据幂等发送。当具有相同Transaction ID的新的Producer实例被创建且工作时,旧的且拥有相同Transaction ID的Producer将不再工作,避免事务僵死。5、Producer ID:每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的。主要是为提供幂等性时引入的6、Sequence Numbler。(对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number。7、 每个生产者增加一个epoch: 用于标识同一个事务Id在一次事务中的epoch,每次初始化事务时会递增,从而让服务端可以知道生产者请求是否旧的请求。8、幂等性: 保证发送单个分区的消息只会发送一次,不会出现重复消息。增加一个幂等性的开关enable.idempotence,可以独立与事务使用,即可以只开启幂等但不开启事务。事务流程如下图所示:1、查找事务协调者生产者会首先发起一个查找事务协调者的请求(FindCoordinatorRequest)。协调者会负责分配一个PID给生产者。类似于消费组的协调者,2、获取produce ID在知道事务协调者后,生产者需要往协调者发送初始化pid请求(initPidRequest)。这个请求分两种情况:2.1  不带transactionID这种情况下直接生成一个新的produce ID即可,返回给客户端2.2  带transactionID这种情况下,kafka根据transactionalId获取对应的PID,这个对应关系是保存在事务日志中(上图2a)。这样可以确保相同的TransactionId返回相同的PID,用于恢复或者终止之前未完成的事务。3、启动事务生产者通过调用beginTransaction接口启动事务,此时只是内部的状态记录为事务开始,但是事务协调者认为事务开始只有当生产者开始发送第一条消息才开始。4.    消费和生产配合过程这一步是消费和生成互相配合完成事务的过程,其中涉及多个请求:4.1  增加分区到事务请求当生产者有新分区要写入数据,则会发送AddPartitionToTxnRequest到事务协调者。协调者会处理请求,主要做的事情是更新事务元数据信息,并把信息写入到事务日志中(事务Topic4.2  生产请求       生产者通过调用send接口发送数据到分区,这些请求新增pid,epoch和sequence number字段,如上图中的4.2a描述4.3  增加消费offset到事务生产者通过新增的snedOffsetsToTransaction接口,会发送某个分区的Offset信息到事务协调者。协调者会把分区信息增加到事务中4.4  事务提交offset请求当生产者调用事务提交offset接口后,会发送一个TxnOffsetCommitRequest请求到消费组协调者(见4.4a),消费组协调者会把offset存储在__consumer-offsets Topic中。协调者会根据请求的PID和epoch验证生产者是否允许发起这个请求。 消费offset只有当事务提交后才对外可见5.    提交或回滚事务用户通过调用commitTransaction或abortTranssaction方法提交或回滚事务。5.1  EndTxnRequest当生产者完成事务后,客户端需要显式调用结束事务或者回滚事务。前者会使得消息对消费者可见,后者会对生产数据标记为Abort状态,使得消息对消费者不可见。无论是提交或者回滚,都是发送一个EndTnxRequest请求到事务协调者,写入PREPARE_COMMIT或者PREPARE_ABORT信息到事务记录日志中(5.1a)。5.2  WriteTxnMarkerRequest这个请求是事务协调者向事务中每个TopicPartition的Leader发送的。每个Broker收到请求后会写入COMMIT(PID)或者ABORT(PID)控制信息到数据日志中(5.2a)。这个信息用于告知消费者当前消息是哪个事务,消息是否应该接受或者丢弃。而对于未提交消息,消费者会缓存该事务的消息直到提交或者回滚。这里要注意,如果事务也涉及到__consumer_offsets,即该事务中有消费数据的操作且将该消费的Offset存于__consumer_offsets中,Transaction Coordinator也需要向该内部Topic的各Partition的Leader发送WriteTxnMarkerRequest从而写入COMMIT(PID)或COMMIT(PID)控制信息(5.2a 左边)。5.3  写入最终提交或回滚信息当提交和回滚信息写入数据日子后,事务协调者会往事务日志中写入最终的提交或者终止信息以表示事务已经完成(图5.3),此时大部分于事务有关系的消息都可以被删除(通过标记后面在日志压缩时会被移除),我们只需要保留事务ID以及其时间戳即可。接口:KafkaProducer.javapublic interface Producer<K,V> extends Closeable {       /**    * Needs to be called before any of the other transaction methods. Assumes that    * the transactional.id is specified in the producer configuration.    *    * This method does the following:    *   1. Ensures any transactions initiated by previous instances of the producer    *      are completed. If the previous instance had failed with a transaction in    *      progress, it will be aborted. If the last transaction had begun completion,    *      but not yet finished, this method awaits its completion.    *   2. Gets the internal producer id and epoch, used in all future transactional    *      messages issued by the producer.    *    * @throws IllegalStateException if the TransactionalId for the producer is not set    *         in the configuration.    */   void initTransactions() throws IllegalStateException;       /**    * Should be called before the start of each new transaction.    *    * @throws ProducerFencedException if another producer is with the same    *         transactional.id is active.    */   void beginTransaction() throws ProducerFencedException;       /**    * Sends a list of consumed offsets to the consumer group coordinator, and also marks    * those offsets as part of the current transaction. These offsets will be considered    * consumed only if the transaction is committed successfully.    *    * This method should be used when you need to batch consumed and produced messages    * together, typically in a consume-transform-produce pattern.    *    * @throws ProducerFencedException if another producer is with the same    *         transactional.id is active.    */   void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,                                 String consumerGroupId) throws ProducerFencedException;       /**    * Commits the ongoing transaction.    *    * @throws ProducerFencedException if another producer is with the same    *         transactional.id is active.    */   void commitTransaction() throws ProducerFencedException;       /**    * Aborts the ongoing transaction.    *    * @throws ProducerFencedException if another producer is with the same    *         transactional.id is active.        */   void abortTransaction() throws ProducerFencedException;       /**    * Send the given record asynchronously and return a future which will eventually contain the response information.    *    * @param record The record to send    * @return A future which will eventually contain the response information    *    */   public Future<RecordMetadata> send(ProducerRecord<K, V> record);     /**    * Send a record and invoke the given callback when the record has been acknowledged by the server    */   public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback); }示例:KafkaTransaction**ample.javapublic class KafkaTransaction**ample {      public static void main(String args[]) {     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);         // Note that the ‘transactional.id’ configuration _must_ be specified in the     // producer config in order to use transactions.     KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);       // We need to initialize transactions once per producer instance. To use transactions,     // it is assumed that the application id is specified in the config with the key     // transactional.id.     //     // This method will recover or abort transactions initiated by previous instances of a     // producer with the same app id. Any other transactional messages will report an error     // if initialization was not performed.     //     // The response indicates success or failure. Some failures are irrecoverable and will     // require a new producer  instance. See the documentation for TransactionMetadata for a     // list of error codes.     producer.initTransactions();           while(true) {       ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);       if (!records.isEmpty()) {       // We need to initialize transactions once per producer instance. To use transactions,       // it is assumed that the application id is specified in the config with the key       // transactional.id.       // This method will recover or abort transactions initiated by previous instances of a       // producer with the same app id. Any other transactional messages will report an error       // if initialization was not performed.       //       // The response indicates success or failure. Some failures are irrecoverable and will       // require a new producer  instance. See the documentation for TransactionMetadata for a       // list of error codes.         //启动新事务         producer.beginTransaction();                   // Process the input records and send them to the output topic(s).         List<ProducerRecord<String, String>> outputRecords = processRecords(records);         for (ProducerRecord<String, String> outputRecord : outputRecords) {           producer.send(outputRecord);         }                   // To ensure that the consumed and produced messages are batched, we need to commit         // the offsets through         // the producer and not the consumer.         //         // If this returns an error, we should abort the transaction.         //发送消费offset与生产消息是同一个事务,这样可以保证消费进度和生产同时成功或失败。         sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets());                          // Now that we have consumed, processed, and produced a batch of messages, let's         // commit the results.         // If this does not report success, then the transaction will be rolled back.         producer.endTransaction();       }     }   } }参考资料:1.       https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging2.       https://www.confluent.io/blog/transactions-apache-kafka/欢迎扫码查看更多精彩:
  • 请问:分布式消息服务DMS和分布式消息服务Kafka什么区别?
    如题?
  • [热门活动] 华为云分布式消息服务Kafka与RabbitMQ于2018年8月15日00:00(北京时间)转商通知
    尊敬的华为云客户:华为云计划于2018/08/15 00:00:00将分布式消息服务Kafka专享版、RabbitMQ队列正式商用。商用后,Kafka专享版和RabbitMQ队列支持按需和包周期计费,具体价格请届时参考该服务的计费详情页。公测期间体验服务的用户,商用后将会按需计费,为保证业务的连续性,请您及时关注账户,确保账户余额充足;如您体验结束,不再需要使用该服务,建议您尽快删除队列,以免账户扣款。Kafka专享版和RabbitMQ队列商用版本采用更高性能主机,性能强劲。具有多种规格实例供选择,满足用户不同场景需要;提供SSL能力,具备更高的安全性。更多关于分布式消息服务的产品介绍,请您点击了解。如您在使用过程中有宝贵意见,欢迎您拨打华为云服务热线:4000-955-988与我们联系。感谢您对华为云的支持!
  • Kafka无法消费!?究竟是bug的“沦丧”还是配置的“扭曲”?
    在一个月黑风高的夜晚,突然收到现网生产环境Kafka消息积压的告警,梦中惊醒啊,马上起来排查日志。问题现象:消费请求卡死在查找CoordinatorCoordinator为何物?Coordinator用于管理Consumer Group中各个成员,负责消费offset位移管理和Consumer Rebalance。Consumer在消费时必须先确认Consumer Group对应的Coordinator,随后才能join Group,获取对应的topic partition进行消费。那如何确定Consumer Group的Coordinator呢?分两步走:1、一个Consumer Group对应一个__consumers_offsets的分区,首先先计算Consumer Group对应的__consumers_offsets的分区,计算公式如下:__consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount,其中groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定。2、1中计算的该partition的leader所在的broker就是被选定的Coordinator。定位过程Coordinator节点找到了,现在看看Coordinator是否有问题:      不出所料,Coordinator对应分区Leader为-1,消费端程序会一直等待,直到Leader选出来为止,这就直接导致了消费卡死。为啥Leader无法选举?Leader选举是由Controller负责的。Controller节点负责管理整个集群中分区和副本的状态,比如partition的Leader选举,topic创建,副本分配,partition和replica扩容等。现在我们看看Controller的日志:1.   6月10日15:48:30,006 秒Broker 1成为controller此时感知的节点为1和2,节点3 在zk读不出来:31秒847的时候把__consumer_offsets的分区3的Leader选为1,ISR为[1,2],leader_epoch为14:再过1秒后才感知到Controller发生变化,自身清退2.Broker 2在其后几百毫秒后(15:48:30,936)也成为Controller但是Broker2 是感知到Broker 3节点是活的,日志如下:注意这个时间点,Broker1还没在zk把__consumer_offsets的分区3 的Leader从节点3改为1,这样Broker 2还认为Broker 3是Leader,并且Broker 3在它认为是活的,所以不需要重新选举Leader。这样一直保持了相当长的时间,即使Broker 1已经把这个分区的Leader切换了,它也不感知。3. Broker 2在12号的21:43:19又感知Broker 1网络中断,并处理节点失败事件:因为Broker 2内存中认为__consumer_offsets分区3的Leader是broker 3,所以不会触发分区3的Leader切换。Broker 2但是在处理失败的节点Broker 1时,会把副本从ISR列表中去掉,去掉前会读一次zk,代码如下:       但是发现zk中分区3的Leader已经变为1,ISR列表为[1,2],当要去掉的节点1就是Leader的时候,Leader就会变为-1, ISR只有[2],从日志也可以看到:    这样分区3 的Leader一直为-1,直到有新的事件触发节点2重新选举才能恢复(例如重启某个节点)。根因总结出现网络异常后,由于新老controller之间感知的可用节点不同,导致新controller对某个分区的Leader在内存中的信息与zk记录元数据的信息不一致,导致controller选举流程出现错误,选不出Leader。 需要有新的选举事件才能触发Leader选出来,例如重启。问题总结    这是一个典型的由于网络异常导致脑裂,进而出现了多个Controller,华为云分布式消息服务(DMS)Kafka经过电信级的可靠性验证,已经完美解决了这些问题,您值得拥有!欢迎扫码查看更多精彩:
  • 【福利】Kafka专享实例开放公测啦,欢迎【免费试用】
    =======================================================================小编给大家带来一个好消息,华为云分布式消息服务日前开放了Kafka专享实例版的公测试用。欢迎大家免费体验。如何免费试用Kafka专享实例?------------------------->点此申请Kafka专享版<--------------------------进入页面后参考下图申请:        申请完成后如何使用?    申请完成后重新登录或者刷新华为云控制台,选择分布式消息服务DMS,在Kafka专享版管理页签中购买,公测期间免费哦~      专享实例和普通队列相比有哪些好处?1.      实例资源独享,根据业务需要,申请合适规格的Kafka实例,在保证支撑业务峰值的同时,节约用户成本。2.      分区上限多达270个,topic 10个3.      存储空间多达600G4.      支持VPC内网络访问控制,您的服务更加安全。Kafka专享实例如何使用?点此了解Kafka专享实例的帮助指导
  • 【干货贴】分布式消息服务DMS与开源Kafka对比
    分布式消息服务(简称DMS)是一项基于高可用分布式集群技术的消息中间件服务,提供了可靠且可扩展的托管消息队列,用于收发消息和存储消息。那么,比起自建开源的Kafka,分布式消息服务DMS有哪些好处呢?以下就是两者的详细对比。 16470 16471 16472 16473 16474 16475
  • 【干货贴】Kafka实战:如何把Kafka消息时延秒降10倍
    本帖最后由 小柴不加胡 于 2018-5-11 11:28 编辑背景国内某大型税务系统,业务应用分布式上云改造。 业务难题15178如上图所示是模拟客户的业务网页构建的一个并发访问模型。用户在页面点击从而产生一个HTTP请求,这个请求发送到业务生产进程,就会启动一个投递线程(Deliver Thread)调用Kafka的SDK接口,并发送3条消息到DMS(分布式消息服务),每条消息大小3k,需要等待3条消息都被处理完成后才会返回请求响应⑧。当消息达到DMS后,业务消费进程调用Kafka的消费接口把消息取出来,然后将每条消息放到一个响应线程(Response Thread)中进行处理,响应线程处理完后,通过HTTP请求通知投递线程,投递线程收到响应后返回回复响应。 100并发访问时延500ms,未达成用户业务要求客户提出了明确的要求:每1个两核的ECS要能够支撑并发访问量100,每条消息端到端的时延范围是几十毫秒,即从生产者发送开始到接收到消费者响应的时间。客户实测在使用了DMS的Kafka 队列后,并发访问量为100时时延高达到500ms左右,甚至出现达到秒级的时延,远未达到客户提出的业务诉求。相比较而言,客户在Pod区使用的是自己搭建的原生Kafka,在并发访问量为100时测试到的时延大约只有10~20ms左右。那么问题来了,在并发访问量相同的条件下,DMS的Kafka 队列与Pod区自建的原生Kafka相比为什么时延会有这么大的差异呢?我们DMS的架构师 Mr. Peng对这个时延难题进行了一系列分析后完美解决了这个客户难题,下面就让我们来看看他的心路历程。 难题剖析根据模拟的客户业务模型,Mr. Peng在华为云类生产环境上也构造了一个测试程序,同样模拟构造了100的并发访问量,通过测试发现,类生产环境上压测得到的时延平均时间在60ms左右。类生产上的时延数值跟客户在真实生产环境上测到的时延差距这么大,这是怎么回事呢?问题变得扑朔迷离起来。Mr. Peng当机立断,决定就在华为云现网上运行构造的测试程序,来看看到底是什么原因。同时,在客户的ECS服务器上,也部署了相同的测试程序,模拟构建了100的并发量,得到如下的时延结果对比表:15181 表1 华为云现网与类生产环境时延对比表 从时延对比表的结果看来,Mr. Peng发现,即使在相同的并发压力下,华为云现网的时延比类生产差很多。Mr.Peng意识到,现在有2个问题需要分析:为什么华为云现网的时延会比类生产差?DMS的Kafka队列时延比原生自建的Kafka队列时延表现差的问题怎么解决?Mr. Peng分析如下: 时延分析回归问题的本质,DMS Kafka队列的时延到底是怎么产生的?可控的端到端时延具体分为哪些?Mr.Peng给出了如下的计算公式:总时延 = 入队时延 + 发送时延 + 写入时延 + 复制时延+ 拉取时延让我们来依次了解一下,公式中的每一项都是指什么。入队时延: 消息进入Kafka sdk后,先进入到要发送分区的队列,完成消息打包后再发送,这一过程所用的时间。发送时延:消息从生产者发送到服务端的时间。写入时延:消息写入到Kafka Leader的时间。复制时延:消费者只可以消费到高水位以下的消息(即被多个副本都保存的消息),所以消息从写入到Kafka Leader,到所有副本都写入该消息直到上涨至高水位这段时间就是消息复制的时延。拉取时延:消费者采用pull模式拉取数据,拉取过程所用的时间。 (1) 入队时延现网是哪一部分的时延最大呢?通过我们的程序可以看到,入队列等待发送时延非常大,如下图:15179 即消息都等待在生产端的队列中,来不及发送! 我们再看其他时延分析,因为无法在现网测试,我们分别在类生产测试了相同压力的,测试其他各种时延如下:(2) 复制时延以下是类生产环境测试的1并发下的15182 从日志上看,复制时延包括在remoteTime里面,当然这个时间也会包括生产者写入时延比较慢导致的,但是也从一定的程度反映复制时延也是提升性能时延的一个因素。 (3) 写入时延因为用户使用的是高吞吐队列,写入都是异步落盘,我们从日志看到写入时延非常低(localTime),可以判断不是瓶颈。发送时延与拉取时延都是跟网络传输有关系,这个优化主要是通过调TCP的参数来决定的。轻轻松松把Kafka消息时延秒降10倍,就用华为云DMS
  • [其他] 如何通过公网EIP发送消息到MRS Kafka
    本帖最后由 石头剪刀布 于 2018-5-29 11:22 编辑操作场景常有用户需要将云下的数据搬到云上MRS服务中,但由于MRS集群的节点均在VPC网络中,外部是无法直接访问的,那如果需要将线下的数据搬到云上MRS服务中,该如何处理呢?本文介绍一种方式,如何通过公网访问MapReduceService的Kafka服务。前提条件l 已创建MRS集群 操作步骤1.1 绑定弹性IP步骤 1 登录华为云控制台,选择“EI企业智能 > MapReduce服务”,点击已经创建的集群名称,进入到集群详情页面。步骤 2 在节点信息列表中,选择任意一台“类型”为“Core”的节点,点击打开该节点,在新打开的页面中选择“弹性IP”,点击“绑定弹性IP”。在弹出框中,选择已创建的弹性IP进行绑定。 15171如果当前没有弹性IP,则需要新创建,新创建的弹性IP建议选择“按流量计费” 15172 步骤 3 对集群中的节点信息列表中的其他两个Core节点重复“步骤2”,即给另外两个Core节点都绑定弹性IP。 1.2 开通安全组步骤 1 登录华为云控制台,选择“EI企业智能 > MapReduce服务”,点击已经创建的集群名称,进入到集群详情页面。步骤 2 在节点信息列表中,选择任意一台“类型”为“Core”的节点,点击打开该节点,在新打开的页面中选择“安全组”,点击已经绑定的安全组链接。 15173 在新打开的页面中,选择“添加规则”,在新弹出的框中按如下进行配置 15174 即允许任意IP地址访问21005端口。注意: MRS 1.7.0及以后的版本21005端口改成了与开源一致,即 9092端口 注意:此示例中直接开放了任意IP访问21005端口,这种做法不安全,在实际的生产环境中,需要限制只有特定的IP才能访问,保证安全性。源IP地址填写 指定的IP 即可。步骤 3 Core节点共用一个安全组规格,不需要重复进行配置。 1.3 修改Kafka配置 步骤 1 登录华为云控制台,选择“EI企业智能 > MapReduce服务”,点击已经创建的集群名称,进入到集群详情页面。 步骤 2 点击“集群管理页面”后面的“点击查看”链接,进入到MRS Manager页面。15175步骤 3 在MRS Manager页面中选择“服务管理 > Kafka > 服务配置”,“参数类别”选择“全部配置”,在左树中找到“Broker > 自定义”,然后在参数列表中添加“advertised.listeners”,值填写“PLAINTEXT://:21005,SSL://:21008”MRS 1.7.0及以后的版本值填写为“PLAINTEXT://:9092,SSL://:9093” 15176 步骤 4 点击“保存配置”,并勾选“重新启动受影响的服务或实例。” 1.4 公网连接 步骤 1 通过步骤2.1中绑定的弹性IP登录各个Core节点,然后执行“hostname”,找到EIP对应的主机名 15177 依次登录找到EIP与主机名的对应列表[code]49.4.xx.xx node-core-ndnIi 49.4.xx.xx1 node-core-AYmJu 49.4.xx.xx2 node-core-ZAgvo[/code] 步骤 2 在Kafka客户端的配置中,bootstrap.servers的地址全部使用主机名[code]bootstrap.servers = node-core-ndnIi:21005,node-core-AYmJu:21005,node-core-ZAgvo:21005 [/code]如果使用的版本为MRS 1.7.0及以后,21005端口修改为9092。 并在本机的/etc/hosts中添加步骤1中的EIP和主机名列表。 步骤 3 以下为使用Flume的KafkaSink的示例[code]client.sources = spoolDir1 client.channels = memo1 client.sinks = kafka client.sources.spoolDir1.type = spooldir client.sources.spoolDir1.spoolDir = /tmp/flumedata/ client.sources.spoolDir1.decodeErrorPolicy = IGNORE client.sources.spoolDir1.deserializer.maxLineLength = 4048210 client.sources.spoolDir1.deserializer.maxBatchLine = 10 client.sources.spoolDir1.deserializer = BufferedLine client.sources.spoolDir1.channels = memo1 client.channels.memo1.type = memory client.channels.memo1.capacity = 10000 client.channels.memo1.transactionCapacity = 1000 client.channels.memo1.channelfullcount = 10 client.channels.memo1.keep-alive = 3 client.channels.memo1.byteCapacity = client.channels.memo1.byteCapacityBufferPercentage = 20 client.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink client.sinks.kafka.kafka.bootstrap.clients = node-core-ndnIi:21005,node-core-AYmJu:21005,node-core-ZAgvo:21005 client.sinks.kafka.kafka.topic = Test client.sinks.kafka.channel = memo1 client.sinks.kafka.kafka.producer.security.protocol = PLAINTEXT[/code]同上,如果使用的版本为MRS 1.7.0及以后,21005端口修改为9092。 特别说明:不建议使用此种方式消费MRSKafka的消息,因为华为云EIP的上行流量不收取费用,而下行流量需要收取费用。
  • 【干货贴】1分钟带你玩转Kafka
    本帖最后由 小柴不加胡 于 2018-5-11 11:28 编辑说起Kafka,许多使用者对它是又爱又恨。Kafka是一种分布式的、基于发布/订阅的消息系统,其极致体验让人欲罢不能,但操心的运维、复杂的安全策略、可靠性易用性的缺失、算不上极致的性能发挥、并不丰富的消息服务功能,仍需要使用者付出诸多的背后工作。即使你是Kafka老手,也难免会有上述同样的烦恼。 与其整日操心Kafka的部署,不如试试云上Kafka带给你的惊喜?目前国内主流的云服务厂商均提供了云上的Kafka服务,为应用系统提供异步的消息队列服务。通过高可用的消息缓冲队列,实现应用解耦、突发流量处理及与第三方的互通和集成,具有大规模、高可靠、高并发访问、可扩展且完全托管的特点。比如分布式消息服务DMS,帮助云端的应用程序组件去耦合,具有很高的成本效益。DMS拥抱开源,提供支持兼容开源Kafka接口,用户可无缝迁移,按需使用。 15132 DMS优势全面兼容Kafka 提升效率兼容开源业务系统基于开源的Kafka进行开发,只需加入少量认证安全配置,即可使用DMS的Kafka队列,做到无缝迁移。消息互通支持消息多通道,DMS接口可对Kafka队列进行消息收发,也可用开源Kafka client进行消息收发。安全保证华为独有的安全加固体系,提供业务操作可回溯,消息存储加密及租户间有效隔离等有效安全措施。 高可靠及可用无忧运维数据高可靠消息持久化,多副本存储。服务高可用后台多集群部署,支持故障自动迁移和容错,保证用户关键业务的可靠运行。无忧运维提供一整套完整的监控告警等运维服务,故障自动发现和告警,避免7*24小时人工值守。 核心特性细粒度灵活控制DMS支持RBAC(基于角色访问控制)和PBAC(基于策略访问控制)两种访问控制模式,从而提供更加安全灵活的访问策略。可以实现消息队列粒度和API调用动作的安全策略访问控制,结合华为云的IAM服务,可以满足用户几乎所有对消息服务使用的安全要求。多协议的访问DMS提供多种协议接入方式,包括1)HTTP 符合REST规范标准的接入,支持多种语言接入使用,并支持云内及云外访问。2)基于TCP的简单SDK方式,提供更加高性能的访问接口;另外,还提供兼容开源Kafka的开放接口,能够更好地帮助用户把使用Kafka的应用快速上云。分布式可靠集群和海量队列能力内建的分布式集群技术,使得服务具有高度扩展性;无限扩展的队列数量和可扩展的高性能机制,保证在高并发、高性能和大规模场景下的访问能力,轻松实现百亿级消息的堆积和访问能力。内建消息冗余存储,保证消息存储的可靠性,有效避免服务节点故障。死信管理死信是在消费环节为用户提供一种不能正常处理消息时的可选方案,防止因个别消息不正常导致后续消息都不能被消费,造成业务阻塞。DMS为用户提供可视化死信开关和参数配置,并提供API接口。在不开启死信队列功能时,对于确认失败或超时的消息,系统会进行回滚并重投递。无论是否开启死信功能,DMS服务都会保证消息不丢失。自定义消息功能DMS除提供消息队列的核心功能外,还提供额外的增强能力,为用户提供高收益、低成本的极具性价比的消息服务。消息消费重置允许用户设置任一可消费的时间点控制向前或向后消费进度,提高消费的灵活性;基于消息标签的消息过滤可实现选择性地消费包含指定标签的消息;支持消息属性,每条消息上都可以设置不同的属性。无忧运维消息服务是云原生服务,整个消息服务的运维对用户透明。用户无需关心后台运维情况,只需关注自身涉及的消息队列指标,支持对入队消息数、请求数、已经消费的消息数、消息堆积数量等情况的监控,并支持配置告警规则,用户可以在第一时间通过短信、邮件等获得业务消息队列的运行使用和负载状态。 适用场景业务解耦将业务中依赖其他系统同时属于非核心或不重要的部分使用消息服务,无需同步等待其他系统的处理结果。如电商网站获取用户订单后,信息放入消息队列,会从队列里读取出库、发货任务信息然后执行。最终一致性用于两个系统的状态最终保持一致,或都成功或都失败。如用于交易系统的高可靠数据传递,实现跨系统的事务最终一致,降低实现难度和成本;如预定门票及软件打车。错峰流控上下游系统处理能力有差异时,可以使用消息服务转储系统之间的通信数据,提供消息堆积缓冲能力,在下游系统有能力处理消息的时候再处理,减少拥塞、系统崩溃等问题,提高系统的可用性,降低复杂性。如高峰时段的注册、抢购、预约等。日志同步应用通过可靠异步方式将日志消息同步到消息服务,再通过其他组件对日志做实时或离线分析,也可用于关键日志信息收集,进行应用监控。如注册时用户填写的个人信息等。 现阶段DMS免费使用喔!一分钟玩转Kafka,就是这么简单。老铁们快戳这里享受福利吧~