20#include <QtCore/QDateTime>
21#include <QtCore/QMutexLocker>
23#include <QtQuick/QQuickItem>
28#include <gst/video/video.h>
34constexpr GstClockTime kEosTimeoutNs = 3 * GST_SECOND;
38gboolean grabFirstSrcPad(
GstElement * , GstPad *pad, gpointer userData)
40 *
static_cast<GstPad **
>(userData) = GST_PAD(gst_object_ref(pad));
44bool isRecoverableH265PaciError(GstMessage *msg,
const GError *
error,
const gchar *debug)
46 if (!msg || !
error || (
error->domain != GST_STREAM_ERROR) || (
error->code != GST_STREAM_ERROR_FORMAT) ||
47 !debug || !g_strrstr(debug,
"NAL unit type 50 not supported yet")) {
51 GstObject *src = GST_MESSAGE_SRC(msg);
52 if (!src || !GST_IS_ELEMENT(src)) {
56 GstElementFactory *factory = gst_element_get_factory(GST_ELEMENT(src));
57 return factory && (g_strcmp0(GST_OBJECT_NAME(factory),
"rtph265depay") == 0);
66 qCDebug(GstVideoReceiverLog) <<
this;
69 (void) connect(&
_watchdogTimer, &QTimer::timeout,
this, &GstVideoReceiver::_watchdog);
77 qCDebug(GstVideoReceiverLog) <<
this;
82 if (_needDispatch()) {
88 qCDebug(GstVideoReceiverLog) <<
"Already running!" <<
_uri;
94 qCDebug(GstVideoReceiverLog) <<
"Failed because URI is not specified";
102 qCDebug(GstVideoReceiverLog) <<
"Starting" <<
_uri <<
", lowLatency" <<
lowLatency() <<
", timeout" <<
_timeout;
106 [[maybe_unused]]
static const bool dotDirHinted = []() {
107 if (qgetenv(
"GST_DEBUG_DUMP_DOT_DIR").isEmpty()) {
108 qCInfo(GstVideoReceiverLog).noquote()
109 <<
"Pipeline dot-graph dumps are disabled. Set GST_DEBUG_DUMP_DOT_DIR=/path/to/dir to enable.";
116 bool running =
false;
117 bool pipelineUp =
false;
123 _tee = gst_element_factory_make(
"tee",
nullptr);
125 qCCritical(GstVideoReceiverLog) <<
"gst_element_factory_make('tee') failed";
129 GstPad *pad = gst_element_get_static_pad(_tee,
"sink");
131 qCCritical(GstVideoReceiverLog) <<
"gst_element_get_static_pad() failed";
137 _teeProbeId = gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, _teeProbe,
this,
nullptr);
138 gst_clear_object(&pad);
139 if (_teeProbeId == 0) {
141 qCCritical(GstVideoReceiverLog) <<
"gst_pad_add_probe(_teeProbe) failed";
145 decoderQueue = gst_element_factory_make(
"queue",
nullptr);
147 qCCritical(GstVideoReceiverLog) <<
"gst_element_factory_make('queue') failed";
154 g_object_set(decoderQueue,
156 "max-size-buffers", 2,
158 "max-size-time", G_GUINT64_CONSTANT(0),
161 _decoderValve = gst_element_factory_make(
"valve",
nullptr);
162 if (!_decoderValve) {
163 qCCritical(GstVideoReceiverLog) <<
"gst_element_factory_make('valve') failed";
167 g_object_set(_decoderValve,
171 recorderQueue = gst_element_factory_make(
"queue",
nullptr);
172 if (!recorderQueue) {
173 qCCritical(GstVideoReceiverLog) <<
"gst_element_factory_make('queue') failed";
177 _recorderValve = gst_element_factory_make(
"valve",
nullptr);
178 if (!_recorderValve) {
179 qCCritical(GstVideoReceiverLog) <<
"gst_element_factory_make('valve') failed";
183 g_object_set(_recorderValve,
187 _pipeline = gst_pipeline_new(
"receiver");
189 qCCritical(GstVideoReceiverLog) <<
"gst_pipeline_new() failed";
193 g_object_set(_pipeline,
194 "message-forward", TRUE,
209 qCCritical(GstVideoReceiverLog) <<
"SourceFactory::create() failed";
213 gst_bin_add_many(GST_BIN(_pipeline), _source, _tee, decoderQueue, _decoderValve, recorderQueue, _recorderValve,
nullptr);
217 GstPad *srcPad =
nullptr;
218 (void) gst_element_foreach_src_pad(_source, grabFirstSrcPad, &srcPad);
221 _onNewSourcePad(srcPad);
222 gst_clear_object(&srcPad);
224 (void) g_signal_connect(_source,
"pad-added", G_CALLBACK(_onNewPad),
this);
227 if (!gst_element_link_many(_tee, decoderQueue, _decoderValve,
nullptr)) {
228 qCCritical(GstVideoReceiverLog) <<
"Unable to link decoder queue";
232 if (!gst_element_link_many(_tee, recorderQueue, _recorderValve,
nullptr)) {
233 qCCritical(GstVideoReceiverLog) <<
"Unable to link recorder queue";
237 GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline));
239 gst_bus_enable_sync_message_emission(bus);
240 (void) g_signal_connect(bus,
"sync-message", G_CALLBACK(_onBusMessage),
this);
245 gst_clear_object(&bus);
248 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL,
"pipeline-initial");
249 running = (gst_element_set_state(_pipeline, GST_STATE_PLAYING) != GST_STATE_CHANGE_FAILURE);
253 qCCritical(GstVideoReceiverLog) <<
"Failed";
256 (void) gst_element_set_state(_pipeline, GST_STATE_NULL);
257 (void) gst_element_get_state(_pipeline,
nullptr,
nullptr, GST_CLOCK_TIME_NONE);
258 gst_clear_object(&_pipeline);
262 gst_clear_object(&_recorderValve);
263 gst_clear_object(&recorderQueue);
264 gst_clear_object(&_decoderValve);
265 gst_clear_object(&decoderQueue);
266 gst_clear_object(&_tee);
267 gst_clear_object(&_source);
272 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL,
"pipeline-started");
273 qCDebug(GstVideoReceiverLog) <<
"Started" <<
_uri;
277 QMetaObject::invokeMethod(
this, [
this]() {
_watchdogTimer.start(1000); }, Qt::QueuedConnection);
284 if (_needDispatch()) {
289 if (
_uri.isEmpty()) {
290 qCDebug(GstVideoReceiverLog) <<
"Stop called on empty URI (no-op)";
294 qCDebug(GstVideoReceiverLog) <<
"Stopping" <<
_uri;
298 _reconnectEpoch.fetch_add(1, std::memory_order_relaxed);
300 QMetaObject::invokeMethod(
this, [
this]() {
_watchdogTimer.stop(); }, Qt::QueuedConnection);
302 if (_teeProbeId != 0) {
304 GstPad *sinkpad = gst_element_get_static_pad(_tee,
"sink");
306 gst_pad_remove_probe(sinkpad, _teeProbeId);
307 gst_clear_object(&sinkpad);
314 GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline));
316 gst_bus_disable_sync_message_emission(bus);
317 (void) g_signal_handlers_disconnect_by_data(bus,
this);
319 gboolean recordingValveClosed = TRUE;
320 g_object_get(_recorderValve,
"drop", &recordingValveClosed,
nullptr);
322 if (!recordingValveClosed) {
323 (void) gst_element_send_event(_pipeline, gst_event_new_eos());
331 const GstClockTime deadline = kEosTimeoutNs;
332 const qint64 startMs = QDateTime::currentMSecsSinceEpoch();
333 bool finalized =
false;
335 const qint64 elapsedNs = (QDateTime::currentMSecsSinceEpoch() - startMs)
336 * qint64(GST_MSECOND);
337 if (elapsedNs >= qint64(deadline))
break;
338 const GstClockTime remaining = GstClockTime(qint64(deadline) - elapsedNs);
339 GstMessage *msg = gst_bus_timed_pop_filtered(bus, remaining,
340 (GstMessageType)(GST_MESSAGE_EOS | GST_MESSAGE_ERROR | GST_MESSAGE_ELEMENT));
342 switch (GST_MESSAGE_TYPE(msg)) {
343 case GST_MESSAGE_ELEMENT: {
344 const GstStructure *s = gst_message_get_structure(msg);
345 if (s && gst_structure_has_name(s,
"splitmuxsink-fragment-closed")) {
346 qCDebug(GstVideoReceiverLog) <<
"splitmuxsink fragment finalized";
351 case GST_MESSAGE_EOS:
352 qCDebug(GstVideoReceiverLog) <<
"End of stream received (fallback path)";
355 case GST_MESSAGE_ERROR:
356 qCCritical(GstVideoReceiverLog) <<
"Error stopping pipeline!";
362 gst_clear_message(&msg);
363 if (finalized)
break;
366 qCWarning(GstVideoReceiverLog) <<
"splitmuxsink finalize signal not received within"
367 << (kEosTimeoutNs / GST_MSECOND)
368 <<
"ms — forcing pipeline NULL (recording may be truncated; "
369 <<
"faststart + reserved-moov-update-period keep the file playable)";
373 gst_clear_object(&bus);
375 qCCritical(GstVideoReceiverLog) <<
"gst_pipeline_get_bus() failed";
378 (void) gst_element_set_state(_pipeline, GST_STATE_NULL);
379 (void) gst_element_get_state(_pipeline,
nullptr,
nullptr, GST_CLOCK_TIME_NONE);
383 _shutdownRecordingBranch();
387 _shutdownDecodingBranch();
390 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL,
"pipeline-stopped");
395 QMutexLocker lock(&_pipelineMutex);
396 gst_clear_object(&_pipeline);
400 _recorderValve =
nullptr;
401 _decoderValve =
nullptr;
409 qCDebug(GstVideoReceiverLog) <<
"Streaming stopped" <<
_uri;
412 qCDebug(GstVideoReceiverLog) <<
"Streaming did not start" <<
_uri;
416 qCDebug(GstVideoReceiverLog) <<
"Stopped" <<
_uri;
419 qCInfo(GstVideoReceiverLog).noquote()
429 qCCritical(GstVideoReceiverLog) <<
"VideoSink is NULL" <<
_uri;
433 if (_needDispatch()) {
438 qCDebug(GstVideoReceiverLog) <<
"Starting decoding" <<
_uri;
441 qCDebug(GstVideoReceiverLog) <<
"Video Widget is NULL" <<
_uri;
447 gst_clear_object(&_videoSink);
451 qCDebug(GstVideoReceiverLog) <<
"Already decoding!" <<
_uri;
457 GstPad *pad = gst_element_get_static_pad(videoSink,
"sink");
459 qCCritical(GstVideoReceiverLog) <<
"Unable to find sink pad of video sink" <<
_uri;
467 _videoSinkProbeId = gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, _videoSinkProbe,
this,
nullptr);
468 gst_clear_object(&pad);
470 _videoSink = videoSink;
471 gst_object_ref(_videoSink);
480 _ensureVideoSinkInPipeline();
482 if (!_addDecoder(_decoderValve)) {
483 qCCritical(GstVideoReceiverLog) <<
"_addDecoder() failed" <<
_uri;
484 _shutdownDecodingBranch();
489 g_object_set(_decoderValve,
493 qCDebug(GstVideoReceiverLog) <<
"Decoding started" <<
_uri;
500 if (_needDispatch()) {
505 qCDebug(GstVideoReceiverLog) <<
"Stopping decoding" <<
_uri;
511 if (!_pipeline || !_videoSink) {
512 qCDebug(GstVideoReceiverLog) <<
"Not decoding!" <<
_uri;
517 g_object_set(_decoderValve,
523 const bool ret = _unlinkBranch(_decoderValve);
532 if (_needDispatch()) {
533 const QString cachedVideoFile = videoFile;
538 qCDebug(GstVideoReceiverLog) <<
"Starting recording" <<
_uri;
541 qCDebug(GstVideoReceiverLog) <<
"Streaming is not active!" <<
_uri;
547 qCDebug(GstVideoReceiverLog) <<
"Already recording!" <<
_uri;
552 qCDebug(GstVideoReceiverLog) <<
"New video file:" << videoFile <<
_uri;
554 _fileSink = _makeFileSink(videoFile, format);
556 qCCritical(GstVideoReceiverLog) <<
"_makeFileSink() failed" <<
_uri;
563 (void) gst_object_ref(_fileSink);
565 gst_bin_add(GST_BIN(_pipeline), _fileSink);
567 if (!gst_element_link(_recorderValve, _fileSink)) {
568 qCCritical(GstVideoReceiverLog) <<
"Failed to link valve and file sink" <<
_uri;
573 (void) gst_element_sync_state_with_parent(_fileSink);
575 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL,
"pipeline-with-filesink");
580 GstPad *probepad = gst_element_get_static_pad(_recorderValve,
"src");
582 qCCritical(GstVideoReceiverLog) <<
"gst_element_get_static_pad() failed" <<
_uri;
587 _keyframeWatchId = gst_pad_add_probe(probepad, GST_PAD_PROBE_TYPE_BUFFER, _keyframeWatch,
this,
nullptr);
588 gst_clear_object(&probepad);
590 g_object_set(_recorderValve,
596 qCDebug(GstVideoReceiverLog) <<
"Recording started" <<
_uri;
603 if (_needDispatch()) {
608 qCDebug(GstVideoReceiverLog) <<
"Stopping recording" <<
_uri;
611 qCDebug(GstVideoReceiverLog) <<
"Not recording!" <<
_uri;
616 g_object_set(_recorderValve,
622 if (!_unlinkBranch(_recorderValve)) {
630 _recordingStopRequested =
true;
635 if (_needDispatch()) {
636 const QString cachedImageFile = imageFile;
641 qCDebug(GstVideoReceiverLog) <<
"taking screenshot" <<
_uri;
647void GstVideoReceiver::_watchdog()
654 const qint64 now = QDateTime::currentSecsSinceEpoch();
656 if (lastSourceFrameTime == 0) {
657 lastSourceFrameTime = now;
664 qCDebug(GstVideoReceiverLog).noquote() <<
"HW path live" <<
_uri << hwStats.line;
669 if (_qosStatsDirty.exchange(
false, std::memory_order_acq_rel)) {
673 qint64 elapsed = now - lastSourceFrameTime;
675 qCDebug(GstVideoReceiverLog) <<
"Stream timeout, no frames for" << elapsed <<
_uri;
676 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL,
"pipeline-watchdog-timeout");
678 _scheduleReconnect(
"source watchdog");
684 if (lastVideoFrameTime == 0) {
685 lastVideoFrameTime = now;
689 elapsed = now - lastVideoFrameTime;
691 qCDebug(GstVideoReceiverLog) <<
"Video decoder timeout, no frames for" << elapsed <<
_uri;
692 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL,
"pipeline-watchdog-timeout");
694 _scheduleReconnect(
"decoder watchdog");
700void GstVideoReceiver::_scheduleReconnect(
const char *reason)
707 qCDebug(GstVideoReceiverLog) <<
"Auto-reconnect disabled — not retrying after" << reason;
711 if (
_uri.isEmpty()) {
723 QMetaObject::invokeMethod(
this, [
this, reason, reconnectTimeout,
uri]() {
724 const int next = std::min(_reconnectAttempts.load(std::memory_order_relaxed) + 1, 30);
725 _reconnectAttempts.store(next, std::memory_order_relaxed);
728 const int delaySec = std::min(1 << std::min(next - 1, 5), 30);
729 const quint64 epoch = _reconnectEpoch.load(std::memory_order_relaxed);
730 const int attempts = next;
731 qCInfo(GstVideoReceiverLog) <<
"Scheduling reconnect #" << attempts
732 <<
"in" << delaySec <<
"s after" << reason <<
uri;
733 QTimer::singleShot(delaySec * 1000,
this, [
this, epoch, attempts, reconnectTimeout,
uri]() {
734 if (epoch != _reconnectEpoch.load(std::memory_order_relaxed))
return;
737 GstElement *livePipeline = _acquirePipelineRef();
738 const bool pipelineUp = (livePipeline !=
nullptr);
739 if (livePipeline) gst_object_unref(livePipeline);
740 if (
uri.isEmpty() || pipelineUp)
return;
741 qCInfo(GstVideoReceiverLog) <<
"Reconnecting (attempt" << attempts <<
")" <<
uri;
742 start(reconnectTimeout);
744 }, Qt::QueuedConnection);
750 GstElement *pipelineRef = _acquirePipelineRef();
752 qCDebug(GstVideoReceiverLog) <<
"dumpPipelineGraph: pipeline not running";
755 const QByteArray tagUtf8 = tag.toUtf8();
756 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(pipelineRef), GST_DEBUG_GRAPH_SHOW_ALL, tagUtf8.constData());
758 if (!dotPath.isEmpty()) {
759 qCInfo(GstVideoReceiverLog) <<
"Pipeline graph saved to" << dotPath;
761 gst_object_unref(pipelineRef);
765void GstVideoReceiver::_handleEOS()
774 _shutdownDecodingBranch();
776 _shutdownRecordingBranch();
785 GstElement *decoder = gst_element_factory_make(
"decodebin3",
nullptr);
787 qCCritical(GstVideoReceiverLog) <<
"gst_element_factory_make('decodebin3') failed";
792GstElement *GstVideoReceiver::_makeFileSink(
const QString &videoFile, FILE_FORMAT format)
797 GstPad *videopad =
nullptr;
798 GstPad *ghostpad =
nullptr;
802 qCCritical(GstVideoReceiverLog) <<
"Unsupported file format";
810 splitmux = gst_element_factory_make(
"splitmuxsink",
nullptr);
812 qCCritical(GstVideoReceiverLog) <<
"gst_element_factory_make('splitmuxsink') failed";
816 g_object_set(splitmux,
817 "location", qPrintable(videoFile),
818 "muxer-factory", _kFileMux[format],
819 "max-size-time", G_GUINT64_CONSTANT(0),
820 "max-size-bytes", G_GUINT64_CONSTANT(0),
821 "async-finalize", TRUE,
826 "message-forward", TRUE,
833 GstStructure *muxerProps = gst_structure_new(
"properties",
834 "faststart", G_TYPE_BOOLEAN, TRUE,
835 "reserved-moov-update-period", G_TYPE_UINT64, G_GUINT64_CONSTANT(1000000000),
837 g_object_set(splitmux,
"muxer-properties", muxerProps,
nullptr);
838 gst_structure_free(muxerProps);
841 bin = gst_bin_new(
"sinkbin");
843 qCCritical(GstVideoReceiverLog) <<
"gst_bin_new('sinkbin') failed";
850 videopad = gst_element_request_pad_simple(splitmux,
"video");
852 qCCritical(GstVideoReceiverLog) <<
"gst_element_request_pad_simple(splitmuxsink, video) failed";
856 if (!gst_bin_add(GST_BIN(bin), splitmux)) {
857 qCCritical(GstVideoReceiverLog) <<
"gst_bin_add(splitmuxsink) failed";
862 ghostpad = gst_ghost_pad_new(
"sink", videopad);
864 qCCritical(GstVideoReceiverLog) <<
"gst_ghost_pad_new() failed";
868 if (!gst_element_add_pad(bin, ghostpad)) {
869 qCCritical(GstVideoReceiverLog) <<
"gst_element_add_pad(ghost) failed";
878 gst_clear_object(&ghostpad);
881 gst_clear_object(&videopad);
882 gst_clear_object(&splitmux);
883 gst_clear_object(&bin);
887void GstVideoReceiver::_onNewSourcePad(GstPad *pad)
890 if (!gst_element_link(_source, _tee)) {
891 qCCritical(GstVideoReceiverLog) <<
"Unable to link source";
897 qCDebug(GstVideoReceiverLog) <<
"Streaming started" <<
_uri;
901 _eosProbeId = gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, _eosProbe,
this,
nullptr);
902 if (_eosProbeId != 0) {
904 _eosProbePad = GST_PAD_CAST(gst_object_ref(pad));
910 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL,
"pipeline-with-new-source-pad");
912 _ensureVideoSinkInPipeline();
914 if (!_addDecoder(_decoderValve)) {
915 qCCritical(GstVideoReceiverLog) <<
"_addDecoder() failed";
916 _shutdownDecodingBranch();
920 g_object_set(_decoderValve,
924 qCDebug(GstVideoReceiverLog) <<
"Decoding started" <<
_uri;
927void GstVideoReceiver::_logDecodebin3SelectedCodec(
GstElement *decodebin3)
929 GValue value = G_VALUE_INIT;
930 GstIterator *iter = gst_bin_iterate_elements(GST_BIN(decodebin3));
933 while (gst_iterator_next(iter, &value) == GST_ITERATOR_OK) {
934 child = GST_ELEMENT(g_value_get_object(&value));
935 GstElementFactory *factory = gst_element_get_factory(child);
938 gboolean is_decoder = gst_element_factory_list_is_type(factory, GST_ELEMENT_FACTORY_TYPE_DECODER);
940 const gchar *decoderKlass = gst_element_factory_get_klass(factory);
941 GstPluginFeature *feature = GST_PLUGIN_FEATURE(factory);
942 const gchar *featureName = gst_plugin_feature_get_name(feature);
943 const guint rank = gst_plugin_feature_get_rank(feature);
946 QString pluginName = featureName;
947 GstPlugin *plugin = gst_plugin_feature_get_plugin(feature);
949 pluginName = gst_plugin_get_name(plugin);
950 gst_object_unref(plugin);
952 qCDebug(GstVideoReceiverLog) <<
"Decodebin3 selected codec:rank -" << pluginName <<
"/" << featureName <<
"-" << decoderKlass << (isHardwareDecoder ?
"(HW)" :
"(SW)") <<
":" << rank;
954 const QString newName = QString::fromUtf8(featureName);
957 QMutexLocker locker(&_decoderNameMutex);
958 if (newName != _decoderName) {
959 _decoderName = newName;
971 g_object_set(child,
"qos", FALSE,
nullptr);
972 qCDebug(GstVideoReceiverLog) <<
"Disabled QoS on internal decoder" << featureName;
975 g_value_reset(&value);
977 g_value_unset(&value);
978 gst_iterator_free(iter);
982void GstVideoReceiver::_onNewDecoderPad(GstPad *pad)
984 qCDebug(GstVideoReceiverLog) <<
"_onNewDecoderPad" <<
_uri;
986 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL,
"pipeline-with-new-decoder-pad");
989 _logDecodebin3SelectedCodec(_decoder);
991 if (!_addVideoSink(pad)) {
992 qCCritical(GstVideoReceiverLog) <<
"_addVideoSink() failed";
996bool GstVideoReceiver::_addDecoder(
GstElement *src)
998 _decoder = _makeDecoder();
1000 qCCritical(GstVideoReceiverLog) <<
"_makeDecoder() failed";
1004 (void) gst_object_ref(_decoder);
1006 (void) gst_bin_add(GST_BIN(_pipeline), _decoder);
1007 (void) gst_element_sync_state_with_parent(_decoder);
1009 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL,
"pipeline-with-decoder");
1011 if (!gst_element_link(src, _decoder)) {
1012 qCCritical(GstVideoReceiverLog) <<
"Unable to link decoder";
1013 gst_element_set_state(_decoder, GST_STATE_NULL);
1014 (void) gst_element_get_state(_decoder,
nullptr,
nullptr, GST_CLOCK_TIME_NONE);
1015 (void) gst_bin_remove(GST_BIN(_pipeline), _decoder);
1016 gst_clear_object(&_decoder);
1020 GstPad *srcPad =
nullptr;
1021 (void) gst_element_foreach_src_pad(_decoder, grabFirstSrcPad, &srcPad);
1024 _onNewDecoderPad(srcPad);
1026 (void) g_signal_connect(_decoder,
"pad-added", G_CALLBACK(_onNewPad),
this);
1029 gst_clear_object(&srcPad);
1033void GstVideoReceiver::_ensureVideoSinkInPipeline()
1035 if (!_videoSink || !_pipeline) {
1039 GstObject *parent = gst_element_get_parent(_videoSink);
1041 gst_object_unref(parent);
1045 g_object_set(_videoSink,
1049 (void) gst_object_ref(_videoSink);
1050 (void) gst_bin_add(GST_BIN(_pipeline), _videoSink);
1053 (void) gst_element_set_state(_videoSink, GST_STATE_PAUSED);
1056bool GstVideoReceiver::_addVideoSink(GstPad *pad)
1058 GstCaps *caps = gst_pad_query_caps(pad,
nullptr);
1060 _ensureVideoSinkInPipeline();
1062 GstPad *sinkPad = gst_element_get_static_pad(_videoSink,
"sink");
1063 GstPadLinkReturn linkRet = sinkPad ? gst_pad_link(pad, sinkPad) : GST_PAD_LINK_WRONG_HIERARCHY;
1064 if (linkRet != GST_PAD_LINK_OK) {
1065 qCCritical(GstVideoReceiverLog) <<
"Unable to link decoder pad to video sink, result:" << linkRet;
1068 GstObject *parent = gst_element_get_parent(_videoSink);
1070 (void) gst_element_set_state(_videoSink, GST_STATE_NULL);
1071 (void) gst_element_get_state(_videoSink,
nullptr,
nullptr, GST_CLOCK_TIME_NONE);
1072 (void) gst_bin_remove(GST_BIN(_pipeline), _videoSink);
1073 gst_clear_object(&parent);
1076 gst_clear_object(&sinkPad);
1077 gst_clear_caps(&caps);
1080 gst_clear_object(&sinkPad);
1082 (void) gst_element_sync_state_with_parent(_videoSink);
1084 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL,
"pipeline-with-videosink");
1089 if (!_decoderValve) {
1090 qCCritical(GstVideoReceiverLog) <<
"Unable to determine video size - _decoderValve is NULL" <<
_uri;
1094 GstPad *valveSrcPad = gst_element_get_static_pad(_decoderValve,
"src");
1096 qCCritical(GstVideoReceiverLog) <<
"gst_element_get_static_pad() failed";
1100 GstCaps *valveSrcPadCaps = gst_pad_query_caps(valveSrcPad,
nullptr);
1101 if (!valveSrcPadCaps) {
1102 qCCritical(GstVideoReceiverLog) <<
"gst_pad_query_caps() failed";
1103 gst_clear_object(&valveSrcPad);
1107 const GstStructure *structure = gst_caps_get_structure(valveSrcPadCaps, 0);
1109 qCCritical(GstVideoReceiverLog) <<
"Unable to determine video size - structure is NULL" <<
_uri;
1110 gst_clear_object(&valveSrcPad);
1116 (void) gst_structure_get_int(structure,
"width", &width);
1117 (void) gst_structure_get_int(structure,
"height", &height);
1120 gint orientation = 0;
1121 if (gst_structure_get_int(structure,
"video-orientation", &orientation)
1122 && (orientation == GST_VIDEO_ORIENTATION_90R
1123 || orientation == GST_VIDEO_ORIENTATION_90L
1124 || orientation == GST_VIDEO_ORIENTATION_UL_LR
1125 || orientation == GST_VIDEO_ORIENTATION_UR_LL)) {
1126 videoSize.setWidth(height);
1127 videoSize.setHeight(width);
1129 videoSize.setWidth(width);
1130 videoSize.setHeight(height);
1133 gst_clear_caps(&valveSrcPadCaps);
1134 gst_clear_object(&valveSrcPad);
1138 gst_clear_caps(&caps);
1142void GstVideoReceiver::_noteTeeFrame()
1149 if (_reconnectAttempts.load(std::memory_order_relaxed) != 0) {
1150 QMetaObject::invokeMethod(
1151 this, [
this]() { _reconnectAttempts.store(0, std::memory_order_relaxed); }, Qt::QueuedConnection);
1153 const quint64 sourceFrames = _sourceFrameCount.fetch_add(1, std::memory_order_relaxed) + 1;
1154 if (sourceFrames == 1) {
1155 qCInfo(GstVideoReceiverLog).noquote() <<
"Source receiving frames (tee):" <<
_uri;
1156 }
else if ((sourceFrames % 300) == 0) {
1157 qCDebug(GstVideoReceiverLog).noquote()
1158 <<
"Source flow: teeFrames=" << sourceFrames <<
"decoding=" <<
_decoding <<
_uri;
1162void GstVideoReceiver::_noteVideoSinkFrame()
1167 qCDebug(GstVideoReceiverLog) <<
"Decoding started";
1172void GstVideoReceiver::_noteEndOfStream()
1177bool GstVideoReceiver::_unlinkBranch(
GstElement *from)
1179 GstPad *src = gst_element_get_static_pad(from,
"src");
1181 qCCritical(GstVideoReceiverLog) <<
"gst_element_get_static_pad() failed";
1185 GstPad *
sink = gst_pad_get_peer(src);
1187 gst_clear_object(&src);
1188 qCCritical(GstVideoReceiverLog) <<
"gst_pad_get_peer() failed";
1192 if (!gst_pad_unlink(src,
sink)) {
1193 gst_clear_object(&src);
1194 gst_clear_object(&
sink);
1195 qCCritical(GstVideoReceiverLog) <<
"gst_pad_unlink() failed";
1199 gst_clear_object(&src);
1202 const gboolean ret = gst_pad_send_event(
sink, gst_event_new_eos());
1204 gst_clear_object(&
sink);
1207 qCCritical(GstVideoReceiverLog) <<
"Branch EOS was NOT sent";
1211 qCDebug(GstVideoReceiverLog) <<
"Branch EOS was sent";
1216void GstVideoReceiver::_shutdownDecodingBranch()
1219 GstObject *parent = gst_element_get_parent(_decoder);
1221 (void) gst_bin_remove(GST_BIN(_pipeline), _decoder);
1222 (void) gst_element_set_state(_decoder, GST_STATE_NULL);
1223 (void) gst_element_get_state(_decoder,
nullptr,
nullptr, GST_CLOCK_TIME_NONE);
1224 gst_clear_object(&parent);
1227 gst_clear_object(&_decoder);
1230 if (_videoSinkProbeId != 0 && _videoSink) {
1231 GstPad *sinkpad = gst_element_get_static_pad(_videoSink,
"sink");
1233 gst_pad_remove_probe(sinkpad, _videoSinkProbeId);
1234 gst_clear_object(&sinkpad);
1237 _videoSinkProbeId = 0;
1239 if (_eosProbeId != 0 && _eosProbePad) {
1241 gst_pad_remove_probe(_eosProbePad, _eosProbeId);
1244 gst_clear_object(&_eosProbePad);
1249 GstObject *parent = gst_element_get_parent(_videoSink);
1251 (void) gst_bin_remove(GST_BIN(_pipeline), _videoSink);
1252 (void) gst_element_set_state(_videoSink, GST_STATE_NULL);
1253 (void) gst_element_get_state(_videoSink,
nullptr,
nullptr, GST_CLOCK_TIME_NONE);
1254 gst_clear_object(&parent);
1256 gst_clear_object(&_videoSink);
1263 qCDebug(GstVideoReceiverLog) <<
"Decoding stopped";
1267 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL,
"pipeline-decoding-stopped");
1270void GstVideoReceiver::_shutdownRecordingBranch()
1272 if (_keyframeWatchId != 0 && _recorderValve) {
1273 GstPad *probepad = gst_element_get_static_pad(_recorderValve,
"src");
1275 gst_pad_remove_probe(probepad, _keyframeWatchId);
1276 gst_clear_object(&probepad);
1278 _keyframeWatchId = 0;
1281 gst_bin_remove(GST_BIN(_pipeline), _fileSink);
1282 gst_element_set_state(_fileSink, GST_STATE_NULL);
1283 (void) gst_element_get_state(_fileSink,
nullptr,
nullptr, GST_CLOCK_TIME_NONE);
1284 gst_clear_object(&_fileSink);
1290 qCDebug(GstVideoReceiverLog) <<
"Recording stopped";
1294 if (_recordingStopRequested) {
1295 _recordingStopRequested =
false;
1299 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL,
"pipeline-recording-stopped");
1302bool GstVideoReceiver::_needDispatch()
1307GstElement *GstVideoReceiver::_acquirePipelineRef()
const
1309 QMutexLocker lock(&_pipelineMutex);
1310 if (!_pipeline)
return nullptr;
1311 return GST_ELEMENT(gst_object_ref(_pipeline));
1314gboolean GstVideoReceiver::_onBusMessage(GstBus * , GstMessage *msg, gpointer data)
1316 if (!msg || !data) {
1317 qCCritical(GstVideoReceiverLog) <<
"Invalid parameters in _onBusMessage: msg=" << msg <<
"data=" << data;
1323 if (GST_MESSAGE_TYPE(msg) != GST_MESSAGE_ERROR) {
1327 switch (GST_MESSAGE_TYPE(msg)) {
1328 case GST_MESSAGE_ERROR: {
1329 gchar *debug =
nullptr;
1330 GError *
error =
nullptr;
1331 gst_message_parse_error(msg, &
error, &debug);
1332 const bool recoverableH265PaciError = isRecoverableH265PaciError(msg,
error, debug);
1335 qCDebug(GstVideoReceiverLog) <<
"GStreamer debug:" << debug;
1336 g_clear_pointer(&debug, g_free);
1340 if (recoverableH265PaciError) {
1341 qCWarning(GstVideoReceiverLog)
1342 <<
"Ignoring unsupported H.265 RTP PACI packet from rtph265depay:" <<
error->message;
1344 qCCritical(GstVideoReceiverLog) <<
"GStreamer error:" <<
error->message;
1346 g_clear_error(&
error);
1349 if (recoverableH265PaciError) {
1355 if (
GstElement *pipelineRef = pThis->_acquirePipelineRef()) {
1358 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(pipelineRef), GST_DEBUG_GRAPH_SHOW_ALL,
"pipeline-error");
1360 if (!dotPath.isEmpty()) {
1361 qCInfo(GstVideoReceiverLog) <<
"Pipeline graph saved to" << dotPath;
1363 gst_object_unref(pipelineRef);
1368 pThis->_worker->
dispatch([pThis]() {
1369 qCDebug(GstVideoReceiverLog) <<
"Stopping because of error";
1370 pThis->_scheduleReconnect(
"pipeline error");
1374 case GST_MESSAGE_WARNING: {
1377 gchar *debug =
nullptr;
1378 GError *
error =
nullptr;
1379 gst_message_parse_warning(msg, &
error, &debug);
1380 qCWarning(GstVideoReceiverLog) <<
"GStreamer warning:"
1382 <<
"debug:" << (debug ? debug :
"(none)");
1383 g_clear_error(&
error);
1384 g_clear_pointer(&debug, g_free);
1387 case GST_MESSAGE_EOS:
1388 pThis->_worker->
dispatch([pThis]() {
1389 qCDebug(GstVideoReceiverLog) <<
"Received EOS";
1390 pThis->_handleEOS();
1393 case GST_MESSAGE_STREAM_COLLECTION: {
1394 GstStreamCollection *collection =
nullptr;
1395 gst_message_parse_stream_collection(msg, &collection);
1400 GList *selectedIds =
nullptr;
1401 const guint nStreams = gst_stream_collection_get_size(collection);
1402 for (guint i = 0; i < nStreams; ++i) {
1403 GstStream *stream = gst_stream_collection_get_stream(collection, i);
1404 const GstStreamType type = gst_stream_get_stream_type(stream);
1405 if (type & GST_STREAM_TYPE_VIDEO) {
1406 selectedIds = g_list_append(selectedIds,
1407 g_strdup(gst_stream_get_stream_id(stream)));
1411 GstEvent *
event = gst_event_new_select_streams(selectedIds);
1412 gst_element_send_event(GST_ELEMENT(GST_MESSAGE_SRC(msg)), event);
1413 g_list_free_full(selectedIds, g_free);
1415 gst_object_unref(collection);
1418 case GST_MESSAGE_QOS: {
1419 guint64 processed = 0, dropped = 0;
1420 gst_message_parse_qos_stats(msg,
nullptr, &processed, &dropped);
1423 gdouble proportion = 0;
1425 gst_message_parse_qos_values(msg, &jitter, &proportion, &quality);
1427 pThis->_processedFrames.store(processed, std::memory_order_relaxed);
1428 pThis->_droppedFrames.store(dropped, std::memory_order_relaxed);
1429 pThis->_currentJitterNs.store(jitter, std::memory_order_relaxed);
1430 pThis->_qosProportion.store(proportion, std::memory_order_relaxed);
1431 pThis->_qosQuality.store(quality, std::memory_order_relaxed);
1434 pThis->_qosStatsDirty.store(
true, std::memory_order_release);
1437 case GST_MESSAGE_ELEMENT: {
1438 const GstStructure *structure = gst_message_get_structure(msg);
1439 if (structure && gst_structure_has_name(structure,
"qgc-caps-info")) {
1441 const gchar *fmt = gst_structure_get_string(structure,
"format");
1442 gst_structure_get_int(structure,
"width", &w);
1443 gst_structure_get_int(structure,
"height", &h);
1444 const QString
format = QString::fromUtf8(fmt ? fmt :
"");
1445 const QSize resolution(w, h);
1447 void *src = GST_MESSAGE_SRC(msg);
1448 QMetaObject::invokeMethod(pThis, [pThis, format, resolution, src]() {
1450 if (
static_cast<const void*
>(c->element()) == src) {
1451 c->updateNegotiation(format, resolution);
1454 }, Qt::QueuedConnection);
1457 if (!gst_structure_has_name(structure,
"GstBinForwarded")) {
1461 GstMessage *forward_msg =
nullptr;
1462 gst_structure_get(structure,
"message", GST_TYPE_MESSAGE, &forward_msg, NULL);
1467 if (GST_MESSAGE_TYPE(forward_msg) == GST_MESSAGE_EOS) {
1468 pThis->_worker->
dispatch([pThis]() {
1469 qCDebug(GstVideoReceiverLog) <<
"Received branch EOS";
1470 pThis->_handleEOS();
1474 gst_clear_message(&forward_msg);
1477 case GST_MESSAGE_STATE_CHANGED: {
1478 GstElement *pipelineRef = pThis->_acquirePipelineRef();
1479 if (!pipelineRef)
break;
1480 const bool fromPipeline = (GST_MESSAGE_SRC(msg) == GST_OBJECT(pipelineRef));
1481 if (!fromPipeline) {
1482 gst_object_unref(pipelineRef);
1485 GstState oldState = GST_STATE_NULL, newState = GST_STATE_NULL;
1486 gst_message_parse_state_changed(msg, &oldState, &newState,
nullptr);
1487 if (newState == GST_STATE_PLAYING && oldState != GST_STATE_PLAYING) {
1488 GstClockTime min = 0, max = 0;
1489 GstQuery *q = gst_query_new_latency();
1490 if (gst_element_query(pipelineRef, q)) {
1491 gboolean live = FALSE;
1492 gst_query_parse_latency(q, &live, &min, &max);
1496 qCDebug(GstVideoReceiverLog).noquote()
1497 <<
"Pipeline PLAYING:" << pThis->
_uri
1498 <<
"decoder:" << (decName.isEmpty() ? QStringLiteral(
"(pending)") : decName)
1499 <<
"min-latency:" << (min / 1000000) <<
"ms"
1500 <<
"max-latency:" << (max / 1000000) <<
"ms";
1502 gst_object_unref(pipelineRef);
1505 case GST_MESSAGE_LATENCY:
1506 pThis->_worker->
dispatch([pThis]() {
1507 GstElement* pipeline = pThis->_acquirePipelineRef();
1509 (void) gst_bin_recalculate_latency(GST_BIN(pipeline));
1510 gst_object_unref(pipeline);
1515 QMetaObject::invokeMethod(pThis, [pThis]() {
1517 c->refreshLatency();
1518 }, Qt::QueuedConnection);
1527void GstVideoReceiver::_onNewPad(
GstElement *element, GstPad *pad, gpointer data)
1531 if (element == self->_source) {
1532 self->_onNewSourcePad(pad);
1533 }
else if (element == self->_decoder) {
1534 self->_onNewDecoderPad(pad);
1536 qCDebug(GstVideoReceiverLog) <<
"Unexpected call!";
1540GstPadProbeReturn GstVideoReceiver::_teeProbe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
1542 Q_UNUSED(pad); Q_UNUSED(info)
1546 pThis->_noteTeeFrame();
1549 return GST_PAD_PROBE_OK;
1552GstPadProbeReturn GstVideoReceiver::_videoSinkProbe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
1554 Q_UNUSED(pad); Q_UNUSED(info)
1563 gst_pad_send_event(pad, gst_event_new_flush_start());
1564 gst_pad_send_event(pad, gst_event_new_flush_stop(TRUE));
1568 if ((buf = gst_pad_probe_info_get_buffer(info)) !=
nullptr) {
1571 if ((seg = gst_segment_new()) !=
nullptr) {
1572 gst_segment_init(seg, GST_FORMAT_TIME);
1574 seg->start = buf->pts;
1576 gst_pad_send_event(pad, gst_event_new_segment(seg));
1578 gst_segment_free(seg);
1582 gst_pad_set_offset(pad, -
static_cast<gint64
>(buf->pts));
1587 pThis->_noteVideoSinkFrame();
1590 return GST_PAD_PROBE_OK;
1593GstPadProbeReturn GstVideoReceiver::_eosProbe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
1596 Q_ASSERT(user_data);
1599 const GstEvent *
event = gst_pad_probe_info_get_event(info);
1600 if (GST_EVENT_TYPE(event) == GST_EVENT_EOS) {
1602 pThis->_noteEndOfStream();
1606 return GST_PAD_PROBE_OK;
1609GstPadProbeReturn GstVideoReceiver::_keyframeWatch(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
1611 if (!info || !user_data) {
1612 qCCritical(GstVideoReceiverLog) <<
"Invalid arguments";
1613 return GST_PAD_PROBE_DROP;
1616 GstBuffer *buf = gst_pad_probe_info_get_buffer(info);
1617 if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
1619 return GST_PAD_PROBE_DROP;
1623 gst_pad_set_offset(pad, -
static_cast<gint64
>(buf->pts));
1625 qCDebug(GstVideoReceiverLog) <<
"Got keyframe, stop dropping buffers";
1630 return GST_PAD_PROBE_REMOVE;
1636 qCDebug(GstVideoReceiverLog) <<
this;
1641 qCDebug(GstVideoReceiverLog) <<
this;
1646 return (QThread::currentThread() !=
this);
1651 QMutexLocker lock(&_taskQueueSync);
1652 _taskQueue.enqueue(task);
1653 _taskQueueUpdate.wakeOne();
1659 dispatch([
this]() { _shutdown =
true; });
1660 (void) QThread::wait(2000);
1666void GstVideoWorker::run()
1668 while (!_shutdown) {
1669 _taskQueueSync.lock();
1671 while (_taskQueue.isEmpty()) {
1672 _taskQueueUpdate.wait(&_taskQueueSync);
1675 const Task task = _taskQueue.dequeue();
1677 _taskQueueSync.unlock();
std::function< void()> Task
struct _GstElement GstElement
#define QGC_LOGGING_CATEGORY(name, categoryStr)
void stopDecoding() override
void takeScreenshot(const QString &imageFile) override
Q_INVOKABLE void dumpPipelineGraph(const QString &tag=QStringLiteral("manual"))
void decoderStatsChanged()
QString decoderName() const
GstVideoReceiver(QObject *parent=nullptr)
void start(uint32_t timeout) override
void stopRecording() override
void startRecording(const QString &videoFile, FILE_FORMAT format) override
void startDecoding(void *sink) override
bool needDispatch() const
GstVideoWorker(QObject *parent=nullptr)
void recordingStarted(const QString &filename)
std::atomic< qint64 > _lastSourceFrameTime
std::atomic< qint64 > _lastVideoFrameTime
void videoSizeChanged(QSize size)
void streamingChanged(bool active)
void nameChanged(const QString &name)
void recordingChanged(bool active)
std::atomic< bool > _decoding
void onStartRecordingComplete(STATUS status)
VideoSinkHandle sink() const
void onTakeScreenshotComplete(STATUS status)
void decodingChanged(bool active)
void onStartComplete(STATUS status)
static bool isValidFileFormat(FILE_FORMAT format)
void onStopDecodingComplete(STATUS status)
void onStopRecordingComplete(STATUS status)
QString recordingOutput() const
void onStartDecodingComplete(STATUS status)
void onStopComplete(STATUS status)
std::atomic< bool > _autoReconnect
RTSP/UDP auto-reconnect with exponential backoff on watchdog/error.
GstElement * create(const QString &uri, const Config &config)
@ None
No rtpjitterbuffer element; lowest latency, no reordering.
@ Buffered
rtpjitterbuffer with drop-on-latency=FALSE.
@ DropOnLatency
rtpjitterbuffer with drop-on-latency=TRUE.
QString writePipelineDot(GstElement *pipeline, const char *tag)
bool isHardwareDecoderFactory(GstElementFactory *factory)
QString takeExtraPathStats() noexcept
Path-specific extras after formatPathStats (GL reuse/sync waits); reads-and-clears,...
GstBusSyncReply onBusSyncMessage(GstBus *, GstMessage *msg, gpointer) noexcept
Bus sync handler (GstBusSyncHandler) installed on every pipeline; no-op when no GPU path compiled.
void dispatchBusMessage(GstMessage *msg) noexcept
Receiver-side bus hook; drops cached GPU devices on GST_MESSAGE_ERROR. No-op when no GPU paths compiled.
PathStats formatPathStats(bool reset) noexcept
JitterBuffer jitterBuffer
Formatted per-path counters + delivered total; reset=true reads-and-clears (teardown),...