Line data Source code
1 :
2 : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : *
4 : * This file is part of Collage <https://github.com/Eyescale/Collage>
5 : *
6 : * This library is free software; you can redistribute it and/or modify it under
7 : * the terms of the GNU Lesser General Public License version 2.1 as published
8 : * by the Free Software Foundation.
9 : *
10 : * This library is distributed in the hope that it will be useful, but WITHOUT
11 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13 : * details.
14 : *
15 : * You should have received a copy of the GNU Lesser General Public License
16 : * along with this library; if not, write to the Free Software Foundation, Inc.,
17 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 : */
19 :
20 : #include "global.h"
21 :
22 : #include <pression/pluginRegistry.h>
23 :
24 : #include <limits>
25 : #include <stdlib.h>
26 :
27 : #include <vector>
28 : #include <sstream>
29 :
30 : namespace co
31 : {
32 : #define SEPARATOR '#'
33 :
34 : #ifndef Darwin
35 : # define BIG_SEND
36 : #endif
37 :
38 : namespace
39 : {
40 :
41 22 : static uint32_t _getObjectBufferSize()
42 : {
43 22 : const char* env = getenv( "CO_OBJECT_BUFFER_SIZE" );
44 22 : if( !env )
45 22 : return 60000;
46 :
47 0 : const int64_t size = atoi( env );
48 0 : if( size > 0 )
49 0 : return size;
50 :
51 0 : return 60000;
52 : }
53 :
54 22 : static int32_t _getTimeout()
55 : {
56 22 : const char* env = getenv( "CO_TIMEOUT" );
57 22 : if( !env )
58 22 : return 300000; // == 5min
59 :
60 0 : const int32_t size = atoi( env );
61 0 : if( size <= 0 )
62 0 : return 300000; // == 5min
63 :
64 0 : return size;
65 : }
66 :
67 : uint16_t _defaultPort = 0;
68 22 : uint32_t _objectBufferSize = _getObjectBufferSize();
69 : int32_t _iAttributes[Global::IATTR_ALL] =
70 : {
71 : 100, // INSTANCE_CACHE_SIZE
72 : 100, // NODE_SEND_QUEUE_SIZE
73 : 100, // NODE_SEND_QUEUE_AGE
74 : 10, // RSP_TIMEOUT
75 : 1, // RSP_ERROR_DOWNSCALE
76 : 5, // RSP_ERROR_UPSCALE
77 : 20, // RSP_ERROR_MAXSCALE
78 : 3, // RSP_MIN_SENDRATE_SHIFT
79 : #ifdef BIG_SEND
80 : 64, // RSP_NUM_BUFFERS
81 : 5, // RSP_ACK_FREQUENCY
82 : 65000, // UDP_MTU
83 : #else
84 : 1024, // RSP_NUM_BUFFERS
85 : 17, // RSP_ACK_FREQUENCY
86 : 1470, // UDP_MTU
87 : #endif
88 : 524288, // UDP_BUFFER_SIZE
89 : 1, // QUEUE_MIN_SIZE
90 : 1, // QUEUE_REFILL
91 : 8, // RDMA_RING_BUFFER_SIZE_MB
92 : 512, // RDMA_SEND_QUEUE_DEPTH
93 : 5000, // RDMA_RESOLVE_TIMEOUT_MS
94 : 1, // IATTR_ROBUSTNESS
95 22 : _getTimeout(), // IATTR_TIMEOUT_DEFAULT
96 : 1023, // IATTR_OBJECT_COMPRESSION
97 : 0, // IATTR_CMD_QUEUE_LIMIT
98 22 : };
99 : }
100 :
101 0 : bool Global::fromString(const std::string& data )
102 : {
103 0 : if( data.empty() || data[0] != SEPARATOR )
104 0 : return false;
105 :
106 0 : std::vector< uint32_t > newGlobals;
107 0 : newGlobals.reserve( IATTR_ALL );
108 :
109 0 : size_t endMarker( 1u );
110 : while( true )
111 : {
112 0 : const size_t startMarker = data.find( SEPARATOR, endMarker );
113 0 : if( startMarker == std::string::npos )
114 0 : break;
115 :
116 0 : endMarker = data.find( SEPARATOR, startMarker + 1 );
117 0 : if( endMarker == std::string::npos )
118 0 : break;
119 :
120 : const std::string sub = data.substr( startMarker + 1,
121 0 : endMarker - startMarker - 1 );
122 0 : if( !sub.empty() && (isdigit( sub[0] ) || sub[0] == '-') )
123 0 : newGlobals.push_back( atoi( sub.c_str( )) );
124 : else
125 0 : break;
126 0 : }
127 :
128 : // only apply a 'complete' global list
129 0 : if( newGlobals.size() != IATTR_ALL )
130 : {
131 0 : LBWARN << "Expected " << unsigned( IATTR_ALL ) << " globals, got "
132 0 : << newGlobals.size() << std::endl;
133 0 : return false;
134 : }
135 :
136 0 : std::copy( newGlobals.begin(), newGlobals.end(), _iAttributes );
137 0 : return true;
138 : }
139 :
140 0 : std::string Global::toString()
141 : {
142 0 : std::stringstream stream;
143 0 : stream << SEPARATOR << SEPARATOR;
144 :
145 0 : for( uint32_t i = 0; i < IATTR_ALL; ++i )
146 0 : stream << _iAttributes[i] << SEPARATOR;
147 :
148 0 : stream << SEPARATOR;
149 0 : return stream.str();
150 : }
151 :
152 0 : void Global::setDefaultPort( const uint16_t port )
153 : {
154 0 : _defaultPort = port;
155 0 : }
156 :
157 0 : uint16_t Global::getDefaultPort()
158 : {
159 0 : return _defaultPort;
160 : }
161 :
162 2 : void Global::setObjectBufferSize( const uint32_t size )
163 : {
164 2 : _objectBufferSize = size;
165 2 : }
166 :
167 314110 : uint32_t Global::getObjectBufferSize()
168 : {
169 314110 : return _objectBufferSize;
170 : }
171 :
172 398 : pression::PluginRegistry& Global::getPluginRegistry()
173 : {
174 398 : static pression::PluginRegistry pluginRegistry;
175 398 : return pluginRegistry;
176 : }
177 :
178 3 : void Global::setIAttribute( const IAttribute attr, const int32_t value )
179 : {
180 3 : _iAttributes[ attr ] = value;
181 3 : }
182 :
183 2141788 : int32_t Global::getIAttribute( const IAttribute attr )
184 : {
185 2141788 : return _iAttributes[ attr ];
186 : }
187 :
188 1034612 : uint32_t Global::getTimeout()
189 : {
190 1034612 : return getIAttribute( IATTR_ROBUSTNESS ) ?
191 1034637 : getIAttribute( IATTR_TIMEOUT_DEFAULT ) : LB_TIMEOUT_INDEFINITE;
192 : }
193 :
194 0 : uint32_t Global::getKeepaliveTimeout()
195 : {
196 0 : const char* env = getenv( "CO_KEEPALIVE_TIMEOUT" );
197 0 : if( !env )
198 0 : return 2000; // ms
199 :
200 0 : const int64_t size = atoi( env );
201 0 : if( size == 0 )
202 0 : return 2000; // ms
203 :
204 0 : return size;
205 : }
206 :
207 55 : size_t Global::getCommandQueueLimit()
208 : {
209 55 : const int32_t limit = getIAttribute( IATTR_CMD_QUEUE_LIMIT );
210 55 : if( limit > 0 )
211 0 : return size_t( limit ) << 10;
212 55 : return std::numeric_limits< size_t >::max();
213 : }
214 :
215 66 : }
|