Skip to content

Commit 34d8f03

Browse files
shubhajeetfacebook-github-bot
authored andcommitted
Moving predictor to WorkingArea to make it thread safe (#13706)
Summary: **Summary:** We need to move the Predictor to WorkingArea so that it is local to each thread and thus is thread safe. Pull Request resolved: #13706 Test Plan: It should pass the test case written in ./compression_test. Reviewed By: pdillinger Differential Revision: D76836846 Pulled By: shubhajeet fbshipit-source-id: 0d0170baf65f4bb95ba107fec77151e66b8a4449
1 parent 05996cd commit 34d8f03

File tree

3 files changed

+53
-11
lines changed

3 files changed

+53
-11
lines changed

tools/db_crashtest.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1011,9 +1011,7 @@ def finalize_and_sanitize(src_params):
10111011
dest_params["compression_type"] = "zstd"
10121012
dest_params["bottommost_compression_type"] = "zstd"
10131013
elif dest_params.get("compression_manager") == "autoskip":
1014-
# disabling compression parallel threads if auto skip manager is being used as the predictor is not thread safe
1015-
dest_params["compression_parallel_threads"] = 1
1016-
# esuring the compression is being used
1014+
# ensuring the compression is being used
10171015
if dest_params.get("compression_type") == "none":
10181016
dest_params["compression_type"] = random.choice(
10191017
["snappy", "zlib", "lz4", "lz4hc", "xpress", "zstd"]

util/auto_skip_compressor.cc

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,20 +56,30 @@ const char* AutoSkipCompressorWrapper::Name() const {
5656
Status AutoSkipCompressorWrapper::CompressBlock(
5757
Slice uncompressed_data, std::string* compressed_output,
5858
CompressionType* out_compression_type, ManagedWorkingArea* wa) {
59+
// Check if the managed working area is provided or owned by this object.
60+
// If not, bypass auto-skip logic since the working area lacks a predictor to
61+
// record or make necessary decisions to compress or bypass compression of the
62+
// block
63+
if (wa == nullptr || wa->owner() != this) {
64+
return wrapped_->CompressBlock(uncompressed_data, compressed_output,
65+
out_compression_type, wa);
66+
}
5967
bool exploration =
6068
Random::GetTLSInstance()->PercentTrue(kExplorationPercentage);
6169
TEST_SYNC_POINT_CALLBACK(
6270
"AutoSkipCompressorWrapper::CompressBlock::exploitOrExplore",
6371
&exploration);
72+
auto autoskip_wa = static_cast<AutoSkipWorkingArea*>(wa->get());
6473
if (exploration) {
6574
return CompressBlockAndRecord(uncompressed_data, compressed_output,
66-
out_compression_type, wa);
75+
out_compression_type, autoskip_wa);
6776
} else {
68-
auto prediction = predictor_->Predict();
77+
auto predictor_ptr = autoskip_wa->predictor;
78+
auto prediction = predictor_ptr->Predict();
6979
if (prediction <= kProbabilityCutOff) {
7080
// decide to compress
7181
return CompressBlockAndRecord(uncompressed_data, compressed_output,
72-
out_compression_type, wa);
82+
out_compression_type, autoskip_wa);
7383
} else {
7484
// decide to bypass compression
7585
*out_compression_type = kNoCompression;
@@ -79,13 +89,22 @@ Status AutoSkipCompressorWrapper::CompressBlock(
7989
return Status::OK();
8090
}
8191

92+
Compressor::ManagedWorkingArea AutoSkipCompressorWrapper::ObtainWorkingArea() {
93+
auto wrap_wa = wrapped_->ObtainWorkingArea();
94+
return ManagedWorkingArea(new AutoSkipWorkingArea(std::move(wrap_wa)), this);
95+
}
96+
void AutoSkipCompressorWrapper::ReleaseWorkingArea(WorkingArea* wa) {
97+
delete static_cast<AutoSkipWorkingArea*>(wa);
98+
}
99+
82100
Status AutoSkipCompressorWrapper::CompressBlockAndRecord(
83101
Slice uncompressed_data, std::string* compressed_output,
84-
CompressionType* out_compression_type, ManagedWorkingArea* wa) {
102+
CompressionType* out_compression_type, AutoSkipWorkingArea* wa) {
85103
Status status = wrapped_->CompressBlock(uncompressed_data, compressed_output,
86-
out_compression_type, wa);
104+
out_compression_type, &(wa->wrapped));
87105
// determine if it was rejected or compressed
88-
predictor_->Record(uncompressed_data, compressed_output, kOpts);
106+
auto predictor_ptr = wa->predictor;
107+
predictor_ptr->Record(uncompressed_data, compressed_output, kOpts);
89108
return status;
90109
}
91110

util/auto_skip_compressor.h

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313

1414
namespace ROCKSDB_NAMESPACE {
1515
// Predict rejection probability using a moving window approach
16-
// This class is not thread safe
1716
class CompressionRejectionProbabilityPredictor {
1817
public:
1918
CompressionRejectionProbabilityPredictor(int window_size)
@@ -33,6 +32,30 @@ class CompressionRejectionProbabilityPredictor {
3332
size_t window_size_;
3433
};
3534

35+
class AutoSkipWorkingArea : public Compressor::WorkingArea {
36+
public:
37+
explicit AutoSkipWorkingArea(Compressor::ManagedWorkingArea&& wa)
38+
: wrapped(std::move(wa)),
39+
predictor(
40+
std::make_shared<CompressionRejectionProbabilityPredictor>(10)) {}
41+
~AutoSkipWorkingArea() {}
42+
AutoSkipWorkingArea(const AutoSkipWorkingArea&) = delete;
43+
AutoSkipWorkingArea& operator=(const AutoSkipWorkingArea&) = delete;
44+
AutoSkipWorkingArea(AutoSkipWorkingArea&& other) noexcept
45+
: wrapped(std::move(other.wrapped)),
46+
predictor(std::move(other.predictor)) {}
47+
48+
AutoSkipWorkingArea& operator=(AutoSkipWorkingArea&& other) noexcept {
49+
if (this != &other) {
50+
wrapped = std::move(other.wrapped);
51+
predictor = std::move(other.predictor);
52+
}
53+
return *this;
54+
}
55+
Compressor::ManagedWorkingArea wrapped;
56+
std::shared_ptr<CompressionRejectionProbabilityPredictor> predictor;
57+
};
58+
3659
class AutoSkipCompressorWrapper : public CompressorWrapper {
3760
public:
3861
const char* Name() const override;
@@ -42,12 +65,14 @@ class AutoSkipCompressorWrapper : public CompressorWrapper {
4265
Status CompressBlock(Slice uncompressed_data, std::string* compressed_output,
4366
CompressionType* out_compression_type,
4467
ManagedWorkingArea* wa) override;
68+
ManagedWorkingArea ObtainWorkingArea() override;
69+
void ReleaseWorkingArea(WorkingArea* wa) override;
4570

4671
private:
4772
Status CompressBlockAndRecord(Slice uncompressed_data,
4873
std::string* compressed_output,
4974
CompressionType* out_compression_type,
50-
ManagedWorkingArea* wa);
75+
AutoSkipWorkingArea* wa);
5176
static constexpr int kExplorationPercentage = 10;
5277
static constexpr int kProbabilityCutOff = 50;
5378
const CompressionOptions kOpts;

0 commit comments

Comments
 (0)