Line data Source code
1 :
2 : /* Copyright (c) 2010-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2012, Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "versionedMasterCM.h"
22 :
23 : #include "log.h"
24 : #include "object.h"
25 : #include "objectCommand.h"
26 : #include "objectDataICommand.h"
27 : #include "objectDataIStream.h"
28 :
29 : namespace co
30 : {
31 : typedef CommandFunc<VersionedMasterCM> CmdFunc;
32 :
33 12 : VersionedMasterCM::VersionedMasterCM( Object* object )
34 : : ObjectCM( object )
35 : , _version( VERSION_NONE )
36 12 : , _maxVersion( std::numeric_limits< uint64_t >::max( ))
37 : {
38 12 : LBASSERT( object );
39 12 : LBASSERT( object->getLocalNode( ));
40 :
41 : // sync commands are send to all instances, even the master gets it
42 : object->registerCommand( CMD_OBJECT_INSTANCE,
43 : CmdFunc( this, &VersionedMasterCM::_cmdDiscard ),
44 12 : 0 );
45 : object->registerCommand( CMD_OBJECT_DELTA,
46 : CmdFunc( this, &VersionedMasterCM::_cmdDiscard ),
47 12 : 0 );
48 :
49 : object->registerCommand( CMD_OBJECT_SLAVE_DELTA,
50 : CmdFunc( this, &VersionedMasterCM::_cmdSlaveDelta ),
51 12 : 0 );
52 : object->registerCommand( CMD_OBJECT_MAX_VERSION,
53 : CmdFunc( this, &VersionedMasterCM::_cmdMaxVersion ),
54 12 : 0 );
55 12 : }
56 :
57 24 : VersionedMasterCM::~VersionedMasterCM()
58 : {
59 12 : _slaves->clear();
60 12 : }
61 :
62 0 : uint128_t VersionedMasterCM::sync( const uint128_t& inVersion )
63 : {
64 0 : LBASSERTINFO( inVersion.high() != 0 || inVersion == VERSION_NEXT ||
65 : inVersion == VERSION_HEAD, inVersion );
66 : #if 0
67 : LBLOG( LOG_OBJECTS ) << "sync to v" << inVersion << ", id "
68 : << _object->getID() << "." << _object->getInstanceID()
69 : << std::endl;
70 : #endif
71 :
72 0 : if( inVersion == VERSION_NEXT )
73 0 : return _apply( _slaveCommits.pop( ));
74 :
75 0 : if( inVersion == VERSION_HEAD )
76 : {
77 0 : uint128_t version = VERSION_NONE;
78 0 : for( ObjectDataIStream* is = _slaveCommits.tryPop(); is;
79 0 : is = _slaveCommits.tryPop( ))
80 : {
81 0 : version = _apply( is );
82 : }
83 0 : return version;
84 : }
85 : // else apply only concrete slave commit
86 :
87 0 : return _apply( _slaveCommits.pull( inVersion ));
88 : }
89 :
90 0 : uint128_t VersionedMasterCM::_apply( ObjectDataIStream* is )
91 : {
92 0 : LBASSERT( !is->hasInstanceData( ));
93 0 : _object->unpack( *is );
94 0 : LBASSERTINFO( is->getRemainingBufferSize() == 0 &&
95 : is->nRemainingBuffers()==0,
96 : "Object " << lunchbox::className( _object ) <<
97 : " did not unpack all data" );
98 :
99 0 : const uint128_t version = is->getVersion();
100 0 : is->reset();
101 0 : _slaveCommits.recycle( is );
102 0 : return version;
103 : }
104 :
105 33 : bool VersionedMasterCM::addSlave( const MasterCMCommand& command )
106 : {
107 33 : LB_TS_THREAD( _cmdThread );
108 33 : Mutex mutex( _slaves );
109 :
110 33 : if( !ObjectCM::_addSlave( command, _version ))
111 0 : return false;
112 :
113 66 : SlaveData data;
114 33 : data.node = command.getNode();
115 33 : data.instanceID = command.getInstanceID();
116 33 : data.maxVersion = command.getMaxVersion();
117 33 : if( data.maxVersion == 0 )
118 0 : data.maxVersion = std::numeric_limits< uint64_t >::max();
119 33 : else if( data.maxVersion < std::numeric_limits< uint64_t >::max( ))
120 1 : data.maxVersion += _version.low();
121 :
122 33 : _slaveData.push_back( data );
123 33 : _updateMaxVersion();
124 :
125 33 : _slaves->push_back( data.node );
126 33 : lunchbox::usort( *_slaves );
127 66 : return true;
128 : }
129 :
130 32 : void VersionedMasterCM::removeSlave( NodePtr node, const uint32_t instanceID )
131 : {
132 32 : LB_TS_THREAD( _cmdThread );
133 32 : Mutex mutex( _slaves );
134 :
135 : // remove from subscribers
136 64 : SlaveData data;
137 32 : data.node = node;
138 32 : data.instanceID = instanceID;
139 32 : SlaveDatasIter i = lunchbox::find( _slaveData, data );
140 32 : LBASSERTINFO( i != _slaveData.end(), lunchbox::className( _object ));
141 32 : if( i == _slaveData.end( ))
142 32 : return;
143 :
144 32 : _slaveData.erase( i );
145 :
146 : // update _slaves node vector
147 32 : _slaves->clear();
148 123 : for( i = _slaveData.begin(); i != _slaveData.end(); ++i )
149 91 : _slaves->push_back( i->node );
150 32 : lunchbox::usort( *_slaves );
151 64 : _updateMaxVersion();
152 : }
153 :
154 25 : void VersionedMasterCM::removeSlaves( NodePtr node )
155 : {
156 25 : LB_TS_THREAD( _cmdThread );
157 :
158 25 : Mutex mutex( _slaves );
159 :
160 25 : NodesIter i = lunchbox::find( *_slaves, node );
161 25 : if( i == _slaves->end( ))
162 50 : return;
163 0 : _slaves->erase( i );
164 :
165 0 : for( SlaveDatasIter j = _slaveData.begin(); j != _slaveData.end(); )
166 : {
167 0 : if( j->node == node )
168 0 : j = _slaveData.erase( j );
169 : else
170 0 : ++j;
171 : }
172 0 : _updateMaxVersion();
173 : }
174 :
175 67 : void VersionedMasterCM::_updateMaxVersion()
176 : {
177 67 : uint64_t maxVersion = std::numeric_limits< uint64_t >::max();
178 284 : for( SlaveDatasCIter i = _slaveData.begin(); i != _slaveData.end(); ++i )
179 : {
180 220 : if( i->maxVersion != std::numeric_limits< uint64_t >::max() &&
181 3 : maxVersion > i->maxVersion )
182 : {
183 3 : maxVersion = i->maxVersion;
184 : }
185 : }
186 :
187 67 : if( _maxVersion != maxVersion )
188 4 : _maxVersion = maxVersion;
189 67 : }
190 :
191 : //---------------------------------------------------------------------------
192 : // command handlers
193 : //---------------------------------------------------------------------------
194 0 : bool VersionedMasterCM::_cmdSlaveDelta( ICommand& cmd )
195 : {
196 0 : ObjectDataICommand command( cmd );
197 :
198 0 : LB_TS_THREAD( _rcvThread );
199 :
200 0 : if( _slaveCommits.addDataCommand( command.get< uint128_t >(), command ))
201 0 : _object->notifyNewVersion();
202 0 : return true;
203 : }
204 :
205 2 : bool VersionedMasterCM::_cmdMaxVersion( ICommand& cmd )
206 : {
207 2 : ObjectICommand command( cmd );
208 2 : const uint64_t version = command.get< uint64_t >();
209 2 : const uint32_t slaveID = command.get< uint32_t >();
210 :
211 4 : Mutex mutex( _slaves );
212 :
213 : // Update slave's max version
214 4 : SlaveData data;
215 2 : data.node = command.getNode();
216 2 : data.instanceID = slaveID;
217 2 : SlaveDatasIter i = lunchbox::find( _slaveData, data );
218 2 : if( i == _slaveData.end( ))
219 : {
220 0 : LBWARN << "Got max version from unmapped slave" << std::endl;
221 0 : return true;
222 : }
223 :
224 2 : i->maxVersion = version;
225 2 : _updateMaxVersion();
226 4 : return true;
227 : }
228 :
229 60 : }
|