Line data Source code
1 :
2 : /* Copyright (c) 2012-2016, Daniel Nachbaur <danielnachbaur@gmail.com>
3 : * Stefan.Eilemann@epfl.ch
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "objectDataOCommand.h"
22 :
23 : #include "buffer.h"
24 : #include "objectDataICommand.h"
25 :
26 : namespace co
27 : {
28 :
29 : namespace detail
30 : {
31 :
32 : class ObjectDataOCommand
33 : {
34 : public:
35 170 : ObjectDataOCommand( co::DataOStream* stream_, const void* data_,
36 : const uint64_t size_ )
37 170 : : data( data_ )
38 : , size( 0 )
39 170 : , stream( stream_ )
40 : {
41 170 : if( stream )
42 : {
43 165 : size = stream->getCompressedDataSize();
44 165 : if( size == 0 )
45 157 : size = size_;
46 : }
47 170 : }
48 :
49 0 : ObjectDataOCommand( const ObjectDataOCommand& rhs )
50 0 : : data( rhs.data )
51 0 : , size( rhs.size )
52 0 : , stream( rhs.stream )
53 0 : {}
54 :
55 : const void* const data;
56 : uint64_t size;
57 : co::DataOStream* stream;
58 : };
59 :
60 : }
61 :
62 170 : ObjectDataOCommand::ObjectDataOCommand( const Connections& receivers,
63 : const uint32_t cmd, const uint32_t type,
64 : const uint128_t& id,
65 : const uint32_t instanceID,
66 : const uint128_t& version,
67 : const uint32_t sequence,
68 : const void* data,
69 : const uint64_t size,
70 : const bool isLast,
71 170 : DataOStream* stream )
72 : : ObjectOCommand( receivers, cmd, type, id, instanceID )
73 170 : , _impl( new detail::ObjectDataOCommand( stream, data, size ))
74 : {
75 170 : _init( version, sequence, size, isLast );
76 170 : }
77 :
78 0 : ObjectDataOCommand::ObjectDataOCommand( const ObjectDataOCommand& rhs )
79 : : ObjectOCommand( rhs )
80 0 : , _impl( new detail::ObjectDataOCommand( *rhs._impl ))
81 : {
82 0 : }
83 :
84 170 : void ObjectDataOCommand::_init( const uint128_t& version,
85 : const uint32_t sequence,
86 : const uint64_t size, const bool isLast )
87 : {
88 170 : *this << version << size << sequence << isLast;
89 :
90 170 : if( _impl->stream )
91 165 : _impl->stream->streamDataHeader( *this );
92 : else
93 5 : *this << std::string() << uint32_t(0); // compressor, nChunks
94 170 : }
95 :
96 340 : ObjectDataOCommand::~ObjectDataOCommand()
97 : {
98 170 : if( _impl->stream && _impl->size > 0 )
99 : {
100 165 : sendHeader( _impl->size );
101 165 : const Connections& connections = getConnections();
102 330 : for( ConnectionsCIter i = connections.begin(); i != connections.end();
103 : ++i )
104 : {
105 330 : ConnectionPtr conn = *i;
106 165 : _impl->stream->sendBody( conn, _impl->data, _impl->size );
107 : }
108 : }
109 :
110 170 : delete _impl;
111 170 : }
112 :
113 1 : ObjectDataICommand ObjectDataOCommand::_getCommand( LocalNodePtr node )
114 : {
115 1 : lunchbox::Bufferb& outBuffer = getBuffer();
116 : // cppcheck-suppress unreadVariable
117 1 : uint8_t* bytes = outBuffer.getData();
118 1 : reinterpret_cast< uint64_t* >( bytes )[ 0 ] = outBuffer.getSize();
119 :
120 2 : BufferPtr inBuffer = node->allocBuffer( outBuffer.getSize( ));
121 1 : inBuffer->swap( outBuffer );
122 2 : return ObjectDataICommand( node, node, inBuffer, false );
123 : }
124 :
125 66 : }
|