From b63a762ba165be6fc86fd93e92f6110e6d6301a1 Mon Sep 17 00:00:00 2001 From: nisa Date: Wed, 9 Oct 2024 13:08:04 +1100 Subject: [PATCH 1/9] fix --- python/delta_sharing/rest_client.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/python/delta_sharing/rest_client.py b/python/delta_sharing/rest_client.py index e1103239a..25c80b265 100644 --- a/python/delta_sharing/rest_client.py +++ b/python/delta_sharing/rest_client.py @@ -390,11 +390,30 @@ def list_files_in_table( protocol_json = json.loads(next(lines)) metadata_json = json.loads(next(lines)) + # Read the remaining content into a single string + remaining_content = ''.join(lines) + + decoder = json.JSONDecoder() + idx = 0 + n = len(remaining_content) + add_files = [] + while idx < n: + try: + obj, end = decoder.raw_decode(remaining_content, idx) + add_files.append(AddFile.from_json(obj["file"])) + idx = end + # Skip any whitespace between JSON objects + while idx < n and remaining_content[idx].isspace(): + idx += 1 + except json.JSONDecodeError as e: + print(f"JSON Decode Error at position {idx}: {e}") + break + return ListFilesInTableResponse( delta_table_version=int(headers.get("delta-table-version")), protocol=Protocol.from_json(protocol_json["protocol"]), metadata=Metadata.from_json(metadata_json["metaData"]), - add_files=[AddFile.from_json(json.loads(file)["file"]) for file in lines], + add_files=add_files, lines=[] ) From a3695f59e11c7e61ee0b97c997312d9d900d22fb Mon Sep 17 00:00:00 2001 From: nisa Date: Wed, 9 Oct 2024 13:21:17 +1100 Subject: [PATCH 2/9] tests --- .../delta_sharing/tests/test_rest_client.py | 135 ++++++++++++++++++ 1 file changed, 135 insertions(+) diff --git a/python/delta_sharing/tests/test_rest_client.py b/python/delta_sharing/tests/test_rest_client.py index 8d9ee61a4..4cd8b1324 100644 --- a/python/delta_sharing/tests/test_rest_client.py +++ b/python/delta_sharing/tests/test_rest_client.py @@ -633,6 +633,141 @@ def test_list_files_in_table_timestamp( assert isinstance(e, HTTPError) assert "is after the latest available version" in str(e) +pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) +def test_list_files_in_table_with_embedded_newlines(rest_client: DataSharingRestClient): + # Testing the updated function with JSON data containing embedded newline characters + response_content = ( + '{"protocol": {"minReaderVersion": 1}}\n' + '{"metaData": {"id": "12345", "format": {"provider": "parquet", "options": {}}, "schemaString": "{}", "partitionColumns": []}}\n' + '{"file": {"url": "file1.parquet", "id": "abc123", "partitionValues": {}, "size": 1000, "stats": "{\"numRecords\": 100, \"value\": \"This is a multiline\nstring with embedded newlines\"}"}}\n' + '{"file": {"url": "file2.parquet", "id": "def456", "partitionValues": {}, "size": 1500, "stats": "{\"numRecords\": 150, \"value\": \"Another\nmultiline\nstring\"}"}}' + ) + lines = iter(response_content.split('\n')) + + rest_client._post_internal = lambda *args, **kwargs: ({ + DataSharingRestClient.DELTA_TABLE_VERSION_HEADER: '2' + }, lines) + + response = rest_client.list_files_in_table( + Table(name="table_with_newlines", share="share_with_newlines", schema="default") + ) + + assert response.delta_table_version == 2 + assert response.protocol == Protocol(min_reader_version=1) + assert response.metadata == Metadata( + id="12345", + format={"provider": "parquet", "options": {}}, + schema_string="{}", + partition_columns=[] + ) + assert response.add_files == [ + AddFile( + url="file1.parquet", + id="abc123", + partition_values={}, + size=1000, + stats=( + '{"numRecords": 100, "value": "This is a multiline\nstring with embedded newlines"}' + ), + ), + AddFile( + url="file2.parquet", + id="def456", + partition_values={}, + size=1500, + stats=( + '{"numRecords": 150, "value": "Another\nmultiline\nstring"}' + ), + ) + ] + + +@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) +def test_list_files_in_table_partial_json_objects(rest_client: DataSharingRestClient): + # Testing the handling of partial JSON objects in the response + response_content = ( + '{"protocol": {"minReaderVersion": 1}}\n' + '{"metaData": {"id": "67890", "format": {"provider": "parquet", "options": {}}, "schemaString": "{}", "partitionColumns": []}}\n' + '{"file": {"url": "file3.parquet", "id": "ghi789", "partitionValues": {}, "size": 1200, "stats": "{\"numRecords\": 120, \"value\": \"Partial\nJSON\nobject\"}"}}\n' + '{"file": {"url": "file4.parquet", "id": "jkl012", "partitionValues": {}, "size": 1700, "stats": "{\"numRecords\": 170}' # Incomplete JSON object + ) + lines = iter(response_content.split('\n')) + + rest_client._post_internal = lambda *args, **kwargs: ({ + DataSharingRestClient.DELTA_TABLE_VERSION_HEADER: '3' + }, lines) + + response = rest_client.list_files_in_table( + Table(name="table_with_partial_json", share="share_with_partial_json", schema="default") + ) + + assert response.delta_table_version == 3 + assert response.protocol == Protocol(min_reader_version=1) + assert response.metadata == Metadata( + id="67890", + format={"provider": "parquet", "options": {}}, + schema_string="{}", + partition_columns=[] + ) + assert len(response.add_files) == 1 # Only the valid JSON object should be parsed + assert response.add_files[0] == AddFile( + url="file3.parquet", + id="ghi789", + partition_values={}, + size=1200, + stats=( + '{"numRecords": 120, "value": "Partial\nJSON\nobject"}' + ), + ) + + +@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) +def test_list_files_in_table_json_decode_error(rest_client: DataSharingRestClient): + # Testing handling of JSON decode errors with problematic lines + response_content = ( + '{"protocol": {"minReaderVersion": 1}}\n' + '{"metaData": {"id": "98765", "format": {"provider": "parquet", "options": {}}, "schemaString": "{}", "partitionColumns": []}}\n' + '{"file": {"url": "file5.parquet", "id": "mno345", "partitionValues": {}, "size": 2000, "stats": "{\"numRecords\": 1074361, \"value\": \"Develop2.1 Pipeline, Veracode \"}"}}\n' + '{"file": {"url": "file6.parquet", "id": "pqr678", "partitionValues": {}, "size": 2500, "stats": "{\"numRecords\": 200, \"value\": \"Another value\"}"}}' + ) + lines = iter(response_content.split('\n')) + + rest_client._post_internal = lambda *args, **kwargs: ({ + DataSharingRestClient.DELTA_TABLE_VERSION_HEADER: '4' + }, lines) + + response = rest_client.list_files_in_table( + Table(name="table_with_json_decode_error", share="share_with_json_decode_error", schema="default") + ) + + assert response.delta_table_version == 4 + assert response.protocol == Protocol(min_reader_version=1) + assert response.metadata == Metadata( + id="98765", + format={"provider": "parquet", "options": {}}, + schema_string="{}", + partition_columns=[] + ) + assert response.add_files == [ + AddFile( + url="file5.parquet", + id="mno345", + partition_values={}, + size=2000, + stats=( + '{"numRecords": 1074361, "value": "Develop2.1 Pipeline, Veracode "}' + ), + ), + AddFile( + url="file6.parquet", + id="pqr678", + partition_values={}, + size=2500, + stats=( + '{"numRecords": 200, "value": "Another value"}' + ), + ) + ] @pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) def test_list_table_changes( From 489626cc3d2f6d56b4dcef00fdb624d225f15de4 Mon Sep 17 00:00:00 2001 From: nisa Date: Wed, 9 Oct 2024 14:14:46 +1100 Subject: [PATCH 3/9] fix --- python/delta_sharing/rest_client.py | 36 ++--- .../delta_sharing/tests/test_rest_client.py | 135 ------------------ 2 files changed, 18 insertions(+), 153 deletions(-) diff --git a/python/delta_sharing/rest_client.py b/python/delta_sharing/rest_client.py index 25c80b265..933cf0dfa 100644 --- a/python/delta_sharing/rest_client.py +++ b/python/delta_sharing/rest_client.py @@ -390,24 +390,24 @@ def list_files_in_table( protocol_json = json.loads(next(lines)) metadata_json = json.loads(next(lines)) - # Read the remaining content into a single string - remaining_content = ''.join(lines) - - decoder = json.JSONDecoder() - idx = 0 - n = len(remaining_content) - add_files = [] - while idx < n: - try: - obj, end = decoder.raw_decode(remaining_content, idx) - add_files.append(AddFile.from_json(obj["file"])) - idx = end - # Skip any whitespace between JSON objects - while idx < n and remaining_content[idx].isspace(): - idx += 1 - except json.JSONDecodeError as e: - print(f"JSON Decode Error at position {idx}: {e}") - break + def parse_json_stream(lines): + buffer = '' + decoder = json.JSONDecoder() + for line in lines: + buffer += line + while True: + buffer = buffer.lstrip() + if not buffer: + break + try: + obj, idx = decoder.raw_decode(buffer) + yield obj + buffer = buffer[idx:] + except json.JSONDecodeError: + # Not enough data to decode; read more lines + break + + add_files = [AddFile.from_json(json_obj["file"])for json_obj in parse_json_stream(lines)] return ListFilesInTableResponse( delta_table_version=int(headers.get("delta-table-version")), diff --git a/python/delta_sharing/tests/test_rest_client.py b/python/delta_sharing/tests/test_rest_client.py index 4cd8b1324..8d9ee61a4 100644 --- a/python/delta_sharing/tests/test_rest_client.py +++ b/python/delta_sharing/tests/test_rest_client.py @@ -633,141 +633,6 @@ def test_list_files_in_table_timestamp( assert isinstance(e, HTTPError) assert "is after the latest available version" in str(e) -pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) -def test_list_files_in_table_with_embedded_newlines(rest_client: DataSharingRestClient): - # Testing the updated function with JSON data containing embedded newline characters - response_content = ( - '{"protocol": {"minReaderVersion": 1}}\n' - '{"metaData": {"id": "12345", "format": {"provider": "parquet", "options": {}}, "schemaString": "{}", "partitionColumns": []}}\n' - '{"file": {"url": "file1.parquet", "id": "abc123", "partitionValues": {}, "size": 1000, "stats": "{\"numRecords\": 100, \"value\": \"This is a multiline\nstring with embedded newlines\"}"}}\n' - '{"file": {"url": "file2.parquet", "id": "def456", "partitionValues": {}, "size": 1500, "stats": "{\"numRecords\": 150, \"value\": \"Another\nmultiline\nstring\"}"}}' - ) - lines = iter(response_content.split('\n')) - - rest_client._post_internal = lambda *args, **kwargs: ({ - DataSharingRestClient.DELTA_TABLE_VERSION_HEADER: '2' - }, lines) - - response = rest_client.list_files_in_table( - Table(name="table_with_newlines", share="share_with_newlines", schema="default") - ) - - assert response.delta_table_version == 2 - assert response.protocol == Protocol(min_reader_version=1) - assert response.metadata == Metadata( - id="12345", - format={"provider": "parquet", "options": {}}, - schema_string="{}", - partition_columns=[] - ) - assert response.add_files == [ - AddFile( - url="file1.parquet", - id="abc123", - partition_values={}, - size=1000, - stats=( - '{"numRecords": 100, "value": "This is a multiline\nstring with embedded newlines"}' - ), - ), - AddFile( - url="file2.parquet", - id="def456", - partition_values={}, - size=1500, - stats=( - '{"numRecords": 150, "value": "Another\nmultiline\nstring"}' - ), - ) - ] - - -@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) -def test_list_files_in_table_partial_json_objects(rest_client: DataSharingRestClient): - # Testing the handling of partial JSON objects in the response - response_content = ( - '{"protocol": {"minReaderVersion": 1}}\n' - '{"metaData": {"id": "67890", "format": {"provider": "parquet", "options": {}}, "schemaString": "{}", "partitionColumns": []}}\n' - '{"file": {"url": "file3.parquet", "id": "ghi789", "partitionValues": {}, "size": 1200, "stats": "{\"numRecords\": 120, \"value\": \"Partial\nJSON\nobject\"}"}}\n' - '{"file": {"url": "file4.parquet", "id": "jkl012", "partitionValues": {}, "size": 1700, "stats": "{\"numRecords\": 170}' # Incomplete JSON object - ) - lines = iter(response_content.split('\n')) - - rest_client._post_internal = lambda *args, **kwargs: ({ - DataSharingRestClient.DELTA_TABLE_VERSION_HEADER: '3' - }, lines) - - response = rest_client.list_files_in_table( - Table(name="table_with_partial_json", share="share_with_partial_json", schema="default") - ) - - assert response.delta_table_version == 3 - assert response.protocol == Protocol(min_reader_version=1) - assert response.metadata == Metadata( - id="67890", - format={"provider": "parquet", "options": {}}, - schema_string="{}", - partition_columns=[] - ) - assert len(response.add_files) == 1 # Only the valid JSON object should be parsed - assert response.add_files[0] == AddFile( - url="file3.parquet", - id="ghi789", - partition_values={}, - size=1200, - stats=( - '{"numRecords": 120, "value": "Partial\nJSON\nobject"}' - ), - ) - - -@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) -def test_list_files_in_table_json_decode_error(rest_client: DataSharingRestClient): - # Testing handling of JSON decode errors with problematic lines - response_content = ( - '{"protocol": {"minReaderVersion": 1}}\n' - '{"metaData": {"id": "98765", "format": {"provider": "parquet", "options": {}}, "schemaString": "{}", "partitionColumns": []}}\n' - '{"file": {"url": "file5.parquet", "id": "mno345", "partitionValues": {}, "size": 2000, "stats": "{\"numRecords\": 1074361, \"value\": \"Develop2.1 Pipeline, Veracode \"}"}}\n' - '{"file": {"url": "file6.parquet", "id": "pqr678", "partitionValues": {}, "size": 2500, "stats": "{\"numRecords\": 200, \"value\": \"Another value\"}"}}' - ) - lines = iter(response_content.split('\n')) - - rest_client._post_internal = lambda *args, **kwargs: ({ - DataSharingRestClient.DELTA_TABLE_VERSION_HEADER: '4' - }, lines) - - response = rest_client.list_files_in_table( - Table(name="table_with_json_decode_error", share="share_with_json_decode_error", schema="default") - ) - - assert response.delta_table_version == 4 - assert response.protocol == Protocol(min_reader_version=1) - assert response.metadata == Metadata( - id="98765", - format={"provider": "parquet", "options": {}}, - schema_string="{}", - partition_columns=[] - ) - assert response.add_files == [ - AddFile( - url="file5.parquet", - id="mno345", - partition_values={}, - size=2000, - stats=( - '{"numRecords": 1074361, "value": "Develop2.1 Pipeline, Veracode "}' - ), - ), - AddFile( - url="file6.parquet", - id="pqr678", - partition_values={}, - size=2500, - stats=( - '{"numRecords": 200, "value": "Another value"}' - ), - ) - ] @pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) def test_list_table_changes( From 25ab2f9579e43a1b6a854439874d5eac7cc25ee7 Mon Sep 17 00:00:00 2001 From: nisa Date: Wed, 9 Oct 2024 14:21:25 +1100 Subject: [PATCH 4/9] Format --- python/delta_sharing/rest_client.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/python/delta_sharing/rest_client.py b/python/delta_sharing/rest_client.py index 933cf0dfa..37c6d2455 100644 --- a/python/delta_sharing/rest_client.py +++ b/python/delta_sharing/rest_client.py @@ -407,8 +407,11 @@ def parse_json_stream(lines): # Not enough data to decode; read more lines break - add_files = [AddFile.from_json(json_obj["file"])for json_obj in parse_json_stream(lines)] - + add_files = [ + AddFile.from_json(json_obj["file"]) + for json_obj in parse_json_stream(lines) + ] + return ListFilesInTableResponse( delta_table_version=int(headers.get("delta-table-version")), protocol=Protocol.from_json(protocol_json["protocol"]), From fe3f827c8c1c6d57790b9f65b0b4598312b5f2d1 Mon Sep 17 00:00:00 2001 From: nisa Date: Wed, 9 Oct 2024 14:38:12 +1100 Subject: [PATCH 5/9] fix --- python/delta_sharing/rest_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/delta_sharing/rest_client.py b/python/delta_sharing/rest_client.py index 37c6d2455..778225d76 100644 --- a/python/delta_sharing/rest_client.py +++ b/python/delta_sharing/rest_client.py @@ -411,7 +411,7 @@ def parse_json_stream(lines): AddFile.from_json(json_obj["file"]) for json_obj in parse_json_stream(lines) ] - + return ListFilesInTableResponse( delta_table_version=int(headers.get("delta-table-version")), protocol=Protocol.from_json(protocol_json["protocol"]), From 055c6948566353664000e8525efdc3c019fa76fc Mon Sep 17 00:00:00 2001 From: nisa Date: Wed, 9 Oct 2024 14:45:11 +1100 Subject: [PATCH 6/9] fix --- python/delta_sharing/rest_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/delta_sharing/rest_client.py b/python/delta_sharing/rest_client.py index 778225d76..910d09b92 100644 --- a/python/delta_sharing/rest_client.py +++ b/python/delta_sharing/rest_client.py @@ -411,7 +411,7 @@ def parse_json_stream(lines): AddFile.from_json(json_obj["file"]) for json_obj in parse_json_stream(lines) ] - + return ListFilesInTableResponse( delta_table_version=int(headers.get("delta-table-version")), protocol=Protocol.from_json(protocol_json["protocol"]), From 9ba178819b62ffc1ad3ef02f808b3c951ccb7edb Mon Sep 17 00:00:00 2001 From: nisa Date: Wed, 9 Oct 2024 15:01:58 +1100 Subject: [PATCH 7/9] fix --- python/delta_sharing/rest_client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/delta_sharing/rest_client.py b/python/delta_sharing/rest_client.py index 910d09b92..9d66ccc3b 100644 --- a/python/delta_sharing/rest_client.py +++ b/python/delta_sharing/rest_client.py @@ -411,7 +411,6 @@ def parse_json_stream(lines): AddFile.from_json(json_obj["file"]) for json_obj in parse_json_stream(lines) ] - return ListFilesInTableResponse( delta_table_version=int(headers.get("delta-table-version")), protocol=Protocol.from_json(protocol_json["protocol"]), From 908f15ad78e140955b87c427dc7eaab532613e9a Mon Sep 17 00:00:00 2001 From: nisa Date: Wed, 9 Oct 2024 15:23:20 +1100 Subject: [PATCH 8/9] fix --- python/delta_sharing/rest_client.py | 30 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/python/delta_sharing/rest_client.py b/python/delta_sharing/rest_client.py index 9d66ccc3b..c24e762a7 100644 --- a/python/delta_sharing/rest_client.py +++ b/python/delta_sharing/rest_client.py @@ -387,35 +387,33 @@ def list_files_in_table( lines=[line for line in lines], ) else: - protocol_json = json.loads(next(lines)) - metadata_json = json.loads(next(lines)) - def parse_json_stream(lines): + import json + buffer = '' decoder = json.JSONDecoder() for line in lines: - buffer += line - while True: - buffer = buffer.lstrip() - if not buffer: - break + buffer += line.strip() + + while buffer: try: + # Attempt to decode a JSON object from the buffer obj, idx = decoder.raw_decode(buffer) - yield obj - buffer = buffer[idx:] + json_str = buffer[:idx] + yield json_str + buffer = buffer[idx:].lstrip() except json.JSONDecodeError: - # Not enough data to decode; read more lines + # Incomplete JSON data; read more lines break + parsed_lines = parse_json_stream(lines) + protocol_json = json.loads(next(parsed_lines)) + metadata_json = json.loads(next(parsed_lines)) - add_files = [ - AddFile.from_json(json_obj["file"]) - for json_obj in parse_json_stream(lines) - ] return ListFilesInTableResponse( delta_table_version=int(headers.get("delta-table-version")), protocol=Protocol.from_json(protocol_json["protocol"]), metadata=Metadata.from_json(metadata_json["metaData"]), - add_files=add_files, + add_files=[AddFile.from_json(json.loads(file)["file"]) for file in parsed_lines], lines=[] ) From 68ce33ccce2ab68463063d48d81a60f5fabd2c9d Mon Sep 17 00:00:00 2001 From: nisa Date: Wed, 9 Oct 2024 15:32:09 +1100 Subject: [PATCH 9/9] fix --- python/delta_sharing/rest_client.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/delta_sharing/rest_client.py b/python/delta_sharing/rest_client.py index c24e762a7..00e54235a 100644 --- a/python/delta_sharing/rest_client.py +++ b/python/delta_sharing/rest_client.py @@ -413,7 +413,10 @@ def parse_json_stream(lines): delta_table_version=int(headers.get("delta-table-version")), protocol=Protocol.from_json(protocol_json["protocol"]), metadata=Metadata.from_json(metadata_json["metaData"]), - add_files=[AddFile.from_json(json.loads(file)["file"]) for file in parsed_lines], + add_files=[ + AddFile.from_json(json.loads(file)["file"]) + for file in parsed_lines + ], lines=[] )