From f5dd3bfc6c8a825da3525f4f122056ce67ef7332 Mon Sep 17 00:00:00 2001
From: handsomezhuzhu <2658601135@qq.com>
Date: Fri, 12 Dec 2025 23:59:28 +0800
Subject: [PATCH] Split long text into chunks; frontend progress feedback not working yet
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
Dockerfile | 7 +-
backend/routers/exam.py | 180 +++++++++++++++++++-
backend/services/auth_service.py | 39 +++++
backend/services/llm_service.py | 54 +++++-
backend/services/progress_service.py | 149 ++++++++++++++++
frontend/src/components/ParsingProgress.jsx | 121 +++++++++++++
frontend/src/pages/ExamDetail.jsx | 98 ++++++++---
7 files changed, 605 insertions(+), 43 deletions(-)
create mode 100644 backend/services/progress_service.py
create mode 100644 frontend/src/components/ParsingProgress.jsx
diff --git a/Dockerfile b/Dockerfile
index 264a2ad..55801ed 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -21,15 +21,10 @@ FROM python:3.11-slim
WORKDIR /app
-# 安装系统依赖
-RUN apt-get update && apt-get install -y \
- build-essential \
- && rm -rf /var/lib/apt/lists/*
-
# 复制后端依赖文件
COPY backend/requirements.txt ./
-# 安装 Python 依赖
+# 安装 Python 依赖(使用预编译wheel包,无需gcc)
RUN pip install --no-cache-dir -r requirements.txt
# 复制后端代码
diff --git a/backend/routers/exam.py b/backend/routers/exam.py
index 47c8c9e..674cce1 100644
--- a/backend/routers/exam.py
+++ b/backend/routers/exam.py
@@ -2,12 +2,14 @@
Exam Router - Handles exam creation, file upload, and deduplication
"""
from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File, Form, BackgroundTasks
+from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_
from typing import List, Optional
from datetime import datetime, timedelta
import os
import aiofiles
+import json
from database import get_db
from models import User, Exam, Question, ExamStatus, SystemConfig
@@ -19,6 +21,7 @@ from services.auth_service import get_current_user
from services.document_parser import document_parser
from services.llm_service import LLMService
from services.config_service import load_llm_config
+from services.progress_service import progress_service
from utils import is_allowed_file, calculate_content_hash
from dedup_utils import is_duplicate_question
@@ -264,9 +267,11 @@ async def async_parse_and_save(
):
"""
Background task to parse document and save questions with deduplication.
+ Sends real-time progress updates via SSE.
"""
from database import AsyncSessionLocal
from sqlalchemy import select
+ from services.progress_service import ProgressUpdate, ProgressStatus
async with AsyncSessionLocal() as db:
try:
@@ -276,6 +281,14 @@ async def async_parse_and_save(
exam.status = ExamStatus.PROCESSING
await db.commit()
+ # Send initial progress
+ await progress_service.update_progress(ProgressUpdate(
+ exam_id=exam_id,
+ status=ProgressStatus.PARSING,
+ message="开始解析文档...",
+ progress=5.0
+ ))
+
# Load LLM configuration from database
llm_config = await load_llm_config(db)
llm_service = LLMService(config=llm_config)
@@ -293,12 +306,27 @@ async def async_parse_and_save(
# Use Gemini's native PDF processing
print(f"[Exam {exam_id}] Using Gemini native PDF processing", flush=True)
print(f"[Exam {exam_id}] PDF file size: {len(file_content)} bytes", flush=True)
- questions_data = await llm_service.parse_document_with_pdf(file_content, filename)
+
+ await progress_service.update_progress(ProgressUpdate(
+ exam_id=exam_id,
+ status=ProgressStatus.PARSING,
+ message="使用Gemini解析PDF文档...",
+ progress=10.0
+ ))
+
+ questions_data = await llm_service.parse_document_with_pdf(file_content, filename, exam_id)
else:
# Extract text first, then parse
if is_pdf:
print(f"[Exam {exam_id}] ⚠️ Warning: Using text extraction for PDF (provider does not support native PDF)", flush=True)
+ await progress_service.update_progress(ProgressUpdate(
+ exam_id=exam_id,
+ status=ProgressStatus.PARSING,
+ message="提取文档文本内容...",
+ progress=10.0
+ ))
+
print(f"[Exam {exam_id}] Extracting text from document...", flush=True)
text_content = await document_parser.parse_file(file_content, filename)
@@ -309,17 +337,40 @@ async def async_parse_and_save(
# Check if document is too long and needs splitting
if len(text_content) > 5000:
- print(f"[Exam {exam_id}] Document is long, splitting into chunks...", flush=True)
text_chunks = document_parser.split_text_with_overlap(text_content, chunk_size=3000, overlap=1000)
- print(f"[Exam {exam_id}] Split into {len(text_chunks)} chunks", flush=True)
+ total_chunks = len(text_chunks)
+
+ print(f"[Exam {exam_id}] Document is long, splitting into chunks...", flush=True)
+ print(f"[Exam {exam_id}] Split into {total_chunks} chunks", flush=True)
+
+ await progress_service.update_progress(ProgressUpdate(
+ exam_id=exam_id,
+ status=ProgressStatus.SPLITTING,
+ message=f"文档已拆分为 {total_chunks} 个部分",
+ progress=15.0,
+ total_chunks=total_chunks
+ ))
all_questions = []
for chunk_idx, chunk in enumerate(text_chunks):
- print(f"[Exam {exam_id}] Processing chunk {chunk_idx + 1}/{len(text_chunks)}...", flush=True)
+ current_chunk = chunk_idx + 1
+ chunk_progress = 15.0 + (60.0 * current_chunk / total_chunks)
+
+ await progress_service.update_progress(ProgressUpdate(
+ exam_id=exam_id,
+ status=ProgressStatus.PROCESSING_CHUNK,
+ message=f"正在处理第 {current_chunk}/{total_chunks} 部分...",
+ progress=chunk_progress,
+ total_chunks=total_chunks,
+ current_chunk=current_chunk,
+ questions_extracted=len(all_questions)
+ ))
+
+ print(f"[Exam {exam_id}] Processing chunk {current_chunk}/{total_chunks}...", flush=True)
try:
chunk_questions = await llm_service.parse_document(chunk)
- print(f"[Exam {exam_id}] Chunk {chunk_idx + 1} extracted {len(chunk_questions)} questions", flush=True)
+ print(f"[Exam {exam_id}] Chunk {current_chunk} extracted {len(chunk_questions)} questions", flush=True)
# Fuzzy deduplicate across chunks
for q in chunk_questions:
@@ -327,7 +378,7 @@ async def async_parse_and_save(
if not is_duplicate_question(q, all_questions, threshold=0.85):
all_questions.append(q)
else:
- print(f"[Exam {exam_id}] Skipped fuzzy duplicate from chunk {chunk_idx + 1}", flush=True)
+ print(f"[Exam {exam_id}] Skipped fuzzy duplicate from chunk {current_chunk}", flush=True)
except Exception as chunk_error:
print(f"[Exam {exam_id}] Chunk {chunk_idx + 1} failed: {str(chunk_error)}", flush=True)
@@ -335,11 +386,37 @@ async def async_parse_and_save(
questions_data = all_questions
print(f"[Exam {exam_id}] Total questions after fuzzy deduplication: {len(questions_data)}", flush=True)
+
+ await progress_service.update_progress(ProgressUpdate(
+ exam_id=exam_id,
+ status=ProgressStatus.DEDUPLICATING,
+ message=f"所有部分处理完成,提取了 {len(questions_data)} 个题目",
+ progress=75.0,
+ total_chunks=total_chunks,
+ current_chunk=total_chunks,
+ questions_extracted=len(questions_data)
+ ))
else:
print(f"[Exam {exam_id}] Document content preview:\n{text_content[:500]}\n{'...' if len(text_content) > 500 else ''}", flush=True)
print(f"[Exam {exam_id}] Calling LLM to extract questions...", flush=True)
+
+ await progress_service.update_progress(ProgressUpdate(
+ exam_id=exam_id,
+ status=ProgressStatus.PARSING,
+ message="正在提取题目...",
+ progress=30.0
+ ))
+
questions_data = await llm_service.parse_document(text_content)
+ await progress_service.update_progress(ProgressUpdate(
+ exam_id=exam_id,
+ status=ProgressStatus.DEDUPLICATING,
+ message=f"提取了 {len(questions_data)} 个题目",
+ progress=60.0,
+ questions_extracted=len(questions_data)
+ ))
+
except Exception as parse_error:
print(f"[Exam {exam_id}] ⚠️ Parse error details: {type(parse_error).__name__}", flush=True)
print(f"[Exam {exam_id}] ⚠️ Parse error message: {str(parse_error)}", flush=True)
@@ -351,6 +428,14 @@ async def async_parse_and_save(
raise Exception("No questions found in document")
# Process questions with deduplication and AI answer generation
+ await progress_service.update_progress(ProgressUpdate(
+ exam_id=exam_id,
+ status=ProgressStatus.SAVING,
+ message="正在去重并保存题目到数据库...",
+ progress=80.0,
+ questions_extracted=len(questions_data)
+ ))
+
print(f"[Exam {exam_id}] Processing questions with deduplication...")
parse_result = await process_questions_with_dedup(exam_id, questions_data, db, llm_service)
@@ -370,9 +455,28 @@ async def async_parse_and_save(
print(f"[Exam {exam_id}] ✅ {parse_result.message}")
+ # Send completion progress
+ await progress_service.update_progress(ProgressUpdate(
+ exam_id=exam_id,
+ status=ProgressStatus.COMPLETED,
+ message=f"完成!添加了 {parse_result.new_added} 个题目(去重 {parse_result.duplicates_removed} 个)",
+ progress=100.0,
+ questions_extracted=parse_result.total_parsed,
+ questions_added=parse_result.new_added,
+ duplicates_removed=parse_result.duplicates_removed
+ ))
+
except Exception as e:
print(f"[Exam {exam_id}] ❌ Error: {str(e)}")
+ # Send error progress
+ await progress_service.update_progress(ProgressUpdate(
+ exam_id=exam_id,
+ status=ProgressStatus.FAILED,
+ message=f"处理失败:{str(e)}",
+ progress=0.0
+ ))
+
# Update exam status to failed
result = await db.execute(select(Exam).where(Exam.id == exam_id))
exam = result.scalar_one()
@@ -549,6 +653,70 @@ async def get_exam_detail(
return exam
+@router.get("/{exam_id}/progress")
+async def get_exam_progress(
+ exam_id: int,
+ token: Optional[str] = None,
+ db: AsyncSession = Depends(get_db)
+):
+ """
+ Get real-time progress updates for exam document parsing (SSE endpoint)
+
+ Returns Server-Sent Events stream with progress updates
+ """
+ # Authenticate using token from query parameter (EventSource doesn't support custom headers)
+ from services.auth_service import get_current_user_from_token
+
+ if not token:
+ raise HTTPException(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ detail="Token required"
+ )
+
+ try:
+ current_user = await get_current_user_from_token(token, db)
+ except Exception as e:
+ raise HTTPException(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ detail="Invalid token"
+ )
+
+ # Verify exam belongs to user
+ result = await db.execute(
+ select(Exam).where(
+ and_(Exam.id == exam_id, Exam.user_id == current_user.id)
+ )
+ )
+ exam = result.scalar_one_or_none()
+
+ if not exam:
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND,
+ detail="Exam not found"
+ )
+
+ async def event_generator():
+ """Generate SSE events"""
+ async for update in progress_service.subscribe(exam_id):
+ # Format as SSE
+ data = json.dumps(update.to_dict())
+ yield f"data: {data}\n\n"
+
+ # Stop if completed or failed
+ if update.status in ["completed", "failed"]:
+ break
+
+ return StreamingResponse(
+ event_generator(),
+ media_type="text/event-stream",
+ headers={
+ "Cache-Control": "no-cache",
+ "Connection": "keep-alive",
+ "X-Accel-Buffering": "no" # Disable nginx buffering
+ }
+ )
+
+
@router.delete("/{exam_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_exam(
exam_id: int,
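
Note: the new /{exam_id}/progress route emits plain Server-Sent Events, so any streaming
HTTP client can follow it, not only the browser EventSource API. A minimal consumer sketch
in Python; the /api/exams prefix, BASE_URL and TOKEN are placeholder assumptions, not
values taken from this repository:

    import json
    import httpx

    BASE_URL = "http://localhost:8000/api/exams"  # assumed router mount point
    TOKEN = "<jwt-token>"  # sent as a query parameter because EventSource cannot set headers

    def follow_progress(exam_id: int) -> None:
        url = f"{BASE_URL}/{exam_id}/progress"
        # Stream the response so SSE frames are handled as they arrive
        with httpx.stream("GET", url, params={"token": TOKEN}, timeout=None) as response:
            for line in response.iter_lines():
                if not line.startswith("data: "):
                    continue  # skip the blank separator lines between events
                update = json.loads(line[len("data: "):])
                print(f"{update['progress']:5.1f}%  {update['status']:<16} {update['message']}")
                if update["status"] in ("completed", "failed"):
                    break

    follow_progress(1)
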
diff --git a/backend/services/auth_service.py b/backend/services/auth_service.py
index 35dd6f4..58eda3d 100644
--- a/backend/services/auth_service.py
+++ b/backend/services/auth_service.py
@@ -82,3 +82,42 @@ async def get_optional_user(
return await get_current_user(credentials, db)
except HTTPException:
return None
+
+
+async def get_current_user_from_token(token: str, db: AsyncSession) -> User:
+ """
+ Get current user from JWT token string (for SSE with query params).
+
+ Args:
+ token: JWT token string
+ db: Database session
+
+ Returns:
+ User object
+
+ Raises:
+ Exception: If token is invalid or user not found
+ """
+ # Decode token
+ payload = decode_access_token(token)
+ if payload is None:
+ raise Exception("Invalid token")
+
+ user_id = payload.get("sub")
+ if user_id is None:
+ raise Exception("Invalid token payload")
+
+ # Convert user_id to int if it's a string
+ try:
+ user_id = int(user_id)
+ except (ValueError, TypeError):
+ raise Exception("Invalid user ID")
+
+ # Get user from database
+ result = await db.execute(select(User).where(User.id == user_id))
+ user = result.scalar_one_or_none()
+
+ if user is None:
+ raise Exception("User not found")
+
+ return user
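
Note: decode_access_token itself is not part of this patch; the helper above only assumes it
returns the JWT payload as a dict on success and None on failure. For reference, a sketch of a
decoder with that contract using python-jose; SECRET_KEY and ALGORITHM are placeholders, not
the project's real settings:

    from typing import Optional
    from jose import JWTError, jwt

    SECRET_KEY = "change-me"  # placeholder; the real key should come from app settings
    ALGORITHM = "HS256"       # placeholder signing algorithm

    def decode_access_token(token: str) -> Optional[dict]:
        """Return the JWT payload if the token verifies, otherwise None."""
        try:
            return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        except JWTError:
            return None
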
diff --git a/backend/services/llm_service.py b/backend/services/llm_service.py
index 7d7a88e..5554e3d 100644
--- a/backend/services/llm_service.py
+++ b/backend/services/llm_service.py
@@ -453,7 +453,7 @@ class LLMService:
return chunks
- async def parse_document_with_pdf(self, pdf_bytes: bytes, filename: str) -> List[Dict[str, Any]]:
+ async def parse_document_with_pdf(self, pdf_bytes: bytes, filename: str, exam_id: int = None) -> List[Dict[str, Any]]:
"""
Parse PDF document using Gemini's native PDF understanding.
Automatically splits large PDFs into overlapping chunks.
@@ -462,6 +462,7 @@ class LLMService:
Args:
pdf_bytes: PDF file content as bytes
filename: Original filename for logging
+ exam_id: Optional exam ID for progress updates
Returns:
List of question dictionaries
@@ -471,17 +472,44 @@ class LLMService:
# Split PDF into chunks
pdf_chunks = self.split_pdf_pages(pdf_bytes, pages_per_chunk=4, overlap=1)
+ total_chunks = len(pdf_chunks)
- print(f"[Gemini PDF] Processing {len(pdf_chunks)} chunk(s) for {filename}")
+ print(f"[Gemini PDF] Processing {total_chunks} chunk(s) for {filename}")
+
+ # Send progress update if exam_id provided
+ if exam_id:
+ from services.progress_service import progress_service, ProgressUpdate, ProgressStatus
+ await progress_service.update_progress(ProgressUpdate(
+ exam_id=exam_id,
+ status=ProgressStatus.SPLITTING,
+ message=f"PDF已拆分为 {total_chunks} 个部分",
+ progress=15.0,
+ total_chunks=total_chunks
+ ))
all_questions = []
# Process each chunk with fuzzy deduplication
for chunk_idx, chunk_bytes in enumerate(pdf_chunks):
- print(f"[Gemini PDF] Processing chunk {chunk_idx + 1}/{len(pdf_chunks)}")
+ current_chunk = chunk_idx + 1
+ chunk_progress = 15.0 + (60.0 * current_chunk / total_chunks)
+
+ print(f"[Gemini PDF] Processing chunk {current_chunk}/{total_chunks}")
+
+ # Send progress update
+ if exam_id:
+ await progress_service.update_progress(ProgressUpdate(
+ exam_id=exam_id,
+ status=ProgressStatus.PROCESSING_CHUNK,
+ message=f"正在处理第 {current_chunk}/{total_chunks} 部分...",
+ progress=chunk_progress,
+ total_chunks=total_chunks,
+ current_chunk=current_chunk,
+ questions_extracted=len(all_questions)
+ ))
try:
- questions = await self._parse_pdf_chunk(chunk_bytes, f"{filename}_chunk_{chunk_idx + 1}")
- print(f"[Gemini PDF] Chunk {chunk_idx + 1} extracted {len(questions)} questions")
+ questions = await self._parse_pdf_chunk(chunk_bytes, f"{filename}_chunk_{current_chunk}")
+ print(f"[Gemini PDF] Chunk {current_chunk} extracted {len(questions)} questions")
# Fuzzy deduplicate across chunks
from dedup_utils import is_duplicate_question
@@ -490,15 +518,27 @@ class LLMService:
if not is_duplicate_question(q, all_questions, threshold=0.85):
all_questions.append(q)
else:
- print(f"[PDF Split] Skipped fuzzy duplicate from chunk {chunk_idx + 1}")
+ print(f"[PDF Split] Skipped fuzzy duplicate from chunk {current_chunk}")
except Exception as e:
- print(f"[Gemini PDF] Chunk {chunk_idx + 1} failed: {str(e)}")
+ print(f"[Gemini PDF] Chunk {current_chunk} failed: {str(e)}")
# Continue with other chunks
continue
print(f"[Gemini PDF] Total questions extracted: {len(all_questions)} (after deduplication)")
+ # Send final progress for PDF processing
+ if exam_id:
+ await progress_service.update_progress(ProgressUpdate(
+ exam_id=exam_id,
+ status=ProgressStatus.DEDUPLICATING,
+ message=f"PDF处理完成,提取了 {len(all_questions)} 个题目",
+ progress=75.0,
+ total_chunks=total_chunks,
+ current_chunk=total_chunks,
+ questions_extracted=len(all_questions)
+ ))
+
return all_questions
async def _parse_pdf_chunk(self, pdf_bytes: bytes, chunk_name: str) -> List[Dict[str, Any]]:
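
Note: both the text path in exam.py and the PDF path above use the same linear mapping for
per-chunk progress, reserving the 15-75% band of the overall bar for chunk processing. A
standalone sketch of the formula and the values it yields for a four-chunk document:

    def chunk_progress(current_chunk: int, total_chunks: int) -> float:
        """Map chunk completion onto the 15%..75% band of the overall progress bar."""
        return 15.0 + (60.0 * current_chunk / total_chunks)

    # A document split into 4 chunks advances 30 -> 45 -> 60 -> 75 percent:
    assert [chunk_progress(i, 4) for i in (1, 2, 3, 4)] == [30.0, 45.0, 60.0, 75.0]
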
diff --git a/backend/services/progress_service.py b/backend/services/progress_service.py
new file mode 100644
index 0000000..4861a05
--- /dev/null
+++ b/backend/services/progress_service.py
@@ -0,0 +1,149 @@
+"""
+Progress Service - Manages document parsing progress for real-time updates
+"""
+import asyncio
+from typing import Dict, Optional, AsyncGenerator
+from datetime import datetime
+from enum import Enum
+
+
+class ProgressStatus(str, Enum):
+ """Progress status types"""
+ PENDING = "pending"
+ PARSING = "parsing"
+ SPLITTING = "splitting"
+ PROCESSING_CHUNK = "processing_chunk"
+ DEDUPLICATING = "deduplicating"
+ SAVING = "saving"
+ COMPLETED = "completed"
+ FAILED = "failed"
+
+
+class ProgressUpdate:
+ """Progress update data structure"""
+ def __init__(
+ self,
+ exam_id: int,
+ status: ProgressStatus,
+ message: str,
+ progress: float = 0.0,
+ total_chunks: int = 0,
+ current_chunk: int = 0,
+ questions_extracted: int = 0,
+ questions_added: int = 0,
+ duplicates_removed: int = 0
+ ):
+ self.exam_id = exam_id
+ self.status = status
+ self.message = message
+ self.progress = progress # 0-100
+ self.total_chunks = total_chunks
+ self.current_chunk = current_chunk
+ self.questions_extracted = questions_extracted
+ self.questions_added = questions_added
+ self.duplicates_removed = duplicates_removed
+ self.timestamp = datetime.now().isoformat()
+
+ def to_dict(self) -> dict:
+ """Convert to dictionary for JSON serialization"""
+ return {
+ "exam_id": self.exam_id,
+ "status": self.status.value,
+ "message": self.message,
+ "progress": round(self.progress, 1),
+ "total_chunks": self.total_chunks,
+ "current_chunk": self.current_chunk,
+ "questions_extracted": self.questions_extracted,
+ "questions_added": self.questions_added,
+ "duplicates_removed": self.duplicates_removed,
+ "timestamp": self.timestamp
+ }
+
+
+class ProgressService:
+ """Service for managing parsing progress"""
+
+ def __init__(self):
+ # Store progress updates for each exam
+ self._progress: Dict[int, ProgressUpdate] = {}
+ # Store queues for SSE connections
+ self._queues: Dict[int, list] = {}
+
+ async def update_progress(self, update: ProgressUpdate):
+ """
+ Update progress for an exam and notify all listeners
+
+ Args:
+ update: Progress update object
+ """
+ exam_id = update.exam_id
+ self._progress[exam_id] = update
+
+ # Send to all connected SSE clients for this exam
+ if exam_id in self._queues:
+ dead_queues = []
+ for queue in self._queues[exam_id]:
+ try:
+ await queue.put(update)
+ except Exception as e:
+ print(f"[Progress] Failed to send update to queue: {e}")
+ dead_queues.append(queue)
+
+ # Clean up dead queues
+ for dead_queue in dead_queues:
+ self._queues[exam_id].remove(dead_queue)
+
+ def get_progress(self, exam_id: int) -> Optional[ProgressUpdate]:
+ """Get current progress for an exam"""
+ return self._progress.get(exam_id)
+
+ async def subscribe(self, exam_id: int) -> AsyncGenerator[ProgressUpdate, None]:
+ """
+ Subscribe to progress updates for an exam (SSE stream)
+
+ Args:
+ exam_id: Exam ID to subscribe to
+
+ Yields:
+ Progress updates as they occur
+ """
+ # Create a queue for this connection
+ queue = asyncio.Queue()
+
+ # Register the queue
+ if exam_id not in self._queues:
+ self._queues[exam_id] = []
+ self._queues[exam_id].append(queue)
+
+ try:
+ # Send current progress if exists
+ current_progress = self.get_progress(exam_id)
+ if current_progress:
+ yield current_progress
+
+ # Stream updates
+ while True:
+ update = await queue.get()
+ yield update
+
+ # Stop streaming if completed or failed
+ if update.status in [ProgressStatus.COMPLETED, ProgressStatus.FAILED]:
+ break
+
+ finally:
+ # Cleanup
+ if exam_id in self._queues and queue in self._queues[exam_id]:
+ self._queues[exam_id].remove(queue)
+ if not self._queues[exam_id]:
+ del self._queues[exam_id]
+
+ def clear_progress(self, exam_id: int):
+ """Clear progress data for an exam"""
+ if exam_id in self._progress:
+ del self._progress[exam_id]
+ if exam_id in self._queues:
+ del self._queues[exam_id]
+
+
+# Singleton instance
+progress_service = ProgressService()
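
Note: a minimal end-to-end sketch of how the background task and the SSE endpoint use this
service; run it from the backend/ directory so the services package resolves, and exam id 1
is an arbitrary example value:

    import asyncio

    from services.progress_service import ProgressStatus, ProgressUpdate, progress_service

    async def demo() -> None:
        async def publisher() -> None:
            # Plays the role of async_parse_and_save emitting updates
            await asyncio.sleep(0.1)
            await progress_service.update_progress(ProgressUpdate(
                exam_id=1, status=ProgressStatus.PARSING, message="parsing", progress=10.0))
            await progress_service.update_progress(ProgressUpdate(
                exam_id=1, status=ProgressStatus.COMPLETED, message="done", progress=100.0))

        async def subscriber() -> None:
            # Plays the role of the SSE endpoint's event_generator
            async for update in progress_service.subscribe(1):
                print(update.to_dict())

        await asyncio.gather(subscriber(), publisher())

    asyncio.run(demo())
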
diff --git a/frontend/src/components/ParsingProgress.jsx b/frontend/src/components/ParsingProgress.jsx
new file mode 100644
index 0000000..8c30020
--- /dev/null
+++ b/frontend/src/components/ParsingProgress.jsx
@@ -0,0 +1,121 @@
+/**
+ * Parsing Progress Component
+ * Displays real-time progress for document parsing
+ */
+import React from 'react'
+import { Loader, CheckCircle, XCircle, FileText, Layers } from 'lucide-react'
+
+export const ParsingProgress = ({ progress }) => {
+  if (!progress) return null
+
+  const { status, message, progress: percentage, total_chunks, current_chunk, questions_extracted, questions_added, duplicates_removed } = progress
+
+  const getStatusIcon = () => {
+    switch (status) {
+      case 'completed':
+        return <CheckCircle className="w-5 h-5 text-green-500" />
+      case 'failed':
+        return <XCircle className="w-5 h-5 text-red-500" />
+      case 'splitting':
+      case 'processing_chunk':
+        return <Layers className="w-5 h-5 text-blue-500" />
+      default:
+        return <Loader className="w-5 h-5 animate-spin text-blue-500" />
+    }
+  }
+
+  return (
+    <div className="bg-white rounded-lg border p-4 space-y-3">
+      <div className="flex items-center gap-2">{getStatusIcon()}<span className="text-sm font-medium">{message}</span></div>
+
+      {/* Progress Bar */}
+      {status !== 'completed' && status !== 'failed' && (
+        <div className="w-full bg-gray-200 rounded-full h-2">
+          <div className="bg-blue-500 h-2 rounded-full transition-all" style={{ width: `${percentage}%` }} />
+        </div>
+      )}
+
+      {/* Chunk and question counters */}
+      {total_chunks > 0 && (
+        <div className="grid grid-cols-4 gap-2 text-center text-xs">
+          <div><p className="font-semibold">{current_chunk}/{total_chunks}</p><p className="text-gray-500">部分</p></div>
+          <div><p className="font-semibold">{questions_extracted}</p><p className="text-gray-500">题目</p></div>
+          <div><p className="font-semibold">{questions_added}</p><p className="text-gray-500">题目</p></div>
+          <div><p className="font-semibold">{duplicates_removed}</p><p className="text-gray-500">题目</p></div>
+        </div>
+      )}
+    </div>
+  )
+}