Line data Source code
1 :
2 : /* Copyright (c) 2007-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Cedric Stalder <cedric.stalder@gmail.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #ifndef CO_DATAOSTREAM_H
23 : #define CO_DATAOSTREAM_H
24 :
25 : #include <co/api.h>
26 : #include <co/types.h>
27 :
28 : #include <lunchbox/array.h> // used inline
29 : #include <lunchbox/stdExt.h>
30 :
31 : #include <boost/noncopyable.hpp>
32 : #include <boost/type_traits.hpp>
33 : #include <map>
34 : #include <set>
35 : #include <vector>
36 :
37 : namespace co
38 : {
39 : namespace detail { class DataOStream; }
40 : namespace DataStreamTest { class Sender; }
41 :
42 : /**
43 : * A std::ostream-like interface for object serialization.
44 : *
45 : * Implements buffering, retaining and compressing data in a binary format.
46 : * Derived classes send the data using the appropriate commands.
47 : */
48 : class DataOStream : public boost::noncopyable
49 : {
50 : public:
51 : /** @name Internal */
52 : //@{
53 : /** @internal Disable and flush the output to the current receivers. */
54 : CO_API void disable();
55 :
56 : /** @internal Enable copying of all data into a saved buffer. */
57 : void enableSave();
58 :
59 : /** @internal Disable copying of all data into a saved buffer. */
60 : void disableSave();
61 :
62 : /** @internal @return true if data was sent since the last enable(). */
63 : CO_API bool hasSentData() const;
64 :
65 : /** @internal */
66 : CO_API const Connections& getConnections() const;
67 :
68 : /** @internal Stream the data header (compressor, nChunks). */
69 : DataOStream& streamDataHeader( DataOStream& os );
70 :
71 : /** @internal Send the (compressed) data using the given connection. */
72 : void sendBody( ConnectionPtr connection, const void* data,
73 : const uint64_t dataSize );
74 :
75 : /** @internal @return the compressed data size, 0 if uncompressed. */
76 : uint64_t getCompressedDataSize() const;
77 : //@}
78 :
79 : /** @name Data output */
80 : //@{
81 : /** Write a data item: plain types are copied, others serialized. @version 1.0 */
82 291991 : template< class T > DataOStream& operator << ( const T& value )
83 291991 : { _write( value, boost::has_trivial_copy< T >( )); return *this; }
84 :
85 : /** Write a C array wrapped in an Array. @version 1.0 */
86 19 : template< class T > DataOStream& operator << ( const Array< T > array )
87 19 : { _writeArray( array, boost::has_trivial_copy<T>( )); return *this; }
88 :
89 : /**
90 : * Write a lunchbox::RefPtr. The reference count has to be managed by the caller.
91 : * @version 1.1
92 : */
93 : template< class T >
94 : DataOStream& operator << ( const lunchbox::RefPtr< T >& ptr );
95 :
96 : /** Write a lunchbox::Buffer. @version 1.0 */
97 : template< class T >
98 : DataOStream& operator << ( const lunchbox::Buffer< T >& buffer );
99 :
100 : /** Transmit the identifier of a lunchbox::Request. @version 1.1.1 */
101 : template< class T >
102 20194 : DataOStream& operator << ( const lunchbox::Request<T>& request )
103 20194 : { return (*this) << request.getID(); }
104 :
105 : /** Write a std::vector of serializable items. @version 1.0 */
106 : template< class T >
107 : DataOStream& operator << ( const std::vector< T >& value );
108 :
109 : /** Write a std::map of serializable items. @version 1.0 */
110 : template< class K, class V >
111 : DataOStream& operator << ( const std::map< K, V >& value );
112 :
113 : /** Write a std::set of serializable items. @version 1.0 */
114 : template< class T >
115 : DataOStream& operator << ( const std::set< T >& value );
116 :
117 : /** Write a stde::hash_map of serializable items. @version 1.0 */
118 : template< class K, class V >
119 : DataOStream& operator << ( const stde::hash_map< K, V >& value );
120 :
121 : /** Write a stde::hash_set of serializable items. @version 1.0 */
122 : template< class T >
123 : DataOStream& operator << ( const stde::hash_set< T >& value );
124 :
125 : /** @internal
126 : * Serialize child objects.
127 : *
128 : * The DataIStream has a deserialize counterpart to this method. All
129 : * child objects have to be registered or mapped beforehand.
130 : */
131 : template< typename C >
132 : void serializeChildren( const std::vector< C* >& children );
133 : //@}
134 :
135 : CO_API static std::ostream& printStatistics( std::ostream& ); //!< @internal
136 : CO_API static void clearStatistics(); //!< @internal
137 :
138 : protected:
139 : CO_API DataOStream(); //!< @internal
140 : explicit DataOStream( DataOStream& rhs ); //!< @internal
141 : virtual CO_API ~DataOStream(); //!< @internal
142 :
143 : /** @internal */
144 : CO_API lunchbox::Bufferb& getBuffer();
145 :
146 : /** @internal Initialize the given compressor. */
147 : void _setCompressor( const pression::data::CompressorInfo& info );
148 :
149 : /** @internal Enable output. */
150 : CO_API void _enable();
151 :
152 : /** @internal Flush remaining data in the buffer. */
153 : void flush( const bool last );
154 :
155 : /** @internal Set up the connection list for a group of nodes, using
156 : * multicast where possible.
157 : */
158 : void _setupConnections( const Nodes& receivers );
159 :
160 : void _setupConnections( const Connections& connections ); //!< @internal
161 :
162 : /** @internal Set up the connection (list) for one node. */
163 : void _setupConnection( NodePtr node, const bool useMulticast );
164 :
165 : /** @internal Needed by unit test. */
166 : CO_API void _setupConnection( ConnectionPtr connection );
167 : friend class DataStreamTest::Sender;
168 :
169 : /** @internal Resend the saved buffer to all enabled connections. */
170 : void _resend();
171 :
172 : void _clearConnections(); //!< @internal
173 :
174 : /** @internal @name Data sending, used by the subclasses */
175 : //@{
176 : /** @internal Send a data buffer (command) to the receivers. */
177 : virtual void sendData( const void* buffer, const uint64_t size,
178 : const bool last ) = 0;
179 : //@}
180 :
181 : /** @internal Reset the whole stream. */
182 : virtual CO_API void reset();
183 :
184 : private:
185 : detail::DataOStream* const _impl;
186 :
187 : /** Collect compressed data. */
188 : CO_API uint64_t _getCompressedData( const uint8_t** chunks,
189 : uint64_t* chunkSizes ) const;
190 :
191 : /** Write a number of bytes from data into the stream. */
192 : CO_API void _write( const void* data, uint64_t size );
193 :
194 : /** Helper function preparing data for sendData() as needed. */
195 : void _sendData( const void* data, const uint64_t size );
196 :
197 : /** Reset after sending a buffer. */
198 : void _resetBuffer();
199 :
200 : /** Write a vector of trivial data: a 64-bit element count, then the elements. */
201 : template< class T >
202 4 : DataOStream& _writeFlatVector( const std::vector< T >& value )
203 : {
204 4 : const uint64_t nElems = value.size();
205 4 : _write( &nElems, sizeof( nElems ));
206 4 : if( nElems > 0 )
207 4 : _write( &value.front(), nElems * sizeof( T ));
208 4 : return *this;
209 : }
210 :
211 : /** Write a plain data item as a raw copy of its sizeof( value ) bytes. */
212 291987 : template< class T > void _write( const T& value, const boost::true_type& )
213 291987 : { _write( &value, sizeof( value )); }
214 :
215 : /** Write a non-plain data item, dispatching on servus::Serializable. */
216 : template< class T > void _write( const T& value, const boost::false_type& )
217 : {
218 : _writeSerializable( value,
219 : boost::is_base_of< servus::Serializable, T >( ));
220 : }
221 :
222 : /** Write a serializable object. */
223 : template< class T>
224 : void _writeSerializable( const T& value, const boost::true_type& );
225 :
226 : /** Write an Array of plain (trivially copyable) data as one block. */
227 : template< class T >
228 1 : void _writeArray( const Array< T > array, const boost::true_type& )
229 1 : { _write( array.data, array.getNumBytes( )); }
230 :
231 : /** Write an Array of non-plain data by streaming each element. */
232 : template< class T >
233 1 : void _writeArray( const Array< T > array, const boost::false_type& )
234 : {
235 3 : for( size_t i = 0; i < array.num; ++i )
236 2 : *this << array.data[ i ];
237 1 : }
238 :
239 : /** Send the trailing data (command) to the receivers. */
240 : void _sendFooter( const void* buffer, const uint64_t size );
241 : };
242 : }
243 :
244 : #include "dataOStream.ipp" // template implementation
245 :
246 : #endif //CO_DATAOSTREAM_H
|