Monday, February 15, 2016

Sentiment Analysis with Spark streaming (Twitter Stream) and databricks/spark-coreNLP

Hi,
I want to share a piece of code that I wrote not long ago, which might be a good base for a nice sentiment analysis with Spark Streaming.

The example analyzes a stream of tweets. Note that CoreNLP's sentiment model was trained on movie reviews, which makes the sentiment analysis on tweets less accurate.

You can train the algorithm by yourself but that's for another post :)

So: Spark 1.5.2, Scala 2.10, CoreNLP 3.6.0, protobuf-java 2.6.1, Java 1.8.

Fortunately, CoreNLP has been available as a Maven dependency since January 2016.

Databricks' spark-corenlp wrapper works with CoreNLP 3.6:

The folks at Databricks use Spark SQL to wrap CoreNLP. Interesting.
Take a look at the project's GitHub page to understand a bit more.

For Twitter:
You must sign up for Twitter's public streams:
https://dev.twitter.com/streaming/public

You need to create your own application and get the consumer key/secret and access tokens.
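As an aside, twitter4j can also pick up OAuth credentials from system properties, which keeps keys out of the code. A minimal sketch (the property names are the standard twitter4j ones; the values here are placeholders for your own keys):

```scala
object TwitterCreds {
  def main(args: Array[String]): Unit = {
    // standard twitter4j OAuth property names; values are placeholders
    System.setProperty("twitter4j.oauth.consumerKey", "YOUR_CONSUMER_KEY")
    System.setProperty("twitter4j.oauth.consumerSecret", "YOUR_CONSUMER_SECRET")
    System.setProperty("twitter4j.oauth.accessToken", "YOUR_ACCESS_TOKEN")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "YOUR_ACCESS_TOKEN_SECRET")
    // twitter4j reads these properties when no explicit Configuration is passed
    println(System.getProperty("twitter4j.oauth.consumerKey"))
  }
}
```

With this approach, `TwitterUtils.createStream(ssc, None)` can be called without building a `Configuration` by hand.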

Now back to Spark.

build.sbt:
       


libraryDependencies ++= Seq(
  "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
  "com.google.protobuf" % "protobuf-java" % "2.6.1",
  "org.apache.spark" %% "spark-core" % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-mllib" % "1.5.2" % "provided",
  "org.twitter4j" % "twitter4j-core" % "3.0.3",
  "org.twitter4j" % "twitter4j-stream" % "3.0.3")
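Note that stanford-corenlp, protobuf-java, and the twitter4j artifacts are not marked "provided", so they have to ship with your application jar. One common option (an assumption about your build setup, not something from the original configuration) is the sbt-assembly plugin:

```scala
// project/plugins.sbt -- sbt-assembly packages the non-"provided"
// dependencies into a single fat jar for spark-submit
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")
```

Then `sbt assembly` produces the fat jar under `target/scala-2.10/`.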



Building your Spark with protobuf 2.6.1 (Spark comes with 2.4 by default): that was really disappointing. I had to build Spark myself, waste time, and replace the jars on the servers.
       
mvn  -Phadoop-2.6 -Dhadoop.version=2.6.0 -Dprotobuf.version=2.6.1  -DskipTests clean package


I copied Databrick's code to my local project. There are 2 files:
StanfordCoreNLPWrapper.scala
CoreNLP.scala

And in addition we will create our spark streaming object:

imports:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, SQLContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter._
import org.apache.spark.{SparkConf, SparkContext}
import twitter4j.TwitterFactory
import twitter4j.auth.Authorization

and the code:
object TwitterStream {   
 def main( args:Array[String] ):Unit = {
      val consumerKey =""
      val consumerSecret =""
      val accessToken =""
      val accessTokenSecret =""


     object auth{
       val config = new twitter4j.conf.ConfigurationBuilder()
        .setOAuthConsumerKey(consumerKey)
        .setOAuthConsumerSecret(consumerSecret)
        .setOAuthAccessToken(accessToken)
        .setOAuthAccessTokenSecret(accessTokenSecret)
        .build
    }


val filters: String = args(0)  // comma-separated filters for tweets
val filters_array: Array[String] = filters.split(",")
val filter: Seq[String] = filters_array

val sparkHome = "/root/spark"
val checkpointDir = "/home/spark/checkpoint/"
val conf = new SparkConf().setAppName("Tutorial")
val ssc = new StreamingContext(conf, Seconds(10))
val twitter_auth = new TwitterFactory(auth.config)
val a = new twitter4j.auth.OAuthAuthorization(auth.config)
val atwitter : Option[twitter4j.auth.Authorization] =  Some(twitter_auth.getInstance(a).getAuthorization())
val tweets = TwitterUtils.createStream(ssc, atwitter, filter, StorageLevel.DISK_ONLY_2)
val twText = tweets.map(x => x.getText).map(x => x.replaceAll("[^A-Za-z0-9 ]", ""))
// removing non-alphanumeric characters, because the sentiment analysis does not handle emoticons well
twText.foreachRDD(rdd=> {
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  val fields:Array[String] = args(1).split(",")   //"text.token" for instance, according to the json schema of coreNLP
  val annots:Array[String]=args(2).split(",")  //"tokenize","cleanxml","ssplit" for instance

  // build the list of output columns to select, e.g. "parsed.text.token"
  val select_fields: Seq[Column] = fields.map(f => new Column("parsed." + f))

  val coreNLP = new CoreNLP()
    .setInputCol("text")
    .setAnnotators(annots)
    .setFlattenNestedFields(fields)
    .setOutputCol("parsed")

  val input = rdd.map(k=>(1,k)).toDF("id", "text")
  if(input.count()>0) {

    val parsed = coreNLP.transform(input)
      .select(select_fields:_*)

//printing results!
    parsed.foreach(println)
               }
  })
      ssc.start()
    ssc.awaitTermination()
    }
}
That should work. This is fun and quite short.
Run the jar with three input parameters:
the Twitter filter strings, the CoreNLP JSON fields, and the annotator strings.
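For reference, a spark-submit invocation might look something like this; the jar name, master URL, and the chosen filters, fields, and annotators are placeholders for your own setup:

```shell
# args: (1) tweet filters, (2) CoreNLP JSON fields, (3) annotators
spark-submit \
  --class TwitterStream \
  --master spark://your-master:7077 \
  target/scala-2.10/twitter-sentiment-assembly-1.0.jar \
  "spark,bigdata" \
  "text.token" \
  "tokenize,ssplit"
```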
I hope this helps you build a Twitter stream
and do some cool stuff with it. Good luck!
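As an aside, the character stripping used in the stream can be sanity-checked with plain Scala, outside Spark (the sample tweet text here is made up):

```scala
object CleanDemo {
  def main(args: Array[String]): Unit = {
    val tweet = "I love #Spark 1.5.2 :)"
    // keep only letters, digits and spaces, as in the streaming job
    val cleaned = tweet.replaceAll("[^A-Za-z0-9 ]", "")
    println(cleaned)
  }
}
```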

Comments:

  1. Helpful but incomplete example. Could you please help me with the input files for reference?

    Also, I am unable to locate CoreNLP.scala from the link given. Hope this is the same for everyone.
