Line data Source code
1 :
2 : /* Copyright (c) 2007-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "objectDataIStream.h"
22 :
23 : #include "commands.h"
24 : #include "objectCommand.h"
25 : #include "objectDataICommand.h"
26 : #include <pression/data/CompressorInfo.h>
27 :
28 : namespace co
29 : {
30 197442 : ObjectDataIStream::ObjectDataIStream()
31 197442 : : DataIStream( false )
32 : {
33 197442 : _reset();
34 197442 : }
35 :
36 0 : ObjectDataIStream::ObjectDataIStream( const ObjectDataIStream& rhs )
37 0 : : DataIStream( rhs )
38 : {
39 0 : *this = rhs;
40 0 : }
41 :
42 0 : ObjectDataIStream& ObjectDataIStream::operator = ( const ObjectDataIStream& rhs)
43 : {
44 0 : if( this != &rhs )
45 : {
46 0 : DataIStream::operator = ( rhs );
47 0 : _commands = rhs._commands;
48 0 : _version = rhs._version;
49 : }
50 0 : return *this;
51 : }
52 :
53 592326 : ObjectDataIStream::~ObjectDataIStream()
54 : {
55 197442 : _reset();
56 394884 : }
57 :
58 0 : void ObjectDataIStream::reset()
59 : {
60 0 : DataIStream::reset();
61 0 : _reset();
62 0 : }
63 :
64 394884 : void ObjectDataIStream::_reset()
65 : {
66 394884 : _usedCommand.clear();
67 394884 : _commands.clear();
68 394884 : _version = VERSION_INVALID;
69 394884 : }
70 :
71 197452 : void ObjectDataIStream::addDataCommand( ObjectDataICommand command )
72 : {
73 197452 : LB_TS_THREAD( _thread );
74 197452 : LBASSERT( !isReady( ));
75 :
76 : #ifndef NDEBUG
77 197452 : const uint128_t& version = command.getVersion();
78 197452 : const uint32_t sequence = command.getSequence();
79 :
80 197452 : if( _commands.empty( ))
81 : {
82 197442 : LBASSERTINFO( sequence == 0, sequence << " in " << command );
83 : }
84 : else
85 : {
86 20 : ObjectDataICommand previous( _commands.back() );
87 10 : const uint128_t& previousVersion = previous.getVersion();
88 10 : const uint32_t previousSequence = previous.getSequence();
89 10 : LBASSERTINFO( sequence == previousSequence+1,
90 : sequence << ", " << previousSequence );
91 10 : LBASSERT( version == previousVersion );
92 : }
93 : #endif
94 :
95 197452 : _commands.push_back( command );
96 197452 : if( command.isLast( ))
97 197442 : _setReady();
98 197452 : }
99 :
100 152 : bool ObjectDataIStream::hasInstanceData() const
101 : {
102 152 : if( !_usedCommand.isValid() && _commands.empty( ))
103 : {
104 0 : LBUNREACHABLE;
105 0 : return false;
106 : }
107 :
108 304 : const ICommand& command = _usedCommand.isValid() ? _usedCommand :
109 304 : _commands.front();
110 152 : return( command.getCommand() == CMD_OBJECT_INSTANCE );
111 : }
112 :
113 0 : NodePtr ObjectDataIStream::getRemoteNode() const
114 : {
115 0 : if( !_usedCommand.isValid() && _commands.empty( ))
116 0 : return 0;
117 :
118 0 : const ICommand& command = _usedCommand.isValid() ? _usedCommand :
119 0 : _commands.front();
120 0 : return command.getRemoteNode();
121 : }
122 :
123 0 : LocalNodePtr ObjectDataIStream::getLocalNode() const
124 : {
125 0 : if( !_usedCommand.isValid() && _commands.empty( ))
126 0 : return 0;
127 :
128 0 : const ICommand& command = _usedCommand.isValid() ? _usedCommand :
129 0 : _commands.front();
130 0 : return command.getLocalNode();
131 : }
132 :
133 591843 : size_t ObjectDataIStream::getDataSize() const
134 : {
135 591843 : size_t size = 0;
136 : typedef CommandDeque::const_iterator CommandDequeCIter;
137 1183698 : for( CommandDequeCIter i = _commands.begin(); i != _commands.end(); ++i )
138 : {
139 591855 : const ICommand& command = *i;
140 591855 : size += command.getSize();
141 : }
142 591843 : return size;
143 : }
144 :
145 197454 : uint128_t ObjectDataIStream::getPendingVersion() const
146 : {
147 197454 : if( _commands.empty( ))
148 0 : return VERSION_INVALID;
149 :
150 394908 : const ObjectDataICommand& cmd( _commands.back( ));
151 197454 : return cmd.getVersion();
152 : }
153 :
154 328 : bool ObjectDataIStream::getNextBuffer( CompressorInfo& info, uint32_t& nChunks,
155 : const void*& chunkData, uint64_t& size )
156 : {
157 328 : if( _commands.empty( ))
158 : {
159 161 : _usedCommand.clear();
160 161 : return false;
161 : }
162 :
163 167 : _usedCommand = _commands.front();
164 167 : _commands.pop_front();
165 167 : if( !_usedCommand.isValid( ))
166 0 : return false;
167 :
168 167 : LBASSERT( _usedCommand.getCommand() == CMD_OBJECT_INSTANCE ||
169 : _usedCommand.getCommand() == CMD_OBJECT_DELTA ||
170 : _usedCommand.getCommand() == CMD_OBJECT_SLAVE_DELTA );
171 :
172 334 : ObjectDataICommand command( _usedCommand );
173 167 : const uint64_t dataSize = command.getDataSize();
174 :
175 167 : if( dataSize == 0 ) // empty command
176 4 : return getNextBuffer( info, nChunks, chunkData, size );
177 :
178 163 : size = dataSize;
179 163 : info = command.getCompressorInfo();
180 163 : nChunks = command.getChunks();
181 163 : switch( command.getCommand( ))
182 : {
183 : case CMD_OBJECT_INSTANCE:
184 57 : command.get< NodeID >(); // nodeID
185 57 : command.get< uint32_t >(); // instanceID
186 57 : break;
187 : case CMD_OBJECT_SLAVE_DELTA:
188 0 : command.get< uint128_t >(); // commit UUID
189 0 : break;
190 : }
191 163 : chunkData = command.getRemainingBuffer( command.getRemainingBufferSize( ));
192 :
193 163 : setSwapping( command.isSwapping( ));
194 163 : return true;
195 : }
196 :
197 66 : }
|