@@ -105,22 +105,19 @@ case class Tester(config: TesterConfig) {
105
105
var errors = Array [ErrorMessage ]()
106
106
val metricExpectedTests = getExpected(config, job.sparkSession)
107
107
val configuredKeys = config.test.keys
108
-
109
108
val invalidSchemaMap = getTableNameToInvalidRowStructureIndexes(metricExpectedTests)
110
109
if (invalidSchemaMap.nonEmpty) return getInvalidSchemaErrors(invalidSchemaMap)
111
110
112
111
metricExpectedTests.keys.foreach(tableName => {
113
112
val actualResultsDf = extractTableContents(job.sparkSession, tableName, config.test.outputMode.get)
114
-
115
113
val (expectedResults, actualResults) = (metricExpectedTests(tableName), TestUtil .getRowsFromDf(actualResultsDf))
116
114
val (expectedResultsObjects, actualResultsObjects) = (EnrichedRows (expectedResults), EnrichedRows (actualResults))
117
115
val tableNameToAllExpectedColumns = metricExpectedTests.mapValues(v => v.head.keys.toList)
118
116
val allExpectedColumns = tableNameToAllExpectedColumns.getOrElse(tableName, List [String ]())
119
- val (isConfiguredKeysValid, keys ) = getConfiguredKeysValidToTableKeys(configuredKeys, tableName, allExpectedColumns, tableName)
117
+ val (isConfiguredKeysValid, tableKeys ) = getConfiguredKeysValidToTableKeys(configuredKeys, tableName, allExpectedColumns, tableName)
120
118
if (! isConfiguredKeysValid) {
121
- return getInvalidKeysNonExistingErrors(allExpectedColumns, keys , tableName)
119
+ return getInvalidKeysNonExistingErrors(allExpectedColumns, tableKeys , tableName)
122
120
}
123
- val tableKeys = keys
124
121
val keyColumns = KeyColumns (tableKeys)
125
122
val (expectedKeys, actualKeys) = (keyColumns.getKeysMapFromRows(expectedResults), keyColumns.getKeysMapFromDF(actualResultsDf))
126
123
val (expectedResultsDuplications, actualResultsDuplications) = (TestUtil .getDuplicatedRowToIndexes(expectedKeys),
@@ -133,16 +130,13 @@ case class Tester(config: TesterConfig) {
133
130
Array [ErrorMessage ](new DuplicatedHeaderErrorMessage ()) ++
134
131
ErrorMessage .getErrorMessagesByDuplications(ResultsType .expected, expectedResultsDuplications, printableExpectedResults, tableName, keyColumns) ++
135
132
ErrorMessage .getErrorMessagesByDuplications(ResultsType .actual, actualResultsDuplications, printableActualResults, tableName, keyColumns)
136
-
137
133
case _ =>
138
134
val sorter = TesterSortData (tableKeys)
139
135
if (expectedKeys.sortWith(sorter.sortStringRows).deep != actualKeys.sortWith(sorter.sortStringRows).deep) {
140
136
val (expErrorIndexes, actErrorIndexes) = compareKeys(expectedKeys, actualKeys)
141
- ErrorMessage .getErrorMessageByMismatchedKeys(printableExpectedResults, printableActualResults,
142
- expErrorIndexes, actErrorIndexes, keyColumns, tableName)
143
- } else {
144
- ErrorMessage .getErrorMessagesByMismatchedAllCols(tableKeys, printableExpectedResults, printableActualResults, job.sparkSession, tableName)
145
- }
137
+ ErrorMessage .getErrorMessageByMismatchedKeys(printableExpectedResults, printableActualResults, expErrorIndexes, actErrorIndexes,
138
+ keyColumns, tableName)
139
+ } else {ErrorMessage .getErrorMessagesByMismatchedAllCols(tableKeys, printableExpectedResults, printableActualResults, job.sparkSession, tableName)}
146
140
}
147
141
if (tableErrorDataArr.nonEmpty) {
148
142
errors ++= tableErrorDataArr
0 commit comments