This post walks through reading HDFS text files with Spark, joining them with Spark SQL, and writing the result back to HDFS. The code below runs against a Hadoop installation on Windows; when submitting to YARN you can drop the HDFS root-path prefix from the file paths.
```scala
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkHdfsOps")
val ss = SparkSession.builder().config(conf).getOrCreate()
import ss.implicits._
val sc = ss.sparkContext
```
You can read the file through either the SparkContext or the SparkSession, but their return types differ, so the downstream handling differs slightly as well.

sc.textFile returns an RDD[String].

ss.read.text returns a DataFrame with a single column. To map over its rows you first convert it to an RDD, which gives you an RDD[Row]; to process each line as a String you need one more step to pull the String out of each Row:
```scala
// given rows: RDD[Row], obtained from df.rdd
rows.map(_.mkString(""))    // concatenate the Row's columns into one String
rows.map(_.getString(0))    // read the first (and only) column as a String
```
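Both conversions produce the same String on a single-column DataFrame. A minimal runnable sketch, using an in-memory DataFrame in place of the HDFS file (the sample rows are hypothetical):

```scala
import org.apache.spark.sql.SparkSession

object RowToStringDemo {
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder().master("local[*]").appName("RowToStringDemo").getOrCreate()
    import ss.implicits._

    // a single-column DataFrame, shaped like the one ss.read.text returns
    val df = Seq("罗斯\tPG\tKNICKS", "威少\tPG\tLAKERS").toDF("value")

    df.rdd.map(_.mkString("")).foreach(println)  // whole Row joined into one String
    df.rdd.map(_.getString(0)).foreach(println)  // first column read as a String

    ss.stop()
  }
}
```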
Note: if a line ends with an empty (null) field, split will drop the element in the last position. You then have to post-process each mapped line, padding the last position with null or an empty string.
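As an alternative to padding the last position by hand (the endsWith check used in the code below), passing a negative limit to split keeps trailing empty strings. A minimal sketch; the sample line is hypothetical:

```scala
object SplitDemo {
  def main(args: Array[String]): Unit = {
    val line = "罗斯\tPG\t" // trailing tab: the last field is empty
    println(line.split("\t").length)     // 2 -- the trailing empty field is dropped
    println(line.split("\t", -1).length) // 3 -- limit -1 keeps trailing empty fields
  }
}
```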
```scala
// local HDFS path
val bp = "hdfs://localhost:9000/fenghuo/test/"

// sc.textFile returns RDD[String]
val math_rdd = sc.textFile(bp + "math/*")
  .map(line => {
    val fields = line.split("\t")
    (fields(0), fields(1), fields(2))
  }).foreach(println)

// ss.read.text returns a DataFrame; convert it to RDD[Row]
val math_df = ss.read.text(bp + "math/*").rdd
  .map(line => {
    val ls = line.getString(0)
    val fields = ls.split("\t")
    if (ls.endsWith("\t")) {
      (fields(0), fields(1), null)
    } else {
      (fields(0), fields(1), fields(2))
    }
  }).toDF("name", "role", "team")
  .show(10)
```
The output:
**sc.textFile() result:**
(罗斯,PG,KNICKS)
(威少,PG,LAKERS)
(霍华德,C,LAKERS)
(哈登,SG,NETS)
(锡安,PF,PELICANS)
(老詹,SF,LAKERS)
(科比,SG,LAKERS)

**ss.read.text() result:**
+------+----+--------+
| name|role| team|
+------+----+--------+
| 威少| PG| LAKERS|
| 哈登| SG| NETS|
| 老詹| SF| LAKERS|
| 科比| SG| LAKERS|
| 罗斯| PG| KNICKS|
|霍华德| C| LAKERS|
| 锡安| PF|PELICANS|
+------+----+--------+
Nulls need handling at join time. When the result is written to an HDFS file, null values are simply not written out, and if the file is then imported into Hive the columns can end up misaligned. The common fix is to convert nulls to empty strings: "". (A DataFrame-API alternative is sketched after the result below.)
```scala
math_df.createOrReplaceTempView("math_df_table")
sports_df.createOrReplaceTempView("sports_df_table")

val join_df = ss.sql("select " +
  "nvl(a.name,'') as name," +
  "nvl(a.age,'') as age," +
  "nvl(a.skills,'') as skills," +
  "nvl(b.role,'') as role," +
  "nvl(b.team,'') as team" +
  " from sports_df_table a left join math_df_table b on " +
  "a.name=b.name " +
  "order by age desc")
join_df.show(10)
```
The output:
+------+---+----------+----+------+
| name|age| skills|role| team|
+------+---+----------+----+------+
|艾弗森| 41| CROSSOVER| | |
| 麦迪| 41| | | |
| 科比| 40| FADEWAY| SG|LAKERS|
| 韦德| 33|LIGHTENING| | |
| 老詹| 33| KING| SF|LAKERS|
| 威少| 30| DUNK| PG|LAKERS|
|杜兰特| 30| SCORE| | |
| | 30| | | |
| 罗斯| 30| SPEED| PG|KNICKS|
| 哈登| 29| | SG| NETS|
+------+---+----------+----+------+
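As an alternative to nvl in the SQL, the DataFrame API can do the same cleanup after the join. This is a sketch, not from the original code, assuming sports_df and math_df are the DataFrames built earlier (sports_df appears in the full program at the end):

```scala
// left join on name, then replace nulls in all string columns with ""
val joined = sports_df.join(math_df, Seq("name"), "left")
val cleaned = joined.na.fill("")
cleaned.show(10)
```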
Write the result to HDFS with dataframe.write.text().
Note: when writing a text file directly with dataframe.write, the DataFrame may only contain a single column, otherwise the write fails.

Workaround: merge the DataFrame's columns into one.
```scala
// method 1: build the column list from the schema
val allColumnsLine = join_df.columns.mkString(",")
val join_result_df = join_df.selectExpr(s"concat_ws('\t',$allColumnsLine) " +
  s"as allColumns2OneColumn")

// method 2: list the column names explicitly
val join_result_df1: DataFrame = join_df.selectExpr(s"concat_ws('\t',name,age,skills,role,team) " +
  s"as allColumns2OneColumn")

join_result_df1.repartition(1).write.mode("overwrite").text(path = bp + "spark/joinResult")
```
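If you would rather keep the columns separate instead of concatenating them, writing tab-delimited CSV is a possible alternative; this is a sketch, not the approach taken above, and the output path is an assumption:

```scala
// alternative sketch: let the CSV writer emit the tab separator,
// so the DataFrame can keep all of its columns
join_df.repartition(1)
  .write.mode("overwrite")
  .option("sep", "\t")
  .csv(bp + "spark/joinResultCsv") // hypothetical output path
```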
The complete code:
```scala
package scala.sparkCore

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadAndWriteHdfs {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkHdfsOps")
    val ss = SparkSession.builder().config(conf).getOrCreate()
    val sc = ss.sparkContext
    import ss.implicits._

    // local HDFS path
    val bp = "hdfs://localhost:9000/fenghuo/test/"

    // sc.textFile returns RDD[String]
    val math_rdd = sc.textFile(bp + "math/*")
      .map(line => {
        val fields = line.split("\t")
        (fields(0), fields(1), fields(2))
      })

    // ss.read.text returns a DataFrame; convert to RDD[Row]
    val math_df = ss.read.text(bp + "math/*").rdd
      .map(line => {
        val ls = line.getString(0)
        val fields = ls.split("\t")
        if (ls.endsWith("\t")) {
          (fields(0), fields(1), null)
        } else {
          (fields(0), fields(1), fields(2))
        }
      }).toDF("name", "role", "team")

    val sports_df = ss.read.text(bp + "sports/*").rdd
      .map(line => {
        val ls = line.getString(0)
        val fields = ls.split("\t")
        if (ls.endsWith("\t")) {
          (fields(0), fields(1), null)
        } else {
          (fields(0), fields(1), fields(2))
        }
      }).toDF("name", "age", "skills")

    // register temp views for the join
    println("creating temp view math_df_table")
    math_df.createOrReplaceTempView("math_df_table")
    println("creating temp view sports_df_table")
    sports_df.createOrReplaceTempView("sports_df_table")

    println("join result: ")
    ss.sql("select a.name,a.age,a.skills,b.role,b.team" +
      " from sports_df_table a left join math_df_table b on " +
      "a.name=b.name").show(10)

    val join_df = ss.sql("select " +
      "nvl(a.name,'') as name," +
      "nvl(a.age,'') as age," +
      "nvl(a.skills,'') as skills," +
      "nvl(b.role,'') as role," +
      "nvl(b.team,'') as team" +
      " from sports_df_table a left join math_df_table b on " +
      "a.name=b.name " +
      "order by age desc")

    // method 1: merge into one tab-separated column, column list built from the schema
    val allColumnsLine = join_df.columns.mkString(",")
    val join_result_df = join_df.selectExpr(s"concat_ws('\t',$allColumnsLine) " +
      s"as allColumns2OneColumn")
    join_result_df.show(10)

    // method 2: list the column names explicitly
    val join_result_df1: DataFrame = join_df.selectExpr(s"concat_ws('\t',name,age,skills,role,team) " +
      s"as allColumns2OneColumn")
    join_result_df1.repartition(1).write.mode("overwrite").text(path = bp + "spark/joinResult")

    // Thread.sleep(20000)
    ss.close()
  }
}
```