diff --git a/src/shared/trk/trkdevice.cpp b/src/shared/trk/trkdevice.cpp index 773e1ea3c48cf43e5612022688b408cd44f18ecb..831024f341985a792a7ec5c4ad2e854633258e78 100644 --- a/src/shared/trk/trkdevice.cpp +++ b/src/shared/trk/trkdevice.cpp @@ -53,10 +53,15 @@ # include <errno.h> # include <string.h> # include <unistd.h> +/* Required headers for select() according to POSIX.1-2001 */ +# include <sys/select.h> +/* Required headers for select() according to earlier standards: + #include <sys/time.h> + #include <sys/types.h> + #include <unistd.h> +*/ #endif -enum { TimerInterval = 10 }; - #ifdef Q_OS_WIN // Format windows error from GetLastError() value: @@ -80,28 +85,6 @@ QString winErrorMessage(unsigned long error) return rc; } -// Non-blocking replacement for win-api ReadFile function -BOOL WINAPI TryReadFile(HANDLE hFile, - LPVOID lpBuffer, - DWORD nNumberOfBytesToRead, - LPDWORD lpNumberOfBytesRead, - LPOVERLAPPED lpOverlapped) -{ - COMSTAT comStat; - if (!ClearCommError(hFile, NULL, &comStat)){ - qDebug() << "ClearCommError() failed"; - return FALSE; - } - if (comStat.cbInQue == 0) { - *lpNumberOfBytesRead = 0; - return FALSE; - } - return ReadFile(hFile, - lpBuffer, - qMin(comStat.cbInQue, nNumberOfBytesToRead), - lpNumberOfBytesRead, - lpOverlapped); -} #endif namespace trk { @@ -143,20 +126,17 @@ namespace trk { // // TrkWriteQueue: Mixin class that manages a write queue of Trk messages. // pendingMessage()/notifyWriteResult() should be called from a worked/timer -// that writes the messages. The class does not take precautions for multithreading -// with exception of the handling of the TRK_WRITE_QUEUE_NOOP_CODE -// synchronization message. The invocation of the callback is then -// done by the thread owning the TrkWriteQueue, while pendingMessage() is called -// from another thread. This happens via a Qt::BlockingQueuedConnection. +// that writes the messages. The class does not take precautions for multithreading. +// A no-op message is simply taken off the queue. The calling class +// can use the helper invokeNoopMessage() to trigger its callback. // /////////////////////////////////////////////////////////////////////// -class TrkWriteQueue : public QObject +class TrkWriteQueue { - Q_OBJECT Q_DISABLE_COPY(TrkWriteQueue) public: - explicit TrkWriteQueue(bool multithreaded = true); + explicit TrkWriteQueue(); // Enqueue messages. void queueTrkMessage(byte code, TrkCallback callback, @@ -166,18 +146,23 @@ public: // Call this from the device read notification with the results. void slotHandleResult(const TrkResult &result); - // This can be called periodically in a timer to retrieve + // pendingMessage() can be called periodically in a timer to retrieve // the pending messages to be sent. - bool pendingMessage(TrkMessage *message); + enum PendingMessageResult { + NoMessage, // No message in queue. + PendingMessage, /* There is a queued message. The calling class + * can write it out and use notifyWriteResult() + * to notify about the result. */ + NoopMessageDequeued // A no-op message has been dequeued. see invokeNoopMessage(). + }; + + PendingMessageResult pendingMessage(TrkMessage *message); // Notify the queue about the success of the write operation // after taking the pendingMessage off. void notifyWriteResult(bool ok); -signals: - void internalNoopMessageDequeued(const trk::TrkMessage&); - -private slots: - void invokeNoopMessage(trk::TrkMessage); + // Helper function that invokes the callback of a no-op message + static void invokeNoopMessage(trk::TrkMessage); private: typedef QMap<byte, TrkMessage> TokenMessageMap; @@ -190,15 +175,10 @@ private: bool m_trkWriteBusy; }; -TrkWriteQueue::TrkWriteQueue(bool multithreaded) : +TrkWriteQueue::TrkWriteQueue() : m_trkWriteToken(0), m_trkWriteBusy(false) { - static const int trkMessageMetaId = qRegisterMetaType<trk::TrkMessage>(); - Q_UNUSED(trkMessageMetaId) - connect(this, SIGNAL(internalNoopMessageDequeued(trk::TrkMessage)), - this, SLOT(invokeNoopMessage(trk::TrkMessage)), - multithreaded ? Qt::BlockingQueuedConnection : Qt::AutoConnection); } byte TrkWriteQueue::nextTrkWriteToken() @@ -220,23 +200,19 @@ void TrkWriteQueue::queueTrkMessage(byte code, TrkCallback callback, m_trkWriteQueue.append(msg); } -bool TrkWriteQueue::pendingMessage(TrkMessage *message) +TrkWriteQueue::PendingMessageResult TrkWriteQueue::pendingMessage(TrkMessage *message) { // Invoked from timer, try to flush out message queue if (m_trkWriteBusy || m_trkWriteQueue.isEmpty()) - return false; + return NoMessage; // Handle the noop message, just invoke CB in slot (ower thread) if (m_trkWriteQueue.front().code == TRK_WRITE_QUEUE_NOOP_CODE) { - TrkMessage noopMessage = m_trkWriteQueue.dequeue(); - if (noopMessage.callback) - emit internalNoopMessageDequeued(noopMessage); + *message = m_trkWriteQueue.dequeue(); + return NoopMessageDequeued; } - // Check again for real messages - if (m_trkWriteQueue.isEmpty()) - return false; if (message) *message = m_trkWriteQueue.front(); - return true; + return PendingMessage; } void TrkWriteQueue::invokeNoopMessage(trk::TrkMessage noopMessage) @@ -295,6 +271,8 @@ struct DeviceContext { DeviceContext(); #ifdef Q_OS_WIN HANDLE device; + OVERLAPPED readOverlapped; + OVERLAPPED writeOverlapped; #else QFile file; #endif @@ -313,7 +291,11 @@ DeviceContext::DeviceContext() : /////////////////////////////////////////////////////////////////////// // // TrkWriterThread: A thread operating a TrkWriteQueue. -// +// with exception of the handling of the TRK_WRITE_QUEUE_NOOP_CODE +// synchronization message. The invocation of the callback is then +// done by the thread owning the TrkWriteQueue, while pendingMessage() is called +// from another thread. This happens via a Qt::BlockingQueuedConnection. + /////////////////////////////////////////////////////////////////////// class WriterThread : public QThread { @@ -334,12 +316,16 @@ public: signals: void error(const QString &); + void internalNoopMessageDequeued(const trk::TrkMessage&); public slots: bool trkWriteRawMessage(const TrkMessage &msg); void terminate(); void tryWrite(); +private slots: + void invokeNoopMessage(const trk::TrkMessage &); + private: bool write(const QByteArray &data, QString *errorMessage); @@ -355,6 +341,10 @@ WriterThread::WriterThread(const QSharedPointer<DeviceContext> &context) : m_context(context), m_terminate(false) { + static const int trkMessageMetaId = qRegisterMetaType<trk::TrkMessage>(); + Q_UNUSED(trkMessageMetaId) + connect(this, SIGNAL(internalNoopMessageDequeued(trk::TrkMessage)), + this, SLOT(invokeNoopMessage(trk::TrkMessage)), Qt::BlockingQueuedConnection); } void WriterThread::run() @@ -370,27 +360,58 @@ void WriterThread::run() // Send off message m_dataMutex.lock(); TrkMessage message; - if (m_queue.pendingMessage(&message)) { + const TrkWriteQueue::PendingMessageResult pr = m_queue.pendingMessage(&message); + m_dataMutex.unlock(); + switch (pr) { + case TrkWriteQueue::NoMessage: + break; + case TrkWriteQueue::PendingMessage: { const bool success = trkWriteRawMessage(message); + m_dataMutex.lock(); m_queue.notifyWriteResult(success); + m_dataMutex.unlock(); } - m_dataMutex.unlock(); + break; + case TrkWriteQueue::NoopMessageDequeued: + // Sync with thread that owns us via a blocking signal + emit internalNoopMessageDequeued(message); + break; + } // switch } } +void WriterThread::invokeNoopMessage(const trk::TrkMessage &msg) +{ + TrkWriteQueue::invokeNoopMessage(msg); +} + void WriterThread::terminate() { m_terminate = true; m_waitCondition.wakeAll(); wait(); + m_terminate = false; } +#ifdef Q_OS_WIN +static inline bool overlappedSyncWrite(HANDLE file, const char *data, + DWORD size, DWORD *charsWritten, + OVERLAPPED *overlapped) +{ + if (WriteFile(file, data, size, charsWritten, overlapped)) + return true; + if (GetLastError() != ERROR_IO_PENDING) + return false; + return GetOverlappedResult(file, overlapped, charsWritten, TRUE); +} +#endif + bool WriterThread::write(const QByteArray &data, QString *errorMessage) { QMutexLocker(&m_context->mutex); #ifdef Q_OS_WIN DWORD charsWritten; - if (!WriteFile(m_context->device, data.data(), data.size(), &charsWritten, NULL)) { + if (!overlappedSyncWrite(m_context->device, data.data(), data.size(), &charsWritten, &m_context->writeOverlapped)) { *errorMessage = QString::fromLatin1("Error writing data: %1").arg(winErrorMessage(GetLastError())); return false; } @@ -444,6 +465,222 @@ void WriterThread::slotHandleResult(const TrkResult &result) tryWrite(); // Have messages been enqueued in-between? } +#ifdef Q_OS_WIN +/////////////////////////////////////////////////////////////////////// +// +// WinReaderThread: A thread reading from the device using Windows API. +// Waits on an overlapped I/O handle and an event that tells the thread to +// terminate. +// +/////////////////////////////////////////////////////////////////////// + +class WinReaderThread : public QThread { + Q_OBJECT + Q_DISABLE_COPY(WinReaderThread) +public: + explicit WinReaderThread(const QSharedPointer<DeviceContext> &context); + ~WinReaderThread(); + + virtual void run(); + +signals: + void error(const QString &); + void dataReceived(char c); + +public slots: + void terminate(); + +private: + enum Handles { FileHandle, TerminateEventHandle, HandleCount }; + + inline int tryRead(); + + const QSharedPointer<DeviceContext> m_context; + HANDLE m_handles[HandleCount]; +}; + +WinReaderThread::WinReaderThread(const QSharedPointer<DeviceContext> &context) : + m_context(context) +{ + m_handles[FileHandle] = NULL; + m_handles[TerminateEventHandle] = CreateEvent(NULL, FALSE, FALSE, NULL); +} + +WinReaderThread::~WinReaderThread() +{ + CloseHandle(m_handles[TerminateEventHandle]); +} + +// Return 0 to continue or error code +int WinReaderThread::tryRead() +{ + // Trigger read + char c; + DWORD bytesRead = 0; + if (ReadFile(m_context->device, &c, 1, &bytesRead, &m_context->readOverlapped)) { + emit dataReceived(c); + return 0; + } + const DWORD readError = GetLastError(); + if (readError != ERROR_IO_PENDING) { + emit error(QString::fromLatin1("Read error: %1").arg(winErrorMessage(readError))); + return -1; + } + // Wait for either termination or data + const DWORD wr = WaitForMultipleObjects(HandleCount, m_handles, false, INFINITE); + if (wr == WAIT_FAILED) { + emit error(QString::fromLatin1("Wait failed: %1").arg(winErrorMessage(GetLastError()))); + return -2; + } + if (wr - WAIT_OBJECT_0 == TerminateEventHandle) { + return 1; // Terminate + } + // Check data + if (!GetOverlappedResult(m_context->device, &m_context->readOverlapped, &bytesRead, true)) { + emit error(QString::fromLatin1("GetOverlappedResult failed: %1").arg(winErrorMessage(GetLastError()))); + return -3; + } + emit dataReceived(c); + return 0; +} + +void WinReaderThread::run() +{ + m_handles[FileHandle] = m_context->readOverlapped.hEvent; + int readResult; + while ( (readResult = tryRead()) == 0) ; +} + +void WinReaderThread::terminate() +{ + SetEvent(m_handles[TerminateEventHandle]); + wait(); +} + +typedef WinReaderThread ReaderThread; + +#else + +/////////////////////////////////////////////////////////////////////// +// +// UnixReaderThread: A thread reading from the device. +// Uses select() to wait and a special ioctl() to find out the number +// of bytes queued. For clean termination, the self-pipe trick is used. +// The class maintains a pipe, on whose read end the select waits besides +// the device file handle. To terminate, a byte is written to the pipe. +// +/////////////////////////////////////////////////////////////////////// + +static inline QString msgUnixCallFailedErrno(const char *func, int errorNumber) +{ + return QString::fromLatin1("Call to %1() failed: %2").arg(QLatin1String(func), QString::fromLocal8Bit(strerror(errorNumber))); +} + +class UnixReaderThread : public QThread { + Q_OBJECT + Q_DISABLE_COPY(UnixReaderThread) +public: + explicit UnixReaderThread(const QSharedPointer<DeviceContext> &context); + ~UnixReaderThread(); + + virtual void run(); + +signals: + void error(const QString &); + void dataReceived(const QByteArray &); + +public slots: + void terminate(); + +private: + inline int tryRead(); + + const QSharedPointer<DeviceContext> m_context; + int m_terminatePipeFileDescriptors[2]; +}; + +UnixReaderThread::UnixReaderThread(const QSharedPointer<DeviceContext> &context) : + m_context(context) +{ + m_terminatePipeFileDescriptors[0] = m_terminatePipeFileDescriptors[1] = -1; + // Set up pipes for termination. Should not fail + if (pipe(m_terminatePipeFileDescriptors) < 0) + qWarning("%s\n", qPrintable(msgUnixCallFailedErrno("pipe", errno))); +} + +UnixReaderThread::~UnixReaderThread() +{ + close(m_terminatePipeFileDescriptors[0]); + close(m_terminatePipeFileDescriptors[1]); +} + +int UnixReaderThread::tryRead() +{ + fd_set readSet, tempReadSet, tempExceptionSet; + struct timeval timeOut; + const int fileDescriptor = m_context->file.handle(); + FD_ZERO(&readSet); + FD_SET(fileDescriptor, &readSet); + FD_SET(m_terminatePipeFileDescriptors[0], &readSet); + const int maxFileDescriptor = qMax(m_terminatePipeFileDescriptors[0], fileDescriptor); + int result = 0; + do { + memcpy(&tempReadSet, &readSet, sizeof(fd_set)); + memcpy(&tempExceptionSet, &readSet, sizeof(fd_set)); + timeOut.tv_sec = 1; + timeOut.tv_usec = 0; + result = select(maxFileDescriptor + 1, &tempReadSet, NULL, &tempExceptionSet, &timeOut); + } while ( result < 0 && errno == EINTR ); + // Timeout? + if (result == 0) + return 0; + // Something wrong? + if (result < 0) { + emit error(msgUnixCallFailedErrno("select", errno)); + return -1; + } + // Did the exception set trigger on the device? + if (FD_ISSET(fileDescriptor,&tempExceptionSet)) { + emit error(QLatin1String("An Exception occurred on the device.")); + return -2; + } + // Check termination pipe. + if (FD_ISSET(m_terminatePipeFileDescriptors[0], &tempReadSet) + || FD_ISSET(m_terminatePipeFileDescriptors[0], &tempExceptionSet)) + return 1; + // determine number of pending bytes and read + int numBytes; + if (ioctl(fileDescriptor, FIONREAD, &numBytes) < 0) { + emit error(msgUnixCallFailedErrno("ioctl", errno)); + return -1; + + } + m_context->mutex.lock(); + const QByteArray data = m_context->file.read(numBytes); + m_context->mutex.unlock(); + emit dataReceived(data); + return 0; +} + +void UnixReaderThread::run() +{ + int readResult; + // Read loop + while ( (readResult = tryRead()) == 0) ; +} + +void UnixReaderThread::terminate() +{ + // Trigger select() by writing to the pipe + char c = 0; + write(m_terminatePipeFileDescriptors[1], &c, 1); + wait(); +} + +typedef UnixReaderThread ReaderThread; + +#endif + /////////////////////////////////////////////////////////////////////// // // TrkDevicePrivate @@ -456,9 +693,9 @@ struct TrkDevicePrivate QSharedPointer<DeviceContext> deviceContext; QSharedPointer<WriterThread> writerThread; + QSharedPointer<ReaderThread> readerThread; QByteArray trkReadBuffer; - int timerId; int verbose; QString errorString; }; @@ -471,7 +708,6 @@ struct TrkDevicePrivate TrkDevicePrivate::TrkDevicePrivate() : deviceContext(new DeviceContext), - timerId(-1), verbose(0) { } @@ -502,13 +738,21 @@ bool TrkDevice::open(const QString &port, QString *errorMessage) 0, NULL, OPEN_EXISTING, - FILE_ATTRIBUTE_NORMAL, + FILE_ATTRIBUTE_NORMAL|FILE_FLAG_NO_BUFFERING|FILE_FLAG_OVERLAPPED, NULL); if (INVALID_HANDLE_VALUE == d->deviceContext->device) { *errorMessage = QString::fromLatin1("Could not open device '%1': %2").arg(port, winErrorMessage(GetLastError())); return false; } + memset(&d->deviceContext->readOverlapped, 0, sizeof(OVERLAPPED)); + d->deviceContext->readOverlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + memset(&d->deviceContext->writeOverlapped, 0, sizeof(OVERLAPPED)); + d->deviceContext->writeOverlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + if (d->deviceContext->readOverlapped.hEvent == NULL || d->deviceContext->writeOverlapped.hEvent == NULL) { + *errorMessage = QString::fromLatin1("Failed to create events: %1").arg(winErrorMessage(GetLastError())); + return false; + } #else d->deviceContext->file.setFileName(port); if (!d->deviceContext->file.open(QIODevice::ReadWrite|QIODevice::Unbuffered)) { @@ -537,11 +781,23 @@ bool TrkDevice::open(const QString &port, QString *errorMessage) return false; } #endif - d->timerId = startTimer(TimerInterval); - d->writerThread = QSharedPointer<WriterThread>(new WriterThread(d->deviceContext)); - connect(d->writerThread.data(), SIGNAL(error(QString)), this, SIGNAL(error(QString)), + d->readerThread = QSharedPointer<ReaderThread>(new ReaderThread(d->deviceContext)); + connect(d->readerThread.data(), SIGNAL(error(QString)), this, SLOT(emitError(QString)), Qt::QueuedConnection); - d->writerThread->start(); +#ifdef Q_OS_WIN + connect(d->readerThread.data(), SIGNAL(dataReceived(char)), + this, SLOT(dataReceived(char)), Qt::QueuedConnection); +#else + connect(d->readerThread.data(), SIGNAL(dataReceived(QByteArray)), + this, SLOT(dataReceived(QByteArray)), Qt::QueuedConnection); +#endif + d->readerThread->start(); + + d->writerThread = QSharedPointer<WriterThread>(new WriterThread(d->deviceContext)); + connect(d->writerThread.data(), SIGNAL(error(QString)), this, SLOT(emitError(QString)), + Qt::QueuedConnection); + d->writerThread->start(); + if (d->verbose) qDebug() << "Opened" << port; return true; @@ -551,16 +807,16 @@ void TrkDevice::close() { if (!isOpen()) return; - if (d->timerId != -1) { - killTimer(d->timerId); - d->timerId = -1; - } #ifdef Q_OS_WIN CloseHandle(d->deviceContext->device); d->deviceContext->device = INVALID_HANDLE_VALUE; + CloseHandle(d->deviceContext->readOverlapped.hEvent); + CloseHandle(d->deviceContext->writeOverlapped.hEvent); + d->deviceContext->readOverlapped.hEvent = d->deviceContext->writeOverlapped.hEvent = NULL; #else d->deviceContext->file.close(); #endif + d->readerThread->terminate(); d->writerThread->terminate(); if (d->verbose) emitLogMessage("Close"); @@ -600,65 +856,20 @@ void TrkDevice::setVerbose(int b) d->verbose = b; } -#ifndef Q_OS_WIN -static inline int bytesAvailable(int fileNo) +void TrkDevice::dataReceived(char c) { - int numBytes; - const int rc = ioctl(fileNo, FIONREAD, &numBytes); - if (rc < 0) - numBytes=0; - return numBytes; + d->trkReadBuffer += c; + readMessages(); } -#endif -void TrkDevice::tryTrkRead() +void TrkDevice::dataReceived(const QByteArray &data) +{ + d->trkReadBuffer += data; + readMessages(); +} + +void TrkDevice::readMessages() { -#ifdef Q_OS_WIN - const DWORD BUFFERSIZE = 1024; - char buffer[BUFFERSIZE]; - DWORD charsRead; - DWORD totalCharsRead = 0; - { - QMutexLocker(&d->deviceContext->mutex); - while (TryReadFile(d->deviceContext->device, buffer, BUFFERSIZE, &charsRead, NULL)) { - totalCharsRead += charsRead; - d->trkReadBuffer.append(buffer, charsRead); - if (isValidTrkResult(d->trkReadBuffer, d->deviceContext->serialFrame)) - break; - } - } - if (d->verbose > 1 && totalCharsRead) - emitLogMessage("Read" + d->trkReadBuffer.toHex()); - if (!totalCharsRead) - return; - const ushort len = isValidTrkResult(d->trkReadBuffer, d->deviceContext->serialFrame); - if (!len) { - const QString msg = QString::fromLatin1("Partial message: %1").arg(stringFromArray(d->trkReadBuffer)); - emitError(msg); - return; - } -#else - QByteArray data; - { - QMutexLocker(&d->deviceContext->mutex); - const int size = bytesAvailable(d->deviceContext->file.handle()); - if (!size) - return; - data = d->deviceContext->file.read(size); - } - if (d->verbose > 1) - emitLogMessage("trk: <- " + stringFromArray(data)); - d->trkReadBuffer.append(data); - const ushort len = isValidTrkResult(d->trkReadBuffer, d->deviceContext->serialFrame); - if (!len) { - if (d->trkReadBuffer.size() > 10) { - const QString msg = QString::fromLatin1("Unable to extract message from '%1' '%2'"). - arg(QLatin1String(d->trkReadBuffer.toHex())).arg(QString::fromAscii(d->trkReadBuffer)); - emitError(msg); - } - return; - } -#endif // Q_OS_WIN TrkResult r; QByteArray rawData; while (extractResult(&d->trkReadBuffer, d->deviceContext->serialFrame, &r, &rawData)) { @@ -671,11 +882,6 @@ void TrkDevice::tryTrkRead() } } -void TrkDevice::timerEvent(QTimerEvent *) -{ - tryTrkRead(); -} - void TrkDevice::emitError(const QString &s) { d->errorString = s; diff --git a/src/shared/trk/trkdevice.h b/src/shared/trk/trkdevice.h index a4da14fccb034b5fa598b7c44e5a9718161d3a4a..a9640f83002cb5b324071d1bbd38be1a51a29389 100644 --- a/src/shared/trk/trkdevice.h +++ b/src/shared/trk/trkdevice.h @@ -104,12 +104,16 @@ signals: void error(const QString &msg); void logMessage(const QString &msg); -protected: +private slots: + void dataReceived(char c); + void dataReceived(const QByteArray &a); + +protected slots: void emitError(const QString &msg); void emitLogMessage(const QString &msg); - virtual void timerEvent(QTimerEvent *ev); private: + void readMessages(); TrkDevicePrivate *d; };