# Databricks JDBC and Notebook Cell Query Audit Logs
**Audience**: System Administrators, users with the `AUDIT` permission, and Data Owners

**Content Summary**: In addition to the executed Spark plan, the tables, and the tables' underlying paths for every audited Spark job, Immuta captures the code or query that triggers the Spark plan. This page outlines this process and provides examples of the captured query and the resulting audit record.
For a tutorial on getting these audit logs, see the View Audit Logs page.
**Best Practice: Store Audit Records**

By default, Immuta audit records expire after 60 days. Store audit records outside of Immuta to retain them long term.
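One way to do this is to pull records on a schedule and append them to durable storage. The sketch below is illustrative only: the `/audit` endpoint path, the `Authorization` header format, and the `size`/`offset` paging parameters are assumptions to verify against your deployment's Immuta API documentation.

```python
# Illustrative sketch of exporting audit records before they expire.
# ASSUMPTIONS: the /audit endpoint, the Authorization header format, and
# the size/offset paging parameters -- verify against your Immuta API docs.
import json

import requests

IMMUTA_URL = "https://immuta.example.com"  # hypothetical hostname
API_KEY = "<your-api-key>"


def export_audit_records(out_path: str, page_size: int = 100) -> None:
    """Append every available audit record to out_path as one-line JSON."""
    offset = 0
    with open(out_path, "a") as out:
        while True:
            resp = requests.get(
                f"{IMMUTA_URL}/audit",
                headers={"Authorization": API_KEY},
                params={"size": page_size, "offset": offset},
            )
            resp.raise_for_status()
            records = resp.json().get("hits", [])
            if not records:
                break
            for record in records:
                out.write(json.dumps(record) + "\n")
            offset += page_size


export_audit_records("immuta_audit_backup.jsonl")
```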
## Audit Messages
Each audit message from the Immuta platform will be a one-line JSON object containing the properties listed below.
These audit records are stored with the `recordType` `spark`.
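Because each message is a single line of JSON, a stored log can be filtered with a few lines of code. A minimal sketch, assuming records were saved one per line to the hypothetical `immuta_audit_backup.jsonl` file from the export sketch above:

```python
import json

# Keep only Databricks query audit records (recordType == "spark").
spark_records = []
with open("immuta_audit_backup.jsonl") as f:
    for line in f:
        record = json.loads(line)
        if record.get("recordType") == "spark":
            spark_records.append(record)

# Summarize who queried which data source, and when.
for record in spark_records:
    print(record["dateTime"], record["userId"], record["dataSourceName"])
```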
| Property | Description | Example |
|---|---|---|
| `id` | **string** The unique ID of the audit record. | `b0000000-1234-abcd-11111111111111` |
| `dateTime` | **integer or string** The timestamp for when the record was created. This may be an ISO-8601 timestamp string or an epoch timestamp (see the parsing sketch after the table). | `2504188066580` or `2017-08-31T14:01:15.607Z` |
| `month` | **integer** The month the record was created. | `1455` |
| `profileId` | **integer** The profile ID of the user who made the query. | `1` |
| `userId` | **string** The user ID of the user who made the query. | `jane.doe@immuta.com` |
| `dataSourceId` | **integer** The ID of the data source that was queried. | `12` |
| `dataSourceName` | **string** The name of the data source that was queried. | `Public Customer Data` |
| `projectId` | **integer** The ID of the project the data source is in. | `18` |
| `projectName` | **string** The name of the project the data source is in. | `Project 1` |
| `purposeIds` | **integer** The ID of the project's purpose(s). | `22` |
| `recordType` | **string** The type of record captured. | Databricks query audit records will always be `spark`. |
| `success` | **boolean** If `true`, the query was successful. | `true` or `false` |
| `component` | **string** The Immuta component that generated the record. | `nativeSql` |
| `accessType` | **string** Indicates whether access was granted to an individual blob or if this was a query potentially encompassing many blobs. | `query` |
| `query` | **string** The query that was run in the integration. | See the example below. |
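Since `dateTime` can arrive in either of the shapes shown above, consumers should normalize it before use. A minimal sketch of one way to do that:

```python
from datetime import datetime, timezone


def parse_record_datetime(value):
    """Normalize an audit record's dateTime to a timezone-aware datetime."""
    try:
        # Epoch timestamps are in milliseconds, e.g. "1617997828777".
        return datetime.fromtimestamp(int(value) / 1000, tz=timezone.utc)
    except (TypeError, ValueError):
        # Otherwise assume ISO-8601, e.g. "2017-08-31T14:01:15.607Z".
        return datetime.fromisoformat(str(value).replace("Z", "+00:00"))


parse_record_datetime("1617997828777")             # from the example record below
parse_record_datetime("2017-08-31T14:01:15.607Z")  # ISO-8601 form
```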
## Example queryText

Below is an example of the `queryText`, which contains the full notebook cell, since the query was the result of a notebook. If the query had been from a JDBC connection, the `queryText` would contain the full SQL query.
```python
testTable = 'default.crime_data_delta'
testDb = 'test'

# Read the audited Delta table and materialize a single row.
df = spark.table(testTable)
df.limit(1).collect()

# Filter the data, then persist the result as a new table.
filteredDf = df.filter('victim_age > 20')
filteredDf.write.saveAsTable('{}.audit_cell'.format(testDb))

# Read the new table back, then drop it.
spark.table('{}.audit_cell'.format(testDb)).limit(1).collect()
spark.sql('DROP TABLE IF EXISTS {}.audit_cell'.format(testDb))
```
This notebook cell had multiple audit records associated with it, but the example audit record below corresponds to the `filteredDf.write.saveAsTable('{}.audit_cell'.format(testDb))` line.
## Example Audit Record
```json
{
  "id": "b0d49f2a-4a34-4d50-b36e-fd9b619eed32",
  "dateTime": "1617997828777",
  "month": 1455,
  "profileId": 1,
  "userId": "kris@immuta.com",
  "dataSourceId": 41,
  "dataSourceName": "Crime Data Delta",
  "projectId": 17,
  "projectName": "test",
  "purposeIds": [
    22
  ],
  "count": 1,
  "recordType": "spark",
  "success": true,
  "component": "dataSource",
  "accessType": "query",
  "query": "'CreateTable `test`.`audit_cell`, ErrorIfExists\n+- Filter (victim_age#8907 > 20)\n +- ImmutaResolvedTableAlias default.crime_data_delta\n +- SubqueryAlias spark_catalog.immuta.default_crime_data_delta\n +- TrustedPlan\n +- Project [dr_number#8882, area_id#8883, area_name#8884, reporting_district#8885, crime_code#8886, crime_code_description#8887, mo_codes#8888, victim_sex#8889, victim_descent#8890, premise_code#8891, premise_description#8892, weapon_used_code#8893, weapon_description#8894, status_code#8895, status_description#8896, crime_code_1#8897, crime_code_2#8898, crime_code_3#8899, crime_code_4#8900, address#8901, cross_street#8902, location#8903, date_reported#8904, date_occurred#8905, ... 2 more fields]\n +- SubqueryAlias spark_catalog.default.crime_data_delta\n +- Relation[dr_number#8882,area_id#8883,area_name#8884,reporting_district#8885,crime_code#8886,crime_code_description#8887,mo_codes#8888,victim_sex#8889,victim_descent#8890,premise_code#8891,premise_description#8892,weapon_used_code#8893,weapon_description#8894,status_code#8895,status_description#8896,crime_code_1#8897,crime_code_2#8898,crime_code_3#8899,crime_code_4#8900,address#8901,cross_street#8902,location#8903,date_reported#8904,date_occurred#8905,... 2 more fields] parquet\n",
  "extra": {
    "maskedColumns": {},
    "metastoreTables": [
      "default.crime_data_delta"
    ],
    "pathUris": [
      "dbfs:/user/hive/warehouse/crime_data_delta"
    ],
    "queryText": "testTable = 'default.crime_data_delta'\ntestDb = 'test'\n\ndf = spark.table(testTable)\ndf.limit(1).collect()\n\n\n\n\n\n\n\n\n\n# doing spark things...\n\n\n\n\n\n\n\n\nfilteredDf = df.filter('victim_age > 20')\n\nfilteredDf.write.saveAsTable('{}.audit_cell'.format(testDb))\nspark.table('{}.audit_cell'.format(testDb)).limit(1).collect()\n\nspark.sql('DROP TABLE IF EXISTS {}.audit_cell'.format(testDb))",
    "queryLanguage": "python",
    "purposes": [
      "Re-identification Prohibited.Expert Determination.SDM"
    ]
  },
  "dataSourceTableName": "default_crime_data_delta",
  "createdAt": "2021-04-09T19:50:28.787Z",
  "updatedAt": "2021-04-09T19:50:28.787Z"
}
```
This example audit record contains two new fields under `extra`:

- `queryText`: The `queryText` will contain either the full notebook cell (when the query is the result of a notebook) or the full SQL query (when it is a query from a JDBC connection).
- `queryLanguage`: The `queryLanguage` corresponds to the programming language used: SQL, Python, Scala, or R. Audited JDBC queries will indicate that they came from JDBC here.
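Together, these fields make it possible to review exactly what code triggered an audited Spark job. A small illustrative sketch, where `record` is one parsed audit message shaped like the example above and `describe_query` is a hypothetical helper:

```python
def describe_query(record: dict) -> None:
    """Print who ran what, in which language, against which tables."""
    extra = record.get("extra", {})
    language = extra.get("queryLanguage", "unknown")
    tables = ", ".join(extra.get("metastoreTables", [])) or "<unknown tables>"
    print(f"{record.get('userId')} ran a {language} query against {tables}:")
    print(extra.get("queryText", "<no captured query>"))
```

For example, calling `describe_query(spark_records[0])` on the filtered list from the earlier sketch would print the notebook cell captured above.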