Commit c7ba3f5: Initial Commit
Parent: 4d4d97c

5 files changed, 127 insertions(+), 0 deletions(-)

07-RowDemo/RowDemo.py

Lines changed: 33 additions & 0 deletions
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

from lib.logger import Log4j


def to_date_df(df, fmt, fld):
    # Replace the string column `fld` with its value parsed as a DateType.
    return df.withColumn(fld, to_date(fld, fmt))


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("RowDemo") \
        .getOrCreate()

    logger = Log4j(spark)

    my_schema = StructType([
        StructField("ID", StringType()),
        StructField("EventDate", StringType())])

    # Four Rows carrying the same date in four inconsistent string formats.
    my_rows = [Row("123", "04/05/2020"), Row("124", "4/5/2020"), Row("125", "04/5/2020"), Row("126", "4/05/2020")]
    my_rdd = spark.sparkContext.parallelize(my_rows, 2)
    my_df = spark.createDataFrame(my_rdd, my_schema)

    my_df.printSchema()
    my_df.show()

    # "M/d/y" accepts one- or two-digit month and day tokens.
    new_df = to_date_df(my_df, "M/d/y", "EventDate")
    new_df.printSchema()
    new_df.show()
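For reference, the schema printed before and after the conversion should differ only in EventDate's type (a sketch of the expected output; show() formatting varies slightly across Spark versions, and all four rows come back as 2020-04-05):

Before to_date_df:
root
 |-- ID: string (nullable = true)
 |-- EventDate: string (nullable = true)

After to_date_df:
root
 |-- ID: string (nullable = true)
 |-- EventDate: date (nullable = true)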

07-RowDemo/RowDemo_Test.py

Lines changed: 35 additions & 0 deletions
from datetime import date
from unittest import TestCase

from pyspark.sql import *
from pyspark.sql.types import *

from RowDemo import to_date_df


class RowDemoTestCase(TestCase):

    @classmethod
    def setUpClass(cls) -> None:
        # One SparkSession shared by every test in this class.
        cls.spark = SparkSession.builder \
            .master("local[3]") \
            .appName("RowDemoTest") \
            .getOrCreate()

        my_schema = StructType([
            StructField("ID", StringType()),
            StructField("EventDate", StringType())])

        my_rows = [Row("123", "04/05/2020"), Row("124", "4/5/2020"), Row("125", "04/5/2020"), Row("126", "4/05/2020")]
        my_rdd = cls.spark.sparkContext.parallelize(my_rows, 2)
        cls.my_df = cls.spark.createDataFrame(my_rdd, my_schema)

    def test_data_type(self):
        # Every converted EventDate should come back as a Python datetime.date.
        rows = to_date_df(self.my_df, "M/d/y", "EventDate").collect()
        for row in rows:
            self.assertIsInstance(row["EventDate"], date)

    def test_date_value(self):
        # All four input formats should normalize to the same date.
        rows = to_date_df(self.my_df, "M/d/y", "EventDate").collect()
        for row in rows:
            self.assertEqual(row["EventDate"], date(2020, 4, 5))
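These two tests inspect the collected Python values. A schema-level assertion would also fit here; the following is a sketch, not part of the commit, assuming the same fixtures (DateType is available via the pyspark.sql.types star import):

    def test_schema_type(self):
        # Hypothetical extra test: verify the Spark column type itself,
        # not just the Python objects produced by collect().
        new_df = to_date_df(self.my_df, "M/d/y", "EventDate")
        self.assertIsInstance(new_df.schema["EventDate"].dataType, DateType)

The suite runs with python -m unittest RowDemo_Test from the 07-RowDemo directory, assuming pyspark is importable.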

07-RowDemo/lib/__init__.py

Whitespace-only changes.

07-RowDemo/lib/logger.py

Lines changed: 21 additions & 0 deletions
class Log4j:
    def __init__(self, spark):
        # Reach through Py4J into the JVM for Spark's bundled log4j API.
        log4j = spark._jvm.org.apache.log4j

        root_class = "guru.learningjournal.spark.examples"
        conf = spark.sparkContext.getConf()
        app_name = conf.get("spark.app.name")

        # The logger name falls under the guru.learningjournal.spark.examples
        # hierarchy configured in log4j.properties.
        self.logger = log4j.LogManager.getLogger(root_class + "." + app_name)

    def warn(self, message):
        self.logger.warn(message)

    def info(self, message):
        self.logger.info(message)

    def error(self, message):
        self.logger.error(message)

    def debug(self, message):
        self.logger.debug(message)
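RowDemo.py constructs this wrapper but never logs with it; usage would look like this (a minimal sketch; the messages are illustrative):

from pyspark.sql import SparkSession
from lib.logger import Log4j

spark = SparkSession.builder.master("local[3]").appName("RowDemo").getOrCreate()
logger = Log4j(spark)
logger.info("Starting RowDemo")  # logged as guru.learningjournal.spark.examples.RowDemo
logger.error("Something went wrong")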

07-RowDemo/log4j.properties

Lines changed: 38 additions & 0 deletions
# Set everything to be logged to the console
log4j.rootCategory=WARN, console

# define console appender
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# application log
log4j.logger.guru.learningjournal.spark.examples=INFO, console, file
log4j.additivity.guru.learningjournal.spark.examples=false

# define rolling file appender
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=${spark.yarn.app.container.log.dir}/${logfile.name}.log
# define the following as Java system properties:
# -Dlog4j.configuration=file:log4j.properties
# -Dlogfile.name=hello-spark
# -Dspark.yarn.app.container.log.dir=app-logs
log4j.appender.file.ImmediateFlush=true
log4j.appender.file.Append=false
log4j.appender.file.MaxFileSize=500MB
log4j.appender.file.MaxBackupIndex=2
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Recommendations from Spark template
log4j.logger.org.apache.spark.repl.Main=WARN
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
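The three -D properties named in the comments above have to reach the driver JVM before it starts. One way to pass them during local development is through spark-defaults.conf (a sketch; it assumes log4j.properties sits in the working directory and app-logs is the desired log folder):

spark.driver.extraJavaOptions -Dlog4j.configuration=file:log4j.properties -Dlogfile.name=hello-spark -Dspark.yarn.app.container.log.dir=app-logs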
