PySpark Interview Questions and Answers for 2024

PySpark is the Python API for Apache Spark, an open-source distributed computing engine. It helps teams build scalable analytics and data pipelines and speeds up processing by spreading work across a cluster. It can also be used for large-scale, near-real-time data processing. In this article, we cover frequently asked PySpark interview questions to help you understand the kinds of questions you might face in a Python and PySpark interview. Whether you are a beginner, an intermediate, or an experienced PySpark professional, this guide will help you build your confidence and knowledge. The questions cover important PySpark topics such as DataFrames, RDDs, serializers, cluster managers, SparkContext, and SparkSession. Working through these PySpark Interview Questions will help you prepare well for your next interview, so if you are looking to advance your career in PySpark, this guide is a good place to start.

Beginner

The PySpark framework is relatively easy to learn and implement. Prior programming or database experience is not strictly required, but it makes PySpark much easier to pick up. Before getting familiar with PySpark concepts, you should have some knowledge of Apache Spark and Python. Learning the more advanced PySpark concepts afterwards is also very helpful.

MLlib can be used to implement machine learning in Spark. MLlib is Spark's scalable machine learning library. Its goal is to make practical machine learning scalable and easy, and it provides common learning algorithms and utilities for use cases such as classification, regression, clustering, collaborative filtering, and dimensionality reduction. This is how machine learning can be implemented in Spark.

There are some basic differences between PySpark and other programming languages. PySpark has its own built-in APIs, while other languages often require APIs to be integrated from third parties. Next, PySpark handles communication between nodes implicitly, whereas in other languages it usually has to be coded explicitly. PySpark follows a map-reduce style, so developers can express their work as map and reduce functions. Finally, PySpark can distribute work across multiple nodes, which a plain single-machine program cannot do.

We can use PySpark with small data sets, but it is usually not worthwhile. Spark adds overhead for scheduling, serialization, and coordination between nodes, and that overhead outweighs the benefits when the data fits comfortably on one machine. PySpark shines with large numbers of records, so it should be used for large data sets.

Data science relies heavily on Python and on machine learning. PySpark brings Spark to Python, and it offers interfaces and built-in environments that combine Python with machine learning (for example, MLlib). This makes PySpark an essential tool for data science: once a dataset has been processed, a prototype model can be turned into a production-ready workflow on the same platform. For these reasons, PySpark is important in data science.

Spark can create distributed datasets from any storage source supported by Hadoop, including file systems and databases such as:

  • Hadoop Distributed File System (HDFS)
  • Local file system
  • Cassandra
  • HBase
  • Amazon S3

Spark also supports text files, SequenceFiles, and any other Hadoop InputFormat.

This is a frequently asked question in PySpark Interview Questions. PySpark is the Python interface to Apache Spark, developed by the Apache Spark community so that Python and Spark can work together. It exposes a Python API for Spark features such as Spark SQL, DataFrames, Spark Streaming, Spark Core, and MLlib. It also provides an interactive PySpark shell for analyzing structured and semi-structured data in distributed environments, along with a streamlined API for reading data from various data sources. Under the hood, PySpark uses the Py4J library to let Python code communicate with Spark's JVM, which is what allows users to work with Resilient Distributed Datasets (RDDs) directly from Python. Python also has many libraries for big data processing and machine learning that can be used alongside PySpark.

We can install PySpark from PyPI using the command below:

pip install pyspark

There are four main characteristics of PySpark. The detailed description of those features can be given as below: 

  1. Nodes are abstracted - Nodes are abstracted in the PySpark. This means that individual worker nodes are inaccessible. 
  2. PySpark is based on MapReduce - PySpark follows the map-reduce programming model popularized by Hadoop. This means that the programmer provides map and reduce functions.
  3. API for Spark functions - PySpark provides an API for using Spark functions. 
  4. Abstracted networks - PySpark provides abstract networks. This means that networking is abstracted in PySpark and only implicit communications are allowed. 

In PySpark, the full form of RDD is Resilient Distributed Datasets. This RDD is the central data structure of PySpark. RDD is a low-level object that is very efficient at performing distributed tasks. 

In PySpark, RDDs are collections of elements that can be operated on in parallel across multiple nodes of a cluster. They are immutable, which means that once an RDD is created it cannot be changed. RDDs are also fault tolerant: if a partition is lost, Spark automatically recomputes it. Multiple operations can be applied to an RDD to accomplish a specific task.

SparkContext serves as the entry point for Spark functionality. When a Spark application runs, the driver program starts; it runs the application's main function and creates the SparkContext. The driver program then sends operations to executors running on the worker nodes. In PySpark, the SparkContext is known as the PySpark SparkContext; it uses the Py4J library to launch a JVM and create a JavaSparkContext. In the PySpark shell, a SparkContext is available as ‘sc’ by default, so the shell does not need to create a new one.

PySpark SparkFiles is used to resolve the paths of files added to an Apache Spark application. Files are added with the SparkContext method sc.addFile(), and SparkFiles.get() then returns the path of an added file on whichever node the code runs. The SparkFiles class provides two class methods: getRootDirectory() and get(filename).

A must-know for anyone heading into a PySpark interview, this question is frequently asked in PySpark Interview Questions. In PySpark, serialization plays an important role in performance tuning. Data must be serialized whenever it is sent over the network, written to disk, or persisted in memory, so PySpark uses serializers to do this efficiently. PySpark supports two types of serializers:

  • PickleSerializer: Serializes objects with Python's pickle module through the PySpark PickleSerializer class. This serializer supports nearly any Python object.
  • MarshalSerializer: Serializes objects with Python's marshal module through the PySpark MarshalSerializer class. It is faster than the PickleSerializer, but it supports fewer types.

PySpark ArrayType is a collection data type that extends PySpark's DataType class, which is the superclass of all PySpark types. All elements of an ArrayType column must have the same type. We create an ArrayType instance with the ArrayType() constructor, which accepts two arguments: elementType and containsNull.

  1. elementType: The type of the array's elements. It must be an instance of a subclass of PySpark's DataType class.
  2. containsNull: This is an optional argument. It indicates whether the array can contain null elements, and it is True by default.

For example: 

from pyspark.sql.types import StringType, ArrayType

# An array of strings whose elements may not be null
ArrayColumn = ArrayType(StringType(), False)

A PySpark DataFrame is a distributed collection of data organized into named columns, much like a table in a relational database. Unlike data frames in R or pandas, which live on a single machine, PySpark DataFrames are distributed and optimized by Spark's query engine. They can be created from a variety of sources, such as Hive tables, structured data files, existing RDDs, and external databases.

The biggest advantage of a PySpark DataFrame is that its data is distributed across the machines in the cluster, and operations on it run in parallel on all of them. This makes it practical to process petabytes of structured or semi-structured data.

In PySpark, the cluster manager is the platform that runs Spark in cluster mode; it facilitates Spark execution by allocating resources to worker nodes as needed.

The Spark Cluster Manager ecosystem includes a master node and multiple worker nodes. Master nodes, with the help of the cluster manager, provide resources, such as memory and processor allocation, to worker nodes according to the node's needs.

PySpark supports different cluster managers. Those can be explained as: 

  • Standalone: Standalone is a very simple cluster manager that comes with Spark. 
  • Apache Mesos: This cluster manager is useful to run Hadoop MapReduce and PySpark apps.  
  • Hadoop YARN: The resource manager used with Hadoop 2 and later.
  • Kubernetes: An open-source system for automating the deployment, scaling, and management of containerized apps.
  • Local: Strictly speaking, not a cluster manager; local mode runs Spark applications on a single machine such as a laptop or desktop.

In PySpark, SparkSession is the application entry point. Before PySpark 2.0, SparkContext was used as the entry point. Since version 2.0, SparkSession serves as the starting point for all PySpark functionality related to RDDs, DataFrames, and Datasets. It is also a unified API that replaces SQLContext, HiveContext, and the other contexts used previously, while still giving access to the underlying SparkContext.

SparkSession internally creates the SparkContext and SparkConf based on the details provided to it. We create a SparkSession using the builder pattern.

The main function of Spark Core is to provide essential functionality such as storage management, fault tolerance, job monitoring, job scheduling, and communication with storage systems. Additional libraries for streaming, machine learning, and SQL workloads are built on top of Spark Core.

Spark Core is used for functions like:

  1. Resilience and recovery. 
  2. To interact with the storage system. 
  3. Memory management. 
  4. Scheduling and monitoring jobs in a cluster.

A Spark program typically follows a common workflow. The first step is to create input RDDs from external data, which can come from a variety of data sources. Next, depending on the business logic, RDD transformations such as filter() and map() are applied to create new RDDs. If we need to reuse intermediate RDDs later, we can persist them. Finally, actions such as first() and count() are called, which make Spark launch the parallel computation.

In PySpark, the startswith() and endswith() methods belong to the Column class. They check whether a column value starts or ends with a given string, and both are commonly used to filter DataFrame rows.

  • startswith() method: Returns a boolean column that is True if the value starts with the specified string and False otherwise.
  • endswith() method: Returns a boolean column that is True if the value ends with the specified string and False otherwise. Both methods are case-sensitive.

PySpark supports custom profilers. Profilers measure how the Python code in a Spark job spends its time, which helps with performance tuning. To build a custom profiler, we subclass a profiler class and define some methods. These methods are:

  1. stats: Returns the collected profiling statistics.
  2. profile: Produces a system profile of some sort.
  3. dump(id, path): Dumps the profile of a specific RDD ID to the specified path.
  4. add: Adds a profile to the existing accumulated profile.

The profiler class is chosen when creating a SparkContext.

For large datasets, yes, PySpark is usually faster than pandas because it executes work in parallel across a distributed environment. PySpark can use multiple cores and multiple machines, while pandas runs on a single machine and largely on a single core. For small datasets that fit in memory, however, pandas is often faster because it avoids Spark's distributed overhead.

PySpark StorageLevel controls how and where an RDD is stored. It decides whether the RDD is kept in memory, on disk, or both, whether it is stored off-heap, whether the data is kept deserialized, and how many times its partitions are replicated. The class signature for PySpark StorageLevel is:

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)

Some of the most used components of the Spark ecosystem are Spark SQL, Spark Streaming, GraphX, MLlib, and SparkR.

  • Spark SQL lets developers query structured data with SQL. It grew out of an earlier project called Shark.
  • Spark Streaming is used for processing live data streams.
  • GraphX is used for building and computing on graphs.
  • MLlib is Spark's machine learning library.
  • SparkR provides an R front end for the Spark engine.

The main difference between get(filename) and getRootDirectory() is that get(filename) returns the absolute path of a single file added with SparkContext.addFile(), while getRootDirectory() returns the root directory that contains all files added with SparkContext.addFile().

The Apache Spark execution engine is a graph (DAG) execution engine that makes it easy for users to explore large datasets with high performance. If data needs to be manipulated across multiple stages of processing, keeping it in memory in Spark gives a dramatic performance boost.

Hive uses HQL (Hive Query Language), while Spark SQL uses Structured Query Language to process and query data. We can easily connect SQL tables and HQL tables to Spark SQL. Spark SQL is a distinct component on top of the Spark Core engine that supports both SQL and Hive Query Language without changing their syntax.

Install PySpark using pip with the command pip install pyspark. Ensure Python and Java are installed on our system first.

PySpark itself is not designed for real-time processing but can handle near-real-time data through Spark Streaming, which processes data in micro-batches.

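The default level of parallelism in PySpark (spark.default.parallelism) depends on the cluster configuration. In local mode it is the number of cores on the machine; on most clusters it is the total number of cores across all executor nodes or 2, whichever is larger.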

Use the json() method of the DataFrameReader returned by the SparkSession's read property to read JSON files.

Example: spark.read.json("path_to_file.json").

Actions in PySpark are operations that trigger computation and return values.
Examples include count(), collect(), and first().

Transformations in PySpark are operations that create a new RDD from an existing one, such as map(), filter(), and reduceByKey().

Use the dropDuplicates() method on a DataFrame to remove duplicate rows.

Use the printSchema() method on a DataFrame to display its schema.

Use the withColumn() method.
For example: df.withColumn('new_column', expression).

Lazy evaluation in PySpark means that the execution will not start until an action is triggered. This optimization allows PySpark to run more efficiently.

PySpark can read several file formats including but not limited to JSON, CSV, Parquet, ORC, and text files.

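Use the cache() method on a DataFrame to persist it with the default storage level (in memory, spilling to disk if needed), which can speed up subsequent queries. Caching is lazy: the data is stored when the first action runs.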

Use the coalesce() method on an RDD or DataFrame to reduce the number of partitions. Unlike repartition(), coalesce() merges existing partitions and avoids a full shuffle.

A stage in Spark is a set of tasks that perform the same computation on different data partitions. Stage boundaries are placed where data must be shuffled, so the tasks within a stage run without shuffling data.

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.

Use the write.csv() method. Note that Spark writes a directory containing one part file per partition, not a single CSV file.
Example: df.write.csv('path_to_save.csv').

DataFrames provide a higher level abstraction, allowing optimizations through Catalyst optimizer; they are faster and easier to use than RDDs for structured data.

Use the na functions in PySpark to handle null values. For instance, we can replace all nulls with a specific value using df.na.fill(value) or drop rows that contain any null values with df.na.drop().

Intermediate

There are some advantages as well as disadvantages of PySpark and this is another frequently asked question in the PySpark interview rounds. Let’s discuss them one by one. 

Advantages of PySpark: 

  1. PySpark is easy to learn and implement if you know Python and Apache Spark.
  2. PySpark is easy to use. It provides easy-to-write parallelized code. 
  3. Error handling is easy in the PySpark framework. It provides easy error handling and sync point management. 
  4. PySpark is a Python API for Apache Spark. PySpark provides excellent library support. Python has a huge collection of libraries for doing data science and data visualization compared to other languages. 
  5. Spark has already written and implemented many important algorithms. It offers many algorithms in machine learning or graphs.  

Disadvantages of PySpark: 

  1. Since PySpark follows the map-reduce style of Hadoop's MapReduce model, some problems can be difficult to manage and express in that model.
  2. Apache Spark is written in Scala, and PySpark code runs through a bridge between Python and the JVM, so it is not as efficient as native Scala code. For RDD-heavy code and Python UDFs it is often quoted as being up to about 10 times slower than an equivalent Scala program, which hurts the performance of compute-heavy applications.
  3. PySpark’s Spark Streaming API is not as efficient as Scala’s Streaming API. It still needs to improve. 
  4. PySpark has abstracted nodes and uses an abstracted network, so it cannot be used to change Spark's internal workings. Scala is recommended in this case.  

A common question in the PySpark Interview Questions, don't miss this one. Let me tell you about the key differences between RDD, DataFrame and DataSet one by one. 

  1. RDD: RDD stands for Resilient Distributed Dataset. It is the central low-level data structure of PySpark and is very efficient at performing distributed tasks. RDDs are ideal for low-level transformations, manipulations, and control over datasets. They are primarily manipulated with functional programming constructs rather than domain-specific expressions. An RDD can be persisted efficiently when the same data needs to be reused instead of recomputed. The records of a PySpark DataFrame can also be accessed as an RDD.
  2. DataFrame: A DataFrame corresponds to a relational table in Spark SQL. Its data is organized into rows and named columns, which makes it easier to read and query. When working with Python, it's best to start with DataFrames and drop down to RDDs only if we need more flexibility. One of the biggest drawbacks of DataFrames is the lack of compile-time type safety: if the structure of the data is unknown, errors show up only at runtime.
  3. DataSet: A Dataset is a distributed collection of data. It is an interface added in Spark 1.6 that combines the benefits of RDDs with the optimized execution engine of Spark SQL; a DataFrame is a Dataset organized into named columns. Datasets use optimized encoders to store their data. Unlike DataFrames, they provide compile-time type safety, so they are useful when typed JVM objects are required. Datasets also benefit from Catalyst optimizations and Tungsten's fast code generation. The typed Dataset API is available only in Scala and Java, not in Python.

Spark Core is the general execution engine underlying the Spark platform, and all other functionality is built on top of it. It offers in-memory computing capabilities for speed, a generalized execution model that supports a wide variety of applications, and Java, Scala, and Python APIs that simplify development.

The primary role of Spark Core is to perform basic input/output functions, scheduling, monitoring, and similar tasks. It is also responsible for fault recovery and effective memory management.

The key functions of Spark Core can be listed as:

  • To perform all basic Input/Output functions 
  • Job scheduling 
  • Job monitoring 
  • Storage management 
  • Fault tolerance 
  • To interact with storage systems

PySpark also exposes Spark's machine learning library, MLlib, which supports many machine learning algorithms. Its main modules are:

  1. mllib.classification: This module supports various methods for binary and multi-class classification and regression analysis, including random forests, decision trees, and naive Bayes.
  2. mllib.clustering: This module solves clustering problems, which group subsets of entities based on their similarity.
  3. mllib.fpm: FPM stands for Frequent Pattern Mining. This module is used to mine frequent items, itemsets, subsequences, or other substructures, which is often a first step in analyzing large datasets.
  4. mllib.linalg: This machine learning API is used to solve linear algebra problems. 
  5. mllib.recommendation: This machine learning API is used for collaborative filtering and recommendation systems. 
  6. spark.mllib: This machine learning API is used to support model-based collaborative filtering which identifies the small latent factors using the Alternating Least Squares (ALS) algorithm. This ALS algorithm is used to predict missing entries. 
  7. mllib.regression: This machine learning API is useful for resolving problems using regression algorithms. These algorithms find relationships and variable dependencies.  

PySpark partitioning is a way to divide a large dataset into smaller datasets based on one or more partition keys. This improves execution speed, because transformations on the partitions can run in parallel. PySpark supports both in-memory (DataFrame) partitioning and on-disk (file system) partitioning. When we create a DataFrame from a file or table, PySpark creates it in memory with a certain number of partitions based on the specified criteria.

When writing data, we can partition by one or more columns with the partitionBy() method of the DataFrameWriter, passing the column names as arguments. The syntax of this method is: partitionBy(*cols).

As a rule of thumb, the Spark documentation recommends 2 to 4 partitions for each CPU core in the cluster that the application can use.

There are some key advantages of PySpark RDDs. Here is a detailed explanation of each:

  1. Immutability: In PySpark, RDDs are immutable. Once created, an RDD cannot be changed, so every transformation we apply produces a new RDD.
  2. Fault Tolerance: PySpark RDDs are fault tolerant. If an operation fails, the lost partitions are automatically recomputed from the RDD's lineage. This allows PySpark applications to run seamlessly.
  3. Partitioning: By default, when we create an RDD from arbitrary data, its elements are split across the available cores.
  4. Lazy Evaluation: PySpark RDDs follow lazy evaluation. Transformations are not executed as soon as they are defined; they are recorded in a DAG and evaluated only when an action is called.
  5. In-memory processing: PySpark RDDs can load data from disk into memory and keep it there, so computations can be reused.

Nowadays, every industry uses big data to evaluate where it stands and how to grow. When we hear the term big data, Apache Spark comes to mind. Here are some industry benefits of using Spark, and therefore PySpark:

  1. Media streaming: Spark can be used for real-time streaming to provide personalized recommendations to subscribers. Netflix is one such example that uses Apache Spark; it processes around 450 billion events every day that flow to its server-side apps.
  2. Finance: Banks use Spark to access and analyze social media profiles. This gives them insight into strategies for customer segmentation, credit risk assessment, fraud detection, and more.
  3. Healthcare: Healthcare providers use Spark to analyze patients' historical records to identify health issues they may face after discharge. Spark is also used in genomic sequencing to reduce the time needed to process genomic data.
  4. The Travel Industry: Companies like TripAdvisor use Spark to help users plan the perfect trip, comparing data and reviews from hundreds of websites about places, hotels, and more, and to offer personalized recommendations to travelers.
  5. Retail and E-Commerce: This is a key industry segment that requires big data analytics for targeted advertising. Companies like Alibaba run Spark jobs to analyze petabytes of data to improve customer experience, deliver targeted offers and sales, and optimize overall performance.

PySpark SparkConf is used to set the configuration and parameters required to run an application on a cluster or on the local system. The class signature is:

class pyspark.SparkConf(
    loadDefaults=True,
    _jvm=None,
    _jconf=None
)

Where:

  1. loadDefaults – A boolean that indicates whether to load values from Java system properties. It is True by default.
  2. _jvm – An instance of py4j.java_gateway.JVMView. It is an internal parameter used to pass the handle to the JVM and does not need to be set by users.
  3. _jconf – An instance of py4j.java_gateway.JavaObject. This optional parameter can be used to pass an existing Java SparkConf handle whose parameters should be used.

We can create a DataFrame in PySpark with the createDataFrame() method of the SparkSession. Let me explain with an example.

For example:

data = [('Raj', 20),
        ('Rahul', 20),
        ('Simran', 20)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data=data, schema=columns)

This creates a DataFrame with the columns Name and Age, filled with the data accordingly.

Now, we can view the schema of the DataFrame with the method df.printSchema(). Python integers are inferred as long, so the output looks like:

>>> df.printSchema()
root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)

Yes, we can create a PySpark DataFrame from an external data source. Real-world applications read from external storage such as the local file system, HDFS, HBase, MySQL tables, Amazon S3, and Azure storage. The following example reads a CSV file from the local system into a DataFrame. PySpark supports many file formats, such as CSV, text, Avro (through the spark-avro module), Parquet, and TSV.

For example:

df = spark.read.csv("path/to/filename.csv", header=True, inferSchema=True)

Expect to come across this popular question in PySpark Interview Questions. PySpark SQL is Spark's module for structured data. It gives Spark more information about the structure of the data and the operations performed on it than the PySpark RDD API does, and it is built around the DataFrame programming abstraction.

A DataFrame is an immutable distributed collection of data. A DataFrame can handle large amounts of structured data, such as relational tables, and semi-structured data, such as JavaScript Object Notation (JSON). After creating a DataFrame, we can work with the data using SQL syntax and queries.

In PySpark SQL, the first step is to register the DataFrame as a temporary view with the createOrReplaceTempView() function. The view can then be queried through the SparkSession's sql() method. Temporary views are dropped automatically when the SparkSession ends.

The example of PySpark SQL:

# findspark is only needed when Spark was installed manually rather than with pip
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.sql("select 'spark' as hello")
df.show()

The lineage graph is the graph of dependencies between RDDs. Each Spark application has its own lineage graphs. Spark uses the lineage graph to recompute RDDs on demand and to recover lost data of persisted RDDs. The graph is built up as transformations are applied to RDDs, and Spark uses it to generate the logical execution plan.

The Catalyst optimizer plays a very important role in Apache Spark. It improves structured queries, whether written in SQL or expressed through the DataFrame or Dataset APIs, by reducing program execution time and cost. The Spark Catalyst optimizer supports both rule-based and cost-based optimization. Rule-based optimization applies a set of rules that determine how a query is executed. Cost-based optimization generates multiple plans, calculates their costs, and picks the cheapest. Catalyst also helps with big data challenges such as semi-structured data and advanced analytics.

PySpark is a Python API. It was created and is distributed by the Apache Spark community to make working with Spark easier for Python programmers. Scala is the programming language Apache Spark is written in, and Spark also works with other languages such as Java, R, and Python.

Because Scala is a statically typed language, Apache Spark has some features that PySpark does not. One of them is the typed Dataset API. Datasets are strongly typed collections of domain-specific objects that can be transformed in parallel.

In PySpark, Parquet is a columnar file format supported by many data processing systems. Spark SQL can both read and write Parquet files.

Parquet's columnar storage provides several benefits:

  • It is compact and takes up less space.
  • It makes it easy to access specific columns.
  • It uses type-specific encoding.
  • It provides better-summarized data.
  • It needs very limited input/output operations.

We can connect Spark with Mesos in a few steps.

First, configure the Spark driver program to connect to Mesos. The Spark binary package must be in a location accessible by Mesos. Then install Apache Spark in the same location as Apache Mesos and set the "spark.mesos.executor.home" property to point to the location where it is installed. This is how we can connect Spark with Apache Mesos.

In PySpark, DStream stands for Discretized Stream. A DStream is a continuous stream of data divided into small batches, each represented as an RDD. It is also called an Apache Spark Discretized Stream. Because DStreams are built on Spark RDDs, streaming integrates seamlessly with other Apache Spark components such as Spark MLlib and Spark SQL.

The code to check whether a specific keyword exists will be:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
lines = sc.textFile("hdfs://Hadoop/user/test_file.txt")

def is_found(line):
    # 1 if the keyword appears in this line, otherwise 0
    return 1 if "my_keyword" in line else 0

found_bits = lines.map(is_found)
total = found_bits.reduce(lambda a, b: a + b)  # add up the per-line flags

if total > 0:
    print("Found")
else:
    print("Not Found")

If the keyword is found, it prints "Found"; otherwise, it prints "Not Found".

There are two types of errors in Python: syntax errors and exceptions.

Syntax errors, also called parsing errors, occur when the parser cannot understand the code, so the program does not run at all. When the parser detects one, it repeats the offending line and displays a small arrow pointing at the place in the line where the error was detected.

Exceptions are errors detected during execution. Even if a statement is syntactically correct, running it can still fail and disrupt the program's normal flow. ZeroDivisionError, TypeError, and NameError are some examples of exceptions.
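
The difference is easy to see in plain Python: compile() surfaces a SyntaxError before any code runs, while exceptions only appear when the code executes. check_syntax and run_safely are hypothetical helper names used just for this illustration.

```python
def check_syntax(source):
    """Return the SyntaxError raised while parsing source, or None if it parses."""
    try:
        compile(source, "<example>", "exec")  # parse only, nothing is executed
    except SyntaxError as err:
        return err
    return None


def run_safely(func, *args):
    """Call func(*args) and return (result, name of the exception or None)."""
    try:
        return func(*args), None
    except (ZeroDivisionError, TypeError, NameError) as exc:
        return None, type(exc).__name__


print(check_syntax("print('hello'"))            # prints the SyntaxError message
print(run_safely(lambda a, b: a / b, 1, 0))     # (None, 'ZeroDivisionError')
print(run_safely(lambda a, b: a + b, 1, "x"))   # (None, 'TypeError')
print(run_safely(lambda a, b: a / b, 6, 3))     # (2.0, None)
```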

Receivers are special objects in Spark Streaming whose sole purpose is to consume data from various data sources and move it into Spark. The streaming context starts receivers as long-running tasks on different executors.

There are two types of receivers:

  • Reliable Receiver: Sends an acknowledgement to a reliable source once the data has been received and stored, with replication, in Spark storage.
  • Unreliable Receiver: Does not send any acknowledgement to the source.

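When data is unevenly distributed (skewed), PySpark has a few tricks up its sleeve to deal with it. We can repartition the data so that rows are spread more evenly across partitions, "salt" hot keys by adding a random component so that one key is split across several tasks, and, in Spark 3.x, let Adaptive Query Execution split oversized partitions in joins automatically. It is a bit like a teacher rearranging students so the teams are balanced, and it keeps a few overloaded tasks from holding up the whole job.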

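Project Tungsten is all about making Spark run faster and use less memory. It manages memory explicitly, storing data in a compact binary format (optionally off-heap) instead of as ordinary JVM objects, uses cache-friendly data structures, and generates optimized code at runtime through whole-stage code generation. Think of it as the mechanic who fine-tunes a car's engine for better performance and efficiency.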

In PySpark, the Catalyst optimizer makes DataFrame and SQL queries run faster. It works like a smart planner: it analyzes the query, rewrites the logical plan with rules such as predicate pushdown, column pruning, and constant folding, and then chooses an efficient physical plan. It's like having an expert optimise our travel route to avoid traffic jams and roadblocks.

Broadcasting is like giving a cheat sheet to every part of our Spark program. Instead of shipping a copy of the data with every task, PySpark sends it to each executor once, and the executor keeps it cached. This is handy for read-only data, such as lookup tables, that many tasks need to reference, because it saves a lot of time and network traffic.

Accumulators are like counters or tally marks that PySpark uses to aggregate information across tasks. Tasks can only add to an accumulator, and only the driver can read its value. For example, if we wanted to count how many times a certain word appears in documents across all nodes, we could use an accumulator to keep a running total while our data is processed in parallel.

Advanced

When PySpark runs operations such as map(), filter(), or reduce(), the functions passed to them execute on remote executor nodes, and each task works on its own copy of the variables those functions reference. Changes to these copies are never sent back to the driver, so the variables cannot be shared or reused across tasks and jobs. To solve this problem of reuse and sharing, PySpark provides shared variables. There are two types of shared variables:

Broadcast Variables:  

They are also known as read-only shared variables and are typically used for lookup data. They are cached and made available on all nodes of the cluster. Instead of being sent with every task, they are distributed to the nodes using efficient broadcast algorithms to reduce communication costs. When we run an RDD action that uses broadcast variables, PySpark does the following:

  • The job is divided into stages separated by distributed shuffle operations, and the action is executed through these stages.
  • Each stage is then divided into tasks.
  • The broadcast variables are sent to the tasks that need them.

Broadcast variables are created in PySpark with the broadcast(variable) method of the SparkContext class, and tasks read them through the value attribute. Note that the data is not sent to the executors when broadcast() is called; each executor fetches it the first time one of its tasks needs it.

Accumulator Variables: 

These are known as updatable shared variables. Tasks can only add to them, through an associative and commutative operation, which makes them suitable for counters and sums. PySpark supports numeric accumulators out of the box, and custom accumulator types can be defined by subclassing AccumulatorParam. Spark also distinguishes between named and unnamed accumulators:

  • Named Accumulator: These accumulators are shown in the Spark web UI, in the Accumulators table of each stage that modifies them. Naming accumulators is supported in the Scala and Java APIs.
  • Unnamed Accumulator: These accumulators are not shown in the web UI. Accumulators created with PySpark's sc.accumulator() are unnamed.

DAG stands for Directed Acyclic Graph. In Spark, the DAGScheduler is the scheduling layer that implements stage-oriented scheduling using jobs and stages. It transforms a logical execution plan (the lineage of RDD transformations that lead to an action) into a physical execution plan made up of stages. For each job, it computes a DAG of stages, keeps track of which RDDs and stage outputs are already materialized, and finds a minimal schedule to run the job. The stages are then submitted to the TaskScheduler, which runs their tasks. This is shown in the image below:

The DAGScheduler also determines the preferred locations for running each task, based on where the data is, and handles failures caused by lost shuffle output files.

The DAGScheduler follows an event-queue architecture: threads post events of type DAGSchedulerEvent (for example, when a job is submitted or a task completes), and a single event loop processes them, submitting stages in topological order.

Let’s understand the process through an example. Here, we want to capitalize the first letter of every word in a string. PySpark's built-in initcap() function does something similar (it also lower-cases the remaining letters), but the task makes a convenient example: we create a UDF capitalizeWord() and use it on DataFrames.

First, we will create a Python function capitalizeWord(). This function takes a string as an input and then capitalizes the first character of every word. 

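def capitalizeWord(text):
    # UDFs receive None for SQL NULLs, so pass them through unchanged
    if text is None:
        return None
    # Upper-case the first character of each word and keep the rest as-is
    return " ".join(word[0:1].upper() + word[1:] for word in text.split(" "))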

Now, we wrap the function as a PySpark UDF using the udf() function from the pyspark.sql.functions module, passing the return type (StringType from pyspark.sql.types). Both must be imported. udf() returns a user-defined function that can be applied to columns like a built-in function. The code for converting the function to a UDF is:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

capitalizeWordUDF = udf(capitalizeWord, StringType())

The next step is to use the UDF with a DataFrame. We can apply the UDF to a PySpark DataFrame just like a built-in DataFrame function. Suppose we have a DataFrame stored in the variable df with the columns ID_COLUMN and NAME_COLUMN. To capitalize the first character of every word in NAME_COLUMN, we write:

df.select(
    col("ID_COLUMN"),
    capitalizeWordUDF(col("NAME_COLUMN")).alias("NAME_COLUMN"),
).show(truncate=False)

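UDFs should be written so that they are efficient in time and space. Spark's Catalyst optimizer cannot see inside a UDF, and Python UDFs also require moving every row between the JVM and a Python worker process, so a careless UDF can noticeably slow down DataFrame operations. When a built-in function exists, it is usually the faster choice.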

We use the builder pattern to create a SparkSession. The SparkSession class in the pyspark.sql module provides a builder whose getOrCreate() method creates a new SparkSession if none exists, or otherwise returns the existing SparkSession object. The code to create a SparkSession looks like:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.master("local[1]")
    .appName("KnowledgeHuntSparkSession")
    .getOrCreate()
)

Where,

  • master() - This sets the master URL, i.e. where the application runs: a cluster manager such as spark://host:7077 or yarn for cluster deployments, or local[x] to run locally on one machine. With local[x], x is the number of worker threads, which also becomes the default number of partitions for RDDs created with parallelize(). Ideally, x is the number of available CPU cores.
  • appName() - This sets the application name, which appears in the Spark web UI.
  • getOrCreate() - This returns a SparkSession object, creating a new one if none exists and otherwise returning the existing one.

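If we need an additional SparkSession, for example one with its own SQL configuration, temporary views, and registered UDFs, we can call the newSession() method on an existing session. The new session shares the same SparkContext. This looks like:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark_session = spark.newSession()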

In PySpark, there are various methods used to create RDD. Let’s discuss them one by one. 

Using sparkContext.parallelize() - SparkContext's parallelize() method distributes an existing collection from the driver program to form an RDD. This is the most basic way to create an RDD and is used when the data is already in memory; note that it requires all the data to be present in the driver before the RDD is created. The code to create an RDD from a Python list is:

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
rdd = spark.sparkContext.parallelize(data)

Using sparkContext.textFile() - This method reads a text file and returns an RDD with one element per line. The syntax looks like:

rdd_txt = spark.sparkContext.textFile("/path/to/textFile.txt")

Using sparkContext.wholeTextFiles() - This method returns a PairRDD, i.e. an RDD of key-value pairs, in which the file path is the key and the file content is the value. Each file is read into the RDD as a single record, which is useful for many small files. Any text-based format, such as CSV or JSON, can be read this way; binary formats such as Parquet should be read with spark.read instead.

rdd_whole_text = spark.sparkContext.wholeTextFiles("/path/to/textFile.txt")

Empty RDD with no partitions using sparkContext.emptyRDD - An RDD without any data is known as an empty RDD. We can create such an RDD, with no partitions at all, using the emptyRDD() method. The code for that looks like:

empty_rdd = spark.sparkContext.emptyRDD()
# Python RDDs are untyped, so there is no equivalent of Scala's emptyRDD[String]

Empty RDD with partitions using sparkContext.parallelize - When we need partitions but no data, we can create an empty RDD with the parallelize method. For example, the code below creates an empty RDD with 10 partitions:

empty_partitioned_rdd = spark.sparkContext.parallelize([],10)

PySpark SQL is one of the most widely used PySpark modules and is used to process structured, columnar data. Once a DataFrame is created, we can work with its data using SQL syntax: Spark SQL lets us run native SQL queries with SELECT, WHERE, GROUP BY, JOIN, UNION, and so on. To use PySpark SQL, the first step is to create a temporary view of a DataFrame with the createOrReplaceTempView() method. Once created, the view can be queried within the SparkSession using the sql() function. When the SparkSession is terminated, the temporary view is dropped.

For example, consider a DataFrame assigned to the variable df that contains the Name, Age, and Gender of students as columns. We create a temporary view of the DataFrame and then query it with the SparkSession's sql() function:

df.createOrReplaceTempView("STUDENTS")
df_new = spark.sql("SELECT * FROM STUDENTS")
df_new.printSchema()

The schema will be shown as:

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)

We can use the join() method of a PySpark DataFrame. Its syntax looks like:

join(other, on=None, how=None)

Where,

  • other - The right side of the join.
  • on - A column name, a list of column names, or a join expression (Column).
  • how - The type of join. The default is inner. Supported values include inner, cross, outer, full, left, right, left_outer, right_outer, left_semi, and left_anti.

We can chain where() or filter() after a join to filter the rows, and chain several join() calls to join more than two DataFrames.

For example, consider two DataFrames named Employee (empDF) and Department (deptDF), with the columns emp_id, emp_name, empdept_id and dept_id, dept_name, respectively. We can inner join the Employee DataFrame with the Department DataFrame to get the department information along with the employee information. The code will look like:

emp_dept_df = empDF.join(deptDF, empDF.empdept_id == deptDF.dept_id, "inner")
emp_dept_df.show(truncate=False)

PySpark Streaming is a scalable, fault-tolerant, high-throughput stream processing system that supports both streaming and batch workloads. It can ingest real-time data from sources such as TCP sockets, S3, Kafka, Twitter, and file system folders, and the processed data can be sent to live dashboards, Kafka, databases, HDFS, etc.

To stream from a TCP socket, we can use the readStream.format("socket") method of the SparkSession object and pass the host and port of the streaming source as options. The code will look like:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SocketStream").getOrCreate()

# Each line received on the socket becomes a row in a single "value" column
df = (
    spark.readStream.format("socket")
    .option("host", "127.0.0.1")
    .option("port", 5555)
    .load()
)
df.printSchema()

Spark retrieves the data from the socket and represents it in the value column of the DataFrame. After processing, the DataFrame can be streamed to the console or to other destinations such as Kafka, dashboards, and databases.

Spark automatically persists some intermediate data from shuffle operations, but it is still recommended to call persist() on an RDD that we plan to reuse. There are several persistence levels for storing RDDs in memory, on disk, or both, with varying levels of replication. The following persistence levels are available in Spark:

  1. MEMORY_ONLY: This is the default level for RDDs. RDDs are stored in the JVM as deserialized Java objects. If an RDD does not fit in memory, some partitions are not cached and are recomputed when needed.
  2. MEMORY_AND_DISK: RDDs are stored in the JVM as deserialized Java objects. If memory is insufficient, the partitions that do not fit are stored on disk and read from there when needed.
  3. MEMORY_ONLY_SER: RDDs are stored as serialized Java objects, one byte array per partition.
  4. DISK_ONLY: RDD partitions are stored on disk only.
  5. OFF_HEAP: This level is like MEMORY_ONLY_SER, except that the data is stored in off-heap memory.

In Python, stored objects are always serialized with the Pickle library, so it does not matter whether a serialized level is chosen. The persist() function takes the chosen level as an argument:

from pyspark import StorageLevel

# Use any of the levels available in StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)

A streaming application must run 24/7 and therefore be resilient to failures unrelated to the application logic (e.g., system failures, JVM crashes). Checkpointing makes streaming applications more fault tolerant by saving data and metadata to a checkpoint directory.

There are two types of checkpointing: metadata checkpointing and data checkpointing.

Metadata checkpointing saves the information that defines a streaming computation (configuration, DStream operations, and incomplete batches) to fault-tolerant storage such as HDFS. It is used to recover from a failure of the node running the streaming application's driver.

Data checkpointing saves the generated RDDs to reliable storage. It is necessary for stateful transformations that combine data across multiple batches.

We can count the words in a file, and determine how many unique words it contains, by following these steps in PySpark:

  • Load the text file as an RDD.

lines = sc.textFile("hdfs://Hadoop/user/test_file.txt")
  • Then write a function that splits each line into words.

def toWords(line):
    return line.split()
  • Now, apply the toWords function to every line of the RDD with flatMap().

words = lines.flatMap(toWords)
  • Next, generate a (key, value) pair for every word.

def toTuple(word):
    return (word, 1)

wordsTuple = words.map(toTuple)
  • Run the reduceByKey() command to add up the counts for each word.

def add(x, y):
    return x + y

counts = wordsTuple.reduceByKey(add)
  • Then print the per-word counts; counts.count() gives the total number of unique words.

print(counts.collect())
print(counts.count())

GraphX provides three kinds of graph operators:

  • Property Operators - These operators (mapVertices, mapEdges, and mapTriplets) create a new graph by applying a user-defined map function to the vertex or edge properties. The graph structure is unchanged, which is an important property of these operators because it allows the new graph to reuse the structural indices of the original graph.
  • Structural Operators - GraphX currently supports only a few widely used structural operators. The reverse operator returns a new graph with all edge directions reversed. The subgraph operator returns a graph containing only the vertices and edges that satisfy the given vertex and edge predicates. The mask operator constructs a subgraph containing only the vertices and edges that are also found in another input graph. The groupEdges operator merges parallel edges.
  • Join Operators - Join operators join data from external collections (RDDs) with existing graphs, for example to add new user-defined attributes to an existing graph or to pull vertex properties from one graph into another (joinVertices and outerJoinVertices).

Some best practices for writing scalable PySpark code:

  • Avoid dictionaries: Python data structures such as dictionaries live on the driver, so code that relies on them may not run in a distributed fashion. Instead of using keys to look up items in a dictionary, consider adding an extra column to the DataFrame that can be used as a filter or join key. The same advice applies to other Python types that PySpark cannot distribute, such as lists.
  • Limit the use of Pandas: Calling toPandas() loads all the data into memory on the driver node, so the operations no longer run in a distributed fashion. This is fine for data that has already been aggregated to a small size, for example before plotting with regular Python tools, but it should not be used for large DataFrames.
  • Minimize eager operations: To keep a pipeline as scalable as possible, avoid operations that pull entire DataFrames into memory. Some reads are also eager; for example, reading a CSV file with schema inference scans the data. One approach is to write such data to S3 as Parquet once and read the Parquet copy in the following steps.

The basis of Spark Streaming is dividing the content of the data stream into batches of X seconds, known as DStreams. These DStreams allow developers to cache data, which can be especially useful if data from a DStream is used multiple times. The cache() function or the persist() method can be used to cache data with the correct persistence settings. For input streams receiving data over networks such as Kafka, Flume, and others, the default persistence level is configured to achieve data replication across two nodes to achieve fault tolerance.  

  • Cache method -

cacheDf = dframe.cache()
  • Persist method -

from pyspark import StorageLevel

persistDf = dframe.persist(StorageLevel.MEMORY_ONLY)

Caching has several benefits, chiefly time savings and cost efficiency. Spark computations are expensive, so reusing cached data avoids recomputing it, which lowers the cost of operations and saves a lot of time. Because each computation finishes faster, worker nodes can run more tasks in the same amount of time.

GraphX is a Spark component that extends the Spark RDD with a graph abstraction and an API for graph-parallel computation. Its Resilient Distributed Property Graph is a directed multigraph: it can contain multiple parallel edges between the same pair of vertices, and user-defined properties are attached to each vertex and edge. GraphX offers operators for graph computation, such as subgraph, mapReduceTriplets, and joinVertices, as well as a collection of graph builders and algorithms that simplify graph analytics.

Following the UNIX standard streams model, Spark provides a pipe() transformation on RDDs that lets us assemble parts of a job written in any language. pipe() writes each RDD element, as a line of text, to the standard input of an external command and returns the lines the command writes to its standard output as a new RDD of strings.

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.master("local").appName("Modes of Dataframereader") \
    .getOrCreate()
sc = spark.sparkContext

schm = StructType([
    StructField("col_1", StringType(), True),
    StructField("col_2", StringType(), True),
    StructField("col", StringType(), True),
])
# DROPMALFORMED discards rows that do not match the schema
df = spark.read.option("mode", "DROPMALFORMED").csv("input1.csv", header=True, schema=schm)
df.show()

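import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("scenario based") \
    .getOrCreate()
sc = spark.sparkContext

# Read each line as a single string column named "value"
df = spark.read.text("input.csv")
df.show(truncate=False)

# The first line holds the column names, separated by the delimiter "-|"
header = df.first()[0]
schema = header.split("-|")

# Drop the header row, split the remaining lines on "-|", and name the columns
df_input = df.filter(df["value"] != header).rdd.map(lambda x: x[0].split("-|")).toDF(schema)
df_input.show(truncate=False)
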
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode_outer, posexplode_outer, split

spark = SparkSession.builder.master("local").appName("scenario based") \
    .getOrCreate()
sc = spark.sparkContext

in_df = spark.read.option("delimiter", "|").csv("input4.csv", header=True)
in_df.show()

# explode_outer: one row per comma-separated value (a null Education still yields a row)
in_df.withColumn("Qualification", explode_outer(split("Education", ","))).show()

# posexplode_outer also returns each value's position, in columns "pos" and "col"
in_df.select("*", posexplode_outer(split("Education", ","))) \
    .withColumnRenamed("col", "Qualification") \
    .withColumnRenamed("pos", "Index") \
    .drop("Education").show()

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.master("local").appName("Modes of Dataframereader") \
    .getOrCreate()
sc = spark.sparkContext

df1 = spark.read.option("delimiter", "|").csv("input.csv", header=True)
df2 = spark.read.option("delimiter", "|").csv("input2.csv", header=True)

# df1 has no Gender column, so add it as null before the positional union
df_add = df1.withColumn("Gender", lit(None).cast("string"))
df_add.union(df2).show()

For the union, both files can also be read with the same explicit schema:

from PySpark.sql.types import * 
schema = StructType( 
    [ 
        StructField("Name", StringType(), True), 
        StructField("Age", StringType(), True), 
        StructField("Gender", StringType(),True), 
    ] 
) 
df3 = spark.read.option("delimiter","|").csv("input.csv", header = True, schema = schema) 
df4 = spark.read.option("delimiter","|").csv("input2.csv", header = True, schema = schema) 
df3.union(df4).show() 

PySpark is great for large text-processing projects, such as analysing huge volumes of reviews or documents. We use RDDs or DataFrames to break the text down and analyse it, for example to count words or search for specific phrases. It's like having a super-powered text editor that can handle entire libraries at once. Functions like flatMap() for tokenization and reduceByKey() for counting words keep the processing efficient across the cluster.

Window functions in PySpark help us make sense of data that is ordered or grouped in some way. For example, with sales data we could use a window function to calculate a running total of sales, or average sales over a specific period, directly within our dataset. It's a bit like a spreadsheet that automatically updates those figures as we add new data. We define a window specification with Window.partitionBy() and orderBy() (optionally with a frame such as rowsBetween()) and then apply an aggregate function such as sum() or avg() over it with the Column.over() method.

Optimising PySpark applications involves several strategies: choosing the right data abstraction (DataFrames usually outperform RDDs because they benefit from the Catalyst optimizer; the typed Dataset API is only available in Scala and Java), tuning resource allocation (e.g., executor memory and cores), and optimising data partitioning. Additionally, minimising data shuffling and using broadcast variables and accumulators wisely can significantly improve performance.

GraphFrames is a package, installed separately from PySpark, that brings graph processing to DataFrames. It lets us perform advanced graph operations such as finding shortest paths, calculating PageRank, or running breadth-first searches, for example to find the shortest route in a road network or to rank websites by importance. Because vertices and edges are stored as DataFrames, we can create, query, and manage graphs with familiar SQL-like operations while also working with regular tabular data.

Dynamic resource allocation allows PySpark to automatically adjust the number of executors based on the workload. It is like an elastic band that stretches when demand spikes and contracts when things are quiet. This is particularly beneficial in cloud environments, where computing resources are elastic and we pay for what we use: PySpark can scale up to handle big jobs and scale down when the work is done, saving money and keeping processing efficient. It is enabled with the spark.dynamicAllocation.enabled setting.

MLlib is PySpark's built-in machine learning library, designed to make practical machine learning scalable and easy. It is like a treasure chest of machine learning tools: it provides algorithms for classification, regression, clustering, and collaborative filtering, as well as feature transformations. Because MLlib runs on Spark's distributed engine, data scientists can train models across a cluster, which is ideal when we need to uncover insights from large datasets quickly.

While PySpark is powerful, it has limitations. Some operations consume a lot of memory, and it is less efficient for complex iterative algorithms that keep looping over the same data and do not map well to its execution model. Its Python API can also run slower than the Scala API: driver calls pass through Py4J to the JVM, and Python functions applied to data (RDD operations and Python UDFs) require serializing rows between the JVM and Python worker processes. Even so, for many big data tasks it is still a solid choice.

Keeping our data clean and consistent in PySpark involves setting up checks right from the start. I would use built-in tools to filter out duplicates, correct errors, and make sure everything looks right. The DataFrame API's built-in functions, such as dropDuplicates(), filter(), and where(), help maintain data consistency. Additionally, writing unit tests and using data validation frameworks can further ensure the quality and consistency of the processed data. Think of it as proofreading our data before it goes out into the world.

Speculative execution is like having a backup plan for slow tasks. When some tasks run noticeably slower than others in the same stage (stragglers), for example because of a slow node, Spark launches duplicate copies of them on other nodes and uses whichever copy finishes first. This can significantly reduce delays caused by slow nodes. It is disabled by default and is enabled with the spark.speculation setting.

StatFunctions in PySpark (the DataFrameStatFunctions class, available as df.stat) provide statistical methods for quickly summarising data. For example, we can use df.stat.corr('column1', 'column2') to calculate the correlation between two columns. They are handy for exploratory data analysis, such as finding relationships between variables or getting a quick overview of our data's distribution and trends. It's like having a fast, built-in data scientist to help us understand large datasets more intuitively.