Fabric Spark - EH Connector #51211

Open: wants to merge 1 commit into base: master

10 changes: 10 additions & 0 deletions connector/kafka-0-10-sql/pom.xml
@@ -78,6 +78,16 @@
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>com.sparkconnector</groupId>
<artifactId>SparkConnectorForEventHub</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>trident</groupId>
<artifactId>trident-token-library</artifactId>
<version>1</version> <!-- or latest stable -->
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -22,9 +22,9 @@ import java.util.{Locale, UUID}

import scala.jdk.CollectionConverters._

import EventStreamsKafkaConnector.KafkaOptionsUtils
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.TopicPartition

Review comment (on the removed org.apache.kafka.common.TopicPartition import): Why is this import removed?

import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}

import org.apache.spark.internal.{Logging, LogKeys, MDC}
@@ -58,7 +58,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
with Logging {
import KafkaSourceProvider._

override def shortName(): String = "kafka"
override def shortName(): String = "eventstreams"
Author comment: Will fix this
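A minimal sketch (not part of this PR) of how an "eventstreams" alias could be added without renaming the built-in "kafka" source; the class name is hypothetical, it would have to sit in the kafka010 package because KafkaSourceProvider is private[kafka010], and it would need its own entry in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:

package org.apache.spark.sql.kafka010

// Hypothetical alias provider: users can write .format("eventstreams") while
// .format("kafka") keeps resolving to the unmodified upstream provider.
private[kafka010] class EventStreamsSourceProvider extends KafkaSourceProvider {
  override def shortName(): String = "eventstreams"
}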


/**
* Returns the name and schema of the source. In addition, it also verifies whether the options
@@ -70,9 +70,10 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
providerName: String,
parameters: Map[String, String]): (String, StructType) = {
val caseInsensitiveParameters = CaseInsensitiveMap(parameters)
val translatedKafkaParameters = translateEventStreamProperties(caseInsensitiveParameters)
validateStreamOptions(caseInsensitiveParameters)
require(schema.isEmpty, "Kafka source has a fixed schema and cannot be set with a custom one")
val includeHeaders = caseInsensitiveParameters.getOrElse(INCLUDE_HEADERS, "false").toBoolean
val includeHeaders = translatedKafkaParameters.getOrElse(INCLUDE_HEADERS, "false").toBoolean
(shortName(), KafkaRecordToRowConverter.kafkaSchema(includeHeaders))
}

@@ -88,8 +89,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
val uniqueGroupId = streamingUniqueGroupId(caseInsensitiveParameters, metadataPath)

val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveParameters)
val translatedKafkaParameters =
translateEventStreamProperties(caseInsensitiveParameters)
val specifiedKafkaParams = convertToSpecifiedParams(translatedKafkaParameters)

val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
caseInsensitiveParameters, STARTING_TIMESTAMP_OPTION_KEY,
@@ -128,7 +130,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
parameters: Map[String, String]): BaseRelation = {
val caseInsensitiveParameters = CaseInsensitiveMap(parameters)
validateBatchOptions(caseInsensitiveParameters)
val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveParameters)
val translatedKafkaParameters = translateEventStreamProperties(caseInsensitiveParameters)
val specifiedKafkaParams = convertToSpecifiedParams(translatedKafkaParameters)

val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
caseInsensitiveParameters, STARTING_TIMESTAMP_OPTION_KEY,
@@ -142,8 +145,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
LatestOffsetRangeLimit)
assert(endingRelationOffsets != EarliestOffsetRangeLimit)

checkOffsetLimitValidity(startingRelationOffsets, endingRelationOffsets)

val includeHeaders = caseInsensitiveParameters.getOrElse(INCLUDE_HEADERS, "false").toBoolean

new KafkaRelation(
@@ -357,6 +358,29 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
validateGeneralOptions(params)
}

private def translateEventStreamProperties(params: CaseInsensitiveMap[String])

Review comment: Method name should be translateEventStreamPropertiesToKafkaProperties

: CaseInsensitiveMap[String] = {
Author comment: Use reflection instead of import

if (params.contains(EVENTSTREAM_NAME_OPTION_KEY) ||
params.contains(EVENTSTREAM_ARTIFACT_ID_OPTION_KEY) ||
params.contains(EVENTSTREAM_CONSUMER_GROUP_OPTION_KEY)) {
validateEventStreamOptions(params)

Review comment: We should perform validation in the eventstream jar; we should make minimal changes in the Spark code base.

Review comment: In short, my suggestion is to remove this validation method from here.

return CaseInsensitiveMap(KafkaOptionsUtils.buildKafkaOptionsFromSparkConfig(params));

Review comment: Method name should be "EventStreamUtils"; KafkaOptionsUtils is a generic name that is more suitable for open-source Kafka connector utils.

}
params;
}
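A rough sketch of the reflection-based lookup suggested in the author comment above, so this file would carry no compile-time dependency on the eventstream jar; the class and method names mirror the import in this diff, while the helper name and parameter type are assumptions:

// Sketch only: resolve the helper at runtime instead of importing it at compile time.
private def buildKafkaOptionsViaReflection(
    params: CaseInsensitiveMap[String]): CaseInsensitiveMap[String] = {
  // A Scala object compiles to a class named "<name>$" with a static MODULE$ field.
  val clazz = Class.forName("EventStreamsKafkaConnector.KafkaOptionsUtils$")
  val module = clazz.getField("MODULE$").get(null)
  // Parameter type assumed to be Map[String, String]; adjust to the jar's real signature.
  val method = clazz.getMethod("buildKafkaOptionsFromSparkConfig", classOf[Map[String, String]])
  val translated = method.invoke(module, params).asInstanceOf[Map[String, String]]
  CaseInsensitiveMap(translated)
}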

private def validateEventStreamOptions(params: CaseInsensitiveMap[String]) = {
// Stream specific options
if (!params.contains(EVENTSTREAM_NAME_OPTION_KEY) ||
!params.contains(EVENTSTREAM_ARTIFACT_ID_OPTION_KEY) ||
!params.contains(EVENTSTREAM_CONSUMER_GROUP_OPTION_KEY)) {
throw new IllegalArgumentException(
"All threee EventStream properties - " +
"(eventstream.itemid, eventstream.name and eventstrream.conumergroup) " +
"are necessarry to run spark with eventstream configs ")
}
}
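For the suggestion above to keep validation out of the Spark code base, a hedged sketch of what the jar-side helper might look like; the package and object names follow the import in this diff, and the method body is purely illustrative (the real translation to kafka.* options is not shown):

package EventStreamsKafkaConnector

object KafkaOptionsUtils {
  // The three option keys introduced by this PR.
  private val RequiredKeys =
    Seq("eventstream.itemid", "eventstream.name", "eventstream.consumerGroup")

  // Validates the EventStream options, then (in the real jar) translates them
  // into Kafka options; here the input is returned unchanged as a placeholder.
  def buildKafkaOptionsFromSparkConfig(params: Map[String, String]): Map[String, String] = {
    val lowered = params.map { case (k, v) => k.toLowerCase -> v }
    val missing = RequiredKeys.filterNot(k => lowered.contains(k.toLowerCase))
    require(missing.isEmpty,
      s"All three EventStream properties are required; missing: ${missing.mkString(", ")}")
    params
  }
}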

private def validateBatchOptions(params: CaseInsensitiveMap[String]) = {
// Batch specific options
KafkaSourceProvider.getKafkaOffsetRangeLimit(
@@ -454,6 +478,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
override def toBatch(): Batch = {
val caseInsensitiveOptions = CaseInsensitiveMap(options.asScala.toMap)
validateBatchOptions(caseInsensitiveOptions)

Review comment: These new line changes should be removed

val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions)

val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
Expand All @@ -466,8 +491,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, ENDING_OFFSETS_OPTION_KEY,
LatestOffsetRangeLimit)

checkOffsetLimitValidity(startingRelationOffsets, endingRelationOffsets)

new KafkaBatch(
strategy(caseInsensitiveOptions),
caseInsensitiveOptions,
@@ -562,6 +585,9 @@ private[kafka010] object KafkaSourceProvider extends Logging {
private val SUBSCRIBE_PATTERN = "subscribepattern"
private val SUBSCRIBE = "subscribe"
private val STRATEGY_OPTION_KEYS = Set(SUBSCRIBE, SUBSCRIBE_PATTERN, ASSIGN)
private val EVENTSTREAM_ARTIFACT_ID_OPTION_KEY = "eventstream.itemid";
private val EVENTSTREAM_NAME_OPTION_KEY = "eventstream.name";
private val EVENTSTREAM_CONSUMER_GROUP_OPTION_KEY = "eventstream.consumerGroup";
private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
private[kafka010] val STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY = "startingoffsetsbytimestamp"
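A hypothetical end-to-end usage of the three new option keys, assuming the translation in this PR turns them into the usual Kafka bootstrap and authentication settings; the artifact id, name and consumer group values are placeholders:

import org.apache.spark.sql.SparkSession

object EventStreamReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("eventstream-read").getOrCreate()
    // The three options map to the EVENTSTREAM_*_OPTION_KEY constants above.
    val df = spark.readStream
      .format("kafka") // or the "eventstreams" alias, depending on how shortName() is settled
      .option("eventstream.itemid", "<eventstream-artifact-id>")
      .option("eventstream.name", "<eventstream-name>")
      .option("eventstream.consumerGroup", "<consumer-group>")
      .load()
    df.writeStream.format("console").start().awaitTermination()
  }
}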
@@ -613,60 +639,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
private val serClassName = classOf[ByteArraySerializer].getName
private val deserClassName = classOf[ByteArrayDeserializer].getName

def checkStartOffsetNotGreaterThanEndOffset(

Review comment: Maybe I am missing something, but why are these methods removed?

startOffset: Long,
endOffset: Long,
topicPartition: TopicPartition,
exception: (Long, Long, TopicPartition) => Exception): Unit = {
// earliest or latest offsets are negative and should not be compared
if (startOffset > endOffset && startOffset >= 0 && endOffset >= 0) {
throw exception(startOffset, endOffset, topicPartition)
}
}

def checkOffsetLimitValidity(
startOffset: KafkaOffsetRangeLimit,
endOffset: KafkaOffsetRangeLimit): Unit = {
startOffset match {
case start: SpecificOffsetRangeLimit if endOffset.isInstanceOf[SpecificOffsetRangeLimit] =>
val end = endOffset.asInstanceOf[SpecificOffsetRangeLimit]
if (start.partitionOffsets.keySet != end.partitionOffsets.keySet) {
throw KafkaExceptions.unmatchedTopicPartitionsBetweenOffsets(
start.partitionOffsets.keySet, end.partitionOffsets.keySet
)
}
start.partitionOffsets.foreach {
case (tp, startOffset) =>
checkStartOffsetNotGreaterThanEndOffset(
startOffset,
end.partitionOffsets(tp),
tp,
KafkaExceptions.unresolvedStartOffsetGreaterThanEndOffset
)
}

case start: SpecificTimestampRangeLimit
if endOffset.isInstanceOf[SpecificTimestampRangeLimit] =>
val end = endOffset.asInstanceOf[SpecificTimestampRangeLimit]
if (start.topicTimestamps.keySet != end.topicTimestamps.keySet) {
throw KafkaExceptions.unmatchedTopicPartitionsBetweenOffsets(
start.topicTimestamps.keySet, end.topicTimestamps.keySet
)
}
start.topicTimestamps.foreach {
case (tp, startOffset) =>
checkStartOffsetNotGreaterThanEndOffset(
startOffset,
end.topicTimestamps(tp),
tp,
KafkaExceptions.unresolvedStartTimestampGreaterThanEndTimestamp
)
}

case _ => // do nothing
}
}

def getKafkaOffsetRangeLimit(
params: CaseInsensitiveMap[String],
globalOffsetTimestampOptionKey: String,