Initial commit.

This commit is contained in:
Jonas Kvinge
2018-02-27 18:06:05 +01:00
parent 85d9664df7
commit b2b1ba7abe
1393 changed files with 177311 additions and 1 deletions

View File

@@ -0,0 +1,34 @@
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// From Chromium src/base/macros.h
#include <stddef.h> // For size_t.
// The arraysize(arr) macro returns the # of elements in an array arr.
// The expression is a compile-time constant, and therefore can be
// used in defining new arrays, for example. If you use arraysize on
// a pointer by mistake, you will get a compile-time error.
//
// One caveat is that arraysize() doesn't accept any array of an
// anonymous type or a type defined inside a function. In these rare
// cases, you have to use the unsafe ARRAYSIZE_UNSAFE() macro below. This is
// due to a limitation in C++'s template system. The limitation might
// eventually be removed, but it hasn't happened yet.
// This template function declaration is used in defining arraysize.
// Note that the function doesn't need an implementation, as we only
// use its type.
template <typename T, size_t N>
char (&ArraySizeHelper(T (&array)[N]))[N];
// That gcc wants both of these prototypes seems mysterious. VC, for
// its part, can't decide which to use (another mystery). Matching of
// template overloads: the final frontier.
#ifndef _MSC_VER
template <typename T, size_t N>
char (&ArraySizeHelper(const T (&array)[N]))[N];
#endif
#define arraysize(array) (sizeof(ArraySizeHelper(array)))

View File

@@ -0,0 +1,71 @@
/* This file is part of Strawberry.
Copyright 2011, David Sansome <me@davidsansome.com>
Strawberry is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Strawberry is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Strawberry. If not, see <http://www.gnu.org/licenses/>.
*/
#include "closure.h"
#include "core/timeconstants.h"
namespace _detail {
ClosureBase::ClosureBase(ObjectHelper *helper)
: helper_(helper) {
}
ClosureBase::~ClosureBase() {
}
CallbackClosure::CallbackClosure(QObject *sender, const char *signal, std::function<void()> callback)
: ClosureBase(new ObjectHelper(sender, signal, this)),
callback_(callback) {
}
void CallbackClosure::Invoke() {
callback_();
}
ObjectHelper* ClosureBase::helper() const {
return helper_;
}
ObjectHelper::ObjectHelper(QObject *sender, const char *signal, ClosureBase *closure) : closure_(closure) {
connect(sender, signal, SLOT(Invoked()));
connect(sender, SIGNAL(destroyed()), SLOT(deleteLater()));
}
void ObjectHelper::Invoked() {
closure_->Invoke();
deleteLater();
}
void Unpack(QList<QGenericArgument>*) {}
} // namespace _detail
_detail::ClosureBase* NewClosure(QObject *sender, const char *signal, std::function<void()> callback) {
return new _detail::CallbackClosure(sender, signal, callback);
}
void DoAfter(QObject *receiver, const char *slot, int msec) {
QTimer::singleShot(msec, receiver, slot);
}
void DoInAMinuteOrSo(QObject *receiver, const char *slot) {
int msec = (60 + (qrand() % 60)) * kMsecPerSec;
DoAfter(receiver, slot, msec);
}

View File

@@ -0,0 +1,248 @@
/* This file is part of Strawberry.
Copyright 2011, David Sansome <me@davidsansome.com>
Strawberry is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Strawberry is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Strawberry. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef CLOSURE_H
#define CLOSURE_H
#include <chrono>
#include <functional>
#include <memory>
#include <QFuture>
#include <QFutureWatcher>
#include <QMetaMethod>
#include <QObject>
#include <QSharedPointer>
#include <QTimer>
namespace _detail {
class ObjectHelper;
// Interface for ObjectHelper to call on signal emission.
class ClosureBase {
public:
virtual ~ClosureBase();
virtual void Invoke() = 0;
// Tests only.
ObjectHelper* helper() const;
protected:
explicit ClosureBase(ObjectHelper*);
ObjectHelper* helper_;
private:
Q_DISABLE_COPY(ClosureBase);
};
// QObject helper as templated QObjects do not work.
// Connects to the given signal and invokes the closure when called.
// Deletes itself and the Closure after being invoked.
class ObjectHelper : public QObject {
Q_OBJECT
public:
ObjectHelper(QObject* parent, const char* signal, ClosureBase* closure);
private slots:
void Invoked();
private:
std::unique_ptr<ClosureBase> closure_;
Q_DISABLE_COPY(ObjectHelper);
};
// Helpers for unpacking a variadic template list.
// Base case of no arguments.
void Unpack(QList<QGenericArgument>*);
template <typename Arg>
void Unpack(QList<QGenericArgument>* list, const Arg& arg) {
list->append(Q_ARG(Arg, arg));
}
template <typename Head, typename... Tail>
void Unpack(QList<QGenericArgument>* list, const Head& head, const Tail&... tail) {
Unpack(list, head);
Unpack(list, tail...);
}
template <typename... Args>
class Closure : public ClosureBase {
public:
Closure(
QObject* sender,
const char* signal,
QObject* receiver,
const char* slot,
const Args&... args)
: ClosureBase(new ObjectHelper(sender, signal, this)),
// std::bind is the easiest way to store an argument list.
function_(std::bind(&Closure<Args...>::Call, this, args...)),
receiver_(receiver) {
const QMetaObject* meta_receiver = receiver->metaObject();
QByteArray normalised_slot = QMetaObject::normalizedSignature(slot + 1);
const int index = meta_receiver->indexOfSlot(normalised_slot.constData());
Q_ASSERT(index != -1);
slot_ = meta_receiver->method(index);
QObject::connect(receiver_, SIGNAL(destroyed()), helper_, SLOT(deleteLater()));
}
virtual void Invoke() {
function_();
}
private:
void Call(const Args&... args) {
QList<QGenericArgument> arg_list;
Unpack(&arg_list, args...);
slot_.invoke(
receiver_,
arg_list.size() > 0 ? arg_list[0] : QGenericArgument(),
arg_list.size() > 1 ? arg_list[1] : QGenericArgument(),
arg_list.size() > 2 ? arg_list[2] : QGenericArgument(),
arg_list.size() > 3 ? arg_list[3] : QGenericArgument(),
arg_list.size() > 4 ? arg_list[4] : QGenericArgument(),
arg_list.size() > 5 ? arg_list[5] : QGenericArgument(),
arg_list.size() > 6 ? arg_list[6] : QGenericArgument(),
arg_list.size() > 7 ? arg_list[7] : QGenericArgument(),
arg_list.size() > 8 ? arg_list[8] : QGenericArgument(),
arg_list.size() > 9 ? arg_list[9] : QGenericArgument());
}
std::function<void()> function_;
QObject* receiver_;
QMetaMethod slot_;
};
template <typename T, typename... Args>
class SharedClosure : public Closure<Args...> {
public:
SharedClosure(
QSharedPointer<T> sender,
const char* signal,
QObject* receiver,
const char* slot,
const Args&... args)
: Closure<Args...>(
sender.data(),
signal,
receiver,
slot,
args...),
data_(sender) {
}
private:
QSharedPointer<T> data_;
};
class CallbackClosure : public ClosureBase {
public:
CallbackClosure(QObject* sender, const char* signal,
std::function<void()> callback);
virtual void Invoke();
private:
std::function<void()> callback_;
};
} // namespace _detail
template <typename... Args>
_detail::ClosureBase* NewClosure(
QObject* sender,
const char* signal,
QObject* receiver,
const char* slot,
const Args&... args) {
return new _detail::Closure<Args...>(
sender, signal, receiver, slot, args...);
}
// QSharedPointer variant
template <typename T, typename... Args>
_detail::ClosureBase* NewClosure(
QSharedPointer<T> sender,
const char* signal,
QObject* receiver,
const char* slot,
const Args&... args) {
return new _detail::SharedClosure<T, Args...>(
sender, signal, receiver, slot, args...);
}
_detail::ClosureBase* NewClosure(QObject* sender, const char* signal, std::function<void()> callback);
template <typename... Args>
_detail::ClosureBase* NewClosure(QObject* sender, const char* signal, std::function<void(Args...)> callback, const Args&... args) {
return NewClosure(sender, signal, std::bind(callback, args...));
}
template <typename... Args>
_detail::ClosureBase* NewClosure(
QObject* sender,
const char* signal,
void (*callback)(Args...),
const Args&... args) {
return NewClosure(sender, signal, std::bind(callback, args...));
}
template <typename T, typename Unused, typename... Args>
_detail::ClosureBase* NewClosure(
QObject* sender,
const char* signal,
T* receiver, Unused (T::*callback)(Args...),
const Args&... args) {
return NewClosure(sender, signal, std::bind(callback, receiver, args...));
}
template <typename T, typename... Args>
_detail::ClosureBase* NewClosure(QFuture<T> future, QObject* receiver, const char* slot, const Args&... args) {
QFutureWatcher<T>* watcher = new QFutureWatcher<T>;
watcher->setFuture(future);
QObject::connect(watcher, SIGNAL(finished()), watcher, SLOT(deleteLater()));
return NewClosure(watcher, SIGNAL(finished()), receiver, slot, args...);
}
template <typename T, typename F, typename... Args>
_detail::ClosureBase* NewClosure(QFuture<T> future, const F& callback, const Args&... args) {
QFutureWatcher<T>* watcher = new QFutureWatcher<T>;
watcher->setFuture(future);
QObject::connect(watcher, SIGNAL(finished()), watcher, SLOT(deleteLater()));
return NewClosure(watcher, SIGNAL(finished()), callback, args...);
}
void DoAfter(QObject* receiver, const char* slot, int msec);
void DoAfter(std::function<void()> callback, std::chrono::milliseconds msec);
void DoInAMinuteOrSo(QObject* receiver, const char* slot);
template <typename R, typename P>
void DoAfter(std::function<void()> callback, std::chrono::duration<R, P> duration) {
QTimer* timer = new QTimer;
timer->setSingleShot(true);
NewClosure(timer, SIGNAL(timeout()), callback);
QObject::connect(timer, SIGNAL(timeout()), timer, SLOT(deleteLater()));
std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(duration);
timer->start(msec.count());
}
#endif // CLOSURE_H

View File

@@ -0,0 +1,137 @@
/* This file is part of Strawberry.
Copyright 2012, David Sansome <me@davidsansome.com>
Strawberry is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Strawberry is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Strawberry. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef CONCURRENTRUN_H
#define CONCURRENTRUN_H
#include <functional>
#include <QFuture>
#include <QRunnable>
#include <QThreadPool>
/*
The aim of ThreadFunctor classes and ConcurrentRun::Run() functions is to
complete QtConcurrentRun, which lack support for using a particular
QThreadPool, as it always uses QThreadPool::globalInstance().
This is problematic when we do not want to share the same thread pool over
all the application, but want to keep the convenient QtConcurrent::run()
functor syntax.
With ConcurrentRun::Run(), time critical changes can be performed in their
own pool, which is not empty by other actions (as it happens when using
QtConcurrentRun::run()).
ThreadFunctor classes are used to store a functor and its arguments, and
Run() functions are used for convenience: to directly create a new
ThreadFunctor object and start it.
*/
/*
Base abstract classes ThreadFunctorBase and ThreadFunctor (for void and
non-void result):
*/
template<typename ReturnType>
class ThreadFunctorBase : public QFutureInterface<ReturnType>, public QRunnable {
public:
ThreadFunctorBase() {}
QFuture<ReturnType> Start(QThreadPool* thread_pool) {
this->setRunnable(this);
this->reportStarted();
Q_ASSERT(thread_pool);
QFuture<ReturnType> future = this->future();
thread_pool->start(this, 0 /* priority: currently we do not support
changing the priority. Might be added later
if needed */);
return future;
}
virtual void run() = 0;
};
template <typename ReturnType, typename... Args>
class ThreadFunctor : public ThreadFunctorBase<ReturnType> {
public:
ThreadFunctor(std::function<ReturnType (Args...)> function,
Args... args)
: function_(std::bind(function, args...)) {
}
virtual void run() {
this->reportResult(function_());
this->reportFinished();
}
private:
std::function<ReturnType()> function_;
};
// Partial specialisation for void return type.
template <typename... Args>
class ThreadFunctor <void, Args...> : public ThreadFunctorBase<void> {
public:
ThreadFunctor(std::function<void (Args...)> function,
Args... args)
: function_(std::bind(function, args...)) {
}
virtual void run() {
function_();
this->reportFinished();
}
private:
std::function<void()> function_;
};
/*
Run functions
*/
namespace ConcurrentRun {
// Empty argument form.
template <typename ReturnType>
QFuture<ReturnType> Run(
QThreadPool* threadpool,
std::function<ReturnType ()> function) {
return (new ThreadFunctor<ReturnType>(function))->Start(threadpool);
}
// Function object with arguments form.
template <typename ReturnType, typename... Args>
QFuture<ReturnType> Run(
QThreadPool* threadpool,
std::function<ReturnType (Args...)> function,
const Args&... args) {
return (new ThreadFunctor<ReturnType, Args...>(
function, args...))->Start(threadpool);
}
// Support passing C function pointers instead of function objects.
template <typename ReturnType, typename... Args>
QFuture<ReturnType> Run(
QThreadPool* threadpool,
ReturnType (*function) (Args...),
const Args&... args) {
return Run(
threadpool, std::function<ReturnType (Args...)>(function), args...);
}
}
#endif // CONCURRENTRUN_H

View File

@@ -0,0 +1,66 @@
/* This file is part of Clementine.
Copyright 2016, John Maguire <john.maguire@gmail.com>
Clementine is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Clementine is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Clementine. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef LAZY_H
#define LAZY_H
#include <functional>
#include <memory>
// Helper for lazy initialisation of objects.
// Usage:
// Lazy<Foo> my_lazy_object([]() { return new Foo; });
template <typename T>
class Lazy {
public:
explicit Lazy(std::function<T*()> init) : init_(init) {}
// Convenience constructor that will lazily default construct the object.
Lazy() : init_([]() { return new T; }) {}
T* get() const {
CheckInitialised();
return ptr_.get();
}
typename std::add_lvalue_reference<T>::type operator*() const {
CheckInitialised();
return *ptr_;
}
T* operator->() const { return get(); }
// Returns true if the object is not yet initialised.
explicit operator bool() const { return ptr_; }
// Deletes the underlying object and will re-run the initialisation function
// if the object is requested again.
void reset() { ptr_.reset(nullptr); }
private:
void CheckInitialised() const {
if (!ptr_) {
ptr_.reset(init_());
}
}
const std::function<T*()> init_;
mutable std::unique_ptr<T> ptr_;
};
#endif // LAZY_H

View File

@@ -0,0 +1,292 @@
/* This file is part of Strawberry.
Copyright 2011, David Sansome <me@davidsansome.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include <glib.h>
#include <iostream>
#include <QtGlobal>
#include <cxxabi.h>
#ifdef Q_OS_UNIX
#include <execinfo.h>
#endif
#include <QCoreApplication>
#include <QDateTime>
#include <QStringList>
#include <QtMessageHandler>
#include "logging.h"
namespace logging {
static Level sDefaultLevel = Level_Debug;
static QMap<QString, Level>* sClassLevels = nullptr;
static QIODevice *sNullDevice = nullptr;
const char* kDefaultLogLevels = "GstEnginePipeline:2,*:3";
static const char *kMessageHandlerMagic = "__logging_message__";
static const int kMessageHandlerMagicLength = strlen(kMessageHandlerMagic);
static QtMessageHandler sOriginalMessageHandler = nullptr;
void GLog(const char *domain, int level, const char *message, void *user_data) {
switch (level) {
case G_LOG_FLAG_RECURSION:
case G_LOG_FLAG_FATAL:
case G_LOG_LEVEL_ERROR:
case G_LOG_LEVEL_CRITICAL: qLog(Error) << message; break;
case G_LOG_LEVEL_WARNING: qLog(Warning) << message; break;
case G_LOG_LEVEL_MESSAGE:
case G_LOG_LEVEL_INFO: qLog(Info) << message; break;
case G_LOG_LEVEL_DEBUG:
default: qLog(Debug) << message; break;
}
}
static void MessageHandler(QtMsgType type, const QMessageLogContext &context, const QString &message) {
if (strncmp(kMessageHandlerMagic, message.toLocal8Bit().data(), kMessageHandlerMagicLength) == 0) {
fprintf(stderr, "%s\n", message.toLocal8Bit().data() + kMessageHandlerMagicLength);
return;
}
Level level = Level_Debug;
switch (type) {
case QtFatalMsg:
case QtCriticalMsg: level = Level_Error; break;
case QtWarningMsg: level = Level_Warning; break;
case QtDebugMsg:
default: level = Level_Debug; break;
}
for (const QString& line : message.split('\n')) {
CreateLogger(level, "unknown", -1) << line.toLocal8Bit().constData();
}
if (type == QtFatalMsg) {
abort();
}
}
void Init() {
delete sClassLevels;
delete sNullDevice;
sClassLevels = new QMap<QString, Level>();
sNullDevice = new NullDevice;
sNullDevice->open(QIODevice::ReadWrite);
// Catch other messages from Qt
if (!sOriginalMessageHandler) {
sOriginalMessageHandler = qInstallMessageHandler(MessageHandler);
}
}
void SetLevels(const QString &levels) {
if (!sClassLevels) return;
for (const QString& item : levels.split(',')) {
const QStringList class_level = item.split(':');
QString class_name;
bool ok = false;
int level = Level_Error;
if (class_level.count() == 1) {
level = class_level.last().toInt(&ok);
}
else if (class_level.count() == 2) {
class_name = class_level.first();
level = class_level.last().toInt(&ok);
}
if (!ok || level < Level_Error || level > Level_Debug) {
continue;
}
if (class_name.isEmpty() || class_name == "*") {
sDefaultLevel = (Level) level;
}
else {
sClassLevels->insert(class_name, (Level) level);
}
}
}
QString ParsePrettyFunction(const char *pretty_function) {
// Get the class name out of the function name.
QString class_name = pretty_function;
const int paren = class_name.indexOf('(');
if (paren != -1) {
const int colons = class_name.lastIndexOf("::", paren);
if (colons != -1) {
class_name = class_name.left(colons);
}
else {
class_name = class_name.left(paren);
}
}
const int space = class_name.lastIndexOf(' ');
if (space != -1) {
class_name = class_name.mid(space+1);
}
return class_name;
}
QDebug CreateLogger(Level level, const QString &class_name, int line) {
// Map the level to a string
const char *level_name = nullptr;
switch (level) {
case Level_Debug: level_name = " DEBUG "; break;
case Level_Info: level_name = " INFO "; break;
case Level_Warning: level_name = " WARN "; break;
case Level_Error: level_name = " ERROR "; break;
case Level_Fatal: level_name = " FATAL "; break;
}
// Check the settings to see if we're meant to show or hide this message.
Level threshold_level = sDefaultLevel;
if (sClassLevels && sClassLevels->contains(class_name)) {
threshold_level = sClassLevels->value(class_name);
}
if (level > threshold_level) {
return QDebug(sNullDevice);
}
QString function_line = class_name;
if (line != -1) {
function_line += ":" + QString::number(line);
}
QtMsgType type = QtDebugMsg;
if (level == Level_Fatal) {
type = QtFatalMsg;
}
QDebug ret(type);
ret.nospace() << kMessageHandlerMagic
<< QDateTime::currentDateTime()
.toString("hh:mm:ss.zzz")
.toLatin1()
.constData() << level_name
<< function_line.leftJustified(32).toLatin1().constData();
return ret.space();
}
QString CXXDemangle(const QString &mangled_function) {
int status;
char* demangled_function = abi::__cxa_demangle(mangled_function.toLatin1().constData(), nullptr, nullptr, &status);
if (status == 0) {
QString ret = QString::fromLatin1(demangled_function);
free(demangled_function);
return ret;
}
return mangled_function; // Probably not a C++ function.
}
QString DarwinDemangle(const QString &symbol) {
QStringList split = symbol.split(' ', QString::SkipEmptyParts);
QString mangled_function = split[3];
return CXXDemangle(mangled_function);
}
QString LinuxDemangle(const QString &symbol) {
QRegExp regex("\\(([^+]+)");
if (!symbol.contains(regex)) {
return symbol;
}
QString mangled_function = regex.cap(1);
return CXXDemangle(mangled_function);
}
QString DemangleSymbol(const QString &symbol) {
#ifdef Q_OS_DARWIN
return DarwinDemangle(symbol);
#elif defined(Q_OS_LINUX)
return LinuxDemangle(symbol);
#else
return symbol;
#endif
}
void DumpStackTrace() {
#ifdef Q_OS_UNIX
void* callstack[128];
int callstack_size = backtrace(reinterpret_cast<void**>(&callstack), sizeof(callstack));
char** symbols = backtrace_symbols(reinterpret_cast<void**>(&callstack), callstack_size);
// Start from 1 to skip ourself.
for (int i = 1; i < callstack_size; ++i) {
std::cerr << DemangleSymbol(QString::fromLatin1(symbols[i])).toStdString() << std::endl;
}
free(symbols);
#else
qLog(Debug) << "FIXME: Implement printing stack traces on this platform";
#endif
}
#if 0
QDebug CreateLoggerFatal(int line, const char *class_name) { return qCreateLogger(line, class_name, Fatal); }
QDebug CreateLoggerError(int line, const char *class_name) { return qCreateLogger(line, class_name, Error); }
#ifdef QT_NO_WARNING_OUTPUT
QNoDebug CreateLoggerWarning(int, const char*) { return QNoDebug(); }
#else
QDebug CreateLoggerWarning(int line, const char *class_name) { return qCreateLogger(line, class_name, Warning); }
#endif // QT_NO_WARNING_OUTPUT
#ifdef QT_NO_DEBUG_OUTPUT
QNoDebug CreateLoggerInfo(int, const char*) { return QNoDebug(); }
QNoDebug CreateLoggerDebug(int, const char*) { return QNoDebug(); }
#else
QDebug CreateLoggerInfo(int line, const char *class_name) { return qCreateLogger(line, class_name, Info); }
QDebug CreateLoggerDebug(int line, const char *class_name) { return qCreateLogger(line, class_name, Debug); }
#endif // QT_NO_DEBUG_OUTPUT
#endif
} // namespace logging
namespace {
template <typename T>
QString print_duration(T duration, const std::string& unit) {
return QString("%1%2").arg(duration.count()).arg(unit.c_str());
}
} // namespace
QDebug operator<<(QDebug dbg, std::chrono::seconds secs) {
dbg.nospace() << print_duration(secs, "s");
return dbg.space();
}

View File

@@ -0,0 +1,93 @@
/* This file is part of Strawberry.
Copyright 2011, David Sansome <me@davidsansome.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef LOGGING_H
#define LOGGING_H
#include <chrono>
#include <string>
#include <QDebug>
#ifdef QT_NO_DEBUG_STREAM
# define qLog(level) while (false) QNoDebug()
#else
#define qLog(level) \
logging::CreateLogger(logging::Level_##level, \
logging::ParsePrettyFunction(__PRETTY_FUNCTION__), \
__LINE__)
#endif // QT_NO_DEBUG_STREAM
#if 0
#define qLog(level) \
logging::CreateLogger##level(__LINE__, __PRETTY_FUNCTION__)
#define qCreateLogger(line, class_name, level) \
logging::CreateLogger(logging::Level_##level, \
logging::ParsePrettyFunction(class_name), \
line)
#endif
namespace logging {
class NullDevice : public QIODevice {
protected:
qint64 readData(char*, qint64) { return -1; }
qint64 writeData(const char*, qint64 len) { return len; }
};
enum Level {
Level_Fatal = -1,
Level_Error = 0,
Level_Warning,
Level_Info,
Level_Debug,
};
void Init();
void SetLevels(const QString& levels);
void DumpStackTrace();
QString ParsePrettyFunction(const char* pretty_function);
QDebug CreateLogger(Level level, const QString &class_name, int line);
QDebug CreateLoggerFatal(int line, const char* class_name);
QDebug CreateLoggerError(int line, const char* class_name);
#ifdef QT_NO_WARNING_OUTPUT
QNoDebug CreateLoggerWarning(int, const char*);
#else
QDebug CreateLoggerWarning(int line, const char* class_name);
#endif // QT_NO_WARNING_OUTPUT
#ifdef QT_NO_DEBUG_OUTPUT
QNoDebug CreateLoggerInfo(int, const char*);
QNoDebug CreateLoggerDebug(int, const char*);
#else
QDebug CreateLoggerInfo(int line, const char* class_name);
QDebug CreateLoggerDebug(int line, const char* class_name);
#endif // QT_NO_DEBUG_OUTPUT
void GLog(const char* domain, int level, const char* message, void* user_data);
extern const char* kDefaultLogLevels;
}
QDebug operator<<(QDebug debug, std::chrono::seconds secs);
#endif // LOGGING_H

View File

@@ -0,0 +1,112 @@
/* This file is part of Strawberry.
Copyright 2011, David Sansome <me@davidsansome.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "messagehandler.h"
#include "core/logging.h"
#include <QAbstractSocket>
#include <QLocalSocket>
#include <QDataStream>
_MessageHandlerBase::_MessageHandlerBase(QIODevice *device, QObject *parent)
: QObject(parent),
device_(nullptr),
flush_abstract_socket_(nullptr),
flush_local_socket_(nullptr),
reading_protobuf_(false),
expected_length_(0),
is_device_closed_(false) {
if (device) {
SetDevice(device);
}
}
void _MessageHandlerBase::SetDevice(QIODevice *device) {
device_ = device;
buffer_.open(QIODevice::ReadWrite);
connect(device, SIGNAL(readyRead()), SLOT(DeviceReadyRead()));
// Yeah I know.
if (QAbstractSocket *socket = qobject_cast<QAbstractSocket*>(device)) {
flush_abstract_socket_ = &QAbstractSocket::flush;
connect(socket, SIGNAL(disconnected()), SLOT(DeviceClosed()));
}
else if (QLocalSocket* socket = qobject_cast<QLocalSocket*>(device)) {
flush_local_socket_ = &QLocalSocket::flush;
connect(socket, SIGNAL(disconnected()), SLOT(DeviceClosed()));
}
else {
qFatal("Unsupported device type passed to _MessageHandlerBase");
}
}
void _MessageHandlerBase::DeviceReadyRead() {
while (device_->bytesAvailable()) {
if (!reading_protobuf_) {
// Read the length of the next message
QDataStream s(device_);
s >> expected_length_;
reading_protobuf_ = true;
}
// Read some of the message
buffer_.write(device_->read(expected_length_ - buffer_.size()));
// Did we get everything?
if (buffer_.size() == expected_length_) {
// Parse the message
if (!RawMessageArrived(buffer_.data())) {
qLog(Error) << "Malformed protobuf message";
device_->close();
return;
}
// Clear the buffer
buffer_.close();
buffer_.setData(QByteArray());
buffer_.open(QIODevice::ReadWrite);
reading_protobuf_ = false;
}
}
}
void _MessageHandlerBase::WriteMessage(const QByteArray &data) {
QDataStream s(device_);
s << quint32(data.length());
s.writeRawData(data.data(), data.length());
// Sorry.
if (flush_abstract_socket_) {
((static_cast<QAbstractSocket*>(device_))->*(flush_abstract_socket_))();
}
else if (flush_local_socket_) {
((static_cast<QLocalSocket*>(device_))->*(flush_local_socket_))();
}
}
void _MessageHandlerBase::DeviceClosed() {
is_device_closed_ = true;
AbortAll();
}

View File

@@ -0,0 +1,181 @@
/* This file is part of Strawberry.
Copyright 2011, David Sansome <me@davidsansome.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef MESSAGEHANDLER_H
#define MESSAGEHANDLER_H
#include <QBuffer>
#include <QMap>
#include <QMutex>
#include <QMutexLocker>
#include <QObject>
#include <QSemaphore>
#include <QThread>
#include "core/logging.h"
#include "core/messagereply.h"
class QAbstractSocket;
class QIODevice;
class QLocalSocket;
#define QStringFromStdString(x) QString::fromUtf8(x.data(), x.size())
#define DataCommaSizeFromQString(x) x.toUtf8().constData(), x.toUtf8().length()
// Reads and writes uint32 length encoded protobufs to a socket.
// This base QObject is separate from AbstractMessageHandler because moc can't
// handle templated classes. Use AbstractMessageHandler instead.
class _MessageHandlerBase : public QObject {
Q_OBJECT
public:
// device can be NULL, in which case you must call SetDevice before writing
// any messages.
_MessageHandlerBase(QIODevice* device, QObject* parent);
void SetDevice(QIODevice* device);
// After this is true, messages cannot be sent to the handler any more.
bool is_device_closed() const { return is_device_closed_; }
protected slots:
void WriteMessage(const QByteArray& data);
void DeviceReadyRead();
virtual void DeviceClosed();
protected:
virtual bool RawMessageArrived(const QByteArray& data) = 0;
virtual void AbortAll() = 0;
protected:
typedef bool (QAbstractSocket::*FlushAbstractSocket)();
typedef bool (QLocalSocket::*FlushLocalSocket)();
QIODevice* device_;
FlushAbstractSocket flush_abstract_socket_;
FlushLocalSocket flush_local_socket_;
bool reading_protobuf_;
quint32 expected_length_;
QBuffer buffer_;
bool is_device_closed_;
};
// Reads and writes uint32 length encoded MessageType messages to a socket.
// You should subclass this and implement the MessageArrived(MessageType)
// method.
template <typename MT>
class AbstractMessageHandler : public _MessageHandlerBase {
public:
AbstractMessageHandler(QIODevice* device, QObject* parent);
~AbstractMessageHandler() { AbortAll(); }
typedef MT MessageType;
typedef MessageReply<MT> ReplyType;
// Serialises the message and writes it to the socket. This version MUST be
// called from the thread in which the AbstractMessageHandler was created.
void SendMessage(const MessageType& message);
// Serialises the message and writes it to the socket. This version may be
// called from any thread.
void SendMessageAsync(const MessageType& message);
// Sends the request message inside and takes ownership of the MessageReply.
// The MessageReply's Finished() signal will be emitted when a reply arrives
// with the same ID. Must be called from my thread.
void SendRequest(ReplyType* reply);
// Sets the "id" field of reply to the same as the request, and sends the
// reply on the socket. Used on the worker side.
void SendReply(const MessageType& request, MessageType* reply);
protected:
// Called when a message is received from the socket.
virtual void MessageArrived(const MessageType& message) {}
// _MessageHandlerBase
bool RawMessageArrived(const QByteArray& data);
void AbortAll();
private:
QMap<int, ReplyType*> pending_replies_;
};
template <typename MT>
AbstractMessageHandler<MT>::AbstractMessageHandler(QIODevice* device,
QObject* parent)
: _MessageHandlerBase(device, parent) {}
template <typename MT>
void AbstractMessageHandler<MT>::SendMessage(const MessageType& message) {
Q_ASSERT(QThread::currentThread() == thread());
std::string data = message.SerializeAsString();
WriteMessage(QByteArray(data.data(), data.size()));
}
template <typename MT>
void AbstractMessageHandler<MT>::SendMessageAsync(const MessageType& message) {
std::string data = message.SerializeAsString();
metaObject()->invokeMethod(this, "WriteMessage", Qt::QueuedConnection,
Q_ARG(QByteArray, QByteArray(data.data(), data.size())));
}
template<typename MT>
void AbstractMessageHandler<MT>::SendRequest(ReplyType* reply) {
pending_replies_[reply->id()] = reply;
SendMessage(reply->request_message());
}
template<typename MT>
void AbstractMessageHandler<MT>::SendReply(const MessageType& request,
MessageType* reply) {
reply->set_id(request.id());
SendMessage(*reply);
}
template<typename MT>
bool AbstractMessageHandler<MT>::RawMessageArrived(const QByteArray& data) {
MessageType message;
if (!message.ParseFromArray(data.constData(), data.size())) {
return false;
}
ReplyType* reply = pending_replies_.take(message.id());
if (reply) {
// This is a reply to a message that we created earlier.
reply->SetReply(message);
} else {
MessageArrived(message);
}
return true;
}
template<typename MT>
void AbstractMessageHandler<MT>::AbortAll() {
for (ReplyType* reply : pending_replies_) {
reply->Abort();
}
pending_replies_.clear();
}
#endif // MESSAGEHANDLER_H

View File

@@ -0,0 +1,38 @@
/* This file is part of Strawberry.
Copyright 2011, David Sansome <me@davidsansome.com>
Strawberry is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Strawberry is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Strawberry. If not, see <http://www.gnu.org/licenses/>.
*/
#include "messagereply.h"
_MessageReplyBase::_MessageReplyBase(QObject *parent)
: QObject(parent), finished_(false), success_(false) {}
bool _MessageReplyBase::WaitForFinished() {
qLog(Debug) << "Waiting on ID" << id();
semaphore_.acquire();
qLog(Debug) << "Acquired ID" << id();
return success_;
}
void _MessageReplyBase::Abort() {
Q_ASSERT(!finished_);
finished_ = true;
success_ = false;
emit Finished(success_);
qLog(Debug) << "Releasing ID" << id() << "(aborted)";
semaphore_.release();
}

View File

@@ -0,0 +1,97 @@
/* This file is part of Strawberry.
Copyright 2011, David Sansome <me@davidsansome.com>
Strawberry is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Strawberry is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Strawberry. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef MESSAGEREPLY_H
#define MESSAGEREPLY_H
#include <QObject>
#include <QSemaphore>
#include "core/logging.h"
// Base QObject for a reply future class that is returned immediately for
// requests that will occur in the background. Similar to QNetworkReply.
// Use MessageReply instead.
class _MessageReplyBase : public QObject {
Q_OBJECT
public:
_MessageReplyBase(QObject* parent = nullptr);
virtual int id() const = 0;
bool is_finished() const { return finished_; }
bool is_successful() const { return success_; }
// Waits for the reply to finish by waiting on a semaphore. Never call this
// from the MessageHandler's thread or it will block forever.
// Returns true if the call was successful.
bool WaitForFinished();
void Abort();
signals:
void Finished(bool success);
protected:
bool finished_;
bool success_;
QSemaphore semaphore_;
};
// A reply future class that is returned immediately for requests that will
// occur in the background. Similar to QNetworkReply.
template <typename MessageType>
class MessageReply : public _MessageReplyBase {
public:
MessageReply(const MessageType& request_message, QObject* parent = nullptr);
int id() const { return request_message_.id(); }
const MessageType& request_message() const { return request_message_; }
const MessageType& message() const { return reply_message_; }
void SetReply(const MessageType& message);
private:
MessageType request_message_;
MessageType reply_message_;
};
template<typename MessageType>
MessageReply<MessageType>::MessageReply(const MessageType& request_message,
QObject* parent)
: _MessageReplyBase(parent)
{
request_message_.MergeFrom(request_message);
}
template<typename MessageType>
void MessageReply<MessageType>::SetReply(const MessageType& message) {
Q_ASSERT(!finished_);
reply_message_.MergeFrom(message);
finished_ = true;
success_ = true;
emit Finished(success_);
qLog(Debug) << "Releasing ID" << id() << "(finished)";
semaphore_.release();
}
#endif // MESSAGEREPLY_H

View File

@@ -0,0 +1,33 @@
/* This file is part of Strawberry.
Copyright 2012, David Sansome <me@davidsansome.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef OVERRIDE_H
#define OVERRIDE_H
// Defines the OVERRIDE macro as C++11's override control keyword if
// it is available.
#ifndef __has_extension
#define __has_extension(x) 0
#endif
#if __has_extension(cxx_override_control) // Clang feature checking macro.
# define OVERRIDE override
#else
# define OVERRIDE
#endif
#endif // OVERRIDE_H

View File

@@ -0,0 +1,26 @@
/* This file is part of Strawberry.
Copyright 2011, David Sansome <me@davidsansome.com>
Strawberry is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Strawberry is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Strawberry. If not, see <http://www.gnu.org/licenses/>.
*/
#include "waitforsignal.h"
#include <QEventLoop>
void WaitForSignal(QObject *sender, const char *signal) {
QEventLoop loop;
QObject::connect(sender, signal, &loop, SLOT(quit()));
loop.exec();
}

View File

@@ -0,0 +1,25 @@
/* This file is part of Strawberry.
Copyright 2011, David Sansome <me@davidsansome.com>
Strawberry is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Strawberry is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Strawberry. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef WAITFORSIGNAL_H
#define WAITFORSIGNAL_H
class QObject;
void WaitForSignal(QObject *sender, const char *signal);
#endif // WAITFORSIGNAL_H

View File

@@ -0,0 +1,20 @@
/* This file is part of Strawberry.
Copyright 2011, David Sansome <me@davidsansome.com>
Strawberry is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Strawberry is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Strawberry. If not, see <http://www.gnu.org/licenses/>.
*/
#include "workerpool.h"
_WorkerPoolBase::_WorkerPoolBase(QObject *parent) : QObject(parent) {}

View File

@@ -0,0 +1,402 @@
/* This file is part of Strawberry.
Copyright 2011, David Sansome <me@davidsansome.com>
Strawberry is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Strawberry is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Strawberry. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef WORKERPOOL_H
#define WORKERPOOL_H
#include <QAtomicInt>
#include <QCoreApplication>
#include <QFile>
#include <QLocalServer>
#include <QLocalSocket>
#include <QMutex>
#include <QObject>
#include <QProcess>
#include <QQueue>
#include <QThread>
#include "core/closure.h"
#include "core/logging.h"
// Base class containing signals and slots - required because moc doesn't do
// templated objects.
class _WorkerPoolBase : public QObject {
Q_OBJECT
public:
_WorkerPoolBase(QObject* parent = nullptr);
signals:
// Emitted when a worker failed to start. This usually happens when the
// worker wasn't found, or couldn't be executed.
void WorkerFailedToStart();
protected slots:
virtual void DoStart() {}
virtual void NewConnection() {}
virtual void ProcessError(QProcess::ProcessError) {}
virtual void SendQueuedMessages() {}
};
// Manages a pool of one or more external processes. A local socket server is
// started for each process, and the address is passed to the process as
// argv[1]. The process is expected to connect back to the socket server, and
// when it does a HandlerType is created for it.
// Instances of HandlerType are created in the WorkerPool's thread.
template <typename HandlerType>
class WorkerPool : public _WorkerPoolBase {
public:
WorkerPool(QObject* parent = nullptr);
~WorkerPool();
typedef typename HandlerType::MessageType MessageType;
typedef typename HandlerType::ReplyType ReplyType;
// Sets the name of the worker executable. This is looked for first in the
// current directory, and then in $PATH. You must call this before calling
// Start().
void SetExecutableName(const QString& executable_name);
// Sets the number of worker process to use. Defaults to
// 1 <= (processors / 2) <= 2.
void SetWorkerCount(int count);
// Sets the prefix to use for the local server (on unix this is a named pipe
// in /tmp). Defaults to QApplication::applicationName(). A random number
// is appended to this name when creating each server.
void SetLocalServerName(const QString& local_server_name);
// Starts all workers.
void Start();
// Fills in the message's "id" field and creates a reply future. The message
// is queued and the WorkerPool's thread will send it to the next available
// worker. Can be called from any thread.
ReplyType* SendMessageWithReply(MessageType* message);
protected:
// These are all reimplemented slots, they are called on the WorkerPool's
// thread.
void DoStart();
void NewConnection();
void ProcessError(QProcess::ProcessError error);
void SendQueuedMessages();
private:
struct Worker {
Worker() : local_server_(NULL), local_socket_(NULL), process_(NULL), handler_(NULL) {}
QLocalServer *local_server_;
QLocalSocket *local_socket_;
QProcess *process_;
HandlerType* handler_;
};
// Must only ever be called on my thread.
void StartOneWorker(Worker* worker);
template <typename T>
Worker* FindWorker(T Worker::*member, T value) {
for (typename QList<Worker>::iterator it = workers_.begin() ;
it != workers_.end() ; ++it) {
if ((*it).*member == value) {
return &(*it);
}
}
return NULL;
}
template <typename T>
void DeleteQObjectPointerLater(T** p) {
if (*p) {
(*p)->deleteLater();
*p = NULL;
}
}
// Creates a new reply future for the request with the next sequential ID,
// and sets the request's ID to the ID of the reply. Can be called from any
// thread
ReplyType* NewReply(MessageType* message);
// Returns the next handler, or NULL if there isn't one. Must be called from
// my thread.
HandlerType* NextHandler() const;
private:
QString local_server_name_;
QString executable_name_;
QString executable_path_;
int worker_count_;
mutable int next_worker_;
QList<Worker> workers_;
QAtomicInt next_id_;
QMutex message_queue_mutex_;
QQueue<ReplyType*> message_queue_;
};
template <typename HandlerType>
WorkerPool<HandlerType>::WorkerPool(QObject* parent)
: _WorkerPoolBase(parent),
next_worker_(0),
next_id_(0)
{
worker_count_ = qBound(1, QThread::idealThreadCount() / 2, 2);
local_server_name_ = qApp->applicationName().toLower();
if (local_server_name_.isEmpty())
local_server_name_ = "workerpool";
}
template <typename HandlerType>
WorkerPool<HandlerType>::~WorkerPool() {
for (const Worker& worker : workers_) {
if (worker.local_socket_ && worker.process_) {
disconnect(worker.process_, SIGNAL(error(QProcess::ProcessError)), this, SLOT(ProcessError(QProcess::ProcessError)));
// The worker is connected. Close his socket and wait for him to exit.
qLog(Debug) << "Closing worker socket";
worker.local_socket_->close();
worker.process_->waitForFinished(500);
}
if (worker.process_ && worker.process_->state() == QProcess::Running) {
// The worker is still running - kill it.
qLog(Debug) << "Killing worker process";
worker.process_->terminate();
if (!worker.process_->waitForFinished(500)) {
worker.process_->kill();
}
}
}
for (ReplyType* reply : message_queue_) {
reply->Abort();
}
}
template <typename HandlerType>
void WorkerPool<HandlerType>::SetWorkerCount(int count) {
Q_ASSERT(workers_.isEmpty());
worker_count_ = count;
}
template <typename HandlerType>
void WorkerPool<HandlerType>::SetLocalServerName(const QString& local_server_name) {
Q_ASSERT(workers_.isEmpty());
local_server_name_ = local_server_name;
}
template <typename HandlerType>
void WorkerPool<HandlerType>::SetExecutableName(const QString& executable_name) {
Q_ASSERT(workers_.isEmpty());
executable_name_ = executable_name;
}
template <typename HandlerType>
void WorkerPool<HandlerType>::Start() {
metaObject()->invokeMethod(this, "DoStart");
}
template <typename HandlerType>
void WorkerPool<HandlerType>::DoStart() {
Q_ASSERT(workers_.isEmpty());
Q_ASSERT(!executable_name_.isEmpty());
Q_ASSERT(QThread::currentThread() == thread());
// Find the executable if we can, default to searching $PATH
executable_path_ = executable_name_;
QStringList search_path;
search_path << qApp->applicationDirPath();
#ifdef Q_OS_MAC
search_path << qApp->applicationDirPath() + "/../PlugIns";
#endif
for (const QString& path_prefix : search_path) {
const QString executable_path = path_prefix + "/" + executable_name_;
if (QFile::exists(executable_path)) {
executable_path_ = executable_path;
break;
}
}
// Start all the workers
for (int i = 0; i < worker_count_; ++i) {
Worker worker;
StartOneWorker(&worker);
workers_ << worker;
}
}
template <typename HandlerType>
void WorkerPool<HandlerType>::StartOneWorker(Worker *worker) {
Q_ASSERT(QThread::currentThread() == thread());
DeleteQObjectPointerLater(&worker->local_server_);
DeleteQObjectPointerLater(&worker->local_socket_);
DeleteQObjectPointerLater(&worker->process_);
DeleteQObjectPointerLater(&worker->handler_);
worker->local_server_ = new QLocalServer(this);
worker->process_ = new QProcess(this);
connect(worker->local_server_, SIGNAL(newConnection()), SLOT(NewConnection()));
connect(worker->process_, SIGNAL(error(QProcess::ProcessError)), SLOT(ProcessError(QProcess::ProcessError)));
// Create a server, find an unused name and start listening
forever {
const int unique_number = qrand() ^ ((int)(quint64(this) & 0xFFFFFFFF));
const QString name = QString("%1_%2").arg(local_server_name_).arg(unique_number);
if (worker->local_server_->listen(name)) {
break;
}
}
qLog(Debug) << "Starting worker" << worker << executable_path_ << worker->local_server_->fullServerName();
// Start the process
worker->process_->setProcessChannelMode(QProcess::ForwardedChannels);
worker->process_->start(executable_path_, QStringList() << worker->local_server_->fullServerName());
}
template <typename HandlerType>
void WorkerPool<HandlerType>::NewConnection() {
Q_ASSERT(QThread::currentThread() == thread());
QLocalServer *server = qobject_cast<QLocalServer*>(sender());
// Find the worker with this server.
Worker* worker = FindWorker(&Worker::local_server_, server);
if (!worker) return;
qLog(Debug) << "Worker" << worker << "connected to" << server->fullServerName();
// Accept the connection.
worker->local_socket_ = server->nextPendingConnection();
// We only ever accept one connection per worker, so destroy the server now.
worker->local_socket_->setParent(this);
worker->local_server_->deleteLater();
worker->local_server_ = NULL;
// Create the handler.
worker->handler_ = new HandlerType(worker->local_socket_, this);
SendQueuedMessages();
}
template <typename HandlerType>
void WorkerPool<HandlerType>::ProcessError(QProcess::ProcessError error) {
Q_ASSERT(QThread::currentThread() == thread());
QProcess *process = qobject_cast<QProcess*>(sender());
// Find the worker with this process.
Worker *worker = FindWorker(&Worker::process_, process);
if (!worker) return;
switch (error) {
case QProcess::FailedToStart:
// Failed to start errors are bad - it usually means the worker isn't
// installed. Don't restart the process, but tell our owner, who will
// probably want to do something fatal.
qLog(Error) << "Worker failed to start";
emit WorkerFailedToStart();
break;
default:
// On any other error we just restart the process.
qLog(Debug) << "Worker" << worker << "failed with error" << error << "- restarting";
StartOneWorker(worker);
break;
}
}
template <typename HandlerType>
typename WorkerPool<HandlerType>::ReplyType*
WorkerPool<HandlerType>::NewReply(MessageType* message) {
const int id = next_id_.fetchAndAddOrdered(1);
message->set_id(id);
return new ReplyType(*message);
}
template <typename HandlerType>
typename WorkerPool<HandlerType>::ReplyType*
WorkerPool<HandlerType>::SendMessageWithReply(MessageType* message) {
ReplyType* reply = NewReply(message);
// Add the pending reply to the queue
{
QMutexLocker l(&message_queue_mutex_);
message_queue_.enqueue(reply);
}
// Wake up the main thread
metaObject()->invokeMethod(this, "SendQueuedMessages", Qt::QueuedConnection);
return reply;
}
template <typename HandlerType>
void WorkerPool<HandlerType>::SendQueuedMessages() {
QMutexLocker l(&message_queue_mutex_);
while (!message_queue_.isEmpty()) {
ReplyType *reply = message_queue_.dequeue();
// Find a worker for this message
HandlerType* handler = NextHandler();
if (!handler) {
// No available handlers - put the message on the front of the queue.
message_queue_.prepend(reply);
qLog(Debug) << "No available handlers to process request";
break;
}
handler->SendRequest(reply);
}
}
template <typename HandlerType>
HandlerType *WorkerPool<HandlerType>::NextHandler() const {
for (int i = 0; i < workers_.count(); ++i) {
const int worker_index = (next_worker_ + i) % workers_.count();
if (workers_[worker_index].handler_ &&
!workers_[worker_index].handler_->is_device_closed()) {
next_worker_ = (worker_index + 1) % workers_.count();
return workers_[worker_index].handler_;
}
}
return NULL;
}
#endif // WORKERPOOL_H