- DataSet is a distributed collection of typed data.
- It needs an Encoder, which converts your Scala types to Spark's internal types. I know a DataFrame is just a Dataset[Row], but below I will treat them as two separate things, just to understand the subtle differences. In package.scala of sql => type DataFrame = Dataset[Row]
- Things to note below: when you read external data, the SparkSession read method still returns a DataFrameReader, so you first create a DataFrame and then use an Encoder (with as) to convert it to a Scala object (the Employee case class below).
bash-4.1$ hadoop fs -cat /user/loc1/test1/vin.txt
id,name,age,amt
1,vinyas,28,76.34
2,shetty,30,67.2
3,namratha,28,65.2
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> val sch = StructType(StructField("id",IntegerType)::StructField("name",StringType)::StructField("age",IntegerType)::StructField("amt",DoubleType)::Nil)
sch: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(age,IntegerType,true), StructField(amt,DoubleType,true))
scala> val df1 = spark.read.schema(sch).option("header",true).csv("hdfs://nn1/user/lg489741/test1/vin.txt")
scala> case class Employee(id:Int,name:String,age:Int,amt:Double)
defined class Employee
scala> val ds1 = df1.as[Employee]
ds1: org.apache.spark.sql.Dataset[Employee] = [id: int, name: string ... 2 more fields]
Now a DataSet will have all the methods that were available on a DataFrame, but with certain minor differences:
When you do operations like select, join, agg, explode, withColumn, withColumnRenamed, drop, describe, summary on a DataSet, it returns a DataFrame. It seems that whenever there is a possibility of the columns changing, such operations return a DataFrame, which makes sense since we don't know in advance what new structure we would get.
To convert that back to a DataSet you need to use an Encoder again:
scala> ds1.select($"name",$"age")
res40: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> case class Emp(name:String,age:Int)
defined class Emp
scala> ds1.select($"name",$"age").as[Emp]
res41: org.apache.spark.sql.Dataset[Emp] = [name: string, age: int]
The Encoder's field names and types should match the DataFrame columns, but you can have fewer fields and the order can also be different. We will talk about map further below. Note that the ordering and number of columns is different here:
scala> case class Emp1(id:Int,name:String)
defined class Emp1
scala> ds1.select($"name",$"age",$"id").as[Emp1]
res49: org.apache.spark.sql.Dataset[Emp1] = [name: string, age: int ... 1 more field]
scala> ds1.select($"name",$"age",$"id").as[Emp1].map(x=>x.id).show
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
+-----+
scala> ds1.select($"name",$"age",$"id").as[Emp1].map(x=>x.name).show
+--------+
| value|
+--------+
| vinyas|
| shetty|
|namratha|
+--------+
scala> ds1.select($"name",$"age",$"id").as[Emp1].map(x=>x.age).show
<console>:37: error: value age is not a member of Emp1
ds1.select($"name",$"age",$"id").as[Emp1].map(x=>x.age).show
^
We can also select on a DataSet and get back a DataSet, but this will have an Encoder of a Tuple:
scala> val ds99 = ds1.select($"name".as[String],$"age".as[Int]) //This typed select can go only up to 5 columns.
ds99: org.apache.spark.sql.Dataset[(String, Int)] = [name: string, age: int]
//Can still do this
scala> ds1.select($"name".as[String],$"age".as[Int]).select($"name")
res58: org.apache.spark.sql.DataFrame = [name: string]
//But not below since x is a tuple now
scala> ds99.map(x=>x.name)
<console>:37: error: value name is not a member of (String, Int)
ds99.map(x=>x.name)
^
scala> ds99.map(x=>x._1)
res61: org.apache.spark.sql.Dataset[String] = [value: string]
scala> ds99.map(x=>x._2)
res62: org.apache.spark.sql.Dataset[Int] = [value: int]
There is a special "joinWith" operator: joining two DataSets of type T and U with joinWith returns a DataSet[(T,U)].
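ds2 used below is never defined above; for this sketch assume it is just a second DataSet[Employee] read from the same file:
//Hypothetical: a second DataSet[Employee] built the same way as ds1
val ds2 = spark.read.schema(sch).option("header",true).csv("hdfs://nn1/user/lg489741/test1/vin.txt").as[Employee]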
scala> val j1 = ds1.joinWith(ds2,ds1("id") <=> ds2("id"))
j1: org.apache.spark.sql.Dataset[(Employee, Employee)] = [_1: struct<id: int, name: string ... 2 more fields>, _2: struct<id: int, name: string ... 2 more fields>]
//As you see it actually returns a DataSet of type Tuple2[T,U], where T is the type of DataSet1
//which is joined with DataSet2 of type U. The column names are _1 and _2. This is the same behaviour as above.
scala> j1.schema
res66: org.apache.spark.sql.types.StructType = StructType(StructField(_1,StructType(StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(age,IntegerType,true), StructField(amt,DoubleType,true)),false), StructField(_2,StructType(StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(age,IntegerType,true), StructField(amt,DoubleType,true)),false))
scala> j1.columns
res67: Array[String] = Array(_1, _2)
scala> j1.map(x=>x)
res68: org.apache.spark.sql.Dataset[(Employee, Employee)] = [_1: struct<id: int, name: string ... 2 more fields>, _2: struct<id: int, name: string ... 2 more fields>]
scala> j1.map(x=>x._1)
res69: org.apache.spark.sql.Dataset[Employee] = [id: int, name: string ... 2 more fields]
scala> j1.select($"_1"("name"))
res73: org.apache.spark.sql.DataFrame = [_1.name: string]
"joinWith" with two DataFrames (df1 and df2 are two DataFrames read the same way as above):
scala> val j2 = df1.joinWith(df2,df1("id") <=> df2("id"))
j2: org.apache.spark.sql.Dataset[(org.apache.spark.sql.Row, org.apache.spark.sql.Row)] = [_1: struct<id: int, name: string ... 2 more fields>, _2: struct<id: int, name: string ... 2 more fields>]
scala> j2.select($"_1"("name"))
res76: org.apache.spark.sql.DataFrame = [_1.name: string]
//Difference is that it returns a DataSet[(Row,Row)]
We can use all the map, flatMap, mapPartitions operations on a DataSet, but be careful about the return type (a mapPartitions sketch follows the flatMap example below).
//As you see it returns a DataSet[Int]
scala> ds1.map(r=>r.id+10)
res0: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds1.map(r=>(r.name,r.id+10))
res1: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int]
scala> ds1.map(r=>(r.name,r.id+10)).select($"_1",$"_2").show
+--------+---+
| _1| _2|
+--------+---+
| vinyas| 11|
| shetty| 12|
|namratha| 13|
+--------+---+
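Emp99 used in the next examples is never defined above; assume a simple case class like:
case class Emp99(id:Int,amt:Double)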
scala> ds1.map(r=>Emp99(r.id,r.amt+10))
res4: org.apache.spark.sql.Dataset[Emp99] = [id: int, amt: double]
scala> ds1.map(r=>Emp99(r.id,r.amt+10)).show
+---+-----+
| id| amt|
+---+-----+
| 1|86.34|
| 2| 77.2|
| 3| 75.2|
+---+-----+
scala> ds1.flatMap(r=>List(Emp99(r.id,r.amt+10)))
res9: org.apache.spark.sql.Dataset[Emp99] = [id: int, amt: double]
scala> ds1.flatMap(r=>List(Emp99(r.id,r.amt+10))).show
+---+-----+
| id| amt|
+---+-----+
| 1|86.34|
| 2| 77.2|
| 3| 75.2|
+---+-----+
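mapPartitions is mentioned above but not shown; a minimal sketch (reusing the assumed Emp99 class), where the function receives an Iterator per partition and must return an Iterator:
//Runs once per partition instead of once per element
ds1.mapPartitions(iter => iter.map(e => Emp99(e.id, e.amt + 10)))
//returns org.apache.spark.sql.Dataset[Emp99] = [id: int, amt: double]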
We can also persist/cache a DataSet:
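A minimal sketch of caching (same API as on a DataFrame):
ds1.cache()      //default storage level for a DataSet is MEMORY_AND_DISK
ds1.count()      //an action so the cache actually gets materialized
ds1.unpersist()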
We have a groupByKey operator on a DataSet:
ds.groupByKey(x=>x.name) //groups the DataSet by x.name
This returns a KeyValueGroupedDataset, which is different from what groupBy returns, i.e. a RelationalGroupedDataset.
import org.apache.spark.sql.types._
case class Employee(id:Int,name:String,amt:Double)
val sch = new StructType().add("id",IntegerType).add("name",StringType).add("amt",DoubleType)
val ds1 = spark.read.schema(sch).csv("/FileStore/tables/vin.csv").as[Employee]
ds1.show
import org.apache.spark.sql.functions._
ds1.groupByKey(x=>x.id).agg(sum($"amt").as("summed").as[Double]).show
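To make that difference concrete, a small sketch of the two return types (same ds1/Employee as in this snippet):
ds1.groupBy($"id")    //org.apache.spark.sql.RelationalGroupedDataset (untyped)
ds1.groupByKey(_.id)  //org.apache.spark.sql.KeyValueGroupedDataset[Int,Employee] (typed)
//The typed variant also supports functional aggregation, e.g. mapGroups:
ds1.groupByKey(_.id).mapGroups((id, emps) => (id, emps.map(_.amt).sum)) //Dataset[(Int, Double)]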
Aggregation :
scala> val ids = spark.range(10)
ids: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> ids.agg(sum($"id").as("sum"))
res0: org.apache.spark.sql.DataFrame = [sum: bigint]
/*
As you can see, we can run aggregation operators directly on the DataSet. If you look at the Dataset code,
the agg operator first does an empty groupBy() (i.e. no grouping columns, the whole DataSet is a single group) and then aggregates.
The actual agg implementation lives in the RelationalGroupedDataset class.
*/
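You can check that equivalence directly; both lines below produce the same DataFrame [sum: bigint]:
ids.agg(sum($"id").as("sum"))              //agg directly on the DataSet
ids.groupBy().agg(sum($"id").as("sum"))    //the empty groupBy() it delegates to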
Group By :
scala> ids.show
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+---+
scala> ids.groupBy(($"id" % 2).as("grp")).agg(sum("id")).show
//We can use an expression in groupBy and give it an alias, but that aliased column is NOT
//available for aggregation, see the next statement.
+---+-------+
|grp|sum(id)|
+---+-------+
| 0| 20|
| 1| 25|
+---+-------+
scala> ids.groupBy(($"id" % 2).as("grp")).agg(sum("grp")).show //This will throw an error
scala> ids.groupBy(($"id" % 2).as("grp")).sum("id").show //We can directly use sum,max,min,avg on a RelationalGroupedDataset
+---+-------+
|grp|sum(id)|
+---+-------+
| 0| 20|
| 1| 25|
+---+-------+
scala> ids.sum("id")
<console>:26: error: value sum is not a member of org.apache.spark.sql.Dataset[Long]
scala> val summ = ids.groupBy(($"id" % 2).as("grp")).agg(sum("id").as("summed"))
summ: org.apache.spark.sql.DataFrame = [grp: bigint, summed: bigint]
scala> ids.join(summ,$"id" % 2 <=> $"grp").show
+---+---+------+
| id|grp|summed|
+---+---+------+
| 0| 0| 20|
| 1| 1| 25|
| 2| 0| 20|
| 3| 1| 25|
| 4| 0| 20|
| 5| 1| 25|
| 6| 0| 20|
| 7| 1| 25|
| 8| 0| 20|
| 9| 1| 25|
+---+---+------+
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window
scala> val wndw = Window.partitionBy($"id" % 2)
wndw: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@3ad3b062
scala> ids.select($"id",$"id"%2,sum($"id").over(wndw).as("sum")).show
+---+--------+---+
| id|(id % 2)|sum|
+---+--------+---+
| 0| 0| 20|
| 2| 0| 20|
| 4| 0| 20|
| 6| 0| 20|
| 8| 0| 20|
| 1| 1| 25|
| 3| 1| 25|
| 5| 1| 25|
| 7| 1| 25|
| 9| 1| 25|
+---+--------+---+
//If we need to collect all elements from a groupBy
scala> ids.groupBy(($"id" % 2).as("grp")).agg(collect_list($"id"))
res45: org.apache.spark.sql.DataFrame = [grp: bigint, collect_list(id): array<bigint>]
scala> ids.groupBy(($"id" % 2).as("grp")).agg(collect_list($"id")).show
+---+----------------+
|grp|collect_list(id)|
+---+----------------+
| 0| [0, 2, 4, 6, 8]|
| 1| [1, 3, 5, 7, 9]|
+---+----------------+
scala> ids.groupBy(($"id" % 2).as("grp")).agg(collect_set($"id")).show
+---+---------------+
|grp|collect_set(id)|
+---+---------------+
| 0|[0, 2, 4, 6, 8]|
| 1|[1, 3, 5, 7, 9]|
+---+---------------+
There are 4 query plans in a DataSet: 3 are logical (parsed, analyzed, optimized) and 1 is physical.
Convert case classes to StructType :
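A quick way to see all four plans is explain(true), or the individual fields on queryExecution:
ds1.explain(true)                  //prints the Parsed/Analyzed/Optimized Logical Plans and the Physical Plan
ds1.queryExecution.logical         //parsed logical plan
ds1.queryExecution.analyzed        //analyzed logical plan
ds1.queryExecution.optimizedPlan   //optimized logical plan
ds1.queryExecution.executedPlan    //physical plan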
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType
val userschema = ScalaReflection.schemaFor[User].dataType.asInstanceOf[StructType]
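User is never defined above; for this to compile, assume a simple case class, after which the derived schema can be used like any hand-built one:
case class User(id:Int,name:String) //hypothetical, just for illustration
//userschema can then be passed to a reader just like sch above, e.g. spark.read.schema(userschema).csv(<path>)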