Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions mssql_python/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,29 @@ def cursor(self) -> Cursor:
cursor = Cursor(self)
self._cursors.add(cursor) # Track the cursor
return cursor

def execute(self, sql, *args):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding type annotations to the execute method signature. This will improve code clarity and enable better static analysis tooling support. For example, you might specify the types for sql, args, and the return value (e.g., def execute(self, sql: str, *args: Any) -> Cursor:).
This helps both readers and tools understand expected usage and can catch certain classes of bugs earlier.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For batch executions or scenarios involving multiple related statements, creating a new cursor for each call to execute may not be the most efficient approach.

Consider providing an option or an additional method that allows reusing a single cursor or enables more explicit cursor management for batch operations. This could improve performance and resource utilization for users who need to execute multiple statements in sequence.

This could be to-do \ enhancement for the next release. Create a backlog for it.

"""
Creates a new Cursor object, calls its execute method, and returns the new cursor.

This is a convenience method that is not part of the DB API. Since a new Cursor
is allocated by each call, this should not be used if more than one SQL statement
needs to be executed on the connection.

Args:
sql (str): The SQL query to execute.
*args: Parameters to be passed to the query.

Returns:
Cursor: A new cursor with the executed query.

Raises:
DatabaseError: If there is an error executing the query.
InterfaceError: If the connection is closed.
"""
cursor = self.cursor()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each call to execute creates a new cursor and adds it to self._cursors. If these cursors are not explicitly closed or dereferenced, memory usage may accumulate over time, potentially resulting in a memory leak.

There is a need to close the returned cursors, enforcing cursor closure, or providing a context manager to ensure proper cleanup. This will help prevent resource leaks and ensure efficient memory management.

In cursors class we already close\dereference the cursors upon exit.. Please consider documenting it explicityly for future maintainers and code readability.

cursor.execute(sql, *args)
return cursor

def commit(self) -> None:
"""
Expand Down
161 changes: 161 additions & 0 deletions tests/test_003_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,3 +485,164 @@ def test_connection_pooling_basic(conn_str):

conn1.close()
conn2.close()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible missing edge case: Please add a test for calling execute after the connection has been closed. This should raise an InterfaceError. Covering this scenario will help ensure robust error handling and proper DB-API compliance.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible missing edge case: Consider adding a test that calls execute with very large parameter sets to check for performance limits and correct handling of large inputs. This can help identify potential scalability or memory issues.

This could be a to-do\future enhancement for overall cursor calls. Create a future backlog item for next semester release

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible missing edge case: There is no explicit test for creating multiple simultaneous cursors using execute. Please add a test to verify resource limits and memory usage when many cursors are opened at once.

This could be a to-do\future enhancement for overall cursor calls. Create a future backlog item for next semester release

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible missing edge case: Please consider adding a test that explicitly checks whether cursors returned by execute are properly closed and removed from self._cursors. This will help prevent memory/resource leaks and ensure correct cursor lifecycle management.

def test_connection_execute(db_connection):
"""Test the execute() convenience method for Connection class"""
# Test basic execution
cursor = db_connection.execute("SELECT 1 AS test_value")
result = cursor.fetchone()
assert result is not None, "Execute failed: No result returned"
assert result[0] == 1, "Execute failed: Incorrect result"

# Test with parameters
cursor = db_connection.execute("SELECT ? AS test_value", 42)
result = cursor.fetchone()
assert result is not None, "Execute with parameters failed: No result returned"
assert result[0] == 42, "Execute with parameters failed: Incorrect result"

# Test that cursor is tracked by connection
assert cursor in db_connection._cursors, "Cursor from execute() not tracked by connection"

# Test with data modification and verify it requires commit
if not db_connection.autocommit:
drop_table_if_exists(db_connection.cursor(), "#pytest_test_execute")
cursor1 = db_connection.execute("CREATE TABLE #pytest_test_execute (id INT, value VARCHAR(50))")
cursor2 = db_connection.execute("INSERT INTO #pytest_test_execute VALUES (1, 'test_value')")
cursor3 = db_connection.execute("SELECT * FROM #pytest_test_execute")
result = cursor3.fetchone()
assert result is not None, "Execute with table creation failed"
assert result[0] == 1, "Execute with table creation returned wrong id"
assert result[1] == 'test_value', "Execute with table creation returned wrong value"

# Clean up
db_connection.execute("DROP TABLE #pytest_test_execute")
db_connection.commit()

def test_connection_execute_error_handling(db_connection):
"""Test that execute() properly handles SQL errors"""
with pytest.raises(Exception):
db_connection.execute("SELECT * FROM nonexistent_table")

def test_connection_execute_empty_result(db_connection):
"""Test execute() with a query that returns no rows"""
cursor = db_connection.execute("SELECT * FROM sys.tables WHERE name = 'nonexistent_table_name'")
result = cursor.fetchone()
assert result is None, "Query should return no results"

# Test empty result with fetchall
rows = cursor.fetchall()
assert len(rows) == 0, "fetchall should return empty list for empty result set"

def test_connection_execute_different_parameter_types(db_connection):
"""Test execute() with different parameter data types"""
# Test with different data types
params = [
1234, # Integer
3.14159, # Float
"test string", # String
bytearray(b'binary data'), # Binary data
True, # Boolean
None # NULL
]

for param in params:
cursor = db_connection.execute("SELECT ? AS value", param)
result = cursor.fetchone()
if param is None:
assert result[0] is None, "NULL parameter not handled correctly"
else:
assert result[0] == param, f"Parameter {param} of type {type(param)} not handled correctly"

def test_connection_execute_with_transaction(db_connection):
"""Test execute() in the context of explicit transactions"""
if db_connection.autocommit:
db_connection.autocommit = False

cursor1 = db_connection.cursor()
drop_table_if_exists(cursor1, "#pytest_test_execute_transaction")

try:
# Create table and insert data
db_connection.execute("CREATE TABLE #pytest_test_execute_transaction (id INT, value VARCHAR(50))")
db_connection.execute("INSERT INTO #pytest_test_execute_transaction VALUES (1, 'before rollback')")

# Check data is there
cursor = db_connection.execute("SELECT * FROM #pytest_test_execute_transaction")
result = cursor.fetchone()
assert result is not None, "Data should be visible within transaction"
assert result[1] == 'before rollback', "Incorrect data in transaction"

# Rollback and verify data is gone
db_connection.rollback()

# Need to recreate table since it was rolled back
db_connection.execute("CREATE TABLE #pytest_test_execute_transaction (id INT, value VARCHAR(50))")
db_connection.execute("INSERT INTO #pytest_test_execute_transaction VALUES (2, 'after rollback')")

cursor = db_connection.execute("SELECT * FROM #pytest_test_execute_transaction")
result = cursor.fetchone()
assert result is not None, "Data should be visible after new insert"
assert result[0] == 2, "Should see the new data after rollback"
assert result[1] == 'after rollback', "Incorrect data after rollback"

# Commit and verify data persists
db_connection.commit()
finally:
# Clean up
try:
db_connection.execute("DROP TABLE #pytest_test_execute_transaction")
db_connection.commit()
except Exception:
pass

def test_connection_execute_vs_cursor_execute(db_connection):
"""Compare behavior of connection.execute() vs cursor.execute()"""
# Connection.execute creates a new cursor each time
cursor1 = db_connection.execute("SELECT 1 AS first_query")
# Consume the results from cursor1 before creating cursor2
result1 = cursor1.fetchall()
assert result1[0][0] == 1, "First cursor should have result from first query"

# Now it's safe to create a second cursor
cursor2 = db_connection.execute("SELECT 2 AS second_query")
result2 = cursor2.fetchall()
assert result2[0][0] == 2, "Second cursor should have result from second query"

# These should be different cursor objects
assert cursor1 != cursor2, "Connection.execute should create a new cursor each time"

# Now compare with reusing the same cursor
cursor3 = db_connection.cursor()
cursor3.execute("SELECT 3 AS third_query")
result3 = cursor3.fetchone()
assert result3[0] == 3, "Direct cursor execution failed"

# Reuse the same cursor
cursor3.execute("SELECT 4 AS fourth_query")
result4 = cursor3.fetchone()
assert result4[0] == 4, "Reused cursor should have new results"

# The previous results should no longer be accessible
cursor3.execute("SELECT 3 AS third_query_again")
result5 = cursor3.fetchone()
assert result5[0] == 3, "Cursor reexecution should work"

def test_connection_execute_many_parameters(db_connection):
"""Test execute() with many parameters"""
# First make sure no active results are pending
# by using a fresh cursor and fetching all results
cursor = db_connection.cursor()
cursor.execute("SELECT 1")
cursor.fetchall()

# Create a query with 10 parameters
params = list(range(1, 11))
query = "SELECT " + ", ".join(["?" for _ in params]) + " AS many_params"

# Now execute with many parameters
cursor = db_connection.execute(query, *params)
result = cursor.fetchall() # Use fetchall to consume all results

# Verify all parameters were correctly passed
for i, value in enumerate(params):
assert result[0][i] == value, f"Parameter at position {i} not correctly passed"