Line data Source code
1 :
2 : /* Copyright (c) 2007-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2012, Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "objectDataIStream.h"
22 :
23 : #include "commands.h"
24 : #include "objectCommand.h"
25 : #include "objectDataICommand.h"
26 :
27 : namespace co
28 : {
29 179402 : ObjectDataIStream::ObjectDataIStream()
30 179402 : : DataIStream( false )
31 : {
32 179402 : _reset();
33 179402 : }
34 :
35 0 : ObjectDataIStream::ObjectDataIStream( const ObjectDataIStream& rhs )
36 0 : : DataIStream( rhs )
37 : {
38 0 : *this = rhs;
39 0 : }
40 :
41 0 : ObjectDataIStream& ObjectDataIStream::operator = ( const ObjectDataIStream& rhs)
42 : {
43 0 : if( this != &rhs )
44 : {
45 0 : DataIStream::operator = ( rhs );
46 0 : _commands = rhs._commands;
47 0 : _version = rhs._version;
48 : }
49 0 : return *this;
50 : }
51 :
52 538206 : ObjectDataIStream::~ObjectDataIStream()
53 : {
54 179402 : _reset();
55 358804 : }
56 :
57 0 : void ObjectDataIStream::reset()
58 : {
59 0 : DataIStream::reset();
60 0 : _reset();
61 0 : }
62 :
63 358804 : void ObjectDataIStream::_reset()
64 : {
65 358804 : _usedCommand.clear();
66 358804 : _commands.clear();
67 358804 : _version = VERSION_INVALID;
68 358804 : }
69 :
70 179412 : void ObjectDataIStream::addDataCommand( ObjectDataICommand command )
71 : {
72 179412 : LB_TS_THREAD( _thread );
73 179412 : LBASSERT( !isReady( ));
74 :
75 : #ifndef NDEBUG
76 179412 : const uint128_t& version = command.getVersion();
77 179412 : const uint32_t sequence = command.getSequence();
78 :
79 179412 : if( _commands.empty( ))
80 : {
81 179402 : LBASSERTINFO( sequence == 0, sequence << " in " << command );
82 : }
83 : else
84 : {
85 10 : ObjectDataICommand previous( _commands.back() );
86 10 : const uint128_t& previousVersion = previous.getVersion();
87 10 : const uint32_t previousSequence = previous.getSequence();
88 10 : LBASSERTINFO( sequence == previousSequence+1,
89 : sequence << ", " << previousSequence );
90 10 : LBASSERT( version == previousVersion );
91 : }
92 : #endif
93 :
94 179412 : _commands.push_back( command );
95 179412 : if( command.isLast( ))
96 179402 : _setReady();
97 179412 : }
98 :
99 152 : bool ObjectDataIStream::hasInstanceData() const
100 : {
101 152 : if( !_usedCommand.isValid() && _commands.empty( ))
102 : {
103 0 : LBUNREACHABLE;
104 0 : return false;
105 : }
106 :
107 152 : const ICommand& command = _usedCommand.isValid() ? _usedCommand :
108 152 : _commands.front();
109 152 : return( command.getCommand() == CMD_OBJECT_INSTANCE );
110 : }
111 :
112 0 : NodePtr ObjectDataIStream::getRemoteNode() const
113 : {
114 0 : if( !_usedCommand.isValid() && _commands.empty( ))
115 0 : return 0;
116 :
117 0 : const ICommand& command = _usedCommand.isValid() ? _usedCommand :
118 0 : _commands.front();
119 0 : return command.getRemoteNode();
120 : }
121 :
122 0 : LocalNodePtr ObjectDataIStream::getLocalNode() const
123 : {
124 0 : if( !_usedCommand.isValid() && _commands.empty( ))
125 0 : return 0;
126 :
127 0 : const ICommand& command = _usedCommand.isValid() ? _usedCommand :
128 0 : _commands.front();
129 0 : return command.getLocalNode();
130 : }
131 :
132 537723 : size_t ObjectDataIStream::getDataSize() const
133 : {
134 537723 : size_t size = 0;
135 : typedef CommandDeque::const_iterator CommandDequeCIter;
136 1075458 : for( CommandDequeCIter i = _commands.begin(); i != _commands.end(); ++i )
137 : {
138 537735 : const ICommand& command = *i;
139 537735 : size += command.getSize();
140 : }
141 537723 : return size;
142 : }
143 :
144 179414 : uint128_t ObjectDataIStream::getPendingVersion() const
145 : {
146 179414 : if( _commands.empty( ))
147 0 : return VERSION_INVALID;
148 :
149 179414 : const ObjectDataICommand& cmd( _commands.back( ));
150 179414 : return cmd.getVersion();
151 : }
152 :
153 328 : bool ObjectDataIStream::getNextBuffer( uint32_t& compressor, uint32_t& nChunks,
154 : const void** chunkData, uint64_t& size )
155 : {
156 328 : if( _commands.empty( ))
157 : {
158 161 : _usedCommand.clear();
159 161 : return false;
160 : }
161 :
162 167 : _usedCommand = _commands.front();
163 167 : _commands.pop_front();
164 167 : if( !_usedCommand.isValid( ))
165 0 : return false;
166 :
167 167 : LBASSERT( _usedCommand.getCommand() == CMD_OBJECT_INSTANCE ||
168 : _usedCommand.getCommand() == CMD_OBJECT_DELTA ||
169 : _usedCommand.getCommand() == CMD_OBJECT_SLAVE_DELTA );
170 :
171 167 : ObjectDataICommand command( _usedCommand );
172 167 : const uint64_t dataSize = command.getDataSize();
173 :
174 167 : if( dataSize == 0 ) // empty command
175 4 : return getNextBuffer( compressor, nChunks, chunkData, size );
176 :
177 163 : size = dataSize;
178 163 : compressor = command.getCompressor();
179 163 : nChunks = command.getChunks();
180 163 : switch( command.getCommand( ))
181 : {
182 : case CMD_OBJECT_INSTANCE:
183 57 : command.get< NodeID >(); // nodeID
184 57 : command.get< uint32_t >(); // instanceID
185 57 : break;
186 : case CMD_OBJECT_SLAVE_DELTA:
187 0 : command.get< uint128_t >(); // commit UUID
188 0 : break;
189 : }
190 163 : *chunkData = command.getRemainingBuffer( command.getRemainingBufferSize( ));
191 :
192 163 : setSwapping( command.isSwapping( ));
193 163 : return true;
194 : }
195 :
196 66 : }
|