26 #include <QCoreApplication>
32 const int C_WAIT_TIME = 30 * 1000;
37 WorkerPool::WorkerPool()
38 : m_socket(new QUdpSocket()), m_tcpSocket(0), m_mapper(new QSignalMapper(this)),
39 m_release_mapper(new QSignalMapper(this)), m_initialized(false),
40 m_initialize_timer(nullptr), m_min_nodes(0), m_check_timer(new QTimer(this))
46 void WorkerPool::Initialize()
48 if (!m_socket->bind(QHostAnyAddress(), 45678, QUdpSocket::ShareAddress)) {
49 std::cerr <<
"WARNING: couldn't bind to the discovery port";
50 if (!m_initialize_timer) {
51 m_initialize_timer =
new QTimer();
52 connect(m_initialize_timer, SIGNAL(timeout()),
this, SLOT(Initialize()));
53 m_initialize_timer->start(1000);
58 m_socket->joinMulticastGroup(QHostAddress(
"239.13.13.13"));
59 connect(m_socket, SIGNAL(readyRead()),
this, SLOT(handleDatagrams()));
63 delete m_initialize_timer;
65 connect(QCoreApplication::instance(), SIGNAL(aboutToQuit()),
this, SLOT(deleteLater()));
66 connect(m_check_timer, SIGNAL(timeout()),
this, SLOT(PeriodicCheck()));
67 m_check_timer->setSingleShot(
false);
68 m_check_timer->start(C_WAIT_TIME);
71 WorkerPool::~WorkerPool()
77 bool contains(
const std::list<QString>& l, QString s)
86 void WorkerPool::handleDatagrams()
89 while (m_socket->hasPendingDatagrams()) {
91 datagram.resize(m_socket->pendingDatagramSize());
94 QHostAddress *senderAddr =
new QHostAddress();
96 m_socket->readDatagram(datagram.data(), datagram.size(), senderAddr);
101 std::string taskName =
"";
102 QString portString =
"";
104 const int FIND_SERVER_MESSAGE = 0;
105 const int FIND_SERVER_WHILE_WORKING_MESSAGE = 1;
106 const int BAD_MESSAGE = -1;
110 if (s.startsWith(
"Job Application")) {
111 QString msg(
"Job Application");
112 s.remove(0, msg.length() + 1);
114 port = portString.toInt();
115 messageType = FIND_SERVER_MESSAGE;
116 }
else if (s.startsWith(
"Server Discovery")) {
117 QString msg(
"Server Discovery");
118 s.remove(0, msg.length() + 1);
119 int index = s.indexOf(
" ");
120 QString idString = s.left(index + 1);
121 taskId = idString.toInt();
122 s.remove(0, index + 1);
123 index = s.indexOf(
" ");
124 portString = s.left(index + 1);
125 port = portString.toInt();
126 s.remove(0, index + 1);
127 taskName = s.toStdString();
128 messageType = FIND_SERVER_WHILE_WORKING_MESSAGE;
131 catch (std::exception &) {
132 messageType = BAD_MESSAGE;
135 messageType = BAD_MESSAGE;
138 switch (messageType) {
139 case FIND_SERVER_MESSAGE:
140 if (senderAddr && port) {
141 QString
id = senderAddr->toString() +
":" + s;
142 if (!contains(m_connected_list,
id)) {
143 m_connected_list.push_back(
id);
145 WorkerRepresentative *rep =
new WorkerRepresentative(*senderAddr, port);
147 connect(rep, SIGNAL(ReadyToWork()), m_mapper, SLOT(map()));
148 m_mapper->setMapping(rep, rep);
150 connect(rep, SIGNAL(Disconnected()), m_release_mapper, SLOT(map()));
151 m_release_mapper->setMapping(rep, rep);
157 case FIND_SERVER_WHILE_WORKING_MESSAGE:
158 if (senderAddr && port) {
159 QString
id = senderAddr->toString() +
":" + portString;
160 if (!contains(m_connected_list,
id)) {
161 m_connected_list.push_back(
id);
163 WorkerRepresentative *rep =
new WorkerRepresentative(*senderAddr, port, taskId,
166 connect(rep, SIGNAL(ReadyToWork()), m_mapper, SLOT(map()));
167 m_mapper->setMapping(rep, rep);
169 connect(rep, SIGNAL(Disconnected()), m_release_mapper, SLOT(map()));
170 m_release_mapper->setMapping(rep, rep);
198 if (m_available_list.empty()) {
202 m_available_list.pop_front();
203 m_busy_list.push_back(rep);
217 m_busy_list.remove(rep);
218 m_available_list.push_back(rep);
232 m_busy_list.remove(rep);
233 m_available_list.remove(rep);
235 m_connected_list.remove(
id);
241 return !m_available_list.empty();
244 void WorkerPool::StartWorker()
246 QString nodePath = QCoreApplication::applicationFilePath();
247 nodePath = nodePath.left(nodePath.lastIndexOf(
'/') + 1);
248 nodePath +=
"parex_node";
250 QStringList arguments;
252 node.startDetached(nodePath, arguments);
255 void WorkerPool::StartWorkers()
257 int neededWorkers = m_min_nodes - m_connected_list.size();
258 for (
int i = 0; i < neededWorkers; i++) {
263 void WorkerPool::PeriodicCheck()
273 m_min_nodes = min_nodes;
278 for (
auto worker : m_available_list) {
280 worker->Delete(name);
282 std::cout <<
"PROBLEM" << std::endl;
285 for (
auto worker : m_busy_list) {
287 worker->Delete(name);
289 std::cout <<
"PROBLEM" << std::endl;
bool WorkerAvailable()
Checks to see if a worker is ready to do some work.
void Delete(std::string name)
Delete an exploration on every node.
void ReleaseWorker(QObject *)
When a worker disconnects, this slot should be fired.
void MakeProcessAvailable(QObject *)
When a worker has finished his work, this slot should be fired.
A pool handling the workers that will do the actual work.
static WorkerPool * globalInstance()
A static method to get the instance of the workerPool.
void NewWorkerAvailable()
Emitted when a worker is ready for work.
int GetSenderPort() const
Gets the port of the worker.
void SetMinNumWorkers(int)
Set the minimum number of workers.
void WorkerReconnected(WorkerRepresentative *)
Emitted when a worker has reconnected after being disconnected.
Interface for WorkerPool.
const QHostAddress * GetSenderAddress() const
Gets the address of the worker.
WorkerRepresentative * getProcess()
Gets a process doing nothing if one is available, otherwise returns 0.
Namespace for SimPT parameter explorer package.
see the online Qt documentation
A worker taken as representative for multiple workers (handles the communication with the node)...
Interface for WorkerRepresentative.
Hack for QT 4 -> Qt 5 transition.