Skip to content

Commit

Permalink
Logging function get_log() works for Hive (cloudera#212)
Browse files Browse the repository at this point in the history
* Added fetchType to TFetchResultsReq

* Changed fetchType to i16

* Logging for hive

* Use iterator instead of list

* Better comment in thrift file
  • Loading branch information
FlorianWilhelm authored and wesm committed Sep 29, 2016
1 parent 42aa850 commit 2397ef2
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 6 deletions.
5 changes: 3 additions & 2 deletions impala/_thrift_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import (
TBufferedTransport, TTransportException)
from thrift.Thrift import TApplicationException
from thrift.protocol.TBinaryProtocol import (
TBinaryProtocolAccelerated as TBinaryProtocol)

Expand All @@ -58,7 +59,7 @@
if six.PY3:
# import thriftpy code
from thriftpy import load
from thriftpy.thrift import TClient
from thriftpy.thrift import TClient, TApplicationException
# TODO: reenable cython
# from thriftpy.protocol import TBinaryProtocol
from thriftpy.protocol.binary import TBinaryProtocol # noqa
Expand Down Expand Up @@ -105,7 +106,7 @@ def get_socket(host, port, use_ssl, ca_cert):
return TSSLSocket(host, port, validate=False)
else:
return TSSLSocket(host, port, validate=True, ca_certs=ca_cert)
else:
else:
from thriftpy.transport.sslsocket import TSSLSocket
if ca_cert is None:
return TSSLSocket(host, port, validate=False)
Expand Down
21 changes: 17 additions & 4 deletions impala/hiveserver2.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
TGetTablesReq, TGetFunctionsReq, TGetOperationStatusReq, TOperationState,
TCancelOperationReq, TCloseOperationReq, TGetLogReq, TProtocolVersion,
TGetRuntimeProfileReq, TGetExecSummaryReq, ImpalaHiveServer2Service,
TExecStats, ThriftClient)
TExecStats, ThriftClient, TApplicationException)


log = get_logger_and_init_null(__name__)
Expand Down Expand Up @@ -1012,9 +1012,22 @@ def get_status(self):
resp = self._rpc('GetOperationStatus', req)
return TOperationState._VALUES_TO_NAMES[resp.operationState]

def get_log(self):
    """Return the log for this operation.

    Issues a ``GetLog`` RPC for ``self.handle`` and returns the ``log``
    attribute of the response.
    """
    request = TGetLogReq(operationHandle=self.handle)
    response = self._rpc('GetLog', request)
    return response.log
def get_log(self, max_rows=1024, orientation=TFetchOrientation.FETCH_NEXT):
    """Return the log for this operation as a single string.

    The ``GetLog`` RPC is tried first. If the server answers with a
    ``TApplicationException`` of type ``UNKNOWN_METHOD`` (the original
    author notes this happens when Hive is used), the log is fetched
    through ``FetchResults`` with ``fetchType=1`` instead and the
    returned rows are joined with newlines.

    Parameters
    ----------
    max_rows
        Passed as ``maxRows`` of the fallback ``FetchResults`` request;
        unused when ``GetLog`` succeeds.
    orientation
        Passed as ``orientation`` of the fallback ``FetchResults``
        request; unused when ``GetLog`` succeeds.

    Raises
    ------
    TApplicationException
        Re-raised unchanged when its type is anything other than
        ``UNKNOWN_METHOD``.
    """
    try:
        get_log_request = TGetLogReq(operationHandle=self.handle)
        return self._rpc('GetLog', get_log_request).log
    except TApplicationException as exc:  # raised if Hive is used
        if exc.type != TApplicationException.UNKNOWN_METHOD:
            raise
        # Fallback path: ask FetchResults for the log instead of query
        # output (fetchType 1 selects the log).
        fetch_request = TFetchResultsReq(
            operationHandle=self.handle,
            orientation=orientation,
            maxRows=max_rows,
            fetchType=1)
        fetch_response = self._rpc('FetchResults', fetch_request)
        # Single STRING column named 'Log'; the remaining schema slots
        # are left as None.
        log_schema = [('Log', 'STRING', None, None, None, None, None)]
        log_rows = self._wrap_results(
            fetch_response.results, log_schema, convert_types=True)
        # The first element of each row is taken as one log line.
        return '\n'.join(row[0] for row in log_rows)

def cancel(self):
req = TCancelOperationReq(operationHandle=self.handle)
Expand Down
4 changes: 4 additions & 0 deletions impala/thrift/TCLIService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,10 @@ struct TFetchResultsReq {
// Max number of rows that should be returned in
// the rowset.
3: required i64 maxRows

// The type of a fetch results request.
// 0 represents Query output. 1 represents Log.
4: optional i16 fetchType
}

struct TFetchResultsResp {
Expand Down

0 comments on commit 2397ef2

Please sign in to comment.