Skip to content

Commit 0f5b50b

Browse files
committed
wdt logging improvements + wdtPing thrift + resource controller stats + throttler thrift crash fix
Summary: wdt logging improvements: log the port on accept timeout, and log host/port and completion for thrift service calls; added wdtPing command on the thrift service; fixed a crash in the update throttler call; removed wdt_service_port - should use fbwdtoptions.thrift_port (-wdt_thrift_port flag) instead. Reviewed By: uddipta. Differential Revision: D2897431. fb-gh-sync-id: 8ad06e572a07aa8deaa93d15984d6d94d658e7e6
1 parent 47e28aa commit 0f5b50b

File tree

5 files changed

+63
-42
lines changed

5 files changed

+63
-42
lines changed

ReceiverThread.cpp

Lines changed: 45 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ std::ostream &operator<<(std::ostream &os,
2727
return os;
2828
}
2929

30+
// TODO: TLOG or such that does LOG(level) << *this << " " << rest...
31+
3032
int64_t readAtLeast(ServerSocket &s, char *buf, int64_t max, int64_t atLeast,
3133
int64_t len) {
3234
VLOG(4) << "readAtLeast len " << len << " max " << max << " atLeast "
@@ -115,13 +117,13 @@ ReceiverState ReceiverThread::listen() {
115117
threadStats_.setLocalErrorCode(code);
116118
return FAILED;
117119
}
118-
LOG(INFO) << "Sleeping after failed attempt " << retry;
120+
LOG(INFO) << *this << " Sleeping after failed attempt " << retry;
119121
/* sleep override */
120122
usleep(options_.sleep_millis * 1000);
121123
}
122124
// one more/last try (stays true if it worked above)
123125
if (socket_->listen() != OK) {
124-
LOG(ERROR) << "Unable to listen/bind despite retries";
126+
LOG(ERROR) << *this << " Unable to listen/bind despite retries";
125127
threadStats_.setLocalErrorCode(CONN_ERROR);
126128
return FAILED;
127129
}
@@ -143,13 +145,14 @@ ReceiverState ReceiverThread::acceptFirstConnection() {
143145
return ACCEPT_WITH_TIMEOUT;
144146
}
145147
if (acceptAttempts == options_.max_accept_retries) {
146-
LOG(ERROR) << "unable to accept after " << acceptAttempts << " attempts";
148+
LOG(ERROR) << *this << " Unable to accept after " << acceptAttempts
149+
<< " attempts";
147150
threadStats_.setLocalErrorCode(CONN_ERROR);
148151
return FAILED;
149152
}
150153
if (wdtParent_->getCurAbortCode() != OK) {
151-
LOG(ERROR) << "Thread marked to abort while trying to accept first"
152-
<< " connection. Num attempts " << acceptAttempts;
154+
LOG(ERROR) << *this << " Thread marked to abort while trying to accept "
155+
<< "first connection. Num attempts " << acceptAttempts;
153156
// Even though there is a transition FAILED here
154157
// getCurAbortCode() is going to be checked again in the receiveOne.
155158
// So this is pretty much irrelevant
@@ -194,7 +197,7 @@ ReceiverState ReceiverThread::acceptWithTimeout() {
194197
socket_->acceptNextConnection(timeout, curConnectionVerified_);
195198
curConnectionVerified_ = false;
196199
if (code != OK) {
197-
LOG(ERROR) << "accept() failed with timeout " << timeout;
200+
LOG(ERROR) << *this << " accept() failed with timeout " << timeout;
198201
threadStats_.setLocalErrorCode(code);
199202
return FINISH_WITH_ERROR;
200203
}
@@ -224,7 +227,7 @@ ReceiverState ReceiverThread::sendLocalCheckpoint() {
224227
checkpoints);
225228
int written = socket_->write(buf_, checkpointLen);
226229
if (written != checkpointLen) {
227-
LOG(ERROR) << "unable to write local checkpoint. write mismatch "
230+
LOG(ERROR) << *this << " unable to write local checkpoint. write mismatch "
228231
<< checkpointLen << " " << written;
229232
threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
230233
return ACCEPT_WITH_TIMEOUT;
@@ -241,8 +244,8 @@ ReceiverState ReceiverThread::readNextCmd() {
241244
numRead_ = readAtLeast(*socket_, buf_ + off_, bufSize_ - off_,
242245
Protocol::kMinBufLength, numRead_);
243246
if (numRead_ < Protocol::kMinBufLength) {
244-
LOG(ERROR) << "socket read failure " << Protocol::kMinBufLength << " "
245-
<< numRead_;
247+
LOG(ERROR) << *this << " socket read failure " << Protocol::kMinBufLength
248+
<< " " << numRead_;
246249
threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
247250
return ACCEPT_WITH_TIMEOUT;
248251
}
@@ -259,7 +262,7 @@ ReceiverState ReceiverThread::readNextCmd() {
259262
if (cmd == Protocol::SIZE_CMD) {
260263
return PROCESS_SIZE_CMD;
261264
}
262-
LOG(ERROR) << "received an unknown cmd " << cmd;
265+
LOG(ERROR) << *this << " received an unknown cmd " << cmd;
263266
threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
264267
return FINISH_WITH_ERROR;
265268
}
@@ -273,7 +276,7 @@ ReceiverState ReceiverThread::processSettingsCmd() {
273276
bool success = Protocol::decodeVersion(
274277
buf_, off_, oldOffset_ + Protocol::kMaxVersion, senderProtocolVersion);
275278
if (!success) {
276-
LOG(ERROR) << "Unable to decode version " << threadIndex_;
279+
LOG(ERROR) << *this << " Unable to decode version " << threadIndex_;
277280
threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
278281
return FINISH_WITH_ERROR;
279282
}
@@ -283,13 +286,14 @@ ReceiverState ReceiverThread::processSettingsCmd() {
283286
int negotiatedProtocol = Protocol::negotiateProtocol(
284287
senderProtocolVersion, threadProtocolVersion_);
285288
if (negotiatedProtocol == 0) {
286-
LOG(WARNING) << "Can not support sender with version "
289+
LOG(WARNING) << *this << " Can not support sender with version "
287290
<< senderProtocolVersion << ", aborting!";
288291
threadStats_.setLocalErrorCode(VERSION_INCOMPATIBLE);
289292
return SEND_ABORT_CMD;
290293
} else {
291294
LOG_IF(INFO, threadProtocolVersion_ != negotiatedProtocol)
292-
<< "Changing receiver protocol version to " << negotiatedProtocol;
295+
<< *this << "Changing receiver protocol version to "
296+
<< negotiatedProtocol;
293297
threadProtocolVersion_ = negotiatedProtocol;
294298
if (negotiatedProtocol != senderProtocolVersion) {
295299
threadStats_.setLocalErrorCode(VERSION_MISMATCH);
@@ -309,7 +313,7 @@ ReceiverState ReceiverThread::processSettingsCmd() {
309313
auto senderId = settings.transferId;
310314
auto transferId = wdtParent_->getTransferId();
311315
if (transferId != senderId) {
312-
LOG(ERROR) << "Receiver and sender id mismatch " << senderId << " "
316+
LOG(ERROR) << *this << "Receiver and sender id mismatch " << senderId << " "
313317
<< transferId;
314318
threadStats_.setLocalErrorCode(ID_MISMATCH);
315319
return SEND_ABORT_CMD;
@@ -374,7 +378,7 @@ ReceiverState ReceiverThread::processFileCmd() {
374378
ErrorCode transferStatus = (ErrorCode)buf_[off_++];
375379
if (transferStatus != OK) {
376380
// TODO: use this status information to implement fail fast mode
377-
VLOG(1) << "sender entered into error state "
381+
VLOG(1) << *this << " sender entered into error state "
378382
<< errorCodeToStr(transferStatus);
379383
}
380384
int16_t headerLen = folly::loadUnaligned<int16_t>(buf_ + off_);
@@ -387,7 +391,8 @@ ReceiverState ReceiverThread::processFileCmd() {
387391
readAtLeast(*socket_, buf_ + end, bufSize_ - end, headerLen, numRead_);
388392
}
389393
if (numRead_ < headerLen) {
390-
LOG(ERROR) << "Unable to read full header " << headerLen << " " << numRead_;
394+
LOG(ERROR) << *this << " Unable to read full header " << headerLen << " "
395+
<< numRead_;
391396
threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
392397
return ACCEPT_WITH_TIMEOUT;
393398
}
@@ -402,7 +407,7 @@ ReceiverState ReceiverThread::processFileCmd() {
402407
threadStats_.addHeaderBytes(headerBytes);
403408
threadStats_.addEffectiveBytes(headerBytes, 0);
404409
if (!success) {
405-
LOG(ERROR) << "Error decoding at"
410+
LOG(ERROR) << *this << " Error decoding at"
406411
<< " ooff:" << oldOffset_ << " off_: " << off_
407412
<< " numRead_: " << numRead_;
408413
threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
@@ -411,7 +416,7 @@ ReceiverState ReceiverThread::processFileCmd() {
411416

412417
// received a well formed file cmd, apply the pending checkpoint update
413418
checkpointIndex_ = pendingCheckpointIndex_;
414-
VLOG(1) << "Read id:" << blockDetails.fileName
419+
VLOG(1) << *this << " Read id:" << blockDetails.fileName
415420
<< " size:" << blockDetails.dataSize << " ooff:" << oldOffset_
416421
<< " off_: " << off_ << " numRead_: " << numRead_;
417422
auto &fileCreator = wdtParent_->getFileCreator();
@@ -461,7 +466,8 @@ ReceiverState ReceiverThread::processFileCmd() {
461466
// also means no leftOver so it's ok we use buf_ from start
462467
while (writer.getTotalWritten() < blockDetails.dataSize) {
463468
if (wdtParent_->getCurAbortCode() != OK) {
464-
LOG(ERROR) << "Thread marked for abort while processing a file."
469+
LOG(ERROR) << *this << "Thread marked for abort while processing "
470+
<< blockDetails.fileName << " " << blockDetails.seqId
465471
<< " port : " << socket_->getPort();
466472
return FAILED;
467473
}
@@ -488,8 +494,8 @@ ReceiverState ReceiverThread::processFileCmd() {
488494
if (writer.getTotalWritten() != blockDetails.dataSize) {
489495
// This can only happen if there are transmission errors
490496
// Write errors to disk are already taken care of above
491-
LOG(ERROR) << "could not read entire content for " << blockDetails.fileName
492-
<< " port " << socket_->getPort();
497+
LOG(ERROR) << *this << " could not read entire content for "
498+
<< blockDetails.fileName << " port " << socket_->getPort();
493499
threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
494500
return ACCEPT_WITH_TIMEOUT;
495501
}
@@ -523,14 +529,14 @@ ReceiverState ReceiverThread::processFileCmd() {
523529
numRead_ = readAtLeast(*socket_, buf_ + off_, bufSize_ - off_,
524530
Protocol::kMinBufLength, numRead_);
525531
if (numRead_ < Protocol::kMinBufLength) {
526-
LOG(ERROR) << "socket read failure " << Protocol::kMinBufLength << " "
527-
<< numRead_;
532+
LOG(ERROR) << *this << " socket read failure " << Protocol::kMinBufLength
533+
<< " " << numRead_;
528534
threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
529535
return ACCEPT_WITH_TIMEOUT;
530536
}
531537
Protocol::CMD_MAGIC cmd = (Protocol::CMD_MAGIC)buf_[off_++];
532538
if (cmd != Protocol::FOOTER_CMD) {
533-
LOG(ERROR) << "Expecting footer cmd, but received " << cmd;
539+
LOG(ERROR) << *this << " Expecting footer cmd, but received " << cmd;
534540
threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
535541
return FINISH_WITH_ERROR;
536542
}
@@ -540,13 +546,13 @@ ReceiverState ReceiverThread::processFileCmd() {
540546
buf_, off_, oldOffset_ + Protocol::kMaxFooter, receivedChecksum,
541547
receivedTag, (footerType_ == ENC_TAG_FOOTER));
542548
if (!success) {
543-
LOG(ERROR) << "Unable to decode footer cmd";
549+
LOG(ERROR) << *this << " Unable to decode footer cmd";
544550
threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
545551
return FINISH_WITH_ERROR;
546552
}
547553
if (footerType_ == CHECKSUM_FOOTER) {
548554
if (checksum != receivedChecksum) {
549-
LOG(ERROR) << "Checksum mismatch " << checksum << " "
555+
LOG(ERROR) << *this << " Checksum mismatch " << checksum << " "
550556
<< receivedChecksum << " port " << socket_->getPort()
551557
<< " file " << blockDetails.fileName;
552558
threadStats_.setLocalErrorCode(CHECKSUM_MISMATCH);
@@ -558,7 +564,7 @@ ReceiverState ReceiverThread::processFileCmd() {
558564
blocksWaitingVerification_.emplace_back(blockDetails);
559565
if (decryptorCtxSaved) {
560566
if (!socket_->verifyTag(receivedTag)) {
561-
LOG(ERROR) << *this << "GCM encryption tag mismatch "
567+
LOG(ERROR) << *this << " GCM encryption tag mismatch "
562568
<< folly::humanify(receivedTag) << " file "
563569
<< blockDetails.fileName;
564570
threadStats_.setLocalErrorCode(ENCRYPTION_ERROR);
@@ -597,7 +603,7 @@ void ReceiverThread::markReceivedBlocksVerified() {
597603
ReceiverState ReceiverThread::processDoneCmd() {
598604
VLOG(1) << *this << " entered PROCESS_DONE_CMD state";
599605
if (numRead_ != Protocol::kMinBufLength) {
600-
LOG(ERROR) << "Unexpected state for done command"
606+
LOG(ERROR) << *this << " Unexpected state for done command"
601607
<< " off_: " << off_ << " numRead_: " << numRead_;
602608
threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
603609
return FINISH_WITH_ERROR;
@@ -610,7 +616,7 @@ ReceiverState ReceiverThread::processDoneCmd() {
610616
oldOffset_ + Protocol::kMaxDone,
611617
numBlocksSend, totalSenderBytes);
612618
if (!success) {
613-
LOG(ERROR) << "Unable to decode done cmd";
619+
LOG(ERROR) << *this << " Unable to decode done cmd";
614620
threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
615621
return FINISH_WITH_ERROR;
616622
}
@@ -629,7 +635,7 @@ ReceiverState ReceiverThread::processSizeCmd() {
629635
bool success = Protocol::decodeSize(
630636
buf_, off_, oldOffset_ + Protocol::kMaxSize, totalSenderBytes);
631637
if (!success) {
632-
LOG(ERROR) << "Unable to decode size cmd";
638+
LOG(ERROR) << *this << " Unable to decode size cmd";
633639
threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
634640
return FINISH_WITH_ERROR;
635641
}
@@ -686,7 +692,7 @@ ReceiverState ReceiverThread::sendFileChunks() {
686692
threadStats_.addHeaderBytes(written);
687693
}
688694
if (written != off) {
689-
LOG(ERROR) << "Socket write error " << off << " " << written;
695+
LOG(ERROR) << *this << " socket write err " << off << " " << written;
690696
threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
691697
execFunnel->notifyFail();
692698
return ACCEPT_WITH_TIMEOUT;
@@ -712,7 +718,7 @@ ReceiverState ReceiverThread::sendFileChunks() {
712718
numEntriesWritten += numEntriesEncoded;
713719
}
714720
if (numEntriesWritten != numParsedChunksInfo) {
715-
LOG(ERROR) << "Could not write all the file chunks "
721+
LOG(ERROR) << *this << " Could not write all the file chunks "
716722
<< numParsedChunksInfo << " " << numEntriesWritten;
717723
threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
718724
execFunnel->notifyFail();
@@ -722,7 +728,8 @@ ReceiverState ReceiverThread::sendFileChunks() {
722728
int64_t toRead = 1;
723729
int64_t numRead = socket_->read(buf_, toRead);
724730
if (numRead != toRead) {
725-
LOG(ERROR) << "Socket read error " << toRead << " " << numRead;
731+
LOG(ERROR) << *this << " Socket read error " << toRead << " "
732+
<< numRead;
726733
threadStats_.setLocalErrorCode(SOCKET_READ_ERROR);
727734
execFunnel->notifyFail();
728735
return ACCEPT_WITH_TIMEOUT;
@@ -750,7 +757,7 @@ ReceiverState ReceiverThread::sendGlobalCheckpoint() {
750757

751758
int written = socket_->write(buf_, off_);
752759
if (written != off_) {
753-
LOG(ERROR) << "unable to write error checkpoints";
760+
LOG(ERROR) << *this << " unable to write error checkpoints";
754761
threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
755762
return ACCEPT_WITH_TIMEOUT;
756763
} else {
@@ -785,7 +792,7 @@ ReceiverState ReceiverThread::sendDoneCmd() {
785792
VLOG(1) << *this << " entered SEND_DONE_CMD state";
786793
buf_[0] = Protocol::DONE_CMD;
787794
if (socket_->write(buf_, 1) != 1) {
788-
PLOG(ERROR) << "unable to send DONE " << threadIndex_;
795+
PLOG(ERROR) << *this << " unable to send DONE " << threadIndex_;
789796
threadStats_.setLocalErrorCode(SOCKET_WRITE_ERROR);
790797
return ACCEPT_WITH_TIMEOUT;
791798
}
@@ -883,15 +890,15 @@ ReceiverState ReceiverThread::waitForFinishOrNewCheckpoint() {
883890

884891
void ReceiverThread::start() {
885892
if (buf_ == nullptr) {
886-
LOG(ERROR) << "Unable to allocate buffer";
893+
LOG(ERROR) << *this << " Unable to allocate buffer";
887894
threadStats_.setLocalErrorCode(MEMORY_ALLOCATION_ERROR);
888895
return;
889896
}
890897
ReceiverState state = LISTEN;
891898
while (true) {
892899
ErrorCode abortCode = wdtParent_->getCurAbortCode();
893900
if (abortCode != OK) {
894-
LOG(ERROR) << "Transfer aborted " << socket_->getPort() << " "
901+
LOG(ERROR) << *this << " Transfer aborted " << socket_->getPort() << " "
895902
<< errorCodeToStr(abortCode);
896903
threadStats_.setLocalErrorCode(ABORT);
897904
break;
@@ -928,7 +935,7 @@ ErrorCode ReceiverThread::init() {
928935
return ERROR;
929936
}
930937
checkpoint_.port = socket_->getPort();
931-
LOG(INFO) << "Listening on port " << socket_->getPort();
938+
LOG(INFO) << *this << " Listening on port " << socket_->getPort();
932939
return OK;
933940
}
934941

Throttler.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ class Throttler {
3030
public:
3131
/**
3232
* Utility method that configures the avg rate, peak rate and bucket limit
33-
* based on the values passed to this method and returns a shared ptr
34-
* to an instance of this throttler
33+
* based on the values passed to this method and returns a shared ptr to an
34+
* instance of this throttler. It can return nullptr if throttling is off.
3535
*/
3636
static std::shared_ptr<Throttler> makeThrottler(
3737
double avgRateBytesPerSec, double peakRateBytesPerSec,

WdtResourceController.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,16 @@ WdtResourceController::~WdtResourceController() {
302302
shutdown();
303303
}
304304

305+
ErrorCode WdtResourceController::getCounts(int32_t &numNamespaces,
306+
int32_t &numSenders,
307+
int32_t &numReceivers) {
308+
GuardLock lock(controllerMutex_);
309+
numSenders = numSenders_;
310+
numReceivers = numReceivers_;
311+
numNamespaces = namespaceMap_.size();
312+
return OK;
313+
}
314+
305315
ErrorCode WdtResourceController::createSender(
306316
const std::string &wdtNamespace, const std::string &identifier,
307317
const WdtTransferRequest &wdtOperationRequest, SenderPtr &sender) {

WdtResourceController.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,10 @@ class WdtResourceController : public WdtControllerBase {
229229
/// Default global namespace
230230
static const char *const kGlobalNamespace;
231231

232+
/// Return current counts
233+
ErrorCode getCounts(int32_t &numNamespaces, int32_t &numSenders,
234+
int32_t &numReceivers);
235+
232236
protected:
233237
typedef std::shared_ptr<WdtNamespaceController> NamespaceControllerPtr;
234238
/// Get the namespace controller

build/clean.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
#!/bin/sh
2-
find . \( -name "*~" -o -name "*.orig" -o -name "*.rej" \) -print0 | \
3-
xargs -0 rm -v
2+
find . \( -name "*~" -o -name "*.orig" -o -name "*.rej" -o -name "#*#" \
3+
-o -name ".#*" \) -print0 | xargs -0 rm -v

0 commit comments

Comments
 (0)