Commit 47c37556 authored by Eike Ziller's avatar Eike Ziller

mapReduce: Support progress information and add (unordered) map

If a container is given to mapReduce, it takes the responsibility to
report progress information for the whole operation. If the map function
reports its own progress, that is taken into account for the overall
progress.

The (so far only unordered) Utils::map operation can be used to replace
MultiTask, by passing a member function of the items in the container as
a map function.

Change-Id: I18ca38a6ad2899d73f590bfe59bf2e6eb2f1a57a
Reviewed-by: default avatarTobias Hunger <tobias.hunger@theqtcompany.com>
parent d2350c23
......@@ -23,5 +23,4 @@
**
****************************************************************************/
#include "multitask.h"
#include "runextensions.h"
......@@ -359,7 +359,7 @@ void cleanUpFileSearch(QFutureInterface<FileSearchResultList> &futureInterface,
QFuture<FileSearchResultList> Utils::findInFiles(const QString &searchTerm, FileIterator *files,
QTextDocument::FindFlags flags, QMap<QString, QString> fileToContentsMap)
{
return mapReduce(std::cref(*files),
return mapReduce(files->begin(), files->end(),
[searchTerm, files](QFutureInterface<FileSearchResultList> &futureInterface) {
return initFileSearch(futureInterface, searchTerm, files);
},
......@@ -371,7 +371,7 @@ QFuture<FileSearchResultList> Utils::findInFiles(const QString &searchTerm, File
QFuture<FileSearchResultList> Utils::findInFilesRegExp(const QString &searchTerm, FileIterator *files,
QTextDocument::FindFlags flags, QMap<QString, QString> fileToContentsMap)
{
return mapReduce(std::cref(*files),
return mapReduce(files->begin(), files->end(),
[searchTerm, files](QFutureInterface<FileSearchResultList> &futureInterface) {
return initFileSearch(futureInterface, searchTerm, files);
},
......
This diff is collapsed.
/****************************************************************************
**
** Copyright (C) 2016 The Qt Company Ltd.
** Contact: https://www.qt.io/licensing/
**
** This file is part of Qt Creator.
**
** Commercial License Usage
** Licensees holding valid commercial Qt licenses may use this file in
** accordance with the commercial license agreement provided with the
** Software or, alternatively, in accordance with the terms contained in
** a written agreement between you and The Qt Company. For licensing terms
** and conditions see https://www.qt.io/terms-conditions. For further
** information use the contact form at https://www.qt.io/contact-us.
**
** GNU General Public License Usage
** Alternatively, this file may be used under the terms of the GNU
** General Public License version 3 as published by the Free Software
** Foundation with exceptions as appearing in the file LICENSE.GPL3-EXCEPT
** included in the packaging of this file. Please review the following
** information to ensure the GNU General Public License requirements will
** be met: https://www.gnu.org/licenses/gpl-3.0.html.
**
****************************************************************************/
#ifndef MULTITASK_H
#define MULTITASK_H
#include "utils_global.h"
#include "runextensions.h"
#include <QObject>
#include <QList>
#include <QEventLoop>
#include <QFutureWatcher>
#include <QtConcurrentRun>
#include <QThreadPool>
#include <QDebug>
QT_BEGIN_NAMESPACE
namespace QtConcurrent {
class QTCREATOR_UTILS_EXPORT MultiTaskBase : public QObject, public QRunnable
{
Q_OBJECT
protected slots:
virtual void cancelSelf() = 0;
virtual void setFinished() = 0;
virtual void setProgressRange(int min, int max) = 0;
virtual void setProgressValue(int value) = 0;
virtual void setProgressText(QString value) = 0;
};
template <typename Class, typename R>
class MultiTask : public MultiTaskBase
{
public:
MultiTask(void (Class::*fn)(QFutureInterface<R> &), const QList<Class *> &objects)
: fn(fn),
objects(objects)
{
maxProgress = 100*objects.size();
}
QFuture<R> future()
{
futureInterface.reportStarted();
return futureInterface.future();
}
void run()
{
QThreadPool::globalInstance()->releaseThread();
futureInterface.setProgressRange(0, maxProgress);
foreach (Class *object, objects) {
QFutureWatcher<R> *watcher = new QFutureWatcher<R>();
watchers.insert(object, watcher);
finished.insert(watcher, false);
connect(watcher, &QFutureWatcherBase::finished,
this, &MultiTask::setFinished);
connect(watcher, &QFutureWatcherBase::progressRangeChanged,
this, &MultiTask::setProgressRange);
connect(watcher, &QFutureWatcherBase::progressValueChanged,
this, &MultiTask::setProgressValue);
connect(watcher, &QFutureWatcherBase::progressTextChanged,
this, &MultiTask::setProgressText);
watcher->setFuture(Utils::runAsync(QThreadPool::globalInstance(), fn, object));
}
selfWatcher = new QFutureWatcher<R>();
connect(selfWatcher, &QFutureWatcherBase::canceled, this, &MultiTask::cancelSelf);
selfWatcher->setFuture(futureInterface.future());
loop = new QEventLoop;
loop->exec();
futureInterface.reportFinished();
QThreadPool::globalInstance()->reserveThread();
qDeleteAll(watchers);
delete selfWatcher;
delete loop;
}
protected:
void cancelSelf()
{
foreach (QFutureWatcher<R> *watcher, watchers)
watcher->future().cancel();
}
void setFinished()
{
updateProgress();
QFutureWatcher<R> *watcher = static_cast<QFutureWatcher<R> *>(sender());
if (finished.contains(watcher))
finished[watcher] = true;
bool allFinished = true;
foreach (bool isFinished, finished) {
if (!isFinished) {
allFinished = false;
break;
}
}
if (allFinished)
loop->quit();
}
void setProgressRange(int min, int max)
{
Q_UNUSED(min)
Q_UNUSED(max)
updateProgress();
}
void setProgressValue(int value)
{
Q_UNUSED(value)
updateProgress();
}
void setProgressText(QString value)
{
Q_UNUSED(value)
updateProgressText();
}
private:
void updateProgress()
{
int progressSum = 0;
foreach (QFutureWatcher<R> *watcher, watchers) {
if (watcher->progressMinimum() == watcher->progressMaximum()) {
if (watcher->future().isFinished() && !watcher->future().isCanceled())
progressSum += 100;
} else {
progressSum += 100*(watcher->progressValue()-watcher->progressMinimum())/(watcher->progressMaximum()-watcher->progressMinimum());
}
}
futureInterface.setProgressValue(progressSum);
}
void updateProgressText()
{
QString text;
foreach (QFutureWatcher<R> *watcher, watchers) {
if (!watcher->progressText().isEmpty()) {
text += watcher->progressText();
text += QLatin1Char('\n');
}
}
text = text.trimmed();
futureInterface.setProgressValueAndText(futureInterface.progressValue(), text);
}
QFutureInterface<R> futureInterface;
void (Class::*fn)(QFutureInterface<R> &);
QList<Class *> objects;
QFutureWatcher<R> *selfWatcher;
QMap<Class *, QFutureWatcher<R> *> watchers;
QMap<QFutureWatcher<R> *, bool> finished;
QEventLoop *loop;
int maxProgress;
};
template <typename Class, typename T>
QFuture<T> run(void (Class::*fn)(QFutureInterface<T> &), const QList<Class *> &objects, int priority = 0)
{
MultiTask<Class, T> *task = new MultiTask<Class, T>(fn, objects);
QFuture<T> future = task->future();
QThreadPool::globalInstance()->start(task, priority);
return future;
}
} // namespace QtConcurrent
QT_END_NAMESPACE
#endif // MULTITASK_H
......@@ -168,7 +168,6 @@ HEADERS += \
$$PWD/persistentsettings.h \
$$PWD/completingtextedit.h \
$$PWD/json.h \
$$PWD/multitask.h \
$$PWD/runextensions.h \
$$PWD/portlist.h \
$$PWD/tcpportsgatherer.h \
......
......@@ -124,7 +124,6 @@ QtcLibrary {
"macroexpander.cpp",
"macroexpander.h",
"mapreduce.h",
"multitask.h",
"navigationtreeview.cpp",
"navigationtreeview.h",
"networkaccessmanager.cpp",
......
......@@ -46,8 +46,8 @@
#include <qmljs/qmljsdialect.h>
#include <qmljstools/qmljsmodelmanager.h>
#include <utils/multitask.h>
#include <utils/qtcassert.h>
#include <utils/runextensions.h>
#include <utils/textfileformat.h>
#include <QDirIterator>
......
......@@ -39,7 +39,7 @@
#include <projectexplorer/projectexplorer.h>
#include <projectexplorer/projectexplorersettings.h>
#include <utils/multitask.h>
#include <utils/runextensions.h>
#include <QFuture>
#include <QFutureInterface>
......
......@@ -46,7 +46,7 @@
#include <coreplugin/progressmanager/futureprogress.h>
#include <extensionsystem/pluginmanager.h>
#include <utils/algorithm.h>
#include <utils/QtConcurrentTools>
#include <utils/mapreduce.h>
#include <utils/qtcassert.h>
#include <QSettings>
......@@ -312,7 +312,7 @@ void Locator::refresh(QList<ILocatorFilter *> filters)
{
if (filters.isEmpty())
filters = m_filters;
QFuture<void> task = QtConcurrent::run(&ILocatorFilter::refresh, filters);
QFuture<void> task = Utils::map(filters, &ILocatorFilter::refresh);
FutureProgress *progress =
ProgressManager::addTask(task, tr("Updating Locator Caches"), Constants::TASK_INDEX);
connect(progress, &FutureProgress::finished, this, &Locator::saveSettings);
......
......@@ -150,11 +150,11 @@ using namespace Utils;
The actual refresh, which calls all the filters' refresh functions
in a different thread, looks like this:
\code
QFuture<void> task = QtConcurrent::run(&ILocatorFilter::refresh, filters);
QFuture<void> task = Utils::map(filters, &ILocatorFilter::refresh);
Core::FutureProgress *progress = Core::ProgressManager::addTask(task, tr("Indexing"),
Locator::Constants::TASK_INDEX);
\endcode
First, we tell QtConcurrent to start a thread which calls all the filters'
First, we to start an asynchronous operation which calls all the filters'
refresh function. After that we register the returned QFuture object
with the ProgressManager.
......
......@@ -34,6 +34,8 @@ class tst_MapReduce : public QObject
private slots:
void mapReduce();
void mapReduceRvalueContainer();
void map();
};
static int returnxx(int x)
......@@ -92,6 +94,43 @@ void tst_MapReduce::mapReduce()
}
}
void tst_MapReduce::mapReduceRvalueContainer()
{
{
QFuture<int> future = Utils::mapReduce(QList<int>({1, 2, 3, 4, 5}),
[](QFutureInterface<int>&) { return 0; },
[](int value) { return value; },
[](QFutureInterface<int>&, int &state, int value) { state += value; },
[](QFutureInterface<int> &fi, int &state) { fi.reportResult(state); });
// here, lifetime of the QList temporary ends
QCOMPARE(future.results(), QList<int>({15}));
}
}
void tst_MapReduce::map()
{
{
QList<double> results = Utils::map(QList<int>({2, 5, 1}),
[](int x) { return x*2.5; }
).results();
Utils::sort(results);
QCOMPARE(results, QList<double>({2.5, 5., 12.5}));
}
{
// void result
QList<int> results;
QMutex mutex;
Utils::map(
// container
QList<int>({2, 5, 1}),
// map
[&mutex, &results](int x) { QMutexLocker l(&mutex); results.append(x); }
).waitForFinished();
Utils::sort(results); // mapping order is undefined
QCOMPARE(results, QList<int>({1, 2, 5}));
}
}
QTEST_MAIN(tst_MapReduce)
#include "tst_mapreduce.moc"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment