Skip to content

Conversation

@p1gp1g
Copy link

@p1gp1g p1gp1g commented Oct 31, 2024

This PR adds the possibility to timeout the SSE connection. It is now possible to fail as soon as an event-free timeout is not reached, making it easier to detect a lost connection.

For a real world example: I've an app connected to a server. It sends a ping at a keepalive interval. I've a job checking from time to time that everything works as expected (using a worker, so the period is 15min, the minimum). It happens that the connection is lost at the beginning of this interval, therefore the users are disconnected for several minutes.

With this PR we can directly catch a lost connection: https://codeberg.org/NextPush/nextpush-android/commit/f6edb178ddee8255e9de6e548ea75f5d3eb6b32a

PS: I've also added some comments, but I can remove this commit from the branch

/**
* Seconds elapsed between 2 events until connection failed. Doesn't timeout if null
*/
open var timeout: Long? = null
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From Call, I think there is a convention on timeout names. Such as readTimeoutMillis. So perhaps receiveTimeoutMillis?

I wonder if we should prefer Kotlin Duration here also?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Internally we use callTimeout in RealEventSource to track establishment, and it's obviously an existing top level think in OkHttp which extends to the end of a request.

read feel to I/O socket focused.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps idleTimeoutMillis?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Do you want me to update the PR or are you fine doing it ?

Copy link

@yuanjunli yuanjunli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

result: ### 改动目的
此次PR的主要目的是为EventSourceListener类添加一个超时机制,允许用户设置两个事件之间的最大时间间隔。如果超过这个时间间隔没有收到新的事件,连接将被视为失败。此外,PR还对代码中的一些注释进行了完善,增加了对onEventonClosed方法的描述,使其更清晰易懂。

发现问题

  1. 超时机制的实现可能不够完善:在RealEventSource类中,超时机制的实现依赖于AsyncTimeout,并且只在onResponseprocessNextEvent中更新超时时间。如果事件处理逻辑复杂或耗时较长,可能会导致超时机制失效。
  2. 超时时间的单位不明确:在EventSourceListener中,timeout属性的单位是Long,但没有明确说明是秒还是毫秒,可能会导致使用时的混淆。
  3. 超时机制的取消逻辑可能存在问题:在onResponse中,如果listener.timeoutnull,则会调用call?.timeout()?.cancel()取消超时。然而,如果timeout在后续操作中被设置为非null值,可能会导致超时机制无法正常工作。

优化建议

  1. 明确超时时间的单位:建议在EventSourceListenertimeout属性的注释中明确说明单位是秒,或者在代码中使用Duration类型来避免混淆。
  2. 优化超时机制的实现:建议在RealEventSource类中增加对超时机制的全面检查,确保在所有可能的事件处理路径中都正确更新超时时间。可以考虑在processNextEvent中增加超时检查的逻辑,确保即使事件处理耗时较长,超时机制也能正常工作。
  3. 改进超时取消逻辑:建议在onResponse中增加对timeout属性的检查,确保在timeoutnull时正确取消超时,并在后续操作中正确处理timeout的变化。可以考虑在timeout属性发生变化时重新设置超时时间。

通过这些优化,可以确保超时机制的可靠性和代码的可维护性。

@yschimke
Copy link
Collaborator

yschimke commented Aug 8, 2025

Translation:

Purpose of the Changes
The primary purpose of this pull request (PR) is to add a timeout mechanism to the EventSourceListener class. This allows users to set a maximum time interval between two events. If no new events are received within this interval, the connection will be considered failed. Additionally, the PR refines some code comments, adding descriptions for the onEvent and onClosed methods to make them clearer and easier to understand.

Issues Found
Timeout implementation may be incomplete: The timeout mechanism in the RealEventSource class relies on AsyncTimeout and only updates the timeout duration in onResponse and processNextEvent. If the event-handling logic is complex or time-consuming, it could cause the timeout mechanism to fail.

Ambiguous timeout unit: The timeout property in EventSourceListener is a Long type, but the unit (seconds or milliseconds) is not explicitly stated. This could lead to confusion when using it.

Potential issue with timeout cancellation logic: In onResponse, if listener.timeout is null, call?.timeout()?.cancel() is called to cancel the timeout. However, if timeout is later set to a non-null value, it might prevent the timeout mechanism from working correctly.

Optimization Suggestions
Clarify the timeout unit: It's recommended to explicitly state the unit (e.g., seconds) in the comments for the timeout property in EventSourceListener or to use a Duration type in the code to avoid confusion.

Improve the timeout mechanism implementation: It's suggested to add a comprehensive check of the timeout mechanism within the RealEventSource class. This would ensure the timeout duration is correctly updated in all possible event-handling paths. Consider adding timeout check logic to processNextEvent to ensure the mechanism works correctly even when event processing takes a long time.

Improve timeout cancellation logic: It's recommended to add a check for the timeout property in onResponse to ensure that the timeout is correctly canceled when timeout is null and that changes to the timeout property are handled correctly in subsequent operations. You might consider resetting the timeout duration whenever the timeout property changes.

These optimizations will ensure the reliability of the timeout mechanism and the maintainability of the code.

@p1gp1g
Copy link
Author

p1gp1g commented Sep 26, 2025

I've renamed idleTimeoutMillis and rebased the patch

@yschimke
Copy link
Collaborator

This looks wrong as seconds. No tests which catch this.

@yschimke
Copy link
Collaborator

I'm a bit torn on whether this should or needs to be built in. I'm curious if a listener externally can implement a timeout. I'll try this locally.

It doesn't seem like a common pattern for python or js clients. So I'm not sure how common this need is.

Overall, it would need good tests to land, but also would like @swankjesse to bless the API

@p1gp1g
Copy link
Author

p1gp1g commented Sep 27, 2025

This looks wrong as seconds. No tests which catch this.

My bad, I'm pushing a fix

@p1gp1g
Copy link
Author

p1gp1g commented Sep 27, 2025

For comparison, this is this function in the rust sse_client crate: https://github.com/viniciusgerevini/sse-client/blob/master/src/tls.rs#L23

It makes sense to have this feature with SSE:

  • as SSE doesn't specify how the connection is kept
  • it's up to the server only to send pings, and to choose how often it sends them
  • the server can change the timeout
  • And the most important: when a client rely on SSE, it needs to know as soon as possible when the connection is broken

Then, it may be possible to rely on an external solution, let me know if you find something - I'll continue using my fork for now

@yschimke
Copy link
Collaborator

Thanks, that's useful context in the rust api

@yschimke
Copy link
Collaborator

@p1gp1g what's the difference with a read timeout here?

#9088

@p1gp1g
Copy link
Author

p1gp1g commented Sep 29, 2025

The difference is that the client don't have to restart the connection when the server sends the keepalive value.

Depending on the application, this may be the first thing the server sends to the client. In this case, with the readTimeout, all clients would have to restart at least once, during the first connection.

But this timeout is not supposed to change very often. So maybe that's fine having a less-efficient solution, where readTimeout is set during client build.

Example with the timeout set by the server:

  @Test
  fun readTimeoutAppliesOnceConnected() {
    client =
      client
        .newBuilder()
        .callTimeout(250, TimeUnit.MILLISECONDS)
        .readTimeout(500, TimeUnit.MILLISECONDS)
        .build()

    val body = object : MockResponseBody {
      override val contentLength: Long
        get() = -1

      override fun writeTo(sink: BufferedSink) {
        sink.writeUtf8("event: timeout\n")
        sink.writeUtf8("data: 100\n\n")
        sink.flush()
        repeat(10) {
          sink.writeUtf8("data: hey $it\n\n")
          sink.flush()
          if (it == 4) {
            sink.writeUtf8("event: timeout\n")
            sink.writeUtf8("data: 1000\n\n")
            sink.flush()
          }
          if (it < 4) {
            Thread.sleep(100)
          } else {
            Thread.sleep(1000)
          }
        }
      }
    }

    server.enqueue(
      MockResponse
        .Builder()
        .setHeader("content-type", "text/event-stream")
        .body(body)
        .build(),
    )
    val source = newEventSource()
    assertThat(source.request().url.encodedPath).isEqualTo("/")
    listener.assertOpen()
    listener.assertEvent(null, null, "100")
    listener.assertEvent(null, null, "hey 0")
    listener.assertEvent(null, null, "hey 1")
    listener.assertEvent(null, null, "hey 2")
    listener.assertEvent(null, null, "hey 3")
    listener.assertEvent(null, null, "hey 4")
    listener.assertEvent(null, null, "1000")
    listener.assertEvent(null, null, "hey 5")
    listener.assertEvent(null, null, "hey 6")
    // ...
  }

/**
* Milliseconds elapsed between 2 events until connection failed. Doesn't timeout if null
*/
open var idleTimeoutMillis: Long? = null
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with the rest of the library, could you make this non-nullable, using a value of 0 for no timeout?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t think the listener is the right place for this property.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think instead it aught to be something like this

interface EventSource {
  ...

  fun timeout(): Timeout = Timeout.NONE
}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean with a setter? We need to be able to change the EventSource timeout

interface EventSource {
  ...
  fun timeout(timeout: Timeout)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Timeout object is mutable.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

4 participants