XrdThrottleManager.cc

// NOTE: the license header and part of the original include block were lost
// when this listing was extracted; the project headers below are plausible
// reconstructions (an assumption), not the verbatim upstream list.
#include "XrdThrottle/XrdThrottleManager.hh"
#include "XrdThrottle/XrdThrottleTrace.hh"

#include "XrdOuc/XrdOucEnv.hh"
#include "XrdOuc/XrdOucTrace.hh"
#include "XrdSec/XrdSecEntity.hh"
#include "XrdSec/XrdSecEntityAttr.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdSys/XrdSysTimer.hh"
#include "XrdXrootd/XrdXrootdGStream.hh"

#define XRD_TRACE m_trace->
#include "Xrd/XrdTrace.hh"

#include <algorithm>
#include <array>
#include <cmath>
#include <random>
#include <sstream>

#if defined(__linux__)

#include <sched.h>
unsigned XrdThrottleManager::GetTimerListHash() {
    int cpu = sched_getcpu();
    if (cpu < 0) {
        return 0;
    }
    return cpu % m_timer_list_size;
}

#else

unsigned XrdThrottleManager::GetTimerListHash() {
    return 0;
}

#endif
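
// Illustrative note (an observation added to this listing, not upstream
// commentary): sharding timers by the CPU the calling thread runs on spreads
// the timer bookkeeping across m_timer_list_size independently locked lists,
// so concurrent I/O threads rarely contend on the same mutex. The non-Linux
// fallback funnels everything to list 0, which is correct but serializes
// that bookkeeping.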

const char *
XrdThrottleManager::TraceID = "ThrottleManager";

XrdThrottleManager::XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP) :
    m_trace(tP),
    m_log(lP),
    m_interval_length_seconds(1.0),
    m_bytes_per_second(-1),
    m_ops_per_second(-1),
    m_concurrency_limit(-1),
    m_last_round_allocation(100*1024),
    m_loadshed_host(""),
    m_loadshed_port(0),
    m_loadshed_frequency(0)
{
}

void
XrdThrottleManager::FromConfig(XrdThrottle::Configuration &config)
{
    auto max_open = config.GetMaxOpen();
    if (max_open != -1) SetMaxOpen(max_open);
    auto max_conn = config.GetMaxConn();
    if (max_conn != -1) SetMaxConns(max_conn);
    auto max_wait = config.GetMaxWait();
    if (max_wait != -1) SetMaxWait(max_wait);

    SetThrottles(config.GetThrottleDataRate(),
                 config.GetThrottleIOPSRate(),
                 config.GetThrottleConcurrency(),
                 static_cast<float>(config.GetThrottleRecomputeIntervalMS())/1000.0);

    m_trace->What = config.GetTraceLevels();

    auto loadshed_host = config.GetLoadshedHost();
    auto loadshed_port = config.GetLoadshedPort();
    auto loadshed_freq = config.GetLoadshedFreq();
    if (!loadshed_host.empty() && loadshed_port > 0 && loadshed_freq > 0)
    {
        // Loadshed specified, so set it.
        SetLoadShed(loadshed_host, loadshed_port, loadshed_freq);
    }
}

void
XrdThrottleManager::Init()
{
    TRACE(DEBUG, "Initializing the throttle manager.");
    // Size the per-user share tables.
    m_primary_bytes_shares.resize(m_max_users);
    m_secondary_bytes_shares.resize(m_max_users);
    m_primary_ops_shares.resize(m_max_users);
    m_secondary_ops_shares.resize(m_max_users);
    for (auto & waiter : m_waiter_info) {
        waiter.m_manager = this;
    }

    // Allocate each user 100KB and 10 ops to bootstrap.
    for (int i=0; i<m_max_users; i++)
    {
        m_primary_bytes_shares[i] = m_last_round_allocation;
        m_secondary_bytes_shares[i] = 0;
        m_primary_ops_shares[i] = 10;
        m_secondary_ops_shares[i] = 0;
    }

    int rc;
    pthread_t tid;
    if ((rc = XrdSysThread::Run(&tid, XrdThrottleManager::RecomputeBootstrap, static_cast<void *>(this), 0, "Buffer Manager throttle")))
        m_log->Emsg("ThrottleManager", rc, "create throttle thread");
}
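
// Illustrative lifecycle sketch (assumed from the calls in this file; the
// wrapper code that actually drives the manager is not part of this listing):
//
//     XrdThrottleManager manager(&log, &trace); // hook up logging/tracing
//     manager.FromConfig(config);               // apply configured limits
//     manager.Init();                           // seed shares, start the
//                                               // Recompute() thread
//
// After Init(), I/O paths call Apply(), StartIOTimer()/StopIOTimer(), and
// OpenFile()/CloseFile(), while the background thread rebalances shares.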

std::tuple<std::string, uint16_t>
XrdThrottleManager::GetUserInfo(const XrdSecEntity *client)
{
    // client can be null; if so, return nobody.
    if (!client) {
        return std::make_tuple("nobody", GetUid("nobody"));
    }

    // Try the various potential "names" associated with the request, from the
    // most specific to the most generic.
    std::string user;

    if (client->eaAPI && client->eaAPI->Get("token.subject", user)) {
        if (client->vorg) user = std::string(client->vorg) + ":" + user;
    } else if (client->eaAPI) {
        std::string request_name;
        if (client->eaAPI->Get("request.name", request_name) && !request_name.empty()) user = request_name;
    }
    if (user.empty()) {user = client->name ? client->name : "nobody";}
    uint16_t uid = GetUid(user.c_str());
    return std::make_tuple(user, uid);
}
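
// Illustrative examples of the resolution order above: a token whose
// "token.subject" is "alice", presented by a client in virtual organization
// "cms", maps to "cms:alice"; a client with only a "request.name" attribute
// of "transfer-svc" (a hypothetical value) maps to "transfer-svc"; an
// unauthenticated client maps to "nobody". GetUid() then hashes that string
// into the fixed-size user slot table.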

/*
 * Take as many shares as possible to fulfill the request; update
 * the request with the amount still outstanding, or zero if satisfied.
 */
inline void
XrdThrottleManager::GetShares(int &shares, int &request)
{
    int remaining;
    AtomicFSub(remaining, shares, request);
    if (remaining > 0)
    {
        request -= (remaining < request) ? remaining : request;
    }
}
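
// Worked example (illustrative): if shares == 150 and request == 100, the
// atomic fetch-and-subtract leaves shares at 50 and the request is fully
// satisfied (request becomes 0). If shares == 60 and request == 100, shares
// drops to -40 (a debt repaid at the next recompute), remaining is 60, and
// request is reduced to 40; the caller must fill the rest from secondary
// shares, stolen shares, or by waiting for the next interval.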

/*
 * Iterate through all of the secondary shares, attempting
 * to steal enough to fulfill the request.
 */
void
XrdThrottleManager::StealShares(int uid, int &reqsize, int &reqops)
{
    if (!reqsize && !reqops) return;
    TRACE(BANDWIDTH, "Stealing shares to fill request of " << reqsize << " bytes");
    TRACE(IOPS, "Stealing shares to fill request of " << reqops << " ops.");

    for (int i=uid+1; i % m_max_users != uid; i++)
    {
        if (reqsize) GetShares(m_secondary_bytes_shares[i % m_max_users], reqsize);
        if (reqops)  GetShares(m_secondary_ops_shares[ i % m_max_users], reqops);
    }

    TRACE(BANDWIDTH, "After stealing shares, " << reqsize << " of request bytes remain.");
    TRACE(IOPS, "After stealing shares, " << reqops << " of request ops remain.");
}
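
// Note on the loop above (illustrative): it starts at uid+1 and walks the
// whole user ring modulo m_max_users, wrapping around until it returns to
// uid, so every other user's secondary shares are visited exactly once.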

/*
 * Increment the number of files held open by a given entity. Returns false
 * if the user is at the maximum; in this case, the internal counter is not
 * incremented.
 */
bool
XrdThrottleManager::OpenFile(const std::string &entity, std::string &error_message)
{
    if (m_max_open == 0 && m_max_conns == 0) return true;

    const std::lock_guard<std::mutex> lock(m_file_mutex);
    auto iter = m_file_counters.find(entity);
    unsigned long cur_open_files = 0, cur_open_conns = 0;
    if (m_max_open) {
        if (iter == m_file_counters.end()) {
            m_file_counters[entity] = 1;
            TRACE(FILES, "User " << entity << " has opened their first file");
            cur_open_files = 1;
        } else if (iter->second < m_max_open) {
            iter->second++;
            cur_open_files = iter->second;
        } else {
            std::stringstream ss;
            ss << "User " << entity << " has hit the limit of " << m_max_open << " open files";
            TRACE(FILES, ss.str());
            error_message = ss.str();
            return false;
        }
    }

    if (m_max_conns) {
        auto pid = XrdSysThread::Num();
        auto conn_iter = m_active_conns.find(entity);
        auto conn_count_iter = m_conn_counters.find(entity);
        if ((conn_count_iter != m_conn_counters.end()) && (conn_count_iter->second == m_max_conns) &&
            (conn_iter == m_active_conns.end() || ((*(conn_iter->second))[pid] == 0)))
        {
            // Note: we are rolling back the increment in open files.
            if (m_max_open) iter->second--;
            std::stringstream ss;
            ss << "User " << entity << " has hit the limit of " << m_max_conns <<
                " open connections";
            TRACE(CONNS, ss.str());
            error_message = ss.str();
            return false;
        }
        if (conn_iter == m_active_conns.end()) {
            std::unique_ptr<std::unordered_map<pid_t, unsigned long>> conn_map(
                new std::unordered_map<pid_t, unsigned long>());
            (*conn_map)[pid] = 1;
            m_active_conns[entity] = std::move(conn_map);
            if (conn_count_iter == m_conn_counters.end()) {
                m_conn_counters[entity] = 1;
                cur_open_conns = 1;
            } else {
                m_conn_counters[entity]++;
                cur_open_conns = m_conn_counters[entity];
            }
        } else {
            auto pid_iter = conn_iter->second->find(pid);
            if (pid_iter == conn_iter->second->end() || pid_iter->second == 0) {
                (*(conn_iter->second))[pid] = 1;
                conn_count_iter->second++;
                cur_open_conns = conn_count_iter->second;
            } else {
                (*(conn_iter->second))[pid]++;
                cur_open_conns = conn_count_iter->second;
            }
        }
        TRACE(CONNS, "User " << entity << " has " << cur_open_conns << " open connections");
    }
    if (m_max_open) TRACE(FILES, "User " << entity << " has " << cur_open_files << " open files");
    return true;
}

/*
 * Decrement the number of files held open by a given entity.
 *
 * Returns false if the value would have fallen below zero or
 * if the entity isn't tracked.
 */
bool
XrdThrottleManager::CloseFile(const std::string &entity)
{
    if (m_max_open == 0 && m_max_conns == 0) return true;

    bool result = true;
    const std::lock_guard<std::mutex> lock(m_file_mutex);
    if (m_max_open) {
        auto iter = m_file_counters.find(entity);
        if (iter == m_file_counters.end()) {
            TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin never saw an open file");
            result = false;
        } else if (iter->second == 0) {
            TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin thinks all files were already closed");
            result = false;
        } else {
            iter->second--;
        }
        if (result) TRACE(FILES, "User " << entity << " closed a file; " << iter->second <<
            " remain open");
    }

    if (m_max_conns) {
        auto pid = XrdSysThread::Num();
        auto conn_iter = m_active_conns.find(entity);
        auto conn_count_iter = m_conn_counters.find(entity);
        if (conn_iter == m_active_conns.end() || !(conn_iter->second)) {
            TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
                " tracking");
            return false;
        }
        auto pid_iter = conn_iter->second->find(pid);
        if (pid_iter == conn_iter->second->end()) {
            TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
                " tracking");
            return false;
        }
        if (pid_iter->second == 0) {
            TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection the throttle"
                " plugin thinks was idle");
        } else {
            pid_iter->second--;
        }
        if (conn_count_iter == m_conn_counters.end()) {
            TRACE(CONNS, "WARNING: User " << entity << " closed a file but the throttle plugin never"
                " observed an open file");
        } else if (pid_iter->second == 0) {
            if (conn_count_iter->second == 0) {
                TRACE(CONNS, "WARNING: User " << entity << " had a connection go idle but the"
                    " throttle plugin already thought all connections were idle");
            } else {
                conn_count_iter->second--;
                TRACE(CONNS, "User " << entity << " had connection on thread " << pid << " go idle; "
                    << conn_count_iter->second << " active connections remain");
            }
        }
    }

    return result;
}

/*
 * Apply the throttle. If there are no limits set, this returns immediately.
 * Otherwise, it applies the limits as best it can, stalling the calling
 * thread if necessary.
 */
void
XrdThrottleManager::Apply(int reqsize, int reqops, int uid)
{
    if (m_bytes_per_second < 0)
        reqsize = 0;
    if (m_ops_per_second < 0)
        reqops = 0;
    while (reqsize || reqops)
    {
        // Subtract the request from the shares.
        AtomicBeg(m_compute_var);
        GetShares(m_primary_bytes_shares[uid], reqsize);
        if (reqsize)
        {
            TRACE(BANDWIDTH, "Using secondary shares; request has " << reqsize << " bytes left.");
            GetShares(m_secondary_bytes_shares[uid], reqsize);
            TRACE(BANDWIDTH, "Finished with secondary shares; request has " << reqsize << " bytes left.");
        }
        else
        {
            TRACE(BANDWIDTH, "Filled byte shares out of primary; " << m_primary_bytes_shares[uid] << " left.");
        }
        GetShares(m_primary_ops_shares[uid], reqops);
        if (reqops)
        {
            GetShares(m_secondary_ops_shares[uid], reqops);
        }
        StealShares(uid, reqsize, reqops);
        AtomicEnd(m_compute_var);

        if (reqsize || reqops)
        {
            if (reqsize) TRACE(BANDWIDTH, "Sleeping to wait for throttle fairshare.");
            if (reqops) TRACE(IOPS, "Sleeping to wait for throttle fairshare.");
            m_compute_var.Wait();
            m_loadshed_limit_hit++;
        }
    }
}
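
// Usage sketch (illustrative; the real call sites live in the throttle file
// and filesystem wrappers, which are not part of this listing):
//
//     auto [user, uid] = manager.GetUserInfo(client);
//     manager.Apply(rdlen, 1, uid);  // may stall for bandwidth/IOPS fairshare
//     bool ok;
//     auto timer = manager.StartIOTimer(uid, ok); // may stall for concurrency
//     if (ok) {
//         // ... perform the rdlen-byte read ...
//     }
//     // When the timer object is stopped or destroyed, the elapsed wall time
//     // flows back into the per-user concurrency accounting.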

void
XrdThrottleManager::UserIOAccounting()
{
    std::chrono::steady_clock::duration::rep total_active_time = 0;
    for (size_t idx = 0; idx < m_timer_list.size(); idx++) {
        auto &timerList = m_timer_list[idx];
        std::unique_lock<std::mutex> lock(timerList.m_mutex);
        auto timer = timerList.m_first;
        while (timer) {
            auto next = timer->m_next;
            auto uid = timer->m_owner;
            auto &waiter = m_waiter_info[uid];
            auto recent_duration = timer->Reset();
            waiter.m_io_time += recent_duration.count();
            total_active_time += recent_duration.count();
            timer = next;
        }
    }
    m_io_active_time += total_active_time;
}

void
XrdThrottleManager::ComputeWaiterOrder()
{
    // Update the I/O time for long-running I/O operations. This prevents,
    // for example, a 2-minute I/O operation from causing a spike in
    // concurrency because it would otherwise only be reported at the end.
    UserIOAccounting();

    auto now = std::chrono::steady_clock::now();
    auto elapsed = now - m_last_waiter_recompute_time;
    m_last_waiter_recompute_time = now;
    std::chrono::duration<double> elapsed_secs = elapsed;
    // Alpha is the decay factor for the exponential moving average. One window
    // is 10 seconds, so every 10 seconds the prior average decays by 1/e (that
    // is, fresh data carries about 63% of the weight). This means the
    // contribution of I/O load from a minute ago is roughly 0.2% of the total.
    //
    // The moving average is used to determine how close the user is to their
    // "fair share" of the concurrency limit among the users that are waiting.
    auto alpha = 1 - std::exp(-1 * elapsed_secs.count() / 10.0);
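    // Worked example (illustrative): with the 10-second window above,
    // elapsed_secs = 1 gives alpha = 1 - e^(-0.1) ~= 0.095, while
    // elapsed_secs = 10 gives alpha = 1 - e^(-1) ~= 0.632, matching the
    // "fresh data carries ~63% of the weight" note above.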

    std::vector<double> share;
    share.resize(m_max_users);
    size_t users_with_waiters = 0;
    // For each user, compute their current concurrency and determine how many
    // waiting users there are in total.
    for (int i = 0; i < m_max_users; i++)
    {
        auto &waiter = m_waiter_info[i];
        auto io_duration_rep = waiter.m_io_time.exchange(std::chrono::steady_clock::duration(0).count());
        std::chrono::steady_clock::duration io_duration = std::chrono::steady_clock::duration(io_duration_rep);
        std::chrono::duration<double> io_duration_secs = io_duration;
        auto prev_concurrency = io_duration_secs.count() / elapsed_secs.count();
        float new_concurrency = waiter.m_concurrency;

        new_concurrency = (1 - alpha) * new_concurrency + alpha * prev_concurrency;
        waiter.m_concurrency = new_concurrency;
        if (new_concurrency > 0) {
            TRACE(DEBUG, "User " << i << " has concurrency of " << new_concurrency);
        }
        unsigned waiting;
        {
            std::lock_guard<std::mutex> lock(waiter.m_mutex);
            waiting = waiter.m_waiting;
        }
        if (waiting > 0)
        {
            share[i] = new_concurrency;
            TRACE(DEBUG, "User " << i << " has concurrency of " << share[i] << " and is waiting for " << waiting);
            // Handle the division-by-zero case: if we have no history of usage
            // whatsoever, pretend we have at least some minimal load.
            if (share[i] == 0) {
                share[i] = 0.1;
            }
            users_with_waiters++;
        }
        else
        {
            share[i] = 0;
        }
    }
    auto fair_share = static_cast<double>(m_concurrency_limit) / static_cast<double>(users_with_waiters);
    std::vector<uint16_t> waiter_order;
    waiter_order.resize(m_max_users);

    // Calculate the share for each user. A user gets a share inversely
    // proportional to how far above or below the fair share they are: a user
    // with concurrency of 20 when the fair share is 10 will get 0.5 shares; a
    // user with concurrency of 5 when the fair share is 10 will get 2.0 shares.
    double shares_sum = 0;
    for (int idx = 0; idx < m_max_users; idx++)
    {
        if (share[idx]) {
            shares_sum += fair_share / share[idx];
        }
    }

    // We must quantize the overall shares into an array of 1024 elements. We
    // do this by scaling up (or down) based on the total number of shares
    // computed above. Note this quantization can lead to an over-provisioned
    // user being assigned zero shares; thus, we scale based on (1024 - #users)
    // so we can give one extra share to each user.
    auto scale_factor = (static_cast<double>(m_max_users) - static_cast<double>(users_with_waiters)) / shares_sum;
    size_t offset = 0;
    for (int uid = 0; uid < m_max_users; uid++) {
        if (share[uid] > 0) {
            auto shares = static_cast<unsigned>(scale_factor * fair_share / share[uid]) + 1;
            TRACE(DEBUG, "User " << uid << " has " << shares << " shares");
            for (unsigned idx = 0; idx < shares; idx++)
            {
                waiter_order[offset % m_max_users] = uid;
                offset++;
            }
        }
    }
    if (offset < m_max_users) {
        for (size_t idx = offset; idx < m_max_users; idx++) {
            waiter_order[idx] = -1;
        }
    }
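    // Worked example (illustrative, assuming the 1024-slot table described
    // above): with a concurrency limit of 10 and two waiting users,
    // fair_share = 5. A user at concurrency 20 earns 5/20 = 0.25 shares and
    // a user at concurrency 2 earns 5/2 = 2.5 shares, so shares_sum = 2.75
    // and scale_factor = (1024 - 2) / 2.75 ~= 371.6. The heavy user gets
    // floor(371.6 * 0.25) + 1 = 93 wakeup slots, the light user
    // floor(371.6 * 2.5) + 1 = 930; the one slot left over holds the 0xFFFF
    // sentinel (written as -1), which NotifyOne() skips.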
    // Shuffle the order to randomize the wakeup order.
    std::shuffle(waiter_order.begin(), waiter_order.end(), std::default_random_engine());

    // Copy the order to the inactive array. We do not shuffle in-place because
    // the RAtomics are not move constructible, which is a requirement for
    // std::shuffle.
    auto &waiter_order_to_modify = (m_wake_order_active == 0) ? m_wake_order_1 : m_wake_order_0;
    std::copy(waiter_order.begin(), waiter_order.end(), waiter_order_to_modify.begin());

    // Set the array we just modified to be the active one. Since this is a
    // relaxed write, it could take some time for other CPUs to see the change;
    // that's OK as this is all stochastic anyway.
    m_wake_order_active = (m_wake_order_active + 1) % 2;

    m_waiter_offset = 0;

    // If we find ourselves below the concurrency limit because we woke up too
    // few operations in the last interval, try waking up enough operations to
    // fill the gap. If we race with new incoming operations, the threads will
    // just go back to sleep.
    if (users_with_waiters) {
        m_waiting_users = users_with_waiters;
        auto io_active = m_io_active.load(std::memory_order_acquire);
        for (size_t idx = io_active; idx < static_cast<size_t>(m_concurrency_limit); idx++) {
            NotifyOne();
        }
    }
}

void *
XrdThrottleManager::RecomputeBootstrap(void *instance)
{
    XrdThrottleManager *manager = static_cast<XrdThrottleManager*>(instance);
    manager->Recompute();
    return NULL;
}

void
XrdThrottleManager::Recompute()
{
    while (1)
    {
        // The connection counter can accumulate a number of known-idle
        // connections; we don't need long-term memory of those. Take this
        // chance to garbage collect old connection counters.
        if (m_max_open || m_max_conns) {
            const std::lock_guard<std::mutex> lock(m_file_mutex);
            for (auto iter = m_active_conns.begin(); iter != m_active_conns.end();)
            {
                auto &conn_count = *iter;
                if (!conn_count.second) {
                    iter = m_active_conns.erase(iter);
                    continue;
                }
                for (auto iter2 = conn_count.second->begin(); iter2 != conn_count.second->end();) {
                    if (iter2->second == 0) {
                        iter2 = conn_count.second->erase(iter2);
                    } else {
                        iter2++;
                    }
                }
                if (!conn_count.second->size()) {
                    iter = m_active_conns.erase(iter);
                } else {
                    iter++;
                }
            }
            for (auto iter = m_conn_counters.begin(); iter != m_conn_counters.end();) {
                if (!iter->second) {
                    iter = m_conn_counters.erase(iter);
                } else {
                    iter++;
                }
            }
            for (auto iter = m_file_counters.begin(); iter != m_file_counters.end();) {
                if (!iter->second) {
                    iter = m_file_counters.erase(iter);
                } else {
                    iter++;
                }
            }
        }

        TRACE(DEBUG, "Recomputing fairshares for throttle.");
        RecomputeInternal();
        ComputeWaiterOrder();
        TRACE(DEBUG, "Finished recomputing fairshares for throttle; sleeping for " << m_interval_length_seconds << " seconds.");
        XrdSysTimer::Wait(static_cast<int>(1000*m_interval_length_seconds));
    }
}

/*
 * The heart of the manager approach.
 *
 * This routine periodically recomputes the shares of each current user.
 * Each user has a "primary" and a "secondary" share. At the end of each
 * time interval, the remaining primary share is moved to secondary.
 * A user can utilize both shares; once both are gone, they must block
 * until the next recompute interval.
 *
 * The secondary share can be "stolen" by any other user; so, if a user
 * is idle or under-utilizing, their share can be used by someone else.
 * However, they can never be completely starved, as no one can steal
 * primary share.
 *
 * In this way, we may violate the throttle within an interval, but no
 * user is ever starved.
 */
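/*
 * Worked example (illustrative): with a 100 MB/s byte limit and a 1-second
 * interval, two active users yield a per-user round allocation of 50 MB
 * (handed to every user slot, per the note inside the function below). If
 * user A consumes only 10 MB this interval, A's leftover 40 MB becomes A's
 * secondary share; user B may steal from it after exhausting B's own primary
 * and secondary shares, but B can never touch A's primary allocation, so A
 * always makes progress.
 */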
void
XrdThrottleManager::RecomputeInternal()
{
    // Compute the total shares for this interval.
    float intervals_per_second = 1.0/m_interval_length_seconds;
    float total_bytes_shares = m_bytes_per_second / intervals_per_second;
    float total_ops_shares = m_ops_per_second / intervals_per_second;

    // Compute the number of active users; a user is active if they used
    // any primary share during the last interval.
    AtomicBeg(m_compute_var);
    float active_users = 0;
    long bytes_used = 0;
    for (int i=0; i<m_max_users; i++)
    {
        int primary = AtomicFAZ(m_primary_bytes_shares[i]);
        if (primary != m_last_round_allocation)
        {
            active_users++;
            if (primary >= 0)
                m_secondary_bytes_shares[i] = primary;
            // Tally the bytes consumed from this user's primary share during
            // the last round.
            bytes_used += (primary < 0) ? m_last_round_allocation : (m_last_round_allocation - primary);
            int ops_primary = AtomicFAZ(m_primary_ops_shares[i]);
            if (ops_primary >= 0)
                m_secondary_ops_shares[i] = ops_primary;
        }
    }

    if (active_users == 0)
    {
        active_users++;
    }

    // Note we allocate the same number of shares to *all* users, not
    // just the active ones. If a new user becomes active in the next
    // interval, we'll go over our bandwidth budget just a bit.
    m_last_round_allocation = static_cast<int>(total_bytes_shares / active_users);
    int ops_shares = static_cast<int>(total_ops_shares / active_users);
    TRACE(BANDWIDTH, "Round byte allocation " << m_last_round_allocation << " ; last round used " << bytes_used << ".");
    TRACE(IOPS, "Round ops allocation " << ops_shares);
    for (int i=0; i<m_max_users; i++)
    {
        m_primary_bytes_shares[i] = m_last_round_allocation;
        m_primary_ops_shares[i] = ops_shares;
    }

    AtomicEnd(m_compute_var);

    // Reset the loadshed limit counter.
    int limit_hit = m_loadshed_limit_hit.exchange(0);
    TRACE(DEBUG, "Throttle limit hit " << limit_hit << " times during last interval.");

    // Update the IO counters.
    m_compute_var.Lock();
    m_stable_io_active = m_io_active.load(std::memory_order_acquire);
    auto io_active = m_stable_io_active;
    m_stable_io_total = m_io_total;
    auto io_total = m_stable_io_total;
    auto io_wait_rep = m_io_active_time.exchange(std::chrono::steady_clock::duration(0).count());
    m_stable_io_wait += std::chrono::steady_clock::duration(io_wait_rep);
    m_compute_var.UnLock();

    auto io_wait_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_stable_io_wait).count();
    TRACE(IOLOAD, "Current IO counter is " << io_active << "; total IO active time is " << io_wait_ms << "ms.");
    if (m_gstream)
    {
        char buf[128];
        auto len = snprintf(buf, 128,
            R"({"event":"throttle_update","io_wait":%.4f,"io_active":%d,"io_total":%llu})",
            static_cast<double>(io_wait_ms) / 1000.0, io_active, static_cast<long long unsigned>(io_total));
        auto suc = (len < 128) ? m_gstream->Insert(buf, len + 1) : false;
        if (!suc)
        {
            TRACE(IOLOAD, "Failed g-stream insertion of throttle_update record (len=" << len << "): " << buf);
        }
    }
    m_compute_var.Broadcast();
}

/*
 * Do a simple hash across the username.
 */
uint16_t
XrdThrottleManager::GetUid(const std::string &username)
{
    std::hash<std::string> hash_fn;
    auto hash = hash_fn(username);
    auto uid = static_cast<uint16_t>(hash % m_max_users);
    TRACE(DEBUG, "Mapping user " << username << " to UID " << uid);
    return uid;
}

/*
 * Notify a single waiter thread that it can proceed.
 */
void
XrdThrottleManager::NotifyOne()
{
    auto &wake_order = (m_wake_order_active == 0) ? m_wake_order_0 : m_wake_order_1;

    for (size_t idx = 0; idx < m_max_users; ++idx)
    {
        auto offset = m_waiter_offset.fetch_add(1, std::memory_order_acq_rel);
        int16_t uid = wake_order[offset % m_max_users];
        if (uid < 0)
        {
            continue;
        }
        auto &waiter_info = m_waiter_info[uid];
        std::unique_lock<std::mutex> lock(waiter_info.m_mutex);
        if (waiter_info.m_waiting) {
            waiter_info.NotifyOne(std::move(lock));
            return;
        }
    }
}

/*
 * Create an IO timer object; increment the number of outstanding IOs.
 */
XrdThrottleTimer
XrdThrottleManager::StartIOTimer(uint16_t uid, bool &ok)
{
    int cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
    m_io_total++;

    while (m_concurrency_limit >= 0 && cur_counter >= m_concurrency_limit)
    {
        // If the user has essentially no concurrency, then we let them
        // temporarily exceed the limit. This prevents potential waits on
        // every single read from an infrequent user.
        if (m_waiter_info[uid].m_concurrency < 1)
        {
            break;
        }
        m_loadshed_limit_hit++;
        m_io_active.fetch_sub(1, std::memory_order_acq_rel);
        TRACE(DEBUG, "ThrottleManager (user=" << uid << "): IO concurrency limit hit; waiting for other IOs to finish.");
        ok = m_waiter_info[uid].Wait();
        if (!ok) {
            TRACE(DEBUG, "ThrottleManager (user=" << uid << "): timed out waiting for other IOs to finish.");
            return XrdThrottleTimer();
        }
        cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
    }

    ok = true;
    return XrdThrottleTimer(this, uid);
}

/*
 * Finish recording an IO timer.
 */
void
XrdThrottleManager::StopIOTimer(std::chrono::steady_clock::duration &event_duration, uint16_t uid)
{
    m_io_active_time += event_duration.count();
    auto old_active = m_io_active.fetch_sub(1, std::memory_order_acq_rel);
    m_waiter_info[uid].m_io_time += event_duration.count();
    if (old_active == static_cast<unsigned>(m_concurrency_limit))
    {
        // If we are below the concurrency limit threshold and have another
        // waiter for our user, then execute it immediately. Otherwise, we will
        // give someone else a chance to run (as we have gotten more than our
        // share recently).
        unsigned waiting_users = m_waiting_users;
        if (waiting_users == 0) waiting_users = 1;
        if (m_waiter_info[uid].m_concurrency < m_concurrency_limit / waiting_users)
        {
            std::unique_lock<std::mutex> lock(m_waiter_info[uid].m_mutex);
            if (m_waiter_info[uid].m_waiting > 0)
            {
                m_waiter_info[uid].NotifyOne(std::move(lock));
                return;
            }
        }
        NotifyOne();
    }
}

/*
 * Check the counters to see if we have hit any throttle limits in the
 * current time period. If so, randomly select clients to shed.
 *
 * If the client has already been load-shed once and reconnected to this
 * server, then do not load-shed it again.
 */
bool
XrdThrottleManager::CheckLoadShed(const std::string &opaque)
{
    if (m_loadshed_port == 0)
    {
        return false;
    }
    if (m_loadshed_limit_hit == 0)
    {
        return false;
    }
    if (static_cast<unsigned>(rand()) % 100 > m_loadshed_frequency)
    {
        return false;
    }
    if (opaque.empty())
    {
        return false;
    }
    return true;
}

void
XrdThrottleManager::PrepLoadShed(const char *opaque, std::string &lsOpaque)
{
    if (m_loadshed_port == 0)
    {
        return;
    }
    if (opaque && opaque[0])
    {
        XrdOucEnv env(opaque);
        // Do not load-shed the client if it has already been done once.
        if (env.Get("throttle.shed") != 0)
        {
            return;
        }
        lsOpaque = opaque;
        lsOpaque += "&throttle.shed=1";
    }
    else
    {
        lsOpaque = "throttle.shed=1";
    }
}

void
XrdThrottleManager::PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
{
    host = m_loadshed_host;
    host += "?";
    host += opaque;
    port = m_loadshed_port;
}

bool
XrdThrottleManager::Waiter::Wait()
{
    auto timeout = std::chrono::steady_clock::now() + m_manager->m_max_wait_time;
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_waiting++;
        m_cv.wait_until(lock, timeout,
            [&] { return m_manager->m_io_active.load(std::memory_order_acquire) < static_cast<unsigned>(m_manager->m_concurrency_limit) ||
                         std::chrono::steady_clock::now() >= timeout; });
        m_waiting--;
    }
    if (std::chrono::steady_clock::now() > timeout) {
        return false;
    }
    return true;
}