Skip to content

[Test] #51221

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open

[Test] #51221

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Apache Spark


Spark is a unified analytics engine for large-scale data processing. It provides
high-level APIs in Scala, Java, Python, and R (Deprecated), and an optimized engine that
supports general computation graphs for data analysis. It also supports a
Expand Down
5 changes: 0 additions & 5 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -853,11 +853,6 @@
"Please fit or load a model smaller than <modelMaxSize> bytes."
]
},
"MODEL_SUMMARY_LOST" : {
"message" : [
"The model <objectName> summary is lost because the cached model is offloaded."
]
},
"UNSUPPORTED_EXCEPTION" : {
"message" : [
"<message>"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,17 @@ class FMClassifier @Since("3.0.0") (
factors: Matrix,
objectiveHistory: Array[Double]): FMClassificationModel = {
val model = copyValues(new FMClassificationModel(uid, intercept, linear, factors))
model.createSummary(dataset, objectiveHistory)
model
val weightColName = if (!isDefined(weightCol)) "weightCol" else $(weightCol)

val (summaryModel, probabilityColName, predictionColName) = model.findSummaryModel()
val summary = new FMClassificationTrainingSummaryImpl(
summaryModel.transform(dataset),
probabilityColName,
predictionColName,
$(labelCol),
weightColName,
objectiveHistory)
model.setSummary(Some(summary))
}

@Since("3.0.0")
Expand Down Expand Up @@ -334,42 +343,6 @@ class FMClassificationModel private[classification] (
s"uid=${super.toString}, numClasses=$numClasses, numFeatures=$numFeatures, " +
s"factorSize=${$(factorSize)}, fitLinear=${$(fitLinear)}, fitIntercept=${$(fitIntercept)}"
}

// Builds a training summary for this FMClassificationModel by scoring
// `dataset` with a summary-ready copy of the model, and attaches it via
// setSummary together with the optimizer's `objectiveHistory` loss trace.
private[spark] def createSummary(
dataset: Dataset[_], objectiveHistory: Array[Double]
): Unit = {
// When no weight column param is set, the literal string "weightCol" is used
// as the column name. NOTE(review): looks like a sentinel meaning
// "unweighted" — confirm against the summary implementation.
val weightColName = if (!isDefined(weightCol)) "weightCol" else $(weightCol)

// findSummaryModel() yields a model copy whose probability/prediction output
// columns are set, plus the resolved names of those columns.
val (summaryModel, probabilityColName, predictionColName) = findSummaryModel()
val summary = new FMClassificationTrainingSummaryImpl(
summaryModel.transform(dataset),
probabilityColName,
predictionColName,
$(labelCol),
weightColName,
objectiveHistory)
setSummary(Some(summary))
}

// Persists only the summary's objectiveHistory to `path` (wrapped in a
// Tuple1 and written via ReadWriteUtils); the rest of the summary is
// recomputable from a dataset, see loadSummary.
override private[spark] def saveSummary(path: String): Unit = {
ReadWriteUtils.saveObjectToLocal[Tuple1[Array[Double]]](
path, Tuple1(summary.objectiveHistory),
(data, dos) => {
// Serialize just the Double array carried in the tuple.
ReadWriteUtils.serializeDoubleArray(data._1, dos)
}
)
}

// Counterpart of saveSummary: reads the persisted objectiveHistory from
// `path` and reconstructs the full training summary against `dataset`.
override private[spark] def loadSummary(path: String, dataset: DataFrame): Unit = {
val Tuple1(objectiveHistory: Array[Double])
= ReadWriteUtils.loadObjectFromLocal[Tuple1[Array[Double]]](
path,
dis => {
Tuple1(ReadWriteUtils.deserializeDoubleArray(dis))
}
)
createSummary(dataset, objectiveHistory)
}
}

@Since("3.0.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,17 @@ class LinearSVC @Since("2.2.0") (
intercept: Double,
objectiveHistory: Array[Double]): LinearSVCModel = {
val model = copyValues(new LinearSVCModel(uid, coefficients, intercept))
model.createSummary(dataset, objectiveHistory)
model
val weightColName = if (!isDefined(weightCol)) "weightCol" else $(weightCol)

val (summaryModel, rawPredictionColName, predictionColName) = model.findSummaryModel()
val summary = new LinearSVCTrainingSummaryImpl(
summaryModel.transform(dataset),
rawPredictionColName,
predictionColName,
$(labelCol),
weightColName,
objectiveHistory)
model.setSummary(Some(summary))
}

private def trainImpl(
Expand Down Expand Up @@ -436,42 +445,6 @@ class LinearSVCModel private[classification] (
// One-line human-readable description: uid plus class/feature counts.
override def toString: String = {
s"LinearSVCModel: uid=$uid, numClasses=$numClasses, numFeatures=$numFeatures"
}

// Builds a training summary for this LinearSVCModel by scoring `dataset`
// with a summary-ready copy of the model, and attaches it via setSummary
// together with the optimizer's `objectiveHistory` loss trace.
private[spark] def createSummary(
dataset: Dataset[_], objectiveHistory: Array[Double]
): Unit = {
// When no weight column param is set, the literal string "weightCol" is used
// as the column name. NOTE(review): looks like a sentinel meaning
// "unweighted" — confirm against the summary implementation.
val weightColName = if (!isDefined(weightCol)) "weightCol" else $(weightCol)

// LinearSVC has no probability column; the summary is keyed on the raw
// prediction column instead (resolved by findSummaryModel()).
val (summaryModel, rawPredictionColName, predictionColName) = findSummaryModel()
val summary = new LinearSVCTrainingSummaryImpl(
summaryModel.transform(dataset),
rawPredictionColName,
predictionColName,
$(labelCol),
weightColName,
objectiveHistory)
setSummary(Some(summary))
}

// Persists only the summary's objectiveHistory to `path` (wrapped in a
// Tuple1 and written via ReadWriteUtils); the rest of the summary is
// recomputable from a dataset, see loadSummary.
override private[spark] def saveSummary(path: String): Unit = {
ReadWriteUtils.saveObjectToLocal[Tuple1[Array[Double]]](
path, Tuple1(summary.objectiveHistory),
(data, dos) => {
// Serialize just the Double array carried in the tuple.
ReadWriteUtils.serializeDoubleArray(data._1, dos)
}
)
}

// Counterpart of saveSummary: reads the persisted objectiveHistory from
// `path` and reconstructs the full training summary against `dataset`.
override private[spark] def loadSummary(path: String, dataset: DataFrame): Unit = {
val Tuple1(objectiveHistory: Array[Double])
= ReadWriteUtils.loadObjectFromLocal[Tuple1[Array[Double]]](
path,
dis => {
Tuple1(ReadWriteUtils.deserializeDoubleArray(dis))
}
)
createSummary(dataset, objectiveHistory)
}
}

@Since("2.2.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -718,8 +718,29 @@ class LogisticRegression @Since("1.2.0") (
objectiveHistory: Array[Double]): LogisticRegressionModel = {
val model = copyValues(new LogisticRegressionModel(uid, coefficientMatrix, interceptVector,
numClasses, checkMultinomial(numClasses)))
model.createSummary(dataset, objectiveHistory)
model
val weightColName = if (!isDefined(weightCol)) "weightCol" else $(weightCol)

val (summaryModel, probabilityColName, predictionColName) = model.findSummaryModel()
val logRegSummary = if (numClasses <= 2) {
new BinaryLogisticRegressionTrainingSummaryImpl(
summaryModel.transform(dataset),
probabilityColName,
predictionColName,
$(labelCol),
$(featuresCol),
weightColName,
objectiveHistory)
} else {
new LogisticRegressionTrainingSummaryImpl(
summaryModel.transform(dataset),
probabilityColName,
predictionColName,
$(labelCol),
$(featuresCol),
weightColName,
objectiveHistory)
}
model.setSummary(Some(logRegSummary))
}

private def createBounds(
Expand Down Expand Up @@ -1302,54 +1323,6 @@ class LogisticRegressionModel private[spark] (
// One-line human-readable description: uid plus class/feature counts.
override def toString: String = {
s"LogisticRegressionModel: uid=$uid, numClasses=$numClasses, numFeatures=$numFeatures"
}

// Builds and attaches the training summary for this LogisticRegressionModel,
// choosing the binary-specific summary implementation when there are at most
// two classes, and the generic one otherwise.
private[spark] def createSummary(
dataset: Dataset[_], objectiveHistory: Array[Double]
): Unit = {
// When no weight column param is set, the literal string "weightCol" is used
// as the column name. NOTE(review): looks like a sentinel meaning
// "unweighted" — confirm against the summary implementation.
val weightColName = if (!isDefined(weightCol)) "weightCol" else $(weightCol)

// findSummaryModel() yields a model copy whose probability/prediction output
// columns are set, plus the resolved names of those columns.
val (summaryModel, probabilityColName, predictionColName) = findSummaryModel()
// Both branches score the dataset the same way; only the summary class differs.
val logRegSummary = if (numClasses <= 2) {
new BinaryLogisticRegressionTrainingSummaryImpl(
summaryModel.transform(dataset),
probabilityColName,
predictionColName,
$(labelCol),
$(featuresCol),
weightColName,
objectiveHistory)
} else {
new LogisticRegressionTrainingSummaryImpl(
summaryModel.transform(dataset),
probabilityColName,
predictionColName,
$(labelCol),
$(featuresCol),
weightColName,
objectiveHistory)
}
setSummary(Some(logRegSummary))
}

// Persists only the summary's objectiveHistory to `path` (wrapped in a
// Tuple1 and written via ReadWriteUtils); the rest of the summary is
// recomputable from a dataset, see loadSummary.
override private[spark] def saveSummary(path: String): Unit = {
ReadWriteUtils.saveObjectToLocal[Tuple1[Array[Double]]](
path, Tuple1(summary.objectiveHistory),
(data, dos) => {
// Serialize just the Double array carried in the tuple.
ReadWriteUtils.serializeDoubleArray(data._1, dos)
}
)
}

// Counterpart of saveSummary: reads the persisted objectiveHistory from
// `path` and reconstructs the full training summary against `dataset`.
override private[spark] def loadSummary(path: String, dataset: DataFrame): Unit = {
val Tuple1(objectiveHistory: Array[Double])
= ReadWriteUtils.loadObjectFromLocal[Tuple1[Array[Double]]](
path,
dis => {
Tuple1(ReadWriteUtils.deserializeDoubleArray(dis))
}
)
createSummary(dataset, objectiveHistory)
}
}

@Since("1.6.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,14 @@ class MultilayerPerceptronClassifier @Since("1.5.0") (
objectiveHistory: Array[Double]): MultilayerPerceptronClassificationModel = {
val model = copyValues(new MultilayerPerceptronClassificationModel(uid, weights))

model.createSummary(dataset, objectiveHistory)
model
val (summaryModel, _, predictionColName) = model.findSummaryModel()
val summary = new MultilayerPerceptronClassificationTrainingSummaryImpl(
summaryModel.transform(dataset),
predictionColName,
$(labelCol),
"",
objectiveHistory)
model.setSummary(Some(summary))
}
}

Expand Down Expand Up @@ -359,39 +365,6 @@ class MultilayerPerceptronClassificationModel private[ml] (
s"MultilayerPerceptronClassificationModel: uid=$uid, numLayers=${$(layers).length}, " +
s"numClasses=$numClasses, numFeatures=$numFeatures"
}

// Builds and attaches the training summary for this MLP model. The
// probability column returned by findSummaryModel() is ignored (`_`), and an
// empty string is passed where other classifiers pass a weight column name.
// NOTE(review): this classifier appears to have no weight column param —
// confirm the "" placeholder semantics with the summary implementation.
private[spark] def createSummary(
dataset: Dataset[_], objectiveHistory: Array[Double]
): Unit = {
val (summaryModel, _, predictionColName) = findSummaryModel()
val summary = new MultilayerPerceptronClassificationTrainingSummaryImpl(
summaryModel.transform(dataset),
predictionColName,
$(labelCol),
"",
objectiveHistory)
setSummary(Some(summary))
}

// Persists only the summary's objectiveHistory to `path` (wrapped in a
// Tuple1 and written via ReadWriteUtils); the rest of the summary is
// recomputable from a dataset, see loadSummary.
override private[spark] def saveSummary(path: String): Unit = {
ReadWriteUtils.saveObjectToLocal[Tuple1[Array[Double]]](
path, Tuple1(summary.objectiveHistory),
(data, dos) => {
// Serialize just the Double array carried in the tuple.
ReadWriteUtils.serializeDoubleArray(data._1, dos)
}
)
}

// Counterpart of saveSummary: reads the persisted objectiveHistory from
// `path` and reconstructs the full training summary against `dataset`.
override private[spark] def loadSummary(path: String, dataset: DataFrame): Unit = {
val Tuple1(objectiveHistory: Array[Double])
= ReadWriteUtils.loadObjectFromLocal[Tuple1[Array[Double]]](
path,
dis => {
Tuple1(ReadWriteUtils.deserializeDoubleArray(dis))
}
)
createSummary(dataset, objectiveHistory)
}
}

@Since("2.0.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,26 @@ class RandomForestClassifier @Since("1.4.0") (
numFeatures: Int,
numClasses: Int): RandomForestClassificationModel = {
val model = copyValues(new RandomForestClassificationModel(uid, trees, numFeatures, numClasses))
model.createSummary(dataset)
model
val weightColName = if (!isDefined(weightCol)) "weightCol" else $(weightCol)

val (summaryModel, probabilityColName, predictionColName) = model.findSummaryModel()
val rfSummary = if (numClasses <= 2) {
new BinaryRandomForestClassificationTrainingSummaryImpl(
summaryModel.transform(dataset),
probabilityColName,
predictionColName,
$(labelCol),
weightColName,
Array(0.0))
} else {
new RandomForestClassificationTrainingSummaryImpl(
summaryModel.transform(dataset),
predictionColName,
$(labelCol),
weightColName,
Array(0.0))
}
model.setSummary(Some(rfSummary))
}

@Since("1.4.1")
Expand Down Expand Up @@ -375,35 +393,6 @@ class RandomForestClassificationModel private[ml] (
@Since("2.0.0")
override def write: MLWriter =
new RandomForestClassificationModel.RandomForestClassificationModelWriter(this)

// Builds and attaches the training summary for this random-forest model.
// Takes no objectiveHistory parameter: a constant Array(0.0) is passed in
// its place (NOTE(review): presumably a placeholder because no iterative
// loss trace is recorded for this model — confirm).
private[spark] def createSummary(dataset: Dataset[_]): Unit = {
// When no weight column param is set, the literal string "weightCol" is used
// as the column name. NOTE(review): looks like a sentinel meaning
// "unweighted" — confirm against the summary implementation.
val weightColName = if (!isDefined(weightCol)) "weightCol" else $(weightCol)

// findSummaryModel() yields a model copy whose probability/prediction output
// columns are set, plus the resolved names of those columns.
val (summaryModel, probabilityColName, predictionColName) = findSummaryModel()
// Binary problems use the binary-specific summary (which also takes the
// probability column); multiclass uses the generic one without it.
val rfSummary = if (numClasses <= 2) {
new BinaryRandomForestClassificationTrainingSummaryImpl(
summaryModel.transform(dataset),
probabilityColName,
predictionColName,
$(labelCol),
weightColName,
Array(0.0))
} else {
new RandomForestClassificationTrainingSummaryImpl(
summaryModel.transform(dataset),
predictionColName,
$(labelCol),
weightColName,
Array(0.0))
}
setSummary(Some(rfSummary))
}

// Intentional no-op: createSummary for this model uses a constant Array(0.0)
// objective history, so there is no optimizer state worth persisting;
// loadSummary recomputes the whole summary from the dataset instead.
override private[spark] def saveSummary(path: String): Unit = {}

// Rebuilds the summary from scratch on the given dataset; `path` is unused
// because saveSummary writes nothing for this model.
override private[spark] def loadSummary(path: String, dataset: DataFrame): Unit = {
createSummary(dataset)
}
}

@Since("2.0.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,6 @@ class BisectingKMeansModel private[ml] (
// Narrows the inherited summary accessor to the concrete BisectingKMeansSummary type.
override def summary: BisectingKMeansSummary = super.summary

// Estimated in-memory size, delegated to SizeEstimator over the underlying parentModel.
override def estimatedSize: Long = SizeEstimator.estimate(parentModel)

// BisectingKMeansModel does not support summary offloading yet, so `saveSummary`
// is intentionally an empty no-op for now.
override private[spark] def saveSummary(path: String): Unit = {}
}

object BisectingKMeansModel extends MLReadable[BisectingKMeansModel] {
Expand Down
Loading