• Spark sample code: read HDFS across clusters and write to ES (saveJsonToEs interface)
    Note: This post records how to read HDFS data from cluster A (161) and write it into the ES of cluster B (71) in code, using the open-source saveJsonToEs interface that ships with the sample code. The key point is that the code writing to ES must not run against the remote cluster, otherwise it reports errors. Therefore the command is submitted with the client of cluster 71, and the job runs on cluster 71's YARN. The required steps are:
    1. Configure mutual trust between cluster 161 and cluster 71 on both sides. The Spark job must be submitted to cluster 71, which means the jar package is submitted with the client of the peer cluster 71.
    2. Modify the Spark configuration file of the peer (71) client, /opt/client/Spark2x/spark/conf/spark-defaults.conf: add the relevant setting (shown in the original screenshot), written in the form <active NameNode of cluster 161>:25000, in both places.
    3. In the code, read HDFS using the same <active NameNode of cluster 161>:25000 form (see the sketch after this item's FAQ).
    4. Build the jar package and create the submission directory on the node; use the peer cluster 71's user.keytab and krb5.conf files throughout the submission. Put the dependency jars under the jars directory; elasticsearch-spark-30_2.12-7.12.0.jar is the jar required by the saveJsonToEs interface.
    5. Prepare esParams.properties (used to connect to ES) in the submission directory, then package the authentication and configuration files:
        zip conf.zip krb5.conf user.keytab esParams.properties
    Submit command:
        cd /opt/sparkMultiEStest
        spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.SparkOnEs --keytab /opt/sparkMultiEStest/user.keytab --principal sandbox@HUAWEI.COM --files ./user.keytab --archives /opt/sparkMultiEStest/conf.zip#conf --jars ./jars/elasticsearch-spark-30_2.12-7.12.0.jar,./jars/fastjson-1.2.4.jar /opt/sparkMultiEStest/SparkOnEs-1.0.jar
    Then check the ES result on cluster 71.
    FAQ: Spark reads HDFS without problems but reports an error when writing to ES. The cause is that the marked parameter (shown in the original screenshot) must be commented out; if it is not commented out, the error above is reported.
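    Below is a minimal sketch of the driver logic under the assumptions above; it is not the shipped SparkOnEs class. The HDFS path, the index name "people", and the ES address/port of cluster 71 are illustrative, and the ES connection settings would normally be read from esParams.properties.
        // Hypothetical sketch: reads JSON lines from cluster 161's HDFS and writes them to cluster 71's ES.
        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.JavaRDD;
        import org.apache.spark.api.java.JavaSparkContext;
        import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

        public class SparkOnEsSketch {
            public static void main(String[] args) {
                SparkConf conf = new SparkConf().setAppName("SparkOnEs")
                        // ES connection settings; normally taken from esParams.properties
                        .set("es.nodes", "xx.xx.xx.71")      // an ES node of cluster B (71), assumed
                        .set("es.port", "24100")             // ES port used elsewhere in this document
                        .set("es.nodes.wan.only", "true");
                JavaSparkContext jsc = new JavaSparkContext(conf);

                // Read from cluster A (161) with the active NameNode + port 25000 written explicitly (step 3).
                JavaRDD<String> jsonLines =
                        jsc.textFile("hdfs://<active-namenode-of-161>:25000/user/spark-on-es/people.json");

                // The write runs on executors in cluster 71's YARN, so it is not executed "remotely".
                JavaEsSpark.saveJsonToEs(jsonLines, "people");
                jsc.stop();
            }
        }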
  • [Best Practice] Spark sample code: read HDFS across clusters and write to ES (high level client interface)
    Reading HDFS across clusters and writing to ES.
    Cluster A: xx.xx.xx.161-163 (realm: HADOOP.COM)
    Cluster B: xx.xx.xx.71-73 (realm: HUAWEI.COM)
    MRS version: 8.2.0.1
    Task description: mutual trust is already configured between clusters A and B. Authenticate with cluster B's keytab, submit the code from cluster A's client, read HDFS data from cluster A, and write it into cluster B's ES.
    Note: unless otherwise stated, all operations are performed on the master node of cluster A.
    Preparation
    1. Append the node entries from cluster B's /etc/hosts to cluster A's /etc/hosts.
    2. Download cluster B's user.keytab and krb5.conf and upload them to the /opt/sandbox directory on cluster A.
    3. cd into /opt/sandbox on cluster A and run the following commands to connect to cluster B's ES through cluster A's client using cluster B's keytab. As long as no error is reported, mutual trust between cluster A and cluster B is verified.
        source /opt/client/bigdata_env
        kinit -kt user.keytab sandbox@HUAWEI.COM
        curl -XGET --tlsv1.2 --negotiate -k -u : 'https://xx.xx.xx.71:24100/_cat/indices?v'
    4. Download the sample configuration file esParams.properties to /opt/sandbox and set esServerHost to cluster B's IP and port, plus the other user/password and esindex settings.
    5. Generate the conf.zip archive with the following command:
        zip conf.zip user.keytab krb5.conf esParams.properties
    6. Package the sample code into a jar and put it in /opt/sandbox.
    7. Upload the sample data people.json to the /user/spark-on-es/ path of cluster A's HDFS. people.json data:
        {"id":"1","name":"jack","age":12,"createdTime":"2008-06-10T10:28:55Z"}
        {"id":"2","name":"pony","age":22,"createdTime":"2009-07-13T20:18:52Z"}
        {"id":"3","name":"susan","age":11,"createdTime":"2012-12-08T12:29:37Z"}
        {"id":"4","name":"micheal","age":13,"createdTime":"2015-10-09T16:53:26Z"}
        {"id":"5","name":"vordas","age":14,"createdTime":"2018-03-21T17:28:15Z"}
    8. Put the dependency jar fastjson-1.2.4.jar under the /opt/sandbox/jars path, then submit:
        spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.SparkOnEs --keytab /opt/sandbox/user.keytab --principal sandbox@HUAWEI.COM --files ./user.keytab --archives /opt/sandbox/conf.zip#conf --jars ./jars/fastjson-1.2.4.jar /opt/sandbox/spark2es.jar
       Explanation:
        --class com.huawei.bigdata.spark.examples.SparkOnEs: fully qualified name of the main class in the sample code
        --keytab /opt/sandbox/user.keytab: cluster B's authentication file user.keytab
        --principal sandbox@HUAWEI.COM: cluster B's username@realm
        --files ./user.keytab: cluster B's authentication file
        --archives /opt/sandbox/conf.zip#conf: cluster B's user.keytab, krb5.conf and esParams.properties
        ./jars/fastjson-1.2.4.jar: dependency jar of the sample code
        /opt/sandbox/spark2es.jar: path of the jar built from the sample code
    Checking the result
    9. On cluster A's client, run the following commands to check the result (xxx.xx.x.71 is a cluster B node IP, and people is the index of the data):
        curl -XGET --tlsv1.2 --negotiate -k -u : 'https://xxx.xx.x.71:24100/_cat/indices?v'
        curl -XGET --tlsv1.2 --negotiate -k -u : 'https://xxx.xx.x.71:24100/people/_search?pretty'
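    Below is a minimal, hedged sketch of the "high level client" write path, assuming the index name "people" and the ES node/port of cluster B used above; the Kerberos/SSL security setup that the real sample derives from esParams.properties is omitted, and the class name is illustrative rather than the shipped SparkOnEs class.
        // Hypothetical sketch: read people.json from HDFS and bulk-write it through RestHighLevelClient.
        import org.apache.http.HttpHost;
        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.JavaRDD;
        import org.apache.spark.api.java.JavaSparkContext;
        import org.elasticsearch.action.bulk.BulkRequest;
        import org.elasticsearch.action.index.IndexRequest;
        import org.elasticsearch.client.RequestOptions;
        import org.elasticsearch.client.RestClient;
        import org.elasticsearch.client.RestHighLevelClient;
        import org.elasticsearch.common.xcontent.XContentType;

        public class Spark2EsSketch {
            public static void main(String[] args) {
                JavaSparkContext jsc =
                        new JavaSparkContext(new SparkConf().setAppName("SparkOnEs"));

                // Sample data uploaded to cluster A's HDFS in step 7
                JavaRDD<String> people = jsc.textFile("/user/spark-on-es/people.json");

                // Open one client per partition on the executors and bulk-index the JSON lines
                people.foreachPartition(iter -> {
                    try (RestHighLevelClient client = new RestHighLevelClient(
                            RestClient.builder(new HttpHost("xx.xx.xx.71", 24100, "https")))) {
                        BulkRequest bulk = new BulkRequest();
                        while (iter.hasNext()) {
                            bulk.add(new IndexRequest("people").source(iter.next(), XContentType.JSON));
                        }
                        if (bulk.numberOfActions() > 0) {
                            client.bulk(bulk, RequestOptions.DEFAULT);
                        }
                    }
                });
                jsc.stop();
            }
        }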
  • [O&M] On an on-premises HD 6.5.1.7 cluster, what happens if the HDFS replication factor is temporarily changed to 1 and then set back to 3?
    On an on-premises HD 6.5.1.7 cluster, what happens if the HDFS replication factor is temporarily changed to 1 and then set back to 3?
  • [Environment Setup] On an on-premises HD 6.5.1.7 cluster, can HBase be scaled out by adding only one node?
    On an on-premises HD 6.5.1.7 cluster, can HBase be scaled out by adding only one node?
  • [O&M] How many records per batch are appropriate for each HBase write commit, and why?
    How many records per batch are appropriate for each HBase write commit, and why?
  • [O&M] How can CDL be used as a drop-in replacement for Flume?
    How can CDL be used as a drop-in replacement for Flume?
  • [Environment Setup] Why is deploying Flume and DataNode on the same node not recommended? Why is there a risk of data imbalance?
    Why is deploying Flume and DataNode on the same node not recommended? Why is there a risk of data imbalance?
  • Connecting Python to Kafka in security mode
    Connecting Python to Kafka with security enabled.
    Versions: MRS 8.2.0; Kafka 2.4.0; Python 3.8.8.
    Python dependencies:
        ./pip3 freeze | grep kafka
        confluent-kafka==2.2.0
        kafka==1.3.5
        kafka-python==2.0.2
        ./pip3 freeze | grep krbticket
        krbticket==1.0.6
        ./pip3 freeze | grep gssapi
        gssapi==1.8.3
    Producer code:
        from krbticket import KrbConfig, KrbCommand
        import os
        import json
        from kafka import KafkaProducer

        # Kerberos-related files: JAAS config, krb5.conf and the user keytab
        jaas_conf = '/opt/kafka_jaas.conf'
        krb5_conf = '/opt/sandbox/krb5.conf'
        user_name = 'sandbox'
        keytab_conf = '/opt/sandbox/user.keytab'

        # kinit with the keytab so a ticket cache is available
        os.environ['KRB5CCNAME'] = '/tmp/krb5cc_0'
        kconfig = KrbConfig(principal='sandbox@HADOOP.COM', keytab=keytab_conf)
        KrbCommand.kinit(kconfig)
        os.environ['KAFKA_OPTS'] = f'-Djava.security.auth.login.config={jaas_conf}' \
                                   f' -Djava.security.krb5.conf={krb5_conf}'

        producer = KafkaProducer(bootstrap_servers=['xxx.xxx.xx.xx:21007'],
                                 security_protocol='SASL_PLAINTEXT',
                                 sasl_mechanism='GSSAPI',
                                 sasl_kerberos_service_name='kafka',
                                 sasl_kerberos_domain_name='hadoop.hadoop.com',
                                 api_version=(2, 4, 0))

        # The value must be serialized to bytes before sending
        msg = json.dumps("haha").encode()
        producer.send('aaa', msg)
    Notes:
    - Prepare the username, keytab file and krb5.conf file used for authentication.
    - Create the kafka_jaas.conf file with content like the following:
        KafkaClient {
            com.sun.security.auth.module.Krb5LoginModule required
            useKeyTab=true
            principal="sandbox@HADOOP.COM"
            keyTab="/opt/sandbox/user.keytab"
            useTicketCache=false
            serviceName="kafka"
            storeKey=true
            debug=true;
        };
    - Check the cluster domain and set the sasl_kerberos_domain_name parameter accordingly; the default is hadoop.hadoop.com and must be changed if the cluster domain was changed.
    - The api_version parameter must match the MRS Kafka version.
    - When producing data, the value must be serialized; see the JSON encode step at the end of the producer code.
    - The Kafka topic name here is aaa.
    Consumer code:
        from kafka import KafkaConsumer
        from krbticket import KrbConfig, KrbCommand
        import os

        jaas_conf = '/opt/kafka_jaas.conf'
        krb5_conf = '/opt/sandbox/krb5.conf'
        keytab_conf = '/opt/sandbox/user.keytab'

        os.environ['KRB5CCNAME'] = '/tmp/krb5cc_0'
        kconfig = KrbConfig(principal='sandbox@HADOOP.COM', keytab=keytab_conf)
        KrbCommand.kinit(kconfig)
        os.environ['KAFKA_OPTS'] = f'-Djava.security.auth.login.config={jaas_conf}' \
                                   f' -Djava.security.krb5.conf={krb5_conf}'

        consumer2 = KafkaConsumer('aaa',
                                  bootstrap_servers=['xxx.xx.x.xxx:21007'],
                                  security_protocol='SASL_PLAINTEXT',
                                  sasl_mechanism='GSSAPI',
                                  auto_offset_reset='earliest',
                                  group_id='python_mfa_group',
                                  sasl_kerberos_service_name='kafka',
                                  sasl_kerberos_domain_name='hadoop.hadoop.com',
                                  api_version=(2, 4, 0))

        for message in consumer2:
            # message value and key are raw bytes -- decode if necessary,
            # e.g. for unicode: message.value.decode('utf-8')
            print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                 message.offset, message.key, message.value))
    To verify, produce a message from the client and check the consumed result.
    FAQ
    - Creating the consumer fails: Kerberos authentication must be done in the code, and the Kafka version must be passed in the api_version parameter.
    - Producing data fails: the value passed to producer.send must be serialized to bytes, as in the JSON encode step above.
    - Producing times out and consuming also fails: the Python gssapi dependency is not installed; install it with
        ./pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple gssapi
  • [Community] CTBase Java connection failure
    How can I obtain the secondary-development guide or sample code for the HD 6.5.1 CTBase Java API?
  • [Community] In FusionInsight HD 6.5.1, does a data node need preinstall after its operating system is reinstalled?
    Cluster environment: production. Problem description: the operating system has been reinstalled on a single data node, and the host now needs to be reinstalled from the Manager page. The question is whether preinstall and precheck need to be run before reinstalling the host.
  • [Secondary Development] On a FusionInsight HD 6.5.1.7 cluster, how can HBase compare two tables for data consistency?
    On a FusionInsight HD 6.5.1.7 cluster, how can HBase compare two tables for data consistency?
  • [Best Practice] Connecting Python to ClickHouse
    Connecting Python to ClickHouse.
    ClickHouse generic JDBC port: the ClickHouse JDBC interface uses the HTTP protocol, and the corresponding ports can be found on the Manager -> ClickHouse page, in the logical cluster section. The JDBC URL differs between the non-encrypted and the encrypted port:
    - Non-encrypted port 21426, JDBC URL: jdbc:clickhouse://x.x.x.x:21426/default
    - Encrypted port 21428, JDBC URL: jdbc:clickhouse://x.x.x.x:21428/default?ssl=true&sslmode=none
    The IP to connect to is the IP of a ClickHouse Balancer instance. Note: the non-encrypted port is used in this example.
    Prerequisites: a Python 3 environment and the MRS cluster to connect to.
    1. Download the Python 3 source and build it:
        tar zxvf Python-3.8.0.tgz
        cd Python-3.8.0
        mkdir -p /usr/local/python-3.8.0
        ./configure --prefix=/usr/local/python-3.8.0 --enable-optimizations --with-ssl
        make && make install
        ln -s /usr/local/python-3.8.0/bin/python3 /usr/bin/python3
        ln -s /usr/local/python-3.8.0/bin/pip3 /usr/bin/pip3
        ll /usr/bin/python*
    Connecting via the generic JDBC driver
    Install the dependencies:
        ./pip3 install jpype1==1.4.1
        ./pip3 install JayDeBeApi==1.2.3
    Python code:
        import jaydebeapi
        import pandas as pd

        # Pass the ClickHouse JDBC driver class and its dependency jars explicitly
        conn = jaydebeapi.connect(
            "ru.yandex.clickhouse.ClickHouseDriver",
            "jdbc:clickhouse://x.x.x.x:21426/default",
            ["username", "passwd"],
            jars=['/opt/lyf/lib1/clickhouse-jdbc-0.3.1-h0.cbu.mrs.320.r11.jar',
                  '/opt/lyf/lib1/commons-codec-1.15.jar',
                  '/opt/lyf/lib1/commons-logging-1.2.jar',
                  '/opt/lyf/lib1/httpclient-4.5.13.jar',
                  '/opt/lyf/lib1/httpcore-4.4.13.jar',
                  '/opt/lyf/lib1/lz4-java-1.7.1.jar',
                  '/opt/lyf/lib1/slf4j-api-1.7.36.jar',
                  '/opt/lyf/lib1/us-common-1.0.66.jar',
                  '/opt/lyf/lib1/bcprov-jdk15on-1.70.jar'])

        sql = "Select * From addressbook"
        df_ck = pd.read_sql(sql, conn)
        df_ck
        conn.close()
    Note: put the required lib files in the corresponding directory: commons-codec-1.15.jar, commons-logging-1.2.jar, httpclient-4.5.13.jar, httpcore-4.4.13.jar, lz4-java-1.7.1.jar, slf4j-api-1.7.36.jar, bcprov-jdk15on-1.70.jar, clickhouse-jdbc-0.3.1-h0.cbu.mrs.320.r11.jar, us-common-1.0.66.jar.
    Connecting via clickhouse_connect
    Install the dependency:
        ./pip3 install clickhouse_connect==0.6.4
    Python code:
        import clickhouse_connect

        client = clickhouse_connect.get_client(host='x.x.x.x', port=21426,
                                               username='username', password='passwd')
        client.command('show tables')
        client.command('select * from people')
    Note: see cid:link_0.
    FAQ
    - The Python code fails with ClassNotFound when the connection is written as:
        conn = jaydebeapi.connect("ru.yandex.clickhouse.ClickHouseDriver",
                                  "jdbc:clickhouse://x.x.x.x:port/default?ssl=true&sslmode=none?user=username&password=passwd!@",
                                  jars=['/opt/lyf/lib1/clickhouse-jdbc-0.3.1-h0.cbu.mrs.320.r11.jar'])
      Check the method signature with help(jaydebeapi.connect). Solution: change the connection parameters so that the credentials are passed as a list and the full jar list is supplied, as in the working code above:
        conn = jaydebeapi.connect("ru.yandex.clickhouse.ClickHouseDriver",
                                  "jdbc:clickhouse://x.x.x.x:port/default",
                                  ["username", "passwd"],
                                  jars=['/opt/lyf/lib1/clickhouse-jdbc-0.3.1-h0.cbu.mrs.320.r11.jar', '/opt/lyf/lib1/commons-codec-1.15.jar', '/opt/lyf/lib1/commons-logging-1.2.jar', '/opt/lyf/lib1/httpclient-4.5.13.jar', '/opt/lyf/lib1/httpcore-4.4.13.jar', '/opt/lyf/lib1/lz4-java-1.7.1.jar', '/opt/lyf/lib1/slf4j-api-1.7.36.jar', '/opt/lyf/lib1/us-common-1.0.66.jar', '/opt/lyf/lib1/bcprov-jdk15on-1.70.jar'])
    - The Python code still fails with ClassNotFound: run grep -R 'x.x.x.x' over the jar directory to locate the jar containing the missing class, put it in the corresponding directory, and add it to the jars parameter.
  • [O&M] Why is it recommended that the total number of partitions in a Kafka cluster not exceed 10,000?
    Why is it recommended that the total number of partitions in a Kafka cluster not exceed 10,000? Should replica partitions be subtracted from the total partition count?
  • [Best Practice] Best practice for using the Loader REST API
    Loader overview: Loader is a data loading tool for exchanging data and files between FusionInsight HD and relational databases or file systems. It is developed on top of open-source Sqoop with extensive optimization and extension. It provides a visual, wizard-style job configuration and management UI and scheduled tasks that run Loader jobs periodically; in the UI you can specify different data sources, configure data cleansing and transformation steps, and configure the cluster storage system.
    Loader features:
    - Graphical: graphical configuration and monitoring UI, easy to operate.
    - High performance: uses MapReduce to process data in parallel.
    - High reliability: the Loader Server runs in active/standby mode; jobs are executed through MapReduce and support retry on failure; failed jobs leave no residual data.
    - Security: Kerberos authentication and job permission management.
    Background: the Loader UI already shows job history and run status; the customer needs to obtain job status and history information through the REST API.
    Key points:
    1. The interaction uses Kerberos authentication, which in HTTP access scenarios is also called SPNEGO authentication.
    2. As when interacting with the component's native UI, SSL must be negotiated with the server.
    3. The authentication files user.keytab and krb5.conf are placed in the conf directory.
    Code walkthrough:
        // loader
        String url = "https://x.x.x.x:20026/Loader/LoaderServer/124/loader/v1/job/all?paged=true&offset=1&limit=2&kw=&group=0&order=desc&order-by=cdate";
        // String url = "https://x.x.x.x:20026/Loader/LoaderServer/124/loader/v1/submission/history/3?paged=true&limit=10&offset=1";
        System.out.println("PATH_TO_KEYTAB " + PATH_TO_KEYTAB);
        System.setProperty("java.security.krb5.conf", PATH_TO_KRB5_CONF);
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "true");
    The Loader port is 20026. The first URL fetches information about all jobs, and limit can be changed to control how many jobs are returned. The second URL fetches the run history of a single job; the number after history is the job ID and can be changed.
    Sample run. Fetching all jobs / fetching a single job's history returns JSON such as:
        {"all":[{"exception":"","counters":{"org.apache.hadoop.mapreduce.FileSystemCounter":{"FILE_LARGE_READ_OPS":0,"HDFS_BYTES_READ_EC":0,"FILE_WRITE_OPS":0,"HDFS_READ_OPS":270,"HDFS_BYTES_READ":3659,"HDFS_LARGE_READ_OPS":0,"FILE_READ_OPS":0,"FILE_BYTES_WRITTEN":11554899,"FILE_BYTES_READ":0,"HDFS_WRITE_OPS":91,"HDFS_BYTES_WRITTEN":166800009},"org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter":{"BYTES_WRITTEN":166800009},"org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter":{"BYTES_READ":0},"org.apache.hadoop.mapreduce.JobCounter":{"TOTAL_LAUNCHED_MAPS":30,"MB_MILLIS_MAPS":3549155328,"SLOTS_MILLIS_REDUCES":0,"VCORES_MILLIS_MAPS":866493,"SLOTS_MILLIS_MAPS":3465972,"OTHER_LOCAL_MAPS":30,"MILLIS_MAPS":866493},"org.apache.sqoop.submission.counter.SqoopCounters":{"ROWS_SKIPPED":0,"ROWS_READ":5903983,"ROWS_WRITTEN":5903983},"org.apache.hadoop.mapreduce.TaskCounter":{"SPILLED_RECORDS":0,"MERGED_MAP_OUTPUTS":0,"VIRTUAL_MEMORY_BYTES":124308541440,"MAP_INPUT_RECORDS":0,"MAP_PHYSICAL_MEMORY_BYTES_MAX":541884416,"SPLIT_RAW_BYTES":3659,"FAILED_SHUFFLE":0,"PHYSICAL_MEMORY_BYTES":15452405760,"GC_TIME_MILLIS":15096,"MAP_VIRTUAL_MEMORY_BYTES_MAX":4216520704,"MAP_OUTPUT_RECORDS":5903983,"CPU_MILLISECONDS":817330,"COMMITTED_HEAP_BYTES":18252038144}},"last-update-date":1686644843155,"last-udpate-user":"xxx","output":"--: --","input":"MYSQL: server_diskspace[null]","caller":-1,"creation-user":"xxx","progress":1.0,"creation-date":1686644784844,"external-id":"job_1680145428800_0188","dirty-data-link":"http:\/\/172-16-9-118:25002\/explorer.html#\/user\/loader\/etl_dirty_data_dir\/1\/1680145428800_0188","job":1,"external-link":"http:\/\/172-16-4-22:26000\/proxy\/application_1680145428800_0188\/","status":"SUCCEEDED"}],"total-num":1}
    JSON returned when the job failed:
        {"all":[{"output":"--: --","exception":"执行SQL语句失败。 原因: sql execute error","input":"MYSQL: people[null]","caller":-1,"creation-user":"xxx","progress":-1.0,"creation-date":1686644587315,"last-update-date":1686644587315,"dirty-data-link":"","job":3,"last-udpate-user":"xxx","status":"FAILURE_ON_SUBMIT"}],"total-num":1}
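    As a small illustration of consuming the returned JSON, the sketch below extracts the job ID, status and progress from the "all" array. It assumes the fastjson library (used as a dependency elsewhere in these samples) is on the classpath and that the response body has already been retrieved over the SPNEGO-authenticated HTTPS call; the class and variable names are illustrative, not part of the shipped sample.
        // Hypothetical sketch: parse the Loader REST response and print each job's status.
        import com.alibaba.fastjson.JSON;
        import com.alibaba.fastjson.JSONArray;
        import com.alibaba.fastjson.JSONObject;

        public class LoaderStatusSketch {
            public static void main(String[] args) {
                // `body` stands for the JSON string returned by the /job/all or /submission/history URL.
                String body = "{\"all\":[{\"job\":1,\"progress\":1.0,\"status\":\"SUCCEEDED\"}],\"total-num\":1}";
                JSONObject resp = JSON.parseObject(body);
                JSONArray all = resp.getJSONArray("all");
                for (int i = 0; i < all.size(); i++) {
                    JSONObject record = all.getJSONObject(i);
                    // "status" is SUCCEEDED for successful runs and FAILURE_ON_SUBMIT for failed submissions.
                    System.out.println("job=" + record.getIntValue("job")
                            + " status=" + record.getString("status")
                            + " progress=" + record.getFloatValue("progress"));
                }
            }
        }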
  • [O&M] Is there a way to disable Kerberos for Spark2x on an on-premises FusionInsight HD 6.5.1.3 cluster?
    Is there a way to disable Kerberos for Spark2x on an on-premises FusionInsight HD 6.5.1.3 cluster?