diff --git a/rosapi/scripts/rosapi_node b/rosapi/scripts/rosapi_node index 013b0729..48bd2eb4 100755 --- a/rosapi/scripts/rosapi_node +++ b/rosapi/scripts/rosapi_node @@ -88,7 +88,8 @@ class Rosapi(Node): full_name = self.get_namespace() + self.get_name() else: full_name = self.get_namespace() + "/" + self.get_name() - params.init(full_name) + # params.init(full_name) + params._node = self self.create_service(Topics, "/rosapi/topics", self.get_topics) self.create_service(Interfaces, "/rosapi/interfaces", self.get_interfaces) self.create_service(TopicsForType, "/rosapi/topics_for_type", self.get_topics_for_type) @@ -284,8 +285,8 @@ class Rosapi(Node): params.delete_param(request.node_name, request.name, self.globs.params) return response - def get_param_names(self, request, response): - response.names = params.get_param_names(self.globs.params) + async def get_param_names(self, request, response): + response.names = await params.get_param_names(self.globs.params) return response def get_time(self, request, response): diff --git a/rosapi/src/rosapi/params.py b/rosapi/src/rosapi/params.py index 36af6913..44860b64 100644 --- a/rosapi/src/rosapi/params.py +++ b/rosapi/src/rosapi/params.py @@ -35,6 +35,7 @@ from json import dumps, loads import rclpy +from rclpy.callback_groups import MutuallyExclusiveCallbackGroup from rcl_interfaces.msg import Parameter, ParameterType, ParameterValue from rcl_interfaces.srv import ListParameters from rclpy.parameter import get_parameter_value @@ -48,7 +49,6 @@ # Ensure thread safety for setting / getting parameters. param_server_lock = threading.RLock() _node = None -_parent_node_name = "" _parameter_type_mapping = [ "", @@ -64,22 +64,22 @@ ] -def init(parent_node_name): - """ - Initializes params module with a rclpy.node.Node for further use. - This function has to be called before any other for the module to work. - """ - global _node, _parent_node_name - # TODO(@jubeira): remove this node; use rosapi node with MultiThreadedExecutor or - # async / await to prevent the service calls from blocking. - parent_node_basename = parent_node_name.split("/")[-1] - param_node_name = f"{parent_node_basename}_params" - _node = rclpy.create_node( - param_node_name, - cli_args=["--ros-args", "-r", f"__node:={param_node_name}"], - start_parameter_services=False, - ) - _parent_node_name = get_absolute_node_name(parent_node_name) +async def future_with_timeout(node: rclpy.node.Node, future: rclpy.task.Future, timeout_sec: float): + first_done_future = rclpy.task.Future() + + def handler(arg=None): + first_done_future.set_result(None) + + timer = node.create_timer(timeout_sec, handler) + future.add_done_callback(handler) + + await first_done_future + + if not future.done(): + raise TimeoutError(f"Future timed out after {timeout_sec} seconds") + + timer.cancel() + timer.destroy() def set_param(node_name, name, value, params_glob): @@ -194,17 +194,17 @@ def delete_param(node_name, name, params_glob): _set_param(node_name, name, None, ParameterType.PARAMETER_NOT_SET) -def get_param_names(params_glob): +async def get_param_names(params_glob): params = [] nodes = get_nodes() for node in nodes: - params.extend(get_node_param_names(node, params_glob)) + params.extend(await get_node_param_names(node, params_glob)) return params -def get_node_param_names(node_name, params_glob): +async def get_node_param_names(node_name, params_glob): """Gets list of parameter names for a given node""" node_name = get_absolute_node_name(node_name) @@ -214,22 +214,25 @@ def get_node_param_names(node_name, params_glob): return list( filter( lambda x: any(fnmatch.fnmatch(str(x), glob) for glob in params_glob), - _get_param_names(node_name), + await _get_param_names(node_name), ) ) else: # If there is no parameter glob, don't filter. - return _get_param_names(node_name) + return await _get_param_names(node_name) -def _get_param_names(node_name): +async def _get_param_names(node_name): # This method is called in a service callback; calling a service of the same node # will cause a deadlock. - global _parent_node_name - if node_name == _parent_node_name: + if node_name == _node.get_fully_qualified_name(): return [] - client = _node.create_client(ListParameters, f"{node_name}/list_parameters") + client = _node.create_client( + ListParameters, + f"{node_name}/list_parameters", + callback_group=MutuallyExclusiveCallbackGroup(), + ) ready = client.wait_for_service(timeout_sec=5.0) if not ready: @@ -237,10 +240,8 @@ def _get_param_names(node_name): request = ListParameters.Request() future = client.call_async(request) - if _node.executor: - _node.executor.spin_until_future_complete(future) - else: - rclpy.spin_until_future_complete(_node, future) + + await future_with_timeout(_node, future, 2.0) response = future.result() if response is not None: