From 7f5909238ed1975b1e20202ca2b233034ae268ca Mon Sep 17 00:00:00 2001 From: co0lc0der Date: Thu, 6 Feb 2025 22:13:29 +0500 Subject: [PATCH 01/10] add get_driver() method in QueryBuilder --- simple_query_builder/querybuilder.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/simple_query_builder/querybuilder.py b/simple_query_builder/querybuilder.py index 7672103..33e71a1 100644 --- a/simple_query_builder/querybuilder.py +++ b/simple_query_builder/querybuilder.py @@ -614,7 +614,7 @@ def update(self, table: Union[str, dict], fields: Union[list, dict]): def join(self, table: Union[str, dict] = "", on: Union[str, tuple, list] = (), join_type: str = "INNER"): join_type = join_type.upper() - if self._db.get_driver() == 'sqlite': + if self.get_driver() == 'sqlite': if join_type == "" or join_type not in self._SQLITE_JOIN_TYPES: self.set_error(f"Empty join_type or is not allowed in {inspect.stack()[0][3]} method. Try one of these {self._SQLITE_JOIN_TYPES}") return self @@ -700,6 +700,9 @@ def intersect_select(self, table: Union[str, list, dict]): def __str__(self): return self.get_sql(False) + def get_driver(self) -> str: + return self._db.get_driver().lower() + def create_view(self, view_name: str, add_exists: bool = True): # this method will be moved to another class if not view_name: From 3b3e0763e03d58fb9137a07ec000cc21839ccc72 Mon Sep 17 00:00:00 2001 From: co0lc0der Date: Thu, 6 Feb 2025 22:15:22 +0500 Subject: [PATCH 02/10] add union_all() method --- simple_query_builder/querybuilder.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/simple_query_builder/querybuilder.py b/simple_query_builder/querybuilder.py index 33e71a1..2236527 100644 --- a/simple_query_builder/querybuilder.py +++ b/simple_query_builder/querybuilder.py @@ -652,6 +652,11 @@ def union(self, union_all: bool = False): self._sql += " UNION ALL " if union_all else " UNION " return self + def union_all(self): + self._concat = True + self._sql += " UNION ALL " + return self + def union_select(self, table: Union[str, list, dict], union_all: bool = False): if not table: self.set_error(f"Empty table in {inspect.stack()[0][3]} method") From a6984b2e535733ed34212a57274f5030aaa5c5df Mon Sep 17 00:00:00 2001 From: co0lc0der Date: Thu, 6 Feb 2025 22:15:42 +0500 Subject: [PATCH 03/10] add union_select_all() method --- simple_query_builder/querybuilder.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/simple_query_builder/querybuilder.py b/simple_query_builder/querybuilder.py index 2236527..ffbf3e0 100644 --- a/simple_query_builder/querybuilder.py +++ b/simple_query_builder/querybuilder.py @@ -670,6 +670,15 @@ def union_select(self, table: Union[str, list, dict], union_all: bool = False): return self + def union_select_all(self, table: Union[str, list, dict]): + if not table: + self.set_error(f"Empty table in {inspect.stack()[0][3]} method") + return self + + self.union_select(table, True) + + return self + def excepts(self): self._concat = True self._sql += " EXCEPT " From 26acac8bae869ffc836356a63b7c06a117d4f047 Mon Sep 17 00:00:00 2001 From: co0lc0der Date: Fri, 7 Feb 2025 11:01:23 +0500 Subject: [PATCH 04/10] add select_distinct() method --- simple_query_builder/querybuilder.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/simple_query_builder/querybuilder.py b/simple_query_builder/querybuilder.py index ffbf3e0..9057300 100644 --- a/simple_query_builder/querybuilder.py +++ b/simple_query_builder/querybuilder.py @@ -384,6 +384,15 @@ def select(self, table: Union[str, list, dict], fields: Union[str, list, dict] = return self + def select_distinct(self, table: Union[str, list, dict], fields: Union[str, list, dict] = "*"): + if table and fields: + self.select(table, fields, True) + else: + self.set_error(f"Empty table or fields in {inspect.stack()[0][3]} method") + return self + + return self + def where(self, where: Union[str, list], addition: str = ""): if not where: self.set_error(f"Empty where in {inspect.stack()[0][3]} method") From 0c80efb42679571235851fb974a8aa5216b0a4e2 Mon Sep 17 00:00:00 2001 From: co0lc0der Date: Fri, 7 Feb 2025 11:02:57 +0500 Subject: [PATCH 05/10] improve VIEW tests --- tests/qb_views_unittest.py | 64 ++++++++++++++++++++++++++++---------- 1 file changed, 47 insertions(+), 17 deletions(-) diff --git a/tests/qb_views_unittest.py b/tests/qb_views_unittest.py index 7bcb1ba..cfb6598 100644 --- a/tests/qb_views_unittest.py +++ b/tests/qb_views_unittest.py @@ -9,30 +9,60 @@ class QBViewsTestCase(unittest.TestCase): def setUp(self): self.qb = QueryBuilder(DataBase(), ":memory:") - def test_sql_create_view_exists(self): - sql = self.qb.select('users').is_null('email').create_view('users_no_email').get_sql() - self.assertEqual(sql, "CREATE VIEW IF NOT EXISTS `users_no_email` AS SELECT * FROM `users` WHERE (`email` IS NULL)") + def test_create_view_empty_view_name(self): + result = self.qb.create_view('') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), True) + self.assertEqual(result.get_error_message(), "Empty view_name in create_view method") + + def test_create_view_no_select(self): + result = self.qb.delete('comments').where([['user_id', 10]]).create_view('users_no_email') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), True) + self.assertEqual(result.get_error_message(), "No select found in create_view method") + + def test_create_view_exists(self): + result = self.qb.select('users').is_null('email').create_view('users_no_email') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "CREATE VIEW IF NOT EXISTS `users_no_email` AS SELECT * FROM `users` WHERE (`email` IS NULL)") self.assertEqual(self.qb.get_params(), ()) - def test_sql_create_view_no_exists(self): - sql = self.qb.select('users').is_null('email').create_view('users_no_email', False).get_sql() - self.assertEqual(sql, "CREATE VIEW `users_no_email` AS SELECT * FROM `users` WHERE (`email` IS NULL)") + def test_create_view_no_exists(self): + result = self.qb.select('users').is_null('email').create_view('users_no_email', False) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "CREATE VIEW `users_no_email` AS SELECT * FROM `users` WHERE (`email` IS NULL)") self.assertEqual(self.qb.get_params(), ()) - def test_sql_create_view(self): - sql = self.qb.select('users').where([['email', 'is null'], 'or', ['email', '']])\ - .create_view('users_no_email').get_sql() - self.assertEqual(sql, "CREATE VIEW IF NOT EXISTS `users_no_email` AS SELECT * FROM `users` WHERE (`email` IS NULL) OR (`email` = '')") + def test_create_view(self): + result = self.qb.select('users').where([['email', 'is null'], 'or', ['email', '']])\ + .create_view('users_no_email') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "CREATE VIEW IF NOT EXISTS `users_no_email` AS SELECT * FROM `users` WHERE (`email` IS NULL) OR (`email` = '')") self.assertEqual(self.qb.get_params(), ('', )) - # self.qb.go() - def test_sql_drop_view_no_exists(self): - sql = self.qb.drop_view('users_no_email', False).get_sql() - self.assertEqual(sql, "DROP VIEW `users_no_email`") + def test_drop_view_no_exists(self): + result = self.qb.drop_view('users_no_email', False) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "DROP VIEW `users_no_email`") + self.assertEqual(self.qb.get_params(), ()) - def test_sql_drop_view_exists(self): - sql = self.qb.drop_view('users_no_email').get_sql() - self.assertEqual(sql, "DROP VIEW IF EXISTS `users_no_email`") + def test_drop_view_exists(self): + result = self.qb.drop_view('users_no_email') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "DROP VIEW IF EXISTS `users_no_email`") + self.assertEqual(self.qb.get_params(), ()) if __name__ == "__main__": From 71c452dd8458d3f5aeed19fa99df5349399cca5e Mon Sep 17 00:00:00 2001 From: co0lc0der Date: Fri, 7 Feb 2025 11:03:57 +0500 Subject: [PATCH 06/10] improve SELECT tests --- tests/qb_select_unittest.py | 546 ++++++++++++++++++++++++------------ 1 file changed, 368 insertions(+), 178 deletions(-) diff --git a/tests/qb_select_unittest.py b/tests/qb_select_unittest.py index 145db22..99d1b32 100644 --- a/tests/qb_select_unittest.py +++ b/tests/qb_select_unittest.py @@ -8,192 +8,382 @@ class QBSelectTestCase(unittest.TestCase): def setUp(self): self.qb = QueryBuilder(DataBase(), ":memory:") + self.maxDiff = None - def test_sql_select_all(self): - sql = self.qb.select('users').get_sql() - self.assertEqual(sql, 'SELECT * FROM `users`') - self.assertEqual(self.qb.get_params(), ()) - - def test_sql_select_where_eq(self): - sql = self.qb.select('users').where([['id', '=', 10]]).get_sql() - self.assertEqual(sql, 'SELECT * FROM `users` WHERE (`id` = 10)') - self.assertEqual(self.qb.get_params(), (10,)) - - def test_sql_select_where_no_eq(self): - sql = self.qb.select('users').where([['id', 10]]).get_sql() - self.assertEqual(sql, 'SELECT * FROM `users` WHERE (`id` = 10)') - self.assertEqual(self.qb.get_params(), (10, )) - - def test_sql_select_where_and_eq(self): - sql = self.qb.select('users').where([['id', '>', 1], 'and', ['group_id', '=', 2]]).get_sql() - self.assertEqual(sql, 'SELECT * FROM `users` WHERE (`id` > 1) AND (`group_id` = 2)') - self.assertEqual(self.qb.get_params(), (1, 2)) - - def test_sql_select_where_and_no_eq(self): - sql = self.qb.select('users').where([['id', '>', 1], 'and', ['group_id', 2]]).get_sql() - self.assertEqual(sql, 'SELECT * FROM `users` WHERE (`id` > 1) AND (`group_id` = 2)') - self.assertEqual(self.qb.get_params(), (1, 2)) - - def test_sql_select_where_or_eq(self): - sql = self.qb.select('users').where([['id', '>', 1], 'or', ['group_id', '=', 2]]).get_sql() - self.assertEqual(sql, 'SELECT * FROM `users` WHERE (`id` > 1) OR (`group_id` = 2)') - self.assertEqual(self.qb.get_params(), (1, 2)) - - def test_sql_select_where_or_no_eq(self): - sql = self.qb.select('users').where([['id', '>', 1], 'or', ['group_id', 2]]).get_sql() - self.assertEqual(sql, 'SELECT * FROM `users` WHERE (`id` > 1) OR (`group_id` = 2)') - self.assertEqual(self.qb.get_params(), (1, 2)) - - def test_sql_select_where_like(self): - sql = self.qb.select('users').where([['name', 'LIKE', '%John%']]).get_sql() - self.assertEqual(sql, "SELECT * FROM `users` WHERE (`name` LIKE '%John%')") - self.assertEqual(self.qb.get_params(), ('%John%',)) - - def test_sql_select_like_list(self): - sql = self.qb.select('users').like(['name', '%John%']).get_sql() - self.assertEqual(sql, "SELECT * FROM `users` WHERE (`name` LIKE '%John%')") - self.assertEqual(self.qb.get_params(), ('%John%',)) - - def test_sql_select_like_str(self): - sql = self.qb.select('users').like('name', '%John%').get_sql() - self.assertEqual(sql, "SELECT * FROM `users` WHERE (`name` LIKE '%John%')") - self.assertEqual(self.qb.get_params(), ('%John%',)) - - def test_sql_select_where_not_like(self): - sql = self.qb.select('users').where([['name', 'NOT LIKE', '%John%']]).get_sql() - self.assertEqual(sql, "SELECT * FROM `users` WHERE (`name` NOT LIKE '%John%')") - self.assertEqual(self.qb.get_params(), ('%John%',)) - - def test_sql_select_not_like_list(self): - sql = self.qb.select('users').not_like(['name', '%John%']).get_sql() - self.assertEqual(sql, "SELECT * FROM `users` WHERE (`name` NOT LIKE '%John%')") - self.assertEqual(self.qb.get_params(), ('%John%',)) - - def test_sql_select_not_like_str(self): - sql = self.qb.select('users').not_like('name', '%John%').get_sql() - self.assertEqual(sql, "SELECT * FROM `users` WHERE (`name` NOT LIKE '%John%')") - self.assertEqual(self.qb.get_params(), ('%John%',)) - - def test_sql_select_where_is_null(self): - sql = self.qb.select('users').where([['phone', 'is null']]).get_sql() - self.assertEqual(sql, "SELECT * FROM `users` WHERE (`phone` IS NULL)") - self.assertEqual(self.qb.get_params(), ()) - - def test_sql_select_is_null(self): - sql = self.qb.select('users').is_null('phone').get_sql() - self.assertEqual(sql, "SELECT * FROM `users` WHERE (`phone` IS NULL)") - self.assertEqual(self.qb.get_params(), ()) - - def test_sql_select_where_is_not_null(self): - sql = self.qb.select('customers').where([['address', 'is not null']]).get_sql() - self.assertEqual(sql, "SELECT * FROM `customers` WHERE (`address` IS NOT NULL)") - self.assertEqual(self.qb.get_params(), ()) - - def test_sql_select_not_null(self): - sql = self.qb.select('customers').not_null('address').get_sql() - self.assertEqual(sql, "SELECT * FROM `customers` WHERE (`address` IS NOT NULL)") - self.assertEqual(self.qb.get_params(), ()) - - def test_sql_select_is_not_null(self): - sql = self.qb.select('customers').is_not_null('address').get_sql() - self.assertEqual(sql, "SELECT * FROM `customers` WHERE (`address` IS NOT NULL)") - self.assertEqual(self.qb.get_params(), ()) - - def test_sql_select_offset(self): - sql = self.qb.select('posts').where([['user_id', 3]]).offset(14).get_sql() - self.assertEqual(sql, "SELECT * FROM `posts` WHERE (`user_id` = 3) OFFSET 14") - self.assertEqual(self.qb.get_params(), (3,)) - - def test_sql_select_limit(self): - sql = self.qb.select('posts').where([['id', '>', 42]]).limit(7).get_sql() - self.assertEqual(sql, "SELECT * FROM `posts` WHERE (`id` > 42) LIMIT 7") - self.assertEqual(self.qb.get_params(), (42, )) - - def test_sql_select_counter(self): - sql = self.qb.select('users', {'counter': 'COUNT(*)'}).get_sql() - self.assertEqual(sql, "SELECT COUNT(*) AS `counter` FROM `users`") - self.assertEqual(self.qb.get_params(), ()) - - def test_sql_select_distinct_order_by(self): - sql = self.qb.select('customers', ['city'], True).order_by('city').get_sql() - self.assertEqual(sql, "SELECT DISTINCT `city` FROM `customers` ORDER BY `city` ASC") - self.assertEqual(self.qb.get_params(), ()) - - def test_sql_select_distinct_order_by_2col(self): - sql = self.qb.select('customers', ['city', 'country'], True).order_by('country desc').get_sql() - self.assertEqual(sql, "SELECT DISTINCT `city`, `country` FROM `customers` ORDER BY `country` DESC") - self.assertEqual(self.qb.get_params(), ()) - - def test_sql_select_order_by_two_param(self): - sql = self.qb.select({'b': 'branches'}, ['b.id', 'b.name'])\ - .where([['b.id', '>', 1], 'and', ['b.parent_id', 1]]).order_by('b.id', 'desc').get_sql() - self.assertEqual(sql, "SELECT `b`.`id`, `b`.`name` FROM `branches` AS `b` WHERE (`b`.`id` > 1) AND (`b`.`parent_id` = 1) ORDER BY `b`.`id` DESC") - self.assertEqual(self.qb.get_params(), (1, 1)) - - def test_sql_select_order_by_one_param(self): - sql = self.qb.select({'b': 'branches'}, ['b.id', 'b.name'])\ - .where([['b.id', '>', 1], 'and', ['b.parent_id', 1]]).order_by('b.id desc').get_sql() - self.assertEqual(sql, "SELECT `b`.`id`, `b`.`name` FROM `branches` AS `b` WHERE (`b`.`id` > 1) AND (`b`.`parent_id` = 1) ORDER BY `b`.`id` DESC") - self.assertEqual(self.qb.get_params(), (1, 1)) - - def test_sql_select_group_by(self): - sql = self.qb.select('posts', ['id', 'category', 'title'])\ - .where([['views', '>=', 1000]]).group_by('category').get_sql() - self.assertEqual(sql, "SELECT `id`, `category`, `title` FROM `posts` WHERE (`views` >= 1000) GROUP BY `category`") - self.assertEqual(self.qb.get_params(), (1000, )) - - def test_sql_select_group_by_having_eq(self): - sql = self.qb.select('orders', {'month_num': 'MONTH(`created_at`)', 'total': 'SUM(`total`)'})\ + def test_select_empty_table(self): + result = self.qb.select('', 'param') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), True) + self.assertEqual(result.get_error_message(), "Empty table or fields in select method") + + def test_select_empty_fields(self): + result = self.qb.select('users', []) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), True) + self.assertEqual(result.get_error_message(), "Empty table or fields in select method") + + def test_select_empty_table_and_fields(self): + result = self.qb.select('', '') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), True) + self.assertEqual(result.get_error_message(), "Empty table or fields in select method") + + def test_get_sql(self): + result = self.qb.select('users').where([['id', 1]]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), 'SELECT * FROM `users` WHERE (`id` = 1)') + self.assertEqual(result.get_params(), (1, )) + + def test_get_sql_no_values(self): + result = self.qb.select('users').where([['id', 1]]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(False), 'SELECT * FROM `users` WHERE (`id` = ?)') + self.assertEqual(result.get_params(), (1, )) + + def test_select_no_fields(self): + result = self.qb.select('users') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), 'SELECT * FROM `users`') + self.assertEqual(result.get_params(), ()) + + def test_select_alias_all(self): + result = self.qb.select({'u': 'users'}, 'u.*') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), 'SELECT u.* FROM `users` AS `u`') + self.assertEqual(result.get_params(), ()) + + def test_select_where_eq(self): + result = self.qb.select('users').where([['id', '=', 10]]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), 'SELECT * FROM `users` WHERE (`id` = 10)') + self.assertEqual(result.get_params(), (10,)) + + def test_select_where_no_eq(self): + result = self.qb.select('users').where([['id', 10]]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), 'SELECT * FROM `users` WHERE (`id` = 10)') + self.assertEqual(result.get_params(), (10, )) + + def test_select_where_and_eq(self): + result = self.qb.select('users').where([['id', '>', 1], 'and', ['group_id', '=', 2]]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), 'SELECT * FROM `users` WHERE (`id` > 1) AND (`group_id` = 2)') + self.assertEqual(result.get_params(), (1, 2)) + + def test_select_where_and_no_eq(self): + result = self.qb.select('users').where([['id', '>', 1], 'and', ['group_id', 2]]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), 'SELECT * FROM `users` WHERE (`id` > 1) AND (`group_id` = 2)') + self.assertEqual(result.get_params(), (1, 2)) + + def test_select_where_or_eq(self): + result = self.qb.select('users').where([['id', '>', 1], 'or', ['group_id', '=', 2]]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), 'SELECT * FROM `users` WHERE (`id` > 1) OR (`group_id` = 2)') + self.assertEqual(result.get_params(), (1, 2)) + + def test_select_where_or_no_eq(self): + result = self.qb.select('users').where([['id', '>', 1], 'or', ['group_id', 2]]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), 'SELECT * FROM `users` WHERE (`id` > 1) OR (`group_id` = 2)') + self.assertEqual(result.get_params(), (1, 2)) + + def test_select_where_like(self): + result = self.qb.select('users').where([['name', 'LIKE', '%John%']]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT * FROM `users` WHERE (`name` LIKE '%John%')") + self.assertEqual(result.get_params(), ('%John%',)) + + def test_select_like_list(self): + result = self.qb.select('users').like(['name', '%John%']) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT * FROM `users` WHERE (`name` LIKE '%John%')") + self.assertEqual(result.get_params(), ('%John%',)) + + def test_select_like_str(self): + result = self.qb.select('users').like('name', '%John%') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT * FROM `users` WHERE (`name` LIKE '%John%')") + self.assertEqual(result.get_params(), ('%John%',)) + + def test_select_where_not_like(self): + result = self.qb.select('users').where([['name', 'NOT LIKE', '%John%']]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT * FROM `users` WHERE (`name` NOT LIKE '%John%')") + self.assertEqual(result.get_params(), ('%John%',)) + + def test_select_not_like_list(self): + result = self.qb.select('users').not_like(['name', '%John%']) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT * FROM `users` WHERE (`name` NOT LIKE '%John%')") + self.assertEqual(result.get_params(), ('%John%',)) + + def test_select_not_like_str(self): + result = self.qb.select('users').not_like('name', '%John%') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT * FROM `users` WHERE (`name` NOT LIKE '%John%')") + self.assertEqual(result.get_params(), ('%John%',)) + + def test_select_where_is_null(self): + result = self.qb.select('users').where([['phone', 'is null']]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT * FROM `users` WHERE (`phone` IS NULL)") + self.assertEqual(result.get_params(), ()) + + def test_select_is_null(self): + result = self.qb.select('users').is_null('phone') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT * FROM `users` WHERE (`phone` IS NULL)") + self.assertEqual(result.get_params(), ()) + + def test_select_where_is_not_null(self): + result = self.qb.select('customers').where([['address', 'is not null']]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT * FROM `customers` WHERE (`address` IS NOT NULL)") + self.assertEqual(result.get_params(), ()) + + def test_select_not_null(self): + result = self.qb.select('customers').not_null('address') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT * FROM `customers` WHERE (`address` IS NOT NULL)") + self.assertEqual(result.get_params(), ()) + + def test_select_is_not_null(self): + result = self.qb.select('customers').is_not_null('address') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT * FROM `customers` WHERE (`address` IS NOT NULL)") + self.assertEqual(result.get_params(), ()) + + def test_select_offset(self): + result = self.qb.select('posts').where([['user_id', 3]]).offset(14) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT * FROM `posts` WHERE (`user_id` = 3) OFFSET 14") + self.assertEqual(result.get_params(), (3,)) + + def test_select_limit(self): + result = self.qb.select('posts').where([['id', '>', 42]]).limit(7) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT * FROM `posts` WHERE (`id` > 42) LIMIT 7") + self.assertEqual(result.get_params(), (42, )) + + def test_select_counter(self): + result = self.qb.select('users', {'counter': 'COUNT(*)'}) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT COUNT(*) AS `counter` FROM `users`") + self.assertEqual(result.get_params(), ()) + + def test_select_distinct_method_empty_table(self): + result = self.qb.select_distinct('', 'param') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), True) + self.assertEqual(result.get_error_message(), "Empty table or fields in select_distinct method") + + def test_select_distinct_method_empty_fields(self): + result = self.qb.select_distinct('users', []) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), True) + self.assertEqual(result.get_error_message(), "Empty table or fields in select_distinct method") + + def test_select_distinct_method_empty_table_and_fields(self): + result = self.qb.select_distinct('', '') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), True) + self.assertEqual(result.get_error_message(), "Empty table or fields in select_distinct method") + + def test_select_distinct_order_by(self): + result = self.qb.select('customers', ['city'], True).order_by('city') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT DISTINCT `city` FROM `customers` ORDER BY `city` ASC") + self.assertEqual(result.get_params(), ()) + + def test_select_distinct_method_order_by(self): + result = self.qb.select_distinct('customers', ['city']).order_by('city') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT DISTINCT `city` FROM `customers` ORDER BY `city` ASC") + self.assertEqual(result.get_params(), ()) + + def test_select_distinct_order_by_2col(self): + result = self.qb.select('customers', ['city', 'country'], True).order_by('country desc') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT DISTINCT `city`, `country` FROM `customers` ORDER BY `country` DESC") + self.assertEqual(result.get_params(), ()) + + def test_select_distinct_method_order_by_2col(self): + result = self.qb.select_distinct('customers', ['city', 'country']).order_by('country desc') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT DISTINCT `city`, `country` FROM `customers` ORDER BY `country` DESC") + self.assertEqual(result.get_params(), ()) + + def test_select_order_by_two_params(self): + result = self.qb.select({'b': 'branches'}, ['b.id', 'b.name'])\ + .where([['b.id', '>', 1], 'and', ['b.parent_id', 1]]).order_by('b.id', 'desc') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT `b`.`id`, `b`.`name` FROM `branches` AS `b` WHERE (`b`.`id` > 1) AND (`b`.`parent_id` = 1) ORDER BY `b`.`id` DESC") + self.assertEqual(result.get_params(), (1, 1)) + + def test_select_order_by_one_param(self): + result = self.qb.select({'b': 'branches'}, ['b.id', 'b.name'])\ + .where([['b.id', '>', 1], 'and', ['b.parent_id', 1]]).order_by('b.id desc') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT `b`.`id`, `b`.`name` FROM `branches` AS `b` WHERE (`b`.`id` > 1) AND (`b`.`parent_id` = 1) ORDER BY `b`.`id` DESC") + self.assertEqual(result.get_params(), (1, 1)) + + def test_select_group_by(self): + result = self.qb.select('posts', ['id', 'category', 'title'])\ + .where([['views', '>=', 1000]]).group_by('category') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT `id`, `category`, `title` FROM `posts` WHERE (`views` >= 1000) GROUP BY `category`") + self.assertEqual(result.get_params(), (1000, )) + + def test_select_group_by_having_eq(self): + result = self.qb.select('orders', {'month_num': 'MONTH(`created_at`)', 'total': 'SUM(`total`)'})\ .where([['YEAR(`created_at`)', 2020]]).group_by('month_num')\ - .having([['total', '=', 20000]]).get_sql() - self.assertEqual(sql, "SELECT MONTH(`created_at`) AS `month_num`, SUM(`total`) AS `total` FROM `orders` WHERE (YEAR(`created_at`) = 2020) GROUP BY `month_num` HAVING (`total` = 20000)") - self.assertEqual(self.qb.get_params(), (2020, 20000)) + .having([['total', '=', 20000]]) + self.assertEqual(result.get_sql(), "SELECT MONTH(`created_at`) AS `month_num`, SUM(`total`) AS `total` FROM `orders` WHERE (YEAR(`created_at`) = 2020) GROUP BY `month_num` HAVING (`total` = 20000)") + self.assertEqual(result.get_params(), (2020, 20000)) - def test_sql_select_group_by_having_no_eq_sum(self): - sql = self.qb.select('orders', {'month_num': 'MONTH(`created_at`)', 'total': 'SUM(`total`)'})\ + def test_select_group_by_having_no_eq_sum(self): + result = self.qb.select('orders', {'month_num': 'MONTH(`created_at`)', 'total': 'SUM(`total`)'})\ .where([['YEAR(`created_at`)', 2020]]).group_by('month_num')\ - .having([['total', 20000]]).get_sql() - self.assertEqual(sql, "SELECT MONTH(`created_at`) AS `month_num`, SUM(`total`) AS `total` FROM `orders` WHERE (YEAR(`created_at`) = 2020) GROUP BY `month_num` HAVING (`total` = 20000)") - self.assertEqual(self.qb.get_params(), (2020, 20000)) + .having([['total', 20000]]) + self.assertEqual(result.get_sql(), "SELECT MONTH(`created_at`) AS `month_num`, SUM(`total`) AS `total` FROM `orders` WHERE (YEAR(`created_at`) = 2020) GROUP BY `month_num` HAVING (`total` = 20000)") + self.assertEqual(result.get_params(), (2020, 20000)) - def test_sql_select_group_by_having_max(self): - sql = self.qb.select('employees', ['department', {'Highest salary': 'MAX(`salary`)'}])\ + def test_select_group_by_having_max(self): + result = self.qb.select('employees', ['department', {'Highest salary': 'MAX(`salary`)'}])\ .where([['favorite_website', 'Google.com']]).group_by('department')\ - .having([['MAX(`salary`)', '>=', 30000]]).get_sql() - self.assertEqual(sql, "SELECT `department`, MAX(`salary`) AS `Highest salary` FROM `employees` WHERE (`favorite_website` = 'Google.com') GROUP BY `department` HAVING (MAX(`salary`) >= 30000)") - self.assertEqual(self.qb.get_params(), ('Google.com', 30000)) + .having([['MAX(`salary`)', '>=', 30000]]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT `department`, MAX(`salary`) AS `Highest salary` FROM `employees` WHERE (`favorite_website` = 'Google.com') GROUP BY `department` HAVING (MAX(`salary`) >= 30000)") + self.assertEqual(result.get_params(), ('Google.com', 30000)) - def test_sql_select_group_by_having_count(self): - sql = self.qb.select('employees', ['department', {'Number of employees': 'COUNT(*)'}])\ + def test_select_group_by_having_count(self): + result = self.qb.select('employees', ['department', {'Number of employees': 'COUNT(*)'}])\ .where([['state', 'Nevada']]).group_by('department')\ - .having([['COUNT(*)', '>', 20]]).get_sql() - self.assertEqual(sql, "SELECT `department`, COUNT(*) AS `Number of employees` FROM `employees` WHERE (`state` = 'Nevada') GROUP BY `department` HAVING (COUNT(*) > 20)") - self.assertEqual(self.qb.get_params(), ('Nevada', 20)) - - def test_sql_select_summ(self): - sql = self.qb.select("1+5 as 'res'").get_sql() - self.assertEqual(sql, "SELECT 1+5 as 'res'") - self.assertEqual(self.qb.get_params(), ()) - - def test_sql_select_sub(self): - sql = self.qb.select("10 - 3 as 'res'").get_sql() - self.assertEqual(sql, "SELECT 10 - 3 as 'res'") - self.assertEqual(self.qb.get_params(), ()) - - def test_sql_select_substr(self): - sql = self.qb.select("substr('Hello world!', 1, 5) as 'str'").get_sql() - self.assertEqual(sql, "SELECT substr('Hello world!', 1, 5) as 'str'") - self.assertEqual(self.qb.get_params(), ()) - - def test_sql_select_sqlite_version(self): - sql = self.qb.select("sqlite_version() as ver").get_sql() - self.assertEqual(sql, "SELECT sqlite_version() as ver") - self.assertEqual(self.qb.get_params(), ()) - - def test_sql_select_time(self): - sql = self.qb.select("strftime('%Y-%m-%d %H:%M', 'now')").get_sql() - self.assertEqual(sql, "SELECT strftime('%Y-%m-%d %H:%M', 'now')") - self.assertEqual(self.qb.get_params(), ()) + .having([['COUNT(*)', '>', 20]]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT `department`, COUNT(*) AS `Number of employees` FROM `employees` WHERE (`state` = 'Nevada') GROUP BY `department` HAVING (COUNT(*) > 20)") + self.assertEqual(result.get_params(), ('Nevada', 20)) + + def test_select_sum(self): + result = self.qb.select("1+5 as 'res'") + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT 1+5 as 'res'") + self.assertEqual(result.get_params(), ()) + + def test_select_sub(self): + result = self.qb.select("10 - 3 as 'res'") + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT 10 - 3 as 'res'") + self.assertEqual(result.get_params(), ()) + + def test_select_substr(self): + result = self.qb.select("substr('Hello world!', 1, 5) as 'str'") + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT substr('Hello world!', 1, 5) as 'str'") + self.assertEqual(result.get_params(), ()) + + def test_select_concat_str(self): + result = self.qb.select("'Hello' || ' world!' as 'str'") + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT 'Hello' || ' world!' as 'str'") + self.assertEqual(result.get_params(), ()) + + def test_select_sqlite_version(self): + result = self.qb.select("sqlite_version() as ver") + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT sqlite_version() as ver") + self.assertEqual(result.get_params(), ()) + + def test_select_time(self): + result = self.qb.select("strftime('%Y-%m-%d %H:%M', 'now')") + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "SELECT strftime('%Y-%m-%d %H:%M', 'now')") + self.assertEqual(result.get_params(), ()) def tearDown(self): pass From 979a1d73e09c4a1f81710b27bada0f48a857bcb2 Mon Sep 17 00:00:00 2001 From: co0lc0der Date: Fri, 7 Feb 2025 12:51:32 +0500 Subject: [PATCH 07/10] improve TABLE methods tests --- tests/tb_unittest.py | 45 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/tests/tb_unittest.py b/tests/tb_unittest.py index 6fcd298..6e098eb 100644 --- a/tests/tb_unittest.py +++ b/tests/tb_unittest.py @@ -3,24 +3,49 @@ # test run # python .\tb_unittest.py -v -# python -m unittest .\tb_unittest.py class TBTestCase(unittest.TestCase): def setUp(self): self.qb = QueryBuilder(DataBase(), ":memory:") - def test_sql_drop_table_no_exists(self): - sql = self.qb.drop('temporary', False).get_sql() - self.assertEqual(sql, "DROP TABLE `temporary`") + def test_drop_table_empty_name(self): + result = self.qb.drop('') - def test_sql_drop_table_exists(self): - sql = self.qb.drop('temporary').get_sql() - self.assertEqual(sql, "DROP TABLE IF EXISTS `temporary`") + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), True) + self.assertEqual(result.get_error_message(), "Empty table in drop method") - def test_sql_truncate_table(self): - sql = self.qb.truncate('users').get_sql() - self.assertEqual(sql, "TRUNCATE TABLE `users`") + def test_drop_table_no_exists(self): + result = self.qb.drop('temporary', False) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "DROP TABLE `temporary`") + self.assertEqual(result.get_params(), ()) + + def test_drop_table_exists(self): + result = self.qb.drop('temporary') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "DROP TABLE IF EXISTS `temporary`") + self.assertEqual(result.get_params(), ()) + + def test_truncate_table_empty_name(self): + result = self.qb.truncate('') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), True) + self.assertEqual(result.get_error_message(), "Empty table in truncate method") + + def test_truncate_table(self): + result = self.qb.truncate('users') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "TRUNCATE TABLE `users`") + self.assertEqual(result.get_params(), ()) if __name__ == "__main__": From 5bc400a46578296e556c7af76ad466bdeaa29406 Mon Sep 17 00:00:00 2001 From: co0lc0der Date: Fri, 7 Feb 2025 13:20:07 +0500 Subject: [PATCH 08/10] improve DELETE tests --- tests/qb_delete_unittest.py | 79 ++++++++++++++++++++++++++++--------- 1 file changed, 60 insertions(+), 19 deletions(-) diff --git a/tests/qb_delete_unittest.py b/tests/qb_delete_unittest.py index 8796ab4..9a84031 100644 --- a/tests/qb_delete_unittest.py +++ b/tests/qb_delete_unittest.py @@ -9,25 +9,66 @@ class QBDeleteTestCase(unittest.TestCase): def setUp(self): self.qb = QueryBuilder(DataBase(), ":memory:") - def test_sql_delete_eq(self): - sql = self.qb.delete('comments').where([['user_id', '=', 10]]).get_sql() - self.assertEqual(sql, "DELETE FROM `comments` WHERE (`user_id` = 10)") - self.assertEqual(self.qb.get_params(), (10, )) - - def test_sql_delete_no_eq(self): - sql = self.qb.delete('comments').where([['user_id', 10]]).get_sql() - self.assertEqual(sql, "DELETE FROM `comments` WHERE (`user_id` = 10)") - self.assertEqual(self.qb.get_params(), (10, )) - - # def test_sql_delete_limit_eq(self): - # sql = self.qb.delete('users').where([['name', '=', 'John']]).limit().get_sql() - # self.assertEqual(sql, "DELETE FROM `users` WHERE (`name` = 'John') LIMIT 1") - # self.assertEqual(self.qb.get_params(), ('John',)) - # - # def test_sql_delete_limit_no_eq(self): - # sql = self.qb.delete('users').where([['name', 'John']]).limit().get_sql() - # self.assertEqual(sql, "DELETE FROM `users` WHERE (`name` = 'John') LIMIT 1") - # self.assertEqual(self.qb.get_params(), ('John',)) + def test_delete_empty_table(self): + result = self.qb.delete('') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), True) + self.assertEqual(result.get_error_message(), "Empty table in delete method") + + def test_delete_table_as_str(self): + result = self.qb.delete('comments') + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "DELETE FROM `comments`") + self.assertEqual(result.get_params(), ()) + + def test_delete_table_as_dict(self): + result = self.qb.delete({'g': 'groups'}) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "DELETE FROM `groups` AS `g`") + self.assertEqual(result.get_params(), ()) + + def test_delete_where_eq(self): + result = self.qb.delete('comments').where([['user_id', '=', 10]]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "DELETE FROM `comments` WHERE (`user_id` = 10)") + self.assertEqual(result.get_params(), (10, )) + + def test_delete_where_no_eq(self): + result = self.qb.delete('comments').where([['user_id', 10]]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "DELETE FROM `comments` WHERE (`user_id` = 10)") + self.assertEqual(result.get_params(), (10, )) + + def test_delete_limit_eq(self): + result = self.qb.delete('users').where([['name', '=', 'John']]).limit() + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + if self.qb.get_driver() == 'sqlite': + self.assertEqual(result.get_sql(), "DELETE FROM `users` WHERE (`name` = 'John')") + else: + self.assertEqual(result.get_sql(), "DELETE FROM `users` WHERE (`name` = 'John') LIMIT 1") + self.assertEqual(result.get_params(), ('John',)) + + def test_delete_limit_no_eq(self): + result = self.qb.delete('users').where([['name', 'John']]).limit() + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + if self.qb.get_driver() == 'sqlite': + self.assertEqual(result.get_sql(), "DELETE FROM `users` WHERE (`name` = 'John')") + else: + self.assertEqual(result.get_sql(), "DELETE FROM `users` WHERE (`name` = 'John') LIMIT 1") + self.assertEqual(result.get_params(), ('John',)) if __name__ == "__main__": From 220826e9f387c5803b1a68cf376010f2e5f00b72 Mon Sep 17 00:00:00 2001 From: co0lc0der Date: Fri, 7 Feb 2025 13:31:45 +0500 Subject: [PATCH 09/10] improve INSERT tests --- tests/qb_insert_unittest.py | 68 ++++++++++++++++++++++++++++++++----- 1 file changed, 59 insertions(+), 9 deletions(-) diff --git a/tests/qb_insert_unittest.py b/tests/qb_insert_unittest.py index 1c08d53..2850c70 100644 --- a/tests/qb_insert_unittest.py +++ b/tests/qb_insert_unittest.py @@ -9,19 +9,69 @@ class QBInsertTestCase(unittest.TestCase): def setUp(self): self.qb = QueryBuilder(DataBase(), ":memory:") - def test_sql_insert(self): - sql = self.qb.insert('groups', {'name': 'Moderator', 'permissions': 'moderator'}).get_sql() - self.assertEqual(sql, "INSERT INTO `groups` (`name`, `permissions`) VALUES ('Moderator','moderator')") - self.assertEqual(self.qb.get_params(), ('Moderator', 'moderator')) + def test_insert_empty_table(self): + result = self.qb.insert('', {'param': 'new_value'}) - def test_sql_insert_multiple(self): - sql = self.qb.insert('groups', [ + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), True) + self.assertEqual(result.get_error_message(), "Empty table or fields in insert method") + + def test_insert_empty_fields(self): + result = self.qb.insert('params', {}) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), True) + self.assertEqual(result.get_error_message(), "Empty table or fields in insert method") + + def test_insert_empty_table_and_fields(self): + result = self.qb.insert('', {}) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), True) + self.assertEqual(result.get_error_message(), "Empty table or fields in insert method") + + def test_insert_table_as_str(self): + result = self.qb.insert('groups', {'name': 'Moderator', 'permissions': 'moderator'}) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "INSERT INTO `groups` (`name`, `permissions`) VALUES ('Moderator','moderator')") + self.assertEqual(result.get_params(), ('Moderator', 'moderator')) + + def test_insert_table_as_dict(self): + result = self.qb.insert({'g': 'groups'}, {'name': 'Moderator', 'permissions': 'moderator'}) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "INSERT INTO `groups` AS `g` (`name`, `permissions`) VALUES ('Moderator','moderator')") + self.assertEqual(result.get_params(), ('Moderator', 'moderator')) + + def test_insert_multiple_as_str(self): + result = self.qb.insert('groups', [ + ['name', 'role'], + ['Moderator', 'moderator'], ['Moderator2', 'moderator'], + ['User', 'user'], ['User2', 'user'] + ]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "INSERT INTO `groups` (`name`, `role`) VALUES ('Moderator','moderator'),('Moderator2','moderator'),('User','user'),('User2','user')") + self.assertEqual(result.get_params(), ('Moderator', 'moderator', 'Moderator2', 'moderator', 'User', 'user', 'User2', 'user')) + + def test_insert_multiple_as_dict(self): + result = self.qb.insert({'g': 'groups'}, [ ['name', 'role'], ['Moderator', 'moderator'], ['Moderator2', 'moderator'], ['User', 'user'], ['User2', 'user'] - ]).get_sql() - self.assertEqual(sql, "INSERT INTO `groups` (`name`, `role`) VALUES ('Moderator','moderator'),('Moderator2','moderator'),('User','user'),('User2','user')") - self.assertEqual(self.qb.get_params(), ('Moderator', 'moderator', 'Moderator2', 'moderator', 'User', 'user', 'User2', 'user')) + ]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "INSERT INTO `groups` AS `g` (`name`, `role`) VALUES ('Moderator','moderator'),('Moderator2','moderator'),('User','user'),('User2','user')") + self.assertEqual(result.get_params(), ('Moderator', 'moderator', 'Moderator2', 'moderator', 'User', 'user', 'User2', 'user')) + + def tearDown(self): + pass if __name__ == "__main__": From 9bf4ff6ac206277623a08aa9a6eb2bb0281733c6 Mon Sep 17 00:00:00 2001 From: co0lc0der Date: Fri, 7 Feb 2025 13:54:05 +0500 Subject: [PATCH 10/10] improve UPDATE tests --- tests/qb_delete_unittest.py | 4 ++ tests/qb_insert_unittest.py | 1 + tests/qb_update_unittest.py | 81 +++++++++++++++++++++++++++++++------ tests/qb_views_unittest.py | 4 ++ tests/tb_unittest.py | 4 ++ 5 files changed, 82 insertions(+), 12 deletions(-) diff --git a/tests/qb_delete_unittest.py b/tests/qb_delete_unittest.py index 9a84031..e5457dd 100644 --- a/tests/qb_delete_unittest.py +++ b/tests/qb_delete_unittest.py @@ -8,6 +8,7 @@ class QBDeleteTestCase(unittest.TestCase): def setUp(self): self.qb = QueryBuilder(DataBase(), ":memory:") + self.maxDiff = None def test_delete_empty_table(self): result = self.qb.delete('') @@ -70,6 +71,9 @@ def test_delete_limit_no_eq(self): self.assertEqual(result.get_sql(), "DELETE FROM `users` WHERE (`name` = 'John') LIMIT 1") self.assertEqual(result.get_params(), ('John',)) + def tearDown(self): + pass + if __name__ == "__main__": unittest.main() diff --git a/tests/qb_insert_unittest.py b/tests/qb_insert_unittest.py index 2850c70..9bd2a2a 100644 --- a/tests/qb_insert_unittest.py +++ b/tests/qb_insert_unittest.py @@ -8,6 +8,7 @@ class QBInsertTestCase(unittest.TestCase): def setUp(self): self.qb = QueryBuilder(DataBase(), ":memory:") + self.maxDiff = None def test_insert_empty_table(self): result = self.qb.insert('', {'param': 'new_value'}) diff --git a/tests/qb_update_unittest.py b/tests/qb_update_unittest.py index cb33602..9fa73cd 100644 --- a/tests/qb_update_unittest.py +++ b/tests/qb_update_unittest.py @@ -8,24 +8,81 @@ class QBUpdateTestCase(unittest.TestCase): def setUp(self): self.qb = QueryBuilder(DataBase(), ":memory:") + self.maxDiff = None - def test_sql_update(self): - sql = self.qb.update('posts', {'status': 'published'})\ - .where([['YEAR(`updated_at`)', '>', 2020]]).get_sql() - self.assertEqual(sql, "UPDATE `posts` SET `status` = 'published' WHERE (YEAR(`updated_at`) > 2020)") + def test_update_empty_table(self): + result = self.qb.update('', {'param': 'new_value'}) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), True) + self.assertEqual(result.get_error_message(), "Empty table or fields in update method") + + def test_update_empty_fields(self): + result = self.qb.update('params', {}) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), True) + self.assertEqual(result.get_error_message(), "Empty table or fields in update method") + + def test_update_empty_table_and_fields(self): + result = self.qb.update('', {}) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), True) + self.assertEqual(result.get_error_message(), "Empty table or fields in update method") + + def test_update_table_as_str(self): + result = self.qb.update('posts', {'status': 'published'}).where([['YEAR(`updated_at`)', '>', 2020]]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "UPDATE `posts` SET `status` = 'published' WHERE (YEAR(`updated_at`) > 2020)") + self.assertEqual(self.qb.get_params(), ('published', 2020)) + + def test_update_table_as_dict(self): + result = self.qb.update({'p': 'posts'}, {'status': 'published'}).where([['YEAR(`updated_at`)', '>', 2020]]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "UPDATE `posts` AS `p` SET `status` = 'published' WHERE (YEAR(`updated_at`) > 2020)") self.assertEqual(self.qb.get_params(), ('published', 2020)) - def test_sql_update_limit_eq(self): - sql = self.qb.update('users', {'username': 'John Doe', 'status': 'new status'})\ - .where([['id', '=', 7]]).limit().get_sql() - self.assertEqual(sql, "UPDATE `users` SET `username` = 'John Doe', `status` = 'new status' WHERE (`id` = 7) LIMIT 1") + def test_update_where_eq(self): + result = self.qb.update('posts', {'status': 'draft'}).where([['user_id', '=', 10]]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "UPDATE `posts` SET `status` = 'draft' WHERE (`user_id` = 10)") + self.assertEqual(self.qb.get_params(), ('draft', 10)) + + def test_update_where_no_eq(self): + result = self.qb.update('posts', {'status': 'draft'}).where([['user_id', 10]]) + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "UPDATE `posts` SET `status` = 'draft' WHERE (`user_id` = 10)") + self.assertEqual(self.qb.get_params(), ('draft', 10)) + + def test_update_where_eq_limit(self): + result = self.qb.update('users', {'username': 'John Doe', 'status': 'new status'})\ + .where([['id', '=', 7]]).limit() + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "UPDATE `users` SET `username` = 'John Doe', `status` = 'new status' WHERE (`id` = 7) LIMIT 1") self.assertEqual(self.qb.get_params(), ('John Doe', 'new status', 7)) - def test_sql_update_limit_no_eq(self): - sql = self.qb.update('users', {'username': 'John Doe', 'status': 'new status'})\ - .where([['id', 7]]).limit().get_sql() - self.assertEqual(sql, "UPDATE `users` SET `username` = 'John Doe', `status` = 'new status' WHERE (`id` = 7) LIMIT 1") + def test_update_where_no_eq_limit(self): + result = self.qb.update('users', {'username': 'John Doe', 'status': 'new status'})\ + .where([['id', 7]]).limit() + + self.assertEqual(result, self.qb) + self.assertEqual(result.has_error(), False) + self.assertEqual(result.get_sql(), "UPDATE `users` SET `username` = 'John Doe', `status` = 'new status' WHERE (`id` = 7) LIMIT 1") self.assertEqual(self.qb.get_params(), ('John Doe', 'new status', 7)) + + def tearDown(self): + pass if __name__ == "__main__": diff --git a/tests/qb_views_unittest.py b/tests/qb_views_unittest.py index cfb6598..3b0491e 100644 --- a/tests/qb_views_unittest.py +++ b/tests/qb_views_unittest.py @@ -8,6 +8,7 @@ class QBViewsTestCase(unittest.TestCase): def setUp(self): self.qb = QueryBuilder(DataBase(), ":memory:") + self.maxDiff = None def test_create_view_empty_view_name(self): result = self.qb.create_view('') @@ -64,6 +65,9 @@ def test_drop_view_exists(self): self.assertEqual(result.get_sql(), "DROP VIEW IF EXISTS `users_no_email`") self.assertEqual(self.qb.get_params(), ()) + def tearDown(self): + pass + if __name__ == "__main__": unittest.main() diff --git a/tests/tb_unittest.py b/tests/tb_unittest.py index 6e098eb..06583cc 100644 --- a/tests/tb_unittest.py +++ b/tests/tb_unittest.py @@ -8,6 +8,7 @@ class TBTestCase(unittest.TestCase): def setUp(self): self.qb = QueryBuilder(DataBase(), ":memory:") + self.maxDiff = None def test_drop_table_empty_name(self): result = self.qb.drop('') @@ -47,6 +48,9 @@ def test_truncate_table(self): self.assertEqual(result.get_sql(), "TRUNCATE TABLE `users`") self.assertEqual(result.get_params(), ()) + def tearDown(self): + pass + if __name__ == "__main__": unittest.main()