One example is the Parquet multiple output format. If you need to write more than one Parquet schema within the same MapReduce job, it is impossible to do so with the current combination of MultipleOutputs and ParquetOutputFormat.
How come no one has ever needed to do that? We found exactly one Google result of someone asking about it, and no one had answered.
The positive side is that it keeps our job relevant and interesting. We, the engineers, can proudly tell stories about how we opened the code and implemented our own classes, contributing our part to the community. It is much more interesting than using ready-made products and just integrating things.
Now back to the Parquet thing. The reason you can't write different schemas within the same job is that the schema is set in the main class of the MapReduce job. All the classes that take part in writing the Parquet records use the first schema that was set when the output format was initialized.
You have to find a way to override the schema from the reducer, according to the schema of the current record.
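To see why, here is a minimal sketch of the standard single-schema setup in the driver, using the example Group API. The schema string and job wiring are illustrative, and the package names assume an older parquet-mr release (newer versions live under org.apache.parquet.*):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import parquet.hadoop.ParquetOutputFormat;
import parquet.hadoop.example.GroupWriteSupport;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;

public class Driver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        MessageType schema = MessageTypeParser.parseMessageType(
            "message event { required binary name; required int64 ts; }");

        // The single schema for the whole job is fixed here, in the main class.
        // Every downstream writer class will pick up this one schema.
        GroupWriteSupport.setSchema(schema, conf);

        Job job = Job.getInstance(conf, "single-schema-parquet");
        job.setOutputFormatClass(ParquetOutputFormat.class);
        ParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);
        // ... mapper, reducer, input/output paths, etc.
    }
}

Calling GroupWriteSupport.setSchema a second time simply overwrites the first schema in the configuration, which is exactly why two named outputs cannot carry two different schemas.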
To override the schema per record, there are three places you need to take care of.
1+2. In the ParquetOutputFormat.java class.
Its getRecordWriter method is called from the reducer side in order to get the RecordWriter. We extracted the schema from the output path, because we kept a schema-to-path mapping in the configuration (a lookup sketch follows the method below).
We replaced the schema in the two places marked with comments below, swapping the original init.getSchema() for the schema resolved from the path:
public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec)
        throws IOException, InterruptedException {
    final WriteSupport<T> writeSupport = getWriteSupport(conf);

    CodecFactory codecFactory = new CodecFactory(conf);
    long blockSize = getLongBlockSize(conf);
    if (INFO) LOG.info("Parquet block size to " + blockSize);
    int pageSize = getPageSize(conf);
    if (INFO) LOG.info("Parquet page size to " + pageSize);
    int dictionaryPageSize = getDictionaryPageSize(conf);
    if (INFO) LOG.info("Parquet dictionary page size to " + dictionaryPageSize);
    boolean enableDictionary = getEnableDictionary(conf);
    if (INFO) LOG.info("Dictionary is " + (enableDictionary ? "on" : "off"));
    boolean validating = getValidation(conf);
    if (INFO) LOG.info("Validation is " + (validating ? "on" : "off"));
    WriterVersion writerVersion = getWriterVersion(conf);
    if (INFO) LOG.info("Writer version is: " + writerVersion);

    WriteContext init = writeSupport.init(conf);

    // Change 1: resolve the schema from the output path instead of using
    // init.getSchema(). getSchemaFromPath is our helper (sketched below).
    MessageType ourSchema = getSchemaFromPath(conf, file);
    ParquetFileWriter w = new ParquetFileWriter(conf, ourSchema, file);
    w.start();

    float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
        MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
    long minAllocation = conf.getLong(ParquetOutputFormat.MIN_MEMORY_ALLOCATION,
        MemoryManager.DEFAULT_MIN_MEMORY_ALLOCATION);
    if (memoryManager == null) {
        memoryManager = new MemoryManager(maxLoad, minAllocation);
    } else if (memoryManager.getMemoryPoolRatio() != maxLoad) {
        LOG.warn("The configuration " + MEMORY_POOL_RATIO + " has been set. It should not " +
            "be reset by the new value: " + maxLoad);
    }

    return new ParquetRecordWriter<T>(
        w,
        writeSupport,
        ourSchema, // Change 2: was init.getSchema()
        init.getExtraMetaData(),
        blockSize,
        pageSize,
        codecFactory.getCompressor(codec, pageSize),
        dictionaryPageSize,
        enableDictionary,
        validating,
        writerVersion,
        memoryManager);
}
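We don't have the original mapping code, so here is a hedged sketch of what getSchemaFromPath could look like. The configuration key format ("parquet.schema.for." plus the directory name) and the matching rule are our assumptions; the real job would use whatever mapping the driver registered:

// Hypothetical helper: resolve the Parquet schema from the output file path,
// assuming the driver stored one schema string per named-output directory.
private MessageType getSchemaFromPath(Configuration conf, Path file) {
    String dirName = file.getParent().getName(); // e.g. "events" or "users"
    String schemaString = conf.get("parquet.schema.for." + dirName);
    if (schemaString == null) {
        throw new IllegalStateException("No schema registered for output directory: " + dirName);
    }
    return MessageTypeParser.parseMessageType(schemaString);
}

The driver side would then set one matching key per named output, e.g. conf.set("parquet.schema.for.events", eventSchema.toString()).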
3. In the GroupWriter class, which is called from the GroupWriteSupport instance.
public void write(Group group) {
    recordConsumer.startMessage();
    // Override the fixed schema with the current group's own type
    // (was: writeGroup(group, schema)).
    writeGroup(group, group.getType());
    recordConsumer.endMessage();
}
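With the three changes in place, a reducer can route records with different schemas to different named outputs. A usage sketch under our assumptions (the named outputs "events" and "users", the schema strings, and the base output paths are illustrative, and must match what the driver registered with MultipleOutputs.addNamedOutput):

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import parquet.example.data.Group;
import parquet.example.data.simple.SimpleGroupFactory;
import parquet.schema.MessageTypeParser;

public class MultiSchemaReducer extends Reducer<Text, Text, Void, Group> {
    private MultipleOutputs<Void, Group> outputs;
    private SimpleGroupFactory eventFactory;
    private SimpleGroupFactory userFactory;

    @Override
    protected void setup(Context context) {
        outputs = new MultipleOutputs<Void, Group>(context);
        eventFactory = new SimpleGroupFactory(MessageTypeParser.parseMessageType(
            "message event { required binary name; required int64 ts; }"));
        userFactory = new SimpleGroupFactory(MessageTypeParser.parseMessageType(
            "message user { required binary id; }"));
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Each Group carries its own type, which the patched GroupWriter
        // now uses instead of the job-wide schema.
        Group event = eventFactory.newGroup()
            .append("name", key.toString())
            .append("ts", System.currentTimeMillis());
        outputs.write("events", null, event, "events/part");

        Group user = userFactory.newGroup().append("id", key.toString());
        outputs.write("users", null, user, "users/part");
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        outputs.close();
    }
}

Because each record lands under its own directory ("events/..." or "users/..."), the patched getRecordWriter can recover the right schema from the path.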
That's it.
Thanks to Nimrod Parasol and Kiril Yershov for the implementation.