Skip to content

You are viewing documentation for Immuta version 2022.1.

For the latest version, view our documentation for Immuta SaaS or the latest self-hosted version.

Databricks JDBC and Notebook Cell Query Audit Logs

Audience: System Administrators, users with the AUDIT permission, and Data Owners

Content Summary: In addition to the executed Spark plan, the tables, and the tables' underlying paths for every audited Spark job, Immuta captures the code or query that triggers the Spark plan. This page outlines this process and provides examples of the captured query and resulting audit record.

For a tutorial on getting these audit logs, see the View Audit Logs page.

Best Practices: Store Audit Records

By default Immuta audit records expire after 60 days, so store audit records outside of Immuta in order to retain the audits long term.

Audit Messages

Each audit message from the Immuta platform will be a one-line JSON object containing the properties listed below. These audit records are stored with the recordType: spark.

Property Description Example
ID integer b0000000-1234-abcd-11111111111111
DateTime integer or string The timestamp for when the record was created. This may be an ISO-8601 timestamp string or an epoch timestamp. 2504188066580 or 2017-08-31T14:01:15.607Z
Month integer 1455
ProfileID integer The profile ID of the user who made the query. 1
UserID string The user ID of the user who made the query. jane.doe@immuta.com
DataSourceID integer The ID of the data source that was queried. 12
DataSourceName string The name of the data source that was queried. Public Customer Data
ProjectID integer The ID of the project the data source is in. 18
ProjectName string The name of the project the data source is in. Project 1
PurposeID integer The ID of the project's purpose(s). 22
RecordType string The type of record captured. Databricks query audit records will always be spark.
Success boolean If true, the query was successful. true or false
Component string The Immuta component that generated the record. nativeSql
AccessType string Indicates whether access was granted to an individual blob or if this was a query potentially encompassing many blobs. query
Query string The query that was run in the integration. See the example below

Example queryText

Below is an example of the queryText, which contains the full notebook cell (since the query was the result of a notebook). If the query had been from a JDBC connection, the queryText would contain the full SQL query.

testTable = 'default.crime_data_delta'
testDb = 'test'

df = spark.table(testTable)
df.limit(1).collect()

filteredDf = df.filter('victim_age > 20')

filteredDf.write.saveAsTable('{}.audit_cell'.format(testDb))
spark.table('{}.audit_cell'.format(testDb)).limit(1).collect()

spark.sql('DROP TABLE IF EXISTS {}.audit_cell'.format(testDb))

This notebook cell had multiple audit records associated with it, but the example audit record in the tab to the right corresponds to the filteredDf.write.saveAsTable('{}.audit_cell'.format(testDb)) line.

Example Audit Record

{
"id": "b0d49f2a-4a34-4d50-b36e-fd9b619eed32",
"dateTime": "1617997828777",
"month": 1455,
"profileId": 1,
"userId": "kris@immuta.com",
"dataSourceId": 41,
"dataSourceName": "Crime Data Delta",
"projectId": 17,
"projectName": "test",
"purposeIds": [
22
],
"count": 1,
"recordType": "spark",
"success": true,
"component": "dataSource",
"accessType": "query",
"query": "'CreateTable `test`.`audit_cell`, ErrorIfExists\n+- Filter (victim_age#8907 > 20)\n   +- ImmutaResolvedTableAlias default.crime_data_delta\n      +- SubqueryAlias spark_catalog.immuta.default_crime_data_delta\n         +- TrustedPlan\n            +- Project [dr_number#8882, area_id#8883, area_name#8884, reporting_district#8885, crime_code#8886, crime_code_description#8887, mo_codes#8888, victim_sex#8889, victim_descent#8890, premise_code#8891, premise_description#8892, weapon_used_code#8893, weapon_description#8894, status_code#8895, status_description#8896, crime_code_1#8897, crime_code_2#8898, crime_code_3#8899, crime_code_4#8900, address#8901, cross_street#8902, location#8903, date_reported#8904, date_occurred#8905, ... 2 more fields]\n               +- SubqueryAlias spark_catalog.default.crime_data_delta\n                  +- Relation[dr_number#8882,area_id#8883,area_name#8884,reporting_district#8885,crime_code#8886,crime_code_description#8887,mo_codes#8888,victim_sex#8889,victim_descent#8890,premise_code#8891,premise_description#8892,weapon_used_code#8893,weapon_description#8894,status_code#8895,status_description#8896,crime_code_1#8897,crime_code_2#8898,crime_code_3#8899,crime_code_4#8900,address#8901,cross_street#8902,location#8903,date_reported#8904,date_occurred#8905,... 2 more fields] parquet\n",
"extra": {
"maskedColumns": {},
"metastoreTables": [
"default.crime_data_delta"
],
"pathUris": [
"dbfs:/user/hive/warehouse/crime_data_delta"
],
"queryText": "testTable = 'default.crime_data_delta'\ntestDb = 'test'\n\ndf = spark.table(testTable)\ndf.limit(1).collect()\n\n\n\n\n\n\n\n\n\n# doing spark things...\n\n\n\n\n\n\n\n\nfilteredDf = df.filter('victim_age > 20')\n\nfilteredDf.write.saveAsTable('{}.audit_cell'.format(testDb))\nspark.table('{}.audit_cell'.format(testDb)).limit(1).collect()\n\nspark.sql('DROP TABLE IF EXISTS {}.audit_cell'.format(testDb))",
"queryLanguage": "python",
"purposes": [
"Re-identification Prohibited.Expert Determination.SDM"
]
},
"dataSourceTableName": "default_crime_data_delta",
"createdAt": "2021-04-09T19:50:28.787Z",
"updatedAt": "2021-04-09T19:50:28.787Z"
}

This example audit record contains two new fields under extra:

  • queryText: The queryText will contain either the full notebook cell (when the query is the result of a notebook) or the full SQL query (when it is a query from a JDBC connection).

  • queryLanguage: The queryLanguage corresponds to the programming language used: SQL, Python, Scala, or R. Audited JDBC queries will indicate that it came from JDBC here.