file_transaction

DuplicateDestinationError

Bases: ValueError

Error raised when transfer destination already exists in queue.

The error is only raised if allow_queue_replacements is False on the FileTransaction instance and the file added to the transfer queue has a different src file than the one already queued for that destination.
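
A minimal sketch of how this error can surface when queuing transfers; the file paths are hypothetical and the import path follows the module location shown below.

from ayon_core.lib.file_transaction import (
    DuplicateDestinationError,
    FileTransaction,
)

transaction = FileTransaction(allow_queue_replacements=False)
transaction.add("/renders/a.exr", "/publish/shot_v001.exr")
try:
    # Same destination, different source: the queued transfer may not be
    # replaced, so this raises DuplicateDestinationError.
    transaction.add("/renders/b.exr", "/publish/shot_v001.exr")
except DuplicateDestinationError as exc:
    print(exc)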

Source code in client/ayon_core/lib/file_transaction.py
class DuplicateDestinationError(ValueError):
    """Error raised when transfer destination already exists in queue.

    The error is only raised if `allow_queue_replacements` is False on the
    FileTransaction instance and the added file to transfer is of a different
    src file than the one already detected in the queue.

    """

FileTransaction

File transaction with rollback options.

The file transaction is a three-step process.

1) Rename any existing files to a "temporary backup" during process()
2) Copy the files to final destination during process()
3) Remove any backed up files (no rollback possible!) during finalize()

Step 3 is done during finalize(). If it is not called, the .bak files will remain on disk.

These steps try to ensure that we don't overwrite half of any existing files, e.g. if they are currently in use.

Note

A regular filesystem is not a transactional file system, and even though this implementation tries to produce a 'safe copy' with a potential rollback, keep in mind that it is inherently unsafe due to how filesystems work: a myriad of things could happen during the transaction that break the logic. A file storage could go down, permissions could be changed, other machines could be moving or writing files. A lot can happen.

Warning

Any folders created during the transfer will not be removed.
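
A minimal usage sketch based on the three-step process described above; the paths are hypothetical and the import path follows the module location shown below.

from ayon_core.lib.file_transaction import FileTransaction

transaction = FileTransaction()
transaction.add("/renders/beauty.1001.exr", "/publish/beauty.1001.exr")
transaction.add(
    "/renders/beauty.1002.exr",
    "/publish/beauty.1002.exr",
    mode=FileTransaction.MODE_HARDLINK,
)

try:
    # Steps 1 and 2: back up any existing destination files to "<dst>.bak"
    # and copy/hardlink the sources to their destinations.
    transaction.process()
except Exception:
    # Remove the transferred files and restore the backups.
    transaction.rollback()
    raise
else:
    # Step 3: remove the ".bak" backups. Without this call they stay on disk.
    transaction.finalize()

print(transaction.transferred)  # Destination paths that were written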

Source code in client/ayon_core/lib/file_transaction.py
class FileTransaction:
    """File transaction with rollback options.

    The file transaction is a three-step process.

    1) Rename any existing files to a "temporary backup" during `process()`
    2) Copy the files to final destination during `process()`
    3) Remove any backed up files (*no rollback possible!) during `finalize()`

    Step 3 is done during `finalize()`. If not called the .bak files will
    remain on disk.

    These steps try to ensure that we don't overwrite half of any existing
    files e.g. if they are currently in use.

    Note:
        A regular filesystem is *not* a transactional file system and even
        though this implementation tries to produce a 'safe copy' with a
        potential rollback do keep in mind that it's inherently unsafe due
        to how filesystem works and a myriad of things could happen during
        the transaction that break the logic. A file storage could go down,
        permissions could be changed, other machines could be moving or writing
        files. A lot can happen.

    Warning:
        Any folders created during the transfer will not be removed.
    """

    MODE_COPY = 0
    MODE_HARDLINK = 1

    def __init__(self, log=None, allow_queue_replacements=False):
        if log is None:
            log = logging.getLogger("FileTransaction")

        self.log = log

        # The transfer queue
        # todo: make this an actual FIFO queue?
        self._transfers = {}

        # Destination file paths that a file was transferred to
        self._transferred = []

        # Backup file location mapping to original locations
        self._backup_to_original = {}

        self._allow_queue_replacements = allow_queue_replacements

    def add(self, src, dst, mode=MODE_COPY):
        """Add a new file to transfer queue.

        Args:
            src (str): Source path.
            dst (str): Destination path.
            mode (MODE_COPY, MODE_HARDLINK): Transfer mode.
        """

        opts = {"mode": mode}

        src = os.path.normpath(os.path.abspath(src))
        dst = os.path.normpath(os.path.abspath(dst))

        if dst in self._transfers:
            queued_src = self._transfers[dst][0]
            if src == queued_src:
                self.log.debug(
                    "File transfer was already in queue: {} -> {}".format(
                        src, dst))
                return
            else:
                if not self._allow_queue_replacements:
                    raise DuplicateDestinationError(
                        "Transfer to destination is already in queue: "
                        "{} -> {}. It's not allowed to be replaced by "
                        "a new transfer from {}".format(
                            queued_src, dst, src
                        ))

                self.log.warning("File transfer in queue replaced..")
                self.log.debug(
                    "Removed from queue: {} -> {} replaced by {} -> {}".format(
                        queued_src, dst, src, dst))

        self._transfers[dst] = (src, opts)

    def process(self):
        with ThreadPoolExecutor(max_workers=8) as executor:
            # Submit backup tasks
            backup_futures = [
                executor.submit(self._backup_file, dst, src)
                for dst, (src, _) in self._transfers.items()
            ]
            wait_for_future_errors(
                executor, backup_futures, logger=self.log)

            # Submit transfer tasks
            transfer_futures = [
                executor.submit(self._transfer_file, dst, src, opts)
                for dst, (src, opts) in self._transfers.items()
            ]
            wait_for_future_errors(
                executor, transfer_futures, logger=self.log)

    def _backup_file(self, dst, src):
        self.log.debug(f"Checking file ... {src} -> {dst}")
        path_same = self._same_paths(src, dst)
        if path_same or not os.path.exists(dst):
            return

        # Backup original file
        backup = dst + ".bak"
        self._backup_to_original[backup] = dst
        self.log.debug(f"Backup existing file: {dst} -> {backup}")
        os.rename(dst, backup)

    def _transfer_file(self, dst, src, opts):
        path_same = self._same_paths(src, dst)
        if path_same:
            self.log.debug(
                f"Source and destination are same files {src} -> {dst}")
            return

        self._create_folder_for_file(dst)

        if opts["mode"] == self.MODE_COPY:
            self.log.debug(f"Copying file ... {src} -> {dst}")
            copyfile(src, dst)
        elif opts["mode"] == self.MODE_HARDLINK:
            self.log.debug(f"Hardlinking file ... {src} -> {dst}")
            create_hard_link(src, dst)

        self._transferred.append(dst)

    def finalize(self):
        # Delete any backed up files
        for backup in self._backup_to_original.keys():
            try:
                os.remove(backup)
            except OSError:
                self.log.error(
                    "Failed to remove backup file: {}".format(backup),
                    exc_info=True)

    def rollback(self):
        errors = 0
        last_exc = None
        # Rollback any transferred files
        for path in self._transferred:
            try:
                os.remove(path)
            except OSError as exc:
                last_exc = exc
                errors += 1
                self.log.error(
                    "Failed to rollback created file: {}".format(path),
                    exc_info=True)

        # Rollback the backups
        for backup, original in self._backup_to_original.items():
            try:
                os.rename(backup, original)
            except OSError as exc:
                last_exc = exc
                errors += 1
                self.log.error(
                    "Failed to restore original file: {} -> {}".format(
                        backup, original),
                    exc_info=True)

        if errors:
            self.log.error(
                "{} errors occurred during rollback.".format(errors),
                exc_info=True)
            raise last_exc

    @property
    def transferred(self):
        """Return the processed transfers destination paths"""
        return list(self._transferred)

    @property
    def backups(self):
        """Return the backup file paths"""
        return list(self._backup_to_original.keys())

    def _create_folder_for_file(self, path):
        dirname = os.path.dirname(path)
        try:
            os.makedirs(dirname)
        except OSError as e:
            if e.errno != errno.EEXIST:
                self.log.critical("An unexpected error occurred.")
                raise e

    def _same_paths(self, src, dst):
        # handles same paths but with C:/project vs c:/project
        if os.path.exists(src) and os.path.exists(dst):
            return os.stat(src) == os.stat(dst)

        return src == dst

backups property

Return the backup file paths

transferred property

Return the processed transfers destination paths

add(src, dst, mode=MODE_COPY)

Add a new file to transfer queue.

Parameters:

    Name  Type                        Description        Default
    src   str                         Source path.       required
    dst   str                         Destination path.  required
    mode  (MODE_COPY, MODE_HARDLINK)  Transfer mode.     MODE_COPY
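
For illustration, a short sketch of queuing both transfer modes; the paths are hypothetical.

from ayon_core.lib.file_transaction import FileTransaction

transaction = FileTransaction()
transaction.add("/cache/sim.abc", "/publish/sim.abc")  # defaults to MODE_COPY
transaction.add(
    "/cache/sim.abc",
    "/publish/sim_hardlink.abc",
    mode=FileTransaction.MODE_HARDLINK,
)
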
Source code in client/ayon_core/lib/file_transaction.py
def add(self, src, dst, mode=MODE_COPY):
    """Add a new file to transfer queue.

    Args:
        src (str): Source path.
        dst (str): Destination path.
        mode (MODE_COPY, MODE_HARDLINK): Transfer mode.
    """

    opts = {"mode": mode}

    src = os.path.normpath(os.path.abspath(src))
    dst = os.path.normpath(os.path.abspath(dst))

    if dst in self._transfers:
        queued_src = self._transfers[dst][0]
        if src == queued_src:
            self.log.debug(
                "File transfer was already in queue: {} -> {}".format(
                    src, dst))
            return
        else:
            if not self._allow_queue_replacements:
                raise DuplicateDestinationError(
                    "Transfer to destination is already in queue: "
                    "{} -> {}. It's not allowed to be replaced by "
                    "a new transfer from {}".format(
                        queued_src, dst, src
                    ))

            self.log.warning("File transfer in queue replaced..")
            self.log.debug(
                "Removed from queue: {} -> {} replaced by {} -> {}".format(
                    queued_src, dst, src, dst))

    self._transfers[dst] = (src, opts)

wait_for_future_errors(executor, futures, logger=None)

Shut down the ThreadPoolExecutor and cancel its pending futures as soon as one of the workers raises an error while the futures complete.

The ThreadPoolExecutor only cancels pending futures on exception but will still complete those that are already running, each of which could also fail. We log all exceptions but re-raise only the last one.
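
A minimal sketch of the pattern process() uses above; the worker function and its failure condition are hypothetical.

import logging
from concurrent.futures import ThreadPoolExecutor

from ayon_core.lib.file_transaction import wait_for_future_errors


def copy_one(index):
    # Hypothetical worker that fails for one of its inputs.
    if index == 3:
        raise RuntimeError(f"Worker {index} failed")
    return index


with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(copy_one, i) for i in range(8)]
    # Cancels pending futures as soon as one worker fails, logs every
    # exception from the futures that did run, then re-raises the last one.
    wait_for_future_errors(
        executor, futures, logger=logging.getLogger(__name__))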

Source code in client/ayon_core/lib/file_transaction.py
def wait_for_future_errors(
        executor: ThreadPoolExecutor,
        futures: List[Future],
        logger: Optional[logging.Logger] = None):
    """For the ThreadPoolExecutor shutdown and cancel futures as soon one of
    the workers raises an error as they complete.

    The ThreadPoolExecutor only cancels pending futures on exception but will
    still complete those that are running - each which also themselves could
    fail. We log all exceptions but re-raise the last exception only.
    """
    if logger is None:
        logger = logging.getLogger(__name__)

    for future in concurrent.futures.as_completed(futures):
        exception = future.exception()
        if exception:
            # As soon as an error occurs, stop executing more futures.
            # Running workers, however, will still be complete, so we also want
            # to log those errors if any occurred on them.
            executor.shutdown(wait=True, cancel_futures=True)
            break
    else:
        # Futures are completed, no exceptions occurred
        return

    # An exception occurred in at least one future. Get exceptions from
    # all futures that are done and ended up failing until that point.
    exceptions = []
    for future in futures:
        if not future.cancelled() and future.done():
            exception = future.exception()
            if exception:
                exceptions.append(exception)

    # Log any exceptions that occurred in all workers
    for exception in exceptions:
        logger.error("Error occurred in worker", exc_info=exception)

    # Raise the last exception
    raise exceptions[-1]