import org.apache.spark.sql.{DataFrame, DataFrameReader, Dataset, SparkSession}

object MyDataFrame {

  case class Person(name: String, age: Long)

  def main(args: Array[String]): Unit = {
    // 1. get a SparkSession
    // 2. don't forget to call getOrCreate
    val spark = SparkSession.builder()
      .master("local")
      .appName("MyDataFrame")
      .getOrCreate()
    readFile(spark)
    runBasicDataFrameExample(spark)
    runDatasetCreationExample(spark)
    runInferSchemaExample(spark)
  }

  private def readFile(spark: SparkSession): Unit = {
    // 1. get a DataFrameReader
    val res0: DataFrameReader = spark.read
    // 2. the source is a plain .txt file
    val res2: DataFrameReader = res0.format("text")
    // 3. load the file into a DataFrame
    //    (the truncated "F:\" path is kept from the original; point it at your text file)
    val res3: DataFrame = res2.load("F:\\")
    res3.show()
    // 4. the result of res3.show() has a single column named "value":
    // +------------------+
    // |             value|
    // +------------------+
    // |      name,Michael|
    // |  name,Andy,age,30|
    // |name,Justin,age,19|
    // +------------------+
  }

  private def runBasicDataFrameExample(spark: SparkSession): Unit = {
    // get a DataFrameReader and read a json file
    val res0: DataFrameReader = spark.read
    val res1: DataFrame = res0.json("F:\\spark\\src\\main\\resources\\people.json")
    res1.show()

    res1.select("name").show()
    // +-------+
    // |   name|
    // +-------+
    // |Michael|
    // |   Andy|
    // | Justin|
    // +-------+

    res1.select("name", "age").show()
    // +-------+----+
    // |   name| age|
    // +-------+----+
    // |Michael|null|
    // |   Andy|  30|
    // | Justin|  19|
    // +-------+----+

    // the $-notation for columns needs this implicit conversion
    import spark.implicits._
    res1.select($"name").show()
    // prints the same "name" column as above

    // compute column age + 1
    res1.select($"name", $"age" + 1).show()

    /* If a column named "exampleColumn" appears in at least one record of the
       json file, select("exampleColumn") works even though the column is
       missing from the other records. Selecting a column that exists in no
       record at all (such as "hh" below) throws an AnalysisException, so the
       two calls are left commented out: */
    // res1.select($"hh").show()
    // res1.select("name", "hh").show()

    res1.select($"age" > 20).show()

    // create a temporary view; the result of spark.sql(...) is a DataFrame
    res1.createOrReplaceTempView("people")
    val sqlFrame: DataFrame = spark.sql("select * from people")
    sqlFrame.show()
  }

  private def runDatasetCreationExample(spark: SparkSession): Unit = {
    import spark.implicits._
    // $example on:create_ds$
    // Encoders are created for case classes.
    /* 1. Seq here is scala.collection.Seq
       2. in Seq[A], A may be a common type or a class you defined yourself */
    val intDS: Dataset[Int] = Seq(1, 2, 3).toDS()
    intDS.show()

    val stringDS1: Dataset[String] = Seq("LittleLawson", "great").toDS()
    stringDS1.show()

    val caseClassDS: Dataset[Person] = Seq(Person("Andy", 32)).toDS()
    caseClassDS.show()
    // +----+---+
    // |name|age|
    // +----+---+
    // |Andy| 32|
    // +----+---+

    // Encoders for most common types are provided automatically by importing spark.implicits._
    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).collect() // returns Array(2, 3, 4)

    // DataFrames can be converted to a Dataset by providing a class;
    // mapping is done by field name
    val path = "F:\\people.json"
    val peopleDS = spark.read.json(path).as[Person]
    peopleDS.show()
  }

  private def runInferSchemaExample(spark: SparkSession): Unit = {
    // For implicit conversions from RDDs to DataFrames
    import spark.implicits._
    // Create an RDD of Person objects from a text file, then convert it to a DataFrame
    // (the truncated "F:\" path is kept from the original; point it at a comma-separated file)
    val peopleDF = spark.sparkContext
      .textFile("F:\\")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    // Register the DataFrame as a temporary view
    peopleDF.createOrReplaceTempView("people")

    // SQL statements can be run with the sql method provided by the SparkSession
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

    // The columns of a row in the result can be accessed by field index...
    teenagersDF.map(teenager => "Name: " + teenager(0)).show()
    // +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+

    // ...or by field name
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
    // +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+
  }
}
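The outputs shown in the comments match the sample data that ships with Spark under examples/src/main/resources, so the two input files presumably look like the following (the F:\ paths above are the author's; substitute your own copies). Note that readFile's output contains rows like name,Michael, so the text file read there appears to be a different, custom file rather than the standard people.txt.

people.json (read by runBasicDataFrameExample and runDatasetCreationExample):

    {"name":"Michael"}
    {"name":"Andy", "age":30}
    {"name":"Justin", "age":19}

people.txt (the comma-separated file parsed by runInferSchemaExample):

    Michael, 29
    Andy, 30
    Justin, 19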
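To compile and run the example, the project only needs the spark-sql artifact on the classpath. A minimal build.sbt sketch, assuming a Spark 2.x / Scala 2.11 toolchain (the versions are an assumption; the post does not state them):

    scalaVersion := "2.11.12"

    // spark-sql pulls in spark-core and the DataFrame/Dataset API used above
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.8" // version assumed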