# Databricks notebook source
# MAGIC %md
# MAGIC #Ingest XML - Endangered Species Demo
# MAGIC See the [README](https://github.com/databricks/tmm/blob/main/XML-ingest-demo/README.md) for more details. Run this notebook on a classic single-user cluster.
# COMMAND ----------
# Adding image from misc/xml.png to cell
from IPython.display import Image
Image(filename='misc/xml.png')
# COMMAND ----------
import os
# files used for this demo
# an XML list of endangered animals with XSD
endangered_f = f'file:{os.getcwd()}/endangered_species.xml'
endangered_xsd_f = f'file:{os.getcwd()}/endangered_species.xsd'
# extended XML file, with a new <POPULATION> element introduced
endangered_new_f = f'file:{os.getcwd()}/endangered_new.xml'
# apes, not matching the XSD
endangered_apes_f = f'file:{os.getcwd()}/apes.xml'
schema_loc = '/tmp/schema_loc'
bad_rec_loc = 'file:/tmp/badrec_loc'
dbutils.fs.rm(schema_loc, True)
dbutils.fs.rm(bad_rec_loc, True)
# print(f'current working dir:{os.getcwd()}')
# COMMAND ----------
# MAGIC %md
# MAGIC This cell sets up the files and directories needed for the XML processing demo.
# MAGIC - `endangered_f` and `endangered_xsd_f` [reference XML]($./endangered_species.xml) and [XSD schema]($./endangered_species.xsd) files for endangered species data.
# MAGIC - `endangered_new_f` introduces a new XML element to test schema evolution.
# MAGIC - `endangered_apes_f` is an XML file with a different structure that doesn't match the endangered species XSD schema.
# MAGIC - `schema_loc` is a directory where Auto Loader will store inferred schemas to enable schema evolution.
# COMMAND ----------
# DBTITLE 1,Pandas example
# pandas can load XML files too, but it runs on a single node and doesn't scale like Spark
import pandas as pd
df = pd.read_xml("endangered_species.xml")
df
# COMMAND ----------
# MAGIC %md
# MAGIC For comparison, this cell shows how to read the endangered species XML into a pandas DataFrame.
# MAGIC Pandas provides a simple `read_xml` function to parse XML into a DataFrame, but pandas doesn't scale and is limited by the memory of a single machine.
# COMMAND ----------
# DBTITLE 1,Read XML list of endangered species with Spark
# `rowTag` option is required when reading files in XML format
df = (spark.read
.format("xml")
.option("rowTag", "species")
.load(endangered_f)
)
# COMMAND ----------
# MAGIC %md
# MAGIC To read XML data into a **Spark DataFrame**, you need to specify the `"xml"` format.
# MAGIC The `"rowTag"` option is required and indicates which XML element represents a row in the DataFrame.
# MAGIC Here, each `"species"` element will become a row.
# MAGIC The `load()` method reads the XML file specified by `endangered_f`.
# COMMAND ----------
# DBTITLE 1,XML attributes are flattened to _attribute
display(df.select("name","_id","info"))
# COMMAND ----------
# MAGIC %md
# MAGIC When Spark reads XML, attributes are flattened into columns with a leading underscore.
# MAGIC For example, the `"id"` attribute becomes the `"_id"` column.
# MAGIC Selecting `"name"`, `"_id"`, and `"info"` columns shows the flattened structure.
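As a minimal plain-Python illustration (not Spark itself), the underscore-prefixing convention can be sketched like this; the sample element is a hypothetical stand-in for the demo data:

```python
# Mimic how Spark's XML reader flattens an element's attributes into
# columns prefixed with an underscore (e.g. the "id" attribute -> "_id").
import xml.etree.ElementTree as ET

xml_row = '<species id="1"><name>Amur Leopard</name></species>'
elem = ET.fromstring(xml_row)

row = {f"_{k}": v for k, v in elem.attrib.items()}       # attributes -> "_id"
row.update({child.tag: child.text for child in elem})    # child elements -> columns

print(row)  # {'_id': '1', 'name': 'Amur Leopard'}
```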
# COMMAND ----------
# DBTITLE 1,Filter to only return name elements: set rowTag to "name"
df_names = (spark.read
.format("xml")
.option("rowTag", "name")
.load(endangered_f)
)
display(df_names)
# COMMAND ----------
# MAGIC %md
# MAGIC You can use the `"rowTag"` option to control which elements are parsed as rows.
# MAGIC Setting `"rowTag"` to `"name"` will create a DataFrame with only the `"name"` elements as rows.
# MAGIC This is useful for filtering or extracting specific parts of the XML structure.
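A rough plain-Python analogue of this `rowTag` filtering (an illustration, not Spark's implementation) is to pick out every `name` element regardless of nesting; the species names below are placeholders:

```python
# Extract all <name> elements from a document, ignoring the surrounding
# structure -- roughly what setting rowTag to "name" achieves in Spark.
import xml.etree.ElementTree as ET

doc = """<list>
  <species id="1"><name>Amur Leopard</name></species>
  <species id="2"><name>Vaquita</name></species>
</list>"""

names = [e.text for e in ET.fromstring(doc).iter("name")]
print(names)  # ['Amur Leopard', 'Vaquita']
```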
# COMMAND ----------
# DBTITLE 1,Load the movie XML with different structure and null values
movie_df = (spark.read
.format("xml")
.option("rowTag", "species")
.load(endangered_apes_f)
)
display(movie_df)
# COMMAND ----------
# MAGIC %md
# MAGIC This cell demonstrates reading an XML file (`endangered_apes_f`) with a different structure than the endangered species data.
# MAGIC The `"rowTag"` is still set to `"species"`, even though this file's rows are `Ape` elements, so the expected fields don't line up.
# MAGIC The resulting DataFrame shows how Spark fills elements it can't find with null values.
# COMMAND ----------
# MAGIC %md
# MAGIC # XML Ingest with row validation (XML not matching)
# COMMAND ----------
# DBTITLE 1,Rejected movie XML with species XSD schema enforcement
from pyspark.sql.functions import col
movie_df = spark.read.format("xml") \
.option("rowTag", "species") \
.option("rescuedDataColumn", "_my_rescued_data") \
.option("rowValidationXSDPath", endangered_xsd_f) \
.load(endangered_apes_f)
if "_corrupt_record" in movie_df.columns:
    movie_df.cache()
    movie_df.select("_corrupt_record").show()
# COMMAND ----------
# MAGIC %md
# MAGIC Spark allows validating XML data against an XSD schema using the `"rowValidationXSDPath"` option.
# MAGIC This cell tries to load the apes XML file while enforcing the endangered species XSD schema.
# MAGIC Since the apes data does not conform to the endangered species schema, Spark will reject it.
# MAGIC The code above checks for a `_corrupt_record` column and shows the rejected rows it contains.
# COMMAND ----------
# MAGIC
# MAGIC %md
# MAGIC # XML Ingest with row validation (happy path)
# COMMAND ----------
# DBTITLE 1,Original species XML with XSD validation
df = spark.read.format("xml") \
.option("rowTag", "species") \
.option("rowValidationXSDPath", endangered_xsd_f) \
.load(endangered_f)
display(df)
# COMMAND ----------
# MAGIC %md
# MAGIC To validate XML data against an [XSD schema]($./endangered_species.xsd), use the `"rowValidationXSDPath"` option.
# MAGIC Specify the path to the XSD file (`endangered_xsd_f`).
# MAGIC Spark will parse the XML and validate each row against the schema.
# MAGIC Rows that don't conform to the schema will be rejected.
# MAGIC The XSD does not impact the resulting DataFrame schema, it's only used for validation.
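Spark uses a real XSD validator under the hood; as a much simplified plain-Python sketch of the idea, a row "validates" only if it carries a set of required child elements (the `REQUIRED` set below is an assumption for illustration, not taken from the demo's XSD):

```python
# Toy illustration of per-row validation: accept a row only if it contains
# all required child elements. Real XSD validation also checks types,
# ordering, and cardinality -- this sketch checks presence only.
import xml.etree.ElementTree as ET

REQUIRED = {"name", "info"}  # assumed required elements, for illustration

def row_is_valid(xml_row: str) -> bool:
    children = {child.tag for child in ET.fromstring(xml_row)}
    return REQUIRED.issubset(children)

print(row_is_valid('<species id="1"><name>Vaquita</name><info>Rare porpoise</info></species>'))  # True
print(row_is_valid('<Ape><Name>Koba</Name></Ape>'))  # False: wrong element names
```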
# COMMAND ----------
# MAGIC %md
# MAGIC # XML Ingest with Auto Loader
# COMMAND ----------
# DBTITLE 1,Auto Loader in Python with validation
df = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "xml") \
.option("rowTag", "species") \
.option("rowValidationXSDPath", endangered_xsd_f) \
.option("cloudFiles.schemaLocation", schema_loc) \
.load(endangered_f)
#display(df)
# COMMAND ----------
# MAGIC %md
# MAGIC Auto Loader allows streaming XML data and supports schema inference and evolution.
# MAGIC - Use `"cloudFiles.format"` to specify `"xml"` for XML data.
# MAGIC - `"rowTag"` indicates the XML elements to parse as rows.
# MAGIC - `"rowValidationXSDPath"` provides the XSD for validation.
# MAGIC - `"cloudFiles.schemaLocation"` specifies where to store the inferred schema for evolution.
# MAGIC
# MAGIC Uncomment `display(df)` to see the streaming DataFrame.
# COMMAND ----------
# DBTITLE 1,Auto Loader: old XSD, XML with NEW Element - will FAIL
df = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "xml") \
.option("rowTag", "species") \
.option("rowValidationXSDPath", endangered_xsd_f) \
.option("cloudFiles.schemaLocation", schema_loc) \
.load(endangered_new_f)
# COMMAND ----------
# MAGIC %md
# MAGIC This cell demonstrates a schema evolution failure.
# MAGIC - Auto Loader is set up with the original endangered species XSD.
# MAGIC - However, the XML file (`endangered_new_f`) introduces a new `<POPULATION>` element.
# MAGIC - Since the new element is not defined in the XSD, validation will fail.
# COMMAND ----------
display(df)
# COMMAND ----------
# MAGIC %md
# MAGIC Displaying the streaming DataFrame will show the failure due to the schema mismatch.
# MAGIC Auto Loader cannot handle the new XML element because it's not defined in the original XSD schema.
# COMMAND ----------
# DBTITLE 1,SchemaEvolution for new cols ON
df = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "xml") \
.option("rowTag", "species") \
.option("rowValidationXSDPath", endangered_xsd_f) \
.option("cloudFiles.schemaLocation", schema_loc) \
.option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
.load(endangered_new_f)
# COMMAND ----------
# MAGIC %md
# MAGIC The default for `"cloudFiles.schemaEvolutionMode"` is `"addNewColumns"`, which allows Auto Loader to update the schema when new columns are encountered. For other settings, see the [documentation for schemaEvolutionMode](https://docs.databricks.com/en/ingestion/auto-loader/schema.html#evolution). The inferred schema stored in `"cloudFiles.schemaLocation"` will be updated to include the new column.
# MAGIC Now Auto Loader can process the XML with the new `<POPULATION>` element.
# COMMAND ----------
display(df)
# COMMAND ----------
# MAGIC %md
# MAGIC Displaying the streaming DataFrame now succeeds.
# MAGIC The DataFrame includes the new column representing the `<POPULATION>` element.
# MAGIC Auto Loader has evolved the schema to handle the change in the XML structure.
# COMMAND ----------
df.printSchema()
# COMMAND ----------
# MAGIC %md
# MAGIC Printing the DataFrame schema reveals the new column added through schema evolution.
# MAGIC Auto Loader inferred the new column and updated the schema stored in `"cloudFiles.schemaLocation"`.
# COMMAND ----------
# MAGIC %md
# MAGIC # Using SQL with XML
# COMMAND ----------
# DBTITLE 1,Ingest XML
# MAGIC %sql
# MAGIC
# MAGIC DROP TABLE IF EXISTS e_species;
# MAGIC
# MAGIC CREATE TABLE e_species USING XML
# MAGIC OPTIONS (path "file:/PATH/endangered_species.xml", rowTag "species");
# MAGIC
# MAGIC SELECT * FROM e_species;
# COMMAND ----------
# MAGIC %md
# MAGIC Spark SQL also supports creating tables directly from XML files.
# MAGIC - Use the `"CREATE TABLE"` statement with the `"USING XML"` clause.
# MAGIC - Specify the path to the XML file and the `"rowTag"` option to indicate the element for rows.
# MAGIC - The resulting table can be queried using standard SQL syntax.
# COMMAND ----------
# MAGIC %md
# MAGIC ## SQL XML functions
# COMMAND ----------
# DBTITLE 1,SQL schema_of_xml() function
# MAGIC %sql
# MAGIC SELECT schema_of_xml('
# MAGIC <Ape>
# MAGIC <Name>Koba</Name>
# MAGIC <Species>Bonobo</Species>
# MAGIC <Leader>false</Leader>
# MAGIC <Intelligence>High</Intelligence>
# MAGIC <Personality>
# MAGIC <Trait>Aggressive</Trait>
# MAGIC <Trait>Loyal</Trait>
# MAGIC <Trait>Vengeful</Trait>
# MAGIC </Personality>
# MAGIC </Ape>
# MAGIC ');
# COMMAND ----------
# MAGIC %md
# MAGIC The `"schema_of_xml"` function in Spark SQL infers the schema of an XML string.
# MAGIC - Pass an XML string to the function, and it returns the corresponding schema.
# MAGIC - This is useful for exploring and understanding the structure of XML data.
# MAGIC - The inferred schema shows the hierarchy and data types of the XML elements and attributes.
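A toy plain-Python analogue of `schema_of_xml` (not Spark's implementation, and far less capable) treats leaves as strings and repeated child tags as arrays; the truncated `Ape` sample below reuses the elements from the query above:

```python
# Toy schema inference: leaf elements become "STRING"; a tag that repeats
# under the same parent becomes "ARRAY<...>". Spark's schema_of_xml also
# infers numeric/boolean types, which this sketch does not attempt.
import xml.etree.ElementTree as ET

def infer_schema(elem):
    children = list(elem)
    if not children:
        return "STRING"
    tags = [c.tag for c in children]
    schema = {}
    for c in children:
        inner = infer_schema(c)
        schema[c.tag] = f"ARRAY<{inner}>" if tags.count(c.tag) > 1 else inner
    return schema

ape = ET.fromstring(
    "<Ape><Name>Koba</Name><Personality>"
    "<Trait>Aggressive</Trait><Trait>Loyal</Trait>"
    "</Personality></Ape>"
)
print(infer_schema(ape))
# {'Name': 'STRING', 'Personality': {'Trait': 'ARRAY<STRING>'}}
```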
# COMMAND ----------
# DBTITLE 1,SQL to_xml() function
# MAGIC %sql
# MAGIC SELECT
# MAGIC to_xml(named_struct("species","Orangutan"))
# COMMAND ----------
# MAGIC %md
# MAGIC The `"to_xml"` function in Spark SQL converts a struct to an XML string.
# MAGIC - Use `"named_struct"` to create a struct with key-value pairs representing XML elements.
# MAGIC - Pass the struct to `"to_xml"`, and it returns the corresponding XML string.
# MAGIC - This is handy when you need to generate XML output from structured data in DataFrames or tables.
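The struct-to-XML conversion can be sketched in plain Python as well (an illustration of the idea, not Spark's `to_xml`); the `row` tag name is an arbitrary choice here:

```python
# Build an XML string from a dict of field name -> value, analogous to
# calling to_xml(named_struct(...)) in Spark SQL.
import xml.etree.ElementTree as ET

def to_xml(tag: str, fields: dict) -> str:
    root = ET.Element(tag)
    for name, value in fields.items():
        child = ET.SubElement(root, name)
        child.text = str(value)
    return ET.tostring(root, encoding="unicode")

print(to_xml("row", {"species": "Orangutan"}))
# <row><species>Orangutan</species></row>
```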