Tuesday, October 2, 2018

Spark job concurrency and understanding the number of cores assigned to jobs

Executors of Spark jobs are assigned a certain number of cores. What does that number of cores actually mean? Does it mean that when an executor is running with 2 cores, the executor must use only 2 cores of the machine, even when the machine has 16 CPU cores?
Actually, things are somewhat different. Let me explain.
When we submit a job to a Spark cluster, the driver communicates with the master asking for resources to run. The number of cores is one such resource. The Spark master then surveys the slaves (workers) for resource availability. It finds the slave nodes with enough cores left and asks some of those workers to run the job. The workers create separate Java processes (executor processes) using our application jar, and these executors communicate with the driver to execute the various stages of the job. Assume there is one executor for a job and it has been assigned 2 cores; that doesn't mean the executor Java process will use only 2 cores of the machine it is running on. It may use all the available cores of the machine. So the "number of cores" is really a parameter used to select the slaves that have enough free capacity to run the executors of a job.

Let us do a small exercise here. Let us start a Spark master and a slave, where the slave advertises that it has 2 cores to offer for executing jobs.
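The commands below are a rough sketch of how to do this with the standalone scripts; the Spark installation path and host name are placeholders for your own setup.

# Start the standalone master (its web UI listens on port 8080 by default).
$SPARK_HOME/sbin/start-master.sh

# Start a single worker (slave) that advertises only 2 cores to the master.
$SPARK_HOME/sbin/start-slave.sh spark://<master-host>:7077 --cores 2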


Now let us check what the Spark master UI is showing:


As we can see from the screenshot, there are 2 cores in the cluster and 0 of them are in use right now. Those 2 cores are the ones advertised by the lone worker running in the cluster.
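As a side note, the standalone master also exposes this state as JSON on its web UI port (8080 by default), which is handy if you prefer the command line; the host name below is a placeholder.

# Fetch the master's state (workers, total cores, cores in use, applications) as JSON.
curl http://<master-host>:8080/json/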

Now let us run a small Spark Streaming job. We will use Spark Streaming with Kafka as the event source. I will not cover here how to set up Kafka and ZooKeeper. We will set up a Kafka cluster and create a topic named "streaming". For this experiment we may create the topic with just one partition and a replication factor of 1. The code for the simple streaming job is accessible here.
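For reference, creating such a topic looks roughly like this with the Kafka command-line tools, assuming a ZooKeeper instance at localhost:2181 (adjust the address for your setup).

# Create the "streaming" topic with a single partition and replication factor 1.
kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic streaming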

We submit the Spark streaming job with the application name "appa" and the Kafka consumer group name "groupa" using the command below:
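The exact command depends on how the streaming application was built; the class name and jar path below are placeholders, and the application name and consumer group name are assumed here to be passed as program arguments to the job.

# Request 2 cores per executor from the standalone master.
spark-submit \
  --master spark://<master-host>:7077 \
  --executor-cores 2 \
  --class com.example.KafkaStreamingJob \
  /path/to/streaming-job.jar appa groupa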




As you may see, the job requests 2 cores per executor (--executor-cores 2). After the job has started, let us visit the master UI again.


As we can see from the screenshot, there is one application running, and it has taken 2 cores (i.e. all the available cores in the cluster).

Now let us examine which CPU cores are actually being used by the executor of the Spark job. (The machine has 4 CPU cores.)
First, look at the process IDs of the various Java processes.
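One convenient way to find them is jps, which ships with the JDK; the PIDs will of course differ on every run.

# List running Java processes with their main class names.
# In a standalone setup the interesting ones typically are:
#   org.apache.spark.deploy.master.Master                    -> the Spark master
#   org.apache.spark.deploy.worker.Worker                    -> the slave/worker
#   org.apache.spark.deploy.SparkSubmit                      -> the driver (client mode)
#   org.apache.spark.executor.CoarseGrainedExecutorBackend   -> the executor
jps -l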




Process 13667 is the Spark slave Java process, process 14936 is the executor Java process, process 13581 is the Spark master Java process, and process 14860 is the driver Java process of our streaming job. We are interested in how many cores the executor (process 14936) is using at this moment. We execute the "top" command (top -d 2 -p 14936 -H) to get this information.
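For reference, the flags in that invocation do the following (the PID is specific to this run).

# -d 2       refresh the display every 2 seconds
# -p 14936   monitor only the executor process
# -H         show the individual threads of that process
top -d 2 -p 14936 -H
# If the "last used CPU" (P) column is not visible, it can be enabled
# from top's fields screen (press 'f').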


The last column in the above screenshot shows the last CPU core used by each thread. As we can see, the executor process is using all 4 available cores. Hence we can see that the "--executor-cores" parameter is just used to identify which worker nodes have enough resources to run the executors for a job. Once an executor process starts, the OS may schedule the threads of the executor process on any of the CPU cores available on the machine.

As our Spark cluster now has zero cores left for executing new jobs, submitting a new job will only queue it, and it won't make any progress until the first job is killed.
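To see this, one can submit a second copy of the job with a different application name and consumer group (hypothetically "appb" and "groupb", following the same argument convention as before); on the master UI the new application will sit in the WAITING state until cores are freed.

# This application gets registered but stays WAITING, because the
# lone worker's 2 cores are already taken by the first job.
spark-submit \
  --master spark://<master-host>:7077 \
  --executor-cores 2 \
  --class com.example.KafkaStreamingJob \
  /path/to/streaming-job.jar appb groupb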