Skip to content

qt_rpc

Uses a QObject to manage the lifetime of the RPC server.

QRPCManager

Bases: QObject, QThread_interface

Manage RPC async processes within QThread context.

Source code in client/ayon_comfyui/api/qt_rpc.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
class QRPCManager(QObject, QThread_interface):
    """Manage RPC async processes within QThread context.

    The manager owns three helper threads (static content server, websocket
    RPC server and heartbeat websocket client) and a QTimer that drains a
    queue of callables filled through ``schedule``.

    The most recently constructed instance is stored on the class and can be
    fetched with ``get_instance``.
    """

    # Pending callables drained by ``process_scheduled_tasks``. Declared on
    # the class, so the queue is shared by all instances.
    _main_tasks: ClassVar[deque] = deque()
    # Last constructed manager (set in ``__init__``); None until then.
    _stored_man: ClassVar[QRPCManager | None] = None

    # Connected in ``__init__`` to ``handle_failed_heartbeat``.
    sig_onheartbeat_fail = Signal()
    # Connected in ``__init__`` to ``handle_failed_tab``.
    sig_onfrontendcon_fail = Signal()

    def __init__(  # noqa :PLR0913
        self,
        *,
        parent: QObject = None,
        client_hostname: str = "localhost",
        client_port: int | str = 55055,
        server_port: int | str = 55056,
        static_port: int | str = 5454,
        comfy_url: str = "http://127.0.0.1:8818",
        use_https: bool = False,
    ) -> None:
        """Construct QRPCManager.

        Nothing is started here; call ``start_server`` to launch the threads
        and the task timer.

        Args:
            parent: Optional Qt parent object.
            client_hostname: Host of the backend this plugin connects to as
                a "client" (defaults to ``localhost``).
            client_port: Port used to connect to the backend.
            server_port: Port opened towards the web UI.
            static_port: Port handed to the static content server.
            comfy_url: URL of the ComfyUI frontend, passed to the static
                server and exposed through ``comfy_url``.
            use_https: Forwarded to the RPC server and heartbeat client.
        """
        # Sneak constructed object into class definition.
        # Semi-Singleton behavior: the latest instance replaces any earlier
        # one. NOTE(review): this happens before the threads are created,
        # so ``get_instance`` can see a partially initialised object.
        self.__class__._stored_man = self  # noqa: SLF001
        log.info("within QRPCManager init")
        super().__init__(parent=parent)

        self._server_thread = RPCServerThread(
            server_port, self, https=use_https
        )
        self._static_thread = StaticServerThread(
            port=static_port, comfy_url=comfy_url
        )
        self._ws_client_thread = WSClientThread(
            hostname=client_hostname,
            port=client_port,
            use_https=use_https,
            qtthread=self,
        )

        # Optional ComfyUI subprocess, set via ``attach_comfyui_process``.
        self._process: Popen | None = None

        # Stub just exists by itself.
        self._stub = RPCStub()
        self._comfy_url = comfy_url

        # Define QTimer to process the tasks every 100 ms once started.
        loop_timer = QTimer()
        loop_timer.setInterval(100)
        loop_timer.timeout.connect(self.process_scheduled_tasks)

        self.sig_onheartbeat_fail.connect(self.handle_failed_heartbeat)
        self.sig_onfrontendcon_fail.connect(self.handle_failed_tab)

        self._loop_timer = loop_timer

    @property
    def server_thread(self) -> RPCServerThread:
        """Get Server Thread for RPC."""
        return self._server_thread

    @property
    def static_server_thread(self) -> StaticServerThread:
        """Get Static Server Thread for RPC."""
        return self._static_thread

    @property
    def ws_pulse_client(self) -> WSClientThread:
        """Get WS client to pulse backend."""
        return self._ws_client_thread

    def schedule(self, function: Callable, *args, **kwargs) -> None:  # noqa: ANN002, ANN003
        """Schedule function in qt thread.

        The call is bound with its arguments and appended to the shared task
        queue; it runs on a later ``process_scheduled_tasks`` tick.

        Args:
            function: Callable to execute later.
            *args: Positional arguments bound to ``function``.
            **kwargs: Keyword arguments bound to ``function``.
        """
        log.info("scheduled function to qt thread")
        f = partial(function, *args, **kwargs)
        self._main_tasks.append(f)

    def process_scheduled_tasks(self) -> None:
        """QTimer scheduled process to run tasks stored in task queue.

        Only the tasks present when the call starts are handled; anything
        appended meanwhile is left for the next tick. Each task is wrapped
        with ``safe_partial`` and errors reported by the resulting objects
        are logged at debug level.
        """
        # capture current length of queue
        pending = len(self._main_tasks)

        if pending > 0:
            log.info("processing tasks...")

        # Move exactly ``pending`` tasks out of the queue; falsy entries are
        # dropped while wrapping.
        drained = (self._main_tasks.popleft() for _ in range(pending))
        wrapped = [safe_partial(task) for task in drained if task]

        # Calling a wrapped task is expected to yield an object exposing
        # ``is_err`` and ``error``.
        results = [task() for task in wrapped]

        for res in results:
            if res.is_err:
                log.debug("Error encountered in scheduled task: %s", res.error)

    def attach_comfyui_process(self, process: Popen) -> None:
        """Set internal process to potentially opened comfyui subprocess.

        The stored process is terminated by ``handle_failed_tab``.
        """
        self._process = process

    def start_server(self) -> None:
        """Wraps server start logic in thread, and inject settings.

        The procedure is as follows:

        Firstly, a simple webserver hosting the 'static' content starts,
        meaning the <iframe> in which to embed ComfyUI as well as the
        javascript bridging websocket RPC and iframe RPC.

        Secondly, the internal websocket server starts. This server
        is responsible for sending commands over to ComfyUI through
        the proxy hosted by the static server.

        Next, we start a simple websocket client (no RPC),
        which keeps a heartbeat to the backend.

        Lastly we start the internal QTimer loop. This will
        pop calls from the call queue and execute them, so that we can
        properly execute stuff in the main qt context.

        Each start step is best-effort: an exception is logged with its
        traceback and the remaining steps still run. ``KeyboardInterrupt``
        and ``SystemExit`` are not swallowed.
        """
        log.info("within QRPCManager start_server")
        # Statically hosted content for embedding the rest
        try:
            self._static_thread.start()
        except Exception:  # noqa: BLE001
            log.exception("failure in static server thread start")
        else:
            log.info("static <iframe> server thread supposedly started")

        # RPC websocket server on http://localhost
        try:
            self._server_thread.setup_server()
            self._server_thread.start()
        except Exception:  # noqa: BLE001
            log.exception("failure in websocketserver thread start")
        else:
            log.info("websocketserver thread supposedly started")

        # Pulse websocket to backend
        try:
            self._ws_client_thread.start()
        except Exception:  # noqa: BLE001
            log.exception("failure in ws client thread start")
        else:
            log.info("ws client pulse to backend thread supposedly started")

        # QTimer loop
        self._loop_timer.start()
        log.info("Started QT loop")

    def handle_failed_heartbeat(self) -> None:
        """Handle signal received when a backend failed.

        Stops and joins the static server and the RPC server threads, stops
        the task timer and asks the Qt application to exit with code 0.
        The heartbeat client and any attached subprocess are not touched
        here.
        """
        log.error("HEARTBEAT FAILED! Stopping services...")
        log.info("Stopping static hosted site...")
        self.static_server_thread.stop()
        self.static_server_thread.join()
        log.info("Static hosted site stopped.")

        log.info("Stopping Websocket RPC Server...")
        self.server_thread.stop()
        self.server_thread.join()
        log.info("Websocket RPC Server stopped.")

        self._loop_timer.stop()
        QCoreApplication.exit(0)

    def handle_failed_tab(self) -> None:
        """Handle signal received when a tab is closed.

        Stops the heartbeat client, stops and joins the RPC server and the
        static server threads, kills the attached ComfyUI subprocess (if
        any), stops the task timer and asks the Qt application to exit with
        code 0.
        """
        log.error("Tab closed! Stopping services...")
        log.info("Stopping heartbeat websocket client...")
        self.ws_pulse_client.stop()
        log.info("Heartbeat websocket client stopped.")

        log.info("Stopping Websocket RPC Server...")
        self.server_thread.stop()
        self.server_thread.join()
        log.info("Websocket RPC Server stopped.")

        log.info("Stopping static hosted site...")
        self.static_server_thread.stop()
        self.static_server_thread.join()
        log.info("Static hosted site stopped.")
        # For subprocess (not here in heartbeat.)
        if self._process:
            log.info("Killing ComfyUI process...")
            # TODO(@anyone): test other platforms, pls
            if sys.platform != "win32":
                # Kill the process itself and reap it.
                self._process.kill()
                self._process.wait()
            else:
                # On windows, forcibly kill entire task tree.
                subprocess.run(
                    ["taskkill", "/PID", str(self._process.pid), "/T", "/F"],  # noqa: S607
                    check=False,
                )

        self._loop_timer.stop()
        QCoreApplication.exit(0)

    @classmethod
    def get_instance(cls) -> QRPCManager:
        """Service Locator-like pull Singleton from class registry.

        Returns:
            class contained QRPCManager instance, or ``None`` when no
            manager has been constructed yet.
        """
        return cls._stored_man

    @property
    def stub(self) -> RPCStub:
        """Return stored stub."""
        return self._stub

    @property
    def comfy_url(self) -> str:
        """Return UI url (frontend)."""
        return self._comfy_url

comfy_url property

Return UI url (frontend).

server_thread property

Get Server Thread for RPC.

static_server_thread property

Get Static Server Thread for RPC.

stub property

Return stored stub.

ws_pulse_client property

Get WS client to pulse backend.

__init__(*, parent=None, client_hostname='localhost', client_port=55055, server_port=55056, static_port=5454, comfy_url='http://127.0.0.1:8818', use_https=False)

Construct QRPCManager.

For a local session, the client hostname will always be `localhost`.

The client port is the port we use to connect to the backend; that is where this plugin acts as a "client". The server port is the port we open to the web UI.

Source code in client/ayon_comfyui/api/qt_rpc.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def __init__(  # noqa :PLR0913
    self,
    *,
    parent: QObject = None,
    client_hostname: str = "localhost",
    client_port: int | str = 55055,
    server_port: int | str = 55056,
    static_port: int | str = 5454,
    comfy_url: str = "http://127.0.0.1:8818",
    use_https: bool = False,
) -> None:
    """Build the manager, its helper threads and the task timer.

    Nothing is started by the constructor itself.

    ``client_hostname`` / ``client_port`` describe the backend this plugin
    connects to as a "client" (the hostname defaults to ``localhost``).
    ``server_port`` is the port opened towards the web UI, while
    ``static_port`` and ``comfy_url`` are handed to the static server.
    ``use_https`` is forwarded to the RPC server and the heartbeat client.
    """
    # Semi-singleton: remember the newest instance on the class itself.
    type(self)._stored_man = self  # noqa: SLF001
    log.info("within QRPCManager init")
    super().__init__(parent=parent)

    # Worker threads are only constructed here, not started.
    self._server_thread = RPCServerThread(server_port, self, https=use_https)
    self._static_thread = StaticServerThread(
        port=static_port,
        comfy_url=comfy_url,
    )
    self._ws_client_thread = WSClientThread(
        hostname=client_hostname,
        port=client_port,
        use_https=use_https,
        qtthread=self,
    )

    # No ComfyUI subprocess is attached until someone provides one.
    self._process: Popen = None

    # The stub is independent of everything else.
    self._stub = RPCStub()
    self._comfy_url = comfy_url

    # Timer that drains the scheduled-task queue every 100 ms.
    self._loop_timer = QTimer()
    self._loop_timer.setInterval(100)
    self._loop_timer.timeout.connect(self.process_scheduled_tasks)

    # Route failure signals to their shutdown handlers.
    self.sig_onheartbeat_fail.connect(self.handle_failed_heartbeat)
    self.sig_onfrontendcon_fail.connect(self.handle_failed_tab)

attach_comfyui_process(process)

Set internal process to potentially opened comfyui subprocess.

Source code in client/ayon_comfyui/api/qt_rpc.py
134
135
136
def attach_comfyui_process(self, process: Popen) -> None:
    """Remember ``process`` as the ComfyUI subprocess owned by this manager.

    Any previously stored process reference is overwritten.
    """
    self._process = process

get_instance() classmethod

Service Locator-like pull Singleton from class registry.

Returns:

Type Description
QRPCManager

class contained QRPCManager instance.

Source code in client/ayon_comfyui/api/qt_rpc.py
234
235
236
237
238
239
240
241
@classmethod
def get_instance(cls) -> QRPCManager:
    """Fetch the manager stored on the class (service-locator style).

    Returns:
        Whatever is currently held in the class-level ``_stored_man`` slot.
    """
    stored_manager = cls._stored_man
    return stored_manager

handle_failed_heartbeat()

Handle signal received when a backend failed.

Source code in client/ayon_comfyui/api/qt_rpc.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def handle_failed_heartbeat(self) -> None:
    """React to a failed backend heartbeat by shutting everything down.

    Stops and joins the static server thread, then the RPC server thread,
    halts the task timer and asks the Qt application to exit with code 0.
    """
    log.error("HEARTBEAT FAILED! Stopping services...")

    # Static content server first.
    log.info("Stopping static hosted site...")
    static_thread = self.static_server_thread
    static_thread.stop()
    static_thread.join()
    log.info("Static hosted site stopped.")

    # Then the websocket RPC server.
    log.info("Stopping Websocket RPC Server...")
    rpc_thread = self.server_thread
    rpc_thread.stop()
    rpc_thread.join()
    log.info("Websocket RPC Server stopped.")

    # Finally stop draining tasks and leave the event loop.
    self._loop_timer.stop()
    QCoreApplication.exit(0)

handle_failed_tab()

Handle signal received when a tab is closed.

Source code in client/ayon_comfyui/api/qt_rpc.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def handle_failed_tab(self) -> None:
    """React to a closed frontend tab by shutting everything down.

    Stops the heartbeat client, stops and joins the RPC server and static
    server threads, kills the attached ComfyUI subprocess when there is
    one, halts the task timer and asks the Qt application to exit with
    code 0.
    """
    log.error("Tab closed! Stopping services...")

    log.info("Stopping heartbeat websocket client...")
    self.ws_pulse_client.stop()
    log.info("Heartbeat websocket client stopped.")

    log.info("Stopping Websocket RPC Server...")
    rpc_thread = self.server_thread
    rpc_thread.stop()
    rpc_thread.join()
    log.info("Websocket RPC Server stopped.")

    log.info("Stopping static hosted site...")
    static_thread = self.static_server_thread
    static_thread.stop()
    static_thread.join()
    log.info("Static hosted site stopped.")

    # Only this handler deals with the subprocess (the heartbeat one doesn't).
    comfy_process = self._process
    if comfy_process:
        log.info("Killing ComfyUI process...")
        # TODO(@anyone): test other platforms, pls
        if sys.platform == "win32":
            # Windows: force-kill the whole process tree.
            taskkill_cmd = [
                "taskkill", "/PID", str(comfy_process.pid), "/T", "/F",
            ]
            subprocess.run(taskkill_cmd, check=False)  # noqa: S603, S607
        else:
            comfy_process.kill()
            comfy_process.wait()

    self._loop_timer.stop()
    QCoreApplication.exit(0)

process_scheduled_tasks()

QTimer scheduled process to run tasks stored in task queue.

Source code in client/ayon_comfyui/api/qt_rpc.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def process_scheduled_tasks(self) -> None:
    """QTimer scheduled process to run tasks stored in task queue.

    Only the tasks present when the call starts are handled; anything
    appended meanwhile is left for the next tick. Each task is wrapped with
    ``safe_partial`` and errors reported by the resulting objects are
    logged at debug level.
    """
    # capture current length of queue
    pending = len(self._main_tasks)

    if pending > 0:
        log.info("processing tasks...")

    # Move exactly ``pending`` tasks out of the queue; falsy entries are
    # dropped while wrapping.
    drained = (self._main_tasks.popleft() for _ in range(pending))
    wrapped = [safe_partial(task) for task in drained if task]

    # Calling a wrapped task is expected to yield an object exposing
    # ``is_err`` and ``error``.
    results = [task() for task in wrapped]

    # Plain loop instead of a throwaway list built only for its side effects.
    for res in results:
        if res.is_err:
            log.debug("Error encountered in scheduled task: %s", res.error)

schedule(function, *args, **kwargs)

Schedule function in qt thread.

Source code in client/ayon_comfyui/api/qt_rpc.py
108
109
110
111
112
def schedule(self, function: Callable, *args, **kwargs) -> None:  # noqa: ANN002, ANN003
    """Queue ``function`` (bound to its arguments) for later execution.

    The bound call is appended to the task queue and is picked up by a
    later run of ``process_scheduled_tasks``.
    """
    log.info("scheduled function to qt thread")
    self._main_tasks.append(partial(function, *args, **kwargs))

start_server()

Wraps server start logic in thread, and inject settings.

The procedure is as follows:

Firstly, a simple webserver hosting the 'static' content starts, meaning the