SparkSQL使用整理(一)

阅读: 评论:0

SparkSQL使用整理(一)

SparkSQL使用整理(一)

1. Import  spark.implicits._ 的作用

支持在DataFrame中使用 $”列名” 这个语法糖,表示的是Column对象

df.select($"name").show()

如果不加$,那么就会当一个String来处理

 

支持把一个RDD隐式转换为一个DataFrame

---- 引申: 目前Spark官网提供了三种方法来实现从RDD转DataFrame

1.1 直接指定列名,让Spark自动推断类型

1.2利用反射机制推断RDD模式

需要一个case class 还有上面那个导入,然后RDD就可以用.toDF了

1.3使用编程方式定义RDD模式

无法提前定义case class时,采用编程的方式定义RDD,即定义: val schema = StructType(StructField(name,StringType,true), StructField(age,StringType,true))

接着使用ateDataFrame(RDD,schema)即可

/ (基于Spark2.1.0的)

ps:如果是DataSet的话,只能用1.2和1.3的方式,因为DataSet是一个typed transformations,在代码中就要指定好数据的类型,它的类型检查是在编译期完成的,而DataFrame是Untyped transformations,它的类型检查是在运行期完成的

 

2. 报错有个包找不到 org.spark_project.urrent.ExecutionError: java.lang.NoClassDefFoundError:org/codehaus/commons/compiler/UncheckedCompileException

参考这个

将SparkSQL自身的那两个包排掉,换成网页中提到的3.0.7的包

`<dependency>

  <groupId>org.apache.spark</groupId>

  <artifactId>spark-sql_2.11</artifactId>

  <version>2.1.1</version>

  <exclusions>

   <exclusion>

     <groupId&dehaus.janino</groupId>

     <artifactId>janino</artifactId>

   </exclusion>

   <exclusion>

     <groupId&dehaus.janino</groupId>

     <artifactId>commons-compiler</artifactId>

   </exclusion>

  </exclusions>

 </depenedency>

 <dependency>

   <groupId&dehaus.janino</groupId>

   <artifactId>commons-compiler</artifactId>

   <version>3.07</version>

 </dependency>

 <dependency>

   <groupId&dehaus.janino</groupId>

   <artifactId>janino</artifactId>

   <version>3.07</version>

 </dependency>`

 

3.    看来spark读取json文件的话,会自己构建一个schema,不需要自己外部隐式定义一个case class什么了.(验证过了,是对的)

 

注意这个import, 导入的不是类,而是sparkSession这个变量里面的东西,sparkSession是你自己创建的SparkSession

 

 

4.     DataFrame使用SQL语句

         5.1要想使用SQL语句的功能的话,得把DataFrame转换位类似数据库的表,然后才能对这张”表”使用SQL语句进行操作.(注册成表的性能会比直接操作DataSet快还是慢?)

         注册成的表有两种类型:

                  5.1.1  LocalTempView

                  5.1.2  GlobalTempView

         两者的区别在于它们的生命周期,LocalTempView生命周期和sparksession生命周期相同,使用时通过sparksession变量调用即可. 而另外一个是注册在一个叫global_temp这张系统表上的,可以在多个session中共享,使用的时候需要用SELECT * FROM global_temp.people.

         5.2 还需要import sparkSession.implicits._,不然也是不能使用SQL语句的

 

DataFrame中的每一个元素都是一个Row,代表一行数据

 

6.     Datasets

         没有使用java或者Kryo序列化,而是使用了Encoder这个类来序列化对象,每定义一种类型的Datasets,就需要编写对应的Encoder类,不然会报错!.

 

---- 引申: 目前Spark官网提供了两种方法来实现从RDD转Dataset,和上面的DataFrame几乎是一样的…

1.1利用反射机制推断RDD模式

需要一个case class 还有上面那个导入,然后RDD转为DataFrame

因为DataFrame和DataSet可以相互转化,df.as[ElementType]这样可以把DataFrame

转化为DataSet&#DF()这样可以把DataSet转化为DataFrame。

                  或者直接转为ad.json(path).as[Person]

 

1.2使用编程方式定义RDD模式

                  比RDD转DataFrame要麻烦点,想要将original RDD -> RDD[Row]的结构,然后创建对应的StructType, ateDataFrame(rowRDD, schema)… 这不还是转成DataFrame么?!

         轨迹生成的代码里面好像也是转成DataFrame的,然后使用groupByKey之类的操作之后,DF就自动变成DataSet了!

         那就是说,根本就没有直接转成Datasets的方法,RDD只能先转成DataFrame,然后再转换到DataSet…

 

7.parquet merge schema的理解

         即数据集合并,相当于是做笛卡尔积(全连接). 可以分散在两个文件中,也可以在同一个文件中.

         合并规则为:

                  7.1 相同的列,在新的数据集中,是通用的列

                  7.2 各自不同的列,作为新的数据集的列

 

参考:.html

 

 

再引申说一点parquet的东西:

         Parquet是一种基于列式存储的结构,列式存储因为同一列的很多内容可能相同,所以易于压缩,间接的减少了磁盘IO,并且支持分区等功能(如果业务中只是读取少量的列的话,是非常适合用parquet的!)

         Spark中在读取Spark文件的时候,可以不用定义schema就可以直接生产DataFrame,所有的columns都会被自动发现(分区发现—需要深入去理解下,不懂这个机制)

参考: .html

 

 

8.Data Source为JSON

         Schema也不需要写,会自动检测出来,并且ad.json(path),获取的直接是DataSet[ROW]的结构.

 

9.使用dataframe.cache的时候,压缩默认为true

 

参考:

(RDD/DataFrame/DataSet互转)

本文发布于:2024-02-03 08:39:18,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170692075849896.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:SparkSQL
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23