How to use the cache effectively on Apache Spark

Have you ever used the DataFrame method cache() to cache your Spark DataFrame? In this post, I want to describe how you can use this method to get better performance.
Apache Spark, Performance

Author: Pedro Duarte Faria
Affiliation: Blip
Published: January 13, 2024

Introduction

I work a lot with Apache Spark on Databricks, and very recently, I encountered some cases of jobs failing because cached DataFrames were occupying all the available memory and, as a consequence, raising OutOfMemory runtime errors.

In essence, the job was executing a Python notebook that contained some pyspark code. Many Spark DataFrames were constantly being cached with the DataFrame method cache(). This pattern crowded the memory with more and more caches, until it became full and the job crashed.

In this article, I want to describe how you can use cache() effectively on Apache Spark, and also explain how this OutOfMemory error happened.

What is this cache() method?

In Apache Spark we work with Spark DataFrames. They are the core (or the essence) of any Spark application. We model, transform, load and export these objects to get the result we want.

However, in some cases, generating a specific Spark DataFrame can take a long time. Maybe this DataFrame is defined by a heavy query that involves many layers of calculations, or maybe a huge amount of data needs to be read to calculate/generate this DataFrame.

For these cases, we can cache this specific Spark DataFrame. By caching it, we avoid the need to calculate/generate this DataFrame from scratch over and over again. We calculate it once, and then we reuse this same data in later steps.

We do this by calling the cache() DataFrame method, which marks that specific DataFrame as a “cached DataFrame”. As an example, in the code below, I’m creating a Spark DataFrame called df:

from datetime import date
from pyspark.sql import SparkSession, Row

# create (or reuse) the Spark session and build a small example DataFrame
spark = SparkSession.builder.getOrCreate()

data = [
  Row(id = 1, value = 28.3, date = date(2021,1,1)),
  Row(id = 2, value = 15.8, date = date(2021,1,1)),
  Row(id = 3, value = 20.1, date = date(2021,1,2)),
  Row(id = 4, value = 12.6, date = date(2021,1,3))
]

df = spark.createDataFrame(data)

If I want to cache this DataFrame, all I need to do is call the cache() method on this df object, like this:

df.cache()

Now the df DataFrame is marked by Spark as a “DataFrame to be cached”, and if we use this df DataFrame over the next lines of our notebook, instead of recalculating the entire DataFrame each time, Spark will reuse the cached data. This can make our Spark application much faster, because Spark will not spend time recalculating the DataFrame.

But it is important to note that, in Apache Spark, cache operations are lazy operations. I quickly described this lazy aspect of Spark in Section 3.5 of my pyspark book.

A key aspect of Spark is its laziness. In other words, for most operations, Spark will only check if your code is correct and if it makes sense. Spark will not actually run or execute the operations you are describing in your code, unless you explicit ask for it with a trigger operation, which is called an “action” (this kind of operation is described in Section 5.2). Faria (2024)

Therefore, the cache operation on the DataFrame will only happen when you call an action over the next lines of your notebook. I listed which operations are considered actions in Spark in Section 5.2 of my pyspark book. But essentially, the Spark DataFrame methods below are all examples of actions in Spark:

  • show()
  • collect()
  • count()
  • write.csv()
  • read.csv()

So, if you call any of the Spark DataFrame methods above after calling cache() on the same Spark DataFrame, Spark will effectively cache your DataFrame.
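
As a minimal sketch of this behaviour, reusing the df DataFrame defined earlier (the filter at the end is just an illustrative follow-up operation):

# cache() only *marks* df to be cached; nothing is computed yet
df.cache()

# count() is an action, so Spark now computes df and stores its data in the cache
df.count()

# any later operation on df reuses the cached data instead of recomputing it
df.filter(df.value > 15).show()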

How to use cache effectively

There are two common situations where cache can be very effective:

  • You constantly use the same DataFrame throughout the notebook.

  • You frequently access a subset of a DataFrame.

Every time you call an action on a Spark DataFrame in your notebook, Spark needs to read and load the DataFrame’s data from storage (this storage can be many things, like a data lake in the cloud, a local static file, etc.). So if you repeatedly use the same Spark DataFrame, you are repeatedly reading the same data from storage over and over again.

With this in mind, when you constantly use the same Spark DataFrame across your notebook, it might be a good idea to cache this specific DataFrame. For example, if you use a DataFrame called students in 15 locations in your notebook, then, by default, Spark will recalculate this students DataFrame from scratch 15 times. But if you cache this DataFrame, Spark will calculate it the first time and reuse the cached data the remaining 14 times.

So …

caching is optimal when you need to perform multiple operations on the same dataset to avoid reading from storage repeatedly. Patidar (2023).
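
As a hypothetical sketch of this first situation (the parquet source path is an assumption; the age column is the same one used in the next example), the students DataFrame is read from storage only once, and every later use reuses the cached data:

# hypothetical source path for the students DataFrame
students = spark.read.parquet("/data/students")
students.cache()

# the first action materializes the cache...
students.count()

# ...and later uses of students reuse the cached data,
# instead of reading the files from storage again
students.groupBy("age").count().show()
students.describe("age").show()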

The same applies when you are frequently using the same subset of a DataFrame. For example, you might have defined in your notebook a new object under_10, which contains all rows from the students DataFrame describing students with an age below 10.

from pyspark.sql.functions import col
under_10 = students.filter(col('age') < 10)

If you use this subset of the students DataFrame across multiple locations of your notebook, then it might also be a good idea to cache this under_10 DataFrame, like this:

from pyspark.sql.functions import col
under_10 = (
  students
    .filter(col('age') < 10)
    .cache()
)
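
Note that, because the cache is applied to the already-filtered DataFrame, only the under_10 subset is stored in memory, not the whole students DataFrame. A short sketch of later uses of this cached subset (the aggregation is just illustrative):

from pyspark.sql.functions import avg, col

# the first action materializes the cache of the under_10 subset
under_10.count()

# later uses reuse the cached subset instead of re-filtering students
under_10.agg(avg(col("age"))).show()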

How to NOT use cache effectively

If your notebook has a very sequential logic, then caching is usually wasteful or a bad idea. Take, for example, a notebook that performs the following steps:

  1. Take DataFrame A, filter it, summarise it, and then save it.
  2. Take DataFrame B, filter it, add new columns, group and summarise it, and then send it to the server.
  3. Take DataFrame C, filter the essential information you want, save this data into a new CSV file, and then send this CSV file to an Amazon S3 bucket.

If you pay attention to these steps above, you will notice that they are independent from each other. These steps can be performed in any order, because the result from each step does not depend on the results from the other steps.

In a notebook like this, caching any of the three cited DataFrames (A, B or C) is usually unnecessary (or a bad idea), because each of these DataFrames is used only once across the notebook, so there is no real need to cache it. By caching it, you will be wasting not only time, but also memory space.

With this in mind, notebooks that have a more complex and interconnected logic are much more suitable candidates for caching. For example, if you have a notebook that produces two DataFrames (B and C) as output, and both are produced from a JOIN operation with the same DataFrame A, then it might be worth caching DataFrame A, so that Spark calculates DataFrame A from scratch only once, instead of twice. A minimal sketch of this pattern is shown below.
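
In this sketch, the source paths, output paths and the "id" join key are all hypothetical:

# hypothetical sources; the paths and the "id" join key are assumptions
df_a = spark.read.parquet("/data/a")
df_b_src = spark.read.parquet("/data/b")
df_c_src = spark.read.parquet("/data/c")

# cache A once, because it is reused by both joins below
df_a.cache()

# both outputs are built from a JOIN with the same DataFrame A;
# thanks to the cache, A is computed only once instead of twice
df_b = df_b_src.join(df_a, on="id", how="left")
df_c = df_c_src.join(df_a, on="id", how="inner")

df_b.write.mode("overwrite").parquet("/output/b")
df_c.write.mode("overwrite").parquet("/output/c")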

Now, another situation where caching might be a bad idea is when you do not have much memory available in your Spark cluster. As an example, let’s consider that the only cluster available in your Spark environment has 2 nodes and 8 GB of RAM.

If your notebook is working with a Spark DataFrame whose physical size is 7 GB worth of data, then it might be a very bad idea to cache this DataFrame, because if you do cache it, 7 GB (almost 90%) of your memory will be occupied with the cached data, leaving only 1 GB available to perform all the transformations and remaining operations of your notebook.

So caching can be more of a hindrance than a help (especially if you cache multiple DataFrames) when these caches occupy too big a chunk of memory. 1 GB might (or might not) be enough to perform the remaining tasks in your notebook. But you should not take this risk. In general, when you have a small amount of memory available and you cache multiple DataFrames, two things can happen:

  1. most likely, your job (or your Spark application) will crash with an OutOfMemory error.
  2. the calculations and transformations become much, much, MUCH slower, because the memory does not have enough space available to perform these calculations in parallel. Your Spark application will succeed, and will produce the output you want… only many, many, MANY times slower.

What happened with the cases I’ve seen

I recently encountered some cases of jobs failing during runtime because of OutOfMemory errors that were generated by multiple caching operations filling all the memory available in the cluster.

I want to use this practical example to demonstrate how caching was badly used, so that you can learn from it.

The notebook I encountered was essentially responsible for updating 3 different tables in our SQL databases. To do that, it defined 5 different Spark DataFrames, and all of them were being cached with the cache() DataFrame method.

When I executed this notebook on a more robust Spark cluster, with more worker nodes and more RAM available, the notebook ran just fine.

The complete execution of the notebook took around 55 minutes, which is a long time… but no runtime errors were raised. In essence, no OutOfMemory errors were raised because we were lucky: this more robust cluster had enough memory space to hold 5 DataFrames in cache. Figure 1 presents this process visually:

Figure 1: DataFrames being cached in a more robust Spark cluster

However, this same notebook was also being executed every day by a scheduled job in Databricks (i.e. a Databricks Workflow), and every time it was executed through the scheduled job, it failed with OutOfMemory errors. The scheduled job was being executed by a much smaller cluster, which had only two worker nodes and 8 GB of RAM available.

The OutOfMemory errors were being raised right after we cached the third Spark DataFrame (let’s call this DataFrame df3) defined in the notebook. So, in summary, what was happening is: Spark was caching each DataFrame defined in the notebook, but on the third DataFrame, the Spark process ran out of memory. In other words, we did not have any memory left to do anything!

Figure 2 summarizes this process visually:

Figure 2: Multiple caches surpassing the limit of memory available in a less robust Spark cluster

Conclusion

In essence, using the Spark DataFrame cache() method can improve the performance of your Spark application. But you should be careful when using this feature, because it can be more of a hindrance than a help, depending on how you use it.

As a good rule of thumb, you usually want to use caches in Spark when you:

  • Constantly use the same DataFrame throughout the notebook.
  • Frequently access a subset of a DataFrame.

In this article I also presented a situation where caches were responsible for crashing a Spark application, as a real example of how caches can be harmful.

References

Faria, Pedro Duarte. 2024. Introduction to Pyspark. 1st ed. Belo Horizonte. https://pedropark99.github.io/Introd-pyspark/.
Patidar, Charchit. 2023. “How Cache Works in Apache Spark.” https://medium.com/@charchitpatidar/how-cache-works-in-apache-spark-aea6eeb3fd03.