1. Import spark.implicits._ 的作用
支持在DataFrame中使用 $”列名” 这个语法糖,表示的是Column对象
df.select($"name").show()
如果不加$,那么就会当一个String来处理
支持把一个RDD隐式转换为一个DataFrame
---- 引申: 目前Spark官网提供了三种方法来实现从RDD转DataFrame
1.1 直接指定列名,让Spark自动推断类型
1.2利用反射机制推断RDD模式
需要一个case class 还有上面那个导入,然后RDD就可以用.toDF了
1.3使用编程方式定义RDD模式
无法提前定义case class时,采用编程的方式定义RDD,即定义: val schema = StructType(StructField(name,StringType,true), StructField(age,StringType,true))
接着使用ateDataFrame(RDD,schema)即可
/ (基于Spark2.1.0的)
ps:如果是DataSet的话,只能用1.2和1.3的方式,因为DataSet是一个typed transformations,在代码中就要指定好数据的类型,它的类型检查是在编译期完成的,而DataFrame是Untyped transformations,它的类型检查是在运行期完成的
2. 报错有个包找不到 org.spark_project.urrent.ExecutionError: java.lang.NoClassDefFoundError:org/codehaus/commons/compiler/UncheckedCompileException
参考这个
将SparkSQL自身的那两个包排掉,换成网页中提到的3.0.7的包
`<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
<exclusions>
<exclusion>
<groupId&dehaus.janino</groupId>
<artifactId>janino</artifactId>
</exclusion>
<exclusion>
<groupId&dehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
</exclusion>
</exclusions>
</depenedency>
<dependency>
<groupId&dehaus.janino</groupId>
<artifactId>commons-compiler</artifactId>
<version>3.07</version>
</dependency>
<dependency>
<groupId&dehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.07</version>
</dependency>`
3. 看来spark读取json文件的话,会自己构建一个schema,不需要自己外部隐式定义一个case class什么了.(验证过了,是对的)
注意这个import, 导入的不是类,而是sparkSession这个变量里面的东西,sparkSession是你自己创建的SparkSession
4. DataFrame使用SQL语句
5.1要想使用SQL语句的功能的话,得把DataFrame转换位类似数据库的表,然后才能对这张”表”使用SQL语句进行操作.(注册成表的性能会比直接操作DataSet快还是慢?)
注册成的表有两种类型:
5.1.1 LocalTempView
5.1.2 GlobalTempView
两者的区别在于它们的生命周期,LocalTempView生命周期和sparksession生命周期相同,使用时通过sparksession变量调用即可. 而另外一个是注册在一个叫global_temp这张系统表上的,可以在多个session中共享,使用的时候需要用SELECT * FROM global_temp.people.
5.2 还需要import sparkSession.implicits._,不然也是不能使用SQL语句的
DataFrame中的每一个元素都是一个Row,代表一行数据
6. Datasets
没有使用java或者Kryo序列化,而是使用了Encoder这个类来序列化对象,每定义一种类型的Datasets,就需要编写对应的Encoder类,不然会报错!.
---- 引申: 目前Spark官网提供了两种方法来实现从RDD转Dataset,和上面的DataFrame几乎是一样的…
1.1利用反射机制推断RDD模式
需要一个case class 还有上面那个导入,然后RDD转为DataFrame
因为DataFrame和DataSet可以相互转化,df.as[ElementType]这样可以把DataFrame
转化为DataSetDF()这样可以把DataSet转化为DataFrame。
或者直接转为ad.json(path).as[Person]
1.2使用编程方式定义RDD模式
比RDD转DataFrame要麻烦点,想要将original RDD -> RDD[Row]的结构,然后创建对应的StructType, ateDataFrame(rowRDD, schema)… 这不还是转成DataFrame么?!
轨迹生成的代码里面好像也是转成DataFrame的,然后使用groupByKey之类的操作之后,DF就自动变成DataSet了!
那就是说,根本就没有直接转成Datasets的方法,RDD只能先转成DataFrame,然后再转换到DataSet…
7.parquet merge schema的理解
即数据集合并,相当于是做笛卡尔积(全连接). 可以分散在两个文件中,也可以在同一个文件中.
合并规则为:
7.1 相同的列,在新的数据集中,是通用的列
7.2 各自不同的列,作为新的数据集的列
参考:.html
再引申说一点parquet的东西:
Parquet是一种基于列式存储的结构,列式存储因为同一列的很多内容可能相同,所以易于压缩,间接的减少了磁盘IO,并且支持分区等功能(如果业务中只是读取少量的列的话,是非常适合用parquet的!)
Spark中在读取Spark文件的时候,可以不用定义schema就可以直接生产DataFrame,所有的columns都会被自动发现(分区发现—需要深入去理解下,不懂这个机制)
参考: .html
8.Data Source为JSON
Schema也不需要写,会自动检测出来,并且ad.json(path),获取的直接是DataSet[ROW]的结构.
9.使用dataframe.cache的时候,压缩默认为true
参考:
(RDD/DataFrame/DataSet互转)
本文发布于:2024-02-03 08:39:18,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170692075849896.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |