Skip to content

Public API

The main entry points for Silkweb. Functions and types below are available from import silkweb (see __all__ in the package for the canonical list).

Fetching

fetch

async_fetch async

async_fetch(url: str, *args, **kwargs)

Async variant of fetch.

Source code in silkweb/__init__.py
async def async_fetch(url: str, *args, **kwargs):
    """Async variant of `fetch`."""
    return await _async_fetch(url, *args, **kwargs)

Extraction

ask

ask(url: str, prompt: str, *, explain: bool = False, **fetch_kwargs: Any)

Sync wrapper around async_ask.

Source code in silkweb/__init__.py
def ask(
    url: str,
    prompt: str,
    *,
    explain: bool = False,
    **fetch_kwargs: Any,
):
    """Sync wrapper around `async_ask`."""
    return _run_sync(async_ask(url, prompt, explain=explain, **fetch_kwargs))

async_ask async

async_ask(url: str, prompt: str, *, output: str = 'auto', dataframe_engine: str = 'auto', explain: bool = False, **fetch_kwargs: Any)

Ask a natural-language question of a URL.

Pipeline: - fetch (auto tier) - hydration-first (optional: use hydration JSON as cleaned content) - otherwise clean → synthesize schema → extract → compile selectors → cache - output selection: - output="python": list[dict] - output="df": DataFrame (pandas/polars) if available - output="auto": backward-compatible auto-conversion when caller already imported pandas/polars

Source code in silkweb/__init__.py
async def async_ask(
    url: str,
    prompt: str,
    *,
    output: str = "auto",
    dataframe_engine: str = "auto",
    explain: bool = False,
    **fetch_kwargs: Any,
):
    """
    Ask a natural-language question of a URL.

    Pipeline:
    - fetch (auto tier)
    - hydration-first (optional: use hydration JSON as cleaned content)
    - otherwise clean → synthesize schema → extract → compile selectors → cache
    - output selection:
      - output="python": list[dict]
      - output="df": DataFrame (pandas/polars) if available
      - output="auto": backward-compatible auto-conversion when caller already imported pandas/polars
    """
    fetch_kwargs = dict(fetch_kwargs)

    cfg = get_config()

    cleaner_model = cast(str, fetch_kwargs.pop("cleaner_model", cfg.cleaner_model))
    schema_model = cast(str, fetch_kwargs.pop("schema_model", cfg.schema_model))
    extraction_model = cast(str, fetch_kwargs.pop("extraction_model", cfg.extraction_model))
    selector_model = cast(str, fetch_kwargs.pop("selector_model", cfg.selector_model))
    force_llm = bool(fetch_kwargs.pop("force_llm", cfg.force_llm))
    hydration_first = bool(fetch_kwargs.pop("hydration_first", cfg.hydration_first))
    hydration_subset = bool(fetch_kwargs.pop("hydration_subset", cfg.hydration_subset))
    hydration_max_chars = int(fetch_kwargs.pop("hydration_max_chars", cfg.hydration_max_chars))

    import time as _t

    report: ExtractionReport | None = ExtractionReport() if explain else None
    _wall0 = _t.time()

    _t0 = _t.time()
    page = await _async_fetch(url, tier="auto", **fetch_kwargs)
    _t_fetch = _t.time() - _t0
    log_event(
        "ask_fetch_done",
        url=url,
        tier=getattr(page, "fetch_tier", None),
        duration_ms=int(_t_fetch * 1000),
        html_chars=len(page.html or ""),
    )

    if report is not None:
        from .explain import tier_name_for_page

        report.tier_used = int(getattr(page, "fetch_tier", 0) or 0)
        report.tier_name = tier_name_for_page(report.tier_used, page)

    selector_cache = CacheManager.from_config().selectors

    cleaner_provider = create_provider(cleaner_model)
    schema_provider = create_provider(schema_model)
    extraction_provider = create_provider(extraction_model)
    selector_provider = create_provider(selector_model)
    healer = SelfHealer(max_attempts=max(1, int(cfg.max_retries)))

    hydration = page.hydration_data() if hydration_first else None
    if isinstance(hydration, dict) and hydration_subset:
        hydration_any: Any = _best_effort_hydration_subset(hydration)
    else:
        hydration_any = hydration

    hydration_payload = None
    if hydration_any is not None:
        with contextlib.suppress(Exception):
            hydration_payload = json.dumps(hydration_any, ensure_ascii=False)

    if (
        hydration_any is not None
        and hydration_payload is not None
        and 0 < hydration_max_chars < len(hydration_payload)
    ):
        log_event(
            "ask_hydration_skipped",
            url=url,
            tier=getattr(page, "fetch_tier", None),
            reason="too_large",
            hydration_chars=len(hydration_payload),
            max_chars=hydration_max_chars,
        )
        hydration_any = None

    if hydration_any is not None:
        cleaned = _cleaned_from_hydration(
            hydration_any, heading=str(page.metadata.get("title") or "")
        )
        log_event(
            "ask_clean_done",
            url=url,
            tier=getattr(page, "fetch_tier", None),
            method="hydration",
            hydration_chars=len(cleaned.markdown),
        )
    else:
        _t1 = _t.time()
        cleaned = await clean_html(page.html, provider=cleaner_provider, strategy="auto")
        log_event(
            "ask_clean_done",
            url=url,
            tier=getattr(page, "fetch_tier", None),
            method="clean_html",
            duration_ms=int((_t.time() - _t1) * 1000),
            token_estimate=getattr(cleaned, "token_estimate", None),
        )

    if report is not None:
        if hydration_any is not None:
            report.hydration_source = page.hydration_source()
        else:
            report.hydration_source = None

    _t2 = _t.time()
    log_event("ask_schema_start", url=url, tier=getattr(page, "fetch_tier", None))
    schema = await synthesize_schema(cleaned, prompt=prompt, provider=schema_provider)
    log_event(
        "ask_schema_done",
        url=url,
        tier=getattr(page, "fetch_tier", None),
        duration_ms=int((_t.time() - _t2) * 1000),
        fields=list(getattr(schema, "model_fields", {}).keys()),
    )

    if report is not None:
        report.note_llm(schema_provider)
        report.schema_inferred = pydantic_schema_line(schema)

    _t3 = _t.time()
    log_event("ask_extract_start", url=url, tier=getattr(page, "fetch_tier", None))
    items = await _extract_url(
        url=url,
        html=page.html,
        schema=schema,
        prompt=prompt,
        cleaner_provider=cleaner_provider,
        extraction_provider=extraction_provider,
        selector_provider=selector_provider,
        selector_cache=selector_cache,
        healer=healer,
        force_llm=force_llm,
        report=report,
    )
    log_event(
        "ask_extract_done",
        url=url,
        tier=getattr(page, "fetch_tier", None),
        duration_ms=int((_t.time() - _t3) * 1000),
        items=len(items),
    )

    if report is not None:
        report.records_extracted = len(items)
        report.total_duration_ms = (_t.time() - _wall0) * 1000.0
        _render_extraction_report(report)

    # Scalar convenience: single item + single field
    if len(items) == 1 and len(schema.model_fields) == 1:
        only_key = next(iter(schema.model_fields.keys()))
        return items[0].get(only_key)

    out_fmt = str(output or "auto").lower()
    if out_fmt in {"df", "dataframe"}:
        from .output.dataframe import to_dataframe

        df = to_dataframe(items, engine=cast(Any, dataframe_engine))
        return df if df is not None else items

    if out_fmt in {"python", "list", "dict"}:
        return items

    # Backward-compatible "auto": only auto-convert if caller already imported pandas/polars.
    df = _maybe_to_dataframe(items)
    return df if df is not None else items

extract

extract(url: str, schema, prompt: str, *, explain: bool = False, **kwargs: Any)

Sync wrapper around async_extract.

Source code in silkweb/__init__.py
def extract(url: str, schema, prompt: str, *, explain: bool = False, **kwargs: Any):
    """Sync wrapper around `async_extract`."""
    return _run_sync(async_extract(url, schema, prompt, explain=explain, **kwargs))

async_extract async

async_extract(url: str, schema, prompt: str, *, output: str = 'python', dataframe_engine: str = 'auto', explain: bool = False, **kwargs: Any)

Extract typed data from a URL using a provided Pydantic schema.

  • selector cache fast-path
  • self-heal on validation failure
  • output controls the return shape:
  • "python" / "list" / "dict": list[BaseModel] with __silk_meta__ when present
  • "df" / "dataframe": pandas or polars DataFrame (see dataframe_engine), else falls back to list
  • "auto": same as historical behavior — DataFrame only if auto_detect_dataframe and pandas/polars already imported
Source code in silkweb/__init__.py
async def async_extract(
    url: str,
    schema,
    prompt: str,
    *,
    output: str = "python",
    dataframe_engine: str = "auto",
    explain: bool = False,
    **kwargs: Any,
):
    """
    Extract typed data from a URL using a provided Pydantic schema.

    - selector cache fast-path
    - self-heal on validation failure
    - ``output`` controls the return shape:
      - ``"python"`` / ``"list"`` / ``"dict"``: ``list[BaseModel]`` with ``__silk_meta__`` when present
      - ``"df"`` / ``"dataframe"``: pandas or polars DataFrame (see ``dataframe_engine``), else falls back to list
      - ``"auto"``: same as historical behavior — DataFrame only if ``auto_detect_dataframe`` and pandas/polars already imported
    """
    from pydantic import BaseModel

    if not isinstance(schema, type) or not issubclass(schema, BaseModel):
        raise TypeError("schema must be a Pydantic BaseModel type")
    _normalize_extract_output(output)

    cfg = get_config()
    cleaner_model = cast(str, kwargs.pop("cleaner_model", cfg.cleaner_model))
    extraction_model = cast(str, kwargs.pop("extraction_model", cfg.extraction_model))
    selector_model = cast(str, kwargs.pop("selector_model", cfg.selector_model))
    force_llm = bool(kwargs.pop("force_llm", cfg.force_llm))

    import time as _t

    report: ExtractionReport | None = ExtractionReport() if explain else None
    _wall0 = _t.time()

    _t0 = _t.time()
    page = await _async_fetch(url, tier="auto", **kwargs)
    _t_fetch = _t.time() - _t0
    log_event(
        "extract_fetch_done",
        url=url,
        tier=getattr(page, "fetch_tier", None),
        duration_ms=int(_t_fetch * 1000),
        html_chars=len(page.html or ""),
    )

    if report is not None:
        from .explain import tier_name_for_page

        report.tier_used = int(getattr(page, "fetch_tier", 0) or 0)
        report.tier_name = tier_name_for_page(report.tier_used, page)
        report.hydration_source = None
        report.schema_inferred = pydantic_schema_line(schema)

    selector_cache = CacheManager.from_config().selectors

    healer = SelfHealer(max_attempts=max(1, int(cfg.max_retries)))
    _t1 = _t.time()
    log_event("extract_llm_start", url=url, tier=getattr(page, "fetch_tier", None))
    items = await _extract_url(
        url=url,
        html=page.html,
        schema=schema,
        prompt=prompt,
        cleaner_provider=create_provider(cleaner_model),
        extraction_provider=create_provider(extraction_model),
        selector_provider=create_provider(selector_model),
        selector_cache=selector_cache,
        healer=healer,
        force_llm=force_llm,
        report=report,
    )
    log_event(
        "extract_llm_done",
        url=url,
        tier=getattr(page, "fetch_tier", None),
        duration_ms=int((_t.time() - _t1) * 1000),
        items=len(items),
    )

    if report is not None:
        report.records_extracted = len(items)
        report.total_duration_ms = (_t.time() - _wall0) * 1000.0
        _render_extraction_report(report)

    return _finalize_extract_output(
        items,
        schema,
        output=output,
        dataframe_engine=dataframe_engine,
    )

async_extract_from_html async

async_extract_from_html(url: str, html: str, *, schema, prompt: str, output: str = 'python', dataframe_engine: str = 'auto', **kwargs: Any)

Same extraction contract as async_extract, but uses pre-fetched HTML (no network fetch).

Returns list[BaseModel] by default, or a DataFrame when output="df" / "dataframe", or auto-converts like async_extract when output="auto".

Source code in silkweb/__init__.py
async def async_extract_from_html(
    url: str,
    html: str,
    *,
    schema,
    prompt: str,
    output: str = "python",
    dataframe_engine: str = "auto",
    **kwargs: Any,
):
    """
    Same extraction contract as `async_extract`, but uses pre-fetched HTML (no network fetch).

    Returns `list[BaseModel]` by default, or a DataFrame when ``output="df"`` / ``"dataframe"``,
    or auto-converts like `async_extract` when ``output="auto"``.
    """
    from pydantic import BaseModel

    if not isinstance(schema, type) or not issubclass(schema, BaseModel):
        raise TypeError("schema must be a Pydantic BaseModel type")
    _normalize_extract_output(output)

    cfg = get_config()
    cleaner_model = cast(str, kwargs.pop("cleaner_model", cfg.cleaner_model))
    extraction_model = cast(str, kwargs.pop("extraction_model", cfg.extraction_model))
    selector_model = cast(str, kwargs.pop("selector_model", cfg.selector_model))
    force_llm = bool(kwargs.pop("force_llm", cfg.force_llm))

    selector_cache = CacheManager.from_config().selectors
    healer = SelfHealer(max_attempts=max(1, int(cfg.max_retries)))
    items = await _extract_url(
        url=url,
        html=html,
        schema=schema,
        prompt=prompt,
        cleaner_provider=create_provider(cleaner_model),
        extraction_provider=create_provider(extraction_model),
        selector_provider=create_provider(selector_model),
        selector_cache=selector_cache,
        healer=healer,
        force_llm=force_llm,
    )
    return _finalize_extract_output(
        items,
        schema,
        output=output,
        dataframe_engine=dataframe_engine,
    )

SilkQL

query

query(*args, **kwargs)

Compile and run a SilkQL query (sync). Arguments and return type match :func:async_query.

Source code in silkweb/__init__.py
def query(*args, **kwargs):
    """Compile and run a SilkQL query (sync). Arguments and return type match :func:`async_query`."""
    return _run_sync(async_query(*args, **kwargs))

async_query async

async_query(url: str, silkql_string: str, *, provider=None, cache: SelectorCache | None = None, follow_pagination: bool = False, max_pages: int = 20, **fetch_kwargs: Any) -> QueryResult

Compile and run a SilkQL query against url.

Fetches the page (tier "auto" by default; pass tier= like :func:fetch), extracts with the compiled schema, caches CSS/XPath selectors per domain, and returns a :class:QueryResult whose data is a one-element list containing the merged root model (list collections are merged across pages when follow_pagination is true).

  • provider: extraction LLM; defaults to configure(extraction_model=...).
  • cleaner_model / selector_model: optional model strings (popped from **fetch_kwargs), defaulting to config — same split as :func:extract.
  • cache: selector cache instance; defaults to CacheManager.from_config().selectors.
  • follow_pagination: when the SilkQL AST includes pagination { next_page_url }, follow relative/absolute next links up to max_pages.
  • force_llm: skip selector cache (popped from fetch_kwargs, default configure(force_llm=...)).
  • cached on the result is true if any scraped page used a selector-cache hit.
Source code in silkweb/__init__.py
async def async_query(
    url: str,
    silkql_string: str,
    *,
    provider=None,
    cache: SelectorCache | None = None,
    follow_pagination: bool = False,
    max_pages: int = 20,
    **fetch_kwargs: Any,
) -> QueryResult:
    """
    Compile and run a SilkQL query against ``url``.

    Fetches the page (tier ``"auto"`` by default; pass ``tier=`` like :func:`fetch`),
    extracts with the compiled schema, caches CSS/XPath selectors per domain, and returns
    a :class:`QueryResult` whose ``data`` is a one-element list containing the merged root
    model (list collections are merged across pages when ``follow_pagination`` is true).

    - ``provider``: extraction LLM; defaults to ``configure(extraction_model=...)``.
    - ``cleaner_model`` / ``selector_model``: optional model strings (popped from ``**fetch_kwargs``),
      defaulting to config — same split as :func:`extract`.
    - ``cache``: selector cache instance; defaults to ``CacheManager.from_config().selectors``.
    - ``follow_pagination``: when the SilkQL AST includes ``pagination { next_page_url }``, follow
      relative/absolute next links up to ``max_pages``.
    - ``force_llm``: skip selector cache (popped from ``fetch_kwargs``, default ``configure(force_llm=...)``).
    - ``cached`` on the result is true if **any** scraped page used a selector-cache hit.
    """
    cfg = get_config()
    prov = provider or create_provider(cfg.extraction_model)
    selector_cache = cache or CacheManager.from_config().selectors
    force_llm = bool(fetch_kwargs.pop("force_llm", cfg.force_llm))
    cleaner_model = cast(str, fetch_kwargs.pop("cleaner_model", cfg.cleaner_model))
    selector_model = cast(str, fetch_kwargs.pop("selector_model", cfg.selector_model))
    return await _execute_query(
        url,
        silkql_string,
        provider=prov,
        cache=selector_cache,
        cleaner_provider=create_provider(cleaner_model),
        selector_provider=create_provider(selector_model),
        follow_pagination=follow_pagination,
        max_pages=max_pages,
        force_llm=force_llm,
        **fetch_kwargs,
    )

QueryResult dataclass

QueryResult(data: list[BaseModel], pages_scraped: int, cached: bool)

Crawling

crawl

AsyncCrawler dataclass

AsyncCrawler(start_url: str, allowed_domains: set[str] | None = None, url_pattern: str | None = None, max_pages: int = 100, max_depth: int = 2, concurrency: int = 10, per_domain_concurrency: int = 2, max_pending_urls: int = 5000, schema: type[BaseModel] | None = None, prompt: str | None = None, dedup: SeenSet = SeenSet(), on_page: OnPage = None, on_item: OnItem = None, on_error: OnError = None, fetch_func: Callable[..., Awaitable[SilkPage]] | None = None, extract_func: Callable[..., Awaitable[list[BaseModel]]] | None = None, _pattern_re: Pattern[str] | None = None, _domain_sems: dict[str, Semaphore] = dict(), _pages_lock: Lock = asyncio.Lock(), _pages_fetched: int = 0)

run async

run(**fetch_kwargs: Any) -> AsyncGenerator[BaseModel, None]

Crawl starting at start_url, yielding extracted items.

Requires schema and prompt both set or both omitted; mismatched configuration raises ValueError.

Source code in silkweb/crawl/crawler.py
async def run(self, **fetch_kwargs: Any) -> AsyncGenerator[BaseModel, None]:
    """
    Crawl starting at `start_url`, yielding extracted items.

    Requires ``schema`` and ``prompt`` both set or both omitted; mismatched
    configuration raises ``ValueError``.
    """
    if (self.schema is None) ^ (self.prompt is None):
        raise ValueError("AsyncCrawler requires both schema and prompt, or neither")

    fetch = self.fetch_func or self._default_fetch
    extract = self.extract_func or (lambda **kw: self._default_extract(page=kw["page"]))

    q: asyncio.Queue[tuple[str, int] | None] = asyncio.Queue()
    start = _normalize_seed_url(self.start_url)
    if self._allowed(start) and self.dedup.add(start):
        q.put_nowait((start, 0))

    out_q: asyncio.Queue[BaseModel | None] = asyncio.Queue()

    async def worker() -> None:
        while True:
            item = await q.get()
            if item is None:
                q.task_done()
                break
            url, depth = item

            async with self._pages_lock:
                if self._pages_fetched >= int(self.max_pages):
                    q.task_done()
                    continue
                self._pages_fetched += 1

            dom = _domain(url)
            async with self._global_sem, self._domain_sem(dom):
                try:
                    page = await fetch(url, **fetch_kwargs)
                    if self.on_page:
                        await self.on_page(page)

                    # Discover links
                    if depth < int(self.max_depth):
                        cap = max(1, int(self.max_pending_urls))
                        for href in page.links(external=False):
                            nxt = _normalize_link(page.url or url, href)
                            if not nxt or not self._allowed(nxt):
                                continue
                            async with self._queue_cap_lock:
                                if q.qsize() >= cap:
                                    continue
                                if self.dedup.add(nxt):
                                    q.put_nowait((nxt, depth + 1))

                    # Extract items
                    if self.schema and self.prompt:
                        models = await extract(page=page)
                        for m in models:
                            if self.on_item:
                                await self.on_item(m)
                            await out_q.put(m)
                except Exception as e:
                    if self.on_error:
                        await self.on_error(url, e)
                finally:
                    q.task_done()

    n_workers = max(1, int(self.concurrency))
    tasks = [asyncio.create_task(worker()) for _ in range(n_workers)]

    async def closer() -> None:
        await q.join()
        for _ in tasks:
            q.put_nowait(None)
        await asyncio.gather(*tasks, return_exceptions=True)
        await out_q.put(None)

    closer_task = asyncio.create_task(closer())

    while True:
        it = await out_q.get()
        if it is None:
            break
        yield it

    # Ensure closer completes (and avoid task warnings)
    await closer_task

SeenSet dataclass

SeenSet(backend: DedupBackend = 'sqlite', sqlite_path: str | None = None, _mem: set[str] | None = None, _con: Connection | None = None)

URL deduplication set.

Backends: - sqlite: persistent set backed by a single table (single persistent connection). - memory: in-process set.

add

add(url: str) -> bool

Add url to seen-set. Returns True if it was newly added, False if already present.

Source code in silkweb/crawl/dedup.py
def add(self, url: str) -> bool:
    """
    Add url to seen-set.
    Returns True if it was newly added, False if already present.
    """
    u = (url or "").strip()
    if not u:
        return False
    if self.backend == "memory":
        assert self._mem is not None
        if u in self._mem:
            return False
        self._mem.add(u)
        return True

    con = self._get_con()
    cur = con.execute("INSERT OR IGNORE INTO seen_urls(url) VALUES(?)", (u,))
    con.commit()
    return cur.rowcount == 1

async_crawl async

async_crawl(start_url: str, *, allowed_domains: set[str] | None = None, url_pattern: str | None = None, max_pages: int = 100, max_depth: int = 2, concurrency: int = 10, per_domain_concurrency: int = 2, max_pending_urls: int = 5000, schema=None, prompt: str | None = None, on_page=None, on_item=None, on_error=None, **fetch_kwargs: Any)

Breadth-first crawl from start_url with URL dedup, global and per-domain concurrency, and optional structured extraction on each page.

  • schema / prompt: both required together for extraction; if both omitted, only on_page / link discovery run and the returned list is empty.
  • max_pages: hard cap on fetched pages.
  • max_depth: link-following depth from the start URL (0 = start page only).
  • max_pending_urls: best-effort cap on the crawl work-queue size to limit memory.
  • on_page, on_item, on_error: optional async callbacks (page after fetch, each extracted model, errors per URL).
  • Remaining keyword arguments are passed to the fetcher (same as :func:fetch).
Source code in silkweb/__init__.py
async def async_crawl(
    start_url: str,
    *,
    allowed_domains: set[str] | None = None,
    url_pattern: str | None = None,
    max_pages: int = 100,
    max_depth: int = 2,
    concurrency: int = 10,
    per_domain_concurrency: int = 2,
    max_pending_urls: int = 5000,
    schema=None,
    prompt: str | None = None,
    on_page=None,
    on_item=None,
    on_error=None,
    **fetch_kwargs: Any,
):
    """
    Breadth-first crawl from ``start_url`` with URL dedup, global and per-domain concurrency,
    and optional structured extraction on each page.

    - ``schema`` / ``prompt``: both required together for extraction; if both omitted, only
      ``on_page`` / link discovery run and the returned list is empty.
    - ``max_pages``: hard cap on fetched pages.
    - ``max_depth``: link-following depth from the start URL (0 = start page only).
    - ``max_pending_urls``: best-effort cap on the crawl work-queue size to limit memory.
    - ``on_page``, ``on_item``, ``on_error``: optional async callbacks (page after fetch, each
      extracted model, errors per URL).
    - Remaining keyword arguments are passed to the fetcher (same as :func:`fetch`).
    """
    from pydantic import BaseModel

    if (schema is not None) ^ (prompt is not None):
        raise ValueError("async_crawl requires both schema and prompt together, or neither")

    if schema is not None and (not isinstance(schema, type) or not issubclass(schema, BaseModel)):
        raise TypeError("schema must be a Pydantic BaseModel type")

    crawler = AsyncCrawler(
        start_url=start_url,
        allowed_domains=allowed_domains,
        url_pattern=url_pattern,
        max_pages=max_pages,
        max_depth=max_depth,
        concurrency=concurrency,
        per_domain_concurrency=per_domain_concurrency,
        max_pending_urls=max_pending_urls,
        schema=schema,
        prompt=prompt,
        on_page=on_page,
        on_item=on_item,
        on_error=on_error,
    )
    out: list[BaseModel] = []
    async for item in crawler.run(**fetch_kwargs):
        out.append(item)
    return out

crawl_sitemap

crawl_sitemap(*args, **kwargs)

Sync wrapper around :func:async_crawl_sitemap.

Source code in silkweb/__init__.py
def crawl_sitemap(*args, **kwargs):
    """Sync wrapper around :func:`async_crawl_sitemap`."""
    return _run_sync(async_crawl_sitemap(*args, **kwargs))

async_crawl_sitemap async

async_crawl_sitemap(sitemap_url: str, *, schema=None, prompt: str | None = None, max_pages: int = 100, max_sitemap_files: int = 20, concurrency: int = 10, per_domain_concurrency: int = 2, **fetch_kwargs: Any)

Fetch a sitemap (urlset or sitemapindex), collect page <loc> URLs via XML parsing, then run :func:async_crawl on each (max_depth=0, max_pages=1 per URL).

allowed_domains for each crawl defaults to the sitemap URL host. Pass max_sitemap_files to cap nested sitemap documents when the root is an index.

Source code in silkweb/__init__.py
async def async_crawl_sitemap(
    sitemap_url: str,
    *,
    schema=None,
    prompt: str | None = None,
    max_pages: int = 100,
    max_sitemap_files: int = 20,
    concurrency: int = 10,
    per_domain_concurrency: int = 2,
    **fetch_kwargs: Any,
):
    """
    Fetch a sitemap (``urlset`` or ``sitemapindex``), collect page ``<loc>`` URLs via XML
    parsing, then run :func:`async_crawl` on each (``max_depth=0``, ``max_pages=1`` per URL).

    ``allowed_domains`` for each crawl defaults to the sitemap URL host. Pass ``max_sitemap_files``
    to cap nested sitemap documents when the root is an index.
    """
    from pydantic import BaseModel

    from .crawl.sitemap import collect_page_urls_from_sitemap, host_allowed_domains

    if (schema is not None) ^ (prompt is not None):
        raise ValueError("async_crawl_sitemap requires both schema and prompt together, or neither")

    if schema is not None and (not isinstance(schema, type) or not issubclass(schema, BaseModel)):
        raise TypeError("schema must be a Pydantic BaseModel type")

    allowed = host_allowed_domains(sitemap_url)
    locs = await collect_page_urls_from_sitemap(
        _async_fetch,
        sitemap_url,
        max_pages=max_pages,
        max_sitemap_files=max_sitemap_files,
        **fetch_kwargs,
    )
    results: list[Any] = []
    for loc in locs:
        results.extend(
            await async_crawl(
                loc,
                schema=schema,
                prompt=prompt,
                allowed_domains=allowed,
                max_pages=1,
                max_depth=0,
                concurrency=concurrency,
                per_domain_concurrency=per_domain_concurrency,
                **fetch_kwargs,
            )
        )
    return results

API Discovery

discover_api

discover_api(url: str, session: SilkSession | None = None, *, output_path: str | None = None)

Discover JSON API endpoints for a URL.

Source code in silkweb/__init__.py
def discover_api(url: str, session: SilkSession | None = None, *, output_path: str | None = None):
    """Discover JSON API endpoints for a URL."""
    return _run_sync(_async_discover_api(url, session, output_path))

Fetch replay (observability)

silkweb.replay(session_file) reloads a recorded HTTP fetch session for debugging. It is not the same as replay_session, which replays browser actions from a saved SilkSession (see Sessions & authentication).

replay

replay(session_file: str) -> _ReplaySession

Load an HTTP fetch replay bundle (JSON *.silkweb + HTML sibling) written when configure(replay_dir=...) is set. Returns :class:observability.replay.ReplaySession with .html / .ask() / .extract() / .query() helpers.

This is not the same as :func:replay_session, which replays a Playwright recording from record_session (cookies and actions under ~/.silkweb/sessions).

Source code in silkweb/__init__.py
def replay(session_file: str) -> _ReplaySession:
    """
    Load an **HTTP fetch replay** bundle (JSON ``*.silkweb`` + HTML sibling) written when
    ``configure(replay_dir=...)`` is set. Returns :class:`observability.replay.ReplaySession`
    with ``.html`` / ``.ask()`` / ``.extract()`` / ``.query()`` helpers.

    This is **not** the same as :func:`replay_session`, which replays a **Playwright**
    recording from ``record_session`` (cookies and actions under ``~/.silkweb/sessions``).
    """
    return _replay(session_file)

Sessions (browser)

Interactive recording and headless replay use async Playwright helpers:

record_session async

record_session(name: str) -> SilkSession

Open a real (non-headless) browser and record navigations, clicks, and fills.

Persists to ~/.silkweb/sessions/<name>.silkweb (cookies, storage, actions). This is Playwright session recording — not the same as HTTP replay_dir / :func:silkweb.replay, which stores raw HTML + metadata for a single fetch.

Source code in silkweb/session/recorder.py
async def record(name: str) -> SilkSession:
    """
    Open a real (non-headless) browser and record navigations, clicks, and fills.

    Persists to ``~/.silkweb/sessions/<name>.silkweb`` (cookies, storage, actions).
    This is **Playwright session** recording — not the same as HTTP ``replay_dir`` /
    :func:`silkweb.replay`, which stores raw HTML + metadata for a single fetch.
    """
    s = SilkSession(name)
    s.actions = []
    await s._ensure_browser(headless=False)
    assert s._page is not None

    async def add_action(action: dict[str, Any]) -> None:
        s.actions.append(action)

    # Record top-frame navigations
    def on_nav(frame: Any) -> None:
        try:
            if getattr(frame, "parent_frame", None):
                return
            url = str(getattr(frame, "url", "") or "")
            if url:
                s.url = url
                s.actions.append({"type": "navigate", "url": url})
        except Exception:
            return

    s._page.on("framenavigated", on_nav)

    # Expose bindings for JS event capture
    await s._page.expose_binding(  # type: ignore[attr-defined]
        "__silkweb_record_action",
        lambda _source, payload: asyncio.create_task(add_action(dict(payload))),
    )

    # Inject event listeners for clicks and input changes
    await s._page.add_init_script(
        """
        (() => {
          const send = (payload) => {
            try { window.__silkweb_record_action(payload); } catch (e) {}
          };

          document.addEventListener('click', (e) => {
            const el = e.target;
            if (!el) return;
            const sel = el.id ? ('#' + el.id) : (el.getAttribute && el.getAttribute('name') ? ('[name=\"' + el.getAttribute('name') + '\"]') : el.tagName.toLowerCase());
            send({type: 'click', selector: sel, ts: Date.now()});
          }, true);

          document.addEventListener('input', (e) => {
            const el = e.target;
            if (!el) return;
            const tag = (el.tagName || '').toLowerCase();
            if (tag !== 'input' && tag !== 'textarea') return;
            const sel = el.id ? ('#' + el.id) : (el.getAttribute('name') ? ('[name=\"' + el.getAttribute('name') + '\"]') : tag);
            send({type: 'fill', selector: sel, value: el.value, ts: Date.now()});
          }, true);
        })();
        """
    )

    # Wait until user closes browser window (best-effort)
    with contextlib.suppress(Exception):
        await s._page.wait_for_event("close")  # type: ignore[attr-defined]

    s.created_at = datetime.now(tz=timezone.utc).isoformat()
    await s.save()
    await s.close()
    return s

replay_session async

replay_session(name: str) -> SilkSession

Replay a Playwright session by name (see :func:record) in headless mode.

Unlike :func:silkweb.replay, this does not load replay_dir HTML snapshots; it uses the session JSON written by record.

Source code in silkweb/session/recorder.py
async def replay(name: str) -> SilkSession:
    """
    Replay a **Playwright** session by ``name`` (see :func:`record`) in headless mode.

    Unlike :func:`silkweb.replay`, this does not load ``replay_dir`` HTML snapshots;
    it uses the session JSON written by ``record``.
    """
    s = SilkSession.load(name)
    s._check_cookie_expiry()
    await s._ensure_browser(headless=True)
    assert s._page is not None

    for act in s.actions or []:
        t = str(act.get("type") or "")
        if t == "navigate":
            url = str(act.get("url") or "")
            if url:
                await s._page.goto(url, wait_until="load", timeout=30_000)
                s.url = url
        elif t == "click":
            sel = str(act.get("selector") or "")
            if sel:
                await s._page.click(sel)
        elif t == "fill":
            sel = str(act.get("selector") or "")
            val = str(act.get("value") or "")
            if sel:
                await s._page.fill(sel, val)

    await s.save()
    await s.close()
    return s

SilkSession dataclass

SilkSession(name: str, url: str | None = None, created_at: str | None = None, ua: str | None = None, cookies: list[dict[str, Any]] | None = None, localStorage: dict[str, Any] | None = None, sessionStorage: dict[str, Any] | None = None, actions: list[dict[str, Any]] | None = None, _playwright: Any | None = None, _browser: Any | None = None, _context: Any | None = None, _page: Any | None = None)

Persisted Playwright session (cookies + localStorage + sessionStorage).

Storage format (JSON):

fetch async

fetch(url: str, *, tier: int = 2, proxy: str | None = None) -> Any

Navigate to a URL using a persisted session (Playwright).

tier and proxy are reserved for alignment with HTTP fetch tiers; the browser context currently uses global configure() defaults (user agent, etc.). Use Playwright-only proxy wiring in a future release if you need it here.

Source code in silkweb/session/session.py
async def fetch(self, url: str, *, tier: int = 2, proxy: str | None = None) -> Any:
    """
    Navigate to a URL using a persisted session (Playwright).

    ``tier`` and ``proxy`` are reserved for alignment with HTTP fetch tiers; the browser
    context currently uses global ``configure()`` defaults (user agent, etc.). Use
    Playwright-only proxy wiring in a future release if you need it here.
    """
    del tier, proxy  # reserved for API parity with HTTP fetch
    self._check_cookie_expiry()
    headless = True
    await self._ensure_browser(headless=headless)
    assert self._page is not None
    self.url = url
    await self._page.goto(url, wait_until="load", timeout=30_000)
    return self._page

save async

save() -> None

Serialize cookies + localStorage + sessionStorage to disk.

Source code in silkweb/session/session.py
async def save(self) -> None:
    """
    Serialize cookies + localStorage + sessionStorage to disk.
    """
    if self._context is None or self._page is None:
        # Nothing to save yet; still write minimal metadata.
        payload = {
            "cookies": self.cookies or [],
            "localStorage": self.localStorage or {},
            "sessionStorage": self.sessionStorage or {},
            "url": self.url,
            "created_at": self.created_at,
            "ua": self.ua,
            "actions": self.actions or [],
        }
        with open(self.path, "w", encoding="utf-8") as f:
            json.dump(payload, f, ensure_ascii=False, indent=2, default=str)
        return

    self.cookies = await self._context.cookies()
    try:
        self.localStorage = await self._page.evaluate(
            "() => Object.fromEntries(Object.entries(localStorage))"
        )
    except Exception:
        self.localStorage = self.localStorage or {}
    try:
        self.sessionStorage = await self._page.evaluate(
            "() => Object.fromEntries(Object.entries(sessionStorage))"
        )
    except Exception:
        self.sessionStorage = self.sessionStorage or {}

    payload = {
        "cookies": self.cookies or [],
        "localStorage": self.localStorage or {},
        "sessionStorage": self.sessionStorage or {},
        "url": self.url,
        "created_at": self.created_at,
        "ua": self.ua,
        "actions": self.actions or [],
    }
    with open(self.path, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2, default=str)

Change watching

watch

Bundled recipes

The silkweb.recipes object is a RecipeRegistry loaded from built-in YAML recipes:

RecipeRegistry

RecipeRegistry(*, directory: str | None = None)
Source code in silkweb/recipes/registry.py
def __init__(self, *, directory: str | None = None) -> None:
    self.directory = directory or _recipes_dir()
    self._recipes: dict[str, Recipe] = {}
    self.reload()

Pre-fetched HTML

When you already have HTML (no network fetch), you can run the same pipelines against a string:

ask_from_html

ask_from_html(url: str, html: str, *, prompt: str, **kwargs: Any)
Source code in silkweb/__init__.py
def ask_from_html(url: str, html: str, *, prompt: str, **kwargs: Any):
    return _run_sync(_ask_from_html(url, html, prompt=prompt, **kwargs))

extract_from_html

extract_from_html(url: str, html: str, *, schema: Any, prompt: str, **kwargs: Any)

Sync wrapper around async_extract_from_html (same return contract as extract).

Source code in silkweb/__init__.py
def extract_from_html(url: str, html: str, *, schema: Any, prompt: str, **kwargs: Any):
    """Sync wrapper around `async_extract_from_html` (same return contract as `extract`)."""
    return _run_sync(async_extract_from_html(url, html, schema=schema, prompt=prompt, **kwargs))

query_from_html

query_from_html(url: str, html: str, *, silkql_string: str, **kwargs: Any)

Sync SilkQL on existing HTML. Same pipeline as :func:async_query for a single page; see :func:async_query for options.

Source code in silkweb/__init__.py
def query_from_html(url: str, html: str, *, silkql_string: str, **kwargs: Any):
    """Sync SilkQL on existing HTML. Same pipeline as :func:`async_query` for a single page; see :func:`async_query` for options."""
    return _run_sync(_query_from_html(url, html, silkql_string=silkql_string, **kwargs))

Configuration

get_config

get_config() -> SilkwebConfig
Source code in silkweb/config.py
def get_config() -> SilkwebConfig:
    return _CONFIG

configure

configure(**kwargs: Any) -> SilkwebConfig

Update global Silkweb configuration.

Known fields are set on :class:SilkwebConfig; unknown keys go into extra.

When environment variable SILKWEB_STRICT_CONFIG is 1 / true / yes, unknown top-level keys raise :class:SilkwebConfigError instead of being stored in extra (helps catch typos like configure(timeouts=30)).

Source code in silkweb/config.py
def configure(**kwargs: Any) -> SilkwebConfig:
    """
    Update global Silkweb configuration.

    Known fields are set on :class:`SilkwebConfig`; unknown keys go into ``extra``.

    When environment variable ``SILKWEB_STRICT_CONFIG`` is ``1`` / ``true`` / ``yes``,
    unknown **top-level** keys raise :class:`SilkwebConfigError` instead of being stored
    in ``extra`` (helps catch typos like ``configure(timeouts=30)``).
    """
    strict = os.environ.get("SILKWEB_STRICT_CONFIG", "").strip().lower() in ("1", "true", "yes")
    for key, value in kwargs.items():
        if hasattr(_CONFIG, key):
            setattr(_CONFIG, key, value)
        else:
            if strict:
                raise SilkwebConfigError(
                    message=f"Unknown SilkwebConfig field {key!r}. "
                    f"Set SILKWEB_STRICT_CONFIG=0 or use `extra` via supported keys only.",
                    key=key,
                    value=value,
                )
            _CONFIG.extra[key] = value
    return _CONFIG

Unknown configure(...) keys normally go into SilkwebConfig.extra. With environment variable SILKWEB_STRICT_CONFIG set to 1, true, or yes, unknown top-level keys raise SilkwebConfigError instead (helps catch typos).

Session errors

SilkwebSessionError dataclass

SilkwebSessionError(message: str = 'Silkweb error', context: dict[str, Any] | None = None, name: str | None = None)

Bases: SilkwebError

Base error for session/recording failures.

SilkwebSessionExpiredError dataclass

SilkwebSessionExpiredError(message: str = 'Silkweb error', context: dict[str, Any] | None = None, name: str | None = None, expired_cookies: list[str] | None = None)

Bases: SilkwebSessionError

Session has expired (auth cookies likely stale).