Migrating a multi-terabyte OpenSearch cluster
Miłosz Piechocki

At monday.com, we often deal with challenges related to the massive amount of data we store and process. One example is the migration of the Activity Log – a component that allows users to browse and search for activities happening on their boards. Activity Log sits on top of a huge OpenSearch cluster – over 40 TiB for our busiest cloud region!

Recently, we decided to migrate the Activity Log cluster to a new one with an improved data partitioning setup. Once we had decided what the new setup should look like, it turned out that moving multiple terabytes of data between two OpenSearch clusters is not a trivial operation. Let’s dive into how we approached it and what lessons we learned.

The challenge

The biggest problem with moving that much data between OpenSearch clusters is the amount of time required. Based on our previous experience with migrating a much smaller 1 TiB cluster, we knew that a naive migration could easily take multiple weeks or more, and we didn’t want to spend that much time.

Furthermore, such a migration involves reading all the data from the source cluster, which increases its load. At the same time, the cluster would continue to serve production traffic. We wanted to minimize the impact of the migration on the cluster’s ability to cope with that traffic, so we needed control over when the migration runs (preferably during off-peak hours) and at what speed.

It’s worth noting that our cluster is append-only, which simplifies the migration process. Because of that, we didn’t have to worry about updates occurring on the source cluster during the migration.

Finally, we didn’t just want to copy the data as-is. Instead, we wanted to perform some basic transformations on the data (e.g., removing some fields that are no longer used), as well as change shard and index routing for each document.

Alternatives considered

The easiest way to migrate OpenSearch data is with the built-in reindex API. We had used this approach before on a much smaller scale. It’s quite versatile and would have allowed us to perform the necessary data transformations (via the script parameter) as well as control the load on the cluster (via the requests_per_second parameter). However, the migration speed was unsatisfactory – migrating the 1 TiB cluster mentioned above took us almost a week. At that pace, migrating 40 TiB would take more than half a year.
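
For illustration, a throttled remote reindex with a transformation script looks roughly like the sketch below, written with the opensearch-py client (the index names, script, and throttle value are just examples, not our actual setup):

from opensearchpy import OpenSearch

client = OpenSearch(hosts=["https://{{target cluster URL}}"])

# Remote reindex from the source cluster, throttled and with a transform script.
client.reindex(
    body={
        "source": {
            "remote": {"host": "https://{{source cluster URL}}:443"},
            "index": "activity-log",  # example source index
        },
        "dest": {"index": "activity-log-v2"},  # example target index
        "script": {
            "lang": "painless",
            # Example transformation: drop a field that is no longer used.
            "source": "ctx._source.remove('obsolete_field')",
        },
    },
    requests_per_second=500,    # throttle to limit the load on the clusters
    wait_for_completion=False,  # run as a background task
)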

Another approach we evaluated was the elasticdump tool. It’s a very flexible program that not only supports copying data between clusters but also lets you export the data to a file in one of many supported formats. Unfortunately, the migration speed was again not good enough (over a month to migrate all the data). The migration could be sped up by parallelizing it with multielasticdump, but that didn’t fit our use case because it parallelizes on a per-index basis, while our cluster had only a single index.

Given that we use managed OpenSearch on AWS, we also had a look at the Migration Assistant for Amazon OpenSearch Service. It looked very promising with its ability to replay traffic, which would also have helped us with the traffic-shadowing aspect of the migration (not covered in this post; we used traffic shadowing to validate that the new OpenSearch cluster could handle production traffic). However, after a brief look, we realized that it doesn’t support performing transformations on the migrated data, which was a key requirement for us.

The solution: Spark and opensearch-hadoop

While I was discussing the issue with my colleagues, one of them suggested a completely different approach – using Apache Spark to perform the migration. They also suggested doing the migration in two phases: first dumping the data from the source cluster to files, and then indexing the data from those files.

Spark is an engine for processing massive amounts of data. Its key idea is parallelization – it splits work across multiple worker nodes and executes it in parallel. Thanks to the opensearch-hadoop package, Spark can read data from and write data to OpenSearch in parallel. What’s more, it does so in a smart way, matching the number of read tasks to the number of shards in the index and having each task read from its shard directly. Importantly, this doesn’t put much more load on the cluster than regular reads, since a cluster-wide query is fanned out to all shards anyway (unless you use shard routing).

In this approach, we would create two Spark jobs. The export job would read the data from the source cluster and save it as JSON Lines files to S3. It would process the data in batches (in our case, it made sense to split the data by time and process it in month-long batches). Batches would run sequentially, and each batch would be parallelized across multiple workers. The second job would read the data from S3 and index it into the target cluster.
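
To give an idea of what the batching looks like, here is a simplified sketch that turns month-long windows into OpenSearch range queries; the created_at field name, date range, and bucket path are illustrative rather than our exact implementation:

from datetime import date
import json

def month_windows(start: date, end: date):
    """Yield (window_start, window_end) pairs, one per calendar month."""
    current = start
    while current < end:
        next_month = date(current.year + (current.month == 12),
                          current.month % 12 + 1, 1)
        yield current, min(next_month, end)
        current = next_month

def month_query(window_start: date, window_end: date) -> str:
    # Limit the export to a single month using a range query on an assumed
    # `created_at` timestamp field.
    return json.dumps({
        "query": {
            "range": {
                "created_at": {
                    "gte": window_start.isoformat(),
                    "lt": window_end.isoformat(),
                }
            }
        }
    })

# Batches run sequentially; within each batch, Spark parallelizes the reads.
for window_start, window_end in month_windows(date(2015, 1, 1), date(2025, 1, 1)):
    query = month_query(window_start, window_end)
    month_output_path = "s3://{{bucket}}/activity-log-export/" + window_start.strftime("%Y-%m")
    # ... run the export for this window (see the snippet later in this post)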

The indexing job would read the files of each batch and index the documents they contain. It would also save a tiny marker file next to the data files on S3, indicating the indexing status of that batch. This let us implement progress tracking and retries in a straightforward way.
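
Below is an equally simplified sketch of the indexing side in PySpark. The output format and configuration key names are our best reading of the opensearch-hadoop connector (mirroring its read-side counterparts), and the boto3 marker-file write is purely illustrative:

import boto3

month_input_path = "s3://{{bucket}}/activity-log-export/{{month}}"

# Read the JSON Lines files of one batch; each line is a raw JSON document.
docs = spark.sparkContext.textFile(month_input_path)

# opensearch-hadoop expects a pair RDD; the key is ignored when indexing raw JSON.
docs.map(lambda doc: ("key", doc)).saveAsNewAPIHadoopFile(
    path="-",  # unused, but required by the API
    outputFormatClass="org.opensearch.hadoop.mr.OpenSearchOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.opensearch.hadoop.mr.LinkedMapWritable",
    conf={
        "opensearch.nodes": "{{target cluster URL}}",
        "opensearch.resource": "{{target index name}}",
        "opensearch.input.json": "true",  # the values are already JSON strings
        # ... other configuration options
    },
)

# Mark the batch as indexed with a tiny status file next to the data files.
boto3.client("s3").put_object(
    Bucket="{{bucket}}",
    Key="activity-log-export/{{month}}/_INDEXED",
    Body=b"done",
)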

Decoupling reading from writing has the advantage of minimizing the time window in which we read from the source cluster; simultaneous reads and writes would obviously take longer than reads alone. It also allowed us to run the export job only during off-peak windows, keeping the source cluster’s load under control.

opensearch-hadoop caveats

opensearch-hadoop turned out to be poorly documented, and we struggled to find usage examples on the internet.

One of the key realizations was to use the RDD (Resilient Distributed Dataset) API instead of the more popular DataFrame API. Initially, we tried to use DataFrames, as instructed by some tutorials, but this turned out to be problematic. When using a DataFrame, Spark tries to determine the schema of the data it reads by sampling the first few documents. This approach didn’t work well for us, given that the schema (index mapping) for Activity Logs is very complex and the documents can take many different forms. As a result, the job would soon fail with an error saying that one of the documents didn’t fit the inferred schema.
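
For context, the DataFrame-based read we started with looked roughly like this (assuming the connector’s "opensearch" data source alias; options abbreviated). The schema inference happens inside load(), which is where things went wrong for us:

# Spark infers a schema by sampling documents here – this is what kept failing
# for our highly variable Activity Log documents.
df = (
    spark.read.format("opensearch")
    .option("opensearch.nodes", "{{cluster URL}}")
    .option("opensearch.query", "{{query}}")
    .load("{{index name}}")
)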

Fortunately, Spark also provides the older RDD interface, which is perfect for use cases where the data is essentially schemaless. In this approach, we simply parse each document as JSON on demand and access whichever fields we need. For this to work, it’s crucial to set the opensearch.output.json config option to true. The code snippet below shows a very simplified version of our export job.

# Assumes an existing SparkSession `spark` with the opensearch-hadoop connector on the classpath.
rdd = spark.sparkContext.newAPIHadoopRDD(
    inputFormatClass="org.opensearch.hadoop.mr.OpenSearchInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.apache.hadoop.io.Text",
    conf={
        "opensearch.nodes": "{{cluster URL}}",
        "opensearch.resource": "{{index name}}",
        "opensearch.query": "{{query}}",
        "opensearch.output.json": "true",  # crucial for raw JSON
        # ... other configuration options
    },
)

# Each record is a (key, value) pair where the value is the raw JSON document.
# Keep only the JSON and save it to S3 as JSON Lines, one file per partition.
rdd.map(lambda x: x[1]).saveAsTextFile(month_output_path)

Another important aspect of the implementation was controlling the speed of the migration. It took us some time to find the relevant opensearch-hadoop settings. We ended up using opensearch.scroll.size for the export job – changing it had a significant impact on both the migration speed and the source cluster load. Interestingly, we found this setting in the documentation for Elasticsearch for Apache Hadoop and changed the prefix from elasticsearch to opensearch as a wild guess – and it turned out to work!
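
For example, it’s just another entry in the export job’s conf dictionary (the value below is illustrative; we arrived at ours by experimenting against the source cluster’s load):

conf = {
    # ... the export configuration shown above
    "opensearch.scroll.size": "10000",  # documents returned per scroll request (illustrative)
}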

The results

Overall, the speed of this approach exceeded our expectations. It turned out that opensearch-hadoop does a great job of parallelizing the data transfer. Transferring 40 TiB of data took under two weeks, a 20x improvement over the time we estimated for the reindex API.

We also observed that the cluster setup greatly influences the migration speed. We had two much smaller clusters to migrate as well, but their migration times didn’t scale down linearly, because those clusters had far fewer nodes and shards. This shows how important Spark’s parallelization is to the overall migration speed.

It’s worth noting that we didn’t have to provision many resources for the Spark cluster itself. Since the migration is mostly I/O-bound, it didn’t require any heavy computation. We ended up using an EMR cluster with just four m7g.xlarge instances (which cost barely $215 for the two weeks), and it was still overprovisioned.

Closing

Given how successful this approach was, we’re now considering it for other OpenSearch migration and backfilling use cases in the company. There are, of course, drawbacks to this method – it requires some Spark expertise on the team (which we gained the hard way, through trial and error and many mistakes). Furthermore, this solution introduces a new piece of infrastructure that we now have to maintain and monitor. Finally, orchestrating Spark jobs is quite cumbersome and required us to write some code just for this task. However, we believe it was well worth it, given that we couldn’t have achieved such migration speeds with any other approach.

Miłosz Piechocki
Software Engineering Tech Lead @ monday.com