Skip to content

Commit

Permalink
[core] Thrift TMultiplexedProtocol support
Browse files Browse the repository at this point in the history
[core] Add TMultiplexedProtocol support in thrift utils
  • Loading branch information
generalpiston authored and romainr committed Aug 6, 2014
1 parent 8afc15b commit 028db3a
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 2 deletions.
58 changes: 58 additions & 0 deletions desktop/core/ext-py/thrift-0.9.1/src/TMultiplexedProcessor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from thrift.Thrift import TProcessor, TMessageType, TException
from thrift.protocol import TProtocolDecorator, TMultiplexedProtocol

class TMultiplexedProcessor(TProcessor):
def __init__(self):
self.services = {}

def registerProcessor(self, serviceName, processor):
self.services[serviceName] = processor

def process(self, iprot, oprot):
(name, type, seqid) = iprot.readMessageBegin();
if type != TMessageType.CALL & type != TMessageType.ONEWAY:
raise TException("TMultiplex protocol only supports CALL & ONEWAY")

index = name.find(TMultiplexedProtocol.SEPARATOR)
if index < 0:
raise TException("Service name not found in message name: " + name + ". Did you forget to use TMultiplexProtocol in your client?")

serviceName = name[0:index]
call = name[index+len(TMultiplexedProtocol.SEPARATOR):]
if not serviceName in self.services:
raise TException("Service name not found: " + serviceName + ". Did you forget to call registerProcessor()?")

standardMessage = (
call,
type,
seqid
)
return self.services[serviceName].process(StoredMessageProtocol(iprot, standardMessage), oprot)


class StoredMessageProtocol(TProtocolDecorator.TProtocolDecorator):
def __init__(self, protocol, messageBegin):
TProtocolDecorator.TProtocolDecorator.__init__(self, protocol)
self.messageBegin = messageBegin

def readMessageBegin(self):
return self.messageBegin
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from thrift.Thrift import TMessageType
from thrift.protocol import TProtocolDecorator

SEPARATOR = ":"

class TMultiplexedProtocol(TProtocolDecorator.TProtocolDecorator):
def __init__(self, protocol, serviceName):
TProtocolDecorator.TProtocolDecorator.__init__(self, protocol)
self.serviceName = serviceName

def writeMessageBegin(self, name, type, seqid):
if (type == TMessageType.CALL or
type == TMessageType.ONEWAY):
self.protocol.writeMessageBegin(
self.serviceName + SEPARATOR + name,
type,
seqid
)
else:
self.protocol.writeMessageBegin(name, type, seqid)
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from thrift.protocol.TProtocol import TProtocolBase
from types import *

class TProtocolDecorator():
def __init__(self, protocol):
TProtocolBase(protocol)
self.protocol = protocol

def __getattr__(self, name):
if hasattr(self.protocol, name):
member = getattr(self.protocol, name)
if type(member) in [MethodType, UnboundMethodType, FunctionType, LambdaType, BuiltinFunctionType, BuiltinMethodType]:
return lambda *args, **kwargs: self._wrap(member, args, kwargs)
else:
return member
raise AttributeError(name)

def _wrap(self, func, args, kwargs):
if type(func) == MethodType:
result = func(*args, **kwargs)
else:
result = func(self.protocol, *args, **kwargs)
return result
10 changes: 8 additions & 2 deletions desktop/core/src/desktop/lib/thrift_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from thrift.transport.TTransport import TBufferedTransport, TFramedTransport, TMemoryBuffer,\
TTransportException
from thrift.protocol.TBinaryProtocol import TBinaryProtocol
from thrift.protocol.TMultiplexedProtocol import TMultiplexedProtocol
from desktop.lib.python_util import create_synchronous_io_multiplexer
from desktop.lib.thrift_sasl import TSaslClientTransport
from desktop.lib.exceptions import StructuredException, StructuredThriftTransportException
Expand Down Expand Up @@ -80,7 +81,8 @@ def __init__(self, klass, host, port, service_name,
certfile=None,
validate=False,
timeout_seconds=45,
transport='buffered'):
transport='buffered',
multiple=False):
"""
@param klass The thrift client class
@param host Host to connect to
Expand All @@ -100,6 +102,7 @@ def __init__(self, klass, host, port, service_name,
@param validate Validate the certificate received from server
@param timeout_seconds Timeout for thrift calls
@param transport string representation of thrift transport to use
@param multiple Whether Use MultiplexedProtocol
"""
self.klass = klass
self.host = host
Expand All @@ -117,10 +120,11 @@ def __init__(self, klass, host, port, service_name,
self.validate = validate
self.timeout_seconds = timeout_seconds
self.transport = transport
self.multiple = multiple

def __str__(self):
return ', '.join(map(str, [self.klass, self.host, self.port, self.service_name, self.use_sasl, self.kerberos_principal, self.timeout_seconds,
self.mechanism, self.username, self.use_ssl, self.ca_certs, self.keyfile, self.certfile, self.validate, self.transport]))
self.mechanism, self.username, self.use_ssl, self.ca_certs, self.keyfile, self.certfile, self.validate, self.transport, self.multiple]))

class ConnectionPooler(object):
"""
Expand Down Expand Up @@ -260,6 +264,8 @@ def sasl_factory():
transport = TBufferedTransport(sock)

protocol = TBinaryProtocol(transport)
if conf.multiple:
protocol = TMultiplexedProtocol(protocol, conf.service_name)
service = conf.klass(protocol)
return service, protocol, transport

Expand Down

0 comments on commit 028db3a

Please sign in to comment.