Auditing JDBC and Notebook Cell Queries in Databricks

Audience: System Administrators and Data Owners

Content Summary: In addition to the executed Spark plan, the tables, and the tables' underlying paths for every audited Spark job, Immuta captures the code or query that triggers the Spark plan. This page outlines this process and provides examples of the captured query and resulting audit record.

Audit Limitations

Immuta will audit queries that come from interactive notebooks, notebook jobs, and JDBC connections, but will not audit Scala or R submit jobs. Furthermore, Immuta only audits Spark jobs that are associated with Immuta tables. Consequently, Immuta will not audit a query in a notebook cell that does not trigger a Spark job, unless immuta.spark.audit.all.queries is set to true; for more details about this configuration and auditing all queries in Databricks, see Limited Enforcement in Databricks.
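If you want to confirm whether a cluster is auditing all queries, the current value of this property can be checked from a notebook. The following is a minimal sketch, assuming the property was applied through the cluster's Spark configuration and is readable through the standard Spark conf API; it only inspects the setting and does not change auditing behavior.

# Minimal sketch: read the current value of the Immuta audit property from the
# Spark runtime configuration. The property itself is normally set in the
# Databricks cluster's Spark configuration, not from a notebook.
audit_all_queries = spark.conf.get('immuta.spark.audit.all.queries', 'false')
print('immuta.spark.audit.all.queries =', audit_all_queries)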

Introduction

Capturing the code or query that triggers the Spark plan makes audit records more useful in assessing what users are doing.

To audit the code or query that triggers the Spark plan, Immuta hooks into Databricks where notebook cells and JDBC queries execute and saves the cell or query text. Then, Immuta pulls this information into the audits of the resulting Spark jobs. Examples of a saved cell/query and the resulting audit record are provided below.

Audit Record and queryText Example

Example queryText

Below is an example of the queryText, which contains the full notebook cell (since the query was the result of a notebook). If the query had been from a JDBC connection, the queryText would contain the full SQL query.

testTable = 'default.crime_data_delta'
testDb = 'test'

df = spark.table(testTable)
df.limit(1).collect()

filteredDf = df.filter('victim_age > 20')

filteredDf.write.saveAsTable('{}.audit_cell'.format(testDb))
spark.table('{}.audit_cell'.format(testDb)).limit(1).collect()

spark.sql('DROP TABLE IF EXISTS {}.audit_cell'.format(testDb))

This notebook cell had multiple audit records associated with it, but the example audit record below corresponds to the filteredDf.write.saveAsTable('{}.audit_cell'.format(testDb)) line.

Example Audit Record

{
"id": "b0d49f2a-4a34-4d50-b36e-fd9b619eed32",
"dateTime": "1617997828777",
"month": 1455,
"profileId": 1,
"userId": "kris@immuta.com",
"dataSourceId": 41,
"dataSourceName": "Crime Data Delta",
"projectId": 17,
"projectName": "test",
"purposeIds": [
    22
],
"count": 1,
"recordType": "spark",
"success": true,
"component": "dataSource",
"accessType": "query",
"query": "'CreateTable `test`.`audit_cell`, ErrorIfExists\n+- Filter (victim_age#8907 > 20)\n   +- ImmutaResolvedTableAlias default.crime_data_delta\n      +- SubqueryAlias spark_catalog.immuta.default_crime_data_delta\n         +- TrustedPlan\n            +- Project [dr_number#8882, area_id#8883, area_name#8884, reporting_district#8885, crime_code#8886, crime_code_description#8887, mo_codes#8888, victim_sex#8889, victim_descent#8890, premise_code#8891, premise_description#8892, weapon_used_code#8893, weapon_description#8894, status_code#8895, status_description#8896, crime_code_1#8897, crime_code_2#8898, crime_code_3#8899, crime_code_4#8900, address#8901, cross_street#8902, location#8903, date_reported#8904, date_occurred#8905, ... 2 more fields]\n               +- SubqueryAlias spark_catalog.default.crime_data_delta\n                  +- Relation[dr_number#8882,area_id#8883,area_name#8884,reporting_district#8885,crime_code#8886,crime_code_description#8887,mo_codes#8888,victim_sex#8889,victim_descent#8890,premise_code#8891,premise_description#8892,weapon_used_code#8893,weapon_description#8894,status_code#8895,status_description#8896,crime_code_1#8897,crime_code_2#8898,crime_code_3#8899,crime_code_4#8900,address#8901,cross_street#8902,location#8903,date_reported#8904,date_occurred#8905,... 2 more fields] parquet\n",
"extra": {
    "maskedColumns": {},
    "metastoreTables": [
    "default.crime_data_delta"
    ],
    "pathUris": [
    "dbfs:/user/hive/warehouse/crime_data_delta"
    ],
    "queryText": "testTable = 'default.crime_data_delta'\ntestDb = 'test'\n\ndf = spark.table(testTable)\ndf.limit(1).collect()\n\n\n\n\n\n\n\n\n\n# doing spark things...\n\n\n\n\n\n\n\n\nfilteredDf = df.filter('victim_age > 20')\n\nfilteredDf.write.saveAsTable('{}.audit_cell'.format(testDb))\nspark.table('{}.audit_cell'.format(testDb)).limit(1).collect()\n\nspark.sql('DROP TABLE IF EXISTS {}.audit_cell'.format(testDb))",
    "queryLanguage": "python",
    "purposes": [
    "Re-identification Prohibited.Expert Determination.SDM"
    ]
},
"dataSourceTableName": "default_crime_data_delta",
"createdAt": "2021-04-09T19:50:28.787Z",
"updatedAt": "2021-04-09T19:50:28.787Z"
}

This example audit record contains two new fields under extra:

  • queryText: The queryText will contain either the full notebook cell (when the query is the result of a notebook) or the full SQL query (when it is a query from a JDBC connection).

  • queryLanguage: The queryLanguage corresponds to the programming language used: SQL, Python, Scala, or R. For audited JDBC queries, this field indicates that the query came from JDBC (see the sketch below).
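For example, if an audit record like the one above has been exported as JSON, these fields can be read back out of the extra section. The snippet below is a minimal sketch, assuming the record has been saved to a local file; the file name audit_record.json is illustrative and not part of Immuta's API.

import json

# Minimal sketch: load an exported audit record (the file name is illustrative)
# and pull the captured query text and language out of the "extra" section.
with open('audit_record.json') as f:
    record = json.load(f)

extra = record.get('extra', {})
print('Language:', extra.get('queryLanguage'))    # e.g. 'python'
print('Tables:', extra.get('metastoreTables'))    # e.g. ['default.crime_data_delta']
print('Query text:')
print(extra.get('queryText', ''))                 # full notebook cell or JDBC SQL query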