From df455d7fb554c693cc4ff853ff46b7b8a75fd748 Mon Sep 17 00:00:00 2001 From: hao Date: Wed, 18 Mar 2026 16:13:21 -0700 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0:=201=20=E4=B8=AA=E6=96=87?= =?UTF-8?q?=E4=BB=B6=20-=202026-03-18=2016:13:21?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/intel/sources/runner.py | 129 ++++++++++++++++++++++---------- 1 file changed, 89 insertions(+), 40 deletions(-) diff --git a/scripts/intel/sources/runner.py b/scripts/intel/sources/runner.py index 70b07050..9489a7dd 100644 --- a/scripts/intel/sources/runner.py +++ b/scripts/intel/sources/runner.py @@ -28,6 +28,9 @@ HANDLERS = { "vendor-index": vendor_index.fetch, } +DEFAULT_MAX_WORKERS = 20 +MAX_WORKER_CAP = 32 + def _failure_category(exc: Exception) -> str: if isinstance(exc, requests.exceptions.SSLError): @@ -74,6 +77,68 @@ def build_failure(system: Dict[str, Any], source: Dict[str, Any], exc: Exception } +def _collect_jobs( + source_map: Dict[str, Any], + *, + tier: Optional[str] = None, +) -> List[Tuple[Dict[str, Any], Dict[str, Any]]]: + jobs: List[Tuple[Dict[str, Any], Dict[str, Any]]] = [] + for system in source_map["systems"]: + if tier and system.get("tier") != tier: + continue + for _system, _bucket_name, source in iter_all_sources({"systems": [system]}, include_retired=False): + jobs.append((system, source)) + return jobs + + +def _max_workers(job_count: int) -> int: + if job_count <= 0: + return 4 + configured = os.environ.get("WEBSAFE_INTEL_MAX_WORKERS") + if configured: + try: + value = int(configured) + except ValueError: + value = DEFAULT_MAX_WORKERS + else: + value = DEFAULT_MAX_WORKERS + value = max(4, min(MAX_WORKER_CAP, value)) + return min(value, job_count) + + +def _collect_source_candidates( + system: Dict[str, Any], + source: Dict[str, Any], + *, + since_dt: Optional[datetime], + include_undated: bool, +) -> Tuple[List[Candidate], Optional[Dict[str, Any]]]: + handler = HANDLERS.get(source["kind"]) + if handler is None: + return ( + [], + { + "system_id": system["system_id"], + "display_name": system["display_name"], + "source_name": source["name"], + "source_kind": source["kind"], + "source_bucket": source.get("bucket_name"), + "category": "schema", + "exception": "UnsupportedSourceKind", + "message": f"Unsupported source kind {source['kind']}", + "status_code": None, + "url": source.get("url") or "", + "summary": f"{system['system_id']}::{source['name']}::schema::Unsupported source kind {source['kind']}", + }, + ) + try: + items = handler(system, source) + filtered = [item for item in items if _passes_since(item, since_dt, include_undated)] + return filtered, None + except Exception as exc: + return [], build_failure(system, source, exc) + + def probe_source(system: Dict[str, Any], source: Dict[str, Any]) -> Dict[str, Any]: kind = source["kind"] if kind == "ghsa-global": @@ -187,36 +252,26 @@ def collect_candidates( ) -> Tuple[List[Candidate], List[Dict[str, Any]]]: all_candidates: List[Candidate] = [] failures: List[Dict[str, Any]] = [] - - for system in source_map["systems"]: - if tier and system.get("tier") != tier: - continue - for _system, _bucket_name, source in iter_all_sources({"systems": [system]}, include_retired=False): - handler = HANDLERS.get(source["kind"]) - if handler is None: - failures.append( - { - "system_id": system["system_id"], - "display_name": system["display_name"], - "source_name": source["name"], - "source_kind": source["kind"], - "source_bucket": source.get("bucket_name"), - "category": "schema", - "exception": "UnsupportedSourceKind", - "message": f"Unsupported source kind {source['kind']}", - "status_code": None, - "url": source.get("url") or "", - "summary": f"{system['system_id']}::{source['name']}::schema::Unsupported source kind {source['kind']}", - } - ) - continue - try: - items = handler(system, source) - for item in items: - if _passes_since(item, since_dt, include_undated): - all_candidates.append(item) - except Exception as exc: - failures.append(build_failure(system, source, exc)) + jobs = _collect_jobs(source_map, tier=tier) + with ThreadPoolExecutor(max_workers=_max_workers(len(jobs))) as executor: + future_map = { + executor.submit( + _collect_source_candidates, + system, + source, + since_dt=since_dt, + include_undated=include_undated, + ): (system, source) + for system, source in jobs + } + for future in as_completed(future_map): + items, failure = future.result() + if items: + all_candidates.extend(items) + if failure: + failures.append(failure) + all_candidates.sort(key=lambda item: (item.system_id, item.published_at or "", item.title, item.source_name)) + failures.sort(key=lambda item: (item.get("system_id", ""), item.get("source_name", ""), item.get("category", ""))) return all_candidates, failures @@ -224,18 +279,10 @@ def probe_sources( source_map: Dict[str, Any], tier: Optional[str] = None, ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: - jobs: List[Tuple[Dict[str, Any], Dict[str, Any]]] = [] + jobs = _collect_jobs(source_map, tier=tier) probes: List[Dict[str, Any]] = [] failures: List[Dict[str, Any]] = [] - - for system in source_map["systems"]: - if tier and system.get("tier") != tier: - continue - for _system, _bucket_name, source in iter_all_sources({"systems": [system]}, include_retired=False): - jobs.append((system, source)) - - max_workers = min(16, max(4, len(jobs) or 1)) - with ThreadPoolExecutor(max_workers=max_workers) as executor: + with ThreadPoolExecutor(max_workers=_max_workers(len(jobs))) as executor: future_map = {executor.submit(probe_source, system, source): (system, source) for system, source in jobs} for future in as_completed(future_map): system, source = future_map[future] @@ -251,6 +298,8 @@ def probe_sources( ) except Exception as exc: failures.append(build_failure(system, source, exc)) + probes.sort(key=lambda item: (item["system_id"], item["source_name"])) + failures.sort(key=lambda item: (item.get("system_id", ""), item.get("source_name", ""), item.get("category", ""))) return probes, failures