
Commit

Use black to format data set downloader and add requirements.txt
Frank Blechschmidt committed Apr 28, 2019
1 parent e125b6a commit 7c6889c
Showing 3 changed files with 166 additions and 42 deletions.
18 changes: 13 additions & 5 deletions README.md
@@ -1,14 +1,17 @@
# CS 498 CCA: Project Team 40

> Improving Amazon Star Ratings through Text Analysis on Review Content
## Download Data Sets

1. Install Spark on your local machine, e.g. `brew install apache-spark` on macOS
1. Install Python 3, e.g. `brew install python` on macOS
1. Install the Python dependencies: `pip install boto3 click click_pathlib pyspark`
1. Install the Python dependencies: `pip install -r requirements.txt`
1. Run the Data Set Downloader: `python data_set_downloader.py`

### Usage and Options
```

```text
Usage: data_set_downloader.py [OPTIONS]
Utility to download Amazon Product Reviews data set.
@@ -26,24 +29,29 @@ Options:
```

### Default Values

| Option | Value |
|-----------------------|----------------------|
| `aws-data-set-region` | `us-east-1` |
| `aws-data-set-bucket` | `amazon-reviews-pds` |
| `file-destination`    | `./.datasets`        |

### Example Usage

Run with a file-name filter, a custom file destination, and debug output enabled:
```

```bash
python data_set_downloader.py --file-names tsv/amazon_reviews_us_Watches_v1_00.tsv.gz --file-names tsv/amazon_reviews_us_Home_Entertainment_v1_00.tsv.gz -d .target --debug
```

## Jupyter Notebooks

1. Install Spark on your local machine, e.g. `brew install apache-spark` on macOS
1. Install Python 3, e.g. `brew install python` on macOS
1. Install the Python dependencies: `pip install beautifulsoup4 jupyter matplotlib numpy pandas scikit-learn seaborn`
1. Install the Python dependencies: `pip install -r requirements.txt`
1. Start Jupyter Notebook: `jupyter notebook` (a quick first look at the downloaded data is sketched below)
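
A quick first look at the downloaded data from a notebook can be sketched as follows. This is not part of the repository: it assumes the Watches archive from the example above has already been downloaded and unpacked into the default `./.datasets` directory, and that the TSV exposes a `star_rating` column.

```python
# Minimal sanity check on a downloaded TSV (illustrative sketch, not part of this commit).
import pandas as pd

reviews = pd.read_csv(
    ".datasets/amazon_reviews_us_Watches_v1_00.tsv",  # assumed path; adjust to your -d destination
    sep="\t",
    error_bad_lines=False,  # flag available in the pinned pandas 0.24.x; skips the few malformed rows
)

print(reviews.shape)
print(reviews["star_rating"].value_counts().sort_index())
```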

## Statistics
A preliminary example of the process we intend to implement in a distributed computing process is shown in `Amazon Watches Reviews EDA.ipynb`.

A preliminary example of the process we intend to implement in a distributed computing setting is shown in `Amazon_Watches_Reviews_EDA.ipynb`.
This Jupyter notebook demonstrates how the Amazon reviews can be divided into several tiers and how a training data set and model can be used to predict the tier to which a product should belong, using certain aggregate features.
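
A rough, self-contained sketch of that idea is shown below. It is illustrative only and not part of the repository: the per-product features, tier boundaries, and model are assumptions, and the column names (`product_id`, `review_id`, `review_body`, `helpful_votes`, `star_rating`) are the ones the public data set is expected to expose; the notebook's actual choices may differ.

```python
# Illustrative sketch of the tier-prediction approach (assumed features, cut-offs, and model).
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

reviews = pd.read_csv(
    ".datasets/amazon_reviews_us_Watches_v1_00.tsv",  # assumed path
    sep="\t",
    error_bad_lines=False,  # pandas 0.24.x flag; skip the few malformed rows in the dump
)

# Aggregate per-product features (column names assumed from the public data set).
grouped = reviews.groupby("product_id")
products = pd.DataFrame({
    "review_count": grouped["review_id"].count(),
    "mean_review_length": grouped["review_body"].apply(lambda s: s.astype(str).str.len().mean()),
    "mean_helpful_votes": grouped["helpful_votes"].mean(),
    "mean_star_rating": grouped["star_rating"].mean(),
}).dropna()

# Bin products into tiers by average star rating (boundaries chosen arbitrarily here).
products["tier"] = pd.cut(
    products["mean_star_rating"],
    bins=[0, 2.5, 3.5, 4.5, 5],
    labels=["low", "mid", "high", "top"],
)

# Train a classifier on the aggregate features (excluding the rating itself)
# and check how well it recovers the tier on held-out products.
X = products[["review_count", "mean_review_length", "mean_helpful_votes"]]
y = products["tier"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
print("held-out accuracy:", model.score(X_test, y_test))
```
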
124 changes: 87 additions & 37 deletions data_set_downloader.py
@@ -14,72 +14,116 @@
from pyspark import SparkContext
from pyspark.sql import SparkSession
except ImportError:
sys.exit("Please install dependencies: pip install boto3 click click_pathlib pyspark")
sys.exit(
"Please install dependencies: pip install boto3 click click_pathlib pyspark"
)


DEFAULT_PRODUCT_REVIEWS_DATASET_BUCKET = "amazon-reviews-pds"
DEFAULT_PRODUCT_REVIEWS_DATASET_REGION = "us-east-1"
DEFAULT_FILE_DESTINATION = Path('.') / '.datasets'
DEFAULT_FILE_DESTINATION = Path(".") / ".datasets"
FILE_PATTERN = re.compile(r"tsv/amazon_reviews_us\w*\.tsv\.gz")


def convert_size(size_bytes):
if size_bytes == 0:
return "0B"
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
i = int(math.floor(math.log(size_bytes, 1024)))
p = math.pow(1024, i)
s = round(size_bytes / p, 2)
return "%s %s" % (s, size_name[i])
if size_bytes == 0:
return "0B"
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
i = int(math.floor(math.log(size_bytes, 1024)))
p = math.pow(1024, i)
s = round(size_bytes / p, 2)
return "%s %s" % (s, size_name[i])


def download_files(file, bucket, folder):
s3_resource = boto3.resource('s3')
path = folder / file['Key'].lstrip('tsv/')
s3_resource.Object(bucket, file['Key']).download_file(str(path))
s3_resource = boto3.resource("s3")
path = folder / file["Key"].lstrip("tsv/")
s3_resource.Object(bucket, file["Key"]).download_file(str(path))
return path


def ungzip_file(source):
destination = source.with_suffix('')
with gzip.open(source, 'rb') as f_in, open(destination, 'wb') as f_out:
destination = source.with_suffix("")
with gzip.open(source, "rb") as f_in, open(destination, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
return destination


@click.command()
@click.option('--aws-access-key-id', '-a', help='AWS Access Key ID')
@click.option('--aws-secret-access-key', '-s', help='AWS Secret Access Key')
@click.option('--aws-data-set-region', '-r', help='AWS Data Set Region', default=DEFAULT_PRODUCT_REVIEWS_DATASET_REGION)
@click.option('--aws-data-set-bucket', '-b', help='AWS Data Set S3 Bucket', default=DEFAULT_PRODUCT_REVIEWS_DATASET_BUCKET)
@click.option('--file-names', '-f', multiple=True, help='Data Set File Filter')
@click.option('--file-destination', '-d', help='Data Set File Destination', default=DEFAULT_FILE_DESTINATION, type=click_pathlib.Path(exists=True))
@click.option('--debug', is_flag=True, help='Print additional information')
def download(aws_access_key_id, aws_secret_access_key, aws_data_set_region, aws_data_set_bucket, file_names, file_destination, debug):
@click.option("--aws-access-key-id", "-a", help="AWS Access Key ID")
@click.option("--aws-secret-access-key", "-s", help="AWS Secret Access Key")
@click.option(
"--aws-data-set-region",
"-r",
help="AWS Data Set Region",
default=DEFAULT_PRODUCT_REVIEWS_DATASET_REGION,
)
@click.option(
"--aws-data-set-bucket",
"-b",
help="AWS Data Set S3 Bucket",
default=DEFAULT_PRODUCT_REVIEWS_DATASET_BUCKET,
)
@click.option("--file-names", "-f", multiple=True, help="Data Set File Filter")
@click.option(
"--file-destination",
"-d",
help="Data Set File Destination",
default=DEFAULT_FILE_DESTINATION,
type=click_pathlib.Path(exists=True),
)
@click.option("--debug", is_flag=True, help="Print additional information")
def download(
aws_access_key_id,
aws_secret_access_key,
aws_data_set_region,
aws_data_set_bucket,
file_names,
file_destination,
debug,
):
"""Utility to download Amazon Product Reviews data set."""
s3 = boto3.client('s3',
s3 = boto3.client(
"s3",
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=aws_data_set_region)
region_name=aws_data_set_region,
)

files = s3.list_objects(Bucket=aws_data_set_bucket, Prefix='tsv')['Contents']
files = s3.list_objects(Bucket=aws_data_set_bucket, Prefix="tsv")["Contents"]
if debug:
# list the contents of DEFAULT_PRODUCT_REVIEWS_DATASET_BUCKET
print(f"Content of S3 bucket {aws_data_set_bucket}:")
pprint([
(file['Key'], convert_size(s3.head_object(Bucket=aws_data_set_bucket, Key=file['Key'])['ContentLength']))
for file in files
])
pprint(
[
(
file["Key"],
convert_size(
s3.head_object(Bucket=aws_data_set_bucket, Key=file["Key"])[
"ContentLength"
]
),
)
for file in files
]
)

# ignore the multilingual files
files = filter(lambda file: FILE_PATTERN.match(file['Key']), files)
files = filter(lambda file: FILE_PATTERN.match(file["Key"]), files)

# apply provided filename filter
files = [file for file in files if file['Key'] in file_names]
if file_names:
files = [file for file in files if file["Key"] in file_names]

if debug:
print(f"\nFiltered file list for S3 bucket {aws_data_set_bucket}:")
pprint([file['Key'] for file in files])
pprint([file["Key"] for file in files])

context = SparkContext()
distributed_dataset = context.parallelize(files)
archives = distributed_dataset.map(lambda f: download_files(f, aws_data_set_bucket, file_destination)).collect()
archives = distributed_dataset.map(
lambda f: download_files(f, aws_data_set_bucket, file_destination)
).collect()
if debug:
print(f"\nDownloaded archives:")
pprint(archives)
@@ -91,10 +135,16 @@ def download(aws_access_key_id, aws_secret_access_key, aws_data_set_region, aws_
pprint(downloaded_files)

if debug:
spark = SparkSession.builder.appName('AmazonImprovedStarRatings').getOrCreate()
path = file_destination / '*.tsv'
df = spark.read.format("csv").option("header", "true").option("delimiter", "\t").load(str(path))
spark = SparkSession.builder.appName("AmazonImprovedStarRatings").getOrCreate()
path = file_destination / "*.tsv"
df = (
spark.read.format("csv")
.option("header", "true")
.option("delimiter", "\t")
.load(str(path))
)
print(df.show(5))

if __name__ == '__main__':

if __name__ == "__main__":
download()
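
For reference, the `FILE_PATTERN` regular expression in the script above keeps only the `amazon_reviews_us*` archives under the `tsv/` prefix and drops the multilingual files mentioned in the code comment. A quick check of its behaviour (the multilingual key below is illustrative, not taken from this commit):

```python
# Sketch of the key filter used by the downloader.
import re

FILE_PATTERN = re.compile(r"tsv/amazon_reviews_us\w*\.tsv\.gz")

print(bool(FILE_PATTERN.match("tsv/amazon_reviews_us_Watches_v1_00.tsv.gz")))       # True
print(bool(FILE_PATTERN.match("tsv/amazon_reviews_multilingual_DE_v1_00.tsv.gz")))  # False (illustrative key)
```
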
66 changes: 66 additions & 0 deletions requirements.txt
@@ -0,0 +1,66 @@
appnope==0.1.0
attrs==19.1.0
backcall==0.1.0
beautifulsoup4==4.7.1
bleach==3.1.0
boto3==1.9.137
botocore==1.12.137
Click==7.0
click-pathlib==2019.4.26.2
cycler==0.10.0
decorator==4.4.0
defusedxml==0.6.0
docutils==0.14
entrypoints==0.3
ipykernel==5.1.0
ipython==7.5.0
ipython-genutils==0.2.0
ipywidgets==7.4.2
jedi==0.13.3
Jinja2==2.10.1
jmespath==0.9.4
jsonschema==3.0.1
jupyter==1.0.0
jupyter-client==5.2.4
jupyter-console==6.0.0
jupyter-core==4.4.0
kiwisolver==1.1.0
MarkupSafe==1.1.1
matplotlib==3.0.3
mistune==0.8.4
nbconvert==5.5.0
nbformat==4.4.0
notebook==5.7.8
numpy==1.14.5
pandas==0.24.2
pandocfilters==1.4.2
parso==0.4.0
pexpect==4.7.0
pickleshare==0.7.5
prometheus-client==0.6.0
prompt-toolkit==2.0.9
ptyprocess==0.6.0
py4j==0.10.7
Pygments==2.3.1
pyparsing==2.4.0
pyrsistent==0.15.1
pyspark==2.4.2
python-dateutil==2.8.0
pytz==2019.1
pyzmq==18.0.1
qtconsole==4.4.3
s3transfer==0.2.0
scikit-learn==0.20.3
scipy==1.2.1
seaborn==0.9.0
Send2Trash==1.5.0
six==1.12.0
soupsieve==1.9.1
terminado==0.8.2
testpath==0.4.2
tornado==6.0.2
traitlets==4.3.2
urllib3==1.24.2
wcwidth==0.1.7
webencodings==0.5.1
widgetsnbextension==3.4.2
