@@ -14,7 +14,7 @@ use crate::{
14
14
error:: Error ,
15
15
table:: {
16
16
delete_all_table_files,
17
- transaction:: { operation:: Operation as TableOperation , APPEND_KEY , REPLACE_KEY } ,
17
+ transaction:: { operation:: Operation as TableOperation , APPEND_INDEX , REPLACE_INDEX } ,
18
18
} ,
19
19
view:: transaction:: operation:: Operation as ViewOperation ,
20
20
} ;
@@ -25,7 +25,7 @@ use super::MaterializedView;
25
25
pub struct Transaction < ' view > {
26
26
materialized_view : & ' view mut MaterializedView ,
27
27
view_operations : Vec < ViewOperation > ,
28
- storage_table_operations : HashMap < String , TableOperation > ,
28
+ storage_table_operations : Vec < Option < TableOperation > > ,
29
29
branch : Option < String > ,
30
30
}
31
31
@@ -35,7 +35,7 @@ impl<'view> Transaction<'view> {
35
35
Transaction {
36
36
materialized_view : view,
37
37
view_operations : vec ! [ ] ,
38
- storage_table_operations : HashMap :: new ( ) ,
38
+ storage_table_operations : ( 0 .. 6 ) . map ( |_| None ) . collect ( ) , // 6 operation types
39
39
branch : branch. map ( ToString :: to_string) ,
40
40
}
41
41
}
@@ -69,30 +69,29 @@ impl<'view> Transaction<'view> {
69
69
refresh_state : RefreshState ,
70
70
) -> Result < Self , Error > {
71
71
let refresh_state = serde_json:: to_string ( & refresh_state) ?;
72
- self . storage_table_operations
73
- . entry ( REPLACE_KEY . to_owned ( ) )
74
- . and_modify ( |mut x| {
75
- if let TableOperation :: Replace {
76
- branch : _,
77
- files : old,
78
- additional_summary : old_lineage,
79
- } = & mut x
80
- {
81
- old. extend_from_slice ( & files) ;
82
- * old_lineage = Some ( HashMap :: from_iter ( vec ! [ (
83
- REFRESH_STATE . to_owned( ) ,
84
- refresh_state. clone( ) ,
85
- ) ] ) ) ;
86
- }
87
- } )
88
- . or_insert ( TableOperation :: Replace {
72
+ if let Some ( ref mut operation) = self . storage_table_operations [ REPLACE_INDEX ] {
73
+ if let TableOperation :: Replace {
74
+ branch : _,
75
+ files : old,
76
+ additional_summary : old_lineage,
77
+ } = operation
78
+ {
79
+ old. extend_from_slice ( & files) ;
80
+ * old_lineage = Some ( HashMap :: from_iter ( vec ! [ (
81
+ REFRESH_STATE . to_owned( ) ,
82
+ refresh_state. clone( ) ,
83
+ ) ] ) ) ;
84
+ }
85
+ } else {
86
+ self . storage_table_operations [ REPLACE_INDEX ] = Some ( TableOperation :: Replace {
89
87
branch : self . branch . clone ( ) ,
90
88
files,
91
89
additional_summary : Some ( HashMap :: from_iter ( vec ! [ (
92
90
REFRESH_STATE . to_owned( ) ,
93
91
refresh_state,
94
92
) ] ) ) ,
95
93
} ) ;
94
+ }
96
95
Ok ( self )
97
96
}
98
97
@@ -103,24 +102,22 @@ impl<'view> Transaction<'view> {
103
102
refresh_state : RefreshState ,
104
103
) -> Result < Self , Error > {
105
104
let refresh_state = serde_json:: to_string ( & refresh_state) ?;
106
- self . storage_table_operations
107
- . entry ( APPEND_KEY . to_owned ( ) )
108
- . and_modify ( |mut x| {
109
- if let TableOperation :: Append {
110
- branch : _,
111
- data_files : old,
112
- delete_files : _,
113
- additional_summary : old_lineage,
114
- } = & mut x
115
- {
116
- old. extend_from_slice ( & files) ;
117
- * old_lineage = Some ( HashMap :: from_iter ( vec ! [ (
118
- REFRESH_STATE . to_owned( ) ,
119
- refresh_state. clone( ) ,
120
- ) ] ) ) ;
121
- }
122
- } )
123
- . or_insert ( TableOperation :: Append {
105
+ if let Some ( ref mut operation) = self . storage_table_operations [ APPEND_INDEX ] {
106
+ if let TableOperation :: Append {
107
+ branch : _,
108
+ data_files : old,
109
+ delete_files : _,
110
+ additional_summary : old_lineage,
111
+ } = operation
112
+ {
113
+ old. extend_from_slice ( & files) ;
114
+ * old_lineage = Some ( HashMap :: from_iter ( vec ! [ (
115
+ REFRESH_STATE . to_owned( ) ,
116
+ refresh_state. clone( ) ,
117
+ ) ] ) ) ;
118
+ }
119
+ } else {
120
+ self . storage_table_operations [ APPEND_INDEX ] = Some ( TableOperation :: Append {
124
121
branch : self . branch . clone ( ) ,
125
122
data_files : files,
126
123
delete_files : Vec :: new ( ) ,
@@ -129,6 +126,7 @@ impl<'view> Transaction<'view> {
129
126
refresh_state,
130
127
) ] ) ) ,
131
128
} ) ;
129
+ }
132
130
Ok ( self )
133
131
}
134
132
@@ -139,24 +137,22 @@ impl<'view> Transaction<'view> {
139
137
refresh_state : RefreshState ,
140
138
) -> Result < Self , Error > {
141
139
let refresh_state = serde_json:: to_string ( & refresh_state) ?;
142
- self . storage_table_operations
143
- . entry ( APPEND_KEY . to_owned ( ) )
144
- . and_modify ( |mut x| {
145
- if let TableOperation :: Append {
146
- branch : _,
147
- data_files : _,
148
- delete_files : old,
149
- additional_summary : old_lineage,
150
- } = & mut x
151
- {
152
- old. extend_from_slice ( & files) ;
153
- * old_lineage = Some ( HashMap :: from_iter ( vec ! [ (
154
- REFRESH_STATE . to_owned( ) ,
155
- refresh_state. clone( ) ,
156
- ) ] ) ) ;
157
- }
158
- } )
159
- . or_insert ( TableOperation :: Append {
140
+ if let Some ( ref mut operation) = self . storage_table_operations [ APPEND_INDEX ] {
141
+ if let TableOperation :: Append {
142
+ branch : _,
143
+ data_files : _,
144
+ delete_files : old,
145
+ additional_summary : old_lineage,
146
+ } = operation
147
+ {
148
+ old. extend_from_slice ( & files) ;
149
+ * old_lineage = Some ( HashMap :: from_iter ( vec ! [ (
150
+ REFRESH_STATE . to_owned( ) ,
151
+ refresh_state. clone( ) ,
152
+ ) ] ) ) ;
153
+ }
154
+ } else {
155
+ self . storage_table_operations [ APPEND_INDEX ] = Some ( TableOperation :: Append {
160
156
branch : self . branch . clone ( ) ,
161
157
data_files : Vec :: new ( ) ,
162
158
delete_files : files,
@@ -165,6 +161,7 @@ impl<'view> Transaction<'view> {
165
161
refresh_state,
166
162
) ] ) ) ,
167
163
} ) ;
164
+ }
168
165
Ok ( self )
169
166
}
170
167
@@ -182,7 +179,8 @@ impl<'view> Transaction<'view> {
182
179
// Save old metadata to be able to remove old data after a rewrite operation
183
180
let delete_data = if self
184
181
. storage_table_operations
185
- . values ( )
182
+ . iter ( )
183
+ . flatten ( )
186
184
. any ( |x| matches ! ( x, TableOperation :: Replace { .. } ) )
187
185
{
188
186
Some ( storage_table. metadata ( ) . clone ( ) )
@@ -191,7 +189,7 @@ impl<'view> Transaction<'view> {
191
189
} ;
192
190
193
191
// Execute table operations
194
- for operation in self . storage_table_operations . into_values ( ) {
192
+ for operation in self . storage_table_operations . into_iter ( ) . flatten ( ) {
195
193
let ( requirement, update) = operation
196
194
. execute ( storage_table. metadata ( ) , storage_table. object_store ( ) )
197
195
. await ?;
0 commit comments