Commit 93648b2e authored by Lukas Appelhans's avatar Lukas Appelhans
Browse files

Make runner thread-affine

parent c266d388
......@@ -383,7 +383,7 @@ private:
OperationPrivate * const d_ptr;
friend class OperationRunner;
friend class RunnerRunnable;
friend class RunnerWorker;
friend class ValidatorWorker;
/* This is needed by the libarchive interface to perform its tasks */
......
......@@ -244,22 +244,29 @@ void OperationRunner::run()
d->runThread = new RunnerRunnable(d->operations, d->processingOptions);
connect(d->runThread.data(), SIGNAL(errorsOccurred(Akabei::Error::List)),
this, SLOT(__k__errorsOccurred(Akabei::Error::List)));
connect(d->runThread.data(), SIGNAL(runFinished(bool)),
this, SLOT(__k__finished(bool)));
connect(d->runThread.data(), SIGNAL(phaseStarted(Akabei::Operation::Phase)),
this, SIGNAL(phaseStarted(Akabei::Operation::Phase)));
connect(d->runThread.data(), SIGNAL(phaseFinished(Akabei::Operation::Phase)),
this, SIGNAL(phaseFinished(Akabei::Operation::Phase)));
connect(d->runThread.data(), SIGNAL(operationStarted(Akabei::Operation *)),
this, SIGNAL(operationStarted(Akabei::Operation *)));
connect(d->runThread.data(), SIGNAL(operationFinished(Akabei::Operation *)),
this, SIGNAL(operationFinished(Akabei::Operation *)));
connect(d->runThread.data(), SIGNAL(ready()), SLOT(__k__connectToRunnerStatus()));
QThreadPool::globalInstance()->start(d->runThread.data());
}
void OperationRunnerPrivate::__k__connectToRunnerStatus()
{
Q_Q(OperationRunner);
q->connect(runThread.data()->worker(), SIGNAL(errorsOccurred(Akabei::Error::List)),
q, SLOT(__k__errorsOccurred(Akabei::Error::List)));
q->connect(runThread.data()->worker(), SIGNAL(runFinished(bool)),
q, SLOT(__k__finished(bool)));
q->connect(runThread.data()->worker(), SIGNAL(phaseStarted(Akabei::Operation::Phase)),
q, SIGNAL(phaseStarted(Akabei::Operation::Phase)));
q->connect(runThread.data()->worker(), SIGNAL(phaseFinished(Akabei::Operation::Phase)),
q, SIGNAL(phaseFinished(Akabei::Operation::Phase)));
q->connect(runThread.data()->worker(), SIGNAL(operationStarted(Akabei::Operation *)),
q, SIGNAL(operationStarted(Akabei::Operation *)));
q->connect(runThread.data()->worker(), SIGNAL(operationFinished(Akabei::Operation *)),
q, SIGNAL(operationFinished(Akabei::Operation *)));
}
void OperationRunner::cancel()
{
......
......@@ -196,6 +196,7 @@ private:
Q_PRIVATE_SLOT(d_func(), void __k__validationFinished(bool,OpsHash))
Q_PRIVATE_SLOT(d_func(), void __k__finished(bool))
Q_PRIVATE_SLOT(d_func(), void __k__connectToValidationStatus())
Q_PRIVATE_SLOT(d_func(), void __k__connectToRunnerStatus())
friend class BackendPrivate;
friend class Operation;
......
......@@ -57,6 +57,7 @@ public:
void __k__validationFinished(bool, const OpsHash&);
void __k__finished(bool);
void __k__connectToValidationStatus();
void __k__connectToRunnerStatus();
};
}
......
......@@ -23,24 +23,23 @@ static Akabei::ProcessingOptions s_options;
namespace Akabei
{
RunnerRunnable::RunnerRunnable(const QHash<Operation::Phase, QList< Operation* > > &ops, ProcessingOptions processingOptions, QObject* parent)
: QObject(parent)
, QRunnable()
, m_operations(ops)
RunnerWorker::RunnerWorker(const OpsHash& ops, QObject* parent)
: QObject(parent)
, m_operations(ops)
, m_currentPhase(Operation::Phase1)
{
setAutoDelete(true);
s_options = processingOptions;
}
RunnerRunnable::~RunnerRunnable()
RunnerWorker::~RunnerWorker()
{
}
bool RunnerRunnable::runSingle(Operation* op)
bool RunnerWorker::runSingle(Operation* op)
{
OperationPrivate *opPr = RunnerRunnable::operationPrivateProxy(op);
OperationPrivate *opPr = RunnerWorker::operationPrivateProxy(op);
opPr->setProcessingOptions(s_options);
// Pre operations
......@@ -73,24 +72,12 @@ bool RunnerRunnable::runSingle(Operation* op)
return true;
}
OperationPrivate* RunnerRunnable::operationPrivateProxy(Operation* op)
OperationPrivate* RunnerWorker::operationPrivateProxy(Operation* op)
{
return op->d_func();
}
void RunnerRunnable::requestCancel()
{
}
void RunnerRunnable::run()
{
// Start from phase 1.
m_currentPhase = Operation::Phase1;
processNextPhase();
}
void RunnerRunnable::processNextPhase()
void RunnerWorker::processNextPhase()
{
emit phaseStarted(m_currentPhase);
// Let's go. For each phase, check if all the operations are ready (hence validated),
......@@ -152,14 +139,14 @@ void RunnerRunnable::processNextPhase()
}
}
Akabei::Error::List RunnerRunnable::runConcurrent(const QList< Operation* >& ops)
Akabei::Error::List RunnerWorker::runConcurrent(const QList< Operation* >& ops)
{
foreach (Operation * op, ops)
emit operationStarted(op);
QEventLoop e;
s_runFutureWatcher = new QFutureWatcher<void>;
connect(s_runFutureWatcher.data(), SIGNAL(finished()), &e, SLOT(quit()));
QFuture< void > future = QtConcurrent::map(ops, RunnerRunnable::runSingle);
QFuture< void > future = QtConcurrent::map(ops, RunnerWorker::runSingle);
s_runFutureWatcher.data()->setFuture(future);
e.exec();
......@@ -181,7 +168,7 @@ Akabei::Error::List RunnerRunnable::runConcurrent(const QList< Operation* >& ops
return Akabei::Error::List();
}
Akabei::Error::List RunnerRunnable::runSequential(const QList< Operation* >& ops)
Akabei::Error::List RunnerWorker::runSequential(const QList< Operation* >& ops)
{
foreach (Operation *op, ops) {
emit operationStarted(op);
......@@ -198,12 +185,53 @@ Akabei::Error::List RunnerRunnable::runSequential(const QList< Operation* >& ops
return Akabei::Error::List();
}
void RunnerRunnable::manageErrors(Akabei::Error::List const& errors)
void RunnerWorker::manageErrors(Akabei::Error::List const& errors)
{
emit errorsOccurred(errors);
emit runFinished(false);
}
void RunnerWorker::run()
{
// Start from phase 1.
m_currentPhase = Operation::Phase1;
processNextPhase();
}
RunnerRunnable::RunnerRunnable(const QHash<Operation::Phase, QList< Operation* > > &ops, ProcessingOptions processingOptions, QObject* parent)
: QObject(parent)
, QRunnable()
, m_operations(ops)
, m_worker(0)
{
s_options = processingOptions;
}
RunnerRunnable::~RunnerRunnable()
{
if (m_worker)
m_worker->deleteLater();
}
void RunnerRunnable::requestCancel()
{
}
void RunnerRunnable::run()
{
//Don't parent, otherwise we break thread-affinity!!!
m_worker = new RunnerWorker(m_operations);
emit ready();
m_worker->run();
}
RunnerWorker * RunnerRunnable::worker()
{
return m_worker;
}
}
#include "akabeirunnerrunnable_p.moc"
......@@ -21,20 +21,12 @@
#include <akabeioperationrunner_p.h>
namespace Akabei {
class RunnerRunnable : public QObject, public QRunnable
class RunnerWorker : public QObject
{
Q_OBJECT
public:
RunnerRunnable(const OpsHash &ops, ProcessingOptions processingOptions, QObject* parent = 0);
virtual ~RunnerRunnable();
void requestCancel();
static OperationPrivate *operationPrivateProxy(Operation *op);
protected:
virtual void run();
explicit RunnerWorker(const OpsHash &ops, QObject* parent = 0);
virtual ~RunnerWorker();
Q_SIGNALS:
void errorsOccurred(Akabei::Error::List const& errors);
......@@ -45,17 +37,41 @@ class RunnerRunnable : public QObject, public QRunnable
void operationFinished(Akabei::Operation *operation);
private:
void run();
void processNextPhase();
Akabei::Error::List runConcurrent(const QList< Operation* > &ops);
Akabei::Error::List runSequential(const QList< Operation* > &ops);
static bool runSingle(Operation *op);
void manageErrors(Akabei::Error::List const& errors);
static OperationPrivate *operationPrivateProxy(Operation *op);
private:
QHash<Operation::Phase, QList< Operation* > > m_operations;
Operation::Phase m_currentPhase;
friend class OperationRunner;
friend class RunnerRunnable;
};
class RunnerRunnable : public QObject, public QRunnable
{
Q_OBJECT
public:
RunnerRunnable(const OpsHash &ops, ProcessingOptions processingOptions, QObject* parent = 0);
virtual ~RunnerRunnable();
RunnerWorker * worker();
void requestCancel();
protected:
virtual void run();
Q_SIGNALS:
void ready();
private:
QHash<Operation::Phase, QList< Operation* > > m_operations;
RunnerWorker * m_worker;
};
}
......
......@@ -427,6 +427,7 @@ ValidatorRunnable::ValidatorRunnable(const QHash<Operation::Phase, QList< Operat
: QObject()
, QRunnable()
, m_operations(ops)
, m_worker(0)
{
s_options = processingOptions;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment