Line data Source code
1 :
2 : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2010, Cedric Stalder <cedric.stalder@gmail.com>
4 : * 2011-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "objectStore.h"
23 :
24 : #include "connection.h"
25 : #include "connectionDescription.h"
26 : #include "global.h"
27 : #include "instanceCache.h"
28 : #include "log.h"
29 : #include "masterCMCommand.h"
30 : #include "nodeCommand.h"
31 : #include "oCommand.h"
32 : #include "objectCM.h"
33 : #include "objectCommand.h"
34 : #include "objectDataICommand.h"
35 : #include "objectDataIStream.h"
36 :
37 : #include <lunchbox/futureFunction.h>
38 : #include <lunchbox/scopedMutex.h>
39 :
40 : #include <boost/bind.hpp>
41 :
42 : #include <limits>
43 :
44 : //#define DEBUG_DISPATCH
45 : #ifdef DEBUG_DISPATCH
46 : # include <set>
47 : #endif
48 :
49 : namespace co
50 : {
51 : typedef CommandFunc< ObjectStore > CmdFunc;
52 : typedef lunchbox::FutureFunction< bool > FuturebImpl;
53 :
54 51 : ObjectStore::ObjectStore( LocalNode* localNode, a_ssize_t* counters )
55 : : _localNode( localNode )
56 : , _instanceIDs( -0x7FFFFFFF )
57 : , _instanceCache( new InstanceCache( Global::getIAttribute(
58 102 : Global::IATTR_INSTANCE_CACHE_SIZE ) * LB_1MB ) )
59 153 : , _counters( counters )
60 : {
61 51 : LBASSERT( localNode );
62 51 : CommandQueue* queue = localNode->getCommandThreadQueue();
63 :
64 : localNode->_registerCommand( CMD_NODE_FIND_MASTER_NODE_ID,
65 51 : CmdFunc( this, &ObjectStore::_cmdFindMasterNodeID ), queue );
66 : localNode->_registerCommand( CMD_NODE_FIND_MASTER_NODE_ID_REPLY,
67 51 : CmdFunc( this, &ObjectStore::_cmdFindMasterNodeIDReply ), 0 );
68 : localNode->_registerCommand( CMD_NODE_ATTACH_OBJECT,
69 51 : CmdFunc( this, &ObjectStore::_cmdAttach ), 0 );
70 : localNode->_registerCommand( CMD_NODE_DETACH_OBJECT,
71 51 : CmdFunc( this, &ObjectStore::_cmdDetach ), 0 );
72 : localNode->_registerCommand( CMD_NODE_REGISTER_OBJECT,
73 51 : CmdFunc( this, &ObjectStore::_cmdRegister ), queue );
74 : localNode->_registerCommand( CMD_NODE_DEREGISTER_OBJECT,
75 51 : CmdFunc( this, &ObjectStore::_cmdDeregister ), queue );
76 : localNode->_registerCommand( CMD_NODE_MAP_OBJECT,
77 51 : CmdFunc( this, &ObjectStore::_cmdMap ), queue );
78 : localNode->_registerCommand( CMD_NODE_MAP_OBJECT_SUCCESS,
79 51 : CmdFunc( this, &ObjectStore::_cmdMapSuccess ), 0 );
80 : localNode->_registerCommand( CMD_NODE_MAP_OBJECT_REPLY,
81 51 : CmdFunc( this, &ObjectStore::_cmdMapReply ), 0 );
82 : localNode->_registerCommand( CMD_NODE_UNMAP_OBJECT,
83 51 : CmdFunc( this, &ObjectStore::_cmdUnmap ), 0 );
84 : localNode->_registerCommand( CMD_NODE_UNSUBSCRIBE_OBJECT,
85 51 : CmdFunc( this, &ObjectStore::_cmdUnsubscribe ), queue );
86 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE,
87 51 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
88 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_MAP,
89 51 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
90 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_COMMIT,
91 51 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
92 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_PUSH,
93 51 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
94 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_SYNC,
95 51 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
96 : localNode->_registerCommand( CMD_NODE_DISABLE_SEND_ON_REGISTER,
97 51 : CmdFunc( this, &ObjectStore::_cmdDisableSendOnRegister ), queue );
98 : localNode->_registerCommand( CMD_NODE_REMOVE_NODE,
99 51 : CmdFunc( this, &ObjectStore::_cmdRemoveNode ), queue );
100 : localNode->_registerCommand( CMD_NODE_OBJECT_PUSH,
101 51 : CmdFunc( this, &ObjectStore::_cmdPush ), queue );
102 : localNode->_registerCommand( CMD_NODE_SYNC_OBJECT,
103 51 : CmdFunc( this, &ObjectStore::_cmdSync ), queue );
104 : localNode->_registerCommand( CMD_NODE_SYNC_OBJECT_REPLY,
105 51 : CmdFunc( this, &ObjectStore::_cmdSyncReply ), 0 );
106 51 : }
107 :
108 147 : ObjectStore::~ObjectStore()
109 : {
110 49 : LBVERB << "Delete ObjectStore @" << (void*)this << std::endl;
111 :
112 : #ifndef NDEBUG
113 49 : if( !_objects->empty( ))
114 : {
115 0 : LBWARN << _objects->size() << " attached objects in destructor"
116 0 : << std::endl;
117 :
118 0 : for( ObjectsHash::const_iterator i = _objects->begin();
119 0 : i != _objects->end(); ++i )
120 : {
121 0 : const Objects& objects = i->second;
122 0 : LBWARN << " " << objects.size() << " objects with id "
123 0 : << i->first << std::endl;
124 :
125 0 : for( Objects::const_iterator j = objects.begin();
126 0 : j != objects.end(); ++j )
127 : {
128 0 : const Object* object = *j;
129 0 : LBINFO << " object type " << lunchbox::className( object )
130 0 : << std::endl;
131 : }
132 : }
133 : }
134 : //LBASSERT( _objects->empty( ))
135 : #endif
136 49 : clear();
137 49 : delete _instanceCache;
138 49 : _instanceCache = 0;
139 98 : }
140 :
141 94 : void ObjectStore::clear( )
142 : {
143 94 : LBASSERT( _objects->empty( ));
144 94 : expireInstanceData( 0 );
145 94 : LBASSERT( !_instanceCache || _instanceCache->isEmpty( ));
146 :
147 94 : _objects->clear();
148 94 : _sendQueue.clear();
149 94 : }
150 :
151 0 : void ObjectStore::disableInstanceCache()
152 : {
153 0 : LBASSERT( _localNode->isClosed( ));
154 0 : delete _instanceCache;
155 0 : _instanceCache = 0;
156 0 : }
157 :
158 94 : void ObjectStore::expireInstanceData( const int64_t age )
159 : {
160 94 : if( _instanceCache )
161 94 : _instanceCache->expire( age );
162 94 : }
163 :
164 110 : void ObjectStore::removeInstanceData( const NodeID& nodeID )
165 : {
166 110 : if( _instanceCache )
167 110 : _instanceCache->remove( nodeID );
168 110 : }
169 :
170 0 : void ObjectStore::enableSendOnRegister()
171 : {
172 0 : ++_sendOnRegister;
173 0 : }
174 :
175 0 : void ObjectStore::disableSendOnRegister()
176 : {
177 0 : if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0 )
178 : {
179 : lunchbox::Request< void > request =
180 0 : _localNode->registerRequest< void >();
181 0 : _localNode->send( CMD_NODE_DISABLE_SEND_ON_REGISTER ) << request;
182 : }
183 : else // OPT
184 0 : --_sendOnRegister;
185 0 : }
186 :
187 : //---------------------------------------------------------------------------
188 : // identifier master node mapping
189 : //---------------------------------------------------------------------------
190 39 : NodeID ObjectStore::findMasterNodeID( const uint128_t& identifier )
191 : {
192 39 : LB_TS_NOT_THREAD( _commandThread );
193 :
194 : // OPT: look up locally first?
195 39 : Nodes nodes;
196 39 : _localNode->getNodes( nodes );
197 :
198 : // OPT: send to multiple nodes at once?
199 39 : for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
200 : {
201 39 : NodePtr node = *i;
202 : lunchbox::Request< NodeID > request =
203 39 : _localNode->registerRequest< NodeID >();
204 :
205 78 : LBLOG( LOG_OBJECTS ) << "Finding " << identifier << " on " << node
206 117 : << " req " << request.getID() << std::endl;
207 39 : node->send( CMD_NODE_FIND_MASTER_NODE_ID ) << identifier << request;
208 :
209 : try
210 : {
211 39 : const NodeID& masterNodeID = request.wait( Global::getTimeout( ));
212 :
213 39 : if( masterNodeID != 0 )
214 : {
215 39 : LBLOG( LOG_OBJECTS ) << "Found " << identifier << " on "
216 39 : << masterNodeID << std::endl;
217 39 : return masterNodeID;
218 : }
219 : }
220 0 : catch( const lunchbox::FutureTimeout& )
221 : {
222 0 : _localNode->unregisterRequest( request.getID( ));
223 0 : throw;
224 : }
225 0 : }
226 :
227 0 : return NodeID();
228 : }
229 :
230 : //---------------------------------------------------------------------------
231 : // object mapping
232 : //---------------------------------------------------------------------------
233 20 : void ObjectStore::attach( Object* object, const uint128_t& id,
234 : const uint32_t instanceID )
235 : {
236 20 : LBASSERT( object );
237 20 : LB_TS_NOT_THREAD( _receiverThread );
238 :
239 : lunchbox::Request< void > request =
240 20 : _localNode->registerRequest< void >( object );
241 20 : _localNode->send( CMD_NODE_ATTACH_OBJECT ) << id << instanceID << request;
242 20 : }
243 :
244 : namespace
245 : {
246 61 : uint32_t _genNextID( lunchbox::a_int32_t& val )
247 : {
248 : uint32_t result;
249 61 : do
250 : {
251 61 : const long id = ++val;
252 : result = static_cast< uint32_t >(
253 61 : static_cast< int64_t >( id ) + 0x7FFFFFFFu );
254 : }
255 : while( result > CO_INSTANCE_MAX );
256 :
257 61 : return result;
258 : }
259 : }
260 :
261 61 : void ObjectStore::_attach( Object* object, const uint128_t& id,
262 : const uint32_t inInstanceID )
263 : {
264 61 : LBASSERT( object );
265 61 : LB_TS_THREAD( _receiverThread );
266 :
267 61 : uint32_t instanceID = inInstanceID;
268 61 : if( inInstanceID == CO_INSTANCE_INVALID )
269 20 : instanceID = _genNextID( _instanceIDs );
270 :
271 61 : object->attach( id, instanceID );
272 :
273 : {
274 61 : lunchbox::ScopedFastWrite mutex( _objects );
275 61 : Objects& objects = _objects.data[ id ];
276 61 : LBASSERTINFO( !object->isMaster() || objects.empty(),
277 : "Attaching master " << *object << ", " <<
278 : objects.size() << " attached objects with same ID, " <<
279 : "first is " << ( objects[0]->isMaster() ? "master " :
280 : "slave " ) << *objects[0] );
281 61 : objects.push_back( object );
282 : }
283 :
284 61 : _localNode->flushCommands(); // redispatch pending commands
285 :
286 61 : LBLOG( LOG_OBJECTS ) << "attached " << *object << " @"
287 61 : << static_cast< void* >( object ) << std::endl;
288 61 : }
289 :
290 28 : void ObjectStore::detach( Object* object )
291 : {
292 28 : LBASSERT( object );
293 28 : LB_TS_NOT_THREAD( _receiverThread );
294 :
295 28 : lunchbox::Request< void > request = _localNode->registerRequest< void >();
296 : _localNode->send( CMD_NODE_DETACH_OBJECT )
297 28 : << object->getID() << object->getInstanceID() << request;
298 28 : }
299 :
300 0 : void ObjectStore::swap( Object* oldObject, Object* newObject )
301 : {
302 0 : LBASSERT( newObject );
303 0 : LBASSERT( oldObject );
304 0 : LBASSERT( oldObject->isMaster() );
305 0 : LB_TS_THREAD( _receiverThread );
306 :
307 0 : if( !oldObject->isAttached() )
308 0 : return;
309 :
310 0 : LBLOG( LOG_OBJECTS ) << "Swap " << lunchbox::className( oldObject )
311 0 : << std::endl;
312 0 : const uint128_t& id = oldObject->getID();
313 :
314 0 : lunchbox::ScopedFastWrite mutex( _objects );
315 0 : ObjectsHash::iterator i = _objects->find( id );
316 0 : LBASSERT( i != _objects->end( ));
317 0 : if( i == _objects->end( ))
318 0 : return;
319 :
320 0 : Objects& objects = i->second;
321 0 : Objects::iterator j = find( objects.begin(), objects.end(), oldObject );
322 0 : LBASSERT( j != objects.end( ));
323 0 : if( j == objects.end( ))
324 0 : return;
325 :
326 0 : newObject->transfer( oldObject );
327 0 : *j = newObject;
328 : }
329 :
330 59 : void ObjectStore::_detach( Object* object )
331 : {
332 : // check also _cmdUnmapObject when modifying!
333 59 : LBASSERT( object );
334 59 : LB_TS_THREAD( _receiverThread );
335 :
336 59 : if( !object->isAttached() )
337 0 : return;
338 :
339 59 : const uint128_t& id = object->getID();
340 :
341 59 : LBASSERT( _objects->find( id ) != _objects->end( ));
342 59 : LBLOG( LOG_OBJECTS ) << "Detach " << *object << std::endl;
343 :
344 59 : Objects& objects = _objects.data[ id ];
345 59 : Objects::iterator i = find( objects.begin(),objects.end(), object );
346 59 : LBASSERT( i != objects.end( ));
347 :
348 : {
349 59 : lunchbox::ScopedFastWrite mutex( _objects );
350 59 : objects.erase( i );
351 59 : if( objects.empty( ))
352 58 : _objects->erase( id );
353 : }
354 :
355 59 : LBASSERT( object->getInstanceID() != CO_INSTANCE_INVALID );
356 59 : object->detach();
357 59 : return;
358 : }
359 :
360 41 : uint32_t ObjectStore::mapNB( Object* object, const uint128_t& id,
361 : const uint128_t& version, NodePtr master )
362 : {
363 41 : LB_TS_NOT_THREAD( _receiverThread );
364 41 : LBLOG( LOG_OBJECTS )
365 41 : << "Mapping " << lunchbox::className( object ) << " to id " << id
366 123 : << " version " << version << std::endl;
367 41 : LBASSERT( object );
368 41 : LBASSERTINFO( id.isUUID(), id );
369 :
370 41 : if( !master )
371 39 : master = _localNode->connectObjectMaster( id );
372 :
373 41 : if( !master || !master->isReachable( ))
374 : {
375 0 : LBWARN << "Mapping of object " << id << " failed, invalid master node"
376 0 : << std::endl;
377 0 : return LB_UNDEFINED_UINT32;
378 : }
379 :
380 41 : if( !object || !id.isUUID( ))
381 : {
382 0 : LBWARN << "Invalid object " << object << " or id " << id << std::endl;
383 0 : return LB_UNDEFINED_UINT32;
384 : }
385 :
386 41 : const bool isAttached = object->isAttached();
387 41 : const bool isMaster = object->isMaster();
388 41 : LBASSERT( !isAttached );
389 41 : LBASSERT( !isMaster ) ;
390 41 : if( isAttached || isMaster )
391 : {
392 0 : LBWARN << "Invalid object state: attached " << isAttached << " master "
393 0 : << isMaster << std::endl;
394 0 : return LB_UNDEFINED_UINT32;
395 : }
396 :
397 41 : const uint32_t request = _localNode->registerRequest( object );
398 41 : uint128_t minCachedVersion = VERSION_HEAD;
399 41 : uint128_t maxCachedVersion = VERSION_NONE;
400 41 : uint32_t masterInstanceID = 0;
401 : const bool useCache = _checkInstanceCache( id, minCachedVersion,
402 : maxCachedVersion,
403 41 : masterInstanceID );
404 41 : object->notifyAttach();
405 : master->send( CMD_NODE_MAP_OBJECT )
406 82 : << version << minCachedVersion << maxCachedVersion << id
407 123 : << object->getMaxVersions() << request << _genNextID( _instanceIDs )
408 41 : << masterInstanceID << useCache;
409 41 : return request;
410 : }
411 :
412 45 : bool ObjectStore::_checkInstanceCache( const uint128_t& id, uint128_t& from,
413 : uint128_t& to, uint32_t& instanceID )
414 : {
415 45 : if( !_instanceCache )
416 0 : return false;
417 :
418 45 : const InstanceCache::Data& cached = (*_instanceCache)[ id ];
419 45 : if( cached == InstanceCache::Data::NONE )
420 45 : return false;
421 :
422 0 : const ObjectDataIStreamDeque& versions = cached.versions;
423 0 : LBASSERT( !cached.versions.empty( ));
424 0 : instanceID = cached.masterInstanceID;
425 0 : from = versions.front()->getVersion();
426 0 : to = versions.back()->getVersion();
427 0 : LBLOG( LOG_OBJECTS ) << "Object " << id << " have v" << from << ".." << to
428 0 : << std::endl;
429 0 : return true;
430 : }
431 :
432 41 : bool ObjectStore::mapSync( const uint32_t requestID )
433 : {
434 41 : if( requestID == LB_UNDEFINED_UINT32 )
435 0 : return false;
436 :
437 41 : void* data = _localNode->getRequestData( requestID );
438 41 : if( data == 0 )
439 0 : return false;
440 :
441 41 : Object* object = LBSAFECAST( Object*, data );
442 41 : uint128_t version = VERSION_NONE;
443 41 : _localNode->waitRequest( requestID, version );
444 :
445 41 : const bool mapped = object->isAttached();
446 41 : if( mapped )
447 41 : object->applyMapData( version ); // apply initial instance data
448 :
449 41 : object->notifyAttached();
450 41 : LBLOG( LOG_OBJECTS )
451 82 : << "Mapped " << lunchbox::className( object ) << std::endl;
452 41 : return mapped;
453 : }
454 :
455 :
456 4 : f_bool_t ObjectStore::sync( Object* object, NodePtr master, const uint128_t& id,
457 : const uint32_t instanceID )
458 : {
459 4 : const uint32_t request = _startSync( object, master, id, instanceID );
460 : const FuturebImpl::Func& func = boost::bind( &ObjectStore::_finishSync,
461 4 : this, request, object );
462 4 : return f_bool_t( new FuturebImpl( func ));
463 : }
464 :
465 4 : uint32_t ObjectStore::_startSync( Object* object, NodePtr master,
466 : const uint128_t& id,
467 : const uint32_t instanceID )
468 : {
469 4 : LB_TS_NOT_THREAD( _receiverThread );
470 4 : LBLOG( LOG_OBJECTS )
471 4 : << "Syncing " << lunchbox::className( object ) << " with id " << id
472 12 : << std::endl;
473 4 : LBASSERT( object );
474 4 : LBASSERTINFO( id.isUUID(), id );
475 :
476 4 : if( !object || !id.isUUID( ))
477 : {
478 0 : LBWARN << "Invalid object " << object << " or id " << id << std::endl;
479 0 : return LB_UNDEFINED_UINT32;
480 : }
481 :
482 4 : if( !master )
483 0 : master = _localNode->connectObjectMaster( id );
484 :
485 4 : if( !master || !master->isReachable( ))
486 : {
487 0 : LBWARN << "Mapping of object " << id << " failed, invalid master node"
488 0 : << std::endl;
489 0 : return LB_UNDEFINED_UINT32;
490 : }
491 :
492 : const uint32_t request =
493 4 : _localNode->registerRequest( new ObjectDataIStream );
494 4 : uint128_t minCachedVersion = VERSION_HEAD;
495 4 : uint128_t maxCachedVersion = VERSION_NONE;
496 4 : uint32_t cacheInstanceID = 0;
497 :
498 : bool useCache = _checkInstanceCache( id, minCachedVersion, maxCachedVersion,
499 4 : cacheInstanceID );
500 4 : if( useCache )
501 : {
502 0 : switch( instanceID )
503 : {
504 : case CO_INSTANCE_ALL:
505 0 : break;
506 : default:
507 0 : if( instanceID == cacheInstanceID )
508 0 : break;
509 :
510 0 : useCache = false;
511 0 : LBCHECK( _instanceCache->release( id, 1 ));
512 0 : break;
513 : }
514 : }
515 :
516 : // Use stream expected by MasterCMCommand
517 : master->send( CMD_NODE_SYNC_OBJECT )
518 8 : << VERSION_NEWEST << minCachedVersion << maxCachedVersion << id
519 12 : << uint64_t(0) /* maxVersions */ << request << instanceID
520 4 : << cacheInstanceID << useCache;
521 4 : return request;
522 : }
523 :
524 4 : bool ObjectStore::_finishSync( const uint32_t requestID, Object* object )
525 : {
526 4 : if( requestID == LB_UNDEFINED_UINT32 )
527 0 : return false;
528 :
529 4 : void* data = _localNode->getRequestData( requestID );
530 4 : if( data == 0 )
531 0 : return false;
532 :
533 4 : ObjectDataIStream* is = LBSAFECAST( ObjectDataIStream*, data );
534 :
535 4 : bool ok = false;
536 4 : _localNode->waitRequest( requestID, ok );
537 :
538 4 : if( !ok )
539 : {
540 0 : LBWARN << "Object synchronization failed" << std::endl;
541 0 : delete is;
542 0 : return false;
543 : }
544 :
545 4 : is->waitReady();
546 4 : object->applyInstanceData( *is );
547 8 : LBLOG( LOG_OBJECTS ) << "Synced " << lunchbox::className( object )
548 12 : << std::endl;
549 4 : delete is;
550 4 : return true;
551 : }
552 :
553 41 : void ObjectStore::unmap( Object* object )
554 : {
555 41 : LBASSERT( object );
556 41 : if( !object->isAttached( )) // not registered
557 34 : return;
558 :
559 40 : const uint128_t& id = object->getID();
560 :
561 40 : LBLOG( LOG_OBJECTS ) << "Unmap " << object << std::endl;
562 :
563 40 : object->notifyDetach();
564 :
565 : // send unsubscribe to master, master will send detach command.
566 40 : LBASSERT( !object->isMaster( ));
567 40 : LB_TS_NOT_THREAD( _commandThread );
568 :
569 40 : const uint32_t masterInstanceID = object->getMasterInstanceID();
570 40 : if( masterInstanceID != CO_INSTANCE_INVALID )
571 : {
572 32 : NodePtr master = object->getMasterNode();
573 32 : LBASSERT( master )
574 :
575 32 : if( master && master->isReachable( ))
576 : {
577 : lunchbox::Request< void > request =
578 32 : _localNode->registerRequest< void >();
579 : master->send( CMD_NODE_UNSUBSCRIBE_OBJECT )
580 32 : << id << request << masterInstanceID << object->getInstanceID();
581 32 : request.wait();
582 32 : object->notifyDetached();
583 32 : return;
584 : }
585 0 : LBERROR << "Master node for object id " << id << " not connected"
586 0 : << std::endl;
587 : }
588 :
589 : // no unsubscribe sent: Detach directly
590 8 : detach( object );
591 8 : object->setupChangeManager( Object::NONE, false, 0, CO_INSTANCE_INVALID );
592 8 : object->notifyDetached();
593 : }
594 :
595 20 : bool ObjectStore::register_( Object* object )
596 : {
597 20 : LBASSERT( object );
598 20 : LBASSERT( !object->isAttached( ));
599 :
600 20 : const uint128_t& id = object->getID( );
601 20 : LBASSERTINFO( id.isUUID(), id );
602 :
603 20 : object->notifyAttach();
604 20 : object->setupChangeManager( object->getChangeType(), true, _localNode,
605 40 : CO_INSTANCE_INVALID );
606 20 : attach( object, id, CO_INSTANCE_INVALID );
607 :
608 20 : if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0 )
609 20 : _localNode->send( CMD_NODE_REGISTER_OBJECT ) << object;
610 :
611 20 : object->notifyAttached();
612 :
613 20 : LBLOG( LOG_OBJECTS ) << "Registered " << object << std::endl;
614 20 : return true;
615 : }
616 :
617 20 : void ObjectStore::deregister( Object* object )
618 : {
619 20 : LBASSERT( object );
620 20 : if( !object->isAttached( )) // not registered
621 20 : return;
622 :
623 20 : LBLOG( LOG_OBJECTS ) << "Deregister " << *object << std::endl;
624 20 : LBASSERT( object->isMaster( ));
625 :
626 20 : object->notifyDetach();
627 :
628 20 : if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0 )
629 : {
630 : // remove from send queue
631 : lunchbox::Request< void > request =
632 20 : _localNode->registerRequest< void >();
633 20 : _localNode->send( CMD_NODE_DEREGISTER_OBJECT ) << request;
634 : }
635 :
636 20 : const uint128_t id = object->getID();
637 20 : detach( object );
638 20 : object->setupChangeManager( Object::NONE, true, 0, CO_INSTANCE_INVALID );
639 20 : if( _instanceCache )
640 20 : _instanceCache->erase( id );
641 20 : object->notifyDetached();
642 : }
643 :
644 51172 : bool ObjectStore::notifyCommandThreadIdle()
645 : {
646 51172 : LB_TS_THREAD( _commandThread );
647 51173 : if( _sendQueue.empty( ))
648 51172 : return false;
649 :
650 0 : LBASSERT( _sendOnRegister > 0 );
651 0 : SendQueueItem& item = _sendQueue.front();
652 :
653 0 : if( item.age > _localNode->getTime64( ))
654 : {
655 0 : Nodes nodes;
656 0 : _localNode->getNodes( nodes, false );
657 0 : if( nodes.empty( ))
658 : {
659 0 : lunchbox::Thread::yield();
660 0 : return !_sendQueue.empty();
661 : }
662 :
663 0 : item.object->sendInstanceData( nodes );
664 : }
665 0 : _sendQueue.pop_front();
666 0 : return !_sendQueue.empty();
667 : }
668 :
669 7 : void ObjectStore::removeNode( NodePtr node )
670 : {
671 7 : lunchbox::Request< void > request = _localNode->registerRequest< void >();
672 7 : _localNode->send( CMD_NODE_REMOVE_NODE ) << node.get() << request;
673 7 : }
674 :
675 : //===========================================================================
676 : // ICommand handling
677 : //===========================================================================
678 548 : bool ObjectStore::dispatchObjectCommand( ICommand& cmd )
679 : {
680 548 : LB_TS_THREAD( _receiverThread );
681 548 : ObjectICommand command( cmd );
682 548 : const uint128_t& id = command.getObjectID();
683 548 : const uint32_t instanceID = command.getInstanceID();
684 :
685 548 : ObjectsHash::const_iterator i = _objects->find( id );
686 :
687 548 : if( i == _objects->end( ))
688 : // When the instance ID is set to none, we only care about the command
689 : // when we have an object of the given ID (multicast)
690 0 : return ( instanceID == CO_INSTANCE_NONE );
691 :
692 548 : const Objects& objects = i->second;
693 548 : LBASSERTINFO( !objects.empty(), command );
694 :
695 548 : if( instanceID <= CO_INSTANCE_MAX )
696 : {
697 59 : for( Objects::const_iterator j = objects.begin(); j!=objects.end(); ++j)
698 : {
699 59 : Object* object = *j;
700 59 : if( instanceID == object->getInstanceID( ))
701 : {
702 53 : LBCHECK( object->dispatchCommand( command ));
703 53 : return true;
704 : }
705 : }
706 0 : LBERROR << "Can't find object instance " << instanceID << " for "
707 0 : << command << std::endl;
708 0 : LBUNREACHABLE;
709 0 : return false;
710 : }
711 :
712 495 : Objects::const_iterator j = objects.begin();
713 495 : Object* object = *j;
714 495 : LBCHECK( object->dispatchCommand( command ));
715 :
716 495 : for( ++j; j != objects.end(); ++j )
717 : {
718 0 : object = *j;
719 0 : LBCHECK( object->dispatchCommand( command ));
720 : }
721 495 : return true;
722 : }
723 :
724 39 : bool ObjectStore::_cmdFindMasterNodeID( ICommand& command )
725 : {
726 39 : LB_TS_THREAD( _commandThread );
727 :
728 39 : const uint128_t& id = command.get< uint128_t >();
729 39 : const uint32_t requestID = command.get< uint32_t >();
730 39 : LBASSERT( id.isUUID( ));
731 :
732 39 : NodeID masterNodeID;
733 : {
734 39 : lunchbox::ScopedFastRead mutex( _objects );
735 39 : ObjectsHashCIter i = _objects->find( id );
736 :
737 39 : if( i != _objects->end( ))
738 : {
739 39 : const Objects& objects = i->second;
740 39 : LBASSERT( !objects.empty( ));
741 :
742 39 : for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
743 : {
744 39 : Object* object = *j;
745 39 : if( object->isMaster( ))
746 39 : masterNodeID = _localNode->getNodeID();
747 : else
748 : {
749 0 : NodePtr master = object->getMasterNode();
750 0 : if( master.isValid( ))
751 0 : masterNodeID = master->getNodeID();
752 : }
753 39 : if( masterNodeID != 0 )
754 39 : break;
755 : }
756 39 : }
757 : }
758 :
759 39 : LBLOG( LOG_OBJECTS ) << "Object " << id << " master " << masterNodeID
760 39 : << " req " << requestID << std::endl;
761 : command.getNode()->send( CMD_NODE_FIND_MASTER_NODE_ID_REPLY )
762 39 : << masterNodeID << requestID;
763 39 : return true;
764 : }
765 :
766 39 : bool ObjectStore::_cmdFindMasterNodeIDReply( ICommand& command )
767 : {
768 39 : const NodeID& masterNodeID = command.get< NodeID >();
769 39 : const uint32_t requestID = command.get< uint32_t >();
770 39 : _localNode->serveRequest( requestID, masterNodeID );
771 39 : return true;
772 : }
773 :
774 20 : bool ObjectStore::_cmdAttach( ICommand& command )
775 : {
776 20 : LB_TS_THREAD( _receiverThread );
777 20 : LBLOG( LOG_OBJECTS ) << "Cmd attach object " << command << std::endl;
778 :
779 20 : const uint128_t& objectID = command.get< uint128_t >();
780 20 : const uint32_t instanceID = command.get< uint32_t >();
781 20 : const uint32_t requestID = command.get< uint32_t >();
782 :
783 : Object* object = static_cast< Object* >( _localNode->getRequestData(
784 20 : requestID ));
785 20 : _attach( object, objectID, instanceID );
786 20 : _localNode->serveRequest( requestID );
787 20 : return true;
788 : }
789 :
790 60 : bool ObjectStore::_cmdDetach( ICommand& command )
791 : {
792 60 : LB_TS_THREAD( _receiverThread );
793 60 : LBLOG( LOG_OBJECTS ) << "Cmd detach object " << command << std::endl;
794 :
795 60 : const uint128_t& objectID = command.get< uint128_t >();
796 60 : const uint32_t instanceID = command.get< uint32_t >();
797 60 : const uint32_t requestID = command.get< uint32_t >();
798 :
799 60 : ObjectsHash::const_iterator i = _objects->find( objectID );
800 60 : if( i != _objects->end( ))
801 : {
802 59 : const Objects& objects = i->second;
803 :
804 180 : for( Objects::const_iterator j = objects.begin();
805 120 : j != objects.end(); ++j )
806 : {
807 60 : Object* object = *j;
808 60 : if( object->getInstanceID() == instanceID )
809 : {
810 59 : _detach( object );
811 59 : break;
812 : }
813 : }
814 : }
815 :
816 60 : LBASSERT( requestID != LB_UNDEFINED_UINT32 );
817 60 : _localNode->serveRequest( requestID );
818 60 : return true;
819 : }
820 :
821 20 : bool ObjectStore::_cmdRegister( ICommand& command )
822 : {
823 20 : LB_TS_THREAD( _commandThread );
824 20 : if( _sendOnRegister <= 0 )
825 20 : return true;
826 :
827 0 : LBLOG( LOG_OBJECTS ) << "Cmd register object " << command << std::endl;
828 :
829 0 : Object* object = reinterpret_cast< Object* >( command.get< void* >( ));
830 :
831 : const int32_t age = Global::getIAttribute(
832 0 : Global::IATTR_NODE_SEND_QUEUE_AGE );
833 0 : SendQueueItem item;
834 0 : item.age = age ? age + _localNode->getTime64() :
835 0 : std::numeric_limits< int64_t >::max();
836 0 : item.object = object;
837 0 : _sendQueue.push_back( item );
838 :
839 : const uint32_t size = Global::getIAttribute(
840 0 : Global::IATTR_NODE_SEND_QUEUE_SIZE );
841 0 : while( _sendQueue.size() > size )
842 0 : _sendQueue.pop_front();
843 :
844 0 : return true;
845 : }
846 :
847 20 : bool ObjectStore::_cmdDeregister( ICommand& command )
848 : {
849 20 : LB_TS_THREAD( _commandThread );
850 20 : LBLOG( LOG_OBJECTS ) << "Cmd deregister object " << command << std::endl;
851 :
852 20 : const uint32_t requestID = command.get< uint32_t >();
853 :
854 20 : const void* object = _localNode->getRequestData( requestID );
855 :
856 20 : for( SendQueueIter i = _sendQueue.begin(); i != _sendQueue.end(); ++i )
857 : {
858 0 : if( i->object == object )
859 : {
860 0 : _sendQueue.erase( i );
861 0 : break;
862 : }
863 : }
864 :
865 20 : _localNode->serveRequest( requestID );
866 20 : return true;
867 : }
868 :
869 41 : bool ObjectStore::_cmdMap( ICommand& cmd )
870 : {
871 41 : LB_TS_THREAD( _commandThread );
872 :
873 41 : MasterCMCommand command( cmd );
874 41 : const uint128_t& id = command.getObjectID();
875 :
876 41 : LBLOG( LOG_OBJECTS ) << "Cmd map object " << command << " id " << id << "."
877 0 : << command.getInstanceID() << " req "
878 41 : << command.getRequestID() << std::endl;
879 :
880 82 : ObjectCMPtr masterCM;
881 : {
882 41 : lunchbox::ScopedFastRead mutex( _objects );
883 41 : ObjectsHash::const_iterator i = _objects->find( id );
884 41 : if( i != _objects->end( ))
885 : {
886 41 : const Objects& objects = i->second;
887 :
888 41 : for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
889 : {
890 41 : Object* object = *j;
891 41 : if( object->isMaster( ))
892 : {
893 41 : masterCM = object->_getChangeManager();
894 41 : break;
895 : }
896 : }
897 41 : }
898 : }
899 :
900 41 : if( !masterCM || !masterCM->addSlave( command ))
901 : {
902 0 : LBWARN << "Can't find master object to map " << id << std::endl;
903 0 : NodePtr node = command.getNode();
904 : node->send( CMD_NODE_MAP_OBJECT_REPLY )
905 0 : << node->getNodeID() << id << command.getRequestedVersion()
906 0 : << command.getRequestID() << false << command.useCache() << false;
907 : }
908 :
909 41 : ++_counters[ LocalNode::COUNTER_MAP_OBJECT_REMOTE ];
910 82 : return true;
911 : }
912 :
913 41 : bool ObjectStore::_cmdMapSuccess( ICommand& command )
914 : {
915 41 : LB_TS_THREAD( _receiverThread );
916 :
917 41 : const uint128_t& nodeID = command.get< uint128_t >();
918 41 : const uint128_t& objectID = command.get< uint128_t >();
919 41 : const uint32_t requestID = command.get< uint32_t >();
920 41 : const uint32_t instanceID = command.get< uint32_t >();
921 41 : const Object::ChangeType changeType = command.get< Object::ChangeType >();
922 41 : const uint32_t masterInstanceID = command.get< uint32_t >();
923 :
924 : // Map success commands are potentially multicasted (see above)
925 : // verify that we are the intended receiver
926 41 : if( nodeID != _localNode->getNodeID( ))
927 0 : return true;
928 :
929 41 : LBLOG( LOG_OBJECTS ) << "Cmd map object success " << command
930 0 : << " id " << objectID << "." << instanceID
931 41 : << " req " << requestID << std::endl;
932 :
933 : // set up change manager and attach object to dispatch table
934 : Object* object = static_cast< Object* >(
935 41 : _localNode->getRequestData( requestID ));
936 41 : LBASSERT( object );
937 41 : LBASSERT( !object->isMaster( ));
938 :
939 : object->setupChangeManager( Object::ChangeType( changeType ), false,
940 41 : _localNode, masterInstanceID );
941 41 : _attach( object, objectID, instanceID );
942 41 : return true;
943 : }
944 :
945 41 : bool ObjectStore::_cmdMapReply( ICommand& command )
946 : {
947 41 : LB_TS_THREAD( _receiverThread );
948 :
949 : // Map reply commands are potentially multicasted (see above)
950 : // verify that we are the intended receiver
951 41 : if( command.get< uint128_t >() != _localNode->getNodeID( ))
952 0 : return true;
953 :
954 41 : const uint128_t& objectID = command.get< uint128_t >();
955 41 : const uint128_t& version = command.get< uint128_t >();
956 41 : const uint32_t requestID = command.get< uint32_t >();
957 41 : const bool result = command.get< bool >();
958 41 : const bool releaseCache = command.get< bool >();
959 41 : const bool useCache = command.get< bool >();
960 :
961 41 : LBLOG( LOG_OBJECTS ) << "Cmd map object reply " << command << " id "
962 41 : << objectID << " req " << requestID << std::endl;
963 :
964 41 : LBASSERT( _localNode->getRequestData( requestID ));
965 :
966 41 : if( result )
967 : {
968 : Object* object = static_cast<Object*>(
969 41 : _localNode->getRequestData( requestID ));
970 41 : LBASSERT( object );
971 41 : LBASSERT( !object->isMaster( ));
972 :
973 41 : object->setMasterNode( command.getNode( ));
974 :
975 41 : if( useCache )
976 : {
977 0 : LBASSERT( releaseCache );
978 0 : LBASSERT( _instanceCache );
979 :
980 0 : const uint128_t& id = objectID;
981 0 : const InstanceCache::Data& cached = (*_instanceCache)[ id ];
982 0 : LBASSERT( cached != InstanceCache::Data::NONE );
983 0 : LBASSERT( !cached.versions.empty( ));
984 :
985 0 : object->addInstanceDatas( cached.versions, version );
986 0 : LBCHECK( _instanceCache->release( id, 2 ));
987 : }
988 41 : else if( releaseCache )
989 : {
990 0 : LBCHECK( _instanceCache->release( objectID, 1 ));
991 : }
992 : }
993 : else
994 : {
995 0 : if( releaseCache )
996 0 : _instanceCache->release( objectID, 1 );
997 :
998 0 : LBWARN << "Could not map object " << objectID << std::endl;
999 : }
1000 :
1001 41 : _localNode->serveRequest( requestID, version );
1002 41 : return true;
1003 : }
1004 :
1005 32 : bool ObjectStore::_cmdUnsubscribe( ICommand& command )
1006 : {
1007 32 : LB_TS_THREAD( _commandThread );
1008 32 : LBLOG( LOG_OBJECTS ) << "Cmd unsubscribe object " << command << std::endl;
1009 :
1010 32 : const uint128_t& id = command.get< uint128_t >();
1011 32 : const uint32_t requestID = command.get< uint32_t >();
1012 32 : const uint32_t masterInstanceID = command.get< uint32_t >();
1013 32 : const uint32_t slaveInstanceID = command.get< uint32_t >();
1014 :
1015 32 : NodePtr node = command.getNode();
1016 :
1017 : {
1018 32 : lunchbox::ScopedFastWrite mutex( _objects );
1019 32 : ObjectsHash::const_iterator i = _objects->find( id );
1020 32 : if( i != _objects->end( ))
1021 : {
1022 32 : const Objects& objects = i->second;
1023 32 : for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
1024 : {
1025 32 : Object* object = *j;
1026 64 : if( object->isMaster() &&
1027 32 : object->getInstanceID() == masterInstanceID )
1028 : {
1029 32 : object->removeSlave( node, slaveInstanceID );
1030 32 : break;
1031 : }
1032 : }
1033 32 : }
1034 : }
1035 :
1036 32 : node->send( CMD_NODE_DETACH_OBJECT ) << id << slaveInstanceID << requestID;
1037 32 : return true;
1038 : }
1039 :
1040 2 : bool ObjectStore::_cmdUnmap( ICommand& command )
1041 : {
1042 2 : LB_TS_THREAD( _receiverThread );
1043 2 : LBLOG( LOG_OBJECTS ) << "Cmd unmap object " << command << std::endl;
1044 :
1045 2 : const uint128_t& objectID = command.get< uint128_t >();
1046 :
1047 2 : if( _instanceCache )
1048 2 : _instanceCache->erase( objectID );
1049 :
1050 2 : ObjectsHash::iterator i = _objects->find( objectID );
1051 2 : if( i == _objects->end( )) // nothing to do
1052 0 : return true;
1053 :
1054 2 : const Objects objects = i->second;
1055 : {
1056 2 : lunchbox::ScopedFastWrite mutex( _objects );
1057 2 : _objects->erase( i );
1058 : }
1059 :
1060 4 : for( Objects::const_iterator j = objects.begin(); j != objects.end(); ++j )
1061 : {
1062 2 : Object* object = *j;
1063 2 : object->detach();
1064 : }
1065 :
1066 2 : return true;
1067 : }
1068 :
1069 4 : bool ObjectStore::_cmdSync( ICommand& cmd )
1070 : {
1071 4 : LB_TS_THREAD( _commandThread );
1072 4 : MasterCMCommand command( cmd );
1073 4 : const uint128_t& id = command.getObjectID();
1074 4 : LBINFO << command.getNode() << std::endl;
1075 :
1076 4 : LBLOG( LOG_OBJECTS ) << "Cmd sync object id " << id << "."
1077 0 : << command.getInstanceID() << " req "
1078 4 : << command.getRequestID() << std::endl;
1079 :
1080 4 : const uint32_t cacheInstanceID = command.getMasterInstanceID();
1081 8 : ObjectCMPtr cm;
1082 : {
1083 4 : lunchbox::ScopedFastRead mutex( _objects );
1084 4 : ObjectsHash::const_iterator i = _objects->find( id );
1085 4 : if( i != _objects->end( ))
1086 : {
1087 4 : const Objects& objects = i->second;
1088 :
1089 8 : for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
1090 : {
1091 4 : Object* object = *j;
1092 4 : if( command.getInstanceID() == object->getInstanceID( ))
1093 : {
1094 0 : cm = object->_getChangeManager();
1095 0 : LBASSERT( cm );
1096 0 : break;
1097 : }
1098 :
1099 4 : if( command.getInstanceID() != CO_INSTANCE_ALL )
1100 0 : continue;
1101 :
1102 4 : cm = object->_getChangeManager();
1103 4 : LBASSERT( cm );
1104 4 : if( cacheInstanceID == object->getInstanceID( ))
1105 0 : break;
1106 : }
1107 4 : if( !cm )
1108 0 : LBWARN << "Can't find object to sync " << id << "."
1109 0 : << command.getInstanceID() << " in " << objects.size()
1110 0 : << " instances" << std::endl;
1111 : }
1112 4 : if( !cm )
1113 0 : LBWARN << "Can't find object to sync " << id
1114 0 : << ", no object with identifier" << std::endl;
1115 : }
1116 4 : if( !cm || !cm->sendSync( command ))
1117 : {
1118 0 : NodePtr node = command.getNode();
1119 : node->send( CMD_NODE_SYNC_OBJECT_REPLY )
1120 0 : << node->getNodeID() << id << command.getRequestID()
1121 0 : << false << command.useCache() << false;
1122 : }
1123 8 : return true;
1124 : }
1125 :
1126 4 : bool ObjectStore::_cmdSyncReply( ICommand& command )
1127 : {
1128 4 : LB_TS_THREAD( _receiverThread );
1129 :
1130 : // Sync reply commands are potentially multicasted (see above)
1131 : // verify that we are the intended receiver
1132 4 : if( command.get< uint128_t >() != _localNode->getNodeID( ))
1133 0 : return true;
1134 :
1135 4 : const NodeID& id = command.get< NodeID >();
1136 4 : const uint32_t requestID = command.get< uint32_t >();
1137 4 : const bool result = command.get< bool >();
1138 4 : const bool releaseCache = command.get< bool >();
1139 4 : const bool useCache = command.get< bool >();
1140 4 : void* const data = _localNode->getRequestData( requestID );
1141 4 : ObjectDataIStream* const is = LBSAFECAST( ObjectDataIStream*, data );
1142 :
1143 4 : LBLOG( LOG_OBJECTS ) << "Cmd sync object reply " << command << " req "
1144 4 : << requestID << std::endl;
1145 4 : if( result )
1146 : {
1147 4 : if( useCache )
1148 : {
1149 0 : LBASSERT( releaseCache );
1150 0 : LBASSERT( _instanceCache );
1151 :
1152 0 : const InstanceCache::Data& cached = (*_instanceCache)[ id ];
1153 0 : LBASSERT( cached != InstanceCache::Data::NONE );
1154 0 : LBASSERT( !cached.versions.empty( ));
1155 :
1156 0 : *is = *cached.versions.back();
1157 0 : LBCHECK( _instanceCache->release( id, 2 ));
1158 : }
1159 4 : else if( releaseCache )
1160 : {
1161 0 : LBCHECK( _instanceCache->release( id, 1 ));
1162 : }
1163 : }
1164 : else
1165 : {
1166 0 : if( releaseCache )
1167 0 : _instanceCache->release( id, 1 );
1168 :
1169 0 : LBWARN << "Could not sync object " << id << " request " << requestID
1170 0 : << std::endl;
1171 : }
1172 :
1173 4 : _localNode->serveRequest( requestID, result );
1174 4 : return true;
1175 : }
1176 :
1177 54 : bool ObjectStore::_cmdInstance( ICommand& inCommand )
1178 : {
1179 54 : LB_TS_THREAD( _receiverThread );
1180 54 : LBASSERT( _localNode );
1181 :
1182 54 : ObjectDataICommand command( inCommand );
1183 54 : const NodeID& nodeID = command.get< NodeID >();
1184 54 : const uint32_t masterInstanceID = command.get< uint32_t >();
1185 54 : const uint32_t cmd = command.getCommand();
1186 :
1187 54 : LBLOG( LOG_OBJECTS ) << "Cmd instance " << command << " master "
1188 54 : << masterInstanceID << " node " << nodeID << std::endl;
1189 :
1190 54 : command.setType( COMMANDTYPE_OBJECT );
1191 54 : command.setCommand( CMD_OBJECT_INSTANCE );
1192 :
1193 54 : const uint128_t& version = command.getVersion();
1194 54 : if( _instanceCache && version.high() == 0 )
1195 : {
1196 54 : const ObjectVersion rev( command.getObjectID(), version );
1197 : #ifndef CO_AGGRESSIVE_CACHING // Issue Equalizer#82:
1198 54 : if( cmd != CMD_NODE_OBJECT_INSTANCE_PUSH )
1199 : #endif
1200 48 : _instanceCache->add( rev, masterInstanceID, command, 0 );
1201 : }
1202 :
1203 54 : switch( cmd )
1204 : {
1205 : case CMD_NODE_OBJECT_INSTANCE:
1206 0 : LBASSERT( nodeID == 0 );
1207 0 : LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
1208 0 : return true;
1209 :
1210 : case CMD_NODE_OBJECT_INSTANCE_MAP:
1211 37 : if( nodeID != _localNode->getNodeID( )) // not for me
1212 0 : return true;
1213 :
1214 37 : LBASSERT( command.getInstanceID() <= CO_INSTANCE_MAX );
1215 37 : return dispatchObjectCommand( command );
1216 :
1217 : case CMD_NODE_OBJECT_INSTANCE_COMMIT:
1218 3 : LBASSERT( nodeID == 0 );
1219 3 : LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
1220 3 : return dispatchObjectCommand( command );
1221 :
1222 : case CMD_NODE_OBJECT_INSTANCE_PUSH:
1223 6 : LBASSERT( nodeID == 0 );
1224 6 : LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
1225 6 : _pushData.addDataCommand( command.getObjectID(), command );
1226 6 : return true;
1227 :
1228 : case CMD_NODE_OBJECT_INSTANCE_SYNC:
1229 : {
1230 8 : if( nodeID != _localNode->getNodeID( )) // not for me
1231 0 : return true;
1232 :
1233 8 : void* data = _localNode->getRequestData( command.getInstanceID( ));
1234 8 : LBASSERT( command.getInstanceID() != CO_INSTANCE_NONE );
1235 8 : LBASSERTINFO( data, this );
1236 :
1237 8 : ObjectDataIStream* is = LBSAFECAST( ObjectDataIStream*, data );
1238 8 : is->addDataCommand( command );
1239 8 : return true;
1240 : }
1241 :
1242 : default:
1243 0 : LBUNREACHABLE;
1244 0 : return false;
1245 54 : }
1246 : }
1247 :
1248 0 : bool ObjectStore::_cmdDisableSendOnRegister( ICommand& command )
1249 : {
1250 0 : LB_TS_THREAD( _commandThread );
1251 0 : LBASSERTINFO( _sendOnRegister > 0, _sendOnRegister );
1252 :
1253 0 : if( --_sendOnRegister == 0 )
1254 : {
1255 0 : _sendQueue.clear();
1256 :
1257 0 : Nodes nodes;
1258 0 : _localNode->getNodes( nodes, false );
1259 0 : for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
1260 : {
1261 0 : NodePtr node = *i;
1262 0 : ConnectionPtr multicast = node->getConnection( true );
1263 0 : ConnectionPtr connection = node->getConnection( false );
1264 0 : if( multicast )
1265 0 : multicast->finish();
1266 0 : if( connection && connection != multicast )
1267 0 : connection->finish();
1268 0 : }
1269 : }
1270 :
1271 0 : const uint32_t requestID = command.get< uint32_t >();
1272 0 : _localNode->serveRequest( requestID );
1273 0 : return true;
1274 : }
1275 :
1276 40 : bool ObjectStore::_cmdRemoveNode( ICommand& command )
1277 : {
1278 40 : LB_TS_THREAD( _commandThread );
1279 40 : LBLOG( LOG_OBJECTS ) << "Cmd object " << command << std::endl;
1280 :
1281 40 : Node* node = command.get< Node* >();
1282 40 : const uint32_t requestID = command.get< uint32_t >();
1283 :
1284 40 : lunchbox::ScopedFastWrite mutex( _objects );
1285 65 : for( ObjectsHashCIter i = _objects->begin(); i != _objects->end(); ++i )
1286 : {
1287 25 : const Objects& objects = i->second;
1288 50 : for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
1289 25 : (*j)->removeSlaves( node );
1290 : }
1291 :
1292 40 : if( requestID != LB_UNDEFINED_UINT32 )
1293 7 : _localNode->serveRequest( requestID );
1294 : else
1295 33 : node->unref(); // node was ref'd before LocalNode::_handleDisconnect()
1296 :
1297 40 : return true;
1298 : }
1299 :
1300 4 : bool ObjectStore::_cmdPush( ICommand& command )
1301 : {
1302 4 : LB_TS_THREAD( _commandThread );
1303 :
1304 4 : const uint128_t& objectID = command.get< uint128_t >();
1305 4 : const uint128_t& groupID = command.get< uint128_t >();
1306 4 : const uint128_t& typeID = command.get< uint128_t >();
1307 :
1308 4 : ObjectDataIStream* is = _pushData.pull( objectID );
1309 :
1310 4 : _localNode->objectPush( groupID, typeID, objectID, *is );
1311 4 : _pushData.recycle( is );
1312 4 : return true;
1313 : }
1314 :
1315 0 : std::ostream& operator << ( std::ostream& os, ObjectStore* objectStore )
1316 : {
1317 0 : if( !objectStore )
1318 : {
1319 0 : os << "NULL objectStore";
1320 0 : return os;
1321 : }
1322 :
1323 0 : os << "objectStore (" << (void*)objectStore << ")";
1324 :
1325 0 : return os;
1326 : }
1327 63 : }
|