
Commit 896ab1e

Transformations exercises.
1 parent 098313a commit 896ab1e

File tree

6 files changed: +10267 lines added, 0 removed


sds-rdd-apis/ColumnsNotebook.ipynb

Lines changed: 1 addition & 0 deletions
Large diffs are not rendered by default.

sds-rdd-apis/LogFileDemo.py

Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
from pyspark.sql import *
from pyspark.sql.functions import regexp_extract, substring_index

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("LogFileDemo") \
        .getOrCreate()

    # Read the raw log file; each line becomes a single string column named "value"
    file_df = spark.read.text("data/apache_logs.txt")
    file_df.printSchema()

    # Regex for the Apache combined log format; groups 1, 4, 6 and 10 are the
    # client IP, timestamp, request URI and referrer respectively
    log_reg = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+) "(\S+)" "([^"]*)'

    logs_df = file_df.select(regexp_extract('value', log_reg, 1).alias('ip'),
                             regexp_extract('value', log_reg, 4).alias('date'),
                             regexp_extract('value', log_reg, 6).alias('request'),
                             regexp_extract('value', log_reg, 10).alias('referrer'))

    logs_df.printSchema()

    # Drop records with no referrer, keep only scheme://host, and count hits per referrer
    logs_df \
        .where("trim(referrer) != '-' ") \
        .withColumn("referrer", substring_index("referrer", "/", 3)) \
        .groupBy("referrer") \
        .count() \
        .show(100, truncate=False)
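
For reference only, and not part of this commit: the capture groups the job relies on (1 = client IP, 4 = timestamp, 6 = request URI, 10 = referrer) can be sanity-checked outside Spark with Python's re module. The sample log entry below is made up for illustration and is not taken from data/apache_logs.txt.

import re

# Same pattern as log_reg above
log_reg = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+) "(\S+)" "([^"]*)'

# Hypothetical Apache combined-log entry, for illustration only
sample = ('203.0.113.7 - - [17/May/2015:10:05:03 +0000] '
          '"GET /images/logo.png HTTP/1.1" 200 2048 '
          '"http://example.com/blog/post" "Mozilla/5.0"')

m = re.match(log_reg, sample)
if m:
    print(m.group(1))   # client IP    -> 203.0.113.7
    print(m.group(4))   # timestamp    -> 17/May/2015:10:05:03 +0000
    print(m.group(6))   # request URI  -> /images/logo.png
    print(m.group(10))  # referrer     -> http://example.com/blog/post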

sds-rdd-apis/MyPythonNotebook.ipynb

Lines changed: 171 additions & 0 deletions
@@ -0,0 +1,171 @@
{
  "cells": [
    {
      "cell_type": "code",
      "source": [
        "from pyspark.sql import *\n",
        "from pyspark.sql.functions import *\n",
        "from pyspark.sql.types import *\n",
        "\n",
        "def to_date_df(df, fmt, fld):\n",
        "    return df.withColumn(fld, to_date(col(fld), fmt))\n"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "80b42010-ef17-4d97-9ff1-16f7b96464d3"
        }
      },
      "outputs": [
        {
          "output_type": "display_data",
          "metadata": {
            "application/vnd.databricks.v1+output": {
              "datasetInfos": [],
              "data": "<div class=\"ansiout\"></div>",
              "removedWidgets": [],
              "addedWidgets": {},
              "type": "html",
              "arguments": {}
            }
          },
          "data": {
            "text/html": [
              "<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\"></div>"
            ]
          }
        }
      ],
      "execution_count": 0
    },
    {
      "cell_type": "code",
      "source": [
        "my_schema = StructType([\n    StructField(\"ID\", StringType()),\n    StructField(\"EventDate\", StringType())])\n\nmy_rows = [Row(\"123\", \"04/05/2020\"), Row(\"124\", \"4/5/2020\"), Row(\"125\", \"04/5/2020\"), Row(\"126\", \"4/05/2020\")]\nmy_rdd = spark.sparkContext.parallelize(my_rows, 2)\nmy_df = spark.createDataFrame(my_rdd, my_schema)"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "4ae5aa80-cd11-4e1a-905e-cf1406a6e1c8"
        }
      },
      "outputs": [
        {
          "output_type": "display_data",
          "metadata": {
            "application/vnd.databricks.v1+output": {
              "datasetInfos": [
                {
                  "name": "my_df",
                  "typeStr": "pyspark.sql.dataframe.DataFrame",
                  "schema": {
                    "fields": [
                      {
                        "metadata": {},
                        "name": "ID",
                        "nullable": true,
                        "type": "string"
                      },
                      {
                        "metadata": {},
                        "name": "EventDate",
                        "nullable": true,
                        "type": "string"
                      }
                    ],
                    "type": "struct"
                  },
                  "tableIdentifier": null
                }
              ],
              "data": "<div class=\"ansiout\"></div>",
              "removedWidgets": [],
              "addedWidgets": {},
              "type": "html",
              "arguments": {}
            }
          },
          "data": {
            "text/html": [
              "<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\"></div>"
            ]
          }
        }
      ],
      "execution_count": 0
    },
    {
      "cell_type": "code",
      "source": [
        "my_df.printSchema()\nmy_df.show()\nnew_df = to_date_df(my_df, \"M/d/y\", \"EventDate\")\nnew_df.printSchema()\nnew_df.show()"
      ],
      "metadata": {
        "application/vnd.databricks.v1+cell": {
          "title": "",
          "showTitle": false,
          "inputWidgets": {},
          "nuid": "457f165a-d68e-476a-8b32-31ed691770b2"
        }
      },
      "outputs": [
        {
          "output_type": "display_data",
          "metadata": {
            "application/vnd.databricks.v1+output": {
              "datasetInfos": [
                {
                  "name": "new_df",
                  "typeStr": "pyspark.sql.dataframe.DataFrame",
                  "schema": {
                    "fields": [
                      {
                        "metadata": {},
                        "name": "ID",
                        "nullable": true,
                        "type": "string"
                      },
                      {
                        "metadata": {},
                        "name": "EventDate",
                        "nullable": true,
                        "type": "date"
                      }
                    ],
                    "type": "struct"
                  },
                  "tableIdentifier": null
                }
              ],
              "data": "<div class=\"ansiout\">root\n |-- ID: string (nullable = true)\n |-- EventDate: string (nullable = true)\n\n+---+----------+\n| ID| EventDate|\n+---+----------+\n|123|04/05/2020|\n|124| 4/5/2020|\n|125| 04/5/2020|\n|126| 4/05/2020|\n+---+----------+\n\nroot\n |-- ID: string (nullable = true)\n |-- EventDate: date (nullable = true)\n\n+---+----------+\n| ID| EventDate|\n+---+----------+\n|123|2020-04-05|\n|124|2020-04-05|\n|125|2020-04-05|\n|126|2020-04-05|\n+---+----------+\n\n</div>",
              "removedWidgets": [],
              "addedWidgets": {},
              "type": "html",
              "arguments": {}
            }
          },
          "data": {
            "text/html": [
              "<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\">root\n |-- ID: string (nullable = true)\n |-- EventDate: string (nullable = true)\n\n+---+----------+\n| ID| EventDate|\n+---+----------+\n|123|04/05/2020|\n|124| 4/5/2020|\n|125| 04/5/2020|\n|126| 4/05/2020|\n+---+----------+\n\nroot\n |-- ID: string (nullable = true)\n |-- EventDate: date (nullable = true)\n\n+---+----------+\n| ID| EventDate|\n+---+----------+\n|123|2020-04-05|\n|124|2020-04-05|\n|125|2020-04-05|\n|126|2020-04-05|\n+---+----------+\n\n</div>"
            ]
          }
        }
      ],
      "execution_count": 0
    }
  ],
  "metadata": {
    "application/vnd.databricks.v1+notebook": {
      "notebookName": "MyPythonNotebook",
      "dashboards": [],
      "language": "python",
      "widgets": {},
      "notebookOrigID": 343023503281504
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
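
The notebook's last cell output shows that the single pattern "M/d/y" parses both zero-padded and unpadded month/day strings, so all four rows resolve to 2020-04-05. As a minimal sketch (an assumed alternative, not something the notebook itself does), the same conversion can also be written as a Spark SQL expression via expr():

# Sketch only: the same string-to-date conversion expressed as a SQL function call.
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.master("local[1]").appName("ToDateExprSketch").getOrCreate()

df = spark.createDataFrame(
    [("123", "04/05/2020"), ("124", "4/5/2020")],
    ["ID", "EventDate"])

# to_date(column, format) is also available as a SQL function inside expr()
df.withColumn("EventDate", expr("to_date(EventDate, 'M/d/y')")).show()
# Both rows should show EventDate as 2020-04-05
spark.stop()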

sds-rdd-apis/RowDemo.py

Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

from lib.logger import Log4j


def to_date_df(df, fmt, fld):
    # Convert a string column holding dates in the given format to DateType
    return df.withColumn(fld, to_date(fld, fmt))


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("RowDemo") \
        .getOrCreate()

    logger = Log4j(spark)

    my_schema = StructType([
        StructField("ID", StringType()),
        StructField("EventDate", StringType())])

    my_rows = [Row("123", "04/05/2020"), Row("124", "4/5/2020"), Row("125", "04/5/2020"), Row("126", "4/05/2020")]
    my_rdd = spark.sparkContext.parallelize(my_rows, 2)
    my_df = spark.createDataFrame(my_rdd, my_schema)

    my_df.printSchema()
    my_df.show()
    # "M/d/y" accepts both zero-padded and unpadded month/day values
    new_df = to_date_df(my_df, "M/d/y", "EventDate")
    new_df.printSchema()
    new_df.show()
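
A small aside on the construction above, as a sketch only and not part of this commit: createDataFrame also accepts a plain list of Row objects, so the explicit parallelize() step is mainly there to pin the number of partitions (2). Under that assumption, the same DataFrame can be built without the RDD detour:

# Sketch, assuming partition count is not the point being demonstrated.
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.master("local[3]").appName("RowDemoSketch").getOrCreate()

my_schema = StructType([
    StructField("ID", StringType()),
    StructField("EventDate", StringType())])

# Rows passed directly to createDataFrame; no intermediate RDD
my_rows = [Row("123", "04/05/2020"), Row("124", "4/5/2020")]
my_df = spark.createDataFrame(my_rows, my_schema)
my_df.show()
spark.stop()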

sds-rdd-apis/RowDemo_Test.py

Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
from datetime import date
from unittest import TestCase

from pyspark.sql import *
from pyspark.sql.types import *

from RowDemo import to_date_df


class RowDemoTestCase(TestCase):

    @classmethod
    def setUpClass(cls) -> None:
        # One local SparkSession shared by all tests in this class
        cls.spark = SparkSession.builder \
            .master("local[3]") \
            .appName("RowDemoTest") \
            .getOrCreate()

        my_schema = StructType([
            StructField("ID", StringType()),
            StructField("EventDate", StringType())])

        my_rows = [Row("123", "04/05/2020"), Row("124", "4/5/2020"), Row("125", "04/5/2020"), Row("126", "4/05/2020")]
        my_rdd = cls.spark.sparkContext.parallelize(my_rows, 2)
        cls.my_df = cls.spark.createDataFrame(my_rdd, my_schema)

    def test_data_type(self):
        # Every EventDate value should come back as a Python date object
        rows = to_date_df(self.my_df, "M/d/y", "EventDate").collect()
        for row in rows:
            self.assertIsInstance(row["EventDate"], date)

    def test_date_value(self):
        # All four input formats should parse to the same calendar date
        rows = to_date_df(self.my_df, "M/d/y", "EventDate").collect()
        for row in rows:
            self.assertEqual(row["EventDate"], date(2020, 4, 5))
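
The SparkSession created in setUpClass is never stopped. A cleanup hook along the lines of the sketch below (assumed, not part of this commit; the class name is illustrative) releases it once the class finishes; the committed suite itself can be run with python -m unittest RowDemo_Test.

# Sketch only: the existing setUpClass plus an explicit Spark shutdown.
from unittest import TestCase
from pyspark.sql import SparkSession


class RowDemoTestCaseWithCleanup(TestCase):

    @classmethod
    def setUpClass(cls) -> None:
        cls.spark = SparkSession.builder \
            .master("local[3]") \
            .appName("RowDemoTest") \
            .getOrCreate()

    @classmethod
    def tearDownClass(cls) -> None:
        # Stop the shared SparkSession so repeated runs do not leak resources
        cls.spark.stop()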

0 commit comments
