diff options
| author | Kevin King <[email protected]> | 2025-10-28 19:32:45 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-10-28 18:32:45 -0500 |
| commit | 0e60f666043910afb96e9de2f84b0b8a68c7e4d6 (patch) | |
| tree | 6ca20af712e2faca6262f029d6d8499c9888eb50 /packages/sdk/python/src/opencode_ai/extras.py | |
| parent | fc8db6cdf9cb81e29c5dda69c8646aa52e453a9c (diff) | |
| download | opencode-0e60f666043910afb96e9de2f84b0b8a68c7e4d6.tar.gz opencode-0e60f666043910afb96e9de2f84b0b8a68c7e4d6.zip | |
ignore: python sdk (#2779)
Co-authored-by: Aiden Cline <[email protected]>
Diffstat (limited to 'packages/sdk/python/src/opencode_ai/extras.py')
| -rw-r--r-- | packages/sdk/python/src/opencode_ai/extras.py | 186 |
1 files changed, 186 insertions, 0 deletions
diff --git a/packages/sdk/python/src/opencode_ai/extras.py b/packages/sdk/python/src/opencode_ai/extras.py new file mode 100644 index 000000000..1a9110196 --- /dev/null +++ b/packages/sdk/python/src/opencode_ai/extras.py @@ -0,0 +1,186 @@ +from __future__ import annotations + +import time +from typing import AsyncIterator, Dict, Iterator, Optional + +import httpx + +from .api.default import ( + app_agents, + command_list, + config_get, + config_providers, + file_status, + path_get, + project_current, + project_list, + session_list, + tool_ids, +) +from .client import Client +from .types import UNSET, Unset + + +class OpenCodeClient: + """High-level convenience wrapper around the generated Client. + + Provides sensible defaults and a couple of helper methods, with optional retries. + """ + + def __init__( + self, + base_url: str = "http://localhost:4096", + *, + headers: Optional[Dict[str, str]] = None, + timeout: Optional[float] = None, + verify_ssl: bool | str | httpx.URLTypes | None = True, + token: Optional[str] = None, + auth_header_name: str = "Authorization", + auth_prefix: str = "Bearer", + retries: int = 0, + backoff_factor: float = 0.5, + status_forcelist: tuple[int, ...] = (429, 500, 502, 503, 504), + ) -> None: + httpx_timeout = None if timeout is None else httpx.Timeout(timeout) + all_headers = dict(headers or {}) + if token: + all_headers[auth_header_name] = f"{auth_prefix} {token}".strip() + self._client = Client( + base_url=base_url, + headers=all_headers, + timeout=httpx_timeout, + verify_ssl=verify_ssl if isinstance(verify_ssl, bool) else True, + ) + self._retries = max(0, int(retries)) + self._backoff = float(backoff_factor) + self._status_forcelist = set(status_forcelist) + + @property + def client(self) -> Client: + return self._client + + # ---- Internal retry helper ---- + + def _call_with_retries(self, fn, *args, **kwargs): + attempt = 0 + while True: + try: + return fn(*args, **kwargs) + except httpx.RequestError: + pass + except httpx.HTTPStatusError as e: + if e.response is None or e.response.status_code not in self._status_forcelist: + raise + if attempt >= self._retries: + # re-raise last exception if we have one + raise + sleep = self._backoff * (2**attempt) + time.sleep(sleep) + attempt += 1 + + # ---- Convenience wrappers over generated endpoints ---- + + def list_sessions(self, *, directory: str | Unset = UNSET): + """Return sessions in the current project. + + Wraps GET /session. Pass `directory` to target a specific project/directory if needed. + """ + return self._call_with_retries(session_list.sync, client=self._client, directory=directory) + + def get_config(self, *, directory: str | Unset = UNSET): + """Return opencode configuration for the current project (GET /config).""" + return self._call_with_retries(config_get.sync, client=self._client, directory=directory) + + def list_agents(self, *, directory: str | Unset = UNSET): + """List configured agents (GET /agent).""" + return self._call_with_retries(app_agents.sync, client=self._client, directory=directory) + + def list_projects(self, *, directory: str | Unset = UNSET): + """List known projects (GET /project).""" + return self._call_with_retries(project_list.sync, client=self._client, directory=directory) + + def current_project(self, *, directory: str | Unset = UNSET): + """Return current project (GET /project/current).""" + return self._call_with_retries(project_current.sync, client=self._client, directory=directory) + + def file_status(self, *, directory: str | Unset = UNSET): + """Return file status list (GET /file/status).""" + return self._call_with_retries(file_status.sync, client=self._client, directory=directory) + + def get_path(self, *, directory: str | Unset = UNSET): + """Return opencode path info (GET /path).""" + return self._call_with_retries(path_get.sync, client=self._client, directory=directory) + + def config_providers(self, *, directory: str | Unset = UNSET): + """Return configured providers (GET /config/providers).""" + return self._call_with_retries(config_providers.sync, client=self._client, directory=directory) + + def tool_ids(self, *, directory: str | Unset = UNSET): + """Return tool identifiers for a provider/model pair (GET /experimental/tool).""" + return self._call_with_retries(tool_ids.sync, client=self._client, directory=directory) + + def list_commands(self, *, directory: str | Unset = UNSET): + """List commands (GET /command).""" + return self._call_with_retries(command_list.sync, client=self._client, directory=directory) + + # ---- Server-Sent Events (SSE) streaming ---- + + def subscribe_events(self, *, directory: str | Unset = UNSET) -> Iterator[dict]: + """Subscribe to /event SSE endpoint and yield parsed JSON events. + + This is a blocking generator which yields one event dict per message. + """ + client = self._client.get_httpx_client() + params: dict[str, str] = {} + if directory is not UNSET and directory is not None: + params["directory"] = str(directory) + with client.stream("GET", "/event", headers={"Accept": "text/event-stream"}, params=params) as r: + r.raise_for_status() + buf = "" + for line_bytes in r.iter_lines(): + line = line_bytes.decode("utf-8") if isinstance(line_bytes, (bytes, bytearray)) else str(line_bytes) + if line.startswith(":"): + # comment/heartbeat + continue + if line == "": + if buf: + # end of event + for part in buf.split("\n"): + if part.startswith("data:"): + data = part[5:].strip() + if data: + try: + yield httpx._models.jsonlib.loads(data) # type: ignore[attr-defined] + except Exception: + # fall back: skip malformed + pass + buf = "" + continue + buf += line + "\n" + + async def subscribe_events_async(self, *, directory: str | Unset = UNSET) -> AsyncIterator[dict]: + """Async variant of subscribe_events using httpx.AsyncClient.""" + aclient = self._client.get_async_httpx_client() + params: dict[str, str] = {} + if directory is not UNSET and directory is not None: + params["directory"] = str(directory) + async with aclient.stream("GET", "/event", headers={"Accept": "text/event-stream"}, params=params) as r: + r.raise_for_status() + buf = "" + async for line_bytes in r.aiter_lines(): + line = line_bytes + if line.startswith(":"): + continue + if line == "": + if buf: + for part in buf.split("\n"): + if part.startswith("data:"): + data = part[5:].strip() + if data: + try: + yield httpx._models.jsonlib.loads(data) # type: ignore[attr-defined] + except Exception: + pass + buf = "" + continue + buf += line + "\n" |
