Skip to content

Commit cbe722f

Browse files
committed
chore: concurrent search for collection
1 parent bde1b03 commit cbe722f

File tree

4 files changed

+52
-15
lines changed

4 files changed

+52
-15
lines changed

Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,7 @@ test:
3333
pytest tests -v
3434

3535
test_cov:
36-
pytest --cov=dz_mongodb tests -v --cov-fail-under=42
36+
pytest --cov=dz_mongodb tests -v --cov-fail-under=42
37+
38+
local_build:
39+
python setup.py sdist

dz_mongodb/__init__.py

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import sys
55
from typing import List, Dict, Optional
66
from urllib import parse
7-
7+
import concurrent.futures
88
import singer
99
from pymongo import MongoClient
1010
from singer import metadata, metrics, utils
@@ -13,6 +13,8 @@
1313
from dz_mongodb.sync_strategies import common
1414
from dz_mongodb.sync_strategies import full_table
1515
from dz_mongodb.sync_strategies import incremental
16+
from pymongo.database import Database
17+
1618
from dz_mongodb.config_utils import validate_config
1719
from dz_mongodb.db_utils import get_databases, produce_collection_schema
1820
from dz_mongodb.errors import InvalidReplicationMethodException, NoReadPrivilegeException
@@ -36,6 +38,19 @@
3638
FULL_TABLE_METHOD = 'FULL_TABLE'
3739

3840

41+
def process_collection(database: Database, collection_name: str):
    """
    Build the discovery schema for a single collection, ignoring views.

    Args:
        database: database object the collection is looked up in
        collection_name: name of the collection to inspect

    Returns:
        None when the collection options carry a 'viewOn' entry (the
        collection is a view), otherwise whatever
        produce_collection_schema returns for the collection.
    """
    target = database[collection_name]

    # A 'viewOn' entry in the collection options marks a view; views are not discovered.
    if target.options().get('viewOn') is not None:
        LOGGER.info("Skipping view '%s' in database '%s'", collection_name, database.name)
        return None

    LOGGER.info("Getting collection info for db '%s', collection '%s'", database.name, collection_name)
    return produce_collection_schema(target)
52+
53+
3954
def do_discover(client: MongoClient, config: Dict):
4055
"""
4156
Run discovery mode where the mongodb cluster is scanned and
@@ -55,17 +70,34 @@ def do_discover(client: MongoClient, config: Dict):
5570

5671
collection_names = database.list_collection_names()
5772

58-
for collection_name in [c for c in collection_names if not c.startswith("system.")]:
59-
60-
collection = database[collection_name]
61-
is_view = collection.options().get('viewOn') is not None
62-
63-
# Add support for views if needed here
64-
if is_view:
65-
continue
66-
67-
LOGGER.info("Getting collection info for db '%s', collection '%s'", database.name, collection_name)
68-
streams.append(produce_collection_schema(collection))
73+
with concurrent.futures.ThreadPoolExecutor() as executor:
74+
# List of futures for each collection that is not a system collection
75+
futures = {
76+
executor.submit(process_collection, database, collection_name): collection_name
77+
for collection_name in collection_names
78+
if not collection_name.startswith("system.")
79+
}
80+
81+
for future in concurrent.futures.as_completed(futures):
82+
collection_name = futures[future]
83+
try:
84+
result = future.result()
85+
if result:
86+
streams.append(result) # Store produced schema
87+
except Exception as exc:
88+
LOGGER.error("Error processing collection '%s': %s", collection_name, exc)
89+
90+
# for collection_name in [c for c in collection_names if not c.startswith("system.")]:
91+
92+
# collection = database[collection_name]
93+
# is_view = collection.options().get('viewOn') is not None
94+
95+
# # Add support for views if needed here
96+
# if is_view:
97+
# continue
98+
99+
# LOGGER.info("Getting collection info for db '%s', collection '%s'", database.name, collection_name)
100+
# streams.append(produce_collection_schema(collection))
69101

70102
json.dump({'streams': streams}, sys.stdout, indent=2)
71103

dz_mongodb/sync_strategies/change_streams.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ def check_resume_token_existance(client: MongoClient, resume_token_ts: datetime)
8080
"""
8181
oplogRS = client["local"]["oplog.rs"]
8282
oplog_obj = oplogRS.find_one(sort = [("$natural", pymongo.ASCENDING)])
83-
first_oplog_ts = oplog_obj.get("ts")
83+
first_oplog_ts = oplog_obj.get("ts") if oplog_obj else None
8484
if not first_oplog_ts:
8585
raise Exception("unable to read first oplog for resume token verification")
8686
if resume_token_ts < first_oplog_ts.as_datetime():
@@ -105,6 +105,8 @@ def get_current_resume_token(client: MongoClient, database: Database) -> Timesta
105105
# no active transactions get current timestamp
106106
oplogRS = client["local"]["oplog.rs"]
107107
oplog_obj = oplogRS.find_one(sort = [("$natural", pymongo.DESCENDING)])
108+
if not oplog_obj:
109+
return None
108110
return oplog_obj.get("ts")
109111

110112
raw_ts = result.get("startOpTime", {}).get("ts")

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
long_desc = fh.read()
66

77
setup(name='dz-mongodb',
8-
version='1.4.5',
8+
version='1.4.6',
99
description='Singer.io tap for extracting data from MongoDB - Datazip compatible',
1010
long_description=long_desc,
1111
long_description_content_type='text/markdown',

0 commit comments

Comments
 (0)