Friday, February 26, 2016

SprayJson Scala case classes for Google Maps API response


I have been playing a little with Spray Json - a lightweight, clean and simple JSON implementation in Scala.

Here is an example of how to easily unmarshal a Google Maps GET response for some address, using Spray Json and case classes that were built to match the returned JSON.

I hope that anyone who is trying to do the same will save a little time by using these ready-to-use case classes (maybe it's a good idea to create a central repository of case classes for commonly used JSONs).

This is the first result for searching "melbourne australia" in Google Maps (the original response was an array of 2 results):

 {
    "results" : [
       {
          "address_components" : [
             {
                "long_name" : "Melbourne",
                "short_name" : "Melbourne",
                "types" : [ "colloquial_area", "locality", "political" ]
             },
             {
                "long_name" : "Victoria",
                "short_name" : "VIC",
                "types" : [ "administrative_area_level_1", "political" ]
             },
             {
                "long_name" : "Australia",
                "short_name" : "AU",
                "types" : [ "country", "political" ]
             }
          ],
          "formatted_address" : "Melbourne VIC, Australia",
          "geometry" : {
             "bounds" : {
                "northeast" : {
                   "lat" : -37.4598457,
                   "lng" : 145.76474
                },
                "southwest" : {
                   "lat" : -38.2607199,
                   "lng" : 144.3944921
                }
             },
             "location" : {
                "lat" : -37.814107,
                "lng" : 144.96328
             },
             "location_type" : "APPROXIMATE",
             "viewport" : {
                "northeast" : {
                   "lat" : -37.4598457,
                   "lng" : 145.76474
                },
                "southwest" : {
                   "lat" : -38.2607199,
                   "lng" : 144.3944921
                }
             }
          },
          "partial_match" : true,
          "place_id" : "ChIJ90260rVG1moRkM2MIXVWBAQ",
          "types" : [ "colloquial_area", "locality", "political" ]
       }
    ],
    "status" : "OK"
 }

And here is the code, including the case classes that are used for parsing the JSON:

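 /**
  * Created by gilad on 24/02/16.
  */
 import spray.json._
 import DefaultJsonProtocol._

 object FetchCoordinates extends App {

   // Plain HTTP GET that returns the response body as a String
   def get(url: String,
           connectTimeout: Int = 5000,
           readTimeout: Int = 5000,
           requestMethod: String = "GET"): String = {
     import java.net.{URL, HttpURLConnection}
     val connection = (new URL(url)).openConnection.asInstanceOf[HttpURLConnection]
     connection.setConnectTimeout(connectTimeout)
     connection.setReadTimeout(readTimeout)
     connection.setRequestMethod(requestMethod)
     val inputStream = connection.getInputStream
     // Read the whole body, and close the stream even if reading fails
     try scala.io.Source.fromInputStream(inputStream).mkString
     finally inputStream.close()
   }

   try {
     val address = "melbourneaustralia"
     val content = get("" + address + "&sensor=true")
     val contentJs = content.parseJson
     val googleRes = contentJs.convertTo[GoogleMapResponse]
     val form_address = googleRes.results(0).formatted_address
   } catch {
     // SocketTimeoutException is a subclass of IOException, so it has to be matched first
     case ste: java.net.SocketTimeoutException => // handle this
     case ioe: java.io.IOException => // handle this
   }
 }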

Case classes:

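 import spray.json._
 import DefaultJsonProtocol._

 // lat/lng are decimal degrees, so they have to be Doubles and not Longs
 case class JsonCoords(lat: Double, lng: Double)
 object JsonCoords { implicit val f: RootJsonFormat[JsonCoords] = jsonFormat2(JsonCoords.apply) }
 case class JsonBounds(northeast: JsonCoords, southwest: JsonCoords)
 object JsonBounds { implicit val f: RootJsonFormat[JsonBounds] = jsonFormat2(JsonBounds.apply) }
 case class JsonGeometry(bounds: JsonBounds, location_type: String, viewport: JsonBounds)
 object JsonGeometry { implicit val f: RootJsonFormat[JsonGeometry] = jsonFormat3(JsonGeometry.apply) }
 case class JsonAddress(long_name: String, short_name: String, types: Seq[String])
 object JsonAddress { implicit val f: RootJsonFormat[JsonAddress] = jsonFormat3(JsonAddress.apply) }
 // The six fields follow the sample response above; partial_match only appears on inexact matches, hence the Option
 case class JsonGoogleResult(address_components: Seq[JsonAddress], formatted_address: String,
                             geometry: JsonGeometry, partial_match: Option[Boolean],
                             place_id: String, types: Seq[String])
 object JsonGoogleResult { implicit val f: RootJsonFormat[JsonGoogleResult] = jsonFormat6(JsonGoogleResult.apply) }
 case class GoogleMapResponse(results: Seq[JsonGoogleResult], status: String)
 object GoogleMapResponse { implicit val f: RootJsonFormat[GoogleMapResponse] = jsonFormat2(GoogleMapResponse.apply) }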

Tuesday, February 23, 2016

HTTPS From Java to Logstash

The goal: securely sending data from a server that has access to a RabbitMQ-based system to a remote server (and then to S3).
How would you do that?
Due to a special protocol for accessing the RabbitMQ system, we decided to do it with custom consumer Java code on the Rabbit side and with Logstash listening on 443 for HTTPS POSTs.

We really wanted to do it Logstash to Logstash but couldn't, due to special requirements.

Why HTTPS? We tried TCP over SSL, but when a connection had been established and the listening Logstash then went down, we had to work harder to re-establish the connection from the Java side. In addition, no response code is sent back when working over plain TCP without HTTP.
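
The value of getting a response code back can be sketched roughly like this. It is a minimal Scala sketch and not the code we ran: `RetryingSender`, `sendWithRetry` and the `post` function are hypothetical names, and `post` stands in for whatever actually performs the HTTPS POST and returns the HTTP status code.

```scala
import scala.annotation.tailrec

object RetryingSender {
  /** Calls `post` until it reports a 2xx status.
    * Returns true on success, false after `maxAttempts` failed attempts. */
  def sendWithRetry(post: String => Int, payload: String, maxAttempts: Int = 3): Boolean = {
    @tailrec
    def attempt(n: Int): Boolean =
      if (n > maxAttempts) false
      else {
        // A connection failure is treated like a non-2xx answer
        val status = try post(payload) catch { case _: java.io.IOException => -1 }
        if (status >= 200 && status < 300) true else attempt(n + 1)
      }
    attempt(1)
  }
}
```

With plain TCP there is no status to inspect, so the sender cannot tell "delivered" from "lost" this easily.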

We thought of using Logstash Lumberjack on the listener side, but then you have to use Logstash with the Lumberjack output on the sending side.

Step #1: Creating certificates (if you don't have any) and importing them into the keystore.

On the Logstash box:

  1.  keytool -genkey -keyalg RSA -alias mycert -keystore keystore.jks -storepass 123pass -ext SAN=ip:,ip:  -validity 360 -keysize 2048 
  2. The 2 IPs are the IPs that we want the Java box to trust when it connects to Logstash. In our case we had to pass through another routing box in the middle.
  3. Export the created certificate for use by the consuming Java client.
  4. keytool -exportcert -file -alias mycert -keystore keystore.jks 

On the Java side:
  1. Create a truststore with the exported certificate:
    1. keytool -importcert -file client.cer  -alias mycert -keystore truststore.jks
  2. You can import other certificates if needed.

Step #2: Java app

I will focus on the HTTPS part with basic authentication.
To send data over HTTPS you can use the following code, which is based on Apache HttpClient (it was harder than I expected to find the right way to do that with SSL):


 import java.io.BufferedReader;  
 import java.io.File;  
 import java.io.InputStreamReader;  
 import javax.net.ssl.SSLContext;  
 import org.apache.commons.codec.binary.Base64;  
 import org.apache.http.client.ClientProtocolException;  
 import org.apache.http.client.config.RequestConfig;  
 import org.apache.http.client.methods.CloseableHttpResponse;  
 import org.apache.http.client.methods.HttpPost;  
 import org.apache.http.conn.HttpHostConnectException;  
 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;  
 import org.apache.http.conn.ssl.TrustSelfSignedStrategy;  
 import org.apache.http.entity.ByteArrayEntity;  
 import org.apache.http.impl.client.CloseableHttpClient;  
 import org.apache.http.impl.client.HttpClients;  
 import org.apache.http.ssl.SSLContexts;  

Some setup before sending: we set up the Apache HTTPS client, and it also carries a basic authentication header with a username and password.

We point to the truststore that we created on the Java side in step #1.

     // Trust the certificates in our truststore (self-signed ones included)  
     SSLContext sslcontext = SSLContexts.custom().loadTrustMaterial(new File(truststore_location), truststore_password.toCharArray(),  
         new TrustSelfSignedStrategy()).build();  
     // Passing null for the protocols and cipher suites keeps the JVM defaults  
     SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslcontext, null, null, SSLConnectionSocketFactory.getDefaultHostnameVerifier());  
     httpclient = HttpClients.custom().setSSLSocketFactory(sslsf).build();  
     final RequestConfig params = RequestConfig.custom().setConnectTimeout(5000).setSocketTimeout(5000).build();  
     httppost = new HttpPost("https://"+logstashUrl);  
     // Apply the timeouts to the request  
     httppost.setConfig(params);  
     String credentials = this.httpuser+":"+httppassword;  
     byte[] encoding=Base64.encodeBase64(credentials.getBytes());  
     String authStringEnc = new String(encoding);  
     httppost.setHeader("Authorization", "Basic " + authStringEnc);  
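
The basic authentication header itself is just "Basic " followed by the Base64 of `user:password`. Here is a small Scala sketch of that step on its own. Note that it swaps in the JDK's `java.util.Base64` (Java 8+) instead of the commons-codec class used above, and `BasicAuth.header` is a name made up for the illustration.

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

object BasicAuth {
  /** Builds the value of the Authorization header for HTTP basic authentication. */
  def header(user: String, password: String): String = {
    val credentials = s"$user:$password".getBytes(StandardCharsets.UTF_8)
    "Basic " + Base64.getEncoder.encodeToString(credentials)
  }
}
```

For example, `BasicAuth.header("my_basic_auth_user", "my_basic_auth_pass")` gives the value that the Logstash side will check against its `user` and `password` settings.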

Sending chunks of data:

 ByteArrayEntity postDataEntity = new ByteArrayEntity(data.getBytes());  
 // Attach the payload to the request  
 httppost.setEntity(postDataEntity);  
 //Actual post to remote host  
 CloseableHttpResponse httpResponse = httpclient.execute(httppost);  
 try {  
   BufferedReader reader = new BufferedReader(new InputStreamReader(httpResponse.getEntity().getContent()));  
   String inputLine;  
   StringBuilder stringResponse = new StringBuilder();  
   while ((inputLine = reader.readLine()) != null) {  
     stringResponse.append(inputLine);  
   }  
 } finally {  
   httpResponse.close();  
 }  

Step #3: Logstash with HTTPS input
That's the easy and well-documented part.
A sample configuration for that kind of Logstash:

 input {  
   http {  
     host => "" # default:  
     port => 443  
     keystore_password => "123pass"  
     ssl => true  
     password => "my_basic_auth_pass"  
     user => "my_basic_auth_user"  
   }  
 }  
Step #4: Logstash output (Epilogstash)

So we needed the data to get to S3 and Amazon Kinesis.
Logstash, as the component that receives the data via HTTPS, gives us lots of flexibility.
It supports S3 out of the box. The problem is that it doesn't support S3 server-side encryption, so it wasn't good enough for us.
There is a GitHub project for a Kinesis output. We preferred not to use it.

The temporary solution is to write to files with Logstash,
and then read the files with a Python script and send them to S3 using the AWS CLI.
Kinesis has a nice agent that can watch a directory and send new lines to Kinesis.

That's it. Good luck!

Thursday, February 18, 2016

Too many HDFS blocks!

Don't underestimate the number of blocks on your cluster.
Well, we had a feeling that the cluster had become heavier and slower since we passed the threshold of 700 K blocks per node, but things kept working and we kept waiting for the right time to fix it. Until the day the namenode wouldn't start up because of a JVM pause.
It all started with a human mistake that ended with HDFS restarting. The active namenode and the standby, unexpectedly, were both in standby state and went down after several minutes. Our automatic failover was disabled and the manual failover didn't work. We tried many combinations of restarting and stopping the 2 namenodes, but the error log showed that the namenode (the one we wanted to activate) couldn't tell what state it was in (standby or active). Weird.
We enabled the automatic failover controller. Then we saw in its logs that it couldn't figure out the IP of the namenode service id (the one that should be active). Then we saw that the namenode service id mentioned by the failover controller was different from what we saw in hdfs-site.xml (in the dfs.namenode.rpc-address confs) or in the ZooKeeper server confs. Weird.
We didn't really know where this id came from, so we tried restarting the ZooKeeper that is responsible for hadoop-ha, playing with the ZooKeeper /hadoop-ha directory, and deploying configurations, but nothing helped.
We decided to disable high availability. We installed a secondary namenode and started HDFS. The namenode started! But then it crashed! We saw this message: Detected pause in JVM or host machine (eg GC). This is bad... what should we do?

We decided to go back to high availability mode. It all went well, and this time there was an elected active namenode and a standby namenode. I guess the configurations got fixed somehow. The problem was that the JVM pause was still happening.
Nimrod suggested increasing the namenode JVM heap size. We found the configuration in Cloudera Manager and saw a warning about the ratio between the number of blocks and the namenode heap size: 1 GB per 1 M blocks. We had 6 M blocks, and 4 GB! After increasing the heap to 12 GB, the namenode started and stayed up. Victory :)
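
The rule of thumb from that warning is simple enough to check in a few lines. This is only a sketch of the arithmetic (1 GB of heap per 1 M blocks, rounded up); `NamenodeHeap` and its methods are names invented for the example and are not part of Hadoop or Cloudera Manager.

```scala
object NamenodeHeap {
  // Rule of thumb from the warning: 1 GB of heap per 1 M blocks
  val BlocksPerGb: Long = 1000000L

  /** Minimum heap in GB suggested by the rule of thumb, rounded up. */
  def recommendedHeapGb(blocks: Long): Long =
    (blocks + BlocksPerGb - 1) / BlocksPerGb

  /** True when the configured heap is below the rule-of-thumb minimum. */
  def isUndersized(blocks: Long, heapGb: Long): Boolean =
    heapGb < recommendedHeapGb(blocks)
}
```

With our numbers, `NamenodeHeap.isUndersized(6000000L, 4)` is true, while the 12 GB we ended up with leaves a comfortable margin over the 6 GB minimum.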
There are many unanswered questions about what happened. Why couldn't the namenode determine its state, and why wasn't ZooKeeper, at the stage we moved to automatic failover, able to elect an active namenode? Why did the failover controller try to communicate with the wrong namenode service id? We will look into the HDFS core-site.xml, which we didn't check during the problems, and will read more about the failover process (who elects the active namenode in manual mode, and where ZooKeeper took the namenode ids from).
But there is one thing we are certain about: the potential disaster behind the "too many blocks" alerts.

A Property graph over Elasticsearch?

I would like to share with you a very interesting architectural argument that is going on at my company.

It all started when DataStax acquired Titan (by thinkaurelius). Titan is a graph database that sits on top of Cassandra, HBase or Berkeley DB.

Until then, we used Titan as our property graph, which allowed us to integrate lots of entities with a schema that kept changing. We were able to index edge and vertex properties in order to fetch them by filtering on one of their properties, and in addition we could travel from a vertex to its neighbors in O(1) time.
We didn't really need to ask graph-oriented questions such as finding central vertices.
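
To make these two access patterns concrete, here is a toy in-memory sketch in Scala. It has nothing to do with how Titan or Elasticsearch store data; `Vertex` and `PropertyGraph` are names invented for the illustration. It only shows a property index for lookups by value, plus an adjacency map that makes finding a vertex's neighbor set an (expected) constant-time hash lookup.

```scala
import scala.collection.mutable

final case class Vertex(id: Long, props: Map[String, String])

class PropertyGraph {
  private val vertices  = mutable.Map.empty[Long, Vertex]
  private val adjacency = mutable.Map.empty[Long, mutable.Set[Long]]
  // (property name, property value) -> ids of the vertices that carry it
  private val index     = mutable.Map.empty[(String, String), mutable.Set[Long]]

  // Simplification: re-adding an id with different properties leaves stale index entries
  def addVertex(v: Vertex): Unit = {
    vertices(v.id) = v
    v.props.foreach { case (key, value) =>
      index.getOrElseUpdate((key, value), mutable.Set.empty[Long]) += v.id
    }
  }

  def addEdge(from: Long, to: Long): Unit =
    adjacency.getOrElseUpdate(from, mutable.Set.empty[Long]) += to

  /** Index lookup: all vertices whose property `key` equals `value`. */
  def findBy(key: String, value: String): Set[Vertex] =
    index.getOrElse((key, value), mutable.Set.empty[Long])
      .iterator.flatMap(id => vertices.get(id)).toSet

  /** Adjacency lookup: the outgoing neighbors of a vertex. */
  def neighbors(id: Long): Set[Vertex] =
    adjacency.getOrElse(id, mutable.Set.empty[Long])
      .iterator.flatMap(n => vertices.get(n)).toSet
}
```

A typical use is `findBy("name", "alice")` to get a starting vertex and then `neighbors(...)` to walk outward from it, which is pretty much all we asked from our graph.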

We liked Titan for being scalable, and even though we used HBase, we were happy with it.

So, what has changed?

We had lots of plans to use Titan in many upcoming projects, and the DataStax announcement made us realize that Titan will be left behind and won't be developed from now on.
As an enterprise that wished for Titan to reach a stable version and offer support, we were very disappointed.

In the meantime, some guys started playing with Elasticsearch as a property graph. They even implemented a TinkerPop API for Elasticsearch - and after some benchmarks we found that the performance of Elasticsearch was significantly better than Titan's in many use cases.

Oh, and I didn't mention that we also need some text search capabilities. They are part of Elasticsearch, but when using Titan they are only possible with an Elasticsearch cluster that gets its data from Titan, without the possibility to configure the Elasticsearch index in a custom and more optimized way.

A very interesting argument started over email.

Some said that we should keep using Titan because it is working, and use Elasticsearch only as Titan's text search store. DataStax announced that it would be simple to move from Titan to their property graph, and when we reach that bridge we would just move to the DataStax DSE graph db. We are not talking about other graph dbs because we are not familiar with a scalable and stable graph db that is better than Titan (OrientDB, for instance).

Others said that we must try Elasticsearch as our property graph, and keep developing TinkerPop on top of it (mostly to be able to write in Gremlin). It is worth trying because we had good benchmarks and it is better to use one back end than two (Titan and Elastic).

On the other hand, there were voices that ruled out Elasticsearch and were very upset with the idea of treating it as a property graph. They said that by its nature it won't be able to support the type of indexing, the volumes and the continuous online writes that our property graph requires: "Elasticsearch is not a back end!". And in addition, why should we take care of a new TinkerPop implementation?

In summary, I would say that a property graph (RDF also, but that's for another talk) is a great data model for data that keeps changing, and it helps us make the data available and findable while avoiding hundreds of relational tables.

I think that while we have a working Titan property graph cluster, we should keep using it in other projects, mainly because of our experience with it. I don't think that having 2 back ends (Titan and Elastic) for 1 app is wrong, as long as each one is used properly.

Elasticsearch has come a long way since its beginning. Many databases now offer more than just a "document store" or a "key-value store", even if that is their nature. You can find Quora questions about [Why should I NOT use ElasticSearch as my primary datastore]. That is a good question: in theory, using it as a primary datastore gets a big NO, but down there in the tech teams it might work.

So, in case Titan + Elastic won't work for us, trying Elastic alone is probably the next thing to do.

Monday, February 15, 2016

Sentiment Analysis with Spark streaming (Twitter Stream) and databricks/spark-coreNLP

I want to share a piece of code that I wrote not long ago, which might be a good base for a nice sentiment analysis with Spark Streaming.

The example analyses a stream of tweets, despite the fact that CoreNLP was trained on movie reviews, which makes the sentiment analysis less accurate.

You can train the algorithm yourself, but that's for another post :)

So - Spark 1.5.2, Scala 2.10, CoreNLP 3.6.0, protobuf-java 2.6.1, Java 1.8

Fortunately, CoreNLP is available as a dependency since Jan 16.

Databricks Spark CoreNLP wrapper - working with CoreNLP 3.6:

The guys from Databricks are using Spark SQL in order to wrap CoreNLP. Interesting.
Take a look at the GitHub page of the project in order to understand a bit more.

For Twitter:
you must log in to Twitter public streams

you need to create your own application and get the id and access tokens.

Now back to spark

libraryDependencies ++= Seq(
"edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
"" % "protobuf-java" % "2.6.1",
"org.apache.spark" %% "spark-core" % "1.5.2" % "provided",
"org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
"org.apache.spark" %% "spark-streaming-twitter" % "1.5.2" % "provided",
"org.apache.spark" % "spark-mllib_2.10" % "1.5.2" % "provided",
"org.twitter4j" % "twitter4j-core" % "3.0.3",
"org.twitter4j" % "twitter4j-stream" % "3.0.3")

Building your Spark with protobuf 2.6.1 (it comes with 2.4 by default) - that was really disappointing. I had to build it myself, waste time and replace the server's jars.
mvn  -Phadoop-2.6 -Dhadoop.version=2.6.0 -Dprotobuf.version=2.6.1  -DskipTests clean package

I copied Databricks' code to my local project. There are 2 files:

And in addition we will create our spark streaming object:


import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, SQLContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter._
import twitter4j.TwitterFactory

and the code:
object TwitterStream {
  def main(args: Array[String]): Unit = {
    val consumerKey = ""
    val consumerSecret = ""
    val accessToken = ""
    val accessTokenSecret = ""

    object auth {
      // Presumably the four credentials above are wired in here (standard twitter4j builder calls)
      val config = new twitter4j.conf.ConfigurationBuilder()
        .setOAuthConsumerKey(consumerKey)
        .setOAuthConsumerSecret(consumerSecret)
        .setOAuthAccessToken(accessToken)
        .setOAuthAccessTokenSecret(accessTokenSecret)
        .build()
    }

    val filters: String = args(0) // comma-separated filters for tweets
    val filter: Seq[String] = filters.split(",")

    val sparkHome = "/root/spark"
    val checkpointDir = "/home/spark/checkpoint/"
    val conf = new SparkConf().setAppName("Tutorial")
    val ssc = new StreamingContext(conf, Seconds(10))
    val twitter_auth = new TwitterFactory(auth.config)
    val a = new twitter4j.auth.OAuthAuthorization(auth.config)
    val atwitter: Option[twitter4j.auth.Authorization] = Some(twitter_auth.getInstance(a).getAuthorization())
    val tweets = TwitterUtils.createStream(ssc, atwitter, filter, StorageLevel.DISK_ONLY_2)
    // Removing non-alphanumeric characters, because the sentiment analysis is not so good with emoticons.
    // Java regexes have no /.../ delimiters, so the pattern is written without the slashes.
    val twText = tweets.map(x => x.getText).map(x => x.replaceAll("[^A-Za-z0-9 ]", ""))
    twText.foreachRDD(rdd => {
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._

      val fields: Array[String] = args(1).split(",") // "text.token" for instance, according to the json schema of coreNLP
      val annots: Array[String] = args(2).split(",") // "tokenize","cleanxml","ssplit" for instance

      // One "parsed.<field>" column per requested field
      val select_fields: List[Column] = fields.map(x => new Column("parsed." + x)).toList

      // Assumption: these setters are the ones exposed by the copied Databricks CoreNLP wrapper
      val coreNLP = new CoreNLP()
        .setInputCol("text")
        .setAnnotators(annots)
        .setOutputCol("parsed")

      val input = rdd.map(k => (1, k)).toDF("id", "text")
      if (input.count() > 0) {
        val parsed = coreNLP.transform(input).select(select_fields: _*)
        // printing results!
        parsed.show()
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
That should work. This is very fun and quite short.
Run the jar with 3 input parameters:
Twitter filter strings, CoreNLP JSON fields, annotator strings
I hope it helps you build a Twitter stream
and do some cool stuff with it. Good luck!
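
One detail that is easy to get wrong is the tweet clean-up step. Java and Scala regexes have no `/.../` delimiters, so slashes inside the pattern are matched literally. Here is a small standalone sketch of the clean-up; `TweetCleaner` is a name made up for this example and is not part of Spark or CoreNLP.

```scala
object TweetCleaner {
  // Everything that is not an ASCII letter, a digit or a space
  private val NonAlphanumeric = "[^A-Za-z0-9 ]".r

  /** Strips punctuation, emoticons and other symbols from a tweet's text. */
  def clean(text: String): String = NonAlphanumeric.replaceAllIn(text, "")
}
```

`TweetCleaner.clean` can be mapped over the stream of tweet texts before handing them to CoreNLP. Note that it leaves the spaces around removed symbols in place, so words never get glued together.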