Accelerate JSON in Spark SQL

GoDaddy has heavily invested in the Hadoop ecosystem to manage much of our data needs. We have multi-petabyte clusters across the US, a modern data lake, and JSON events flowing everywhere.

Our production data warehousing jobs are written using Pig, Hive, and a handful of legacy Map/Reduce jobs. Pig and Hive are high-level languages that have traditionally compiled down to a set of Hadoop map/reduce jobs for execution. Battle tested for years, they’re stable and reliable, but can be slow for development and execution.

Alternatively, Spark was developed at the UC Berkeley AMPLab with a slightly different goal in mind: fast, iterative distributed computation over data stored primarily in memory. The API is easy to use and popularized first-class functions on a Hadoop cluster. While it fills a role similar to high-level languages like Pig or Hive, there is no new syntax to learn: Spark programs are written in popular languages including Scala, Python, and R.

Spark SQL builds on Spark with a powerful optimization system and relational APIs (à la SQL), perfect for querying almost anything that resembles a tabular data source, including our JSON event streams, if you squint just enough.

More capabilities

Spark SQL added support for JSON quite a while back with the release of Spark 1.1. While it wasn’t game changing, it did simplify things for those of us committed to JSON.

One of my first experiments using Spark SQL was a little tool that measured the storage costs of compressed JSON vs. typed Parquet files. It was just a handful of lines of Scala, an excellent demonstration of the power of Spark SQL. The runtime was much slower than I had anticipated, however, and profiling confirmed that there was room for improvement. But it was good enough at the time, and I left good enough alone.

Replacing the foundation: Spark 1.4

Fast forward another year or so: we have a massive amount of JSON events streaming through an equally massive Kafka bus, and an ever-increasing number of Spark and PySpark applications running in production. While many jobs are well served by datasets that are “cooked” from the raw JSON periodically (among other sources), others need to access the real-time event stream or need information ignored by the scheduled batch conversions.

Around the release of Spark 1.3, I started working on a small research project to find optimization opportunities with our infrastructure. One of our large computing grids runs a mix of long running jobs that have very different resource utilization patterns. Some are I/O heavy, others are CPU heavy, and they all have variable but predictable changes in behavior throughout the day and week. These jobs are randomly assigned to machines across the grid, causing some resources to be locally oversubscribed even though the grid has excess capacity.

We’ve been collecting operational metrics about these jobs for quite a while. CPU time, network utilization, disk IOPS and bandwidth, memory utilization, among others. Very useful. Exactly what we’d like to aggregate and feed into a multidimensional bin packer to search for better static job placement.

But there was a rub: disk IOPS were missing from the cooked dataset. Since this was a research project, it didn’t seem appropriate to hack up the production job. Besides that, it processed a lot of fields that I didn’t need and at a resolution much greater than what my prototype optimizer would be able to handle. So I turned to Spark SQL for my JSON handling needs, primed with a few hundred terabytes worth of small event payloads. I found that even though it had received a handful of improvements to its JSON handling since Spark 1.1, one thing near to my heart (and quota) remained unchanged: performance.

Now seemed as good a time as any to finish what had been tempting me all those months ago.

Under the hood

The JSON parsing in Spark SQL had been based on Jackson’s very popular ObjectMapper. It’s easy to use and is fast for what it does. However, it assumes you have both a JSON document and a Java class that provides the schema. As Spark SQL schemas don’t have a corresponding class, it requested a generic representation: java.util.Map[String, AnyRef]. Alternatively, it could have requested a more natural class: JsonNode. But neither is natively supported by Spark SQL, and a few transformations are needed before either can be used. Besides being a rather roundabout way to do the parsing, it had some other effects we’ll talk about later.
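
The old path looked roughly like the following sketch (a hedged illustration only; the variable names and example document are mine, not Spark’s):

```scala
import com.fasterxml.jackson.databind.ObjectMapper

// A rough illustration of the pre-1.4 approach: parse into a generic
// object model first, then convert that model into rows matching the schema.
val mapper = new ObjectMapper()
val json = """{"artist": "Robert Hood", "plays": 42}"""
val record: java.util.Map[String, AnyRef] =
  mapper.readValue(json, classOf[java.util.Map[String, AnyRef]])
// record holds boxed values (String, Integer, nested maps and lists) that
// still need a second pass to become a row matching the Spark SQL schema.
```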

Dropping down a few layers of abstraction below the ObjectMapper is Jackson’s streaming API. It generates a stream of JsonToken values that represent (unsurprisingly) the tokenized JSON document. It is less convenient to work with than an object model, but it enables a very direct conversion to Spark’s internal schema and rows, and generates a lot less garbage along the way.
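
For a taste of the streaming API, here is a minimal sketch (plain Jackson, nothing Spark-specific; the document is made up):

```scala
import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

// Walk the token stream of a single document and print each token type.
// No object model is ever built; we only see one token at a time.
val factory = new JsonFactory()
val parser = factory.createParser("""{"name": "test", "values": [1, 2.5, null]}""")
try {
  var token = parser.nextToken()
  while (token != null) {
    println(s"$token -> ${parser.getCurrentName}")
    token = parser.nextToken()
  }
} finally {
  parser.close()
}
```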

The main casualty of the stream representation is, well, pretty much the remainder of the code. Converting trees (an object model) to trees (like a schema) no longer works when the input is a stream. So, we’ll rely on the classic way of converting streams to a tree: a recursive descent parser.

Recursive descent parsers aren’t new or novel, but they’re popular for a reason: they’re easy to implement, understand, and debug. However, they’re best for languages that require limited (or no) ‘lookahead’. Luckily for us, Spark SQL’s datatypes fall into this class. So what are they exactly? Wikipedia has a good overview, but the basic idea is that each production in the language is implemented by a function, and these functions can call each other (they are mutually recursive) to handle different types. We will end up writing a couple of these.

JSON schema inference

The first step along the way is to perform schema inference. For a single JSON document, we’d like to figure out the type of each field. And since we’re talking about Spark, we can safely assume there are going to be a whole lot of documents, so we’ll need some way of inferring a single schema for an entire RDD[String] full of JSON.

Conceptually this process is identical to before. Schema inference still takes an RDD[String] and returns a StructType. Similarly, parsing takes an RDD[String] and a StructType and produces a DataFrame. However, the new implementation is nearly a complete rewrite that takes full advantage of the JSON token stream.

Making it a little more concrete, let’s define inference as a map-reduce job (sketched in Scala after the list):

  • The map function runs inference on a single document: String => StructType
  • A reduce function combines any two schemas into one: (StructType, StructType) => StructType
    • This function needs to be commutative and associative in order to be safely used in a reduce or aggregate operation
    • Together with the empty structure as identity, ∅ = StructType(Seq()), it forms a commutative monoid
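
A Scala sketch of how those pieces fit together (inferDocument and mergeSchemas are illustrative names, not Spark’s actual internals):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType

// Illustrative signatures only; the real logic lives inside Spark SQL.
def inferDocument(json: String): StructType = ???                 // map: one document => schema
def mergeSchemas(a: StructType, b: StructType): StructType = ???  // reduce: merge two schemas

// The identity element of the monoid: an empty schema.
val emptySchema = StructType(Seq())

// Sequential version: infer a schema per document, then fold the schemas together.
def inferSchema(jsonDocs: RDD[String]): StructType =
  jsonDocs.map(inferDocument).fold(emptySchema)(mergeSchemas)
```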

Single document inference

Single document inference is very straightforward: we merely need to map Jackson’s token types to Spark SQL’s data types. The token value itself is rarely accessed; the token type alone is usually sufficient for schema inference, which saves additional overhead.
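
The heart of it is a match over token types, roughly like this simplified sketch (the names are illustrative, and the real code also handles decimals and several corner cases):

```scala
import com.fasterxml.jackson.core.{JsonParser, JsonToken}
import org.apache.spark.sql.types._

// Map the current token's type to a Spark SQL data type.
// Only the token type is inspected; the value itself is never read here.
def inferField(parser: JsonParser): DataType = parser.getCurrentToken match {
  case JsonToken.VALUE_STRING                       => StringType
  case JsonToken.VALUE_NUMBER_INT                   => LongType
  case JsonToken.VALUE_NUMBER_FLOAT                 => DoubleType
  case JsonToken.VALUE_TRUE | JsonToken.VALUE_FALSE => BooleanType
  case JsonToken.VALUE_NULL                         => NullType
  case JsonToken.START_OBJECT                       => inferObject(parser)  // recurse into fields
  case JsonToken.START_ARRAY                        => inferArray(parser)   // recurse into elements
}

// Mutually recursive productions, omitted here for brevity.
def inferObject(parser: JsonParser): StructType = ???
def inferArray(parser: JsonParser): ArrayType = ???
```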

But there is one big problem: JSON arrays are heterogeneous, but Spark SQL arrays are not. For example, {"array": [1, "test"]} is valid JSON but doesn’t have a clear mapping to a Spark SQL type. So how do we fix this? Read on.

Merging types

When we have an array containing a bunch of types, there isn’t always a great way of resolving the differences. In some cases it’s easy: given an Integer and a BigInteger, it’s obvious that the smaller type can be converted to the larger type without losing information. But a String and a Long? One thing we know for sure is that all JSON nodes can be serialized, and their serialized form, at least in Spark SQL, is a String. This is our top type.

And we can extend this to all other types: whenever we find a mismatch that cannot be resolved via a lossless widening, we convert both sides to a String, with a few exceptions for complex types.
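
A sketch of what such a merge might look like (illustrative only; Spark’s actual rules also cover decimal precision and nullability in more detail):

```scala
import org.apache.spark.sql.types._

// Merge two inferred types into the smallest type that can represent both.
// When no lossless widening exists, fall back to the top type: StringType.
def compatibleType(t1: DataType, t2: DataType): DataType = (t1, t2) match {
  case (a, b) if a == b        => a
  case (NullType, other)       => other
  case (other, NullType)       => other
  case (IntegerType, LongType) => LongType        // lossless widening
  case (LongType, IntegerType) => LongType
  case (LongType, DoubleType)  => DoubleType
  case (DoubleType, LongType)  => DoubleType
  case (ArrayType(e1, n1), ArrayType(e2, n2)) =>
    ArrayType(compatibleType(e1, e2), n1 || n2)   // merge element types
  case (s1: StructType, s2: StructType) =>
    // Merge field by field; a field present on only one side keeps its type.
    val merged = (s1.fields ++ s2.fields)
      .groupBy(_.name)
      .map { case (name, fs) => StructField(name, fs.map(_.dataType).reduce(compatibleType)) }
      .toSeq
      .sortBy(_.name)
    StructType(merged)
  case _ => StringType                            // no lossless widening: stringify both sides
}
```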

Multiple document inference

By fixing the array case we’ve really done most of the work for inferring a schema for an entire RDD worth of JSON.

Extending this to multiple documents ended up being boring (Spark is awesome, right?). We could fold the inference and merging functions over the RDD sequentially, but since the merge is commutative and associative we can instead divide and conquer with a tree aggregation, which improves performance on very large datasets.
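
Swapping the sequential fold from the earlier sketch for Spark’s treeAggregate gives the divide-and-conquer version (reusing the illustrative inferDocument and mergeSchemas helpers from above):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType

// Infer one schema for an entire RDD of JSON documents. Partial schemas are
// built per partition and then merged pairwise up a tree of executors.
def inferSchema(jsonDocs: RDD[String]): StructType =
  jsonDocs.treeAggregate(StructType(Seq()))(
    (schema, doc) => mergeSchemas(schema, inferDocument(doc)), // fold one document in
    mergeSchemas                                               // merge partial schemas
  )
```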

Parsing

Now that we have a schema we can use to drive the conversion, let’s tackle the parser.

The new parser is also a recursive descent parser, and it is schema directed: only fields and types defined in the schema are parsed, and everything else is discarded. This paves the road for projection pushdown as well as being good for performance.

The hard work was almost entirely in the inference stage; the parser itself is much simpler. While recursively parsing the token stream, the fields in the stream just need to be paired up with the fields defined in the schema (if there is one) and written into the appropriate column of a row.

However, there is one little snag: String is our top type, so any field in the token stream, including objects and arrays, must be convertible to a String. Jackson natively supports serializing a token stream back to a String, so this is nearly free. Easy, right? Parsing is complete.
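
Condensed into a sketch, the schema-directed conversion might look like this (illustrative names; the real parser writes into Spark’s internal row representation and covers many more types):

```scala
import com.fasterxml.jackson.core.{JsonFactory, JsonParser, JsonToken}
import java.io.StringWriter
import org.apache.spark.sql.types._

// Convert the value at the parser's current position into whatever type the
// schema asks for. Anything can fall back to String: Jackson can re-serialize
// the current subtree, so objects and arrays stringify cheaply.
def convertField(factory: JsonFactory, parser: JsonParser, dataType: DataType): Any =
  (parser.getCurrentToken, dataType) match {
    case (JsonToken.VALUE_NULL, _)                              => null
    case (JsonToken.VALUE_NUMBER_INT, LongType)                 => parser.getLongValue
    case (JsonToken.VALUE_NUMBER_FLOAT, DoubleType)             => parser.getDoubleValue
    case (JsonToken.VALUE_TRUE | JsonToken.VALUE_FALSE, BooleanType) => parser.getBooleanValue
    case (JsonToken.VALUE_STRING, StringType)                   => parser.getText
    case (_, StringType) =>
      // Top type: serialize whatever is here (object, array, number) back to JSON text.
      val writer = new StringWriter()
      val generator = factory.createGenerator(writer)
      generator.copyCurrentStructure(parser)
      generator.close()
      writer.toString
    case (JsonToken.START_OBJECT, schema: StructType)           => convertObject(factory, parser, schema)
    case (JsonToken.START_ARRAY, ArrayType(elementType, _))     => convertArray(factory, parser, elementType)
  }

// Mutually recursive helpers that walk objects and arrays, pairing stream
// fields with schema fields and skipping anything the schema doesn't mention.
def convertObject(factory: JsonFactory, parser: JsonParser, schema: StructType): Any = ???
def convertArray(factory: JsonFactory, parser: JsonParser, elementType: DataType): Any = ???
```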

Benchmarks

Now we can convert any set of JSON strings into a Spark StructType and its matching DataFrame without an object model in sight. How did the rewrite fare in the end?

On production jobs I regularly saw a 10x to 50x decrease in wall clock time for parsing tasks. The actual performance delta will depend on the schema and queries being executed, but it should be faster across the board.

Prior to merging upstream I ran some simple benchmarks using the publicly available last.fm Million Song dataset:

| Command | v1.3.1 | v1.4.0 |
| --- | --- | --- |
| import sqlContext.implicits._; val df = sqlContext.jsonFile("/tmp/lastfm.json") | 70.0s | 14.6s |
| df.count() | 28.8s | 6.2s |
| df.rdd.count() | 35.3s | 21.5s |
| df.where($"artist" === "Robert Hood").collect() | 28.3s | 16.9s |

Resolution

Spark 1.4 has been out for nearly a year, and this change lives on in the most recent releases as well. Whenever you use SQLContext.read.json (or the deprecated SQLContext.jsonRDD), this code quietly processes your data behind the scenes. No config or code changes are required; JSON handling is just faster.
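
For example (the path and field names here are hypothetical):

```scala
import sqlContext.implicits._  // for the $"column" syntax

// The faster tokenizer-based path is used automatically; there are no flags to set.
val events = sqlContext.read.json("/data/events/2016/05/*.json.gz")  // hypothetical path
events.printSchema()                                                  // schema from the new inference code
events.where($"eventType" === "disk_iops").count()                    // hypothetical field and value
```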

The Spark job that might have taken a week before now completes in a few hours. And I ended up with the IOPS metrics I needed, rolled up in 15-minute quantiles, perfect for the bin-packing optimizer. That’s what I call a win.

Hope you’re also enjoying the improved performance! And if you’re interested in working on these and other fun problems with us, check out our jobs page.