96 lines
3.3 KiB
Python
96 lines
3.3 KiB
Python
from uuid import UUID
|
|
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from src.models import Category, Difficulty, Pattern, Question
|
|
from src.models.question import QuestionPattern
|
|
|
|
|
|
class QuestionService:
    """Read-only queries for ``Question`` rows, bound to one ``AsyncSession``.

    Every lookup eager-loads the relationships it returns with ``selectinload``.
    """

    # Escape character for LIKE patterns built from caller-supplied text.
    _LIKE_ESCAPE = "/"

    def __init__(self, db: AsyncSession) -> None:
        """Keep *db* as the session every query of this service executes on."""
        self.db = db

    @staticmethod
    def _escape_like(term: str) -> str:
        """Return *term* with the LIKE wildcards ``%`` and ``_`` made literal.

        The result is meant to be embedded in a pattern that is compiled with
        ``escape=QuestionService._LIKE_ESCAPE``.
        """
        esc = QuestionService._LIKE_ESCAPE
        # Double the escape character first; doing it last would re-escape the
        # escapes just inserted in front of ``%`` and ``_``.
        return term.replace(esc, esc * 2).replace("%", f"{esc}%").replace("_", f"{esc}_")

    @staticmethod
    def _page_window(page: int, limit: int) -> tuple[int, int]:
        """Translate a 1-based *page* and page size into ``(offset, limit)``.

        A page below 1 is treated as page 1 and a negative limit as 0, so the
        values handed to ``OFFSET`` / ``LIMIT`` are never negative.
        """
        safe_limit = max(limit, 0)
        safe_page = max(page, 1)
        return (safe_page - 1) * safe_limit, safe_limit

    @staticmethod
    def _summary_loaders() -> tuple:
        """Eager-load options shared by the list query and the detail queries."""
        return (
            selectinload(Question.categories),
            selectinload(Question.patterns),
            selectinload(Question.question_patterns).selectinload(QuestionPattern.pattern),
        )

    @staticmethod
    def _detail_loaders() -> tuple:
        """Summary options plus the explanation and solutions relationships."""
        return (
            *QuestionService._summary_loaders(),
            selectinload(Question.explanation),
            selectinload(Question.solutions),
        )

    async def get_questions(
        self,
        *,
        page: int = 1,
        limit: int = 20,
        difficulties: list[Difficulty] | None = None,
        category_slug: str | None = None,
        pattern_slug: str | None = None,
        search: str | None = None,
    ) -> tuple[list[Question], int]:
        """Return one page of questions, newest first, plus the filtered total.

        Filters are combined with AND; a filter whose argument is falsy
        (``None``, empty list, empty string) is not applied.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            limit: Maximum number of questions on the page; negative values
                are treated as 0.
            difficulties: Keep questions whose difficulty is one of these.
            category_slug: Keep questions joined to a category with this slug.
            pattern_slug: Keep questions joined to a pattern with this slug.
            search: Case-insensitive substring matched against the title.
                ``%`` and ``_`` in the text are matched literally.

        Returns:
            ``(questions, total)`` where ``total`` counts every question that
            matches the filters, not only the ones on this page.
        """
        # Collect joins and criteria once so the page query and the count
        # query can never drift apart.
        joins = []
        criteria = []

        if difficulties:
            criteria.append(Question.difficulty.in_(difficulties))

        if category_slug:
            joins.append(Question.categories)
            criteria.append(Category.slug == category_slug)

        if pattern_slug:
            joins.append(Question.patterns)
            criteria.append(Pattern.slug == pattern_slug)

        if search:
            pattern = f"%{self._escape_like(search)}%"
            criteria.append(Question.title.ilike(pattern, escape=self._LIKE_ESCAPE))

        query = (
            select(Question)
            .options(*self._summary_loaders())
            # ``id`` breaks ties between equal ``created_at`` values; without
            # it the order of tied rows may differ from one page to the next.
            .order_by(Question.created_at.desc(), Question.id.desc())
        )
        # DISTINCT keeps the total in units of questions even if a join
        # matches the same question more than once, consistent with the
        # ``unique()`` applied to the result rows below.
        count_query = select(func.count(Question.id.distinct()))

        for target in joins:
            query = query.join(target)
            count_query = count_query.join(target)

        for criterion in criteria:
            query = query.where(criterion)
            count_query = count_query.where(criterion)

        total_result = await self.db.execute(count_query)
        total = total_result.scalar() or 0

        offset, page_size = self._page_window(page, limit)
        query = query.offset(offset).limit(page_size)

        result = await self.db.execute(query)
        questions = list(result.scalars().unique().all())

        return questions, total

    async def get_question_by_slug(self, slug: str) -> Question | None:
        """Return the question whose ``slug`` equals *slug*, or ``None``.

        The detail relationships (categories, patterns, question patterns,
        explanation, solutions) are eager-loaded. ``scalar_one_or_none``
        raises if more than one row matches.
        """
        query = (
            select(Question)
            .options(*self._detail_loaders())
            .where(Question.slug == slug)
        )
        result = await self.db.execute(query)
        return result.scalar_one_or_none()

    async def get_question_by_id(self, question_id: UUID) -> Question | None:
        """Return the question whose ``id`` equals *question_id*, or ``None``.

        The detail relationships (categories, patterns, question patterns,
        explanation, solutions) are eager-loaded. ``scalar_one_or_none``
        raises if more than one row matches.
        """
        query = (
            select(Question)
            .options(*self._detail_loaders())
            .where(Question.id == question_id)
        )
        result = await self.db.execute(query)
        return result.scalar_one_or_none()
|