Line data Source code
1 :
2 : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Cedric Stalder <cedric.stalder@gmail.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "objectStore.h"
23 :
24 : #include "connection.h"
25 : #include "connectionDescription.h"
26 : #include "global.h"
27 : #include "instanceCache.h"
28 : #include "log.h"
29 : #include "masterCMCommand.h"
30 : #include "nodeCommand.h"
31 : #include "oCommand.h"
32 : #include "objectCM.h"
33 : #include "objectCommand.h"
34 : #include "objectDataICommand.h"
35 : #include "objectDataIStream.h"
36 :
37 : #include <lunchbox/futureFunction.h>
38 : #include <lunchbox/scopedMutex.h>
39 :
40 : #include <boost/bind.hpp>
41 :
42 : #include <limits>
43 :
44 : //#define DEBUG_DISPATCH
45 : #ifdef DEBUG_DISPATCH
46 : # include <set>
47 : #endif
48 :
49 : namespace co
50 : {
51 : typedef CommandFunc< ObjectStore > CmdFunc;
52 : typedef lunchbox::FutureFunction< bool > FuturebImpl;
53 :
54 55 : ObjectStore::ObjectStore( LocalNode* localNode, a_ssize_t* counters )
55 : : _localNode( localNode )
56 : , _instanceIDs( -0x7FFFFFFF )
57 : , _instanceCache( new InstanceCache( Global::getIAttribute(
58 110 : Global::IATTR_INSTANCE_CACHE_SIZE ) * LB_1MB ) )
59 165 : , _counters( counters )
60 : {
61 55 : LBASSERT( localNode );
62 55 : CommandQueue* queue = localNode->getCommandThreadQueue();
63 :
64 : localNode->_registerCommand( CMD_NODE_FIND_MASTER_NODE_ID,
65 55 : CmdFunc( this, &ObjectStore::_cmdFindMasterNodeID ), queue );
66 : localNode->_registerCommand( CMD_NODE_FIND_MASTER_NODE_ID_REPLY,
67 55 : CmdFunc( this, &ObjectStore::_cmdFindMasterNodeIDReply ), 0 );
68 : localNode->_registerCommand( CMD_NODE_ATTACH_OBJECT,
69 55 : CmdFunc( this, &ObjectStore::_cmdAttach ), 0 );
70 : localNode->_registerCommand( CMD_NODE_DETACH_OBJECT,
71 55 : CmdFunc( this, &ObjectStore::_cmdDetach ), 0 );
72 : localNode->_registerCommand( CMD_NODE_REGISTER_OBJECT,
73 55 : CmdFunc( this, &ObjectStore::_cmdRegister ), queue );
74 : localNode->_registerCommand( CMD_NODE_DEREGISTER_OBJECT,
75 55 : CmdFunc( this, &ObjectStore::_cmdDeregister ), queue );
76 : localNode->_registerCommand( CMD_NODE_MAP_OBJECT,
77 55 : CmdFunc( this, &ObjectStore::_cmdMap ), queue );
78 : localNode->_registerCommand( CMD_NODE_MAP_OBJECT_SUCCESS,
79 55 : CmdFunc( this, &ObjectStore::_cmdMapSuccess ), 0 );
80 : localNode->_registerCommand( CMD_NODE_MAP_OBJECT_REPLY,
81 55 : CmdFunc( this, &ObjectStore::_cmdMapReply ), 0 );
82 : localNode->_registerCommand( CMD_NODE_UNMAP_OBJECT,
83 55 : CmdFunc( this, &ObjectStore::_cmdUnmap ), 0 );
84 : localNode->_registerCommand( CMD_NODE_UNSUBSCRIBE_OBJECT,
85 55 : CmdFunc( this, &ObjectStore::_cmdUnsubscribe ), queue );
86 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE,
87 55 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
88 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_MAP,
89 55 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
90 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_COMMIT,
91 55 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
92 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_PUSH,
93 55 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
94 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_SYNC,
95 55 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
96 : localNode->_registerCommand( CMD_NODE_DISABLE_SEND_ON_REGISTER,
97 55 : CmdFunc( this, &ObjectStore::_cmdDisableSendOnRegister ), queue );
98 : localNode->_registerCommand( CMD_NODE_REMOVE_NODE,
99 55 : CmdFunc( this, &ObjectStore::_cmdRemoveNode ), queue );
100 : localNode->_registerCommand( CMD_NODE_OBJECT_PUSH,
101 55 : CmdFunc( this, &ObjectStore::_cmdPush ), queue );
102 : localNode->_registerCommand( CMD_NODE_SYNC_OBJECT,
103 55 : CmdFunc( this, &ObjectStore::_cmdSync ), queue );
104 : localNode->_registerCommand( CMD_NODE_SYNC_OBJECT_REPLY,
105 55 : CmdFunc( this, &ObjectStore::_cmdSyncReply ), 0 );
106 55 : }
107 :
108 159 : ObjectStore::~ObjectStore()
109 : {
110 53 : LBVERB << "Delete ObjectStore @" << (void*)this << std::endl;
111 :
112 : #ifndef NDEBUG
113 53 : if( !_objects->empty( ))
114 : {
115 0 : LBWARN << _objects->size() << " attached objects in destructor"
116 0 : << std::endl;
117 :
118 0 : for( ObjectsHash::const_iterator i = _objects->begin();
119 0 : i != _objects->end(); ++i )
120 : {
121 0 : const Objects& objects = i->second;
122 0 : LBWARN << " " << objects.size() << " objects with id "
123 0 : << i->first << std::endl;
124 :
125 0 : for( Objects::const_iterator j = objects.begin();
126 0 : j != objects.end(); ++j )
127 : {
128 0 : const Object* object = *j;
129 0 : LBINFO << " object type " << lunchbox::className( object )
130 0 : << std::endl;
131 : }
132 : }
133 : }
134 : //LBASSERT( _objects->empty( ))
135 : #endif
136 53 : clear();
137 53 : delete _instanceCache;
138 53 : _instanceCache = 0;
139 106 : }
140 :
141 102 : void ObjectStore::clear( )
142 : {
143 102 : LBASSERT( _objects->empty( ));
144 102 : expireInstanceData( 0 );
145 102 : LBASSERT( !_instanceCache || _instanceCache->isEmpty( ));
146 :
147 102 : _objects->clear();
148 102 : _sendQueue.clear();
149 102 : }
150 :
151 0 : void ObjectStore::disableInstanceCache()
152 : {
153 0 : LBASSERT( _localNode->isClosed( ));
154 0 : delete _instanceCache;
155 0 : _instanceCache = 0;
156 0 : }
157 :
158 102 : void ObjectStore::expireInstanceData( const int64_t age )
159 : {
160 102 : if( _instanceCache )
161 102 : _instanceCache->expire( age );
162 102 : }
163 :
164 115 : void ObjectStore::removeInstanceData( const NodeID& nodeID )
165 : {
166 115 : if( _instanceCache )
167 115 : _instanceCache->remove( nodeID );
168 115 : }
169 :
170 0 : void ObjectStore::enableSendOnRegister()
171 : {
172 0 : ++_sendOnRegister;
173 0 : }
174 :
175 0 : void ObjectStore::disableSendOnRegister()
176 : {
177 0 : if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0 )
178 : {
179 : lunchbox::Request< void > request =
180 0 : _localNode->registerRequest< void >();
181 0 : _localNode->send( CMD_NODE_DISABLE_SEND_ON_REGISTER ) << request;
182 : }
183 : else // OPT
184 0 : --_sendOnRegister;
185 0 : }
186 :
187 : //---------------------------------------------------------------------------
188 : // identifier master node mapping
189 : //---------------------------------------------------------------------------
190 40 : NodeID ObjectStore::findMasterNodeID( const uint128_t& identifier )
191 : {
192 40 : LB_TS_NOT_THREAD( _commandThread );
193 :
194 : // OPT: look up locally first?
195 40 : const Nodes& nodes = _localNode->getNodes();
196 :
197 : // OPT: send to multiple nodes at once?
198 40 : for( NodePtr node : nodes )
199 : {
200 : lunchbox::Request< NodeID > request =
201 40 : _localNode->registerRequest< NodeID >();
202 :
203 80 : LBLOG( LOG_OBJECTS ) << "Finding " << identifier << " on " << node
204 120 : << " req " << request.getID() << std::endl;
205 40 : node->send( CMD_NODE_FIND_MASTER_NODE_ID ) << identifier << request;
206 :
207 : try
208 : {
209 40 : const NodeID& masterNodeID = request.wait( Global::getTimeout( ));
210 :
211 40 : if( masterNodeID != 0 )
212 : {
213 40 : LBLOG( LOG_OBJECTS ) << "Found " << identifier << " on "
214 40 : << masterNodeID << std::endl;
215 40 : return masterNodeID;
216 : }
217 : }
218 0 : catch( const lunchbox::FutureTimeout& )
219 : {
220 0 : _localNode->unregisterRequest( request.getID( ));
221 0 : throw;
222 : }
223 0 : }
224 :
225 0 : return NodeID();
226 : }
227 :
228 : //---------------------------------------------------------------------------
229 : // object mapping
230 : //---------------------------------------------------------------------------
231 21 : void ObjectStore::attach( Object* object, const uint128_t& id,
232 : const uint32_t instanceID )
233 : {
234 21 : LBASSERT( object );
235 21 : LB_TS_NOT_THREAD( _receiverThread );
236 :
237 : lunchbox::Request< void > request =
238 21 : _localNode->registerRequest< void >( object );
239 21 : _localNode->send( CMD_NODE_ATTACH_OBJECT ) << id << instanceID << request;
240 21 : }
241 :
242 : namespace
243 : {
244 63 : uint32_t _genNextID( lunchbox::a_int32_t& val )
245 : {
246 : uint32_t result;
247 63 : do
248 : {
249 63 : const long id = ++val;
250 : result = static_cast< uint32_t >(
251 63 : static_cast< int64_t >( id ) + 0x7FFFFFFFu );
252 : }
253 : while( result > CO_INSTANCE_MAX );
254 :
255 63 : return result;
256 : }
257 : }
258 :
259 63 : void ObjectStore::_attach( Object* object, const uint128_t& id,
260 : const uint32_t inInstanceID )
261 : {
262 63 : LBASSERT( object );
263 63 : LB_TS_THREAD( _receiverThread );
264 :
265 63 : uint32_t instanceID = inInstanceID;
266 63 : if( inInstanceID == CO_INSTANCE_INVALID )
267 21 : instanceID = _genNextID( _instanceIDs );
268 :
269 63 : object->attach( id, instanceID );
270 :
271 : {
272 63 : lunchbox::ScopedFastWrite mutex( _objects );
273 63 : Objects& objects = _objects.data[ id ];
274 63 : LBASSERTINFO( !object->isMaster() || objects.empty(),
275 : "Attaching master " << *object << ", " <<
276 : objects.size() << " attached objects with same ID, " <<
277 : "first is " << ( objects[0]->isMaster() ? "master " :
278 : "slave " ) << *objects[0] );
279 63 : objects.push_back( object );
280 : }
281 :
282 63 : _localNode->flushCommands(); // redispatch pending commands
283 :
284 63 : LBLOG( LOG_OBJECTS ) << "attached " << *object << " @"
285 63 : << static_cast< void* >( object ) << std::endl;
286 63 : }
287 :
288 29 : void ObjectStore::detach( Object* object )
289 : {
290 29 : LBASSERT( object );
291 29 : LB_TS_NOT_THREAD( _receiverThread );
292 :
293 29 : lunchbox::Request< void > request = _localNode->registerRequest< void >();
294 : _localNode->send( CMD_NODE_DETACH_OBJECT )
295 29 : << object->getID() << object->getInstanceID() << request;
296 29 : }
297 :
298 0 : void ObjectStore::swap( Object* oldObject, Object* newObject )
299 : {
300 0 : LBASSERT( newObject );
301 0 : LBASSERT( oldObject );
302 0 : LBASSERT( oldObject->isMaster() );
303 0 : LB_TS_THREAD( _receiverThread );
304 :
305 0 : if( !oldObject->isAttached() )
306 0 : return;
307 :
308 0 : LBLOG( LOG_OBJECTS ) << "Swap " << lunchbox::className( oldObject )
309 0 : << std::endl;
310 0 : const uint128_t& id = oldObject->getID();
311 :
312 0 : lunchbox::ScopedFastWrite mutex( _objects );
313 0 : ObjectsHash::iterator i = _objects->find( id );
314 0 : LBASSERT( i != _objects->end( ));
315 0 : if( i == _objects->end( ))
316 0 : return;
317 :
318 0 : Objects& objects = i->second;
319 0 : Objects::iterator j = find( objects.begin(), objects.end(), oldObject );
320 0 : LBASSERT( j != objects.end( ));
321 0 : if( j == objects.end( ))
322 0 : return;
323 :
324 0 : newObject->transfer( oldObject );
325 0 : *j = newObject;
326 : }
327 :
328 62 : void ObjectStore::_detach( Object* object )
329 : {
330 : // check also _cmdUnmapObject when modifying!
331 62 : LBASSERT( object );
332 62 : LB_TS_THREAD( _receiverThread );
333 :
334 62 : if( !object->isAttached() )
335 0 : return;
336 :
337 62 : const uint128_t& id = object->getID();
338 :
339 62 : LBASSERT( _objects->find( id ) != _objects->end( ));
340 62 : LBLOG( LOG_OBJECTS ) << "Detach " << *object << std::endl;
341 :
342 62 : Objects& objects = _objects.data[ id ];
343 62 : Objects::iterator i = find( objects.begin(),objects.end(), object );
344 62 : LBASSERT( i != objects.end( ));
345 :
346 : {
347 62 : lunchbox::ScopedFastWrite mutex( _objects );
348 62 : objects.erase( i );
349 62 : if( objects.empty( ))
350 61 : _objects->erase( id );
351 : }
352 :
353 62 : LBASSERT( object->getInstanceID() != CO_INSTANCE_INVALID );
354 62 : object->detach();
355 62 : return;
356 : }
357 :
358 42 : uint32_t ObjectStore::mapNB( Object* object, const uint128_t& id,
359 : const uint128_t& version, NodePtr master )
360 : {
361 42 : LB_TS_NOT_THREAD( _receiverThread );
362 42 : LBLOG( LOG_OBJECTS )
363 42 : << "Mapping " << lunchbox::className( object ) << " to id " << id
364 126 : << " version " << version << std::endl;
365 42 : LBASSERT( object );
366 42 : LBASSERTINFO( id.isUUID(), id );
367 :
368 42 : if( !master )
369 40 : master = _localNode->connectObjectMaster( id );
370 :
371 42 : if( !master || !master->isReachable( ))
372 : {
373 0 : LBWARN << "Mapping of object " << id << " failed, invalid master node"
374 0 : << std::endl;
375 0 : return LB_UNDEFINED_UINT32;
376 : }
377 :
378 42 : if( !object || !id.isUUID( ))
379 : {
380 0 : LBWARN << "Invalid object " << object << " or id " << id << std::endl;
381 0 : return LB_UNDEFINED_UINT32;
382 : }
383 :
384 42 : const bool isAttached = object->isAttached();
385 42 : const bool isMaster = object->isMaster();
386 42 : LBASSERTINFO( !isAttached, *object );
387 42 : LBASSERT( !isMaster ) ;
388 42 : if( isAttached || isMaster )
389 : {
390 0 : LBWARN << "Invalid object state: attached " << isAttached << " master "
391 0 : << isMaster << std::endl;
392 0 : return LB_UNDEFINED_UINT32;
393 : }
394 :
395 42 : const uint32_t request = _localNode->registerRequest( object );
396 42 : uint128_t minCachedVersion = VERSION_HEAD;
397 42 : uint128_t maxCachedVersion = VERSION_NONE;
398 42 : uint32_t masterInstanceID = 0;
399 : const bool useCache = _checkInstanceCache( id, minCachedVersion,
400 : maxCachedVersion,
401 42 : masterInstanceID );
402 42 : object->notifyAttach();
403 : master->send( CMD_NODE_MAP_OBJECT )
404 84 : << version << minCachedVersion << maxCachedVersion << id
405 126 : << object->getMaxVersions() << request << _genNextID( _instanceIDs )
406 42 : << masterInstanceID << useCache;
407 42 : return request;
408 : }
409 :
410 46 : bool ObjectStore::_checkInstanceCache( const uint128_t& id, uint128_t& from,
411 : uint128_t& to, uint32_t& instanceID )
412 : {
413 46 : if( !_instanceCache )
414 0 : return false;
415 :
416 46 : const InstanceCache::Data& cached = (*_instanceCache)[ id ];
417 46 : if( cached == InstanceCache::Data::NONE )
418 46 : return false;
419 :
420 0 : const ObjectDataIStreamDeque& versions = cached.versions;
421 0 : LBASSERT( !cached.versions.empty( ));
422 0 : instanceID = cached.masterInstanceID;
423 0 : from = versions.front()->getVersion();
424 0 : to = versions.back()->getVersion();
425 0 : LBLOG( LOG_OBJECTS ) << "Object " << id << " have v" << from << ".." << to
426 0 : << std::endl;
427 0 : return true;
428 : }
429 :
430 42 : bool ObjectStore::mapSync( const uint32_t requestID )
431 : {
432 42 : if( requestID == LB_UNDEFINED_UINT32 )
433 0 : return false;
434 :
435 42 : void* data = _localNode->getRequestData( requestID );
436 42 : if( data == 0 )
437 0 : return false;
438 :
439 42 : Object* object = LBSAFECAST( Object*, data );
440 42 : uint128_t version = VERSION_NONE;
441 42 : _localNode->waitRequest( requestID, version );
442 :
443 42 : const bool mapped = object->isAttached();
444 42 : if( mapped )
445 42 : object->applyMapData( version ); // apply initial instance data
446 :
447 42 : object->notifyAttached();
448 42 : LBLOG( LOG_OBJECTS )
449 84 : << "Mapped " << lunchbox::className( object ) << std::endl;
450 42 : return mapped;
451 : }
452 :
453 :
454 4 : f_bool_t ObjectStore::sync( Object* object, const uint128_t& id, NodePtr master,
455 : const uint32_t instanceID )
456 : {
457 4 : const uint32_t request = _startSync( object, id, master, instanceID );
458 : const FuturebImpl::Func& func = boost::bind( &ObjectStore::_finishSync,
459 4 : this, request, object );
460 4 : return f_bool_t( new FuturebImpl( func ));
461 : }
462 :
463 4 : uint32_t ObjectStore::_startSync( Object* object, const uint128_t& id,
464 : NodePtr master, const uint32_t instanceID )
465 : {
466 4 : LB_TS_NOT_THREAD( _receiverThread );
467 4 : LBLOG( LOG_OBJECTS )
468 4 : << "Syncing " << lunchbox::className( object ) << " with id " << id
469 12 : << std::endl;
470 4 : LBASSERT( object );
471 4 : LBASSERTINFO( id.isUUID(), id );
472 :
473 4 : if( !object || !id.isUUID( ))
474 : {
475 0 : LBWARN << "Invalid object " << object << " or id " << id << std::endl;
476 0 : return LB_UNDEFINED_UINT32;
477 : }
478 :
479 4 : if( !master )
480 0 : master = _localNode->connectObjectMaster( id );
481 :
482 4 : if( !master || !master->isReachable( ))
483 : {
484 0 : LBWARN << "Mapping of object " << id << " failed, invalid master node"
485 0 : << std::endl;
486 0 : return LB_UNDEFINED_UINT32;
487 : }
488 :
489 : const uint32_t request =
490 4 : _localNode->registerRequest( new ObjectDataIStream );
491 4 : uint128_t minCachedVersion = VERSION_HEAD;
492 4 : uint128_t maxCachedVersion = VERSION_NONE;
493 4 : uint32_t cacheInstanceID = 0;
494 :
495 : bool useCache = _checkInstanceCache( id, minCachedVersion, maxCachedVersion,
496 4 : cacheInstanceID );
497 4 : if( useCache )
498 : {
499 0 : switch( instanceID )
500 : {
501 : case CO_INSTANCE_ALL:
502 0 : break;
503 : default:
504 0 : if( instanceID == cacheInstanceID )
505 0 : break;
506 :
507 0 : useCache = false;
508 0 : LBCHECK( _instanceCache->release( id, 1 ));
509 0 : break;
510 : }
511 : }
512 :
513 : // Use stream expected by MasterCMCommand
514 : master->send( CMD_NODE_SYNC_OBJECT )
515 8 : << VERSION_NEWEST << minCachedVersion << maxCachedVersion << id
516 12 : << uint64_t(0) /* maxVersions */ << request << instanceID
517 4 : << cacheInstanceID << useCache;
518 4 : return request;
519 : }
520 :
521 4 : bool ObjectStore::_finishSync( const uint32_t requestID, Object* object )
522 : {
523 4 : if( requestID == LB_UNDEFINED_UINT32 )
524 0 : return false;
525 :
526 4 : void* data = _localNode->getRequestData( requestID );
527 4 : if( data == 0 )
528 0 : return false;
529 :
530 4 : ObjectDataIStream* is = LBSAFECAST( ObjectDataIStream*, data );
531 :
532 4 : bool ok = false;
533 4 : _localNode->waitRequest( requestID, ok );
534 :
535 4 : if( !ok )
536 : {
537 0 : LBWARN << "Object synchronization failed" << std::endl;
538 0 : delete is;
539 0 : return false;
540 : }
541 :
542 4 : is->waitReady();
543 4 : object->applyInstanceData( *is );
544 8 : LBLOG( LOG_OBJECTS ) << "Synced " << lunchbox::className( object )
545 12 : << std::endl;
546 4 : delete is;
547 4 : return true;
548 : }
549 :
550 42 : void ObjectStore::unmap( Object* object )
551 : {
552 42 : LBASSERT( object );
553 42 : if( !object->isAttached( )) // not registered
554 35 : return;
555 :
556 41 : const uint128_t& id = object->getID();
557 :
558 41 : LBLOG( LOG_OBJECTS ) << "Unmap " << object << std::endl;
559 :
560 41 : object->notifyDetach();
561 :
562 : // send unsubscribe to master, master will send detach command.
563 41 : LBASSERT( !object->isMaster( ));
564 41 : LB_TS_NOT_THREAD( _commandThread );
565 :
566 41 : const uint32_t masterInstanceID = object->getMasterInstanceID();
567 41 : if( masterInstanceID != CO_INSTANCE_INVALID )
568 : {
569 33 : NodePtr master = object->getMasterNode();
570 33 : LBASSERT( master )
571 :
572 33 : if( master && master->isReachable( ))
573 : {
574 : lunchbox::Request< void > request =
575 33 : _localNode->registerRequest< void >();
576 : master->send( CMD_NODE_UNSUBSCRIBE_OBJECT )
577 33 : << id << request << masterInstanceID << object->getInstanceID();
578 33 : request.wait();
579 33 : object->notifyDetached();
580 33 : return;
581 : }
582 0 : LBERROR << "Master node for object id " << id << " not connected"
583 0 : << std::endl;
584 : }
585 :
586 : // no unsubscribe sent: Detach directly
587 8 : detach( object );
588 8 : object->setupChangeManager( Object::NONE, false, 0, CO_INSTANCE_INVALID );
589 8 : object->notifyDetached();
590 : }
591 :
592 21 : bool ObjectStore::register_( Object* object )
593 : {
594 21 : LBASSERT( object );
595 21 : LBASSERT( !object->isAttached( ));
596 :
597 21 : const uint128_t& id = object->getID( );
598 21 : LBASSERTINFO( id.isUUID(), id );
599 :
600 21 : object->notifyAttach();
601 21 : object->setupChangeManager( object->getChangeType(), true, _localNode,
602 42 : CO_INSTANCE_INVALID );
603 21 : attach( object, id, CO_INSTANCE_INVALID );
604 :
605 21 : if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0 )
606 21 : _localNode->send( CMD_NODE_REGISTER_OBJECT ) << object;
607 :
608 21 : object->notifyAttached();
609 :
610 21 : LBLOG( LOG_OBJECTS ) << "Registered " << object << std::endl;
611 21 : return true;
612 : }
613 :
614 21 : void ObjectStore::deregister( Object* object )
615 : {
616 21 : LBASSERT( object );
617 21 : if( !object->isAttached( )) // not registered
618 21 : return;
619 :
620 21 : LBLOG( LOG_OBJECTS ) << "Deregister " << *object << std::endl;
621 21 : LBASSERT( object->isMaster( ));
622 :
623 21 : object->notifyDetach();
624 :
625 21 : if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0 )
626 : {
627 : // remove from send queue
628 : lunchbox::Request< void > request =
629 21 : _localNode->registerRequest< void >();
630 21 : _localNode->send( CMD_NODE_DEREGISTER_OBJECT ) << request;
631 : }
632 :
633 21 : const uint128_t id = object->getID();
634 21 : detach( object );
635 21 : object->setupChangeManager( Object::NONE, true, 0, CO_INSTANCE_INVALID );
636 21 : if( _instanceCache )
637 21 : _instanceCache->erase( id );
638 21 : object->notifyDetached();
639 : }
640 :
641 51608 : bool ObjectStore::notifyCommandThreadIdle()
642 : {
643 51608 : LB_TS_THREAD( _commandThread );
644 51608 : if( _sendQueue.empty( ))
645 51608 : return false;
646 :
647 0 : LBASSERT( _sendOnRegister > 0 );
648 0 : SendQueueItem& item = _sendQueue.front();
649 :
650 0 : if( item.age > _localNode->getTime64( ))
651 : {
652 0 : const Nodes& nodes = _localNode->getNodes( false );
653 0 : if( nodes.empty( ))
654 : {
655 0 : lunchbox::Thread::yield();
656 0 : return !_sendQueue.empty();
657 : }
658 :
659 0 : item.object->sendInstanceData( nodes );
660 : }
661 0 : _sendQueue.pop_front();
662 0 : return !_sendQueue.empty();
663 : }
664 :
665 8 : void ObjectStore::removeNode( NodePtr node )
666 : {
667 8 : lunchbox::Request< void > request = _localNode->registerRequest< void >();
668 8 : _localNode->send( CMD_NODE_REMOVE_NODE ) << node.get() << request;
669 8 : }
670 :
671 : //===========================================================================
672 : // ICommand handling
673 : //===========================================================================
674 547 : bool ObjectStore::dispatchObjectCommand( ICommand& cmd )
675 : {
676 547 : LB_TS_THREAD( _receiverThread );
677 547 : ObjectICommand command( cmd );
678 547 : const uint128_t& id = command.getObjectID();
679 547 : const uint32_t instanceID = command.getInstanceID();
680 :
681 547 : ObjectsHash::const_iterator i = _objects->find( id );
682 :
683 547 : if( i == _objects->end( ))
684 : // When the instance ID is set to none, we only care about the command
685 : // when we have an object of the given ID (multicast)
686 0 : return ( instanceID == CO_INSTANCE_NONE );
687 :
688 547 : const Objects& objects = i->second;
689 547 : LBASSERTINFO( !objects.empty(), command );
690 :
691 547 : if( instanceID <= CO_INSTANCE_MAX )
692 : {
693 60 : for( Objects::const_iterator j = objects.begin(); j!=objects.end(); ++j)
694 : {
695 60 : Object* object = *j;
696 60 : if( instanceID == object->getInstanceID( ))
697 : {
698 54 : LBCHECK( object->dispatchCommand( command ));
699 54 : return true;
700 : }
701 : }
702 0 : LBERROR << "Can't find object instance " << instanceID << " for "
703 0 : << command << std::endl;
704 0 : LBUNREACHABLE;
705 0 : return false;
706 : }
707 :
708 493 : Objects::const_iterator j = objects.begin();
709 493 : Object* object = *j;
710 493 : LBCHECK( object->dispatchCommand( command ));
711 :
712 493 : for( ++j; j != objects.end(); ++j )
713 : {
714 0 : object = *j;
715 0 : LBCHECK( object->dispatchCommand( command ));
716 : }
717 493 : return true;
718 : }
719 :
720 40 : bool ObjectStore::_cmdFindMasterNodeID( ICommand& command )
721 : {
722 40 : LB_TS_THREAD( _commandThread );
723 :
724 40 : const uint128_t& id = command.get< uint128_t >();
725 40 : const uint32_t requestID = command.get< uint32_t >();
726 40 : LBASSERT( id.isUUID( ));
727 :
728 40 : NodeID masterNodeID;
729 : {
730 40 : lunchbox::ScopedFastRead mutex( _objects );
731 40 : ObjectsHashCIter i = _objects->find( id );
732 :
733 40 : if( i != _objects->end( ))
734 : {
735 40 : const Objects& objects = i->second;
736 40 : LBASSERT( !objects.empty( ));
737 :
738 40 : for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
739 : {
740 40 : Object* object = *j;
741 40 : if( object->isMaster( ))
742 40 : masterNodeID = _localNode->getNodeID();
743 : else
744 : {
745 0 : NodePtr master = object->getMasterNode();
746 0 : if( master.isValid( ))
747 0 : masterNodeID = master->getNodeID();
748 : }
749 40 : if( masterNodeID != 0 )
750 40 : break;
751 : }
752 40 : }
753 : }
754 :
755 40 : LBLOG( LOG_OBJECTS ) << "Object " << id << " master " << masterNodeID
756 40 : << " req " << requestID << std::endl;
757 : command.getNode()->send( CMD_NODE_FIND_MASTER_NODE_ID_REPLY )
758 40 : << masterNodeID << requestID;
759 40 : return true;
760 : }
761 :
762 40 : bool ObjectStore::_cmdFindMasterNodeIDReply( ICommand& command )
763 : {
764 40 : const NodeID& masterNodeID = command.get< NodeID >();
765 40 : const uint32_t requestID = command.get< uint32_t >();
766 40 : _localNode->serveRequest( requestID, masterNodeID );
767 40 : return true;
768 : }
769 :
770 21 : bool ObjectStore::_cmdAttach( ICommand& command )
771 : {
772 21 : LB_TS_THREAD( _receiverThread );
773 21 : LBLOG( LOG_OBJECTS ) << "Cmd attach object " << command << std::endl;
774 :
775 21 : const uint128_t& objectID = command.get< uint128_t >();
776 21 : const uint32_t instanceID = command.get< uint32_t >();
777 21 : const uint32_t requestID = command.get< uint32_t >();
778 :
779 : Object* object = static_cast< Object* >( _localNode->getRequestData(
780 21 : requestID ));
781 21 : _attach( object, objectID, instanceID );
782 21 : _localNode->serveRequest( requestID );
783 21 : return true;
784 : }
785 :
786 62 : bool ObjectStore::_cmdDetach( ICommand& command )
787 : {
788 62 : LB_TS_THREAD( _receiverThread );
789 62 : LBLOG( LOG_OBJECTS ) << "Cmd detach object " << command << std::endl;
790 :
791 62 : const uint128_t& objectID = command.get< uint128_t >();
792 62 : const uint32_t instanceID = command.get< uint32_t >();
793 62 : const uint32_t requestID = command.get< uint32_t >();
794 :
795 62 : ObjectsHash::const_iterator i = _objects->find( objectID );
796 62 : if( i != _objects->end( ))
797 : {
798 62 : const Objects& objects = i->second;
799 :
800 189 : for( Objects::const_iterator j = objects.begin();
801 126 : j != objects.end(); ++j )
802 : {
803 63 : Object* object = *j;
804 63 : if( object->getInstanceID() == instanceID )
805 : {
806 62 : _detach( object );
807 62 : break;
808 : }
809 : }
810 : }
811 :
812 62 : LBASSERT( requestID != LB_UNDEFINED_UINT32 );
813 62 : _localNode->serveRequest( requestID );
814 62 : return true;
815 : }
816 :
817 21 : bool ObjectStore::_cmdRegister( ICommand& command )
818 : {
819 21 : LB_TS_THREAD( _commandThread );
820 21 : if( _sendOnRegister <= 0 )
821 21 : return true;
822 :
823 0 : LBLOG( LOG_OBJECTS ) << "Cmd register object " << command << std::endl;
824 :
825 0 : Object* object = reinterpret_cast< Object* >( command.get< void* >( ));
826 :
827 : const int32_t age = Global::getIAttribute(
828 0 : Global::IATTR_NODE_SEND_QUEUE_AGE );
829 0 : SendQueueItem item;
830 0 : item.age = age ? age + _localNode->getTime64() :
831 0 : std::numeric_limits< int64_t >::max();
832 0 : item.object = object;
833 0 : _sendQueue.push_back( item );
834 :
835 : const uint32_t size = Global::getIAttribute(
836 0 : Global::IATTR_NODE_SEND_QUEUE_SIZE );
837 0 : while( _sendQueue.size() > size )
838 0 : _sendQueue.pop_front();
839 :
840 0 : return true;
841 : }
842 :
843 21 : bool ObjectStore::_cmdDeregister( ICommand& command )
844 : {
845 21 : LB_TS_THREAD( _commandThread );
846 21 : LBLOG( LOG_OBJECTS ) << "Cmd deregister object " << command << std::endl;
847 :
848 21 : const uint32_t requestID = command.get< uint32_t >();
849 :
850 21 : const void* object = _localNode->getRequestData( requestID );
851 :
852 21 : for( SendQueueIter i = _sendQueue.begin(); i != _sendQueue.end(); ++i )
853 : {
854 0 : if( i->object == object )
855 : {
856 0 : _sendQueue.erase( i );
857 0 : break;
858 : }
859 : }
860 :
861 21 : _localNode->serveRequest( requestID );
862 21 : return true;
863 : }
864 :
865 42 : bool ObjectStore::_cmdMap( ICommand& cmd )
866 : {
867 42 : LB_TS_THREAD( _commandThread );
868 :
869 42 : MasterCMCommand command( cmd );
870 42 : const uint128_t& id = command.getObjectID();
871 :
872 42 : LBLOG( LOG_OBJECTS ) << "Cmd map object " << command << " id " << id << "."
873 0 : << command.getInstanceID() << " req "
874 42 : << command.getRequestID() << std::endl;
875 :
876 84 : ObjectCMPtr masterCM;
877 : {
878 42 : lunchbox::ScopedFastRead mutex( _objects );
879 42 : ObjectsHash::const_iterator i = _objects->find( id );
880 42 : if( i != _objects->end( ))
881 : {
882 42 : const Objects& objects = i->second;
883 :
884 42 : for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
885 : {
886 42 : Object* object = *j;
887 42 : if( object->isMaster( ))
888 : {
889 42 : masterCM = object->_getChangeManager();
890 42 : break;
891 : }
892 : }
893 42 : }
894 : }
895 :
896 42 : if( !masterCM || !masterCM->addSlave( command ))
897 : {
898 0 : LBWARN << "Can't find master object to map " << id << std::endl;
899 0 : NodePtr node = command.getNode();
900 : node->send( CMD_NODE_MAP_OBJECT_REPLY )
901 0 : << node->getNodeID() << id << command.getRequestedVersion()
902 0 : << command.getRequestID() << false << command.useCache() << false;
903 : }
904 :
905 42 : ++_counters[ LocalNode::COUNTER_MAP_OBJECT_REMOTE ];
906 84 : return true;
907 : }
908 :
909 42 : bool ObjectStore::_cmdMapSuccess( ICommand& command )
910 : {
911 42 : LB_TS_THREAD( _receiverThread );
912 :
913 42 : const uint128_t& nodeID = command.get< uint128_t >();
914 42 : const uint128_t& objectID = command.get< uint128_t >();
915 42 : const uint32_t requestID = command.get< uint32_t >();
916 42 : const uint32_t instanceID = command.get< uint32_t >();
917 42 : const Object::ChangeType changeType = command.get< Object::ChangeType >();
918 42 : const uint32_t masterInstanceID = command.get< uint32_t >();
919 :
920 : // Map success commands are potentially multicasted (see above)
921 : // verify that we are the intended receiver
922 42 : if( nodeID != _localNode->getNodeID( ))
923 0 : return true;
924 :
925 42 : LBLOG( LOG_OBJECTS ) << "Cmd map object success " << command
926 0 : << " id " << objectID << "." << instanceID
927 42 : << " req " << requestID << std::endl;
928 :
929 : // set up change manager and attach object to dispatch table
930 : Object* object = static_cast< Object* >(
931 42 : _localNode->getRequestData( requestID ));
932 42 : LBASSERT( object );
933 42 : LBASSERT( !object->isMaster( ));
934 :
935 : object->setupChangeManager( Object::ChangeType( changeType ), false,
936 42 : _localNode, masterInstanceID );
937 42 : _attach( object, objectID, instanceID );
938 42 : return true;
939 : }
940 :
941 42 : bool ObjectStore::_cmdMapReply( ICommand& command )
942 : {
943 42 : LB_TS_THREAD( _receiverThread );
944 :
945 : // Map reply commands are potentially multicasted (see above)
946 : // verify that we are the intended receiver
947 42 : if( command.get< uint128_t >() != _localNode->getNodeID( ))
948 0 : return true;
949 :
950 42 : const uint128_t& objectID = command.get< uint128_t >();
951 42 : const uint128_t& version = command.get< uint128_t >();
952 42 : const uint32_t requestID = command.get< uint32_t >();
953 42 : const bool result = command.get< bool >();
954 42 : const bool releaseCache = command.get< bool >();
955 42 : const bool useCache = command.get< bool >();
956 :
957 42 : LBLOG( LOG_OBJECTS ) << "Cmd map object reply " << command << " id "
958 42 : << objectID << " req " << requestID << std::endl;
959 :
960 42 : LBASSERT( _localNode->getRequestData( requestID ));
961 :
962 42 : if( result )
963 : {
964 : Object* object = static_cast<Object*>(
965 42 : _localNode->getRequestData( requestID ));
966 42 : LBASSERT( object );
967 42 : LBASSERT( !object->isMaster( ));
968 :
969 42 : object->setMasterNode( command.getNode( ));
970 :
971 42 : if( useCache )
972 : {
973 0 : LBASSERT( releaseCache );
974 0 : LBASSERT( _instanceCache );
975 :
976 0 : const uint128_t& id = objectID;
977 0 : const InstanceCache::Data& cached = (*_instanceCache)[ id ];
978 0 : LBASSERT( cached != InstanceCache::Data::NONE );
979 0 : LBASSERT( !cached.versions.empty( ));
980 :
981 0 : object->addInstanceDatas( cached.versions, version );
982 0 : LBCHECK( _instanceCache->release( id, 2 ));
983 : }
984 42 : else if( releaseCache )
985 : {
986 0 : LBCHECK( _instanceCache->release( objectID, 1 ));
987 : }
988 : }
989 : else
990 : {
991 0 : if( releaseCache )
992 0 : _instanceCache->release( objectID, 1 );
993 :
994 0 : LBWARN << "Could not map object " << objectID << std::endl;
995 : }
996 :
997 42 : _localNode->serveRequest( requestID, version );
998 42 : return true;
999 : }
1000 :
1001 33 : bool ObjectStore::_cmdUnsubscribe( ICommand& command )
1002 : {
1003 33 : LB_TS_THREAD( _commandThread );
1004 33 : LBLOG( LOG_OBJECTS ) << "Cmd unsubscribe object " << command << std::endl;
1005 :
1006 33 : const uint128_t& id = command.get< uint128_t >();
1007 33 : const uint32_t requestID = command.get< uint32_t >();
1008 33 : const uint32_t masterInstanceID = command.get< uint32_t >();
1009 33 : const uint32_t slaveInstanceID = command.get< uint32_t >();
1010 :
1011 33 : NodePtr node = command.getNode();
1012 :
1013 : {
1014 33 : lunchbox::ScopedFastWrite mutex( _objects );
1015 33 : ObjectsHash::const_iterator i = _objects->find( id );
1016 33 : if( i != _objects->end( ))
1017 : {
1018 33 : const Objects& objects = i->second;
1019 33 : for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
1020 : {
1021 33 : Object* object = *j;
1022 66 : if( object->isMaster() &&
1023 33 : object->getInstanceID() == masterInstanceID )
1024 : {
1025 33 : object->removeSlave( node, slaveInstanceID );
1026 33 : break;
1027 : }
1028 : }
1029 33 : }
1030 : }
1031 :
1032 33 : node->send( CMD_NODE_DETACH_OBJECT ) << id << slaveInstanceID << requestID;
1033 33 : return true;
1034 : }
1035 :
1036 1 : bool ObjectStore::_cmdUnmap( ICommand& command )
1037 : {
1038 1 : LB_TS_THREAD( _receiverThread );
1039 1 : LBLOG( LOG_OBJECTS ) << "Cmd unmap object " << command << std::endl;
1040 :
1041 1 : const uint128_t& objectID = command.get< uint128_t >();
1042 :
1043 1 : if( _instanceCache )
1044 1 : _instanceCache->erase( objectID );
1045 :
1046 1 : ObjectsHash::iterator i = _objects->find( objectID );
1047 1 : if( i == _objects->end( )) // nothing to do
1048 0 : return true;
1049 :
1050 1 : const Objects objects = i->second;
1051 : {
1052 1 : lunchbox::ScopedFastWrite mutex( _objects );
1053 1 : _objects->erase( i );
1054 : }
1055 :
1056 2 : for( Objects::const_iterator j = objects.begin(); j != objects.end(); ++j )
1057 : {
1058 1 : Object* object = *j;
1059 1 : object->detach();
1060 : }
1061 :
1062 1 : return true;
1063 : }
1064 :
1065 4 : bool ObjectStore::_cmdSync( ICommand& cmd )
1066 : {
1067 4 : LB_TS_THREAD( _commandThread );
1068 4 : MasterCMCommand command( cmd );
1069 4 : const uint128_t& id = command.getObjectID();
1070 4 : LBINFO << command.getNode() << std::endl;
1071 4 : LBLOG( LOG_OBJECTS ) << "Cmd sync object id " << id << "."
1072 0 : << command.getInstanceID() << " req "
1073 4 : << command.getRequestID() << std::endl;
1074 :
1075 4 : const uint32_t cacheInstanceID = command.getMasterInstanceID();
1076 8 : ObjectCMPtr cm;
1077 : {
1078 4 : lunchbox::ScopedFastRead mutex( _objects );
1079 4 : ObjectsHash::const_iterator i = _objects->find( id );
1080 4 : if( i != _objects->end( ))
1081 : {
1082 4 : const Objects& objects = i->second;
1083 :
1084 8 : for( Object* object : objects )
1085 : {
1086 4 : if( command.getInstanceID() == object->getInstanceID( ))
1087 : {
1088 0 : cm = object->_getChangeManager();
1089 0 : LBASSERT( cm );
1090 0 : break;
1091 : }
1092 :
1093 4 : if( command.getInstanceID() != CO_INSTANCE_ALL )
1094 0 : continue;
1095 :
1096 4 : cm = object->_getChangeManager();
1097 4 : LBASSERT( cm );
1098 4 : if( cacheInstanceID == object->getInstanceID( ))
1099 0 : break;
1100 : }
1101 4 : if( !cm )
1102 0 : LBWARN << "Can't find object to sync " << id << "."
1103 0 : << command.getInstanceID() << " in " << objects.size()
1104 0 : << " instances" << std::endl;
1105 : }
1106 4 : if( !cm )
1107 0 : LBWARN << "Can't find object to sync " << id
1108 0 : << ", no object with identifier" << std::endl;
1109 : }
1110 4 : if( !cm || !cm->sendSync( command ))
1111 : {
1112 0 : NodePtr node = command.getNode();
1113 : node->send( CMD_NODE_SYNC_OBJECT_REPLY )
1114 0 : << node->getNodeID() << id << command.getRequestID()
1115 0 : << false << command.useCache() << false;
1116 : }
1117 8 : return true;
1118 : }
1119 :
1120 4 : bool ObjectStore::_cmdSyncReply( ICommand& command )
1121 : {
1122 4 : LB_TS_THREAD( _receiverThread );
1123 :
1124 : // Sync reply commands are potentially multicasted (see above)
1125 : // verify that we are the intended receiver
1126 4 : if( command.get< uint128_t >() != _localNode->getNodeID( ))
1127 0 : return true;
1128 :
1129 4 : const NodeID& id = command.get< NodeID >();
1130 4 : const uint32_t requestID = command.get< uint32_t >();
1131 4 : const bool result = command.get< bool >();
1132 4 : const bool releaseCache = command.get< bool >();
1133 4 : const bool useCache = command.get< bool >();
1134 4 : void* const data = _localNode->getRequestData( requestID );
1135 4 : ObjectDataIStream* const is = LBSAFECAST( ObjectDataIStream*, data );
1136 :
1137 4 : LBLOG( LOG_OBJECTS ) << "Cmd sync object reply " << command << " req "
1138 4 : << requestID << std::endl;
1139 4 : if( result )
1140 : {
1141 4 : if( useCache )
1142 : {
1143 0 : LBASSERT( releaseCache );
1144 0 : LBASSERT( _instanceCache );
1145 :
1146 0 : const InstanceCache::Data& cached = (*_instanceCache)[ id ];
1147 0 : LBASSERT( cached != InstanceCache::Data::NONE );
1148 0 : LBASSERT( !cached.versions.empty( ));
1149 :
1150 0 : *is = *cached.versions.back();
1151 0 : LBCHECK( _instanceCache->release( id, 2 ));
1152 : }
1153 4 : else if( releaseCache )
1154 : {
1155 0 : LBCHECK( _instanceCache->release( id, 1 ));
1156 : }
1157 : }
1158 : else
1159 : {
1160 0 : if( releaseCache )
1161 0 : _instanceCache->release( id, 1 );
1162 :
1163 0 : LBWARN << "Could not sync object " << id << " request " << requestID
1164 0 : << std::endl;
1165 : }
1166 :
1167 4 : _localNode->serveRequest( requestID, result );
1168 4 : return true;
1169 : }
1170 :
1171 57 : bool ObjectStore::_cmdInstance( ICommand& inCommand )
1172 : {
1173 57 : LB_TS_THREAD( _receiverThread );
1174 57 : LBASSERT( _localNode );
1175 :
1176 57 : ObjectDataICommand command( inCommand );
1177 57 : const NodeID& nodeID = command.get< NodeID >();
1178 57 : const uint32_t masterInstanceID = command.get< uint32_t >();
1179 57 : const uint32_t cmd = command.getCommand();
1180 :
1181 57 : LBLOG( LOG_OBJECTS ) << "Cmd instance " << command << " master "
1182 57 : << masterInstanceID << " node " << nodeID << std::endl;
1183 :
1184 57 : command.setType( COMMANDTYPE_OBJECT );
1185 57 : command.setCommand( CMD_OBJECT_INSTANCE );
1186 :
1187 57 : const uint128_t& version = command.getVersion();
1188 57 : if( _instanceCache && version.high() == 0 )
1189 : {
1190 57 : const ObjectVersion rev( command.getObjectID(), version );
1191 : #ifndef CO_AGGRESSIVE_CACHING // Issue Equalizer#82:
1192 57 : if( cmd != CMD_NODE_OBJECT_INSTANCE_PUSH )
1193 : #endif
1194 50 : _instanceCache->add( rev, masterInstanceID, command, 0 );
1195 : }
1196 :
1197 57 : switch( cmd )
1198 : {
1199 : case CMD_NODE_OBJECT_INSTANCE:
1200 0 : LBASSERT( nodeID == 0 );
1201 0 : LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
1202 0 : return true;
1203 :
1204 : case CMD_NODE_OBJECT_INSTANCE_MAP:
1205 38 : if( nodeID != _localNode->getNodeID( )) // not for me
1206 0 : return true;
1207 :
1208 38 : LBASSERT( command.getInstanceID() <= CO_INSTANCE_MAX );
1209 38 : return dispatchObjectCommand( command );
1210 :
1211 : case CMD_NODE_OBJECT_INSTANCE_COMMIT:
1212 4 : LBASSERT( nodeID == 0 );
1213 4 : LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
1214 4 : return dispatchObjectCommand( command );
1215 :
1216 : case CMD_NODE_OBJECT_INSTANCE_PUSH:
1217 7 : LBASSERT( nodeID == 0 );
1218 7 : LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
1219 7 : _pushData.addDataCommand( command.getObjectID(), command );
1220 7 : return true;
1221 :
1222 : case CMD_NODE_OBJECT_INSTANCE_SYNC:
1223 : {
1224 8 : if( nodeID != _localNode->getNodeID( )) // not for me
1225 0 : return true;
1226 :
1227 8 : void* data = _localNode->getRequestData( command.getInstanceID( ));
1228 8 : LBASSERT( command.getInstanceID() != CO_INSTANCE_NONE );
1229 8 : LBASSERTINFO( data, this );
1230 :
1231 8 : ObjectDataIStream* is = LBSAFECAST( ObjectDataIStream*, data );
1232 8 : is->addDataCommand( command );
1233 8 : return true;
1234 : }
1235 :
1236 : default:
1237 0 : LBUNREACHABLE;
1238 0 : return false;
1239 57 : }
1240 : }
1241 :
1242 0 : bool ObjectStore::_cmdDisableSendOnRegister( ICommand& command )
1243 : {
1244 0 : LB_TS_THREAD( _commandThread );
1245 0 : LBASSERTINFO( _sendOnRegister > 0, _sendOnRegister );
1246 :
1247 0 : if( --_sendOnRegister == 0 )
1248 : {
1249 0 : _sendQueue.clear();
1250 :
1251 0 : const Nodes& nodes = _localNode->getNodes( false );
1252 0 : for( NodePtr node : nodes )
1253 : {
1254 0 : ConnectionPtr multicast = node->getConnection( true );
1255 0 : ConnectionPtr connection = node->getConnection( false );
1256 0 : if( multicast )
1257 0 : multicast->finish();
1258 0 : if( connection && connection != multicast )
1259 0 : connection->finish();
1260 0 : }
1261 : }
1262 :
1263 0 : const uint32_t requestID = command.get< uint32_t >();
1264 0 : _localNode->serveRequest( requestID );
1265 0 : return true;
1266 : }
1267 :
1268 42 : bool ObjectStore::_cmdRemoveNode( ICommand& command )
1269 : {
1270 42 : LB_TS_THREAD( _commandThread );
1271 42 : LBLOG( LOG_OBJECTS ) << "Cmd object " << command << std::endl;
1272 :
1273 42 : Node* node = command.get< Node* >();
1274 42 : const uint32_t requestID = command.get< uint32_t >();
1275 :
1276 42 : lunchbox::ScopedFastWrite mutex( _objects );
1277 67 : for( ObjectsHashCIter i = _objects->begin(); i != _objects->end(); ++i )
1278 : {
1279 25 : const Objects& objects = i->second;
1280 50 : for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
1281 25 : (*j)->removeSlaves( node );
1282 : }
1283 :
1284 42 : if( requestID != LB_UNDEFINED_UINT32 )
1285 8 : _localNode->serveRequest( requestID );
1286 : else
1287 34 : node->unref(); // node was ref'd before LocalNode::_handleDisconnect()
1288 :
1289 42 : return true;
1290 : }
1291 :
1292 5 : bool ObjectStore::_cmdPush( ICommand& command )
1293 : {
1294 5 : LB_TS_THREAD( _commandThread );
1295 :
1296 5 : const uint128_t& objectID = command.get< uint128_t >();
1297 5 : const uint128_t& groupID = command.get< uint128_t >();
1298 5 : const uint128_t& typeID = command.get< uint128_t >();
1299 :
1300 5 : ObjectDataIStream* is = _pushData.pull( objectID );
1301 :
1302 5 : _localNode->objectPush( groupID, typeID, objectID, *is );
1303 5 : _pushData.recycle( is );
1304 5 : return true;
1305 : }
1306 :
1307 0 : std::ostream& operator << ( std::ostream& os, ObjectStore* objectStore )
1308 : {
1309 0 : if( !objectStore )
1310 : {
1311 0 : os << "NULL objectStore";
1312 0 : return os;
1313 : }
1314 :
1315 0 : os << "objectStore (" << (void*)objectStore << ")";
1316 :
1317 0 : return os;
1318 : }
1319 66 : }
|