Audience: Data Users
Content Summary: Immuta's Spark integration can help you leverage data in tables across different clusters and databases in your organization, without having to make permanent copies of the data. This page illustrates the process of running efficient cross-technology joins in Spark.
The code examples on this page are written in Scala using the `immuta` session variable in Spark 2.4. If you are using Spark 1.6, you can repeat these steps with the ImmutaContext variable, `ic`.
Prerequisites:
- An Immuta data source for each database table that you wish to join. For guidance on creating these data sources, please refer to this tutorial.
- A working Immuta HDFS/Spark plugin installation on one of your clusters. This is also the cluster that your Spark jobs will run on. For guidance on installing the Immuta plugin, please refer to the Hadoop Installation Guide.
When joining data across clusters, the most efficient approach is to focus queries on narrower windows of data to reduce overhead. Although Immuta does not permanently rewrite the data, it must still transport data across the network from the remote cluster. For this reason, you should avoid overly broad queries.
Suppose you wish to run the query below, where `sales` refers to an Immuta data source on Cluster A and `customer` refers to an Immuta data source backed by remote Database B. Also assume that the Immuta Spark plugin has been successfully installed on Cluster A.
To limit the amount of data transferred, the join is restricted to each customer's first month after registration. The following query calculates first-month sales for customers who registered in April 2018:
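Conceptually, the query might look like the sketch below, passed to `immuta.sql()`. All table and column names here (`customer_id`, `name`, `amount`, `sale_date`, `registration_date`) are illustrative assumptions; substitute the schema of your own data sources.

```scala
// Illustrative only: table and column names are assumptions, not part of your schema.
// `sales` lives on local Cluster A; `customer` is backed by remote Database B.
val firstMonthSales = immuta.sql(
  """SELECT c.customer_id,
    |       c.name,
    |       SUM(s.amount) AS first_month_sales
    |FROM sales s
    |JOIN customer c
    |  ON s.customer_id = c.customer_id
    |WHERE c.registration_date BETWEEN '2018-04-01' AND '2018-04-30'
    |  AND s.sale_date BETWEEN c.registration_date
    |                      AND date_add(c.registration_date, 30)
    |GROUP BY c.customer_id, c.name""".stripMargin)
```

Running this directly against the remote data source would transfer more data than necessary, so the steps below break it into filtered, partitioned pieces first.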
To maximize the efficiency of the cross-cluster join query, the first step is to load a partitioned portion of the data into a Spark DataFrame. This reduces the overhead of the join query and allows Immuta to calculate an efficient query plan.
First, load the desired `sales` data from the local Cluster A into a DataFrame named `salesDF` by passing the desired query to `immuta.sql()`:
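For example, assuming the illustrative schema above, you might restrict the local load to the window of sale dates that can contain first-month purchases for April 2018 registrants:

```scala
// Load only the local sales rows that can contribute to the result:
// first-month purchases for April 2018 registrants fall between
// 2018-04-01 and 2018-05-30. Column names are illustrative.
val salesDF = immuta.sql(
  """SELECT customer_id, amount, sale_date
    |FROM sales
    |WHERE sale_date BETWEEN '2018-04-01' AND '2018-05-30'""".stripMargin)
```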
Then, load the `customer` data from remote Database B into a DataFrame named `customerDF`. The syntax to set up the remote DataFrame is a little different because you need to pass in the partitioning configuration. Note that in this example the partitions are defined on the `region_id` column, which is an integer between 1000 and 2000:
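The exact reader options are Immuta-specific; the sketch below assumes partitioning options modeled on Spark's standard JDBC options (`partitionColumn`, `lowerBound`, `upperBound`, `numPartitions`), so check the option names your Immuta version expects before using it.

```scala
// Sketch only: option names mirror Spark's JDBC-style partitioning options
// and may differ in your Immuta version. Partitioning on region_id
// (values 1000-2000) splits the remote transfer into smaller payloads.
val customerDF = immuta.read
  .option("partitionColumn", "region_id")
  .option("lowerBound", "1000")
  .option("upperBound", "2000")
  .option("numPartitions", "10")
  .table("customer")
  .where("registration_date BETWEEN '2018-04-01' AND '2018-04-30'") // illustrative column
```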
Note: When choosing a partition column, it is important to find a column with a generally even distribution across a known range of values. If you are expecting a large volume of data to be returned from the remote cluster, you can increase the number of partitions to break up the transfers into smaller payloads.
If you do not partition your query and the remote data is larger than a single executor can handle (which is typical of most workloads), the full local-cluster portion of the query will run first. Then each Spark executor, one by one, will attempt to execute the remote query and fail due to memory limitations. As a result, a non-partitioned query can take an extremely long time to fail. For more information, please contact your Immuta Support Professional.
Now that you have defined the filtered and partitioned DataFrames, register them as temporary views that will be used in the join query:
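For example, using view names chosen for this walkthrough:

```scala
// Register both DataFrames so they can be referenced by name in Spark SQL.
salesDF.createOrReplaceTempView("sales_filtered")
customerDF.createOrReplaceTempView("customer_filtered")
```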
Immuta recognizes these temporary views as queryable tables for the current session. Below is an example of viewing the queryable Immuta tables in the Spark CLI:
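For instance, listing the tables visible to the current session (the exact listing depends on the data sources exposed to you):

```scala
immuta.sql("SHOW TABLES").show()
// +--------+-----------------+-----------+
// |database|        tableName|isTemporary|
// +--------+-----------------+-----------+
// |        |   sales_filtered|       true|
// |        |customer_filtered|       true|
// +--------+-----------------+-----------+
// ...plus any Immuta data sources, such as sales and customer, exposed to the session.
```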
Finally, leverage the newly-created temporary views to run the cross-cluster join query:
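Continuing the illustrative example:

```scala
// Join the pre-filtered, partitioned views; the partitions defined on
// customerDF let Immuta break the remote transfer into smaller pieces.
val firstMonthSalesDF = immuta.sql(
  """SELECT c.customer_id,
    |       c.name,
    |       SUM(s.amount) AS first_month_sales
    |FROM sales_filtered s
    |JOIN customer_filtered c
    |  ON s.customer_id = c.customer_id
    |WHERE s.sale_date BETWEEN c.registration_date
    |                      AND date_add(c.registration_date, 30)
    |GROUP BY c.customer_id, c.name""".stripMargin)

firstMonthSalesDF.show()
```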
The following is a possible output in the Spark CLI:
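Because the actual rows depend entirely on your data, only the shape of the result DataFrame from the illustrative query above is shown:

```
+-----------+----+-----------------+
|customer_id|name|first_month_sales|
+-----------+----+-----------------+
|        ...| ...|              ...|
+-----------+----+-----------------+
```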