Skip to content

Commit eaeeabe

Browse files
Merge pull request #32 from datazip-inc/feat/start_date_in_log_based
feat: Start Date in log based sync
2 parents fb4c257 + 93b1004 commit eaeeabe

File tree

4 files changed

+20
-5
lines changed

4 files changed

+20
-5
lines changed

dz_mongodb/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ def sync_log_based_streams(client: MongoClient,
174174
update_buffer_size: Optional[int],
175175
await_time_ms: Optional[int],
176176
full_load_on_empty_state: bool,
177+
start_date: Optional[str],
177178
document_remove:bool = False,
178179
):
179180
"""
@@ -209,7 +210,7 @@ def sync_log_based_streams(client: MongoClient,
209210
update_buffer_size = update_buffer_size or change_streams.MIN_UPDATE_BUFFER_LENGTH
210211
await_time_ms = await_time_ms or change_streams.DEFAULT_AWAIT_TIME_MS
211212

212-
change_streams.sync_database(client[database_name], streams, state, update_buffer_size, await_time_ms, full_load_on_empty_state,document_remove)
213+
change_streams.sync_database(client[database_name], streams, state, update_buffer_size, await_time_ms, full_load_on_empty_state,start_date,document_remove)
213214

214215
state = singer.set_currently_syncing(state, None)
215216
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
@@ -243,6 +244,7 @@ def do_sync(client: MongoClient, catalog: Dict, config: Dict, state: Dict):
243244
config.get('update_buffer_size'),
244245
config.get('await_time_ms'),
245246
config.get('full_load_on_empty_state'),
247+
config.get('start_date'),
246248
config.get('remove_document_prefix')
247249
)
248250
LOGGER.debug('Sync of log based streams done')

dz_mongodb/config_utils.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import datetime
12
from typing import Dict
23

34
from dz_mongodb.errors import InvalidAwaitTimeError, InvalidLogBasedFullLoadOnEmptyState, InvalidUpdateBufferSizeError
@@ -43,4 +44,9 @@ def validate_config(config: Dict) -> None:
4344
full_load_on_empty_state = config['full_load_on_empty_state']
4445

4546
if full_load_on_empty_state not in [True, False]:
46-
raise InvalidLogBasedFullLoadOnEmptyState(full_load_on_empty_state, 'expected string boolean.')
47+
raise InvalidLogBasedFullLoadOnEmptyState(full_load_on_empty_state, 'expected string boolean.')
48+
49+
if 'start_date' in config:
50+
start_datetime = datetime.strptime(config['start_date'], "%Y-%m-%d")
51+
if not isinstance(start_datetime, datetime):
52+
raise ValueError('start_date must be a datetime object.')

dz_mongodb/sync_strategies/change_streams.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import copy
2+
from datetime import datetime
23
import time
4+
from bson import ObjectId
35
import singer
46
import pymongo
57

@@ -77,6 +79,7 @@ def sync_database(database: Database,
7779
update_buffer_size: int,
7880
await_time_ms: int,
7981
full_load_on_empty_state: bool,
82+
start_date: Optional[str],
8083
document_remove: bool = False
8184
) -> None:
8285
"""
@@ -111,9 +114,13 @@ def sync_database(database: Database,
111114
for tap_stream_id in full_load:
112115
table_name = streams_to_sync[tap_stream_id].get('table_name')
113116
collection = database[table_name]
114-
117+
filter = {}
118+
if start_date:
119+
start_datetime = datetime.strptime(start_date, "%Y-%m-%d")
120+
filter = { "_id": { "$gte": ObjectId.from_datetime(start_datetime) }}
121+
LOGGER.info('using filter for date[%s] to fetch data: %s',start_datetime, filter)
115122
# TODO: add batches
116-
with collection.find(sort=[("_id", pymongo.ASCENDING)]) as cursor:
123+
with collection.find(filter,sort=[("_id", pymongo.ASCENDING)]) as cursor:
117124
for row in cursor:
118125
rows_saved[tap_stream_id] += 1
119126
singer.write_message(common.row_to_singer_record(stream=streams_to_sync[tap_stream_id],

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
long_desc = fh.read()
66

77
setup(name='dz-mongodb',
8-
version='1.4.2',
8+
version='1.4.3',
99
description='Singer.io tap for extracting data from MongoDB - Datazip compatible',
1010
long_description=long_desc,
1111
long_description_content_type='text/markdown',

0 commit comments

Comments
 (0)