VPTissue Reference Manual
WorkerPool.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2011-2016 Universiteit Antwerpen
3  *
4  * Licensed under the EUPL, Version 1.1 or as soon they will be approved by
5  * the European Commission - subsequent versions of the EUPL (the "Licence");
6  * You may not use this work except in compliance with the Licence.
7  * You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl5
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the Licence is distributed on an "AS IS" basis,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the Licence for the specific language governing
13  * permissions and limitations under the Licence.
14  */
20 #include "WorkerPool.h"
21 
22 #include "QHostAnyAddress.h"
23 #include "WorkerRepresentative.h"
24 
25 #include <QByteArray>
26 #include <QCoreApplication>
27 #include <QDebug>
28 #include <QProcess>
29 
namespace {

/// Period, in milliseconds, between two checks of the number of nodes.
constexpr int C_WAIT_TIME = 30 * 1000;

} // namespace
34 
35 namespace SimPT_Parex {
36 
37 WorkerPool::WorkerPool()
38  : m_socket(new QUdpSocket()), m_tcpSocket(0), m_mapper(new QSignalMapper(this)),
39  m_release_mapper(new QSignalMapper(this)), m_initialized(false),
40  m_initialize_timer(nullptr), m_min_nodes(0), m_check_timer(new QTimer(this))
41 {
42  Initialize();
43 
44 }
45 
46 void WorkerPool::Initialize()
47 {
48  if (!m_socket->bind(QHostAnyAddress(), 45678, QUdpSocket::ShareAddress)) {
49  std::cerr << "WARNING: couldn't bind to the discovery port";
50  if (!m_initialize_timer) {
51  m_initialize_timer = new QTimer();
52  connect(m_initialize_timer, SIGNAL(timeout()), this, SLOT(Initialize()));
53  m_initialize_timer->start(1000);
54  }
55  return;
56  }
57 
58  m_socket->joinMulticastGroup(QHostAddress("239.13.13.13"));
59  connect(m_socket, SIGNAL(readyRead()), this, SLOT(handleDatagrams()));
60  connect(m_mapper, SIGNAL(mapped(QObject *)), this, SLOT(MakeProcessAvailable(QObject *)));
61  connect(m_release_mapper, SIGNAL(mapped(QObject *)), this, SLOT(ReleaseWorker(QObject *)));
62 
63  delete m_initialize_timer;
64  m_initialized = true;
65  connect(QCoreApplication::instance(), SIGNAL(aboutToQuit()), this, SLOT(deleteLater()));
66  connect(m_check_timer, SIGNAL(timeout()), this, SLOT(PeriodicCheck()));
67  m_check_timer->setSingleShot(false);
68  m_check_timer->start(C_WAIT_TIME);
69 }
70 
71 WorkerPool::~WorkerPool()
72 {
73  m_socket->close();
74  delete m_socket;
75 }
76 
77 bool contains(const std::list<QString>& l, QString s)
78 {
79  for (QString q : l) {
80  if (s == q)
81  return true;
82  }
83  return false;
84 }
85 
86 void WorkerPool::handleDatagrams()
87 {
88  //We have to process all pending datagrams or the readyRead() signal will not be emitted again
89  while (m_socket->hasPendingDatagrams()) {
90  QByteArray datagram;
91  datagram.resize(m_socket->pendingDatagramSize());
92 
93  //We don't know where the node is located, so check the sender address of the UDP packet
94  QHostAddress *senderAddr = new QHostAddress();
95 
96  m_socket->readDatagram(datagram.data(), datagram.size(), senderAddr);
97 
98  QString s(datagram);
99  int port = 0;
100  int taskId = -1;
101  std::string taskName = "";
102  QString portString = "";
103 
104  const int FIND_SERVER_MESSAGE = 0;
105  const int FIND_SERVER_WHILE_WORKING_MESSAGE = 1;
106  const int BAD_MESSAGE = -1;
107 
108  int messageType;
109  try {
110  if (s.startsWith("Job Application")) {
111  QString msg("Job Application");
112  s.remove(0, msg.length() + 1); //remove the server message + " "
113  portString = s;
114  port = portString.toInt();
115  messageType = FIND_SERVER_MESSAGE;
116  } else if (s.startsWith("Server Discovery")) {
117  QString msg("Server Discovery");
118  s.remove(0, msg.length() + 1);
119  int index = s.indexOf(" ");
120  QString idString = s.left(index + 1); //index is 0based, left is not
121  taskId = idString.toInt();
122  s.remove(0, index + 1); //index is 0based, remove is not
123  index = s.indexOf(" ");
124  portString = s.left(index + 1); //index is 0based, left is not
125  port = portString.toInt();
126  s.remove(0, index + 1); //index is 0based, remove is not
127  taskName = s.toStdString();
128  messageType = FIND_SERVER_WHILE_WORKING_MESSAGE;
129  }
130  }
131  catch (std::exception &) {
132  messageType = BAD_MESSAGE;
133  }
134  if (!port) {
135  messageType = BAD_MESSAGE;
136  }
137 
138  switch (messageType) {
139  case FIND_SERVER_MESSAGE:
140  if (senderAddr && port) {
141  QString id = senderAddr->toString() + ":" + s;
142  if (!contains(m_connected_list, id)) {
143  m_connected_list.push_back(id);
144 
145  WorkerRepresentative *rep = new WorkerRepresentative(*senderAddr, port);
146 
147  connect(rep, SIGNAL(ReadyToWork()), m_mapper, SLOT(map()));
148  m_mapper->setMapping(rep, rep);
149 
150  connect(rep, SIGNAL(Disconnected()), m_release_mapper, SLOT(map()));
151  m_release_mapper->setMapping(rep, rep);
152 
153  rep->Setup();
154  }
155  }
156  break;
157  case FIND_SERVER_WHILE_WORKING_MESSAGE:
158  if (senderAddr && port) {
159  QString id = senderAddr->toString() + ":" + portString;
160  if (!contains(m_connected_list, id)) {
161  m_connected_list.push_back(id);
162 
163  WorkerRepresentative *rep = new WorkerRepresentative(*senderAddr, port, taskId,
164  taskName);
165 
166  connect(rep, SIGNAL(ReadyToWork()), m_mapper, SLOT(map()));
167  m_mapper->setMapping(rep, rep);
168 
169  connect(rep, SIGNAL(Disconnected()), m_release_mapper, SLOT(map()));
170  m_release_mapper->setMapping(rep, rep);
171 
172  rep->Setup();
173 
174  emit WorkerReconnected(rep);
175  }
176  }
177  break;
178  default:
179  break;
180  }
181 
182  delete senderAddr;
183  }
184 }
185 
187 {
188  static WorkerPool *pool = new WorkerPool();
189  if (!pool) {
190  //if the workerpool was destructed, make sure we create a new one
191  pool = new WorkerPool();
192  }
193  return pool;
194 }
195 
197 {
198  if (m_available_list.empty()) {
199  return 0;
200  }
201  WorkerRepresentative* rep = m_available_list.front();
202  m_available_list.pop_front();
203  m_busy_list.push_back(rep);
204  return rep;
205 }
206 
208 {
209  MakeProcessAvailable(dynamic_cast<WorkerRepresentative*>(rep));
210 }
211 
213 {
214  if (!rep) {
215  return;
216  }
217  m_busy_list.remove(rep);
218  m_available_list.push_back(rep);
219 
220  emit NewWorkerAvailable();
221 }
222 
224 {
225  ReleaseWorker(dynamic_cast<WorkerRepresentative*>(rep));
226 }
227 
229 {
230  if (!rep)
231  return;
232  m_busy_list.remove(rep);
233  m_available_list.remove(rep);
234  QString id = rep->GetSenderAddress()->toString() + ":" + QString::number(rep->GetSenderPort());
235  m_connected_list.remove(id);
236  rep->deleteLater();
237 }
238 
240 {
241  return !m_available_list.empty();
242 }
243 
244 void WorkerPool::StartWorker()
245 {
246  QString nodePath = QCoreApplication::applicationFilePath();
247  nodePath = nodePath.left(nodePath.lastIndexOf('/') + 1);
248  nodePath += "parex_node";
249  QProcess node;
250  QStringList arguments;
251  arguments << "-q";
252  node.startDetached(nodePath, arguments);
253 }
254 
255 void WorkerPool::StartWorkers()
256 {
257  int neededWorkers = m_min_nodes - m_connected_list.size();
258  for (int i = 0; i < neededWorkers; i++) {
259  StartWorker();
260  }
261 }
262 
263 void WorkerPool::PeriodicCheck()
264 {
265  StartWorkers();
266  if (WorkerAvailable()) {
267  emit NewWorkerAvailable();
268  }
269 }
270 
271 void WorkerPool::SetMinNumWorkers(int min_nodes)
272 {
273  m_min_nodes = min_nodes;
274 }
275 
276 void WorkerPool::Delete(std::string name)
277 {
278  for (auto worker : m_available_list) {
279  if (worker) {
280  worker->Delete(name);
281  } else {
282  std::cout << "PROBLEM" << std::endl;
283  }
284  }
285  for (auto worker : m_busy_list) {
286  if (worker) {
287  worker->Delete(name);
288  } else {
289  std::cout << "PROBLEM" << std::endl;
290  }
291  }
292 }
293 
294 } // namespace
bool WorkerAvailable()
Checks to see if a worker is ready to do some work.
Definition: WorkerPool.cpp:239
void Delete(std::string name)
Delete an exploration on every node.
Definition: WorkerPool.cpp:276
void ReleaseWorker(QObject *)
When a worker disconnects, this slot should be fired.
Definition: WorkerPool.cpp:223
void MakeProcessAvailable(QObject *)
When a worker has finished its work, this slot should be fired.
Definition: WorkerPool.cpp:207
A pool handling the workers that will do the actual work.
Definition: WorkerPool.h:37
static WorkerPool * globalInstance()
A static method to get the instance of the workerPool.
Definition: WorkerPool.cpp:186
void NewWorkerAvailable()
Emitted when a worker is ready for work.
int GetSenderPort() const
Gets the port of the worker.
void SetMinNumWorkers(int)
Set the minimum number of workers.
Definition: WorkerPool.cpp:271
void WorkerReconnected(WorkerRepresentative *)
Emitted when a worker has reconnected after being disconnected.
Interface for WorkerPool.
const QHostAddress * GetSenderAddress() const
Gets the address of the worker.
WorkerRepresentative * getProcess()
Gets a process doing nothing if one is available, otherwise returns 0.
Definition: WorkerPool.cpp:196
Namespace for SimPT parameter explorer package.
Definition: Client.cpp:52
see the online Qt documentation
A worker taken as representative for multiple workers (handles the communication with the node)...
Interface for WorkerRepresentative.
Hack for QT 4 -> Qt 5 transition.