Skip to content

worker

CollectSceneData

Bases: BaseCommand

Helper command which will collect all useful info about workfile.

Result is a dictionary with all layers data, exposure frames by layer ids, pre/post behavior of layers by their ids, group information and scene data.

Source code in client/ayon_tvpaint/worker/worker_job.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
class CollectSceneData(BaseCommand):
    """Helper command gathering all useful information about a workfile.

    The stored result is a dictionary with layers data, exposure frames by
    layer id, pre/post behavior of layers by layer id, groups data and
    scene data.
    """
    name = "collect_scene_data"

    def execute(self):
        """Query the scene through the communicator and store the result."""
        from ayon_tvpaint.api.lib import (
            get_groups_data,
            get_layers_data,
            get_layers_exposure_frames,
            get_layers_pre_post_behavior,
            get_scene_data,
        )

        communicator = self.communicator

        # Queries run in this order: groups, layers, pre/post behavior,
        # exposure frames and finally scene data.
        groups = get_groups_data(communicator=communicator)
        layers = get_layers_data(communicator=communicator)
        ids_of_layers = [layer["layer_id"] for layer in layers]
        behavior_by_id = get_layers_pre_post_behavior(
            ids_of_layers, communicator=communicator
        )
        exposure_by_id = get_layers_exposure_frames(
            ids_of_layers, layers, communicator=communicator
        )
        scene = get_scene_data(communicator)

        self._result = {
            "layers_data": layers,
            "exposure_frames_by_layer_id": exposure_by_id,
            "pre_post_beh_by_layer_id": behavior_by_id,
            "groups_data": groups,
            "scene_data": scene,
        }

    @classmethod
    def from_existing(cls, data):
        """Recreate the command from its raw data."""
        return cls(data)

ExecuteGeorgeScript

Bases: BaseCommand

Execute multiline george script in TVPaint.

Parameters:

Name Type Description Default
script_lines(list)

Lines that will be executed in george script through temp george file.

required
tmp_file_keys(list)

List of formatting keys in george script that require replacement with path to a temp file where result will be stored. The content of file is stored to result by the key.

required
root_dir_key(str)

Formatting key that will be replaced in george script with job queue root which can be different on worker side.

required
data(dict)

Raw data about command.

required
Source code in client/ayon_tvpaint/worker/worker_job.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
class ExecuteGeorgeScript(BaseCommand):
    """Execute multiline george script in TVPaint.

    Args:
        script_lines(list): Lines that will be executed in george script
            through temp george file.
        tmp_file_keys(list): List of formatting keys in george script that
            require replacement with path to a temp file where result will be
            stored. The content of file is stored to result by the key.
        root_dir_key(str): Formatting key that will be replaced in george
            script with job queue root which can be different on worker side.
        data(dict): Raw data about command.
    """
    name = "execute_george_through_file"

    def __init__(
        self, script_lines, tmp_file_keys=None, root_dir_key=None, data=None
    ):
        data = data or {}
        # Fall back to keys stored in raw data when none were passed.
        if not tmp_file_keys:
            tmp_file_keys = data.get("tmp_file_keys") or []

        data["script_lines"] = script_lines
        data["tmp_file_keys"] = tmp_file_keys
        data["root_dir_key"] = root_dir_key
        self._script_lines = script_lines
        self._tmp_file_keys = tmp_file_keys
        self._root_dir_key = root_dir_key
        super().__init__(data)

    def execute(self):
        """Run the script and store content of temp files as the result.

        The result is a dictionary with content of each temporary file
        stored under its formatting key. Temporary files are removed even
        when the script execution or reading of a file fails.
        """
        script = self._script_lines
        if isinstance(script, list):
            script = "\n".join(script)

        filepath_by_key = {}
        result = {}
        try:
            # Replace temporary files in george script
            for key in self._tmp_file_keys:
                output_file = tempfile.NamedTemporaryFile(
                    mode="w",
                    prefix=TMP_FILE_PREFIX,
                    suffix=".txt",
                    delete=False
                )
                output_path = output_file.name.replace("\\", "/")
                # Register the path first so cleanup always knows about it
                filepath_by_key[key] = output_path
                output_file.close()
                script = script.replace("{" + key + "}", output_path)

            # Replace job queue root in script
            if self._root_dir_key:
                job_queue_root = self.job_queue_root()
                format_key = "{" + self._root_dir_key + "}"
                script = script.replace(
                    format_key, job_queue_root.replace("\\", "/")
                )

            # Execute the script
            self.execute_george_through_file(script)

            # Store result of temporary files
            for key, filepath in filepath_by_key.items():
                with open(filepath, "r") as stream:
                    result[key] = stream.read()
        finally:
            # Do not leave temporary files behind on failure
            for filepath in filepath_by_key.values():
                if os.path.exists(filepath):
                    os.remove(filepath)

        self._result = result

    @classmethod
    def from_existing(cls, data):
        """Recreate the object from data.

        Keys "script_lines", "tmp_file_keys" and "root_dir_key" are popped
        from passed data, the rest is passed as raw data of the command.
        """
        script_lines = data.pop("script_lines")
        tmp_file_keys = data.pop("tmp_file_keys", None)
        root_dir_key = data.pop("root_dir_key", None)
        return cls(script_lines, tmp_file_keys, root_dir_key, data)

from_existing(data) classmethod

Recreate the object from data.

Source code in client/ayon_tvpaint/worker/worker_job.py
238
239
240
241
242
243
244
@classmethod
def from_existing(cls, data):
    """Rebuild the command from its serialized data.

    Keys "script_lines", "tmp_file_keys" and "root_dir_key" are popped from
    ``data``; the remaining dictionary is passed on as raw command data.
    """
    lines = data.pop("script_lines")
    file_keys, root_key = (
        data.pop(optional_key, None)
        for optional_key in ("tmp_file_keys", "root_dir_key")
    )
    return cls(lines, file_keys, root_key, data)

ExecuteSimpleGeorgeScript

Bases: BaseCommand

Execute simple george script in TVPaint.

Parameters:

Name Type Description Default
script(str)

Script that will be executed.

required
Source code in client/ayon_tvpaint/worker/worker_job.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
class ExecuteSimpleGeorgeScript(BaseCommand):
    """Run a simple george script in TVPaint.

    Args:
        script(str): Script that will be executed.
    """
    name = "execute_george_simple"

    def __init__(self, script, data=None):
        if not data:
            data = {}
        # The script is kept both in raw data and on the object.
        data["script"] = script
        self._script = script
        super().__init__(data)

    def execute(self):
        """Execute the script and keep what the execution returned."""
        output = self.execute_george(self._script)
        self._result = output

    @classmethod
    def from_existing(cls, data):
        """Recreate the command; "script" is popped from passed data."""
        return cls(data.pop("script"), data)

JobFailed

Bases: Exception

Raised when job was sent and finished unsuccessfully.

Source code in client/ayon_tvpaint/worker/worker_job.py
16
17
18
19
20
21
22
23
24
25
26
27
28
class JobFailed(Exception):
    """Error raised when a sent job finished unsuccessfully.

    The passed job status is kept in the ``job_status`` attribute.
    """
    def __init__(self, job_status):
        self.job_status = job_status

        state = job_status["state"]
        message = job_status["message"]
        # Empty or missing message text is replaced with a generic one.
        if not message:
            message = "Unknown issue"

        template = (
            "Job didn't finish properly."
            " Job state: \"{}\" | Job message: \"{}\""
        )
        super().__init__(template.format(state, message))

ProcessTVPaintCommands

Bases: TVPaintCommands

Worker side of TVPaint Commands.

It is expected this object is created only on worker's side from existing data loaded from job.

Workfile path logic is based on 'SenderTVPaintCommands'.

Source code in client/ayon_tvpaint/worker/worker_job.py
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
class ProcessTVPaintCommands(TVPaintCommands):
    """Worker side of TVPaint Commands.

    It is expected this object is created only on worker's side from existing
    data loaded from job.

    Workfile path logic is based on 'SenderTVPaintCommands'.

    Args:
        workfile(str): Workfile path passed to the parent class.
        commands(list): Data of commands used to recreate command objects.
        communicator: Object used to execute george scripts in TVPaint.
    """
    def __init__(self, workfile, commands, communicator):
        super().__init__(workfile)

        self._communicator = communicator

        self.commands_from_data(commands)

    def _prepare_workfile(self, workfile):
        """Prepend job queue root before passed workfile."""
        workfile = workfile.replace("\\", "/")
        job_queue_root = self.job_queue_root().replace("\\", "/")
        new_workfile = "/".join([job_queue_root, workfile])
        # NOTE(review): every "//" is collapsed, including a leading one of
        #   a UNC-like root - confirm such roots are not expected here.
        while "//" in new_workfile:
            new_workfile = new_workfile.replace("//", "/")
        return os.path.normpath(new_workfile)

    @property
    def communicator(self):
        """Access to TVPaint communicator."""
        return self._communicator

    def commands_from_data(self, commands_data):
        """Recreate command from passed data."""
        for command_data in commands_data:
            command_name = command_data["command"]

            klass = self.classes_by_name[command_name]
            command = klass.from_existing(command_data)
            self.add_command(command)

    def execute_george(self, george_script):
        """Helper method to execute george script."""
        return self.communicator.execute_george(george_script)

    def execute_george_through_file(self, george_script):
        """Helper method to execute george script through temp file.

        The script is written to a temporary '.grg' file which is executed
        with 'tv_runscript'. The file is removed afterwards, even when
        writing or execution fails.
        """
        temporary_file = tempfile.NamedTemporaryFile(
            mode="w", prefix=TMP_FILE_PREFIX, suffix=".grg", delete=False
        )
        temp_file_path = temporary_file.name.replace("\\", "/")
        try:
            # File must be closed before the script is executed
            with temporary_file:
                temporary_file.write(george_script)
            self.execute_george("tv_runscript {}".format(temp_file_path))
        finally:
            os.remove(temp_file_path)

    def _open_workfile(self):
        """Open workfile in TVPaint."""
        workfile = self._workfile
        print("Opening workfile {}".format(workfile))
        george_script = "tv_LoadProject '\"'\"{}\"'\"'".format(workfile)
        self.execute_george_through_file(george_script)

    def _close_workfile(self):
        """Close workfile in TVPaint."""
        print("Closing workfile")
        self.execute_george_through_file("tv_projectclose")

    def execute(self):
        """Execute commands.

        Workfile is opened first, then all commands are executed one by one
        and marked as done, and the workfile is closed at the end.
        """
        # First open the workfile
        self._open_workfile()
        # Execute commands one by one
        # TODO maybe stop processing when command fails?
        print("Commands execution started ({})".format(len(self._commands)))
        for command in self._commands:
            command.execute()
            command.set_done()
        # Finally close workfile
        self._close_workfile()

communicator property

Access to TVPaint communicator.

commands_from_data(commands_data)

Recreate command from passed data.

Source code in client/ayon_tvpaint/worker/worker_job.py
486
487
488
489
490
491
492
493
def commands_from_data(self, commands_data):
    """Rebuild command objects from serialized data and register them.

    The "command" key of each item selects the class in ``classes_by_name``
    whose ``from_existing`` recreates the command.
    """
    for item in commands_data:
        command_cls = self.classes_by_name[item["command"]]
        self.add_command(command_cls.from_existing(item))

execute()

Execute commands.

Source code in client/ayon_tvpaint/worker/worker_job.py
522
523
524
525
526
527
528
529
530
531
532
533
def execute(self):
    """Open the workfile, run all commands in order and close the workfile.

    Each command is marked as done right after its execution.
    """
    self._open_workfile()

    # TODO maybe stop processing when command fails?
    commands = self._commands
    print("Commands execution started ({})".format(len(commands)))
    for item in commands:
        item.execute()
        item.set_done()

    self._close_workfile()

execute_george(george_script)

Helper method to execute george script.

Source code in client/ayon_tvpaint/worker/worker_job.py
495
496
497
def execute_george(self, george_script):
    """Run george script through the communicator and return its output."""
    communicator = self.communicator
    return communicator.execute_george(george_script)

execute_george_through_file(george_script)

Helper method to execute george script through temp file.

Source code in client/ayon_tvpaint/worker/worker_job.py
499
500
501
502
503
504
505
506
507
508
def execute_george_through_file(self, george_script):
    """Helper method to execute george script through temp file.

    The script is written to a temporary '.grg' file which is executed
    with 'tv_runscript'. The file is removed afterwards, even when
    writing or execution fails.
    """
    temporary_file = tempfile.NamedTemporaryFile(
        mode="w", prefix=TMP_FILE_PREFIX, suffix=".grg", delete=False
    )
    temp_file_path = temporary_file.name.replace("\\", "/")
    try:
        # File must be closed before the script is executed
        with temporary_file:
            temporary_file.write(george_script)
        self.execute_george("tv_runscript {}".format(temp_file_path))
    finally:
        os.remove(temp_file_path)

SenderTVPaintCommands

Bases: TVPaintCommands

Sender implementation of TVPaint Commands.

Source code in client/ayon_tvpaint/worker/worker_job.py
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
class SenderTVPaintCommands(TVPaintCommands):
    """Sender implementation of TVPaint Commands."""
    def _prepare_workfile(self, workfile):
        """Remove job queue root from workfile path.

        It is expected that worker will add its root before passed workfile.

        Only the leading job queue root is removed, so a path that contains
        the same text again further down is left intact.

        Raises:
            ValueError: When workfile path does not start with job queue
                root.
        """
        new_workfile = workfile.replace("\\", "/")
        job_queue_root = self.job_queue_root().replace("\\", "/")
        if not new_workfile.startswith(job_queue_root):
            raise ValueError((
                "Workfile is not located in JobQueue root."
                " Workfile path: \"{}\". JobQueue root: \"{}\""
            ).format(workfile, job_queue_root))
        return new_workfile[len(job_queue_root):]

    def commands_data(self):
        """Commands data to be able recreate them."""
        return [
            command.command_data()
            for command in self._commands
        ]

    def to_job_data(self):
        """Convert commands to job data before sending to workers server."""
        return {
            "workfile": self._workfile,
            "function": "commands",
            "commands": self.commands_data()
        }

    def set_result(self, result):
        """Store results on commands and mark them as done.

        Args:
            result(list): Items with "id" of a command and its "result".
        """
        commands_by_id = {
            command.id: command
            for command in self._commands
        }

        for item in result:
            command = commands_by_id[item["id"]]
            command.set_result(item["result"])
            command.set_done()

    def _send_job(self):
        """Send job to a workers server.

        Returns:
            Id of the job returned by the job queue module.
        """
        # Send job data to job queue server
        job_data = self.to_job_data()
        self.log.debug("Sending job to JobQueue server.\n{}".format(
            json.dumps(job_data, indent=4)
        ))
        job_id = self._job_queue_module.send_job("tvpaint", job_data)
        self.log.info((
            "Job sent to JobQueue server and got id \"{}\"."
            " Waiting for finishing the job."
        ).format(job_id))

        return job_id

    def send_job_and_wait(self):
        """Send job to workers server and wait for response.

        Result of job is stored into the object.

        Raises:
            JobFailed: When job was finished but not successfully.
        """
        job_id = self._send_job()
        # Ask for the job status once a second until it is marked as done
        while True:
            job_status = self._job_queue_module.get_job_status(job_id)
            if job_status["done"]:
                break
            time.sleep(1)

        # Check if job state is done
        if job_status["state"] != "done":
            raise JobFailed(job_status)

        self.set_result(job_status["result"])

        self.log.debug("Job is done and result is stored.")

commands_data()

Commands data to be able recreate them.

Source code in client/ayon_tvpaint/worker/worker_job.py
392
393
394
395
396
397
def commands_data(self):
    """Collect data of all commands so they can be recreated later."""
    return [cmd.command_data() for cmd in self._commands]

send_job_and_wait()

Send job to workers server and wait for response.

Result of job is stored into the object.

Raises:

Type Description
JobFailed

When job was finished but not successfully.

Source code in client/ayon_tvpaint/worker/worker_job.py
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
def send_job_and_wait(self):
    """Send the job to workers server and block until it is finished.

    The job status is requested once a second until it is marked as done.
    Result of the job is then stored into the object.

    Raises:
        JobFailed: When job was finished but not successfully.
    """
    job_id = self._send_job()

    status = self._job_queue_module.get_job_status(job_id)
    while not status["done"]:
        time.sleep(1)
        status = self._job_queue_module.get_job_status(job_id)

    # Finished job must end up in the "done" state
    if status["state"] != "done":
        raise JobFailed(status)

    self.set_result(status["result"])

    self.log.debug("Job is done and result is stored.")

to_job_data()

Convert commands to job data before sending to workers server.

Source code in client/ayon_tvpaint/worker/worker_job.py
399
400
401
402
403
404
405
def to_job_data(self):
    """Build the job payload that is sent to the workers server."""
    job_data = {"workfile": self._workfile, "function": "commands"}
    job_data["commands"] = self.commands_data()
    return job_data