Line data Source code
1 :
2 : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2010, Cedric Stalder <cedric.stalder@gmail.com>
4 : * 2012-2014, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #ifndef CO_LOCALNODE_H
23 : #define CO_LOCALNODE_H
24 :
25 : #include <co/node.h> // base class
26 : #include <co/objectHandler.h> // base class
27 : #include <co/objectVersion.h> // VERSION_FOO used inline
28 : #include <lunchbox/requestHandler.h> // base class
29 :
30 : #include <boost/function/function1.hpp>
31 : #include <boost/function/function4.hpp>
32 :
33 : namespace co
34 : {
35 : namespace detail { class LocalNode; class ReceiverThread; class CommandThread; }
36 :
37 : /**
38 : * Node specialization for a local node.
39 : *
40 : * Local nodes listen on network connections, manage connections to other nodes
41 : * and provide Object registration, mapping and command dispatch. Typically each
42 : * process uses one local node to communicate with other processes.
43 : */
44 : class LocalNode : public lunchbox::RequestHandler, public Node,
45 : public ObjectHandler
46 : {
47 : public:
48 : /**
49 : * Counters are monotonically increasing performance variables for
50 : * operations performed by a LocalNode instance.
51 : */
52 : enum Counter
53 : {
54 : COUNTER_MAP_OBJECT_REMOTE, //!< Num of mapObjects served for other nodes
55 : COUNTER_ALL // must be last
56 : };
57 :
58 : /** Construct a new local node of the given type. @version 1.0 */
59 : CO_API explicit LocalNode( const uint32_t type = co::NODETYPE_NODE );
60 :
61 : /**
62 : * @name State Changes
63 : *
64 : * The following methods affect the state of the node by changing its
65 : * connectivity to the network.
66 : */
67 : //@{
68 : /**
69 : * Initialize the node.
70 : *
71 : * Parses the following command line options and calls listen()
72 : * afterwards:
73 : *
74 : * The '--co-listen <connection description>' command line option is
75 : * parsed by this method to add listening connections to this node. This
76 : * parameter might be used multiple times.
77 : * ConnectionDescription::fromString() is used to parse the provided
78 : * description.
79 : *
80 : * The '--co-globals <string>' option is used to initialize the
81 : * Globals. The string is parsed using Globals::fromString().
82 : *
83 : * Please note that further command line parameters are recognized by
84 : * co::init().
85 : *
86 : * @param argc the command line argument count.
87 : * @param argv the command line argument values.
88 : * @return true if the node was successfully initialized, false otherwise.
89 : * @version 1.0
90 : */
91 : CO_API virtual bool initLocal( const int argc, char** argv );
92 :
93 : /**
94 : * Open all connections and put this node into the listening state.
95 : *
96 : * The node will spawn a receiver and command thread, and listen on all
97 : * connections for incoming commands. The node will be in the listening
98 : * state if the method completed successfully. A listening node can connect
99 : * other nodes.
100 : *
101 : * @return true if the node could be initialized, false otherwise.
102 : * @sa connect()
103 : * @version 1.0
104 : */
105 : CO_API virtual bool listen();
106 :
107 : /**
108 : * Close a listening node.
109 : *
110 : * Disconnects all connected node proxies, closes the listening connections
111 : * and terminates all threads created in listen().
112 : *
113 : * @return true if the node was stopped, false otherwise.
114 : * @version 1.0
115 : */
116 : CO_API virtual bool close();
117 :
118 : /** Close a listening node by forwarding to close(). @version 1.0 */
119 0 : virtual bool exitLocal() { return close(); }
120 :
121 : /**
122 : * Connect a remote node (proxy) to this listening node.
123 : *
124 : * The connection descriptions of the node are used to connect the
125 : * remote local node. On success, the node is in the connected state,
126 : * otherwise its state is unchanged.
127 : *
128 : * This method is one-sided, that is, the node to be connected should
129 : * not initiate a connection to this node at the same time. For
130 : * concurrent connects use the other connect() method using node
131 : * identifiers.
132 : *
133 : * @param node the remote node.
134 : * @return true if this node was connected, false otherwise.
135 : * @version 1.0
136 : */
137 : CO_API bool connect( NodePtr node );
138 :
139 : /**
140 : * Create and connect a node given by an identifier.
141 : *
142 : * This method is two-sided and thread-safe, that is, it can be called
143 : * by multiple threads on the same node with the same nodeID, or
144 : * concurrently on two nodes with each other's nodeID.
145 : *
146 : * @param nodeID the identifier of the node to connect.
147 : * @return the connected node, or an invalid RefPtr if the node could
148 : * not be connected.
149 : * @version 1.0
150 : */
151 : CO_API NodePtr connect( const NodeID& nodeID );
152 :
153 : /**
154 : * Find and connect the node where the given object is registered.
155 : *
156 : * This method is relatively expensive, since potentially all connected
157 : * nodes are queried.
158 : *
159 : * @param id the identifier of the object to search for.
160 : * @return the connected node, or an invalid RefPtr if the node could
161 : * not be found or connected.
162 : * @sa registerObject(), connect()
163 : * @version 1.1.1
164 : */
165 : CO_API NodePtr connectObjectMaster( const uint128_t& id );
166 :
167 : /**
168 : * Disconnect a connected node.
169 : *
170 : * @param node the remote node.
171 : * @return true if the node was disconnected correctly, false otherwise.
172 : * @version 1.0
173 : */
174 : CO_API virtual bool disconnect( NodePtr node );
175 : //@}
176 :
177 : /** @name Object Registry */
178 : //@{
179 : /**
180 : * Register a distributed object.
181 : *
182 : * Registering a distributed object makes this object the master
183 : * version. The object's identifier is used to map slave instances of
184 : * the object. Master versions of objects are typically writable and can
185 : * commit new versions of the distributed object.
186 : *
187 : * @param object the object instance.
188 : * @return true if the object was registered, false otherwise.
189 : * @version 1.0
190 : */
191 : CO_API bool registerObject( Object* object ) override;
192 :
193 : /**
194 : * Deregister a distributed object.
195 : *
196 : * All slave instances should be unmapped before this call, and will be
197 : * forcefully unmapped by this method.
198 : *
199 : * @param object the object instance.
200 : * @version 1.0
201 : */
202 : CO_API void deregisterObject( Object* object ) override;
203 :
204 : /**
205 : * Map a distributed object.
206 : *
207 : * The mapped object becomes a slave instance of the master version which
208 : * was registered with the provided identifier. The given version can be
209 : * used to map a specific version.
210 : *
211 : * If VERSION_NONE is provided, the slave instance is not initialized with
212 : * any data from the master. This is useful if the object has been
213 : * pre-initialized by other means, for example from a shared file system.
214 : *
215 : * If VERSION_OLDEST is provided, the oldest available version is mapped.
216 : *
217 : * If a concrete requested version no longer exists, mapObject() will map
218 : * the oldest available version.
219 : *
220 : * If the requested version is newer than the head version, mapObject() will
221 : * block until the requested version is available.
222 : *
223 : * Mapping an object is a potentially time-consuming operation. Using
224 : * mapObjectNB() and mapObjectSync() to asynchronously map multiple objects
225 : * in parallel improves performance of this operation.
226 : *
227 : * After mapping, the object will have the version used during
228 : * initialization, or VERSION_NONE if mapped to this version.
229 : *
230 : * When no master node is given, connectObjectMaster() is used to find the
231 : * node with the master instance.
232 : *
233 : * This method returns immediately after initiating the mapping. Evaluating
234 : * the value of the returned lunchbox::Future will block on the completion
235 : * of the operation and return true if the object was mapped, false if the
236 : * master of the object is not found or the requested version is no longer
237 : * available.
238 : *
239 : * @param object the object.
240 : * @param id the master object identifier.
241 : * @param master the node with the master instance, may be 0.
242 : * @param version the initial version.
243 : * @return A lunchbox::Future which will deliver the success status of
244 : * the operation on evaluation.
245 : * @sa registerObject
246 : * @version 1.0
247 : */
248 : CO_API f_bool_t mapObject( Object* object, const uint128_t& id,
249 : NodePtr master,
250 : const uint128_t& version = VERSION_OLDEST );
251 :
252 : /** Convenience wrapper mapping v.identifier at v.version. @version 1.0 */
253 27 : f_bool_t mapObject( Object* object, const ObjectVersion& v )
254 27 : { return mapObject( object, v.identifier, 0, v.version ); }
255 :
256 : /** @deprecated forwards to mapObject() without a master node */
257 13 : f_bool_t mapObject( Object* object, const uint128_t& id,
258 : const uint128_t& version = VERSION_OLDEST )
259 13 : { return mapObject( object, id, 0, version ); }
260 :
261 : /** @deprecated use mapObject() */
262 : CO_API uint32_t mapObjectNB( Object* object, const uint128_t& id,
263 : const uint128_t& version = VERSION_OLDEST );
264 :
265 : /** @deprecated use mapObject() */
266 : CO_API uint32_t mapObjectNB( Object* object, const uint128_t& id,
267 : const uint128_t& version,
268 : NodePtr master ) override;
269 :
270 : /** @deprecated use mapObject() */
271 : CO_API bool mapObjectSync( const uint32_t requestID ) override;
272 :
273 : /**
274 : * Synchronize the local object with a remote object.
275 : *
276 : * The object is synchronized to the newest version of the first
277 : * attached object on the given master node matching the
278 : * instanceID. When no master node is given, connectObjectMaster() is
279 : * used to find the node with the master instance. When CO_INSTANCE_ALL
280 : * is given, the first instance is used. Before a successful return,
281 : * applyInstanceData() is called on the calling thread to synchronize
282 : * the given object.
283 : *
284 : * @param object The local object instance to synchronize.
285 : * @param master The node where the synchronizing object is attached.
286 : * @param id the object identifier.
287 : * @param instanceID the instance identifier of the synchronizing
288 : * object.
289 : * @return A lunchbox::Future which will deliver the success status of
290 : * the operation on evaluation.
291 : * @version 1.1.1
292 : */
293 : CO_API f_bool_t syncObject( Object* object, NodePtr master,
294 : const uint128_t& id,
295 : const uint32_t instanceID = CO_INSTANCE_ALL ) override;
296 : /**
297 : * Unmap a mapped object.
298 : *
299 : * @param object the mapped object.
300 : * @version 1.0
301 : */
302 : CO_API void unmapObject( Object* object ) override;
303 :
304 : /** Disable the instance cache of a stopped local node. @version 1.0 */
305 : CO_API void disableInstanceCache();
306 :
307 : /** @internal */
308 : CO_API void expireInstanceData( const int64_t age );
309 :
310 : /**
311 : * Enable sending instance data after registration.
312 : *
313 : * Send-on-register starts transmitting instance data of registered
314 : * objects directly after they have been registered. The data is cached
315 : * on remote nodes and accelerates object mapping. Send-on-register
316 : * should not be active when remote nodes are joining a multicast group
317 : * of this node, since they will potentially read out-of-order data
318 : * streams on the multicast connection.
319 : *
320 : * Enable and disable are counted, that is, the last enable on a
321 : * matched series of disable/enable will be effective. The disable is
322 : * completely synchronous, that is, no more instance data will be sent
323 : * after the first disable.
324 : *
325 : * @version 1.0
326 : */
327 : CO_API void enableSendOnRegister();
328 :
329 : /** Disable sending data of newly registered objects. @version 1.0 */
330 : CO_API void disableSendOnRegister();
331 :
332 : /**
333 : * Handler for an Object::push() operation.
334 : *
335 : * Called at least on each node listed in an Object::push() operation
336 : * upon reception of the pushed data from the command thread. Called on
337 : * all nodes of a multicast group, even for nodes not listed in the
338 : * Object::push().
339 : *
340 : * The default implementation calls registered push handlers. Typically
341 : * used to create an object on a remote node, using the objectType for
342 : * instantiation, the istream to initialize it, and the objectID to map
343 : * it using VERSION_NONE. The groupID may be used to differentiate
344 : * multiple concurrent push operations.
345 : *
346 : * @param groupID The group identifier given to Object::push()
347 : * @param objectType The type identifier given to Object::push()
348 : * @param objectID The identifier of the pushed object
349 : * @param istream the input data stream containing the instance data.
350 : * @version 1.0
351 : */
352 : CO_API virtual void objectPush( const uint128_t& groupID,
353 : const uint128_t& objectType,
354 : const uint128_t& objectID,
355 : DataIStream& istream );
356 :
357 : /** Function signature for push handlers. @version 1.0 */
358 : typedef boost::function< void( const uint128_t&, //!< groupID
359 : const uint128_t&, //!< objectType
360 : const uint128_t&, //!< objectID
361 : DataIStream& ) > PushHandler;
362 : /**
363 : * Register a custom handler for Object::push operations.
364 : *
365 : * The registered handler function will be called automatically for an
366 : * incoming object push. Threadsafe with itself and objectPush().
367 : *
368 : * @param groupID The group identifier given to Object::push()
369 : * @param handler The handler function called for a registered groupID
370 : * @version 1.0
371 : */
372 : CO_API void registerPushHandler( const uint128_t& groupID,
373 : const PushHandler& handler );
374 :
375 :
376 : /** Function signature for custom command handlers. @version 1.0 */
377 : typedef boost::function< bool( CustomICommand& ) > CommandHandler;
378 :
379 : /**
380 : * Register a custom command handler handled by this node.
381 : *
382 : * Custom command handlers are invoked on reception of a CustomICommand
383 : * sent by Node::send( uint128_t, ... ). The command identifier needs to
384 : * be unique. It is recommended to use servus::make_uint128() to generate
385 : * this identifier.
386 : *
387 : * @param command the unique identifier of the custom command
388 : * @param func the handler function for the custom command
389 : * @param queue the queue where the command should be inserted to
390 : * @return true on successful registration, false otherwise
391 : * @version 1.0
392 : */
393 : CO_API bool registerCommandHandler( const uint128_t& command,
394 : const CommandHandler& func,
395 : CommandQueue* queue );
396 :
397 : /** @internal swap the existing object with a new object and keep
398 : the cm, id and instanceID. */
399 : CO_API void swapObject( Object* oldObject, Object* newObject );
400 : //@}
401 :
402 : /** @name Data Access */
403 : //@{
404 : /**
405 : * Get a node by identifier.
406 : *
407 : * The node might not be connected. Thread safe.
408 : *
409 : * @param id the node identifier.
410 : * @return the node.
411 : * @version 1.0
412 : */
413 : CO_API NodePtr getNode( const NodeID& id ) const;
414 :
415 : /** Assemble a vector of the currently connected nodes. @version 1.0 */
416 : CO_API void getNodes( Nodes& nodes, const bool addSelf = true ) const;
417 :
418 : /** Return the command queue to the command thread. @version 1.0 */
419 : CO_API CommandQueue* getCommandThreadQueue();
420 :
421 : /**
422 : * @return true if executed from the command handler thread, false if
423 : * not.
424 : * @version 1.0
425 : */
426 : CO_API bool inCommandThread() const;
427 :
428 : CO_API int64_t getTime64() const; //!< @internal
429 : CO_API ssize_t getCounter( const Counter counter ) const; //!< @internal
430 : //@}
431 :
432 : /** @name Operations */
433 : //@{
434 : /**
435 : * Add a listening connection to this listening node.
436 : * @return the listening connection, or 0 upon error.
437 : */
438 : CO_API ConnectionPtr addListener( ConnectionDescriptionPtr desc );
439 :
440 : /** Add a listening connection to this listening node. */
441 : CO_API void addListener( ConnectionPtr connection );
442 :
443 : /** Remove listening connections from this listening node. */
444 : CO_API void removeListeners( const Connections& connections );
445 :
446 : /** @internal
447 : * Flush all pending commands on this listening node.
448 : *
449 : * This causes the receiver thread to redispatch all pending commands,
450 : * which are normally only redispatched when a new command is received.
451 : */
452 : CO_API void flushCommands();
453 :
454 : /** @internal Allocate a command buffer from the receiver thread. */
455 : CO_API BufferPtr allocBuffer( const uint64_t size );
456 :
457 : /**
458 : * Dispatches a command to the registered command queue.
459 : *
460 : * Applications using custom command types have to override this method
461 : * to dispatch the custom commands.
462 : *
463 : * @param command the command.
464 : * @return the result of the operation.
465 : * @sa ICommand::invoke
466 : * @version 1.0
467 : */
468 : CO_API bool dispatchCommand( ICommand& command ) override;
469 :
470 :
471 : /** A handle for a send token acquired by acquireSendToken(). */
472 : typedef lunchbox::RefPtr< co::SendToken > SendToken;
473 :
474 : /**
475 : * Acquire a send token from the given node.
476 : *
477 : * The token is released automatically when it leaves its scope or
478 : * explicitly using releaseSendToken().
479 : *
480 : * @return The send token.
481 : */
482 : CO_API SendToken acquireSendToken( NodePtr toNode );
483 :
484 : /** @deprecated Token will auto-release when leaving scope. */
485 : CO_API void releaseSendToken( SendToken token );
486 :
487 : /** @return a Zeroconf communicator handle for this node. @version 1.0 */
488 : CO_API Zeroconf getZeroconf();
489 : //@}
490 :
491 : /** @internal Ack an operation to the sender. */
492 : CO_API void ackRequest( NodePtr node, const uint32_t requestID );
493 :
494 : /** Request keep-alive update from the remote node. */
495 : CO_API void ping( NodePtr remoteNode );
496 :
497 : /**
498 : * Request updates from all nodes above keep-alive timeout.
499 : *
500 : * @return true if at least one ping was sent.
501 : */
502 : CO_API bool pingIdleNodes();
503 :
504 : /**
505 : * Bind this, the receiver and the command thread to the given
506 : * lunchbox::Thread affinity.
507 : */
508 : CO_API void setAffinity( const int32_t affinity );
509 :
510 : /** @internal */
511 : CO_API void addConnection( ConnectionPtr connection );
512 :
513 : protected:
514 : /** Destruct this local node. @version 1.0 */
515 : CO_API ~LocalNode() override;
516 :
517 : /** @internal
518 : * Connect a node proxy to this node.
519 : *
520 : * This node has to be in the listening state. The node proxy will be
521 : * put in the connected state upon success. The connection has to be
522 : * connected.
523 : *
524 : * @param node the remote node.
525 : * @param connection the connection to the remote node.
526 : * @return true if the node was connected correctly,
527 : * false otherwise.
528 : */
529 : CO_API bool connect( NodePtr node, ConnectionPtr connection );
530 :
531 : /** @internal Notify remote node connection. */
532 68 : virtual void notifyConnect( NodePtr ) {}
533 :
534 : /** @internal Notify remote node disconnection. */
535 115 : virtual void notifyDisconnect( NodePtr ) {}
536 :
537 : /**
538 : * Factory method to create a new node.
539 : *
540 : * @param type the node type
541 : * @return the node.
542 : * @sa ctor type parameter
543 : * @version 1.0
544 : */
545 : CO_API virtual NodePtr createNode( const uint32_t type );
546 :
547 : private:
548 : detail::LocalNode* const _impl;
549 :
550 : friend class detail::ReceiverThread;
551 : bool _startCommandThread( const int32_t threadID );
552 : void _runReceiverThread();
553 :
554 : friend class detail::CommandThread;
555 : bool _notifyCommandThreadIdle();
556 :
557 : void _cleanup();
558 : void _closeNode( NodePtr node );
559 : void _addConnection( ConnectionPtr connection );
560 : void _removeConnection( ConnectionPtr connection );
561 :
562 : lunchbox::Request< void > _removeListener( ConnectionPtr connection );
563 :
564 : uint32_t _connect( NodePtr node );
565 : NodePtr _connect( const NodeID& nodeID );
566 : uint32_t _connect( NodePtr node, ConnectionPtr connection );
567 : NodePtr _connect( const NodeID& nodeID, NodePtr peer );
568 : NodePtr _connectFromZeroconf( const NodeID& nodeID );
569 : bool _connectSelf();
570 :
571 : void _handleConnect();
572 : void _handleDisconnect();
573 : bool _handleData();
574 : BufferPtr _readHead( ConnectionPtr connection );
575 : ICommand _setupCommand( ConnectionPtr, ConstBufferPtr );
576 : bool _readTail( ICommand&, BufferPtr, ConnectionPtr );
577 : void _initService();
578 : void _exitService();
579 :
580 : friend class ObjectStore;
581 : template< typename T >
582 1155 : void _registerCommand( const uint32_t command, const CommandFunc< T >& func,
583 : CommandQueue* destinationQueue )
584 : {
585 1155 : registerCommand( command, func, destinationQueue ); // plain forward of all three arguments
586 1155 : }
587 :
588 : void _dispatchCommand( ICommand& command );
589 : void _redispatchCommands();
590 :
591 : /** The command functions. */
592 : bool _cmdAckRequest( ICommand& command );
593 : bool _cmdStopRcv( ICommand& command );
594 : bool _cmdStopCmd( ICommand& command );
595 : bool _cmdSetAffinity( ICommand& command );
596 : bool _cmdConnect( ICommand& command );
597 : bool _cmdConnectReply( ICommand& command );
598 : bool _cmdConnectAck( ICommand& command );
599 : bool _cmdID( ICommand& command );
600 : bool _cmdDisconnect( ICommand& command );
601 : bool _cmdGetNodeData( ICommand& command );
602 : bool _cmdGetNodeDataReply( ICommand& command );
603 : bool _cmdAcquireSendToken( ICommand& command );
604 : bool _cmdAcquireSendTokenReply( ICommand& command );
605 : bool _cmdReleaseSendToken( ICommand& command );
606 : bool _cmdAddListener( ICommand& command );
607 : bool _cmdRemoveListener( ICommand& command );
608 : bool _cmdPing( ICommand& command );
609 : bool _cmdCommand( ICommand& command );
610 : bool _cmdCommandAsync( ICommand& command );
611 : bool _cmdAddConnection( ICommand& command );
612 0 : bool _cmdDiscard( ICommand& ) { return true; } // ignores the command, always returns true
613 : //@}
614 :
615 108 : LB_TS_VAR( _cmdThread )
616 108 : LB_TS_VAR( _rcvThread )
617 : };
618 :
619 165 : inline std::ostream& operator << ( std::ostream& os, const LocalNode& node )
620 : { // Stream output for a LocalNode: writes only what its Node base class writes.
621 165 : os << static_cast< const Node& >( node ); // cast selects the Node overload instead of recursing into this one
622 165 : return os; // hand the same stream back so calls can be chained
623 : }
624 : }
625 : #endif // CO_LOCALNODE_H
|