Line data Source code
1 :
2 : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2010, Cedric Stalder <cedric.stalder@gmail.com>
4 : * 2011-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "objectStore.h"
23 :
24 : #include "connection.h"
25 : #include "connectionDescription.h"
26 : #include "global.h"
27 : #include "instanceCache.h"
28 : #include "log.h"
29 : #include "masterCMCommand.h"
30 : #include "nodeCommand.h"
31 : #include "oCommand.h"
32 : #include "objectCM.h"
33 : #include "objectCommand.h"
34 : #include "objectDataICommand.h"
35 : #include "objectDataIStream.h"
36 :
37 : #include <lunchbox/futureFunction.h>
38 : #include <lunchbox/scopedMutex.h>
39 :
40 : #include <boost/bind.hpp>
41 :
42 : #include <limits>
43 :
44 : //#define DEBUG_DISPATCH
45 : #ifdef DEBUG_DISPATCH
46 : # include <set>
47 : #endif
48 :
49 : namespace co
50 : {
51 : typedef CommandFunc< ObjectStore > CmdFunc;
52 : typedef lunchbox::FutureFunction< bool > FuturebImpl;
53 :
54 55 : ObjectStore::ObjectStore( LocalNode* localNode, a_ssize_t* counters )
55 : : _localNode( localNode )
56 : , _instanceIDs( -0x7FFFFFFF )
57 : , _instanceCache( new InstanceCache( Global::getIAttribute(
58 110 : Global::IATTR_INSTANCE_CACHE_SIZE ) * LB_1MB ) )
59 165 : , _counters( counters )
60 : {
61 55 : LBASSERT( localNode );
62 55 : CommandQueue* queue = localNode->getCommandThreadQueue();
63 :
64 : localNode->_registerCommand( CMD_NODE_FIND_MASTER_NODE_ID,
65 55 : CmdFunc( this, &ObjectStore::_cmdFindMasterNodeID ), queue );
66 : localNode->_registerCommand( CMD_NODE_FIND_MASTER_NODE_ID_REPLY,
67 55 : CmdFunc( this, &ObjectStore::_cmdFindMasterNodeIDReply ), 0 );
68 : localNode->_registerCommand( CMD_NODE_ATTACH_OBJECT,
69 55 : CmdFunc( this, &ObjectStore::_cmdAttach ), 0 );
70 : localNode->_registerCommand( CMD_NODE_DETACH_OBJECT,
71 55 : CmdFunc( this, &ObjectStore::_cmdDetach ), 0 );
72 : localNode->_registerCommand( CMD_NODE_REGISTER_OBJECT,
73 55 : CmdFunc( this, &ObjectStore::_cmdRegister ), queue );
74 : localNode->_registerCommand( CMD_NODE_DEREGISTER_OBJECT,
75 55 : CmdFunc( this, &ObjectStore::_cmdDeregister ), queue );
76 : localNode->_registerCommand( CMD_NODE_MAP_OBJECT,
77 55 : CmdFunc( this, &ObjectStore::_cmdMap ), queue );
78 : localNode->_registerCommand( CMD_NODE_MAP_OBJECT_SUCCESS,
79 55 : CmdFunc( this, &ObjectStore::_cmdMapSuccess ), 0 );
80 : localNode->_registerCommand( CMD_NODE_MAP_OBJECT_REPLY,
81 55 : CmdFunc( this, &ObjectStore::_cmdMapReply ), 0 );
82 : localNode->_registerCommand( CMD_NODE_UNMAP_OBJECT,
83 55 : CmdFunc( this, &ObjectStore::_cmdUnmap ), 0 );
84 : localNode->_registerCommand( CMD_NODE_UNSUBSCRIBE_OBJECT,
85 55 : CmdFunc( this, &ObjectStore::_cmdUnsubscribe ), queue );
86 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE,
87 55 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
88 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_MAP,
89 55 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
90 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_COMMIT,
91 55 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
92 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_PUSH,
93 55 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
94 : localNode->_registerCommand( CMD_NODE_OBJECT_INSTANCE_SYNC,
95 55 : CmdFunc( this, &ObjectStore::_cmdInstance ), 0 );
96 : localNode->_registerCommand( CMD_NODE_DISABLE_SEND_ON_REGISTER,
97 55 : CmdFunc( this, &ObjectStore::_cmdDisableSendOnRegister ), queue );
98 : localNode->_registerCommand( CMD_NODE_REMOVE_NODE,
99 55 : CmdFunc( this, &ObjectStore::_cmdRemoveNode ), queue );
100 : localNode->_registerCommand( CMD_NODE_OBJECT_PUSH,
101 55 : CmdFunc( this, &ObjectStore::_cmdPush ), queue );
102 : localNode->_registerCommand( CMD_NODE_SYNC_OBJECT,
103 55 : CmdFunc( this, &ObjectStore::_cmdSync ), queue );
104 : localNode->_registerCommand( CMD_NODE_SYNC_OBJECT_REPLY,
105 55 : CmdFunc( this, &ObjectStore::_cmdSyncReply ), 0 );
106 55 : }
107 :
108 159 : ObjectStore::~ObjectStore()
109 : {
110 53 : LBVERB << "Delete ObjectStore @" << (void*)this << std::endl;
111 :
112 : #ifndef NDEBUG
113 53 : if( !_objects->empty( ))
114 : {
115 0 : LBWARN << _objects->size() << " attached objects in destructor"
116 0 : << std::endl;
117 :
118 0 : for( ObjectsHash::const_iterator i = _objects->begin();
119 0 : i != _objects->end(); ++i )
120 : {
121 0 : const Objects& objects = i->second;
122 0 : LBWARN << " " << objects.size() << " objects with id "
123 0 : << i->first << std::endl;
124 :
125 0 : for( Objects::const_iterator j = objects.begin();
126 0 : j != objects.end(); ++j )
127 : {
128 0 : const Object* object = *j;
129 0 : LBINFO << " object type " << lunchbox::className( object )
130 0 : << std::endl;
131 : }
132 : }
133 : }
134 : //LBASSERT( _objects->empty( ))
135 : #endif
136 53 : clear();
137 53 : delete _instanceCache;
138 53 : _instanceCache = 0;
139 106 : }
140 :
141 102 : void ObjectStore::clear( )
142 : {
143 102 : LBASSERT( _objects->empty( ));
144 102 : expireInstanceData( 0 );
145 102 : LBASSERT( !_instanceCache || _instanceCache->isEmpty( ));
146 :
147 102 : _objects->clear();
148 102 : _sendQueue.clear();
149 102 : }
150 :
151 0 : void ObjectStore::disableInstanceCache()
152 : {
153 0 : LBASSERT( _localNode->isClosed( ));
154 0 : delete _instanceCache;
155 0 : _instanceCache = 0;
156 0 : }
157 :
158 102 : void ObjectStore::expireInstanceData( const int64_t age )
159 : {
160 102 : if( _instanceCache )
161 102 : _instanceCache->expire( age );
162 102 : }
163 :
164 115 : void ObjectStore::removeInstanceData( const NodeID& nodeID )
165 : {
166 115 : if( _instanceCache )
167 115 : _instanceCache->remove( nodeID );
168 115 : }
169 :
170 0 : void ObjectStore::enableSendOnRegister()
171 : {
172 0 : ++_sendOnRegister;
173 0 : }
174 :
175 0 : void ObjectStore::disableSendOnRegister()
176 : {
177 0 : if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0 )
178 : {
179 : lunchbox::Request< void > request =
180 0 : _localNode->registerRequest< void >();
181 0 : _localNode->send( CMD_NODE_DISABLE_SEND_ON_REGISTER ) << request;
182 : }
183 : else // OPT
184 0 : --_sendOnRegister;
185 0 : }
186 :
187 : //---------------------------------------------------------------------------
188 : // identifier master node mapping
189 : //---------------------------------------------------------------------------
190 40 : NodeID ObjectStore::findMasterNodeID( const uint128_t& identifier )
191 : {
192 40 : LB_TS_NOT_THREAD( _commandThread );
193 :
194 : // OPT: look up locally first?
195 40 : Nodes nodes;
196 40 : _localNode->getNodes( nodes );
197 :
198 : // OPT: send to multiple nodes at once?
199 40 : for( NodesIter i = nodes.begin(); i != nodes.end(); ++i )
200 : {
201 40 : NodePtr node = *i;
202 : lunchbox::Request< NodeID > request =
203 40 : _localNode->registerRequest< NodeID >();
204 :
205 80 : LBLOG( LOG_OBJECTS ) << "Finding " << identifier << " on " << node
206 120 : << " req " << request.getID() << std::endl;
207 40 : node->send( CMD_NODE_FIND_MASTER_NODE_ID ) << identifier << request;
208 :
209 : try
210 : {
211 40 : const NodeID& masterNodeID = request.wait( Global::getTimeout( ));
212 :
213 40 : if( masterNodeID != 0 )
214 : {
215 40 : LBLOG( LOG_OBJECTS ) << "Found " << identifier << " on "
216 40 : << masterNodeID << std::endl;
217 40 : return masterNodeID;
218 : }
219 : }
220 0 : catch( const lunchbox::FutureTimeout& )
221 : {
222 0 : _localNode->unregisterRequest( request.getID( ));
223 0 : throw;
224 : }
225 0 : }
226 :
227 0 : return NodeID();
228 : }
229 :
230 : //---------------------------------------------------------------------------
231 : // object mapping
232 : //---------------------------------------------------------------------------
233 21 : void ObjectStore::attach( Object* object, const uint128_t& id,
234 : const uint32_t instanceID )
235 : {
236 21 : LBASSERT( object );
237 21 : LB_TS_NOT_THREAD( _receiverThread );
238 :
239 : lunchbox::Request< void > request =
240 21 : _localNode->registerRequest< void >( object );
241 21 : _localNode->send( CMD_NODE_ATTACH_OBJECT ) << id << instanceID << request;
242 21 : }
243 :
244 : namespace
245 : {
246 63 : uint32_t _genNextID( lunchbox::a_int32_t& val )
247 : {
248 : uint32_t result;
249 63 : do
250 : {
251 63 : const long id = ++val;
252 : result = static_cast< uint32_t >(
253 63 : static_cast< int64_t >( id ) + 0x7FFFFFFFu );
254 : }
255 : while( result > CO_INSTANCE_MAX );
256 :
257 63 : return result;
258 : }
259 : }
260 :
261 63 : void ObjectStore::_attach( Object* object, const uint128_t& id,
262 : const uint32_t inInstanceID )
263 : {
264 63 : LBASSERT( object );
265 63 : LB_TS_THREAD( _receiverThread );
266 :
267 63 : uint32_t instanceID = inInstanceID;
268 63 : if( inInstanceID == CO_INSTANCE_INVALID )
269 21 : instanceID = _genNextID( _instanceIDs );
270 :
271 63 : object->attach( id, instanceID );
272 :
273 : {
274 63 : lunchbox::ScopedFastWrite mutex( _objects );
275 63 : Objects& objects = _objects.data[ id ];
276 63 : LBASSERTINFO( !object->isMaster() || objects.empty(),
277 : "Attaching master " << *object << ", " <<
278 : objects.size() << " attached objects with same ID, " <<
279 : "first is " << ( objects[0]->isMaster() ? "master " :
280 : "slave " ) << *objects[0] );
281 63 : objects.push_back( object );
282 : }
283 :
284 63 : _localNode->flushCommands(); // redispatch pending commands
285 :
286 63 : LBLOG( LOG_OBJECTS ) << "attached " << *object << " @"
287 63 : << static_cast< void* >( object ) << std::endl;
288 63 : }
289 :
290 29 : void ObjectStore::detach( Object* object )
291 : {
292 29 : LBASSERT( object );
293 29 : LB_TS_NOT_THREAD( _receiverThread );
294 :
295 29 : lunchbox::Request< void > request = _localNode->registerRequest< void >();
296 : _localNode->send( CMD_NODE_DETACH_OBJECT )
297 29 : << object->getID() << object->getInstanceID() << request;
298 29 : }
299 :
300 0 : void ObjectStore::swap( Object* oldObject, Object* newObject )
301 : {
302 0 : LBASSERT( newObject );
303 0 : LBASSERT( oldObject );
304 0 : LBASSERT( oldObject->isMaster() );
305 0 : LB_TS_THREAD( _receiverThread );
306 :
307 0 : if( !oldObject->isAttached() )
308 0 : return;
309 :
310 0 : LBLOG( LOG_OBJECTS ) << "Swap " << lunchbox::className( oldObject )
311 0 : << std::endl;
312 0 : const uint128_t& id = oldObject->getID();
313 :
314 0 : lunchbox::ScopedFastWrite mutex( _objects );
315 0 : ObjectsHash::iterator i = _objects->find( id );
316 0 : LBASSERT( i != _objects->end( ));
317 0 : if( i == _objects->end( ))
318 0 : return;
319 :
320 0 : Objects& objects = i->second;
321 0 : Objects::iterator j = find( objects.begin(), objects.end(), oldObject );
322 0 : LBASSERT( j != objects.end( ));
323 0 : if( j == objects.end( ))
324 0 : return;
325 :
326 0 : newObject->transfer( oldObject );
327 0 : *j = newObject;
328 : }
329 :
330 62 : void ObjectStore::_detach( Object* object )
331 : {
332 : // check also _cmdUnmapObject when modifying!
333 62 : LBASSERT( object );
334 62 : LB_TS_THREAD( _receiverThread );
335 :
336 62 : if( !object->isAttached() )
337 0 : return;
338 :
339 62 : const uint128_t& id = object->getID();
340 :
341 62 : LBASSERT( _objects->find( id ) != _objects->end( ));
342 62 : LBLOG( LOG_OBJECTS ) << "Detach " << *object << std::endl;
343 :
344 62 : Objects& objects = _objects.data[ id ];
345 62 : Objects::iterator i = find( objects.begin(),objects.end(), object );
346 62 : LBASSERT( i != objects.end( ));
347 :
348 : {
349 62 : lunchbox::ScopedFastWrite mutex( _objects );
350 62 : objects.erase( i );
351 62 : if( objects.empty( ))
352 61 : _objects->erase( id );
353 : }
354 :
355 62 : LBASSERT( object->getInstanceID() != CO_INSTANCE_INVALID );
356 62 : object->detach();
357 62 : return;
358 : }
359 :
360 42 : uint32_t ObjectStore::mapNB( Object* object, const uint128_t& id,
361 : const uint128_t& version, NodePtr master )
362 : {
363 42 : LB_TS_NOT_THREAD( _receiverThread );
364 42 : LBLOG( LOG_OBJECTS )
365 42 : << "Mapping " << lunchbox::className( object ) << " to id " << id
366 126 : << " version " << version << std::endl;
367 42 : LBASSERT( object );
368 42 : LBASSERTINFO( id.isUUID(), id );
369 :
370 42 : if( !master )
371 40 : master = _localNode->connectObjectMaster( id );
372 :
373 42 : if( !master || !master->isReachable( ))
374 : {
375 0 : LBWARN << "Mapping of object " << id << " failed, invalid master node"
376 0 : << std::endl;
377 0 : return LB_UNDEFINED_UINT32;
378 : }
379 :
380 42 : if( !object || !id.isUUID( ))
381 : {
382 0 : LBWARN << "Invalid object " << object << " or id " << id << std::endl;
383 0 : return LB_UNDEFINED_UINT32;
384 : }
385 :
386 42 : const bool isAttached = object->isAttached();
387 42 : const bool isMaster = object->isMaster();
388 42 : LBASSERTINFO( !isAttached, *object );
389 42 : LBASSERT( !isMaster ) ;
390 42 : if( isAttached || isMaster )
391 : {
392 0 : LBWARN << "Invalid object state: attached " << isAttached << " master "
393 0 : << isMaster << std::endl;
394 0 : return LB_UNDEFINED_UINT32;
395 : }
396 :
397 42 : const uint32_t request = _localNode->registerRequest( object );
398 42 : uint128_t minCachedVersion = VERSION_HEAD;
399 42 : uint128_t maxCachedVersion = VERSION_NONE;
400 42 : uint32_t masterInstanceID = 0;
401 : const bool useCache = _checkInstanceCache( id, minCachedVersion,
402 : maxCachedVersion,
403 42 : masterInstanceID );
404 42 : object->notifyAttach();
405 : master->send( CMD_NODE_MAP_OBJECT )
406 84 : << version << minCachedVersion << maxCachedVersion << id
407 126 : << object->getMaxVersions() << request << _genNextID( _instanceIDs )
408 42 : << masterInstanceID << useCache;
409 42 : return request;
410 : }
411 :
412 46 : bool ObjectStore::_checkInstanceCache( const uint128_t& id, uint128_t& from,
413 : uint128_t& to, uint32_t& instanceID )
414 : {
415 46 : if( !_instanceCache )
416 0 : return false;
417 :
418 46 : const InstanceCache::Data& cached = (*_instanceCache)[ id ];
419 46 : if( cached == InstanceCache::Data::NONE )
420 46 : return false;
421 :
422 0 : const ObjectDataIStreamDeque& versions = cached.versions;
423 0 : LBASSERT( !cached.versions.empty( ));
424 0 : instanceID = cached.masterInstanceID;
425 0 : from = versions.front()->getVersion();
426 0 : to = versions.back()->getVersion();
427 0 : LBLOG( LOG_OBJECTS ) << "Object " << id << " have v" << from << ".." << to
428 0 : << std::endl;
429 0 : return true;
430 : }
431 :
432 42 : bool ObjectStore::mapSync( const uint32_t requestID )
433 : {
434 42 : if( requestID == LB_UNDEFINED_UINT32 )
435 0 : return false;
436 :
437 42 : void* data = _localNode->getRequestData( requestID );
438 42 : if( data == 0 )
439 0 : return false;
440 :
441 42 : Object* object = LBSAFECAST( Object*, data );
442 42 : uint128_t version = VERSION_NONE;
443 42 : _localNode->waitRequest( requestID, version );
444 :
445 42 : const bool mapped = object->isAttached();
446 42 : if( mapped )
447 42 : object->applyMapData( version ); // apply initial instance data
448 :
449 42 : object->notifyAttached();
450 42 : LBLOG( LOG_OBJECTS )
451 84 : << "Mapped " << lunchbox::className( object ) << std::endl;
452 42 : return mapped;
453 : }
454 :
455 :
456 4 : f_bool_t ObjectStore::sync( Object* object, NodePtr master, const uint128_t& id,
457 : const uint32_t instanceID )
458 : {
459 4 : const uint32_t request = _startSync( object, master, id, instanceID );
460 : const FuturebImpl::Func& func = boost::bind( &ObjectStore::_finishSync,
461 4 : this, request, object );
462 4 : return f_bool_t( new FuturebImpl( func ));
463 : }
464 :
465 4 : uint32_t ObjectStore::_startSync( Object* object, NodePtr master,
466 : const uint128_t& id,
467 : const uint32_t instanceID )
468 : {
469 4 : LB_TS_NOT_THREAD( _receiverThread );
470 4 : LBLOG( LOG_OBJECTS )
471 4 : << "Syncing " << lunchbox::className( object ) << " with id " << id
472 12 : << std::endl;
473 4 : LBASSERT( object );
474 4 : LBASSERTINFO( id.isUUID(), id );
475 :
476 4 : if( !object || !id.isUUID( ))
477 : {
478 0 : LBWARN << "Invalid object " << object << " or id " << id << std::endl;
479 0 : return LB_UNDEFINED_UINT32;
480 : }
481 :
482 4 : if( !master )
483 0 : master = _localNode->connectObjectMaster( id );
484 :
485 4 : if( !master || !master->isReachable( ))
486 : {
487 0 : LBWARN << "Mapping of object " << id << " failed, invalid master node"
488 0 : << std::endl;
489 0 : return LB_UNDEFINED_UINT32;
490 : }
491 :
492 : const uint32_t request =
493 4 : _localNode->registerRequest( new ObjectDataIStream );
494 4 : uint128_t minCachedVersion = VERSION_HEAD;
495 4 : uint128_t maxCachedVersion = VERSION_NONE;
496 4 : uint32_t cacheInstanceID = 0;
497 :
498 : bool useCache = _checkInstanceCache( id, minCachedVersion, maxCachedVersion,
499 4 : cacheInstanceID );
500 4 : if( useCache )
501 : {
502 0 : switch( instanceID )
503 : {
504 : case CO_INSTANCE_ALL:
505 0 : break;
506 : default:
507 0 : if( instanceID == cacheInstanceID )
508 0 : break;
509 :
510 0 : useCache = false;
511 0 : LBCHECK( _instanceCache->release( id, 1 ));
512 0 : break;
513 : }
514 : }
515 :
516 : // Use stream expected by MasterCMCommand
517 : master->send( CMD_NODE_SYNC_OBJECT )
518 8 : << VERSION_NEWEST << minCachedVersion << maxCachedVersion << id
519 12 : << uint64_t(0) /* maxVersions */ << request << instanceID
520 4 : << cacheInstanceID << useCache;
521 4 : return request;
522 : }
523 :
524 4 : bool ObjectStore::_finishSync( const uint32_t requestID, Object* object )
525 : {
526 4 : if( requestID == LB_UNDEFINED_UINT32 )
527 0 : return false;
528 :
529 4 : void* data = _localNode->getRequestData( requestID );
530 4 : if( data == 0 )
531 0 : return false;
532 :
533 4 : ObjectDataIStream* is = LBSAFECAST( ObjectDataIStream*, data );
534 :
535 4 : bool ok = false;
536 4 : _localNode->waitRequest( requestID, ok );
537 :
538 4 : if( !ok )
539 : {
540 0 : LBWARN << "Object synchronization failed" << std::endl;
541 0 : delete is;
542 0 : return false;
543 : }
544 :
545 4 : is->waitReady();
546 4 : object->applyInstanceData( *is );
547 8 : LBLOG( LOG_OBJECTS ) << "Synced " << lunchbox::className( object )
548 12 : << std::endl;
549 4 : delete is;
550 4 : return true;
551 : }
552 :
553 42 : void ObjectStore::unmap( Object* object )
554 : {
555 42 : LBASSERT( object );
556 42 : if( !object->isAttached( )) // not registered
557 35 : return;
558 :
559 41 : const uint128_t& id = object->getID();
560 :
561 41 : LBLOG( LOG_OBJECTS ) << "Unmap " << object << std::endl;
562 :
563 41 : object->notifyDetach();
564 :
565 : // send unsubscribe to master, master will send detach command.
566 41 : LBASSERT( !object->isMaster( ));
567 41 : LB_TS_NOT_THREAD( _commandThread );
568 :
569 41 : const uint32_t masterInstanceID = object->getMasterInstanceID();
570 41 : if( masterInstanceID != CO_INSTANCE_INVALID )
571 : {
572 33 : NodePtr master = object->getMasterNode();
573 33 : LBASSERT( master )
574 :
575 33 : if( master && master->isReachable( ))
576 : {
577 : lunchbox::Request< void > request =
578 33 : _localNode->registerRequest< void >();
579 : master->send( CMD_NODE_UNSUBSCRIBE_OBJECT )
580 33 : << id << request << masterInstanceID << object->getInstanceID();
581 33 : request.wait();
582 33 : object->notifyDetached();
583 33 : return;
584 : }
585 0 : LBERROR << "Master node for object id " << id << " not connected"
586 0 : << std::endl;
587 : }
588 :
589 : // no unsubscribe sent: Detach directly
590 8 : detach( object );
591 8 : object->setupChangeManager( Object::NONE, false, 0, CO_INSTANCE_INVALID );
592 8 : object->notifyDetached();
593 : }
594 :
595 21 : bool ObjectStore::register_( Object* object )
596 : {
597 21 : LBASSERT( object );
598 21 : LBASSERT( !object->isAttached( ));
599 :
600 21 : const uint128_t& id = object->getID( );
601 21 : LBASSERTINFO( id.isUUID(), id );
602 :
603 21 : object->notifyAttach();
604 21 : object->setupChangeManager( object->getChangeType(), true, _localNode,
605 42 : CO_INSTANCE_INVALID );
606 21 : attach( object, id, CO_INSTANCE_INVALID );
607 :
608 21 : if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0 )
609 21 : _localNode->send( CMD_NODE_REGISTER_OBJECT ) << object;
610 :
611 21 : object->notifyAttached();
612 :
613 21 : LBLOG( LOG_OBJECTS ) << "Registered " << object << std::endl;
614 21 : return true;
615 : }
616 :
617 21 : void ObjectStore::deregister( Object* object )
618 : {
619 21 : LBASSERT( object );
620 21 : if( !object->isAttached( )) // not registered
621 21 : return;
622 :
623 21 : LBLOG( LOG_OBJECTS ) << "Deregister " << *object << std::endl;
624 21 : LBASSERT( object->isMaster( ));
625 :
626 21 : object->notifyDetach();
627 :
628 21 : if( Global::getIAttribute( Global::IATTR_NODE_SEND_QUEUE_SIZE ) > 0 )
629 : {
630 : // remove from send queue
631 : lunchbox::Request< void > request =
632 21 : _localNode->registerRequest< void >();
633 21 : _localNode->send( CMD_NODE_DEREGISTER_OBJECT ) << request;
634 : }
635 :
636 21 : const uint128_t id = object->getID();
637 21 : detach( object );
638 21 : object->setupChangeManager( Object::NONE, true, 0, CO_INSTANCE_INVALID );
639 21 : if( _instanceCache )
640 21 : _instanceCache->erase( id );
641 21 : object->notifyDetached();
642 : }
643 :
644 51606 : bool ObjectStore::notifyCommandThreadIdle()
645 : {
646 51606 : LB_TS_THREAD( _commandThread );
647 51606 : if( _sendQueue.empty( ))
648 51606 : return false;
649 :
650 0 : LBASSERT( _sendOnRegister > 0 );
651 0 : SendQueueItem& item = _sendQueue.front();
652 :
653 0 : if( item.age > _localNode->getTime64( ))
654 : {
655 0 : Nodes nodes;
656 0 : _localNode->getNodes( nodes, false );
657 0 : if( nodes.empty( ))
658 : {
659 0 : lunchbox::Thread::yield();
660 0 : return !_sendQueue.empty();
661 : }
662 :
663 0 : item.object->sendInstanceData( nodes );
664 : }
665 0 : _sendQueue.pop_front();
666 0 : return !_sendQueue.empty();
667 : }
668 :
669 8 : void ObjectStore::removeNode( NodePtr node )
670 : {
671 8 : lunchbox::Request< void > request = _localNode->registerRequest< void >();
672 8 : _localNode->send( CMD_NODE_REMOVE_NODE ) << node.get() << request;
673 8 : }
674 :
675 : //===========================================================================
676 : // ICommand handling
677 : //===========================================================================
678 547 : bool ObjectStore::dispatchObjectCommand( ICommand& cmd )
679 : {
680 547 : LB_TS_THREAD( _receiverThread );
681 547 : ObjectICommand command( cmd );
682 547 : const uint128_t& id = command.getObjectID();
683 547 : const uint32_t instanceID = command.getInstanceID();
684 :
685 547 : ObjectsHash::const_iterator i = _objects->find( id );
686 :
687 547 : if( i == _objects->end( ))
688 : // When the instance ID is set to none, we only care about the command
689 : // when we have an object of the given ID (multicast)
690 0 : return ( instanceID == CO_INSTANCE_NONE );
691 :
692 547 : const Objects& objects = i->second;
693 547 : LBASSERTINFO( !objects.empty(), command );
694 :
695 547 : if( instanceID <= CO_INSTANCE_MAX )
696 : {
697 60 : for( Objects::const_iterator j = objects.begin(); j!=objects.end(); ++j)
698 : {
699 60 : Object* object = *j;
700 60 : if( instanceID == object->getInstanceID( ))
701 : {
702 54 : LBCHECK( object->dispatchCommand( command ));
703 54 : return true;
704 : }
705 : }
706 0 : LBERROR << "Can't find object instance " << instanceID << " for "
707 0 : << command << std::endl;
708 0 : LBUNREACHABLE;
709 0 : return false;
710 : }
711 :
712 493 : Objects::const_iterator j = objects.begin();
713 493 : Object* object = *j;
714 493 : LBCHECK( object->dispatchCommand( command ));
715 :
716 493 : for( ++j; j != objects.end(); ++j )
717 : {
718 0 : object = *j;
719 0 : LBCHECK( object->dispatchCommand( command ));
720 : }
721 493 : return true;
722 : }
723 :
724 40 : bool ObjectStore::_cmdFindMasterNodeID( ICommand& command )
725 : {
726 40 : LB_TS_THREAD( _commandThread );
727 :
728 40 : const uint128_t& id = command.get< uint128_t >();
729 40 : const uint32_t requestID = command.get< uint32_t >();
730 40 : LBASSERT( id.isUUID( ));
731 :
732 40 : NodeID masterNodeID;
733 : {
734 40 : lunchbox::ScopedFastRead mutex( _objects );
735 40 : ObjectsHashCIter i = _objects->find( id );
736 :
737 40 : if( i != _objects->end( ))
738 : {
739 40 : const Objects& objects = i->second;
740 40 : LBASSERT( !objects.empty( ));
741 :
742 40 : for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
743 : {
744 40 : Object* object = *j;
745 40 : if( object->isMaster( ))
746 40 : masterNodeID = _localNode->getNodeID();
747 : else
748 : {
749 0 : NodePtr master = object->getMasterNode();
750 0 : if( master.isValid( ))
751 0 : masterNodeID = master->getNodeID();
752 : }
753 40 : if( masterNodeID != 0 )
754 40 : break;
755 : }
756 40 : }
757 : }
758 :
759 40 : LBLOG( LOG_OBJECTS ) << "Object " << id << " master " << masterNodeID
760 40 : << " req " << requestID << std::endl;
761 : command.getNode()->send( CMD_NODE_FIND_MASTER_NODE_ID_REPLY )
762 40 : << masterNodeID << requestID;
763 40 : return true;
764 : }
765 :
766 40 : bool ObjectStore::_cmdFindMasterNodeIDReply( ICommand& command )
767 : {
768 40 : const NodeID& masterNodeID = command.get< NodeID >();
769 40 : const uint32_t requestID = command.get< uint32_t >();
770 40 : _localNode->serveRequest( requestID, masterNodeID );
771 40 : return true;
772 : }
773 :
774 21 : bool ObjectStore::_cmdAttach( ICommand& command )
775 : {
776 21 : LB_TS_THREAD( _receiverThread );
777 21 : LBLOG( LOG_OBJECTS ) << "Cmd attach object " << command << std::endl;
778 :
779 21 : const uint128_t& objectID = command.get< uint128_t >();
780 21 : const uint32_t instanceID = command.get< uint32_t >();
781 21 : const uint32_t requestID = command.get< uint32_t >();
782 :
783 : Object* object = static_cast< Object* >( _localNode->getRequestData(
784 21 : requestID ));
785 21 : _attach( object, objectID, instanceID );
786 21 : _localNode->serveRequest( requestID );
787 21 : return true;
788 : }
789 :
790 62 : bool ObjectStore::_cmdDetach( ICommand& command )
791 : {
792 62 : LB_TS_THREAD( _receiverThread );
793 62 : LBLOG( LOG_OBJECTS ) << "Cmd detach object " << command << std::endl;
794 :
795 62 : const uint128_t& objectID = command.get< uint128_t >();
796 62 : const uint32_t instanceID = command.get< uint32_t >();
797 62 : const uint32_t requestID = command.get< uint32_t >();
798 :
799 62 : ObjectsHash::const_iterator i = _objects->find( objectID );
800 62 : if( i != _objects->end( ))
801 : {
802 62 : const Objects& objects = i->second;
803 :
804 189 : for( Objects::const_iterator j = objects.begin();
805 126 : j != objects.end(); ++j )
806 : {
807 63 : Object* object = *j;
808 63 : if( object->getInstanceID() == instanceID )
809 : {
810 62 : _detach( object );
811 62 : break;
812 : }
813 : }
814 : }
815 :
816 62 : LBASSERT( requestID != LB_UNDEFINED_UINT32 );
817 62 : _localNode->serveRequest( requestID );
818 62 : return true;
819 : }
820 :
821 21 : bool ObjectStore::_cmdRegister( ICommand& command )
822 : {
823 21 : LB_TS_THREAD( _commandThread );
824 21 : if( _sendOnRegister <= 0 )
825 21 : return true;
826 :
827 0 : LBLOG( LOG_OBJECTS ) << "Cmd register object " << command << std::endl;
828 :
829 0 : Object* object = reinterpret_cast< Object* >( command.get< void* >( ));
830 :
831 : const int32_t age = Global::getIAttribute(
832 0 : Global::IATTR_NODE_SEND_QUEUE_AGE );
833 0 : SendQueueItem item;
834 0 : item.age = age ? age + _localNode->getTime64() :
835 0 : std::numeric_limits< int64_t >::max();
836 0 : item.object = object;
837 0 : _sendQueue.push_back( item );
838 :
839 : const uint32_t size = Global::getIAttribute(
840 0 : Global::IATTR_NODE_SEND_QUEUE_SIZE );
841 0 : while( _sendQueue.size() > size )
842 0 : _sendQueue.pop_front();
843 :
844 0 : return true;
845 : }
846 :
847 21 : bool ObjectStore::_cmdDeregister( ICommand& command )
848 : {
849 21 : LB_TS_THREAD( _commandThread );
850 21 : LBLOG( LOG_OBJECTS ) << "Cmd deregister object " << command << std::endl;
851 :
852 21 : const uint32_t requestID = command.get< uint32_t >();
853 :
854 21 : const void* object = _localNode->getRequestData( requestID );
855 :
856 21 : for( SendQueueIter i = _sendQueue.begin(); i != _sendQueue.end(); ++i )
857 : {
858 0 : if( i->object == object )
859 : {
860 0 : _sendQueue.erase( i );
861 0 : break;
862 : }
863 : }
864 :
865 21 : _localNode->serveRequest( requestID );
866 21 : return true;
867 : }
868 :
869 42 : bool ObjectStore::_cmdMap( ICommand& cmd )
870 : {
871 42 : LB_TS_THREAD( _commandThread );
872 :
873 42 : MasterCMCommand command( cmd );
874 42 : const uint128_t& id = command.getObjectID();
875 :
876 42 : LBLOG( LOG_OBJECTS ) << "Cmd map object " << command << " id " << id << "."
877 0 : << command.getInstanceID() << " req "
878 42 : << command.getRequestID() << std::endl;
879 :
880 84 : ObjectCMPtr masterCM;
881 : {
882 42 : lunchbox::ScopedFastRead mutex( _objects );
883 42 : ObjectsHash::const_iterator i = _objects->find( id );
884 42 : if( i != _objects->end( ))
885 : {
886 42 : const Objects& objects = i->second;
887 :
888 42 : for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
889 : {
890 42 : Object* object = *j;
891 42 : if( object->isMaster( ))
892 : {
893 42 : masterCM = object->_getChangeManager();
894 42 : break;
895 : }
896 : }
897 42 : }
898 : }
899 :
900 42 : if( !masterCM || !masterCM->addSlave( command ))
901 : {
902 0 : LBWARN << "Can't find master object to map " << id << std::endl;
903 0 : NodePtr node = command.getNode();
904 : node->send( CMD_NODE_MAP_OBJECT_REPLY )
905 0 : << node->getNodeID() << id << command.getRequestedVersion()
906 0 : << command.getRequestID() << false << command.useCache() << false;
907 : }
908 :
909 42 : ++_counters[ LocalNode::COUNTER_MAP_OBJECT_REMOTE ];
910 84 : return true;
911 : }
912 :
913 42 : bool ObjectStore::_cmdMapSuccess( ICommand& command )
914 : {
915 42 : LB_TS_THREAD( _receiverThread );
916 :
917 42 : const uint128_t& nodeID = command.get< uint128_t >();
918 42 : const uint128_t& objectID = command.get< uint128_t >();
919 42 : const uint32_t requestID = command.get< uint32_t >();
920 42 : const uint32_t instanceID = command.get< uint32_t >();
921 42 : const Object::ChangeType changeType = command.get< Object::ChangeType >();
922 42 : const uint32_t masterInstanceID = command.get< uint32_t >();
923 :
924 : // Map success commands are potentially multicasted (see above)
925 : // verify that we are the intended receiver
926 42 : if( nodeID != _localNode->getNodeID( ))
927 0 : return true;
928 :
929 42 : LBLOG( LOG_OBJECTS ) << "Cmd map object success " << command
930 0 : << " id " << objectID << "." << instanceID
931 42 : << " req " << requestID << std::endl;
932 :
933 : // set up change manager and attach object to dispatch table
934 : Object* object = static_cast< Object* >(
935 42 : _localNode->getRequestData( requestID ));
936 42 : LBASSERT( object );
937 42 : LBASSERT( !object->isMaster( ));
938 :
939 : object->setupChangeManager( Object::ChangeType( changeType ), false,
940 42 : _localNode, masterInstanceID );
941 42 : _attach( object, objectID, instanceID );
942 42 : return true;
943 : }
944 :
945 42 : bool ObjectStore::_cmdMapReply( ICommand& command )
946 : {
947 42 : LB_TS_THREAD( _receiverThread );
948 :
949 : // Map reply commands are potentially multicasted (see above)
950 : // verify that we are the intended receiver
951 42 : if( command.get< uint128_t >() != _localNode->getNodeID( ))
952 0 : return true;
953 :
954 42 : const uint128_t& objectID = command.get< uint128_t >();
955 42 : const uint128_t& version = command.get< uint128_t >();
956 42 : const uint32_t requestID = command.get< uint32_t >();
957 42 : const bool result = command.get< bool >();
958 42 : const bool releaseCache = command.get< bool >();
959 42 : const bool useCache = command.get< bool >();
960 :
961 42 : LBLOG( LOG_OBJECTS ) << "Cmd map object reply " << command << " id "
962 42 : << objectID << " req " << requestID << std::endl;
963 :
964 42 : LBASSERT( _localNode->getRequestData( requestID ));
965 :
966 42 : if( result )
967 : {
968 : Object* object = static_cast<Object*>(
969 42 : _localNode->getRequestData( requestID ));
970 42 : LBASSERT( object );
971 42 : LBASSERT( !object->isMaster( ));
972 :
973 42 : object->setMasterNode( command.getNode( ));
974 :
975 42 : if( useCache )
976 : {
977 0 : LBASSERT( releaseCache );
978 0 : LBASSERT( _instanceCache );
979 :
980 0 : const uint128_t& id = objectID;
981 0 : const InstanceCache::Data& cached = (*_instanceCache)[ id ];
982 0 : LBASSERT( cached != InstanceCache::Data::NONE );
983 0 : LBASSERT( !cached.versions.empty( ));
984 :
985 0 : object->addInstanceDatas( cached.versions, version );
986 0 : LBCHECK( _instanceCache->release( id, 2 ));
987 : }
988 42 : else if( releaseCache )
989 : {
990 0 : LBCHECK( _instanceCache->release( objectID, 1 ));
991 : }
992 : }
993 : else
994 : {
995 0 : if( releaseCache )
996 0 : _instanceCache->release( objectID, 1 );
997 :
998 0 : LBWARN << "Could not map object " << objectID << std::endl;
999 : }
1000 :
1001 42 : _localNode->serveRequest( requestID, version );
1002 42 : return true;
1003 : }
1004 :
1005 33 : bool ObjectStore::_cmdUnsubscribe( ICommand& command )
1006 : {
1007 33 : LB_TS_THREAD( _commandThread );
1008 33 : LBLOG( LOG_OBJECTS ) << "Cmd unsubscribe object " << command << std::endl;
1009 :
1010 33 : const uint128_t& id = command.get< uint128_t >();
1011 33 : const uint32_t requestID = command.get< uint32_t >();
1012 33 : const uint32_t masterInstanceID = command.get< uint32_t >();
1013 33 : const uint32_t slaveInstanceID = command.get< uint32_t >();
1014 :
1015 33 : NodePtr node = command.getNode();
1016 :
1017 : {
1018 33 : lunchbox::ScopedFastWrite mutex( _objects );
1019 33 : ObjectsHash::const_iterator i = _objects->find( id );
1020 33 : if( i != _objects->end( ))
1021 : {
1022 33 : const Objects& objects = i->second;
1023 33 : for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
1024 : {
1025 33 : Object* object = *j;
1026 66 : if( object->isMaster() &&
1027 33 : object->getInstanceID() == masterInstanceID )
1028 : {
1029 33 : object->removeSlave( node, slaveInstanceID );
1030 33 : break;
1031 : }
1032 : }
1033 33 : }
1034 : }
1035 :
1036 33 : node->send( CMD_NODE_DETACH_OBJECT ) << id << slaveInstanceID << requestID;
1037 33 : return true;
1038 : }
1039 :
1040 1 : bool ObjectStore::_cmdUnmap( ICommand& command )
1041 : {
1042 1 : LB_TS_THREAD( _receiverThread );
1043 1 : LBLOG( LOG_OBJECTS ) << "Cmd unmap object " << command << std::endl;
1044 :
1045 1 : const uint128_t& objectID = command.get< uint128_t >();
1046 :
1047 1 : if( _instanceCache )
1048 1 : _instanceCache->erase( objectID );
1049 :
1050 1 : ObjectsHash::iterator i = _objects->find( objectID );
1051 1 : if( i == _objects->end( )) // nothing to do
1052 0 : return true;
1053 :
1054 1 : const Objects objects = i->second;
1055 : {
1056 1 : lunchbox::ScopedFastWrite mutex( _objects );
1057 1 : _objects->erase( i );
1058 : }
1059 :
1060 2 : for( Objects::const_iterator j = objects.begin(); j != objects.end(); ++j )
1061 : {
1062 1 : Object* object = *j;
1063 1 : object->detach();
1064 : }
1065 :
1066 1 : return true;
1067 : }
1068 :
1069 4 : bool ObjectStore::_cmdSync( ICommand& cmd )
1070 : {
1071 4 : LB_TS_THREAD( _commandThread );
1072 4 : MasterCMCommand command( cmd );
1073 4 : const uint128_t& id = command.getObjectID();
1074 4 : LBINFO << command.getNode() << std::endl;
1075 :
1076 4 : LBLOG( LOG_OBJECTS ) << "Cmd sync object id " << id << "."
1077 0 : << command.getInstanceID() << " req "
1078 4 : << command.getRequestID() << std::endl;
1079 :
1080 4 : const uint32_t cacheInstanceID = command.getMasterInstanceID();
1081 8 : ObjectCMPtr cm;
1082 : {
1083 4 : lunchbox::ScopedFastRead mutex( _objects );
1084 4 : ObjectsHash::const_iterator i = _objects->find( id );
1085 4 : if( i != _objects->end( ))
1086 : {
1087 4 : const Objects& objects = i->second;
1088 :
1089 8 : for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
1090 : {
1091 4 : Object* object = *j;
1092 4 : if( command.getInstanceID() == object->getInstanceID( ))
1093 : {
1094 0 : cm = object->_getChangeManager();
1095 0 : LBASSERT( cm );
1096 0 : break;
1097 : }
1098 :
1099 4 : if( command.getInstanceID() != CO_INSTANCE_ALL )
1100 0 : continue;
1101 :
1102 4 : cm = object->_getChangeManager();
1103 4 : LBASSERT( cm );
1104 4 : if( cacheInstanceID == object->getInstanceID( ))
1105 0 : break;
1106 : }
1107 4 : if( !cm )
1108 0 : LBWARN << "Can't find object to sync " << id << "."
1109 0 : << command.getInstanceID() << " in " << objects.size()
1110 0 : << " instances" << std::endl;
1111 : }
1112 4 : if( !cm )
1113 0 : LBWARN << "Can't find object to sync " << id
1114 0 : << ", no object with identifier" << std::endl;
1115 : }
1116 4 : if( !cm || !cm->sendSync( command ))
1117 : {
1118 0 : NodePtr node = command.getNode();
1119 : node->send( CMD_NODE_SYNC_OBJECT_REPLY )
1120 0 : << node->getNodeID() << id << command.getRequestID()
1121 0 : << false << command.useCache() << false;
1122 : }
1123 8 : return true;
1124 : }
1125 :
1126 4 : bool ObjectStore::_cmdSyncReply( ICommand& command )
1127 : {
1128 4 : LB_TS_THREAD( _receiverThread );
1129 :
1130 : // Sync reply commands are potentially multicasted (see above)
1131 : // verify that we are the intended receiver
1132 4 : if( command.get< uint128_t >() != _localNode->getNodeID( ))
1133 0 : return true;
1134 :
1135 4 : const NodeID& id = command.get< NodeID >();
1136 4 : const uint32_t requestID = command.get< uint32_t >();
1137 4 : const bool result = command.get< bool >();
1138 4 : const bool releaseCache = command.get< bool >();
1139 4 : const bool useCache = command.get< bool >();
1140 4 : void* const data = _localNode->getRequestData( requestID );
1141 4 : ObjectDataIStream* const is = LBSAFECAST( ObjectDataIStream*, data );
1142 :
1143 4 : LBLOG( LOG_OBJECTS ) << "Cmd sync object reply " << command << " req "
1144 4 : << requestID << std::endl;
1145 4 : if( result )
1146 : {
1147 4 : if( useCache )
1148 : {
1149 0 : LBASSERT( releaseCache );
1150 0 : LBASSERT( _instanceCache );
1151 :
1152 0 : const InstanceCache::Data& cached = (*_instanceCache)[ id ];
1153 0 : LBASSERT( cached != InstanceCache::Data::NONE );
1154 0 : LBASSERT( !cached.versions.empty( ));
1155 :
1156 0 : *is = *cached.versions.back();
1157 0 : LBCHECK( _instanceCache->release( id, 2 ));
1158 : }
1159 4 : else if( releaseCache )
1160 : {
1161 0 : LBCHECK( _instanceCache->release( id, 1 ));
1162 : }
1163 : }
1164 : else
1165 : {
1166 0 : if( releaseCache )
1167 0 : _instanceCache->release( id, 1 );
1168 :
1169 0 : LBWARN << "Could not sync object " << id << " request " << requestID
1170 0 : << std::endl;
1171 : }
1172 :
1173 4 : _localNode->serveRequest( requestID, result );
1174 4 : return true;
1175 : }
1176 :
1177 57 : bool ObjectStore::_cmdInstance( ICommand& inCommand )
1178 : {
1179 57 : LB_TS_THREAD( _receiverThread );
1180 57 : LBASSERT( _localNode );
1181 :
1182 57 : ObjectDataICommand command( inCommand );
1183 57 : const NodeID& nodeID = command.get< NodeID >();
1184 57 : const uint32_t masterInstanceID = command.get< uint32_t >();
1185 57 : const uint32_t cmd = command.getCommand();
1186 :
1187 57 : LBLOG( LOG_OBJECTS ) << "Cmd instance " << command << " master "
1188 57 : << masterInstanceID << " node " << nodeID << std::endl;
1189 :
1190 57 : command.setType( COMMANDTYPE_OBJECT );
1191 57 : command.setCommand( CMD_OBJECT_INSTANCE );
1192 :
1193 57 : const uint128_t& version = command.getVersion();
1194 57 : if( _instanceCache && version.high() == 0 )
1195 : {
1196 57 : const ObjectVersion rev( command.getObjectID(), version );
1197 : #ifndef CO_AGGRESSIVE_CACHING // Issue Equalizer#82:
1198 57 : if( cmd != CMD_NODE_OBJECT_INSTANCE_PUSH )
1199 : #endif
1200 50 : _instanceCache->add( rev, masterInstanceID, command, 0 );
1201 : }
1202 :
1203 57 : switch( cmd )
1204 : {
1205 : case CMD_NODE_OBJECT_INSTANCE:
1206 0 : LBASSERT( nodeID == 0 );
1207 0 : LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
1208 0 : return true;
1209 :
1210 : case CMD_NODE_OBJECT_INSTANCE_MAP:
1211 38 : if( nodeID != _localNode->getNodeID( )) // not for me
1212 0 : return true;
1213 :
1214 38 : LBASSERT( command.getInstanceID() <= CO_INSTANCE_MAX );
1215 38 : return dispatchObjectCommand( command );
1216 :
1217 : case CMD_NODE_OBJECT_INSTANCE_COMMIT:
1218 4 : LBASSERT( nodeID == 0 );
1219 4 : LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
1220 4 : return dispatchObjectCommand( command );
1221 :
1222 : case CMD_NODE_OBJECT_INSTANCE_PUSH:
1223 7 : LBASSERT( nodeID == 0 );
1224 7 : LBASSERT( command.getInstanceID() == CO_INSTANCE_NONE );
1225 7 : _pushData.addDataCommand( command.getObjectID(), command );
1226 7 : return true;
1227 :
1228 : case CMD_NODE_OBJECT_INSTANCE_SYNC:
1229 : {
1230 8 : if( nodeID != _localNode->getNodeID( )) // not for me
1231 0 : return true;
1232 :
1233 8 : void* data = _localNode->getRequestData( command.getInstanceID( ));
1234 8 : LBASSERT( command.getInstanceID() != CO_INSTANCE_NONE );
1235 8 : LBASSERTINFO( data, this );
1236 :
1237 8 : ObjectDataIStream* is = LBSAFECAST( ObjectDataIStream*, data );
1238 8 : is->addDataCommand( command );
1239 8 : return true;
1240 : }
1241 :
1242 : default:
1243 0 : LBUNREACHABLE;
1244 0 : return false;
1245 57 : }
1246 : }
1247 :
1248 0 : bool ObjectStore::_cmdDisableSendOnRegister( ICommand& command )
1249 : {
1250 0 : LB_TS_THREAD( _commandThread );
1251 0 : LBASSERTINFO( _sendOnRegister > 0, _sendOnRegister );
1252 :
1253 0 : if( --_sendOnRegister == 0 )
1254 : {
1255 0 : _sendQueue.clear();
1256 :
1257 0 : Nodes nodes;
1258 0 : _localNode->getNodes( nodes, false );
1259 0 : for( NodesCIter i = nodes.begin(); i != nodes.end(); ++i )
1260 : {
1261 0 : NodePtr node = *i;
1262 0 : ConnectionPtr multicast = node->getConnection( true );
1263 0 : ConnectionPtr connection = node->getConnection( false );
1264 0 : if( multicast )
1265 0 : multicast->finish();
1266 0 : if( connection && connection != multicast )
1267 0 : connection->finish();
1268 0 : }
1269 : }
1270 :
1271 0 : const uint32_t requestID = command.get< uint32_t >();
1272 0 : _localNode->serveRequest( requestID );
1273 0 : return true;
1274 : }
1275 :
1276 42 : bool ObjectStore::_cmdRemoveNode( ICommand& command )
1277 : {
1278 42 : LB_TS_THREAD( _commandThread );
1279 42 : LBLOG( LOG_OBJECTS ) << "Cmd object " << command << std::endl;
1280 :
1281 42 : Node* node = command.get< Node* >();
1282 42 : const uint32_t requestID = command.get< uint32_t >();
1283 :
1284 42 : lunchbox::ScopedFastWrite mutex( _objects );
1285 67 : for( ObjectsHashCIter i = _objects->begin(); i != _objects->end(); ++i )
1286 : {
1287 25 : const Objects& objects = i->second;
1288 50 : for( ObjectsCIter j = objects.begin(); j != objects.end(); ++j )
1289 25 : (*j)->removeSlaves( node );
1290 : }
1291 :
1292 42 : if( requestID != LB_UNDEFINED_UINT32 )
1293 8 : _localNode->serveRequest( requestID );
1294 : else
1295 34 : node->unref(); // node was ref'd before LocalNode::_handleDisconnect()
1296 :
1297 42 : return true;
1298 : }
1299 :
1300 5 : bool ObjectStore::_cmdPush( ICommand& command )
1301 : {
1302 5 : LB_TS_THREAD( _commandThread );
1303 :
1304 5 : const uint128_t& objectID = command.get< uint128_t >();
1305 5 : const uint128_t& groupID = command.get< uint128_t >();
1306 5 : const uint128_t& typeID = command.get< uint128_t >();
1307 :
1308 5 : ObjectDataIStream* is = _pushData.pull( objectID );
1309 :
1310 5 : _localNode->objectPush( groupID, typeID, objectID, *is );
1311 5 : _pushData.recycle( is );
1312 5 : return true;
1313 : }
1314 :
1315 0 : std::ostream& operator << ( std::ostream& os, ObjectStore* objectStore )
1316 : {
1317 0 : if( !objectStore )
1318 : {
1319 0 : os << "NULL objectStore";
1320 0 : return os;
1321 : }
1322 :
1323 0 : os << "objectStore (" << (void*)objectStore << ")";
1324 :
1325 0 : return os;
1326 : }
1327 66 : }
|