-
Notifications
You must be signed in to change notification settings - Fork 28.6k
Fabric Spark - EH Connector #51211
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Fabric Spark - EH Connector #51211
Conversation
@@ -58,7 +58,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister | |||
with Logging { | |||
import KafkaSourceProvider._ | |||
|
|||
override def shortName(): String = "kafka" | |||
override def shortName(): String = "eventstreams" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will fix this
@@ -357,6 +358,29 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister | |||
validateGeneralOptions(params) | |||
} | |||
|
|||
private def translateEventStreamProperties(params: CaseInsensitiveMap[String]) | |||
: CaseInsensitiveMap[String] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use reflection instead of import
@@ -357,6 +358,29 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister | |||
validateGeneralOptions(params) | |||
} | |||
|
|||
private def translateEventStreamProperties(params: CaseInsensitiveMap[String]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Method name should be translateEventStreamPropertiesToKafkaProperties
if (params.contains(EVENTSTREAM_NAME_OPTION_KEY) || | ||
params.contains(EVENTSTREAM_ARTIFACT_ID_OPTION_KEY) || | ||
params.contains(EVENTSTREAM_CONSUMER_GROUP_OPTION_KEY)) { | ||
validateEventStreamOptions(params) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should perform validation in eventstream jar, we should make minimal changes in spark code base
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In short, my suggestion is to remove this validation method from here
params.contains(EVENTSTREAM_ARTIFACT_ID_OPTION_KEY) || | ||
params.contains(EVENTSTREAM_CONSUMER_GROUP_OPTION_KEY)) { | ||
validateEventStreamOptions(params) | ||
return CaseInsensitiveMap(KafkaOptionsUtils.buildKafkaOptionsFromSparkConfig(params)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Method name should be "EventStreamUtils", KafkaOptionsUtils is a generic name which is more suitable for open source kafka connector utils
@@ -454,6 +478,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister | |||
override def toBatch(): Batch = { | |||
val caseInsensitiveOptions = CaseInsensitiveMap(options.asScala.toMap) | |||
validateBatchOptions(caseInsensitiveOptions) | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These new line changes should be removed
@@ -613,60 +639,6 @@ private[kafka010] object KafkaSourceProvider extends Logging { | |||
private val serClassName = classOf[ByteArraySerializer].getName | |||
private val deserClassName = classOf[ByteArrayDeserializer].getName | |||
|
|||
def checkStartOffsetNotGreaterThanEndOffset( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May be I am missing something, but why are these methods removed ?
import org.apache.kafka.clients.consumer.ConsumerConfig | ||
import org.apache.kafka.clients.producer.ProducerConfig | ||
import org.apache.kafka.common.TopicPartition |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this import removed ?
Add translate function for eventstream properties to kafka properties
Why are the changes needed?
Does this PR introduce any user-facing change?
How was this patch tested?
Was this patch authored or co-authored using generative AI tooling?