Monday, February 15, 2016

Sentiment Analysis with Spark streaming (Twitter Stream) and databricks/spark-coreNLP

Hi,
I want to share a piece of code that I wrote not long ago, and that might be a good base for a nice sentiment analysis with Spark Streaming.

The example analyses a stream of tweets, even though CoreNLP's sentiment model was trained on movie reviews, which makes the sentiment analysis less accurate for tweets.

You can train the algorithm by yourself but that's for another post :)

So - Spark 1.5.2, Scala 2.10, CoreNLP 3.6.0, protobuf-java 2.6.1, Java 1.8

Fortunately, CoreNLP has been available as a dependency since Jan 16.

Databricks Spark CoreNLP wrapper - working with CoreNLP 3.6:

The guys from Databricks use Spark SQL to wrap CoreNLP. Interesting.
Take a look at the project's GitHub page to understand a bit more.

For Twitter:
You must sign in to Twitter's public streams:
https://dev.twitter.com/streaming/public

You need to create your own application and get its consumer key and secret and its access token and secret.

Now back to spark

build.sbt:
       


libraryDependencies ++= Seq(
  "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
  "com.google.protobuf" % "protobuf-java" % "2.6.1",
  "org.apache.spark" %% "spark-core" % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.5.2" % "provided",
  "org.apache.spark" % "spark-mllib_2.10" % "1.5.2" % "provided",
  "org.twitter4j" % "twitter4j-core" % "3.0.3",
  "org.twitter4j" % "twitter4j-stream" % "3.0.3")



Building your Spark with protobuf 2.6.1 (it comes with 2.4 by default) - that was really disappointing. I had to build it myself, waste time and replace the server's jars.
       
mvn  -Phadoop-2.6 -Dhadoop.version=2.6.0 -Dprotobuf.version=2.6.1  -DskipTests clean package


I copied the Databricks code to my local project. There are 2 files:
StanfordCoreNLPWrapper.scala
CoreNLP.scala

And in addition we will create our spark streaming object:

imports:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Column
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.streaming.twitter._
import org.apache.spark.{rdd, SparkContext, SparkConf}
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import twitter4j.TwitterFactory
import twitter4j.auth.Authorization

and the code:
object TwitterStream {
  def main(args: Array[String]): Unit = {
    // Twitter application credentials (fill in your own)
    val consumerKey = ""
    val consumerSecret = ""
    val accessToken = ""
    val accessTokenSecret = ""

    object auth {
      val config = new twitter4j.conf.ConfigurationBuilder()
        .setOAuthConsumerKey(consumerKey)
        .setOAuthConsumerSecret(consumerSecret)
        .setOAuthAccessToken(accessToken)
        .setOAuthAccessTokenSecret(accessTokenSecret)
        .build
    }

    val filters: String = args(0) // comma-separated filters for tweets
    val filters_array: Array[String] = filters.split(",")
    val filter: Seq[String] = filters_array

    val sparkHome = "/root/spark"
    val checkpointDir = "/home/spark/checkpoint/"
    val conf = new SparkConf().setAppName("Tutorial")
    val ssc = new StreamingContext(conf, Seconds(10))
    val twitter_auth = new TwitterFactory(auth.config)
    val a = new twitter4j.auth.OAuthAuthorization(auth.config)
    val atwitter: Option[twitter4j.auth.Authorization] = Some(twitter_auth.getInstance(a).getAuthorization())
    val tweets = TwitterUtils.createStream(ssc, atwitter, filter, StorageLevel.DISK_ONLY_2)

    // I am removing non-alphanumeric characters because the sentiment analysis is not so good with emoticons.
    // Java regexes take no /.../ delimiters, so the pattern is the bare character class.
    val twText = tweets.map(x => x.getText).map(x => x.replaceAll("[^A-Za-z0-9 ]", ""))

    twText.foreachRDD(rdd => {
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._

      val fields: Array[String] = args(1).split(",") // "text.token" for instance, according to the JSON schema of CoreNLP
      val annots: Array[String] = args(2).split(",") // "tokenize","cleanxml","ssplit" for instance

      // Build the list of columns to select from the parsed output
      var select_fields: List[Column] = List()
      fields.foreach(x => {
        val c: Column = new Column("parsed." + x)
        select_fields = c :: select_fields
      })

      val coreNLP = new CoreNLP()
        .setInputCol("text")
        .setAnnotators(annots)
        .setFlattenNestedFields(fields)
        .setOutputCol("parsed")

      val input = rdd.map(k => (1, k)).toDF("id", "text")
      if (input.count() > 0) {
        val parsed = coreNLP.transform(input)
          .select(select_fields: _*)

        // printing results!
        parsed.foreach(println)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
That should work. It is a lot of fun and quite short.
Run the jar with 3 input parameters:
Twitter filter strings, CoreNLP JSON fields, annotator strings
I hope it helps you build a Twitter stream
and do some cool stuff with it. Good luck!
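
A note on the cleaning step in the code above: Java's String.replaceAll takes the pattern as a bare regex, without /.../ delimiters. Here is a minimal standalone sketch, assuming the goal is to keep only letters, digits and spaces. The object name CleanTweet and the sample text are made up for illustration.

```scala
object CleanTweet {
  // Keep only ASCII letters, digits and spaces. The pattern is a bare
  // character class: Java/Scala regexes take no /.../ delimiters.
  def clean(text: String): String = text.replaceAll("[^A-Za-z0-9 ]", "")

  def main(args: Array[String]): Unit = {
    val tweet = "Loved it!!! :) #spark @you"

    // Punctuation and symbols are removed, the words and spaces stay.
    println(clean(tweet))

    // With slash delimiters the pattern only matches a literal
    // slash, one symbol, slash sequence, so this tweet is left unchanged.
    println(tweet.replaceAll("/[^A-Za-z0-9 ]/", ""))
  }
}
```

Note that this also strips non-English letters, so it is only a reasonable choice for English tweets.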

Thursday, December 3, 2015

Creating a standalone HiveMetastore (Not in Hadoop cluster)

When benchmarking Presto on top of S3 files, I found out that I had to install a Hive metastore instance.

(Image: a standalone bee)

I didn't need HiveServer, MapReduce, or a Hadoop cluster. So how do you do that?

Here are the steps:


  1. Install the Hive metastore repository - an instance of one of the databases that the Hive metastore works with (MySQL, PostgreSQL, MS SQL Server, Oracle... check the documentation)
  2. Install Java
  3. Download vanilla Hadoop from http://hadoop.apache.org/releases.html and unpack it on the Hive metastore instance (let's say that you unpacked it to /apps/hadoop-2.6.2)
  4. Set environment variables:
    1. export HADOOP_PREFIX=/apps/hadoop-2.6.2
    2. export HADOOP_USER_CLASSPATH_FIRST=true
  5. Download Hive from http://www.apache.org/dyn/closer.cgi/hive/ and unpack it on your instance
  6. Create a schema (user) for the Hive user and build the Hive schema in the Hive metastore repository database using the Hive scripts (a sample script for MySQL):
    1. /apps/apache-hive-1.2.1-bin/scripts/metastore/upgrade/mysql/hive-schema-1.2.0.mysql.sql
  7. Configure hive-site.xml with the right parameters for:
    1. ConnectionURL (jdbc:mysql://localhost:3666/hive for example)
    2. ConnectionDriverName
    3. ConnectionUserName (the created database user)
    4. ConnectionPassword (the created database user's password)
    5. hive.metastore.warehouse.dir - set it to a local path (file:///home/presto/ for example)
  8. Copy the JDBC driver jar required for connecting to the metastore repository into the Hive classpath (ojdbc6 for Oracle, mysql-connector-java for MySQL, and so on)
  9. Start the Hive metastore - /apps/apache-hive-1.2.1-bin/bin/hive --service metastore
  10. For accessing S3:
    1. Copy these jars to the classpath:
      1. aws-java-sdk-1.6.6.jar (http://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk/1.6.6)
      2. hadoop-aws-2.6.0.jar (http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.0/hadoop-aws-2.6.0.jar)
    2. You can specify these parameters in the Hadoop core-site.xml:
      1. fs.s3.awsAccessKeyId
      2. fs.s3.awsSecretAccessKey
      3. <property>
           <name>fs.s3n.impl</name>
           <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
         </property>
    3. For secured access to S3, use the S3A scheme (s3a://) in your URL:
      1. Add fs.s3a.connection.ssl.enabled to hadoop_home/etc/hadoop/core-site.xml
      2. You also need to set these parameters for S3 access in the Hadoop core-site.xml file:
        1. fs.s3a.secret.key
        2. fs.s3a.access.key
      3. Unfortunately, there is currently no support for temporary S3 credentials
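
As a rough sketch of step 7, a hive-site.xml for a MySQL-backed metastore could look like the fragment below. The javax.jdo.option.* names are the standard Hive property names behind the short names in the list. The user name, password and driver class are placeholder assumptions; use the values that fit your own database.

```xml
<configuration>
  <!-- JDBC connection to the metastore repository database -->
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3666/hive</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <!-- assumption: the MySQL Connector/J driver class -->
    <value>com.mysql.jdbc.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <!-- placeholder: the database user created in step 6 -->
    <value>hive</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <!-- placeholder password -->
    <value>change-me</value>
  </property>
  <!-- Local warehouse path, since there is no HDFS here -->
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>file:///home/presto/</value>
  </property>
</configuration>
```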



Finally, when running presto, we will use the thrift address and port of the hive metastore service.
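
For illustration, a minimal Presto catalog file pointing at the metastore might look like this. The host name is a placeholder, and 9083 is assumed because it is the metastore's default thrift port; adjust both to your setup.

```properties
# etc/catalog/hive.properties (sketch; host name is a placeholder)
connector.name=hive-hadoop2
hive.metastore.uri=thrift://metastore-host:9083
```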

If you run EMR from time to time, you can use that external metastore repository, according to the AWS documentation.

That's it. No need for additional Hadoop libraries or settings.
Good luck!

Tuesday, December 1, 2015

Secured ELK (including filebeat)

It is not so difficult to start an ELK topology with an application server that forwards logs to a Logstash server, and a Logstash server that sends the logs in the right schema to Elasticsearch.

Then, it's also quite straightforward to start an Elasticsearch cluster with Kibana for some visualizations.

The problem started for us when we wanted to secure the whole pipeline - from the app server to the client, through Logstash, Elasticsearch and Kibana.

We also got to know Filebeat and we almost immediately fell in love.

I will explain here, step by step, the whole installation and deployment process that we went through. All the configuration files are attached. Enjoy.

General Architecture:

  1. Components:
    1. Filebeat - An evolution of the old forwarder. A light and easy to use tool for sending data from log files to Logstash.
    2. Logstash - A ready to use tool for sending log data to Elasticsearch. It supports many other outputs, inputs and manipulations of its input log records. I will show the integration with Filebeat as input and Elasticsearch and S3 as outputs.
    3. Elasticsearch - Our back-end for storing and indexing the logs
    4. Kibana - The visualization tool on top of Elasticsearch
  2. Filebeat agent installation (talking HTTPS to Logstash)
    1. At the time of the project, the newest version of Filebeat (1.0.0-rc1) had a bug when sending data to Logstash: it automatically creates the "@timestamp" field, which is also created by Logstash, and that makes it fail. We worked with the 1.0.0-beta4 version (the first one) of Filebeat.
    2. Configuration file
      1. Take a look at the "Logstash as output" section.
      2. We want to make Filebeat trust the Logstash server. If you signed the certificate that Logstash is using with some CA, you can give Filebeat that CA's certificate. Configuration - "certificate_authorities"
      3. If you didn't use a signed certificate for the Logstash server, you can use the certificate and the public key of the Logstash server (I'll explain later how to create them), as demonstrated in the configuration file
      4. Keep in mind that we don't use Java keystores for the Filebeat - Logstash connection.
    3. Starting command: filebeat_location/filebeat -v -c filebeat_location/filebeat.yml
  3. Logstash (HTTPS communication to Elasticsearch)
    1. Download and unpack
    2. Generate a certificate:
      1. openssl req -new -text -out server.req
      2. When asked for the common name, enter the server's IP: Common Name (eg, your name or your server's hostname) []:10.184.2.232
      3. openssl rsa -in privkey.pem -out server.key
      4. rm privkey.pem
      5. Sign the req file with the CA certificate
    3. Logstash configuration
      1. The input is configured to get data from Filebeat
      2. In the filter and grok scope:
        1. We are creating the JSON documents out of the "message" field that we get from Filebeat.
        2. Duplicating our "msg" field. We call the copy msg_tokenized - that's important for Elasticsearch later on. On the one hand, we want to search the logs, so we save an analyzed copy (msg_tokenized); on the other hand, we also want to visualize the logs, so we also save "msg" as a not-analyzed field (the whole message). I will explain later how to set an Elasticsearch template for that.
        3. Creating a new field with the file_name of the source log file
        4. Removing the "logtime" field
        5. Removing the "message" field
      3. Output scope:
        1. S3 and Elasticsearch outputs
        2. For SSL between Logstash and Elasticsearch, you can give the CA certificate in the cacert setting under the elasticsearch output.
        3. If we specify a new name for our index in the Logstash elasticsearch output, the index automatically gets created. We decided to create a new index for each log file, and then we created an abstraction layer for all the application logs in Kibana, with queries on top of a couple of indices.
        4. The S3 bucket path is currently not dynamic
  4. Elasticsearch (open for HTTPS only)
    1. Download Elasticsearch, and Shield (shield, license)
    2. Install the 2 plugins for Shield:
      1. bin/plugin install file:///path/to/file/license-2.0.0.zip
      2. bin/plugin install file:///path/to/file/shield-2.0.0.zip
    3. Create a certificate for Elasticsearch
      1. openssl req -new -text -out server.req
      2. openssl rsa -in privkey.pem -out server.key
      3. rm privkey.pem
      4. Sign the req file (organisation CA signing) and save the signed certificate as signed.cer
      5. Create a Java keystore with the signed certificate and the private key (2 steps)
        1. openssl pkcs12 -export -name myservercert -in signed.cer -inkey server.key -out keystore.p12
        2. keytool -importkeystore -destkeystore node01.jks -srckeystore keystore.p12 -srcstoretype pkcs12 -alias myservercert
      6. If you are not using a signing CA, you can use this command in order to create the keystore: keytool -genkey -alias node01_s -keystore node01.jks -keyalg RSA -keysize 2048 -validity 712 -ext san=ip:10.184.2.238
    4. Configuration file
      1. Take a look at the Shield security options. We are using the local Java keystore that holds our signed certificate and the private key
    5. Start Elasticsearch: /apps/elasticsearch-2.0.0/bin/elasticsearch
    6. Add a user: elasticsearch/bin/esusers useradd alog -p alog123 -r admin
    7. Add a template for Elasticsearch - We want all the indices that are created by Logstash to contain both an analyzed field for search (msg_tokenized, analyzed by default) and the "msg" field for visualization. The template makes "msg" not analyzed, so we get one analyzed field and one that is not.
      1. curl -XPUT https://alog:alog123@elastic-server:9200/_template/template_1 -d '
        {
          "template" : "alog-*",
          "mappings" : {
            "_default_" : {
              "properties" : {
                "msg" : {"type" : "string", "index" : "not_analyzed"}
              }
            }
          }
        }'
      2. This template applies to all indices whose names start with "alog-".
  5. Kibana (HTTPS communication to Elasticsearch, users log on via HTTPS)
    1. Download and unpack
    2. Start command: bin/kibana
    3. Configuration
      1. In order to enable HTTPS connections for users, we create another certificate and key pair with the openssl tool and set the certificate and key in the configuration
      2. We set elasticsearch.username and elasticsearch.password for Shield authentication
      3. We didn't supply Kibana with the Elasticsearch certificate and key because we used a signed certificate
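
As a rough illustration only (this is not one of the attached configuration files), a Logstash pipeline along the lines of step 3 could look like the sketch below. It assumes Logstash 2.x option names. The port, file paths and index name are placeholders, the user is the one created in the Elasticsearch step, and the S3 output and the file-name field are left out.

```conf
input {
  beats {
    port => 5044                                  # placeholder port
    ssl => true
    ssl_certificate => "/path/to/server.crt"      # placeholder path
    ssl_key => "/path/to/server.key"              # placeholder path
  }
}

filter {
  # Build the JSON document out of the "message" field sent by Filebeat
  json { source => "message" }

  mutate {
    # Duplicate "msg" so one copy can be analyzed and the other not
    add_field => { "msg_tokenized" => "%{msg}" }
    remove_field => ["logtime", "message"]
  }
}

output {
  elasticsearch {
    hosts => ["elastic-server:9200"]
    ssl => true
    cacert => "/path/to/ca.crt"                   # placeholder path
    user => "alog"
    password => "alog123"
    index => "alog-myapp"                         # placeholder, matches the "alog-*" template
  }
}
```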

And that's it :)
I hope that all of the above helps you get through some of the obstacles that we encountered, whether with the SSL setup, with the grok filter or with anything else.

Good luck!

Sunday, October 18, 2015

Amazon QuickSight

Amazon QuickSight - a new BI platform from amazon (and some related stuff that jumped to my mind while reading about it)


Last year, one of the hottest trends in BI was Elasticsearch + Kibana.
A great database with a great visualization tool on top of it. Pretty close to what Splunk does.

You could hear everywhere things like "you just throw all of your data inside and can query and visualize it in any way you wish".
Managers went after that as if they were hypnotized. It was revolutionary.

But it didn't last long. People started to realize that it is cool and nice to get your data sliced and diced so fast and in such beautiful graphs, but that in many cases it was only nice to have.

Amazon announced QuickSight last month - https://aws.amazon.com/quicksight/

For me it is a super cheap, easy and elegant way to implement that model of Elasticsearch + Kibana, if you are already on AWS.

I don't see that as a revolution, but as another classic case of a platform that gets migrated to the cloud, which is quite a lot, and an enabler for businesses that can't develop this kind of platform by themselves.

I also have a problem with that name. Insights are the result of the data model, relations, and semantics that you give your data. You can throw the best ingredients into the best cooking pot and boil them, but the dish won't get tasty by itself. Some ingredients require special treatment, some only go with others, and some are not for boiling at all, but for baking.

In general, there is no one solution for BI, and that's why platforms such as Hadoop and S3 are so important for business. They are actually a supermarket of data, as my friend Evgeni says. Data analysts can then become chefs and cook their own desired and tasty meal, and Amazon QuickSight is just another recipe.

Still, I believe that customers who are considering moving to AWS would get a lot from QuickSight. It is one of the worthwhile Amazon services to get familiar with, because it is more than just a database. It is a whole solution that customers might really, really like.

And correct me if I am wrong :)

Wednesday, July 1, 2015

Playing with Apache NiFi

I decided to play a little with Apache NiFi.

What is NiFi?

I had some problems with a Flume agent and someone wrote that I should check out NiFi.

I can't say that it replaces Flume, or any other tool.
According to the official website, it is an easy to use, powerful, and reliable system to process and distribute data.

Well, too general for me. You can read more here.

For me, it is a kind of very configurable streaming engine, with some basic common built-in features, together with a nice graphical interface that is very helpful for building and monitoring the data process.


At first look, these guys have thought of almost everything: an events merger before writing to HDFS, success/failure routes, easy building of new custom Java processors, a data provenance interface, the ability to store events on disk for recoverability, and many more configurations and features.

This is what it looks like:

(Image: Updated Merged Flow)

In my case, I only had to replace one Flume agent for a specific data source. Flume is probably the best and simplest tool for ingesting data into HDFS, but it has difficulties in some cases. One of them is ingesting large events (more than 100M).

So, I needed NiFi to get data from IBM MQ. It only supports ActiveMQ right now, so I had to build a new IBM MQ processor.

With good instructions for developing custom processors (here), I managed to add my own GetDataFromIbmMq processor (which is much simpler to use than Flume's JMS source implementation, which requires a binding file). (I hope to upload the source code as soon as possible.)

The new data stream is actually working very well. Large events come out of the IbmMq processor, get merged, get transformed into a sequence file and get written to HDFS. In addition, it is now very easy to send the data anywhere else, or to play with the topology in any way we wish (adding more data sources, more ETL processes and more data stores to save the data in). It is also fantastic that we are able to see how many events went through each processor, how many succeeded and how many failed.


The trend for us right now is storing first on HDFS, which is kind of the opposite of NiFi, which focuses on stream processing. But still, even for the simple use case of getting data, compressing and storing it, it is very easy to use and enables new capabilities of data monitoring and provenance.


Must read posts regarding NiFi:
https://blogs.apache.org/nifi/entry/basic_dataflow_design 

https://blogs.apache.org/nifi/entry/stream_processing_nifi_and_spark


Saturday, June 6, 2015

Oozie Hive2 step from Hue's Oozie's workflow editor


Here is a warm recommendation for moving to the Hive2 step when running Hive from Oozie, and a way to do that even when using Hue's workflow editor.



The developer wanted to schedule a complex Hive query on our CDH 5.3.3 cluster (a join over lateral views over a JSON SerDe table) with the Oozie Hive step. The query worked when run from the regular Hive client. The step failed on Oozie with a log4j permissions exception, before it even started the query's MapReduce job. Not a very indicative error.


After 2 days of debugging (other queries did work) we decided to try the Hive2 step (it had happened before that the basic Hive step didn't work while the Hive2 step did).
I remember that when we upgraded to CDH 5.2, Cloudera wrote that it is recommended to migrate to the Hive2 step (the link), but didn't write anything about specific functionality that won't work with the regular Hive step.

We wrote a test workflow XML with a Hive2 step, ran it with the Oozie CLI, and it worked! That was great, and it made sense, because Hive2 acts just like a regular Hive client that connects to the Hive server. That's the natural thing to do, and I don't understand exactly how the basic Hive step works.

Unfortunately, Hue's Oozie workflow editor doesn't support the Hive2 step. That's probably why many people aren't familiar with this step.

That's too bad, because we didn't want to force the developer to write and maintain the Oozie workflow XML without a convenient GUI or API (who wants to edit XML files?). (There is an API that someone from my organization built, pyoozie, but it supports only the FS, Pig and Hive steps.)

The last resort was using a generic step from Hue. We copied the Hive2 step block from the XML into the generic step's text box in Hue's workflow editor, and it worked! Victory :)

So remember to prefer the Hive2 step, via the generic step, over the classic Oozie Hive step, which is full of bugs and doesn't work right (it doesn't go via HiveServer). In addition, you can try pyoozie, which makes it easier to create Oozie workflows from code.
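
For reference, a Hive2 action block of the kind that can be pasted into the generic step looks roughly like the sketch below. The action name, the JDBC host and port, the script name and the transition targets are placeholders, not the values from our workflow.

```xml
<action name="hive2-query">
    <hive2 xmlns="uri:oozie:hive2-action:0.1">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <!-- placeholder HiveServer2 address -->
        <jdbc-url>jdbc:hive2://hiveserver-host:10000/default</jdbc-url>
        <!-- placeholder script file holding the Hive query -->
        <script>query.hql</script>
    </hive2>
    <ok to="end"/>
    <error to="fail"/>
</action>
```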









Thursday, June 4, 2015

Just for fun - my new Surprising alarm clock app

I can't think of a better place than my own blog to officially announce my new Android application - a Surprising Alarm Clock.



It won't take humanity to a better place, but it is fun and I love it :)

The idea is to take a regular alarm clock and add a surprise feature. By surprise I mean choosing x surprise minutes, so that the alarm goes off somewhere between x minutes before the set time and the set time itself.

It is good for people who don't have an exact time to wake up at, and who find it amusing to discover the actual time that the alarm picked.
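
The surprise rule can be sketched in a few lines of Scala. This is only an illustration of the idea, not the app's actual code; the names SurpriseAlarm and ringTime are made up, and a uniform random choice is an assumption.

```scala
import java.time.LocalTime
import scala.util.Random

object SurpriseAlarm {
  // Pick a ring time between (alarm - surpriseMinutes) and the alarm itself,
  // both ends included, assuming every minute in the window is equally likely.
  def ringTime(alarm: LocalTime, surpriseMinutes: Int, rng: Random): LocalTime = {
    require(surpriseMinutes >= 0, "surpriseMinutes must not be negative")
    val minutesEarly = rng.nextInt(surpriseMinutes + 1) // 0 to surpriseMinutes
    alarm.minusMinutes(minutesEarly.toLong)
  }

  def main(args: Array[String]): Unit = {
    // Alarm set to 07:00 with 20 surprise minutes: rings between 06:40 and 07:00.
    val alarm = LocalTime.of(7, 0)
    println(ringTime(alarm, 20, new Random()))
  }
}
```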

That's the link to the Google Play store



I believe that the feature will definitely become a built-in feature in any alarm clock, just like the Snooze button. In that case, I should start writing the SDK for alarm clock surprise :)