diff options
| author | Dax Raad <[email protected]> | 2025-12-07 15:52:27 -0500 |
|---|---|---|
| committer | Dax Raad <[email protected]> | 2025-12-07 15:52:27 -0500 |
| commit | 13b2cf50aeb791f4b47786f95c8572a5caa75c7e (patch) | |
| tree | cea24f522f7e0de330475bf0be7803cdf1a33371 /packages/sdk/python/src/opencode_ai/extras.py | |
| parent | 93b0abfce9660c4da21026b138857bf408aa4cae (diff) | |
| download | opencode-13b2cf50aeb791f4b47786f95c8572a5caa75c7e.tar.gz opencode-13b2cf50aeb791f4b47786f95c8572a5caa75c7e.zip | |
remove outdated SDKs
Diffstat (limited to 'packages/sdk/python/src/opencode_ai/extras.py')
| -rw-r--r-- | packages/sdk/python/src/opencode_ai/extras.py | 186 |
1 files changed, 0 insertions, 186 deletions
diff --git a/packages/sdk/python/src/opencode_ai/extras.py b/packages/sdk/python/src/opencode_ai/extras.py deleted file mode 100644 index 1a9110196..000000000 --- a/packages/sdk/python/src/opencode_ai/extras.py +++ /dev/null @@ -1,186 +0,0 @@ -from __future__ import annotations - -import time -from typing import AsyncIterator, Dict, Iterator, Optional - -import httpx - -from .api.default import ( - app_agents, - command_list, - config_get, - config_providers, - file_status, - path_get, - project_current, - project_list, - session_list, - tool_ids, -) -from .client import Client -from .types import UNSET, Unset - - -class OpenCodeClient: - """High-level convenience wrapper around the generated Client. - - Provides sensible defaults and a couple of helper methods, with optional retries. - """ - - def __init__( - self, - base_url: str = "http://localhost:4096", - *, - headers: Optional[Dict[str, str]] = None, - timeout: Optional[float] = None, - verify_ssl: bool | str | httpx.URLTypes | None = True, - token: Optional[str] = None, - auth_header_name: str = "Authorization", - auth_prefix: str = "Bearer", - retries: int = 0, - backoff_factor: float = 0.5, - status_forcelist: tuple[int, ...] = (429, 500, 502, 503, 504), - ) -> None: - httpx_timeout = None if timeout is None else httpx.Timeout(timeout) - all_headers = dict(headers or {}) - if token: - all_headers[auth_header_name] = f"{auth_prefix} {token}".strip() - self._client = Client( - base_url=base_url, - headers=all_headers, - timeout=httpx_timeout, - verify_ssl=verify_ssl if isinstance(verify_ssl, bool) else True, - ) - self._retries = max(0, int(retries)) - self._backoff = float(backoff_factor) - self._status_forcelist = set(status_forcelist) - - @property - def client(self) -> Client: - return self._client - - # ---- Internal retry helper ---- - - def _call_with_retries(self, fn, *args, **kwargs): - attempt = 0 - while True: - try: - return fn(*args, **kwargs) - except httpx.RequestError: - pass - except httpx.HTTPStatusError as e: - if e.response is None or e.response.status_code not in self._status_forcelist: - raise - if attempt >= self._retries: - # re-raise last exception if we have one - raise - sleep = self._backoff * (2**attempt) - time.sleep(sleep) - attempt += 1 - - # ---- Convenience wrappers over generated endpoints ---- - - def list_sessions(self, *, directory: str | Unset = UNSET): - """Return sessions in the current project. - - Wraps GET /session. Pass `directory` to target a specific project/directory if needed. - """ - return self._call_with_retries(session_list.sync, client=self._client, directory=directory) - - def get_config(self, *, directory: str | Unset = UNSET): - """Return opencode configuration for the current project (GET /config).""" - return self._call_with_retries(config_get.sync, client=self._client, directory=directory) - - def list_agents(self, *, directory: str | Unset = UNSET): - """List configured agents (GET /agent).""" - return self._call_with_retries(app_agents.sync, client=self._client, directory=directory) - - def list_projects(self, *, directory: str | Unset = UNSET): - """List known projects (GET /project).""" - return self._call_with_retries(project_list.sync, client=self._client, directory=directory) - - def current_project(self, *, directory: str | Unset = UNSET): - """Return current project (GET /project/current).""" - return self._call_with_retries(project_current.sync, client=self._client, directory=directory) - - def file_status(self, *, directory: str | Unset = UNSET): - """Return file status list (GET /file/status).""" - return self._call_with_retries(file_status.sync, client=self._client, directory=directory) - - def get_path(self, *, directory: str | Unset = UNSET): - """Return opencode path info (GET /path).""" - return self._call_with_retries(path_get.sync, client=self._client, directory=directory) - - def config_providers(self, *, directory: str | Unset = UNSET): - """Return configured providers (GET /config/providers).""" - return self._call_with_retries(config_providers.sync, client=self._client, directory=directory) - - def tool_ids(self, *, directory: str | Unset = UNSET): - """Return tool identifiers for a provider/model pair (GET /experimental/tool).""" - return self._call_with_retries(tool_ids.sync, client=self._client, directory=directory) - - def list_commands(self, *, directory: str | Unset = UNSET): - """List commands (GET /command).""" - return self._call_with_retries(command_list.sync, client=self._client, directory=directory) - - # ---- Server-Sent Events (SSE) streaming ---- - - def subscribe_events(self, *, directory: str | Unset = UNSET) -> Iterator[dict]: - """Subscribe to /event SSE endpoint and yield parsed JSON events. - - This is a blocking generator which yields one event dict per message. - """ - client = self._client.get_httpx_client() - params: dict[str, str] = {} - if directory is not UNSET and directory is not None: - params["directory"] = str(directory) - with client.stream("GET", "/event", headers={"Accept": "text/event-stream"}, params=params) as r: - r.raise_for_status() - buf = "" - for line_bytes in r.iter_lines(): - line = line_bytes.decode("utf-8") if isinstance(line_bytes, (bytes, bytearray)) else str(line_bytes) - if line.startswith(":"): - # comment/heartbeat - continue - if line == "": - if buf: - # end of event - for part in buf.split("\n"): - if part.startswith("data:"): - data = part[5:].strip() - if data: - try: - yield httpx._models.jsonlib.loads(data) # type: ignore[attr-defined] - except Exception: - # fall back: skip malformed - pass - buf = "" - continue - buf += line + "\n" - - async def subscribe_events_async(self, *, directory: str | Unset = UNSET) -> AsyncIterator[dict]: - """Async variant of subscribe_events using httpx.AsyncClient.""" - aclient = self._client.get_async_httpx_client() - params: dict[str, str] = {} - if directory is not UNSET and directory is not None: - params["directory"] = str(directory) - async with aclient.stream("GET", "/event", headers={"Accept": "text/event-stream"}, params=params) as r: - r.raise_for_status() - buf = "" - async for line_bytes in r.aiter_lines(): - line = line_bytes - if line.startswith(":"): - continue - if line == "": - if buf: - for part in buf.split("\n"): - if part.startswith("data:"): - data = part[5:].strip() - if data: - try: - yield httpx._models.jsonlib.loads(data) # type: ignore[attr-defined] - except Exception: - pass - buf = "" - continue - buf += line + "\n" |
