Thursday, February 18, 2016

Too many HDFS blocks!

Don't underestimate the number of blocks on your cluster.
 
Well we had a feeling that the cluster became heavier and slower since we passed the 700 K blocks per node threshold, but things kept working and we have been waited for the right time to fix it. Until the day the Namenode didn't start up because of JVM pause.
 
It all started with a human mistake that ended with hdfs restarting. The active namenode and the standby, in an unexpected behavior, were both in standby state and went down after several minutes. Our automatic failover is disabled and the manual failover didn't work out. We tried many combinations of restarting and stopping of the 2 namenodes, but the error log showed that the namenode (the one we wanted to activate) can't tell at what state it is (standby or active). Weird.    
We enabled the automatic failover controller. Then we saw at its logs that it can't figure out the namenode service id ip (the one who should be active). Then we saw that the namenode service id that the failover controller mentioned is different than what we see on the hdfs-site.xml (at dfs.namenode.rpc-address confs) or in the zookeeper's server confs. Weird.
 
We didn't really know where this id comes from so we tried restarting the zookeeper that is responsible for the hadoop-ha, playing with the zookkeper /hadoop-ha directory,and deploying configurations, but nothing helped. 
 
We decided to disable high availability. We installed a secondary namenode and started the hdfs. The namenode got started! But then it crushed! We saw this message: Detectedpause in JVM or host machine (eg GC). This is bad.. what should we do? 


We decided to go back to high availability mode. It all went good and at this time, there was an elected active namenode and a standby namenode. I guess the configurations got fixed somehow. The problem was that the jvm pause was still happening. 
 
Nimrod (https://www.linkedin.com/profile...), suggested increasing the namenode jvm heap size. We found the configuration on the cloudera manager and saw a warning about a ratio between the number of blocks and the namenode heap size: 1 G per 1 M blocks. We had 6 M blocks, and 4 GB! After increasing the heap to 12 GB, the namenode got started and stayed up. Victory :) 
 
There are many unsolved questions about what happend. Why couldn’t the namenode determined its state, and why the zookeeper, at the stage we moved to automatic failover, was not able to elect an active namenode. Why did the failover controller tried communicate with wrong namenode service id. We will look into the hdfs core-site.xml that we didn't check during the problems and will read more about the failover process (who elects the active in a manul state, where did the zookeeper took the namenodes ids).
 
But there is 1 thing we are certain about and it's the potential disaster  of the too many blocks alerts.

A Property graph over Elasticsearch?

I would like to share with you a very interesting architectural argument that is happening on my company.

It all started when Datastax acquired Titan (by thinkaurelius). Titan is a graph database that sits on top of Cassandra, Hbase or berkeley db.

Till then, we used Titan as our property graph that allowed us integrate lots of entities with a schema that kept changing. We were able to index edges and vertices properties in order to get them by filtering on one of their properties, and in addition we could travel from the vertex to its neighbors in a O(1) time.
 We didn't really need to ask graph oriented questions such as finding central vertices.

We liked titan for being scalable and even though we used Hbase, we were happy with it.

So, what have been changed ? 

We had lots of plans to use Titan in many upcoming projects, and the Datastax announcement brought us to realize that Titan will stay behind and won't get developed from now on. 
As an enterprise, that wished titan to get to its stable version and offer support, we ere very disappointed.

In the meanwhile, some guys started playing with Elasticsearch as a property graph. they even implemented a Thinkerpop API for Elasticsearch - and after some benchmarks we found out that the performances of the Elasticsearch were significantly better than the Titan performances on many use cases.

Oh, and i didn't mention that we also need some text search capabilities, that are part of Elasticsearch, but when using Titan,  it is only possible with an Elasticsearch cluster that gets its data from Titan, without having the possibility to configure the Elasticsearch index in a custom and more optimized ways.

A very interesting argument started on a the mail.

Some said that we should keep using Titan cause it is working, and use Elasticsearch only as the Titan's text search store. Datastax announced that it would be simple to move from titan to their property graph and when reaching this bridge, we would just move to Datastax DSE grpah db. we are not talking about other graph dbs cause we are not familiar with a scalable and stable graph db that is better than Titan (OrientDB for instance).

Other said that we must try Elasticsearch as our property graph, and keep develop Thinkerpop above it (mostly for being able to write in Gremlin). It is worth trying cause we had good benchmarks and it is better to use one back-end than two (Titan and Elastic).

On the other hand, there were voices that eliminated Elasticsearch and were very upset with the idea of threat it as a property graph. They said that in its nature it won't be able to support the type of indexing, masses and continues online writes that our property graph requires "Elasticsearch is not a Back End!". And in addition, why should we take care of a new thnikerpop implementation.

For summary I would say that property graph (RDF also, but that's for another talk) is a great data model for data that keep changing and help us make the data available and findable while avoiding hundreds of relational tables. 

I think that while having a working Titan property graph cluster, we should keep using it in other projects mainly because of the  experience with it. I don't think that having 2 back ends (Titan and  elastic) for 1 app is wrong, while using each back properly.

Elasticsearch has gone through a long way since its beginning. Many databases are now offering more than only "document store" or "key-value store" even if that is their nature. You can find Quora questions about [Why should I NOT use ElasticSearch as my primary datastore]. That is a good question that scientifically might be answered as a big NO but down there on the tech teams, it might work.

So, in case The Titan + Elastic won't work for us, trying Elastic is probably the next thing to do.

Monday, February 15, 2016

Sentiment Analysis with Spark streaming (Twitter Stream) and databricks/spark-coreNLP

Hi
I want to share a piece of code that I a have written, not long ago, and that might be good base for a nice Sentiment Analysis with spark streaming.

The example is going to analyse twits stream, despite the fact that the CoreNlp was trained with movies reviews, what makes that sentiment analysis less accurate.

You can train the algorithm by yourself but that's for another post :)

So - spark 1.5.2, scala 2.10, CoreNLP 3.6.0, protobuf-java 2.6.1, java 1.8

Fortunately, coreNlp is available as dependency since Jan 16,

Databricks spark core nlp wrapper - working with coreNLP 3.6:

The guys from data bricks are using SparkSql in order to wrap coreNlp. Interesting.
Take a look at the git hub page of the project, in order to understand a bit more.

For twitter:
you must log in to  twitter public streams
https://dev.twitter.com/streaming/public

you need to create your own application and get the id and access tokens.

Now back to spark

sbt.build:
       


libraryDependencies ++= Seq(
"edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
"com.google.protobuf" % "protobuf-java" % "2.6.1",
"org.apache.spark" %% "spark-core" % "1.5.2" % "provided",
"org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
"org.apache.spark" %% "spark-streaming-twitter" % "1.5.2" % "provided",,
"org.apache.spark" % "spark-mllib_2.10" % "1.5.2" % "provided",
"org.twitter4j" % "twitter4j-core" % "3.0.3",
"org.twitter4j" % "twitter4j-stream" % "3.0.3")



Building you spark with protobuf 2.6.1 (comes with 2.4 by default) - that was really disappointing. I had to build it by myself, waste time and replace the server's jars.
       
mvn  -Phadoop-2.6 -Dhadoop.version=2.6.0 -Dprotobuf.version=2.6.1  -DskipTests clean package


I copied Databrick's code to my local project. There are 2 files:
StanfordCoreNLPWrapper.scala
CoreNLP.scala

And in addition we will create our spark streaming object:

imports:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Column
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.streaming.twitter._
import org.apache.spark.{rdd, SparkContext, SparkConf}
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter._
import twitter4j.TwitterFactory
import twitter4j.auth.Authorization

and the code:
object TwitterStream {   
 def main( args:Array[String] ):Unit = {
      val consumerKey =""
      val consumerSecret =""
      val accessToken =""
      val accessTokenSecret =""


     object auth{
       val config = new twitter4j.conf.ConfigurationBuilder()
        .setOAuthConsumerKey(consumerKey)
        .setOAuthConsumerSecret(consumerSecret)
        .setOAuthAccessToken(accessToken)
        .setOAuthAccessTokenSecret(accessTokenSecret)
        .build
    }


val filters:String=args(0)  //filter for twits
val  filters_array:Array[String]=filters.split(",")val filter:Seq[String]= filters_array

val sparkHome = "/root/spark"
val checkpointDir = "/home/spark/checkpoint/"
val conf = new SparkConf().setAppName("Tutorial")
val ssc = new StreamingContext(conf, Seconds(10))
val twitter_auth = new TwitterFactory(auth.config)
val a = new twitter4j.auth.OAuthAuthorization(auth.config)
val atwitter : Option[twitter4j.auth.Authorization] =  Some(twitter_auth.getInstance(a).getAuthorization())
val tweets = TwitterUtils.createStream(ssc, atwitter, filter, StorageLevel.DISK_ONLY_2)
val twText = tweets.map(x=>x.getText).map(x=>x.replaceAll("/[^A-Za-z0-9 ]/", "")) 
//I am removing non Alphabetic characters cause the sentiment analysis is not so good with emotions
twText.foreachRDD(rdd=> {
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  val fields:Array[String] = args(1).split(",")   //"text.token" for instance, according to the json schema of coreNLP
  val annots:Array[String]=args(2).split(",")  //"tokenize","cleanxml","ssplit" for instance

  var select_fields:List[Column]= List()
  fields.foreach(x=>
             {
               val c:Column=new Column("parsed."+x)
               select_fields=c::select_fields
               }
             )

  val coreNLP = new CoreNLP()
    .setInputCol("text")
    .setAnnotators(annots)
    .setFlattenNestedFields(fields)
    .setOutputCol("parsed")

  val input = rdd.map(k=>(1,k)).toDF("id", "text")
  if(input.count()>0) {

    val parsed = coreNLP.transform(input)
      .select(select_fields:_*)

//printing results!
    parsed.foreach(println)
               }
      }
} )
      ssc.start()
    ssc.awaitTermination()
    }
}
That should work. This is very fun and quite short. 
Run the jar with 3 input parameters: 
twitter filters strings, coreNlp json fields, Annotations strings
I hope that it would help you building twitter stream,
and in addition to do some cool stuff with it. Good luck!

Thursday, December 3, 2015

Creating a standalone HiveMetastore (Not in Hadoop cluster)

When benchmarking Presto database on top of S3 files, I found out that I have to install a Hive metastore instance.

Image result for lonely bee

(A standalone bee)

I didn't need HiveServer, Mapreduce, or Hadoop cluster. So how do you do that?

Here are the steps:


  1. Install Hive Metastore Repository - an instance of one of the dbs that hive metastore works with (MySql, PostgresSql, MsSql, Oracle .. check documentation)
  2. Install Java
  3.  Download Vanilla Hadoop http://hadoop.apache.org/releases.html and unpack on the hive metastore instance  (let's say that you unpacked to /apps/hadoop-2.6.2)
  4. Set environment variables :
    1.  export HADOOP_PREFIX=/apps/hadoop-2.6.2
    2.   export HADOOP_USER_CLASSPATH_FIRST=true
  5. Download hive http://www.apache.org/dyn/closer.cgi/hive/ and upack on you instace
  6. Create a schema (user) for hive user and build the hive schema in the the hive metastore repository db using hive scripts (a sample script for mysql): 
    1.    /apps/apache-hive-1.2.1-bin/scripts/metastore/upgrade/mysql/hive-schema-1.2.0.mysql.sql
  7. configure hive-site.xml with the right parameters for:
    1. ConnectionUrl   (jdbc:mysql://localhost:3666/hive  for example)
    2. ConnectionDriverName 
    3. ConnectionUserName  (the created database user)
    4. ConnectionPassword  (the created database user password)
    5. hive.metastore.warehouse.dir - set it to a local path (file:///home/presto/ for example)
  8. Copy the required jar for jdbc connection to the metastore repository in the hive class path. (Ojdbc6 for oracle, mysql-jdbc-connector for mysql and so on)
  9. Start hive metastore -   /apps/apache-hive-1.2.1-bin/bin/hive --service metastore
  10. For accessing S3:
    1. copy these jars to the classpath:
      1. aws-java-sdk-1.6.6.jar        (http://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk/1.6.6)
      2. hadoop-aws-2.6.0.jar    (http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.0/hadoop-aws-2.6.0.jar)
    2.  you can specify these parameters on the hadoop core-site.xml
      1. fs.s3.awsAccessKeyId
      2. fs.s3.awsSecretAccessKey
      3. <property>
           <name>fs.s3n.impl</name>
           <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
        </property>
    3. for secured access to s3, use S3A connection in your URL, 
      1. add fs.s3a.connection.ssl.enabled to haddop_home/etc/hadoop/core-site.xml
      2. you also need to set these parameters for s3 access in the hadoop core-site.xml file:
        1. fs.s3a.secret.key
        2. fs.s3a.access.key
      3. Unfortunately, There is no support currently for temporary S3 credentials



Finally, when running presto, we will use the thrift address and port of the hive metastore service.

If you are running EMR from time to time, you are able to use that external metastore repository according to AWS documentation

That's it. No need for additional Hadoop libraries or settings.
Good luck!

Tuesday, December 1, 2015

Secured ELK (including filebeat)

It is not so difficult to start an ELK topology with an application server that forwards logs to a logstash server and a logstash server that sends logs in the right schema to Elastisearch.

Then, it's also quite straight forward to start an Elasticserach cluster with Kibana for some visualizations.

The problem started for us when we wanted to secure the whole pipeline - from the app server to the client through Logstash, Elasticsearch and Kibana.

We also got to know filebeat tand we almost immediately felt in love.

I will explain here, step by step, the whole installation and deploying process that we have gone through. All the configuration files are attached. Enjoy.

General Architecture:


















  1. Components:
    1. Filebeat  - An evolution of the old forwarder. Light and an easy to use tool for sending data from log files to Logstash.
    2. Logstash - A ready to use tool for sending logs data to Elasticsearch. It supports many other outputs, inputs and manipulations of its input log records. I will show the integration with filebeat as input and Elasticsearch and s3 as output.
    3. Elaticsearch - Our back-end for storing and indexing the logs
    4. Kibana - The visualization tool on top of elasticsearch
  2. Filebeat agent installation (talking with HTTPS to logstash)
    1. As for the project time, the newest version of filebeat (1.0.0 -rc1) had a bug when sending data to logstash. It's automatically creating the "@timestamp" field, which also get created by logstash, and makes it fail. We worked with 1.0.0 -beta4 version (the first one) of filebeat.
    2. configuration file
      1. Take a look at the Logstash as output scope.
      2.  We want to make filebeat trust logstash server. If you signed the certificate that logstash is using with some CA, you can give filebeat  a certificate that is signed by that CA. Configuration - "certificate_authorities"
      3. If you didn't use a sign certificate for the logstash server, you can use the certificate and the public key of logstash server (I'll explain later how to create them) as demonstrated in the configuration file
      4. Keep in mind that we don't use java keystores for that filebeat - logstash connection.
    3. Starting command:   filebeat_location/filebeat -v  -c filebeat_location/filebeat.yml
  3. Logstash (Https communication to Elasticsearch)
    1. Download and unpack
    2. generate certificate:
      1. openssl req -new -text -out server.req
      2. ###When asking for common name, enter the server's ip   : --Common Name (eg, your name or your server's hostname) []:10.184.2.232
      3. openssl rsa -in privkey.pem -out server.key
      4.  rm privkey.pem
      5. sign the req file with the CA certificate
    3. Logstash configuration
      1. The input is configured to get data from filebeat
      2. In the Filer and groke scope:
        1.  we are creating the json documents out of the "message" field that we get from filebeat.
        2. duplicating our "msg" field. We call it  msg_tokenized - that's important for Elasticsearch later on. From one hand, we want to search the logs so we save it as an analyzed field ("msg") but we also want to visualize the logs so we also save the "msg" as an unanalysed field (the whole messages). I will explain later how to set an Elasticsearch template for that. 
        3. creating a new field with the file_name of the source log file
        4. removing the "logtime" field
        5. removing the "message" field
      3. Output scope:
        1.  S3 and Elasticsearch outputs
        2. For ssl between Logstash and Elasticsearch, you can use the CA name in the cacert configuration under elasticsearch output.
        3. If we specify a new name for our index in the logstash elasticsearch output, the index would automatically get created.  We decided to create a new index for each log file and then we created an abstraction layer for all application logs with Kibana with queries on top of couple of indices.
        4. The s3 bucket path is not dynamic currently
    4. Elasticseach (open for  HTTPS only)
      1. Download Elasticsearch, and Shield (shieldlicense)
      2. install the 2 addons for shield:
        1. bin/plugin install file:///path/to/file/license-2.0.0.zip
        2. bin/plugin install file:///path/to/file/shield-2.0.0.zip
      3. Create Certificate for elasticsearch
        1. openssl req -new -text -out server.req
        2. openssl rsa -in privkey.pem -out server.key
        3. rm privkey.pem
        4. sign the req file (organisation ca sign)  and save the signed certificate as signed.cer
        5. Create a java keystore  with the sign ertificate and the private key (2 steps)
          1. openssl pkcs12 -export -name myservercert  -in signed.cer -inkey server.key -out keystore.p12
          2. keytool -importkeystore -destkeystore node01.jks -srckeystore keystore.p12 -srcstoretype pkcs12 -alias myservercert
        6. If you are not using a signing CA you can use that command in roder to create the keystore:      keytool -genkey -alias node01_s -keystore node01.jks -keyalg RSA -keysize 2048 -validity 712 -ext san=ip:10.184.2.238
      4. Configuration File
        1. Take a look at the security shield options. We are using the local java keystore that holds our signed certificate and the private key
      5. Start Elasticsearch: /apps/elasticsearch-2.0.0/bin/elasticsearch
      6. Add user:  elasticsearch/bin/esusers useradd alog -p alog123 -r admin
      7. Add Template for elasticsearch - We want all the indexes that are created by logstash to contain both "msg" field for search and "msg" field for visualization. The template would help us get one analyse field and one that is not.
        1. curl -XPUT   https://alog:alog123@elastic-server:9200/_template/template_1 -d '
          {
              "template" : "alog-*",
              "mappings" : {
                  "_default_" : {
                                          "properties":  {
                                           "msg":{"type":"string", "index" : "not_analyzed"}}
                  }
              }
          }'
        2.  This template will catch for all indices that start with "alog" in their name.
    5. Kibana (HTTPS communication to Elasticsearch,  users logon via HTTPS  )
        1. Download and unpack
        2. Start command: bin/kibana
        3. Configuration
          1. In order to enable https connection of users we create another pair of certificate and key with openssl tool and set the certificate and key in the configuration
          2. We set the  elasticsearch.username and password for Shield authentication
          3. We didn't supply Kibana the Elasticsearch certificate and key becuase we used a signed certificate

And that's it :)
I hope that all of the above would help you go through some of the obstacles that we encountered, whether with the SSL setup, with Groke command or with anything else.

Good Luck !

Sunday, October 18, 2015

Amazon Quick Insight

Amazon QuickSight - a new BI platform from amazon (and some related stuff that jumped to my mind while reading about it)


It was last year that one of the hottest trend for BI was Elasticsearch + Kibana.
A great Database with a great visualization tool on top of it. Pretty close to what Splunk does.

You could hear everywhere things like "you just throw inside all of your data and can query and visualize it in any way you wish".
Managers went after that as they were hypnotized. It was revolutionary. 

But it didn't last long. People started to get that it is cool and nice to get your data sliced and diced so fast and in such beautiful graphs, but in many cases it was only nice to have.

Amazon announced QuickSight last month - https://aws.amazon.com/quicksight/

For me it is a super cheap, easy and elegant way to implement that model of Elasticsearch + Kibana, in case that you are already on AWS.

I don't see that as a revolution, but as another classic case of platform that gets migrated to the cloud, which is quite a lot and enabler for businesses that can't develop these kind of platforms by themselves.

I also have problem with that name. Insights are result of data model, relations, and semantics that you give your data. You can throw the best ingredients to the best cooking pot and boil it, but it won't get tasty by itself. Some ingredients require special treatment, some are only go with others, some are not for boiling at all, but for baking. 

In general, there is no one solution for BI, and that's why platforms as Hadoop and S3 are so important for business. They are actually a supermarket of data, as my friend Evgeni says. Data analysts can then become chefs and cook their own desired and tasty meal, and Amazon quick insight just another recipe.

Still, I believe that customers that are considering moving to AWS, would get a lot from quick sight. It is one of the worthwhile amazon services to get familiar with because it is more than just a db. It is a while solutions that customers might really really like. 

And correct me if I am wrong :)

Wednesday, July 1, 2015

Playing with Apache Nifi

I decided to play a little with apache NiFi.

What is NiFi?

I had some problems with a Flume agent and someone wrote that i should check out NiFi.

I can't tell that it replaces Flume. Neither replacing any other tool. 
According to the official website, it is an easy to use, powerful, and reliable system to process and distribute data.

Well, too general for me. You can read more here.

For me, it is kind of a very configurable streaming engine, with some basic common built in features, together with nice graphical interface that is very helpful for building and monitoring the data process.


From a first look these guys have thought of almost everything - Events Merger before writing to HDFS, success/failure routes, It is easy to build new custom java processors, data provenance interface, an ability to store event on disk for recoverability and have many more configurations and features.

That's how it looks like:

Updated Merged Flow




In my case, I only had to replace 1 flume agent for a specific data source. Flume is probably the best and simplest tool to ingest data into the HDFS, but it might have some difficulties in some cases. One of them is when ingesting large events (more than 100M).

So, I needed to get NiFi getting data from an IBM MQ. It only supports ActiveMq right now so I had to build a new IBMMQ processor.

With a good instructions of developing custom processors (here) I was managed to add my own GetDataFromIbmMq processor (which is much simpler to use than the Flume to JMS source implementation that requires a binding file).  (I hope to upload the source code as soon as possible).

The new data stream is actually working very well. Large events  are coming out of the IbmMq processor, get merged, get transformed to sequence file and get written to hdfs. In addition it is very easy now to send the data anywhere else, or playing with the topology in any way we wish (adding more data sources, more etl processes and more data stores to save the data in). It is also fantastic that we are able to see how many events went through any processor, how many succeeded and how many failed.


The trend for us right now is storing first on hdfs, and it is kind of opposit to NiFi that focuses on stream processing. But still, even for simple use case of getting data, compression and storing, it is very easy to use and enable new capabilities of data monitoring and provenance.


Must read posts regarding NiFi:
https://blogs.apache.org/nifi/entry/basic_dataflow_design 

https://blogs.apache.org/nifi/entry/stream_processing_nifi_and_spark