spark基于dataFrame和sparksql对hdfs文件夹下多个文件进行读、写、join等操作

阅读: 评论:0

spark基于dataFrame和sparksql对hdfs文件夹下多个文件进行读、写、join等操作

spark基于dataFrame和sparksql对hdfs文件夹下多个文件进行读、写、join等操作

  本文主要介绍spark读取hdfs文本文件,并利用spark-sql进行join操作,最后将结果写入hdfs文件系统,话不多说,直接上代码。代码是基于在windows上安装的hadoop,提交到yarn上可以不加hdfs文件的根路径。

1,准备数据文件

<


<

2,代码实现

2.1,初始化sparkContext或者sparkSession

    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkHdfsOps")val ss  = SparkSession.builder().config(conf).getOrCreate()import ss.implicits._val sc = ss.sparkContext

2.2,读取hdfs上的文件

  可以通过sparkContext或者sparkSession来读取,但是各自的返回值类型不一样,后续的处理也有细微差别。
  sparkContext读取文本后返回的是RDD[String]
  sparkSession返回的是DataFrame,且只有一列,需要将DataFrame转成RDD,转换之后变成:RDD[Row]。如果需要对RDD[Row]作map操作,并且还需要对每一行数据进行处理的话,还得增加一个步骤,将RDD[Row]转成String:
  RDD[Row].map(_.mkString(’’))

  RDD[Row].map(_.getString(0))

注意:如果line是以空(null)结尾,分割之后会读不出来最后一个位置的元素,需要对map之后的每一行作处理,将最后一个位置补null或者空字符串

    //本地hdfs路径val bp = "hdfs://localhost:9000/fenghuo/test/"//sc.textFile返回的是rdd[String]val math_rdd = sc.textFile(bp+"").map(line => {val fields = line.split("t")(fields(0),fields(1),fields(2))}).foreach(println)val math_df = (bp+"").rdd //转成rdd[ROW].map(line => {val ls = String(0)val fields = ls.split("t")dsWith("t")){(fields(0),fields(1),null)}else{(fields(0),fields(1),fields(2))}}).toDF("name","role","team").show(10)

结果如下:

**&#File()的结果:'**
(罗斯,PG,KNICKS)
(威少,PG,LAKERS)
(霍华德,C,LAKERS)
(哈登,SG,NETS)
(锡安,PF,PELICANS)
(老詹,SF,LAKERS)
(科比,SG,LAKERS)**&#ad.text()的结果:'**
+------+----+--------+
|  name|role|    team|
+------+----+--------+
|  威少|  PG|  LAKERS|
|  哈登|  SG|    NETS|
|  老詹|  SF|  LAKERS|
|  科比|  SG|  LAKERS|
|  罗斯|  PG|  KNICKS|
|霍华德|   C|  LAKERS|
|  锡安|  PF|PELICANS|
+------+----+--------+

2.3,建临时表,做join操作

  join时候需要做null值处理,在写入hdfs文件时,如果有null值,null不会被写入,特别是导入hive,可能会导致数据错位,常用方法是将null值转成空字符串:""。

    ateOrReplaceTempView("math_df_table")ateOrReplaceTempView("sports_df_table")val join_df = ss.sql("select " +"nvl(a.name,'') as name," +"nvl(a.age,'') as age," +"nvl(a.skills,'') as skills," +"le,'') as role," +"am,'') as team" +" from sports_df_table a left join math_df_table b on " +"a.name=b.name " +"order by age desc").show(10)

结果如下:

+------+---+----------+----+------+
|  name|age|    skills|role|  team|
+------+---+----------+----+------+
|艾弗森| 41| CROSSOVER|    |      |
|  麦迪| 41|          |    |      |
|  科比| 40|   FADEWAY|  SG|LAKERS|
|  韦德| 33|LIGHTENING|    |      |
|  老詹| 33|      KING|  SF|LAKERS|
|  威少| 30|      DUNK|  PG|LAKERS|
|杜兰特| 30|     SCORE|    |      |
|      | 30|          |    |      |
|  罗斯| 30|     SPEED|  PG|KNICKS|
|  哈登| 29|          |  SG|  NETS|
+------+---+----------+----+------+

2.4,写入hdfs文件

  通过()写入hdfs文件。
注意:直接用dataframe.write方式写入文本文件时候,需要注意dataframe只能有一列,否则会报错。
解决方法:将DataFrame合并成一列

	//方法一:val allColumnsLine = lumns.mkString(",")val join_result_df = join_df.selectExpr(s"concat_ws('t',$allColumnsLine) " +s"as allColumns2OneColumn")//方法二:val join_result_df1:DataFrame = join_df.selectExpr(s"concat_ws('t',name,age,skills,role,team) " +s"as allColumns2OneColumn")join_partition(1).de("overwrite").text(path = bp+"spark/joinResult")

结果如下:

3,项目整体代码如下,希望能帮到你

scala.sparkCoreimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SparkSession}object ReadAndWriteHdfs {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("SparkHdfsOps")val ss  = SparkSession.builder().config(conf).getOrCreate()val sc = ss.sparkContextimport ss.implicits._//本地hdfs路径val bp = "hdfs://localhost:9000/fenghuo/test/"//sc.textFile返回的是rdd[String]val math_rdd = sc.textFile(bp+"").map(line => {val fields = line.split("t")(fields(0),fields(1),fields(2))})val math_df = (bp+"").rdd //转成rdd[ROW].map(line => {val ls = String(0)val fields = ls.split("t")dsWith("t")){(fields(0),fields(1),null)}else{(fields(0),fields(1),fields(2))}}).toDF("name","role","team")val sports_df = (bp+"sports/*").rdd.map(line => {val ls = String(0)val fields = ls.split("t")val length = fields.dsWith("t")){(fields(0),fields(1),null)}else{(fields(0),fields(1),fields(2))}}).toDF("name","age","skills")//创建表,作join操作println("创建临时表 math_df_table")ateOrReplaceTempView("math_df_table")println("创建临时表 sports_df_table")ateOrReplaceTempView("sports_df_table")println("join结果如下: ")ss.sql("select a.name,a.age,a.am from sports_df_table a left join math_df_table b on " +"a.name=b.name").show(10)val join_df = ss.sql("select " +"nvl(a.name,'') as name," +"nvl(a.age,'') as age," +"nvl(a.skills,'') as skills," +"le,'') as role," +"am,'') as team" +" from sports_df_table a left join math_df_table b on " +"a.name=b.name " +"order by age desc")//方法一:将df变成一列,且每一行都是以","分割的Stringval allColumnsLine = lumns.mkString(",")val join_result_df = join_df.selectExpr(s"concat_ws('t',$allColumnsLine) " +s"as allColumns2OneColumn").show(10)//直接拼接列名val join_result_df1:DataFrame = join_df.selectExpr(s"concat_ws('t',name,age,skills,role,team) " +s"as allColumns2OneColumn")join_partition(1).de("overwrite").text(path = bp+"spark/joinResult")
//    Thread.sleep(20000)ss.close()}
}

本文发布于:2024-01-30 02:27:37,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170655286318587.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:多个   文件夹   操作   文件   dataFrame
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23