Code 6.1

import org.apache.spark.sql.SparkSession
val spark = SparkSession
    .builder()
    .appName("Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
// Import spark.implicits._ to enable implicit conversions between RDDs and DataFrames
import spark.implicits._
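
With spark.implicits._ in scope, an RDD of case-class or tuple elements can be turned into a DataFrame directly with toDF(). A minimal sketch of what the import enables (the Person case class and the sample rows are only for illustration, not part of the original listing):

case class Person(name: String, age: Int)
// toDF() is provided by the implicits imported above
val personDF = spark.sparkContext
  .makeRDD(Seq(Person("Alice", 29), Person("Bob", 31)))
  .toDF()
personDF.show()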


Code 6.2

scala> val df = spark.read.json("/usr/local/spark-2.3.0-bin-hadoop2.7/examples/src/main/resources/people.json")
// Register df as a temporary view named people by calling the DataFrame's createOrReplaceTempView method
scala> df.createOrReplaceTempView("people")
// Run a SQL query against the people temporary view through the sql interface provided by SparkSession; sql() also returns a DataFrame
scala> val sqlDF = spark.sql("SELECT * FROM people")
scala> sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Code 6.3

//The read attribute of the SparkSession object returns a DataFrameReader; its json() method identifies the data source as JSON
scala> val df = spark.read.json("/home/ubuntu/student.json")
df: org.apache.spark.sql.DataFrame = [age: string, institute: string ... 3 more fields]

//The generic load method of the DataFrameReader can also read the data source:
//scala> val peopleDf = spark.read.format("json").load("/home/ubuntu/student.json")

// The inferred schema can be printed with printSchema
scala> df.printSchema()
root
 |-- Height: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- institute: string (nullable = true)
 |-- name: string (nullable = true)
//Calling show(n) on the returned DataFrame displays the first n rows of the data set
scala> df.show(6)

+------+------+---+--------+--------------------+----+
|Height|Weight|age| country|           institute|name|
+------+------+---+--------+--------------------+----+
|   185|    75| 20|   china|computer science ...|  MI|
|   187|    70| 21|   Spain|     medical college|  MU|
|   155|    60| 25|Portugal|chemical engineer...|  MY|
|   166|    62| 19|   Japan|                 SEM|  MK|
|   187|    80| 24|  France| school of materials|  Ab|
|   167|    60| 21|  Russia| school of materials|  Ar|
+------+------+---+--------+--------------------+----+
only showing top 6 rows
// Alternatively, a DataFrame can be created from a Dataset of JSON strings
scala> val otherPeopleDataset = spark.createDataset(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
scala> val otherPeople = spark.read.json(otherPeopleDataset)
scala> otherPeople.show()
+---------------+----+
|        address|name|
+---------------+----+
|[Columbus,Ohio]| Yin|
+---------------+----+


Code 6.4

//Encoders for common types are provided automatically by importing spark.implicits._
scala> import spark.implicits._
scala> val peopleDF = spark.read.json("/usr/local/spark-2.3.0-bin-hadoop2.7/examples/src/main/resources/people.json")

// When peopleDF is saved as a Parquet file, the schema information is preserved
scala> peopleDF.write.parquet("people.parquet")

//Read back the people.parquet file just created. Parquet files are self-describing, so the schema is preserved;
//the result of reading a Parquet file is a DataFrame that already carries the full schema
val parquetFileDF = spark.read.parquet("people.parquet")
//Because Parquet is the default data source format of Spark SQL, the file can also be read with:
//val parquetFileDF = spark.read.load("people.parquet")
//Parquet files can also be queried directly with SQL, using the file's storage location as the table path
scala> val sqlDF = spark.sql("SELECT * FROM parquet.`/home/ubuntu/people.parquet`")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> sqlDF.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

// Parquet files can also be used to create a temporary view, which can then be used in SQL statements
scala> parquetFileDF.createOrReplaceTempView("parquetFile")
scala> val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
scala> namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+


Code 6.5

//Import spark.implicits._ for implicit conversion from RDDs to DataFrames
scala> import spark.implicits._

// Create a DataFrame and store it in a partition directory (data/test_table/key=1)
scala> val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
scala> squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame and store it in a new partition directory of the same table (data/test_table/key=2),
// adding a cube column and dropping the existing square column
scala> val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
scala> cubesDF.write.parquet("data/test_table/key=2")
//Read the partitioned table; the two partitions (key=1/2) are merged automatically
scala> val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")

//Print the schema in tree format with the basic DataFrame method; it covers all partitions under the partition directory
scala> mergedDF.printSchema()
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)
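
Schema merging is relatively expensive, so it is disabled by default. Besides the per-read option shown above, it can be switched on for the whole session; a minimal sketch, assuming the same spark session and table directory:

// Equivalent to passing option("mergeSchema", "true") on every Parquet read
scala> spark.conf.set("spark.sql.parquet.mergeSchema", "true")
scala> val mergedDF2 = spark.read.parquet("data/test_table")
scala> mergedDF2.printSchema()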


Code 6.6

import java.io.File
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
case class Record(key: Int, value: String)
// warehouseLocation points to the default location of managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._
import spark.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Aggregation queries using Hive functions such as COUNT() are also supported
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |     500|
// +--------+
// The results of SQL queries are themselves DataFrames and support all the usual operations
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The elements of a DataFrame are of type Row, which allows each column to be accessed by position
val stringsDS = sqlDF.map {
	case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}

stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// DataFrames can also be used to create temporary views within a SparkSession
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

//SQL queries can then join the temporary view registered from the DataFrame with Hive tables
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|
// ...

// Create a Hive managed Parquet table, using HQL syntax instead of the Spark SQL native syntax
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save the DataFrame into the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
sql("SELECT * FROM hive_ints").show()
// +---+
// |key|
// +---+
// |  0|
// |  1|
// |  2|
// ...

// Turn on the flags for Hive dynamic partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table using the DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// The partition column 'key' is moved to the end of the schema
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// |  value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...

spark.stop()


Code 6.7


spark-shell --jars /usr/local/spark-2.3.0-bin-hadoop2.7/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar --driver-class-path /usr/local/spark-2.3.0-bin-hadoop2.7/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar
		

Code 6.8

ubuntu@ubuntu:~$ service mysql start
ubuntu@ubuntu:~$ mysql -u root -p

Code 6.9

mysql> create database student;
mysql> use student;
mysql> create table stu(id int(10) auto_increment not null primary key,name varchar(10),country varchar(20),Height Double,Weight Double);
mysql> describe stu;
+---------+-------------+------+-----+---------+----------------+
| Field   | Type        | Null | Key | Default | Extra          |
+---------+-------------+------+-----+---------+----------------+
| id      | int(10)     | NO   | PRI | NULL    | auto_increment |
| name    | varchar(10) | YES  |     | NULL    |                |
| country | varchar(20) | YES  |     | NULL    |                |
| Height  | double      | YES  |     | NULL    |                |
| Weight  | double      | YES  |     | NULL    |                |
+---------+-------------+------+-----+---------+----------------+
5 rows in set (0.00 sec)
mysql> insert into stu values(null,'MI','china','180','60');
mysql> insert into stu values(null,'UI','UK','160','50'); 
mysql> insert into stu values(null,'DI','UK','165','55'); 
mysql> insert into stu values(null,'Bo','china','167','45'); 
mysql> select * from stu;
+----+------+---------+--------+--------+
| id | name | country | Height | Weight |
+----+------+---------+--------+--------+
|  1 | MI   | china   |    180 |     60 |
|  2 | UI   | UK      |    160 |     50 |
|  3 | DI   | UK      |    165 |     55 |
|  4 | Bo   | china   |    167 |     45 |
+----+------+---------+--------+--------+
4 rows in set (0.00 sec)


Code 6.10

//Package needed to create a Properties object
scala> import java.util.Properties
import java.util.Properties
scala> val jdbcDF = spark.read
//"jdbc" identifies the data source as a JDBC source
.format("jdbc")
//The JDBC URL to connect to, where student is the database created above
.option("url","jdbc:mysql://localhost:3306/student")
//driver is the fully qualified class name of the JDBC driver Spark SQL uses to access the database
.option("driver","com.mysql.jdbc.Driver")
//dbtable is the table stu in the student database to be accessed
.option("dbtable","stu")
//user is the user name for accessing the MySQL database
.option("user","root")
//password is that user's password for the database
.option("password","mysql")
.load()
scala> jdbcDF.show()
+---+----+-------+------+------+
| id|name|country|Height|Weight|
+---+----+-------+------+------+
|  1|  MI|  china| 180.0|  60.0|
|  2|  UI|     UK| 160.0|  50.0|
|  3|  DI|     UK| 165.0|  55.0|
|  4|  Bo|  china| 167.0|  45.0|
+---+----+-------+------+------+
//Instantiate a Properties object and add the JDBC connection properties to it as key-value pairs
scala> val connectionProperties = new Properties()
connectionProperties: java.util.Properties = {}
//Add the user and password properties to the Properties object
scala> connectionProperties.put("user","root")
scala> connectionProperties.put("password","mysql")
//addstu is a DataFrame holding the data to be written into the stu table
scala> val addstu = spark.read.json("/home/ubuntu/Desktop/stu.json")
scala> addstu.show()
+------+------+-------+----+
|Height|Weight|country|name|
+------+------+-------+----+
|   168|    48|  china|  Am|
|   189|    80|  Spain|  Bo|
+------+------+-------+----+
scala> addstu.write
  .mode("append")
  .format("jdbc")
  .option("url", " jdbc:mysql://localhost:3306/student ")
  .option("dbtable", "stu")
  .option("user", "root")
  .option("password", "mysql")
  .save()
//As when reading a JDBC source, the connectionProperties object can also be passed to write.jdbc() to write the table
scala> addstu.write
.mode("append")
.jdbc("jdbc:mysql://localhost:3306/student","student.stu",connectionProperties)


Code 6.11

mysql> select * from stu;
+----+------+---------+--------+--------+
| id | name | country | Height | Weight |
+----+------+---------+--------+--------+
|  1 | MI   | china   |    180 |     60 |
|  2 | UI   | UK      |    160 |     50 |
|  3 | DI   | UK      |    165 |     55 |
|  4 | Bo   | china   |    167 |     45 |
|  5 | Am   | china   |    168 |     48 |
|  6 | Bo   | Spain   |    189 |     80 |
+----+------+---------+--------+--------+
6 rows in set (0.00 sec)


Code 6.12

scala> case class student(name:String,age:Int,Height:Int,Weight:Int)
defined class student

scala> import spark.implicits._
import spark.implicits._

scala> val stuRDD = spark.sparkContext.textFile("/home/ubuntu/student.txt").map(_.split(",")).map(elements=>student(elements(0),elements(1).trim.toInt,elements(4).trim.toInt,elements(5).trim.toInt))
stuRDD: org.apache.spark.rdd.RDD[student] = MapPartitionsRDD[13] at map at <console>:28

scala> val stuDF = stuRDD.toDF()
stuDF: org.apache.spark.sql.DataFrame = [name: string, age: int ... 2 more fields]

scala> stuDF.createOrReplaceTempView("student")

scala> val stu_H_W = spark.sql("SELECT name,age,Height,Weight FROM student WHERE age BETWEEN 13 AND 19")
stu_H_W: org.apache.spark.sql.DataFrame = [name: string, age: int ... 2 more fields]

scala> stu_H_W.show()
+----+---+------+------+
|name|age|Height|Weight|
+----+---+------+------+
|  MK| 19|   166|    62|
|  CT| 18|   169|    60|
+----+---+------+------+


Code 6.13

//Import the Spark SQL data types package
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
//Import the Spark SQL Row package
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
// Create stuRDD
scala> val stuRDD = spark.sparkContext.textFile("/home/ubuntu/student.txt")
stuRDD: org.apache.spark.rdd.RDD[String] = /home/ubuntu/student.txt MapPartitionsRDD[19] at textFile at <console>:30
// The schema string
scala> val schemaString = "name age country"
schemaString: String = name age country

//Split the schema string on spaces into an array of strings, then map each element to a StructField object, producing an Array[StructField]
scala> val fields = schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,nullable = true))
fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(name,StringType,true), StructField(age,StringType,true), StructField(country,StringType,true))

//Wrap fields in a StructType object, forming the schema used to build the DataFrame
scala> val schema = StructType(fields)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,StringType,true), StructField(country,StringType,true))

//Convert stuRDD (an RDD[String]) into an RDD[Row]
scala> val rowRDD = stuRDD.map(_.split(",")).map(elements => Row(elements(0),elements(1).trim,elements(2)))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[21] at map at <console>:32

//Apply the schema to rowRDD to complete the conversion to a DataFrame
scala> val stuDF = spark.createDataFrame(rowRDD,schema)
stuDF: org.apache.spark.sql.DataFrame = [name: string, age: string ... 1 more field]

//stuDF can be operated on directly
scala> stuDF.show(9)
+----+---+--------+
|name|age| country|
+----+---+--------+
|  MI| 20|   china|
|  MU| 21|   Spain|
|  MY| 25|Portugal|
|  MK| 19|   Japan|
|  Ab| 24|  France|
|  Ar| 21|  Russia|
|  Ad| 20|  Geneva|
|  Am| 20|   china|
|  Bo| 20|   Spain|
+----+---+--------+
//stuDF can also be registered as a temporary view "student"; the sql interface then runs SQL expressions
//against it, and sql() again returns a DataFrame
scala> stuDF.createOrReplaceTempView("student")
scala> val results = spark.sql("SELECT name,age,country FROM student WHERE age BETWEEN 13 and 19")
scala> results.show()
+----+---+-------+
|name|age|country|
+----+---+-------+
|  MK| 19|  Japan|
|  CT| 18| France|
+----+---+-------+


Code 6.14

scala> df.agg("age" -> "mean","Height" -> "min","Weight" -> "max").show()
+-----------------+-----------+-----------+
|         avg(age)|min(Height)|max(Weight)|
+-----------------+-----------+-----------+
|20.90909090909091|        155|         85|
+-----------------+-----------+-----------+
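
The same aggregation can be written with the column functions from org.apache.spark.sql.functions, which also lets the result columns be renamed; a minimal sketch, assuming the same df:

scala> import org.apache.spark.sql.functions.{avg, min, max}
// Aggregate with explicit column functions and readable aliases
scala> df.agg(
  avg("age").as("avg_age"),
  min("Height").as("min_height"),
  max("Weight").as("max_weight")
).show()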


Code 6.15

scala> df("Height")
res11: org.apache.spark.sql.Column = Height

scala> df.apply("Weight")
res12: org.apache.spark.sql.Column = Weight


Code 6.16

scala> df.col("name")
res4: org.apache.spark.sql.Column = name


Code 6.17

scala> df.select(df("name"),df("Weight") as "Weight_KG").show(6)
+----+---------+
|name|Weight_KG|
+----+---------+
|  MI|       75|
|  MU|       70|
|  MY|       60|
|  MK|       62|
|  Ab|       80|
|  Ar|       60|
+----+---------+
only showing top 6 rows
scala> df.select(df("name"),df("Weight")*2 as "Weight_Jin").show(6)
+----+----------+
|name|Weight_Jin|
+----+----------+
|  MI|     150.0|
|  MU|     140.0|
|  MY|     120.0|
|  MK|     124.0|
|  Ab|     160.0|
|  Ar|     120.0|
+----+----------+
only showing top 6 rows


Code 6.18

scala> df.select("age").show(6)
+---+
|age|
+---+
| 20|
| 21|
| 25|
| 19|
| 24|
| 21|
+---+
only showing top 6 rows
scala> df.select("age").distinct.show()
+---+
|age|
+---+
| 19|
| 25|
| 24|
| 20|
| 21|
+---+


Code 6.19

scala> df.printSchema()
root
 |-- Height: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- institute: string (nullable = true)
 |-- name: string (nullable = true)
scala> df.drop("institute").printSchema()
root
 |-- Height: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- name: string (nullable = true)


Code 6.20

scala> val newdf = spark.read.json("/home/ubuntu/newstudent.json")
newdf: org.apache.spark.sql.DataFrame = [Height: string, Weight: string ... 4 more fields]
scala> newdf.show(false)
+------+------+---+-------+------------------------------------------+----+
|Height|Weight|age|country|institute                                 |name|
+------+------+---+-------+------------------------------------------+----+
|185   |75    |20 |china  |computer science and technology department|MI  |
+------+------+---+-------+------------------------------------------+----+
scala> df.show(false)
+------+------+---+--------+------------------------------------------+----+
|Height|Weight|age|country |institute                                 |name|
+------+------+---+--------+------------------------------------------+----+
|185   |75    |20 |china   |computer science and technology department|MI  |
|187   |70    |21 |Spain   |medical college                           |MU  |
|155   |60    |25 |Portugal|chemical engineering institude            |MY  |
|166   |62    |19 |Japan   |SEM                                       |MK  |
|187   |80    |24 |France  |school of materials                       |Ab  |
|167   |60    |21 |Russia  |school of materials                       |Ar  |
|185   |75    |20 |Geneva  |medical college                           |Ad  |
|168   |48    |20 |china   |computer science and technology department|Am  |
|189   |80    |20 |Spain   |chemical engineering institude            |Bo  |
|164   |55    |20 |china   |SEM                                       |By  |
|195   |85    |20 |Japan   |SEM                                       |CY  |
+------+------+---+--------+------------------------------------------+----+
scala> df.except(newdf).show(false)
+------+------+---+--------+------------------------------------------+----+
|Height|Weight|age|country |institute                                 |name|
+------+------+---+--------+------------------------------------------+----+
|166   |62    |19 |Japan   |SEM                                       |MK  |
|155   |60    |25 |Portugal|chemical engineering institude            |MY  |
|164   |55    |20 |china   |SEM                                       |By  |
|195   |85    |20 |Japan   |SEM                                       |CY  |
|187   |70    |21 |Spain   |medical college                           |MU  |
|168   |48    |20 |china   |computer science and technology department|Am  |
|187   |80    |24 |France  |school of materials                       |Ab  |
|185   |75    |20 |Geneva  |medical college                           |Ad  |
|189   |80    |20 |Spain   |chemical engineering institude            |Bo  |
|167   |60    |21 |Russia  |school of materials                       |Ar  |
+------+------+---+--------+------------------------------------------+----+


Code 6.21

scala> df.filter("age >24 ").show(false)
+------+------+---+--------+------------------------------+----+
|Height|Weight|age|country |institute                     |name|
+------+------+---+--------+------------------------------+----+
|155   |60    |25 |Portugal|chemical engineering institude|MY  |
+------+------+---+--------+------------------------------+----+


Code 6.22

scala> df.groupBy("country").agg("Height" -> "mean").show()
+--------+------------------+
| country|       avg(Height)|
+--------+------------------+
|  Russia|             167.0|
|  France|             187.0|
|   Spain|             188.0|
|  Geneva|             185.0|
|   Japan|             180.5|
|   china|172.33333333333334|
|Portugal|             155.0|
+--------+------------------+


Code 6.23

scala> df.groupBy("country").count().show()
+--------+-----+
| country|count|
+--------+-----+
|  Russia|    1|
|  France|    1|
|   Spain|    2|
|  Geneva|    1|
|   Japan|    2|
|   china|    3|
|Portugal|    1|
+--------+-----+


Code 6.24

scala> df.intersect(newdf).show(false)
+------+------+---+-------+------------------------------------------+----+
|Height|Weight|age|country|institute                                 |name|
+------+------+---+-------+------------------------------------------+----+
|185   |75    |20 |china  |computer science and technology department|MI  |
+------+------+---+-------+------------------------------------------+----+


Code 6.25

scala> df.limit(3).show(false)
+------+------+---+--------+------------------------------------------+----+
|Height|Weight|age|country |institute                                 |name|
+------+------+---+--------+------------------------------------------+----+
|185   |75    |20 |china   |computer science and technology department|MI  |
|187   |70    |21 |Spain   |medical college                           |MU  |
|155   |60    |25 |Portugal|chemical engineering institude            |MY  |
+------+------+---+--------+------------------------------------------+----+


Code 6.26


scala> df.orderBy("age","Height").show(false)
+------+------+---+--------+------------------------------------------+----+
|Height|Weight|age|country |institute                                 |name|
+------+------+---+--------+------------------------------------------+----+
|166   |62    |19 |Japan   |SEM                                       |MK  |
|164   |55    |20 |china   |SEM                                       |By  |
|168   |48    |20 |china   |computer science and technology department|Am  |
|185   |75    |20 |china   |computer science and technology department|MI  |
|185   |75    |20 |Geneva  |medical college                           |Ad  |
|189   |80    |20 |Spain   |chemical engineering institude            |Bo  |
|195   |85    |20 |Japan   |SEM                                       |CY  |
|167   |60    |21 |Russia  |school of materials                       |Ar  |
|187   |70    |21 |Spain   |medical college                           |MU  |
|187   |80    |24 |France  |school of materials                       |Ab  |
|155   |60    |25 |Portugal|chemical engineering institude            |MY  |
+------+------+---+--------+------------------------------------------+----+


Code 6.27


scala> df.sort($"age".desc).show(false)
+------+------+---+--------+------------------------------------------+----+
|Height|Weight|age|country |institute                                 |name|
+------+------+---+--------+------------------------------------------+----+
|155   |60    |25 |Portugal|chemical engineering institude            |MY  |
|187   |80    |24 |France  |school of materials                       |Ab  |
|187   |70    |21 |Spain   |medical college                           |MU  |
|167   |60    |21 |Russia  |school of materials                       |Ar  |
|185   |75    |20 |china   |computer science and technology department|MI  |
|185   |75    |20 |Geneva  |medical college                           |Ad  |
|164   |55    |20 |china   |SEM                                       |By  |
|189   |80    |20 |Spain   |chemical engineering institude            |Bo  |
|168   |48    |20 |china   |computer science and technology department|Am  |
|195   |85    |20 |Japan   |SEM                                       |CY  |
|166   |62    |19 |Japan   |SEM                                       |MK  |
+------+------+---+--------+------------------------------------------+----+


Code 6.28


scala> df.sample(true,0.5).show()
+------+------+---+-------+--------------------+----+
|Height|Weight|age|country|           institute|name|
+------+------+---+-------+--------------------+----+
|   187|    80| 24| France| school of materials|  Ab|
|   187|    80| 24| France| school of materials|  Ab|
|   189|    80| 20|  Spain|chemical engineer...|  Bo|
+------+------+---+-------+--------------------+----+

scala> df.sample(false,0.5).show()
+------+------+---+-------+--------------------+----+
|Height|Weight|age|country|           institute|name|
+------+------+---+-------+--------------------+----+
|   187|    80| 24| France| school of materials|  Ab|
|   168|    48| 20|  china|computer science ...|  Am|
|   164|    55| 20|  china|                 SEM|  By|
+------+------+---+-------+--------------------+----+


Code 6.29


scala> df.where("name = 'Ar'").show()
+------+------+---+-------+-------------------+----+
|Height|Weight|age|country|          institute|name|
+------+------+---+-------+-------------------+----+
|   167|    60| 21| Russia|school of materials|  Ar|
+------+------+---+-------+-------------------+----+

scala>  df.where($"age">24).show()
+------+------+---+--------+--------------------+----+
|Height|Weight|age| country|           institute|name|
+------+------+---+--------+--------------------+----+
|   155|    60| 25|Portugal|chemical engineer...|  MY|
+------+------+---+--------+--------------------+----+


Code 6.30

scala> val joindf = spark.read.json("/home/ubuntu/Desktop/joininfo.json")
joindf: org.apache.spark.sql.DataFrame = [math_score: bigint, name: string] 

scala> joindf.show()
+----------+----+
|math_score|name|
+----------+----+
|        90|  MI|
|        75|  MU|
|        90| OOO|
+----------+----+

scala> df.join(joindf,"name").show()
+----+------+------+---+-------+--------------------+----------+
|name|Height|Weight|age|country|           institute|math_score|
+----+------+------+---+-------+--------------------+----------+
|  MI|   185|    75| 20|  china|computer science ...|        90|
|  MU|   187|    70| 21|  Spain|     medical college|        75|
+----+------+------+---+-------+--------------------+----------+


Code 6.31

scala> val na_df = spark.read.json("/home/ubuntu/exStudent.json")
na_df: org.apache.spark.sql.DataFrame = [Height: string, Weight: string ... 4 more fields]

scala> na_df.show()
+------+------+----+-------+---------+----+
|Height|Weight| age|country|institute|name|
+------+------+----+-------+---------+----+
|   185|    75|null|  china|     null|  MI|
|   187|    70|  21|  china|     null|  MU|
|  null|  null|null|   null|     null|null|
|   166|    62|  19|  Japan|      SEM|  MK|
|   164|    55|  20|  china|      SEM|  By|
|   195|    85|  20|  Japan|      SEM|  CY|
+------+------+----+-------+---------+----+

scala> na_df.na.drop().show()
+------+------+---+-------+---------+----+
|Height|Weight|age|country|institute|name|
+------+------+---+-------+---------+----+
|   166|    62| 19|  Japan|      SEM|  MK|
|   164|    55| 20|  china|      SEM|  By|
|   195|    85| 20|  Japan|      SEM|  CY|
+------+------+---+-------+---------+----+

scala> na_df.na.fill(Map(("age",0),("institute","jsj"),("Height","0"),("Weight","0"),("country","china"),("name","XXX"))).show()
+------+------+---+-------+---------+----+
|Height|Weight|age|country|institute|name|
+------+------+---+-------+---------+----+
|   185|    75|  0|  china|      jsj|  MI|
|   187|    70| 21|  china|      jsj|  MU|
|     0|     0|  0|  china|      jsj| XXX|
|   166|    62| 19|  Japan|      SEM|  MK|
|   164|    55| 20|  china|      SEM|  By|
|   195|    85| 20|  Japan|      SEM|  CY|
+------+------+---+-------+---------+----+
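
Besides drop and fill, DataFrameNaFunctions also provides replace for substituting specific values in chosen columns. A minimal sketch, assuming the same na_df (the replacement value is only for illustration):

//Replace the lowercase country value "china" with "China" in the country column
scala> na_df.na.replace("country", Map("china" -> "China")).show()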


Code 6.32


scala> df.collect()
res83: Array[org.apache.spark.sql.Row] = Array([185,75,20,china,computer science and technology department,MI], [187,70,21,Spain,medical college,MU], [155,60,25,Portugal,chemical engineering institude,MY], [166,62,19,Japan,SEM,MK], [187,80,24,France,school of materials,Ab], [167,60,21,Russia,school of materials,Ar], [185,75,20,Geneva,medical college,Ad], [168,48,20,china,computer science and technology department,Am], [189,80,20,Spain,chemical engineering institude,Bo], [164,55,20,china,SEM,By], [195,85,20,Japan,SEM,CY])


Code 6.33


scala> df.collectAsList()
res84: java.util.List[org.apache.spark.sql.Row] = [[185,75,20,china,computer science and technology department,MI], [187,70,21,Spain,medical college,MU], [155,60,25,Portugal,chemical engineering institude,MY], [166,62,19,Japan,SEM,MK], [187,80,24,France,school of materials,Ab], [167,60,21,Russia,school of materials,Ar], [185,75,20,Geneva,medical college,Ad], [168,48,20,china,computer science and technology department,Am], [189,80,20,Spain,chemical engineering institude,Bo], [164,55,20,china,SEM,By], [195,85,20,Japan,SEM,CY]]


Code 6.34

scala> df.count()
res85: Long = 11

Code 6.35

scala> df.describe("Height").show()
+-------+------------------+
|summary|            Height|
+-------+------------------+
|  count|                11|
|   mean| 177.0909090909091|
| stddev|13.232192149863495|
|    min|               155|
|    max|               195|
+-------+------------------+


Code 6.36

scala> df.first()
res87: org.apache.spark.sql.Row = [185,75,20,china,computer science and technology department,MI]

Code 6.37

scala> df.head(2)
res88: Array[org.apache.spark.sql.Row] = Array([185,75,20,china,computer science and technology department,MI], [187,70,21,Spain,medical college,MU])

Code 6.38

scala> df.show(2,false)
+------+------+---+-------+------------------------------------------+----+
|Height|Weight|age|country|institute                                 |name|
+------+------+---+-------+------------------------------------------+----+
|185   |75    |20 |china  |computer science and technology department|MI  |
|187   |70    |21 |Spain  |medical college                           |MU  |
+------+------+---+-------+------------------------------------------+----+
only showing top 2 rows
scala> df.show(2,true)
+------+------+---+-------+--------------------+----+
|Height|Weight|age|country|           institute|name|
+------+------+---+-------+--------------------+----+
|   185|    75| 20|  china|computer science ...|  MI|
|   187|    70| 21|  Spain|     medical college|  MU|
+------+------+---+-------+--------------------+----+
only showing top 2 rows


Code 6.39

scala> df.take(2)
res91: Array[org.apache.spark.sql.Row] = Array([185,75,20,china,computer science and technology department,MI], [187,70,21,Spain,medical college,MU])

Code 6.40

//Read Parquet-format data
scala> val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
//Save it in Parquet format
scala> usersDF.write.save("userInfo.parquet")
//A subset of the columns can also be selected and saved in Parquet format
scala> usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

Code 6.41

scala> val peopleDf = spark.read.format("json").load("/home/ubuntu/people.json")
peopleDf: org.apache.spark.sql.DataFrame = [age: string, institute: string ... 3 more fields]

scala> peopleDf.select("name","age").write.format("parquet").save("nameAndAgesInfo.parquet")


Code 6.42

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
import scala.collection.mutable
import java.text.SimpleDateFormat

object SparkSQL01 {
  def main(args: Array[String]): Unit = {
    /**
      * Create the SparkSession
      */
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("test")
      .config("spark.sql.shuffle.partitions", "5")
      .getOrCreate()

/** ************************ student table *****************************/
    val studentRDD = spark.sparkContext.textFile("/home/ubuntu01/SqlExample/student.txt")
    val StudentSchema: StructType = StructType(mutable.ArraySeq(  // student table
      StructField("Sno", StringType, nullable = false),           // student number
      StructField("Sname", StringType, nullable = false),         // student name
      StructField("Ssex", StringType, nullable = false),          // student gender
      StructField("Sbirthday", StringType, nullable = true),      // student date of birth
      StructField("SClass", StringType, nullable = true)          // student class
    ))
    val studentData = studentRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1),attributes(2),attributes(3),attributes(4)))
    val studentDF = spark.createDataFrame(studentData,StudentSchema)
    studentDF.createOrReplaceTempView("student")

/** ************************ teacher table *****************************/
    val teacherRDD = spark.sparkContext.textFile("/home/ubuntu01/SqlExample/teacher.txt")
    val TeacherSchema: StructType = StructType(mutable.ArraySeq(  // teacher table
      StructField("Tno", StringType, nullable = false),           // teacher number (primary key)
      StructField("Tname", StringType, nullable = false),         // teacher name
      StructField("Tsex", StringType, nullable = false),          // teacher gender
      StructField("Tbirthday", StringType, nullable = true),      // teacher date of birth
      StructField("Prof", StringType, nullable = true),           // professional title
      StructField("Depart", StringType, nullable = false)         // teacher's department
    ))
    val teacherData = teacherRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1),attributes(2),attributes(3),attributes(4),attributes(5)))
    val teacherDF = spark.createDataFrame(teacherData,TeacherSchema)
    teacherDF.createOrReplaceTempView("teacher")

/** ************************ course table *****************************/
    val courseRDD = spark.sparkContext.textFile("/home/ubuntu01/SqlExample/course.txt")
    val CourseSchema: StructType = StructType(mutable.ArraySeq(   // course table
      StructField("Cno", StringType, nullable = false),           // course number
      StructField("Cname", StringType, nullable = false),         // course name
      StructField("Tno", StringType, nullable = false)            // teacher number
    ))
    val courseData = courseRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1),attributes(2)))
    val courseDF = spark.createDataFrame(courseData,CourseSchema)
    courseDF.createOrReplaceTempView("course")

/** ************************ score table *****************************/
    val scoreRDD = spark.sparkContext.textFile("/home/ubuntu01/SqlExample/score.txt")
    val ScoreSchema: StructType = StructType(mutable.ArraySeq(    // score table
      StructField("Sno", StringType, nullable = false),           // student number (foreign key)
      StructField("Cno", StringType, nullable = false),           // course number (foreign key)
      StructField("Degree", IntegerType, nullable = true)         // score
    ))
    // Degree is declared as IntegerType, so the string field must be converted to Int
    val scoreData = scoreRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1),attributes(2).trim.toInt))
    val scoreDF = spark.createDataFrame(scoreData,ScoreSchema)
    scoreDF.createOrReplaceTempView("score")

/** ************************ queries on the tables *****************************/
//Show all student information, ordered by class in descending order
spark.sql("SELECT * FROM student ORDER BY SClass DESC").show()
//+---+----------+------+---------+------+
//|Sno|     Sname|  Ssex|Sbirthday|SClass|
//+---+----------+------+---------+------+
//|107|    GuiGui|  male| 1992/5/5| 95033|
//|108|  ZhangSan|  male| 1995/9/1| 95033|
//|106|   LiuBing|female|1996/5/20| 95033|
//|105|KangWeiWei|female| 1996/6/1| 95031|
//|101|  WangFeng|  male| 1993/8/8| 95031|
//|109| DuBingYan|  male|1995/5/21| 95031|
//+---+----------+------+---------+------+

//Query the Tname and Prof of teachers in the computer department and the electronic engineering department whose titles differ between the two departments
spark.sql("SELECT tname, prof " +
        "FROM Teacher " +
        "WHERE prof NOT IN (SELECT a.prof " +
        "FROM (SELECT prof " +
        "FROM Teacher " +
        "WHERE depart = 'department of computer' " +
        ") a " +
        "JOIN (SELECT prof " +
        "FROM Teacher " +
        "WHERE depart = 'department of electronic engineering' " +
        ") b ON a.prof = b.prof) ").show(false)
//+--------+-------------------+
//|tname   |prof               |
//+--------+-------------------+
//|LinYu   |Associate professor|
//|DuMei   |Assistant professor|
//|RenLi   |Lecturer           |
//|GongMOMO|Associate professor|
//|DuanMu  |Assistant professor|
//+--------+-------------------+

//Show the number of records in the student table
println(studentDF.count())
//6

//Show the name and gender columns of the student table
studentDF.select("Sname","Ssex").show()
//+----------+------+
//|     Sname|  Ssex|
//+----------+------+
//|  ZhangSan|  male|
//|KangWeiWei|female|
//|    GuiGui|  male|
//|  WangFeng|  male|
//|   LiuBing|female|
//| DuBingYan|  male|
//+----------+------+

//Show the information of male teachers
teacherDF.filter("Tsex = 'male'").show(false)
//+---+------+----+---------+-------------------+-----------------------------------+
//|Tno|Tname |Tsex|Tbirthday|Prof               |Depart                             |
//+---+------+----+---------+-------------------+-----------------------------------+
//|825|LinYu |male|1958/1/1 |Associate professor|department of computer             |
//|888|RenLi |male|1972/5/1 |Lecturer           |department of electronic engneering|
//|864|DuanMu|male|1985/6/1 |Assistant professor|department of computer             |
//+---+------+----+---------+-------------------+-----------------------------------+

//Show the distinct teacher departments
teacherDF.select("Depart").distinct().show(false)
//+-----------------------------------+
//|Depart                             |
//+-----------------------------------+
//|department of computer             |
//|computer science department        |
//|department of electronic engneering|
//+-----------------------------------+

//Show the information of the student whose number is 101
studentDF.where("Sno = '101'").show()
//+---+--------+----+---------+------+
//|Sno|   Sname|Ssex|Sbirthday|SClass|
//+---+--------+----+---------+------+
//|101|WangFeng|male| 1993/8/8| 95031|
//+---+--------+----+---------+------+

//Show the teacher information as a List
println(teacherDF.collectAsList())
//[[825,LinYu,male,1958/1/1,Associate professor,department of computer],
//[804,DuMei,female,1962/1/1,Assistant professor,computer science department],
//[888,RenLi,male,1972/5/1,Lecturer,department of electronic engineering],
//[852,GongMOMO,female,1986/1/5,Associate professor,computer science department],
//[864,DuanMu,male,1985/6/1,Assistant professor,department of computer]]

//Query the name, sex and birthday of all female teachers and female students
spark.sql("SELECT sname, ssex, sbirthday " +
"FROM Student " +
        "WHERE ssex = 'female' " +
        "UNION " +
        "SELECT tname, tsex, tbirthday " +
        "FROM Teacher " +
        "WHERE tsex = 'female'").show()
//+----------+------+---------+
//|     sname|  ssex|sbirthday|
//+----------+------+---------+
//|  GongMOMO|female| 1986/1/5|
//|KangWeiWei|female| 1996/6/1|
//|   LiuBing|female|1996/5/20|
//|     DuMei|female| 1962/1/1|
//+----------+------+---------+
  }
}