forked from vateseif/narrate
-
Notifications
You must be signed in to change notification settings - Fork 0
/
core.py
137 lines (101 loc) · 2.8 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import os
import inspect
from typing import List
from abc import abstractmethod
import gym
import panda_gym
import numpy as np
# Directory containing this module; used to locate files shipped next to it.
BASE_DIR = os.path.dirname(__file__)

# GPT4 api key.
# The first line of keys/gpt4.key (trailing whitespace stripped) is exported as
# OPENAI_API_KEY. The context manager closes the file handle deterministically
# instead of leaving it to the garbage collector, and os.path.join keeps the
# path relative when BASE_DIR is empty (plain concatenation would produce the
# absolute path "/keys/gpt4.key" in that case).
with open(os.path.join(BASE_DIR, 'keys', 'gpt4.key'), 'r') as _key_file:
    os.environ["OPENAI_API_KEY"] = _key_file.readline().rstrip()
class AbstractLLMConfig:
    """Declare the fields an LLM configuration is expected to provide.

    Only type annotations are declared; no default values are assigned, so
    concrete configurations must supply every field themselves.
    """

    prompt: str
    parsing: str
    model_name: str
    streaming: bool
    temperature: float
class AbstractControllerConfig:
    """Declare the fields a controller configuration is expected to provide.

    Only type annotations are declared; no default values are assigned.
    """

    # NOTE(review): T, nx, nu and dt look like horizon length, state
    # dimension, input dimension and time step -- confirm with the
    # concrete controller implementations.
    T: int
    nx: int
    nu: int
    dt: float
    lu: float  # lower bound on u
    hu: float  # higher bound on u
class AbstractRobotConfig:
    """Declare the fields a robot configuration is expected to provide.

    Only type annotations are declared; no default values are assigned.
    """

    name: str
    controller_type: str
class AbstractSimulaitonConfig:
    """Declare the fields a simulation configuration is expected to provide.

    Only type annotations are declared; no default values are assigned.
    The class name is misspelled ("Simulaiton"); it is kept unchanged because
    existing code refers to it by this name.
    """

    render: bool
    env_name: str
    task: str
    save_video: bool


# Correctly spelled alias for new code; the misspelled name above stays valid.
AbstractSimulationConfig = AbstractSimulaitonConfig
class ObjBase:
    """
    The object base that defines debugging tools
    """

    def initialize(self, **kwargs):
        """Setup hook for subclasses; the base implementation does nothing."""
        pass

    def sanityCheck(self):
        """Check that the system parameters are coherent.

        The base implementation performs no checks and always returns True.
        """
        return True

    def errorMessage(self, msg):
        """Print an ``[ERROR]`` line naming this class and the calling function.

        ``msg`` may be any object (for example an exception); it is formatted
        with ``str()``. Always returns False.
        """
        # Frame 0 is this method and frame 1 is its caller. context=0 skips
        # loading source lines, which are never used here.
        return self._report('ERROR', msg, inspect.stack(0)[1].function)

    def warningMessage(self, msg):
        """Print a ``[WARNING]`` line naming this class and the calling function.

        ``msg`` may be any object (for example an exception); it is formatted
        with ``str()``. Always returns False.
        """
        # The caller is resolved here, not in the helper, so the frame index
        # still points at whoever invoked warningMessage.
        return self._report('WARNING', msg, inspect.stack(0)[1].function)

    def _report(self, level, msg, caller):
        """Print ``<Class>-<caller>: [<level>] <msg>`` plus a blank line; return False."""
        print(f"{self.__class__.__name__}-{caller}: [{level}] {msg}\n")
        return False
class AbstractController(ObjBase):
    """Interface for controllers built from an ``AbstractControllerConfig``.

    NOTE(review): the methods below are marked ``@abstractmethod``, but
    ``ObjBase`` is a plain class without an ABC metaclass, so Python does not
    block instantiation; the base implementations simply return None.
    """

    def __init__(self, cfg: AbstractControllerConfig) -> None:
        """Store the configuration on ``self.cfg``."""
        self.cfg = cfg

    @abstractmethod
    def reset(self, x0: np.ndarray) -> None:
        """Reset the controller given ``x0``; the base implementation does nothing."""
        return None

    @abstractmethod
    def apply_gpt_message(self, gpt_message: str) -> None:
        """Consume a GPT message; the base implementation does nothing."""
        return None

    @abstractmethod
    def step(self) -> np.ndarray:
        """Advance the controller one step.

        Subclasses are annotated to return an ``np.ndarray``; the base
        implementation returns None.
        """
        return None
class AbstractLLM(ObjBase):
    """Interface for LLM components built from an ``AbstractLLMConfig``.

    NOTE(review): ``run`` is marked ``@abstractmethod``, but ``ObjBase`` is a
    plain class without an ABC metaclass, so the decorator is not enforced.
    """

    def __init__(self, cfg: AbstractLLMConfig) -> None:
        """Store the configuration on ``self.cfg``."""
        self.cfg = cfg

    @abstractmethod
    def run(self):
        """Run the LLM component; the base implementation returns None."""
        return None
class AbstractRobot(ObjBase):
    """Interface for a robot composed of two LLM components and a controller.

    NOTE(review): ``reset_gpt`` is marked ``@abstractmethod``, but ``ObjBase``
    is a plain class without an ABC metaclass, so the decorator is not enforced.
    """

    def __init__(self, cfg: AbstractRobotConfig) -> None:
        """Store the configuration and declare the component attributes."""
        self.cfg = cfg
        # components
        # These are annotation-only declarations: nothing is assigned here, so
        # subclasses must set TP, OD and MPC before reading them.
        self.TP: AbstractLLM  # Task planner
        self.OD: AbstractLLM  # Optimization Designer
        self.MPC: AbstractController  # Controller

    @abstractmethod
    def reset_gpt(self):
        """Reset the GPT-backed components; the base implementation returns None."""
        return None
class AbstractSimulation(ObjBase):
    """Base class for a simulation that wraps a Panda gym environment.

    The constructor builds the environment from the configuration; every
    other method is a no-op hook for subclasses to override.
    """

    def __init__(self, cfg: AbstractSimulaitonConfig) -> None:
        """Store the configuration, create the gym environment and reset the task counter.

        The environment id is ``Panda<cfg.env_name>-v2`` and ``cfg.render`` is
        forwarded to ``gym.make`` as the ``render`` keyword.
        """
        self.cfg = cfg
        # init env
        self.env = gym.make(f"Panda{cfg.env_name}-v2", render=cfg.render)
        # init robots
        # Annotation-only declaration: nothing is assigned here, so subclasses
        # must set `robot`. A bare `self.robot` expression would be an
        # attribute read and raise AttributeError during construction.
        self.robot: AbstractRobot  # TODO: account for multiple robots
        # count number of tasks solved from a plan
        self.task_counter = 0

    def reset(self):
        """ Reset environment """
        pass

    def create_plan(self):
        """ Triggers the Task Planner to generate a plan of subtasks"""
        pass

    def next_task(self):
        """ Tasks the Optimization Designer to carry out the next task in the plan"""
        pass

    def _solve_task(self):
        """ Applies the optimization designed by the Optimization Designer"""
        pass

    def _run(self):
        """ Start the simulation """
        pass

    def run(self):
        """ Executes self._run() in a separate thread

        The base implementation does nothing; subclasses provide the threading.
        """
        pass