Audience: Data Owners and Data Users
Content Summary: Users can access subscribed data sources within their Spark jobs by using SparkSQL with the ImmutaSession class (Spark 2.4). Immuta enforces SparkSQL controls on data platforms that support batch processing workloads. Through this process, all tables are virtual and empty until a query is materialized. When a query is materialized, standard Spark libraries access data from metastore-backed data sources (like Hive and Impala) to retrieve the data from the underlying files stored in HDFS. Other data source types access data using the Query Engine, which proxies the query to the native database technology and automatically enforces policies for each data source.
Security of data sources is enforced both server-side and client-side. Server-side security is provided by an external partitioning service and client-side security is provided by a Java SecurityManager to moderate access to sensitive information.
Audience: Data Users
Content Summary: This page outlines how to use the Immuta SparkSession with spark-submit, spark-shell, and pyspark.
Immuta SparkSession Background: For Spark 2, the Immuta SparkSession must be used to access Immuta data sources. Once the Immuta Spark installation has been completed on your Spark cluster, you can use the special Immuta Spark interfaces detailed below. For data platforms that support batch processing workloads, the Immuta SparkSession allows users to query data sources the same way that they query Hive tables with Spark SQL.
When querying metastore-backed data sources, such as Hive and Impala, the Immuta Session accesses the data directly in HDFS. Other data source types will pass through the Query Engine. In order to take advantage of the performance gains provided by directly acting on the files in HDFS in your Spark jobs, you must create Immuta data sources for metastore-backed data sources with tables that are persisted in HDFS.
For guidance on querying data sources across multiple clusters and/or remote databases, see Leveraging Data on Other Clusters and Databases.
Launch the special immuta-spark-submit interface, and submit jobs just like you would with spark-submit:
First, launch the special immuta-spark-shell interface:
Then, use the immuta variable just like you would spark:
Next, use the immuta format to specify partition information:
The immuta format also supports query pushdown:
Finally, specify the fetch size:
First, launch the special immuta-pyspark interface:
Then, use the immuta variable just like you would spark:
Finally, use the immuta format to specify partition information:
The immuta format also supports query pushdown:
Audience: Data Owners and Data Users
Content Summary: This page details the components of Immuta's Spark ecosystem and policy enforcement.
In Immuta's Spark plugins, policies are enforced at query time much like the Immuta Query Engine.
Outside of Databricks, Immuta's Spark ecosystem is composed of:
Immuta SparkSession
Vulcan Service
Immuta SecurityManager
Immuta NameNode Plugin (optional, HDFS)
All of these components work in conjunction to apply and enforce Immuta policies on data sources queried through Spark.
In Databricks, Immuta's Spark policy enforcement is driven by Spark plugins that operate on a normal SparkSession (i.e., no ImmutaSparkSession class or object).
The Immuta SparkSession is the client-side plugin in the Immuta Spark ecosystem. This plugin is an extension of the open-source SparkSession, but Immuta's SparkSession and the open-source SparkSession have two differences:
Immuta's external and session catalogs
Immuta's logical replanning
The replanning in ImmutaSparkSession occurs in the QueryExecution class. Immuta has an internal version of that class that replaces the different stages of the plan (logical, analyzed, optimized, sparkPlan, and executedPlan) with policy-enforced versions, and the QueryExecution object and resulting SparkPlan (physical plan) trigger audit calls. Additionally, Immuta's implementation of QueryExecution provides a layer of security within the JVM itself to make sure that any sensitive information needed by physical plans is used or stored in a way that the SecurityManager can protect.
Several other Spark internals are implemented in Immuta to organize code in a way that the SecurityManager can prevent access to fields or methods that expose sensitive information.
Non-Databricks Deployments
In non-Databricks deployments, users must use a different object in their code (an instance of ImmutaSparkSession) than the normal SparkSession object to run Immuta Spark jobs. Creating this object is simple and requires only a one- or two-line change in most existing scripts.
In Databricks deployments, Immuta's plugins operate in a more transparent manner than outside of Databricks. Immuta leverages SparkSessionExtensions in Databricks to update the different planning phases in Spark and add Immuta's policies to the target SparkSession objects. This means that in Databricks users do not have to use a different object to interact with Immuta data sources; they simply connect to an Immuta-enabled cluster and do their work as usual.
Immuta updates the Analyzer, Hive Client, and physical planning strategy to ensure that policies are enforced on any user-generated plans and that the user's view of available data sources represents only what they are allowed to see in Immuta.
Because Immuta runs transparently on the cluster and does not require a separate SparkSession object, any administrative users who need to run jobs against raw data on the cluster must set the proper Spark configuration parameters to bypass Immuta's plan resolution plugins. See the configuration documentation for the Databricks installation for more information.
ODBC/JDBC Queries
In Databricks, Spark is the execution layer for any ODBC/JDBC connections to the cluster. This means that when Immuta's plugins are installed, ODBC/JDBC queries submitted to the cluster go through Immuta's plugins during execution. This provides a great deal of functionality for users who wish to connect BI tools directly to the cluster and still have their view of Immuta's data. However, when exposing data sources in Immuta from an Immuta-enabled Databricks cluster, the API token provided to Immuta for exposing the Databricks data source must belong to either an administrative user in Databricks or a privileged user specified in the Immuta configuration on the Databricks cluster.
See Immuta's Databricks installation configuration for more details.
To make the Immuta Spark ecosystem as user-friendly as possible, Immuta's Spark implementation resolves relations by reaching out to the Immuta Web Service instead of resolving relations in the Hive Metastore directly. All queryable Immuta data sources are available to Immuta's Spark plugins.
Cluster-native data sources (Hive, Impala, or Databricks) are queried by directly accessing the files in storage that compose the Metastore table, which is the same type of query execution that occurs in open-source Spark when accessing a table in the Hive Metastore.
Any non-cluster queryable data source in Immuta will be queried from the user's Spark application via JDBC through the Immuta Query Engine. Users can provide query partition information similar to what is available via the JDBC data source in Spark to distribute their query to the Query Engine.
In JDBC data sources, policies are enforced at the Query Engine layer. In cluster data sources, policies are enforced through the following steps:
Plan modification during analysis to include policies using functions/expressions for masking and filters for row-level policies.
Restrictions to field/method access through the Immuta SecurityManager.
In Databricks
Restrictions to storage configuration access via the Immuta SecurityManager. User code cannot access credentials for S3, ADL gen 2, etc. directly, and those configurations are only loadable by the ImmutaSecureFileSystemWrapper class.
Restrictions to the use of AWS instance roles via the Immuta SecurityManager.
Outside Databricks
Partition and file access token generation in the Vulcan Service.
Token validation and filesystem access enforcement in the Immuta NameNode plugin (HDFS).
Token validation and remote object store proxying/enforcement in the Vulcan Service (S3/ADL/etc).
When a user attempts to query any Hive or Impala data source through the Immuta SparkSession, the Immuta catalogs first replace the relation in the user's plan with the proper plan that the data source represents. For example, if the user attempts the query (immuta is an instance of ImmutaSparkSession)
and the customer_purchases data source is composed of this query
and, in Immuta, these columns were selected to expose in this data source
id
first_name
last_name
age
country
ssn
product_id
department
purchase_date
the resulting Spark logical plan would look like this:
After the data source is resolved, the policies specific to the user will be applied to the logical plan. If the policy has masking or filters (row-level, minimization, time filter, etc.), those filters will be applied to all corresponding underlying tables in the plan. For example, consider the following Immuta policies:
Mask using hashing the column ssn for everyone.
Only show rows where user is a member of group that matches the value in the column department for everyone.
The plan would be modified (assume the current user is in the "Toys" and "Home Goods" groups):
In this example, the masked columns (such as ssn) are aliased to their original name after masking is applied. This means that transformations, filters, or functions applied to those columns will be applied to the masked columns. Additionally, filters on the plan are applied before any user transformations or filters, so a user's query cannot modify or subvert the policies applied to the plan.
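The effect of the two example policies can be sketched outside of Spark. The following plain-Python illustration is not Immuta's implementation: the sample rows, the function names, and the choice of SHA-256 as the hashing function are all assumptions made for the example. It shows the row filter running first, the ssn column being masked and kept under its original name, and a user-supplied filter seeing only the masked values.

```python
import hashlib

# Hypothetical rows standing in for the customer_purchases data source.
ROWS = [
    {"id": 1, "ssn": "123-45-6789", "department": "Toys"},
    {"id": 2, "ssn": "987-65-4321", "department": "Garden"},
    {"id": 3, "ssn": "555-12-3456", "department": "Home Goods"},
]


def mask_hash(value):
    # SHA-256 is an assumption for illustration, not necessarily Immuta's hash.
    return hashlib.sha256(value.encode("utf-8")).hexdigest()


def apply_policies(rows, user_groups):
    """Filter rows by group membership, then mask ssn under its original name."""
    secured = []
    for row in rows:
        if row["department"] not in user_groups:
            continue  # row-level policy: department must match one of the user's groups
        masked = dict(row)
        masked["ssn"] = mask_hash(row["ssn"])  # masked value keeps the name "ssn"
        secured.append(masked)
    return secured


def user_filter_by_ssn(secured_rows, ssn):
    """A user-supplied filter; it only ever sees the masked ssn values."""
    return [row for row in secured_rows if row["ssn"] == ssn]


secured = apply_policies(ROWS, {"Toys", "Home Goods"})
print([row["id"] for row in secured])              # [1, 3]
print(user_filter_by_ssn(secured, "123-45-6789"))  # []
```

Because the user's filter runs after the policies, filtering on a raw ssn value matches nothing, and the "Garden" row is never visible to the user's part of the query.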
Immuta does not attempt to change or block optimizations to the Spark Plan via the Catalyst Optimizer.
Spark policies are applied at the lowest possible level in the Spark plan for security reasons, which may lead to different results when applying policies to a Spark plan rather than a Query Engine plan. For instance, in the Query Engine a user may be able to compute a column and then generate a masking policy on that computed column. For security reasons, this is not possible in Spark, so the query may be blocked outright.
Immuta has an implementation of the Java SecurityManager construct, which is required when running Spark jobs with the Immuta SparkSession. When a user's Immuta Spark job starts, it communicates with the Immuta Vulcan Service to get an access token, which can be exchanged for partition information during job planning and execution.
The Vulcan Service checks whether the user's job is running with the SecurityManager enabled; if so, it is allowed to retrieve partitions and access tokens during job execution to temporarily access the underlying data for the table. This data is stored in HDFS or a cloud object store (such as S3 or ADL). During job execution, the SecurityManager restricts when file access tokens can be used and which classes can use them. These restrictions prevent users from attempting to access data outside an approved Immuta Spark plan with policies applied.
The SecurityManager also prevents users from making changes to Spark plans that the Immuta SparkSession has generated. This means that once policies have been applied, users cannot attempt to modify the plan and remove policies that are being enforced via the plan modifications.
The Vulcan Service serves administrative functions in the Spark ecosystem and is only deployed outside of Databricks. The Service has these major responsibilities in Immuta's Spark ecosystem:
Compute partition information for Immuta Spark Jobs
Service administrative requests for Immuta Hadoop Native Workspaces
Act as a proxy to remote storage (S3, Google Storage, etc.) for Immuta Spark jobs
Immuta users do not have access to the underlying data files (like Parquet or ORC files) for the Hive Metastore tables that make up Immuta data sources on-cluster. For this reason, the user's Spark application cannot generate partition information directly because it cannot read file metadata from HDFS or remote storage.
Consequently, the user's Spark job must request partition information from the Vulcan Service, which must be configured in such a way that it can access all raw data that may be the target of Immuta data sources. This configuration should include:
Running the service as a Kerberos principal that is specified in the HDFS NameNode configuration as the Immuta Vulcan user. If this configuration is incorrect, the service will fail to start because it will not have access to the locations in HDFS that it requires. This access is granted dynamically by the Immuta NameNode plugin.
Running the service with S3/Google Storage credentials that have access to the underlying data in remote storage. This configuration should be written in a way that users cannot access the configuration files, but the Vulcan Service user can. Typically this is done by configuring sensitive information in generator.xml on the CLASSPATH for Vulcan and only giving the OS user running the Vulcan service access to that file.
The Vulcan Service serves all native workspace management requests on Hadoop clusters. These requests include:
Workspace creation
Workspace deletion
Derived data source creation from a directory
Determining whether a directory contains supported files (ORC/Parquet)
The Vulcan Service must have access to create Metastore databases to create Immuta native workspace databases and have access in storage (HDFS is handled via the NameNode plugin) to create directories in the configured workspace locations.
The Vulcan Service acts as a proxy to remote storage when Immuta Spark jobs read data from Metastore-based data sources. As mentioned above, the Vulcan Service must have access to credentials for reading data from remote storage to fulfill requests from Immuta Spark jobs to read that data. The proxy adds minimal overhead when reading from remote storage.
The user must present Vulcan with a temporary access token for any target files being read. These temporary tokens are generated by Vulcan during partition generation and protected by the SecurityManager so that users cannot access them directly. The token presented to Vulcan grants access to the raw data via Vulcan's storage proxy endpoints. Vulcan opens a stream to the target object in storage and passes that stream's content back to the client until they are finished reading.
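The general idea of a temporary, file-scoped access token can be illustrated with a short sketch. This is not Vulcan's token format or API: the HMAC-based scheme, the secret, the paths, and the function names below are all hypothetical and only show how a service could issue a token bound to one file and an expiry time, then verify it before opening a stream.

```python
import hashlib
import hmac

# Hypothetical secret known only to the issuing service.
SECRET = b"hypothetical-service-secret"


def issue_token(path, expires_at, secret=SECRET):
    """Return a token that is valid for exactly one path until expires_at (epoch seconds)."""
    message = f"{path}|{expires_at}".encode("utf-8")
    signature = hmac.new(secret, message, hashlib.sha256).hexdigest()
    return f"{expires_at}:{signature}"


def validate_token(token, path, now, secret=SECRET):
    """Check that the token is well formed, unexpired, and was issued for this path."""
    try:
        expires_text, signature = token.split(":", 1)
        expires_at = int(expires_text)
    except ValueError:
        return False  # malformed token
    if now >= expires_at:
        return False  # token has expired
    message = f"{path}|{expires_at}".encode("utf-8")
    expected = hmac.new(secret, message, hashlib.sha256).hexdigest()
    # Constant-time comparison avoids leaking how much of the signature matched.
    return hmac.compare_digest(signature.encode("utf-8"), expected.encode("utf-8"))


token = issue_token("/warehouse/sales/part-0000.parquet", expires_at=1_000_060)
print(validate_token(token, "/warehouse/sales/part-0000.parquet", now=1_000_000))
print(validate_token(token, "/warehouse/other/part-0000.parquet", now=1_000_000))
```

In this sketch a token issued for one file is rejected for any other file and stops working once it expires, which mirrors the property described above: a token grants access only to the target files of an approved plan, and only temporarily.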
Note: The client will read all bytes needed from Vulcan, but Vulcan may read more data from storage than the client needed into its buffers. This may produce warning messages in the Vulcan logs but those are expected, as Vulcan cannot predict the number of bytes needed by the client.
The way Immuta is deployed allows a cluster to service both Immuta and non-Immuta workloads. Although it is recommended that those workloads be segregated, in many cases that is not feasible. However, because of the way Immuta jobs are executed (outside of Databricks), it is clear when a user is attempting to use Immuta and when they are not: Immuta jobs are started with the immuta- prefixed scripts, which are analogous to the out-of-the-box Spark scripts for starting different Spark toolsets (for example, immuta-pyspark instead of pyspark and immuta-spark-submit instead of spark-submit).
These scripts are required because Immuta packages a full deployment of Spark's binaries to override the target Spark classes needed by Immuta's plugins to operate securely. The immuta- prefixed scripts set up environment variables needed by Immuta to execute properly and set other required configuration items that are not the default global values for Spark.
However, even when a non-Immuta user is executing a non-Immuta Spark job, it is possible that the Immuta NameNode plugin is still in the execution path for that job. Please see our configuration documentation for the Immuta NameNode plugin to minimize the overhead or impact on non-Immuta users in a Hadoop cluster (such as setting up ignored paths in HDFS or dynamically determining non-Immuta users or paths).
Note: This does not apply to Databricks. Once a Databricks cluster is Immuta-enabled/configured, Immuta is in the execution path for all jobs, regardless of whether the executing user is an Immuta user.
Audience: Data Users
Content Summary: Immuta's Spark integration can help you leverage data in tables across different clusters and databases in your organization, without having to make permanent copies of the data. This page illustrates the process of running efficient cross-technology joins in Spark.
The code examples on this page are written in Scala using the immuta session variable in Spark 2.4. If you are using Spark 1.6, you can repeat these steps with the ImmutaContext variable, ic.
An Immuta data source for each database table that you wish to join. For guidance on creating these data sources, please refer to .
A working Immuta HDFS/Spark plugin installation on one of your clusters. This is also the cluster that your Spark jobs will run on. For guidance on installing the Immuta plugin, please refer to the .
When joining data across clusters, the most efficient approach is to focus queries on narrower windows of data to eliminate overhead. Although Immuta is not permanently rewriting the data, it still must transport data across a network from a different cluster. For this reason, users are encouraged to avoid overly broad queries.
Suppose you wish to run the query below, where sales refers to an Immuta data source on Cluster A and customer refers to an Immuta data source in Database B. Also assume that the Immuta Spark plugin has been successfully installed on Cluster A.
To eliminate overhead, you join data and calculate sales totals for customers within their first month of registration. The following query calculates first-month sales for customers who registered in April 2018:
To maximize the efficiency of the cross-cluster join query, the first step is to load a partitioned portion of the data into a Spark DataFrame. This will reduce the overhead of the join query, and allow Immuta to calculate an ideal query plan.
First, load the desired sales data from the local Cluster A into a DataFrame named salesDF by passing the desired query to immuta.sql():
Then, load customer data from remote Database B into a DataFrame named customerDF. The syntax to set up the remote DataFrame is slightly different because the user needs to pass in the partitioning configuration. Note that the user defines partitions on the region_id column, which is an integer between 1000 and 2000.
Note: When choosing a partition column, it is important to find a column with a generally even distribution across a known range of values. If you are expecting a large volume of data to be returned from the remote cluster, you can increase the number of partitions to break up the transfers into smaller payloads.
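To see why an evenly distributed column matters, it helps to look at how a numeric range is typically split into per-partition filters. The sketch below approximates the range-splitting approach used by Spark's built-in JDBC data source for non-negative integer bounds; whether the immuta format divides the range in exactly the same way is an assumption, and the partition count of 4 is chosen only for illustration.

```python
def partition_predicates(column, lower, upper, num_partitions):
    """Split [lower, upper) into one WHERE-clause predicate per partition.

    Assumes non-negative integer bounds. The first and last partitions are
    open-ended so that values outside the declared range are still read.
    """
    if num_partitions < 2 or upper - lower < 2:
        raise ValueError("need at least two partitions and a range of at least 2")
    # Never create more partitions than there are distinct values in the range.
    num_partitions = min(num_partitions, upper - lower)
    stride = upper // num_partitions - lower // num_partitions

    predicates = []
    current = lower
    for i in range(num_partitions):
        lower_clause = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper_clause = f"{column} < {current}" if i != num_partitions - 1 else None
        if upper_clause is None:
            predicates.append(lower_clause)
        elif lower_clause is None:
            predicates.append(f"{upper_clause} OR {column} IS NULL")
        else:
            predicates.append(f"{lower_clause} AND {upper_clause}")
    return predicates


for predicate in partition_predicates("region_id", 1000, 2000, 4):
    print(predicate)
```

For a region_id column ranging from 1000 to 2000, each of the four partitions covers a stride of 250 values. If most rows fell into a single stride, one executor would receive most of the data, which is why a column with an even distribution across the range is preferred.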
If you do not partition your query and the remote data is larger than a single executor can handle (which is typical for most workloads), the full local-cluster portion of the query will run. Then, one by one, each Spark executor will attempt to execute the remote query and fail due to memory limitations. As a result, a non-partitioned query takes an extremely long time to fail. For more information, please contact your Immuta Support Professional.
Now that you have defined the filtered and partitioned DataFrames, register them as temporary views that will be used in the join query:
Immuta recognizes these temporary views as queryable tables for the current session. Below is an example of viewing the queryable Immuta tables in the Spark CLI:
Finally, leverage the newly-created temporary views to run the cross-cluster join query:
The following is a possible output in the Spark CLI: