-
欢迎参加华为云“中间件最佳实践挑战营”!这是本次挑战营的第4关,坚持闯关成功有机会获第五期好礼:码豆/荣耀手环/华为背包等,全通关还有大奖!本期活动截止5月24日。注意:参与闯关前,请确保已报名加入活动群并领取实践资源,如未入群请添加小助手微信(zhongjianjianxiaoge),回复“中间件”报名入群!点击这里了解活动详情>> | 点击这里查看活动FAQ>>一、 场景介绍在DMS提供的原生Kafka SDK中,消费者可以自定义拉取消息的时长,如果需要长时间的拉取消息,只需要把poll(long)方法的参数设置合适的值即可。但是这样的长连接可能会对客户端和服务端造成一定的压力,特别是分区数较多且每个消费者开启多个线程的情况下。 Kafka队列含有多个分区,消费组中有多个消费者同时进行消费,每个线程均为长连接。当队列中消息较少或者没有时,连接不断开,所有消费者不间断地拉取消息,这样造成了一定的资源浪费。 二、 实践指南(1)领取实践资源:点击这里免费领取1个月Kafka体验规格实例,可用区3/5已售罄,请选择2。提示:实践活动提供的免费Kafka实例没有开SASL,在配置时需做一定修改,见→FAQ第12条。(2)最佳实践指南:https://support.huaweicloud.com/bestpractice-dms/dms-bp-0312001.html(3)视频操作演示:https://education.huaweicloud.com:8443/courses/course-v1:HuaweiX+CBUCNXP021+Self-paced/courseware/062fe309dc964326b06b7e5505fe5e4a/89d814d93c544301a6cb62db0914cc51/(4)新手入门教学:《Kafka全景实践课》三、 闯关任务任务一:创建DMS Kafka实例,查看实例详情并截图,截图需包含右上角华为云账号名,并按回帖格式要求在本帖中回帖;奖励:100码豆(可用于兑换DevCloud会员中心超多奖品); 任务二:根据实践指南完成实践操作,将代码截图,将运行结果截图,并按回帖格式要求在本帖中回帖;奖励:本期闯1关可参与抽取“荣耀手环4 Running版”;闯2关可参与抽取“华为背包”;详见FAQ评奖规则 四、 回帖格式请务必按照以下格式要求进行回帖,否则无法计算奖励:华为云账号名:XXX(即右上角的字母数字组合ID)微信昵称:XXX实践感想:XXX实践截图:至少包含(a)实例详情截图、(b)代码截图、(c)运行结果截图三张截图。附各关卡快速入口:第1关任务:基于API网关的电话号码归属地查询第2关任务:使用函数工作流服务为图片打水印第3关任务:使用Redis实现排行榜功能第4关任务:使用DMS Kafka优化消费者poll第5关任务:使用CPTS进行电商网站性能测试
-
Kafka 组件的介绍Kafka定义Kafka 是一个高吞吐、分布式、基于发布订阅A的消息系统,利用Kafka技术可在廉价PC Server上搭建起大规模消息系统。Kafka应用场景简介Kafka和其他组件比较,具有消息持久化、高吞吐、实时等特性,适用于离线和实时的消息消费,如网站活性跟踪、聚合统计系统运营数据(监控数据)、日志收集等大量数据的数据收集场景。 Kafka拓扑结构图一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干Broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举Leader,以及在Consumer发生变化时进行rebalance。Producer使用push模式将消息发布到Broker,Consumer使用pull模式从Broker订阅并消费消息。Broker:Kafka集群包含一个或多个服务实例,这些服务实例被称为Broker。Producer:负责发布消息到Kafka Broker。Consumer:消息消费者,从Kafka Broker读取消息的客户端。Kafka Topics每条发布到Kafka的消息都有一个类别,这个类别被称为Topic,也可以理解为一个存储消息的队列。例如:天气作为一个Topic,每天的温度消息就可以存储在“天气”这个队列里。图片中的蓝色框为Kafka的一个Topic,即可以理解为一个队列,每个格子代表一条消息。生产者产生的消息逐条放到Topic的末尾。消费者从左至右顺序读取消息,使用Offset来记录读取的位置。Kafka Partition每个Topic 都有一个或者多个Partitions构成。每个Partition都是有序且不可变的消息队列。引入Partition机制,保证了Kafka的高吞吐能力。每个topic被分成多个partition(区),每个partition在存储层面对应一个log文件,log文件中记录了所有的消息数据。引入Partition机制,保证了Kafka的高吞吐能力,因为Topic的多个Partition分布在不同的Kafka节点上,这样一来多个客户端(Producer和Consumer)就可以并发访问不同的节点对一个Topic进行消息的读写。
-
Java代码:package dms.kafka.demo; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class KafkaConsumerDemo { public static void main(String[] args) { if (args.length != 3) { throw new IllegalArgumentException("usage: dms.kafka.demo.KafkaProducerDemo bootstrap-servers topic-name group-name."); } Properties props = new Properties(); props.put("bootstrap.servers", args[0]); props.put("group.id", args[2]); props.put("enable.auto.commit", "true"); props.put("auto.offset.reset", "earliest"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList(args[1])); while (true) { ConsumerRecords records = consumer.poll(200); for (ConsumerRecord record : records){ System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } }1.打包。点击打包,生成demo-2.jar2.上传服务器:进入linux服务器,进入Kafka的libs目录下使用sz命令,把demo-2.jar传入到这个目录。3.生产消息使用cd..命令回退到上级目录。执行命令(请替换自己的ip,端口号,topic名):java -cp .:./libs/* dms.kafka.demo.KafkaProducerDemo 192.168.236.49:9092 topic-1393626260执行结果如下(红框里根据个人不同替换):4.消费消息执行命令(请替换自己的ip,端口号,topic名)::java -cp .:./libs/* dms.kafka.demo.KafkaConsumerDemo 192.168.236.49:9092 topic-1393626260 test-grp结果如下(红框里根据个人不同替换):到此,今天的课程完成。附件里是今天的代码jar包(因为不支持直接上传jar后缀文件,只能压缩打包后传的。)。
-
Hello!欢迎参加Kafka全景实践课!这是一篇帮助帖,我们会将每节课的常见问题和难点疑点记录在帖子中。为减少群内打扰,节省大家的时间,请遇到问题后先查阅本帖内容,如没有对应解答,再在用户群中提问哦! 【常用链接】(1)每日课程更新链接:https://education.huaweicloud.com:8443/courses/course-v1:HuaweiX+CBUCNXP017+Self-paced/about?isAuth=0&cfrom=hwc备注:8月12日-16日每日上午10点发布当天课程内容(2)打卡链接:https://w.url.cn/s/AX2jKda 备注:打卡时间为每日10:00-24:00,需根据操作指导上传对应截图【积分公示】本期课程活动(2019年8月12日-16日)已结束,积分及抽奖资格公示见下方附件PDF。特别说明:由于有的同学一天打了多次卡,有的打卡没有上传截图,导致计算时识别到没有截图记为无成绩。先将这部分同学补充如下,且附件PDF名单已修正:
-
MRS 1.8.5及以后的版本,都支持在流式节点上开启LVM特性。LVM特性能有效防止kafka多磁盘场景下因为单topic流量特别大时导致某个磁盘被写爆。同时开启LVM以后可以做到不重启系统、服务或组件的情况下实现磁盘平滑扩容,保证业务的连续性。下面我就介绍一下如何在开启了LVM的节点上实现Kafka的磁盘扩容操作。1. 购买云硬盘并挂载。a) 登录管理控制台。b) 选择“存储 > 云硬盘”。进入云硬盘页面。c) 单击“购买磁盘”,创建云硬盘。关于创建云硬盘的详细操作,请参见云硬盘用户指南。d) 在云硬盘列表,找到新购买的云硬盘,单击“挂载”。e) 选择云硬盘待挂载的云服务器,该云服务器必须与云硬盘位于同一个可用分区,通过下拉列表选择“挂载点”。2. 以root用户登录弹性云服务器。3. 执行如下命令,查看磁盘并记录新添加设备名称。如“/dev/vdc”fdisk -l | grep /dev/vd | grep -v vda4. 执行如下命令,将新挂载的磁盘创建为物理卷。pvcreate /dev/vdc5. 执行如下命令,查询卷组名称。vgdisplay6. 执行如下命令,添加物理卷到卷组中,对卷组进行扩容。vgextend vg_group /dev/vdc7. 执行如下命令,查询逻辑卷路径lvdisplay8. 执行如下命令,扩展逻辑卷的容量lvextend -L +99GB /dev/mapper/vg_group-core9. 执行如下命令,扩展文件系统的容量。到此,单个kafka节点的磁盘扩容完成。resize2fs /dev/mapper/vg_group-core 10. 重复以上步骤,对所有kafka节点进行磁盘扩容。
-
想学习当下热门的分布式消息系统Kafka?华为云中间件专家Hleecs倾囊相授5节课带你零基础入门,从入门到实战轻松玩转Kafka!扫码关注“中间件小哥”公众号,回复“Kafka”即可免费报名!本期课程结合华为云分布式消息服务Kafka,从基础原理入门到实践操作,循序渐进一站式学习。5节实战精品课,囊括创建topic,生产消息、消费消息,编写生产/消费代码,Kafka服务架构机制、常用工具使用等内容,让你系统性掌握Kafka。 参与打卡任务,还有机会获得50元京东卡、Kafka书籍、华为蓝牙音箱等超值奖品,知识+好礼双丰收! 活动过程如遇大量用户参与,会导致暂时无法添加助手微信号,大家可以稍等一段时间后再试。活动中有任何疑问,请添加智能应用平台小助手(微信:zhongjianjianxiaoge)咨询另外,对本次活动有任何想法和建议,欢迎在评论区回帖哦~
-
1、简介Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。本次移植以Kafka-2.1.0为例进行说明官方链接:https://kafka.apache.org/类别:应用程序2、环境类别子项版本获取地址(方法)ECS规格arm通用计算增强型|c3.xlarge.4|4vCPUs| 14GB---OSCentOS7.5--Kernel4.14--PackageKafka2.1.0https://codeload.github.com/apache/kafka/tar.gz/2.1.0OpenJdkJdk8u191-b12https://github.com/AdoptOpenJDK/openjdk8-binaries/releases/download/jdk8u191-b12/OpenJDK8U-jdk_aarch64_linux_hotspot_8u191b12.tar.gz Gradle4.10https://downloads.gradle.org/distributions/gradle-5.4-bin.zip Scala2.12.0https://downloads.lightbend.com/scala/2.12.0/scala-2.12.0.tgz 3、依赖安装 通用工具Gradle、Jdk8u191-b12、Scala-2.12.0Gradle4.10安装1、下载地址 执行:wget https://downloads.gradle.org/distributions/gradle-5.4-bin.zip2、解压 执行:yum install unzip 执行::unzip -d /home/tools/gradle/gradle-5.4-bin.zip3、设置环境变量 执行:vim /etc/profile 在配置文件最后加上:export GRADLE_HOME=/home/tools/gradle/gradle-5.4export PATH=$ GRADLE _HOME/bin:$PATH 保存后执行: source /etc/profilejdk8u191-b12安装1、下载地址:执行:wget https://github.com/AdoptOpenJDK/openjdk8-binaries/releases/download/jdk8u191-b12/OpenJDK8U-jdk_aarch64_linux_hotspot_8u191b12.tar.gz2、解压 执行:cd /home/tools 执行:tar -zxvf OpenJDK8U-jdk_aarch64_linux_hotspot_8u191b12.tar.gz3、设置环境变量 执行:vim /etc/profileexport JAVA_HOME=/home/tools/jdk8u191-b12export PATH=${JAVA_HOME}/bin:$PATHexport CLASSPATH=.:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar 执行: source /etc/profile Scala-2.12.0安装1、下载地址:wget https://downloads.lightbend.com/scala/2.12.0/scala-2.12.0.tgz2、解压 执行:cd /home/tools 执行:tar -zxvf scala-2.12.0.tgz3、设置环境变量 执行:vim /etc/profileexport SCALA_HOME=/home/tools/scala-2.12.0export PATH=${SCALA_HOME}/bin:$PATH 执行: source /etc/profile 4、组件编译安装4.1 下载源码 从Kafka官网下载kafka-2.1.0的源码, 执行:wget -O kafka-2.1.0.tar.gz https://codeload.github.com/apache/kafka/tar.gz/2.1.04.2 解压 执行:tar -zxvf kafka-2.1.0.tar.gz 进行解压 执行:cd kafka-2.1.0 进入源码目录4.3 执行编译命令执行:mkdir /usr/gradleRepository (gradleRepository为gradle本地仓库目录需要手动创建)执行:gradle -g /usr/gradleRepository clean(注意:此时目录为kafka-2.1.0) 执行:gradle -g /usr /gradleRepository releaseTarGz (按实际情况选择编译命令)4.4 安装部署1、配置 执行:cd kafka-2.1.0 执行:vim config/server.properties #配置zookeeper和主机名 host.name=localhost zookeeper.connect= localhost:21812、配置环境变量 执行:vim /etc/profileexport KAFKA_HOME=/xxxx/kafka-2.1.0export PATH=${KAFKA_HOME}/bin:$PATH 执行: source /etc/profile3、启动Kafka 执行:zookeeper-server-start.sh config/zookeeper.properties & 执行:kafka-server-start.sh config/server.properties &4、检查进程启动情况: 执行:jps 如果kafka进程:Kafka、QuorumPeerMain两个进程启动成功,整个Kafka服务启动完成。 5、测试Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。1、shell_1创建topic: 执行: kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test2、shell_2生产数据: 执行: kafka-console-producer.sh --broker-list localhost:9092 --topic test3、shenll_3消费数据 执行: kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning 在shell_2中的producer生产端可以输入文本,在shell_3中的consumer消费端可以查看到刚刚producer端生产的文本。如果需要了解更多的命令,建议查看kafka官网。6、参考信息 官方链接:https://kafka.apache.org Github链接:https://github.com/apache/kafka Kafka配置可以参考官网指导(https://kafka.apache.org/documentation/#configuration) 7、FAQ 待补充
-
1.配置python3环境安装步骤:# 配置华为云欧拉镜像源wget http://mirrors.myhuaweicloud.com/repo/mirrors_source.sh && sh mirrors_source.sh # 安装必要的工具,用来编译python3yum groupinstall "Development tools" -yyum -y install zlib zlib-develyum -y install bzip2 bzip2-develyum -y install ncurses ncurses-develyum -y install readline readline-develyum -y install openssl openssl-develyum -y install openssl-staticyum -y install xz lzma xz-develyum -y install sqlite sqlite-develyum -y install gdbm gdbm-develyum -y install tk tk-develyum -y install libffi libffi-devel # 下载,解压python3的tgz包,也可以自行去python官网下一个# 推荐使用Python-3.6.X版本wget https://www.python.org/ftp/python/3.6.7/Python-3.6.7.tgztar -zxvf Python-3.6.7.tgzcd Python-3.6.7 # 配置信息及编译安装,安装到/opt/Bigdata/python3目录下,也可以自行指定./configure --prefix=/opt/Bigdata/python3 --enable-shared CFLAGS=-fPICmake && make install #配置变量echo "/opt/Bigdata/python3/lib" >> /etc/ld.so.confldconfig ln -s /opt/Bigdata/python3/bin/python3 /usr/bin/python3ln -s /opt/Bigdata/python3/bin/pip3 /usr/bin/pip3 2.安装kafka-python#kafka-python安装:pip3 install kafka-pythonpip3 install gssapi 3.运行kafka-python#运行步骤:#在安装有mrs客户端的节点上,例如客户端安装在/opt/client下source /opt/client/bigdata_envkinit kafka用户,例如kinit admin,输入密码#执行python3脚本#脚本样例:#producer:from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers=["broker_ip:21007"], security_protocol="SASL_PLAINTEXT", sasl_mechanism="GSSAPI", sasl_kerberos_service_name="kafka", sasl_kerberos_domain_name="hadoop.hadoop.com")for _ in range(100): response = producer.send("test-topic", b"testmessage") result = response.get(timeout=50) print(result) #consumer:from kafka import KafkaConsumer consumer = KafkaConsumer("test-topic", bootstrap_servers=["broker_ip:21007"], group_id="test-group", enable_auto_commit="true", security_protocol="SASL_PLAINTEXT", sasl_mechanism="GSSAPI", sasl_kerberos_service_name="kafka", sasl_kerberos_domain_name="hadoop.hadoop.com")for message in consumer: print(message)
-
1. Kafka manager不发布可运行包,需要自己编译Github地址:https://github.com/yahoo/kafka-manager编译打包命令:./sbt clean dist这里使用的版本为 1.3.3.22编译完成后解压到kafka manager安装目录2. 在运行kafka broker节点执行命令ps –ef | grep kafka.Kafka,获取进程信息中的以下变量值:-Djava.security.auth.login.config=/****/jaas.conf #jaas.conf文件路径-Djava.security.krb5.conf=/****/kdc.conf #kdc配置文件路径-Dkerberos.domain.name=domin_name #krbs domain name打开jaas.conf文件,查看以下配置项的值:keyTab="/***/kafka.keytab" #keytab文件路径将jaas.conf文件、kdc配置文件、keytab文件拷贝到安装kafka manager的机器3. 到安装kafka manager的机器编辑jaas.conf文件将keyTab配置项的值改为本机上keytab文件的路径复制Client模块,粘贴并改名为KafkaClient模块,其他内容不作改动。完成后jaas.conf样例如下:KafkaServer {com.sun.security.auth.module.Krb5LoginModule requireddebug=falsekeyTab="example keytab file path"useTicketCache=falsestoreKey=trueprincipal="example principal "useKeyTab=true;}; Client {com.sun.security.auth.module.Krb5LoginModule requiredstoreKey=trueprincipal="example principal "useTicketCache=falsekeyTab="example keytab file path "debug=falseuseKeyTab=true;}; KafkaClient {com.sun.security.auth.module.Krb5LoginModule requiredstoreKey=trueprincipal="example principal "useTicketCache=falsekeyTab="example keytab file path "debug=falseuseKeyTab=true;};新建consumer.properties文件,内容如下,替换加粗部分:key.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializersecurity.protocol = SASL_PLAINTEXTvalue.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializerkerberos.domain.name = 步骤2中获得的krbs domain namebootstrap.servers = broker地址sasl.kerberos.service.name = kafka4. 到kafka manager安装目录下,修改./conf/application.conf中的以下配置项:kafka-manager.zkhosts #配置为zookeeper 地址,默认有2个重复配置项,只保留1个kafka-manager.consumer.properties.file #配置为步骤3中的consumer.properties路径5. 将./lib目录下的org.apache.kafka.kafka-clients-1.1.0.jar和org.apache.zookeeper.zookeeper-3.4.10.jar替换为mrs发布jar包,可在kafka broker lib目录下获取6. 到kafka manager安装目录,替换以下命令中的加粗部分并执行:./bin/kafka-manager -Dconfig.file=步骤4中修改的application.conf文件路径 -Dhttp.port=kafka manager监听端口 -Dhttp.address=kafka manager监听网卡ip地址 -Djava.security.auth.login.config=步骤3中修改的jaas.con文件路径 -Djava.security.krb5.conf=步骤2中获得的kdc.conf文件路径 -Dzookeeper.request.timeout=120000 -Dkerberos.domain.name=步骤2中获得的krbs domain name7. 在网络连通的节点通过浏览器访问地址:http://ip:port, ip和port为步骤6中启动命令中的kafka manager监听网卡ip地址和kafka manager监听端口
-
云早报,(北京时间)1月25日,星期五【云头条】Kafka 商业公司 Confluent 获 1.25 亿美元融资,估值 25 亿美元开源方案提供商Confluent主攻事件流(Event Streaming)数据管理领域,提供针对免费开源技术的服务、支持和管理工具。这家公司是多年来增长最快的针对企业市场的初创公司之一。近日,Confluent以25亿美元估值完成1.25亿美元D轮融资。本轮融资由红杉资本领投,参投方还包括以前的投资者Index Ventures和Benchmark。据悉,Confluent的融资总额已达2.06亿美元。Confluent的发展速度相当迅猛。四年前,这家公司还在加州山景城默默无闻地经营着。一年后,这家公司才聘请了第一位销售人员。消息人士称,如今,Confluent的年预订量(预订量包括合同的全部价值)已超过1亿美元。Confluent的数据显示,其预订量在过去一年里增长了3.5倍。【华为云新闻】云上故事集锦 | 发现华为云的『第一性原理』物理学『第一性原理』,同时也是整个自然科学的重要理念和希腊精华思想的体现,巧妙地用一句话揭示了解决世界级难题的基础方法论:简单(simplicity),普适(universality)——用简单的方式,解决复杂的问题(Complexity out of simplicity),我国哲学也称“大道至简”。(查看全文)【互联网新闻】1.微软收购开源公司Citus Data,加码对抗谷歌、亚马逊据美国媒体CNBC报道,微软周四表示,它正在收购一家初创公司Citus Data。该公司已将其称为PostgreSQL的开源数据库软件商业化。交易条款尚未披露。这笔交易可以支撑微软提出支持开源技术的论点,特别是在云端。微软一直继续从受欢迎的自有产权软件如Windows和Office赚钱。在云计算业务方面,微软希望利用开放性来应对谷歌、市场领导者亚马逊等公司的竞争。2.抖音短视频:希望企业之间不要封杀找借口36氪讯,抖音短视频今日发文称,针对用户反馈“微信账户无法登录抖音”,抖音多次与腾讯沟通,均没有得到回复。此外,针对媒体报道提及微信内部人士回应称,通过微信登录授权,用户在微信中的关系链可以被轻松复制到抖音平台,抖音表示,此系谣言。文中称,除非微信主动提供,没有第三方可以获取微信关系链;未经用户同意,抖音不会收集、使用、共享、提供个人信息;希望企业之间不要封杀找借口,更不要污名化被封杀者网友评论:人家那里找借口了?人家压根就没搭理你!3.柔宇科技副总裁称小米折叠屏“公然造假”柔宇科技副总裁樊俊超昨日在朋友圈炮轰小米,称小米双折叠手机是买的别人尚未量产的概念柔性屏幕和概念机,并表示小米宣称的“攻克了柔性折叠屏技术”是公然造假。樊俊超直言,小米是一家没有核心技术的手机组装公司。网友评论:关于小米的话就让你们来说吧~4.拼多多涨近8%,市值再次逼近京东拼多多近日遭遇两大利空,一方面,拼多多被曝重大漏洞,遭用户“薅羊毛”,损失千万。此外,1月22日,拼多多长达半年的股票禁售期结束,拼多多股东面临二级市场出售股票套现机会。然而拼多多股价却并未受太多负面影响,反而在周四迎来大涨,截至收盘,拼多多报28.74美元,涨幅达7.76%,市值318.38亿美元,京东涨0.59%,市值为319.63亿美元。拼多多市值重回300亿美元的同时也几乎与京东市值持平。网友评论:难道是前两天的优惠券bug让大家重拾了信心?5.王欣或将推出新社交产品丸子视频有消息人士透露,王欣@王铁匠 或将推出新的社交产品“丸子视频”,目前,丸子视频官网下方标明为云歌人工智能公司版权所有,但并未披露更多的产品细节。此前,王欣推出以“匿名熟人社交”为自我定位的App,马桶MT。(虎嗅)网友评论:这个马桶MT你用过么?6.微软CEO:面部识别技术需要管制,以免竞争导致丧失底线据外媒报道,微软首席执行官萨提亚·纳德拉表示欢迎政府机构对面部识别技术进行监管,因为人们越来越担心该技术被用于监视人们和侵犯隐私。纳德拉认为,随着面部识别技术变得越来越普遍,科技公司的自我监管可能不足以应对它可能带来的社会威胁。他在达沃斯论坛上说:“我觉得,在市场上,只要有竞争,那么正确使用面部识别技术和错误使用面部识别技术之间就没有区别。”网友评论:只要用就没啥区别了~7.华为5G“秀肌肉”:发布5G芯片,5G折叠屏手机也在路上1月24日,华为在北京举办5G发布会暨2019世界移动大会预沟通会,发布了全球首款5G基站核心芯片——华为天罡,推动自身5G业务在全球范围内的大规模快速部署。目前,华为已经可提供涵盖终端、网络、数据中心的端到端5G自研芯片,支持“全制式、全频谱”网络。华为常务董事、运营商BG总裁丁耘介绍称,华为在过去一年里获得了30个5G的商业合同,5G产品全球发货25000台以上。他还透露,华为会在今年的MWC展会上发布首款商用5G折叠屏手机。网友评论:踏踏实实做技术就好!8.谷歌要求美国高院终止甲骨文Java侵权诉讼据国外媒体报道,1月24日,美国谷歌公司要求美国最高法院终止甲骨文公司(Oracle Corp)提起的一起价值10亿美元的版权诉讼。这起诉讼始于2010年。谷歌敦促高等法院裁定,美国版权法允许其采用甲骨文公司的Java编程语言,以创建安卓操作系统。网友评论:观众都散场了,电影还没结局!9.腾讯公司诉“糗事百科”不正当竞争,索赔50万元因认为“糗事百科”网站发布、传播虚假信息,损害了其商业信誉和商品声誉,深圳市腾讯计算机系统有限公司和腾讯科技(北京)有限公司以不正当竞争纠纷为由,将“糗事百科”运营商友际无限(北京)科技有限公司诉至法院,要求其停止侵权、赔礼道歉,并赔偿损失50万元。日前,海淀法院受理了此案。网友评论:大过年的,不要吵架。10.“阿尔法星际”正式亮相 10比0人类职业选手谷歌旗下人工智能部门DeepMind开发的人工智能(AI)程序“AlphaStar”(阿尔法星际)今日凌晨挑战《星际争霸2》游戏。结果是:名为“AlphaStar”的人工智能在与两位人类职业选手“TLO”和“MANA”的比赛中,均以5比0取胜。AlphaStar跟AlphaGo有些类似,最开始都是通过学习人类选手的Replay(比赛录像)来提升水平。直播中展示的10场比赛都是在一张相同的比赛地图上进行,而且都是神族内战。人工智能的APM(每分钟操作的次数)限制在与人类选手相仿的程度。网友评论:求这两位选手的心理阴影面积?【本周新闻】崩溃!程序员年会加班合并代码,77 万大奖没了。。。想离职(北京时间)1月21日,星期一周鸿祎:智能设备要警惕海豚音攻击(北京时间)1月22日,星期二300亿!北京印发5G产业发展行动方案(北京时间)1月23日,星期三程序员锁死服务器毁掉600万游戏项目?当事人回应:我没那能力(北京时间)1月24日,星期四【更多内容,欢迎访问】http://forum.huaweicloud.com/forum.php?mod=forumdisplay&fid=569&filter=typeid&typeid=266(内容来源于互联网,如侵犯您的合法权益或有其他任何疑问,请联系:huaweicloud.bbs@huawei.com沟通处理。谢谢!)
-
分布式消息服务 Kafka可以支持公网访问么?公网如何接入的?
-
特性背景消息事务是指一系列的生产、消费操作可以要么都完成,要么都失败,类似数据库的事务。这个特性在0.10.2的版本是不支持的,从0.11版本开始才支持。华为云DMS率先提供Kafka 1.1.0的专享版服务,支持消息事务特性。 支持事务消息有什么作用?消息事务是实现分布式事务的一种方案,可以确保分布式场景下的数据最终一致性。例如最常用的转账场景,小王 转账到小明,实际操作是小王账户减去相应金额,小明的账户增加相应金额,在分库分表的前提下,2个账户存储在不同的数据库中,这时需要分布式事务才能保证数据库一致性,单个数据库的事务无法保证跨库之间的原子性。如果小王账户先扣钱,再去发送消息到小明所在的数据库去通知增加钱,在没有事务消息的情况下,无论是先扣钱或者先发送通知增加钱,都会有数据不一致的问题,因为无法保证两者的原子性。而有了事务消息,可以保证发送通知与本地事务(扣钱)是一个原子操作,本地事务与发送通知可以同时成功或者同时失败,确保数据一致。除了数据最终一致性外,还实现了消息Exactly once语义。所谓Exactly once语义是消息传递语义中最难实现的一种,包括At most once:最多一次(不会重复,但是可能丢失数据); At least once:至少投递一次(不会丢失,但是会导致重复)和Exactly once: 刚好一次(不丢不重),也即幂等性。Kafka的幂等性可以保证生产只对一个分区实现Exactl once语义,需要多个分区也实现这个语义,还需要引入消息事务确保原子性。分布式事务介绍当前系统架构主流是分布式架构与微服务架构,在这种架构下数据源不是单一的数据库,业务逻辑往往需要在多个数据库中实现原子操作,单个数据库中的强大的本地事务无法保证多节点原子操作。 此时需要分布式事务来确保数据的一致性。目前使用较多的分布式事务解决方案有几种:1、XA事务:两阶段/三阶段提交XA是由X/Open组织提出的分布式事务的规范。XA规范主要定义了(全局)事务管理器(Transaction Manager)和(局部)资源管理器(Resource Manager)之间的接口。XA接口是双向的系统接口,在事务管理器(Transaction Manager)以及一个或多个资源管理器(Resource Manager)之间形成通信桥梁。实现XA事务的关键是两阶段和三阶段提交协议。两阶段提交协议(Two-phase Commit,2PC)经常被用来实现分布式事务。一般分为协调器C和若干事务参与者Si两种角色,这里的事务参与者就是具体的数据库,协调器可以和事务参与者在一台机器上,如下图二阶段提交协议主要包括由2个阶段:第一个阶段为准备阶段(prepare),第二阶段为提交阶段。准备阶段由事务协调者向事务参与者发送prepare消息,各个参与者处理本地事务但不提交,然后向事务协调者返回事务状态。 提交阶段根据准备阶段各参与者的执行请求,协调者确定事务是提交或者回滚,向各个参与者发送命令。二阶段提交协议主要的问题是在提交执行过程中,所有的参与者都需要听从协调者的统一调度,期间处于阻塞状态而不能从事其他操作,这样效率及其低下。特别是当协调者发出提交通知到部分参与者后宕机,其他参与者就会阻塞。针对二阶段提交存在的问题,三阶段提交协议在prepare与commit阶段之间增加一个pre-commit阶段。Prepare阶段只询问参与者而不做事务,而在pre-commit阶段各个参与者才会执行本地事务但不提交。Commit阶段就是直接提交。这样做可以避免二阶段当协调者迟迟没有发出commit或者rollback通知,参与者在超时后可以自行提交或者回滚,避免阻塞事务(这是因为经过了prepare阶段已经确认了各个参与者是可以执行的,最后第三阶段直接执行即可)。 三阶段提交也存在很多问题,也不能完全保证数据一致,完全一致需要用到Paxos算法。2、TCC补偿性事务解决方案TCC分别对应Try、Confirm和Cancel三种操作,含义如下:- Try:预留业务资源- Confirm:确认执行业务操作,执行事务- Cancel:取消执行业务操作TCC解决了跨应用业务操作的原子性问题,在诸如组合支付、账务拆分场景非常实用。TCC实际上把数据库层的二阶段提交上提到了应用层来实现,对于数据库来说是一阶段提交,规避了数据库层的2PC性能低下问题。TCC需要业务提供使用,开发复杂和成本高。3、 事务消息基于消息中间件的事务消息来完成分布式事务。事务消息可以确保本地执行事务与消息发送是原子的:先发送一条消息到消息中间件,然后执行本地事务,当本地事务成功后再发送提交确认到消息中间件,然后这条消息才能被其他业务消费者所能感知,从而确保原子性。Kafka消息事务基本概念为了支持事务,Kafka 0.11.0版本引入以下概念:1、 事务协调者:类似于消费组负载均衡的协调者,每一个实现事务的生产端都被分配到一个事务协调者(Transaction Coordinator)2、引入一个内部Kafka Topic作为事务Log:类似于消费管理Offset的Topic,事务Topic本身也是持久化的,日志信息记录事务状态信息,由事务协调者写入。3、引入控制消息(Control Messages): 这些消息是客户端产生的并写入到主题的特殊消息,但对于使用者来说不可见。它们是用来让broker告知消费者之前拉取的消息是否被原子性提交。4、引入TransactionId:不同生产实例使用同一个TransactionId表示是同一个事务,可以跨Session的数据幂等发送。当具有相同Transaction ID的新的Producer实例被创建且工作时,旧的且拥有相同Transaction ID的Producer将不再工作,避免事务僵死。5、Producer ID:每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的。主要是为提供幂等性时引入的6、Sequence Numbler。(对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number。7、 每个生产者增加一个epoch: 用于标识同一个事务Id在一次事务中的epoch,每次初始化事务时会递增,从而让服务端可以知道生产者请求是否旧的请求。8、幂等性: 保证发送单个分区的消息只会发送一次,不会出现重复消息。增加一个幂等性的开关enable.idempotence,可以独立与事务使用,即可以只开启幂等但不开启事务。事务流程如下图所示:1、查找事务协调者生产者会首先发起一个查找事务协调者的请求(FindCoordinatorRequest)。协调者会负责分配一个PID给生产者。类似于消费组的协调者,2、获取produce ID在知道事务协调者后,生产者需要往协调者发送初始化pid请求(initPidRequest)。这个请求分两种情况:2.1 不带transactionID这种情况下直接生成一个新的produce ID即可,返回给客户端2.2 带transactionID这种情况下,kafka根据transactionalId获取对应的PID,这个对应关系是保存在事务日志中(上图2a)。这样可以确保相同的TransactionId返回相同的PID,用于恢复或者终止之前未完成的事务。3、启动事务生产者通过调用beginTransaction接口启动事务,此时只是内部的状态记录为事务开始,但是事务协调者认为事务开始只有当生产者开始发送第一条消息才开始。4. 消费和生产配合过程这一步是消费和生成互相配合完成事务的过程,其中涉及多个请求:4.1 增加分区到事务请求当生产者有新分区要写入数据,则会发送AddPartitionToTxnRequest到事务协调者。协调者会处理请求,主要做的事情是更新事务元数据信息,并把信息写入到事务日志中(事务Topic4.2 生产请求 生产者通过调用send接口发送数据到分区,这些请求新增pid,epoch和sequence number字段,如上图中的4.2a描述4.3 增加消费offset到事务生产者通过新增的snedOffsetsToTransaction接口,会发送某个分区的Offset信息到事务协调者。协调者会把分区信息增加到事务中4.4 事务提交offset请求当生产者调用事务提交offset接口后,会发送一个TxnOffsetCommitRequest请求到消费组协调者(见4.4a),消费组协调者会把offset存储在__consumer-offsets Topic中。协调者会根据请求的PID和epoch验证生产者是否允许发起这个请求。 消费offset只有当事务提交后才对外可见5. 提交或回滚事务用户通过调用commitTransaction或abortTranssaction方法提交或回滚事务。5.1 EndTxnRequest当生产者完成事务后,客户端需要显式调用结束事务或者回滚事务。前者会使得消息对消费者可见,后者会对生产数据标记为Abort状态,使得消息对消费者不可见。无论是提交或者回滚,都是发送一个EndTnxRequest请求到事务协调者,写入PREPARE_COMMIT或者PREPARE_ABORT信息到事务记录日志中(5.1a)。5.2 WriteTxnMarkerRequest这个请求是事务协调者向事务中每个TopicPartition的Leader发送的。每个Broker收到请求后会写入COMMIT(PID)或者ABORT(PID)控制信息到数据日志中(5.2a)。这个信息用于告知消费者当前消息是哪个事务,消息是否应该接受或者丢弃。而对于未提交消息,消费者会缓存该事务的消息直到提交或者回滚。这里要注意,如果事务也涉及到__consumer_offsets,即该事务中有消费数据的操作且将该消费的Offset存于__consumer_offsets中,Transaction Coordinator也需要向该内部Topic的各Partition的Leader发送WriteTxnMarkerRequest从而写入COMMIT(PID)或COMMIT(PID)控制信息(5.2a 左边)。5.3 写入最终提交或回滚信息当提交和回滚信息写入数据日子后,事务协调者会往事务日志中写入最终的提交或者终止信息以表示事务已经完成(图5.3),此时大部分于事务有关系的消息都可以被删除(通过标记后面在日志压缩时会被移除),我们只需要保留事务ID以及其时间戳即可。接口:KafkaProducer.javapublic interface Producer<K,V> extends Closeable { /** * Needs to be called before any of the other transaction methods. Assumes that * the transactional.id is specified in the producer configuration. * * This method does the following: * 1. Ensures any transactions initiated by previous instances of the producer * are completed. If the previous instance had failed with a transaction in * progress, it will be aborted. If the last transaction had begun completion, * but not yet finished, this method awaits its completion. * 2. Gets the internal producer id and epoch, used in all future transactional * messages issued by the producer. * * @throws IllegalStateException if the TransactionalId for the producer is not set * in the configuration. */ void initTransactions() throws IllegalStateException; /** * Should be called before the start of each new transaction. * * @throws ProducerFencedException if another producer is with the same * transactional.id is active. */ void beginTransaction() throws ProducerFencedException; /** * Sends a list of consumed offsets to the consumer group coordinator, and also marks * those offsets as part of the current transaction. These offsets will be considered * consumed only if the transaction is committed successfully. * * This method should be used when you need to batch consumed and produced messages * together, typically in a consume-transform-produce pattern. * * @throws ProducerFencedException if another producer is with the same * transactional.id is active. */ void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException; /** * Commits the ongoing transaction. * * @throws ProducerFencedException if another producer is with the same * transactional.id is active. */ void commitTransaction() throws ProducerFencedException; /** * Aborts the ongoing transaction. * * @throws ProducerFencedException if another producer is with the same * transactional.id is active. */ void abortTransaction() throws ProducerFencedException; /** * Send the given record asynchronously and return a future which will eventually contain the response information. * * @param record The record to send * @return A future which will eventually contain the response information * */ public Future<RecordMetadata> send(ProducerRecord<K, V> record); /** * Send a record and invoke the given callback when the record has been acknowledged by the server */ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback); }示例:KafkaTransaction**ample.javapublic class KafkaTransaction**ample { public static void main(String args[]) { KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig); // Note that the ‘transactional.id’ configuration _must_ be specified in the // producer config in order to use transactions. KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig); // We need to initialize transactions once per producer instance. To use transactions, // it is assumed that the application id is specified in the config with the key // transactional.id. // // This method will recover or abort transactions initiated by previous instances of a // producer with the same app id. Any other transactional messages will report an error // if initialization was not performed. // // The response indicates success or failure. Some failures are irrecoverable and will // require a new producer instance. See the documentation for TransactionMetadata for a // list of error codes. producer.initTransactions(); while(true) { ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT); if (!records.isEmpty()) { // We need to initialize transactions once per producer instance. To use transactions, // it is assumed that the application id is specified in the config with the key // transactional.id. // This method will recover or abort transactions initiated by previous instances of a // producer with the same app id. Any other transactional messages will report an error // if initialization was not performed. // // The response indicates success or failure. Some failures are irrecoverable and will // require a new producer instance. See the documentation for TransactionMetadata for a // list of error codes. //启动新事务 producer.beginTransaction(); // Process the input records and send them to the output topic(s). List<ProducerRecord<String, String>> outputRecords = processRecords(records); for (ProducerRecord<String, String> outputRecord : outputRecords) { producer.send(outputRecord); } // To ensure that the consumed and produced messages are batched, we need to commit // the offsets through // the producer and not the consumer. // // If this returns an error, we should abort the transaction. //发送消费offset与生产消息是同一个事务,这样可以保证消费进度和生产同时成功或失败。 sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets()); // Now that we have consumed, processed, and produced a batch of messages, let's // commit the results. // If this does not report success, then the transaction will be rolled back. producer.endTransaction(); } } } }参考资料:1. https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging2. https://www.confluent.io/blog/transactions-apache-kafka/欢迎扫码查看更多精彩:
-
如题?
-
尊敬的华为云客户:华为云计划于2018/08/15 00:00:00将分布式消息服务Kafka专享版、RabbitMQ队列正式商用。商用后,Kafka专享版和RabbitMQ队列支持按需和包周期计费,具体价格请届时参考该服务的计费详情页。公测期间体验服务的用户,商用后将会按需计费,为保证业务的连续性,请您及时关注账户,确保账户余额充足;如您体验结束,不再需要使用该服务,建议您尽快删除队列,以免账户扣款。Kafka专享版和RabbitMQ队列商用版本采用更高性能主机,性能强劲。具有多种规格实例供选择,满足用户不同场景需要;提供SSL能力,具备更高的安全性。更多关于分布式消息服务的产品介绍,请您点击了解。如您在使用过程中有宝贵意见,欢迎您拨打华为云服务热线:4000-955-988与我们联系。感谢您对华为云的支持!
-
在一个月黑风高的夜晚,突然收到现网生产环境Kafka消息积压的告警,梦中惊醒啊,马上起来排查日志。问题现象:消费请求卡死在查找CoordinatorCoordinator为何物?Coordinator用于管理Consumer Group中各个成员,负责消费offset位移管理和Consumer Rebalance。Consumer在消费时必须先确认Consumer Group对应的Coordinator,随后才能join Group,获取对应的topic partition进行消费。那如何确定Consumer Group的Coordinator呢?分两步走:1、一个Consumer Group对应一个__consumers_offsets的分区,首先先计算Consumer Group对应的__consumers_offsets的分区,计算公式如下:__consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount,其中groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定。2、1中计算的该partition的leader所在的broker就是被选定的Coordinator。定位过程Coordinator节点找到了,现在看看Coordinator是否有问题: 不出所料,Coordinator对应分区Leader为-1,消费端程序会一直等待,直到Leader选出来为止,这就直接导致了消费卡死。为啥Leader无法选举?Leader选举是由Controller负责的。Controller节点负责管理整个集群中分区和副本的状态,比如partition的Leader选举,topic创建,副本分配,partition和replica扩容等。现在我们看看Controller的日志:1. 6月10日15:48:30,006 秒Broker 1成为controller此时感知的节点为1和2,节点3 在zk读不出来:31秒847的时候把__consumer_offsets的分区3的Leader选为1,ISR为[1,2],leader_epoch为14:再过1秒后才感知到Controller发生变化,自身清退2.Broker 2在其后几百毫秒后(15:48:30,936)也成为Controller但是Broker2 是感知到Broker 3节点是活的,日志如下:注意这个时间点,Broker1还没在zk把__consumer_offsets的分区3 的Leader从节点3改为1,这样Broker 2还认为Broker 3是Leader,并且Broker 3在它认为是活的,所以不需要重新选举Leader。这样一直保持了相当长的时间,即使Broker 1已经把这个分区的Leader切换了,它也不感知。3. Broker 2在12号的21:43:19又感知Broker 1网络中断,并处理节点失败事件:因为Broker 2内存中认为__consumer_offsets分区3的Leader是broker 3,所以不会触发分区3的Leader切换。Broker 2但是在处理失败的节点Broker 1时,会把副本从ISR列表中去掉,去掉前会读一次zk,代码如下: 但是发现zk中分区3的Leader已经变为1,ISR列表为[1,2],当要去掉的节点1就是Leader的时候,Leader就会变为-1, ISR只有[2],从日志也可以看到: 这样分区3 的Leader一直为-1,直到有新的事件触发节点2重新选举才能恢复(例如重启某个节点)。根因总结出现网络异常后,由于新老controller之间感知的可用节点不同,导致新controller对某个分区的Leader在内存中的信息与zk记录元数据的信息不一致,导致controller选举流程出现错误,选不出Leader。 需要有新的选举事件才能触发Leader选出来,例如重启。问题总结 这是一个典型的由于网络异常导致脑裂,进而出现了多个Controller,华为云分布式消息服务(DMS)Kafka经过电信级的可靠性验证,已经完美解决了这些问题,您值得拥有!欢迎扫码查看更多精彩:
上滑加载中