diff --git a/src/libs/utils/filesearch.cpp b/src/libs/utils/filesearch.cpp index de50524f496c27d59b6d7ae4e05950f14e8ba621..a944ffced0117d8e26efb48d75c7791e1a2d18cd 100644 --- a/src/libs/utils/filesearch.cpp +++ b/src/libs/utils/filesearch.cpp @@ -83,8 +83,8 @@ class FileSearch public: FileSearch(const QString &searchTerm, QTextDocument::FindFlags flags, QMap fileToContentsMap); - FileSearchResultList operator()(QFutureInterface futureInterface, - const FileIterator::Item &item) const; + void operator()(QFutureInterface &futureInterface, + const FileIterator::Item &item) const; private: QMap fileToContentsMap; @@ -104,8 +104,8 @@ public: FileSearchRegExp(const QString &searchTerm, QTextDocument::FindFlags flags, QMap fileToContentsMap); FileSearchRegExp(const FileSearchRegExp &other); - FileSearchResultList operator()(QFutureInterface futureInterface, - const FileIterator::Item &item) const; + void operator()(QFutureInterface &futureInterface, + const FileIterator::Item &item) const; private: QRegularExpressionMatch doGuardedMatch(const QString &line, int offset) const; @@ -129,17 +129,21 @@ FileSearch::FileSearch(const QString &searchTerm, QTextDocument::FindFlags flags termDataUpper = searchTermUpper.constData(); } -FileSearchResultList FileSearch::operator()(QFutureInterface futureInterface, - const FileIterator::Item &item) const +void FileSearch::operator()(QFutureInterface &futureInterface, + const FileIterator::Item &item) const { - FileSearchResultList results; if (futureInterface.isCanceled()) - return results; + return; + futureInterface.setProgressRange(0, 1); + futureInterface.setProgressValue(0); + FileSearchResultList results; QFile file; QTextStream stream; QString tempString; - if (!openStream(item.filePath, item.encoding, &stream, &file, &tempString, fileToContentsMap)) - return results; + if (!openStream(item.filePath, item.encoding, &stream, &file, &tempString, fileToContentsMap)) { + futureInterface.cancel(); // failure + return; + } int lineNr = 0; while (!stream.atEnd()) { @@ -211,7 +215,10 @@ FileSearchResultList FileSearch::operator()(QFutureInterface futureInterface, - const FileIterator::Item &item) const +void FileSearchRegExp::operator()(QFutureInterface &futureInterface, + const FileIterator::Item &item) const { - FileSearchResultList results; if (futureInterface.isCanceled()) - return results; + return; + futureInterface.setProgressRange(0, 1); + futureInterface.setProgressValue(0); + FileSearchResultList results; QFile file; QTextStream stream; QString tempString; - if (!openStream(item.filePath, item.encoding, &stream, &file, &tempString, fileToContentsMap)) - return results; + if (!openStream(item.filePath, item.encoding, &stream, &file, &tempString, fileToContentsMap)) { + futureInterface.cancel(); // failure + return; + } int lineNr = 0; QString line; @@ -277,7 +288,10 @@ FileSearchResultList FileSearchRegExp::operator()(QFutureInterface &futureInterface, QFuture Utils::findInFiles(const QString &searchTerm, FileIterator *files, QTextDocument::FindFlags flags, QMap fileToContentsMap) { - return mapReduce(std::cref(*files), + return mapReduce(std::cref(*files), [searchTerm, files](QFutureInterface &futureInterface) { return initFileSearch(futureInterface, searchTerm, files); }, @@ -357,7 +371,7 @@ QFuture Utils::findInFiles(const QString &searchTerm, File QFuture Utils::findInFilesRegExp(const QString &searchTerm, FileIterator *files, QTextDocument::FindFlags flags, QMap fileToContentsMap) { - return mapReduce(std::cref(*files), + return mapReduce(std::cref(*files), [searchTerm, files](QFutureInterface &futureInterface) { return initFileSearch(futureInterface, searchTerm, files); }, diff --git a/src/libs/utils/mapreduce.h b/src/libs/utils/mapreduce.h index 1bbfdc9a254772ffc5926ee897f83a0498641c87..6d26607619166102845a2509ac5fc802c1f67e43 100644 --- a/src/libs/utils/mapreduce.h +++ b/src/libs/utils/mapreduce.h @@ -25,124 +25,167 @@ #pragma once -#include "qtcassert.h" +#include "runextensions.h" -#include -#include - -#include -#include -#include -#include +#include namespace Utils { - -template -typename std::vector>::iterator -waitForAny(std::vector> &futures) -{ - // Wait for any future to have a result ready. - // Unfortunately we have to do that in a busy loop because future doesn't have a feature to - // wait for any of a set of futures (yet? possibly when_any in C++17). - auto end = futures.end(); - QTC_ASSERT(!futures.empty(), return end); - auto futureIterator = futures.begin(); - forever { - if (futureIterator->wait_for(std::chrono::duration::zero()) == std::future_status::ready) - return futureIterator; - ++futureIterator; - if (futureIterator == end) - futureIterator = futures.begin(); - } -} - namespace Internal { -template -void swapErase(std::vector &vec, typename std::vector::iterator it) -{ - // efficient erasing by swapping with back element - *it = std::move(vec.back()); - vec.pop_back(); -} +// TODO: try to use this for replacing MultiTask -template -void reduceOne(QFutureInterface &futureInterface, - std::vector> &futures, - State &state, const ReduceFunction &reduce) +class MapReduceBase : public QObject { - auto futureIterator = waitForAny(futures); - if (futureIterator != futures.end()) { - reduce(futureInterface, state, futureIterator->get()); - swapErase(futures, futureIterator); - } -} + Q_OBJECT +}; -// This together with reduceOne can be replaced by std::transformReduce (parallelism TS) -// when that becomes widely available in C++ implementations template -void mapReduceLoop(QFutureInterface &futureInterface, const Container &container, - const MapFunction &map, State &state, const ReduceFunction &reduce) +class MapReduce : public MapReduceBase { - const unsigned MAX_THREADS = std::thread::hardware_concurrency(); - using MapResult = typename std::result_of,typename Container::value_type)>::type; - std::vector> futures; - futures.reserve(MAX_THREADS); - auto fileIterator = container.begin(); - auto end = container.end(); - while (!futureInterface.isCanceled() && (fileIterator != end || futures.size() != 0)) { - if (futures.size() >= MAX_THREADS || fileIterator == end) { - // We don't want to start a new thread (yet), so try to find a future that is ready and - // handle its result. - reduceOne(futureInterface, futures, state, reduce); - } else { // start a new thread - futures.push_back(std::async(std::launch::async, - map, futureInterface, *fileIterator)); - ++fileIterator; + using MapResult = typename Internal::resultType::type; + using Iterator = typename Container::const_iterator; + +public: + MapReduce(QFutureInterface futureInterface, const Container &container, + const MapFunction &map, State &state, const ReduceFunction &reduce) + : m_futureInterface(futureInterface), + m_container(container), + m_iterator(m_container.begin()), + m_map(map), + m_state(state), + m_reduce(reduce) + { + connect(&m_selfWatcher, &QFutureWatcher::canceled, + this, &MapReduce::cancelAll); + m_selfWatcher.setFuture(futureInterface.future()); + } + + void exec() + { + if (schedule()) // do not enter event loop for empty containers + m_loop.exec(); + } + +private: + bool schedule() + { + bool didSchedule = false; + while (m_iterator != m_container.end() && m_mapWatcher.size() < QThread::idealThreadCount()) { + didSchedule = true; + auto watcher = new QFutureWatcher(); + connect(watcher, &QFutureWatcher::finished, this, [this, watcher]() { + mapFinished(watcher); + }); + m_mapWatcher.append(watcher); + watcher->setFuture(runAsync(&m_threadPool, m_map, *m_iterator)); + ++m_iterator; } + return didSchedule; } -} + + void mapFinished(QFutureWatcher *watcher) + { + m_mapWatcher.removeAll(watcher); // remove so we can schedule next one + bool didSchedule = false; + if (!m_futureInterface.isCanceled()) { + // first schedule the next map... + didSchedule = schedule(); + // ...then reduce + const int resultCount = watcher->future().resultCount(); + for (int i = 0; i < resultCount; ++i) { + Internal::runAsyncImpl(m_futureInterface, m_reduce, m_state, watcher->future().resultAt(i)); + } + } + delete watcher; + if (!didSchedule && m_mapWatcher.isEmpty()) + m_loop.quit(); + } + + void cancelAll() + { + foreach (QFutureWatcher *watcher, m_mapWatcher) + watcher->cancel(); + } + + QFutureWatcher m_selfWatcher; + QFutureInterface m_futureInterface; + const Container &m_container; + Iterator m_iterator; + const MapFunction &m_map; + State &m_state; + const ReduceFunction &m_reduce; + QEventLoop m_loop; + QThreadPool m_threadPool; // for reusing threads + QList *> m_mapWatcher; +}; template -void blockingMapReduce(QFutureInterface futureInterface, const Container &container, +void blockingMapReduce(QFutureInterface &futureInterface, const Container &container, const InitFunction &init, const MapFunction &map, const ReduceFunction &reduce, const CleanUpFunction &cleanup) { auto state = init(futureInterface); - mapReduceLoop(futureInterface, container, map, state, reduce); + MapReduce mr(futureInterface, container, map, state, reduce); + mr.exec(); cleanup(futureInterface, state); - if (futureInterface.isPaused()) - futureInterface.waitForResume(); - futureInterface.reportFinished(); } } // Internal -template -QFuture mapReduce(std::reference_wrapper containerWrapper, +template ::type> +QFuture +mapReduce(std::reference_wrapper containerWrapper, const InitFunction &init, const MapFunction &map, const ReduceFunction &reduce, const CleanUpFunction &cleanup) { - auto fi = QFutureInterface(); - QFuture future = fi.future(); - fi.reportStarted(); - std::thread(Internal::blockingMapReduce, - fi, containerWrapper, init, map, reduce, cleanup).detach(); - return future; + return runAsync(Internal::blockingMapReduce, + containerWrapper, init, map, reduce, cleanup); } -template -QFuture mapReduce(const Container &container, const InitFunction &init, const MapFunction &map, +/*! + Calls the map function on all items in \a container in parallel through Utils::runAsync. + + The reduce function is called in the mapReduce thread with each of the reported results from + the map function, in arbitrary order, but never in parallel. + It gets passed a reference to a user defined state object, and a result from the map function. + If it takes a QFutureInterface reference as its first argument, it can report results + for the mapReduce operation through that. Otherwise, any values returned by the reduce function + are reported as results of the mapReduce operation. + + The init function is called in the mapReduce thread before the actual mapping starts, + and must return the initial state object for the reduce function. It gets the QFutureInterface + of the mapReduce operation passed as an argument. + + The cleanup function is called in the mapReduce thread after all map and reduce calls have + finished, with the QFutureInterface of the mapReduce operation and the final state object + as arguments, and can be used to clean up any resources, or report a final result of the + mapReduce. + + Container + StateType InitFunction(QFutureInterface&) + + void MapFunction(QFutureInterface&, const ItemType&) + or + MapResultType MapFunction(const ItempType&) + + void ReduceFunction(QFutureInterface&, StateType&, const ItemType&) + or + ReduceResultType ReduceFunction(StateType&, const ItemType&) + + void CleanUpFunction(QFutureInterface&, StateType&) + */ +template ::type> +QFuture +mapReduce(const Container &container, const InitFunction &init, const MapFunction &map, const ReduceFunction &reduce, const CleanUpFunction &cleanup) { - auto fi = QFutureInterface(); - QFuture future = fi.future(); - std::thread(Internal::blockingMapReduce, - fi, container, init, map, reduce, cleanup).detach(); - return future; + return runAsync(Internal::blockingMapReduce, + container, init, map, reduce, cleanup); } } // Utils diff --git a/src/libs/utils/runextensions.h b/src/libs/utils/runextensions.h index e9cef7a348b88c20e0e413d8b189c5f51fef014c..73a67ee46074cb4d2cdf2e7ac0d5b489aec0be57 100644 --- a/src/libs/utils/runextensions.h +++ b/src/libs/utils/runextensions.h @@ -222,12 +222,9 @@ private: // void function that does not take QFutureInterface template -void runAsyncReturnVoidDispatch(std::true_type, QFutureInterface futureInterface, Function &&function, Args&&... args) +void runAsyncReturnVoidDispatch(std::true_type, QFutureInterface, Function &&function, Args&&... args) { function(std::forward(args)...); - if (futureInterface.isPaused()) - futureInterface.waitForResume(); - futureInterface.reportFinished(); } // non-void function that does not take QFutureInterface @@ -235,9 +232,6 @@ template void runAsyncReturnVoidDispatch(std::false_type, QFutureInterface futureInterface, Function &&function, Args&&... args) { futureInterface.reportResult(function(std::forward(args)...)); - if (futureInterface.isPaused()) - futureInterface.waitForResume(); - futureInterface.reportFinished(); } // function that takes QFutureInterface @@ -245,9 +239,6 @@ template void runAsyncQFutureInterfaceDispatch(std::true_type, QFutureInterface futureInterface, Function &&function, Args&&... args) { function(futureInterface, std::forward(args)...); - if (futureInterface.isPaused()) - futureInterface.waitForResume(); - futureInterface.reportFinished(); } // function that does not take QFutureInterface @@ -374,6 +365,9 @@ private: { // invalidates data, which is moved into the call runAsyncImpl(futureInterface, std::move(std::get(data))...); + if (futureInterface.isPaused()) + futureInterface.waitForResume(); + futureInterface.reportFinished(); } Data data; diff --git a/tests/auto/auto.pro b/tests/auto/auto.pro index 1d036116ebf3cb046d36bb3a50fb88f09bd1d2c2..1612ea7f63a2f97d59452d952a718b1f97bfaa78 100644 --- a/tests/auto/auto.pro +++ b/tests/auto/auto.pro @@ -17,6 +17,7 @@ SUBDIRS += \ json \ utils \ filesearch \ + mapreduce \ runextensions \ sdktool \ valgrind diff --git a/tests/auto/mapreduce/mapreduce.pro b/tests/auto/mapreduce/mapreduce.pro new file mode 100644 index 0000000000000000000000000000000000000000..1c64eeaa7ad7e73dd1ae7869c5be07b543eb83b5 --- /dev/null +++ b/tests/auto/mapreduce/mapreduce.pro @@ -0,0 +1,6 @@ +QTC_LIB_DEPENDS += utils +include(../qttest.pri) + +# Input +SOURCES += tst_mapreduce.cpp +HEADERS += $$IDE_SOURCE_TREE/src/libs/utils/mapreduce.h diff --git a/tests/auto/mapreduce/mapreduce.qbs b/tests/auto/mapreduce/mapreduce.qbs new file mode 100644 index 0000000000000000000000000000000000000000..63502117b0210e0ce1259fd7ca6526bf77a0f567 --- /dev/null +++ b/tests/auto/mapreduce/mapreduce.qbs @@ -0,0 +1,10 @@ +import qbs + +QtcAutotest { + name: "Map reduce autotest" + Depends { name: "Utils" } + + files: [ + "tst_mapreduce.cpp", + ] +} diff --git a/tests/auto/mapreduce/tst_mapreduce.cpp b/tests/auto/mapreduce/tst_mapreduce.cpp new file mode 100644 index 0000000000000000000000000000000000000000..fd339831b968054eb17e63e2bae1d70bf8460745 --- /dev/null +++ b/tests/auto/mapreduce/tst_mapreduce.cpp @@ -0,0 +1,97 @@ +/**************************************************************************** +** +** Copyright (C) 2016 The Qt Company Ltd. +** Contact: https://www.qt.io/licensing/ +** +** This file is part of Qt Creator. +** +** Commercial License Usage +** Licensees holding valid commercial Qt licenses may use this file in +** accordance with the commercial license agreement provided with the +** Software or, alternatively, in accordance with the terms contained in +** a written agreement between you and The Qt Company. For licensing terms +** and conditions see https://www.qt.io/terms-conditions. For further +** information use the contact form at https://www.qt.io/contact-us. +** +** GNU General Public License Usage +** Alternatively, this file may be used under the terms of the GNU +** General Public License version 3 as published by the Free Software +** Foundation with exceptions as appearing in the file LICENSE.GPL3-EXCEPT +** included in the packaging of this file. Please review the following +** information to ensure the GNU General Public License requirements will +** be met: https://www.gnu.org/licenses/gpl-3.0.html. +** +****************************************************************************/ + +#include +#include + +#include + +class tst_MapReduce : public QObject +{ + Q_OBJECT + +private slots: + void mapReduce(); +}; + +static int returnxx(int x) +{ + return x*x; +} + +static void returnxxThroughFutureInterface(QFutureInterface &fi, int x) +{ + fi.reportResult(x*x); +} + +void tst_MapReduce::mapReduce() +{ + const auto dummyInit = [](QFutureInterface& fi) -> double { + fi.reportResult(0.); + return 0.; + }; + const auto reduceWithFutureInterface = [](QFutureInterface& fi, double &state, int value) { + state += value; + fi.reportResult(value); + }; + const auto reduceWithReturn = [](double &state, int value) -> double { + state += value; + return value; + }; + const auto cleanupHalfState = [](QFutureInterface &fi, double &state) { + state /= 2.; + fi.reportResult(state); + }; + + // TODO: cannot use function returnxx without pointer here because of decayCopy of arguments in runAsync + { + QList results = Utils::mapReduce(QList({1, 2, 3, 4, 5}), + dummyInit, &returnxx, + reduceWithFutureInterface, cleanupHalfState) + .results(); + Utils::sort(results); // mapping order is undefined + QCOMPARE(results, QList({0., 1., 4., 9., 16., 25., 27.5})); + } + { + QList results = Utils::mapReduce(QList({1, 2, 3, 4, 5}), + dummyInit, &returnxxThroughFutureInterface, + reduceWithFutureInterface, cleanupHalfState) + .results(); + Utils::sort(results); // mapping order is undefined + QCOMPARE(results, QList({0., 1., 4., 9., 16., 25., 27.5})); + } + { + QList results = Utils::mapReduce(QList({1, 2, 3, 4, 5}), + dummyInit, &returnxx, + reduceWithReturn, cleanupHalfState) + .results(); + Utils::sort(results); // mapping order is undefined + QCOMPARE(results, QList({0., 1., 4., 9., 16., 25., 27.5})); + } +} + +QTEST_MAIN(tst_MapReduce) + +#include "tst_mapreduce.moc"