Showing 1 changed file with 19 additions and 63 deletions.
@@ -7,81 +7,39 @@
# MAGIC This tutorial shows how to use Databricks Auto Loader to ingest streaming data into Delta tables. It covers the following topics:
# MAGIC 1. Connect to ADLS.
# MAGIC 2. Use Auto Loader to ingest and transform data on the fly, using PySpark / SQL.
# MAGIC
# MAGIC https://docs.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/auto-loader

# COMMAND ----------

# MAGIC %md ## Methods to connect to ADLS containers (non-production setup)

# COMMAND ----------

# DBTITLE 1,Method 1: directly using access key (most straightforward way)
sakey = ''  # paste the storage account access key here (intentionally left blank)
adlsname = 'hwangstorage1'
adlscontainer = 'autoloader'
adlsloc = 'abfss://' + adlscontainer + '@' + adlsname + '.dfs.core.windows.net/test'
spark.conf.set('fs.azure.account.key.' + adlsname + '.dfs.core.windows.net', sakey)
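
# Illustrative usage note (assumption, not in the original notebook): once the account
# key is set, paths in the container resolve directly, e.g.
# dbutils.fs.ls(adlsloc)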

# COMMAND ----------

# DBTITLE 1,Method 2: Using secrets and key vault
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
])

data2 = [("James", "", "Smith", "36636", "M", 3000),
    ("Michael", "Rose", "", "40288", "M", 4000),
    ("Robert", "", "Williams", "42114", "M", 4000),
    ("Maria", "Anne", "Jones", "39192", "F", 4000),
    ("Jen", "Mary", "Brown", "", "F", -1)
]

df_created = spark.createDataFrame(data=data2, schema=schema)

# COMMAND ----------

# MAGIC %md ## 1. Connect to ADLS using key (non-prod setup)

# COMMAND ----------

# This requires a secret scope backed by an Azure Key Vault instance (created beforehand in
# the workspace UI or via the Databricks CLI); replace hwangstorage1 with your storage account name.
spark.conf.set(
    "fs.azure.account.key.hwangstorage1.dfs.core.windows.net",
    dbutils.secrets.get(scope="hwang-kv2",
                        key="hwang-adls-access-key"))

# COMMAND ----------

df_created.write.partitionBy("gender").mode("overwrite").parquet("abfss://[email protected]/test3")

# COMMAND ----------

df_created.show()

# COMMAND ----------

df_read = spark.read.parquet("abfss://[email protected]/test3")

# COMMAND ----------

df_read.show()

# COMMAND ----------

# MAGIC %md ## 2. Autoloader 20 minute tutorial
# MAGIC
# MAGIC Tips:
# MAGIC 1. A new file with a different name will trigger a loading action.
# MAGIC 2. An overwritten file in blob storage with the same name will not trigger a loading action.
# MAGIC 3. A container cannot hold two files with the same name; uploading under an existing name overwrites the file.
# MAGIC 4. You need to specify a checkpoint path for Auto Loader.
# MAGIC 5. You need to specify a schema when loading JSON input sources (dynamic schema handling is left for another session); see the readStream sketch below.

# COMMAND ----------

# DBTITLE 1,Method 1: Directly transform streaming df and write to delta, for simple transformations
from pyspark.sql.types import *
from delta.tables import *
import pyspark.sql.functions as F

# A new file with a different name will trigger loading.
# An overwritten file in blob storage with the same name will not trigger loading.
# A container cannot hold two files with the same name; an upload under an existing name overwrites.

source_container_path = "abfss://[email protected]/"
sink_container_path = "abfss://[email protected]/testsink"
sink_checkpoint_path = "abfss://[email protected]/checkpoint"
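
# Illustrative sketch (assumption, not part of this commit): the cell that creates the
# input stream is elided from this diff hunk. With Auto Loader it would typically use the
# cloudFiles source, reusing `schema` and `source_container_path` from above; the variable
# name below is hypothetical.
df_stream_example = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(schema)
    .load(source_container_path))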

@@ -96,19 +54,19 @@
# directly transform streaming df
df_transform = df_stream_in.withColumn("new_col", F.lit("1"))

# then write to delta table
df_transform.writeStream.format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", sink_checkpoint_path) \
    .start(sink_container_path)
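
# COMMAND ----------

# Illustrative (assumption, not in the original notebook): before the batch read of the
# sink below, the active stream can be drained and stopped so results are complete.
for q in spark.streams.active:   # active StreamingQuery handles
    q.processAllAvailable()      # block until all currently available input is processed
    q.stop()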

# COMMAND ----------

loaded_df = spark.read.format("delta").load(sink_container_path)

# COMMAND ----------

loaded_df.show()

# COMMAND ----------

@@ -158,11 +116,9 @@ def add_new_col(df_in, batchId):

# COMMAND ----------

# check results: multiple modifications applied by the custom foreachBatch function
sink_df_1 = spark.read.format("delta").load(sink_container_path)
sink_df_2 = spark.read.format("delta").load(sink_container_path_2)

# COMMAND ----------

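# DBTITLE 1,Illustrative sketch: the foreachBatch custom function (assumed shape, elided from this hunk)
# The hunk header above shows `def add_new_col(df_in, batchId):` but elides its body and the
# writeStream wiring. A minimal sketch of the usual foreachBatch pattern, reusing
# `sink_container_path_2` from the results check; everything else here is an assumption,
# not the notebook's exact code.
import pyspark.sql.functions as F

def add_new_col(df_in, batchId):
    # apply several modifications per micro-batch, then append to the second delta sink
    (df_in.withColumn("new_col", F.lit("1"))
          .withColumn("batch_id", F.lit(batchId))
          .write.format("delta").mode("append").save(sink_container_path_2))

# wiring sketch (hypothetical checkpoint path):
# df_stream_in.writeStream \
#     .option("checkpointLocation", sink_checkpoint_path + "2") \
#     .foreachBatch(add_new_col) \
#     .start()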