RDD、Dataset、DataFrame

阅读: 评论:0

RDD、Dataset、DataFrame

RDD、Dataset、DataFrame

RDD、Dataset、DataFrame 相互转换

1.三者之间好既有区别,也有联系

优点缺点
RDD(关注数据本身)1.内置很多函数操作,group,map,filter 等,方便处理结构化或非结构化数据
2.面向对象编程,直接存储的 java 对象,类型转化也安全
1.由于它基本和 hadoop 一样万能的,因此没有针对特殊场景的优化,比如对于结构化数据处理相对于 sql 来比非常麻烦
2.默认采用的是 java 序列号方式,序列化结果比较大,而且数据存储在 java 堆内存中,导致 gc 比较频繁
DataFrame(关注数据的结构与类型)1.结构化数据处理非常方便,支持 Avro, CSV, elastic search, and Cassandra 等 kv 数据,也支持HIVE tables, MySQL 等传统数据表
2.有针对性的优化,由于数据结构元信息 spark已经保存,序列化时不需要带上元信息,大大的减少了序列化大小,而且数据保存在堆外内存中,减少了 gc 次数
3.hive 兼容,支持 hql,udf 等
1.编译时不能类型转化安全检查,运行时才能确定是否有问题
2.对于对象支持不友好,rdd 内部数据直接以 java 对象存储,DataFrame 内存存储的是 row 对象而不能是自定义对象
DataSet(关注数据(对象))1.Dataset 整合了 RDD 和 DataFrame 的优点,支持结构化和非结构化数据
2.和 RDD 一样,支持自定义对象存储
3.和 DataFrame 一样,支持结构化数据的 sql查询
4.采用堆外内存存储,gc 友好
5.类型转化安全,代码友好
很多情况下,Dataset 的性能实际上是会比 DataFrame 要来得差的,因为 Dataset 会涉及到额外的数据格式转换成本很多情况下,Dataset 的性能实际上是会比 DataFrame 要来得差的,因为 Dataset 会涉及到额外的数据格式转换成本

2.如何转化

1.RDD -> Dataset

val ds = DS()

import org.apache.spark.sql.SparkSessionobject CreateDataSetDemo {//提前准备好样例类case class Point(label:String,x:Double,y:Double)case class Category(id:Long,name:String)def main(args: Array[String]): Unit = {//todo:创建sparksession对象val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()val sc = spark.sparkContext//todo:导包import spark.implicits._//todo:创建rddval pointRDD = sc.makeRDD(List(("bar",3.0,4.0),("foo",2.0,2.5)))//方法一://todo:使用spark对象直接toDataSetval ds1 = DS()//方法二://todo;通过样例类创建DataSetval pointDS = pointRDD.map(x=>Point(x._1,x._2,x._3)).toDS()//todo:分区创建基本相同val pointsRDD=sc.parallelize(List(("bar",3.0,5.6),("foo",-1.0,3.0)))val categoriesRDD=sc.parallelize(List((1,"foo"),(2,"bar")))val pointsDS=pointsRDD.map(line=>Point(line._1,line._2,line._3)).toDSval categoriesDS=categoriesRDD.map(line=>Category(line._1,line._2)).toDS//todo:两者joinpointsDS.join(categoriesDS,pointsDS("label")===categoriesDS("name")).show}
}

2.RDD -> DataFrame

val df&#DF()

import org.apache.spark.sql.SparkSession
import org.apache.pes._
import org.apache.spark.sql.Rowobject newDataFrame {def main(args: Array[String]): Unit = {//todo:创建sparksession对象val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()val sc = spark.sparkContext//todo:导包(toDF需要)import spark.implicits._val rdd1 = sc.parallelize(List(("zhangsan","green",Array(3,5,6,8)),("zhangsan",null,Array(3,5,6,10)),("lisi","red",Array(3,5,6,33)),("zhangsna2","green",Array(3,5,233,9)),("zhangsan","green",Array(3,42,44,9))))//todo:方法一:val DF1 = DF("name","color","numbers")//todo:方法二:val schema = StructType(Array(StructField("name", StringType, true),StructField("color", StringType, true),StructField("numbers", ArrayType(IntegerType), true)))val rdd2 = rdd1.map(x=>Row(x._1,x._2,x._3))val DF2 = ateDataFrame(rdd2,schema)DF1.show()DF2.show()}
}

3.Dataset -> RDD

val rdd = ds.rdd

4.Dataset -> DataFrame

val df = ds.toDF()

import org.apache.spark.sql.SparkSessionobject DS2DF {case class Point(label:String,x:Double,y:Double)def main(args: Array[String]): Unit = {//todo:创建sparksession对象val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()val sc = spark.sparkContext//todo:导包import spark.implicits._//todo:创建rddval pointRDD = sc.makeRDD(List(("bar",3.0,4.0),("foo",2.0,2.5)))//todo;通过样例类创建DataSetval pointDS = pointRDD.map(x=>Point(x._1,x._2,x._3)).toDS()//todo:DS转DFval pointDF = DF("label","X","Y")pointDF.show()}
}

5.DataFrame -> RDD

val rdd = df.rdd

6.DataFrame -> Dataset

val ds = df.toJSON

val ds = df.as[T]

import org.apache.spark.sql.SparkSessionobject DF2DS {case class Person(name:String,color:String,number:Array[Int])def main(args: Array[String]): Unit = {//todo:创建sparksession对象val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()val sc = spark.sparkContext//todo:导包import spark.implicits._val rdd1 = sc.parallelize(List(("zhangsan","green",Array(3,5,6,8)),("zhangsan",null,Array(3,5,6,10)),("lisi","red",Array(3,5,6,33)),("zhangsna2","green",Array(3,5,233,9)),("zhangsan","green",Array(3,42,44,9))))//todo:生成DFval DF1 = DF("name","color","numbers")//方法一:tojsonval DS1 = JSON//方法二:as【T】val DS2 = DF1.as[Person]DS1.show()DS2.show()}
}

本文发布于:2024-02-04 19:01:07,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170714255458594.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:RDD   Dataset   DataFrame
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23