Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KTOR-6734 Upgrade Jetty to 12 #4578

Draft
wants to merge 4 commits into
base: 3.1.0-eap
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ netty = "4.1.116.Final"
netty-tcnative = "2.0.69.Final"

jetty = "9.4.56.v20240826"
jetty-jakarta = "11.0.24"
jetty-jakarta = "12.0.16"
jetty-alpn-api = "1.1.3.v20160715"
jetty-alpn-boot = "8.1.13.v20181017"
jetty-alpn-openjdk8 = "9.4.56.v20240826"
Expand Down Expand Up @@ -160,12 +160,12 @@ jetty-alpn-openjdk8-server = { module = "org.eclipse.jetty:jetty-alpn-openjdk8-s
jetty-alpn-openjdk8-client = { module = "org.eclipse.jetty:jetty-alpn-openjdk8-client", version.ref = "jetty-alpn-openjdk8" }

jetty-client-jakarta = { module = "org.eclipse.jetty:jetty-client", version.ref = "jetty-jakarta" }
jetty-http2-server-jakarta = { module = "org.eclipse.jetty.http2:http2-server", version.ref = "jetty-jakarta" }
jetty-http2-client-jakarta = { module = "org.eclipse.jetty.http2:http2-client", version.ref = "jetty-jakarta" }
jetty-http2-server-jakarta = { module = "org.eclipse.jetty.http2:jetty-http2-server", version.ref = "jetty-jakarta" }
jetty-http2-client-jakarta = { module = "org.eclipse.jetty.http2:jetty-http2-client", version.ref = "jetty-jakarta" }
jetty-http2-client-transport-jakarta = { module = "org.eclipse.jetty.http2:http2-http-client-transport", version.ref = "jetty-jakarta" }
jetty-server-jakarta = { module = "org.eclipse.jetty:jetty-server", version.ref = "jetty-jakarta" }
jetty-servlets-jakarta = { module = "org.eclipse.jetty:jetty-servlets", version.ref = "jetty-jakarta" }
jetty-servlet-jakarta = { module = "org.eclipse.jetty:jetty-servlet", version.ref = "jetty-jakarta" }
jetty-servlets-jakarta = { module = "org.eclipse.jetty.ee10:jetty-ee10-servlets", version.ref = "jetty-jakarta" }
jetty-servlet-jakarta = { module = "org.eclipse.jetty.ee10:jetty-ee10-servlet", version.ref = "jetty-jakarta" }
jetty-alpn-server-jakarta = { module = "org.eclipse.jetty:jetty-alpn-server", version.ref = "jetty-jakarta" }
jetty-alpn-java-server-jakarta = { module = "org.eclipse.jetty:jetty-alpn-java-server", version.ref = "jetty-jakarta" }
jetty-alpn-java-client-jakarta = { module = "org.eclipse.jetty:jetty-alpn-java-client", version.ref = "jetty-jakarta" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import io.ktor.client.plugins.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.http.*
import io.ktor.test.junit.coroutines.*
import io.ktor.network.tls.certificates.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.test.junit.coroutines.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import java.io.File
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ public final class io/ktor/client/engine/jetty/jakarta/JettyEngineConfig : io/kt
public fun <init> ()V
public final fun configureClient (Lkotlin/jvm/functions/Function1;)V
public final fun getClientCacheSize ()I
public final fun getSslContextFactory ()Lorg/eclipse/jetty/util/ssl/SslContextFactory;
public final fun getSslContextFactory ()Lorg/eclipse/jetty/util/ssl/SslContextFactory$Client;
public final fun setClientCacheSize (I)V
public final fun setSslContextFactory (Lorg/eclipse/jetty/util/ssl/SslContextFactory;)V
public final fun setSslContextFactory (Lorg/eclipse/jetty/util/ssl/SslContextFactory$Client;)V
}

public final class io/ktor/client/engine/jetty/jakarta/JettyEngineContainer : io/ktor/client/HttpClientEngineContainer {
Expand Down
3 changes: 1 addition & 2 deletions ktor-client/ktor-client-jetty-jakarta/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ kotlin.sourceSets {
api(project(":ktor-client:ktor-client-core"))

api(libs.jetty.http2.client.jakarta)
api(libs.jetty.alpn.openjdk8.client)
api(libs.jetty.alpn.java.client)
api(libs.jetty.alpn.java.client.jakarta)
}
}
commonTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
package io.ktor.client.engine.jetty.jakarta

import io.ktor.client.engine.*
import org.eclipse.jetty.http2.client.*
import org.eclipse.jetty.util.ssl.*
import org.eclipse.jetty.http2.client.HTTP2Client
import org.eclipse.jetty.util.ssl.SslContextFactory

/**
* A configuration for the [Jetty] client engine.
Expand All @@ -17,7 +17,7 @@ public class JettyEngineConfig : HttpClientEngineConfig() {
/**
* Allows you to configure [SSL](https://ktor.io/docs/client-ssl.html) settings for this engine.
*/
public var sslContextFactory: SslContextFactory = SslContextFactory.Client()
public var sslContextFactory: SslContextFactory.Client = SslContextFactory.Client()

/**
* Specifies the size of cache that keeps recently used [JettyHttp2Engine] instances.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import kotlinx.coroutines.*
import org.eclipse.jetty.http.*
import org.eclipse.jetty.http2.api.*
import org.eclipse.jetty.http2.client.*
import org.eclipse.jetty.http2.client.internal.HTTP2ClientSession
import org.eclipse.jetty.http2.frames.*
import org.eclipse.jetty.io.Transport
import org.eclipse.jetty.util.*
import java.net.*
import java.nio.*
Expand Down Expand Up @@ -58,12 +60,20 @@ internal suspend fun HttpRequestData.executeRequest(
)
}

private val NoopListener = object : Session.Listener {}

internal suspend fun HTTP2Client.connect(
url: Url,
config: JettyEngineConfig
): Session = withPromise { promise ->
val factory = if (url.protocol.isSecure()) config.sslContextFactory else null
connect(factory, InetSocketAddress(url.host, url.port), Session.Listener.Adapter(), promise)
): Session = withPromise { promise: Promise<Session> ->
connect(
Transport.TCP_IP,
config.sslContextFactory,
InetSocketAddress(url.host, url.port),
NoopListener,
promise,
mutableMapOf<String, Object>() as Map<String, Object>
)
}

@OptIn(InternalAPI::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,34 @@ package io.ktor.client.engine.jetty.jakarta

import io.ktor.client.request.*
import io.ktor.http.*
import io.ktor.http.HttpMethod
import io.ktor.utils.io.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import org.eclipse.jetty.http.*
import org.eclipse.jetty.http2.*
import org.eclipse.jetty.http2.api.*
import org.eclipse.jetty.http2.client.*
import org.eclipse.jetty.http2.frames.*
import org.eclipse.jetty.util.*
import java.io.*
import java.nio.*
import java.nio.channels.*
import kotlin.coroutines.*
import kotlinx.coroutines.launch
import org.eclipse.jetty.http.MetaData
import org.eclipse.jetty.http2.ErrorCode
import org.eclipse.jetty.http2.HTTP2Session
import org.eclipse.jetty.http2.api.Stream
import org.eclipse.jetty.http2.frames.HeadersFrame
import org.eclipse.jetty.http2.frames.PushPromiseFrame
import org.eclipse.jetty.http2.frames.ResetFrame
import org.eclipse.jetty.util.Callback
import org.eclipse.jetty.util.Promise
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.channels.ClosedChannelException
import java.util.concurrent.TimeoutException
import kotlin.coroutines.CoroutineContext

internal data class StatusWithHeaders(val statusCode: HttpStatusCode, val headers: Headers)

private data class JettyResponseChunk(val buffer: ByteBuffer, val callback: Callback)
private data class JettyResponseChunk(val buffer: ByteBuffer)

internal class JettyResponseListener(
private val request: HttpRequestData,
private val session: HTTP2ClientSession,
private val session: HTTP2Session,
private val channel: ByteWriteChannel,
private val callContext: CoroutineContext
) : Stream.Listener {
Expand All @@ -44,37 +50,46 @@ internal class JettyResponseListener(
return Ignore
}

override fun onIdleTimeout(stream: Stream, cause: Throwable): Boolean {
channel.close(cause)
return true
override fun onIdleTimeout(
stream: Stream?,
x: TimeoutException?,
promise: Promise<Boolean?>?
) {
channel.close(x)
}

override fun onReset(stream: Stream, frame: ResetFrame) {
val error = when (frame.error) {
0 -> null
ErrorCode.CANCEL_STREAM_ERROR.code -> ClosedChannelException()
else -> {
val code = ErrorCode.from(frame.error)
IOException("Connection reset ${code?.name ?: "with unknown error code ${frame.error}"}")
override fun onReset(
stream: Stream?,
frame: ResetFrame?,
callback: Callback?
) {
frame?.error?.let {
val error = when (frame.error) {
0 -> null
ErrorCode.CANCEL_STREAM_ERROR.code -> ClosedChannelException()
else -> {
val code = ErrorCode.from(frame.error)
IOException("Connection reset ${code?.name ?: "with unknown error code ${frame.error}"}")
}
}
}

error?.let { backendChannel.close(it) }
error?.let { backendChannel.close(it) }
}

onHeadersReceived.complete(null)
}

override fun onData(stream: Stream, frame: DataFrame, callback: Callback) {
val data = frame.data!!
override fun onDataAvailable(stream: Stream?) {
val streamData = stream?.readData() ?: return
val frame = streamData.frame()
try {
if (!backendChannel.trySend(JettyResponseChunk(data, callback)).isSuccess) {
if (!backendChannel.trySend(JettyResponseChunk(frame.byteBuffer)).isSuccess) {
throw IOException("backendChannel.offer() failed")
}

if (frame.isEndStream) backendChannel.close()
} catch (cause: Throwable) {
backendChannel.close(cause)
callback.failed(cause)
}
}

Expand All @@ -93,7 +108,7 @@ internal class JettyResponseListener(
}

override fun onHeaders(stream: Stream, frame: HeadersFrame) {
frame.metaData.fields.forEach { field ->
frame.metaData.httpFields.forEach { field ->
headersBuilder.append(field.name, field.value)
}

Expand All @@ -117,31 +132,23 @@ internal class JettyResponseListener(
@OptIn(DelicateCoroutinesApi::class)
private fun runResponseProcessing() = GlobalScope.launch(callContext) {
while (true) {
val (buffer, callback) = backendChannel.receiveCatching().getOrNull() ?: break
val (buffer) = backendChannel.receiveCatching().getOrNull() ?: break
try {
if (buffer.remaining() > 0) channel.writeFully(buffer)
callback.succeeded()
} catch (cause: ClosedWriteChannelException) {
callback.failed(cause)
session.endPoint.close()
break
} catch (cause: Throwable) {
callback.failed(cause)
session.endPoint.close()
throw cause
}
}
}.invokeOnCompletion { cause ->
channel.close(cause)
backendChannel.close()
GlobalScope.launch {
for ((_, callback) in backendChannel) {
callback.succeeded()
}
}
}

companion object {
private val Ignore = Stream.Listener.Adapter()
private val Ignore = object : Stream.Listener {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package io.ktor.http.cio
import io.ktor.http.cio.internals.*
import io.ktor.utils.io.*
import io.ktor.utils.io.bits.*
import io.ktor.utils.io.charsets.TooLongLineException
import io.ktor.utils.io.core.*
import io.ktor.utils.io.pool.*
import kotlinx.coroutines.*
Expand Down Expand Up @@ -77,8 +78,12 @@ public suspend fun decodeChunked(input: ByteReadChannel, out: ByteWriteChannel)
}

chunkSizeBuffer.clear()
if (!input.readUTF8LineTo(chunkSizeBuffer, 2)) {
throw EOFException("Invalid chunk: content block of size $chunkSize ended unexpectedly")
try {
if (!input.readUTF8LineTo(chunkSizeBuffer, 2)) {
throw EOFException("Invalid chunk: content block of size $chunkSize ended unexpectedly")
}
} catch (e: TooLongLineException) {
throw IOException("Expected CR+LF line ending but there was more content", e)
}
if (chunkSizeBuffer.isNotEmpty()) {
throw EOFException("Invalid chunk: content block should end with CR+LF")
Expand Down
1 change: 1 addition & 0 deletions ktor-io/api/ktor-io.api
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public final class io/ktor/utils/io/ByteWriteChannelOperationsKt {
public static final fun join (Lio/ktor/utils/io/ChannelJob;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun write (Lio/ktor/utils/io/ByteWriteChannel;ILkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun write$default (Lio/ktor/utils/io/ByteWriteChannel;ILkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun writeBuffer (Lio/ktor/utils/io/ByteWriteChannel;Lkotlinx/io/RawSource;JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun writeBuffer (Lio/ktor/utils/io/ByteWriteChannel;Lkotlinx/io/RawSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun writeByte (Lio/ktor/utils/io/ByteWriteChannel;BLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun writeByteArray (Lio/ktor/utils/io/ByteWriteChannel;[BLkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
1 change: 1 addition & 0 deletions ktor-io/api/ktor-io.klib.api
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ final suspend fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/awaitFree
final suspend fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/flushIfNeeded() // io.ktor.utils.io/flushIfNeeded|flushIfNeeded@io.ktor.utils.io.ByteWriteChannel(){}[0]
final suspend fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/write(kotlin/Int = ..., kotlin/Function3<kotlin/ByteArray, kotlin/Int, kotlin/Int, kotlin/Int>): kotlin/Int // io.ktor.utils.io/write|write@io.ktor.utils.io.ByteWriteChannel(kotlin.Int;kotlin.Function3<kotlin.ByteArray,kotlin.Int,kotlin.Int,kotlin.Int>){}[0]
final suspend fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/writeBuffer(kotlinx.io/RawSource) // io.ktor.utils.io/writeBuffer|writeBuffer@io.ktor.utils.io.ByteWriteChannel(kotlinx.io.RawSource){}[0]
final suspend fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/writeBuffer(kotlinx.io/RawSource, kotlin/Long) // io.ktor.utils.io/writeBuffer|writeBuffer@io.ktor.utils.io.ByteWriteChannel(kotlinx.io.RawSource;kotlin.Long){}[0]
final suspend fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/writeByte(kotlin/Byte) // io.ktor.utils.io/writeByte|writeByte@io.ktor.utils.io.ByteWriteChannel(kotlin.Byte){}[0]
final suspend fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/writeByteArray(kotlin/ByteArray) // io.ktor.utils.io/writeByteArray|writeByteArray@io.ktor.utils.io.ByteWriteChannel(kotlin.ByteArray){}[0]
final suspend fun (io.ktor.utils.io/ByteWriteChannel).io.ktor.utils.io/writeFully(kotlin/ByteArray, kotlin/Int = ..., kotlin/Int = ...) // io.ktor.utils.io/writeFully|writeFully@io.ktor.utils.io.ByteWriteChannel(kotlin.ByteArray;kotlin.Int;kotlin.Int){}[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public suspend fun ByteWriteChannel.writeBuffer(value: RawSource) {
flushIfNeeded()
}

@OptIn(InternalAPI::class)
public suspend fun ByteWriteChannel.writeBuffer(value: RawSource, length: Long) {
writeBuffer.write(value, length)
flushIfNeeded()
}

@OptIn(InternalAPI::class)
public suspend fun ByteWriteChannel.writeStringUtf8(value: String) {
writeBuffer.writeText(value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@ public suspend fun ApplicationCall.respondSource(
status: HttpStatusCode? = null,
contentLength: Long? = null,
) {
respond(ChannelWriterContent({ writeBuffer(source) }, contentType, status, contentLength))
val write: suspend ByteWriteChannel.() -> Unit =
contentLength?.let { length ->
{ writeBuffer(source, length) }
} ?: { writeBuffer(source) }
respond(ChannelWriterContent(write, contentType, status, contentLength))
}

/**
Expand Down
Loading
Loading