XRootD
Loading...
Searching...
No Matches
XrdThrottleManager.cc
Go to the documentation of this file.
1
3
4#include "XrdOuc/XrdOucEnv.hh"
12
13#define XRD_TRACE m_trace->
15
16#include <algorithm>
17#include <array>
18#include <cmath>
19#include <random>
20#include <sstream>
21
22#if defined(__linux__)
23
24#include <sched.h>
25unsigned XrdThrottleManager::GetTimerListHash() {
26 int cpu = sched_getcpu();
27 if (cpu < 0) {
28 return 0;
29 }
30 return cpu % m_timer_list_size;
31}
32
33#else
34
35unsigned XrdThrottleManager::GetTimerListHash() {
36 return 0;
37}
38
39#endif
40
// Identifier prepended to TRACE output emitted from this translation unit.
const char *
XrdThrottleManager::TraceID = "ThrottleManager";
43
// Construct a throttle manager with all throttles disabled (negative limits)
// until configured via FromConfig()/SetThrottles().
XrdThrottleManager::XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP) :
   m_trace(tP),
   m_log(lP),
   m_interval_length_seconds(1.0),
   m_bytes_per_second(-1),            // negative => no bandwidth limit
   m_ops_per_second(-1),              // negative => no IOPS limit
   m_concurrency_limit(-1),           // negative => no concurrency limit
   m_last_round_allocation(100*1024), // bootstrap allocation: 100KB per user
   m_loadshed_host(""),
   m_loadshed_port(0),                // 0 => load shedding disabled
   m_loadshed_frequency(0)
{
}
57
// Apply the parsed configuration: open-file/connection/wait limits, throttle
// rates, trace levels, and the optional load-shed redirect target.
void
XrdThrottleManager::FromConfig(XrdThrottle::Configuration &config)
{

   // A value of -1 means "not specified in the config"; keep the default.
   auto max_open = config.GetMaxOpen();
   if (max_open != -1) SetMaxOpen(max_open);
   auto max_conn = config.GetMaxConn();
   if (max_conn != -1) SetMaxConns(max_conn);
   auto max_wait = config.GetMaxWait();
   if (max_wait != -1) SetMaxWait(max_wait);

   // Recompute interval is configured in milliseconds; SetThrottles takes seconds.
   SetThrottles(config.GetThrottleDataRate(),
                config.GetThrottleIOPSRate(),
                config.GetThrottleConcurrency(),
                static_cast<float>(config.GetThrottleRecomputeIntervalMS())/1000.0);

   m_trace->What = config.GetTraceLevels();

   auto loadshed_host = config.GetLoadshedHost();
   auto loadshed_port = config.GetLoadshedPort();
   auto loadshed_freq = config.GetLoadshedFreq();
   if (!loadshed_host.empty() && loadshed_port > 0 && loadshed_freq > 0)
   {
      // Loadshed specified, so set it.
      SetLoadShed(loadshed_host, loadshed_port, loadshed_freq);
   }
}
85
// One-time initialization: size the per-user share vectors, hand every user a
// bootstrap allocation, and launch the background recompute thread.
void
XrdThrottleManager::Init()
{
   TRACE(DEBUG, "Initializing the throttle manager.");
   // Initialize all our shares to zero.
   m_primary_bytes_shares.resize(m_max_users);
   m_secondary_bytes_shares.resize(m_max_users);
   m_primary_ops_shares.resize(m_max_users);
   m_secondary_ops_shares.resize(m_max_users);
   // Give each per-user waiter a back-pointer to this manager (needed by
   // Waiter::Wait to read the concurrency limit and active-IO counter).
   for (auto & waiter : m_waiter_info) {
       waiter.m_manager = this;
   }

   // Allocate each user 100KB and 10 ops to bootstrap;
   for (int i=0; i<m_max_users; i++)
   {
      m_primary_bytes_shares[i] = m_last_round_allocation;
      m_secondary_bytes_shares[i] = 0;
      m_primary_ops_shares[i] = 10;
      m_secondary_ops_shares[i] = 0;
   }

   // Spawn the background thread that periodically refills shares and
   // recomputes the waiter wake-up order (see Recompute()).
   int rc;
   pthread_t tid;
   if ((rc = XrdSysThread::Run(&tid, XrdThrottleManager::RecomputeBootstrap, static_cast<void *>(this), 0, "Buffer Manager throttle")))
      m_log->Emsg("ThrottleManager", rc, "create throttle thread");

}
114
// Derive a (username, hashed-uid) pair for the authenticated client entity.
std::tuple<std::string, uint16_t>
XrdThrottleManager::GetUserInfo(const XrdSecEntity *client)
{
   // Try various potential "names" associated with the request, from the most
   // specific to most generic.
   std::string user;

   if (client->eaAPI && client->eaAPI->Get("token.subject", user)) {
       // Token subject, qualified by the VO name when one is present.
       if (client->vorg) user = std::string(client->vorg) + ":" + user;
   } else if (client->eaAPI) {
       std::string request_name;
       if (client->eaAPI->Get("request.name", request_name) && !request_name.empty()) user = request_name;
   }
   // Last resort: the entity's plain name, or a fixed placeholder.
   if (user.empty()) {user = client->name ? client->name : "nobody";}
   uint16_t uid = GetUid(user.c_str());
   return std::make_tuple(user, uid);
}
131
/*
 * Take as many shares as possible to fulfill the request; update
 * request with current remaining value, or zero if satisfied.
 */
inline void
XrdThrottleManager::GetShares(int &shares, int &request)
{
   int remaining;
   // Atomically subtract the full request from the share pool.
   // NOTE(review): assumes AtomicFSub stores the pool's prior value in
   // 'remaining' — confirm against the XrdSysAtomics macro definitions.
   AtomicFSub(remaining, shares, request);
   if (remaining > 0)
   {
      // The pool could cover up to 'remaining' shares; consume that much.
      request -= (remaining < request) ? remaining : request;
   }
}
146
147/*
148 * Iterate through all of the secondary shares, attempting
149 * to steal enough to fulfill the request.
150 */
151void
152XrdThrottleManager::StealShares(int uid, int &reqsize, int &reqops)
153{
154 if (!reqsize && !reqops) return;
155 TRACE(BANDWIDTH, "Stealing shares to fill request of " << reqsize << " bytes");
156 TRACE(IOPS, "Stealing shares to fill request of " << reqops << " ops.");
157
158 for (int i=uid+1; i % m_max_users == uid; i++)
159 {
160 if (reqsize) GetShares(m_secondary_bytes_shares[i % m_max_users], reqsize);
161 if (reqops) GetShares(m_secondary_ops_shares[ i % m_max_users], reqops);
162 }
163
164 TRACE(BANDWIDTH, "After stealing shares, " << reqsize << " of request bytes remain.");
165 TRACE(IOPS, "After stealing shares, " << reqops << " of request ops remain.");
166}
167
168/*
169 * Increment the number of files held open by a given entity. Returns false
170 * if the user is at the maximum; in this case, the internal counter is not
171 * incremented.
172 */
173bool
174XrdThrottleManager::OpenFile(const std::string &entity, std::string &error_message)
175{
176 if (m_max_open == 0 && m_max_conns == 0) return true;
177
178 const std::lock_guard<std::mutex> lock(m_file_mutex);
179 auto iter = m_file_counters.find(entity);
180 unsigned long cur_open_files = 0, cur_open_conns;
181 if (m_max_open) {
182 if (iter == m_file_counters.end()) {
183 m_file_counters[entity] = 1;
184 TRACE(FILES, "User " << entity << " has opened their first file");
185 cur_open_files = 1;
186 } else if (iter->second < m_max_open) {
187 iter->second++;
188 cur_open_files = iter->second;
189 } else {
190 std::stringstream ss;
191 ss << "User " << entity << " has hit the limit of " << m_max_open << " open files";
192 TRACE(FILES, ss.str());
193 error_message = ss.str();
194 return false;
195 }
196 }
197
198 if (m_max_conns) {
199 auto pid = XrdSysThread::Num();
200 auto conn_iter = m_active_conns.find(entity);
201 auto conn_count_iter = m_conn_counters.find(entity);
202 if ((conn_count_iter != m_conn_counters.end()) && (conn_count_iter->second == m_max_conns) &&
203 (conn_iter == m_active_conns.end() || ((*(conn_iter->second))[pid] == 0)))
204 {
205 // note: we are rolling back the increment in open files
206 if (m_max_open) iter->second--;
207 std::stringstream ss;
208 ss << "User " << entity << " has hit the limit of " << m_max_conns <<
209 " open connections";
210 TRACE(CONNS, ss.str());
211 error_message = ss.str();
212 return false;
213 }
214 if (conn_iter == m_active_conns.end()) {
215 std::unique_ptr<std::unordered_map<pid_t, unsigned long>> conn_map(
216 new std::unordered_map<pid_t, unsigned long>());
217 (*conn_map)[pid] = 1;
218 m_active_conns[entity] = std::move(conn_map);
219 if (conn_count_iter == m_conn_counters.end()) {
220 m_conn_counters[entity] = 1;
221 cur_open_conns = 1;
222 } else {
223 m_conn_counters[entity] ++;
224 cur_open_conns = m_conn_counters[entity];
225 }
226 } else {
227 auto pid_iter = conn_iter->second->find(pid);
228 if (pid_iter == conn_iter->second->end() || pid_iter->second == 0) {
229 (*(conn_iter->second))[pid] = 1;
230 conn_count_iter->second++;
231 cur_open_conns = conn_count_iter->second;
232 } else {
233 (*(conn_iter->second))[pid] ++;
234 cur_open_conns = conn_count_iter->second;
235 }
236 }
237 TRACE(CONNS, "User " << entity << " has " << cur_open_conns << " open connections");
238 }
239 if (m_max_open) TRACE(FILES, "User " << entity << " has " << cur_open_files << " open files");
240 return true;
241}
242
243
/*
 * Decrement the number of files held open by a given entity.
 *
 * Returns false if the value would have fallen below zero or
 * if the entity isn't tracked.
 */
bool
XrdThrottleManager::CloseFile(const std::string &entity)
{
   // No limits configured: nothing was counted at open time.
   if (m_max_open == 0 && m_max_conns == 0) return true;

   bool result = true;
   const std::lock_guard<std::mutex> lock(m_file_mutex);
   if (m_max_open) {
      auto iter = m_file_counters.find(entity);
      if (iter == m_file_counters.end()) {
         TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin never saw an open file");
         result = false;
      } else if (iter->second == 0) {
         TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin thinks all files were already closed");
         result = false;
      } else {
         iter->second--;
      }
      // 'iter' is only dereferenced here when valid (result == true).
      if (result) TRACE(FILES, "User " << entity << " closed a file; " << iter->second <<
                        " remain open");
   }

   if (m_max_conns) {
      // Connections are tracked per (entity, thread); see OpenFile().
      auto pid = XrdSysThread::Num();
      auto conn_iter = m_active_conns.find(entity);
      auto conn_count_iter = m_conn_counters.find(entity);
      if (conn_iter == m_active_conns.end() || !(conn_iter->second)) {
         TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
               " tracking");
         return false;
      }
      auto pid_iter = conn_iter->second->find(pid);
      if (pid_iter == conn_iter->second->end()) {
         TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
               " tracking");
         return false;
      }
      if (pid_iter->second == 0) {
         TRACE(CONNS, "WARNING: User " << entity << " closed a file on connection the throttle"
               " plugin thinks was idle");
      } else {
         pid_iter->second--;
      }
      if (conn_count_iter == m_conn_counters.end()) {
         TRACE(CONNS, "WARNING: User " << entity << " closed a file but the throttle plugin never"
               " observed an open file");
      } else if (pid_iter->second == 0) {
         // That was this thread's last open file for the entity: the
         // connection just went idle, so drop the active-connection count.
         if (conn_count_iter->second == 0) {
            TRACE(CONNS, "WARNING: User " << entity << " had a connection go idle but the "
                  " throttle plugin already thought all connections were idle");
         } else {
            conn_count_iter->second--;
            TRACE(CONNS, "User " << entity << " had connection on thread " << pid << " go idle; "
                  << conn_count_iter->second << " active connections remain");
         }
      }
   }

   return result;
}
310
311
/*
 * Apply the throttle. If there are no limits set, returns immediately. Otherwise,
 * this applies the limits as best possible, stalling the thread if necessary.
 */
void
XrdThrottleManager::Apply(int reqsize, int reqops, int uid)
{
   // A negative rate means "unlimited"; drop that half of the request.
   if (m_bytes_per_second < 0)
      reqsize = 0;
   if (m_ops_per_second < 0)
      reqops = 0;
   // Loop until the whole request has been charged against some share pool.
   while (reqsize || reqops)
   {
      // Subtract the requested out of the shares
      AtomicBeg(m_compute_var);
      GetShares(m_primary_bytes_shares[uid], reqsize);
      if (reqsize)
      {
         // Primary pool exhausted; fall back to this user's secondary pool.
         TRACE(BANDWIDTH, "Using secondary shares; request has " << reqsize << " bytes left.");
         GetShares(m_secondary_bytes_shares[uid], reqsize);
         TRACE(BANDWIDTH, "Finished with secondary shares; request has " << reqsize << " bytes left.");
      }
      else
      {
         TRACE(BANDWIDTH, "Filled byte shares out of primary; " << m_primary_bytes_shares[uid] << " left.");
      }
      GetShares(m_primary_ops_shares[uid], reqops);
      if (reqops)
      {
         GetShares(m_secondary_ops_shares[uid], reqops);
      }
      // Last resort: raid other users' idle secondary shares.
      StealShares(uid, reqsize, reqops);
      AtomicEnd(m_compute_var);

      if (reqsize || reqops)
      {
         // Still unsatisfied: sleep until the next recompute interval refills
         // the pools (Broadcast at the end of RecomputeInternal()).
         if (reqsize) TRACE(BANDWIDTH, "Sleeping to wait for throttle fairshare.");
         if (reqops) TRACE(IOPS, "Sleeping to wait for throttle fairshare.");
         m_compute_var.Wait();
         m_loadshed_limit_hit++;
      }
   }

}
356
357void
358XrdThrottleManager::UserIOAccounting()
359{
360 std::chrono::steady_clock::duration::rep total_active_time = 0;
361 for (size_t idx = 0; idx < m_timer_list.size(); idx++) {
362 auto &timerList = m_timer_list[idx];
363 std::unique_lock<std::mutex> lock(timerList.m_mutex);
364 auto timer = timerList.m_first;
365 while (timer) {
366 auto next = timer->m_next;
367 auto uid = timer->m_owner;
368 auto &waiter = m_waiter_info[uid];
369 auto recent_duration = timer->Reset();
370 waiter.m_io_time += recent_duration.count();
371
372 total_active_time += recent_duration.count();
373 timer = next;
374 }
375 }
376 m_io_active_time += total_active_time;
377}
378
/*
 * Recompute the randomized wake-up order used to decide which waiting user's
 * IO is released next; users are weighted inversely to their recent
 * concurrency so under-served users wake first.
 */
void
XrdThrottleManager::ComputeWaiterOrder()
{
   // Update the IO time for long-running I/O operations. This prevents,
   // for example, a 2-minute I/O operation from causing a spike in
   // concurrency because it's otherwise only reported at the end.
   UserIOAccounting();

   auto now = std::chrono::steady_clock::now();
   auto elapsed = now - m_last_waiter_recompute_time;
   m_last_waiter_recompute_time = now;
   std::chrono::duration<double> elapsed_secs = elapsed;
   // Alpha is the decay factor for the exponential moving average. One window is 10 seconds,
   // so every 10 seconds we decay the prior average by 1/e (that is, the weight is 64% of the
   // total). This means the contribution of I/O load from a minute ago is 0.2% of the total.

   // The moving average will be used to determine how close the user is to their "fair share"
   // of the concurrency limit among the users that are waiting.
   auto alpha = 1 - std::exp(-1 * elapsed_secs.count() / 10.0);

   std::vector<double> share;
   share.resize(m_max_users);
   size_t users_with_waiters = 0;
   // For each user, compute their current concurrency and determine how many waiting users
   // total there are.
   for (int i = 0; i < m_max_users; i++)
   {
      auto &waiter = m_waiter_info[i];
      // Drain the IO time recorded since the last recompute (atomically).
      auto io_duration_rep = waiter.m_io_time.exchange(std::chrono::steady_clock::duration(0).count());
      std::chrono::steady_clock::duration io_duration = std::chrono::steady_clock::duration(io_duration_rep);
      std::chrono::duration<double> io_duration_secs = io_duration;
      // Instantaneous concurrency over the interval = busy time / wall time.
      auto prev_concurrency = io_duration_secs.count() / elapsed_secs.count();
      float new_concurrency = waiter.m_concurrency;

      // Exponential moving average update.
      new_concurrency = (1 - alpha) * new_concurrency + alpha * prev_concurrency;
      waiter.m_concurrency = new_concurrency;
      if (new_concurrency > 0) {
         TRACE(DEBUG, "User " << i << " has concurrency of " << new_concurrency);
      }
      unsigned waiting;
      {
         std::lock_guard<std::mutex> lock(waiter.m_mutex);
         waiting = waiter.m_waiting;
      }
      if (waiting > 0)
      {
         share[i] = new_concurrency;
         TRACE(DEBUG, "User " << i << " has concurrency of " << share[i] << " and is waiting for " << waiting);
         // Handle the division-by-zero case; if we have no history of usage whatsoever, we should pretend we
         // have at least some minimal load
         if (share[i] == 0) {
            share[i] = 0.1;
         }
         users_with_waiters++;
      }
      else
      {
         share[i] = 0;
      }
   }
   // NOTE(review): if no user is waiting, this divides by zero (inf); the
   // value is never used below in that case, but confirm it's intended.
   auto fair_share = static_cast<double>(m_concurrency_limit) / static_cast<double>(users_with_waiters);
   std::vector<uint16_t> waiter_order;
   waiter_order.resize(m_max_users);

   // Calculate the share for each user. We assume the user should get a share proportional to how
   // far above or below the fair share they are. So, a user with concurrency of 20 when the fairshare
   // is 10 will get 0.5 shares; a user with concurrency of 5 when the fairshare is 10 will get 2.0 shares.
   double shares_sum = 0;
   for (int idx = 0; idx < m_max_users; idx++)
   {
      if (share[idx]) {
         shares_sum += fair_share / share[idx];
      }
   }

   // We must quantize the overall shares into an array of 1024 elements. We do this by
   // scaling up (or down) based on the total number of shares computed above. Note this
   // quantization can lead to an over-provisioned user being assigned zero shares; thus,
   // we scale based on (1024-#users) so we can give one extra share to each user.
   auto scale_factor = (static_cast<double>(m_max_users) - static_cast<double>(users_with_waiters)) / shares_sum;
   size_t offset = 0;
   for (int uid = 0; uid < m_max_users; uid++) {
      if (share[uid] > 0) {
         // "+ 1" guarantees every waiting user gets at least one slot.
         auto shares = static_cast<unsigned>(scale_factor * fair_share / share[uid]) + 1;
         TRACE(DEBUG, "User " << uid << " has " << shares << " shares");
         for (unsigned idx = 0; idx < shares; idx++)
         {
            waiter_order[offset % m_max_users] = uid;
            offset++;
         }
      }
   }
   if (offset < m_max_users) {
      // Fill unused slots with the "empty" sentinel. The array holds
      // uint16_t, so -1 is stored as 0xFFFF; NotifyOne() reads entries back
      // as int16_t and treats negative values as "no user".
      for (size_t idx = offset; idx < m_max_users; idx++) {
         waiter_order[idx] = -1;
      }
   }
   // Shuffle the order to randomize the wakeup order.
   std::shuffle(waiter_order.begin(), waiter_order.end(), std::default_random_engine());

   // Copy the order to the inactive array. We do not shuffle in-place because RAtomics are
   // not move constructible, which is a requirement for std::shuffle.
   auto &waiter_order_to_modify = (m_wake_order_active == 0) ? m_wake_order_1 : m_wake_order_0;
   std::copy(waiter_order.begin(), waiter_order.end(), waiter_order_to_modify.begin());

   // Set the array we just modified to be the active one. Since this is a relaxed write, it could take
   // some time for other CPUs to see the change; that's OK as this is all stochastic anyway.
   m_wake_order_active = (m_wake_order_active + 1) % 2;

   m_waiter_offset = 0;

   // If we find ourselves below the concurrency limit because we woke up too few operations in the last
   // interval, try waking up enough operations to fill the gap. If we race with new incoming operations,
   // the threads will just go back to sleep.
   if (users_with_waiters) {
      m_waiting_users = users_with_waiters;
      auto io_active = m_io_active.load(std::memory_order_acquire);
      for (size_t idx = io_active; idx < static_cast<size_t>(m_concurrency_limit); idx++) {
         NotifyOne();
      }
   }
}
501
502void *
503XrdThrottleManager::RecomputeBootstrap(void *instance)
504{
505 XrdThrottleManager * manager = static_cast<XrdThrottleManager*>(instance);
506 manager->Recompute();
507 return NULL;
508}
509
/*
 * Main loop of the background throttle thread: garbage-collect idle
 * connection/file counters, recompute fairshares and the waiter wake-up
 * order, then sleep one interval. Never returns.
 */
void
XrdThrottleManager::Recompute()
{
   while (1)
   {
      // The connection counter can accumulate a number of known-idle connections.
      // We only need to keep long-term memory of idle ones. Take this chance to garbage
      // collect old connection counters.
      if (m_max_open || m_max_conns) {
         const std::lock_guard<std::mutex> lock(m_file_mutex);
         // Erase-while-iterating: 'iter' is only advanced when not erased.
         for (auto iter = m_active_conns.begin(); iter != m_active_conns.end();)
         {
            auto & conn_count = *iter;
            if (!conn_count.second) {
               // Null per-thread map pointer; drop the entry outright.
               iter = m_active_conns.erase(iter);
               continue;
            }
            // Drop per-thread entries whose connection went idle.
            for (auto iter2 = conn_count.second->begin(); iter2 != conn_count.second->end();) {
               if (iter2->second == 0) {
                  iter2 = conn_count.second->erase(iter2);
               } else {
                  iter2++;
               }
            }
            if (!conn_count.second->size()) {
               iter = m_active_conns.erase(iter);
            } else {
               iter++;
            }
         }
         // Drop entities whose active-connection count fell to zero.
         for (auto iter = m_conn_counters.begin(); iter != m_conn_counters.end();) {
            if (!iter->second) {
               iter = m_conn_counters.erase(iter);
            } else {
               iter++;
            }
         }
         // Drop entities with no remaining open files.
         for (auto iter = m_file_counters.begin(); iter != m_file_counters.end();) {
            if (!iter->second) {
               iter = m_file_counters.erase(iter);
            } else {
               iter++;
            }
         }
      }

      TRACE(DEBUG, "Recomputing fairshares for throttle.");
      RecomputeInternal();
      ComputeWaiterOrder();
      TRACE(DEBUG, "Finished recomputing fairshares for throttle; sleeping for " << m_interval_length_seconds << " seconds.");
      XrdSysTimer::Wait(static_cast<int>(1000*m_interval_length_seconds));
   }
}
563
/*
 * The heart of the manager approach.
 *
 * This routine periodically recomputes the shares of each current user.
 * Each user has a "primary" and a "secondary" share. At the end of the
 * each time interval, the remaining primary share is moved to secondary.
 * A user can utilize both shares; if both are gone, they must block until
 * the next recompute interval.
 *
 * The secondary share can be "stolen" by any other user; so, if a user
 * is idle or under-utilizing, their share can be used by someone else.
 * However, they can never be completely starved, as no one can steal
 * primary share.
 *
 * In this way, we violate the throttle for an interval, but never starve.
 *
 */
void
XrdThrottleManager::RecomputeInternal()
{
   // Compute total shares for this interval;
   float intervals_per_second = 1.0/m_interval_length_seconds;
   float total_bytes_shares = m_bytes_per_second / intervals_per_second;
   float total_ops_shares = m_ops_per_second / intervals_per_second;

   // Compute the number of active users; a user is active if they used
   // any primary share during the last interval;
   AtomicBeg(m_compute_var);
   float active_users = 0;
   long bytes_used = 0;
   for (int i=0; i<m_max_users; i++)
   {
      // AtomicFAZ: fetch the share count and zero it in one step.
      int primary = AtomicFAZ(m_primary_bytes_shares[i]);
      if (primary != m_last_round_allocation)
      {
         active_users++;
         // Unused primary bytes roll over into the (stealable) secondary pool.
         if (primary >= 0)
            m_secondary_bytes_shares[i] = primary;
         // 'primary' is reused here for the *ops* shares.
         primary = AtomicFAZ(m_primary_ops_shares[i]);
         if (primary >= 0)
            m_secondary_ops_shares[i] = primary;
         // NOTE(review): at this point 'primary' holds the leftover ops
         // shares, so bytes_used mixes the ops remainder into a bytes
         // statistic — looks like this was meant to use the bytes remainder
         // captured above; confirm against upstream intent.
         bytes_used += (primary < 0) ? m_last_round_allocation : (m_last_round_allocation-primary);
      }
   }

   // Avoid division by zero below when nobody was active this interval.
   if (active_users == 0)
   {
      active_users++;
   }

   // Note we allocate the same number of shares to *all* users, not
   // just the active ones. If a new user becomes active in the next
   // interval, we'll go over our bandwidth budget just a bit.
   m_last_round_allocation = static_cast<int>(total_bytes_shares / active_users);
   int ops_shares = static_cast<int>(total_ops_shares / active_users);
   TRACE(BANDWIDTH, "Round byte allocation " << m_last_round_allocation << " ; last round used " << bytes_used << ".");
   TRACE(IOPS, "Round ops allocation " << ops_shares);
   for (int i=0; i<m_max_users; i++)
   {
      m_primary_bytes_shares[i] = m_last_round_allocation;
      m_primary_ops_shares[i] = ops_shares;
   }

   AtomicEnd(m_compute_var);

   // Reset the loadshed limit counter.
   int limit_hit = m_loadshed_limit_hit.exchange(0);
   TRACE(DEBUG, "Throttle limit hit " << limit_hit << " times during last interval.");

   // Update the IO counters
   // Snapshot the volatile counters into their "stable" copies under the
   // compute lock so monitoring reads a consistent view.
   m_compute_var.Lock();
   m_stable_io_active = m_io_active.load(std::memory_order_acquire);
   auto io_active = m_stable_io_active;
   m_stable_io_total = m_io_total;
   auto io_total = m_stable_io_total;
   auto io_wait_rep = m_io_active_time.exchange(std::chrono::steady_clock::duration(0).count());
   m_stable_io_wait += std::chrono::steady_clock::duration(io_wait_rep);

   m_compute_var.UnLock();

   auto io_wait_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_stable_io_wait).count();
   TRACE(IOLOAD, "Current IO counter is " << io_active << "; total IO active time is " << io_wait_ms << "ms.");
   if (m_gstream)
   {
      // Emit a monitoring record; silently sized to 128 bytes.
      char buf[128];
      auto len = snprintf(buf, 128,
                          R"({"event":"throttle_update","io_wait":%.4f,"io_active":%d,"io_total":%llu})",
                          static_cast<double>(io_wait_ms) / 1000.0, io_active, static_cast<long long unsigned>(io_total));
      auto suc = (len < 128) ? m_gstream->Insert(buf, len + 1) : false;
      if (!suc)
      {
         TRACE(IOLOAD, "Failed g-stream insertion of throttle_update record (len=" << len << "): " << buf);
      }
   }
   // Wake all threads blocked in Apply() so they can retry with fresh shares.
   m_compute_var.Broadcast();
}
660
661/*
662 * Do a simple hash across the username.
663 */
664uint16_t
665XrdThrottleManager::GetUid(const std::string &username)
666{
667 std::hash<std::string> hash_fn;
668 auto hash = hash_fn(username);
669 auto uid = static_cast<uint16_t>(hash % m_max_users);
670 TRACE(DEBUG, "Mapping user " << username << " to UID " << uid);
671 return uid;
672}
673
/*
 * Notify a single waiter thread that it can proceed.
 */
void
XrdThrottleManager::NotifyOne()
{
   // Pick the wake-order snapshot currently published by ComputeWaiterOrder().
   auto &wake_order = (m_wake_order_active == 0) ? m_wake_order_0 : m_wake_order_1;

   // Scan at most m_max_users slots; the shared offset advances atomically so
   // successive calls continue where the previous one left off.
   for (size_t idx = 0; idx < m_max_users; ++idx)
   {
      auto offset = m_waiter_offset.fetch_add(1, std::memory_order_acq_rel);
      // Slots hold uint16_t values; the 0xFFFF fill value (written as -1 in
      // ComputeWaiterOrder) reads back negative here and means "empty slot".
      int16_t uid = wake_order[offset % m_max_users];
      if (uid < 0)
      {
         continue;
      }
      auto &waiter_info = m_waiter_info[uid];
      std::unique_lock<std::mutex> lock(waiter_info.m_mutex);
      if (waiter_info.m_waiting) {
         // Hand the lock to the waiter's notify to avoid a wakeup race.
         waiter_info.NotifyOne(std::move(lock));
         return;
      }
   }
}
698
/*
 * Create an IO timer object; increment the number of outstanding IOs.
 */
XrdThrottleTimer
XrdThrottleManager::StartIOTimer(uint16_t uid, bool &ok)
{
   // Optimistically claim a concurrency slot; fetch_add returns the prior count.
   int cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
   m_io_total++;

   // Over the configured limit: release the slot and wait for capacity.
   while (m_concurrency_limit >= 0 && cur_counter >= m_concurrency_limit)
   {
      // If the user has essentially no concurrency, then we let them
      // temporarily exceed the limit. This prevents potential waits for
      // every single read for an infrequent user.
      if (m_waiter_info[uid].m_concurrency < 1)
      {
         break;
      }
      m_loadshed_limit_hit++;
      m_io_active.fetch_sub(1, std::memory_order_acq_rel);
      TRACE(DEBUG, "ThrottleManager (user=" << uid << "): IO concurrency limit hit; waiting for other IOs to finish.");
      ok = m_waiter_info[uid].Wait();
      if (!ok) {
         TRACE(DEBUG, "ThrottleManager (user=" << uid << "): timed out waiting for other IOs to finish.");
         // Timed out: hand back a timer bound to no manager.
         return XrdThrottleTimer();
      }
      // Woken up: try again to claim a slot.
      cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
   }

   ok = true;
   return XrdThrottleTimer(this, uid);
}
731
732/*
733 * Finish recording an IO timer.
734 */
735void
736XrdThrottleManager::StopIOTimer(std::chrono::steady_clock::duration & event_duration, uint16_t uid)
737{
738 m_io_active_time += event_duration.count();
739 auto old_active = m_io_active.fetch_sub(1, std::memory_order_acq_rel);
740 m_waiter_info[uid].m_io_time += event_duration.count();
741 if (old_active == static_cast<unsigned>(m_concurrency_limit))
742 {
743 // If we are below the concurrency limit threshold and have another waiter
744 // for our user, then execute it immediately. Otherwise, we will give
745 // someone else a chance to run (as we have gotten more than our share recently).
746 unsigned waiting_users = m_waiting_users;
747 if (waiting_users == 0) waiting_users = 1;
748 if (m_waiter_info[uid].m_concurrency < m_concurrency_limit / waiting_users)
749 {
750 std::unique_lock<std::mutex> lock(m_waiter_info[uid].m_mutex);
751 if (m_waiter_info[uid].m_waiting > 0)
752 {
753 m_waiter_info[uid].NotifyOne(std::move(lock));
754 return;
755 }
756 }
757 NotifyOne();
758 }
759}
760
761/*
762 * Check the counters to see if we have hit any throttle limits in the
763 * current time period. If so, shed the client randomly.
764 *
765 * If the client has already been load-shedded once and reconnected to this
766 * server, then do not load-shed it again.
767 */
768bool
769XrdThrottleManager::CheckLoadShed(const std::string &opaque)
770{
771 if (m_loadshed_port == 0)
772 {
773 return false;
774 }
775 if (m_loadshed_limit_hit == 0)
776 {
777 return false;
778 }
779 if (static_cast<unsigned>(rand()) % 100 > m_loadshed_frequency)
780 {
781 return false;
782 }
783 if (opaque.empty())
784 {
785 return false;
786 }
787 return true;
788}
789
790void
791XrdThrottleManager::PrepLoadShed(const char * opaque, std::string &lsOpaque)
792{
793 if (m_loadshed_port == 0)
794 {
795 return;
796 }
797 if (opaque && opaque[0])
798 {
799 XrdOucEnv env(opaque);
800 // Do not load shed client if it has already been done once.
801 if (env.Get("throttle.shed") != 0)
802 {
803 return;
804 }
805 lsOpaque = opaque;
806 lsOpaque += "&throttle.shed=1";
807 }
808 else
809 {
810 lsOpaque = "throttle.shed=1";
811 }
812}
813
814void
815XrdThrottleManager::PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
816{
817 host = m_loadshed_host;
818 host += "?";
819 host += opaque;
820 port = m_loadshed_port;
821}
822
/*
 * Block the calling thread until IO concurrency drops below the manager's
 * limit or the configured maximum wait time elapses.
 * Returns false on timeout, true otherwise.
 */
bool
XrdThrottleManager::Waiter::Wait()
{
   auto timeout = std::chrono::steady_clock::now() + m_manager->m_max_wait_time;
   {
      std::unique_lock<std::mutex> lock(m_mutex);
      // Advertise ourselves to ComputeWaiterOrder()/NotifyOne() under m_mutex.
      m_waiting++;
      // Wake when capacity frees up (predicate) or the deadline passes.
      m_cv.wait_until(lock, timeout,
          [&] { return m_manager->m_io_active.load(std::memory_order_acquire) < static_cast<unsigned>(m_manager->m_concurrency_limit) || std::chrono::steady_clock::now() >= timeout; });
      m_waiting--;
   }
   // NOTE(review): the predicate bails at '>= timeout' but this check uses
   // '>', so a wakeup exactly at the deadline still reports success —
   // confirm that asymmetry is intended.
   if (std::chrono::steady_clock::now() > timeout) {
      return false;
   }
   return true;
}
#define DEBUG(x)
#define AtomicFSub(w, x, y)
#define AtomicFAZ(x)
#define AtomicBeg(Mtx)
#define AtomicEnd(Mtx)
#define TRACE(act, x)
Definition XrdTrace.hh:63
char * Get(const char *varname)
Definition XrdOucEnv.hh:69
XrdSecAttr * Get(const void *sigkey)
char * vorg
Entity's virtual organization(s)
XrdSecEntityAttr * eaAPI
non-const API to attributes
char * name
Entity's name.
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static unsigned long Num(void)
static void Wait(int milliseconds)
void StopIOTimer(std::chrono::steady_clock::duration &event_duration, uint16_t uid)
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void FromConfig(XrdThrottle::Configuration &config)
void Apply(int reqsize, int reqops, int uid)
std::tuple< std::string, uint16_t > GetUserInfo(const XrdSecEntity *client)
XrdThrottleTimer StartIOTimer(uint16_t uid, bool &ok)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
friend class XrdThrottleTimer
void PrepLoadShed(const char *opaque, std::string &lsOpaque)
bool CheckLoadShed(const std::string &opaque)
void SetMaxWait(unsigned long max_wait)
void SetMaxConns(unsigned long max_conns)
XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP)
void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
bool CloseFile(const std::string &entity)
bool OpenFile(const std::string &entity, std::string &open_error_message)
long long GetLoadshedPort() const
long long GetThrottleDataRate() const
long long GetThrottleConcurrency() const
const std::string & GetLoadshedHost() const
long long GetLoadshedFreq() const
long long GetThrottleIOPSRate() const
long long GetThrottleRecomputeIntervalMS() const