Line data Source code
1 :
2 : /* Copyright (c) 2007-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Cedric Stalder <cedric.stalder@gmail.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #ifndef CO_DATAISTREAM_H
23 : #define CO_DATAISTREAM_H
24 :
25 : #include <co/api.h>
26 : #include <co/types.h>
27 : #include <lunchbox/array.h> // used inline
28 : #include <lunchbox/bitOperation.h>
29 : #include <lunchbox/stdExt.h>
30 :
31 : #include <boost/type_traits.hpp>
32 : #include <map>
33 : #include <set>
34 : #include <vector>
35 :
36 : namespace co
37 : {
38 : namespace detail { class DataIStream; }
39 :
40 : /** A std::istream-like input data stream for binary data. */
41 : class DataIStream
42 : {
43 : public:
44 : /** @name Internal */
45 : //@{
46 : /** @internal @return the number of remaining buffers. */
47 : virtual size_t nRemainingBuffers() const = 0;
48 :
49 : virtual uint128_t getVersion() const = 0; //!< @internal
50 0 : virtual void reset() { _reset(); } //!< @internal forwards to _reset()
51 : void setSwapping( const bool onOff ); //!< @internal enable endian swap
52 : CO_API bool isSwapping() const; //!< @internal
53 : DataIStream& operator = ( const DataIStream& rhs ); //!< @internal
54 : //@}
55 :
56 : /** @name Data input */
57 : //@{
58 : /** @return a value of type T extracted with operator >>. @version 1.0 */
59 83895 : template< typename T > T read()
60 83895 : { T value; *this >> value; return value; }
61 :
62 : /** Read a data item, dispatching on boost::has_trivial_copy< T >. @version 1.0 */
63 3602338 : template< class T > DataIStream& operator >> ( T& value )
64 3602338 : { _read( value, boost::has_trivial_copy< T >( )); return *this; }
65 :
66 : /** Read a C array, dispatching on boost::has_trivial_copy< T >. @version 1.0 */
67 18 : template< class T > DataIStream& operator >> ( Array< T > array )
68 18 : { _readArray( array, boost::has_trivial_copy< T >( )); return *this; }
69 :
70 : /**
71 : * Read a lunchbox::RefPtr. Refcount has to be managed by caller.
72 : * @version 1.1
73 : */
74 : template< class T > DataIStream& operator >> ( lunchbox::RefPtr< T >& );
75 :
76 : /** Read a lunchbox::Buffer. @version 1.0 */
77 : template< class T > DataIStream& operator >> ( lunchbox::Buffer< T >& );
78 :
79 : /** Read a std::vector of serializable items. @version 1.0 */
80 : template< class T > DataIStream& operator >> ( std::vector< T >& );
81 :
82 : /** Read a std::map of serializable items. @version 1.0 */
83 : template< class K, class V >
84 : DataIStream& operator >> ( std::map< K, V >& );
85 :
86 : /** Read a std::set of serializable items. @version 1.0 */
87 : template< class T > DataIStream& operator >> ( std::set< T >& );
88 :
89 : /** Read a stde::hash_map of serializable items. @version 1.0 */
90 : template< class K, class V >
91 : DataIStream& operator >> ( stde::hash_map< K, V >& );
92 :
93 : /** Read a stde::hash_set of serializable items. @version 1.0 */
94 : template< class T > DataIStream& operator >> ( stde::hash_set< T >& );
95 :
96 : /**
97 : * @define CO_IGNORE_BYTESWAP: If set, no byteswapping of transmitted data
98 : * is performed. Enable when you get unresolved symbols for
99 : * lunchbox::byteswap and you don't care about mixed-endian environments.
100 : */
101 : # ifdef CO_IGNORE_BYTESWAP
102 : /** Byte-swap a plain data item (no-op in this configuration). @version 1.0 */
103 : template< class T > static void swap( T& ) { /* nop */ }
104 : # else
105 : /** Byte-swap a plain data item using lunchbox::byteswap. @version 1.0 */
106 0 : template< class T > static void swap( T& v ) { lunchbox::byteswap(v); }
107 : # endif
108 :
109 : /** @internal
110 : * Deserialize child objects.
111 : *
112 : * Existing children are synced to the new version. New children are created
113 : * by calling the <code>void create( C** child )</code> method on the
114 : * object, and are registered or mapped to the object's session. Removed
115 : * children are released by calling the <code>void release( C* )</code>
116 : * method on the object. The resulting child vector is created in
117 : * result. The old and result vector can be the same object, the result
118 : * vector is cleared and rebuilt completely.
119 : */
120 : template< typename O, typename C >
121 : void deserializeChildren( O* object, const std::vector< C* >& old,
122 : std::vector< C* >& result );
123 :
124 : /** @deprecated
125 : * Get the pointer to the remaining data in the current buffer.
126 : *
127 : * The usage of this method is discouraged, no endian conversion or bounds
128 : * checking is performed by the DataIStream on the returned raw pointer.
129 : *
130 : * The buffer is advanced by the given size. If not enough data is present,
131 : * 0 is returned and the buffer is unchanged.
132 : *
133 : * The data written to the DataOStream by the sender is bucketized, it is
134 : * sent in multiple blocks. The remaining buffer and its size points into
135 : * one of the buffers, i.e., not all the data sent is returned by this
136 : * function. However, a write operation on the other end is never segmented,
137 : * that is, if the application writes n bytes to the DataOStream, a
138 : * symmetric read from the DataIStream has at least n bytes available.
139 : *
140 : * @param size the number of bytes to advance the buffer
141 : * @version 1.0
142 : */
143 : CO_API const void* getRemainingBuffer( const uint64_t size );
144 :
145 : /**
146 : * @return the size of the remaining data in the current buffer.
147 : * @version 1.0
148 : */
149 : CO_API uint64_t getRemainingBufferSize();
150 :
151 : /** @internal @return true if any data was read. */
152 : bool wasUsed() const;
153 :
154 : /** @return true if not all data has been read. @version 1.0 */
155 96 : bool hasData() { return _checkBuffer(); }
156 :
157 : /** @return the provider of the istream. */
158 : CO_API virtual NodePtr getRemoteNode() const = 0;
159 :
160 : /** @return the receiver of the istream. */
161 : CO_API virtual LocalNodePtr getLocalNode() const = 0;
162 : //@}
163 :
164 : protected:
165 : /** @name Internal */
166 : //@{
167 : CO_API explicit DataIStream( const bool swap );
168 : explicit DataIStream( const DataIStream& );
169 : CO_API virtual ~DataIStream();
170 :
171 : virtual bool getNextBuffer( CompressorInfo& info, uint32_t& nChunks,
172 : const void*& chunkData, uint64_t& size ) = 0;
173 : //@}
174 :
175 : private:
176 : detail::DataIStream* const _impl;
177 :
178 : /** Read a number of bytes from the stream into a buffer. */
179 : CO_API void _read( void* data, uint64_t size );
180 :
181 : /**
182 : * Check that the current buffer has data left, get the next buffer if
183 : * necessary, return false if no data is left.
184 : */
185 : CO_API bool _checkBuffer();
186 : CO_API void _reset();
187 :
188 : const uint8_t* _decompress( const void* data, const CompressorInfo& info,
189 : uint32_t nChunks, uint64_t dataSize );
190 :
191 : /** Read a vector of trivial data: a uint64_t element count, then the elements as one Array. */
192 : template< class T >
193 4 : DataIStream& _readFlatVector ( std::vector< T >& value )
194 : {
195 4 : uint64_t nElems = 0;
196 4 : *this >> nElems;
197 4 : LBASSERTINFO( nElems < LB_BIT48,
198 : "Out-of-sync DataIStream: " << nElems << " elements?" );
199 4 : value.resize( size_t( nElems ));
200 4 : if( nElems > 0 )
201 4 : *this >> Array< T >( &value.front(), nElems );
202 4 : return *this;
203 : }
204 :
205 : /** Byte-swap a plain data item if isSwapping() is true. */
206 3602338 : template< class T > void _swap( T& value ) const
207 3602338 : { if( isSwapping( )) swap( value ); }
208 :
209 : /** Read a plain data item: sizeof( value ) raw bytes, then _swap(). */
210 3602338 : template< class T > void _read( T& value, const boost::true_type& )
211 3602338 : { _read( &value, sizeof( value )); _swap( value ); }
212 :
213 : /** Read a non-plain data item; dispatches on servus::Serializable inheritance. */
214 : template< class T > void _read( T& value, const boost::false_type& )
215 : { _readSerializable( value, boost::is_base_of< servus::Serializable, T >( ));}
216 :
217 : /** Read a serializable object. */
218 : template< class T > void _readSerializable(T& value, const boost::true_type&);
219 :
220 : /** Read an Array of POD data */
221 : template< class T > void _readArray( Array< T >, const boost::true_type& );
222 :
223 : /** Read an Array of non-POD data */
224 : template< class T > void _readArray( Array< T >, const boost::false_type& );
225 :
226 : /** Byte-swap each element of a C array if isSwapping() is true. @version 1.0 */
227 5 : template< class T > void _swap( Array< T > array ) const
228 : {
229 5 : if( !isSwapping( ))
230 5 : return;
231 0 : #pragma omp parallel for
232 0 : for( ssize_t i = 0; i < ssize_t( array.num ); ++i )
233 0 : swap( array.data[i] );
234 : }
235 : };
236 : }
237 :
238 : #include "dataIStream.ipp" // template implementation
239 :
240 : #endif //CO_DATAISTREAM_H
|