Skip to content

Fabric Spark - EH Connector #51211

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

Vikram-Atreya3
Copy link

Add translate function for eventstream properties to kafka properties

Why are the changes needed?

Does this PR introduce any user-facing change?

How was this patch tested?

Was this patch authored or co-authored using generative AI tooling?

@@ -58,7 +58,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
with Logging {
import KafkaSourceProvider._

override def shortName(): String = "kafka"
override def shortName(): String = "eventstreams"
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix this

@@ -357,6 +358,29 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
validateGeneralOptions(params)
}

private def translateEventStreamProperties(params: CaseInsensitiveMap[String])
: CaseInsensitiveMap[String] = {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use reflection instead of import

@@ -357,6 +358,29 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
validateGeneralOptions(params)
}

private def translateEventStreamProperties(params: CaseInsensitiveMap[String])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method name should be translateEventStreamPropertiesToKafkaProperties

if (params.contains(EVENTSTREAM_NAME_OPTION_KEY) ||
params.contains(EVENTSTREAM_ARTIFACT_ID_OPTION_KEY) ||
params.contains(EVENTSTREAM_CONSUMER_GROUP_OPTION_KEY)) {
validateEventStreamOptions(params)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should perform validation in eventstream jar, we should make minimal changes in spark code base

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In short, my suggestion is to remove this validation method from here

params.contains(EVENTSTREAM_ARTIFACT_ID_OPTION_KEY) ||
params.contains(EVENTSTREAM_CONSUMER_GROUP_OPTION_KEY)) {
validateEventStreamOptions(params)
return CaseInsensitiveMap(KafkaOptionsUtils.buildKafkaOptionsFromSparkConfig(params));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method name should be "EventStreamUtils", KafkaOptionsUtils is a generic name which is more suitable for open source kafka connector utils

@@ -454,6 +478,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
override def toBatch(): Batch = {
val caseInsensitiveOptions = CaseInsensitiveMap(options.asScala.toMap)
validateBatchOptions(caseInsensitiveOptions)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These new line changes should be removed

@@ -613,60 +639,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
private val serClassName = classOf[ByteArraySerializer].getName
private val deserClassName = classOf[ByteArrayDeserializer].getName

def checkStartOffsetNotGreaterThanEndOffset(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be I am missing something, but why are these methods removed ?

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.TopicPartition

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this import removed ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants