Line data Source code
1 :
2 : /* Copyright (c) 2005-2017, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "node.h"
22 :
23 : #include "connectionDescription.h"
24 : #include "customOCommand.h"
25 : #include "nodeCommand.h"
26 : #include "oCommand.h"
27 :
28 : #include <lunchbox/file.h>
29 : #include <lunchbox/scopedMutex.h>
30 : #include <lunchbox/spinLock.h>
31 :
32 : #ifndef MAXPATHLEN
33 : #define MAXPATHLEN 1024
34 : #endif
35 :
36 : namespace co
37 : {
38 : namespace
39 : {
40 : /** The state of the node. */
41 : enum State
42 : {
43 : STATE_CLOSED, //!< initial state
44 : STATE_CONNECTED, //!< proxy for a remote node, connected
45 : STATE_LISTENING, //!< local node, listening
46 : STATE_CLOSING //!< listening, about to close
47 : };
48 :
49 0 : struct MCData
50 : {
51 : ConnectionPtr connection;
52 : NodePtr node;
53 : };
54 : typedef std::vector<MCData> MCDatas;
55 : }
56 :
57 : namespace detail
58 : {
59 : class Node
60 : {
61 : public:
62 : /** Globally unique node identifier. */
63 : NodeID id;
64 :
65 : const uint32_t type;
66 :
67 : /** The current state of this node. */
68 : State state;
69 :
70 : /** The connection to this node. */
71 : ConnectionPtr outgoing;
72 :
73 : /** The multicast connection to this node, can be 0. */
74 : lunchbox::Lockable<ConnectionPtr> outMulticast;
75 :
76 : /**
77 : * Yet unused multicast connections for this node.
78 : *
79 : * On the first multicast send usage, the connection is 'primed' by sending
80 : * our node identifier to the MC group, removed from this vector and set as
81 : * outMulticast.
82 : */
83 : MCDatas multicasts;
84 :
85 : /** The host name used to launch the remote node process */
86 : std::string hostname;
87 :
88 : /** The list of descriptions on how this node is reachable. */
89 : lunchbox::Lockable<ConnectionDescriptions, lunchbox::SpinLock>
90 : connectionDescriptions;
91 :
92 : /** Last time commands were received */
93 : int64_t lastReceive;
94 :
95 119 : explicit Node(const uint32_t type_)
96 119 : : id(servus::make_UUID())
97 : , type(type_)
98 : , state(STATE_CLOSED)
99 119 : , lastReceive(0)
100 : {
101 119 : }
102 :
103 117 : ~Node()
104 117 : {
105 117 : LBASSERT(!outgoing);
106 117 : connectionDescriptions->clear();
107 117 : }
108 : };
109 : }
110 :
111 119 : Node::Node(const uint32_t type)
112 119 : : _impl(new detail::Node(type))
113 : {
114 119 : LBVERB << "New Node @" << (void*)this << " " << _impl->id << std::endl;
115 119 : }
116 :
117 300 : Node::~Node()
118 : {
119 117 : LBVERB << "Delete Node @" << (void*)this << " " << _impl->id << std::endl;
120 117 : delete _impl;
121 183 : }
122 :
123 0 : bool Node::operator==(const Node* node) const
124 : {
125 0 : LBASSERTINFO(_impl->id != node->_impl->id || this == node,
126 : "Two node instances with the same ID found "
127 : << (void*)this << " and " << (void*)node);
128 :
129 0 : return (_impl == node->_impl);
130 : }
131 :
132 616 : ConnectionDescriptions Node::getConnectionDescriptions() const
133 : {
134 1232 : lunchbox::ScopedFastRead mutex(_impl->connectionDescriptions);
135 1232 : return _impl->connectionDescriptions.data;
136 : }
137 :
138 215 : ConnectionPtr Node::getMulticast()
139 : {
140 215 : if (!isReachable())
141 0 : return 0;
142 :
143 430 : ConnectionPtr connection = _impl->outMulticast.data;
144 215 : if (connection && !connection->isClosed())
145 0 : return connection;
146 :
147 430 : lunchbox::ScopedWrite mutex(_impl->outMulticast);
148 215 : if (_impl->multicasts.empty())
149 215 : return 0;
150 :
151 0 : MCData data = _impl->multicasts.back();
152 0 : _impl->multicasts.pop_back();
153 0 : NodePtr node = data.node;
154 :
155 : // prime multicast connections on peers
156 0 : LBDEBUG << "Announcing id " << node->getNodeID() << " to multicast group "
157 0 : << data.connection->getDescription() << std::endl;
158 :
159 0 : const uint32_t cmd = CMD_NODE_ID;
160 0 : OCommand(Connections(1, data.connection), cmd)
161 0 : << node->getNodeID() << getType() << node->serialize();
162 :
163 0 : _impl->outMulticast.data = data.connection;
164 0 : return data.connection;
165 : }
166 :
167 78 : void Node::addConnectionDescription(ConnectionDescriptionPtr cd)
168 : {
169 78 : LBASSERTINFO(isClosed(), *this);
170 78 : if (!isClosed())
171 0 : return;
172 78 : _addConnectionDescription(cd);
173 : }
174 :
175 78 : void Node::_addConnectionDescription(ConnectionDescriptionPtr cd)
176 : {
177 156 : lunchbox::ScopedFastWrite mutex(_impl->connectionDescriptions);
178 78 : _impl->connectionDescriptions->push_back(cd);
179 78 : }
180 :
181 0 : bool Node::removeConnectionDescription(ConnectionDescriptionPtr cd)
182 : {
183 0 : LBASSERTINFO(isClosed(), *this);
184 0 : if (!isClosed())
185 0 : return false;
186 0 : return _removeConnectionDescription(cd);
187 : }
188 :
189 0 : bool Node::_removeConnectionDescription(ConnectionDescriptionPtr cd)
190 : {
191 0 : lunchbox::ScopedFastWrite mutex(_impl->connectionDescriptions);
192 :
193 : // Don't use std::find, RefPtr::operator== compares pointers, not values.
194 0 : for (ConnectionDescriptionsIter i = _impl->connectionDescriptions->begin();
195 0 : i != _impl->connectionDescriptions->end(); ++i)
196 : {
197 0 : if (*cd != **i)
198 0 : continue;
199 :
200 0 : _impl->connectionDescriptions->erase(i);
201 0 : return true;
202 : }
203 0 : return false;
204 : }
205 :
206 66 : std::string Node::serialize() const
207 : {
208 132 : std::ostringstream data;
209 66 : data << Version::getMajor() << CO_SEPARATOR << Version::getMinor()
210 132 : << CO_SEPARATOR << _impl->id << CO_SEPARATOR;
211 : {
212 132 : lunchbox::ScopedFastRead mutex(_impl->connectionDescriptions);
213 66 : data << co::serialize(_impl->connectionDescriptions.data);
214 : }
215 132 : return data.str();
216 : }
217 :
218 66 : bool Node::deserialize(std::string& data)
219 : {
220 66 : LBASSERT(_impl->state == STATE_CLOSED);
221 :
222 : // version check
223 66 : int32_t major = 0;
224 66 : size_t nextPos = data.find(CO_SEPARATOR);
225 66 : if (nextPos == std::string::npos || nextPos == 0)
226 : {
227 0 : LBERROR << "Could not parse node major version data" << std::endl;
228 0 : return false;
229 : }
230 :
231 132 : std::istringstream is(data.substr(0, nextPos));
232 66 : data = data.substr(nextPos + 1);
233 66 : is >> major;
234 :
235 66 : int32_t minor = 0;
236 66 : nextPos = data.find(CO_SEPARATOR);
237 66 : if (nextPos == std::string::npos || nextPos == 0)
238 : {
239 0 : LBERROR << "Could not parse node minor version data" << std::endl;
240 0 : return false;
241 : }
242 :
243 66 : is.clear();
244 66 : is.str(data.substr(0, nextPos));
245 66 : data = data.substr(nextPos + 1);
246 66 : is >> minor;
247 :
248 66 : if (major != Version::getMajor() || minor != Version::getMinor())
249 : {
250 0 : LBWARN << "Protocol mismatch: remote node uses version " << major << '.'
251 0 : << minor << ", local node uses " << Version::getMajor() << '.'
252 0 : << Version::getMinor() << std::endl;
253 : }
254 :
255 : // node id
256 66 : nextPos = data.find(CO_SEPARATOR);
257 66 : if (nextPos == std::string::npos || nextPos == 0)
258 : {
259 0 : LBERROR << "Could not parse node id data" << std::endl;
260 0 : return false;
261 : }
262 :
263 66 : _impl->id = data.substr(0, nextPos);
264 66 : data = data.substr(nextPos + 1);
265 :
266 : // Connections data
267 132 : lunchbox::ScopedFastWrite mutex(_impl->connectionDescriptions);
268 66 : _impl->connectionDescriptions->clear();
269 66 : return co::deserialize(data, _impl->connectionDescriptions.data);
270 : }
271 :
272 1288 : bool Node::isReachable() const
273 : {
274 1288 : return isListening() || isConnected();
275 : }
276 :
277 601 : bool Node::isConnected() const
278 : {
279 601 : return _impl->state == STATE_CONNECTED;
280 : }
281 :
282 175018 : bool Node::isClosed() const
283 : {
284 175018 : return _impl->state == STATE_CLOSED;
285 : }
286 :
287 47 : bool Node::isClosing() const
288 : {
289 47 : return _impl->state == STATE_CLOSING;
290 : }
291 :
292 74165 : bool Node::isListening() const
293 : {
294 74165 : return _impl->state == STATE_LISTENING;
295 : }
296 :
297 741 : ConnectionPtr Node::getConnection(const bool preferMulticast)
298 : {
299 1482 : ConnectionPtr multicast = preferMulticast ? getMulticast() : 0;
300 1482 : return multicast ? multicast : _impl->outgoing;
301 : }
302 :
303 71411 : ConnectionPtr Node::_getConnection(const bool preferMulticast)
304 : {
305 142822 : ConnectionPtr multicast = preferMulticast ? getMulticast() : 0;
306 71411 : if (!isClosed())
307 71411 : return multicast ? multicast : _impl->outgoing;
308 0 : LBUNREACHABLE;
309 0 : return 0;
310 : }
311 :
312 111 : ConnectionPtr Node::_getMulticast() const
313 : {
314 111 : return _impl->outMulticast.data;
315 : }
316 :
317 0 : void Node::setHostname(const std::string& host)
318 : {
319 0 : _impl->hostname = host;
320 0 : }
321 :
322 0 : const std::string& Node::getHostname() const
323 : {
324 0 : return _impl->hostname;
325 : }
326 :
327 0 : std::string Node::getWorkDir() const
328 : {
329 0 : return lunchbox::getWorkDir();
330 : }
331 :
332 0 : std::string Node::getLaunchQuote() const
333 : {
334 : #ifdef WIN32
335 : return "\"";
336 : #else
337 0 : return "\'";
338 : #endif
339 : }
340 :
341 71409 : OCommand Node::send(const uint32_t cmd, const bool multicast)
342 : {
343 142818 : ConnectionPtr connection = _getConnection(multicast);
344 71409 : LBASSERT(connection);
345 142818 : return OCommand(Connections(1, connection), cmd, COMMANDTYPE_NODE);
346 : }
347 :
348 2 : CustomOCommand Node::send(const uint128_t& commandID, const bool multicast)
349 : {
350 4 : ConnectionPtr connection = _getConnection(multicast);
351 2 : LBASSERT(connection);
352 4 : return CustomOCommand(Connections(1, connection), commandID);
353 : }
354 :
355 187124 : const NodeID& Node::getNodeID() const
356 : {
357 187124 : return _impl->id;
358 : }
359 :
360 0 : int64_t Node::getLastReceiveTime() const
361 : {
362 0 : return _impl->lastReceive;
363 : }
364 :
365 177 : uint32_t Node::getType() const
366 : {
367 177 : return _impl->type;
368 : }
369 :
370 0 : void Node::_addMulticast(NodePtr node, ConnectionPtr connection)
371 : {
372 0 : lunchbox::ScopedWrite mutex(_impl->outMulticast);
373 0 : MCData data;
374 0 : data.connection = connection;
375 0 : data.node = node;
376 0 : _impl->multicasts.push_back(data);
377 0 : }
378 :
379 0 : void Node::_removeMulticast(ConnectionPtr connection)
380 : {
381 0 : LBASSERT(connection->getDescription()->type >= CONNECTIONTYPE_MULTICAST);
382 :
383 0 : lunchbox::ScopedWrite mutex(_impl->outMulticast);
384 0 : if (_impl->outMulticast == connection)
385 0 : _impl->outMulticast.data = 0;
386 : else
387 : {
388 0 : for (MCDatas::iterator j = _impl->multicasts.begin();
389 0 : j != _impl->multicasts.end(); ++j)
390 : {
391 0 : if ((*j).connection != connection)
392 0 : continue;
393 :
394 0 : _impl->multicasts.erase(j);
395 0 : return;
396 : }
397 : }
398 : }
399 :
400 66 : void Node::_connectMulticast(NodePtr node)
401 : {
402 132 : lunchbox::ScopedWrite mutex(_impl->outMulticast);
403 :
404 66 : if (node->_impl->outMulticast.data.isValid())
405 : // multicast already connected by previous _cmdID
406 0 : return;
407 :
408 : // Search if the connected node is in the same multicast group as we are
409 132 : const ConnectionDescriptions& descriptions = getConnectionDescriptions();
410 393 : for (ConnectionDescriptionsCIter i = descriptions.begin();
411 262 : i != descriptions.end(); ++i)
412 : {
413 65 : ConnectionDescriptionPtr description = *i;
414 65 : if (description->type < CONNECTIONTYPE_MULTICAST)
415 65 : continue;
416 :
417 : const ConnectionDescriptions& fromDescs =
418 0 : node->getConnectionDescriptions();
419 0 : for (ConnectionDescriptionsCIter j = fromDescs.begin();
420 0 : j != fromDescs.end(); ++j)
421 : {
422 0 : ConnectionDescriptionPtr fromDescription = *j;
423 0 : if (!description->isSameMulticastGroup(fromDescription))
424 0 : continue;
425 :
426 0 : LBASSERT(!node->_impl->outMulticast.data);
427 0 : LBASSERT(node->_impl->multicasts.empty());
428 :
429 0 : if (_impl->outMulticast->isValid() &&
430 0 : _impl->outMulticast.data->getDescription() == description)
431 : {
432 0 : node->_impl->outMulticast.data = _impl->outMulticast.data;
433 0 : LBDEBUG << "Using " << description << " as multicast group for "
434 0 : << node->getNodeID() << std::endl;
435 : }
436 : // find unused multicast connection to node
437 : else
438 0 : for (MCDatas::const_iterator k = _impl->multicasts.begin();
439 0 : k != _impl->multicasts.end(); ++k)
440 : {
441 0 : const MCData& data = *k;
442 : ConstConnectionDescriptionPtr dataDesc =
443 0 : data.connection->getDescription();
444 0 : if (!description->isSameMulticastGroup(dataDesc))
445 0 : continue;
446 :
447 0 : node->_impl->multicasts.push_back(data);
448 0 : LBDEBUG << "Adding " << dataDesc
449 0 : << " as multicast group for " << node->getNodeID()
450 0 : << std::endl;
451 : }
452 : }
453 : }
454 : }
455 :
456 0 : void Node::_connectMulticast(NodePtr node, ConnectionPtr connection)
457 : {
458 0 : lunchbox::ScopedWrite mutex(_impl->outMulticast);
459 0 : MCDatas::iterator i = node->_impl->multicasts.begin();
460 0 : for (; i != node->_impl->multicasts.end(); ++i)
461 : {
462 0 : if ((*i).connection == connection)
463 0 : break;
464 : }
465 :
466 0 : if (node->_impl->outMulticast->isValid())
467 : {
468 0 : if (node->_impl->outMulticast.data == connection)
469 : {
470 : // nop, connection already used
471 0 : LBASSERT(i == node->_impl->multicasts.end());
472 : }
473 0 : else if (i == node->_impl->multicasts.end())
474 : {
475 : // another connection is used as multicast connection, save this
476 0 : LBASSERT(isListening());
477 0 : MCData data;
478 0 : data.connection = connection;
479 0 : data.node = this;
480 0 : _impl->multicasts.push_back(data);
481 : }
482 : // else nop, already know connection
483 : }
484 : else
485 : {
486 0 : node->_impl->outMulticast.data = connection;
487 0 : if (i != node->_impl->multicasts.end())
488 0 : node->_impl->multicasts.erase(i);
489 : }
490 0 : }
491 :
492 48 : void Node::_setListening()
493 : {
494 48 : _impl->state = STATE_LISTENING;
495 48 : }
496 :
497 47 : void Node::_setClosing()
498 : {
499 47 : _impl->state = STATE_CLOSING;
500 47 : }
501 :
502 95 : void Node::_setClosed()
503 : {
504 95 : _impl->state = STATE_CLOSED;
505 95 : }
506 :
507 114 : void Node::_connect(ConnectionPtr connection)
508 : {
509 114 : _impl->outgoing = connection;
510 114 : _impl->state = STATE_CONNECTED;
511 114 : }
512 :
513 158 : void Node::_disconnect()
514 : {
515 158 : _impl->state = STATE_CLOSED;
516 158 : _impl->outgoing = 0;
517 158 : _impl->outMulticast.data = 0;
518 158 : _impl->multicasts.clear();
519 158 : }
520 :
521 72005 : void Node::_setLastReceive(const int64_t time)
522 : {
523 72005 : _impl->lastReceive = time;
524 72005 : }
525 :
526 420 : std::ostream& operator<<(std::ostream& os, const State state)
527 : {
528 : os << (state == STATE_CLOSED ? "closed" : state == STATE_CONNECTED
529 279 : ? "connected"
530 : : state == STATE_LISTENING
531 121 : ? "listening"
532 578 : : "ERROR");
533 420 : return os;
534 : }
535 :
536 420 : std::ostream& operator<<(std::ostream& os, const Node& node)
537 : {
538 420 : os << "node " << node.getNodeID() << " " << node._impl->state;
539 840 : const ConnectionDescriptions& descs = node.getConnectionDescriptions();
540 832 : for (ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i)
541 412 : os << ", " << (*i)->toString();
542 840 : return os;
543 : }
544 63 : }
|