Understanding Spark: Part 3: Architecture

  • In the previous blog, we discussed the overall architecture of Spark. In this blog, we will get into more detail on some aspects of Spark.
  • Now, we have a good understanding of:

    • Spark components like driver and executors
    • How resources are provisioned from a cluster
    • How map and reduce tasks are grouped into stages and executed in parallel on executors for optimal performance.
  • So, let's look into some more aspects in greater detail.

1. When does Spark decide to create stages?

  • As the driver receives map and reduce operations, either in batch or interactive mode, it does not actually perform any of those tasks. All the operations are lazily evaluated: Spark keeps track of all operations that need to be applied to RDDs. But then, when does Spark decide to execute them, create stages, and schedule them on the executors?
  • To answer that, all operations in Spark are categorized into two kinds: transformations and actions.
  • Transformations are not applied immediately; they specify the sequence of operations that must be applied if results eventually need to be produced. Actions are operations that need results immediately.
  • For example, map, filter, groupByKey, and reduceByKey are transformations, while show, save, and collect are actions.
  • When an action is triggered, Spark evaluates which of the previous transformations need to be applied to the existing RDDs to produce what the action demands, groups these transformations into an optimal number of stages, and schedules them on executors.
  • The full lists of transformations and actions are given in the official Spark programming guide.
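To make the distinction concrete, here is a minimal sketch (assuming sc is the SparkContext, as in spark-shell; the file path is illustrative). Nothing executes until the final action.

```scala
// Transformations only record lineage; no tasks run yet.
val lines  = sc.textFile("hdfs:///logs/app.log")          // illustrative path
val errors = lines.filter(_.contains("ERROR"))            // transformation
val pairs  = errors.map(line => (line.split(" ")(0), 1))  // transformation
val counts = pairs.reduceByKey(_ + _)                     // transformation

// Only this action makes Spark build stages and schedule tasks on executors.
counts.collect().foreach(println)
```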

2. When does Spark create RDDs and how long does it keep them in memory?

  • RDDs are created when an action is triggered. Spark maintains the dependencies between RDDs as a DAG (Directed Acyclic Graph) and evaluates all the RDDs that the resulting RDD depends on.
  • For example (as given in the following diagram), RDD4 depends on RDD3, RDD3 depends on RDD2, and RDD2 depends on RDD1. RDD1 is created from the input source.

  • So, the Spark application specifies all the transformations that transform RDD1 -> RDD2 -> RDD3 -> RDD4. The Spark DAG stores these dependencies between RDDs, but the RDDs are not physically created in memory until some action is triggered. Assume that an action is finally triggered on RDD4. To create RDD4, Spark creates all the previous RDDs: it starts by reading from the input source to create RDD1, then RDD2, then RDD3, and finally RDD4.
  • These RDDs are kept in memory until all the stages complete and the action succeeds. But once the action is accomplished (for example, the action may store the final results into a file system like HDFS), all RDDs are evicted from memory. By default, Spark does not keep any RDD in memory for long; it assumes memory in the cluster is scarce.
  • But recreating these RDDs every time an action is triggered may be expensive, so RDDs can be explicitly made to stay in memory. It is left to developers to design their application and decide which RDDs will stay in memory and which will not.
  • For example (as given in the diagram above), let's say the Spark application is designed so that another RDD5 is created from RDD3. So, RDD5 depends on RDD3, and the DAG records a dependency like RDD5 <- RDD3 <- RDD2 <- RDD1. Now if an action is triggered on RDD5, then RDD3, RDD2, and RDD1 need to be evaluated again, as Spark did not keep any of the previous RDDs when the earlier action was triggered on RDD4. This may be expensive from the application's point of view. RDD3 becomes a critical RDD on which multiple RDDs depend, so you may decide to persist RDD3 in memory.
  • RDD3 can be persisted by calling the cache() or persist() APIs.
  • RDD3 will be persisted when it is first created, i.e., when the action is triggered on RDD4. Then, when an action is triggered on RDD5, it can be evaluated from the already persisted RDD3, so future actions become faster.
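The RDD1-to-RDD5 scenario above can be sketched as follows (a hedged example: variable names and paths are illustrative, and sc is the SparkContext as in spark-shell):

```scala
val rdd1 = sc.textFile("hdfs:///input/data.txt")   // read from input source
val rdd2 = rdd1.map(_.toLowerCase)
val rdd3 = rdd2.filter(_.nonEmpty).cache()         // mark RDD3 to be kept in memory

val rdd4 = rdd3.map(line => (line, 1))
rdd4.saveAsTextFile("hdfs:///output/run4")         // first action: materializes and caches RDD3

val rdd5 = rdd3.distinct()
rdd5.count()                                       // second action: reuses the cached RDD3
```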

3. What caching mechanisms exist in Spark?

  • RDDs can be cached in memory, on disk, or both. Spark exposes these choices as storage levels:

    • MEMORY_ONLY — store deserialized objects in memory; partitions that do not fit are recomputed when needed (the default for cache())
    • MEMORY_AND_DISK — keep partitions in memory and spill those that do not fit to disk
    • MEMORY_ONLY_SER / MEMORY_AND_DISK_SER — same as above, but store partitions in serialized form, saving memory at the cost of extra CPU
    • DISK_ONLY — store partitions only on disk
    • OFF_HEAP — store serialized partitions in off-heap memory
    • Appending _2 to a level (e.g., MEMORY_ONLY_2) replicates each partition on two nodes
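For RDDs, cache() is shorthand for persisting with the in-memory level; to choose any other level, pass a StorageLevel to persist(). A small sketch (the path is illustrative, sc is the SparkContext):

```scala
import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("hdfs:///input/data.txt")
rdd.persist(StorageLevel.MEMORY_AND_DISK)   // spill what does not fit in memory to disk
// ... trigger actions on rdd ...
rdd.unpersist()                             // evict explicitly when no longer needed
```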

4. What is a broadcast variable in Spark?

  • All RDDs are created, and all map and reduce operations are executed, in executor space. So, as long as map and reduce operations access and transform RDD partitions, those partitions are available in executor space. But what if we define a variable in the driver and try to access it inside map and reduce operations? Because variables defined in the driver's process space are not available in the executors, the driver serializes the variable over the network to the executors.
  • As shown in the diagram below, the variable a is serialized from the driver to the executors. Since the scope of the variable a is limited to the map or reduce function, it is shipped with the closure of every task and cleaned up afterwards. So, if the map operation runs as thousands of tasks, a is serialized and sent thousands of times over the course of the operation. This is a very expensive way of sharing additional information with the executors from the driver.

  • So, Spark defines a broadcast variable to address this scenario. You can define a variable in the driver and then broadcast it to all executors: it is serialized once per executor and kept in the executor's process space, where map and reduce operations can access it. The diagram below depicts this.

  • The most important point is that a broadcast variable is read-only and cannot be modified by the executors. It is only a way of sharing additional information (besides the RDDs) with the executors.
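A small sketch of broadcasting a lookup map (the data is illustrative, sc is the SparkContext):

```scala
// Broadcast a lookup table once per executor instead of shipping it with every task.
val countryNames = Map("IN" -> "India", "US" -> "United States")    // illustrative data
val bc = sc.broadcast(countryNames)

val codes = sc.parallelize(Seq("IN", "US", "IN"))
val names = codes.map(code => bc.value.getOrElse(code, "Unknown"))  // read-only access via .value
names.collect()
```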

5. What is an accumulator in Spark?

  • Accumulators are variables that can be used in Spark to collect metrics from the executors in the driver. Once an accumulator is defined by the driver, it is shared with the executors. Executors can modify its value and finally share their values with the driver, which aggregates them to calculate metrics at the application level.
  • For example, let's define an accumulator to count how many bad records exist. Every executor increments the value based on how many bad records it encounters and shares it with the driver. When the driver aggregates the values from all executors, it finds the total number of bad records in the application.
  • An accumulator is created with an initial value:

    • val bad_recs = sc.accumulator(0, "Number of bad records")
  • Tasks running on the cluster can then add to it using the add method or the += operator:

    • data.foreach { x => if (isBad(x)) bad_recs += 1 }  // isBad is an application-defined check
  • The driver program can then read the accumulator's value:

    • bad_recs.value
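Putting the pieces together, a minimal end-to-end sketch (the data and the notion of a "bad" record are illustrative; note that in Spark 2.x and later, sc.longAccumulator is the preferred replacement for sc.accumulator):

```scala
val bad_recs = sc.accumulator(0, "Number of bad records")

val data = sc.parallelize(Seq("ok", "bad", "ok", "bad"))
data.foreach { rec => if (rec == "bad") bad_recs += 1 }  // executors increment locally

println(bad_recs.value)  // driver reads the aggregated total: 2
```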