QGroundControl
Ground Control Station for MAVLink Drones
Loading...
Searching...
No Matches
GstVideoReceiver.cc
Go to the documentation of this file.
1//-----------------------------------------------------------------------------
2// Our pipeline look like this:
3//
4// +-->queue-->_decoderValve[-->_decoder-->_videoSink]
5// |
6// _source-->_tee
7// |
8// +-->queue-->_recorderValve[-->_fileSink]
9//-----------------------------------------------------------------------------
10
11#include "GstVideoReceiver.h"
12
13#if defined(QGC_HAS_GST_D3D11_GPU_PATH)
15#endif
16#if defined(QGC_HAS_GST_D3D12_GPU_PATH)
18#endif
19#include "GStreamerHelpers.h"
20#include "QGCLoggingCategory.h"
21
22#include <QtCore/QDateTime>
23#include <QtCore/QUrl>
24#include <QtQuick/QQuickItem>
25
26#include <gst/gst.h>
27#include <gst/video/video.h>
28
29QGC_LOGGING_CATEGORY(GstVideoReceiverLog, "Video.GStreamer.GstVideoReceiver")
30
31#if defined(QGC_HAS_GST_GLMEMORY_GPU_PATH) || defined(QGC_HAS_GST_D3D11_GPU_PATH) || defined(QGC_HAS_GST_D3D12_GPU_PATH)
33namespace {
// Bus sync handler installed in start(): forwards every bus message to the
// registry of compiled-in GPU context bridges (GL/D3D11/D3D12), which decide
// whether the message is a context request they serve. Runs on a GStreamer
// streaming thread, not on the Qt thread.
34GstBusSyncReply _contextSyncDispatch(GstBus * /*bus*/, GstMessage *message, gpointer /*data*/)
35{
36 return GstContextBridgeRegistry::dispatchBridges(message);
37}
38} // namespace
39#endif
40
// Constructor — NOTE(review): the signature line (doxygen line 41) was dropped
// by the documentation capture; confirm against the original file.
// Starts the dedicated GStreamer worker thread and wires the watchdog timer,
// which lives on this (GUI-thread) object and is serviced via _watchdog().
42 : VideoReceiver(parent)
43 , _worker(new GstVideoWorker(this))
44{
45 qCDebug(GstVideoReceiverLog) << this;
46
// All pipeline manipulation below is funneled through this worker thread
// (see the _needDispatch()/_worker->dispatch() pattern in every entry point).
47 _worker->start();
48 (void) connect(&_watchdogTimer, &QTimer::timeout, this, &GstVideoReceiver::_watchdog);
49}
50
// Destructor — NOTE(review): the signature line (doxygen line 51) was dropped
// by the documentation capture. Tears the pipeline down via stop() (dispatched
// to the worker thread) and then shuts the worker down.
52{
53 stop();
54 _worker->shutdown();
55
56 qCDebug(GstVideoReceiverLog) << this;
57}
58
// Builds and starts the receive pipeline:
//   _source --> _tee --> queue --> _decoderValve  (decoder branch, initially dropped)
//                  \--> queue --> _recorderValve  (recorder branch, initially dropped)
// Runs on the worker thread (re-dispatches itself otherwise) and reports the
// outcome asynchronously through onStartComplete(status).
// @param timeout watchdog timeout in seconds (logged below as _timeout).
59void GstVideoReceiver::start(uint32_t timeout)
60{
61 if (_needDispatch()) {
62 _worker->dispatch([this, timeout]() { start(timeout); });
63 return;
64 }
65
66 if (_pipeline) {
67 qCDebug(GstVideoReceiverLog) << "Already running!" << _uri;
68 _dispatchSignal([this]() { emit onStartComplete(STATUS_INVALID_STATE); });
69 return;
70 }
71
72 if (_uri.isEmpty()) {
73 qCDebug(GstVideoReceiverLog) << "Failed because URI is not specified";
74 _dispatchSignal([this]() { emit onStartComplete(STATUS_INVALID_URL); });
75 return;
76 }
77
// NOTE(review): doxygen line 78 is missing from this capture — presumably it
// stores `timeout` into _timeout (which the log line below reads); confirm
// against the original file.
// _buffer: -1 disables the rtpjitterbuffer for low-latency links; 0 enables
// it with drop-on-latency (see _makeSource()).
79 _buffer = lowLatency() ? -1 : 0;
80
81 qCDebug(GstVideoReceiverLog) << "Starting" << _uri << ", lowLatency" << lowLatency() << ", timeout" << _timeout;
82
83 _endOfStream = false;
84
85 bool running = false;
// pipelineUp flips once gst_bin_add_many() transfers element ownership to the
// pipeline; the failure path below uses it to decide whether elements still
// need individual unrefs.
86 bool pipelineUp = false;
87
88 GstElement *decoderQueue = nullptr;
89 GstElement *recorderQueue = nullptr;
90
91 do {
92 _tee = gst_element_factory_make("tee", nullptr);
93 if (!_tee) {
94 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('tee') failed";
95 break;
96 }
97
98 GstPad *pad = gst_element_get_static_pad(_tee, "sink");
99 if (!pad) {
100 qCCritical(GstVideoReceiverLog) << "gst_element_get_static_pad() failed";
101 break;
102 }
// NOTE(review): doxygen lines 103-104 are missing from this capture —
// likely the reset of _lastSourceFrameTime before the probe is installed;
// confirm against the original file.
105
106 _teeProbeId = gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, _teeProbe, this, nullptr);
107 gst_clear_object(&pad);
108 if (_teeProbeId == 0) {
109 // _teeProbe updates _lastSourceFrameTime; without it the watchdog timer fires spuriously instead of reporting a real failure.
110 qCCritical(GstVideoReceiverLog) << "gst_pad_add_probe(_teeProbe) failed";
111 break;
112 }
113
114 decoderQueue = gst_element_factory_make("queue", nullptr);
115 if (!decoderQueue) {
116 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('queue') failed";
117 break;
118 }
119
120 _decoderValve = gst_element_factory_make("valve", nullptr);
121 if (!_decoderValve) {
122 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('valve') failed";
123 break;
124 }
125
// Both valves start closed ("drop"=TRUE); they open when decoding/recording
// is actually started.
126 g_object_set(_decoderValve,
127 "drop", TRUE,
128 nullptr);
129
130 recorderQueue = gst_element_factory_make("queue", nullptr);
131 if (!recorderQueue) {
132 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('queue') failed";
133 break;
134 }
135
136 _recorderValve = gst_element_factory_make("valve", nullptr);
137 if (!_recorderValve) {
138 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('valve') failed";
139 break;
140 }
141
142 g_object_set(_recorderValve,
143 "drop", TRUE,
144 nullptr);
145
146 _pipeline = gst_pipeline_new("receiver");
147 if (!_pipeline) {
148 qCCritical(GstVideoReceiverLog) << "gst_pipeline_new() failed";
149 break;
150 }
151
152 g_object_set(_pipeline,
153 "message-forward", TRUE,
154 nullptr);
155
156 _source = _makeSource(_uri);
157 if (!_source) {
158 qCCritical(GstVideoReceiverLog) << "_makeSource() failed";
159 break;
160 }
161
// From here on the pipeline owns the elements; individual unrefs below are skipped.
162 gst_bin_add_many(GST_BIN(_pipeline), _source, _tee, decoderQueue, _decoderValve, recorderQueue, _recorderValve, nullptr);
163
164 pipelineUp = true;
165
// If the source already exposes a static src pad, link it immediately;
// otherwise wait for the "pad-added" signal (dynamic sources such as demuxers).
166 GstPad *srcPad = nullptr;
167 GstIterator *it = gst_element_iterate_src_pads(_source);
168 GValue vpad = G_VALUE_INIT;
169 switch (gst_iterator_next(it, &vpad)) {
170 case GST_ITERATOR_OK:
171 srcPad = GST_PAD(g_value_get_object(&vpad));
172 (void) gst_object_ref(srcPad);
173 (void) g_value_reset(&vpad);
174 break;
175 case GST_ITERATOR_RESYNC:
176 gst_iterator_resync(it);
177 break;
178 default:
179 break;
180 }
181 g_value_unset(&vpad);
182 gst_iterator_free(it);
183
184 if (srcPad) {
185 _onNewSourcePad(srcPad);
186 gst_clear_object(&srcPad);
187 } else {
188 (void) g_signal_connect(_source, "pad-added", G_CALLBACK(_onNewPad), this);
189 }
190
191 if (!gst_element_link_many(_tee, decoderQueue, _decoderValve, nullptr)) {
192 qCCritical(GstVideoReceiverLog) << "Unable to link decoder queue";
193 break;
194 }
195
196 if (!gst_element_link_many(_tee, recorderQueue, _recorderValve, nullptr)) {
197 qCCritical(GstVideoReceiverLog) << "Unable to link recorder queue";
198 break;
199 }
200
201 GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline));
202 if (bus) {
203 gst_bus_enable_sync_message_emission(bus);
204 (void) g_signal_connect(bus, "sync-message", G_CALLBACK(_onBusMessage), this);
205#if defined(QGC_HAS_GST_GLMEMORY_GPU_PATH) || defined(QGC_HAS_GST_D3D11_GPU_PATH) || defined(QGC_HAS_GST_D3D12_GPU_PATH)
206 // Single sync dispatcher chains every compiled context bridge so
207 // they don't clobber each other via gst_bus_set_sync_handler. Must
208 // run before GST_STATE_PLAYING — upstream queries context during
209 // PAUSED→PLAYING. Each bridge cheap-rejects messages it doesn't
210 // serve, so total cost on irrelevant messages is a strcmp.
211 gst_bus_set_sync_handler(bus, _contextSyncDispatch, nullptr, nullptr);
212#endif
213 gst_clear_object(&bus);
214 }
215
216 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-initial");
217 running = (gst_element_set_state(_pipeline, GST_STATE_PLAYING) != GST_STATE_CHANGE_FAILURE);
218 } while(0);
219
220 if (!running) {
221 qCCritical(GstVideoReceiverLog) << "Failed";
222
223 if (_pipeline) {
// Blocking wait for the NULL-state transition before dropping the pipeline.
224 (void) gst_element_set_state(_pipeline, GST_STATE_NULL);
225 (void) gst_element_get_state(_pipeline, nullptr, nullptr, GST_CLOCK_TIME_NONE);
226 gst_clear_object(&_pipeline);
227 }
228
// Elements were never handed to the pipeline — release them one by one.
229 if (!pipelineUp) {
230 gst_clear_object(&_recorderValve);
231 gst_clear_object(&recorderQueue);
232 gst_clear_object(&_decoderValve);
233 gst_clear_object(&decoderQueue);
234 gst_clear_object(&_tee);
235 gst_clear_object(&_source);
236 }
237
238 // Rate limit restarts on failure. This sleep is OK because we're in the video worker thread.
239 QThread::sleep(1);
240 _dispatchSignal([this]() { emit onStartComplete(STATUS_FAIL); });
241 } else {
242 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-started");
243 qCDebug(GstVideoReceiverLog) << "Started" << _uri;
244
245 // _watchdogTimer lives on `this` (GUI thread); _dispatchSignal runs synchronously on the
246 // worker thread, so the timer start has to be queued separately or QObject warns.
247 QMetaObject::invokeMethod(this, [this]() { _watchdogTimer.start(1000); }, Qt::QueuedConnection);
248 _dispatchSignal([this]() { emit onStartComplete(STATUS_OK); });
249 }
250}
251
// stop() — NOTE(review): the signature line (doxygen line 252) was dropped by
// the documentation capture. Tears down the pipeline: removes the tee probe,
// detaches bus handlers, drains a live recording with EOS so the file is
// finalized, forces GST_STATE_NULL, shuts down any remaining branches and
// clears all element pointers. Reports completion via onStopComplete(STATUS_OK).
253{
254 if (_needDispatch()) {
255 _worker->dispatch([this]() { stop(); });
256 return;
257 }
258
259 if (_uri.isEmpty()) {
260 qCDebug(GstVideoReceiverLog) << "Stop called on empty URI (no-op)";
261 return;
262 }
263
264 qCDebug(GstVideoReceiverLog) << "Stopping" << _uri;
265
// Timer lives on the GUI thread; must be stopped there (see start()).
266 QMetaObject::invokeMethod(this, [this]() { _watchdogTimer.stop(); }, Qt::QueuedConnection);
267
268 if (_teeProbeId != 0) {
269 if (_tee) {
270 GstPad *sinkpad = gst_element_get_static_pad(_tee, "sink");
271 if (sinkpad) {
272 gst_pad_remove_probe(sinkpad, _teeProbeId);
273 gst_clear_object(&sinkpad);
274 }
275 }
276 _teeProbeId = 0;
277 }
278
279 if (_pipeline) {
280 GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline));
281 if (bus) {
282 gst_bus_disable_sync_message_emission(bus);
283 (void) g_signal_handlers_disconnect_by_data(bus, this);
284
// If the recorder valve is open, a recording is in flight: push EOS through
// the pipeline and block until the muxer confirms it so the file is closed
// cleanly before the state drops to NULL.
285 gboolean recordingValveClosed = TRUE;
286 g_object_get(_recorderValve, "drop", &recordingValveClosed, nullptr);
287
288 if (!recordingValveClosed) {
289 (void) gst_element_send_event(_pipeline, gst_event_new_eos());
290
291 GstMessage *msg = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE, (GstMessageType)(GST_MESSAGE_EOS | GST_MESSAGE_ERROR));
292 if (msg) {
293 switch (GST_MESSAGE_TYPE(msg)) {
294 case GST_MESSAGE_EOS:
295 qCDebug(GstVideoReceiverLog) << "End of stream received!";
296 break;
297 case GST_MESSAGE_ERROR:
298 qCCritical(GstVideoReceiverLog) << "Error stopping pipeline!";
299 break;
300 default:
301 break;
302 }
303
304 gst_clear_message(&msg);
305 } else {
306 qCCritical(GstVideoReceiverLog) << "gst_bus_timed_pop_filtered() failed";
307 }
308 }
309
310 gst_clear_object(&bus);
311 } else {
312 qCCritical(GstVideoReceiverLog) << "gst_pipeline_get_bus() failed";
313 }
314
// Synchronous NULL transition: blocks until every element has stopped.
315 (void) gst_element_set_state(_pipeline, GST_STATE_NULL);
316 (void) gst_element_get_state(_pipeline, nullptr, nullptr, GST_CLOCK_TIME_NONE);
317
318 // FIXME: check if branch is connected and remove all elements from branch
319 if (_fileSink) {
320 _shutdownRecordingBranch();
321 }
322
323 if (_videoSink) {
324 _shutdownDecodingBranch();
325 }
326
327 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-stopped");
328
// The pipeline owned the remaining elements, so only raw-pointer resets follow.
329 gst_clear_object(&_pipeline);
330 _pipeline = nullptr;
331
332 _recorderValve = nullptr;
333 _decoderValve = nullptr;
334 _tee = nullptr;
335 _source = nullptr;
336
// NOTE(review): doxygen line 337 is missing from this capture — likely a
// frame-time counter reset; confirm against the original file.
338
339 if (_streaming) {
340 _streaming = false;
341 qCDebug(GstVideoReceiverLog) << "Streaming stopped" << _uri;
342 _dispatchSignal([this]() { emit streamingChanged(_streaming); });
343 } else {
344 qCDebug(GstVideoReceiverLog) << "Streaming did not start" << _uri;
345 }
346 }
347
348 qCDebug(GstVideoReceiverLog) << "Stopped" << _uri;
349
350 _dispatchSignal([this]() { emit onStopComplete(STATUS_OK); });
351}
352
// startDecoding(sink) — NOTE(review): the signature line (doxygen line 353)
// was dropped by the documentation capture; `sink` is the externally supplied
// video sink (cast to GstElement below). Registers the sink, installs a buffer
// probe on it, and — if the stream is already flowing — attaches the decoder
// branch and opens the decoder valve. Reports via onStartDecodingComplete().
354{
355 if (!sink) {
356 qCCritical(GstVideoReceiverLog) << "VideoSink is NULL" << _uri;
357 return;
358 }
359
360 if (_needDispatch()) {
361 _worker->dispatch([this, sink]() mutable { startDecoding(sink); });
362 return;
363 }
364
365 qCDebug(GstVideoReceiverLog) << "Starting decoding" << _uri;
366
367 if (!_widget) {
368 qCDebug(GstVideoReceiverLog) << "Video Widget is NULL" << _uri;
369 _dispatchSignal([this]() { emit onStartDecodingComplete(STATUS_FAIL); });
370 return;
371 }
372
// No pipeline yet: drop any stale sink reference left from a previous run.
373 if (!_pipeline) {
374 gst_clear_object(&_videoSink);
375 }
376
377 if (_videoSink || _decoding) {
378 qCDebug(GstVideoReceiverLog) << "Already decoding!" << _uri;
379 _dispatchSignal([this]() { emit onStartDecodingComplete(STATUS_INVALID_STATE); });
380 return;
381 }
382
383 GstElement *videoSink = GST_ELEMENT(sink);
384 GstPad *pad = gst_element_get_static_pad(videoSink, "sink");
385 if (!pad) {
386 qCCritical(GstVideoReceiverLog) << "Unable to find sink pad of video sink" << _uri;
387 _dispatchSignal([this]() { emit onStartDecodingComplete(STATUS_FAIL); });
388 return;
389 }
390
// NOTE(review): doxygen line 391 is missing from this capture — confirm what
// it did against the original file.
392 _resetVideoSink = true;
393
// _videoSinkProbe observes buffers reaching the sink (feeds _lastVideoFrameTime
// used by the watchdog below).
394 _videoSinkProbeId = gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, _videoSinkProbe, this, nullptr);
395 gst_clear_object(&pad);
396
397 _videoSink = videoSink;
398 gst_object_ref(_videoSink);
399
400 _removingDecoder = false;
401
// Not streaming yet: the decoder branch is attached later by _onNewSourcePad().
402 if (!_streaming) {
403 _dispatchSignal([this]() { emit onStartDecodingComplete(STATUS_OK); });
404 return;
405 }
406
407 _ensureVideoSinkInPipeline();
408
409 if (!_addDecoder(_decoderValve)) {
410 qCCritical(GstVideoReceiverLog) << "_addDecoder() failed" << _uri;
411 _shutdownDecodingBranch();
412 _dispatchSignal([this]() { emit onStartDecodingComplete(STATUS_FAIL); });
413 return;
414 }
415
// Open the valve: buffers now flow into the decoder branch.
416 g_object_set(_decoderValve,
417 "drop", FALSE,
418 nullptr);
419
420 qCDebug(GstVideoReceiverLog) << "Decoding started" << _uri;
421
422 _dispatchSignal([this]() { emit onStartDecodingComplete(STATUS_OK); });
423}
424
// stopDecoding() — NOTE(review): the signature line (doxygen line 425) was
// dropped by the documentation capture. Closes the decoder valve and unlinks
// the decoder branch; completion is reported via onStopDecodingComplete().
426{
427 if (_needDispatch()) {
428 _worker->dispatch([this]() { stopDecoding(); });
429 return;
430 }
431
432 qCDebug(GstVideoReceiverLog) << "Stopping decoding" << _uri;
433
434 // Gate on _videoSink (set by startDecoding) instead of _decoding (which only flips on
435 // first sink-buffer probe). Without this, stopDecoding() called between
436 // onStartDecodingComplete(OK) and the first frame returns STATUS_INVALID_STATE and
437 // leaves the decoder/sink branch live.
438 if (!_pipeline || !_videoSink) {
439 qCDebug(GstVideoReceiverLog) << "Not decoding!" << _uri;
440 _dispatchSignal([this]() { emit onStopDecodingComplete(STATUS_INVALID_STATE); });
441 return;
442 }
443
// Close the valve first so no more buffers enter the branch being removed.
444 g_object_set(_decoderValve,
445 "drop", TRUE,
446 nullptr);
447
448 _removingDecoder = true;
449
450 const bool ret = _unlinkBranch(_decoderValve);
451
452 // FIXME: it is much better to emit onStopDecodingComplete() after decoding is really stopped
453 // (which happens later due to async design) but as for now it is also not so bad...
454 _dispatchSignal([this, ret](){ emit onStopDecodingComplete(ret ? STATUS_OK : STATUS_FAIL); });
455}
456
// Attaches a file-sink bin (mux + filesink, built by _makeFileSink()) to the
// recorder branch and opens the recorder valve. A keyframe probe drops buffers
// until the first keyframe so playback starts cleanly at t=0. Outcome is
// reported via onStartRecordingComplete().
// @param videoFile destination path  @param format container format (see _kFileMux)
457void GstVideoReceiver::startRecording(const QString &videoFile, FILE_FORMAT format)
458{
459 if (_needDispatch()) {
460 const QString cachedVideoFile = videoFile;
461 _worker->dispatch([this, cachedVideoFile, format]() { startRecording(cachedVideoFile, format); });
462 return;
463 }
464
465 qCDebug(GstVideoReceiverLog) << "Starting recording" << _uri;
466
467 if (!_pipeline) {
468 qCDebug(GstVideoReceiverLog) << "Streaming is not active!" << _uri;
469 _dispatchSignal([this](){ emit onStartRecordingComplete(STATUS_INVALID_STATE); });
470 return;
471 }
472
473 if (_recording) {
474 qCDebug(GstVideoReceiverLog) << "Already recording!" << _uri;
475 _dispatchSignal([this]() { emit onStartRecordingComplete(STATUS_INVALID_STATE); });
476 return;
477 }
478
479 qCDebug(GstVideoReceiverLog) << "New video file:" << videoFile << _uri;
480
481 _fileSink = _makeFileSink(videoFile, format);
482 if (!_fileSink) {
483 qCCritical(GstVideoReceiverLog) << "_makeFileSink() failed" << _uri;
484 _dispatchSignal([this]() { emit onStartRecordingComplete(STATUS_FAIL); });
485 return;
486 }
487
488 _removingRecorder = false;
489
// Extra ref so _shutdownRecordingBranch() can still use _fileSink after it is
// removed from the pipeline.
490 (void) gst_object_ref(_fileSink);
491
492 gst_bin_add(GST_BIN(_pipeline), _fileSink);
493
// NOTE(review): on this and the probe-pad failure path below, the extra
// _fileSink ref taken above is not released and the sink stays in the
// pipeline — potential leak until the next stop(); worth confirming/fixing.
494 if (!gst_element_link(_recorderValve, _fileSink)) {
495 qCCritical(GstVideoReceiverLog) << "Failed to link valve and file sink" << _uri;
496 _dispatchSignal([this]() { emit onStartRecordingComplete(STATUS_FAIL); });
497 return;
498 }
499
500 (void) gst_element_sync_state_with_parent(_fileSink);
501
502 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-filesink");
503
504 // Install a probe on the recording branch to drop buffers until we hit our first keyframe
505 // When we hit our first keyframe, we can offset the timestamps appropriately according to the first keyframe time
506 // This will ensure the first frame is a keyframe at t=0, and decoding can begin immediately on playback
507 GstPad *probepad = gst_element_get_static_pad(_recorderValve, "src");
508 if (!probepad) {
509 qCCritical(GstVideoReceiverLog) << "gst_element_get_static_pad() failed" << _uri;
510 _dispatchSignal([this]() { emit onStartRecordingComplete(STATUS_FAIL); });
511 return;
512 }
513
514 _keyframeWatchId = gst_pad_add_probe(probepad, GST_PAD_PROBE_TYPE_BUFFER, _keyframeWatch, this, nullptr);
515 gst_clear_object(&probepad);
516
// Open the valve: buffers start flowing into the recording branch.
517 g_object_set(_recorderValve,
518 "drop", FALSE,
519 nullptr);
520
521 _recordingOutput = videoFile;
522 _recording = true;
523 qCDebug(GstVideoReceiverLog) << "Recording started" << _uri;
524 _dispatchSignal([this]() {
// NOTE(review): doxygen lines 525-526 are missing from this capture —
// presumably the onStartRecordingComplete(STATUS_OK)/recordingChanged emits;
// confirm against the original file.
527 });
528}
529
// stopRecording() — NOTE(review): the signature line (doxygen line 530) was
// dropped by the documentation capture. Closes the recorder valve, unlinks the
// branch, and defers completion until the file is finalized (see below).
531{
532 if (_needDispatch()) {
533 _worker->dispatch([this]() { stopRecording(); });
534 return;
535 }
536
537 qCDebug(GstVideoReceiverLog) << "Stopping recording" << _uri;
538
539 if (!_pipeline || !_recording) {
540 qCDebug(GstVideoReceiverLog) << "Not recording!" << _uri;
541 _dispatchSignal([this]() { emit onStopRecordingComplete(STATUS_INVALID_STATE); });
542 return;
543 }
544
// Close the valve before unlinking so no buffer enters a half-removed branch.
545 g_object_set(_recorderValve,
546 "drop", TRUE,
547 nullptr);
548
549 _removingRecorder = true;
550
551 if (!_unlinkBranch(_recorderValve)) {
552 _removingRecorder = false;
553 _dispatchSignal([this]() { emit onStopRecordingComplete(STATUS_FAIL); });
554 return;
555 }
556
557 // EOS event propagates valve→mux→filesink; _shutdownRecordingBranch emits the
558 // complete signal once the muxer index is written and the file is closed.
559 _recordingStopRequested = true;
560}
561
562void GstVideoReceiver::takeScreenshot(const QString &imageFile)
563{
564 if (_needDispatch()) {
565 const QString cachedImageFile = imageFile;
566 _worker->dispatch([this, cachedImageFile]() { takeScreenshot(cachedImageFile); });
567 return;
568 }
569
570 qCDebug(GstVideoReceiverLog) << "taking screenshot" << _uri;
571
572 // FIXME: record screenshot here
573 _dispatchSignal([this]() { emit onTakeScreenshotComplete(STATUS_NOT_IMPLEMENTED); });
574}
575
// Watchdog tick (fires on the GUI-thread timer started in start()): hops to the
// worker thread, then checks frame-arrival timestamps fed by the tee and video
// sink probes. A stalled source (no frames for > _timeout seconds) or a stalled
// decoder (> 2*_timeout) emits timeout() and stops the pipeline.
576void GstVideoReceiver::_watchdog()
{
577{
578 _worker->dispatch([this]() {
579 if (!_pipeline) {
580 return;
581 }
582
583 const qint64 now = QDateTime::currentSecsSinceEpoch();
584 if (_lastSourceFrameTime == 0) {
// NOTE(review): doxygen line 585 is missing from this capture — presumably
// it initializes _lastSourceFrameTime to `now` so the first check doesn't
// trip immediately; confirm against the original file.
586 }
587
588 qint64 elapsed = now - _lastSourceFrameTime;
589 if (elapsed > _timeout) {
590 qCDebug(GstVideoReceiverLog) << "Stream timeout, no frames for" << elapsed << _uri;
591 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-watchdog-timeout");
592 _dispatchSignal([this]() { emit timeout(); });
593 stop();
594 }
595
// Decoder stall check is skipped while the decoder branch is being torn down.
596 if (_decoding && !_removingDecoder) {
597 if (_lastVideoFrameTime == 0) {
// NOTE(review): doxygen line 598 is missing from this capture — presumably
// the matching _lastVideoFrameTime initialization; confirm against the
// original file.
599 }
600
601 elapsed = now - _lastVideoFrameTime;
602 if (elapsed > (_timeout * 2)) {
603 qCDebug(GstVideoReceiverLog) << "Video decoder timeout, no frames for" << elapsed << _uri;
604 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-watchdog-timeout");
605 _dispatchSignal([this]() { emit timeout(); });
606 stop();
607 }
608 }
609 });
610}
611
// Reacts to an EOS observed on the bus/probes. The branch order matters:
// a true end-of-stream stops everything; otherwise EOS is the acknowledgement
// that a branch being removed (decoder or recorder) has drained and can be
// shut down.
612void GstVideoReceiver::_handleEOS()
{
613{
614 if (!_pipeline) {
615 return;
616 }
617
618 if (_endOfStream) {
619 stop();
620 } else if (_decoding && _removingDecoder) {
621 _shutdownDecodingBranch();
622 } else if (_recording && _removingRecorder) {
623 _shutdownRecordingBranch();
624 } /*else {
625 qCWarning(GstVideoReceiverLog) << "Unexpected EOS!";
626 stop();
627 }*/
628}
629
630#if !defined(QGC_GST_BUILD_VERSION_MAJOR) || (QGC_GST_BUILD_VERSION_MAJOR == 1 && QGC_GST_BUILD_VERSION_MINOR < 28)
631gboolean GstVideoReceiver::_filterParserCaps(GstElement *bin, GstPad *pad, GstElement *element, GstQuery *query, gpointer data)
632{
633 Q_UNUSED(bin); Q_UNUSED(pad); Q_UNUSED(element); Q_UNUSED(data)
634
635 if (GST_QUERY_TYPE(query) != GST_QUERY_CAPS) {
636 return FALSE;
637 }
638
639 GstCaps *srcCaps;
640 gst_query_parse_caps(query, &srcCaps);
641 if (!srcCaps || gst_caps_is_any(srcCaps)) {
642 return FALSE;
643 }
644
645 GstCaps *sinkCaps = nullptr;
646 GstCaps *filter = nullptr;
647 GstStructure *structure = gst_caps_get_structure(srcCaps, 0);
648 if (gst_structure_has_name(structure, "video/x-h265")) {
649 filter = gst_caps_from_string("video/x-h265");
650 if (gst_caps_can_intersect(srcCaps, filter)) {
651 sinkCaps = gst_caps_from_string("video/x-h265,stream-format=hvc1");
652 }
653 gst_clear_caps(&filter);
654 } else if (gst_structure_has_name(structure, "video/x-h264")) {
655 filter = gst_caps_from_string("video/x-h264");
656 if (gst_caps_can_intersect(srcCaps, filter)) {
657 sinkCaps = gst_caps_from_string("video/x-h264,stream-format=avc");
658 }
659 gst_clear_caps(&filter);
660 }
661
662 if (sinkCaps) {
663 gst_query_set_caps_result(query, sinkCaps);
664 gst_clear_caps(&sinkCaps);
665 return TRUE;
666 }
667
668 return FALSE;
669}
670#endif
671
// Builds the "sourcebin": scheme-specific source element (rtspsrc / udpsrc /
// tcpclientsrc), optional tsdemux for MPEG-TS, optional rtpjitterbuffer, and a
// parsebin whose pads are ghosted out by _wrapWithGhostPad. Ownership of all
// intermediates is handed to the bin; locals are nulled once transferred so the
// trailing gst_clear_object() calls only release leftovers on failure.
// @param input stream URI  @return the source bin, or nullptr on failure.
672GstElement *GstVideoReceiver::_makeSource(const QString &input)
673{
674 if (input.isEmpty()) {
675 qCCritical(GstVideoReceiverLog) << "Failed because URI is not specified";
676 return nullptr;
677 }
678
679 const QUrl sourceUrl(input);
680
// Scheme sniffing; note udp:// vs udp265:// vs mpegts:// are matched on the
// raw string, not on QUrl::scheme().
681 const bool isRtsp = sourceUrl.scheme().startsWith("rtsp", Qt::CaseInsensitive);
682 const bool isUdp264 = input.contains("udp://", Qt::CaseInsensitive);
683 const bool isUdp265 = input.contains("udp265://", Qt::CaseInsensitive);
684 const bool isUdpMPEGTS = input.contains("mpegts://", Qt::CaseInsensitive);
685 const bool isTcpMPEGTS = input.contains("tcp://", Qt::CaseInsensitive);
686
687 GstElement *source = nullptr;
688 GstElement *buffer = nullptr;
689 GstElement *tsdemux = nullptr;
690 GstElement *parser = nullptr;
691 GstElement *bin = nullptr;
692 GstElement *srcbin = nullptr;
693
694 do {
695 if (isRtsp) {
696 if (!GStreamer::isValidRtspUri(input.toUtf8().constData())) {
697 qCCritical(GstVideoReceiverLog) << "Invalid RTSP URI:" << input;
698 break;
699 }
700
701 source = gst_element_factory_make("rtspsrc", "source");
702 if (!source) {
703 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('rtspsrc') failed";
704 break;
705 }
706
// Credentials embedded in the URI are split off and passed via the
// user-id/user-pw properties instead of the location string.
707 const QString rtspUserInfo = sourceUrl.userInfo();
708 QString rtspUser, rtspPassword;
709 if (!rtspUserInfo.isEmpty()) {
710 const int colonIdx = rtspUserInfo.indexOf(QLatin1Char(':'));
711 if (colonIdx >= 0) {
712 rtspUser = rtspUserInfo.left(colonIdx);
713 rtspPassword = rtspUserInfo.mid(colonIdx + 1);
714 } else {
715 rtspUser = rtspUserInfo;
716 }
717 }
718 QUrl cleanUrl(sourceUrl);
719 cleanUrl.setUserInfo(QString());
720 const QByteArray cleanLocation = cleanUrl.toString().toUtf8();
721
722 g_object_set(source,
723 "location", cleanLocation.constData(),
724 "latency", 25,
725 "do-rtcp", TRUE,
726 "tcp-timeout", G_GUINT64_CONSTANT(5000000),
727 "udp-reconnect", TRUE,
728 "drop-on-latency", TRUE,
729 "retry", 3,
730 nullptr);
731
732 if (!rtspUser.isEmpty()) {
733 g_object_set(source,
734 "user-id", rtspUser.toUtf8().constData(),
735 "user-pw", rtspPassword.toUtf8().constData(),
736 nullptr);
737 }
738 } else if (isTcpMPEGTS) {
739 source = gst_element_factory_make("tcpclientsrc", "source");
740 if (!source) {
741 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('tcpclientsrc') failed";
742 break;
743 }
744
// NOTE(review): QUrl::port() returns -1 when no port is present, which
// wraps when narrowed to quint16 — presumably callers always supply a port.
745 const QString host = sourceUrl.host();
746 const quint16 port = sourceUrl.port();
747 g_object_set(source,
748 "host", host.toUtf8().constData(),
749 "port", port,
750 nullptr);
751 } else if (isUdp264 || isUdp265 || isUdpMPEGTS) {
752 source = gst_element_factory_make("udpsrc", "source");
753 if (!source) {
754 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('udpsrc') failed";
755 break;
756 }
757
// Rebuild a plain udp:// URI regardless of the original scheme spelling.
758 const QString uri = QStringLiteral("udp://%1:%2").arg(sourceUrl.host(), QString::number(sourceUrl.port()));
759 g_object_set(source,
760 "uri", uri.toUtf8().constData(),
761 "buffer-size", 8 * 1024 * 1024,
762 nullptr);
763
// Raw RTP on UDP carries no caps in-band, so pin them per codec.
764 GstCaps *caps = nullptr;
765 if (isUdp264) {
766 caps = gst_caps_from_string("application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)H264");
767 if (!caps) {
768 qCCritical(GstVideoReceiverLog) << "gst_caps_from_string() failed";
769 break;
770 }
771 } else if (isUdp265) {
772 caps = gst_caps_from_string("application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)H265");
773 if (!caps) {
774 qCCritical(GstVideoReceiverLog) << "gst_caps_from_string() failed";
775 break;
776 }
777 }
778
779 if (caps) {
780 g_object_set(source,
781 "caps", caps,
782 nullptr);
783 gst_clear_caps(&caps);
784 }
785 } else {
786 qCDebug(GstVideoReceiverLog) << "URI is not recognized";
787 }
788
789 if (!source) {
790 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make() for data source failed";
791 break;
792 }
793
794 bin = gst_bin_new("sourcebin");
795 if (!bin) {
796 qCCritical(GstVideoReceiverLog) << "gst_bin_new('sourcebin') failed";
797 break;
798 }
799
800 parser = gst_element_factory_make("parsebin", "parser");
801 if (!parser) {
802 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('parsebin') failed";
803 break;
804 }
805
806 // GStreamer < 1.28: decodebin3 didn't negotiate stream-format caps properly
807 // between parser and decoder, so we forced hvc1/avc. GStreamer 1.28+ fixes
808 // this natively, and the forced caps break hardware decoders that need
809 // byte-stream format (e.g. Qualcomm AMC on Android, D3D12 on Windows).
810#if !defined(QGC_GST_BUILD_VERSION_MAJOR) || (QGC_GST_BUILD_VERSION_MAJOR == 1 && QGC_GST_BUILD_VERSION_MINOR < 28)
811 (void) g_signal_connect(parser, "autoplug-query", G_CALLBACK(_filterParserCaps), nullptr);
812#endif
813
814 gst_bin_add_many(GST_BIN(bin), source, parser, nullptr);
815
816 // FIXME: AV: Android does not determine MPEG2-TS via parsebin - have to explicitly state which demux to use
817 // FIXME: AV: tsdemux handling is a bit ugly - let's try to find elegant solution for that later
818 if (isTcpMPEGTS || isUdpMPEGTS) {
819 tsdemux = gst_element_factory_make("tsdemux", nullptr);
820 if (!tsdemux) {
821 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('tsdemux') failed";
822 break;
823 }
824
825 (void) gst_bin_add(GST_BIN(bin), tsdemux);
826
827 if (!gst_element_link(source, tsdemux)) {
828 qCCritical(GstVideoReceiverLog) << "gst_element_link() failed";
829 break;
830 }
831
// From here on "source" means the demuxer: downstream links hang off its pads.
832 source = tsdemux;
833 tsdemux = nullptr;
834 }
835
// _padProbe reports via bit flags: bit 0 = static src pad exists,
// bit 1 = pad is RTP (wants a jitterbuffer) — inferred from usage below.
836 int probeRes = 0;
837 (void) gst_element_foreach_src_pad(source, _padProbe, &probeRes);
838
839 if (probeRes & 1) {
840 if ((probeRes & 2) && (_buffer >= 0)) {
841 buffer = gst_element_factory_make("rtpjitterbuffer", nullptr);
842 if (!buffer) {
843 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('rtpjitterbuffer') failed";
844 break;
845 }
846
847 g_object_set(buffer,
848 "do-lost", TRUE,
849 "drop-on-latency", _buffer == 0 ? TRUE : FALSE,
850 nullptr);
851
852 (void) gst_bin_add(GST_BIN(bin), buffer);
853
854 if (!gst_element_link_many(source, buffer, parser, nullptr)) {
855 qCCritical(GstVideoReceiverLog) << "gst_element_link() failed";
856 break;
857 }
858 } else {
859 if (!gst_element_link(source, parser)) {
860 qCCritical(GstVideoReceiverLog) << "gst_element_link() failed";
861 break;
862 }
863 }
864 } else {
// No static src pad yet (dynamic source/demuxer): link to the parser lazily.
865 (void) g_signal_connect(source, "pad-added", G_CALLBACK(_linkPad), parser);
866 }
867
// Expose parser output pads on the bin boundary as ghost pads.
868 (void) g_signal_connect(parser, "pad-added", G_CALLBACK(_wrapWithGhostPad), nullptr);
869
// Success: ownership transferred to the bin; null the locals so the cleanup
// below does not double-free.
870 source = tsdemux = buffer = parser = nullptr;
871
872 srcbin = bin;
873 bin = nullptr;
874 } while(0);
875
876 gst_clear_object(&bin);
877 gst_clear_object(&parser);
878 gst_clear_object(&tsdemux);
879 gst_clear_object(&buffer);
880 gst_clear_object(&source);
881
882 return srcbin;
883}
884
885GstElement *GstVideoReceiver::_makeDecoder()
886{
887 GstElement *decoder = gst_element_factory_make("decodebin3", nullptr);
888 if (!decoder) {
889 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('decodebin3') failed";
890 }
891 return decoder;
892}
893
// Builds the "sinkbin" used by the recording branch: format-specific muxer
// (_kFileMux[format]) linked to a filesink, with the muxer's requested
// video_%u pad ghosted as the bin's "sink" pad. `releaseElements` tracks
// whether mux/sink still need individual unrefs (before gst_bin_add_many
// transfers their ownership to the bin).
// @param videoFile output path  @param format container format
// @return the sink bin, or nullptr on failure.
894GstElement *GstVideoReceiver::_makeFileSink(const QString &videoFile, FILE_FORMAT format)
895{
896 GstElement *fileSink = nullptr;
897 GstElement *mux = nullptr;
898 GstElement *sink = nullptr;
899 GstElement *bin = nullptr;
900 bool releaseElements = true;
901
902 do {
903 if (!isValidFileFormat(format)) {
904 qCCritical(GstVideoReceiverLog) << "Unsupported file format";
905 break;
906 }
907
908 mux = gst_element_factory_make(_kFileMux[format], nullptr);
909 if (!mux) {
910 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('" << _kFileMux[format] << "') failed";
911 break;
912 }
913
914 // mp4mux/qtmux: write moov atom up-front + reserve space for index updates so a crash
915 // mid-recording leaves a playable file. matroskamux is naturally streamable; skip.
916 if (format == FILE_FORMAT_MP4 || format == FILE_FORMAT_MOV) {
917 g_object_set(mux,
918 "faststart", TRUE,
919 "reserved-moov-update-period", G_GUINT64_CONSTANT(1000000000),
920 nullptr);
921 }
922
923 sink = gst_element_factory_make("filesink", nullptr);
924 if (!sink) {
925 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('filesink') failed";
926 break;
927 }
928
929 g_object_set(sink,
930 "location", qPrintable(videoFile),
931 nullptr);
932
933 bin = gst_bin_new("sinkbin");
934 if (!bin) {
935 qCCritical(GstVideoReceiverLog) << "gst_bin_new('sinkbin') failed";
936 break;
937 }
938
939 GstPadTemplate *padTemplate = gst_element_class_get_pad_template(GST_ELEMENT_GET_CLASS(mux), "video_%u");
940 if (!padTemplate) {
941 qCCritical(GstVideoReceiverLog) << "gst_element_class_get_pad_template(mux) failed";
942 break;
943 }
944
945 // FIXME: pad handling is potentially leaking (and other similar places too!)
946 GstPad *pad = gst_element_request_pad(mux, padTemplate, nullptr, nullptr);
947 if (!pad) {
948 qCCritical(GstVideoReceiverLog) << "gst_element_request_pad(mux) failed";
949 break;
950 }
951
952 gst_bin_add_many(GST_BIN(bin), mux, sink, nullptr);
953
// Bin now owns mux and sink; do not unref them individually on later failure.
954 releaseElements = false;
955
956 GstPad *ghostpad = gst_ghost_pad_new("sink", pad);
957 (void) gst_element_add_pad(bin, ghostpad);
958 gst_clear_object(&pad);
959
960 if (!gst_element_link(mux, sink)) {
961 qCCritical(GstVideoReceiverLog) << "gst_element_link() failed";
962 break;
963 }
964
965 fileSink = bin;
966 bin = nullptr;
967 } while(0);
968
969 if (releaseElements) {
970 gst_clear_object(&sink);
971 gst_clear_object(&mux);
972 }
973
974 gst_clear_object(&bin);
975 return fileSink;
976}
977
// Called when the source exposes a (first) src pad — either directly from
// start() for static pads or via the "pad-added" signal. Links source→tee,
// flips _streaming, installs the EOS probe, and — if a video sink was already
// registered via startDecoding() — attaches the decoder branch now.
978void GstVideoReceiver::_onNewSourcePad(GstPad *pad)
979{
980 // FIXME: check for caps - if this is not video stream (and preferably - one of these which we have to support) then simply skip it
981 if (!gst_element_link(_source, _tee)) {
982 qCCritical(GstVideoReceiverLog) << "Unable to link source";
983 return;
984 }
985
986 if (!_streaming) {
987 _streaming = true;
988 qCDebug(GstVideoReceiverLog) << "Streaming started" << _uri;
989 _dispatchSignal([this]() { emit streamingChanged(_streaming); });
990 }
991
// Watch downstream events on the source pad so _eosProbe can spot EOS.
992 _eosProbeId = gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, _eosProbe, this, nullptr);
993 if (_eosProbeId != 0) {
994 // Hold a ref so _shutdownDecodingBranch can remove the probe even after _decoder is gone.
995 _eosProbePad = GST_PAD_CAST(gst_object_ref(pad));
996 }
// No sink registered yet: decoding branch is attached later by startDecoding().
997 if (!_videoSink) {
998 return;
999 }
1000
1001 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-new-source-pad");
1002
1003 _ensureVideoSinkInPipeline();
1004
1005 if (!_addDecoder(_decoderValve)) {
1006 qCCritical(GstVideoReceiverLog) << "_addDecoder() failed";
1007 _shutdownDecodingBranch();
1008 return;
1009 }
1010
// Open the decoder valve so buffers reach the newly attached branch.
1011 g_object_set(_decoderValve,
1012 "drop", FALSE,
1013 nullptr);
1014
1015 qCDebug(GstVideoReceiverLog) << "Decoding started" << _uri;
1016}
1017
// Walks the children of decodebin3 looking for decoder elements: logs which
// plugin/feature was selected (HW vs SW, rank), publishes the decoder name via
// decoderStatsChanged(), and disables QoS on each decoder found.
// @param decodebin3 the decodebin3 instance to inspect.
1018void GstVideoReceiver::_logDecodebin3SelectedCodec(GstElement *decodebin3)
1019{
1020 GValue value = G_VALUE_INIT;
1021 GstIterator *iter = gst_bin_iterate_elements(GST_BIN(decodebin3));
1022 GstElement *child;
1023
// NOTE(review): GST_ITERATOR_RESYNC terminates this loop instead of resyncing;
// acceptable for a best-effort logging helper, but worth knowing.
1024 while (gst_iterator_next(iter, &value) == GST_ITERATOR_OK) {
1025 child = GST_ELEMENT(g_value_get_object(&value));
1026 GstElementFactory *factory = gst_element_get_factory(child);
1027
1028 if (factory) {
1029 gboolean is_decoder = gst_element_factory_list_is_type(factory, GST_ELEMENT_FACTORY_TYPE_DECODER);
1030 if (is_decoder) {
1031 const gchar *decoderKlass = gst_element_factory_get_klass(factory);
1032 GstPluginFeature *feature = GST_PLUGIN_FEATURE(factory);
1033 const gchar *featureName = gst_plugin_feature_get_name(feature);
1034 const guint rank = gst_plugin_feature_get_rank(feature);
1035 bool isHardwareDecoder = GStreamer::isHardwareDecoderFactory(factory);
1036
// Prefer the owning plugin's name for the log; fall back to the feature name.
1037 QString pluginName = featureName;
1038 GstPlugin *plugin = gst_plugin_feature_get_plugin(feature);
1039 if (plugin) {
1040 pluginName = gst_plugin_get_name(plugin);
1041 gst_object_unref(plugin);
1042 }
1043 qCDebug(GstVideoReceiverLog) << "Decodebin3 selected codec:rank -" << pluginName << "/" << featureName << "-" << decoderKlass << (isHardwareDecoder ? "(HW)" : "(SW)") << ":" << rank;
1044
// Only signal when the selected decoder actually changed.
1045 const QString newName = QString::fromUtf8(featureName);
1046 if (newName != _decoderName) {
1047 _decoderName = newName;
1048 _dispatchSignal([this]() { emit decoderStatsChanged(); });
1049 }
1050
1051 // Disable QoS on the internal decoder to prevent cascading
1052 // frame drops on live streams. The videodecoder base class
1053 // aggressively advances earliest_time after the first late
1054 // frame, causing all subsequent frames to be dropped.
1055 g_object_set(child, "qos", FALSE, nullptr);
1056 qCDebug(GstVideoReceiverLog) << "Disabled QoS on internal decoder" << featureName;
1057 }
1058 }
1059 g_value_reset(&value);
1060 }
1061 g_value_unset(&value);
1062 gst_iterator_free(iter);
1063}
1064
1065
1066void GstVideoReceiver::_onNewDecoderPad(GstPad *pad)
1067{
1068 qCDebug(GstVideoReceiverLog) << "_onNewDecoderPad" << _uri;
1069
1070 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-new-decoder-pad");
1071
1072 // We should now know what codec decodebin3 selected.
1073 _logDecodebin3SelectedCodec(_decoder);
1074
1075 if (!_addVideoSink(pad)) {
1076 qCCritical(GstVideoReceiverLog) << "_addVideoSink() failed";
1077 }
1078}
1079
// Creates the decoder, adds it to the pipeline and links it downstream of
// `src` (the decoder valve). If the decoder already exposes a src pad it is
// wired to the video sink immediately; otherwise we wait for "pad-added".
// Returns false (with the decoder fully torn down) when linking fails.
bool GstVideoReceiver::_addDecoder(GstElement *src)
{
    _decoder = _makeDecoder();
    if (!_decoder) {
        qCCritical(GstVideoReceiverLog) << "_makeDecoder() failed";
        return false;
    }

    // Extra ref keeps _decoder valid after gst_bin_remove() drops the bin's ref.
    (void) gst_object_ref(_decoder);

    (void) gst_bin_add(GST_BIN(_pipeline), _decoder);
    (void) gst_element_sync_state_with_parent(_decoder);

    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-decoder");

    if (!gst_element_link(src, _decoder)) {
        qCCritical(GstVideoReceiverLog) << "Unable to link decoder";
        // Tear down: drop to NULL (waiting for the state change), then remove from the bin.
        gst_element_set_state(_decoder, GST_STATE_NULL);
        (void) gst_element_get_state(_decoder, nullptr, nullptr, GST_CLOCK_TIME_NONE);
        (void) gst_bin_remove(GST_BIN(_pipeline), _decoder);
        gst_clear_object(&_decoder);
        return false;
    }

    // Probe for an already-existing src pad on the decoder.
    GstPad *srcPad = nullptr;
    GstIterator *it = gst_element_iterate_src_pads(_decoder);
    GValue vpad = G_VALUE_INIT;
    switch (gst_iterator_next(it, &vpad)) {
    case GST_ITERATOR_OK:
        srcPad = GST_PAD(g_value_get_object(&vpad));
        (void) gst_object_ref(srcPad);
        (void) g_value_reset(&vpad);
        break;
    case GST_ITERATOR_RESYNC:
        // Pad list changed under us; fall back to the "pad-added" path below.
        gst_iterator_resync(it);
        break;
    default:
        break;
    }
    g_value_unset(&vpad);
    gst_iterator_free(it);

    if (srcPad) {
        _onNewDecoderPad(srcPad);
    } else {
        // No pad yet - connect once the decoder announces one.
        (void) g_signal_connect(_decoder, "pad-added", G_CALLBACK(_onNewPad), this);
    }

    gst_clear_object(&srcPad);
    return true;
}
1131
1132void GstVideoReceiver::_ensureVideoSinkInPipeline()
1133{
1134 if (!_videoSink || !_pipeline) {
1135 return;
1136 }
1137
1138 GstObject *parent = gst_element_get_parent(_videoSink);
1139 if (parent) {
1140 gst_object_unref(parent);
1141 return;
1142 }
1143
1144 g_object_set(_videoSink,
1145 "sync", (_buffer >= 0),
1146 NULL);
1147
1148 (void) gst_object_ref(_videoSink);
1149 (void) gst_bin_add(GST_BIN(_pipeline), _videoSink);
1150
1151 // PAUSED (not READY) triggers downstream caps negotiation before source data arrives.
1152 (void) gst_element_set_state(_videoSink, GST_STATE_PAUSED);
1153}
1154
1155bool GstVideoReceiver::_addVideoSink(GstPad *pad)
1156{
1157 GstCaps *caps = gst_pad_query_caps(pad, nullptr);
1158
1159 _ensureVideoSinkInPipeline();
1160
1161 GstPad *sinkPad = gst_element_get_static_pad(_videoSink, "sink");
1162 GstPadLinkReturn linkRet = sinkPad ? gst_pad_link(pad, sinkPad) : GST_PAD_LINK_WRONG_HIERARCHY;
1163 if (linkRet != GST_PAD_LINK_OK) {
1164 qCCritical(GstVideoReceiverLog) << "Unable to link decoder pad to video sink, result:" << linkRet;
1165
1166 // _ensureVideoSinkInPipeline() added it before linking; detach for the next retry.
1167 GstObject *parent = gst_element_get_parent(_videoSink);
1168 if (parent) {
1169 (void) gst_element_set_state(_videoSink, GST_STATE_NULL);
1170 (void) gst_element_get_state(_videoSink, nullptr, nullptr, GST_CLOCK_TIME_NONE);
1171 (void) gst_bin_remove(GST_BIN(_pipeline), _videoSink);
1172 gst_clear_object(&parent);
1173 }
1174
1175 gst_clear_object(&sinkPad);
1176 gst_clear_caps(&caps);
1177 return false;
1178 }
1179 gst_clear_object(&sinkPad);
1180
1181 (void) gst_element_sync_state_with_parent(_videoSink);
1182
1183 // sync=FALSE + max-lateness=-1: HW decoders with startup latency must not drop early frames.
1184 g_object_set(_videoSink, "sync", FALSE, "max-lateness", G_GINT64_CONSTANT(-1), nullptr);
1185
1186 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-videosink");
1187
1188 // Determine video size. Errors here are non-fatal.
1189 QSize videoSize;
1190 do {
1191 if (!_decoderValve) {
1192 qCCritical(GstVideoReceiverLog) << "Unable to determine video size - _decoderValve is NULL" << _uri;
1193 break;
1194 }
1195
1196 GstPad *valveSrcPad = gst_element_get_static_pad(_decoderValve, "src");
1197 if (!valveSrcPad) {
1198 qCCritical(GstVideoReceiverLog) << "gst_element_get_static_pad() failed";
1199 break;
1200 }
1201
1202 GstCaps *valveSrcPadCaps = gst_pad_query_caps(valveSrcPad, nullptr);
1203 if (!valveSrcPadCaps) {
1204 qCCritical(GstVideoReceiverLog) << "gst_pad_query_caps() failed";
1205 gst_clear_object(&valveSrcPad);
1206 break;
1207 }
1208
1209 const GstStructure *structure = gst_caps_get_structure(valveSrcPadCaps, 0);
1210 if (!structure) {
1211 qCCritical(GstVideoReceiverLog) << "Unable to determine video size - structure is NULL" << _uri;
1212 gst_clear_object(&valveSrcPad);
1213 break;
1214 }
1215
1216 gint width = 0;
1217 gint height = 0;
1218 (void) gst_structure_get_int(structure, "width", &width);
1219 (void) gst_structure_get_int(structure, "height", &height);
1220
1221 // Swap W×H for 90°/270° streams so QML AR is computed on display dimensions.
1222 gint orientation = 0;
1223 if (gst_structure_get_int(structure, "video-orientation", &orientation)
1224 && (orientation == GST_VIDEO_ORIENTATION_90R
1225 || orientation == GST_VIDEO_ORIENTATION_90L
1226 || orientation == GST_VIDEO_ORIENTATION_UL_LR
1227 || orientation == GST_VIDEO_ORIENTATION_UR_LL)) {
1228 videoSize.setWidth(height);
1229 videoSize.setHeight(width);
1230 } else {
1231 videoSize.setWidth(width);
1232 videoSize.setHeight(height);
1233 }
1234
1235 gst_clear_caps(&valveSrcPadCaps);
1236 gst_clear_object(&valveSrcPad);
1237 } while (false);
1238 _dispatchSignal([this, videoSize]() { emit videoSizeChanged(videoSize); });
1239
1240 gst_clear_caps(&caps);
1241 return true;
1242}
1243
1244void GstVideoReceiver::_noteTeeFrame()
1245{
1246 _lastSourceFrameTime = QDateTime::currentSecsSinceEpoch();
1247}
1248
1249void GstVideoReceiver::_noteVideoSinkFrame()
1250{
1251 _lastVideoFrameTime = QDateTime::currentSecsSinceEpoch();
1252 if (!_decoding) {
1253 _decoding = true;
1254 qCDebug(GstVideoReceiverLog) << "Decoding started";
1255 _dispatchSignal([this]() { emit decodingChanged(_decoding); });
1256 }
1257}
1258
// Marks that end-of-stream was observed (called from _eosProbe); the flag is
// consumed outside this chunk.
void GstVideoReceiver::_noteEndOfStream()
{
    _endOfStream = true;
}
1263
1264bool GstVideoReceiver::_unlinkBranch(GstElement *from)
1265{
1266 GstPad *src = gst_element_get_static_pad(from, "src");
1267 if (!src) {
1268 qCCritical(GstVideoReceiverLog) << "gst_element_get_static_pad() failed";
1269 return false;
1270 }
1271
1272 GstPad *sink = gst_pad_get_peer(src);
1273 if (!sink) {
1274 gst_clear_object(&src);
1275 qCCritical(GstVideoReceiverLog) << "gst_pad_get_peer() failed";
1276 return false;
1277 }
1278
1279 if (!gst_pad_unlink(src, sink)) {
1280 gst_clear_object(&src);
1281 gst_clear_object(&sink);
1282 qCCritical(GstVideoReceiverLog) << "gst_pad_unlink() failed";
1283 return false;
1284 }
1285
1286 gst_clear_object(&src);
1287
1288 // Send EOS at the beginning of the branch
1289 const gboolean ret = gst_pad_send_event(sink, gst_event_new_eos());
1290
1291 gst_clear_object(&sink);
1292
1293 if (!ret) {
1294 qCCritical(GstVideoReceiverLog) << "Branch EOS was NOT sent";
1295 return false;
1296 }
1297
1298 qCDebug(GstVideoReceiverLog) << "Branch EOS was sent";
1299
1300 return true;
1301}
1302
// Tears down the decoder + video-sink branch: removes the decoder from the
// pipeline, detaches the frame and EOS probes, removes the video sink, then
// clears the decoding state and emits decodingChanged(false).
void GstVideoReceiver::_shutdownDecodingBranch()
{
    if (_decoder) {
        // Only remove from the bin if the decoder is actually parented there.
        GstObject *parent = gst_element_get_parent(_decoder);
        if (parent) {
            (void) gst_bin_remove(GST_BIN(_pipeline), _decoder);
            (void) gst_element_set_state(_decoder, GST_STATE_NULL);
            (void) gst_element_get_state(_decoder, nullptr, nullptr, GST_CLOCK_TIME_NONE);
            gst_clear_object(&parent);
        }

        gst_clear_object(&_decoder);
    }

    // Remove the per-frame probe from the video sink's sink pad.
    if (_videoSinkProbeId != 0 && _videoSink) {
        GstPad *sinkpad = gst_element_get_static_pad(_videoSink, "sink");
        if (sinkpad) {
            gst_pad_remove_probe(sinkpad, _videoSinkProbeId);
            gst_clear_object(&sinkpad);
        }
    }
    _videoSinkProbeId = 0;

    if (_eosProbeId != 0 && _eosProbePad) {
        // Probe was installed on the source pad in _onNewSourcePad; remove from that exact pad — not from _decoder, which may already be cleared above.
        gst_pad_remove_probe(_eosProbePad, _eosProbeId);
    }
    _eosProbeId = 0;
    gst_clear_object(&_eosProbePad);


    if (_videoSink) {
        // Same detach sequence as for the decoder: unparent, drop to NULL, release.
        GstObject *parent = gst_element_get_parent(_videoSink);
        if (parent) {
            (void) gst_bin_remove(GST_BIN(_pipeline), _videoSink);
            (void) gst_element_set_state(_videoSink, GST_STATE_NULL);
            (void) gst_element_get_state(_videoSink, nullptr, nullptr, GST_CLOCK_TIME_NONE);
            gst_clear_object(&parent);
        }
        gst_clear_object(&_videoSink);
    }

    _removingDecoder = false;

    if (_decoding) {
        _decoding = false;
        qCDebug(GstVideoReceiverLog) << "Decoding stopped";
        _dispatchSignal([this]() { emit decodingChanged(_decoding); });
    }

    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-decoding-stopped");
}
1356
1357void GstVideoReceiver::_shutdownRecordingBranch()
1358{
1359 if (_keyframeWatchId != 0 && _recorderValve) {
1360 GstPad *probepad = gst_element_get_static_pad(_recorderValve, "src");
1361 if (probepad) {
1362 gst_pad_remove_probe(probepad, _keyframeWatchId);
1363 gst_clear_object(&probepad);
1364 }
1365 _keyframeWatchId = 0;
1366 }
1367
1368 gst_bin_remove(GST_BIN(_pipeline), _fileSink);
1369 gst_element_set_state(_fileSink, GST_STATE_NULL);
1370 (void) gst_element_get_state(_fileSink, nullptr, nullptr, GST_CLOCK_TIME_NONE);
1371 gst_clear_object(&_fileSink);
1372
1373 _removingRecorder = false;
1374
1375 if (_recording) {
1376 _recording = false;
1377 qCDebug(GstVideoReceiverLog) << "Recording stopped";
1378 _dispatchSignal([this]() { emit recordingChanged(_recording); });
1379 }
1380
1381 if (_recordingStopRequested) {
1382 _recordingStopRequested = false;
1383 _dispatchSignal([this]() { emit onStopRecordingComplete(STATUS_OK); });
1384 }
1385
1386 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-recording-stopped");
1387}
1388
// True when the caller is not on the worker thread and must re-dispatch the
// operation to it via _worker->dispatch() (see e.g. start()).
bool GstVideoReceiver::_needDispatch()
{
    return _worker->needDispatch();
}
1393
1394void GstVideoReceiver::_dispatchSignal(Task emitter)
1395{
1396 _signalDepth += 1;
1397 emitter();
1398 _signalDepth -= 1;
1399}
1400
1401gboolean GstVideoReceiver::_onBusMessage(GstBus * /* bus */, GstMessage *msg, gpointer data)
1402{
1403 if (!msg || !data) {
1404 qCCritical(GstVideoReceiverLog) << "Invalid parameters in _onBusMessage: msg=" << msg << "data=" << data;
1405 return TRUE;
1406 }
1407
1408 GstVideoReceiver *pThis = static_cast<GstVideoReceiver*>(data);
1409
1410 switch (GST_MESSAGE_TYPE(msg)) {
1411 case GST_MESSAGE_ERROR: {
1412 gchar *debug;
1413 GError *error;
1414 gst_message_parse_error(msg, &error, &debug);
1415
1416 if (debug) {
1417 qCDebug(GstVideoReceiverLog) << "GStreamer debug:" << debug;
1418 g_clear_pointer(&debug, g_free);
1419 }
1420
1421 if (error) {
1422 qCCritical(GstVideoReceiverLog) << "GStreamer error:" << error->message;
1423 g_clear_error(&error);
1424 }
1425
1426 if (pThis->_pipeline) {
1427 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(pThis->_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-error");
1428 }
1429
1430#if defined(QGC_HAS_GST_GLMEMORY_GPU_PATH) || defined(QGC_HAS_GST_D3D11_GPU_PATH) || defined(QGC_HAS_GST_D3D12_GPU_PATH)
1431 // Drop bridge-cached devices defensively. D3D11/D3D12 errors are most often device-loss
1432 // (DXGI_ERROR_DEVICE_REMOVED on driver reset / TDR / GPU detach); GST_MESSAGE_ERROR
1433 // doesn't carry a structured device-lost code, so reset on any error rather than
1434 // letting the next pipeline restart re-use a potentially-dead cached device. Cost on
1435 // false positives is one device re-discovery on next prime — already paid on cold start.
1436 // Render-thread mapTextures calls currentDevice() with transfer-full ownership now,
1437 // so an in-flight mapTextures keeps its own ref alive across this reset.
1438 GstContextBridgeRegistry::resetAllBridges();
1439#endif
1440
1441 pThis->_worker->dispatch([pThis]() {
1442 qCDebug(GstVideoReceiverLog) << "Stopping because of error";
1443 pThis->stop();
1444 });
1445 break;
1446 }
1447 case GST_MESSAGE_WARNING: {
1448 // GStreamer posts WARNING for caps mismatches, decoder fallbacks, clock drift —
1449 // surfacing keeps these visible without escalating to STATUS_FAIL.
1450 gchar *debug = nullptr;
1451 GError *error = nullptr;
1452 gst_message_parse_warning(msg, &error, &debug);
1453 qCWarning(GstVideoReceiverLog) << "GStreamer warning:"
1454 << (error ? error->message : "(no message)")
1455 << "debug:" << (debug ? debug : "(none)");
1456 g_clear_error(&error);
1457 g_clear_pointer(&debug, g_free);
1458 break;
1459 }
1460 case GST_MESSAGE_EOS:
1461 pThis->_worker->dispatch([pThis]() {
1462 qCDebug(GstVideoReceiverLog) << "Received EOS";
1463 pThis->_handleEOS();
1464 });
1465 break;
1466 case GST_MESSAGE_STREAM_COLLECTION: {
1467 GstStreamCollection *collection = nullptr;
1468 gst_message_parse_stream_collection(msg, &collection);
1469 if (!collection) {
1470 break;
1471 }
1472 // SELECT_STREAMS keeps decodebin3 from instantiating audio decoder branches.
1473 GList *selectedIds = nullptr;
1474 const guint nStreams = gst_stream_collection_get_size(collection);
1475 for (guint i = 0; i < nStreams; ++i) {
1476 GstStream *stream = gst_stream_collection_get_stream(collection, i);
1477 const GstStreamType type = gst_stream_get_stream_type(stream);
1478 if (type & GST_STREAM_TYPE_VIDEO) {
1479 selectedIds = g_list_append(selectedIds,
1480 g_strdup(gst_stream_get_stream_id(stream)));
1481 }
1482 }
1483 if (selectedIds) {
1484 GstEvent *event = gst_event_new_select_streams(selectedIds);
1485 gst_element_send_event(GST_ELEMENT(GST_MESSAGE_SRC(msg)), event);
1486 g_list_free_full(selectedIds, g_free);
1487 }
1488 gst_object_unref(collection);
1489 break;
1490 }
1491 case GST_MESSAGE_QOS: {
1492 guint64 processed = 0, dropped = 0;
1493 gst_message_parse_qos_stats(msg, nullptr, &processed, &dropped);
1494
1495 gint64 jitter = 0;
1496 gdouble proportion = 0;
1497 gint quality = 0;
1498 gst_message_parse_qos_values(msg, &jitter, &proportion, &quality);
1499
1500 pThis->_processedFrames = processed;
1501 pThis->_droppedFrames = dropped;
1502 pThis->_currentJitterNs = jitter;
1503 pThis->_qosProportion = proportion;
1504 pThis->_qosQuality = quality;
1505 pThis->_dispatchSignal([pThis]() { emit pThis->decoderStatsChanged(); });
1506 break;
1507 }
1508 case GST_MESSAGE_ELEMENT: {
1509 const GstStructure *structure = gst_message_get_structure(msg);
1510 if (!gst_structure_has_name(structure, "GstBinForwarded")) {
1511 break;
1512 }
1513
1514 GstMessage *forward_msg = nullptr;
1515 gst_structure_get(structure, "message", GST_TYPE_MESSAGE, &forward_msg, NULL);
1516 if (!forward_msg) {
1517 break;
1518 }
1519
1520 if (GST_MESSAGE_TYPE(forward_msg) == GST_MESSAGE_EOS) {
1521 pThis->_worker->dispatch([pThis]() {
1522 qCDebug(GstVideoReceiverLog) << "Received branch EOS";
1523 pThis->_handleEOS();
1524 });
1525 }
1526
1527 gst_clear_message(&forward_msg);
1528 break;
1529 }
1530 case GST_MESSAGE_STATE_CHANGED: {
1531 if (GST_MESSAGE_SRC(msg) != GST_OBJECT(pThis->_pipeline)) {
1532 break;
1533 }
1534 GstState oldState = GST_STATE_NULL, newState = GST_STATE_NULL;
1535 gst_message_parse_state_changed(msg, &oldState, &newState, nullptr);
1536 if (newState == GST_STATE_PLAYING && oldState != GST_STATE_PLAYING) {
1537 GstClockTime min = 0, max = 0;
1538 GstQuery *q = gst_query_new_latency();
1539 if (gst_element_query(pThis->_pipeline, q)) {
1540 gboolean live = FALSE;
1541 gst_query_parse_latency(q, &live, &min, &max);
1542 }
1543 gst_query_unref(q);
1544 qCDebug(GstVideoReceiverLog).noquote()
1545 << "Pipeline PLAYING:" << pThis->_uri
1546 << "decoder:" << (pThis->_decoderName.isEmpty() ? QStringLiteral("(pending)") : pThis->_decoderName)
1547 << "min-latency:" << (min / 1000000) << "ms"
1548 << "max-latency:" << (max / 1000000) << "ms";
1549 }
1550 break;
1551 }
1552 case GST_MESSAGE_LATENCY:
1553 pThis->_worker->dispatch([pThis]() {
1554 if (pThis->_pipeline) {
1555 (void) gst_bin_recalculate_latency(GST_BIN(pThis->_pipeline));
1556 }
1557 });
1558 pThis->_dispatchSignal([pThis]() { emit pThis->latencyChanged(); });
1559 break;
1560 default:
1561 break;
1562 }
1563
1564 return TRUE;
1565}
1566
1567void GstVideoReceiver::_onNewPad(GstElement *element, GstPad *pad, gpointer data)
1568{
1569 GstVideoReceiver *self = static_cast<GstVideoReceiver*>(data);
1570
1571 if (element == self->_source) {
1572 self->_onNewSourcePad(pad);
1573 } else if (element == self->_decoder) {
1574 self->_onNewDecoderPad(pad);
1575 } else {
1576 qCDebug(GstVideoReceiverLog) << "Unexpected call!";
1577 }
1578}
1579
// "pad-added" style callback: mirrors the element's new pad on its parent bin
// through a same-named ghost pad, so the bin exposes the dynamic pad outside.
void GstVideoReceiver::_wrapWithGhostPad(GstElement *element, GstPad *pad, gpointer data)
{
    Q_UNUSED(data)

    gchar *name = gst_pad_get_name(pad);
    if (!name) {
        qCCritical(GstVideoReceiverLog) << "gst_pad_get_name() failed";
        return;
    }

    GstPad *ghostpad = gst_ghost_pad_new(name, pad);
    if (!ghostpad) {
        qCCritical(GstVideoReceiverLog) << "gst_ghost_pad_new() failed";
        g_clear_pointer(&name, g_free);
        return;
    }

    g_clear_pointer(&name, g_free);

    // Activate before adding so data can flow as soon as the pad is linked.
    (void) gst_pad_set_active(ghostpad, TRUE);

    // gst_element_add_pad() takes ownership of the floating ghost pad (it sinks
    // the ref even on failure), so no unref is needed on either path.
    if (!gst_element_add_pad(GST_ELEMENT_PARENT(element), ghostpad)) {
        qCCritical(GstVideoReceiverLog) << "gst_element_add_pad() failed";
    }
}
1605
1606void GstVideoReceiver::_linkPad(GstElement *element, GstPad *pad, gpointer data)
1607{
1608 gchar *name = gst_pad_get_name(pad);
1609 if (!name) {
1610 qCCritical(GstVideoReceiverLog) << "gst_pad_get_name() failed";
1611 return;
1612 }
1613
1614 if (!gst_element_link_pads(element, name, GST_ELEMENT(data), "sink")) {
1615 qCCritical(GstVideoReceiverLog) << "gst_element_link_pads() failed";
1616 }
1617
1618 g_clear_pointer(&name, g_free);
1619}
1620
1621gboolean GstVideoReceiver::_padProbe(GstElement *element, GstPad *pad, gpointer user_data)
1622{
1623 Q_UNUSED(element)
1624
1625 int *probeRes = static_cast<int*>(user_data);
1626 *probeRes |= 1;
1627
1628 GstCaps *filter = gst_caps_from_string("application/x-rtp");
1629 if (filter) {
1630 GstCaps *caps = gst_pad_query_caps(pad, nullptr);
1631 if (caps) {
1632 if (!gst_caps_is_any(caps) && gst_caps_can_intersect(caps, filter)) {
1633 *probeRes |= 2;
1634 }
1635
1636 gst_clear_caps(&caps);
1637 }
1638
1639 gst_clear_caps(&filter);
1640 }
1641
1642 return TRUE;
1643}
1644
1645GstPadProbeReturn GstVideoReceiver::_teeProbe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
1646{
1647 Q_UNUSED(pad); Q_UNUSED(info)
1648
1649 if (user_data) {
1650 GstVideoReceiver *pThis = static_cast<GstVideoReceiver*>(user_data);
1651 pThis->_noteTeeFrame();
1652 }
1653
1654 return GST_PAD_PROBE_OK;
1655}
1656
// Buffer probe on the video sink's sink pad: timestamps every frame reaching
// the sink (via _noteVideoSinkFrame, which also flips _decoding on the first
// frame). The disabled block is a timeline-reset experiment kept for reference.
GstPadProbeReturn GstVideoReceiver::_videoSinkProbe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
{
    Q_UNUSED(pad); Q_UNUSED(info)

    if (user_data) {
        GstVideoReceiver *pThis = static_cast<GstVideoReceiver*>(user_data);

        if (pThis->_resetVideoSink) {
            pThis->_resetVideoSink = false;

#if 0 // FIXME: this makes MPEG2-TS playing smooth but breaks RTSP
            gst_pad_send_event(pad, gst_event_new_flush_start());
            gst_pad_send_event(pad, gst_event_new_flush_stop(TRUE));

            GstBuffer* buf;

            if ((buf = gst_pad_probe_info_get_buffer(info)) != nullptr) {
                GstSegment* seg;

                if ((seg = gst_segment_new()) != nullptr) {
                    gst_segment_init(seg, GST_FORMAT_TIME);

                    seg->start = buf->pts;

                    gst_pad_send_event(pad, gst_event_new_segment(seg));

                    gst_segment_free(seg);
                    seg = nullptr;
                }

                gst_pad_set_offset(pad, -static_cast<gint64>(buf->pts));
            }
#endif
        }

        pThis->_noteVideoSinkFrame();
    }

    return GST_PAD_PROBE_OK;
}
1697
1698GstPadProbeReturn GstVideoReceiver::_eosProbe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
1699{
1700 Q_UNUSED(pad);
1701 Q_ASSERT(user_data);
1702
1703 if (info) {
1704 const GstEvent *event = gst_pad_probe_info_get_event(info);
1705 if (GST_EVENT_TYPE(event) == GST_EVENT_EOS) {
1706 GstVideoReceiver *pThis = static_cast<GstVideoReceiver*>(user_data);
1707 pThis->_noteEndOfStream();
1708 }
1709 }
1710
1711 return GST_PAD_PROBE_OK;
1712}
1713
1714GstPadProbeReturn GstVideoReceiver::_keyframeWatch(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
1715{
1716 if (!info || !user_data) {
1717 qCCritical(GstVideoReceiverLog) << "Invalid arguments";
1718 return GST_PAD_PROBE_DROP;
1719 }
1720
1721 GstBuffer *buf = gst_pad_probe_info_get_buffer(info);
1722 if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
1723 // wait for a keyframe
1724 return GST_PAD_PROBE_DROP;
1725 }
1726
1727 // set media file '0' offset to current timeline position - we don't want to touch other elements in the graph, except these which are downstream!
1728 gst_pad_set_offset(pad, -static_cast<gint64>(buf->pts));
1729
1730 qCDebug(GstVideoReceiverLog) << "Got keyframe, stop dropping buffers";
1731
1732 GstVideoReceiver *pThis = static_cast<GstVideoReceiver*>(user_data);
1733 pThis->_dispatchSignal([pThis]() { emit pThis->recordingStarted(pThis->recordingOutput()); });
1734
1735 return GST_PAD_PROBE_REMOVE;
1736}
1737
1739 : QThread(parent)
1740{
1741 qCDebug(GstVideoReceiverLog) << this;
1742}
1743
1745{
1746 qCDebug(GstVideoReceiverLog) << this;
1747}
1748
1750{
1751 return (QThread::currentThread() != this);
1752}
1753
1755{
1756 QMutexLocker lock(&_taskQueueSync);
1757 _taskQueue.enqueue(task);
1758 _taskQueueUpdate.wakeOne();
1759}
1760
1762{
1763 if (needDispatch()) {
1764 dispatch([this]() { _shutdown = true; });
1765 (void) QThread::wait(2000);
1766 } else {
1767 QThread::quit();
1768 }
1769}
1770
1771void GstVideoWorker::run()
1772{
1773 while (!_shutdown) {
1774 _taskQueueSync.lock();
1775
1776 while (_taskQueue.isEmpty()) {
1777 _taskQueueUpdate.wait(&_taskQueueSync);
1778 }
1779
1780 const Task task = _taskQueue.dequeue();
1781
1782 _taskQueueSync.unlock();
1783
1784 task();
1785 }
1786}
std::function< void()> Task
struct _GstElement GstElement
Error error
#define QGC_LOGGING_CATEGORY(name, categoryStr)
void stopDecoding() override
void takeScreenshot(const QString &imageFile) override
void decoderStatsChanged()
GstVideoReceiver(QObject *parent=nullptr)
void start(uint32_t timeout) override
void stopRecording() override
void startRecording(const QString &videoFile, FILE_FORMAT format) override
void stop() override
void startDecoding(void *sink) override
void dispatch(Task task)
bool needDispatch() const
GstVideoWorker(QObject *parent=nullptr)
void recordingStarted(const QString &filename)
QTimer _watchdogTimer
bool lowLatency() const
void videoSizeChanged(QSize size)
void streamingChanged(bool active)
qint64 _lastSourceFrameTime
uint32_t _timeout
qint64 _lastVideoFrameTime
QString uri() const
QString _recordingOutput
void recordingChanged(bool active)
void onStartRecordingComplete(STATUS status)
void onTakeScreenshotComplete(STATUS status)
void decodingChanged(bool active)
void onStartComplete(STATUS status)
static bool isValidFileFormat(FILE_FORMAT format)
QString name() const
void onStopDecodingComplete(STATUS status)
void onStopRecordingComplete(STATUS status)
QString recordingOutput() const
void onStartDecodingComplete(STATUS status)
QQuickItem * _widget
void onStopComplete(STATUS status)
uint32_t _signalDepth
bool isHardwareDecoderFactory(GstElementFactory *factory)
gboolean isValidRtspUri(const gchar *uri_str)
QByteArray format(const QList< LogEntry > &entries, int fmt)