Skip to content

Commit 6af3fdc

Browse files
adapter/mir: include metadata about the original query
Pass a metadata flag to the server to include info about the original query before it was rewritten. In this case, we inform the server if the original query had a `WHERE IN` clause so the server can optimize the MIR and remove unnecessary post-lookups. Change-Id: Ic6c149947e18e57a4423f7b42a402daa4d2106f4 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/9680 Tested-by: Buildkite CI Reviewed-by: Jason Brown <[email protected]>
1 parent 00e6e6f commit 6af3fdc

File tree

10 files changed

+417
-154
lines changed

10 files changed

+417
-154
lines changed

logictests/post_lookup.test

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,14 @@ select sum(val) v from bar where id <= 100 group by val order by v desc limit 1
124124
----
125125
30
126126

127+
# Aggregated query, range condition, lookup column with distinct values, DESC order
128+
query I
129+
select sum(val) v from bar where id <= 100 group by val order by v desc
130+
----
131+
40
132+
30
133+
20
134+
127135
# Aggregated query, range condition, lookup column with distinct values, ASC order, LIMIT + OFFSET
128136
query I
129137
select sum(val) v from bar where id <= 100 group by val order by v asc limit 1 offset 2

nom-sql/src/select.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ pub fn nested_selection(
325325
having,
326326
order,
327327
limit_clause,
328+
metadata: vec![],
328329
};
329330

330331
Ok((i, result))

readyset-mysql/tests/integration.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3044,10 +3044,10 @@ async fn create_duplicate_named_caches() {
30443044
conn.query_drop("CREATE CACHE name1 FROM SELECT b FROM foo WHERE a = ?")
30453045
.await
30463046
.unwrap();
3047-
conn.query_drop("CREATE CACHE name2 FROM SELECT b FROM foo WHERE a IN (?, ?, ?)")
3047+
conn.query_drop("CREATE CACHE name2 FROM SELECT b FROM foo WHERE a = ?")
30483048
.await
30493049
.unwrap();
3050-
conn.query_drop("CREATE CACHE name2 FROM SELECT b FROM foo WHERE a IN (?, ?, ?)")
3050+
conn.query_drop("CREATE CACHE name2 FROM SELECT b FROM foo WHERE a = ?")
30513051
.await
30523052
.unwrap();
30533053

@@ -3073,10 +3073,10 @@ async fn create_duplicate_query_id_and_name_caches() {
30733073
conn.query_drop("CREATE CACHE name2 FROM SELECT b FROM foo WHERE a = ?")
30743074
.await
30753075
.unwrap();
3076-
conn.query_drop("CREATE CACHE name1 FROM SELECT b FROM foo WHERE a IN (?, ?, ?)")
3076+
conn.query_drop("CREATE CACHE name1 FROM SELECT b FROM foo WHERE a = 1")
30773077
.await
30783078
.unwrap();
3079-
conn.query_drop("CREATE CACHE name1 FROM SELECT b FROM foo WHERE a IN (?, ?, ?)")
3079+
conn.query_drop("CREATE CACHE name1 FROM SELECT b FROM foo WHERE a = 3")
30803080
.await
30813081
.unwrap();
30823082

readyset-server/src/controller/sql/mir/mod.rs

Lines changed: 256 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2622,52 +2622,78 @@ impl SqlToMirConverter {
26222622
project_order,
26232623
);
26242624

2625-
let post_lookup_aggregates = if view_key.index_type == IndexType::HashMap {
2626-
// If we have aggregates under the IndexType::HashMap, they aren't necessarily
2627-
// post-lookup operations. For example, `select sum(col2) from t where col1 =
2628-
// ?`, the aggregate will be handled in the dataflow graph.
2629-
// But if the query originally contained a `where col1 in
2630-
// (?, ?)`, the aggregate does need to be executed as a
2631-
// post-lookup. Adding a post-lookup is necessary for `where in` for correctly
2632-
// aggregating results, but a mild perf impediment for aggregates with a simple
2633-
// equality (we'll run an aggregation on a single row). However, we've lost the
2634-
// "did this come from a `where in` information" way above, as it's rewritten in
2635-
// the adapter. Hence, to avoid that penalty on all users,
2636-
// only add the post-lookup to users who have opted in to
2637-
// using post-lookups.
2625+
let order_by = query_graph
2626+
.order
2627+
.as_ref()
2628+
.map(|order| order.iter().map(|(c, ot)| (Column::from(c), *ot)).collect());
2629+
2630+
let mut limit = query_graph.pagination.as_ref().map(|p| p.limit);
2631+
let offset = query_graph.pagination.as_ref().and_then(|p| p.offset);
2632+
let is_topk_query = order_by.is_some() && limit.is_some() && offset.is_none();
2633+
let is_range_query = view_key.index_type == IndexType::BTreeMap;
2634+
2635+
let are_post_lookups_required = query_graph.collapsed_where_in
2636+
|| (order_by.is_some() && !is_topk_query)
2637+
|| (is_topk_query && !self.config.allow_topk)
2638+
|| is_range_query;
2639+
2640+
let mut post_lookup_aggregates = if are_post_lookups_required {
2641+
// When a query contains WHERE col IN (?, ?, ...), it gets rewritten
2642+
// (or collapsed) to WHERE col = ? during SQL parsing, with the
2643+
// collapsed_where_in flag set to indicate this transformation.
2644+
//
2645+
// This creates a correctness issue for aggregates: the original multi-value IN clause
2646+
// should aggregate across all matching rows, but the rewritten single-value equality
2647+
// will only see one row at a time. To fix this, we need post-lookup aggregation that
2648+
// combines results from multiple point lookups.
2649+
//
2650+
// Example:
2651+
// Original: SELECT sum(amount) FROM orders WHERE id IN (1, 2, 3)
2652+
// Rewritten: SELECT sum(amount) FROM orders WHERE id = ? (executed 3 times)
2653+
// Solution: Sum the results from each execution via post-lookup aggregation
2654+
//
2655+
// Another scenario is when aggregated results are used in an order by clause
2656+
// without a topk node (either because the query didn't have a limit or the
2657+
// feature wasn't enabled). In this case, we also need post-lookup aggregation
2658+
// for correctness.
2659+
//
2660+
// And obviously if the query is a range query, we need post-lookup aggregation
2661+
// since we can't precompute aggregations over different ranges.
2662+
//
2663+
// Note: Post-lookup operations have performance overhead, so they're gated behind
2664+
// the allow_post_lookup config flag.
26382665
if self.config.allow_post_lookup {
2639-
match post_lookup_aggregates(query_graph, query_name) {
2640-
Ok(aggs) => aggs,
2641-
// This part is a hack. When we get an ReadySetError::Unsupported,
2642-
// that is because the aggregate was a AVG, COUNT(DISTINCT..), or
2643-
// SUM(DISTINCT..). We can only support those (currently!) when the
2644-
// query contained an equality clause, and
2645-
// not a `where in` clause (that was
2646-
// rewritten as an equality). As mentioned above, we don't know which
2647-
// one the original query had, thus this
2648-
// code opts to preserve the functionality
2649-
// of the simple equality. Once again, this only applies if the user
2650-
// opted in to using "experimental"
2651-
// post-lookups.
2652-
Err(ReadySetError::Unsupported(..)) => None,
2653-
Err(e) => return Err(e),
2654-
}
2666+
post_lookup_aggregates(query_graph, query_name)?
26552667
} else {
2656-
None
2668+
unsupported!(
2669+
"Queries which perform operations post-lookup are not supported"
2670+
);
26572671
}
26582672
} else {
2659-
post_lookup_aggregates(query_graph, query_name)?
2673+
None
26602674
};
26612675

2662-
let order_by = query_graph
2663-
.order
2664-
.as_ref()
2665-
.map(|order| order.iter().map(|(c, ot)| (Column::from(c), *ot)).collect());
2666-
2667-
let limit = query_graph.pagination.as_ref().map(|p| p.limit);
2676+
// If the query is a topk query and the user has opted in to the topk
2677+
// feature, drop the limit and aggregates from the post-lookup operations
2678+
// (the order by is kept, see the TODO below) UNLESS post-lookups are otherwise
2679+
// required (e.g. the original query had a WHERE IN), as they are then needed for correctness.
2680+
if self.config.allow_topk && is_topk_query && !are_post_lookups_required {
2681+
limit = None;
2682+
post_lookup_aggregates = None;
2683+
// TODO: even though we are doing topk, we still need the reader
2684+
// to order stuff. Because TopK communicates the diff,
2685+
// and the reader keeps the values in ASC order if ORDER BY
2686+
// is not specified. Please refer to [reader_map::Values] struct.
2687+
// order_by = None;
2688+
}
26682689

2690+
// order_by is required by almost all queries. Only complain if it does
2691+
// aggregations as well.
2692+
// If a query contains just a limit without an order by, the adapter will
2693+
// automatically remove the limit and keep it for itself, so the server won't have
2694+
// to worry about it.
26692695
if !self.config.allow_post_lookup
2670-
&& (post_lookup_aggregates.is_some() || order_by.is_some() || limit.is_some())
2696+
&& ((post_lookup_aggregates.is_some() && order_by.is_some()) || limit.is_some())
26712697
{
26722698
unsupported!("Queries which perform operations post-lookup are not supported");
26732699
}
@@ -2740,3 +2766,195 @@ impl SqlToMirConverter {
27402766
Ok(leaf)
27412767
}
27422768
}
2769+
2770+
#[cfg(test)]
2771+
mod tests {
2772+
use std::collections::HashMap;
2773+
2774+
use crate::{
2775+
controller::sql::mir::SqlToMirConverter,
2776+
sql::mir::{Config, LeafBehavior},
2777+
};
2778+
use mir::node::MirNodeInner;
2779+
use mir::NodeIndex;
2780+
use readyset_errors::ReadySetResult;
2781+
use readyset_sql::ast::{Column, ColumnSpecification, Relation, SelectMetadata, SqlType};
2782+
2783+
use crate::controller::sql::query_graph::to_query_graph;
2784+
use readyset_sql_parsing::parse_select;
2785+
2786+
fn sql_to_mir_test(
2787+
name: &str,
2788+
qg: crate::sql::query_graph::QueryGraph,
2789+
) -> ReadySetResult<(SqlToMirConverter, NodeIndex)> {
2790+
let mut converter = SqlToMirConverter::default();
2791+
converter.set_config(Config {
2792+
allow_topk: true,
2793+
allow_post_lookup: true,
2794+
..Default::default()
2795+
});
2796+
2797+
let _ = converter.make_base_node(
2798+
&Relation::from("topk_test"),
2799+
&[
2800+
ColumnSpecification {
2801+
column: Column::from("topk_test.a"),
2802+
sql_type: SqlType::Int(None),
2803+
generated: None,
2804+
constraints: vec![],
2805+
comment: None,
2806+
},
2807+
ColumnSpecification {
2808+
column: Column::from("topk_test.b"),
2809+
sql_type: SqlType::Int(None),
2810+
generated: None,
2811+
constraints: vec![],
2812+
comment: None,
2813+
},
2814+
ColumnSpecification {
2815+
column: Column::from("topk_test.c"),
2816+
sql_type: SqlType::Int(None),
2817+
generated: None,
2818+
constraints: vec![],
2819+
comment: None,
2820+
},
2821+
],
2822+
None,
2823+
)?;
2824+
2825+
let node = converter.named_query_to_mir(
2826+
&Relation::from(name),
2827+
&qg,
2828+
&HashMap::new(),
2829+
LeafBehavior::Leaf,
2830+
)?;
2831+
2832+
Ok((converter, node))
2833+
}
2834+
2835+
macro_rules! test_topk_scenario {
2836+
(
2837+
name: $test_name:ident,
2838+
query: $query_str:literal,
2839+
query_name: $query_name:literal,
2840+
collapsed_where_in: $collapsed:expr,
2841+
expect_leaf: {
2842+
aggregates: $expect_agg:expr,
2843+
order_by: $expect_order:expr,
2844+
limit: $expect_limit:expr
2845+
},
2846+
expect_topk_node: $expect_topk:expr
2847+
) => {
2848+
#[test]
2849+
fn $test_name() -> ReadySetResult<()> {
2850+
let mut query =
2851+
parse_select(readyset_sql::Dialect::PostgreSQL, $query_str).unwrap();
2852+
2853+
if $collapsed {
2854+
query.metadata.push(SelectMetadata::CollapsedWhereIn);
2855+
}
2856+
2857+
let qg = to_query_graph(query).unwrap();
2858+
let (mut converter, node) = sql_to_mir_test($query_name, qg)?;
2859+
let query = converter.make_mir_query($query_name.into(), node);
2860+
2861+
// Check leaf node properties
2862+
if let MirNodeInner::Leaf {
2863+
aggregates,
2864+
order_by,
2865+
limit,
2866+
..
2867+
} = &query.get_node(node).unwrap().inner
2868+
{
2869+
assert_eq!(aggregates.is_some(), $expect_agg, "aggregates mismatch");
2870+
assert_eq!(order_by.is_some(), $expect_order, "order_by mismatch");
2871+
assert_eq!(limit.is_some(), $expect_limit, "limit mismatch");
2872+
} else {
2873+
panic!("Expected leaf node");
2874+
}
2875+
2876+
// Check for TopK node existence
2877+
let mut has_topk = false;
2878+
for node in query.topo_nodes() {
2879+
if let MirNodeInner::TopK { .. } = &query.get_node(node).unwrap().inner {
2880+
has_topk = true;
2881+
break;
2882+
}
2883+
}
2884+
2885+
if $expect_topk {
2886+
assert!(has_topk, "topk node not found");
2887+
} else {
2888+
assert!(!has_topk, "unexpected topk node found");
2889+
}
2890+
2891+
Ok(())
2892+
}
2893+
};
2894+
}
2895+
2896+
test_topk_scenario! {
2897+
name: topk_node_exists,
2898+
query: "SELECT a FROM topk_test ORDER BY b LIMIT 3",
2899+
query_name: "q1",
2900+
collapsed_where_in: false,
2901+
expect_leaf: {
2902+
aggregates: false,
2903+
order_by: true,
2904+
limit: false
2905+
},
2906+
expect_topk_node: true
2907+
}
2908+
2909+
test_topk_scenario! {
2910+
name: topk_node_exists_with_where_in,
2911+
query: "SELECT a FROM topk_test WHERE b = 1 ORDER BY c LIMIT 3",
2912+
query_name: "q1",
2913+
collapsed_where_in: true,
2914+
expect_leaf: {
2915+
aggregates: false,
2916+
order_by: true,
2917+
limit: true
2918+
},
2919+
expect_topk_node: true
2920+
}
2921+
2922+
test_topk_scenario! {
2923+
name: aggregate_with_where_in,
2924+
query: "SELECT sum(topk_test.a) FROM topk_test WHERE b = 5 GROUP BY c ORDER BY b",
2925+
query_name: "q2",
2926+
collapsed_where_in: true,
2927+
expect_leaf: {
2928+
aggregates: true,
2929+
order_by: true,
2930+
limit: false
2931+
},
2932+
expect_topk_node: false
2933+
}
2934+
2935+
test_topk_scenario! {
2936+
name: topk_without_where_in,
2937+
query: "SELECT avg(topk_test.a) FROM topk_test WHERE topk_test.b = 5 GROUP BY topk_test.c ORDER BY topk_test.b LIMIT 10",
2938+
query_name: "q2",
2939+
collapsed_where_in: false,
2940+
expect_leaf: {
2941+
aggregates: false,
2942+
order_by: true,
2943+
limit: false
2944+
},
2945+
expect_topk_node: true
2946+
}
2947+
2948+
test_topk_scenario! {
2949+
name: topk_with_where_in,
2950+
query: "SELECT a FROM topk_test WHERE b = 5 ORDER BY a LIMIT 10",
2951+
query_name: "q1",
2952+
collapsed_where_in: true,
2953+
expect_leaf: {
2954+
aggregates: false,
2955+
order_by: true,
2956+
limit: true
2957+
},
2958+
expect_topk_node: true
2959+
}
2960+
}

readyset-server/src/controller/sql/query_graph.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use readyset_sql::analysis::{is_aggregate, ReferredColumns};
1717
use readyset_sql::ast::{
1818
self, BinaryOperator, Column, Expr, FieldDefinitionExpr, FieldReference, FunctionExpr, InValue,
1919
ItemPlaceholder, JoinConstraint, JoinOperator, JoinRightSide, LimitClause, Literal, OrderBy,
20-
OrderType, Relation, SelectStatement, SqlIdentifier, TableExpr, TableExprInner,
20+
OrderType, Relation, SelectMetadata, SelectStatement, SqlIdentifier, TableExpr, TableExprInner,
2121
};
2222
use readyset_sql::DialectDisplay;
2323
use readyset_sql_passes::{is_correlated, is_predicate, map_aggregates, LogicalOp};
@@ -294,6 +294,9 @@ pub struct QueryGraph {
294294
pub pagination: Option<Pagination>,
295295
/// True if the query is correlated (is a subquery that refers to columns in an outer query)
296296
pub is_correlated: bool,
297+
/// True if the adapter rewrote a WHERE IN clause to an equality. In this case
298+
/// post-lookup operations are required for correctness
299+
pub collapsed_where_in: bool,
297300
}
298301

299302
impl QueryGraph {
@@ -487,6 +490,7 @@ impl Hash for QueryGraph {
487490
self.order.hash(state);
488491
self.pagination.hash(state);
489492
self.is_correlated.hash(state);
493+
self.collapsed_where_in.hash(state);
490494
}
491495
}
492496

@@ -1523,6 +1527,7 @@ pub fn to_query_graph(stmt: SelectStatement) -> ReadySetResult<QueryGraph> {
15231527
pagination,
15241528
order,
15251529
is_correlated,
1530+
collapsed_where_in: stmt.metadata.contains(&SelectMetadata::CollapsedWhereIn),
15261531
})
15271532
}
15281533

0 commit comments

Comments
 (0)