Skip to content

webserver

Webserver for communication with AfterEffects.

Aiohttp (Asyncio) based websocket server used for communication with host application.

This webserver is started in a spawned Python process that opens the DCC during its launch, waits for a connection from the DCC, and handles all communication from then on. The server is closed before the Python process is killed.

WebServerThread

Bases: Thread

Listener for websocket rpc requests.

It would probably be better to "attach" this to the main thread (for example, Harmony needs to run something on the main thread), but currently it creates a separate thread and a separate asyncio event loop.

Source code in client/ayon_aftereffects/api/webserver.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
class WebServerThread(threading.Thread):
    """Listener for websocket rpc requests.

    It would be probably better to "attach" this to main thread (as for
    example Harmony needs to run something on main thread), but currently
    it creates a separate thread with its own asyncio event loop.

    Args:
        module: Owner of the thread. ``module.app`` is passed to
            ``web.AppRunner`` and ``module.thread_stopped()`` is called
            once the thread has finished.
        port: TCP port the site is bound to on ``localhost``.
    """
    def __init__(self, module, port):
        super(WebServerThread, self).__init__()

        self.is_running = False
        self.port = port
        self.module = module
        self.loop = None
        self.runner = None
        self.site = None
        # Awaitables that 'check_shutdown' awaits one by one, in FIFO order.
        self.tasks = []

    def run(self):
        """Thread body: start the server and run the event loop.

        Creates a new event loop for this thread, starts the site,
        schedules 'check_shutdown' and blocks in ``run_forever()`` until
        'check_shutdown' stops the loop.

        Raises:
            Exception: Any error raised while starting or running the
                server is logged and re-raised.
        """
        self.is_running = True

        try:
            log.info("Starting web server")
            self.loop = asyncio.new_event_loop()  # create new loop for thread
            asyncio.set_event_loop(self.loop)

            self.loop.run_until_complete(self.start_server())

            websocket_url = "ws://localhost:{}/ws".format(self.port)

            log.debug(
                "Running Websocket server on URL: \"{}\"".format(websocket_url)
            )

            asyncio.ensure_future(self.check_shutdown(), loop=self.loop)
            self.loop.run_forever()
        except Exception:
            self.is_running = False
            log.warning(
                "Websocket Server service has failed", exc_info=True
            )
            raise
        finally:
            # The loop is still None when 'new_event_loop' itself failed;
            # closing unconditionally would hide the original error behind
            # an AttributeError.
            if self.loop is not None:
                self.loop.close()

            self.is_running = False
            self.module.thread_stopped()
            log.info("Websocket server stopped")

    async def start_server(self):
        """Create the ``AppRunner`` and start a ``TCPSite`` on localhost."""
        self.runner = web.AppRunner(self.module.app)
        await self.runner.setup()
        self.site = web.TCPSite(self.runner, 'localhost', self.port)
        await self.site.start()

    def stop(self):
        """Sets is_running flag to false, 'check_shutdown' shuts server down"""
        self.is_running = False

    async def check_shutdown(self):
        """Process queued tasks and shut the server down when requested.

        While 'is_running' is true, every awaitable found in 'self.tasks'
        is awaited in order and the flag is re-checked every 0.5 seconds.
        Once the flag is false, the site and runner are stopped, all other
        tasks of the loop are cancelled and the loop itself is stopped.
        """
        while self.is_running:
            while self.tasks:
                task = self.tasks.pop(0)
                log.debug("waiting for task {}".format(task))
                # NOTE(review): an exception raised by the task propagates
                # out of this coroutine, so the shutdown steps below would
                # never run.
                result = await task
                # Log the awaited value, not the bound 'result' method.
                log.debug("returned value {}".format(result))

            await asyncio.sleep(0.5)

        log.debug("Starting shutdown")
        await self.site.stop()
        log.debug("Site stopped")
        await self.runner.cleanup()
        log.debug("Runner stopped")

        # Cancel everything except this coroutine's own task.
        current = asyncio.current_task()
        pending = [
            task for task in asyncio.all_tasks() if task is not current
        ]
        for task in pending:
            task.cancel()
        results = await asyncio.gather(*pending, return_exceptions=True)
        log.debug(f'Finished awaiting cancelled tasks, results: {results}...')
        await self.loop.shutdown_asyncgens()
        # to really make sure everything else has time to stop
        await asyncio.sleep(0.07)
        self.loop.stop()

check_shutdown() async

Future that periodically checks whether the server should keep running.

Source code in client/ayon_aftereffects/api/webserver.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
async def check_shutdown(self):
    """Process queued tasks and shut the server down when requested.

    While 'is_running' is true, every awaitable found in 'self.tasks' is
    awaited in order and the flag is re-checked every 0.5 seconds. Once
    the flag is false, the site and runner are stopped, all other tasks of
    the loop are cancelled and the loop itself is stopped.
    """
    while self.is_running:
        while self.tasks:
            task = self.tasks.pop(0)
            log.debug("waiting for task {}".format(task))
            # NOTE(review): an exception raised by the task propagates out
            # of this coroutine, so the shutdown steps below would never
            # run.
            result = await task
            # Log the awaited value, not the bound 'result' method.
            log.debug("returned value {}".format(result))

        await asyncio.sleep(0.5)

    log.debug("Starting shutdown")
    await self.site.stop()
    log.debug("Site stopped")
    await self.runner.cleanup()
    log.debug("Runner stopped")

    # Cancel everything except this coroutine's own task.
    current = asyncio.current_task()
    pending = [task for task in asyncio.all_tasks() if task is not current]
    for task in pending:
        task.cancel()
    results = await asyncio.gather(*pending, return_exceptions=True)
    log.debug(f'Finished awaiting cancelled tasks, results: {results}...')
    await self.loop.shutdown_asyncgens()
    # to really make sure everything else has time to stop
    await asyncio.sleep(0.07)
    self.loop.stop()

start_server() async

Starts runner and TCPsite

Source code in client/ayon_aftereffects/api/webserver.py
204
205
206
207
208
209
async def start_server(self):
    """Set up the application runner and start serving on localhost.

    Stores the created ``AppRunner`` in 'self.runner' and the ``TCPSite``
    bound to 'self.port' in 'self.site'.
    """
    app_runner = web.AppRunner(self.module.app)
    self.runner = app_runner
    await app_runner.setup()

    tcp_site = web.TCPSite(app_runner, 'localhost', self.port)
    self.site = tcp_site
    await tcp_site.start()

stop()

Sets is_running flag to false, 'check_shutdown' shuts server down

Source code in client/ayon_aftereffects/api/webserver.py
211
212
213
def stop(self):
    """Request shutdown by clearing the 'is_running' flag.

    Only the flag is changed here; 'check_shutdown' is what reacts to it
    and shuts the server down.
    """
    self.is_running = False

WebServerTool

Basic POC implementation of an asynchronous websocket RPC server. Uses the class in external_app_1.py to mimic an implementation for a single external application. The 'test_client' folder contains two test implementations of a client.

Source code in client/ayon_aftereffects/api/webserver.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
class WebServerTool:
    """Basic POC implementation of asynchronous websocket RPC server.

    Uses class in external_app_1.py to mimic implementation for single
    external application.
    'test_client' folder contains two test implementations of client.

    Host name and port are taken from the ``WEBSOCKET_URL`` environment
    variable, falling back to ``localhost`` and ``8098``.
    """
    _instance = None

    def __init__(self):
        WebServerTool._instance = self

        self.client = None
        self.handlers = {}
        self.on_stop_callbacks = []

        host_name, port = self._parse_websocket_url(
            os.getenv("WEBSOCKET_URL")
        )
        self.port = port
        self.host_name = host_name

        self.app = web.Application()

        # add route with multiple methods for single "external app"
        self.webserver_thread = WebServerThread(self, self.port)

    @staticmethod
    def _parse_websocket_url(websocket_url):
        """Split a websocket URL into ``(host_name, port)``.

        Args:
            websocket_url: URL such as ``ws://localhost:8098/ws``; may be
                ``None`` or empty.

        Returns:
            tuple: Host name and port. A missing host falls back to
                ``"localhost"`` and a missing port to ``8098``.
        """
        host_name = "localhost"
        port = None
        if websocket_url:
            parsed = urllib.parse.urlparse(websocket_url)
            port = parsed.port
            # 'hostname' strips userinfo and IPv6 brackets, which a plain
            # split of 'netloc' on ":" does not handle.
            host_name = parsed.hostname or host_name
        if not port:
            port = 8098  # fallback
        return host_name, port

    def add_route(self, *args, **kwargs):
        """Forward all arguments to ``self.app.router.add_route``."""
        self.app.router.add_route(*args, **kwargs)

    def add_static(self, *args, **kwargs):
        """Forward all arguments to ``self.app.router.add_static``."""
        self.app.router.add_static(*args, **kwargs)

    def start_server(self):
        """Start the webserver thread unless it is already alive."""
        if self.webserver_thread and not self.webserver_thread.is_alive():
            self.webserver_thread.start()

    def stop_server(self):
        """Alias of 'stop'."""
        self.stop()

    async def send_context_change(self, host):
        """Calls running webserver to inform about context change.

        Used when new PS/AE should be triggered, but one is already
        running; without this, publish would point to the old context.

        Args:
            host: Prefix of the remote ``<host>.set_context`` route.
        """
        client = WSRPCClient(os.getenv("WEBSOCKET_URL"),
                             loop=asyncio.get_running_loop())
        await client.connect()

        # Close the connection even when reading the context or the
        # remote call fails.
        try:
            context = get_global_context()
            project_name = context["project_name"]
            folder_path = context["folder_path"]
            task_name = context["task_name"]
            log.info("Sending context change to {}{}/{}".format(
                project_name, folder_path, task_name
            ))

            await client.call(
                '{}.set_context'.format(host),
                project=project_name,
                folder=folder_path,
                task=task_name
            )
        finally:
            await client.close()

    def port_occupied(self, host_name, port):
        """Check if something already accepts connections on host and port.

        This could mean that the app is already running and we are trying
        to open it again. In that case, use the existing running webserver.
        Check here is easier than capturing exception from thread.

        Args:
            host_name: Host to connect to.
            port: TCP port to connect to.

        Returns:
            bool: True when the connection attempt succeeded.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as con:
            result = con.connect_ex((host_name, port)) == 0

        if result:
            print(f"Port {port} is already in use")
        return result

    def call(self, func):
        """Run a coroutine on the webserver thread's loop and wait for it.

        Args:
            func: Coroutine object submitted with
                ``asyncio.run_coroutine_threadsafe``.

        Returns:
            Result of the coroutine; the call blocks until it is done.
        """
        log.debug("websocket.call {}".format(func))
        future = asyncio.run_coroutine_threadsafe(
            func,
            self.webserver_thread.loop
        )
        return future.result()

    @staticmethod
    def get_instance():
        """Return the last created instance, creating one if needed."""
        if WebServerTool._instance is None:
            WebServerTool()
        return WebServerTool._instance

    @property
    def is_running(self):
        """'is_running' flag of the webserver thread (False without one)."""
        if not self.webserver_thread:
            return False
        return self.webserver_thread.is_running

    def stop(self):
        """Ask the webserver thread to stop; no-op when not running."""
        if not self.is_running:
            return
        try:
            log.debug("Stopping websocket server")
            # 'WebServerThread.stop' clears the 'is_running' flag itself.
            self.webserver_thread.stop()
        except Exception:
            log.warning(
                "Error has happened during Killing websocket server",
                exc_info=True
            )

    def thread_stopped(self):
        """Invoke every callback registered in 'on_stop_callbacks'."""
        for callback in self.on_stop_callbacks:
            callback()

port_occupied(host_name, port)

Check whether the given host name and port are already occupied.

This could mean that the app is already running and we are trying to open it again. In that case, use the existing running webserver. Checking here is easier than capturing an exception from the thread.

Source code in client/ayon_aftereffects/api/webserver.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def port_occupied(self, host_name, port):
    """Tell whether a TCP connection to (host_name, port) succeeds.

    A successful connection could mean that the app is already running
    and we are trying to open it again; the existing running webserver
    should be used in that case. Probing here is easier than capturing
    an exception from the thread.

    Returns:
        bool: True when the connection attempt succeeded.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        error_code = probe.connect_ex((host_name, port))
    finally:
        probe.close()

    if error_code != 0:
        return False

    print(f"Port {port} is already in use")
    return True

send_context_change(host) async

Calls running webserver to inform about context change

Used when a new PS/AE should be triggered but one is already running; without this, publish would point to the old context.

Source code in client/ayon_aftereffects/api/webserver.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
async def send_context_change(self, host):
    """Calls running webserver to inform about context change.

    Used when new PS/AE should be triggered, but one is already running;
    without this, publish would point to the old context.

    Args:
        host: Prefix of the remote ``<host>.set_context`` route.
    """
    client = WSRPCClient(os.getenv("WEBSOCKET_URL"),
                         loop=asyncio.get_running_loop())
    await client.connect()

    # Close the connection even when reading the context or the remote
    # call fails.
    try:
        context = get_global_context()
        project_name = context["project_name"]
        folder_path = context["folder_path"]
        task_name = context["task_name"]
        log.info("Sending context change to {}{}/{}".format(
            project_name, folder_path, task_name
        ))

        await client.call(
            '{}.set_context'.format(host),
            project=project_name,
            folder=folder_path,
            task=task_name
        )
    finally:
        await client.close()