Line data Source code
1 :
2 : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Cedric Stalder <cedric.stalder@gmail.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #ifndef CO_LOCALNODE_H
23 : #define CO_LOCALNODE_H
24 :
25 : #include <co/node.h> // base class
26 : #include <co/objectHandler.h> // base class
27 : #include <co/objectVersion.h> // VERSION_FOO used inline
28 : #include <lunchbox/requestHandler.h> // base class
29 :
30 : #include <boost/function/function1.hpp>
31 : #include <boost/function/function4.hpp>
32 :
33 : namespace co
34 : {
35 : namespace detail { class LocalNode; class ReceiverThread; class CommandThread; }
36 :
37 : /**
38 : * Node specialization for a local node.
39 : *
40 : * Local nodes listen on network connections, manage connections to other nodes
41 : * and provide Object registration, mapping and command dispatch. Typically each
42 : * process uses one local node to communicate with other processes.
43 : */
44 : class LocalNode : public lunchbox::RequestHandler, public Node,
45 : public ObjectHandler
46 : {
47 : public:
48 : /**
49 : * Counters are monotonically increasing performance variables for
50 : * operations performed by a LocalNode instance.
51 : */
52 : enum Counter
53 : {
54 : COUNTER_MAP_OBJECT_REMOTE, //!< Num of mapObjects served for other nodes
55 : COUNTER_ALL // must be last
56 : };
57 :
58 : /** Construct a new local node of the given type. @version 1.0 */
59 : CO_API explicit LocalNode( const uint32_t type = co::NODETYPE_NODE );
60 :
61 : /**
62 : * @name State Changes
63 : *
64 : * The following methods affect the state of the node by changing its
65 : * connectivity to the network.
66 : */
67 : //@{
68 : /**
69 : * Initialize the node.
70 : *
71 : * Parses the following command line options and calls listen()
72 : * afterwards:
73 : *
74 : * The '--co-listen <connection description>' command line option is
75 : * parsed by this method to add listening connections to this node. This
76 : * parameter might be used multiple times.
77 : * ConnectionDescription::fromString() is used to parse the provided
78 : * description.
79 : *
80 : * The '--co-globals <string>' option is used to initialize the
81 : * Globals. The string is parsed using Globals::fromString().
82 : *
83 : * Please note that further command line parameters are recognized by
84 : * co::init().
85 : *
86 : * @param argc the command line argument count.
87 : * @param argv the command line argument values.
88 : * @return true if the node was successfully initialized, false otherwise.
89 : * @version 1.0
90 : */
91 : CO_API virtual bool initLocal( const int argc, char** argv );
92 :
93 : /**
94 : * Open all connections and put this node into the listening state.
95 : *
96 : * The node will spawn a receiver and command thread, and listen on all
97 : * connections for incoming commands. The node will be in the listening
98 : * state if the method completed successfully. A listening node can connect
99 : * other nodes.
100 : *
101 : * @return true if the node could be initialized, false otherwise.
102 : * @sa connect()
103 : * @version 1.0
104 : */
105 : CO_API virtual bool listen();
106 :
107 : /**
108 : * Close a listening node.
109 : *
110 : * Disconnects all connected node proxies, closes the listening connections
111 : * and terminates all threads created in listen().
112 : *
113 : * @return true if the node was stopped, false otherwise.
114 : * @version 1.0
115 : */
116 : CO_API virtual bool close();
117 :
118 : /** Close a listening node; forwards to close() and returns its result. @version 1.0 */
119 0 : virtual bool exitLocal() { return close(); }
120 :
121 : /**
122 : * Connect a remote node (proxy) to this listening node.
123 : *
124 : * The connection descriptions of the node are used to connect the
125 : * remote local node. On success, the node is in the connected state,
126 : * otherwise its state is unchanged.
127 : *
128 : * This method is one-sided, that is, the node to be connected should
129 : * not initiate a connection to this node at the same time. For
130 : * concurrent connects use the other connect() method using node
131 : * identifiers.
132 : *
133 : * @param node the remote node.
134 : * @return true if this node was connected, false otherwise.
135 : * @version 1.0
136 : */
137 : CO_API bool connect( NodePtr node );
138 :
139 : /**
140 : * Create and connect a node given by an identifier.
141 : *
142 : * This method is two-sided and thread-safe, that is, it can be called
143 : * by multiple threads on the same node with the same nodeID, or
144 : * concurrently on two nodes with each others' nodeID.
145 : *
146 : * @param nodeID the identifier of the node to connect.
147 : * @return the connected node, or an invalid RefPtr if the node could
148 : * not be connected.
149 : * @version 1.0
150 : */
151 : CO_API NodePtr connect( const NodeID& nodeID );
152 :
153 : /**
154 : * Find and connect the node where the given object is registered.
155 : *
156 : * This method is relatively expensive, since potentially all connected
157 : * nodes are queried.
158 : *
159 : * @param id the identifier of the object to search for.
160 : * @return the connected node, or an invalid RefPtr if the node could
161 : * not be found or connected.
162 : * @sa registerObject(), connect()
163 : * @version 1.1.1
164 : */
165 : CO_API NodePtr connectObjectMaster( const uint128_t& id );
166 :
167 : /**
168 : * Disconnect a connected node.
169 : *
170 : * @param node the remote node.
171 : * @return true if the node was disconnected correctly, false otherwise.
172 : * @version 1.0
173 : */
174 : CO_API virtual bool disconnect( NodePtr node );
175 : //@}
176 :
177 : /** @name Launching a remote node */
178 : //@{
179 : /**
180 : * Launch a remote process using the given command.
181 : *
182 : * The launched node will automatically connect with this node, if it passes
183 : * the given options to its initLocal().
184 : *
185 : * The following markers are replaced in the command:
186 : * * %h: Node::getHostname()
187 : * * %n: Node identifier in string representation
188 : * * %d: Node::getWorkDir()
189 : * * %q: Node::getLaunchQuote()
190 : * * %o: Options needed by remote initLocal() to connect this node. If not
191 : * given, options are appended at the end of the given command.
192 : *
193 : * @return true if the launch process execution was successful.
194 : */
195 : CO_API bool launch( NodePtr node, const std::string& command );
196 :
197 : /**
198 : * Wait for a launched node to connect.
199 : *
200 : * @return the remote node handle, or 0 on timeout.
201 : */
202 : CO_API NodePtr syncLaunch( const uint128_t& nodeID, int64_t timeout );
203 : //@}
204 :
205 : /** @name Object Registry */
206 : //@{
207 : /**
208 : * Register a distributed object.
209 : *
210 : * Registering a distributed object makes this object the master
211 : * version. The object's identifier is used to map slave instances of
212 : * the object. Master versions of objects are typically writable and can
213 : * commit new versions of the distributed object.
214 : *
215 : * @param object the object instance.
216 : * @return true if the object was registered, false otherwise.
217 : * @version 1.0
218 : */
219 : CO_API bool registerObject( Object* object ) override;
220 :
221 : /**
222 : * Deregister a distributed object.
223 : *
224 : * All slave instances should be unmapped before this call, and will be
225 : * forcefully unmapped by this method.
226 : *
227 : * @param object the object instance.
228 : * @version 1.0
229 : */
230 : CO_API void deregisterObject( Object* object ) override;
231 :
232 : /**
233 : * Map a distributed object.
234 : *
235 : * The mapped object becomes a slave instance of the master version which
236 : * was registered with the provided identifier. The given version can be
237 : * used to map a specific version.
238 : *
239 : * If VERSION_NONE is provided, the slave instance is not initialized with
240 : * any data from the master. This is useful if the object has been
241 : * pre-initialized by other means, for example from a shared file system.
242 : *
243 : * If VERSION_OLDEST is provided, the oldest available version is mapped.
244 : *
245 : * If a concrete requested version no longer exists, mapObject() will map
246 : * the oldest available version.
247 : *
248 : * If the requested version is newer than the head version, mapObject() will
249 : * block until the requested version is available.
250 : *
251 : * Mapping an object is a potentially time-consuming operation. Using
252 : * mapObjectNB() and mapObjectSync() to asynchronously map multiple objects
253 : * in parallel improves performance of this operation.
254 : *
255 : * After mapping, the object will have the version used during
256 : * initialization, or VERSION_NONE if mapped to this version.
257 : *
258 : * When no master node is given, connectObjectMaster() is used to find the
259 : * node with the master instance.
260 : *
261 : * This method returns immediately after initiating the mapping. Evaluating
262 : * the value of the returned lunchbox::Future will block on the completion
263 : * of the operation and return true if the object was mapped, false if the
264 : * master of the object is not found or the requested version is no longer
265 : * available.
266 : *
267 : * @param object the object.
268 : * @param id the master object identifier.
269 : * @param master the node with the master instance, may be 0.
270 : * @param version the initial version.
271 : * @return A lunchbox::Future which will deliver the success status of
272 : * the operation on evaluation.
273 : * @sa registerObject
274 : * @version 1.0
275 : */
276 : CO_API f_bool_t mapObject( Object* object, const uint128_t& id,
277 : NodePtr master,
278 : const uint128_t& version = VERSION_OLDEST );
279 :
280 : /** Convenience wrapper for mapObject(): maps the object using v.identifier
 *  and v.version, passing 0 as the master node. @version 1.0 */
281 27 : f_bool_t mapObject( Object* object, const ObjectVersion& v )
282 27 : { return mapObject( object, v.identifier, 0, v.version ); }
283 :
284 : /** @deprecated Forwards to the mapObject() overload taking a master node,
 *  passing 0 as the master together with the given id and version. */
285 13 : f_bool_t mapObject( Object* object, const uint128_t& id,
286 : const uint128_t& version = VERSION_OLDEST )
287 13 : { return mapObject( object, id, 0, version ); }
288 :
289 : /** @deprecated use mapObject() */
290 : CO_API uint32_t mapObjectNB( Object* object, const uint128_t& id,
291 : const uint128_t& version = VERSION_OLDEST );
292 :
293 : /** @deprecated use mapObject() */
294 : CO_API uint32_t mapObjectNB( Object* object, const uint128_t& id,
295 : const uint128_t& version,
296 : NodePtr master ) override;
297 :
298 : /** @deprecated use mapObject() */
299 : CO_API bool mapObjectSync( const uint32_t requestID ) override;
300 :
301 : /**
302 : * Synchronize the local object with a remote object.
303 : *
304 : * The object is synchronized to the newest version of the first
305 : * attached object on the given master node matching the
306 : * instanceID. When no master node is given, connectObjectMaster() is
307 : * used to find the node with the master instance. When CO_INSTANCE_ALL
308 : * is given, the first instance is used. Before a successful return,
309 : * applyInstanceData() is called on the calling thread to synchronize
310 : * the given object.
311 : *
312 : * @param object The local object instance to synchronize.
313 : * @param master The node where the synchronizing object is attached.
314 : * @param id the object identifier.
315 : * @param instanceID the instance identifier of the synchronizing
316 : * object.
317 : * @return A lunchbox::Future which will deliver the success status of
318 : * the operation on evaluation.
319 : * @version 1.1.1
320 : */
321 : CO_API f_bool_t syncObject( Object* object, const uint128_t& id,
322 : NodePtr master,
323 : const uint32_t instanceID = CO_INSTANCE_ALL ) override;
324 : /**
325 : * Unmap a mapped object.
326 : *
327 : * @param object the mapped object.
328 : * @version 1.0
329 : */
330 : CO_API void unmapObject( Object* object ) override;
331 :
332 : /** Disable the instance cache of a stopped local node. @version 1.0 */
333 : CO_API void disableInstanceCache();
334 :
335 : /** @internal */
336 : CO_API void expireInstanceData( const int64_t age );
337 :
338 : /**
339 : * Enable sending instance data after registration.
340 : *
341 : * Send-on-register starts transmitting instance data of registered
342 : * objects directly after they have been registered. The data is cached
343 : * on remote nodes and accelerates object mapping. Send-on-register
344 : * should not be active when remote nodes are joining a multicast group
345 : * of this node, since they will potentially read out-of-order data
346 : * streams on the multicast connection.
347 : *
348 : * Enable and disable are counted, that is, the last enable on a
349 : * matched series of disable/enable will be effective. The disable is
350 : * completely synchronous, that is, no more instance data will be sent
351 : * after the first disable.
352 : *
353 : * @version 1.0
354 : */
355 : CO_API void enableSendOnRegister();
356 :
357 : /** Disable sending data of newly registered objects. @version 1.0 */
358 : CO_API void disableSendOnRegister();
359 :
360 : /**
361 : * Handler for an Object::push() operation.
362 : *
363 : * Called at least on each node listed in an Object::push() operation
364 : * upon reception of the pushed data from the command thread. Called on
365 : * all nodes of a multicast group, even for nodes not listed in the
366 : * Object::push().
367 : *
368 : * The default implementation calls registered push handlers. Typically
369 : * used to create an object on a remote node, using the objectType for
370 : * instantiation, the istream to initialize it, and the objectID to map
371 : * it using VERSION_NONE. The groupID may be used to differentiate
372 : * multiple concurrent push operations.
373 : *
374 : * @param groupID The group identifier given to Object::push()
375 : * @param objectType The type identifier given to Object::push()
376 : * @param objectID The identifier of the pushed object
377 : * @param istream the input data stream containing the instance data.
378 : * @version 1.0
379 : */
380 : CO_API virtual void objectPush( const uint128_t& groupID,
381 : const uint128_t& objectType,
382 : const uint128_t& objectID,
383 : DataIStream& istream );
384 :
385 : /** Function signature for push handlers. @version 1.0 */
386 : typedef boost::function< void( const uint128_t&, //!< groupID
387 : const uint128_t&, //!< objectType
388 : const uint128_t&, //!< objectID
389 : DataIStream& ) > PushHandler;
390 : /**
391 : * Register a custom handler for Object::push operations
392 : *
393 : * The registered handler function will be called automatically for an
394 : * incoming object push. Threadsafe with itself and objectPush().
395 : *
396 : * @param groupID The group identifier given to Object::push()
397 : * @param handler The handler function called for a registered groupID
398 : * @version 1.0
399 : */
400 : CO_API void registerPushHandler( const uint128_t& groupID,
401 : const PushHandler& handler );
402 :
403 :
404 : /** Function signature for custom command handlers. @version 1.0 */
405 : typedef boost::function< bool( CustomICommand& ) > CommandHandler;
406 :
407 : /**
408 : * Register a custom command handler handled by this node.
409 : *
410 : * Custom command handlers are invoked on reception of a CustomICommand
411 : * sent by Node::send( uint128_t, ... ). The command identifier needs to
412 : * be unique. It is recommended to use servus::make_uint128() to generate
413 : * this identifier.
414 : *
415 : * @param command the unique identifier of the custom command
416 : * @param func the handler function for the custom command
417 : * @param queue the queue where the command should be inserted to
418 : * @return true on successful registering, false otherwise
419 : * @version 1.0
420 : */
421 : CO_API bool registerCommandHandler( const uint128_t& command,
422 : const CommandHandler& func,
423 : CommandQueue* queue );
424 :
425 : /** @internal Swap the existing object with a new object, keeping
426 : the cm, id and instanceID. */
427 : CO_API void swapObject( Object* oldObject, Object* newObject );
428 : //@}
429 :
430 : /** @name Data Access */
431 : //@{
432 : /**
433 : * Get a node by identifier.
434 : *
435 : * The node might not be connected. Thread safe.
436 : *
437 : * @param id the node identifier.
438 : * @return the node.
439 : * @version 1.0
440 : */
441 : CO_API NodePtr getNode( const NodeID& id ) const;
442 :
443 : /** @return a vector of the currently connected nodes. @version 1.0 */
444 : CO_API Nodes getNodes( const bool addSelf = true ) const;
445 :
446 : /** Return the command queue to the command thread. @version 1.0 */
447 : CO_API CommandQueue* getCommandThreadQueue();
448 :
449 : /**
450 : * @return true if executed from the command handler thread, false if
451 : * not.
452 : * @version 1.0
453 : */
454 : CO_API bool inCommandThread() const;
455 :
456 : CO_API int64_t getTime64() const; //!< @internal
457 : CO_API ssize_t getCounter( const Counter counter ) const; //!< @internal
458 : //@}
459 :
460 : /** @name Operations */
461 : //@{
462 : /**
463 : * Add a listening connection to this listening node.
464 : * @return the listening connection, or 0 upon error.
465 : */
466 : CO_API ConnectionPtr addListener( ConnectionDescriptionPtr desc );
467 :
468 : /** Add a listening connection to this listening node. */
469 : CO_API void addListener( ConnectionPtr connection );
470 :
471 : /** Remove listening connections from this listening node.*/
472 : CO_API void removeListeners( const Connections& connections );
473 :
474 : /** @internal
475 : * Flush all pending commands on this listening node.
476 : *
477 : * This causes the receiver thread to redispatch all pending commands,
478 : * which are normally only redispatched when a new command is received.
479 : */
480 : CO_API void flushCommands();
481 :
482 : /** @internal Allocate a command buffer from the receiver thread. */
483 : CO_API BufferPtr allocBuffer( const uint64_t size );
484 :
485 : /**
486 : * Dispatches a command to the registered command queue.
487 : *
488 : * Applications using custom command types have to override this method
489 : * to dispatch the custom commands.
490 : *
491 : * @param command the command.
492 : * @return the result of the operation.
493 : * @sa ICommand::invoke
494 : * @version 1.0
495 : */
496 : CO_API bool dispatchCommand( ICommand& command ) override;
497 :
498 :
499 : /** A handle for a send token acquired by acquireSendToken(). */
500 : typedef lunchbox::RefPtr< co::SendToken > SendToken;
501 :
502 : /**
503 : * Acquire a send token from the given node.
504 : *
505 : * The token is released automatically when it leaves its scope or
506 : * explicitly using releaseSendToken().
507 : *
508 : * @return The send token.
509 : */
510 : CO_API SendToken acquireSendToken( NodePtr toNode );
511 :
512 : /** @deprecated Token will auto-release when leaving scope. */
513 : CO_API void releaseSendToken( SendToken token );
514 :
515 : /** @return a Zeroconf communicator handle for this node. @version 1.0*/
516 : CO_API Zeroconf getZeroconf();
517 : //@}
518 :
519 : /** @internal Ack an operation to the sender. */
520 : CO_API void ackRequest( NodePtr node, const uint32_t requestID );
521 :
522 : /** Request keep-alive update from the remote node. */
523 : CO_API void ping( NodePtr remoteNode );
524 :
525 : /**
526 : * Request updates from all nodes above keep-alive timeout.
527 : *
528 : * @return true if at least one ping was sent.
529 : */
530 : CO_API bool pingIdleNodes();
531 :
532 : /**
533 : * Bind this, the receiver and the command thread to the given
534 : * lunchbox::Thread affinity.
535 : */
536 : CO_API void setAffinity( const int32_t affinity );
537 :
538 : /** @internal */
539 : CO_API void addConnection( ConnectionPtr connection );
540 :
541 : protected:
542 : /** Destruct this local node. @version 1.0 */
543 : CO_API ~LocalNode() override;
544 :
545 : /** @internal
546 : * Connect a node proxy to this node.
547 : *
548 : * This node has to be in the listening state. The node proxy will be
549 : * put in the connected state upon success. The connection has to be
550 : * connected.
551 : *
552 : * @param node the remote node.
553 : * @param connection the connection to the remote node.
554 : * @return true if the node was connected correctly,
555 : * false otherwise.
556 : */
557 : CO_API bool connect( NodePtr node, ConnectionPtr connection );
558 :
559 : /** @internal Notify remote node connection. This implementation is empty. */
560 68 : virtual void notifyConnect( NodePtr ) {}
561 :
562 : /** @internal Notify remote node disconnection. This implementation is empty. */
563 115 : virtual void notifyDisconnect( NodePtr ) {}
564 :
565 : /**
566 : * Factory method to create a new node.
567 : *
568 : * @param type the type the node type
569 : * @return the node.
570 : * @sa ctor type parameter
571 : * @version 1.0
572 : */
573 : CO_API virtual NodePtr createNode( const uint32_t type );
574 :
575 : private:
576 : detail::LocalNode* const _impl;
577 :
578 : friend class detail::ReceiverThread;
579 : bool _startCommandThread( const int32_t threadID );
580 : void _runReceiverThread();
581 :
582 : friend class detail::CommandThread;
583 : bool _notifyCommandThreadIdle();
584 :
585 : void _cleanup();
586 : void _closeNode( NodePtr node );
587 : void _addConnection( ConnectionPtr connection );
588 : void _removeConnection( ConnectionPtr connection );
589 :
590 : lunchbox::Request< void > _removeListener( ConnectionPtr connection );
591 :
592 : uint32_t _connect( NodePtr node );
593 : NodePtr _connect( const NodeID& nodeID );
594 : uint32_t _connect( NodePtr node, ConnectionPtr connection );
595 : NodePtr _connect( const NodeID& nodeID, NodePtr peer );
596 : NodePtr _connectFromZeroconf( const NodeID& nodeID );
597 : bool _connectSelf();
598 :
599 : bool _setupPeer( const std::string& setupOpts );
600 :
601 : void _handleConnect();
602 : void _handleDisconnect();
603 : bool _handleData();
604 : BufferPtr _readHead( ConnectionPtr connection );
605 : ICommand _setupCommand( ConnectionPtr, ConstBufferPtr );
606 : bool _readTail( ICommand&, BufferPtr, ConnectionPtr );
607 : void _initService();
608 : void _exitService();
609 :
610 : friend class ObjectStore;
/**
 * @internal Forward a command registration to registerCommand().
 *
 * All three arguments are passed through unchanged.
 * NOTE(review): presumably provided so that the befriended ObjectStore can
 * register its commands on this node -- confirm against ObjectStore.
 *
 * @param command the command identifier.
 * @param func the command function to register.
 * @param destinationQueue the queue passed on to registerCommand().
 */
611 : template< typename T >
612 1155 : void _registerCommand( const uint32_t command, const CommandFunc< T >& func,
613 : CommandQueue* destinationQueue )
614 : {
615 1155 : registerCommand( command, func, destinationQueue );
616 1155 : }
617 :
618 : void _dispatchCommand( ICommand& command );
619 : void _redispatchCommands();
620 :
621 : /** The command functions. */
622 : bool _cmdAckRequest( ICommand& command );
623 : bool _cmdStopRcv( ICommand& command );
624 : bool _cmdStopCmd( ICommand& command );
625 : bool _cmdSetAffinity( ICommand& command );
626 : bool _cmdConnect( ICommand& command );
627 : bool _cmdConnectReply( ICommand& command );
628 : bool _cmdConnectAck( ICommand& command );
629 : bool _cmdID( ICommand& command );
630 : bool _cmdDisconnect( ICommand& command );
631 : bool _cmdGetNodeData( ICommand& command );
632 : bool _cmdGetNodeDataReply( ICommand& command );
633 : bool _cmdAcquireSendToken( ICommand& command );
634 : bool _cmdAcquireSendTokenReply( ICommand& command );
635 : bool _cmdReleaseSendToken( ICommand& command );
636 : bool _cmdAddListener( ICommand& command );
637 : bool _cmdRemoveListener( ICommand& command );
638 : bool _cmdPing( ICommand& command );
639 : bool _cmdCommand( ICommand& command );
640 : bool _cmdCommandAsync( ICommand& command );
641 : bool _cmdAddConnection( ICommand& command );
/** Command function which ignores the given command and always returns true. */
642 0 : bool _cmdDiscard( ICommand& ) { return true; }
643 : //@}
644 :
645 108 : LB_TS_VAR( _cmdThread )
646 108 : LB_TS_VAR( _rcvThread )
647 : };
648 :
/**
 * Output a LocalNode to a stream.
 *
 * The node is written by streaming its Node base-class view; nothing
 * specific to LocalNode is added here.
 *
 * @param os the output stream.
 * @param node the local node to print.
 * @return the given output stream.
 */
649 165 : inline std::ostream& operator << ( std::ostream& os, const LocalNode& node )
650 : {
651 165 : os << static_cast< const Node& >( node );
652 165 : return os;
653 : }
654 : }
655 : #endif // CO_LOCALNODE_H
|