7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205 | class ValidateFtrackAttributes(plugin.FtrackPublishInstancePlugin):
"""
This will validate attributes in ftrack against data in scene.
This is array (list) of checks in format:
[
[<attribute>, <operator>, <expression>]
]
Where <attribute> is name of ftrack attribute, <operator> is one of:
"is", is_not", "greater_than", "less_than", "contains", "not_contains",
"starts_with", "ends_with"
<expression> is python code that is evaluated by validator. This allows
you to fetch whatever value in scene you want, for example in Maya:
[
"fps", "is",
"from maya import mel; out = mel.eval('currentTimeUnitToFPS()')"
]
will test if ftrack fps attribute on current Task parent is same as fps
info we get from maya. Store the value you need to compare in
variable `out` in your expression.
"""
label = "Validate Custom ftrack Attributes"
order = ValidateContentsOrder
families = ["ftrack"]
optional = True
# Ignore standalone host, because it does not have an ftrack entity
# associated.
hosts = [
"blender",
"fusion",
"harmony",
"houdini",
"maya",
"nuke",
"hiero",
"photoshop",
"premiere",
"resolve",
"unreal"
]
def process(self, instance):
context = instance.context
task = context.data.get('ftrackTask', False)
if not task:
self._raise(AttributeError,
"Missing ftrack Task entity in context")
host = pyblish.api.current_host()
to_check = self.ftrack_custom_attributes.get(host, {})
if not to_check:
self.log.warning("ftrack_attributes preset not found")
return
self.log.info("getting attributes from ftrack ...")
# get parent of task
custom_attributes = {}
try:
parent = task["parent"]
custom_attributes = parent["custom_attributes"].items()
except KeyError:
self._raise(KeyError, "missing `parent` or `attributes`")
custom_attributes = dict(custom_attributes)
# get list of hierarchical attributes from ftrack
session = context.data["ftrackSession"]
custom_hier_attributes = self._get_custom_hier_attrs(session)
custom_attributes = {}
_nonhier = {}
custom_hier_attributes = {k: None for k in custom_hier_attributes}
for key, value in dict(parent["custom_attributes"]).items():
if key in custom_hier_attributes:
custom_hier_attributes[key] = value
else:
_nonhier[key] = value
custom_hier_values = self._get_hierarchical_values(
custom_hier_attributes, parent)
custom_hier_values.update(_nonhier)
errors = []
attribs = custom_hier_values
for check in to_check:
ev = {}
# WARNING(Ondrej Samohel): This is really not secure as we are
# basically executing user code. But there's no other way to make
# it flexible enough for users to get stuff from
exec(str(check[2]), {}, ev)
if not ev.get("out"):
errors.append("{} code doesn't return 'out': '{}'".format(
check[0], check[2]))
continue
if check[0] in attribs:
if check[1] == "is":
if attribs[check[0]] != ev["out"]:
errors.append("{}: {} is not {}".format(
check[0], attribs[check[0]], ev["out"]))
elif check[1] == "is_not":
if attribs[check[0]] == ev["out"]:
errors.append("{}: {} is {}".format(
check[0], attribs[check[0]], ev["out"]))
elif check[1] == "less_than":
if attribs[check[0]] < ev["out"]:
errors.append("{}: {} is greater {}".format(
check[0], attribs[check[0]], ev["out"]))
elif check[1] == "greater_than":
if attribs[check[0]] < ev["out"]:
errors.append("{}: {} is less {}".format(
check[0], attribs[check[0]], ev["out"]))
elif check[1] == "contains":
if attribs[check[0]] in ev["out"]:
errors.append("{}: {} does not contain {}".format(
check[0], attribs[check[0]], ev["out"]))
elif check[1] == "not_contains":
if attribs[check[0]] not in ev["out"]:
errors.append("{}: {} contains {}".format(
check[0], attribs[check[0]], ev["out"]))
elif check[1] == "starts_with":
if attribs[check[0]].startswith(ev["out"]):
errors.append("{}: {} does not starts with {}".format(
check[0], attribs[check[0]], ev["out"]))
elif check[1] == "ends_with":
if attribs[check[0]].endswith(ev["out"]):
errors.append("{}: {} does not end with {}".format(
check[0], attribs[check[0]], ev["out"]))
if errors:
self.log.error('There are invalid values for attributes:')
for e in errors:
self.log.error(e)
raise ValueError("ftrack attributes doesn't match")
def _get_custom_hier_attrs(self, session):
hier_custom_attributes = []
cust_attrs_query = (
"select id, entity_type, object_type_id, is_hierarchical"
" from CustomAttributeConfiguration"
)
all_attrs = session.query(cust_attrs_query).all()
for cust_attr in all_attrs:
if cust_attr["is_hierarchical"]:
hier_custom_attributes.append(cust_attr["key"])
return hier_custom_attributes
def _get_hierarchical_values(self, keys_dict, entity):
# check values already set
_set_keys = []
for key, value in keys_dict.items():
if value is not None:
_set_keys.append(key)
# pop set values from keys_dict
set_keys = {}
for key in _set_keys:
set_keys[key] = keys_dict.pop(key)
# find if entity has set values and pop them out
keys_to_pop = []
for key in keys_dict.keys():
_val = entity["custom_attributes"][key]
if _val:
keys_to_pop.append(key)
set_keys[key] = _val
for key in keys_to_pop:
keys_dict.pop(key)
# if there are not keys to find value return found
if not keys_dict:
return set_keys
# end recursion if entity is project
if entity.entity_type.lower() == "project":
for key, value in keys_dict.items():
set_keys[key] = value
else:
result = self._get_hierarchical_values(keys_dict, entity["parent"])
for key, value in result.items():
set_keys[key] = value
return set_keys
def _raise(self, exc, msg):
self.log.error(msg)
raise exc(msg)
|