
otio_burnin

ModifiedBurnins

Bases: Burnins

This is a modification of the OTIO FFmpeg Burnin adapter.
- requires FFmpeg in PATH

Offers 6 positions for burnin text. Each can be set with:
- static text
- frames
- timecode

Options - dictionary which sets the final look.
- Datatypes explanation:
<color> string format must be supported by FFmpeg.
    Examples: "#000000", "0x000000", "black"
<font> must be accessible by FFmpeg = name of a font registered in the system or path to a font file.
    Examples: "Arial", "C:/Windows/Fonts/arial.ttf"

- Possible keys:
"opacity" - Opacity of text - <float, Range:0-1>
"bg_opacity" - Opacity of background (box around text) - <float, Range:0-1>
"bg_color" - Background color - <color>
"bg_padding" - Background padding in pixels - <int>
"x_offset" - offsets burnin horizontally by entered pixels from border - <int>
"y_offset" - offsets burnin vertically by entered pixels from border - <int>
- x_offset & y_offset should be set to at least the same value as bg_padding!
"font" - Font family for text - <font>
"font_size" - Font size in pixels - <int>
"font_color" - Color of text - <color>
"frame_offset" - Default start frame - <int>
    - required IF start frame is not set when using frames or timecode burnins

General options can be set when initializing the class through the "options_init" arg.
General options can be overridden when adding a burnin.
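The option keys above can be illustrated with a small, standalone sketch. It does not import the class. `DEFAULT_OPTIONS` copies the `options_init` defaults from the source listing, and `merge_options` is a hypothetical helper, not part of the module. In the class source, an `options` value passed to an `add_*` method is used in place of the general options rather than merged with them, so a caller who wants to change a single key would likely build the full dictionary first, roughly like this:

```python
# Defaults copied from the `options_init` class attribute in the source listing.
DEFAULT_OPTIONS = {
    "opacity": 1,
    "x_offset": 5,
    "y_offset": 5,
    "bg_padding": 5,
    "bg_opacity": 0.5,
    "font_size": 42,
}


def merge_options(general=None, per_burnin=None):
    """Hypothetical helper: layer general and per-burnin options over defaults."""
    merged = dict(DEFAULT_OPTIONS)  # copy so the defaults stay untouched
    merged.update(general or {})
    merged.update(per_burnin or {})
    return merged


# General look shared by all burnins; offsets match the padding as advised.
general = {
    "font_color": "#ffffff",
    "bg_color": "black",
    "bg_padding": 8,
    "x_offset": 8,
    "y_offset": 8,
}
# One burnin uses a smaller font, everything else is inherited.
top_left = merge_options(general, {"font_size": 30})
```

The copy in `merge_options` keeps the shared defaults unchanged between calls.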

Source code in client/ayon_core/scripts/otio_burnin.py
class ModifiedBurnins(ffmpeg_burnins.Burnins):
    '''
    This is a modification of the OTIO FFmpeg Burnin adapter.
    - requires FFmpeg in PATH

    Offers 6 positions for burnin text. Each can be set with:
    - static text
    - frames
    - timecode

    Options - dictionary which sets the final look.
    - Datatypes explanation:
    <color> string format must be supported by FFmpeg.
        Examples: "#000000", "0x000000", "black"
    <font> must be accessible by FFmpeg = name of a font registered in the
        system or path to a font file.
        Examples: "Arial", "C:/Windows/Fonts/arial.ttf"

    - Possible keys:
    "opacity" - Opacity of text - <float, Range:0-1>
    "bg_opacity" - Opacity of background (box around text) - <float, Range:0-1>
    "bg_color" - Background color - <color>
    "bg_padding" - Background padding in pixels - <int>
    "x_offset" - offsets burnin horizontally by entered pixels
        from border - <int>
    "y_offset" - offsets burnin vertically by entered pixels
        from border - <int>
    - x_offset & y_offset should be set to at least the same value as
      bg_padding!
    "font" - Font Family for text - <font>
    "font_size" - Font size in pixels - <int>
    "font_color" - Color of text - <color>
    "frame_offset" - Default start frame - <int>
        - required IF start frame is not set when using frames
          or timecode burnins

    General options can be set when initializing the class through
        the "options_init" arg.
    General options can be overridden when adding a burnin.

    '''
    TOP_CENTERED = ffmpeg_burnins.TOP_CENTERED
    BOTTOM_CENTERED = ffmpeg_burnins.BOTTOM_CENTERED
    TOP_LEFT = ffmpeg_burnins.TOP_LEFT
    BOTTOM_LEFT = ffmpeg_burnins.BOTTOM_LEFT
    TOP_RIGHT = ffmpeg_burnins.TOP_RIGHT
    BOTTOM_RIGHT = ffmpeg_burnins.BOTTOM_RIGHT

    options_init = {
        'opacity': 1,
        'x_offset': 5,
        'y_offset': 5,
        'bg_padding': 5,
        'bg_opacity': 0.5,
        'font_size': 42
    }

    def __init__(
        self, source, ffprobe_data=None, options_init=None, first_frame=None
    ):
        if not ffprobe_data:
            ffprobe_data = _get_ffprobe_data(source)

        # Validate 'streams' before calling super to raise more specific
        #   error
        source_streams = ffprobe_data.get("streams")
        if not source_streams:
            raise ValueError((
                "Input file \"{}\" does not contain any streams"
                " with image/video content."
            ).format(source))

        self.ffprobe_data = ffprobe_data
        self.first_frame = first_frame
        self.input_args = []
        self.cleanup_paths = []

        super().__init__(source, source_streams)

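        if options_init:
            # Copy before updating so the class-level defaults shared by
            #   all instances are not mutated
            self.options_init = dict(self.options_init)
            self.options_init.update(options_init)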

    def add_text(
        self,
        text,
        align,
        frame_start=None,
        frame_end=None,
        options=None,
    ):
        """
        Add static text to a filter.

        :param str text: text to apply to the drawtext
        :param enum align: alignment, must use provided enum flags
        :param int frame_start: starting frame for burnins current frame
        :param int frame_end: last frame, used only to measure text position
        :param dict options: recommended to use TextOptions
        """
        if not options:
            options = ffmpeg_burnins.TextOptions(**self.options_init)

        options = options.copy()
        if frame_start is not None:
            options["frame_offset"] = frame_start

        # `frame_end` is only for measurements of text position
        if frame_end is not None:
            options["frame_end"] = frame_end

        options["label"] = align
        self._add_burnin(text, align, options, DRAWTEXT)

    def add_timecode(
        self, align, frame_start=None, frame_end=None, frame_start_tc=None,
        text=None, options=None
    ):
        """
        Convenience method to create the timecode expression.

        :param enum align: alignment, must use provided enum flags
        :param int frame_start: starting frame for burnins current frame
        :param int frame_end: last frame, used only to measure text position
        :param frame_start_tc: starting frame (int) or ready timecode
            string (str) for burnins timecode
        :param str text: text that will be before timecode
        :param dict options: recommended to use TimeCodeOptions
        """
        if not options:
            options = ffmpeg_burnins.TimeCodeOptions(**self.options_init)

        options = options.copy()
        if frame_start is not None:
            options["frame_offset"] = frame_start

        # `frame_end` is only for measurements of text position
        if frame_end is not None:
            options["frame_end"] = frame_end

        if not frame_start_tc:
            frame_start_tc = options["frame_offset"]

        if not text:
            text = ""

        if not options.get("fps"):
            options["fps"] = self.frame_rate

        if isinstance(frame_start_tc, str):
            options["timecode"] = frame_start_tc
        else:
            options["timecode"] = ffmpeg_burnins._frames_to_timecode(
                frame_start_tc,
                self.frame_rate
            )

        self._add_burnin(text, align, options, TIMECODE)

    def add_per_frame_text(
        self,
        text,
        align,
        frame_start,
        frame_end,
        listed_keys,
        options=None
    ):
        """Add text that changes per frame.

        Args:
            text (str): Template string with unfilled keys that are changed
                per frame.
            align (str): Alignment of text.
            frame_start (int): Starting frame for burnins current frame.
            frame_end (int): Ending frame for burnins current frame.
            listed_keys (dict): Per-frame values by template key. Each item
                is a dict with "values" (one value per frame) and "keys"
                (key path used to fill the template).
            options (Optional[dict]): Options to affect style of burnin.
        """

        if not options:
            options = ffmpeg_burnins.TimeCodeOptions(**self.options_init)

        options = options.copy()
        if frame_start is None:
            frame_start = options["frame_offset"]

        # `frame_end` is only for measurements of text position
        if frame_end is None:
            frame_end = options["frame_end"]

        fps = options.get("fps")
        if not fps:
            fps = self.frame_rate

        text_for_size = text
        if CURRENT_FRAME_SPLITTER in text:
            expr = self._get_current_frame_expression(frame_start, frame_end)
            if expr is None:
                expr = MISSING_KEY_VALUE
                text_for_size = text_for_size.replace(
                    CURRENT_FRAME_SPLITTER, MISSING_KEY_VALUE)
            text = text.replace(CURRENT_FRAME_SPLITTER, expr)

        # Find longest list with values
        longest_list_len = max(
            len(item["values"]) for item in listed_keys.values()
        )
        # Where to store formatted values per frame by key
        new_listed_keys = [{} for _ in range(longest_list_len)]
        # Find the longest value per fill key.
        #   The longest value is used to determine size of burnin box.
        longest_value_by_key = {}
        for key, item in listed_keys.items():
            values = item["values"]
            # Fill the missing values from the longest list with the last
            #   value to make sure all values have same "frame count"
            last_value = values[-1] if values else ""
            for _ in range(longest_list_len - len(values)):
                values.append(last_value)

            # Prepare dictionary structure for nested values
            # - last key is overridden on each frame loop
            item_keys = list(item["keys"])
            fill_data = {}
            sub_value = fill_data
            last_item_key = item_keys.pop(-1)
            for item_key in item_keys:
                sub_value[item_key] = {}
                sub_value = sub_value[item_key]

            # Fill value per frame
            key_max_len = 0
            key_max_value = ""
            for value, new_values in zip(values, new_listed_keys):
                sub_value[last_item_key] = value
                try:
                    value = key.format(**sub_value)
                except (TypeError, KeyError, ValueError):
                    value = MISSING_KEY_VALUE
                new_values[key] = value

                value_len = len(value)
                if value_len > key_max_len:
                    key_max_value = value
                    key_max_len = value_len

            # Store the longest value
            longest_value_by_key[key] = key_max_value

        # Make sure the longest value of each key is replaced for text size
        #   calculation
        for key, value in longest_value_by_key.items():
            text_for_size = text_for_size.replace(key, value)

        # Create temp file with instructions for each frame of text
        lines = []
        for frame, value in enumerate(new_listed_keys):
            seconds = float(frame) / fps
            # Escape special character
            new_text = text
            for _key, _value in value.items():
                _value = str(_value)
                new_text = new_text.replace(_key, str(_value))

            new_text = (
                str(new_text)
                .replace("\\", "\\\\")
                .replace(",", "\\,")
                .replace(":", "\\:")
            )
            lines.append(
                f"{seconds} drawtext@{align} reinit text='{new_text}';")

        with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp:
            path = temp.name
            temp.write("\n".join(lines))

        self.cleanup_paths.append(path)
        self.filters["drawtext"].append("sendcmd=f='{}'".format(
            path.replace("\\", "/").replace(":", "\\:")
        ))
        self.add_text(text_for_size, align, frame_start, frame_end, options)

    def _get_current_frame_expression(self, frame_start, frame_end):
        if frame_start is None:
            return None
        return (
            "%{eif:n+" + str(frame_start)
            + ":d:" + str(len(str(frame_end))) + "}"
        )

    def _add_burnin(self, text, align, options, draw):
        """
        Generic method for building the filter flags.
        :param str text: text to apply to the drawtext
        :param enum align: alignment, must use provided enum flags
        :param dict options:
        """

        final_text = text
        text_for_size = text
        if CURRENT_FRAME_SPLITTER in text:
            frame_start = options["frame_offset"]
            frame_end = options.get("frame_end", frame_start)
            expr = self._get_current_frame_expression(frame_start, frame_end)
            if expr is not None:
                max_length = len(str(frame_end))
                # Use number '8' length times for replacement
                size_replacement = max_length * "8"
            else:
                expr = size_replacement = MISSING_KEY_VALUE

            final_text = final_text.replace(
                CURRENT_FRAME_SPLITTER, expr
            )
            text_for_size = text_for_size.replace(
                CURRENT_FRAME_SPLITTER, size_replacement
            )

        resolution = self.resolution
        data = {
            'text': (
                final_text
                .replace(",", r"\,")
                .replace(':', r'\:')
            ),
            'color': options['font_color'],
            'size': options['font_size']
        }
        timecode_text = options.get("timecode") or ""
        text_for_size += timecode_text

        font_path = options.get("font")
        if not font_path or not os.path.exists(font_path):
            font_path = ffmpeg_burnins.FONT

        options["font"] = font_path

        data.update(options)
        data.update(
            ffmpeg_burnins._drawtext(align, resolution, text_for_size, options)
        )

        arg_font_path = (
            font_path
            .replace("\\", "\\\\")
            .replace(':', r'\:')
        )
        data["font"] = arg_font_path

        self.filters['drawtext'].append(draw % data)

        if options.get('bg_color') is not None:
            box = ffmpeg_burnins.BOX % {
                'border': options['bg_padding'],
                'color': options['bg_color'],
                'opacity': options['bg_opacity']
            }
            self.filters['drawtext'][-1] += ':%s' % box

    def command(self, output=None, args=None, overwrite=False):
        """
        Generate the entire FFMPEG command.

        :param str output: output file
        :param str args: additional FFMPEG arguments
        :param bool overwrite: overwrite the output if it exists
        :returns: completed command
        :rtype: str
        """
        output = '"{}"'.format(output or '')
        if overwrite:
            output = '-y {}'.format(output)

        filters = ""
        filter_string = self.filter_string
        if filter_string:
            with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp:
                temp.write(filter_string)
                filters_path = temp.name
            filters = '-filter_script:v "{}"'.format(filters_path)
            print("Filters:", filter_string)
            self.cleanup_paths.append(filters_path)

        if self.first_frame is not None:
            start_number_arg = "-start_number {}".format(self.first_frame)
            self.input_args.append(start_number_arg)
            # 'args' may be None, so check it before the membership test
            if not args:
                args = start_number_arg
            elif "start_number" not in args:
                args = " ".join((start_number_arg, args))

        input_args = ""
        if self.input_args:
            input_args = " {}".format(" ".join(self.input_args))

        return (FFMPEG % {
            'input_args': input_args,
            'input': self.source,
            'output': output,
            'args': '%s ' % args if args else '',
            'filters': filters
        }).strip()

    def render(self, output, args=None, overwrite=False, **kwargs):
        """
        Render the media to a specified destination.

        :param str output: output file
        :param str args: additional FFMPEG arguments
        :param bool overwrite: overwrite the output if it exists
        """
        if not overwrite and os.path.exists(output):
            raise RuntimeError("Destination '%s' exists, please "
                               "use overwrite" % output)

        is_sequence = "%" in output

        command = self.command(
            output=output,
            args=args,
            overwrite=overwrite
        )
        print("Launching command: {}".format(command))

        use_shell = True
        try:
            test_proc = subprocess.Popen(
                f"{FFMPEG_EXE_COMMAND} --help", shell=True
            )
            test_proc.wait()
        except BaseException:
            use_shell = False

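        # Use a separate name so the method's '**kwargs' is not overwritten
        #   ('duration' is read from it below)
        popen_kwargs = {
            "stdout": subprocess.PIPE,
            "stderr": subprocess.PIPE,
            "shell": use_shell,
        }
        proc = subprocess.Popen(command, **popen_kwargs)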

        _stdout, _stderr = proc.communicate()
        if _stdout:
            print(_stdout.decode("utf-8", errors="backslashreplace"))

        # This will probably never happen as ffmpeg use stdout
        if _stderr:
            print(_stderr.decode("utf-8", errors="backslashreplace"))

        if proc.returncode != 0:
            raise RuntimeError(
                "Failed to render '{}': {}'".format(output, command)
            )
        if is_sequence:
            output = output % kwargs.get("duration")

        if not os.path.exists(output):
            raise RuntimeError(
                "Failed to generate this f*cking file '%s'" % output
            )

        for path in self.cleanup_paths:
            if os.path.exists(path):
                os.remove(path)

add_per_frame_text(text, align, frame_start, frame_end, listed_keys, options=None)

Add text that changes per frame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| text | str | Template string with unfilled keys that are changed per frame. | required |
| align | str | Alignment of text. | required |
| frame_start | int | Starting frame for burnins current frame. | required |
| frame_end | int | Ending frame for burnins current frame. | required |
| listed_keys | dict | Per-frame values by template key. Each item is a dict with "values" (one value per frame) and "keys" (key path used to fill the template). | required |
| options | Optional[dict] | Options to affect style of burnin. | None |
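The shape of `listed_keys` is easiest to see in a small example. The sketch below is standalone and only mirrors the padding step from the method source: every "values" list is extended with its last value until it matches the longest list, so each key has one value per frame. The function name `pad_listed_values` and the sample keys are hypothetical.

```python
def pad_listed_values(listed_keys):
    """Pad every "values" list to the longest length using its last value."""
    longest = max(len(item["values"]) for item in listed_keys.values())
    padded = {}
    for key, item in listed_keys.items():
        values = list(item["values"])  # copy, the input is left unchanged
        last_value = values[-1] if values else ""
        values.extend([last_value] * (longest - len(values)))
        padded[key] = values
    return padded


# Hypothetical input: two template keys with a different number of values.
listed_keys = {
    "{focal_length}": {"keys": ["focal_length"], "values": [35, 36, 37]},
    "{lens}": {"keys": ["lens"], "values": ["50mm"]},
}
padded = pad_listed_values(listed_keys)
```

Unlike the method source, which appends to the passed lists in place, this sketch works on copies.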
Source code in client/ayon_core/scripts/otio_burnin.py
def add_per_frame_text(
    self,
    text,
    align,
    frame_start,
    frame_end,
    listed_keys,
    options=None
):
    """Add text that changes per frame.

    Args:
        text (str): Template string with unfilled keys that are changed
            per frame.
        align (str): Alignment of text.
        frame_start (int): Starting frame for burnins current frame.
        frame_end (int): Ending frame for burnins current frame.
        listed_keys (dict): Per-frame values by template key. Each item
            is a dict with "values" (one value per frame) and "keys"
            (key path used to fill the template).
        options (Optional[dict]): Options to affect style of burnin.
    """

    if not options:
        options = ffmpeg_burnins.TimeCodeOptions(**self.options_init)

    options = options.copy()
    if frame_start is None:
        frame_start = options["frame_offset"]

    # `frame_end` is only for measurements of text position
    if frame_end is None:
        frame_end = options["frame_end"]

    fps = options.get("fps")
    if not fps:
        fps = self.frame_rate

    text_for_size = text
    if CURRENT_FRAME_SPLITTER in text:
        expr = self._get_current_frame_expression(frame_start, frame_end)
        if expr is None:
            expr = MISSING_KEY_VALUE
            text_for_size = text_for_size.replace(
                CURRENT_FRAME_SPLITTER, MISSING_KEY_VALUE)
        text = text.replace(CURRENT_FRAME_SPLITTER, expr)

    # Find longest list with values
    longest_list_len = max(
        len(item["values"]) for item in listed_keys.values()
    )
    # Where to store formatted values per frame by key
    new_listed_keys = [{} for _ in range(longest_list_len)]
    # Find the longest value per fill key.
    #   The longest value is used to determine size of burnin box.
    longest_value_by_key = {}
    for key, item in listed_keys.items():
        values = item["values"]
        # Fill the missing values from the longest list with the last
        #   value to make sure all values have same "frame count"
        last_value = values[-1] if values else ""
        for _ in range(longest_list_len - len(values)):
            values.append(last_value)

        # Prepare dictionary structure for nested values
        # - last key is overridden on each frame loop
        item_keys = list(item["keys"])
        fill_data = {}
        sub_value = fill_data
        last_item_key = item_keys.pop(-1)
        for item_key in item_keys:
            sub_value[item_key] = {}
            sub_value = sub_value[item_key]

        # Fill value per frame
        key_max_len = 0
        key_max_value = ""
        for value, new_values in zip(values, new_listed_keys):
            sub_value[last_item_key] = value
            try:
                value = key.format(**sub_value)
            except (TypeError, KeyError, ValueError):
                value = MISSING_KEY_VALUE
            new_values[key] = value

            value_len = len(value)
            if value_len > key_max_len:
                key_max_value = value
                key_max_len = value_len

        # Store the longest value
        longest_value_by_key[key] = key_max_value

    # Make sure the longest value of each key is replaced for text size
    #   calculation
    for key, value in longest_value_by_key.items():
        text_for_size = text_for_size.replace(key, value)

    # Create temp file with instructions for each frame of text
    lines = []
    for frame, value in enumerate(new_listed_keys):
        seconds = float(frame) / fps
        # Escape special character
        new_text = text
        for _key, _value in value.items():
            _value = str(_value)
            new_text = new_text.replace(_key, str(_value))

        new_text = (
            str(new_text)
            .replace("\\", "\\\\")
            .replace(",", "\\,")
            .replace(":", "\\:")
        )
        lines.append(
            f"{seconds} drawtext@{align} reinit text='{new_text}';")

    with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp:
        path = temp.name
        temp.write("\n".join(lines))

    self.cleanup_paths.append(path)
    self.filters["drawtext"].append("sendcmd=f='{}'".format(
        path.replace("\\", "/").replace(":", "\\:")
    ))
    self.add_text(text_for_size, align, frame_start, frame_end, options)
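The per-frame instructions written to the temporary file follow FFmpeg's `sendcmd` syntax: a time in seconds, the target filter instance, the command and its argument. A minimal standalone sketch of that step, using the same escaping order as the method above, could look like this. `build_sendcmd_lines` and the `"top_left"` label are hypothetical stand-ins. In the method the label is the `align` value.

```python
def build_sendcmd_lines(texts, fps, align):
    """Return one sendcmd instruction per frame for a labeled drawtext filter."""
    lines = []
    for frame, text in enumerate(texts):
        # Time of the frame in seconds, counted from the first frame
        seconds = float(frame) / fps
        # Escape backslashes first, then the characters special to FFmpeg
        escaped = (
            str(text)
            .replace("\\", "\\\\")
            .replace(",", "\\,")
            .replace(":", "\\:")
        )
        lines.append(f"{seconds} drawtext@{align} reinit text='{escaped}';")
    return lines


lines = build_sendcmd_lines(["f: 35", "f: 36"], fps=25.0, align="top_left")
```

Escaping the backslash first matters: doing it last would double the backslashes added for `,` and `:`.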

add_text(text, align, frame_start=None, frame_end=None, options=None)

Adding static text to a filter.

:param str text: text to apply to the drawtext
:param enum align: alignment, must use provided enum flags
:param int frame_start: starting frame for burnins current frame
:param int frame_end: last frame, used only to measure text position
:param dict options: recommended to use TextOptions
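When the text contains the current-frame placeholder, `frame_start` and `frame_end` end up in an FFmpeg drawtext expansion of the form `%{eif:n+START:d:WIDTH}`, where `n` is the zero-based frame number and `WIDTH` zero-pads the result. The standalone sketch below mirrors `_get_current_frame_expression` from the class source. The function name is an illustrative stand-in.

```python
def current_frame_expression(frame_start, frame_end):
    """Build a drawtext expansion that prints the current frame number."""
    if frame_start is None:
        return None
    # Pad to the number of digits of the last frame so the width is stable
    padding = len(str(frame_end))
    return "%{eif:n+" + str(frame_start) + ":d:" + str(padding) + "}"


expr = current_frame_expression(1001, 1100)
missing = current_frame_expression(None, 1100)
```

For a range of 1001 to 1100 the expression is `%{eif:n+1001:d:4}`, so the first frame is drawn as 1001.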

Source code in client/ayon_core/scripts/otio_burnin.py
def add_text(
    self,
    text,
    align,
    frame_start=None,
    frame_end=None,
    options=None,
):
    """
    Add static text to a filter.

    :param str text: text to apply to the drawtext
    :param enum align: alignment, must use provided enum flags
    :param int frame_start: starting frame for burnins current frame
    :param int frame_end: last frame, used only to measure text position
    :param dict options: recommended to use TextOptions
    """
    if not options:
        options = ffmpeg_burnins.TextOptions(**self.options_init)

    options = options.copy()
    if frame_start is not None:
        options["frame_offset"] = frame_start

    # `frame_end` is only for measurements of text position
    if frame_end is not None:
        options["frame_end"] = frame_end

    options["label"] = align
    self._add_burnin(text, align, options, DRAWTEXT)

add_timecode(align, frame_start=None, frame_end=None, frame_start_tc=None, text=None, options=None)

Convenience method to create the timecode expression.

:param enum align: alignment, must use provided enum flags
:param int frame_start: starting frame for burnins current frame
:param int frame_end: last frame, used only to measure text position
:param frame_start_tc: starting frame (int) or ready timecode string (str) for burnins timecode
:param str text: text that will be before timecode
:param dict options: recommended to use TimeCodeOptions
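To make the relation between `frame_start_tc` and the resulting timecode concrete, here is a rough standalone sketch of a non-drop-frame conversion. It is an assumption for illustration only and is not the `_frames_to_timecode` helper from `ffmpeg_burnins`, whose exact output format (for example colon escaping) is not shown in this page. It also assumes an integer frame rate.

```python
def frames_to_timecode(frame, fps):
    """Convert a frame count to HH:MM:SS:FF, assuming non-drop-frame timecode."""
    fps = int(round(fps))
    # Split the frame count into whole seconds and leftover frames
    seconds, frames = divmod(int(frame), fps)
    minutes, seconds = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}:{frames:02d}"


start_tc = frames_to_timecode(1001, 24)
first_tc = frames_to_timecode(1, 24)
```

With `frame_start_tc` left unset, the method falls back to the frame offset, so a shot starting at frame 1001 at 24 fps would begin around 41 seconds into the timecode.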

Source code in client/ayon_core/scripts/otio_burnin.py
def add_timecode(
    self, align, frame_start=None, frame_end=None, frame_start_tc=None,
    text=None, options=None
):
    """
    Convenience method to create the timecode expression.

    :param enum align: alignment, must use provided enum flags
    :param int frame_start: starting frame for burnins current frame
    :param int frame_end: last frame, used only to measure text position
    :param frame_start_tc: starting frame (int) or ready timecode
        string (str) for burnins timecode
    :param str text: text that will be before timecode
    :param dict options: recommended to use TimeCodeOptions
    """
    if not options:
        options = ffmpeg_burnins.TimeCodeOptions(**self.options_init)

    options = options.copy()
    if frame_start is not None:
        options["frame_offset"] = frame_start

    # `frame_end` is only for measurements of text position
    if frame_end is not None:
        options["frame_end"] = frame_end

    if not frame_start_tc:
        frame_start_tc = options["frame_offset"]

    if not text:
        text = ""

    if not options.get("fps"):
        options["fps"] = self.frame_rate

    if isinstance(frame_start_tc, str):
        options["timecode"] = frame_start_tc
    else:
        options["timecode"] = ffmpeg_burnins._frames_to_timecode(
            frame_start_tc,
            self.frame_rate
        )

    self._add_burnin(text, align, options, TIMECODE)

command(output=None, args=None, overwrite=False)

Generate the entire FFMPEG command.

:param str output: output file
:param str args: additional FFMPEG arguments
:param bool overwrite: overwrite the output if it exists
:returns: completed command
:rtype: str
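The command passes the filter graph to FFmpeg through a file rather than inline, which avoids quoting problems with long or heavily escaped filter strings. A minimal standalone sketch of that step follows. `write_filter_script` is a hypothetical helper and the filter string is only a sample.

```python
import os
import tempfile


def write_filter_script(filter_string):
    """Write the filter graph to a temp file and return (ffmpeg_arg, path)."""
    # delete=False keeps the file on disk after the 'with' block closes it
    with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp:
        temp.write(filter_string)
        path = temp.name
    return '-filter_script:v "{}"'.format(path), path


arg, path = write_filter_script("drawtext=text='sh0010':fontsize=42")
try:
    with open(path) as stream:
        content = stream.read()
finally:
    os.remove(path)  # the caller is responsible for cleanup
```

In the class, the path is stored in `cleanup_paths` and removed at the end of `render`. Here the caller removes it directly.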

Source code in client/ayon_core/scripts/otio_burnin.py
def command(self, output=None, args=None, overwrite=False):
    """
    Generate the entire FFMPEG command.

    :param str output: output file
    :param str args: additional FFMPEG arguments
    :param bool overwrite: overwrite the output if it exists
    :returns: completed command
    :rtype: str
    """
    output = '"{}"'.format(output or '')
    if overwrite:
        output = '-y {}'.format(output)

    filters = ""
    filter_string = self.filter_string
    if filter_string:
        with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp:
            temp.write(filter_string)
            filters_path = temp.name
        filters = '-filter_script:v "{}"'.format(filters_path)
        print("Filters:", filter_string)
        self.cleanup_paths.append(filters_path)

    if self.first_frame is not None:
        start_number_arg = "-start_number {}".format(self.first_frame)
        self.input_args.append(start_number_arg)
        # 'args' may be None, so check it before the membership test
        if not args:
            args = start_number_arg
        elif "start_number" not in args:
            args = " ".join((start_number_arg, args))

    input_args = ""
    if self.input_args:
        input_args = " {}".format(" ".join(self.input_args))

    return (FFMPEG % {
        'input_args': input_args,
        'input': self.source,
        'output': output,
        'args': '%s ' % args if args else '',
        'filters': filters
    }).strip()

render(output, args=None, overwrite=False, **kwargs)

Render the media to a specified destination.

:param str output: output file
:param str args: additional FFMPEG arguments
:param bool overwrite: overwrite the output if it exists
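The core of rendering is running the generated command, capturing its output and failing loudly on a non-zero exit code. The sketch below shows that pattern in isolation, under two assumptions: the command is passed as an argument list instead of a shell string, and the Python interpreter stands in for FFmpeg so the example runs anywhere. `run_and_check` is a hypothetical name.

```python
import subprocess
import sys


def run_and_check(command):
    """Run a command, return its decoded stdout, raise on a non-zero exit."""
    proc = subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    stdout, stderr = proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(
            "Command failed with code {}: {}".format(
                proc.returncode,
                stderr.decode("utf-8", errors="backslashreplace"),
            )
        )
    return stdout.decode("utf-8", errors="backslashreplace")


# The Python interpreter is used here in place of FFmpeg.
out = run_and_check([sys.executable, "-c", "print('ok')"])

try:
    run_and_check([sys.executable, "-c", "import sys; sys.exit(3)"])
    failed = False
except RuntimeError:
    failed = True
```

Decoding with `errors="backslashreplace"` keeps the log readable even when the process prints bytes that are not valid UTF-8.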

Source code in client/ayon_core/scripts/otio_burnin.py
def render(self, output, args=None, overwrite=False, **kwargs):
    """
    Render the media to a specified destination.

    :param str output: output file
    :param str args: additional FFMPEG arguments
    :param bool overwrite: overwrite the output if it exists
    """
    if not overwrite and os.path.exists(output):
        raise RuntimeError("Destination '%s' exists, please "
                           "use overwrite" % output)

    is_sequence = "%" in output

    command = self.command(
        output=output,
        args=args,
        overwrite=overwrite
    )
    print("Launching command: {}".format(command))

    use_shell = True
    try:
        test_proc = subprocess.Popen(
            f"{FFMPEG_EXE_COMMAND} --help", shell=True
        )
        test_proc.wait()
    except BaseException:
        use_shell = False

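    # Use a separate name so the method's '**kwargs' is not overwritten
    #   ('duration' is read from it below)
    popen_kwargs = {
        "stdout": subprocess.PIPE,
        "stderr": subprocess.PIPE,
        "shell": use_shell,
    }
    proc = subprocess.Popen(command, **popen_kwargs)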

    _stdout, _stderr = proc.communicate()
    if _stdout:
        print(_stdout.decode("utf-8", errors="backslashreplace"))

    # This will probably never happen as ffmpeg use stdout
    if _stderr:
        print(_stderr.decode("utf-8", errors="backslashreplace"))

    if proc.returncode != 0:
        raise RuntimeError(
            "Failed to render '{}': {}'".format(output, command)
        )
    if is_sequence:
        output = output % kwargs.get("duration")

    if not os.path.exists(output):
        raise RuntimeError(
            "Failed to generate this f*cking file '%s'" % output
        )

    for path in self.cleanup_paths:
        if os.path.exists(path):
            os.remove(path)

burnins_from_data(input_path, output_path, data, codec_data=None, options=None, burnin_values=None, overwrite=True, full_input_path=None, first_frame=None, source_ffmpeg_cmd=None)

Adds burnins to a video or image file based on preset settings.

The output extension MUST be the same as the input extension (mov -> mov, avi -> avi, ...).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_path | str | Full path to the input file to which burnins should be added. | required |
| output_path | str | Full path to the output file where the output will be rendered. | required |
| data | dict | Data required for burnin settings (more info below). | required |
| codec_data | list | All codec-related arguments in a list. | None |
| options | dict | Options for burnins. | None |
| burnin_values | dict | Contains positioned values. | None |
| overwrite | bool | Output will be overwritten if it already exists, True by default. | True |

Presets must be set separately. A preset should be a dict with 2 keys:

- "options" - sets the look of burnins: colors, opacity, ... (more info: ModifiedBurnins doc)
    - OPTIONAL: default values are used when not included
- "burnins" - contains a dictionary with burnin settings
    - OPTIONAL: burnins won't be added when not included (in which case it is easier not to use this)
    - each key of "burnins" represents an alignment; there are 6 possibilities: TOP_LEFT, TOP_CENTERED, TOP_RIGHT, BOTTOM_LEFT, BOTTOM_CENTERED, BOTTOM_RIGHT
    - the value must be a string with the text you want to burn in
    - the text may contain specific formatting keys (explained below)

Which data keys are required depends on the preset:

- "frame_start" - required when "timecode" or "current_frame" is used in the burnin text
- "frame_start_tc" - set when "timecode" should start from a different frame
- keys for static text

EXAMPLE:

    preset = {
        "options": {OPTIONS FOR LOOK},
        "burnins": {
            "TOP_LEFT": "static_text",
            "TOP_RIGHT": "{shot}",
            "BOTTOM_LEFT": "TC: {timecode}",
            "BOTTOM_RIGHT": "{frame_start}{current_frame}"
        }
    }

For this preset we need at least this data:

    data = {
        "frame_start": 1001,
        "shot": "sh0010"
    }

When the timecode should start from 1, the data needs:

    data = {
        "frame_start": 1001,
        "frame_start_tc": 1,
        "shot": "sh0010"
    }
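As a rough illustration of how the preset text and the data combine, the sketch below fills static keys with plain `str.format` and leaves `{timecode}` as a placeholder. It does not call `burnins_from_data` or FFmpeg; the names `fill_data` and `resolved` are made up for this example, and the literal `"{timecode}"` placeholder is an assumption about how the per-frame key is represented.

```python
# Illustrative only: mirrors the preset/data example, not the adapter itself.
preset = {
    "burnins": {
        "TOP_LEFT": "static_text",
        "TOP_RIGHT": "{shot}",
        "BOTTOM_LEFT": "TC: {timecode}",
    }
}
data = {"frame_start": 1001, "shot": "sh0010"}

# "frame_start_tc" falls back to "frame_start" when it is not provided.
frame_start_tc = data.get("frame_start_tc", data.get("frame_start"))

# Assumption: the per-frame key maps back to its own placeholder so that
# str.format leaves it visible for a later, FFmpeg-side substitution.
fill_data = dict(data, timecode="{timecode}")

# Alignment keys are compared case-insensitively, so normalize them here.
resolved = {
    position.strip().lower(): text.format(**fill_data)
    for position, text in preset["burnins"].items()
}

for position, text in resolved.items():
    print(position, "->", text)
```

Static keys such as `{shot}` are resolved immediately, while the timecode text still carries its placeholder and would be expanded per frame at render time.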

Source code in client/ayon_core/scripts/otio_burnin.py
def burnins_from_data(
    input_path, output_path, data,
    codec_data=None, options=None, burnin_values=None, overwrite=True,
    full_input_path=None, first_frame=None, source_ffmpeg_cmd=None
):
    """This method adds burnins to video/image file based on presets setting.

    Extension of output MUST be same as input. (mov -> mov, avi -> avi,...)

    Args:
        input_path (str): Full path to input file where burnins should be added.
        output_path (str): Full path to output file where output will be
            rendered.
        data (dict): Data required for burnin settings (more info below).
        codec_data (list): All codec related arguments in list.
        options (dict): Options for burnins.
        burnin_values (dict): Contain positioned values.
        overwrite (bool): Output will be overwritten if already exists,
            True by default.

    Presets must be set separately. Should be dict with 2 keys:
    - "options" - sets look of burnins - colors, opacity,...
        (more info: ModifiedBurnins doc)
                - *OPTIONAL* default values are used when not included
    - "burnins" - contains dictionary with burnins settings
                - *OPTIONAL* burnins won't be added (easier is not to use this)
        - each key of "burnins" represents Alignment,
        there are 6 possibilities:
            TOP_LEFT        TOP_CENTERED        TOP_RIGHT
            BOTTOM_LEFT     BOTTOM_CENTERED     BOTTOM_RIGHT
        - value must be string with text you want to burn-in
        - text may contain specific formatting keys (explained below)

    Requirement of *data* keys is based on presets.
    - "frame_start" - is required when "timecode" or "current_frame" is in keys
    - "frame_start_tc" - when "timecode" should start with different frame
    - *keys for static text*

    EXAMPLE:
    preset = {
        "options": {*OPTIONS FOR LOOK*},
        "burnins": {
            "TOP_LEFT": "static_text",
            "TOP_RIGHT": "{shot}",
            "BOTTOM_LEFT": "TC: {timecode}",
            "BOTTOM_RIGHT": "{frame_start}{current_frame}"
        }
    }

    For this preset we'll need at least this data:
    data = {
        "frame_start": 1001,
        "shot": "sh0010"
    }

    When Timecode should start from 1 then data need:
    data = {
        "frame_start": 1001,
        "frame_start_tc": 1,
        "shot": "sh0010"
    }
    """
    ffprobe_data = None
    if full_input_path:
        ffprobe_data = _get_ffprobe_data(full_input_path)

    burnin = ModifiedBurnins(input_path, ffprobe_data, options, first_frame)

    frame_start = data.get("frame_start")
    frame_end = data.get("frame_end")
    frame_start_tc = data.get('frame_start_tc', frame_start)

    video_stream = None
    for stream in burnin._streams:
        if stream.get("codec_type") == "video":
            video_stream = stream
            break

    if video_stream is None:
        raise ValueError("Source didn't have video stream.")

    if "resolution_width" not in data:
        data["resolution_width"] = video_stream.get(
            "width", MISSING_KEY_VALUE)

    if "resolution_height" not in data:
        data["resolution_height"] = video_stream.get(
            "height", MISSING_KEY_VALUE)

    r_frame_rate = video_stream.get("r_frame_rate", "0/0")
    if "fps" not in data:
        data["fps"] = convert_ffprobe_fps_value(r_frame_rate)

    # Check frame start and add expression if is available
    if frame_start is not None:
        data[CURRENT_FRAME_KEY[1:-1]] = CURRENT_FRAME_SPLITTER

    if frame_start_tc is not None:
        data[TIMECODE_KEY[1:-1]] = TIMECODE_KEY

    source_timecode = video_stream.get("timecode")
    if source_timecode is None:
        source_timecode = video_stream.get("tags", {}).get("timecode")

    # Use "format" key from ffprobe data
    #   - this is used e.g. in mxf extension
    if source_timecode is None:
        input_format = burnin.ffprobe_data.get("format") or {}
        source_timecode = input_format.get("timecode")
        if source_timecode is None:
            source_timecode = input_format.get("tags", {}).get("timecode")

    if source_timecode is not None:
        data[SOURCE_TIMECODE_KEY[1:-1]] = SOURCE_TIMECODE_KEY

    clean_up_paths = []
    # 'burnin_values' defaults to None, so fall back to an empty mapping
    for align_text, value in (burnin_values or {}).items():
        if not value:
            continue

        if isinstance(value, dict):
            raise TypeError((
                "Expected string, number or list type."
                " Got: {} - \"{}\""
                " (Make sure you have new burnin presets)."
            ).format(str(type(value)), str(value)))

        align = None
        align_text = align_text.strip().lower()
        if align_text == "top_left":
            align = ModifiedBurnins.TOP_LEFT
        elif align_text == "top_centered":
            align = ModifiedBurnins.TOP_CENTERED
        elif align_text == "top_right":
            align = ModifiedBurnins.TOP_RIGHT
        elif align_text == "bottom_left":
            align = ModifiedBurnins.BOTTOM_LEFT
        elif align_text == "bottom_centered":
            align = ModifiedBurnins.BOTTOM_CENTERED
        elif align_text == "bottom_right":
            align = ModifiedBurnins.BOTTOM_RIGHT

        has_timecode = TIMECODE_KEY in value
        # Replace with missing key value if frame_start_tc is not set
        if frame_start_tc is None and has_timecode:
            has_timecode = False
            print(
                "`frame_start` and `frame_start_tc`"
                " are not set in entered data."
            )
            value = value.replace(TIMECODE_KEY, MISSING_KEY_VALUE)

        has_source_timecode = SOURCE_TIMECODE_KEY in value
        if source_timecode is None and has_source_timecode:
            has_source_timecode = False
            print("Source does not have set timecode value.")
            value = value.replace(SOURCE_TIMECODE_KEY, MISSING_KEY_VALUE)

        # Failsafe for missing keys.
        fill_values, listed_keys, missing_keys = prepare_fill_values(
            value, data
        )

        for key in missing_keys:
            value = value.replace(key, MISSING_KEY_VALUE)

        if listed_keys:
            for key, key_value in fill_values.items():
                if key == CURRENT_FRAME_KEY:
                    key_value = CURRENT_FRAME_SPLITTER
                value = value.replace(key, str(key_value))
            burnin.add_per_frame_text(
                value, align, frame_start, frame_end, listed_keys
            )
            continue

        # Handle timecode differently
        if has_source_timecode:
            args = [align, frame_start, frame_end, source_timecode]
            if not value.startswith(SOURCE_TIMECODE_KEY):
                value_items = value.split(SOURCE_TIMECODE_KEY)
                text = value_items[0].format(**data)
                args.append(text)

            burnin.add_timecode(*args)
            continue

        if has_timecode:
            args = [align, frame_start, frame_end, frame_start_tc]
            if not value.startswith(TIMECODE_KEY):
                value_items = value.split(TIMECODE_KEY)
                text = value_items[0].format(**data)
                args.append(text)

            burnin.add_timecode(*args)
            continue

        text = value.format(**data)

        burnin.add_text(text, align, frame_start, frame_end)

    ffmpeg_args = []
    if codec_data:
        # Use codec definition from method arguments
        # (copied so the caller's list is not modified)
        ffmpeg_args = list(codec_data)
        ffmpeg_args.append("-g 1")

    else:
        ffmpeg_args.extend(
            get_ffmpeg_format_args(burnin.ffprobe_data, source_ffmpeg_cmd)
        )
        ffmpeg_args.extend(
            get_ffmpeg_codec_args(burnin.ffprobe_data, source_ffmpeg_cmd)
        )
        # Use arguments from source if are available source arguments
        if source_ffmpeg_cmd:
            copy_args = (
                "-metadata",
                "-metadata:s:v:0",
            )
            args = source_ffmpeg_cmd.split(" ")
            for idx, arg in enumerate(args):
                if arg in copy_args:
                    ffmpeg_args.extend([arg, args[idx + 1]])

    # Use group one (same as `-intra` argument, which is deprecated)
    ffmpeg_args_str = " ".join(ffmpeg_args)
    burnin.render(
        output_path, args=ffmpeg_args_str, overwrite=overwrite, **data
    )
    for path in clean_up_paths:
        os.remove(path)

prepare_fill_values(burnin_template, data)

Prepare values that will be filled instead of burnin template.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| burnin_template | str | Burnin template string. | required |
| data | dict[str, Any] | Data that will be used to fill template. | required |

Returns:

| Type | Description |
| --- | --- |
| tuple[dict[str, str], dict[str, dict[str, Any]], set[str]] | Filled values that can be used as they are, listed values that have a different value per frame, and missing keys that are not present in data. |
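The three-way split described above can be sketched with `string.Formatter().parse`. The helper below, `classify_keys`, is a hypothetical, simplified stand-in written for this example and not the function from the module; it only assumes that list values mark per-frame keys and that unresolved keys count as missing.

```python
from string import Formatter


def classify_keys(template, data):
    """Hypothetical helper: split template keys into filled/listed/missing."""
    filled, listed, missing = {}, {}, set()
    for _, field_name, format_spec, conversion in Formatter().parse(template):
        if not field_name:
            continue
        # Nested keys: '{project[name]}' -> ['project', 'name']
        keys = [key.rstrip("]") for key in field_name.split("[")]
        # Rebuild the original placeholder, including conversion and spec
        orig_key = "{" + field_name
        if conversion:
            orig_key += "!" + conversion
        if format_spec:
            orig_key += ":" + format_spec
        orig_key += "}"

        value = data
        try:
            for key in keys:
                value = value[key]
        except (KeyError, TypeError):
            missing.add(orig_key)
            continue

        if isinstance(value, list):
            # A list means one value per frame
            listed[orig_key] = value
        else:
            filled[orig_key] = orig_key.format(**data)
    return filled, listed, missing


data = {"project": {"name": "demo"}, "shot": "sh0010", "notes": ["a", "b"]}
filled, listed, missing = classify_keys(
    "{project[name]} {shot} {notes} {artist}", data
)
print(filled)
print(listed)
print(missing)
```

Here `{project[name]}` and `{shot}` resolve to plain strings, `{notes}` is reported as a listed key because its value is a list, and `{artist}` ends up in the missing set.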

Source code in client/ayon_core/scripts/otio_burnin.py
def prepare_fill_values(burnin_template, data):
    """Prepare values that will be filled instead of burnin template.

    Args:
        burnin_template (str): Burnin template string.
        data (dict[str, Any]): Data that will be used to fill template.

    Returns:
        tuple[dict[str, str], dict[str, dict[str, Any]], set[str]]: Filled
            values that can be used as they are, listed values that have a
            different value per frame, and missing keys that are not present
            in data.
    """

    fill_values = {}
    listed_keys = {}
    missing_keys = set()
    for item in Formatter().parse(burnin_template):
        _, field_name, format_spec, conversion = item
        if not field_name:
            continue
        # Calculate nested keys '{project[name]}' -> ['project', 'name']
        keys = [key.rstrip("]") for key in field_name.split("[")]
        # Calculate original full key for replacement
        conversion = "!{}".format(conversion) if conversion else ""
        format_spec = ":{}".format(format_spec) if format_spec else ""
        orig_key = "{{{}{}{}}}".format(
            field_name, conversion, format_spec)

        key_value = data
        try:
            for key in keys:
                key_value = key_value[key]

            if isinstance(key_value, list):
                listed_keys[orig_key] = {
                    "values": key_value,
                    "keys": keys}
            else:
                fill_values[orig_key] = orig_key.format(**data)
        except (KeyError, TypeError):
            missing_keys.add(orig_key)
            continue
    return fill_values, listed_keys, missing_keys