Skip to content

ws_client

WSClientThread

Bases: Thread

Establishes an initial connection, then tries a few times to reconnect.

After that, fail and optionally run function on failure.

Source code in client/ayon_comfyui/api/ws_client.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
class WSClientThread(Thread):
    """Establishes an initial connection, then tries a few times to reconnect.

    After that, fail and optionally run function on failure.
    """

    def __init__(  # noqa: D107
        self,
        hostname: str = "localhost",
        port: int | str = 55055,
        use_https: bool = False,  # noqa: FBT001, FBT002
        retries: int = 3,
        retry_interval: float = 5.0,
        qtthread: QThread_interface = None,
    ):
        self._host = hostname
        self._port = port
        self._endpoint = "ws"
        self._https = use_https
        self._url = f"ws{'s' if self._https else ''}://{self._host}:{self._port}/{self._endpoint}"
        if ":" in self._host:
            # In case the hostname is a netloc (happens during remote)
            self._url = f"ws{'s' if self._https else ''}://{self._host}/{self._endpoint}"

        self._retries: int = 0
        self._total_retries: int = retries
        self._retry_interval: float = retry_interval

        self._initial_connect = True
        self._on_broken: Callable | None = None
        self._broken = False

        self.loop = None
        self._shutdown_event = None

        self._qt_thread = qtthread

        super().__init__()

    def run(self) -> None:
        """Method representing the thread's activity."""
        try:
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
            self._shutdown_event = asyncio.Event()
            self.loop.run_until_complete(self.async_run())
        except BaseException as e:  # noqa: BLE001
            log.debug(f"Error during client run: {e}")  # noqa: G004

    async def async_run(self) -> None:
        """Connection, pinging logic."""
        while not self._shutdown_event.is_set():
            await self.ws_client(wait_forever=self._initial_connect)
            self._initial_connect = False
            if (
                self._retries < self._total_retries
                and not self._shutdown_event.is_set()
            ):
                self._retries += 1
                log.info(f"Retry {self._retries} / {self._total_retries}")  # noqa : G004
                await asyncio.sleep(self._retry_interval)
            else:
                break
        # when loop has been broken, run on_broken
        if self._on_broken:
            self._on_broken()

        # tell qt thread heart is broken </3
        if not self._shutdown_event.is_set():
            self._qt_thread.sig_onheartbeat_fail.emit()

        self._broken = True

    def register_on_connection_broken(
        self, func: Callable, *args: list[Any], **kwargs: dict[str, Any]
    ) -> None:
        """Register a function to be called when the connection breaks."""
        self._on_broken = partial(func, *args, **kwargs)

    async def ws_client(self, wait_forever: bool = False) -> None:  # noqa: FBT001, FBT002
        """Connect to ComfyUI server to maintain heartbeat.

        Returns when the connection has ended.
        """

        async def _establish_con() -> None:
            """Block until connected.

            Raises:
                aiohttp.ClientConnectorError
            """
            async with (
                aiohttp.ClientSession() as ses,
                ses.ws_connect(self._url, heartbeat=5) as ws,
            ):
                # Block until heartbeat fails
                log.info("Established connection!")
                async for _ in ws:
                    pass

        if wait_forever:
            while not self._shutdown_event.is_set():
                try:
                    await _establish_con()
                    break
                except aiohttp.ClientConnectorError:
                    log.info("Couldn't connect, trying again in 1 second.")
                    await asyncio.sleep(1)
        elif not self._shutdown_event.is_set():
            try:
                await _establish_con()
            except aiohttp.ClientConnectorError:
                log.info("Couldn't connect.")

    def stop(self) -> None:
        """Set flag to stop client connection."""
        self.loop.call_soon_threadsafe(self._shutdown_event.set)

async_run() async

Connection, pinging logic.

Source code in client/ayon_comfyui/api/ws_client.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
async def async_run(self) -> None:
    """Connection, pinging logic."""
    while not self._shutdown_event.is_set():
        await self.ws_client(wait_forever=self._initial_connect)
        self._initial_connect = False
        if (
            self._retries < self._total_retries
            and not self._shutdown_event.is_set()
        ):
            self._retries += 1
            log.info(f"Retry {self._retries} / {self._total_retries}")  # noqa : G004
            await asyncio.sleep(self._retry_interval)
        else:
            break
    # when loop has been broken, run on_broken
    if self._on_broken:
        self._on_broken()

    # tell qt thread heart is broken </3
    if not self._shutdown_event.is_set():
        self._qt_thread.sig_onheartbeat_fail.emit()

    self._broken = True

register_on_connection_broken(func, *args, **kwargs)

Register a function to be called when the connection breaks.

Source code in client/ayon_comfyui/api/ws_client.py
92
93
94
95
96
def register_on_connection_broken(
    self, func: Callable, *args: list[Any], **kwargs: dict[str, Any]
) -> None:
    """Register a function to be called when the connection breaks."""
    self._on_broken = partial(func, *args, **kwargs)

run()

Method representing the thread's activity.

Source code in client/ayon_comfyui/api/ws_client.py
58
59
60
61
62
63
64
65
66
def run(self) -> None:
    """Method representing the thread's activity."""
    try:
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self._shutdown_event = asyncio.Event()
        self.loop.run_until_complete(self.async_run())
    except BaseException as e:  # noqa: BLE001
        log.debug(f"Error during client run: {e}")  # noqa: G004

stop()

Set flag to stop client connection.

Source code in client/ayon_comfyui/api/ws_client.py
133
134
135
def stop(self) -> None:
    """Set flag to stop client connection."""
    self.loop.call_soon_threadsafe(self._shutdown_event.set)

ws_client(wait_forever=False) async

Connect to ComfyUI server to maintain heartbeat.

Returns when the connection has ended.

Source code in client/ayon_comfyui/api/ws_client.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
async def ws_client(self, wait_forever: bool = False) -> None:  # noqa: FBT001, FBT002
    """Connect to ComfyUI server to maintain heartbeat.

    Returns when the connection has ended.
    """

    async def _establish_con() -> None:
        """Block until connected.

        Raises:
            aiohttp.ClientConnectorError
        """
        async with (
            aiohttp.ClientSession() as ses,
            ses.ws_connect(self._url, heartbeat=5) as ws,
        ):
            # Block until heartbeat fails
            log.info("Established connection!")
            async for _ in ws:
                pass

    if wait_forever:
        while not self._shutdown_event.is_set():
            try:
                await _establish_con()
                break
            except aiohttp.ClientConnectorError:
                log.info("Couldn't connect, trying again in 1 second.")
                await asyncio.sleep(1)
    elif not self._shutdown_event.is_set():
        try:
            await _establish_con()
        except aiohttp.ClientConnectorError:
            log.info("Couldn't connect.")