From 9734dc4db865ec4e657e601194bf147c00de9acc Mon Sep 17 00:00:00 2001 From: Arthur Taylor Date: Sun, 28 Aug 2022 14:46:25 +0200 Subject: [PATCH] Updated config parsing to read config from environment --- flathunt.py | 30 +++++----- flathunter/config.py | 111 +++++++++++++++++++++++++++++++++---- flathunter/hunter.py | 4 +- flathunter/logging.py | 8 +++ main.py | 16 ++---- test/test_config.py | 2 +- test/test_web_interface.py | 23 ++++---- 7 files changed, 144 insertions(+), 50 deletions(-) diff --git a/flathunt.py b/flathunt.py index f0e8bb7b..dccbea63 100644 --- a/flathunt.py +++ b/flathunt.py @@ -8,12 +8,11 @@ import os import logging import time -from pprint import pformat -from flathunter.logging import logger, wdm_logger +from flathunter.logging import logger, wdm_logger, configure_logging from flathunter.idmaintainer import IdMaintainer from flathunter.hunter import Hunter -from flathunter.config import Config +from flathunter.config import Config, Env from flathunter.heartbeat import Heartbeat __author__ = "Jan Harrie" @@ -31,10 +30,10 @@ def launch_flat_hunt(config, heartbeat=None): hunter.hunt_flats() counter = 0 - while config.get('loop', {}).get('active', False): + while config.loop_is_active(): counter += 1 counter = heartbeat.send_heartbeat(counter) - time.sleep(config.get('loop', {}).get('sleeping_time', 60 * 10)) + time.sleep(config.loop_period_seconds()) hunter.hunt_flats() @@ -45,7 +44,10 @@ def main(): " and sends results to Telegram User"), epilog="Designed by Nody" ) - default_config_path = f"{os.path.dirname(os.path.abspath(__file__))}/config.yaml" + if Env.FLATHUNTER_TARGET_URLS is not None: + default_config_path = None + else: + default_config_path = f"{os.path.dirname(os.path.abspath(__file__))}/config.yaml" parser.add_argument('--config', '-c', type=argparse.FileType('r', encoding='UTF-8'), default=default_config_path, @@ -62,15 +64,13 @@ def main(): # load config config_handle = args.config - config = Config(config_handle.name) + if config_handle is not None: + config = Config(config_handle.name) + else: + config = Config() - # adjust log level, if required - if config.get('verbose'): - logger.setLevel(logging.DEBUG) - # Allow logging of "webdriver-manager" module on verbose mode - wdm_logger.setLevel(logging.INFO) - - logger.debug("Settings from config: %s", pformat(config)) + # setup logging + configure_logging(config) # initialize search plugins for config config.init_searchers() @@ -89,7 +89,7 @@ def main(): return if not config.get('telegram', {}).get('receiver_ids'): logger.warning("No Telegram receivers configured - nobody will get notifications.") - if not config.get('urls'): + if len(config.target_urls()) == 0: logger.error("No URLs configured. Starting like this would be pointless...") return diff --git a/flathunter/config.py b/flathunter/config.py index ba7b0ef0..43ecfbc9 100644 --- a/flathunter/config.py +++ b/flathunter/config.py @@ -2,6 +2,8 @@ import os import yaml +from dotenv import load_dotenv + from flathunter.logging import logger from flathunter.captcha.imagetyperz_solver import ImageTyperzSolver from flathunter.captcha.twocaptcha_solver import TwoCaptchaSolver @@ -15,18 +17,48 @@ from flathunter.crawl_idealista import CrawlIdealista from flathunter.filter import Filter +load_dotenv() + +class Env: + + def readenv(key): + if key in os.environ: + return os.environ[key] + return None + + # Captcha setup + FLATHUNTER_2CAPTCHA_KEY = readenv("FLATHUNTER_2CAPTCHA_KEY") + FLATHUNTER_IMAGETYPERZ_TOKEN = readenv("FLATHUNTER_IMAGETYPERZ_TOKEN") + + # Generic Config + FLATHUNTER_TARGET_URLS = readenv("FLATHUNTER_TARGET_URLS") + FLATHUNTER_DATABASE_LOCATION = readenv("FLATHUNTER_DATABASE_LOCATION") + FLATHUNTER_VERBOSE_LOG = readenv("FLATHUNTER_VERBOSE_LOG") + FLATHUNTER_LOOP_PERIOD_SECONDS = readenv("FLATHUNTER_LOOP_PERIOD_SECONDS") + + # Website setup + FLATHUNTER_WEBSITE_SESSION_KEY = readenv("FLATHUNTER_WEBSITE_SESSION_KEY") + FLATHUNTER_WEBSITE_DOMAIN = readenv("FLATHUNTER_WEBSITE_DOMAIN") + class Config: """Class to represent flathunter configuration""" def __init__(self, filename=None, string=None): + self.useEnvironment = True if string is not None: self.config = yaml.safe_load(string) + self.useEnvironment = False else: - if filename is None: - filename = os.path.dirname(os.path.abspath(__file__)) + "/../config.yaml" - logger.info("Using config %s", filename) - with open(filename, encoding="utf-8") as file: - self.config = yaml.safe_load(file) + if filename is None and Env.FLATHUNTER_TARGET_URLS is None: + raise Exception("Config file loaction must be specified, or FLATHUNTER_TARGET_URLS must be set") + if filename is not None: + logger.info("Using config path %s", filename) + if not os.path.exists(filename): + raise Exception("No config file found at location %s") + with open(filename, encoding="utf-8") as file: + self.config = yaml.safe_load(file) + else: + self.config = {} self.__searchers__ = [] self.check_deprecated() @@ -71,10 +103,21 @@ def get(self, key, value=None): """Emulate dictionary""" return self.config.get(key, value) + def _read_yaml_path(self, path, default_value=None): + config = self.config + parts = path.split('.') + while len(parts) > 1: + config = config.get(parts[0], {}) + parts = parts[1:] + return config.get(parts[0], default_value) + def database_location(self): """Return the location of the database folder""" - if "database_location" in self.config: - return self.config["database_location"] + config_database_location = self._read_yaml_path('database_location') + if config_database_location is not None: + return config_database_location + if self.useEnvironment and Env.FLATHUNTER_DATABASE_LOCATION is not None: + return Env.FLATHUNTER_DATABASE_LOCATION return os.path.abspath(os.path.dirname(os.path.abspath(__file__)) + "/..") def set_searchers(self, searchers): @@ -91,19 +134,63 @@ def get_filter(self): builder.read_config(self.config) return builder.build() + def target_urls(self): + if self.useEnvironment and Env.FLATHUNTER_TARGET_URLS is not None: + return Env.FLATHUNTER_TARGET_URLS.split(';') + return self._read_yaml_path('urls', []) + + def verbose_logging(self): + if self.useEnvironment and Env.FLATHUNTER_VERBOSE_LOG is not None: + return True + return self._read_yaml_path('verbose') is not None + + def loop_is_active(self): + if self.useEnvironment and Env.FLATHUNTER_LOOP_PERIOD_SECONDS is not None: + return True + return self._read_yaml_path('loop.active', False) + + def loop_period_seconds(self): + if self.useEnvironment and Env.FLATHUNTER_LOOP_PERIOD_SECONDS is not None: + return int(Env.FLATHUNTER_LOOP_PERIOD_SECONDS) + return self._read_yaml_path('loop.sleeping_time', 60 * 10) + + def has_website_config(self): + if self.useEnvironment and Env.FLATHUNTER_WEBSITE_SESSION_KEY is not None: + return True + return 'website' in self.config + + def website_session_key(self): + if self.useEnvironment and Env.FLATHUNTER_WEBSITE_SESSION_KEY is not None: + return Env.FLATHUNTER_WEBSITE_SESSION_KEY + return self._read_yaml_path('website.session_key', None) + + def website_domain(self): + if self.useEnvironment and Env.FLATHUNTER_WEBSITE_DOMAIN is not None: + return Env.FLATHUNTER_WEBSITE_DOMAIN + return self._read_yaml_path('website.domain', None) + + def website_bot_name(self): + if self.useEnvironment and Env.FLATHUNTER_WEBSITE_BOT_NAME is not None: + return Env.FLATHUNTER_WEBSITE_BOT_NAME + return self._read_yaml_path('website.bot_name', None) + def captcha_enabled(self): """Check if captcha is configured""" return "captcha" in self.config def get_captcha_solver(self) -> CaptchaSolver: """Get configured captcha solver""" - captcha_config = self.config.get("captcha", {}) - - imagetyperz_token = captcha_config.get("imagetyperz", {}).get("token", "") - twocaptcha_api_key = captcha_config.get("2captcha", {}).get("api_key", "") - + if self.useEnvironment and Env.FLATHUNTER_IMAGETYPERZ_TOKEN is not None: + imagetyperz_token = Env.FLATHUNTER_IMAGETYPERZ_TOKEN + else: + imagetyperz_token = self._read_yaml_path("captcha.imagetyperz.token", "") if imagetyperz_token: return ImageTyperzSolver(imagetyperz_token) + + if self.useEnvironment and Env.FLATHUNTER_2CAPTCHA_KEY is not None: + twocaptcha_api_key = Env.FLATHUNTER_2CAPTCHA_KEY + else: + twocaptcha_api_key = self._read_yaml_path("captcha.2captcha.api_key", "") if twocaptcha_api_key: return TwoCaptchaSolver(twocaptcha_api_key) diff --git a/flathunter/hunter.py b/flathunter/hunter.py index 542c7198..db433185 100644 --- a/flathunter/hunter.py +++ b/flathunter/hunter.py @@ -30,9 +30,9 @@ def try_crawl(searcher, url, max_pages): logger.info("Error while scraping url %s:\n%s", url, traceback.format_exc()) return [] - return chain(*[try_crawl(searcher,url, max_pages) + return chain(*[try_crawl(searcher, url, max_pages) for searcher in self.config.searchers() - for url in self.config.get('urls', [])]) + for url in self.config.target_urls()]) def hunt_flats(self, max_pages=None): """Crawl, process and filter exposes""" diff --git a/flathunter/logging.py b/flathunter/logging.py index 44c3c7c4..46f188b1 100644 --- a/flathunter/logging.py +++ b/flathunter/logging.py @@ -1,6 +1,7 @@ """Provides logger""" import logging import os +from pprint import pformat class LoggerHandler(logging.StreamHandler): """Formats logs and alters WebDriverManager's logs properties""" @@ -50,3 +51,10 @@ def setup_wdm_logger(wdm_new_logger_handler): # Setup "requests" module's logger logging.getLogger("requests").setLevel(logging.WARNING) + +def configure_logging(config): + if config.verbose_logging(): + logger.setLevel(logging.DEBUG) + # Allow logging of "webdriver-manager" module on verbose mode + wdm_logger.setLevel(logging.INFO) + logger.debug("Settings from config: %s", pformat(config)) \ No newline at end of file diff --git a/main.py b/main.py index 67abe07c..8ecb9a52 100644 --- a/main.py +++ b/main.py @@ -6,7 +6,7 @@ from flathunter.googlecloud_idmaintainer import GoogleCloudIdMaintainer from flathunter.web_hunter import WebHunter from flathunter.config import Config -from flathunter.logging import logger, wdm_logger +from flathunter.logging import logger, wdm_logger, configure_logging from flathunter.web import app @@ -21,11 +21,7 @@ # Use Google Cloud DB if we run on the cloud id_watch = GoogleCloudIdMaintainer() -# adjust log level, if required -if config.get('verbose'): - logger.setLevel(logging.DEBUG) - # Allow logging of "webdriver-manager" module on verbose mode - wdm_logger.setLevel(logging.INFO) +configure_logging(config) # initialize search plugins for config config.init_searchers() @@ -33,10 +29,10 @@ hunter = WebHunter(config, id_watch) app.config["HUNTER"] = hunter -if 'website' in config: - app.secret_key = config['website']['session_key'] - app.config["DOMAIN"] = config['website']['domain'] - app.config["BOT_NAME"] = config['website']['bot_name'] +if config.has_website_config(): + app.secret_key = config.website_session_key() + app.config["DOMAIN"] = config.website_domain() + app.config["BOT_NAME"] = config.website_bot_name() else: app.secret_key = b'Not a secret' notifiers = config.get("notifiers", []) diff --git a/test/test_config.py b/test/test_config.py index be8585b0..078bc9e1 100644 --- a/test/test_config.py +++ b/test/test_config.py @@ -50,7 +50,7 @@ def test_loads_config(self): config_file.flush() config_file.close() created = True - config = Config() + config = Config("config.yaml") self.assertTrue(len(config.get('urls')) > 0, "Expected URLs in config file") if created: os.remove("config.yaml") diff --git a/test/test_web_interface.py b/test/test_web_interface.py index 4d6511fc..7a69096b 100644 --- a/test/test_web_interface.py +++ b/test/test_web_interface.py @@ -14,6 +14,9 @@ from dummy_crawler import DummyCrawler DUMMY_CONFIG = """ +notifiers: + - telegram + telegram: bot_token: 1234xxx.12345 @@ -107,7 +110,7 @@ def test_hunt_via_post_with_filters(hunt_client, **kwargs): def test_render_index_after_login(hunt_client): rv = hunt_client.get('/login_with_telegram?id=1234&first_name=Jason&last_name=Bourne&username=mattdamon&photo_url=https%3A%2F%2Fi.example.com%2Fprofile.jpg&auth_date=123455678&hash=c691a55de4e28b341ccd0b793d4ca17f09f6c87b28f8a893621df81475c25952') assert rv.status_code == 302 - assert rv.headers['location'] == 'http://localhost/' + assert rv.headers['location'] == '/' assert 'user' in session rv = hunt_client.get('/') assert rv.status_code == 200 @@ -118,7 +121,7 @@ def test_do_not_send_messages_if_notifications_disabled(hunt_client, **kwargs): app.config['HUNTER'].set_filters_for_user(1234, {}) rv = hunt_client.get('/login_with_telegram?id=1234&first_name=Jason&last_name=Bourne&username=mattdamon&photo_url=https%3A%2F%2Fi.example.com%2Fprofile.jpg&auth_date=123455678&hash=c691a55de4e28b341ccd0b793d4ca17f09f6c87b28f8a893621df81475c25952') assert rv.status_code == 302 - assert rv.headers['location'] == 'http://localhost/' + assert rv.headers['location'] == '/' assert 'user' in session rv = hunt_client.post('/toggle_notifications') assert rv.status_code == 201 @@ -133,7 +136,7 @@ def test_toggle_notification_status(hunt_client): app.config['HUNTER'].set_filters_for_user(1234, {}) rv = hunt_client.get('/login_with_telegram?id=1234&first_name=Jason&last_name=Bourne&username=mattdamon&photo_url=https%3A%2F%2Fi.example.com%2Fprofile.jpg&auth_date=123455678&hash=c691a55de4e28b341ccd0b793d4ca17f09f6c87b28f8a893621df81475c25952') assert rv.status_code == 302 - assert rv.headers['location'] == 'http://localhost/' + assert rv.headers['location'] == '/' assert 'user' in session rv = hunt_client.post('/toggle_notifications') assert rv.status_code == 201 @@ -145,7 +148,7 @@ def test_toggle_notification_status(hunt_client): def test_update_filters(hunt_client): rv = hunt_client.get('/login_with_telegram?id=1234&first_name=Jason&last_name=Bourne&username=mattdamon&photo_url=https%3A%2F%2Fi.example.com%2Fprofile.jpg&auth_date=123455678&hash=c691a55de4e28b341ccd0b793d4ca17f09f6c87b28f8a893621df81475c25952') assert rv.status_code == 302 - assert rv.headers['location'] == 'http://localhost/' + assert rv.headers['location'] == '/' assert 'user' in session rv = hunt_client.post('/filter', data = { 'b': '3' }) assert app.config['HUNTER'].get_filters_for_user(1234) == { 'b': 3.0 } @@ -158,7 +161,7 @@ def test_update_filters_not_logged_in(hunt_client): def test_index_logged_in_with_filters(hunt_client): rv = hunt_client.get('/login_with_telegram?id=1234&first_name=Jason&last_name=Bourne&username=mattdamon&photo_url=https%3A%2F%2Fi.example.com%2Fprofile.jpg&auth_date=123455678&hash=c691a55de4e28b341ccd0b793d4ca17f09f6c87b28f8a893621df81475c25952') assert rv.status_code == 302 - assert rv.headers['location'] == 'http://localhost/' + assert rv.headers['location'] == '/' assert 'user' in session hunt_client.post('/filter', data = { 'max_size': '35' }) rv = hunt_client.get('/') @@ -167,7 +170,7 @@ def test_index_logged_in_with_filters(hunt_client): def test_login_with_telegram(hunt_client): rv = hunt_client.get('/login_with_telegram?id=1234&first_name=Jason&last_name=Bourne&username=mattdamon&photo_url=https%3A%2F%2Fi.example.com%2Fprofile.jpg&auth_date=123455678&hash=c691a55de4e28b341ccd0b793d4ca17f09f6c87b28f8a893621df81475c25952') assert rv.status_code == 302 - assert rv.headers['location'] == 'http://localhost/' + assert rv.headers['location'] == '/' assert 'user' in session assert session['user']['first_name'] == 'Jason' assert json.dumps(session['user']) == '{"id": "1234", "first_name": "Jason", "last_name": "Bourne", "username": "mattdamon", "photo_url": "https://i.example.com/profile.jpg", "auth_date": "123455678"}' @@ -175,25 +178,25 @@ def test_login_with_telegram(hunt_client): def test_login_with_invalid_url(hunt_client): rv = hunt_client.get('/login_with_telegram?username=mattdamon&id=1234&first_name=Jason&last_name=Bourne&photo_url=https%3A%2F%2Fi.example.com%2Fprofile.jpg&auth_date=123455678') assert rv.status_code == 302 - assert rv.headers['location'] == 'http://localhost/' + assert rv.headers['location'] == '/' assert 'user' not in session def test_login_with_missing_params(hunt_client): rv = hunt_client.get('/login_with_telegram?ad=1234&hash=51d737e1a3ba0821359955a36d3671f2957b5a8f1f32f9a133ce95836c44a9a9') assert rv.status_code == 302 - assert rv.headers['location'] == 'http://localhost/' + assert rv.headers['location'] == '/' assert 'user' not in session def test_login_with_invalid_hash(hunt_client): rv = hunt_client.get('/login_with_telegram?id=1234&first_name=Jason&last_name=Bourne&username=mattdamon&photo_url=https%3A%2F%2Fi.example.com%2Fprofile.jpg&auth_date=123455678&hash=0091a55de4e28b341ccd0b793d4ca17f09f6c87b28f8a893621df81475c25900') assert rv.status_code == 302 - assert rv.headers['location'] == 'http://localhost/' + assert rv.headers['location'] == '/' assert 'user' not in session def test_logout(hunt_client): rv = hunt_client.get('/login_with_telegram?id=1234&first_name=Jason&last_name=Bourne&username=mattdamon&photo_url=https%3A%2F%2Fi.example.com%2Fprofile.jpg&auth_date=123455678&hash=c691a55de4e28b341ccd0b793d4ca17f09f6c87b28f8a893621df81475c25952') assert rv.status_code == 302 - assert rv.headers['location'] == 'http://localhost/' + assert rv.headers['location'] == '/' assert 'user' in session rv = hunt_client.get('/logout') assert 'user' not in session