• [问题求助] 提交Spark作业,AdvancedTableMapping报错NullpointException
    提交Spark-submit到Yarn时,报错:java.lang.NullPointException at org.apache.spark.om.hdfs.AdvancedTableMapping.load(AdvancedTableMapping.scala:24) at org.apache.spark.om.hdfs.AdvancedTableMapping.resolve(AdvancedTableMapping.scala:65) at org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(CachedDNSToSwitchMapping.java:119)AdvancedTableMapping这个类在spark-hw-plugin_2.12-8.2.0-320.r77这个包里。感觉是加载某个东西没配置??
  • [问题求助] spark写高斯数据库异常提示
    采用spark将计算好的数据写入高斯数据库,提示invalid input syntax for type oid:"xxxxx"。导致部分数据无法写入这个异常具体怎么回事,有大神能够帮忙解释一下吗?
  • [问题求助] hive-sql和spark-sql语句问题
    1.同样的表,使用hive-sql和spark-sql查询出的数据不一样?2.spark-sq,用union all 的时候,也没有起到作用。但是同样的sql,用hive执行就可以?
  • [问题求助] spark filter 中文列名支持
    我用mysql创建了一个中文列名的表,然后根据这个表创建的spark的dataset,如果直接查询没有问题,spark可以正常解析== Parsed Logical Plan =='Project ['人员, 'col1]+- Project [人员#242, 1 AS col1#245] +- Project [人员#242] +- Project [cast(人员#240 as string) AS 人员#242] +- Relation [人员#240] JDBCRelation(`test1111`) [numPartitions=1]== Analyzed Logical Plan ==人员: string, col1: intProject [人员#242, col1#245]+- Project [人员#242, 1 AS col1#245] +- Project [人员#242] +- Project [cast(人员#240 as string) AS 人员#242] +- Relation [人员#240] JDBCRelation(`test1111`) [numPartitions=1]== Optimized Logical Plan ==Project [人员#240, 1 AS col1#245]+- Relation [人员#240] JDBCRelation(`test1111`) [numPartitions=1]== Physical Plan ==*(1) Project [人员#240, 1 AS col1#245]+- *(1) Scan JDBCRelation(`test1111`) [numPartitions=1] [人员#240] PushedFilters: [], ReadSchema: struct<人员:string>一旦我准备对中文列名做数据过滤dataset = dataset.filter(" ( (`人员` = '111') ) ");spark就无法解析了== Parsed Logical Plan =='Project ['人员, 'col1]+- Filter (人员#200 = 111) +- Project [人员#200, 1 AS col1#203] +- Project [人员#200] +- Project [cast(人员#198 as string) AS 人员#200] +- Relation [人员#198] JDBCRelation(`test1111`) [numPartitions=1]== Analyzed Logical Plan ==人员: string, col1: intProject [人员#200, col1#203]+- Filter (人员#200 = 111) +- Project [人员#200, 1 AS col1#203] +- Project [人员#200] +- Project [cast(人员#198 as string) AS 人员#200] +- Relation [人员#198] JDBCRelation(`test1111`) [numPartitions=1]== Optimized Logical Plan ==Project [人员#198, 1 AS col1#203]+- Filter (isnotnull(人员#198) AND (人员#198 = 111)) +- Relation [人员#198] JDBCRelation(`test1111`) [numPartitions=1]== Physical Plan ==org.apache.spark.sql.catalyst.parser.ParseException: Syntax error at or near '人'(line 1, pos 0)== SQL ==人员^^^请问有人知道原因和解决方案么
  • [问题求助] spark-submit提交任务报错AdvancedTableMapping not found
    提交命令如上,报错如下:使用独立的MRS环境,请问这个问题是还缺少什么依赖么
  • [生态对接] spark --jars提交依赖冲突,有没有办法忽略集群中的依赖。只使用fat-jar和--jar提供的
         我最近开发了一个maven项目,想使用spark读取/写入greenplum的数据,但是由于jdbc的传输速度限制。所以想采用greenplum-spark connect这个连接器。当我使用--jars将项目和这个依赖包一起提交上去的时候出现了jar包冲突 报错:classnotfound。  同时自己搭建了一套开源集群,相同的步骤 spark读取/写入greenplum 并且也使用这个连接器--jars提供第三方依赖包,正常读取数据。    所以我想有没有办法忽略集群中的依赖。只使用fat-jar和--jar提供的      
  • [生态对接] 使用greenplum-spark connect连接器遇到的坑
       我最近开发了一个maven项目,想使用spark读取/写入greenplum的数据,但是由于jdbc的传输速度限制。所以想采用greenplum-spark connect这个连接器。当我使用--jars将项目和这个依赖包一起提交上去的时候出现了jar包冲突 报错:classnotfound。所以我又尝试使用jarjar.jar修改相应的包名    但是现在又出现了新的问题,代码中的postgresql依赖找不到对应的class类名报错。   既然外部没办法搞定就从内部项目入手 ……  由于这个依赖属于第三方并不是maven仓库官方提供所以没法加载进入项目中。我尝试加入私有仓库然后再放入我的本地项目中使用maven-shade-plugin将类名包名重定向。不知道是不是第三方依赖的原因,只要提交到cluster上后就会显示找不到类,相当于没有加上这个greenplum-spark依赖。请各位大佬看看是什么原因。 我这边在本地用idea或者Local模式提交都是可以成功的
  • [运维管理] FusionInsight HD 6513 在线升级 FusionInsight HD 6517版本 需要多长时间?怎么评估的?
    FusionInsight HD 6513 在线升级 FusionInsight HD 6517版本  需要多长时间?怎么评估的?
  • [运维管理] FusionInsight HD 6513升级 FusionInsight HD 6517版本,是否支持部分组件(如kafka 、zookeeper)在线升级,其他组件离线升级?
    FusionInsight HD 6513升级 FusionInsight HD 6517版本,是否支持部分组件在线升级,其他组件离线升级?
  • [问题求助] hudi静态表的timeline文件无法自动归档,导致hdfs小文件过多
    mrs320版本,hudi0.11。场景是静态表离线跑批。使用spark-sql每天向hudi cow表里insert select 0条数据,timeline文件无法archive,导致小文件越来越多希望大佬给个解决方案spark-sql复现步骤如下:--创建源表CREATE TABLE emp_test ( empno int, ename string, job string, mgr int, hiredate string, sal int, comm int, deptno int, tx_date string)using hudioptions( type='cow' ,primaryKey='empno' ,payloadclass='org.apache.hudi.common.model.OverwriteNonDefaultWithLatestAvroPayLoad' ,preCombineField='tx_date' ,hoodie.cleaner.commits.retained='1' ,hoodie.keep.min.commits='2' ,hoodie.keep.max.commits='3' ,hoodie.index.type='SIMPLE');insert into emp_test values(7369,'SMITH','CLERK',7902,'1980-12-17',800,100,20,'2022-11-17'),(7499,'ALLEN','SALESMAN',7698,'1981-02-20',1600,300,30,'2022-11-17'),(5233,'ANDY','DEVELOPER',9192,'1996-05-30',5000,3000,10,'2022-11-13');--创建2表create table emp_test2 using hudioptions ( type='cow' ,primaryKey='empno' ,payloadclass='org.apache.hudi.common.model.OverwriteNonDefaultWithLatestAvroPayLoad' ,preCombineField='tx_date' ,hoodie.cleaner.commits.retained='1' ,hoodie.keep.min.commits='2' ,hoodie.keep.max.commits='3' ,hoodie.index.type='SIMPLE' ) as select * from emp_test where 1<>1;--初始化2表insert into emp_test2 select * from emp_test;--2表每天无新增数据insert into emp_test2 select * from emp_test limit 0;insert into emp_test2 select * from emp_test limit 0;insert into emp_test2 select * from emp_test limit 0;insert into emp_test2 select * from emp_test limit 0;insert into emp_test2 select * from emp_test limit 0;insert into emp_test2 select * from emp_test limit 0;insert into emp_test2 select * from emp_test limit 0;insert into emp_test2 select * from emp_test limit 0;insert into emp_test2 select * from emp_test limit 0;观察hdfs2表/.hoodie下timeline instant文件一直新增,不归档/archived(无归档文件)
  • [分享交流] 基于华为云的Spark集群环境搭建
    Hadoop集群搭建1、环境配置本文搭建Hadoop平台,在master主机的/usr/local目录下新建hadoop文件夹,将下载好的hadoop压缩包上传到该文件夹中,使用如下命令进行解压:# cd /usr/local/hadoop# tar -zxvf hadoop-2.7.1.tar.gz修改配置文件/etc/profile,添加如下配置:export HADOOP_HOME=/usr/local/hadoop/hadoop-2.7.1export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin输入命令使配置文件生效:# source /etc/profile2、修改相关文件修改master主机中Hadoop的如下配置文件,这些配置文件都位于/usr/local/hadoop/hadoop-2.7.1/etc/hadoop目录下。修改slaves文件,这里让master节点主机仅作为NameNode节点使用。slave01slave02修改hadoop-env.sh export JAVA_HOME项:export JAVA_HOME=/usr/local/java/jdk1.8.0_202修改core-site.xml:<configuration> <property> <name>hadoop.tmp.dir</name> <value>file:/usr/local/hadoop/hadoop-2.7.1/tmp</value> <description>Abase for other temporary directories.</description> </property> <property> <name>fs.defaultFS</name> <value>hdfs://master:9000</value> </property></configuration>修改hdfs-site.xml:<configuration> <property> <name>dfs.replication</name> <value>3</value> </property> <property> <name>dfs.namenode.http-address</name> <value>master:50070</value> </property> <property> <name>dfs.namenode.secondary.http-address</name> <value>master:50090</value> </property> <property> <name>dfs.namenode.name.dir</name> <value>file:/usr/local/hadoop/hadoop-2.7.1/hdfs/name</value> </property> <property> <name>dfs.datanode.data.dir</name> <value>file:/usr/local/hadoop/hadoop-2.7.1/hdfs/data</value> </property> </configuration>修改mapred-site.xml:将mapred-site.xml.template文件内容复制到mapred-site.xml,再修改mapred-site.xml文件。相关命令和修改内容如下:# cd /usr/local/hadoop/hadoop-2.7.1/etc/hadoop# cp mapred-site.xml.template mapred-site.xml<configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property></configuration>修改yarn-site.xml:<configuration> <!-- Site specific YARN configuration properties --> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.resourcemanager.hostname</name> <value>master</value> </property></configuration>3、slave节点配置Hadoop通过如下命令将master主机的hadoop目录拷贝给slave01和slave02。# scp -r /usr/local/hadoop/ root@slave01:/usr/local/hadoop/# scp -r /usr/local/hadoop/ root@slave02:/usr/local/hadoop/再配置/etc/profile文件中Hadoop相关环境变量即可。4、启动Hadoop集群在master主机中输入如下命令即可启动Hadoop集群:# cd /usr/local/hadoop/hadoop-2.7.1# bin/hdfs namenode -format# sbin/start-all.sh注:关闭Hadoop集群命令如下,尽量保证服务有开就有停,可以避免很多问题出现。# sbin/stop-all.sh5、查看Hadoop管理页面浏览器输入http://master公网IP:50070即可访问到如下Hadoop管理页面。
  • [最佳实践] 利用Spark-mllib进行聚类,分类,回归分析的代码实现(python)
    利用Spark-mllib进行聚类,分类,回归分析的代码实现(python)前提条件安装python3环境,以及需要连接的MRS集群环境下载python3 源码 编译tar zxvf Python-3.8.0.tgz cd Python-3.8.0 mkdir -p /usr/local/python-3.8.0 ./configure --prefix=/usr/local/python-3.8.0 -enable-optimizations --with-ssl make && make install 编译 ln -s /usr/local/python-3.8.0/bin/python3 /usr/bin/python3 ln -s /usr/local/python-3.8.0/bin/pip3 /usr/bin/pip3 ll /usr/bin/python*上传加载数据集创建文件file.csvfeature1,feature2,feature3,label 1.2,3.4,2.8,0 2.1,4.5,1.7,0 3.5,2.8,6.1,1 4.2,5.1,3.9,0 5.3,1.9,2.2,1将该文件上传至hdfshdfs dfs -mkdir -p /tmp/sandboxhdfs dfs -put file.csv /tmp/sandbox在这个示例中,数据集有四个特征列(feature1、feature2、feature3)和一个标签列(label)。每一行代表一个数据样本,特征列的值用逗号分隔,最后是标签的值。这个数据集可以用于分类或回归任务的训练和测试,以下是一些可能的使用方式:分类任务:加载数据集:使用 Spark 的数据加载功能读取 file.csv 文件,并将其转换为 DataFrame 格式。特征工程:根据实际情况,对特征进行预处理、特征选择或特征提取。你可以使用 Spark ML 的特征转换器(如 VectorAssembler、StandardScaler)来创建特征向量,并对特征进行标准化等处理。划分训练集和测试集:将数据集划分为训练集和测试集,一般按照某种比例进行划分,例如 70% 的数据作为训练集,30% 的数据作为测试集。训练模型:选择合适的分类算法(如逻辑回归、决策树、随机森林等)进行模型训练。使用训练集数据拟合模型。模型评估:使用测试集数据对训练好的模型进行评估,计算分类性能指标(如准确率、精确率、召回率、F1 值等)。调优和优化:根据评估结果,调整模型参数或尝试其他的分类算法,以获得更好的性能。回归任务:加载数据集:使用 Spark 的数据加载功能读取 file.csv 文件,并将其转换为 DataFrame 格式。特征工程:根据实际情况,对特征进行预处理、特征选择或特征提取。你可以使用 Spark ML 的特征转换器(如 VectorAssembler、StandardScaler)来创建特征向量,并对特征进行标准化等处理。划分训练集和测试集:将数据集划分为训练集和测试集,一般按照某种比例进行划分,例如 70% 的数据作为训练集,30% 的数据作为测试集。训练模型:选择合适的回归算法(如线性回归、岭回归、决策树回归等)进行模型训练。使用训练集数据拟合模型。模型评估:使用测试集数据对训练好的模型进行评估,计算回归性能指标(如均方误差(MSE)、均方根误差(RMSE)、平均绝对误差(MAE)等)。调优和优化:根据评估结果,调整模型参数或尝试其他的回归算法,以获得更好的性能。聚类(Clustering)示例:K-means 算法import sys sys.path.insert(0, '/opt/140client/Spark2x/spark/python') sys.path.insert(0, '/opt/140client/Spark2x/spark/python/lib/py4j-0.10.9-src.zip') import os os.environ["PYSPARK_PYTHON"]="/usr/anaconda3/bin/python3" import pyspark from pyspark.sql import SparkSession from pyspark import SparkConf from pyspark import SparkContext from pyspark.ml.clustering import KMeans from pyspark.ml.evaluation import ClusteringEvaluator from pyspark.ml.feature import VectorAssembler os.system('source /opt/140client/bigdata_env') os.system('echo password | kinit 用户名') spark = SparkSession \ .builder \ .appName("MLlibPythonExample") \ .getOrCreate() # 加载数据集 data = spark.read.format("csv") \ .option("header", "true") \ .option("inferSchema", "true") \ .load("/tmp/sandbox/file.csv") # 创建特征向量列 assembler = VectorAssembler(inputCols=data.columns, outputCol="features") data = assembler.transform(data) # 训练 K-means 聚类模型 kmeans = KMeans(k=2, seed=123) model = kmeans.fit(data) # 进行预测 predictions = model.transform(data) # 评估聚类模型 evaluator = ClusteringEvaluator() silhouette = evaluator.evaluate(predictions) print("Silhouette with squared euclidean distance = " + str(silhouette))输出轮廓系数为0.370580730000838Silhouette(轮廓系数)是一种用于评估聚类算法效果的指标。它结合了样本与其所分配到的簇内部的紧密度和与其他簇之间的分离度,用于衡量聚类结果的紧密性和分离性。轮廓系数的取值范围在[-1, 1]之间,具体含义如下:接近 1 表示样本与所在簇的紧密度高且与其他簇的分离度好,聚类效果较好。接近 0 表示样本与所在簇的紧密度和与其他簇的分离度相当,聚类效果一般。接近 -1 表示样本与所在簇的紧密度低且与其他簇的分离度差,聚类效果较差。分类(Classification)示例:逻辑回归算法import sys sys.path.insert(0, '/opt/140client/Spark2x/spark/python') sys.path.insert(0, '/opt/140client/Spark2x/spark/python/lib/py4j-0.10.9-src.zip') import os os.environ["PYSPARK_PYTHON"]="/usr/anaconda3/bin/python3" import pyspark from pyspark.sql import SparkSession from pyspark import SparkConf from pyspark import SparkContext os.system('source /opt/140client/bigdata_env') os.system('echo password | kinit 用户名') spark = SparkSession \ .builder \ .appName("MLlibPythonExample2") \ .getOrCreate() from pyspark.ml.classification import LogisticRegression from pyspark.ml.evaluation import BinaryClassificationEvaluator from pyspark.ml.feature import VectorAssembler # 加载数据集 data = spark.read.format("csv") \ .option("header", "true") \ .option("inferSchema", "true") \ .load("/tmp/sandbox/file.csv") # 创建特征向量列和标签列 assembler = VectorAssembler(inputCols=data.columns[:-1], outputCol="features") data = assembler.transform(data) # 划分训练集和测试集 trainData, testData = data.randomSplit([0.7, 0.3], seed=123) # 训练逻辑回归模型 lr = LogisticRegression() model = lr.fit(trainData) # 进行预测 predictions = model.transform(testData) # 评估模型性能 evaluator = BinaryClassificationEvaluator(labelCol="label") accuracy = evaluator.evaluate(predictions) print("Accuracy: ", accuracy)输出准确率为1.0 准确率(Accuracy)是一种用于评估分类算法效果的指标,表示分类器正确分类的样本数量与总样本数量之比。准确率是最常用的分类性能指标之一,它简单直观,并且对于数据类别相对均衡的情况下是一种有效的度量。然而,当数据类别不平衡时,准确率可能会产生误导,因为一个简单的分类器只需要将所有样本都预测为多数类别,就能获得较高的准确率。在这种情况下,需要考虑其他性能指标(如精确率、召回率、F1 值等)来全面评估分类器的表现。回归(Regression)示例:线性回归算法import sys sys.path.insert(0, '/opt/140client/Spark2x/spark/python') sys.path.insert(0, '/opt/140client/Spark2x/spark/python/lib/py4j-0.10.9-src.zip') import os os.environ["PYSPARK_PYTHON"]="/usr/anaconda3/bin/python3" import pyspark from pyspark.sql import SparkSession from pyspark import SparkConf from pyspark import SparkContext os.system('source /opt/140client/bigdata_env') os.system('echo password | kinit 用户名') spark = SparkSession \ .builder \ .appName("MLlibPythonExample3") \ .getOrCreate() from pyspark.ml.regression import LinearRegression from pyspark.ml.evaluation import RegressionEvaluator from pyspark.ml.feature import VectorAssembler # 加载数据集 data = spark.read.format("csv") \ .option("header", "true") \ .option("inferSchema", "true") \ .load("/tmp/sandbox/file.csv") # 创建特征向量列和标签列 assembler = VectorAssembler(inputCols=data.columns[:-1], outputCol="features") data = assembler.transform(data) # 划分训练集和测试集 trainData, testData = data.randomSplit([0.7, 0.3], seed=123) # 训练线性回归模型 lr = LinearRegression() model = lr.fit(trainData) # 进行预测 predictions = model.transform(testData) # 评估模型性能 evaluator = RegressionEvaluator(labelCol="label") rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"}) print("RMSE: ", rmse)RMSE的值为0.8103354037267079均方根误差(Root Mean Square Error,RMSE)是一种常用的评估回归模型预测精度的指标,用于衡量实际观测值与模型预测值之间的差异程度。RMSE 的值越小,表示模型的预测精度越高,与真实观测值之间的差异程度越小。RMSE 是对误差的平均值进行开方,因此保留了误差的单位,通常与原始数据的单位相同。使用spark-submit提交任务创建脚本mllib.py,上传认证凭据user.keytab至对应目录下提交命令spark-submit --master yarn /opt/sandbox/mllib.py --keytab /opt/sandbox/user.keytab --principal sandbox聚类(Clustering)示例:K-means 算法 分类(Classification)示例:逻辑回归算法 回归(Regression)示例:线性回归算法
  • [赋能学习] sprk-sql访问数据库
    1 spark-sql访问clickhouse数据库1.1 启动命令:spark-sql --master yarn --deploy-mode client --jars /home/hadoopclient/Spark/spark/jars/clickhouse-jdbc-0.3.1-h0.cbu.mrs.320.r33.jarjar包下载地址:clickhouse-jdbc-0.3.1-h0.cbu.mrs.320.r33.jar​1.2 创建表:CREATE TABLE ck_test USING org.apache.spark.sql.jdbc options ( driver 'ru.yandex.clickhouse.ClickHouseDriver', url 'jdbc:clickhouse://{addr}:{port}', dbtable '{database}.{table}', user '{username}', password '{password}', ssl 'true', isCheckConnection: 'true', sslMode 'none');​参数说明:参数说明{database}连接ck的数据库名{addr}:{port}连接ck的地址和端口{username}登录ck的用户名{password}登录ck的密码{table}ck表名1.3 查询结果:输入select * from ck_test查询结果2 spark-sql访问mysql数据库2.1 启动命令:spark-sql --master yarn --deploy-mode client --jars /home/hadoopclient/Spark/spark/jars/mysql-connector-java-8.0.24.jar​2.2 创建表:CREATE TABLE mysql_test USING org.apache.spark.sql.jdbc options ( driver ' com.mysql.cj.jdbc.Driver', url ' jdbc:mysql://{addr}:{port}', dbtable '{database}.{table}', user '{username}', password '{password}');参数说明:说明参数{database}连接mysql的数据库名{addr}:{port}连接mysql的地址和端口,默认端口为3306{username}登录mysql的用户名{password}登录mysql的密码{table}mysql表名说明:mysql本身设定有访问权限,一般来讲安装的时候如果没有允许远程访问,非localhost 的IP是无法访问到mysql的,需要创建远程访问用户并授权。1、创建远程访问用户:create user 'spark_test'@'%' identified by '123456';创建的用户名:spark_test用户的密码:1234562、给创建用户授权:grant all privileges on test_data.* to 'spark_test'@'%' with grant option;这里表示给用户spark_test赋予数据库test_data(这是之前创建好的数据库)中所有表的所有权限3、刷新权限:flush privileges;2.3 查询结果输入select * from mysql_test查询结果3 spark-sql访问PostgreSQL数据库3.1 启动命令:spark-sql --master yarn --deploy-mode client --jars /home/hadoopclient/Spark/spark/jars/postgresql-42.6.0.jar​3.2 创建表:CREATE TABLE postgres_test USING org.apache.spark.sql.jdbc options ( driver 'org.postgresql.Driver', url 'jdbc:postgresql://{addr}:{port}/{database}', dbtable '{schema}.{table}', user '{username}', password '{password}');参数说明:参数说明{database}连接的数据库名{addr}:{port}连接Postgresql的地址和端口,默认端口为5432{username}登录postgresql的用户名{password}登录postgresql的密码{schema} {table}分别为postgresql的schema(模式)和表名3.3 查询结果输入select * from postgres_test 查询结果
  • [赋能学习] MRS3.2.0版本 二次开发: Spark读写Clickhouse样例
    一、Spark简介Spark是分布式批处理框架,提供分析挖掘与迭代式内存计算能力,支持多种语言(Scala/Java/Python)的应用开发。 适用以下场景:数据处理(Data Processing):可以用来快速处理数据,兼具容错性和可扩展性。迭代计算(Iterative Computation):支持迭代计算,有效应对多步的数据处理逻辑。数据挖掘(Data Mining):在海量数据基础上进行复杂的挖掘分析,可支持各种数据挖掘和机器学习算法。流式处理(Streaming Processing):支持秒级延迟的流式处理,可支持多种外部数据源。查询分析(Query Analysis):支持标准SQL查询分析,同时提供DSL(DataFrame), 并支持多种外部输入。二、背景说明在Spark应用中,用户可以通过ClickHouse JDBC的原生接口,以及Spark JDBC驱动,实现对ClickHouse数据库和表的创建、查询、插入等操作。通过ClickHouse JDBC驱动创建数据库和表,并插入数据。然后使用Spark JDBC接口读取ClickHouse表中数据,进行转换处理后再追加写入到ClickHouse表中。主要分为四个部分:创建ClickHouse数据库和表,将数据插入表中。使用Spark JDBC接口读取ClickHouse表中数据。注册临时表,并对表中字段ID进行处理,返回新的数据集。将新的数据集数据追加写入到ClickHouse表中。三、样例调试前提:Linux环境有安装集群客户端比对“本地时间和Linux机器时间”与集群时间误都不能超过5分钟检查linux环境的JDK版本为1.8配置linux环境的/etc/hosts文件检查 C:\Windows\System32\drivers\etc\hosts文件中是否包含所有集群节点的域名IP映射信息在IDEA打开样例代码的SparkOnEsJavaExample目录,检查SDK配置默认自动加载依赖,如未加载,则打开后选中pom.xml文件,右键点击“Add As Maven Project”后等待项目自动将依赖下载完毕在Linux环境新建目录,例如“/opt/spark-on-ck/”在集群中获取clickhouse.jdbc驱动,将驱动放置到spark-on-ck中find / -name clickhouse-jdbc*10、打包样例代码在IDEA主页面,选择“View->Tool Windows->Maven”打开“Maven”工具窗口。在Maven工具窗口,选择clean生命周期,执行Maven构建过程。在Maven工具窗口,选择package生命周期,执行Maven构建过程。从IDEA项目目录下的target文件夹中获取到Jar包,拷贝到Spark运行环境下(即Spark客户端),如“/opt/spark-on-ck”。​​​​​​四、linux环境下运行1、java代码中设置了需传入5个参数,因此执行命令必须进行传入该集群的这5个参数jdbcUrl:jdbc:clickhouse://x.x.x.x:21428/testdb2?ssl=true&sslmode=noneckDBName: ck中的数据库,不需要提前创建ckTableName:ck中创建的表,不需要提前创建userName:集群用户名password:集群用户名密码参考产品文档,进入clickhouse命令为clickhouse client --host ip --user username --password --port 21425 --secure将打包好的jar包上传到spark-on-ck目录下,进入该目录,执行如下命令spark-submit --master yarn --deploy-mode client --jars ./clickhouse-jdbc-0.3.1-h0.cbu.mrs.320.r11.jar --class com.huawei.bigdata.spark.examples.SparkOnClickHouseExample SparkOnClickHouseJavaExample-1.0.jar "jdbc:clickhouse://x.x.x.x:21428/testdb2?ssl=true&sslmode=none" "testdb3" "testlyf" "lyf" "passwd"进入创建的数据库查看创建的表​​​​​​​
  • [赋能学习] spark经典维护案例集合
    spark全部案例集合见维护宝典:https://support.huawei.com/hedex/hdx.do?docid=EDOC1100222546&lang=zh&idPath=22658044|22662728|22666212|22396131(FusionInsight HD&MRS租户面集群故障案例(6.5.X-8.X)->维护故障类->spark->常见故障)spark经典案例、总结、重大问题见下表:经典案例分类序号案例出现频次spark性能问题1.1SparkStreaming任务因shuffle用时长导致任务整体变慢★★★★1.2★★1.3★★★★1.4★★★★★spark任务常见异常2.1spark任务executor心跳丢失★★★★★2.2spark-sql或spark-beeline查询报错:File does not exist★★2.3spark任务提交使用Python3★★2.4★★★JDBCServer任务失败排查3.1spark资源类相关参数介绍★★3.2★★★★★3.3★★★★★3.4★★★★★3.5★★★★★spark任务日志4.1spark基本原理介绍以及日志收集★★★4.2spark任务日志级别修改★★★4.3yarn-client模式下Driver端打印gc.log★★★