import os
import json
import subprocess

import pyblish.api

from ayon_core.pipeline import publish
from ayon_core.pipeline.publish import get_instance_staging_dir

# NOTE: the import location of `is_wrap_instance` is an assumption; point
# this at the module where the helper actually lives in the Wrap addon.
from ayon_wrap.lib import is_wrap_instance


class ExtractCompute(pyblish.api.ContextPlugin):
"""Render RenderQueue locally."""
order = publish.Extractor.order - 0.47
label = "Extract Compute Nodes"
hosts = ["traypublisher"]
families = ["wrap"]
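
    # Shape of the per-instance payload this plugin consumes (keys taken
    # from the code in `process` below; values are illustrative and a real
    # payload may carry more fields):
    #
    #     instance.data["wrap"] = {
    #         "workfile_path": "C:/proj/shot010/shot010.wrap",
    #         "output_file_path": "$PROJECT_DIR/out/mesh.obj",
    #         "nodeId": 42,
    #     }
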
def process(self, context):
node_id_to_output_path = {}
output_path_to_instance = {}
for instance in context:
            is_wrap = is_wrap_instance(instance, self.log)
            if not is_wrap:
                # Skip non-wrap instances instead of aborting the whole
                # plugin; the context may contain unrelated instances.
                continue
if instance.data["family"] == "workfile":
continue
workfile_path = instance.data["wrap"]["workfile_path"]
workfile_dir = os.path.dirname(workfile_path)
output_path = self._get_output_path(instance, workfile_dir)
if output_path in output_path_to_instance:
raise RuntimeError(f"{output_path} not unique!")
output_path_to_instance[output_path] = instance
node_id = instance.data["wrap"]["nodeId"]
node_id_to_output_path[node_id] = output_path
# same folderEntity for all instances
folder_entity = instance.data["folderEntity"]
frame_start = folder_entity["attrib"]["frameStart"]
frame_end = folder_entity["attrib"]["frameEnd"]
        if not output_path_to_instance:
            self.log.debug("No wrap instances collected, nothing to compute.")
            return

        self._update_timeline(workfile_path, frame_start, frame_end)
self._update_output_dirs(workfile_path, node_id_to_output_path)
        wrap_executable_path = context.data["wrapExecutablePath"]
exit_code = self._call_compute(
workfile_path, wrap_executable_path, frame_start, frame_end)
        if exit_code != 0:
            raise RuntimeError(
                f"Compute failed for {workfile_path} "
                f"(exit code {exit_code})")
product_name_to_instance = {}
for output_path, instance in output_path_to_instance.items():
product_name = instance.data["productName"]
another_instance = product_name_to_instance.get(product_name)
_, ext = os.path.splitext(os.path.basename(output_path))
ext = ext[1:]
files = []
output_dir = os.path.dirname(output_path)
for found_file_name in os.listdir(output_dir):
if not found_file_name.endswith(ext):
continue
files.append(found_file_name)
file_path = os.path.join(output_dir, found_file_name)
instance.context.data["cleanupFullPaths"].append(file_path)
            if len(files) == 1:
                # AYON representation convention: a single file is stored
                # as a string, a sequence stays a list.
                files = files[0]
repre_data = {
"frameStart": frame_start,
"frameEnd": frame_end,
"name": ext,
"ext": ext,
"files": files,
"stagingDir": output_dir
}
if another_instance is not None:
representations = another_instance.data.get(
"representations", [])
instance = another_instance # update existing instance
else:
representations = instance.data.get("representations", [])
product_name_to_instance[product_name] = instance
representations.append(repre_data)
instance.data["representations"] = representations
instance.context.data["currentFile"] = workfile_path # TODO

    def _get_output_path(self, instance, workfile_dir):
        """Get the output path a writer node should save to.

        If the Wrap ``$PROJECT_DIR`` token is not used, a temporary
        staging folder is created instead.

        Returns:
            str: Absolute output file path for the writer node.
        """
output_file_path = instance.data["wrap"]["output_file_path"]
output_file_name = os.path.basename(output_file_path)
if "$PROJECT_DIR" in output_file_path:
staging_dir = output_file_path.replace(
"$PROJECT_DIR", workfile_dir)
else:
staging_dir = self.staging_dir(instance)
output_path = os.path.join(staging_dir, output_file_name)
return output_path
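
    # Example of the `$PROJECT_DIR` expansion above (illustrative values):
    #
    #     output_file_path = "$PROJECT_DIR/out/mesh.obj"
    #     workfile_dir     = "C:/proj/shot010"
    #     output_path      = "C:/proj/shot010/out/mesh.obj"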

    def staging_dir(self, instance):
        """Provide a temporary directory to store extracted files in.

        Calling this method also stores the directory in
        ``instance.data["stagingDir"]``.
        """
        return get_instance_staging_dir(instance)

    def _update_timeline(self, workfile_path, frame_start, frame_end):
        """Set the workfile timeline to the requested frame range.

        Frame_start and frame_end must lie inside the timeline values,
        so min, max and current are rewritten to match.
        """
with open(workfile_path, "r") as fp:
content = json.load(fp)
content["timeline"]["min"] = frame_start
content["timeline"]["current"] = frame_start
content["timeline"]["max"] = frame_end
with open(workfile_path, "w") as fp:
json.dump(content, fp, indent=4)
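
    # Minimal sketch of the workfile JSON this method touches (structure
    # assumed from the keys used above; real Wrap projects contain much
    # more data):
    #
    #     {
    #         "timeline": {"min": 1001, "current": 1001, "max": 1050},
    #         "nodes": {...}
    #     }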

    def _update_output_dirs(self, workfile_path, node_id_to_output_path):
        """Update all AYON_ writer nodes with their output paths.

        Paths may point next to the workfile or into temporary folders.

        TODO: revisit the cost of parsing and rewriting the JSON in two
            separate passes; it is expected to be negligible.
        """
with open(workfile_path, "r") as fp:
content = json.load(fp)
for node in content["nodes"].values():
node_id = node["nodeId"]
            output_path = node_id_to_output_path.get(node_id)
            if not output_path:
                continue
            node["params"]["fileName"]["value"] = output_path
with open(workfile_path, "w") as fp:
json.dump(content, fp, indent=4)
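
    # Sketch of a writer node entry as addressed above (node name is
    # hypothetical; only the keys `nodeId` and `params.fileName.value`
    # are taken from the code, and only the latter is rewritten):
    #
    #     "nodes": {
    #         "AYON_writeGeom1": {
    #             "nodeId": 42,
    #             "params": {"fileName": {"value": "C:/.../mesh.obj"}}
    #         }
    #     }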

    def _call_compute(
            self, workfile_path, wrap_executable_path, frame_start,
            frame_end):
        """Trigger compute on the workfile via Wrap's command-line binary."""
        # Headless compute is exposed through the WrapCmd binary, not the
        # GUI executable.
        wrap_executable_path = wrap_executable_path.replace(
            "Wrap.", "WrapCmd.")
subprocess_args = [wrap_executable_path, "compute", workfile_path,
"-s", str(frame_start),
"-e", str(frame_end)]
self.log.debug(f"args::{subprocess_args}")
exit_code = subprocess.call(subprocess_args,
cwd=os.path.dirname(workfile_path))
return exit_code
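
    # The resulting command line looks like this (paths and frame range
    # illustrative):
    #
    #     WrapCmd.exe compute C:/proj/shot010/shot010.wrap -s 1001 -e 1050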