Skip to content

Commit 1b3c6f5

Browse files
authored
Support HTTPS downgrade when not using the proxy. Add support for wildcard DNS support for no proxy hosts. (#749) (#751)
1 parent e7f8c1d commit 1b3c6f5

File tree

6 files changed

+384
-44
lines changed

6 files changed

+384
-44
lines changed

client/src/main/scala/io/delta/sharing/client/DeltaSharingFileSystem.scala

Lines changed: 7 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -50,66 +50,30 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem with Logging {
5050
private[sharing] def createHttpClient() = {
5151
val proxyConfigOpt = ConfUtils.getProxyConfig(getConf)
5252
val maxConnections = ConfUtils.maxConnections(getConf)
53+
val neverUseHttps = ConfUtils.getNeverUseHttps(getConf)
5354
val config = RequestConfig.custom()
5455
.setConnectTimeout(timeoutInSeconds * 1000)
5556
.setConnectionRequestTimeout(timeoutInSeconds * 1000)
5657
.setSocketTimeout(timeoutInSeconds * 1000).build()
5758

5859
logDebug(s"Creating delta sharing httpClient with timeoutInSeconds: $timeoutInSeconds.")
59-
val clientBuilder = HttpClientBuilder.create()
60+
val clientBuilder = DeltaSharingFileSystemHttpClientBuilder.create()
6061
.setMaxConnTotal(maxConnections)
6162
.setMaxConnPerRoute(maxConnections)
6263
.setDefaultRequestConfig(config)
6364
// Disable the default retry behavior because we have our own retry logic.
6465
// See `RetryUtils.runWithExponentialBackoff`.
6566
.disableAutomaticRetries()
6667

68+
if (neverUseHttps) {
69+
clientBuilder.setDisableHttps()
70+
}
71+
6772
// Set proxy if provided.
6873
proxyConfigOpt.foreach { proxyConfig =>
69-
7074
val proxy = new HttpHost(proxyConfig.host, proxyConfig.port)
7175
clientBuilder.setProxy(proxy)
72-
73-
val neverUseHttps = ConfUtils.getNeverUseHttps(getConf)
74-
if (neverUseHttps) {
75-
val httpRequestDowngradeExecutor = new HttpRequestExecutor {
76-
override def execute(
77-
request: HttpRequest,
78-
connection: HttpClientConnection,
79-
context: HttpContext): HttpResponse = {
80-
try {
81-
val modifiedUri: URI = {
82-
new URIBuilder(request.getRequestLine.getUri).setScheme("http").build()
83-
}
84-
val wrappedRequest = new RequestWrapper(request)
85-
wrappedRequest.setURI(modifiedUri)
86-
87-
return super.execute(wrappedRequest, connection, context)
88-
} catch {
89-
case e: Exception =>
90-
logInfo("Failed to downgrade the request to http", e)
91-
}
92-
super.execute(request, connection, context)
93-
}
94-
}
95-
clientBuilder.setRequestExecutor(httpRequestDowngradeExecutor)
96-
}
97-
if (proxyConfig.noProxyHosts.nonEmpty || neverUseHttps) {
98-
val routePlanner = new DefaultRoutePlanner(DefaultSchemePortResolver.INSTANCE) {
99-
override def determineRoute(target: HttpHost,
100-
request: HttpRequest,
101-
context: HttpContext): HttpRoute = {
102-
if (proxyConfig.noProxyHosts.contains(target.getHostName)) {
103-
// Direct route (no proxy)
104-
new HttpRoute(target)
105-
} else {
106-
// Route via proxy
107-
new HttpRoute(target, proxy)
108-
}
109-
}
110-
}
111-
clientBuilder.setRoutePlanner(routePlanner)
112-
}
76+
clientBuilder.setNoProxyHosts(proxyConfig.noProxyHosts)
11377
}
11478
clientBuilder.build()
11579
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright (2021) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.sharing.client
18+
19+
import java.net.URI
20+
21+
import org.apache.http.{HttpHost, HttpRequest}
22+
import org.apache.http.client.methods.{CloseableHttpResponse, HttpRequestWrapper}
23+
import org.apache.http.client.utils.URIBuilder
24+
import org.apache.http.conn.ClientConnectionManager
25+
import org.apache.http.impl.client.CloseableHttpClient
26+
import org.apache.http.params.HttpParams
27+
import org.apache.http.protocol.HttpContext
28+
29+
/**
 * An HTTP client that dispatches every request to one of two underlying clients.
 *
 * @param noProxyHttpClient client used for requests that are sent directly to the target
 * @param proxyHttpClient   client used for requests that are sent through the proxy
 * @param useProxy          whether a proxy is configured at all; when false every request
 *                          goes to `noProxyHttpClient`
 * @param noProxyHosts      host names that bypass the proxy; an entry starting with "*."
 *                          is treated as a wildcard matching any sub-domain
 * @param disableHttps      when true, requests whose target scheme is "https" are rewritten
 *                          to "http" before being executed
 */
private[sharing] case class DeltaSharingFileSystemHttpClient(
    noProxyHttpClient: CloseableHttpClient,
    proxyHttpClient: CloseableHttpClient,
    useProxy: Boolean,
    noProxyHosts: Seq[String],
    disableHttps: Boolean) extends CloseableHttpClient {

  /**
   * Returns true when `host` matches at least one entry of `noProxyHosts`.
   *
   * Host names are compared case-insensitively because DNS names are case-insensitive.
   * A wildcard entry such as "*.example.com" matches "api.example.com" and
   * "deep.nested.example.com", but not "example.com" itself nor "notexample.com".
   *
   * @param host the host name of the request target
   */
  private[sharing] def hasNoProxyHostsMatch(host: String): Boolean = {
    noProxyHosts.exists { record =>
      if (record.startsWith("*.")) {
        // Keep the leading dot of the suffix so that only real sub-domains match.
        val suffix = record.drop(1)
        // regionMatches returns false (rather than throwing) when the offset is negative,
        // i.e. when the host is shorter than the suffix.
        host.regionMatches(
          /* ignoreCase = */ true, host.length - suffix.length, suffix, 0, suffix.length)
      } else {
        host.equalsIgnoreCase(record)
      }
    }
  }

  /**
   * Optionally downgrades the request from https to http, then executes it on the proxy
   * client or on the direct client depending on `useProxy` and `noProxyHosts`.
   */
  override protected def doExecute(
      target: HttpHost,
      request: HttpRequest,
      context: HttpContext
  ): CloseableHttpResponse = {
    val (updatedTarget, updatedRequest) = if (disableHttps && target.getSchemeName == "https") {
      val modifiedUri: URI = new URIBuilder(request.getRequestLine.getUri).setScheme("http").build()
      val wrappedRequest = HttpRequestWrapper.wrap(request)
      wrappedRequest.setURI(modifiedUri)
      // Host name and port are kept as is; only the scheme changes.
      (new HttpHost(target.getHostName, target.getPort, "http"), wrappedRequest)
    } else {
      (target, request)
    }
    if (useProxy && !hasNoProxyHostsMatch(target.getHostName)) {
      proxyHttpClient.execute(updatedTarget, updatedRequest, context)
    } else {
      noProxyHttpClient.execute(updatedTarget, updatedRequest, context)
    }
  }

  /**
   * Closes both underlying clients. The proxy client is closed even when closing the
   * direct client throws, so that its connection pool is never leaked.
   */
  override def close(): Unit = {
    try {
      noProxyHttpClient.close()
    } finally {
      proxyHttpClient.close()
    }
  }

  // This is deprecated since 4.3, so overriding with a dummy implementation
  override def getParams(): HttpParams = {
    throw new UnsupportedOperationException("getParams")
  }

  // This is deprecated since 4.3, so overriding with a dummy implementation
  override def getConnectionManager(): ClientConnectionManager = {
    throw new UnsupportedOperationException("getConnectionManager")
  }
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright (2021) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.sharing.client
18+
19+
import org.apache.http.HttpHost
20+
import org.apache.http.client.config.RequestConfig
21+
import org.apache.http.impl.client.HttpClientBuilder
22+
23+
/**
 * Builder for [[DeltaSharingFileSystemHttpClient]].
 *
 * It keeps two Apache `HttpClientBuilder`s, one for direct connections and one for proxied
 * connections. Common settings are applied to both; the proxy itself is only set on the
 * proxied builder.
 */
private[sharing] class DeltaSharingFileSystemHttpClientBuilder {
  // The delegate builders are mutated in place and never reassigned.
  private val noProxyHttpClientClientBuilder: HttpClientBuilder = HttpClientBuilder.create()
  private val proxyHttpClientClientBuilder: HttpClientBuilder = HttpClientBuilder.create()
  private var useProxy: Boolean = false
  private var noProxyHosts: Seq[String] = Seq.empty
  private var disableHttps: Boolean = false

  /** Sets the maximum total number of connections on both underlying builders. */
  def setMaxConnTotal(maxConnTotal: Int): DeltaSharingFileSystemHttpClientBuilder = {
    noProxyHttpClientClientBuilder.setMaxConnTotal(maxConnTotal)
    proxyHttpClientClientBuilder.setMaxConnTotal(maxConnTotal)
    this
  }

  /** Sets the maximum number of connections per route on both underlying builders. */
  def setMaxConnPerRoute(maxConnPerRoute: Int): DeltaSharingFileSystemHttpClientBuilder = {
    noProxyHttpClientClientBuilder.setMaxConnPerRoute(maxConnPerRoute)
    proxyHttpClientClientBuilder.setMaxConnPerRoute(maxConnPerRoute)
    this
  }

  /** Sets the default request configuration on both underlying builders. */
  def setDefaultRequestConfig(config: RequestConfig): DeltaSharingFileSystemHttpClientBuilder = {
    noProxyHttpClientClientBuilder.setDefaultRequestConfig(config)
    proxyHttpClientClientBuilder.setDefaultRequestConfig(config)
    this
  }

  /** Disables automatic retries on both underlying builders. */
  def disableAutomaticRetries(): DeltaSharingFileSystemHttpClientBuilder = {
    noProxyHttpClientClientBuilder.disableAutomaticRetries()
    proxyHttpClientClientBuilder.disableAutomaticRetries()
    this
  }

  /** Sets the proxy on the proxied builder only and marks the built client as using a proxy. */
  def setProxy(proxy: HttpHost): DeltaSharingFileSystemHttpClientBuilder = {
    proxyHttpClientClientBuilder.setProxy(proxy)
    useProxy = true
    this
  }

  /** Sets the host names that the built client will route around the proxy. */
  def setNoProxyHosts(noProxyHosts: Seq[String]): DeltaSharingFileSystemHttpClientBuilder = {
    this.noProxyHosts = noProxyHosts
    this
  }

  /** Makes the built client downgrade https requests to http. */
  def setDisableHttps(): DeltaSharingFileSystemHttpClientBuilder = {
    disableHttps = true
    this
  }

  /**
   * Builds both underlying clients and wraps them, together with the routing settings
   * collected so far, in a [[DeltaSharingFileSystemHttpClient]].
   */
  def build(): DeltaSharingFileSystemHttpClient = {
    DeltaSharingFileSystemHttpClient(
      noProxyHttpClient = noProxyHttpClientClientBuilder.build(),
      proxyHttpClient = proxyHttpClientClientBuilder.build(),
      useProxy = useProxy,
      noProxyHosts = noProxyHosts,
      disableHttps = disableHttps
    )
  }
}
80+
81+
/** Factory entry point for the builder class of the same name. */
private[sharing] object DeltaSharingFileSystemHttpClientBuilder {
  /** Returns a freshly constructed builder. */
  def create(): DeltaSharingFileSystemHttpClientBuilder =
    new DeltaSharingFileSystemHttpClientBuilder()
}

client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ object ConfUtils {
4848
val TIMEOUT_CONF = "spark.delta.sharing.network.timeout"
4949
val TIMEOUT_DEFAULT = "320s"
5050

51+
// Note: There is a separate pool for proxy and non-proxy connections.
52+
// Thus, if you use both spark.delta.sharing.network.proxyHost and
53+
// spark.delta.sharing.network.noProxyHosts,
54+
// the max number of connections will be 2 * spark.delta.sharing.network.maxConnections.
5155
val MAX_CONNECTION_CONF = "spark.delta.sharing.network.maxConnections"
5256
val MAX_CONNECTION_DEFAULT = 64
5357

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright (2021) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.sharing.client
18+
19+
import org.apache.http.impl.client.HttpClients
20+
import org.apache.spark.SparkFunSuite
21+
22+
class DeltaSharingFileSystemHttpClientSuite extends SparkFunSuite {
  /**
   * Helper method to create a test DeltaSharingFileSystemHttpClient. The same real default
   * Apache client backs both the proxy and the no-proxy side; no request is ever sent.
   */
  private def createTestHttpClient(
      noProxyHosts: Seq[String] = Seq.empty): DeltaSharingFileSystemHttpClient = {
    val underlyingHttpClient = HttpClients.createDefault()
    DeltaSharingFileSystemHttpClient(
      noProxyHttpClient = underlyingHttpClient,
      proxyHttpClient = underlyingHttpClient,
      useProxy = true,
      noProxyHosts = noProxyHosts,
      disableHttps = false
    )
  }

  /**
   * Runs `body` with a test client configured with `noProxyHosts` and always closes the
   * client afterwards, so the tests do not leak connection managers.
   */
  private def withTestHttpClient(noProxyHosts: Seq[String])(
      body: DeltaSharingFileSystemHttpClient => Unit): Unit = {
    val client = createTestHttpClient(noProxyHosts)
    try {
      body(client)
    } finally {
      client.close()
    }
  }

  test("hasNoProxyHostsMatch - exact host matches") {
    withTestHttpClient(Seq("example.com", "localhost", "127.0.0.1")) { client =>
      assert(client.hasNoProxyHostsMatch("example.com"))
      assert(client.hasNoProxyHostsMatch("localhost"))
      assert(client.hasNoProxyHostsMatch("127.0.0.1"))
      assert(!client.hasNoProxyHostsMatch("different.com"))
      assert(!client.hasNoProxyHostsMatch("sub.example.com"))
    }
  }

  test("hasNoProxyHostsMatch - wildcard DNS matches") {
    withTestHttpClient(Seq("*.example.com", "*.internal")) { client =>
      // Should match wildcard patterns
      assert(client.hasNoProxyHostsMatch("api.example.com"))
      assert(client.hasNoProxyHostsMatch("sub.example.com"))
      assert(client.hasNoProxyHostsMatch("deep.nested.example.com"))
      assert(client.hasNoProxyHostsMatch("service.internal"))

      // Should NOT match the wildcard domain itself
      assert(!client.hasNoProxyHostsMatch("example.com"))
      assert(!client.hasNoProxyHostsMatch("internal"))

      // Should NOT match different domains
      assert(!client.hasNoProxyHostsMatch("example.org"))
      assert(!client.hasNoProxyHostsMatch("notexample.com"))
      assert(!client.hasNoProxyHostsMatch("external"))
    }
  }

  test("hasNoProxyHostsMatch - empty noProxyHosts list") {
    withTestHttpClient(Seq.empty) { client =>
      assert(!client.hasNoProxyHostsMatch("example.com"))
      assert(!client.hasNoProxyHostsMatch("localhost"))
      assert(!client.hasNoProxyHostsMatch("127.0.0.1"))
    }
  }

  test("hasNoProxyHostsMatch - edge cases") {
    // These edge cases may not be correct, but for simplicity,
    // we will allow some of these behaviors.
    withTestHttpClient(Seq("*.", "*.168.1.1")) { client =>
      assert(!client.hasNoProxyHostsMatch("example.com"))
      assert(client.hasNoProxyHostsMatch("."))
      assert(client.hasNoProxyHostsMatch("192.168.1.1"))
    }
  }
}

0 commit comments

Comments
 (0)