Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ package okhttp3.sse
import okhttp3.Response

abstract class EventSourceListener {
/**
* Milliseconds elapsed between 2 events until connection failed. Doesn't timeout if null
*/
open var idleTimeoutMillis: Long? = null
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with the rest of the library, could you make this non-nullable, using a value of 0 for no timeout?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t think the listener is the right place for this property.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think instead it aught to be something like this

interface EventSource {
  ...

  fun timeout(): Timeout = Timeout.NONE
}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean with a setter? We need to be able to change the EventSource timeout

interface EventSource {
  ...
  fun timeout(timeout: Timeout)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Timeout object is mutable.


/**
* Invoked when an event source has been accepted by the remote peer and may begin transmitting
* events.
Expand Down
29 changes: 27 additions & 2 deletions okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package okhttp3.sse.internal

import java.io.IOException
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import okhttp3.Call
import okhttp3.Callback
import okhttp3.Request
Expand All @@ -24,6 +26,8 @@ import okhttp3.ResponseBody
import okhttp3.internal.stripBody
import okhttp3.sse.EventSource
import okhttp3.sse.EventSourceListener
import okio.AsyncTimeout
import okio.Timeout.Companion.timeout

internal class RealEventSource private constructor(
private val call: Call?,
Expand All @@ -38,6 +42,20 @@ internal class RealEventSource private constructor(

@Volatile private var canceled = false

private fun updateTimeout(call: Call?, duration: Duration) {
val timeout = call?.timeout()
if (timeout is AsyncTimeout) {
timeout.apply {
// If a timeout is in process, we exit it before entering again
if (this.timeoutNanos() > 0L) {
exit()
}
timeout(duration)
enter()
}
}
}

override fun onResponse(
call: Call,
response: Response,
Expand All @@ -63,8 +81,11 @@ internal class RealEventSource private constructor(
return
}

// This is a long-lived response. Cancel full-call timeouts.
call?.timeout()?.cancel()
// This is a long-lived response. Cancel full-call timeouts if no timeout has been set
listener.idleTimeoutMillis?.let {
// We spend at most timeout seconds if set
updateTimeout(call, it.milliseconds)
} ?: call?.timeout()?.cancel()

// Replace the body with a stripped one so the callbacks can't see real data.
val response = response.stripBody()
Expand All @@ -74,6 +95,10 @@ internal class RealEventSource private constructor(
if (!canceled) {
listener.onOpen(this, response)
while (!canceled && reader.processNextEvent()) {
listener.idleTimeoutMillis?.let {
// We spend at most timeout seconds if set
updateTimeout(call, it.milliseconds)
}
}
}
} catch (e: Exception) {
Expand Down
Loading