Tuesday, May 26, 2015

Parquet Multiple Schema Output Format

From time to time (too many times) I am surprised by an open source project that lacks some basic feature. I am even more surprised by the number of Google results (a small one :) ) that mention that missing feature.

One example is the Parquet multiple output format.

(Apache Parquet logo)

If you need to write more than one Parquet schema within the same MapReduce job, it is impossible to do so with the current combination of MultipleOutputs and ParquetOutputFormat.

How come no one has ever needed to do that? We found only one Google result of someone asking about it, and no one answered.

The positive side is that it keeps our job relevant and interesting. We, the engineers, can proudly tell stories about how we opened the code and implemented our own classes, contributing our part to the community. It is much more interesting than integrating ready-to-use products.

Now back to the Parquet issue. The reason you can't write different schemas within the same job is that the schema is set once, in the main class of the MapReduce job. All the classes that take part in writing the Parquet records use the schema that was set when the output format was initialized.

You have to find a way to override that schema from the reducer, according to the schema of the current record.
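
Before diving into the code changes, here is a rough sketch of the driver side, assuming the example Group model. The property prefix our.schema.mapping. (one schema string per output sub-directory) and the schemas themselves are only illustrations of the schema-to-path mapping we keep in the job configuration; they are not Parquet settings.

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import parquet.example.data.Group;
 import parquet.hadoop.ParquetOutputFormat;
 import parquet.hadoop.example.GroupWriteSupport;
 import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;

 public class MultiSchemaDriver {
   public static void main(String[] args) throws Exception {
     Job job = Job.getInstance(new Configuration(), "multi-schema-parquet");

     // Two illustrative schemas - in reality these come from your own metadata.
     MessageType eventsSchema = MessageTypeParser.parseMessageType(
         "message event { required binary name (UTF8); required int64 ts; }");
     MessageType usersSchema = MessageTypeParser.parseMessageType(
         "message user { required binary name (UTF8); required int32 age; }");

     // Parquet still wants a single "global" schema for the write support.
     GroupWriteSupport.setSchema(eventsSchema, job.getConfiguration());
     ParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);

     // Our schema-to-path mapping, keyed by output sub-directory. The patched
     // ParquetOutputFormat reads it back when it opens a file under that directory.
     job.getConfiguration().set("our.schema.mapping.events", eventsSchema.toString());
     job.getConfiguration().set("our.schema.mapping.users", usersSchema.toString());

     // One named output per schema, both using the patched Parquet output format.
     MultipleOutputs.addNamedOutput(job, "events", ParquetOutputFormat.class, Void.class, Group.class);
     MultipleOutputs.addNamedOutput(job, "users", ParquetOutputFormat.class, Void.class, Group.class);

     // ... input path, output path, mapper and reducer classes omitted ...
   }
 }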



In order to override the schema per record, there are three places you need to take care of.

1+2. In the ParquetOutputFormat class

It is called from the reduce side in order to get the RecordWriter. We resolved the schema from the output path, because we keep a schema-to-path mapping in the job configuration.

We replaced the schema in the marked lines (see the comments in the code below):


 public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec)  
     throws IOException, InterruptedException {  
   final WriteSupport<T> writeSupport = getWriteSupport(conf);  
   CodecFactory codecFactory = new CodecFactory(conf);  
   long blockSize = getLongBlockSize(conf);  
   if (INFO) LOG.info("Parquet block size to " + blockSize);  
   int pageSize = getPageSize(conf);  
   if (INFO) LOG.info("Parquet page size to " + pageSize);  
   int dictionaryPageSize = getDictionaryPageSize(conf);  
   if (INFO) LOG.info("Parquet dictionary page size to " + dictionaryPageSize);  
   boolean enableDictionary = getEnableDictionary(conf);  
   if (INFO) LOG.info("Dictionary is " + (enableDictionary ? "on" : "off"));  
   boolean validating = getValidation(conf);  
   if (INFO) LOG.info("Validation is " + (validating ? "on" : "off"));  
   WriterVersion writerVersion = getWriterVersion(conf);  
   if (INFO) LOG.info("Writer version is: " + writerVersion);  
   WriteContext init = writeSupport.init(conf);  
   MessageType ourSchema = getSchemaFromPath(conf, file);  // our schema, resolved from the output path (helper sketched below)  
   ParquetFileWriter w = new ParquetFileWriter(conf, ourSchema /* was init.getSchema() */, file);  
   w.start();  
   float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,  
     MemoryManager.DEFAULT_MEMORY_POOL_RATIO);  
   long minAllocation = conf.getLong(ParquetOutputFormat.MIN_MEMORY_ALLOCATION,  
     MemoryManager.DEFAULT_MIN_MEMORY_ALLOCATION);  
   if (memoryManager == null) {  
    memoryManager = new MemoryManager(maxLoad, minAllocation);  
   } else if (memoryManager.getMemoryPoolRatio() != maxLoad) {  
    LOG.warn("The configuration " + MEMORY_POOL_RATIO + " has been set. It should not " +  
      "be reset by the new value: " + maxLoad);  
   }  
   return new ParquetRecordWriter<T>(  
     w,  
     writeSupport,  
      ourSchema,  // our schema, instead of init.getSchema()  
     init.getExtraMetaData(),  
     blockSize, pageSize,  
     codecFactory.getCompressor(codec, pageSize),  
     dictionaryPageSize,  
     enableDictionary,  
     validating,  
     writerVersion,  
     memoryManager);  
  }  
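
For completeness, this is roughly how such a per-path lookup could be implemented inside ParquetOutputFormat. The helper name getSchemaFromPath and the our.schema.mapping. prefix follow the driver sketch above; they are illustrative, not the exact code.

   // Sketch of the lookup used above. We store each schema string in the job
   // configuration under "our.schema.mapping.<sub-directory>", so the schema of a
   // file like <output>/events/part-r-00000.parquet is found via its parent directory.
   private MessageType getSchemaFromPath(Configuration conf, Path file) {
     String subDir = file.getParent().getName();
     String schemaString = conf.get("our.schema.mapping." + subDir);
     if (schemaString == null) {
       throw new IllegalStateException("No schema registered for output path " + file);
     }
     return MessageTypeParser.parseMessageType(schemaString);
   }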


3. In the GroupWriter class, which is called from the GroupWriteSupport instance.

  public void write(Group group) {  
    recordConsumer.startMessage();  
    // override the fixed schema with the schema of the current group
    writeGroup(group, group.getType());  // was: writeGroup(group, schema)
    recordConsumer.endMessage();  
  } 
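
To show how it all comes together, here is a minimal reducer-side sketch under the same assumptions (class, field and schema names are illustrative). Each Group is written to the named output whose sub-directory carries its schema, and the patched output format and GroupWriter take care of the rest.

 import java.io.IOException;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import parquet.example.data.Group;
 import parquet.example.data.simple.SimpleGroupFactory;
 import parquet.schema.MessageTypeParser;

 public class MultiSchemaReducer extends Reducer<Text, Text, Void, Group> {

   private MultipleOutputs<Void, Group> mos;
   private SimpleGroupFactory eventFactory;

   @Override
   protected void setup(Context context) {
     mos = new MultipleOutputs<Void, Group>(context);
     Configuration conf = context.getConfiguration();
     eventFactory = new SimpleGroupFactory(
         MessageTypeParser.parseMessageType(conf.get("our.schema.mapping.events")));
   }

   @Override
   protected void reduce(Text key, Iterable<Text> values, Context context)
       throws IOException, InterruptedException {
     Group event = eventFactory.newGroup()
         .append("name", key.toString())
         .append("ts", System.currentTimeMillis());
     // The base output path "events/part" sends the file to <output dir>/events/...,
     // which is where our schema-to-path mapping expects it.
     mos.write("events", null, event, "events/part");
   }

   @Override
   protected void cleanup(Context context) throws IOException, InterruptedException {
     mos.close();
   }
 }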


That's it.

Thanks to Nimrod Parasol and Kiril Yershov for the implementation.

Friday, May 22, 2015

Migrating to Yarn On a Mapreduce and Impala Cluster

Hey

I would like to share some insights from the YARN 2.5 migration we have gone through.
We are using CDH 5.2.1, with Impala and MapReduce as the main workloads on the cluster.
Don't take this as a technical guide, but as a technical experience with some interesting points worth sharing.

  • Let The Dogs Out: The cluster is a multi-tenant cluster, used for MapReduce and Cloudera Impala. Before migrating to YARN everything worked great: we allocated static resources to Impala (X GB of RAM per daemon, no limit on CPU) and y mappers and z reducers for each MapReduce tasktracker, while setting a max Java heap size for mappers and reducers. We had enough memory for both Impala and the MapReduce jobs, and no problem with the CPU either: we set 14 mappers and 14 reducers per 32-core node. The map and reduce slots were always full, so not much CPU was left for the Impala process, but the good thing is that Impala didn't really care and always had enough CPU. The charts showed that Impala uses about a tenth of the CPU compared to the MapReduce jobs. We said that moving to YARN is like letting the dogs out - we didn't know whether they would bite each other or play with each other.

  • The Dogs Bite Each Other: After migrating, there were X available vcores for all the applications (Impala and MapReduce) on the cluster. The MapReduce jobs behaved as expected and asked for one vcore per mapper/reducer. The problem was that Impala asked for a lot of vcores - much more than we expected, and much higher than the minimum we configured in the fair scheduler for each user. A simple query on a 100K-row Parquet table with statistics asked for X/2 vcores (half of the cluster capacity), and we run around 2 queries per second, which requires a huge amount of vcores. It all resulted in 50% of the Impala queries failing because of the 5-minute timeout while waiting for resources. On the other hand, important MapReduce jobs didn't get many vcores and spent a lot of time waiting for resources. We saw that users running important MapReduce jobs got a tenth of the vcores that a simple Impala user got just by deciding to run some queries. That is an undesirable situation that YARN brought.

  • An Impala CPU is not a MapReduce CPU: Why does Impala ask for so many vcores? Cloudera talks about a lack of table statistics. In addition, according to Nimrod Parasol, Impala opens one thread per disk. We have 360 disks in the cluster, so every query asked YARN for 360 vcores, which is not acceptable in a cluster with a total of 720 vcores. You can take this as an immature implementation of LLAMA, the mediator between YARN and Impala. We see that when all the CPU slots are used by mappers, the load average is extremely high, but when they are used by Impala queries, the load average is quite low. The conclusion is that Impala uses the CPU in a lighter way than MapReduce. So why should we allocate the same vcores to both MapReduce and Impala, and let the MapReduce jobs wait for a CPU that is being used by Impala, when both could have been served at the same time?

  • Solutions? Let's say we have 1,000 vcores in the cluster. We were willing to set 1,000 vcores for MapReduce, because setting it higher would result in a high load average on the servers (the mappers are usually CPU intensive). We were also willing to set 10,000 vcores for Impala, because we know that when it asks for 100 vcores it is probably using far fewer physical CPUs. That allocation would give Impala virtual CPUs, but that's OK, because before YARN we gave all the CPUs to MapReduce and Impala did great. The problem is that YARN and LLAMA won't let us set hierarchical queues - upper limits for MapReduce and Impala, with finer allocations for applicative users inside each pool (in order to use hierarchical queues, each user has to set the queue manually, e.g. set "mapreduceQueue.userA"; see the sketch after this list).

  • Eventually, we gave up on managing Impala resources with LLAMA and YARN, and stayed with MapReduce 2 (YARN) by itself.
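
For reference, this is the kind of manual queue targeting mentioned in the "Solutions?" bullet above. mapreduce.job.queuename is the standard MapReduce 2 way to pick a queue; the queue name and class name here are illustrative only.

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;

 public class QueueExample {
   public static void main(String[] args) throws Exception {
     // Every user has to point their own job at the right sub-queue of the
     // hierarchical fair-scheduler setup; nothing routes it there automatically.
     // The queue name "root.mapreduce.userA" is illustrative.
     Job job = Job.getInstance(new Configuration(), "user-a-job");
     job.getConfiguration().set("mapreduce.job.queuename", "root.mapreduce.userA");
     // ... the rest of the job setup ...
   }
 }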


Two more things you should be aware of:

  • High load average on the cluster: another thing we saw is that after moving to MapReduce 2, the servers' load average got almost 100% higher (250 with YARN, 150 without). We think the reason is that more mappers run simultaneously (because we now only set the number of containers per node, not the number of mappers and reducers) compared to when we set y mappers and z reducers per server, and mappers are usually heavier than reducers.

  • Hive on Oozie issue: after migrating to YARN, Hive steps in Oozie workflows failed with "hive-site.xml (permission denied)". The message has lots of references on the web, but nothing related to YARN. Eventually the solution was to create a new share lib that contains the YARN jars, cancel the Oozie share lib mapping parameter (which pointed to a custom share lib directory), and use the default share lib path: the latest /user/oozie/share/lib/lib<yyyymmdd> directory.
Bottom line: I don't find Impala mature enough to run alongside MapReduce under YARN. A better scheduler with more queue options (hierarchical pools), or a better LLAMA implementation (so that Impala asks only for the vcores it really needs), is required.

Good luck everybody.

Moving from Quora

I finally moved from Quora (http://distcp.quora.com/) to a more appropriate blog system.
I noticed that Quora's blog system is graphically very minimalistic, and far from being user friendly.
The reason I picked it was that every time I searched Google for a technical or architectural issue, I found Quora's results among the most professional answers, written by some of the world's leaders on those subjects.
My hope was that some of these people would find my blog inside Quora and maybe even follow it.

That didn't happen. I realized that it is hard to find my blog on Google, even when searching for my name together with the subjects I write about. Furthermore, I understood that people got to my blog only via my LinkedIn and Twitter shares, which can be done from any blog system, so why not pick an easy-to-use, user-friendly blog system that also has a better chance of being indexed by Google?

So here I come, Blogger.

(A screenshot of my previous blog, just to feel what it is like to upload an image to your blog :) )
