-
Notifications
You must be signed in to change notification settings - Fork 6.6k
Moving predictor to WorkingArea to make it thread safe #13706
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 98 commits
de6880d
baf2a0f
3ececad
80bcf10
2856b2b
d700867
f248ab8
6904a8e
35e2158
fb64e26
547377b
a0ce6b4
24393e5
496d630
1569a0e
59ad491
9492245
a0fc95d
89df03c
e67ffd1
2c43d0f
4dbd839
f4e3d62
021d1e8
86f60b2
d7bcf15
b30e81f
5626e96
1e244cb
238d6b2
030d65a
2ecadb2
6cec9da
f5c26e2
b4bec8c
87edb74
16fa4ba
f1a508f
9cdf446
643d9b8
1e58e2b
dc30e2e
facdd2c
33ff3f2
a603b43
432c7b2
7ed22c4
cf24fa1
192dc9d
0984f13
823a4d3
b6b0645
b0fe869
20683d9
194d9d2
0056f01
67b5728
1e7606b
777c2cb
f2c549d
6329f66
1c2f6ee
a5b1302
811582e
4073cb6
fb8deb4
722d277
4cb1b7f
b03ddd6
2d961b4
22bcc15
b882e26
37d76c6
c443b72
3f5dc15
c3ccf18
42dc724
65d5926
e01a160
5b31bbb
698b39a
e124882
d345449
ae0d8fa
a24c740
40450a1
c21aa0e
c5a0c38
791b44d
97fe9ce
2b35a4c
0d496db
606ba10
36893ec
44a4b39
6373f59
7959d9e
dcf2312
6b8e9ae
b5158e9
ecdf96f
1c46eff
0b223e3
2e8dcfb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -65,27 +65,44 @@ Status AutoSkipCompressorWrapper::CompressBlock( | |
return CompressBlockAndRecord(uncompressed_data, compressed_output, | ||
out_compression_type, wa); | ||
} else { | ||
auto prediction = predictor_->Predict(); | ||
auto predictor_ptr = | ||
static_cast<AutoSkipCompressionContext*>(wa->get())->predictor; | ||
auto prediction = predictor_ptr->Predict(); | ||
if (prediction <= kProbabilityCutOff) { | ||
// decide to compress | ||
return CompressBlockAndRecord(uncompressed_data, compressed_output, | ||
out_compression_type, wa); | ||
pdillinger marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} else { | ||
// decide to bypass compression | ||
// bypassed compression | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like you accidentally reversed some good fix such as this comment. Make sure to check other places too |
||
*out_compression_type = kNoCompression; | ||
return Status::OK(); | ||
} | ||
} | ||
return Status::OK(); | ||
} | ||
|
||
Compressor::ManagedWorkingArea AutoSkipCompressorWrapper::ObtainWorkingArea() { | ||
auto wrap_wa = wrapped_->ObtainWorkingArea(); | ||
return ManagedWorkingArea( | ||
static_cast<WorkingArea*>( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. static_cast should be unnecessary |
||
new AutoSkipCompressionContext(std::move(wrap_wa))), | ||
this); | ||
} | ||
void AutoSkipCompressorWrapper::ReleaseWorkingArea(WorkingArea* wa) { | ||
delete static_cast<AutoSkipCompressionContext*>(wa); | ||
} | ||
|
||
Status AutoSkipCompressorWrapper::CompressBlockAndRecord( | ||
Slice uncompressed_data, std::string* compressed_output, | ||
CompressionType* out_compression_type, ManagedWorkingArea* wa) { | ||
auto wrap_wa = | ||
&(static_cast<AutoSkipCompressionContext*>(wa->get())->wrapped); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't this function take AutoSkipCompressionContext instead of WorkingArea to avoid so many casts? |
||
Status status = wrapped_->CompressBlock(uncompressed_data, compressed_output, | ||
out_compression_type, wa); | ||
out_compression_type, wrap_wa); | ||
// determine if it was rejected or compressed | ||
predictor_->Record(uncompressed_data, compressed_output, kOpts); | ||
auto predictor_ptr = | ||
static_cast<AutoSkipCompressionContext*>(wa->get())->predictor; | ||
predictor_ptr->Record(uncompressed_data, compressed_output, kOpts); | ||
return status; | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,8 +12,8 @@ | |
#include "rocksdb/advanced_compression.h" | ||
|
||
namespace ROCKSDB_NAMESPACE { | ||
// Predict rejection probability using a moving window approach | ||
// This class is not thread safe | ||
// Predicts Rejection Probability using previous using past data of certain | ||
// window size | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another accidental revert? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make sure this PR is rebased on the upstream main after it includes all your previous fixes to auto skip. This is to not accidentally revert your changes. |
||
class CompressionRejectionProbabilityPredictor { | ||
public: | ||
CompressionRejectionProbabilityPredictor(int window_size) | ||
|
@@ -33,6 +33,20 @@ class CompressionRejectionProbabilityPredictor { | |
size_t window_size_; | ||
}; | ||
|
||
class AutoSkipCompressionContext : public Compressor::WorkingArea { | ||
public: | ||
explicit AutoSkipCompressionContext(Compressor::ManagedWorkingArea&& wa) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can call this AutoSkipWorkingArea |
||
: wrapped(std::move(wa)), | ||
predictor( | ||
std::make_shared<CompressionRejectionProbabilityPredictor>(10)) {} | ||
~AutoSkipCompressionContext() {} | ||
AutoSkipCompressionContext(const AutoSkipCompressionContext&) = delete; | ||
AutoSkipCompressionContext& operator=(const AutoSkipCompressionContext&) = | ||
delete; | ||
Compressor::ManagedWorkingArea wrapped; | ||
std::shared_ptr<CompressionRejectionProbabilityPredictor> predictor; | ||
}; | ||
|
||
class AutoSkipCompressorWrapper : public CompressorWrapper { | ||
public: | ||
const char* Name() const override; | ||
|
@@ -42,6 +56,8 @@ class AutoSkipCompressorWrapper : public CompressorWrapper { | |
Status CompressBlock(Slice uncompressed_data, std::string* compressed_output, | ||
CompressionType* out_compression_type, | ||
ManagedWorkingArea* wa) override; | ||
ManagedWorkingArea ObtainWorkingArea() override; | ||
void ReleaseWorkingArea(WorkingArea* wa) override; | ||
|
||
private: | ||
Status CompressBlockAndRecord(Slice uncompressed_data, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you look at the API commend on CompressBlock,
Specifically, the working area should only be used if the owner matches expectations. See BuiltinCompressorV2::CompressBlock checking owner() for example.
Also the working area is optional. It is typically provided by the RocksDB table builder but not guaranteed in general.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pdillinger If the working area is not provided or a working area with wrong owner is provided, does it mean they are not using AutoSkip? Since the predictor is in the working area, we don't have access to any prediction.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think it is OK to provide degraded functionality (always attempt compression) if the WorkingArea is not provided. I might reconsider the exact contract for this in the future.
Some detail: Right now the compressed secondary cache can't efficiently provide a working area because we cannot predict which threads might access the cache, and how many times they might access it. It is perhaps unlikely that AutoSkip would be used with the compressed secondary cache, but it's possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The owner() check part is still outstanding also.