Bases: Thread
Establishes an initial connection, then tries a few times to reconnect.
After that, fail and optionally run function on failure.
Source code in client/ayon_comfyui/api/ws_client.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
class WSClientThread(Thread):
    """Thread that keeps a heartbeat websocket connection to the server.

    Establishes an initial connection, then tries a few times to reconnect.
    After that, fail and optionally run function on failure.

    The thread owns a private asyncio event loop that is created in
    :meth:`run`. :meth:`stop` may be called from any thread, including
    before the thread has been started.
    """

    def __init__(  # noqa: D107
        self,
        hostname: str = "localhost",
        port: int | str = 55055,
        use_https: bool = False,  # noqa: FBT001, FBT002
        retries: int = 3,
        retry_interval: float = 5.0,
        qtthread: QThread_interface = None,
    ):
        """Store connection settings; no network activity happens here.

        Args:
            hostname: Host to connect to. If it contains a ``:`` it is
                treated as a full netloc and ``port`` is ignored.
            port: Port appended to ``hostname`` when building the URL.
            use_https: Use the ``wss`` scheme instead of ``ws``.
            retries: Number of reconnect attempts made after a connection
                attempt has ended.
            retry_interval: Seconds slept before each reconnect attempt.
            qtthread: Optional object exposing a ``sig_onheartbeat_fail``
                signal; it is emitted when the client gives up without a
                stop having been requested.
        """
        self._host = hostname
        self._port = port
        self._endpoint = "ws"
        self._https = use_https
        self._url = f"ws{'s' if self._https else ''}://{self._host}:{self._port}/{self._endpoint}"
        if ":" in self._host:
            # In case the hostname is a netloc (happens during remote)
            self._url = f"ws{'s' if self._https else ''}://{self._host}/{self._endpoint}"
        # Reconnect attempts used so far; never reset after a reconnect.
        self._retries: int = 0
        self._total_retries: int = retries
        self._retry_interval: float = retry_interval
        # The very first connection attempt waits indefinitely for the server.
        self._initial_connect = True
        self._on_broken: Callable | None = None
        self._broken = False
        # Both are created inside run(), on the thread that owns the loop.
        self.loop = None
        self._shutdown_event = None
        # Remembers a stop() that arrived before run() created the event.
        self._stop_requested = False
        self._qt_thread = qtthread
        super().__init__()

    def run(self) -> None:
        """Method representing the thread's activity."""
        try:
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
            self._shutdown_event = asyncio.Event()
            if self._stop_requested:
                # stop() was called before the event existed; honour it now.
                self._shutdown_event.set()
            self.loop.run_until_complete(self.async_run())
        except BaseException as e:  # noqa: BLE001
            log.debug(f"Error during client run: {e}")  # noqa: G004

    async def async_run(self) -> None:
        """Connection, pinging logic.

        Connects, then reconnects up to the configured number of retries.
        Once the loop ends, the registered ``on_broken`` callback (if any)
        is run and the Qt thread (if any) is notified, unless a stop was
        requested.
        """
        while not self._shutdown_event.is_set():
            await self.ws_client(wait_forever=self._initial_connect)
            self._initial_connect = False
            if (
                self._retries < self._total_retries
                and not self._shutdown_event.is_set()
            ):
                self._retries += 1
                log.info(f"Retry {self._retries} / {self._total_retries}")  # noqa : G004
                await asyncio.sleep(self._retry_interval)
            else:
                break
        # when loop has been broken, run on_broken
        if self._on_broken:
            self._on_broken()
        # tell qt thread heart is broken </3
        # qtthread defaults to None, so only emit when one was supplied;
        # otherwise the AttributeError would skip the _broken update below.
        if not self._shutdown_event.is_set() and self._qt_thread is not None:
            self._qt_thread.sig_onheartbeat_fail.emit()
        self._broken = True

    def register_on_connection_broken(
        self, func: Callable[..., Any], *args: Any, **kwargs: Any
    ) -> None:
        """Register a function to be called when the connection breaks.

        Args:
            func: Callable invoked once the connection loop has ended.
            *args: Positional arguments bound to ``func``.
            **kwargs: Keyword arguments bound to ``func``.
        """
        self._on_broken = partial(func, *args, **kwargs)

    async def ws_client(self, wait_forever: bool = False) -> None:  # noqa: FBT001, FBT002
        """Connect to ComfyUI server to maintain heartbeat.

        Returns when the connection has ended.

        Args:
            wait_forever: When true, keep retrying every second until a
                connection could be established or a stop is requested.
                When false, make a single attempt.
        """

        async def _establish_con() -> None:
            """Connect and block until the connection ends.

            Raises:
                aiohttp.ClientConnectorError
            """
            async with (
                aiohttp.ClientSession() as ses,
                ses.ws_connect(self._url, heartbeat=5) as ws,
            ):
                # Block until heartbeat fails
                log.info("Established connection!")
                async for _ in ws:
                    pass

        if wait_forever:
            while not self._shutdown_event.is_set():
                try:
                    await _establish_con()
                    break
                except aiohttp.ClientConnectorError:
                    log.info("Couldn't connect, trying again in 1 second.")
                    await asyncio.sleep(1)
        elif not self._shutdown_event.is_set():
            try:
                await _establish_con()
            except aiohttp.ClientConnectorError:
                log.info("Couldn't connect.")

    def stop(self) -> None:
        """Set flag to stop client connection.

        Safe to call before the thread has started: the request is
        remembered and applied by :meth:`run` once its event exists.
        """
        # Set the flag first so run() cannot miss a request that races with
        # the creation of the loop and the shutdown event.
        self._stop_requested = True
        loop, shutdown_event = self.loop, self._shutdown_event
        if loop is None or shutdown_event is None:
            return
        # asyncio.Event is not thread-safe; set it from the loop's thread.
        loop.call_soon_threadsafe(shutdown_event.set)
|
async_run() async
Connection, pinging logic.
Source code in client/ayon_comfyui/api/ws_client.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
async def async_run(self) -> None:
    """Connection, pinging logic.

    Connects, then reconnects until the retry budget is used up or a
    shutdown is requested. Afterwards the registered ``on_broken``
    callback (if any) is run and, unless a shutdown was requested, the Qt
    thread (if one was supplied) is told that the heartbeat failed.
    """
    while not self._shutdown_event.is_set():
        # Only the very first attempt waits indefinitely for the server.
        await self.ws_client(wait_forever=self._initial_connect)
        self._initial_connect = False
        if (
            self._retries < self._total_retries
            and not self._shutdown_event.is_set()
        ):
            self._retries += 1
            log.info(f"Retry {self._retries} / {self._total_retries}")  # noqa : G004
            await asyncio.sleep(self._retry_interval)
        else:
            break
    # when loop has been broken, run on_broken
    if self._on_broken:
        self._on_broken()
    # tell qt thread heart is broken </3
    # The Qt thread is optional (None by default); emitting on None would
    # raise and skip the _broken update below.
    if not self._shutdown_event.is_set() and self._qt_thread is not None:
        self._qt_thread.sig_onheartbeat_fail.emit()
    self._broken = True
|
register_on_connection_broken(func, *args, **kwargs)
Register a function to be called when the connection breaks.
Source code in client/ayon_comfyui/api/ws_client.py
def register_on_connection_broken(
    self, func: Callable[..., Any], *args: Any, **kwargs: Any
) -> None:
    """Register a function to be called when the connection breaks.

    Args:
        func: Callable to invoke; it replaces any previously registered one.
        *args: Positional arguments bound to ``func``.
        **kwargs: Keyword arguments bound to ``func``.
    """
    # Variadic parameters are annotated with the type of each argument,
    # hence ``Any`` rather than ``list[Any]`` / ``dict[str, Any]``.
    self._on_broken = partial(func, *args, **kwargs)
|
run()
Method representing the thread's activity.
Source code in client/ayon_comfyui/api/ws_client.py
58
59
60
61
62
63
64
65
def run(self) -> None:
    """Thread entry point: build a private event loop and drive the client.

    A fresh loop is installed as the current loop, the shutdown event is
    created, and ``async_run`` is run to completion. Anything raised along
    the way is only logged at debug level.
    """
    try:
        event_loop = asyncio.new_event_loop()
        self.loop = event_loop
        asyncio.set_event_loop(event_loop)
        self._shutdown_event = asyncio.Event()
        # NOTE(review): the loop is not closed once async_run has finished.
        event_loop.run_until_complete(self.async_run())
    except BaseException as exc:  # noqa: BLE001
        log.debug(f"Error during client run: {exc}")  # noqa: G004
|
stop()
Set flag to stop client connection.
Source code in client/ayon_comfyui/api/ws_client.py
def stop(self) -> None:
    """Set flag to stop client connection.

    May be called from any thread. If the loop or the shutdown event does
    not exist yet there is nothing to signal, so the call returns instead
    of raising ``AttributeError``.
    """
    loop, shutdown_event = self.loop, self._shutdown_event
    if loop is None or shutdown_event is None:
        return
    # The event must be set from the loop's own thread, so hand the call
    # over to the loop rather than setting it directly.
    loop.call_soon_threadsafe(shutdown_event.set)
|
ws_client(wait_forever=False) async
Connect to ComfyUI server to maintain heartbeat.
Returns when the connection has ended.
Source code in client/ayon_comfyui/api/ws_client.py
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
async def ws_client(self, wait_forever: bool = False) -> None:  # noqa: FBT001, FBT002
    """Hold a heartbeat websocket connection to the ComfyUI server.

    The coroutine returns once the connection has ended, or immediately
    when a shutdown has already been requested.

    Args:
        wait_forever: When true, retry every second until a connection
            could be established or a shutdown is requested. When false,
            make a single attempt.
    """

    async def _hold_connection() -> None:
        """Connect and block until the connection ends.

        Raises:
            aiohttp.ClientConnectorError
        """
        async with aiohttp.ClientSession() as session:
            async with session.ws_connect(self._url, heartbeat=5) as socket:
                log.info("Established connection!")
                # Drain incoming messages; the loop ends with the connection.
                async for _ in socket:
                    pass

    if not wait_forever:
        # Single attempt, skipped entirely once shutdown was requested.
        if self._shutdown_event.is_set():
            return
        try:
            await _hold_connection()
        except aiohttp.ClientConnectorError:
            log.info("Couldn't connect.")
        return

    while not self._shutdown_event.is_set():
        try:
            await _hold_connection()
        except aiohttp.ClientConnectorError:
            log.info("Couldn't connect, trying again in 1 second.")
            await asyncio.sleep(1)
        else:
            # The connection was established and has ended; stop retrying.
            break
|