Skip to content

server

FtrackAddon

Bases: BaseServerAddon

Source code in server/__init__.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
class FtrackAddon(BaseServerAddon):
    settings_model: Type[FtrackSettings] = FtrackSettings

    async def get_default_settings(self):
        settings_model_cls = self.get_settings_model()
        return settings_model_cls(**DEFAULT_VALUES)

    async def pre_setup(self):
        """Make sure older version of addon use the new way of attributes."""

        # Force older addon versions to skip creation of attributes
        #   - this was added in version 0.2.2
        instance = AddonLibrary.getinstance()
        app_defs = instance.data.get(self.name)
        my_version = semver.Version.parse(self.version)
        for version, addon in app_defs.versions.items():
            if version == self.version:
                continue
            try:
                addon_version = semver.Version.parse(version)
                if addon_version > my_version:
                    continue
            except Exception:
                pass
            if hasattr(addon, "create_ftrack_attributes"):
                addon.create_ftrack_attributes = (
                    self._empty_create_ftrack_attributes)

    async def setup(self):
        need_restart = await self.create_ftrack_attributes()
        if need_restart:
            self.request_server_restart()

    def initialize(self) -> None:
        self.add_endpoint(
            "/customProcessorHandlers",
            self.get_custom_processor_handlers,
            method="GET",
        )
        self.add_endpoint(
            "/ftrackProjects",
            self.get_ftrack_projects_info,
            method="GET",
        )

    async def get_custom_processor_handlers(
        self,
        variant: str = Query("production"),
    ) -> {}:
        bundles = await Postgres.fetch(
            "SELECT name, is_production, is_staging,"
            " is_dev, data->'addons' as addons FROM bundles"
        )
        bundles_by_variant = {
            "production": None,
            "staging": None
        }
        for bundle in bundles:
            if bundle["is_dev"]:
                bundles_by_variant[bundle["name"]] = bundle
                continue

            if bundle["is_production"]:
                bundles_by_variant["production"] = bundle

            if bundle["is_staging"]:
                bundles_by_variant["staging"] = bundle

        handlers = []
        output = {"custom_handlers": handlers}
        if variant not in bundles_by_variant:
            return output
        addons = bundles_by_variant[variant]["addons"]
        addon_library = AddonLibrary.getinstance()
        for addon_name, addon_version in addons.items():
            addons_mapping = addon_library.get(addon_name) or {}
            addon = addons_mapping.get(addon_version)
            if not hasattr(addon, "get_custom_ftrack_handlers_endpoint"):
                continue
            try:
                endpoint = addon.get_custom_ftrack_handlers_endpoint()
                if endpoint:
                    handlers.append({
                        "addon_name": addon_name,
                        "addon_version": addon_version,
                        "endpoint": endpoint,
                    })
            except BaseException as exc:
                logging.warning(
                    f"Failed to receive ftrack handlers from addon"
                    f" {addon_name} {addon_version}. {exc}"
                )

        return output

    async def _prepare_ftrack_session(
        self, variant: str, settings_model: FtrackSettings | None
    ) -> FtrackSession:
        # TODO validate user permissions
        # - What permissions user must have to allow this endpoint?
        if settings_model is None:
            settings_model = await self.get_studio_settings(variant)
        ftrack_server_url = settings_model.ftrack_server
        service_settings = settings_model.service_settings
        api_key_secret = service_settings.api_key
        username_secret = service_settings.username

        if not ftrack_server_url or not api_key_secret or not username_secret:
            raise InvalidSettingsException("Required settings are not set.")

        ftrack_api_key = await Secrets.get(api_key_secret)
        ftrack_username = await Secrets.get(username_secret)

        if not ftrack_api_key or not ftrack_username:
            raise InvalidSettingsException(
                "Invalid service settings, secrets are not set."
            )

        session = FtrackSession(
            ftrack_server_url, ftrack_api_key, ftrack_username
        )
        try:
            await session.validate()
        except InvalidCredentials:
            raise InvalidSettingsException(
                "Service settings contain invalid credentials."
            )
        except ServerError as exc:
            raise AyonException(
                "Unknown error occurred while connecting to ftrack server."
                f" {exc}"
            )
        return session

    async def get_ftrack_projects_info(
        self,
        user: CurrentUser,
        variant: str = Query("production"),
    ) -> {}:
        # TODO validate user permissions
        # - What permissions user must have to allow this endpoint?
        session = await self._prepare_ftrack_session(variant)
        projects = [
            {
                "id": project["id"],
                "name": project["full_name"],
                "code": project["name"],
                "active": project["status"] == "active",
            }
            async for project in await session.get_projects()
        ]

        return {
            "projects": projects,
        }

    async def _empty_create_ftrack_attributes(self):
        return False

    async def create_ftrack_attributes(self) -> bool:
        """Make sure there are required attributes which ftrack addon needs.

        Returns:
            bool: 'True' if an attribute was created or updated.
        """

        query = "SELECT name, position, scope, data from public.attributes"
        ftrack_id_attribute_data = {
            "type": "string",
            "title": "ftrack id",
            "inherit": False,
        }
        ftrack_path_attribute_data = {
            "type": "string",
            "title": "ftrack path",
            "inherit": False,
        }
        ftrack_id_expected_scope = ["project", "folder", "task", "version"]
        ftrack_path_expected_scope = ["project", "folder", "task"]

        ftrack_id_match_position = None
        ftrack_id_matches = False
        ftrack_path_match_position = None
        ftrack_path_matches = False
        position = 1
        if Postgres.pool is None:
            await Postgres.connect()
        async for row in Postgres.iterate(query):
            position += 1
            if row["name"] == FTRACK_ID_ATTRIB:
                # Check if scope is matching ftrack addon requirements
                if not set(ftrack_id_expected_scope) - set(row["scope"]):
                    ftrack_id_matches = True
                ftrack_id_match_position = row["position"]

            elif row["name"] == FTRACK_PATH_ATTRIB:
                if not set(ftrack_path_expected_scope) - set(row["scope"]):
                    ftrack_path_matches = True
                ftrack_path_match_position = row["position"]

        if ftrack_id_matches and ftrack_path_matches:
            return False

        postgre_query = "\n".join((
            "INSERT INTO public.attributes",
            "    (name, position, scope, data)",
            "VALUES",
            "    ($1, $2, $3, $4)",
            "ON CONFLICT (name)",
            "DO UPDATE SET",
            "    scope = $3,",
            "    data = $4",
        ))
        if not ftrack_id_matches:
            # Reuse position from found attribute
            if ftrack_id_match_position is None:
                ftrack_id_match_position = position
                position += 1

            await Postgres.execute(
                postgre_query,
                FTRACK_ID_ATTRIB,
                ftrack_id_match_position,
                ftrack_id_expected_scope,
                ftrack_id_attribute_data,
            )

        if not ftrack_path_matches:
            if ftrack_path_match_position is None:
                ftrack_path_match_position = position
                position += 1

            await Postgres.execute(
                postgre_query,
                FTRACK_PATH_ATTRIB,
                ftrack_path_match_position,
                ftrack_path_expected_scope,
                ftrack_path_attribute_data,
            )
        return True

    async def convert_settings_overrides(
        self,
        source_version: str,
        overrides: dict[str, Any],
    ) -> dict[str, Any]:
        await convert_settings_overrides(source_version, overrides)
        return await super().convert_settings_overrides(
            source_version, overrides
        )

create_ftrack_attributes() async

Make sure there are required attributes which ftrack addon needs.

Returns:

Name Type Description
bool bool

'True' if an attribute was created or updated.

Source code in server/__init__.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
async def create_ftrack_attributes(self) -> bool:
    """Make sure there are required attributes which ftrack addon needs.

    Returns:
        bool: 'True' if an attribute was created or updated.
    """

    query = "SELECT name, position, scope, data from public.attributes"
    ftrack_id_attribute_data = {
        "type": "string",
        "title": "ftrack id",
        "inherit": False,
    }
    ftrack_path_attribute_data = {
        "type": "string",
        "title": "ftrack path",
        "inherit": False,
    }
    ftrack_id_expected_scope = ["project", "folder", "task", "version"]
    ftrack_path_expected_scope = ["project", "folder", "task"]

    ftrack_id_match_position = None
    ftrack_id_matches = False
    ftrack_path_match_position = None
    ftrack_path_matches = False
    position = 1
    if Postgres.pool is None:
        await Postgres.connect()
    async for row in Postgres.iterate(query):
        position += 1
        if row["name"] == FTRACK_ID_ATTRIB:
            # Check if scope is matching ftrack addon requirements
            if not set(ftrack_id_expected_scope) - set(row["scope"]):
                ftrack_id_matches = True
            ftrack_id_match_position = row["position"]

        elif row["name"] == FTRACK_PATH_ATTRIB:
            if not set(ftrack_path_expected_scope) - set(row["scope"]):
                ftrack_path_matches = True
            ftrack_path_match_position = row["position"]

    if ftrack_id_matches and ftrack_path_matches:
        return False

    postgre_query = "\n".join((
        "INSERT INTO public.attributes",
        "    (name, position, scope, data)",
        "VALUES",
        "    ($1, $2, $3, $4)",
        "ON CONFLICT (name)",
        "DO UPDATE SET",
        "    scope = $3,",
        "    data = $4",
    ))
    if not ftrack_id_matches:
        # Reuse position from found attribute
        if ftrack_id_match_position is None:
            ftrack_id_match_position = position
            position += 1

        await Postgres.execute(
            postgre_query,
            FTRACK_ID_ATTRIB,
            ftrack_id_match_position,
            ftrack_id_expected_scope,
            ftrack_id_attribute_data,
        )

    if not ftrack_path_matches:
        if ftrack_path_match_position is None:
            ftrack_path_match_position = position
            position += 1

        await Postgres.execute(
            postgre_query,
            FTRACK_PATH_ATTRIB,
            ftrack_path_match_position,
            ftrack_path_expected_scope,
            ftrack_path_attribute_data,
        )
    return True

pre_setup() async

Make sure older version of addon use the new way of attributes.

Source code in server/__init__.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
async def pre_setup(self):
    """Make sure older version of addon use the new way of attributes."""

    # Force older addon versions to skip creation of attributes
    #   - this was added in version 0.2.2
    instance = AddonLibrary.getinstance()
    app_defs = instance.data.get(self.name)
    my_version = semver.Version.parse(self.version)
    for version, addon in app_defs.versions.items():
        if version == self.version:
            continue
        try:
            addon_version = semver.Version.parse(version)
            if addon_version > my_version:
                continue
        except Exception:
            pass
        if hasattr(addon, "create_ftrack_attributes"):
            addon.create_ftrack_attributes = (
                self._empty_create_ftrack_attributes)