Skip to content

Commit a6ea9f4

Browse files
Fix P2P-based joins with explicit npartitions (#8470)
1 parent 33b2c72 commit a6ea9f4

File tree

2 files changed

+17
-2
lines changed

2 files changed

+17
-2
lines changed

distributed/shuffle/_merge.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,8 @@ def _cull_dependencies(
271271
"""
272272
deps = {}
273273
parts_out = parts_out or self._keys_to_parts(keys)
274-
keys = {(self.name_input_left, i) for i in range(self.npartitions)}
275-
keys |= {(self.name_input_right, i) for i in range(self.npartitions)}
274+
keys = {(self.name_input_left, i) for i in range(self.n_partitions_left)}
275+
keys |= {(self.name_input_right, i) for i in range(self.n_partitions_right)}
276276
# Protect against mutations later on with frozenset
277277
keys = frozenset(keys)
278278
for part in parts_out:
@@ -352,6 +352,7 @@ def cull(self, keys: Iterable[str], all_keys: Any) -> tuple[HashJoinP2PLayer, di
352352
parameter.
353353
"""
354354
parts_out = self._keys_to_parts(keys)
355+
355356
culled_deps = self._cull_dependencies(keys, parts_out=parts_out)
356357
if parts_out != set(self.parts_out):
357358
culled_layer = self._cull(parts_out)

distributed/shuffle/tests/test_merge.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,20 @@ async def test_index_merge_p2p(c, s, a, b, how):
478478
)
479479

480480

481+
@pytest.mark.parametrize("npartitions", [4, 5, 10, 20])
482+
@gen_cluster(client=True)
483+
async def test_merge_with_npartitions(c, s, a, b, npartitions):
484+
pdf = pd.DataFrame({"a": [1, 2, 3, 4] * 10, "b": 1})
485+
486+
left = dd.from_pandas(pdf, npartitions=10)
487+
right = dd.from_pandas(pdf, npartitions=5)
488+
489+
expected = pdf.merge(pdf)
490+
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
491+
result = await c.compute(left.merge(right, npartitions=npartitions))
492+
assert_eq(result, expected, check_index=False)
493+
494+
481495
class LimitedGetOrCreateShuffleRunManager(_ShuffleRunManager):
482496
seen: set[ShuffleId]
483497
block_get_or_create: asyncio.Event

0 commit comments

Comments
 (0)