Line data Source code
1 :
2 : /* Copyright (c) 2012-2016, Daniel Nachbaur <danielnachbaur@gmail.com>
3 : * Stefan.Eilemann@epfl.ch
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "oCommand.h"
22 :
23 : #include "buffer.h"
24 : #include "iCommand.h"
25 :
26 : namespace co
27 : {
28 : namespace detail
29 : {
30 :
31 72182 : class OCommand
32 : {
33 : public:
34 72182 : OCommand( co::Dispatcher* const dispatcher_, LocalNodePtr localNode_ )
35 72182 : : isLocked( false )
36 : , size( 0 )
37 : , dispatcher( dispatcher_ )
38 72182 : , localNode( localNode_ )
39 72182 : {}
40 :
41 : bool isLocked;
42 : uint64_t size;
43 : co::Dispatcher* const dispatcher;
44 : LocalNodePtr localNode;
45 : };
46 :
47 : }
48 :
49 72148 : OCommand::OCommand( const Connections& receivers, const uint32_t cmd,
50 72148 : const uint32_t type )
51 : : DataOStream()
52 72148 : , _impl( new detail::OCommand( 0, 0 ))
53 : {
54 72147 : _setupConnections( receivers );
55 72146 : _init( cmd, type );
56 72143 : }
57 :
58 34 : OCommand::OCommand( Dispatcher* const dispatcher, LocalNodePtr localNode,
59 34 : const uint32_t cmd, const uint32_t type )
60 : : DataOStream()
61 34 : , _impl( new detail::OCommand( dispatcher, localNode ))
62 : {
63 34 : _init( cmd, type );
64 34 : }
65 :
66 0 : OCommand::OCommand( const OCommand& rhs )
67 : : DataOStream( const_cast< OCommand& >( rhs ))
68 0 : , _impl( new detail::OCommand( *rhs._impl ))
69 : {
70 0 : }
71 :
72 144364 : OCommand::~OCommand()
73 : {
74 72182 : if( _impl->isLocked )
75 : {
76 165 : LBASSERT( _impl->size > 0 );
77 165 : const uint64_t size = _impl->size + getBuffer().getSize();
78 165 : const Connections& connections = getConnections();
79 165 : if( size < COMMAND_MINSIZE ) // Fill send to minimal size
80 : {
81 152 : const size_t delta = COMMAND_MINSIZE - size;
82 152 : void* padding = alloca( delta );
83 912 : for( ConnectionsCIter i = connections.begin();
84 608 : i != connections.end(); ++i )
85 : {
86 304 : ConnectionPtr connection = *i;
87 152 : connection->send( padding, delta, true );
88 : }
89 : }
90 990 : for( ConnectionsCIter i = connections.begin();
91 660 : i != connections.end(); ++i )
92 : {
93 330 : ConnectionPtr connection = *i;
94 165 : connection->unlockSend();
95 : }
96 165 : _impl->isLocked = false;
97 165 : _impl->size = 0;
98 165 : reset();
99 : }
100 : else
101 72017 : disable();
102 :
103 72182 : if( _impl->dispatcher )
104 : {
105 34 : LBASSERT( _impl->localNode );
106 :
107 : // #145 proper local command dispatch?
108 34 : LBASSERT( _impl->size == 0 );
109 34 : const uint64_t size = getBuffer().getSize();
110 68 : BufferPtr buffer = _impl->localNode->allocBuffer( size );
111 34 : buffer->swap( getBuffer( ));
112 34 : reinterpret_cast< uint64_t* >( buffer->getData( ))[ 0 ] = size;
113 :
114 68 : ICommand cmd( _impl->localNode, _impl->localNode, buffer, false );
115 34 : _impl->dispatcher->dispatchCommand( cmd );
116 : }
117 :
118 72182 : delete _impl;
119 72182 : }
120 :
121 72180 : void OCommand::_init( const uint32_t cmd, const uint32_t type )
122 : {
123 : #ifndef COLLAGE_BIGENDIAN
124 : // big endian hosts swap handshake commands to little endian...
125 72180 : LBASSERTINFO( cmd < CMD_NODE_MAXIMUM, std::hex << "0x" << cmd << std::dec );
126 : #endif
127 72180 : enableSave();
128 72180 : _enable();
129 72181 : *this << uint64_t( 0 )/* size */ << type << cmd;
130 72177 : }
131 :
132 165 : void OCommand::sendHeader( const uint64_t additionalSize )
133 : {
134 165 : LBASSERT( !_impl->dispatcher );
135 165 : LBASSERT( !_impl->isLocked );
136 165 : LBASSERT( additionalSize > 0 );
137 :
138 165 : const Connections& connections = getConnections();
139 330 : for( ConnectionsCIter i = connections.begin(); i != connections.end(); ++i )
140 : {
141 330 : ConnectionPtr connection = *i;
142 165 : connection->lockSend();
143 : }
144 165 : _impl->isLocked = true;
145 165 : _impl->size = additionalSize;
146 165 : flush( true );
147 165 : }
148 :
149 3 : size_t OCommand::getSize()
150 : {
151 3 : return sizeof( uint64_t ) + sizeof( uint32_t ) + sizeof( uint32_t );
152 : }
153 :
154 72140 : void OCommand::sendData( const void* buffer LB_UNUSED, const uint64_t size,
155 : const bool last LB_UNUSED )
156 : {
157 72140 : LBASSERT( !_impl->dispatcher );
158 72140 : LBASSERT( last );
159 72140 : LBASSERTINFO( size >= 16, size );
160 72139 : LBASSERT( getBuffer().getData() == buffer );
161 72136 : LBASSERT( getBuffer().getSize() == size );
162 72134 : LBASSERT( getBuffer().getMaxSize() >= COMMAND_MINSIZE );
163 :
164 : // Update size field
165 : // cppcheck-suppress unreadVariable
166 72129 : uint8_t* bytes = getBuffer().getData();
167 72128 : reinterpret_cast< uint64_t* >( bytes )[ 0 ] = _impl->size + size;
168 72128 : const uint64_t sendSize = _impl->isLocked ? size : LB_MAX( size,
169 : COMMAND_MINSIZE);
170 72128 : const Connections& connections = getConnections();
171 144273 : for( auto connection : connections )
172 : {
173 72146 : if ( connection )
174 72146 : connection->send( bytes, sendSize, _impl->isLocked );
175 : else
176 0 : LBERROR << "Can't send data, node is closed" << std::endl;
177 : }
178 72147 : }
179 :
180 66 : }
|