Monday, February 15, 2016

Sentiment Analysis with Spark Streaming (Twitter Stream) and databricks/spark-corenlp

Hi,
I want to share a piece of code that I wrote not long ago, which might be a good base for a nice sentiment analysis with Spark Streaming.

The example analyzes a stream of tweets, despite the fact that CoreNLP's sentiment model was trained on movie reviews, which makes the sentiment analysis less accurate on tweets.

You can train the model yourself, but that's for another post :)

So: Spark 1.5.2, Scala 2.10, CoreNLP 3.6.0, protobuf-java 2.6.1, Java 1.8.

Fortunately, CoreNLP has been available as a Maven dependency since January 2016.

Databricks' spark-corenlp wrapper, which works with CoreNLP 3.6:

The folks at Databricks use Spark SQL to wrap CoreNLP. Interesting.
Take a look at the project's GitHub page to understand a bit more.
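To get a feel for the API before wiring it into streaming, here is a minimal sketch that runs the wrapper over a static DataFrame. It assumes an existing SparkContext sc and the copied CoreNLP.scala (see below); the field path and annotators here are illustrative, so check the project's README for the exact JSON schema:

import org.apache.spark.sql.{Column, SQLContext}

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// a tiny static DataFrame with the (id, text) shape used later in this post
val df = sc.parallelize(Seq((1, "This film was a wonderful surprise."))).toDF("id", "text")

val nlp = new CoreNLP()
  .setInputCol("text")
  .setAnnotators(Array("tokenize", "ssplit", "parse", "sentiment"))
  .setFlattenNestedFields(Array("sentence.sentiment"))  // illustrative field path
  .setOutputCol("parsed")

// select the flattened field the same way the streaming code below does
nlp.transform(df).select(new Column("parsed.sentence.sentiment")).show()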

For Twitter:
you must sign up for the Twitter public streams
https://dev.twitter.com/streaming/public

and create your own application to get the consumer key/secret and the access tokens.
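As a side note, twitter4j can also pick the credentials up from JVM system properties, so for a quick local test you can set them like this instead of hard-coding them in the ConfigurationBuilder shown below (the placeholder values are mine):

// twitter4j reads these well-known system property names
System.setProperty("twitter4j.oauth.consumerKey", "YOUR_CONSUMER_KEY")
System.setProperty("twitter4j.oauth.consumerSecret", "YOUR_CONSUMER_SECRET")
System.setProperty("twitter4j.oauth.accessToken", "YOUR_ACCESS_TOKEN")
System.setProperty("twitter4j.oauth.accessTokenSecret", "YOUR_ACCESS_TOKEN_SECRET")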

Now back to Spark.

build.sbt:
       


libraryDependencies ++= Seq(
  "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
  "com.google.protobuf" % "protobuf-java" % "2.6.1",
  "org.apache.spark" %% "spark-core" % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-mllib" % "1.5.2" % "provided",
  "org.twitter4j" % "twitter4j-core" % "3.0.3",
  "org.twitter4j" % "twitter4j-stream" % "3.0.3")



Building your Spark with protobuf 2.6.1 (Spark ships with 2.4 by default): that was really disappointing. I had to build it myself, waste time, and replace the server's jars.
       
mvn  -Phadoop-2.6 -Dhadoop.version=2.6.0 -Dprotobuf.version=2.6.1  -DskipTests clean package


I copied Databricks' code into my local project. There are 2 files:
StanfordCoreNLPWrapper.scala
CoreNLP.scala

In addition, we will create our own Spark Streaming object:

imports:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, SQLContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter._
import twitter4j.auth.Authorization

and the code:
object TwitterStream {
  def main(args: Array[String]): Unit = {
    val consumerKey = ""
    val consumerSecret = ""
    val accessToken = ""
    val accessTokenSecret = ""


    object auth {
      val config = new twitter4j.conf.ConfigurationBuilder()
        .setOAuthConsumerKey(consumerKey)
        .setOAuthConsumerSecret(consumerSecret)
        .setOAuthAccessToken(accessToken)
        .setOAuthAccessTokenSecret(accessTokenSecret)
        .build
    }


    val filters: String = args(0)  // comma-separated keywords for filtering tweets
    val filter: Seq[String] = filters.split(",")

val sparkHome = "/root/spark"
val checkpointDir = "/home/spark/checkpoint/"
val conf = new SparkConf().setAppName("Tutorial")
val ssc = new StreamingContext(conf, Seconds(10))
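    // note: checkpointDir is declared above but never registered with the
    // context; if you need checkpointing, add: ssc.checkpoint(checkpointDir)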
    // build the OAuth authorization once and hand it to the Twitter receiver
    val twitterAuth = new twitter4j.auth.OAuthAuthorization(auth.config)
    val atwitter: Option[Authorization] = Some(twitterAuth)
    val tweets = TwitterUtils.createStream(ssc, atwitter, filter, StorageLevel.DISK_ONLY_2)
    // remove non-alphanumeric characters, because the sentiment analysis
    // doesn't handle emoticons well
    val twText = tweets.map(_.getText).map(_.replaceAll("[^A-Za-z0-9 ]", ""))
    twText.foreachRDD(rdd => {
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._

      val fields: Array[String] = args(1).split(",")  // e.g. "text.token", per the CoreNLP json schema
      val annots: Array[String] = args(2).split(",")  // e.g. "tokenize","cleanxml","ssplit"

      // build the list of output columns to select from the parsed struct
      var select_fields: List[Column] = List()
      fields.foreach { x =>
        val c: Column = new Column("parsed." + x)
        select_fields = c :: select_fields
      }

      val coreNLP = new CoreNLP()
        .setInputCol("text")
        .setAnnotators(annots)
        .setFlattenNestedFields(fields)
        .setOutputCol("parsed")

      // give every tweet a dummy id so the DataFrame has (id, text) columns
      val input = rdd.map(k => (1, k)).toDF("id", "text")
      if (input.count() > 0) {
        val parsed = coreNLP.transform(input)
          .select(select_fields: _*)

        // printing results!
        parsed.foreach(println)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
That should work. This is very fun and quite short.
Run the jar with 3 input parameters:
the Twitter filter strings, the CoreNLP JSON fields, and the annotator strings.
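For example, launching with spark-submit might look like this (a hedged sketch; the jar name, master, and the exact field/annotator strings are placeholders to adapt):

spark-submit --class TwitterStream --master local[4] \
  twitter-sentiment-assembly.jar \
  "spark,bigdata" \
  "sentence.sentiment" \
  "tokenize,ssplit,parse,sentiment"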
I hope this helps you build a Twitter stream,
and do some cool stuff with it. Good luck!
