• [问题求助] 【鲲鹏架构服务器】【taos(涛思数据)、kafka】对应的下载安装方式
    【功能模块】【鲲鹏架构服务器】【taos(涛思数据)、kafka】【操作步骤&问题现象】1、taos(涛思数据)、kafka对应的下载安装包和安装方式2、【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [运维管理] HD 6.5.1.3 版本集群 zookeeper和kafka是否有限制客户端ip访问的方法?kerberos除外
    【操作步骤&问题现象】HD 6.5.1.3 版本集群 zookeeper和kafka是否有限制客户端ip访问的方法?kerberos除外
  • [基础组件] 【MRS3.1.2产品】【CDL组件功能】CDL监控MySQL数据生产到Kafka
    【功能模块】创建CDL作业-MySQL--kafka,任务可以成功运行,且能够监控MySQL新增的数据,生产到Kafka中【操作步骤&问题现象】问题一、①MySQL中insert的时间类型的数据是2021-01-01 00:00:00②生产到Kafak的对应字段的数据变成了2020-12-31T16:00:00Z,时间出现了晚一天现象问题二、①在创建CDL作业的时候,Mysql的配置信息,Schema Auto Create我选择了否②然后消费kafka的数据发现还有大量的Schema信息被生产到Kafka中,希望的是不需要额外大量没有用处的信息
  • [赋能学习] 华为FusionInsight MRS实战 - FlinkSQL从kafka写入hive
    # 华为FusionInsight MRS实战 - FlinkSQL从kafka写入hive## 背景说明随着流计算的发展,挑战不再仅限于数据量和计算量,业务变得越来越复杂,开发者可能是资深的大数据从业者、初学 Java 的爱好者,或是不懂代码的数据分析者。如何提高开发者的效率,降低流计算的门槛,对推广实时计算非常重要。SQL 是数据处理中使用最广泛的语言,它允许用户简明扼要地展示其业务逻辑。Flink 作为流批一体的计算引擎,致力于提供一套 SQL 支持全部应用场景,Flink SQL 的实现也完全遵循 ANSI SQL 标准。之前,用户可能需要编写上百行业务代码,使用 SQL 后,可能只需要几行 SQL 就可以轻松搞定。本文介绍如何使用华为FusionInsight MRS FlinkServer服务进行界面化的FlinkSQL编辑,从而处理复杂的嵌套Json格式## Kafka样例数据模拟物联网场景的数据```{"device":"Demo1","signal":"60","life":"24","times":"2021-12-20 15:46:37"}{"device":"Demo2","signal":"78","life":"20","times":"2021-12-20 15:46:37"}{"device":"Demo3","signal":"41","life":"6","times":"2021-12-20 15:46:38"}{"device":"Demo4","signal":"71","life":"29","times":"2021-12-20 15:46:38"}{"device":"Demo5","signal":"38","life":"19","times":"2021-12-20 15:46:38"}{"device":"Demo6","signal":"98","life":"10","times":"2021-12-20 15:46:38"}{"device":"Demo7","signal":"80","life":"19","times":"2021-12-20 15:46:38"}{"device":"Demo8","signal":"55","life":"27","times":"2021-12-20 15:46:38"}{"device":"Demo9","signal":"93","life":"13","times":"2021-12-20 15:46:38"}{"device":"Demo10","signal":"46","life":"2","times":"2021-12-20 15:46:38"}{"device":"Demo11","signal":"94","life":"28","times":"2021-12-20 15:46:38"}{"device":"Demo12","signal":"24","life":"26","times":"2021-12-20 15:46:38"}{"device":"Demo13","signal":"64","life":"3","times":"2021-12-20 15:46:38"}{"device":"Demo14","signal":"97","life":"22","times":"2021-12-20 15:46:38"}{"device":"Demo15","signal":"82","life":"13","times":"2021-12-20 15:46:38"}{"device":"Demo16","signal":"2","life":"2","times":"2021-12-20 15:46:38"}{"device":"Demo17","signal":"19","life":"22","times":"2021-12-20 15:46:38"}{"device":"Demo18","signal":"51","life":"22","times":"2021-12-20 15:46:38"}{"device":"Demo19","signal":"1","life":"20","times":"2021-12-20 15:46:38"}{"device":"Demo20","signal":"41","life":"24","times":"2021-12-20 15:46:38"}```## 使用华为MRS Flinkserver对接Hive### 前提条件- 集群已安装HDFS、Yarn、Kafka、Flink和Hive等服务。- 包含Hive服务的客户端已安装,安装路径如:/opt/client。- Flink支持1.12.2及以上版本,Hive支持3.1.0及以上版本。- 参考基于用户和角色的鉴权创建一个具有“FlinkServer管理操作权限”的用户用于访问Flink WebUI的用户。 ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202112/20/1736360gbj0jcxg8s5b01r.png)- 参考创建集群连接中的“说明”获取访问Flink WebUI用户的客户端配置文件及用户凭据。### 操作步骤以映射表类型为Kafka对接Hive流程为例。1. 使用flink_admin访问Flink WebUI,请参考访问Flink WebUI。2. 新建集群连接,如:flink_hive。 a. 选择“系统管理 > 集群连接管理”,进入集群连接管理页面。 b. 单击“创集集群连接”,在弹出的页面中参考表1填写信息,单击“测试”,测试连接成功后单击“确定”,完成集群连接创建。 表1 创建集群连接信息| 参数名称 | 参数描述 | 取值样例 || ---- | ---- | ---- ||集群连接名称|集群连接的名称,只能包含英文字母、数字和下划线,且不能多于100个字符。|flink_hive||描述|集群连接名称描述信息。|-||版本|选择集群版本。|MRS 3||是否安全版本|是,安全集群选择是。需要输入访问用户名和上传用户凭证; 否,非安全集群选择否。|是||访问用户名|访问用户需要包含访问集群中服务所需要的最小权限。只能包含英文字母、数字和下划线,且不能多于100个字符。“是否安全版本”选择“是”时存在此参数。|flink_admin||客户端配置文件|集群客户端配置文件,格式为tar。|-||用户凭据|FusionInsight Manager中用户的认证凭据,格式为tar。“是否安全版本”选择“是”时存在此参数。输入访问用户名后才可上传文件。|flink_admin的用户凭| ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202112/20/173701okrumrzvi0plfwut.png) ![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202112/20/173725okb4gjbtuycxpgt0.png)3. 新建Flink SQL流作业,如:kafka_to_hive。在作业开发界面进行作业开发,输入如下语句,可以单击上方“语义校验”对输入内容校验。```CREATE TABLE test_kafka ( device varchar, signal varchar, life varchar, times timestamp) WITH ( 'properties.bootstrap.servers' = '172.16.9.116:21007', 'format' = 'json', 'topic' = 'example-metric1', 'connector' = 'kafka', 'scan.startup.mode' = 'latest-offset', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com');CREATE CATALOG myhive WITH ( 'type' = 'hive', 'hive-version' = '3.1.0', 'default-database' = 'default', 'cluster.name' = 'flink_hive');use catalog myhive;set table.sql-dialect = hive;create table test_avro_signal_table_orc ( device STRING, signal STRING, life STRING, ts timestamp ) PARTITIONED BY (dy STRING, ho STRING, mi STRING) stored as orc TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern' = '$dy $ho:$mi:00', 'sink.partition-commit.trigger' = 'process-time', 'sink.partition-commit.delay' = '0S', 'sink.partition-commit.policy.kind' = 'metastore,success-file' );INSERT into test_avro_signal_table_orcSELECT device, signal, life, times, DATE_FORMAT(times, 'yyyy-MM-dd'), DATE_FORMAT(times, 'HH'), DATE_FORMAT(times, 'mm')FROM default_catalog.default_database.test_kafka;```![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202112/20/1737526tbsjqu5lhyh0qpm.png)注意:作业SQL开发完成后,请勾选“运行参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。4. 启动任务![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202112/20/173831ksr246oxpzfdjojy.png)5. 启动kafka生产者插入样例数据![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202112/20/173856n53f2m49esjuttwn.png)6. 查看hive数据![](https://bbs-img.huaweicloud.com/data/forums/attachment/forum/202112/20/173926jzcxzw96afacb6tg.png)
  • [二次开发] 【大数据Spark2X】sparkstreaming对接kafka程序yarn-cluster提交到华为集群后运行失败,提示入口
    【功能模块】【操作步骤&问题现象】1、sparkstreaming对接kafka程序,本地环境测试正常2、程序打包后通过如下命令提交到yarn上3、提交成功,运行失败,报错:找不到主类【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [赋能学习] 使用pytho接口访问kafka安全模式
    环境准备1.          Manager下载krb5.conf,放到/etc/krb5.conf 目前开源代码不支持自定义krb5.conf路径2.          Manager下载客户端,并安装3.          yum -y install epel-release 测试环境使用CentOS,需要安装EPEL源才能安装pip4.          yum install python-pip 安装python安装管理工具pip5.          pip install kafka-python 需要修改源码 /usr/lib/python2.7/site-packages/kafka/conn.py line 570 将self.host修改为‘hadoop.hadoop.com’。或者直接从git下载最新代码,通过saslkerberosdomain_name参数指定为hadoop.hadoop.com.6.          yum install python-gssapi 安装gssapi库7.          kinit 执行pythong脚本前先通过kinit方式认证8.          执行kafka python脚本。创建topic#!/usr/bin/env bash # cd /opt/client 进入实际客户端安装目录 # source bigdata_env 导入环境变量 # kinit 使用具有KafkaAdmin权限的用户登录 # cd ./Kafka/kafka/bin/ 进入Kafka脚本目录  # 查询当前已有的topic ./kafka-topics.sh --list --zookeeper 172.21.3.101:24002,172.21.3.102:24002,172.21.3.103:24002/kafka  # 如果没有test-topic,则创建。partition数量跟同一组中的consumer的数量保持一致。 # 如果要保证消息按顺序被消费,就只建一个partition。 ./kafka-topics.sh --create --zookeeper 172.21.3.101:24002,172.21.3.102:24002,172.21.3.103:24002/kafka --partitions 3 --replication-factor 2 --topic test-topic  # 给producer赋予向topic中生产数据的权限 ./kafka-acls.sh --authorizer-properties zookeeper.connect=172.21.3.101:24002,172.21.3.102:24002,172.21.3.103:24002/kafka --add --allow-principal User:developuser --producer --topic test-topic  # 给consumer赋予从topic中消费数据的权限 ./kafka-acls.sh --authorizer-properties zookeeper.connect=172.21.3.101:24002,172.21.3.102:24002,172.21.3.103:24002/kafka --add --allow-principal User:developuser --consumer --topic test-topic --group test-groupProducer样例from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=["172.21.3.101:21007"],                          security_protocl="SASL_PLAINTEXT",                          sasl_mechanism="GSSAPI",                          sasl_kerberos_service_name="kafka") for _ in range(100):     response = producer.send("test-topic", b"testmessage")     result = response.get(timeout=50)     print(result)    Consumer样例from kafka import KafkaConsumer consumer = KafkaConsumer("test-topic",                          bootstrap_servers=["172.21.3.101:21007"],                          group_id="test-group",                          enable_auto_commit="true",                          security_protocl="SASL_PLAINTEXT",                          sasl_mechanism="GSSAPI",                          sasl_kerberos_service_name="kafka") for message in consumer:     print(message)
  • [问题求助] 【FI】【Kafka(KCluster)】开启kerberos认证后,代码无法从kafka集群中获取topic
    如题所示,报下面的错误,借鉴网上https://blog.csdn.net/li1987by/article/details/82856873的改法。将kafka-clients-2.6.0.jar 替换为kafka-clients-1.1.0.jar后,改错误消失,但是在哪里可以下载到华为kafka-clients-2.6.0.jar 的jar??我们需要后面spark 任务需要kafka-client2.6
  • [问题求助] 【FI】【Kafka(KCluster)】开启kerberos认证后,代码无法从kafka集群中获取topic
    如题所述,kafka-console-producer.sh & kafka-console-consumer.sh 执行正常。但是代码执行失败, kerberos验证开启debug模式后,日志观察成功。jar引用是华为kafka client libs。kafka版本为:2.11-1.1.0哪位大佬给指点下,万分感谢。----------------------------String krb5 = args[0];String jaasPath = args[1];String broker= args[2];// todoSystem.setProperty("java.security.krb5.conf", krb5);System.setProperty("java.security.auth.login.config", jaasPath);System.setProperty("zookeeper.server.principal", "zookeeper/hadoop.hadoop.com");Properties props = new Properties();props.put("bootstrap.servers", broker);props.put("group.id", "g1");props.put("key.deserializer", StringDeserializer.class.getName());props.put("value.deserializer", ByteBufferDeserializer.class.getName());// todoprops.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "GSSAPI");props.put("sasl.kerberos.service.name", "kafka");// adminclientAdminClient client = AdminClient.create(props);ListTopicsResult listTopics = client.listTopics();Set<String> strings = listTopics.names().get();System.out.println(strings);client.close();// kafkaconsumerKafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics();System.out.println(stringListMap.size());consumer.close();
  • [运维管理] HD 6.5.1.7版本 kafka节点出现单台故障实例后,执行删除topic标记删除问题
    【操作步骤&问题现象】HD 6.5.1.7版本 kafka节点出现单台故障实例后,执行删除topic标记删除问题业务侧自己执行zk 将对应topic信息删除,后创建topic使用时报错元数据不存在在单节点故障情况下是否可以执行,清理zk元数据 addauth krbgroup deleteall /kafka/brokers/topics/topicnamedeleteall /kafka/config/topics/topicname手动清理Kafka所有节点上的该topic的数据文件重启kafka实例 恢复?
  • [行业资讯] Apache Kafka 3.0.0 正式发布
    Apache Kafka 3.0.0正式发布,Apache Kafka是一个分布式流平台,具有四个核心API。本次更新是一个重要的版本更新,其中包括许多新的功能:对Java 8和Scala 2.12的支持被废弃了;Kafka Raft支持元数据主题的快照,以及self-managed quorum方面的其他改进;废弃了消息格式v0和v1;默认情况下为Kafka Producer启用更强的交付保证;增强了Kafka Streams中时间戳同步的语义;修改了Stream的TaskId的公共API;优化了OffsetFetch和FindCoordinator请求等等。
  • [基础组件] 程序中连接kafka时报错且确认配置的bootstrap.servers正确
    【功能模块】kafka【操作步骤&问题现象】1、编写程序,大致逻辑:SparkStreaming读取kafka中的数据,然后写入hbase中2、此demo是华为云上的样例demo,视频地址:https://bbs.huaweicloud.com/forum/thread-90888-1-1.html提交方式使用的是yarn-client【截图信息】【日志信息】(可选,上传日志内容或者附件)一直在重复报地址连不上[Consumer clientId=consumer-testGroup-1, groupId=testGroup] Bootstrap broker 172.31.8.38:21007 (id: -2 rack: null) disconnected | org.apache.kafka.clients.NetworkClient.handleServerDisconnect,详细日志见附件
  • [环境搭建] 【FusionInsight HD产品】【HD 6.5.1】Kafka节点部署规划问题
    【功能模块】关于kafka节点部署台数规划【操作步骤&问题现象】1、某局点kafka broker节点部署了4个,在开会讨论中,客户领导说kafka节点必须保持奇数,否则有台broker会浪费,而且leader选举时也会有问题2、但实际情况4台broker节点都进行了存储和使用,并没有出现客户说的问题,而且产品文档中也说明了,broker节点最少三台,并没有说必须保持奇数。只是zookeeper需要保持奇数。客户想让说明,问什么开源的需要保持奇数,但FusionInsight HD 没有这个要求,麻烦大佬帮忙解释下。【截图信息】【日志信息】(可选,上传日志内容或者附件)
  • [技术干货] IoT设备消息洪峰怎么扛?
    传统的消息队列((Kafka、RocketMQ等)经过多年打磨,在高性能、海量堆积、消息可靠性等诸多方面都已经做得非常极致,但在物联网场景中,往往需要面临着海量的消息传递,传统的消息队列表现的“力不从心”。IoT领域中,从应用服务器到嵌入式芯片,都需要传递事件消息,比如共享充电宝的开柜子、开灯指令从服务器发到设备、工业网关高频消息流等,在这些信息传递的过程中,队列最大意义在于让整个消息事件在不可控的环境因素变成一个平稳运行的系统,因为IoT设备时不时会由于故障或网络抖动会导致大量消息洪峰。阿里云AIoT作为物联网领域的引领者和创新者,在消息队列领域不断深耕与沉淀,为了让物联网从业者更进一步了解IoT场景队列,阿里云技术专家吕建文,整理了一份IoT队列的干货知识,与大家一同探讨一个适合于物联网系统的消息队列。一、IoT队列和普通队列的差异点1,上下行隔离拆分在IoT场景中,我们把需要队列分为两个场景,一个是上行队列,一个是下行队列。 拆分之后,可以隔离上下行链路,控制一个设备,比如支付成功要下发打开柜子等,上行出任何问题,千万不能影响到下行业务。另外,上下行两条链路的特点差异非常大。设备上行消息,并发量非常高,但很多场景下对于可靠性和时延要求低,而设备下行消息,并发量则比较低,但下行消息(一般是控制设备指令)要求到达成功率很高。2,支持设备级的海量topic传统队列的核心诉求是,不论堆积多少不影响它的性能。kafka的topic一多,原本消息顺序写文件优势就会导致一个broker要退化到随机写,失去优势,另外要zookeeper来协调这么多topic也是有局限,所以这些队列本身有提供一个外挂代理桥接器对外入口是多个设备topic,再桥接映射到少量的实际kafka topic,这方案有一定可行性,但做不到隔离效果,治标不治本。通过,图1和图2对比较明显,一个队列拥塞尽量减少对其它设备影响。我们需要的是“海量topic尽量相互隔离,并且不影响整体性能”,尽量做到设备A的消息堆积topic,不影响设备B。3,实时生成消息优先发送先举一个例子,一个快递柜业务的队列堆积,然后“此时此刻”在柜子旁边的用户死命的在旁边用手机点开柜子怎么也打不开(此时后端系统都恢复了),问题就是队列里面还有几十万条的消息,新来的消息需要排队, 等着之前的那些消息消费完,甭管这些消息还有没有用。  因此,实时生成消息优先发送,堆积的消息进入降级模式。二、IoT消息队列诞生1, IoT队列的设计思路设计目标是为了打造一个支持上下行隔离、实时优先、及海量topic的队列网关,设计原则如下:完全follow开源生态、和传统队列互补兼容保序降级,实时优先,堆积退化;仅实时消息相对有序。海量topic,多租户隔离连接、计算、存储分离2, 消息模式图片只是个片段,从这个模式可以看出来机制差别,大家都没有错,只是出发点不同。3, 连接、计算、存储分离broker不做连接,连接网关代理,broker只做流转分发,无状态+水平扩展;存储交给nosql DB,高吞吐写。4, 消息策略-推拉结合这个应该是队列的核心难点之一,和传统队列区分在于,我们考虑为平台化模式,独享资源过于昂贵。但带来问题是消费端不可控,所以使用结合模式,只有在消费者在线时会拉取堆积消息,而拉取是由AMQP队列网关来做,给到用户接口始终是推送过去的onMessage回调。broker不是直接让consumer来连接,而是把队列网关剥离出来,  这样会更灵活,甚至对于部分用户我们的queue可以切换到ons、kafka等实现。kafka、rocketmq做法是在连接时会分配给客户端一个broker接入地址。broker实时消息优先推送给consumer,失败才会落到queue ;这是一个完整事件,如果没有完成则不给producer commit。异步ACK5, 线性扩展-离线消息部分实时部分消息采用推方式,基本上不会成为瓶颈,消费不过来消息进入堆积模式。由于底层依赖存储已经帮我们解决核心存储的扩展,剩下主要问题点在于如何消除写入热点和消费热点,这样broker可以完全做到无状态。三,一个思考——如何解决海量topic问题?首先面对“大量”的问题一般都是考虑分区,单元化,分组等隔离和拆分,这里海量topic我们讨论针对一个单实例模式下如何尽可能做到更多topic,完全任意数量都能100%没问题肯定是不现实的。由于broker和存储已经隔离,broker和topic已经没有什么关系,或者说任何topic数据生成,broker做的事情就是写入和分发。海量topic,每个topic有限数量订阅:  topic和订阅者关系使用redis缓存或本地缓存,针对mqtt topic匹配有个topic tree的树算法,hivemq有实现版本。单个topic 海量订阅:  这个场景其实是组播和广播,我们不会考虑在队列本身上面去做这个事情,而是在上层封装广播组件来协调任务和批量发送。 四, 阿里云AIoT消息队列目前阿里云AIoT队列,也叫服务端订阅,意思就是用户用服务端订阅他们设备消息。为了降低接入成本,用户可以使用AMQP1.0协议接入,符合开源生态。 同时兼容传统队列和新队列,交给用户按场景来选择,用户即可选择使用kafka、mq,也可以选用iot队列,甚至组合模式,比如按消息特征规则来配置流转队列。阿里云AIoT的场景队列实践,在现有mq队列、kafka队列融合之外,加了种自有的实时优先队列实现,同时,加入了队列网关代理,既能让用户选择普通消息队列,也可以选择轻便的IoT消息队列。————————————————原文链接:https://blog.csdn.net/alitech2017/article/details/119563822
  • [交流吐槽] 领取Kafka免费实例 这个任务应该怎么做呢
    提示“您不在此活动范围。”为什么呀?不懂,瑟瑟发抖!
  • [交流吐槽] 领取Kafka免费实例 任务怎么做呀
    提示“您不在此活动范围。”为什么呀?不懂,瑟瑟发抖!
总条数:146 到第
上滑加载中