Line data Source code
1 :
2 : /* Copyright (c) 2007-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Cedric Stalder <cedric.stalder@gmail.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #ifndef CO_DATAOSTREAM_H
23 : #define CO_DATAOSTREAM_H
24 :
25 : #include <co/api.h>
26 : #include <co/types.h>
27 :
28 : #include <lunchbox/array.h> // used inline
29 : #include <lunchbox/stdExt.h>
30 :
31 : #include <boost/noncopyable.hpp>
32 : #include <boost/type_traits.hpp>
33 : #include <map>
34 : #include <set>
35 : #include <vector>
36 :
37 : namespace co
38 : {
39 : namespace detail { class DataOStream; }
40 : namespace DataStreamTest { class Sender; }
41 :
42 : /**
43 : * A std::ostream-like interface for object serialization.
44 : *
45 : * Implements buffering, retaining and compressing data in a binary format.
46 : * Derived classes send the data using the appropriate commands.
47 : */
48 : class DataOStream : public boost::noncopyable
49 : {
50 : public:
51 : /** @name Internal */
52 : //@{
53 : /** @internal Disable and flush the output to the current receivers. */
54 : CO_API void disable();
55 :
56 : /** @internal Enable copying of all data into a saved buffer. */
57 : void enableSave();
58 :
59 : /** @internal Disable copying of all data into a saved buffer. */
60 : void disableSave();
61 :
62 : /** @internal @return if data was sent since the last enable() */
63 : CO_API bool hasSentData() const;
64 :
65 : /** @internal */
66 : CO_API const Connections& getConnections() const;
67 :
68 : /** @internal Stream the data header (compressor, nChunks). */
69 : DataOStream& streamDataHeader( DataOStream& os );
70 :
71 : /** @internal Send the (compressed) data using the given connection. */
72 : void sendBody( ConnectionPtr connection, const void* data,
73 : const uint64_t dataSize );
74 :
75 : /** @internal @return the compressed data size, 0 if uncompressed.*/
76 : uint64_t getCompressedDataSize() const;
77 : //@}
78 :
79 : /** @name Data output */
80 : //@{
81 : /** Write a plain data item by copying it to the stream. @version 1.0 */
82 292083 : template< class T > DataOStream& operator << ( const T& value )
83 292083 : { _write( value, boost::has_trivial_copy< T >( )); return *this; }
84 :
85 : /** Write a C array. @version 1.0 */
86 19 : template< class T > DataOStream& operator << ( const Array< T > array )
87 19 : { _writeArray( array, boost::has_trivial_copy<T>( )); return *this; }
88 :
89 : /**
90 : * Write a lunchbox::RefPtr. Refcount has to managed by caller.
91 : * @version 1.1
92 : */
93 : template< class T >
94 : DataOStream& operator << ( const lunchbox::RefPtr< T >& ptr );
95 :
96 : /** Write a lunchbox::Buffer. @version 1.0 */
97 : template< class T >
98 : DataOStream& operator << ( const lunchbox::Buffer< T >& buffer );
99 :
100 : /** Transmit a request identifier. @version 1.1.1 */
101 : template< class T >
102 20194 : DataOStream& operator << ( const lunchbox::Request<T>& request )
103 20194 : { return (*this) << request.getID(); }
104 :
105 : /** Write a std::vector of serializable items. @version 1.0 */
106 : template< class T >
107 : DataOStream& operator << ( const std::vector< T >& value );
108 :
109 : /** Write a std::map of serializable items. @version 1.0 */
110 : template< class K, class V >
111 : DataOStream& operator << ( const std::map< K, V >& value );
112 :
113 : /** Write a std::set of serializable items. @version 1.0 */
114 : template< class T >
115 : DataOStream& operator << ( const std::set< T >& value );
116 :
117 : /** Write a stde::hash_map of serializable items. @version 1.0 */
118 : template< class K, class V >
119 : DataOStream& operator << ( const stde::hash_map< K, V >& value );
120 :
121 : /** Write a stde::hash_set of serializable items. @version 1.0 */
122 : template< class T >
123 : DataOStream& operator << ( const stde::hash_set< T >& value );
124 :
125 : /** @internal
126 : * Serialize child objects.
127 : *
128 : * The DataIStream has a deserialize counterpart to this method. All
129 : * child objects have to be registered or mapped beforehand.
130 : */
131 : template< typename C >
132 : void serializeChildren( const std::vector< C* >& children );
133 : //@}
134 :
135 : protected:
136 : CO_API DataOStream(); //!< @internal
137 : explicit DataOStream( DataOStream& rhs ); //!< @internal
138 : virtual CO_API ~DataOStream(); //!< @internal
139 :
140 : /** @internal */
141 : CO_API lunchbox::Bufferb& getBuffer();
142 :
143 : /** @internal Initialize the given compressor. */
144 : void _initCompressor( const uint32_t compressor );
145 :
146 : /** @internal Enable output. */
147 : CO_API void _enable();
148 :
149 : /** @internal Flush remaining data in the buffer. */
150 : void flush( const bool last );
151 :
152 : /** @internal Set up the connection list for a group of nodes, using
153 : * multicast where possible.
154 : */
155 : void _setupConnections( const Nodes& receivers );
156 :
157 : void _setupConnections( const Connections& connections );
158 :
159 : /** @internal Set up the connection (list) for one node. */
160 : void _setupConnection( NodePtr node, const bool useMulticast );
161 :
162 : /** @internal Needed by unit test. */
163 : CO_API void _setupConnection( ConnectionPtr connection );
164 : friend class DataStreamTest::Sender;
165 :
166 : /** @internal Resend the saved buffer to all enabled connections. */
167 : void _resend();
168 :
169 : void _clearConnections(); //!< @internal
170 :
171 : /** @internal @name Data sending, used by the subclasses */
172 : //@{
173 : /** @internal Send a data buffer (command) to the receivers. */
174 : virtual void sendData( const void* buffer, const uint64_t size,
175 : const bool last ) = 0;
176 : //@}
177 :
178 : /** @internal Reset the whole stream. */
179 : virtual CO_API void reset();
180 :
181 : private:
182 : detail::DataOStream* const _impl;
183 :
184 : /** Collect compressed data. */
185 : CO_API uint64_t _getCompressedData( void** chunks,
186 : uint64_t* chunkSizes ) const;
187 :
188 : /** Write a number of bytes from data into the stream. */
189 : CO_API void _write( const void* data, uint64_t size );
190 :
191 : /** Helper function preparing data for sendData() as needed. */
192 : void _sendData( const void* data, const uint64_t size );
193 :
194 : /** Reset after sending a buffer. */
195 : void _resetBuffer();
196 :
197 : /** Write a vector of trivial data. */
198 : template< class T >
199 4 : DataOStream& _writeFlatVector( const std::vector< T >& value )
200 : {
201 4 : const uint64_t nElems = value.size();
202 4 : _write( &nElems, sizeof( nElems ));
203 4 : if( nElems > 0 )
204 4 : _write( &value.front(), nElems * sizeof( T ));
205 4 : return *this;
206 : }
207 :
208 : /** Write a plain data item. */
209 292086 : template< class T > void _write( const T& value, const boost::true_type& )
210 292086 : { _write( &value, sizeof( value )); }
211 :
212 : /** Write a non-plain data item. */
213 : template< class T > void _write( const T& value, const boost::false_type& )
214 : { _writeSerializable( value, boost::is_base_of< servus::Serializable, T >());}
215 :
216 : /** Write a serializable object. */
217 : template< class T>
218 : void _writeSerializable( const T& value, const boost::true_type& );
219 :
220 : /** Write an Array of POD data */
221 : template< class T >
222 1 : void _writeArray( const Array< T > array, const boost::true_type& )
223 1 : { _write( array.data, array.getNumBytes( )); }
224 :
225 : /** Write an Array of non-POD data */
226 : template< class T >
227 1 : void _writeArray( const Array< T > array, const boost::false_type& )
228 : {
229 3 : for( size_t i = 0; i < array.num; ++i )
230 2 : *this << array.data[ i ];
231 1 : }
232 :
233 : /** Send the trailing data (command) to the receivers */
234 : void _sendFooter( const void* buffer, const uint64_t size );
235 : };
236 :
237 : std::ostream& operator << ( std::ostream&, const DataOStream& );
238 : }
239 :
240 : #include "dataOStream.ipp" // template implementation
241 :
242 : #endif //CO_DATAOSTREAM_H
|