  • [Question] FusionInsight HD 6.5.1: after reinstalling the OS on a data node, does preinstall need to be executed?
    Cluster environment: production. Problem description: the operating system of a single data node has been reinstalled, and the host now needs to be reinstalled from the Manager page. The open question is whether preinstall and precheck need to be run before reinstalling the host.
  • [Development] FusionInsight HD 6517 cluster: how can data consistency between two HBase tables be compared?
    In a FusionInsight HD 6517 cluster, how can the data of two HBase tables be compared for consistency?
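    A minimal sketch of one common approach, assuming an HBase client configuration on the classpath and two tables with the hypothetical names tableA and tableB: scan both tables in row-key order and compare the rows in lockstep. For very large tables, the HashTable/SyncTable MapReduce tools shipped with HBase are usually a better fit.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.Bytes;

    import java.io.IOException;
    import java.util.Iterator;

    public class CompareTwoTables {
        public static void main(String[] args) throws IOException {
            // Picks up hbase-site.xml / core-site.xml from the classpath.
            Configuration conf = HBaseConfiguration.create();
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Table a = conn.getTable(TableName.valueOf("tableA"));
                 Table b = conn.getTable(TableName.valueOf("tableB"));
                 ResultScanner sa = a.getScanner(new Scan());
                 ResultScanner sb = b.getScanner(new Scan())) {
                Iterator<Result> ia = sa.iterator();
                Iterator<Result> ib = sb.iterator();
                long mismatches = 0;
                while (ia.hasNext() && ib.hasNext()) {
                    Result ra = ia.next();
                    Result rb = ib.next();
                    try {
                        // Throws when the row keys or cell contents differ.
                        Result.compareResults(ra, rb);
                    } catch (Exception e) {
                        mismatches++;
                        System.out.println("Mismatch at row " + Bytes.toString(ra.getRow()) + ": " + e.getMessage());
                    }
                }
                if (ia.hasNext() || ib.hasNext()) {
                    System.out.println("The two tables have different row counts.");
                }
                System.out.println("Mismatched rows: " + mismatches);
            }
        }
    }
    This lockstep comparison assumes the row-key sets of the two tables line up; a missing row in one table will shift the comparison, which is exactly the case HashTable/SyncTable handles more robustly.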
  • [Best Practice] Connecting to ClickHouse from Python
    ClickHouse generic JDBC port: the ClickHouse JDBC interface uses the HTTP protocol. The corresponding ports can be found on the Manager -> ClickHouse page, in the logical cluster section. The JDBC URL differs between the non-encrypted and the encrypted port:
    Non-encrypted port 21426, JDBC URL: jdbc:clickhouse://x.x.x.x:21426/default
    Encrypted port 21428, JDBC URL: jdbc:clickhouse://x.x.x.x:21428/default?ssl=true&sslmode=none
    The IP in the URL is the IP of a ClickHouse Balancer instance. Note: this walkthrough uses the non-encrypted port.
    Prerequisites: a Python 3 environment and an MRS cluster to connect to.
    1. Download and build Python 3 from source:
    tar zxvf Python-3.8.0.tgz
    cd Python-3.8.0
    mkdir -p /usr/local/python-3.8.0
    ./configure --prefix=/usr/local/python-3.8.0 --enable-optimizations --with-ssl
    make && make install
    ln -s /usr/local/python-3.8.0/bin/python3 /usr/bin/python3
    ln -s /usr/local/python-3.8.0/bin/pip3 /usr/bin/pip3
    ll /usr/bin/python*
    2. Connect through the generic JDBC driver. Install the dependencies:
    ./pip3 install jpype1==1.4.1
    ./pip3 install JayDeBeApi==1.2.3
    Python code:
    import jaydebeapi
    import pandas as pd
    conn = jaydebeapi.connect("ru.yandex.clickhouse.ClickHouseDriver", "jdbc:clickhouse://x.x.x.x:21426/default", ["username","passwd"], jars=['/opt/lyf/lib1/clickhouse-jdbc-0.3.1-h0.cbu.mrs.320.r11.jar','/opt/lyf/lib1/commons-codec-1.15.jar','/opt/lyf/lib1/commons-logging-1.2.jar','/opt/lyf/lib1/httpclient-4.5.13.jar','/opt/lyf/lib1/httpcore-4.4.13.jar','/opt/lyf/lib1/lz4-java-1.7.1.jar','/opt/lyf/lib1/slf4j-api-1.7.36.jar','/opt/lyf/lib1/us-common-1.0.66.jar','/opt/lyf/lib1/bcprov-jdk15on-1.70.jar'])
    sql = "Select * From addressbook"
    df_ck = pd.read_sql(sql, conn)
    print(df_ck)
    conn.close()
    Note: place the required libraries in the corresponding directory: commons-codec-1.15.jar, commons-logging-1.2.jar, httpclient-4.5.13.jar, httpcore-4.4.13.jar, lz4-java-1.7.1.jar, slf4j-api-1.7.36.jar, bcprov-jdk15on-1.70.jar, clickhouse-jdbc-0.3.1-h0.cbu.mrs.320.r11.jar, us-common-1.0.66.jar.
    3. Connect through clickhouse_connect. Install the dependency:
    ./pip3 install clickhouse_connect==0.6.4
    Python code:
    import clickhouse_connect
    client = clickhouse_connect.get_client(host='x.x.x.x', port=21426, username='username', password='passwd')
    client.command('show tables')
    client.command('select * from people')
    Note: refer to cid:link_0.
    FAQ
    Q1: The Python code fails with ClassNotFound when connecting as follows:
    conn = jaydebeapi.connect("ru.yandex.clickhouse.ClickHouseDriver", "jdbc:clickhouse:// x.x.x.x:port/default?ssl=true&sslmode=none?user=username&password=passwd!@", jars=['/opt/lyf/lib1/clickhouse-jdbc-0.3.1-h0.cbu.mrs.320.r11.jar'])
    Check the method signature with help(jaydebeapi.connect). Fix: pass the credentials as a separate list, keep the URL clean, and list all required jars:
    conn = jaydebeapi.connect("ru.yandex.clickhouse.ClickHouseDriver", "jdbc:clickhouse://x.x.x.x:port/default", ["username","passwd"], jars=[same jar list as in step 2])
    Q2: The Python code fails with ClassNotFound for a missing class. Run grep -R 'x.x.x.x' <library directory> to locate the jar that contains the missing class, place the jar in the corresponding directory, and add it to the jars parameter.
  • [O&M] Why is it recommended that a Kafka cluster have no more than 10000 partitions in total?
    Why is the total number of partitions in a Kafka cluster recommended to stay below 10000? And does the total include replica partitions, or should replicas be subtracted?
  • [Best Practice] Best practice for using the Loader REST API
    Loader overview: Loader is the data-loading tool that exchanges data and files between FusionInsight HD and relational databases or file systems. It is built on open-source Sqoop with extensive optimization and extension, provides a visual, wizard-style job configuration and management UI, supports scheduled tasks that run Loader jobs periodically, and lets you specify different data sources, configure data cleansing and transformation steps, and configure the target cluster storage from the UI.
    Loader characteristics:
    Graphical: visual configuration and monitoring UI, easy to operate.
    High performance: data is processed in parallel with MapReduce.
    High reliability: Loader Server runs in active/standby mode; jobs are executed as MapReduce jobs and support retry on failure; failed jobs leave no residual data.
    Security: Kerberos authentication and job-level permission management.
    Background: the Loader UI shows job history and run status; the customer needs to retrieve job status and history through the REST API instead.
    Key points:
    1. The interaction uses Kerberos authentication, which over HTTP is known as SPNEGO authentication.
    2. As with the component's native UI, SSL is required when talking to the server.
    3. The authentication files user.keytab and krb5.conf are placed in the conf directory.
    Code walkthrough (a standalone SPNEGO example is sketched after this item):
    // loader
    String url = "https://x.x.x.x:20026/Loader/LoaderServer/124/loader/v1/job/all?paged=true&offset=1&limit=2&kw=&group=0&order=desc&order-by=cdate";
    // String url = "https://x.x.x.x:20026/Loader/LoaderServer/124/loader/v1/submission/history/3?paged=true&limit=10&offset=1";
    System.out.println("PATH_TO_KEYTAB " + PATH_TO_KEYTAB);
    System.setProperty("java.security.krb5.conf", PATH_TO_KRB5_CONF);
    System.setProperty("javax.security.auth.useSubjectCredsOnly", "true");
    The Loader port is 20026. The first URL returns information on all jobs; limit controls how many jobs are returned. The second URL returns the run history of a single job; the number after history is the job ID and can be changed.
    Sample run: retrieving all jobs and retrieving the history of a single job return JSON such as:
    {"all":[{"exception":"","counters":{"org.apache.hadoop.mapreduce.FileSystemCounter":{"FILE_LARGE_READ_OPS":0,"HDFS_BYTES_READ_EC":0,"FILE_WRITE_OPS":0,"HDFS_READ_OPS":270,"HDFS_BYTES_READ":3659,"HDFS_LARGE_READ_OPS":0,"FILE_READ_OPS":0,"FILE_BYTES_WRITTEN":11554899,"FILE_BYTES_READ":0,"HDFS_WRITE_OPS":91,"HDFS_BYTES_WRITTEN":166800009},"org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter":{"BYTES_WRITTEN":166800009},"org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter":{"BYTES_READ":0},"org.apache.hadoop.mapreduce.JobCounter":{"TOTAL_LAUNCHED_MAPS":30,"MB_MILLIS_MAPS":3549155328,"SLOTS_MILLIS_REDUCES":0,"VCORES_MILLIS_MAPS":866493,"SLOTS_MILLIS_MAPS":3465972,"OTHER_LOCAL_MAPS":30,"MILLIS_MAPS":866493},"org.apache.sqoop.submission.counter.SqoopCounters":{"ROWS_SKIPPED":0,"ROWS_READ":5903983,"ROWS_WRITTEN":5903983},"org.apache.hadoop.mapreduce.TaskCounter":{"SPILLED_RECORDS":0,"MERGED_MAP_OUTPUTS":0,"VIRTUAL_MEMORY_BYTES":124308541440,"MAP_INPUT_RECORDS":0,"MAP_PHYSICAL_MEMORY_BYTES_MAX":541884416,"SPLIT_RAW_BYTES":3659,"FAILED_SHUFFLE":0,"PHYSICAL_MEMORY_BYTES":15452405760,"GC_TIME_MILLIS":15096,"MAP_VIRTUAL_MEMORY_BYTES_MAX":4216520704,"MAP_OUTPUT_RECORDS":5903983,"CPU_MILLISECONDS":817330,"COMMITTED_HEAP_BYTES":18252038144}},"last-update-date":1686644843155,"last-udpate-user":"xxx","output":"--: --","input":"MYSQL: server_diskspace[null]","caller":-1,"creation-user":"xxx","progress":1.0,"creation-date":1686644784844,"external-id":"job_1680145428800_0188","dirty-data-link":"http:\/\/172-16-9-118:25002\/explorer.html#\/user\/loader\/etl_dirty_data_dir\/1\/1680145428800_0188","job":1,"external-link":"http:\/\/172-16-4-22:26000\/proxy\/application_1680145428800_0188\/","status":"SUCCEEDED"}],"total-num":1}
    JSON returned for a failed job:
    {"all":[{"output":"--: --","exception":"Failed to execute the SQL statement. Cause: sql execute error","input":"MYSQL: people[null]","caller":-1,"creation-user":"xxx","progress":-1.0,"creation-date":1686644587315,"last-update-date":1686644587315,"dirty-data-link":"","job":3,"last-udpate-user":"xxx","status":"FAILURE_ON_SUBMIT"}],"total-num":1}
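    For illustration, a minimal Java sketch of one way to call these URLs with SPNEGO over HTTPS using Apache HttpClient 4.5.x (an assumption; the actual sample project is not reproduced here). It assumes javax.security.auth.useSubjectCredsOnly=false together with a JAAS entry named com.sun.security.jgss.krb5.initiate in conf/jaas.conf that points at user.keytab, and it trusts the server certificate blindly for brevity:
    import org.apache.http.auth.AuthSchemeProvider;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.KerberosCredentials;
    import org.apache.http.client.config.AuthSchemes;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.config.Lookup;
    import org.apache.http.config.RegistryBuilder;
    import org.apache.http.conn.ssl.NoopHostnameVerifier;
    import org.apache.http.impl.auth.SPNegoSchemeFactory;
    import org.apache.http.impl.client.BasicCredentialsProvider;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.ssl.SSLContextBuilder;
    import org.apache.http.util.EntityUtils;

    public class LoaderRestSketch {
        public static void main(String[] args) throws Exception {
            System.setProperty("java.security.krb5.conf", "conf/krb5.conf");
            System.setProperty("java.security.auth.login.config", "conf/jaas.conf");
            // Let JGSS obtain the ticket itself from the JAAS login entry.
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");

            Lookup<AuthSchemeProvider> authSchemes = RegistryBuilder.<AuthSchemeProvider>create()
                    .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(true))
                    .build();
            // SPNEGO takes the ticket from the JAAS login; an empty placeholder credential is enough.
            BasicCredentialsProvider creds = new BasicCredentialsProvider();
            creds.setCredentials(AuthScope.ANY, new KerberosCredentials(null));

            try (CloseableHttpClient client = HttpClients.custom()
                    .setDefaultAuthSchemeRegistry(authSchemes)
                    .setDefaultCredentialsProvider(creds)
                    // Trust-all SSL is only for the sketch; a real client should trust the cluster CA.
                    .setSSLContext(SSLContextBuilder.create()
                            .loadTrustMaterial(null, (chain, authType) -> true).build())
                    .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                    .build()) {
                HttpGet get = new HttpGet("https://x.x.x.x:20026/Loader/LoaderServer/124/loader/v1/job/all"
                        + "?paged=true&offset=1&limit=2&kw=&group=0&order=desc&order-by=cdate");
                try (CloseableHttpResponse resp = client.execute(get)) {
                    System.out.println(resp.getStatusLine());
                    System.out.println(EntityUtils.toString(resp.getEntity()));
                }
            }
        }
    }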
  • [O&M] Is there a way to disable Kerberos for Spark2x in an on-premises FusionInsight HD 6513 cluster?
    For an on-premises FusionInsight HD 6513 cluster, is there a supported way to disable Kerberos for Spark2x?
  • [Core Components] Example of parsing a YARN job counter file
    1 Background
    The statistics of a YARN (MapReduce) job are stored in a .jhist file under a fixed HDFS directory (for example /mr-history/done/2023/05/15). Parsing that file minimizes the impact on HDFS: collecting the counters of one MR job needs only a single HDFS access to fetch the roughly 20 KB file, which is then parsed on the client side.
    2 Getting the xxx.jhist file
    List the jhist files for a given date (for example 2023-05-23):
    hdfs dfs -ls /mr-history/done/2023/05/23/000000
    Download a jhist file:
    hdfs dfs -get /mr-history/done/2023/05/23/000000/job_1683342225080_0138-1684848693047-Loader%3A+testsftp2hive_1684848475163-1684848723140-1-0-SUCCEEDED-default-1684848703642.jhist
    3 Counter parsing example (Java)
    3.1 Add the dependencies
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-hs</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-all</artifactId>
      <version>1.8.5</version>
    </dependency>
    3.2 Parsing code
    package com.huawei.bigdata.mapreduce.examples;

    import static org.mockito.Mockito.mock;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobACLsManager;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.CounterGroup;
    import org.apache.hadoop.mapreduce.v2.api.records.JobId;
    import org.apache.hadoop.mapreduce.v2.hs.CompletedJob;
    import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;

    import java.io.IOException;

    public class TestYarnCounter {
        public static void main(String[] args) {
            // Local copy of the downloaded jhist file.
            Path fullHistoryPath = new Path("D:\\history\\job_1683342225080_0138-1684848693047-Loader%3A+testsftp2hive_1684848475163-1684848723140-1-0-SUCCEEDED-default-1684848703642.jhist");
            Configuration conf = new Configuration();
            boolean loadTasks = false;
            HistoryFileInfo info = mock(HistoryFileInfo.class);
            JobId jobId = null;
            JobACLsManager jobAclsManager = new JobACLsManager(conf);
            try {
                CompletedJob completedJob = new CompletedJob(conf, jobId, fullHistoryPath, loadTasks, "user", info, jobAclsManager);
                CounterGroup counterGroup = completedJob.getAllCounters()
                        .getGroup("org.apache.hadoop.mapreduce.FileSystemCounter");
                for (Counter counter : counterGroup) {
                    System.out.println(counter.getName() + ":" + counter.getValue());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    4 Result
    FILE_BYTES_READ:0
    FILE_BYTES_WRITTEN:393363
    FILE_READ_OPS:0
    FILE_LARGE_READ_OPS:0
    FILE_WRITE_OPS:0
    HDFS_BYTES_READ:204
    HDFS_BYTES_WRITTEN:153
    HDFS_READ_OPS:3
    HDFS_LARGE_READ_OPS:0
    HDFS_WRITE_OPS:3
    The same result can also be checked on the ResourceManager web UI: click the corresponding application ID, then click History on the application page to view the counter information.
  • [Core Components] Example of a custom Elasticsearch analyzer
    1 text_en_splitting_tight
    Tokenizer and filter behavior:
    1. Split Chinese and English text on spaces.
    2. Replace synonyms, for example a search for 北大 becomes 北京大学.
    3. Remove stop words such as a, an, but.
    4. Strip special characters, for example wi-fi becomes wifi.
    5. Convert upper case to lower case.
    6. Protect listed words from being modified by the analyzer.
    7. Reduce English plurals to the singular form (for example dogs becomes dog).
    8. Avoid duplicate processing.
    Input: 我们 的祖国 名称 是 ChI_na, we are 北大 dogs
    Tokens: 我们 / 的祖国 / 名称 / 是 / china / we / 北京大学 / dog
    1.1 Create the analyzer
    curl -XPUT --tlsv1.2 --negotiate -k -u : "https://xx.xx:24100/h0323?pretty" -H 'Content-Type:application/json' -d'{"settings": {"analysis": {"char_filter": {"my_char_filter": {"type": "mapping","mappings": ["北大 =>北京大学","_ => "]}},"filter": {"my_stopword": {"type": "stop","stopwords": ["a","an", "but","are"]}},"tokenizer":{"my_tokenizer":{"type":"pattern","pattern":"[ ]"}},"analyzer":{"text_en_splitting_tight":{"type":"custom","char_filter":["my_char_filter"],"filter":["my_stopword","lowercase"],"tokenizer":"my_tokenizer"}}}}}'
    1.2 Analyze sample input
    curl -XGET --tlsv1.2 --negotiate -k -u : "https://xx.xx:24100/h0323/_analyze?pretty" -H 'Content-Type:application/json' -d'{ "analyzer":"text_en_splitting_tight","text":"我们 的祖国 名称 是 ChI_na, we are 北大 dogs"}'
    2 text_general
    Index-time behavior:
    1. Add a type to each generated token.
    2. Remove stop words such as a, an, but.
    3. Convert upper case to lower case.
    Input: 我们 的祖国 名称 是 ChI_na, we are 北大 dogs
    Tokens: 我们 / 的祖国 / 名称 / 是 / china / we / 北大 / dogs
    Query-time behavior:
    1. Add a type to each generated token.
    2. Remove stop words such as a, an, but.
    3. Replace synonyms, for example a search for 北大 becomes 北京大学.
    4. Convert upper case to lower case.
    Input: 我们 的祖国 名称 是 ChI_na, we are 北大 dogs
    Tokens: 我们 / 的祖国 / 名称 / 是 / china / we / 北京大学 / dogs
    2.1 Create the index-time analyzer
    curl -XPUT --tlsv1.2 --negotiate -k -u : "https://xx.xx:24100/h0323?pretty" -H 'Content-Type:application/json' -d'{"settings": {"analysis": {"char_filter": {"my_char_filter": {"type": "mapping","mappings": ["_ => "]}},"filter": {"my_stopword": {"type": "stop","stopwords": ["a","an", "but","are"]}},"tokenizer":{"my_tokenizer":{"type":"pattern","pattern":"[ ]"}},"analyzer":{"text_general":{"type":"custom","char_filter":["my_char_filter"],"filter":["my_stopword","lowercase"],"tokenizer":"my_tokenizer"}}}}}'
    2.2 Analyze sample input with the index-time analyzer
    curl -XGET --tlsv1.2 --negotiate -k -u : "https://xx.xx:24100/h0323/_analyze?pretty" -H 'Content-Type:application/json' -d'{ "analyzer":"text_general","text":"我们 的祖国 名称 是 ChI_na, we are 北大 dogs"}'
    2.3 Create the query-time analyzer
    curl -XPUT --tlsv1.2 --negotiate -k -u : "https://xx.xx:24100/h0323?pretty" -H 'Content-Type:application/json' -d'{"settings": {"analysis": {"char_filter": {"my_char_filter": {"type": "mapping","mappings": ["北大 =>北京大学","_ => "]}},"filter": {"my_stopword": {"type": "stop","stopwords": ["a","an", "but","are"]}},"tokenizer":{"my_tokenizer":{"type":"pattern","pattern":"[ ]"}},"analyzer":{"text_general":{"type":"custom","char_filter":["my_char_filter"],"filter":["my_stopword","lowercase"],"tokenizer":"my_tokenizer"}}}}}'
    2.4 Analyze sample input with the query-time analyzer
    curl -XGET --tlsv1.2 --negotiate -k -u : "https://xx.xx:24100/h0323/_analyze?pretty" -H 'Content-Type:application/json' -d'{ "analyzer":"text_general","text":"我们 的祖国 名称 是 ChI_na, we are 北大 dogs"}'
  • [Core Components] Example of Elasticsearch scoring functions
    Lucene (and therefore Elasticsearch) uses the Boolean model to find matching documents and a formula called the practical scoring function to compute relevance. The function_score query is Elasticsearch's mechanism for customizing the relevance score, giving fine-grained control over scoring. For details see https://www.elastic.co/guide/cn/elasticsearch/guide/current/practical-scoring-function.html
    1 Create the index
    curl -XPUT cid:link_0
    2 Create the mapping
    curl -H "Content-Type: application/json" -XPUT cid:link_0/video/_mapping?include_type_name=true -d '{ "video": { "properties": { "title": { "type": "text", "analyzer": "snowball" }, "description": { "type": "text", "analyzer": "snowball" }, "views": { "type": "integer" }, "likes": { "type": "integer" }, "created_at": { "type": "date" } } }}'
    3 Add data
    curl -H "Content-Type: application/json" -XPUT cid:link_0/video/1 -d '{ "title": "Sick Sad World: Cold Breeze on the Interstate", "description": "Is your toll collector wearing pants a skirt or nothing but a smile Cold Breeze on the Interstate next on Sick ", "views": 500, "likes":2, "created_at": "2023-04-22T08:00:00"}'
    curl -H "Content-Type: application/json" -XPUT cid:link_0/video/2 -d '{ "title": "Sick Sad World: The Severed Pianist", "description": "When he turned up his nose at accordion lessons, they cut off his inheritance molto allegro. The Severed Pianist, ne", "views": 6000, "likes": 100, "created_at": "2023-04-22T12:00:00"}'
    curl -H "Content-Type: application/json" -XPUT cid:link_0/video/3 -d '{ "title": "Sick Sad World: Avant Garde Obstetrician", "description": "Meet the avant-garde obstetrician who has turned his cast offs into art work. Severed Umbilical cord sculpture next,", "views": 100, "likes": 130, "created_at": "2023-04-22T23:00:00"}'
    4 Compute scores
    Incorrect example (the single quotes in doc['likes'] terminate the shell's single-quoted string, so the request body is broken):
    curl -H "Content-Type: application/json" -XPOST cid:link_0/video/_search -d '{ "query": { "function_score": { "query": { "match": { "_all": "severed" } }, "script_score": { "script": "_score * Math.log(doc['likes'].value + doc['views'].value + 1)" } } }}'
    Correct examples, note the \u0027 escape used for single quotes.
    A. Compute the score with the built-in script_score:
    curl -X GET "cid:link_0/video/_search?pretty" -H 'Content-Type: application/json' -d '{ "query": { "function_score": { "query": { "match": { "_all": "severed" } }, "script_score": { "script": { "source": "Math.log(2 + doc[\u0027likes\u0027].value)" } } } }}'
    Output:
    { "took" : 3, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 0, "relation" : "eq" }, "max_score" : null, "hits" : [ ] }}
    B. Compute the score with the linear decay function:
    curl -H "Content-Type: application/json" -XPOST cid:link_0/video/_search -d ' { "query": { "function_score": { "functions": [ { "linear": { "views": { "origin": 5000, "scale": 2500 } } }, { "linear": { "likes": { "origin": 200, "scale": 90 } } } ] } }}'
    Output:
    { "took": 5, "timed_out": false, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 3, "relation": "eq" }, "max_score": 0.35555556, "hits": [ { "_index": "searchtub_2", "_type": "video", "_id": "2", "_score": 0.35555556, "_source": { "title": "Sick Sad World: The Severed Pianist", "description": "When he turned up his nose at accordion lessons, they cut off his inheritance molto allegro. The Severed Pianist, ne", "views": 6000, "likes": 100, "created_at": "2023-04-22T12:00:00" } }, { "_index": "searchtub_2", "_type": "video", "_id": "3", "_score": 0.012222222, "_source": { "title": "Sick Sad World: Avant Garde Obstetrician", "description": "Meet the avant-garde obstetrician who has turned his cast offs into art work. Severed Umbilical cord sculpture next,", "views": 100, "likes": 130, "created_at": "2023-04-22T23:00:00" } }, { "_index": "searchtub_2", "_type": "video", "_id": "1", "_score": 0, "_source": { "title": "Sick Sad World: Cold Breeze on the Interstate", "description": "Is your toll collector wearing pants a skirt or nothing but a smile Cold Breeze on the Interstate next on Sick ", "views": 500, "likes": 2, "created_at": "2023-04-22T08:00:00" } } ] }}
  • [Integration Development] [Sample] hdfs-springboot
    Calling HDFS from Spring Boot.
    1 HDFS overview: HDFS (Hadoop Distributed File System) is a distributed file system designed to run on commodity hardware, highly fault-tolerant and offering high-throughput data access, which makes it well suited to large data sets.
    2 Sample background: the business objects of HDFS are files. The file operations covered by the sample code are creating a directory, writing a file, appending to a file, reading a file, and deleting a file or directory (a minimal sketch of such a call follows this item). HDFS supports further operations, such as setting file permissions, which can be explored after mastering this sample.
    3 Steps to run the sample on Windows
    Prepare the environment: https://bbs.huaweicloud.com/forum/thread-88552-1-1.html
    Check the time: the difference from the cluster time must not exceed 5 minutes.
    Check that C:\Windows\System32\drivers\etc\hosts contains the host-name/IP mappings of all cluster nodes.
    Open the hdfs-springboot directory of the sample code in IDEA. Dependencies are downloaded automatically by default; if not, right-click the pom.xml in that directory, choose "Add As Maven Project" and wait for the download to finish.
    Download the user authentication credentials from the Manager page and extract the key files user.keytab and krb5.conf.
    Get core-site.xml and hdfs-site.xml from the client directory /opt/client/HDFS/hadoop/etc/hadoop.
    Put the four files user.keytab, krb5.conf, core-site.xml and hdfs-site.xml in one directory.
    Configure the user name and the configuration-file directory (the directory from the previous step) in application.properties.
    Open the test class HDFApplication.java, right-click it and choose Run.
    Call the interface POST cid:link_0 to create an HDFS directory.
    4 Steps to run on Linux
    Complete the Windows steps above and build the package in the Windows environment.
    Check that the Linux host time differs from the cluster by no more than 5 minutes, that the JDK version is 1.8, and that /etc/hosts contains the host-name/IP mappings of all cluster nodes.
    Create an execution directory, for example /opt/hdfstest.
    Upload hdfs-springboot-1.0-SNAPSHOT.jar from the Windows target directory to /opt/hdfstest.
    Upload the configuration files that were validated on Windows to /opt/hdfstest/conf.
    Configure the user name and the configuration directory (/opt/hdfstest/conf/) in application.properties.
    Start the service with:
    java -jar hdfs-springboot-1.0-SNAPSHOT.jar
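    A minimal sketch of the kind of HDFS call the sample wraps, using the standard Hadoop client API (an assumption; the sample's own helper classes are not reproduced). It assumes core-site.xml, hdfs-site.xml, user.keytab and krb5.conf sit in a local conf directory and uses the placeholder principal "devuser":
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    public class HdfsMkdirSketch {
        public static void main(String[] args) throws Exception {
            System.setProperty("java.security.krb5.conf", "conf/krb5.conf");

            // Load the cluster configuration downloaded from the client.
            Configuration conf = new Configuration();
            conf.addResource(new Path("conf/core-site.xml"));
            conf.addResource(new Path("conf/hdfs-site.xml"));

            // Kerberos login with the keytab downloaded from Manager.
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab("devuser", "conf/user.keytab");

            try (FileSystem fs = FileSystem.get(conf)) {
                Path dir = new Path("/tmp/hdfs_springboot_demo");
                if (fs.mkdirs(dir)) {
                    System.out.println("Created " + dir);
                }
            }
        }
    }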
  • [Best Practice] Reading and writing ClickHouse from Flink
    Scenario 1: Flink reads Kafka data and writes it to ClickHouse.
    Prerequisite: the Flink client is already configured.
    Prepare sample Kafka data:
    aaa,18,2023-4-14
    bbb,20,2023-4-15
    ccc,21,2023-4-16
    ddd,22,2023-4-17
    Confirm the connection to the ClickHouse cluster. Note: connect to the ClickHouse Balancer instance, which load-balances and dispatches the read and write requests. The HTTP port is the JDBC port; the TCP port is the one used by the command-line client.
    Open the sample project. Writing to ClickHouse uses the ClickHouse_Sink and WriteIntoCK classes (a generic sketch of such a sink follows this item).
    Modify the sample code: the ClickHouse connection settings must be adjusted. Note: the default ClickHouse cluster name is default_cluster; this example uses the database testdb and the table flinkck.
    If importing the pom dependencies fails, package manually by importing the dependency jars from an installed client.
    Put the packaged jar on the server, for example under /opt/flinkck.
    Start a Flink cluster:
    ./bin/yarn-session.sh -jm 1024 -tm 1024 -t test/ -d
    Submit the job that reads Kafka and writes ClickHouse:
    bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoCK /opt/flinkck/flinkckdemo.jar --topic writeck --bootstrap.servers xxx.xx.x.xx:21005
    Create the Kafka topic writeck and produce the data with:
    bin/kafka-console-producer.sh --broker-list xxx.xx.x.xx:21005 --topic writeck --producer.config config/producer21005.properties
    Check the result in ClickHouse (the table is created automatically on the first run):
    clickhouse client --host xxx.xx.x.xxx --user sandbox --password --port 21425 --secure
    Scenario 2: read ClickHouse data and print it.
    Complete scenario 1, or create the scenario-1 data in ClickHouse beforehand.
    Open the sample project and modify the ClickHouse connection part of ReadCK.
    Package as in scenario 1 and upload the jar to /opt/flinkck.
    Start a Flink cluster:
    ./bin/yarn-session.sh -jm 1024 -tm 1024 -t test/ -d
    Submit the read job:
    bin/flink run --class com.huawei.bigdata.flink.examples.ReadCK /opt/flinkck/flinkckdemo.jar
    Open the task and check the printed output.
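    The sample project's ClickHouse_Sink is not reproduced here. As an illustration of what such a sink can look like, here is a sketch using Flink's generic JDBC connector instead (assumptions: flink-connector-jdbc and the ClickHouse JDBC driver are on the classpath, and testdb.flinkck has the columns name, age, day):
    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    import org.apache.flink.connector.jdbc.JdbcSink;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class WriteIntoCKSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // In the real job this stream comes from the Kafka source; fixed records stand in here.
            DataStream<String> lines = env.fromElements("aaa,18,2023-4-14", "bbb,20,2023-4-15");

            lines.addSink(JdbcSink.sink(
                    "INSERT INTO testdb.flinkck (name, age, day) VALUES (?, ?, ?)",
                    (ps, line) -> {
                        String[] f = line.split(",");
                        ps.setString(1, f[0]);
                        ps.setInt(2, Integer.parseInt(f[1]));
                        ps.setString(3, f[2]);
                    },
                    JdbcExecutionOptions.builder().withBatchSize(500).withBatchIntervalMs(2000).build(),
                    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            .withUrl("jdbc:clickhouse://x.x.x.x:21426/testdb")
                            .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                            .withUsername("username")
                            .withPassword("passwd")
                            .build()));

            env.execute("Write into ClickHouse (sketch)");
        }
    }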
  • [O&M Guide] Creating a permanent Hive UDF with Ranger enabled (without granting the user the "Is Role Admin" permission)
    Background: with Ranger enabled, some customers have strict requirements on high-privilege roles and want to grant users as few privileges as possible, so the user cannot be given the "Is Role Admin" (Hive administrator) permission in Ranger.
    Tested on: MRS 3.1.2 (Hive with Ranger authorization enabled).
    Procedure:
    1. In Ranger, grant the user the permission to create and view UDFs. Log in to Ranger as rangeradmin and click Hive, then add a new policy. When creating the policy, step 2 can point at a specific database; in step 3, change the default "table" resource to "udf", and the value can be "*"; in step 4, select the user to authorize; in step 5, grant at least the Create permission (the Drop permission, which allows deleting UDFs, is optional).
    2. On the node where the client is installed, package the UDF jar (for example AddDoublesUDF.jar; a sketch of such a class follows this item) and upload it to an HDFS directory such as /user/hive_examples_jars. Both the user who creates the function and the users who call it need read access to the file. Example:
    hdfs dfs -put ./hive_examples_jars /user/hive_examples_jars
    hdfs dfs -chmod 777 /user/hive_examples_jars
    3. In security mode, log in to the beeline client as a user with the UDF Create permission:
    kinit <Hive business user>
    beeline
    4. Define the function in HiveServer. The following statement creates a permanent function (addDoubles is the alias used in SELECT queries):
    CREATE FUNCTION addDoubles AS 'com.huawei.bigdata.hive.example.udf.AddDoublesUDF' using jar 'hdfs://hacluster/user/hive_examples_jars/AddDoublesUDF.jar';
    The following statement creates a temporary function instead:
    CREATE TEMPORARY FUNCTION addDoubles AS 'com.huawei.bigdata.hive.example.udf.AddDoublesUDF' using jar 'hdfs://hacluster/user/hive_examples_jars/AddDoublesUDF.jar';
    addDoubles is the alias of the function used in SELECT queries; the keyword TEMPORARY means the function is only defined for the current HiveServer session.
    5. Use the function in HiveServer:
    SELECT addDoubles(1,2,3);
    6. Drop the function:
    DROP FUNCTION addDoubles;
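    For reference, a minimal sketch of what a UDF class such as AddDoublesUDF can look like. The actual class ships with the FusionInsight/MRS samples; this version, which simply sums its double arguments, is an assumption rather than the shipped source:
    package com.huawei.bigdata.hive.example.udf;

    import org.apache.hadoop.hive.ql.exec.UDF;

    // Classic Hive UDF: Hive resolves an evaluate() method matching the call site.
    public class AddDoublesUDF extends UDF {
        public Double evaluate(Double... values) {
            if (values == null || values.length == 0) {
                return null;
            }
            double sum = 0;
            for (Double v : values) {
                if (v != null) {
                    sum += v;
                }
            }
            return sum;
        }
    }
    With this class packaged into AddDoublesUDF.jar and registered as shown in step 4, SELECT addDoubles(1,2,3) returns 6.0.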
  • Flink consumes a Kafka topic and writes the data to DWS
    Flink reads messages from a Kafka topic and writes them to DWS (GaussDB).
    I. Configure the client
    Go to the cluster client installation directory, for example /opt/Bigdata/client/, and import the environment variables:
    cd /opt/Bigdata/client/
    source bigdata_env
    Run kinit and enter the user password to authenticate:
    kinit flinkuser
    Go to /opt and extract the keytab archive:
    cd /opt/
    tar -xvf xx_keytab.tar
    Edit flink-conf.yaml:
    cd /opt/Bigdata/client/Flink/flink/conf
    vi flink-conf.yaml
    Set the following parameters:
    security.kerberos.login.keytab: /opt/user.keytab    (path of the keytab file)
    security.kerberos.login.principal: flinkuser        (user name)
    In security mode, append the service IP of the client node and the floating IP of Manager to the jobmanager.web.allow-access-address item in /opt/Bigdata/client/Flink/flink/conf/flink-conf.yaml, separated by commas.
    Generate the certificate files:
    cd /opt/Bigdata/client/Flink/flink
    ./bin/generate_keystore.sh
    Enter a user-defined password when prompted; flink.keystore and flink.truststore are generated in /opt/Bigdata/client/Flink/flink/conf. Create a test directory under the flink directory and copy the two files into it:
    cd /opt/Bigdata/client/Flink/flink/
    mkdir test
    cp conf/flink.keystore test/
    cp conf/flink.truststore test/
    Edit flink-conf.yaml again:
    cd /opt/Bigdata/client/Flink/flink/conf
    vi flink-conf.yaml
    Set the following parameters (the paths must be relative):
    security.ssl.keystore: test/flink.keystore
    security.ssl.truststore: test/flink.truststore
    Note: the values of security.ssl.key-password, security.ssl.keystore-password and security.ssl.truststore-password must be obtained with the Manager plaintext-encryption API:
    curl -k -i -u : -X POST -HContent-type:application/json -d '{"plainText":""}' 'https://x.x.x.x:28443/web/api/v2/tools/encrypt'
    where the plainText value must match the password used when signing the certificate, and x.x.x.x is the floating IP of the cluster Manager. Parameter names and values must be separated by a space.
    Go to the Flink client home directory and start the cluster:
    cd /opt/Bigdata/client/Flink/flink/
    ./bin/yarn-session.sh -jm 1024 -tm 1024 -t test/ -d
    II. Preparation
    Prepare the JAR and the configuration file. The jar can be placed in any directory (its path is given when the job is started), but flink.properties must be placed in the conf directory of the Flink client installation home (for example /opt/lyfclient/hadoopclient/Flink/flink/conf; adjust to the actual client path).
    flink.properties:
    driver="org.postgresql.Driver";
    url="jdbc:postgresql://x.x.x.x:25308/default";
    username= "";
    password= "";
    sql = "Copy demotest from STDIN DELIMITER ','"
    In this file, url is the database connection string (replace the IP with the database node IP and set the port to match the environment; postgres is the default database and can be changed); username and password are the database credentials; sql is the COPY statement used to load the data (a standalone sketch of this COPY path follows this item).
    GaussDB preparation. Configure the whitelist:
    su - omm
    gs_guc set -Z coordinator -N all -I all -h "host all all X.X.X.X/X sha256"
    Note: only the IP and netmask need to be changed; keep the rest as is.
    Create the database user and grant privileges, in this order:
    1. su - omm
    2. source /opt/huawei/Bigdata/mppdb/.mppdbgs_profile
    3. gsql -d postgres -p 25308 -r
    4. CREATE USER test WITH CREATEDB PASSWORD "Huawei@123";
    5. GRANT ALL PRIVILEGES TO test;
    Log in as the new user (the test user must already exist and have privileges) and create the GaussDB 200 sample table:
    gsql -d postgres -p 25308 -r -Utest -WHuawei@123
    create table demotest(id integer,name varchar2);
    III. Submit the job
    Put the packaged flinkdws jar in the target directory and submit:
    bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoDWS /opt/sandbox/FlinkKafkaJavaExample.jar --topic dws --bootstrap.servers x.x.x.x:21005
    Write data to the topic:
    cd $KAFKA_HOME
    bin/kafka-console-producer.sh --broker-list X.X.X.X:21007 --topic dws --producer.config config/producer.properties
    Consume the data to verify it was produced:
    bin/kafka-console-consumer.sh --consumer.config config/consumer.properties --bootstrap-server x.x.x.x:21007 --topic dws --from-beginning
    Check on the Flink web UI that the job reads the written data.
    IV. Check the data
    Connect to the GaussDB database:
    su - omm
    source /opt/huawei/Bigdata/mppdb/.mppdbgs_profile
    gsql -d test -p 25308    (note: postgres is the default database)
    select * from demotest;
    The consumed records should all appear in this table.
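    The sample's WriteIntoDWS sink is not reproduced here, but the sql line in flink.properties ("Copy demotest from STDIN DELIMITER ','") is the standard PostgreSQL/GaussDB COPY-from-STDIN write path. A minimal standalone sketch of that path using the PostgreSQL JDBC driver's CopyManager; the connection details are placeholders and should match flink.properties:
    import org.postgresql.copy.CopyManager;
    import org.postgresql.core.BaseConnection;

    import java.io.StringReader;
    import java.sql.Connection;
    import java.sql.DriverManager;

    public class DwsCopySketch {
        public static void main(String[] args) throws Exception {
            Class.forName("org.postgresql.Driver");
            // Placeholder connection details.
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:postgresql://x.x.x.x:25308/default", "test", "Huawei@123")) {
                CopyManager copyManager = new CopyManager((BaseConnection) conn);
                // Each line becomes one row of demotest(id, name).
                String rows = "1,aaa\n2,bbb\n";
                long copied = copyManager.copyIn("COPY demotest FROM STDIN DELIMITER ','",
                        new StringReader(rows));
                System.out.println("Rows copied: " + copied);
            }
        }
    }
    In the Flink job, the same copyIn call is typically issued from the sink with the batched Kafka records as the reader input.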
  • Spark reads secure Kafka and writes to secure HBase, HDFS and ES
    Scenario: Spark Streaming reads data from two topics of a secure Kafka cluster and writes it to two HBase tables, two HDFS tables and, in batches, two ES indexes.
    Data source (see the CreateData class in the sample code):
    Detail topic data:  2010-00-03 16:32:51|1|65358977|汇总|6|Upward
    Report topic data:  2|0710123708000000500000000000|状态报告匹配消息|99604118259|99602358259|2003-02-13 16:32:51|2007-07 16:35:52|10|expired,短消息超过有效期|22|详细内2容
    Create the two Kafka topics detaildata and reportdata in advance.
    Notes on connecting to Kafka in security mode:
    When submitting in yarn-client mode, Spark uses jaas.conf (read by the driver) and jaas-zk.conf (read by the executors) under the Spark client:
    Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="./user.keytab" principal="sandbox" useTicketCache=false storeKey=true debug=true; };
    KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="./user.keytab" principal="sandbox" useTicketCache=false storeKey=true debug=true; };
    Note: the relative keytab path works together with the --files option of the spark-submit command below; an absolute path is also possible.
    Notes on connecting to secure ES:
    The ES Kerberos authentication is Huawei's own implementation. By default the ES client looks for the configuration files in a conf directory under the current working directory; three files are needed: esParams.properties (the ES node connection information), krb5.conf and user.keytab. The client then authenticates by itself, so no extra authentication logic is needed in the code. See the relevant section of the product documentation for details. What matters is whether HwRestClient is initialized on the Spark driver or on the executors:
    If on the driver, create a conf directory in the Linux directory from which the job is submitted and put esParams.properties, user.keytab and krb5.conf there.
    If on the executors, package the files with zip conf.zip esParams.properties krb5.conf user.keytab and pass the archive with --archives, for example --archives /home/sandbox/conf.zip#conf.
    Notes on HDFS authentication:
    The HDFS writes are authenticated inside the Spark framework, so no extra logic is needed in the code: run source /opt/hadoopclient/bigdata_env, kinit, and then spark-submit. The --keytab and --principal options of spark-submit let the driver authenticate with the given credentials and propagate them to the executors, so long-running jobs do not die when tickets expire. The --files option uploads the configuration files that need to be read.
    Notes on HBase authentication:
    HBase authentication is done in code on the driver: the relevant hdfs-site.xml, core-site.xml and hbase-site.xml can be specified explicitly, or loaded automatically after source /opt/hadoopclient/bigdata_env. init loads the configuration files and login performs the Kerberos authentication (a sketch of this driver-side login follows this item).
    Packaging and submitting the job:
    Package with IDEA and upload to a client path such as /opt/sandbox; prepare the job jar, a conf directory with the ES configuration files, and a jars directory with the jars that may be needed.
    spark-submit --master yarn-client --files ./user.keytab --class com.huawei.bigdata.spark.examples.streaming.SparkOnStreamingToESHdfsHbase_master --keytab /opt/sandbox/user.keytab --principal sandbox --archives /opt/sandbox/conf.zip#conf --jars /opt/sandbox/jars/spark-streaming-kafka-0-10_2.12-3.1.1-h0.cbu.mrs.320.r11.jar,./jars/kafka-clients-2.4.0-h0.cbu.mrs.320.r11.jar,./jars/spark-token-provider-kafka-0-10_2.12-3.1.1-h0.cbu.mrs.320.r11.jar /opt/sandbox/Spark2HBaseESHDFS-1.0.jar /tmp detaildata,reportdata x.x.x.x:21007
    Write data and check the results:
    Write data into the Kafka topics detaildata and reportdata.
    Check HBase (the job creates the tables automatically, named detail plus year and month, and report plus year and month).
    Check the HDFS data (the Hive tables must be created manually):
    # Hive DDL and query
    CREATE external TABLE `detail_table`( `date` varchar(128), `gateway_name` varchar(128), `enterprise_code` varchar(128), `business_code` varchar(128), `service_code` varchar(128), `send_direction` varchar(128)) stored as parquet location '/sandbox/detail_data';
    # Query with Spark
    val parqDF = spark.read.parquet("/sandbox/detail_data")
    parqDF.createOrReplaceTempView("ParquetTable")
    spark.sql("select * from ParquetTable").explain()
    val parkSQL = spark.sql("select * from ParquetTable")
    parkSQL.show()
    # Hive DDL and query
    CREATE external TABLE `report_table`( `gatewayname` varchar(128), `message_id` varchar(128), `message_type` varchar(128), `calling_number` varchar(128), `called_number` varchar(128), `submission_time` varchar(128), `final_end_time` varchar(128), `message_status` varchar(128), `message_status_description` varchar(128), `message_length` varchar(128), `details` varchar(128)) stored as parquet location '/sandbox/report_data';
    # Query with Spark
    val parqDF = spark.read.parquet("/sandbox/report_data")
    parqDF.createOrReplaceTempView("ParquetTable")
    spark.sql("select * from ParquetTable").explain()
    val parkSQL = spark.sql("select * from ParquetTable")
    parkSQL.show()
    Check the ES data (the job creates the indexes automatically, named detail plus year and month, and report plus year and month):
    curl -XGET --tlsv1.2 --negotiate -k -u : 'https://x.x.x.x:24100/report_2023_3/_search?pretty'
    curl -XGET --tlsv1.2 --negotiate -k -u : 'https://x.x.x.x:24100/detail_2023_3/_search?pretty'
    FAQ
    Question 1: spark-submit fails with an ES authentication error. Analysis: when the ES client starts it needs to read the node connection information from esParams.properties, and the MRS ES component by default looks for a conf directory under the current working directory containing esParams.properties, user.keytab and krb5.conf. In this sample the ES authentication happens on the driver, so the handling differs from the executor case. Solution: prepare the conf directory with the configuration files in the submission path, for example /opt/sandbox, and resubmit; the problem is then resolved.
  • Spark reads secure Kafka and writes to secure ES
    Scenario: Spark Streaming reads data from a secure Kafka topic and writes it into ES in the same secure cluster. Kafka topic name: sandboxtopic; ES index name: examplehuawei.
    Prepare sample Kafka data. Create the topic (for example sandboxtopic) in advance and use sample records such as:
    {"date":"2023-2-15","textbody":"hello","title":"haha"}
    {"date":"2023-2-16","textbody":"hello2","title":"haha2"}
    {"date":"2023-2-17","textbody":"hello3","title":"haha3"}
    {"date":"2023-2-18","textbody":"hello4","title":"haha4"}
    Notes on connecting to Kafka in security mode:
    When submitting in yarn-client mode, Spark uses jaas.conf (read by the driver) and jaas-zk.conf (read by the executors) under the Spark client:
    Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="./user.keytab" principal="sandbox" useTicketCache=false storeKey=true debug=true; };
    KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="./user.keytab" principal="sandbox" useTicketCache=false storeKey=true debug=true; };
    Note: the relative keytab path works together with the --files option of the spark-submit command below; an absolute path is also possible.
    Notes on connecting to secure ES:
    The ES Kerberos authentication is Huawei's own implementation. By default the ES client looks for the configuration files in a conf directory under the current working directory; three files are needed: esParams.properties (the ES node connection information), krb5.conf and user.keytab. The client then authenticates by itself, so no extra authentication logic is needed in the code; see the product documentation for details.
    Because of this, when the ES Kerberos authentication runs on the executor side, YARN cannot place a conf directory there, so --archives is used. In this sample the ES authentication happens on the executors. (If the ES authentication happens on the driver, instead create a conf directory with the three files in the Linux directory from which the job is submitted.)
    On the submitting client, prepare esParams.properties, krb5.conf and user.keytab, package them with zip conf.zip esParams.properties krb5.conf user.keytab, and pass the archive with --archives, for example --archives /home/sandbox/conf.zip#conf.
    Packaging and submitting the job:
    Package with IDEA, upload to a client path such as /home/sandbox, and prepare conf.zip, the job jar and a jars directory with the jars that may be needed. Submit with:
    spark-submit --master yarn --deploy-mode client --class streaming010.TestStreaming010 --keytab /home/sandbox/user.keytab --principal sandbox --files ./user.keytab --archives /home/sandbox/conf.zip#conf --jars ./jars/spark-streaming-kafka-0-10_2.12-3.1.1-h0.cbu.mrs.320.r11.jar,./jars/kafka-clients-2.4.0-h0.cbu.mrs.320.r11.jar,./jars/spark-token-provider-kafka-0-10_2.12-3.1.1-h0.cbu.mrs.320.r11.jar /home/sandbox/sparkkafka2es-1.0.0.jar /tmp/test 172.16.9.114:21007 sandboxtopic 10
    Write data and check the result:
    bin/kafka-console-producer.sh --broker-list x.x.x.x:21007 --topic sandboxtopic --producer.config config/producer.properties
    curl -XGET --tlsv1.2 --negotiate -k -u : 'https://x.x.x.x:24100/examplehuawei/_search?pretty'
    Note 1: debugging in local mode and reading secure Kafka
    When debugging in local mode, the authentication has to be done in the code (with spark-submit this is not needed, because the --principal and --keytab options handle it). Connecting to secure Kafka requires a jaas.conf containing a KafkaClient entry, set for example via setJaasFile (see the sketch after this item); otherwise the job fails with:
    Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
    at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)
    at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98)
    at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:124)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
    Note 2: debugging secure ES in local mode
    As described above, prepare a conf directory containing esParams.properties, krb5.conf and user.keytab in the local working directory; the ES client then authenticates from those files.
    FAQ
    Error 1: writing to ES fails in the Spark executor. The stack trace shows that the ES configuration was not read. Cause: writing to ES requires the ES configuration file esParams.properties plus user.keytab and krb5.conf. Package them on Linux with zip conf.zip esParams.properties krb5.conf user.keytab and pass the archive with --archives; once the executor reads the configuration files, the writes succeed and the executor logs look normal.
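    A minimal sketch of the local-mode setup mentioned in note 1 (the paths are placeholders; the referenced jaas.conf must contain the KafkaClient entry shown above with the keytab and principal):
    import org.apache.spark.SparkConf;

    public class LocalModeAuthSketch {
        public static void main(String[] args) {
            // Point the JVM at the Kerberos and JAAS configuration before any Kafka/Spark code runs.
            System.setProperty("java.security.krb5.conf", "/home/sandbox/krb5.conf");
            System.setProperty("java.security.auth.login.config", "/home/sandbox/jaas.conf");

            // Then build the local streaming job as usual, e.g.:
            SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("kafka2es-local");
            // ... create the StreamingContext, the Kafka direct stream and the ES writer here.
        }
    }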