VPTissue Reference Manual
WorkerNode.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2011-2016 Universiteit Antwerpen
3  *
4  * Licensed under the EUPL, Version 1.1 or as soon they will be approved by
5  * the European Commission - subsequent versions of the EUPL (the "Licence");
6  * You may not use this work except in compliance with the Licence.
7  * You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl5
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the Licence is distributed on an "AS IS" basis,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the Licence for the specific language governing
13  * permissions and limitations under the Licence.
14  */
20 #include "WorkerNode.h"
21 
22 #include "Simulator.h"
25 #include "parex_protocol/SimTask.h"
27 
28 #include <QtGlobal>
29 #include <QPushButton>
30 #include <QString>
31 #include <QTcpSocket>
32 #include <QThread>
33 #include <cassert>
34 #include <iostream>
35 
36 using namespace std;
37 
38 namespace SimPT_Parex {
39 
40 const int WorkerNode::g_broadcast_port = 45678;
41 
42 WorkerNode::WorkerNode(Simulator* simulator, bool verbose, QObject* parent)
43  : QTcpServer(parent), m_simulator(simulator), m_simulator_thread(new QThread(this)),
44  m_advertiser(g_broadcast_port, verbose), m_verbose(verbose), m_protocol(nullptr),
45  m_current_task(nullptr), m_last_address(QHostAddress::Null), m_queue(), m_results()
46 {
47  assert(simulator && "simulator cannot be a nullptr");
48 
49  qRegisterMetaType<SimTask>("SimTask");
50  qRegisterMetaType<SimResult>("SimResult");
51  qRegisterMetaType<std::string>("std::string");
52 
53  //listen(QHostAddress::AnyIPv4);
54  listen(QHostAnyAddress());
55 
56  connect(this, SIGNAL(newConnection()), this, SLOT(ConnectionReceived()));
57 
58  StartAdvertiser();
59 
60  m_simulator->moveToThread(m_simulator_thread);
61  connect(m_simulator, SIGNAL(TaskSolved(const SimResult&)),
62  this, SLOT(FinishedWork(const SimResult&)));
63  connect(this, SIGNAL(NewTask(const SimTask&)),
64  m_simulator, SLOT(SolveTask(const SimTask&)));
65  m_simulator_thread->start();
66 }
67 
69 {
70  close();
71  m_simulator->deleteLater();
72  m_simulator_thread->quit();
73  m_simulator_thread->wait();
74 }
75 
76 void WorkerNode::ConnectionReceived()
77 {
78  if (m_verbose) {
79  cout << "Connection received" << endl;
80  }
81 
82  QTcpSocket* socket = nextPendingConnection();
83  if (!socket) {
84  return;
85  }
86 
87  if (!m_protocol) {
88  m_protocol = new NodeProtocol(socket, this);
89 
90  connect(m_protocol, SIGNAL(Ended()), m_protocol, SLOT(deleteLater()));
91  connect(m_protocol, SIGNAL(Error(const std::string&)),
92  this, SLOT(DisplayError(const std::string&)));
93 
94  // Connects for solving a SimTask and returning the result
95  connect(m_protocol, SIGNAL(TaskReceived(const SimTask*)),
96  this, SLOT(StartedWork(const SimTask*)));
97  connect(m_protocol, SIGNAL(StopTask()), this, SLOT(StopTask()));
98  connect(m_protocol, SIGNAL(Delete(const std::string&)),
99  this, SLOT(Delete(const std::string&)));
100 
101  // this slot is to another thread and will be executed after
102  // the current simulation finishes, if one is running
103  connect(m_protocol, SIGNAL(Delete(const std::string&)),
104  m_simulator, SLOT(Delete(const std::string&)));
105 
106  connect(m_protocol, SIGNAL(SuccessfullySent()), this, SLOT(ResultSent()));
107 
108  connect(socket, SIGNAL(error(QAbstractSocket::SocketError)),
109  this, SLOT(HandleError(QAbstractSocket::SocketError)));
110 
111  if (socket->peerAddress() == m_last_address) {
112  // Check for not-confirmed simulations
113  if (!m_results.empty())
114  m_protocol->SendSimResult(m_results.front());
115  } else {
116  while (!m_queue.empty()) {
117  delete m_queue.front();
118  m_queue.pop();
119  }
120 
121  while (!m_results.empty()) {
122  m_results.pop();
123  }
124  }
125  m_last_address = socket->peerAddress();
126  m_advertiser.Stop();
127  } else {
128  cerr << "Already connected, ignoring new connection." << endl;
129  socket->close();
130  delete socket;
131  }
132 }
133 
134 void WorkerNode::Delete(const std::string& name)
135 {
136  //a delete on the current exploration
137  if (m_current_task && name == m_current_task->GetExploration()) {
138  m_simulator->StopTask();
139  }
140 }
141 
142 void WorkerNode::DisplayError(const std::string& error) const
143 {
144  if (m_verbose) {
145  cerr << "WorkerNode Error: " << error << endl;
146  }
147 }
148 
149 void WorkerNode::FinishedWork(const SimResult& result)
150 {
151  delete m_current_task;
152  m_current_task = nullptr;
153 
154  m_results.push(result);
155  if (m_protocol && m_protocol->IsConnected()) {
156  m_protocol->SendSimResult(result);
157  } else {
158  if (m_verbose)
159  cout << "Disconnected when finished task" << endl;
160  }
161 
162  if (!m_queue.empty()) {
163  if (m_verbose)
164  cout << "More work in the queue, starting that now" << endl;
165  m_current_task = m_queue.front();
166  m_queue.pop();
167  } else {
168  if (m_verbose)
169  cout << "Finished job, waiting for more work" << endl;
170  }
171 }
172 
173 void WorkerNode::HandleError(QAbstractSocket::SocketError )
174 {
175  delete m_protocol;
176  m_protocol = nullptr;
177  StartAdvertiser();
178 }
179 
180 void WorkerNode::ResultSent()
181 {
182  m_results.pop();
183  if (!m_results.empty() && m_protocol && m_protocol->IsConnected()) {
184  m_protocol->SendSimResult(m_results.front());
185  }
186 }
187 
188 void WorkerNode::StartAdvertiser()
189 {
190  if (!m_results.empty()) {
191  m_advertiser.Start(serverPort(),
192  m_results.front().GetExplorationName(), m_results.front().GetTaskId());
193  } else if (m_current_task) {
194  m_advertiser.Start(serverPort(),
195  m_current_task->GetExploration(), m_current_task->GetId());
196  } else {
197  m_advertiser.Start(serverPort());
198  }
199 }
200 
201 void WorkerNode::StartedWork(const SimTask* task)
202 {
203  if (!m_current_task){
204  m_current_task = task;
205  emit NewTask(*task);
206  } else {
207  if (m_verbose) {
208  cerr << "WARNING: Received work while still working, queuing the work" << endl;
209  }
210  m_queue.push(task);
211  }
212 }
213 
214 void WorkerNode::StopTask()
215 {
216  m_simulator->StopTask();
217 }
218 
219 } // namespace
STL namespace.
see the online Qt documentation
A container class for the final result of a simulation.
Definition: SimResult.h:29
void StopTask()
Stop the current task.
Definition: Simulator.cpp:189
void NewTask(const SimTask &)
Emitted whenever a new task is ready to be simulated.
Interface for the Simulator.
void SendSimResult(const SimResult &result)
Sends the result of a simulation back to the server.
virtual ~WorkerNode()
Destroy the node, closing all open connections.
Definition: WorkerNode.cpp:68
void Start(int serverPort)
Start finding a server to do work for.
Interface for WorkerNode.
void Stop()
Stop finding a server.
Interface for SimResult.
Simulator handling requested simulation tasks.
Definition: Simulator.h:37
Interface for NodeProtocol.
Interface for SimTask.
Namespace for SimPT parameter explorer package.
Definition: Client.cpp:52
std::string GetExploration() const
Get the name of the exploration.
Definition: SimTask.cpp:71
Contains all information needed for a transmitable simulation task.
Definition: SimTask.h:31
bool IsConnected()
Check if still connected.
Definition: Protocol.cpp:73
int GetId() const
Get the id of the task in the exploration.
Definition: SimTask.cpp:76
see the online Qt documentation
Hack for QT 4 -> Qt 5 transition.