SparkSQL修仙学习04

阅读: 评论:0

SparkSQL修仙学习04

SparkSQL修仙学习04

Spark SQL是Spark用来处理结构化数据的一个模块.

在SparkSQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet。他们和RDD有什么区别呢?首先从版本的产生上来看:
RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)
效率逐个变高

sparksql实操

1.SparkSession操作步骤

object Demo1 {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("Demo1").master("local[*]")//            .enableHiveSupport()//支持hive的特定操作.getOrCreate()//读取数据,json格式val pdf = ad.json("C:\Users\70201\Desktop\sql\people.json")println("------获取表中的元数据信息-----------")pdf.printSchema()println("------获取表中的数据信息-----------")pdf.show()println("------筛选表中的个别字段-----------")ateOrReplaceTempView("people")var sql="""|select|name,age|from|people|""".stripMarginspark.sql(sql).show()println("------条件查询-----------")sql="""|select|name,age|from|people|where name="肖楚轩"|""".stripMarginspark.sql(sql).show()println("------统计-----------")sql="""|select|count(*)|from|people|""".stripMarginspark.sql(sql).show()println("------复杂统计统计-----------")sql="""|select|province,count(name) Count,max(age) maxAge|from|people|group by province|""".stripMarginspark.sql(sql).show()spark.stop()}
}

2. DataFrame的构建

/*** SparkSQL中的编程模型主要有:DataFrame和Dataset* DataFrame的构建分为了两种方式*      基于反射的方式构建*      基于动态编程的方式构建* Dataset的构建方式是和dataframe差不多一样*/
object Demo2 {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().appName("Demo2").master("local[*]").getOrCreate()//List[person]>>List[Row]val rows = List(Person1("韩香彧", 17, 167.5),Person1("石云涛", 88, 147.5),Person1("刘炳文", 20, 170.5),Person1("乔钰芹", 16, 167.5)).map(person => {Row(person.name,person.age,person.height)})//List[Row]>>java的List[Row]val rows1 = JavaConversions.seqAsJavaList(rows)val structType = StructType(Array(StructField("name", DataTypes.StringType, false),StructField("age", DataTypes.IntegerType, false),StructField("height", DataTypes.DoubleType, false)))//java的List[Row]val df = ateDataFrame(rows1, structType)df.showcreatByBean(sparkSession)sparkSession.stop()}def creatByBean(spakSession:SparkSession): Unit ={//Applies a schema to a List of Java Beansval persons1 = new util.ArrayList[Person]persons1.add(new Person("韩香彧", 17, 167.5))persons1.add(new Person("石云涛", 88, 147.5))persons1.add(new Person("刘炳文", 20, 170.5))//需要java的listval pdf = ateDataFrame(persons1, classOf[Person])pdf.show()}
}
case class Person1(name: String, age: Int, height: Double)

3.Dataset的构建

 /***         * Dataset的构建*         **         * dataset在构造的时候需要两个条件:*         *  第一导入隐式转换:import spark.implicits._*         *  第二要求封装数据类型为Product的子类,最好就是case class**/*/
object Demo3 {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().master("local[*]").appName("Demo3").getOrCreate()//要求封装数据类型为Product的子类(int.string等和case class),最好就是case classval list = List(Student("韩香彧", 17, 167.5),Student("石云涛", 88, 147.5),Student("刘炳文", 20, 170.5),Student("乔钰芹", 16, 167.5))//scala的list,隐式转换import sparkSession.implicits._val value = ateDataset(list)value.show()}
}
case class Student(name: String, age: Int, height: Double)

4.编程模型之间的转换

/*** 编程模型之间的互相转换:*   rdd--dataframe/dataset*   dataframe-->rdd/dataset*   dataset=-->dataframe/rdd*/
object Demo4 {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().master("local[*]").appName("Demo4").getOrCreate()val list = List(Student("韩香彧", 17, 167.5),Student("石云涛", 88, 147.5),Student("刘炳文", 20, 170.5),Student("乔钰芹", 16, 167.5))val value:RDD[Student] = sparkSession.sparkContext.parallelize(list)//隐式转换import sparkSession.implicits._println("rdd--->dataframe")val df = DF()df.show()println("rdd--->dataSet")val ds = DS()ds.show()println("dataframe--->rdd")//dataframe的数据是一个一个的Rowval rdd = df.rddrdd.foreach{case Row(name,age,height)=>{println(s"${name},${age},${height}")}}println("dataframe--->dataSet")println("""| dataframe 不能直接转化为Dataset| 为什么?我们前了解到dataframe中的泛型是Row,那么转化为Dataset其实就成了Dataset[Row]| 由于Row并不是Product的子类,并没有提供一个Encoder所以不能作为dataset的数据类型| 故而,不可直接转化为dataset|""".stripMargin)println("dataSet--->dataframe")ds.toDF().show()println("dataSet--->rdd")val rdd1 = ds.rddrdd1.foreach(stu=>{println(s"${stu.name},${stu.age},${stu.height}")})sparkSession.stop()}
}

5.数据写出保存落地

//数据落地
object Demo6 {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().master("local[*]").appName("Demo5").getOrCreate()import sparkSession.implicits._val ds = File("C:\Users\70201\Desktop\sql\").map(line=>{val strings = line.split(",")val name = strings(0).trim//去左右两边空格val age = strings(1).Intinfo(name,age)})/*数据的落地SaveMode:ErrorIfExists 默认的Append        追加Overwrite     覆盖Ignore        忽略,如果目录已经存在,则忽略,如果目录不存在,则执行创建*/de(SaveMode.Ignore).save("C:\Users\70201\Desktop\test\save")de(SaveMode.Ignore).json("C:\Users\70201\Desktop\test\json")
//  csv导出默认是,
//    Michael,29
//    Andy,30
//    Justin,19//可以指定输出格式option("header","true").option("delimiter","|")de(SaveMode.Overwrite).option("header","true").option("delimiter","|").csv("C:\Users\70201\Desktop\test\csv")var url="jdbc:mysql://localhost:3306/test"var table="info"//最好先建表,虽然此处会自动生成,但表字段的数据类型给的不是很完美varchar--Textval properties = new Properties()properties.put("user","root")properties.put("password","123456")ds.write.jdbc(url,table,properties)sparkSession.stop()}
}

6.数据加载

/*** SparkSQL对数据的统一加载和落地操作*  加载使用*      read.load*           not a Parquet file. expected magic numbe ==> 默认加载的文件格式要求是parquet,是一个二进制的列式存储格式文件,twitter公司开源到apache的*      option*          .html#csv-specific-options*  落地使用*      write.save*/
//数据加载
object Dmeo5 {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().master("local[*]").appName("Demo5").getOrCreate()var df = ad.load("C:\Users\70201\Desktop\sql\sqldf.parquet")df.show()//对于复杂的操作,需要设置一些option选项来完成过滤或者修正df = ad.option("header","true").option("delimiter","|")//对不规范的csv要指定分隔符,让程序怎么切分成对应字段.csv("C:\Users\70201\Desktop\sql\student.csv")df = ad.csv("C:\Users\70201\Desktop\sql\country.csv").toDF("id","country","code")//对没有表头的csv可以转成df并指定字段名df.show()//orc是一种列式存储文件,式rc的升级版本呢,是facebook用来存储数据的文件格式df&#ad.orc("C:\Users\70201\Desktop\sql\")df.show()df&#ad.json("C:\Users\70201\Desktop\sql\product_info.json")df.show()import sparkSession.implicits._//隐式转换,为了解决ds的encoder//text文件认为只有一列数据,我们可以拆分val ds = File("C:\Users\70201\Desktop\sql\").map(line=>{val strings = line.split(",")val name = strings(0).trim//去左右两边空格val age = strings(1).Intinfo(name,age)})ds.show()
//    url: String,
//    table: String,
//    columnName: String,var url="jdbc:mysql://localhost:3306/test"var table="wordcounts"val properties = new Properties()//a "user" and "password" propertyproperties.put("user","root")properties.put("password","123456")df&#ad.jdbc(url,table,properties)df.show()sparkSession.stop()}
}
case class info(name: String,age:Int)

7.sql操作hive表

打包插件

/*spark和hive整合时需要注意的地方:
*      1、为了能够让spark正常的解析hive的仓库为止,需要将l传递给spark,加载到spark的classpath(resources目录)中
*          一种通过直接将l放到spark的conf目录下面
*          另外一种就是通过程序的方式放到classpath即可(第二种)
*      2、在l中最重要的就是一个参数
*      <property>
*      <name&astore.warehouse.dir</name>
*      <value>/user/hive/warehouse</value>
*      </property>
*      如果没有配置这个参数,就会在当前程序的当前目录下面指定hive的warehouse,而真正的数据在hdfs里卖弄,执行的时候会找不到数据:
*          except: file:///  hdfs://
*       3、同时如果配置hadoop高可用得需要解析出hdfs的具体路径,所以也需要将l和l也打到classpath下面
*       4、得需要将mysql的驱动包打入classpath中
*/
//1.导入打包插件,将spark-hive依赖取消,因为linux的spark有支持,省的打包时文件过大
//2.对父module打包到本地install,因为有spark_sql的相关依赖spark_common,spark_sql打包package会先去本地仓库找spark_common,找不到再去远程仓库,
//3.上传linux是带有依赖的jar
//4.编写执行脚本,要去掉local,因为程序有了,要把enableHiveSupport()打开
//5.执行
object Demo1 {def main(args: Array[String]): Unit = {if(args==null||args.length!=2){println("null")it(-1)}val Array(basic,info)=argsval sparkSession = SparkSession.builder().appName("Demo1").master("local[*]").enableHiveSupport()//支持hive的特定操作.getOrCreate()println("1.创建数据库")sparkSession.sql("""|create database if not exists info|""".stripMargin)println("2.创建表teacher_basic")sparkSession.sql("""|create table if not acher_basic(|name string,|age int,|merry boolean,|course int|)|row format delimited fields terminated by ','|""".stripMargin)println("3.创建表teacher_info")sparkSession.sql("""|create table if not acher_info(|name string,|height int|)|row format delimited fields terminated by ','|""".stripMargin)println("4.加载数据到表teacher_basic")sparkSession.sql(s"""|load data inpath '${basic}' into acher_basic|""".stripMargin)println("5.加载数据到表teacher_info")sparkSession.sql(s"""|load data inpath '${info}' into acher_info|""".stripMargin)println("6.关联两张表")val df = sparkSession.sql("""|select|b.name,b.,b.course,i.height|acher_basic b|acher_info i|on b.name=i.name|""".stripMargin)println("7.将关联数据写入到表teacher")df.write.saveAsTable(&#acher")println("结束!")sparkSession.stop()}
}

8.自定义UDF函数

object Demo2 {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().appName("Demo2").master("local[*]").getOrCreate()val df = ad.json("C:\Users\70201\Desktop\sql\people.json")//3.建视图/*注册一张临时表global在整个应用范围内有效,不带的话只在当前sparkSession内有效replace如果该视图存在,则会覆盖,否则新建*/df.createOrReplaceTempView("people")//2.注册udf函数  返回值类型int,输入类型ister[Int,String]("mylength",str=>myle(str))//4.使用,执行之前要把之前hive.sql依赖解开,把l去掉,否则会加载到hdfs找数据sparkSession.sql("""|select|name,|mylength(name) mylen,|length(name) len|from|people|""".stripMargin).show()sparkSession.stop()}//1.自定义udf函数def myle(str:String):Int={str.length}
}

9.自定义UDAF函数

object Demo3 {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().appName("Demo3").master("local[*]").getOrCreate()val df = ad.json("C:\Users\70201\Desktop\sql\people.json")df.createOrReplaceTempView("student")ister("myavg",new AvgHeight)sparkSession.sql("""|select|round(avg(height),1) avg,|round(myavg(height),1) myavg|from|student|""".stripMargin).show()sparkSession.stop()}
}

自定义类

class AvgHeight extends UserDefinedAggregateFunction{/*该udaf输入参数的类型说明*/override def inputSchema: StructType = StructType(List(StructField("height", DataTypes.DoubleType, false)))/*为了计算聚合结果所需要的涉及到的临时变量的类型平均数=总数/个数,这里面涉及到了2个临时变量,总数,个数*/override def bufferSchema: StructType =  StructType(List(StructField("sum", DataTypes.DoubleType, false),StructField("count", DataTypes.IntegerType, false)))/*该udaf返回值的数据类型*/override def dataType: DataType = DataTypes.DoubleType/*确定性,相同的输入,其返回值是确定,不会有其他可能,称之为确定性,即返回为truegiven the same input,always return the same output.*/override def deterministic: Boolean = true//初始化override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer.update(0, 0.0)buffer.update(1, 0)}//局部聚合,new input data from `input`,This is called once per input row.override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {buffer.update(0, Double(0) + Double(0))buffer.update(1, Int(1) + 1)}//分区间的全局聚合override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1.update(0, Double(0) + Double(0))buffer1.update(1, Int(1) + Int(1))}override def evaluate(buffer: Row): Double = {Double(0) / Int(1)}
}

10.开窗函数,分组求TOPN

object Demo4 {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().appName("Demo4").master("local[*]").getOrCreate()val df = ad.json("C:\Users\70201\Desktop\sql\people.json")df.createOrReplaceTempView("student")sparkSession.sql("""|select|tmp.*|from|(select|name,age,province,height,|row_number() over(partition by province order by height desc) rank|from|student) tmp|where tmp.rank<3|""".stripMargin).show()sparkSession.stop()}
}

11.数据倾斜(重点!!!)

本文发布于:2024-01-30 15:44:51,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170660069521082.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:SparkSQL
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23