Line data Source code
1 :
2 : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2010, Cedric Stalder <cedric.stalder@gmail.com>
4 : * 2011-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "objectStore.h"
23 :
24 : #include "connection.h"
25 : #include "connectionDescription.h"
26 : #include "global.h"
27 : #include "instanceCache.h"
28 : #include "log.h"
29 : #include "masterCMCommand.h"
30 : #include "nodeCommand.h"
31 : #include "oCommand.h"
32 : #include "objectCM.h"
33 : #include "objectCommand.h"
34 : #include "objectDataICommand.h"
35 : #include "objectDataIStream.h"
36 :
37 : #include <lunchbox/futureFunction.h>
38 : #include <lunchbox/scopedMutex.h>
39 :
40 : #include <boost/bind.hpp>
41 :
42 : #include <limits>
43 :
44 : //#define DEBUG_DISPATCH
45 : #ifdef DEBUG_DISPATCH
46 : # include <set>
47 : #endif
48 :
49 : namespace co
50 : {
// Command functor type used below to bind ObjectStore member functions as
// command handlers in the constructor.
51 : typedef CommandFunc< ObjectStore > CmdFunc;
// Future implementation wrapping a callable returning bool; instantiated by
// ObjectStore::sync() to defer the completion of a synchronization.
52 : typedef lunchbox::FutureFunction< bool > FuturebImpl;
53 :
54 51 : ObjectStore::ObjectStore( LocalNode* localNode, a_ssize_t* counters )
55 : : _localNode( localNode )
56 : , _instanceIDs( -0x7FFFFFFF )
57 : , _instanceCache( new InstanceCache( Global::getIAttribute(
58 102 : Global::IATTR_INSTANCE_CACHE_SIZE ) * LB_1MB ) )
59 153 : , _counters( counters )
60 : {
61 51 : LBASSERT( localNode );
62 51 : CommandQueue* queue = localNode->getCommandThreadQueue();
63 :
64 : localNode->_registerCommand( CMD_NODE_FIND_MASTER_NODE_ID,
65 51 : CmdFunc( this, &ObjectStore::_cmdFindMasterNodeID ), queue );
66 : localNode->_registerCommand( CMD_NODE_FIND_MASTER_NODE_ID_REPLY,
67 51 : CmdFunc( this, &ObjectStore::_cmdFindMasterNodeIDReply ), 0 );
68 : localNode->_registerCommand( CMD_NODE_ATTACH_OBJECT,
69 51 : CmdFunc( this, &ObjectStore::_cmdAttach ), 0 );
70 : localNode->_registerCommand( CMD_NODE_DETACH_OBJECT,
71 51 : CmdFunc( this, &ObjectStore::_cmdDetach ), 0 );
72 : localNode->_registerCommand( CMD_NODE_REGISTER_OBJECT,
73 51 : CmdFunc( this, &ObjectStore::_cmdRegister ), queue );
74 : localNode->_registerCommand( CMD_NODE_DEREGISTER_OBJECT,
75 51 : CmdFunc( this, &ObjectStore::_cmdDeregister ), queue );
76 : localNode->_registerCommand( CMD_NODE_MAP_OBJECT,
77 51 : CmdFunc( this, &ObjectStore::_cmdMap ), queue );
78 : localNode->_registerCommand( CMD_NODE_MAP_OBJECT_SUCCESS,
79 51 : CmdFunc( this, &ObjectStore::_cmdMapSuccess ), 0 );
80 : localNode->_registerCommand( CMD_NODE_MAP_OBJECT_REPLY,
81 51 : CmdFunc( this, &ObjectStore::_cmdMapReply ), 0 );
82 : localNode->_registerCommand( CMD_NODE_UNMAP_OBJECT,
83 51 : CmdFunc( this, &ObjectStore::_cmdUnmap ), 0 );
84 : localNode->_registerCommand( CMD_NODE_UNSUBSCRIBE_OBJECT,
85 51 : CmdFunc( this, &ObjectStore::_cmdUnsubscribe ), queue );
86 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE,
87 51 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
88 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_MAP,
89 51 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
90 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_COMMIT,
91 51 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
92 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_PUSH,
93 51 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
94 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_SYNC,
95 51 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
96 : localNode->_registerCommand( CMD_NODE_DISABLE_SEND_ON_REGISTER,
97 51 : CmdFunc( this, &ObjectStore::_cmdDisableSendOnRegister ), queue );
98 : localNode->_registerCommand( CMD_NODE_REMOVE_NODE,
99 51 : CmdFunc( this, &ObjectStore::_cmdRemoveNode ), queue );
100 : localNode->_registerCommand( CMD_NODE_OBJECT_PUSH,
101 51 : CmdFunc( this, &ObjectStore::_cmdPush ), queue );
102 : localNode->_registerCommand( CMD_NODE_SYNC_OBJECT,
103 51 : CmdFunc( this, &ObjectStore::_cmdSync ), queue );
104 : localNode->_registerCommand( CMD_NODE_SYNC_OBJECT_REPLY,
105 51 : CmdFunc( this, &ObjectStore::_cmdSyncReply ), 0 );
106 51 : }
107 :
108 147 : ObjectStore::~ObjectStore()
109 : {
110 49 : LBVERB << "Delete ObjectStore @" << (void*)this << std::endl;
111 :
112 : #ifndef NDEBUG
113 49 : if( !_objects->empty( ))
114 : {
115 0 : LBWARN << _objects->size() << " attached objects in destructor"
116 0 : << std::endl;
117 :
118 0 : for( ObjectsHash::const_iterator i = _objects->begin();
119 0 : i != _objects->end(); ++i )
120 : {
121 0 : const Objects& objects = i->second;
122 0 : LBWARN << " " << objects.size() << " objects with id "
123 0 : << i->first << std::endl;
124 :
125 0 : for( Objects::const_iterator j = objects.begin();
126 0 : j != objects.end(); ++j )
127 : {
128 0 : const Object* object = *j;
129 0 : LBINFO << " object type " << lunchbox::className( object )
130 0 : << std::endl;
131 : }
132 : }
133 : }
134 : //LBASSERT( _objects->empty( ))
135 : #endif
136 49 : clear();
137 49 : delete _instanceCache;
138 49 : _instanceCache = 0;
139 98 : }
140 :
141 94 : void ObjectStore::clear( )
142 : {
143 94 : LBASSERT( _objects->empty( ));
144 94 : expireInstanceData( 0 );
145 94 : LBASSERT( !_instanceCache || _instanceCache->isEmpty( ));
146 :
147 94 : _objects->clear();
148 94 : _sendQueue.clear();
149 94 : }
150 :
151 0 : void ObjectStore::disableInstanceCache()
152 : {
153 0 : LBASSERT( _localNode->isClosed( ));
154 0 : delete _instanceCache;
155 0 : _instanceCache = 0;
156 0 : }
157 :
158 94 : void ObjectStore::expireInstanceData( const int64_t age )
159 : {
160 94 : if( _instanceCache )
161 94 : _instanceCache->expire( age );
162 94 : }
163 :
164 110 : void ObjectStore::removeInstanceData( const NodeID& nodeID )
165 : {
166 110 : if( _instanceCache )
167 110 : _instanceCache->remove( nodeID );
168 110 : }
169 :
170 0 : void ObjectStore::enableSendOnRegister()
171 : {
172 0 : ++_sendOnRegister;
173 0 : }
174 :
175 0 : void ObjectStore::disableSendOnRegister()
176 : {
177 0 : if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0 )
178 : {
179 : lunchbox::Request< void > request =
180 0 : _localNode->registerRequest< void >();
181 0 : _localNode->send( CMD_NODE_DISABLE_SEND_ON_REGISTER ) << request;
182 : }
183 : else // OPT
184 0 : --_sendOnRegister;
185 0 : }
186 :
187 : //---------------------------------------------------------------------------
188 : // identifier master node mapping
189 : //---------------------------------------------------------------------------
190 39 : NodeID ObjectStore::findMasterNodeID( const uint128_t& identifier )
191 : {
192 39 : LB_TS_NOT_THREAD( _commandThread );
193 :
194 : // OPT: look up locally first?
195 39 : Nodes nodes;
196 39 : _localNode->getNodes( nodes );
197 :
198 : // OPT: send to multiple nodes at once?
199 39 : for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
200 : {
201 39 : NodePtr node = *i;
202 : lunchbox::Request< NodeID > request =
203 39 : _localNode->registerRequest< NodeID >();
204 :
205 78 : LBLOG( LOG_OBJECTS ) << "Finding " << identifier << " on " << node
206 117 : << " req " << request.getID() << std::endl;
207 39 : node->send( CMD_NODE_FIND_MASTER_NODE_ID ) << identifier << request;
208 :
209 39 : const NodeID& masterNodeID = request.wait( Global::getTimeout( ));
210 :
211 39 : if( masterNodeID != 0 )
212 : {
213 39 : LBLOG( LOG_OBJECTS ) << "Found " << identifier << " on "
214 39 : << masterNodeID << std::endl;
215 39 : return masterNodeID;
216 : }
217 0 : }
218 :
219 0 : return NodeID();
220 : }
221 :
222 : //---------------------------------------------------------------------------
223 : // object mapping
224 : //---------------------------------------------------------------------------
225 20 : void ObjectStore::attach( Object* object, const uint128_t& id,
226 : const uint32_t instanceID )
227 : {
228 20 : LBASSERT( object );
229 20 : LB_TS_NOT_THREAD( _receiverThread );
230 :
231 : lunchbox::Request< void > request =
232 20 : _localNode->registerRequest< void >( object );
233 20 : _localNode->send( CMD_NODE_ATTACH_OBJECT ) << id << instanceID << request;
234 20 : }
235 :
236 : namespace
237 : {
238 61 : uint32_t _genNextID( lunchbox::a_int32_t& val )
239 : {
240 : uint32_t result;
241 61 : do
242 : {
243 61 : const long id = ++val;
244 : result = static_cast< uint32_t >(
245 61 : static_cast< int64_t >( id ) + 0x7FFFFFFFu );
246 : }
247 : while( result > CO_INSTANCE_MAX );
248 :
249 61 : return result;
250 : }
251 : }
252 :
253 61 : void ObjectStore::_attach( Object* object, const uint128_t& id,
254 : const uint32_t inInstanceID )
255 : {
256 61 : LBASSERT( object );
257 61 : LB_TS_THREAD( _receiverThread );
258 :
259 61 : uint32_t instanceID = inInstanceID;
260 61 : if( inInstanceID == CO_INSTANCE_INVALID )
261 20 : instanceID = _genNextID( _instanceIDs );
262 :
263 61 : object->attach( id, instanceID );
264 :
265 : {
266 61 : lunchbox::ScopedFastWrite mutex( _objects );
267 61 : Objects& objects = _objects.data[ id ];
268 61 : LBASSERTINFO( !object->isMaster() || objects.empty(),
269 : "Attaching master " << *object << ", " <<
270 : objects.size() << " attached objects with same ID, " <<
271 : "first is " << ( objects[0]->isMaster() ? "master " :
272 : "slave " ) << *objects[0] );
273 61 : objects.push_back( object );
274 : }
275 :
276 61 : _localNode->flushCommands(); // redispatch pending commands
277 :
278 61 : LBLOG( LOG_OBJECTS ) << "attached " << *object << " @"
279 61 : << static_cast< void* >( object ) << std::endl;
280 61 : }
281 :
282 28 : void ObjectStore::detach( Object* object )
283 : {
284 28 : LBASSERT( object );
285 28 : LB_TS_NOT_THREAD( _receiverThread );
286 :
287 28 : lunchbox::Request< void > request = _localNode->registerRequest< void >();
288 : _localNode->send( CMD_NODE_DETACH_OBJECT )
289 28 : << object->getID() << object->getInstanceID() << request;
290 28 : }
291 :
292 0 : void ObjectStore::swap( Object* oldObject, Object* newObject )
293 : {
294 0 : LBASSERT( newObject );
295 0 : LBASSERT( oldObject );
296 0 : LBASSERT( oldObject->isMaster() );
297 0 : LB_TS_THREAD( _receiverThread );
298 :
299 0 : if( !oldObject->isAttached() )
300 0 : return;
301 :
302 0 : LBLOG( LOG_OBJECTS ) << "Swap " << lunchbox::className( oldObject )
303 0 : << std::endl;
304 0 : const uint128_t& id = oldObject->getID();
305 :
306 0 : lunchbox::ScopedFastWrite mutex( _objects );
307 0 : ObjectsHash::iterator i = _objects->find( id );
308 0 : LBASSERT( i != _objects->end( ));
309 0 : if( i == _objects->end( ))
310 0 : return;
311 :
312 0 : Objects& objects = i->second;
313 0 : Objects::iterator j = find( objects.begin(), objects.end(), oldObject );
314 0 : LBASSERT( j != objects.end( ));
315 0 : if( j == objects.end( ))
316 0 : return;
317 :
318 0 : newObject->transfer( oldObject );
319 0 : *j = newObject;
320 : }
321 :
322 59 : void ObjectStore::_detach( Object* object )
323 : {
324 : // check also _cmdUnmapObject when modifying!
325 59 : LBASSERT( object );
326 59 : LB_TS_THREAD( _receiverThread );
327 :
328 59 : if( !object->isAttached() )
329 0 : return;
330 :
331 59 : const uint128_t& id = object->getID();
332 :
333 59 : LBASSERT( _objects->find( id ) != _objects->end( ));
334 59 : LBLOG( LOG_OBJECTS ) << "Detach " << *object << std::endl;
335 :
336 59 : Objects& objects = _objects.data[ id ];
337 59 : Objects::iterator i = find( objects.begin(),objects.end(), object );
338 59 : LBASSERT( i != objects.end( ));
339 :
340 : {
341 59 : lunchbox::ScopedFastWrite mutex( _objects );
342 59 : objects.erase( i );
343 59 : if( objects.empty( ))
344 58 : _objects->erase( id );
345 : }
346 :
347 59 : LBASSERT( object->getInstanceID() != CO_INSTANCE_INVALID );
348 59 : object->detach();
349 59 : return;
350 : }
351 :
352 41 : uint32_t ObjectStore::mapNB( Object* object, const uint128_t& id,
353 : const uint128_t& version, NodePtr master )
354 : {
355 41 : LB_TS_NOT_THREAD( _receiverThread );
356 41 : LBLOG( LOG_OBJECTS )
357 41 : << "Mapping " << lunchbox::className( object ) << " to id " << id
358 123 : << " version " << version << std::endl;
359 41 : LBASSERT( object );
360 41 : LBASSERTINFO( id.isUUID(), id );
361 :
362 41 : if( !master )
363 39 : master = _localNode->connectObjectMaster( id );
364 :
365 41 : if( !master || !master->isReachable( ))
366 : {
367 0 : LBWARN << "Mapping of object " << id << " failed, invalid master node"
368 0 : << std::endl;
369 0 : return LB_UNDEFINED_UINT32;
370 : }
371 :
372 41 : if( !object || !id.isUUID( ))
373 : {
374 0 : LBWARN << "Invalid object " << object << " or id " << id << std::endl;
375 0 : return LB_UNDEFINED_UINT32;
376 : }
377 :
378 41 : const bool isAttached = object->isAttached();
379 41 : const bool isMaster = object->isMaster();
380 41 : LBASSERT( !isAttached );
381 41 : LBASSERT( !isMaster ) ;
382 41 : if( isAttached || isMaster )
383 : {
384 0 : LBWARN << "Invalid object state: attached " << isAttached << " master "
385 0 : << isMaster << std::endl;
386 0 : return LB_UNDEFINED_UINT32;
387 : }
388 :
389 : lunchbox::Request< void > request =
390 41 : _localNode->registerRequest< void >( object );
391 41 : uint128_t minCachedVersion = VERSION_HEAD;
392 41 : uint128_t maxCachedVersion = VERSION_NONE;
393 41 : uint32_t masterInstanceID = 0;
394 : const bool useCache = _checkInstanceCache( id, minCachedVersion,
395 : maxCachedVersion,
396 41 : masterInstanceID );
397 41 : object->notifyAttach();
398 : master->send( CMD_NODE_MAP_OBJECT )
399 82 : << version << minCachedVersion << maxCachedVersion << id
400 123 : << object->getMaxVersions() << request << _genNextID( _instanceIDs )
401 41 : << masterInstanceID << useCache;
402 41 : request.relinquish();
403 41 : return request.getID();
404 : }
405 :
406 45 : bool ObjectStore::_checkInstanceCache( const uint128_t& id, uint128_t& from,
407 : uint128_t& to, uint32_t& instanceID )
408 : {
409 45 : if( !_instanceCache )
410 0 : return false;
411 :
412 45 : const InstanceCache::Data& cached = (*_instanceCache)[ id ];
413 45 : if( cached == InstanceCache::Data::NONE )
414 45 : return false;
415 :
416 0 : const ObjectDataIStreamDeque& versions = cached.versions;
417 0 : LBASSERT( !cached.versions.empty( ));
418 0 : instanceID = cached.masterInstanceID;
419 0 : from = versions.front()->getVersion();
420 0 : to = versions.back()->getVersion();
421 0 : LBLOG( LOG_OBJECTS ) << "Object " << id << " have v" << from << ".." << to
422 0 : << std::endl;
423 0 : return true;
424 : }
425 :
426 41 : bool ObjectStore::mapSync( const uint32_t requestID )
427 : {
428 41 : if( requestID == LB_UNDEFINED_UINT32 )
429 0 : return false;
430 :
431 41 : void* data = _localNode->getRequestData( requestID );
432 41 : if( data == 0 )
433 0 : return false;
434 :
435 41 : Object* object = LBSAFECAST( Object*, data );
436 41 : uint128_t version = VERSION_NONE;
437 41 : _localNode->waitRequest( requestID, version );
438 :
439 41 : const bool mapped = object->isAttached();
440 41 : if( mapped )
441 41 : object->applyMapData( version ); // apply initial instance data
442 :
443 41 : object->notifyAttached();
444 41 : LBLOG( LOG_OBJECTS )
445 82 : << "Mapped " << lunchbox::className( object ) << std::endl;
446 41 : return mapped;
447 : }
448 :
449 :
450 4 : f_bool_t ObjectStore::sync( Object* object, NodePtr master, const uint128_t& id,
451 : const uint32_t instanceID )
452 : {
453 4 : const uint32_t request = _startSync( object, master, id, instanceID );
454 : const FuturebImpl::Func& func = boost::bind( &ObjectStore::_finishSync,
455 4 : this, request, object );
456 4 : return f_bool_t( new FuturebImpl( func ));
457 : }
458 :
459 4 : uint32_t ObjectStore::_startSync( Object* object, NodePtr master,
460 : const uint128_t& id,
461 : const uint32_t instanceID )
462 : {
463 4 : LB_TS_NOT_THREAD( _receiverThread );
464 4 : LBLOG( LOG_OBJECTS )
465 4 : << "Syncing " << lunchbox::className( object ) << " with id " << id
466 12 : << std::endl;
467 4 : LBASSERT( object );
468 4 : LBASSERTINFO( id.isUUID(), id );
469 :
470 4 : if( !object || !id.isUUID( ))
471 : {
472 0 : LBWARN << "Invalid object " << object << " or id " << id << std::endl;
473 0 : return LB_UNDEFINED_UINT32;
474 : }
475 :
476 4 : if( !master )
477 0 : master = _localNode->connectObjectMaster( id );
478 :
479 4 : if( !master || !master->isReachable( ))
480 : {
481 0 : LBWARN << "Mapping of object " << id << " failed, invalid master node"
482 0 : << std::endl;
483 0 : return LB_UNDEFINED_UINT32;
484 : }
485 :
486 : lunchbox::Request< void > request =
487 4 : _localNode->registerRequest< void >( new ObjectDataIStream );
488 4 : uint128_t minCachedVersion = VERSION_HEAD;
489 4 : uint128_t maxCachedVersion = VERSION_NONE;
490 4 : uint32_t cacheInstanceID = 0;
491 :
492 : bool useCache = _checkInstanceCache( id, minCachedVersion, maxCachedVersion,
493 4 : cacheInstanceID );
494 4 : if( useCache )
495 : {
496 0 : switch( instanceID )
497 : {
498 : case CO_INSTANCE_ALL:
499 0 : break;
500 : default:
501 0 : if( instanceID == cacheInstanceID )
502 0 : break;
503 :
504 0 : useCache = false;
505 0 : LBCHECK( _instanceCache->release( id, 1 ));
506 0 : break;
507 : }
508 : }
509 :
510 : // Use stream expected by MasterCMCommand
511 : master->send( CMD_NODE_SYNC_OBJECT )
512 8 : << VERSION_NEWEST << minCachedVersion << maxCachedVersion << id
513 12 : << uint64_t(0) /* maxVersions */ << request << instanceID
514 4 : << cacheInstanceID << useCache;
515 4 : request.relinquish();
516 4 : return request.getID();
517 : }
518 :
519 4 : bool ObjectStore::_finishSync( const uint32_t requestID, Object* object )
520 : {
521 4 : if( requestID == LB_UNDEFINED_UINT32 )
522 0 : return false;
523 :
524 4 : void* data = _localNode->getRequestData( requestID );
525 4 : if( data == 0 )
526 0 : return false;
527 :
528 4 : ObjectDataIStream* is = LBSAFECAST( ObjectDataIStream*, data );
529 :
530 4 : bool ok = false;
531 4 : _localNode->waitRequest( requestID, ok );
532 :
533 4 : if( !ok )
534 : {
535 0 : LBWARN << "Object synchronization failed" << std::endl;
536 0 : delete is;
537 0 : return false;
538 : }
539 :
540 4 : is->waitReady();
541 4 : object->applyInstanceData( *is );
542 8 : LBLOG( LOG_OBJECTS ) << "Synced " << lunchbox::className( object )
543 12 : << std::endl;
544 4 : delete is;
545 4 : return true;
546 : }
547 :
548 41 : void ObjectStore::unmap( Object* object )
549 : {
550 41 : LBASSERT( object );
551 41 : if( !object->isAttached( )) // not registered
552 34 : return;
553 :
554 40 : const uint128_t& id = object->getID();
555 :
556 40 : LBLOG( LOG_OBJECTS ) << "Unmap " << object << std::endl;
557 :
558 40 : object->notifyDetach();
559 :
560 : // send unsubscribe to master, master will send detach command.
561 40 : LBASSERT( !object->isMaster( ));
562 40 : LB_TS_NOT_THREAD( _commandThread );
563 :
564 40 : const uint32_t masterInstanceID = object->getMasterInstanceID();
565 40 : if( masterInstanceID != CO_INSTANCE_INVALID )
566 : {
567 32 : NodePtr master = object->getMasterNode();
568 32 : LBASSERT( master )
569 :
570 32 : if( master && master->isReachable( ))
571 : {
572 : lunchbox::Request< void > request =
573 32 : _localNode->registerRequest< void >();
574 : master->send( CMD_NODE_UNSUBSCRIBE_OBJECT )
575 32 : << id << request << masterInstanceID << object->getInstanceID();
576 32 : request.wait();
577 32 : object->notifyDetached();
578 32 : return;
579 : }
580 0 : LBERROR << "Master node for object id " << id << " not connected"
581 0 : << std::endl;
582 : }
583 :
584 : // no unsubscribe sent: Detach directly
585 8 : detach( object );
586 8 : object->setupChangeManager( Object::NONE, false, 0, CO_INSTANCE_INVALID );
587 8 : object->notifyDetached();
588 : }
589 :
590 20 : bool ObjectStore::register_( Object* object )
591 : {
592 20 : LBASSERT( object );
593 20 : LBASSERT( !object->isAttached( ));
594 :
595 20 : const uint128_t& id = object->getID( );
596 20 : LBASSERTINFO( id.isUUID(), id );
597 :
598 20 : object->notifyAttach();
599 20 : object->setupChangeManager( object->getChangeType(), true, _localNode,
600 40 : CO_INSTANCE_INVALID );
601 20 : attach( object, id, CO_INSTANCE_INVALID );
602 :
603 20 : if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0 )
604 20 : _localNode->send( CMD_NODE_REGISTER_OBJECT ) << object;
605 :
606 20 : object->notifyAttached();
607 :
608 20 : LBLOG( LOG_OBJECTS ) << "Registered " << object << std::endl;
609 20 : return true;
610 : }
611 :
612 20 : void ObjectStore::deregister( Object* object )
613 : {
614 20 : LBASSERT( object );
615 20 : if( !object->isAttached( )) // not registered
616 20 : return;
617 :
618 20 : LBLOG( LOG_OBJECTS ) << "Deregister " << *object << std::endl;
619 20 : LBASSERT( object->isMaster( ));
620 :
621 20 : object->notifyDetach();
622 :
623 20 : if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0 )
624 : {
625 : // remove from send queue
626 : lunchbox::Request< void > request =
627 20 : _localNode->registerRequest< void >();
628 20 : _localNode->send( CMD_NODE_DEREGISTER_OBJECT ) << request;
629 : }
630 :
631 20 : const uint128_t id = object->getID();
632 20 : detach( object );
633 20 : object->setupChangeManager( Object::NONE, true, 0, CO_INSTANCE_INVALID );
634 20 : if( _instanceCache )
635 20 : _instanceCache->erase( id );
636 20 : object->notifyDetached();
637 : }
638 :
639 50596 : bool ObjectStore::notifyCommandThreadIdle()
640 : {
641 50596 : LB_TS_THREAD( _commandThread );
642 50597 : if( _sendQueue.empty( ))
643 50597 : return false;
644 :
645 0 : LBASSERT( _sendOnRegister > 0 );
646 0 : SendQueueItem& item = _sendQueue.front();
647 :
648 0 : if( item.age > _localNode->getTime64( ))
649 : {
650 0 : Nodes nodes;
651 0 : _localNode->getNodes( nodes, false );
652 0 : if( nodes.empty( ))
653 : {
654 0 : lunchbox::Thread::yield();
655 0 : return !_sendQueue.empty();
656 : }
657 :
658 0 : item.object->sendInstanceData( nodes );
659 : }
660 0 : _sendQueue.pop_front();
661 0 : return !_sendQueue.empty();
662 : }
663 :
664 7 : void ObjectStore::removeNode( NodePtr node )
665 : {
666 7 : lunchbox::Request< void > request = _localNode->registerRequest< void >();
667 7 : _localNode->send( CMD_NODE_REMOVE_NODE ) << node.get() << request;
668 7 : }
669 :
670 : //===========================================================================
671 : // ICommand handling
672 : //===========================================================================
673 548 : bool ObjectStore::dispatchObjectCommand( ICommand& cmd )
674 : {
675 548 : LB_TS_THREAD( _receiverThread );
676 548 : ObjectICommand command( cmd );
677 548 : const uint128_t& id = command.getObjectID();
678 548 : const uint32_t instanceID = command.getInstanceID();
679 :
680 548 : ObjectsHash::const_iterator i = _objects->find( id );
681 :
682 548 : if( i == _objects->end( ))
683 : // When the instance ID is set to none, we only care about the command
684 : // when we have an object of the given ID (multicast)
685 0 : return ( instanceID == CO_INSTANCE_NONE );
686 :
687 548 : const Objects& objects = i->second;
688 548 : LBASSERTINFO( !objects.empty(), command );
689 :
690 548 : if( instanceID <= CO_INSTANCE_MAX )
691 : {
692 59 : for( Objects::const_iterator j = objects.begin(); j!=objects.end(); ++j)
693 : {
694 59 : Object* object = *j;
695 59 : if( instanceID == object->getInstanceID( ))
696 : {
697 53 : LBCHECK( object->dispatchCommand( command ));
698 53 : return true;
699 : }
700 : }
701 0 : LBUNREACHABLE;
702 0 : return false;
703 : }
704 :
705 495 : Objects::const_iterator j = objects.begin();
706 495 : Object* object = *j;
707 495 : LBCHECK( object->dispatchCommand( command ));
708 :
709 495 : for( ++j; j != objects.end(); ++j )
710 : {
711 0 : object = *j;
712 0 : LBCHECK( object->dispatchCommand( command ));
713 : }
714 495 : return true;
715 : }
716 :
717 39 : bool ObjectStore::_cmdFindMasterNodeID( ICommand& command )
718 : {
719 39 : LB_TS_THREAD( _commandThread );
720 :
721 39 : const uint128_t& id = command.get< uint128_t >();
722 39 : const uint32_t requestID = command.get< uint32_t >();
723 39 : LBASSERT( id.isUUID( ));
724 :
725 39 : NodeID masterNodeID;
726 : {
727 39 : lunchbox::ScopedFastRead mutex( _objects );
728 39 : ObjectsHashCIter i = _objects->find( id );
729 :
730 39 : if( i != _objects->end( ))
731 : {
732 39 : const Objects& objects = i->second;
733 39 : LBASSERT( !objects.empty( ));
734 :
735 39 : for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
736 : {
737 39 : Object* object = *j;
738 39 : if( object->isMaster( ))
739 39 : masterNodeID = _localNode->getNodeID();
740 : else
741 : {
742 0 : NodePtr master = object->getMasterNode();
743 0 : if( master.isValid( ))
744 0 : masterNodeID = master->getNodeID();
745 : }
746 39 : if( masterNodeID != 0 )
747 39 : break;
748 : }
749 39 : }
750 : }
751 :
752 39 : LBLOG( LOG_OBJECTS ) << "Object " << id << " master " << masterNodeID
753 39 : << " req " << requestID << std::endl;
754 : command.getNode()->send( CMD_NODE_FIND_MASTER_NODE_ID_REPLY )
755 39 : << masterNodeID << requestID;
756 39 : return true;
757 : }
758 :
759 39 : bool ObjectStore::_cmdFindMasterNodeIDReply( ICommand& command )
760 : {
761 39 : const NodeID& masterNodeID = command.get< NodeID >();
762 39 : const uint32_t requestID = command.get< uint32_t >();
763 39 : _localNode->serveRequest( requestID, masterNodeID );
764 39 : return true;
765 : }
766 :
767 20 : bool ObjectStore::_cmdAttach( ICommand& command )
768 : {
769 20 : LB_TS_THREAD( _receiverThread );
770 20 : LBLOG( LOG_OBJECTS ) << "Cmd attach object " << command << std::endl;
771 :
772 20 : const uint128_t& objectID = command.get< uint128_t >();
773 20 : const uint32_t instanceID = command.get< uint32_t >();
774 20 : const uint32_t requestID = command.get< uint32_t >();
775 :
776 : Object* object = static_cast< Object* >( _localNode->getRequestData(
777 20 : requestID ));
778 20 : _attach( object, objectID, instanceID );
779 20 : _localNode->serveRequest( requestID );
780 20 : return true;
781 : }
782 :
783 60 : bool ObjectStore::_cmdDetach( ICommand& command )
784 : {
785 60 : LB_TS_THREAD( _receiverThread );
786 60 : LBLOG( LOG_OBJECTS ) << "Cmd detach object " << command << std::endl;
787 :
788 60 : const uint128_t& objectID = command.get< uint128_t >();
789 60 : const uint32_t instanceID = command.get< uint32_t >();
790 60 : const uint32_t requestID = command.get< uint32_t >();
791 :
792 60 : ObjectsHash::const_iterator i = _objects->find( objectID );
793 60 : if( i != _objects->end( ))
794 : {
795 59 : const Objects& objects = i->second;
796 :
797 180 : for( Objects::const_iterator j = objects.begin();
798 120 : j != objects.end(); ++j )
799 : {
800 60 : Object* object = *j;
801 60 : if( object->getInstanceID() == instanceID )
802 : {
803 59 : _detach( object );
804 59 : break;
805 : }
806 : }
807 : }
808 :
809 60 : LBASSERT( requestID != LB_UNDEFINED_UINT32 );
810 60 : _localNode->serveRequest( requestID );
811 60 : return true;
812 : }
813 :
814 20 : bool ObjectStore::_cmdRegister( ICommand& command )
815 : {
816 20 : LB_TS_THREAD( _commandThread );
817 20 : if( _sendOnRegister <= 0 )
818 20 : return true;
819 :
820 0 : LBLOG( LOG_OBJECTS ) << "Cmd register object " << command << std::endl;
821 :
822 0 : Object* object = reinterpret_cast< Object* >( command.get< void* >( ));
823 :
824 : const int32_t age = Global::getIAttribute(
825 0 : Global::IATTR_NODE_SEND_QUEUE_AGE );
826 : SendQueueItem item;
827 0 : item.age = age ? age + _localNode->getTime64() :
828 0 : std::numeric_limits< int64_t >::max();
829 0 : item.object = object;
830 0 : _sendQueue.push_back( item );
831 :
832 : const uint32_t size = Global::getIAttribute(
833 0 : Global::IATTR_NODE_SEND_QUEUE_SIZE );
834 0 : while( _sendQueue.size() > size )
835 0 : _sendQueue.pop_front();
836 :
837 0 : return true;
838 : }
839 :
840 20 : bool ObjectStore::_cmdDeregister( ICommand& command )
841 : {
842 20 : LB_TS_THREAD( _commandThread );
843 20 : LBLOG( LOG_OBJECTS ) << "Cmd deregister object " << command << std::endl;
844 :
845 20 : const uint32_t requestID = command.get< uint32_t >();
846 :
847 20 : const void* object = _localNode->getRequestData( requestID );
848 :
849 20 : for( SendQueueIter i = _sendQueue.begin(); i != _sendQueue.end(); ++i )
850 : {
851 0 : if( i->object == object )
852 : {
853 0 : _sendQueue.erase( i );
854 0 : break;
855 : }
856 : }
857 :
858 20 : _localNode->serveRequest( requestID );
859 20 : return true;
860 : }
861 :
862 41 : bool ObjectStore::_cmdMap( ICommand& cmd )
863 : {
864 41 : LB_TS_THREAD( _commandThread );
865 :
866 41 : MasterCMCommand command( cmd );
867 41 : const uint128_t& id = command.getObjectID();
868 :
869 41 : LBLOG( LOG_OBJECTS ) << "Cmd map object " << command << " id " << id << "."
870 0 : << command.getInstanceID() << " req "
871 41 : << command.getRequestID() << std::endl;
872 :
873 82 : ObjectCMPtr masterCM;
874 : {
875 41 : lunchbox::ScopedFastRead mutex( _objects );
876 41 : ObjectsHash::const_iterator i = _objects->find( id );
877 41 : if( i != _objects->end( ))
878 : {
879 41 : const Objects& objects = i->second;
880 :
881 41 : for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
882 : {
883 41 : Object* object = *j;
884 41 : if( object->isMaster( ))
885 : {
886 41 : masterCM = object->_getChangeManager();
887 41 : break;
888 : }
889 : }
890 41 : }
891 : }
892 :
893 41 : if( !masterCM || !masterCM->addSlave( command ))
894 : {
895 0 : LBWARN << "Can't find master object to map " << id << std::endl;
896 0 : NodePtr node = command.getNode();
897 : node->send( CMD_NODE_MAP_OBJECT_REPLY )
898 0 : << node->getNodeID() << id << command.getRequestedVersion()
899 0 : << command.getRequestID() << false << command.useCache() << false;
900 : }
901 :
902 41 : ++_counters[ LocalNode::COUNTER_MAP_OBJECT_REMOTE ];
903 82 : return true;
904 : }
905 :
906 41 : bool ObjectStore::_cmdMapSuccess( ICommand& command )
907 : {
908 41 : LB_TS_THREAD( _receiverThread );
909 :
910 41 : const uint128_t& nodeID = command.get< uint128_t >();
911 41 : const uint128_t& objectID = command.get< uint128_t >();
912 41 : const uint32_t requestID = command.get< uint32_t >();
913 41 : const uint32_t instanceID = command.get< uint32_t >();
914 41 : const Object::ChangeType changeType = command.get< Object::ChangeType >();
915 41 : const uint32_t masterInstanceID = command.get< uint32_t >();
916 :
917 : // Map success commands are potentially multicasted (see above)
918 : // verify that we are the intended receiver
919 41 : if( nodeID != _localNode->getNodeID( ))
920 0 : return true;
921 :
922 41 : LBLOG( LOG_OBJECTS ) << "Cmd map object success " << command
923 0 : << " id " << objectID << "." << instanceID
924 41 : << " req " << requestID << std::endl;
925 :
926 : // set up change manager and attach object to dispatch table
927 : Object* object = static_cast< Object* >(
928 41 : _localNode->getRequestData( requestID ));
929 41 : LBASSERT( object );
930 41 : LBASSERT( !object->isMaster( ));
931 :
932 : object->setupChangeManager( Object::ChangeType( changeType ), false,
933 41 : _localNode, masterInstanceID );
934 41 : _attach( object, objectID, instanceID );
935 41 : return true;
936 : }
937 :
938 41 : bool ObjectStore::_cmdMapReply( ICommand& command )
939 : {
940 41 : LB_TS_THREAD( _receiverThread );
941 :
942 : // Map reply commands are potentially multicasted (see above)
943 : // verify that we are the intended receiver
944 41 : if( command.get< uint128_t >() != _localNode->getNodeID( ))
945 0 : return true;
946 :
947 41 : const uint128_t& objectID = command.get< uint128_t >();
948 41 : const uint128_t& version = command.get< uint128_t >();
949 41 : const uint32_t requestID = command.get< uint32_t >();
950 41 : const bool result = command.get< bool >();
951 41 : const bool releaseCache = command.get< bool >();
952 41 : const bool useCache = command.get< bool >();
953 :
954 41 : LBLOG( LOG_OBJECTS ) << "Cmd map object reply " << command << " id "
955 41 : << objectID << " req " << requestID << std::endl;
956 :
957 41 : LBASSERT( _localNode->getRequestData( requestID ));
958 :
959 41 : if( result )
960 : {
961 : Object* object = static_cast<Object*>(
962 41 : _localNode->getRequestData( requestID ));
963 41 : LBASSERT( object );
964 41 : LBASSERT( !object->isMaster( ));
965 :
966 41 : object->setMasterNode( command.getNode( ));
967 :
968 41 : if( useCache )
969 : {
970 0 : LBASSERT( releaseCache );
971 0 : LBASSERT( _instanceCache );
972 :
973 0 : const uint128_t& id = objectID;
974 0 : const InstanceCache::Data& cached = (*_instanceCache)[ id ];
975 0 : LBASSERT( cached != InstanceCache::Data::NONE );
976 0 : LBASSERT( !cached.versions.empty( ));
977 :
978 0 : object->addInstanceDatas( cached.versions, version );
979 0 : LBCHECK( _instanceCache->release( id, 2 ));
980 : }
981 41 : else if( releaseCache )
982 : {
983 0 : LBCHECK( _instanceCache->release( objectID, 1 ));
984 : }
985 : }
986 : else
987 : {
988 0 : if( releaseCache )
989 0 : _instanceCache->release( objectID, 1 );
990 :
991 0 : LBWARN << "Could not map object " << objectID << std::endl;
992 : }
993 :
994 41 : _localNode->serveRequest( requestID, version );
995 41 : return true;
996 : }
997 :
998 32 : bool ObjectStore::_cmdUnsubscribe( ICommand& command )
999 : {
1000 32 : LB_TS_THREAD( _commandThread );
1001 32 : LBLOG( LOG_OBJECTS ) << "Cmd unsubscribe object " << command << std::endl;
1002 :
1003 32 : const uint128_t& id = command.get< uint128_t >();
1004 32 : const uint32_t requestID = command.get< uint32_t >();
1005 32 : const uint32_t masterInstanceID = command.get< uint32_t >();
1006 32 : const uint32_t slaveInstanceID = command.get< uint32_t >();
1007 :
1008 32 : NodePtr node = command.getNode();
1009 :
1010 : {
1011 32 : lunchbox::ScopedFastWrite mutex( _objects );
1012 32 : ObjectsHash::const_iterator i = _objects->find( id );
1013 32 : if( i != _objects->end( ))
1014 : {
1015 32 : const Objects& objects = i->second;
1016 32 : for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
1017 : {
1018 32 : Object* object = *j;
1019 64 : if( object->isMaster() &&
1020 32 : object->getInstanceID() == masterInstanceID )
1021 : {
1022 32 : object->removeSlave( node, slaveInstanceID );
1023 32 : break;
1024 : }
1025 : }
1026 32 : }
1027 : }
1028 :
1029 32 : node->send( CMD_NODE_DETACH_OBJECT ) << id << slaveInstanceID << requestID;
1030 32 : return true;
1031 : }
1032 :
1033 2 : bool ObjectStore::_cmdUnmap( ICommand& command )
1034 : {
1035 2 : LB_TS_THREAD( _receiverThread );
1036 2 : LBLOG( LOG_OBJECTS ) << "Cmd unmap object " << command << std::endl;
1037 :
1038 2 : const uint128_t& objectID = command.get< uint128_t >();
1039 :
1040 2 : if( _instanceCache )
1041 2 : _instanceCache->erase( objectID );
1042 :
1043 2 : ObjectsHash::iterator i = _objects->find( objectID );
1044 2 : if( i == _objects->end( )) // nothing to do
1045 0 : return true;
1046 :
1047 2 : const Objects objects = i->second;
1048 : {
1049 2 : lunchbox::ScopedFastWrite mutex( _objects );
1050 2 : _objects->erase( i );
1051 : }
1052 :
1053 4 : for( Objects::const_iterator j = objects.begin(); j != objects.end(); ++j )
1054 : {
1055 2 : Object* object = *j;
1056 2 : object->detach();
1057 : }
1058 :
1059 2 : return true;
1060 : }
1061 :
1062 4 : bool ObjectStore::_cmdSync( ICommand& cmd )
1063 : {
1064 4 : LB_TS_THREAD( _commandThread );
1065 4 : MasterCMCommand command( cmd );
1066 4 : const uint128_t& id = command.getObjectID();
1067 4 : LBINFO << command.getNode() << std::endl;
1068 :
1069 4 : LBLOG( LOG_OBJECTS ) << "Cmd sync object id " << id << "."
1070 0 : << command.getInstanceID() << " req "
1071 4 : << command.getRequestID() << std::endl;
1072 :
1073 4 : const uint32_t cacheInstanceID = command.getMasterInstanceID();
1074 8 : ObjectCMPtr cm;
1075 : {
1076 4 : lunchbox::ScopedFastRead mutex( _objects );
1077 4 : ObjectsHash::const_iterator i = _objects->find( id );
1078 4 : if( i != _objects->end( ))
1079 : {
1080 4 : const Objects& objects = i->second;
1081 :
1082 8 : for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
1083 : {
1084 4 : Object* object = *j;
1085 4 : if( command.getInstanceID() == object->getInstanceID( ))
1086 : {
1087 0 : cm = object->_getChangeManager();
1088 0 : LBASSERT( cm );
1089 0 : break;
1090 : }
1091 :
1092 4 : if( command.getInstanceID() != CO_INSTANCE_ALL )
1093 0 : continue;
1094 :
1095 4 : cm = object->_getChangeManager();
1096 4 : LBASSERT( cm );
1097 4 : if( cacheInstanceID == object->getInstanceID( ))
1098 0 : break;
1099 : }
1100 4 : if( !cm )
1101 0 : LBWARN << "Can't find object to sync " << id << "."
1102 0 : << command.getInstanceID() << " in " << objects.size()
1103 0 : << " instances" << std::endl;
1104 : }
1105 4 : if( !cm )
1106 0 : LBWARN << "Can't find object to sync " << id
1107 0 : << ", no object with identifier" << std::endl;
1108 : }
1109 4 : if( !cm || !cm->sendSync( command ))
1110 : {
1111 0 : NodePtr node = command.getNode();
1112 : node->send( CMD_NODE_SYNC_OBJECT_REPLY )
1113 0 : << node->getNodeID() << id << command.getRequestID()
1114 0 : << false << command.useCache() << false;
1115 : }
1116 8 : return true;
1117 : }
1118 :
1119 4 : bool ObjectStore::_cmdSyncReply( ICommand& command )
1120 : {
1121 4 : LB_TS_THREAD( _receiverThread );
1122 :
1123 : // Sync reply commands are potentially multicasted (see above)
1124 : // verify that we are the intended receiver
1125 4 : if( command.get< uint128_t >() != _localNode->getNodeID( ))
1126 0 : return true;
1127 :
1128 4 : const NodeID& id = command.get< NodeID >();
1129 4 : const uint32_t requestID = command.get< uint32_t >();
1130 4 : const bool result = command.get< bool >();
1131 4 : const bool releaseCache = command.get< bool >();
1132 4 : const bool useCache = command.get< bool >();
1133 4 : void* const data = _localNode->getRequestData( requestID );
1134 4 : ObjectDataIStream* const is = LBSAFECAST( ObjectDataIStream*, data );
1135 :
1136 4 : LBLOG( LOG_OBJECTS ) << "Cmd sync object reply " << command << " req "
1137 4 : << requestID << std::endl;
1138 4 : if( result )
1139 : {
1140 4 : if( useCache )
1141 : {
1142 0 : LBASSERT( releaseCache );
1143 0 : LBASSERT( _instanceCache );
1144 :
1145 0 : const InstanceCache::Data& cached = (*_instanceCache)[ id ];
1146 0 : LBASSERT( cached != InstanceCache::Data::NONE );
1147 0 : LBASSERT( !cached.versions.empty( ));
1148 :
1149 0 : *is = *cached.versions.back();
1150 0 : LBCHECK( _instanceCache->release( id, 2 ));
1151 : }
1152 4 : else if( releaseCache )
1153 : {
1154 0 : LBCHECK( _instanceCache->release( id, 1 ));
1155 : }
1156 : }
1157 : else
1158 : {
1159 0 : if( releaseCache )
1160 0 : _instanceCache->release( id, 1 );
1161 :
1162 0 : LBWARN << "Could not sync object " << id << " request " << requestID
1163 0 : << std::endl;
1164 : }
1165 :
1166 4 : _localNode->serveRequest( requestID, result );
1167 4 : return true;
1168 : }
1169 :
1170 54 : bool ObjectStore::_cmdInstance( ICommand& inCommand )
1171 : {
1172 54 : LB_TS_THREAD( _receiverThread );
1173 54 : LBASSERT( _localNode );
1174 :
1175 54 : ObjectDataICommand command( inCommand );
1176 54 : const NodeID& nodeID = command.get< NodeID >();
1177 54 : const uint32_t masterInstanceID = command.get< uint32_t >();
1178 54 : const uint32_t cmd = command.getCommand();
1179 :
1180 54 : LBLOG( LOG_OBJECTS ) << "Cmd instance " << command << " master "
1181 54 : << masterInstanceID << " node " << nodeID << std::endl;
1182 :
1183 54 : command.setType( COMMANDTYPE_OBJECT );
1184 54 : command.setCommand( CMD_OBJECT_INSTANCE );
1185 :
1186 54 : const uint128_t& version = command.getVersion();
1187 54 : if( _instanceCache && version.high() == 0 )
1188 : {
1189 54 : const ObjectVersion rev( command.getObjectID(), version );
1190 : #ifndef CO_AGGRESSIVE_CACHING // Issue Equalizer#82:
1191 54 : if( cmd != CMD_NODE_OBJECT_INSTANCE_PUSH )
1192 : #endif
1193 48 : _instanceCache->add( rev, masterInstanceID, command, 0 );
1194 : }
1195 :
1196 54 : switch( cmd )
1197 : {
1198 : case CMD_NODE_OBJECT_INSTANCE:
1199 0 : LBASSERT( nodeID == 0 );
1200 0 : LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
1201 0 : return true;
1202 :
1203 : case CMD_NODE_OBJECT_INSTANCE_MAP:
1204 37 : if( nodeID != _localNode->getNodeID( )) // not for me
1205 0 : return true;
1206 :
1207 37 : LBASSERT( command.getInstanceID() <= CO_INSTANCE_MAX );
1208 37 : return dispatchObjectCommand( command );
1209 :
1210 : case CMD_NODE_OBJECT_INSTANCE_COMMIT:
1211 3 : LBASSERT( nodeID == 0 );
1212 3 : LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
1213 3 : return dispatchObjectCommand( command );
1214 :
1215 : case CMD_NODE_OBJECT_INSTANCE_PUSH:
1216 6 : LBASSERT( nodeID == 0 );
1217 6 : LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
1218 6 : _pushData.addDataCommand( command.getObjectID(), command );
1219 6 : return true;
1220 :
1221 : case CMD_NODE_OBJECT_INSTANCE_SYNC:
1222 : {
1223 8 : if( nodeID != _localNode->getNodeID( )) // not for me
1224 0 : return true;
1225 :
1226 8 : void* data = _localNode->getRequestData( command.getInstanceID( ));
1227 8 : LBASSERT( command.getInstanceID() != CO_INSTANCE_NONE );
1228 8 : LBASSERTINFO( data, this );
1229 :
1230 8 : ObjectDataIStream* is = LBSAFECAST( ObjectDataIStream*, data );
1231 8 : is->addDataCommand( command );
1232 8 : return true;
1233 : }
1234 :
1235 : default:
1236 0 : LBUNREACHABLE;
1237 0 : return false;
1238 54 : }
1239 : }
1240 :
1241 0 : bool ObjectStore::_cmdDisableSendOnRegister( ICommand& command )
1242 : {
1243 0 : LB_TS_THREAD( _commandThread );
1244 0 : LBASSERTINFO( _sendOnRegister > 0, _sendOnRegister );
1245 :
1246 0 : if( --_sendOnRegister == 0 )
1247 : {
1248 0 : _sendQueue.clear();
1249 :
1250 0 : Nodes nodes;
1251 0 : _localNode->getNodes( nodes, false );
1252 0 : for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
1253 : {
1254 0 : NodePtr node = *i;
1255 0 : ConnectionPtr multicast = node->getConnection( true );
1256 0 : ConnectionPtr connection = node->getConnection( false );
1257 0 : if( multicast )
1258 0 : multicast->finish();
1259 0 : if( connection && connection != multicast )
1260 0 : connection->finish();
1261 0 : }
1262 : }
1263 :
1264 0 : const uint32_t requestID = command.get< uint32_t >();
1265 0 : _localNode->serveRequest( requestID );
1266 0 : return true;
1267 : }
1268 :
1269 40 : bool ObjectStore::_cmdRemoveNode( ICommand& command )
1270 : {
1271 40 : LB_TS_THREAD( _commandThread );
1272 40 : LBLOG( LOG_OBJECTS ) << "Cmd object " << command << std::endl;
1273 :
1274 40 : Node* node = command.get< Node* >();
1275 40 : const uint32_t requestID = command.get< uint32_t >();
1276 :
1277 40 : lunchbox::ScopedFastWrite mutex( _objects );
1278 65 : for( ObjectsHashCIter i = _objects->begin(); i != _objects->end(); ++i )
1279 : {
1280 25 : const Objects& objects = i->second;
1281 50 : for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
1282 25 : (*j)->removeSlaves( node );
1283 : }
1284 :
1285 40 : if( requestID != LB_UNDEFINED_UINT32 )
1286 7 : _localNode->serveRequest( requestID );
1287 : else
1288 33 : node->unref(); // node was ref'd before LocalNode::_handleDisconnect()
1289 :
1290 40 : return true;
1291 : }
1292 :
1293 4 : bool ObjectStore::_cmdPush( ICommand& command )
1294 : {
1295 4 : LB_TS_THREAD( _commandThread );
1296 :
1297 4 : const uint128_t& objectID = command.get< uint128_t >();
1298 4 : const uint128_t& groupID = command.get< uint128_t >();
1299 4 : const uint128_t& typeID = command.get< uint128_t >();
1300 :
1301 4 : ObjectDataIStream* is = _pushData.pull( objectID );
1302 :
1303 4 : _localNode->objectPush( groupID, typeID, objectID, *is );
1304 4 : _pushData.recycle( is );
1305 4 : return true;
1306 : }
1307 :
1308 0 : std::ostream& operator << ( std::ostream& os, ObjectStore* objectStore )
1309 : {
1310 0 : if( !objectStore )
1311 : {
1312 0 : os << "NULL objectStore";
1313 0 : return os;
1314 : }
1315 :
1316 0 : os << "objectStore (" << (void*)objectStore << ")";
1317 :
1318 0 : return os;
1319 : }
1320 60 : }
|