Bases: Thread
Serves the basic webpage on http://localhost:port with the `<iframe>` in it.
Source code in client/ayon_comfyui/api/iframe/static_server.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class StaticServerThread(Thread):
    """Serve the basic webpage on http://localhost:port with the <iframe> in it.

    The thread creates its own asyncio event loop in ``run()`` and serves
    two routes from it: ``/`` (the templated HTML page) and ``/static/``
    (files from the ``static`` directory next to this module).

    Call ``stop()`` from any thread to shut the server down. A ``stop()``
    issued before the thread has started is remembered and makes the
    server shut down right after start-up; a ``stop()`` issued after the
    server has finished is a no-op.

    Args:
        *args: Forwarded to ``Thread.__init__``.
        port: Port the static page is served on.
        comfy_webui_port: Value passed to ``template_html`` as
            ``webui_port``.
        comfy_url: Value passed to ``template_html`` as ``comfy_url``.
        **kwargs: Forwarded to ``Thread.__init__``.
    """

    # Class-level default so that ``stop()`` can record a request even
    # when the loop and the shutdown event do not exist yet.
    _stop_requested = False

    def __init__(
        self,
        *args,  # noqa : ANN002
        port: int = 5454,
        comfy_webui_port: int = 55056,
        comfy_url: str = "http://127.0.0.1:8188",
        **kwargs,  # noqa : ANN003
    ):
        fpath = Path(__file__).parent
        self.port = port
        self.comfy_url = comfy_url
        self.comfy_webui_ws_port = comfy_webui_port

        async def index(request: web.Request) -> web.Response:  # noqa :ARG001, RUF029
            """Return templated HTML page."""
            return web.Response(
                text=template_html(
                    comfy_url=self.comfy_url,
                    webui_port=self.comfy_webui_ws_port,
                ),
                content_type="text/html",
            )

        self._app = web.Application()
        self._app.router.add_get("/", index)
        self._app.router.add_static("/static/", fpath / "static/")
        # Both are created in run(), i.e. on the server thread itself.
        self.loop = None
        self._shutdown_event = None
        super().__init__(*args, **kwargs)

    def run(self):
        """Create the thread's event loop and serve until ``stop()`` is called."""
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self._shutdown_event = asyncio.Event()
        try:
            # Honour a stop() that arrived before the loop/event existed.
            # The flag is checked only after both attributes are assigned,
            # so a concurrent stop() either sees them or is seen here.
            if self._stop_requested:
                self._shutdown_event.set()
            self.loop.run_until_complete(self.async_run())
        finally:
            # Release the loop's resources even if the server failed.
            self.loop.close()

    async def async_run(self):
        """Start the site, wait for the shutdown event, then tear down."""
        runner = web.AppRunner(self._app)
        await runner.setup()
        try:
            site = web.TCPSite(runner, "localhost", self.port)
            await site.start()
            print(f"Static Server running on http://localhost:{self.port}")
            # Shutdown conditional
            await self._shutdown_event.wait()
            await site.stop()
        finally:
            # Also runs when site.start() fails (e.g. port already in use),
            # so the runner set up above is never leaked.
            await runner.cleanup()

    def stop(self):
        """Request server shutdown; safe to call from any thread, at any time."""
        # Record the request first so run() can pick it up if it has not
        # created the loop and the event yet.
        self._stop_requested = True
        loop = self.loop
        shutdown_event = self._shutdown_event
        if loop is None or shutdown_event is None or loop.is_closed():
            return
        try:
            # asyncio.Event is not thread-safe; set it from the loop thread.
            loop.call_soon_threadsafe(shutdown_event.set)
        except RuntimeError:
            # The loop was closed between the check above and this call:
            # the server has already shut down, nothing left to do.
            pass
|
__init__(*args, port=5454, comfy_webui_port=55056, comfy_url='http://127.0.0.1:8188', **kwargs)
Source code in client/ayon_comfyui/api/iframe/static_server.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def __init__(
    self,
    *args,  # noqa : ANN002
    port: int = 5454,
    comfy_webui_port: int = 55056,
    comfy_url: str = "http://127.0.0.1:8188",
    **kwargs,  # noqa : ANN003
):
    """Store the settings and build the web application for the page.

    Args:
        *args: Forwarded to the parent class initializer.
        port: Stored as ``self.port``.
        comfy_webui_port: Stored as ``self.comfy_webui_ws_port`` and
            passed to ``template_html`` as ``webui_port``.
        comfy_url: Stored as ``self.comfy_url`` and passed to
            ``template_html`` as ``comfy_url``.
        **kwargs: Forwarded to the parent class initializer.
    """
    self.port = port
    self.comfy_url = comfy_url
    self.comfy_webui_ws_port = comfy_webui_port
    # Placeholders; nothing in this initializer fills them in.
    self.loop = None
    self._shutdown_event = None

    async def index(request: web.Request) -> web.Response:  # noqa :ARG001, RUF029
        """Return templated HTML page."""
        # Settings are read from ``self`` on every request, so later
        # changes to the attributes are reflected in the served page.
        page = template_html(
            comfy_url=self.comfy_url,
            webui_port=self.comfy_webui_ws_port,
        )
        return web.Response(text=page, content_type="text/html")

    # Static assets live in the "static" directory beside this module.
    static_dir = Path(__file__).parent / "static/"
    app = web.Application()
    app.router.add_get("/", index)
    app.router.add_static("/static/", static_dir)
    self._app = app
    super().__init__(*args, **kwargs)
|