## Apache Spark Tutorial


# Continuous Applications with Structured Streaming

Introduction

We have seen how Spark can be used for batch processing of big data. In this section we explore how Apache Spark handles the processing of structured data streams.

Distributed stream processing has made a lot of progress in recent years, driven by its wide range of use cases across industries. Even so, these systems remain challenging to use. The main difficulties come from complex physical execution concepts, such as at-least-once delivery, storing the state of the streams, and deciding when to trigger processing. Another major challenge is joining streaming data with batch or static data and running interactive queries on the results for insights. Streaming applications therefore cannot work in isolation; they need to integrate with these other systems. A further challenge is maintaining transactional guarantees while interacting with multiple systems.

Some of the challenges of stream processing are:

• Multiple data formats (JSON, XML, Avro, Parquet, binary)
• Data can arrive late and out of order
• Data quality can be poor
• Programming complexity
• Complex use cases such as joining streams with interactive queries, machine learning, etc.
• Wide variety of storage systems (HDFS, Kafka, NoSQL, RDBMS, S3, Kinesis, ...)
• System failures and restarts

Continuous applications are the latest trend. They show how enterprises can get near-real-time reports with very low latency. The difference between Streaming and Continuous Applications is shown in the diagram below.

Structured Streaming is a high-level API for stream processing that has been developed in Apache Spark since 2016. It has similarities with other open source streaming systems but differs from them in two main ways, discussed below:

1. Incremental query model:

Structured Streaming automatically incrementalizes queries written against static data with the Spark SQL or DataFrame APIs. This means users only need to understand Spark's batch APIs to write a streaming query as well.

Event-time concepts are easy to express and understand in this model. Although incremental query execution and view maintenance have been studied extensively, Spark's Structured Streaming is the first to adopt them in a widely used open source system. This incremental API generally works well for both novice and advanced users.
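The idea that one query definition serves both batch and streaming input can be sketched as follows. This is only a minimal illustration: the object name, the `countByCountry` helper, and the sample country codes are hypothetical, and the built-in `rate` test source stands in for a real click stream.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SharedTransformation {
  // One transformation, written once against the batch DataFrame API.
  def countByCountry(clicks: DataFrame): DataFrame =
    clicks.groupBy("country").count()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("shared-transformation")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Batch input: a small in-memory table standing in for static data.
    val batchClicks = Seq("IN", "US", "IN").toDF("country")
    countByCountry(batchClicks).show()

    // Streaming input: the "rate" test source emits (timestamp, value) rows;
    // a synthetic country column is derived from the value (an assumption for this sketch).
    val streamClicks = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .selectExpr("CASE WHEN value % 2 = 0 THEN 'IN' ELSE 'US' END AS country")

    // The same function now describes a streaming query; nothing runs until it is started.
    val streamingCounts = countByCountry(streamClicks)
    println(s"batch result is streaming: ${countByCountry(batchClicks).isStreaming}")
    println(s"stream result is streaming: ${streamingCounts.isStreaming}")

    spark.stop()
  }
}
```

The only difference between the two calls is where the input DataFrame comes from, which is the point of the incremental query model.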

2. Support for end-to-end applications:

Structured Streaming's API and built-in connectors can be integrated into larger applications built with Spark and other software, and they make it easy to write code that is "correct by default." "Exactly-once" computation is enabled automatically when the data sources and sinks follow the transactional model. The incrementalization-based API makes it easy to run a streaming query as a batch job. It also becomes easy to develop hybrid applications that join streams with static data computed through Spark's batch APIs. Users can dynamically run multiple streaming queries and issue interactive queries against consistent snapshots of the data, which makes it possible to drill down even into streaming data when required.
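A hybrid application that joins a stream with static data might look like the sketch below. The lookup table, column names, and the `enrich` helper are hypothetical, and the `rate` test source again stands in for real click events.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object StreamStaticJoin {
  // The join is written exactly as it would be between two batch DataFrames.
  def enrich(clicks: DataFrame, countryNames: DataFrame): DataFrame =
    clicks.join(countryNames, Seq("country"), "inner")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("stream-static-join")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Static lookup table, built with the ordinary batch API.
    val countryNames = Seq(("IN", "India"), ("US", "United States"))
      .toDF("country", "countryName")

    // Streaming side: synthetic clicks derived from the "rate" test source.
    val clicks = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "2")
      .load()
      .selectExpr("timestamp", "CASE WHEN value % 2 = 0 THEN 'IN' ELSE 'US' END AS country")

    val query = enrich(clicks, countryNames)
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination(5000) // let a few micro-batches run, then shut down
    query.stop()
    spark.stop()
  }
}
```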

• Data Sources

We can create Streaming DataFrames by using the DataStreamReader interface returned by SparkSession.readStream(). Apache Spark provides different built-in data sources like Apache Kafka, network socket, and file source.

• File Source

This data source reads files from a directory as a stream of data. Many file formats are supported, including CSV, JSON, ORC, Parquet, and text. One condition applies: each file must be placed in the directory atomically.
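As a rough sketch of the file source, the example below defines a streaming DataFrame over a directory and publishes a file into it with an atomic move. The schema, file names, and the `publish` helper are assumptions made for illustration; temporary directories stand in for a real landing directory, and the explicit schema reflects the fact that streaming file sources expect one by default.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

object FileSourceSketch {
  // Hypothetical schema for click events.
  val clickSchema: StructType = new StructType()
    .add("country", StringType)
    .add("page", StringType)

  // Write the file elsewhere first, then move it into the watched directory in one step,
  // so the stream never sees a half-written file. Assumes both paths share a filesystem.
  def publish(staged: Path, landingDir: Path): Path =
    Files.move(staged, landingDir.resolve(staged.getFileName), StandardCopyOption.ATOMIC_MOVE)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("file-source")
      .master("local[2]")
      .getOrCreate()

    val landingDir = Files.createTempDirectory("streamInput")
    val stagingDir = Files.createTempDirectory("staging")

    val clicks = spark.readStream
      .schema(clickSchema)               // schema supplied up front
      .option("maxFilesPerTrigger", "1") // read at most one new file per micro-batch
      .json(landingDir.toString)

    val staged = stagingDir.resolve("clicks-1.json")
    Files.write(staged, """{"country":"IN","page":"/home"}""".getBytes(StandardCharsets.UTF_8))
    publish(staged, landingDir)

    val query = clicks.writeStream.format("console").outputMode("append").start()
    query.processAllAvailable() // testing helper: returns once the published file is processed
    query.stop()
    spark.stop()
  }
}
```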

• Apache Kafka Source

This data source reads data from Apache Kafka. Check the latest Kafka Integration Guide for compatibility details; Kafka versions 0.10.0 and later should be supported.
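Reading from Kafka could be sketched roughly as follows. The helper names, broker address, and topic are hypothetical, and the sketch assumes the Spark Kafka connector (the `spark-sql-kafka-0-10` artifact) is on the classpath and a broker is reachable when the query starts.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object KafkaSourceSketch {
  // Options understood by the Kafka source; the values are supplied by the caller.
  def kafkaOptions(bootstrapServers: String, topic: String): Map[String, String] = Map(
    "kafka.bootstrap.servers" -> bootstrapServers,
    "subscribe" -> topic,
    "startingOffsets" -> "latest"
  )

  // Returns a streaming DataFrame with the record key and value decoded as strings.
  def readTopic(spark: SparkSession, bootstrapServers: String, topic: String): DataFrame =
    spark.readStream
      .format("kafka")
      .options(kafkaOptions(bootstrapServers, topic))
      .load()
      .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
}
```

A call such as `KafkaSourceSketch.readTopic(spark, "localhost:9092", "clicks")` would then yield a streaming DataFrame that can be transformed like any other; both arguments here are placeholders.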

• Network Socket Source

This data source reads UTF-8 text from a socket connection. It should be used mostly for testing, as it does not provide end-to-end fault tolerance.

• Data Sinks

Once the final result DataFrame/Dataset is defined, all that remains is to start the streaming computation. To do this, we use the DataStreamWriter returned by Dataset.writeStream(). We also need to specify an output sink and an output mode.

Structured Streaming ships with several built-in output sinks:

• File Sinks

This sink stores the output to a specified directory.

• Kafka Sinks

This sink writes the output to one or more Kafka topics, as configured.

• Console Sink

This sink is mostly used for debugging. It prints the output to the console (stdout) on every trigger.
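A DataStreamWriter configured with the console sink, an output mode, and a trigger might look like this sketch. The `startConsoleQuery` helper and the two-second interval are arbitrary choices for illustration, and the `rate` test source supplies the input.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object ConsoleSinkSketch {
  def startConsoleQuery(spark: SparkSession): StreamingQuery = {
    val numbers = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    numbers.writeStream
      .format("console")                            // output sink
      .outputMode("append")                         // only new rows are printed per trigger
      .option("truncate", "false")                  // print full column values
      .trigger(Trigger.ProcessingTime("2 seconds")) // run a micro-batch every two seconds
      .start()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("console-sink")
      .master("local[2]")
      .getOrCreate()

    val query = startConsoleQuery(spark)
    query.awaitTermination(7000) // observe a few triggers, then stop
    query.stop()
    spark.stop()
  }
}
```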

• Foreach Sink

This sink is used to run arbitrary computation on the records in the output.
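One way to use the foreach sink is to pass a `ForeachWriter` to `writeStream.foreach`. In the sketch below, `PrintingWriter` is a hypothetical writer that only prints each record; a real one would typically open a connection in `open`, send the record in `process`, and release resources in `close`.

```scala
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

object ForeachSinkSketch {
  // Hypothetical writer used only for illustration.
  class PrintingWriter extends ForeachWriter[Row] {
    // Called for each partition of each micro-batch; returning true means "process it".
    override def open(partitionId: Long, epochId: Long): Boolean = true

    // Called once per output record.
    override def process(record: Row): Unit = println(s"record: $record")

    // Called when the partition is done; errorOrNull is non-null if processing failed.
    override def close(errorOrNull: Throwable): Unit = ()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("foreach-sink")
      .master("local[2]")
      .getOrCreate()

    val numbers = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "2")
      .load()

    val query = numbers.writeStream
      .foreach(new PrintingWriter)
      .outputMode("append")
      .start()

    query.awaitTermination(5000)
    query.stop()
    spark.stop()
  }
}
```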

• Memory Sink

This sink is also used for debugging. The output is stored in memory as a table that can be queried.

• A Short Example

Structured Streaming uses Spark's structured APIs, i.e. DataFrames, Datasets, and Spark SQL. Users mainly work with table-like abstractions, which Spark represents as DataFrames or Datasets. When a user creates a table or a DataFrame from a streaming input source and starts a query on it, Spark automatically launches a streaming computation.

Let us see a simple example, which has a batch job that counts clicks by country of origin for a web application. Suppose that the input data is JSON files and the output should be Parquet. This job can be written with Spark DataFrames in Scala as follows:

    // Define a DataFrame to read from static data
    val data = spark.read.format("json").load("/inputFile")
    // Transform it to compute a result
    val counts = data.groupBy($"country").count()
    // Write to a static data sink
    counts.write.format("parquet").save("/countsByOrigin")

If we need to change this processing to Structured Streaming, we only need to modify the input and the output. There is no need to change the transformation in the middle. For example, if new JSON files are continually uploaded to the /streamInput directory, we can modify our job to continually update /countsByOrigin by changing only the first and last lines:

    // Define a DataFrame to read streaming data
    val data = spark.readStream.format("json").load("/streamInput")
    // Transform it to compute a result
    val counts = data.groupBy($"country").count()
    // Write to a streaming data sink
    counts.writeStream.format("parquet").outputMode("complete")
      .start("/countsByOrigin")

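The output mode parameter specifies how Structured Streaming updates the sink. Here we use complete mode, meaning the full result is written on each update, because the file sink does not support incremental updates to existing results. Note that released Spark versions are stricter than this illustration: the built-in file sink accepts only append mode, and streaming file sources expect an explicit schema by default, so the snippet shows the shape of the API rather than a job that runs unchanged. Other sinks are available that support incremental updates, depending on the use case.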
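A variant of the streaming job that can be run locally is sketched below. It departs from the example above in three labeled ways: a temporary directory with one small JSON file stands in for /streamInput, an explicit schema is supplied, and the result goes to the memory sink (which accepts complete mode) instead of Parquet. The object and helper names are hypothetical.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

object StreamingCountsSketch {
  // Runs the streaming count over inputDir and returns the contents of the in-memory result table.
  def runCounts(spark: SparkSession, inputDir: Path): Map[String, Long] = {
    val schema = new StructType().add("country", StringType)

    // Define a DataFrame to read streaming data (schema supplied explicitly)
    val data = spark.readStream.schema(schema).json(inputDir.toString)
    // Transform it to compute a result (unchanged from the batch job)
    val counts = data.groupBy("country").count()
    // Write to a streaming sink that supports complete mode
    val query = counts.writeStream
      .format("memory")
      .queryName("countsByOrigin") // name of the in-memory table
      .outputMode("complete")
      .start()

    query.processAllAvailable() // testing helper: wait until existing files are processed
    val result = spark.table("countsByOrigin")
      .collect()
      .map(row => row.getString(0) -> row.getLong(1))
      .toMap
    query.stop()
    result
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("streaming-counts")
      .master("local[2]")
      .getOrCreate()

    val inputDir = Files.createTempDirectory("streamInput")
    val lines = Seq("""{"country":"IN"}""", """{"country":"US"}""", """{"country":"IN"}""")
    Files.write(inputDir.resolve("clicks-1.json"), lines.mkString("\n").getBytes(StandardCharsets.UTF_8))

    println(runCounts(spark, inputDir))
    spark.stop()
  }
}
```

Because complete mode rewrites the whole result table on every trigger, the in-memory table always holds the current count per country.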

Conclusion

In this module, we looked at how Apache Spark can be used for processing structured data streams. We now have a clear understanding of how Spark can be used for both structured and unstructured data processing.