15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117 | @pytest.mark.skipif(helpers.IS_GITHUB_ACTIONS, reason="WIP make it run on GitHub actions.")
@pytest.mark.parametrize("empty_project", [{"task_types": ("rendering", "edit")}], indirect=True)
def test_match_hierarchy_update(empty_project, mockgun_project): # noqa: F811
""" Ensure new Flow entities are updated from an AYON hierarchy.
"""
ay_project_data = empty_project
mg, _ = mockgun_project
hub, sg_project = helpers.setup_sg_project_and_hub(ay_project_data, mg)
entity_hub = hub.entity_hub
# Add "final" status
data = entity_hub.project_entity.statuses.to_data()
data.append({"name": "Final", "shortName": "fin"})
data.append({"name": "In Progress", "shortName": "ip"})
entity_hub.project_entity.set_statuses(data)
# An asset that exist in SG but needs update.
ay_asset = entity_hub.add_new_folder(
folder_type="Asset",
name="my_asset",
label="my_asset",
parent_id=entity_hub.project_entity.id,
status="Final",
attribs={
constants.SHOTGRID_ID_ATTRIB: "1",
constants.SHOTGRID_TYPE_ATTRIB: "Asset"
}
)
edit_task = entity_hub.add_new_task(
task_type="edit",
name="my_edit_task",
label="my_edit_task",
parent_id=ay_asset.id,
status="Final",
attribs={
constants.SHOTGRID_ID_ATTRIB: "1",
constants.SHOTGRID_TYPE_ATTRIB: "Task"
}
)
sg_asset = mg.create(
"Asset",
{
"code": "my_asset",
"sg_ayon_id": ay_asset.id,
"sg_status_list": "wtg",
"project": sg_project,
}
)
mg.create(
"Task",
{
'content': 'my_edit_task',
'entity': sg_asset,
'project': sg_project,
'sg_ayon_id': edit_task.id,
"sg_status_list": "wtg",
}
)
# A sequence that does not exist in SG but needs update.
ay_sequence = entity_hub.add_new_folder(
folder_type="Sequence",
name="my_sequence",
label="my_sequence",
status="Final",
parent_id=entity_hub.project_entity.id,
)
entity_hub.commit_changes()
# Launch hierarchy sync
with (
mock.patch.object(validate, "get_sg_project_enabled_entities", return_value=helpers.ENABLED_ENTITIES.items()),
mock.patch.object(utils, "get_sg_project_enabled_entities", return_value=helpers.ENABLED_ENTITIES.items()),
):
hub.synchronize_projects()
sg_asset = mg.find_one(
"Asset",
[["id", "is", 1]],
["sg_status_list"]
)
sg_sequence = mg.find_one(
"Sequence",
[["id", "is", 1]],
["sg_status_list", "sg_ayon_id"],
)
sg_task = mg.find_one(
"Task",
[["id", "is", 1]],
["sg_status_list"]
)
# Ensure asset status got updated.
assert sg_asset["sg_status_list"] == "fin"
# Ensure sequence got created and updated.
assert sg_sequence["sg_ayon_id"] == ay_sequence.id
assert sg_sequence["sg_status_list"] == "fin"
# Ensure task status got updated.
assert sg_task["sg_status_list"] == "fin"
|