Big Data with Spark (Part 5)


I. The Spark SQL Module
----------------------------------------------------------
1. The module lets you run SQL statements on Spark.
2. It can process a wide range of data sources.
3. DataFrame --- RDD --- table   // a DataFrame sits between an RDD and a table: an RDD with a table-like schema
4. Data can be accessed either through SQL statements or through the DataFrame API.
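To make point 4 concrete, here is a minimal spark-shell sketch showing the same query written both ways; the people view and the sample rows are made-up names for illustration only:

    scala> val df = Seq((1, "tom", 12), (2, "tomas", 13)).toDF("id", "name", "age")
    scala> df.createOrReplaceTempView("people")
    // as a SQL statement
    scala> spark.sql("select name, age from people where age > 12").show()
    // the equivalent DataFrame API calls
    scala> df.select("name", "age").where("age > 12").show()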
II. Spark JDBC Implementation
-------------------------------------------------------------
1. spark-shell implementation

// a. define a case class
scala> case class Customer(id:Int,name:String,age:Int)
defined class Customer

// b. build some sample data
scala> val arr = Array("1,tom,12","2,tomas,13","3,tomasLee,14")
arr: Array[String] = Array(1,tom,12, 2,tomas,13, 3,tomasLee,14)

// c. create an RDD
scala> val rdd1 = sc.parallelize(arr)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26

// d. map it to an RDD of Customer
scala> val rdd2 = rdd1.map(e=>{val arr = e.split(","); Customer(arr(0).toInt,arr(1),arr(2).toInt)})
rdd2: org.apache.spark.rdd.RDD[Customer] = MapPartitionsRDD[24] at map at <console>:30

// e. create a DataFrame from the RDD
scala> val df = spark.createDataFrame(rdd2)
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

// f. print the schema
scala> df.printSchema
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

// g. query the data
scala> df.show
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|     tom| 12|
|  2|   tomas| 13|
|  3|tomasLee| 14|
+---+--------+---+

// h. create a temporary view (the table name used in SQL)
scala> df.createTempView("customers")

// i. run SQL statements
scala> val df2 = spark.sql("select * from customers")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df2.show
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|     tom| 12|
|  2|   tomas| 13|
|  3|tomasLee| 14|
+---+--------+---+

scala> spark.sql("select * from customers").show
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|     tom| 12|
|  2|   tomas| 13|
|  3|tomasLee| 14|
+---+--------+---+

// j. union queries
scala> val df1 = spark.sql("select * from customers where id < 2")
df1: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> val df2 = spark.sql("select * from customers where id > 2")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df1.createTempView("c1")
scala> df2.createTempView("c2")

scala> spark.sql("select * from c1 union select * from c2").show()
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|     tom| 12|
|  3|tomasLee| 14|
+---+--------+---+

scala> df1.union(df2)
res10: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 1 more field]

scala> val df3 = df1.union(df2)
df3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 1 more field]

scala> df3.show
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|     tom| 12|
|  3|tomasLee| 14|
+---+--------+---+

// k. other queries
scala> spark.sql("select id,name from customers").show
+---+--------+
| id|    name|
+---+--------+
|  1|     tom|
|  2|   tomas|
|  3|tomasLee|
+---+--------+

scala> df.selectExpr("id","name").show
+---+--------+
| id|    name|
+---+--------+
|  1|     tom|
|  2|   tomas|
|  3|tomasLee|
+---+--------+

scala> df.where("name like 't%'").show
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|     tom| 12|
|  2|   tomas| 13|
|  3|tomasLee| 14|
+---+--------+---+

scala> df.where("name like '%e'").show
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  3|tomasLee| 14|
+---+--------+---+

// l. aggregation and mapping
// a. sum the customers' ages: map to a Dataset[Int], then reduce
scala> df.map(x=> x.getAs[Int]("age")).reduce( _ + _ )
res17: Int = 39

// b. aggregate functions
scala> df.agg(sum("age"),max("age"),min("age")).show
+--------+--------+--------+
|sum(age)|max(age)|min(age)|
+--------+--------+--------+
|      39|      14|      12|
+--------+--------+--------+
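One detail worth noting from step j: the SQL "union" removes duplicate rows (SQL UNION semantics), while DataFrame.union keeps them, behaving like UNION ALL. A minimal sketch, assuming the df1 and df2 frames from above, that reproduces SQL UNION semantics through the API:

    scala> df1.union(df2).distinct().show()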
2. Java implementation

// a. create a module and add the dependencies
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.17</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

// b. converting a DataFrame into an RDD
JavaRDD<Row> rdd = df.toJavaRDD();

// c. saving the result of a Spark SQL computation (as JSON)
df.write().json(dir);                 // save as JSON files
df.write().mode(SaveMode.Append);     // set the save mode

// d. code demo
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.util.function.Consumer;

public class TestSparkSQL {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder()
                .appName("SQLJava")
                // set the master
                .config("spark.master", "local")
                .getOrCreate();

        Dataset<Row> df = session.read().json("file:///D:/share/scala/data/json.dat");
        // df.show();

        df.createOrReplaceTempView("t1");
        // Dataset<Row> df1 = session.sql("select * from t1 where id > 3");
        // df.where("id > 3").show();
        // session.sql("select count(id) from t1").show();
        // System.out.println(df.count());
        // df.select(df.col("id").plus(1), df.col("name").substr(1, 2), df.col("name")).show();

        // convert the DataFrame to an RDD and iterate over the rows
        JavaRDD<Row> jrdd = df.toJavaRDD();
        jrdd.collect().forEach(new Consumer<Row>() {
            @Override
            public void accept(Row row) {
                // System.out.println(row.getLong(0) + ":" + row.getLong(1) + ":" + row.getString(2));
            }
        });

        // save the SQL result as JSON, appending if the directory already has data
        df.write().mode(SaveMode.Append).json("d:/share/spark/res");
    }
}
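To check the saved output, the JSON directory written by the program above can be read straight back in spark-shell; a minimal sketch, assuming the same output path:

    scala> spark.read.json("file:///d:/share/spark/res").show()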
III. Integrating Spark SQL with MySQL
------------------------------------------------------------
1. spark-shell implementation

scala> val jdbcDF = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://192.168.43.1:3306/mybatis", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "persons", "user" -> "mysql", "password" -> "mysql")).load()
jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> jdbcDF.show()
+---+----+-----------+
| id|name|      phone|
+---+----+-----------+
|  1|  唐华|13560191111|
|  2|  赵嗄|18022222222|
|  3|  朱宽|18332561111|
|  4|  张噢|15032295555|
|  5|  任阳|15614209999|
|  6|  刘飞|15732641111|
|  7|  梁鹏|15778421111|
|  8|  杨谋|18301581111|
|  9|  史让|15811111111|
| 10|  张类|17731086666|
| 11|  郭彤|18641241111|
| 12|  杜跑|15733218888|
| 13| 张锕 |15133333333|
| 14|  王以|13269364444|
| 15|  刘宗|18620191111|
| 16|  李平|15338597777|
| 17|  段星|13341101111|
| 18|  温英|13520401111|
+---+----+-----------+

// register a temporary view so the table can be queried with SQL
scala> jdbcDF.createOrReplaceTempView("customers")

scala> spark.sql("select * from customers where id == 1 ").show
+---+----+-----------+
| id|name|      phone|
+---+----+-----------+
|  1|  唐华|13560191111|
+---+----+-----------+
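Besides the options-map form above, DataFrameReader also offers a jdbc(url, table, properties) overload; a minimal sketch using the same connection settings (the same MySQL instance is assumed):

    scala> val prop = new java.util.Properties()
    scala> prop.put("user", "mysql")
    scala> prop.put("password", "mysql")
    scala> prop.put("driver", "com.mysql.jdbc.Driver")
    scala> val jdbcDF2 = spark.read.jdbc("jdbc:mysql://192.168.43.1:3306/mybatis", "persons", prop)
    scala> jdbcDF2.show()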
2. Java implementation

// a. add the MySQL driver and Spark dependencies
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.17</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

// b. code demo
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public class TestSparkJDBC {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder()
                .appName("SQLJava")
                // set the master
                .config("spark.master", "local")
                .getOrCreate();

        String url = "jdbc:mysql://192.168.43.1:3306/mybatis";
        String table = "persons";
        String user = "mysql";
        String password = "mysql";

        // read a table from the MySQL database
        Dataset<Row> df = session.read().format("jdbc")
                .option("url", url)
                .option("dbtable", table)
                .option("user", user)
                .option("password", password)
                .option("driver", "com.mysql.jdbc.Driver")
                .load();
        df.show();

        Dataset<Row> df1 = df.where("phone like '135%'");
        df1 = df1.distinct();

        // write the result into a new MySQL table
        Properties prop = new Properties();
        prop.put("user", user);
        prop.put("password", password);
        prop.put("driver", "com.mysql.jdbc.Driver");
        df1.write().jdbc(url, "subpersons", prop);
    }
}
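The same write can be done straight from spark-shell; a minimal sketch, assuming the jdbcDF frame and the prop settings from the sketches above, where mode("overwrite") replaces the target table if it already exists:

    scala> jdbcDF.where("phone like '135%'").distinct().write.mode("overwrite").jdbc("jdbc:mysql://192.168.43.1:3306/mybatis", "subpersons", prop)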
IV. Integrating Spark SQL with Hive
------------------------------------------------------
1. In spark-shell:

a. Make sure the Hive class libraries are present on every Spark worker node.

b. Copy hive-site.xml into spark/conf and distribute it to all nodes.

c. Copy the MySQL driver jar from hive/lib/ into /soft/spark/jars and distribute it.

d. Edit spark/sbin/spark-config.sh and add the three _HOME paths:

...
# included in all the spark scripts with source command
# should not be executable directly
# also should not be passed any arguments, since we need original $*
export JAVA_HOME=/soft/jdk
export HADOOP_HOME=/soft/hadoop/etc/hadoop
export HIVE_HOME=/soft/hive
# symlink and absolute path should rely on SPARK_HOME to resolve
if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

e. Start spark-shell and specify the master:

spark-shell --master local

f. Create a table (tt here is just an example table name):

scala> spark.sql("create table tt(id int, name string, age int) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile")

g. Load data into the Hive table:

scala> spark.sql("load data local inpath '/mnt/hgfs/' into table tt")
scala> spark.sql("select * from tt").show
+---+----+---+
| id|name|age|
+---+----+---+
|  1|tom1| 11|
|  2|tom2| 12|
|  3|tom3| 13|
|  4|tom4| 14|
|  5|tom5| 15|
+---+----+---+

h. From here on, HiveQL statements can be used to work with the Hive database:

scala> spark.sql("select ........")
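With Hive support wired up this way, a DataFrame can also be persisted as a Hive table directly through the writer API rather than SQL DDL; a minimal sketch, assuming the tt table from step f (tt_copy is an arbitrary name):

    scala> val df = spark.sql("select * from tt where age > 12")
    scala> df.write.mode("overwrite").saveAsTable("tt_copy")
    scala> spark.sql("select * from tt_copy").show()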
2. Operating on Hive tables with Spark SQL -- Java version

a. Create a Java module and add the Maven dependencies:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.it18zhang</groupId>
    <artifactId>SparkDemo1</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <configuration>
                    <recompileMode>incremental</recompileMode>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.17</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
    </dependencies>
</project>

b. Copy the hive-site.xml configuration file into the Java project's resources directory.

c. Code demo:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TestSparkHive {
    public static void main(String[] args) {
        String warehouseLocation = "/spark-warehouse";
        // init spark session with hive support
        SparkSession spark = SparkSession.builder()
                .appName("Java Spark Hive Example")
                .master("local[*]")
                .config("spark.sql.warehouse.dir", warehouseLocation)
                .enableHiveSupport()   // route SQL through the Hive metastore
                .getOrCreate();

        String sqlStr1 = "create table te3(id int, name string , age int)";
        // String sqlStr2 = "load data local inpath '/mnt/hgfs/' into table te3";
        // String sqlStr3 = "select * from te3";

        Dataset<Row> df = spark.sql(sqlStr1);
        df.show();
    }
}
V. Connecting to Spark with Beeline for Remote Hive Access
-----------------------------------------------------
1. Start the Spark cluster, fully distributed (standalone mode).

2. Create the Hive table:

$> hive -e "create table tt(id int,name string , age int) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile"

3. Load data into the Hive table:

$> hive -e "load data local inpath 'file:mnt/hgfs/' into table tt"
$> hive -e "select * from tt"

4. Start spark-shell:

$> spark-shell --master spark://s100:7077

scala> spark.sql("select * from tt").show()
+---+----+---+
| id|name|age|
+---+----+---+
|  1|tom1| 11|
|  2|tom2| 12|
|  3|tom3| 13|
|  4|tom4| 14|
|  5|tom5| 15|
+---+----+---+

5. Start the thriftserver:

$> cd /soft/spark/sbin
$> start-thriftserver.sh --master spark://s100:7077
$> netstat -anop | grep 10000

6. Start the beeline client:

$> cd /soft/spark/bin
$> beeline -u jdbc:hive2://s100:10000
$beeline> select * from tt;
7. Java implementation

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class TestSparkBeeline {
    public static void main(String[] args) {
        try {
            // the Hive JDBC driver talks to the Spark thriftserver started above
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            Connection conn = DriverManager.getConnection("jdbc:hive2://s100:10000");
            Statement st = conn.createStatement();
            ResultSet set = st.executeQuery("select * from tt");
            while (set.next()) {
                int id = set.getInt(1);
                String name = set.getString(2);
                int age = set.getInt(3);
                System.out.println(id + ":" + name + ":" + age);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
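The client above also needs the Hive JDBC driver on its classpath in addition to the Spark dependencies listed earlier; a hedged example of the Maven coordinates (the version below is only an assumption -- match it to your Hive installation):

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.2.1</version>
</dependency>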

 
