common

Common ftrack functions that can be used in ftrack services or on the client.

Most of this functionality is needed in multiple places and would otherwise have to be duplicated.

The content of this folder is expected to be copied to a location from which the process can import it.

BaseAction

Bases: BaseHandler

Custom Action base class.

Simplify action discovery and launch. This implementation represents single action with single callback. To change the behavior implement custom callbacks or override '_discover' and '_launch' methods.

Attributes:

Name | Type | Description
label | str | Label of the action or name of the action group. Can be combined with the 'variant' attribute.
variant | str | Variant under 'label', e.g. 'label' "Admin" with variant "Kill jobs". When the "Admin" label has more than one variant, they are grouped in ftrack UI widgets.
identifier | str | Action identifier. Used to trigger the launch logic of the action.
icon | str | URL of the icon. The browser showing the icon must have access to the resource.
description | str | Hint shown to the user when hovering over the action.

Parameters:

Name | Type | Description | Default
session | Session | Connected ftrack session. | required
Source code in client/ayon_ftrack/common/event_handlers/ftrack_action_handler.py
class BaseAction(BaseHandler):
    """Custom Action base class.

    Simplify action discovery and launch. This implementation represents
    single action with single callback. To change the behavior implement
    custom callbacks or override '_discover' and '_launch' methods.

    Attributes:
        label (str): Label of action or group name of action. Can be combined
            with 'variant' attribute.
        variant (str): Variant under 'label'. Can be combined with 'label' e.g.
            when 'label' is "Admin" and variant is "Kill jobs". In case
            there are more variants for "Admin" label they'll be grouped in
            ftrack UI widgets.
        identifier (str): Action identifier. Is used to trigger the launch
            logic of action.
        icon (str): URL of icon (browser which should show the icon must have
            access to the resource).
        description (str): Hint of action which is shown to user hovering
            over the action.

    Args:
        session (ftrack_api.Session): Connected ftrack session.

    """
    __ignore_handler_class = True

    label: Optional[str] = None
    variant: Optional[str] = None
    identifier: Optional[str] = None
    description: Optional[str] = None
    icon: Optional[str] = None
    handler_type: str = "Action"
    preactions: List[str] = []

    _full_label: Optional[str] = None
    _discover_identifier: Optional[str] = None
    _launch_identifier: Optional[str] = None

    settings_frack_subkey: str = "user_handlers"
    settings_enabled_key: str = "enabled"

    def __init__(self, session: ftrack_api.Session):
        # Validate minimum requirements
        if not self.label:
            raise ValueError("Action missing 'label'.")

        if not self.identifier:
            raise ValueError("Action missing 'identifier'.")

        super().__init__(session)
        self.setup_launch_wrapper()

    def setup_launch_wrapper(self):
        self._launch = self.launch_wrapper(self._launch)

    @property
    def discover_identifier(self) -> str:
        return self.identifier

    @property
    def launch_identifier(self) -> str:
        return self.identifier

    @property
    def handler_label(self) -> str:
        return self.full_label

    @property
    def full_label(self) -> str:
        """Full label of action.

        Value of full label is cached.

        Returns:
            str: Label created from 'label' and 'variant' attributes.

        """
        if self._full_label is None:
            if self.variant:
                label = "{} {}".format(self.label, self.variant)
            else:
                label = self.label
            self._full_label = label
        return self._full_label

    def register(self):
        """Register to ftrack topics to discover and launch action."""
        self.session.event_hub.subscribe(
            "topic=ftrack.action.discover",
            self._discover,
            priority=self.priority
        )

        launch_subscription = (
            "topic=ftrack.action.launch and data.actionIdentifier={}"
        ).format(self.launch_identifier)
        self.session.event_hub.subscribe(launch_subscription, self._launch)

    def _translate_event(
        self,
        event: ftrack_api.event.base.Event,
        session: Optional[ftrack_api.Session] = None
    ) -> List[ftrack_api.entity.base.Entity]:
        """Translate event to entities based on its data."""
        if session is None:
            session = self.session

        _entities = event["data"].get("entities_object", None)
        if _entities is not None and not _entities:
            return _entities

        if (
            _entities is None
            or _entities[0].get("link") == ftrack_api.symbol.NOT_SET
        ):
            _entities = [
                item
                for item in self._get_entities(event, session=session)
                if item is not None
            ]
            event["data"]["entities_object"] = _entities

        return _entities

    def _discover(
        self, event: ftrack_api.event.base.Event
    ) -> Optional[Dict[str, Any]]:
        """Decide if and how the action is shown to the user in ftrack.

        Args:
            event (ftrack_api.Event): Event with topic which triggered this
                callback.

        Returns:
            Union[None, Dict[str, Any]]: None if action is not returned
                otherwise returns items to show in UI (structure of items is
                defined by ftrack and can be found in documentation).

        """
        entities = self._translate_event(event)
        if not entities:
            return None

        accepts = self.discover(self.session, entities, event)
        if not accepts:
            return None

        self.log.debug("Discovering action with selection: {}".format(
            event["data"].get("selection") or []
        ))

        return {
            "items": [{
                "label": self.label,
                "variant": self.variant,
                "description": self.description,
                "actionIdentifier": self.discover_identifier,
                "icon": self.icon,
            }]
        }

    def discover(
        self,
        session: ftrack_api.Session,
        entities: List[ftrack_api.entity.base.Entity],
        event: ftrack_api.event.base.Event,
    ) -> bool:
        """Decide if action is shown to the user based on event data.

        Action should override the method to implement logic to show the
        action. The most common logic is based on combination of user roles
        and selected entities.

        Args:
            session (ftrack_api.Session): Session which triggered callback of
                the event.
            entities (List[Any]): Prepared list of entities from event data.
            event (ftrack_api.Event): ftrack event which triggered this
                callback.

        Returns:
            bool: True if action should be returned.

        """
        return False

    def _handle_preactions(
        self, session: ftrack_api.Session, event: ftrack_api.event.base.Event
    ) -> bool:
        """Launch actions before launching this action.

        Concept came from Pype and got deprecated (and used) over time. Should
        be probably removed.

        Note:
            Added warning log that this functionality is deprecated and will
                be removed in the future.

        Args:
            session (ftrack_api.Session): ftrack session.
            event (ftrack_api.Event): Event which triggered launch of this
                action.

        Returns:
            bool: Preactions were launched or not.

        Deprecated:
            Preactions are marked as deprecated. Server actions should not
                use preactions and local actions use local identifier which
                is hard to handle automatically

        """
        # If preactions are not set
        if len(self.preactions) == 0:
            return True

        if not event.get("data", {}).get("selection"):
            return False

        # If preactions were already started
        if event["data"].get("preactions_launched") is True:
            return True

        self.log.warning((
            "DEPRECATION WARNING: Action \"{}\" is using 'preactions'"
            " which are deprecated and will be removed Q2 2023."
        ).format(self.full_label))

        # Launch preactions
        for preaction in self.preactions:
            self.trigger_action(preaction, event)

        # Relaunch this action
        self.trigger_action(
            self.launch_identifier,
            event,
            additional_event_data={"preactions_launched": True}
        )
        return False

    def launch_wrapper(self, func):
        @functools.wraps(func)
        def wrapper_func(*args, **kwargs):
            self.log.info("{} \"{}\": Launched".format(
                self.handler_type, self.full_label
            ))

            try:
                output = func(*args, **kwargs)
                self.log.info("{} \"{}\": Finished".format(
                    self.handler_type, self.full_label
                ))

            except BaseException as exc:
                self.session.rollback()
                self.session._configure_locations()
                msg = "{} \"{}\": Failed ({})".format(
                    self.handler_type, self.full_label, str(exc))
                self.log.error(msg, exc_info=True)
                output = {
                    "success": False,
                    "message": msg
                }

            return output
        return wrapper_func

    def _launch(
        self, event: ftrack_api.event.base.Event
    ) -> Optional[Dict[str, Any]]:
        entities = self._translate_event(event)
        if not entities:
            return

        preactions_launched = self._handle_preactions(self.session, event)
        if preactions_launched is False:
            return

        interface = self._interface(self.session, entities, event)
        if interface:
            return interface

        response = self.launch(self.session, entities, event)

        return self._handle_result(response)

    def launch(
        self,
        session: ftrack_api.Session,
        entities: List[ftrack_api.entity.base.Entity],
        event: ftrack_api.event.base.Event
    ) -> Optional[Union[bool, Dict[str, Any]]]:
        """Main part of handling event callback.

        Args:
            session (ftrack_api.Session): Session which queried entities.
            entities (List[Any]): Prequeried entities based on event data.
            event (ftrack_api.Event): ftrack event to process.

        Returns:
            Union[bool, Dict[str, Any]]: True or false for success or fail,
                or more complex data structure e.g. to show interface to user.

        """
        raise NotImplementedError()

    def _interface(
        self,
        session: ftrack_api.Session,
        entities: List[ftrack_api.entity.base.Entity],
        event: ftrack_api.event.base.Event
    ) -> Optional[Dict[str, Any]]:
        interface = self.interface(session, entities, event)
        if not interface:
            return

        if isinstance(interface, (tuple, list)):
            return {"items": interface}

        if isinstance(interface, dict):
            if (
                "items" in interface
                or ("success" in interface and "message" in interface)
            ):
                return interface

            raise ValueError((
                "Invalid interface output expected key: \"items\" or keys:"
                " \"success\" and \"message\". Got: \"{}\""
            ).format(str(interface)))

        raise ValueError(
            "Invalid interface output type \"{}\"".format(
                str(type(interface))
            )
        )

    def interface(
        self,
        session: ftrack_api.Session,
        entities: List[ftrack_api.entity.base.Entity],
        event: ftrack_api.event.base.Event
    ) -> Optional[Union[Dict[str, Any], List[Dict[str, Any]]]]:
        """Show an interface to user before the action is processed.

        This is part of launch callback which gives option to return ftrack
        widgets items. These items are shown to user who can fill/change
        values and submit them.

        Interface must in that case handle if event contains values from user.

        Args:
            session (ftrack_api.Session): Connected ftrack api session.
            entities (List[Any]): Entities on which was action triggered.
            event (ftrack_api.Event): Event which triggered launch callback.

        Returns:
            Union[None, List[Dict[str, Any]], Dict[str, Any]]: None if nothing
                should be shown, list of items to show or dictionary with
                'items' key and possibly additional data
                (e.g. submit button label).

        """
        return None

    def _handle_result(self, result: Any) -> Optional[Dict[str, Any]]:
        """Validate the returned result from the action callback."""
        if not result:
            return None

        if isinstance(result, dict):
            if "items" in result:
                if not isinstance(result["items"], list):
                    raise TypeError(
                        "Invalid items type {} expected list".format(
                            str(type(result["items"]))))
                return result

            if "success" not in result and "message" not in result:
                self.log.error((
                    "{} \"{}\" Missing required keys"
                    " \"success\" and \"message\" in callback output. This is"
                    " soft fail."
                ).format(self.handler_type, self.full_label))

            elif "message" in result:
                if "success" not in result:
                    result["success"] = True
                return result

            # Fallback to 'bool' result
            result = result.get("success", True)

        if isinstance(result, bool):
            if result:
                return {
                    "success": True,
                    "message": "{} finished.".format(self.full_label)
                }
            return {
                "success": False,
                "message": "{} failed.".format(self.full_label)
            }

        return result

    @staticmethod
    def roles_check(
        settings_roles: List[str],
        user_roles: List[str],
        default: Optional[bool] = True
    ) -> bool:
        """Compare roles from settings with user's roles.

        Args:
            settings_roles(list): List of role names from settings.
            user_roles(list): User's lowered role names.
            default(bool): If 'settings_roles' is empty list.

        Returns:
            bool: 'True' if user has at least one role from settings or
                default if 'settings_roles' is empty.

        """
        if not settings_roles:
            return default

        user_roles = {
            role_name.lower()
            for role_name in user_roles
        }
        for role_name in settings_roles:
            if role_name.lower() in user_roles:
                return True
        return False

    @classmethod
    def get_user_entity_from_event(
        cls,
        session: ftrack_api.Session,
        event: ftrack_api.event.base.Event
    ) -> Optional[ftrack_api.entity.user.User]:
        """Query user entity from event."""

        not_set = object()

        # Check if user is already stored in event data
        user_entity = event["data"].get("user_entity", not_set)
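        if user_entity is not_set:
            # Nothing cached yet: drop the sentinel so the falsy check below
            # can fall back to the username query
            user_entity = None
            # Query user entity from event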
            user_info = event.get("source", {}).get("user", {})
            user_id = user_info.get("id")
            username = user_info.get("username")
            if user_id:
                user_entity = session.query(
                    "User where id is {}".format(user_id)
                ).first()
            if not user_entity and username:
                user_entity = session.query(
                    "User where username is {}".format(username)
                ).first()
            event["data"]["user_entity"] = user_entity

        return user_entity

    @classmethod
    def get_user_roles_from_event(
        cls,
        session: ftrack_api.Session,
        event: ftrack_api.event.base.Event,
        lower: Optional[bool] = False
    ) -> List[str]:
        """Get user roles based on data in event.

        Args:
            session (ftrack_api.Session): Prepared ftrack session.
            event (ftrack_api.event.Event): Event which is processed.
            lower (Optional[bool]): Lower the role names. Default 'False'.

        Returns:
            List[str]: List of user roles.

        """
        not_set = object()

        user_roles = event["data"].get("user_roles", not_set)
        if user_roles is not_set:
            user_roles = []
            user_entity = cls.get_user_entity_from_event(session, event)
            for role in user_entity["user_security_roles"]:
                role_name = role["security_role"]["name"]
                if lower:
                    role_name = role_name.lower()
                user_roles.append(role_name)
            event["data"]["user_roles"] = user_roles
        return user_roles

    def get_project_name_from_event_with_entities(
        self,
        session: ftrack_api.Session,
        event: ftrack_api.event.base.Event,
        entities: List[ftrack_api.entity.base.Entity],
    ) -> Optional[str]:
        """Load or query and fill project entity from/to event data.

        Project data are stored by ftrack id because in most cases it is
        easier to access project id than project name.

        Args:
            session (ftrack_api.Session): Current session.
            event (ftrack_api.Event): Processed event by session.
            entities (List[Any]): ftrack entities of selection.

        Returns:
            Optional[str]: Project name from event data.

        """
        # Try to get project entity from event
        project_name = event["data"].get("project_name")
        if not project_name:
            project_entity = self.get_project_from_entity(
                entities[0], session
            )
            project_name = project_entity["full_name"]

            event["data"]["project_name"] = project_name
        return project_name

    def get_ftrack_settings(
        self,
        session: ftrack_api.Session,
        event: ftrack_api.event.base.Event,
        entities: List[ftrack_api.entity.base.Entity],
    ) -> Dict[str, Any]:
        project_name = self.get_project_name_from_event_with_entities(
            session, event, entities
        )
        project_settings = self.get_project_settings_from_event(
            event, project_name
        )
        return project_settings["ftrack"]

    def valid_roles(
        self,
        session: ftrack_api.Session,
        entities: List[ftrack_api.entity.base.Entity],
        event: ftrack_api.event.base.Event,
    ) -> bool:
        """Validate user roles by settings.

        Requires the `settings_key` attribute to be set.
        """
        ftrack_settings = self.get_ftrack_settings(session, event, entities)
        settings = (
            ftrack_settings[self.settings_frack_subkey][self.settings_key]
        )
        if self.settings_enabled_key:
            if not settings.get(self.settings_enabled_key, True):
                return False

        user_role_list = self.get_user_roles_from_event(session, event)
        if not self.roles_check(settings.get("role_list"), user_role_list):
            return False
        return True
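
The `launch_wrapper` method in the listing above follows a common decorator pattern: log the start, call the wrapped function, and turn any error into a failure payload. The following is a minimal standalone sketch of that pattern with no ftrack dependency. The names `make_launch_wrapper`, `label` and `rollback` are hypothetical stand-ins for illustration, and the sketch catches `Exception` where the listing catches `BaseException`.

```python
import functools
import logging

log = logging.getLogger("action_sketch")


def make_launch_wrapper(label, rollback):
    """Return a decorator that logs the call and converts errors to a payload.

    'label' and 'rollback' are illustrative stand-ins for 'full_label' and
    'session.rollback' from the listing; they are not part of the real API.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            log.info('Action "%s": Launched', label)
            try:
                output = func(*args, **kwargs)
                log.info('Action "%s": Finished', label)
            except Exception as exc:
                # Undo pending changes, then report a failure payload
                rollback()
                msg = 'Action "{}": Failed ({})'.format(label, exc)
                log.error(msg, exc_info=True)
                output = {"success": False, "message": msg}
            return output
        return wrapper
    return decorator


rollbacks = []


@make_launch_wrapper("Admin Kill jobs", lambda: rollbacks.append(1))
def failing_launch(event):
    raise RuntimeError("boom")


result = failing_launch({})
```

Because the wrapper returns a dictionary instead of re-raising, the caller always receives a value it can send back to the UI.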

full_label property

Full label of action.

Value of full label is cached.

Returns:

Type | Description
str | Label created from 'label' and 'variant' attributes.
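
The label composition and caching can be sketched in isolation. `LabelSketch` and `KillJobs` below are hypothetical stand-ins, not the real `BaseAction`; the point is only that the value is computed once and later attribute changes are ignored.

```python
from typing import Optional


class LabelSketch:
    """Illustrative stand-in; not the real BaseAction."""

    label: Optional[str] = None
    variant: Optional[str] = None
    _full_label: Optional[str] = None

    @property
    def full_label(self) -> str:
        # Compute on first access, then reuse the cached value
        if self._full_label is None:
            if self.variant:
                self._full_label = "{} {}".format(self.label, self.variant)
            else:
                self._full_label = self.label
        return self._full_label


class KillJobs(LabelSketch):
    label = "Admin"
    variant = "Kill jobs"


action = KillJobs()
first = action.full_label
action.variant = "Something else"  # ignored: the label is already cached
second = action.full_label
```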

discover(session, entities, event)

Decide whether the action is shown to the user based on event data.

Actions should override this method to implement the logic that shows the action. The most common logic is based on a combination of user roles and selected entities.

Parameters:

Name | Type | Description | Default
session | Session | Session which triggered the event callback. | required
entities | List[Any] | Prepared list of entities from event data. | required
event | Event | ftrack event which triggered this callback. | required

Returns:

Type | Description
bool | True if the action should be returned (shown to the user).

Source code in client/ayon_ftrack/common/event_handlers/ftrack_action_handler.py
def discover(
    self,
    session: ftrack_api.Session,
    entities: List[ftrack_api.entity.base.Entity],
    event: ftrack_api.event.base.Event,
) -> bool:
    """Decide if action is shown to the user based on event data.

    Action should override the method to implement logic to show the
    action. The most common logic is based on combination of user roles
    and selected entities.

    Args:
        session (ftrack_api.Session): Session which triggered callback of
            the event.
        entities (List[Any]): Prepared list of entities from event data.
        event (ftrack_api.Event): ftrack event which triggered this
            callback.

    Returns:
        bool: True if action should be returned.

    """
    return False
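
Since the default implementation returns `False`, a subclass has to supply its own rule. A minimal sketch of such a rule follows, written as a free function so it runs without ftrack. It assumes entities expose an `entity_type` attribute and that `event["data"]["selection"]` is a list of selected items; the "single Task" rule and the name `discover_sketch` are purely illustrative.

```python
from types import SimpleNamespace
from typing import Any, Dict, List


def discover_sketch(entities: List[Any], event: Dict[str, Any]) -> bool:
    """Show the action only when exactly one Task is selected (example rule)."""
    selection = event["data"].get("selection") or []
    if len(selection) != 1:
        return False
    return entities[0].entity_type == "Task"


# Stand-ins for ftrack entities and an event (assumed shapes)
task = SimpleNamespace(entity_type="Task")
project = SimpleNamespace(entity_type="Project")
event = {"data": {"selection": [{"entityId": "abc", "entityType": "task"}]}}

show_for_task = discover_sketch([task], event)
show_for_project = discover_sketch([project], event)
show_for_empty = discover_sketch([task], {"data": {"selection": []}})
```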

get_project_name_from_event_with_entities(session, event, entities)

Load or query and fill project entity from/to event data.

Project data are stored by ftrack id because in most cases it is easier to access project id than project name.

Parameters:

Name | Type | Description | Default
session | Session | Current session. | required
event | Event | Event processed by the session. | required
entities | List[Any] | ftrack entities of the selection. | required

Returns:

Type | Description
Optional[str] | Project name from event data.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_action_handler.py
def get_project_name_from_event_with_entities(
    self,
    session: ftrack_api.Session,
    event: ftrack_api.event.base.Event,
    entities: List[ftrack_api.entity.base.Entity],
) -> Optional[str]:
    """Load or query and fill project entity from/to event data.

    Project data are stored by ftrack id because in most cases it is
    easier to access project id than project name.

    Args:
        session (ftrack_api.Session): Current session.
        event (ftrack_api.Event): Processed event by session.
        entities (List[Any]): ftrack entities of selection.

    Returns:
        Optional[str]: Project name from event data.

    """
    # Try to get project entity from event
    project_name = event["data"].get("project_name")
    if not project_name:
        project_entity = self.get_project_from_entity(
            entities[0], session
        )
        project_name = project_entity["full_name"]

        event["data"]["project_name"] = project_name
    return project_name
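
The method caches its result in `event["data"]`, so the project is resolved at most once per event. A standalone sketch of that caching follows; `cached_project_name` and `fake_resolve` are hypothetical names, and a plain dictionary stands in for the ftrack event. Note that, as in the listing, an empty `entities` list would raise `IndexError` when nothing is cached.

```python
from typing import Any, Callable, Dict, List, Optional


def cached_project_name(
    event: Dict[str, Any],
    entities: List[Any],
    resolve: Callable[[Any], str],
) -> Optional[str]:
    """Return the cached project name or resolve it from the first entity."""
    project_name = event["data"].get("project_name")
    if not project_name:
        project_name = resolve(entities[0])
        event["data"]["project_name"] = project_name
    return project_name


calls = []


def fake_resolve(entity: Any) -> str:
    # Stand-in for the project lookup; records how often it is called
    calls.append(entity)
    return "demo_project"


event = {"data": {}}
name_a = cached_project_name(event, ["entity"], fake_resolve)
name_b = cached_project_name(event, ["entity"], fake_resolve)
```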

get_user_entity_from_event(session, event) classmethod

Query user entity from event.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_action_handler.py
@classmethod
def get_user_entity_from_event(
    cls,
    session: ftrack_api.Session,
    event: ftrack_api.event.base.Event
) -> Optional[ftrack_api.entity.user.User]:
    """Query user entity from event."""

    not_set = object()

    # Check if user is already stored in event data
    user_entity = event["data"].get("user_entity", not_set)
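    if user_entity is not_set:
        # Nothing cached yet: drop the sentinel so the falsy check below
        # can fall back to the username query
        user_entity = None
        # Query user entity from event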
        user_info = event.get("source", {}).get("user", {})
        user_id = user_info.get("id")
        username = user_info.get("username")
        if user_id:
            user_entity = session.query(
                "User where id is {}".format(user_id)
            ).first()
        if not user_entity and username:
            user_entity = session.query(
                "User where username is {}".format(username)
            ).first()
        event["data"]["user_entity"] = user_entity

    return user_entity
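
The method uses a sentinel object to tell "not looked up yet" apart from "looked up, nothing found", so that a `None` result is cached too. A minimal sketch of that pattern follows, using a plain dictionary; `cached_lookup` and `fake_query` are hypothetical names for illustration.

```python
from typing import Any, Callable, Dict, Optional

_NOT_SET = object()


def cached_lookup(
    data: Dict[str, Any],
    key: str,
    query: Callable[[], Optional[Any]],
) -> Optional[Any]:
    """Run 'query' only when 'key' was never stored, even if it stored None."""
    value = data.get(key, _NOT_SET)
    if value is _NOT_SET:
        value = query()
        data[key] = value  # may store None, meaning "looked up, not found"
    return value


calls = []


def fake_query() -> None:
    calls.append(1)
    return None  # simulate a user that does not exist


event_data: Dict[str, Any] = {}
first = cached_lookup(event_data, "user_entity", fake_query)
second = cached_lookup(event_data, "user_entity", fake_query)
```

With a plain `data.get(key)` check the second call would query again, because a cached `None` is indistinguishable from a missing key.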

get_user_roles_from_event(session, event, lower=False) classmethod

Get user roles based on data in event.

Parameters:

Name | Type | Description | Default
session | Session | Prepared ftrack session. | required
event | Event | Event which is processed. | required
lower | Optional[bool] | Lowercase the role names. | False

Returns:

Type | Description
List[str] | List of user roles.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_action_handler.py
@classmethod
def get_user_roles_from_event(
    cls,
    session: ftrack_api.Session,
    event: ftrack_api.event.base.Event,
    lower: Optional[bool] = False
) -> List[str]:
    """Get user roles based on data in event.

    Args:
        session (ftrack_api.Session): Prepared ftrack session.
        event (ftrack_api.event.Event): Event which is processed.
        lower (Optional[bool]): Lower the role names. Default 'False'.

    Returns:
        List[str]: List of user roles.

    """
    not_set = object()

    user_roles = event["data"].get("user_roles", not_set)
    if user_roles is not_set:
        user_roles = []
        user_entity = cls.get_user_entity_from_event(session, event)
        for role in user_entity["user_security_roles"]:
            role_name = role["security_role"]["name"]
            if lower:
                role_name = role_name.lower()
            user_roles.append(role_name)
        event["data"]["user_roles"] = user_roles
    return user_roles
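
One consequence of the caching in the listing above is that the `lower` flag only has an effect on the first call for a given event; later calls return the cached list unchanged. The sketch below reproduces that behavior with plain dictionaries. The function names and the sample user structure are illustrative assumptions.

```python
from typing import Any, Dict, List


def roles_from_user(user: Dict[str, Any], lower: bool = False) -> List[str]:
    """Collect role names from a user-like nested structure."""
    roles = []
    for role in user["user_security_roles"]:
        name = role["security_role"]["name"]
        roles.append(name.lower() if lower else name)
    return roles


def cached_roles(
    event: Dict[str, Any], user: Dict[str, Any], lower: bool = False
) -> List[str]:
    not_set = object()
    roles = event["data"].get("user_roles", not_set)
    if roles is not_set:
        roles = roles_from_user(user, lower)
        event["data"]["user_roles"] = roles
    return roles


user = {
    "user_security_roles": [
        {"security_role": {"name": "Administrator"}},
        {"security_role": {"name": "Project Manager"}},
    ]
}
event = {"data": {}}
first = cached_roles(event, user)
second = cached_roles(event, user, lower=True)  # cached list is returned as-is
```

Callers that need a specific casing may therefore prefer to normalize the returned names themselves.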

interface(session, entities, event)

Show an interface to the user before the action is processed.

This is the part of the launch callback that can return ftrack widget items. The items are shown to the user, who can fill in or change values and submit them.

The interface must then handle the case where the event already contains values from the user.

Parameters:

Name | Type | Description | Default
session | Session | Connected ftrack API session. | required
entities | List[Any] | Entities on which the action was triggered. | required
event | Event | Event which triggered the launch callback. | required

Returns:

Type | Description
Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] | None if nothing should be shown, a list of items to show, or a dictionary with an 'items' key and possibly additional data (e.g. submit button label).

Source code in client/ayon_ftrack/common/event_handlers/ftrack_action_handler.py
def interface(
    self,
    session: ftrack_api.Session,
    entities: List[ftrack_api.entity.base.Entity],
    event: ftrack_api.event.base.Event
) -> Optional[Union[Dict[str, Any], List[Dict[str, Any]]]]:
    """Show an interface to user before the action is processed.

    This is part of launch callback which gives option to return ftrack
    widgets items. These items are shown to user who can fill/change
    values and submit them.

    Interface must in that case handle if event contains values from user.

    Args:
        session (ftrack_api.Session): Connected ftrack api session.
        entities (List[Any]): Entities on which was action triggered.
        event (ftrack_api.Event): Event which triggered launch callback.

    Returns:
        Union[None, List[Dict[str, Any]], Dict[str, Any]]: None if nothing
            should be shown, list of items to show or dictionary with
            'items' key and possibly additional data
            (e.g. submit button label).

    """
    return None
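
A typical override returns form items on the first launch and nothing once the user has submitted values. The sketch below illustrates that round trip, together with the normalization that `_interface` applies in the class listing. It assumes submitted values arrive under `event["data"]["values"]` and uses `label` and `text` form items; the function names and the "comment" field are hypothetical.

```python
from typing import Any, Dict, List, Optional


def interface_sketch(event: Dict[str, Any]) -> Optional[List[Dict[str, Any]]]:
    """Return form items until the event carries submitted values."""
    if event["data"].get("values"):
        return None  # values submitted: let the launch logic continue
    return [
        {"type": "label", "value": "Enter a comment for the selection."},
        {"type": "text", "name": "comment", "label": "Comment", "value": ""},
    ]


def normalize_interface(interface: Any) -> Optional[Dict[str, Any]]:
    """Mirror of the checks done by '_interface' in the class listing."""
    if not interface:
        return None
    if isinstance(interface, (tuple, list)):
        return {"items": interface}
    if isinstance(interface, dict):
        if "items" in interface or (
            "success" in interface and "message" in interface
        ):
            return interface
        raise ValueError("Expected key 'items' or keys 'success' and 'message'")
    raise ValueError("Invalid interface output type {}".format(type(interface)))


first = normalize_interface(interface_sketch({"data": {}}))
second = normalize_interface(
    interface_sketch({"data": {"values": {"comment": "hi"}}})
)
```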

launch(session, entities, event)

Main part of handling event callback.

Parameters:

Name | Type | Description | Default
session | Session | Session which queried the entities. | required
entities | List[Any] | Entities pre-queried from event data. | required
event | Event | ftrack event to process. | required

Returns:

Type | Description
Optional[Union[bool, Dict[str, Any]]] | True or False for success or failure, or a more complex data structure, e.g. to show an interface to the user.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_action_handler.py
def launch(
    self,
    session: ftrack_api.Session,
    entities: List[ftrack_api.entity.base.Entity],
    event: ftrack_api.event.base.Event
) -> Optional[Union[bool, Dict[str, Any]]]:
    """Main part of handling event callback.

    Args:
        session (ftrack_api.Session): Session which queried entities.
        entities (List[Any]): Prequeried entities based on event data.
        event (ftrack_api.Event): ftrack event to process.

    Returns:
        Union[bool, Dict[str, Any]]: True or false for success or fail,
            or more complex data structure e.g. to show interface to user.

    """
    raise NotImplementedError()
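
As a sketch of the return contract described above, the hypothetical function below returns either a plain boolean or a dictionary. The success and message keys follow the usual ftrack action result convention; the function name and the entity dictionaries are illustrative assumptions and not part of this module.

```python
from typing import Any, Dict, List, Union


def example_launch(
    entities: List[Dict[str, Any]]
) -> Union[bool, Dict[str, Any]]:
    """Mimic a 'launch' result: False on failure, a dict on success."""
    if not entities:
        # Nothing was selected, so report a plain failure.
        return False

    names = ", ".join(entity["name"] for entity in entities)
    # A dictionary allows a message to be passed along with the result.
    return {"success": True, "message": "Processed: {}".format(names)}
```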

register()

Register to ftrack topics to discover and launch action.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_action_handler.py
def register(self):
    """Register to ftrack topics to discover and launch action."""
    self.session.event_hub.subscribe(
        "topic=ftrack.action.discover",
        self._discover,
        priority=self.priority
    )

    launch_subscription = (
        "topic=ftrack.action.launch and data.actionIdentifier={}"
    ).format(self.launch_identifier)
    self.session.event_hub.subscribe(launch_subscription, self._launch)
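
The launch subscription built in register is a plain string expression. The sketch below reproduces only the string formatting, without an event hub; the helper name build_launch_subscription and the identifier "ayon.example.action" are hypothetical.

```python
def build_launch_subscription(launch_identifier: str) -> str:
    """Build the launch subscription expression the way 'register' does."""
    return (
        "topic=ftrack.action.launch and data.actionIdentifier={}"
    ).format(launch_identifier)


subscription = build_launch_subscription("ayon.example.action")
print(subscription)
```

Because the identifier is part of the expression, only launch events for that one action reach the handler's launch callback, while the discover topic is shared by all actions.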

roles_check(settings_roles, user_roles, default=True) staticmethod

Compare roles from setting and user's roles.

Parameters:

Name Type Description Default
settings_roles list

List of role names from settings.

required
user_roles list

User's role names; they are lowered before comparison.

required
default bool

Value returned if 'settings_roles' is an empty list.

True

Returns:

Name Type Description
bool bool

'True' if user has at least one role from settings or default if 'settings_roles' is empty.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_action_handler.py
@staticmethod
def roles_check(
    settings_roles: List[str],
    user_roles: List[str],
    default: Optional[bool] = True
) -> bool:
    """Compare roles from setting and user's roles.

    Args:
        settings_roles(list): List of role names from settings.
        user_roles(list): User's lowered role names.
        default(bool): If 'settings_roles' is empty list.

    Returns:
        bool: 'True' if user has at least one role from settings or
            default if 'settings_roles' is empty.

    """
    if not settings_roles:
        return default

    user_roles = {
        role_name.lower()
        for role_name in user_roles
    }
    for role_name in settings_roles:
        if role_name.lower() in user_roles:
            return True
    return False
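
The comparison can be tried outside of the class. The block below is a standalone copy of the same logic written for illustration (it is not imported from ayon_ftrack), followed by three calls that cover the matching, non-matching and empty-settings cases.

```python
from typing import List


def roles_check(
    settings_roles: List[str],
    user_roles: List[str],
    default: bool = True
) -> bool:
    """Standalone copy of the comparison logic shown above."""
    if not settings_roles:
        return default
    lowered = {role_name.lower() for role_name in user_roles}
    return any(role_name.lower() in lowered for role_name in settings_roles)


print(roles_check(["Administrator"], ["administrator", "user"]))  # True
print(roles_check(["Pipeline"], ["user"]))  # False
print(roles_check([], ["user"], default=False))  # False
```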

valid_roles(session, entities, event)

Validate user roles by settings.

The method requires the settings_key attribute to be set.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_action_handler.py
def valid_roles(
    self,
    session: ftrack_api.Session,
    entities: List[ftrack_api.entity.base.Entity],
    event: ftrack_api.event.base.Event,
) -> bool:
    """Validate user roles by settings.

    The method requires the `settings_key` attribute to be set.
    """
    ftrack_settings = self.get_ftrack_settings(session, event, entities)
    settings = (
        ftrack_settings[self.settings_frack_subkey][self.settings_key]
    )
    if self.settings_enabled_key:
        if not settings.get(self.settings_enabled_key, True):
            return False

    user_role_list = self.get_user_roles_from_event(session, event)
    if not self.roles_check(settings.get("role_list"), user_role_list):
        return False
    return True
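
The shape of the settings that valid_roles reads can be sketched on a plain dictionary. The "role_list" key comes from the code above; the "enabled" key name, the function name is_action_allowed and the example role names are assumptions made for this illustration.

```python
from typing import Any, Dict, List


def is_action_allowed(
    settings: Dict[str, Any],
    user_roles: List[str],
    enabled_key: str = "enabled",
) -> bool:
    """Mirror the checks in 'valid_roles' on a plain settings dictionary."""
    # A disabled handler is never valid; a missing key counts as enabled.
    if enabled_key and not settings.get(enabled_key, True):
        return False

    settings_roles = settings.get("role_list") or []
    if not settings_roles:
        # Same result as 'roles_check' with its default of True.
        return True

    lowered = {role.lower() for role in user_roles}
    return any(role.lower() in lowered for role in settings_roles)


example_settings = {
    "enabled": True,
    "role_list": ["Administrator", "Project Manager"],
}
print(is_action_allowed(example_settings, ["project manager"]))  # True
print(is_action_allowed({"enabled": False, "role_list": []}, ["administrator"]))  # False
```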

BaseEventHandler

Bases: BaseHandler

Event handler listening to topics.

Output of callback is not handled and handler is not designed for actions.

By default it listens to "ftrack.update". To change that, override the 'register' method or change the 'subscription_topic' attribute.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_event_handler.py
class BaseEventHandler(BaseHandler):
    """Event handler listening to topics.

    Output of callback is not handled and handler is not designed for actions.

    By default it listens to "ftrack.update". To change that, override
    the 'register' method or change the 'subscription_topic' attribute.
    """
    __ignore_handler_class: bool = True

    subscription_topic: str = "ftrack.update"
    handler_type: str = "Event"

    def register(self):
        """Register to subscription topic."""
        self.session.event_hub.subscribe(
            "topic={}".format(self.subscription_topic),
            self._process,
            priority=self.priority
        )

    def process(self, event: ftrack_api.event.base.Event):
        """Callback triggered on event with matching topic.

        Args:
            event (ftrack_api.Event): ftrack event to process.

        """
        return self.launch(self.session, event)


    def launch(
        self,
        session: ftrack_api.Session,
        event: ftrack_api.event.base.Event
    ):
        """Deprecated method used for backwards compatibility.

        Override the 'process' method rather than 'launch'. The name 'launch'
        comes from the action event handler and does not make sense for
        processing that is not action based.

        Args:
            session (ftrack_api.Session): ftrack session which triggered
                the event.
            event (ftrack_api.Event): ftrack event to process.

        """
        raise NotImplementedError()

    def _process(self, event: ftrack_api.event.base.Event):
        return self._launch(event)

    def _launch(self, event: ftrack_api.event.base.Event):
        """Callback kept for backwards compatibility.

        Will be removed when default
        """
        self.session.rollback()
        self.session._local_cache.clear()

        try:
            self.process(event)

        except Exception as exc:
            self.log.error(
                "Event \"{}\" Failed: {}".format(
                    self.__class__.__name__, str(exc)
                ),
                exc_info=True
            )
            self.session.rollback()
            self.session._configure_locations()

    def _translate_event(
        self,
        event: ftrack_api.event.base.Event,
        session: Optional[ftrack_api.Session] = None
    ):
        """Receive entity objects based on event.

        Args:
            event (ftrack_api.Event): Event to process.
            session (ftrack_api.Session): Connected ftrack session.

        Returns:
            List[ftrack_api.Entity]: Queried entities based on event data.

        """
        return self._get_entities(
            event,
            session,
            ignore=["socialfeed", "socialnotification", "team"]
        )
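
The relation between process and the deprecated launch can be shown with a small stand-in. The classes below do not use ftrack_api or ayon_ftrack; SketchEventHandler only copies the fallback from process to launch, and CollectTopics and LegacyHandler are hypothetical subclasses written for this illustration.

```python
from typing import Any, Dict, List


class SketchEventHandler:
    """Stand-in for the handler above; it does not use ftrack_api."""

    subscription_topic: str = "ftrack.update"

    def __init__(self) -> None:
        self.session = object()  # placeholder for a connected session

    def process(self, event: Dict[str, Any]):
        # Default behaviour: fall back to the deprecated 'launch' method.
        return self.launch(self.session, event)

    def launch(self, session: Any, event: Dict[str, Any]):
        raise NotImplementedError()


class CollectTopics(SketchEventHandler):
    """Preferred style: override 'process' directly."""

    def __init__(self) -> None:
        super().__init__()
        self.seen: List[str] = []

    def process(self, event: Dict[str, Any]):
        self.seen.append(event["topic"])


class LegacyHandler(SketchEventHandler):
    """Older style: only 'launch' is implemented and still gets called."""

    def launch(self, session: Any, event: Dict[str, Any]):
        return "legacy:{}".format(event["topic"])


handler = CollectTopics()
handler.process({"topic": "ftrack.update", "data": {}})
print(handler.seen)  # ['ftrack.update']
print(LegacyHandler().process({"topic": "ftrack.update"}))  # legacy:ftrack.update
```

A handler that overrides neither method raises NotImplementedError when an event arrives, which in the real class is caught and logged by the internal callback.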

launch(session, event)

Deprecated method used for backwards compatibility.

Override the 'process' method rather than 'launch'. The name 'launch' comes from the action event handler and does not make sense for processing that is not action based.

Parameters:

Name Type Description Default
session Session

ftrack session which triggered the event.

required
event Event

ftrack event to process.

required
Source code in client/ayon_ftrack/common/event_handlers/ftrack_event_handler.py
def launch(
    self,
    session: ftrack_api.Session,
    event: ftrack_api.event.base.Event
):
    """Deprecated method used for backwards compatibility.

    Override the 'process' method rather than 'launch'. The name 'launch'
    comes from the action event handler and does not make sense for
    processing that is not action based.

    Args:
        session (ftrack_api.Session): ftrack session which triggered
            the event.
        event (ftrack_api.Event): ftrack event to process.

    """
    raise NotImplementedError()

process(event)

Callback triggered on event with matching topic.

Parameters:

Name Type Description Default
event Event

ftrack event to process.

required
Source code in client/ayon_ftrack/common/event_handlers/ftrack_event_handler.py
def process(self, event: ftrack_api.event.base.Event):
    """Callback triggered on event with matching topic.

    Args:
        event (ftrack_api.Event): ftrack event to process.

    """
    return self.launch(self.session, event)

register()

Register to subscription topic.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_event_handler.py
def register(self):
    """Register to subscription topic."""
    self.session.event_hub.subscribe(
        "topic={}".format(self.subscription_topic),
        self._process,
        priority=self.priority
    )

BaseHandler

Base class for handling ftrack events.

Attributes:

Name Type Description
enabled bool

Is handler enabled.

priority int

Priority of handler processing. The lower the value, the earlier the handler is processed.

handler_type str

Has only debugging purposes.

Parameters:

Name Type Description Default
session Session

Connected ftrack session.

required
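
The effect of the enabled and priority attributes can be illustrated with a small stand-in. In the real module the ordering is performed by the ftrack event hub when callbacks are subscribed with a priority, and disabled handlers are simply not registered; the SketchHandler class, the processing_order helper and the handler names below are hypothetical.

```python
from typing import List


class SketchHandler:
    """Stand-in carrying only the attributes described above."""

    def __init__(
        self, name: str, priority: int = 100, enabled: bool = True
    ) -> None:
        self.name = name
        self.priority = priority
        self.enabled = enabled


def processing_order(handlers: List[SketchHandler]) -> List[str]:
    """Return names of enabled handlers, lowest priority value first."""
    active = [handler for handler in handlers if handler.enabled]
    return [
        handler.name
        for handler in sorted(active, key=lambda item: item.priority)
    ]


order = processing_order([
    SketchHandler("sync", priority=100),
    SketchHandler("validate", priority=10),
    SketchHandler("legacy", priority=50, enabled=False),
])
print(order)  # ['validate', 'sync']
```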
Source code in client/ayon_ftrack/common/event_handlers/ftrack_base_handler.py
class BaseHandler(metaclass=ABCMeta):
    """Base class for handling ftrack events.

    Attributes:
        enabled (bool): Is handler enabled.
        priority (int): Priority of handler processing. The lower the value,
            the earlier the handler is processed.
        handler_type (str): Has only debugging purposes.

    Args:
        session (ftrack_api.Session): Connected ftrack session.

    """
    _log: Optional[logging.Logger] = None
    _process_id: Optional[str] = None
    # Default priority is 100
    enabled: bool = True
    priority: int = 100
    handler_type: str = "Base"
    _handler_label: Optional[str] = None
    # Mark base classes to be ignored for discovery
    __ignore_handler_class: bool = True

    def __init__(self, session):
        if not isinstance(session, ftrack_api.session.Session):
            raise TypeError(
                "Expected 'ftrack_api.Session' object got '{}'".format(
                    str(type(session))))

        self._session = session

        self.register = self.register_wrapper(self.register)

    @classmethod
    def ignore_handler_class(cls) -> bool:
        """Check if handler class should be ignored.

        Do not touch implementation of this method, set
            '__ignore_handler_class' to 'True' if you want to ignore class.

        """
        cls_name = cls.__name__
        if not cls_name.startswith("_"):
            cls_name = f"_{cls_name}"
        return getattr(cls, f"{cls_name}__ignore_handler_class", False)

    @staticmethod
    def join_filter_values(values: Iterable[str]) -> str:
        return ",".join({'"{}"'.format(value) for value in values})

    @classmethod
    def join_query_keys(cls, keys: Iterable[str]) -> str:
        return cls.join_filter_values(keys)

    @property
    def log(self) -> logging.Logger:
        """Quick access to logger.

        Returns:
            logging.Logger: Logger that can be used for logging of handler.

        """
        if self._log is None:
            # TODO better logging mechanism
            self._log = logging.getLogger(self.__class__.__name__)
            self._log.setLevel(logging.DEBUG)
        return self._log

    @property
    def handler_label(self) -> str:
        if self._handler_label is None:
            self._handler_label = self.__class__.__name__
        return self._handler_label

    @property
    def session(self) -> ftrack_api.Session:
        """Fast access to session.

        Returns:
            session (ftrack_api.Session): Session which is source of events.

        """
        return self._session

    def reset_session(self):
        """Reset session cache."""
        self.session.reset()

    @staticmethod
    def process_identifier() -> str:
        """Helper property to have unified access to process id.

        Todos:
            Use some global approach rather than implementation on
                'BaseEntity'.

        """
        if not BaseHandler._process_id:
            BaseHandler._process_id = str(uuid.uuid4())
        return BaseHandler._process_id

    @abstractmethod
    def register(self):
        """Subscribe to event topics."""
        pass

    def cleanup(self):
        """Cleanup handler.

        This method should end threads, timers, close connections, etc.
        """
        pass

    def register_wrapper(self, func):
        @functools.wraps(func)
        def wrapper_register(*args, **kwargs):
            if not self.enabled:
                return

            try:
                start_time = time.perf_counter()
                func(*args, **kwargs)
                end_time = time.perf_counter()
                run_time = end_time - start_time
                self.log.info((
                    "{} \"{}\" - Registered successfully ({:.4f}sec)"
                ).format(self.handler_type, self.handler_label, run_time))

            except NotImplementedError:
                self.log.error((
                    "{} \"{}\" - Register method is not implemented"
                ).format(self.handler_type, self.handler_label))

            except Exception as exc:
                self.log.error("{} \"{}\" - Registration failed ({})".format(
                    self.handler_type, self.handler_label, str(exc)
                ))
        return wrapper_register

    def _get_entities(self, event, session=None, ignore=None):
        entities = []
        selection = event["data"].get("selection")
        if not selection:
            return entities

        if ignore is None:
            ignore = []
        elif isinstance(ignore, str):
            ignore = [ignore]

        filtered_selection = []
        for entity in selection:
            if entity["entityType"] not in ignore:
                filtered_selection.append(entity)

        if not filtered_selection:
            return entities

        if session is None:
            session = self.session
            session._local_cache.clear()

        for entity in filtered_selection:
            entities.append(session.get(
                self._get_entity_type(entity, session),
                entity.get("entityId")
            ))

        return entities

    def _get_entity_type(self, entity, session=None):
        """Translate entity type so it can be used with API.

        Todos:
            Use object id rather.

        """
        # Get entity type and make sure it is lower cased. Most places except
        # the component tab in the Sidebar will use lower case notation.
        entity_type = entity.get("entityType").replace("_", "").lower()

        if session is None:
            session = self.session

        for schema in session.schemas:
            alias_for = schema.get("alias_for")

            if (
                alias_for
                and isinstance(alias_for, str)
                and alias_for.lower() == entity_type
            ):
                return schema["id"]

        for schema in session.schemas:
            if schema["id"].lower() == entity_type:
                return schema["id"]

        raise ValueError(
            "Unable to translate entity type: {0}.".format(entity_type)
        )

    def show_message(
        self,
        event: ftrack_api.event.base.Event,
        message: str,
        success: Optional[bool] = False,
    ):
        """Shows message to user who triggered event.

        Args:
            event (ftrack_api.event.base.Event): Event used for source
                of user id.
            message (str): Message that will be shown to user.
            success (bool): Define type (color) of message. False -> red color.

        """
        if not isinstance(success, bool):
            success = False

        try:
            message = str(message)
        except Exception:
            return

        user_id = event["source"]["user"]["id"]
        target = (
            "applicationId=ftrack.client.web and user.id=\"{}\""
        ).format(user_id)
        self.session.event_hub.publish(
            ftrack_api.event.base.Event(
                topic="ftrack.action.trigger-user-interface",
                data={
                    "type": "message",
                    "success": success,
                    "message": message
                },
                target=target
            ),
            on_error="ignore"
        )

    def show_interface(
        self,
        items: List[Dict[str, Any]],
        title: Optional[str] = "",
        user_id: Optional[str] = None,
        user: Optional[Any] = None,
        event: Optional[ftrack_api.event.base.Event] = None,
        username: Optional[str] = None,
        submit_btn_label: Optional[str] = None,
    ):
        """Shows ftrack widgets interface to user.

        Interface is shown to a user. To identify user one of arguments must be
        passed: 'user_id', 'user', 'event', 'username'.

        Args:
            items (List[Dict[str, Any]]): Interface items (their structure is
                defined by ftrack documentation).
            title (str): Title of shown widget.
            user_id (str): User id.
            user (Any): Object of ftrack user (queried using ftrack api
                session).
            event (ftrack_api.Event): Event which can be used as source for
                user id.
            username (str): Username used to look up the user id. This is
                the slowest way to get the user id.
            submit_btn_label (str): Label of submit button in ftrack widget.

        """
        if user_id:
            pass

        elif user:
            user_id = user["id"]

        elif username:
            user = self.session.query(
                "User where username is \"{}\"".format(username)
            ).first()
            if not user:
                raise ValueError((
                    "ftrack user with username \"{}\" was not found!"
                ).format(username))

            user_id = user["id"]

        elif event:
            user_id = event["source"]["user"]["id"]

        if not user_id:
            return

        target = (
            "applicationId=ftrack.client.web and user.id=\"{}\""
        ).format(user_id)

        event_data = {
            "type": "widget",
            "items": items,
            "title": title
        }
        if submit_btn_label:
            event_data["submit_button_label"] = submit_btn_label

        self.session.event_hub.publish(
            ftrack_api.event.base.Event(
                topic="ftrack.action.trigger-user-interface",
                data=event_data,
                target=target
            ),
            on_error="ignore"
        )

    def show_interface_from_dict(
        self,
        messages: Dict[str, Union[str, List[str]]],
        title: Optional[str] = "",
        user_id: Optional[str] = None,
        user: Optional[Any] = None,
        event: Optional[ftrack_api.event.base.Event] = None,
        username: Optional[str] = None,
        submit_btn_label: Optional[str] = None,
    ):
        # TODO Find out how and where is this used
        if not messages:
            self.log.debug("No messages to show! (messages dict is empty)")
            return
        items = []
        splitter = {"type": "label", "value": "---"}
        first = True
        for key, value in messages.items():
            if not first:
                items.append(splitter)
            first = False

            items.append({"type": "label", "value": "<h3>{}</h3>".format(key)})
            if isinstance(value, str):
                value = [value]

            for item in value:
                items.append({"type": "label", "value": f"<p>{item}</p>"})

        self.show_interface(
            items,
            title=title,
            user_id=user_id,
            user=user,
            event=event,
            username=username,
            submit_btn_label=submit_btn_label
        )

    def trigger_action(
        self,
        action_identifier: str,
        event: Optional[ftrack_api.event.base.Event] = None,
        session: Optional[ftrack_api.Session] = None,
        selection: Optional[List[Dict[str, str]]] = None,
        user_data: Optional[Dict[str, Any]] = None,
        topic: Optional[str] = "ftrack.action.launch",
        additional_event_data: Optional[Dict[str, Any]] = None,
        on_error: Optional[str] = "ignore"
    ):
        self.log.debug(
            "Triggering action \"{}\" Begins".format(action_identifier))

        if not session:
            session = self.session

        # Getting selection and user data
        if event:
            if selection is None:
                selection = event.get("data", {}).get("selection")
            if user_data is None:
                user_data = event.get("source", {}).get("user")

        # Without selection and user data skip triggering
        msg = "Can't trigger \"{}\" action without {}."
        if selection is None:
            self.log.error(msg.format(action_identifier, "selection"))
            return

        if user_data is None:
            self.log.error(msg.format(action_identifier, "user data"))
            return

        event_data = {
            "actionIdentifier": action_identifier,
            "selection": selection
        }

        # Add additional data
        if additional_event_data:
            event_data.update(additional_event_data)

        # Create and trigger event
        session.event_hub.publish(
            ftrack_api.event.base.Event(
                topic=topic,
                data=event_data,
                source={"user": user_data}
            ),
            on_error=on_error
        )
        self.log.debug(
            "Action \"{}\" triggered".format(action_identifier))

    def trigger_event(
        self,
        topic: str,
        event_data: Optional[Dict[str, Any]] = None,
        session: Optional[ftrack_api.Session] = None,
        source: Optional[Dict[str, Any]] = None,
        event: Optional[ftrack_api.event.base.Event] = None,
        on_error: Optional[str] = "ignore"
    ):
        if session is None:
            session = self.session

        if not source and event:
            source = event.get("source")

        if event_data is None:
            event_data = {}
        # Create and trigger event
        event = ftrack_api.event.base.Event(
            topic=topic,
            data=event_data,
            source=source
        )
        session.event_hub.publish(event, on_error=on_error)

        self.log.debug((
            "Publishing event: {}"
        ).format(str(event.__dict__)))

    def get_project_from_entity(
        self,
        entity: ftrack_api.entity.base.Entity,
        session: Optional[ftrack_api.Session] = None
    ):
        low_entity_type = entity.entity_type.lower()
        if low_entity_type == "project":
            return entity

        if "project" in entity:
            # reviewsession, task(Task, Shot, Sequence,...)
            return entity["project"]

        if low_entity_type == "filecomponent":
            entity = entity["version"]
            low_entity_type = entity.entity_type.lower()

        if low_entity_type == "assetversion":
            asset = entity["asset"]
            parent = None
            if asset:
                parent = asset["parent"]

            if parent:
                if parent.entity_type.lower() == "project":
                    return parent

                if "project" in parent:
                    return parent["project"]

        project_data = entity["link"][0]

        if session is None:
            session = self.session
        return session.query(
            "Project where id is {}".format(project_data["id"])
        ).one()

    def get_project_entity_from_event(
        self,
        session: ftrack_api.Session,
        event: ftrack_api.event.base.Event,
        project_id: str,
    ):
        """Load or query and fill project entity from/to event data.

        Project data are stored by ftrack id because in most cases it is
        easier to access project id than project name.

        Args:
            session (ftrack_api.Session): Current session.
            event (ftrack_api.Event): Processed event by session.
            project_id (str): ftrack project id.

        Returns:
            Union[ftrack_api.entity.base.Entity, None]: Project entity, or
                None if the project was not found.

        """
        if not project_id:
            raise ValueError(
                "Entered `project_id` is not valid. {} ({})".format(
                    str(project_id), str(type(project_id))
                )
            )

        project_id_mapping = event["data"].setdefault(
            "project_entity_by_id", {}
        )
        if project_id in project_id_mapping:
            return project_id_mapping[project_id]

        # Get project entity from task and store to event
        project_entity = session.query((
            "select full_name from Project where id is \"{}\""
        ).format(project_id)).first()
        project_id_mapping[project_id] = project_entity

        return project_entity

    def get_project_name_from_event(
        self,
        session: ftrack_api.Session,
        event: ftrack_api.event.base.Event,
        project_id: str,
    ):
        """Load or query and fill project entity from/to event data.

        Project data are stored by ftrack id because in most cases it is
        easier to access project id than project name.

        Args:
            session (ftrack_api.Session): Current session.
            event (ftrack_api.Event): Processed event by session.
            project_id (str): ftrack project id.

        Returns:
            Union[str, None]: Project name based on entities or None if project
                cannot be defined.

        """
        if not project_id:
            raise ValueError(
                "Entered `project_id` is not valid. {} ({})".format(
                    str(project_id), str(type(project_id))
                )
            )

        project_id_mapping = event["data"].setdefault("project_id_name", {})
        if project_id in project_id_mapping:
            return project_id_mapping[project_id]

        # Get project entity from task and store to event
        project_entity = self.get_project_entity_from_event(
            session, event, project_id
        )
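        # Default to None so a missing project does not raise
        #   UnboundLocalError below.
        project_name = None
        if project_entity:
            project_name = project_entity["full_name"]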
        project_id_mapping[project_id] = project_name
        return project_name

    def get_ayon_project_from_event(
        self,
        event: ftrack_api.event.base.Event,
        project_name: str
    ):
        """Get AYON project from event.

        Args:
            event (ftrack_api.Event): Event which is source of project id.
            project_name (Union[str, None]): Project name.

        Returns:
            Union[dict[str, Any], None]: AYON project.

        """
        ayon_projects = event["data"].setdefault("ayon_projects", {})
        if project_name in ayon_projects:
            return ayon_projects[project_name]

        project = None
        if project_name:
            project = get_project(project_name)
        ayon_projects[project_name] = project
        return project

    def get_project_settings_from_event(
        self,
        event: ftrack_api.event.base.Event,
        project_name: str
    ):
        """Load or fill AYON's project settings from event data.

        Project settings are cached in the event data by project name and
        a deep copy is returned on each call.

        Args:
            event (ftrack_api.Event): Processed event by session.
            project_name (str): Project name.

        """
        project_settings_by_name = event["data"].setdefault(
            "project_settings", {}
        )
        if project_name in project_settings_by_name:
            return copy.deepcopy(project_settings_by_name[project_name])

        # NOTE there is no safe way how to get project settings if project
        #   does not exist on AYON server.
        # TODO Should we somehow find out if ftrack is enabled for the
        #   project?
        # TODO how to find out which bundle should be used?
        project = self.get_ayon_project_from_event(event, project_name)
        if not project:
            project_name = None
        project_settings = get_addons_settings(project_name=project_name)
        project_settings_by_name[project_name] = project_settings
        return copy.deepcopy(project_settings)

    @staticmethod
    def get_entity_path(entity: ftrack_api.entity.base.Entity) -> str:
        """Return full hierarchical path to entity."""
        return "/".join(
            [ent["name"] for ent in entity["link"]]
        )

    @classmethod
    def add_traceback_to_job(
        cls,
        job: ftrack_api.entity.job.Job,
        session: ftrack_api.Session,
        exc_info: Tuple,
        description: Optional[str] = None,
        component_name: Optional[str] = None,
        job_status: Optional[str] = None
    ):
        """Add traceback file to a job.

        Args:
            job (JobEntity): Entity of job where file should be able to
                download (Created or queried with passed session).
            session (Session): ftrack session which was used to query/create
                entered job.
            exc_info (tuple): Exception info (e.g. from `sys.exc_info()`).
            description (str): Change job description to describe what
                happened. Job description won't change if not passed.
            component_name (str): Name of component and default name of
                downloaded file. Class name and current date time are used if
                not specified.
            job_status (str): Status of job which will be set. By default is
                set to 'failed'.

        """
        if description:
            job_data = {
                "description": description
            }
            job["data"] = json.dumps(job_data)

        if not job_status:
            job_status = "failed"

        job["status"] = job_status

        # Create temp file where traceback will be stored
        with tempfile.NamedTemporaryFile(
            mode="w", prefix="ayon_ftrack_", suffix=".txt", delete=False
        ) as temp_obj:
            temp_filepath = temp_obj.name

        # Store traceback to file
        result = traceback.format_exception(*exc_info)
        with open(temp_filepath, "w") as temp_file:
            temp_file.write("".join(result))

        # Upload file with traceback to ftrack server and add it to job
        if not component_name:
            component_name = "{}_{}".format(
                cls.__name__,
                datetime.datetime.now().strftime("%y-%m-%d-%H%M")
            )
        cls.add_file_component_to_job(
            job, session, temp_filepath, component_name
        )
        # Delete temp file
        os.remove(temp_filepath)

    @staticmethod
    def add_file_component_to_job(
        job: ftrack_api.entity.job.Job,
        session: ftrack_api.Session,
        filepath: str,
        basename: Optional[str] = None
    ):
        """Add filepath as downloadable component to job.

        Args:
            job (JobEntity): Entity of job where file should be able to
                download (Created or queried with passed session).
            session (Session): ftrack session which was used to query/create
                entered job.
            filepath (str): Path to file which should be added to job.
            basename (str): Defines name of file which will be downloaded on
                user's side. Must be without extension otherwise extension will
                be duplicated in downloaded name. Basename from entered path
                used when not entered.

        """
        # Make sure session's locations are configured
        # - they can be deconfigured e.g. using `rollback` method
        session._configure_locations()

        # Query `ftrack.server` location where component will be stored
        location = session.query(
            "Location where name is \"ftrack.server\""
        ).one()

        # Use filename as basename if not entered (must be without extension)
        if basename is None:
            basename = os.path.splitext(
                os.path.basename(filepath)
            )[0]

        component = session.create_component(
            filepath,
            data={"name": basename},
            location=location
        )
        session.create(
            "JobComponent",
            {
                "component_id": component["id"],
                "job_id": job["id"]
            }
        )
        session.commit()

log property

Quick access to logger.

Returns:

Type Description
Logger

logging.Logger: Logger that can be used for logging of handler.

session property

Fast access to session.

Returns:

Name Type Description
session Session

Session which is source of events.

add_file_component_to_job(job, session, filepath, basename=None) staticmethod

Add filepath as downloadable component to job.

Parameters:

Name Type Description Default
job JobEntity

Job entity from which the file should be downloadable (created or queried with the passed session).

required
session Session

ftrack session which was used to query/create entered job.

required
filepath str

Path to file which should be added to job.

required
basename str

Name of the file as it will be downloaded on the user's side. Must be without extension, otherwise the extension is duplicated in the downloaded name. The basename of the given path is used when not passed.

None
Source code in client/ayon_ftrack/common/event_handlers/ftrack_base_handler.py
@staticmethod
def add_file_component_to_job(
    job: ftrack_api.entity.job.Job,
    session: ftrack_api.Session,
    filepath: str,
    basename: Optional[str] = None
):
    """Add filepath as downloadable component to job.

    Args:
        job (JobEntity): Entity of job where file should be able to
            download (Created or queried with passed session).
        session (Session): ftrack session which was used to query/create
            entered job.
        filepath (str): Path to file which should be added to job.
        basename (str): Defines name of file which will be downloaded on
            user's side. Must be without extension otherwise extension will
            be duplicated in downloaded name. Basename from entered path
            used when not entered.

    """
    # Make sure session's locations are configured
    # - they can be deconfigured e.g. using `rollback` method
    session._configure_locations()

    # Query `ftrack.server` location where component will be stored
    location = session.query(
        "Location where name is \"ftrack.server\""
    ).one()

    # Use filename as basename if not entered (must be without extension)
    if basename is None:
        basename = os.path.splitext(
            os.path.basename(filepath)
        )[0]

    component = session.create_component(
        filepath,
        data={"name": basename},
        location=location
    )
    session.create(
        "JobComponent",
        {
            "component_id": component["id"],
            "job_id": job["id"]
        }
    )
    session.commit()
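
The default `basename` handling above can be sketched in isolation. The helper name `default_basename` is hypothetical and exists only for illustration. It mirrors the fallback used when `basename` is `None`, where the extension is stripped because, per the docstring, a name that still carries it would duplicate the extension in the downloaded file name.

```python
import os


def default_basename(filepath: str) -> str:
    """Return file name of 'filepath' without directory and extension.

    Hypothetical helper mirroring the fallback used when 'basename' is None.
    """
    return os.path.splitext(os.path.basename(filepath))[0]


print(default_basename("/tmp/ayon_ftrack_report.txt"))  # ayon_ftrack_report
# Only the last extension is removed
print(default_basename("/tmp/archive.tar.gz"))  # archive.tar
```

Note that only the last suffix is removed, so multi-part extensions such as `.tar.gz` keep their first part in the component name.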

add_traceback_to_job(job, session, exc_info, description=None, component_name=None, job_status=None) classmethod

Add traceback file to a job.

Parameters:

Name Type Description Default
job JobEntity

Job entity from which the file should be downloadable (created or queried with the passed session).

required
session Session

ftrack session which was used to query/create entered job.

required
exc_info tuple

Exception info (e.g. from sys.exc_info()).

required
description str

Change job description to describe what happened. Job description won't change if not passed.

None
component_name str

Name of component and default name of downloaded file. Class name and current date time are used if not specified.

None
job_status str

Status of job which will be set. By default is set to 'failed'.

None
Source code in client/ayon_ftrack/common/event_handlers/ftrack_base_handler.py
@classmethod
def add_traceback_to_job(
    cls,
    job: ftrack_api.entity.job.Job,
    session: ftrack_api.Session,
    exc_info: Tuple,
    description: Optional[str] = None,
    component_name: Optional[str] = None,
    job_status: Optional[str] = None
):
    """Add traceback file to a job.

    Args:
        job (JobEntity): Entity of job where file should be able to
            download (Created or queried with passed session).
        session (Session): ftrack session which was used to query/create
            entered job.
        exc_info (tuple): Exception info (e.g. from `sys.exc_info()`).
        description (str): Change job description to describe what
            happened. Job description won't change if not passed.
        component_name (str): Name of component and default name of
            downloaded file. Class name and current date time are used if
            not specified.
        job_status (str): Status of job which will be set. By default is
            set to 'failed'.

    """
    if description:
        job_data = {
            "description": description
        }
        job["data"] = json.dumps(job_data)

    if not job_status:
        job_status = "failed"

    job["status"] = job_status

    # Create temp file where traceback will be stored
    with tempfile.NamedTemporaryFile(
        mode="w", prefix="ayon_ftrack_", suffix=".txt", delete=False
    ) as temp_obj:
        temp_filepath = temp_obj.name

    # Store traceback to file
    result = traceback.format_exception(*exc_info)
    with open(temp_filepath, "w") as temp_file:
        temp_file.write("".join(result))

    # Upload file with traceback to ftrack server and add it to job
    if not component_name:
        component_name = "{}_{}".format(
            cls.__name__,
            datetime.datetime.now().strftime("%y-%m-%d-%H%M")
        )
    cls.add_file_component_to_job(
        job, session, temp_filepath, component_name
    )
    # Delete temp file
    os.remove(temp_filepath)
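
A minimal sketch of the two pure-Python steps in the method above: turning an `exc_info` tuple into text and building the fallback component name. The helper names `format_traceback_text` and `default_component_name` and the class name `MyAction` are assumptions made for this example. No ftrack session is involved.

```python
import datetime
import sys
import traceback


def format_traceback_text(exc_info) -> str:
    """Join formatted traceback lines into one string."""
    return "".join(traceback.format_exception(*exc_info))


def default_component_name(cls_name: str, now: datetime.datetime) -> str:
    """Build '<ClassName>_<yy-mm-dd-HHMM>' like the fallback name."""
    return "{}_{}".format(cls_name, now.strftime("%y-%m-%d-%H%M"))


try:
    1 / 0
except ZeroDivisionError:
    # 'sys.exc_info()' must be called while the exception is being handled
    exc_info = sys.exc_info()

text = format_traceback_text(exc_info)
name = default_component_name(
    "MyAction", datetime.datetime(2024, 1, 31, 9, 30)
)
print(name)  # MyAction_24-01-31-0930
```

Inside a handler, the same tuple would typically be passed on as `self.add_traceback_to_job(job, session, sys.exc_info(), "Action failed")`, following the signature documented above.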

cleanup()

Cleanup handler.

This method should end threads, timers, close connections, etc.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_base_handler.py
def cleanup(self):
    """Cleanup handler.

    This method should end threads, timers, close connections, etc.
    """
    pass

get_ayon_project_from_event(event, project_name)

Get AYON project from event.

Parameters:

Name Type Description Default
event Event

Event which is source of project id.

required
project_name Union[str, None]

Project name.

required

Returns:

Type Description

Union[dict[str, Any], None]: AYON project.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_base_handler.py
def get_ayon_project_from_event(
    self,
    event: ftrack_api.event.base.Event,
    project_name: str
):
    """Get AYON project from event.

    Args:
        event (ftrack_api.Event): Event which is source of project id.
        project_name (Union[str, None]): Project name.

    Returns:
        Union[dict[str, Any], None]: AYON project.

    """
    ayon_projects = event["data"].setdefault("ayon_projects", {})
    if project_name in ayon_projects:
        return ayon_projects[project_name]

    project = None
    if project_name:
        project = get_project(project_name)
    ayon_projects[project_name] = project
    return project
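
The per-event caching used above can be sketched with a plain dictionary in place of `event["data"]`. The names `get_cached_project` and `fake_fetch` are hypothetical, and `fake_fetch` stands in for the server call. The point of the sketch is that `None` results are cached as well, so a missing project is looked up only once per event.

```python
from typing import Any, Callable, Dict, List, Optional


def get_cached_project(
    event_data: Dict[str, Any],
    project_name: Optional[str],
    fetch: Callable[[str], Optional[dict]],
) -> Optional[dict]:
    """Return project from per-event cache, fetching it at most once."""
    ayon_projects = event_data.setdefault("ayon_projects", {})
    if project_name in ayon_projects:
        return ayon_projects[project_name]

    project = fetch(project_name) if project_name else None
    # 'None' is cached too, so a missing project is not fetched again
    ayon_projects[project_name] = project
    return project


calls: List[str] = []


def fake_fetch(name: str) -> Optional[dict]:
    """Stand-in for a server call; records each lookup."""
    calls.append(name)
    return {"name": name} if name == "demo" else None


event_data: Dict[str, Any] = {}
get_cached_project(event_data, "demo", fake_fetch)
get_cached_project(event_data, "demo", fake_fetch)
get_cached_project(event_data, "missing", fake_fetch)
get_cached_project(event_data, "missing", fake_fetch)
print(calls)  # ['demo', 'missing']
```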

get_entity_path(entity) staticmethod

Return full hierarchical path to entity.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_base_handler.py
@staticmethod
def get_entity_path(entity: ftrack_api.entity.base.Entity) -> str:
    """Return full hierarchical path to entity."""
    return "/".join(
        [ent["name"] for ent in entity["link"]]
    )
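
To illustrate the result, here is a sketch that runs the same join on a plain dictionary. The layout of the `link` items (a list of dictionaries with a `name` key, ordered from project down to the entity) is an assumption about how ftrack entities expose their hierarchy, and the names used are made up.

```python
def get_entity_path(entity) -> str:
    """Join names of all items in the entity's 'link' with '/'."""
    return "/".join([ent["name"] for ent in entity["link"]])


# Plain dict standing in for an ftrack entity (assumed 'link' layout)
shot = {
    "name": "sh010",
    "link": [
        {"type": "Project", "name": "demo_project"},
        {"type": "TypedContext", "name": "seq01"},
        {"type": "TypedContext", "name": "sh010"},
    ],
}
print(get_entity_path(shot))  # demo_project/seq01/sh010
```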

get_project_entity_from_event(session, event, project_id)

Load or query and fill project entity from/to event data.

Project data are stored by ftrack id because in most cases it is easier to access project id than project name.

Parameters:

Name Type Description Default
session Session

Current session.

required
event Event

Processed event by session.

required
project_id str

ftrack project id.

required

Returns:

Type Description

Union[ftrack_api.entity.base.Entity, None]: Project entity or None if the project was not found.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_base_handler.py
def get_project_entity_from_event(
    self,
    session: ftrack_api.Session,
    event: ftrack_api.event.base.Event,
    project_id: str,
):
    """Load or query and fill project entity from/to event data.

    Project data are stored by ftrack id because in most cases it is
    easier to access project id than project name.

    Args:
        session (ftrack_api.Session): Current session.
        event (ftrack_api.Event): Processed event by session.
        project_id (str): ftrack project id.

    Returns:
        Union[ftrack_api.entity.base.Entity, None]: Project entity or None
            if the project was not found.

    """
    if not project_id:
        raise ValueError(
            "Entered `project_id` is not valid. {} ({})".format(
                str(project_id), str(type(project_id))
            )
        )

    project_id_mapping = event["data"].setdefault(
        "project_entity_by_id", {}
    )
    if project_id in project_id_mapping:
        return project_id_mapping[project_id]

    # Get project entity from task and store to event
    project_entity = session.query((
        "select full_name from Project where id is \"{}\""
    ).format(project_id)).first()
    project_id_mapping[project_id] = project_entity

    return project_entity

get_project_name_from_event(session, event, project_id)

Load or query project name and store it to event data.

Project data are stored by ftrack id because in most cases it is easier to access project id than project name.

Parameters:

Name Type Description Default
session Session

Current session.

required
event Event

Processed event by session.

required
project_id str

ftrack project id.

required

Returns:

Type Description

Union[str, None]: Project name based on entities or None if project cannot be defined.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_base_handler.py
def get_project_name_from_event(
    self,
    session: ftrack_api.Session,
    event: ftrack_api.event.base.Event,
    project_id: str,
):
    """Load or query project name and store it to event data.

    Project data are stored by ftrack id because in most cases it is
    easier to access project id than project name.

    Args:
        session (ftrack_api.Session): Current session.
        event (ftrack_api.Event): Processed event by session.
        project_id (str): ftrack project id.

    Returns:
        Union[str, None]: Project name based on entities or None if project
            cannot be defined.

    """
    if not project_id:
        raise ValueError(
            "Entered `project_id` is not valid. {} ({})".format(
                str(project_id), str(type(project_id))
            )
        )

    project_id_mapping = event["data"].setdefault("project_id_name", {})
    if project_id in project_id_mapping:
        return project_id_mapping[project_id]

    # Get project entity and store its name to event
    project_entity = self.get_project_entity_from_event(
        session, event, project_id
    )
    # Default to None so a missing project does not raise UnboundLocalError
    project_name = None
    if project_entity:
        project_name = project_entity["full_name"]
    project_id_mapping[project_id] = project_name
    return project_name

get_project_settings_from_event(event, project_name)

Load or fill AYON's project settings from event data.

Settings are cached in event data by project name. A deep copy is returned so callers can modify the result safely.

Parameters:

Name Type Description Default
event Event

Processed event by session.

required
project_name str

Project name.

required
Source code in client/ayon_ftrack/common/event_handlers/ftrack_base_handler.py
def get_project_settings_from_event(
    self,
    event: ftrack_api.event.base.Event,
    project_name: str
):
    """Load or fill AYON's project settings from event data.

    Settings are cached in event data by project name. A deep copy is
    returned so callers can modify the result safely.

    Args:
        event (ftrack_api.Event): Processed event by session.
        project_name (str): Project name.

    """
    project_settings_by_name = event["data"].setdefault(
        "project_settings", {}
    )
    if project_name in project_settings_by_name:
        return copy.deepcopy(project_settings_by_name[project_name])

    # NOTE there is no safe way how to get project settings if project
    #   does not exist on AYON server.
    # TODO Should we somehow find out if ftrack is enabled for the
    #   project?
    # TODO how to find out which bundle should be used?
    project = self.get_ayon_project_from_event(event, project_name)
    if not project:
        project_name = None
    project_settings = get_addons_settings(project_name=project_name)
    project_settings_by_name[project_name] = project_settings
    return copy.deepcopy(project_settings)
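
The reason for returning `copy.deepcopy(...)` can be shown with a small sketch. The names `get_cached_settings` and `fake_load` are hypothetical, and `fake_load` stands in for the settings request. Because every caller receives its own copy, changing the returned dictionary does not affect the settings cached on the event.

```python
import copy
from typing import Any, Callable, Dict, Optional


def get_cached_settings(
    event_data: Dict[str, Any],
    project_name: Optional[str],
    load_settings: Callable[[Optional[str]], dict],
) -> dict:
    """Return a private copy of settings cached in event data."""
    cache = event_data.setdefault("project_settings", {})
    if project_name in cache:
        return copy.deepcopy(cache[project_name])

    settings = load_settings(project_name)
    cache[project_name] = settings
    return copy.deepcopy(settings)


def fake_load(project_name: Optional[str]) -> dict:
    """Stand-in for the settings request (made-up content)."""
    return {"ftrack": {"enabled": True}}


event_data: Dict[str, Any] = {}
first = get_cached_settings(event_data, "demo", fake_load)
first["ftrack"]["enabled"] = False  # local change only
second = get_cached_settings(event_data, "demo", fake_load)
print(second["ftrack"]["enabled"])  # True
```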

ignore_handler_class() classmethod

Check if handler class should be ignored.

Do not override this method. Set '__ignore_handler_class' to 'True' on a class if you want it to be ignored.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_base_handler.py
@classmethod
def ignore_handler_class(cls) -> bool:
    """Check if handler class should be ignored.

    Do not override this method. Set '__ignore_handler_class' to 'True'
    on a class if you want it to be ignored.

    """
    cls_name = cls.__name__
    if not cls_name.startswith("_"):
        cls_name = f"_{cls_name}"
    return getattr(cls, f"{cls_name}__ignore_handler_class", False)
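
The check above relies on Python's name mangling of double-underscore class attributes. The sketch below uses a reduced stand-in for `BaseHandler` and two hypothetical classes. It shows that the flag applies only to the class that defines it and is not picked up by subclasses.

```python
class BaseHandler:
    """Reduced stand-in holding only the ignore check."""

    @classmethod
    def ignore_handler_class(cls) -> bool:
        cls_name = cls.__name__
        if not cls_name.startswith("_"):
            cls_name = f"_{cls_name}"
        return getattr(cls, f"{cls_name}__ignore_handler_class", False)


class StudioBaseAction(BaseHandler):
    # Stored by Python as '_StudioBaseAction__ignore_handler_class'
    __ignore_handler_class = True


class PublishAction(StudioBaseAction):
    # Looked up as '_PublishAction__ignore_handler_class', which is not set
    pass


print(StudioBaseAction.ignore_handler_class())  # True
print(PublishAction.ignore_handler_class())  # False
```

This is why a shared base class can be marked as ignored while the handlers that inherit from it are still discovered.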

process_identifier() staticmethod

Helper property to have unified access to process id.

Todos

Use some global approach rather than implementation on 'BaseEntity'.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_base_handler.py
@staticmethod
def process_identifier() -> str:
    """Helper property to have unified access to process id.

    Todos:
        Use some global approach rather than implementation on
            'BaseEntity'.

    """
    if not BaseHandler._process_id:
        BaseHandler._process_id = str(uuid.uuid4())
    return BaseHandler._process_id
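
As a sketch of the lazy initialization used above, the hypothetical `ProcessId` class below creates one UUID on first access and returns the same value on every later call within the same Python process.

```python
import uuid
from typing import Optional


class ProcessId:
    """Hypothetical holder of a lazily created per-process identifier."""

    _process_id: Optional[str] = None

    @staticmethod
    def get() -> str:
        # Create the id on first access and reuse it afterwards
        if not ProcessId._process_id:
            ProcessId._process_id = str(uuid.uuid4())
        return ProcessId._process_id


first_id = ProcessId.get()
second_id = ProcessId.get()
print(first_id == second_id)  # True
```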

register() abstractmethod

Subscribe to event topics.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_base_handler.py
@abstractmethod
def register(self):
    """Subscribe to event topics."""
    pass

reset_session()

Reset session cache.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_base_handler.py
def reset_session(self):
    """Reset session cache."""
    self.session.reset()

show_interface(items, title='', user_id=None, user=None, event=None, username=None, submit_btn_label=None)

Shows ftrack widgets interface to user.

Interface is shown to a single user. To identify the user, one of these arguments must be passed: 'user_id', 'user', 'username', 'event'. They are checked in this order.

Parameters:

Name Type Description Default
items List[Dict[str, Any]]

Interface items (their structure is defined by ftrack documentation).

required
title str

Title of shown widget.

''
user_id str

User id.

None
user Any

Object of ftrack user (queried using ftrack api session).

None
event Event

Event which can be used as source for user id.

None
username str

Username used to find the user's id. This is the slowest way to get the user id.

None
submit_btn_label str

Label of submit button in ftrack widget.

None
Source code in client/ayon_ftrack/common/event_handlers/ftrack_base_handler.py
def show_interface(
    self,
    items: List[Dict[str, Any]],
    title: Optional[str] = "",
    user_id: Optional[str] = None,
    user: Optional[Any] = None,
    event: Optional[ftrack_api.event.base.Event] = None,
    username: Optional[str] = None,
    submit_btn_label: Optional[str] = None,
):
    """Shows ftrack widgets interface to user.

    Interface is shown to a user. To identify user one of arguments must be
    passed: 'user_id', 'user', 'event', 'username'.

    Args:
        items (List[Dict[str, Any]]): Interface items (their structure is
            defined by ftrack documentation).
        title (str): Title of shown widget.
        user_id (str): User id.
        user (Any): Object of ftrack user (queried using ftrack api
            session).
        event (ftrack_api.Event): Event which can be used as source for
            user id.
        username (str): Username used to find the user's id. This is the
            slowest way to get the user id.
        submit_btn_label (str): Label of submit button in ftrack widget.

    """
    if user_id:
        pass

    elif user:
        user_id = user["id"]

    elif username:
        user = self.session.query(
            "User where username is \"{}\"".format(username)
        ).first()
        if not user:
            raise ValueError((
                "ftrack user with username \"{}\" was not found!"
            ).format(username))

        user_id = user["id"]

    elif event:
        user_id = event["source"]["user"]["id"]

    if not user_id:
        return

    target = (
        "applicationId=ftrack.client.web and user.id=\"{}\""
    ).format(user_id)

    event_data = {
        "type": "widget",
        "items": items,
        "title": title
    }
    if submit_btn_label:
        event_data["submit_button_label"] = submit_btn_label

    self.session.event_hub.publish(
        ftrack_api.event.base.Event(
            topic="ftrack.action.trigger-user-interface",
            data=event_data,
            target=target
        ),
        on_error="ignore"
    )
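
A minimal sketch of the payload that ends up in the published event. The helper name `build_widget_event_data` is hypothetical. The keys used in the example `items` are assumptions based on ftrack's action user interface documentation and should be checked against it before use.

```python
from typing import Any, Dict, List, Optional


def build_widget_event_data(
    items: List[Dict[str, Any]],
    title: str = "",
    submit_btn_label: Optional[str] = None,
) -> Dict[str, Any]:
    """Build event data for a 'widget' interface."""
    event_data: Dict[str, Any] = {
        "type": "widget",
        "items": items,
        "title": title,
    }
    # The label key is added only when a custom label was passed
    if submit_btn_label:
        event_data["submit_button_label"] = submit_btn_label
    return event_data


# Item keys below are assumptions based on ftrack's action UI documentation
items = [
    {"type": "label", "value": "Choose how many jobs to remove."},
    {"type": "number", "label": "Count", "name": "count", "value": 10},
]
data = build_widget_event_data(
    items, title="Kill jobs", submit_btn_label="Confirm"
)
print(sorted(data))  # ['items', 'submit_button_label', 'title', 'type']
```

In a handler, the same items would be passed as `self.show_interface(items, title="Kill jobs", event=event, submit_btn_label="Confirm")`.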

show_message(event, message, success=False)

Shows message to user who triggered event.

Parameters:

Name Type Description Default
event Event

Event used for source of user id.

required
message str

Message that will be shown to user.

required
success bool

Define type (color) of message. False -> red color.

False
Source code in client/ayon_ftrack/common/event_handlers/ftrack_base_handler.py
def show_message(
    self,
    event: ftrack_api.event.base.Event,
    message: str,
    success: Optional[bool] = False,
):
    """Shows message to user who triggered event.

    Args:
        event (ftrack_api.event.base.Event): Event used for source
            of user id.
        message (str): Message that will be shown to user.
        success (bool): Define type (color) of message. False -> red color.

    """
    if not isinstance(success, bool):
        success = False

    try:
        message = str(message)
    except Exception:
        return

    user_id = event["source"]["user"]["id"]
    target = (
        "applicationId=ftrack.client.web and user.id=\"{}\""
    ).format(user_id)
    self.session.event_hub.publish(
        ftrack_api.event.base.Event(
            topic="ftrack.action.trigger-user-interface",
            data={
                "type": "message",
                "success": success,
                "message": message
            },
            target=target
        ),
        on_error="ignore"
    )
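
The values that go into the published event can be sketched without a session. The helper name `build_message_event` and the user id are made up for this example. The sketch returns a plain dictionary instead of an `ftrack_api` event object, and it does not reproduce the early return used when the message cannot be converted to a string.

```python
from typing import Any, Dict


def build_message_event(
    user_id: str, message: Any, success: Any = False
) -> Dict[str, Any]:
    """Collect topic, target and data of a 'message' interface event."""
    # Anything that is not a real bool is treated as a failure message
    if not isinstance(success, bool):
        success = False
    target = (
        "applicationId=ftrack.client.web and user.id=\"{}\""
    ).format(user_id)
    return {
        "topic": "ftrack.action.trigger-user-interface",
        "target": target,
        "data": {
            "type": "message",
            "success": success,
            "message": str(message),
        },
    }


event = build_message_event("1234-abcd", "Job finished", success="yes")
print(event["target"])
print(event["data"]["success"])  # False
```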

FtrackServer

Helper wrapper to run ftrack server with event handlers.

Handlers are discovered from a list of paths. Each path is scanned for Python files, which are imported as modules. Each module is checked for a 'register' function or classes inheriting from 'BaseHandler'. If a 'register' function is found, it is called with the ftrack session as argument and 'BaseHandler' classes in the file are ignored. Otherwise each found 'BaseHandler' class is instantiated and its 'register' method is called.

Function 'register' tells discovery system to skip looking for classes.

Classes that start with '_' are ignored. It is possible to define attribute __ignore_handler_class = True on class definition to mark a "base class" that will be ignored on discovery, so you can safely import custom base classes in the files.

Source code in client/ayon_ftrack/common/ftrack_server.py
class FtrackServer:
    """Helper wrapper to run ftrack server with event handlers.

    Handlers are discovered from a list of paths. Each path is scanned for
    python files which are imported as modules. Each module is checked for
    a 'register' function or classes inheriting from 'BaseHandler'. If
    a 'register' function is found, it is called with the ftrack session as
    argument and 'BaseHandler' classes in the file are ignored. Otherwise
    each found 'BaseHandler' class is instantiated and its 'register'
    method is called.

    Function 'register' tells discovery system to skip looking for classes.

    Classes that start with '_' are ignored. It is possible to define
    attribute `__ignore_handler_class = True` on class definition to mark
    a "base class" that will be ignored on discovery, so you can safely import
    custom base classes in the files.
    """
    def __init__(self, handler_paths=None):
        # set ftrack logging to Warning only - OPTIONAL
        ftrack_log = logging.getLogger("ftrack_api")
        ftrack_log.setLevel(logging.WARNING)

        self.log = logging.getLogger(__name__)

        self._stopped = True
        self._is_running = False

        if handler_paths is None:
            handler_paths = []

        self._handler_paths = handler_paths

        self._session = None
        self._cached_modules = []
        self._cached_objects = []

    def stop_session(self):
        session = self._session
        self._session = None
        self._stopped = True
        # Nothing to close if the server was never started
        if session is None:
            return
        if session.event_hub.connected is True:
            session.event_hub.disconnect()
        session.close()

    def get_session(self):
        return self._session

    def get_handler_paths(self):
        return self._handler_paths

    def set_handler_paths(self, paths):
        if self._is_running:
            raise ValueError(
                "Cannot change handler paths when server is running."
            )
        self._handler_paths = paths

    session = property(get_session)
    handler_paths = property(get_handler_paths, set_handler_paths)

    def run_server(self, session=None):
        if self._is_running:
            raise ValueError("Server is already running.")
        self._stopped = False
        self._is_running = True
        if not session:
            session = ftrack_api.Session(auto_connect_event_hub=True)

        # Wait until session has connected event hub
        if session._auto_connect_event_hub_thread:
            # Use timeout from session (since ftrack-api 2.1.0)
            timeout = getattr(session, "request_timeout", 60)
            self.log.info("Waiting for event hub to connect")
            started = time.time()
            while not session.event_hub.connected:
                if (time.time() - started) > timeout:
                    raise RuntimeError((
                        "Connection to ftrack was not created in {} seconds"
                    ).format(timeout))
                time.sleep(0.1)

        elif not session.event_hub.connected:
            self.log.info("Connecting event hub")
            session.event_hub.connect()

        self._session = session
        if not self._handler_paths:
            self.log.warning((
                "Paths to event handlers are not set."
                " ftrack server won't launch."
            ))
            self._is_running = False
            return

        self._load_handlers()

        msg = "Registration of event handlers has finished!"
        self.log.info(len(msg) * "*")
        self.log.info(msg)

        # keep event_hub on session running
        try:
            session.event_hub.wait()
        finally:
            for handler in self._cached_objects:
                try:
                    handler.cleanup()
                except Exception:
                    self.log.warning(
                        "Failed to cleanup handler", exc_info=True
                    )
            self._is_running = False
            self._cached_modules = []

    def _load_handlers(self):
        register_functions = []
        handler_classes = []

        # Iterate all paths
        paths = self._handler_paths
        for path in paths:
            # Try to format path with environments
            # - 'Exception' keeps KeyboardInterrupt and SystemExit untouched
            try:
                path = path.format(**os.environ)
            except Exception:
                pass

            # Get all modules with functions
            modules, crashed = modules_from_path(path)
            for filepath, exc_info in crashed:
                self.log.warning("Filepath load crashed {}.\n{}".format(
                    filepath, "".join(traceback.format_exception(*exc_info))
                ))

            for filepath, module in modules:
                self._cached_modules.append(module)
                register_function = getattr(module, "register", None)
                if register_function is not None:
                    if isinstance(register_function, types.FunctionType):
                        register_functions.append(
                            (filepath, register_function)
                        )
                    else:
                        self.log.warning(
                            f"\"{filepath}\""
                            " - Found 'register' but it is not a function."
                        )
                    continue

                for attr_name in dir(module):
                    if attr_name.startswith("_"):
                        self.log.debug(
                            f"Skipping private class '{attr_name}'"
                        )
                        continue

                    attr = getattr(module, attr_name, None)
                    if (
                        not inspect.isclass(attr)
                        or not issubclass(attr, BaseHandler)
                        or attr.ignore_handler_class()
                    ):
                        continue

                    if inspect.isabstract(attr):
                        self.log.warning(
                            f"Skipping abstract class '{attr_name}'."
                        )
                        continue
                    handler_classes.append(attr)

                if not handler_classes:
                    self.log.warning(
                        f"\"{filepath}\""
                        " - No 'register' function"
                        " or 'BaseHandler' classes found."
                    )

        if not register_functions and not handler_classes:
            self.log.warning((
                "There are no files with `register` function or 'BaseHandler'"
                " classes in registered paths:\n- \"{}\""
            ).format("\"\n- \"".join(paths)))

        for filepath, register_func in register_functions:
            try:
                register_func(self._session)
            except Exception:
                self.log.warning(
                    f"\"{filepath}\" - register was not successful",
                    exc_info=True
                )

        for handler_class in handler_classes:
            try:
                obj = handler_class(self._session)
                obj.register()
                self._cached_objects.append(obj)

            except Exception:
                self.log.warning(
                    f"\"{handler_class}\" - register was not successful",
                    exc_info=True
                )
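
The class-discovery part of the loop above can be tried in isolation. The following is a minimal sketch, not the addon's code: `Handler`, `find_handler_classes` and the throwaway `demo` module are hypothetical stand-ins for `BaseHandler` and a loaded handler file, and the `ignore_handler_class()` check is left out.

```python
import abc
import inspect
import types


class Handler:
    """Hypothetical stand-in for 'BaseHandler'."""


def find_handler_classes(module, base_class):
    """Collect public, concrete subclasses of 'base_class' from a module."""
    found = []
    for attr_name in dir(module):
        # Private names are skipped, same as in the loop above
        if attr_name.startswith("_"):
            continue
        attr = getattr(module, attr_name, None)
        if not inspect.isclass(attr) or not issubclass(attr, base_class):
            continue
        # Abstract classes can't be instantiated, so they are skipped too
        if inspect.isabstract(attr):
            continue
        found.append(attr)
    return found


class AbstractHandler(Handler, abc.ABC):
    @abc.abstractmethod
    def launch(self):
        pass


class MyHandler(AbstractHandler):
    def launch(self):
        return "launched"


class _PrivateHandler(Handler):
    pass


# Throwaway module object that plays the role of a loaded handler file
demo = types.ModuleType("demo")
demo.AbstractHandler = AbstractHandler
demo.MyHandler = MyHandler
demo._PrivateHandler = _PrivateHandler
demo.not_a_class = 42

handler_names = [cls.__name__ for cls in find_handler_classes(demo, Handler)]
print(handler_names)
```

Only `MyHandler` passes all checks here: the abstract class, the underscore-prefixed class and the non-class attribute are filtered out.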

LocalAction

Bases: BaseAction

Action that warns the user when more processes with the same action are running.

Action is always launched, but if the identifier does not match the identifier of the current instance, a message is shown to the user.

Handy for actions where it matters on which machine they are executed.
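
How the `discover_identifier`, `launch_identifier` and `full_launch_identifier` properties of this class fit together can be sketched without ftrack. This is a simplified illustration: `build_identifiers` and `can_launch_here` are hypothetical helpers, and `uuid.uuid4().hex` is only an assumed replacement for `process_identifier()`, whose implementation is not shown on this page.

```python
import uuid


def build_identifiers(identifier, process_id):
    """Compose the three identifiers used by a 'LocalAction'-like class."""
    full = "{}.{}".format(identifier, process_id)
    return {
        # Offered to the user on discover, unique per process
        "discover": full,
        # Used in the launch subscription, matches every process
        "launch": "{}.*".format(identifier),
        # Compared against the incoming event on launch
        "full_launch": full,
    }


def can_launch_here(event_identifier, identifiers):
    """Return True when the event targets this very process."""
    return event_identifier == identifiers["full_launch"]


# Two processes with the same action but different process ids
first = build_identifiers("my.action", uuid.uuid4().hex)
second = build_identifiers("my.action", uuid.uuid4().hex)

# User clicked on the item discovered by the first process
clicked = first["discover"]
print(can_launch_here(clicked, first))
print(can_launch_here(clicked, second))
```

Both processes receive the launch event because of the wildcard subscription, but only the process whose full identifier matches runs the action. The other one responds with the warning message.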

Source code in client/ayon_ftrack/common/event_handlers/ftrack_action_handler.py
class LocalAction(BaseAction):
    """Warn user when more processes with the same action are running.

    Action is always launched, but if the identifier does not match the
    identifier of the current instance then a message is shown to the user.

    Handy for actions where it matters on which machine they are executed.
    """
    __ignore_handler_class: bool = True
    _full_launch_identifier: Optional[str] = None

    @property
    def discover_identifier(self) -> str:
        if self._discover_identifier is None:
            self._discover_identifier = "{}.{}".format(
                self.identifier, self.process_identifier()
            )
        return self._discover_identifier

    @property
    def launch_identifier(self) -> str:
        """Catch all topics with same identifier."""
        if self._launch_identifier is None:
            self._launch_identifier = "{}.*".format(self.identifier)
        return self._launch_identifier

    @property
    def full_launch_identifier(self):
        """Launch identifier specific to this process."""
        if self._full_launch_identifier is None:
            self._full_launch_identifier = "{}.{}".format(
                self.identifier, self.process_identifier()
            )
        return self._full_launch_identifier

    def register(self):
        """Register to ftrack topics to discover and launch action.

        Filter events to this session user.
        """
        # Subscribe to discover topic for user under this session
        self.session.event_hub.subscribe(
            "topic=ftrack.action.discover and source.user.username={}".format(
                self.session.api_user
            ),
            self._discover,
            priority=self.priority
        )

        launch_subscription = (
            "topic=ftrack.action.launch"
            " and data.actionIdentifier={}"
            " and source.user.username={}"
        ).format(self.launch_identifier, self.session.api_user)
        self.session.event_hub.subscribe(
            launch_subscription,
            self._launch
        )

    def _discover(
        self, event: ftrack_api.event.base.Event
    ) -> Optional[Dict[str, Any]]:
        entities = self._translate_event(event)
        if not entities:
            return

        accepts = self.discover(self.session, entities, event)
        if not accepts:
            return

        self.log.debug("Discovering action with selection: {0}".format(
            event["data"].get("selection", [])
        ))

        return {
            "items": [{
                "label": self.label,
                "variant": self.variant,
                "description": self.description,
                "actionIdentifier": self.discover_identifier,
                "icon": self.icon,
            }]
        }

    def _launch(
        self, event: ftrack_api.event.base.Event
    ) -> Optional[Dict[str, Any]]:
        event_identifier = event["data"]["actionIdentifier"]
        # Check if identifier is same
        # - show message that action may not be triggered on this machine
        if event_identifier != self.full_launch_identifier:
            return {
                "success": False,
                "message": (
                    "There are more AYON processes running"
                    " where this action could be launched."
                )
            }
        return super()._launch(event)

full_launch_identifier property

Launch identifier specific to this process.

launch_identifier property

Catch all topics with same identifier.

register()

Register to ftrack topics to discover and launch action.

Filter events to this session user.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_action_handler.py
def register(self):
    """Register to ftrack topics to discover and launch action.

    Filter events to this session user.
    """
    # Subscribe to discover topic for user under this session
    self.session.event_hub.subscribe(
        "topic=ftrack.action.discover and source.user.username={}".format(
            self.session.api_user
        ),
        self._discover,
        priority=self.priority
    )

    launch_subscription = (
        "topic=ftrack.action.launch"
        " and data.actionIdentifier={}"
        " and source.user.username={}"
    ).format(self.launch_identifier, self.session.api_user)
    self.session.event_hub.subscribe(
        launch_subscription,
        self._launch
    )

ServerAction

Bases: BaseAction

Action class meant to be used on event server.

Unlike BaseAction, roles are checked on discover instead of on register. For the same reason, register is modified to not filter topics by username.

Source code in client/ayon_ftrack/common/event_handlers/ftrack_action_handler.py
class ServerAction(BaseAction):
    """Action class meant to be used on event server.

    Unlike `BaseAction`, roles are checked on discover instead of on register.
    For the same reason register is modified to not filter topics by username.
    """
    __ignore_handler_class: bool = True

    settings_frack_subkey: str = "service_event_handlers"

convert_to_fps(source_value)

Convert value into fps value.

Numbers are converted to float and other non-string values are returned untouched. Strings are parsed.

Valid values: "1000", "1000.05", "1000,05", ",05", ".05", "1000,", "1000.", "1000/1000", "1000.05/1000", "1000/1000.05", "1000.05/1000.05", "1000,05/1000", "1000/1000,05", "1000,05/1000,05"

Invalid values: "/", "/1000", "1000/", ",", ".", and any other string.

Returns:

Name Type Description
float

Converted value.

Raises:

Type Description
InvalidFpsValue

When value can't be converted to float.
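
The string rules above can be illustrated with a small standalone parser. This is a sketch under assumptions, not the addon's implementation: `parse_fps`, `FpsError` and the regular expression are hypothetical, the regex only approximates the `is_string_number` helper (which is not shown on this page), and non-string input is not handled.

```python
import re

# Assumed number format: digits with optional decimal part, or bare decimals
_NUMBER_REGEX = re.compile(r"^(\d+\.?\d*|\.\d+)$")


class FpsError(ValueError):
    """Hypothetical stand-in for 'InvalidFpsValue'."""


def parse_fps(text):
    """Parse fps from string like '25', '23,976' or '24000/1001'."""
    # Comma is accepted as decimal separator
    value = text.strip().replace(",", ".")
    parts = value.split("/")
    if len(parts) > 2 or not all(_NUMBER_REGEX.match(p) for p in parts):
        raise FpsError("Can't convert \"{}\" to fps".format(text))
    if len(parts) == 1:
        return float(parts[0])
    divisor = float(parts[1])
    if divisor == 0.0:
        raise FpsError("Can't divide by zero")
    return float(parts[0]) / divisor


print(parse_fps("23,976"))
print(parse_fps("24000/1001"))
for invalid in ("/", "1000/", ".", "abc"):
    try:
        parse_fps(invalid)
    except FpsError as exc:
        print("invalid:", exc)
```

The fraction form is useful for rates such as NTSC frame rates, which are defined as a ratio and cannot be written exactly as a short decimal.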

Source code in client/ayon_ftrack/common/lib.py
def convert_to_fps(source_value):
    """Convert value into fps value.

    Numbers are converted to float, other non-string values are returned
    untouched. Strings are parsed.
    Valid values:
    "1000"
    "1000.05"
    "1000,05"
    ",05"
    ".05"
    "1000,"
    "1000."
    "1000/1000"
    "1000.05/1000"
    "1000/1000.05"
    "1000.05/1000.05"
    "1000,05/1000"
    "1000/1000,05"
    "1000,05/1000,05"

    Invalid values:
    "/"
    "/1000"
    "1000/"
    ","
    "."
    ...any other string

    Returns:
        float: Converted value.

    Raises:
        InvalidFpsValue: When value can't be converted to float.
    """

    if not isinstance(source_value, str):
        if isinstance(source_value, numbers.Number):
            return float(source_value)
        return source_value

    value = source_value.strip().replace(",", ".")
    if not value:
        raise InvalidFpsValue("Got empty value")

    subs = value.split("/")
    if len(subs) == 1:
        str_value = subs[0]
        if not is_string_number(str_value):
            raise InvalidFpsValue(
                "Value \"{}\" can't be converted to number.".format(value)
            )
        return float(str_value)

    elif len(subs) == 2:
        dividend, divisor = subs
        if not dividend or not is_string_number(dividend):
            raise InvalidFpsValue(
                "Dividend value \"{}\" can't be converted to number".format(
                    dividend
                )
            )

        if not divisor or not is_string_number(divisor):
            raise InvalidFpsValue(
                "Divisor value \"{}\" can't be converted to number".format(
                    divisor
                )
            )
        divisor_float = float(divisor)
        if divisor_float == 0.0:
            raise InvalidFpsValue("Can't divide by zero")
        return float(dividend) / divisor_float

    raise InvalidFpsValue(
        "Value can't be converted to number \"{}\"".format(source_value)
    )

create_chunks(iterable, chunk_size=None)

Separate iterable into multiple chunks by size.

Parameters:

Name Type Description Default
iterable Iterable[Any]

Object that will be separated into chunks.

required
chunk_size int

Size of one chunk. Default value is 200.

None

Returns:

Type Description

List[List[Any]]: Chunked items.
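
A short usage sketch follows. To keep it runnable on its own, it contains a compact copy of the function listed below rather than an import from the addon. The query string is only an illustration of why chunking ids may be handy and is an assumption, not something this page prescribes.

```python
def create_chunks(iterable, chunk_size=None):
    """Compact copy of the function listed below, for illustration only."""
    items = tuple(iterable)
    if chunk_size is None:
        chunk_size = 200
    # Sizes lower than 1 fall back to 1
    chunk_size = max(chunk_size, 1)
    return [
        items[idx:idx + chunk_size]
        for idx in range(0, len(items), chunk_size)
    ]


entity_ids = ["id{}".format(idx) for idx in range(5)]
chunks = create_chunks(entity_ids, 2)
print(chunks)

# Possible use: keep each query condition reasonably short
for chunk in chunks:
    joined_ids = ", ".join('"{}"'.format(item) for item in chunk)
    print("select id from TypedContext where id in ({})".format(joined_ids))
```

Each chunk is a tuple, because the input is converted with `tuple()` before it is sliced.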

Source code in client/ayon_ftrack/common/lib.py
def create_chunks(iterable, chunk_size=None):
    """Separate iterable into multiple chunks by size.

    Args:
        iterable (Iterable[Any]): Object that will be separated into chunks.
        chunk_size (int): Size of one chunk. Default value is 200.

    Returns:
        List[List[Any]]: Chunked items.
    """

    chunks = []
    tupled_iterable = tuple(iterable)
    if not tupled_iterable:
        return chunks
    iterable_size = len(tupled_iterable)
    if chunk_size is None:
        chunk_size = 200

    if chunk_size < 1:
        chunk_size = 1

    for idx in range(0, iterable_size, chunk_size):
        chunks.append(tupled_iterable[idx:idx + chunk_size])
    return chunks

default_custom_attributes_definition()

Default custom attribute definitions created in ftrack.

Todos

Convert to list of dictionaries to be able to determine order. Check whether the ftrack API supports defining order first!

Returns:

Type Description

dict[str, Any]: Custom attribute configurations per entity type that can be used to create/update custom attributes.

Source code in client/ayon_ftrack/common/custom_attributes.py
def default_custom_attributes_definition():
    """Default custom attribute definitions created in ftrack.

    Todos:
        Convert to list of dictionaries to be able to determine order. Check
            whether the ftrack API supports defining order first!

    Returns:
        dict[str, Any]: Custom attribute configurations per entity type that
            can be used to create/update custom attributes.

    """
    # TODO use AYON built-in attributes as source of truth
    json_file_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "custom_attributes.json"
    )
    with open(json_file_path, "r") as json_stream:
        data = json.load(json_stream)
    return data

ensure_mandatory_custom_attributes_exists(session, addon_settings, attr_confs=None, custom_attribute_types=None, groups=None, security_roles=None)

Make sure that mandatory custom attributes exist in ftrack.

Parameters:

Name Type Description Default
session Session

Connected ftrack session.

required
addon_settings Dict[str, Any]

Addon settings.

required
attr_confs Optional[List[Entity]]

Pre-fetched all existing custom attribute configurations in ftrack.

None
custom_attribute_types Optional[List[Entity]]

Pre-fetched custom attribute types.

None
groups Optional[List[Entity]]

Pre-fetched custom attribute groups.

None
security_roles Optional[List[Entity]]

Pre-fetched security roles.

None
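
The way read and write security roles are resolved from addon settings can be shown separately. This is a minimal sketch in which plain dictionaries stand in for ftrack `SecurityRole` entities and `resolve_roles` is a hypothetical helper name. The logic mirrors the inner loop of the source listed below.

```python
def resolve_roles(role_names, roles_by_low_name):
    """Pick security roles for an attribute.

    Empty 'role_names' means that all known roles are used. Unknown names
    are silently skipped.
    """
    if not role_names:
        return list(roles_by_low_name.values())

    roles = []
    for name in role_names:
        # Role names are matched case-insensitively
        role = roles_by_low_name.get(name.lower())
        if role is not None:
            roles.append(role)
    return roles


# Plain dictionaries stand in for ftrack 'SecurityRole' entities
security_roles = [
    {"id": "1", "name": "API"},
    {"id": "2", "name": "Administrator"},
    {"id": "3", "name": "User"},
]
roles_by_low_name = {role["name"].lower(): role for role in security_roles}

all_roles = resolve_roles([], roles_by_low_name)
some_roles = resolve_roles(
    ["api", "ADMINISTRATOR", "missing"], roles_by_low_name
)
print([role["name"] for role in all_roles])
print([role["name"] for role in some_roles])
```

An empty list in settings therefore grants access to every role, while a name that does not exist in ftrack is ignored without an error.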
Source code in client/ayon_ftrack/common/custom_attributes.py
def ensure_mandatory_custom_attributes_exists(
    session: ftrack_api.Session,
    addon_settings: Dict[str, Any],
    attr_confs: Optional[List["FtrackEntity"]] = None,
    custom_attribute_types: Optional[List["FtrackEntity"]] = None,
    groups: Optional[List["FtrackEntity"]] = None,
    security_roles: Optional[List["FtrackEntity"]] = None,
):
    """Make sure that mandatory custom attributes exist in ftrack.

    Args:
        session (ftrack_api.Session): Connected ftrack session.
        addon_settings (Dict[str, Any]): Addon settings.
        attr_confs (Optional[List[FtrackEntity]]): Pre-fetched all existing
            custom attribute configurations in ftrack.
        custom_attribute_types (Optional[List[FtrackEntity]]): Pre-fetched
            custom attribute types.
        groups (Optional[List[FtrackEntity]]): Pre-fetched custom attribute
            groups.
        security_roles (Optional[List[FtrackEntity]]): Pre-fetched security
            roles.

    """
    if attr_confs is None:
        attr_confs = get_all_attr_configs(session)

    # Split existing custom attributes
    attr_confs_by_entity_type = collections.defaultdict(list)
    hier_confs = []
    for attr_conf in attr_confs:
        if attr_conf["is_hierarchical"]:
            hier_confs.append(attr_conf)
        else:
            entity_type = attr_conf["entity_type"]
            attr_confs_by_entity_type[entity_type].append(attr_conf)

    # Prepare possible attribute types
    if custom_attribute_types is None:
        custom_attribute_types = session.query(
            "select id, name from CustomAttributeType"
        ).all()

    attr_type_id_by_low_name = {
        attr_type["name"].lower(): attr_type["id"]
        for attr_type in custom_attribute_types
    }

    if security_roles is None:
        security_roles = session.query(
            "select id, name, type from SecurityRole"
        ).all()

    security_roles = {
        role["name"].lower(): role
        for role in security_roles
    }
    mandatory_attributes_settings = (
        addon_settings
        ["custom_attributes"]
        ["mandatory_attributes"]
    )

    # Prepare group
    group_entity = ensure_custom_attribute_group_exists(
        session, CUST_ATTR_GROUP, groups
    )
    group_id = group_entity["id"]

    for item in [
        {
            "key": CUST_ATTR_KEY_SERVER_ID,
            "type": "text",
            "label": "AYON ID",
            "default": "",
            "is_hierarchical": True,
            "config": {"markdown": False},
            "group_id": group_id,
        },
        {
            "key": CUST_ATTR_KEY_SERVER_PATH,
            "type": "text",
            "label": "AYON path",
            "default": "",
            "is_hierarchical": True,
            "config": {"markdown": False},
            "group_id": group_id,
        },
        {
            "key": CUST_ATTR_KEY_SYNC_FAIL,
            "type": "boolean",
            "label": "AYON sync failed",
            "is_hierarchical": True,
            "default": False,
            "group_id": group_id,
        },
        {
            "key": CUST_ATTR_AUTO_SYNC,
            "type": "boolean",
            "label": "AYON auto-sync",
            "default": False,
            "is_hierarchical": False,
            "entity_type": "show",
            "group_id": group_id,
        }
    ]:
        key = item["key"]
        attr_settings = mandatory_attributes_settings[key]
        read_roles = []
        write_roles = []
        for role_names, roles in (
            (attr_settings["read_security_roles"], read_roles),
            (attr_settings["write_security_roles"], write_roles),
        ):
            if not role_names:
                roles.extend(security_roles.values())
                continue

            for name in role_names:
                role = security_roles.get(name.lower())
                if role is not None:
                    roles.append(role)

        is_hierarchical = item["is_hierarchical"]
        entity_type_confs = hier_confs
        if not is_hierarchical:
            entity_type = item["entity_type"]
            entity_type_confs = attr_confs_by_entity_type.get(entity_type, [])
        matching_attr_conf = next(
            (
                attr_conf
                for attr_conf in entity_type_confs
                if attr_conf["key"] == key
            ),
            None
        )

        entity_data = copy.deepcopy(item)
        attr_type = entity_data.pop("type")
        entity_data["type_id"] = attr_type_id_by_low_name[attr_type.lower()]
        # Convert 'config' to json string
        config = entity_data.get("config")
        if isinstance(config, dict):
            entity_data["config"] = json.dumps(config)

        if matching_attr_conf is None:
            # Make sure 'entity_type' is filled for hierarchical attribute
            # - it is required to be able to create custom attribute
            if is_hierarchical:
                entity_data.setdefault("entity_type", "show")
            # Make sure config is set to empty dictionary for creation
            entity_data.setdefault("config", "{}")
            entity_data["read_security_roles"] = read_roles
            entity_data["write_security_roles"] = write_roles
            session.create(
                "CustomAttributeConfiguration",
                entity_data
            )
            session.commit()
            continue

        changed = False
        for key, value in entity_data.items():
            if matching_attr_conf[key] != value:
                matching_attr_conf[key] = value
                changed = True

        match_read_role_ids = {
            role["id"] for role in matching_attr_conf["read_security_roles"]
        }
        match_write_role_ids = {
            role["id"] for role in matching_attr_conf["write_security_roles"]
        }
        if match_read_role_ids != {role["id"] for role in read_roles}:
            matching_attr_conf["read_security_roles"] = read_roles
            changed = True
        if match_write_role_ids != {role["id"] for role in write_roles}:
            matching_attr_conf["write_security_roles"] = write_roles
            changed = True

        if changed:
            session.commit()

get_all_attr_configs(session, fields=None)

Query custom attribute configurations from ftrack server.

Parameters:

Name Type Description Default
session Session

Connected ftrack session.

required
fields Optional[Iterable[str]]

Fields to query for attribute configurations.

None

Returns:

Type Description
List[Entity]

List[FtrackEntity]: ftrack custom attributes.
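
The query sent to ftrack is a plain string assembled from the requested fields. The sketch below shows only that string building. `build_attr_configs_query` and `DEFAULT_FIELDS` are hypothetical names, and sorting is added here only to make the output stable, while the listed source joins an unordered set.

```python
DEFAULT_FIELDS = (
    "id", "key", "entity_type", "object_type_id",
    "is_hierarchical", "default", "group_id", "type_id",
)


def build_attr_configs_query(fields=None):
    """Build query string for 'CustomAttributeConfiguration' entities."""
    if not fields:
        fields = DEFAULT_FIELDS
    # 'sorted' makes the output stable, the listed source joins a plain set
    joined_fields = ", ".join(sorted(set(fields)))
    return "select {} from CustomAttributeConfiguration".format(joined_fields)


query = build_attr_configs_query(["key", "id", "key"])
print(query)
print(build_attr_configs_query())
```

Duplicated field names are collapsed, and passing no fields falls back to the default set.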

Source code in client/ayon_ftrack/common/custom_attributes.py
def get_all_attr_configs(
    session: ftrack_api.Session,
    fields: Optional[Iterable[str]] = None,
) -> List["FtrackEntity"]:
    """Query custom attribute configurations from ftrack server.

    Args:
        session (ftrack_api.Session): Connected ftrack session.
        fields (Optional[Iterable[str]]): Fields to query for
            attribute configurations.

    Returns:
        List[FtrackEntity]: ftrack custom attributes.

    """
    if not fields:
        fields = {
            "id",
            "key",
            "entity_type",
            "object_type_id",
            "is_hierarchical",
            "default",
            "group_id",
            "type_id",
            # "config",
            # "label",
            # "sort",
            # "project_id",
        }

    joined_fields = ", ".join(set(fields))

    return session.query(
        f"select {joined_fields} from CustomAttributeConfiguration"
    ).all()

get_custom_attributes_by_entity_id(session, entity_ids, attr_configs, skip_none_values=True, store_by_key=True)

Query custom attribute values and store their value by entity and attr.

Values can be returned by attribute key or by attribute id. When the output is stored by key and a hierarchical attribute has the same key as a non-hierarchical one, the hierarchical value takes priority.

Parameters:

Name Type Description Default
session Session

Connected ftrack session.

required
entity_ids Iterable[str]

Entity ids for which custom attribute values should be returned.

required
attr_configs

Custom attribute configurations.

required
skip_none_values bool

Custom attribute with value set to 'None' won't be in output.

True
store_by_key bool

Output is stored by attribute key if true, otherwise by attribute id.

True

Returns:

Type Description

Dict[str, Dict[str, Any]]: Custom attribute values by entity id.
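
The priority of hierarchical values can be demonstrated on already queried data. This is a sketch assuming value items shaped like those used in the source listed below (`entity_id`, `configuration_id`, `value`). `group_values` is a hypothetical helper that skips the query and covers only the store-by-key branch.

```python
import collections


def group_values(value_items, attr_configs, skip_none_values=True):
    """Group already queried value items by entity id and attribute key."""
    hier_attr_ids = {
        conf["id"] for conf in attr_configs if conf["is_hierarchical"]
    }
    key_by_id = {conf["id"]: conf["key"] for conf in attr_configs}

    output = collections.defaultdict(dict)
    for item in value_items:
        value = item["value"]
        if skip_none_values and value is None:
            continue
        entity_values = output[item["entity_id"]]
        attr_id = item["configuration_id"]
        attr_key = key_by_id[attr_id]
        # Hierarchical value always wins over non-hierarchical one
        if attr_key not in entity_values or attr_id in hier_attr_ids:
            entity_values[attr_key] = value
    return output


attr_configs = [
    {"id": "conf-hier", "key": "fps", "is_hierarchical": True},
    {"id": "conf-std", "key": "fps", "is_hierarchical": False},
]
value_items = [
    {"entity_id": "shot-1", "configuration_id": "conf-hier", "value": 25.0},
    {"entity_id": "shot-1", "configuration_id": "conf-std", "value": 24.0},
    {"entity_id": "shot-2", "configuration_id": "conf-std", "value": None},
]
values_by_entity_id = group_values(value_items, attr_configs)
print(dict(values_by_entity_id))
```

The hierarchical value is kept regardless of the order in which the items arrive, and an entity whose only value is `None` does not appear in the output at all.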

Source code in client/ayon_ftrack/common/custom_attributes.py
def get_custom_attributes_by_entity_id(
    session,
    entity_ids,
    attr_configs,
    skip_none_values=True,
    store_by_key=True
):
    """Query custom attribute values and store their value by entity and attr.

    Values can be returned by attribute key or by attribute id. When the
    output is stored by key and a hierarchical attribute has the same key
    as a non-hierarchical one, the hierarchical value takes priority.

    Args:
        session (ftrack_api.Session): Connected ftrack session.
        entity_ids (Iterable[str]): Entity ids for which custom attribute
            values should be returned.
        attr_configs: Custom attribute configurations.
        skip_none_values (bool): Custom attribute with value set to 'None'
            won't be in output.
        store_by_key (bool): Output is stored by attribute key if true,
            otherwise by attribute id.

    Returns:
        Dict[str, Dict[str, Any]]: Custom attribute values by entity id.

    """
    entity_ids = set(entity_ids)
    hier_attr_ids = {
        attr_conf["id"]
        for attr_conf in attr_configs
        if attr_conf["is_hierarchical"]
    }
    attr_by_id = {
        attr_conf["id"]: attr_conf["key"]
        for attr_conf in attr_configs
    }

    value_items = query_custom_attribute_values(
        session, attr_by_id.keys(), entity_ids
    )

    output = collections.defaultdict(dict)
    for value_item in value_items:
        value = value_item["value"]
        if skip_none_values and value is None:
            continue

        entity_id = value_item["entity_id"]
        entity_values = output[entity_id]
        attr_id = value_item["configuration_id"]
        if not store_by_key:
            entity_values[attr_id] = value
            continue

        attr_key = attr_by_id[attr_id]
        # Hierarchical attributes are always preferred
        if attr_key not in entity_values or attr_id in hier_attr_ids:
            entity_values[attr_key] = value

    return output

get_custom_attributes_mapping(session, addon_settings, attr_confs=None, ayon_attributes=None)

Map AYON attributes to ftrack custom attribute configurations.

Returns:

Type Description
CustomAttributesMapping

Dict[str, List[object]]: ftrack custom attributes.

Source code in client/ayon_ftrack/common/custom_attributes.py
def get_custom_attributes_mapping(
    session: ftrack_api.Session,
    addon_settings: Dict[str, Any],
    attr_confs: Optional[List[object]] = None,
    ayon_attributes: Optional[List[object]] = None,
) -> CustomAttributesMapping:
    """Map AYON attributes to ftrack custom attribute configurations.

    Returns:
        CustomAttributesMapping: Mapped ftrack custom attributes.

    """
    cust_attr = addon_settings["custom_attributes"]
    # "custom_attributes/attributes_mapping/mapping"
    attributes_mapping = cust_attr["attributes_mapping"]

    if attr_confs is None:
        attr_confs = get_all_attr_configs(session)

    if ayon_attributes is None:
        ayon_attributes = ayon_api.get_attributes_schema()["attributes"]

    ayon_attribute_names = {
        attr["name"]
        for attr in ayon_attributes
    }

    hier_attrs = []
    nonhier_attrs = []
    for attr_conf in attr_confs:
        if attr_conf["is_hierarchical"]:
            hier_attrs.append(attr_conf)
        else:
            nonhier_attrs.append(attr_conf)

    output = CustomAttributesMapping()
    if not attributes_mapping["enabled"]:
        builtin_attrs = {
            attr["name"]
            for attr in ayon_attributes
            if attr["builtin"]
        }
        for attr_conf in hier_attrs:
            attr_name = attr_conf["key"]
            # Use only AYON attribute hierarchical equivalent
            if (
                attr_name in output
                or attr_name not in ayon_attribute_names
            ):
                continue

            # Attribute must be in builtin attributes or openpype/ayon group
            # NOTE get rid of group name check when only mapping is used
            if (
                attr_name in builtin_attrs
                or attr_conf["group"]["name"] in ("openpype", CUST_ATTR_GROUP)
            ):
                output.add_mapping_item(MappedAYONAttribute(
                    attr_name,
                    True,
                    [attr_conf],
                ))

    else:
        for item in attributes_mapping["mapping"]:
            ayon_attr_name = item["name"]
            if ayon_attr_name not in ayon_attribute_names:
                continue

            is_hierarchical = item["attr_type"] == "hierarchical"

            mapped_item = MappedAYONAttribute(
                ayon_attr_name, is_hierarchical, []
            )

            if is_hierarchical:
                attr_name = item["hierarchical"]
                for attr_conf in hier_attrs:
                    if attr_conf["key"] == attr_name:
                        mapped_item.add_attr_conf(attr_conf)
                        break
            else:
                attr_names = item["standard"]
                for attr_conf in nonhier_attrs:
                    if attr_conf["key"] in attr_names:
                        mapped_item.add_attr_conf(attr_conf)
            output.add_mapping_item(mapped_item)

    for attr_name in ayon_attribute_names:
        if attr_name not in output:
            output.add_mapping_item(MappedAYONAttribute(attr_name))

    return output

get_datetime_data(datetime_obj=None)

Returns current datetime data as dictionary.

Note

This function is copied from 'ayon_core.lib'.

Parameters:

Name Type Description Default
datetime_obj datetime

Specific datetime object

None

Returns:

Name Type Description
dict

prepared date & time data

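Available keys

"d" - <Day of month number> in shortest possible way.
"dd" - <Day of month number> with 2 digits.
"ddd" - <Week day name> shortened week day. e.g.: Mon, ...
"dddd" - <Week day name> full name of week day. e.g.: Monday, ...
"m" - <Month number> in shortest possible way. e.g.: 1 if January
"mm" - <Month number> with 2 digits.
"mmm" - <Month name> shortened month name. e.g.: Jan, ...
"mmmm" - <Month name> full month name. e.g.: January, ...
"yy" - <Year number> shortened year. e.g.: 19, 20, ...
"yyyy" - <Year number> full year. e.g.: 2019, 2020, ...
"H" - <Hours number 24-hour> shortened hours.
"HH" - <Hours number 24-hour> with 2 digits.
"h" - <Hours number 12-hour> shortened hours.
"hh" - <Hours number 12-hour> with 2 digits.
"ht" - <Midday type> AM or PM.
"M" - <Minutes number> shortened minutes.
"MM" - <Minutes number> with 2 digits.
"S" - <Seconds number> shortened seconds.
"SS" - <Seconds number> with 2 digits.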

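The difference between the short and the two-digit keys is easiest to see on a fixed date. The sketch below rebuilds only the numeric subset of the documented keys with `strftime`. `datetime_subset` is a hypothetical helper, and name-based keys such as "ddd" or "ht" are left out because their values depend on the locale.

```python
import datetime


def datetime_subset(datetime_obj):
    """Build a few of the documented keys for given datetime object."""
    day = datetime_obj.strftime("%d")
    month = datetime_obj.strftime("%m")
    year = datetime_obj.strftime("%Y")
    hours = datetime_obj.strftime("%H")
    hours_midday = datetime_obj.strftime("%I")
    return {
        # Short variants drop the leading zero
        "d": str(int(day)),
        "dd": day,
        "m": str(int(month)),
        "mm": month,
        "yy": year[2:],
        "yyyy": year,
        "H": str(int(hours)),
        "HH": hours,
        "h": str(int(hours_midday)),
        "hh": hours_midday,
    }


data = datetime_subset(datetime.datetime(2019, 1, 7, 15, 4, 5))
# Keys can be used directly as template fields
print("{yyyy}-{mm}-{dd}".format(**data))
print("{yy}{m}{d}_{H}h".format(**data))
```

Because all values are strings, the dictionary can be passed straight to `str.format` without further conversion.
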
Source code in client/ayon_ftrack/common/utils.py
def get_datetime_data(datetime_obj=None):
    """Returns current datetime data as dictionary.

    Note:
        This function is copied from 'ayon_core.lib'.

    Args:
        datetime_obj (datetime): Specific datetime object

    Returns:
        dict: prepared date & time data

    Available keys:
        "d" - <Day of month number> in shortest possible way.
        "dd" - <Day of month number> with 2 digits.
        "ddd" - <Week day name> shortened week day. e.g.: `Mon`, ...
        "dddd" - <Week day name> full name of week day. e.g.: `Monday`, ...
        "m" - <Month number> in shortest possible way. e.g.: `1` if January
        "mm" - <Month number> with 2 digits.
        "mmm" - <Month name> shortened month name. e.g.: `Jan`, ...
        "mmmm" - <Month name> full month name. e.g.: `January`, ...
        "yy" - <Year number> shortened year. e.g.: `19`, `20`, ...
        "yyyy" - <Year number> full year. e.g.: `2019`, `2020`, ...
        "H" - <Hours number 24-hour> shortened hours.
        "HH" - <Hours number 24-hour> with 2 digits.
        "h" - <Hours number 12-hour> shortened hours.
        "hh" - <Hours number 12-hour> with 2 digits.
        "ht" - <Midday type> AM or PM.
        "M" - <Minutes number> shortened minutes.
        "MM" - <Minutes number> with 2 digits.
        "S" - <Seconds number> shortened seconds.
        "SS" - <Seconds number> with 2 digits.
    """

    if not datetime_obj:
        datetime_obj = datetime.datetime.now()

    year = datetime_obj.strftime("%Y")

    month = datetime_obj.strftime("%m")
    month_name_full = datetime_obj.strftime("%B")
    month_name_short = datetime_obj.strftime("%b")
    day = datetime_obj.strftime("%d")

    weekday_full = datetime_obj.strftime("%A")
    weekday_short = datetime_obj.strftime("%a")

    hours = datetime_obj.strftime("%H")
    hours_midday = datetime_obj.strftime("%I")
    hour_midday_type = datetime_obj.strftime("%p")
    minutes = datetime_obj.strftime("%M")
    seconds = datetime_obj.strftime("%S")

    return {
        "d": str(int(day)),
        "dd": str(day),
        "ddd": weekday_short,
        "dddd": weekday_full,
        "m": str(int(month)),
        "mm": str(month),
        "mmm": month_name_short,
        "mmmm": month_name_full,
        "yy": str(year[2:]),
        "yyyy": str(year),
        "H": str(int(hours)),
        "HH": str(hours),
        "h": str(int(hours_midday)),
        "hh": str(hours_midday),
        "ht": hour_midday_type,
        "M": str(int(minutes)),
        "MM": str(minutes),
        "S": str(int(seconds)),
        "SS": str(seconds),
    }
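The keys listed above are plain strings, which makes them convenient as fields in `str.format` templates. The sketch below rebuilds only a few of the numeric keys with a hypothetical helper, `numeric_datetime_keys`, which is not part of the addon and exists only so the example runs on its own; the template string is also an assumption.

```python
import datetime


def numeric_datetime_keys(datetime_obj):
    """Hypothetical helper: rebuild a few of the numeric keys listed above."""
    day = datetime_obj.strftime("%d")
    month = datetime_obj.strftime("%m")
    year = datetime_obj.strftime("%Y")
    hours = datetime_obj.strftime("%H")
    return {
        "d": str(int(day)),
        "dd": day,
        "m": str(int(month)),
        "mm": month,
        "yy": year[2:],
        "yyyy": year,
        "H": str(int(hours)),
        "HH": hours,
    }


data = numeric_datetime_keys(datetime.datetime(2019, 1, 5, 9, 30, 0))
# Every value is a string, so the keys can be used as template fields.
stamp = "{yyyy}-{mm}-{dd}_{HH}h".format(**data)
```

The short variants ("d", "m", "H") drop the leading zero, while the doubled variants keep two digits.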

get_folder_path_for_entities(session, entities, path_cust_attr_id=None, allow_use_link=True)

Get folder path for ftrack entities.

The folder path is read from a custom attribute, or built from the entity link, which contains the names of parent entities.

Parameters:

Name Type Description Default
session Session

Connected ftrack session.

required
entities List[dict]

List of ftrack entities.

required
path_cust_attr_id Union[str, None]

Custom attribute configuration id which stores entity path.

None
allow_use_link bool

Use 'link' value if path is not found in custom attributes.

True

Returns:

Type Description

dict[str, Union[str, None]]: Entity path by ftrack entity id. Output will always contain all entity ids from input.

Source code in client/ayon_ftrack/common/utils.py
def get_folder_path_for_entities(
    session, entities, path_cust_attr_id=None, allow_use_link=True
):
    """Get folder path for ftrack entities.

    The folder path is read from a custom attribute, or built from the
    entity link, which contains the names of parent entities.

    Args:
        session (ftrack_api.Session): Connected ftrack session.
        entities (List[dict]): List of ftrack entities.
        path_cust_attr_id (Union[str, None]): Custom attribute
            configuration id which stores entity path.
        allow_use_link (bool): Use 'link' value if path is not found in
            custom attributes.

    Returns:
        dict[str, Union[str, None]]: Entity path by ftrack entity id.
            Output will always contain all entity ids from input.
    """

    entities_by_id = {
        entity["id"]: entity
        for entity in entities
    }
    entity_ids = set(entities_by_id.keys())
    folder_paths_by_id = {
        entity_id: None
        for entity_id in entity_ids
    }
    if not folder_paths_by_id:
        return folder_paths_by_id

    if path_cust_attr_id is None:
        cust_attr_conf = session.query(
            "select id, key from CustomAttributeConfiguration"
            f" where key is '{CUST_ATTR_KEY_SERVER_PATH}'"
        ).first()
        if cust_attr_conf:
            path_cust_attr_id = cust_attr_conf["id"]

    value_items = []
    if path_cust_attr_id is not None:
        value_items = query_custom_attribute_values(
            session, {path_cust_attr_id}, entity_ids
        )

    for value_item in value_items:
        path = value_item["value"]
        entity_id = value_item["entity_id"]
        if path:
            entity_ids.discard(entity_id)
            folder_paths_by_id[entity_id] = path

    if allow_use_link:
        for missing_id in entity_ids:
            entity = entities_by_id[missing_id]
            # Use stupidly simple solution
            link_names = [item["name"] for item in entity["link"]]
            # Change project name to empty string
            link_names[0] = ""
            folder_paths_by_id[missing_id] = "/".join(link_names)

    return folder_paths_by_id
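The 'link' fallback at the end of the function can be illustrated in isolation. This is a minimal sketch, assuming an entity 'link' is a list of dictionaries with a "name" key ordered from the project down to the entity; `folder_path_from_link` is a hypothetical helper, not part of the addon.

```python
def folder_path_from_link(link):
    """Hypothetical helper mirroring the 'link' fallback shown above."""
    link_names = [item["name"] for item in link]
    # The first item is the project; blank it so the path starts with "/".
    link_names[0] = ""
    return "/".join(link_names)


# Assumed shape of an entity 'link': project first, entity itself last.
link = [{"name": "my_project"}, {"name": "assets"}, {"name": "hero"}]
path = folder_path_from_link(link)

# A project entity has only itself in the link, which gives an empty path.
project_path = folder_path_from_link([{"name": "my_project"}])
```

The project name is dropped because folder paths are relative to the project.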

get_ftrack_icon_url(icon_name, addon_version, addon_name=None)

Helper to get icon url to server.

The existence of file is not validated.

Parameters:

Name Type Description Default
icon_name str

Name of icon filename.

required
addon_version str

Version of addon.

required
addon_name Optional[str]

Name of addon. For development purposes. Default value 'ftrack'.

None

Returns:

Name Type Description
str

Url to icon on server.

Source code in client/ayon_ftrack/common/lib.py
def get_ftrack_icon_url(icon_name, addon_version, addon_name=None):
    """Helper to get icon url to server.

    The existence of file is not validated.

    Args:
        icon_name (str): Name of icon filename.
        addon_version (str): Version of addon.
        addon_name (Optional[str]): Name of addon. For development purposes.
            Default value 'ftrack'.

    Returns:
        str: Url to icon on server.
    """

    return get_ftrack_public_url(
        "icons", icon_name,
        addon_version=addon_version,
        addon_name=addon_name
    )

get_ftrack_public_url(*args, addon_version, addon_name=None)

Url to public path in ftrack addon.

Parameters:

Name Type Description Default
args tuple[str]

Subpaths in 'public' dir.

()
addon_version str

Version of addon.

required
addon_name Optional[str]

Name of addon. This is for development purposes. Default value 'ftrack'.

None

Returns:

Name Type Description
str

Url to public file on server in ftrack addon.

Source code in client/ayon_ftrack/common/lib.py
def get_ftrack_public_url(*args, addon_version, addon_name=None):
    """Url to public path in ftrack addon.

    Args:
        args (tuple[str]): Subpaths in 'public' dir.
        addon_version (str): Version of addon.
        addon_name (Optional[str]): Name of addon. This is for development
            purposes. Default value 'ftrack'.

    Returns:
        str: Url to public file on server in ftrack addon.
    """

    server_url = get_base_url()
    parts = [
        server_url,
        "addons",
        addon_name or "ftrack",
        addon_version,
        "public"
    ]
    parts.extend(args)
    return "/".join(parts)
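To make the resulting url layout concrete, the sketch below repeats the same joining logic but takes the server url as an argument instead of calling `get_base_url()`. The function name `build_public_url`, the server address, the version and the icon filename are all assumptions made for illustration.

```python
def build_public_url(server_url, *args, addon_version, addon_name=None):
    """Hypothetical stand-in that receives the server url explicitly."""
    parts = [
        server_url,
        "addons",
        addon_name or "ftrack",
        addon_version,
        "public",
    ]
    parts.extend(args)
    return "/".join(parts)


# 'addon_version' is keyword-only because it follows '*args'.
url = build_public_url(
    "https://ayon.example.com", "icons", "my_icon.svg",
    addon_version="1.2.3",
)
```

Because the parts are joined with "/", a server url that already ends with a slash would produce a double slash in this sketch.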

get_host_ip()

Get IP of machine.

Returns:

Type Description

Union[str, None]: IP address of machine or None if could not be detected.

Source code in client/ayon_ftrack/common/lib.py
def get_host_ip():
    """Get IP of machine.

    Returns:
        Union[str, None]: IP address of machine or None if could not be
            detected.
    """

    host_name = socket.gethostname()
    try:
        return socket.gethostbyname(host_name)
    except Exception:
        pass

    return None
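Callers need to handle the `None` result. A minimal sketch of that pattern follows, with the lookup repeated locally so the example runs on its own; `host_ip_or_none` and the fallback label are hypothetical.

```python
import socket


def host_ip_or_none():
    """Same approach as above: resolve the local hostname to an address."""
    try:
        return socket.gethostbyname(socket.gethostname())
    except Exception:
        # The hostname could not be resolved (e.g. missing hosts entry).
        return None


ip_address = host_ip_or_none()
# Fall back to a placeholder instead of formatting 'None' into messages.
label = ip_address if ip_address is not None else "<unknown host>"
```

On some systems the hostname may resolve to a loopback address such as 127.0.0.1, so the value is better suited for logging than for addressing the machine.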

get_service_ftrack_icon_url(icon_name, addon_version=None, addon_name=None)

Icon url to server for service process.

Information about the addon version is taken from the service registered in 'ayon_api'.

Parameters:

Name Type Description Default
icon_name str

Name of icon filename.

required
addon_version Optional[str]

Version of addon. Version from registered service is used if not passed. For development purposes.

None
addon_name Optional[str]

Name of addon. For development purposes.

None

Returns:

Name Type Description
str

Url to icon on server.

Source code in client/ayon_ftrack/common/lib.py
def get_service_ftrack_icon_url(
    icon_name, addon_version=None, addon_name=None
):
    """Icon url to server for service process.

    Information about the addon version is taken from the service
    registered in 'ayon_api'.

    Args:
        icon_name (str): Name of icon filename.
        addon_version (Optional[str]): Version of addon. Version from
            registered service is used if not passed. For development purposes.
        addon_name (Optional[str]): Name of addon. For development purposes.

    Returns:
        str: Url to icon on server.
    """

    return get_ftrack_icon_url(
        icon_name,
        addon_version=addon_version or get_service_addon_version(),
        addon_name=addon_name or get_service_addon_name()
    )

import_filepath(filepath, module_name=None)

Import python file as python module.

Python 2 and Python 3 compatibility.

Parameters:

Name Type Description Default
filepath str

Path to python file.

required
module_name Optional[str]

Name of loaded module. Only for Python 3. Defaults to the filename of 'filepath' without extension.

None
Source code in client/ayon_ftrack/common/python_module_tools.py
def import_filepath(filepath, module_name=None):
    """Import python file as python module.

    Python 2 and Python 3 compatibility.

    Args:
        filepath (str): Path to python file.
        module_name (Optional[str]): Name of loaded module. Only for
            Python 3. Defaults to the filename of 'filepath' without
            extension.
    """
    if module_name is None:
        module_name = os.path.splitext(os.path.basename(filepath))[0]

    # Make sure it is not 'unicode' in Python 2
    module_name = str(module_name)

    # Prepare module object where content of file will be parsed
    module = types.ModuleType(module_name)

    # Use loader so module has full specs
    module_loader = importlib.machinery.SourceFileLoader(
        module_name, filepath
    )
    module_loader.exec_module(module)
    return module
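A short usage sketch may help here. It writes a throwaway script to a temporary folder and loads it with a condensed local copy of the approach above, named `load_module_from_file` so that the example runs without the addon; the script name and its contents are assumptions.

```python
import importlib.machinery
import os
import tempfile
import types


def load_module_from_file(filepath, module_name=None):
    """Condensed copy of the approach above, kept local so the sketch runs."""
    if module_name is None:
        module_name = os.path.splitext(os.path.basename(filepath))[0]
    module = types.ModuleType(module_name)
    loader = importlib.machinery.SourceFileLoader(module_name, filepath)
    loader.exec_module(module)
    return module


with tempfile.TemporaryDirectory() as tmp_dir:
    script_path = os.path.join(tmp_dir, "my_action.py")
    with open(script_path, "w") as stream:
        stream.write(
            "LABEL = 'Example'\n\ndef double(value):\n    return value * 2\n"
        )

    module = load_module_from_file(script_path)

# The module object stays usable after the source file is gone.
module_name = module.__name__
label = module.LABEL
doubled = module.double(21)
```

Note that a module loaded this way is not registered in `sys.modules`, so a regular `import my_action` elsewhere would not find it.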

is_ftrack_enabled_in_settings(project_settings)

Check if ftrack is enabled in ftrack project settings.

This function expects settings for a specific project. It does not check whether ftrack is enabled in general.

Project settings give the option to disable ftrack integration per project. That should disable most of the ftrack integration functionality, especially pipeline integration > publish plugins, and some automations such as event server handlers.

Parameters:

Name Type Description Default
project_settings dict[str, Any]

ftrack project settings.

required

Returns:

Name Type Description
bool

True if ftrack is enabled in project settings.

Source code in client/ayon_ftrack/common/lib.py
def is_ftrack_enabled_in_settings(project_settings):
    """Check if ftrack is enabled in ftrack project settings.

    This function expects settings for a specific project. It does not check
    whether ftrack is enabled in general.

    Project settings give the option to disable ftrack integration per
    project. That should disable most of the ftrack integration functionality,
    especially pipeline integration > publish plugins, and some automations
    such as event server handlers.

    Args:
        project_settings (dict[str, Any]): ftrack project settings.

    Returns:
        bool: True if ftrack is enabled in project settings.
    """

    ftrack_enabled = project_settings.get("enabled")
    # If 'ftrack_enabled' is not set, we assume it is enabled.
    # - this is for backwards compatibility - remove in future
    if ftrack_enabled is None:
        return True
    return ftrack_enabled
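The backwards-compatibility rule is easiest to see with the three possible inputs side by side. The sketch uses a local copy of the check, named `is_enabled` here, so it runs without the addon; the settings dictionaries are made-up examples.

```python
def is_enabled(project_settings):
    """Local copy of the check above so the sketch runs standalone."""
    enabled = project_settings.get("enabled")
    # A missing key is treated as enabled for backwards compatibility.
    return True if enabled is None else enabled


results = {
    "missing key": is_enabled({}),
    "explicitly on": is_enabled({"enabled": True}),
    "explicitly off": is_enabled({"enabled": False}),
}
```

Only an explicit `False` disables the integration for the project.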

join_filter_values(values)

Prepare values to be used for filtering in ftrack query.

Parameters:

Name Type Description Default
values Iterable[str]

Values to join for filter query.

required

Returns:

Name Type Description
str

Prepared values for ftrack query.

Source code in client/ayon_ftrack/common/lib.py
def join_filter_values(values):
    """Prepare values to be used for filtering in ftrack query.

    Args:
        values (Iterable[str]): Values to join for filter query.

    Returns:
        str: Prepared values for ftrack query.
    """

    return ",".join({
        '"{}"'.format(value)
        for value in values
    })
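As an illustration of how the joined string is meant to be embedded in a query, the sketch below repeats the function locally and formats it into an `in (...)` filter. The ids and the query text are assumptions chosen for the example.

```python
def join_filter_values(values):
    """Same logic as above, copied so the sketch runs standalone."""
    return ",".join({
        '"{}"'.format(value)
        for value in values
    })


# Duplicates collapse because a set comprehension is used; for the same
# reason the order of values in the result is not guaranteed.
joined = join_filter_values(["id-1", "id-2", "id-1"])
query = "select id, name from TypedContext where id in ({})".format(joined)
```

Values are wrapped in double quotes without escaping, so this sketch assumes they do not contain a double quote themselves.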

map_ftrack_users_to_ayon_users(ftrack_users, ayon_users=None)

Map ftrack users to AYON users.

Mapping is based on 2 possible keys, email and username where email has higher priority. Once AYON user is mapped it cannot be mapped again to different user.

Fields used from ftrack users: 'id', 'username', 'email'.

Parameters:

Name Type Description Default
ftrack_users List[User]

List of ftrack users.

required
ayon_users List[Dict[str, Any]]

List of AYON users.

None

Returns:

Type Description
Dict[str, Union[str, None]]

Dict[str, Union[str, None]]: Mapping of ftrack user id to AYON username.

Source code in client/ayon_ftrack/common/users.py
def map_ftrack_users_to_ayon_users(
    ftrack_users: List["ftrack_api.entity.user.User"],
    ayon_users: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, "Union[str, None]"]:
    """Map ftrack users to AYON users.

    Mapping is based on 2 possible keys, email and username where email has
    higher priority. Once AYON user is mapped it cannot be mapped again to
    different user.

    Fields used from ftrack users: 'id', 'username', 'email'.

    Args:
        ftrack_users (List[ftrack_api.entity.user.User]): List of ftrack users.
        ayon_users (List[Dict[str, Any]]): List of AYON users.

    Returns:
        Dict[str, Union[str, None]]: Mapping of ftrack user id
            to AYON username.

    """
    if ayon_users is None:
        ayon_users = ayon_api.get_users()

    mapping: Dict[str, "Union[str, None]"] = {
        user["id"]: None
        for user in ftrack_users
    }
    ayon_users_by_email: Dict[str, str] = {}
    ayon_users_by_name: Dict[str, str] = {}
    for ayon_user in ayon_users:
        ayon_name = ayon_user["name"]
        ayon_email = ayon_user["attrib"]["email"]
        ayon_users_by_name[ayon_name.lower()] = ayon_name
        if ayon_email:
            ayon_users_by_email[ayon_email.lower()] = ayon_name

    mapped_ayon_users: Set[str] = set()
    for ftrack_user in ftrack_users:
        ftrack_id: str = ftrack_user["id"]
        # Make sure username does not contain '@' character and compare it
        #   lowercased, because keys of 'ayon_users_by_name' are lowercased
        ftrack_name: str = ftrack_user["username"].split("@", 1)[0].lower()
        ftrack_email: str = ftrack_user["email"]

        if ftrack_email and ftrack_email.lower() in ayon_users_by_email:
            ayon_name: str = ayon_users_by_email[ftrack_email.lower()]
            if ayon_name not in mapped_ayon_users:
                mapping[ftrack_id] = ayon_name
                mapped_ayon_users.add(ayon_name)
            continue

        if ftrack_name in ayon_users_by_name:
            ayon_name: str = ayon_users_by_name[ftrack_name]
            if ayon_name not in mapped_ayon_users:
                mapped_ayon_users.add(ayon_name)
                mapping[ftrack_id] = ayon_name

    return mapping
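The priority rules can be traced with a small data set. The following is a reduced sketch that works on plain dictionaries instead of ftrack entities and does not call `ayon_api`; the helper name `map_users` and all user data are hypothetical, and lowercasing the ftrack username before the lookup is an assumption of this sketch.

```python
def map_users(ftrack_users, ayon_users):
    """Reduced sketch of the mapping above using plain dictionaries."""
    mapping = {user["id"]: None for user in ftrack_users}
    by_email, by_name = {}, {}
    for ayon_user in ayon_users:
        name = ayon_user["name"]
        email = ayon_user["attrib"]["email"]
        by_name[name.lower()] = name
        if email:
            by_email[email.lower()] = name

    used = set()
    for ftrack_user in ftrack_users:
        email = (ftrack_user["email"] or "").lower()
        # Assumption of this sketch: username is compared lowercased.
        username = ftrack_user["username"].split("@", 1)[0].lower()
        # Email match wins; a username match is only tried without one.
        if email in by_email:
            ayon_name = by_email[email]
        else:
            ayon_name = by_name.get(username)
        if ayon_name is not None and ayon_name not in used:
            used.add(ayon_name)
            mapping[ftrack_user["id"]] = ayon_name
    return mapping


ftrack_users = [
    {"id": "ft-1", "username": "john.doe@studio.com",
     "email": "John.Doe@studio.com"},
    {"id": "ft-2", "username": "jane", "email": ""},
    {"id": "ft-3", "username": "ghost", "email": "ghost@studio.com"},
]
ayon_users = [
    {"name": "jdoe", "attrib": {"email": "john.doe@studio.com"}},
    {"name": "jane", "attrib": {"email": None}},
]
mapping = map_users(ftrack_users, ayon_users)
```

Here "ft-1" is matched by email despite the different username, "ft-2" falls back to the username, and "ft-3" stays unmapped.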

modules_from_path(folder_path, log=None)

Get python scripts as modules from a path.

Parameters:

Name Type Description Default
folder_path str

Path to folder containing python scripts.

required
log Optional[Logger]

Logger used for logs.

None

Returns:

Type Description

tuple: First list contains successfully imported modules and second list contains tuples of path and exception.

Source code in client/ayon_ftrack/common/python_module_tools.py
def modules_from_path(folder_path, log=None):
    """Get python scripts as modules from a path.

    Arguments:
        folder_path (str): Path to folder containing python scripts.
        log (Optional[logging.Logger]): Logger used for logs.

    Returns:
        tuple<list, list>: First list contains successfully imported modules
            and second list contains tuples of path and exception.
    """
    crashed = []
    modules = []
    output = (modules, crashed)
    # Just skip and return empty list if path is not set
    if not folder_path:
        return output

    if log is None:
        log = logging.getLogger("modules_from_path")
    # Do not allow relative imports
    if folder_path.startswith("."):
        log.warning((
            "BUG: Relative paths are not allowed for security reasons. {}"
        ).format(folder_path))
        return output

    folder_path = os.path.normpath(folder_path)

    if not os.path.isdir(folder_path):
        log.warning("Not a directory path: {}".format(folder_path))
        return output

    for filename in os.listdir(folder_path):
        # Ignore files which start with underscore
        if filename.startswith("_"):
            continue

        mod_name, mod_ext = os.path.splitext(filename)
        if not mod_ext == ".py":
            continue

        full_path = os.path.join(folder_path, filename)
        if not os.path.isfile(full_path):
            continue

        try:
            module = import_filepath(full_path, mod_name)
            modules.append((full_path, module))

        except Exception:
            crashed.append((full_path, sys.exc_info()))
            log.warning(
                "Failed to load path: \"{0}\"".format(full_path),
                exc_info=True
            )
            continue

    return output
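The filtering rules and the shape of the two returned lists can be shown with a temporary folder. This is a reduced sketch, not the addon function: `load_folder` is a hypothetical stand-in that omits logging and the path validation, and the file names are made up.

```python
import importlib.machinery
import os
import sys
import tempfile
import types


def load_folder(folder_path):
    """Reduced sketch of the scan above: import every public '.py' file."""
    modules, crashed = [], []
    for filename in sorted(os.listdir(folder_path)):
        mod_name, mod_ext = os.path.splitext(filename)
        # Skip private files (leading underscore) and non-python files.
        if filename.startswith("_") or mod_ext != ".py":
            continue
        full_path = os.path.join(folder_path, filename)
        try:
            module = types.ModuleType(mod_name)
            loader = importlib.machinery.SourceFileLoader(mod_name, full_path)
            loader.exec_module(module)
            modules.append((full_path, module))
        except Exception:
            # Keep the failure so the caller can report it later.
            crashed.append((full_path, sys.exc_info()))
    return modules, crashed


with tempfile.TemporaryDirectory() as tmp_dir:
    sources = {
        "good.py": "VALUE = 1\n",
        "broken.py": "raise RuntimeError('boom')\n",
        "_private.py": "VALUE = 2\n",
        "notes.txt": "not python\n",
    }
    for filename, content in sources.items():
        with open(os.path.join(tmp_dir, filename), "w") as stream:
            stream.write(content)
    modules, crashed = load_folder(tmp_dir)

loaded_names = [module.__name__ for _, module in modules]
crashed_names = [os.path.basename(path) for path, _ in crashed]
```

Both lists hold tuples whose first item is the file path, so a caller can report which script loaded and which one failed.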

query_custom_attribute_values(session, attr_ids, entity_ids)

Query custom attribute values from ftrack database.

When the ftrack call method is used, the result may differ depending on the table name and the ftrack server version.

For hierarchical attributes you should always use only_set_values=True; otherwise the result will be the default value of the custom attribute, and it would not be possible to tell whether the value is set on the entity or the default value is used.

Parameters:

Name Type Description Default
session Session

Connected ftrack session.

required
attr_ids Iterable[str]

Attribute configuration ids.

required
entity_ids Iterable[str]

Entity ids for which values are queried.

required

Returns:

Type Description

List[Dict[str, Any]]: Results from server.

Source code in client/ayon_ftrack/common/custom_attributes.py
def query_custom_attribute_values(session, attr_ids, entity_ids):
    """Query custom attribute values from ftrack database.

    When the ftrack call method is used, the result may differ depending on
    the table name and the ftrack server version.

    For hierarchical attributes you should always use `only_set_values=True`;
    otherwise the result will be the default value of the custom attribute,
    and it would not be possible to tell whether the value is set on the
    entity or the default value is used.

    Args:
        session (ftrack_api.Session): Connected ftrack session.
        attr_ids (Iterable[str]): Attribute configuration ids.
        entity_ids (Iterable[str]): Entity ids for which values are queried.

    Returns:
        List[Dict[str, Any]]: Results from server.
    """

    output = []
    # Just skip
    attr_ids = set(attr_ids)
    entity_ids = set(entity_ids)
    if not attr_ids or not entity_ids:
        return output

    # Prepare values to query
    attributes_joined = join_filter_values(attr_ids)

    # Query values in chunks
    chunk_size = 5000 // len(attr_ids)
    # Make sure entity_ids is `list` for chunk selection
    for chunk in create_chunks(entity_ids, chunk_size):
        entity_ids_joined = join_filter_values(chunk)
        output.extend(
            session.query(
                (
                    "select value, entity_id, configuration_id"
                    " from CustomAttributeValue"
                    " where entity_id in ({}) and configuration_id in ({})"
                ).format(entity_ids_joined, attributes_joined)
            ).all()
        )
    return output
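The chunk size arithmetic can be checked without a server. In the sketch below, `chunked` is a hypothetical stand-in for the `create_chunks` helper, whose implementation is not shown here, and the ids are generated placeholders.

```python
def chunked(items, chunk_size):
    """Hypothetical stand-in for the 'create_chunks' helper used above."""
    items = list(items)
    for start in range(0, len(items), chunk_size):
        yield items[start:start + chunk_size]


attr_ids = {"attr-a", "attr-b"}
entity_ids = ["entity-{}".format(idx) for idx in range(6000)]

# Same arithmetic as above: fewer entities per query when more attributes
# are requested, so one query covers at most 5000 entity/attribute pairs.
chunk_size = 5000 // len(attr_ids)
chunk_lengths = [len(chunk) for chunk in chunked(entity_ids, chunk_size)]
```

With two attributes each query covers up to 2500 entities, so 6000 entities are split into three queries.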