PySpark is the Python API for Apache Spark, an open-source distributed computing framework. It helps build more scalable analytics pipelines and increases processing speed, and it also serves as a library for large-scale, real-time data processing. In this article, we will cover some frequently asked PySpark interview questions to help you understand the types of questions you might be asked in a Python and PySpark-related interview. Whether you are a beginner, an intermediate, or an experienced PySpark professional, this guide will help you increase your confidence and knowledge of PySpark. The questions cover important PySpark topics such as DataFrames, RDDs, serializers, cluster managers, SparkContext, SparkSession, etc. With these PySpark interview questions, you can be confident that you will be well-prepared for your next interview. So, if you are looking to advance your career in PySpark, this guide is the perfect resource for you.
The PySpark framework is easy to learn and implement. No database experience is required, and it is especially easy to pick up if you already know a programming language or a similar framework. Before getting familiar with PySpark concepts, you should have some knowledge of Apache Spark and Python. Learning the advanced concepts of PySpark is also very helpful.
MLlib can be used to implement machine learning in Spark. Spark provides a scalable machine learning library called MLlib. It is primarily used to make machine learning scalable and easy, with common learning algorithms and use cases such as clustering, collaborative filtering, and dimensionality reduction. This is how machine learning can be implemented in Spark.
There are some basic differences between PySpark and other programming languages. PySpark has its own built-in APIs, whereas other languages require external, third-party API integrations. Implicit communication between nodes is possible in PySpark, but not in most other languages. PySpark follows the map-reduce model, so developers can use map and reduce functions directly. PySpark also allows computation across multiple nodes, which again is not possible in most other languages.
No, PySpark is not a good fit for small data sets. The overhead of coordinating a distributed runtime outweighs its benefits when the data is small. PySpark is great for large volumes of records, so it should be used for large data sets.
Data science is based heavily on two languages, Python and R, and PySpark is integrated with Python. There are interfaces and built-in environments made using Python and its machine learning ecosystem, which makes PySpark an essential tool for data science. Once the dataset is processed, the prototype model is transformed into a production-ready workflow. For these reasons, PySpark is important in data science.
Spark can form distributed datasets from any storage source supported by different file systems. These file systems include:
Also, Spark supports text files, Sequence files, and any other Hadoop Input Format.
This is a frequently asked question in PySpark Interview Questions. PySpark is an Apache Spark tool or interface developed by the Apache Spark community to help Python and Spark work together. It works with Apache Spark through an API written in Python and supports features such as Spark SQL, Spark DataFrame, Spark Streaming, Spark Core, and Spark MLlib. It provides an interactive PySpark shell for analyzing and processing structured and semi-structured data in distributed environments, with a streamlined API that helps programs read data from various data sources. PySpark is built on top of the Py4J library, which allows the user to easily manipulate Resilient Distributed Datasets (RDDs) from the Python programming language. Python also supports many libraries for big data processing and machine learning.
We can install PySpark on PyPi using the below command:
pip install pyspark
There are four main characteristics of PySpark. The detailed description of those features can be given as below:
In PySpark, the full form of RDD is Resilient Distributed Dataset. The RDD is the core data structure of PySpark. It is a low-level object that is very efficient at performing distributed tasks.
In PySpark, RDDs are elements that can be run and manipulated on multiple nodes to perform parallel processing within a cluster. These are immutable items. This means that once the RDD is created it can not be changed. Also, RDDs are fault tolerant. In case of failure, they are automatically restored. Multiple operations can be applied to an RDD to accomplish a specific task. You can learn more about this concept at Certified Programming courses.
SparkContext serves as the entry point for all Spark functionality. When a Spark application runs, the driver program starts, and its main function creates the SparkContext. The driver program then executes operations inside the executors on the worker nodes. In PySpark, the SparkContext is known as PySpark SparkContext. Using the Py4J library, it starts a JVM and creates a JavaSparkContext. In the PySpark shell, a SparkContext is available by default as 'sc', so there is no need to create a new one.
PySpark SparkFiles is used to load files into Apache Spark applications. It is one of the facilities of SparkContext: you call sc.addFile() to distribute files to the cluster, and SparkFiles.get() to resolve the path of a file added with sc.addFile(). The class methods of SparkFiles are get(filename) and getRootDirectory().
A must-know for anyone heading into a PySpark interview, this question is frequently asked in PySpark Interview Questions. In PySpark, serialization is used for performance tuning. PySpark needs serializers because it must continuously marshal the data it sends and receives over the network or writes to disk. There are two types of serializers that PySpark supports. The serializers are:
PySpark ArrayType is a collection data type that extends PySpark's DataType class, which is the superclass of all types. A PySpark ArrayType only contains elements of the same type. We can create an instance of ArrayType using the ArrayType() method, which accepts two arguments: elementType and containsNull.
from pyspark.sql.types import StringType, ArrayType
ArrayColumn = ArrayType(StringType(), False)
A PySpark DataFrame is a distributed collection of well-organized data. DataFrames are just like relational database tables, arranged in named columns. PySpark DataFrames are better optimized than R or Python data frames, and they can be created from a variety of sources such as Hive tables, structured data files, existing RDDs, and external databases.
The biggest advantage of PySpark DataFrame is that data in PySpark DataFrame is distributed to different computers in the cluster and operations performed on it are executed in parallel on all computers. This makes it easy to handle large collections of petabytes of structured or semi-structured data.
What do you mean by cluster manager? What are the different cluster manager types that are supported by PySpark?
In PySpark, the cluster manager is a cluster-mode platform that facilitates Spark execution by allocating resources to worker nodes as needed.
The Spark Cluster Manager ecosystem includes a master node and multiple worker nodes. Master nodes, with the help of the cluster manager, provide resources, such as memory and processor allocation, to worker nodes according to the node's needs.
PySpark supports different cluster managers. Those can be explained as:
In PySpark, SparkSession is the application entry point. In the first versions of PySpark, SparkContext was used as the entry point; SparkSession has replaced it since PySpark version 2.0. From version 2.0 onward, SparkSession serves as the starting point for accessing all PySpark functionality related to RDDs, DataFrames, Datasets, etc. It is also a unified API that subsumes SQLContext, StreamingContext, HiveContext, and the other contexts it replaces.
SparkSession internally makes SparkContext and SparkConfig according to details provided in SparkSession. We can also create a SparkSession using the builder patterns.
The main function of Spark Core is to implement several important capabilities such as storage management, fault tolerance, job monitoring, job scheduling, and communication with storage systems. It also underpins additional libraries, built on top of the core, that are used for streaming, machine learning, and SQL workloads.
The Spark Core is used for the functions like:
There is a common workflow followed by a Spark program. The first step is to create input RDDs from external data, which can come from a variety of data sources. Next, depending on business logic, RDD transformation operations such as filter() and map() are applied to create new RDDs. If we need to reuse intermediate RDDs later, we can persist them. Finally, action operations like first() and count() trigger Spark to start the parallel computation. This is the workflow a Spark program follows.
In PySpark, startsWith() and endsWith() methods come in the Column class. These methods are used to search DataFrame rows by checking if the column value starts or ends with a specific value. Both are used to filter data in our application.
PySpark supports custom profilers. Custom profilers are used to build predictive models. A profiler is also used to ensure that the data is valid and can be used at the time of consumption. If we want a custom profiler, we will need to define some methods. These methods are:
Yes, PySpark is faster than Pandas on large data sets because it supports parallel execution of statements in a distributed environment. For example, PySpark can run on different cores and machines, which is not possible with Pandas. This is the main reason PySpark is faster than Pandas. Check KnowledgeHut's best certification for Programmers for more information about this topic.
PySpark StorageLevel is used to control RDD storage. It can control how and where the RDD is stored. The PySpark StorageLevel decides whether the RDD is stored in memory, on disk, or both. It also determines if we need to replicate the RDD partitions or serialize the RDD. The code for PySpark StorageLevel looks like:
class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)
There are some most used Spark ecosystems like Spark SQL, Spark Streaming, GraphX, MLlib, SparkR, etc.
The main difference between get(filename) and getRootDirectory() is that get(filename) is used to get the correct path to the file that was added using SparkContext.addFile(), while getRootDirectory() is used to get the root directory containing the files added using SparkContext.addFile().
The Apache Spark execution engine is a graph execution engine that makes it easy for users to explore large datasets with high performance. If we want data to be manipulated at different stages of processing, we should keep it in memory for a radical performance boost.
Hive uses HQL (Hive Query Language), while Spark SQL uses Structured Query Language to process and query data. We can easily connect SQL tables and HQL tables to Spark SQL. Spark SQL is used as a special component on the Spark Core engine that supports SQL and Hive Query Language without changing the syntax.
There are some advantages as well as disadvantages of PySpark and this is another frequently asked question in the PySpark interview rounds. Let’s discuss them one by one.
Advantages of PySpark:
Disadvantages of PySpark:
A common question in the PySpark Interview Questions, don't miss this one. Let me tell you about the key differences between RDD, DataFrame and DataSet one by one.
SparkCore is the general execution engine for the Spark platform, on which all other functionality is built. It offers in-memory computing capabilities for superior speed, a generalized execution model to support a wide variety of applications, and Java, Scala, and Python APIs to simplify development.
The primary role of SparkCore is to perform all basic Input/Output functions, scheduling, monitoring, etc. It is also responsible for troubleshooting and effective memory management.
The key functions of the SparkCore can be listed as:
PySpark also provides a machine learning API called MLlib which is very much like Apache Spark. MLlib supports machine learning algorithms. These algorithms are:
PySpark Partition is a way to divide a large dataset into smaller datasets based on one or more partition keys. Execution speed improves because transformations on partitioned data run faster, with the transformations for each partition executed in parallel. PySpark supports both in-memory (DataFrame) and disk (file system) partitioning. When we create a DataFrame from a file or table, PySpark creates the DataFrame in memory with a certain number of partitions based on the specified criteria.
It is also easy to partition on multiple columns with the partitionBy() method by passing the columns we want to partition on as arguments. The syntax of this method is: partitionBy(self, *cols).
PySpark recommends having 4x partitions for the number of cores in the cluster that the application can use.
There are some key advantages of PySpark RDD. Here is the detail explanation of those:
Nowadays, every industry makes use of big data to evaluate where they stand and grow. When we hear the term big data, Apache Spark comes to our mind. There are the industry benefits of using PySpark that supports Spark. These benefits can be listed as:
PySpark SparkConf is used to set the configuration and parameters required to run the application on the cluster or on the local system. We can run SparkConf by running the following class:
class pyspark.SparkConf(loadDefaults = True, _jvm = None, _jconf = None)
We can create DataFrame in PySpark by making use of the createDataFrame() method of the SparkSession. Let me explain with an example.
data = [("Raj", 20), ("Rahul", 20), ("Simran", 20)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data = data, schema = columns)
This will create a dataframe where columns will be Name and Age. The data will be filled in the columns accordingly.
Now, we can get the schema of dataframe by using the method df.printSchema(). This will look like:
>> df.printSchema()
root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
Yes, we can create a PySpark DataFrame from an external data source. Real-time applications leverage external storage such as the local file system, HDFS, HBase, MySQL tables, Amazon S3, and Azure storage. The following example shows how to read data from a CSV file residing on your local system and create a DataFrame. PySpark supports many file formats such as csv, text, avro, parquet, tsv, etc.
df = spark.read.csv("path/to/filename.csv")
Expect to come across this popular question in PySpark Interview Questions. PySpark SQL is an organized data library for Spark. PySpark SQL provides more details about data structures and operations than the PySpark RDD API. It comes with the "DataFrame" programming paradigm.
In PySpark SQL, the first step is to create a temporary table in the DataFrame using the createOrReplaceTempView() function. Tables are available in the SparkSession via the sql() method. These temporary tables can be dropped by closing the SparkSession.
The example of PySpark SQL:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.sql("select 'spark' as hello")
df.show()
The Lineage Graph is a collection of RDD dependencies. There is a separate lineage graph for every Spark application. The lineage graph lets Spark recompute RDDs on demand and restore lost data from persisted RDDs, so we can rebuild a new RDD or recover data from a lost persisted RDD. It is built by recording the transformations applied to RDDs, from which a logical execution plan is generated.
Catalyst Optimizer plays a very important role in Apache Spark. It helps to improve structural queries in SQL or expressed through DataFrame or DataSet APIs by reducing the program execution time and cost. The Spark Catalyst Optimizer supports both cost-based and rule-based optimization. Rule-based optimization contains a set of rules that define how a query is executed. Cost-based optimization uses rules to create multiple plans and calculate their costs. Catalyst optimizer also manages various big data challenges like semi-structured data and advanced analytics.
PySpark is a Python API. It was created and distributed by the Apache Spark agency to make working with Spark less complicated for Python programmers. Scala is the programming language utilized by Apache Spark. It can work with different languages like Java, R, and Python.
Because Scala is a compile-time, type-safe language, Apache Spark has several capabilities that PySpark does not, one of which is Datasets. Datasets are a collection of domain-specific objects that can be used to execute concurrent calculations.
In PySpark, the Parquet file is a column-type format supported by various data processing systems. Using a Parquet file, Spark SQL can perform both read and write operations.
A Parquet file uses a columnar storage format that provides some benefits. The benefits are:
- It is small and takes up less space.
- It makes it easier for us to access specific columns.
- It follows type-specific encoding.
- It provides better summarization of data.
- It involves very limited Input/Output operations.
We can use some steps to connect Spark with Mesos.
First, configure the Spark driver program to connect to Mesos. The Spark binary package must be in a location accessible to Mesos. Then install Apache Spark in the same location as Apache Mesos and set the spark.mesos.executor.home property to point to the location where it is installed. This is how we connect Spark with Apache Mesos.
In PySpark, DStream is an acronym for Discretized Stream. It is a sequence of RDDs, with the incoming data divided into small batches. It is also called an Apache Spark Discretized Stream and is handled as a collection of RDDs across the cluster. DStreams are built on Spark RDDs and allow streaming to integrate seamlessly with other Apache Spark components such as Spark MLlib and Spark SQL.
Consider the following scenario: you have a large text file. How will you use PySpark to see if a specific keyword exists or not?
The code to see the specific keyword exists or not will be:
lines = sc.textFile("hdfs://Hadoop/user/test_file.txt")

def isFound(line):
    if line.find("my_keyword") > -1:
        return 1
    return 0

foundBits = lines.map(isFound)
total = foundBits.reduce(lambda x, y: x + y)

if total > 0:
    print("Found")
else:
    print("Not Found")
If the keyword is found, it will print "Found"; otherwise, it will print "Not Found".
There are two types of errors in Python: syntax errors and exceptions.
Syntax errors are often referred to as parsing errors. They are detected when the program is parsed, before it runs, and they prevent the program from executing at all. When the parser detects an error, it repeats the problematic line and then displays an arrow pointing to the position in the line where the error was detected.
Exceptions occur in a program when the program's normal flow is disrupted by an external event. Even if the program's syntax is accurate, there is a chance that an error will be encountered during execution; however, this error is an exception. ZeroDivisionError, TypeError, and NameError are some examples of exceptions.
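A small, pure-Python sketch of the distinction: an exception can be handled at run time, while a syntax error is raised as soon as the source is parsed (here forced explicitly via compile()):

```python
# Exceptions occur at run time even when the syntax is valid;
# they can be caught and handled with try/except.
def safe_divide(a, b):
    try:
        return a / b
    except ZeroDivisionError:
        return None

ok = safe_divide(10, 2)
bad = safe_divide(10, 0)

# Syntax errors are detected at parse time, before anything runs:
# compile() raises SyntaxError for malformed source.
try:
    compile("def broken(:", "<string>", "exec")
    syntax_ok = True
except SyntaxError:
    syntax_ok = False
```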
Receivers are special objects in Apache Spark Streaming whose sole purpose is to consume data from various data sources and then move it to Spark. Receiver objects are created by the streaming context and run as long-running tasks on different executors.
There are two different types of receivers which are as follows:
Whenever PySpark performs a transformation operation using filter(), map(), or reduce(), it is executed on a remote node using the variables shipped with the task. These variables are not reusable and cannot be shared across jobs because they are never sent back to the driver. To solve this problem of reusability and sharing, PySpark provides shared variables. There are two types of shared variables:
They are also known as read-only shared variables. These are used in cases of data lookup requests. These variables are cached and made available on all cluster nodes to use. These variables are not sent with every task. Instead, they are distributed to nodes using efficient algorithms to reduce communication costs. When we run an RDD task operation that uses Broadcast variables, PySpark does this:
Broadcast variables are created in PySpark using the broadcast(variable) method of the SparkContext class. The main reason for the use of broadcast variables is that the variables are not sent to the tasks when the broadcast function is called. They will be sent when the variables are first required by the executors.
These variables are known as updatable shared variables. These variables are added through associative and commutative methods. They are used for performing counter or sum operations. PySpark supports the default creation of numeric type accumulators. It can also be used to add custom accumulator types. The custom types are of two types: Named accumulator and Unnamed accumulator.
One of the most frequently posed PySpark Interview Questions, be ready for it. DAG is the full form of Directed Acyclic Graph. In Spark, the DAGScheduler is the scheduling layer that implements stage-oriented scheduling using tasks and stages. The logical execution plan (the lineage of transformations applied to the RDD) is transformed into a physical execution plan consisting of stages. The DAGScheduler computes the DAG of stages needed for each job, keeps track of which stages have materialized RDDs, and finds the minimal schedule for executing the jobs. These stages are then submitted to the TaskScheduler to be run.
The DAGScheduler computes the DAG execution for a job and specifies the preferred locations for running each task. It also handles failures caused by the loss of shuffle output files.
PySpark's DAGScheduler follows an event-queue architecture. A thread publishes events of type DAGSchedulerEvent, such as a new stage or task. The DAGScheduler then reads the stages and executes them sequentially in topological order.
Let’s understand the process through an example. Here, we want to capitalize the first letter of every word in a string. There is no default feature in PySpark that can achieve this. However, we can make this happen by creating a UDF capitalizeWord(str) and using it on the DataFrames.
First, we will create a Python function capitalizeWord(). This function takes a string as an input and then capitalizes the first character of every word.
def capitalizeWord(s):
    result = ""
    words = s.split(" ")
    for word in words:
        result = result + word[0:1].upper() + word[1:] + " "
    return result.strip()
Now, we will register the function as a PySpark UDF using the udf() function from the pyspark.sql.functions module, which must be imported. It wraps our Python function and returns a UserDefinedFunction object. The code for converting the function to a UDF is:
capitalizeWordUDF = udf(lambda z: capitalizeWord(z), StringType())
Next step is to use UDF with DataFrame. We can apply UDF on a Python DataFrame as it will act as the built-in function of DataFrame. Consider we have a DataFrame stored in variable df, which has the columns as ID_COLUMN and NAME_COLUMN. Now, to capitalize every first character of the word, we will code as:
df.select(col("ID_COLUMN"), capitalizeWordUDF(col("NAME_COLUMN")).alias("NAME_COLUMN")).show(truncate = False)
UDFs must be designed so that the algorithms are efficient and take up less time and space. If care is not taken, the performance of DataFrame operations will be affected.
We use the builder pattern to create a SparkSession. The pyspark.sql module provides the SparkSession class, which has the getOrCreate() method. This method will create a new SparkSession if there is none, or else it will return the existing SparkSession object. The code to create a SparkSession looks like:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local") \
    .appName("KnowledgeHuntSparkSession") \
    .getOrCreate()
If we want to create a new SparkSession object each time, we can call the newSession() method on an existing SparkSession. It looks like:
import pyspark
from pyspark.sql import SparkSession

spark_session = SparkSession.builder.getOrCreate().newSession()
In PySpark, there are various methods used to create RDD. Let’s discuss them one by one.
Using sparkContext.parallelize() - SparkContext's parallelize() method can be used to create RDDs. This method takes an existing collection from the driver and parallelizes it. This is the basic approach for creating an RDD and is used when the data is already present in memory. It requires all the data to be present on the driver before the RDD is created. The code to create an RDD from a Python list using the parallelize method is:
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
rdd = spark.sparkContext.parallelize(numbers)
Using sparkContext.textFile() - We can read .txt file and convert it into RDD. The syntax looks like:
rdd_txt = spark.sparkContext.textFile("/path/to/textFile.txt")
Using sparkContext.wholeTextFiles() - This method returns a PairRDD, an RDD that contains key-value pairs. In this PairRDD, the file path is the key and the file content is the value. This method reads each entire file into the RDD as a single record. It can also read files in other formats such as CSV, JSON, and TSV, treating each whole file as one record.
rdd_whole_text = spark.sparkContext.wholeTextFiles("/path/to/textFile.txt")
Empty RDD with no partitions using sparkContext.emptyRDD() - An RDD without any data is known as an empty RDD. We can make such RDDs, which have no partitions, by using the emptyRDD() method. The code for that will look like:
empty_rdd = spark.sparkContext.emptyRDD()
Empty RDD with partitions using sparkContext.parallelize - When we require partitions but not the data, we create an empty RDD using the parallelize method with an empty list. For example, the code below creates an empty RDD with 10 partitions:
empty_partitioned_rdd = spark.sparkContext.parallelize([], 10)
PySpark SQL is the most popular PySpark module, used to process structured columnar data. Once a DataFrame is created, we can work with the data using SQL syntax. Spark SQL lets us pass native raw SQL queries to Spark using select, where, group by, join, union, etc. To use PySpark SQL, the first step is to create a temporary table on the DataFrame using the createOrReplaceTempView() method. Once created, the table is accessible within a SparkSession using the sql() function. When the SparkSession is terminated, the temporary table is dropped.
For example, consider that we have a DataFrame assigned to the variable df which contains Name, Age and Gender of Students as the columns. Now, we will create a temporary table of the DataFrame that gets access to the SparkSession by using the sql() function. The SQL queries can be run within the function.
df.createOrReplaceTempView("STUDENTS")
df_new = spark.sql("SELECT * from STUDENTS")
df_new.printSchema()

The schema will be shown as:

>> df.printSchema()
root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
We can use join() method that is present in PySpark SQL. The syntax of the method looks like:
join(self, other, on = None, how = None)
where() and filter() methods can be attached to the join expression to filter rows. We can also have multiple joins using the chaining join() method.
For example, consider we have two DataFrames named Employee and Department. The Employee DataFrame has the columns emp_id, emp_name, and empdept_id, and the Department DataFrame has the columns dept_id and dept_name. We can inner join the Employee DataFrame with the Department DataFrame to get the department information along with the employee information. The code will look like:
emp_dept_df = empDF.join(deptDF,empDF.empdept_id==deptDF.dept_id,"inner").show(truncate = False)
PySpark Streaming is a scalable, fault-tolerant, high-throughput stream processing system that supports both streaming and batch workloads to handle real-time data from sources such as TCP sockets, S3, Kafka, Twitter, file system folders, etc. The processed data can be sent to live dashboards, Kafka, databases, HDFS, etc.
To stream from a TCP socket, we can use the readStream.format("socket") method of the Spark session object to read data from the TCP socket and provide the host and port of the streaming source as options. The code will look like:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc

sc = SparkContext()
ssc = StreamingContext(sc, 10)
sqlContext = SQLContext(sc)
socket_stream = ssc.socketTextStream("127.0.0.1", 5555)
lines = socket_stream.window(20)
Spark retrieves the data from the socket and represents it in the value column of the DataFrame object. After processing the data, the DataFrame can be streamed to the console or other destinations on demand such as Kafka, dashboards, databases, etc.
Spark automatically stores intermediate data from various shuffle processes. However, it is recommended to use RDD's persist() function. There are many levels of persistence for storing RDDs in memory, disk, or both, with varying levels of replication. The following persistence levels are available in Spark:
The persist() function has the following syntax for using persistence levels:
The streaming application must be available 24/7 and tolerant of errors outside the application code (e.g., system crashes, JVM crashes, etc.). The checkpointing process makes streaming applications more fault tolerant. We can store data and metadata in the checkpoint directory.
Checkpoint can be of two types – metadata check and data check.
A metadata checkpoint allows you to store the information that defines a streaming computation in a fault-tolerant storage system such as HDFS. This helps to recover the application after a failure of the driver node.
Data checkpointing means saving the created RDDs to a safe place. This type of checkpoint requires several state calculations that combine data from different batches.
We can determine the total number of unique words by following certain steps in PySpark. The steps to follow can be listed as:
Open the text file in RDD mode.
Then write a function that will convert each line into a single word.
def toWords(line):
    return line.split()
Now, run the toWords function on every member of the RDD in Spark.
words = lines.flatMap(toWords)
Next, generate a (key, value) pair for every word.
def toTuple(word):
    return (word, 1)

wordsTuple = words.map(toTuple)
Run the reduceByKey() command.
counts = wordsTuple.reduceByKey(lambda x, y: x + y)
Then print it out.
The basis of Spark Streaming is dividing the content of the data stream into batches of X seconds, known as DStreams. These DStreams allow developers to cache data, which can be especially useful if data from a DStream is used multiple times. The cache() function or the persist() method can be used to cache data with the correct persistence settings. For input streams receiving data over networks such as Kafka, Flume, and others, the default persistence level is configured to achieve data replication across two nodes to achieve fault tolerance.
Cache method:
cache_df = dframe.cache()
Persist method:
from pyspark import StorageLevel
persist_df = dframe.persist(StorageLevel.MEMORY_ONLY)
Caching has two key benefits: time savings and cost efficiency. Since Spark computations are expensive, caching enables data reuse, which avoids recomputation and reduces the cost of operations. By reusing results, worker nodes spend less time on computation and can therefore execute more tasks.
Spark RDD is extended with a robust API called GraphX that supports graphs and graph-based computations. It introduces the Resilient Distributed Property Graph, a directed multigraph built on Spark RDDs in which user-defined attributes are associated with each vertex and edge. Parallel edges represent multiple relationships between the same pair of vertices. GraphX offers a collection of operators that enable graph computations, such as subgraph, mapReduceTriplets, joinVertices, and so on. It also provides many graph builders and algorithms to facilitate graph analysis.
Following the UNIX standard-streams convention, Apache Spark supports the pipe() function on RDDs, which allows us to compose jobs from parts written in any language. The pipe() function creates an RDD transformation that sends each RDD element as a line of text to an external process, which can transform it as needed and return the results as strings.
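A minimal pure-Python sketch of what pipe() does under the hood: each element is written as a line to the external command's stdin, and the command's stdout lines become the elements of the resulting RDD. The grep command here is just an illustrative filter, as in rdd.pipe("grep an"):

```python
import subprocess

elements = ["apple", "banana", "cherry"]

# Feed each element as one line of stdin to the external command,
# as rdd.pipe("grep an") would do for each partition.
result = subprocess.run(
    ["grep", "an"],
    input="\n".join(elements) + "\n",
    capture_output=True,
    text=True,
)

# The command's stdout lines become the new elements
piped = result.stdout.splitlines()
print(piped)  # ['banana']
```

In Spark, this happens per partition and in parallel, but the line-in/line-out contract is the same.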
Examine the given file. The file contains some corrupt data. What will you do with such data and how will you import it into a Spark DataFrame?
Emp_no, Emp_name, Department
101, Murugan, HealthCare
Invalid Entry, Description: Bad Record entry
102, Kannan, Finance
103, Mani, IT
Connection lost, Description: Poor Connection
104, Pavan, HR
Bad Record, Description: Corrupt record
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Modes of DataFrameReader").getOrCreate()
sc = spark.sparkContext
from pyspark.sql.types import StructType, StructField, StringType
schm = StructType([
    StructField("col_1", StringType(), True),
    StructField("col_2", StringType(), True),
    StructField("col_3", StringType(), True),
])
df = spark.read.option("mode", "DROPMALFORMED").csv("input1.csv", header=True, schema=schm)
df.show()
The given file has a delimiter ~|. How will you load it as a Spark DataFrame? Use SparkSession (spark).
Name ~| Age
Azarudeen, Shahul ~| 25
Michel, Clarke ~| 26
Virat, Kohli ~| 28
Andrew, Simond ~| 37
George, Bush ~| 59
Flintoff, David ~| 12
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("scenario based").getOrCreate()
sc = spark.sparkContext
df = spark.read.text("input.csv")
df.show(truncate=0)
header = df.first()[0]
schema = [c.strip() for c in header.split("~|")]
df_input = df.filter(df["value"] != header).rdd.map(lambda x: x[0].split("~|")).toDF(schema)
df_input.show(truncate=0)
Consider a file that contains Education as a column. This column includes an array of elements as shown below. Convert each element in the array to a record by using Spark DataFrame.
Name | Age | Education
Azar | 25 | MBA, BE, HSC
Hari | 32 |
Kumar | 35 | ME, BE, Diploma
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("scenario based").getOrCreate()
sc = spark.sparkContext
in_df = spark.read.option("delimiter", "|").csv("input4.csv", header=True)
in_df.show()
from pyspark.sql.functions import explode_outer, posexplode_outer, split
in_df.withColumn("Qualification", explode_outer(split("Education", ","))).show()
in_df.select("*", posexplode_outer(split("Education", ","))) \
    .withColumnRenamed("col", "Qualification") \
    .withColumnRenamed("pos", "Index") \
    .drop("Education").show()
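In plain Python terms, explode_outer turns each array element into its own row (keeping rows whose array is null), and posexplode_outer additionally emits the element's position. A toy sketch of that semantics, using the sample rows from the question:

```python
rows = [
    ("Azar", 25, "MBA, BE, HSC"),
    ("Hari", 32, None),  # null Education is kept by the *_outer variants
    ("Kumar", 35, "ME, BE, Diploma"),
]

exploded = []
for name, age, education in rows:
    # Split the array column; a null array still yields one output row
    items = [e.strip() for e in education.split(",")] if education else [None]
    for pos, qual in enumerate(items):
        exploded.append((name, age, pos if qual is not None else None, qual))

for rec in exploded:
    print(rec)
```

Each input row produces one output row per array element, which is exactly what the DataFrame version above computes in parallel.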
How will you merge two files File1 and File2 into a single DataFrame if they have different schemas?
File1:
Name | Age
Azarudeen, Shahul | 25
Michel, Clarke | 26
Virat, Kohli | 28
Andrew, Simond | 37

File2:
Name | Age | Gender
Rabindra, Tagore | 32 | Male
Madona, Laure | 59 | Female
Flintoff, David | 12 | Male
Ammie, James | 20 | Female
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Modes of DataFrameReader").getOrCreate()
sc = spark.sparkContext
df1 = spark.read.option("delimiter", "|").csv("input.csv", header=True)
df2 = spark.read.option("delimiter", "|").csv("input2.csv", header=True)
from pyspark.sql.functions import lit
df_add = df1.withColumn("Gender", lit(None))
df_add.union(df2).show()
Alternatively, read both files with a common schema and union them:
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", StringType(), True),
    StructField("Gender", StringType(), True),
])
df3 = spark.read.option("delimiter", "|").csv("input.csv", header=True, schema=schema)
df4 = spark.read.option("delimiter", "|").csv("input2.csv", header=True, schema=schema)
df3.union(df4).show()
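The idea behind this merge can be sketched in plain Python: align the schemas first by adding the missing column as null, then concatenate the rows. The records below are a subset of the sample data from the question:

```python
file1 = [
    {"Name": "Azarudeen, Shahul", "Age": "25"},
    {"Name": "Michel, Clarke", "Age": "26"},
]
file2 = [
    {"Name": "Rabindra, Tagore", "Age": "32", "Gender": "Male"},
]

# Align schemas: add the missing Gender column as null (None)
aligned = [{**row, "Gender": None} for row in file1]

# Union: simple row concatenation once both sides share a schema
merged = aligned + file2

for row in merged:
    print(row)
```

DataFrame union() works positionally by column, which is why both sides must share the same schema before the rows are combined.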