Code 6.1
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// Import spark.implicits._ to enable implicit conversions between RDDs and DataFrames
import spark.implicits._
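With spark.implicits._ imported, a local Scala collection can also be turned into a DataFrame directly via toDF(). A minimal sketch; the column names and sample values below are made up for illustration and are not part of the example data:
// Hypothetical example: build a small DataFrame from a local sequence
val demoDF = Seq(("Alice", 29), ("Bob", 31)).toDF("name", "age")
demoDF.show()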
Code 6.2
scala> val df = spark.read.json("/usr/local/spark-2.3.0-bin-hadoop2.7/examples/src/main/resources/people.json")
// Call the DataFrame's createOrReplaceTempView method to register df as the temporary view "people"
scala> df.createOrReplaceTempView("people")
// Call the sql interface provided by SparkSession to query the "people" temporary view; sql() also returns a DataFrame object
scala> val sqlDF = spark.sql("SELECT * FROM people")
scala> sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
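The query above does not have to go through SQL at all; the same result can be expressed with the DataFrame API. A minimal sketch, assuming the df created above:
// DSL equivalent of a simple SQL query over df
scala> df.select($"name", $"age").where($"age" > 20).show()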
Code 6.3
// The read method of the SparkSession object reads a data source (it returns a DataFrameReader); the json() method then identifies the source as JSON
scala> val df = spark.read.json("/home/ubuntu/student.json")
df: org.apache.spark.sql.DataFrame = [age: string, institute: string ... 3 more fields]
// The generic load method of SparkSession.read can also read the data source:
//scala> val peopleDf = spark.read.format("json").load("/home/ubuntu/student.json")
// The inferred schema can be printed with printSchema
scala> df.printSchema()
root
|-- Height: string (nullable = true)
|-- Weight: string (nullable = true)
|-- age: string (nullable = true)
|-- country: string (nullable = true)
|-- institute: string (nullable = true)
|-- name: string (nullable = true)
// Call show(n) on the returned DataFrame to display the first n rows of the dataset
scala> df.show(6)
+------+------+---+--------+--------------------+----+
|Height|Weight|age| country|           institute|name|
+------+------+---+--------+--------------------+----+
|   185|    75| 20|   china|computer science ...|  MI|
|   187|    70| 21|   Spain|     medical college|  MU|
|   155|    60| 25|Portugal|chemical engineer...|  MY|
|   166|    62| 19|   Japan|                 SEM|  MK|
|   187|    80| 24|  France| school of materials|  Ab|
|   167|    60| 21|  Russia| school of materials|  Ar|
+------+------+---+--------+--------------------+----+
only showing top 6 rows
// Another approach is to create a DataFrame from a Dataset/RDD of JSON strings
scala> val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
scala> val otherPeople = spark.read.json(otherPeopleDataset)
scala> otherPeople.show()
+---------------+----+
|        address|name|
+---------------+----+
|[Columbus,Ohio]| Yin|
+---------------+----+
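JSON schema inference made every column of the student data a string. If the fields are stored as numbers in the file, an explicit schema can be supplied instead of relying on inference. A sketch under that assumption; the chosen field names and types are illustrative:
scala> import org.apache.spark.sql.types._
scala> val stuSchema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("Height", DoubleType),
  StructField("Weight", DoubleType)))
// With an explicit schema, only the listed fields are read; values that fail to parse become null (default PERMISSIVE mode)
scala> val typedDF = spark.read.schema(stuSchema).json("/home/ubuntu/student.json")
scala> typedDF.printSchema()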
Code 6.4
// Encoders for common types are provided automatically by importing spark.implicits._
scala> import spark.implicits._
scala> val peopleDF = spark.read.json("/usr/local/spark-2.3.0-bin-hadoop2.7/examples/src/main/resources/people.json")
// When peopleDF is saved as a Parquet file, the schema information is preserved
scala> peopleDF.write.parquet("people.parquet")
// Read back the people.parquet file just created; Parquet files are self-describing, so the schema is preserved
// The result of reading a Parquet file is a DataFrame that already carries the complete schema
val parquetFileDF = spark.read.parquet("people.parquet")
// Because Parquet is Spark SQL's default data source format, the file can also be read with:
//val parquetFileDF = spark.read.load("people.parquet")
// Parquet files can also be queried directly with SQL, using the file's location as the table
scala> val sqlDF = spark.sql("SELECT * FROM parquet.`/home/ubuntu/people.parquet`")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> sqlDF.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
// Parquet files can also be used to create a temporary view that is then referenced in SQL statements
scala> parquetFileDF.createOrReplaceTempView("parquetFile")
scala> val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
scala> namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
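When writing Parquet, a save mode and compression codec can also be specified on the writer. A minimal sketch assuming the peopleDF from above; the output path is illustrative:
// Overwrite the target directory if it already exists; snappy is Spark's default Parquet codec
scala> peopleDF.write.mode("overwrite").option("compression", "snappy").parquet("people_snappy.parquet")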
Code 6.5
// Import spark.implicits._ for the implicit conversion from RDD to DataFrame
scala> import spark.implicits._
// Create a DataFrame and store it in a partition directory (data/test_table/key=1)
scala> val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
scala> squaresDF.write.parquet("data/test_table/key=1")
// Create a new DataFrame and store it in a new partition directory under the same table (data/test_table/key=2)
// It adds a cube column and drops the existing square column
scala> val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
scala> cubesDF.write.parquet("data/test_table/key=2")
// Read the partitioned table; the two partitions (key=1 and key=2) are merged automatically
scala> val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
// Print the schema in tree format with the basic DataFrame API; it covers all partitions under the partition directory
scala> mergedDF.printSchema()
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)
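Instead of passing the mergeSchema option on every read, schema merging can also be switched on for the whole session. A sketch of that alternative, reusing the paths from above:
// Session-wide alternative to the per-read option used above
scala> spark.conf.set("spark.sql.parquet.mergeSchema", "true")
scala> val mergedDF2 = spark.read.parquet("data/test_table")
scala> mergedDF2.printSchema()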
Code 6.6
import java.io.File
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
case class Record(key: Int, value: String)
// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import spark.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Query with HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Queries that use Hive aggregate functions such as COUNT() are also supported
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |     500|
// +--------+
// The result of a SQL query is itself a DataFrame and supports all the usual operations
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
// The elements of the DataFrame are of type Row, which allows each column to be accessed by position
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// A DataFrame can also be used to create a temporary view within the SparkSession
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
// A SQL query can join a temporary view registered from a DataFrame with a Hive table
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|
// ...
// Create a Hive managed Parquet table, using HQL syntax instead of Spark SQL's native syntax
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save the DataFrame into the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
sql("SELECT * FROM hive_ints").show()
// +---+
// |key|
// +---+
// |  0|
// |  1|
// |  2|
// ...
// Turn on the flags for Hive dynamic partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table with the DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// The partition column 'key' is moved to the end of the schema
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// |  value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...
spark.stop()
Code 6.7
spark-shell --jars /usr/local/spark-2.3.0-bin-hadoop2.7/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar --driver-class-path /usr/local/spark-2.3.0-bin-hadoop2.7/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar
Code 6.8
ubuntu@ubuntu:~$ service mysql start
ubuntu@ubuntu:~$ mysql -u root -p
Code 6.9
mysql> create database student;
mysql> use student;
mysql> create table stu(id int(10) auto_increment not null primary key,name varchar(10),country varchar(20),Height Double,Weight Double);
mysql> describe stu;
+---------+-------------+------+-----+---------+----------------+
| Field   | Type        | Null | Key | Default | Extra          |
+---------+-------------+------+-----+---------+----------------+
| id      | int(10)     | NO   | PRI | NULL    | auto_increment |
| name    | varchar(10) | YES  |     | NULL    |                |
| country | varchar(20) | YES  |     | NULL    |                |
| Height  | double      | YES  |     | NULL    |                |
| Weight  | double      | YES  |     | NULL    |                |
+---------+-------------+------+-----+---------+----------------+
5 rows in set (0.00 sec)
mysql> insert into stu values(null,'MI','china','180','60');
mysql> insert into stu values(null,'UI','UK','160','50');
mysql> insert into stu values(null,'DI','UK','165','55');
mysql> insert into stu values(null,'Bo','china','167','45');
mysql> select * from stu;
+----+------+---------+--------+--------+
| id | name | country | Height | Weight |
+----+------+---------+--------+--------+
|  1 | MI   | china   |    180 |     60 |
|  2 | UI   | UK      |    160 |     50 |
|  3 | DI   | UK      |    165 |     55 |
|  4 | Bo   | china   |    167 |     45 |
+----+------+---------+--------+--------+
4 rows in set (0.00 sec)
Code 6.10
// Import the package needed to create a Properties object
scala> import java.util.Properties
import java.util.Properties
scala> val jdbcDF = spark.read
// Identify the data source being read as JDBC
.format("jdbc")
// The JDBC URL to connect to, where student is the database created above
.option("url","jdbc:mysql://localhost:3306/student")
// The driver option is the class name of the JDBC driver Spark SQL uses to access the database
.option("driver","com.mysql.jdbc.Driver")
// The dbtable option is the table to access: stu in the student database
.option("dbtable","stu")
// The user option is the account used to access the MySQL database
.option("user","root")
// The password option is that user's password
.option("password","mysql")
.load()
scala> jdbcDF.show()
+---+----+-------+------+------+
| id|name|country|Height|Weight|
+---+----+-------+------+------+
|  1|  MI|  china| 180.0|  60.0|
|  2|  UI|     UK| 160.0|  50.0|
|  3|  DI|     UK| 165.0|  55.0|
|  4|  Bo|  china| 167.0|  45.0|
+---+----+-------+------+------+
// Instantiate a Properties object and add the JDBC connection properties to it as key-value pairs
scala> val connectionProperties = new Properties()
connectionProperties: java.util.Properties = {}
// Add the user and password properties to the Properties object
scala> connectionProperties.put("user","root")
scala> connectionProperties.put("password","mysql")
// addstu is a DataFrame holding the rows to be written into the stu table
scala> val addstu = spark.read.json("/home/ubuntu/Desktop/stu.json")
scala> addstu.show()
+------+------+-------+----+
|Height|Weight|country|name|
+------+------+-------+----+
|   168|    48|  china|  Am|
|   189|    80|  Spain|  Bo|
+------+------+-------+----+
scala> addstu.write
.mode("append")
.format("jdbc")
.option("url", " jdbc:mysql://localhost:3306/student ")
.option("dbtable", "stu")
.option("user", "root")
.option("password", "mysql")
.save()
// As with reading a JDBC source, the connectionProperties object can also be passed to write.jdbc() to write the table
scala> addstu.write
.mode("append")
.jdbc("jdbc:mysql://localhost:3306/student","student.stu",connectionProperties)
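For large tables, the JDBC read can be split into parallel partitions by giving Spark a numeric partition column and its bounds. A sketch reusing the same connection settings; the bounds and partition count are illustrative values, not ones required by the example database:
scala> val partitionedJdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/student")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "stu")
.option("user", "root")
.option("password", "mysql")
// split the read into 4 tasks over the numeric id column, assumed here to lie in [1, 100]
.option("partitionColumn", "id")
.option("lowerBound", "1")
.option("upperBound", "100")
.option("numPartitions", "4")
.load()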
Code 6.11
mysql> select * from stu;
+----+------+---------+--------+--------+
| id | name | country | Height | Weight |
+----+------+---------+--------+--------+
|  1 | MI   | china   |    180 |     60 |
|  2 | UI   | UK      |    160 |     50 |
|  3 | DI   | UK      |    165 |     55 |
|  4 | Bo   | china   |    167 |     45 |
|  5 | Am   | china   |    168 |     48 |
|  6 | Bo   | Spain   |    189 |     80 |
+----+------+---------+--------+--------+
6 rows in set (0.00 sec)
Code 6.12
scala> case class student(name:String,age:Int,Height:Int,Weight:Int)
defined class student
scala> import spark.implicits._
import spark.implicits._
scala> val stuRDD = spark.sparkContext.textFile("/home/ubuntu/student.txt").map(_.split(",")).map(elements=>student(elements(0),elements(1).trim.toInt,elements(4).trim.toInt,elements(5).trim.toInt))
stuRDD: org.apache.spark.rdd.RDD[student] = MapPartitionsRDD[13] at map at <console>:28
scala> val stuDF = stuRDD.toDF()
stuDF: org.apache.spark.sql.DataFrame = [name: string, age: int ... 2 more fields]
scala> stuDF.createOrReplaceTempView("student")
scala> val stu_H_W = spark.sql("SELECT name,age,Height,Weight FROM student WHERE age BETWEEN 13 AND 19")
stu_H_W: org.apache.spark.sql.DataFrame = [name: string, age: int ... 2 more fields]
scala> stu_H_W.show()
+----+---+------+------+
|name|age|Height|Weight|
+----+---+------+------+
|  MK| 19|   166|    62|
|  CT| 18|   169|    60|
+----+---+------+------+
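Because the student case class is already defined, the same data can also be handled as a strongly typed Dataset rather than a DataFrame. A minimal sketch, assuming the stuDF from above:
scala> val stuDS = stuDF.as[student]
scala> stuDS.filter(s => s.age > 20).show()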
Code 6.13
// Import the Spark SQL data types package
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
// Import the Spark SQL Row package
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
// Create stuRDD
scala> val stuRDD = spark.sparkContext.textFile("/home/ubuntu/student.txt")
stuRDD: org.apache.spark.rdd.RDD[String] = /home/ubuntu/student.txt MapPartitionsRDD[19] at textFile at <console>:30
// The schema string
scala> val schemaString = "name age country"
schemaString: String = name age country
// Split the schema string on spaces into an array of strings, then wrap each element in a StructField object, yielding an Array[StructField]
scala> val fields = schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,nullable = true))
fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(name,StringType,true), StructField(age,StringType,true), StructField(country,StringType,true))
// Convert fields into a StructType object, forming a schema that can be used to build a DataFrame
scala> val schema = StructType(fields)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,StringType,true), StructField(country,StringType,true))
// Convert stuRDD (an RDD[String]) into an RDD[Row]
scala> val rowRDD = stuRDD.map(_.split(",")).map(elements => Row(elements(0),elements(1).trim,elements(2)))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[21] at map at <console>:32
// Apply the schema to rowRDD to complete the conversion to a DataFrame
scala> val stuDF = spark.createDataFrame(rowRDD,schema)
stuDF: org.apache.spark.sql.DataFrame = [name: string, age: string ... 1 more field]
// stuDF can now be operated on directly
scala> stuDF.show(9)
+----+---+--------+
|name|age| country|
+----+---+--------+
|  MI| 20|   china|
|  MU| 21|   Spain|
|  MY| 25|Portugal|
|  MK| 19|   Japan|
|  Ab| 24|  France|
|  Ar| 21|  Russia|
|  Ad| 20|  Geneva|
|  Am| 20|   china|
|  Bo| 20|   Spain|
+----+---+--------+
// stuDF can also be registered as the temporary view "student"; the sql interface then runs SQL
// expressions against it, and sql() again returns a DataFrame object
scala> stuDF.createOrReplaceTempView("student")
scala> val results = spark.sql("SELECT name,age,country FROM student WHERE age BETWEEN 13 and 19").show()
+----+---+-------+
|name|age|country|
+----+---+-------+
|  MK| 19|  Japan|
|  CT| 18| France|
+----+---+-------+
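The programmatic schema above declares every field as StringType. Numeric columns can be declared with their real types as long as the Row values are converted to match. A sketch reusing stuRDD and the field positions used above; the two-column schema is illustrative:
scala> val typedSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))
scala> val typedRowRDD = stuRDD.map(_.split(",")).map(e => Row(e(0), e(1).trim.toInt))
scala> val typedStuDF = spark.createDataFrame(typedRowRDD, typedSchema)
scala> typedStuDF.printSchema()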
Code 6.14
scala> df.agg("age" -> "mean","Height" -> "min","Weight" -> "max").show()
+-----------------+-----------+-----------+
|         avg(age)|min(Height)|max(Weight)|
+-----------------+-----------+-----------+
|20.90909090909091|        155|         85|
+-----------------+-----------+-----------+
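The same aggregation can be written with the functions in org.apache.spark.sql.functions, which also allows the result columns to be renamed. A sketch assuming the same df:
scala> import org.apache.spark.sql.functions.{avg, min, max}
scala> df.agg(avg("age").as("avg_age"), min("Height").as("min_Height"), max("Weight").as("max_Weight")).show()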
Code 6.15
scala> df("Height")
res11: org.apache.spark.sql.Column = Height
scala> df.apply("Weight")
res12: org.apache.spark.sql.Column = Weight
Code 6.16
scala> df.col("name")
res4: org.apache.spark.sql.Column = name
Code 6.17
scala> df.select(df("name"),df("Weight") as "Weight_KG").show(6)
+----+---------+
|name|Weight_KG|
+----+---------+
|  MI|       75|
|  MU|       70|
|  MY|       60|
|  MK|       62|
|  Ab|       80|
|  Ar|       60|
+----+---------+
only showing top 6 rows
scala> df.select(df("name"),df("Weight")*2 as "Weight_Jin").show(6)
+----+----------+
|name|Weight_Jin|
+----+----------+
|  MI|     150.0|
|  MU|     140.0|
|  MY|     120.0|
|  MK|     124.0|
|  Ab|     160.0|
|  Ar|     120.0|
+----+----------+
only showing top 6 rows
Code 6.18
scala> df.select("age").show(6)
+----+
|age|
+----+
| 20|
| 21|
| 25|
| 19|
| 24|
| 21|
+----+
only showing top 6 rows
scala> df.select("age").distinct.show()
+----+
|age|
+----+
| 19|
| 25|
| 24|
| 20|
| 21|
+----+
Code 6.19
scala> df.printSchema()
root
|-- Height: string (nullable = true)
|-- Weight: string (nullable = true)
|-- age: string (nullable = true)
|-- country: string (nullable = true)
|-- institute: string (nullable = true)
|-- name: string (nullable = true)
scala> df.drop("institute").printSchema()
root
|-- Height: string (nullable = true)
|-- Weight: string (nullable = true)
|-- age: string (nullable = true)
|-- country: string (nullable = true)
|-- name: string (nullable = true)
Code 6.20
scala> val newdf = spark.read.json("/home/ubuntu/newstudent.json")
newdf: org.apache.spark.sql.DataFrame = [Height: string, Weight: string ... 4 more fields]
scala> newdf.show(false)
+--------+----------+----+----------+-------------------------------------------------------------+-------+
|Height|Weight|age|country|institute |name|
+--------+----------+----+----------+-------------------------------------------------------------+-------+
|185 |75 |20 |china |computer science and technology department|MI |
+--------+----------+----+----------+-------------------------------------------------------------+-------+
scala> df.show(false)
+---------+---------+-----+----------+--------------------------------------------------------------+-------+
|Height|Weight|age|country |institute |name|
+---------+---------+-----+----------+--------------------------------------------------------------+-------+
|185 |75 |20 |china |computer science and technology department | MI |
|187 |70 |21 |Spain |medical college | MU |
|155 |60 |25 |Portugal|chemical engineering institude | MY |
|166 |62 |19 |Japan |SEM | MK |
|187 |80 |24 | France |school of materials | Ab |
|167 |60 |21 | Russia |school of materials | Ar |
|185 |75 |20 |Geneva |medical college | Ad |
|168 |48 |20 |china |computer science and technology department | Am |
|189 |80 |20 |Spain |chemical engineering institude | Bo |
|164 |55 |20 |china |SEM | By |
|195 |85 |20 |Japan |SEM | CY |
+---------+---------+-----+----------+--------------------------------------------------------------+-------+
scala> df.except(newdf).show(false)
+----------+----------+-----+----------+------------------------------------------------------------+-------+
| Height | Weight|age|country|institute |name|
+----------+----------+-----+----------+------------------------------------------------------------+-------+
|166 |62 |19 |Japan |SEM | MK |
|155 |60 |25 |Portugal|chemical engineering institude | MY |
|164 |55 |20 |china |SEM | By |
|195 |85 |20 |Japan |SEM | CY |
|187 |70 |21 |Spain |medical college | MU |
|168 |48 |20 |china |computer science and technology department | Am |
|187 |80 |24 | France |school of materials | Ab |
|185 |75 |20 |Geneva |medical college | Ad |
|189 |80 |20 |Spain |chemical engineering institude | Bo |
|167 |60 |21 | Russia |school of materials | Ar |
+----------+----------+-----+----------+------------------------------------------------------------+-------+
Code 6.21
scala> df.filter("age >24 ").show(false)
+---------+---------+----+-----------+-----------------------------------------+-------+
|Height|Weight|age|country |institute |name|
+---------+---------+----+-----------+-----------------------------------------+-------+
|155 |60 |25 |Portugal |chemical engineering institude |MY |
+---------+---------+----+-----------+-----------------------------------------+-------+
Code 6.22
scala> df.groupBy("country").agg("Height" -> "mean").show()
+--------+------------------+
| country|       avg(Height)|
+--------+------------------+
|  Russia|             167.0|
|  France|             187.0|
|   Spain|             188.0|
|  Geneva|             185.0|
|   Japan|             180.5|
|   china|172.33333333333334|
|Portugal|             155.0|
+--------+------------------+
Code 6.23
scala> df.groupBy("country").count().show()
+--------+-----+
| country|count|
+--------+-----+
|  Russia|    1|
|  France|    1|
|   Spain|    2|
|  Geneva|    1|
|   Japan|    2|
|   china|    3|
|Portugal|    1|
+--------+-----+
Code 6.24
scala> df.intersect(newdf).show(false)
+--------+----------+----+----------+-------------------------------------------------------------+-------+
|Height|Weight|age|country|institute |name|
+--------+----------+----+----------+-------------------------------------------------------------+-------+
|185 |75 |20 |china |computer science and technology department|MI |
+--------+----------+----+----------+-------------------------------------------------------------+-------+
Code 6.25
scala> df.limit(3).show(false)
+---------+---------+-----+----------+--------------------------------------------------------------+-------+
|Height|Weight|age|country |institute | name|
+---------+---------+-----+----------+--------------------------------------------------------------+-------+
|185 |75 |20 |china |computer science and technology department| MI |
|187 |70 |21 |Spain |medical college | MU |
|155 |60 |25 |Portugal|chemical engineering institude | MY |
+---------+----------+----+-----------+-------------------------------------------------------------+-------+
Code 6.26
scala> df.orderBy("age","Height").show(false)
+---------+---------+-----+----------+------------------------------------------------------------+-------+
|Height|Weight|age|country |institute |name|
+---------+---------+-----+----------+-------------------------------------------------------------+------+
|166 |62 |19 |Japan |SEM | MK |
|164 |55 |20 |china |SEM | By |
|168 |48 |20 |china |computer science and technology department| Am |
|185 |75 |20 |china |computer science and technology department| MI |
|185 |75 |20 | Geneva |medical college | Ad |
|189 |80 |20 |Spain |chemical engineering institude | Bo |
|195 |85 |20 | Japan |SEM | CY |
|167 |60 |21 |Russia |school of materials | Ar |
|187 |70 |21 |Spain |medical college | MU |
|187 |80 |24 |France |school of materials | Ab |
|155 |60 |25 |Portugal|chemical engineering institude | MY |
+---------+---------+-----+----------+-------------------------------------------------------------+------+
Code 6.27
scala> df.sort($"age".desc).show(false)
+---------+---------+----+-----------+------------------------------------------------------------+-------+
|Height|Weight|age|country |institute |name|
+---------+---------+----+-----------+------------------------------------------------------------+-------+
|155 |60 |25 |Portugal|chemical engineering institude |MY |
|187 |80 |24 |France |school of materials |Ab |
|187 |70 |21 |Spain |medical college |MU |
|167 |60 |21 |Russia |school of materials |Ar |
|185 |75 |20 |china |computer science and technology department|MI |
|185 |75 |20 | Geneva |medical college |Ad |
|164 |55 |20 |china |SEM |By |
|189 |80 |20 |Spain |chemical engineering institude |Bo |
|168 |48 |20 |china |computer science and technology department|Am |
|195 |85 |20 |Japan |SEM |CY |
|166 |62 |19 |Japan |SEM |MK |
+--------+---------+----+-------------+--------------------------------------------------------------+-------+
Code 6.28
scala> df.sample(true,0.5).show()
+------+------+---+-------+--------------------+----+
|Height|Weight|age|country|           institute|name|
+------+------+---+-------+--------------------+----+
|   187|    80| 24| France| school of materials|  Ab|
|   187|    80| 24| France| school of materials|  Ab|
|   189|    80| 20|  Spain|chemical engineer...|  Bo|
+------+------+---+-------+--------------------+----+
scala> df.sample(false,0.5).show()
+------+------+---+-------+--------------------+----+
|Height|Weight|age|country|           institute|name|
+------+------+---+-------+--------------------+----+
|   187|    80| 24| France| school of materials|  Ab|
|   168|    48| 20|  china|computer science ...|  Am|
|   164|    55| 20|  china|                 SEM|  By|
+------+------+---+-------+--------------------+----+
Code 6.29
scala> df.where("name = 'Ar'").show()
+------+------+---+-------+-------------------+----+
|Height|Weight|age|country|          institute|name|
+------+------+---+-------+-------------------+----+
|   167|    60| 21| Russia|school of materials|  Ar|
+------+------+---+-------+-------------------+----+
scala> df.where($"age">24).show()
+------+------+---+--------+--------------------+----+
|Height|Weight|age| country|           institute|name|
+------+------+---+--------+--------------------+----+
|   155|    60| 25|Portugal|chemical engineer...|  MY|
+------+------+---+--------+--------------------+----+
Code 6.30
scala> val joindf = spark.read.json("/home/ubuntu/Desktop/joininfo.json")
joindf: org.apache.spark.sql.DataFrame = [math_score: bigint, name: string]
scala> joindf.show()
+----------+----+
|math_score|name|
+----------+----+
|        90|  MI|
|        75|  MU|
|        90| OOO|
+----------+----+
scala> df.join(joindf,"name").show()
+----+------+------+---+-------+--------------------+----------+
|name|Height|Weight|age|country|           institute|math_score|
+----+------+------+---+-------+--------------------+----------+
|  MI|   185|    75| 20|  china|computer science ...|        90|
|  MU|   187|    70| 21|  Spain|     medical college|        75|
+----+------+------+---+-------+--------------------+----------+
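join defaults to an inner join, which is why students without a math score disappear from the result. Other join types can be requested explicitly; a sketch assuming the same df and joindf:
// Left outer join: rows of df with no match in joindf are kept, with math_score shown as null
scala> df.join(joindf, Seq("name"), "left_outer").select("name", "math_score").show()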
Code 6.31
scala> val na_df = spark.read.json("/home/ubuntu/exStudent.json")
na_df: org.apache.spark.sql.DataFrame = [Height: string, Weight: string ... 4 more fields]
scala> na_df.show()
+------+------+----+-------+---------+----+
|Height|Weight| age|country|institute|name|
+------+------+----+-------+---------+----+
|   185|    75|null|  china|     null|  MI|
|   187|    70|  21|  china|     null|  MU|
|  null|  null|null|   null|     null|null|
|   166|    62|  19|  Japan|      SEM|  MK|
|   164|    55|  20|  china|      SEM|  By|
|   195|    85|  20|  Japan|      SEM|  CY|
+------+------+----+-------+---------+----+
scala> na_df.na.drop().show()
+------+------+---+-------+---------+----+
|Height|Weight|age|country|institute|name|
+------+------+---+-------+---------+----+
|   166|    62| 19|  Japan|      SEM|  MK|
|   164|    55| 20|  china|      SEM|  By|
|   195|    85| 20|  Japan|      SEM|  CY|
+------+------+---+-------+---------+----+
scala> na_df.na.fill(Map(("age",0),("institute","jsj"),("Height","0"),("Weight","0"),("country","china"),("name","XXX"))).show()
+------+------+---+-------+---------+----+
|Height|Weight|age|country|institute|name|
+------+------+---+-------+---------+----+
|   185|    75|  0|  china|      jsj|  MI|
|   187|    70| 21|  china|      jsj|  MU|
|     0|     0|  0|  china|      jsj| XXX|
|   166|    62| 19|  Japan|      SEM|  MK|
|   164|    55| 20|  china|      SEM|  By|
|   195|    85| 20|  Japan|      SEM|  CY|
+------+------+---+-------+---------+----+
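Besides drop() and fill(), the na functions can also replace specific values in chosen columns. A minimal sketch assuming the same na_df; the replacement value is illustrative:
// Replace "china" in the country column with "China"
scala> na_df.na.replace("country", Map("china" -> "China")).show()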
Code 6.32
scala> df.collect()
res83: Array[org.apache.spark.sql.Row] = Array([185,75,20,china,computer science and technology department,MI], [187,70,21,Spain,medical college,MU], [155,60,25,Portugal,chemical engineering institude,MY], [166,62,19,Japan,SEM,MK], [187,80,24,France,school of materials,Ab], [167,60,21,Russia,school of materials,Ar], [185,75,20,Geneva,medical college,Ad], [168,48,20,china,computer science and technology department,Am], [189,80,20,Spain,chemical engineering institude,Bo], [164,55,20,china,SEM,By], [195,85,20,Japan,SEM,CY])
Code 6.33
scala> df.collectAsList()
res84: java.util.List[org.apache.spark.sql.Row] = [[185,75,20,china,computer science and technology department,MI], [187,70,21,Spain,medical college,MU], [155,60,25,Portugal,chemical engineering institude,MY], [166,62,19,Japan,SEM,MK], [187,80,24,France,school of materials,Ab], [167,60,21,Russia,school of materials,Ar], [185,75,20,Geneva,medical college,Ad], [168,48,20,china,computer science and technology department,Am], [189,80,20,Spain,chemical engineering institude,Bo], [164,55,20,china,SEM,By], [195,85,20,Japan,SEM,CY]]
Code 6.34
scala> df.count()
res85: Long = 11
Code 6.35
scala> df.describe("Height").show()
+-------+------------------+
|summary|            Height|
+-------+------------------+
|  count|                11|
|   mean| 177.0909090909091|
| stddev|13.232192149863495|
|    min|               155|
|    max|               195|
+-------+------------------+
Code 6.36
scala> df.first()
res87: org.apache.spark.sql.Row = [185,75,20,china,computer science and technology department,MI]
Code 6.37
scala> df.head(2)
res88: Array[org.apache.spark.sql.Row] = Array([185,75,20,china,computer science and technology department,MI], [187,70,21,Spain,medical college,MU])
Code 6.38
scala> df.show(2,false)
+---------+---------+----+-----------+-----------------------------------------------------------+-------+
|Height|Weight|age|country|institute |name|
+---------+---------+----+-----------+-----------------------------------------------------------+-------+
|185 |75 |20 |china |computer science and technology department|MI |
|187 |70 |21 |Spain |medical college | MU |
+--------+----------+----+-----------+-----------------------------------------------------------+-------+
only showing top 2 rows
scala> df.show(2,true)
+------+------+---+-------+--------------------+----+
|Height|Weight|age|country|           institute|name|
+------+------+---+-------+--------------------+----+
|   185|    75| 20|  china|computer science ...|  MI|
|   187|    70| 21|  Spain|     medical college|  MU|
+------+------+---+-------+--------------------+----+
only showing top 2 rows
Code 6.39
scala> df.take(2)
res91: Array[org.apache.spark.sql.Row] = Array([185,75,20,china,computer science and technology department,MI], [187,70,21,Spain,medical college,MU])
Code 6.40
// Read data in Parquet format
scala> val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
// Save in Parquet format
scala> usersDF.write.save("userInfo.parquet")
// A subset of the columns can also be selected and saved in Parquet format
scala> usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
Code 6.41
scala> val peopleDf = spark.read.format("json").load("/home/ubuntu/people.json")
peopleDf: org.apache.spark.sql.DataFrame = [age: string, institute: string ... 3 more fields]
scala> peopleDf.select("name","age").write.format("parquet").save("nameAndAgesInfo.parquet")
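save() fails by default when the target path already exists; a save mode can be supplied to control this behaviour. A sketch assuming the peopleDf from above:
scala> import org.apache.spark.sql.SaveMode
// Overwrite an existing directory; the other modes are Append, Ignore and ErrorIfExists (the default)
scala> peopleDf.select("name","age").write.mode(SaveMode.Overwrite).format("parquet").save("nameAndAgesInfo.parquet")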
Code 6.42
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
import scala.collection.mutable
import java.text.SimpleDateFormat
object SparkSQL01 {
def main(args: Array[String]): Unit = {
/**
* Create the SparkSession
*/
val spark = SparkSession
.builder()
.master("local")
.appName("test")
.config("spark.sql.shuffle.partitions", "5")
.getOrCreate()
/** ************************ student table schema *****************************/
val studentRDD = spark.sparkContext.textFile("/home/ubuntu01/SqlExample/student.txt")
val StudentSchema: StructType = StructType(mutable.ArraySeq( // Student table
StructField("Sno", StringType, nullable = false), // Student number
StructField("Sname", StringType, nullable = false), // Student name
StructField("Ssex", StringType, nullable = false), // Student gender
StructField("Sbirthday", StringType, nullable = true), // Student date of birth
StructField("SClass", StringType, nullable = true) // Student's class
))
val studentData = studentRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1),attributes(2),attributes(3),attributes(4)))
val studentDF = spark.createDataFrame(studentData,StudentSchema)
studentDF.createOrReplaceTempView("student")
/** ************************ teacher table schema *****************************/
val teacherRDD = spark.sparkContext.textFile("/home/ubuntu01/SqlExample/teacher.txt")
val TeacherSchema: StructType = StructType(mutable.ArraySeq( // Teacher table
StructField("Tno", StringType, nullable = false), // Teacher number (primary key)
StructField("Tname", StringType, nullable = false), // Teacher name
StructField("Tsex", StringType, nullable = false), // Teacher gender
StructField("Tbirthday", StringType, nullable = true), // Teacher date of birth
StructField("Prof", StringType, nullable = true), // Professional title
StructField("Depart", StringType, nullable = false) // Teacher's department
))
val teacherData = teacherRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1),attributes(2),attributes(3),attributes(4),attributes(5)))
val teacherDF = spark.createDataFrame(teacherData,TeacherSchema)
teacherDF.createOrReplaceTempView("teacher")
/** ************************ course table schema *****************************/
val courseRDD = spark.sparkContext.textFile("/home/ubuntu01/SqlExample/course.txt")
val CourseSchema: StructType = StructType(mutable.ArraySeq( // Course table
StructField("Cno", StringType, nullable = false), // Course number
StructField("Cname", StringType, nullable = false), // Course name
StructField("Tno", StringType, nullable = false) // Teacher number
))
val courseData = courseRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1),attributes(2)))
val courseDF = spark.createDataFrame(courseData,CourseSchema)
courseDF.createOrReplaceTempView("course")
/** ************************ score table schema *****************************/
val scoreRDD = spark.sparkContext.textFile("/home/ubuntu01/SqlExample/score.txt")
val ScoreSchema: StructType = StructType(mutable.ArraySeq( // Score table
StructField("Sno", StringType, nullable = false), // Student number (foreign key)
StructField("Cno", StringType, nullable = false), // Course number (foreign key)
StructField("Degree", IntegerType, nullable = true) // Score
))
// Degree is declared as IntegerType, so the string field must be converted to Int
val scoreData = scoreRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1),attributes(2).trim.toInt))
val scoreDF = spark.createDataFrame(scoreData,ScoreSchema)
scoreDF.createOrReplaceTempView("score")
/** ************************ Queries on the tables *****************************/
// Display all student records sorted by class in descending order
spark.sql("SELECT * FROM student ORDER BY SClass DESC").show()
//+-----+---------------+-----------+------------+---------+
//|Sno| Sname| Ssex|Sbirthday| SClass|
//+-----+---------------+-----------+------------+---------+
//|107| GuiGui| male| 1992/5/5| 95033|
//|108| ZhangSan| male| 1995/9/1| 95033|
//|106| LiuBing| female|1996/5/20| 95033|
//|105|KangWeiWei|female| 1996/6/1| 95031|
//|101| WangFeng| male| 1993/8/8| 95031|
//|109| DuBingYan| male|1995/5/21| 95031|
//+-----+---------------+-----------+--------------+---------+
// Query the Tname and Prof of teachers whose professional titles are not shared between the computer department and the electronic engineering department
spark.sql("SELECT tname, prof " +
"FROM Teacher " +
"WHERE prof NOT IN (SELECT a.prof " +
"FROM (SELECT prof " +
"FROM Teacher " +
"WHERE depart = 'department of computer' " +
") a " +
"JOIN (SELECT prof " +
"FROM Teacher " +
"WHERE depart = 'department of electronic engineering' " +
") b ON a.prof = b.prof) ").show(false)
//+-----------------+-------------------------+
//|tname |prof |
//+-----------------+-------------------------+
//|LinYu |Associate professor|
//|DuMei |Assistant professor|
//|RenLi |Lecturer |
//|GongMOMO|Associate professor|
//|DuanMu | Assistant professor|
//+-----------------+-------------------------+
// Display the number of records in the student table
println(studentDF.count())
//6
// Display the name and gender columns of the student table
studentDF.select("Sname","Ssex").show()
//+-----------------+---------+
//| Sname| Ssex|
//+-----------------+---------+
//| ZhangSan| male|
//|KangWeiWei|female|
//| GuiGui| male|
//| WangFeng| male|
//| LiuBing| female|
//| DuBingYan| male|
//+--------------+----------+
// Display the teachers whose gender is male
teacherDF.filter("Tsex = 'male'").show(false)
//+-----+---------+------+------------+-----------------------+-------------------------------------+
//|Tno|Tname |Tsex|Tbirthday| Prof | Depart |
//+-----+---------+------+------------+------------------------+------------------------------------+
//|825|LinYu|male|1958/1/1|Associate professor| department of computer|
//|888|RenLi |male|1972/5/1 |Lecturer |department of electronic engneering|
//|864|DuanMu|male|1985/6/1|Assistant professor|department of computer|
//+-----+---------+------+------------+------------------------+-------------------------------------+
// Display the distinct teacher departments
teacherDF.select("Depart").distinct().show(false)
//+------------------------------------------------+
//|Depart |
//+------------------------------------------------+
//|department of computer |
//|computer science department |
//|department of electronic engneering|
//+-------------------------------------------------+
// Display the student whose Sno is 101
studentDF.where("Sno = '101'").show()
//+-----+--------------+-----+-------------+--------+
//|Sno| Sname|Ssex|Sbirthday| SClass|
//+-----+--------------+-----+-------------+---------+
//|101|WangFeng|male|1993/8/8|95031|
//+-----+--------------+-----+-------------+---------+
// Display the teacher records as a List
println(teacherDF.collectAsList())
//[[825,LinYu,male,1958/1/1,Associate professor,department of computer],
//[804,DuMei,female,1962/1/1,Assistant professor,computer science department],
//[888,RenLi,male,1972/5/1,Lecturer,department of electronic engineering],
//[852,GongMOMO,female,1986/1/5,Associate professor,computer science department],
//[864,DuanMu,male,1985/6/1,Assistant professor,department of computer]]
// Query the name, sex and birthday of all female teachers and female students
spark.sql("SELECT sname, ssex, sbirthday " +
"FROM Student " +
"WHERE ssex = 'female' " +
"UNION " +
"SELECT tname, tsex, tbirthday " +
"FROM Teacher " +
"WHERE tsex = 'female'").show()
//+-----------------+----------+------------+
//| sname| ssex|sbirthday|
//+-----------------+----------+------------+
//| GongMOMO|female| 1986/1/5|
//|KangWeiWei|female| 1996/6/1|
//| LiuBing|female|1996/5/20|
//| DuMei|female| 1962/1/1|
//+----------------+----------+-------------+
}
}