Skip to content

Commit

Permalink
improved function, added cli and config support
Browse files (browse the repository at this point in the history)
Signed-off-by: nathannathant <[email protected]>
  • Loading branch information
nathom committed Mar 5, 2021
1 parent eb19e73 commit 32015dc
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 111 deletions.
15 changes: 8 additions & 7 deletions qobuz_dl/cli.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -58,6 +58,7 @@ def reset_config(config_file):
config["DEFAULT"]["folder_format"] = "{artist} - {album} ({year}) "
"[{bit_depth}B-{sampling_rate}kHz]"
config["DEFAULT"]["track_format"] = "{tracknumber}. {tracktitle}"
config["DEFAULT"]["smart_discography"] = "false"
with open(config_file, "w") as configfile:
config.write(configfile)
logging.info(
Expand Down Expand Up @@ -105,16 +106,19 @@ def main():
if (
"folder_format" not in config["DEFAULT"]
or "track_format" not in config["DEFAULT"]
or "smart_discography" not in config["DEFAULT"]
):
logging.info(
f"{YELLOW}Config file does not include format string," " updating..."
f"{YELLOW}Config file does not include some settings, updating..."
)
config["DEFAULT"]["folder_format"] = "{artist} - {album} ({year}) "
"[{bit_depth}B-{sampling_rate}kHz]"
config["DEFAULT"]["track_format"] = "{tracknumber}. {tracktitle}"
config["DEFAULT"]["smart_discography"] = "false"
with open(CONFIG_FILE, "w") as cf:
config.write(cf)

smart_discography = config.getboolean("DEFAULT", "smart_discography")
folder_format = config["DEFAULT"]["folder_format"]
track_format = config["DEFAULT"]["track_format"]

Expand Down Expand Up @@ -151,12 +155,9 @@ def main():
cover_og_quality=arguments.og_cover or og_cover,
no_cover=arguments.no_cover or no_cover,
downloads_db=None if no_database or arguments.no_db else QOBUZ_DB,
folder_format=arguments.folder_format
if arguments.folder_format is not None
else folder_format,
track_format=arguments.track_format
if arguments.track_format is not None
else track_format,
folder_format=arguments.folder_format or folder_format,
track_format=arguments.track_format or track_format,
smart_discography=arguments.smart_discography or smart_discography,
)
qobuz.initialize_client(email, password, app_id, secrets)

Expand Down
6 changes: 6 additions & 0 deletions qobuz_dl/commands.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -117,6 +117,12 @@ def add_common_arg(custom_parser, default_folder, default_quality):
metavar="PATTERN",
help="pattern for formatting track names. see `folder-format`.",
)
custom_parser.add_argument(
"-sd",
"--smart-discography",
action="store_true",
help="Try to filter out unrelated albums when requesting an artists discography.",
)


def qobuz_dl_args(
Expand Down
191 changes: 87 additions & 104 deletions qobuz_dl/core.py
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,3 @@
# ----- Testing ------
import json

# --------------------
import logging
import os
import re
Expand All @@ -26,7 +22,12 @@
ARTISTS_SELECTOR = "td.chartlist-artist > a"
TITLE_SELECTOR = "td.chartlist-name > a"
EXTENSIONS = (".mp3", ".flac")
QUALITIES = {5: "5 - MP3", 6: "6 - FLAC", 7: "7 - 24B<96kHz", 27: "27 - 24B>96kHz"}
QUALITIES = {
5: "5 - MP3",
6: "6 - 16 bit, 44.1kHz",
7: "7 - 24 bit, <96kHz",
27: "27 - 24 bit, >96kHz",
}

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -91,7 +92,7 @@ def __init__(

def initialize_client(self, email, pwd, app_id, secrets):
self.client = qopy.Client(email, pwd, app_id, secrets)
logger.info(f"{YELLOW}Set quality: {QUALITIES[int(self.quality)]}\n")
logger.info(f"{YELLOW}Set max quality: {QUALITIES[int(self.quality)]}\n")

def get_tokens(self):
spoofer = spoofbuz.Spoofer()
Expand Down Expand Up @@ -185,10 +186,18 @@ def handle_url(self, url):
os.path.join(self.directory, sanitize_filename(content_name))
)

# items = [item[type_dict["iterable_key"]]["items"] for item in content][0]
items = self.smart_discography_filter(
content, True, True,
)
if self.smart_discography and url_type == "artist":
logger.info(f"{YELLOW}Filtering {content_name}'s discography")
items = self.smart_discography_filter(
content,
save_space=True,
skip_extras=True,
)
else:
items = [item[type_dict["iterable_key"]]["items"] for item in content][
0
]

logger.info(f"{YELLOW}{len(items)} downloads in queue")
for item in items:
self.download_from_id(
Expand Down Expand Up @@ -482,110 +491,84 @@ def make_m3u(self, pl_directory):
pl.write("\n\n".join(track_list))

def smart_discography_filter(
self, contents: list, save_space=False, remove_extras=False
self, contents: list, save_space=False, skip_extras=False
) -> list:
"""When downloading some artists' discography, there can be a lot
of duplicate albums that needlessly use 10's of GB of bandwidth. This
filters the duplicates.
Example (Stevie Wonder):
* ...
* Songs In The Key of Life [24/192]
* Songs In The Key of Life [24/96]
* Songs In The Key of Life [16/44.1]
* ...
This function should choose either [24/96] or [24/192].
It also skips deluxe albums in favor of the originals, picks remasters
in favor of originals, and removes albums by other artists that just
feature the requested artist.
"""When downloading some artists' discography, many random and spam-like
albums can get downloaded. This helps filter those out to just get the good stuff.
This function removes:
* albums by other artists, which may contain a feature from the requested artist
* duplicate albums in different qualities
* (optionally) removes collector's, deluxe, live albums
:param list contents: contents returned by qobuz API
:param bool save_space: choose highest bit depth, lowest sampling rate
:param bool remove_extras: remove albums with extra material (i.e. live, deluxe,...)
:returns: filtered items list
"""

def print_album(a: dict):
print(
f"{album['title']} - {album['version']} ({album['maximum_bit_depth']}/{album['maximum_sampling_rate']})"
# for debugging
def print_album(album: dict):
logger.info(
f"{album['title']} - {album.get('version', '~~')} ({album['maximum_bit_depth']}/{album['maximum_sampling_rate']} by {album['artist']['name']}) {album['id']}"
)

def remastered(s: str) -> bool:
"""Case insensitive match to check whether
an album is remastered.
TYPE_REGEXES = {
"remaster": r"(?i)(re)?master(ed)?",
"extra": r"(?i)(anniversary|deluxe|live|collector|demo|expanded)",
}

def is_type(album_t: str, album: dict) -> bool:
version = album.get("version", "")
title = album.get("title", "")
regex = TYPE_REGEXES[album_t]
return re.search(regex, f"{title} {version}") is not None

def essence(album: dict) -> str:
"""Ignore text in parens/brackets, return all lowercase.
Used to group two albums that may be named similarly, but not exactly
the same.
"""
if s is None:
return False
return re.match(r"(?i)(re)?master(ed)?", s) is not None

def extra(album: dict) -> bool:
assert hasattr(album, "__getitem__"), "param must be dict-like"
if 'version' not in album:
return False
return (
re.findall(
r"(?i)(anniversary|deluxe|live|collector|demo)",
f"{album['title']} {album['version']}",
)
!= []
)
r = re.match(r"([^\(]+)(?:\s*[\(\[][^\)][\)\]])*", album)
return r.group(1).strip().lower()

# remove all albums by other artists
artist = contents[0]["name"]
requested_artist = contents[0]["name"]
items = [item["albums"]["items"] for item in contents][0]
artist_f = [] # artist filtered
for item in items:
if item["artist"]["name"] == artist:
artist_f.append(item)

# use dicts to group duplicate titles together
titles_f = dict()
for item in artist_f:
if (t := item["title"]) not in titles_f:
titles_f[t] = []
titles_f[t].append(item)

# pick desired quality out of duplicates
# remasters are given preferred status
quality_f = []
for albums in titles_f.values():
# no duplicates for title
if len(albums) == 1:
quality_f.append(albums[0])
continue

# desired bit depth and sampling rate
bit_depth = max(a["maximum_bit_depth"] for a in albums)
# having sampling rate > 44.1kHz is a waste of space
# https://en.wikipedia.org/wiki/Nyquist–Shannon_sampling_theorem
# https://en.wikipedia.org/wiki/44,100_Hz#Human_hearing_and_signal_processing
cmp_func = min if save_space else max
sampling_rate = cmp_func(
# use dicts to group duplicate albums together by title
title_grouped = dict()
for item in items:
if (t := essence(item["title"])) not in title_grouped:
title_grouped[t] = []
title_grouped[t].append(item)

items = []
for albums in title_grouped.values():
best_bit_depth = max(a["maximum_bit_depth"] for a in albums)
get_best = min if save_space else max
best_sampling_rate = get_best(
a["maximum_sampling_rate"]
for a in albums
if a["maximum_bit_depth"] == bit_depth
if a["maximum_bit_depth"] == best_bit_depth
)
has_remaster = bool([a for a in albums if remastered(a["version"])])

# check if album has desired bit depth and sampling rate
# if there is a remaster in `item`, check if the album is a remaster
for album in albums:
if (
album["maximum_bit_depth"] == bit_depth
and album["maximum_sampling_rate"] == sampling_rate
):
if not has_remaster:
quality_f.append(album)
elif remastered(album["version"]):
quality_f.append(album)

if remove_extras:
final = []
# this filters those huge albums with outtakes, live performances etc.
for album in quality_f:
if not extra(album):
final.append(album)
else:
final = quality_f
remaster_exists = any(is_type("remaster", a) for a in albums)

def is_valid(album):
return (
album["maximum_bit_depth"] == best_bit_depth
and album["maximum_sampling_rate"] == best_sampling_rate
and album["artist"]["name"] == requested_artist
and not ( # states that are not allowed
(remaster_exists and not is_type("remaster", album))
or (skip_extras and is_type("extra", album))
)
)

filtered = tuple(filter(is_valid, albums))
# most of the time, len is 0 or 1.
# if greater, it is a complete duplicate,
# so it doesn't matter which is chosen
if len(filtered) >= 1:
items.append(filtered[0])

return final
# key = lambda a: a["title"]
# final.sort(key=key)
# for album in final:
# print_album(album)
return items

0 comments on commit 32015dc

Please sign in to comment.