QGroundControl
Ground Control Station for MAVLink Drones
Loading...
Searching...
No Matches
QGCFileWriter.cc
Go to the documentation of this file.
1#include "QGCFileWriter.h"
2
3#include <QtCore/QFile>
4#include <QtCore/QMutexLocker>
5#include <QtCore/QThread>
6
8 : QObject(parent)
9{
10}
11
13{
14 _stop();
15}
16
17void QGCFileWriter::setFilePath(const QString &path)
18{
19 bool wasRunning = false;
20 {
21 const QMutexLocker locker(&_mutex);
22 if (_filePath == path) {
23 return;
24 }
25 wasRunning = _thread != nullptr;
26 }
27
28 if (wasRunning) {
29 _stop();
30 }
31
32 {
33 const QMutexLocker locker(&_mutex);
34 _filePath = path;
35 _hasError.store(false, std::memory_order_relaxed);
36 _lastError.clear();
37 }
38
39 if (wasRunning && !path.isEmpty()) {
40 const QMutexLocker locker(&_mutex);
41 _startLocked();
42 }
43}
44
46{
47 const QMutexLocker locker(&_mutex);
48 return _filePath;
49}
50
52{
53 const QMutexLocker locker(&_mutex);
54 return _thread != nullptr && _thread->isRunning();
55}
56
58{
59 const QMutexLocker locker(&_mutex);
60 return _lastError;
61}
62
63void QGCFileWriter::write(const QByteArray &data)
64{
65 if (data.isEmpty()) {
66 return;
67 }
68
69 if (_pendingBytes.load(std::memory_order_relaxed) > _maxPendingBytes) {
70 return;
71 }
72
73 {
74 const QMutexLocker locker(&_mutex);
75 if (_filePath.isEmpty()) {
76 return;
77 }
78 if (!_thread) {
79 _startLocked();
80 }
81 _queue.append(WorkItem{data, {}});
82 _pendingBytes.fetch_add(data.size(), std::memory_order_relaxed);
83 }
84 _condition.wakeOne();
85}
86
88{
89 if (!formatter) {
90 return;
91 }
92
93 {
94 QMutexLocker locker(&_mutex);
95 if (!_thread) {
96 _startLocked();
97 }
98 _queue.append(WorkItem{{}, std::move(formatter)});
99 }
100 _condition.wakeOne();
101}
102
103bool QGCFileWriter::flush(int timeoutMs)
104{
105 QMutexLocker locker(&_mutex);
106 if (!_thread) {
107 return true;
108 }
109
110 const quint64 target = _flushSeq.fetch_add(1, std::memory_order_relaxed) + 1;
111 _condition.wakeOne();
112
113 const auto deadline = QDeadlineTimer(timeoutMs);
114 while (_writeSeq.load(std::memory_order_acquire) < target
115 && !_quit.load(std::memory_order_relaxed)) {
116 if (!_condition.wait(&_mutex, deadline)) {
117 return false;
118 }
119 }
120 return true;
121}
122
124{
125 _stop();
126}
127
129{
130 const QMutexLocker locker(&_mutex);
131 _hasError.store(false, std::memory_order_relaxed);
132 _lastError.clear();
133}
134
135void QGCFileWriter::_startLocked()
136{
137 if (_thread) {
138 return;
139 }
140
141 _quit.store(false, std::memory_order_relaxed);
142 _thread = QThread::create([this]() { _workerLoop(); });
143 _thread->setObjectName(QStringLiteral("QGCFileWriter"));
144 _thread->start();
145}
146
147void QGCFileWriter::_stop()
148{
149 {
150 const QMutexLocker locker(&_mutex);
151 if (!_thread) {
152 return;
153 }
154 _quit.store(true, std::memory_order_relaxed);
155 }
156 _condition.wakeOne();
157 _thread->wait();
158 _thread->deleteLater();
159
160 const QMutexLocker locker(&_mutex);
161 _thread = nullptr;
162 _isOpen.store(false, std::memory_order_relaxed);
163}
164
165void QGCFileWriter::_workerLoop()
166{
167 QString path;
168 {
169 const QMutexLocker locker(&_mutex);
170 path = _filePath;
171 }
172
173 QFile file(path);
174 if (!file.open(QIODevice::WriteOnly | QIODevice::Append | QIODevice::Text)) {
175 const QMutexLocker locker(&_mutex);
176 _lastError = file.errorString();
177 _hasError.store(true, std::memory_order_relaxed);
178 QMetaObject::invokeMethod(this, [this, err = _lastError]() {
179 emit errorOccurred(err);
180 }, Qt::QueuedConnection);
181 return;
182 }
183
184 _isOpen.store(true, std::memory_order_relaxed);
185 _fileSize.store(file.size(), std::memory_order_relaxed);
186
187 while (!_quit.load(std::memory_order_relaxed)) {
188 QList<WorkItem> batch;
189
190 {
191 QMutexLocker locker(&_mutex);
192 while (_queue.isEmpty() && !_quit.load(std::memory_order_relaxed)) {
193 _condition.wait(&_mutex);
194 }
195 batch.swap(_queue);
196 }
197
198 // Resolve deferred items; only track bytes that were counted in _pendingBytes
199 qint64 pendingBytesConsumed = 0;
200 for (auto &item : batch) {
201 if (item.formatter) {
202 item.data = item.formatter();
203 item.formatter = nullptr;
204 } else {
205 pendingBytesConsumed += item.data.size();
206 }
207 }
208
209 bool writeError = false;
210 for (const auto &item : std::as_const(batch)) {
211 if (item.data.isEmpty()) {
212 continue;
213 }
214 const qint64 written = file.write(item.data);
215 if (written < 0) {
216 const QMutexLocker locker(&_mutex);
217 _lastError = file.errorString();
218 _hasError.store(true, std::memory_order_relaxed);
219 QMetaObject::invokeMethod(this, [this, err = _lastError]() {
220 emit errorOccurred(err);
221 }, Qt::QueuedConnection);
222 writeError = true;
223 break;
224 }
225 }
226
227 _pendingBytes.fetch_sub(pendingBytesConsumed, std::memory_order_relaxed);
228
229 if (!writeError) {
230 file.flush();
231 _fileSize.store(file.size(), std::memory_order_relaxed);
232 const qint64 sz = file.size();
233 QMetaObject::invokeMethod(this, [this, sz]() {
234 emit fileSizeChanged(sz);
235 }, Qt::QueuedConnection);
236 }
237
238 _writeSeq.fetch_add(1, std::memory_order_release);
239 _condition.wakeAll();
240
241 if (writeError) {
242 break;
243 }
244 }
245
246 // Drain remaining queue
247 {
248 QMutexLocker locker(&_mutex);
249 for (auto &item : _queue) {
250 if (item.formatter) {
251 item.data = item.formatter();
252 item.formatter = nullptr;
253 }
254 if (!item.data.isEmpty()) {
255 file.write(item.data);
256 }
257 }
258 _pendingBytes.store(0, std::memory_order_relaxed);
259 _queue.clear();
260 }
261
262 file.close();
263 _isOpen.store(false, std::memory_order_relaxed);
264 _fileSize.store(0, std::memory_order_relaxed);
265 _writeSeq.fetch_add(1, std::memory_order_release);
266 _condition.wakeAll();
267}
void writeDeferred(FormatFunc formatter)
~QGCFileWriter() override
bool flush(int timeoutMs=5000)
void fileSizeChanged(qint64 size)
QGCFileWriter(QObject *parent=nullptr)
QString lastError() const
bool isRunning() const
void write(const QByteArray &data)
void setFilePath(const QString &path)
void errorOccurred(const QString &message)
QString filePath() const
std::function< QByteArray()> FormatFunc