diff --git a/dz_mongodb/__init__.py b/dz_mongodb/__init__.py
index 6e5aff6..b79bd4c 100644
--- a/dz_mongodb/__init__.py
+++ b/dz_mongodb/__init__.py
@@ -209,8 +209,8 @@ def sync_log_based_streams(client: MongoClient,
             timer.tags['database'] = database_name
             update_buffer_size = update_buffer_size or change_streams.MIN_UPDATE_BUFFER_LENGTH
             await_time_ms = await_time_ms or change_streams.DEFAULT_AWAIT_TIME_MS
-
-            change_streams.sync_database(client[database_name], streams, state, update_buffer_size, await_time_ms, full_load_on_empty_state,start_date,document_remove)
+
+            change_streams.sync_database(client, database_name, streams, state, update_buffer_size, await_time_ms, full_load_on_empty_state, start_date, document_remove)
 
     state = singer.set_currently_syncing(state, None)
     singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
diff --git a/dz_mongodb/sync_strategies/change_streams.py b/dz_mongodb/sync_strategies/change_streams.py
index 6ca1497..2bb81a7 100644
--- a/dz_mongodb/sync_strategies/change_streams.py
+++ b/dz_mongodb/sync_strategies/change_streams.py
@@ -1,14 +1,16 @@
 import copy
 from datetime import datetime
 import time
-from bson import ObjectId
+from bson import ObjectId, Timestamp
+import pymongo.errors
 import singer
 import pymongo
-
+from pymongo.read_concern import ReadConcern
 from typing import Set, Dict, Optional, Generator
 from pymongo.collection import Collection
 from pymongo.database import Database
 from singer import utils
+from pymongo import MongoClient
 
 from dz_mongodb.sync_strategies import common
 
@@ -72,8 +74,54 @@ def get_token_from_state(streams_to_sync: Set[str], state: Dict) -> Optional[Dic
 
     return token_sorted[0] if token_sorted else None
 
-
-def sync_database(database: Database,
+def check_resume_token_existence(client: MongoClient, resume_token_ts: datetime) -> bool:
+    """
+    Check whether the provided timestamp is still covered by the oplog.
+    """
+    oplog_rs = client["local"]["oplog.rs"]
+    first_oplog = oplog_rs.find_one(sort=[("$natural", pymongo.ASCENDING)])
+    first_oplog_ts = first_oplog.get("ts") if first_oplog else None
+    if not first_oplog_ts:
+        raise Exception("unable to read the first oplog entry for resume token verification")
+    if resume_token_ts < first_oplog_ts.as_datetime():
+        return False
+    return True
+
+def get_current_resume_token(client: MongoClient) -> Timestamp:
+    """
+    Return the newest oplog timestamp, or the start timestamp of the oldest active transaction.
+    """
+    coll = client["config"].get_collection(
+        "transactions",
+        read_concern=ReadConcern("local")
+    )
+
+    query = {"state": {"$in": ["prepared", "inProgress"]}}
+    sort = [("startOpTime", 1)]  # oldest in-progress or prepared transaction first
+
+    try:
+        result = coll.find_one(query, sort=sort)
+        if not result:
+            # no active transactions: fall back to the newest oplog entry
+            oplog_rs = client["local"]["oplog.rs"]
+            oplog_obj = oplog_rs.find_one(sort=[("$natural", pymongo.DESCENDING)])
+            return oplog_obj.get("ts")
+
+        raw_ts = result.get("startOpTime", {}).get("ts")
+        if not raw_ts:
+            raise Exception("config.transactions row had no startOpTime.ts field")
+
+        if isinstance(raw_ts, Timestamp):
+            return raw_ts
+        else:
+            raise Exception("config.transactions startOpTime.ts was not a BSON timestamp")
+
+    except pymongo.errors.PyMongoError as e:
+        raise Exception(f"config.transactions.findOne error: {e}")
+
+
+def sync_database(client: MongoClient,
+                  db_name: str,
                   streams_to_sync: Dict[str, Dict],
                   state: Dict,
                   update_buffer_size: int,
@@ -91,13 +139,12 @@ def sync_database(database: Database,
         update_buffer_size: the size of buffer used to hold detected updates
         await_time_ms: the maximum time in milliseconds for the log based to wait for changes before exiting
     """
+    database = client[db_name]
     LOGGER.info('Starting LogBased sync for streams "%s" in database "%s"', list(streams_to_sync.keys()), database.name)
-
     rows_saved = {}
     start_time = time.time()
     update_buffer = {}
     full_load = list()
-
     for stream_id in streams_to_sync:
         update_buffer[stream_id] = set()
         rows_saved[stream_id] = 0
@@ -108,9 +155,15 @@
 
     stream_ids = set(streams_to_sync.keys())
 
-
+    start_at_op_time = None
+    start_after = get_token_from_state(stream_ids, state)
     # perform full load if no previous token exists
     if full_load_on_empty_state:
+        # preserve a resume token from the oplog before the full load starts
+        first_resume_token = None
+        # no start_after token in state means this is the first sync, run with a full load
+        if not start_after:
+            first_resume_token = get_current_resume_token(client)
         for tap_stream_id in full_load:
             table_name = streams_to_sync[tap_stream_id].get('table_name')
             collection = database[table_name]
@@ -127,7 +180,15 @@
                     row=row,
                     time_extracted=utils.now(),
                     time_deleted=None, document_remove=document_remove))
-
+
+        if first_resume_token:
+            if not check_resume_token_existence(client, first_resume_token.as_datetime()):
+                raise Exception("Oplog overflow: resume token not found in the oplog")
+            start_at_op_time = first_resume_token
+            LOGGER.info('Resume token after full load: [%s]', first_resume_token)
+
+    if not start_after and not start_at_op_time:
+        LOGGER.info("Running change stream watch from current timestamp")
     # Init a cursor to listen for changes from the last saved resume token
     # if there are no changes after MAX_AWAIT_TIME_MS, then we'll exit
     with database.watch(
@@ -141,7 +202,8 @@
             ]
         }}],
        max_await_time_ms=await_time_ms,
-        start_after=get_token_from_state(stream_ids, state)
+        start_after=start_after,
+        start_at_operation_time=start_at_op_time,
    ) as cursor:
 
        while cursor.alive:
@@ -161,7 +223,6 @@
                resume_token = {
                    '_data': cursor.resume_token['_data']
                }
-
                # After MAX_AWAIT_TIME_MS has elapsed, the cursor will return None.
                # write state and exit
                if change is None:
@@ -171,11 +232,14 @@
                    singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
                    break
-
-                database_name = change["ns"]["db"]
-                # remove the prefix from the database name
-                database_name = database_name.split('_', 1)[-1]
-                tap_stream_id = f'{database_name}-{change["ns"]["coll"]}'
+                # the event's database name sometimes arrives with a prefix
+                # (e.g. 62a821b1fe15fd039ed2e450_test-users) and sometimes
+                # without one (test-users), so match it against the known
+                # database name instead of stripping a prefix
+                if db_name not in change["ns"]["db"]:
+                    continue
+
+                tap_stream_id = f'{db_name}-{change["ns"]["coll"]}'
 
                operation = change['operationType']
diff --git a/setup.py b/setup.py
index 45470c8..13e2997 100644
--- a/setup.py
+++ b/setup.py
@@ -5,7 +5,7 @@
 long_desc = fh.read()
 
 setup(name='dz-mongodb',
-      version='1.4.4',
+      version='1.4.5',
      description='Singer.io tap for extracting data from MongoDB - Datazip compatible',
      long_description=long_desc,
      long_description_content_type='text/markdown',
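Reviewer note (not part of the patch): the intent of the two new helpers is easiest to see outside the tap. The sketch below is illustrative only; the connection string and the step comments are assumptions, while get_current_resume_token and check_resume_token_existence are the helpers added in change_streams.py above.

# Illustrative sketch (not part of the patch). Assumes a replica-set
# connection on localhost.
from pymongo import MongoClient

from dz_mongodb.sync_strategies.change_streams import (
    check_resume_token_existence,
    get_current_resume_token,
)

client = MongoClient("mongodb://localhost:27017/?replicaSet=rs0")

# 1. Before the full load starts, remember where the oplog currently ends
#    (or where the oldest in-flight transaction started).
token_ts = get_current_resume_token(client)

# ... full table scans of all selected collections happen here ...

# 2. After the load, confirm the remembered position is still inside the
#    oplog window; if the oplog rolled over during the load, resuming from
#    it would silently skip change events.
if not check_resume_token_existence(client, token_ts.as_datetime()):
    raise Exception("Oplog overflow: resume token not found in the oplog")

# 3. Only now is it safe to open the change stream from that position.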
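A second note on the watch() call: PyMongo lets a change stream resume either from a stored resume token (start_after) or from a cluster timestamp (start_at_operation_time), and the patched call always passes exactly one of them as non-None. A minimal sketch of that loop shape, using a simplified pipeline and a hypothetical "test" database name (the real pipeline in the patch filters on the synced collections):

# Minimal sketch of the resume semantics used by the patched watch() call.
# token_ts (a bson.Timestamp from the previous sketch) is illustrative;
# exactly one of start_after / start_at_operation_time should be non-None.
with client["test"].watch(
    [{'$match': {'operationType': {'$in': ['insert', 'update', 'replace', 'delete']}}}],
    max_await_time_ms=1000,
    start_after=None,                   # a stored resume token on later runs
    start_at_operation_time=token_ts,   # the preserved position after a full load
) as cursor:
    while cursor.alive:
        change = cursor.try_next()      # returns None once max_await_time_ms elapses
        if change is None:
            break                       # persist cursor.resume_token, then exit
        print(change["ns"], change["operationType"])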