4#include <QtCore/QMutexLocker>
5#include <QtCore/QThread>
19 bool wasRunning =
false;
21 const QMutexLocker locker(&_mutex);
22 if (_filePath == path) {
25 wasRunning = _thread !=
nullptr;
33 const QMutexLocker locker(&_mutex);
35 _hasError.store(
false, std::memory_order_relaxed);
39 if (wasRunning && !path.isEmpty()) {
40 const QMutexLocker locker(&_mutex);
47 const QMutexLocker locker(&_mutex);
53 const QMutexLocker locker(&_mutex);
54 return _thread !=
nullptr && _thread->isRunning();
59 const QMutexLocker locker(&_mutex);
69 if (_pendingBytes.load(std::memory_order_relaxed) > _maxPendingBytes) {
74 const QMutexLocker locker(&_mutex);
75 if (_filePath.isEmpty()) {
81 _queue.append(WorkItem{data, {}});
82 _pendingBytes.fetch_add(data.size(), std::memory_order_relaxed);
94 QMutexLocker locker(&_mutex);
98 _queue.append(WorkItem{{}, std::move(formatter)});
100 _condition.wakeOne();
105 QMutexLocker locker(&_mutex);
110 const quint64 target = _flushSeq.fetch_add(1, std::memory_order_relaxed) + 1;
111 _condition.wakeOne();
113 const auto deadline = QDeadlineTimer(timeoutMs);
114 while (_writeSeq.load(std::memory_order_acquire) < target
115 && !_quit.load(std::memory_order_relaxed)) {
116 if (!_condition.wait(&_mutex, deadline)) {
130 const QMutexLocker locker(&_mutex);
131 _hasError.store(
false, std::memory_order_relaxed);
// Prepares the background worker thread for this writer.
// NOTE(review): the "Locked" suffix suggests the caller already holds _mutex;
// no lock is taken in the visible lines — confirm against the call sites.
// NOTE(review): several lines of this body are missing from this listing; the
// call that actually starts the thread is not visible here.
135void QGCFileWriter::_startLocked()
// Clear the quit flag so the worker's loop condition is initially satisfied.
141 _quit.store(
false, std::memory_order_relaxed);
// Create a thread whose entry point runs _workerLoop() on this object.
// The lambda captures `this`, so the object must outlive the thread.
142 _thread = QThread::create([
this]() { _workerLoop(); });
// Name the thread "QGCFileWriter".
143 _thread->setObjectName(QStringLiteral(
"QGCFileWriter"));
// Requests the worker to quit and releases the thread object.
// NOTE(review): several lines of this body are missing from this listing
// (scope braces, any null check on _thread, any wait/join on the thread), so
// the comments below describe only the visible statements.
147void QGCFileWriter::_stop()
// Take _mutex for the quit request.
150 const QMutexLocker locker(&_mutex);
// Raise the quit flag that the worker loop polls.
154 _quit.store(
true, std::memory_order_relaxed);
// Wake a thread blocked on _condition so it can observe the quit flag.
// NOTE(review): another wait on _condition (with a deadline) appears elsewhere
// in this file; wakeOne() may wake that waiter instead of the worker —
// consider wakeAll() here; confirm.
156 _condition.wakeOne();
// Schedule deletion of the QThread object via deleteLater().
158 _thread->deleteLater();
// Second locker with the same name — presumably in a separate scope (braces
// not visible); it guards the state reset below.
160 const QMutexLocker locker(&_mutex);
// Mark the writer as no longer open.
162 _isOpen.store(
false, std::memory_order_relaxed);
// Worker entry point: this is the function invoked by the lambda passed to
// QThread::create() in _startLocked(). It opens the file, writes queued work
// items in batches until _quit is set, then writes whatever remains in _queue
// and resets the published state.
// NOTE(review): many lines of this body are missing from this listing (scope
// braces, the declaration of `file`, the hand-off from _queue to `batch`,
// early exits). Comments describe the visible statements only.
165void QGCFileWriter::_workerLoop()
// Take _mutex; the statements it guards are not visible here.
169 const QMutexLocker locker(&_mutex);
// Open the file write-only in append + text mode.
174 if (!file.open(QIODevice::WriteOnly | QIODevice::Append | QIODevice::Text)) {
// Open failed: under the mutex, record the error text and raise the error flag.
175 const QMutexLocker locker(&_mutex);
176 _lastError = file.errorString();
177 _hasError.store(
true, std::memory_order_relaxed);
// Queue a call on this object (Qt::QueuedConnection) with a copy of the error
// string. The lambda body is not visible — presumably it emits errorOccurred(err).
178 QMetaObject::invokeMethod(
this, [
this, err = _lastError]() {
180 }, Qt::QueuedConnection);
// Open succeeded: mark the writer open and publish the current file size.
184 _isOpen.store(
true, std::memory_order_relaxed);
185 _fileSize.store(file.size(), std::memory_order_relaxed);
// Main loop: runs until the quit flag is observed.
187 while (!_quit.load(std::memory_order_relaxed)) {
188 QList<WorkItem> batch;
// With _mutex held, sleep on _condition while there is no queued work and no
// quit request. (How `batch` is filled from _queue is not visible here.)
191 QMutexLocker locker(&_mutex);
192 while (_queue.isEmpty() && !_quit.load(std::memory_order_relaxed)) {
193 _condition.wait(&_mutex);
// Resolve deferred items: run each formatter to produce the bytes, then drop
// the formatter. Data sizes are accumulated so _pendingBytes can be reduced
// after the batch.
// NOTE(review): whether the size accumulation applies to every item or only
// to one branch is not visible (the line between 203 and 205 is missing).
199 qint64 pendingBytesConsumed = 0;
200 for (
auto &item : batch) {
201 if (item.formatter) {
202 item.data = item.formatter();
203 item.formatter =
nullptr;
205 pendingBytesConsumed += item.data.size();
// Write pass over the batch. Items with empty data get special handling
// (the action is not visible — presumably they are skipped).
209 bool writeError =
false;
210 for (
const auto &item : std::as_const(batch)) {
211 if (item.data.isEmpty()) {
214 const qint64 written = file.write(item.data);
// Failure path (its condition on `written` is not visible): under the mutex,
// record the error text and raise the error flag, then queue a call on this
// object with a copy of the error string (lambda body not visible).
216 const QMutexLocker locker(&_mutex);
217 _lastError = file.errorString();
218 _hasError.store(
true, std::memory_order_relaxed);
219 QMetaObject::invokeMethod(
this, [
this, err = _lastError]() {
221 }, Qt::QueuedConnection);
// Subtract this batch's accumulated byte count from the pending counter.
227 _pendingBytes.fetch_sub(pendingBytesConsumed, std::memory_order_relaxed);
// Publish the new file size and queue a call on this object carrying it
// (lambda body not visible — presumably emits fileSizeChanged(sz)).
// NOTE(review): file.size() is queried twice in a row; the stored value and
// `sz` could be taken from a single call.
231 _fileSize.store(file.size(), std::memory_order_relaxed);
232 const qint64 sz = file.size();
233 QMetaObject::invokeMethod(
this, [
this, sz]() {
235 }, Qt::QueuedConnection);
// Advance the write sequence (release) and wake every thread waiting on
// _condition; a wait loop elsewhere in this file compares _writeSeq
// (acquire load) against a target value.
238 _writeSeq.fetch_add(1, std::memory_order_release);
239 _condition.wakeAll();
// After the main loop: with _mutex held, go through what is still in _queue,
// resolving formatters and writing any non-empty data. The return value of
// file.write() is not checked in the visible lines.
248 QMutexLocker locker(&_mutex);
249 for (
auto &item : _queue) {
250 if (item.formatter) {
251 item.data = item.formatter();
252 item.formatter =
nullptr;
254 if (!item.data.isEmpty()) {
255 file.write(item.data);
// Reset the pending-byte counter to zero.
258 _pendingBytes.store(0, std::memory_order_relaxed);
// Publish the closed state, zero the reported size, then bump the write
// sequence once more and wake all waiters on _condition.
263 _isOpen.store(
false, std::memory_order_relaxed);
264 _fileSize.store(0, std::memory_order_relaxed);
265 _writeSeq.fetch_add(1, std::memory_order_release);
266 _condition.wakeAll();
void writeDeferred(FormatFunc formatter)
~QGCFileWriter() override
bool flush(int timeoutMs=5000)
void fileSizeChanged(qint64 size)
QGCFileWriter(QObject *parent=nullptr)
QString lastError() const
void write(const QByteArray &data)
void setFilePath(const QString &path)
void errorOccurred(const QString &message)
std::function< QByteArray()> FormatFunc