feat(backend): add data loading scripts

This commit is contained in:
2025-04-26 17:39:10 +01:00
parent e6c971f371
commit 1070b1e1fb
2 changed files with 446 additions and 0 deletions

View File

@@ -0,0 +1,219 @@
#!/usr/bin/env python
"""Load YAML content data into the database."""
import asyncio
import sys
from pathlib import Path
from typing import Any
import yaml
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.db.database import async_session_factory
from src.models import Category, Difficulty, Explanation, Pattern, Question, Solution
async def load_categories(session: AsyncSession, data_dir: Path) -> dict[str, Category]:
    """Upsert categories from ``categories/categories.yaml`` into the session.

    Each entry under the top-level ``categories`` key is matched to an existing
    row by ``slug``. A match has its ``name`` and ``description`` overwritten;
    otherwise a new ``Category`` is added to the session. Changes are flushed
    but not committed, so committing is left to the caller.

    Args:
        session: Async session used for the lookups and the pending writes.
        data_dir: Root content directory that contains ``categories/``.

    Returns:
        Mapping of slug to the ``Category`` instance attached to ``session``.
        Empty when the YAML file is missing or lists no categories.

    Raises:
        KeyError: If an entry lacks ``name`` or ``slug``.
    """
    categories_file = data_dir / "categories" / "categories.yaml"
    if not categories_file.exists():
        print(f"Warning: {categories_file} not found")
        return {}

    # Decode explicitly as UTF-8 instead of the platform default encoding, so
    # non-ASCII content reads the same on every machine.
    with open(categories_file, encoding="utf-8") as f:
        # safe_load returns None for an empty document; treat that as "no data".
        data = yaml.safe_load(f) or {}

    categories: dict[str, Category] = {}
    # ``or []`` also covers a ``categories:`` key that is present but null.
    for item in data.get("categories") or []:
        slug = item["slug"]
        result = await session.execute(select(Category).where(Category.slug == slug))
        existing = result.scalar_one_or_none()
        if existing is not None:
            existing.name = item["name"]
            existing.description = item.get("description")
            categories[slug] = existing
        else:
            category = Category(
                name=item["name"],
                slug=slug,
                description=item.get("description"),
            )
            session.add(category)
            categories[slug] = category

    # Push pending inserts/updates to the database without committing.
    await session.flush()
    print(f"Loaded {len(categories)} categories")
    return categories
async def load_patterns(session: AsyncSession, data_dir: Path) -> dict[str, Pattern]:
    """Upsert patterns from ``patterns/patterns.yaml`` into the session.

    Each entry under the top-level ``patterns`` key is matched to an existing
    row by ``slug``. A match has its ``name``, ``description`` and
    ``when_to_use`` overwritten; otherwise a new ``Pattern`` is added to the
    session. Changes are flushed but not committed.

    Args:
        session: Async session used for the lookups and the pending writes.
        data_dir: Root content directory that contains ``patterns/``.

    Returns:
        Mapping of slug to the ``Pattern`` instance attached to ``session``.
        Empty when the YAML file is missing or lists no patterns.

    Raises:
        KeyError: If an entry lacks ``name`` or ``slug``.
    """
    patterns_file = data_dir / "patterns" / "patterns.yaml"
    if not patterns_file.exists():
        print(f"Warning: {patterns_file} not found")
        return {}

    # Decode explicitly as UTF-8 instead of the platform default encoding, so
    # non-ASCII content reads the same on every machine.
    with open(patterns_file, encoding="utf-8") as f:
        # safe_load returns None for an empty document; treat that as "no data".
        data = yaml.safe_load(f) or {}

    patterns: dict[str, Pattern] = {}
    # ``or []`` also covers a ``patterns:`` key that is present but null.
    for item in data.get("patterns") or []:
        slug = item["slug"]
        result = await session.execute(select(Pattern).where(Pattern.slug == slug))
        existing = result.scalar_one_or_none()
        if existing is not None:
            existing.name = item["name"]
            existing.description = item.get("description")
            existing.when_to_use = item.get("when_to_use")
            patterns[slug] = existing
        else:
            pattern = Pattern(
                name=item["name"],
                slug=slug,
                description=item.get("description"),
                when_to_use=item.get("when_to_use"),
            )
            session.add(pattern)
            patterns[slug] = pattern

    # Push pending inserts/updates to the database without committing.
    await session.flush()
    print(f"Loaded {len(patterns)} patterns")
    return patterns
def _resolve_slugs(slugs: Any, known: dict[str, Any], kind: str, source: Path) -> list[Any]:
    """Map slugs to already-loaded objects, warning about and skipping unknown ones."""
    resolved: list[Any] = []
    for item_slug in slugs or []:
        if item_slug in known:
            resolved.append(known[item_slug])
        else:
            # Previously dropped silently; a typo in the YAML is worth surfacing.
            print(f"Warning: unknown {kind} '{item_slug}' in {source}")
    return resolved


async def load_question(
    session: AsyncSession,
    question_file: Path,
    categories: dict[str, Category],
    patterns: dict[str, Pattern],
) -> None:
    """Upsert one question, with its explanation and solutions, from a YAML file.

    The question is matched to an existing row by ``slug`` and either updated
    in place or created. Its category and pattern links are replaced by the
    slugs listed in the file; slugs missing from ``categories`` / ``patterns``
    are skipped with a warning. If the file has an ``explanation`` mapping, the
    stored explanation is updated or created. Solutions of a pre-existing
    question are deleted and recreated from the file. Nothing is committed.

    Args:
        session: Async session used for lookups and pending writes.
        question_file: Path to the question's YAML file.
        categories: Slug-to-``Category`` mapping of available categories.
        patterns: Slug-to-``Pattern`` mapping of available patterns.

    Raises:
        ValueError: If the file does not contain a YAML mapping at top level.
        KeyError: If a required key is missing (``slug``, ``title``,
            ``difficulty``, ``description``; within ``explanation``:
            ``approach``, ``intuition``, ``time_complexity``,
            ``space_complexity``; within a solution: ``approach_name``,
            ``code``).
    """
    # Local import: keeps this function's extra dependency self-contained.
    from sqlalchemy.orm import selectinload

    # Decode explicitly as UTF-8 instead of the platform default encoding.
    with open(question_file, encoding="utf-8") as f:
        data = yaml.safe_load(f)
    if not isinstance(data, dict):
        # An empty file yields None; fail with the file name rather than an
        # opaque TypeError on the first key access.
        raise ValueError(f"{question_file}: expected a YAML mapping at the top level")

    slug = data["slug"]
    # AsyncSession cannot perform implicit lazy loads on attribute access, so
    # the relationships read or replaced below are loaded up front.
    # NOTE(review): redundant but harmless if the models already eager-load.
    result = await session.execute(
        select(Question)
        .where(Question.slug == slug)
        .options(
            selectinload(Question.categories),
            selectinload(Question.patterns),
            selectinload(Question.explanation),
            selectinload(Question.solutions),
        )
    )
    existing = result.scalar_one_or_none()

    # NOTE(review): Difficulty(...) presumably rejects unknown values - confirm.
    question_fields = {
        "title": data["title"],
        "difficulty": Difficulty(data["difficulty"]),
        "description": data["description"],
        "constraints": data.get("constraints"),
        "examples": data.get("examples"),
        "leetcode_id": data.get("leetcode_id"),
        "leetcode_url": data.get("leetcode_url"),
    }
    if existing is not None:
        question = existing
        for attr, value in question_fields.items():
            setattr(question, attr, value)
    else:
        question = Question(slug=slug, **question_fields)
        session.add(question)

    # Replace category/pattern links with exactly what the file lists.
    question.categories = _resolve_slugs(
        data.get("categories"), categories, "category", question_file
    )
    question.patterns = _resolve_slugs(
        data.get("patterns"), patterns, "pattern", question_file
    )

    # Flush so question.id is available for the child rows created below.
    await session.flush()

    # Handle explanation
    if "explanation" in data:
        exp_data = data["explanation"]
        explanation_fields = {
            "approach": exp_data["approach"],
            "intuition": exp_data["intuition"],
            "common_pitfalls": exp_data.get("common_pitfalls"),
            "key_takeaways": exp_data.get("key_takeaways"),
            "time_complexity": exp_data["time_complexity"],
            "space_complexity": exp_data["space_complexity"],
            "complexity_explanation": exp_data.get("complexity_explanation"),
        }
        # A question created in this call has no stored explanation to update,
        # so the relationship is only consulted for a pre-existing row.
        stored = existing.explanation if existing is not None else None
        if stored:
            for attr, value in explanation_fields.items():
                setattr(stored, attr, value)
        else:
            session.add(Explanation(question_id=question.id, **explanation_fields))

    # Handle solutions (delete existing and recreate)
    if existing is not None and existing.solutions:
        # Iterate over a copy so the loop never depends on the live collection.
        for stale in list(existing.solutions):
            await session.delete(stale)
        await session.flush()

    for sol_data in data.get("solutions") or []:
        session.add(
            Solution(
                question_id=question.id,
                approach_name=sol_data["approach_name"],
                code=sol_data["code"],
                language=sol_data.get("language", "python"),
                is_optimal=sol_data.get("is_optimal", False),
                explanation=sol_data.get("explanation"),
            )
        )

    print(f" Loaded: {data['title']}")
async def load_questions(
    session: AsyncSession,
    data_dir: Path,
    categories: dict[str, Category],
    patterns: dict[str, Pattern],
) -> int:
    """Load every ``*.yaml`` file found directly under ``data_dir / "questions"``.

    Files are handled one at a time, in sorted path order, by ``load_question``.

    Args:
        session: Async session passed through to ``load_question``.
        data_dir: Root content directory that contains ``questions/``.
        categories: Slug-to-``Category`` mapping passed through.
        patterns: Slug-to-``Pattern`` mapping passed through.

    Returns:
        Number of question files processed; 0 if the directory is missing.
    """
    questions_dir = data_dir / "questions"
    if questions_dir.exists():
        # Materialise the sorted listing once; its length is the result.
        question_files = sorted(questions_dir.glob("*.yaml"))
        for path in question_files:
            await load_question(session, path, categories, patterns)
        return len(question_files)

    print(f"Warning: {questions_dir} not found")
    return 0
async def main() -> None:
    """Load categories, patterns and questions, then commit them in one go.

    The content directory is ``data`` next to this script's parent directory.
    All three loaders share one session, and the commit happens only after
    every loader has finished.
    """
    script_dir = Path(__file__).parent
    data_dir = script_dir.parent / "data"

    print("Loading content data...")
    print(f"Data directory: {data_dir}")

    async with async_session_factory() as db:
        # Categories and patterns go first: questions link to them by slug.
        category_by_slug = await load_categories(db, data_dir)
        pattern_by_slug = await load_patterns(db, data_dir)
        question_count = await load_questions(
            db, data_dir, category_by_slug, pattern_by_slug
        )
        await db.commit()

    print(f"\nDone! Loaded {question_count} questions.")
# Script entry point: run the async loader only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())