Commit 349b49ca authored by Eike Ziller's avatar Eike Ziller
Browse files

RunExtensions: Allow running functions on arbitrary thread pools



Change-Id: Icf642e7365535e4b2ca6fa4dee90a7033eb39af7
Reviewed-by: default avatarErik Verbruggen <erik.verbruggen@theqtcompany.com>
parent 7345cc30
......@@ -28,10 +28,10 @@
#include "qtcassert.h"
#include <qrunnable.h>
#include <qfuture.h>
#include <qfutureinterface.h>
#include <qthreadpool.h>
#include <QFuture>
#include <QFutureInterface>
#include <QRunnable>
#include <QThreadPool>
#include <chrono>
#include <functional>
......@@ -540,6 +540,73 @@ runAsyncImpl(QFutureInterface<ResultType> futureInterface, const Function &funct
futureInterface.reportFinished();
}
// can be replaced with std::(make_)index_sequence with C++14
template <std::size_t...>
struct indexSequence { };
template <std::size_t N, std::size_t... S>
struct makeIndexSequence : makeIndexSequence<N-1, N-1, S...> { };
template <std::size_t... S>
struct makeIndexSequence<0, S...> { typedef indexSequence<S...> type; };
template <class T>
typename std::decay<T>::type
decayCopy(T&& v)
{
return std::forward<T>(v);
}
template <typename ResultType, typename Function, typename... Args>
class AsyncJob : public QRunnable
{
public:
AsyncJob(Function &&function, Args&&... args)
// decay copy like std::thread
: data(decayCopy(std::forward<Function>(function)), decayCopy(std::forward<Args>(args))...)
{
// we need to report it as started even though it isn't yet, because someone might
// call waitForFinished on the future, which does _not_ block if the future is not started
futureInterface.setRunnable(this);
futureInterface.reportStarted();
}
~AsyncJob()
{
// QThreadPool can delete runnables even if they were never run (e.g. QThreadPool::clear).
// Since we reported them as started, we make sure that we always report them as finished.
// reportFinished only actually sends the signal if it wasn't already finished.
futureInterface.reportFinished();
}
QFuture<ResultType> future() { return futureInterface.future(); }
void run() override
{
if (futureInterface.isCanceled()) {
futureInterface.reportFinished();
return;
}
runHelper(typename makeIndexSequence<std::tuple_size<Data>::value>::type());
}
void setThreadPool(QThreadPool *pool)
{
futureInterface.setThreadPool(pool);
}
private:
using Data = std::tuple<typename std::decay<Function>::type, typename std::decay<Args>::type...>;
template <std::size_t... index>
void runHelper(indexSequence<index...>)
{
// invalidates data, which is moved into the call
runAsyncImpl(futureInterface, std::move(std::get<index>(data))...);
}
Data data;
QFutureInterface<ResultType> futureInterface;
};
} // Internal
template <typename ReduceResult, typename Container, typename InitFunction, typename MapFunction,
......@@ -581,7 +648,10 @@ QFuture<ReduceResult> mapReduce(const Container &container, const InitFunction &
\sa std::thread
\sa std::invoke
*/
template <typename ResultType, typename Function, typename... Args>
template <typename ResultType, typename Function, typename... Args,
typename = typename std::enable_if<
!std::is_same<typename std::decay<Function>::type, QThreadPool>::value
>::type>
QFuture<ResultType> runAsync(Function &&function, Args&&... args)
{
QFutureInterface<ResultType> futureInterface;
......@@ -591,6 +661,17 @@ QFuture<ResultType> runAsync(Function &&function, Args&&... args)
return futureInterface.future();
}
template <typename ResultType, typename Function, typename... Args>
QFuture<ResultType> runAsync(QThreadPool *pool, Function &&function, Args&&... args)
{
auto job = new Internal::AsyncJob<ResultType,Function,Args...>
(std::forward<Function>(function), std::forward<Args>(args)...);
job->setThreadPool(pool);
QFuture<ResultType> future = job->future();
pool->start(job);
return future;
}
} // Utils
#endif // RUNEXTENSIONS_H
......@@ -33,6 +33,7 @@ class tst_RunExtensions : public QObject
private slots:
void runAsync();
void runInThreadPool();
};
void report3(QFutureInterface<int> &fi)
......@@ -181,6 +182,92 @@ void tst_RunExtensions::runAsync()
QList<QString>({QString(QLatin1String("rvalue"))}));
}
void tst_RunExtensions::runInThreadPool()
{
QScopedPointer<QThreadPool> pool(new QThreadPool);
// free function pointer
QCOMPARE(Utils::runAsync<int>(pool.data(), &report3).results(),
QList<int>({0, 2, 1}));
QCOMPARE(Utils::runAsync<int>(pool.data(), report3).results(),
QList<int>({0, 2, 1}));
QCOMPARE(Utils::runAsync<double>(pool.data(), reportN, 4).results(),
QList<double>({0, 0, 0, 0}));
QCOMPARE(Utils::runAsync<double>(pool.data(), reportN, 2).results(),
QList<double>({0, 0}));
QString s = QLatin1String("string");
const QString &crs = QLatin1String("cr string");
const QString cs = QLatin1String("c string");
QCOMPARE(Utils::runAsync<QString>(pool.data(), reportString1, s).results(),
QList<QString>({s}));
QCOMPARE(Utils::runAsync<QString>(pool.data(), reportString1, crs).results(),
QList<QString>({crs}));
QCOMPARE(Utils::runAsync<QString>(pool.data(), reportString1, cs).results(),
QList<QString>({cs}));
QCOMPARE(Utils::runAsync<QString>(pool.data(), reportString1, QString(QLatin1String("rvalue"))).results(),
QList<QString>({QString(QLatin1String("rvalue"))}));
QCOMPARE(Utils::runAsync<QString>(pool.data(), reportString2, s).results(),
QList<QString>({s}));
QCOMPARE(Utils::runAsync<QString>(pool.data(), reportString2, crs).results(),
QList<QString>({crs}));
QCOMPARE(Utils::runAsync<QString>(pool.data(), reportString2, cs).results(),
QList<QString>({cs}));
QCOMPARE(Utils::runAsync<QString>(pool.data(), reportString2, QString(QLatin1String("rvalue"))).results(),
QList<QString>({QString(QLatin1String("rvalue"))}));
// lambda
QCOMPARE(Utils::runAsync<double>(pool.data(), [](QFutureInterface<double> &fi, int n) {
fi.reportResults(QVector<double>(n, 0));
}, 3).results(),
QList<double>({0, 0, 0}));
// std::function
const std::function<void(QFutureInterface<double>&,int)> fun = [](QFutureInterface<double> &fi, int n) {
fi.reportResults(QVector<double>(n, 0));
};
QCOMPARE(Utils::runAsync<double>(pool.data(), fun, 2).results(),
QList<double>({0, 0}));
// operator()
QCOMPARE(Utils::runAsync<double>(pool.data(), Callable(), 3).results(),
QList<double>({0, 0, 0}));
const Callable c{};
QCOMPARE(Utils::runAsync<double>(pool.data(), c, 2).results(),
QList<double>({0, 0}));
// static member functions
QCOMPARE(Utils::runAsync<double>(pool.data(), &MyObject::staticMember0).results(),
QList<double>({0, 2, 1}));
QCOMPARE(Utils::runAsync<double>(pool.data(), &MyObject::staticMember1, 2).results(),
QList<double>({0, 0}));
// member functions
const MyObject obj{};
QCOMPARE(Utils::runAsync<double>(pool.data(), &MyObject::member0, &obj).results(),
QList<double>({0, 2, 1}));
QCOMPARE(Utils::runAsync<double>(pool.data(), &MyObject::member1, &obj, 4).results(),
QList<double>({0, 0, 0, 0}));
QCOMPARE(Utils::runAsync<QString>(pool.data(), &MyObject::memberString1, &obj, s).results(),
QList<QString>({s}));
QCOMPARE(Utils::runAsync<QString>(pool.data(), &MyObject::memberString1, &obj, crs).results(),
QList<QString>({crs}));
QCOMPARE(Utils::runAsync<QString>(pool.data(), &MyObject::memberString1, &obj, cs).results(),
QList<QString>({cs}));
QCOMPARE(Utils::runAsync<QString>(pool.data(), &MyObject::memberString1, &obj, QString(QLatin1String("rvalue"))).results(),
QList<QString>({QString(QLatin1String("rvalue"))}));
QCOMPARE(Utils::runAsync<QString>(pool.data(), &MyObject::memberString2, &obj, s).results(),
QList<QString>({s}));
QCOMPARE(Utils::runAsync<QString>(pool.data(), &MyObject::memberString2, &obj, crs).results(),
QList<QString>({crs}));
QCOMPARE(Utils::runAsync<QString>(pool.data(), &MyObject::memberString2, &obj, cs).results(),
QList<QString>({cs}));
QCOMPARE(Utils::runAsync<QString>(pool.data(), &MyObject::memberString2, &obj, QString(QLatin1String("rvalue"))).results(),
QList<QString>({QString(QLatin1String("rvalue"))}));
}
QTEST_MAIN(tst_RunExtensions)
#include "tst_runextensions.moc"
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment