summaryrefslogtreecommitdiffhomepage
path: root/packages/sdk/python/templates/extras.py
blob: f34bfdee6f5b10ba1b483db8f5f3ee5652f8f009 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from __future__ import annotations

from typing import AsyncIterator, Dict, Iterator, Optional

import httpx

from opencode_ai.api.default import config_get, session_list
from opencode_ai.client import Client
from opencode_ai.types import UNSET, Unset


class OpenCodeClient:
    """High-level convenience wrapper around the generated Client.

    Provides sensible defaults and a couple of helper methods.
    """

    def __init__(
        self,
        base_url: str = "http://localhost:4096",
        *,
        headers: Optional[Dict[str, str]] = None,
        timeout: Optional[float] = None,
        verify_ssl: bool | str | httpx.URLTypes | None = True,
    ) -> None:
        httpx_timeout = None if timeout is None else httpx.Timeout(timeout)
        self._client = Client(
            base_url=base_url,
            headers=headers or {},
            timeout=httpx_timeout,
            verify_ssl=verify_ssl if isinstance(verify_ssl, bool) else True,
        )

    @property
    def client(self) -> Client:
        return self._client

    # ---- Convenience wrappers over generated endpoints ----

    def list_sessions(self, *, directory: str | Unset = UNSET):
        return session_list.sync(client=self._client, directory=directory)

    def get_config(self, *, directory: str | Unset = UNSET):
        return config_get.sync(client=self._client, directory=directory)

    # ---- Server-Sent Events (SSE) streaming ----

    def subscribe_events(self, *, directory: str | Unset = UNSET) -> Iterator[dict]:
        """Subscribe to /event SSE endpoint and yield parsed JSON events.

        This is a blocking generator which yields one event dict per message.
        """
        client = self._client.get_httpx_client()
        params: dict[str, str] = {}
        if directory is not UNSET and directory is not None:
            params["directory"] = str(directory)
        with client.stream("GET", "/event", headers={"Accept": "text/event-stream"}, params=params) as r:
            r.raise_for_status()
            buf = ""
            for line_bytes in r.iter_lines():
                line = line_bytes.decode("utf-8") if isinstance(line_bytes, (bytes, bytearray)) else str(line_bytes)
                if line.startswith(":"):
                    # comment/heartbeat
                    continue
                if line == "":
                    if buf:
                        # end of event
                        for part in buf.split("\n"):
                            if part.startswith("data:"):
                                data = part[5:].strip()
                                if data:
                                    try:
                                        yield httpx._models.jsonlib.loads(data)  # type: ignore[attr-defined]
                                    except Exception:
                                        # fall back: skip malformed
                                        pass
                        buf = ""
                    continue
                buf += line + "\n"

    async def subscribe_events_async(self, *, directory: str | Unset = UNSET) -> AsyncIterator[dict]:
        """Async variant of subscribe_events using httpx.AsyncClient."""
        aclient = self._client.get_async_httpx_client()
        params: dict[str, str] = {}
        if directory is not UNSET and directory is not None:
            params["directory"] = str(directory)
        async with aclient.stream("GET", "/event", headers={"Accept": "text/event-stream"}, params=params) as r:
            r.raise_for_status()
            buf = ""
            async for line_bytes in r.aiter_lines():
                line = line_bytes
                if line.startswith(":"):
                    continue
                if line == "":
                    if buf:
                        for part in buf.split("\n"):
                            if part.startswith("data:"):
                                data = part[5:].strip()
                                if data:
                                    try:
                                        yield httpx._models.jsonlib.loads(data)  # type: ignore[attr-defined]
                                    except Exception:
                                        pass
                        buf = ""
                    continue
                buf += line + "\n"