Skip to content

_pinning_file_generation_funcs

generate_pinning_file(entry_usd, root_info, pinning_file_path)

Generate a AYON USD Resolver pinning file.

The pinning file can be used to pin paths in USD to specific filepaths, avoiding the need for dynamic resolving and runtime and allowing to pin dynamic URIs (like 'get me latest version') to be pinned to the version at the time of the generation.

Parameters:

Name Type Description Default
entry_usd str

The USD filepath to generate the pinning file for.

required
root_info Dict[str, str]

The project roots for the site the pinning should resolve to. These can be obtained via e.g. the AYON REST API get `/api/projects/{project_name}/siteRoots".

required
pinning_file_path str

The destination path to write the pinning file to.

required
Source code in client/ayon_usd/standalone/usd/pinning/_pinning_file_generation_funcs.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
def generate_pinning_file(
    entry_usd: str, root_info: Dict[str, str], pinning_file_path: str
):
    """Generate a AYON USD Resolver pinning file.

    The pinning file can be used to pin paths in USD to specific filepaths,
    avoiding the need for dynamic resolving and runtime and allowing to pin
    dynamic URIs (like 'get me latest version') to be pinned to the version
    at the time of the generation.

    Arguments:
        entry_usd: The USD filepath to generate the pinning file for.
        root_info: The project roots for the site the pinning should resolve
          to. These can be obtained via e.g. the AYON REST API get
          `/api/projects/{project_name}/siteRoots".
        pinning_file_path: The destination path to write the pinning file to.

    """

    if not pinning_file_path.endswith(".json"):
        raise RuntimeError(
            f"Pinning file path is not a json file {pinning_file_path}")

    # Assume that the environment sets up the correct default AyonUsdResolver
    resolver = Ar.GetResolver()
    pinning_data = get_asset_dependencies(entry_usd, resolver)

    # on Windows, we need to make the drive letter lower case.
    if sys.platform.startswith('win'):
        pinning_data = {
            _normalize_path(key): _normalize_path(val)
            for key, val in pinning_data.items()
        }

    rootless_pinning_data = remove_root_from_dependency_info(
        pinning_data, root_info
    )

    rootless_pinning_data["ayon_pinning_data_entry_scene"] = _remove_sdf_args(
        entry_usd
    )

    _write_pinning_file(
        pinning_file_path,
        rootless_pinning_data,
    )

get_asset_dependencies(layer_path, resolver, processed_layers=None)

Return mapping from all used asset identifiers to the resolved filepaths.

Recursively traverse Sdf.Layer and get all their asset dependencies. For each asset identifier map it to its resolved filepath.

Parameters:

Name Type Description Default
layer_path str

Usd layer path to be taken as the root layer

required

Returns: Mapping from asset identifier to their resolved paths

Source code in client/ayon_usd/standalone/usd/pinning/_pinning_file_generation_funcs.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def get_asset_dependencies(
    layer_path: str,
    resolver: Ar.Resolver,
    processed_layers: Optional[Set[str]] = None
) -> Dict[str, str]:
    """Return mapping from all used asset identifiers to the resolved filepaths.

    Recursively traverse `Sdf.Layer` and get all their asset dependencies.
    For each asset identifier map it to its resolved filepath.

    Args:
        layer_path: Usd layer path to be taken as the root layer

    Returns: Mapping from asset identifier to their resolved paths

    """
    layer_path = _remove_sdf_args(layer_path)
    if not layer_path:
        return {}
    if isinstance(layer_path, Ar.ResolvedPath):
        layer_path = layer_path.GetPathString()

    identifier_to_path: Dict[str, str] = {}

    resolved_layer_path: Ar.ResolvedPath = resolver.Resolve(layer_path)

    if not processed_layers:
        processed_layers = {resolved_layer_path.GetPathString()}

    elif resolved_layer_path.GetPathString() in processed_layers:
        return {}

    else:
        processed_layers.add(resolved_layer_path.GetPathString())

    layer: Sdf.Layer = Sdf.Layer.FindOrOpen(resolved_layer_path)
    if not layer:
        log.warning(f"Unable to open layer: {resolved_layer_path}")
        return {}

    identifier_to_path[layer_path] = resolved_layer_path.GetPathString()
    prim_spec_file_paths: List[str] = _get_prim_spec_hierarchy_external_refs(
        layer.pseudoRoot, layer
    )
    for identifier in prim_spec_file_paths:
        identifier = _remove_sdf_args(identifier)
        resolved_path = resolver.Resolve(layer.ComputeAbsolutePath(identifier))
        resolved_path_str = resolved_path.GetPathString()
        identifier_to_path[layer.ComputeAbsolutePath(identifier)] = resolved_path_str

        if "<UDIM>" in resolved_path_str:
            # Include all tiles/paths of the UDIM
            udim_data = _resolve_udim(identifier, layer)
            identifier_to_path.update(udim_data)

    asset_identifier_list: List[str] = layer.GetCompositionAssetDependencies()

    for ref in asset_identifier_list:
        resolved_path = resolver.Resolve(layer.ComputeAbsolutePath(ref))
        ref = _remove_sdf_args(ref)
        resolved_path_str = resolved_path.GetPathString()
        if is_uri(ref):
            search_path_string = ref
        else:
            search_path_string = resolved_path_str

        identifier_to_path[search_path_string] = resolved_path_str

        recursive_result = get_asset_dependencies(
            search_path_string,
            resolver,
            processed_layers,
        )
        identifier_to_path.update(recursive_result)

    return identifier_to_path

remove_root_from_dependency_info(dependency_info, root_info)

Removes the Ayon Machine Root from a given Dependency info Dict

Parameters:

Name Type Description Default
dependency_info Dict[str, str]

Dict generated by get_asset_dependencies()

required
root_info Dict[str, str]

A flat dict containing root identifier mapping to the

required

Returns:

Type Description
Dict[str, str]

a dependency_info dict that holds key: Usd assetIdentifiers val:

Dict[str, str]

rootless paths as they would be returned from Ayon server /resolve

Dict[str, str]

endpoint ('Resolve Uris' in the server Docs).

Source code in client/ayon_usd/standalone/usd/pinning/_pinning_file_generation_funcs.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def remove_root_from_dependency_info(
    dependency_info: Dict[str, str], root_info: Dict[str, str]
) -> Dict[str, str]:
    """Removes the Ayon Machine Root from a given Dependency info Dict

    Args:
        dependency_info: Dict generated by `get_asset_dependencies()`
        root_info: A flat dict containing root identifier mapping to the
        associated root path /path/to/root. This can be obtained from Ayon
        server `Get Project Roots Overrides`

    Returns:
         a dependency_info dict that holds key: Usd assetIdentifiers val:
         rootless paths as they would be returned from Ayon server `/resolve`
         endpoint ('Resolve Uris' in the server Docs).
    """

    if not root_info or not dependency_info:
        raise ValueError(
            f"both root_info and dependency_info need to be present "
            f"and an instance of Dict (root_info: {root_info}, "
            f"dependency_info: {dependency_info})"
        )

    replacements = {path: replacer for replacer, path in root_info.items()}
    pattern = "|".join(f"({re.escape(pat)})" for pat in replacements)
    regx = re.compile(pattern)

    # TODO test if there are cases where we have more than one match.group
    def _replace_match(match: re.Match):
        match_grp_zero = match.group(0)
        match_replacment = replacements.get(re.escape(match_grp_zero))
        if not match_replacment:
            return match_grp_zero

        replacment = "{root[" + match_replacment + "]}"
        return replacment

    rootless_dependency_info = {}
    for key, path in dependency_info.items():
        new_path = regx.sub(_replace_match, path)
        new_key = regx.sub(_replace_match, key)

        rootless_dependency_info[new_key] = new_path

    return rootless_dependency_info