Tuesday, May 26, 2015

Parquet Multiple Schema Output Format

From time to time (too many times) I am surprised by an open source project that lacks some basic feature. I am even more surprised by how few Google results (a handful :) ) mention that missing feature.

One example is the Parquet multiple output format.


If you have more than one Parquet schema to write within the same MapReduce job, it is impossible to do so with the current combination of the multiple output format and the Parquet output format.

How come no one has ever needed to do that? We found only one Google result of someone asking about it, and no one answered.

The positive side is that it keeps our job relevant and interesting. We, the engineers, can proudly tell stories about how we opened the code and implemented our own classes, contributing our part to the community. It is much more interesting than using ready-made products and just integrating things.

Now back to the Parquet issue. The reason you can't write different schemas within the same job is that the schema is set in the main class of the MapReduce job. All the classes that take part in writing the Parquet records use the first schema that was set when the output format was initialized.

You have to find a way to override the schema from the reducer, according to the schema of the current record.
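One way to make a per-record schema choice possible is to keep a mapping from output path prefixes to schema strings, and resolve the schema for each file by longest-prefix match. This is a minimal sketch of that idea in plain Java; the class and method names are hypothetical, not part of parquet-mr:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: resolves the Parquet schema string for an output file
// from a path-prefix -> schema mapping that the driver stored in the job
// configuration. Illustrative only, not parquet-mr API.
public class SchemaByPath {

    private final Map<String, String> schemaByPrefix = new LinkedHashMap<>();

    public void register(String pathPrefix, String schema) {
        schemaByPrefix.put(pathPrefix, schema);
    }

    // Pick the schema whose registered prefix is the longest match
    // for the given output file path; null if nothing matches.
    public String resolve(String filePath) {
        String best = null;
        int bestLen = -1;
        for (Map.Entry<String, String> e : schemaByPrefix.entrySet()) {
            if (filePath.startsWith(e.getKey()) && e.getKey().length() > bestLen) {
                best = e.getValue();
                bestLen = e.getKey().length();
            }
        }
        return best;
    }
}
```

In the real job this lookup would run inside the output format, keyed on the file path that `getRecordWriter` receives.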

In order to do that, there are three places you need to take care of.

1+2. In the ParquetOutputFormat.java class.

Its getRecordWriter method is called from the reducer in order to get the RecordWriter. We extracted the schema from the output path, since we keep a schema-to-path mapping in the configuration.

We replaced the schema in two places (originally highlighted; marked with comments below):

 public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec)
     throws IOException, InterruptedException {
   final WriteSupport<T> writeSupport = getWriteSupport(conf);
   CodecFactory codecFactory = new CodecFactory(conf);
   long blockSize = getLongBlockSize(conf);
   if (INFO) LOG.info("Parquet block size to " + blockSize);
   int pageSize = getPageSize(conf);
   if (INFO) LOG.info("Parquet page size to " + pageSize);
   int dictionaryPageSize = getDictionaryPageSize(conf);
   if (INFO) LOG.info("Parquet dictionary page size to " + dictionaryPageSize);
   boolean enableDictionary = getEnableDictionary(conf);
   if (INFO) LOG.info("Dictionary is " + (enableDictionary ? "on" : "off"));
   boolean validating = getValidation(conf);
   if (INFO) LOG.info("Validation is " + (validating ? "on" : "off"));
   WriterVersion writerVersion = getWriterVersion(conf);
   if (INFO) LOG.info("Writer version is: " + writerVersion);
   WriteContext init = writeSupport.init(conf);
   // replaced: init.getSchema() -> ourSchema, resolved from the output path
   ParquetFileWriter w = new ParquetFileWriter(conf, ourSchema, file);
   float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
       MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
   long minAllocation = conf.getLong(ParquetOutputFormat.MIN_MEMORY_ALLOCATION,
       MemoryManager.DEFAULT_MIN_MEMORY_ALLOCATION);
   if (memoryManager == null) {
     memoryManager = new MemoryManager(maxLoad, minAllocation);
   } else if (memoryManager.getMemoryPoolRatio() != maxLoad) {
     LOG.warn("The configuration " + MEMORY_POOL_RATIO + " has been set. It should not " +
       "be reset by the new value: " + maxLoad);
   }
   return new ParquetRecordWriter<T>(
     // replaced: init.getSchema() -> ourSchema, resolved from the output path
     ourSchema,
     blockSize, pageSize,
     codecFactory.getCompressor(codec, pageSize),

3. In the GroupWriter class, which is called from the GroupWriteSupport instance.

  public void write(Group group) {
   recordConsumer.startMessage();
   // replaced: override the configured schema with the schema of the current group
   writeGroup(group, group.getType());   // was: writeGroup(group, schema)
   recordConsumer.endMessage();
  }
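The override pattern in that change can be shown in miniature with plain Java stand-ins: each record carries its own schema (the role `group.getType()` plays), and the writer consults the record instead of one schema fixed at construction time. All names here are hypothetical, not the parquet-mr Group/GroupWriter API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for the GroupWriter change: the writer takes the
// schema from each record rather than from a single configured field.
public class PerRecordSchemaWriter {

    public static class Record {
        final String type;   // plays the role of group.getType()
        final String value;
        public Record(String type, String value) { this.type = type; this.value = value; }
    }

    private final List<String> written = new ArrayList<>();

    // No fixed schema field: each write uses the record's own type.
    public void write(Record record) {
        written.add(record.type + ":" + record.value);
    }

    public List<String> written() { return written; }
}
```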

That's it.

Thanks to Nimrod Parasol and Kiril Yershov for the implementation.
