Skip to content

[SPARK-52507][CORE] Attempt to read missing block from fallback storage #51202

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark._
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.SerializerManager
import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerId, ShuffleBlockFetcherIterator}
import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerId, FallbackStorage, ShuffleBlockFetcherIterator}
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.collection.ExternalSorter

Expand Down Expand Up @@ -88,7 +88,8 @@ private[spark] class BlockStoreShuffleReader[K, C](
SparkEnv.get.conf.get(config.SHUFFLE_CHECKSUM_ENABLED),
SparkEnv.get.conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM),
readMetrics,
fetchContinuousBlocksInBatch).toCompletionIterator
fetchContinuousBlocksInBatch,
FallbackStorage.getFallbackStorage(SparkEnv.get.conf)).toCompletionIterator

val serializerInstance = dep.serializer.newInstance()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
}
}

/**
 * Reads the given block from the fallback storage as a [[ManagedBuffer]].
 *
 * Delegates to the companion object's `FallbackStorage.read(conf, blockId)`,
 * passing this instance's `SparkConf`.
 *
 * @param blockId the shuffle block to read from the fallback storage
 * @return a ManagedBuffer backed by the block's data in the fallback storage
 */
def read(blockId: BlockId): ManagedBuffer = FallbackStorage.read(conf, blockId)

/**
 * Checks whether a shuffle file has been copied to the fallback storage.
 *
 * Files are laid out under `<fallbackPath>/<appId>/<shuffleId>/<hash>/<filename>`,
 * where the hash bucket is derived from the file name.
 *
 * @param shuffleId the shuffle whose file is being looked up
 * @param filename  the shuffle data/index file name
 * @return true if the file exists in the fallback storage
 */
def exists(shuffleId: Int, filename: String): Boolean = {
  val bucket = JavaUtils.nonNegativeHash(filename)
  val relativePath = s"$appId/$shuffleId/$bucket/$filename"
  fallbackFileSystem.exists(new Path(fallbackPath, relativePath))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ final class ShuffleBlockFetcherIterator(
checksumAlgorithm: String,
shuffleMetrics: ShuffleReadMetricsReporter,
doBatchFetch: Boolean,
fallbackStorage: Option[FallbackStorage],
clock: Clock = new SystemClock())
extends Iterator[(BlockId, InputStream)] with DownloadFileManager with Logging {

Expand Down Expand Up @@ -973,14 +974,28 @@ final class ShuffleBlockFetcherIterator(
}

case FailureFetchResult(blockId, mapIndex, address, e) =>
  // Error to rethrow; cleared iff the block is recovered from fallback storage.
  var error = e
  var errorMsg: String = null
  if (e.isInstanceOf[OutOfDirectMemoryError]) {
    val logMessage = log"Block ${MDC(BLOCK_ID, blockId)} fetch failed after " +
      log"${MDC(MAX_ATTEMPTS, maxAttemptsOnNettyOOM)} retries due to Netty OOM"
    logError(logMessage)
    errorMsg = logMessage.message
  } else if (fallbackStorage.isDefined) {
    // Best-effort recovery: the block may have been migrated to the fallback
    // storage (e.g. during executor decommissioning). On success, enqueue a
    // SuccessFetchResult and suppress the failure.
    try {
      val buf = fallbackStorage.get.read(blockId)
      results.put(SuccessFetchResult(blockId, mapIndex, address, buf.size(), buf,
        isNetworkReqDone = false))
      result = null
      error = null
    } catch {
      // NOTE(review): intentionally broad catch so any fallback-read failure
      // degrades to the original fetch failure below; consider NonFatal.
      case t: Throwable =>
        logInfo(s"Failed to read block from fallback storage: $blockId", t)
    }
  }
  // Throw only when the block could not be recovered. The previous version
  // additionally threw unconditionally after this guard, which made the
  // successful fallback path unreachable.
  if (error != null) {
    throwFetchFailedException(blockId, mapIndex, address, error, Some(errorMsg))
  }

case DeferFetchRequestResult(request) =>
val address = request.address
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite {
checksumEnabled: Boolean = true,
checksumAlgorithm: String = "ADLER32",
shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
doBatchFetch: Boolean = false,
fallbackStorage: Option[FallbackStorage] = None): ShuffleBlockFetcherIterator = {
val tContext = taskContext.getOrElse(TaskContext.empty())
new ShuffleBlockFetcherIterator(
tContext,
Expand All @@ -218,7 +219,8 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite {
checksumEnabled,
checksumAlgorithm,
shuffleMetrics.getOrElse(tContext.taskMetrics().createTempShuffleReadMetrics()),
doBatchFetch)
doBatchFetch,
fallbackStorage)
}
// scalastyle:on argcount

Expand Down Expand Up @@ -1122,6 +1124,54 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite {
assert(e.getMessage.contains("fetch failed after 10 retries due to Netty OOM"))
}

// Fixed placeholder JIRA id: this change belongs to SPARK-52507.
test("SPARK-52507: missing blocks attempts to read from fallback storage") {
  val blockManager = createMockBlockManager()

  // No blocks registered in the transfer service, so every remote fetch fails.
  configureMockTransfer(Map.empty)
  val remoteBmId = BlockManagerId("test-remote-client-1", "test-remote-host", 2)
  val blockId = ShuffleBlockId(0, 0, 0)
  val blocksByAddress = Map[BlockManagerId, Seq[(BlockId, Long, Int)]](
    (remoteBmId, Seq((blockId, 1L, 0)))
  )

  // iterator with no FallbackStorage cannot find the block
  {
    val iterator = createShuffleBlockIteratorWithDefaults(blocksByAddress = blocksByAddress)
    val e = intercept[FetchFailedException] {
      iterator.next()
    }
    assert(e.getCause != null)
    assert(e.getCause.isInstanceOf[BlockNotFoundException])
    assert(e.getCause.getMessage.contains("Block shuffle_0_0_0 not found"))
  }

  // iterator with FallbackStorage that does not store the block cannot find it either
  val fallbackStorage = mock(classOf[FallbackStorage])

  {
    // Only reduceId 1 is stocked in the fallback; the fetch of reduceId 0 must still fail.
    when(fallbackStorage.read(ShuffleBlockId(0, 0, 1))).thenReturn(new TestManagedBuffer(127))
    val iterator = createShuffleBlockIteratorWithDefaults(blocksByAddress = blocksByAddress,
      fallbackStorage = Some(fallbackStorage))
    val e = intercept[FetchFailedException] {
      iterator.next()
    }
    assert(e.getCause != null)
    assert(e.getCause.isInstanceOf[BlockNotFoundException])
    assert(e.getCause.getMessage.contains("Block shuffle_0_0_0 not found"))
  }

  // iterator with FallbackStorage that stores the block can find it
  {
    when(fallbackStorage.read(ShuffleBlockId(0, 0, 0))).thenReturn(new TestManagedBuffer(127))
    val iterator = createShuffleBlockIteratorWithDefaults(blocksByAddress = blocksByAddress,
      fallbackStorage = Some(fallbackStorage))
    assert(iterator.hasNext)
    val (id, _) = iterator.next()
    assert(id === ShuffleBlockId(0, 0, 0))
    assert(!iterator.hasNext)
  }
}

/**
* Prepares the transfer to trigger success for all the blocks present in blockChunks. It will
* trigger failure of block which is not part of blockChunks.
Expand Down