Apache Spark Tutorial

Streaming Operations on DataFrames and Datasets

Introduction

Although Spark is a general-purpose computing engine, it also handles streaming data. Spark Streaming can be used for near-real-time processing.

Structured Streaming supports untyped, SQL-like operations such as select, where, and groupBy, as well as typed, RDD-like operations such as map, filter, and flatMap.

Most of the common operations on DataFrames/Datasets can be applied to Structured Streaming. Only a few operations are not supported; we will see them later.

Let us see some examples:

import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, Dataset}
import spark.implicits._  // encoders for as[...] and map; assumes a SparkSession named spark

// Timestamp is used for the event time because Spark provides an encoder for it
case class DeviceData(device: String, deviceType: String, signal: Double, time: Timestamp)

// Streaming DataFrame with IoT device data and schema
// { device: string, deviceType: string, signal: double, time: timestamp }
val df: DataFrame = ...
// Streaming Dataset with IoT device data
val ds: Dataset[DeviceData] = df.as[DeviceData]

// Select the devices which have signal more than 10
df.select("device").where("signal > 10")   // using untyped APIs
ds.filter(_.signal > 10).map(_.device)     // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("deviceType").count()           // using untyped API

// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))  // using typed API

We can also register a streaming DataFrame as a temporary view, the same way we did for static DataFrames, and then query it with SQL.

df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  // returns another streaming DF

We can also check whether a DataFrame is streaming by using isStreaming:

df.isStreaming

Window Operation on Event Time 

Aggregations over a sliding event-time window are straightforward in Structured Streaming and are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In a window-based aggregation, aggregate values are maintained for each window that the event time of a row falls into.
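As a minimal sketch of a sliding event-time window, the snippet below counts events per device type in 10-minute windows that slide every 5 minutes, using the window function from org.apache.spark.sql.functions. The object and method names, the window sizes, and the built-in rate source used as stand-in input are assumptions made for illustration; the column names time and deviceType follow the DeviceData example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, window}

object SlidingWindowCounts {
  // Group by (event-time window, deviceType) and keep a running count per group.
  // Windows are 10 minutes long and start every 5 minutes, so each event
  // falls into two overlapping windows.
  def countsByWindow(events: DataFrame): DataFrame =
    events
      .groupBy(window(col("time"), "10 minutes", "5 minutes"), col("deviceType"))
      .count()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("sliding-window-sketch")
      .getOrCreate()

    // Stand-in input (assumption): the "rate" source emits (timestamp, value) rows.
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .select(
        col("timestamp").as("time"),
        (col("value") % 3).cast("string").as("deviceType"))

    // Complete mode re-emits the whole result table on every trigger.
    val query = countsByWindow(events).writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", "false")
      .start()

    query.awaitTermination()
  }
}
```

With these settings, an event stamped 12:07 contributes to both the 12:00-12:10 window and the 12:05-12:15 window.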

Event Time Aggregations and Watermarking

From a logical point of view, the key idea in event time is to treat application-specified timestamps as an arbitrary field in the data, allowing records to arrive out-of-order. We can then use standard operators and incremental processing to update results grouped by event time. In practice, however, it is useful for the processing system to have some loose bounds on how late data can arrive, for two reasons:  

  • Allowing arbitrarily late data might require storing arbitrarily large state. For example, if we count data by a 1-minute event-time window, the system needs to remember a count for every 1-minute window since the application began, because a late record might still arrive for any particular minute. This can quickly lead to large amounts of state, especially if combined with another grouping key. The same issue happens with joins.
  • Some sinks do not support data retraction, making it useful to be able to write the results for a given event time after a timeout. For example, custom downstream applications want to start working with a “final” result and might not support retractions. Append-mode sinks also do not support retractions.

Structured Streaming lets developers set a watermark for event-time columns using the withWatermark operator. This operator gives the system a delay threshold tC for a given timestamp column C. At any point in time, the watermark for C is max(C) − tC, that is, tC seconds before the maximum event time seen so far in C. Note that this choice of watermark is naturally robust to backlogged data: if the system cannot keep up with the input rate for a period of time, the watermark will not move forward arbitrarily during that time, and all events that arrived within at most tC seconds of being produced will still be processed. When present, watermarks affect when stateful operators can forget old state (e.g., if grouping by a window derived from a watermarked column), and when Structured Streaming will output data with an event-time key to append-mode sinks. Different input streams can have different watermarks.
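The watermark described here can be attached to a windowed count as in the sketch below. The delay argument plays the role of the threshold tC for the column time. The object and method names, the chosen delay and window sizes, and the rate source used as stand-in input are assumptions for illustration only.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, window}

object WatermarkedCounts {
  // withWatermark is applied to the same event-time column that the window
  // is derived from, and before the aggregation, so that Spark can drop
  // state for windows that end before the watermark.
  def countsWithWatermark(events: DataFrame, delay: String): DataFrame =
    events
      .withWatermark("time", delay)
      .groupBy(window(col("time"), "10 minutes", "5 minutes"), col("deviceType"))
      .count()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("watermark-sketch")
      .getOrCreate()

    // Stand-in input (assumption): the "rate" source emits (timestamp, value) rows.
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .select(
        col("timestamp").as("time"),
        (col("value") % 3).cast("string").as("deviceType"))

    // Update mode emits only the rows whose counts changed in a trigger.
    val query = countsWithWatermark(events, "10 minutes").writeStream
      .outputMode("update")
      .format("console")
      .option("truncate", "false")
      .start()

    query.awaitTermination()
  }
}
```

Switching the output mode to append would instead emit each window once, after the watermark has passed the end of that window.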

[Figure: Late data handling in windowed grouped aggregation]

Since Spark 2.0, Structured Streaming has supported joins (inner joins and some types of outer joins) between a streaming and a static DataFrame/Dataset.
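A stream-static join might look like the following sketch, which enriches streaming device events with a small static lookup table. The lookup table contents, the object and method names, and the rate source used as input are hypothetical and chosen only for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object StreamStaticJoin {
  // Inner join on deviceType: each streaming row is matched against the
  // static table as it arrives, so no streaming state has to be kept.
  def enrich(stream: DataFrame, deviceTypes: DataFrame): DataFrame =
    stream.join(deviceTypes, Seq("deviceType"), "inner")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("stream-static-join-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical static lookup table.
    val deviceTypes = Seq(
      ("0", "thermostat"),
      ("1", "camera"),
      ("2", "door sensor")
    ).toDF("deviceType", "description")

    // Stand-in streaming input (assumption): the built-in "rate" source.
    val stream = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .select(
        col("timestamp").as("time"),
        (col("value") % 3).cast("string").as("deviceType"))

    val query = enrich(stream, deviceTypes).writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```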

Spark 2.3 added support for stream-stream joins, that is, you can join two streaming Datasets/DataFrames. The challenge of generating join results between two data streams is that, at any point in time, the view of the dataset is incomplete for both sides of the join, making it much harder to find matches between inputs. Any row received from one input stream can match with any future, yet-to-be-received row from the other input stream.

Many use cases require more advanced stateful operations than aggregations. For example, you may have to track sessions from data streams of events. For such sessionization, you have to save arbitrary types of data as state and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation mapGroupsWithState and the more powerful operation flatMapGroupsWithState.
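Because any row may match a future row from the other stream, a stream-stream join has to buffer past input as state. One common way to keep that state bounded is to put a watermark on both inputs and add a time-range condition to the join, as in the sketch below. The impressions/clicks scenario, all column names, and the delays are hypothetical and are not taken from this tutorial.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, expr}

object StreamStreamJoin {
  // Inner join of two streams. The watermarks bound how late each side may
  // be, and the time-range condition bounds how far apart matching rows
  // can be, so old rows can eventually be dropped from the join state.
  def joinClicks(impressions: DataFrame, clicks: DataFrame): DataFrame = {
    val imp = impressions.withWatermark("impressionTime", "2 hours")
    val clk = clicks.withWatermark("clickTime", "3 hours")
    imp.join(
      clk,
      expr("""
        clickAdId = impressionAdId AND
        clickTime >= impressionTime AND
        clickTime <= impressionTime + interval 1 hour
      """))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("stream-stream-join-sketch")
      .getOrCreate()

    // Stand-in inputs (assumption): two independent "rate" sources.
    val impressions = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
      .select(col("value").as("impressionAdId"), col("timestamp").as("impressionTime"))
    val clicks = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
      .select(col("value").as("clickAdId"), col("timestamp").as("clickTime"))

    // Stream-stream joins write their results in append mode.
    val query = joinClicks(impressions, clicks).writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```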

Both operations allow the application of user-defined code on grouped Datasets to update user-defined state. 
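To make the sessionization idea concrete, here is a minimal sketch built on mapGroupsWithState. It keeps, per session id, the number of events seen and the first and last event times, and emits an updated summary on every trigger in which that session receives data. The case classes, the merge helper, and the rate source used as input are all hypothetical; timeouts for closing idle sessions are left out to keep the example short.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Hypothetical input, state, and output types for this sketch.
case class Event(sessionId: String, time: Timestamp)
case class SessionInfo(numEvents: Int, firstMs: Long, lastMs: Long)
case class SessionUpdate(sessionId: String, numEvents: Int, durationMs: Long)

object Sessionization {
  // Pure merge step: fold the event times of one trigger into the old state.
  def merge(previous: Option[SessionInfo], eventTimesMs: Seq[Long]): SessionInfo = {
    val allTimes = previous.toSeq.flatMap(p => Seq(p.firstMs, p.lastMs)) ++ eventTimesMs
    require(allTimes.nonEmpty, "need previous state or at least one event")
    SessionInfo(
      previous.map(_.numEvents).getOrElse(0) + eventTimesMs.size,
      allTimes.min,
      allTimes.max)
  }

  // User-defined function called once per key per trigger with the new events.
  def updateSession(
      id: String,
      events: Iterator[Event],
      state: GroupState[SessionInfo]): SessionUpdate = {
    val merged = merge(state.getOption, events.map(_.time.getTime).toList)
    state.update(merged) // persist the new state for the next trigger
    SessionUpdate(id, merged.numEvents, merged.lastMs - merged.firstMs)
  }

  def sessions(events: Dataset[Event]): Dataset[SessionUpdate] = {
    val spark = events.sparkSession
    import spark.implicits._
    events
      .groupByKey(_.sessionId)
      .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.NoTimeout)(updateSession _)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("sessionization-sketch")
      .getOrCreate()
    import spark.implicits._

    // Stand-in input (assumption): "rate" rows mapped onto three session ids.
    val events = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
      .select((col("value") % 3).cast("string").as("sessionId"), col("timestamp").as("time"))
      .as[Event]

    // mapGroupsWithState queries run in update output mode.
    val query = sessions(events).writeStream
      .outputMode("update")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

Keeping the merge logic in a plain function separates it from the Spark-specific state handling, which makes it easy to check on its own.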

Continuous Processing Mode

A new continuous processing mode, added in Apache Spark 2.3, executes Structured Streaming jobs using long-lived operators, as in traditional streaming systems such as Telegraph and Borealis. This mode enables lower latency at the cost of less operational flexibility (e.g., limited support for rescaling the job at runtime).

The key enabler for this execution mode was choosing a declarative API for Structured Streaming that is not tied to the execution strategy. For example, the original Spark Streaming API had some operators based on processing time that leaked the concept of micro-batches into the programming model, making it hard to move programs to another type of engine. 

In contrast, Structured Streaming’s API and semantics are independent of the execution engine: continuous execution is similar to having a much larger number of triggers. Note that unlike systems based purely on unsynchronized message passing, such as Storm, we do retain the concept of triggers and epochs in this mode so the output from multiple nodes can be coordinated and committed together to the sink. 
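Because the API is independent of the execution strategy, switching a query to continuous mode is mostly a matter of choosing a different trigger. The sketch below runs a simple filter with Trigger.Continuous; the object and method names, the rate source, and the console sink are assumptions chosen because they are convenient for local experiments. Continuous mode supports only a restricted set of operations (projections and selections such as select, where, and map).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

object ContinuousModeSketch {
  // A map-like query (a filter only), which continuous mode can execute.
  def evens(input: DataFrame): DataFrame =
    input.where(col("value") % 2 === 0)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("continuous-mode-sketch")
      .getOrCreate()

    val input = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .option("numPartitions", "2") // one long-running task per partition
      .load()

    val query = evens(input).writeStream
      .format("console")
      // "1 second" is the checkpoint (epoch) interval, not a batch interval.
      .trigger(Trigger.Continuous("1 second"))
      .start()

    query.awaitTermination()
  }
}
```

Replacing the trigger with Trigger.ProcessingTime("1 second") would run the same query in the default micro-batch mode without any other change.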

Conclusion

We saw how Spark Streaming works, which makes Spark a very suitable choice for both batch and streaming data processing. It is also a very good choice for implementing a Lambda architecture.
