XRootD
XrdXrootdXeqPgrw.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d X r o o t d X e q P g r w . c c */
4 /* */
5 /* (c) 2020 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* Produced by Andrew Hanushevsky for Stanford University under contract */
7 /* DE-AC02-76-SFO0515 with the Department of Energy */
8 /* */
9 /* This file is part of the XRootD software suite. */
10 /* */
11 /* XRootD is free software: you can redistribute it and/or modify it under */
12 /* the terms of the GNU Lesser General Public License as published by the */
13 /* Free Software Foundation, either version 3 of the License, or (at your */
14 /* option) any later version. */
15 /* */
16 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19 /* License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24 /* */
25 /* The copyright holder's institutional names and contributor's names may not */
26 /* be used to endorse or promote products derived from this software without */
27 /* specific prior written permission of the institution or contributor. */
28 /******************************************************************************/
29 
30 #include <cctype>
31 #include <cstdio>
32 #include <sys/uio.h>
33 
35 #include "XrdSys/XrdSysPlatform.hh"
36 
37 #include "XProtocol/XProtocol.hh"
38 
39 #include "Xrd/XrdBuffer.hh"
40 #include "Xrd/XrdLink.hh"
41 #include "XrdOuc/XrdOucCRC.hh"
42 #include "XrdOuc/XrdOucString.hh"
55 
56 /******************************************************************************/
57 /* G l o b a l s */
58 /******************************************************************************/
59 
61 
62 namespace
63 {
64 static const int pgPageSize = XrdProto::kXR_pgPageSZ;
65 static const int pgPageMask = XrdProto::kXR_pgPageSZ-1;
66 static const int pgUnitSize = XrdProto::kXR_pgUnitSZ;
67 }
68 
69 namespace
70 {
71 static const int pgAioMin = XrdXrootdPgrwAio::aioSZ
72  + XrdXrootdPgrwAio::aioSZ*8/10; // 1.8 of aiosz
73 static const int pgAioHalf= XrdXrootdPgrwAio::aioSZ/2;
74 }
75 
76 /******************************************************************************/
77 /* d o _ P g C l o s e */
78 /******************************************************************************/
79 
80 bool XrdXrootdProtocol::do_PgClose(XrdXrootdFile *fP, int &rc)
81 {
82  XrdXrootdPgwFob *fobP = fP->pgwFob;
83  int numErrs, numFixes, numLeft;
84 
85 // Make sure we have a fob
86 //
87  if (!fobP) return true;
88 
89 // Obtain the checksum status of this file and update statistics
90 //
91  numLeft = fobP->numOffs(&numErrs, &numFixes);
92  fP->Stats.pgUpdt(numErrs, numFixes, numLeft);
93 
94 // If there are uncorrected checksum, indicate failure. These will be logged
95 // when the fob is deleted later on.
96 //
97  if (numLeft)
98  {char ebuff[128];
99  snprintf(ebuff,sizeof(ebuff),"%d uncorrected checksum errors",numLeft);
100  rc = Response.Send(kXR_ChkSumErr, ebuff);
101  return false;
102  }
103 
104 // All is well
105 //
106  return true;
107 }
108 
109 /******************************************************************************/
110 /* d o _ P g R e a d */
111 /******************************************************************************/
112 
113 int XrdXrootdProtocol::do_PgRead()
114 {
115  int pathID;
117  numReads++;
118 
119 // Unmarshall the data
120 //
121  IO.IOLen = ntohl(Request.pgread.rlen);
122  n2hll(Request.pgread.offset, IO.Offset);
123 
124 // Perform a sanity check on the length
125 //
126  if (IO.IOLen <= 0)
127  return Response.Send(kXR_ArgInvalid, "Read length is invalid");
128 
129 // Find the file object
130 //
131  if (!FTab || !(IO.File = FTab->Get(fh.handle)))
133  "pgread does not refer to an open file");
134 
135 // Now handle the optional pathid and reqflags arguments.
136 //
137  IO.Flags = 0;
138  if (!Request.header.dlen) pathID = 0;
140  pathID = static_cast<int>(rargs->pathid);
141  if (Request.header.dlen > 1)
142  IO.Flags = static_cast<unsigned short>(rargs->reqflags);
143  }
144 
145 // Trace this
146 //
147  TRACEP(FSIO,pathID<<" pgread "<<IO.IOLen<<'@'<<IO.Offset
148  <<" fn=" <<IO.File->FileKey);
149 
150 // If we are monitoring, insert a read entry
151 //
152  if (Monitor.InOut())
155 
156 // Do statistics. They will not always be accurate because we may not be
157 // able to fully complete the I/O from the file. Note that we also count
158 // the checksums which a questionable practice.
159 //
161 
162 // Use synchronous reads unless async I/O is allowed, the read size is
163 // sufficient, and there are not too many async operations in flight.
164 //
165  if (IO.File->AsyncMode && IO.IOLen >= pgAioMin
166  && IO.Offset+IO.IOLen <= IO.File->Stats.fSize+pgAioHalf
169  {XrdXrootdProtocol *pP;
170  XrdXrootdPgrwAio *aioP=0;
171  int rc;
172 
173  if (!pathID) pP = this;
174  else {if (!(pP = VerifyStream(rc, pathID, false))) return rc;
175  if (pP->linkAioReq >= as_maxperlnk) pP = 0;
176  }
177 
178  if (pP)
179  {// Use of TmpRsp here is to avoid modying pP. It is built
180  // to contain the correct streamid for this request and the
181  // right Link for the pathID. It's used by Alloc to call
182  // XrdXrootdAioTask::Init which in turn makes a copy of TmpRsp
183  // to its own response object and also keeps the Link pointer.
184  XrdXrootdResponse TmpRsp;
185  TmpRsp = Response;
186  TmpRsp.Set(pP->Link);
187  aioP = XrdXrootdPgrwAio::Alloc(pP, TmpRsp, IO.File);
188  }
189  if (aioP)
190  {if (!IO.File->aioFob) IO.File->aioFob = new XrdXrootdAioFob;
191  aioP->Read(IO.Offset, IO.IOLen);
192  return 0;
193  }
194  SI->AsyncRej++;
195  }
196 
197 // See if an alternate path is required, offload the read
198 //
199  if (pathID) return do_Offload(&XrdXrootdProtocol::do_PgRIO, pathID);
200 
201 // Now do the read on the main path
202 //
203  return do_PgRIO();
204 }
205 
206 /******************************************************************************/
207 /* d o _ P g R I O */
208 /******************************************************************************/
209 
210 // IO.File = file to be read
211 // IO.Offset = Offset at which to read
212 // IO.IOLen = Number of bytes to read from file and write to socket
213 
214 int XrdXrootdProtocol::do_PgRIO()
215 {
216 // We restrict the maximum transfer size to generate no more than 1023 iovec
217 // elements where the first is used for the header.
218 //
219  static const int maxIOVZ = XrdSys::getIovMax();
220  static const int maxCSSZ = maxIOVZ/2 - 1;
221  static const int maxPGRD = maxCSSZ*pgPageSize; // 2,093,056 usually
222  static const int infoLen = sizeof(kXR_int64);
223 
224  struct pgReadResponse
226  kXR_int64 ofs;
227  } pgrResp;
228 
229  XrdSfsFile *sfsP = IO.File->XrdSfsp;
230  uint64_t pgrOpts = 0;
231  int dlen, fLen, lLen, rc, xframt, Quantum;
232  uint32_t csVec[maxCSSZ];
233  struct iovec iov[maxIOVZ];
234 
235 // Set flags, as needed
236 //
238 
239 // Preinitialize the header
240 //
241  pgrResp.rsp.bdy.requestid = kXR_pgread - kXR_1stRequest;
242  pgrResp.rsp.bdy.resptype = XrdProto::kXR_PartialResult;;
243  memset(pgrResp.rsp.bdy.reserved, 0, sizeof(pgrResp.rsp.bdy.reserved));
244 
245 // Calculate the total pages in the read request. Note that the first and
246 // last pages may require short reads if they are not fully aligned.
247 //
248  int pgOff, rPages, rLen = IO.IOLen;
249  rPages = XrdOucPgrwUtils::csNum(IO.Offset, IO.IOLen) * pgPageSize;
250 
251 // Compute the quantum.
252 //
253  Quantum = (maxPGRD > maxBuffsz ? maxBuffsz : maxPGRD);
254  if (rPages < Quantum) Quantum = rPages;
255 
256 // Make sure we have a large enough buffer. We may need to adjust it downward
257 // due to reallocation rounding.
258 //
259  if (!argp || Quantum < halfBSize || Quantum > argp->bsize)
260  {if ((rc = getBuff(1, Quantum)) <= 0) return rc;}
261  else if (hcNow < hcNext) hcNow++;
262  if (argp->bsize > maxPGRD) Quantum = maxPGRD;
263 
264 // Compute the number of iovec elements we need plus one for the header. The
265 // Quantum is gauranteed to be a multiple of pagesize now. Verify that this
266 // calculation was indeed correct to avoid overwriting the stack.
267 //
268  int items = Quantum / pgPageSize;
269  if (items > maxCSSZ)
270  return Response.Send(kXR_Impossible, "pgread logic error 1");
271 
272 // Preinitialize the io vector for checksums and data (leave 1st element free).
273 //
274  uint32_t *csVP = csVec;
275  char *buff = argp->buff;
276  int i = 1, n = items * 2;
277  while(i < n)
278  {iov[i ].iov_base = csVP++;
279  iov[i++].iov_len = sizeof(uint32_t);
280  iov[i ].iov_base = buff;
281  iov[i++].iov_len = pgPageSize;
282  buff += pgPageSize;
283  }
284 
285 // If this is an unaligned read, offset the unaligned segment in the buffer
286 // so that remaining pages are page-aligned. It will be reset when needed.
287 // We also calculate the actual length of the first read.
288 //
289  if ((pgOff = IO.Offset & pgPageMask))
290  {rLen = pgPageSize - pgOff;
291  buff = argp->buff + pgOff;
292  iov[2].iov_base = buff;
293  iov[2].iov_len = rLen;
294  rLen += Quantum - pgPageSize;
295  } else {
296  rLen = Quantum;
297  buff = argp->buff;
298  }
299  if (IO.IOLen < rLen) rLen = IO.IOLen;
300 
301 // Now read all of the data. For each read we must recacalculate the number
302 // of iovec elements that we will use to send the data as fewer bytes may have
303 // been read. In fact, no bytes may have been read.
304 //
305  long long ioOffset = IO.Offset;
306  do {if ((xframt = sfsP->pgRead(IO.Offset, buff, rLen, csVec, pgrOpts)) <= 0)
307  break;
308 
309  items = XrdOucPgrwUtils::csNum(IO.Offset, xframt, fLen, lLen);
310  iov[2].iov_len = fLen;
311  if (items > 1) iov[items<<1].iov_len = lLen;
312 
313  if (xframt < rLen || xframt == IO.IOLen)
314  {pgrResp.rsp.bdy.resptype = XrdProto::kXR_FinalResult;
315  IO.IOLen = 0;
316  } else {
317  IO.IOLen -= xframt; IO.Offset += xframt;
318  rLen = (IO.IOLen < Quantum ? IO.IOLen : Quantum);
319  }
320 
321  for (int i = 0; i < items; i++) csVec[i] = htonl(csVec[i]);
322 
323  pgrResp.ofs = htonll(ioOffset);
324 // char trBuff[512];
325 // snprintf(trBuff, sizeof(trBuff), "Xeq PGR: %d@%lld (%lld)\n",
326 // xframt, ioOffset, ioOffset>>12);
327 // std::cerr<<trBuff<<std::flush;
328  dlen = xframt + (items * sizeof(uint32_t));
329  if ((rc = Response.Send(pgrResp.rsp, infoLen, iov, items*2+1, dlen)) < 0)
330  return rc;
331 
332  if (pgOff)
333  {iov[2].iov_base = argp->buff;
334  iov[2].iov_len = pgPageSize;
335  buff = argp->buff;
336  pgOff = 0;
337  }
338 
339  ioOffset = IO.Offset;
340  } while(IO.IOLen > 0);
341 
342 // Determine why we ended here
343 //
344  if (xframt < 0) return fsError(xframt, 0, sfsP->error, 0, 0);
345 
346 // Return no bytes if we were tricked into sending a partial result
347 //
348  if (pgrResp.rsp.bdy.resptype != XrdProto::kXR_FinalResult)
349  {pgrResp.rsp.bdy.resptype = XrdProto::kXR_FinalResult;
350  pgrResp.rsp.bdy.dlen = 0;
351  pgrResp.ofs = htonll(IO.Offset);
352  return Response.Send(pgrResp.rsp, infoLen);
353  }
354  return 0;
355 }
356 
357 /******************************************************************************/
358 /* d o _ P g W r i t e */
359 /******************************************************************************/
360 
361 int XrdXrootdProtocol::do_PgWrite()
362 {
364  int pathID;
365  numWrites++;
366 
367 // Unmarshall the data
368 //
370  n2hll(Request.pgwrite.offset, IO.Offset);
371  pathID = Request.pgwrite.pathid;
372  IO.Flags = static_cast<unsigned short>(Request.pgwrite.reqflags);
373 
374 // Perform a sanity check on the length.
375 //
376  if (IO.IOLen <= (int)sizeof(kXR_unt32))
377  {Response.Send(kXR_ArgInvalid, "pgwrite length is invalid");
378  return Link->setEtext("pgwrite protocol violation");
379  }
380 
381 // Validate pathid, at least as much as we need to. If it's wrong then we
382 // don't know where the data is and we just let it go.
383 //
384  if (pathID && (pathID >= maxStreams || !Stream[pathID]))
385  return Response.Send(kXR_ArgInvalid, "invalid path ID");
386 
387 // Find the file object
388 //
389  if (!FTab || !(IO.File = FTab->Get(fh.handle)))
390  {IO.File = 0;
391  return do_WriteNone(pathID);
392  }
393 
394 // If the file object does not have a pgWrite object, allocate one.
395 //
396  if (IO.File->pgwFob == 0) IO.File->pgwFob = new XrdXrootdPgwFob(IO.File);
397 
398 // Trace this
399 //
400  TRACEP(FSIO, pathID<<" pgwrite "
401  <<(IO.Flags & XrdProto::kXR_pgRetry ? "retry " : "")
402  <<IO.IOLen<<'@'<<IO.Offset<<" fn=" <<IO.File->FileKey);
403 
404 // Do statistics. They will not always be accurate because we may not be
405 // able to fully complete the I/O to the file. Note that we also count
406 // the checksums which a questionable practice.
407 //
409 
410 // If we are monitoring, insert a write entry
411 //
412  if (Monitor.InOut())
415 
416 // See if an alternate path is required, offload the write
417 //
418  if (pathID) return do_Offload(&XrdXrootdProtocol::do_PgWIO, pathID);
419 
420 // Now do the write on the main path
421 //
422  return do_PgWIO(true);
423 }
424 
425 /******************************************************************************/
426 /* d o _ P g W A I O */
427 /******************************************************************************/
428 
429 // IO.File = file to be written
430 // IO.Offset = Offset at which to write
431 // IO.IOLen = Number of bytes to read from socket
432 // IO.Flags = Flags associated with request
433 
434 bool XrdXrootdProtocol::do_PgWAIO(int &rc)
435 {
436  XrdXrootdPgrwAio *aioP;
437 
438 // Make sure the client is fast enough to do this
439 //
440  if (myStalls >= as_maxstalls)
441  {SI->AsyncRej++;
442  myStalls--;
443  return false;
444  }
445 
446 // Allocate an aio request object
447 //
448  if (!(aioP = XrdXrootdPgrwAio::Alloc(this, Response, IO.File, pgwCtl)))
449  {SI->AsyncRej++;
450  return false;
451  }
452 
453 // Issue the write request
454 //
455  rc = aioP->Write(IO.Offset, IO.IOLen);
456  return true;
457 }
458 
459 /******************************************************************************/
460 /* d o _ P g W I O */
461 /******************************************************************************/
462 
463 // IO.File = file to be written
464 // IO.Offset = Offset at which to write
465 // IO.IOLen = Number of bytes to read from socket
466 // IO.Flags = Flags associated with request
467 
468 int XrdXrootdProtocol::do_PgWIO() {return do_PgWIO(true);}
469 
470 int XrdXrootdProtocol::do_PgWIO(bool isFresh)
471 {
472  struct iovec *ioV;
473  XrdSfsFile *sfsP = IO.File->XrdSfsp;
474  const char *eMsg;
475  char *buff;
476  kXR_unt32 *csVec;
477  int n, rc, Quantum, iovLen, iovNum, csNum;
478  bool isRetry = (IO.Flags & XrdProto::kXR_pgRetry) != 0;
479 
480 // Verify that we still have a control area and allocate a control object
481 // if we do not have one already. The object stays around until disconnect.
482 //
483  if (!IO.File->pgwFob)
484  return do_WriteNone(PathID, kXR_Impossible, "pgwrite logic error 1");
485  if (!pgwCtl) pgwCtl = new XrdXrootdPgwCtl(PathID);
486 
487 // If this is the first entry then check if the request is eligible for async
488 // I/O or if this is a retry request which, of course, is not eligible.
489 //
490  if (isFresh)
491  {if (IO.File->AsyncMode && IO.IOLen >= pgAioMin
493  && !isRetry && do_PgWAIO(rc)) return rc;
494  if (isRetry && !do_PgWIORetry(rc)) return rc;
495  if (!do_PgWIOSetup(pgwCtl)) return -1;
496  }
497 
498 // Either complete the current frame or start a new one. When we start a new
499 // one, the I/O will not return unless all of the data was successfully read.
500 // Hence, we update the length outstanding.
501 //
502 do{if (isFresh)
503  {if (!(ioV = pgwCtl->FrameInfo(iovNum, iovLen))) break;
504  IO.IOLen -= iovLen;
505  if ((rc = getData(this, "pgwrite", ioV, iovNum))) return rc;
506  }
507 
508 // We now have all the data, get checksum and data information
509 //
510  if (!(csVec = pgwCtl->FrameInfo(csNum, buff, Quantum, argp)))
511  return do_WriteNone(PathID, kXR_Impossible, "pgwrite logic error 2");
512 
513 // Convert checksums to host byte order
514 //
515  for (int i = 0; i < csNum; i++) csVec[i] = ntohl(csVec[i]);
516 
517 // Verify the checksums
518 //
519  XrdOucPgrwUtils::dataInfo dInfo(buff, csVec, IO.Offset, Quantum);
520  off_t bado;
521  int badc;
522  bool aOK = true;
523 
524  while(dInfo.count > 0 && !XrdOucPgrwUtils::csVer(dInfo, bado, badc))
525  {if ((eMsg = pgwCtl->boAdd(IO.File, bado, badc)))
526  return do_WriteNone(PathID, kXR_TooManyErrs, eMsg);
527  aOK = false;
528  }
529 
530 // Write the data out. The callee is responsible for unaligned writes!
531 //
532  if ((rc = sfsP->pgWrite(IO.Offset, buff, Quantum, csVec)) <= 0)
533  {IO.EInfo[0] = rc; IO.EInfo[1] = 0;
534  return do_WriteNone();
535  }
536 
537 // If this was a successful retry write, remove corrrected offset
538 //
539  if (aOK && IO.Flags & XrdProto::kXR_pgRetry)
540  IO.File->pgwFob->delOffs(IO.Offset, Quantum);
541 
542 // Update offset and advance to next frame
543 //
544  IO.Offset += Quantum;
545  isFresh = true;
546 
547  } while(pgwCtl->Advance());
548 
549 
550 // Return final result
551 //
552  buff = pgwCtl->boInfo(n);
553  return Response.Send(pgwCtl->resp, sizeof(pgwCtl->info), buff, n);
554 }
555 
556 /******************************************************************************/
557 /* d o _ P g W I O R e t r y */
558 /******************************************************************************/
559 
560 // IO.File = file to be written
561 // IO.Offset = Offset at which to write
562 // IO.IOLen = Number of bytes to read from socket
563 // IO.Flags = Flags associated with request
564 
565 bool XrdXrootdProtocol::do_PgWIORetry(int &rc)
566 {
567  static const int csLen = sizeof(kXR_unt32);
568  bool isBad;
569 
570 // Make sure the write does not cross a page bounday. For unaligned writes we
571 // can compute the exact length that we need. Otherwise, it can't be bigger
572 // than a unit's worth of data. Not precise but usually good enough.
573 //
574  if (IO.Offset & pgPageMask)
575  {int n = pgPageSize - (IO.Offset & pgPageMask);
576  isBad = IO.IOLen > (n + csLen);
577  } else isBad = IO.IOLen > pgUnitSize;
578 
579 // Deep six the write if it violates retry rules.
580 //
581  if (isBad)
582  {rc = do_WriteNone(PathID, kXR_ArgInvalid,
583  "pgwrite retry of more than one page not allowed");
584  return false;
585  }
586 
587 // Make sure that the offset is registered, if it is not, treat this as a
588 // regular write as this may have been a resend during write recovery.
589 //
590  if (!IO.File->pgwFob->hasOffs(IO.Offset, IO.IOLen - csLen))
591  {char buff[64];
592  snprintf(buff, sizeof(buff), "retry %d@%lld", IO.IOLen-csLen, IO.Offset);
593  eDest.Emsg("pgwRetry", buff, "not in error; fn=", IO.File->FileKey);
595  }
596 
597 // We can proceed with this write now.
598 //
599  return true;
600 }
601 
602 /******************************************************************************/
603 /* d o _ P g w I O S e t u p */
604 /******************************************************************************/
605 
606 // IO.File = file to be written
607 // IO.Offset = Offset at which to write
608 // IO.IOLen = Number of bytes to read from socket
609 // IO.Flags = Flags associated with request
610 
611 bool XrdXrootdProtocol::do_PgWIOSetup(XrdXrootdPgwCtl *pgwCtl)
612 {
613  const char *eMsg;
614  int Quantum;
615 
616 // Compute the minimum (4K) or maximum buffer size we will use.
617 //
619  Quantum = (IO.IOLen < pgPageSize ? pgPageSize : IO.IOLen);
620  else Quantum = XrdXrootdPgwCtl::maxBSize;
621 
622 // Make sure we have a large enough buffer
623 //
624  if (!argp || Quantum < halfBSize || argp->bsize < Quantum
626  {if (getBuff(0, Quantum) <= 0) return -1;}
627  else if (hcNow < hcNext) hcNow++;
628 
629 // Do the setup. If it fails then either the client sent an incorrect stream
630 // of the header was corrupted. In either case, it doesn't matter as we can't
631 // depend on the information to clear the stream. So, we close the connection.
632 //
633  if ((eMsg = pgwCtl->Setup(argp, IO.Offset, IO.IOLen)))
635  Link->setEtext("pgwrite protocol violation");
636  return false;
637  }
638  return true;
639 }
@ kXR_ArgInvalid
Definition: XProtocol.hh:990
@ kXR_Impossible
Definition: XProtocol.hh:1021
@ kXR_TooManyErrs
Definition: XProtocol.hh:1023
@ kXR_ChkSumErr
Definition: XProtocol.hh:1009
@ kXR_FileNotOpen
Definition: XProtocol.hh:994
kXR_char fhandle[4]
Definition: XProtocol.hh:531
struct ClientPgReadRequest pgread
Definition: XProtocol.hh:861
struct ClientPgWriteRequest pgwrite
Definition: XProtocol.hh:862
struct ClientRequestHdr header
Definition: XProtocol.hh:846
kXR_char fhandle[4]
Definition: XProtocol.hh:509
@ kXR_1stRequest
Definition: XProtocol.hh:111
@ kXR_pgread
Definition: XProtocol.hh:142
kXR_int32 dlen
Definition: XProtocol.hh:159
long long kXR_int64
Definition: XPtypes.hh:98
unsigned int kXR_unt32
Definition: XPtypes.hh:90
#define eMsg(x)
#define TRACEP(act, x)
XrdSysTrace XrdXrootdTrace
int bsize
Definition: XrdBuffer.hh:46
char * buff
Definition: XrdBuffer.hh:45
static bool csVer(dataInfo &dInfo, off_t &bado, int &badc)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static const uint64_t Verify
Options for pgRead() and pgWrite() as noted below.
XrdOucErrInfo & error
virtual XrdSfsXferSize pgRead(XrdSfsFileOffset offset, char *buffer, XrdSfsXferSize rdlen, uint32_t *csvec, uint64_t opts=0)
virtual XrdSfsXferSize pgWrite(XrdSfsFileOffset offset, char *buffer, XrdSfsXferSize wrlen, uint32_t *csvec, uint64_t opts=0)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void pgrOps(int rsz, bool isRetry=false)
void pgwOps(int wsz, bool isRetry=false)
void pgUpdt(int wErrs, int wFixd, int wUnc)
XrdXrootdFile * Get(int fnum)
XrdXrootdPgwFob * pgwFob
XrdSfsFile * XrdSfsp
XrdXrootdAioFob * aioFob
XrdXrootdFileStats Stats
XrdXrootdMonitor * Agent
void Add_rd(kXR_unt32 dictid, kXR_int32 rlen, kXR_int64 offset)
void Add_wr(kXR_unt32 dictid, kXR_int32 wlen, kXR_int64 offset)
static const int aioSZ
void Read(long long offs, int dlen) override
static XrdXrootdPgrwAio * Alloc(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP, XrdXrootdPgwBadCS *bcsP=0)
int Write(long long offs, int dlen) override
const char * boAdd(XrdXrootdFile *fP, kXR_int64 foffs, int dlen=XrdProto::kXR_pgPageSZ)
char * boInfo(int &boLen)
ServerResponseStatus resp
static const int maxBSize
struct iovec * FrameInfo(int &iovn, int &rdlen)
const char * Setup(XrdBuffer *buffP, kXR_int64 fOffs, int totlen)
ServerResponseBody_pgWrite info
bool hasOffs(kXR_int64 foffs, int dlen)
bool delOffs(kXR_int64 foffs, int dlen)
int numOffs(int *errs=0, int *fixs=0)
static XrdXrootdStats * SI
XrdXrootdProtocol * VerifyStream(int &rc, int pID, bool lok=true)
XrdXrootdProtocol * Stream[maxStreams]
XrdXrootd::IOParms IO
XrdXrootdPgwCtl * pgwCtl
XrdXrootdFileTable * FTab
static XrdSysError & eDest
int getData(gdCallBack *gdcbP, const char *dtype, char *buff, int blen)
XrdXrootdMonitor::User Monitor
XrdXrootdResponse Response
static const int maxStreams
static RAtomic_int srvrAioOps
void Set(XrdLink *lp)
long long AsyncRej
static const int kXR_pgUnitSZ
Definition: XProtocol.hh:496
static const int kXR_pgPageSZ
Definition: XProtocol.hh:494
@ kXR_PartialResult
Definition: XProtocol.hh:1251
@ kXR_FinalResult
Definition: XProtocol.hh:1250
static const int kXR_pgRetry
Definition: XProtocol.hh:503
int getIovMax()
XrdXrootdFile * File