Skip to content
Snippets Groups Projects
Commit 92d74817 authored by Friedemann Kleint's avatar Friedemann Kleint
Browse files

Trk: Fix socket notifier threading warning.

Also introduce mutex into DeviceContext.
parent 713b1f20
No related branches found
No related tags found
No related merge requests found
...@@ -39,6 +39,7 @@ ...@@ -39,6 +39,7 @@
#include <QtCore/QMutex> #include <QtCore/QMutex>
#include <QtCore/QWaitCondition> #include <QtCore/QWaitCondition>
#include <QtCore/QSharedPointer> #include <QtCore/QSharedPointer>
#include <QtCore/QMetaType>
#ifdef Q_OS_WIN #ifdef Q_OS_WIN
# include <windows.h> # include <windows.h>
...@@ -132,18 +133,30 @@ TrkMessage::TrkMessage(byte c, byte t, TrkCallback cb) : ...@@ -132,18 +133,30 @@ TrkMessage::TrkMessage(byte c, byte t, TrkCallback cb) :
{ {
} }
} // namespace trk
Q_DECLARE_METATYPE(trk::TrkMessage)
namespace trk {
/////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////
// //
// TrkWriteQueue // TrkWriteQueue: Mixin class that manages a write queue of Trk messages.
// pendingMessage()/notifyWriteResult() should be called from a worked/timer
// that writes the messages. The class does not take precautions for multithreading
// with exception of the handling of the TRK_WRITE_QUEUE_NOOP_CODE
// synchronization message. The invocation of the callback is then
// done by the thread owning the TrkWriteQueue, while pendingMessage() is called
// from another thread. This happens via a Qt::BlockingQueuedConnection.
// //
/////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////
/* Mixin class that manages a write queue of Trk messages. */ class TrkWriteQueue : public QObject
class TrkWriteQueue {
{ Q_OBJECT
Q_DISABLE_COPY(TrkWriteQueue)
public: public:
TrkWriteQueue(); explicit TrkWriteQueue(bool multithreaded = true);
// Enqueue messages. // Enqueue messages.
void queueTrkMessage(byte code, TrkCallback callback, void queueTrkMessage(byte code, TrkCallback callback,
...@@ -160,6 +173,12 @@ public: ...@@ -160,6 +173,12 @@ public:
// after taking the pendingMessage off. // after taking the pendingMessage off.
void notifyWriteResult(bool ok); void notifyWriteResult(bool ok);
signals:
void internalNoopMessageDequeued(const trk::TrkMessage&);
private slots:
void invokeNoopMessage(trk::TrkMessage);
private: private:
typedef QMap<byte, TrkMessage> TokenMessageMap; typedef QMap<byte, TrkMessage> TokenMessageMap;
...@@ -171,10 +190,15 @@ private: ...@@ -171,10 +190,15 @@ private:
bool m_trkWriteBusy; bool m_trkWriteBusy;
}; };
TrkWriteQueue::TrkWriteQueue() : TrkWriteQueue::TrkWriteQueue(bool multithreaded) :
m_trkWriteToken(0), m_trkWriteToken(0),
m_trkWriteBusy(false) m_trkWriteBusy(false)
{ {
static const int trkMessageMetaId = qRegisterMetaType<trk::TrkMessage>();
Q_UNUSED(trkMessageMetaId)
connect(this, SIGNAL(internalNoopMessageDequeued(trk::TrkMessage)),
this, SLOT(invokeNoopMessage(trk::TrkMessage)),
multithreaded ? Qt::BlockingQueuedConnection : Qt::AutoConnection);
} }
byte TrkWriteQueue::nextTrkWriteToken() byte TrkWriteQueue::nextTrkWriteToken()
...@@ -201,17 +225,11 @@ bool TrkWriteQueue::pendingMessage(TrkMessage *message) ...@@ -201,17 +225,11 @@ bool TrkWriteQueue::pendingMessage(TrkMessage *message)
// Invoked from timer, try to flush out message queue // Invoked from timer, try to flush out message queue
if (m_trkWriteBusy || m_trkWriteQueue.isEmpty()) if (m_trkWriteBusy || m_trkWriteQueue.isEmpty())
return false; return false;
// Handle the noop message, just invoke CB // Handle the noop message, just invoke CB in slot (ower thread)
if (m_trkWriteQueue.front().code == TRK_WRITE_QUEUE_NOOP_CODE) { if (m_trkWriteQueue.front().code == TRK_WRITE_QUEUE_NOOP_CODE) {
TrkMessage noopMessage = m_trkWriteQueue.dequeue(); TrkMessage noopMessage = m_trkWriteQueue.dequeue();
if (noopMessage.callback) { if (noopMessage.callback)
TrkResult result; emit internalNoopMessageDequeued(noopMessage);
result.code = noopMessage.code;
result.token = noopMessage.token;
result.data = noopMessage.data;
result.cookie = noopMessage.cookie;
noopMessage.callback(result);
}
} }
// Check again for real messages // Check again for real messages
if (m_trkWriteQueue.isEmpty()) if (m_trkWriteQueue.isEmpty())
...@@ -221,6 +239,16 @@ bool TrkWriteQueue::pendingMessage(TrkMessage *message) ...@@ -221,6 +239,16 @@ bool TrkWriteQueue::pendingMessage(TrkMessage *message)
return true; return true;
} }
void TrkWriteQueue::invokeNoopMessage(trk::TrkMessage noopMessage)
{
TrkResult result;
result.code = noopMessage.code;
result.token = noopMessage.token;
result.data = noopMessage.data;
result.cookie = noopMessage.cookie;
noopMessage.callback(result);
}
void TrkWriteQueue::notifyWriteResult(bool ok) void TrkWriteQueue::notifyWriteResult(bool ok)
{ {
// On success, dequeue message and await result // On success, dequeue message and await result
...@@ -271,6 +299,7 @@ struct DeviceContext { ...@@ -271,6 +299,7 @@ struct DeviceContext {
QFile file; QFile file;
#endif #endif
bool serialFrame; bool serialFrame;
QMutex mutex;
}; };
DeviceContext::DeviceContext() : DeviceContext::DeviceContext() :
...@@ -358,6 +387,7 @@ void WriterThread::terminate() ...@@ -358,6 +387,7 @@ void WriterThread::terminate()
bool WriterThread::write(const QByteArray &data, QString *errorMessage) bool WriterThread::write(const QByteArray &data, QString *errorMessage)
{ {
QMutexLocker(&m_context->mutex);
#ifdef Q_OS_WIN #ifdef Q_OS_WIN
DWORD charsWritten; DWORD charsWritten;
if (!WriteFile(m_context->device, data.data(), data.size(), &charsWritten, NULL)) { if (!WriteFile(m_context->device, data.data(), data.size(), &charsWritten, NULL)) {
...@@ -588,12 +618,14 @@ void TrkDevice::tryTrkRead() ...@@ -588,12 +618,14 @@ void TrkDevice::tryTrkRead()
char buffer[BUFFERSIZE]; char buffer[BUFFERSIZE];
DWORD charsRead; DWORD charsRead;
DWORD totalCharsRead = 0; DWORD totalCharsRead = 0;
{
while (TryReadFile(d->deviceContext->device, buffer, BUFFERSIZE, &charsRead, NULL)) { QMutexLocker(&d->deviceContext->mutex);
totalCharsRead += charsRead; while (TryReadFile(d->deviceContext->device, buffer, BUFFERSIZE, &charsRead, NULL)) {
d->trkReadBuffer.append(buffer, charsRead); totalCharsRead += charsRead;
if (isValidTrkResult(d->trkReadBuffer, d->deviceContext->serialFrame)) d->trkReadBuffer.append(buffer, charsRead);
break; if (isValidTrkResult(d->trkReadBuffer, d->deviceContext->serialFrame))
break;
}
} }
if (d->verbose > 1 && totalCharsRead) if (d->verbose > 1 && totalCharsRead)
emitLogMessage("Read" + d->trkReadBuffer.toHex()); emitLogMessage("Read" + d->trkReadBuffer.toHex());
...@@ -606,10 +638,14 @@ void TrkDevice::tryTrkRead() ...@@ -606,10 +638,14 @@ void TrkDevice::tryTrkRead()
return; return;
} }
#else #else
const int size = bytesAvailable(d->deviceContext->file.handle()); QByteArray data;
if (!size) {
return; QMutexLocker(&d->deviceContext->mutex);
const QByteArray data = d->deviceContext->file.read(size); const int size = bytesAvailable(d->deviceContext->file.handle());
if (!size)
return;
data = d->deviceContext->file.read(size);
}
if (d->verbose > 1) if (d->verbose > 1)
emitLogMessage("trk: <- " + stringFromArray(data)); emitLogMessage("trk: <- " + stringFromArray(data));
d->trkReadBuffer.append(data); d->trkReadBuffer.append(data);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment