Line data Source code
1 :
2 : /* Copyright (c) 2007-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2011-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "objectCM.h"
22 :
23 : #include "node.h"
24 : #include "nodeCommand.h"
25 : #include "nullCM.h"
26 : #include "object.h"
27 : #include "objectCommand.h"
28 : #include "objectDataOCommand.h"
29 : #include "objectInstanceDataOStream.h"
30 :
31 : #include <lunchbox/scopedMutex.h>
32 :
33 21 : co::ObjectCMPtr co::ObjectCM::ZERO = new co::NullCM;
34 :
35 : #ifdef CO_INSTRUMENT_MULTICAST
36 : lunchbox::a_int32_t co::ObjectCM::_hit(0);
37 : lunchbox::a_int32_t co::ObjectCM::_miss(0);
38 : #endif
39 :
40 : namespace co
41 : {
42 72 : ObjectCM::ObjectCM(Object* object)
43 72 : : _object(object)
44 : {
45 72 : }
46 :
47 72 : ObjectCM::~ObjectCM()
48 : {
49 72 : }
50 :
51 68 : void ObjectCM::exit()
52 : {
53 136 : lunchbox::ScopedFastWrite mutex(_lock);
54 68 : _object = 0;
55 68 : }
56 :
57 2 : void ObjectCM::push(const uint128_t& groupID, const uint128_t& typeID,
58 : const Nodes& nodes)
59 : {
60 2 : LBASSERT(_object);
61 2 : LBASSERT(!nodes.empty());
62 2 : if (nodes.empty())
63 : {
64 0 : LBWARN << "Push to an empty set of nodes" << std::endl;
65 0 : return;
66 : }
67 :
68 4 : ObjectInstanceDataOStream os(this);
69 2 : os.enablePush(getVersion(), nodes);
70 2 : _object->getInstanceData(os);
71 :
72 : // Send push notification to remote cmd thread while connections are valid
73 4 : OCommand(os.getConnections(), CMD_NODE_OBJECT_PUSH) << _object->getID()
74 2 : << groupID << typeID;
75 :
76 2 : os.disable(); // handled by remote recv thread
77 : }
78 :
79 4 : bool ObjectCM::sendSync(const MasterCMCommand& command)
80 : {
81 8 : lunchbox::ScopedFastWrite mutex(_lock);
82 4 : if (!_object)
83 : {
84 0 : LBWARN << "Sync from detached object requested" << std::endl;
85 0 : return false;
86 : }
87 :
88 4 : const uint128_t& maxCachedVersion = command.getMaxCachedVersion();
89 : const bool useCache =
90 4 : command.useCache() &&
91 8 : command.getMasterInstanceID() == _object->getInstanceID() &&
92 4 : maxCachedVersion == getVersion();
93 :
94 4 : if (!useCache)
95 : {
96 8 : ObjectInstanceDataOStream os(this);
97 4 : os.enableSync(getVersion(), command);
98 4 : _object->getInstanceData(os);
99 4 : os.disable();
100 : }
101 8 : NodePtr node = command.getNode();
102 8 : node->send(CMD_NODE_SYNC_OBJECT_REPLY, useCache /*preferMulticast*/)
103 12 : << node->getNodeID() << command.getObjectID() << command.getRequestID()
104 12 : << true << command.useCache() << useCache;
105 4 : return true;
106 : }
107 :
108 36 : bool ObjectCM::_addSlave(const MasterCMCommand& command,
109 : const uint128_t& version)
110 : {
111 36 : LBASSERT(version != VERSION_NONE);
112 36 : LBASSERT(command.getType() == COMMANDTYPE_NODE);
113 36 : LBASSERT(command.getCommand() == CMD_NODE_MAP_OBJECT);
114 :
115 : // process request
116 36 : if (command.getRequestedVersion() == VERSION_NONE)
117 : {
118 : // no data to send, send empty version
119 4 : _sendMapSuccess(command, false /* mc */);
120 4 : _sendEmptyVersion(command, VERSION_NONE, false /* mc */);
121 4 : _sendMapReply(command, VERSION_NONE, true, false, false /* mc */);
122 4 : return true;
123 : }
124 :
125 : const bool replyUseCache =
126 32 : command.useCache() &&
127 32 : (command.getMasterInstanceID() == _object->getInstanceID());
128 32 : return _initSlave(command, version, replyUseCache);
129 : }
130 :
131 2 : bool ObjectCM::_initSlave(const MasterCMCommand& command,
132 : const uint128_t& replyVersion, bool replyUseCache)
133 : {
134 : #if 0
135 : LBLOG( LOG_OBJECTS ) << "Object id " << _object->_id << " v" << _version
136 : << ", instantiate on " << node->getNodeID()
137 : << std::endl;
138 : #endif
139 :
140 : #ifndef NDEBUG
141 2 : const uint128_t& version = command.getRequestedVersion();
142 2 : if (version != VERSION_OLDEST && version < replyVersion)
143 0 : LBINFO << "Mapping version " << replyVersion << " instead of "
144 0 : << version << std::endl;
145 : #endif
146 :
147 2 : if (replyUseCache && command.getMinCachedVersion() <= replyVersion &&
148 0 : command.getMaxCachedVersion() >= replyVersion)
149 : {
150 : #ifdef CO_INSTRUMENT_MULTICAST
151 : ++_hit;
152 : #endif
153 0 : _sendMapSuccess(command, false);
154 0 : _sendMapReply(command, replyVersion, true, replyUseCache, false);
155 0 : return true;
156 : }
157 :
158 4 : lunchbox::ScopedFastWrite mutex(_lock);
159 2 : if (!_object)
160 : {
161 0 : LBWARN << "Map to detached object requested" << std::endl;
162 0 : return false;
163 : }
164 :
165 : #ifdef CO_INSTRUMENT_MULTICAST
166 : ++_miss;
167 : #endif
168 2 : replyUseCache = false;
169 :
170 2 : _sendMapSuccess(command, true);
171 :
172 : // send instance data
173 4 : ObjectInstanceDataOStream os(this);
174 2 : os.enableMap(replyVersion, command.getNode(), command.getInstanceID());
175 2 : _object->getInstanceData(os);
176 2 : os.disable();
177 2 : if (!os.hasSentData())
178 : // no data, send empty command to set version
179 0 : _sendEmptyVersion(command, replyVersion, true /* mc */);
180 :
181 2 : _sendMapReply(command, replyVersion, true, replyUseCache, true);
182 2 : return true;
183 : }
184 :
185 36 : void ObjectCM::_sendMapSuccess(const MasterCMCommand& command,
186 : const bool multicast)
187 : {
188 72 : command.getNode()->send(CMD_NODE_MAP_OBJECT_SUCCESS, multicast)
189 108 : << command.getNode()->getNodeID() << command.getObjectID()
190 108 : << command.getRequestID() << command.getInstanceID()
191 108 : << _object->getChangeType() << _object->getInstanceID();
192 36 : }
193 :
194 36 : void ObjectCM::_sendMapReply(const MasterCMCommand& command,
195 : const uint128_t& version, const bool result,
196 : const bool useCache, const bool multicast)
197 : {
198 72 : command.getNode()->send(CMD_NODE_MAP_OBJECT_REPLY, multicast)
199 108 : << command.getNode()->getNodeID() << command.getObjectID() << version
200 108 : << command.getRequestID() << result << command.useCache() << useCache;
201 36 : }
202 :
203 4 : void ObjectCM::_sendEmptyVersion(const MasterCMCommand& command,
204 : const uint128_t& version, const bool multicast)
205 : {
206 8 : NodePtr node = command.getNode();
207 8 : ConnectionPtr connection = node->getConnection(multicast);
208 :
209 12 : ObjectDataOCommand(Connections(1, connection), CMD_OBJECT_INSTANCE,
210 4 : COMMANDTYPE_OBJECT, _object->getID(),
211 : command.getInstanceID(), version, 0, 0, 0, true, 0)
212 12 : << NodeID() << _object->getInstanceID();
213 4 : }
214 63 : }
|