PySpark is the Python API for Apache Spark, an open-source distributed computing framework. It helps build scalable analytics and data pipelines and speeds up processing, and it also serves as a library for large-scale, real-time data processing. In this article, we cover frequently asked PySpark interview questions to help you understand the kinds of questions you might face in a Python and PySpark interview. Whether you are a beginner, an intermediate, or an experienced PySpark professional, this guide will help you build your confidence and knowledge in PySpark. The questions cover important PySpark topics such as DataFrames, RDDs, serializers, cluster managers, SparkContext, and SparkSession. With these PySpark interview questions, you can be confident that you will be well prepared for your next interview. If you are looking to advance your career in PySpark, this guide is a good resource for you.
The PySpark framework is easy to learn and implement. No prior database experience is required, and it is especially easy to pick up if you already know a programming language and other frameworks. Before getting familiar with PySpark concepts, you should have some knowledge of Apache Spark and Python. Learning the advanced concepts of PySpark is also very helpful.
MLlib is used to implement machine learning in Spark. MLlib is Spark's scalable machine learning library. It is designed to make machine learning scalable and easy, and it covers common learning algorithms and use cases such as clustering, regression, collaborative filtering, and dimensionality reduction.
There are some basic differences between working in PySpark and working in a general-purpose programming language on its own. PySpark ships with built-in APIs for distributed processing, whereas other languages usually need third-party libraries to be integrated for this. Communication between nodes is handled implicitly by PySpark, so the developer does not have to code it by hand. PySpark follows the map-reduce model, so developers express their work with map and reduce functions. Finally, PySpark distributes work across multiple nodes, which a plain programming language does not do out of the box.
PySpark is not a good fit for small data sets. It will run on them, but the overhead of distributing the work outweighs the benefit, and a single-machine library is usually simpler and faster. PySpark is designed for large volumes of records, so it should be used for large data sets.
Data science relies heavily on Python and machine learning. PySpark is integrated with Python, and it offers interfaces and built-in environments that combine Python with machine learning. This makes PySpark an essential tool for data science. Once the dataset has been processed, the prototype model can be turned into a production-ready workflow. For these reasons, PySpark is important in data science.
Spark can create distributed datasets from any storage source supported by Hadoop, including the local file system, HDFS, Cassandra, HBase, and Amazon S3. Spark also supports text files, SequenceFiles, and any other Hadoop InputFormat.
This is a frequently asked question in PySpark Interview Questions. PySpark is the Python interface to Apache Spark, developed by the Apache Spark community so that Python and Spark can work together. It exposes a Python API that supports Spark features such as Spark SQL, DataFrames, Spark Streaming, Spark Core, and MLlib. It also provides an interactive PySpark shell for analyzing and processing structured and semi-structured data in distributed environments, along with a streamlined API for reading data from various data sources. PySpark is built on the Py4J library, which lets Python programs work with objects in the JVM; this is what allows users to manipulate Resilient Distributed Datasets (RDDs) from Python. Python also offers many libraries for big data processing and machine learning.
We can install PySpark from PyPI using the command below:
pip install pyspark
There are four main characteristics of PySpark. The detailed description of those features can be given as below:
In PySpark, RDD stands for Resilient Distributed Dataset. The RDD is the central data structure of PySpark. It is a low-level object that is very efficient at performing distributed tasks.
In PySpark, RDDs are collections of elements that can be processed on multiple nodes in parallel within a cluster. They are immutable, which means that once an RDD is created it cannot be changed. RDDs are also fault tolerant: in case of failure, they are restored automatically. Multiple operations can be applied to an RDD to accomplish a specific task.
SparkContext serves as the entry point for all Spark functionality. When a Spark application runs, the driver program starts, and its main function creates the SparkContext. The driver then runs operations inside executors on the worker nodes. In PySpark, SparkContext uses the Py4J library to launch a JVM and create a JavaSparkContext. In the PySpark shell, a SparkContext is already available as 'sc', so there is no need to create a new one.
PySpark SparkFiles is used to load files into Apache Spark applications. We add a file with sc.addFile(), a method of SparkContext, and then use SparkFiles.get() to resolve the path to a file added this way. The class methods of SparkFiles are getRootDirectory() and get(filename).
A must-know for anyone heading into a PySpark interview, this question is frequently asked in PySpark Interview Questions. In PySpark, serialization is an important part of performance tuning. Every object that is sent over the network, written to disk, or persisted in memory has to be serialized, so the choice of serializer affects performance. PySpark supports two serializers: PickleSerializer, which uses Python's pickle module and supports nearly any Python object, and MarshalSerializer, which is faster but supports fewer data types.
PySpark ArrayType is a collection data type that extends PySpark's DataType class, which is the superclass of all types. A PySpark ArrayType only contains elements of the same type. We create an instance with the ArrayType() constructor, which accepts two arguments: elementType and containsNull.
For example:
from pyspark.sql.types import StringType, ArrayType
# Array of strings whose elements may not be null
ArrayColumn = ArrayType(StringType(), False)
A PySpark DataFrame is a distributed collection of well-organized data, arranged in named columns just like a relational database table. PySpark DataFrames are better optimized than data frames in R or Python, and they can be created from a variety of sources such as Hive tables, structured data files, existing RDDs, and external databases.
The biggest advantage of PySpark DataFrame is that data in PySpark DataFrame is distributed to different computers in the cluster and operations performed on it are executed in parallel on all computers. This makes it easy to handle large collections of petabytes of structured or semi-structured data.
In PySpark, the cluster manager is a cluster-mode platform that facilitates Spark execution by allocating resources to worker nodes as needed.
The Spark Cluster Manager ecosystem includes a master node and multiple worker nodes. Master nodes, with the help of the cluster manager, provide resources, such as memory and processor allocation, to worker nodes according to the node's needs.
PySpark supports different cluster managers. Those can be explained as:
In PySpark, SparkSession is the entry point to the application. Before version 2.0, SparkContext served as the entry point. Since version 2.0, SparkSession has been the starting point for accessing all PySpark functionality related to RDDs, DataFrames, Datasets, and so on. It is also the unified API used in place of SQLContext, HiveContext, and the other separate contexts.
SparkSession internally creates a SparkContext and a SparkConf based on the details provided to it. We can create a SparkSession using the builder pattern.
The main function of Spark Core is to implement several important functions such as storage management, fault tolerance, job monitoring, job setup, and communication with storage systems. Additional libraries for streaming, machine learning, and SQL workloads are built on top of the core.
The Spark Core is used for the functions like:
A Spark program follows a common workflow. The first step is to create the input RDDs from external data, which can come from a variety of data sources. Next, depending on the business logic, transformation operations such as filter() and map() are applied to create new RDDs. If we need to reuse intermediate RDDs later, we can persist them. Finally, when an action such as first() or count() is called, Spark launches the parallel computation.
In PySpark, the startswith() and endswith() methods belong to the Column class. They are used to filter DataFrame rows by checking whether a column value starts or ends with a specific value.
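PySpark supports custom profilers. A profiler collects performance statistics about the Python code that runs on the workers, which helps when reviewing and tuning a job. A custom profiler must define or inherit the following methods: profile(), which produces a profile of some sort; stats(), which returns the collected statistics; dump(), which dumps the profiles to a path; and add(), which adds a profile to the existing accumulated profile.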
PySpark is generally faster than pandas on large datasets because it supports parallel execution in a distributed environment. For example, PySpark can spread work across many cores and machines, whereas pandas runs on a single machine. This is the main reason PySpark is faster than pandas for large workloads.
PySpark StorageLevel is used to control RDD storage. It can control how and where the RDD is stored. The PySpark StorageLevel decides whether the RDD is stored in memory, on disk, or both. It also determines if we need to replicate the RDD partitions or serialize the RDD. The code for PySpark StorageLevel looks like:
class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)
The most commonly used components of the Spark ecosystem are Spark SQL, Spark Streaming, GraphX, MLlib, and SparkR.
The main difference between get(filename) and getRootDirectory() is that get(filename) returns the absolute path of a file added using SparkContext.addFile(), whereas getRootDirectory() returns the root directory that contains the files added using SparkContext.addFile().
The Apache Spark execution engine is a graph execution engine that makes it easy for users to explore large datasets with high performance. If the data is manipulated over several stages of processing, keeping it in memory gives a radical performance boost.
Hive uses HQL (Hive Query Language), while Spark SQL uses Structured Query Language (SQL) to process and query data. We can easily connect SQL tables and HQL tables to Spark SQL. Spark SQL is a module that runs on top of the Spark Core engine and supports both SQL and Hive Query Language without changing the syntax.
Install PySpark using pip with the command pip install pyspark. Ensure Python and Java are installed on our system first.
PySpark itself is not designed for real-time processing but can handle near-real-time data through Spark Streaming, which processes data in micro-batches.
The default level of parallelism in PySpark depends on the cluster configuration but is typically set to the number of cores available on all executor nodes.
Use the read.json() method of the SparkSession object to read JSON files.
Example: spark.read.json("path_to_file.json").
Actions in PySpark are operations that trigger computation and return values.
Examples include count(), collect(), and first().
Transformations in PySpark are operations that create a new RDD from an existing one, such as map(), filter(), and reduceByKey().
Use the dropDuplicates() method on a DataFrame to remove duplicate rows.
Use the printSchema() method on a DataFrame to display its schema.
Use the withColumn() method.
For example: df.withColumn('new_column', expression).
Lazy evaluation in PySpark means that the execution will not start until an action is triggered. This optimization allows PySpark to run more efficiently.
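The idea can be sketched without a cluster. The following is a plain-Python analogy built on lazy iterators, not PySpark code, and the names log, double, doubled, and big are invented for illustration. Building the pipeline only describes the work; nothing runs until the final step consumes it, much as a PySpark action triggers the pending transformations.

```python
# Plain-Python analogy for lazy evaluation (not the PySpark API).
log = []  # records each element at the moment it is actually processed

def double(x):
    log.append(x)
    return x * 2

numbers = range(1, 6)

# "Transformations": building the pipeline does no work yet.
doubled = map(double, numbers)
big = filter(lambda v: v > 4, doubled)
work_done_before_action = len(log)

# "Action": consuming the pipeline triggers the computation.
result = list(big)

print(work_done_before_action)  # 0
print(result)                   # [6, 8, 10]
```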
PySpark can read several file formats including but not limited to JSON, CSV, Parquet, ORC, and text files.
Use the cache() method on a DataFrame to persist it in memory, which can speed up subsequent queries.
Use the coalesce() method on an RDD or DataFrame to reduce the number of partitions; unlike repartition(), it avoids a full shuffle.
A stage in Spark represents a set of tasks that perform the same computation but on different data partitions without shuffling data.
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
Use the write.csv() method.
Example: df.write.csv('path_to_save.csv').
DataFrames provide a higher level abstraction, allowing optimizations through Catalyst optimizer; they are faster and easier to use than RDDs for structured data.
Use the na functions in PySpark to handle null values. For instance, we can replace all nulls with a specific value using df.na.fill(value) or drop rows that contain any null values with df.na.drop().
There are some advantages as well as disadvantages of PySpark and this is another frequently asked question in the PySpark interview rounds. Let’s discuss them one by one.
Advantages of PySpark:
Disadvantages of PySpark:
A common question in the PySpark Interview Questions, don't miss this one. Let me tell you about the key differences between RDD, DataFrame and DataSet one by one.
Spark Core is the general execution engine for the Spark platform, on which all other functionality is built. It offers in-memory computing capabilities for superior speed, a generalized execution model to support a wide variety of applications, and Java, Scala, and Python APIs to simplify development.
The primary role of Spark Core is to perform all basic input/output functions, scheduling, monitoring, and so on. It is also responsible for fault recovery and effective memory management.
The key functions of the SparkCore can be listed as:
PySpark also provides a machine learning API called MLlib, which mirrors Apache Spark's MLlib. MLlib supports several families of machine learning algorithms. These algorithms are:
PySpark partitioning is a way to split a large dataset into smaller ones based on one or more partition keys. Execution is faster on partitioned data because the transformations for each partition run in parallel. PySpark supports both in-memory (DataFrame) and on-disk (file system) partitioning. When we create a DataFrame from a file or table, PySpark creates it in memory with a certain number of partitions based on the specified criteria.
We can also partition by multiple columns with the partitionBy() method by passing the columns we want to partition on as arguments. The syntax of this method is: partitionBy(self, *cols).
A common guideline is to have about 4x as many partitions as the number of cores in the cluster that the application can use.
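To make key-based partitioning and the 4x guideline concrete, here is a small plain-Python sketch. It is not the PySpark API: the function names suggested_partitions and partition_by_key and the sample records are hypothetical, and zlib.crc32 stands in for whatever hash function a real partitioner uses, chosen only because it is stable between runs.

```python
import zlib

def suggested_partitions(total_cores, factor=4):
    # Rule of thumb from the text: about 4 partitions per available core.
    return total_cores * factor

def partition_by_key(records, num_partitions):
    # Assign each (key, value) record to a bucket by hashing its key.
    buckets = [[] for _ in range(num_partitions)]
    for key, value in records:
        index = zlib.crc32(key.encode("utf-8")) % num_partitions
        buckets[index].append((key, value))
    return buckets

records = [("IN", 1), ("US", 2), ("IN", 3), ("UK", 4), ("US", 5)]
buckets = partition_by_key(records, 3)

print(suggested_partitions(8))  # 32
# Records that share a key always land in the same bucket.
for bucket in buckets:
    print(bucket)
```

Because every record with the same key goes to the same bucket, per-key work can proceed on each bucket independently, which is what allows partitions to be processed in parallel.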
There are some key advantages of PySpark RDD. Here is the detail explanation of those:
Nowadays, every industry makes use of big data to evaluate where they stand and grow. When we hear the term big data, Apache Spark comes to our mind. There are the industry benefits of using PySpark that supports Spark. These benefits can be listed as:
PySpark SparkConf is used to set the configuration and parameters required to run an application on a cluster or on the local system. The class has the following signature:
class pyspark.SparkConf(loadDefaults=True, _jvm=None, _jconf=None)
Where:
We can create DataFrame in PySpark by making use of the createDataFrame() method of the SparkSession. Let me explain with an example.
For example:
data = [('Raj', 20), ('Rahul', 20), ('Simran', 20)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data=data, schema=columns)
This will create a dataframe where columns will be Name and Age. The data will be filled in the columns accordingly.
Now, we can display the schema of the DataFrame with the df.printSchema() method. Because Python integers are inferred as long values, the output will look like:
>> df.printSchema()
root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
Yes, we can create a PySpark DataFrame from an external data source. Real-world applications read from external storage such as the local file system, HDFS, HBase, MySQL tables, S3, and Azure. The following example shows how to read data from a CSV file on the local system and create a DataFrame. PySpark supports many file formats such as csv, text, avro, parquet, and tsv.
For example:
df = spark.read.csv("path/to/filename.csv")
Expect to come across this popular question in PySpark Interview Questions. PySpark SQL is the structured data library for Spark. PySpark SQL carries more information about the structure of the data and the operations on it than the PySpark RDD API does. It comes with the "DataFrame" programming abstraction.
A DataFrame is an immutable distributed collection of data. DataFrames can handle large amounts of structured data, such as relational tables, and semi-structured data, such as JavaScript Object Notation (JSON). After creating the DataFrame, we can work with the data using SQL queries.
In PySpark SQL, the first step is to create a temporary view of the DataFrame using the createOrReplaceTempView() function. The view can then be queried in the SparkSession via the sql() method. Temporary views are dropped when the SparkSession is closed.
An example of PySpark SQL:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.sql("select 'spark' as hello")
df.show()
The lineage graph is a collection of RDD dependencies. Every Spark application has its own lineage graph. The lineage graph lets Spark recompute RDDs on demand and restore lost data from persisted RDDs. In other words, an RDD lineage graph lets us build a new RDD or recover the data of a lost persisted RDD. It is built by recording the transformations applied to the RDD, and it forms the logical execution plan.
The Catalyst Optimizer plays a very important role in Apache Spark. It improves structured queries, whether written in SQL or expressed through the DataFrame or Dataset APIs, by reducing program execution time and cost. The Spark Catalyst Optimizer supports both rule-based and cost-based optimization. Rule-based optimization applies a set of rules that define how a query is executed. Cost-based optimization uses rules to create multiple plans and calculates their costs. The Catalyst optimizer also helps with various big data challenges such as semi-structured data and advanced analytics.
PySpark is a Python API. It was created and distributed by the Apache Spark community to make working with Spark less complicated for Python programmers. Apache Spark itself is written in Scala, and it can work with other languages such as Java, R, and Python.
Because Scala is a compiled, type-safe language, Apache Spark has some capabilities that PySpark does not. One of them is Datasets. A Dataset is a strongly typed collection of domain-specific objects that can be processed in parallel.
In PySpark, Parquet is a columnar file format supported by many data processing systems. Spark SQL can both read and write Parquet files.
Columnar storage provides several benefits: it is compact and takes up less space; it makes it easy to fetch specific columns; it uses type-specific encoding; it gives better-summarized data; and it requires fewer input/output operations.
We can connect Spark to Mesos in a few steps.
First, configure the Spark driver program to connect to Mesos. The Spark binary package must be in a location accessible to Mesos. Then install Apache Spark in the same location as Apache Mesos and set the "spark.mesos.executor.home" property to point to the location where Spark is installed. This is how we can connect Spark with Apache Mesos.
In PySpark, DStream is short for Discretized Stream. It is a continuous sequence of RDDs, where each RDD holds the data from one small batch interval. It is also called the Apache Spark Discretized Stream. DStreams are built on Spark RDDs, which allows streaming to integrate seamlessly with other Apache Spark components such as Spark MLlib and Spark SQL.
The code to check whether a specific keyword exists looks like this:
lines = sc.textFile("hdfs://Hadoop/user/test_file.txt")

def isFound(line):
    # Return 1 if the keyword occurs in this line, otherwise 0
    if line.find("my_keyword") > -1:
        return 1
    return 0

foundBits = lines.map(isFound)
# Add up the per-line flags across all partitions
total = foundBits.reduce(lambda a, b: a + b)
if total > 0:
    print("Found")
else:
    print("Not Found")
If the keyword is found, the program prints "Found"; otherwise it prints "Not Found".
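The same map-then-reduce logic can be checked locally on an ordinary Python list before it is run against a file in HDFS. The sketch below uses only the standard library; the helper names is_found and contains_keyword and the sample lines are made up for illustration and are not part of PySpark.

```python
from functools import reduce

def is_found(line, keyword="my_keyword"):
    # 1 if the keyword occurs in the line, otherwise 0
    return 1 if keyword in line else 0

def contains_keyword(lines, keyword="my_keyword"):
    # map step: one flag per line; reduce step: add the flags together
    found_bits = map(lambda line: is_found(line, keyword), lines)
    return reduce(lambda a, b: a + b, found_bits, 0) > 0

sample = ["first line", "has my_keyword here", "last line"]
print("Found" if contains_keyword(sample) else "Not Found")       # Found
print("Found" if contains_keyword(["nothing"]) else "Not Found")  # Not Found
```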
There are two types of errors in Python: syntax errors and exceptions.
Syntax errors are often referred to as parsing errors. They are mistakes in the source code that stop the program from running at all. When the parser detects one, it repeats the offending line and displays an arrow pointing at the place in the line where the error was detected.
Exceptions occur when the normal flow of a running program is disrupted. Even if the program's syntax is correct, an error may still be encountered during execution; such an error is an exception. ZeroDivisionError, TypeError, and NameError are some examples of exceptions.
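A short sketch of the difference, using only built-in Python features: the string bad_source and the helper safe_divide are invented for this example. The syntax error is reported when the source is parsed, before anything runs, while the exception is raised during execution of otherwise valid code.

```python
# A syntax error is raised by the parser before any code runs.
bad_source = "if True print('hello')"  # the colon is missing
try:
    compile(bad_source, "<example>", "exec")
    syntax_ok = True
except SyntaxError:
    syntax_ok = False

# An exception occurs at run time in syntactically valid code.
def safe_divide(a, b):
    try:
        return a / b
    except ZeroDivisionError:
        return None

print(syntax_ok)          # False
print(safe_divide(6, 3))  # 2.0
print(safe_divide(1, 0))  # None
```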
Receivers are special objects in Apache Spark Streaming whose sole purpose is to consume data from various data sources and move it into Spark. Streaming contexts create receiver objects, which run as long-running tasks on different executors.
There are two types of receivers: reliable receivers, which send an acknowledgment to the source once the data has been received and stored in Spark, and unreliable receivers, which send no acknowledgment.
When data is unevenly distributed, PySpark has a few tricks up its sleeve to deal with it. It can shuffle data around to balance things out, sort of like how a teacher might rearrange students in a classroom to balance the teams. This helps everything run more smoothly and evenly.
Tungsten is all about making Spark run faster and use less memory. It does this by being really smart about how it handles data behind the scenes—think of it as the wizard who fine-tunes the engine of a car for better performance and efficiency.
In PySpark, the Catalyst optimizer takes care of making our queries run faster. It works like a smart planner that tweaks and rearranges the steps of our query to make it more efficient. It's like having an expert optimise our travel route, avoiding traffic jams and roadblocks.
Broadcasting is like giving a cheat sheet to all parts of our Spark program. Instead of sending data to each worker node every time it's needed, PySpark sends it once, and each node holds onto it. This is super handy for data that we need to reference often across many nodes because it saves a lot of time and network traffic.
Accumulators are like counters or tally marks that PySpark uses to keep track of information across tasks. For example, if we wanted to count how many times a certain word appears in documents across all nodes, we could use an accumulator to keep a running total even as our data is processed in parallel.
Whenever PySpark runs an operation such as filter(), map(), or reduce(), the function executes on remote nodes using copies of the variables shipped with the tasks. These copies are not reusable and cannot be shared across tasks, and changes to them are not sent back to the driver. To solve the problem of reuse and sharing, PySpark provides shared variables. There are two types of shared variables:
Broadcast Variables:
They are also known as read-only shared variables. These are used in cases of data lookup requests. These variables are cached and made available on all cluster nodes to use. These variables are not sent with every task. Instead, they are distributed to nodes using efficient algorithms to reduce communication costs. When we run an RDD task operation that uses Broadcast variables, PySpark does this:
Broadcast variables are created in PySpark using the broadcast(variable) method of the SparkContext class. The main reason for the use of broadcast variables is that the variables are not sent to the tasks when the broadcast function is called. They will be sent when the variables are first required by the executors.
Accumulator Variables:
These are also known as updatable shared variables. They are updated through associative and commutative operations and are used for counters and sums. PySpark supports numeric accumulators by default, and custom accumulator types can also be added. Accumulators may be named or unnamed.
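Why the operation has to be associative and commutative can be seen in a small plain-Python sketch. This is not the PySpark accumulator API; the partitions list is hypothetical data. Each task produces a partial result, and the partial results can be merged in whatever order they arrive without changing the total.

```python
from functools import reduce
from operator import add

# Hypothetical data already split into three partitions.
partitions = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]

# Each task computes a partial sum for its own partition.
partial_sums = [sum(p) for p in partitions]

# The partial results are merged. Because addition is associative
# and commutative, the arrival order does not matter.
total_in_order = reduce(add, partial_sums, 0)
total_reversed = reduce(add, reversed(partial_sums), 0)

print(partial_sums)    # [6, 9, 30]
print(total_in_order)  # 45
print(total_reversed)  # 45
```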
One of the most frequently posed PySpark Interview Questions, be ready for it. DAG stands for Directed Acyclic Graph. In Spark, the DAGScheduler is the scheduling layer that implements stage-oriented scheduling using jobs and stages. It transforms the logical execution plan (the lineage of transformations applied to the RDD) into a physical execution plan made up of stages. It computes the DAG of stages needed for each job, keeps track of which RDDs and stage outputs are already materialized, and finds a minimal schedule for running the job. The stages are then submitted to the TaskScheduler, which runs them.
The DAGScheduler computes the execution DAG for each job, determines the preferred locations for running each task, and handles failures caused by the loss of shuffle output files.
The DAGScheduler follows an event queue architecture. A thread posts events of type DAGSchedulerEvent, such as a new job or stage being submitted. The DAGScheduler then reads the events and executes the stages sequentially in topological order.
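Topological order simply means that a stage runs only after every stage it depends on has finished. A minimal sketch with Python's standard graphlib module (Python 3.9 or later) shows the idea; the stage names and their dependencies are hypothetical and do not come from a real Spark job.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical stages: each key depends on the stages in its set.
stage_dependencies = {
    "stage_0_read": set(),
    "stage_1_read": set(),
    "stage_2_join": {"stage_0_read", "stage_1_read"},
    "stage_3_aggregate": {"stage_2_join"},
}

# static_order() yields every stage after all of its dependencies.
execution_order = list(TopologicalSorter(stage_dependencies).static_order())
print(execution_order)
```

The two read stages have no dependency on each other, so either may come first; the join always follows both, and the aggregation always comes last.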
Let's understand the process through an example. Here, we want to capitalize the first letter of every word in a string. PySpark already offers the built-in initcap() function for this, but the task makes a simple illustration: we can create a UDF from a function capitalizeWord() and use it on DataFrames.
First, we will create a Python function capitalizeWord(). This function takes a string as an input and then capitalizes the first character of every word.
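def capitalizeWord(text):
    result = ""
    # Split on single spaces and capitalize the first character of each word
    words = text.split(" ")
    for word in words:
        result = result + word[0:1].upper() + word[1:len(word)] + " "
    # Drop the trailing space added by the loop
    return result.rstrip()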
Now, we will register the function as a PySpark UDF using the udf() function from the pyspark.sql.functions module, together with StringType from pyspark.sql.types. Both must be imported. udf() returns a user-defined function object that can be applied to DataFrame columns. The code for converting the function to a UDF is:
capitalizeWordUDF = udf(lambda z: capitalizeWord(z), StringType())
The next step is to use the UDF with a DataFrame. We can apply the UDF to a PySpark DataFrame column just like a built-in function. Suppose we have a DataFrame stored in the variable df with the columns ID_COLUMN and NAME_COLUMN. To capitalize the first character of every word, we write (col comes from pyspark.sql.functions):
df.select(col("ID_COLUMN"), capitalizeWordUDF(col("NAME_COLUMN")).alias("NAME_COLUMN")).show(truncate=False)
UDFs must be designed so that the algorithms are efficient and take up less time and space. If care is not taken, the performance of DataFrame operations will be affected.
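Because a UDF wraps an ordinary Python function, its logic can be tested locally before it is registered. The sketch below is a compact variant with the hypothetical name capitalize_words; it also returns None for a None input, since Spark passes null column values to a Python UDF as None.

```python
def capitalize_words(text):
    # Null column values reach a Python UDF as None, so pass them through.
    if text is None:
        return None
    # Upper-case the first character of each space-separated word.
    return " ".join(word[0:1].upper() + word[1:] for word in text.split(" "))

print(capitalize_words("hello spark world"))  # Hello Spark World
print(capitalize_words(None))                 # None
```

Building the result with join avoids the repeated string concatenation of the loop version, which keeps the per-row cost of the UDF low.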
We use the builder pattern to create a SparkSession. The pyspark.sql module contains the SparkSession class, whose builder has the getOrCreate() method. This method creates a new SparkSession if none exists, or else returns the existing SparkSession object. The code to create a SparkSession looks like:
import pyspark
from pyspark.sql import SparkSession
spark = (SparkSession.builder
    .master("local[1]")
    .appName('KnowledgeHuntSparkSession')
    .getOrCreate())
Where,
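If we want a new SparkSession object each time, we can call the newSession() method on an existing session. The new session shares the underlying SparkContext but has its own SQL configuration and temporary views. It looks like:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark_session = spark.newSession()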
In PySpark, there are various methods used to create RDD. Let’s discuss them one by one.
Using sparkContext.parallelize() - SparkContext's parallelize() method can be used to create RDDs. This method takes an existing collection from the driver and parallelizes it. This is the basic approach to creating an RDD and is used when the data is already present in memory. It also requires all the data to be present in the driver before the RDD is created. The code to create an RDD from a Python list using the parallelize method is:
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
rdd = spark.sparkContext.parallelize(data)
Using sparkContext.textFile() - We can read .txt file and convert it into RDD. The syntax looks like:
rdd_txt = spark.sparkContext.textFile("/path/to/textFile.txt")
Using sparkContext.wholeTextFiles() - This method returns a PairRDD, an RDD that contains key-value pairs. In this PairRDD, the file path is the key and the file content is the value. The method reads each entire file into the RDD as a single record. It can also read other text-based files, such as CSV or JSON files, as whole records.
rdd_whole_text = spark.sparkContext.wholeTextFiles("/path/to/textFile.txt")
Empty RDD with no partition using sparkContext.emptyRDD() - An RDD without any data is known as an empty RDD. We can create such an RDD, which has no partitions, with the emptyRDD() method. The code looks like:
empty_rdd = spark.sparkContext.emptyRDD()
Empty RDD with partitions using sparkContext.parallelize - When we need partitions but no data, we create an empty RDD with the parallelize method. For example, the code below creates an empty RDD with 10 partitions:
empty_partitioned_rdd = spark.sparkContext.parallelize([],10)
PySpark SQL is the most popular PySpark module for processing structured, columnar data. Once a DataFrame is created, we can work with the data using SQL syntax. Spark SQL lets us pass native SQL queries to Spark using select, where, group by, join, union, and so on. To use PySpark SQL, the first step is to create a temporary view of a DataFrame using the createOrReplaceTempView() method. Once created, the view is accessible within the SparkSession through the sql() function. When the SparkSession is terminated, the temporary view is dropped.
For example, suppose we have a DataFrame assigned to the variable df that contains the Name, Age, and Gender of students as columns. We create a temporary view of the DataFrame and then query it through the SparkSession with the sql() function. The SQL query is passed to that function as a string.
df.createOrReplaceTempView("STUDENTS")
df_new = spark.sql("SELECT * from STUDENTS")
df_new.printSchema()
The schema will be shown as:
>> df_new.printSchema()
root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
We can use join() method that is present in PySpark SQL. The syntax of the method looks like:
join(self, other, on = None, how = None)
Where, other is the DataFrame on the right side of the join, on is the join column name (or a list of names) or a join expression, and how is the join type, which defaults to inner.
where() and filter() methods can be attached to the join expression to filter rows. We can also perform multiple joins by chaining join() calls.
For example, consider two DataFrames named Employee and Department. Employee has the columns emp_id, emp_name and empdept_id, and Department has the columns dept_id and dept_name. We can inner join the Employee DataFrame with the Department DataFrame to get the department information along with the employee information. The code will look like:
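# show() returns None, so keep the joined DataFrame and display it in a separate step
emp_dept_df = empDF.join(deptDF, empDF.empdept_id == deptDF.dept_id, "inner")
emp_dept_df.show(truncate=False)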
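To see what the inner join keeps and drops, the same matching rule can be modelled in plain Python without a cluster. This is only a sketch: inner_join() is a hypothetical helper, not a PySpark function, and the employee and department rows are made-up sample data.

```python
def inner_join(left_rows, right_rows, left_key, right_key):
    """Plain-Python model of an inner join on left_key == right_key."""
    # Index the right side by its key so each left row can find its matches.
    index = {}
    for row in right_rows:
        index.setdefault(row[right_key], []).append(row)

    joined = []
    for row in left_rows:
        # Inner join: a left row with no match on the right is dropped.
        for match in index.get(row[left_key], []):
            joined.append({**row, **match})
    return joined


# Hypothetical sample data using the column names from the example above.
employees = [
    {"emp_id": 1, "emp_name": "Asha", "empdept_id": 10},
    {"emp_id": 2, "emp_name": "Ben", "empdept_id": 20},
    {"emp_id": 3, "emp_name": "Chen", "empdept_id": 99},  # no such department
]
departments = [
    {"dept_id": 10, "dept_name": "Finance"},
    {"dept_id": 20, "dept_name": "Marketing"},
]

emp_dept = inner_join(employees, departments, "empdept_id", "dept_id")
for row in emp_dept:
    print(row)
```

The employee whose empdept_id has no matching dept_id does not appear in the result, which is the behaviour to expect from the "inner" join type.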
PySpark Streaming is a scalable, fault-tolerant, high-throughput stream processing system that supports both streaming and batch workloads. It ingests real-time data from sources such as TCP sockets, S3, Kafka, Twitter, file system folders, etc. The processed data can be sent to live dashboards, Kafka, databases, HDFS, etc.
To stream from a TCP socket, we can use the readStream.format("socket") method of the Spark session object to read data from the TCP socket and provide the host and port of the streaming source as options. The code will look like:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SocketStream").getOrCreate()

# Read a stream of lines from the TCP socket; host and port are passed as options
df = (spark.readStream.format("socket")
      .option("host", "127.0.0.1")
      .option("port", 5555)
      .load())
df.printSchema()
Spark retrieves the data from the socket and represents it in the value column of the DataFrame object. After processing the data, the DataFrame can be streamed to the console or other destinations on demand such as Kafka, dashboards, databases, etc.
Spark automatically stores intermediate data from various shuffle processes. However, it is recommended to use RDD's persist() function. There are many levels of persistence for storing RDDs in memory, disk, or both, with varying levels of replication. The following persistence levels are available in Spark:
The persist() function has the following syntax for using persistence levels:
df.persist(StorageLevel.)
The streaming application must be available 24/7 and tolerant of errors outside the application code (e.g., system crashes, JVM crashes, etc.). The checkpointing process makes streaming applications more fault tolerant. We can store data and metadata in the checkpoint directory.
Checkpointing can be of two types: metadata checkpointing and data checkpointing.
A metadata checkpoint stores the information that defines a streaming computation in a fault-tolerant storage system such as HDFS. This helps the application recover after the node running the driver of the streaming application fails.
Data checkpointing means saving the generated RDDs to reliable storage. This type of checkpoint is required for stateful transformations that combine data from different batches.
We can determine the total number of unique words by following certain steps in PySpark. The steps to follow can be listed as:
Open the text file as an RDD.
lines = sc.textFile("hdfs://Hadoop/user/test_file.txt")
Then write a function that splits each line into words.
def toWords(line):
    return line.split()
Now, run the toWords function on every element of the RDD with flatMap().
words = lines.flatMap(toWords)
Next, generate a (key, value) pair for every word.
def toTuple(word):
    return (word, 1)
wordsTuple = words.map(toTuple)
Run reduceByKey() to add up the counts for each word.
def add(x, y):
    return x + y
counts = wordsTuple.reduceByKey(add)
Then print the result. Each word appears once as a key, so counts.count() gives the number of unique words.
counts.collect()
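The steps above can be checked without a cluster by mimicking them in plain Python. This is a sketch of the logic only: list comprehensions and functools.reduce stand in for flatMap(), map() and reduceByKey(), and sample_lines is made-up input rather than the HDFS file.

```python
from functools import reduce


def to_words(line):
    # Same role as toWords above: one line -> list of words.
    return line.split()


def count_words(lines):
    # "flatMap": flatten the per-line word lists into one sequence of words.
    words = [word for line in lines for word in to_words(line)]
    # "map": pair every word with the count 1.
    pairs = [(word, 1) for word in words]

    # "reduceByKey": add up the counts of all pairs that share a key.
    def merge(counts, pair):
        word, n = pair
        counts[word] = counts.get(word, 0) + n
        return counts

    return reduce(merge, pairs, {})


sample_lines = ["spark makes big data simple", "big data needs spark"]  # hypothetical input
counts = count_words(sample_lines)
unique_word_count = len(counts)  # one key per distinct word
print(counts)
print(unique_word_count)
```

Because every distinct word ends up as exactly one key, the number of keys is the number of unique words.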
The basis of Spark Streaming is dividing the content of the data stream into batches of X seconds; the resulting sequence of batches is known as a DStream. DStreams allow developers to cache data, which is especially useful if data from a DStream is used multiple times. The cache() function or the persist() method can be used to cache data with the appropriate persistence level. For input streams receiving data over the network from sources such as Kafka and Flume, the default persistence level replicates the data across two nodes for fault tolerance.
Cache method -
cacheDf = dframe.cache()
Persist method -
from pyspark import StorageLevel
persistDf = dframe.persist(StorageLevel.MEMORY_ONLY)
There are some benefits of Caching. The key benefits are time saving and cost efficiency. Since Spark computations are expensive, caching helps in data reuse, which leads to reuse of computations and reduces the cost of operations. By reusing calculations, we can save a lot of time. Worker nodes can execute/run more tasks by reducing the computation execution time. Hence more tasks can be achieved.
Spark RDD is extended with a robust API called GraphX that supports graphs and graph-based computations. The Resilient Distributed Property Graph is an extension of the Spark RDD: a directed multigraph that can have many parallel edges. User-defined properties are associated with each edge and vertex. Multiple relationships between the same pair of vertices are represented by parallel edges. GraphX offers a collection of operators that enable graph computations such as subgraph, mapReduceTriplets, joinVertices, and so on. It also offers many graph builders and algorithms to facilitate graph analysis.
Following the UNIX standard streams model, Apache Spark supports the pipe() function on RDDs, which allows us to assemble different parts of jobs that can use any language. The pipe() transformation passes each RDD element to an external process as a string. The process can change the elements as needed, and its output lines are returned as the strings of the resulting RDD.
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.master("local").appName("Modes of DataFrameReader").getOrCreate()
sc = spark.sparkContext

# Explicit schema: every column is read as a nullable string
schm = StructType([
    StructField("col_1", StringType(), True),
    StructField("col_2", StringType(), True),
    StructField("col", StringType(), True),
])

# DROPMALFORMED silently skips rows that do not match the schema
df = spark.read.option("mode", "DROPMALFORMED").csv("input1.csv", header=True, schema=schm)
df.show()
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("scenario based").getOrCreate()
sc = spark.sparkContext

# Read each line as a single string column named "value"
df = spark.read.text("input.csv")
df.show(truncate=0)

# The first line is the header; split it on the multi-character delimiter
header = df.first()[0]
schema = header.split('-|')

# Drop the header row, split the remaining rows, and apply the column names
df_input = df.filter(df['value'] != header).rdd.map(lambda x: x[0].split('-|')).toDF(schema)
df_input.show(truncate=0)
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode_outer, posexplode_outer, split

spark = SparkSession.builder.master("local").appName("scenario based").getOrCreate()
sc = spark.sparkContext

in_df = spark.read.option("delimiter", "|").csv("input4.csv", header=True)
in_df.show()

# One output row per comma-separated value in the Education column
in_df.withColumn("Qualification", explode_outer(split("Education", ","))).show()

# Same as above, but also keep the position of each value within the list
in_df.select("*", posexplode_outer(split("Education", ","))).withColumnRenamed("col", "Qualification").withColumnRenamed("pos", "Index").drop("Education").show()
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.master("local").appName("Modes of DataFrameReader").getOrCreate()
sc = spark.sparkContext

df1 = spark.read.option("delimiter", "|").csv("input.csv", header=True)
df2 = spark.read.option("delimiter", "|").csv("input2.csv", header=True)

# Add the missing Gender column so both DataFrames have the same columns.
# lit("null") stores the text "null"; use lit(None).cast("string") for a real null.
df_add = df1.withColumn("Gender", lit("null"))
df_add.union(df2).show()
For the Union-
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", StringType(), True),
    StructField("Gender", StringType(), True),
])

df3 = spark.read.option("delimiter", "|").csv("input.csv", header=True, schema=schema)
df4 = spark.read.option("delimiter", "|").csv("input2.csv", header=True, schema=schema)
df3.union(df4).show()
PySpark is great for big text projects like analysing tons of reviews or processing huge documents. We use things like RDDs or DataFrames to break down the text and analyze it—think word counts or searching for specific phrases. It's like having a super-powered text editor that can instantly handle and analyse entire libraries. Using functions like flatMap() for tokenization and reduceByKey() for counting words ensures efficient processing over clusters.
Window functions in PySpark help us make sense of data that's ordered or grouped in some way. For example, if we’re looking at sales data, we could use a window function to calculate a running total of sales or average sales over a specific period directly within our dataset. It's a bit like having a spreadsheet that automatically updates those figures as we add new data. We can use the over() function to specify a window partitioned by one column, ordered by another, and then apply aggregation functions like sum() or avg() over this window.
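A running total over a window can be modelled in plain Python to show what "partitioned by one column, ordered by another" means. This is a sketch under assumptions: running_total() is a hypothetical helper that imitates applying sum() over such a window, and the sales rows are made-up data.

```python
from itertools import groupby
from operator import itemgetter


def running_total(rows, partition_key, order_key, value_key):
    """Model of sum(value) over a window partitioned and ordered by the given keys."""
    # Sort by partition first, then by the ordering column inside each partition.
    ordered = sorted(rows, key=itemgetter(partition_key, order_key))
    result = []
    for _, group in groupby(ordered, key=itemgetter(partition_key)):
        total = 0  # the running total restarts for every partition
        for row in group:
            total += row[value_key]
            # Unlike a group-by, every input row is kept and gains a new column.
            result.append({**row, "running_total": total})
    return result


# Hypothetical sales data.
sales = [
    {"region": "east", "day": 2, "amount": 50},
    {"region": "west", "day": 1, "amount": 70},
    {"region": "east", "day": 1, "amount": 100},
    {"region": "west", "day": 2, "amount": 30},
]

with_totals = running_total(sales, "region", "day", "amount")
for row in with_totals:
    print(row)
```

The key difference from an ordinary aggregation is visible in the output: the number of rows does not shrink, and each row carries the aggregate computed over its own window.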
Optimising PySpark applications involves several strategies like choosing the right data abstraction (RDD, DataFrame, or DataSet), tuning resource allocation (e.g., memory and cores), and optimising data partitioning. Additionally, minimising data shuffling and using broadcast variables and accumulators wisely can significantly improve performance.
GraphFrames are an enhanced version of DataFrames specifically designed for graph processing in PySpark. They allow us to perform advanced graph operations like finding the shortest paths, calculating PageRank, or running breadth-first searches. Since GraphFrames are integrated with DataFrames, we can seamlessly create, query, and manage graphs while also working with regular tabular data. Imagine trying to figure out the shortest path in a network of roads or ranking websites by importance. GraphFrames build on DataFrames to let us handle these tasks efficiently in PySpark, combining graph algorithms with the ease of SQL-like operations.
Dynamic resource allocation allows PySpark to automatically adjust the number of executors based on workload. This is particularly beneficial in cloud environments where computing resources are elastic. By scaling up resources when demand spikes and scaling down when less compute power is needed, it optimises costs and ensures efficient processing. Dynamic resource allocation is like having an elastic band that stretches resources when needed and contracts when things are quiet. In cloud environments, where we pay for what we use, PySpark can automatically scale up to handle big jobs and scale down when the job's done, saving us money and improving processing efficiency.
MLlib in PySpark is like a treasure chest of machine learning tools. It’s built right into PySpark, so it lets us run algorithms like classification, regression, clustering, and collaborative filtering, as well as feature transformation. MLlib is PySpark's machine learning library that makes practical machine learning scalable and easy. Using MLlib in PySpark allows data scientists to leverage Spark’s distributed computing power to run algorithms faster across clusters. It's perfect for when we need to uncover insights from large datasets quickly.
While PySpark is powerful, it has limitations such as higher memory consumption for certain operations and less efficiency for complex iterative algorithms that do not map well to its execution model. Additionally, its Python API can run slower than the Scala API due to the overhead of Py4J that translates Python API calls to JVM operations. PySpark isn’t perfect—it can be a bit of a memory hog and sometimes slower, especially if we’re using Python instead of Scala. It's also not the best fit for very complex iterative algorithms where we keep looping over the same data. But for many big data tasks, it's still a solid choice.
Keeping our data clean and consistent in PySpark involves setting up checks right from the start. I would use built-in tools to filter out duplicates, correct errors, and make sure everything looks right. Utilising DataFrame API’s built-in functions like dropDuplicates(), filter(), and where() help maintain data consistency. Additionally, writing unit tests and using data validation frameworks can further ensure the quality and consistency of the data processed. Think of it as proofreading our data before it goes out into the world.
Speculative execution is like having a backup plan for slow tasks in PySpark. If certain tasks are straggling, for example because they run on slow nodes, PySpark launches duplicate copies of those tasks on other nodes. Whichever copy finishes first is used, and this can significantly improve performance by reducing delays caused by slow nodes.
StatFunctions in PySpark are part of the DataFrame API and provide statistical methods to quickly summarise data. For example, we can use df.stat.corr('column1', 'column2') to calculate the correlation between two columns. It’s handy for exploratory data analysis, like when we’re trying to find relationships between variables or just get a quick overview of our data’s distribution and trends. It’s like having a fast, built-in data scientist to help us understand large datasets more intuitively.
PySpark can run on small data sets, but it is rarely worthwhile. The overhead of starting a distributed job and scheduling tasks outweighs the benefit, and a single-machine library is usually simpler and faster. PySpark is great for large amounts of records, so it should be used for large data sets.
Data science relies heavily on Python and machine learning. PySpark is integrated with Python and provides interfaces and built-in libraries for machine learning, which makes it an essential tool for data science. Once the dataset is processed, the prototype model can be turned into a production-ready workflow. For these reasons, PySpark is important in data science.
Spark can form distributed datasets from any storage source supported by Hadoop, including the local file system, HDFS, Cassandra, HBase, and Amazon S3.
Spark also supports text files, SequenceFiles, and any other Hadoop InputFormat.
This is a frequently asked question in PySpark Interview Questions. PySpark is the Python interface to Apache Spark, developed by the Apache Spark community to help Python and Spark work together. It exposes Spark through an API written in Python and supports features such as Spark SQL, Spark DataFrame, Spark Streaming, Spark Core, and Spark MLlib. It provides an interactive PySpark shell for analyzing and processing structured and semi-structured data in distributed environments, with a streamlined API for reading data from various data sources. PySpark communicates with the JVM through the Py4J library, which allows users to manipulate Resilient Distributed Datasets (RDDs) from the Python programming language. Python also offers many libraries that support big data processing and machine learning.
We can install PySpark from PyPI using the below command:
pip install pyspark
There are four main characteristics of PySpark. The detailed description of those features can be given as below:
In PySpark, the full form of RDD is Resilient Distributed Datasets. This RDD is the central data structure of PySpark. RDD is a low-level object that is very efficient at performing distributed tasks.
In PySpark, RDDs are collections of elements that can be processed on multiple nodes to perform parallel processing within a cluster. They are immutable, which means that once an RDD is created it cannot be changed. RDDs are also fault tolerant: in case of failure, lost partitions are automatically recomputed. Multiple operations can be applied to an RDD to accomplish a specific task.
SparkContext serves as the entry point for all Spark functions. When the Spark application runs, the driver program starts, and its main function creates the SparkContext. The driver program then executes operations inside the executors on the worker nodes. In PySpark, SparkContext is known as the PySpark SparkContext. Using the Py4J library, it starts a JVM and creates a JavaSparkContext. In the PySpark shell, the SparkContext is available as ‘sc’ by default, so there is no need to create a new one there.
PySpark SparkFiles is used to load files into Apache Spark applications. Files are added with sc.addFile(), a method of SparkContext, and SparkFiles is then used to resolve the paths to those files with SparkFiles.get(). The class methods of SparkFiles are getRootDirectory() and get(filename).
A must-know for anyone heading into a PySpark interview, this question is frequently asked in PySpark Interview Questions. In PySpark, serialization is used for performance tuning. PySpark needs serializers because data is continuously sent over the network and written to or read from disk or memory. PySpark supports two serializers: PickleSerializer, the default, which handles almost any Python object, and MarshalSerializer, which is faster but supports fewer data types.
PySpark ArrayType is a collection data type that extends PySpark’s DataType class, which is the superclass of all types. A PySpark ArrayType only contains elements of the same type. We can create an instance of ArrayType using the ArrayType() constructor, which accepts two arguments: elementType and containsNull.
For example:
from pyspark.sql.types import StringType, ArrayType
ArrayColumn = ArrayType(StringType(), False)
A PySpark DataFrame is a distributed collection of well-organized data. It is just like a relational database table, arranged in named columns. PySpark DataFrames are better optimized than data frames in R or Python, and they can be created from a variety of sources such as Hive tables, structured data files, existing RDDs and external databases.
The biggest advantage of PySpark DataFrame is that data in PySpark DataFrame is distributed to different computers in the cluster and operations performed on it are executed in parallel on all computers. This makes it easy to handle large collections of petabytes of structured or semi-structured data.
In PySpark, the cluster manager is a cluster-mode platform that facilitates Spark execution by allocating resources to worker nodes as needed.
The Spark Cluster Manager ecosystem includes a master node and multiple worker nodes. Master nodes, with the help of the cluster manager, provide resources, such as memory and processor allocation, to worker nodes according to the node's needs.
PySpark supports different cluster managers. Those can be explained as:
In PySpark, SparkSession is the application entry point. Before version 2.0, PySpark used SparkContext as the entry point. Since version 2.0, SparkSession serves as the starting point for accessing all PySpark functionality related to RDDs, DataFrames, Datasets, etc. It is also the unified API that replaces the separate SQLContext, StreamingContext, HiveContext, and other contexts.
SparkSession internally creates a SparkContext and a SparkConf according to the details provided to the SparkSession. We can create a SparkSession using the builder pattern.
The main function of Spark Core is to implement several important functions such as storage management, fault tolerance, job monitoring, job scheduling, and communication with storage systems. Additional libraries for streaming, machine learning, and SQL workloads are built on top of this core layer.
The Spark Core is used for the functions like:
A Spark program follows a common workflow. The first step is to create the input RDDs from external data, which can come from a variety of data sources. Next, depending on the business logic, RDD transformation operations such as filter() and map() are applied to create new RDDs. If we need to reuse intermediate RDDs later, we can persist them. Finally, when an action such as first() or count() is called, Spark starts the parallel computation.
In PySpark, the startswith() and endswith() methods belong to the Column class. These methods are used to search DataFrame rows by checking whether the column value starts or ends with a specific value. Both are used to filter data in our application.
PySpark supports custom profilers. A profiler collects performance statistics about the Python code that a job executes, which helps to find slow parts of an application. If we want a custom profiler, we will need to define some methods. These methods are:
For large data sets, yes, PySpark is faster than Pandas, because it supports parallel execution of statements in a distributed environment. For example, PySpark can run on many cores and machines, whereas Pandas runs on a single machine. This is the main reason PySpark is faster than Pandas on large data.
PySpark StorageLevel is used to control RDD storage. It can control how and where the RDD is stored. The PySpark StorageLevel decides whether the RDD is stored in memory, on disk, or both. It also determines whether the RDD partitions are replicated and whether the RDD is serialized. The signature of PySpark StorageLevel looks like:
class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)
The most used components of the Spark ecosystem include Spark SQL, Spark Streaming, GraphX, MLlib, SparkR, etc.
The main difference between get(filename) and getRootDirectory() is that get(filename) returns the absolute path to a file that was added using SparkContext.addFile(). On the other hand, getRootDirectory() returns the root directory containing the files added using SparkContext.addFile().
The Apache Spark execution engine is a graph execution engine that makes it easy for users to explore large datasets with high performance. If the data is manipulated at several stages of processing, keeping it in memory gives a substantial performance boost.
Hive uses HQL (Hive Query Language), and Spark SQL uses Structured Query Language to process and query data. We can easily connect SQL tables and HQL tables to Spark SQL. Spark SQL runs as a dedicated component on the Spark Core engine that supports both SQL and Hive Query Language without changing the syntax.
Install PySpark using pip with the command pip install pyspark. Ensure Python and Java are installed on our system first.
PySpark itself is not designed for real-time processing but can handle near-real-time data through Spark Streaming, which processes data in micro-batches.
The default level of parallelism in PySpark depends on the cluster configuration but is typically set to the number of cores available on all executor nodes.
Use the read.json() method of the SparkSession object to read JSON files.
Example: spark.read.json("path_to_file.json").
Actions in PySpark are operations that trigger computation and return values.
Examples include count(), collect(), and first().
Transformations in PySpark are operations that create a new RDD from an existing one, such as map(), filter(), and reduceByKey().
Use the dropDuplicates() method on a DataFrame to remove duplicate rows.
Use the printSchema() method on a DataFrame to display its schema.
Use the withColumn() method.
For example: df.withColumn('new_column', expression).
Lazy evaluation in PySpark means that the execution will not start until an action is triggered. This optimization allows PySpark to run more efficiently.
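Python generators behave in a similar way and make the idea easy to observe. The sketch below is an analogy in plain Python, not PySpark code: traced_map() and traced_filter() are hypothetical helpers that play the role of transformations, and list() plays the role of an action.

```python
log = []  # records every element that is actually processed


def traced_map(func, items):
    # Generator: nothing runs until a consumer asks for values.
    for item in items:
        log.append(f"map {item}")
        yield func(item)


def traced_filter(pred, items):
    for item in items:
        log.append(f"filter {item}")
        if pred(item):
            yield item


numbers = range(1, 6)
doubled = traced_map(lambda x: x * 2, numbers)   # "transformation": only describes the work
large = traced_filter(lambda x: x > 4, doubled)  # still nothing has been executed
calls_before_action = len(log)

result = list(large)  # "action": forces the whole pipeline to run
calls_after_action = len(log)

print(calls_before_action, calls_after_action, result)
```

No element is touched while the pipeline is being built; all the work happens when the result is requested, which is what gives an engine the chance to plan the whole chain before running it.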
PySpark can read several file formats including but not limited to JSON, CSV, Parquet, ORC, and text files.
Use the cache() method on a DataFrame to persist it in memory, which can speed up subsequent queries.
Use the coalesce() method on an RDD or DataFrame to reduce the number of partitions. Unlike repartition(), it merges existing partitions and so avoids a full shuffle.
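The idea of reducing partitions by merging them can be sketched in plain Python. This is a simplified model, not Spark's implementation: the coalesce() function below is hypothetical and assigns whole partitions to targets in round-robin order, whereas Spark also takes data locality into account.

```python
def coalesce(partitions, num_partitions):
    """Model of reducing the partition count by merging whole partitions."""
    # Asking for at least as many partitions as already exist changes nothing.
    if num_partitions >= len(partitions):
        return [list(part) for part in partitions]

    merged = [[] for _ in range(num_partitions)]
    for i, part in enumerate(partitions):
        # A whole partition is appended to one target partition;
        # its records are never split up and redistributed one by one.
        merged[i % num_partitions].extend(part)
    return merged


partitions = [[1, 2], [3], [4, 5, 6], [7]]  # hypothetical data in 4 partitions
fewer = coalesce(partitions, 2)
print(fewer)
```

Because each original partition stays in one piece, no record-level redistribution is needed, which is why merging is cheaper than a full repartitioning.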
A stage in Spark represents a set of tasks that perform the same computation but on different data partitions without shuffling data.
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
Use the write.csv() method.
Example: df.write.csv('path_to_save.csv').
DataFrames provide a higher level abstraction, allowing optimizations through Catalyst optimizer; they are faster and easier to use than RDDs for structured data.
Use the na functions in PySpark to handle null values. For instance, we can replace all nulls with a specific value using df.na.fill(value) or drop rows that contain any null values with df.na.drop().
There are some advantages as well as disadvantages of PySpark and this is another frequently asked question in the PySpark interview rounds. Let’s discuss them one by one.
Advantages of PySpark:
Disadvantages of PySpark:
A common question in the PySpark Interview Questions, don't miss this one. Let me tell you about the key differences between RDD, DataFrame and DataSet one by one.
SparkCore is the general execution engine of the Spark platform, on which all other features are built. It offers in-memory computing capabilities for superior speed, a generalized execution model to support a wide variety of applications, and Java, Scala, and Python APIs to simplify development.
The primary role of SparkCore is to perform all basic Input/Output functions, scheduling, monitoring, etc. It is also responsible for fault recovery and effective memory management.
The key functions of the SparkCore can be listed as:
PySpark also provides a machine learning API called MLlib, which mirrors the MLlib API of Apache Spark. MLlib supports machine learning algorithms. These algorithms are:
PySpark Partition is a way to divide a large dataset into smaller datasets based on one or more partition keys. Execution speed improves because transformations on the individual partitions are performed in parallel. PySpark supports both in-memory (DataFrame) and disk (filesystem) partitioning. When we create a DataFrame from a file or table, PySpark creates the DataFrame in memory with a certain number of partitions based on the specified criteria.
We can also partition by multiple columns with the partitionBy() method by passing the columns we want to partition on as arguments. The syntax of this method is: partitionBy(self, *cols).
A common rule of thumb is to have about 4 partitions for every core in the cluster that the application can use.
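As a quick worked example of that rule, the arithmetic can be written as a small helper. The function name suggested_partitions and the cluster sizes are assumptions made for illustration; the factor is a parameter because the right value depends on the workload.

```python
def suggested_partitions(num_executors, cores_per_executor, partitions_per_core=4):
    """Rule-of-thumb partition count: total cores times a small factor."""
    total_cores = num_executors * cores_per_executor
    return total_cores * partitions_per_core


# Hypothetical cluster: 5 executors with 4 cores each = 20 cores in total.
suggestion = suggested_partitions(num_executors=5, cores_per_executor=4)
print(suggestion)  # 20 cores * 4 partitions per core = 80
```

Having several partitions per core keeps all cores busy even when some partitions finish earlier than others.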
There are some key advantages of PySpark RDD. Here is the detail explanation of those:
Nowadays, every industry makes use of big data to evaluate where it stands and to grow. When we hear the term big data, Apache Spark comes to mind. Using PySpark, which builds on Spark, brings several benefits to industry. These benefits can be listed as:
PySpark SparkConf is used to set the configuration and parameters required to run the application on the cluster or on the local system. We can create a SparkConf object from the following class:
class pyspark.SparkConf(loadDefaults=True, _jvm=None, _jconf=None)
Where:
We can create DataFrame in PySpark by making use of the createDataFrame() method of the SparkSession. Let me explain with an example.
For example:
data = [('Raj', 20), ('Rahul', 20), ('Simran', 20)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data=data, schema=columns)
This will create a dataframe where columns will be Name and Age. The data will be filled in the columns accordingly.
Now, we can get the schema of dataframe by using the method df.printSchema(). This will look like:
>> df.printSchema()
root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
Yes, we can create a PySpark DataFrame from an external data source. Real-time applications leverage external storage such as local file systems, HDFS, HBase, MySQL tables, S3, and Azure. The following example shows how to read data from a CSV file residing on your local system and create a DataFrame. PySpark supports many file formats like csv, text, avro, parquet, tsv, etc.
For example:
df = spark.read.csv("path/to/filename.csv")
Expect to come across this popular question in PySpark Interview Questions. PySpark SQL is the structured data processing library of Spark. PySpark SQL gives Spark more information about the structure of the data and the operations performed on it than the PySpark RDD API. It comes with the "DataFrame" programming abstraction.
A DataFrame is an immutable distributed collection of data. A DataFrame can handle large amounts of structured data such as relational tables, and semi-structured data such as JavaScript Object Notation or JSON. After creating the DataFrame, we can handle the data using SQL syntax/queries.
In PySpark SQL, the first step is to create a temporary table on the DataFrame using the createOrReplaceTempView() function. The table is then available in the SparkSession via the sql() method. These temporary tables are dropped when the SparkSession is closed.
The example of PySpark SQL:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.sql("select 'spark' as hello")
df.show()
The Lineage Graph is a collection of RDD dependencies. There are separate lineage graphs for every Spark application. The lineage graph lets Spark recompute RDDs on demand and restore lost data from persisted RDDs. In other words, an RDD lineage graph lets us build a new RDD or restore data from a lost persisted RDD. It is built from the transformations applied to the RDD and forms a logical execution plan.
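Recovery through lineage can be illustrated with a toy class in plain Python. This is a conceptual sketch only: LineageRDD and its methods are hypothetical names, and real RDDs track dependencies per partition rather than for a whole list.

```python
class LineageRDD:
    """Toy dataset that remembers how it was derived instead of storing its data."""

    def __init__(self, source=None, parent=None, func=None):
        self.source = source  # raw data, only set on the root
        self.parent = parent  # the dataset this one was derived from
        self.func = func      # the transformation applied to the parent
        self.cached = None    # persisted result, if any

    def map(self, func):
        # A transformation only adds a node to the lineage graph.
        return LineageRDD(parent=self, func=func)

    def lineage(self):
        # Walk back to the root to list how this dataset was built.
        chain, node = [], self
        while node is not None:
            chain.append("source" if node.parent is None else node.func.__name__)
            node = node.parent
        return list(reversed(chain))

    def compute(self):
        if self.cached is not None:
            return self.cached
        if self.parent is None:
            return list(self.source)
        # No stored result: rebuild it by replaying the transformation on the parent.
        return [self.func(x) for x in self.parent.compute()]

    def persist(self):
        self.cached = self.compute()
        return self

    def lose_cache(self):
        self.cached = None  # simulates losing the persisted data in a failure


def add_one(x):
    return x + 1


def square(x):
    return x * x


squared = LineageRDD(source=[1, 2, 3]).map(add_one).map(square).persist()
squared.lose_cache()
recovered = squared.compute()  # rebuilt from the lineage, not from stored data
print(squared.lineage(), recovered)
```

Because the recipe is kept rather than only the result, losing the persisted data is not fatal: the same transformations are simply replayed from the source.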
Catalyst Optimizer plays a very important role in Apache Spark. It helps to improve structural queries in SQL or expressed through DataFrame or DataSet APIs by reducing the program execution time and cost. The Spark Catalyst Optimizer supports both cost-based and rule-based optimization. Rule-based optimization contains a set of rules that define how a query is executed. Cost-based optimization uses rules to create multiple plans and calculate their costs. Catalyst optimizer also manages various big data challenges like semi-structured data and advanced analytics.
PySpark is the Python API for Apache Spark. It was created and distributed by the Apache Spark community to make working with Spark easier for Python programmers. Apache Spark itself is written in Scala, and it can also be used from other languages such as Java, R, and Python.
Because Scala is a compile-time, type-safe language, Apache Spark has some capabilities that PySpark does not. One of them is Datasets. Datasets are strongly typed collections of domain-specific objects that can be processed in parallel.
In PySpark, Parquet is a columnar file format supported by various data processing systems. Spark SQL can both read and write Parquet files.
Storing data in a columnar format such as Parquet provides several benefits: It is compact and takes up less space. It makes it easier to access specific columns. It uses type-specific encoding. It offers better-summarized data. It requires fewer input/output operations.
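As a rough illustration of reading and writing Parquet, the sketch below writes a tiny DataFrame and reads back a single column. It assumes an existing SparkSession (spark) and a writable location (path); the sample rows, column names and function name are made up for the example.

```python
def parquet_round_trip(spark, path):
    """Write a small DataFrame as Parquet, then read back a single column."""
    df = spark.createDataFrame(
        [("Alice", 34, "F"), ("Bob", 45, "M")],  # hypothetical sample rows
        ["name", "age", "gender"],
    )
    # mode("overwrite") replaces any earlier output at the same path.
    df.write.mode("overwrite").parquet(path)
    # Because the format is columnar, selecting one column lets Spark
    # skip the data stored for the other columns.
    return spark.read.parquet(path).select("name")
```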
Spark can be connected to Mesos in a few steps.
First, configure the Spark driver program to connect to Mesos. The Spark binary package must be in a location accessible to Mesos. Then install Apache Spark in the same location as Apache Mesos and set the "spark.mesos.executor.home" property to point to the location where it is installed. This is how we can connect Spark with Apache Mesos.
In PySpark, DStream is short for Discretized Stream. It is a continuous sequence of RDDs, each holding the data that arrived during one small batch interval. DStreams are built on Spark RDDs, which allows streaming to integrate seamlessly with other Apache Spark components such as Spark MLlib and Spark SQL.
The code to check whether a specific keyword exists is:
lines = sc.textFile("hdfs://Hadoop/user/test_file.txt")

def isFound(line):
    # Return 1 if the keyword occurs in this line, otherwise 0
    if line.find("my_keyword") > -1:
        return 1
    return 0

foundBits = lines.map(isFound)
# Add up the per-line flags; a total above 0 means the keyword was seen
total = foundBits.reduce(lambda a, b: a + b)

if total > 0:
    print("Found")
else:
    print("Not Found")
If the keyword is found, the code prints "Found"; otherwise it prints "Not Found".
There are two types of errors in Python: syntax errors and exceptions.
Syntax errors are often referred to as parsing errors. They are mistakes in the program text that prevent it from running at all. When the parser detects one, it repeats the offending line and displays an arrow pointing to the earliest point in the line where the error was detected.
Exceptions occur when an error is detected during execution and disrupts the program's normal flow. Even if the program's syntax is correct, an error may still be encountered while it runs; such an error is an exception. ZeroDivisionError, TypeError, and NameError are some examples of exceptions.
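The difference between the two kinds of error can be seen by compiling and running a snippet in separate steps. The helper below is a small illustration written for this article (classify_failure is not a standard function): parsing problems surface in compile(), while exceptions surface only when the compiled code runs.

```python
def classify_failure(source):
    """Report whether a snippet fails at parse time, at run time, or not at all."""
    try:
        # Parsing happens here, before any of the snippet is executed.
        code = compile(source, "<snippet>", "exec")
    except SyntaxError:
        return "syntax error"
    try:
        exec(code, {})
    except Exception as err:
        # Raised only while the (syntactically valid) code is running.
        return "exception: " + type(err).__name__
    return "ok"


print(classify_failure("if True print(1)"))  # missing colon
print(classify_failure("1 / 0"))
print(classify_failure("'a' + 1"))
print(classify_failure("undefined_name"))
```

The first call is rejected by the parser, while the other three are valid Python that fail only when executed.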
Receivers are special objects in Apache Spark Streaming whose sole purpose is to consume data from various data sources and then push it to Spark. Receiver objects are created by the streaming context and run as long-running tasks on different executors.
There are two types of receivers: reliable receivers, which send an acknowledgement to the source once the data has been received and stored in Spark, and unreliable receivers, which do not send acknowledgements.
When data is unevenly distributed, PySpark has a few tricks up its sleeve to deal with it. It can shuffle data around to balance things out, sort of like how a teacher might rearrange students in a classroom to balance the teams. This helps everything run more smoothly and evenly.
Tungsten is all about making Spark run faster and use less memory. It does this by being really smart about how it handles data behind the scenes—think of it as the wizard who fine-tunes the engine of a car for better performance and efficiency.
In PySpark, the Catalyst optimizer takes care of making our queries run faster. It works like a smart planner that tweaks and rearranges the steps of our query to make it more efficient. It's like having an expert optimise our travel route, avoiding traffic jams and roadblocks.
Broadcasting is like giving a cheat sheet to all parts of our Spark program. Instead of sending data to each worker node every time it's needed, PySpark sends it once, and each node holds onto it. This is super handy for data that we need to reference often across many nodes because it saves a lot of time and network traffic.
Accumulators are like counters or tally marks that PySpark uses to keep track of information across tasks. For example, if we wanted to count how many times a certain word appears in documents across all nodes, we could use an accumulator to keep a running total even as our data is processed in parallel.
Whenever PySpark performs an operation such as filter(), map(), or reduce(), it is executed on remote nodes using copies of the variables shipped with the tasks. These copies are not reusable and cannot be shared across tasks, and changes to them are not returned to the driver. To solve the problem of reusability and sharing, we use shared variables in PySpark. There are two types of shared variables:
Broadcast Variables:
They are also known as read-only shared variables and are typically used for data lookups. These variables are cached and made available on all cluster nodes. They are not sent with every task. Instead, they are distributed to nodes using efficient algorithms to reduce communication costs. When we run an RDD job that uses broadcast variables, PySpark breaks the job into stages separated by distributed shuffles and broadcasts the common data needed by the tasks within each stage.
Broadcast variables are created in PySpark using the broadcast(variable) method of the SparkContext class. Note that the variable is not sent to the executors when broadcast() is called. It is sent when the executors first need it.
Accumulator Variables:
These variables are known as updatable shared variables. They are updated only through associative and commutative operations and are used for counter or sum operations. PySpark supports numeric accumulators by default, and custom accumulator types can also be added. In Spark, accumulators can be either named or unnamed.
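The two kinds of shared variable can be sketched together as follows. The lookup table, the function names and the choice of counting unknown codes are assumptions made for illustration; sc is expected to be an existing SparkContext.

```python
STATE_NAMES = {"NY": "New York", "CA": "California"}  # hypothetical lookup table


def expand_state(code, table, missing_counter=None):
    """Look a code up in the table and count the codes that are unknown."""
    if code not in table:
        if missing_counter is not None:
            missing_counter.add(1)
        return "Unknown"
    return table[code]


def expand_states(sc, codes):
    """Translate codes on the cluster using a broadcast table and an accumulator."""
    lookup = sc.broadcast(STATE_NAMES)  # read-only copy cached on each executor
    missing = sc.accumulator(0)         # tasks may only add to it
    names = (
        sc.parallelize(codes)
        .map(lambda c: expand_state(c, lookup.value, missing))
        .collect()
    )
    # The accumulator is read on the driver, after an action has run.
    # Updates made inside transformations can be repeated if a task is retried.
    return names, missing.value
```

For example, expand_states(sc, ["NY", "ZZ", "CA"]) would be expected to return the three translated names together with a count of one unknown code.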
DAG stands for Directed Acyclic Graph. In Spark, the DAGScheduler is the scheduling layer that implements stage-oriented scheduling using jobs and stages. The logical execution plan (the lineage of transformations applied to the RDD) is transformed into a physical execution plan consisting of stages. The DAGScheduler computes the DAG of stages needed for each job, keeps track of which RDDs and stage outputs have been materialized, and finds a minimal schedule for running the job. The stages are then submitted to the TaskScheduler, which runs their tasks.
The DAGScheduler computes the execution DAG for a job, determines the preferred locations for running each task, and handles failures caused by lost shuffle output files.
The PySpark DAGScheduler follows an event-queue architecture. A thread posts events of type DAGSchedulerEvent, such as the submission of a new job or stage. The DAGScheduler then reads the events and executes the stages sequentially in topological order.
Let's understand the process through an example. Here, we want to capitalize the first letter of every word in a string. PySpark's built-in initcap() function covers the common case, but the task makes a simple illustration of how to write a UDF: we create a function capitalizeWord(str), register it as a UDF, and use it on DataFrames.
First, we will create a Python function capitalizeWord(). This function takes a string as an input and then capitalizes the first character of every word.
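def capitalizeWord(str):
    result = ""
    words = str.split(" ")
    for word in words:
        # Upper-case the first character and keep the rest of the word as it is
        result = result + word[0:1].upper() + word[1:len(word)] + " "
    # Drop the separator left after the last word
    return result.rstrip()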
Now, we will register the function as a PySpark UDF using the udf() function from the pyspark.sql.functions module, which must be imported along with StringType from pyspark.sql.types. The udf() function returns a UserDefinedFunction object. The code for converting a function to a UDF is:
capitalizeWordUDF = udf(lambda z: capitalizeWord(z), StringType())
The next step is to use the UDF with a DataFrame. We can apply the UDF to a PySpark DataFrame just as we would a built-in function. Suppose we have a DataFrame stored in the variable df, with the columns ID_COLUMN and NAME_COLUMN. To capitalize the first character of every word, we write:
from pyspark.sql.functions import col

df.select(
    col("ID_COLUMN"),
    capitalizeWordUDF(col("NAME_COLUMN")).alias("NAME_COLUMN")
).show(truncate=False)
UDFs must be designed so that the algorithms are efficient and take up less time and space. If care is not taken, the performance of DataFrame operations will be affected.
We use the builder pattern to create a SparkSession. The pyspark.sql module contains the SparkSession class, whose builder provides the getOrCreate() method. This method creates a new SparkSession if none exists; otherwise it returns the existing SparkSession object. The code for creating a SparkSession looks like:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]") \
    .appName('KnowledgeHuntSparkSession') \
    .getOrCreate()
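Where, master() sets the cluster manager to connect to (here local[1], which runs Spark locally with one thread), appName() sets the name of the application, and getOrCreate() returns the existing SparkSession or creates a new one.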
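If we want a new SparkSession object each time, we can call the newSession() method on an existing session. The new session has its own SQL configuration and temporary views but shares the underlying SparkContext. The code looks like:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark_session = spark.newSession()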
In PySpark, there are various methods used to create RDD. Let’s discuss them one by one.
Using sparkContext.parallelize() - SparkContext's parallelize() method can be used to create RDDs. This method takes an existing collection from the driver and parallelizes it. This is the basic approach to creating an RDD and is used when the data is already present in memory. It also requires all the data to be present on the driver before the RDD is created. The code to create an RDD from a Python list using the parallelize method is:
data = [1,2,3,4,5,6,7,8,9,10,11]
rdd = spark.sparkContext.parallelize(data)
Using sparkContext.textFile() - We can read .txt file and convert it into RDD. The syntax looks like:
rdd_txt = spark.sparkContext.textFile("/path/to/textFile.txt")
Using sparkContext.wholeTextFiles() - This method returns a PairRDD, which is an RDD that contains key-value pairs. In this PairRDD, the file path is the key and the file content is the value. This method reads each entire file into the RDD as a single record. It can also read other text-based formats such as CSV and JSON, although their content is returned as raw text rather than parsed.
rdd_whole_text = spark.sparkContext.wholeTextFiles("/path/to/textFile.txt")
Empty RDD with no partition using sparkContext.emptyRDD() - An RDD without any data is known as an empty RDD. We can create such an RDD, which has no partitions, by using the emptyRDD() method. The code for that will look like:
empty_rdd = spark.sparkContext.emptyRDD()
Empty RDD with partitions using sparkContext.parallelize - When we require partitions but not the data, we create an empty RDD by using the parallelize method. For example, the code below creates an empty RDD with 10 partitions:
empty_partitioned_rdd = spark.sparkContext.parallelize([],10)
PySpark SQL is one of the most widely used PySpark modules for processing structured columnar data. Once the DataFrame is created, we can work with the data using SQL syntax. Spark SQL lets us run native SQL queries on Spark using select, where, group by, join, union, etc. To use PySpark SQL, the first step is to register a DataFrame as a temporary table using the createOrReplaceTempView() method. Once created, the table is accessible within the SparkSession through the sql() function. When the SparkSession is terminated, the temporary table is dropped.
For example, consider that we have a DataFrame assigned to the variable df which contains the Name, Age and Gender of students as columns. We first register the DataFrame as a temporary table, which makes it available to the SparkSession. SQL queries against it can then be run through the sql() function.
df.createOrReplaceTempView("STUDENTS")
df_new = spark.sql("SELECT * from STUDENTS")
df_new.printSchema()

The schema will be shown as:

>> df_new.printSchema()
root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
We can use the join() method of the PySpark DataFrame. The syntax of the method looks like:
join(self, other, on = None, how = None)
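Where, other is the DataFrame on the right side of the join, on is the column name, list of column names, or join expression to join on, and how is the join type (inner by default).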
where() and filter() methods can be attached to the join expression to filter rows. We can also have multiple joins using the chaining join() method.
For example, consider we have two DataFrames named Employee and Department. Employee has the columns emp_id, emp_name and empdept_id, and Department has the columns dept_id and dept_name. We can inner join the Employee DataFrame with the Department DataFrame to get the department information along with the employee information. The code will look like:
# Keep the joined DataFrame; show() only prints and returns None
emp_dept_df = empDF.join(deptDF, empDF.empdept_id == deptDF.dept_id, "inner")
emp_dept_df.show(truncate=False)
PySpark Streaming is a scalable, fault-tolerant, high-throughput stream processing system that supports both streaming and batch workloads. It can process real-time data from sources such as TCP sockets, S3, Kafka, Twitter, file system folders, etc. The processed data can be sent to live dashboards, Kafka, databases, HDFS, etc.
To stream from a TCP socket, we can use the readStream.format("socket") method of the Spark session object to read data from the TCP socket and provide the host and port of the streaming source as options. The code will look like:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read lines from the TCP socket as a streaming DataFrame
df = spark.readStream \
    .format("socket") \
    .option("host", "127.0.0.1") \
    .option("port", 5555) \
    .load()
df.printSchema()
Spark retrieves the data from the socket and represents it in the value column of the DataFrame object. After processing the data, the DataFrame can be streamed to the console or other destinations on demand such as Kafka, dashboards, databases, etc.
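Spark automatically persists some intermediate data from shuffle operations. However, it is recommended to call persist() explicitly on an RDD that will be reused. There are several persistence levels for storing RDDs in memory, on disk, or both, with varying levels of replication. The persistence levels available in PySpark include MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY and OFF_HEAP, together with replicated variants such as MEMORY_ONLY_2 and MEMORY_AND_DISK_2.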
The persist() function has the following syntax for using persistence levels:
df.persist(StorageLevel.)
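As a concrete instance of this syntax, the sketch below persists a DataFrame at one particular level and reuses it for two actions. The choice of MEMORY_AND_DISK and the function name are assumptions for illustration; any other level could be substituted.

```python
def count_twice_with_cache(df):
    """Persist a DataFrame, run two actions on it, then release the cache."""
    # Imported inside the function so the sketch can be loaded without Spark.
    from pyspark import StorageLevel

    # MEMORY_AND_DISK keeps partitions in memory and spills to disk if needed.
    df.persist(StorageLevel.MEMORY_AND_DISK)
    total_rows = df.count()                # first action fills the cache
    distinct_rows = df.distinct().count()  # second action reuses it
    df.unpersist()
    return total_rows, distinct_rows
```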
The streaming application must be available 24/7 and tolerant of errors outside the application code (e.g., system crashes, JVM crashes, etc.). The checkpointing process makes streaming applications more fault tolerant. We can store data and metadata in the checkpoint directory.
Checkpointing can be of two types: metadata checkpointing and data checkpointing.
Metadata checkpointing stores the information that defines a streaming computation in a fault-tolerant storage system such as HDFS. This makes it possible to recover after the node running the driver of the streaming application fails.
Data checkpointing means saving the generated RDDs to reliable storage. This type of checkpointing is required by stateful transformations that combine data across multiple batches.
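A minimal sketch of both kinds of checkpointing with the legacy DStream API might look like the following. The function names, the socket source and the running word count are assumptions chosen for illustration; the checkpoint directory should point to fault-tolerant storage in a real deployment.

```python
def update_count(new_values, running_total):
    """State update: add this batch's counts to the total kept so far."""
    return sum(new_values) + (running_total or 0)


def get_or_create_context(checkpoint_dir, host, port, batch_seconds=10):
    """Recover a streaming context from a checkpoint, or build a new one."""
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    def create_context():
        sc = SparkContext.getOrCreate()
        ssc = StreamingContext(sc, batch_seconds)
        # Enables metadata checkpointing and the data checkpointing that
        # stateful operations such as updateStateByKey require.
        ssc.checkpoint(checkpoint_dir)
        words = ssc.socketTextStream(host, port).flatMap(lambda line: line.split())
        words.map(lambda w: (w, 1)).updateStateByKey(update_count).pprint()
        return ssc

    # Rebuilds the context from checkpoint data if it exists; otherwise
    # calls create_context() to set up a fresh one.
    return StreamingContext.getOrCreate(checkpoint_dir, create_context)
```

The returned context still has to be started with start() and kept alive with awaitTermination().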
We can determine the total number of unique words by following certain steps in PySpark. The steps to follow can be listed as:
Open the text file in RDD mode.
lines = sc.textFile("hdfs://Hadoop/user/test_file.txt")
Then write a function that will split each line into individual words.
def toWords(line):
    return line.split()
Now, run the toWords function on every member of the RDD in Spark.
words = lines.flatMap(toWords)
Next, generate a (key, value) pair for every word.
def toTuple(word):
    return (word, 1)

wordTuple = words.map(toTuple)
Run the reduceByKey() command.
def add(x, y):
    return x + y

counts = wordTuple.reduceByKey(add)
Then print it out.
print(counts.collect())
The basis of Spark Streaming is dividing the content of the data stream into batches of X seconds, known as DStreams. These DStreams allow developers to cache data, which can be especially useful if data from a DStream is used multiple times. The cache() function or the persist() method can be used to cache data with the correct persistence settings. For input streams receiving data over networks such as Kafka, Flume, and others, the default persistence level is configured to achieve data replication across two nodes to achieve fault tolerance.
Cache method -
cacheDf = dframe.cache()
Persist method -
from pyspark import StorageLevel

persistDf = dframe.persist(StorageLevel.MEMORY_ONLY)
There are some benefits of Caching. The key benefits are time saving and cost efficiency. Since Spark computations are expensive, caching helps in data reuse, which leads to reuse of computations and reduces the cost of operations. By reusing calculations, we can save a lot of time. Worker nodes can execute/run more tasks by reducing the computation execution time. Hence more tasks can be achieved.
GraphX is a robust API that extends the Spark RDD to support graphs and graph-based computations. Its Resilient Distributed Property Graph is an extension of the Spark RDD: a directed multigraph that can have many parallel edges. User-defined properties are attached to each edge and vertex. Parallel edges represent multiple relationships between the same pair of vertices. GraphX offers a collection of operators for graph computation, such as subgraph, mapReduceTriplets, joinVertices, and so on. It also offers many graph builders and algorithms to simplify graph analysis.
Following the UNIX standard streams model, Apache Spark supports the pipe() function on RDDs, which lets us assemble parts of a job in any language. The pipe() transformation writes each RDD element to an external process as a string on its standard input. The process can transform the data as needed, and the lines it writes to standard output are returned as an RDD of strings.
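To make the mechanism concrete, the sketch below pairs a local stand-in with the Spark call. pipe_locally is a hypothetical helper that imitates what pipe() does for one partition, and the tr command in the example is just one external program that happens to be available on UNIX-like systems.

```python
import shlex
import subprocess


def pipe_locally(elements, command):
    """Local stand-in for RDD.pipe(): one element per input line, one string per output line."""
    text = "".join(str(e).rstrip("\n") + "\n" for e in elements)
    result = subprocess.run(
        shlex.split(command), input=text, capture_output=True, text=True, check=True
    )
    return result.stdout.splitlines()


def pipe_on_cluster(sc, elements, command):
    """The same idea on Spark: each partition is streamed through the command."""
    return sc.parallelize(elements).pipe(command).collect()


# On a UNIX-like system, upper-case each element with the external tr command.
print(pipe_locally(["spark", "rdd"], "tr a-z A-Z"))
```

With an existing SparkContext, pipe_on_cluster(sc, ["spark", "rdd"], "tr a-z A-Z") runs the same command on the executors.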
import findspark
findspark.init()

from pyspark.sql import SparkSession, types

spark = SparkSession.builder.master("local").appName("Modes of Dataframereader") \
    .getOrCreate()
sc = spark.sparkContext

from pyspark.sql.types import *

schm = StructType([
    StructField("col_1", StringType(), True),
    StructField("col_2", StringType(), True),
    StructField("col", StringType(), True),
])

# DROPMALFORMED skips rows that do not match the schema
df = spark.read.option("mode", "DROPMALFORMED").csv('input1.csv', header=True, schema=schm)
df.show()
import findspark
findspark.init()

from pyspark.sql import SparkSession, types

spark = SparkSession.builder.master("local").appName("scenario based") \
    .getOrCreate()
sc = spark.sparkContext

# Read every line as plain text into a single "value" column
df = spark.read.text("input.csv")
df.show(truncate=0)

# The first line holds the column names
header = df.first()[0]
schema = header.split('-|')

# Drop the header row, split the remaining lines, and apply the column names
df_input = df.filter(df['value'] != header).rdd.map(lambda x: x[0].split('-|')).toDF(schema)
df_input.show(truncate=0)
import findspark
findspark.init()

from pyspark.sql import SparkSession, types

spark = SparkSession.builder.master("local").appName('scenario based') \
    .getOrCreate()
sc = spark.sparkContext

in_df = spark.read.option("delimiter", "|").csv("input4.csv", header=True)
in_df.show()

from pyspark.sql.functions import explode_outer, posexplode_outer, split

# One output row per comma-separated value in the Education column
in_df.withColumn("Qualification", explode_outer(split("Education", ","))).show()

# Same split, but also keep each value's position within the list
in_df.select("*", posexplode_outer(split("Education", ","))) \
    .withColumnRenamed("col", "Qualification") \
    .withColumnRenamed("pos", "Index") \
    .drop("Education") \
    .show()
import findspark
findspark.init()

from pyspark.sql import SparkSession, types

spark = SparkSession.builder.master("local").appName('Modes of Dataframereader') \
    .getOrCreate()
sc = spark.sparkContext

df1 = spark.read.option("delimiter", "|").csv('input.csv')
df2 = spark.read.option("delimiter", "|").csv("input2.csv", header=True)

from pyspark.sql.functions import lit

# Add the missing Gender column so both DataFrames have the same columns
df_add = df1.withColumn("Gender", lit("null"))
df_add.union(df2).show()
For the Union-
from pyspark.sql.types import *

schema = StructType(
    [
        StructField("Name", StringType(), True),
        StructField("Age", StringType(), True),
        StructField("Gender", StringType(), True),
    ]
)

df3 = spark.read.option("delimiter", "|").csv("input.csv", header=True, schema=schema)
df4 = spark.read.option("delimiter", "|").csv("input2.csv", header=True, schema=schema)
df3.union(df4).show()
PySpark is great for big text projects like analysing tons of reviews or processing huge documents. We use things like RDDs or DataFrames to break down the text and analyze it—think word counts or searching for specific phrases. It's like having a super-powered text editor that can instantly handle and analyse entire libraries. Using functions like flatMap() for tokenization and reduceByKey() for counting words ensures efficient processing over clusters.
Window functions in PySpark help us make sense of data that's ordered or grouped in some way. For example, if we’re looking at sales data, we could use a window function to calculate a running total of sales or average sales over a specific period directly within our dataset. It's a bit like having a spreadsheet that automatically updates those figures as we add new data. We can use the over() function to specify a window partitioned by one column, ordered by another, and then apply aggregation functions like sum() or avg() over this window.
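The running total described above can be sketched as follows. The column names region, day and amount are assumptions about the sales data, and running_total_reference is a plain-Python helper added only to show, on a small sample, what the window computes.

```python
def add_running_total(sales_df):
    """Add a per-region running total of `amount`, ordered by `day`."""
    from pyspark.sql import Window
    from pyspark.sql import functions as F

    window = (
        Window.partitionBy("region")
        .orderBy("day")
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )
    return sales_df.withColumn("running_total", F.sum("amount").over(window))


def running_total_reference(rows):
    """Plain-Python version of the same calculation for (region, day, amount) rows."""
    totals = {}
    result = []
    for region, day, amount in sorted(rows):
        totals[region] = totals.get(region, 0) + amount
        result.append((region, day, amount, totals[region]))
    return result
```

Swapping F.sum for F.avg over the same window would give a running average instead.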
Optimising PySpark applications involves several strategies like choosing the right data abstraction (RDD, DataFrame, or DataSet), tuning resource allocation (e.g., memory and cores), and optimising data partitioning. Additionally, minimising data shuffling and using broadcast variables and accumulators wisely can significantly improve performance.
GraphFrames are an enhanced version of DataFrames specifically designed for graph processing in PySpark. They allow us to perform advanced graph operations like finding the shortest paths, calculating PageRank, or running breadth-first searches. Since GraphFrames are integrated with DataFrames, we can seamlessly create, query, and manage graphs while also working with regular tabular data. Imagine trying to figure out the shortest path in a network of roads or ranking websites by importance. GraphFrames build on DataFrames to let us handle these tasks efficiently in PySpark, combining graph algorithms with the ease of SQL-like operations.