905 行
27 KiB
Python
905 行
27 KiB
Python
#!/usr/bin/env python3
|
|
"""Import Luogu CSP-J/S beginner problem set into local SQLite."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import math
|
|
import re
|
|
import sqlite3
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
from urllib.parse import quote
|
|
|
|
import requests
|
|
|
|
|
|
# Root of the Luogu site; used as the default for the --base-url option.
DEFAULT_BASE_URL = "https://www.luogu.com.cn"

# Default tag filter: CSP-J, CSP-S, NOIP-junior, NOIP-senior.
DEFAULT_TAG_IDS = [343, 342, 82, 83]

# HTTP status codes that are retried instead of failing immediately.
RETRYABLE_STATUS = {429, 500, 502, 503, 504}

# Captures the body of the <script id="lentille-context"> element; DOTALL lets
# the captured payload span multiple lines.
CONTEXT_RE = re.compile(
    r'<script[^>]*id="lentille-context"[^>]*>(.*?)</script>',
    flags=re.DOTALL,
)
|
|
|
|
|
|
@dataclass
class LuoguListItem:
    """One row of the Luogu problem list, as parsed by ``fetch_list_page``."""

    # Problem identifier taken from the row's "pid" field.
    pid: str
    # Problem title (whitespace-stripped).
    title: str
    # Difficulty value reported by the list page (1 when absent).
    difficulty: int
    # Integer tag ids attached to the problem.
    tags: list[int]
    # Submission statistics from "totalSubmit" / "totalAccepted".
    total_submit: int
    total_accepted: int
    # Value of the row's "type" field, stored verbatim.
    type: str
|
|
|
|
|
|
@dataclass
class UpsertRecord:
    """Everything ``upsert_problem`` writes for a single problem."""

    # Unique key used to decide between insert and update.
    slug: str
    title: str
    # Problem statement rendered as Markdown.
    statement_md: str
    difficulty: int
    # Origin marker stored in the "source" column.
    source: str
    statement_url: str
    # JSON-encoded profile document stored as text.
    llm_profile_json: str
    sample_input: str
    sample_output: str
    # Tag strings written to the problem_tags table.
    tags: list[str]
|
|
|
|
|
|
def now_sec() -> int:
    """Return the current Unix time truncated to whole seconds."""
    seconds = time.time()
    return int(seconds)
|
|
|
|
|
|
def requests_retry_text(
    session: requests.Session,
    url: str,
    *,
    timeout: int,
    retries: int,
    sleep_sec: float,
) -> str:
    """GET ``url`` and return the response body, retrying transient failures.

    A request is retried when ``session.get`` raises
    ``requests.RequestException`` or the response status is in
    ``RETRYABLE_STATUS``. The pause before retry number ``k`` is
    ``sleep_sec * k`` (linear back-off).

    Args:
        session: Session used to issue the request.
        url: Absolute URL to fetch.
        timeout: Timeout passed through to ``session.get``.
        retries: Total number of attempts. Values below 1 are treated as 1 so
            that at least one request is always made.
        sleep_sec: Base delay between attempts, in seconds.

    Returns:
        The response text of the first attempt that is neither an error nor a
        retryable status.

    Raises:
        RuntimeError: On a non-retryable status >= 400, or when the final
            attempt still fails.
    """
    # Previously ``retries <= 0`` skipped the loop entirely and reported an
    # "unknown error" without ever contacting the server.
    attempts = max(1, retries)
    for attempt in range(1, attempts + 1):
        is_last = attempt == attempts
        try:
            resp = session.get(url, timeout=timeout)
        except requests.RequestException as exc:
            if is_last:
                raise RuntimeError(f"GET failed: {url}: {exc}") from exc
        else:
            status = resp.status_code
            if status not in RETRYABLE_STATUS:
                if status >= 400:
                    raise RuntimeError(f"GET failed: {url}: {status}")
                return resp.text
            if is_last:
                raise RuntimeError(f"GET failed after retry: {url}: {status}")
        # Only reached when another attempt remains.
        time.sleep(sleep_sec * attempt)

    # Defensive: every loop iteration above returns, raises, or retries.
    raise RuntimeError(f"GET failed: {url}: unknown error")
|
|
|
|
|
|
def extract_context_json(html_text: str) -> dict[str, Any]:
    """Pull the JSON payload out of the page's ``lentille-context`` script tag.

    Raises:
        RuntimeError: If the script element is missing or its body is not
            valid JSON.
    """
    found = CONTEXT_RE.search(html_text)
    if found is None:
        raise RuntimeError("lentille-context script not found")

    payload = found.group(1)
    try:
        parsed = json.loads(payload)
    except json.JSONDecodeError as exc:
        raise RuntimeError("failed to parse lentille-context json") from exc
    return parsed
|
|
|
|
|
|
def parse_tag_ids(raw: str) -> list[int]:
    """Parse a comma separated list of integer tag ids.

    Blank entries are skipped; surrounding whitespace is ignored.

    Raises:
        ValueError: If an entry is not an integer, or no ids remain.
    """
    pieces = (chunk.strip() for chunk in raw.split(","))
    ids = [int(piece) for piece in pieces if piece]
    if ids:
        return ids
    raise ValueError("at least one tag id is required")
|
|
|
|
|
|
def normalize_tag(text: str) -> str:
    """Turn a tag name into a lowercase, dash-separated ASCII slug.

    Runs of characters outside ``a-z0-9`` collapse to a single dash and
    leading/trailing dashes are removed. When nothing ASCII survives (for
    example a purely Chinese name) the stripped original text is returned.
    """
    trimmed = text.strip()
    slug = re.sub(r"[^a-z0-9]+", "-", trimmed.lower()).strip("-")
    return slug if slug else trimmed
|
|
|
|
|
|
def ensure_problem_columns(conn: sqlite3.Connection) -> None:
    """Add any of the optional ``problems`` columns that are still missing.

    Existing columns are discovered with ``PRAGMA table_info`` so the function
    can be run repeatedly against the same database.
    """
    migrations = (
        ("sample_input", "ALTER TABLE problems ADD COLUMN sample_input TEXT NOT NULL DEFAULT ''"),
        ("sample_output", "ALTER TABLE problems ADD COLUMN sample_output TEXT NOT NULL DEFAULT ''"),
        ("statement_url", "ALTER TABLE problems ADD COLUMN statement_url TEXT NOT NULL DEFAULT ''"),
        ("llm_profile_json", "ALTER TABLE problems ADD COLUMN llm_profile_json TEXT NOT NULL DEFAULT '{}'"),
    )

    cursor = conn.cursor()
    cursor.execute("PRAGMA table_info(problems)")
    # Column name is the second field of each table_info row.
    present = {str(info[1]) for info in cursor.fetchall()}

    for column, statement in migrations:
        if column in present:
            continue
        cursor.execute(statement)
    conn.commit()
|
|
|
|
|
|
def ensure_core_tables(conn: sqlite3.Connection) -> None:
    """Create the ``problems`` and ``problem_tags`` tables if they are absent.

    Also creates the tag lookup index. Every statement uses ``IF NOT EXISTS``,
    so calling this on an initialised database changes nothing.
    """
    statements = (
        """
        CREATE TABLE IF NOT EXISTS problems (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            slug TEXT NOT NULL UNIQUE,
            title TEXT NOT NULL,
            statement_md TEXT NOT NULL,
            difficulty INTEGER NOT NULL DEFAULT 1,
            source TEXT NOT NULL DEFAULT '',
            statement_url TEXT NOT NULL DEFAULT '',
            llm_profile_json TEXT NOT NULL DEFAULT '{}',
            sample_input TEXT NOT NULL DEFAULT '',
            sample_output TEXT NOT NULL DEFAULT '',
            created_at INTEGER NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS problem_tags (
            problem_id INTEGER NOT NULL,
            tag TEXT NOT NULL,
            PRIMARY KEY(problem_id, tag)
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_problem_tags_tag ON problem_tags(tag)",
    )
    for ddl in statements:
        conn.execute(ddl)
    conn.commit()
|
|
|
|
|
|
def ensure_import_tables(conn: sqlite3.Connection) -> None:
    """Create the import bookkeeping tables and their indexes if absent.

    ``import_jobs`` holds one row per import run; ``import_job_items`` holds
    one row per (job, source path) pair. Safe to call repeatedly.
    """
    statements = (
        """
        CREATE TABLE IF NOT EXISTS import_jobs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            status TEXT NOT NULL,
            trigger TEXT NOT NULL DEFAULT 'manual',
            total_count INTEGER NOT NULL DEFAULT 0,
            processed_count INTEGER NOT NULL DEFAULT 0,
            success_count INTEGER NOT NULL DEFAULT 0,
            failed_count INTEGER NOT NULL DEFAULT 0,
            options_json TEXT NOT NULL DEFAULT '{}',
            last_error TEXT NOT NULL DEFAULT '',
            started_at INTEGER NOT NULL,
            finished_at INTEGER,
            updated_at INTEGER NOT NULL,
            created_at INTEGER NOT NULL
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS import_job_items (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            job_id INTEGER NOT NULL,
            source_path TEXT NOT NULL,
            status TEXT NOT NULL DEFAULT 'queued',
            title TEXT NOT NULL DEFAULT '',
            difficulty INTEGER NOT NULL DEFAULT 0,
            problem_id INTEGER,
            error_text TEXT NOT NULL DEFAULT '',
            started_at INTEGER,
            finished_at INTEGER,
            updated_at INTEGER NOT NULL,
            created_at INTEGER NOT NULL,
            UNIQUE(job_id, source_path)
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_import_jobs_created_at ON import_jobs(created_at DESC)",
        "CREATE INDEX IF NOT EXISTS idx_import_job_items_job_status "
        "ON import_job_items(job_id, status, updated_at DESC)",
    )
    for ddl in statements:
        conn.execute(ddl)
    conn.commit()
|
|
|
|
|
|
def create_import_job(
    conn: sqlite3.Connection, trigger: str, total_count: int, options_json: str
) -> int:
    """Insert a new ``import_jobs`` row in the "running" state.

    All counters start at zero, ``finished_at`` is NULL and the three
    timestamps share the same creation time. An empty ``trigger`` is stored
    as "manual".

    Returns:
        The row id of the newly created job.
    """
    created = now_sec()
    row = (
        "running",            # status
        trigger or "manual",  # trigger
        total_count,          # total_count
        0,                    # processed_count
        0,                    # success_count
        0,                    # failed_count
        options_json,         # options_json
        "",                   # last_error
        created,              # started_at
        None,                 # finished_at
        created,              # updated_at
        created,              # created_at
    )
    cursor = conn.execute(
        """
        INSERT INTO import_jobs(
            status,trigger,total_count,processed_count,success_count,failed_count,
            options_json,last_error,started_at,finished_at,updated_at,created_at
        ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?)
        """,
        row,
    )
    conn.commit()
    return int(cursor.lastrowid)
|
|
|
|
|
|
def seed_import_items(
    conn: sqlite3.Connection, job_id: int, items: list[LuoguListItem]
) -> None:
    """Queue one ``import_job_items`` row per list item for the given job.

    Each row uses the item's ``pid`` as ``source_path`` and starts in the
    "queued" state with empty title/error and no problem id. ``INSERT OR
    IGNORE`` leaves rows that already exist for the same (job, path) alone.
    """
    created = now_sec()
    rows = [
        (job_id, entry.pid, "queued", "", 0, None, "", None, None, created, created)
        for entry in items
    ]
    conn.executemany(
        """
        INSERT OR IGNORE INTO import_job_items(
            job_id,source_path,status,title,difficulty,problem_id,error_text,
            started_at,finished_at,updated_at,created_at
        ) VALUES(?,?,?,?,?,?,?,?,?,?,?)
        """,
        rows,
    )
    conn.commit()
|
|
|
|
|
|
def update_import_item_success(
    conn: sqlite3.Connection,
    job_id: int,
    source_path: str,
    title: str,
    difficulty: int,
    problem_id: int,
    note: str = "",
) -> None:
    """Mark one job item as successfully imported.

    Stores the resolved title, difficulty and problem id; ``note`` is written
    to ``error_text``. ``started_at`` is only filled in when still NULL.
    """
    moment = now_sec()
    params = (
        title,
        difficulty,
        problem_id,
        note,
        moment,  # started_at fallback
        moment,  # finished_at
        moment,  # updated_at
        job_id,
        source_path,
    )
    conn.execute(
        """
        UPDATE import_job_items
        SET status='success',
            title=?,
            difficulty=?,
            problem_id=?,
            error_text=?,
            started_at=COALESCE(started_at, ?),
            finished_at=?,
            updated_at=?
        WHERE job_id=? AND source_path=?
        """,
        params,
    )
    conn.commit()
|
|
|
|
|
|
def update_import_item_failed(
    conn: sqlite3.Connection, job_id: int, source_path: str, error_text: str
) -> None:
    """Mark one job item as failed and record why.

    The error text is truncated to 500 characters. ``started_at`` is only
    filled in when still NULL.
    """
    moment = now_sec()
    clipped = error_text[:500]
    conn.execute(
        """
        UPDATE import_job_items
        SET status='failed',
            error_text=?,
            started_at=COALESCE(started_at, ?),
            finished_at=?,
            updated_at=?
        WHERE job_id=? AND source_path=?
        """,
        (clipped, moment, moment, moment, job_id, source_path),
    )
    conn.commit()
|
|
|
|
|
|
def update_import_job_progress(
    conn: sqlite3.Connection,
    job_id: int,
    processed_count: int,
    success_count: int,
    failed_count: int,
    last_error: str,
) -> None:
    """Write the running counters of an import job and bump ``updated_at``.

    ``last_error`` is truncated to 500 characters before being stored.
    """
    moment = now_sec()
    clipped = last_error[:500]
    conn.execute(
        """
        UPDATE import_jobs
        SET processed_count=?,
            success_count=?,
            failed_count=?,
            last_error=?,
            updated_at=?
        WHERE id=?
        """,
        (processed_count, success_count, failed_count, clipped, moment, job_id),
    )
    conn.commit()
|
|
|
|
|
|
def finish_import_job(
    conn: sqlite3.Connection,
    job_id: int,
    success_count: int,
    failed_count: int,
    last_error: str,
) -> None:
    """Close an import job and store its final counters.

    The status becomes "completed" when nothing failed and
    "completed_with_errors" otherwise. ``processed_count`` is set to the job's
    own ``total_count`` and ``last_error`` is truncated to 500 characters.
    """
    moment = now_sec()
    if failed_count == 0:
        status = "completed"
    else:
        status = "completed_with_errors"
    clipped = last_error[:500]
    conn.execute(
        """
        UPDATE import_jobs
        SET status=?,
            processed_count=total_count,
            success_count=?,
            failed_count=?,
            last_error=?,
            finished_at=?,
            updated_at=?
        WHERE id=?
        """,
        (status, success_count, failed_count, clipped, moment, moment, job_id),
    )
    conn.commit()
|
|
|
|
|
|
def upsert_problem(conn: sqlite3.Connection, rec: UpsertRecord) -> tuple[int, bool]:
    """Insert or update the problem identified by ``rec.slug`` and reset its tags.

    The problem row and its ``problem_tags`` rows are written as one unit:
    either everything is committed, or (on any error) everything is rolled
    back and the exception propagates.

    Args:
        conn: Open SQLite connection.
        rec: Values to store; ``rec.tags`` fully replaces the existing tags.

    Returns:
        ``(problem_id, inserted)`` where ``inserted`` is True for a new row
        and False when an existing row was updated.
    """
    # Column values shared by the INSERT and UPDATE statements, in column order.
    shared = (
        rec.title,
        rec.statement_md,
        rec.difficulty,
        rec.source,
        rec.statement_url,
        rec.llm_profile_json,
        rec.sample_input,
        rec.sample_output,
    )

    # ``with conn`` commits on success and rolls back on error. Without the
    # rollback, a failure after the problem write would leave a half-applied
    # change in the open transaction for some later commit to persist.
    with conn:
        existing = conn.execute(
            "SELECT id FROM problems WHERE slug=?", (rec.slug,)
        ).fetchone()
        inserted = existing is None

        if inserted:
            cur = conn.execute(
                """
                INSERT INTO problems(
                    slug,title,statement_md,difficulty,source,statement_url,llm_profile_json,
                    sample_input,sample_output,created_at
                ) VALUES(?,?,?,?,?,?,?,?,?,?)
                """,
                (rec.slug, *shared, now_sec()),
            )
            problem_id = int(cur.lastrowid)
        else:
            problem_id = int(existing[0])
            conn.execute(
                """
                UPDATE problems
                SET title=?,statement_md=?,difficulty=?,source=?,statement_url=?,
                    llm_profile_json=?,sample_input=?,sample_output=?
                WHERE id=?
                """,
                (*shared, problem_id),
            )

        # Replace the tag set wholesale; OR IGNORE tolerates duplicate tags.
        conn.execute("DELETE FROM problem_tags WHERE problem_id=?", (problem_id,))
        conn.executemany(
            "INSERT OR IGNORE INTO problem_tags(problem_id,tag) VALUES(?,?)",
            [(problem_id, tag) for tag in rec.tags],
        )

    return problem_id, inserted
|
|
|
|
|
|
def markdown_to_absolute(base_url: str, text: str) -> str:
    """Rewrite site-relative Markdown link/image targets to absolute URLs.

    Every ``](/path`` target (links and images alike) becomes
    ``](<base_url>/path``. Protocol-relative targets (``](//host/...``) and
    targets that are already absolute are left untouched.

    Args:
        base_url: Site root to prepend; a trailing slash is ignored.
        text: Markdown source. Falsy input yields an empty string.

    Returns:
        The Markdown with site-relative targets made absolute.
    """
    if not text:
        return ""
    root = base_url.rstrip("/")
    # One pass covers both "[x](/..." and "![](/..." since both end in "](/".
    # The lookahead skips "//host" targets, which are not site-relative.
    # A callable replacement keeps backslashes in base_url literal.
    return re.sub(r"\]\(/(?!/)", lambda _match: f"]({root}/", text)
|
|
|
|
|
|
def build_statement_md(base_url: str, pid: str, detail: dict[str, Any]) -> str:
    """Render a problem detail dict as a Markdown statement.

    The output starts with a ``# <pid> <title>`` heading and a short source
    list, followed by one ``##`` section for each non-empty content field
    (background, description, input format, output format, hint). Field text
    is passed through ``markdown_to_absolute``.
    """
    content = detail.get("content") or {}
    heading = str(detail.get("title") or pid).strip()

    # (section heading, key inside detail["content"]) in output order.
    sections = (
        ("Background", "background"),
        ("Description", "description"),
        ("Input Format", "formatI"),
        ("Output Format", "formatO"),
        ("Hint", "hint"),
    )

    parts = [
        f"# {pid} {heading}",
        "",
        "- Source: Luogu",
        f"- Problem URL: {base_url}/problem/{pid}",
    ]
    for label, key in sections:
        body = markdown_to_absolute(base_url, str(content.get(key) or "").strip())
        if not body:
            continue
        parts.extend(["", f"## {label}", "", body])
    return "\n".join(parts).strip()
|
|
|
|
|
|
def build_record(
    base_url: str,
    list_item: LuoguListItem,
    detail: dict[str, Any],
    tag_catalog: dict[int, dict[str, Any]],
) -> UpsertRecord:
    """Combine a list row and its detail payload into an ``UpsertRecord``.

    Title and difficulty prefer the detail payload and fall back to the list
    row. Only the first sample pair is kept. Tag ids from both sources are
    merged (order-preserving, de-duplicated) and resolved through
    ``tag_catalog``; ids missing from the catalog or without a name are
    skipped. The stored difficulty is clamped to 1..10 and at most 30
    normalised tags are kept.
    """
    pid = list_item.pid
    title = str(detail.get("title") or list_item.title or pid).strip()
    difficulty = int(detail.get("difficulty") or list_item.difficulty or 1)
    statement_url = f"{base_url}/problem/{pid}"
    statement_md = build_statement_md(base_url, pid, detail)

    # Keep only the first [input, output] pair, if it has that shape.
    sample_input = sample_output = ""
    samples = detail.get("samples") or []
    if samples:
        first = samples[0]
        if isinstance(first, list) and len(first) >= 2:
            sample_input, sample_output = (str(part or "") for part in first[:2])

    extra_ids = detail.get("tags")
    if not isinstance(extra_ids, list):
        extra_ids = []
    # dict.fromkeys de-duplicates while preserving first-seen order.
    tag_ids = list(dict.fromkeys([*list_item.tags, *extra_ids]))

    tag_names: list[str] = []
    knowledge_points: list[str] = []
    normalized_tags: set[str] = {"luogu", "csp"}
    for tid in tag_ids:
        entry = tag_catalog.get(int(tid))
        name = str(entry.get("name") or "").strip() if entry else ""
        if not name:
            continue
        tag_names.append(name)
        normalized_tags.add(normalize_tag(name))

        # Catalog entries of type 2 feed the knowledge-point list (max 8).
        if int(entry.get("type") or 0) == 2 and len(knowledge_points) < 8:
            knowledge_points.append(name)

        # Contest markers: CSP names are matched case-insensitively, the
        # NOIP names are matched against the original text.
        upper = name.upper()
        markers = (
            ("CSP-J", upper, "csp-j"),
            ("CSP-S", upper, "csp-s"),
            ("NOIP 普及", name, "noip-junior"),
            ("NOIP 提高", name, "noip-senior"),
        )
        for needle, haystack, slug in markers:
            if needle in haystack:
                normalized_tags.add(slug)

    if not knowledge_points:
        knowledge_points = tag_names[:6]

    answer = "See official solutions/discussions and verify with your own proof."
    explanation = (
        "This problem is imported from Luogu. The statement and examples are preserved; "
        "practice with your own derivation and compare with accepted solutions."
    )
    stats = {
        "total_submit": int(list_item.total_submit),
        "total_accepted": int(list_item.total_accepted),
    }
    origin = {"url": statement_url, "type": list_item.type}
    profile = {
        "schema_version": 1,
        "platform": "luogu",
        "pid": pid,
        "difficulty": difficulty,
        "tags": tag_names,
        "tag_ids": tag_ids,
        "knowledge_points": knowledge_points,
        "answer": answer,
        "explanation": explanation,
        "stats": stats,
        "source": origin,
        "generated_at": now_sec(),
    }

    all_tags = sorted(tag for tag in normalized_tags if tag)[:30]
    clamped = max(1, min(10, difficulty))
    return UpsertRecord(
        slug=f"luogu-{pid.lower()}",
        title=f"{pid} {title}",
        statement_md=statement_md,
        difficulty=clamped,
        source=f"luogu:{pid}",
        statement_url=statement_url,
        llm_profile_json=json.dumps(profile, ensure_ascii=False),
        sample_input=sample_input,
        sample_output=sample_output,
        tags=all_tags,
    )
|
|
|
|
|
|
def build_fallback_record(
    base_url: str,
    list_item: LuoguListItem,
    tag_catalog: dict[int, dict[str, Any]],
    error_text: str,
) -> UpsertRecord:
    """Build a placeholder record from list data alone.

    The statement consists of a notice pointing at the original problem URL.
    The profile JSON gains ``fallback_import`` and ``fallback_reason`` (the
    error text, truncated to 240 characters).
    """
    notice = (
        "题面抓取失败(已自动降级导入)。"
        f"请访问原题链接查看完整题面:{base_url}/problem/{list_item.pid}"
    )
    stub_detail: dict[str, Any] = {
        "title": list_item.title,
        "difficulty": list_item.difficulty,
        "tags": list_item.tags,
        "samples": [],
        "content": {"description": notice},
    }
    record = build_record(base_url, list_item, stub_detail, tag_catalog)

    # Re-open the generated profile to flag it as a fallback import.
    profile = json.loads(record.llm_profile_json)
    profile.update(fallback_import=True, fallback_reason=error_text[:240])
    record.llm_profile_json = json.dumps(profile, ensure_ascii=False)
    return record
|
|
|
|
|
|
def fetch_tag_catalog(
    session: requests.Session,
    base_url: str,
    timeout: int,
    retries: int,
    sleep_sec: float,
) -> dict[int, dict[str, Any]]:
    """Download the tag catalogue and index it by integer tag id.

    Reads ``<base_url>/_lfe/tags/zh-CN`` and keeps every entry of the
    payload's "tags" list that is a dict with an "id" key.
    """
    body = requests_retry_text(
        session,
        f"{base_url}/_lfe/tags/zh-CN",
        timeout=timeout,
        retries=retries,
        sleep_sec=sleep_sec,
    )
    entries = json.loads(body).get("tags") or []
    return {
        int(entry["id"]): entry
        for entry in entries
        if isinstance(entry, dict) and "id" in entry
    }
|
|
|
|
|
|
def fetch_list_page(
    session: requests.Session,
    base_url: str,
    tags_csv: str,
    page: int,
    timeout: int,
    retries: int,
    sleep_sec: float,
) -> tuple[int, int, list[LuoguListItem]]:
    """Fetch one page of the tag-filtered problem list.

    Returns:
        ``(count, per_page, items)`` where ``count`` and ``per_page`` come
        from the page's embedded context (defaulting to 0 and 50) and
        ``items`` holds the rows that are dicts with a non-empty pid.
    """
    page_url = f"{base_url}/problem/list?type=all&tag={quote(tags_csv)}&page={page}"
    html_text = requests_retry_text(
        session, page_url, timeout=timeout, retries=retries, sleep_sec=sleep_sec
    )
    context = extract_context_json(html_text)
    listing = (context.get("data") or {}).get("problems") or {}
    count = int(listing.get("count") or 0)
    per_page = int(listing.get("perPage") or 50)

    items: list[LuoguListItem] = []
    for row in listing.get("result") or []:
        if not isinstance(row, dict):
            continue
        pid = str(row.get("pid") or "").strip()
        if not pid:
            continue
        raw_tags = row.get("tags")
        tag_list = raw_tags if isinstance(raw_tags, list) else []
        item = LuoguListItem(
            pid=pid,
            title=str(row.get("title") or "").strip(),
            difficulty=int(row.get("difficulty") or 1),
            # Non-integer tag entries are dropped.
            tags=[int(tag) for tag in tag_list if isinstance(tag, int)],
            total_submit=int(row.get("totalSubmit") or 0),
            total_accepted=int(row.get("totalAccepted") or 0),
            type=str(row.get("type") or "").strip(),
        )
        items.append(item)
    return count, per_page, items
|
|
|
|
|
|
def fetch_problem_detail(
    base_url: str,
    pid: str,
    timeout: int,
    retries: int,
    sleep_sec: float,
) -> dict[str, Any]:
    """Fetch a problem page and return its ``data.problem`` payload.

    Each call uses its own short-lived session (the function is submitted to
    a thread pool by ``main``), which is closed before returning so pooled
    connections are not leaked once per problem.

    Args:
        base_url: Site root.
        pid: Problem identifier appended to ``<base_url>/problem/``.
        timeout: Timeout passed to the HTTP request.
        retries: Number of attempts.
        sleep_sec: Base delay between attempts, in seconds.

    Returns:
        The problem detail dict; an empty dict when the page context carries
        no problem entry.

    Raises:
        RuntimeError: If the request or context parsing fails, or the
            problem entry is not a dict.
    """
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
        ),
        "Referer": f"{base_url}/problem/list",
    }
    # The context manager closes the session (and its connection pool) even
    # when the request raises.
    with requests.Session() as session:
        session.headers.update(headers)
        html_text = requests_retry_text(
            session,
            f"{base_url}/problem/{pid}",
            timeout=timeout,
            retries=retries,
            sleep_sec=sleep_sec,
        )

    ctx = extract_context_json(html_text)
    detail = (ctx.get("data") or {}).get("problem") or {}
    if not isinstance(detail, dict):
        raise RuntimeError(f"problem detail invalid: {pid}")
    return detail
|
|
|
|
|
|
def _log(message: str) -> None:
    """Print a progress line, falling back to ASCII escapes if stdout cannot encode it."""
    try:
        print(message, flush=True)
    except UnicodeEncodeError:
        print(message.encode("ascii", "backslashreplace").decode("ascii"), flush=True)


def _collect_list_items(
    session: requests.Session,
    base_url: str,
    tags_csv: str,
    *,
    timeout: int,
    retries: int,
    sleep_sec: float,
) -> list[LuoguListItem]:
    """Fetch every list page for the tag filter; return items unique by pid, sorted by pid."""
    fetch_opts = {"timeout": timeout, "retries": retries, "sleep_sec": sleep_sec}
    total_count, per_page, page_items = fetch_list_page(
        session, base_url, tags_csv, page=1, **fetch_opts
    )
    total_pages = max(1, math.ceil(max(1, total_count) / max(1, per_page)))

    by_pid: dict[str, LuoguListItem] = {item.pid: item for item in page_items}
    for page in range(2, total_pages + 1):
        _, _, page_items = fetch_list_page(
            session, base_url, tags_csv, page=page, **fetch_opts
        )
        by_pid.update((item.pid, item) for item in page_items)
    return sorted(by_pid.values(), key=lambda item: item.pid)


def _clear_problems(conn: sqlite3.Connection, *, luogu_only: bool) -> int:
    """Delete problems (all, or only Luogu-sourced) with their tag rows; return the number removed."""
    if luogu_only:
        count_sql = "SELECT COUNT(1) FROM problems WHERE source LIKE 'luogu:%'"
        tags_sql = (
            "DELETE FROM problem_tags WHERE problem_id IN "
            "(SELECT id FROM problems WHERE source LIKE 'luogu:%')"
        )
        problems_sql = "DELETE FROM problems WHERE source LIKE 'luogu:%'"
    else:
        count_sql = "SELECT COUNT(1) FROM problems"
        tags_sql = "DELETE FROM problem_tags WHERE problem_id IN (SELECT id FROM problems)"
        problems_sql = "DELETE FROM problems"

    removed = int(conn.execute(count_sql).fetchone()[0] or 0)
    # problem_tags declares no foreign key, so its rows must be removed
    # explicitly or they are orphaned. Both deletes commit (or roll back)
    # together.
    with conn:
        conn.execute(tags_sql)
        conn.execute(problems_sql)
    return removed


def main() -> int:
    """Command-line entry point: import the tag-filtered Luogu problems into SQLite.

    Steps: parse arguments, download the tag catalogue and the full problem
    list, prepare the database (optionally clearing old rows), register an
    import job, fetch each problem's detail page on a thread pool, store each
    result, and finally print a JSON summary.

    When fetching or building a record fails, a placeholder record built from
    list data is stored instead. When storing fails, the item is marked
    failed.

    Returns:
        Process exit code (always 0; unrecoverable errors propagate as
        exceptions).
    """
    parser = argparse.ArgumentParser(description="Import Luogu CSP-J/S problem set")
    parser.add_argument("--db-path", required=True, help="SQLite db path")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument(
        "--tag-ids",
        default=",".join(str(x) for x in DEFAULT_TAG_IDS),
        help="Comma separated Luogu tag IDs",
    )
    parser.add_argument("--workers", type=int, default=3)
    parser.add_argument("--max-problems", type=int, default=0)
    parser.add_argument("--timeout", type=int, default=25)
    parser.add_argument("--retries", type=int, default=5)
    parser.add_argument("--retry-sleep-sec", type=float, default=1.2)
    parser.add_argument("--clear-existing", action="store_true")
    parser.add_argument("--clear-all-problems", action="store_true")
    parser.add_argument("--job-trigger", default="manual")
    # NOTE(review): the next three options are parsed but never read in this
    # function; they are kept so existing invocations keep working.
    parser.add_argument("--clear-existing-source-prefix", default="")
    parser.add_argument("--skip-llm", action="store_true")
    parser.add_argument("--llm-limit", type=int, default=0)
    args = parser.parse_args()

    tag_ids = parse_tag_ids(args.tag_ids)
    tags_csv = ",".join(str(x) for x in tag_ids)
    workers = max(1, args.workers)
    fetch_opts = {
        "timeout": args.timeout,
        "retries": args.retries,
        "sleep_sec": args.retry_sleep_sec,
    }

    # The shared session is only needed for the catalogue and list pages;
    # detail pages open their own sessions inside the worker threads.
    with requests.Session() as session:
        session.headers.update(
            {
                "User-Agent": (
                    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
                    "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
                ),
                "Referer": f"{args.base_url}/problem/list",
            }
        )
        tag_catalog = fetch_tag_catalog(session, args.base_url, **fetch_opts)
        selected = _collect_list_items(session, args.base_url, tags_csv, **fetch_opts)

    if args.max_problems > 0:
        selected = selected[: args.max_problems]
    total = len(selected)

    inserted = 0
    updated = 0
    failed = 0
    fallback_used = 0
    cleared_count = 0
    last_error = ""

    conn = sqlite3.connect(args.db_path)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA foreign_keys=ON")
        conn.execute("PRAGMA busy_timeout=5000")
        ensure_core_tables(conn)
        ensure_problem_columns(conn)
        ensure_import_tables(conn)

        if args.clear_all_problems:
            cleared_count = _clear_problems(conn, luogu_only=False)
        elif args.clear_existing:
            cleared_count = _clear_problems(conn, luogu_only=True)

        options_json = json.dumps(
            {
                "source": "luogu",
                "tag_ids": tag_ids,
                "workers": workers,
                "max_problems": args.max_problems,
                "clear_existing": bool(args.clear_existing),
                "clear_all_problems": bool(args.clear_all_problems),
            },
            ensure_ascii=False,
        )
        job_id = create_import_job(conn, args.job_trigger, total, options_json)
        seed_import_items(conn, job_id, selected)

        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = {
                executor.submit(
                    fetch_problem_detail,
                    args.base_url,
                    item.pid,
                    args.timeout,
                    args.retries,
                    args.retry_sleep_sec,
                ): item
                for item in selected
            }
            # All database writes happen here on the main thread; workers
            # only perform HTTP fetches.
            for done_count, future in enumerate(as_completed(futures), start=1):
                item = futures[future]

                # Phase 1: obtain a record. Only fetch/build problems trigger
                # the fallback, so a later storage or logging error can no
                # longer replace a good statement with the placeholder.
                record = None
                fetch_error: Exception | None = None
                try:
                    record = build_record(
                        args.base_url, item, future.result(), tag_catalog
                    )
                except Exception as exc:  # per-item boundary: degrade, don't abort
                    fetch_error = exc

                # Phase 2: store it and record the outcome.
                try:
                    note = ""
                    if fetch_error is not None:
                        record = build_fallback_record(
                            args.base_url, item, tag_catalog, str(fetch_error)
                        )
                        note = f"fallback: {str(fetch_error)[:300]}"
                    problem_id, is_insert = upsert_problem(conn, record)
                    update_import_item_success(
                        conn,
                        job_id,
                        item.pid,
                        record.title,
                        record.difficulty,
                        problem_id,
                        note=note,
                    )
                except Exception as store_exc:  # per-item boundary
                    failed += 1
                    last_error = str(store_exc)
                    if fetch_error is None:
                        reason = str(store_exc)
                    else:
                        reason = f"{fetch_error}; fallback failed: {store_exc}"
                    update_import_item_failed(conn, job_id, item.pid, reason)
                    _log(f"[skip] {item.pid}: {reason}")
                else:
                    if is_insert:
                        inserted += 1
                    else:
                        updated += 1
                    if fetch_error is None:
                        _log(
                            f"[{done_count}/{total}] {item.pid} -> {record.title} "
                            f"(difficulty={record.difficulty})"
                        )
                    else:
                        fallback_used += 1
                        _log(f"[fallback] {item.pid}: {fetch_error}")

                update_import_job_progress(
                    conn,
                    job_id,
                    done_count,
                    inserted + updated,
                    failed,
                    last_error,
                )

        finish_import_job(conn, job_id, inserted + updated, failed, last_error)
    finally:
        # Release the database handle on error paths as well.
        conn.close()

    print(
        json.dumps(
            {
                "db_path": args.db_path,
                "tags": tag_ids,
                "selected_count": total,
                "inserted": inserted,
                "updated": updated,
                "failed": failed,
                "fallback_used": fallback_used,
                "cleared_count": cleared_count,
                "job_id": job_id,
            },
            ensure_ascii=False,
            indent=2,
        )
    )
    return 0
|
|
|
|
|
|
# Run the importer when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    exit_code = main()
    raise SystemExit(exit_code)
|