forked from microsoft/UFO
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add record processor for the user demonstration learning
- Loading branch information
1 parent
1db2fe4
commit 92f354e
Showing
13 changed files
with
773 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
# Copyright (c) Microsoft Corporation. | ||
# Licensed under the MIT License. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
# Copyright (c) Microsoft Corporation. | ||
# Licensed under the MIT License. | ||
from . import record_processor | ||
|
||
if __name__ == "__main__": | ||
# Execute the main script | ||
record_processor.main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
# Copyright (c) Microsoft Corporation. | ||
# Licensed under the MIT License. | ||
|
||
class DemonstrationStep: | ||
""" | ||
Class for the single step information in the user demonstration record. | ||
Multiple steps will be recorded to achieve a specific request. | ||
""" | ||
|
||
def __init__(self, application: str, description: str, action: str, screenshot: str, comment: str): | ||
""" | ||
Create a new step. | ||
""" | ||
self.application = application | ||
self.description = description | ||
self.action = action | ||
self.comment = comment | ||
self.screenshot = screenshot | ||
|
||
|
||
class DemonstrationRecord: | ||
""" | ||
Class for the user demonstration record. | ||
A serise of steps user performed to achieve a specific request will be recorded in this class. | ||
""" | ||
|
||
def __init__(self, applications: list, step_num: int, **steps: DemonstrationStep): | ||
""" | ||
Create a new Record. | ||
""" | ||
self.request = "" | ||
self.round = 0 | ||
self.applications = applications | ||
self.step_num = step_num | ||
for index, step in steps.items(): | ||
setattr(self, index, step.__dict__) | ||
|
||
def set_request(self, request: str): | ||
""" | ||
Set the request. | ||
""" | ||
self.request = request | ||
|
||
def get_request(self) -> str: | ||
""" | ||
Get the request. | ||
""" | ||
return self.request | ||
|
||
def get_applications(self) -> list: | ||
""" | ||
Get the application. | ||
""" | ||
return self.applications |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,171 @@ | ||
# Copyright (c) Microsoft Corporation. | ||
# Licensed under the MIT License. | ||
|
||
import re | ||
import xml.etree.ElementTree as ET | ||
from bs4 import BeautifulSoup | ||
from .demonstration_record import DemonstrationStep, DemonstrationRecord | ||
|
||
|
||
class PSRRecordParser: | ||
""" | ||
Class for parsing the steps recorder .mht file content to user demonstration record. | ||
""" | ||
|
||
def __init__(self, content: str): | ||
""" | ||
Constructor for the RecordParser class. | ||
""" | ||
self.content = content | ||
self.parts_dict = {} | ||
self.applications = [] | ||
self.comments = [] | ||
self.steps = [] | ||
|
||
def parse_to_record(self) -> DemonstrationRecord: | ||
""" | ||
Parse the steps recorder .mht file content to record in following steps: | ||
1. Find the boundary in the .mht file. | ||
2. Split the file by the boundary into parts. | ||
3. Get the comments for each step. | ||
4. Get the steps from the content. | ||
5. Construct the record object and return it. | ||
return: A record object. | ||
""" | ||
boundary = self.find_boundary() | ||
self.parts_dict = self.split_file_by_boundary(boundary) | ||
self.comments = self.get_comments( | ||
self.parts_dict['main.htm']['Content']) | ||
self.steps = self.get_steps(self.parts_dict['main.htm']['Content']) | ||
record = DemonstrationRecord( | ||
list(set(self.applications)), len(self.steps), **self.steps) | ||
|
||
return record | ||
|
||
def find_boundary(self) -> str: | ||
""" | ||
Find the boundary in the .mht file. | ||
""" | ||
|
||
boundary_start = self.content.find("boundary=") | ||
|
||
if boundary_start != -1: | ||
boundary_start += len("boundary=") | ||
boundary_end = self.content.find("\n", boundary_start) | ||
boundary = self.content[boundary_start:boundary_end].strip('\"') | ||
return boundary | ||
else: | ||
raise ValueError("Boundary not found in the .mht file.") | ||
|
||
def split_file_by_boundary(self, boundary: str) -> dict: | ||
""" | ||
Split the file by the boundary into parts, | ||
Store the parts in a dictionary, including the content type, | ||
content location and content transfer encoding. | ||
boundary: The boundary of the file. | ||
return: A dictionary of parts in the file. | ||
""" | ||
parts = self.content.split("--" + boundary) | ||
part_dict = {} | ||
for part in parts: | ||
content_type_start = part.find("Content-Type:") | ||
content_location_start = part.find("Content-Location:") | ||
content_transfer_encoding_start = part.find( | ||
"Content-Transfer-Encoding:") | ||
part_info = {} | ||
if content_location_start != -1: | ||
content_location_end = part.find("\n", content_location_start) | ||
content_location = part[content_location_start:content_location_end].split(":")[ | ||
1].strip() | ||
|
||
# add the content location | ||
if content_type_start != -1: | ||
content_type_end = part.find("\n", content_type_start) | ||
content_type = part[content_type_start:content_type_end].split(":")[ | ||
1].strip() | ||
part_info["Content-Type"] = content_type | ||
|
||
# add the content transfer encoding | ||
if content_transfer_encoding_start != -1: | ||
content_transfer_encoding_end = part.find( | ||
"\n", content_transfer_encoding_start) | ||
content_transfer_encoding = part[content_transfer_encoding_start:content_transfer_encoding_end].split(":")[ | ||
1].strip() | ||
part_info["Content-Transfer-Encoding"] = content_transfer_encoding | ||
|
||
content = part[content_location_end:].strip() | ||
part_info["Content"] = content | ||
part_dict[content_location] = part_info | ||
return part_dict | ||
|
||
def get_steps(self, content: str) -> dict: | ||
""" | ||
Get the steps from the content in fllowing steps: | ||
1. Find the UserActionData tag in the content. | ||
2. Parse the UserActionData tag to get the steps. | ||
3. Get the screenshot for each step. | ||
4. Get the comments for each step. | ||
content: The content of the main.htm file. | ||
return: A dictionary of steps. | ||
""" | ||
|
||
user_action_data = re.search( | ||
r'<UserActionData>(.*?)</UserActionData>', content, re.DOTALL) | ||
if user_action_data: | ||
|
||
root = ET.fromstring(user_action_data.group(1)) | ||
steps = {} | ||
|
||
for each_action in root.findall('EachAction'): | ||
|
||
action_number = each_action.get('ActionNumber') | ||
application = each_action.get('FileName') | ||
description = each_action.find('Description').text | ||
action = each_action.find('Action').text | ||
screenshot_file_name = each_action.find( | ||
'ScreenshotFileName').text | ||
screenshot = self.get_screenshot(screenshot_file_name) | ||
step_key = f"step_{int(action_number) - 1}" | ||
|
||
step = DemonstrationStep( | ||
application, description, action, screenshot, self.comments.get(step_key)) | ||
steps[step_key] = step | ||
self.applications.append(application) | ||
return steps | ||
else: | ||
raise ValueError("UserActionData not found in the file.") | ||
|
||
def get_comments(self, content: str) -> dict: | ||
""" | ||
Get the user input comments for each step | ||
content: The content of the main.htm file. | ||
return: A dictionary of comments for each step. | ||
""" | ||
soup = BeautifulSoup(content, 'html.parser') | ||
body = soup.body | ||
steps_html = body.find('div', id='Steps') | ||
steps = steps_html.find_all(lambda tag: tag.name == 'div' and tag.has_attr( | ||
'id') and re.match(r'^Step\d+$', tag['id'])) | ||
|
||
comments = {} | ||
for index, step in enumerate(steps): | ||
comment_tag = step.find('b', text='Comment: ') | ||
comments[f'step_{index}'] = comment_tag.next_sibling if comment_tag else None | ||
return comments | ||
|
||
def get_screenshot(self, screenshot_file_name: str) -> str: | ||
""" | ||
Get the screenshot by screenshot file name. | ||
The screenshot related information is stored in the parts_dict. | ||
screenshot_file_name: The file name of the screenshot. | ||
return: The screenshot in base64 string. | ||
""" | ||
screenshot_part = self.parts_dict[screenshot_file_name] | ||
content = screenshot_part['Content'] | ||
content_type = screenshot_part['Content-Type'] | ||
content_transfer_encoding = screenshot_part['Content-Transfer-Encoding'] | ||
|
||
screenshot = 'data:{type};{encoding}, {content}'.format( | ||
type=content_type, encoding=content_transfer_encoding, content=content) | ||
|
||
return screenshot |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
# Copyright (c) Microsoft Corporation. | ||
# Licensed under the MIT License. | ||
|
||
import os | ||
import argparse | ||
from record_processor.summarizer.summarizer import DemonstrationSummarizer | ||
from ufo.config.config import load_config | ||
from .parser.psr_record_parser import PSRRecordParser | ||
from .utils import create_folder, save_to_json, unzip_and_read_file | ||
from ufo.utils import print_with_color | ||
|
||
configs = load_config() | ||
|
||
args = argparse.ArgumentParser() | ||
args.add_argument("--request", "-r", help="The request that user want to achieve.", | ||
type=lambda s: s.strip() or None, nargs='+') | ||
args.add_argument("--behavior-record-path", "-p", help="The path for user behavior record in zip file.", | ||
type=lambda f: f if f.endswith(".zip") else None) | ||
parsed_args = args.parse_args() | ||
|
||
|
||
def main(): | ||
""" | ||
Main function. | ||
1. Read the user demonstration record and parse it. | ||
2. Summarize the demonstration record. | ||
3. Let user decide whether to save the demonstration record. | ||
4. Save the demonstration record if user choose to save. | ||
""" | ||
try: | ||
content = unzip_and_read_file(parsed_args.behavior_record_path) | ||
record = PSRRecordParser(content).parse_to_record() | ||
record.set_request(parsed_args.request[0]) | ||
|
||
summarizer = DemonstrationSummarizer( | ||
configs["ACTION_AGENT"]["VISUAL_MODE"], configs["DEMONSTRATION_PROMPT"], configs["ACTION_SELECTION_EXAMPLE_PROMPT"], configs["API_PROMPT"]) | ||
|
||
summaries, total_cost = summarizer.get_summary_list([record]) | ||
if asker(summaries): | ||
demonstration_path = configs["DEMONSTRATION_SAVED_PATH"] | ||
create_folder(demonstration_path) | ||
|
||
save_to_json(record.__dict__, os.path.join(demonstration_path, "demonstration_log", parsed_args.request[0].replace(' ', '_')) + ".json") | ||
summarizer.create_or_update_yaml(summaries, os.path.join(demonstration_path, "demonstration.yaml")) | ||
summarizer.create_or_update_vector_db(summaries, os.path.join(demonstration_path, "demonstration_db")) | ||
|
||
formatted_cost = '${:.2f}'.format(total_cost) | ||
print_with_color(f"Request total cost is {formatted_cost}", "yellow") | ||
|
||
except ValueError as e: | ||
print_with_color(str(e), "red") | ||
|
||
|
||
def asker(summaries) -> bool: | ||
plan = summaries[0]["example"]["Plan"] | ||
print_with_color("""Here's the plan summarized from your demonstration: """, "cyan") | ||
print_with_color(plan, "green") | ||
print_with_color("""Would you like to save the plan future reference by the agent? | ||
[Y] for yes, any other key for no.""", "cyan") | ||
|
||
response = input() | ||
|
||
if response.upper() == "Y": | ||
return True | ||
else: | ||
return False |
Oops, something went wrong.