Spark SQL and its interfaces DataFrames and Datasets are the future of Spark performance. DataFrames and Datasets are the most important features in getting the best performance out of Spark for the structured data. These data structures use more efficient storage options and optimizers to give users the best performance.
SQL Engine was introduced in Spark 1.0, DataFrames in Spark 1.3 and Dataset in Spark 1.6.
Developers in Spark mostly use DataFrames/Datasets for all processing of data.
DataFrames and Datasets in Spark are higher-level APIs which internally use RDDs. Even though we can do anything we wanted to do with our data with RDDs, the higher-level APIs DataFrames and Datasets allow us to become proficient with Spark quicker especially if you have RDBMS and SQL background.
SparkSQL is the module for structured data processing with the added benefit of Schema for the data which we did not have for RDDs. Schema gives more information about the data which Spark is processing. Hence it can perform more optimizations on the data during the processing. And we can work on the data using interactive SQL queries which adhere to the 2003 ANSI SQL. It is also compatible with HIVE.
The other option for querying and processing data is the DataFrames. DataFrames have distributed a collection of row objects. A row is an object which contains the data and we can access each column of the data. So DataFrames can be thought of as a database table with the data organized in rows and columns. In Spark 2.0 the higher-level APIs were unified to Dataset. DataFrame can be thought of as a row of Dataset i.e Dataset[Row]. The DataFrames can be converted to RDDs and then back to DataFrames as and when required.
Querying DataFrames/Datasets is very easy. Querying DataFrames can be done using Domain Specific Language (DSL) and is very relational in nature. This allows Spark for optimizations.
The below diagram shows the steps in Query execution in SparkSQL/DataFrames/Datasets.
When a query is executed it is resolved into an unresolved logical plan. This means there are unresolved attributes and relations in the plan. So then it has to look into the catalog to fill in the missing information for the plan. This leads to the generation of a logical plan. Here a series of optimizations are performed which generates an optimized logical plan. This optimization engine in Spark is called Catalyst optimizer. The optimized plan is then converted to multiple physical plans where a Cost model is used to select an Optimal Physical Plan. This then gets into the final Code Generation step and then the final query is executed to generate the final output as RDDs.
Let’s look at how we can create DataFrames/Datasets or how to execute Spark SQL. We have seen how SparkContext can be created. In Spark 2.0 we have something called SparkSession which is a simplified entry point for Spark applications. SparkSession encapsulates the SparkContext. Earlier Spark had different contexts to use with different use cases like SQLContext, HiveContext, and SparkContext. All this is unified into SparkSession which simplifies things for developers as there is no confusion as to which context to use.
Another benefit of having SparkSession is unlike SparkContext we can create multiple SparkSessions when needed like below.
Let’s look at how we can create a DataFrame and query the data. DataFrames can be created by loading data from external files from the filesystem, HDFS, S3, RDBMS, HBase, etc. They can also be created from existing DataFrames by applying transformations. For simplicity we will create a DataFrameby following method as below:
We can collect() the RDD created from our DataFrame but we do not get to see what the DataFrame intended to give us. So we do a .show() which gives us a nice tabular view of our data. The other thing we can do is to check the schema using .schema or .printSchema().
If we just do a listDf, and press tab we can see all the methods available.
Also, since we did not provide any column names to our DataFrame we see _1,_2 as the default names. We can change it by giving proper names using toDF() function.
We can query the DataFrames similar to how we query a Table using SQL. This is very similar by using Domain-Specific Language in Spark. Below we can see the way it can be done.
Select * from Country
Select Id from Country
Select * from Country where Id = 1
listDF2.select("*").where(col("Id") === 1 ).show
The DataFrame can also be saved in a filesystem, HDFS, S3, etc. Here we just save it to file system.
Working with Spark SQL is very similar to working with DataFrames. The advantage is that we can use our familiar ANSI SQL queries instead of DSL which makes it very convenient and reduces the learning curve a lot. For that, we just need to register our DataFrame as a temporary table or a view and then we can run all our SQL queries. Let’s see how to do it below:
Let’s move on to Datasets. Datasets are very similar to DataFrames with the distinction that they have a strongly typed collection of objects. So they are type safe. This helps us catch some of the errors at compile time which is not possible with RDDs and DataFrames. And using Datasets is almost similar to using DataFrames like processing and querying we have seen earlier.
Now since we have understood all the three APIs Spark provides i.e. RDD, DataFrame, and Dataset, we should understand which one to use when and how is the performance of each of these APIs.
RDD: We should use RDDs in the following use cases
DataFrame: We should use DataFrames in the following use cases
We should remember that DataFrames and Datasets are unified API since Spark 2.0 and so most of the functionalities are now available in both.
Dataset: We should go with Dataset for the following reasons
We can summarise the above as below with performance as the parameter
In this section we looked at the higher level APIs and understood when and how to use them. We also looked at their performance comparison which gives us a clear picture about their usage.