1
1
import copy
2
2
from datetime import datetime
3
3
import time
4
- from bson import ObjectId
4
+ from bson import ObjectId , Timestamp
5
+ import pymongo .mongo_client
5
6
import singer
6
7
import pymongo
7
-
8
+ from pymongo . read_concern import ReadConcern
8
9
from typing import Set , Dict , Optional , Generator
9
10
from pymongo .collection import Collection
10
11
from pymongo .database import Database
11
12
from singer import utils
13
+ from pymongo import MongoClient
12
14
13
15
from dz_mongodb .sync_strategies import common
14
16
@@ -72,8 +74,54 @@ def get_token_from_state(streams_to_sync: Set[str], state: Dict) -> Optional[Dic
72
74
73
75
return token_sorted [0 ] if token_sorted else None
74
76
75
-
76
- def sync_database (database : Database ,
77
def check_resume_token_existance(client: MongoClient, resume_token_ts: datetime) -> bool:
    """Check whether a resume token's timestamp is still covered by the oplog.

    Reads the oldest entry of the replica-set oplog (``local.oplog.rs``) and
    compares its timestamp against ``resume_token_ts``.  If the token's
    timestamp predates the oldest oplog entry, the oplog has rolled over and
    a change stream can no longer be resumed from that token.

    NOTE: the name keeps the historical "existance" spelling for backward
    compatibility with existing callers.

    :param client: connected MongoClient with access to the ``local`` database
    :param resume_token_ts: timestamp extracted from the resume token
        (expected tz-aware UTC, as produced by ``bson.Timestamp.as_datetime()``)
    :return: True if the token is still within the oplog window, else False
    :raises Exception: if the first oplog entry or its ``ts`` field cannot be read
    """
    oplog_rs = client["local"]["oplog.rs"]
    # Sorting by $natural ascending yields the oldest oplog entry.
    first_entry = oplog_rs.find_one(sort=[("$natural", pymongo.ASCENDING)])
    # find_one returns None on an empty/unreadable collection; guard before
    # .get() so we raise our own error instead of an AttributeError.
    first_oplog_ts = first_entry.get("ts") if first_entry else None
    if not first_oplog_ts:
        raise Exception("unable to read first oplog for resume token verification")
    # Resumable only if the token is not older than the oldest oplog entry.
    return resume_token_ts >= first_oplog_ts.as_datetime()
89
+
90
def get_current_resume_token(client: MongoClient, database: Database) -> Timestamp:
    """Return a safe operation time from which to start a change stream.

    If ``config.transactions`` records any active (``prepared`` /
    ``inProgress``) transactions, returns the ``startOpTime.ts`` of the oldest
    one so no in-flight transaction is skipped.  Otherwise returns the
    timestamp of the newest oplog entry (the node's current time).

    :param client: connected MongoClient
    :param database: unused; kept for backward compatibility with callers
    :return: a BSON ``Timestamp`` usable as ``start_at_operation_time``
    :raises Exception: on any pymongo error, or when the transactions row or
        the oplog is missing the expected timestamp field
    """
    transactions = client["config"].get_collection(
        "transactions",
        # "local" read concern: we want this node's current view, not majority.
        read_concern=ReadConcern("local"),
    )

    # Renamed from "filter" to avoid shadowing the builtin.
    query = {"state": {"$in": ["prepared", "inProgress"]}}

    try:
        # Oldest active transaction first.
        result = transactions.find_one(query, sort=[("startOpTime", 1)])
        if not result:
            # No active transactions: use the newest oplog entry's timestamp.
            newest_entry = client["local"]["oplog.rs"].find_one(
                sort=[("$natural", pymongo.DESCENDING)]
            )
            # Guard against an empty/unreadable oplog instead of letting a
            # bare AttributeError escape from .get() on None.
            if not newest_entry:
                raise Exception("unable to read latest oplog entry for resume token")
            return newest_entry.get("ts")

        raw_ts = result.get("startOpTime", {}).get("ts")
        if not raw_ts:
            raise Exception("config.transactions row had no startOpTime.ts field")

        if isinstance(raw_ts, Timestamp):
            return raw_ts
        raise Exception("config.transactions startOpTime.ts was not a BSON timestamp")

    except pymongo.errors.PyMongoError as e:
        # Chain the original driver error so the root cause is preserved.
        raise Exception(f"config.transactions.findOne error: {e}") from e
121
+
122
+
123
+ def sync_database (client : MongoClient ,
124
+ db_name : str ,
77
125
streams_to_sync : Dict [str , Dict ],
78
126
state : Dict ,
79
127
update_buffer_size : int ,
@@ -91,13 +139,12 @@ def sync_database(database: Database,
91
139
update_buffer_size: the size of buffer used to hold detected updates
92
140
await_time_ms: the maximum time in milliseconds for the log based to wait for changes before exiting
93
141
"""
142
+ database = client [db_name ]
94
143
LOGGER .info ('Starting LogBased sync for streams "%s" in database "%s"' , list (streams_to_sync .keys ()), database .name )
95
-
96
144
rows_saved = {}
97
145
start_time = time .time ()
98
146
update_buffer = {}
99
147
full_load = list ()
100
-
101
148
for stream_id in streams_to_sync :
102
149
update_buffer [stream_id ] = set ()
103
150
rows_saved [stream_id ] = 0
@@ -108,9 +155,15 @@ def sync_database(database: Database,
108
155
109
156
110
157
stream_ids = set (streams_to_sync .keys ())
111
-
158
+ start_at_op_time = None
159
+ start_after = get_token_from_state (stream_ids , state )
112
160
# perform full load if no previous token exists
113
161
if full_load_on_empty_state :
162
+ # preserve resume token from oplog
163
+ first_resume_token = None
164
+ # if start_after is not present, this is the first sync and it includes a full load
165
+ if not start_after :
166
+ first_resume_token = get_current_resume_token (client , database )
114
167
for tap_stream_id in full_load :
115
168
table_name = streams_to_sync [tap_stream_id ].get ('table_name' )
116
169
collection = database [table_name ]
@@ -127,7 +180,15 @@ def sync_database(database: Database,
127
180
row = row ,
128
181
time_extracted = utils .now (),
129
182
time_deleted = None , document_remove = document_remove ))
130
-
183
+
184
+ if first_resume_token :
185
+ if not check_resume_token_existance (client ,first_resume_token .as_datetime ()):
186
+ raise Exception ("Oplog Overflow: Resume token not found from oplogs" )
187
+ start_at_op_time = first_resume_token
188
+ LOGGER .info ('Resume token after full load: [%s]' ,first_resume_token )
189
+
190
+ if not start_after and not start_at_op_time :
191
+ LOGGER .info ("Running change stream watch from current timestamp" )
131
192
# Init a cursor to listen for changes from the last saved resume token
132
193
# if there are no changes after MAX_AWAIT_TIME_MS, then we'll exit
133
194
with database .watch (
@@ -141,7 +202,8 @@ def sync_database(database: Database,
141
202
]
142
203
}}],
143
204
max_await_time_ms = await_time_ms ,
144
- start_after = get_token_from_state (stream_ids , state )
205
+ start_after = start_after ,
206
+ start_at_operation_time = start_at_op_time ,
145
207
) as cursor :
146
208
while cursor .alive :
147
209
@@ -161,7 +223,6 @@ def sync_database(database: Database,
161
223
resume_token = {
162
224
'_data' : cursor .resume_token ['_data' ]
163
225
}
164
-
165
226
# After MAX_AWAIT_TIME_MS has elapsed, the cursor will return None.
166
227
# write state and exit
167
228
if change is None :
@@ -171,11 +232,14 @@ def sync_database(database: Database,
171
232
singer .write_message (singer .StateMessage (value = copy .deepcopy (state )))
172
233
173
234
break
174
-
175
- database_name = change ["ns" ]["db" ]
176
- # remove the prefix from the database name
177
- database_name = database_name .split ('_' , 1 )[- 1 ]
178
- tap_stream_id = f'{ database_name } -{ change ["ns" ]["coll" ]} '
235
+ # can't use the event's db name directly: it sometimes arrives prefixed, e.g. 62a821b1fe15fd039ed2e450_test-users
236
+ # or sometime test-users
237
+ # database_name = change["ns"]["db"]
238
+ # database_name = database_name.split('_', 1)[-1]
239
+ if db_name not in change ["ns" ]["db" ]:
240
+ continue
241
+
242
+ tap_stream_id = f'{ db_name } -{ change ["ns" ]["coll" ]} '
179
243
180
244
operation = change ['operationType' ]
181
245
0 commit comments