Using the ImmutaContext (Spark 1.6)

Audience: Data Users

Content Summary: This page outlines how to initialize and use the ImmutaContext with spark-submit, spark-shell, and pyspark. This page also demonstrates how to use other Spark data sources and provides a Spark Submit script.

ImmutaContext Background: For Spark 1.6, the ImmutaContext must be used to access Immuta data sources. Once the Immuta Spark Installation is complete on your Spark cluster, you can use the ImmutaContext class. For data storage technologies that support batch processing workloads, the ImmutaContext class allows users to query data sources the same way that they query Hive tables with the HiveContext.

When querying metastore-backed data sources, such as Hive and Impala, the ImmutaContext accesses the data directly in HDFS; other data source types pass through the Query Engine. To take advantage of the performance gains of acting directly on HDFS files in your Spark jobs, you must create Immuta data sources for metastore-backed data sources whose tables are persisted in HDFS.

For guidance on querying data sources across multiple clusters and/or remote databases, see Leveraging Data on Other Clusters and Databases.

Initialize ImmutaContext

Python:

from pyspark import SparkContext
from pyspark.sql import HiveContext
from ImmutaContext import ImmutaContext

sc = SparkContext()
sql_context = HiveContext(sc)
ic = ImmutaContext(sql_context)

Scala:

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val ic = new org.apache.spark.sql.immuta.ImmutaContext(sqlContext)

Using the ImmutaContext

The ImmutaContext can be used the same way that you would use the SQLContext to create DataFrames.

from pyspark import SparkContext
from pyspark.sql import HiveContext
from ImmutaContext import ImmutaContext

sc = SparkContext()
sql_context = HiveContext(sc)
ic = ImmutaContext(sql_context)

# List tables
ic.tables().show()

# Query table using Spark SQL
ic.sql('SELECT * FROM data_source LIMIT 10').show()
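
Because the ImmutaContext mirrors the HiveContext API, standard DataFrame operations work on the results. A minimal sketch, assuming an Immuta data source named data_source with a numeric value column (both hypothetical):

df = ic.sql('SELECT * FROM data_source')

# Inspect the schema Immuta exposes for the data source
df.printSchema()

# Standard Spark 1.6 DataFrame transformations apply as usual
df.where(df['value'] > 100).select('value').show(5)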

With spark-submit

When using spark-submit, the ImmutaContext must be enabled in your Spark configuration. Typically this is done in your spark-defaults.conf, but the settings may also be passed as command-line options.

spark-submit \
    --conf "spark.broadcast.factory=org.apache.spark.broadcast.ImmutaSerializableBroadcastFactory" \
    --conf "spark.executor.extraClassPath=/opt/cloudera/parcels/IMMUTA/lib/immuta-hadoop-filesystem.jar:/opt/cloudera/parcels/IMMUTA/lib/immuta-spark-context.jar" \
    --conf "spark.driver.extraClassPath=/opt/cloudera/parcels/IMMUTA/lib/immuta-hadoop-filesystem.jar:/opt/cloudera/parcels/IMMUTA/lib/immuta-spark-context.jar" \
    --conf "spark.driver.extraJavaOptions=-Djava.security.manager=com.immuta.security.ImmutaSecurityManager -Dimmuta.security.manager.classes.config=/user/immuta/allowedCallingClasses.json" \
    --conf "spark.executor.extraJavaOptions=-Djava.security.manager=com.immuta.security.ImmutaSecurityManager -Dimmuta.security.manager.classes.config=/user/immuta/allowedCallingClasses.json" \
    --conf "spark.hadoop.fs.hdfs.impl=com.immuta.hadoop.ImmutaSparkTokenFileSystem"
    <job>
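
These same settings can instead live in spark-defaults.conf so they do not have to be repeated on every invocation. A sketch using the values from the command above:

spark.broadcast.factory           org.apache.spark.broadcast.ImmutaSerializableBroadcastFactory
spark.executor.extraClassPath     /opt/cloudera/parcels/IMMUTA/lib/immuta-hadoop-filesystem.jar:/opt/cloudera/parcels/IMMUTA/lib/immuta-spark-context.jar
spark.driver.extraClassPath       /opt/cloudera/parcels/IMMUTA/lib/immuta-hadoop-filesystem.jar:/opt/cloudera/parcels/IMMUTA/lib/immuta-spark-context.jar
spark.driver.extraJavaOptions     -Djava.security.manager=com.immuta.security.ImmutaSecurityManager -Dimmuta.security.manager.classes.config=/user/immuta/allowedCallingClasses.json
spark.executor.extraJavaOptions   -Djava.security.manager=com.immuta.security.ImmutaSecurityManager -Dimmuta.security.manager.classes.config=/user/immuta/allowedCallingClasses.json
spark.hadoop.fs.hdfs.impl         com.immuta.hadoop.ImmutaSparkTokenFileSystem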

NOTE: If you are using Python, ensure that /opt/cloudera/parcels/IMMUTA/python is added to your PYTHONPATH. This can be set in your spark-env.sh.
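
A minimal sketch of that spark-env.sh addition (adjust the path if your Immuta parcel is installed elsewhere):

export PYTHONPATH="/opt/cloudera/parcels/IMMUTA/python:$PYTHONPATH"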

With spark-shell

When using spark-shell, the ImmutaContext must be enabled in your Spark configuration. Typically this is done in your spark-defaults.conf, but the settings may also be passed as command-line options.

spark-shell \
    --conf "spark.broadcast.factory=org.apache.spark.broadcast.ImmutaSerializableBroadcastFactory" \
    --conf "spark.executor.extraClassPath=/opt/cloudera/parcels/IMMUTA/lib/immuta-hadoop-filesystem.jar:/opt/cloudera/parcels/IMMUTA/lib/immuta-spark-context.jar" \
    --conf "spark.driver.extraClassPath=/opt/cloudera/parcels/IMMUTA/lib/immuta-hadoop-filesystem.jar:/opt/cloudera/parcels/IMMUTA/lib/immuta-spark-context.jar" \
    --conf "spark.driver.extraJavaOptions=-Djava.security.manager=com.immuta.security.ImmutaSecurityManager -Dimmuta.security.manager.classes.config=/user/immuta/allowedCallingClasses.json" \
    --conf "spark.executor.extraJavaOptions=-Djava.security.manager=com.immuta.security.ImmutaSecurityManager -Dimmuta.security.manager.classes.config=/user/immuta/allowedCallingClasses.json" \
    --conf "spark.hadoop.fs.hdfs.impl=com.immuta.hadoop.ImmutaSparkTokenFileSystem"

With pyspark

When using pyspark, the ImmutaContext must be enabled in your Spark configuration. Typically this is done in your spark-defaults.conf, but the settings may also be passed as command-line options.

pyspark \
    --conf "spark.broadcast.factory=org.apache.spark.broadcast.ImmutaSerializableBroadcastFactory" \
    --conf "spark.executor.extraClassPath=/opt/cloudera/parcels/IMMUTA/lib/immuta-hadoop-filesystem.jar:/opt/cloudera/parcels/IMMUTA/lib/immuta-spark-context.jar" \
    --conf "spark.driver.extraClassPath=/opt/cloudera/parcels/IMMUTA/lib/immuta-hadoop-filesystem.jar:/opt/cloudera/parcels/IMMUTA/lib/immuta-spark-context.jar" \
    --conf "spark.driver.extraJavaOptions=-Djava.security.manager=com.immuta.security.ImmutaSecurityManager -Dimmuta.security.manager.classes.config=/user/immuta/allowedCallingClasses.json" \
    --conf "spark.executor.extraJavaOptions=-Djava.security.manager=com.immuta.security.ImmutaSecurityManager -Dimmuta.security.manager.classes.config=/user/immuta/allowedCallingClasses.json" \
    --conf "spark.hadoop.fs.hdfs.impl=com.immuta.hadoop.ImmutaSparkTokenFileSystem"

NOTE: Ensure that /opt/cloudera/parcels/IMMUTA/python is added to your PYTHONPATH. This can be set in your spark-env.sh (see the sketch in the spark-submit section above).

Passing Extra Java Options

spark-submit works as usual; however, if you pass in your own spark.driver.extraJavaOptions, you will override the Immuta options. For the ImmutaContext to function properly, you must include the following Immuta configuration as well:

-Djava.security.manager=com.immuta.security.ImmutaSecurityManager -Dimmuta.security.manager.classes.config=/user/immuta/allowedCallingClasses.json

For example, if you were passing in -Duser.timezone you should pass that in along with the required Immuta options:

spark-submit --conf spark.driver.extraJavaOptions="-Duser.timezone=PST
    -Djava.security.manager=com.immuta.security.ImmutaSecurityManager
    -Dimmuta.security.manager.classes.config=/user/immuta/allowedCallingClasses.json" \
    <job>

Note that in some environments the value for immuta.security.manager.classes.config will differ. For example, when using a high-availability NameNode, you may need to prefix the file path with hdfs://nameservice1/. Check with your Immuta Administrator to determine which value you should use.
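
For example, on a cluster whose HA nameservice is nameservice1, the option might look like the following (illustrative; confirm the exact value for your cluster):

-Dimmuta.security.manager.classes.config=hdfs://nameservice1/user/immuta/allowedCallingClasses.json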

Using Other Spark Data Sources

Users can connect to other Spark data sources by specifying the data source type and its configuration options when creating a DataFrame.

The example below uses the hbase-spark connector to create a DataFrame using data directly from an HBase server.

from pyspark import SparkContext
from pyspark.sql import HiveContext
from ImmutaContext import ImmutaContext

sc = SparkContext()
sql_context = HiveContext(sc)
ic = ImmutaContext(sql_context)

hbdf = (ic
    .read
    .format('org.apache.hadoop.hbase.spark')                 # hbase-spark connector
    .option('hbase.table', 'demo_hbase_table')               # source HBase table
    .option('hbase.columns.mapping', 'id STRING :key, value STRING cf1:val')
    .option('hbase.use.hbase.context', 'false')              # pass as a string; reader options are strings in Spark 1.6
    .option('hbase.config.resources', '/etc/hbase/conf/hbase-site.xml')
    .load())
hbdf.show()
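
The resulting DataFrame behaves like any other. A short sketch of follow-on operations, using the id and value columns defined by the mapping above:

# Inspect the schema produced by the columns mapping
hbdf.printSchema()

# Standard DataFrame transformations apply to the HBase-backed DataFrame
hbdf.where(hbdf['value'].isNotNull()).select('id', 'value').show(10)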

Spark-submit Script

#!/bin/bash

SPARK_CLASSPATH=/opt/cloudera/parcels/IMMUTA/lib/immuta-hadoop-filesystem.jar \
PYTHONPATH=/opt/cloudera/parcels/IMMUTA/python/context \
spark-submit \
    --conf "spark.broadcast.factory=org.apache.spark.broadcast.ImmutaSerializableBroadcastFactory" \
    --conf "spark.executor.extraClassPath=/opt/cloudera/parcels/IMMUTA/lib/immuta-hadoop-filesystem.jar:/opt/cloudera/parcels/IMMUTA/lib/immuta-spark-context.jar" \
    --conf "spark.driver.extraClassPath=/opt/cloudera/parcels/IMMUTA/lib/immuta-hadoop-filesystem.jar:/opt/cloudera/parcels/IMMUTA/lib/immuta-spark-context.jar" \
    --conf "spark.driver.extraJavaOptions=-Djava.security.manager=com.immuta.security.ImmutaSecurityManager -Dimmuta.security.manager.classes.config=/user/immuta/allowedCallingClasses.json" \
    --conf "spark.executor.extraJavaOptions=-Djava.security.manager=com.immuta.security.ImmutaSecurityManager -Dimmuta.security.manager.classes.config=/user/immuta/allowedCallingClasses.json" \
    --conf "spark.hadoop.fs.hdfs.impl=com.immuta.hadoop.ImmutaSparkTokenFileSystem" \
    "$@"
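
Saved as, say, immuta-spark-submit.sh and made executable, the wrapper takes the same arguments as spark-submit (the script and job names here are hypothetical):

chmod +x immuta-spark-submit.sh
./immuta-spark-submit.sh --master yarn my_job.py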