Leveraging Data on Other Clusters and Databases

Audience: Data Users

Content Summary: Immuta's Spark integration can help you leverage data in tables across different clusters and databases in your organization, without having to make permanent copies of the data. This page illustrates the process of running efficient cross-technology joins in Spark.

The code examples on this page are written in Scala using the immuta session variable in Spark 2.4. If you are using Spark 1.6, you can repeat these steps with the ImmutaContext variable, ic.

Prerequisites

  • An Immuta data source for each database table that you wish to join. For guidance on creating these data sources, please refer to this tutorial.
  • A working Immuta HDFS/Spark plugin installation on one of your clusters. Your Spark jobs will run on this cluster. For guidance on installing the Immuta plugin, please refer to the Hadoop Installation Guide.

Cross-cluster Joins

When joining data across clusters, the most efficient approach is to restrict queries to narrow windows of data. Although Immuta does not make permanent copies of the data, it must still transport the data across the network from the other cluster. For this reason, users are encouraged to avoid overly broad queries.

Suppose you wish to run the query below, where sales refers to an Immuta data source on Cluster A and customer refers to an Immuta data source on Database B. Also assume that the Immuta Spark plugin has been successfully installed on Cluster A.

To limit overhead, the join is restricted to a narrow window of data: sales made by customers within their first month of registration. The following query calculates first-month sales for customers who registered in April 2018:

SELECT
   s.customer_id, c.id, c.registration_date,
   sum(s.sale_price) total_sales
FROM
    sales s, customer c
WHERE
    s.customer_id = c.id
    and s.sale_date < 20180501
    and s.sale_date > 20180331
    and c.registration_date < 20180501
    and c.registration_date > 20180331
GROUP BY
    s.customer_id, c.id, c.registration_date
ORDER BY
    c.id

Step 1: Load Tables into Spark DataFrames

To maximize the efficiency of the cross-cluster join query, the first step is to load a partitioned portion of the data into a Spark DataFrame. This will reduce the overhead of the join query, and allow Immuta to calculate an ideal query plan.

First, load the desired sales data from the local Cluster A into a DataFrame named salesDF by passing the desired query to immuta.sql():

val salesQuery = """SELECT
customer_id, sale_price, sale_date, region_id
FROM sales
WHERE sale_date < 20180501 and sale_date > 20180331"""

val salesDF = immuta.sql(salesQuery)
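The date window in the query above can also be generated rather than typed by hand. The following is a minimal sketch, assuming that sale_date is stored as an integer in yyyyMMdd form (as the literals 20180331 and 20180501 suggest). The MonthWindow object and its bounds and salesQuery helpers are hypothetical names introduced for illustration; they are not part of Immuta or Spark.

```scala
import java.time.YearMonth
import java.time.format.DateTimeFormatter

// Hypothetical helper: builds the exclusive yyyyMMdd bounds for one calendar month.
object MonthWindow {
  // BASIC_ISO_DATE formats a LocalDate as yyyyMMdd, e.g. 20180331
  private val fmt = DateTimeFormatter.BASIC_ISO_DATE

  // Returns (last day of the previous month, first day of the next month),
  // so that "date > lower AND date < upper" selects exactly the given month.
  def bounds(year: Int, month: Int): (Int, Int) = {
    val ym = YearMonth.of(year, month)
    val lower = ym.atDay(1).minusDays(1).format(fmt).toInt
    val upper = ym.plusMonths(1).atDay(1).format(fmt).toInt
    (lower, upper)
  }

  // Assumes sale_date is an integer column in yyyyMMdd form.
  def salesQuery(year: Int, month: Int): String = {
    val (lower, upper) = bounds(year, month)
    s"""SELECT customer_id, sale_price, sale_date, region_id
       |FROM sales
       |WHERE sale_date > $lower AND sale_date < $upper""".stripMargin
  }
}
```

With this in scope, immuta.sql(MonthWindow.salesQuery(2018, 4)) would request the same April 2018 window as the hand-written query.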

Then, load customer data from remote Database B into a DataFrame named customerDF. The syntax for setting up the remote DataFrame is slightly different because you must also pass in the partitioning configuration. In this example, the partitions are defined on the region_id column, which is an integer between 1000 and 2000.

Note: When choosing a partition column, it is important to find a column with a generally even distribution across a known range of values. If you are expecting a large volume of data to be returned from the remote cluster, you can increase the number of partitions to break up the transfers into smaller payloads.

val customerQuery = """(SELECT
id, region_id, registration_date
FROM customer WHERE registration_date < 20180501 and registration_date > 20180331)
as customer_tmp"""

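// Read from the remote data source, splitting the transfer into partitions
val customerReader = immuta.read.format("immuta")
  .option("dbtable", customerQuery)        // subquery aliased as customer_tmp
  .option("partitionColumn", "region_id")  // integer column used to split the read
  .option("lowerBound", "1000")            // low end of the region_id range
  .option("upperBound", "2000")            // high end of the region_id range
  .option("numPartitions", "3")            // number of partitioned transfers
val customerDF = customerReader.load()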
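To get a feel for how the range is divided, the sketch below computes one predicate per partition. It assumes that the immuta format divides the range the same way Spark's built-in JDBC source does; the option names are identical, but this behavior is an assumption and is not confirmed by this page. PartitionSketch and partitionPredicates are hypothetical names, and the logic is simplified (it requires at least two partitions).

```scala
// Hypothetical sketch of JDBC-style range partitioning. Assumption: the
// immuta reader splits [lowerBound, upperBound) the way Spark's JDBC source does.
object PartitionSketch {
  def partitionPredicates(
      column: String,
      lowerBound: Long,
      upperBound: Long,
      numPartitions: Int): Seq[String] = {
    require(numPartitions >= 2, "this simplified sketch needs at least two partitions")
    require(upperBound - lowerBound >= numPartitions, "range is too small to split")

    // Width of each partition's value range (integer division, as in Spark's JDBC source)
    val stride = upperBound / numPartitions - lowerBound / numPartitions

    (0 until numPartitions).map { i =>
      val lo = lowerBound + i * stride
      val hi = lo + stride
      if (i == 0) s"$column < $hi OR $column IS NULL"      // first partition is open below
      else if (i == numPartitions - 1) s"$column >= $lo"   // last partition is open above
      else s"$column >= $lo AND $column < $hi"
    }
  }
}
```

For the options used above, PartitionSketch.partitionPredicates("region_id", 1000L, 2000L, 3) yields three ranges split at 1333 and 1666. Under these semantics the bounds only decide where the splits fall; rows outside the range would still be returned, in the first or last partition, which is one reason an evenly distributed column matters.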

If you do not partition your query and the remote data is larger than a single executor can handle (which is typical for most workloads), the full local-cluster portion of the query will run first. Then, one by one, each Spark executor will attempt to execute the remote query and fail because of memory limitations. As a result, a non-partitioned query can take an extremely long time to fail. For more information, please contact your Immuta Support Professional.

Step 2: Register Temporary Views of Filtered Data

Now that you have defined the filtered and partitioned DataFrames, register them as temporary views that will be used in the join query:

salesDF.createOrReplaceTempView("sales_tmp")
customerDF.createOrReplaceTempView("customer_tmp")

Immuta recognizes these temporary views as queryable tables for the current session. Below is an example of viewing the queryable Immuta tables in the Spark CLI:

scala> immuta.sql("show tables").show()
+--------+---------------+-----------+
|database|      tableName|isTemporary|
+--------+---------------+-----------+
|  immuta|       customer|      false|
|  immuta|          sales|      false|
|        |   customer_tmp|       true|
|        |      sales_tmp|       true|
+--------+---------------+-----------+

Step 3: Run the Join Query

Finally, leverage the newly-created temporary views to run the cross-cluster join query:

val joinQuery = """SELECT
s.customer_id, c.registration_date,
sum(s.sale_price) total_sales
FROM sales_tmp s, customer_tmp c
WHERE s.customer_id = c.id
GROUP BY s.customer_id, c.id, c.registration_date
ORDER BY c.id"""

val joinDF = immuta.sql(joinQuery)
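The same join can also be expressed with the standard Spark DataFrame API instead of a SQL string. The following is a sketch only: FirstMonthSales and firstMonthSales are hypothetical names, and it assumes the two inputs carry the columns selected in Step 1 (customer_id and sale_price for sales; id and registration_date for customers).

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, sum}

// Hypothetical helper: DataFrame API equivalent of the join query above.
object FirstMonthSales {
  // sales must contain customer_id and sale_price;
  // customers must contain id and registration_date.
  def firstMonthSales(sales: DataFrame, customers: DataFrame): DataFrame =
    sales
      .join(customers, sales("customer_id") === customers("id"))
      .groupBy(sales("customer_id"), customers("id"), customers("registration_date"))
      .agg(sum(sales("sale_price")).as("total_sales"))
      .orderBy(col("id")) // same ordering as ORDER BY c.id
      .drop("id")         // keep the three columns returned by the SQL version
}
```

Calling FirstMonthSales.firstMonthSales(salesDF, customerDF) on the filtered DataFrames from Step 1 should return the same three columns as joinDF, without registering the temporary views.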

The following is a possible output in the Spark CLI:

scala> joinDF.show()
+-----------+-----------------+-----------+
|customer_id|registration_date|total_sales|
+-----------+-----------------+-----------+
|   00000001|         20180427|    1005.40|
|   00000002|         20180411|      80.82|
|   00000003|         20180412|       9.00|
|   00000004|         20180409|     768.09|
|   00000005|         20180421|     534.20|
|   00000006|         20180429|    3218.28|
|   00000007|         20180403|    1076.20|
|   00000008|         20180422|     632.45|
|   00000009|         20180417|      76.50|
|   00000010|         20180428|     598.12|
|   00000011|         20180425|       9.99|
|   00000012|         20180405|      54.90|
|   00000013|         20180410|    2602.97|
|   00000014|         20180416|      16.02|
|   00000015|         20180413|     576.90|
|   00000016|         20180419|      12.39|
|   00000017|         20180401|    2280.92|
|   00000018|         20180418|     209.71|
|   00000019|         20180414|    1140.46|
|   00000020|         20180416|     342.89|
+-----------+-----------------+-----------+
only showing top 20 rows