Running Apache Spark on bare Mesos

Apache Spark is a cluster computing framework. What does that mean? It means that an application running on Spark, e.g. a MapReduce-style job, can be broken down into many small tasks that are scheduled and executed across many machines. This concept of scheduling and running distributed tasks maps nicely onto the design of Mesos: you have tasks that are scheduled by some scheduler (e.g. the Spark driver) and executed by some worker (a Spark executor).

In order to run those drivers and executors, a Spark program needs to talk to a cluster manager to ask for resources. Spark supports several cluster managers: its own Standalone cluster, YARN, and finally Mesos. In fact, Spark started out as a side project to validate the idea of Mesos's two-level scheduling. This means the Spark application (or, to be exact, the Spark driver) is itself a framework running on top of Mesos with its own custom scheduling implementation.

The flow of a Spark job on Mesos is:

  1. The user runs spark-submit --master mesos://zk://...../mesos.
  2. A driver is launched. By default the driver runs on the same machine (typically a Mesos slave) from which we run spark-submit. You can also make it run as a Mesos task on any slave with --deploy-mode cluster, as shown in the sketch after this list. For long-running jobs, it is a good idea to run spark-submit as a Marathon job.
  3. The Mesos Master detects our new Spark driver framework and sends resource offers to it.
  4. Upon accepting a resource offer, the Spark driver asks the Mesos Master to launch one or more executors on the slaves, based on what has been offered. The command to run is spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend, along with the driver location and the CPU and memory constraints. Each executor is launched inside a Mesos container.
  5. The executor backend registers itself with the driver and starts exchanging RPC requests such as launch/stop/kill task. It keeps running until it is told to shut down by the driver or becomes disconnected from it.
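
For step 2's cluster mode, the driver itself runs as a Mesos task and is submitted through Spark's MesosClusterDispatcher. A minimal sketch (the dispatcher host and the exact ZooKeeper ensemble below are assumptions for illustration):

# Start the dispatcher once somewhere in the cluster (e.g. as a Marathon app).
# It registers with Mesos as a framework and exposes its own submission endpoint.
./sbin/start-mesos-dispatcher.sh \
  --master mesos://zk://zookeeper-001:2181,zookeeper-002:2181,zookeeper-003:2181/mesos

# Submit against the dispatcher (default port 7077) instead of the Mesos master;
# the driver is then launched on one of the slaves as a Mesos task.
spark-submit \
  --master mesos://dispatcher-001:7077 \
  --deploy-mode cluster \
  --supervise \
  --class <MAIN_CLASS> \
  <PATH_TO_JAR_FILE> [command line arguments]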

There are a few things to note if you want to run Spark on Mesos:

  • Spark needs to be able to find the Mesos native library (by default Spark looks for /usr/local/lib/libmesos.so). The easiest way is to install Spark on every Mesos slave; see the sketch after this list.
  • The Spark binaries (spark-shell, spark-submit, spark-class) all read spark-defaults.conf, so you can put common configuration there.
  • Long-running jobs like Spark Streaming can be run under Marathon. This effectively achieves the same failover as submitting the Spark job with --deploy-mode cluster --supervise: Marathon detects a failure of the Spark driver and automatically reruns spark-submit on another server.
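
For example, a minimal conf/spark-env.sh and conf/spark-defaults.conf on each slave could look like the sketch below (the library path and the values are assumptions; adjust them to your installation):

# conf/spark-env.sh -- tell Spark where the native Mesos library lives
export MESOS_NATIVE_JAVA_LIBRARY=/usr/local/lib/libmesos.so

# conf/spark-defaults.conf -- shared defaults picked up by spark-shell/spark-submit/spark-class
spark.master            mesos://zk://zookeeper-001:2181,zookeeper-002:2181,zookeeper-003:2181/mesos
spark.executor.memory   512M
spark.cores.max         4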

Example of a Spark submit command

spark-submit --master mesos://zk://zookeeper-001:2181,zookeeper-002:2181,zookeeper-003:2181/mesos \
  --class <MAIN_CLASS> \
  --conf spark.cores.max=4 \
  --conf spark.driver.memory=512M \
  --conf spark.executor.memory=512M \
  --conf spark.files=<PATH_TO_application.conf> \
  --conf spark.driver.extraJavaOptions="-Dconfig.file=<PATH_TO_application.conf>" \
  --conf spark.executor.extraJavaOptions="-Dconfig.file=<PATH_TO_application.conf>" \
  <PATH_TO_JAR_FILE> [command line arguments]

Note that both <PATH_TO_JAR_FILE> and <PATH_TO_application.conf> need to be readable by the Mesos slaves. You could simply install all dependencies on all Mesos slaves, or use the Artifact Store if running on Marathon. Next we will discuss running Spark on Mesos with Docker, which addresses this app distribution problem.
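
For example, if both files are served from an HTTP artifact server (the artifact-store host below is hypothetical), every slave can fetch them without a shared filesystem:

spark-submit --master mesos://zk://zookeeper-001:2181,zookeeper-002:2181,zookeeper-003:2181/mesos \
  --class <MAIN_CLASS> \
  --conf spark.files=http://artifact-store:8081/application.conf \
  http://artifact-store:8081/my-spark-job.jar [command line arguments]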

Running Spark in Docker container

First you need an image that contains Java, Spark, and Mesos, and optionally your uber jar. It's a good idea to build one Spark image that has everything required for running Spark on Mesos, and a separate image containing your uber jar layered on top of it. Since every Mesos slave needs to pull the images before it can run our job, it is quite important to keep these images as compact as possible. Alpine Linux, for example, lets you build a minimal, secure Docker image with just enough to run your application. Another useful Docker feature is its copy-on-write (COW) filesystem: the static layers of your images (Java/Spark/Mesos) are shared between multiple images, keeping disk usage and image pull time to a minimum.

Building Spark + Mesos Docker image

TBD
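
A minimal sketch of such an image, built on top of Mesosphere's published mesos base image (the base image tag, Spark version, download URL, and library path below are assumptions; adjust them to your environment):

# Dockerfile -- base layer shared by all Spark jobs: Mesos + JRE + Spark
FROM mesosphere/mesos:1.7.1
RUN apt-get update && \
    apt-get install -y --no-install-recommends openjdk-8-jre-headless curl && \
    curl -fsSL https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz \
      | tar -xz -C /opt && \
    ln -s /opt/spark-2.4.8-bin-hadoop2.7 /opt/spark
ENV SPARK_HOME=/opt/spark \
    MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so \
    PATH=$PATH:/opt/spark/bin

# A second, thin image per application only adds the uber jar on top,
# so the heavy layers above are shared between jobs (copy-on-write):
# FROM <YOUR_SPARK_IMAGE>
# COPY target/my-job-assembly.jar /opt/jars/my-job.jar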

Submitting Spark job on Docker

Once we have a Spark Docker image, launching a Spark job in Docker is not much different from launching a normal Spark job.

docker run --rm --net host <YOUR_SPARK_IMAGE> <SPARK_SUBMIT_CMD>

<SPARK_SUBMIT_CMD> is the spark-submit command we have seen above. We use --net host to simplify networking and let the executors and the driver communicate with each other out of the box. There is one extra configuration we need to pass to spark-submit so that the executors are launched from a Docker image:

--conf spark.mesos.executor.docker.image=<YOUR_SPARK_IMAGE>

To make sure we always use an up-to-date image, add --conf spark.mesos.executor.docker.forcePullImage=true. You can also bake these settings into spark-defaults.conf when packaging the Spark image.
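
For example, baked into the image's conf/spark-defaults.conf (a sketch; the image name is a placeholder):

spark.mesos.executor.docker.image            <YOUR_SPARK_IMAGE>
spark.mesos.executor.docker.forcePullImage   true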

Pulling from Private Docker Registry

We have previously seen how to run Docker containers on Mesos pulling from a private registry (see Using a private Docker Registry). The same technique can be applied when submitting a Spark job, using the setting below:

--conf spark.mesos.uris=file:///etc/docker.tar.gz
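
The archive simply contains the Docker client credentials so that the slave can authenticate before pulling the image. A sketch of how it could be produced and distributed (the registry name and target path are assumptions):

# On a machine that is already logged in to the private registry:
docker login registry.example.com            # writes ~/.docker/config.json
cd ~ && tar -czf docker.tar.gz .docker       # package the credentials
# Copy docker.tar.gz to /etc/docker.tar.gz on every Mesos slave,
# or host it on HTTP/HDFS and point spark.mesos.uris at that URL instead.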

Last but not least, you can also run your job under Marathon using an app definition JSON like the following:

{
  "id": "/test-docker-spark",
  "cmd": "spark-submit --master mesos://zk://zookeeper-001:2181,zookeeper-002:2181,zookeeper-003:2181/mesos 
            --class <MAIN_CLASS>
            --conf spark.mesos.uris=file:///etc/docker.tar.gz 
            --conf spark.mesos.executor.docker.volumes=<HOST_PATH>:<CONTAINER_PATH>:<ro|rw>
            --conf spark.mesos.executor.docker.image=<SPARK_IMAGE>
            --conf spark.cores.max=2 
            --conf spark.driver.memory=512M 
            --conf spark.executor.memory=512M 
            <PATH_TO_YOUR_JAR> [command line arguments]",
  "cpus": 2,
  "mem": 512,
  "disk": 0,
  "instances": 1,
  "constraints": [],
  "container": {
    "type": "DOCKER",
    "volumes": [
      {
        "containerPath": "<CONTAINER_PATH>",
        "hostPath": "<HOST_PATH>",
        "mode": "RO"
      }
    ],
    "docker": {
      "image": "SPARK_IMAGE",
      "network": "HOST",
      "portMappings": null,
      "privileged": false,
      "parameters": [],
      "forcePullImage": true
    }
  },
  "portDefinitions": [
    {
      "port": 10014,
      "protocol": "tcp",
      "labels": {}
    }
  ],
  "uris": [
    "file:///etc/docker.tar.gz"
  ],
  "fetch": [
    {
      "uri": "file:///etc/docker.tar.gz",
      "extract": true,
      "executable": false,
      "cache": false
    }
  ]
}
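
The app definition can then be posted to Marathon's REST API, for example (assuming the JSON above is saved as test-docker-spark.json and Marathon runs on marathon-001:8080):

curl -X POST http://marathon-001:8080/v2/apps \
  -H "Content-Type: application/json" \
  -d @test-docker-spark.json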

results matching ""

    No results matching ""