Skip to content

Moving predictor to WorkingArea to make it thread safe #13706

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 104 commits into from
Closed
Show file tree
Hide file tree
Changes from 98 commits
Commits
Show all changes
104 commits
Select commit Hold shift + click to select a range
de6880d
Addding SimpleMixedCompressor, SimpleMixedCompressionManager, and inc…
shubhajeet May 28, 2025
baf2a0f
Using CompressionWrapper instead
shubhajeet May 29, 2025
3ececad
<Replace this line with a title. Use 1 line only, 67 chars or less>
shubhajeet May 30, 2025
80bcf10
merging with master branch
shubhajeet May 31, 2025
2856b2b
ldb command seems to be working fine in commandline inspection
shubhajeet May 31, 2025
d700867
using universal counter
shubhajeet May 31, 2025
f248ab8
using static counter
shubhajeet May 31, 2025
6904a8e
Removing the fprint statement used in checking
shubhajeet May 31, 2025
35e2158
Trying to avoid warning error on build
shubhajeet May 31, 2025
fb64e26
Trying to avoid warning error on build
shubhajeet May 31, 2025
547377b
SimpleMixed RoundRobinManager test only if ZSTD is linked
shubhajeet May 31, 2025
a0ce6b4
Code cleanup
shubhajeet May 31, 2025
24393e5
Hoping to fix Status not checked assertion error.
shubhajeet May 31, 2025
496d630
setting bottomost compresion as ZSTD
shubhajeet May 31, 2025
1569a0e
changed some variable name
shubhajeet May 31, 2025
59ad491
Added the compression_type mix
shubhajeet Jun 1, 2025
9492245
Code refactoring as per the reviewers comment
shubhajeet Jun 2, 2025
a0fc95d
Moving CompressBlock api
shubhajeet Jun 3, 2025
89df03c
Fixing the some lines mistakenly deleted
shubhajeet Jun 3, 2025
e67ffd1
Merge branch 'mixed_compressor' into mixed_bench
shubhajeet Jun 3, 2025
2c43d0f
Merge branch 'main' into mixed_compressor
shubhajeet Jun 3, 2025
4dbd839
Merge branch 'main' into mixed_bench
shubhajeet Jun 3, 2025
f4e3d62
Adding flag same value rate
shubhajeet Jun 3, 2025
021d1e8
Avoiding unused type
shubhajeet Jun 3, 2025
86f60b2
Merge branch 'mixed_compressor' into mixed_bench
shubhajeet Jun 3, 2025
d7bcf15
same value always returns initial part
shubhajeet Jun 3, 2025
b30e81f
Moved the block align disable to finalize options function
shubhajeet Jun 3, 2025
5626e96
Merge branch 'mixed_compressor' into mixed_bench
shubhajeet Jun 3, 2025
1e244cb
adding separate compression_manager flag
shubhajeet Jun 4, 2025
238d6b2
Code refactoring
shubhajeet Jun 4, 2025
030d65a
refactoring code
shubhajeet Jun 4, 2025
2ecadb2
reverted compressioin_type.h
shubhajeet Jun 4, 2025
6cec9da
Merge branch 'mixed_compressor' into mixed_bench
shubhajeet Jun 4, 2025
f5c26e2
Merge branch 'main' into mixed_bench
shubhajeet Jun 4, 2025
b4bec8c
Code refactoring as per reviewers comment
shubhajeet Jun 4, 2025
87edb74
Create a Random class during initialization with seed and resused it
shubhajeet Jun 5, 2025
16fa4ba
Base of the auto skip compressor
shubhajeet Jun 6, 2025
f1a508f
Added required files
shubhajeet Jun 6, 2025
9cdf446
Merge branch 'main' into auto_skip
shubhajeet Jun 6, 2025
643d9b8
Added the test file (runtime error)
shubhajeet Jun 6, 2025
1e58e2b
Auto Skip Compressor Manaager is working
shubhajeet Jun 6, 2025
dc30e2e
Fixing buck file
shubhajeet Jun 6, 2025
facdd2c
Created separate Model class
shubhajeet Jun 6, 2025
33ff3f2
CorrectPrediction pass
shubhajeet Jun 6, 2025
a603b43
Adding support in the crash test
shubhajeet Jun 7, 2025
432c7b2
Changing name
shubhajeet Jun 7, 2025
7ed22c4
Testing code indepently by Mocking Predictor
shubhajeet Jun 8, 2025
cf24fa1
properly passing unit test
shubhajeet Jun 8, 2025
192dc9d
Fixing unused variable
shubhajeet Jun 8, 2025
0984f13
Adding more useful comments
shubhajeet Jun 8, 2025
823a4d3
Refactoring
shubhajeet Jun 8, 2025
b6b0645
refactoring
shubhajeet Jun 8, 2025
b0fe869
dealing with unused type_
shubhajeet Jun 8, 2025
20683d9
Fixing syntax to create large string containing all A
shubhajeet Jun 8, 2025
194d9d2
removing type_ AutoSkipCompressorWrapper
shubhajeet Jun 8, 2025
0056f01
error in constuction of string containing 1000 A fixed.
shubhajeet Jun 8, 2025
67b5728
Fixing the method of declaring string
shubhajeet Jun 8, 2025
1e7606b
making sure test only runs when zstd is supported
shubhajeet Jun 8, 2025
777c2cb
Zstd support added
shubhajeet Jun 8, 2025
f2c549d
in order to pass static analyzer
shubhajeet Jun 9, 2025
6329f66
Using CompressorWrapper instead of MulitCompressorWrapper
shubhajeet Jun 10, 2025
1c2f6ee
Renaming constant with kXXX
shubhajeet Jun 10, 2025
a5b1302
adding helper function for friendly and unfriendly put for compression
shubhajeet Jun 10, 2025
811582e
Added options for auto_tune
shubhajeet Jun 11, 2025
4073cb6
setting zstd as the compresion manager for the auto tune as well.
shubhajeet Jun 11, 2025
fb8deb4
auto skip does not have requirement for compression type zstd
shubhajeet Jun 11, 2025
722d277
Followed how compression manager flag was added to add auto_tune
shubhajeet Jun 11, 2025
4cb1b7f
dont run the test when no compression algorithm is used
shubhajeet Jun 11, 2025
b03ddd6
checking GetSupportedCompression to have compresion type
shubhajeet Jun 11, 2025
2d961b4
TEST_SYNC_CALLBACK working
shubhajeet Jun 11, 2025
22bcc15
Moved test code to compression test
shubhajeet Jun 12, 2025
b882e26
changed kExplorationPercentage to exploration_percentage_
shubhajeet Jun 12, 2025
37d76c6
Adding a factory function for the AutoSkipCompressionManager
shubhajeet Jun 12, 2025
c443b72
<Replace this line with a title. Use 1 line only, 67 chars or less>
shubhajeet Jun 12, 2025
3f5dc15
Added CompressBlockAndRecord
shubhajeet Jun 12, 2025
c3ccf18
Unable to write test code. Flush is creating new predictor.
shubhajeet Jun 12, 2025
42dc724
hacky way compression test working
shubhajeet Jun 13, 2025
65d5926
refactored
shubhajeet Jun 13, 2025
e01a160
merging with main
shubhajeet Jun 13, 2025
5b31bbb
Fixing linter error
shubhajeet Jun 13, 2025
698b39a
updated buck file
shubhajeet Jun 13, 2025
e124882
Using our out CompressionContext AutoSkipCompressionContext
shubhajeet Jun 13, 2025
d345449
Completed moving Predictor to the working area
shubhajeet Jun 13, 2025
ae0d8fa
refactoring
shubhajeet Jun 13, 2025
a24c740
avoiding touching lot of files
shubhajeet Jun 13, 2025
40450a1
Refactoring
shubhajeet Jun 13, 2025
c21aa0e
Refactoring
shubhajeet Jun 13, 2025
c5a0c38
Adding ReleasingWorkingArea code
shubhajeet Jun 13, 2025
791b44d
storing copy of type instead of alias to type
shubhajeet Jun 13, 2025
97fe9ce
removing some comments
shubhajeet Jun 13, 2025
2b35a4c
Refactoring basesd on reviewers comment
shubhajeet Jun 14, 2025
0d496db
wrapping wa
shubhajeet Jun 17, 2025
606ba10
Fixing naming convention
shubhajeet Jun 17, 2025
36893ec
Merge branch 'main' into working_area
shubhajeet Jun 17, 2025
44a4b39
Merge branch 'main' into working_area
shubhajeet Jun 17, 2025
6373f59
db crashtest enables parallel compression thread for auto skip
shubhajeet Jun 17, 2025
7959d9e
Merge branch 'main' into working_area
shubhajeet Jun 17, 2025
dcf2312
Merge branch 'main' into working_area
shubhajeet Jun 17, 2025
6b8e9ae
Fixed grammar error in comment
shubhajeet Jun 17, 2025
b5158e9
Adding move constructor and move assignment operator
shubhajeet Jun 17, 2025
ecdf96f
Renamed AutoSkipCompressionContext -> AutoSkipWorkingArea
shubhajeet Jun 17, 2025
1c46eff
refactoring as per review
shubhajeet Jun 17, 2025
0b223e3
Dealing with null working area and working area owner is not this class.
shubhajeet Jun 17, 2025
2e8dcfb
Refactoring
shubhajeet Jun 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions tools/db_crashtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1011,9 +1011,7 @@ def finalize_and_sanitize(src_params):
dest_params["compression_type"] = "zstd"
dest_params["bottommost_compression_type"] = "zstd"
elif dest_params.get("compression_manager") == "autoskip":
# disabling compression parallel threads if auto skip manager is being used as the predictor is not thread safe
dest_params["compression_parallel_threads"] = 1
# esuring the compression is being used
# ensuring the compression is being used
if dest_params.get("compression_type") == "none":
dest_params["compression_type"] = random.choice(
["snappy", "zlib", "lz4", "lz4hc", "xpress", "zstd"]
Expand Down
25 changes: 21 additions & 4 deletions util/auto_skip_compressor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,27 +65,44 @@ Status AutoSkipCompressorWrapper::CompressBlock(
return CompressBlockAndRecord(uncompressed_data, compressed_output,
out_compression_type, wa);
} else {
auto prediction = predictor_->Predict();
auto predictor_ptr =
static_cast<AutoSkipCompressionContext*>(wa->get())->predictor;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you look at the API commend on CompressBlock,

  // The working area is optional and used to optimize repeated compression by
  // a single thread. ManagedWorkingArea is provided rather than just
  // WorkingArea so that it can be used only if the `owner` matches expectation.
  // This could be useful for a Compressor wrapping more than one alternative
  // underlying Compressor.

Specifically, the working area should only be used if the owner matches expectations. See BuiltinCompressorV2::CompressBlock checking owner() for example.

Also the working area is optional. It is typically provided by the RocksDB table builder but not guaranteed in general.

Copy link
Contributor Author

@shubhajeet shubhajeet Jun 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pdillinger If the working area is not provided or a working area with wrong owner is provided, does it mean they are not using AutoSkip? Since the predictor is in the working area, we don't have access to any prediction.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think it is OK to provide degraded functionality (always attempt compression) if the WorkingArea is not provided. I might reconsider the exact contract for this in the future.

Some detail: Right now the compressed secondary cache can't efficiently provide a working area because we cannot predict which threads might access the cache, and how many times they might access it. It is perhaps unlikely that AutoSkip would be used with the compressed secondary cache, but it's possible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The owner() check part is still outstanding also.

auto prediction = predictor_ptr->Predict();
if (prediction <= kProbabilityCutOff) {
// decide to compress
return CompressBlockAndRecord(uncompressed_data, compressed_output,
out_compression_type, wa);
} else {
// decide to bypass compression
// bypassed compression
Copy link
Contributor

@hx235 hx235 Jun 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like you accidentally reversed some good fix such as this comment. Make sure to check other places too

*out_compression_type = kNoCompression;
return Status::OK();
}
}
return Status::OK();
}

Compressor::ManagedWorkingArea AutoSkipCompressorWrapper::ObtainWorkingArea() {
auto wrap_wa = wrapped_->ObtainWorkingArea();
return ManagedWorkingArea(
static_cast<WorkingArea*>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static_cast should be unnecessary

new AutoSkipCompressionContext(std::move(wrap_wa))),
this);
}
void AutoSkipCompressorWrapper::ReleaseWorkingArea(WorkingArea* wa) {
delete static_cast<AutoSkipCompressionContext*>(wa);
}

Status AutoSkipCompressorWrapper::CompressBlockAndRecord(
Slice uncompressed_data, std::string* compressed_output,
CompressionType* out_compression_type, ManagedWorkingArea* wa) {
auto wrap_wa =
&(static_cast<AutoSkipCompressionContext*>(wa->get())->wrapped);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't this function take AutoSkipCompressionContext instead of WorkingArea to avoid so many casts?

Status status = wrapped_->CompressBlock(uncompressed_data, compressed_output,
out_compression_type, wa);
out_compression_type, wrap_wa);
// determine if it was rejected or compressed
predictor_->Record(uncompressed_data, compressed_output, kOpts);
auto predictor_ptr =
static_cast<AutoSkipCompressionContext*>(wa->get())->predictor;
predictor_ptr->Record(uncompressed_data, compressed_output, kOpts);
return status;
}

Expand Down
20 changes: 18 additions & 2 deletions util/auto_skip_compressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
#include "rocksdb/advanced_compression.h"

namespace ROCKSDB_NAMESPACE {
// Predict rejection probability using a moving window approach
// This class is not thread safe
// Predicts Rejection Probability using previous using past data of certain
// window size
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another accidental revert?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure this PR is rebased on the upstream main after it includes all your previous fixes to auto skip. This is to not accidentally revert your changes.

class CompressionRejectionProbabilityPredictor {
public:
CompressionRejectionProbabilityPredictor(int window_size)
Expand All @@ -33,6 +33,20 @@ class CompressionRejectionProbabilityPredictor {
size_t window_size_;
};

class AutoSkipCompressionContext : public Compressor::WorkingArea {
public:
explicit AutoSkipCompressionContext(Compressor::ManagedWorkingArea&& wa)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can call this AutoSkipWorkingArea

: wrapped(std::move(wa)),
predictor(
std::make_shared<CompressionRejectionProbabilityPredictor>(10)) {}
~AutoSkipCompressionContext() {}
AutoSkipCompressionContext(const AutoSkipCompressionContext&) = delete;
AutoSkipCompressionContext& operator=(const AutoSkipCompressionContext&) =
delete;
Compressor::ManagedWorkingArea wrapped;
std::shared_ptr<CompressionRejectionProbabilityPredictor> predictor;
};

class AutoSkipCompressorWrapper : public CompressorWrapper {
public:
const char* Name() const override;
Expand All @@ -42,6 +56,8 @@ class AutoSkipCompressorWrapper : public CompressorWrapper {
Status CompressBlock(Slice uncompressed_data, std::string* compressed_output,
CompressionType* out_compression_type,
ManagedWorkingArea* wa) override;
ManagedWorkingArea ObtainWorkingArea() override;
void ReleaseWorkingArea(WorkingArea* wa) override;

private:
Status CompressBlockAndRecord(Slice uncompressed_data,
Expand Down
Loading