Line data Source code
1 :
2 : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Cedric Stalder <cedric.stalder@gmail.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "objectStore.h"
23 :
24 : #include "connection.h"
25 : #include "connectionDescription.h"
26 : #include "global.h"
27 : #include "instanceCache.h"
28 : #include "log.h"
29 : #include "masterCMCommand.h"
30 : #include "nodeCommand.h"
31 : #include "oCommand.h"
32 : #include "objectCM.h"
33 : #include "objectCommand.h"
34 : #include "objectDataICommand.h"
35 : #include "objectDataIStream.h"
36 :
37 : #include <lunchbox/futureFunction.h>
38 : #include <lunchbox/scopedMutex.h>
39 :
40 : #include <boost/bind.hpp>
41 :
42 : #include <limits>
43 :
44 : //#define DEBUG_DISPATCH
45 : #ifdef DEBUG_DISPATCH
46 : #include <set>
47 : #endif
48 :
49 : namespace co
50 : {
51 : typedef CommandFunc<ObjectStore> CmdFunc;
52 : typedef lunchbox::FutureFunction<bool> FuturebImpl;
53 :
54 53 : ObjectStore::ObjectStore(LocalNode* localNode, a_ssize_t* counters)
55 : : _localNode(localNode)
56 : , _instanceIDs(-0x7FFFFFFF)
57 : , _instanceCache(new InstanceCache(
58 53 : Global::getIAttribute(Global::IATTR_INSTANCE_CACHE_SIZE) * LB_1MB))
59 106 : , _counters(counters)
60 : {
61 53 : LBASSERT(localNode);
62 53 : CommandQueue* queue = localNode->getCommandThreadQueue();
63 :
64 : localNode->_registerCommand(CMD_NODE_FIND_MASTER_NODE_ID,
65 106 : CmdFunc(this,
66 : &ObjectStore::_cmdFindMasterNodeID),
67 53 : queue);
68 : localNode->_registerCommand(
69 : CMD_NODE_FIND_MASTER_NODE_ID_REPLY,
70 53 : CmdFunc(this, &ObjectStore::_cmdFindMasterNodeIDReply), 0);
71 : localNode->_registerCommand(CMD_NODE_ATTACH_OBJECT,
72 53 : CmdFunc(this, &ObjectStore::_cmdAttach), 0);
73 : localNode->_registerCommand(CMD_NODE_DETACH_OBJECT,
74 53 : CmdFunc(this, &ObjectStore::_cmdDetach), 0);
75 : localNode->_registerCommand(CMD_NODE_REGISTER_OBJECT,
76 106 : CmdFunc(this, &ObjectStore::_cmdRegister),
77 53 : queue);
78 : localNode->_registerCommand(CMD_NODE_DEREGISTER_OBJECT,
79 106 : CmdFunc(this, &ObjectStore::_cmdDeregister),
80 53 : queue);
81 : localNode->_registerCommand(CMD_NODE_MAP_OBJECT,
82 53 : CmdFunc(this, &ObjectStore::_cmdMap), queue);
83 : localNode->_registerCommand(CMD_NODE_MAP_OBJECT_SUCCESS,
84 53 : CmdFunc(this, &ObjectStore::_cmdMapSuccess), 0);
85 : localNode->_registerCommand(CMD_NODE_MAP_OBJECT_REPLY,
86 53 : CmdFunc(this, &ObjectStore::_cmdMapReply), 0);
87 : localNode->_registerCommand(CMD_NODE_UNMAP_OBJECT,
88 53 : CmdFunc(this, &ObjectStore::_cmdUnmap), 0);
89 : localNode->_registerCommand(CMD_NODE_UNSUBSCRIBE_OBJECT,
90 106 : CmdFunc(this, &ObjectStore::_cmdUnsubscribe),
91 53 : queue);
92 : localNode->_registerCommand(CMD_NODE_OBJECT_INSTANCE,
93 53 : CmdFunc(this, &ObjectStore::_cmdInstance), 0);
94 : localNode->_registerCommand(CMD_NODE_OBJECT_INSTANCE_MAP,
95 53 : CmdFunc(this, &ObjectStore::_cmdInstance), 0);
96 : localNode->_registerCommand(CMD_NODE_OBJECT_INSTANCE_COMMIT,
97 53 : CmdFunc(this, &ObjectStore::_cmdInstance), 0);
98 : localNode->_registerCommand(CMD_NODE_OBJECT_INSTANCE_PUSH,
99 53 : CmdFunc(this, &ObjectStore::_cmdInstance), 0);
100 : localNode->_registerCommand(CMD_NODE_OBJECT_INSTANCE_SYNC,
101 53 : CmdFunc(this, &ObjectStore::_cmdInstance), 0);
102 : localNode->_registerCommand(
103 : CMD_NODE_DISABLE_SEND_ON_REGISTER,
104 53 : CmdFunc(this, &ObjectStore::_cmdDisableSendOnRegister), queue);
105 : localNode->_registerCommand(CMD_NODE_REMOVE_NODE,
106 106 : CmdFunc(this, &ObjectStore::_cmdRemoveNode),
107 53 : queue);
108 : localNode->_registerCommand(CMD_NODE_OBJECT_PUSH,
109 53 : CmdFunc(this, &ObjectStore::_cmdPush), queue);
110 : localNode->_registerCommand(CMD_NODE_SYNC_OBJECT,
111 53 : CmdFunc(this, &ObjectStore::_cmdSync), queue);
112 : localNode->_registerCommand(CMD_NODE_SYNC_OBJECT_REPLY,
113 53 : CmdFunc(this, &ObjectStore::_cmdSyncReply), 0);
114 53 : }
115 :
116 153 : ObjectStore::~ObjectStore()
117 : {
118 51 : LBVERB << "Delete ObjectStore @" << (void*)this << std::endl;
119 :
120 : #ifndef NDEBUG
121 51 : if (!_objects->empty())
122 : {
123 0 : LBWARN << _objects->size() << " attached objects in destructor"
124 0 : << std::endl;
125 :
126 0 : for (ObjectsHash::const_iterator i = _objects->begin();
127 0 : i != _objects->end(); ++i)
128 : {
129 0 : const Objects& objects = i->second;
130 0 : LBWARN << " " << objects.size() << " objects with id " << i->first
131 0 : << std::endl;
132 :
133 0 : for (Objects::const_iterator j = objects.begin();
134 0 : j != objects.end(); ++j)
135 : {
136 0 : const Object* object = *j;
137 0 : LBINFO << " object type " << lunchbox::className(object)
138 0 : << std::endl;
139 : }
140 : }
141 : }
142 : // LBASSERT( _objects->empty( ))
143 : #endif
144 51 : clear();
145 51 : delete _instanceCache;
146 51 : _instanceCache = 0;
147 102 : }
148 :
149 98 : void ObjectStore::clear()
150 : {
151 98 : LBASSERT(_objects->empty());
152 98 : expireInstanceData(0);
153 98 : LBASSERT(!_instanceCache || _instanceCache->isEmpty());
154 :
155 98 : _objects->clear();
156 98 : _sendQueue.clear();
157 98 : }
158 :
159 0 : void ObjectStore::disableInstanceCache()
160 : {
161 0 : LBASSERT(_localNode->isClosed());
162 0 : delete _instanceCache;
163 0 : _instanceCache = 0;
164 0 : }
165 :
166 98 : void ObjectStore::expireInstanceData(const int64_t age)
167 : {
168 98 : if (_instanceCache)
169 98 : _instanceCache->expire(age);
170 98 : }
171 :
172 111 : void ObjectStore::removeInstanceData(const NodeID& nodeID)
173 : {
174 111 : if (_instanceCache)
175 111 : _instanceCache->remove(nodeID);
176 111 : }
177 :
178 0 : void ObjectStore::enableSendOnRegister()
179 : {
180 0 : ++_sendOnRegister;
181 0 : }
182 :
183 0 : void ObjectStore::disableSendOnRegister()
184 : {
185 0 : if (Global::getIAttribute(Global::IATTR_NODE_SEND_QUEUE_SIZE) > 0)
186 : {
187 0 : lunchbox::Request<void> request = _localNode->registerRequest<void>();
188 0 : _localNode->send(CMD_NODE_DISABLE_SEND_ON_REGISTER) << request;
189 : }
190 : else // OPT
191 0 : --_sendOnRegister;
192 0 : }
193 :
194 : //---------------------------------------------------------------------------
195 : // identifier master node mapping
196 : //---------------------------------------------------------------------------
197 34 : NodeID ObjectStore::findMasterNodeID(const uint128_t& identifier)
198 : {
199 34 : LB_TS_NOT_THREAD(_commandThread);
200 :
201 : // OPT: look up locally first?
202 68 : const Nodes& nodes = _localNode->getNodes();
203 :
204 : // OPT: send to multiple nodes at once?
205 34 : for (NodePtr node : nodes)
206 : {
207 : lunchbox::Request<NodeID> request =
208 34 : _localNode->registerRequest<NodeID>();
209 :
210 68 : LBLOG(LOG_OBJECTS) << "Finding " << identifier << " on " << node
211 102 : << " req " << request.getID() << std::endl;
212 34 : node->send(CMD_NODE_FIND_MASTER_NODE_ID) << identifier << request;
213 :
214 : try
215 : {
216 34 : const NodeID& masterNodeID = request.wait(Global::getTimeout());
217 :
218 34 : if (masterNodeID != 0)
219 : {
220 34 : LBLOG(LOG_OBJECTS) << "Found " << identifier << " on "
221 34 : << masterNodeID << std::endl;
222 34 : return masterNodeID;
223 : }
224 : }
225 0 : catch (const lunchbox::FutureTimeout&)
226 : {
227 0 : _localNode->unregisterRequest(request.getID());
228 0 : throw;
229 : }
230 : }
231 :
232 0 : return NodeID();
233 : }
234 :
235 : //---------------------------------------------------------------------------
236 : // object mapping
237 : //---------------------------------------------------------------------------
238 15 : void ObjectStore::attach(Object* object, const uint128_t& id,
239 : const uint32_t instanceID)
240 : {
241 15 : LBASSERT(object);
242 15 : LB_TS_NOT_THREAD(_receiverThread);
243 :
244 30 : lunchbox::Request<void> request = _localNode->registerRequest<void>(object);
245 15 : _localNode->send(CMD_NODE_ATTACH_OBJECT) << id << instanceID << request;
246 15 : }
247 :
248 : namespace
249 : {
250 51 : uint32_t _genNextID(lunchbox::a_int32_t& val)
251 : {
252 : uint32_t result;
253 0 : do
254 : {
255 51 : const long id = ++val;
256 51 : result = static_cast<uint32_t>(static_cast<int64_t>(id) + 0x7FFFFFFFu);
257 51 : } while (result > CO_INSTANCE_MAX);
258 :
259 51 : return result;
260 : }
261 : }
262 :
263 51 : void ObjectStore::_attach(Object* object, const uint128_t& id,
264 : const uint32_t inInstanceID)
265 : {
266 51 : LBASSERT(object);
267 51 : LB_TS_THREAD(_receiverThread);
268 :
269 51 : uint32_t instanceID = inInstanceID;
270 51 : if (inInstanceID == CO_INSTANCE_INVALID)
271 15 : instanceID = _genNextID(_instanceIDs);
272 :
273 51 : object->attach(id, instanceID);
274 :
275 : {
276 102 : lunchbox::ScopedFastWrite mutex(_objects);
277 51 : Objects& objects = _objects.data[id];
278 51 : LBASSERTINFO(!object->isMaster() || objects.empty(),
279 : "Attaching master "
280 : << *object << ", " << objects.size()
281 : << " attached objects with same ID, "
282 : << "first is "
283 : << (objects[0]->isMaster() ? "master " : "slave ")
284 : << *objects[0]);
285 51 : objects.push_back(object);
286 : }
287 :
288 51 : _localNode->flushCommands(); // redispatch pending commands
289 :
290 51 : LBLOG(LOG_OBJECTS) << "attached " << *object << " @"
291 51 : << static_cast<void*>(object) << std::endl;
292 51 : }
293 :
294 17 : void ObjectStore::detach(Object* object)
295 : {
296 17 : LBASSERT(object);
297 17 : LB_TS_NOT_THREAD(_receiverThread);
298 :
299 34 : lunchbox::Request<void> request = _localNode->registerRequest<void>();
300 34 : _localNode->send(CMD_NODE_DETACH_OBJECT)
301 51 : << object->getID() << object->getInstanceID() << request;
302 17 : }
303 :
304 0 : void ObjectStore::swap(Object* oldObject, Object* newObject)
305 : {
306 0 : LBASSERT(newObject);
307 0 : LBASSERT(oldObject);
308 0 : LBASSERT(oldObject->isMaster());
309 0 : LB_TS_THREAD(_receiverThread);
310 :
311 0 : if (!oldObject->isAttached())
312 0 : return;
313 :
314 0 : LBLOG(LOG_OBJECTS) << "Swap " << lunchbox::className(oldObject)
315 0 : << std::endl;
316 0 : const uint128_t& id = oldObject->getID();
317 :
318 0 : lunchbox::ScopedFastWrite mutex(_objects);
319 0 : ObjectsHash::iterator i = _objects->find(id);
320 0 : LBASSERT(i != _objects->end());
321 0 : if (i == _objects->end())
322 0 : return;
323 :
324 0 : Objects& objects = i->second;
325 0 : Objects::iterator j = find(objects.begin(), objects.end(), oldObject);
326 0 : LBASSERT(j != objects.end());
327 0 : if (j == objects.end())
328 0 : return;
329 :
330 0 : newObject->transfer(oldObject);
331 0 : *j = newObject;
332 : }
333 :
334 50 : void ObjectStore::_detach(Object* object)
335 : {
336 : // check also _cmdUnmapObject when modifying!
337 50 : LBASSERT(object);
338 50 : LB_TS_THREAD(_receiverThread);
339 :
340 50 : if (!object->isAttached())
341 0 : return;
342 :
343 50 : const uint128_t& id = object->getID();
344 :
345 50 : LBASSERT(_objects->find(id) != _objects->end());
346 50 : LBLOG(LOG_OBJECTS) << "Detach " << *object << std::endl;
347 :
348 50 : Objects& objects = _objects.data[id];
349 50 : Objects::iterator i = find(objects.begin(), objects.end(), object);
350 50 : LBASSERT(i != objects.end());
351 :
352 : {
353 100 : lunchbox::ScopedFastWrite mutex(_objects);
354 50 : objects.erase(i);
355 50 : if (objects.empty())
356 49 : _objects->erase(id);
357 : }
358 :
359 50 : LBASSERT(object->getInstanceID() != CO_INSTANCE_INVALID);
360 50 : object->detach();
361 50 : return;
362 : }
363 :
364 36 : uint32_t ObjectStore::mapNB(Object* object, const uint128_t& id,
365 : const uint128_t& version, NodePtr master)
366 : {
367 36 : LB_TS_NOT_THREAD(_receiverThread);
368 72 : LBLOG(LOG_OBJECTS) << "Mapping " << lunchbox::className(object) << " to id "
369 108 : << id << " version " << version << std::endl;
370 36 : LBASSERT(object);
371 36 : LBASSERTINFO(id.isUUID(), id);
372 :
373 36 : if (!master)
374 34 : master = _localNode->connectObjectMaster(id);
375 :
376 36 : if (!master || !master->isReachable())
377 : {
378 0 : LBWARN << "Mapping of object " << id << " failed, invalid master node"
379 0 : << std::endl;
380 0 : return LB_UNDEFINED_UINT32;
381 : }
382 :
383 36 : if (!object || !id.isUUID())
384 : {
385 0 : LBWARN << "Invalid object " << object << " or id " << id << std::endl;
386 0 : return LB_UNDEFINED_UINT32;
387 : }
388 :
389 36 : const bool isAttached = object->isAttached();
390 36 : const bool isMaster = object->isMaster();
391 36 : LBASSERTINFO(!isAttached, *object);
392 36 : LBASSERT(!isMaster);
393 36 : if (isAttached || isMaster)
394 : {
395 0 : LBWARN << "Invalid object state: attached " << isAttached << " master "
396 0 : << isMaster << std::endl;
397 0 : return LB_UNDEFINED_UINT32;
398 : }
399 :
400 36 : const uint32_t request = _localNode->registerRequest(object);
401 36 : uint128_t minCachedVersion = VERSION_HEAD;
402 36 : uint128_t maxCachedVersion = VERSION_NONE;
403 36 : uint32_t masterInstanceID = 0;
404 : const bool useCache =
405 36 : _checkInstanceCache(id, minCachedVersion, maxCachedVersion,
406 36 : masterInstanceID);
407 36 : object->notifyAttach();
408 72 : master->send(CMD_NODE_MAP_OBJECT)
409 36 : << version << minCachedVersion << maxCachedVersion << id
410 108 : << object->getMaxVersions() << request << _genNextID(_instanceIDs)
411 36 : << masterInstanceID << useCache;
412 36 : return request;
413 : }
414 :
415 40 : bool ObjectStore::_checkInstanceCache(const uint128_t& id, uint128_t& from,
416 : uint128_t& to, uint32_t& instanceID)
417 : {
418 40 : if (!_instanceCache)
419 0 : return false;
420 :
421 40 : const InstanceCache::Data& cached = (*_instanceCache)[id];
422 40 : if (cached == InstanceCache::Data::NONE)
423 40 : return false;
424 :
425 0 : const ObjectDataIStreamDeque& versions = cached.versions;
426 0 : LBASSERT(!cached.versions.empty());
427 0 : instanceID = cached.masterInstanceID;
428 0 : from = versions.front()->getVersion();
429 0 : to = versions.back()->getVersion();
430 0 : LBLOG(LOG_OBJECTS) << "Object " << id << " have v" << from << ".." << to
431 0 : << std::endl;
432 0 : return true;
433 : }
434 :
435 36 : bool ObjectStore::mapSync(const uint32_t requestID)
436 : {
437 36 : if (requestID == LB_UNDEFINED_UINT32)
438 0 : return false;
439 :
440 36 : void* data = _localNode->getRequestData(requestID);
441 36 : if (data == 0)
442 0 : return false;
443 :
444 36 : Object* object = LBSAFECAST(Object*, data);
445 36 : uint128_t version = VERSION_NONE;
446 36 : _localNode->waitRequest(requestID, version);
447 :
448 36 : const bool mapped = object->isAttached();
449 36 : if (mapped)
450 36 : object->applyMapData(version); // apply initial instance data
451 :
452 36 : object->notifyAttached();
453 36 : LBLOG(LOG_OBJECTS) << "Mapped " << lunchbox::className(object) << std::endl;
454 36 : return mapped;
455 : }
456 :
457 4 : f_bool_t ObjectStore::sync(Object* object, const uint128_t& id, NodePtr master,
458 : const uint32_t instanceID)
459 : {
460 4 : const uint32_t request = _startSync(object, id, master, instanceID);
461 : const FuturebImpl::Func& func =
462 8 : boost::bind(&ObjectStore::_finishSync, this, request, object);
463 8 : return f_bool_t(new FuturebImpl(func));
464 : }
465 :
466 4 : uint32_t ObjectStore::_startSync(Object* object, const uint128_t& id,
467 : NodePtr master, const uint32_t instanceID)
468 : {
469 4 : LB_TS_NOT_THREAD(_receiverThread);
470 8 : LBLOG(LOG_OBJECTS) << "Syncing " << lunchbox::className(object)
471 12 : << " with id " << id << std::endl;
472 4 : LBASSERT(object);
473 4 : LBASSERTINFO(id.isUUID(), id);
474 :
475 4 : if (!object || !id.isUUID())
476 : {
477 0 : LBWARN << "Invalid object " << object << " or id " << id << std::endl;
478 0 : return LB_UNDEFINED_UINT32;
479 : }
480 :
481 4 : if (!master)
482 0 : master = _localNode->connectObjectMaster(id);
483 :
484 4 : if (!master || !master->isReachable())
485 : {
486 0 : LBWARN << "Mapping of object " << id << " failed, invalid master node"
487 0 : << std::endl;
488 0 : return LB_UNDEFINED_UINT32;
489 : }
490 :
491 4 : const uint32_t request = _localNode->registerRequest(new ObjectDataIStream);
492 4 : uint128_t minCachedVersion = VERSION_HEAD;
493 4 : uint128_t maxCachedVersion = VERSION_NONE;
494 4 : uint32_t cacheInstanceID = 0;
495 :
496 4 : bool useCache = _checkInstanceCache(id, minCachedVersion, maxCachedVersion,
497 4 : cacheInstanceID);
498 4 : if (useCache)
499 : {
500 0 : switch (instanceID)
501 : {
502 : case CO_INSTANCE_ALL:
503 0 : break;
504 : default:
505 0 : if (instanceID == cacheInstanceID)
506 0 : break;
507 :
508 0 : useCache = false;
509 0 : LBCHECK(_instanceCache->release(id, 1));
510 0 : break;
511 : }
512 : }
513 :
514 : // Use stream expected by MasterCMCommand
515 8 : master->send(CMD_NODE_SYNC_OBJECT)
516 4 : << VERSION_NEWEST << minCachedVersion << maxCachedVersion << id
517 12 : << uint64_t(0) /* maxVersions */ << request << instanceID
518 4 : << cacheInstanceID << useCache;
519 4 : return request;
520 : }
521 :
522 4 : bool ObjectStore::_finishSync(const uint32_t requestID, Object* object)
523 : {
524 4 : if (requestID == LB_UNDEFINED_UINT32)
525 0 : return false;
526 :
527 4 : void* data = _localNode->getRequestData(requestID);
528 4 : if (data == 0)
529 0 : return false;
530 :
531 4 : ObjectDataIStream* is = LBSAFECAST(ObjectDataIStream*, data);
532 :
533 4 : bool ok = false;
534 4 : _localNode->waitRequest(requestID, ok);
535 :
536 4 : if (!ok)
537 : {
538 0 : LBWARN << "Object synchronization failed" << std::endl;
539 0 : delete is;
540 0 : return false;
541 : }
542 :
543 4 : is->waitReady();
544 4 : object->applyInstanceData(*is);
545 4 : LBLOG(LOG_OBJECTS) << "Synced " << lunchbox::className(object) << std::endl;
546 4 : delete is;
547 4 : return true;
548 : }
549 :
550 36 : void ObjectStore::unmap(Object* object)
551 : {
552 36 : LBASSERT(object);
553 36 : if (!object->isAttached()) // not registered
554 35 : return;
555 :
556 35 : const uint128_t& id = object->getID();
557 :
558 35 : LBLOG(LOG_OBJECTS) << "Unmap " << object << std::endl;
559 :
560 35 : object->notifyDetach();
561 :
562 : // send unsubscribe to master, master will send detach command.
563 35 : LBASSERT(!object->isMaster());
564 35 : LB_TS_NOT_THREAD(_commandThread);
565 :
566 35 : const uint32_t masterInstanceID = object->getMasterInstanceID();
567 35 : if (masterInstanceID != CO_INSTANCE_INVALID)
568 : {
569 33 : NodePtr master = object->getMasterNode();
570 33 : LBASSERT(master)
571 :
572 33 : if (master && master->isReachable())
573 : {
574 : lunchbox::Request<void> request =
575 66 : _localNode->registerRequest<void>();
576 66 : master->send(CMD_NODE_UNSUBSCRIBE_OBJECT)
577 99 : << id << request << masterInstanceID << object->getInstanceID();
578 33 : request.wait();
579 33 : object->notifyDetached();
580 33 : return;
581 : }
582 0 : LBERROR << "Master node for object id " << id << " not connected"
583 0 : << std::endl;
584 : }
585 :
586 : // no unsubscribe sent: Detach directly
587 2 : detach(object);
588 2 : object->setupChangeManager(Object::NONE, false, 0, CO_INSTANCE_INVALID);
589 2 : object->notifyDetached();
590 : }
591 :
592 15 : bool ObjectStore::register_(Object* object)
593 : {
594 15 : LBASSERT(object);
595 15 : LBASSERT(!object->isAttached());
596 :
597 15 : const uint128_t& id = object->getID();
598 15 : LBASSERTINFO(id.isUUID(), id);
599 :
600 15 : object->notifyAttach();
601 30 : object->setupChangeManager(object->getChangeType(), true, _localNode,
602 15 : CO_INSTANCE_INVALID);
603 15 : attach(object, id, CO_INSTANCE_INVALID);
604 :
605 15 : if (Global::getIAttribute(Global::IATTR_NODE_SEND_QUEUE_SIZE) > 0)
606 15 : _localNode->send(CMD_NODE_REGISTER_OBJECT) << object;
607 :
608 15 : object->notifyAttached();
609 :
610 15 : LBLOG(LOG_OBJECTS) << "Registered " << object << std::endl;
611 15 : return true;
612 : }
613 :
614 15 : void ObjectStore::deregister(Object* object)
615 : {
616 15 : LBASSERT(object);
617 15 : if (!object->isAttached()) // not registered
618 0 : return;
619 :
620 15 : LBLOG(LOG_OBJECTS) << "Deregister " << *object << std::endl;
621 15 : LBASSERT(object->isMaster());
622 :
623 15 : object->notifyDetach();
624 :
625 15 : if (Global::getIAttribute(Global::IATTR_NODE_SEND_QUEUE_SIZE) > 0)
626 : {
627 : // remove from send queue
628 30 : lunchbox::Request<void> request = _localNode->registerRequest<void>();
629 15 : _localNode->send(CMD_NODE_DEREGISTER_OBJECT) << request;
630 : }
631 :
632 15 : const uint128_t id = object->getID();
633 15 : detach(object);
634 15 : object->setupChangeManager(Object::NONE, true, 0, CO_INSTANCE_INVALID);
635 15 : if (_instanceCache)
636 15 : _instanceCache->erase(id);
637 15 : object->notifyDetached();
638 : }
639 :
640 51502 : bool ObjectStore::notifyCommandThreadIdle()
641 : {
642 51502 : LB_TS_THREAD(_commandThread);
643 51502 : if (_sendQueue.empty())
644 51502 : return false;
645 :
646 0 : LBASSERT(_sendOnRegister > 0);
647 0 : SendQueueItem& item = _sendQueue.front();
648 :
649 0 : if (item.age > _localNode->getTime64())
650 : {
651 0 : const Nodes& nodes = _localNode->getNodes(false);
652 0 : if (nodes.empty())
653 : {
654 0 : lunchbox::Thread::yield();
655 0 : return !_sendQueue.empty();
656 : }
657 :
658 0 : item.object->sendInstanceData(nodes);
659 : }
660 0 : _sendQueue.pop_front();
661 0 : return !_sendQueue.empty();
662 : }
663 :
664 7 : void ObjectStore::removeNode(NodePtr node)
665 : {
666 14 : lunchbox::Request<void> request = _localNode->registerRequest<void>();
667 7 : _localNode->send(CMD_NODE_REMOVE_NODE) << node.get() << request;
668 7 : }
669 :
670 : //===========================================================================
671 : // ICommand handling
672 : //===========================================================================
673 541 : bool ObjectStore::dispatchObjectCommand(ICommand& cmd)
674 : {
675 541 : LB_TS_THREAD(_receiverThread);
676 1082 : ObjectICommand command(cmd);
677 541 : const uint128_t& id = command.getObjectID();
678 541 : const uint32_t instanceID = command.getInstanceID();
679 :
680 541 : ObjectsHash::const_iterator i = _objects->find(id);
681 :
682 541 : if (i == _objects->end())
683 : // When the instance ID is set to none, we only care about the command
684 : // when we have an object of the given ID (multicast)
685 0 : return (instanceID == CO_INSTANCE_NONE);
686 :
687 541 : const Objects& objects = i->second;
688 541 : LBASSERTINFO(!objects.empty(), command);
689 :
690 541 : if (instanceID <= CO_INSTANCE_MAX)
691 : {
692 54 : for (Objects::const_iterator j = objects.begin(); j != objects.end();
693 : ++j)
694 : {
695 54 : Object* object = *j;
696 54 : if (instanceID == object->getInstanceID())
697 : {
698 48 : LBCHECK(object->dispatchCommand(command));
699 48 : return true;
700 : }
701 : }
702 0 : LBERROR << "Can't find object instance " << instanceID << " for "
703 0 : << command << std::endl;
704 0 : LBUNREACHABLE;
705 0 : return false;
706 : }
707 :
708 493 : Objects::const_iterator j = objects.begin();
709 493 : Object* object = *j;
710 493 : LBCHECK(object->dispatchCommand(command));
711 :
712 493 : for (++j; j != objects.end(); ++j)
713 : {
714 0 : object = *j;
715 0 : LBCHECK(object->dispatchCommand(command));
716 : }
717 493 : return true;
718 : }
719 :
720 34 : bool ObjectStore::_cmdFindMasterNodeID(ICommand& command)
721 : {
722 34 : LB_TS_THREAD(_commandThread);
723 :
724 34 : const uint128_t& id = command.get<uint128_t>();
725 34 : const uint32_t requestID = command.get<uint32_t>();
726 34 : LBASSERT(id.isUUID());
727 :
728 34 : NodeID masterNodeID;
729 : {
730 68 : lunchbox::ScopedFastRead mutex(_objects);
731 34 : ObjectsHashCIter i = _objects->find(id);
732 :
733 34 : if (i != _objects->end())
734 : {
735 34 : const Objects& objects = i->second;
736 34 : LBASSERT(!objects.empty());
737 :
738 34 : for (ObjectsCIter j = objects.begin(); j != objects.end(); ++j)
739 : {
740 34 : Object* object = *j;
741 34 : if (object->isMaster())
742 34 : masterNodeID = _localNode->getNodeID();
743 : else
744 : {
745 0 : NodePtr master = object->getMasterNode();
746 0 : if (master.isValid())
747 0 : masterNodeID = master->getNodeID();
748 : }
749 34 : if (masterNodeID != 0)
750 34 : break;
751 : }
752 : }
753 : }
754 :
755 34 : LBLOG(LOG_OBJECTS) << "Object " << id << " master " << masterNodeID
756 34 : << " req " << requestID << std::endl;
757 68 : command.getNode()->send(CMD_NODE_FIND_MASTER_NODE_ID_REPLY) << masterNodeID
758 34 : << requestID;
759 34 : return true;
760 : }
761 :
762 34 : bool ObjectStore::_cmdFindMasterNodeIDReply(ICommand& command)
763 : {
764 34 : const NodeID& masterNodeID = command.get<NodeID>();
765 34 : const uint32_t requestID = command.get<uint32_t>();
766 34 : _localNode->serveRequest(requestID, masterNodeID);
767 34 : return true;
768 : }
769 :
770 15 : bool ObjectStore::_cmdAttach(ICommand& command)
771 : {
772 15 : LB_TS_THREAD(_receiverThread);
773 15 : LBLOG(LOG_OBJECTS) << "Cmd attach object " << command << std::endl;
774 :
775 15 : const uint128_t& objectID = command.get<uint128_t>();
776 15 : const uint32_t instanceID = command.get<uint32_t>();
777 15 : const uint32_t requestID = command.get<uint32_t>();
778 :
779 : Object* object =
780 15 : static_cast<Object*>(_localNode->getRequestData(requestID));
781 15 : _attach(object, objectID, instanceID);
782 15 : _localNode->serveRequest(requestID);
783 15 : return true;
784 : }
785 :
786 50 : bool ObjectStore::_cmdDetach(ICommand& command)
787 : {
788 50 : LB_TS_THREAD(_receiverThread);
789 50 : LBLOG(LOG_OBJECTS) << "Cmd detach object " << command << std::endl;
790 :
791 50 : const uint128_t& objectID = command.get<uint128_t>();
792 50 : const uint32_t instanceID = command.get<uint32_t>();
793 50 : const uint32_t requestID = command.get<uint32_t>();
794 :
795 50 : ObjectsHash::const_iterator i = _objects->find(objectID);
796 50 : if (i != _objects->end())
797 : {
798 50 : const Objects& objects = i->second;
799 :
800 51 : for (Objects::const_iterator j = objects.begin(); j != objects.end();
801 : ++j)
802 : {
803 51 : Object* object = *j;
804 51 : if (object->getInstanceID() == instanceID)
805 : {
806 50 : _detach(object);
807 50 : break;
808 : }
809 : }
810 : }
811 :
812 50 : LBASSERT(requestID != LB_UNDEFINED_UINT32);
813 50 : _localNode->serveRequest(requestID);
814 50 : return true;
815 : }
816 :
817 15 : bool ObjectStore::_cmdRegister(ICommand& command)
818 : {
819 15 : LB_TS_THREAD(_commandThread);
820 15 : if (_sendOnRegister <= 0)
821 15 : return true;
822 :
823 0 : LBLOG(LOG_OBJECTS) << "Cmd register object " << command << std::endl;
824 :
825 0 : Object* object = reinterpret_cast<Object*>(command.get<void*>());
826 :
827 : const int32_t age =
828 0 : Global::getIAttribute(Global::IATTR_NODE_SEND_QUEUE_AGE);
829 0 : SendQueueItem item;
830 0 : item.age = age ? age + _localNode->getTime64()
831 : : std::numeric_limits<int64_t>::max();
832 0 : item.object = object;
833 0 : _sendQueue.push_back(item);
834 :
835 : const uint32_t size =
836 0 : Global::getIAttribute(Global::IATTR_NODE_SEND_QUEUE_SIZE);
837 0 : while (_sendQueue.size() > size)
838 0 : _sendQueue.pop_front();
839 :
840 0 : return true;
841 : }
842 :
843 15 : bool ObjectStore::_cmdDeregister(ICommand& command)
844 : {
845 15 : LB_TS_THREAD(_commandThread);
846 15 : LBLOG(LOG_OBJECTS) << "Cmd deregister object " << command << std::endl;
847 :
848 15 : const uint32_t requestID = command.get<uint32_t>();
849 :
850 15 : const void* object = _localNode->getRequestData(requestID);
851 :
852 15 : for (SendQueueIter i = _sendQueue.begin(); i != _sendQueue.end(); ++i)
853 : {
854 0 : if (i->object == object)
855 : {
856 0 : _sendQueue.erase(i);
857 0 : break;
858 : }
859 : }
860 :
861 15 : _localNode->serveRequest(requestID);
862 15 : return true;
863 : }
864 :
865 36 : bool ObjectStore::_cmdMap(ICommand& cmd)
866 : {
867 36 : LB_TS_THREAD(_commandThread);
868 :
869 72 : MasterCMCommand command(cmd);
870 36 : const uint128_t& id = command.getObjectID();
871 :
872 36 : LBLOG(LOG_OBJECTS) << "Cmd map object " << command << " id " << id << "."
873 0 : << command.getInstanceID() << " req "
874 36 : << command.getRequestID() << std::endl;
875 :
876 72 : ObjectCMPtr masterCM;
877 : {
878 72 : lunchbox::ScopedFastRead mutex(_objects);
879 36 : ObjectsHash::const_iterator i = _objects->find(id);
880 36 : if (i != _objects->end())
881 : {
882 36 : const Objects& objects = i->second;
883 :
884 36 : for (ObjectsCIter j = objects.begin(); j != objects.end(); ++j)
885 : {
886 36 : Object* object = *j;
887 36 : if (object->isMaster())
888 : {
889 36 : masterCM = object->_getChangeManager();
890 36 : break;
891 : }
892 : }
893 : }
894 : }
895 :
896 36 : if (!masterCM || !masterCM->addSlave(command))
897 : {
898 0 : LBWARN << "Can't find master object to map " << id << std::endl;
899 0 : NodePtr node = command.getNode();
900 0 : node->send(CMD_NODE_MAP_OBJECT_REPLY)
901 0 : << node->getNodeID() << id << command.getRequestedVersion()
902 0 : << command.getRequestID() << false << command.useCache() << false;
903 : }
904 :
905 36 : ++_counters[LocalNode::COUNTER_MAP_OBJECT_REMOTE];
906 72 : return true;
907 : }
908 :
909 36 : bool ObjectStore::_cmdMapSuccess(ICommand& command)
910 : {
911 36 : LB_TS_THREAD(_receiverThread);
912 :
913 36 : const uint128_t& nodeID = command.get<uint128_t>();
914 36 : const uint128_t& objectID = command.get<uint128_t>();
915 36 : const uint32_t requestID = command.get<uint32_t>();
916 36 : const uint32_t instanceID = command.get<uint32_t>();
917 36 : const Object::ChangeType changeType = command.get<Object::ChangeType>();
918 36 : const uint32_t masterInstanceID = command.get<uint32_t>();
919 :
920 : // Map success commands are potentially multicasted (see above)
921 : // verify that we are the intended receiver
922 36 : if (nodeID != _localNode->getNodeID())
923 0 : return true;
924 :
925 36 : LBLOG(LOG_OBJECTS) << "Cmd map object success " << command << " id "
926 0 : << objectID << "." << instanceID << " req " << requestID
927 36 : << std::endl;
928 :
929 : // set up change manager and attach object to dispatch table
930 : Object* object =
931 36 : static_cast<Object*>(_localNode->getRequestData(requestID));
932 36 : LBASSERT(object);
933 36 : LBASSERT(!object->isMaster());
934 :
935 36 : object->setupChangeManager(changeType, false, _localNode, masterInstanceID);
936 36 : _attach(object, objectID, instanceID);
937 36 : return true;
938 : }
939 :
940 36 : bool ObjectStore::_cmdMapReply(ICommand& command)
941 : {
942 36 : LB_TS_THREAD(_receiverThread);
943 :
944 : // Map reply commands are potentially multicasted (see above)
945 : // verify that we are the intended receiver
946 36 : if (command.get<uint128_t>() != _localNode->getNodeID())
947 0 : return true;
948 :
949 36 : const uint128_t& objectID = command.get<uint128_t>();
950 36 : const uint128_t& version = command.get<uint128_t>();
951 36 : const uint32_t requestID = command.get<uint32_t>();
952 36 : const bool result = command.get<bool>();
953 36 : const bool releaseCache = command.get<bool>();
954 36 : const bool useCache = command.get<bool>();
955 :
956 36 : LBLOG(LOG_OBJECTS) << "Cmd map object reply " << command << " id "
957 36 : << objectID << " req " << requestID << std::endl;
958 :
959 36 : LBASSERT(_localNode->getRequestData(requestID));
960 :
961 36 : if (result)
962 : {
963 : Object* object =
964 36 : static_cast<Object*>(_localNode->getRequestData(requestID));
965 36 : LBASSERT(object);
966 36 : LBASSERT(!object->isMaster());
967 :
968 36 : object->setMasterNode(command.getNode());
969 :
970 36 : if (useCache)
971 : {
972 0 : LBASSERT(releaseCache);
973 0 : LBASSERT(_instanceCache);
974 :
975 0 : const uint128_t& id = objectID;
976 0 : const InstanceCache::Data& cached = (*_instanceCache)[id];
977 0 : LBASSERT(cached != InstanceCache::Data::NONE);
978 0 : LBASSERT(!cached.versions.empty());
979 :
980 0 : object->addInstanceDatas(cached.versions, version);
981 0 : LBCHECK(_instanceCache->release(id, 2));
982 : }
983 36 : else if (releaseCache)
984 : {
985 0 : LBCHECK(_instanceCache->release(objectID, 1));
986 : }
987 : }
988 : else
989 : {
990 0 : if (releaseCache)
991 0 : _instanceCache->release(objectID, 1);
992 :
993 0 : LBWARN << "Could not map object " << objectID << std::endl;
994 : }
995 :
996 36 : _localNode->serveRequest(requestID, version);
997 36 : return true;
998 : }
999 :
1000 33 : bool ObjectStore::_cmdUnsubscribe(ICommand& command)
1001 : {
1002 33 : LB_TS_THREAD(_commandThread);
1003 33 : LBLOG(LOG_OBJECTS) << "Cmd unsubscribe object " << command << std::endl;
1004 :
1005 33 : const uint128_t& id = command.get<uint128_t>();
1006 33 : const uint32_t requestID = command.get<uint32_t>();
1007 33 : const uint32_t masterInstanceID = command.get<uint32_t>();
1008 33 : const uint32_t slaveInstanceID = command.get<uint32_t>();
1009 :
1010 66 : NodePtr node = command.getNode();
1011 :
1012 : {
1013 66 : lunchbox::ScopedFastWrite mutex(_objects);
1014 33 : ObjectsHash::const_iterator i = _objects->find(id);
1015 33 : if (i != _objects->end())
1016 : {
1017 33 : const Objects& objects = i->second;
1018 33 : for (ObjectsCIter j = objects.begin(); j != objects.end(); ++j)
1019 : {
1020 33 : Object* object = *j;
1021 66 : if (object->isMaster() &&
1022 33 : object->getInstanceID() == masterInstanceID)
1023 : {
1024 33 : object->removeSlave(node, slaveInstanceID);
1025 33 : break;
1026 : }
1027 : }
1028 : }
1029 : }
1030 :
1031 33 : node->send(CMD_NODE_DETACH_OBJECT) << id << slaveInstanceID << requestID;
1032 66 : return true;
1033 : }
1034 :
1035 1 : bool ObjectStore::_cmdUnmap(ICommand& command)
1036 : {
1037 1 : LB_TS_THREAD(_receiverThread);
1038 1 : LBLOG(LOG_OBJECTS) << "Cmd unmap object " << command << std::endl;
1039 :
1040 1 : const uint128_t& objectID = command.get<uint128_t>();
1041 :
1042 1 : if (_instanceCache)
1043 1 : _instanceCache->erase(objectID);
1044 :
1045 1 : ObjectsHash::iterator i = _objects->find(objectID);
1046 1 : if (i == _objects->end()) // nothing to do
1047 0 : return true;
1048 :
1049 2 : const Objects objects = i->second;
1050 : {
1051 2 : lunchbox::ScopedFastWrite mutex(_objects);
1052 1 : _objects->erase(i);
1053 : }
1054 :
1055 2 : for (Objects::const_iterator j = objects.begin(); j != objects.end(); ++j)
1056 : {
1057 1 : Object* object = *j;
1058 1 : object->detach();
1059 : }
1060 :
1061 1 : return true;
1062 : }
1063 :
1064 4 : bool ObjectStore::_cmdSync(ICommand& cmd)
1065 : {
1066 4 : LB_TS_THREAD(_commandThread);
1067 8 : MasterCMCommand command(cmd);
1068 4 : const uint128_t& id = command.getObjectID();
1069 4 : LBINFO << command.getNode() << std::endl;
1070 4 : LBLOG(LOG_OBJECTS) << "Cmd sync object id " << id << "."
1071 0 : << command.getInstanceID() << " req "
1072 4 : << command.getRequestID() << std::endl;
1073 :
1074 4 : const uint32_t cacheInstanceID = command.getMasterInstanceID();
1075 8 : ObjectCMPtr cm;
1076 : {
1077 8 : lunchbox::ScopedFastRead mutex(_objects);
1078 4 : ObjectsHash::const_iterator i = _objects->find(id);
1079 4 : if (i != _objects->end())
1080 : {
1081 4 : const Objects& objects = i->second;
1082 :
1083 8 : for (Object* object : objects)
1084 : {
1085 4 : if (command.getInstanceID() == object->getInstanceID())
1086 : {
1087 0 : cm = object->_getChangeManager();
1088 0 : LBASSERT(cm);
1089 0 : break;
1090 : }
1091 :
1092 4 : if (command.getInstanceID() != CO_INSTANCE_ALL)
1093 0 : continue;
1094 :
1095 4 : cm = object->_getChangeManager();
1096 4 : LBASSERT(cm);
1097 4 : if (cacheInstanceID == object->getInstanceID())
1098 0 : break;
1099 : }
1100 4 : if (!cm)
1101 0 : LBWARN << "Can't find object to sync " << id << "."
1102 0 : << command.getInstanceID() << " in " << objects.size()
1103 0 : << " instances" << std::endl;
1104 : }
1105 4 : if (!cm)
1106 0 : LBWARN << "Can't find object to sync " << id
1107 0 : << ", no object with identifier" << std::endl;
1108 : }
1109 4 : if (!cm || !cm->sendSync(command))
1110 : {
1111 0 : NodePtr node = command.getNode();
1112 0 : node->send(CMD_NODE_SYNC_OBJECT_REPLY)
1113 0 : << node->getNodeID() << id << command.getRequestID() << false
1114 0 : << command.useCache() << false;
1115 : }
1116 8 : return true;
1117 : }
1118 :
1119 4 : bool ObjectStore::_cmdSyncReply(ICommand& command)
1120 : {
1121 4 : LB_TS_THREAD(_receiverThread);
1122 :
1123 : // Sync reply commands are potentially multicasted (see above)
1124 : // verify that we are the intended receiver
1125 4 : if (command.get<uint128_t>() != _localNode->getNodeID())
1126 0 : return true;
1127 :
1128 4 : const NodeID& id = command.get<NodeID>();
1129 4 : const uint32_t requestID = command.get<uint32_t>();
1130 4 : const bool result = command.get<bool>();
1131 4 : const bool releaseCache = command.get<bool>();
1132 4 : const bool useCache = command.get<bool>();
1133 4 : void* const data = _localNode->getRequestData(requestID);
1134 4 : ObjectDataIStream* const is = LBSAFECAST(ObjectDataIStream*, data);
1135 :
1136 4 : LBLOG(LOG_OBJECTS) << "Cmd sync object reply " << command << " req "
1137 4 : << requestID << std::endl;
1138 4 : if (result)
1139 : {
1140 4 : if (useCache)
1141 : {
1142 0 : LBASSERT(releaseCache);
1143 0 : LBASSERT(_instanceCache);
1144 :
1145 0 : const InstanceCache::Data& cached = (*_instanceCache)[id];
1146 0 : LBASSERT(cached != InstanceCache::Data::NONE);
1147 0 : LBASSERT(!cached.versions.empty());
1148 :
1149 0 : *is = *cached.versions.back();
1150 0 : LBCHECK(_instanceCache->release(id, 2));
1151 : }
1152 4 : else if (releaseCache)
1153 : {
1154 0 : LBCHECK(_instanceCache->release(id, 1));
1155 : }
1156 : }
1157 : else
1158 : {
1159 0 : if (releaseCache)
1160 0 : _instanceCache->release(id, 1);
1161 :
1162 0 : LBWARN << "Could not sync object " << id << " request " << requestID
1163 0 : << std::endl;
1164 : }
1165 :
1166 4 : _localNode->serveRequest(requestID, result);
1167 4 : return true;
1168 : }
1169 :
1170 51 : bool ObjectStore::_cmdInstance(ICommand& inCommand)
1171 : {
1172 51 : LB_TS_THREAD(_receiverThread);
1173 51 : LBASSERT(_localNode);
1174 :
1175 102 : ObjectDataICommand command(inCommand);
1176 51 : const NodeID& nodeID = command.get<NodeID>();
1177 51 : const uint32_t masterInstanceID = command.get<uint32_t>();
1178 51 : const uint32_t cmd = command.getCommand();
1179 :
1180 51 : LBLOG(LOG_OBJECTS) << "Cmd instance " << command << " master "
1181 51 : << masterInstanceID << " node " << nodeID << std::endl;
1182 :
1183 51 : command.setType(COMMANDTYPE_OBJECT);
1184 51 : command.setCommand(CMD_OBJECT_INSTANCE);
1185 :
1186 51 : const uint128_t& version = command.getVersion();
1187 51 : if (_instanceCache && version.high() == 0)
1188 : {
1189 51 : const ObjectVersion rev(command.getObjectID(), version);
1190 : #ifndef CO_AGGRESSIVE_CACHING // Issue Equalizer#82:
1191 51 : if (cmd != CMD_NODE_OBJECT_INSTANCE_PUSH)
1192 : #endif
1193 44 : _instanceCache->add(rev, masterInstanceID, command, 0);
1194 : }
1195 :
1196 51 : switch (cmd)
1197 : {
1198 : case CMD_NODE_OBJECT_INSTANCE:
1199 0 : LBASSERT(nodeID == 0);
1200 0 : LBASSERT(command.getInstanceID() == CO_INSTANCE_NONE);
1201 0 : return true;
1202 :
1203 : case CMD_NODE_OBJECT_INSTANCE_MAP:
1204 32 : if (nodeID != _localNode->getNodeID()) // not for me
1205 0 : return true;
1206 :
1207 32 : LBASSERT(command.getInstanceID() <= CO_INSTANCE_MAX);
1208 32 : return dispatchObjectCommand(command);
1209 :
1210 : case CMD_NODE_OBJECT_INSTANCE_COMMIT:
1211 4 : LBASSERT(nodeID == 0);
1212 4 : LBASSERT(command.getInstanceID() == CO_INSTANCE_NONE);
1213 4 : return dispatchObjectCommand(command);
1214 :
1215 : case CMD_NODE_OBJECT_INSTANCE_PUSH:
1216 7 : LBASSERT(nodeID == 0);
1217 7 : LBASSERT(command.getInstanceID() == CO_INSTANCE_NONE);
1218 7 : _pushData.addDataCommand(command.getObjectID(), command);
1219 7 : return true;
1220 :
1221 : case CMD_NODE_OBJECT_INSTANCE_SYNC:
1222 : {
1223 8 : if (nodeID != _localNode->getNodeID()) // not for me
1224 0 : return true;
1225 :
1226 8 : void* data = _localNode->getRequestData(command.getInstanceID());
1227 8 : LBASSERT(command.getInstanceID() != CO_INSTANCE_NONE);
1228 8 : LBASSERTINFO(data, this);
1229 :
1230 8 : ObjectDataIStream* is = LBSAFECAST(ObjectDataIStream*, data);
1231 8 : is->addDataCommand(command);
1232 8 : return true;
1233 : }
1234 :
1235 : default:
1236 0 : LBUNREACHABLE;
1237 0 : return false;
1238 : }
1239 : }
1240 :
1241 0 : bool ObjectStore::_cmdDisableSendOnRegister(ICommand& command)
1242 : {
1243 0 : LB_TS_THREAD(_commandThread);
1244 0 : LBASSERTINFO(_sendOnRegister > 0, _sendOnRegister);
1245 :
1246 0 : if (--_sendOnRegister == 0)
1247 : {
1248 0 : _sendQueue.clear();
1249 :
1250 0 : const Nodes& nodes = _localNode->getNodes(false);
1251 0 : for (NodePtr node : nodes)
1252 : {
1253 0 : ConnectionPtr multicast = node->getConnection(true);
1254 0 : ConnectionPtr connection = node->getConnection(false);
1255 0 : if (multicast)
1256 0 : multicast->finish();
1257 0 : if (connection && connection != multicast)
1258 0 : connection->finish();
1259 : }
1260 : }
1261 :
1262 0 : const uint32_t requestID = command.get<uint32_t>();
1263 0 : _localNode->serveRequest(requestID);
1264 0 : return true;
1265 : }
1266 :
1267 40 : bool ObjectStore::_cmdRemoveNode(ICommand& command)
1268 : {
1269 40 : LB_TS_THREAD(_commandThread);
1270 40 : LBLOG(LOG_OBJECTS) << "Cmd object " << command << std::endl;
1271 :
1272 40 : Node* node = command.get<Node*>();
1273 40 : const uint32_t requestID = command.get<uint32_t>();
1274 :
1275 80 : lunchbox::ScopedFastWrite mutex(_objects);
1276 65 : for (ObjectsHashCIter i = _objects->begin(); i != _objects->end(); ++i)
1277 : {
1278 25 : const Objects& objects = i->second;
1279 50 : for (ObjectsCIter j = objects.begin(); j != objects.end(); ++j)
1280 25 : (*j)->removeSlaves(node);
1281 : }
1282 :
1283 40 : if (requestID != LB_UNDEFINED_UINT32)
1284 7 : _localNode->serveRequest(requestID);
1285 : else
1286 33 : node->unref(); // node was ref'd before LocalNode::_handleDisconnect()
1287 :
1288 80 : return true;
1289 : }
1290 :
1291 5 : bool ObjectStore::_cmdPush(ICommand& command)
1292 : {
1293 5 : LB_TS_THREAD(_commandThread);
1294 :
1295 5 : const uint128_t& objectID = command.get<uint128_t>();
1296 5 : const uint128_t& groupID = command.get<uint128_t>();
1297 5 : const uint128_t& typeID = command.get<uint128_t>();
1298 :
1299 5 : ObjectDataIStream* is = _pushData.pull(objectID);
1300 :
1301 5 : _localNode->objectPush(groupID, typeID, objectID, *is);
1302 5 : _pushData.recycle(is);
1303 5 : return true;
1304 : }
1305 :
1306 0 : std::ostream& operator<<(std::ostream& os, ObjectStore* objectStore)
1307 : {
1308 0 : if (!objectStore)
1309 : {
1310 0 : os << "NULL objectStore";
1311 0 : return os;
1312 : }
1313 :
1314 0 : os << "objectStore (" << (void*)objectStore << ")";
1315 :
1316 0 : return os;
1317 : }
1318 63 : }
|