Skip to content

integrate_ftrack_status

CollectFtrackTaskStatuses

Bases: FtrackPublishContextPlugin

Collect available task statuses on the project.

This is preparation for integration of task statuses.

Requirements

ftrackSession (ftrack_api.Session): Prepared ftrack session.

Provides

ftrackTaskStatuses (dict[str, list[Any]]): Dictionary of available task statuses on project by task type id. ftrackStatusByTaskId (dict[str, str]): Empty dictionary of task statuses by task id. Status on task can be set only once. Value should be a name of status.

Source code in client/ayon_ftrack/plugins/publish/integrate_ftrack_status.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class CollectFtrackTaskStatuses(plugin.FtrackPublishContextPlugin):
    """Collect available task statuses on the project.

    This is preparation for integration of task statuses.

    Requirements:
        ftrackSession (ftrack_api.Session): Prepared ftrack session.
        projectName (str): Name of the project.

    Provides:
        ftrackTaskStatuses (dict[str, list[Any]]): Dictionary of available
            task statuses on project by task type id.
        ftrackStatusByTaskId (dict[str, str]): Empty dictionary of task
            statuses by task id. Status on task can be set only once.
            Value should be a name of status.
    """

    # After 'CollectFtrackApi'
    order = pyblish.api.CollectorOrder + 0.4992
    label = "Collect ftrack Task Statuses"

    def process(self, context):
        """Store available task statuses of the project on the context.

        Nothing is stored when the ftrack session is not available, so
        plugins which depend on 'ftrackTaskStatuses' must handle the
        missing key.

        Args:
            context (pyblish.api.Context): Pyblish context.
        """

        # Use dictionary access like the rest of the file. Calling
        #   'context.data(...)' relies on legacy pyblish behavior.
        ftrack_session = context.data.get("ftrackSession")
        if ftrack_session is None:
            self.log.info("ftrack session is not created.")
            return

        # Prepare available task statuses on the project
        project_name = context.data["projectName"]
        project_entity = ftrack_session.query((
            "select project_schema from Project where full_name is \"{}\""
        ).format(project_name)).one()
        project_schema = project_entity["project_schema"]

        # Ask the project schema for statuses of each known type id
        task_type_ids = {
            task_type["id"]
            for task_type in ftrack_session.query("select id from Type").all()
        }
        task_statuses_by_type_id = {
            task_type_id: project_schema.get_statuses("Task", task_type_id)
            for task_type_id in task_type_ids
        }
        context.data["ftrackTaskStatuses"] = task_statuses_by_type_id
        # Filled later by status plugins, one status name per task id
        context.data["ftrackStatusByTaskId"] = {}
        self.log.debug("Collected ftrack task statuses.")

IntegrateFtrackFarmStatus

Bases: IntegrateFtrackStatusBase

Collect task status names for instances that are sent to farm.

An instance that has the "farm" key in its data set to 'True' is considered to be rendered on farm, thus its task status should be changed.

Requirements

projectName (str): Name of the project. hostName (str): Name of the host. ftrackSession (ftrack_api.Session): Prepared ftrack session. ftrackTaskStatuses (dict[str, list[Any]]): Dictionary of available task statuses on project by task type id. ftrackStatusByTaskId (dict[str, str]): Empty dictionary of task statuses by task id. Status on task can be set only once. Value should be a name of status.

Source code in client/ayon_ftrack/plugins/publish/integrate_ftrack_status.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
class IntegrateFtrackFarmStatus(IntegrateFtrackStatusBase):
    """Collect task status names for instances that are sent to farm.

    An instance with truthy "farm" value in its data is considered to be
    rendered on farm, thus its task status should be changed. Instances
    without it are skipped by this plugin.

    Requirements:
        projectName (str): Name of the project.
        hostName (str): Name of the host.
        ftrackSession (ftrack_api.Session): Prepared ftrack session.
        ftrackTaskStatuses (dict[str, list[Any]]): Dictionary of available
            task statuses on project by task type id.
        ftrackStatusByTaskId (dict[str, str]): Empty dictionary of task
            statuses by task id. Status on task can be set only once.
            Value should be a name of status.
    """

    order = pyblish.api.IntegratorOrder + 0.48
    label = "ftrack Task Status To Farm Status"
    active = True

    # Source profiles of this plugin
    farm_status_profiles = []
    # Copy of 'farm_status_profiles', created on first request
    status_profiles = None

    def is_valid_instance(self, context, instance):
        """Accept only farm instances that also pass the base filtering.

        Args:
            context (pyblish.api.Context): Pyblish context.
            instance (pyblish.api.Instance): Instance to process.

        Returns:
            bool: Whether the instance should be processed.
        """

        if instance.data.get("farm"):
            return super(IntegrateFtrackFarmStatus, self).is_valid_instance(
                context, instance)

        self.log.debug("{} Won't be rendered on farm.".format(
            instance.data["productName"]
        ))
        return False

    def get_status_profiles(self):
        """Profiles used to find the status name.

        Returns:
            list[dict[str, Any]]: Copy of 'farm_status_profiles' which is
                created only on first call.
        """

        cached_profiles = self.status_profiles
        if cached_profiles is None:
            cached_profiles = copy.deepcopy(self.farm_status_profiles)
            self.status_profiles = cached_profiles
        return cached_profiles

IntegrateFtrackLocalStatus

Bases: IntegrateFtrackStatusBase

Collect task status names for instances that are published locally.

An instance that has the "farm" key in its data set to 'True' is considered to be rendered on farm, thus it is skipped by this plugin. Only instances without it are processed.

Requirements

projectName (str): Name of the project. hostName (str): Name of the host. ftrackSession (ftrack_api.Session): Prepared ftrack session. ftrackTaskStatuses (dict[str, list[Any]]): Dictionary of available task statuses on project by task type id. ftrackStatusByTaskId (dict[str, str]): Empty dictionary of task statuses by task id. Status on task can be set only once. Value should be a name of status.

Source code in client/ayon_ftrack/plugins/publish/integrate_ftrack_status.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
class IntegrateFtrackLocalStatus(IntegrateFtrackStatusBase):
    """Collect task status names for instances that are published locally.

    An instance with truthy "farm" value in its data is considered to be
    rendered on farm, thus it is skipped by this plugin. Only instances
    without it are processed.

    Requirements:
        projectName (str): Name of the project.
        hostName (str): Name of the host.
        ftrackSession (ftrack_api.Session): Prepared ftrack session.
        ftrackTaskStatuses (dict[str, list[Any]]): Dictionary of available
            task statuses on project by task type id.
        ftrackStatusByTaskId (dict[str, str]): Empty dictionary of task
            statuses by task id. Status on task can be set only once.
            Value should be a name of status.
    """

    # Right after the plugin handling instances sent to farm
    order = IntegrateFtrackFarmStatus.order + 0.001
    label = "ftrack Task Status Local Publish"
    active = True
    targets = ["local"]
    settings_key = "ftrack_task_status_local_publish"

    def is_valid_instance(self, context, instance):
        """Accept only non-farm instances that pass the base filtering.

        Args:
            context (pyblish.api.Context): Pyblish context.
            instance (pyblish.api.Instance): Instance to process.

        Returns:
            bool: Whether the instance should be processed.
        """

        if not instance.data.get("farm"):
            return super(IntegrateFtrackLocalStatus, self).is_valid_instance(
                context, instance)

        self.log.debug("{} Will be rendered on farm.".format(
            instance.data["productName"]
        ))
        return False

IntegrateFtrackOnFarmStatus

Bases: IntegrateFtrackStatusBase

Collect task status names for instances that are published on farm.

Requirements

projectName (str): Name of the project. hostName (str): Name of the host. ftrackSession (ftrack_api.Session): Prepared ftrack session. ftrackTaskStatuses (dict[str, list[Any]]): Dictionary of available task statuses on project by task type id. ftrackStatusByTaskId (dict[str, str]): Empty dictionary of task statuses by task id. Status on task can be set only once. Value should be a name of status.

Source code in client/ayon_ftrack/plugins/publish/integrate_ftrack_status.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
class IntegrateFtrackOnFarmStatus(IntegrateFtrackStatusBase):
    """Collect task status names for instances that are published on farm.

    The class only changes attributes. Instance filtering and status
    lookup are inherited from 'IntegrateFtrackStatusBase'.

    Requirements:
        projectName (str): Name of the project.
        hostName (str): Name of the host.
        ftrackSession (ftrack_api.Session): Prepared ftrack session.
        ftrackTaskStatuses (dict[str, list[Any]]): Dictionary of available
            task statuses on project by task type id.
        ftrackStatusByTaskId (dict[str, str]): Empty dictionary of task
            statuses by task id. Status on task can be set only once.
            Value should be a name of status.
    """

    # Right after the plugin handling local publishing
    order = IntegrateFtrackLocalStatus.order + 0.001
    label = "Ftrack Task Status On Farm Status"
    active = True
    # NOTE(review): presumably limits the plugin to publishing that runs
    #   with the "farm" target registered - confirm against pyblish targets.
    targets = ["farm"]
    # Key under 'project_settings["ftrack"]["publish"]' which the inherited
    #   'apply_settings' reads plugin attributes from.
    settings_key = "ftrack_task_status_on_farm_publish"

IntegrateFtrackStatusBase

Bases: FtrackPublishInstancePlugin

Base plugin for status collection.

Requirements

projectName (str): Name of the project. hostName (str): Name of the host. ftrackSession (ftrack_api.Session): Prepared ftrack session. ftrackTaskStatuses (dict[str, list[Any]]): Dictionary of available task statuses on project by task type id. ftrackStatusByTaskId (dict[str, str]): Empty dictionary of task statuses by task id. Status on task can be set only once. Value should be a name of status.

Source code in client/ayon_ftrack/plugins/publish/integrate_ftrack_status.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
class IntegrateFtrackStatusBase(plugin.FtrackPublishInstancePlugin):
    """Base plugin for status collection.

    The plugin only stores a status name by task id into
    'ftrackStatusByTaskId' on the context. First status stored for a task
    is kept, any later attempt for the same task is skipped.

    Requirements:
        projectName (str): Name of the project.
        hostName (str): Name of the host.
        ftrackSession (ftrack_api.Session): Prepared ftrack session.
        ftrackTaskStatuses (dict[str, list[Any]]): Dictionary of available
            task statuses on project by task type id.
        ftrackStatusByTaskId (dict[str, str]): Empty dictionary of task
            statuses by task id. Status on task can be set only once.
            Value should be a name of status.
    """

    active = False
    # Key in 'project_settings["ftrack"]["publish"]', class name is used
    #   when set to 'None'
    settings_key = None
    status_profiles = []

    @classmethod
    def apply_settings(cls, project_settings):
        """Set plugin attributes from project settings.

        Plugin is disabled when ftrack is not enabled. Nothing is changed
        when the settings do not contain the plugin key.

        Args:
            project_settings (dict[str, Any]): Project settings.
        """

        if not cls.is_ftrack_enabled(project_settings):
            cls.enabled = False
            return

        if cls.settings_key is None:
            key_name = cls.__name__
        else:
            key_name = cls.settings_key

        try:
            plugin_settings = project_settings["ftrack"]["publish"][key_name]
        except KeyError:
            return

        # Each settings item becomes a class attribute
        for attr_name, attr_value in plugin_settings.items():
            setattr(cls, attr_name, attr_value)

    def process(self, instance):
        """Find a status name for the task of the instance.

        Args:
            instance (pyblish.api.Instance): Instance to process.
        """

        context = instance.context
        status_profiles = self.get_status_profiles()
        if not status_profiles:
            # No profiles -> skip
            self.log.debug((
                "Status profiles are not filled for project \"{}\". Skipping"
            ).format(context.data["projectName"]))
            return

        if not context.data.get("ftrackTaskStatuses"):
            # Task statuses were not collected -> skip
            self.log.debug(
                "ftrack task statuses are not collected. Skipping.")
            return

        self.prepare_status_names(context, instance, status_profiles)

    def get_status_profiles(self):
        """List of profiles to determine status name.

        Example profile item:
            {
                "host_names": ["nuke"],
                "task_types": ["Compositing"],
                "task_names": ["Comp"],
                "product_types": ["render"],
                "product_names": ["renderComp"],
                "status_name": "Rendering",
            }

        Returns:
            list[dict[str, Any]]: List of profiles.
        """

        profiles = self.status_profiles
        return profiles

    def prepare_status_names(self, context, instance, profiles):
        """Store status name from the matching profile for the instance.

        Args:
            context (pyblish.api.Context): Pyblish context.
            instance (pyblish.api.Instance): Instance to process.
            profiles (list[dict[str, Any]]): Status profiles.
        """

        if not self.is_valid_instance(context, instance):
            return

        matching_profile = filter_profiles(
            profiles,
            self.get_profile_filter_data(context, instance),
            logger=self.log
        )
        if not matching_profile:
            return

        status_name = matching_profile["status_name"]
        if not status_name:
            return
        self.fill_status(context, instance, status_name)

    def get_profile_filter_data(self, context, instance):
        """Values used to filter status profiles.

        Args:
            context (pyblish.api.Context): Pyblish context.
            instance (pyblish.api.Instance): Instance to process.

        Returns:
            dict[str, Any]: Filter values by profile key.
        """

        instance_data = instance.data
        task_entity = instance_data["ftrackTask"]
        return {
            "host_names": context.data["hostName"],
            "task_types": task_entity["type"]["name"],
            "task_names": task_entity["name"],
            "product_types": instance_data["productType"],
            "product_names": instance_data["productName"],
        }

    def is_valid_instance(self, context, instance):
        """Check if the instance should be processed.

        Instances disabled for publishing, instances without filled task
        and instances with task that already has defined status are
        refused.

        Plugin should do more filtering which is custom for plugin logic.

        Args:
            context (pyblish.api.Context): Pyblish context.
            instance (pyblish.api.Instance): Instance to process.

        Returns:
            bool: Whether the instance should be processed.
        """

        defined_statuses = context.data["ftrackStatusByTaskId"]
        instance_data = instance.data
        # Skip disabled instances
        if instance_data.get("publish") is False:
            return False

        task_entity = instance_data.get("ftrackTask")
        if not task_entity:
            self.log.debug(
                "Skipping instance {}. Does not have filled task".format(
                    instance_data["productName"]))
            return False

        if task_entity["id"] in defined_statuses:
            self.log.debug("Status for task {} was already defined".format(
                task_entity["name"]
            ))
            return False

        return True

    def fill_status(self, context, instance, status_name):
        """Fill status for instance task.

        Task which already has a status stored is left untouched.

        Args:
            context (pyblish.api.Context): Pyblish context.
            instance (pyblish.api.Instance): Pyblish instance.
            status_name (str): Name of status to set.
        """

        task_entity = instance.data["ftrackTask"]
        task_id = task_entity["id"]
        defined_statuses = context.data["ftrackStatusByTaskId"]
        if task_id not in defined_statuses:
            defined_statuses[task_id] = status_name
            self.log.info((
                "Task {} will be set to \"{}\" status."
            ).format(task_entity["name"], status_name))
            return

        self.log.debug("Status for task {} was already defined".format(
            task_entity["name"]
        ))

fill_status(context, instance, status_name)

Fill status for instance task.

If task already had set status, it will be skipped.

Parameters:

Name Type Description Default
context Context

Pyblish context.

required
instance Instance

Pyblish instance.

required
status_name str

Name of status to set.

required
Source code in client/ayon_ftrack/plugins/publish/integrate_ftrack_status.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def fill_status(self, context, instance, status_name):
    """Fill status for instance task.

    Task which already has a status stored is left untouched.

    Args:
        context (pyblish.api.Context): Pyblish context.
        instance (pyblish.api.Instance): Pyblish instance.
        status_name (str): Name of status to set.
    """

    task_entity = instance.data["ftrackTask"]
    task_id = task_entity["id"]
    defined_statuses = context.data["ftrackStatusByTaskId"]
    if task_id not in defined_statuses:
        defined_statuses[task_id] = status_name
        self.log.info((
            "Task {} will be set to \"{}\" status."
        ).format(task_entity["name"], status_name))
        return

    self.log.debug("Status for task {} was already defined".format(
        task_entity["name"]
    ))

get_status_profiles()

List of profiles to determine status name.

Example profile item

{ "host_names": ["nuke"], "task_types": ["Compositing"], "task_names": ["Comp"], "product_types": ["render"], "product_names": ["renderComp"], "status_name": "Rendering", }

Returns:

Type Description

list[dict[str, Any]]: List of profiles.

Source code in client/ayon_ftrack/plugins/publish/integrate_ftrack_status.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def get_status_profiles(self):
    """Profiles used to determine the status name.

    Example profile item:
        {
            "host_names": ["nuke"],
            "task_types": ["Compositing"],
            "task_names": ["Comp"],
            "product_types": ["render"],
            "product_names": ["renderComp"],
            "status_name": "Rendering",
        }

    Returns:
        list[dict[str, Any]]: Value of 'status_profiles' attribute.
    """

    profiles = self.status_profiles
    return profiles

is_valid_instance(context, instance)

Filter instances that should be processed.

Ignore instances that are not enabled for publishing or don't have filled task. Also skip instances with tasks that already have defined status.

Plugin should do more filtering which is custom for plugin logic.

Parameters:

Name Type Description Default
context Context

Pyblish context.

required
instance Instance

Instance to process.

required

Returns:

Type Description

bool: Whether the instance should be processed.

Source code in client/ayon_ftrack/plugins/publish/integrate_ftrack_status.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def is_valid_instance(self, context, instance):
    """Check if the instance should be processed.

    Instances disabled for publishing, instances without filled task and
    instances with task that already has defined status are refused.

    Plugin should do more filtering which is custom for plugin logic.

    Args:
        context (pyblish.api.Context): Pyblish context.
        instance (pyblish.api.Instance): Instance to process.

    Returns:
        bool: Whether the instance should be processed.
    """

    defined_statuses = context.data["ftrackStatusByTaskId"]
    instance_data = instance.data
    # Skip disabled instances
    if instance_data.get("publish") is False:
        return False

    task_entity = instance_data.get("ftrackTask")
    if not task_entity:
        self.log.debug(
            "Skipping instance {}. Does not have filled task".format(
                instance_data["productName"]))
        return False

    if task_entity["id"] in defined_statuses:
        self.log.debug("Status for task {} was already defined".format(
            task_entity["name"]
        ))
        return False

    return True

IntegrateFtrackTaskStatus

Bases: FtrackPublishContextPlugin

Source code in client/ayon_ftrack/plugins/publish/integrate_ftrack_status.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
class IntegrateFtrackTaskStatus(plugin.FtrackPublishContextPlugin):
    """Set collected status names on ftrack tasks.

    Status names are read from 'ftrackStatusByTaskId' and matched, case
    insensitively, against statuses in 'ftrackTaskStatuses'.
    """

    # Use order of Integrate ftrack Api plugin and offset it before or after
    base_order = pyblish.api.IntegratorOrder + 0.499
    # By default is after Integrate ftrack Api
    order = base_order + 0.0001
    label = "Integrate ftrack Task Status"

    @classmethod
    def apply_settings(cls, project_settings):
        """Apply project settings to plugin.

        Args:
            project_settings (dict[str, Any]): Project settings.
        """

        if not cls.is_ftrack_enabled(project_settings):
            cls.enabled = False
            return

        plugin_settings = project_settings["ftrack"]["publish"][
            "IntegrateFtrackTaskStatus"
        ]
        # Move the plugin after or before 'base_order'
        if plugin_settings["after_version_statuses"]:
            offset = 0.001
        else:
            offset = -0.001
        cls.order = cls.base_order + offset

    def process(self, context):
        """Change status of tasks which have a status name defined.

        Args:
            context (pyblish.api.Context): Pyblish context.
        """

        statuses_by_type_id = context.data.get("ftrackTaskStatuses")
        if not statuses_by_type_id:
            self.log.debug("ftrack task statuses are not collected. Skipping.")
            return

        status_by_task_id = self._get_status_by_task_id(context)
        if not status_by_task_id:
            self.log.debug("No statuses to set. Skipping.")
            return

        ftrack_session = context.data["ftrackSession"]
        for task_entity in self._get_task_entities(
            ftrack_session, status_by_task_id
        ):
            task_path = "/".join([
                link_item["name"] for link_item in task_entity["link"]
            ])
            task_id = task_entity["id"]
            type_id = task_entity["type_id"]
            status_name = status_by_task_id[task_id]
            self.log.debug(
                "Status to set {} on task {}.".format(status_name, task_path))

            # Status names are compared case insensitively
            wanted_name = status_name.lower()
            available_statuses = statuses_by_type_id[type_id]
            new_status = next(
                (
                    status
                    for status in available_statuses
                    if status["name"].lower() == wanted_name
                ),
                None
            )
            if new_status is None:
                quoted_names = [
                    "'{}'".format(status["name"])
                    for status in available_statuses
                ]
                self.log.debug((
                    "Status '{}' was not found in available statuses: {}."
                ).format(status_name, ", ".join(quoted_names)))
                continue

            current_status_id = task_entity["status_id"]
            new_status_id = new_status["id"]
            if current_status_id == new_status_id:
                continue

            task_entity["status_id"] = new_status_id
            self.log.debug("Changing status of task '{}' to '{}'".format(
                task_path, status_name
            ))
            ftrack_session.commit()

    def _get_status_by_task_id(self, context):
        """Status names by task id without empty status names.

        Args:
            context (pyblish.api.Context): Pyblish context.

        Returns:
            dict[str, str]: Status name by task id.
        """

        filtered = {}
        for task_id, status_name in (
            context.data["ftrackStatusByTaskId"].items()
        ):
            if status_name:
                filtered[task_id] = status_name
        return filtered

    def _get_task_entities(self, ftrack_session, status_by_task_id):
        """Query task entities in chunks of task ids.

        Args:
            ftrack_session (ftrack_api.Session): ftrack session.
            status_by_task_id (dict[str, str]): Status name by task id.

        Returns:
            list[Any]: Queried task entities.
        """

        query_template = (
            "select id, type_id, status_id, link from Task"
            " where id in ({})"
        )
        found_entities = []
        for chunk_ids in create_chunks(status_by_task_id.keys()):
            quoted_ids = ['"{}"'.format(task_id) for task_id in chunk_ids]
            chunk_query = query_template.format(",".join(quoted_ids))
            found_entities.extend(ftrack_session.query(chunk_query).all())
        return found_entities

apply_settings(project_settings) classmethod

Apply project settings to plugin.

Parameters:

Name Type Description Default
project_settings dict[str, Any]

Project settings.

required
Source code in client/ayon_ftrack/plugins/publish/integrate_ftrack_status.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
@classmethod
def apply_settings(cls, project_settings):
    """Apply project settings to plugin.

    Plugin is disabled when ftrack is not enabled, otherwise its order is
    moved after or before 'base_order' based on 'after_version_statuses'.

    Args:
        project_settings (dict[str, Any]): Project settings.
    """

    if not cls.is_ftrack_enabled(project_settings):
        cls.enabled = False
        return

    plugin_settings = project_settings["ftrack"]["publish"][
        "IntegrateFtrackTaskStatus"
    ]
    if plugin_settings["after_version_statuses"]:
        offset = 0.001
    else:
        offset = -0.001
    cls.order = cls.base_order + offset