Fabric Spark - EH Connector #51211
base: master
@@ -22,9 +22,9 @@ import java.util.{Locale, UUID}

import scala.jdk.CollectionConverters._

import EventStreamsKafkaConnector.KafkaOptionsUtils
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}

import org.apache.spark.internal.{Logging, LogKeys, MDC}
@@ -58,7 +58,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
    with Logging {
  import KafkaSourceProvider._

  override def shortName(): String = "kafka"
  override def shortName(): String = "eventstreams"

Review comment: Will fix this

  /**
   * Returns the name and schema of the source. In addition, it also verifies whether the options
@@ -70,9 +70,10 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
      providerName: String,
      parameters: Map[String, String]): (String, StructType) = {
    val caseInsensitiveParameters = CaseInsensitiveMap(parameters)
    val translatedKafkaParameters = translateEventStreamProperties(caseInsensitiveParameters)
    validateStreamOptions(caseInsensitiveParameters)
    require(schema.isEmpty, "Kafka source has a fixed schema and cannot be set with a custom one")
    val includeHeaders = caseInsensitiveParameters.getOrElse(INCLUDE_HEADERS, "false").toBoolean
    val includeHeaders = translatedKafkaParameters.getOrElse(INCLUDE_HEADERS, "false").toBoolean
    (shortName(), KafkaRecordToRowConverter.kafkaSchema(includeHeaders))
  }
@@ -88,8 +89,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
    // partial data since Kafka will assign partitions to multiple consumers having the same group
    // id. Hence, we should generate a unique id for each query.
    val uniqueGroupId = streamingUniqueGroupId(caseInsensitiveParameters, metadataPath)

    val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveParameters)
    val translatedKafkaParameters =
      translateEventStreamProperties(caseInsensitiveParameters)
    val specifiedKafkaParams = convertToSpecifiedParams(translatedKafkaParameters)

    val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
      caseInsensitiveParameters, STARTING_TIMESTAMP_OPTION_KEY,
@@ -128,7 +130,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
      parameters: Map[String, String]): BaseRelation = {
    val caseInsensitiveParameters = CaseInsensitiveMap(parameters)
    validateBatchOptions(caseInsensitiveParameters)
    val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveParameters)
    val translatedKafkaParameters = translateEventStreamProperties(caseInsensitiveParameters)
    val specifiedKafkaParams = convertToSpecifiedParams(translatedKafkaParameters)

    val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
      caseInsensitiveParameters, STARTING_TIMESTAMP_OPTION_KEY,
@@ -142,8 +145,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
      LatestOffsetRangeLimit)
    assert(endingRelationOffsets != EarliestOffsetRangeLimit)

    checkOffsetLimitValidity(startingRelationOffsets, endingRelationOffsets)

    val includeHeaders = caseInsensitiveParameters.getOrElse(INCLUDE_HEADERS, "false").toBoolean

    new KafkaRelation(
@@ -357,6 +358,29 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
    validateGeneralOptions(params)
  }

  private def translateEventStreamProperties(params: CaseInsensitiveMap[String])

Review comment: Method name should be translateEventStreamPropertiesToKafkaProperties

      : CaseInsensitiveMap[String] = {

Review comment: Use reflection instead of import

    if (params.contains(EVENTSTREAM_NAME_OPTION_KEY) ||
        params.contains(EVENTSTREAM_ARTIFACT_ID_OPTION_KEY) ||
        params.contains(EVENTSTREAM_CONSUMER_GROUP_OPTION_KEY)) {
      validateEventStreamOptions(params)

Review comment: We should perform validation in the eventstream jar; we should make minimal changes in the Spark code base.
Review comment: In short, my suggestion is to remove this validation method from here.

      return CaseInsensitiveMap(KafkaOptionsUtils.buildKafkaOptionsFromSparkConfig(params));

Review comment: Method name should be "EventStreamUtils"; KafkaOptionsUtils is a generic name which is more suitable for open source Kafka connector utils.

    }
    params;
  }
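A minimal sketch of the reviewer's "use reflection instead of import" suggestion. It assumes the eventstream jar is on the runtime classpath and that `buildKafkaOptionsFromSparkConfig` is a static-style method taking and returning a `Map[String, String]`; the actual class shape and signature in the eventstream jar may differ.

```scala
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// Hypothetical sketch only: resolves the eventstream helper at runtime instead of
// importing it, so the Spark Kafka connector keeps no compile-time dependency on the jar.
object EventStreamReflectionSketch {
  def translate(params: CaseInsensitiveMap[String]): CaseInsensitiveMap[String] = {
    // Assumed class and method names; both come from this PR, not from released Spark.
    val clazz = Class.forName("EventStreamsKafkaConnector.KafkaOptionsUtils")
    val method = clazz.getMethod("buildKafkaOptionsFromSparkConfig", classOf[Map[_, _]])
    // Assumes a static-style method returning a Scala Map[String, String].
    val translated = method.invoke(null, params).asInstanceOf[Map[String, String]]
    CaseInsensitiveMap(translated)
  }
}
```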
  private def validateEventStreamOptions(params: CaseInsensitiveMap[String]) = {
    // Stream specific options
    if (!params.contains(EVENTSTREAM_NAME_OPTION_KEY) ||
        !params.contains(EVENTSTREAM_ARTIFACT_ID_OPTION_KEY) ||
        !params.contains(EVENTSTREAM_CONSUMER_GROUP_OPTION_KEY)) {
      throw new IllegalArgumentException(
        "All three EventStream properties - " +
        "(eventstream.itemid, eventstream.name and eventstream.consumerGroup) " +
        "are necessary to run Spark with eventstream configs")
    }
  }
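For illustration, a hedged example of how a streaming query might supply the three required options introduced by this PR. The `eventstreams` format name and `eventstream.*` option keys come from this diff; the values and the existing SparkSession `spark` are placeholders.

```scala
// Illustrative only: values are placeholders, and the "eventstreams" format name
// plus the eventstream.* option keys are defined by this PR, not by released Spark.
val stream = spark.readStream
  .format("eventstreams")
  .option("eventstream.itemid", "<fabric-artifact-id>")      // EVENTSTREAM_ARTIFACT_ID_OPTION_KEY
  .option("eventstream.name", "<eventstream-name>")          // EVENTSTREAM_NAME_OPTION_KEY
  .option("eventstream.consumerGroup", "<consumer-group>")   // EVENTSTREAM_CONSUMER_GROUP_OPTION_KEY
  .load()
```

Supplying only one or two of these keys would trip the IllegalArgumentException above, since the presence of any eventstream option triggers validation of all three.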
  private def validateBatchOptions(params: CaseInsensitiveMap[String]) = {
    // Batch specific options
    KafkaSourceProvider.getKafkaOffsetRangeLimit(
@@ -454,6 +478,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
    override def toBatch(): Batch = {
      val caseInsensitiveOptions = CaseInsensitiveMap(options.asScala.toMap)
      validateBatchOptions(caseInsensitiveOptions)

Review comment: These new line changes should be removed

      val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions)

      val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
@@ -466,8 +491,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
        ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, ENDING_OFFSETS_OPTION_KEY,
        LatestOffsetRangeLimit)

      checkOffsetLimitValidity(startingRelationOffsets, endingRelationOffsets)

      new KafkaBatch(
        strategy(caseInsensitiveOptions),
        caseInsensitiveOptions,
@@ -562,6 +585,9 @@ private[kafka010] object KafkaSourceProvider extends Logging {
  private val SUBSCRIBE_PATTERN = "subscribepattern"
  private val SUBSCRIBE = "subscribe"
  private val STRATEGY_OPTION_KEYS = Set(SUBSCRIBE, SUBSCRIBE_PATTERN, ASSIGN)
  private val EVENTSTREAM_ARTIFACT_ID_OPTION_KEY = "eventstream.itemid";
  private val EVENTSTREAM_NAME_OPTION_KEY = "eventstream.name";
  private val EVENTSTREAM_CONSUMER_GROUP_OPTION_KEY = "eventstream.consumerGroup";
  private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
  private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
  private[kafka010] val STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY = "startingoffsetsbytimestamp"
@@ -613,60 +639,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
  private val serClassName = classOf[ByteArraySerializer].getName
  private val deserClassName = classOf[ByteArrayDeserializer].getName

  def checkStartOffsetNotGreaterThanEndOffset(

Review comment: Maybe I am missing something, but why are these methods removed?

      startOffset: Long,
      endOffset: Long,
      topicPartition: TopicPartition,
      exception: (Long, Long, TopicPartition) => Exception): Unit = {
    // earliest or latest offsets are negative and should not be compared
    if (startOffset > endOffset && startOffset >= 0 && endOffset >= 0) {
      throw exception(startOffset, endOffset, topicPartition)
    }
  }

  def checkOffsetLimitValidity(
      startOffset: KafkaOffsetRangeLimit,
      endOffset: KafkaOffsetRangeLimit): Unit = {
    startOffset match {
      case start: SpecificOffsetRangeLimit if endOffset.isInstanceOf[SpecificOffsetRangeLimit] =>
        val end = endOffset.asInstanceOf[SpecificOffsetRangeLimit]
        if (start.partitionOffsets.keySet != end.partitionOffsets.keySet) {
          throw KafkaExceptions.unmatchedTopicPartitionsBetweenOffsets(
            start.partitionOffsets.keySet, end.partitionOffsets.keySet
          )
        }
        start.partitionOffsets.foreach {
          case (tp, startOffset) =>
            checkStartOffsetNotGreaterThanEndOffset(
              startOffset,
              end.partitionOffsets(tp),
              tp,
              KafkaExceptions.unresolvedStartOffsetGreaterThanEndOffset
            )
        }

      case start: SpecificTimestampRangeLimit
          if endOffset.isInstanceOf[SpecificTimestampRangeLimit] =>
        val end = endOffset.asInstanceOf[SpecificTimestampRangeLimit]
        if (start.topicTimestamps.keySet != end.topicTimestamps.keySet) {
          throw KafkaExceptions.unmatchedTopicPartitionsBetweenOffsets(
            start.topicTimestamps.keySet, end.topicTimestamps.keySet
          )
        }
        start.topicTimestamps.foreach {
          case (tp, startOffset) =>
            checkStartOffsetNotGreaterThanEndOffset(
              startOffset,
              end.topicTimestamps(tp),
              tp,
              KafkaExceptions.unresolvedStartTimestampGreaterThanEndTimestamp
            )
        }

      case _ => // do nothing
    }
  }

  def getKafkaOffsetRangeLimit(
      params: CaseInsensitiveMap[String],
      globalOffsetTimestampOptionKey: String,
Review comment: Why is this import removed?