
ELK-MS - ElasticSearch/LogStash/Kibana - Mesos/Spark : a lightweight and efficient alternative to the Hadoop Stack - part II : assessing behaviour

by Jerome Kehrli


Posted on Wednesday Aug 23, 2017 at 11:30PM in Big Data


This article is the second in my series of articles presenting the ELK-MS Stack and test cluster.

ELK-MS stands for ElasticSearch/LogStash/Kibana - Mesos/Spark. The ELK-MS stack is a simple, lightweight, efficient, low-latency and performant alternative to the Hadoop stack, providing state-of-the-art Data Analytics features.

ELK-MS is especially interesting for people who don't want to settle for anything less than the best regarding Big Data Analytics functionality, yet don't want to deploy a full-blown Hadoop distribution, for instance from Cloudera or HortonWorks.
Again, I am not saying that Cloudera's and HortonWorks' Hadoop distributions are not good. Au contraire, they are awesome and really simplify the overwhelming burden of configuring and maintaining the set of software components they provide.
But there is definitely room for something lighter and simpler in terms of deployment and complexity.


The first article in this series - ELK-MS - part I : setup the cluster - presents the ELK-MS stack and how to set up a test cluster using the niceideas ELK-MS package.

This second article - ELK-MS - part II : assessing behaviour - presents a few concerns, assesses the expected behaviour using the niceideas ELK-MS TEST package and discusses the challenges and constraints of this ELK-MS environment.

The conclusions of this series of articles are presented in the third and last article - ELK-MS - part III : so why is it cool? - which presents, as the name suggests, why this ELK-MS stack is really really cool and works great.

This article assumes a basic understanding of Big Data / NoSQL technologies in general by the reader.


1. Introduction

The reader might want to refer to the Introduction of the first article in the series.

Summarizing it, this article is about assessing the behaviour of the ELK-MS Stack using the test cluster introduced in the first article.
Two questions in particular need to be answered:

  1. First, how does data-locality optimization work when using ES-Hadoop to read data from ElasticSearch into Spark? On a large cluster, achieving data locality is the crux of the matter. Before considering the ELK-MS stack as an actual alternative to a more standard Hadoop stack, assessing the soundness of the software stack's behaviour and its respect for data locality is not optional.
  2. Second, how does Mesos schedule Spark executors and how does that impact data-locality? Mesos needs to be an effective alternative to YARN when it comes to dispatching Spark executors while still taking data-locality into account.

These are the two main objectives of the tests for which I am reporting the conclusions hereafter, as well as a few other points.
This article is not about testing Spark or Mesos themselves, it's really about testing how ElasticSearch / Mesos / Spark all work together to support the application architecture from the schema above.

In addition, contrary to the state of the art on Spark, in my current company we are not going to be using Java or Scala to implement the Spark processing logic; we are going to use Python.
The reason for this is simple: our Data Scientists know Python, period. They do not know Java and they are not willing to learn Scala. Our Data Scientists know R and Python, and as such, as Head of R&D, I have made Python our standard language for our Data Analytics algorithms (not that I don't like R, au contraire, but I believe Python sits at the right intersection between Data Science and Engineering).
Choosing Python as the processing language has an impact when it comes to programming Spark: the support for Python is, as a matter of fact, a little behind that of Scala and Java.

Now all of the above gives this article its rationale: programming an ElasticSearch / Mesos / Spark task with Python is something for which very little documentation is available.
In the previous article I wanted to present how to set things up as well as share my setup tools; in this article I want to present how to use the stack and how it behaves, and to share some short sample programs in the form of my tests package.

2. Testing Framework

I would summarize the specificities of the usage of Spark in my current company as follows:

  • Data analytics use cases are implemented as pyspark / Python scripts and not with the native Scala or Java APIs
  • The input data and results are stored in ElasticSearch, not in HDFS
  • Spark runs on Mesos and not the more standard YARN on Hadoop.

So I needed a way to test and assess that all of this works as expected and that the behaviour of the Mesos/Spark stack is sound, both from the perspective of concurrency and of respect for data-locality between ES nodes and Spark nodes.
This is the objective of the niceideas_ELK-MS-TEST framework.

I am presenting this framework, the approach and the tests it contains hereunder.
The test framework is available for download here.

2.1 niceideas ELK-MS TEST

The niceideas ELK-MS TEST package structure, after being properly extracted in a local folder, is as follows:


  • ./vm_execute.sh: this is the script one calls on the host machine to launch a test. The test to be executed should be given as argument.
  • ./tests/*: the test scenario scripts.

Executing a test on the ELK-MS Test cluster is simply done, for instance, with the following command:

badtrash@badbook:/data/niceideas_ELK-MS-TEST$ ./vm_execute.sh scenarii/5_concurrency_1_swissdata_df.sh

This would execute the test 5_1 and show the spark driver logs on the console.

2.2 Test Scenario Script

Each and every test scenario script has the very same structure:

  1. Create an ad hoc shell script taking care of downloading a data set and loading it into ElasticSearch
  2. Execute that Data Loading Shell script
  3. Create an ad hoc Python script taking care of implementing the Spark processing
  4. Execute the Data Processing Python script

For instance, a test scenario X_test_Y_variant.sh would have the following structure:

#!/bin/bash

# 1. Create Data Ingestion script
# -----------------------------------------------------------------------------
cat > X_test_Y_variant_do.sh <<- "EOF"
#!/bin/bash

# echo commands
set -x

# ...
# Various shell commands to proceed with loading the data in ES
# ...

# turn off command echoing
set +x


EOF

# 2. Execute Data Ingestion Script
# -----------------------------------------------------------------------------

bash X_test_Y_variant_do.sh
if [[ $? != 0 ]]; then
    echo "Script execution failed. See previous logs"
    exit -1
fi


# 3. Create pyspark script
# -----------------------------------------------------------------------------
cat > X_test_Y_variant.py <<- "EOF"

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

# Spark configuration 
conf = SparkConf().setAppName("ESTest_X_Y")

# SparkContext and SQLContext
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# ...
# Various spark processing commands
# ...

EOF

# 4. Execute pyspark script
# -----------------------------------------------------------------------------

spark-submit X_test_Y_variant.py
if [[ $? != 0 ]]; then
    echo "Script execution failed. See previous logs"
    exit -1
fi

The key point about these scripts is that they are self-contained and idempotent. They make no assumption about the prior state of the ELK-MS cluster and always start by cleaning up all data before reloading the data required for the tests.

2.3 Used Dataset

All the test scenarii from the niceideas ELK-MS TEST package use one of the following datasets:

The last dataset, related to Swiss AirBnB offers and city information, is required to test the behaviour of joins on Spark.
The other datasets represent different volumes and enable us to test various aspects.

2.4 Test purposes

The tests from the niceideas ELK-MS TEST package are presented in detail with all results in section 5. Details of Tests.
Before presenting the conclusions inferred from these tests in the next section, here is a short summary of the purpose of every family of tests:

  1. Nominal tests - assess how the various Spark APIs are used to read data from ES: the RDD API, the legacy DataFrame API (SQLContext) and the new DataFrame API (SparkSession).
  2. Data-locality tests - assess how data-locality optimization between ES and Spark works and to what extent.
  3. Aggregation tests - how aggregation on ES data works.
  4. Join tests - how joining two data frames coming from ES works.
  5. Concurrency tests - how does Mesos / Spark behave when running several jobs at a time.

Again, section 5. Details of Tests presents each and every test in detail, along with screenshots of Cerebro and the Spark History Server, the logs of the Spark driver, etc.

3. Conclusions from assessment tests

This section already reports the conclusions that can be drawn from the tests executed in the scope of this work.
The conclusions and important information are presented in this early section so that the reader is not required to read through all the individual tests presented in detail in the next section.

3.1 Spark Static Resource Allocation vs. Dynamic Allocation

By default, Spark's scheduler uses a Static Resource Allocation system. This means that, at job (or driver) initialization time, Spark, with the help of Mesos in this case, will decide what resources from the Mesos cluster can be allocated to the job. This decision is static, meaning that once decided, the set of resources allocated to the job will never change during its whole lifetime, regardless of what happens on the cluster (other / additional nodes becoming free, etc.).

There are two noteworthy consequences of that:

  1. First, if a job is submitted to the cluster at a moment when the cluster is completely free, the job will be allocated the whole cluster. If another job comes along even just a few seconds later, it will still need to wait for the cluster to be freed by the first job, and that will happen only when the first job completes.
  2. Second, if several jobs are waiting to be executed, when the cluster is freed, Mesos will allocate the cluster resources evenly to each and every job. Now imagine that all these jobs are short-lived and only one of them is long-lived. At allocation time (static allocation), that long-lived job got only a small portion of the cluster. Even if the cluster becomes free very soon, that job will still need to complete its execution on its small portion, leaving most of the cluster unused.

Now of course Spark provides a solution to this, the Dynamic Allocation System.

But unfortunately, when using Dynamic Allocation, Spark simply doesn't take ES-Hadoop's requirements regarding data locality optimization into consideration anymore.
Spark's Dynamic Allocation messes up data locality completely. It simply doesn't work at all anymore and there's nothing one can do about it (at least nothing that I found).

Without going into details, the problem comes from the fact that ES-Hadoop makes Spark request as many executors as there are shards and indicates the nodes owning the ES shards as preferred locations.
But Dynamic Allocation breaks all of this by allocating executors only one after the other (more or less) and only after monitoring the evolution of the job's processing needs and the number of tasks created. In no way does the dynamic allocation system give any consideration to ES-Hadoop's requirements.
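
For reference, here is a minimal sketch of the configuration typically used to enable dynamic allocation, should one want to reproduce the issue; it assumes the external shuffle service is running on every Mesos agent (the niceideas ELK-MS package provides a script for enabling dynamic allocation), and the exact property set may vary with the Spark version:

from pyspark import SparkConf

conf = SparkConf().setAppName("ESTest_dynamic_allocation")

# Dynamic allocation requires the external shuffle service on every agent
conf.set("spark.shuffle.service.enabled", "true")
conf.set("spark.dynamicAllocation.enabled", "true")

# Optional bounds on the number of executors the job may grow or shrink to
conf.set("spark.dynamicAllocation.minExecutors", "1")
conf.set("spark.dynamicAllocation.maxExecutors", "3")

# Caution: as discussed above, with this enabled the preferred locations
# reported by ES-Hadoop are ignored and data-locality is lost.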

Using static allocation, a solution can be found to the first consequence discussed above: limiting the resources used by a single Spark Job. But I found no solution to the second problem for now.
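
As an illustration of that first point, here is a sketch of the kind of setting that caps what a single job can book under static allocation: spark.cores.max limits the total number of cores a job grabs across the cluster (the value 4 below is just a placeholder for this small test cluster):

from pyspark import SparkConf

conf = SparkConf().setAppName("ESTest_capped_job")

# Cap the total number of cores this job may book across the Mesos cluster,
# leaving room for jobs submitted shortly afterwards
conf.set("spark.cores.max", "4")

# Memory per executor can be capped as well
conf.set("spark.executor.memory", "1g")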

3.2 ES-Hadoop and Data-locality enforcement

Data locality

Data locality is how close data is to the code processing it. There are several levels of locality based on the data’s current location. In order from closest to farthest:

  • PROCESS_LOCAL data is in the same JVM as the running code. This is the best locality possible
  • NODE_LOCAL data is on the same node. Examples might be in HDFS on the same node, or in another executor on the same node. This is a little slower than PROCESS_LOCAL because the data has to travel between processes
  • NO_PREF data is accessed equally quickly from anywhere and has no locality preference
  • RACK_LOCAL data is on the same rack of servers. Data is on a different server on the same rack so needs to be sent over the network, typically through a single switch
  • ANY data is elsewhere on the network and not in the same rack

Spark prefers to schedule all tasks at the best locality level, but this is not always possible. In situations where there is no unprocessed data on any idle executor, Spark switches to lower locality levels. There are two options: a) wait until a busy CPU frees up to start a task on data on the same server, or b) immediately start a new task in a farther away place that requires moving data there.

What Spark typically does is wait a bit in the hopes that a busy CPU frees up. Once that timeout expires, it starts moving the data from far away to the free CPU.
The setting indicating how long it should wait before moving the processing elsewhere is spark.locality.wait=10s.
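
For illustration, this wait can be tuned per job in the Spark configuration; a small sketch, using the 10s value mentioned above:

from pyspark import SparkConf

conf = SparkConf().setAppName("ESTest_locality_wait")

# How long Spark waits for an executor on the preferred (data-local) node
# to become free before falling back to a less local one
conf.set("spark.locality.wait", "10s")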

ES-Hadoop

Data-locality enforcement works amazingly well under nominal conditions. ES-Hadoop makes the Spark scheduler understand the topology of the shards on the ES cluster and Spark dispatches the processing accordingly. Mesos doesn't interfere in this regard.

But again, it works only under nominal conditions.
As indicated above, there can be several factors compromising Data-locality:

  1. First, imagine that at resource allocation time the Mesos cluster is heavily loaded. Spark will wait for spark.locality.wait=10s trying to get the processing executed on the node where ES stored the target data shard.
    But if the node doesn't become free within that period, Spark will move the processing elsewhere.
  2. The second case is no longer related to Spark, but to ElasticSearch. Imagine that at the very moment the Spark executor submits the request to ES (through the ES-Hadoop connector), the co-located ES node is busy doing something else (answering another request, indexing some data, etc.).
    In this case, ES will delegate answering the request to another node and data-locality is broken.

3.3 Spark coarse grained scheduling by Mesos vs. Fine Grained

In Coarse Grained scheduling mode, the default, Mesos considers Spark only at the scale of the required Spark executor processes. All Mesos knows about Spark is the executor processes and the nodes they are running on. Mesos knows nothing of Spark's job internals such as stages and tasks.
In addition, static allocation (which we're forced to stick to, see above) makes Mesos' job pretty easy: allocate as many resources from the cluster as are available to the Spark executors of pending jobs. This has the consequences listed above in 3.1 Spark Static Resource Allocation vs. Dynamic Allocation.

Historically, Spark on Mesos can benefit from a Fine Grained scheduling mode instead, where Mesos schedules not only Spark executors on nodes in a rough fashion, but really each and every individual Spark task.
Regarding data-locality optimization, this doesn't seem to have any impact.
Regarding performance on the other hand, the Fine Grained scheduling mode really ruins performance completely.

The thing is that Mesos requires quite some time to negotiate with the resource providers. If that negotiation happens for every individual Spark task, a huge amount of time is lost and eventually the impact on performance is not acceptable.

For this reason (and others), the Fine Grained scheduling mode is deprecated: https://issues.apache.org/jira/browse/SPARK-11857
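
For completeness, the scheduling mode is chosen through a single property; a small sketch (coarse-grained being the default, this mostly makes the choice explicit):

from pyspark import SparkConf

conf = SparkConf().setAppName("ESTest_coarse_grained")

# Coarse-grained mode (the default): Mesos offers resources for long-lived
# executors and Spark schedules its own tasks inside them.
# Setting this to "false" would select the deprecated fine-grained mode.
conf.set("spark.mesos.coarse", "true")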

3.4 Latency regarding Python instantiation

Executing tasks in Python takes time in comparison to executing tasks natively in Java or Scala. The problem is that Spark tasks in Python require launching the individual task processing in a separate process from the Spark JVM. Only Java and Scala Spark processing runs natively in the Spark JVM.

This problem is not necessarily a big deal since the DataFrame or RDD APIs exposed to Python pyspark scripts are actually implemented by Scala code underneath; they resolve to native Scala code.
There is one notable exception in this regard: UDFs (User Defined Functions) implemented in Python. While this is perfectly possible, it should be avoided at all costs.
One can very well still use pyspark but write UDFs in Scala, as sketched below.
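
A hedged sketch of that pattern: the UDF itself lives in a jar compiled from Scala or Java (the ch.niceideas.udf.UpperCase class and jar path below are hypothetical, the class being expected to implement org.apache.spark.sql.api.java.UDF1), is shipped via spark.jars, and is then registered and called from pyspark; registerJavaFunction should be available on SQLContext from Spark 2.1 onwards:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import StringType

conf = SparkConf().setAppName("ESTest_scala_udf")
# Ship the jar containing the compiled UDF class (hypothetical path)
conf.set("spark.jars", "/vagrant/niceideas-udfs.jar")

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Register the JVM UDF under a SQL-callable name
sqlContext.registerJavaFunction("to_upper", "ch.niceideas.udf.UpperCase", StringType())

# Read from ES as in the previous tests and call the UDF from SQL:
# the UDF now runs inside the executor JVMs, not in a separate Python process
es_df = sqlContext.read \
            .format("org.elasticsearch.spark.sql") \
            .load("bank")
es_df.createOrReplaceTempView("bank")

result_df = sqlContext.sql("SELECT to_upper(lastname) AS name FROM bank")
print(result_df.take(10))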

3.5 Other ES-Hadoop concerns

Repartitioning

I couldn't find a way to make repartitioning work the way I want, meaning redistributing the data across the cluster in order to scale out the subsequent workload.
I am not saying there is no way, just that I haven't found one so far.

As such, a sound approach regarding initial sharding in ES should be adopted. One should take into consideration that, a priori, initial sharding may well drive the way Spark will be able to scale the processing out on the cluster.
While creating one shard per node in the cluster by default would definitely be overkill, the general idea should tend in this direction.
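
For illustration, a quick sketch of fixing the shard count at index creation time, here by calling the ES REST API from Python with the requests library (index name, node address and the choice of 3 shards are placeholders matching the 3-node test cluster; the index must not already exist):

import json
import requests

# One primary shard per data node on the 3-node test cluster, plus replicas:
# this gives Spark up to 3 data-local partitions to spread the processing on.
settings = {
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
    }
}

resp = requests.put("http://192.168.10.10:9200/bank",
                    data=json.dumps(settings),
                    headers={"Content-Type": "application/json"})
print(resp.status_code, resp.json())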

ES level aggregations

It's simply impossible to forge a query from Spark to ElasticSearch through ES-Hadoop that would make ElasticSearch compute aggregations and return them instead of the raw data.
Such advanced querying features are not available from Spark.

The need is well identified but it remains Work in Progress at the moment: https://github.com/elastic/elasticsearch-hadoop/issues/276.

3.6 Other concerns

Spark History Server

Running Spark on Mesos, there is no long-lived Spark process. Spark executors are created by Mesos when required, and the Mesos master and slave processes are the only long-lived processes on the cluster in this regard.

As such, the Spark Application UI (on ports 4040, 4041, etc.) only lives for the duration of the Spark processing. When the job is finished, the Spark Application UI vanishes.

For this reason, Spark provides a History Server. The installation and operation of the History Server are presented in the first article of this series: ELK-MS - part I : setup the cluster.

Interestingly, the History Server supports the same JSON / REST API as the usual Spark Console, with only very few limitations.
For instance, one can use the REST API to discover the Application-ID of running jobs in order to kill them (whenever required). For this, simply list the applications and find those that have "endTimeEpoch" : -1, meaning the application is still alive:

curl -XGET http://192.168.10.10:18080/api/v1/applications
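
The same lookup can easily be scripted; a small sketch using the requests library (History Server address as above, field names as returned by the API):

import requests

# List applications known to the History Server and keep those still running,
# i.e. whose latest attempt reports "endTimeEpoch" : -1
apps = requests.get("http://192.168.10.10:18080/api/v1/applications").json()

running = [app["id"] for app in apps
           if any(attempt.get("endTimeEpoch", -1) == -1
                  for attempt in app.get("attempts", []))]

print("Running applications: %s" % running)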

Limitations of the ELK-MS stack

As stated in the previous article, ElasticSearch is not a distributed filesystem, it's a document-oriented NoSQL database.

There are situations where a distributed filesystem provides interesting possibilities. Those are not provided by the ELK-MS stack as is. It would be interesting to test Ceph on Mesos for this. See http://tracker.ceph.com/projects/ceph/wiki/Ceph-mesos.

4. Further work

I am still considering some next steps on the topic of ELK-MS stack testing since there are still a few things I would like to test or assess:

In rough form:

  • Find out how to set the maximum number of nodes booked by Mesos for a single Spark job in order to avoid fully booking the cluster.
  • ElasticSearch on Mesos
    • This seems quite obvious. I expect the overall cluster performance to be way better if Mesos and ES don't compete with each other for hardware resources on nodes.
    • There are workarounds of course, such as configuring Mesos to avoid using all the CPUs of a node. But that will never be as efficient as letting Mesos distribute the global workload.
  • Find a way for repartitioning to work the way I intend it: data should get redistributed across the cluster!
  • Give Spark Streaming a try to reduce latency.
  • Try a little more to make Spark dynamic allocation and ES-Hadoop work well together (found no way for now).
    • This means finding a way to have dynamic allocation take ES-Hadoop's requirements regarding data locality optimization into consideration.
  • Try FAIR Spark scheduler and play with it.
    • I got satisfying results using Spark's FIFO scheduler in terms of concurrency and haven't seen the need to change to FAIR.
    • It really seems Mesos takes care of everything and I really do not see what the FAIR scheduler could change, but I want to be sure.
    • There is some chance that this will make me rewrite this whole article ... in another article.
  • Ceph integration on Mesos for binary files processing.
  • What about HDFS on Mesos ?
    • I would want to give it a try even though I am really rather considering Ceph for the use cases that ElasticSearch prevents me from addressing.
    • The thing is that Ceph integrates much better into the unified UNIX filesystem than HDFS does.
    • There is an approach to reach the same level of integration with HDFS, based on FUSE: https://wiki.apache.org/hadoop/MountableHDFS. But that is still limited (it doesn't support ownership information for now).

5. Details of Tests

This very big section now presents each and every test in detail, along with the results in the form of the logs of the scripts (data feeding and Spark driver logs) and the screenshots of the UI applications (Cerebro, Mesos Console, Spark History Server).

The conclusions from the individual tests have been reported in the global 3. Conclusions from assessment tests section above.

5.1 Nominal Tests

Nominal tests - assess how the various Spark APIs are used to read data from ES: the RDD API, the legacy DataFrame API (SQLContext) and the new DataFrame API (SparkSession).

5.1.1 Legacy RDD API on bank dataset

Test details

Relevant portion of spark Script

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

# Spark configuration 
conf = SparkConf().setAppName("ESTest_1_1")

# SparkContext and SQLContext
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Simplest possible query
q = "?q=*"

es_read_conf = {
    "es.resource" : "bank",
    "es.query" : q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf)

es_df = sqlContext.createDataFrame(es_rdd)

# I need to collect the result to show them on the console
data_list = es_df.collect()

print ("Printing 10 first results")
for x in data_list[0:10]:
    print x

# Print count : THIS IS FUNNY : 
# it relaunches the whole Distributed Data Frame Processing
print ("Fetched %s accounts (re-computed)") % es_df.count()

# Print count 
print ("Fetched %s accounts (from collected list)") % len (data_list)

Results

Test 1-1 / Dataset in ES
Test 1-1 / Job Completion in mesos
Test 1-1 / Process Overview on Spark
Test 1-1 / Job 0 / Stage 0

Conclusions

  • Spark can read data from ElasticSearch using the ES-Hadoop connector and the RDD API really out of the box.
    One just needs to configure a few settings to the newAPIHadoopRDD API:
    • inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat"
    • keyClass="org.apache.hadoop.io.NullWritable"
    • valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable"
  • Mesos spreads the workload on the cluster efficiently.
    • This test was run alone on the cluster
    • As such, the 3 nodes of the cluster are available
    • Mesos creates a dedicated spark executor on each node
    • Spark then successfully distributes the RDD across the 3 executors
  • Data-locality optimization works out of the box.
    • There are 5 shards in ElasticSearch, which, with replicas, are well spread on the cluster
    • Mesos / Spark dispatches the workload efficiently since it creates 5 RDD partitions for the 5 shards, each and every one of them respecting data locality (NODE_LOCAL), and as such respecting the requirements given by the ES-Hadoop connector:
      17/08/16 06:32:32 INFO TaskSetManager: Starting task 1.0 in stage 2.0 \
          (TID 2, 192.168.10.10, executor 1, partition 1, NODE_LOCAL, 42302 bytes)
      17/08/16 06:32:32 INFO TaskSetManager: Starting task 0.0 in stage 2.0 \
          (TID 3, 192.168.10.11, executor 2, partition 0, NODE_LOCAL, 42302 bytes)
      17/08/16 06:32:32 INFO TaskSetManager: Starting task 3.0 in stage 2.0 \
          (TID 4, 192.168.10.12, executor 0, partition 3, NODE_LOCAL, 42302 bytes)
      17/08/16 06:32:32 INFO TaskSetManager: Starting task 2.0 in stage 2.0 \
          (TID 5, 192.168.10.10, executor 1, partition 2, NODE_LOCAL, 42302 bytes)
      17/08/16 06:32:32 INFO TaskSetManager: Starting task 4.0 in stage 2.0 \
          (TID 6, 192.168.10.12, executor 0, partition 4, NODE_LOCAL, 42302 bytes)
      

5.1.2 Legacy DataFrame API on bank dataset

Test details

Relevant portion of spark Script

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

# Spark configuration 
conf = SparkConf().setAppName("ESTest_1_2")

## !!! Caution : this is pre 2.0 API !!! 

# SparkContext and SQLContext
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

es_df = sqlContext.read \
            .format("org.elasticsearch.spark.sql") \
            .options(pushdown=True) \
            .load("bank") \
            .where("gender='F'")

# I need to collect the result to show them on the console
data_list = es_df.collect()

print ("Printing 10 first results")
for x in data_list[0:10]:
    print x

# Print count : THIS IS FUNNY : 
# it relaunches the whole Distributed Data Frame Processing
print ("Fetched %s women accounts (re-computed)") % es_df.count()

# Print count 
print ("Fetched %s women accounts (from collected list)") % len (data_list)

Results

Test 1-2 / Dataset in ES
Test 1-2 / Job Completion in mesos
Test 1-2 / Process Overview on Spark
Test 1-2 / Job 0 / Stage 0

Conclusions

  • Spark can read data from ElasticSearch using the ES-Hadoop connector and the Legacy DataFrame API (SQLContext) really out of the box.
    The single configuration required is format("org.elasticsearch.spark.sql") on the SQLContext API
  • Here as well, the test being executed alone on the cluster, every node is available from Mesos' perspective. As such, Mesos creates a Spark Executor on every node of the cluster to distribute the workload.
  • In this case, as seen on result_1_2_job_0_stage_0.png, spark successfully respects data locality as well.

5.1.3 DataFrame API on bank dataset

Test details

Relevant portion of spark Script

from pyspark.conf import SparkConf
from pyspark.sql import SQLContext, SparkSession

# Spark configuration 
conf = SparkConf().setAppName("ESTest_1_3")

# Spark SQL Session 
ss = SparkSession.builder \
        .config(conf=conf) \
        .getOrCreate()

es_df = ss.read \
            .format("org.elasticsearch.spark.sql") \
            .options(pushdown=True) \
            .load("bank") \
            .where("gender='F'")

# I need to collect the result to show them on the console
data_list = es_df.collect()

print ("Printing 10 first results")
for x in data_list[0:10]:
    print x

# Print count : THIS IS FUNNY : 
# it relaunches the whole Distributed Data Frame Processing
print ("Fetched %s women accounts (re-computed)") % es_df.count()

# Print count 
print ("Fetched %s women accounts (from collected list)") % len (data_list)

Results

Test 1-3 / Dataset in ES
Test 1-3 / Job Completion in mesos
Test 1-3 / Process Overview on Spark
Test 1-3 / Job 0 / Stage 0

Conclusions

  • Spark can read data from ElasticSearch using the ES-Hadoop connector and the new DataFrame API (SparkSession) really out of the box.
    The single configuration required here as well is format("org.elasticsearch.spark.sql") on the SparkSession API
  • Nothing specific to report regarding the other aspects: Mesos distributes the workload as expected, still creates a dedicated Spark Executor for the job on every node of the cluster, Spark strictly respects data locality, etc.

5.1.4 DataFrame API on Apache-logs dataset

Test details

Relevant portion of spark Script

from pyspark.conf import SparkConf
from pyspark.sql import SQLContext, SparkSession

# Spark configuration 
conf = SparkConf().setAppName("ESTest_1_4")

# es.read.field.exclude (default empty) : 
#  Fields/properties that are discarded when reading the documents 
#  from Elasticsearch
conf.set ("es.read.field.exclude", "relatedContent")

# es.read.field.as.array.include (default empty) : 
#  Fields/properties that should be considered as arrays/lists
conf.set ("es.read.field.as.array.include", "@tags,headings,links")

# Spark SQL Session 
ss = SparkSession.builder \
        .config(conf=conf) \
        .getOrCreate()

# Query configuration only (cannot pass any ES conf here :-( )
es_query_conf= { 
    "pushdown": True
}

es_df = ss.read \
            .format("org.elasticsearch.spark.sql") \
            .options(conf=es_query_conf) \
            .load("apache-logs-*")


# I need to collect the result to show them on the console
data_list = es_df.collect()

print ("Printing 10 first results")
for x in data_list[0:10]:
    print x

# Print count : THIS IS FUNNY : 
# it relaunches the whole Distributed Data Frame Processing
print ("Fetched %s logs (re-computed)") % es_df.count()

# Print count 
print ("Fetched %s logs (from collected list)") % len (data_list)

Results

Test 1-4 / Dataset in ES
Test 1-4 / Job Completion in mesos
Test 1-4 / Process Overview on Spark
Test 1-4 / Job 0 / Stage 0

Conclusions

N.D (nothing to declare). Everything works as expected (see the previous test results from the 1 Nominal Tests family)

5.1.5 DataFrame API on Shakespeare dataset

Test details

Relevant portion of spark Script

from pyspark.conf import SparkConf
from pyspark.sql import SQLContext, SparkSession

# Spark configuration 
conf = SparkConf().setAppName("ESTest_1_5")

# Spark SQL Session 
ss = SparkSession.builder \
        .config(conf=conf) \
        .getOrCreate()

# Query configuration only (cannot pass any ES conf here :-( )
es_query_conf= { 
    "pushdown": True
}

es_df = ss.read \
            .format("org.elasticsearch.spark.sql") \
            .options(conf=es_query_conf) \
            .load("shakespeare*")

# Collect result to the driver
data_list = es_df.collect()

print ("Printing 10 first results")
for x in data_list[0:10]:
    print x

# Print count : THIS IS FUNNY : 
# it relaunches the whole Distributed Data Frame Processing
print ("Fetched %s logs (re-computed)") % es_df.count()

# Print count 
print ("Fetched %s logs (from collected list)") % len (data_list)

Results

Test 1-5 / Dataset in ES
Test 1-5 / Job Completion in mesos
Test 1-5 / Process Overview on Spark
Test 1-5 / Job 0 / Stage 0

Conclusions

N.D (nothing to declare). Everything works as expected (see the previous test results from the 1 Nominal Tests family)

5.2 Data-locality tests

Data-locality tests - assess how data-locality optimization between ES and Spark works and to what extent.

5.2.1 Bank dataset with 1 shard

Test details

Expected Behaviour


Relevant portion of spark Script

from pyspark.conf import SparkConf
from pyspark.sql import SQLContext, SparkSession

# Spark configuration 
conf = SparkConf().setAppName("ESTest_2_1")

# Spark SQL Session 
ss = SparkSession.builder \
        .config(conf=conf) \
        .getOrCreate()

# (1)
es_df = ss.read \
            .format("org.elasticsearch.spark.sql") \
            .options(pushdown=True) \
            .load("bank") \
            .where("gender='F'")

# (2) Print size of every partition on nodes
def f(iterable):
    # need to convert iterable to list to have len()
    print("Fetched % rows on node") % len(list(iterable)) 
es_df.foreachPartition(f)

# (3) I need to collect the result to show them on the console
data_list = es_df.collect()

print ("Printing 10 first results")
for x in data_list[0:10]:
    print x

# Print count 
print ("Fetched %s women accounts (from collected list)") % len (data_list)

Results

Test 2-1 / Dataset in ES
Test 2-1 / Job Completion in mesos
Test 2-1 / Process Overview on Spark
Test 2-1 / Job 0 / Stage 0

Conclusions

  • In terms of workload distribution, what Mesos does by default (as it seems) here is not optimal. Since the cluster is fully available, Mesos books it all for the Job to come. But eventually 2 of the 3 spark executors won't be used at all.
    That is not a big deal since this test runs alone. But if more jobs are added to the cluster and request executors, they will have to wait for that first job to finish before they can share the cluster among them.
    This is assessed in 5.5 Concurrency test.
  • Data-locality works as expected. The single shard is located on 192.168.10.12, and both the driver logs and the Spark Console for Job 0 / Stage 0 confirm that the co-located Spark executor has been the only one processing the data.

5.2.2 Bank dataset with 2 shards

Test details

Expected Behaviour


Relevant portion of spark Script

from pyspark.conf import SparkConf
from pyspark.sql import SQLContext, SparkSession

# Spark configuration 
conf = SparkConf().setAppName("ESTest_2_2")

# Spark SQL Session 
ss = SparkSession.builder \
        .config(conf=conf) \
        .getOrCreate()

# (1)
es_df = ss.read \
            .format("org.elasticsearch.spark.sql") \
            .options(pushdown=True) \
            .load("bank") \
            .where("gender='F'")

# (2) Print size of every partition on nodes
def f(iterable):
    # need to convert iterable to list to have len()
    print("Fetched % rows on node") % len(list(iterable)) 
es_df.foreachPartition(f)

# (3) I need to collect the result to show them on the console
data_list = es_df.collect()

print ("Printing 10 first results")
for x in data_list[0:10]:
    print x

# Print count 
print ("Fetched %s women accounts (from collected list)") % len (data_list)

Results

Test 2-2 / Dataset in ES
Test 2-2 / Job Completion in mesos
Test 2-2 / Process Overview on Spark
Test 2-2 / Job 0 / Stage 0

Conclusions

  • Same remark as above regarding workload distribution. Mesos books the 3 nodes for the job. But eventually only 2 of the 4 Spark executors will be used to respect data-locality.
  • Data locality works as expected. The two shards are on 192.168.10.10 and 192.168.10.12, and both the driver logs and the Spark Console for Job 0 / Stage 0 confirm that both co-located Spark executors have been used to process the 2 shards.
    The tasks have been executed with the NODE_LOCAL locality level.

5.2.3 Bank dataset with 3 shards

Test details

Expected Behaviour


Relevant portion of spark Script

from pyspark.conf import SparkConf
from pyspark.sql import SQLContext, SparkSession

# Spark configuration 
conf = SparkConf().setAppName("ESTest_2_3")

# Spark SQL Session 
ss = SparkSession.builder \
        .config(conf=conf) \
        .getOrCreate()

# (1)
es_df = ss.read \
            .format("org.elasticsearch.spark.sql") \
            .options(pushdown=True) \
            .load("bank") \
            .where("gender='F'")

# (2) Print size of every partition on nodes
def f(iterable):
    # need to convert iterable to list to have len()
    print("Fetched % rows on node") % len(list(iterable)) 
es_df.foreachPartition(f)

# (3) I need to collect the result to show them on the console
data_list = es_df.collect()

print ("Printing 10 first results")
for x in data_list[0:10]:
    print x

# Print count 
print ("Fetched %s women accounts (from collected list)") % len (data_list)

Results

Test 2-3 / Dataset in ES
Test 2-3 / Job Completion in mesos
Test 2-3 / Process Overview on Spark
Test 2-3 / Job 0 / Stage 0

Conclusions

  • Interestingly, this time the 3 nodes booked by Mesos for Spark executors will actually be used. But Mesos doesn't know this beforehand.
  • Data locality works as expected. The 3 Spark executors consume data from their co-located shards, as can be seen in the driver logs and in the Spark Application UI.

5.2.4 Bank dataset with 1 shard and replicas

Test details

Expected Behaviour


Relevant portion of spark Script

from pyspark.conf import SparkConf
from pyspark.sql import SQLContext, SparkSession

# Spark configuration 
conf = SparkConf().setAppName("ESTest_2_4")

# FIXME GET RID OF THESE TESTS
# Trying some ES settings
# conf.set ("spark.es.input.max.docs.per.partition", 100)
# That doesn't really help => it split the dataframe on several nodes indeed 
# but it doesn't impact the fetching

# Spark SQL Session 
ss = SparkSession.builder \
        .config(conf=conf) \
        .getOrCreate()

# (1)
es_df = ss.read \
            .format("org.elasticsearch.spark.sql") \
            .options(pushdown=True) \
            .load("bank") \
            .where("gender='F'")

# (2) Print size of every partition on nodes
def f(iterable):
    # need to convert iterable to list to have len()
    print("Fetched % rows on node") % len(list(iterable)) 
es_df.foreachPartition(f)

# (3) I need to collect the result to show them on the console
data_list = es_df.collect()

print ("Printing 10 first results")
for x in data_list[0:10]:
    print x

# Print count 
print ("Fetched %s women accounts (from collected list)") % len (data_list)

Results

Test 2-4 / Dataset in ES
Test 2-4 / Job Completion in mesos
Test 2-4 / Process Overview on Spark
Test 2-4 / Job 0 / Stage 0

Conclusions

  • In this case, it's really as if each and every ES node of the cluster had a copy of the data. ElasticSearch makes no distinction between primary shards and replica shards when it comes to serving requests.
  • Which node will finally execute the processing is really random. Over several executions I always ended up with a different Spark node executing the whole processing. Data locality still kicks in, and a single node still does the whole processing every time.
  • Important note: All of the above holds under nominal conditions. On a heavily loaded cluster, the results can be significantly different.
    By running different scenarii under different conditions, I have been able to identify 2 different situations in addition to the nominal one (the one on a free cluster):
    • First, it can happen that Mesos tries to distribute a specific Spark processing part to the Spark executor co-located with the ES shard.
      But then, when the Spark processing finally queries that local node to get the shard, it can well happen that this ES node is busy answering a different request from a different client application.
      In this case, that local ES node will report itself as busy and will ask another node from the ES cluster to serve the request.
      So even though Mesos / Spark initially distributed the workload to the node local to the shard in ES, eventually the request will be served by another, distant node of the cluster.
    • Second, it can also happen that all the Spark nodes co-located with the ES nodes owning the shards (primary and replicas) are busy.
      In this case, Mesos / Spark will only wait a few seconds expecting one of these nodes to become free, and if that fails to happen, eventually a different Mesos node will run the processing, regardless of data locality.
  • The difference here is that the existence of replicas suddenly gives ElasticSearch a choice:
    ElasticSearch can answer and serve the data from a node other than the local one if the local node happens to be busy!
  • In addition, Mesos / Spark will only wait spark.locality.wait=10s trying to make the specific processing part local to an ES node owning a shard (or a replica). If none of these nodes (owning one of the primary shards or replicas) becomes free and available within that amount of time, then Mesos will distribute the workload to another available node from the Mesos cluster.

5.2.5 Testing repartitioning

Test details

Expected Behaviour


Relevant portion of spark Script

from pyspark.conf import SparkConf
from pyspark.sql import SQLContext, SparkSession

# Spark configuration 
conf = SparkConf().setAppName("ESTest_2_5")

# Spark SQL Session 
ss = SparkSession.builder \
        .config(conf=conf) \
        .getOrCreate()

# (1)
es_df = ss.read \
            .format("org.elasticsearch.spark.sql") \
            .options(pushdown=True) \
            .load("bank") \
            .where("gender='F'") 

# Print size of every partition on nodes
def f(iterable):
    # need to convert iterable to list to have len()
    print("A - % rows stored on node") % len(list(iterable)) 
es_df.foreachPartition(f)

# Doesn't help
#es_df2 = es_df.coalesce(1) 
## Print size of every partition on nodes
#es_df2.foreachPartition(f)

# (2)
es_df3 = es_df.repartition(4 * 3) 
# Print size of every partition on nodes
es_df3.foreachPartition(f)

# (3) I need to collect the result to show them on the console
data_list = es_df3.collect()
print ("Printing 10 first results")
for x in data_list[0:10]:
    print x

# Print count 
print ("Fetched %s women accounts (from collected list)") % len (data_list)

# Print
print (ss._jsc.sc().getExecutorMemoryStatus().size())

Results

Test 2-5 / Dataset in ES
Test 2-5 / Job Completion in mesos
Test 2-5 / Process Overview on Spark
Test 2-5 / Job 0 / Stage 0

Conclusions

  • I haven't been able to make repartitioning work the way I intended it to work.
    • Eventually all of my tests led to the underlying RDD being repartitioned, but all the partitions remain local to the initially owning node
    • I never managed to find a way to make the Spark cluster redistribute the different partitions to the various Spark executors from the cluster
  • I don't know if that comes from Spark somehow knowing that it doesn't need to do that for the post-processing to be done efficiently.
  • Long story short, I have no real conclusions in this regard, which is why the above schema is crossed out with an X.
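
For anyone wanting to double-check this behaviour, here is a small diagnostic sketch that reports, for every partition, the executor host actually holding it (hostnames and row counts end up in the driver output once collected); it can be run before and after the repartition() call:

import socket

def partition_report(index, iterator):
    # Runs on the executors: report (partition index, host name, row count)
    rows = list(iterator)
    yield (index, socket.gethostname(), len(rows))

# Compare partition placement before and after repartitioning
print(es_df.rdd.mapPartitionsWithIndex(partition_report).collect())
print(es_df3.rdd.mapPartitionsWithIndex(partition_report).collect())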

5.3 Aggregation tests

Aggregation tests - assess how aggregation on ES data works.

5.3.1 ES-side Aggregations

Test details

Expected Behaviour


Relevant portion of spark Script

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

# Spark configuration 
conf = SparkConf().setAppName("ESTest_3_1")

# SparkContext and SQLContext
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# -> query dsl
es_aggregations_query = '''
{ 
    "query" : { "match_all": {} },
    "size" : 0,
    "aggregations" : { 
        "play_name": {
            "terms": {
                "field" : "play_name"
            }
        }
    }
}
'''

es_read_conf = {
    "es.resource" : "shakespeare",
    "es.endpoint" : "_search",
    "es.query" : es_aggregations_query
}

# (1)
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf)

es_df = sqlContext.createDataFrame(es_rdd)

# I need to collect the result 
data_list = es_df.collect()

print ("Printing 10 first results")
for x in data_list[0:10]:
    print x

# Print count 
print ("Fetched %s rows (from collected list)") % len (data_list)

Results

Test 3-1 / Dataset in ES
Test 3-1 / Job Completion in mesos
Test 3-1 / Process Overview on Spark
Test 3-1 / Job 0 / Stage 0

Conclusions

  • There is simply no way at the moment to submit specific requests, such as aggregation requests, from spark to ElasticSearch using the ES-Hadoop connector.
  • The need is well identified but the solution is still work in progress: https://github.com/elastic/elasticsearch-hadoop/issues/276
  • Since it's impossible to make this work as expected, the schematic above is crossed out with an X.
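
If only the aggregated result is needed on the driver (and not a distributed data frame), a pragmatic fallback, sketched here with the requests library, is to send the aggregation query straight to the ElasticSearch REST API, outside of ES-Hadoop:

import requests

# Send the same aggregation query as above directly to ElasticSearch
resp = requests.post("http://192.168.10.10:9200/shakespeare/_search",
                     data=es_aggregations_query,
                     headers={"Content-Type": "application/json"})

# Iterate over the terms aggregation buckets returned by ES
for bucket in resp.json()["aggregations"]["play_name"]["buckets"]:
    print("%s : %s lines" % (bucket["key"], bucket["doc_count"]))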

5.3.2 Spark-side Aggregations

Test details

Expected Behaviour


Relevant portion of spark Script

from pyspark.conf import SparkConf
from pyspark.sql import SQLContext, SparkSession

# Spark configuration 
conf = SparkConf().setAppName("ESTest_3_2")

# Every time there is a shuffle, Spark needs to decide how many partitions 
# the shuffle RDD will have. 
# 2 times the number of CPUs on the cluster is a good value (default is 200) 
conf.set("spark.sql.shuffle.partitions", "12")

# Spark SQL Session 
ss = SparkSession.builder \
        .config(conf=conf) \
        .getOrCreate()

# Query configuration only (cannot pass any ES conf here :-( )
es_query_conf= { 
    "pushdown": True
}

# (1)
es_df = ss.read \
            .format("org.elasticsearch.spark.sql") \
            .options(conf=es_query_conf) \
            .load("shakespeare*")

# (2) Compute aggregates : I want the count of lines per book
agg_df = es_df.groupBy(es_df.play_name).count()

# (3) Collect result to the driver
data_list = agg_df.collect()

print ("Printing 10 first results")
for x in data_list[0:10]:
    print x

# Print count 
print ("Fetched %s rows (from collected list)") % len (data_list)

Results

Test 3-2 / Dataset in ES
Test 3-2 / Job Completion in mesos
Test 3-2 / Process Overview on Spark
Test 3-2 / Job 0 / Stage 0

Conclusions

  • There aren't a lot of things to conclude here. We can just mention that everything works as expected and refer the reader to the Data Flow schematic above.
  • Data locality kicks in, etc.

5.4 Join test

Test details

Expected Behaviour


Relevant portion of spark Script

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
import pyspark.sql.functions as F

# Spark configuration 
# all these options can be given to the command line to spark-submit
# (they would need to be prefixed by "spark.")
conf = SparkConf().setAppName("ESTest_4_1")

# Every time there is a shuffle, Spark needs to decide how many partitions 
# the shuffle RDD will have. 
# 2 times the number of CPUs on the cluster is a good value (default is 200) 
conf.set("spark.sql.shuffle.partitions", "12")

# Spark SQL Session 
ss = SparkSession.builder \
        .config(conf=conf) \
        .getOrCreate()


# Query configuration only (cannot pass any ES conf here :-( )
es_query_conf= { 
    "pushdown": True
}

# (1).1 Read city and population
citypop_df = ss.read \
            .format("org.elasticsearch.spark.sql") \
            .options(conf=es_query_conf) \
            .load("swiss-citypop") \
            .alias("citypop_df")

# (1).2. Read airbnb offers
airbnb_df = ss.read \
            .format("org.elasticsearch.spark.sql") \
            .options(conf=es_query_conf) \
            .load("swiss-airbnb") \
            .alias("airbnb_df")

# (2) Join on city
joint_df = airbnb_df \
            .join( \
                  citypop_df, \
                  (F.lower(airbnb_df.city) == F.lower(citypop_df.accent_city)), \
                  "left_outer" \
                 ) \
            .select( \
                    'room_id', 'airbnb_df.country', 'airbnb_df.city', \
                    'room_type', 'bedrooms', 'bathrooms', 'price', 'reviews', \
                    'overall_satisfaction', \
                    'airbnb_df.latitude', 'airbnb_df.longitude', \
                    'citypop_df.latitude', 'citypop_df.longitude', 'population', \
                    'region' \
                   )

# (3) Collect result to the driver
data_list = joint_df.collect()

print ("Printing 10 first results")
for x in data_list[0:10]:
    print x

# Print count 
print ("Computed %s positions (from collected list)") % len (data_list)

Results

Test 4-1 / Dataset in ES
Test 4-1 / Job Completion in mesos
Test 4-1 / Process Overview on Spark
Test 4-1 / Job 0 / Stage 0

Test 4-1 / Job 0 / Stage 1

Conclusions

  • Here as well there aren't a lot of things to conclude. Everything works just as expected.
  • Data locality kicks in both on the ES data fetching side and on the Spark side for the join itself.
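
One variation worth noting: since the city / population dataset is much smaller than the AirBnB offers, the join can be hinted as a broadcast join so that the large side is never shuffled; a sketch using pyspark's broadcast function (the data frames are the ones defined in the script above):

import pyspark.sql.functions as F

# Broadcast the small dimension-like data frame to every executor so that
# the large airbnb_df side does not need to be shuffled for the join
joint_df = airbnb_df.join(
    F.broadcast(citypop_df),
    F.lower(airbnb_df.city) == F.lower(citypop_df.accent_city),
    "left_outer")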

5.5 Concurrency test

Test details

The Spark Script

The concurrency test simply executes, four times in parallel, a scenario inspired from the 5.4 Join test above, as follows:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
import pyspark.sql.functions as F

# Spark configuration 
conf = SparkConf()

# Every time there is a shuffle, Spark needs to decide how many partitions 
# the shuffle RDD will have. 
# 2 times the number of CPUs on the cluster is a good value (default is 200) 
conf.set("spark.sql.shuffle.partitions", "12")

# Spark SQL Session 
ss = SparkSession.builder \
        .config(conf=conf) \
        .getOrCreate()

# Query configuration only (cannot pass any ES conf here :-( )
es_query_conf= { 
    "pushdown": True
}

# 1. Read city and population
citypop_df = ss.read \
            .format("org.elasticsearch.spark.sql") \
            .options(conf=es_query_conf) \
            .load("swiss-citypop") \
            .alias("citypop_df")

# 2. Read airbnb offers
airbnb_df = ss.read \
            .format("org.elasticsearch.spark.sql") \
            .options(conf=es_query_conf) \
            .load("swiss-airbnb") \
            .alias("airbnb_df")

# 3. Join on city
joint_df = airbnb_df \
            .join( \
                  citypop_df, \
                  (F.lower(airbnb_df.city) == F.lower(citypop_df.accent_city)), \
                  "left_outer" \
                 ) \
            .select( \
                    'room_id', 'airbnb_df.country', 'airbnb_df.city', \
                    'room_type', 'bedrooms', 'bathrooms', 'price', 'reviews', \
                    'overall_satisfaction', \
                    'airbnb_df.latitude', 'airbnb_df.longitude', \
                    'citypop_df.latitude', 'citypop_df.longitude', 'population', \
                    'region' \
                   )

# Collect result to the driver
data_list = joint_df.collect()

print ("Printing 10 first results")
for x in data_list[0:10]:
    print x

# Print count 
print ("Computed %s positions (from collected list)") % len (data_list)

Results

First the mesos console showing the completion of the 4 jobs:

Test 5-1 / P1 / Job Completion in mesos
Test 5-1 / P2 / Job Completion in mesos
Test 5-1 / P3 / Job Completion in mesos
Test 5-1 / P4 / Job Completion in mesos

Overview of the 4 processes in the Spark console, with the specific view of each process:

Test 5-1 / P1 / Process Overview on Spark
Test 5-1 / P2 / Process Overview on Spark
Test 5-1 / P3 / Process Overview on Spark
Test 5-1 / P4 / Process Overview on Spark

Focusing on Job 1 (P1), all the relevant views from the Spark Application UI:

Test 5-1 / P1 / Job 0
Test 5-1 / P1 / Job 0 / Stage 0
Test 5-1 / P1 / Job 0 / Stage 1
Test 5-1 / P1 / Job 0 / Stage 2

Focusing on Job 2 (P2), all the relevant views from the Spark Application UI:

Test 5-1 / P2 / Job 0
Test 5-1 / P2 / Job 0 / Stage 0
Test 5-1 / P2 / Job 0 / Stage 1
Test 5-1 / P2 / Job 0 / Stage 2

Focusing on Job 3 (P3), all the relevant views from the Spark Application UI:

Test 5-1 / P3 / Job 0
Test 5-1 / P3 / Job 0 / Stage 0
Test 5-1 / P3 / Job 0 / Stage 1
Test 5-1 / P3 / Job 0 / Stage 2

Focusing on Job 4 (P4), all the relevant views from the Spark Application UI:

Test 5-1 / P4 / Job 0
Test 5-1 / P4 / Job 0 / Stage 0
Test 5-1 / P4 / Job 0 / Stage 1
Test 5-1 / P4 / Job 0 / Stage 2

Conclusions

  • Before everything else let's mention that this test has been executed, first, using the FIFO scheduler (spark.scheduler.mode=FIFO) and second, using the static allocation system
    • I haven't tried the FAIR scheduler yet since the behaviour suits my needs but I will do that soon (and report results in a third article).
      The thing is that I do not see what the FAIR scheduler could do any differently. See below.
    • Using the Static Allocation System, it really seems that Mesos / Spark decides at initialization time what resources from the cluster the Spark job will get and this can never change later, regardless of what happens on the cluster.
      Of course Spark supports a Dynamic Allocation System. But unfortunately, the Dynamic Allocation System is fundamentally incompatible with ES-Hadoop's requirements regarding data locality.
      I did try pretty much all the tests discussed in this article using dynamic allocation and data-locality optimization is simply ruined, and there's nothing one can do about it.
      The niceideas ELK-MS package provides a script to enable dynamic allocation and the user can give it a try.
  • Let's describe what happens on the Mesos cluster when these 4 jobs are launched at the same time:
    1. First, it takes a dozen seconds for the 4 drivers to initialize locally. As a matter of fact, the 4 drivers run on the master node 192.168.10.10, and on a single VM it takes quite some time to initialize and run the 4 drivers.
    2. Then the first process whose driver is initialized properly will ask Mesos for resources to run its Spark executors. At that very moment, Mesos knows nothing about the 3 other processes still to come since their respective drivers are still being initialized. As such, Mesos will allocate the whole cluster to that single first process.
    3. The first process is executed on the whole cluster, which it uses alone, while the 3 other drivers finish initializing. Their processes need to wait for the first process to complete since it uses the whole cluster alone.
    4. Eventually the first process completes. At that moment Mesos knows that 3 processes are expecting resources on the cluster to be executed. Mesos will evenly distribute the cluster resources to each of the 3 processes.
      Since we have 3 nodes, every process gets a node of its own.
    5. Unsurprisingly, the 3 other processes complete at almost the same time since they really do the same thing (the same job is started 4 times by this test).
  • In terms of concurrency, we can see on the following image that the cluster is used quite effectively, looking at the CPU consumption on the host machine:


    Concurrency test - view of CPU (Note: each of the 3 VMs can use up to 2 CPUs of the host, which has 4 CPUs in total)
  • Also, all my tests, including this one, have been executed using Coarse Grained Scheduling Mode (spark.mesos.coarse=true).
    • One might think that using Fine Grained Mode, things would be more efficient since each and every task would be distributed on the cluster at will and we wouldn't end up in the static topology described above.
    • But unfortunately, Mesos' latency when it comes to negotiating resources really hurts performance. The dynamic dispatching of tasks works well, but the overall process performance is ruined by the time Mesos requires for negotiation.
      In the end, Fine Grained Scheduling mode drags the performance of the whole cluster down.
    • I have executed this very same test using spark.mesos.coarse=false and the drop in cluster usage efficiency can be seen by looking at the CPU consumption on the host machine for test 5 - 1 using Fine Grained Mode.
  • Regarding data locality, since the 3 last processes each get a single node of the cluster, only one third of the tasks will execute with locality level NODE_LOCAL. Two thirds of them will have to fetch data over the network.

6. References

Spark and mesos

ES Hadoop doc

Pyspark.sql doc

Spark Doc

Other Pyspark specificities


