This is a proposal to add support for optimizing and executing relational queries in a manner that integrates tightly with the core Spark API. The goal is to allow Spark users both to run SQL queries over data that is currently stored in RDDs and to run SQL queries over external data sources (such as Hive), with the results returned as an RDD in either case.
Spark SQL support will be broken into three major components: Catalyst, Spark SQL, and the Hive module.
Catalyst is an implementation-agnostic framework for manipulating trees of relational operators and expressions. Catalyst was first discussed during a talk at the most recent Spark Summit, and a more detailed design document for Catalyst can be found here.
Catalyst provides three main features: a TreeNode library for transforming trees that are expressed as Scala case classes, a logical plan representation for relational operators, and an expression library. A key design goal was extensibility, so that the same framework could later serve Streaming SQL and other dataflows that need optimization.
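The tree-transformation idea can be illustrated with a small sketch. Catalyst itself expresses trees as Scala case classes; the Python version below uses frozen dataclasses as a stand-in, and every name in it (TreeNode, transform_up, Literal, Attribute, Add, fold_constants) is hypothetical and chosen only for illustration. It shows a bottom-up rewrite in which a rule returns a replacement node, or None to leave the node unchanged.

```python
from dataclasses import dataclass, fields, replace


@dataclass(frozen=True)
class TreeNode:
    """Immutable tree node (hypothetical base class, for illustration only)."""

    def transform_up(self, rule):
        # Rewrite every child subtree first, then offer the rebuilt node to the rule.
        updates = {
            f.name: getattr(self, f.name).transform_up(rule)
            for f in fields(self)
            if isinstance(getattr(self, f.name), TreeNode)
        }
        node = replace(self, **updates) if updates else self
        result = rule(node)
        return node if result is None else result


@dataclass(frozen=True)
class Literal(TreeNode):
    value: int


@dataclass(frozen=True)
class Attribute(TreeNode):
    name: str


@dataclass(frozen=True)
class Add(TreeNode):
    left: TreeNode
    right: TreeNode


def fold_constants(node):
    """Rule: replace Add(Literal, Literal) with a single Literal."""
    if (isinstance(node, Add)
            and isinstance(node.left, Literal)
            and isinstance(node.right, Literal)):
        return Literal(node.left.value + node.right.value)
    return None  # no rewrite for this node


# x + (1 + 2) is rewritten to x + 3
expr = Add(Attribute("x"), Add(Literal(1), Literal(2)))
folded = expr.transform_up(fold_constants)
```

Because the nodes are immutable, a transformation returns a new tree and leaves the original untouched, which is the same property case classes provide in Scala.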
Catalyst has no interfaces that are directly exposed to Spark users.
Catalyst’s only dependency is on the scala-logging interface for SLF4J by Typesafe. This logging library integrates with Spark’s existing logging infrastructure, with the added benefit of making logging statements that are below the current logging level even cheaper through the use of Scala macros.
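To illustrate why suppressed log statements become cheap, the sketch below shows the guard pattern that a logging macro can generate at compile time: the level is checked first, and the message is only built if it will actually be emitted. This is a Python analogy using the standard logging module, not the scala-logging API; the logger name and the helper functions are hypothetical.

```python
import logging

logger = logging.getLogger("catalyst.sketch")  # hypothetical logger name
logger.setLevel(logging.INFO)

calls = {"count": 0}


def expensive_plan_dump():
    """Stands in for a costly message, e.g. pretty-printing a large query plan."""
    calls["count"] += 1
    return "Project [a] <- Filter (a > 1) <- Scan t"


def debug_lazy(log, make_message):
    # Check the level first; only build the message when it will be logged.
    if log.isEnabledFor(logging.DEBUG):
        log.debug(make_message())


# The level is INFO, so expensive_plan_dump is never called here.
debug_lazy(logger, expensive_plan_dump)
```

In Scala the guard can be inserted by the macro itself, so call sites stay as simple as an ordinary debug call.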
The Spark SQL component includes a set of standard physical operators (e.g. Project, Filter, Hash Join, Nested Loop Join), as well as a set of planning strategies that are used to select the specific physical operators that will execute a given logical query plan.
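One way to picture planning strategies is as an ordered list of functions, each of which either produces a physical operator for a logical node or declines. The sketch below is a simplified illustration under that assumption, not the planner's actual design; all class and function names are hypothetical, and a join is treated as hash-joinable whenever it carries an equality key.

```python
from dataclasses import dataclass
from typing import Optional


# Logical operators (hypothetical, simplified)
@dataclass(frozen=True)
class Relation:
    name: str


@dataclass(frozen=True)
class Filter:
    condition: str
    child: object


@dataclass(frozen=True)
class Join:
    left: object
    right: object
    key: Optional[str]  # equality join key, or None for an arbitrary join


# Physical operators (hypothetical, simplified)
@dataclass(frozen=True)
class TableScan:
    name: str


@dataclass(frozen=True)
class FilterExec:
    condition: str
    child: object


@dataclass(frozen=True)
class HashJoin:
    left: object
    right: object
    key: str


@dataclass(frozen=True)
class NestedLoopJoin:
    left: object
    right: object


def hash_join_strategy(plan, plan_child):
    # Applies only to joins that have an equality key.
    if isinstance(plan, Join) and plan.key is not None:
        return HashJoin(plan_child(plan.left), plan_child(plan.right), plan.key)
    return None


def nested_loop_join_strategy(plan, plan_child):
    # Fallback that can execute any join.
    if isinstance(plan, Join):
        return NestedLoopJoin(plan_child(plan.left), plan_child(plan.right))
    return None


def basic_operators_strategy(plan, plan_child):
    if isinstance(plan, Relation):
        return TableScan(plan.name)
    if isinstance(plan, Filter):
        return FilterExec(plan.condition, plan_child(plan.child))
    return None


# Order matters: earlier strategies are preferred.
STRATEGIES = [hash_join_strategy, nested_loop_join_strategy, basic_operators_strategy]


def plan_query(plan):
    """Return the physical plan from the first strategy that applies."""
    for strategy in STRATEGIES:
        physical = strategy(plan, plan_query)
        if physical is not None:
            return physical
    raise ValueError(f"no strategy can plan {plan!r}")


logical = Join(Relation("a"), Filter("x > 1", Relation("b")), key="id")
physical = plan_query(logical)
```

Keeping strategies as independent units means a new physical operator can be introduced by adding one strategy to the list, without modifying the others.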
The interface exposed to users is SqlContext, which takes a standard SparkContext as an argument. This interface provides the ability to register RDDs as tables and to run SQL statements, expressed as strings, over them, returning the results as an RDD. There is also support for registering data stored in Parquet as a table. Finally, there is an experimental DSL that allows queries to be expressed using a LINQ-like syntax.
Spark SQL’s only dependency is on the Parquet libraries, which have an active community and a small transitive dependency footprint.
The Hive module adds support to Spark SQL for interacting with data and queries in the Hive ecosystem. This includes:
- A mapping from a HiveQL AST to Catalyst logical plans / expression trees.
- An interface that allows queries to reference tables that exist in a Hive metastore.
- A table scan operator that can read data from Hive SerDes.
- Wrappers for existing Hive UDFs, UDAFs, and UDTFs.
- Support for passing DDL commands back to Hive for execution.
The Hive module provides a HiveContext which extends SqlContext with the ability to interact with existing Hive deployments using HiveQL.
The Hive module has dependencies on Hive 0.12.0, specifically hive-metastore, hive-exec, and hive-serde. While these dependencies are on an unmodified version of Hive that can be obtained from Maven Central, they do introduce significant transitive dependencies to the project. Due to this large dependency tree, and to avoid possible conflicts with dependencies of existing Spark applications, the Hive module is not included in the Spark assembly. Instead, there is a separate, optional Hive assembly that is used only when present.
Dependencies on Catalyst and Spark SQL are added to the Spark assembly. The compute classpath is also changed to include the optional Hive assembly if it is present.
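The "use it only if present" behavior can be sketched as a simple existence check when the classpath is assembled. The snippet below is an illustration in Python rather than the actual classpath script; the function name, jar names, and directory layout are all hypothetical.

```python
import os
import tempfile


def compute_classpath(spark_assembly, hive_assembly_dir):
    """Build a classpath string, adding Hive jars only if the directory exists."""
    entries = [spark_assembly]
    if os.path.isdir(hive_assembly_dir):
        jars = sorted(f for f in os.listdir(hive_assembly_dir) if f.endswith(".jar"))
        entries.extend(os.path.join(hive_assembly_dir, jar) for jar in jars)
    return os.pathsep.join(entries)


with tempfile.TemporaryDirectory() as root:
    hive_dir = os.path.join(root, "hive-assembly")  # hypothetical location

    # No Hive assembly has been built: only the Spark assembly is on the classpath.
    without_hive = compute_classpath("spark-assembly.jar", hive_dir)

    # Once the optional assembly exists, it is picked up automatically.
    os.mkdir(hive_dir)
    open(os.path.join(hive_dir, "hive-assembly.jar"), "w").close()
    with_hive = compute_classpath("spark-assembly.jar", hive_dir)
```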
Each of the submodules of Spark SQL has unit tests. There is also a framework for executing a subset of the query tests that are included in the Hive distribution, which augments the test coverage with an additional 647 multi-part tests. When this framework runs, the query test files are broken up into individual statements and executed using Spark SQL. The result of each query is then compared with a “golden” answer that has been pre-generated using a stock Hive system. While these tests are primarily intended to ensure Hive compatibility, they also act as integration tests for the entire Spark SQL stack.
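The golden-answer approach can be sketched in a few lines. The example below is only an illustration of the comparison loop, not the actual test framework: the helper names are hypothetical, statements are split naively on semicolons, and an in-memory SQLite database stands in for the engine under test so that the sketch is runnable on its own.

```python
import sqlite3


def split_statements(script):
    # Naive split on ';' (assumes no semicolons inside string literals).
    return [s.strip() for s in script.split(";") if s.strip()]


def run_golden_test(script, execute, golden):
    """Run each statement and collect (statement, expected, actual) mismatches."""
    failures = []
    for statement in split_statements(script):
        actual = execute(statement)
        expected = golden.get(statement)
        if actual != expected:
            failures.append((statement, expected, actual))
    return failures


# Stand-in for the engine under test.
connection = sqlite3.connect(":memory:")


def execute(statement):
    return connection.execute(statement).fetchall()


script = "SELECT 1; SELECT 1 + 1;"

# Stand-in for answers pre-generated by the reference system.
golden = {
    "SELECT 1": [(1,)],
    "SELECT 1 + 1": [(2,)],
}

failures = run_golden_test(script, execute, golden)
```

Recording the failing statement alongside the expected and actual results makes it possible to pinpoint which part of a multi-part test file diverged from the reference answers.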
Unlike Shark, Spark SQL does not act as a drop-in replacement for Hive or the HiveServer. Instead, this new feature is intended to make it easier for Spark developers to run queries over structured data, using either SQL or the query DSL. After this sub-project graduates from Alpha status, it will likely become a new optimizer/backend for the Shark project.