Apache Spark Tutorial

Graph Processing with GraphFrames

Introduction

Let us look at Spark’s graph processing libraries. Apache Spark GraphX is a graph processing framework embedded within the Spark distributed dataflow system. It presents a familiar, expressive graph API that lets users compose graphs with unstructured and tabular data, and it permits the same physical data to be viewed both as a graph and as collections without data movement or duplication. For example, with GraphX it is easy to join a social graph with user comments, apply graph algorithms, and expose the results as either collections or graphs to other procedures (e.g., visualization or rollup). Consequently, GraphX lets users adopt the computational pattern (graph or collection) best suited to the current task without sacrificing performance or flexibility.

GraphX is built as a library on top of Spark by encoding graphs as collections and then expressing the GraphX API on top of standard dataflow operators. GraphX requires no modifications to Spark, which reveals a general method for embedding graph computation within distributed dataflow frameworks: graph computation is distilled to a specific join–map–group-by dataflow pattern. Reducing graph computation to this pattern identifies the critical path for system optimization. To achieve performance parity with specialized graph processing systems, GraphX introduces a range of optimizations, both in how graphs are encoded as collections and in how the common dataflow operators are executed. Flexible vertex-cut partitioning is used to encode graphs as horizontally partitioned collections and to match the state of the art in distributed graph partitioning.
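The join–map–group-by pattern can be illustrated without Spark. The following plain-Python sketch is a toy under stated assumptions, not GraphX code: it performs one PageRank-style update over small in-memory collections. The names `vertices`, `edges`, and `pagerank_step`, the sample data, and the damping value are all made up for illustration.

```python
from collections import defaultdict

# Vertex collection: id -> rank. Edge collection: (src, dst) pairs.
# Both are illustrative sample data.
vertices = {1: 1.0, 2: 1.0, 3: 1.0}
edges = [(1, 2), (1, 3), (2, 3), (3, 1)]


def pagerank_step(vertices, edges, damping=0.85):
    """One PageRank-style update written as join -> map -> group-by."""
    out_degree = defaultdict(int)
    for src, _ in edges:
        out_degree[src] += 1

    # Join: attach the source vertex's rank to every edge.
    joined = [(src, dst, vertices[src]) for src, dst in edges]

    # Map: turn each joined edge into a message for its destination.
    messages = [(dst, rank / out_degree[src]) for src, dst, rank in joined]

    # Group-by: sum the messages per destination vertex.
    sums = defaultdict(float)
    for dst, contribution in messages:
        sums[dst] += contribution

    # Vertices that received no message fall back to a sum of 0.0.
    return {v: (1 - damping) + damping * sums[v] for v in vertices}


new_ranks = pagerank_step(vertices, edges)
```

Each stage maps onto an ordinary collection operator, which is why optimizing joins and aggregations is what matters for graph workloads expressed this way.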

GraphX recasts system optimizations developed in the context of graph processing systems as join optimizations (e.g., CSR indexing, join elimination, and join-site specification) and materialized view maintenance (e.g., vertex mirroring and delta updates), and applies these techniques to the Spark dataflow operators. By leveraging logical partitioning and lineage, GraphX achieves low-cost fault tolerance. Finally, by exploiting immutability, GraphX reuses indices across graph and collection views and over multiple iterations, reducing memory overhead and improving system performance. GraphX ships with a library of algorithms such as PageRank and Connected Components. However, GraphX has the narrowest language support of the Spark libraries: its API is available only in Scala.
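To make vertex-cut partitioning and vertex mirroring more concrete, here is a small plain-Python sketch. It is an assumption-laden illustration rather than GraphX's implementation: the placement rule in `partition_edges`, the helper names, and the sample data are all hypothetical.

```python
from collections import defaultdict

# Illustrative sample data.
edges = [(1, 2), (1, 3), (2, 3), (3, 4), (4, 1)]
vertex_attrs = {1: "a", 2: "b", 3: "c", 4: "d"}
NUM_PARTITIONS = 2


def partition_edges(edges, num_partitions):
    """Vertex cut: every edge is stored in exactly one partition."""
    parts = defaultdict(list)
    for src, dst in edges:
        # Hypothetical placement rule: a simple arithmetic hash of the pair.
        parts[(src * 31 + dst) % num_partitions].append((src, dst))
    return dict(parts)


def routing_table(parts):
    """For each vertex, the set of edge partitions that reference it."""
    routes = defaultdict(set)
    for pid, part in parts.items():
        for src, dst in part:
            routes[src].add(pid)
            routes[dst].add(pid)
    return dict(routes)


def mirror_vertices(vertex_attrs, routes):
    """Copy each vertex attribute only to the partitions that need it."""
    mirrors = defaultdict(dict)
    for vid, pids in routes.items():
        for pid in pids:
            mirrors[pid][vid] = vertex_attrs[vid]
    return dict(mirrors)


parts = partition_edges(edges, NUM_PARTITIONS)
routes = routing_table(parts)
mirrors = mirror_vertices(vertex_attrs, routes)
```

Because edges are never split, a vertex whose edges land in several partitions is the thing that gets "cut"; the mirrors let each partition evaluate its edges locally.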

GraphFrames is a graph processing library developed by Databricks, the University of California, Berkeley, and the Massachusetts Institute of Technology. It is an external Spark package built on top of Spark DataFrames, so all the operations available on DataFrames are also available on GraphFrames. It offers a uniform API across Scala, Java, and Python. Because GraphFrames is built on DataFrames, users also get support for numerous data sources, data persistence, and powerful graph queries in Spark SQL by default.

A GraphFrame is logically represented as two DataFrames: an edge DataFrame and a vertex DataFrame. That is, edges and vertices are stored in separate DataFrames, and each can carry attributes of any supported type. Take a social network graph as an example. The vertices can contain attributes such as name (string), age (integer), and geographic location (a struct of two floating-point values for longitude and latitude), while the edges can contain the time at which one user friended another (timestamp). The GraphFrame model supports user-defined attributes on each vertex and edge and is therefore equivalent to the property graph model used in many graph systems, including GraphX and GraphLab. GraphFrame is more general than Pregel/Giraph because it supports user-defined attributes on edges.
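The two-table representation can be mimicked with plain Python data structures. The sketch below is only an illustration of the data model, not GraphFrames code: the records, field names such as `friended_at`, and the `triplets` helper are invented for the example.

```python
from datetime import datetime

# Vertex table: one record per user, with a nested struct for location.
vertices = [
    {"id": "u1", "name": "Alice", "age": 34,
     "location": {"lon": -122.27, "lat": 37.87}},
    {"id": "u2", "name": "Bob", "age": 29,
     "location": {"lon": -71.09, "lat": 42.36}},
    {"id": "u3", "name": "Carol", "age": 41,
     "location": {"lon": 2.35, "lat": 48.86}},
]

# Edge table: one record per friendship, with a timestamp attribute.
edges = [
    {"src": "u1", "dst": "u2", "friended_at": datetime(2015, 3, 1, 9, 30)},
    {"src": "u3", "dst": "u1", "friended_at": datetime(2016, 7, 14, 18, 0)},
]


def triplets(vertices, edges):
    """Join each edge with its source and destination vertex records."""
    by_id = {v["id"]: v for v in vertices}
    return [
        {"src": by_id[e["src"]], "edge": e, "dst": by_id[e["dst"]]}
        for e in edges
    ]


views = triplets(vertices, edges)
```

Keeping vertices and edges in separate tables means either side can gain or lose attribute columns without touching the other.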

Similar to DataFrames, a GraphFrame object is internally represented as a logical plan, and as a result, the declaration of a GraphFrame object does not necessarily imply the materialization of its data.  
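The idea that declaring an object does not materialize its data can be sketched in a few lines of plain Python. `LazyTable` below is a hypothetical toy, not how Spark represents logical plans: it merely records operations and runs them when `collect()` is called.

```python
class LazyTable:
    """Toy logical plan: records operations, runs them only on collect()."""

    def __init__(self, source, ops=()):
        self._source = source   # zero-argument callable that produces rows
        self._ops = tuple(ops)  # pending transformations, in order

    def filter(self, predicate):
        return LazyTable(self._source, self._ops + (("filter", predicate),))

    def select(self, *columns):
        return LazyTable(self._source, self._ops + (("select", columns),))

    def collect(self):
        rows = self._source()
        for kind, arg in self._ops:
            if kind == "filter":
                rows = [r for r in rows if arg(r)]
            else:  # "select"
                rows = [{c: r[c] for c in arg} for r in rows]
        return rows


load_log = []  # records every time the source is actually read


def load_users():
    load_log.append("read")
    return [{"name": "Alice", "age": 34}, {"name": "Dan", "age": 12}]


users = LazyTable(load_users)
adults = users.filter(lambda r: r["age"] >= 18).select("name")
loads_before_collect = len(load_log)  # nothing has been read yet
result = adults.collect()             # the source is read here
```

Deferring execution like this is what gives a query planner room to reorder or combine operations before any data is touched.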

    class GraphFrame {
      // Different views on the graph
      def vertices: DataFrame
      def edges: DataFrame
      def triplets: DataFrame

      // Pattern matching
      def pattern(pattern: String): DataFrame

      // Relational-like operators
      def filter(predicate: Column): GraphFrame
      def select(cols: Column*): GraphFrame
      def joinV(v: DataFrame, predicate: Column): GraphFrame
      def joinE(e: DataFrame, predicate: Column): GraphFrame

      // View creation
      def createView(pattern: String): DataFrame

      // Partition function
      def partitionBy(cols: Column*): GraphFrame
    }

Listing 2: GraphFrame API in Scala
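Pattern matching over a graph can be understood as joining the edge table with itself. The plain-Python sketch below hard-codes two structural patterns instead of parsing a pattern string, so it is an illustration of the idea only; the function names and sample edges are hypothetical.

```python
from collections import defaultdict

# Illustrative edge table of (src, dst) pairs.
edges = [("a", "b"), ("b", "c"), ("c", "a"), ("b", "a")]


def mutual_pairs(edges):
    """Pairs (x, y) with edges x->y and y->x, each pair reported once."""
    edge_set = set(edges)
    return sorted((x, y) for x, y in edge_set if x < y and (y, x) in edge_set)


def two_hop_paths(edges):
    """Paths x->y->z with x != z, found by joining edges on dst == src."""
    by_src = defaultdict(list)
    for src, dst in edges:
        by_src[src].append(dst)
    return sorted(
        (x, y, z) for x, y in edges for z in by_src[y] if z != x
    )


mutual = mutual_pairs(edges)
paths = two_hop_paths(edges)
```

Each additional edge in a pattern adds another join, which is why expressing patterns over tables lets a relational optimizer plan them.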

A GraphFrame can be constructed using two DataFrames: a vertex DataFrame and an edge DataFrame. A DataFrame is merely a logical view (plan) and can support a wide range of sources that implement a data source API. Some examples of a DataFrame input include:

  • a table registered in Spark SQL’s system catalog
  • a table in an external relational database through JDBC
  • JSON, Parquet, Avro, CSV files on disk
  • a table in memory in columnar format
  • a set of documents in Elasticsearch or Solr
  • results from relational transformations on the above

The following code demonstrates constructing a graph from a user table in a live transactional database and an edge table from JSON-based log files in Amazon S3:

users = read.jdbc("mysql://...")
likes = read.json("s3://...")
graph = GraphFrame(users, likes)

Again, since DataFrames and GraphFrames are logical abstractions, the above code does not imply that users, likes, or graph are materialized.

GraphFrames vs. GraphX

Let us look at some of the differences between Apache Spark’s GraphX and the GraphFrames library.


|                         | GraphFrames                        | GraphX              |
|-------------------------|------------------------------------|---------------------|
| Core APIs               | Scala, Java, Python                | Scala only          |
| Programming abstraction | DataFrames                         | RDDs                |
| Use cases               | Algorithms, queries, motif finding | Algorithms          |
| Vertex IDs              | Any type (in Catalyst)             | Long                |
| Vertex/edge attributes  | Any number of DataFrame columns    | Any type (VD, ED)   |
| Return types            | GraphFrames/DataFrames             | Graph[VD, ED] or... |
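The vertex ID difference has a practical consequence: IDs that are strings or other non-integer types have to be translated before they can be used where integer IDs are required. The plain-Python sketch below shows one simple way such a translation could work. It is not a description of how either library handles this, and `index_vertex_ids` and the sample data are hypothetical.

```python
def index_vertex_ids(vertices, edges):
    """Map arbitrary hashable vertex IDs to dense integer IDs.

    Returns the ID mapping and the edge list rewritten with integer IDs.
    """
    id_to_long = {}
    for vid in vertices:
        # Assign the next unused integer the first time an ID is seen.
        id_to_long.setdefault(vid, len(id_to_long))
    long_edges = [(id_to_long[src], id_to_long[dst]) for src, dst in edges]
    return id_to_long, long_edges


# Illustrative sample data with string IDs.
vertices = ["alice", "bob", "carol"]
edges = [("alice", "bob"), ("carol", "alice")]

id_to_long, long_edges = index_vertex_ids(vertices, edges)
```

Keeping the mapping around allows results computed on integer IDs to be translated back to the original identifiers.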

Conclusion

In this module we looked at the features of GraphX and at how it differs from the GraphFrames library.
