Recently, I explored Azure Databricks for a PoC project with a machine learning workload. Since Snowflake and GCP are our base, setting up Databricks with Azure proved to be a bit tricky.

I know, I know, using Databricks on GCP would be way easier in this case. There are several reasons why I couldn’t do that. And, who likes it easy? (Me, actually…)

Anyway, it is what it is. Let’s begin with my learning journey.

Setting Up Configurations

1 - Installation

Here, we are installing the Azure CLI, the Databricks CLI and azcopy. azcopy is a fast and scalable solution for moving data across cloud storage.

brew update && brew install azure-cli

brew install azcopy

brew install databricks

pip install databricks-cli

2 - Authentication

az login

azcopy login

To check relevant information about our account and the CLI:

az account list-locations -o table

az account set --subscription <subscription-id>

az --version

To move data across cloud storage, for example from Google Cloud Storage to Azure Blob, we need a GCP service account with sufficient storage privileges, and we need to export its credentials file (creds.json) locally.

For more details, check here

export GOOGLE_APPLICATION_CREDENTIALS=/your/file/path/gcp-creds.json
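Before running azcopy, it can help to sanity-check the key file that the variable points to. The snippet below is a minimal sketch, assuming the file is a standard GCP service account key in JSON format; the helper name missing_key_fields is made up for this example.

```python
import json
import os

# Fields a GCP service account key file normally contains. The Spark
# configuration later in this post uses several of them.
REQUIRED_FIELDS = ("type", "project_id", "private_key_id", "private_key", "client_email")


def missing_key_fields(path: str) -> list[str]:
    """Return the required fields that are absent from the JSON key file."""
    with open(path, encoding="utf-8") as fh:
        creds = json.load(fh)
    return [field for field in REQUIRED_FIELDS if field not in creds]


if __name__ == "__main__":
    # Reads the same environment variable that was exported above.
    key_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if key_path and os.path.exists(key_path):
        print("Missing fields:", missing_key_fields(key_path))
    else:
        print("GOOGLE_APPLICATION_CREDENTIALS is not set or the file does not exist")
```

An empty list means the file has everything the later steps rely on.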

File Cloning with AZCOPY

Following the Azure docs, I then cloned the data across cloud storage:

azcopy copy 'https://storage.cloud.google.com/<bucket-name>' 'https://<storage-account>.blob.core.windows.net/<container-name>' --recursive=true
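If the copy needs to be scripted for several buckets, the command can be assembled in Python. This is only a sketch: the function name build_azcopy_command and the resource names are placeholders, and the URL shapes are taken from the command above.

```python
import shlex


def build_azcopy_command(bucket: str, storage_account: str, container: str) -> list[str]:
    """Build the argument list for copying a whole GCS bucket into a blob container."""
    source = f"https://storage.cloud.google.com/{bucket}"
    destination = f"https://{storage_account}.blob.core.windows.net/{container}"
    return ["azcopy", "copy", source, destination, "--recursive=true"]


# Placeholder names, not real resources.
cmd = build_azcopy_command("my-bucket", "mystorageaccount", "my-container")
print(shlex.join(cmd))
# To actually run it: subprocess.run(cmd, check=True)
```

Passing the arguments as a list avoids shell quoting problems if a name ever contains special characters.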

After the data is transferred, we can check the blob objects with:

az storage blob list --account-name <storage-account> --container-name <container-name> --output table

To bulk delete files under a folder

#!/bin/bash
RESOURCE_GROUP="grp-name"
ACCOUNT_NAME="acc-name"
PREFIX="file-prefix"
CONTAINER="container-name"
EXCLUDE_FOLDER="exclude-subfolder"

# Optionally as a first step
# List all files inside container container-name/subfolder
# Note: the grep pattern uses double quotes so that $EXCLUDE_FOLDER is expanded
az storage account keys list --resource-group $RESOURCE_GROUP --account-name $ACCOUNT_NAME --query '[0].value' -o tsv | \
xargs -I {} az storage blob list --account-name $ACCOUNT_NAME --account-key {} --container-name $CONTAINER --prefix subfolder- --query "[?name][].name" -o tsv | grep -v "/$EXCLUDE_FOLDER/"
#!/bin/bash
RESOURCE_GROUP="grp-name"
ACCOUNT_NAME="acc-name"
PREFIX="file-prefix"
CONTAINER="container-name"
EXCLUDE_FOLDER="exclude-subfolder"

# Delete all files with prefix file-prefix inside container container-name/subfolder

# Fetch the account key once and reuse it for both the list and the delete calls
ACCOUNT_KEY=$(az storage account keys list --resource-group $RESOURCE_GROUP --account-name $ACCOUNT_NAME --query '[0].value' -o tsv)

# The grep pattern uses double quotes so that $EXCLUDE_FOLDER is expanded
az storage blob list --account-name $ACCOUNT_NAME --account-key "$ACCOUNT_KEY" --container-name $CONTAINER --prefix $PREFIX- --query "[?name][].name" -o tsv | grep -v "/$EXCLUDE_FOLDER/" | \
xargs -I {} az storage blob delete --account-name $ACCOUNT_NAME --account-key "$ACCOUNT_KEY" --container-name $CONTAINER --name {}
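Since a bulk delete is hard to undo, it may be worth checking the selection logic on a list of names first. The sketch below mirrors the intent of the prefix filter and the grep exclusion in plain Python; it uses a simple substring test rather than a regular expression, and the blob names are invented for the example.

```python
def blobs_to_delete(names: list[str], prefix: str, exclude_folder: str) -> list[str]:
    """Keep names that start with `prefix` and are not under `/exclude_folder/`."""
    marker = f"/{exclude_folder}/"
    return [n for n in names if n.startswith(prefix) and marker not in n]


# Invented blob names for illustration.
names = [
    "file-prefix-001.json",
    "file-prefix-/exclude-subfolder/002.json",
    "other-003.json",
]
print(blobs_to_delete(names, "file-prefix-", "exclude-subfolder"))
# -> ['file-prefix-001.json']
```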

Working with Azure Databricks

Access blob data

After cloning the data, we need to access it.

1 - Setting up KeyVault

az storage account keys list -g <resource-group> -n <storage-account>

# An example
az storage account keys list -g contosoResourceGroup5 -n contosoblobstorage5
# Create a Key Vault, which will hold the secret we set afterwards
az keyvault create --name contosoKeyVault10 --resource-group contosoResourceGroup5 --location southcentralus

Using contosoKeyVault10, the example name from the Azure documentation:

# Create the secret
az keyvault secret set --vault-name contosoKeyVault10 --name storageKey --value "value of your key1"

2 - Create a secret scope on Databricks backed by Azure Key Vault: go to the link below, follow the steps and create the scope. Note that the scope only needs to be created once; there can be multiple secrets under it.

https://<adb-instance>#secrets/createScope

To find the values the form asks for:

Go to Azure Portal > Home > Subscription > my sub = MS Enterprise > Resources under Settings > find your Key Vault under resources > Properties under Settings

  • Scope Name = the name you will later pass to dbutils.secrets.get(scope = "<scope-name>", …)
  • DNS name = Vault URI
  • Resource ID = Resource ID

3 - Configure databricks cli

We need to obtain an access token first

Go to user setting -> access token -> generate token

databricks configure --token

url = https://<your-adb-instance>.azuredatabricks.net/

token = The token obtained above
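To double-check what was saved, the profile can be read back with Python's configparser. This sketch assumes the pip-installed CLI stored the values in an INI-style ~/.databrickscfg file with a [DEFAULT] section containing host and token; the helper name read_databricks_profile is made up, and the token itself is deliberately never printed.

```python
import configparser
from pathlib import Path


def read_databricks_profile(cfg_path, profile: str = "DEFAULT") -> tuple[str, bool]:
    """Return the host and whether a token is present for the given profile."""
    # interpolation=None so that a '%' inside a token is read literally.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read(cfg_path)
    section = parser[profile]
    return section.get("host", ""), bool(section.get("token"))


if __name__ == "__main__":
    cfg = Path.home() / ".databrickscfg"
    if cfg.exists():
        host, has_token = read_databricks_profile(cfg)
        print(f"host={host} token configured={has_token}")
    else:
        print(f"{cfg} not found")
```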

4 - Working with databricks cli

# check scope created
databricks secrets list-scopes
# List secret in a given scope
databricks secrets list --scope <scope-name>

To create secrets under the scope (in this case our secret name is storageKey):

az keyvault secret set --vault-name contosoKeyVault10 --name storageKey --value "<your-key>"

az keyvault secret show --name "storageKey" --vault-name "contosoKeyVault10" --query "value"
# Extra - if we want to set a secret key with creds on a local folder
az keyvault secret set --vault-name contosoKeyVault10 --name gsaPrivateKey --file "/your/file/path/gcp_key.txt"
# Extra2 - Delete secret under a keyVault
az keyvault secret delete --vault-name contosoKeyVault10 --name gsaPrivateKey
# If `secret set` fails, re-run the commands below
az storage account keys list -g contosoResourceGroup5 -n contosoblobstorage5

az account set --subscription <subscription-id>
az keyvault secret list --vault-name contosoKeyVault10

Create ADB Secrets Scope backed by Databricks

databricks secrets create-scope --scope scope-name

# Verify that the scope was created successfully
databricks secrets list-scopes

# Create a secret in the scope
databricks secrets put --scope scope-name --key secret-name
# An editor window will open; paste the secret value there, then type `:wq` to save (write) and quit

# Verify that the secret was added successfully
databricks secrets list --scope scope-name

# Verify my ACL access to the scope
databricks secrets get-acl --scope scope-name --principal email@email.com

# Give permission to another user
databricks secrets put-acl --scope scope-name --principal email@email.com --permission READ

5 - Accessing blob data in Azure Databricks

Finally, after all the setup, we can now try to access our data from the storage.

To read the data into a Spark DataFrame, execute the below in a Databricks notebook cell:

dbutils.fs.mount(
    source = "wasbs://<container-name>@<storage-account>.blob.core.windows.net",
    mount_point = "/mnt/blobstorage",
    extra_configs = {
        "fs.azure.account.key.<storage-account>.blob.core.windows.net":
            dbutils.secrets.get(scope = "<scope-name-in-databricks>", key = "storageKey")  # storageKey is the secret we set in the step above
    })

df = spark.read.json("/mnt/blobstorage/<file-name>")

df.show()
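The container and storage account names appear in both the source URL and the config key, so it is easy to mistype one of them. A small helper, sketched below with the made-up name blob_mount_args and placeholder resource names, builds all three strings from one set of inputs.

```python
def blob_mount_args(container: str, storage_account: str, mount_name: str) -> dict[str, str]:
    """Build the strings needed for a wasbs mount from one set of names."""
    host = f"{storage_account}.blob.core.windows.net"
    return {
        "source": f"wasbs://{container}@{host}",
        "mount_point": f"/mnt/{mount_name}",
        "config_key": f"fs.azure.account.key.{host}",
    }


# Placeholder names, not real resources.
args = blob_mount_args("my-container", "mystorageaccount", "blobstorage")
print(args)

# In a notebook these values would feed the mount call, for example:
# dbutils.fs.mount(
#     source=args["source"],
#     mount_point=args["mount_point"],
#     extra_configs={args["config_key"]: dbutils.secrets.get(scope="<scope-name>", key="storageKey")},
# )
```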

We can also create a table in Azure Databricks from storage data.

To do so, we will first need a SAS Token

# To get SAS Token
az storage container generate-sas \
    --account-name <account-name> \
    --name <container-name> \
    --permissions acdlrw \
    --expiry 2023-09-23T00:00Z \
    --auth-mode login \
    --as-user

A note on the permissions parameter:

--permissions
The permissions the SAS grants. Allowed values: (a)dd (c)reate (d)elete (e)xecute (f)ilter_by_tags (i)set_immutability_policy (l)ist (m)ove (r)ead (t)ag (w)rite (x)delete_previous_version (y)permanent_delete. Do not use if a stored access policy is referenced with --id that specifies this value. Can be combined.
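The permission string is compact but not very readable. The sketch below spells out a string such as acdlrw using the letter-to-name mapping quoted above; the function name describe_permissions is just for this example.

```python
# Letter-to-name mapping, copied from the --permissions help text above.
SAS_PERMISSIONS = {
    "a": "add",
    "c": "create",
    "d": "delete",
    "e": "execute",
    "f": "filter_by_tags",
    "i": "set_immutability_policy",
    "l": "list",
    "m": "move",
    "r": "read",
    "t": "tag",
    "w": "write",
    "x": "delete_previous_version",
    "y": "permanent_delete",
}


def describe_permissions(flags: str) -> list[str]:
    """Translate a permission string into names, rejecting unknown letters."""
    unknown = sorted(set(flags) - set(SAS_PERMISSIONS))
    if unknown:
        raise ValueError(f"Unknown permission letters: {unknown}")
    return [SAS_PERMISSIONS[flag] for flag in flags]


print(describe_permissions("acdlrw"))
# -> ['add', 'create', 'delete', 'list', 'read', 'write']
```

If the token is only used for loading data, a narrower set of permissions (for example just read and list) is probably enough.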

Then we can load the data to the destination table.

CREATE SCHEMA BRONZE_SCHEMA;
CREATE TABLE BRONZE_SCHEMA.TBL_NAME;

Loading from json

COPY INTO BRONZE_SCHEMA.TBL_NAME
FROM 'wasbs://<container-name>@<storage-account>.blob.core.windows.net/<file-prefix>-*' WITH (
  CREDENTIAL (AZURE_SAS_TOKEN = '<the-sas-token-obtained-from-above>')
)
FILEFORMAT = JSON
COPY_OPTIONS ('mergeSchema' = 'true')

Loading from csv

COPY INTO BRONZE_SCHEMA.TBL_NAME
FROM 'wasbs://<container-name>@<storage-account>.blob.core.windows.net/<file-prefix>-*' WITH (
  CREDENTIAL (AZURE_SAS_TOKEN = '<the-sas-token-obtained-from-above>')
)
FILEFORMAT = CSV
FORMAT_OPTIONS ('mergeSchema' = 'true',
                'header' = 'false')
COPY_OPTIONS ('mergeSchema' = 'true')
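Because the JSON and CSV statements differ only in a few clauses, they can be generated from one template. The following is a rough sketch with a made-up helper name (build_copy_into); it does no escaping of the values, so it is only suitable for trusted inputs like the placeholders here.

```python
def build_copy_into(table, source, sas_token, file_format,
                    format_options=None, copy_options=None) -> str:
    """Assemble a COPY INTO statement with a SAS credential (no value escaping)."""

    def render(options: dict) -> str:
        return ", ".join(f"'{key}' = '{value}'" for key, value in options.items())

    lines = [
        f"COPY INTO {table}",
        f"FROM '{source}' WITH (",
        f"  CREDENTIAL (AZURE_SAS_TOKEN = '{sas_token}')",
        ")",
        f"FILEFORMAT = {file_format}",
    ]
    if format_options:
        lines.append(f"FORMAT_OPTIONS ({render(format_options)})")
    if copy_options:
        lines.append(f"COPY_OPTIONS ({render(copy_options)})")
    return "\n".join(lines)


# Placeholder values, not real resources.
stmt = build_copy_into(
    "BRONZE_SCHEMA.TBL_NAME",
    "wasbs://my-container@mystorageaccount.blob.core.windows.net/file-prefix-*",
    "<sas-token>",
    "CSV",
    format_options={"header": "false"},
    copy_options={"mergeSchema": "true"},
)
print(stmt)
```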

With 'header' = 'false', the columns are loaded with default names (_c0, _c1, ...), so the column names might need to be adjusted afterwards.

ALTER TABLE BRONZE_SCHEMA.TBL_NAME SET TBLPROPERTIES (
   'delta.columnMapping.mode' = 'name',
   'delta.minReaderVersion' = '2',
   'delta.minWriterVersion' = '5');

ALTER TABLE BRONZE_SCHEMA.TBL_NAME RENAME COLUMN _c0 TO COL_NAME1;
ALTER TABLE BRONZE_SCHEMA.TBL_NAME RENAME COLUMN _c1 TO COL_NAME2;
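For a wide table, writing one RENAME COLUMN statement per column by hand gets tedious. A minimal sketch that generates them from an ordered list of target names, assuming the loaded columns are named _c0, _c1, ... in order (the helper name rename_statements is made up):

```python
def rename_statements(table: str, new_names: list[str]) -> list[str]:
    """Generate one ALTER TABLE ... RENAME COLUMN statement per positional column."""
    return [
        f"ALTER TABLE {table} RENAME COLUMN _c{index} TO {name}"
        for index, name in enumerate(new_names)
    ]


for statement in rename_statements("BRONZE_SCHEMA.TBL_NAME", ["COL_NAME1", "COL_NAME2"]):
    print(statement)
```

The generated statements can be pasted into a SQL cell, or passed one at a time to spark.sql in a notebook.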

5 - Configure the Spark cluster so that it can connect to the GCS bucket

spark.databricks.delta.preview.enabled true
spark.hadoop.google.cloud.auth.service.account.enable true

spark.hadoop.fs.gs.project.id <gcp-project-id>
spark.hadoop.fs.gs.auth.service.account.private.key {{secrets/<adb-scope>/gsaPrivateKey}}
spark.hadoop.fs.gs.auth.service.account.email <svc-acc>@<gcp-project-id>.iam.gserviceaccount.com
spark.hadoop.fs.gs.auth.service.account.private.key.id {{secrets/<adb-scope>/gsaPrivateKeyId}}
spark.hadoop.fs.azure.account.key.{account}.blob.core.windows.net {{secrets/blobStorageScope/storageKey}}

PYSPARK_PYTHON=/databricks/python3/bin/python3
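The GCS part of this config can be rendered from the service account details, which helps keep the secret references consistent. This is a sketch only: gcs_spark_conf is a made-up helper, and it assumes the private key and its ID were stored in the scope under the names gsaPrivateKey and gsaPrivateKeyId, as in the config above.

```python
def gcs_spark_conf(project_id: str, client_email: str, scope: str) -> str:
    """Render the GCS-related Spark config lines, referencing secrets by name."""
    lines = [
        "spark.hadoop.google.cloud.auth.service.account.enable true",
        f"spark.hadoop.fs.gs.project.id {project_id}",
        f"spark.hadoop.fs.gs.auth.service.account.email {client_email}",
        # Four braces in an f-string produce the literal {{ and }} of the secret reference.
        f"spark.hadoop.fs.gs.auth.service.account.private.key {{{{secrets/{scope}/gsaPrivateKey}}}}",
        f"spark.hadoop.fs.gs.auth.service.account.private.key.id {{{{secrets/{scope}/gsaPrivateKeyId}}}}",
    ]
    return "\n".join(lines)


# Placeholder values, not real resources.
conf_text = gcs_spark_conf("my-project", "svc-acc@my-project.iam.gserviceaccount.com", "my-scope")
print(conf_text)
```

Only the project ID and service account email appear in plain text; the key material stays in the secret scope.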

Reading data from cloud storage (GCS bucket/ Azure Blob) - execute the below in a databricks notebook cell

# GCS - csv
df = spark.read.format("csv").option("compression", "gzip").load("gs://gcs-bucket/FILE_NAME_*.csv.gz")

# Config for reading the gzipped CSV files produced by unloading data from Snowflake
df = spark.read.format("csv") \
    .option("compression", "gzip") \
    .option("delimiter", "\t") \
    .option("inferSchema", "true") \
    .load("gs://gcs-bucket/FILE_NAME_*.csv.gz")

# Azure
df = spark.read.json("/mnt/blobstorage/subfolder/dest-folder-name/*.json")

df.limit(10).display()

Saving Spark DF to GCS - execute the below in a databricks notebook cell

from datetime import datetime

current_datetime = datetime.now().strftime("%Y%m%d%H%M%S")
TEMPORARY_TARGET=f"gs://gcs-bucket/spark-tmp/{current_datetime}"
DESIRED_TARGET=f"gs://gcs-bucket/PRED_DATA-{current_datetime}.csv"

# coalesce(1) makes Spark write a single CSV part file
prediction_final_df.coalesce(1).write.option("header", "true").mode("overwrite").csv(TEMPORARY_TARGET)

# Pick the part file by name rather than by its position in the listing
temporary_csv = next(f.path for f in dbutils.fs.ls(TEMPORARY_TARGET) if f.name.startswith("part-"))

dbutils.fs.cp(temporary_csv, DESIRED_TARGET)
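Spark writes its output as a folder that holds the data file alongside marker files, so the data file has to be picked out before copying. A sketch of that selection on plain file names is below; the helper name pick_part_file and the marker file names in the sample listing are illustrative, not taken from a real run.

```python
def pick_part_file(file_names: list[str]) -> str:
    """Return the single Spark part file from a folder listing."""
    parts = [name for name in file_names if name.startswith("part-")]
    if len(parts) != 1:
        raise ValueError(f"Expected exactly one part file, found {len(parts)}")
    return parts[0]


# Illustrative listing of an output folder.
listing = ["_SUCCESS", "_committed_123", "_started_123", "part-00000-abc.csv"]
print(pick_part_file(listing))
# -> part-00000-abc.csv
```

Selecting by name keeps working if the number or order of marker files changes, which a fixed list index would not.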

Saving Spark DF to Azure Blob

# container_name, account_name, scope_name and storage_key are Python variables defined earlier in the notebook
dbutils.fs.mount(
    source = f"wasbs://{container_name}@{account_name}.blob.core.windows.net",
    mount_point = "/mnt/blobstorage",
    extra_configs = {f"fs.azure.account.key.{account_name}.blob.core.windows.net": dbutils.secrets.get(scope = scope_name, key = storage_key)},
)
# Write the DataFrame as a single JSON file to Azure Blob Storage
df.coalesce(1).write.format("json").mode("overwrite").save("/mnt/blobstorage/subfolder/dest-folder-name")

# Write to a folder (one file per partition)
df.write.format("json").mode("overwrite").save("/mnt/blobstorage/subfolder/dest-folder-name")

Save df as a delta table in databricks

df.write.format('delta') \
    .mode('overwrite') \
    .option("mergeSchema", "true") \
    .saveAsTable('BRONZE_LAYER.TBL_NAME')

6 - Inserting Data into Databricks using Python

One thing I like about Databricks' Python connector is that it lets me bulk load as much data as I want, whereas Snowflake's connector has a limit of 16,384.

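import logging
from datetime import datetime

from databricks import sql
from pydantic import BaseModel

logger = logging.getLogger(__name__)


# Assumption: DatabricksError is not defined in the original snippet, so a plain custom exception is used here
class DatabricksError(Exception):
    pass


# DataModel (used below) is a pydantic BaseModel subclass describing one row;
# define it to match your table before the functions that reference it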

def adbconnect():
    try:
        return sql.connect(
            server_hostname="<adb-subscription>.azuredatabricks.net",
            http_path="/sql/1.0/warehouses/<wh-id>",
            access_token="<access-token>",
        )
    except Exception as e:
        raise DatabricksError("Could not connect to Azure Databricks") from e
        

def query(conn, sql: str) -> list[dict]:
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetchall()
    except Exception as e:
        logger.exception(f"Error executing query: {sql}")
        raise DatabricksError("Error executing query") from e
    finally:
        cursor.close()
    return result

def format_value(value):
    """Need to reformat data value to avoid insert failure"""
    if value is None or value == '\\':
        return "NULL"
    elif isinstance(value, str):
        modified_value = value.replace('"', "'")
        return f'"{modified_value}"'
    elif isinstance(value, datetime):
        return f'"{value}"'
    else:
        return str(value)

def insert_data_updates(conn, data: list[DataModel]):
    try:
        data_fields: list[str] = list(DataModel.__fields__.keys())

        values_str = ",\n".join(
            "("
            + ", ".join(format_value(getattr(d, field)) for field in data_fields)
            + ")"
            for d in data
        )

        result = f"""INSERT INTO BRONZE_SCHEMA.TBL_NAME ({", ".join(data_fields)})  
        VALUES\n{values_str};"""
        conn.cursor().execute(result)

        conn.commit()
    except Exception as e:
        conn.rollback()
        raise DatabricksError("Error inserting data, transaction rolled back") from e
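Even without a hard row limit, one very large INSERT statement can be unwieldy to build and debug, so splitting the rows into batches may still be useful. The generator below is a generic sketch (the name batched and the batch size are arbitrary choices); each batch could then be handed to insert_data_updates in turn.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def batched(rows: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield consecutive lists of at most `size` items from `rows`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(rows)
    while batch := list(islice(iterator, size)):
        yield batch


for batch in batched(range(5), 2):
    print(batch)
# -> [0, 1]
# -> [2, 3]
# -> [4]

# Hypothetical usage with the function above:
# for batch in batched(all_rows, 10_000):
#     insert_data_updates(conn, batch)
```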

Applying the above knowledge and the lessons I’ve learnt

Background: I had to migrate a large amount of sensor data to Databricks.

Initially, I planned to migrate the data in the steps below.

  1. Move data from GCS to Azure Container
  2. Execute the COPY INTO command in Azure Databricks to load the data into delta table

So ideally, the first step is to run the below in a terminal:

azcopy login

azcopy copy 'https://storage.cloud.google.com/<bucket-name>' 'https://<storage-account>.blob.core.windows.net/<container-name>' --recursive=true

Then in Azure Databricks Notebook, execute the below

COPY INTO BRONZE_SCHEMA.TBL_NAME
FROM 'wasbs://<container-name>@<storage-account>.blob.core.windows.net/folder/subfolder/file-prefix-*' WITH (
  CREDENTIAL (AZURE_SAS_TOKEN = '<the-sas-token-obtained-from-above>')
)
FILEFORMAT = JSON
COPY_OPTIONS ('mergeSchema' = 'true')

However, when I executed the above in Databricks, it returned this error message:

IllegalArgumentException: 'java.net.URISyntaxException: Relative path in 
absolute URI: sensor-data-2024-01-01 00:00:00.000'

After some googling and help from ChatGPT, I realised the issue was with the naming of those JSON files, mainly the whitespace.

Then on stackoverflow, I found this post, so I tried something like this

COPY INTO BRONZE_SCHEMA.TBL_NAME
FROM 'wasbs://<container-name>@<storage-account>.blob.core.windows.net/folder/subfolder/' WITH (
  CREDENTIAL (AZURE_SAS_TOKEN = '<the-sas-token-obtained-from-above>')
)
FILEFORMAT = JSON
PATTERN='file-prefix-*'
COPY_OPTIONS ('mergeSchema' = 'true')

But I was still getting the same error message. I finally realised that as long as I tried to load my files using a wildcard pattern, it would fail because of the naming. That means that if I execute the below, which names a single file explicitly, the file is copied successfully.

COPY INTO BRONZE_SCHEMA.TBL_NAME
FROM 'wasbs://<container-name>@<storage-account>.blob.core.windows.net/folder/subfolder/sensor-data-2024-01-01 00:00:00.000' WITH (
  CREDENTIAL (AZURE_SAS_TOKEN = '<the-sas-token-obtained-from-above>')
)
FILEFORMAT = JSON
COPY_OPTIONS ('mergeSchema' = 'true')

It is only when I tried to bulk load everything using a pattern that the execution failed.

Some approaches we can take to tackle this are:

1 - Rename all of the files using the az cli

#!/bin/bash

ACCOUNT_NAME="acc-name"
ACCOUNT_KEY="acc-key"  
CONTAINER_NAME="container-name"
SAS_TOKEN="token"

# Source and destination prefix
SOURCE_PREFIX="container/path/"
DESTINATION_PREFIX="final-path/"

az storage blob list \
  --account-name $ACCOUNT_NAME \
  --account-key $ACCOUNT_KEY \
  --container-name $CONTAINER_NAME \
  --prefix $SOURCE_PREFIX \
  --sas-token $SAS_TOKEN \
  --output tsv \
  --query "[?name][].name" | while IFS=$'\t' read -r blobname; do

    # New blob name with spaces replaced by underscores
    NEW_BLOB_NAME=$(echo "$blobname" | tr ' ' '_')

    # Copy blob to new name
    az storage blob copy start \
      --account-name $ACCOUNT_NAME \
      --account-key $ACCOUNT_KEY \
      --destination-blob "$NEW_BLOB_NAME" \
      --destination-container $CONTAINER_NAME \
      --source-uri "https://${ACCOUNT_NAME}.blob.core.windows.net/${CONTAINER_NAME}/${blobname}?${SAS_TOKEN}"

    # Monitor the status of the blob copy to wait for the operation to complete
    copy_status="pending"
    while [ "$copy_status" != "success" ]; do
        copy_status=$(az storage blob show \
            --name "$NEW_BLOB_NAME" \
            --container-name $CONTAINER_NAME \
            --account-name $ACCOUNT_NAME \
            --sas-token $SAS_TOKEN \
            --query "properties.copy.status" --output tsv)
        # Stop waiting and keep the original blob if the copy did not succeed
        if [ "$copy_status" = "failed" ] || [ "$copy_status" = "aborted" ]; then
            echo "Copy of $blobname ended with status $copy_status" >&2
            continue 2
        fi
        sleep 1
    done

    # Delete the old blob after successful copy
    az storage blob delete \
      --account-name $ACCOUNT_NAME \
      --account-key $ACCOUNT_KEY \
      --container-name $CONTAINER_NAME \
      --name "$blobname" \
      --sas-token $SAS_TOKEN

    echo "Renamed $blobname to $NEW_BLOB_NAME"
done
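Before running a rename across a whole container, it can be reassuring to preview which names would change. The sketch below applies the same space-to-underscore rule as the script to a list of names; rename_plan is a made-up helper, and the sample name is the one from the error message.

```python
def rename_plan(blob_names: list[str]) -> dict[str, str]:
    """Map each blob name containing spaces to its underscore version."""
    return {name: name.replace(" ", "_") for name in blob_names if " " in name}


plan = rename_plan([
    "sensor-data-2024-01-01 00:00:00.000",
    "already_clean.json",
])
for old, new in plan.items():
    print(f"{old!r} -> {new!r}")
```

Names without spaces are left out of the plan, so they would not need to be copied and deleted at all.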

2 - Or we can leverage gsutil to rename the files on the GCS side, then use azcopy to move the files to Azure and load them into Databricks

# Copy the files over first if you don't want to rename them directly in the current bucket
# Using cp
gsutil cp -r gs://bucket-name/folder1/folder_to_copy gs://bucket-name/folder1/new_folder

# Using rsync - will be more expensive
gsutil rsync -r gs://bucket-name/folder1/folder_to_copy gs://bucket-name/folder1/new_folder
# Rename files
gsutil ls gs://<bucket>/<folder_with_the_files_to_rename>/ | \
  while read f; do
    gsutil -m mv "$f" "${f// /-}";
  done;
azcopy copy 'https://storage.cloud.google.com/<bucket-name>' 'https://<storage-account>.blob.core.windows.net/<container-name>' --recursive=true

And load the properly named files into Azure Databricks as a last step.

COPY INTO BRONZE_SCHEMA.TBL_NAME
FROM 'wasbs://<container-name>@<storage-account>.blob.core.windows.net/folder/subfolder/file-prefix-*' WITH (
  CREDENTIAL (AZURE_SAS_TOKEN = '<the-sas-token-obtained-from-above>')
)
FILEFORMAT = JSON
COPY_OPTIONS ('mergeSchema' = 'true')

However, this is unnecessarily lengthy. Because the data has to travel across multiple clouds and likely different regions, it will be costly. And if the GCS storage is not hot storage, it will be even more costly to copy, rename and move across clouds.

So, in this case, a more direct way to load this sensor data into Databricks is to follow the steps below.

1 - Unload data from Snowflake to GCS

Execute the below in Snowflake

-- Clear stage if you prefer to do so
REMOVE @ENV_DATA_TRANSFER;

COPY INTO 'gcs://gcs-buscket-name/SENSOR_DATA_TBL'
FROM "DATABASE"."SCHEMA"."SENSOR_DATA_TBL"
STORAGE_INTEGRATION = <gcp-storage-integration>
overwrite=TRUE;

2 - Read data from GCS into Azure Databricks directly

Make sure the cluster is configured properly

spark.databricks.delta.preview.enabled true
spark.hadoop.google.cloud.auth.service.account.enable true

spark.hadoop.fs.gs.project.id <gcp-project-id>
spark.hadoop.fs.gs.auth.service.account.private.key {{secrets/<adb-scope>/gsaPrivateKey}}
spark.hadoop.fs.gs.auth.service.account.email <svc-acc>@<gcp-project-id>.iam.gserviceaccount.com
spark.hadoop.fs.gs.auth.service.account.private.key.id {{secrets/<adb-scope>/gsaPrivateKeyId}}
spark.hadoop.fs.azure.account.key.{account}.blob.core.windows.net {{secrets/blobStorageScope/storageKey}}



PYSPARK_PYTHON=/databricks/python3/bin/python3

Then, in a notebook cell, read the files and write them out:

df = spark.read.format("csv") \
    .option("compression", "gzip") \
    .option("delimiter", "\t") \
    .option("inferSchema", "true") \
    .load("gs://gcs-buscket-name/SENSOR_DATA_TBL_*.csv.gz")


df.limit(10).display()

# Write as delta table
df.write.format('delta') \
    .mode('overwrite') \
    .option("mergeSchema", "true") \
    .saveAsTable('BRONZE_SCHEMA.SENSOR_DATA_TBL')
    
# Write to a folder if you want the files stored in an Azure container
df.write.format("json").mode("overwrite").save("/mnt/blobstorage/folder/subfolder")

And finally, if we need to grant a user access to the external file system to avoid insufficient-privilege errors, the commands below need to be executed as an admin.

GRANT SELECT ON ANY FILE TO `user@email.com`;

GRANT MODIFY ON ANY FILE TO `user@email.com`;


That’s it for now, hope you found this helpful.

Thank you for reading and have a nice day!