Tuesday, May 22, 2018

Spark dataframe json schema misinferring - String typed column instead of struct

All you wanted was to load some complex JSON files into a dataframe
and use SQL with the [lateral view explode] function to parse the JSON.

Sounds like the basics of SparkSql.

A problem can arise when one of the inner fields of the JSON
has undesired non-JSON values in some of the records.

For instance, an inner field might contain HTTP error messages in some records, so Spark infers it as a string rather than as a struct.

As a result, our schema would look like:

 |-- headers: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |-- requestBody: string (nullable = true)

Instead of 

 |-- headers: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |-- requestBody: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)

When trying to explode a "string" type, we will get a type mismatch error:

org.apache.spark.sql.AnalysisException: Can't extract value from requestBody#10

How can we remove the non-JSON values and still get the correct schema in the dataframe?

Removing the non-JSON values with a SparkSql string filter on the requestBody column
will clean the data, but it won't change the type of the column, and we still won't be able to use JSON functions on it.

To clean the data and then get the right schema,
we should load the dataset into an RDD, filter out the bad rows, and create a dataframe from the clean RDD:

Good solution: RDD -> Cleansing -> Dataframe (using spark.read.json(cleanRDD))
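The cleansing step itself is just a line filter on the raw text, so it can be sketched outside Spark as well. This is a minimal shell sketch and not the Spark code from this post: it assumes one JSON record per line and simply keeps the records where the requestBody value starts with an opening brace. The function name and the file names in the usage comment are made up for the example, and the regular expression is a heuristic rather than a real JSON parser.

```shell
#!/usr/bin/env bash
# Keep only records whose requestBody value is a JSON object,
# i.e. the first non-blank character after "requestBody": is "{".
# Assumption: one JSON record per line.
keep_struct_request_body() {
  # "|| true" keeps the exit status clean when no line matches
  grep -E '"requestBody"[[:space:]]*:[[:space:]]*\{' || true
}

# Hypothetical usage:
#   keep_struct_request_body < raw.json > clean.json
```

The same predicate is what an RDD filter would apply to each raw line before the clean lines are handed to spark.read.json.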

A bad solution would be to load the data as a dataframe, where the requestBody column is typed as a string, then filter out the bad records and store the dataframe back to disk.

At that point, the value of the string-typed requestBody is written wrapped in quotes (with its inner quotes escaped), so any later attempt to load the "fixed" dataset into a dataframe would again set this column's type to string.
We would then have to reload the "fixed" data into an RDD and strip the wrapping quotes with replace calls:

val fixedRDD=badRDD.map(_.replace("\\\"","\"")).map(_.replace("\"{","{")).map(_.replace("}\"","}"))

From that point we can use spark.read.json to get a valid dataframe and query it with the explode function!

Good luck,
and don't forget that sometimes an RDD is the right way to clean the data before handing it to higher-level APIs such as dataframes.

Saturday, January 27, 2018

SolrCloud - suggestions for leaders rebalancing

This is a simple bash script that helps rebalance the cluster leaders when more than one leader of the same collection is hosted on the same node.

Hosting more than one leader of the same collection on the same node (the leaders of two different shards) is not a good practice, because it doesn't help distribute the collection's write load.

The script suggests the next action -> move the leader of shard X of collection Y from node A to node B. What that actually means is -> add node B to shard X, and remove node A from shard X of collection Y. It then asks you to re-run the script to get the next required action, based on the new Solr cluster status.
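As a rough illustration of what "add node B, remove node A" looks like in practice, here is a small sketch that only prints the two Collections API calls (ADDREPLICA, then DELETEREPLICA) instead of executing them. The function name and all the argument values are placeholders I picked for the example; the replica name is the core node name that appears in the clusterstatus output.

```shell
#!/usr/bin/env bash
# Print (do not execute) the two Collections API calls that move a replica.
# All argument values below are placeholders for illustration.
print_move_calls() {
  local base_url="$1" collection="$2" shard="$3" target_node="$4" old_replica="$5"
  local api="${base_url}/solr/admin/collections"
  # 1) add a replica of the shard on the target node (node B)
  echo "curl -s -k '${api}?action=ADDREPLICA&collection=${collection}&shard=${shard}&node=${target_node}'"
  # 2) once the new replica is active, delete the old replica (on node A)
  echo "curl -s -k '${api}?action=DELETEREPLICA&collection=${collection}&shard=${shard}&replica=${old_replica}'"
}

print_move_calls "https://solr-node01" "collY" "shardX" "solr-node02:8983_solr" "core_node3"
```

Deleting the old replica only triggers a normal leader election among the remaining replicas, so it is worth re-checking the cluster status afterwards to see where the leader actually landed.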

If it doesn't find any collection that has more than 1 leader hosted on the same node, it won't suggest anything.

In Solr 7, new rules for cluster autoscaling and management were added, and they are worth checking as well.

The script contains many useful commands for parsing the "clusterstatus" output.
You might extend it with suggestions for general rebalancing of the cluster, for instance when too many leaders are hosted on a few nodes while other nodes host no leader at all.

It only requires jq to be installed on the machine that runs it.

Example execution: ./solr-suggest.sh "https://solr-node01"

URL="$1"; shift

raw_cluster_status=$(curl -s -k "${URL}/solr/admin/collections?action=CLUSTERSTATUS&wt=json" | \
                jq '.cluster.collections[].shards[].replicas[]')

all_nodes=$(echo "${raw_cluster_status}" | jq '. |  "\(.node_name)" '  | sort | uniq | tr -d "\"" | awk -F ':' '{print $1}')
all_leaders_sorted=$(echo "${raw_cluster_status}" | jq '.  | select(.leader=="true") |  "\(.node_name)" ' | tr -d '"' | sort| uniq -c | sort | tr -s " " | tr " " "-"  | awk -F ':' '{print $1}')
all_leaders_sorted_reveres=$(echo "${raw_cluster_status}" | jq '.  | select(.leader=="true") |  "\(.node_name)" ' | tr -d '"' | sort| uniq -c | sort -r | tr -s " " | tr " " "-" | awk -F ':' '{print $1}')
no_leader_nodes=$(diff <(echo "$all_nodes" | sort ) <(echo "$all_leaders_sorted_reveres" | awk -F '-' '{print $3}' | sort) | tr -d "<" | tr -d " " | sed 's/^...$//g' | grep -v -e '^$' )
leader_host_to_shards=$(echo "${raw_cluster_status}" | jq '. | select(.leader=="true")  |   "\(.core) | \(.node_name)"' | tr -d " " | awk -F '|' '{gsub("\"","",$1); gsub("\"","",$2); split($1,coll,"_replica"); split($2,coll2,":"); print coll[1]"@"coll2[1]}')
all_collections=$(echo "${raw_cluster_status}" | jq '.  |  "\(.core)" ' | sed 's/\"\(.*\)_shard.*/\1/'  | sort | uniq )

# temporary file that collects "count-node" lines, nodes without leaders first
nodes_sorted_tmp=$(mktemp)

for leader in  $no_leader_nodes; do echo "-0-$leader"  >> $nodes_sorted_tmp; done
for leader in  $all_leaders_sorted; do echo $leader  >> $nodes_sorted_tmp; done
all_nodes_sorted=$(cat $nodes_sorted_tmp)

rm $nodes_sorted_tmp 2>/dev/null

echo "-----------------------------"
echo "amount of leaders per node:"
echo "${all_nodes_sorted}" | tr -s "-" " "


for col in $all_collections; do
        collection_leaders=$(echo "${leader_host_to_shards}" | grep $col"_shard" )
        bad_nodes=$(echo "$collection_leaders" |  awk -F '@' '{print $2}' | sort | uniq -c |  grep -v "1 " )

        for bad_node in $(echo "$bad_nodes" | awk '{print $2}'); do
                related_shards=$(echo "$collection_leaders" | grep $bad_node |  awk -F '@' '{split($1,srd   ,"shard"); print "shard"srd[2] }' )
                for shard in $related_shards; do
                        echo $shard
                        shard_nodes=$(echo $raw_cluster_status | jq  '. | select(.core | contains("'${col}'_'${shard}'")) | .node_name')
                        for inode in $(echo "$all_nodes_sorted"); do
                                node=$(echo $inode | awk -F '-' '{print $3}')
                                echo "checking candidate $node"
                                echo "$shard_nodes"
                                #check if candidate node is not part of the shard
                                echo "$shard_nodes" | grep $node > /dev/null
                                if [  $? -eq 1 ]; then
                                        #check that the candidate does not already host a leader of this collection
                                        echo "$collection_leaders" | grep $node > /dev/null
                                        if [  $? -eq 1 ]; then
                                                #the node with the least number of leaders and is not a member of the problematic shard -> replace the current leader with this node
                                                echo "Node $bad_node is hosting more than 1 leader on collection - $col, shard - $shard "
                                                echo "___________________________________________________________________________________________________________"
                                                echo "Move the replica that is hosted on $bad_node to $node, which has the least number of leaders on it."
                                                echo "___________________________________________________________________________________________________________"
                                                echo "Re-run this script again after switching nodes for the replica"
                                                #stop after the first suggestion, the next run works on the fresh cluster status
                                                exit 0
                                        fi
                                fi
                        done
                done
                echo "Couldn't find a replacement host for $bad_node on $col - $shard. Remove this replica and add it so that the leader would move to a different host"
                exit 1
        done
done

echo -e "\nGreat. All nodes are hosting at most one leader of the same collection.\n"

Friday, December 29, 2017

Apache Zeppelin as a Web Querying Interface for Cassandra


An important thing that you are missing when starting to use Cassandra is a decent web tool
for executing CQL queries, and browsing the results.

Googling for "Cassandra web ui" turns up RazorSQL and a few other not-so-promising tools, most of which are not web UIs at all.

As you can tell from the post's title, Apache Zeppelin turned out to be a great solution for our needs.
It has a built-in Cassandra interpreter, which only requires basic connection properties.

Here is a video explaining all about Zeppelin and Cassandra 

We did have one issue when installing Zeppelin in our VPC and accessing it through Nginx and an AWS load balancer:

  • Apache Zeppelin uses a WebSocket for client-server connectivity.
  • It requires special configuration on Nginx: how to configure Apache Zeppelin on Nginx
  • It forced us to use an Application Load Balancer instead of a Classic Load Balancer, which doesn't support WebSockets out of the box.

As a result, we no longer have to ssh into the Cassandra nodes in order to execute CQL queries.

I hope that everyone who is looking for a CQL web UI will find their answer here (or a better answer somewhere else :) ).

Wednesday, December 20, 2017

Cassandra as a Docker

We dockerize our in-house microservices, as well as the 3rd party tools we use.
That includes Apache Solr, Zookeeper, Redis and many more.

The most recent datastore we adopted is Apache Cassandra, version 3.11,
and I would like to share some of the hacks we needed to get our Cassandra cluster dockerized and running.

The following configuration supports a cluster where each node runs on a separate host. To run multiple Cassandra nodes on the same host, such as in development mode, you will need to modify the configuration a bit.

The settings are located in the cassandra.yaml file and are modified as part of the docker start script (entry point).
  • Set the required addresses:
    • listen_address - The address that the Cassandra process binds to. That should be the docker internal IP.
    • broadcast_address - The address that Cassandra publishes to other nodes for inter-node connectivity. That should be the host external IP.
    • broadcast_rpc_address - The address that nodes publish to connected clients. That, like the broadcast_address, should be the external IP.

sed -i 's/listen_address: [0-9a-z].*/listen_address: '${INTERNAL_IP}'/'  conf/cassandra.yaml
sed -i 's/broadcast_address: [0-9a-z].*/broadcast_address: '${EXTERNAL_IP}'/'  conf/cassandra.yaml
sed -i 's/broadcast_rpc_address: [0-9a-z].*/broadcast_rpc_address: '${EXTERNAL_IP}'/'  conf/cassandra.yaml

  • Docker running params 
    • Data Volumes - map the storage location to a permanent storage:
      • -v  /data/cassandra-0:/data
    • Each node has to map its inner Cassandra ports for external connections. Cassandra nodes / clients will look for those ports when accessing the broadcast_address of other nodes.
      • I didn't find a way to change the default 7000 and 9042 ports
      • The ports are:
        • 7000 - Cassandra inter-node cluster communication.
        • 9042 - Cassandra client port.
        • 9160 - Cassandra client port (Thrift).
        • The best would be to play with the ports mapping and see for yourself, which is mandatory and which is not.
      • Bottom line - Add -P  -p 7000:7000 -p 9042:9042 to the docker run command.
  • Seed list
    • This is tricky. Seed hosts are a given list of hosts that a node uses to discover and join the cluster.
    • From Datastax docs:
      • Do not make all nodes seed nodes
      • Note: It is a best practice to have more than one seed node per datacenter.
      • The seed node designation has no purpose other than bootstrapping the gossip process for new nodes joining the cluster.
      • Please read more about Seeds mechanism: https://docs.datastax.com/en/cassandra/3.0/cassandra/architecture/archGossipAbout.html
    • Docker challenges: 
      • Our Cassandra cluster is, in a way, a microservices cluster. Any docker can die and another will start on a different host.
      • How can we avoid a case where a new node contacts a seed node that is about to disappear, and then lives alone in a separate cluster while all the other nodes connected to different seeds?
      • We can't set a fixed list of seed IPs.
      • Solution
        • X Cassandra dockers are deployed with a "seed" tag.
        • The "seed" tag is a hack that we implemented on top of our microservices management system. For example, labels can be used with Kubernetes to make 3 Cassandra pods start on a "seed" labeled node. All other Cassandra pods start without a "seed" label.
        • The seed list is dynamic. When a docker starts up, and before it starts the Cassandra process, it looks for X other nodes that were tagged with a "seed" label. I assume you have a service discovery tool that supports that step.
        • The retrieved seed-labeled IPs are set as the "seeds" param for the starting node.
        • We keep looking for at least X seed nodes in our service discovery endpoint, and do not start the Cassandra process until we get enough seeds.
          sed -i 's/seeds: \"[0-9.].*\"/seeds: \"'${seeds_list}'\"/'  conf/cassandra.yaml  
        • Now we ensure that all nodes use the same IPs as their seed list. Losing all the seed-labeled nodes at the same time is still a problem, and such a case would require a manual fix.
        • Other solutions might select seed nodes according to custom rules that ensure all nodes make a consistent selection. For example - picking the 3 hosts with the highest IPs when sorting the IPs of all Cassandra service hosts (pods in Kubernetes).
  • Setting password and username 
    • We want to avoid any manual intervention. The first node has to run a CQL script in order to change the default credentials.
    • Take that into account and execute that script after launching the Cassandra process, once the CQL terminal is ready.
  • Adding new nodes - The docker start script should also take care of new nodes that are joining an existing cluster.
    • They should remove any dead nodes (otherwise the new node will fail to join)
    • I am sure that you will find some other scenarios when executing disruptive tests.
  • Concerns and Todos - 
    • Depending on your docker version, make sure you are satisfied with the performance. IO is known to get slower with docker.
    • Configure a cross datacenters cluster, where each datacenter has a different service discovery endpoint.
    • Run a cluster on the same node
    • Check Docker network to see if it is helpful for that use case. 
    • Publish our docker files.
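The seed steps from the list above can be sketched in a few lines of shell. This is only a sketch under assumptions and not our actual start script: the helper names are made up, the discovery command is a placeholder for whatever your service discovery tool offers, and the selection rule is the "highest IPs" alternative mentioned above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read IPv4 addresses (one per line) from stdin and
# print the <count> numerically highest ones as a comma-separated list.
pick_seeds() {
  local count="$1"
  sort -u -t . -k1,1n -k2,2n -k3,3n -k4,4n | tail -n "$count" | paste -sd, -
}

# Hypothetical helper: call a discovery command (placeholder) until it
# returns at least <needed> addresses, then print them.
wait_for_seeds() {
  local needed="$1" discover_cmd="$2" found
  while :; do
    found=$($discover_cmd)
    [ "$(printf '%s\n' "$found" | grep -c .)" -ge "$needed" ] && break
    sleep 5   # not enough seeds yet, do not start Cassandra
  done
  printf '%s\n' "$found"
}

# Write the seed list into a cassandra.yaml-style file
# (same substitution as the sed line in the list above, without -i).
set_seeds() {
  local seeds_list="$1" yaml="$2"
  sed 's/seeds: "[0-9.].*"/seeds: "'"${seeds_list}"'"/' "$yaml" > "${yaml}.tmp" \
    && mv "${yaml}.tmp" "$yaml"
}
```

Because every node sorts the same list and takes the same top entries, all nodes end up with an identical seed list as long as they see the same discovery results.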

To conclude, it is possible to run a Dockerized Cassandra cluster. Pay attention to the required params in the cassandra.yaml file, and add any custom behaviour, especially for the seeds selection, to the docker start script.

In the next article I will present the way our Cassandra java client is accessing the cluster. 

Good luck!

Apache Solr Cloud Sanity - 3 simple sanity tests

Having a clear picture of your services status is a fundamental requirement.
Otherwise you are blind to the system. 

I am going to present a general concept of sanity checks and some more specific Solr database sanity checks. 

There are many tools and methods to monitor a running service, whether it is a micro service, a 3rd party database or any other piece of code.

We use Datadog, and Grafana to have the system fully covered and to dive into relevant metrics when needed.

In addition to these two well-known tools, we implement, for each in-house service or 3rd party tool, a custom sanity test whose failure tells us we have a problem.
The sanity test results are collected by a centralised monitoring service and are all displayed in a monitoring dashboard.
The dashboard has many features, as you have probably guessed, such as deep dives into sanity test logs, the history of sanity tests and so on.

Let's take Apache Solr as an example.
In addition to our Solr docker instance, we deploy a sanity script for each Solr instance.

We defined 3 sanity tests - They are all based on Solr cluster status output:
  1. Replica status - If a replica is down or recovering, we mark the hosting solr instance of that replica as problematic. 
  2. Balanced leaders within Collection - If a solr server is hosting more than 1 leader of shards from the same collection, it is also a problem (it might overload that host).
  3. Not too many leaders per host - If a Solr server is hosting more than X leaders, we are marking the host as problematic. 

The test runs every M minutes (15 for the Solr test).
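To make the three tests concrete, here is a minimal sketch of how they could be evaluated. It is not our production sanity script: it assumes the cluster status was already flattened (for example with jq) into one line per replica with the fields collection, shard, node, state and is_leader, and the function name and output labels are invented for the example.

```shell
#!/usr/bin/env bash
# Input on stdin, one replica per line (assumed, pre-flattened format):
#   <collection> <shard> <node> <state> <is_leader>
# $1 is X, the maximum number of leaders allowed per node.
sanity_check() {
  local max_leaders="$1"
  awk -v max="$max_leaders" '
    # test 1: replica is down or recovering -> the hosting node is problematic
    $4 == "down" || $4 == "recovering" { print "replica_problem " $3 }
    # collect leader counts per (node, collection) and per node
    $5 == "true" { per_coll[$3 " " $1]++; per_node[$3]++ }
    END {
      # test 2: more than 1 leader of the same collection on one node
      for (k in per_coll) if (per_coll[k] > 1) { split(k, p, " "); print "unbalanced_leaders " p[1] " " p[2] }
      # test 3: more than X leaders on one node
      for (n in per_node) if (per_node[n] > max) print "too_many_leaders " n
    }' | LC_ALL=C sort
}
```

An empty output means the instance passed all three tests, and any printed line can be reported to the monitoring service as a failure.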

To conclude,
a custom sanity test is a simple and efficient way to be aware of a service's status. Our monitoring procedure starts from the centralised sanity dashboard, and only then goes into Datadog or Grafana.

Thursday, May 5, 2016

Spark ClickStream with Talend

Ofer Habushi, a friend of mine who works as a senior consultant for Talend (an open source data integration software company),
was asked by one of his customers, a major e-commerce company, to present a solution for aggregating huge files of clickstream data on Hadoop (Ofer’s youtube video).

Given a huge clickstream file from a website (>100 GB or even terabytes), aggregate and create session records for all users (userId), grouping all of their clicks that were made within 30 minutes of their previous click on the website.

If the user was not active in the last 30 minutes -> create a new sessionID.
Else -> use the same sessionID.

Sample Input:

26/01/2016 04:48
26/01/2016 04:48
26/01/2016 04:57
26/01/2016 04:57
26/01/2016 04:40
26/01/2016 04:40
26/01/2016 06:57
26/01/2016 06:57

sample Output:

26/01/2016 04:48
26/01/2016 04:57
26/01/2016 04:40
26/01/2016 04:40
26/01/2016 06:57
26/01/2016 06:57

The company, which was already using an early version of Talend, had implemented a solution for this problem using a mixture of three different languages in one process. It was a mixture of Talend, Java, and Hive with a Hive UDF in Python.

With the coming of Talend v6.X, the company was interested in testing the capabilities of the new Talend Spark job feature.
The goal was defined as follows:
  1. Build the same logic to achieve the same results.
  2. Reduce the multi-language complexity (no UDFs)
  3. Make the job as graphical and readable as possible (reduce the code significantly).
  4. Improve or keep the current performance.

In order to solve that challenge, Ofer decided to use Talend, which allows designing Spark jobs. Talend provides a graphical interface that generates, in this case, code based on Spark’s Java API.

Next, we’ll discuss what's needed in Talend and Spark in order to solve the clickstream use case, a use case that leads us to tackle Spark shuffle, partitioning, and Spark secondary sort.

When Ofer told me the story and asked me to review the Talend and Spark solution, I couldn’t tell if the underlying Spark job generated by Talend would be similar to the solution that I would come up with writing plain Spark.

So I started designing the solution with Spark, while asking Ofer, after every step, if Talend supports the method that I am using in that step.
  1. Partition the data by userId, because once all the clicks of a user sit in the same partition we can create the users’ sessions in parallel and with no additional shuffles.
    1. The Spark way - partitionBy(new HashPartitioner(100)) on a key-value rdd
    2. The Talend way - Ofer found that Talend has a “tPartition” component, which is translated to a Spark “partitionBy”. Step 1 is checked!
  2. Sort each partition by userId and by event time (secondary sort). This is essential for understanding where a session starts and ends - we go through the clicks of each user until we find a click that comes at least 30 minutes after the previous click.

An interesting question is how we can keep the data both partitioned and sorted. Think about it - that’s not a simple challenge. When we sort the entire data set, we shuffle in order to get a sorted rdd, which creates new partitions that are different from the partitions we got in step 1. And what if we do the opposite? First sort by creation time and then partition? We will encounter the same problem: the re-partitioning will cause a shuffle and we will lose the sort. So how can we do that?

Partition → Sort - losing the original partitioning
Sort → Partition - losing the original sort

    1. The Spark way to partition and sort is repartitionAndSortWithinPartitions. This is not too simple.
The secondary sort is based on a composite key made of the 2 columns, and on a custom partitioner that partitions only by the first column. This way, all the values of each real key sit within the same partition, and Spark sorts all the elements of the partition by the combined key. With this method, which does both the partitioning and the sorting using a custom partitioner, we don't need step 1 anymore - repartitionAndSortWithinPartitions(partitioner)

The last thing that we need to do is to supply a custom comparator for the combined key, because we can’t expect Spark to know by itself how to sort the combination of a number and a date (the tuple is kind of a new type). repartitionAndSortWithinPartitions is defined by: def repartitionAndSortWithinPartitions(partitioner: Partitioner)
So how do we set the comparator? For that we create a new class that stands as our combined key, together with an implementation of a custom comparator. You can find here a great example for all of that.
    2. The Talend way - I thought that there was no chance that Talend would support all of this headache. Ofer did some research and found it all. As we said, Talend has a tPartition component. Fortunately, it supports sorting together with the partitioning, using the exact same method that I used in Spark (repartitionAndSortWithinPartitions).


In the above picture you can see that Ofer chose a combined partition key and set a custom partitioner. I find Talend easier than plain Spark in this case, because we don’t need to create a dedicated class with a comparator for the secondary sort. Talend understands the type of the combined key.

3. Creating the sessions - for each partition, in parallel, and for each userId, run over all clicks and create a session field.
In Spark it’s a simple fold call with some inner logic. It’s quite the same with Talend, which offers a tJavaRow component for custom Java code that runs after the partitioning + sorting. It would be nice to have a tScalaRow component though (perhaps planned for the next Talend release, v6.2)
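To see the logic of steps 2 and 3 without any cluster, here is a single-machine sketch in shell. It is not the Spark or Talend job: sort plays the role of the secondary sort on the combined (userId, time) key, and a small awk pass plays the role of the fold. The input format (userId followed by the click time in epoch seconds), the function name and the session labels are assumptions made for the example.

```shell
#!/usr/bin/env bash
# Input on stdin, one click per line (assumed format): <userId> <epochSeconds>
# Output: <userId> <epochSeconds> <sessionId>
# $1 is the inactivity gap in seconds (default 1800 = 30 minutes).
sessionize() {
  local gap_seconds="${1:-1800}"
  # secondary sort: first by userId, then by click time
  LC_ALL=C sort -k1,1 -k2,2n | awk -v gap="$gap_seconds" '
    # a new user, or a click at least <gap> seconds after the previous one,
    # starts a new session
    $1 != user || $2 - last >= gap { session++ }
    { print $1, $2, "s" session; user = $1; last = $2 }'
}
```

Because the clicks of each user arrive together and in time order, a single pass with two remembered values (previous user, previous click time) is enough, which is exactly why the partition + sort steps come first.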

To summarize, Spark is a great platform for this case: it can do distributed aggregation on large amounts of clickstream data and create sessions, which requires sorting.
Surprisingly (or not?), the Talend-Spark integration supports all that’s required and, in addition, lets you enjoy the benefits of a mature and rich ETL tool.

Here is a link to Ofer’s youtube video, showing exactly how to implement the clickstream analysis with Spark and Talend.

Wednesday, March 16, 2016

Playing with dbpedia (Semantic Web wikipedia)

Web 3.0, Semantic web, Dbpedia and IOT are all buzz words that deal with the computer's ability to understand the data it has. "Understanding" is the key idea of the semantic web - a concept and a set of technologies and standards that have been "almost there" for more than 20 years, but we are not there yet.

You can find additional info about the semantic web very easily.

I know that lots of effort is invested in it, and Google is quite semantic. The schema.org project is an effort to create one uniform ontology for the web. But, still, we are not there yet.

What I do like to do is play with the semantic representation of wikipedia - dbpedia. You can query dbpedia using SPARQL (the SQL of the semantic web).

That is the link for the sparql endpoint - http://dbpedia.org/sparql

And an example of a cool query:

 SELECT distinct ?p ?ed ?u ?t {  
   ?u a dbo:University.  
   ?p ?ed ?u.  
   ?p dbo:knownFor ?t.  
   ?t dbo:programmingLanguage ?j  
 } limit 5000  

We are looking for people who have any connection to things that are universities (?p ?ed ?u), and who are "known for" something that has a programming language. The results include universities and people who have made some kind of contribution to technologies that we are using.

See how easy it is to generalize or narrow the relations that we are looking for.
Of course, tuning the query and understanding the ontology and SPARQL better might bring much better results.

Run it yourself to see the results.

The results in a graph (I used Plotly for that)
If a university had 2 different "knownFor" values it counts for 2.
And the winner is Massachusetts_Institute_of_Technology!
(Berkeley is #2)
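The counting behind the graph can be sketched with standard shell tools. This is one possible reading of the "counts for 2" rule and not necessarily how the graph was produced: it assumes the query results were exported as CSV with a header row and the columns p, ed, u, t in that order, with no commas inside the values. The function name is made up.

```shell
#!/usr/bin/env bash
# Input on stdin: CSV export of the query results with a header row and
# the columns p,ed,u,t (assumed layout, no quoted commas).
# Output: "<university> <number of distinct knownFor values>", highest first.
count_by_university() {
  tail -n +2 \
    | awk -F, '{ print $3 "," $4 }' \
    | LC_ALL=C sort -u \
    | cut -d, -f1 \
    | LC_ALL=C sort | uniq -c \
    | LC_ALL=C sort -k1,1nr -k2,2 \
    | awk '{ print $2, $1 }'
}
```

Each distinct (university, knownFor) pair is counted once, so two people known for the same thing at the same university do not inflate that university's score.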