Skip to content

Commit

Permalink
Split execute and fetch (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
lalinsky committed Jan 10, 2016
1 parent 65ed3c6 commit 8d9ad63
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 24 deletions.
53 changes: 45 additions & 8 deletions phoenixdb/avatica.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,17 +387,18 @@ def prepareAndExecute(self, connectionId, statementId, sql, maxRowCount=-1):
}
if self.version >= AVATICA_1_4_0:
request['statementId'] = statementId
return self._apply(request, 'Service$ExecuteResponse')['results']
if self.version >= AVATICA_1_5_0:
response_type = 'executeResults'
else:
response_type = 'Service$ExecuteResponse'
return self._apply(request, response_type)['results']

def prepare(self, connectionId, statementId, sql, maxRowCount=-1):
def prepare(self, connectionId, sql, maxRowCount=-1):
"""Prepares a statement.
:param connectionId:
ID of the current connection.
:param statementId:
ID of the statement to prepare.
:param sql:
SQL query.
Expand All @@ -413,10 +414,41 @@ def prepare(self, connectionId, statementId, sql, maxRowCount=-1):
'sql': sql,
'maxRowCount': maxRowCount,
}
#if self.version >= AVATICA_1_4_0:
# request['statementId'] = statementId
return self._apply(request)['statement']

def execute(self, connectionId, statementId, parameterValues=None, maxRowCount=-1):
"""Returns a frame of rows.
The frame describes whether there may be another frame. If there is not
another frame, the current iteration is done when we have finished the
rows in the this frame.
:param connectionId:
ID of the current connection.
:param statementId:
ID of the statement to fetch rows from.
:param parameterValues:
A list of parameter values, if statement is to be executed; otherwise ``None``.
:param maxRowCount:
Maximum number of rows to return; negative means no limit.
:returns:
Frame data, or ``None`` if there are no more.
"""
request = {
'request': 'execute',
'statementHandle': {
'connectionId': connectionId,
'id': statementId,
},
'parameterValues': parameterValues,
'maxRowCount': maxRowCount,
}
return self._apply(request, 'executeResults')['results']

def fetch(self, connectionId, statementId, parameterValues=None, offset=0, fetchMaxRowCount=-1):
"""Returns a frame of rows.
Expand Down Expand Up @@ -452,7 +484,12 @@ def fetch(self, connectionId, statementId, parameterValues=None, offset=0, fetch
if self.version < AVATICA_1_3_0:
# XXX won't work for all types, but oh well...
request['parameterValues'] = [v['value'] for v in parameterValues]
else:
elif self.version < AVATICA_1_5_0:
request['parameterValues'] = parameterValues
else:
raise errors.InternalError('fetch with parameterValues not supported by avatica 1.5+')
return self._apply(request)['frame']

def supportsExecute(self):
return self.version >= AVATICA_1_5_0

49 changes: 33 additions & 16 deletions phoenixdb/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,45 +231,62 @@ def _fetch_next_frame(self):
offset=offset, fetchMaxRowCount=self.itersize)
self._set_frame(frame)

def _process_results(self, results):
if results:
result = results[0]
if result['ownStatement']:
self._set_id(result['statementId'])
self._set_signature(result['signature'])
self._set_frame(result['firstFrame'])
self._updatecount = result['updateCount']

def execute(self, operation, parameters=None):
if self._closed:
raise ProgrammingError('the cursor is already closed')
self._updatecount = -1
self._set_frame(None)
if parameters is None:
if self._id is None:
self._set_id(self._connection._client.createStatement(self._connection._id))
results = self._connection._client.prepareAndExecute(self._connection._id, self._id,
operation, maxRowCount=self.itersize)
if results:
result = results[0]
if result['ownStatement']:
self._set_id(result['statementId'])
self._set_signature(result['signature'])
self._set_frame(result['firstFrame'])
self._updatecount = result['updateCount']
self._process_results(results)
else:
statement = self._connection._client.prepare(self._connection._id, self._id,
statement = self._connection._client.prepare(self._connection._id,
operation, maxRowCount=self.itersize)
self._set_id(statement['id'])
self._set_signature(statement['signature'])
frame = self._connection._client.fetch(self._connection._id, self._id,
self._transform_parameters(parameters),
fetchMaxRowCount=self.itersize)
self._set_frame(frame)
if self._connection._client.supportsExecute():
results = self._connection._client.execute(self._connection._id, self._id,
self._transform_parameters(parameters),
maxRowCount=self.itersize)
self._process_results(results)
else:
# XXX old avatica (1.4-), remove later
frame = self._connection._client.fetch(self._connection._id, self._id,
self._transform_parameters(parameters),
fetchMaxRowCount=self.itersize)
self._set_frame(frame)

def executemany(self, operation, seq_of_parameters):
if self._closed:
raise ProgrammingError('the cursor is already closed')
self._updatecount = -1
self._set_frame(None)
statement = self._connection._client.prepare(self._connection._id, self._id,
statement = self._connection._client.prepare(self._connection._id,
operation, maxRowCount=0)
self._set_id(statement['id'])
self._set_signature(statement['signature'])
for parameters in seq_of_parameters:
self._connection._client.fetch(self._connection._id, self._id,
self._transform_parameters(parameters),
fetchMaxRowCount=0)
if self._connection._client.supportsExecute():
self._connection._client.execute(self._connection._id, self._id,
self._transform_parameters(parameters),
maxRowCount=0)
else:
# XXX old avatica (1.4-), remove later
self._connection._client.fetch(self._connection._id, self._id,
self._transform_parameters(parameters),
fetchMaxRowCount=0)

def fetchone(self):
if self._frame is None:
Expand Down

0 comments on commit 8d9ad63

Please sign in to comment.