Skip to content

integrate_ftrack_farm_status

IntegrateFtrackFarmStatus

Bases: FtrackPublishContextPlugin

Change task status when should be published on farm.

Instance which has set "farm" key in data to 'True' is considered as will be rendered on farm thus it's status should be changed.

Source code in client/ayon_ftrack/plugins/publish/integrate_ftrack_farm_status.py
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
class IntegrateFtrackFarmStatus(plugin.FtrackPublishContextPlugin):
    """Change task status when should be published on farm.

    Instance which has set "farm" key in data to 'True' is considered as will
    be rendered on farm thus it's status should be changed.
    """

    order = pyblish.api.IntegratorOrder + 0.48
    label = "Integrate ftrack Farm Status"
    farm_status_profiles = []

    def process(self, context):
        # Quick end
        if not self.farm_status_profiles:
            project_name = context.data["projectName"]
            self.log.info((
                "Status profiles are not filled for project \"{}\". Skipping"
            ).format(project_name))
            return

        filtered_instances = self.filter_instances(context)
        instances_with_status_names = self.get_instances_with_statuse_names(
            context, filtered_instances
        )
        if instances_with_status_names:
            self.fill_statuses(context, instances_with_status_names)

    def filter_instances(self, context):
        filtered_instances = []
        for instance in context:
            # Skip disabled instances
            if instance.data.get("publish") is False:
                continue
            product_name = instance.data["productName"]
            msg_start = "Skipping instance {}.".format(product_name)
            if not instance.data.get("farm"):
                self.log.debug(
                    "{} Won't be rendered on farm.".format(msg_start)
                )
                continue

            task_entity = instance.data.get("ftrackTask")
            if not task_entity:
                self.log.debug(
                    "{} Does not have filled task".format(msg_start)
                )
                continue

            filtered_instances.append(instance)
        return filtered_instances

    def get_instances_with_statuse_names(self, context, instances):
        instances_with_status_names = []
        for instance in instances:
            product_type = instance.data["productType"]
            product_name = instance.data["productName"]
            task_entity = instance.data["ftrackTask"]
            host_name = context.data["hostName"]
            task_name = task_entity["name"]
            task_type = task_entity["type"]["name"]
            status_profile = filter_profiles(
                self.farm_status_profiles,
                {
                    "host_names": host_name,
                    "task_types": task_type,
                    "task_names": task_name,
                    "product_types": product_type,
                    "product_names": product_name,
                },
                logger=self.log
            )
            if not status_profile:
                # There already is log in 'filter_profiles'
                continue

            status_name = status_profile["status_name"]
            if status_name:
                instances_with_status_names.append((instance, status_name))
        return instances_with_status_names

    def fill_statuses(self, context, instances_with_status_names):
        # Prepare available task statuses on the project
        project_name = context.data["projectName"]
        session = context.data["ftrackSession"]
        project_entity = session.query((
            "select project_schema from Project where full_name is \"{}\""
        ).format(project_name)).one()
        project_schema = project_entity["project_schema"]

        task_type_ids = set()
        for item in instances_with_status_names:
            instance, _ = item
            task_entity = instance.data["ftrackTask"]
            task_type_ids.add(task_entity["type"]["id"])

        task_statuses_by_type_id = {
            task_type_id: project_schema.get_statuses("Task", task_type_id)
            for task_type_id in task_type_ids
        }

        # Keep track if anything has changed
        skipped_status_names = set()
        status_changed = False
        for item in instances_with_status_names:
            instance, status_name = item
            task_entity = instance.data["ftrackTask"]
            task_statuses = task_statuses_by_type_id[task_entity["type"]["id"]]
            status_name_low = status_name.lower()

            status_id = None
            status_name = None
            # Skip if status name was already tried to be found
            for status in task_statuses:
                if status["name"].lower() == status_name_low:
                    status_id = status["id"]
                    status_name = status["name"]
                    break

            if status_id is None:
                if status_name_low not in skipped_status_names:
                    skipped_status_names.add(status_name_low)
                    joined_status_names = ", ".join({
                        '"{}"'.format(status["name"])
                        for status in task_statuses
                    })
                    self.log.warning((
                        "Status \"{}\" is not available on project \"{}\"."
                        " Available statuses are {}"
                    ).format(status_name, project_name, joined_status_names))
                continue

            # Change task status id
            if status_id != task_entity["status_id"]:
                task_entity["status_id"] = status_id
                status_changed = True
                path = "/".join([
                    item["name"]
                    for item in task_entity["link"]
                ])
                self.log.debug("Set status \"{}\" to \"{}\"".format(
                    status_name, path
                ))

        if status_changed:
            session.commit()