From f2945e12d9f1459483b85e3979cff6410d9c4fd7 Mon Sep 17 00:00:00 2001
From: Kai Chappell
Date: Sat, 26 Apr 2025 17:39:10 +0100
Subject: [PATCH] feat(backend): add data loading scripts

---
Review notes: line structure and indentation were restored, and the file
contents were revised during review. The blob ids on the "index" lines
below still refer to the originally committed versions.

 backend/scripts/load_data.py        | 296 ++++++++++++++++++++++++++++++++
 backend/scripts/validate_content.py | 248 ++++++++++++++++++++++++++
 2 files changed, 544 insertions(+)
 create mode 100644 backend/scripts/load_data.py
 create mode 100644 backend/scripts/validate_content.py

diff --git a/backend/scripts/load_data.py b/backend/scripts/load_data.py
new file mode 100644
index 0000000..5220f05
--- /dev/null
+++ b/backend/scripts/load_data.py
@@ -0,0 +1,296 @@
+#!/usr/bin/env python
+"""Load YAML content data into the database.
+
+Reads the category, pattern and question YAML files from the ``data``
+directory next to ``scripts`` and upserts them, matched by ``slug``, through
+an async SQLAlchemy session.
+"""
+
+import asyncio
+import sys
+from pathlib import Path
+from typing import Any
+
+import yaml
+from sqlalchemy import select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+# Add src to path for imports
+sys.path.insert(0, str(Path(__file__).parent.parent))
+
+from src.db.database import async_session_factory
+from src.models import Category, Difficulty, Explanation, Pattern, Question, Solution
+
+
+def _read_yaml_mapping(path: Path) -> dict[str, Any]:
+    """Parse *path* as YAML and return its top-level mapping.
+
+    ``yaml.safe_load`` returns ``None`` for an empty document; that case is
+    normalised to ``{}`` so callers can use ``.get`` unconditionally.
+
+    Raises:
+        ValueError: If the top level of the document is not a mapping.
+    """
+    # Explicit encoding: the platform default is not UTF-8 everywhere.
+    with open(path, encoding="utf-8") as f:
+        data = yaml.safe_load(f)
+    if data is None:
+        return {}
+    if not isinstance(data, dict):
+        raise ValueError(f"{path}: expected a mapping at the top level")
+    return data
+
+
+def _pick_known(slugs: list[str], known: dict[str, Any], kind: str, source: Path) -> list[Any]:
+    """Return the objects in *known* for *slugs*, warning about unknown ones.
+
+    Unknown slugs are skipped (not an error) but reported, so that a typo in a
+    question file is visible instead of silently dropping the link.
+    """
+    picked: list[Any] = []
+    for slug in slugs:
+        if slug in known:
+            picked.append(known[slug])
+        else:
+            print(f"Warning: {source.name}: unknown {kind} '{slug}' skipped")
+    return picked
+
+
+async def load_categories(session: AsyncSession, data_dir: Path) -> dict[str, Category]:
+    """Upsert categories from ``categories/categories.yaml``.
+
+    Args:
+        session: Open async session; changes are flushed, not committed.
+        data_dir: Root directory of the YAML content.
+
+    Returns:
+        Mapping of slug to the updated or newly added ``Category``; empty
+        when the file does not exist.
+    """
+    categories_file = data_dir / "categories" / "categories.yaml"
+    if not categories_file.exists():
+        print(f"Warning: {categories_file} not found")
+        return {}
+
+    data = _read_yaml_mapping(categories_file)
+
+    categories: dict[str, Category] = {}
+    # "or []" also covers a key that is present but left empty in the YAML.
+    for item in data.get("categories") or []:
+        result = await session.execute(select(Category).where(Category.slug == item["slug"]))
+        existing = result.scalar_one_or_none()
+
+        if existing:
+            existing.name = item["name"]
+            existing.description = item.get("description")
+            categories[item["slug"]] = existing
+        else:
+            category = Category(
+                name=item["name"],
+                slug=item["slug"],
+                description=item.get("description"),
+            )
+            session.add(category)
+            categories[item["slug"]] = category
+
+    await session.flush()
+    print(f"Loaded {len(categories)} categories")
+    return categories
+
+
+async def load_patterns(session: AsyncSession, data_dir: Path) -> dict[str, Pattern]:
+    """Upsert patterns from ``patterns/patterns.yaml``.
+
+    Args:
+        session: Open async session; changes are flushed, not committed.
+        data_dir: Root directory of the YAML content.
+
+    Returns:
+        Mapping of slug to the updated or newly added ``Pattern``; empty
+        when the file does not exist.
+    """
+    patterns_file = data_dir / "patterns" / "patterns.yaml"
+    if not patterns_file.exists():
+        print(f"Warning: {patterns_file} not found")
+        return {}
+
+    data = _read_yaml_mapping(patterns_file)
+
+    patterns: dict[str, Pattern] = {}
+    # "or []" also covers a key that is present but left empty in the YAML.
+    for item in data.get("patterns") or []:
+        result = await session.execute(select(Pattern).where(Pattern.slug == item["slug"]))
+        existing = result.scalar_one_or_none()
+
+        if existing:
+            existing.name = item["name"]
+            existing.description = item.get("description")
+            existing.when_to_use = item.get("when_to_use")
+            patterns[item["slug"]] = existing
+        else:
+            pattern = Pattern(
+                name=item["name"],
+                slug=item["slug"],
+                description=item.get("description"),
+                when_to_use=item.get("when_to_use"),
+            )
+            session.add(pattern)
+            patterns[item["slug"]] = pattern
+
+    await session.flush()
+    print(f"Loaded {len(patterns)} patterns")
+    return patterns
+
+
+async def load_question(
+    session: AsyncSession,
+    question_file: Path,
+    categories: dict[str, Category],
+    patterns: dict[str, Pattern],
+) -> None:
+    """Upsert one question, with its explanation and solutions, from YAML.
+
+    Args:
+        session: Open async session; changes are flushed, not committed.
+        question_file: Path of the question's YAML file.
+        categories: Known categories keyed by slug.
+        patterns: Known patterns keyed by slug.
+
+    Raises:
+        KeyError: If a required field (``slug``, ``title``, ``difficulty``,
+            ``description``, or a required explanation/solution field) is
+            missing from the file.
+    """
+    data = _read_yaml_mapping(question_file)
+
+    slug = data["slug"]
+    result = await session.execute(select(Question).where(Question.slug == slug))
+    existing = result.scalar_one_or_none()
+
+    if existing:
+        question = existing
+        question.title = data["title"]
+        question.difficulty = Difficulty(data["difficulty"])
+        question.description = data["description"]
+        question.constraints = data.get("constraints")
+        question.examples = data.get("examples")
+        question.leetcode_id = data.get("leetcode_id")
+        question.leetcode_url = data.get("leetcode_url")
+    else:
+        question = Question(
+            title=data["title"],
+            slug=slug,
+            difficulty=Difficulty(data["difficulty"]),
+            description=data["description"],
+            constraints=data.get("constraints"),
+            examples=data.get("examples"),
+            leetcode_id=data.get("leetcode_id"),
+            leetcode_url=data.get("leetcode_url"),
+        )
+        session.add(question)
+
+    # NOTE(review): the relationship attributes used below (categories,
+    # patterns, explanation, solutions) are read or assigned on objects
+    # attached to an AsyncSession, where an implicit lazy load fails; this
+    # presumably relies on eager loading configured in src.models - confirm.
+
+    # Link categories and patterns
+    question.categories = _pick_known(
+        data.get("categories") or [], categories, "category", question_file
+    )
+    question.patterns = _pick_known(
+        data.get("patterns") or [], patterns, "pattern", question_file
+    )
+
+    # Flush before question.id is read for the child rows below.
+    await session.flush()
+
+    # Handle explanation
+    if "explanation" in data:
+        exp_data = data["explanation"]
+        if question.explanation:
+            explanation = question.explanation
+            explanation.approach = exp_data["approach"]
+            explanation.intuition = exp_data["intuition"]
+            explanation.common_pitfalls = exp_data.get("common_pitfalls")
+            explanation.key_takeaways = exp_data.get("key_takeaways")
+            explanation.time_complexity = exp_data["time_complexity"]
+            explanation.space_complexity = exp_data["space_complexity"]
+            explanation.complexity_explanation = exp_data.get("complexity_explanation")
+        else:
+            explanation = Explanation(
+                question_id=question.id,
+                approach=exp_data["approach"],
+                intuition=exp_data["intuition"],
+                common_pitfalls=exp_data.get("common_pitfalls"),
+                key_takeaways=exp_data.get("key_takeaways"),
+                time_complexity=exp_data["time_complexity"],
+                space_complexity=exp_data["space_complexity"],
+                complexity_explanation=exp_data.get("complexity_explanation"),
+            )
+            session.add(explanation)
+
+    # Handle solutions (delete existing and recreate)
+    if existing and existing.solutions:
+        for sol in existing.solutions:
+            await session.delete(sol)
+        await session.flush()
+
+    for sol_data in data.get("solutions") or []:
+        solution = Solution(
+            question_id=question.id,
+            approach_name=sol_data["approach_name"],
+            code=sol_data["code"],
+            language=sol_data.get("language", "python"),
+            is_optimal=sol_data.get("is_optimal", False),
+            explanation=sol_data.get("explanation"),
+        )
+        session.add(solution)
+
+    print(f" Loaded: {data['title']}")
+
+
+async def load_questions(
+    session: AsyncSession,
+    data_dir: Path,
+    categories: dict[str, Category],
+    patterns: dict[str, Pattern],
+) -> int:
+    """Load every ``questions/*.yaml`` file, in sorted filename order.
+
+    Returns:
+        The number of question files loaded; 0 when the directory is missing.
+    """
+    questions_dir = data_dir / "questions"
+    if not questions_dir.exists():
+        print(f"Warning: {questions_dir} not found")
+        return 0
+
+    count = 0
+    for question_file in sorted(questions_dir.glob("*.yaml")):
+        await load_question(session, question_file, categories, patterns)
+        count += 1
+
+    return count
+
+
+async def main() -> None:
+    """Load all content data into the database, committing once at the end."""
+    data_dir = Path(__file__).parent.parent / "data"
+
+    print("Loading content data...")
+    print(f"Data directory: {data_dir}")
+
+    async with async_session_factory() as session:
+        categories = await load_categories(session, data_dir)
+        patterns = await load_patterns(session, data_dir)
+        question_count = await load_questions(session, data_dir, categories, patterns)
+
+        # Only commit point: reached only if every loader above succeeded.
+        await session.commit()
+
+    print(f"\nDone! Loaded {question_count} questions.")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/backend/scripts/validate_content.py b/backend/scripts/validate_content.py
new file mode 100644
index 0000000..2c70990
--- /dev/null
+++ b/backend/scripts/validate_content.py
@@ -0,0 +1,248 @@
+#!/usr/bin/env python
+"""Validate YAML content files for correctness."""
+
+import sys
+from pathlib import Path
+from typing import Any
+
+import yaml
+
+# Accepted values for a question's "difficulty" field.
+VALID_DIFFICULTIES = ("easy", "medium", "hard")
+
+
+def _load_mapping(path: Path, label: str) -> tuple[dict[str, Any], list[str]]:
+    """Parse *path* as YAML and return ``(mapping, errors)``.
+
+    On failure ``errors`` holds one message prefixed with *label* and the
+    mapping is empty. Failure means the YAML could not be parsed or its top
+    level is not a mapping (an empty file parses to ``None``).
+    """
+    try:
+        # Explicit encoding: the platform default is not UTF-8 everywhere.
+        with open(path, encoding="utf-8") as f:
+            data = yaml.safe_load(f)
+    except yaml.YAMLError as e:
+        return {}, [f"{label}: invalid YAML - {e}"]
+    if not isinstance(data, dict):
+        return {}, [f"{label}: expected a mapping at the top level"]
+    return data, []
+
+
+def _validate_slug_file(data_dir: Path, key: str) -> tuple[set[str], list[str]]:
+    """Validate ``{key}/{key}.yaml`` and return ``(slugs, errors)``.
+
+    Categories and patterns share this layout: a top-level *key* holding a
+    list of entries, each with a ``name`` and a unique ``slug``.
+    """
+    errors: list[str] = []
+    slugs: set[str] = set()
+    label = f"{key}.yaml"
+
+    path = data_dir / key / label
+    if not path.exists():
+        errors.append(f"Missing file: {path}")
+        return slugs, errors
+
+    data, load_errors = _load_mapping(path, label)
+    if load_errors:
+        return slugs, load_errors
+
+    if key not in data:
+        errors.append(f"{label}: missing '{key}' key")
+        return slugs, errors
+
+    # "or []" covers a key that is present but left empty in the YAML.
+    for i, entry in enumerate(data[key] or []):
+        prefix = f"{label}[{i}]"
+        if not isinstance(entry, dict):
+            errors.append(f"{prefix}: expected a mapping")
+            continue
+        if "name" not in entry:
+            errors.append(f"{prefix}: missing 'name'")
+        if "slug" not in entry:
+            errors.append(f"{prefix}: missing 'slug'")
+        else:
+            if entry["slug"] in slugs:
+                errors.append(f"{prefix}: duplicate slug '{entry['slug']}'")
+            slugs.add(entry["slug"])
+
+    return slugs, errors
+
+
+def validate_categories(data_dir: Path) -> tuple[set[str], list[str]]:
+    """Validate categories.yaml and return ``(valid slugs, errors)``."""
+    return _validate_slug_file(data_dir, "categories")
+
+
+def validate_patterns(data_dir: Path) -> tuple[set[str], list[str]]:
+    """Validate patterns.yaml and return ``(valid slugs, errors)``."""
+    return _validate_slug_file(data_dir, "patterns")
+
+
+def validate_question(
+    question_file: Path,
+    valid_categories: set[str],
+    valid_patterns: set[str],
+    seen_slugs: set[str],
+    seen_leetcode_ids: set[int],
+) -> list[str]:
+    """Validate a single question file and return its error messages.
+
+    Args:
+        question_file: Path of the question's YAML file.
+        valid_categories: Category slugs a question may reference.
+        valid_patterns: Pattern slugs a question may reference.
+        seen_slugs: Slugs of the questions validated so far; updated in place.
+        seen_leetcode_ids: LeetCode ids seen so far; updated in place.
+    """
+    filename = question_file.name
+
+    data, errors = _load_mapping(question_file, filename)
+    if errors:
+        return errors
+
+    # Required fields
+    required = ["title", "slug", "difficulty", "description"]
+    for field in required:
+        if field not in data:
+            errors.append(f"{filename}: missing required field '{field}'")
+
+    # Validate slug
+    if "slug" in data:
+        if data["slug"] in seen_slugs:
+            errors.append(f"{filename}: duplicate slug '{data['slug']}'")
+        seen_slugs.add(data["slug"])
+
+        # Slug should match filename
+        expected_filename = f"{data['slug']}.yaml"
+        if question_file.name != expected_filename:
+            errors.append(f"{filename}: filename should be '{expected_filename}'")
+
+    # Validate difficulty
+    if "difficulty" in data and data["difficulty"] not in VALID_DIFFICULTIES:
+        # A tuple keeps the listed options in a stable order between runs.
+        allowed = ", ".join(VALID_DIFFICULTIES)
+        errors.append(
+            f"{filename}: invalid difficulty '{data['difficulty']}' "
+            f"(must be one of {allowed})"
+        )
+
+    # Validate categories
+    for cat in data.get("categories") or []:
+        if cat not in valid_categories:
+            errors.append(f"{filename}: unknown category '{cat}'")
+
+    # Validate patterns
+    for pat in data.get("patterns") or []:
+        if pat not in valid_patterns:
+            errors.append(f"{filename}: unknown pattern '{pat}'")
+
+    # Validate leetcode_id uniqueness
+    if "leetcode_id" in data and data["leetcode_id"] is not None:
+        lid = data["leetcode_id"]
+        if lid in seen_leetcode_ids:
+            errors.append(f"{filename}: duplicate leetcode_id {lid}")
+        seen_leetcode_ids.add(lid)
+
+    # Validate explanation
+    if "explanation" in data:
+        exp = data["explanation"]
+        if isinstance(exp, dict):
+            exp_required = ["approach", "intuition", "time_complexity", "space_complexity"]
+            for field in exp_required:
+                if field not in exp:
+                    errors.append(f"{filename}: explanation missing '{field}'")
+        else:
+            errors.append(f"{filename}: 'explanation' must be a mapping")
+
+    # Validate solutions
+    if "solutions" in data:
+        for i, sol in enumerate(data["solutions"] or []):
+            if not isinstance(sol, dict):
+                errors.append(f"{filename}: solutions[{i}] must be a mapping")
+                continue
+            if "approach_name" not in sol:
+                errors.append(f"{filename}: solutions[{i}] missing 'approach_name'")
+            if "code" not in sol:
+                errors.append(f"{filename}: solutions[{i}] missing 'code'")
+    else:
+        errors.append(f"{filename}: missing 'solutions'")
+
+    return errors
+
+
+def validate_questions(
+    data_dir: Path,
+    valid_categories: set[str],
+    valid_patterns: set[str],
+) -> list[str]:
+    """Validate every ``questions/*.yaml`` file and return all error messages."""
+    errors: list[str] = []
+    questions_dir = data_dir / "questions"
+
+    if not questions_dir.exists():
+        errors.append(f"Missing directory: {questions_dir}")
+        return errors
+
+    # Shared across files so duplicates are detected between questions.
+    seen_slugs: set[str] = set()
+    seen_leetcode_ids: set[int] = set()
+
+    question_files = list(questions_dir.glob("*.yaml"))
+    if not question_files:
+        errors.append("No question files found")
+        return errors
+
+    for question_file in sorted(question_files):
+        file_errors = validate_question(
+            question_file,
+            valid_categories,
+            valid_patterns,
+            seen_slugs,
+            seen_leetcode_ids,
+        )
+        errors.extend(file_errors)
+
+    return errors
+
+
+def main() -> int:
+    """Validate all content files; return 0 if valid, 1 otherwise."""
+    data_dir = Path(__file__).parent.parent / "data"
+
+    print(f"Validating content in {data_dir}...\n")
+
+    all_errors: list[str] = []
+
+    # Validate categories
+    valid_categories, cat_errors = validate_categories(data_dir)
+    all_errors.extend(cat_errors)
+    print(f"Categories: {len(valid_categories)} valid, {len(cat_errors)} errors")
+
+    # Validate patterns
+    valid_patterns, pat_errors = validate_patterns(data_dir)
+    all_errors.extend(pat_errors)
+    print(f"Patterns: {len(valid_patterns)} valid, {len(pat_errors)} errors")
+
+    # Validate questions
+    questions_dir = data_dir / "questions"
+    question_count = len(list(questions_dir.glob("*.yaml"))) if questions_dir.exists() else 0
+    question_errors = validate_questions(data_dir, valid_categories, valid_patterns)
+    all_errors.extend(question_errors)
+    print(f"Questions: {question_count} files, {len(question_errors)} errors")
+
+    print()
+
+    if all_errors:
+        print("Validation errors:")
+        for error in all_errors:
+            print(f" - {error}")
+        return 1
+
+    print("All content is valid!")
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())