Line data Source code
1 :
2 : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2010, Cedric Stalder <cedric.stalder@gmail.com>
4 : * 2012-2014, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #ifndef CO_LOCALNODE_H
23 : #define CO_LOCALNODE_H
24 :
25 : #include <co/node.h> // base class
26 : #include <co/objectHandler.h> // base class
27 : #include <co/objectVersion.h> // VERSION_FOO used inline
28 : #include <lunchbox/requestHandler.h> // base class
29 :
30 : #include <boost/function/function1.hpp>
31 : #include <boost/function/function4.hpp>
32 :
33 : namespace co
34 : {
35 : namespace detail { class LocalNode; class ReceiverThread; class CommandThread; }
36 :
37 : /**
38 : * Node specialization for a local node.
39 : *
40 : * Local nodes listen on network connections, manage connections to other nodes
41 : * and provide Object registration, mapping and command dispatch. Typically each
42 : * process uses one local node to communicate with other processes.
43 : */
44 : class LocalNode : public lunchbox::RequestHandler, public Node,
45 : public ObjectHandler
46 : {
47 : public:
48 : /**
49 : * Counters are monotonically increasing performance variables for
50 : * operations performed by a LocalNode instance.
51 : */
52 : enum Counter
53 : {
54 : COUNTER_MAP_OBJECT_REMOTE, //!< Num of mapObjects served for other nodes
55 : COUNTER_ALL // must be last
56 : };
57 :
58 : /** Construct a new local node of the given type. @version 1.0 */
59 : CO_API explicit LocalNode( const uint32_t type = co::NODETYPE_NODE );
60 :
61 : /**
62 : * @name State Changes
63 : *
64 : * The following methods affect the state of the node by changing its
65 : * connectivity to the network.
66 : */
67 : //@{
68 : /**
69 : * Initialize the node.
70 : *
71 : * Parses the following command line options and calls listen()
72 : * afterwards:
73 : *
74 : * The '--co-listen <connection description>' command line option is
75 : * parsed by this method to add listening connections to this node. This
76 : * parameter might be used multiple times.
77 : * ConnectionDescription::fromString() is used to parse the provided
78 : * description.
79 : *
80 : * The '--co-globals <string>' option is used to initialize the
81 : * Globals. The string is parsed used Globals::fromString().
82 : *
83 : * Please note that further command line parameters are recognized by
84 : * co::init().
85 : *
86 : * @param argc the command line argument count.
87 : * @param argv the command line argument values.
88 : * @return true if the client was successfully initialized, false otherwise.
89 : * @version 1.0
90 : */
91 : CO_API virtual bool initLocal( const int argc, char** argv );
92 :
93 : /**
94 : * Open all connections and put this node into the listening state.
95 : *
96 : * The node will spawn a receiver and command thread, and listen on all
97 : * connections for incoming commands. The node will be in the listening
98 : * state if the method completed successfully. A listening node can connect
99 : * other nodes.
100 : *
101 : * @return true if the node could be initialized, false otherwise.
102 : * @sa connect()
103 : * @version 1.0
104 : */
105 : CO_API virtual bool listen();
106 : CO_API virtual bool listen( ConnectionPtr connection ); //!< @internal
107 :
108 : /**
109 : * Close a listening node.
110 : *
111 : * Disconnects all connected node proxies, closes the listening connections
112 : * and terminates all threads created in listen().
113 : *
114 : * @return true if the node was stopped, false otherwise.
115 : * @version 1.0
116 : */
117 : CO_API virtual bool close();
118 :
119 : /** Close a listening node. @version 1.0 */
120 0 : virtual bool exitLocal() { return close(); }
121 :
122 : /**
123 : * Connect a remote node (proxy) to this listening node.
124 : *
125 : * The connection descriptions of the node are used to connect the
126 : * remote local node. On success, the node is in the connected state,
127 : * otherwise its state is unchanged.
128 : *
129 : * This method is one-sided, that is, the node to be connected should
130 : * not initiate a connection to this node at the same time. For
131 : * concurrent connects use the other connect() method using node
132 : * identifiers.
133 : *
134 : * @param node the remote node.
135 : * @return true if this node was connected, false otherwise.
136 : * @version 1.0
137 : */
138 : CO_API bool connect( NodePtr node );
139 :
140 : /**
141 : * Create and connect a node given by an identifier.
142 : *
143 : * This method is two-sided and thread-safe, that is, it can be called
144 : * by multiple threads on the same node with the same nodeID, or
145 : * concurrently on two nodes with each others' nodeID.
146 : *
147 : * @param nodeID the identifier of the node to connect.
148 : * @return the connected node, or an invalid RefPtr if the node could
149 : * not be connected.
150 : * @version 1.0
151 : */
152 : CO_API NodePtr connect( const NodeID& nodeID );
153 :
154 : /**
155 : * Find and connect the node where the given object is registered.
156 : *
157 : * This method is relatively expensive, since potentially all connected
158 : * nodes are queried.
159 : *
160 : * @param id the identifier of the object to search for.
161 : * @return the connected node, or an invalid RefPtr if the node could
162 : * not be found or connected.
163 : * @sa registerObject(), connect()
164 : * @version 1.1.1
165 : */
166 : CO_API NodePtr connectObjectMaster( const uint128_t& id );
167 :
168 : /**
169 : * Disconnect a connected node.
170 : *
171 : * @param node the remote node.
172 : * @return true if the node was disconnected correctly, false otherwise.
173 : * @version 1.0
174 : */
175 : CO_API virtual bool disconnect( NodePtr node );
176 : //@}
177 :
178 : /** @name Object Registry */
179 : //@{
180 : /**
181 : * Register a distributed object.
182 : *
183 : * Registering a distributed object makes this object the master
184 : * version. The object's identifier is used to map slave instances of
185 : * the object. Master versions of objects are typically writable and can
186 : * commit new versions of the distributed object.
187 : *
188 : * @param object the object instance.
189 : * @return true if the object was registered, false otherwise.
190 : * @version 1.0
191 : */
192 : CO_API bool registerObject( Object* object ) override;
193 :
194 : /**
195 : * Deregister a distributed object.
196 : *
197 : * All slave instances should be unmapped before this call, and will be
198 : * forcefully unmapped by this method.
199 : *
200 : * @param object the object instance.
201 : * @version 1.0
202 : */
203 : CO_API void deregisterObject( Object* object ) override;
204 :
205 : /**
206 : * Map a distributed object.
207 : *
208 : * The mapped object becomes a slave instance of the master version
209 : * which was registered with the provided identifier. The given version
210 : * can be used to map a specific version.
211 : *
212 : * If VERSION_NONE is provided, the slave instance is not initialized
213 : * with any data from the master. This is useful if the object has been
214 : * pre-initialized by other means, for example from a shared file
215 : * system.
216 : *
217 : * If VERSION_OLDEST is provided, the oldest available version is
218 : * mapped.
219 : *
220 : * If a concrete requested version no longer exists, mapObject() will
221 : * map the oldest available version.
222 : *
223 : * If the requested version is newer than the head version, mapObject()
224 : * will block until the requested version is available.
225 : *
226 : * Mapping an object is a potentially time-consuming operation. Using
227 : * mapObjectNB() and mapObjectSync() to asynchronously map multiple
228 : * objects in parallel improves performance of this operation.
229 : *
230 : * After mapping, the object will have the version used during
231 : * initialization, or VERSION_NONE if mapped to this version.
232 : *
233 : * When no master node is given, connectObjectMaster() is used to find
234 : * the node with the master instance.
235 : *
236 : * This method returns immediately after initiating the
237 : * mapping. Evaluating the value of the returned lunchbox::Future will
238 : * block on the completion of the operation and return true if the
239 : * object was mapped, false if the master of the object is not found or
240 : * the requested version is no longer available.
241 : *
242 : * @param object the object.
243 : * @param id the master object identifier.
244 : * @param master the node with the master instance, may be 0.
245 : * @param version the initial version.
246 : * @return A lunchbox::Future which will deliver the success status of
247 : * the operation on evaluation.
248 : * @sa registerObject
249 : * @version 1.0
250 : */
251 : CO_API f_bool_t mapObject( Object* object, const uint128_t& id,
252 : NodePtr master,
253 : const uint128_t& version = VERSION_OLDEST );
254 :
255 : /** Convenience wrapper for mapObject(). @version 1.0 */
256 27 : f_bool_t mapObject( Object* object, const ObjectVersion& v )
257 27 : { return mapObject( object, v.identifier, 0, v.version ); }
258 :
259 : /** @deprecated */
260 12 : f_bool_t mapObject( Object* object, const uint128_t& id,
261 : const uint128_t& version = VERSION_OLDEST )
262 12 : { return mapObject( object, id, 0, version ); }
263 :
264 : /** @deprecated use mapObject() */
265 : CO_API uint32_t mapObjectNB( Object* object, const uint128_t& id,
266 : const uint128_t& version = VERSION_OLDEST );
267 :
268 : /** @deprecated use mapObject() */
269 : CO_API uint32_t mapObjectNB( Object* object, const uint128_t& id,
270 : const uint128_t& version,
271 : NodePtr master ) override;
272 :
273 : /** @deprecated use mapObject() */
274 : CO_API bool mapObjectSync( const uint32_t requestID ) override;
275 :
276 : /**
277 : * Synchronize the local object with a remote object.
278 : *
279 : * The object is synchronized to the newest version of the first
280 : * attached object on the given master node matching the
281 : * instanceID. When no master node is given, connectObjectMaster() is
282 : * used to find the node with the master instance. When CO_INSTANCE_ALL
283 : * is given, the first instance is used. Before a successful return,
284 : * applyInstanceData() is called on the calling thread to synchronize
285 : * the given object.
286 : *
287 : * @param object The local object instance to synchronize.
288 : * @param master The node where the synchronizing object is attached.
289 : * @param id the object identifier.
290 : * @param instanceID the instance identifier of the synchronizing
291 : * object.
292 : * @return A lunchbox::Future which will deliver the success status of
293 : * the operation on evaluation.
294 : * @version 1.1.1
295 : */
296 : CO_API f_bool_t syncObject( Object* object, NodePtr master,
297 : const uint128_t& id,
298 : const uint32_t instanceID = CO_INSTANCE_ALL ) override;
299 : /**
300 : * Unmap a mapped object.
301 : *
302 : * @param object the mapped object.
303 : * @version 1.0
304 : */
305 : CO_API void unmapObject( Object* object ) override;
306 :
307 : /** Disable the instance cache of a stopped local node. @version 1.0 */
308 : CO_API void disableInstanceCache();
309 :
310 : /** @internal */
311 : CO_API void expireInstanceData( const int64_t age );
312 :
313 : /**
314 : * Enable sending instance data after registration.
315 : *
316 : * Send-on-register starts transmitting instance data of registered
317 : * objects directly after they have been registered. The data is cached
318 : * on remote nodes and accelerates object mapping. Send-on-register
319 : * should not be active when remote nodes are joining a multicast group
320 : * of this node, since they will potentially read out-of-order data
321 : * streams on the multicast connection.
322 : *
323 : * Enable and disable are counted, that is, the last enable on a
324 : * matched series of disable/enable will be effective. The disable is
325 : * completely synchronous, that is, no more instance data will be sent
326 : * after the first disable.
327 : *
328 : * @version 1.0
329 : */
330 : CO_API void enableSendOnRegister();
331 :
332 : /** Disable sending data of newly registered objects. @version 1.0 */
333 : CO_API void disableSendOnRegister();
334 :
335 : /**
336 : * Handler for an Object::push() operation.
337 : *
338 : * Called at least on each node listed in an Object::push() operation
339 : * upon reception of the pushed data from the command thread. Called on
340 : * all nodes of a multicast group, even for nodes not listed in the
341 : * Object::push().
342 : *
343 : * The default implementation calls registered push handlers. Typically
344 : * used to create an object on a remote node, using the objectType for
345 : * instantiation, the istream to initialize it, and the objectID to map
346 : * it using VERSION_NONE. The groupID may be used to differentiate
347 : * multiple concurrent push operations.
348 : *
349 : * @param groupID The group identifier given to Object::push()
350 : * @param objectType The type identifier given to Object::push()
351 : * @param objectID The identifier of the pushed object
352 : * @param istream the input data stream containing the instance data.
353 : * @version 1.0
354 : */
355 : CO_API virtual void objectPush( const uint128_t& groupID,
356 : const uint128_t& objectType,
357 : const uint128_t& objectID,
358 : DataIStream& istream );
359 :
360 : /** Function signature for push handlers. @version 1.0 */
361 : typedef boost::function< void( const uint128_t&, //!< groupID
362 : const uint128_t&, //!< objectType
363 : const uint128_t&, //!< objectID
364 : DataIStream& ) > PushHandler;
365 : /**
366 : * Register a custom handler for Object::push operations
367 : *
368 : * The registered handler function will be called automatically for an
369 : * incoming object push. Threadsafe with itself and objectPush().
370 : *
371 : * @param groupID The group identifier given to Object::push()
372 : * @param handler The handler function called for a registered groupID
373 : * @version 1.0
374 : */
375 : CO_API void registerPushHandler( const uint128_t& groupID,
376 : const PushHandler& handler );
377 :
378 :
379 : /** Function signature for custom command handlers. @version 1.0 */
380 : typedef boost::function< bool( CustomICommand& ) > CommandHandler;
381 :
382 : /**
383 : * Register a custom command handler handled by this node.
384 : *
385 : * Custom command handlers are invoked on reception of a CustomICommand
386 : * send by Node::send( uint128_t, ... ). The command identifier needs to
387 : * be unique. It is recommended to use an UUID or
388 : * lunchbox::make_uint128() to generate this identifier.
389 : *
390 : * @param command the unique identifier of the custom command
391 : * @param func the handler function for the custom command
392 : * @param queue the queue where the command should be inserted to
393 : * @return true on successful registering, false otherwise
394 : * @version 1.0
395 : */
396 : CO_API bool registerCommandHandler( const uint128_t& command,
397 : const CommandHandler& func,
398 : CommandQueue* queue );
399 :
400 : /** @internal swap the existing object by a new object and keep
401 : the cm, id and instanceID. */
402 : CO_API void swapObject( Object* oldObject, Object* newObject );
403 : //@}
404 :
405 : /** @name Data Access */
406 : //@{
407 : /**
408 : * Get a node by identifier.
409 : *
410 : * The node might not be connected. Thread safe.
411 : *
412 : * @param id the node identifier.
413 : * @return the node.
414 : * @version 1.0
415 : */
416 : CO_API NodePtr getNode( const NodeID& id ) const;
417 :
418 : /** Assemble a vector of the currently connected nodes. @version 1.0 */
419 : CO_API void getNodes( Nodes& nodes, const bool addSelf = true ) const;
420 :
421 : /** Return the command queue to the command thread. @version 1.0 */
422 : CO_API CommandQueue* getCommandThreadQueue();
423 :
424 : /**
425 : * @return true if executed from the command handler thread, false if
426 : * not.
427 : * @version 1.0
428 : */
429 : CO_API bool inCommandThread() const;
430 :
431 : CO_API int64_t getTime64() const; //!< @internal
432 : CO_API ssize_t getCounter( const Counter counter ) const; //!< @internal
433 : //@}
434 :
435 : /** @name Operations */
436 : //@{
437 : /**
438 : * Add a listening connection to this listening node.
439 : * @return the listening connection, or 0 upon error.
440 : */
441 : CO_API ConnectionPtr addListener( ConnectionDescriptionPtr desc );
442 :
443 : /** Add a listening connection to this listening node. */
444 : CO_API void addListener( ConnectionPtr connection );
445 :
446 : /** Remove listening connections from this listening node.*/
447 : CO_API void removeListeners( const Connections& connections );
448 :
449 : /** @internal
450 : * Flush all pending commands on this listening node.
451 : *
452 : * This causes the receiver thread to redispatch all pending commands,
453 : * which are normally only redispatched when a new command is received.
454 : */
455 : CO_API void flushCommands();
456 :
457 : /** @internal Allocate a command buffer from the receiver thread. */
458 : CO_API BufferPtr allocBuffer( const uint64_t size );
459 :
460 : /**
461 : * Dispatches a command to the registered command queue.
462 : *
463 : * Applications using custom command types have to override this method
464 : * to dispatch the custom commands.
465 : *
466 : * @param command the command.
467 : * @return the result of the operation.
468 : * @sa ICommand::invoke
469 : * @version 1.0
470 : */
471 : CO_API bool dispatchCommand( ICommand& command ) override;
472 :
473 :
474 : /** A handle for a send token acquired by acquireSendToken(). */
475 : typedef lunchbox::RefPtr< co::SendToken > SendToken;
476 :
477 : /**
478 : * Acquire a send token from the given node.
479 : *
480 : * The token is released automatically when it leaves its scope or
481 : * explicitly using releaseSendToken().
482 : *
483 : * @return The send token.
484 : */
485 : CO_API SendToken acquireSendToken( NodePtr toNode );
486 :
487 : /** @deprecated Token will auto-release when leaving scope. */
488 : CO_API void releaseSendToken( SendToken token );
489 :
490 : /** @return a Zeroconf communicator handle for this node. @version 1.0*/
491 : CO_API Zeroconf getZeroconf();
492 : //@}
493 :
494 : /** @internal Ack an operation to the sender. */
495 : CO_API void ackRequest( NodePtr node, const uint32_t requestID );
496 :
497 : /** Request keep-alive update from the remote node. */
498 : CO_API void ping( NodePtr remoteNode );
499 :
500 : /**
501 : * Request updates from all nodes above keep-alive timeout.
502 : *
503 : * @return true if at least one ping was send.
504 : */
505 : CO_API bool pingIdleNodes();
506 :
507 : /**
508 : * Bind this, the receiver and the command thread to the given
509 : * lunchbox::Thread affinity.
510 : */
511 : CO_API void setAffinity( const int32_t affinity );
512 :
513 : protected:
514 : /** Destruct this local node. @version 1.0 */
515 : CO_API ~LocalNode() override;
516 :
517 : /** @internal
518 : * Connect a node proxy to this node.
519 : *
520 : * This node has to be in the listening state. The node proxy will be
521 : * put in the connected state upon success. The connection has to be
522 : * connected.
523 : *
524 : * @param node the remote node.
525 : * @param connection the connection to the remote node.
526 : * @return true if the node was connected correctly,
527 : * false otherwise.
528 : */
529 : CO_API bool connect( NodePtr node, ConnectionPtr connection );
530 :
531 : /** @internal Notify remote node connection. */
532 66 : virtual void notifyConnect( NodePtr ) {}
533 :
534 : /** @internal Notify remote node disconnection. */
535 110 : virtual void notifyDisconnect( NodePtr ) {}
536 :
537 : /**
538 : * Factory method to create a new node.
539 : *
540 : * @param type the type the node type
541 : * @return the node.
542 : * @sa ctor type parameter
543 : * @version 1.0
544 : */
545 : CO_API virtual NodePtr createNode( const uint32_t type );
546 :
547 : private:
548 : detail::LocalNode* const _impl;
549 :
550 : friend class detail::ReceiverThread;
551 : bool _startCommandThread( const int32_t threadID );
552 : void _runReceiverThread();
553 :
554 : friend class detail::CommandThread;
555 : bool _notifyCommandThreadIdle();
556 :
557 : void _cleanup();
558 : void _closeNode( NodePtr node );
559 : void _addConnection( ConnectionPtr connection );
560 : void _removeConnection( ConnectionPtr connection );
561 :
562 : lunchbox::Request< void > _removeListener( ConnectionPtr connection );
563 :
564 : uint32_t _connect( NodePtr node );
565 : NodePtr _connect( const NodeID& nodeID );
566 : uint32_t _connect( NodePtr node, ConnectionPtr connection );
567 : NodePtr _connect( const NodeID& nodeID, NodePtr peer );
568 : NodePtr _connectFromZeroconf( const NodeID& nodeID );
569 : bool _connectSelf();
570 :
571 : void _handleConnect();
572 : void _handleDisconnect();
573 : bool _handleData();
574 : BufferPtr _readHead( ConnectionPtr connection );
575 : ICommand _setupCommand( ConnectionPtr, ConstBufferPtr );
576 : bool _readTail( ICommand&, BufferPtr, ConnectionPtr );
577 : void _initService();
578 : void _exitService();
579 :
580 : friend class ObjectStore;
581 : template< typename T >
582 1071 : void _registerCommand( const uint32_t command, const CommandFunc< T >& func,
583 : CommandQueue* destinationQueue )
584 : {
585 1071 : registerCommand( command, func, destinationQueue );
586 1071 : }
587 :
588 : void _dispatchCommand( ICommand& command );
589 : void _redispatchCommands();
590 :
591 : /** The command functions. */
592 : bool _cmdAckRequest( ICommand& command );
593 : bool _cmdStopRcv( ICommand& command );
594 : bool _cmdStopCmd( ICommand& command );
595 : bool _cmdSetAffinity( ICommand& command );
596 : bool _cmdConnect( ICommand& command );
597 : bool _cmdConnectReply( ICommand& command );
598 : bool _cmdConnectAck( ICommand& command );
599 : bool _cmdID( ICommand& command );
600 : bool _cmdDisconnect( ICommand& command );
601 : bool _cmdGetNodeData( ICommand& command );
602 : bool _cmdGetNodeDataReply( ICommand& command );
603 : bool _cmdAcquireSendToken( ICommand& command );
604 : bool _cmdAcquireSendTokenReply( ICommand& command );
605 : bool _cmdReleaseSendToken( ICommand& command );
606 : bool _cmdAddListener( ICommand& command );
607 : bool _cmdRemoveListener( ICommand& command );
608 : bool _cmdPing( ICommand& command );
609 : bool _cmdCommand( ICommand& command );
610 : bool _cmdCommandAsync( ICommand& command );
611 : bool _cmdAddConnection( ICommand& command );
612 0 : bool _cmdDiscard( ICommand& ) { return true; }
613 : //@}
614 :
615 100 : LB_TS_VAR( _cmdThread )
616 100 : LB_TS_VAR( _rcvThread )
617 : };
618 :
619 156 : inline std::ostream& operator << ( std::ostream& os, const LocalNode& node )
620 : {
621 156 : os << static_cast< const Node& >( node );
622 156 : return os;
623 : }
624 : }
625 : #endif // CO_LOCALNODE_H
|