Skip to content

Commit

Permalink
Merge pull request #2 from Gaurang-1402/stephen/ros-prompt-update
Browse files Browse the repository at this point in the history
Stephen/ros prompt update
  • Loading branch information
smwitkowski authored Aug 20, 2023
2 parents 9379a09 + 5e31adf commit 4630562
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 73 deletions.
2 changes: 1 addition & 1 deletion src/sjtu_drone/rosgpt/rosgpt/ros_agent/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from langchain.chat_models.openai import ChatOpenAI

# Package-level chat model; agent.py pulls it in with `from . import LLM`.
LLM = ChatOpenAI(temperature=0)
# This second assignment rebinds LLM, so temperature=0.3 is the value in effect.
# NOTE(review): the assignment above looks like the pre-change side of a diff
# whose +/- markers were lost — confirm and drop it if so.
LLM = ChatOpenAI(temperature=0.3)
103 changes: 34 additions & 69 deletions src/sjtu_drone/rosgpt/rosgpt/ros_agent/agent.py
Original file line number Diff line number Diff line change
@@ -1,111 +1,76 @@
from langchain.chat_models.openai import ChatOpenAI
from langchain.agents import AgentType, initialize_agent
from .tools import CustomCommandToJSON
from langchain.agents import AgentType, initialize_agent, load_tools
from langchain.tools import StructuredTool
from .tools import CustomCommandToJSON, RetrievePOICoordinates, ComputeDroneMovements
from . import LLM


def load_agent():
PREFIX = f"""You are an interpreter for a drone. You will be given a command in English.
If what you are given is not a command, or is not relevant for managing the drone, you should ignore it and return a message saying so.
First, you must think about what the command means. If there are multiple steps in the command, you must plan out each step for the drone to follow sequentially. You may only command the drone to move, rotate, land, stop, or takeoff. If you command it to move, you must specify the direction, distance, and speed.
PREFIX = f"""You are an interpreter for a drone. You will be given a command in English.
If what you are given is not a command, or is not relevant for managing the drone, you should ignore it return a message saying so.
If there is anything that is unclear or ambiguous, you should ask for clarification.
First, you must think about what the command means, and plan out the exact steps you will take to execute the command.
Even if the command is a single step, you should still plan out the steps you will take to execute the command.
After you have planned out the steps, you must format each step into a JSON object that the drone can understand. You should use the command_to_json tool to help you. Note: you may only pass a single command to the tool at a time.
Once you have finished formatting all the commands, you should return a string formatted json object containing all the JSON formatted commands. The drone will then execute the commands in sequential order. The output should follow this format:
Once you have finished formatting all the commands, you should return an array containing all the JSON
formatted commands. The drone will then execute the commands in sequential order. The output should follow this format:
'[json_command_1, json_command_2, ...]'
where each json_command is a JSON object that follows the schema described below:
Each json_command is a JSON object that must contain a "command" key.
The value associated with the "command" key is an object that must have an action field, which specifies what action the drone should take. This can be one of four strings: "land", "takeoff", "move", or "stop".
If the action field is set to "move", then the object associated with the "command" key must also contain a params field, which is an object that provides details about the move action. This object has three fields:
a. linear_speed: A number between 0 and 1 that specifies the linear speed of the drone in meters per second. The default value is 0.1.
b. distance: A floating-point number that specifies the distance the drone should move in meters. The default value is 0.1.
c. direction: A string that specifies the direction the drone should move in. This can be one of six values: "forward", "backward", "left", "right", "up", or "down".
If the action field is set to anything other than "move", the params field is not required in the object associated with the "command" key.
As a reminder, you have access to the following tools:"""
FORMAT_INSTRUCTIONS = """Use the following format:
Command: the original command from the user
Thought: you should always think about what to do
Plan: the entire plan regarding how you will decompose the command into discrete steps
Thought: the thought process of the next action you will take
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation should be repeated for each step in the command)
Thought: I have broken the command out into discrete steps for the drone to follow formatted in the appropriate way
Final Answer: An array of json objects to send to the drone, formatted as a string"""
... (this Thought/Action/Action Input/Observation should be repeated until the command(s) are ready to be sent to the drone)
Thought: I have broken the command out into discrete steps and formatted each step into a JSON object.
Final Answer: An array of json objects to send to the drone"""
SUFFIX = """Begin!
Command: {input}
Thought: {agent_scratchpad}"""


PARSING_ERROR_PROMPT = """If you're returning a list of commands, make sure you output an array where each element is a JSON object. Each JSON object should have the following format:
PARSING_ERROR_PROMPT = """The correct format for a single command is:
```TypeScript
command: { //
action: string // The action to perform. This can be one of `land`, `takeoff`, `move`, `stop`.
params: { //
linear_speed: number // The linear speed of the drone in meters per second. The default value is 0.1. This value must be between 0 and 1.
distance: number // The distance to move in meters. This value must be a float. The default value is 0.1.
distance: number // The distance to move in meters. This value must be between -1 and 1. The default value is 0.1.
direction: string // The direction to move in. This can be one of `forward`, `backward`, `left`, `right`, `up`, `down`.
}
}
```
If you're returning a message, just return the string.
Example:
prompt = '''
Consider the following ontology:
{"action": "land", "params": {}}
{"action": "takeoff", "params": {}}
{"action": "move", "params": {"linear_speed": linear_speed, "distance": distance, "direction": direction}}
You may get a command in another language, translate it to English and then create the JSON.
The 'direction' parameter can take values "forward", "backward", "left", "right", "up", "down" to indicate the direction of movement. Here are some examples.
If speed is not given in the prompt, it is assumed to be 0.5 meters per second.
All numerical answers should be in float form.
Command: "takeoff and Move forward for 1 meter at a speed of 0.5 meters per second."
Thought: The command instructs the drone to takeoff first and then move forward for a specific distance and speed.
Action: Convert to JSON format
Action Input: "takeoff and Move forward for 1 meter at a speed of 0.5 meters per second."
Observation: [{"action": "takeoff", "params": {}}, {"action": "move", "params": {"linear_speed": 0.5, "distance": 1, "direction": "forward"}}]
Thought: I have broken the command out into discrete steps for the drone to follow formatted in the appropriate way.
Final Answer: '[{"action": "takeoff", "params": {}}, {"action": "move", "params": {"linear_speed": 0.5, "distance": 1, "direction": "forward"}}]'
'''
prompt = '''Command: "Land."
Thought: The command instructs the drone to land.
Action: Convert to JSON format
Action Input: "Land."
Observation: {"action": "land", "params": {}}
Thought: I have converted the command into a JSON format suitable for the drone.
Final Answer: '[{"action": "land", "params": {}}]'
'''
Note: The "Example" is just a demonstration of how the instructions are utilized in a practical scenario. It's not the exact execution but a representation.
Command: "Takeoff, move forward for 3 meters, then land."
Thought: The command instructs the drone to takeoff, move forward, and then land.
Action: Convert to JSON format
Action Input: "Takeoff, move forward for 3 meters, then land."
Observation: [{"action": "takeoff", "params": {}}, {"action": "move", "params": {"linear_speed": 0.5, "distance": 3, "direction": "forward"}}, {"action": "land", "params": {}}]
Thought: I have broken the command into discrete steps for the drone.
Final Answer: '[{"action": "takeoff", "params": {}}, {"action": "move", "params": {"linear_speed": 0.5, "distance": 3, "direction": "forward"}}, {"action": "land", "params": {}}]'
Command: "Move left for 2 meters, move upwards for 1 meter, then stop."
Thought: The command instructs the drone to move left, then move upwards, followed by a stop.
Action: Convert to JSON format
Action Input: "Move left for 2 meters, move upwards for 1 meter, then stop."
Observation: [{"action": "move", "params": {"linear_speed": 0.5, "distance": 2, "direction": "left"}}, {"action": "move", "params": {"linear_speed": 0.5, "distance": 1, "direction": "up"}}, {"action": "stop", "params": {}}]
Thought: I have parsed the commands sequentially for the drone.
Final Answer: '[{"action": "move", "params": {"linear_speed": 0.5, "distance": 2, "direction": "left"}}, {"action": "move", "params": {"linear_speed": 0.5, "distance": 1, "direction": "up"}}, {"action": "stop", "params": {}}]'
"""

tools = [CustomCommandToJSON()]
tools = load_tools(['human'], llm=LLM)
tools.append(CustomCommandToJSON())
tools.append(RetrievePOICoordinates())
tools.append(ComputeDroneMovements())

agent = initialize_agent(
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
Expand Down
129 changes: 126 additions & 3 deletions src/sjtu_drone/rosgpt/rosgpt/ros_agent/tools.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from kor import create_extraction_chain, from_pydantic
from typing import Optional, Type
from typing import Optional, Type, Tuple
from pydantic import BaseModel, Field
from langchain.tools import BaseTool
from langchain.callbacks.manager import CallbackManagerForToolRun, AsyncCallbackManagerForToolRun
Expand All @@ -10,7 +10,7 @@
class MoveParams(BaseModel):
linear_speed: float = Field(
...,
description="The linear speed of the drone in meters per second. The default value is 0.1. This value must be between 0 and 1.",
description="The linear speed of the drone in meters per second. The default value is 0.3. This value must be between 0 and 1.",
ge=0, le=1)

distance: float = Field(
Expand Down Expand Up @@ -41,7 +41,11 @@ class ExtractionInput(BaseModel):
("Takeoff the drone.", {"action": "takeoff"}),
("Move down for 2 meters at a speed of 0.4 meters per second.",
{"action": "move",
"params": {"linear_speed": 0.4, "distance": 2, "direction": "down"}})])
"params": {"linear_speed": 0.4, "distance": 2, "direction": "down"}}),
("Move forward for 2 meters at a speed of 0.4 meters per second.",
{"action": "move",
"params": {"linear_speed": 0.4, "distance": 2, "direction": "forward"}}),
("Stop the drone.", {"action": "stop"})])
extraction_chain = create_extraction_chain(LLM, schema, encoder_or_encoder_class='json', validator=validator)

class CustomCommandToJSON(BaseTool):
Expand All @@ -66,3 +70,122 @@ async def _arun(
) -> str:
"""Use the tool asynchronously."""
raise NotImplementedError("Command to JSON does not support async")

class POIInput(BaseModel):
    # Input schema for a point-of-interest lookup: one required string naming
    # the location whose coordinates are wanted.
    location: str = Field(
        ...,
        description="The location of the point of interest (POI).",
    )

class RetrievePOICoordinates(BaseTool):
    """Tool that maps a named point of interest to its hard-coded coordinates."""

    name = "retrieve_poi_coordinates"
    description = ("useful when you want to retrieve the coordinates of the garden or the kitchen.")

    # The tool's only argument is a location name, so validate it against
    # POIInput rather than the command-extraction schema.
    args_schema: Type[BaseModel] = POIInput

    def _run(
        self,
        location: str,
        run_manager: Optional[CallbackManagerForToolRun] = None
    ) -> Tuple[int, int, int]:
        """Return the coordinates registered for ``location``.

        Args:
            location: Name of the point of interest. Surrounding whitespace
                and letter case are ignored, so "Garden " matches "garden".
            run_manager: Optional callback manager; not used by this tool.

        Returns:
            The 3-tuple stored for the location (presumably (x, y, z) —
            TODO confirm the axis order against the drone's frame).

        Raises:
            KeyError: If ``location`` is not a known point of interest.
        """
        poi_coordinates = {
            'garden': (10, 30, 2),
            'kitchen': (60, 20, 1)
        }
        # The value is typed by an LLM, so tolerate trivial formatting noise.
        key = location.strip().lower()
        try:
            return poi_coordinates[key]
        except KeyError:
            # `from None` keeps the traceback focused on the message below
            # instead of also showing the bare dictionary miss.
            raise KeyError(f"Invalid location: {location}. Please try another tool.") from None

    async def _arun(
        self,
        location: str,
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None
    ) -> str:
        """Async entry point; unsupported, always raises NotImplementedError."""
        raise NotImplementedError("Retrieve POI Coordinates does not support async")

class GetDroneLocation(BaseTool):
    """Tool that reports the drone's current location.

    The location is a fixed placeholder value; nothing is read from the drone.
    """

    name = "get_drone_location"
    description = ("useful when you want to retrieve the current location of the drone.")

    def _run(
        self,
        run_manager: Optional[CallbackManagerForToolRun] = None
    ) -> Tuple[int, int, int]:
        """Return the drone's location.

        Args:
            run_manager: Optional callback manager; not used by this tool.

        Returns:
            The constant 3-tuple ``(10, 20, 1)`` (presumably (x, y, z) —
            TODO confirm once this is wired to real telemetry).
        """
        # The annotation now matches what is actually returned: a tuple,
        # not a string.
        return (10, 20, 1)

    async def _arun(
        self,
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None
    ) -> str:
        """Async entry point; unsupported, always raises NotImplementedError."""
        raise NotImplementedError("Get Drone Location does not support async")

# TODO: Implement this tool
# class GetDroneOrientation(BaseTool):
# name = "get_drone_orientation"
# description = ("useful when you want to retrieve the current orientation of the drone.")
# args_schema: Type[BaseModel] = ExtractionInput

# def _run(
# self,
# run_manager: Optional[CallbackManagerForToolRun] = None
# ) -> str:
# """Use the tool."""
# drone_orientation = 0

# return drone_orientation

# async def _arun(
# self,
# run_manager: Optional[AsyncCallbackManagerForToolRun] = None
# ) -> str:
# """Use the tool asynchronously."""
# raise NotImplementedError("Get Drone Orientation does not support async")


class DroneToPOIInput(BaseModel):
    # Input schema for ComputeDroneMovements: one required string that packs
    # the drone position, the target position and the speed into a single list.
    coordinates: str = Field(
        ...,
        description=(
            "The coordinates of the drone, the poi coordinates, and the speed in the form "
            "[drone_x, drone_y, drone_z, poi_x, poi_y, poi_z, speed]."
        ),
    )

def _describe_axis_movement(current, target, increasing, decreasing, speed):
    """Describe the move needed along one axis to get from ``current`` to ``target``.

    Args:
        current: The drone's coordinate on this axis.
        target: The destination's coordinate on this axis.
        increasing: Direction word used when the coordinate must grow.
        decreasing: Direction word used when the coordinate must shrink.
        speed: Speed to quote in the instruction.

    Returns:
        A one-sentence instruction, or a "no movement" sentence when the two
        coordinates are already equal.
    """
    if current < target:
        return f"The drone should move {increasing} {target - current} meters at {speed} meters per second"
    if current > target:
        return f"The drone should move {decreasing} {current - target} meters at {speed} meters per second"
    # Equal coordinates previously left the result variable unassigned and
    # crashed the tool with UnboundLocalError.
    return f"The drone does not need to move {increasing} or {decreasing}"


class ComputeDroneMovements(BaseTool):
    """Tool that turns a drone position and a target position into per-axis moves."""

    name = "compute_drone_movements"
    description = ("useful when you want to compute the movements the drone should take to reach a certain "
                   "location, given a list of coordinates and speed in the form "
                   "[drone_x, drone_y, drone_z, poi_x, poi_y, poi_z, speed].")
    args_schema: Type[BaseModel] = DroneToPOIInput

    def _run(
        self,
        coordinates: str,
        run_manager: Optional[CallbackManagerForToolRun] = None
    ) -> Tuple[str, str, str]:
        """Compute the x, y and z movements that bring the drone to the target.

        Args:
            coordinates: Text of a seven-element list or tuple in the form
                ``[drone_x, drone_y, drone_z, poi_x, poi_y, poi_z, speed]``.
            run_manager: Optional callback manager; not used by this tool.

        Returns:
            Three sentences, in order: the x-axis (right/left), y-axis
            (forward/backward) and z-axis (up/down) instruction.

        Raises:
            ValueError: If ``coordinates`` is not a literal list/tuple of
                exactly seven values.
        """
        # Imported here so the block does not depend on the module's import
        # header being edited.
        import ast

        # SECURITY: this string is produced by the LLM from user text. The
        # previous implementation passed it to eval(), which would execute
        # arbitrary expressions; literal_eval only accepts plain literals.
        try:
            values = ast.literal_eval(coordinates.strip())
        except (ValueError, TypeError, SyntaxError) as err:
            raise ValueError(
                "Expected coordinates in the form "
                "[drone_x, drone_y, drone_z, poi_x, poi_y, poi_z, speed], "
                f"got: {coordinates!r}"
            ) from err

        if not isinstance(values, (list, tuple)) or len(values) != 7:
            raise ValueError(
                "Expected exactly seven values in the form "
                "[drone_x, drone_y, drone_z, poi_x, poi_y, poi_z, speed], "
                f"got: {coordinates!r}"
            )

        drone_x, drone_y, drone_z, poi_x, poi_y, poi_z, speed = values

        return (
            _describe_axis_movement(drone_x, poi_x, "right", "left", speed),
            _describe_axis_movement(drone_y, poi_y, "forward", "backward", speed),
            _describe_axis_movement(drone_z, poi_z, "up", "down", speed),
        )

    async def _arun(
        self,
        coordinates: str,
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None
    ) -> str:
        """Async entry point; unsupported, always raises NotImplementedError.

        Defined for parity with the other tools in this module, which all
        provide an explicit ``_arun``.
        """
        raise NotImplementedError("Compute Drone Movements does not support async")

0 comments on commit 4630562

Please sign in to comment.