
Commit
add auto-refresh and stage file processing code
sfc-gh-tyuan committed Dec 11, 2024
1 parent edd09f0 commit f1818f3
Showing 4 changed files with 102 additions and 0 deletions.
1 change: 1 addition & 0 deletions polaris-py/.gitignore
@@ -0,0 +1 @@
.idea
10 changes: 10 additions & 0 deletions polaris-py/README.md
@@ -0,0 +1,10 @@
# Polaris Auto-Refresh Demo
There are two components to this demo: a long-running Python process that polls SQS and writes new file metadata into the directory table, and a client that processes the unstructured data.

## Setup
1. Make sure you have access to [AWS Engineering Sandbox](https://snowflakecomputing.atlassian.net/wiki/spaces/EN/pages/2904162782/Engineering+Sandbox+Access) and my bucket `s3://datalake-storage-team/tyuan-polaris-test`.
2. Run Polaris and log in via Spark: `./regtests/run_spark_sql.sh s3://datalake-storage-team/tyuan-polaris-test arn:aws:iam::631484165566:role/datalake-storage-integration-role`
3. Create a database and directory table: `create table directory(path string, size bigint, last_modified string, etag string);`
4. Replace the SQS access key ID and secret key in `refresh.py` with credentials that have permission to poll the queue, then run the refresh script.
5. Upload a file into `s3://datalake-storage-team/tyuan-polaris-test/db1/stage1`. The refresh script should pick up the new file within a few seconds; a quick way to verify this is sketched after this list.
6. Replace the API key in `client.py` with a valid OpenAI API key, then use the client to download and summarize files listed in the directory table.
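
As a sanity check after step 5, the directory table can be read back with pyiceberg to confirm the new row landed. This is a minimal sketch, assuming the `manual_spark` catalog and `db1.directory` table used in `client.py`:

```python
# Sketch: confirm that the refresh script committed the uploaded file.
# Assumes the manual_spark catalog and db1.directory table from client.py.
from client import load_table

table = load_table('manual_spark', 'db1.directory')
for row in table.scan().to_arrow().to_pylist():
    print(row['path'], row['size'], row['last_modified'], row['etag'])
```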
39 changes: 39 additions & 0 deletions polaris-py/client.py
@@ -0,0 +1,39 @@
from pyiceberg.catalog import load_catalog
from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.table import Table
import openai

def load_table(catalog_name, table_identifier) -> Table:
    catalog = load_catalog(catalog_name,
                           **{
                               'uri': 'http://localhost:8181/api/catalog',
                               'warehouse': catalog_name,
                               'token': 'principal:root;realm:default-realm',
                               'client.region': 'us-west-2'
                           })
    return catalog.load_table(table_identifier)

if __name__ == '__main__':
    table = load_table('manual_spark', 'db1.directory')
    scan = table.scan(selected_fields=('path',)).to_arrow()

    # Take the first path from the directory table and open it from S3
    input_file = PyArrowFileIO({'client.region': 'us-west-2'}).new_input(location=f's3://datalake-storage-team/{scan[0][0]}')

    # Test opening and reading the file
    r = input_file.open(seekable=False)
    input_text = r.read().decode('utf-8', errors='replace')
    print("Original text: " + input_text)

    openai.api_key = '<api-key>'
    response = openai.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are an AI assistant that summarizes text."},
            {"role": "user", "content": f"Summarize the following text:\n{input_text}"}
        ],
        max_tokens=100,
        temperature=0.5
    )

    # Extract the summary
    summary = response.choices[0].message.content
    print("Summary:", summary)
52 changes: 52 additions & 0 deletions polaris-py/refresh.py
@@ -0,0 +1,52 @@
from time import sleep
from client import load_table
import pyarrow as pa
import boto3
import json

def poll_messages(region, queue_url, table):
    sqs = boto3.client("sqs",
                       aws_access_key_id="<id>",
                       aws_secret_access_key="<key>",
                       region_name=region)
    try:
        # Receive messages
        print("Polling messages.")
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,  # Adjust up to 10 for batch processing
            WaitTimeSeconds=10       # Enable long polling
        )

        # Check if messages were received
        if "Messages" in response:
            rows = []
            for message in response["Messages"]:
                record = json.loads(message['Body'])['Records'][0]
                obj = record['s3']['object']
                print(f"message ID: {message['MessageId']}, object: {obj}")
                rows.append({'path': obj['key'], 'size': obj['size'], 'last_modified': record['eventTime'], 'etag': obj['eTag']})

                # Delete the message after processing
                sqs.delete_message(
                    QueueUrl=queue_url,
                    ReceiptHandle=message["ReceiptHandle"]
                )
                print(f"Deleted message ID: {message['MessageId']}")

            # Append the rows to the directory table
            df = pa.Table.from_pylist(rows)
            table.append(df)
            print("Committed to directory.")

        else:
            print("No messages available.")

    except Exception as e:
        print(f"An error occurred: {e}")

if __name__ == '__main__':
    table = load_table('manual_spark', 'db1.directory')
    while True:
        poll_messages('us-west-2', "https://sqs.us-west-2.amazonaws.com/631484165566/tyuan-polaris", table)
        sleep(5)
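
For reference, `poll_messages` assumes each SQS message body carries a standard S3 `ObjectCreated` event notification. The sketch below illustrates the fields the script relies on; the key, size, and eTag are made-up sample values:

```python
# Sketch: the shape of an S3 event notification body that refresh.py expects.
# The values below are illustrative only.
import json

sample_body = json.dumps({
    "Records": [{
        "eventTime": "2024-12-11T00:00:00.000Z",
        "s3": {
            "object": {
                "key": "db1/stage1/example.txt",
                "size": 1234,
                "eTag": "0123456789abcdef0123456789abcdef",
            }
        }
    }]
})

# Same extraction refresh.py performs on each message body
record = json.loads(sample_body)['Records'][0]
obj = record['s3']['object']
print(obj['key'], obj['size'], record['eventTime'], obj['eTag'])
```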
