Writing Spark DataFrames to Elasticsearch/Opensearch databases

Elasticsearch and Opensearch are two very popular No-SQL databases. In this post, I want to show how you can write data from a Spark DataFrame into an Elasticsearch/Opensearch database.
Author: Pedro Duarte Faria
Affiliation: DSM-Firmenich
Published: March 16, 2025

Introduction

Recently, I had to develop a simple connector (i.e. a Python function) to allow a user to upload/write data from a Spark DataFrame into both an Elasticsearch and an Opensearch database. If you are not familiar with Elasticsearch or Opensearch, they are both No-SQL databases. More specifically, they are databases that store a collection of JSON documents, instead of storing a collection of tables as you would expect from a traditional SQL database like PostgreSQL.

In more detail, the company had some new needs that had to be addressed. One of them was to send/write some of the data in our data lake into an Elasticsearch/Opensearch database. This is where I came in, to develop a simple solution that would allow the company to write such data into these databases.

Since our data lake is currently being processed by Apache Spark and Databricks, I needed to develop a solution that would make use of the high scalability of Apache Spark to write this data. In other words, because the company works in a “big data” scenario, I had to develop a solution that would be appropriate for this “big data” scenario.

This task turned out to be a little more complicated than I expected, mainly because the existing documentation about the connection between Spark and Elasticsearch/Opensearch is very, I mean, veeery poor. Therefore, it was a little hard to “make the thing work”. Since I don’t want you to face the same challenges I did, I’ve decided to write this post to describe how you can use Apache Spark to write data into these kinds of databases.

Spark native data sources

If you want to write data from Apache Spark fast and with high scalability, you want to use a Spark native data source to write your data. If you use Spark, you likely know what a “data source” is. But I want to discuss a few things here.

First of all, Spark data sources are described in detail on this specific page in the Spark documentation. Spark supports many different data sources out of the box. For example, it supports static files like CSV, JSON and Parquet files.

But you can also install Maven packages in your environment to use other data sources that were developed by the community. In other words, Spark is “extensible” in this regard, because these Maven packages work almost like a “plugin”, adding new data sources to the Spark engine.

For example, by installing the spark-sql-kafka Maven package into your environment, a new Spark data source becomes available to you, which is the Kafka data source. With this new data source, Spark supports streaming with Apache Kafka queues.

Therefore, Spark supports multiple data sources out of the box, but you can add more data sources to the list by installing Maven packages that add new data sources to the Spark engine.

Selecting a data source

If you are writing a Spark application through pyspark, you can choose the specific data source that you want to use in Spark by using the format() method of the DataFrameWriter class. In the example below, I’m using the JSON data source to write data from Spark into a JSON file.

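df = spark.range(1)
# Select the JSON data source, then write to a path (this path is just an example)
df.write.format('json').mode('overwrite').save('/tmp/example_json')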

Elasticsearch/Opensearch native Spark data sources

Both Elasticsearch and Opensearch have a free/open-source Spark native data source that we can use. These data sources are both available as packages in the Maven repository. If you want to write data to Elasticsearch, then, you are looking for the elasticsearch-spark package. But, if you want to write data to Opensearch instead, then, you are looking for the opensearch-spark package.

You can see more info about these packages in the links below:

Now, you need to use the specific version of these packages that match your specific environment. For example, my environment is this:

  • Databricks Runtime 15.4 LTS.
  • Apache Spark 3.5.0.
  • Scala 2.12.18.
  • Python 3.11.0.

The Spark and Scala versions are the most important pieces of information in this case. Since the version of Spark is 3.5.0, we want to use the variants of these packages that are compatible with the 3.* versions of Spark, which are opensearch-spark-30 and elasticsearch-spark-30.

We also need to use the variants of these packages that are compatible with the 2.12 version of Scala specifically. This is why I’ve installed the opensearch-spark-30_2.12 and elasticsearch-spark-30_2.12 variants of these packages in my specific environment.

Your specific environment might have different versions, and, therefore, you might need to use a different version of these packages for your specific case. Just be aware of that.
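If your environment is different from mine, a small helper like the one below can derive the artifact name from your Spark and Scala versions. This is only a sketch: the function name is made up, and it assumes that the naming pattern seen above (the Spark major version followed by a zero, then the Scala major.minor version) also holds for your versions. Always confirm that the resulting artifact really exists in the Maven repository.

```python
def connector_artifact(engine: str, spark_version: str, scala_version: str) -> str:
    """Build the Maven artifact name of the Spark connector.

    Assumption: the pattern `<engine>-spark-<N>0_<scala major.minor>` seen in
    this post (e.g. `opensearch-spark-30_2.12`) also applies to your versions.
    """
    if engine not in ("elasticsearch", "opensearch"):
        raise ValueError(f"unknown engine: {engine!r}")
    spark_major = spark_version.split(".")[0]               # "3.5.0"   -> "3"
    scala_binary = ".".join(scala_version.split(".")[:2])   # "2.12.18" -> "2.12"
    return f"{engine}-spark-{spark_major}0_{scala_binary}"


# The environment described in this post:
print(connector_artifact("opensearch", "3.5.0", "2.12.18"))
print(connector_artifact("elasticsearch", "3.5.0", "2.12.18"))
```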

In Databricks, you can install these Maven packages in your cluster to make these native Spark data sources available in your environment. If you don’t know how to do this, check out the Databricks documentation on installing libraries.

Using the Elasticsearch/Opensearch data source

After you install these Maven packages, you should be able to select the Elasticsearch/Opensearch data source by using the string "org.opensearch.spark.sql" or "org.elasticsearch.spark.sql" in the format() method.

Now, in order to use the data source, you have to set some options with the option() method of the DataFrameWriter class. These are the mandatory options that you have to set if you use the Opensearch data source:

  • opensearch.nodes: the host URL where your Opensearch server is hosted.
  • opensearch.port: the port to use in the connection with the Opensearch server.
  • opensearch.resource: the name of the target index in the Opensearch database to write the data to.
  • opensearch.net.http.auth.user: the username to login into the Opensearch server.
  • opensearch.net.http.auth.pass: the password to login into the Opensearch server.
  • opensearch.write.operation: the type of operation (e.g. index, create, upsert, etc.) that you want to use when writing the data to the Opensearch server.

In the example below, we are selecting the Opensearch data source, and setting these mandatory options. Of course, the values of these options are not real, they are fictitious.

df = spark.range(1)
index = "my_index"
host = "https://my-hostname.com"
port = 1234
user = "my_username"
password = "my_secret_pass"

df.write.format("org.opensearch.spark.sql")\
    .option("opensearch.nodes", host)\
    .option("opensearch.port", port)\
    .option("opensearch.resource", index)\
    .option("opensearch.net.http.auth.user", user)\
    .option("opensearch.net.http.auth.pass", password)\
    .option("opensearch.write.operation", "index")\
    .mode("append")\
    .save()
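If you write to several indexes, repeating this chain of option() calls gets tedious. One possible way to organize it is to keep the options in a dictionary and apply them in a loop. The sketch below is just that, a sketch: the two function names are hypothetical helpers of mine, not part of the Opensearch package. The port is converted to a string so that every option value is explicit text.

```python
def opensearch_write_options(host, port, index, user, password, operation="index"):
    """Return the mandatory Opensearch options listed above as a dict."""
    return {
        "opensearch.nodes": host,
        "opensearch.port": str(port),
        "opensearch.resource": index,
        "opensearch.net.http.auth.user": user,
        "opensearch.net.http.auth.pass": password,
        "opensearch.write.operation": operation,
    }


def apply_options(writer, options):
    """Call writer.option(key, value) for every entry and return the writer."""
    for key, value in options.items():
        writer = writer.option(key, value)
    return writer


# Usage with a real DataFrame (needs a Spark session and the Maven package):
# opts = opensearch_write_options(host, port, index, user, password)
# writer = df.write.format("org.opensearch.spark.sql")
# apply_options(writer, opts).mode("append").save()
```

Since the helper only relies on the option() method, it works with any object that behaves like a DataFrameWriter, which also makes it easy to test without a cluster.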

Now, if you are using the Elasticsearch data source instead, then, these are the mandatory options that you have to set:

  • es.nodes: the host URL where your Elasticsearch server is hosted.
  • es.port: the port to use in the connection with the Elasticsearch server.
  • es.resource: the name of the target index in the Elasticsearch database to write the data to.
  • es.write.operation: the type of operation (e.g. index, create, upsert, etc.) that you want to use when writing the data to the Elasticsearch server.
  • es.net.http.header.Authorization: if you are using an API key to authenticate in the Elasticsearch server, you should use this option to set the authorization header that will be used in the HTTP request that is made to the Elasticsearch server.
  • es.net.http.auth.user: if you are using basic username + password to authenticate in the Elasticsearch server, you should use this option to set the username to login into the Elasticsearch server.
  • es.net.http.auth.pass: if you are using basic username + password to authenticate in the Elasticsearch server, you should use this option to set the password to login into the Elasticsearch server.

In the example below, I’m setting these mandatory options for the Elasticsearch data source:

df = spark.range(1)
index = "my_index"
host = "https://my-hostname.com"
port = 1234
api_key = "my_secret_api_key"

df.write.format("org.elasticsearch.spark.sql")\
    .option("es.nodes", host)\
    .option("es.port", port)\
    .option("es.resource", index)\
    .option("es.net.http.header.Authorization", f"ApiKey {api_key}")\
    .option("es.write.operation", "index")\
    .mode("append")\
    .save()
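Because the API key and the username + password options are alternatives, it can help to pick one of them in a single place. The function below is a minimal sketch of that idea. Its name and arguments are my own invention, and only the option names come from the list above.

```python
def es_auth_options(api_key=None, user=None, password=None):
    """Return the Elasticsearch auth options for exactly one auth method."""
    use_api_key = api_key is not None
    use_basic = user is not None or password is not None
    # Both methods at once, or neither of them, is treated as a mistake.
    if use_api_key == use_basic:
        raise ValueError("provide either api_key, or user and password")
    if use_api_key:
        return {"es.net.http.header.Authorization": f"ApiKey {api_key}"}
    if user is None or password is None:
        raise ValueError("basic auth needs both user and password")
    return {"es.net.http.auth.user": user, "es.net.http.auth.pass": password}


print(es_auth_options(api_key="my_secret_api_key"))
print(es_auth_options(user="my_username", password="my_secret_pass"))
```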

Spark write mode

Usually, you don’t care about which Spark write mode is being used when using the Elasticsearch/Opensearch data source. In other words, the Spark write mode is irrelevant in most scenarios.

But, depending on what you want to do, you might have to change the Spark write mode to meet your specific goals. As will become clearer later in this article, you want to use the Spark write mode "append" in most situations, i.e. it should be your “default”.

But, before we continue, let me explain the differences between the Spark write mode, and the type of write operation that you set through the es.write.operation and opensearch.write.operation options.

With the es.write.operation and opensearch.write.operation options, you specify which type of operation you want to use to insert the new data into the Opensearch database. That is, if the new data should be inserted into the Opensearch index using an upsert operation, or maybe, a create operation, or an index operation, etc.

On the other hand, by setting the Spark write mode, you are basically adding a new effect into the process. Usually, you want to use the Spark write mode "append" in most situations, because this write mode does not introduce any new effect into the mix. In other words, the write mode "append" does nothing, it “means nothing” to the Elasticsearch/Opensearch Spark data source.

But, if you set the Spark write mode to anything else, then, you start to introduce new operations in the mix. In the list below, you can see which effects each Spark write mode has on the process:

  • overwrite: a delete operation is performed in the Elasticsearch/Opensearch index before the Spark data is written into the Elasticsearch/Opensearch index.
  • error or errorifexists: if the Elasticsearch/Opensearch index is empty (i.e. document count is zero, or, the index doesn’t even exist), nothing happens, the Spark data is written into the Elasticsearch/Opensearch index as expected. Otherwise, an exception is raised.
  • ignore: if the Elasticsearch/Opensearch index is empty (i.e. document count is zero, or, the index doesn’t even exist), the Spark data is written into the Elasticsearch/Opensearch index as expected. Otherwise, the Spark data is completely ignored, that is, it is not written into the Elasticsearch/Opensearch index.
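To make the list above easier to reason about, here is a toy model of it in plain Python. To be clear, this is not the code of the data source. It is just my description from the list, written as a function with a made-up name, so that you can check what to expect for a given write mode and index state.

```python
def planned_action(mode: str, index_is_empty: bool) -> str:
    """Toy model of the effect of each Spark write mode, as described above."""
    mode = mode.lower()
    if mode == "append":
        return "write"                      # no extra effect
    if mode == "overwrite":
        return "delete then write"          # index content is deleted first
    if mode in ("error", "errorifexists"):
        return "write" if index_is_empty else "raise"
    if mode == "ignore":
        return "write" if index_is_empty else "skip"
    raise ValueError(f"unknown write mode: {mode!r}")


for m in ("append", "overwrite", "errorifexists", "ignore"):
    print(m, "->", planned_action(m, index_is_empty=False))
```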

Extra options that you might want to use

Mapping the ID column

Elasticsearch and Opensearch are No-SQL databases that store a collection of JSON documents. Each JSON document that you add/insert into these databases is always associated with a particular ID. In other words, each document in the database has an ID associated with it.

This ID is crucial to identify each document in the database. In most cases, this ID is created automatically, at runtime, by the database itself. In more detail, when you insert new documents into the database, the Elasticsearch/Opensearch server automatically generates these ID’s for you. This typically happens when you use an index write operation to insert these new documents into the database.

This index write operation in an Elasticsearch/Opensearch database works similarly to an INSERT INTO SQL statement in a traditional SQL database. It basically means that you are inserting new data into the database. And because this data is new, the database can generate a completely new and unique ID for this new data that is being inserted, without the help of anyone else.

However, this scenario changes completely when you are updating pre-existing data/documents in the database, or when you are deleting some pre-existing data/documents, because in this instance you, the user, need to provide the ID’s of the documents that you want to delete/update. In other words, the database needs to know which specific documents you want to update/delete, and you are responsible for specifying the ID’s of these documents.

For example, suppose you are using an upsert operation (which is basically an “update if exists, and insert otherwise” operation) to write your Spark data into the Elasticsearch/Opensearch database. In that case, your Spark DataFrame must have a column that contains the ID of each document, and you need to tell the Elasticsearch/Opensearch Spark data source which column in the DataFrame contains these ID’s through the opensearch.mapping.id and es.mapping.id options.

For example, if the ID’s of each document are in a column named id, then I should set this option to the string "id". In the example below, we are using the Opensearch data source, but the same thing applies to the Elasticsearch data source, just change the option to es.mapping.id:

df.write.format("org.opensearch.spark.sql")\
    .option("opensearch.write.operation", "upsert")\
    .option("opensearch.mapping.id", "id")\
    .option("opensearch.nodes", host)\
    .option("opensearch.port", port)\
    .option("opensearch.resource", index)\
    .option("opensearch.net.http.auth.user", user)\
    .option("opensearch.net.http.auth.pass", password)\
    .mode("append")\
    .save()
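A forgotten or misspelled ID column is an easy mistake to make, so you might want to check for it before starting the write. The helper below is a minimal sketch of such a check. The function name is hypothetical, and the set of operations that need an ID is my assumption based on the discussion above (updates, upserts and deletes).

```python
# Assumption: these are the write operations that need a document ID.
OPERATIONS_NEEDING_ID = {"update", "upsert", "delete"}


def check_id_column(columns, operation, id_column=None):
    """Raise ValueError if the operation needs an ID column that is missing."""
    if operation not in OPERATIONS_NEEDING_ID:
        return  # e.g. "index": the database generates the IDs itself
    if id_column is None:
        raise ValueError(f"operation {operation!r} needs a mapping.id option")
    if id_column not in columns:
        raise ValueError(f"column {id_column!r} not found in {list(columns)}")


# With a real DataFrame you would pass df.columns as the first argument.
check_id_column(["id", "name"], "upsert", "id")   # passes silently
check_id_column(["name"], "index")                # passes silently
```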

Avoiding unnecessary refresh index operations

Every time that you add new data into an Elasticsearch/Opensearch database, the database needs to perform a “refresh index operation” at some point. A “refresh index operation” makes recent operations performed on one or more indices on the database available for search.

In other words, when you add a new document to an index, or, when you change a pre-existing document in the index, these changes that you made in the index will usually become available to be “searched/accessed” only after a “refresh index operation” is performed in the index.

Therefore, the Elasticsearch/Opensearch database needs to perform this “refresh index operation” at some point, so that you can access/see the changes that you made in the index.

But here is the catch! Every time you use the Elasticsearch/Opensearch Spark data source to write some data from Spark, the data source, by default, performs a “refresh index operation” at the end of the write process, by making an HTTP request to the Refresh API of the Elasticsearch/Opensearch database. In most scenarios, you should avoid this “refresh index operation” at the end!

In other words, you should not let the Spark data source dictate when the “refresh index operation” should be performed. Let the Elasticsearch/Opensearch database be responsible for that! When the time is appropriate, the Elasticsearch/Opensearch database itself will decide and automatically perform the “refresh index operation” to include your changes.

You can do that by setting the options es.batch.write.refresh and opensearch.batch.write.refresh to False.

df.write.format("org.elasticsearch.spark.sql")\
    .option("es.batch.write.refresh", False)\
    .option("es.nodes", host)\
    .option("es.port", port)\
    .option("es.resource", index)\
    .option("es.net.http.header.Authorization", f"ApiKey {api_key}")\
    .option("es.write.operation", "index")\
    .mode("append")\
    .save()
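As you have probably noticed by now, the options of the two data sources mentioned in this post differ only in their prefix (es. versus opensearch.). If you support both databases, one possible approach is to write the option names without the prefix and add it at the end. The sketch below shows this idea. The names PREFIXES and prefixed are hypothetical, and I pass "false" as a string to make the value explicit.

```python
# Option prefix used by each data source, as seen throughout this post.
PREFIXES = {"elasticsearch": "es.", "opensearch": "opensearch."}


def prefixed(engine, settings):
    """Add the engine-specific prefix to every option name in `settings`."""
    prefix = PREFIXES[engine]
    return {prefix + name: value for name, value in settings.items()}


# Disable the refresh at the end of the write, for either data source.
no_refresh = {"batch.write.refresh": "false"}
print(prefixed("elasticsearch", no_refresh))
print(prefixed("opensearch", no_refresh))
```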

Define the chunk size

When you use the Elasticsearch/Opensearch Spark data sources, they split your Spark DataFrame in chunks, and send these chunks/batches to the Elasticsearch/Opensearch server, piece by piece. You can use the es.batch.size.entries and opensearch.batch.size.entries options to specify the size (number of entries) of these chunks that are created by the Spark data source.

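For example, if your DataFrame contains 10000 rows in a single partition, and you set this chunk size to 1000, then the Spark data source will split your DataFrame into 10 chunks, and send each chunk sequentially to the Elasticsearch/Opensearch server. Note that this setting applies per Spark task, so a DataFrame with several partitions is chunked partition by partition, and chunks from different partitions can be sent in parallel.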

By controlling the size of the chunk that is sent to the server, you can control how much data (or, how much “stuff”) the server needs to process in each request. This is useful if your Elasticsearch/Opensearch server has a rate limit, or if it does not have enough resources to process huge amounts of data in a single request.

df.write.format("org.elasticsearch.spark.sql")\
    .option("es.batch.size.entries", 1000)\
    .option("es.nodes", host)\
    .option("es.port", port)\
    .option("es.resource", index)\
    .option("es.net.http.header.Authorization", f"ApiKey {api_key}")\
    .option("es.write.operation", "index")\
    .mode("append")\
    .save()
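If you want to estimate how many requests a given chunk size will produce, the arithmetic is a simple ceiling division. The function below is a rough sketch with a made-up name. It assumes that all rows are written by a single Spark task, and that no other limit of the data source (such as a limit on the size in bytes of each chunk) is reached first.

```python
import math


def bulk_request_count(rows: int, batch_entries: int) -> int:
    """Estimate the number of chunks sent for `rows` rows in one Spark task."""
    if batch_entries <= 0:
        raise ValueError("batch_entries must be a positive number")
    return math.ceil(rows / batch_entries)


print(bulk_request_count(10000, 1000))   # the example from the text: 10 chunks
print(bulk_request_count(10001, 1000))   # one extra row needs one extra chunk
```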

Potential problems that you might face

Finding your Elasticsearch/Opensearch server

This is likely the first problem that you will face when trying to use the Elasticsearch/Opensearch Spark data source. This problem happens when your Elasticsearch/Opensearch server is not found by the Spark data source. In other words, this is a network problem. For example, you ask the Spark data source to write your data into the server located at the URL “https://my-host.com”, but, the Spark data source was unable to find/locate the server at this URL.

This specific problem can happen for multiple reasons. Maybe…

  • the URL that you provided at the es.nodes or opensearch.nodes option is wrong in some way.
  • the port number that you provided at the es.port or opensearch.port option is wrong in some way.
  • the Elasticsearch/Opensearch server might not be accessible through the internet, i.e. maybe this server is hosted on a private/closed network.
  • the Elasticsearch/Opensearch server has a firewall rule that is blocking your network access.
  • your Spark process might not have access to the internet for some reason.
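A quick way to narrow down these causes is to test, from the same machine where your Spark code runs, whether a plain TCP connection to the server can be opened at all. The sketch below uses only the Python standard library. The function names are my own, and the check says nothing about authentication or TLS, it only tells you if the host and port are reachable. Also keep in mind that it runs where you call it (e.g. the driver), so it is only a first hint about what the Spark workers will see.

```python
import socket
from urllib.parse import urlsplit


def hostname_of(nodes: str) -> str:
    """Extract the bare hostname from a value like 'https://my-host.com'."""
    parsed = urlsplit(nodes if "://" in nodes else f"//{nodes}")
    return parsed.hostname or ""


def can_reach(nodes: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to the host and port can be opened."""
    host = hostname_of(nodes)
    if not host:
        return False
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:  # DNS failure, refused connection, timeout, no network...
        return False


# Usage, with the same fictitious values used in the examples of this post:
# can_reach("https://my-hostname.com", 1234)
```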

Forbidden database operations

While using these Spark native data sources you might face some problems. The most likely error that you might encounter while using these data sources is a “403/Forbidden” error message, which basically means that, with the inputs that you provided, the Spark data source is asking the Elasticsearch/Opensearch server to perform an operation that you don’t have enough authorization to perform. However, this is kind of generic, i.e. this “forbidden” can mean a lot of things. That is, there are many different operations that you may or may not have enough authorization to perform in the database.

For example, maybe, the data that you have provided does not follow the schema of the JSON documents that are already present in the database index that you are using. If that is your case, then, the database will likely need to redefine/recreate the index in the database from scratch, so that it can alter the schema of that index, and, you might not have enough authorization to recreate such index, therefore, causing a “403/Forbidden” error.

This is just one example, but there are many other examples of operations that you might be silently causing/triggering through these Spark data sources, and that you do not have enough authorization to perform. Unfortunately, the error messages provided by these Spark data sources are really poor in details in some cases. So you will probably need to do a lot of testing, until you find the perfect combination of inputs and data that work for your case.
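For the schema mismatch case described above, one cheap test you can do before writing is to compare the column names of your DataFrame with the field names that already exist in the index. The helper below is only a sketch with a hypothetical name. It assumes that you already got the list of field names of the index from somewhere (for example, from the index mapping), and it only compares names, not data types.

```python
def unexpected_fields(df_columns, index_fields):
    """Return the DataFrame columns that do not exist in the target index."""
    known = set(index_fields)
    return sorted(col for col in df_columns if col not in known)


# With a real DataFrame you would pass df.columns as the first argument.
extra = unexpected_fields(["id", "name", "price"], ["id", "name"])
print(extra)
```

An empty list does not guarantee that the write will succeed, but a non-empty one points you to the columns that are worth a closer look.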