Line data Source code
1 :
2 : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Cedric Stalder <cedric.stalder@gmail.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #ifndef CO_LOCALNODE_H
23 : #define CO_LOCALNODE_H
24 :
25 : #include <co/node.h> // base class
26 : #include <co/objectHandler.h> // base class
27 : #include <co/objectVersion.h> // VERSION_FOO used inline
28 : #include <lunchbox/requestHandler.h> // base class
29 :
30 : #include <boost/function/function1.hpp>
31 : #include <boost/function/function4.hpp>
32 :
33 : namespace co
34 : {
35 : namespace detail
36 : {
37 : class LocalNode;
38 : class ReceiverThread;
39 : class CommandThread;
40 : }
41 :
42 : /**
43 : * Node specialization for a local node.
44 : *
45 : * Local nodes listen on network connections, manage connections to other nodes
46 : * and provide Object registration, mapping and command dispatch. Typically each
47 : * process uses one local node to communicate with other processes.
48 : */
49 : class LocalNode : public lunchbox::RequestHandler,
50 : public Node,
51 : public ObjectHandler
52 : {
53 : public:
54 : /**
55 : * Counters are monotonically increasing performance variables for
56 : * operations performed by a LocalNode instance.
57 : */
58 : enum Counter
59 : {
60 : COUNTER_MAP_OBJECT_REMOTE, //!< Num of mapObjects served for other nodes
61 : COUNTER_ALL //!< Number of counters above; must be last
62 : };
63 :
64 : /** Construct a new local node of the given type. @version 1.0 */
65 : CO_API explicit LocalNode(const uint32_t type = co::NODETYPE_NODE);
66 :
67 : /**
68 : * @name State Changes
69 : *
70 : * The following methods affect the state of the node by changing its
71 : * connectivity to the network.
72 : */
73 : //@{
74 : /**
75 : * Initialize the node.
76 : *
77 : * Parses the following command line options and calls listen()
78 : * afterwards:
79 : *
80 : * The '--co-listen <connection description>' command line option is
81 : * parsed by this method to add listening connections to this node. This
82 : * parameter might be used multiple times.
83 : * ConnectionDescription::fromString() is used to parse the provided
84 : * description.
85 : *
86 : * The '--co-globals <string>' option is used to initialize the
87 : * Globals. The string is parsed using Globals::fromString().
88 : *
89 : * Please note that further command line parameters are recognized by
90 : * co::init().
91 : *
92 : * @param argc the command line argument count.
93 : * @param argv the command line argument values.
94 : * @return true if the client was successfully initialized, false otherwise.
95 : * @version 1.0
96 : */
97 : CO_API virtual bool initLocal(const int argc, char** argv);
98 :
99 : /**
100 : * Open all connections and put this node into the listening state.
101 : *
102 : * The node will spawn a receiver and command thread, and listen on all
103 : * connections for incoming commands. The node will be in the listening
104 : * state if the method completed successfully. A listening node can connect
105 : * other nodes.
106 : *
107 : * @return true if the node could be initialized, false otherwise.
108 : * @sa connect()
109 : * @version 1.0
110 : */
111 : CO_API virtual bool listen();
112 :
113 : /**
114 : * Close a listening node.
115 : *
116 : * Disconnects all connected node proxies, closes the listening connections
117 : * and terminates all threads created in listen().
118 : *
119 : * @return true if the node was stopped, false otherwise.
120 : * @version 1.0
121 : */
122 : CO_API virtual bool close();
123 :
124 : /** Close a listening node by forwarding to close(). @return the result of close(). @version 1.0 */
125 0 : virtual bool exitLocal() { return close(); }
126 : /**
127 : * Connect a remote node (proxy) to this listening node.
128 : *
129 : * The connection descriptions of the node are used to connect the
130 : * remote local node. On success, the node is in the connected state,
131 : * otherwise its state is unchanged.
132 : *
133 : * This method is one-sided, that is, the node to be connected should
134 : * not initiate a connection to this node at the same time. For
135 : * concurrent connects use the other connect() method using node
136 : * identifiers.
137 : *
138 : * @param node the remote node.
139 : * @return true if this node was connected, false otherwise.
140 : * @version 1.0
141 : */
142 : CO_API bool connect(NodePtr node);
143 :
144 : /**
145 : * Create and connect a node given by an identifier.
146 : *
147 : * This method is two-sided and thread-safe, that is, it can be called
148 : * by multiple threads on the same node with the same nodeID, or
149 : * concurrently on two nodes with each other's nodeID.
150 : *
151 : * @param nodeID the identifier of the node to connect.
152 : * @return the connected node, or an invalid RefPtr if the node could
153 : * not be connected.
154 : * @version 1.0
155 : */
156 : CO_API NodePtr connect(const NodeID& nodeID);
157 :
158 : /**
159 : * Find and connect the node where the given object is registered.
160 : *
161 : * This method is relatively expensive, since potentially all connected
162 : * nodes are queried.
163 : *
164 : * @param id the identifier of the object to search for.
165 : * @return the connected node, or an invalid RefPtr if the node could
166 : * not be found or connected.
167 : * @sa registerObject(), connect()
168 : * @version 1.1.1
169 : */
170 : CO_API NodePtr connectObjectMaster(const uint128_t& id);
171 :
172 : /**
173 : * Disconnect a connected node.
174 : *
175 : * @param node the remote node.
176 : * @return true if the node was disconnected correctly, false otherwise.
177 : * @version 1.0
178 : */
179 : CO_API virtual bool disconnect(NodePtr node);
180 : //@}
181 :
182 : /** @name Launching a remote node */
183 : //@{
184 : /**
185 : * Launch a remote process using the given command.
186 : *
187 : * The launched node will automatically connect with this node, if it passes
188 : * the given options to its initLocal().
189 : *
190 : * The following markers are replaced in the command:
191 : * * %h: Node::getHostname()
192 : * * %n: Node identifier in string representation
193 : * * %d: Node::getWorkDir()
194 : * * %q: Node::getLaunchQuote()
195 : * * %o: Options needed by remote initLocal() to connect this node. If not
196 : * given, options are appended at the end of the given command.
197 : *
198 : * @return true if the launch process execution was successful.
199 : */
200 : CO_API bool launch(NodePtr node, const std::string& command);
201 :
202 : /**
203 : * Wait for a launched node to connect.
204 : *
205 : * @return the remote node handle, or 0 on timeout.
206 : */
207 : CO_API NodePtr syncLaunch(const uint128_t& nodeID, int64_t timeout);
208 : //@}
209 :
210 : /** @name Object Registry */
211 : //@{
212 : /**
213 : * Register a distributed object.
214 : *
215 : * Registering a distributed object makes this object the master
216 : * version. The object's identifier is used to map slave instances of
217 : * the object. Master versions of objects are typically writable and can
218 : * commit new versions of the distributed object.
219 : *
220 : * @param object the object instance.
221 : * @return true if the object was registered, false otherwise.
222 : * @version 1.0
223 : */
224 : CO_API bool registerObject(Object* object) override;
225 :
226 : /**
227 : * Deregister a distributed object.
228 : *
229 : * All slave instances should be unmapped before this call, and will be
230 : * forcefully unmapped by this method.
231 : *
232 : * @param object the object instance.
233 : * @version 1.0
234 : */
235 : CO_API void deregisterObject(Object* object) override;
236 :
237 : /**
238 : * Map a distributed object.
239 : *
240 : * The mapped object becomes a slave instance of the master version which
241 : * was registered with the provided identifier. The given version can be
242 : * used to map a specific version.
243 : *
244 : * If VERSION_NONE is provided, the slave instance is not initialized with
245 : * any data from the master. This is useful if the object has been
246 : * pre-initialized by other means, for example from a shared file system.
247 : *
248 : * If VERSION_OLDEST is provided, the oldest available version is mapped.
249 : *
250 : * If a concrete requested version no longer exists, mapObject() will map
251 : * the oldest available version.
252 : *
253 : * If the requested version is newer than the head version, mapObject() will
254 : * block until the requested version is available.
255 : *
256 : * Mapping an object is a potentially time-consuming operation. Using
257 : * mapObjectNB() and mapObjectSync() to asynchronously map multiple objects
258 : * in parallel improves performance of this operation.
259 : *
260 : * After mapping, the object will have the version used during
261 : * initialization, or VERSION_NONE if mapped to this version.
262 : *
263 : * When no master node is given, connectObjectMaster() is used to find the
264 : * node with the master instance.
265 : *
266 : * This method returns immediately after initiating the mapping. Evaluating
267 : * the value of the returned lunchbox::Future will block on the completion
268 : * of the operation and return true if the object was mapped, false if the
269 : * master of the object is not found or the requested version is no longer
270 : * available.
271 : *
272 : * @param object the object.
273 : * @param id the master object identifier.
274 : * @param master the node with the master instance, may be 0.
275 : * @param version the initial version.
276 : * @return A lunchbox::Future which will deliver the success status of
277 : * the operation on evaluation.
278 : * @sa registerObject
279 : * @version 1.0
280 : */
281 : CO_API f_bool_t mapObject(Object* object, const uint128_t& id,
282 : NodePtr master,
283 : const uint128_t& version = VERSION_OLDEST);
284 :
285 : /** Convenience wrapper for mapObject(): maps v.identifier at v.version, passing 0 as the master node. @version 1.0 */
286 27 : f_bool_t mapObject(Object* object, const ObjectVersion& v)
287 : {
288 27 : return mapObject(object, v.identifier, 0, v.version);
289 : }
290 :
291 : /** @deprecated Forwards to mapObject() with 0 as the master node; use the mapObject() overload taking a master. */
292 7 : f_bool_t mapObject(Object* object, const uint128_t& id,
293 : const uint128_t& version = VERSION_OLDEST)
294 : {
295 7 : return mapObject(object, id, 0, version);
296 : }
297 :
298 : /** @deprecated use mapObject() */
299 : CO_API uint32_t mapObjectNB(Object* object, const uint128_t& id,
300 : const uint128_t& version = VERSION_OLDEST);
301 :
302 : /** @deprecated use mapObject() */
303 : CO_API uint32_t mapObjectNB(Object* object, const uint128_t& id,
304 : const uint128_t& version,
305 : NodePtr master) override;
306 :
307 : /** @deprecated use mapObject() */
308 : CO_API bool mapObjectSync(const uint32_t requestID) override;
309 :
310 : /**
311 : * Synchronize the local object with a remote object.
312 : *
313 : * The object is synchronized to the newest version of the first
314 : * attached object on the given master node matching the
315 : * instanceID. When no master node is given, connectObjectMaster() is
316 : * used to find the node with the master instance. When CO_INSTANCE_ALL
317 : * is given, the first instance is used. Before a successful return,
318 : * applyInstanceData() is called on the calling thread to synchronize
319 : * the given object.
320 : *
321 : * @param object The local object instance to synchronize.
322 : * @param master The node where the synchronizing object is attached.
323 : * @param id the object identifier.
324 : * @param instanceID the instance identifier of the synchronizing
325 : * object.
326 : * @return A lunchbox::Future which will deliver the success status of
327 : * the operation on evaluation.
328 : * @version 1.1.1
329 : */
330 : CO_API f_bool_t
331 : syncObject(Object* object, const uint128_t& id, NodePtr master,
332 : const uint32_t instanceID = CO_INSTANCE_ALL) override;
333 : /**
334 : * Unmap a mapped object.
335 : *
336 : * @param object the mapped object.
337 : * @version 1.0
338 : */
339 : CO_API void unmapObject(Object* object) override;
340 :
341 : /** Disable the instance cache of a stopped local node. @version 1.0 */
342 : CO_API void disableInstanceCache();
343 :
344 : /** @internal */
345 : CO_API void expireInstanceData(const int64_t age);
346 :
347 : /**
348 : * Enable sending instance data after registration.
349 : *
350 : * Send-on-register starts transmitting instance data of registered
351 : * objects directly after they have been registered. The data is cached
352 : * on remote nodes and accelerates object mapping. Send-on-register
353 : * should not be active when remote nodes are joining a multicast group
354 : * of this node, since they will potentially read out-of-order data
355 : * streams on the multicast connection.
356 : *
357 : * Enable and disable are counted, that is, the last enable on a
358 : * matched series of disable/enable will be effective. The disable is
359 : * completely synchronous, that is, no more instance data will be sent
360 : * after the first disable.
361 : *
362 : * @version 1.0
363 : */
364 : CO_API void enableSendOnRegister();
365 :
366 : /** Disable sending data of newly registered objects. @version 1.0 */
367 : CO_API void disableSendOnRegister();
368 :
369 : /**
370 : * Handler for an Object::push() operation.
371 : *
372 : * Called at least on each node listed in an Object::push() operation
373 : * upon reception of the pushed data from the command thread. Called on
374 : * all nodes of a multicast group, even for nodes not listed in the
375 : * Object::push().
376 : *
377 : * The default implementation calls registered push handlers. Typically
378 : * used to create an object on a remote node, using the objectType for
379 : * instantiation, the istream to initialize it, and the objectID to map
380 : * it using VERSION_NONE. The groupID may be used to differentiate
381 : * multiple concurrent push operations.
382 : *
383 : * @param groupID The group identifier given to Object::push()
384 : * @param objectType The type identifier given to Object::push()
385 : * @param objectID The identifier of the pushed object
386 : * @param istream the input data stream containing the instance data.
387 : * @version 1.0
388 : */
389 : CO_API virtual void objectPush(const uint128_t& groupID,
390 : const uint128_t& objectType,
391 : const uint128_t& objectID,
392 : DataIStream& istream);
393 :
394 : /** Function signature for push handlers. @version 1.0 */
395 : typedef boost::function<void(const uint128_t&, //!< groupID
396 : const uint128_t&, //!< objectType
397 : const uint128_t&, //!< objectID
398 : DataIStream&)>
399 : PushHandler;
400 : /**
401 : * Register a custom handler for Object::push operations
402 : *
403 : * The registered handler function will be called automatically for an
404 : * incoming object push. Threadsafe with itself and objectPush().
405 : *
406 : * @param groupID The group identifier given to Object::push()
407 : * @param handler The handler function called for a registered groupID
408 : * @version 1.0
409 : */
410 : CO_API void registerPushHandler(const uint128_t& groupID,
411 : const PushHandler& handler);
412 :
413 : /** Function signature for custom command handlers. @version 1.0 */
414 : typedef boost::function<bool(CustomICommand&)> CommandHandler;
415 :
416 : /**
417 : * Register a custom command handler handled by this node.
418 : *
419 : * Custom command handlers are invoked on reception of a CustomICommand
420 : * sent by Node::send( uint128_t, ... ). The command identifier needs to
421 : * be unique. It is recommended to use servus::make_uint128() to generate
422 : * this identifier.
423 : *
424 : * @param command the unique identifier of the custom command
425 : * @param func the handler function for the custom command
426 : * @param queue the queue where the command should be inserted to
427 : * @return true on successful registering, false otherwise
428 : * @version 1.0
429 : */
430 : CO_API bool registerCommandHandler(const uint128_t& command,
431 : const CommandHandler& func,
432 : CommandQueue* queue);
433 :
434 : /** @internal swap the existing object with a new object and keep
435 : the cm, id and instanceID. */
436 : CO_API void swapObject(Object* oldObject, Object* newObject);
437 : //@}
438 :
439 : /** @name Data Access */
440 : //@{
441 : /**
442 : * Get a node by identifier.
443 : *
444 : * The node might not be connected. Thread safe.
445 : *
446 : * @param id the node identifier.
447 : * @return the node.
448 : * @version 1.0
449 : */
450 : CO_API NodePtr getNode(const NodeID& id) const;
451 :
452 : /** @return a vector of the currently connected nodes. @version 1.0 */
453 : CO_API Nodes getNodes(const bool addSelf = true) const;
454 :
455 : /** Return the command queue to the command thread. @version 1.0 */
456 : CO_API CommandQueue* getCommandThreadQueue();
457 :
458 : /**
459 : * @return true if executed from the command handler thread, false if
460 : * not.
461 : * @version 1.0
462 : */
463 : CO_API bool inCommandThread() const;
464 :
465 : /** @return the non-Collage command line options passed to initLocal(). */
466 : CO_API const Strings& getCommandLine() const;
467 :
468 : CO_API int64_t getTime64() const; //!< @internal
469 : CO_API ssize_t getCounter(const Counter counter) const; //!< @internal
470 : //@}
471 :
472 : /** @name Operations */
473 : //@{
474 : /**
475 : * Add a listening connection to this listening node.
476 : * @return the listening connection, or 0 upon error.
477 : */
478 : CO_API ConnectionPtr addListener(ConnectionDescriptionPtr desc);
479 :
480 : /** Add a listening connection to this listening node. */
481 : CO_API void addListener(ConnectionPtr connection);
482 :
483 : /** Remove listening connections from this listening node.*/
484 : CO_API void removeListeners(const Connections& connections);
485 :
486 : /** @internal
487 : * Flush all pending commands on this listening node.
488 : *
489 : * This causes the receiver thread to redispatch all pending commands,
490 : * which are normally only redispatched when a new command is received.
491 : */
492 : CO_API void flushCommands();
493 :
494 : /** @internal Allocate a command buffer from the receiver thread. */
495 : CO_API BufferPtr allocBuffer(const uint64_t size);
496 :
497 : /**
498 : * Dispatches a command to the registered command queue.
499 : *
500 : * Applications using custom command types have to override this method
501 : * to dispatch the custom commands.
502 : *
503 : * @param command the command.
504 : * @return the result of the operation.
505 : * @sa ICommand::invoke
506 : * @version 1.0
507 : */
508 : CO_API bool dispatchCommand(ICommand& command) override;
509 :
510 : /** A handle for a send token acquired by acquireSendToken(). */
511 : typedef lunchbox::RefPtr<co::SendToken> SendToken;
512 :
513 : /**
514 : * Acquire a send token from the given node.
515 : *
516 : * The token is released automatically when it leaves its scope or
517 : * explicitly using releaseSendToken().
518 : *
519 : * @return The send token.
520 : */
521 : CO_API SendToken acquireSendToken(NodePtr toNode);
522 :
523 : /** @deprecated Token will auto-release when leaving scope. */
524 : CO_API void releaseSendToken(SendToken token);
525 :
526 : /** @return a Zeroconf communicator handle for this node. @version 1.0*/
527 : CO_API Zeroconf getZeroconf();
528 : //@}
529 :
530 : /** @internal Ack an operation to the sender. */
531 : CO_API void ackRequest(NodePtr node, const uint32_t requestID);
532 :
533 : /** Request keep-alive update from the remote node. */
534 : CO_API void ping(NodePtr remoteNode);
535 :
536 : /**
537 : * Request updates from all nodes above keep-alive timeout.
538 : *
539 : * @return true if at least one ping was sent.
540 : */
541 : CO_API bool pingIdleNodes();
542 :
543 : /**
544 : * Bind this, the receiver and the command thread to the given
545 : * lunchbox::Thread affinity.
546 : */
547 : CO_API void setAffinity(const int32_t affinity);
548 :
549 : /** @internal */
550 : CO_API void addConnection(ConnectionPtr connection);
551 :
552 : protected:
553 : /** Destruct this local node. @version 1.0 */
554 : CO_API ~LocalNode() override;
555 :
556 : /** @internal
557 : * Connect a node proxy to this node.
558 : *
559 : * This node has to be in the listening state. The node proxy will be
560 : * put in the connected state upon success. The connection has to be
561 : * connected.
562 : *
563 : * @param node the remote node.
564 : * @param connection the connection to the remote node.
565 : * @return true if the node was connected correctly,
566 : * false otherwise.
567 : */
568 : CO_API bool connect(NodePtr node, ConnectionPtr connection);
569 :
570 : /** @internal Notify remote node connection. The default implementation is empty. */
571 66 : virtual void notifyConnect(NodePtr) {}
572 : /** @internal Notify remote node disconnection. The default implementation is empty. */
573 111 : virtual void notifyDisconnect(NodePtr) {}
574 : /**
575 : * Factory method to create a new node.
576 : *
577 : * @param type the node type.
578 : * @return the node.
579 : * @sa ctor type parameter
580 : * @version 1.0
581 : */
582 : CO_API virtual NodePtr createNode(const uint32_t type);
583 :
584 : private:
585 : detail::LocalNode* const _impl;
586 :
587 : friend class detail::ReceiverThread;
588 : bool _startCommandThread(const int32_t threadID);
589 : void _runReceiverThread();
590 :
591 : friend class detail::CommandThread;
592 : bool _notifyCommandThreadIdle();
593 :
594 : void _cleanup();
595 : void _closeNode(NodePtr node);
596 : void _addConnection(ConnectionPtr connection);
597 : void _removeConnection(ConnectionPtr connection);
598 :
599 : lunchbox::Request<void> _removeListener(ConnectionPtr connection);
600 :
601 : uint32_t _connect(NodePtr node);
602 : NodePtr _connect(const NodeID& nodeID);
603 : uint32_t _connect(NodePtr node, ConnectionPtr connection);
604 : NodePtr _connect(const NodeID& nodeID, NodePtr peer);
605 : NodePtr _connectFromZeroconf(const NodeID& nodeID);
606 : bool _connectSelf();
607 :
608 : bool _setupPeer(const std::string& setupOpts);
609 :
610 : void _handleConnect();
611 : void _handleDisconnect();
612 : bool _handleData();
613 : BufferPtr _readHead(ConnectionPtr connection);
614 : ICommand _setupCommand(ConnectionPtr, ConstBufferPtr);
615 : bool _readTail(ICommand&, BufferPtr, ConnectionPtr);
616 : void _initService();
617 : void _exitService();
618 :
619 : friend class ObjectStore;
620 : template <typename T> // Private forwarder: passes all arguments unchanged to registerCommand(). NOTE(review): presumably exists so the friend ObjectStore can register commands - confirm.
621 1113 : void _registerCommand(const uint32_t command, const CommandFunc<T>& func,
622 : CommandQueue* destinationQueue)
623 : {
624 1113 : registerCommand(command, func, destinationQueue);
625 1113 : }
626 :
627 : void _dispatchCommand(ICommand& command);
628 : void _redispatchCommands();
629 :
630 : /** The command functions. */
631 : bool _cmdAckRequest(ICommand& command);
632 : bool _cmdStopRcv(ICommand& command);
633 : bool _cmdStopCmd(ICommand& command);
634 : bool _cmdSetAffinity(ICommand& command);
635 : bool _cmdConnect(ICommand& command);
636 : bool _cmdConnectReply(ICommand& command);
637 : bool _cmdConnectAck(ICommand& command);
638 : bool _cmdID(ICommand& command);
639 : bool _cmdDisconnect(ICommand& command);
640 : bool _cmdGetNodeData(ICommand& command);
641 : bool _cmdGetNodeDataReply(ICommand& command);
642 : bool _cmdAcquireSendToken(ICommand& command);
643 : bool _cmdAcquireSendTokenReply(ICommand& command);
644 : bool _cmdReleaseSendToken(ICommand& command);
645 : bool _cmdAddListener(ICommand& command);
646 : bool _cmdRemoveListener(ICommand& command);
647 : bool _cmdPing(ICommand& command);
648 : bool _cmdCommand(ICommand& command);
649 : bool _cmdCommandAsync(ICommand& command);
650 : bool _cmdAddConnection(ICommand& command);
651 0 : bool _cmdDiscard(ICommand&) { return true; } //!< Ignores the command; always returns true
652 : //@}
653 :
654 104 : LB_TS_VAR(_cmdThread)
655 104 : LB_TS_VAR(_rcvThread)
656 : };
657 : /** Stream a LocalNode by delegating to the stream operator of its Node base class. @return os */
658 159 : inline std::ostream& operator<<(std::ostream& os, const LocalNode& node)
659 : {
660 159 : os << static_cast<const Node&>(node);
661 159 : return os;
662 : }
663 : }
664 : #endif // CO_LOCALNODE_H
|