Tuesday, May 22, 2018

Spark DataFrame JSON schema mis-inference - String typed column instead of struct

All you wanted was to load some complex JSON files into a DataFrame, 
and use SQL with the [lateral view explode] function to parse the JSON.

Sounds like the basics of Spark SQL.
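For example, the intended flow looks roughly like this (a minimal sketch, assuming an existing SparkSession, a hypothetical input path and a requestBody.items array; your field names will differ):

val df = spark.read.json("/path/to/events.json")   // path is hypothetical
df.createOrReplaceTempView("events")

// explode the inner array of requestBody into one row per element
spark.sql("""
  SELECT item.*
  FROM events
  LATERAL VIEW explode(requestBody.items) t AS item
""").show()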

A problem can arise when one of the inner fields of the JSON 
has undesired non-JSON values in some of the records. 

For instance, an inner field might contain HTTP error text, which gets inferred as a string rather than as a struct. 

As a result, our schema would look like:

root
 |-- headers: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |-- requestBody: string (nullable = true)

Instead of 

root
 |-- headers: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |-- requestBody: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)


When we try to explode a "string" typed column, we get a type mismatch error:

org.apache.spark.sql.AnalysisException: Can't extract value from requestBody#10


How can we remove the non-JSON values and still get the correct schema in the DataFrame?

Removing the non-JSON values using string filtering with Spark SQL on the requestBody column 
will clean the data, but it won't change the type of the column, and we will still not be able to use JSON functions on it.
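Something like the following (a sketch, assuming the DataFrame df from above and a hypothetical filter condition) shows why: the bad rows are dropped, but requestBody stays a string:

import org.apache.spark.sql.functions.col

// rows whose requestBody is not a JSON object are filtered out,
// but the column keeps its inferred string type - explode on it still fails
val filtered = df.filter(col("requestBody").startsWith("{"))
filtered.printSchema()   // requestBody: string (nullable = true)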

In order to clean the data and then get the right schema, 
we should load the dataset into an RDD, filter out the bad rows, and create a DataFrame out of the clean RDD:

Good solution:  RDD -> Cleansing -> DataFrame (using spark.read.json(cleanRDD))
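A minimal sketch of this flow (the input path and the line-level filter are assumptions; any check that identifies your bad records will do):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cleanJson").getOrCreate()

// load the raw lines as text, before any schema inference happens
val rawRDD = spark.sparkContext.textFile("/path/to/events.json")   // path is hypothetical

// keep only lines whose requestBody holds a JSON object, not an error string
// (heuristic: a string-valued requestBody appears as "requestBody":"...")
val cleanRDD = rawRDD.filter(line => !line.contains("\"requestBody\":\""))

// schema inference now only sees valid structs for requestBody
val df = spark.read.json(cleanRDD)
df.printSchema()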



A bad solution would be to load the data as a DataFrame. The requestBody column will be inferred as a string. We can then filter out the bad records and store the DataFrame back to disk.

At that point, the value of the string-typed requestBody will be encapsulated in quotes, and any future attempt to load the "fixed" dataset into a DataFrame will set this column's type to string again.
Because of that, we would have to reload the "fixed" data into an RDD and clean out the encapsulating quotes, using a replace:

// unescape the inner quotes and strip the quotes wrapping the JSON object
val fixedRDD = badRDD.map(_.replace("\\\"", "\"")).map(_.replace("\"{", "{")).map(_.replace("}\"", "}"))

From that point we can use spark.read.json to get a valid DataFrame, and query it with the explode function!
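Roughly (again a sketch, reusing the hypothetical field names from above):

val fixedDF = spark.read.json(fixedRDD)
fixedDF.createOrReplaceTempView("events")
spark.sql("SELECT item.* FROM events LATERAL VIEW explode(requestBody.items) t AS item").show()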

Good luck, 
and don't forget that sometimes an RDD is the right way to clean the data before handing it, ready and clean, to higher level APIs such as DataFrames.