QGroundControl
Ground Control Station for MAVLink Drones
Loading...
Searching...
No Matches
GstVideoReceiver.cc
Go to the documentation of this file.
1//-----------------------------------------------------------------------------
2// Our pipeline look like this:
3//
4// +-->queue-->_decoderValve[-->_decoder-->_videoSink]
5// |
6// _source-->_tee
7// |
8// +-->queue-->_recorderValve[-->_fileSink]
9//-----------------------------------------------------------------------------
10
11#include "GstVideoReceiver.h"
12#include "GStreamerHelpers.h"
13#include "QGCLoggingCategory.h"
14
15#include <QtCore/QDateTime>
16#include <QtCore/QUrl>
17#include <QtQuick/QQuickItem>
18
19#include <gst/gst.h>
20
// Logging category used by all diagnostics in this file ("Video.GstVideoReceiver").
21QGC_LOGGING_CATEGORY(GstVideoReceiverLog, "Video.GstVideoReceiver")
22
// Constructor: spins up the dedicated GStreamer worker thread and the 1 Hz
// stall watchdog. All pipeline manipulation in this class happens on _worker.
// NOTE(review): the constructor signature line (listing line 23) is missing
// from this capture - presumably GstVideoReceiver::GstVideoReceiver(QObject *parent);
// confirm against the repository.
24    : VideoReceiver(parent)
25    , _worker(new GstVideoWorker(this))
26{
27    // qCDebug(GstVideoReceiverLog) << this;
28
    // Start the worker first; every public API re-dispatches onto it.
29    _worker->start();
    // Poll once per second for stream/decoder stalls (see _watchdog()).
30    (void) connect(&_watchdogTimer, &QTimer::timeout, this, &GstVideoReceiver::_watchdog);
31    _watchdogTimer.start(1000);
32}
33
// Destructor: tear down any active pipeline, then shut the worker thread down.
// NOTE(review): the destructor signature line (listing line 34) is missing from
// this capture - presumably GstVideoReceiver::~GstVideoReceiver(); confirm.
35{
36    stop();
37    _worker->shutdown();
38
39    // qCDebug(GstVideoReceiverLog) << this;
40}
41
// Start receiving the stream at _uri. Re-dispatches itself onto the worker
// thread if needed. Builds the static part of the pipeline:
//   _source --> _tee --> queue --> _decoderValve  (decoder branch, valve closed)
//                    \-> queue --> _recorderValve (recorder branch, valve closed)
// then sets it PLAYING. Emits onStartComplete() with the resulting status.
// @param timeout watchdog timeout (seconds) used by _watchdog().
42void GstVideoReceiver::start(uint32_t timeout)
43{
44    if (_needDispatch()) {
45        _worker->dispatch([this, timeout]() { start(timeout); });
46        return;
47    }
48
49    if (_pipeline) {
50        qCDebug(GstVideoReceiverLog) << "Already running!" << _uri;
51        _dispatchSignal([this]() { emit onStartComplete(STATUS_INVALID_STATE); });
52        return;
53    }
54
55    if (_uri.isEmpty()) {
56        qCDebug(GstVideoReceiverLog) << "Failed because URI is not specified";
57        _dispatchSignal([this]() { emit onStartComplete(STATUS_INVALID_URL); });
58        return;
59    }
60
    // NOTE(review): listing line 61 is missing here - presumably `_timeout = timeout;`
    // since _timeout is logged just below and `timeout` is otherwise unused; confirm.
    // _buffer < 0 selects low-latency mode (also disables sink sync, see _addVideoSink()).
62    _buffer = lowLatency() ? -1 : 0;
63
64    qCDebug(GstVideoReceiverLog) << "Starting" << _uri << ", lowLatency" << lowLatency() << ", timeout" << _timeout;
65
66    _endOfStream = false;
67
68    bool running = false;
69    bool pipelineUp = false;
70
71    GstElement *decoderQueue = nullptr;
72    GstElement *recorderQueue = nullptr;
73
    // do { ... } while(0): single-exit error handling - any failure breaks out
    // to the common cleanup after the loop.
74    do {
75        _tee = gst_element_factory_make("tee", nullptr);
76        if (!_tee) {
77            qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('tee') failed";
78            break;
79        }
80
81        GstPad *pad = gst_element_get_static_pad(_tee, "sink");
82        if (!pad) {
83            qCCritical(GstVideoReceiverLog) << "gst_element_get_static_pad() failed";
84            break;
85        }
86
    // NOTE(review): listing line 87 is missing here (likely a comment); confirm.
88
        // Buffer probe on the tee's sink pad feeds the stream watchdog (_teeProbe
        // presumably calls _noteTeeFrame() - confirm against its definition).
89        _teeProbeId = gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, _teeProbe, this, nullptr);
90        gst_clear_object(&pad);
91
92        decoderQueue = gst_element_factory_make("queue", nullptr);
93        if (!decoderQueue) {
94            qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('queue') failed";
95            break;
96        }
97
98        _decoderValve = gst_element_factory_make("valve", nullptr);
99        if (!_decoderValve) {
100            qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('valve') failed";
101            break;
102        }
103
        // Both valves start closed (drop=TRUE); they are opened only when a
        // decoder / file sink is actually attached.
104        g_object_set(_decoderValve,
105            "drop", TRUE,
106            nullptr);
107
108        recorderQueue = gst_element_factory_make("queue", nullptr);
109        if (!recorderQueue) {
110            qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('queue') failed";
111            break;
112        }
113
114        _recorderValve = gst_element_factory_make("valve", nullptr);
115        if (!_recorderValve) {
116            qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('valve') failed";
117            break;
118        }
119
120        g_object_set(_recorderValve,
121            "drop", TRUE,
122            nullptr);
123
124        _pipeline = gst_pipeline_new("receiver");
125        if (!_pipeline) {
126            qCCritical(GstVideoReceiverLog) << "gst_pipeline_new() failed";
127            break;
128        }
129
130        g_object_set(_pipeline,
131            "message-forward", TRUE,
132            nullptr);
133
134        _source = _makeSource(_uri);
135        if (!_source) {
136            qCCritical(GstVideoReceiverLog) << "_makeSource() failed";
137            break;
138        }
139
        // From here on the bin owns the elements; see the !pipelineUp cleanup below.
140        gst_bin_add_many(GST_BIN(_pipeline), _source, _tee, decoderQueue, _decoderValve, recorderQueue, _recorderValve, nullptr);
141
142        pipelineUp = true;
143
        // If the source already exposes a src pad, link it now; otherwise link
        // lazily from the "pad-added" signal.
144        GstPad *srcPad = nullptr;
145        GstIterator *it = gst_element_iterate_src_pads(_source);
146        GValue vpad = G_VALUE_INIT;
147        switch (gst_iterator_next(it, &vpad)) {
148        case GST_ITERATOR_OK:
149            srcPad = GST_PAD(g_value_get_object(&vpad));
150            (void) gst_object_ref(srcPad);
151            (void) g_value_reset(&vpad);
152            break;
153        case GST_ITERATOR_RESYNC:
154            gst_iterator_resync(it);
155            break;
156        default:
157            break;
158        }
159        g_value_unset(&vpad);
160        gst_iterator_free(it);
161
162        if (srcPad) {
163            _onNewSourcePad(srcPad);
164            gst_clear_object(&srcPad);
165        } else {
166            (void) g_signal_connect(_source, "pad-added", G_CALLBACK(_onNewPad), this);
167        }
168
169        if (!gst_element_link_many(_tee, decoderQueue, _decoderValve, nullptr)) {
170            qCCritical(GstVideoReceiverLog) << "Unable to link decoder queue";
171            break;
172        }
173
174        if (!gst_element_link_many(_tee, recorderQueue, _recorderValve, nullptr)) {
175            qCCritical(GstVideoReceiverLog) << "Unable to link recorder queue";
176            break;
177        }
178
        // Sync emission lets _onBusMessage run on the streaming thread.
179        GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline));
180        if (bus) {
181            gst_bus_enable_sync_message_emission(bus);
182            (void) g_signal_connect(bus, "sync-message", G_CALLBACK(_onBusMessage), this);
183            gst_clear_object(&bus);
184        }
185
186        GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-initial");
187        running = (gst_element_set_state(_pipeline, GST_STATE_PLAYING) != GST_STATE_CHANGE_FAILURE);
188    } while(0);
189
190    if (!running) {
191        qCCritical(GstVideoReceiverLog) << "Failed";
192
193        if (_pipeline) {
194            (void) gst_element_set_state(_pipeline, GST_STATE_NULL);
195            gst_clear_object(&_pipeline);
196        }
197
        // Only release individual elements if they were never handed to the bin;
        // once added, the bin owns (and just released) them.
198        if (!pipelineUp) {
199            gst_clear_object(&_recorderValve);
200            gst_clear_object(&recorderQueue);
201            gst_clear_object(&_decoderValve);
202            gst_clear_object(&decoderQueue);
203            gst_clear_object(&_tee);
204            gst_clear_object(&_source);
205        }
206
207        // Rate limit restarts on failure. This sleep is OK because we're in the video worker thread.
208        QThread::sleep(1);
209        _dispatchSignal([this]() { emit onStartComplete(STATUS_FAIL); });
210    } else {
211        GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-started");
212        qCDebug(GstVideoReceiverLog) << "Started" << _uri;
213
214        _dispatchSignal([this]() { emit onStartComplete(STATUS_OK); });
215    }
216}
217
// Stop the receiver: remove the tee probe, drain the recording branch with an
// EOS (so the muxer can finalize the file), set the pipeline to NULL, shut down
// both branches, and release everything. Emits onStopComplete(STATUS_OK).
// NOTE(review): the function signature line (listing line 218) is missing from
// this capture - presumably `void GstVideoReceiver::stop()`; confirm.
219{
220    if (_needDispatch()) {
221        _worker->dispatch([this]() { stop(); });
222        return;
223    }
224
225    if (_uri.isEmpty()) {
226        qCWarning(GstVideoReceiverLog) << "Stop called on empty URI";
227        return;
228    }
229
230    qCDebug(GstVideoReceiverLog) << "Stopping" << _uri;
231
    // Detach the watchdog probe before tearing the pipeline down.
232    if (_teeProbeId != 0) {
233        if (_tee) {
234            GstPad *sinkpad = gst_element_get_static_pad(_tee, "sink");
235            if (sinkpad) {
236                gst_pad_remove_probe(sinkpad, _teeProbeId);
237                gst_clear_object(&sinkpad);
238            }
239        }
240        _teeProbeId = 0;
241    }
242
243    if (_pipeline) {
244        GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline));
245        if (bus) {
246            gst_bus_disable_sync_message_emission(bus);
247            (void) g_signal_handlers_disconnect_by_data(bus, this);
248
            // If the recorder valve is open we are recording: push an EOS and
            // block until it (or an error) reaches the bus, so the file sink
            // finalizes the container before the state drops to NULL.
249            gboolean recordingValveClosed = TRUE;
250            g_object_get(_recorderValve, "drop", &recordingValveClosed, nullptr);
251
252            if (!recordingValveClosed) {
253                (void) gst_element_send_event(_pipeline, gst_event_new_eos());
254
255                GstMessage *msg = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE, (GstMessageType)(GST_MESSAGE_EOS | GST_MESSAGE_ERROR));
256                if (msg) {
257                    switch (GST_MESSAGE_TYPE(msg)) {
258                    case GST_MESSAGE_EOS:
259                        qCDebug(GstVideoReceiverLog) << "End of stream received!";
260                        break;
261                    case GST_MESSAGE_ERROR:
262                        qCCritical(GstVideoReceiverLog) << "Error stopping pipeline!";
263                        break;
264                    default:
265                        break;
266                    }
267
268                    gst_clear_message(&msg);
269                } else {
270                    qCCritical(GstVideoReceiverLog) << "gst_bus_timed_pop_filtered() failed";
271                }
272            }
273
274            gst_clear_object(&bus);
275        } else {
276            qCCritical(GstVideoReceiverLog) << "gst_pipeline_get_bus() failed";
277        }
278
279        (void) gst_element_set_state(_pipeline, GST_STATE_NULL);
280
281        // FIXME: check if branch is connected and remove all elements from branch
282        if (_fileSink) {
283            _shutdownRecordingBranch();
284        }
285
286        if (_videoSink) {
287            _shutdownDecodingBranch();
288        }
289
290        GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-stopped");
291
292        gst_clear_object(&_pipeline);
293        _pipeline = nullptr;
294
        // The bin owned these; just drop the dangling raw pointers.
295        _recorderValve = nullptr;
296        _decoderValve = nullptr;
297        _tee = nullptr;
298        _source = nullptr;
299
    // NOTE(review): listing line 300 is missing here (likely blank/comment); confirm.
301
302        if (_streaming) {
303            _streaming = false;
304            qCDebug(GstVideoReceiverLog) << "Streaming stopped" << _uri;
305            _dispatchSignal([this]() { emit streamingChanged(_streaming); });
306        } else {
307            qCDebug(GstVideoReceiverLog) << "Streaming did not start" << _uri;
308        }
309    }
310
311    qCDebug(GstVideoReceiverLog) << "Stopped" << _uri;
312
313    _dispatchSignal([this]() { emit onStopComplete(STATUS_OK); });
314}
315
// Attach a video sink and start the decoder branch. If the stream is not up
// yet, only the sink is registered; the decoder is added later from
// _onNewSourcePad(). Emits onStartDecodingComplete() with the status.
// NOTE(review): the signature line (listing line 316) is missing from this
// capture - the parameter is referenced as `sink` below; confirm its type.
317{
318    if (!sink) {
319        qCCritical(GstVideoReceiverLog) << "VideoSink is NULL" << _uri;
320        return;
321    }
322
323    if (_needDispatch()) {
324        _worker->dispatch([this, sink]() mutable { startDecoding(sink); });
325        return;
326    }
327
328    qCDebug(GstVideoReceiverLog) << "Starting decoding" << _uri;
329
330    if (!_widget) {
331        qCDebug(GstVideoReceiverLog) << "Video Widget is NULL" << _uri;
332        _dispatchSignal([this]() { emit onStartDecodingComplete(STATUS_FAIL); });
333        return;
334    }
335
    // No pipeline: any previously registered sink is stale - drop our ref.
336    if (!_pipeline) {
337        gst_clear_object(&_videoSink);
338    }
339
340    if (_videoSink || _decoding) {
341        qCDebug(GstVideoReceiverLog) << "Already decoding!" << _uri;
342        _dispatchSignal([this]() { emit onStartDecodingComplete(STATUS_INVALID_STATE); });
343        return;
344    }
345
346    GstElement *videoSink = GST_ELEMENT(sink);
347    GstPad *pad = gst_element_get_static_pad(videoSink, "sink");
348    if (!pad) {
349        qCCritical(GstVideoReceiverLog) << "Unable to find sink pad of video sink" << _uri;
350        _dispatchSignal([this]() { emit onStartDecodingComplete(STATUS_FAIL); });
351        return;
352    }
353
    // NOTE(review): listing line 354 is missing here (likely blank/comment); confirm.
355    _resetVideoSink = true;
356
    // Buffer probe on the sink pad feeds the decoder watchdog (_videoSinkProbe
    // presumably calls _noteVideoSinkFrame() - confirm against its definition).
357    _videoSinkProbeId = gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, _videoSinkProbe, this, nullptr);
358    gst_clear_object(&pad);
359
    // Keep our own reference; gst_bin_add() in _addVideoSink() steals one.
360    _videoSink = videoSink;
361    gst_object_ref(_videoSink);
362
363    _removingDecoder = false;
364
    // Stream not up yet: decoder will be attached from _onNewSourcePad().
365    if (!_streaming) {
366        _dispatchSignal([this]() { emit onStartDecodingComplete(STATUS_OK); });
367        return;
368    }
369
370    if (!_addDecoder(_decoderValve)) {
371        qCCritical(GstVideoReceiverLog) << "_addDecoder() failed" << _uri;
372        _dispatchSignal([this]() { emit onStartDecodingComplete(STATUS_FAIL); });
373        return;
374    }
375
    // Open the decoder valve: buffers now flow into the decoder branch.
376    g_object_set(_decoderValve,
377        "drop", FALSE,
378        nullptr);
379
380    qCDebug(GstVideoReceiverLog) << "Decoding started" << _uri;
381
382    _dispatchSignal([this]() { emit onStartDecodingComplete(STATUS_OK); });
383}
384
// Close the decoder valve and unlink the decoder branch; the branch itself is
// shut down asynchronously once EOS propagates (see _handleEOS()).
// NOTE(review): the signature line (listing line 385) is missing from this
// capture - presumably `void GstVideoReceiver::stopDecoding()`; confirm.
386{
387    if (_needDispatch()) {
388        _worker->dispatch([this]() { stopDecoding(); });
389        return;
390    }
391
392    qCDebug(GstVideoReceiverLog) << "Stopping decoding" << _uri;
393
394    if (!_pipeline || !_decoding) {
395        qCDebug(GstVideoReceiverLog) << "Not decoding!" << _uri;
396        _dispatchSignal([this]() { emit onStopDecodingComplete(STATUS_INVALID_STATE); });
397        return;
398    }
399
    // Stop feeding the branch before unlinking it.
400    g_object_set(_decoderValve,
401        "drop", TRUE,
402        nullptr);
403
404    _removingDecoder = true;
405
406    const bool ret = _unlinkBranch(_decoderValve);
407
408    // FIXME: it is much better to emit onStopDecodingComplete() after decoding is really stopped
409    // (which happens later due to async design) but as for now it is also not so bad...
410    _dispatchSignal([this, ret](){ emit onStopDecodingComplete(ret ? STATUS_OK : STATUS_FAIL); });
411}
412
// Start recording the (undecoded) stream to videoFile: build a mux+filesink
// bin, link it behind _recorderValve, arm a keyframe probe, and open the valve.
// Emits onStartRecordingComplete() with the status.
// @param videoFile destination path passed to the filesink.
// @param format    container format; must satisfy isValidFileFormat().
413void GstVideoReceiver::startRecording(const QString &videoFile, FILE_FORMAT format)
414{
415    if (_needDispatch()) {
        // Deep-copy the QString before it crosses the thread boundary.
416        const QString cachedVideoFile = videoFile;
417        _worker->dispatch([this, cachedVideoFile, format]() { startRecording(cachedVideoFile, format); });
418        return;
419    }
420
421    qCDebug(GstVideoReceiverLog) << "Starting recording" << _uri;
422
423    if (!_pipeline) {
424        qCDebug(GstVideoReceiverLog) << "Streaming is not active!" << _uri;
425        _dispatchSignal([this](){ emit onStartRecordingComplete(STATUS_INVALID_STATE); });
426        return;
427    }
428
429    if (_recording) {
430        qCDebug(GstVideoReceiverLog) << "Already recording!" << _uri;
431        _dispatchSignal([this]() { emit onStartRecordingComplete(STATUS_INVALID_STATE); });
432        return;
433    }
434
435    qCDebug(GstVideoReceiverLog) << "New video file:" << videoFile << _uri;
436
437    _fileSink = _makeFileSink(videoFile, format);
438    if (!_fileSink) {
439        qCCritical(GstVideoReceiverLog) << "_makeFileSink() failed" << _uri;
440        _dispatchSignal([this]() { emit onStartRecordingComplete(STATUS_FAIL); });
441        return;
442    }
443
444    _removingRecorder = false;
445
    // Keep our own reference; gst_bin_add() below steals one.
446    (void) gst_object_ref(_fileSink);
447
448    gst_bin_add(GST_BIN(_pipeline), _fileSink);
449
450    if (!gst_element_link(_recorderValve, _fileSink)) {
451        qCCritical(GstVideoReceiverLog) << "Failed to link valve and file sink" << _uri;
452        _dispatchSignal([this]() { emit onStartRecordingComplete(STATUS_FAIL); });
453        return;
454    }
455
456    (void) gst_element_sync_state_with_parent(_fileSink);
457
458    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-filesink");
459
460    // Install a probe on the recording branch to drop buffers until we hit our first keyframe
461    // When we hit our first keyframe, we can offset the timestamps appropriately according to the first keyframe time
462    // This will ensure the first frame is a keyframe at t=0, and decoding can begin immediately on playback
463    GstPad *probepad = gst_element_get_static_pad(_recorderValve, "src");
464    if (!probepad) {
465        qCCritical(GstVideoReceiverLog) << "gst_element_get_static_pad() failed" << _uri;
466        _dispatchSignal([this]() { emit onStartRecordingComplete(STATUS_FAIL); });
467        return;
468    }
469
470    (void) gst_pad_add_probe(probepad, GST_PAD_PROBE_TYPE_BUFFER, _keyframeWatch, this, nullptr); // to drop the buffers until key frame is received
471    gst_clear_object(&probepad);
472
    // Open the recorder valve: buffers now reach the keyframe probe / file sink.
473    g_object_set(_recorderValve,
474        "drop", FALSE,
475        nullptr);
476
477    _recordingOutput = videoFile;
478    _recording = true;
479    qCDebug(GstVideoReceiverLog) << "Recording started" << _uri;
480    _dispatchSignal([this]() {
    // NOTE(review): listing lines 481-482 are missing here - presumably the
    // emits for onStartRecordingComplete(STATUS_OK) / recordingChanged; confirm.
483    });
484}
485
// Close the recorder valve and unlink the recording branch; the file is
// finalized asynchronously once EOS reaches the sink (see _handleEOS()).
// NOTE(review): the signature line (listing line 486) is missing from this
// capture - presumably `void GstVideoReceiver::stopRecording()`; confirm.
487{
488    if (_needDispatch()) {
489        _worker->dispatch([this]() { stopRecording(); });
490        return;
491    }
492
493    qCDebug(GstVideoReceiverLog) << "Stopping recording" << _uri;
494
495    if (!_pipeline || !_recording) {
496        qCDebug(GstVideoReceiverLog) << "Not recording!" << _uri;
497        _dispatchSignal([this]() { emit onStopRecordingComplete(STATUS_INVALID_STATE); });
498        return;
499    }
500
    // Stop feeding the branch before unlinking it.
501    g_object_set(_recorderValve,
502        "drop", TRUE,
503        nullptr);
504
505    _removingRecorder = true;
506
507    const bool ret = _unlinkBranch(_recorderValve);
508
509    // FIXME: it is much better to emit onStopRecordingComplete() after recording is really stopped
510    // (which happens later due to async design) but as for now it is also not so bad...
511    _dispatchSignal([this, ret]() { emit onStopRecordingComplete(ret ? STATUS_OK : STATUS_FAIL); });
512}
513
514void GstVideoReceiver::takeScreenshot(const QString &imageFile)
515{
516 if (_needDispatch()) {
517 const QString cachedImageFile = imageFile;
518 _worker->dispatch([this, cachedImageFile]() { takeScreenshot(cachedImageFile); });
519 return;
520 }
521
522 qCDebug(GstVideoReceiverLog) << "taking screenshot" << _uri;
523
524 // FIXME: record screenshot here
525 _dispatchSignal([this]() { emit onTakeScreenshotComplete(STATUS_NOT_IMPLEMENTED); });
526}
527
// 1 Hz watchdog (driven by _watchdogTimer): on the worker thread, compare the
// last source-frame and last decoded-frame timestamps against _timeout and
// stop the receiver (emitting timeout()) when the stream or decoder stalls.
528void GstVideoReceiver::_watchdog()
529{
530    _worker->dispatch([this]() {
531        if (!_pipeline) {
532            return;
533        }
534
535        const qint64 now = QDateTime::currentSecsSinceEpoch();
536        if (_lastSourceFrameTime == 0) {
    // NOTE(review): listing line 537 is missing here - presumably
    // `_lastSourceFrameTime = now;` (seed so elapsed starts at 0); confirm.
538        }
539
540        qint64 elapsed = now - _lastSourceFrameTime;
541        if (elapsed > _timeout) {
542            qCDebug(GstVideoReceiverLog) << "Stream timeout, no frames for" << elapsed << _uri;
543            _dispatchSignal([this]() { emit timeout(); });
544            stop();
545        }
546
        // Decoder gets double the stream timeout before being declared stalled.
547        if (_decoding && !_removingDecoder) {
548            if (_lastVideoFrameTime == 0) {
    // NOTE(review): listing line 549 is missing here - presumably
    // `_lastVideoFrameTime = now;`; confirm.
550            }
551
552            elapsed = now - _lastVideoFrameTime;
553            if (elapsed > (_timeout * 2)) {
554                qCDebug(GstVideoReceiverLog) << "Video decoder timeout, no frames for" << elapsed << _uri;
555                _dispatchSignal([this]() { emit timeout(); });
556                stop();
557            }
558        }
559    });
560}
561
562void GstVideoReceiver::_handleEOS()
563{
564 if (!_pipeline) {
565 return;
566 }
567
568 if (_endOfStream) {
569 stop();
570 } else if (_decoding && _removingDecoder) {
571 _shutdownDecodingBranch();
572 } else if (_recording && _removingRecorder) {
573 _shutdownRecordingBranch();
574 } /*else {
575 qCWarning(GstVideoReceiverLog) << "Unexpected EOS!";
576 stop();
577 }*/
578}
579
580gboolean GstVideoReceiver::_filterParserCaps(GstElement *bin, GstPad *pad, GstElement *element, GstQuery *query, gpointer data)
581{
582 Q_UNUSED(bin); Q_UNUSED(pad); Q_UNUSED(element); Q_UNUSED(data)
583
584 if (GST_QUERY_TYPE(query) != GST_QUERY_CAPS) {
585 return FALSE;
586 }
587
588 GstCaps *srcCaps;
589 gst_query_parse_caps(query, &srcCaps);
590 if (!srcCaps || gst_caps_is_any(srcCaps)) {
591 return FALSE;
592 }
593
594 GstCaps *sinkCaps = nullptr;
595 GstCaps *filter = nullptr;
596 GstStructure *structure = gst_caps_get_structure(srcCaps, 0);
597 if (gst_structure_has_name(structure, "video/x-h265")) {
598 filter = gst_caps_from_string("video/x-h265");
599 if (gst_caps_can_intersect(srcCaps, filter)) {
600 sinkCaps = gst_caps_from_string("video/x-h265,stream-format=hvc1");
601 }
602 gst_clear_caps(&filter);
603 } else if (gst_structure_has_name(structure, "video/x-h264")) {
604 filter = gst_caps_from_string("video/x-h264");
605 if (gst_caps_can_intersect(srcCaps, filter)) {
606 sinkCaps = gst_caps_from_string("video/x-h264,stream-format=avc");
607 }
608 gst_clear_caps(&filter);
609 }
610
611 if (sinkCaps) {
612 gst_query_set_caps_result(query, sinkCaps);
613 gst_clear_caps(&sinkCaps);
614 return TRUE;
615 }
616
617 return FALSE;
618}
619
// Build the source bin ("sourcebin") for the given URI: the scheme selects an
// rtspsrc / tcpclientsrc / udpsrc, which is wired (optionally through tsdemux
// for MPEG-TS and rtpjitterbuffer for RTP-over-UDP) into a parsebin whose
// output pads get ghost-padded out of the bin.
// Returns the new bin, or nullptr on failure (all partial elements released).
620GstElement *GstVideoReceiver::_makeSource(const QString &input)
621{
622    if (input.isEmpty()) {
623        qCCritical(GstVideoReceiverLog) << "Failed because URI is not specified";
624        return nullptr;
625    }
626
627    const QUrl sourceUrl(input);
628
    // Scheme detection. Note substring checks: "udp://" matches only plain udp
    // (H.264), "udp265://" selects H.265, "mpegts://"/"tcp://" select MPEG-TS.
629    const bool isRtsp = sourceUrl.scheme().startsWith("rtsp", Qt::CaseInsensitive);
630    const bool isUdp264 = input.contains("udp://", Qt::CaseInsensitive);
631    const bool isUdp265 = input.contains("udp265://", Qt::CaseInsensitive);
632    const bool isUdpMPEGTS = input.contains("mpegts://", Qt::CaseInsensitive);
633    const bool isTcpMPEGTS = input.contains("tcp://", Qt::CaseInsensitive);
634
635    GstElement *source = nullptr;
636    GstElement *buffer = nullptr;
637    GstElement *tsdemux = nullptr;
638    GstElement *parser = nullptr;
639    GstElement *bin = nullptr;
640    GstElement *srcbin = nullptr;
641
    // do { ... } while(0): single-exit error handling; cleanup after the loop
    // releases whatever is still owned by the local pointers.
642    do {
643        if (isRtsp) {
644            if (!GStreamer::is_valid_rtsp_uri(input.toUtf8().constData())) {
645                qCCritical(GstVideoReceiverLog) << "Invalid RTSP URI:" << input;
646                break;
647            }
648
649            source = gst_element_factory_make("rtspsrc", "source");
650            if (!source) {
651                qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('rtspsrc') failed";
652                break;
653            }
654
            // 25 ms jitterbuffer latency for RTSP.
655            g_object_set(source,
656                "location", input.toUtf8().constData(),
657                "latency", 25,
658                nullptr);
659        } else if (isTcpMPEGTS) {
660            source = gst_element_factory_make("tcpclientsrc", "source");
661            if (!source) {
662                qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('tcpclientsrc') failed";
663                break;
664            }
665
666            const QString host = sourceUrl.host();
667            const quint16 port = sourceUrl.port();
668            g_object_set(source,
669                "host", host.toUtf8().constData(),
670                "port", port,
671                nullptr);
672        } else if (isUdp264 || isUdp265 || isUdpMPEGTS) {
673            source = gst_element_factory_make("udpsrc", "source");
674            if (!source) {
675                qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('udpsrc') failed";
676                break;
677            }
678
            // Rebuild a plain udp:// URI regardless of the incoming scheme.
679            const QString uri = QStringLiteral("udp://%1:%2").arg(sourceUrl.host(), QString::number(sourceUrl.port()));
680            g_object_set(source,
681                "uri", uri.toUtf8().constData(),
682                nullptr);
683
            // For raw RTP over UDP the payload type must be stated explicitly;
            // MPEG-TS over UDP needs no caps (tsdemux figures it out).
684            GstCaps *caps = nullptr;
685            if (isUdp264) {
686                caps = gst_caps_from_string("application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)H264");
687                if (!caps) {
688                    qCCritical(GstVideoReceiverLog) << "gst_caps_from_string() failed";
689                    break;
690                }
691            } else if (isUdp265) {
692                caps = gst_caps_from_string("application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)H265");
693                if (!caps) {
694                    qCCritical(GstVideoReceiverLog) << "gst_caps_from_string() failed";
695                    break;
696                }
697            }
698
699            if (caps) {
700                g_object_set(source,
701                    "caps", caps,
702                    nullptr);
703                gst_clear_caps(&caps);
704            }
705        } else {
706            qCDebug(GstVideoReceiverLog) << "URI is not recognized";
707        }
708
709        if (!source) {
710            qCCritical(GstVideoReceiverLog) << "gst_element_factory_make() for data source failed";
711            break;
712        }
713
714        bin = gst_bin_new("sourcebin");
715        if (!bin) {
716            qCCritical(GstVideoReceiverLog) << "gst_bin_new('sourcebin') failed";
717            break;
718        }
719
720        parser = gst_element_factory_make("parsebin", "parser");
721        if (!parser) {
722            qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('parsebin') failed";
723            break;
724        }
725
        // Pin the H.264/H.265 stream-format parsebin negotiates (see _filterParserCaps()).
726        (void) g_signal_connect(parser, "autoplug-query", G_CALLBACK(_filterParserCaps), nullptr);
727
728        gst_bin_add_many(GST_BIN(bin), source, parser, nullptr);
729
730        // FIXME: AV: Android does not determine MPEG2-TS via parsebin - have to explicitly state which demux to use
731        // FIXME: AV: tsdemux handling is a bit ugly - let's try to find elegant solution for that later
732        if (isTcpMPEGTS || isUdpMPEGTS) {
733            tsdemux = gst_element_factory_make("tsdemux", nullptr);
734            if (!tsdemux) {
735                qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('tsdemux') failed";
736                break;
737            }
738
739            (void) gst_bin_add(GST_BIN(bin), tsdemux);
740
741            if (!gst_element_link(source, tsdemux)) {
742                qCCritical(GstVideoReceiverLog) << "gst_element_link() failed";
743                break;
744            }
745
            // From here on, treat the demuxer as "the source" for linking purposes.
746            source = tsdemux;
747            tsdemux = nullptr;
748        }
749
        // _padProbe reports via bitmask (bit 0: static src pad found,
        // bit 1: RTP caps - presumably; confirm against _padProbe's definition).
750        int probeRes = 0;
751        (void) gst_element_foreach_src_pad(source, _padProbe, &probeRes);
752
753        if (probeRes & 1) {
            // _buffer >= 0 means non-low-latency mode: insert an rtpjitterbuffer.
754            if ((probeRes & 2) && (_buffer >= 0)) {
755                buffer = gst_element_factory_make("rtpjitterbuffer", nullptr);
756                if (!buffer) {
757                    qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('rtpjitterbuffer') failed";
758                    break;
759                }
760
761                (void) gst_bin_add(GST_BIN(bin), buffer);
762
763                if (!gst_element_link_many(source, buffer, parser, nullptr)) {
764                    qCCritical(GstVideoReceiverLog) << "gst_element_link() failed";
765                    break;
766                }
767            } else {
768                if (!gst_element_link(source, parser)) {
769                    qCCritical(GstVideoReceiverLog) << "gst_element_link() failed";
770                    break;
771                }
772            }
773        } else {
            // Pads appear dynamically (rtspsrc, tsdemux): link each to parsebin as it shows up.
774            (void) g_signal_connect(source, "pad-added", G_CALLBACK(_linkPad), parser);
775        }
776
        // Expose parsebin's output pads as ghost pads of the source bin.
777        (void) g_signal_connect(parser, "pad-added", G_CALLBACK(_wrapWithGhostPad), nullptr);
778
        // Success: the bin now owns everything; null the locals so the cleanup
        // below releases nothing.
779        source = tsdemux = buffer = parser = nullptr;
780
781        srcbin = bin;
782        bin = nullptr;
783    } while(0);
784
785    gst_clear_object(&bin);
786    gst_clear_object(&parser);
787    gst_clear_object(&tsdemux);
788    gst_clear_object(&buffer);
789    gst_clear_object(&source);
790
791    return srcbin;
792}
793
794GstElement *GstVideoReceiver::_makeDecoder(GstCaps *caps, GstElement *videoSink)
795{
796 Q_UNUSED(caps); Q_UNUSED(videoSink)
797
798 GstElement *decoder = gst_element_factory_make("decodebin3", nullptr);
799 if (!decoder) {
800 qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('decodebin3') failed";
801 }
802
803 return decoder;
804}
805
// Build the recording sink bin ("sinkbin"): muxer (chosen by format via
// _kFileMux) linked to a filesink writing videoFile, with the muxer's
// requested video pad ghost-padded out as "sink".
// Returns the new bin, or nullptr on failure.
806GstElement *GstVideoReceiver::_makeFileSink(const QString &videoFile, FILE_FORMAT format)
807{
808    GstElement *fileSink = nullptr;
809    GstElement *mux = nullptr;
810    GstElement *sink = nullptr;
811    GstElement *bin = nullptr;
    // Tracks whether mux/sink are still floating (not yet owned by the bin).
812    bool releaseElements = true;
813
814    do {
815        if (!isValidFileFormat(format)) {
816            qCCritical(GstVideoReceiverLog) << "Unsupported file format";
817            break;
818        }
819
820        mux = gst_element_factory_make(_kFileMux[format], nullptr);
821        if (!mux) {
822            qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('" << _kFileMux[format] << "') failed";
823            break;
824        }
825
826        sink = gst_element_factory_make("filesink", nullptr);
827        if (!sink) {
828            qCCritical(GstVideoReceiverLog) << "gst_element_factory_make('filesink') failed";
829            break;
830        }
831
832        g_object_set(sink,
833            "location", qPrintable(videoFile),
834            nullptr);
835
836        bin = gst_bin_new("sinkbin");
837        if (!bin) {
838            qCCritical(GstVideoReceiverLog) << "gst_bin_new('sinkbin') failed";
839            break;
840        }
841
        // Muxers expose request pads ("video_%u") rather than static sink pads.
842        GstPadTemplate *padTemplate = gst_element_class_get_pad_template(GST_ELEMENT_GET_CLASS(mux), "video_%u");
843        if (!padTemplate) {
844            qCCritical(GstVideoReceiverLog) << "gst_element_class_get_pad_template(mux) failed";
845            break;
846        }
847
848        // FIXME: pad handling is potentially leaking (and other similar places too!)
849        GstPad *pad = gst_element_request_pad(mux, padTemplate, nullptr, nullptr);
850        if (!pad) {
851            qCCritical(GstVideoReceiverLog) << "gst_element_request_pad(mux) failed";
852            break;
853        }
854
        // The bin takes ownership of mux and sink here.
855        gst_bin_add_many(GST_BIN(bin), mux, sink, nullptr);
856
857        releaseElements = false;
858
859        GstPad *ghostpad = gst_ghost_pad_new("sink", pad);
860        (void) gst_element_add_pad(bin, ghostpad);
861        gst_clear_object(&pad);
862
863        if (!gst_element_link(mux, sink)) {
864            qCCritical(GstVideoReceiverLog) << "gst_element_link() failed";
865            break;
866        }
867
868        fileSink = bin;
869        bin = nullptr;
870    } while(0);
871
872    if (releaseElements) {
873        gst_clear_object(&sink);
874        gst_clear_object(&mux);
875    }
876
    // On failure after gst_bin_add_many(), releasing the bin releases mux/sink too.
877    gst_clear_object(&bin);
878    return fileSink;
879}
880
// Called when the source bin exposes a src pad (either found statically in
// start() or via the "pad-added" signal): link source->tee, mark the stream
// as live, watch the pad for EOS, and - if a video sink was already registered
// via startDecoding() - attach the decoder branch and open its valve.
881void GstVideoReceiver::_onNewSourcePad(GstPad *pad)
882{
883    // FIXME: check for caps - if this is not video stream (and preferably - one of these which we have to support) then simply skip it
884    if (!gst_element_link(_source, _tee)) {
885        qCCritical(GstVideoReceiverLog) << "Unable to link source";
886        return;
887    }
888
889    if (!_streaming) {
890        _streaming = true;
891        qCDebug(GstVideoReceiverLog) << "Streaming started" << _uri;
892        _dispatchSignal([this]() { emit streamingChanged(_streaming); });
893    }
894
    // Watch for EOS events flowing downstream from the source.
895    (void) gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, _eosProbe, this, nullptr);
    // No sink registered yet: the decoder is attached later by startDecoding().
896    if (!_videoSink) {
897        return;
898    }
899
900    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-new-source-pad");
901
902    if (!_addDecoder(_decoderValve)) {
903        qCCritical(GstVideoReceiverLog) << "_addDecoder() failed";
904        return;
905    }
906
    // Open the decoder valve: buffers now flow into the decoder branch.
907    g_object_set(_decoderValve,
908        "drop", FALSE,
909        nullptr);
910
911    qCDebug(GstVideoReceiverLog) << "Decoding started" << _uri;
912}
913
914void GstVideoReceiver::_logDecodebin3SelectedCodec(GstElement *decodebin3)
915{
916 GValue value = G_VALUE_INIT;
917 GstIterator *iter = gst_bin_iterate_elements(GST_BIN(decodebin3));
918 GstElement *child;
919
920 while (gst_iterator_next(iter, &value) == GST_ITERATOR_OK) {
921 child = GST_ELEMENT(g_value_get_object(&value));
922 GstElementFactory *factory = gst_element_get_factory(child);
923
924 if (factory) {
925 gboolean is_decoder = gst_element_factory_list_is_type(factory, GST_ELEMENT_FACTORY_TYPE_DECODER);
926 if (is_decoder) {
927 const gchar *decoderKlass = gst_element_factory_get_klass(factory);
928 GstPluginFeature *feature = GST_PLUGIN_FEATURE(factory);
929 const gchar *featureName = gst_plugin_feature_get_name(feature);
930 const guint rank = gst_plugin_feature_get_rank(feature);
931 bool isHardwareDecoder = GStreamer::is_hardware_decoder_factory(factory);
932
933 QString pluginName = featureName;
934 GstPlugin *plugin = gst_plugin_feature_get_plugin(feature);
935 if (plugin) {
936 pluginName = gst_plugin_get_name(plugin);
937 gst_object_unref(plugin);
938 }
939 qCDebug(GstVideoReceiverLog) << "Decodebin3 selected codec:rank -" << pluginName << "/" << featureName << "-" << decoderKlass << (isHardwareDecoder ? "(HW)" : "(SW)") << ":" << rank;
940 }
941 }
942 g_value_reset(&value);
943 }
944 g_value_unset(&value);
945 gst_iterator_free(iter);
946}
947
948
// Called when the decoder (decodebin3) exposes a src pad: log the codec it
// selected and attach the registered video sink to the pad's stream.
949void GstVideoReceiver::_onNewDecoderPad(GstPad *pad)
950{
951    qCDebug(GstVideoReceiverLog) << "_onNewDecoderPad" << _uri;
952
953    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-new-decoder-pad");
954
955    // We should now know what codec decodebin3 selected.
956    _logDecodebin3SelectedCodec(_decoder);
957
958    if (!_addVideoSink(pad)) {
959        qCCritical(GstVideoReceiverLog) << "_addVideoSink() failed";
960    }
961}
962
// Create the decoder, add it to the pipeline and link it behind `src`
// (the decoder valve). The video sink is attached once the decoder exposes a
// src pad - immediately if one already exists, otherwise via "pad-added".
// Returns false on any failure (already logged).
963bool GstVideoReceiver::_addDecoder(GstElement *src)
964{
    // Query the upstream caps first; treat failure as "stream not ready".
    // NOTE(review): `caps` is otherwise unused before being released below -
    // it appears to serve only as a readiness check; confirm.
965    GstPad *srcpad = gst_element_get_static_pad(src, "src");
966    if (!srcpad) {
967        qCCritical(GstVideoReceiverLog) << "gst_element_get_static_pad() failed";
968        return false;
969    }
970
971    GstCaps *caps = gst_pad_query_caps(srcpad, nullptr);
972    if (!caps) {
973        qCCritical(GstVideoReceiverLog) << "gst_pad_query_caps() failed";
974        gst_clear_object(&srcpad);
975        return false;
976    }
977
978    gst_clear_object(&srcpad);
979
980    _decoder = _makeDecoder();
981    if (!_decoder) {
982        qCCritical(GstVideoReceiverLog) << "_makeDecoder() failed";
983        gst_clear_caps(&caps);
984        return false;
985    }
986
    // Keep our own reference; gst_bin_add() below steals one.
987    (void) gst_object_ref(_decoder);
988
989    gst_clear_caps(&caps);
990
991    (void) gst_bin_add(GST_BIN(_pipeline), _decoder);
992    (void) gst_element_sync_state_with_parent(_decoder);
993
994    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-decoder");
995
996    if (!gst_element_link(src, _decoder)) {
997        qCCritical(GstVideoReceiverLog) << "Unable to link decoder";
998        return false;
999    }
1000
    // If the decoder already has a src pad, wire the sink now; otherwise wait
    // for "pad-added" (same pattern as the source pad discovery in start()).
1001    GstPad *srcPad = nullptr;
1002    GstIterator *it = gst_element_iterate_src_pads(_decoder);
1003    GValue vpad = G_VALUE_INIT;
1004    switch (gst_iterator_next(it, &vpad)) {
1005    case GST_ITERATOR_OK:
1006        srcPad = GST_PAD(g_value_get_object(&vpad));
1007        (void) gst_object_ref(srcPad);
1008        (void) g_value_reset(&vpad);
1009        break;
1010    case GST_ITERATOR_RESYNC:
1011        gst_iterator_resync(it);
1012        break;
1013    default:
1014        break;
1015    }
1016    g_value_unset(&vpad);
1017    gst_iterator_free(it);
1018
1019    if (srcPad) {
1020        _onNewDecoderPad(srcPad);
1021    } else {
1022        (void) g_signal_connect(_decoder, "pad-added", G_CALLBACK(_onNewPad), this);
1023    }
1024
1025    gst_clear_object(&srcPad);
1026    return true;
1027}
1028
// Add the registered _videoSink to the pipeline, link it behind the decoder,
// configure it (widget, sync off in low-latency mode), and report the video
// size read from the decoder valve's negotiated caps via videoSizeChanged().
// Size-detection failures are non-fatal. Returns false only if linking fails.
1029bool GstVideoReceiver::_addVideoSink(GstPad *pad)
1030{
    // NOTE(review): `caps` is queried but only released below - apparently a
    // leftover / placeholder; confirm nothing else depends on the query.
1031    GstCaps *caps = gst_pad_query_caps(pad, nullptr);
1032
1033    (void) gst_object_ref(_videoSink); // gst_bin_add() will steal one reference
1034    (void) gst_bin_add(GST_BIN(_pipeline), _videoSink);
1035
1036    if (!gst_element_link(_decoder, _videoSink)) {
1037        (void) gst_bin_remove(GST_BIN(_pipeline), _videoSink);
1038        qCCritical(GstVideoReceiverLog) << "Unable to link video sink";
1039        gst_clear_caps(&caps);
1040        return false;
1041    }
1042
    // _buffer < 0 is low-latency mode (set in start()): render frames as they
    // arrive instead of syncing to the clock.
1043    g_object_set(_videoSink,
1044        "widget", _widget,
1045        "sync", (_buffer >= 0),
1046        NULL);
1047
1048    (void) gst_element_sync_state_with_parent(_videoSink);
1049
1050    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-videosink");
1051
1052    // Determine video size. Errors here are non-fatal.
1053    QSize videoSize;
1054    do {
1055        if (!_decoderValve) {
1056            qCCritical(GstVideoReceiverLog) << "Unable to determine video size - _decoderValve is NULL" << _uri;
1057            break;
1058        }
1059
1060        GstPad *valveSrcPad = gst_element_get_static_pad(_decoderValve, "src");
1061        if (!valveSrcPad) {
1062            qCCritical(GstVideoReceiverLog) << "gst_element_get_static_pad() failed";
1063            break;
1064        }
1065
1066        GstCaps *valveSrcPadCaps = gst_pad_query_caps(valveSrcPad, nullptr);
1067        if (!valveSrcPadCaps) {
1068            qCCritical(GstVideoReceiverLog) << "gst_pad_query_caps() failed";
1069            gst_clear_object(&valveSrcPad);
1070            break;
1071        }
1072
1073        const GstStructure *structure = gst_caps_get_structure(valveSrcPadCaps, 0);
1074        if (!structure) {
1075            qCCritical(GstVideoReceiverLog) << "Unable to determine video size - structure is NULL" << _uri;
1076            gst_clear_object(&valveSrcPad);
1077            break;
1078        }
1079
        // If either field is absent, it stays 0 and a 0x0 size is reported.
1080        gint width = 0;
1081        gint height = 0;
1082        (void) gst_structure_get_int(structure, "width", &width);
1083        (void) gst_structure_get_int(structure, "height", &height);
1084        videoSize.setWidth(width);
1085        videoSize.setHeight(height);
1086
1087        gst_clear_caps(&valveSrcPadCaps);
1088        gst_clear_object(&valveSrcPad);
1089    } while (false);
    // Emitted even on failure (then reports an invalid/empty size).
1090    _dispatchSignal([this, videoSize]() { emit videoSizeChanged(videoSize); });
1091
1092    gst_clear_caps(&caps);
1093    return true;
1094}
1095
// Record the timestamp of the most recent frame seen at the tee (i.e. raw
// stream activity, before any decoding). Used by the watchdog to detect a
// stalled source.
void GstVideoReceiver::_noteTeeFrame()
{
    _lastSourceFrameTime = QDateTime::currentSecsSinceEpoch();
}
1100
1101void GstVideoReceiver::_noteVideoSinkFrame()
1102{
1103 _lastVideoFrameTime = QDateTime::currentSecsSinceEpoch();
1104 if (!_decoding) {
1105 _decoding = true;
1106 qCDebug(GstVideoReceiverLog) << "Decoding started";
1107 _dispatchSignal([this]() { emit decodingChanged(_decoding); });
1108 }
1109}
1110
// Mark that an EOS event was observed on the stream (set from the pad probe);
// consumed elsewhere to distinguish a clean end-of-stream from a stall.
void GstVideoReceiver::_noteEndOfStream()
{
    _endOfStream = true;
}
1115
1116bool GstVideoReceiver::_unlinkBranch(GstElement *from)
1117{
1118 GstPad *src = gst_element_get_static_pad(from, "src");
1119 if (!src) {
1120 qCCritical(GstVideoReceiverLog) << "gst_element_get_static_pad() failed";
1121 return false;
1122 }
1123
1124 GstPad *sink = gst_pad_get_peer(src);
1125 if (!sink) {
1126 gst_clear_object(&src);
1127 qCCritical(GstVideoReceiverLog) << "gst_pad_get_peer() failed";
1128 return false;
1129 }
1130
1131 if (!gst_pad_unlink(src, sink)) {
1132 gst_clear_object(&src);
1133 gst_clear_object(&sink);
1134 qCCritical(GstVideoReceiverLog) << "gst_pad_unlink() failed";
1135 return false;
1136 }
1137
1138 gst_clear_object(&src);
1139
1140 // Send EOS at the beginning of the branch
1141 const gboolean ret = gst_pad_send_event(sink, gst_event_new_eos());
1142
1143 gst_clear_object(&sink);
1144
1145 if (!ret) {
1146 qCCritical(GstVideoReceiverLog) << "Branch EOS was NOT sent";
1147 return false;
1148 }
1149
1150 qCDebug(GstVideoReceiverLog) << "Branch EOS was sent";
1151
1152 return true;
1153}
1154
// Tear down the decoding branch: remove the decoder and the video sink from
// the pipeline, drop them to NULL state, release our references, and emit
// decodingChanged(false) if decoding was active. Order is deliberate:
// detach from the bin first, then change state, then unref.
void GstVideoReceiver::_shutdownDecodingBranch()
{
    if (_decoder) {
        // Only remove from the pipeline if the decoder is still parented.
        GstObject *parent = gst_element_get_parent(_decoder);
        if (parent) {
            (void) gst_bin_remove(GST_BIN(_pipeline), _decoder);
            (void) gst_element_set_state(_decoder, GST_STATE_NULL);
            gst_clear_object(&parent);
        }

        gst_clear_object(&_decoder);
    }

    // Remove the frame-noting probe installed on the video sink's sink pad.
    if (_videoSinkProbeId != 0) {
        GstPad *sinkpad = gst_element_get_static_pad(_videoSink, "sink");
        if (sinkpad) {
            gst_pad_remove_probe(sinkpad, _videoSinkProbeId);
            gst_clear_object(&sinkpad);
        }
        _videoSinkProbeId = 0;
    }


    // Detach the video sink itself (only if still parented to the pipeline)
    // and release our own reference to it.
    GstObject *parent = gst_element_get_parent(_videoSink);
    if (parent) {
        (void) gst_bin_remove(GST_BIN(_pipeline), _videoSink);
        (void) gst_element_set_state(_videoSink, GST_STATE_NULL);
        gst_clear_object(&parent);
    }

    gst_clear_object(&_videoSink);

    _removingDecoder = false;

    if (_decoding) {
        _decoding = false;
        qCDebug(GstVideoReceiverLog) << "Decoding stopped";
        _dispatchSignal([this]() { emit decodingChanged(_decoding); });
    }

    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-decoding-stopped");
}
1198
// Tear down the recording branch: remove the file sink from the pipeline,
// drop it to NULL state, release it, and emit recordingChanged(false) if
// recording was active.
void GstVideoReceiver::_shutdownRecordingBranch()
{
    // NOTE(review): assumes an extra reference to _fileSink was taken when
    // the branch was built (as done for _videoSink in _addVideoSink), so it
    // survives gst_bin_remove() until gst_clear_object() - verify at creation site.
    gst_bin_remove(GST_BIN(_pipeline), _fileSink);
    gst_element_set_state(_fileSink, GST_STATE_NULL);
    gst_clear_object(&_fileSink);

    _removingRecorder = false;

    if (_recording) {
        _recording = false;
        qCDebug(GstVideoReceiverLog) << "Recording stopped";
        _dispatchSignal([this]() { emit recordingChanged(_recording); });
    }

    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-recording-stopped");
}
1215
// Returns true when the caller is NOT on the worker thread, i.e. the
// operation must be re-dispatched onto the worker before touching pipeline state.
bool GstVideoReceiver::_needDispatch()
{
    return _worker->needDispatch();
}
1220
1221void GstVideoReceiver::_dispatchSignal(Task emitter)
1222{
1223 _signalDepth += 1;
1224
1225 // QElapsedTimer timer;
1226 // timer.start();
1227
1228 emitter();
1229
1230 // qCDebug(GstVideoReceiverLog) << "Task took" << timer.elapsed() << "ms";
1231
1232 _signalDepth -= 1;
1233}
1234
1235gboolean GstVideoReceiver::_onBusMessage(GstBus * /* bus */, GstMessage *msg, gpointer data)
1236{
1237 if (!msg || !data) {
1238 qCCritical(GstVideoReceiverLog) << "Invalid parameters in _onBusMessage: msg=" << msg << "data=" << data;
1239 return TRUE;
1240 }
1241
1242 GstVideoReceiver *pThis = static_cast<GstVideoReceiver*>(data);
1243
1244 switch (GST_MESSAGE_TYPE(msg)) {
1245 case GST_MESSAGE_ERROR: {
1246 gchar *debug;
1247 GError *error;
1248 gst_message_parse_error(msg, &error, &debug);
1249
1250 if (debug) {
1251 qCDebug(GstVideoReceiverLog) << "GStreamer debug:" << debug;
1252 g_clear_pointer(&debug, g_free);
1253 }
1254
1255 if (error) {
1256 qCCritical(GstVideoReceiverLog) << "GStreamer error:" << error->message;
1257 g_clear_error(&error);
1258 }
1259
1260 pThis->_worker->dispatch([pThis]() {
1261 qCDebug(GstVideoReceiverLog) << "Stopping because of error";
1262 pThis->stop();
1263 });
1264 break;
1265 }
1266 case GST_MESSAGE_EOS:
1267 pThis->_worker->dispatch([pThis]() {
1268 qCDebug(GstVideoReceiverLog) << "Received EOS";
1269 pThis->_handleEOS();
1270 });
1271 break;
1272 case GST_MESSAGE_ELEMENT: {
1273 const GstStructure *structure = gst_message_get_structure(msg);
1274 if (!gst_structure_has_name(structure, "GstBinForwarded")) {
1275 break;
1276 }
1277
1278 GstMessage *forward_msg = nullptr;
1279 gst_structure_get(structure, "message", GST_TYPE_MESSAGE, &forward_msg, NULL);
1280 if (!forward_msg) {
1281 break;
1282 }
1283
1284 if (GST_MESSAGE_TYPE(forward_msg) == GST_MESSAGE_EOS) {
1285 pThis->_worker->dispatch([pThis]() {
1286 qCDebug(GstVideoReceiverLog) << "Received branch EOS";
1287 pThis->_handleEOS();
1288 });
1289 }
1290
1291 gst_clear_message(&forward_msg);
1292 break;
1293 }
1294 default:
1295 break;
1296 }
1297
1298 return TRUE;
1299}
1300
1301void GstVideoReceiver::_onNewPad(GstElement *element, GstPad *pad, gpointer data)
1302{
1303 GstVideoReceiver *self = static_cast<GstVideoReceiver*>(data);
1304
1305 if (element == self->_source) {
1306 self->_onNewSourcePad(pad);
1307 } else if (element == self->_decoder) {
1308 self->_onNewDecoderPad(pad);
1309 } else {
1310 qCDebug(GstVideoReceiverLog) << "Unexpected call!";
1311 }
1312}
1313
1314void GstVideoReceiver::_wrapWithGhostPad(GstElement *element, GstPad *pad, gpointer data)
1315{
1316 Q_UNUSED(data)
1317
1318 gchar *name = gst_pad_get_name(pad);
1319 if (!name) {
1320 qCCritical(GstVideoReceiverLog) << "gst_pad_get_name() failed";
1321 return;
1322 }
1323
1324 GstPad *ghostpad = gst_ghost_pad_new(name, pad);
1325 if (!ghostpad) {
1326 qCCritical(GstVideoReceiverLog) << "gst_ghost_pad_new() failed";
1327 g_clear_pointer(&name, g_free);
1328 return;
1329 }
1330
1331 g_clear_pointer(&name, g_free);
1332
1333 (void) gst_pad_set_active(ghostpad, TRUE);
1334
1335 if (!gst_element_add_pad(GST_ELEMENT_PARENT(element), ghostpad)) {
1336 qCCritical(GstVideoReceiverLog) << "gst_element_add_pad() failed";
1337 }
1338}
1339
1340void GstVideoReceiver::_linkPad(GstElement *element, GstPad *pad, gpointer data)
1341{
1342 gchar *name = gst_pad_get_name(pad);
1343 if (!name) {
1344 qCCritical(GstVideoReceiverLog) << "gst_pad_get_name() failed";
1345 return;
1346 }
1347
1348 if (!gst_element_link_pads(element, name, GST_ELEMENT(data), "sink")) {
1349 qCCritical(GstVideoReceiverLog) << "gst_element_link_pads() failed";
1350 }
1351
1352 g_clear_pointer(&name, g_free);
1353}
1354
1355gboolean GstVideoReceiver::_padProbe(GstElement *element, GstPad *pad, gpointer user_data)
1356{
1357 Q_UNUSED(element)
1358
1359 int *probeRes = static_cast<int*>(user_data);
1360 *probeRes |= 1;
1361
1362 GstCaps *filter = gst_caps_from_string("application/x-rtp");
1363 if (filter) {
1364 GstCaps *caps = gst_pad_query_caps(pad, nullptr);
1365 if (caps) {
1366 if (!gst_caps_is_any(caps) && gst_caps_can_intersect(caps, filter)) {
1367 *probeRes |= 2;
1368 }
1369
1370 gst_clear_caps(&caps);
1371 }
1372
1373 gst_clear_caps(&filter);
1374 }
1375
1376 return TRUE;
1377}
1378
1379GstPadProbeReturn GstVideoReceiver::_teeProbe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
1380{
1381 Q_UNUSED(pad); Q_UNUSED(info)
1382
1383 if (user_data) {
1384 GstVideoReceiver *pThis = static_cast<GstVideoReceiver*>(user_data);
1385 pThis->_noteTeeFrame();
1386 }
1387
1388 return GST_PAD_PROBE_OK;
1389}
1390
// Pad probe on the video sink's sink pad: notes each delivered frame (drives
// the 'decoding' state and the watchdog). Also the hook point for the
// disabled sink-reset experiment below.
GstPadProbeReturn GstVideoReceiver::_videoSinkProbe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
{
    Q_UNUSED(pad); Q_UNUSED(info)

    if (user_data) {
        GstVideoReceiver *pThis = static_cast<GstVideoReceiver*>(user_data);

        // One-shot flag: consume the reset request even though the actual
        // reset logic below is compiled out.
        if (pThis->_resetVideoSink) {
            pThis->_resetVideoSink = false;

#if 0 // FIXME: this makes MPEG2-TS playing smooth but breaks RTSP
            gst_pad_send_event(pad, gst_event_new_flush_start());
            gst_pad_send_event(pad, gst_event_new_flush_stop(TRUE));

            GstBuffer* buf;

            if ((buf = gst_pad_probe_info_get_buffer(info)) != nullptr) {
                GstSegment* seg;

                if ((seg = gst_segment_new()) != nullptr) {
                    gst_segment_init(seg, GST_FORMAT_TIME);

                    seg->start = buf->pts;

                    gst_pad_send_event(pad, gst_event_new_segment(seg));

                    gst_segment_free(seg);
                    seg = nullptr;
                }

                gst_pad_set_offset(pad, -static_cast<gint64>(buf->pts));
            }
#endif
        }

        pThis->_noteVideoSinkFrame();
    }

    return GST_PAD_PROBE_OK;
}
1431
1432GstPadProbeReturn GstVideoReceiver::_eosProbe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
1433{
1434 Q_UNUSED(pad);
1435 Q_ASSERT(user_data);
1436
1437 if (info) {
1438 const GstEvent *event = gst_pad_probe_info_get_event(info);
1439 if (GST_EVENT_TYPE(event) == GST_EVENT_EOS) {
1440 GstVideoReceiver *pThis = static_cast<GstVideoReceiver*>(user_data);
1441 pThis->_noteEndOfStream();
1442 }
1443 }
1444
1445 return GST_PAD_PROBE_OK;
1446}
1447
1448GstPadProbeReturn GstVideoReceiver::_keyframeWatch(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
1449{
1450 if (!info || !user_data) {
1451 qCCritical(GstVideoReceiverLog) << "Invalid arguments";
1452 return GST_PAD_PROBE_DROP;
1453 }
1454
1455 GstBuffer *buf = gst_pad_probe_info_get_buffer(info);
1456 if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
1457 // wait for a keyframe
1458 return GST_PAD_PROBE_DROP;
1459 }
1460
1461 // set media file '0' offset to current timeline position - we don't want to touch other elements in the graph, except these which are downstream!
1462 gst_pad_set_offset(pad, -static_cast<gint64>(buf->pts));
1463
1464 qCDebug(GstVideoReceiverLog) << "Got keyframe, stop dropping buffers";
1465
1466 GstVideoReceiver *pThis = static_cast<GstVideoReceiver*>(user_data);
1467 pThis->_dispatchSignal([pThis]() { emit pThis->recordingStarted(pThis->recordingOutput()); });
1468
1469 return GST_PAD_PROBE_REMOVE;
1470}
1471
1473 : QThread(parent)
1474{
1475 // qCDebug(GstVideoReceiverLog) << this;
1476}
1477
1479{
1480 // qCDebug(GstVideoReceiverLog) << this;
1481}
1482
1484{
1485 return (QThread::currentThread() != this);
1486}
1487
1489{
1490 QMutexLocker lock(&_taskQueueSync);
1491 _taskQueue.enqueue(task);
1492 _taskQueueUpdate.wakeOne();
1493}
1494
1496{
1497 if (needDispatch()) {
1498 dispatch([this]() { _shutdown = true; });
1499 (void) QThread::wait(2000);
1500 } else {
1501 QThread::quit();
1502 }
1503}
1504
1505void GstVideoWorker::run()
1506{
1507 while (!_shutdown) {
1508 _taskQueueSync.lock();
1509
1510 while (_taskQueue.isEmpty()) {
1511 _taskQueueUpdate.wait(&_taskQueueSync);
1512 }
1513
1514 const Task task = _taskQueue.dequeue();
1515
1516 _taskQueueSync.unlock();
1517
1518 task();
1519 }
1520}
std::function< void()> Task
struct _GstElement GstElement
Error error
#define QGC_LOGGING_CATEGORY(name, categoryStr)
void stopDecoding() override
void takeScreenshot(const QString &imageFile) override
void start(uint32_t timeout) override
void stopRecording() override
void startRecording(const QString &videoFile, FILE_FORMAT format) override
void stop() override
void startDecoding(void *sink) override
void dispatch(Task task)
bool needDispatch() const
GstVideoWorker(QObject *parent=nullptr)
void recordingStarted(const QString &filename)
bool lowLatency() const
void videoSizeChanged(QSize size)
void streamingChanged(bool active)
qint64 _lastSourceFrameTime
uint32_t _timeout
qint64 _lastVideoFrameTime
QString uri() const
QString _recordingOutput
void recordingChanged(bool active)
void onStartRecordingComplete(STATUS status)
virtual void start(uint32_t timeout)=0
void onTakeScreenshotComplete(STATUS status)
void decodingChanged(bool active)
void onStartComplete(STATUS status)
static bool isValidFileFormat(FILE_FORMAT format)
QString name() const
void onStopDecodingComplete(STATUS status)
void onStopRecordingComplete(STATUS status)
QString recordingOutput() const
void onStartDecodingComplete(STATUS status)
QQuickItem * _widget
void onStopComplete(STATUS status)
uint32_t _signalDepth
gboolean is_valid_rtsp_uri(const gchar *uri_str)
bool is_hardware_decoder_factory(GstElementFactory *factory)