VPTissue Reference Manual
ExplorationManager.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2011-2016 Universiteit Antwerpen
3  *
4  * Licensed under the EUPL, Version 1.1 or as soon they will be approved by
5  * the European Commission - subsequent versions of the EUPL (the "Licence");
6  * You may not use this work except in compliance with the Licence.
7  * You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl5
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the Licence is distributed on an "AS IS" basis,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the Licence for the specific language governing
13  * permissions and limitations under the Licence.
14  */
21 #include "ExplorationManager.h"
22 
23 #include "WorkerPool.h"
24 #include "WorkerRepresentative.h"
28 #include "parex_protocol/SimTask.h"
29 #include "util/misc/StringUtils.h"
30 
31 #include <boost/property_tree/ptree.hpp>
32 #include <boost/property_tree/xml_parser.hpp>
33 #include <algorithm>
34 #include <cassert>
35 #include <iostream>
36 #include <sstream>
37 
38 using namespace std;
39 using boost::property_tree::ptree;
40 using namespace boost::property_tree::xml_parser;
41 
42 namespace SimPT_Parex {
43 
44 ExplorationManager::ExplorationManager()
45 {
46  Read_Backup();
47 
48  //Make sure the WorkerPool is listening to connecting nodes by calling globalInstance()
49  connect(WorkerPool::globalInstance(), SIGNAL(NewWorkerAvailable()),
50  this, SLOT(NewWorkerAvailable()));
51  connect(WorkerPool::globalInstance(), SIGNAL(WorkerReconnected(WorkerRepresentative*)),
52  this, SLOT(WorkerReconnected(WorkerRepresentative*)));
53 }
54 
55 ExplorationManager::~ExplorationManager()
56 {
57 }
58 
59 ExplorationProgress* ExplorationManager::GetExploration(const std::string& name,
60  const std::deque<ExplorationProgress*>& explorations)
61 {
62  auto matching_iter = std::find_if(explorations.begin(), explorations.end(),
63  [&name] (ExplorationProgress* e) { return name == e->GetExploration().GetName(); });
64 
65  if (matching_iter != explorations.end()) {
66  return (*matching_iter);
67  } else {
68  return nullptr;
69  }
70 }
71 
72 void ExplorationManager::RemoveExploration(const std::string& name,
73  std::deque<ExplorationProgress*>& explorations)
74 {
75  auto matching_iter = std::find_if(explorations.begin(), explorations.end(),
76  [&name] (ExplorationProgress* e) { return name == e->GetExploration().GetName(); });
77 
78  if (matching_iter != explorations.end()) {
79  explorations.erase(matching_iter);
80  }
81 }
82 
83 std::vector<std::string> ExplorationManager::GetExplorationNames()
84 {
85  std::vector<std::string> names;
86 
87  for (auto progress : m_running_explorations) {
88  names.push_back(progress->GetExploration().GetName());
89  }
90  for (auto progress : m_done_explorations) {
91  names.push_back(progress->GetExploration().GetName());
92  }
93 
94  return names;
95 
96 }
97 
98 bool ExplorationManager::WorkAvailable()
99 {
100  if (m_running_explorations.empty()){
101  return false;
102  }
103 
104  for (auto progress : m_running_explorations) {
105  if (progress->GetTaskCount(TaskState::Waiting) != 0)
106  return true;
107  }
108 
109  return false;
110 }
111 
112 void ExplorationManager::RegisterExploration(const Exploration* exploration)
113 {
114  if (exploration->GetNumberOfTasks() == 0) {
115  delete exploration;
116  return;
117  }
118 
119  ExplorationProgress* progress = new ExplorationProgress(exploration);
120  m_running_explorations.push_back(progress);
121  connect(progress, SIGNAL(Updated()), this, SIGNAL(Updated()));
122  connect(progress, SIGNAL(Updated()), this, SLOT(BackUp()));
123  while (WorkerPool::globalInstance()->WorkerAvailable() && WorkAvailable()) {
124  NewWorkerAvailable();
125  }
126 
127  BackUp();
128  }
129 
130 void ExplorationManager::DeleteExploration(const std::string& name)
131 {
132  auto exploration = GetExploration(name, m_running_explorations);
133 
134  if (exploration) {
135  RemoveExploration(name, m_running_explorations);
136  delete exploration;
137  } else {
138  exploration = GetExploration(name, m_done_explorations);
139  if (exploration) {
140  RemoveExploration(name, m_done_explorations);
141  delete exploration;
142  }
143  }
144  WorkerPool::globalInstance()->Delete(name);
145  BackUp();
146  emit Updated();
147  }
148 
149 const ExplorationProgress* ExplorationManager::GetExplorationProgress(const std::string& explorationName)
150 {
151  auto progress = GetExploration(explorationName, m_running_explorations);
152  if (progress == nullptr)
153  progress = GetExploration(explorationName, m_done_explorations);
154  if (progress == nullptr)
155  return nullptr;
156 
157  return progress;
158 }
159 
160 void ExplorationManager::NewWorkerAvailable()
161 {
162  if (m_running_explorations.empty()) {
163  return;
164  }
165 
166  bool done = false;
167  ExplorationProgress* progress = nullptr;
168 
169  unsigned int i = 0;
170  while (!done && i < m_running_explorations.size()) {
171  //scheduling is FIFO
172  progress = m_running_explorations.at(i);
173 
174  if (progress->GetTaskCount(TaskState::Waiting) > 0) {
175  done = true;
176  } else {
177  progress = nullptr;
178  }
179  ++i;
180  }
181 
182  if (!progress) {
183  return;
184  }
185 
186  auto task = progress->NextTask();
187  assert(task && "ExplorationProgress has waiting tasks, but NextTask returns a nullptr");
188 
189  auto worker = WorkerPool::globalInstance()->getProcess();
190  if (!worker) {
191  //Maybe someone else was faster in retrieving a worker.
192  progress->GiveBack(task);
193  return;
194  }
195 
196  connect(worker, SIGNAL(FinishedWork(const SimResult&)),
197  progress, SLOT(HandleResult(const SimResult&)), Qt::UniqueConnection);
198  connect(worker, SIGNAL(FinishedWork(const SimResult&)),
199  this, SLOT(WorkerFinished()), Qt::UniqueConnection);
200  connect(worker, SIGNAL(FinishedWork(const SimResult&)),
201  this, SLOT(BackUp()), Qt::UniqueConnection);
202 
203  connect(worker, SIGNAL(Disconnected()),
204  this, SLOT(WorkerDisconnected()), Qt::UniqueConnection);
205 
206  m_running_tasks[worker] = task;
207  worker->DoWork(*task);
208 
209  BackUp();
210 }
211 
212 void ExplorationManager::WorkerFinished()
213 {
214  WorkerRepresentative* worker = dynamic_cast<WorkerRepresentative*>(QObject::sender());
215  assert(worker && "QObject::sender() is not a WorkerRepresentative*");
216 
217  auto progress = GetExploration(worker->GetExplName(), m_running_explorations);
218  if (!progress)
219  return;
220 
221  if (progress->IsFinished()) {
222  m_done_explorations.push_back(progress);
223  RemoveExploration(worker->GetExplName(), m_running_explorations);
224  }
225 
226  delete m_running_tasks[worker];
227  m_running_tasks.erase(worker);
228 }
229 
230 void ExplorationManager::WorkerDisconnected()
231 {
232  WorkerRepresentative* worker = dynamic_cast<WorkerRepresentative*>(QObject::sender());
233  assert(worker && "QObject::sender() is not a WorkerRepresentative*");
234 
235  SimTask* task = m_running_tasks[worker];
236 
237  if (task == nullptr) {
238  m_running_tasks.erase(worker);
239  return;
240  }
241 
242  auto progress = GetExploration(task->GetExploration(), m_running_explorations);
243 
244  if (!progress)
245  return;
246 
247  progress->GiveBack(task);
248  m_running_tasks.erase(worker);
249 }
250 
251 void ExplorationManager::WorkerReconnected(WorkerRepresentative* )
252 {
253  // TODO Choose what to do when a worker reconnects.
254  // What is the best choice: stop all other tasks and reassign and let the same node continue?
255  // Or just give the reconnected node a new task and let the other one finish the task the disconnected node was doing?
256 
257 // SimTask *task = new SimTask(rep->GetTaskId(), "", rep->GetExplName());
258 //
259 // /* Check if the exploration is sent by this Manager */
260 // auto matching_iter = std::find_if(m_running_explorations.begin(), m_running_explorations.end(),
261 // [task] (ExplorationProgress* e) {
262 // return task->GetExploration() == e->GetExploration().GetName();
263 // });
264 //
265 // if (matching_iter == m_running_explorations.end())
266 // return;
267 //
268 // for (auto tasks : m_running_tasks) {
269 // if (tasks.second->GetExploration() == task->GetExploration() &&
270 // tasks.second->GetId() == task->GetId()) {
271 // tasks.first->StopTask();
272 // return;
273 // }
274 // }
275 //
276 // m_running_tasks[rep] = task;
277 //
278 // //make sure we don't have a double connection
279 // for (auto progress : m_running_explorations) {
280 // disconnect(rep, SIGNAL(FinishedWork(const SimResult&)), progress, SLOT(Finished(const SimResult&)));
281 // }
282 // for (auto progress : m_done_explorations) {
283 // disconnect(rep, SIGNAL(FinishedWork(const SimResult&)), progress, SLOT(Finished(const SimResult&)));
284 // }
285 //
286 // auto progress = *matching_iter;
287 //
288 // // TODO Hmmm, so the task would've been set to Waiting-state...
289 // // In the meanwhile, another worker might have gotten this task
290 // // What was the meaning of this?
291 // //progress->ResendCancel(task->GetId());
292 //
293 // connect(rep, SIGNAL(FinishedWork(const SimResult&)), progress, SLOT(Finished(const SimResult&)));
294 // connect(rep, SIGNAL(FinishedWork(const SimResult&)), this, SLOT(WorkerFinished()));
295 // connect(rep, SIGNAL(FinishedWork(const SimResult&)), this, SLOT(BackUp()));
296 //
297 // disconnect(rep, SIGNAL(Disconnected()), this, SLOT(WorkerDisconnected()));
298 // connect(rep, SIGNAL(Disconnected()), this, SLOT(WorkerDisconnected()));
299 
300  NewWorkerAvailable();
301 }
302 
303 void ExplorationManager::BackUp()
304 {
305  ptree writer;
306 
307  writer.put("RunningExplorationsNr", m_running_explorations.size());
308  int i = 0;
309  for (auto progress : m_running_explorations) {
310  writer.put_child("RunningExplorations" + to_string(i), progress->ToPtree());
311  ++i;
312  }
313 
314  i = 0;
315  writer.put("DoneExplorationsNr", m_done_explorations.size());
316  for (auto progress : m_done_explorations) {
317  writer.put_child("DoneExplorations" + to_string(i), progress->ToPtree());
318  ++i;
319  }
320 
321  write_xml("exploration_backup.xml", writer);
322 }
323 
324 void ExplorationManager::Read_Backup()
325 {
326  try {
327  ptree reader;
328  read_xml("exploration_backup.xml", reader);
329 
330  for (int i = 0; i < reader.get<int>("RunningExplorationsNr"); i++) {
331  ExplorationProgress* progress
332  = new ExplorationProgress(
333  reader.get_child("RunningExplorations" + to_string(i)));
334  connect(progress, SIGNAL(Updated()), this, SIGNAL(Updated()));
335  m_running_explorations.push_back(progress);
336  }
337 
338  for (int i = 0; i < reader.get<int>("DoneExplorationsNr"); i++) {
339  ExplorationProgress* progress
340  = new ExplorationProgress(
341  reader.get_child("DoneExplorations" + to_string(i)));
342  connect(progress, SIGNAL(Updated()), this, SIGNAL(Updated()));
343  m_done_explorations.push_back(progress);
344  }
345  }
346  catch (std::exception& e) {
347 
348  }
349  catch (...) {
350  return;
351  }
352 }
353 
354 void ExplorationManager::StopTask(const std::string& name, int id)
355 {
356  auto progress = GetExploration(name, m_running_explorations);
357  if (!progress)
358  return;
359 
360  auto state = progress->GetTask(id).GetState();
361  if (state == TaskState::Running) {
362  for (auto task : m_running_tasks) {
363  if (task.second->GetExploration() == name && task.second->GetId() == id) {
364  task.first->StopTask();
365  return;
366  }
367  }
368  } else if (state == TaskState::Waiting) {
369  progress->CancelWaitingTask(id);
370  }
371 }
372 
373 void ExplorationManager::RestartTask(const std::string& name, int id)
374 {
375  auto progress = GetExploration(name, m_running_explorations);
376  if (!progress) {
377  progress = GetExploration(name, m_done_explorations);
378  if (!progress)
379  return;
380 
381  m_running_explorations.push_back(progress);
382  RemoveExploration(name, m_done_explorations);
383  }
384 
385  if (progress->GetTask(id).GetState() == TaskState::Cancelled) {
386  progress->ResendCancelledTask(id);
387  }
388 
389  while (WorkerPool::globalInstance()->WorkerAvailable() && WorkAvailable()) {
390  NewWorkerAvailable();
391  }
392 }
393 
394 } // namespace
Interface for Exploration.
STL namespace.
Interface for ExplorationTask.
Class describing the progress state of an exploration.
virtual unsigned int GetNumberOfTasks() const =0
Returns the number of tasks the exploration currently contains.
Interface for ExplorationManager.
Interface for WorkerPool.
Interface for SimTask.
String manipulation utilities.
Namespace for SimPT parameter explorer package.
Definition: Client.cpp:52
Class describing a generic exploration.
Definition: Exploration.h:33
Interface for ExplorationProgress.
A worker taken as representative for multiple workers (handles the communication with the node)...
Interface for WorkerRepresentative.