Skip to content

webserver

Webserver for communication with AfterEffects.

Aiohttp (Asyncio) based websocket server used for communication with host application.

This webserver is started in a spawned Python process that opens the DCC during its launch, waits for a connection from the DCC, and handles communication going forward. The server is closed before the Python process is killed.

WebServerThread

Bases: Thread

Listener for websocket rpc requests.

It would probably be better to "attach" this to the main thread (as, for example, Harmony needs to run something on the main thread), but currently it creates a separate thread and a separate asyncio event loop.

Source code in client/ayon_aftereffects/api/webserver.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
class WebServerThread(threading.Thread):
    """Listener for websocket rpc requests.

    It would probably be better to "attach" this to the main thread (as for
    example Harmony needs to run something on the main thread), but currently
    it creates a separate thread and a separate asyncio event loop.
    """
    def __init__(self, module, port):
        """Prepare the daemon thread; nothing is started until 'run'.

        Args:
            module: Owner object. This class reads its 'app' attribute and
                calls its 'thread_stopped()' method when the thread ends.
            port: TCP port the site is bound to on 'localhost'.
        """
        super().__init__(daemon=True)

        self.is_running = False
        self.port = port
        self.module = module
        self.loop = None
        self.runner = None
        self.site = None
        # Awaitables that 'check_shutdown' awaits one by one.
        self.tasks = []

    def run(self):
        """Thread body: create the event loop, start the site, run forever.

        The loop keeps running until 'check_shutdown' stops it. Any exception
        is logged and re-raised; 'module.thread_stopped()' is always called.
        """
        self.is_running = True

        try:
            log.info("Starting web server")
            self.loop = asyncio.new_event_loop()  # create new loop for thread
            asyncio.set_event_loop(self.loop)

            self.loop.run_until_complete(self.start_server())

            websocket_url = "ws://localhost:{}/ws".format(self.port)

            log.debug(
                "Running Websocket server on URL: \"{}\"".format(websocket_url)
            )

            asyncio.ensure_future(self.check_shutdown(), loop=self.loop)
            self.loop.run_forever()
        except Exception:
            self.is_running = False
            log.warning(
                "Websocket Server service has failed", exc_info=True
            )
            raise
        finally:
            # The loop is still None when its creation itself failed; closing
            # it unguarded would mask the original error with AttributeError.
            if self.loop is not None:
                self.loop.close()

            self.is_running = False
            self.module.thread_stopped()
            log.info("Websocket server stopped")

    async def start_server(self):
        """ Starts runner and TCPsite """
        self.runner = web.AppRunner(self.module.app)
        await self.runner.setup()
        self.site = web.TCPSite(self.runner, 'localhost', self.port)
        await self.site.start()

    def stop(self):
        """Sets is_running flag to false, 'check_shutdown' shuts server down"""
        self.is_running = False

    async def check_shutdown(self):
        """Poll 'is_running' and shut the server down once it is cleared.

        While the flag is set, everything queued in 'self.tasks' is awaited
        and the flag is re-checked every 0.5 seconds. Afterwards the site and
        the runner are stopped, all other tasks of the loop are cancelled and
        the loop itself is stopped.
        """
        while self.is_running:
            while self.tasks:
                task = self.tasks.pop(0)
                log.debug("waiting for task {}".format(task))
                try:
                    result = await task
                except Exception:
                    # A failing task must not end this watcher, otherwise
                    # 'stop()' could never shut the server down.
                    log.warning(
                        "Task {} has failed".format(task), exc_info=True
                    )
                    continue
                # Log the awaited value, not the bound 'result' method.
                log.debug("returned value {}".format(result))

            await asyncio.sleep(0.5)

        log.debug("Starting shutdown")
        await self.site.stop()
        log.debug("Site stopped")
        await self.runner.cleanup()
        log.debug("Runner stopped")
        current = asyncio.current_task()
        tasks = [task for task in asyncio.all_tasks() if task is not current]
        for task in tasks:  # cancel all the tasks
            task.cancel()
        results = await asyncio.gather(*tasks, return_exceptions=True)
        log.debug(f'Finished awaiting cancelled tasks, results: {results}...')
        await self.loop.shutdown_asyncgens()
        # to really make sure everything else has time to stop
        await asyncio.sleep(0.07)
        self.loop.stop()

check_shutdown() async

Future that is running and checks if server should be running periodically.

Source code in client/ayon_aftereffects/api/webserver.py
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
async def check_shutdown(self):
    """Poll 'is_running' and shut the server down once it is cleared.

    While the flag is set, everything queued in 'self.tasks' is awaited
    and the flag is re-checked every 0.5 seconds. Afterwards the site and
    the runner are stopped, all other tasks of the loop are cancelled and
    the loop itself is stopped.
    """
    while self.is_running:
        while self.tasks:
            task = self.tasks.pop(0)
            log.debug("waiting for task {}".format(task))
            try:
                result = await task
            except Exception:
                # A failing task must not end this watcher, otherwise
                # 'stop()' could never shut the server down.
                log.warning(
                    "Task {} has failed".format(task), exc_info=True
                )
                continue
            # Log the awaited value, not the bound 'result' method.
            log.debug("returned value {}".format(result))

        await asyncio.sleep(0.5)

    log.debug("Starting shutdown")
    await self.site.stop()
    log.debug("Site stopped")
    await self.runner.cleanup()
    log.debug("Runner stopped")
    current = asyncio.current_task()
    tasks = [task for task in asyncio.all_tasks() if task is not current]
    for task in tasks:  # cancel all the tasks
        task.cancel()
    results = await asyncio.gather(*tasks, return_exceptions=True)
    log.debug(f'Finished awaiting cancelled tasks, results: {results}...')
    await self.loop.shutdown_asyncgens()
    # to really make sure everything else has time to stop
    await asyncio.sleep(0.07)
    self.loop.stop()

start_server() async

Starts runner and TCPsite

Source code in client/ayon_aftereffects/api/webserver.py
266
267
268
269
270
271
async def start_server(self):
    """Set up the aiohttp runner for 'module.app' and start a TCP site.

    The site listens on 'localhost' at 'self.port'. Both objects are kept
    on the instance as 'runner' and 'site'.
    """
    app_runner = web.AppRunner(self.module.app)
    self.runner = app_runner
    await app_runner.setup()

    tcp_site = web.TCPSite(app_runner, 'localhost', self.port)
    self.site = tcp_site
    await tcp_site.start()

stop()

Sets is_running flag to false, 'check_shutdown' shuts server down

Source code in client/ayon_aftereffects/api/webserver.py
273
274
275
def stop(self):
    """Request shutdown by clearing the 'is_running' flag.

    Nothing is stopped here directly; 'check_shutdown' shuts the server
    down once it sees the cleared flag.
    """
    self.is_running = False

WebServerTool

Basic asynchronous websocket RPC server.

Source code in client/ayon_aftereffects/api/webserver.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
class WebServerTool:
    """Basic asynchronous websocket RPC server."""
    _instance = None

    # Retry policy for WebSocket calls (internal; CEP reconnect is ~5s)
    _CALL_MAX_RETRIES = 3
    _CALL_RETRY_DELAY = 6.0

    def __init__(self):
        """Create the aiohttp application and the (not yet started) thread.

        Host name and port are taken from the 'WEBSOCKET_URL' environment
        variable when it is set; otherwise 'localhost' and 8098 are used.
        The created object is also stored as the class-level instance.
        """
        WebServerTool._instance = self

        self.client = None
        self.handlers = {}
        self.on_stop_callbacks = []

        port = None
        host_name = "localhost"
        websocket_url = os.getenv("WEBSOCKET_URL")
        if websocket_url:
            parsed = urllib.parse.urlparse(websocket_url)
            port = parsed.port
            # Keep the default when the URL carries no host part (e.g. a
            # value without scheme), instead of storing an empty host name.
            host_name = parsed.netloc.split(":")[0] or host_name
        if not port:
            port = 8098  # fallback

        self.port = port
        self.host_name = host_name

        self.app = web.Application()

        # add route with multiple methods for single "external app"
        self.webserver_thread = WebServerThread(self, self.port)

    def get_websocket_url(self):
        """Return websocket URL for the server from the host name and port."""
        return f"ws://{self.host_name}:{self.port}/ws/"

    def add_route(self, *args, **kwargs):
        """Forward all arguments to 'app.router.add_route'."""
        self.app.router.add_route(*args, **kwargs)

    def add_static(self, *args, **kwargs):
        """Forward all arguments to 'app.router.add_static'."""
        self.app.router.add_static(*args, **kwargs)

    def start_server(self):
        """Start the webserver thread unless it is already alive."""
        if self.webserver_thread and not self.webserver_thread.is_alive():
            self.webserver_thread.start()

    def stop_server(self):
        """Alias of 'stop'."""
        self.stop()

    async def send_context_change(self, host):
        """
            Calls running webserver to inform about context change

            Used when new PS/AE should be triggered,
            but one already running, without
            this publish would point to old context.

            Args:
                host: Prefix of the called RPC method '<host>.set_context'.
        """
        client = WSRPCClient(self.get_websocket_url(),
                             loop=asyncio.get_event_loop())
        await client.connect()

        # Close the connection even if reading the context or the call fails.
        try:
            context = get_global_context()
            project_name = context["project_name"]
            folder_path = context["folder_path"]
            task_name = context["task_name"]
            log.info("Sending context change to {}{}/{}".format(
                project_name, folder_path, task_name
            ))

            await client.call(
                '{}.set_context'.format(host),
                project=project_name,
                folder=folder_path,
                task=task_name
            )
        finally:
            await client.close()

    def port_occupied(self, host_name, port):
        """
            Check if 'url' is already occupied.

            This could mean, that app is already running and we are trying open it
            again. In that case, use existing running webserver.
            Check here is easier than capturing exception from thread.

            Returns:
                bool: True when a TCP connection to the address succeeded.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as con:
            result = con.connect_ex((host_name, port)) == 0

        if result:
            log.warning(f"Port {port} is already in use")
        return result

    def call_on_client(self, stub, method_name, **kwargs):
        """Run a single RPC on the current WebSocket client, with retry on connection errors.

        When the CEP extension is blocked by a long JSX operation (e.g. get_layers
        on a large PSD or save()), the WebSocket transport can close. This method
        retries after a delay so the extension can reconnect.

        A missing client or a missing server event loop is handled as
        a connection error too, so it is retried and finally raised instead
        of silently returning None.

        Args:
            stub: Object with a .client property.
            method_name: RPC method name.
            **kwargs: Keyword arguments passed to client.call().

        Returns:
            Result of the RPC.

        Raises:
            ConnectionResetError, ConnectionError, OSError: After all retries failed.
        """
        for attempt in range(self._CALL_MAX_RETRIES):
            try:
                client = stub.client
                if client is None:
                    raise ConnectionError("No WebSocket client connected")
                log.debug(
                    f"websocket.call_on_client attempt {attempt + 1}/{self._CALL_MAX_RETRIES}",
                )

                loop = self.webserver_thread.loop
                if not loop:
                    # Checked before 'client.call' so no coroutine is created
                    # that would never be awaited.
                    raise ConnectionError(
                        "Webserver event loop is not available"
                    )

                future = asyncio.run_coroutine_threadsafe(
                    coro=client.call(method_name, **kwargs),
                    loop=loop,
                )
                return future.result()
            except (ConnectionResetError, ConnectionError, OSError) as e:
                if attempt >= self._CALL_MAX_RETRIES - 1:
                    log.warning(
                        f"WebSocket call failed after {attempt + 1} attempt(s): {e}",
                        exc_info=True,
                    )
                    raise
                log.warning(
                    f"WebSocket connection error (attempt {attempt + 1}/{self._CALL_MAX_RETRIES}), waiting for "
                    f"healthy client (up to {self._CALL_RETRY_DELAY}s): {e}",
                )
                self._wait_for_healthy_client(self._CALL_RETRY_DELAY)

    @staticmethod
    def get_instance():
        """Return the shared instance, creating it on first use."""
        if WebServerTool._instance is None:
            WebServerTool()
        return WebServerTool._instance

    @property
    def is_running(self):
        """'is_running' flag of the webserver thread (False without one)."""
        if not self.webserver_thread:
            return False
        return self.webserver_thread.is_running

    def stop(self):
        """Ask the webserver thread to stop; errors are only logged."""
        if not self.is_running:
            return
        try:
            log.debug("Stopping websocket server")
            self.webserver_thread.is_running = False
            self.webserver_thread.stop()
        except Exception:
            log.warning(
                "Error has happened during Killing websocket server",
                exc_info=True
            )

    def thread_stopped(self):
        """Run all registered 'on_stop_callbacks' in order."""
        for callback in self.on_stop_callbacks:
            callback()

    def _wait_for_healthy_client(self, timeout: float):
        """Poll for a WebSocket client whose transport is not closed.

        Return when one is found or timeout (seconds) is reached.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            clients = WebSocketAsync.get_clients()
            for client in clients.values():
                sock = getattr(client, "socket", None)
                if sock is not None and getattr(sock, "closed", False):
                    continue
                return
            time.sleep(0.25)

call_on_client(stub, method_name, **kwargs)

Run a single RPC on the current WebSocket client, with retry on connection errors.

When the CEP extension is blocked by a long JSX operation (e.g. get_layers on a large PSD or save()), the WebSocket transport can close. This method retries after a delay so the extension can reconnect.

Parameters:

Name Type Description Default
stub

Object with a .client property.

required
method_name

RPC method name.

required
**kwargs

Keyword arguments passed to client.call().

{}

Returns:

Type Description

Result of the RPC.

Raises:

Type Description
(ConnectionResetError, ConnectionError, OSError)

After all retries failed.

Source code in client/ayon_aftereffects/api/webserver.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def call_on_client(self, stub, method_name, **kwargs):
    """Run a single RPC on the current WebSocket client, with retry on connection errors.

    When the CEP extension is blocked by a long JSX operation (e.g. get_layers
    on a large PSD or save()), the WebSocket transport can close. This method
    retries after a delay so the extension can reconnect.

    A missing client or a missing server event loop is handled as
    a connection error too, so it is retried and finally raised instead
    of silently returning None.

    Args:
        stub: Object with a .client property.
        method_name: RPC method name.
        **kwargs: Keyword arguments passed to client.call().

    Returns:
        Result of the RPC.

    Raises:
        ConnectionResetError, ConnectionError, OSError: After all retries failed.
    """
    for attempt in range(self._CALL_MAX_RETRIES):
        try:
            client = stub.client
            if client is None:
                raise ConnectionError("No WebSocket client connected")
            log.debug(
                f"websocket.call_on_client attempt {attempt + 1}/{self._CALL_MAX_RETRIES}",
            )

            loop = self.webserver_thread.loop
            if not loop:
                # Checked before 'client.call' so no coroutine is created
                # that would never be awaited.
                raise ConnectionError(
                    "Webserver event loop is not available"
                )

            future = asyncio.run_coroutine_threadsafe(
                coro=client.call(method_name, **kwargs),
                loop=loop,
            )
            return future.result()
        except (ConnectionResetError, ConnectionError, OSError) as e:
            if attempt >= self._CALL_MAX_RETRIES - 1:
                log.warning(
                    f"WebSocket call failed after {attempt + 1} attempt(s): {e}",
                    exc_info=True,
                )
                raise
            log.warning(
                f"WebSocket connection error (attempt {attempt + 1}/{self._CALL_MAX_RETRIES}), waiting for "
                f"healthy client (up to {self._CALL_RETRY_DELAY}s): {e}",
            )
            self._wait_for_healthy_client(self._CALL_RETRY_DELAY)

get_websocket_url()

Return websocket URL for the server from the host name and port.

Source code in client/ayon_aftereffects/api/webserver.py
57
58
59
def get_websocket_url(self):
    """Build the server's websocket URL from 'host_name' and 'port'."""
    return "ws://{}:{}/ws/".format(self.host_name, self.port)

port_occupied(host_name, port)

Check if 'url' is already occupied.

This could mean that the app is already running and we are trying to open it again. In that case, use the existing running webserver. Checking here is easier than capturing an exception from the thread.

Source code in client/ayon_aftereffects/api/webserver.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def port_occupied(self, host_name, port):
    """Tell whether something already accepts TCP connections on the address.

    A successful connection could mean that the app is already running and
    we are trying to open it again; in that case the existing running
    webserver should be used. Checking here is easier than capturing an
    exception from the thread.

    Returns:
        bool: True when connecting to (host_name, port) succeeded.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        in_use = probe.connect_ex((host_name, port)) == 0
    finally:
        probe.close()

    if not in_use:
        return False

    log.warning(f"Port {port} is already in use")
    return True

send_context_change(host) async

Calls running webserver to inform about context change

Used when a new PS/AE should be triggered but one is already running; without this, publishing would point to the old context.

Source code in client/ayon_aftereffects/api/webserver.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
async def send_context_change(self, host):
    """
        Calls running webserver to inform about context change

        Used when new PS/AE should be triggered,
        but one already running, without
        this publish would point to old context.

        Args:
            host: Prefix of the called RPC method '<host>.set_context'.
    """
    client = WSRPCClient(self.get_websocket_url(),
                         loop=asyncio.get_event_loop())
    await client.connect()

    # Close the connection even if reading the context or the call fails.
    try:
        context = get_global_context()
        project_name = context["project_name"]
        folder_path = context["folder_path"]
        task_name = context["task_name"]
        log.info("Sending context change to {}{}/{}".format(
            project_name, folder_path, task_name
        ))

        await client.call(
            '{}.set_context'.format(host),
            project=project_name,
            folder=folder_path,
            task=task_name
        )
    finally:
        await client.close()