Skip to content

server

SiteSync

Bases: BaseServerAddon

Source code in server/__init__.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
class SiteSync(BaseServerAddon):
    settings_model: Type[SiteSyncSettings] = SiteSyncSettings

    frontend_scopes: dict[str, Any] = {"project": {}}

    def initialize(self) -> None:

        self.add_endpoint(
            "/{project_name}/get_user_sites",
            self.get_user_sites,
            method="GET",
        )

        self.add_endpoint(
            "/{project_name}/params",
            self.get_site_sync_params,
            method="GET",
        )

        self.add_endpoint(
            "/{project_name}/state",
            self.get_site_sync_state,
            method="GET",
        )

        self.add_endpoint(
            "/{project_name}/state/{representation_id}/{site_name}",  # noqa
            self.set_site_sync_representation_state,
            method="POST",
        )

        self.add_endpoint(
            "/{project_name}/state/{representation_id}/{site_name}",  # noqa
            self.remove_site_sync_representation_state,
            method="DELETE",
        )

    #
    # GET SITE SYNC PARAMS
    #

    async def get_site_sync_params(
        self,
        project_name: ProjectName,
        user: CurrentUser,
    ) -> SiteSyncParamsModel:

        access_list = await folder_access_list(user, project_name, "read")
        conditions = []
        if access_list is not None:
            conditions.append(f"h.path like ANY ('{{ {','.join(access_list)} }}')")

        query = f"""
            SELECT
                DISTINCT(r.name) as name,
                COUNT (*) OVER () as total_count
            FROM project_{project_name}.representations as r
            INNER JOIN project_{project_name}.versions as v
                ON r.version_id = v.id
            INNER JOIN project_{project_name}.products as p
                ON v.product_id = p.id
            INNER JOIN project_{project_name}.hierarchy as h
                ON p.folder_id = h.id
            {SQLTool.conditions(conditions)}
        """

        total_count = 0
        names = []
        async for row in Postgres.iterate(query):
            total_count = row["total_count"] or 0
            names.append(row["name"])

        return SiteSyncParamsModel(count=total_count, names=names)

    #
    # GET USER SYNC SITES
    #

    async def get_user_sites(
        self,
        project_name: ProjectName,
        user: CurrentUser,
    ) -> dict[str, list[str]]:
        sites = {"active_site": [], "remote_site": []}
        site_infos = await Postgres.fetch("select id, data from sites")
        for site_info in site_infos:
            settings = await self.get_project_site_settings(
                project_name, user.name, site_info["id"]
            )
            for site_type in ["active_site", "remote_site"]:
                used_site = settings.dict()["local_setting"][site_type]
                if not used_site:
                    continue

                if used_site == "local":
                    sites[site_type].append(site_info["id"])
                else:
                    sites[site_type].append(used_site)
        return sites


    #
    # GET SITE SYNC OVERAL STATE
    #
    async def get_site_sync_state(
        self,
        project_name: ProjectName,
        user: CurrentUser,
        representationIds: list[str] | None = Query(
            None,
            description="Filter by representation ids",
            example="['57cf375c749611ed89de0242ac140004']",
        ),
        repreNameFilter: list[str] | None = Query(None,
            description="Filter by representation name"),
        localSite: str = Query(
            ...,
            description="Name of the local site",
            example="Machine42",
        ),
        remoteSite: str = Query(
            ...,
            description="Name of the remote site",
            example="GDrive",
        ),
        folderFilter: str | None = Query(
            None,
            description="Filter folders by name",
            example="sh042",
        ),
        folderIdsFilter: list[str] | None = Query(
            None,
            description="Filter folders by id, eg filtering by asset ids",
            example="['57cf375c749611ed89de0242ac140004']",
        ),
        productFilter: str | None = Query(
            None,
            description="Filter products by name",
            example="animation",
        ),
        versionFilter: int | None = Query(
            None,
            description="Filter products by version",
            example="1",
        ),
        versionIdsFilter: list[str] | None = Query(
            None,
            description="Filter versions by ids",
            example="['57cf375c749611ed89de0242ac140004']",
        ),
        localStatusFilter: list[StatusEnum] | None = Query(
            None,
            description=f"List of states to show. Available options: {StatusEnum.__doc__}",
            example=[StatusEnum.QUEUED, StatusEnum.IN_PROGRESS],
        ),
        remoteStatusFilter: list[StatusEnum] | None = Query(
            None,
            description=f"List of states to show. Available options: {StatusEnum.__doc__}",
            example=[StatusEnum.QUEUED, StatusEnum.IN_PROGRESS],
        ),
        sortBy: SortByEnum = Query(
            SortByEnum.folder,
            description="Sort the result by this value",
            example=SortByEnum.folder,
        ),
        sortDesc: bool = Query(
            False,
            name="Sort descending",
            description="Sort the result in descending order",
        ),
        bothOnly: bool = Query(
            False,
            name="Query only with both sites",
            description="Used for front end UI to show only repres with"
                        " both sides",
        ),
        # Pagination
        page: int = Query(1, ge=1),
        pageLength: int = Query(50, ge=1),
    ) -> SiteSyncSummaryModel:
        """Return a site sync state.

        Used for querying representations to be synchronized and state of
        versions and representations to show in Loader UI.
        """
        await check_sync_status_table(project_name)
        conditions = []

        if representationIds is not None:
            conditions.append(f"r.id IN {SQLTool.array(representationIds)}")

        if folderFilter:
            conditions.append(f"f.name ILIKE '%{folderFilter}%'")

        if folderIdsFilter:
            conditions.append(f"f.id IN {SQLTool.array(folderIdsFilter)}")

        if productFilter:
            conditions.append(f"p.name ILIKE '%{productFilter}%'")

        if versionFilter:
            conditions.append(f"v.version = {versionFilter}")

        if versionIdsFilter:
            conditions.append(f"v.id IN {SQLTool.array(versionIdsFilter)}")

        if localStatusFilter:
            statusFilter = [str(s.value) for s in localStatusFilter]
            conditions.append(f"local.status IN ({','.join(statusFilter)})")

        if remoteStatusFilter:
            statusFilter = [str(s.value) for s in remoteStatusFilter]
            conditions.append(f"remote.status IN ({','.join(statusFilter)})")

        if repreNameFilter:
            conditions.append(f"r.name IN {SQLTool.array(repreNameFilter)}")

        access_list = await folder_access_list(user, project_name, "read")
        if access_list is not None:
            conditions.append(f"path like ANY ('{{ {','.join(access_list)} }}')")

        sites_join = "LEFT"
        if bothOnly:
            sites_join = "INNER"

        query = f"""
            SELECT
                f.name as folder,
                p.name as product,
                v.version as version,
                r.name as representation,
                h.path as path,

                r.id as representation_id,
                r.files as representation_files,
                local.data as local_data,
                remote.data as remote_data,
                local.status as localStatus,
                remote.status as remoteStatus,
                v.id as version_id
            FROM
                project_{project_name}.folders as f
            INNER JOIN
                project_{project_name}.products as p
                ON p.folder_id = f.id
            INNER JOIN
                project_{project_name}.versions as v
                ON v.product_id = p.id
            INNER JOIN
                project_{project_name}.representations as r
                ON r.version_id = v.id
            INNER JOIN
                project_{project_name}.hierarchy as h
                ON f.id = h.id
            {sites_join} JOIN
                project_{project_name}.sitesync_files_status as local
                ON local.representation_id = r.id
                AND local.site_name = '{localSite}'
            {sites_join} JOIN
                project_{project_name}.sitesync_files_status as remote
                ON remote.representation_id = r.id
                AND remote.site_name = '{remoteSite}'

            {SQLTool.conditions(conditions)}

            ORDER BY {sortBy.value} {'DESC' if sortDesc else 'ASC'}
            LIMIT {pageLength}
            OFFSET { (page-1) * pageLength }
        """
        repres = []

        async for row in Postgres.iterate(query):
            files = row["representation_files"]
            file_count = len(files)
            total_size = sum([f.get("size") for f in files])

            ldata = row["local_data"] or {}
            lfiles = ldata.get("files", {})
            lsize = sum([f.get("size") for f in lfiles.values()] or [0])
            ltime = max([f.get("timestamp") for f in lfiles.values()] or [0])

            rdata = row["remote_data"] or {}
            rfiles = rdata.get("files", {})
            rsize = sum([f.get("size") for f in rfiles.values()] or [0])
            rtime = max([f.get("timestamp") for f in rfiles.values()] or [0])

            local_status = SyncStatusModel(
                status=StatusEnum.NOT_AVAILABLE
                if row["localstatus"] is None
                else row["localstatus"],
                totalSize=total_size,
                size=lsize,
                timestamp=ltime,
            )
            remote_status = SyncStatusModel(
                status=StatusEnum.NOT_AVAILABLE
                if row["remotestatus"] is None
                else row["remotestatus"],
                totalSize=total_size,
                size=rsize,
                timestamp=rtime,
            )

            file_list = []
            for file_info in files:
                file_id = file_info["id"]
                local_file = lfiles.get(file_id, {})
                remote_file = rfiles.get(file_id, {})

                file_list.append(
                    FileModel(
                        id=file_id,
                        fileHash=file_info["hash"],
                        size=file_info["size"],
                        path=file_info["path"],
                        baseName=os.path.split(file_info["path"])[1],
                        localStatus=SyncStatusModel(
                            status=local_file.get("status",
                                                StatusEnum.NOT_AVAILABLE),
                            size=local_file.get("size", 0),
                            totalSize=file_info["size"],
                            timestamp=local_file.get("timestamp", 0),
                            message=local_file.get("message", None),
                            retries=local_file.get("retries", 0),
                        ),
                        remoteStatus=SyncStatusModel(
                            status=remote_file.get("status",
                                                StatusEnum.NOT_AVAILABLE),
                            size=remote_file.get("size", 0),
                            totalSize=file_info["size"],
                            timestamp=remote_file.get("timestamp", 0),
                            message=remote_file.get("message", None),
                            retries=remote_file.get("retries", 0),
                        ),
                    )
                )

            repres.append(
                SiteSyncSummaryItem.construct(
                    folder=row["folder"],
                    product=row["product"],
                    version=row["version"],
                    representation=row["representation"],
                    representationId=row["representation_id"],
                    fileCount=file_count,
                    size=total_size,
                    localStatus=local_status,
                    remoteStatus=remote_status,
                    files=file_list,
                    version_id=row["version_id"]
                )
            )

        return SiteSyncSummaryModel(representations=repres)


    #
    # SET REPRESENTATION SYNC STATE
    #

    async def set_site_sync_representation_state(
        self,
        post_data: RepresentationStateModel,
        project_name: ProjectName,
        representation_id: RepresentationID,
        site_name: str = Path(...),  # TODO: add regex validator/dependency here! Important!

        # TODO: add CurrentUser dependency here! This endpoint is public now!
    ) -> Response:
        """Adds site information to representation.

        Called after integration to set initial state of representation files on
        sites.
        Called repeatedly during synchronization to update progress/store error
        message
        """
        await check_sync_status_table(project_name)

        priority = post_data.priority

        async with Postgres.acquire() as conn:
            async with conn.transaction():
                query = (
                    f"""
                    SELECT priority, data
                    FROM project_{project_name}.sitesync_files_status
                    WHERE representation_id = $1 AND site_name = $2
                    FOR UPDATE
                    """,
                    representation_id,
                    site_name,
                )

                result = await conn.fetch(*query)
                do_insert = False
                if not result:
                    do_insert = True
                    repre = await RepresentationEntity.load(
                        project_name, representation_id, transaction=conn
                    )

                    files = {}
                    for file_info in repre._payload.files:
                        fhash = file_info.hash
                        files[file_info.id] = {
                            "hash": fhash,
                            "status": StatusEnum.NOT_AVAILABLE,
                            "size": 0,
                            "timestamp": 0,
                        }
                else:
                    files = result[0]["data"].get("files")
                    if priority is None:
                        priority = result[0]["priority"]

                for posted_file in post_data.files:
                    posted_file_id = posted_file.id
                    if posted_file_id not in files:
                        logging.warning(f"{posted_file} not in files")
                        continue
                    files[posted_file_id]["timestamp"] = posted_file.timestamp
                    files[posted_file_id]["status"] = posted_file.status
                    files[posted_file_id]["size"] = posted_file.size

                    if posted_file.message:
                        files[posted_file_id]["message"] = posted_file.message
                    elif "message" in files[posted_file_id]:
                        del files[posted_file_id]["message"]

                    if posted_file.retries:
                        files[posted_file_id]["retries"] = posted_file.retries
                    elif "retries" in files[posted_file_id]:
                        del files[posted_file_id]["retries"]

                status = get_overal_status(files)

                if do_insert:
                    await conn.execute(
                        f"""
                        INSERT INTO project_{project_name}.sitesync_files_status
                        (representation_id, site_name, status, priority, data)
                        VALUES ($1, $2, $3, $4, $5)
                        """,
                        representation_id,
                        site_name,
                        status,
                        post_data.priority if post_data.priority is not None else 50,
                        {"files": files},
                    )
                else:
                    await conn.execute(
                        f"""
                        UPDATE project_{project_name}.sitesync_files_status
                        SET status = $1, data = $2, priority = $3
                        WHERE representation_id = $4 AND site_name = $5
                        """,
                        status,
                        {"files": files},
                        priority,
                        representation_id,
                        site_name,
                    )

        return Response(status_code=204)

    async def remove_site_sync_representation_state(
        self,
        project_name: ProjectName,
        user: CurrentUser,
        representation_id: RepresentationID,
        site_name: str = Path(...),  # TODO: add regex validator/dependency here! Important!
    ) -> Response:
        await check_sync_status_table(project_name)

        async with Postgres.acquire() as conn:
            async with conn.transaction():
                query = (
                    f"""
                    DELETE
                    FROM project_{project_name}.sitesync_files_status
                    WHERE representation_id = $1 AND site_name = $2
                    """,
                    representation_id,
                    site_name,
                )

                await conn.fetch(*query)

                return Response(status_code=204)

get_site_sync_state(project_name, user, representationIds=Query(None, description='Filter by representation ids', example="['57cf375c749611ed89de0242ac140004']"), repreNameFilter=Query(None, description='Filter by representation name'), localSite=Query(..., description='Name of the local site', example='Machine42'), remoteSite=Query(..., description='Name of the remote site', example='GDrive'), folderFilter=Query(None, description='Filter folders by name', example='sh042'), folderIdsFilter=Query(None, description='Filter folders by id, eg filtering by asset ids', example="['57cf375c749611ed89de0242ac140004']"), productFilter=Query(None, description='Filter products by name', example='animation'), versionFilter=Query(None, description='Filter products by version', example='1'), versionIdsFilter=Query(None, description='Filter versions by ids', example="['57cf375c749611ed89de0242ac140004']"), localStatusFilter=Query(None, description=f'List of states to show. Available options: {StatusEnum.__doc__}', example=[StatusEnum.QUEUED, StatusEnum.IN_PROGRESS]), remoteStatusFilter=Query(None, description=f'List of states to show. Available options: {StatusEnum.__doc__}', example=[StatusEnum.QUEUED, StatusEnum.IN_PROGRESS]), sortBy=Query(SortByEnum.folder, description='Sort the result by this value', example=SortByEnum.folder), sortDesc=Query(False, name='Sort descending', description='Sort the result in descending order'), bothOnly=Query(False, name='Query only with both sites', description='Used for front end UI to show only repres with both sides'), page=Query(1, ge=1), pageLength=Query(50, ge=1)) async

Return a site sync state.

Used for querying representations to be synchronized and state of versions and representations to show in Loader UI.

Source code in server/__init__.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
async def get_site_sync_state(
    self,
    project_name: ProjectName,
    user: CurrentUser,
    representationIds: list[str] | None = Query(
        None,
        description="Filter by representation ids",
        example="['57cf375c749611ed89de0242ac140004']",
    ),
    repreNameFilter: list[str] | None = Query(None,
        description="Filter by representation name"),
    localSite: str = Query(
        ...,
        description="Name of the local site",
        example="Machine42",
    ),
    remoteSite: str = Query(
        ...,
        description="Name of the remote site",
        example="GDrive",
    ),
    folderFilter: str | None = Query(
        None,
        description="Filter folders by name",
        example="sh042",
    ),
    folderIdsFilter: list[str] | None = Query(
        None,
        description="Filter folders by id, eg filtering by asset ids",
        example="['57cf375c749611ed89de0242ac140004']",
    ),
    productFilter: str | None = Query(
        None,
        description="Filter products by name",
        example="animation",
    ),
    versionFilter: int | None = Query(
        None,
        description="Filter products by version",
        example="1",
    ),
    versionIdsFilter: list[str] | None = Query(
        None,
        description="Filter versions by ids",
        example="['57cf375c749611ed89de0242ac140004']",
    ),
    localStatusFilter: list[StatusEnum] | None = Query(
        None,
        description=f"List of states to show. Available options: {StatusEnum.__doc__}",
        example=[StatusEnum.QUEUED, StatusEnum.IN_PROGRESS],
    ),
    remoteStatusFilter: list[StatusEnum] | None = Query(
        None,
        description=f"List of states to show. Available options: {StatusEnum.__doc__}",
        example=[StatusEnum.QUEUED, StatusEnum.IN_PROGRESS],
    ),
    sortBy: SortByEnum = Query(
        SortByEnum.folder,
        description="Sort the result by this value",
        example=SortByEnum.folder,
    ),
    sortDesc: bool = Query(
        False,
        name="Sort descending",
        description="Sort the result in descending order",
    ),
    bothOnly: bool = Query(
        False,
        name="Query only with both sites",
        description="Used for front end UI to show only repres with"
                    " both sides",
    ),
    # Pagination
    page: int = Query(1, ge=1),
    pageLength: int = Query(50, ge=1),
) -> SiteSyncSummaryModel:
    """Return a site sync state.

    Used for querying representations to be synchronized and state of
    versions and representations to show in Loader UI.
    """
    await check_sync_status_table(project_name)
    conditions = []

    if representationIds is not None:
        conditions.append(f"r.id IN {SQLTool.array(representationIds)}")

    if folderFilter:
        conditions.append(f"f.name ILIKE '%{folderFilter}%'")

    if folderIdsFilter:
        conditions.append(f"f.id IN {SQLTool.array(folderIdsFilter)}")

    if productFilter:
        conditions.append(f"p.name ILIKE '%{productFilter}%'")

    if versionFilter:
        conditions.append(f"v.version = {versionFilter}")

    if versionIdsFilter:
        conditions.append(f"v.id IN {SQLTool.array(versionIdsFilter)}")

    if localStatusFilter:
        statusFilter = [str(s.value) for s in localStatusFilter]
        conditions.append(f"local.status IN ({','.join(statusFilter)})")

    if remoteStatusFilter:
        statusFilter = [str(s.value) for s in remoteStatusFilter]
        conditions.append(f"remote.status IN ({','.join(statusFilter)})")

    if repreNameFilter:
        conditions.append(f"r.name IN {SQLTool.array(repreNameFilter)}")

    access_list = await folder_access_list(user, project_name, "read")
    if access_list is not None:
        conditions.append(f"path like ANY ('{{ {','.join(access_list)} }}')")

    sites_join = "LEFT"
    if bothOnly:
        sites_join = "INNER"

    query = f"""
        SELECT
            f.name as folder,
            p.name as product,
            v.version as version,
            r.name as representation,
            h.path as path,

            r.id as representation_id,
            r.files as representation_files,
            local.data as local_data,
            remote.data as remote_data,
            local.status as localStatus,
            remote.status as remoteStatus,
            v.id as version_id
        FROM
            project_{project_name}.folders as f
        INNER JOIN
            project_{project_name}.products as p
            ON p.folder_id = f.id
        INNER JOIN
            project_{project_name}.versions as v
            ON v.product_id = p.id
        INNER JOIN
            project_{project_name}.representations as r
            ON r.version_id = v.id
        INNER JOIN
            project_{project_name}.hierarchy as h
            ON f.id = h.id
        {sites_join} JOIN
            project_{project_name}.sitesync_files_status as local
            ON local.representation_id = r.id
            AND local.site_name = '{localSite}'
        {sites_join} JOIN
            project_{project_name}.sitesync_files_status as remote
            ON remote.representation_id = r.id
            AND remote.site_name = '{remoteSite}'

        {SQLTool.conditions(conditions)}

        ORDER BY {sortBy.value} {'DESC' if sortDesc else 'ASC'}
        LIMIT {pageLength}
        OFFSET { (page-1) * pageLength }
    """
    repres = []

    async for row in Postgres.iterate(query):
        files = row["representation_files"]
        file_count = len(files)
        total_size = sum([f.get("size") for f in files])

        ldata = row["local_data"] or {}
        lfiles = ldata.get("files", {})
        lsize = sum([f.get("size") for f in lfiles.values()] or [0])
        ltime = max([f.get("timestamp") for f in lfiles.values()] or [0])

        rdata = row["remote_data"] or {}
        rfiles = rdata.get("files", {})
        rsize = sum([f.get("size") for f in rfiles.values()] or [0])
        rtime = max([f.get("timestamp") for f in rfiles.values()] or [0])

        local_status = SyncStatusModel(
            status=StatusEnum.NOT_AVAILABLE
            if row["localstatus"] is None
            else row["localstatus"],
            totalSize=total_size,
            size=lsize,
            timestamp=ltime,
        )
        remote_status = SyncStatusModel(
            status=StatusEnum.NOT_AVAILABLE
            if row["remotestatus"] is None
            else row["remotestatus"],
            totalSize=total_size,
            size=rsize,
            timestamp=rtime,
        )

        file_list = []
        for file_info in files:
            file_id = file_info["id"]
            local_file = lfiles.get(file_id, {})
            remote_file = rfiles.get(file_id, {})

            file_list.append(
                FileModel(
                    id=file_id,
                    fileHash=file_info["hash"],
                    size=file_info["size"],
                    path=file_info["path"],
                    baseName=os.path.split(file_info["path"])[1],
                    localStatus=SyncStatusModel(
                        status=local_file.get("status",
                                            StatusEnum.NOT_AVAILABLE),
                        size=local_file.get("size", 0),
                        totalSize=file_info["size"],
                        timestamp=local_file.get("timestamp", 0),
                        message=local_file.get("message", None),
                        retries=local_file.get("retries", 0),
                    ),
                    remoteStatus=SyncStatusModel(
                        status=remote_file.get("status",
                                            StatusEnum.NOT_AVAILABLE),
                        size=remote_file.get("size", 0),
                        totalSize=file_info["size"],
                        timestamp=remote_file.get("timestamp", 0),
                        message=remote_file.get("message", None),
                        retries=remote_file.get("retries", 0),
                    ),
                )
            )

        repres.append(
            SiteSyncSummaryItem.construct(
                folder=row["folder"],
                product=row["product"],
                version=row["version"],
                representation=row["representation"],
                representationId=row["representation_id"],
                fileCount=file_count,
                size=total_size,
                localStatus=local_status,
                remoteStatus=remote_status,
                files=file_list,
                version_id=row["version_id"]
            )
        )

    return SiteSyncSummaryModel(representations=repres)

set_site_sync_representation_state(post_data, project_name, representation_id, site_name=Path(...)) async

Adds site information to representation.

Called after integration to set initial state of representation files on sites. Called repeatedly during synchronization to update progress/store error message

Source code in server/__init__.py
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
async def set_site_sync_representation_state(
    self,
    post_data: RepresentationStateModel,
    project_name: ProjectName,
    representation_id: RepresentationID,
    site_name: str = Path(...),  # TODO: add regex validator/dependency here! Important!

    # TODO: add CurrentUser dependency here! This endpoint is public now!
) -> Response:
    """Adds site information to representation.

    Called after integration to set initial state of representation files on
    sites.
    Called repeatedly during synchronization to update progress/store error
    message
    """
    await check_sync_status_table(project_name)

    priority = post_data.priority

    async with Postgres.acquire() as conn:
        async with conn.transaction():
            query = (
                f"""
                SELECT priority, data
                FROM project_{project_name}.sitesync_files_status
                WHERE representation_id = $1 AND site_name = $2
                FOR UPDATE
                """,
                representation_id,
                site_name,
            )

            result = await conn.fetch(*query)
            do_insert = False
            if not result:
                do_insert = True
                repre = await RepresentationEntity.load(
                    project_name, representation_id, transaction=conn
                )

                files = {}
                for file_info in repre._payload.files:
                    fhash = file_info.hash
                    files[file_info.id] = {
                        "hash": fhash,
                        "status": StatusEnum.NOT_AVAILABLE,
                        "size": 0,
                        "timestamp": 0,
                    }
            else:
                files = result[0]["data"].get("files")
                if priority is None:
                    priority = result[0]["priority"]

            for posted_file in post_data.files:
                posted_file_id = posted_file.id
                if posted_file_id not in files:
                    logging.warning(f"{posted_file} not in files")
                    continue
                files[posted_file_id]["timestamp"] = posted_file.timestamp
                files[posted_file_id]["status"] = posted_file.status
                files[posted_file_id]["size"] = posted_file.size

                if posted_file.message:
                    files[posted_file_id]["message"] = posted_file.message
                elif "message" in files[posted_file_id]:
                    del files[posted_file_id]["message"]

                if posted_file.retries:
                    files[posted_file_id]["retries"] = posted_file.retries
                elif "retries" in files[posted_file_id]:
                    del files[posted_file_id]["retries"]

            status = get_overal_status(files)

            if do_insert:
                await conn.execute(
                    f"""
                    INSERT INTO project_{project_name}.sitesync_files_status
                    (representation_id, site_name, status, priority, data)
                    VALUES ($1, $2, $3, $4, $5)
                    """,
                    representation_id,
                    site_name,
                    status,
                    post_data.priority if post_data.priority is not None else 50,
                    {"files": files},
                )
            else:
                await conn.execute(
                    f"""
                    UPDATE project_{project_name}.sitesync_files_status
                    SET status = $1, data = $2, priority = $3
                    WHERE representation_id = $4 AND site_name = $5
                    """,
                    status,
                    {"files": files},
                    priority,
                    representation_id,
                    site_name,
                )

    return Response(status_code=204)

check_sync_status_table(project_name) async

Checks for existence of sitesync_files_status table, creates if not.

Source code in server/__init__.py
541
542
543
544
545
546
547
548
549
550
551
552
553
554
async def check_sync_status_table(project_name: str) -> None:
    """Checks for existence of `sitesync_files_status` table, creates if not."""
    await Postgres.execute(
        f"CREATE TABLE IF NOT EXISTS project_{project_name}.sitesync_files_status ("
        f"""representation_id UUID NOT NULL REFERENCES project_{project_name}.representations(id) ON DELETE CASCADE,
            site_name VARCHAR NOT NULL,
            status INTEGER NOT NULL DEFAULT -1,
            priority INTEGER NOT NULL DEFAULT 50,
            data JSONB NOT NULL DEFAULT '{{}}'::JSONB,
            PRIMARY KEY (representation_id, site_name)
        );"""
    )
    await Postgres.execute(f"CREATE INDEX IF NOT EXISTS file_status_idx ON project_{project_name}.sitesync_files_status(status);")
    await Postgres.execute(f"CREATE INDEX IF NOT EXISTS file_priority_idx ON project_{project_name}.sitesync_files_status(priority desc);")