Skip to content

Commit

Permalink
update celery task with batch process task - Adithya S K
Browse files Browse the repository at this point in the history
  • Loading branch information
adithya-s-k committed Oct 15, 2024
1 parent a61d081 commit 11939a6
Showing 1 changed file with 48 additions and 4 deletions.
52 changes: 48 additions & 4 deletions marker_api/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,29 @@
import io
import logging
from marker_api.utils import process_image_to_base64
from celery.signals import worker_process_init

logger = logging.getLogger(__name__)

model_list = None


@worker_process_init.connect
def initialize_models(**kwargs):
global model_list
if not model_list:
model_list = load_all_models()
print("Models loaded at worker startup")


class PDFConversionTask(Task):
abstract = True

def __init__(self):
super().__init__()
self.model_list = None

def __call__(self, *args, **kwargs):
if not self.model_list:
self.model_list = load_all_models()
# Use the global model_list initialized at worker startup
return self.run(*args, **kwargs)


Expand All @@ -27,7 +36,7 @@ def __call__(self, *args, **kwargs):
)
def convert_pdf_to_markdown(self, filename, pdf_content):
pdf_file = io.BytesIO(pdf_content)
markdown_text, images, metadata = convert_single_pdf(pdf_file, self.model_list)
markdown_text, images, metadata = convert_single_pdf(pdf_file, model_list)
image_data = {}
for i, (img_filename, image) in enumerate(images.items()):
logger.debug(f"Processing image {img_filename}")
Expand All @@ -41,3 +50,38 @@ def convert_pdf_to_markdown(self, filename, pdf_content):
"images": image_data,
"status": "ok",
}


# @celery_app.task(
# ignore_result=False, bind=True, base=PDFConversionTask, name="process_batch"
# )
# def process_batch(self, batch_data):
# results = []
# for filename, pdf_content in batch_data:
# try:
# result = convert_pdf_to_markdown(filename, pdf_content)
# results.append(result)
# except Exception as e:
# logger.error(f"Error processing {filename}: {str(e)}")
# results.append({"filename": filename, "status": "Error", "error": str(e)})
# return results


@celery_app.task(
ignore_result=False, bind=True, base=PDFConversionTask, name="process_batch"
)
def process_batch(self, batch_data):
results = []
total = len(batch_data)
for i, (filename, pdf_content) in enumerate(batch_data, start=1):
try:
result = convert_pdf_to_markdown(filename, pdf_content)
results.append(result)
except Exception as e:
logger.error(f"Error processing {filename}: {str(e)}")
results.append({"filename": filename, "status": "Error", "error": str(e)})

# Update progress
self.update_state(state="PROGRESS", meta={"current": i, "total": total})

return results

0 comments on commit 11939a6

Please sign in to comment.