Line data Source code
1 :
2 : /* Copyright (c) 2005-2017, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Cedric Stalder <cedric.stalder@gmail.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "localNode.h"
23 :
24 : #include "buffer.h"
25 : #include "bufferCache.h"
26 : #include "commandQueue.h"
27 : #include "connectionDescription.h"
28 : #include "connectionSet.h"
29 : #include "customICommand.h"
30 : #include "dataIStream.h"
31 : #include "exception.h"
32 : #include "global.h"
33 : #include "iCommand.h"
34 : #include "nodeCommand.h"
35 : #include "oCommand.h"
36 : #include "object.h"
37 : #include "objectICommand.h"
38 : #include "objectStore.h"
39 : #include "pipeConnection.h"
40 : #include "sendToken.h"
41 : #include "worker.h"
42 : #include "zeroconf.h"
43 :
44 : #include <lunchbox/clock.h>
45 : #include <lunchbox/fork.h>
46 : #include <lunchbox/futureFunction.h>
47 : #include <lunchbox/hash.h>
48 : #include <lunchbox/lockable.h>
49 : #include <lunchbox/request.h>
50 : #include <lunchbox/rng.h>
51 : #include <lunchbox/servus.h>
52 : #include <lunchbox/sleep.h>
53 :
54 : #include <boost/bind.hpp>
55 : #include <boost/date_time/posix_time/posix_time.hpp>
56 : #include <boost/lexical_cast.hpp>
57 :
58 : #include <list>
59 : #ifdef _MSC_VER
60 : #include <direct.h> // for chdir
61 : #define chdir _chdir
62 : #endif
63 :
64 : namespace co
65 : {
66 : namespace
67 : {
68 21 : lunchbox::a_int32_t _threadIDs;
69 :
70 : typedef CommandFunc<LocalNode> CmdFunc;
71 : typedef std::list<ICommand> CommandList;
72 : typedef lunchbox::RefPtrHash<Connection, NodePtr> ConnectionNodeHash;
73 : typedef ConnectionNodeHash::const_iterator ConnectionNodeHashCIter;
74 : typedef ConnectionNodeHash::iterator ConnectionNodeHashIter;
75 : typedef std::unordered_map<uint128_t, NodePtr> NodeHash;
76 : typedef NodeHash::const_iterator NodeHashCIter;
77 : typedef std::unordered_map<uint128_t, LocalNode::PushHandler> HandlerHash;
78 : typedef HandlerHash::const_iterator HandlerHashCIter;
79 : typedef std::pair<LocalNode::CommandHandler, CommandQueue*> CommandPair;
80 : typedef std::unordered_map<uint128_t, CommandPair> CommandHash;
81 : typedef CommandHash::const_iterator CommandHashCIter;
82 : typedef lunchbox::FutureFunction<bool> FuturebImpl;
83 : }
84 :
85 : namespace detail
86 : {
87 102 : class ReceiverThread : public lunchbox::Thread
88 : {
89 : public:
90 53 : explicit ReceiverThread(co::LocalNode* localNode)
91 53 : : _localNode(localNode)
92 : {
93 53 : }
94 :
95 48 : bool init() override
96 : {
97 48 : const int32_t threadID = ++_threadIDs - 1;
98 96 : setName(std::string("Rcv") +
99 144 : boost::lexical_cast<std::string>(threadID));
100 48 : return _localNode->_startCommandThread(threadID);
101 : }
102 :
103 48 : void run() override { _localNode->_runReceiverThread(); }
104 : private:
105 : co::LocalNode* const _localNode;
106 : };
107 :
108 102 : class CommandThread : public Worker
109 : {
110 : public:
111 53 : explicit CommandThread(co::LocalNode* localNode)
112 53 : : Worker(Global::getCommandQueueLimit())
113 : , threadID(0)
114 53 : , _localNode(localNode)
115 : {
116 53 : }
117 :
118 : int32_t threadID;
119 :
120 : protected:
121 48 : bool init() override
122 : {
123 96 : setName(std::string("Cmd") +
124 144 : boost::lexical_cast<std::string>(threadID));
125 48 : return true;
126 : }
127 :
128 103273 : bool stopRunning() override { return _localNode->isClosed(); }
129 51502 : bool notifyIdle() override
130 : {
131 51502 : return _localNode->_notifyCommandThreadIdle();
132 : }
133 :
134 : private:
135 : co::LocalNode* const _localNode;
136 : };
137 :
138 : class LocalNode
139 : {
140 : public:
141 53 : LocalNode()
142 53 : : smallBuffers(200)
143 : , bigBuffers(20)
144 : , sendToken(true)
145 : , lastSendToken(0)
146 : , objectStore(0)
147 : , receiverThread(0)
148 : , commandThread(0)
149 53 : , service("_collage._tcp")
150 : {
151 53 : }
152 :
153 51 : ~LocalNode()
154 51 : {
155 51 : LBASSERT(incoming.isEmpty());
156 51 : LBASSERT(connectionNodes.empty());
157 51 : LBASSERT(pendingCommands.empty());
158 51 : LBASSERT(nodes->empty());
159 :
160 51 : delete objectStore;
161 51 : objectStore = 0;
162 51 : LBASSERT(!commandThread->isRunning());
163 51 : delete commandThread;
164 51 : commandThread = 0;
165 :
166 51 : LBASSERT(!receiverThread->isRunning());
167 51 : delete receiverThread;
168 51 : receiverThread = 0;
169 51 : }
170 :
171 271 : bool inReceiverThread() const { return receiverThread->isCurrent(); }
172 : /** Commands re-scheduled for dispatch. */
173 : CommandList pendingCommands;
174 :
175 : /** The command buffer 'allocator' for small packets */
176 : co::BufferCache smallBuffers;
177 :
178 : /** The command buffer 'allocator' for big packets */
179 : co::BufferCache bigBuffers;
180 :
181 : bool sendToken; //!< send token availability.
182 : uint64_t lastSendToken; //!< last used time for timeout detection
183 : std::deque<co::ICommand> sendTokenQueue; //!< pending requests
184 :
185 : /** Manager of distributed object */
186 : ObjectStore* objectStore;
187 :
188 : /** Needed for thread-safety during nodeID-based connect() */
189 : std::mutex connectLock;
190 :
191 : /** The node for each connection. */
192 : ConnectionNodeHash connectionNodes; // read and write: recv only
193 :
194 : /** The connected nodes. */
195 : lunchbox::Lockable<NodeHash, lunchbox::SpinLock> nodes; // r: all, w: recv
196 :
197 : /** The connection set of all connections from/to this node. */
198 : co::ConnectionSet incoming;
199 :
200 : /** The process-global clock. */
201 : lunchbox::Clock clock;
202 :
203 : /** The registered push handlers. */
204 : lunchbox::Lockable<HandlerHash, std::mutex> pushHandlers;
205 :
206 : /** The registered custom command handlers. */
207 : lunchbox::Lockable<CommandHash, lunchbox::SpinLock> commandHandlers;
208 :
209 : ReceiverThread* receiverThread;
210 : CommandThread* commandThread;
211 :
212 : lunchbox::Lockable<servus::Servus> service;
213 :
214 : a_ssize_t counters[co::LocalNode::COUNTER_ALL]; //!< Performance counters
215 :
216 : Strings args; //!< Command line arguments for remote node launch
217 : };
218 : }
219 :
220 53 : LocalNode::LocalNode(const uint32_t type)
221 : : Node(type)
222 53 : , _impl(new detail::LocalNode)
223 : {
224 53 : _impl->receiverThread = new detail::ReceiverThread(this);
225 53 : _impl->commandThread = new detail::CommandThread(this);
226 53 : _impl->objectStore = new ObjectStore(this, _impl->counters);
227 :
228 53 : CommandQueue* queue = getCommandThreadQueue();
229 106 : registerCommand(CMD_NODE_CONNECT, CmdFunc(this, &LocalNode::_cmdConnect),
230 53 : 0);
231 53 : registerCommand(CMD_NODE_CONNECT_REPLY,
232 106 : CmdFunc(this, &LocalNode::_cmdConnectReply), 0);
233 53 : registerCommand(CMD_NODE_ID, CmdFunc(this, &LocalNode::_cmdID), 0);
234 53 : registerCommand(CMD_NODE_ACK_REQUEST,
235 106 : CmdFunc(this, &LocalNode::_cmdAckRequest), 0);
236 106 : registerCommand(CMD_NODE_STOP_RCV, CmdFunc(this, &LocalNode::_cmdStopRcv),
237 53 : 0);
238 106 : registerCommand(CMD_NODE_STOP_CMD, CmdFunc(this, &LocalNode::_cmdStopCmd),
239 53 : queue);
240 53 : registerCommand(CMD_NODE_SET_AFFINITY_RCV,
241 106 : CmdFunc(this, &LocalNode::_cmdSetAffinity), 0);
242 53 : registerCommand(CMD_NODE_SET_AFFINITY_CMD,
243 106 : CmdFunc(this, &LocalNode::_cmdSetAffinity), queue);
244 53 : registerCommand(CMD_NODE_CONNECT_ACK,
245 106 : CmdFunc(this, &LocalNode::_cmdConnectAck), 0);
246 53 : registerCommand(CMD_NODE_DISCONNECT,
247 106 : CmdFunc(this, &LocalNode::_cmdDisconnect), 0);
248 53 : registerCommand(CMD_NODE_GET_NODE_DATA,
249 106 : CmdFunc(this, &LocalNode::_cmdGetNodeData), queue);
250 53 : registerCommand(CMD_NODE_GET_NODE_DATA_REPLY,
251 106 : CmdFunc(this, &LocalNode::_cmdGetNodeDataReply), 0);
252 53 : registerCommand(CMD_NODE_ACQUIRE_SEND_TOKEN,
253 106 : CmdFunc(this, &LocalNode::_cmdAcquireSendToken), queue);
254 53 : registerCommand(CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY,
255 106 : CmdFunc(this, &LocalNode::_cmdAcquireSendTokenReply), 0);
256 53 : registerCommand(CMD_NODE_RELEASE_SEND_TOKEN,
257 106 : CmdFunc(this, &LocalNode::_cmdReleaseSendToken), queue);
258 53 : registerCommand(CMD_NODE_ADD_LISTENER,
259 106 : CmdFunc(this, &LocalNode::_cmdAddListener), 0);
260 53 : registerCommand(CMD_NODE_REMOVE_LISTENER,
261 106 : CmdFunc(this, &LocalNode::_cmdRemoveListener), 0);
262 53 : registerCommand(CMD_NODE_PING, CmdFunc(this, &LocalNode::_cmdPing), queue);
263 106 : registerCommand(CMD_NODE_PING_REPLY, CmdFunc(this, &LocalNode::_cmdDiscard),
264 53 : 0);
265 106 : registerCommand(CMD_NODE_COMMAND, CmdFunc(this, &LocalNode::_cmdCommand),
266 53 : 0);
267 53 : registerCommand(CMD_NODE_ADD_CONNECTION,
268 106 : CmdFunc(this, &LocalNode::_cmdAddConnection), 0);
269 53 : }
270 :
271 142 : LocalNode::~LocalNode()
272 : {
273 51 : LBASSERT(!hasPendingRequests());
274 51 : delete _impl;
275 91 : }
276 :
277 1 : bool LocalNode::initLocal(const int argc, char** argv)
278 : {
279 2 : std::string setupOpts;
280 1 : _impl->args.clear();
281 :
282 : // We do not use getopt_long because it really does not work due to the
283 : // following aspects:
284 : // - reordering of arguments
285 : // - different behavior of GNU and BSD implementations
286 : // - incomplete man pages
287 1 : for (int i = 1; i < argc; ++i)
288 : {
289 0 : if (std::string("--eq-listen") == argv[i] ||
290 0 : std::string("--co-listen") == argv[i])
291 : {
292 0 : if (std::string("--eq-listen") == argv[i])
293 0 : LBWARN << "Deprecated --eq-listen, use --co-listen"
294 0 : << std::endl;
295 :
296 0 : if ((i + 1) < argc && argv[i + 1][0] != '-')
297 : {
298 0 : std::string data = argv[++i];
299 0 : ConnectionDescriptionPtr desc = new ConnectionDescription;
300 0 : desc->port = Global::getDefaultPort();
301 :
302 0 : if (desc->fromString(data))
303 : {
304 0 : addConnectionDescription(desc);
305 0 : LBASSERTINFO(data.empty(), data);
306 : }
307 : else
308 0 : LBWARN << "Ignoring listen option: " << argv[i]
309 0 : << std::endl;
310 : }
311 : else
312 : {
313 0 : LBWARN << "No argument given to --co-listen!" << std::endl;
314 : }
315 : }
316 0 : else if (std::string("--co-globals") == argv[i])
317 : {
318 0 : if ((i + 1) < argc && argv[i + 1][0] != '-')
319 : {
320 0 : const std::string data = argv[++i];
321 0 : if (!Global::fromString(data))
322 : {
323 0 : LBWARN << "Invalid global variables string: " << data
324 0 : << ", using default global variables." << std::endl;
325 0 : }
326 : }
327 : else
328 : {
329 0 : LBWARN << "No argument given to --co-globals!" << std::endl;
330 : }
331 : }
332 0 : else if (std::string("--co-launch") == argv[i])
333 : {
334 0 : if (i == argc - 1 || argv[i + 1][0] == '-')
335 0 : LBTHROW(std::runtime_error("Missing options to --co-launch"));
336 :
337 0 : setupOpts = argv[++i];
338 0 : if (!deserialize(setupOpts))
339 0 : LBTHROW(std::runtime_error("Corrupt --co-launch argument"));
340 0 : LBASSERT(!setupOpts.empty());
341 : }
342 : else
343 0 : _impl->args.push_back(argv[i]);
344 : }
345 :
346 1 : if (!listen())
347 : {
348 0 : LBWARN << "Can't setup listener(s) on " << *static_cast<Node*>(this)
349 0 : << std::endl;
350 0 : return false;
351 : }
352 :
353 1 : if (!setupOpts.empty())
354 0 : return _setupPeer(setupOpts);
355 1 : return true;
356 : }
357 :
358 48 : bool LocalNode::listen()
359 : {
360 48 : LBVERB << "Listener data: " << serialize() << std::endl;
361 48 : if (!isClosed() || !_connectSelf())
362 0 : return false;
363 :
364 96 : const ConnectionDescriptions& descriptions = getConnectionDescriptions();
365 279 : for (ConnectionDescriptionsCIter i = descriptions.begin();
366 186 : i != descriptions.end(); ++i)
367 : {
368 90 : ConnectionDescriptionPtr description = *i;
369 90 : ConnectionPtr connection = Connection::create(description);
370 :
371 45 : if (!connection || !connection->listen())
372 : {
373 0 : LBWARN << "Can't create listener connection: " << description
374 0 : << std::endl;
375 0 : return false;
376 : }
377 :
378 45 : _impl->connectionNodes[connection] = this;
379 45 : if (connection->isMulticast())
380 0 : _addMulticast(this, connection);
381 :
382 45 : connection->acceptNB();
383 45 : _impl->incoming.addConnection(connection);
384 :
385 90 : LBVERB << "Added node " << getNodeID() << " using " << connection
386 135 : << std::endl;
387 : }
388 :
389 96 : LBVERB << lunchbox::className(this) << " start command and receiver thread "
390 144 : << std::endl;
391 :
392 48 : _setListening();
393 48 : _impl->receiverThread->start();
394 :
395 48 : LBDEBUG << *this << std::endl;
396 48 : return true;
397 : }
398 :
399 47 : bool LocalNode::close()
400 : {
401 47 : if (!isListening())
402 0 : return false;
403 :
404 47 : send(CMD_NODE_STOP_RCV);
405 :
406 47 : LBCHECK(_impl->receiverThread->join());
407 47 : _cleanup();
408 :
409 141 : LBDEBUG << _impl->incoming.getSize() << " connections open after close"
410 141 : << std::endl;
411 : #ifndef NDEBUG
412 47 : const Connections& connections = _impl->incoming.getConnections();
413 141 : for (Connections::const_iterator i = connections.begin();
414 94 : i != connections.end(); ++i)
415 : {
416 0 : LBDEBUG << " " << *i << std::endl;
417 : }
418 : #endif
419 :
420 47 : LBASSERT(!hasPendingRequests());
421 47 : return true;
422 : }
423 :
424 0 : void LocalNode::setAffinity(const int32_t affinity)
425 : {
426 0 : send(CMD_NODE_SET_AFFINITY_RCV) << affinity;
427 0 : send(CMD_NODE_SET_AFFINITY_CMD) << affinity;
428 :
429 0 : lunchbox::Thread::setAffinity(affinity);
430 0 : }
431 :
432 33 : void LocalNode::addConnection(ConnectionPtr connection)
433 : {
434 33 : connection->ref(); // unref in _cmdAddConnection
435 33 : send(CMD_NODE_ADD_CONNECTION) << connection;
436 33 : }
437 :
438 0 : ConnectionPtr LocalNode::addListener(ConnectionDescriptionPtr desc)
439 : {
440 0 : LBASSERT(isListening());
441 :
442 0 : ConnectionPtr connection = Connection::create(desc);
443 0 : if (connection && connection->listen())
444 : {
445 0 : addListener(connection);
446 0 : return connection;
447 : }
448 :
449 0 : return 0;
450 : }
451 :
452 0 : void LocalNode::addListener(ConnectionPtr connection)
453 : {
454 0 : LBASSERT(isListening());
455 0 : LBASSERT(connection->isListening());
456 0 : if (!isListening() || !connection->isListening())
457 0 : return;
458 :
459 0 : connection->ref(this); // unref in self handler
460 :
461 : // Update everybody's description list of me, add the listener to myself in
462 : // my handler
463 0 : const Nodes& nodes = getNodes();
464 0 : for (NodePtr node : nodes)
465 0 : node->send(CMD_NODE_ADD_LISTENER)
466 0 : << (uint64_t)(connection.get())
467 0 : << connection->getDescription()->toString();
468 : }
469 :
470 0 : void LocalNode::removeListeners(const Connections& connections)
471 : {
472 0 : std::vector<lunchbox::Request<void> > requests;
473 0 : for (ConnectionsCIter i = connections.begin(); i != connections.end(); ++i)
474 : {
475 0 : ConnectionPtr connection = *i;
476 0 : requests.push_back(_removeListener(connection));
477 : }
478 0 : }
479 :
480 0 : lunchbox::Request<void> LocalNode::_removeListener(ConnectionPtr conn)
481 : {
482 0 : LBASSERT(isListening());
483 0 : LBASSERTINFO(!conn->isConnected(), conn);
484 :
485 0 : conn->ref(this);
486 0 : const lunchbox::Request<void> request = registerRequest<void>();
487 :
488 0 : const Nodes& nodes = getNodes();
489 0 : for (NodePtr node : nodes)
490 0 : node->send(CMD_NODE_REMOVE_LISTENER)
491 0 : << request << conn.get() << conn->getDescription()->toString();
492 0 : return request;
493 : }
494 :
495 147 : void LocalNode::_addConnection(ConnectionPtr connection)
496 : {
497 147 : if (_impl->receiverThread->isRunning() && !_impl->inReceiverThread())
498 : {
499 33 : addConnection(connection);
500 33 : return;
501 : }
502 :
503 228 : BufferPtr buffer = _impl->smallBuffers.alloc(COMMAND_ALLOCSIZE);
504 114 : connection->recvNB(buffer, COMMAND_MINSIZE);
505 114 : _impl->incoming.addConnection(connection);
506 : }
507 :
508 217 : void LocalNode::_removeConnection(ConnectionPtr connection)
509 : {
510 217 : LBASSERT(connection);
511 :
512 217 : _impl->incoming.removeConnection(connection);
513 217 : connection->resetRecvData();
514 217 : if (!connection->isClosed())
515 125 : connection->close(); // cancel pending IO's
516 217 : }
517 :
518 47 : void LocalNode::_cleanup()
519 : {
520 47 : LBVERB << "Clean up stopped node" << std::endl;
521 47 : LBASSERTINFO(isClosed(), *this);
522 :
523 47 : if (!_impl->connectionNodes.empty())
524 135 : LBDEBUG << _impl->connectionNodes.size()
525 135 : << " open connections during cleanup" << std::endl;
526 : #ifndef NDEBUG
527 276 : for (ConnectionNodeHashCIter i = _impl->connectionNodes.begin();
528 184 : i != _impl->connectionNodes.end(); ++i)
529 : {
530 90 : NodePtr node = i->second;
531 45 : LBDEBUG << " " << i->first << " : " << node << std::endl;
532 : }
533 : #endif
534 :
535 47 : _impl->connectionNodes.clear();
536 :
537 47 : if (!_impl->nodes->empty())
538 6 : LBDEBUG << _impl->nodes->size() << " nodes connected during cleanup"
539 6 : << std::endl;
540 :
541 : #ifndef NDEBUG
542 49 : for (NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
543 : {
544 4 : NodePtr node = i->second;
545 2 : LBDEBUG << " " << node << std::endl;
546 : }
547 : #endif
548 :
549 47 : _impl->nodes->clear();
550 47 : }
551 :
552 111 : void LocalNode::_closeNode(NodePtr node)
553 : {
554 222 : ConnectionPtr connection = node->getConnection();
555 222 : ConnectionPtr mcConnection = node->_getMulticast();
556 :
557 111 : node->_disconnect();
558 :
559 111 : if (connection)
560 : {
561 66 : LBASSERTINFO(_impl->connectionNodes.find(connection) !=
562 : _impl->connectionNodes.end(),
563 : connection);
564 :
565 66 : _removeConnection(connection);
566 66 : _impl->connectionNodes.erase(connection);
567 : }
568 :
569 111 : if (mcConnection)
570 : {
571 0 : _removeConnection(mcConnection);
572 0 : _impl->connectionNodes.erase(mcConnection);
573 : }
574 :
575 111 : _impl->objectStore->removeInstanceData(node->getNodeID());
576 :
577 222 : lunchbox::ScopedFastWrite mutex(_impl->nodes);
578 111 : _impl->nodes->erase(node->getNodeID());
579 111 : notifyDisconnect(node);
580 111 : LBDEBUG << node << " disconnected from " << *this << std::endl;
581 111 : }
582 :
583 48 : bool LocalNode::_connectSelf()
584 : {
585 : // setup local connection to myself
586 96 : PipeConnectionPtr connection = new PipeConnection;
587 48 : if (!connection->connect())
588 : {
589 0 : LBERROR << "Could not create local connection to receiver thread."
590 0 : << std::endl;
591 0 : return false;
592 : }
593 :
594 48 : Node::_connect(connection->getSibling());
595 48 : _setClosed(); // reset state after _connect set it to connected
596 :
597 : // add to connection set
598 48 : LBASSERT(connection->getDescription());
599 48 : LBASSERT(_impl->connectionNodes.find(connection) ==
600 : _impl->connectionNodes.end());
601 :
602 48 : _impl->connectionNodes[connection] = this;
603 48 : _impl->nodes.data[getNodeID()] = this;
604 48 : _addConnection(connection);
605 :
606 96 : LBVERB << "Added node " << getNodeID() << " using " << connection
607 144 : << std::endl;
608 48 : return true;
609 : }
610 :
611 7 : bool LocalNode::disconnect(NodePtr node)
612 : {
613 7 : if (!node || !isListening())
614 0 : return false;
615 :
616 7 : if (!node->isConnected())
617 0 : return true;
618 :
619 7 : LBASSERT(!inCommandThread());
620 14 : lunchbox::Request<void> request = registerRequest<void>(node.get());
621 7 : send(CMD_NODE_DISCONNECT) << request;
622 :
623 7 : request.wait();
624 7 : _impl->objectStore->removeNode(node);
625 7 : return true;
626 : }
627 :
628 20001 : void LocalNode::ackRequest(NodePtr node, const uint32_t requestID)
629 : {
630 20001 : if (requestID == LB_UNDEFINED_UINT32) // no need to ack operation
631 0 : return;
632 :
633 20001 : if (node == this) // OPT
634 0 : serveRequest(requestID);
635 : else
636 20001 : node->send(CMD_NODE_ACK_REQUEST) << requestID;
637 : }
638 :
639 0 : void LocalNode::ping(NodePtr peer)
640 : {
641 0 : LBASSERT(!_impl->inReceiverThread());
642 0 : peer->send(CMD_NODE_PING);
643 0 : }
644 :
645 0 : bool LocalNode::pingIdleNodes()
646 : {
647 0 : LBASSERT(!_impl->inReceiverThread());
648 0 : const int64_t timeout = Global::getKeepaliveTimeout() / 2;
649 0 : const Nodes& nodes = getNodes(false);
650 :
651 0 : bool pinged = false;
652 0 : for (NodePtr node : nodes)
653 : {
654 0 : if (getTime64() - node->getLastReceiveTime() > timeout)
655 : {
656 0 : LBINFO << " Ping Node: " << node->getNodeID() << " last seen "
657 0 : << node->getLastReceiveTime() << std::endl;
658 0 : node->send(CMD_NODE_PING);
659 0 : pinged = true;
660 : }
661 : }
662 0 : return pinged;
663 : }
664 :
665 : //----------------------------------------------------------------------
666 : // Object functionality
667 : //----------------------------------------------------------------------
668 0 : void LocalNode::disableInstanceCache()
669 : {
670 0 : _impl->objectStore->disableInstanceCache();
671 0 : }
672 :
673 0 : void LocalNode::expireInstanceData(const int64_t age)
674 : {
675 0 : _impl->objectStore->expireInstanceData(age);
676 0 : }
677 :
678 0 : void LocalNode::enableSendOnRegister()
679 : {
680 0 : _impl->objectStore->enableSendOnRegister();
681 0 : }
682 :
683 0 : void LocalNode::disableSendOnRegister()
684 : {
685 0 : _impl->objectStore->disableSendOnRegister();
686 0 : }
687 :
688 15 : bool LocalNode::registerObject(Object* object)
689 : {
690 15 : return _impl->objectStore->register_(object);
691 : }
692 :
693 15 : void LocalNode::deregisterObject(Object* object)
694 : {
695 15 : _impl->objectStore->deregister(object);
696 15 : }
697 :
698 34 : f_bool_t LocalNode::mapObject(Object* object, const uint128_t& id,
699 : NodePtr master, const uint128_t& version)
700 : {
701 : const uint32_t request =
702 34 : _impl->objectStore->mapNB(object, id, version, master);
703 : const FuturebImpl::Func& func =
704 68 : boost::bind(&ObjectStore::mapSync, _impl->objectStore, request);
705 68 : return f_bool_t(new FuturebImpl(func));
706 : }
707 :
708 0 : uint32_t LocalNode::mapObjectNB(Object* object, const uint128_t& id,
709 : const uint128_t& version)
710 : {
711 0 : return _impl->objectStore->mapNB(object, id, version, 0);
712 : }
713 :
714 2 : uint32_t LocalNode::mapObjectNB(Object* object, const uint128_t& id,
715 : const uint128_t& version, NodePtr master)
716 : {
717 2 : return _impl->objectStore->mapNB(object, id, version, master);
718 : }
719 :
720 2 : bool LocalNode::mapObjectSync(const uint32_t requestID)
721 : {
722 2 : return _impl->objectStore->mapSync(requestID);
723 : }
724 :
725 4 : f_bool_t LocalNode::syncObject(Object* object, const uint128_t& id,
726 : NodePtr master, const uint32_t instanceID)
727 : {
728 4 : return _impl->objectStore->sync(object, id, master, instanceID);
729 : }
730 :
731 36 : void LocalNode::unmapObject(Object* object)
732 : {
733 36 : _impl->objectStore->unmap(object);
734 36 : }
735 :
736 0 : void LocalNode::swapObject(Object* oldObject, Object* newObject)
737 : {
738 0 : _impl->objectStore->swap(oldObject, newObject);
739 0 : }
740 :
741 0 : void LocalNode::objectPush(const uint128_t& groupID,
742 : const uint128_t& objectType,
743 : const uint128_t& objectID, DataIStream& istream)
744 : {
745 0 : lunchbox::ScopedRead mutex(_impl->pushHandlers);
746 0 : HandlerHashCIter i = _impl->pushHandlers->find(groupID);
747 0 : if (i != _impl->pushHandlers->end())
748 0 : i->second(groupID, objectType, objectID, istream);
749 : else
750 0 : LBWARN << "No custom handler for push group " << groupID
751 0 : << " registered" << std::endl;
752 :
753 0 : if (istream.wasUsed() && istream.hasData())
754 0 : LBWARN << "Incomplete Object::push for group " << groupID << " type "
755 0 : << objectType << " object " << objectID << std::endl;
756 0 : }
757 :
758 0 : void LocalNode::registerPushHandler(const uint128_t& groupID,
759 : const PushHandler& handler)
760 : {
761 0 : lunchbox::ScopedWrite mutex(_impl->pushHandlers);
762 0 : (*_impl->pushHandlers)[groupID] = handler;
763 0 : }
764 :
765 2 : bool LocalNode::registerCommandHandler(const uint128_t& command,
766 : const CommandHandler& func,
767 : CommandQueue* queue)
768 : {
769 4 : lunchbox::ScopedFastWrite mutex(_impl->commandHandlers);
770 2 : if (_impl->commandHandlers->find(command) != _impl->commandHandlers->end())
771 : {
772 0 : LBWARN << "Already got a registered handler for custom command "
773 0 : << command << std::endl;
774 0 : return false;
775 : }
776 :
777 2 : _impl->commandHandlers->insert(
778 4 : std::make_pair(command, std::make_pair(func, queue)));
779 2 : return true;
780 : }
781 :
782 0 : LocalNode::SendToken LocalNode::acquireSendToken(NodePtr node)
783 : {
784 0 : LBASSERT(!inCommandThread());
785 0 : LBASSERT(!_impl->inReceiverThread());
786 :
787 0 : lunchbox::Request<void> request = registerRequest<void>();
788 0 : node->send(CMD_NODE_ACQUIRE_SEND_TOKEN) << request;
789 :
790 : try
791 : {
792 0 : request.wait(Global::getTimeout());
793 : }
794 0 : catch (const lunchbox::FutureTimeout&)
795 : {
796 0 : LBERROR << "Timeout while acquiring send token " << request.getID()
797 0 : << std::endl;
798 0 : request.unregister();
799 0 : return 0;
800 : }
801 0 : return new co::SendToken(node);
802 : }
803 :
804 0 : void LocalNode::releaseSendToken(SendToken token)
805 : {
806 0 : LBASSERT(!_impl->inReceiverThread());
807 0 : if (token)
808 0 : token->release();
809 0 : }
810 :
811 : //----------------------------------------------------------------------
812 : // Connecting a node
813 : //----------------------------------------------------------------------
namespace
{
/** Result codes returned as uint32_t by the LocalNode::_connect() overloads. */
enum ConnectResult
{
    CONNECT_OK = 0,         //!< success; note that this is the zero value
    CONNECT_TRY_AGAIN = 1,  //!< caller sleeps a random time and retries
    CONNECT_BAD_STATE = 2,  //!< preconditions for connecting not met
    CONNECT_TIMEOUT = 3,    //!< connect handshake request timed out
    CONNECT_UNREACHABLE = 4 //!< no connection description could be connected
};
}
825 :
826 68 : NodePtr LocalNode::connect(const NodeID& nodeID)
827 : {
828 68 : LBASSERT(nodeID != 0);
829 68 : LBASSERT(isListening());
830 :
831 : // Make sure that only one connection request based on the node identifier
832 : // is pending at a given time. Otherwise a node with the same id might be
833 : // instantiated twice in _cmdGetNodeDataReply(). The alternative to this
834 : // mutex is to register connecting nodes with this local node, and handle
835 : // all cases correctly, which is far more complex. Node connections only
836 : // happen a lot during initialization, and are therefore not time-critical.
837 136 : lunchbox::ScopedWrite mutex(_impl->connectLock);
838 :
839 136 : Nodes nodes = getNodes();
840 71 : for (NodePtr peer : nodes)
841 : {
842 71 : if (peer->getNodeID() == nodeID && peer->isReachable()) // early out
843 68 : return peer;
844 : }
845 :
846 0 : LBDEBUG << "Connecting node " << nodeID << std::endl;
847 0 : for (NodePtr peer : nodes)
848 : {
849 0 : NodePtr node = _connect(nodeID, peer);
850 0 : if (node)
851 0 : return node;
852 : }
853 : {
854 0 : NodePtr node = _connectFromZeroconf(nodeID);
855 0 : if (node)
856 0 : return node;
857 : }
858 : // check again if node connected by itself by now
859 0 : nodes = getNodes();
860 0 : for (NodePtr node : nodes)
861 0 : if (node->getNodeID() == nodeID && node->isReachable())
862 0 : return node;
863 :
864 0 : LBWARN << "Node " << nodeID << " connection failed" << std::endl;
865 0 : return 0;
866 : }
867 :
868 0 : NodePtr LocalNode::_connect(const NodeID& nodeID, NodePtr peer)
869 : {
870 0 : LBASSERT(nodeID != 0);
871 :
872 0 : NodePtr node;
873 : {
874 0 : lunchbox::ScopedFastRead mutexNodes(_impl->nodes);
875 0 : NodeHash::const_iterator i = _impl->nodes->find(nodeID);
876 0 : if (i != _impl->nodes->end())
877 0 : node = i->second;
878 : }
879 :
880 0 : LBASSERT(getNodeID() != nodeID);
881 0 : if (!node)
882 : {
883 0 : lunchbox::Request<void*> request = registerRequest<void*>();
884 0 : peer->send(CMD_NODE_GET_NODE_DATA) << nodeID << request;
885 0 : node = reinterpret_cast<Node*>(request.wait());
886 0 : if (!node)
887 : {
888 0 : LBINFO << "Node " << nodeID << " not found on " << peer->getNodeID()
889 0 : << std::endl;
890 0 : return 0;
891 : }
892 0 : node->unref(this); // ref'd before serveRequest()
893 : }
894 :
895 0 : if (node->isReachable())
896 0 : return node;
897 :
898 0 : size_t tries = 10;
899 0 : while (--tries)
900 : {
901 0 : switch (_connect(node))
902 : {
903 : case CONNECT_OK:
904 0 : return node;
905 : case CONNECT_TRY_AGAIN:
906 : {
907 0 : lunchbox::RNG rng;
908 : // collision avoidance
909 0 : lunchbox::sleep(rng.get<uint8_t>());
910 0 : break;
911 : }
912 : case CONNECT_BAD_STATE:
913 0 : LBWARN << "Internal connect error" << std::endl;
914 : // no break;
915 : case CONNECT_TIMEOUT:
916 0 : return 0;
917 :
918 : case CONNECT_UNREACHABLE:
919 0 : break; // maybe peer talks to us
920 : }
921 :
922 0 : lunchbox::ScopedFastRead mutexNodes(_impl->nodes);
923 : // connect failed - check for simultaneous connect from peer
924 0 : NodeHash::const_iterator i = _impl->nodes->find(nodeID);
925 0 : if (i != _impl->nodes->end())
926 0 : node = i->second;
927 : }
928 :
929 0 : return node->isReachable() ? node : 0;
930 : }
931 :
932 0 : NodePtr LocalNode::_connectFromZeroconf(const NodeID& nodeID)
933 : {
934 0 : NodePtr node;
935 : {
936 0 : lunchbox::ScopedWrite mutex(_impl->service);
937 : const Strings& instances =
938 0 : _impl->service->discover(servus::Servus::IF_ALL, 500);
939 :
940 0 : for (const auto& instance : instances)
941 : {
942 0 : const NodeID candidate(instance);
943 0 : if (candidate != nodeID)
944 0 : continue;
945 :
946 : const std::string& typeStr =
947 0 : _impl->service->get(instance, "co_type");
948 0 : if (typeStr.empty())
949 0 : return 0;
950 :
951 0 : std::istringstream in(typeStr);
952 0 : uint32_t type = 0;
953 0 : in >> type;
954 :
955 0 : node = createNode(type);
956 0 : if (!node)
957 : {
958 0 : LBINFO << "Can't create node of type " << type << std::endl;
959 0 : continue;
960 : }
961 :
962 : const std::string& numStr =
963 0 : _impl->service->get(instance, "co_numPorts");
964 0 : uint32_t num = 0;
965 :
966 0 : in.clear();
967 0 : in.str(numStr);
968 0 : in >> num;
969 0 : LBASSERT(num > 0);
970 0 : for (size_t j = 0; j < num; ++j)
971 : {
972 0 : ConnectionDescriptionPtr desc = new ConnectionDescription;
973 0 : std::ostringstream out;
974 0 : out << "co_port" << j;
975 :
976 0 : std::string descStr = _impl->service->get(instance, out.str());
977 0 : LBASSERT(!descStr.empty());
978 0 : LBCHECK(desc->fromString(descStr));
979 0 : LBASSERT(descStr.empty());
980 0 : node->addConnectionDescription(desc);
981 : }
982 0 : break;
983 : }
984 : }
985 :
986 0 : if (node && _connect(node))
987 0 : return node;
988 0 : return nullptr;
989 : }
990 :
991 33 : bool LocalNode::connect(NodePtr node)
992 : {
993 66 : lunchbox::ScopedWrite mutex(_impl->connectLock);
994 66 : return (_connect(node) == CONNECT_OK);
995 : }
996 :
997 33 : uint32_t LocalNode::_connect(NodePtr node)
998 : {
999 33 : LBASSERTINFO(isListening(), *this);
1000 33 : if (node->isReachable())
1001 0 : return CONNECT_OK;
1002 :
1003 33 : LBASSERT(node->isClosed());
1004 33 : LBDEBUG << "Connecting " << node << std::endl;
1005 :
1006 : // try connecting using the given descriptions
1007 66 : const ConnectionDescriptions& cds = node->getConnectionDescriptions();
1008 33 : for (ConnectionDescriptionsCIter i = cds.begin(); i != cds.end(); ++i)
1009 : {
1010 33 : ConnectionDescriptionPtr description = *i;
1011 33 : if (description->type >= CONNECTIONTYPE_MULTICAST)
1012 0 : continue; // Don't use multicast for primary connections
1013 :
1014 33 : ConnectionPtr connection = Connection::create(description);
1015 33 : if (!connection || !connection->connect())
1016 0 : continue;
1017 :
1018 33 : return _connect(node, connection);
1019 : }
1020 :
1021 0 : LBDEBUG << "Node " << node
1022 0 : << " unreachable, all connections failed to connect" << std::endl;
1023 0 : return CONNECT_UNREACHABLE;
1024 : }
1025 :
1026 0 : bool LocalNode::connect(NodePtr node, ConnectionPtr connection)
1027 : {
1028 0 : return (_connect(node, connection) == CONNECT_OK);
1029 : }
1030 :
1031 33 : uint32_t LocalNode::_connect(NodePtr node, ConnectionPtr connection)
1032 : {
1033 33 : LBASSERT(connection);
1034 33 : LBASSERT(node->getNodeID() != getNodeID());
1035 :
1036 66 : if (!node || !isListening() || !connection->isConnected() ||
1037 33 : !node->isClosed())
1038 : {
1039 0 : return CONNECT_BAD_STATE;
1040 : }
1041 :
1042 33 : _addConnection(connection);
1043 :
1044 : // send connect command to peer
1045 66 : lunchbox::Request<bool> request = registerRequest<bool>(node.get());
1046 33 : const uint32_t cmd = CMD_NODE_CONNECT;
1047 66 : OCommand(Connections(1, connection), cmd) << getNodeID() << request
1048 99 : << getType() << serialize();
1049 :
1050 33 : bool connected = false;
1051 : try
1052 : {
1053 33 : connected = request.wait(10000 /*ms*/);
1054 : }
1055 0 : catch (const lunchbox::FutureTimeout&)
1056 : {
1057 0 : LBWARN << "Node connection handshake timeout - " << node
1058 0 : << " not a Collage node?" << std::endl;
1059 0 : request.unregister();
1060 0 : return CONNECT_TIMEOUT;
1061 : }
1062 :
1063 : // In simultaneous connect case, depending on the connection type
1064 : // (e.g. RDMA), a check on the connection state of the node is required
1065 33 : if (!connected || !node->isConnected())
1066 0 : return CONNECT_TRY_AGAIN;
1067 :
1068 33 : LBASSERT(node->getNodeID() != 0);
1069 33 : LBASSERTINFO(node->getNodeID() != getNodeID(), getNodeID());
1070 33 : LBDEBUG << node << " connected to " << *(Node*)this << std::endl;
1071 33 : return CONNECT_OK;
1072 : }
1073 :
1074 34 : NodePtr LocalNode::connectObjectMaster(const uint128_t& id)
1075 : {
1076 34 : LBASSERTINFO(id.isUUID(), id);
1077 34 : if (!id.isUUID())
1078 : {
1079 0 : LBWARN << "Invalid object id " << id << std::endl;
1080 0 : return 0;
1081 : }
1082 :
1083 34 : const NodeID masterNodeID = _impl->objectStore->findMasterNodeID(id);
1084 34 : if (masterNodeID == 0)
1085 : {
1086 0 : LBWARN << "Can't find master node for object " << id << std::endl;
1087 0 : return 0;
1088 : }
1089 :
1090 68 : NodePtr master = connect(masterNodeID);
1091 34 : if (master && !master->isClosed())
1092 34 : return master;
1093 :
1094 0 : LBWARN << "Can't connect master node with id " << masterNodeID
1095 0 : << " for object " << id << std::endl;
1096 0 : return 0;
1097 : }
1098 :
1099 0 : bool LocalNode::launch(NodePtr node, const std::string& command)
1100 : {
1101 0 : size_t lastPos = 0;
1102 0 : std::string cmd;
1103 :
1104 0 : const std::string& quote = node->getLaunchQuote();
1105 : std::string launchOptions =
1106 0 : std::string(" --co-launch ") + quote + node->serialize() +
1107 0 : node->getWorkDir() + CO_SEPARATOR + std::to_string(getType()) +
1108 0 : CO_SEPARATOR + serialize() + quote + " --co-globals " + quote +
1109 0 : co::Global::toString() + quote;
1110 :
1111 0 : for (size_t percentPos = command.find('%'); percentPos != std::string::npos;
1112 0 : percentPos = command.find('%', percentPos + 1))
1113 : {
1114 0 : std::string replacement;
1115 0 : switch (command[percentPos + 1])
1116 : {
1117 : case 'h':
1118 0 : replacement = node->getHostname();
1119 0 : if (replacement.empty())
1120 0 : replacement = "127.0.0.1";
1121 0 : break;
1122 :
1123 : case 'n':
1124 0 : replacement = node->getNodeID().getString();
1125 0 : break;
1126 :
1127 : case 'd':
1128 0 : replacement = node->getWorkDir();
1129 0 : break;
1130 :
1131 : case 'q':
1132 0 : replacement = quote;
1133 0 : break;
1134 :
1135 : case 'o':
1136 0 : replacement = launchOptions;
1137 0 : launchOptions.clear();
1138 0 : break;
1139 :
1140 : default:
1141 0 : LBWARN << "Unknown token " << command[percentPos + 1] << std::endl;
1142 0 : replacement = command.substr(percentPos, percentPos + 1);
1143 : }
1144 :
1145 0 : cmd += command.substr(lastPos, percentPos - lastPos) + replacement;
1146 0 : lastPos = percentPos + 2;
1147 : }
1148 :
1149 0 : cmd += command.substr(lastPos) + launchOptions;
1150 :
1151 0 : LBDEBUG << "Launching: " << cmd << std::endl;
1152 0 : if (lunchbox::fork(cmd))
1153 0 : return true;
1154 :
1155 0 : LBWARN << "Could not launch node using '" << cmd << "'" << std::endl;
1156 0 : return false;
1157 : }
1158 :
1159 0 : NodePtr LocalNode::syncLaunch(const uint128_t& nodeID, const int64_t timeout)
1160 : {
1161 0 : const lunchbox::Clock clock;
1162 :
1163 0 : do
1164 : {
1165 0 : co::NodePtr node = getNode(nodeID);
1166 0 : if (node && node->isConnected())
1167 0 : return node;
1168 :
1169 0 : lunchbox::sleep(100 /*ms*/);
1170 0 : } while (timeout == static_cast<int32_t>(LB_TIMEOUT_INDEFINITE) ||
1171 0 : clock.getTime64() < timeout);
1172 :
1173 0 : return nullptr;
1174 : }
1175 :
1176 0 : bool LocalNode::_setupPeer(const std::string& setupOpts)
1177 : {
1178 0 : size_t nextPos = setupOpts.find(CO_SEPARATOR);
1179 0 : if (nextPos == std::string::npos)
1180 : {
1181 0 : LBERROR << "Could not parse working directory: " << setupOpts
1182 0 : << std::endl;
1183 0 : return false;
1184 : }
1185 :
1186 0 : const std::string workDir = setupOpts.substr(0, nextPos);
1187 0 : std::string description = setupOpts.substr(nextPos + 1);
1188 :
1189 0 : if (!workDir.empty() && chdir(workDir.c_str()) == -1)
1190 0 : LBWARN << "Can't change working directory to " << workDir << ": "
1191 0 : << lunchbox::sysError << std::endl;
1192 :
1193 0 : nextPos = description.find(CO_SEPARATOR);
1194 0 : if (nextPos == std::string::npos)
1195 : {
1196 0 : LBERROR << "Could not parse server node type: " << description
1197 0 : << " is left from " << setupOpts << std::endl;
1198 0 : return false;
1199 : }
1200 :
1201 0 : const std::string nodeType = description.substr(0, nextPos);
1202 0 : description = description.substr(nextPos + 1);
1203 :
1204 0 : co::NodePtr peer = createNode(std::stoi(nodeType));
1205 0 : if (!peer->deserialize(description))
1206 0 : LBWARN << "Can't parse peer data" << std::endl;
1207 :
1208 0 : LBASSERTINFO(description.empty(), description);
1209 0 : if (!connect(peer))
1210 : {
1211 0 : LBERROR << "Can't connect peer node using " << *peer << std::endl;
1212 0 : return false;
1213 : }
1214 0 : return true;
1215 : }
1216 :
1217 33 : NodePtr LocalNode::createNode(const uint32_t type)
1218 : {
1219 33 : LBASSERTINFO(type == NODETYPE_NODE, type);
1220 33 : return new Node(type);
1221 : }
1222 :
1223 0 : NodePtr LocalNode::getNode(const NodeID& id) const
1224 : {
1225 0 : lunchbox::ScopedFastRead mutex(_impl->nodes);
1226 0 : NodeHash::const_iterator i = _impl->nodes->find(id);
1227 0 : if (i == _impl->nodes->end() || !i->second->isReachable())
1228 0 : return 0;
1229 0 : return i->second;
1230 : }
1231 :
1232 102 : Nodes LocalNode::getNodes(const bool addSelf) const
1233 : {
1234 102 : Nodes nodes;
1235 204 : lunchbox::ScopedFastRead mutex(_impl->nodes);
1236 298 : for (NodeHashCIter i = _impl->nodes->begin(); i != _impl->nodes->end(); ++i)
1237 : {
1238 392 : NodePtr node = i->second;
1239 196 : if (node->isReachable() && (addSelf || node != this))
1240 196 : nodes.push_back(i->second);
1241 : }
1242 204 : return nodes;
1243 : }
1244 :
1245 142 : CommandQueue* LocalNode::getCommandThreadQueue()
1246 : {
1247 142 : return _impl->commandThread->getWorkerQueue();
1248 : }
1249 :
1250 7 : bool LocalNode::inCommandThread() const
1251 : {
1252 7 : return _impl->commandThread->isCurrent();
1253 : }
1254 :
1255 0 : const Strings& LocalNode::getCommandLine() const
1256 : {
1257 0 : return _impl->args;
1258 : }
1259 :
1260 72357 : int64_t LocalNode::getTime64() const
1261 : {
1262 72357 : return _impl->clock.getTime64();
1263 : }
1264 :
1265 0 : ssize_t LocalNode::getCounter(const Counter counter) const
1266 : {
1267 0 : return _impl->counters[counter];
1268 : }
1269 :
/**
 * Interrupt the set of incoming connections.
 *
 * The receiver thread reacts to the resulting EVENT_INTERRUPT by
 * re-dispatching its pending commands.
 */
1270 184 : void LocalNode::flushCommands()
1271 : {
1272 184 : _impl->incoming.interrupt();
1273 184 : }
1274 :
1275 : //----------------------------------------------------------------------
1276 : // receiver thread functions
1277 : //----------------------------------------------------------------------
/**
 * Main function of the receiver thread.
 *
 * Initializes the zeroconf service and then handles the events selected on
 * the incoming connection set for as long as this node is listening. After
 * the loop the remaining state is torn down: pending commands are dropped,
 * the command thread is joined, and all connections, nodes, objects and
 * cached buffers are released.
 */
1278 48 : void LocalNode::_runReceiverThread()
1279 : {
1280 48 : LB_TS_THREAD(_rcvThread);
1281 48 : _initService();
1282 :
// Number of error events in a row; only a data event resets it.
1283 48 : int nErrors = 0;
1284 144966 : while (isListening())
1285 : {
1286 72460 : const ConnectionSet::Event result = _impl->incoming.select();
1287 72459 : switch (result)
1288 : {
1289 : case ConnectionSet::EVENT_CONNECT:
1290 33 : _handleConnect();
1291 33 : break;
1292 :
1293 : case ConnectionSet::EVENT_DATA:
1294 72104 : nErrors = 0;
1295 72104 : _handleData();
1296 72104 : break;
1297 :
// An invalid handle is treated like a disconnect of the connection.
1298 : case ConnectionSet::EVENT_DISCONNECT:
1299 : case ConnectionSet::EVENT_INVALID_HANDLE:
1300 33 : _handleDisconnect();
1301 33 : break;
1302 :
1303 : case ConnectionSet::EVENT_TIMEOUT:
1304 0 : LBINFO << "select timeout" << std::endl;
1305 0 : break;
1306 :
// After more than 100 errors in a row the connection is disconnected.
1307 : case ConnectionSet::EVENT_ERROR:
1308 0 : ++nErrors;
1309 0 : LBWARN << "Connection error during select" << std::endl;
1310 0 : if (nErrors > 100)
1311 : {
1312 0 : LBWARN << "Too many errors in a row, capping connection"
1313 0 : << std::endl;
1314 0 : _handleDisconnect();
1315 : }
1316 0 : break;
1317 :
// More than 10 errors in a row during select itself are not handled yet.
1318 : case ConnectionSet::EVENT_SELECT_ERROR:
1319 0 : LBWARN << "Error during select" << std::endl;
1320 0 : ++nErrors;
1321 0 : if (nErrors > 10)
1322 : {
1323 0 : LBWARN << "Too many errors in a row" << std::endl;
1324 0 : LBUNIMPLEMENTED;
1325 : }
1326 0 : break;
1327 :
// An interrupt only triggers another dispatch attempt of pending commands.
1328 : case ConnectionSet::EVENT_INTERRUPT:
1329 289 : _redispatchCommands();
1330 289 : break;
1331 :
1332 : default:
1333 0 : LBUNIMPLEMENTED;
1334 : }
1335 : }
1336 :
// The node is no longer listening: report and drop commands which could not
// be dispatched, then wait for the command thread to finish.
1337 47 : if (!_impl->pendingCommands.empty())
1338 0 : LBWARN << _impl->pendingCommands.size()
1339 0 : << " commands pending while leaving command thread" << std::endl;
1340 :
1341 47 : _impl->pendingCommands.clear();
1342 47 : LBCHECK(_impl->commandThread->join());
1343 :
// Remove the sibling of this node's own pipe connection, then disconnect.
1344 94 : ConnectionPtr connection = getConnection();
1345 94 : PipeConnectionPtr pipe = LBSAFECAST(PipeConnection*, connection.get());
1346 47 : connection = pipe->getSibling();
1347 47 : _removeConnection(connection);
1348 47 : _impl->connectionNodes.erase(connection);
1349 47 : _disconnect();
1350 :
// Close the node of each remaining connection and remove the connection.
// NOTE(review): the loop only ends if _removeConnection() shrinks the
// container returned by getConnections() - presumably it does, confirm.
1351 47 : const Connections& connections = _impl->incoming.getConnections();
1352 189 : while (!connections.empty())
1353 : {
1354 71 : connection = connections.back();
1355 142 : NodePtr node = _impl->connectionNodes[connection];
1356 :
1357 71 : if (node)
1358 71 : _closeNode(node);
1359 71 : _removeConnection(connection);
1360 : }
1361 :
1362 47 : _impl->objectStore->clear();
1363 47 : _impl->pendingCommands.clear();
1364 47 : _impl->smallBuffers.flush();
1365 47 : _impl->bigBuffers.flush();
1366 :
1367 235 : LBDEBUG << "Leaving receiver thread of " << lunchbox::className(this)
1368 188 : << std::endl;
1369 47 : }
1370 :
1371 33 : void LocalNode::_handleConnect()
1372 : {
1373 66 : ConnectionPtr connection = _impl->incoming.getConnection();
1374 66 : ConnectionPtr newConn = connection->acceptSync();
1375 33 : connection->acceptNB();
1376 :
1377 33 : if (newConn)
1378 33 : _addConnection(newConn);
1379 : else
1380 0 : LBINFO << "Received connect event, but accept() failed" << std::endl;
1381 33 : }
1382 :
1383 33 : void LocalNode::_handleDisconnect()
1384 : {
1385 33 : while (_handleData())
1386 : ; // read remaining data off connection
1387 :
1388 66 : ConnectionPtr connection = _impl->incoming.getConnection();
1389 33 : ConnectionNodeHash::iterator i = _impl->connectionNodes.find(connection);
1390 :
1391 33 : if (i != _impl->connectionNodes.end())
1392 : {
1393 66 : NodePtr node = i->second;
1394 :
1395 33 : node->ref(); // extend lifetime to give cmd handler a chance
1396 :
1397 : // local command dispatching
1398 66 : OCommand(this, this, CMD_NODE_REMOVE_NODE)
1399 99 : << node.get() << uint32_t(LB_UNDEFINED_UINT32);
1400 :
1401 33 : if (node->getConnection() == connection)
1402 33 : _closeNode(node);
1403 0 : else if (connection->isMulticast())
1404 0 : node->_removeMulticast(connection);
1405 : }
1406 :
1407 33 : _removeConnection(connection);
1408 33 : }
1409 :
1410 72137 : bool LocalNode::_handleData()
1411 : {
1412 72137 : _impl->smallBuffers.compact();
1413 72137 : _impl->bigBuffers.compact();
1414 :
1415 144274 : ConnectionPtr connection = _impl->incoming.getConnection();
1416 72137 : LBASSERT(connection);
1417 :
1418 144274 : BufferPtr buffer = _readHead(connection);
1419 72137 : if (!buffer) // fluke signal
1420 66 : return false;
1421 :
1422 144142 : ICommand command = _setupCommand(connection, buffer);
1423 72071 : const bool gotCommand = _readTail(command, buffer, connection);
1424 72071 : LBASSERT(gotCommand);
1425 :
1426 : // start next receive
1427 144142 : BufferPtr nextBuffer = _impl->smallBuffers.alloc(COMMAND_ALLOCSIZE);
1428 72071 : connection->recvNB(nextBuffer, COMMAND_MINSIZE);
1429 :
1430 72071 : if (gotCommand)
1431 : {
1432 72071 : _dispatchCommand(command);
1433 72071 : return true;
1434 : }
1435 :
1436 0 : LBERROR << "Incomplete command read: " << command << std::endl;
1437 0 : return false;
1438 : }
1439 :
1440 72137 : BufferPtr LocalNode::_readHead(ConnectionPtr connection)
1441 : {
1442 144274 : BufferPtr buffer;
1443 72137 : const bool gotSize = connection->recvSync(buffer, false);
1444 :
1445 72137 : if (!buffer) // fluke signal
1446 : {
1447 0 : LBWARN << "Erronous network event on " << connection->getDescription()
1448 0 : << std::endl;
1449 0 : _impl->incoming.setDirty();
1450 0 : return 0;
1451 : }
1452 :
1453 72137 : if (gotSize)
1454 72071 : return buffer;
1455 :
1456 : // Some systems signal data on dead connections.
1457 66 : buffer->setSize(0);
1458 66 : connection->recvNB(buffer, COMMAND_MINSIZE);
1459 66 : return 0;
1460 : }
1461 :
1462 72071 : ICommand LocalNode::_setupCommand(ConnectionPtr connection,
1463 : ConstBufferPtr buffer)
1464 : {
1465 144142 : NodePtr node;
1466 72071 : ConnectionNodeHashCIter i = _impl->connectionNodes.find(connection);
1467 72071 : if (i != _impl->connectionNodes.end())
1468 72005 : node = i->second;
1469 72071 : LBVERB << "Handle data from " << node << std::endl;
1470 :
1471 72071 : ICommand command(this, node, buffer);
1472 :
1473 72071 : if (node)
1474 72005 : node->_setLastReceive(getTime64());
1475 :
1476 144142 : return command;
1477 : }
1478 :
1479 72071 : bool LocalNode::_readTail(ICommand& command, BufferPtr buffer,
1480 : ConnectionPtr connection)
1481 : {
1482 72071 : const uint64_t needed = command.getSize();
1483 72071 : if (needed <= buffer->getSize())
1484 62060 : return true;
1485 :
1486 10011 : if (needed > buffer->getMaxSize())
1487 : {
1488 0 : LBASSERT(needed > COMMAND_ALLOCSIZE);
1489 0 : LBASSERTINFO(needed < LB_BIT48,
1490 : "Out-of-sync network stream: " << command << "?");
1491 : // not enough space for remaining data, alloc and copy to new buffer
1492 0 : BufferPtr newBuffer = _impl->bigBuffers.alloc(needed);
1493 0 : newBuffer->replace(*buffer);
1494 0 : buffer = newBuffer;
1495 :
1496 0 : command = ICommand(this, command.getRemoteNode(), buffer);
1497 : }
1498 :
1499 : // read remaining data
1500 10011 : connection->recvNB(buffer, command.getSize() - buffer->getSize());
1501 10011 : return connection->recvSync(buffer);
1502 : }
1503 :
1504 34 : BufferPtr LocalNode::allocBuffer(const uint64_t size)
1505 : {
1506 34 : LBASSERT(_impl->receiverThread->isStopped() || _impl->inReceiverThread());
1507 : BufferPtr buffer = size > COMMAND_ALLOCSIZE
1508 0 : ? _impl->bigBuffers.alloc(size)
1509 34 : : _impl->smallBuffers.alloc(COMMAND_ALLOCSIZE);
1510 34 : return buffer;
1511 : }
1512 :
1513 72118 : void LocalNode::_dispatchCommand(ICommand& command)
1514 : {
1515 72118 : LBASSERTINFO(command.isValid(), command);
1516 :
1517 72118 : if (dispatchCommand(command))
1518 72118 : _redispatchCommands();
1519 : else
1520 : {
1521 0 : _redispatchCommands();
1522 0 : _impl->pendingCommands.push_back(command);
1523 : }
1524 72118 : }
1525 :
1526 72154 : bool LocalNode::dispatchCommand(ICommand& command)
1527 : {
1528 72154 : LBVERB << "dispatch " << command << " by " << getNodeID() << std::endl;
1529 72154 : LBASSERTINFO(command.isValid(), command);
1530 :
1531 72154 : const uint32_t type = command.getType();
1532 72154 : switch (type)
1533 : {
1534 : case COMMANDTYPE_NODE:
1535 71649 : LBCHECK(Dispatcher::dispatchCommand(command));
1536 71649 : return true;
1537 :
1538 : case COMMANDTYPE_OBJECT:
1539 505 : return _impl->objectStore->dispatchObjectCommand(command);
1540 :
1541 : default:
1542 0 : LBABORT("Unknown command type " << type << " for " << command);
1543 0 : return true;
1544 : }
1545 : }
1546 :
1547 72407 : void LocalNode::_redispatchCommands()
1548 : {
1549 72407 : bool changes = true;
1550 72407 : while (changes && !_impl->pendingCommands.empty())
1551 : {
1552 0 : changes = false;
1553 :
1554 0 : for (CommandList::iterator i = _impl->pendingCommands.begin();
1555 0 : i != _impl->pendingCommands.end(); ++i)
1556 : {
1557 0 : ICommand& command = *i;
1558 0 : LBASSERT(command.isValid());
1559 :
1560 0 : if (dispatchCommand(command))
1561 : {
1562 0 : _impl->pendingCommands.erase(i);
1563 0 : changes = true;
1564 0 : break;
1565 : }
1566 : }
1567 : }
1568 :
1569 : #ifndef NDEBUG
1570 72407 : if (!_impl->pendingCommands.empty())
1571 0 : LBVERB << _impl->pendingCommands.size() << " undispatched commands"
1572 0 : << std::endl;
1573 72407 : LBASSERT(_impl->pendingCommands.size() < 200);
1574 : #endif
1575 72407 : }
1576 :
1577 48 : void LocalNode::_initService()
1578 : {
1579 93 : LB_TS_SCOPED(_rcvThread);
1580 48 : _impl->service->withdraw(); // go silent during k/v update
1581 :
1582 93 : const ConnectionDescriptions& descs = getConnectionDescriptions();
1583 48 : if (descs.empty())
1584 3 : return;
1585 :
1586 90 : std::ostringstream out;
1587 45 : out << getType();
1588 45 : _impl->service->set("co_type", out.str());
1589 :
1590 45 : out.str("");
1591 45 : out << descs.size();
1592 45 : _impl->service->set("co_numPorts", out.str());
1593 :
1594 90 : for (ConnectionDescriptionsCIter i = descs.begin(); i != descs.end(); ++i)
1595 : {
1596 90 : ConnectionDescriptionPtr desc = *i;
1597 45 : out.str("");
1598 45 : out << "co_port" << i - descs.begin();
1599 45 : _impl->service->set(out.str(), desc->toString());
1600 : }
1601 :
1602 45 : _impl->service->announce(descs.front()->port, getNodeID().getString());
1603 : }
1604 :
/** Withdraw this node's announcement from the zeroconf service. */
1605 47 : void LocalNode::_exitService()
1606 : {
1607 47 : _impl->service->withdraw();
1608 47 : }
1609 :
1610 1 : Zeroconf LocalNode::getZeroconf()
1611 : {
1612 2 : lunchbox::ScopedWrite mutex(_impl->service);
1613 1 : _impl->service->discover(servus::Servus::IF_ALL, 500);
1614 2 : return Zeroconf(_impl->service.data);
1615 : }
1616 :
1617 : //----------------------------------------------------------------------
1618 : // command thread functions
1619 : //----------------------------------------------------------------------
1620 48 : bool LocalNode::_startCommandThread(const int32_t threadID)
1621 : {
1622 48 : _impl->commandThread->threadID = threadID;
1623 48 : return _impl->commandThread->start();
1624 : }
1625 :
1626 51502 : bool LocalNode::_notifyCommandThreadIdle()
1627 : {
1628 51502 : return _impl->objectStore->notifyCommandThreadIdle();
1629 : }
1630 :
/**
 * Command handler: serve the request whose identifier is carried by the
 * command.
 *
 * @param command the command, holding the request identifier.
 * @return always true.
 */
1631 20001 : bool LocalNode::_cmdAckRequest(ICommand& command)
1632 : {
1633 20001 : const uint32_t requestID = command.get<uint32_t>();
1634 20001 : LBASSERT(requestID != LB_UNDEFINED_UINT32);
1635 :
1636 20001 : serveRequest(requestID);
1637 20001 : return true;
1638 : }
1639 :
/**
 * Command handler: stop the receiver thread.
 *
 * Withdraws the zeroconf announcement, sets this node to closing and then
 * re-dispatches the same command as CMD_NODE_STOP_CMD.
 *
 * @param command the stop command, modified in place.
 * @return always true.
 */
1640 47 : bool LocalNode::_cmdStopRcv(ICommand& command)
1641 : {
1642 47 : LB_TS_THREAD(_rcvThread);
1643 47 : LBASSERT(isListening());
1644 :
1645 47 : _exitService();
1646 47 : _setClosing(); // causes rcv thread exit
1647 :
1648 47 : command.setCommand(CMD_NODE_STOP_CMD); // causes cmd thread exit
1649 47 : _dispatchCommand(command);
1650 47 : return true;
1651 : }
1652 :
/**
 * Command handler: set this node to closed.
 *
 * Asserts that the node is closing, which _cmdStopRcv() sets before it
 * dispatches this command.
 *
 * @return always true.
 */
1653 47 : bool LocalNode::_cmdStopCmd(ICommand&)
1654 : {
1655 47 : LB_TS_THREAD(_cmdThread);
1656 47 : LBASSERTINFO(isClosing(), *this);
1657 :
1658 47 : _setClosed();
1659 47 : return true;
1660 : }
1661 :
1662 0 : bool LocalNode::_cmdSetAffinity(ICommand& command)
1663 : {
1664 0 : const int32_t affinity = command.get<int32_t>();
1665 :
1666 0 : lunchbox::Thread::setAffinity(affinity);
1667 0 : return true;
1668 : }
1669 :
1670 33 : bool LocalNode::_cmdConnect(ICommand& command)
1671 : {
1672 33 : LBASSERTINFO(!command.getRemoteNode(), command);
1673 33 : LBASSERT(_impl->inReceiverThread());
1674 :
1675 33 : const NodeID& nodeID = command.get<NodeID>();
1676 33 : const uint32_t requestID = command.get<uint32_t>();
1677 33 : const uint32_t nodeType = command.get<uint32_t>();
1678 66 : std::string data = command.get<std::string>();
1679 :
1680 33 : LBVERB << "handle connect " << command << " req " << requestID << " type "
1681 33 : << nodeType << " data " << data << std::endl;
1682 :
1683 66 : ConnectionPtr connection = _impl->incoming.getConnection();
1684 :
1685 33 : LBASSERT(connection);
1686 33 : LBASSERT(nodeID != getNodeID());
1687 33 : LBASSERT(_impl->connectionNodes.find(connection) ==
1688 : _impl->connectionNodes.end());
1689 :
1690 66 : NodePtr peer;
1691 33 : const uint32_t cmd = CMD_NODE_CONNECT_REPLY;
1692 :
1693 : // No locking needed, only recv thread modifies
1694 33 : NodeHashCIter i = _impl->nodes->find(nodeID);
1695 33 : if (i != _impl->nodes->end())
1696 : {
1697 0 : peer = i->second;
1698 0 : if (peer->isReachable())
1699 : {
1700 : // Node exists, probably simultaneous connect from peer
1701 0 : LBINFO << "Already got node " << nodeID << ", refusing connect"
1702 0 : << std::endl;
1703 :
1704 : // refuse connection
1705 0 : OCommand(Connections(1, connection), cmd) << NodeID() << requestID;
1706 :
1707 : // NOTE: There is no close() here. The reply command above has to be
1708 : // received by the peer first, before closing the connection.
1709 0 : _removeConnection(connection);
1710 0 : return true;
1711 : }
1712 : }
1713 :
1714 : // create and add connected node
1715 33 : if (!peer)
1716 33 : peer = createNode(nodeType);
1717 33 : if (!peer)
1718 : {
1719 0 : LBDEBUG << "Can't create node of type " << nodeType << ", disconnecting"
1720 0 : << std::endl;
1721 :
1722 : // refuse connection
1723 0 : OCommand(Connections(1, connection), cmd) << NodeID() << requestID;
1724 :
1725 : // NOTE: There is no close() here. The reply command above has to be
1726 : // received by the peer first, before closing the connection.
1727 0 : _removeConnection(connection);
1728 0 : return true;
1729 : }
1730 :
1731 33 : if (!peer->deserialize(data))
1732 0 : LBWARN << "Error during node initialization" << std::endl;
1733 33 : LBASSERTINFO(data.empty(), data);
1734 33 : LBASSERTINFO(peer->getNodeID() == nodeID, peer->getNodeID() << "!="
1735 : << nodeID);
1736 33 : LBASSERT(peer->getType() == nodeType);
1737 :
1738 33 : _impl->connectionNodes[connection] = peer;
1739 : {
1740 66 : lunchbox::ScopedFastWrite mutex(_impl->nodes);
1741 33 : _impl->nodes.data[peer->getNodeID()] = peer;
1742 : }
1743 33 : LBVERB << "Added node " << nodeID << std::endl;
1744 :
1745 : // send our information as reply
1746 66 : OCommand(Connections(1, connection), cmd) << getNodeID() << requestID
1747 99 : << getType() << serialize();
1748 :
1749 33 : return true;
1750 : }
1751 :
1752 33 : bool LocalNode::_cmdConnectReply(ICommand& command)
1753 : {
1754 33 : LBASSERT(!command.getRemoteNode());
1755 33 : LBASSERT(_impl->inReceiverThread());
1756 :
1757 66 : ConnectionPtr connection = _impl->incoming.getConnection();
1758 33 : LBASSERT(_impl->connectionNodes.find(connection) ==
1759 : _impl->connectionNodes.end());
1760 :
1761 33 : const NodeID& nodeID = command.get<NodeID>();
1762 33 : const uint32_t requestID = command.get<uint32_t>();
1763 :
1764 : // connection refused
1765 33 : if (nodeID == 0)
1766 : {
1767 0 : LBINFO << "Connection refused, node already connected by peer"
1768 0 : << std::endl;
1769 :
1770 0 : _removeConnection(connection);
1771 0 : serveRequest(requestID, false);
1772 0 : return true;
1773 : }
1774 :
1775 33 : const uint32_t nodeType = command.get<uint32_t>();
1776 66 : std::string data = command.get<std::string>();
1777 :
1778 33 : LBVERB << "handle connect reply " << command << " req " << requestID
1779 33 : << " type " << nodeType << " data " << data << std::endl;
1780 :
1781 : // No locking needed, only recv thread modifies
1782 33 : NodeHash::const_iterator i = _impl->nodes->find(nodeID);
1783 66 : NodePtr peer;
1784 33 : if (i != _impl->nodes->end())
1785 0 : peer = i->second;
1786 :
1787 33 : if (peer && peer->isReachable()) // simultaneous connect
1788 : {
1789 0 : LBINFO << "Closing simultaneous connection from " << peer << " on "
1790 0 : << connection << std::endl;
1791 :
1792 0 : _removeConnection(connection);
1793 0 : _closeNode(peer);
1794 0 : serveRequest(requestID, false);
1795 0 : return true;
1796 : }
1797 :
1798 : // create and add node
1799 33 : if (!peer)
1800 : {
1801 33 : if (requestID != LB_UNDEFINED_UINT32)
1802 33 : peer = reinterpret_cast<Node*>(getRequestData(requestID));
1803 : else
1804 0 : peer = createNode(nodeType);
1805 : }
1806 33 : if (!peer)
1807 : {
1808 0 : LBINFO << "Can't create node of type " << nodeType << ", disconnecting"
1809 0 : << std::endl;
1810 0 : _removeConnection(connection);
1811 0 : return true;
1812 : }
1813 :
1814 33 : LBASSERTINFO(peer->getType() == nodeType, peer->getType() << " != "
1815 : << nodeType);
1816 33 : LBASSERT(peer->isClosed());
1817 :
1818 33 : if (!peer->deserialize(data))
1819 0 : LBWARN << "Error during node initialization" << std::endl;
1820 33 : LBASSERT(data.empty());
1821 33 : LBASSERT(peer->getNodeID() == nodeID);
1822 :
1823 : // send ACK to peer
1824 : // cppcheck-suppress unusedScopedObject
1825 33 : OCommand(Connections(1, connection), CMD_NODE_CONNECT_ACK);
1826 :
1827 33 : peer->_connect(connection);
1828 33 : _impl->connectionNodes[connection] = peer;
1829 : {
1830 66 : lunchbox::ScopedFastWrite mutex(_impl->nodes);
1831 33 : _impl->nodes.data[peer->getNodeID()] = peer;
1832 : }
1833 33 : _connectMulticast(peer);
1834 33 : LBVERB << "Added node " << nodeID << std::endl;
1835 :
1836 33 : serveRequest(requestID, true);
1837 :
1838 33 : notifyConnect(peer);
1839 33 : return true;
1840 : }
1841 :
1842 33 : bool LocalNode::_cmdConnectAck(ICommand& command)
1843 : {
1844 66 : NodePtr node = command.getRemoteNode();
1845 33 : LBASSERT(node);
1846 33 : LBASSERT(_impl->inReceiverThread());
1847 33 : LBVERB << "handle connect ack" << std::endl;
1848 :
1849 33 : node->_connect(_impl->incoming.getConnection());
1850 33 : _connectMulticast(node);
1851 33 : notifyConnect(node);
1852 66 : return true;
1853 : }
1854 :
1855 0 : bool LocalNode::_cmdID(ICommand& command)
1856 : {
1857 0 : LBASSERT(_impl->inReceiverThread());
1858 :
1859 0 : const NodeID& nodeID = command.get<NodeID>();
1860 0 : uint32_t nodeType = command.get<uint32_t>();
1861 0 : std::string data = command.get<std::string>();
1862 :
1863 0 : if (command.getRemoteNode())
1864 : {
1865 0 : LBASSERT(nodeID == command.getRemoteNode()->getNodeID());
1866 0 : LBASSERT(command.getRemoteNode()->_getMulticast());
1867 0 : return true;
1868 : }
1869 :
1870 0 : LBDEBUG << "handle ID " << command << " node " << nodeID << std::endl;
1871 :
1872 0 : ConnectionPtr connection = _impl->incoming.getConnection();
1873 0 : LBASSERT(connection->isMulticast());
1874 0 : LBASSERT(_impl->connectionNodes.find(connection) ==
1875 : _impl->connectionNodes.end());
1876 :
1877 0 : NodePtr node;
1878 0 : if (nodeID == getNodeID()) // 'self' multicast connection
1879 0 : node = this;
1880 : else
1881 : {
1882 : // No locking needed, only recv thread writes
1883 0 : NodeHash::const_iterator i = _impl->nodes->find(nodeID);
1884 :
1885 0 : if (i == _impl->nodes->end())
1886 : {
1887 : // unknown node: create and add unconnected node
1888 0 : node = createNode(nodeType);
1889 :
1890 0 : if (!node->deserialize(data))
1891 0 : LBWARN << "Error during node initialization" << std::endl;
1892 0 : LBASSERTINFO(data.empty(), data);
1893 :
1894 : {
1895 0 : lunchbox::ScopedFastWrite mutex(_impl->nodes);
1896 0 : _impl->nodes.data[nodeID] = node;
1897 : }
1898 0 : LBVERB << "Added node " << nodeID << " with multicast "
1899 0 : << connection << std::endl;
1900 : }
1901 : else
1902 0 : node = i->second;
1903 : }
1904 0 : LBASSERT(node);
1905 0 : LBASSERTINFO(node->getNodeID() == nodeID, node->getNodeID() << "!="
1906 : << nodeID);
1907 :
1908 0 : _connectMulticast(node, connection);
1909 0 : _impl->connectionNodes[connection] = node;
1910 0 : LBDEBUG << "Added multicast connection " << connection << " from " << nodeID
1911 0 : << " to " << getNodeID() << std::endl;
1912 0 : return true;
1913 : }
1914 :
1915 7 : bool LocalNode::_cmdDisconnect(ICommand& command)
1916 : {
1917 7 : LBASSERT(_impl->inReceiverThread());
1918 :
1919 7 : const uint32_t requestID = command.get<uint32_t>();
1920 :
1921 14 : NodePtr node = static_cast<Node*>(getRequestData(requestID));
1922 7 : LBASSERT(node);
1923 :
1924 7 : _closeNode(node);
1925 7 : LBASSERT(node->isClosed());
1926 7 : serveRequest(requestID);
1927 14 : return true;
1928 : }
1929 :
1930 0 : bool LocalNode::_cmdGetNodeData(ICommand& command)
1931 : {
1932 0 : const NodeID& nodeID = command.get<NodeID>();
1933 0 : const uint32_t requestID = command.get<uint32_t>();
1934 :
1935 0 : LBVERB << "cmd get node data: " << command << " req " << requestID
1936 0 : << " nodeID " << nodeID << std::endl;
1937 :
1938 0 : NodePtr node = getNode(nodeID);
1939 0 : NodePtr toNode = command.getRemoteNode();
1940 :
1941 0 : uint32_t nodeType = NODETYPE_INVALID;
1942 0 : std::string nodeData;
1943 0 : if (node)
1944 : {
1945 0 : nodeType = node->getType();
1946 0 : nodeData = node->serialize();
1947 0 : LBDEBUG << "Sent node data '" << nodeData << "' for " << nodeID
1948 0 : << " to " << toNode << std::endl;
1949 : }
1950 :
1951 0 : toNode->send(CMD_NODE_GET_NODE_DATA_REPLY) << nodeID << requestID
1952 0 : << nodeType << nodeData;
1953 0 : return true;
1954 : }
1955 :
1956 0 : bool LocalNode::_cmdGetNodeDataReply(ICommand& command)
1957 : {
1958 0 : LBASSERT(_impl->inReceiverThread());
1959 :
1960 0 : const NodeID& nodeID = command.get<NodeID>();
1961 0 : const uint32_t requestID = command.get<uint32_t>();
1962 0 : const uint32_t nodeType = command.get<uint32_t>();
1963 0 : std::string nodeData = command.get<std::string>();
1964 :
1965 0 : LBVERB << "cmd get node data reply: " << command << " req " << requestID
1966 0 : << " type " << nodeType << " data " << nodeData << std::endl;
1967 :
1968 : // No locking needed, only recv thread writes
1969 0 : NodeHash::const_iterator i = _impl->nodes->find(nodeID);
1970 0 : if (i != _impl->nodes->end())
1971 : {
1972 : // Requested node connected to us in the meantime
1973 0 : NodePtr node = i->second;
1974 :
1975 0 : node->ref(this);
1976 0 : serveRequest(requestID, node.get());
1977 0 : return true;
1978 : }
1979 :
1980 0 : if (nodeType == NODETYPE_INVALID)
1981 : {
1982 0 : serveRequest(requestID, (void*)0);
1983 0 : return true;
1984 : }
1985 :
1986 : // new node: create and add unconnected node
1987 0 : NodePtr node = createNode(nodeType);
1988 0 : if (node)
1989 : {
1990 0 : LBASSERT(node);
1991 :
1992 0 : if (!node->deserialize(nodeData))
1993 0 : LBWARN << "Failed to initialize node data" << std::endl;
1994 0 : LBASSERT(nodeData.empty());
1995 0 : node->ref(this);
1996 : }
1997 : else
1998 0 : LBINFO << "Can't create node of type " << nodeType << std::endl;
1999 :
2000 0 : serveRequest(requestID, node.get());
2001 0 : return true;
2002 : }
2003 :
2004 0 : bool LocalNode::_cmdAcquireSendToken(ICommand& command)
2005 : {
2006 0 : LBASSERT(inCommandThread());
2007 0 : if (!_impl->sendToken) // enqueue command if no token available
2008 : {
2009 0 : const uint32_t timeout = Global::getTimeout();
2010 0 : if (timeout == LB_TIMEOUT_INDEFINITE ||
2011 0 : (getTime64() - _impl->lastSendToken <= timeout))
2012 : {
2013 0 : _impl->sendTokenQueue.push_back(command);
2014 0 : return true;
2015 : }
2016 :
2017 : // timeout! - clear old requests
2018 0 : _impl->sendTokenQueue.clear();
2019 : // 'generate' new token - release is robust
2020 : }
2021 :
2022 0 : _impl->sendToken = false;
2023 :
2024 0 : const uint32_t requestID = command.get<uint32_t>();
2025 0 : command.getRemoteNode()->send(CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY)
2026 0 : << requestID;
2027 0 : return true;
2028 : }
2029 :
2030 0 : bool LocalNode::_cmdAcquireSendTokenReply(ICommand& command)
2031 : {
2032 0 : const uint32_t requestID = command.get<uint32_t>();
2033 0 : serveRequest(requestID);
2034 0 : return true;
2035 : }
2036 :
2037 0 : bool LocalNode::_cmdReleaseSendToken(ICommand&)
2038 : {
2039 0 : LBASSERT(inCommandThread());
2040 0 : _impl->lastSendToken = getTime64();
2041 :
2042 0 : if (_impl->sendToken)
2043 0 : return true; // double release due to timeout
2044 0 : if (_impl->sendTokenQueue.empty())
2045 : {
2046 0 : _impl->sendToken = true;
2047 0 : return true;
2048 : }
2049 :
2050 0 : ICommand& request = _impl->sendTokenQueue.front();
2051 :
2052 0 : const uint32_t requestID = request.get<uint32_t>();
2053 0 : request.getRemoteNode()->send(CMD_NODE_ACQUIRE_SEND_TOKEN_REPLY)
2054 0 : << requestID;
2055 0 : _impl->sendTokenQueue.pop_front();
2056 0 : return true;
2057 : }
2058 :
2059 0 : bool LocalNode::_cmdAddListener(ICommand& command)
2060 : {
2061 : Connection* rawConnection =
2062 0 : reinterpret_cast<Connection*>(command.get<uint64_t>());
2063 0 : std::string data = command.get<std::string>();
2064 :
2065 0 : ConnectionDescriptionPtr description = new ConnectionDescription(data);
2066 0 : command.getRemoteNode()->_addConnectionDescription(description);
2067 :
2068 0 : if (command.getRemoteNode() != this)
2069 0 : return true;
2070 :
2071 0 : ConnectionPtr connection = rawConnection;
2072 0 : connection->unref();
2073 0 : LBASSERT(connection);
2074 :
2075 0 : _impl->connectionNodes[connection] = this;
2076 0 : if (connection->isMulticast())
2077 0 : _addMulticast(this, connection);
2078 :
2079 0 : connection->acceptNB();
2080 0 : _impl->incoming.addConnection(connection);
2081 :
2082 0 : _initService(); // update zeroconf
2083 0 : return true;
2084 : }
2085 :
2086 0 : bool LocalNode::_cmdRemoveListener(ICommand& command)
2087 : {
2088 0 : const uint32_t requestID = command.get<uint32_t>();
2089 0 : Connection* rawConnection = command.get<Connection*>();
2090 0 : std::string data = command.get<std::string>();
2091 :
2092 0 : ConnectionDescriptionPtr description = new ConnectionDescription(data);
2093 0 : LBCHECK(command.getRemoteNode()->_removeConnectionDescription(description));
2094 :
2095 0 : if (command.getRemoteNode() != this)
2096 0 : return true;
2097 :
2098 0 : _initService(); // update zeroconf
2099 :
2100 0 : ConnectionPtr connection = rawConnection;
2101 0 : connection->unref(this);
2102 0 : LBASSERT(connection);
2103 :
2104 0 : if (connection->isMulticast())
2105 0 : _removeMulticast(connection);
2106 :
2107 0 : _impl->incoming.removeConnection(connection);
2108 0 : LBASSERT(_impl->connectionNodes.find(connection) !=
2109 : _impl->connectionNodes.end());
2110 0 : _impl->connectionNodes.erase(connection);
2111 0 : serveRequest(requestID);
2112 0 : return true;
2113 : }
2114 :
2115 0 : bool LocalNode::_cmdPing(ICommand& command)
2116 : {
2117 0 : LBASSERT(inCommandThread());
2118 0 : command.getRemoteNode()->send(CMD_NODE_PING_REPLY);
2119 0 : return true;
2120 : }
2121 :
2122 2 : bool LocalNode::_cmdCommand(ICommand& command)
2123 : {
2124 2 : const uint128_t& commandID = command.get<uint128_t>();
2125 4 : CommandHandler func;
2126 : {
2127 3 : lunchbox::ScopedFastRead mutex(_impl->commandHandlers);
2128 2 : CommandHashCIter i = _impl->commandHandlers->find(commandID);
2129 2 : if (i == _impl->commandHandlers->end())
2130 0 : return false;
2131 :
2132 2 : CommandQueue* queue = i->second.second;
2133 2 : if (queue)
2134 : {
2135 2 : command.setDispatchFunction(
2136 3 : CmdFunc(this, &LocalNode::_cmdCommandAsync));
2137 1 : queue->push(command);
2138 1 : return true;
2139 : }
2140 : // else
2141 :
2142 1 : func = i->second.first;
2143 : }
2144 :
2145 2 : CustomICommand customCmd(command);
2146 1 : return func(customCmd);
2147 : }
2148 :
2149 1 : bool LocalNode::_cmdCommandAsync(ICommand& command)
2150 : {
2151 1 : const uint128_t& commandID = command.get<uint128_t>();
2152 2 : CommandHandler func;
2153 : {
2154 2 : lunchbox::ScopedFastRead mutex(_impl->commandHandlers);
2155 1 : CommandHashCIter i = _impl->commandHandlers->find(commandID);
2156 1 : LBASSERT(i != _impl->commandHandlers->end());
2157 1 : if (i == _impl->commandHandlers->end())
2158 0 : return true; // deregistered between dispatch and now
2159 1 : func = i->second.first;
2160 : }
2161 2 : CustomICommand customCmd(command);
2162 1 : return func(customCmd);
2163 : }
2164 :
2165 33 : bool LocalNode::_cmdAddConnection(ICommand& command)
2166 : {
2167 33 : LBASSERT(_impl->inReceiverThread());
2168 :
2169 66 : ConnectionPtr connection = command.get<ConnectionPtr>();
2170 33 : _addConnection(connection);
2171 33 : connection->unref(); // ref'd by _addConnection
2172 66 : return true;
2173 : }
2174 63 : }
|