Skip to content

Commit 33ace75

Browse files
authored
Loom Support (square#7367)
1 parent 4ae6ce4 commit 33ace75

File tree

13 files changed

+152
-62
lines changed

13 files changed

+152
-62
lines changed

gradle.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ kotlin.js.compiler=ir
88
kotlin.incremental.js.ir=true
99
androidBuild=false
1010
graalBuild=false
11+
loomBuild=false

okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,21 @@
1616
package okhttp3
1717

1818
import android.annotation.SuppressLint
19+
import java.util.concurrent.ThreadFactory
1920
import java.util.concurrent.TimeUnit
2021
import java.util.logging.Handler
2122
import java.util.logging.Level
2223
import java.util.logging.LogManager
2324
import java.util.logging.LogRecord
2425
import java.util.logging.Logger
26+
import okhttp3.internal.buildConnectionPool
2527
import okhttp3.internal.concurrent.TaskRunner
2628
import okhttp3.internal.connection.RealConnectionPool
2729
import okhttp3.internal.http2.Http2
30+
import okhttp3.internal.taskRunnerInternal
2831
import okhttp3.testing.Flaky
32+
import okhttp3.testing.PlatformRule.Companion.LOOM_PROPERTY
33+
import okhttp3.testing.PlatformRule.Companion.getPlatformSystemProperty
2934
import org.junit.jupiter.api.Assertions.assertEquals
3035
import org.junit.jupiter.api.Assertions.fail
3136
import org.junit.jupiter.api.extension.AfterEachCallback
@@ -116,10 +121,9 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback {
116121
fun newClient(): OkHttpClient {
117122
var client = testClient
118123
if (client == null) {
119-
client = OkHttpClient.Builder()
124+
client = initialClientBuilder()
120125
.dns(SINGLE_INET_ADDRESS_DNS) // Prevent unexpected fallback addresses.
121126
.eventListenerFactory { ClientRuleEventListener(logger = ::addEvent) }
122-
.connectionPool(ConnectionPool(connectionListener = connectionListener))
123127
.build()
124128
connectionListener.forbidLock(RealConnectionPool.get(client.connectionPool))
125129
connectionListener.forbidLock(client.dispatcher)
@@ -128,6 +132,29 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback {
128132
return client
129133
}
130134

135+
private fun initialClientBuilder(): OkHttpClient.Builder = if (isLoom()) {
136+
val backend = TaskRunner.RealBackend(loomThreadFactory())
137+
val taskRunner = TaskRunner(backend)
138+
139+
OkHttpClient.Builder()
140+
.connectionPool(buildConnectionPool(connectionListener = connectionListener, taskRunner = taskRunner))
141+
.dispatcher(Dispatcher(backend.executor))
142+
.taskRunnerInternal(taskRunner)
143+
} else {
144+
OkHttpClient.Builder()
145+
.connectionPool(ConnectionPool(connectionListener = connectionListener))
146+
}
147+
148+
private fun loomThreadFactory(): ThreadFactory {
149+
val ofVirtual = Thread::class.java.getMethod("ofVirtual").invoke(null)
150+
151+
return Class.forName("java.lang.Thread\$Builder").getMethod("factory").invoke(ofVirtual) as ThreadFactory
152+
}
153+
154+
private fun isLoom(): Boolean {
155+
return getPlatformSystemProperty() == LOOM_PROPERTY
156+
}
157+
131158
fun newClientBuilder(): OkHttpClient.Builder {
132159
return newClient().newBuilder()
133160
}

okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import java.util.concurrent.Semaphore
2525
import java.util.concurrent.TimeUnit
2626
import java.util.concurrent.atomic.AtomicBoolean
2727
import java.util.logging.Logger
28+
import kotlin.concurrent.withLock
2829

2930
/**
3031
* Runs a [TaskRunner] in a controlled environment so that everything is sequential and
@@ -47,24 +48,18 @@ import java.util.logging.Logger
4748
class TaskFaker : Closeable {
4849
@Suppress("NOTHING_TO_INLINE")
4950
internal inline fun Any.assertThreadHoldsLock() {
50-
if (assertionsEnabled && !Thread.holdsLock(this)) {
51+
if (assertionsEnabled && !taskRunner.lock.isHeldByCurrentThread) {
5152
throw AssertionError("Thread ${Thread.currentThread().name} MUST hold lock on $this")
5253
}
5354
}
5455

5556
@Suppress("NOTHING_TO_INLINE")
5657
internal inline fun Any.assertThreadDoesntHoldLock() {
57-
if (assertionsEnabled && Thread.holdsLock(this)) {
58+
if (assertionsEnabled && taskRunner.lock.isHeldByCurrentThread) {
5859
throw AssertionError("Thread ${Thread.currentThread().name} MUST NOT hold lock on $this")
5960
}
6061
}
6162

62-
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "NOTHING_TO_INLINE")
63-
internal inline fun Any.wait() = (this as Object).wait()
64-
65-
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "NOTHING_TO_INLINE")
66-
internal inline fun Any.notifyAll() = (this as Object).notifyAll()
67-
6863
val logger = Logger.getLogger("TaskFaker." + instance++)
6964

7065
/** Though this executor service may hold many threads, they are not executed concurrently. */
@@ -108,9 +103,9 @@ class TaskFaker : Closeable {
108103
val acquiredTaskRunnerLock = AtomicBoolean()
109104

110105
tasksExecutor.execute {
111-
synchronized(taskRunner) {
106+
taskRunner.lock.withLock {
112107
acquiredTaskRunnerLock.set(true)
113-
taskRunner.notifyAll()
108+
taskRunner.condition.signalAll()
114109

115110
tasksRunningCount++
116111
if (tasksRunningCount > 1) isParallel = true
@@ -130,7 +125,7 @@ class TaskFaker : Closeable {
130125

131126
// Execute() must not return until the launched task stalls.
132127
while (!acquiredTaskRunnerLock.get()) {
133-
taskRunner.wait()
128+
taskRunner.condition.await()
134129
}
135130
}
136131

@@ -141,7 +136,7 @@ class TaskFaker : Closeable {
141136
check(waitingCoordinatorThread != null)
142137

143138
stalledTasks.remove(waitingCoordinatorThread)
144-
taskRunner.notifyAll()
139+
taskRunner.condition.signalAll()
145140
}
146141

147142
override fun coordinatorWait(taskRunner: TaskRunner, nanos: Long) {
@@ -170,7 +165,7 @@ class TaskFaker : Closeable {
170165
stalledTasks += currentThread
171166
try {
172167
while (currentThread in stalledTasks) {
173-
taskRunner.wait()
168+
taskRunner.condition.await()
174169
}
175170
} catch (e: InterruptedException) {
176171
stalledTasks.remove(currentThread)
@@ -182,7 +177,7 @@ class TaskFaker : Closeable {
182177
taskRunner.assertThreadHoldsLock()
183178

184179
stalledTasks.clear()
185-
taskRunner.notifyAll()
180+
taskRunner.condition.signalAll()
186181
}
187182

188183
/** Runs all tasks that are ready. Used by the test thread only. */
@@ -194,7 +189,7 @@ class TaskFaker : Closeable {
194189
fun advanceUntil(newTime: Long) {
195190
taskRunner.assertThreadDoesntHoldLock()
196191

197-
synchronized(taskRunner) {
192+
taskRunner.lock.withLock {
198193
isRunningAllTasks = true
199194
nanoTime = newTime
200195
unstallTasks()
@@ -207,7 +202,7 @@ class TaskFaker : Closeable {
207202
taskRunner.assertThreadDoesntHoldLock()
208203

209204
while (true) {
210-
synchronized(taskRunner) {
205+
taskRunner.lock.withLock {
211206
if (tasksRunningCount == stalledTasks.size) {
212207
isRunningAllTasks = false
213208
return@waitForTasksToStall // All stalled.
@@ -222,7 +217,7 @@ class TaskFaker : Closeable {
222217
fun assertNoMoreTasks() {
223218
taskRunner.assertThreadDoesntHoldLock()
224219

225-
synchronized(taskRunner) {
220+
taskRunner.lock.withLock {
226221
assertThat(stalledTasks).isEmpty()
227222
}
228223
}
@@ -234,7 +229,7 @@ class TaskFaker : Closeable {
234229
// Make sure the coordinator is ready to be interrupted.
235230
runTasks()
236231

237-
synchronized(taskRunner) {
232+
taskRunner.lock.withLock {
238233
val toInterrupt = waitingCoordinatorThread ?: error("no thread currently waiting")
239234
taskBecameStalled.drainPermits()
240235
toInterrupt.interrupt()
@@ -247,10 +242,10 @@ class TaskFaker : Closeable {
247242
fun runNextTask() {
248243
taskRunner.assertThreadDoesntHoldLock()
249244

250-
synchronized(taskRunner) {
245+
taskRunner.lock.withLock {
251246
check(stalledTasks.size >= 1) { "no tasks to run" }
252247
stalledTasks.removeFirst()
253-
taskRunner.notifyAll()
248+
taskRunner.condition.signalAll()
254249
}
255250

256251
waitForTasksToStall()

okhttp/api/okhttp.api

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,8 +378,8 @@ public final class okhttp3/ConnectionListener$Companion {
378378
public final class okhttp3/ConnectionPool {
379379
public fun <init> ()V
380380
public fun <init> (IJLjava/util/concurrent/TimeUnit;)V
381-
public fun <init> (IJLjava/util/concurrent/TimeUnit;Lokhttp3/ConnectionListener;)V
382-
public synthetic fun <init> (IJLjava/util/concurrent/TimeUnit;Lokhttp3/ConnectionListener;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
381+
public fun <init> (Lokhttp3/ConnectionListener;IJLjava/util/concurrent/TimeUnit;)V
382+
public synthetic fun <init> (Lokhttp3/ConnectionListener;IJLjava/util/concurrent/TimeUnit;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
383383
public final fun connectionCount ()I
384384
public final fun evictAll ()V
385385
public final fun idleConnectionCount ()I

okhttp/src/jvmMain/kotlin/okhttp3/Cache.kt

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import java.security.cert.CertificateEncodingException
2424
import java.security.cert.CertificateException
2525
import java.security.cert.CertificateFactory
2626
import java.util.TreeSet
27+
import java.util.concurrent.TimeUnit
2728
import okhttp3.HttpUrl.Companion.toHttpUrlOrNull
2829
import okhttp3.MediaType.Companion.toMediaTypeOrNull
2930
import okhttp3.internal.EMPTY_HEADERS
@@ -32,6 +33,7 @@ import okhttp3.internal.cache.CacheStrategy
3233
import okhttp3.internal.cache.DiskLruCache
3334
import okhttp3.internal.closeQuietly
3435
import okhttp3.internal.concurrent.TaskRunner
36+
import okhttp3.internal.connection.RealConnectionPool
3537
import okhttp3.internal.http.HttpMethod
3638
import okhttp3.internal.http.StatusLine
3739
import okhttp3.internal.platform.Platform
@@ -141,18 +143,30 @@ import okio.buffer
141143
*
142144
* [rfc_7234]: http://tools.ietf.org/html/rfc7234
143145
*/
144-
class Cache(
146+
class Cache internal constructor(
145147
directory: Path,
146148
maxSize: Long,
147-
fileSystem: FileSystem
149+
fileSystem: FileSystem,
150+
taskRunner: TaskRunner
148151
) : Closeable, Flushable {
152+
constructor(
153+
directory: Path,
154+
maxSize: Long,
155+
fileSystem: FileSystem,
156+
) : this(
157+
directory,
158+
maxSize,
159+
fileSystem,
160+
TaskRunner.INSTANCE
161+
)
162+
149163
internal val cache = DiskLruCache(
150164
fileSystem = fileSystem,
151165
directory = directory,
152166
appVersion = VERSION,
153167
valueCount = ENTRY_COUNT,
154168
maxSize = maxSize,
155-
taskRunner = TaskRunner.INSTANCE
169+
taskRunner = taskRunner
156170
)
157171

158172
// read and write statistics, all guarded by 'this'.

okhttp/src/jvmMain/kotlin/okhttp3/ConnectionPool.kt

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,30 +33,45 @@ import okhttp3.internal.connection.RealConnectionPool
3333
class ConnectionPool internal constructor(
3434
internal val delegate: RealConnectionPool
3535
) {
36-
constructor(
37-
maxIdleConnections: Int,
38-
keepAliveDuration: Long,
39-
timeUnit: TimeUnit
36+
internal constructor(
37+
maxIdleConnections: Int = 5,
38+
keepAliveDuration: Long = 5,
39+
timeUnit: TimeUnit = TimeUnit.MINUTES,
40+
taskRunner: TaskRunner = TaskRunner.INSTANCE,
41+
connectionListener: ConnectionListener = ConnectionListener.NONE,
4042
) : this(RealConnectionPool(
41-
taskRunner = TaskRunner.INSTANCE,
43+
taskRunner = taskRunner,
4244
maxIdleConnections = maxIdleConnections,
4345
keepAliveDuration = keepAliveDuration,
4446
timeUnit = timeUnit,
45-
connectionListener = ConnectionListener.NONE
47+
connectionListener = connectionListener
4648
))
4749

4850
constructor(
51+
connectionListener: ConnectionListener = ConnectionListener.NONE,
4952
maxIdleConnections: Int = 5,
5053
keepAliveDuration: Long = 5,
5154
timeUnit: TimeUnit = TimeUnit.MINUTES,
52-
connectionListener: ConnectionListener = ConnectionListener.NONE
53-
) : this(RealConnectionPool(
55+
) : this(
5456
taskRunner = TaskRunner.INSTANCE,
5557
maxIdleConnections = maxIdleConnections,
5658
keepAliveDuration = keepAliveDuration,
5759
timeUnit = timeUnit,
5860
connectionListener = connectionListener
59-
))
61+
)
62+
63+
// Public API
64+
constructor(
65+
maxIdleConnections: Int,
66+
keepAliveDuration: Long,
67+
timeUnit: TimeUnit,
68+
) : this(
69+
maxIdleConnections = maxIdleConnections,
70+
keepAliveDuration = keepAliveDuration,
71+
timeUnit = timeUnit,
72+
taskRunner = TaskRunner.INSTANCE,
73+
connectionListener = ConnectionListener.NONE
74+
)
6075

6176
constructor() : this(5, 5, TimeUnit.MINUTES)
6277

okhttp/src/jvmMain/kotlin/okhttp3/OkHttpClient.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -718,6 +718,10 @@ open class OkHttpClient internal constructor(
718718
this.cache = cache
719719
}
720720

721+
internal fun taskRunner(taskRunner: TaskRunner) = apply {
722+
this.taskRunner = taskRunner
723+
}
724+
721725
/**
722726
* Sets the DNS service used to lookup IP addresses for hostnames.
723727
*

okhttp/src/jvmMain/kotlin/okhttp3/internal/-UtilJvm.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import java.util.Locale
2727
import java.util.TimeZone
2828
import java.util.concurrent.ThreadFactory
2929
import java.util.concurrent.TimeUnit
30+
import java.util.concurrent.locks.ReentrantLock
3031
import kotlin.text.Charsets.UTF_16BE
3132
import kotlin.text.Charsets.UTF_16LE
3233
import kotlin.text.Charsets.UTF_32BE
@@ -299,13 +300,27 @@ internal val assertionsEnabled: Boolean = OkHttpClient::class.java.desiredAssert
299300
internal val okHttpName: String =
300301
OkHttpClient::class.java.name.removePrefix("okhttp3.").removeSuffix("Client")
301302

303+
@Suppress("NOTHING_TO_INLINE")
304+
internal inline fun ReentrantLock.assertHeld() {
305+
if (assertionsEnabled && !this.isHeldByCurrentThread) {
306+
throw AssertionError("Thread ${Thread.currentThread().name} MUST hold lock on $this")
307+
}
308+
}
309+
302310
@Suppress("NOTHING_TO_INLINE")
303311
internal inline fun Any.assertThreadHoldsLock() {
304312
if (assertionsEnabled && !Thread.holdsLock(this)) {
305313
throw AssertionError("Thread ${Thread.currentThread().name} MUST hold lock on $this")
306314
}
307315
}
308316

317+
@Suppress("NOTHING_TO_INLINE")
318+
internal inline fun ReentrantLock.assertNotHeld() {
319+
if (assertionsEnabled && this.isHeldByCurrentThread) {
320+
throw AssertionError("Thread ${Thread.currentThread().name} MUST NOT hold lock on $this")
321+
}
322+
}
323+
309324
@Suppress("NOTHING_TO_INLINE")
310325
internal inline fun Any.assertThreadDoesntHoldLock() {
311326
if (assertionsEnabled && Thread.holdsLock(this)) {

0 commit comments

Comments
 (0)