Skip to content

process

Handling of processes in Ayon Applications.

ProcessIdTriplet

Bases: NamedTuple

Triplet of process identification values.

Source code in client/ayon_applications/process.py
22
23
24
25
26
class ProcessIdTriplet(NamedTuple):
    """Triplet of process identification values."""
    pid: int
    executable: str
    start_time: Optional[float]  # the same goes for start time

ProcessInfo dataclass

Information about a process launched by the addon.

Attributes:

Name Type Description
name str

Name of the process.

executable Path

Path to the executable.

args list[str]

Arguments for the process.

env dict[str, str]

Environment variables for the process.

hash str

Hash of the process information.

cwd str

Current working directory for the process.

pid int

Process ID of the launched process.

active bool

Whether the process is currently active.

output Path

Output of the process.

Source code in client/ayon_applications/process.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@dataclass
class ProcessInfo:
    """Information about a process launched by the addon.

    Attributes:
        name (str): Name of the process.
        executable (Path): Path to the executable.
        args (list[str]): Arguments for the process.
        env (dict[str, str]): Environment variables for the process.
        hash (str): Hash of the process information.
        cwd (str): Current working directory for the process.
        pid (int): Process ID of the launched process.
        active (bool): Whether the process is currently active.
        output (Path): Output of the process.

    """

    name: str
    executable: Path
    args: list[str]
    env: dict[str, str]
    cwd: str
    hash: Optional[str] = None
    pid: Optional[int] = None
    active: bool = False
    output: Optional[Path] = None
    start_time: Optional[float] = None
    created_at: Optional[str] = None

    def __post_init__(self) -> None:
        """Post-initialization to compute the hash if not provided."""
        if self.hash is None:
            self.hash = ProcessManager.get_process_info_hash(self)

__post_init__()

Post-initialization to compute the hash if not provided.

Source code in client/ayon_applications/process.py
58
59
60
61
def __post_init__(self) -> None:
    """Post-initialization to compute the hash if not provided."""
    if self.hash is None:
        self.hash = ProcessManager.get_process_info_hash(self)

ProcessManager

Manager for handling processes in AYON Applications.

Source code in client/ayon_applications/process.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
class ProcessManager:
    """Manager for handling processes in AYON Applications."""

    log: logging.Logger

    def __init__(self) -> None:
        """Initialize the ProcessManager."""
        self.log = logging.getLogger(f"{__name__}.ProcessManager")
        # Use thread-local storage for SQLite connections to avoid
        # sharing connections between threads (fixes Linux SQLite issues)
        self._thread_local = threading.local()

    @staticmethod
    def get_process_info_storage_location() -> Path:
        """Get the path to process info storage.

        Returns:
            Path: Path to the process handlers storage.

        """
        return Path(get_launcher_local_dir()) / "process_handlers.db"

    def _get_process_storage_connection(self) -> sqlite3.Connection:
        """Get a thread-local SQLite connection.

        Each thread gets its own connection to avoid thread-safety issues
        that can occur on Linux.

        Returns:
            sqlite3.Connection: Thread-local connection to the process storage.

        """
        # Check if this thread already has a connection
        if hasattr(self._thread_local, "connection"):
            return self._thread_local.connection

        # Create a new connection for this thread
        cnx = sqlite3.connect(
            self.get_process_info_storage_location(),
            # Enable thread safety for SQLite operations
            check_same_thread=False
        )
        cursor = cnx.cursor()
        cursor.execute(
            "CREATE TABLE IF NOT EXISTS process_info ("
            "hash TEXT PRIMARY KEY, "
            "name TEXT, "
            "executable TEXT, "
            "args TEXT DEFAULT NULL, "
            "env TEXT DEFAULT NULL, "
            "cwd TEXT DEFAULT NULL, "
            "pid INTEGER DEFAULT NULL, "
            "output_file TEXT DEFAULT NULL, "
            "start_time REAL DEFAULT NULL, "
            "created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP"
            ")"
        )
        cnx.commit()
        self._thread_local.connection = cnx

        return self._thread_local.connection

    @staticmethod
    def get_process_info_hash(process_info: ProcessInfo) -> str:
        """Get hash of the process information.

        Returns:
            str: Hash of the process information.
        """
        return ProcessManager.get_process_info_hash_by_values(
            process_info.executable,
            process_info.name,
            process_info.pid,
            process_info.start_time,
        )

    @staticmethod
    def get_process_info_hash_by_values(
        executable: Path,
        name: str,
        pid: Optional[int] = None,
        start_time: Optional[float] = None,
    ) -> str:
        """Get hash of the process information by values.

        Args:
            executable (Path): Path to the executable.
            name (str): Name of the process.
            pid (Optional[int]): Process ID of the launched process.
            start_time (Optional[float]): Start time of the process.

        Returns:
            str: Hash of the process information.

        """
        start = (
            f"{start_time}"
            if start_time is not None
            else ""
        )
        key = f"{name}{pid}{executable}{start}"
        return sha256(key.encode()).hexdigest()

    def store_process_info(self, process_info: ProcessInfo) -> None:
        """Store process information.

        Args:
            process_info (ProcessInfo): Process handler to store.

        """
        # refresh hash in case some values changed
        process_info.hash = ProcessManager.get_process_info_hash(process_info)
        if process_info.pid is None:
            self.log.warning((
                "Cannot store process info for process without PID. "
                "Process name: %s"
            ), process_info.name)
            return

        cnx = self._get_process_storage_connection()
        cursor = cnx.cursor()
        cursor.execute(
            "INSERT OR REPLACE INTO process_info "
            "(hash, name, executable, args, env, cwd, "
            "pid, output_file, start_time) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (
                process_info.hash,
                process_info.name,
                process_info.executable.as_posix(),
                json.dumps(process_info.args),
                json.dumps(process_info.env),
                process_info.cwd,
                process_info.pid,
                (
                    process_info.output.as_posix()
                    if process_info.output else None
                ),
                process_info.start_time,
            )
        )
        cnx.commit()

    def get_process_info(self, process_hash: str) -> Optional[ProcessInfo]:
        """Get process information by hash.

        Args:
            process_hash (str): Hash of the process.

        Returns:
            Optional[ProcessInfo]: Process information or None if not found.
        """
        cnx = self._get_process_storage_connection()
        cursor = cnx.cursor()
        cursor.execute(
            "SELECT * FROM process_info WHERE hash = ?",
            (process_hash,)
        )
        row = cursor.fetchone()
        if row is None:
            return None

        return ProcessInfo(
            name=row[1],
            executable=Path(row[2]),
            args=json.loads(row[3]),
            env=json.loads(row[4]),
            cwd=row[5],
            hash=process_hash,
            pid=row[6],
            output=Path(row[7]) if row[7] else None,
            start_time=row[8],
            created_at=row[9],
        )

    def get_process_info_by_name(
        self, name: str) -> Optional[ProcessInfo]:
        """Get process information by name.

        Args:
            name (str): Name of the process.

        Returns:
            Optional[ProcessInfo]: Process information or None if not found.
        """
        cnx = self._get_process_storage_connection()
        cursor = cnx.cursor()
        query = "SELECT * FROM process_info WHERE name = ?"
        params = [name]

        cursor.execute(query, params)
        row = cursor.fetchone()
        if row is None:
            return None

        return ProcessInfo(
            name=row[1],
            executable=Path(row[2]),
            args=json.loads(row[3]),
            env=json.loads(row[4]),
            cwd=row[5],
            pid=row[6],
            output=Path(row[7]) if row[7] else None,
            start_time=row[8],
            created_at=row[9],
        )

    def get_process_info_by_pid(self, pid: int) -> Optional[ProcessInfo]:
        """Get process information by process id.

        Args:
            pid (int): ID of the process.

        Returns:
            Optional[ProcessInfo]: Process information or None if not found.
        """
        cnx = self._get_process_storage_connection()
        cursor = cnx.cursor()
        query = "SELECT * FROM process_info WHERE pid = ?"
        params = [pid]

        cursor.execute(query, params)
        row = cursor.fetchone()
        if row is None:
            return None

        return ProcessInfo(
            name=row[1],
            executable=Path(row[2]),
            args=json.loads(row[3]),
            env=json.loads(row[4]),
            cwd=row[5],
            pid=row[6],
            output=Path(row[7]) if row[7] else None,
            start_time=row[8],
            created_at=row[9],
        )

    def get_current_process_info(self) -> Optional[ProcessInfo]:
        """Get information for the current process.

        Returns:
            Optional[ProcessInfo]: Process information or None if not found.
        """
        return self.get_process_info_by_pid(os.getpid())

    def get_all_process_info(self) -> list[ProcessInfo]:
        """Get all process information from the database.

        Returns:
            list[ProcessInfo]: List of all process information.
        """
        cnx = self._get_process_storage_connection()
        cursor = cnx.cursor()
        cursor.execute("SELECT * FROM process_info ORDER BY created_at DESC")
        rows = cursor.fetchall()

        processes: list[ProcessInfo] = [
            ProcessInfo(
                name=row[1],
                executable=Path(row[2]),
                args=json.loads(row[3]) if row[3] else [],
                env=json.loads(row[4]) if row[4] else {},
                cwd=row[5],
                pid=row[6],
                output=Path(row[7]) if row[7] else None,
                start_time=row[8],
                created_at=row[9],
            )
            for row in rows
        ]
        # Check if processes are still running
        # This is done by checking the pid of the process.
        # It is using `_are_processes_running` method which
        # checks for processes in batch, mostly because of the fallback
        # on systems without `psutil` module. See `_are_processes_running`
        # documentation for more details.
        # Build list of (pid, executable_name, start_time) triplets so the
        # check can verify PID + image and, when possible, process start time
        # (stronger protection against PID reuse).
        pid_triplets: list[ProcessIdTriplet] = []
        processes_with_pid = []
        for proc in processes:
            if proc.pid is None:
                continue
            exe = proc.executable.as_posix()
            pid_triplets.append(
                ProcessIdTriplet(proc.pid, exe, proc.start_time))
            processes_with_pid.append(proc)

        if pid_triplets:
            running_status = self._are_processes_running(pid_triplets)
            for proc, (_, is_running) in zip(  # noqa: B905
                    processes_with_pid, running_status):
                proc.active = is_running

        return processes

    def delete_process_info(self, process_hash: str) -> bool:
        """Delete process information by hash.

        This also deletes the output file if it exists.

        Args:
            process_hash (str): Hash of the process to delete.

        Returns:
            bool: True if deleted, False if not found.
        """
        process = self.get_process_info(process_hash)
        if process is None:
            return False
        if process.output and Path(process.output).exists():
            # File might not exist anymore, so we use contextlib.suppress
            with contextlib.suppress(OSError):
                os.remove(process.output)

        cnx = self._get_process_storage_connection()
        cursor = cnx.cursor()
        cursor.execute(
            "DELETE FROM process_info WHERE hash = ?",
            (process_hash,))
        cnx.commit()
        return cursor.rowcount > 0

    def delete_inactive_processes(self) -> int:
        """Delete all inactive process information.

        This also deletes the output files of the inactive processes.

        Returns:
            int: Number of deleted processes.
        """
        cnx = self._get_process_storage_connection()

        # Get all processes and check which ones are inactive
        all_processes = self.get_all_process_info()

        files_to_delete = [
            process.output
            for process in all_processes
            if (
                    not process.active
                    and (process.output and Path(process.output).exists())
            )
        ]

        inactive_hashes = []

        for process in all_processes:
            if not process.active:
                process_hash = self.get_process_info_hash(process)
                inactive_hashes.append(process_hash)

        if not inactive_hashes:
            return 0

        cursor = cnx.cursor()
        placeholders = ",".join("?" * len(inactive_hashes))
        cursor.execute(
            ("DELETE FROM process_info WHERE "  # noqa: S608
            f"hash IN ({placeholders})"),
            inactive_hashes
        )
        cnx.commit()

        for file_path in files_to_delete:
            # File might not exist anymore, so we use contextlib.suppress
            with contextlib.suppress(OSError):
                os.remove(file_path)

        return cursor.rowcount

    @staticmethod
    def _is_process_running(
            pid: int,
            executable: str,
            start_time: Optional[float] = None) -> bool:
        """Check if a process is running using psutil.

        Args:
            pid (int): Process ID to check.
            executable (str): Executable name to verify.
            start_time (Optional[float]): Start time to verify.

        Returns:
            bool: True if the process is running, False otherwise.

        """
        import psutil

        try:
            proc = psutil.Process(pid)
        except (psutil.NoSuchProcess, psutil.ZombieProcess):
            return False

        # If start_time provided, verify it matches process creation time
        if start_time is not None:
            try:
                proc_ct = proc.create_time()
                # allow small tolerance for float differences
                if abs(proc_ct - float(start_time)) > 1.0:
                    return False
            except Exception:  # noqa: BLE001
                # cannot verify start time -> conservative False
                return False

        if not executable:
            # No executable provided, process exists
            # (and start_time matched if provided)
            return True

        # Try to get executable path/name and command line first
        candidates = set()
        with contextlib.suppress(Exception):
            exe_path = proc.exe() if hasattr(proc, "exe") else None
            if exe_path:
                candidates.add(Path(exe_path).as_posix())

            name = proc.name()
            if name:
                candidates.add(name)

            cmd = proc.cmdline()
            if cmd:
                first = cmd[0]
                candidates.add(first)
        if platform.system().lower() == "windows":
            # On Windows be more relaxed and check image name only
            candidates = {c.lower() for c in candidates if c}
            return Path(executable).name.lower() in candidates

        return Path(executable).as_posix() in candidates

    @staticmethod
    def _are_processes_running(
            pid_triplets: list[ProcessIdTriplet]) -> list[tuple[int, bool]]:
        """Check if the processes are still running.

        This checks for presence of `psutil` module and uses it if available.

        Args:
            pid_triplets (list[ProcessIdTriplet]): Processes ID to check.

        Returns:
            list[tuple[int, bool]]: List of tuples with process ID and
                boolean indicating if the process is running.

        """
        if not pid_triplets:
            result: list[tuple[int, bool]] = []
            return result

        return ProcessManager._check_processes_running(
                pid_triplets)

    @staticmethod
    def _check_processes_running(
            pid_triplets: list[ProcessIdTriplet]) -> list[tuple[int, bool]]:
        """Check if processes are running using psutil.

        Args:
            pid_triplets (list[ProcessIdTriplet]): List of triplets

        Returns:
            list[tuple[int, bool]]: List of tuples with process ID and
                boolean indicating if the process is running.

        """
        import psutil

        result: list[tuple[int, bool]] = []

        for pid, exe, start_time in pid_triplets:
            try:
                is_running = ProcessManager._is_process_running(
                    pid, exe, start_time
                )
            except Exception:  # noqa: BLE001
                # if something goes wrong, fall back to pid_exists
                try:
                    is_running = psutil.pid_exists(pid)
                except Exception:   # noqa: BLE001
                    is_running = False
            result.append((pid, is_running))
        return result

    @staticmethod
    def get_executable_path_by_pid(pid: int) -> Optional[Path]:
        """Get the executable path of a process by its PID using psutil.

        Args:
            pid (int): Process ID.

        Returns:
            Optional[Path]: The executable path of the process, or None if it
                cannot be determined.

        """
        import psutil

        exe_path = None
        if pid:
            try:
                exe_path_str = psutil.Process(pid).exe()
                if exe_path_str:
                    exe_path = Path(exe_path_str)
            except (
                    psutil.NoSuchProcess,
                    psutil.ZombieProcess,
                    psutil.AccessDenied):
                exe_path = None
        return exe_path

    @staticmethod
    def get_process_start_time(
            process: subprocess.Popen) -> Optional[float]:
        """Get the start time of a process using psutil.

        Returns:
            Optional[float]: The start time of the process in seconds since
                the epoch, or None if it cannot be determined.

        """
        import psutil

        start_time = None
        if process.pid:
            try:
                start_time = psutil.Process(process.pid).create_time()
            except (
                    psutil.NoSuchProcess,
                    psutil.ZombieProcess,
                    psutil.AccessDenied):
                start_time = None
        return start_time

    @staticmethod
    def get_process_start_time_by_pid(pid: int) -> Optional[float]:
        """Get the start time of a process by PID using psutil.

        Args:
            pid (int): Process ID.

        Returns:
            Optional[float]: The start time of the process in seconds since
                the epoch, or None if it cannot be determined.

        """
        import psutil

        start_time = None
        if pid:
            try:
                start_time = psutil.Process(pid).create_time()
            except (
                    psutil.NoSuchProcess,
                    psutil.ZombieProcess,
                    psutil.AccessDenied):
                start_time = None
        return start_time

__init__()

Initialize the ProcessManager.

Source code in client/ayon_applications/process.py
69
70
71
72
73
74
def __init__(self) -> None:
    """Initialize the ProcessManager."""
    self.log = logging.getLogger(f"{__name__}.ProcessManager")
    # Use thread-local storage for SQLite connections to avoid
    # sharing connections between threads (fixes Linux SQLite issues)
    self._thread_local = threading.local()

delete_inactive_processes()

Delete all inactive process information.

This also deletes the output files of the inactive processes.

Returns:

Name Type Description
int int

Number of deleted processes.

Source code in client/ayon_applications/process.py
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
def delete_inactive_processes(self) -> int:
    """Delete all inactive process information.

    This also deletes the output files of the inactive processes.

    Returns:
        int: Number of deleted processes.
    """
    cnx = self._get_process_storage_connection()

    # Get all processes and check which ones are inactive
    all_processes = self.get_all_process_info()

    files_to_delete = [
        process.output
        for process in all_processes
        if (
                not process.active
                and (process.output and Path(process.output).exists())
        )
    ]

    inactive_hashes = []

    for process in all_processes:
        if not process.active:
            process_hash = self.get_process_info_hash(process)
            inactive_hashes.append(process_hash)

    if not inactive_hashes:
        return 0

    cursor = cnx.cursor()
    placeholders = ",".join("?" * len(inactive_hashes))
    cursor.execute(
        ("DELETE FROM process_info WHERE "  # noqa: S608
        f"hash IN ({placeholders})"),
        inactive_hashes
    )
    cnx.commit()

    for file_path in files_to_delete:
        # File might not exist anymore, so we use contextlib.suppress
        with contextlib.suppress(OSError):
            os.remove(file_path)

    return cursor.rowcount

delete_process_info(process_hash)

Delete process information by hash.

This also deletes the output file if it exists.

Parameters:

Name Type Description Default
process_hash str

Hash of the process to delete.

required

Returns:

Name Type Description
bool bool

True if deleted, False if not found.

Source code in client/ayon_applications/process.py
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
def delete_process_info(self, process_hash: str) -> bool:
    """Delete process information by hash.

    This also deletes the output file if it exists.

    Args:
        process_hash (str): Hash of the process to delete.

    Returns:
        bool: True if deleted, False if not found.
    """
    process = self.get_process_info(process_hash)
    if process is None:
        return False
    if process.output and Path(process.output).exists():
        # File might not exist anymore, so we use contextlib.suppress
        with contextlib.suppress(OSError):
            os.remove(process.output)

    cnx = self._get_process_storage_connection()
    cursor = cnx.cursor()
    cursor.execute(
        "DELETE FROM process_info WHERE hash = ?",
        (process_hash,))
    cnx.commit()
    return cursor.rowcount > 0

get_all_process_info()

Get all process information from the database.

Returns:

Type Description
list[ProcessInfo]

list[ProcessInfo]: List of all process information.

Source code in client/ayon_applications/process.py
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
def get_all_process_info(self) -> list[ProcessInfo]:
    """Get all process information from the database.

    Returns:
        list[ProcessInfo]: List of all process information.
    """
    cnx = self._get_process_storage_connection()
    cursor = cnx.cursor()
    cursor.execute("SELECT * FROM process_info ORDER BY created_at DESC")
    rows = cursor.fetchall()

    processes: list[ProcessInfo] = [
        ProcessInfo(
            name=row[1],
            executable=Path(row[2]),
            args=json.loads(row[3]) if row[3] else [],
            env=json.loads(row[4]) if row[4] else {},
            cwd=row[5],
            pid=row[6],
            output=Path(row[7]) if row[7] else None,
            start_time=row[8],
            created_at=row[9],
        )
        for row in rows
    ]
    # Check if processes are still running
    # This is done by checking the pid of the process.
    # It is using `_are_processes_running` method which
    # checks for processes in batch, mostly because of the fallback
    # on systems without `psutil` module. See `_are_processes_running`
    # documentation for more details.
    # Build list of (pid, executable_name, start_time) triplets so the
    # check can verify PID + image and, when possible, process start time
    # (stronger protection against PID reuse).
    pid_triplets: list[ProcessIdTriplet] = []
    processes_with_pid = []
    for proc in processes:
        if proc.pid is None:
            continue
        exe = proc.executable.as_posix()
        pid_triplets.append(
            ProcessIdTriplet(proc.pid, exe, proc.start_time))
        processes_with_pid.append(proc)

    if pid_triplets:
        running_status = self._are_processes_running(pid_triplets)
        for proc, (_, is_running) in zip(  # noqa: B905
                processes_with_pid, running_status):
            proc.active = is_running

    return processes

get_current_process_info()

Get information for the current process.

Returns:

Type Description
Optional[ProcessInfo]

Optional[ProcessInfo]: Process information or None if not found.

Source code in client/ayon_applications/process.py
302
303
304
305
306
307
308
def get_current_process_info(self) -> Optional[ProcessInfo]:
    """Get information for the current process.

    Returns:
        Optional[ProcessInfo]: Process information or None if not found.
    """
    return self.get_process_info_by_pid(os.getpid())

get_executable_path_by_pid(pid) staticmethod

Get the executable path of a process by its PID using psutil.

Parameters:

Name Type Description Default
pid int

Process ID.

required

Returns:

Type Description
Optional[Path]

Optional[Path]: The executable path of the process, or None if it cannot be determined.

Source code in client/ayon_applications/process.py
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
@staticmethod
def get_executable_path_by_pid(pid: int) -> Optional[Path]:
    """Get the executable path of a process by its PID using psutil.

    Args:
        pid (int): Process ID.

    Returns:
        Optional[Path]: The executable path of the process, or None if it
            cannot be determined.

    """
    import psutil

    exe_path = None
    if pid:
        try:
            exe_path_str = psutil.Process(pid).exe()
            if exe_path_str:
                exe_path = Path(exe_path_str)
        except (
                psutil.NoSuchProcess,
                psutil.ZombieProcess,
                psutil.AccessDenied):
            exe_path = None
    return exe_path

get_process_info(process_hash)

Get process information by hash.

Parameters:

Name Type Description Default
process_hash str

Hash of the process.

required

Returns:

Type Description
Optional[ProcessInfo]

Optional[ProcessInfo]: Process information or None if not found.

Source code in client/ayon_applications/process.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def get_process_info(self, process_hash: str) -> Optional[ProcessInfo]:
    """Get process information by hash.

    Args:
        process_hash (str): Hash of the process.

    Returns:
        Optional[ProcessInfo]: Process information or None if not found.
    """
    cnx = self._get_process_storage_connection()
    cursor = cnx.cursor()
    cursor.execute(
        "SELECT * FROM process_info WHERE hash = ?",
        (process_hash,)
    )
    row = cursor.fetchone()
    if row is None:
        return None

    return ProcessInfo(
        name=row[1],
        executable=Path(row[2]),
        args=json.loads(row[3]),
        env=json.loads(row[4]),
        cwd=row[5],
        hash=process_hash,
        pid=row[6],
        output=Path(row[7]) if row[7] else None,
        start_time=row[8],
        created_at=row[9],
    )

get_process_info_by_name(name)

Get process information by name.

Parameters:

Name Type Description Default
name str

Name of the process.

required

Returns:

Type Description
Optional[ProcessInfo]

Optional[ProcessInfo]: Process information or None if not found.

Source code in client/ayon_applications/process.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
def get_process_info_by_name(
    self, name: str) -> Optional[ProcessInfo]:
    """Get process information by name.

    Args:
        name (str): Name of the process.

    Returns:
        Optional[ProcessInfo]: Process information or None if not found.
    """
    cnx = self._get_process_storage_connection()
    cursor = cnx.cursor()
    query = "SELECT * FROM process_info WHERE name = ?"
    params = [name]

    cursor.execute(query, params)
    row = cursor.fetchone()
    if row is None:
        return None

    return ProcessInfo(
        name=row[1],
        executable=Path(row[2]),
        args=json.loads(row[3]),
        env=json.loads(row[4]),
        cwd=row[5],
        pid=row[6],
        output=Path(row[7]) if row[7] else None,
        start_time=row[8],
        created_at=row[9],
    )

get_process_info_by_pid(pid)

Get process information by process id.

Parameters:

Name Type Description Default
pid int

ID of the process.

required

Returns:

Type Description
Optional[ProcessInfo]

Optional[ProcessInfo]: Process information or None if not found.

Source code in client/ayon_applications/process.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
def get_process_info_by_pid(self, pid: int) -> Optional[ProcessInfo]:
    """Get process information by process id.

    Args:
        pid (int): ID of the process.

    Returns:
        Optional[ProcessInfo]: Process information or None if not found.
    """
    cnx = self._get_process_storage_connection()
    cursor = cnx.cursor()
    query = "SELECT * FROM process_info WHERE pid = ?"
    params = [pid]

    cursor.execute(query, params)
    row = cursor.fetchone()
    if row is None:
        return None

    return ProcessInfo(
        name=row[1],
        executable=Path(row[2]),
        args=json.loads(row[3]),
        env=json.loads(row[4]),
        cwd=row[5],
        pid=row[6],
        output=Path(row[7]) if row[7] else None,
        start_time=row[8],
        created_at=row[9],
    )

get_process_info_hash(process_info) staticmethod

Get hash of the process information.

Returns:

Name Type Description
str str

Hash of the process information.

Source code in client/ayon_applications/process.py
126
127
128
129
130
131
132
133
134
135
136
137
138
@staticmethod
def get_process_info_hash(process_info: ProcessInfo) -> str:
    """Get hash of the process information.

    Returns:
        str: Hash of the process information.
    """
    return ProcessManager.get_process_info_hash_by_values(
        process_info.executable,
        process_info.name,
        process_info.pid,
        process_info.start_time,
    )

get_process_info_hash_by_values(executable, name, pid=None, start_time=None) staticmethod

Get hash of the process information by values.

Parameters:

Name Type Description Default
executable Path

Path to the executable.

required
name str

Name of the process.

required
pid Optional[int]

Process ID of the launched process.

None
start_time Optional[float]

Start time of the process.

None

Returns:

Name Type Description
str str

Hash of the process information.

Source code in client/ayon_applications/process.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
@staticmethod
def get_process_info_hash_by_values(
    executable: Path,
    name: str,
    pid: Optional[int] = None,
    start_time: Optional[float] = None,
) -> str:
    """Get hash of the process information by values.

    Args:
        executable (Path): Path to the executable.
        name (str): Name of the process.
        pid (Optional[int]): Process ID of the launched process.
        start_time (Optional[float]): Start time of the process.

    Returns:
        str: Hash of the process information.

    """
    start = (
        f"{start_time}"
        if start_time is not None
        else ""
    )
    key = f"{name}{pid}{executable}{start}"
    return sha256(key.encode()).hexdigest()

get_process_info_storage_location() staticmethod

Get the path to process info storage.

Returns:

Name Type Description
Path Path

Path to the process handlers storage.

Source code in client/ayon_applications/process.py
76
77
78
79
80
81
82
83
84
@staticmethod
def get_process_info_storage_location() -> Path:
    """Get the path to process info storage.

    Returns:
        Path: Path to the process handlers storage.

    """
    return Path(get_launcher_local_dir()) / "process_handlers.db"

get_process_start_time(process) staticmethod

Get the start time of a process using psutil.

Returns:

Type Description
Optional[float]

Optional[float]: The start time of the process in seconds since the epoch, or None if it cannot be determined.

Source code in client/ayon_applications/process.py
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
@staticmethod
def get_process_start_time(
        process: subprocess.Popen) -> Optional[float]:
    """Get the start time of a process using psutil.

    Returns:
        Optional[float]: The start time of the process in seconds since
            the epoch, or None if it cannot be determined.

    """
    import psutil

    start_time = None
    if process.pid:
        try:
            start_time = psutil.Process(process.pid).create_time()
        except (
                psutil.NoSuchProcess,
                psutil.ZombieProcess,
                psutil.AccessDenied):
            start_time = None
    return start_time

get_process_start_time_by_pid(pid) staticmethod

Get the start time of a process by PID using psutil.

Parameters:

Name Type Description Default
pid int

Process ID.

required

Returns:

Type Description
Optional[float]

Optional[float]: The start time of the process in seconds since the epoch, or None if it cannot be determined.

Source code in client/ayon_applications/process.py
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
@staticmethod
def get_process_start_time_by_pid(pid: int) -> Optional[float]:
    """Get the start time of a process by PID using psutil.

    Args:
        pid (int): Process ID.

    Returns:
        Optional[float]: The start time of the process in seconds since
            the epoch, or None if it cannot be determined.

    """
    import psutil

    start_time = None
    if pid:
        try:
            start_time = psutil.Process(pid).create_time()
        except (
                psutil.NoSuchProcess,
                psutil.ZombieProcess,
                psutil.AccessDenied):
            start_time = None
    return start_time

store_process_info(process_info)

Store process information.

Parameters:

Name Type Description Default
process_info ProcessInfo

Process handler to store.

required
Source code in client/ayon_applications/process.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def store_process_info(self, process_info: ProcessInfo) -> None:
    """Store process information.

    Args:
        process_info (ProcessInfo): Process handler to store.

    """
    # refresh hash in case some values changed
    process_info.hash = ProcessManager.get_process_info_hash(process_info)
    if process_info.pid is None:
        self.log.warning((
            "Cannot store process info for process without PID. "
            "Process name: %s"
        ), process_info.name)
        return

    cnx = self._get_process_storage_connection()
    cursor = cnx.cursor()
    cursor.execute(
        "INSERT OR REPLACE INTO process_info "
        "(hash, name, executable, args, env, cwd, "
        "pid, output_file, start_time) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
        (
            process_info.hash,
            process_info.name,
            process_info.executable.as_posix(),
            json.dumps(process_info.args),
            json.dumps(process_info.env),
            process_info.cwd,
            process_info.pid,
            (
                process_info.output.as_posix()
                if process_info.output else None
            ),
            process_info.start_time,
        )
    )
    cnx.commit()