QGroundControl
Ground Control Station for MAVLink Drones
Loading...
Searching...
No Matches
GstVideoReceiver.cc
Go to the documentation of this file.
1//-----------------------------------------------------------------------------
2// Our pipeline look like this:
3//
4// +-->queue-->_decoderValve[-->_decoder-->_videoSink]
5// |
6// _source-->_tee
7// |
8// +-->queue-->_recorderValve[-->_fileSink]
9//-----------------------------------------------------------------------------
10
11#include "GstVideoReceiver.h"
12
14
15#include "GStreamerHelpers.h"
16#include "GstSourceFactory.h"
17#include "QGCLoggingCategory.h"
19
20#include <QtCore/QDateTime>
21#include <QtCore/QMutexLocker>
22#include <QtCore/QUrl>
23#include <QtQuick/QQuickItem>
24
25#include <algorithm>
26
27#include <gst/gst.h>
28#include <gst/video/video.h>
29
30QGC_LOGGING_CATEGORY(GstVideoReceiverLog, "Video.GStreamer.GstVideoReceiver")
31
32namespace {
33// kEosTimeoutNs: bus wait budget for EOS/ERROR during stop(); 3 s covers slow hw decoders.
34constexpr GstClockTime kEosTimeoutNs = 3 * GST_SECOND;
35
36// Refs the element's first src pad into *userData and stops iterating. Resync is handled
37// internally by gst_element_foreach_src_pad (unlike a bare gst_iterator_next loop).
38gboolean grabFirstSrcPad(GstElement * /*element*/, GstPad *pad, gpointer userData)
39{
40 *static_cast<GstPad **>(userData) = GST_PAD(gst_object_ref(pad));
41 return FALSE;
42}
43
44bool isRecoverableH265PaciError(GstMessage *msg, const GError *error, const gchar *debug)
45{
46 if (!msg || !error || (error->domain != GST_STREAM_ERROR) || (error->code != GST_STREAM_ERROR_FORMAT) ||
47 !debug || !g_strrstr(debug, "NAL unit type 50 not supported yet")) {
48 return false;
49 }
50
51 GstObject *src = GST_MESSAGE_SRC(msg);
52 if (!src || !GST_IS_ELEMENT(src)) {
53 return false;
54 }
55
56 GstElementFactory *factory = gst_element_get_factory(GST_ELEMENT(src));
57 return factory && (g_strcmp0(GST_OBJECT_NAME(factory), "rtph265depay") == 0);
58}
59
60} // namespace
61
63 : VideoReceiver(parent)
64 , _worker(new GstVideoWorker(this))
65{
66 qCDebug(GstVideoReceiverLog) << this;
67
68 _worker->start();
69 (void) connect(&_watchdogTimer, &QTimer::timeout, this, &GstVideoReceiver::_watchdog);
70}
71
73{
74 stop();
75 _worker->shutdown();
76
77 qCDebug(GstVideoReceiverLog) << this;
78}
79
80void GstVideoReceiver::start(uint32_t timeout)
81{
82 if (_needDispatch()) {
83 _worker->dispatch([this, timeout]() { start(timeout); });
84 return;
85 }
86
87 if (_pipeline) {
88 qCDebug(GstVideoReceiverLog) << "Already running!" << _uri;
90 return;
91 }
92
93 if (_uri.isEmpty()) {
94 qCDebug(GstVideoReceiverLog) << "Failed because URI is not specified";
96 return;
97 }
98
100 _buffer = lowLatency() ? -1 : 0;
101
102 qCDebug(GstVideoReceiverLog) << "Starting" << _uri << ", lowLatency" << lowLatency() << ", timeout" << _timeout;
103
104 // GST_DEBUG_BIN_TO_DOT_FILE is a no-op unless GST_DEBUG_DUMP_DOT_DIR is set; surface that
105 // once per process so field debugging doesn't require re-reading the source.
106 [[maybe_unused]] static const bool dotDirHinted = []() {
107 if (qgetenv("GST_DEBUG_DUMP_DOT_DIR").isEmpty()) {
108 qCInfo(GstVideoReceiverLog).noquote()
109 << "Pipeline dot-graph dumps are disabled. Set GST_DEBUG_DUMP_DOT_DIR=/path/to/dir to enable.";
110 }
111 return true;
112 }();
113
114 _endOfStream = false;
115
116 bool running = false;
117 bool pipelineUp = false;
118
119 GstElement *decoderQueue = nullptr;
120 GstElement *recorderQueue = nullptr;
121
122 do {
123 _tee = gst_element_factory_make("tee", nullptr);
124 if (!_tee) {
125 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('tee') failed";
126 break;
127 }
128
129 GstPad *pad = gst_element_get_static_pad(_tee, "sink");
130 if (!pad) {
131 qCCritical(GstVideoReceiverLog) << "gst_element_get_static_pad() failed";
132 break;
133 }
134
136
137 _teeProbeId = gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, _teeProbe, this, nullptr);
138 gst_clear_object(&pad);
139 if (_teeProbeId == 0) {
140 // _teeProbe updates _lastSourceFrameTime; without it the watchdog timer fires spuriously instead of reporting a real failure.
141 qCCritical(GstVideoReceiverLog) << "gst_pad_add_probe(_teeProbe) failed";
142 break;
143 }
144
145 decoderQueue = gst_element_factory_make("queue", nullptr);
146 if (!decoderQueue) {
147 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('queue') failed";
148 break;
149 }
150
151 // leaky=downstream (2) + tiny depth: the live-display branch must drop the oldest
152 // buffer on backpressure, not stall the streaming thread. Recording branch (below)
153 // keeps default non-leaky semantics so every frame reaches the muxer.
154 g_object_set(decoderQueue,
155 "leaky", 2,
156 "max-size-buffers", 2,
157 "max-size-bytes", 0,
158 "max-size-time", G_GUINT64_CONSTANT(0),
159 nullptr);
160
161 _decoderValve = gst_element_factory_make("valve", nullptr);
162 if (!_decoderValve) {
163 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('valve') failed";
164 break;
165 }
166
167 g_object_set(_decoderValve,
168 "drop", TRUE,
169 nullptr);
170
171 recorderQueue = gst_element_factory_make("queue", nullptr);
172 if (!recorderQueue) {
173 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('queue') failed";
174 break;
175 }
176
177 _recorderValve = gst_element_factory_make("valve", nullptr);
178 if (!_recorderValve) {
179 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('valve') failed";
180 break;
181 }
182
183 g_object_set(_recorderValve,
184 "drop", TRUE,
185 nullptr);
186
187 _pipeline = gst_pipeline_new("receiver");
188 if (!_pipeline) {
189 qCCritical(GstVideoReceiverLog) << "gst_pipeline_new() failed";
190 break;
191 }
192
193 g_object_set(_pipeline,
194 "message-forward", TRUE,
195 nullptr);
196
198 sourceConfig.jitterBuffer = (_buffer < 0)
200 : (_buffer == 0
203 sourceConfig.latencyMs = _rtpJitterLatencyMs;
204 // do-retransmission needs ≥40 ms latency headroom over the default 20 ms rtx-delay;
205 // forcibly disable for sub-frame latency configurations to avoid retransmit storms.
207 _source = GStreamer::SourceFactory::create(_uri, sourceConfig);
208 if (!_source) {
209 qCCritical(GstVideoReceiverLog) << "SourceFactory::create() failed";
210 break;
211 }
212
213 gst_bin_add_many(GST_BIN(_pipeline), _source, _tee, decoderQueue, _decoderValve, recorderQueue, _recorderValve, nullptr);
214
215 pipelineUp = true;
216
217 GstPad *srcPad = nullptr;
218 (void) gst_element_foreach_src_pad(_source, grabFirstSrcPad, &srcPad);
219
220 if (srcPad) {
221 _onNewSourcePad(srcPad);
222 gst_clear_object(&srcPad);
223 } else {
224 (void) g_signal_connect(_source, "pad-added", G_CALLBACK(_onNewPad), this);
225 }
226
227 if (!gst_element_link_many(_tee, decoderQueue, _decoderValve, nullptr)) {
228 qCCritical(GstVideoReceiverLog) << "Unable to link decoder queue";
229 break;
230 }
231
232 if (!gst_element_link_many(_tee, recorderQueue, _recorderValve, nullptr)) {
233 qCCritical(GstVideoReceiverLog) << "Unable to link recorder queue";
234 break;
235 }
236
237 GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline));
238 if (bus) {
239 gst_bus_enable_sync_message_emission(bus);
240 (void) g_signal_connect(bus, "sync-message", G_CALLBACK(_onBusMessage), this);
241 // HwBuffers facade chains every compiled context bridge so they don't clobber each
242 // other via gst_bus_set_sync_handler. Must run before GST_STATE_PLAYING — upstream
243 // queries context during PAUSED→PLAYING. No-op when no bridge-using GPU path is compiled.
244 gst_bus_set_sync_handler(bus, HwBuffers::onBusSyncMessage, nullptr, nullptr);
245 gst_clear_object(&bus);
246 }
247
248 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-initial");
249 running = (gst_element_set_state(_pipeline, GST_STATE_PLAYING) != GST_STATE_CHANGE_FAILURE);
250 } while(0);
251
252 if (!running) {
253 qCCritical(GstVideoReceiverLog) << "Failed";
254
255 if (_pipeline) {
256 (void) gst_element_set_state(_pipeline, GST_STATE_NULL);
257 (void) gst_element_get_state(_pipeline, nullptr, nullptr, GST_CLOCK_TIME_NONE);
258 gst_clear_object(&_pipeline);
259 }
260
261 if (!pipelineUp) {
262 gst_clear_object(&_recorderValve);
263 gst_clear_object(&recorderQueue);
264 gst_clear_object(&_decoderValve);
265 gst_clear_object(&decoderQueue);
266 gst_clear_object(&_tee);
267 gst_clear_object(&_source);
268 }
269
271 } else {
272 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-started");
273 qCDebug(GstVideoReceiverLog) << "Started" << _uri;
274
275 // _watchdogTimer lives on `this` (GUI thread); the emit runs synchronously on the
276 // worker thread, so the timer start has to be queued separately or QObject warns.
277 QMetaObject::invokeMethod(this, [this]() { _watchdogTimer.start(1000); }, Qt::QueuedConnection);
279 }
280}
281
283{
284 if (_needDispatch()) {
285 _worker->dispatch([this]() { stop(); });
286 return;
287 }
288
289 if (_uri.isEmpty()) {
290 qCDebug(GstVideoReceiverLog) << "Stop called on empty URI (no-op)";
291 return;
292 }
293
294 qCDebug(GstVideoReceiverLog) << "Stopping" << _uri;
295
296 // Bump the epoch synchronously (atomic — no GUI thread needed) so any in-flight reconnect lambda
297 // is superseded before this stop() returns; cross-callsite QueuedConnection FIFO is not guaranteed.
298 _reconnectEpoch.fetch_add(1, std::memory_order_relaxed);
299 // Only _watchdogTimer.stop() must run on the GUI thread (the timer lives on `this`).
300 QMetaObject::invokeMethod(this, [this]() { _watchdogTimer.stop(); }, Qt::QueuedConnection);
301
302 if (_teeProbeId != 0) {
303 if (_tee) {
304 GstPad *sinkpad = gst_element_get_static_pad(_tee, "sink");
305 if (sinkpad) {
306 gst_pad_remove_probe(sinkpad, _teeProbeId);
307 gst_clear_object(&sinkpad);
308 }
309 }
310 _teeProbeId = 0;
311 }
312
313 if (_pipeline) {
314 GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline));
315 if (bus) {
316 gst_bus_disable_sync_message_emission(bus);
317 (void) g_signal_handlers_disconnect_by_data(bus, this);
318
319 gboolean recordingValveClosed = TRUE;
320 g_object_get(_recorderValve, "drop", &recordingValveClosed, nullptr);
321
322 if (!recordingValveClosed) {
323 (void) gst_element_send_event(_pipeline, gst_event_new_eos());
324
325 // Wait for splitmuxsink to actually finalize its current fragment. async-finalize
326 // pushes muxer teardown off the streaming thread; the splitmuxsink-fragment-closed
327 // element message is posted (via message-forward=TRUE) exactly when the muxer's
328 // state has gone NULL. EOS is the fallback for older builds / unexpected paths;
329 // ERROR breaks out so we don't burn the full budget on a known failure. Track
330 // elapsed time so unrelated ELEMENT messages don't abort the wait early.
331 const GstClockTime deadline = kEosTimeoutNs;
332 const qint64 startMs = QDateTime::currentMSecsSinceEpoch();
333 bool finalized = false;
334 for (;;) {
335 const qint64 elapsedNs = (QDateTime::currentMSecsSinceEpoch() - startMs)
336 * qint64(GST_MSECOND);
337 if (elapsedNs >= qint64(deadline)) break;
338 const GstClockTime remaining = GstClockTime(qint64(deadline) - elapsedNs);
339 GstMessage *msg = gst_bus_timed_pop_filtered(bus, remaining,
340 (GstMessageType)(GST_MESSAGE_EOS | GST_MESSAGE_ERROR | GST_MESSAGE_ELEMENT));
341 if (!msg) break;
342 switch (GST_MESSAGE_TYPE(msg)) {
343 case GST_MESSAGE_ELEMENT: {
344 const GstStructure *s = gst_message_get_structure(msg);
345 if (s && gst_structure_has_name(s, "splitmuxsink-fragment-closed")) {
346 qCDebug(GstVideoReceiverLog) << "splitmuxsink fragment finalized";
347 finalized = true;
348 }
349 break;
350 }
351 case GST_MESSAGE_EOS:
352 qCDebug(GstVideoReceiverLog) << "End of stream received (fallback path)";
353 finalized = true;
354 break;
355 case GST_MESSAGE_ERROR:
356 qCCritical(GstVideoReceiverLog) << "Error stopping pipeline!";
357 finalized = true;
358 break;
359 default:
360 break;
361 }
362 gst_clear_message(&msg);
363 if (finalized) break;
364 }
365 if (!finalized) {
366 qCWarning(GstVideoReceiverLog) << "splitmuxsink finalize signal not received within"
367 << (kEosTimeoutNs / GST_MSECOND)
368 << "ms — forcing pipeline NULL (recording may be truncated; "
369 << "faststart + reserved-moov-update-period keep the file playable)";
370 }
371 }
372
373 gst_clear_object(&bus);
374 } else {
375 qCCritical(GstVideoReceiverLog) << "gst_pipeline_get_bus() failed";
376 }
377
378 (void) gst_element_set_state(_pipeline, GST_STATE_NULL);
379 (void) gst_element_get_state(_pipeline, nullptr, nullptr, GST_CLOCK_TIME_NONE);
380
381 // FIXME: check if branch is connected and remove all elements from branch
382 if (_fileSink) {
383 _shutdownRecordingBranch();
384 }
385
386 if (_videoSink) {
387 _shutdownDecodingBranch();
388 }
389
390 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-stopped");
391
392 // Lock before nulling so an in-flight _onBusMessage on the streaming thread cannot read
393 // a half-destroyed _pipeline. _acquirePipelineRef takes its own ref under the same lock.
394 {
395 QMutexLocker lock(&_pipelineMutex);
396 gst_clear_object(&_pipeline);
397 _pipeline = nullptr;
398 }
399
400 _recorderValve = nullptr;
401 _decoderValve = nullptr;
402 _tee = nullptr;
403 _source = nullptr;
404
406
407 if (_streaming) {
408 _streaming = false;
409 qCDebug(GstVideoReceiverLog) << "Streaming stopped" << _uri;
411 } else {
412 qCDebug(GstVideoReceiverLog) << "Streaming did not start" << _uri;
413 }
414 }
415
416 qCDebug(GstVideoReceiverLog) << "Stopped" << _uri;
417
418 if (const HwBuffers::PathStats hwStats = HwBuffers::formatPathStats(true); hwStats.totalDelivered > 0) {
419 qCInfo(GstVideoReceiverLog).noquote()
420 << "HW path stats" << _uri << hwStats.line + HwBuffers::takeExtraPathStats();
421 }
422
424}
425
427{
428 if (!sink) {
429 qCCritical(GstVideoReceiverLog) << "VideoSink is NULL" << _uri;
430 return;
431 }
432
433 if (_needDispatch()) {
434 _worker->dispatch([this, sink]() mutable { startDecoding(sink); });
435 return;
436 }
437
438 qCDebug(GstVideoReceiverLog) << "Starting decoding" << _uri;
439
440 if (!_widget) {
441 qCDebug(GstVideoReceiverLog) << "Video Widget is NULL" << _uri;
443 return;
444 }
445
446 if (!_pipeline) {
447 gst_clear_object(&_videoSink);
448 }
449
450 if (_videoSink || _decoding) {
451 qCDebug(GstVideoReceiverLog) << "Already decoding!" << _uri;
453 return;
454 }
455
456 GstElement *videoSink = GST_ELEMENT(sink);
457 GstPad *pad = gst_element_get_static_pad(videoSink, "sink");
458 if (!pad) {
459 qCCritical(GstVideoReceiverLog) << "Unable to find sink pad of video sink" << _uri;
461 return;
462 }
463
465 _resetVideoSink = true;
466
467 _videoSinkProbeId = gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, _videoSinkProbe, this, nullptr);
468 gst_clear_object(&pad);
469
470 _videoSink = videoSink;
471 gst_object_ref(_videoSink);
472
473 _removingDecoder = false;
474
475 if (!_streaming) {
477 return;
478 }
479
480 _ensureVideoSinkInPipeline();
481
482 if (!_addDecoder(_decoderValve)) {
483 qCCritical(GstVideoReceiverLog) << "_addDecoder() failed" << _uri;
484 _shutdownDecodingBranch();
486 return;
487 }
488
489 g_object_set(_decoderValve,
490 "drop", FALSE,
491 nullptr);
492
493 qCDebug(GstVideoReceiverLog) << "Decoding started" << _uri;
494
496}
497
499{
500 if (_needDispatch()) {
501 _worker->dispatch([this]() { stopDecoding(); });
502 return;
503 }
504
505 qCDebug(GstVideoReceiverLog) << "Stopping decoding" << _uri;
506
507 // Gate on _videoSink (set by startDecoding) instead of _decoding (which only flips on
508 // first sink-buffer probe). Without this, stopDecoding() called between
509 // onStartDecodingComplete(OK) and the first frame returns STATUS_INVALID_STATE and
510 // leaves the decoder/sink branch live.
511 if (!_pipeline || !_videoSink) {
512 qCDebug(GstVideoReceiverLog) << "Not decoding!" << _uri;
514 return;
515 }
516
517 g_object_set(_decoderValve,
518 "drop", TRUE,
519 nullptr);
520
521 _removingDecoder = true;
522
523 const bool ret = _unlinkBranch(_decoderValve);
524
525 // FIXME: it is much better to emit onStopDecodingComplete() after decoding is really stopped
526 // (which happens later due to async design) but as for now it is also not so bad...
528}
529
530void GstVideoReceiver::startRecording(const QString &videoFile, FILE_FORMAT format)
531{
532 if (_needDispatch()) {
533 const QString cachedVideoFile = videoFile;
534 _worker->dispatch([this, cachedVideoFile, format]() { startRecording(cachedVideoFile, format); });
535 return;
536 }
537
538 qCDebug(GstVideoReceiverLog) << "Starting recording" << _uri;
539
540 if (!_pipeline) {
541 qCDebug(GstVideoReceiverLog) << "Streaming is not active!" << _uri;
543 return;
544 }
545
546 if (_recording) {
547 qCDebug(GstVideoReceiverLog) << "Already recording!" << _uri;
549 return;
550 }
551
552 qCDebug(GstVideoReceiverLog) << "New video file:" << videoFile << _uri;
553
554 _fileSink = _makeFileSink(videoFile, format);
555 if (!_fileSink) {
556 qCCritical(GstVideoReceiverLog) << "_makeFileSink() failed" << _uri;
558 return;
559 }
560
561 _removingRecorder = false;
562
563 (void) gst_object_ref(_fileSink);
564
565 gst_bin_add(GST_BIN(_pipeline), _fileSink);
566
567 if (!gst_element_link(_recorderValve, _fileSink)) {
568 qCCritical(GstVideoReceiverLog) << "Failed to link valve and file sink" << _uri;
570 return;
571 }
572
573 (void) gst_element_sync_state_with_parent(_fileSink);
574
575 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-filesink");
576
577 // Install a probe on the recording branch to drop buffers until we hit our first keyframe
578 // When we hit our first keyframe, we can offset the timestamps appropriately according to the first keyframe time
579 // This will ensure the first frame is a keyframe at t=0, and decoding can begin immediately on playback
580 GstPad *probepad = gst_element_get_static_pad(_recorderValve, "src");
581 if (!probepad) {
582 qCCritical(GstVideoReceiverLog) << "gst_element_get_static_pad() failed" << _uri;
584 return;
585 }
586
587 _keyframeWatchId = gst_pad_add_probe(probepad, GST_PAD_PROBE_TYPE_BUFFER, _keyframeWatch, this, nullptr);
588 gst_clear_object(&probepad);
589
590 g_object_set(_recorderValve,
591 "drop", FALSE,
592 nullptr);
593
594 _recordingOutput = videoFile;
595 _recording = true;
596 qCDebug(GstVideoReceiverLog) << "Recording started" << _uri;
599}
600
602{
603 if (_needDispatch()) {
604 _worker->dispatch([this]() { stopRecording(); });
605 return;
606 }
607
608 qCDebug(GstVideoReceiverLog) << "Stopping recording" << _uri;
609
610 if (!_pipeline || !_recording) {
611 qCDebug(GstVideoReceiverLog) << "Not recording!" << _uri;
613 return;
614 }
615
616 g_object_set(_recorderValve,
617 "drop", TRUE,
618 nullptr);
619
620 _removingRecorder = true;
621
622 if (!_unlinkBranch(_recorderValve)) {
623 _removingRecorder = false;
625 return;
626 }
627
628 // EOS event propagates valve→mux→filesink; _shutdownRecordingBranch emits the
629 // complete signal once the muxer index is written and the file is closed.
630 _recordingStopRequested = true;
631}
632
633void GstVideoReceiver::takeScreenshot(const QString &imageFile)
634{
635 if (_needDispatch()) {
636 const QString cachedImageFile = imageFile;
637 _worker->dispatch([this, cachedImageFile]() { takeScreenshot(cachedImageFile); });
638 return;
639 }
640
641 qCDebug(GstVideoReceiverLog) << "taking screenshot" << _uri;
642
643 // FIXME: record screenshot here
645}
646
647void GstVideoReceiver::_watchdog()
648{
649 _worker->dispatch([this]() {
650 if (!_pipeline) {
651 return;
652 }
653
654 const qint64 now = QDateTime::currentSecsSinceEpoch();
655 qint64 lastSourceFrameTime = _lastSourceFrameTime.load(std::memory_order_relaxed);
656 if (lastSourceFrameTime == 0) {
657 lastSourceFrameTime = now;
658 _lastSourceFrameTime.store(now, std::memory_order_relaxed);
659 }
660
661 if (++_statsTickCounter >= 10) {
663 if (const HwBuffers::PathStats hwStats = HwBuffers::formatPathStats(false); hwStats.totalDelivered > 0) {
664 qCDebug(GstVideoReceiverLog).noquote() << "HW path live" << _uri << hwStats.line;
665 }
666 }
667
668 // Drain QoS updates accumulated since the last tick (see GST_MESSAGE_QOS).
669 if (_qosStatsDirty.exchange(false, std::memory_order_acq_rel)) {
670 emit decoderStatsChanged();
671 }
672
673 qint64 elapsed = now - lastSourceFrameTime;
674 if (elapsed > _timeout) {
675 qCDebug(GstVideoReceiverLog) << "Stream timeout, no frames for" << elapsed << _uri;
676 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-watchdog-timeout");
677 emit timeout();
678 _scheduleReconnect("source watchdog");
679 return;
680 }
681
682 if (_decoding && !_removingDecoder) {
683 qint64 lastVideoFrameTime = _lastVideoFrameTime.load(std::memory_order_relaxed);
684 if (lastVideoFrameTime == 0) {
685 lastVideoFrameTime = now;
686 _lastVideoFrameTime.store(now, std::memory_order_relaxed);
687 }
688
689 elapsed = now - lastVideoFrameTime;
690 if (elapsed > (_timeout * 2)) {
691 qCDebug(GstVideoReceiverLog) << "Video decoder timeout, no frames for" << elapsed << _uri;
692 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-watchdog-timeout");
693 emit timeout();
694 _scheduleReconnect("decoder watchdog");
695 }
696 }
697 });
698}
699
700void GstVideoReceiver::_scheduleReconnect(const char *reason)
701{
702 // Always tear down — even when autoReconnect is off we still want a clean stop.
703 // stop() bumps _reconnectEpoch, so any prior singleShot lambda becomes a no-op.
704 stop();
705
706 if (!_autoReconnect) {
707 qCDebug(GstVideoReceiverLog) << "Auto-reconnect disabled — not retrying after" << reason;
708 return;
709 }
710
711 if (_uri.isEmpty()) {
712 return;
713 }
714
715 // Snapshot on the worker thread — where start() last wrote _timeout and where _uri reads
716 // are already sequenced — so the GUI-thread lambdas below don't read racy members.
717 const uint32_t reconnectTimeout = (_timeout != 0) ? _timeout : 8;
718 const QString uri = _uri;
719
720 // Schedule on the GUI thread (QTimer::singleShot requires its receiver's thread). Worker
721 // is the only caller today, but route through invokeMethod so a future direct GUI-thread
722 // call (e.g. user-initiated retry) stays correct.
723 QMetaObject::invokeMethod(this, [this, reason, reconnectTimeout, uri]() {
724 const int next = std::min(_reconnectAttempts.load(std::memory_order_relaxed) + 1, 30);
725 _reconnectAttempts.store(next, std::memory_order_relaxed);
726 // 1s → 2s → 4s → 8s → 16s, capped at 30s. Capping bounds worst-case "vehicle in flight,
727 // RF down for 5 min" recovery; lower than typical RTSP server keepalive (60s).
728 const int delaySec = std::min(1 << std::min(next - 1, 5), 30);
729 const quint64 epoch = _reconnectEpoch.load(std::memory_order_relaxed);
730 const int attempts = next;
731 qCInfo(GstVideoReceiverLog) << "Scheduling reconnect #" << attempts
732 << "in" << delaySec << "s after" << reason << uri;
733 QTimer::singleShot(delaySec * 1000, this, [this, epoch, attempts, reconnectTimeout, uri]() {
734 if (epoch != _reconnectEpoch.load(std::memory_order_relaxed)) return; // superseded by stop()
735 // _pipeline is mutated by the worker under _pipelineMutex; a bare deref here (GUI
736 // thread) races teardown, so probe liveness through the mutex-guarded accessor.
737 GstElement *livePipeline = _acquirePipelineRef();
738 const bool pipelineUp = (livePipeline != nullptr);
739 if (livePipeline) gst_object_unref(livePipeline);
740 if (uri.isEmpty() || pipelineUp) return; // pipeline already came back
741 qCInfo(GstVideoReceiverLog) << "Reconnecting (attempt" << attempts << ")" << uri;
742 start(reconnectTimeout);
743 });
744 }, Qt::QueuedConnection);
745}
746
748{
749 _worker->dispatch([this, tag]() {
750 GstElement *pipelineRef = _acquirePipelineRef();
751 if (!pipelineRef) {
752 qCDebug(GstVideoReceiverLog) << "dumpPipelineGraph: pipeline not running";
753 return;
754 }
755 const QByteArray tagUtf8 = tag.toUtf8();
756 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(pipelineRef), GST_DEBUG_GRAPH_SHOW_ALL, tagUtf8.constData());
757 const QString dotPath = GStreamer::writePipelineDot(pipelineRef, tagUtf8.constData());
758 if (!dotPath.isEmpty()) {
759 qCInfo(GstVideoReceiverLog) << "Pipeline graph saved to" << dotPath;
760 }
761 gst_object_unref(pipelineRef);
762 });
763}
764
765void GstVideoReceiver::_handleEOS()
766{
767 if (!_pipeline) {
768 return;
769 }
770
771 if (_endOfStream) {
772 stop();
773 } else if (_decoding && _removingDecoder) {
774 _shutdownDecodingBranch();
775 } else if (_recording && _removingRecorder) {
776 _shutdownRecordingBranch();
777 } /*else {
778 qCWarning(GstVideoReceiverLog) << "Unexpected EOS!";
779 stop();
780 }*/
781}
782
783GstElement *GstVideoReceiver::_makeDecoder()
784{
785 GstElement *decoder = gst_element_factory_make("decodebin3", nullptr);
786 if (!decoder) {
787 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('decodebin3') failed";
788 }
789 return decoder;
790}
791
792GstElement *GstVideoReceiver::_makeFileSink(const QString &videoFile, FILE_FORMAT format)
793{
794 GstElement *fileSink = nullptr;
795 GstElement *splitmux = nullptr;
796 GstElement *bin = nullptr;
797 GstPad *videopad = nullptr;
798 GstPad *ghostpad = nullptr;
799
800 do {
801 if (!isValidFileFormat(format)) {
802 qCCritical(GstVideoReceiverLog) << "Unsupported file format";
803 break;
804 }
805
806 // splitmuxsink owns its own muxer + filesink internally, handles request-pad
807 // lifetime, and finalizes asynchronously so EOS no longer wedges the worker
808 // thread (replaces the manual qtmux/matroskamux+filesink combo + "stuck muxer"
809 // bounded-wait in stop()). max-size-time=0 keeps single-file behaviour.
810 splitmux = gst_element_factory_make("splitmuxsink", nullptr);
811 if (!splitmux) {
812 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('splitmuxsink') failed";
813 break;
814 }
815
816 g_object_set(splitmux,
817 "location", qPrintable(videoFile),
818 "muxer-factory", _kFileMux[format],
819 "max-size-time", G_GUINT64_CONSTANT(0),
820 "max-size-bytes", G_GUINT64_CONSTANT(0),
821 "async-finalize", TRUE,
822 // Surface "splitmuxsink-fragment-closed" element messages on the pipeline bus so
823 // stop() can wait on the precise per-fragment finalize signal instead of EOS
824 // (gstsplitmuxsink.c:send_fragment_opened_closed_msg posts this per fragment,
825 // including the final fragment torn down on EOS).
826 "message-forward", TRUE,
827 nullptr);
828
829 // Crash-safe MP4/MOV: faststart writes moov up-front; reserved-moov-update-period
830 // refreshes the moov on a 1 s cadence so an abrupt kill still leaves a playable file.
831 // matroskamux is naturally streamable; skip the GstStructure dance.
832 if (format == FILE_FORMAT_MP4 || format == FILE_FORMAT_MOV) {
833 GstStructure *muxerProps = gst_structure_new("properties",
834 "faststart", G_TYPE_BOOLEAN, TRUE,
835 "reserved-moov-update-period", G_TYPE_UINT64, G_GUINT64_CONSTANT(1000000000),
836 nullptr);
837 g_object_set(splitmux, "muxer-properties", muxerProps, nullptr);
838 gst_structure_free(muxerProps);
839 }
840
841 bin = gst_bin_new("sinkbin");
842 if (!bin) {
843 qCCritical(GstVideoReceiverLog) << "gst_bin_new('sinkbin') failed";
844 break;
845 }
846
847 // splitmuxsink's video sink pad is a request pad — request once during construction
848 // and ghost it as "sink" so the existing recorderValve→fileSink link works unchanged.
849 // request_pad_simple (1.20+) does the pad-template lookup internally.
850 videopad = gst_element_request_pad_simple(splitmux, "video");
851 if (!videopad) {
852 qCCritical(GstVideoReceiverLog) << "gst_element_request_pad_simple(splitmuxsink, video) failed";
853 break;
854 }
855
856 if (!gst_bin_add(GST_BIN(bin), splitmux)) {
857 qCCritical(GstVideoReceiverLog) << "gst_bin_add(splitmuxsink) failed";
858 break;
859 }
860 splitmux = nullptr; // bin now owns it
861
862 ghostpad = gst_ghost_pad_new("sink", videopad);
863 if (!ghostpad) {
864 qCCritical(GstVideoReceiverLog) << "gst_ghost_pad_new() failed";
865 break;
866 }
867
868 if (!gst_element_add_pad(bin, ghostpad)) {
869 qCCritical(GstVideoReceiverLog) << "gst_element_add_pad(ghost) failed";
870 break;
871 }
872 ghostpad = nullptr; // bin now owns it
873
874 fileSink = bin;
875 bin = nullptr;
876 } while(0);
877
878 gst_clear_object(&ghostpad);
879 // No release_request_pad: on success splitmux is already NULL (owned by bin), and on failure the
880 // bin/splitmux unref below finalizes splitmuxsink, which reclaims its "video" request pad itself.
881 gst_clear_object(&videopad);
882 gst_clear_object(&splitmux);
883 gst_clear_object(&bin);
884 return fileSink;
885}
886
887void GstVideoReceiver::_onNewSourcePad(GstPad *pad)
888{
889 // FIXME: check for caps - if this is not video stream (and preferably - one of these which we have to support) then simply skip it
890 if (!gst_element_link(_source, _tee)) {
891 qCCritical(GstVideoReceiverLog) << "Unable to link source";
892 return;
893 }
894
895 if (!_streaming) {
896 _streaming = true;
897 qCDebug(GstVideoReceiverLog) << "Streaming started" << _uri;
899 }
900
901 _eosProbeId = gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, _eosProbe, this, nullptr);
902 if (_eosProbeId != 0) {
903 // Hold a ref so _shutdownDecodingBranch can remove the probe even after _decoder is gone.
904 _eosProbePad = GST_PAD_CAST(gst_object_ref(pad));
905 }
906 if (!_videoSink) {
907 return;
908 }
909
910 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-new-source-pad");
911
912 _ensureVideoSinkInPipeline();
913
914 if (!_addDecoder(_decoderValve)) {
915 qCCritical(GstVideoReceiverLog) << "_addDecoder() failed";
916 _shutdownDecodingBranch();
917 return;
918 }
919
920 g_object_set(_decoderValve,
921 "drop", FALSE,
922 nullptr);
923
924 qCDebug(GstVideoReceiverLog) << "Decoding started" << _uri;
925}
926
927void GstVideoReceiver::_logDecodebin3SelectedCodec(GstElement *decodebin3)
928{
929 GValue value = G_VALUE_INIT;
930 GstIterator *iter = gst_bin_iterate_elements(GST_BIN(decodebin3));
931 GstElement *child;
932
933 while (gst_iterator_next(iter, &value) == GST_ITERATOR_OK) {
934 child = GST_ELEMENT(g_value_get_object(&value));
935 GstElementFactory *factory = gst_element_get_factory(child);
936
937 if (factory) {
938 gboolean is_decoder = gst_element_factory_list_is_type(factory, GST_ELEMENT_FACTORY_TYPE_DECODER);
939 if (is_decoder) {
940 const gchar *decoderKlass = gst_element_factory_get_klass(factory);
941 GstPluginFeature *feature = GST_PLUGIN_FEATURE(factory);
942 const gchar *featureName = gst_plugin_feature_get_name(feature);
943 const guint rank = gst_plugin_feature_get_rank(feature);
944 bool isHardwareDecoder = GStreamer::isHardwareDecoderFactory(factory);
945
946 QString pluginName = featureName;
947 GstPlugin *plugin = gst_plugin_feature_get_plugin(feature);
948 if (plugin) {
949 pluginName = gst_plugin_get_name(plugin);
950 gst_object_unref(plugin);
951 }
952 qCDebug(GstVideoReceiverLog) << "Decodebin3 selected codec:rank -" << pluginName << "/" << featureName << "-" << decoderKlass << (isHardwareDecoder ? "(HW)" : "(SW)") << ":" << rank;
953
954 const QString newName = QString::fromUtf8(featureName);
955 bool nameChanged = false;
956 {
957 QMutexLocker locker(&_decoderNameMutex);
958 if (newName != _decoderName) {
959 _decoderName = newName;
960 nameChanged = true;
961 }
962 }
963 if (nameChanged) {
964 emit decoderStatsChanged();
965 }
966
967 // Disable QoS on the internal decoder to prevent cascading
968 // frame drops on live streams. The videodecoder base class
969 // aggressively advances earliest_time after the first late
970 // frame, causing all subsequent frames to be dropped.
971 g_object_set(child, "qos", FALSE, nullptr);
972 qCDebug(GstVideoReceiverLog) << "Disabled QoS on internal decoder" << featureName;
973 }
974 }
975 g_value_reset(&value);
976 }
977 g_value_unset(&value);
978 gst_iterator_free(iter);
979}
980
981
982void GstVideoReceiver::_onNewDecoderPad(GstPad *pad)
983{
984 qCDebug(GstVideoReceiverLog) << "_onNewDecoderPad" << _uri;
985
986 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-new-decoder-pad");
987
988 // We should now know what codec decodebin3 selected.
989 _logDecodebin3SelectedCodec(_decoder);
990
991 if (!_addVideoSink(pad)) {
992 qCCritical(GstVideoReceiverLog) << "_addVideoSink() failed";
993 }
994}
995
996bool GstVideoReceiver::_addDecoder(GstElement *src)
997{
998 _decoder = _makeDecoder();
999 if (!_decoder) {
1000 qCCritical(GstVideoReceiverLog) << "_makeDecoder() failed";
1001 return false;
1002 }
1003
1004 (void) gst_object_ref(_decoder);
1005
1006 (void) gst_bin_add(GST_BIN(_pipeline), _decoder);
1007 (void) gst_element_sync_state_with_parent(_decoder);
1008
1009 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-decoder");
1010
1011 if (!gst_element_link(src, _decoder)) {
1012 qCCritical(GstVideoReceiverLog) << "Unable to link decoder";
1013 gst_element_set_state(_decoder, GST_STATE_NULL);
1014 (void) gst_element_get_state(_decoder, nullptr, nullptr, GST_CLOCK_TIME_NONE);
1015 (void) gst_bin_remove(GST_BIN(_pipeline), _decoder);
1016 gst_clear_object(&_decoder);
1017 return false;
1018 }
1019
1020 GstPad *srcPad = nullptr;
1021 (void) gst_element_foreach_src_pad(_decoder, grabFirstSrcPad, &srcPad);
1022
1023 if (srcPad) {
1024 _onNewDecoderPad(srcPad);
1025 } else {
1026 (void) g_signal_connect(_decoder, "pad-added", G_CALLBACK(_onNewPad), this);
1027 }
1028
1029 gst_clear_object(&srcPad);
1030 return true;
1031}
1032
1033void GstVideoReceiver::_ensureVideoSinkInPipeline()
1034{
1035 if (!_videoSink || !_pipeline) {
1036 return;
1037 }
1038
1039 GstObject *parent = gst_element_get_parent(_videoSink);
1040 if (parent) {
1041 gst_object_unref(parent);
1042 return;
1043 }
1044
1045 g_object_set(_videoSink,
1046 "sync", (_buffer >= 0),
1047 NULL);
1048
1049 (void) gst_object_ref(_videoSink);
1050 (void) gst_bin_add(GST_BIN(_pipeline), _videoSink);
1051
1052 // PAUSED (not READY) triggers downstream caps negotiation before source data arrives.
1053 (void) gst_element_set_state(_videoSink, GST_STATE_PAUSED);
1054}
1055
1056bool GstVideoReceiver::_addVideoSink(GstPad *pad)
1057{
1058 GstCaps *caps = gst_pad_query_caps(pad, nullptr);
1059
1060 _ensureVideoSinkInPipeline();
1061
1062 GstPad *sinkPad = gst_element_get_static_pad(_videoSink, "sink");
1063 GstPadLinkReturn linkRet = sinkPad ? gst_pad_link(pad, sinkPad) : GST_PAD_LINK_WRONG_HIERARCHY;
1064 if (linkRet != GST_PAD_LINK_OK) {
1065 qCCritical(GstVideoReceiverLog) << "Unable to link decoder pad to video sink, result:" << linkRet;
1066
1067 // _ensureVideoSinkInPipeline() added it before linking; detach for the next retry.
1068 GstObject *parent = gst_element_get_parent(_videoSink);
1069 if (parent) {
1070 (void) gst_element_set_state(_videoSink, GST_STATE_NULL);
1071 (void) gst_element_get_state(_videoSink, nullptr, nullptr, GST_CLOCK_TIME_NONE);
1072 (void) gst_bin_remove(GST_BIN(_pipeline), _videoSink);
1073 gst_clear_object(&parent);
1074 }
1075
1076 gst_clear_object(&sinkPad);
1077 gst_clear_caps(&caps);
1078 return false;
1079 }
1080 gst_clear_object(&sinkPad);
1081
1082 (void) gst_element_sync_state_with_parent(_videoSink);
1083
1084 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-videosink");
1085
1086 // Determine video size. Errors here are non-fatal.
1087 QSize videoSize;
1088 do {
1089 if (!_decoderValve) {
1090 qCCritical(GstVideoReceiverLog) << "Unable to determine video size - _decoderValve is NULL" << _uri;
1091 break;
1092 }
1093
1094 GstPad *valveSrcPad = gst_element_get_static_pad(_decoderValve, "src");
1095 if (!valveSrcPad) {
1096 qCCritical(GstVideoReceiverLog) << "gst_element_get_static_pad() failed";
1097 break;
1098 }
1099
1100 GstCaps *valveSrcPadCaps = gst_pad_query_caps(valveSrcPad, nullptr);
1101 if (!valveSrcPadCaps) {
1102 qCCritical(GstVideoReceiverLog) << "gst_pad_query_caps() failed";
1103 gst_clear_object(&valveSrcPad);
1104 break;
1105 }
1106
1107 const GstStructure *structure = gst_caps_get_structure(valveSrcPadCaps, 0);
1108 if (!structure) {
1109 qCCritical(GstVideoReceiverLog) << "Unable to determine video size - structure is NULL" << _uri;
1110 gst_clear_object(&valveSrcPad);
1111 break;
1112 }
1113
1114 gint width = 0;
1115 gint height = 0;
1116 (void) gst_structure_get_int(structure, "width", &width);
1117 (void) gst_structure_get_int(structure, "height", &height);
1118
1119 // Swap W×H for 90°/270° streams so QML AR is computed on display dimensions.
1120 gint orientation = 0;
1121 if (gst_structure_get_int(structure, "video-orientation", &orientation)
1122 && (orientation == GST_VIDEO_ORIENTATION_90R
1123 || orientation == GST_VIDEO_ORIENTATION_90L
1124 || orientation == GST_VIDEO_ORIENTATION_UL_LR
1125 || orientation == GST_VIDEO_ORIENTATION_UR_LL)) {
1126 videoSize.setWidth(height);
1127 videoSize.setHeight(width);
1128 } else {
1129 videoSize.setWidth(width);
1130 videoSize.setHeight(height);
1131 }
1132
1133 gst_clear_caps(&valveSrcPadCaps);
1134 gst_clear_object(&valveSrcPad);
1135 } while (false);
1136 emit videoSizeChanged(videoSize);
1137
1138 gst_clear_caps(&caps);
1139 return true;
1140}
1141
1142void GstVideoReceiver::_noteTeeFrame()
1143{
1144 _lastSourceFrameTime.store(QDateTime::currentSecsSinceEpoch(), std::memory_order_relaxed);
1145 // Successful frame arrival: drop the reconnect backoff so the next failure starts at 1 s,
1146 // not minutes-into-the-curve. This probe runs on the streaming thread while the backoff
1147 // increment runs on the GUI thread; post the reset there too so all mutation of
1148 // _reconnectAttempts is single-threaded and the increment can't clobber the reset.
1149 if (_reconnectAttempts.load(std::memory_order_relaxed) != 0) {
1150 QMetaObject::invokeMethod(
1151 this, [this]() { _reconnectAttempts.store(0, std::memory_order_relaxed); }, Qt::QueuedConnection);
1152 }
1153 const quint64 sourceFrames = _sourceFrameCount.fetch_add(1, std::memory_order_relaxed) + 1;
1154 if (sourceFrames == 1) {
1155 qCInfo(GstVideoReceiverLog).noquote() << "Source receiving frames (tee):" << _uri;
1156 } else if ((sourceFrames % 300) == 0) {
1157 qCDebug(GstVideoReceiverLog).noquote()
1158 << "Source flow: teeFrames=" << sourceFrames << "decoding=" << _decoding << _uri;
1159 }
1160}
1161
1162void GstVideoReceiver::_noteVideoSinkFrame()
1163{
1164 _lastVideoFrameTime.store(QDateTime::currentSecsSinceEpoch(), std::memory_order_relaxed);
1165 if (!_decoding) {
1166 _decoding = true;
1167 qCDebug(GstVideoReceiverLog) << "Decoding started";
1169 }
1170}
1171
1172void GstVideoReceiver::_noteEndOfStream()
1173{
1174 _endOfStream = true;
1175}
1176
1177bool GstVideoReceiver::_unlinkBranch(GstElement *from)
1178{
1179 GstPad *src = gst_element_get_static_pad(from, "src");
1180 if (!src) {
1181 qCCritical(GstVideoReceiverLog) << "gst_element_get_static_pad() failed";
1182 return false;
1183 }
1184
1185 GstPad *sink = gst_pad_get_peer(src);
1186 if (!sink) {
1187 gst_clear_object(&src);
1188 qCCritical(GstVideoReceiverLog) << "gst_pad_get_peer() failed";
1189 return false;
1190 }
1191
1192 if (!gst_pad_unlink(src, sink)) {
1193 gst_clear_object(&src);
1194 gst_clear_object(&sink);
1195 qCCritical(GstVideoReceiverLog) << "gst_pad_unlink() failed";
1196 return false;
1197 }
1198
1199 gst_clear_object(&src);
1200
1201 // Send EOS at the beginning of the branch
1202 const gboolean ret = gst_pad_send_event(sink, gst_event_new_eos());
1203
1204 gst_clear_object(&sink);
1205
1206 if (!ret) {
1207 qCCritical(GstVideoReceiverLog) << "Branch EOS was NOT sent";
1208 return false;
1209 }
1210
1211 qCDebug(GstVideoReceiverLog) << "Branch EOS was sent";
1212
1213 return true;
1214}
1215
1216void GstVideoReceiver::_shutdownDecodingBranch()
1217{
1218 if (_decoder) {
1219 GstObject *parent = gst_element_get_parent(_decoder);
1220 if (parent) {
1221 (void) gst_bin_remove(GST_BIN(_pipeline), _decoder);
1222 (void) gst_element_set_state(_decoder, GST_STATE_NULL);
1223 (void) gst_element_get_state(_decoder, nullptr, nullptr, GST_CLOCK_TIME_NONE);
1224 gst_clear_object(&parent);
1225 }
1226
1227 gst_clear_object(&_decoder);
1228 }
1229
1230 if (_videoSinkProbeId != 0 && _videoSink) {
1231 GstPad *sinkpad = gst_element_get_static_pad(_videoSink, "sink");
1232 if (sinkpad) {
1233 gst_pad_remove_probe(sinkpad, _videoSinkProbeId);
1234 gst_clear_object(&sinkpad);
1235 }
1236 }
1237 _videoSinkProbeId = 0;
1238
1239 if (_eosProbeId != 0 && _eosProbePad) {
1240 // Probe was installed on the source pad in _onNewSourcePad; remove from that exact pad — not from _decoder, which may already be cleared above.
1241 gst_pad_remove_probe(_eosProbePad, _eosProbeId);
1242 }
1243 _eosProbeId = 0;
1244 gst_clear_object(&_eosProbePad);
1245
1247
1248 if (_videoSink) {
1249 GstObject *parent = gst_element_get_parent(_videoSink);
1250 if (parent) {
1251 (void) gst_bin_remove(GST_BIN(_pipeline), _videoSink);
1252 (void) gst_element_set_state(_videoSink, GST_STATE_NULL);
1253 (void) gst_element_get_state(_videoSink, nullptr, nullptr, GST_CLOCK_TIME_NONE);
1254 gst_clear_object(&parent);
1255 }
1256 gst_clear_object(&_videoSink);
1257 }
1258
1259 _removingDecoder = false;
1260
1261 if (_decoding) {
1262 _decoding = false;
1263 qCDebug(GstVideoReceiverLog) << "Decoding stopped";
1265 }
1266
1267 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-decoding-stopped");
1268}
1269
1270void GstVideoReceiver::_shutdownRecordingBranch()
1271{
1272 if (_keyframeWatchId != 0 && _recorderValve) {
1273 GstPad *probepad = gst_element_get_static_pad(_recorderValve, "src");
1274 if (probepad) {
1275 gst_pad_remove_probe(probepad, _keyframeWatchId);
1276 gst_clear_object(&probepad);
1277 }
1278 _keyframeWatchId = 0;
1279 }
1280
1281 gst_bin_remove(GST_BIN(_pipeline), _fileSink);
1282 gst_element_set_state(_fileSink, GST_STATE_NULL);
1283 (void) gst_element_get_state(_fileSink, nullptr, nullptr, GST_CLOCK_TIME_NONE);
1284 gst_clear_object(&_fileSink);
1285
1286 _removingRecorder = false;
1287
1288 if (_recording) {
1289 _recording = false;
1290 qCDebug(GstVideoReceiverLog) << "Recording stopped";
1292 }
1293
1294 if (_recordingStopRequested) {
1295 _recordingStopRequested = false;
1297 }
1298
1299 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-recording-stopped");
1300}
1301
1302bool GstVideoReceiver::_needDispatch()
1303{
1304 return _worker->needDispatch();
1305}
1306
1307GstElement *GstVideoReceiver::_acquirePipelineRef() const
1308{
1309 QMutexLocker lock(&_pipelineMutex);
1310 if (!_pipeline) return nullptr;
1311 return GST_ELEMENT(gst_object_ref(_pipeline));
1312}
1313
1314gboolean GstVideoReceiver::_onBusMessage(GstBus * /* bus */, GstMessage *msg, gpointer data)
1315{
1316 if (!msg || !data) {
1317 qCCritical(GstVideoReceiverLog) << "Invalid parameters in _onBusMessage: msg=" << msg << "data=" << data;
1318 return TRUE;
1319 }
1320
1321 GstVideoReceiver *pThis = static_cast<GstVideoReceiver*>(data);
1322
1323 if (GST_MESSAGE_TYPE(msg) != GST_MESSAGE_ERROR) {
1325 }
1326
1327 switch (GST_MESSAGE_TYPE(msg)) {
1328 case GST_MESSAGE_ERROR: {
1329 gchar *debug = nullptr;
1330 GError *error = nullptr;
1331 gst_message_parse_error(msg, &error, &debug);
1332 const bool recoverableH265PaciError = isRecoverableH265PaciError(msg, error, debug);
1333
1334 if (debug) {
1335 qCDebug(GstVideoReceiverLog) << "GStreamer debug:" << debug;
1336 g_clear_pointer(&debug, g_free);
1337 }
1338
1339 if (error) {
1340 if (recoverableH265PaciError) {
1341 qCWarning(GstVideoReceiverLog)
1342 << "Ignoring unsupported H.265 RTP PACI packet from rtph265depay:" << error->message;
1343 } else {
1344 qCCritical(GstVideoReceiverLog) << "GStreamer error:" << error->message;
1345 }
1346 g_clear_error(&error);
1347 }
1348
1349 if (recoverableH265PaciError) {
1350 break;
1351 }
1352
1354
1355 if (GstElement *pipelineRef = pThis->_acquirePipelineRef()) {
1356 // Native dump path (no-op without GST_DEBUG_DUMP_DOT_DIR) plus an unconditional
1357 // CacheLocation fallback so field-bug-report bundles include pipeline topology.
1358 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(pipelineRef), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-error");
1359 const QString dotPath = GStreamer::writePipelineDot(pipelineRef, "pipeline-error");
1360 if (!dotPath.isEmpty()) {
1361 qCInfo(GstVideoReceiverLog) << "Pipeline graph saved to" << dotPath;
1362 }
1363 gst_object_unref(pipelineRef);
1364 }
1365
1366 // GPU-side ERROR handling (cached-device drop) runs in HwBuffers::dispatchBusMessage above.
1367 // _scheduleReconnect calls stop() then queues a backoff retry if autoReconnect is on.
1368 pThis->_worker->dispatch([pThis]() {
1369 qCDebug(GstVideoReceiverLog) << "Stopping because of error";
1370 pThis->_scheduleReconnect("pipeline error");
1371 });
1372 break;
1373 }
1374 case GST_MESSAGE_WARNING: {
1375 // GStreamer posts WARNING for caps mismatches, decoder fallbacks, clock drift —
1376 // surfacing keeps these visible without escalating to STATUS_FAIL.
1377 gchar *debug = nullptr;
1378 GError *error = nullptr;
1379 gst_message_parse_warning(msg, &error, &debug);
1380 qCWarning(GstVideoReceiverLog) << "GStreamer warning:"
1381 << (error ? error->message : "(no message)")
1382 << "debug:" << (debug ? debug : "(none)");
1383 g_clear_error(&error);
1384 g_clear_pointer(&debug, g_free);
1385 break;
1386 }
1387 case GST_MESSAGE_EOS:
1388 pThis->_worker->dispatch([pThis]() {
1389 qCDebug(GstVideoReceiverLog) << "Received EOS";
1390 pThis->_handleEOS();
1391 });
1392 break;
1393 case GST_MESSAGE_STREAM_COLLECTION: {
1394 GstStreamCollection *collection = nullptr;
1395 gst_message_parse_stream_collection(msg, &collection);
1396 if (!collection) {
1397 break;
1398 }
1399 // SELECT_STREAMS keeps decodebin3 from instantiating audio decoder branches.
1400 GList *selectedIds = nullptr;
1401 const guint nStreams = gst_stream_collection_get_size(collection);
1402 for (guint i = 0; i < nStreams; ++i) {
1403 GstStream *stream = gst_stream_collection_get_stream(collection, i);
1404 const GstStreamType type = gst_stream_get_stream_type(stream);
1405 if (type & GST_STREAM_TYPE_VIDEO) {
1406 selectedIds = g_list_append(selectedIds,
1407 g_strdup(gst_stream_get_stream_id(stream)));
1408 }
1409 }
1410 if (selectedIds) {
1411 GstEvent *event = gst_event_new_select_streams(selectedIds);
1412 gst_element_send_event(GST_ELEMENT(GST_MESSAGE_SRC(msg)), event);
1413 g_list_free_full(selectedIds, g_free);
1414 }
1415 gst_object_unref(collection);
1416 break;
1417 }
1418 case GST_MESSAGE_QOS: {
1419 guint64 processed = 0, dropped = 0;
1420 gst_message_parse_qos_stats(msg, nullptr, &processed, &dropped);
1421
1422 gint64 jitter = 0;
1423 gdouble proportion = 0;
1424 gint quality = 0;
1425 gst_message_parse_qos_values(msg, &jitter, &proportion, &quality);
1426
1427 pThis->_processedFrames.store(processed, std::memory_order_relaxed);
1428 pThis->_droppedFrames.store(dropped, std::memory_order_relaxed);
1429 pThis->_currentJitterNs.store(jitter, std::memory_order_relaxed);
1430 pThis->_qosProportion.store(proportion, std::memory_order_relaxed);
1431 pThis->_qosQuality.store(quality, std::memory_order_relaxed);
1432 // GstBaseSink can post QOS per dropped buffer; defer the emit to the 1 Hz
1433 // watchdog tick so QML isn't flooded from the streaming thread.
1434 pThis->_qosStatsDirty.store(true, std::memory_order_release);
1435 break;
1436 }
1437 case GST_MESSAGE_ELEMENT: {
1438 const GstStructure *structure = gst_message_get_structure(msg);
1439 if (structure && gst_structure_has_name(structure, "qgc-caps-info")) {
1440 gint w = 0, h = 0;
1441 const gchar *fmt = gst_structure_get_string(structure, "format");
1442 gst_structure_get_int(structure, "width", &w);
1443 gst_structure_get_int(structure, "height", &h);
1444 const QString format = QString::fromUtf8(fmt ? fmt : "");
1445 const QSize resolution(w, h);
1446 // src compared by address only on the GUI thread; never dereferenced (may be gone by then).
1447 void *src = GST_MESSAGE_SRC(msg);
1448 QMetaObject::invokeMethod(pThis, [pThis, format, resolution, src]() {
1449 for (auto *c : QGCQVideoSinkController::controllersOf(pThis)) {
1450 if (static_cast<const void*>(c->element()) == src) {
1451 c->updateNegotiation(format, resolution);
1452 }
1453 }
1454 }, Qt::QueuedConnection);
1455 break;
1456 }
1457 if (!gst_structure_has_name(structure, "GstBinForwarded")) {
1458 break;
1459 }
1460
1461 GstMessage *forward_msg = nullptr;
1462 gst_structure_get(structure, "message", GST_TYPE_MESSAGE, &forward_msg, NULL);
1463 if (!forward_msg) {
1464 break;
1465 }
1466
1467 if (GST_MESSAGE_TYPE(forward_msg) == GST_MESSAGE_EOS) {
1468 pThis->_worker->dispatch([pThis]() {
1469 qCDebug(GstVideoReceiverLog) << "Received branch EOS";
1470 pThis->_handleEOS();
1471 });
1472 }
1473
1474 gst_clear_message(&forward_msg);
1475 break;
1476 }
1477 case GST_MESSAGE_STATE_CHANGED: {
1478 GstElement *pipelineRef = pThis->_acquirePipelineRef();
1479 if (!pipelineRef) break;
1480 const bool fromPipeline = (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipelineRef));
1481 if (!fromPipeline) {
1482 gst_object_unref(pipelineRef);
1483 break;
1484 }
1485 GstState oldState = GST_STATE_NULL, newState = GST_STATE_NULL;
1486 gst_message_parse_state_changed(msg, &oldState, &newState, nullptr);
1487 if (newState == GST_STATE_PLAYING && oldState != GST_STATE_PLAYING) {
1488 GstClockTime min = 0, max = 0;
1489 GstQuery *q = gst_query_new_latency();
1490 if (gst_element_query(pipelineRef, q)) {
1491 gboolean live = FALSE;
1492 gst_query_parse_latency(q, &live, &min, &max);
1493 }
1494 gst_query_unref(q);
1495 const QString decName = pThis->decoderName();
1496 qCDebug(GstVideoReceiverLog).noquote()
1497 << "Pipeline PLAYING:" << pThis->_uri
1498 << "decoder:" << (decName.isEmpty() ? QStringLiteral("(pending)") : decName)
1499 << "min-latency:" << (min / 1000000) << "ms"
1500 << "max-latency:" << (max / 1000000) << "ms";
1501 }
1502 gst_object_unref(pipelineRef);
1503 break;
1504 }
1505 case GST_MESSAGE_LATENCY:
1506 pThis->_worker->dispatch([pThis]() {
1507 GstElement* pipeline = pThis->_acquirePipelineRef();
1508 if (pipeline) {
1509 (void) gst_bin_recalculate_latency(GST_BIN(pipeline));
1510 gst_object_unref(pipeline);
1511 }
1512 });
1513 // Re-prime sink-side latency tracking after the pipeline recalculation (e.g. RTSP
1514 // jitter-buffer reconfigure). Controllers live on the GUI thread; hop there to query.
1515 QMetaObject::invokeMethod(pThis, [pThis]() {
1516 for (auto* c : QGCQVideoSinkController::controllersOf(pThis))
1517 c->refreshLatency();
1518 }, Qt::QueuedConnection);
1519 break;
1520 default:
1521 break;
1522 }
1523
1524 return TRUE;
1525}
1526
1527void GstVideoReceiver::_onNewPad(GstElement *element, GstPad *pad, gpointer data)
1528{
1529 GstVideoReceiver *self = static_cast<GstVideoReceiver*>(data);
1530
1531 if (element == self->_source) {
1532 self->_onNewSourcePad(pad);
1533 } else if (element == self->_decoder) {
1534 self->_onNewDecoderPad(pad);
1535 } else {
1536 qCDebug(GstVideoReceiverLog) << "Unexpected call!";
1537 }
1538}
1539
1540GstPadProbeReturn GstVideoReceiver::_teeProbe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
1541{
1542 Q_UNUSED(pad); Q_UNUSED(info)
1543
1544 if (user_data) {
1545 GstVideoReceiver *pThis = static_cast<GstVideoReceiver*>(user_data);
1546 pThis->_noteTeeFrame();
1547 }
1548
1549 return GST_PAD_PROBE_OK;
1550}
1551
1552GstPadProbeReturn GstVideoReceiver::_videoSinkProbe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
1553{
1554 Q_UNUSED(pad); Q_UNUSED(info)
1555
1556 if (user_data) {
1557 GstVideoReceiver *pThis = static_cast<GstVideoReceiver*>(user_data);
1558
1559 if (pThis->_resetVideoSink) {
1560 pThis->_resetVideoSink = false;
1561
1562#if 0 // FIXME: this makes MPEG2-TS playing smooth but breaks RTSP
1563 gst_pad_send_event(pad, gst_event_new_flush_start());
1564 gst_pad_send_event(pad, gst_event_new_flush_stop(TRUE));
1565
1566 GstBuffer* buf;
1567
1568 if ((buf = gst_pad_probe_info_get_buffer(info)) != nullptr) {
1569 GstSegment* seg;
1570
1571 if ((seg = gst_segment_new()) != nullptr) {
1572 gst_segment_init(seg, GST_FORMAT_TIME);
1573
1574 seg->start = buf->pts;
1575
1576 gst_pad_send_event(pad, gst_event_new_segment(seg));
1577
1578 gst_segment_free(seg);
1579 seg = nullptr;
1580 }
1581
1582 gst_pad_set_offset(pad, -static_cast<gint64>(buf->pts));
1583 }
1584#endif
1585 }
1586
1587 pThis->_noteVideoSinkFrame();
1588 }
1589
1590 return GST_PAD_PROBE_OK;
1591}
1592
1593GstPadProbeReturn GstVideoReceiver::_eosProbe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
1594{
1595 Q_UNUSED(pad);
1596 Q_ASSERT(user_data);
1597
1598 if (info) {
1599 const GstEvent *event = gst_pad_probe_info_get_event(info);
1600 if (GST_EVENT_TYPE(event) == GST_EVENT_EOS) {
1601 GstVideoReceiver *pThis = static_cast<GstVideoReceiver*>(user_data);
1602 pThis->_noteEndOfStream();
1603 }
1604 }
1605
1606 return GST_PAD_PROBE_OK;
1607}
1608
1609GstPadProbeReturn GstVideoReceiver::_keyframeWatch(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
1610{
1611 if (!info || !user_data) {
1612 qCCritical(GstVideoReceiverLog) << "Invalid arguments";
1613 return GST_PAD_PROBE_DROP;
1614 }
1615
1616 GstBuffer *buf = gst_pad_probe_info_get_buffer(info);
1617 if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
1618 // wait for a keyframe
1619 return GST_PAD_PROBE_DROP;
1620 }
1621
1622 // set media file '0' offset to current timeline position - we don't want to touch other elements in the graph, except these which are downstream!
1623 gst_pad_set_offset(pad, -static_cast<gint64>(buf->pts));
1624
1625 qCDebug(GstVideoReceiverLog) << "Got keyframe, stop dropping buffers";
1626
1627 GstVideoReceiver *pThis = static_cast<GstVideoReceiver*>(user_data);
1628 emit pThis->recordingStarted(pThis->recordingOutput());
1629
1630 return GST_PAD_PROBE_REMOVE;
1631}
1632
1634 : QThread(parent)
1635{
1636 qCDebug(GstVideoReceiverLog) << this;
1637}
1638
1640{
1641 qCDebug(GstVideoReceiverLog) << this;
1642}
1643
1645{
1646 return (QThread::currentThread() != this);
1647}
1648
1650{
1651 QMutexLocker lock(&_taskQueueSync);
1652 _taskQueue.enqueue(task);
1653 _taskQueueUpdate.wakeOne();
1654}
1655
1657{
1658 if (needDispatch()) {
1659 dispatch([this]() { _shutdown = true; });
1660 (void) QThread::wait(2000);
1661 } else {
1662 QThread::quit();
1663 }
1664}
1665
1666void GstVideoWorker::run()
1667{
1668 while (!_shutdown) {
1669 _taskQueueSync.lock();
1670
1671 while (_taskQueue.isEmpty()) {
1672 _taskQueueUpdate.wait(&_taskQueueSync);
1673 }
1674
1675 const Task task = _taskQueue.dequeue();
1676
1677 _taskQueueSync.unlock();
1678
1679 task();
1680 }
1681}
std::function< void()> Task
struct _GstElement GstElement
Error error
#define QGC_LOGGING_CATEGORY(name, categoryStr)
void stopDecoding() override
void takeScreenshot(const QString &imageFile) override
Q_INVOKABLE void dumpPipelineGraph(const QString &tag=QStringLiteral("manual"))
void decoderStatsChanged()
QString decoderName() const
GstVideoReceiver(QObject *parent=nullptr)
void start(uint32_t timeout) override
void stopRecording() override
void startRecording(const QString &videoFile, FILE_FORMAT format) override
void stop() override
void startDecoding(void *sink) override
void dispatch(Task task)
bool needDispatch() const
GstVideoWorker(QObject *parent=nullptr)
void recordingStarted(const QString &filename)
QTimer _watchdogTimer
std::atomic< qint64 > _lastSourceFrameTime
bool lowLatency() const
std::atomic< qint64 > _lastVideoFrameTime
void videoSizeChanged(QSize size)
void streamingChanged(bool active)
void nameChanged(const QString &name)
uint32_t _timeout
QString uri() const
QString _recordingOutput
void recordingChanged(bool active)
std::atomic< bool > _decoding
void onStartRecordingComplete(STATUS status)
VideoSinkHandle sink() const
void onTakeScreenshotComplete(STATUS status)
void decodingChanged(bool active)
void onStartComplete(STATUS status)
static bool isValidFileFormat(FILE_FORMAT format)
void onStopDecodingComplete(STATUS status)
void onStopRecordingComplete(STATUS status)
QString recordingOutput() const
void onStartDecodingComplete(STATUS status)
QQuickItem * _widget
void onStopComplete(STATUS status)
std::atomic< bool > _autoReconnect
RTSP/UDP auto-reconnect with exponential backoff on watchdog/error.
GstElement * create(const QString &uri, const Config &config)
@ None
No rtpjitterbuffer element; lowest latency, no reordering.
@ Buffered
rtpjitterbuffer with drop-on-latency=FALSE.
@ DropOnLatency
rtpjitterbuffer with drop-on-latency=TRUE.
QString writePipelineDot(GstElement *pipeline, const char *tag)
bool isHardwareDecoderFactory(GstElementFactory *factory)
QString takeExtraPathStats() noexcept
Path-specific extras after formatPathStats (GL reuse/sync waits); reads-and-clears,...
Definition HwBuffers.cc:320
GstBusSyncReply onBusSyncMessage(GstBus *, GstMessage *msg, gpointer) noexcept
Bus sync handler (GstBusSyncHandler) installed on every pipeline; no-op when no GPU path compiled.
Definition HwBuffers.cc:169
void dispatchBusMessage(GstMessage *msg) noexcept
Receiver-side bus hook; drops cached GPU devices on GST_MESSAGE_ERROR. No-op when no GPU paths compil...
Definition HwBuffers.cc:114
PathStats formatPathStats(bool reset) noexcept
Definition HwBuffers.cc:296
QByteArray format(const QList< LogEntry > &entries, int fmt)
Formatted per-path counters + delivered total; reset=true reads-and-clears (teardown),...
Definition HwBuffers.h:62