Databricks Spark Query Audit Logs

In addition to the executed Spark plan, the tables, and the tables' underlying paths for every audited Spark job, Immuta captures the code or query that triggers the Spark plan. Immuta audits the activity of Immuta users on Immuta data sources.

Requirements

Store audit logs

By default Immuta audit logs expire after 7 days. Export the universal audit model (UAM) logs to S3 or ADLS Gen 2, and store audit logs outside of Immuta in order to retain the audit logs long-term.

Audit schema

Each audit message from the Immuta platform will be a one-line JSON object containing the properties listed below.

Example queryText

Below is an example of the queryText, which contains the full notebook cell (since the query was the result of a notebook). If the query had been from a JDBC connection, the queryText would contain the full SQL query.

testTable = 'default.crime_data_delta'
testDb = 'test'

df = spark.table(testTable)
df.limit(1).collect()

filteredDf = df.filter('victim_age > 20')

filteredDf.write.saveAsTable('{}.audit_cell'.format(testDb))
spark.table('{}.audit_cell'.format(testDb)).limit(1).collect()

spark.sql('DROP TABLE IF EXISTS {}.audit_cell'.format(testDb))

This notebook cell had multiple audit records associated with it.

Example audit record

{
  "action": "QUERY",
  "actor": {
    "type": "USER_ACTOR",
    "name": "Taylor",
    "id": "taylor@immuta.com",
    "identityProvider": "okta",
    "impersonatedBy": null
  },
  "sessionId": "abc123456589",
  "actionStatus": "SUCCESS",
  "actionStatusReason": null,
  "actorIp": "1.2.3.4",
  "eventTimestamp": "2022-10-13T20:03:41.013Z",
  "id": "abc123",
  "customerId": "abc123",
  "targetType": "DATASOURCE",
  "targets": [{
    "id": "4",
    "name": "Movies",
    "technology": "DATABRICKS"
  }],
  "auditPayload": {
    "type": "QueryAuditPayload",
    "queryId": "81fe4385-1329-444a-b6d9-b26bce5c8dc7",
    "query": "Project [director#778904]\n+- Filter ((YEAR#778903L = 1999) OR (YEAR#778903L = 2000))\n   +- Relation[movie_id#778901L,Title#778902,Year#778903L,Director#778904,Budget_million#778905,Gross_worldwide#778906L] parquet\n",
    "startTime": "2022-10-13T20:03:41.013Z",
    "endTime": null,
    "duration": null,
    "accessControls": {
      "entitlements": {
        "groups": [],
        "attributes": []
      },
      "policySet": [{
        "type": "SUBSCRIPTION",
        "global": false,
        "subscriptionPolicyType": "MANUAL",
        "ruleAppliedForUser": true
      }]
    },
    "technologyContext": {
      "type": "DatabricksContext",
      "clusterId": "1006-194110-8j0shd5d",
      "clusterName": "databricks-cluster-name",
      "workspaceId": "123456789",
      "pathUris": [
        "dbfs:/user/hive/warehouse/your_database.db/movies"
      ],
      "metastoreTables": ["your_database.movies"],
      "queryLanguage": "python",
      "queryText": "query_success = []\nnum_queries_run = 0\nimpersonate_probability = .20\nspark.sql(\"set immuta.impersonate.user=\")\n\ndef make_fail_query(query):\n  try:\n    spark.sql(\"set immuta.impersonate.user=taylor@databricks.com\")\n    spark.sql(query).toPandas()\n  except: \n    pass\n  \nfor index, query in enumerate(new_queries.values):\n  if(num_queries_run % 100 == 0):\n    print(f\"Queries Successfully Ran: {num_queries_run}/2000, out of total queries ran: {index+1}\")\n  to_impersonate = random.randrange(100)\n  if to_impersonate < impersonate_probability * 100:\n    make_fail_query(query)\n    spark.sql(\"set immuta.impersonate.user=\")\n    num_queries_run += 1\n  else:\n    try:\n      spark.sql(query).toPandas()\n      query_success.append((query, True))\n      num_queries_run += 1\n      if num_queries_run == 2000:\n        break\n    except Exception as e:\n      query_success.append((query, False))\n      \n    ",
      "immutaPluginVersion": "2022.3.0-spark-3.1.1"
    }
  },
  "receivedTimestamp": "2022-10-13T20:03:41.044Z"
}

Enriched Databricks audit logs

Beyond raw audit events (such as “John Doe queried Table X in Databricks"), the Databricks audit records include the policy information enforced during the query execution, even if a query was denied.

Queries will be denied if at least one of the conditions below is true:

  • User does not meet policy conditions.

  • User is not subscribed to the data source.

  • Data source is not in the user's current project.

  • Data source is in the user's current project, but the user is not subscribed to the data source.

  • Data source is not registered in Immuta.

User entitlements

The user's entitlements represent the state at the time of the query. This includes the following fields:

Policy information

The policySet includes the following fields:

Copyright © 2014-2024 Immuta Inc. All rights reserved.