Line data Source code
1 :
2 : /* Copyright (c) 2007-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2011-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "objectCM.h"
22 :
23 : #include "nodeCommand.h"
24 : #include "nullCM.h"
25 : #include "node.h"
26 : #include "object.h"
27 : #include "objectCommand.h"
28 : #include "objectInstanceDataOStream.h"
29 : #include "objectDataOCommand.h"
30 :
31 : #include <lunchbox/scopedMutex.h>
32 :
33 22 : co::ObjectCMPtr co::ObjectCM::ZERO = new co::NullCM;
34 :
35 : #ifdef CO_INSTRUMENT_MULTICAST
36 : lunchbox::a_int32_t co::ObjectCM::_hit( 0 );
37 : lunchbox::a_int32_t co::ObjectCM::_miss( 0 );
38 : #endif
39 :
40 : namespace co
41 : {
42 85 : ObjectCM::ObjectCM( Object* object )
43 85 : : _object( object )
44 85 : {}
45 :
46 85 : ObjectCM::~ObjectCM()
47 85 : {}
48 :
49 92 : void ObjectCM::exit()
50 : {
51 184 : lunchbox::ScopedFastWrite mutex( _lock );
52 92 : _object = 0;
53 92 : }
54 :
55 2 : void ObjectCM::push( const uint128_t& groupID, const uint128_t& typeID,
56 : const Nodes& nodes )
57 : {
58 2 : LBASSERT( _object );
59 2 : LBASSERT( !nodes.empty( ));
60 2 : if( nodes.empty( ))
61 : {
62 0 : LBWARN << "Push to an empty set of nodes" << std::endl;
63 0 : return;
64 : }
65 :
66 4 : ObjectInstanceDataOStream os( this );
67 2 : os.enablePush( getVersion(), nodes );
68 2 : _object->getInstanceData( os );
69 :
70 : // Send push notification to remote cmd thread while connections are valid
71 4 : OCommand( os.getConnections(), CMD_NODE_OBJECT_PUSH )
72 4 : << _object->getID() << groupID << typeID;
73 :
74 2 : os.disable(); // handled by remote recv thread
75 : }
76 :
77 4 : bool ObjectCM::sendSync( const MasterCMCommand& command )
78 : {
79 8 : lunchbox::ScopedFastWrite mutex( _lock );
80 4 : if( !_object )
81 : {
82 0 : LBWARN << "Sync from detached object requested" << std::endl;
83 0 : return false;
84 : }
85 :
86 4 : const uint128_t& maxCachedVersion = command.getMaxCachedVersion();
87 : const bool useCache =
88 4 : command.useCache() &&
89 8 : command.getMasterInstanceID() == _object->getInstanceID() &&
90 4 : maxCachedVersion == getVersion();
91 :
92 4 : if( !useCache )
93 : {
94 8 : ObjectInstanceDataOStream os( this );
95 4 : os.enableSync( getVersion(), command );
96 4 : _object->getInstanceData( os );
97 4 : os.disable();
98 : }
99 8 : NodePtr node = command.getNode();
100 8 : node->send( CMD_NODE_SYNC_OBJECT_REPLY, useCache /*preferMulticast*/ )
101 12 : << node->getNodeID() << command.getObjectID() << command.getRequestID()
102 12 : << true << command.useCache() << useCache;
103 4 : return true;
104 : }
105 :
106 42 : bool ObjectCM::_addSlave( const MasterCMCommand& command,
107 : const uint128_t& version )
108 : {
109 42 : LBASSERT( version != VERSION_NONE );
110 42 : LBASSERT( command.getType() == COMMANDTYPE_NODE );
111 42 : LBASSERT( command.getCommand() == CMD_NODE_MAP_OBJECT );
112 :
113 : // process request
114 42 : if( command.getRequestedVersion() == VERSION_NONE )
115 : {
116 : // no data to send, send empty version
117 4 : _sendMapSuccess( command, false /* mc */ );
118 4 : _sendEmptyVersion( command, VERSION_NONE, false /* mc */ );
119 4 : _sendMapReply( command, VERSION_NONE, true, false, false /* mc */ );
120 4 : return true;
121 : }
122 :
123 38 : const bool replyUseCache = command.useCache() &&
124 38 : (command.getMasterInstanceID() == _object->getInstanceID( ));
125 38 : return _initSlave( command, version, replyUseCache );
126 : }
127 :
128 8 : bool ObjectCM::_initSlave( const MasterCMCommand& command,
129 : const uint128_t& replyVersion, bool replyUseCache )
130 : {
131 : #if 0
132 : LBLOG( LOG_OBJECTS ) << "Object id " << _object->_id << " v" << _version
133 : << ", instantiate on " << node->getNodeID()
134 : << std::endl;
135 : #endif
136 :
137 : #ifndef NDEBUG
138 8 : const uint128_t& version = command.getRequestedVersion();
139 8 : if( version != VERSION_OLDEST && version < replyVersion )
140 0 : LBINFO << "Mapping version " << replyVersion << " instead of "
141 0 : << version << std::endl;
142 : #endif
143 :
144 8 : if( replyUseCache &&
145 8 : command.getMinCachedVersion() <= replyVersion &&
146 0 : command.getMaxCachedVersion() >= replyVersion )
147 : {
148 : #ifdef CO_INSTRUMENT_MULTICAST
149 : ++_hit;
150 : #endif
151 0 : _sendMapSuccess( command, false );
152 0 : _sendMapReply( command, replyVersion, true, replyUseCache, false );
153 0 : return true;
154 : }
155 :
156 16 : lunchbox::ScopedFastWrite mutex( _lock );
157 8 : if( !_object )
158 : {
159 0 : LBWARN << "Map to detached object requested" << std::endl;
160 0 : return false;
161 : }
162 :
163 : #ifdef CO_INSTRUMENT_MULTICAST
164 : ++_miss;
165 : #endif
166 8 : replyUseCache = false;
167 :
168 8 : _sendMapSuccess( command, true );
169 :
170 : // send instance data
171 16 : ObjectInstanceDataOStream os( this );
172 8 : os.enableMap( replyVersion, command.getNode(), command.getInstanceID( ));
173 8 : _object->getInstanceData( os );
174 8 : os.disable();
175 8 : if( !os.hasSentData( ))
176 : // no data, send empty command to set version
177 0 : _sendEmptyVersion( command, replyVersion, true /* mc */ );
178 :
179 8 : _sendMapReply( command, replyVersion, true, replyUseCache, true );
180 8 : return true;
181 : }
182 :
183 42 : void ObjectCM::_sendMapSuccess( const MasterCMCommand& command,
184 : const bool multicast )
185 : {
186 84 : command.getNode()->send( CMD_NODE_MAP_OBJECT_SUCCESS, multicast )
187 126 : << command.getNode()->getNodeID() << command.getObjectID()
188 126 : << command.getRequestID() << command.getInstanceID()
189 126 : << _object->getChangeType() << _object->getInstanceID();
190 42 : }
191 :
192 42 : void ObjectCM::_sendMapReply( const MasterCMCommand& command,
193 : const uint128_t& version, const bool result,
194 : const bool useCache, const bool multicast )
195 : {
196 84 : command.getNode()->send( CMD_NODE_MAP_OBJECT_REPLY, multicast )
197 126 : << command.getNode()->getNodeID() << command.getObjectID()
198 126 : << version << command.getRequestID() << result
199 126 : << command.useCache() << useCache;
200 42 : }
201 :
202 4 : void ObjectCM::_sendEmptyVersion( const MasterCMCommand& command,
203 : const uint128_t& version,
204 : const bool multicast )
205 : {
206 8 : NodePtr node = command.getNode();
207 8 : ConnectionPtr connection = node->getConnection( multicast );
208 :
209 12 : ObjectDataOCommand( Connections( 1, connection ), CMD_OBJECT_INSTANCE,
210 4 : COMMANDTYPE_OBJECT, _object->getID(),
211 : command.getInstanceID(), version, 0, 0, 0, true, 0 )
212 12 : << NodeID() << _object->getInstanceID();
213 4 : }
214 :
215 66 : }
|