From e977305cafc00584acbff4253fe148c6ef7d4003 Mon Sep 17 00:00:00 2001 From: hashcode-ankit Date: Tue, 24 Sep 2024 16:06:50 +0530 Subject: [PATCH 1/6] full load oplogs reading fixed --- dz_mongodb/__init__.py | 4 +- dz_mongodb/sync_strategies/change_streams.py | 82 +++++++++++++++++--- 2 files changed, 74 insertions(+), 12 deletions(-) diff --git a/dz_mongodb/__init__.py b/dz_mongodb/__init__.py index 6e5aff6..b79bd4c 100644 --- a/dz_mongodb/__init__.py +++ b/dz_mongodb/__init__.py @@ -209,8 +209,8 @@ def sync_log_based_streams(client: MongoClient, timer.tags['database'] = database_name update_buffer_size = update_buffer_size or change_streams.MIN_UPDATE_BUFFER_LENGTH await_time_ms = await_time_ms or change_streams.DEFAULT_AWAIT_TIME_MS - - change_streams.sync_database(client[database_name], streams, state, update_buffer_size, await_time_ms, full_load_on_empty_state,start_date,document_remove) + + change_streams.sync_database(client, database_name, streams, state, update_buffer_size, await_time_ms, full_load_on_empty_state,start_date,document_remove) state = singer.set_currently_syncing(state, None) singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) diff --git a/dz_mongodb/sync_strategies/change_streams.py b/dz_mongodb/sync_strategies/change_streams.py index 6ca1497..b1ff9ed 100644 --- a/dz_mongodb/sync_strategies/change_streams.py +++ b/dz_mongodb/sync_strategies/change_streams.py @@ -1,14 +1,16 @@ import copy from datetime import datetime import time -from bson import ObjectId +from bson import ObjectId, Timestamp +import pymongo.mongo_client import singer import pymongo - +from pymongo.read_concern import ReadConcern from typing import Set, Dict, Optional, Generator from pymongo.collection import Collection from pymongo.database import Database from singer import utils +from pymongo import MongoClient from dz_mongodb.sync_strategies import common @@ -72,8 +74,55 @@ def get_token_from_state(streams_to_sync: Set[str], state: Dict) -> Optional[Dic return token_sorted[0] if token_sorted else None - -def sync_database(database: Database, +def check_resume_token_existance(client: MongoClient, resume_token_ts: datetime)->bool : + """ + function check if provided timestamp is present in oplog or not + """ + # no active transactions get current resume_token_ts + oplogRS = client["local"]["oplog.rs"] + oplog_obj = oplogRS.find_one(sort = [("$natural", pymongo.ASCENDING)]) + first_oplog_ts = oplog_obj.get("ts") + if not first_oplog_ts: + raise Exception("unable to read first oplog for resume token verification") + if resume_token_ts < first_oplog_ts.as_datetime(): + return False + return True + +def get_current_resume_token(client: MongoClient, database: Database) -> Timestamp: + """ + returns current timestamp resume token or oldest resume token of active transactions + """ + coll = client["config"].get_collection( + "transactions", + read_concern=ReadConcern("local") + ) + + filter = {"state": {"$in": ["prepared", "inProgress"]}} + opts = {"sort": [("startOpTime", 1)]} # first transaction in progress or prepared + + try: + result = coll.find_one(filter, **opts) + if not result: + # no active transactions get current timestamp + oplogRS = client["local"]["oplog.rs"] + oplog_obj = oplogRS.find_one(sort = [("$natural", pymongo.DESCENDING)]) + return oplog_obj.get("ts") + + raw_ts = result.get("startOpTime", {}).get("ts") + if not raw_ts: + raise Exception("config.transactions row had no startOpTime.ts field") + + if isinstance(raw_ts, Timestamp): + return raw_ts + else: + raise Exception("config.transactions startOpTime.ts was not a BSON timestamp") + + except pymongo.errors.PyMongoError as e: + raise Exception(f"config.transactions.findOne error: {e}") + + +def sync_database(client: MongoClient, + db_name: str, streams_to_sync: Dict[str, Dict], state: Dict, update_buffer_size: int, @@ -91,13 +140,12 @@ def sync_database(database: Database, update_buffer_size: the size of buffer used to hold detected updates await_time_ms: the maximum time in milliseconds for the log based to wait for changes before exiting """ + database = client[db_name] LOGGER.info('Starting LogBased sync for streams "%s" in database "%s"', list(streams_to_sync.keys()), database.name) - rows_saved = {} start_time = time.time() update_buffer = {} full_load = list() - for stream_id in streams_to_sync: update_buffer[stream_id] = set() rows_saved[stream_id] = 0 @@ -108,9 +156,14 @@ def sync_database(database: Database, stream_ids = set(streams_to_sync.keys()) - + start_at_op_time = None + start_after = get_token_from_state(stream_ids, state) # perform full load if no previous token exists if full_load_on_empty_state: + # preserve resume token from oplog + first_resume_token = get_current_resume_token(client, database) + LOGGER.info("add element here: ") + time.sleep(20) for tap_stream_id in full_load: table_name = streams_to_sync[tap_stream_id].get('table_name') collection = database[table_name] @@ -127,7 +180,16 @@ def sync_database(database: Database, row=row, time_extracted=utils.now(), time_deleted=None, document_remove=document_remove)) - + + # if start_after not present that means it is fist sync and with full load + if not start_after: + if not check_resume_token_existance(client,first_resume_token.as_datetime()): + raise Exception("Oplog Overflow: Resume token not found from oplogs") + start_at_op_time = first_resume_token + LOGGER.info('Resume token after full load: [%s]',first_resume_token) + + if not start_after or not start_at_op_time: + LOGGER.info("Running change stream watch from current timestamp") # Init a cursor to listen for changes from the last saved resume token # if there are no changes after MAX_AWAIT_TIME_MS, then we'll exit with database.watch( @@ -141,7 +203,8 @@ def sync_database(database: Database, ] }}], max_await_time_ms=await_time_ms, - start_after=get_token_from_state(stream_ids, state) + start_after=start_after, + start_at_operation_time= start_at_op_time, ) as cursor: while cursor.alive: @@ -161,7 +224,6 @@ def sync_database(database: Database, resume_token = { '_data': cursor.resume_token['_data'] } - # After MAX_AWAIT_TIME_MS has elapsed, the cursor will return None. # write state and exit if change is None: From 5c4502742bc981996035e5d97dd8570c39d2618d Mon Sep 17 00:00:00 2001 From: hashcode-ankit Date: Tue, 24 Sep 2024 16:10:47 +0530 Subject: [PATCH 2/6] upgrade pip package --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 45470c8..13e2997 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ long_desc = fh.read() setup(name='dz-mongodb', - version='1.4.4', + version='1.4.5', description='Singer.io tap for extracting data from MongoDB - Datazip compatible', long_description=long_desc, long_description_content_type='text/markdown', From 17bbb06240b815da9413591f3c9d4036f00d87f8 Mon Sep 17 00:00:00 2001 From: hashcode-ankit Date: Tue, 24 Sep 2024 16:12:14 +0530 Subject: [PATCH 3/6] remove testing change --- dz_mongodb/sync_strategies/change_streams.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/dz_mongodb/sync_strategies/change_streams.py b/dz_mongodb/sync_strategies/change_streams.py index b1ff9ed..87ac6ba 100644 --- a/dz_mongodb/sync_strategies/change_streams.py +++ b/dz_mongodb/sync_strategies/change_streams.py @@ -78,7 +78,6 @@ def check_resume_token_existance(client: MongoClient, resume_token_ts: datetime) """ function check if provided timestamp is present in oplog or not """ - # no active transactions get current resume_token_ts oplogRS = client["local"]["oplog.rs"] oplog_obj = oplogRS.find_one(sort = [("$natural", pymongo.ASCENDING)]) first_oplog_ts = oplog_obj.get("ts") @@ -162,8 +161,6 @@ def sync_database(client: MongoClient, if full_load_on_empty_state: # preserve resume token from oplog first_resume_token = get_current_resume_token(client, database) - LOGGER.info("add element here: ") - time.sleep(20) for tap_stream_id in full_load: table_name = streams_to_sync[tap_stream_id].get('table_name') collection = database[table_name] From 57ce7398c317de88d9b78dfe85b1f091f4f6ee11 Mon Sep 17 00:00:00 2001 From: hashcode-ankit Date: Wed, 25 Sep 2024 11:59:31 +0530 Subject: [PATCH 4/6] improvement --- dz_mongodb/sync_strategies/change_streams.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dz_mongodb/sync_strategies/change_streams.py b/dz_mongodb/sync_strategies/change_streams.py index 87ac6ba..c2b21eb 100644 --- a/dz_mongodb/sync_strategies/change_streams.py +++ b/dz_mongodb/sync_strategies/change_streams.py @@ -160,7 +160,9 @@ def sync_database(client: MongoClient, # perform full load if no previous token exists if full_load_on_empty_state: # preserve resume token from oplog - first_resume_token = get_current_resume_token(client, database) + first_resume_token = None + if not start_after: + first_resume_token = get_current_resume_token(client, database) for tap_stream_id in full_load: table_name = streams_to_sync[tap_stream_id].get('table_name') collection = database[table_name] From 95785c187a3fc2f0d4c948d614b4d90403016de5 Mon Sep 17 00:00:00 2001 From: hashcode-ankit Date: Wed, 25 Sep 2024 12:01:32 +0530 Subject: [PATCH 5/6] improvement --- dz_mongodb/sync_strategies/change_streams.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dz_mongodb/sync_strategies/change_streams.py b/dz_mongodb/sync_strategies/change_streams.py index c2b21eb..a4f7c39 100644 --- a/dz_mongodb/sync_strategies/change_streams.py +++ b/dz_mongodb/sync_strategies/change_streams.py @@ -161,6 +161,7 @@ def sync_database(client: MongoClient, if full_load_on_empty_state: # preserve resume token from oplog first_resume_token = None + # if start_after not present that means it is fist sync and with full load if not start_after: first_resume_token = get_current_resume_token(client, database) for tap_stream_id in full_load: @@ -180,8 +181,7 @@ def sync_database(client: MongoClient, time_extracted=utils.now(), time_deleted=None, document_remove=document_remove)) - # if start_after not present that means it is fist sync and with full load - if not start_after: + if first_resume_token: if not check_resume_token_existance(client,first_resume_token.as_datetime()): raise Exception("Oplog Overflow: Resume token not found from oplogs") start_at_op_time = first_resume_token From 4a5b5486a7de3f87f925111c0c0eb89ca79117fc Mon Sep 17 00:00:00 2001 From: hashcode-ankit Date: Wed, 25 Sep 2024 13:33:33 +0530 Subject: [PATCH 6/6] chore: updating database name extraction as per issue of Dezy --- dz_mongodb/sync_strategies/change_streams.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/dz_mongodb/sync_strategies/change_streams.py b/dz_mongodb/sync_strategies/change_streams.py index a4f7c39..2bb81a7 100644 --- a/dz_mongodb/sync_strategies/change_streams.py +++ b/dz_mongodb/sync_strategies/change_streams.py @@ -187,7 +187,7 @@ def sync_database(client: MongoClient, start_at_op_time = first_resume_token LOGGER.info('Resume token after full load: [%s]',first_resume_token) - if not start_after or not start_at_op_time: + if not start_after and not start_at_op_time: LOGGER.info("Running change stream watch from current timestamp") # Init a cursor to listen for changes from the last saved resume token # if there are no changes after MAX_AWAIT_TIME_MS, then we'll exit @@ -232,11 +232,14 @@ def sync_database(client: MongoClient, singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) break - - database_name = change["ns"]["db"] - # remove the prefix from the database name - database_name = database_name.split('_', 1)[-1] - tap_stream_id = f'{database_name}-{change["ns"]["coll"]}' + # can't use db name as sometime comes : 62a821b1fe15fd039ed2e450_test-users + # or sometime test-users + # database_name = change["ns"]["db"] + # database_name = database_name.split('_', 1)[-1] + if db_name not in change["ns"]["db"]: + continue + + tap_stream_id = f'{db_name}-{change["ns"]["coll"]}' operation = change['operationType']