
Commit 9a1d9f3

tclem and Copilot committed
Add WaiterGuard RAII for Session::send_and_wait (cancel-safety)
Closes RFD-400 review finding #2 (idle_waiter slot leak on external cancellation). See cancel-safety review session db4b1ac8-... for the full report.

`Session::send_and_wait` installs an `IdleWaiter` slot under `self.idle_waiter`, then awaits the response. Internal failure paths (send_inner error, oneshot closure, internal timeout) clean up the slot. External cancellation (the caller wraps `send_and_wait` in `tokio::time::timeout`, races it in `select!`, or aborts the JoinHandle) does not. The cleanup at the event loop's `else` branch only fires when the channel itself closes (i.e., the entire session is going away), not when an individual call is cancelled.

Effect: any external cancellation between "install waiter" and "drain response" leaves `idle_waiter = Some(...)` forever. All subsequent `send` and `send_and_wait` calls return `SendWhileWaiting`, permanently bricking the session.

Fix: `WaiterGuard` RAII helper that takes the slot on Drop. Construct right after install, let RAII handle every exit path. The explicit `self.idle_waiter.lock().take()` calls inside the function disappear, and the slot is cleared on every cancellation path automatically.

Mutex conversion (folded in from finding #5):
- `Session::idle_waiter`: `tokio::sync::Mutex<Option<IdleWaiter>>` -> `parking_lot::Mutex<Option<IdleWaiter>>`. Lock is never held across `.await` in any code path; the conversion is what makes WaiterGuard's synchronous Drop work without needing an async-spawn fallback. `event_loop` mutex stays `tokio::sync::Mutex` for now; commit C converts it as part of cooperative shutdown (so the conversion lands alongside the matching change to Drop for Session).

Tests: two new regression tests in tests/session_test.rs
- `send_and_wait_outer_cancellation_clears_waiter`: outer `tokio::time::timeout(50ms, ...)` around `send_and_wait` with a 60-second inner timeout. Outer fires, dropping the future. Verify the next `send` succeeds (no SendWhileWaiting).
- `send_and_wait_drop_clears_waiter`: explicit `JoinHandle::abort` of an in-flight send_and_wait. Verify the next `send` succeeds.

Existing 211 tests continue to pass; total now 213.

Validation: cargo test --all-features, cargo doc -D warnings, cargo +nightly fmt --check, cargo clippy -- -D warnings all clean.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
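The failure mode and the fix described in the message can be reproduced without tokio. The following is a minimal sketch, not the SDK's code: it assumes `std::sync::Mutex` in place of `parking_lot::Mutex`, a plain `u32` in place of `IdleWaiter`, and the hypothetical names `SlotGuard`, `install_and_wait`, and `cancel_demo`. It polls a future once by hand, then drops it to simulate external cancellation.

```rust
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::{Context, Wake, Waker};

/// Hypothetical stand-in for the `idle_waiter` slot; a `u32` replaces `IdleWaiter`.
type Slot = Arc<Mutex<Option<u32>>>;

/// Clears the slot when dropped, playing the role of `WaiterGuard`.
struct SlotGuard {
    slot: Slot,
}

impl Drop for SlotGuard {
    fn drop(&mut self) {
        // std's Mutex can be poisoned; skip the clear rather than panic in Drop.
        if let Ok(mut slot) = self.slot.lock() {
            slot.take();
        }
    }
}

/// Installs a value, arms the guard, then waits forever
/// (a call whose response never arrives).
async fn install_and_wait(slot: Slot) -> Result<(), &'static str> {
    {
        let mut inner = slot.lock().unwrap();
        if inner.is_some() {
            return Err("busy");
        }
        *inner = Some(1);
    } // lock released here, before any await point
    let _guard = SlotGuard { slot: slot.clone() };
    std::future::pending::<()>().await;
    Ok(())
}

/// Waker that does nothing; enough to poll a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns (slot occupied while pending, slot occupied after the future is dropped).
fn cancel_demo() -> (bool, bool) {
    let slot: Slot = Arc::new(Mutex::new(None));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(install_and_wait(slot.clone()));
    assert!(fut.as_mut().poll(&mut cx).is_pending());
    let occupied_while_pending = slot.lock().unwrap().is_some();

    drop(fut); // external cancellation: the future is dropped mid-await
    let occupied_after_drop = slot.lock().unwrap().is_some();
    (occupied_while_pending, occupied_after_drop)
}
```

Calling `cancel_demo()` from a `main` or a test shows the slot occupied while the future is pending and empty once it has been dropped. Removing the `_guard` binding from `install_and_wait` reproduces the leak: the slot stays `Some(1)` after the drop.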
1 parent 4a46f18 commit 9a1d9f3

2 files changed

Lines changed: 169 additions & 23 deletions


rust/src/session.rs

Lines changed: 56 additions & 23 deletions
@@ -3,6 +3,7 @@ use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use std::time::Duration;
 
+use parking_lot::Mutex as ParkingLotMutex;
 use serde_json::Value;
 use tokio::sync::{Mutex, oneshot};
 use tokio::task::JoinHandle;
@@ -42,6 +43,27 @@ struct IdleWaiter {
     last_assistant_message: Option<SessionEvent>,
 }
 
+/// RAII guard that clears the [`Session::idle_waiter`] slot on drop. Used
+/// by [`Session::send_and_wait`] to ensure the slot doesn't leak if the
+/// caller's future is cancelled (outer `tokio::time::timeout` / `select!`
+/// / aborted task). Synchronous clear via `parking_lot::Mutex`;
+/// no async drop needed.
+///
+/// Without this, an outer cancellation between "install waiter" and
+/// "drain channel" would leave the slot occupied, causing all subsequent
+/// `send` and `send_and_wait` calls on the session to return
+/// [`SendWhileWaiting`](SessionError::SendWhileWaiting). Closes RFD-400
+/// review finding #2.
+struct WaiterGuard {
+    slot: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
+}
+
+impl Drop for WaiterGuard {
+    fn drop(&mut self) {
+        self.slot.lock().take();
+    }
+}
+
 /// A session on a GitHub Copilot CLI server.
 ///
 /// Created via [`Client::create_session`] or [`Client::resume_session`].
@@ -61,7 +83,12 @@ pub struct Session {
     client: Client,
     event_loop: Mutex<Option<JoinHandle<()>>>,
     /// Only populated while a `send_and_wait` call is in flight.
-    idle_waiter: Arc<Mutex<Option<IdleWaiter>>>,
+    ///
+    /// Sync `parking_lot::Mutex` because the lock is never held across an
+    /// `.await`, and synchronous access lets the `WaiterGuard` RAII helper
+    /// in `send_and_wait` clear the slot from a `Drop` impl on caller-side
+    /// cancellation. See RFD-400 review (cancel-safety hardening).
+    idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
     /// Capabilities negotiated with the CLI, updated on `capabilities.changed` events.
     capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
     /// Broadcast channel for runtime event subscribers; see [`Session::subscribe`].
@@ -166,7 +193,7 @@ impl Session {
             let _ = handle.await;
         }
         // Fail any pending send_and_wait so it returns immediately.
-        if let Some(waiter) = self.idle_waiter.lock().await.take() {
+        if let Some(waiter) = self.idle_waiter.lock().take() {
             let _ = waiter
                 .tx
                 .send(Err(Error::Session(SessionError::EventLoopClosed)));
@@ -187,7 +214,7 @@ impl Session {
     /// Returns an error if a [`send_and_wait`](Self::send_and_wait) call is
     /// currently in flight, since the plain send would race with the waiter.
     pub async fn send(&self, opts: impl Into<MessageOptions>) -> Result<String, Error> {
-        if self.idle_waiter.lock().await.is_some() {
+        if self.idle_waiter.lock().is_some() {
             return Err(Error::Session(SessionError::SendWhileWaiting));
         }
         self.send_inner(opts.into()).await
@@ -250,6 +277,14 @@ impl Session {
     /// Only one `send_and_wait` call may be active per session at a time.
     /// Calling [`send`](Self::send) while a `send_and_wait`
     /// is in flight will also return an error.
+    ///
+    /// # Cancel safety
+    ///
+    /// **Cancel-safe.** A `WaiterGuard` clears the in-flight slot on every
+    /// exit path (success, internal failure, internal timeout, *and*
+    /// external cancellation via `tokio::time::timeout` / `select!` /
+    /// task abort). Subsequent `send` and `send_and_wait` calls on
+    /// this session will succeed normally; the slot is never leaked.
     pub async fn send_and_wait(
         &self,
         opts: impl Into<MessageOptions>,
@@ -259,7 +294,7 @@ impl Session {
         let (tx, rx) = oneshot::channel();
 
         {
-            let mut guard = self.idle_waiter.lock().await;
+            let mut guard = self.idle_waiter.lock();
             if guard.is_some() {
                 return Err(Error::Session(SessionError::SendWhileWaiting));
             }
@@ -269,28 +304,26 @@ impl Session {
             });
         }
 
-        let result = tokio::time::timeout(timeout_duration, async {
-            if let Err(e) = self.send_inner(opts).await {
-                self.idle_waiter.lock().await.take();
-                return Err(e);
-            }
+        // RAII: clears the idle_waiter slot on every exit path, including
+        // external cancellation (caller's outer `select!` / `timeout` /
+        // dropped future). Without this, an outer cancellation would leak
+        // the slot and brick subsequent `send`/`send_and_wait` calls.
+        let _waiter_guard = WaiterGuard {
+            slot: self.idle_waiter.clone(),
+        };
 
+        let result = tokio::time::timeout(timeout_duration, async {
+            self.send_inner(opts).await?;
             match rx.await {
                 Ok(result) => result,
-                Err(_) => {
-                    self.idle_waiter.lock().await.take();
-                    Err(Error::Session(SessionError::EventLoopClosed))
-                }
+                Err(_) => Err(Error::Session(SessionError::EventLoopClosed)),
             }
         })
         .await;
 
         match result {
             Ok(inner) => inner,
-            Err(_) => {
-                self.idle_waiter.lock().await.take();
-                Err(Error::Session(SessionError::Timeout(timeout_duration)))
-            }
+            Err(_) => Err(Error::Session(SessionError::Timeout(timeout_duration))),
         }
     }
 
@@ -751,7 +784,7 @@ impl Client {
         ));
         let channels = self.register_session(&session_id);
 
-        let idle_waiter = Arc::new(Mutex::new(None));
+        let idle_waiter = Arc::new(ParkingLotMutex::new(None));
         let (event_tx, _) = tokio::sync::broadcast::channel(512);
         let event_loop = spawn_event_loop(
             session_id.clone(),
@@ -851,7 +884,7 @@ impl Client {
         ));
         let channels = self.register_session(&cli_session_id);
 
-        let idle_waiter = Arc::new(Mutex::new(None));
+        let idle_waiter = Arc::new(ParkingLotMutex::new(None));
         let (event_tx, _) = tokio::sync::broadcast::channel(512);
         let event_loop = spawn_event_loop(
             cli_session_id.clone(),
@@ -905,7 +938,7 @@ fn spawn_event_loop(
     command_handlers: Arc<CommandHandlerMap>,
     session_fs_provider: Option<Arc<dyn SessionFsProvider>>,
     channels: crate::router::SessionChannels,
-    idle_waiter: Arc<Mutex<Option<IdleWaiter>>>,
+    idle_waiter: Arc<ParkingLotMutex<Option<IdleWaiter>>>,
     capabilities: Arc<parking_lot::RwLock<SessionCapabilities>>,
     event_tx: tokio::sync::broadcast::Sender<SessionEvent>,
 ) -> JoinHandle<()> {
@@ -933,7 +966,7 @@ fn spawn_event_loop(
             }
         }
         // Channels closed: fail any pending send_and_wait.
-        if let Some(waiter) = idle_waiter.lock().await.take() {
+        if let Some(waiter) = idle_waiter.lock().take() {
             let _ = waiter
                 .tx
                 .send(Err(Error::Session(SessionError::EventLoopClosed)));
@@ -1022,7 +1055,7 @@ async fn handle_notification(
     handler: &Arc<dyn SessionHandler>,
     command_handlers: &Arc<CommandHandlerMap>,
     notification: SessionEventNotification,
-    idle_waiter: &Arc<Mutex<Option<IdleWaiter>>>,
+    idle_waiter: &Arc<ParkingLotMutex<Option<IdleWaiter>>>,
     capabilities: &Arc<parking_lot::RwLock<SessionCapabilities>>,
     event_tx: &tokio::sync::broadcast::Sender<SessionEvent>,
 ) {
@@ -1035,7 +1068,7 @@ async fn handle_notification(
         SessionEventType::AssistantMessage
         | SessionEventType::SessionIdle
         | SessionEventType::SessionError => {
-            let mut guard = idle_waiter.lock().await;
+            let mut guard = idle_waiter.lock();
             if let Some(waiter) = guard.as_mut() {
                 match event_type {
                     SessionEventType::AssistantMessage => {
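
In the `send_and_wait` change, the `SendWhileWaiting` early return happens before the `WaiterGuard` is constructed, so a rejected second call never clears the slot owned by the call already in flight. The sketch below illustrates that ordering under stated assumptions: it uses `std::sync::Mutex` rather than `parking_lot`, a `&'static str` owner tag instead of `IdleWaiter`, and folds check, install, and guard construction into one hypothetical helper `try_install`, which the SDK does not have.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical slot type; the string records which call owns the slot.
type Slot = Arc<Mutex<Option<&'static str>>>;

/// Clears the slot when dropped, playing the role of `WaiterGuard`.
struct SlotGuard {
    slot: Slot,
}

impl Drop for SlotGuard {
    fn drop(&mut self) {
        // Skip the clear if the std mutex is poisoned instead of panicking in Drop.
        if let Ok(mut slot) = self.slot.lock() {
            slot.take();
        }
    }
}

/// Returns `Err("busy")` if the slot is occupied; otherwise installs `owner`
/// and hands back the guard that will clear it.
fn try_install(slot: &Slot, owner: &'static str) -> Result<SlotGuard, &'static str> {
    let mut inner = slot.lock().unwrap();
    if inner.is_some() {
        // No guard exists on this path, so the current owner is left untouched.
        return Err("busy");
    }
    *inner = Some(owner);
    Ok(SlotGuard {
        slot: Arc::clone(slot),
    })
}

/// Returns (second call rejected, owner after the rejection, slot empty after drop).
fn ordering_demo() -> (bool, Option<&'static str>, bool) {
    let slot: Slot = Arc::new(Mutex::new(None));

    let first = try_install(&slot, "first").expect("slot starts empty");
    let second_rejected = try_install(&slot, "second").is_err();
    let owner_after_reject = *slot.lock().unwrap();

    drop(first); // the owning call finishes or is cancelled
    let empty_after_drop = slot.lock().unwrap().is_none();
    (second_rejected, owner_after_reject, empty_after_drop)
}
```

If the guard were instead created before the occupancy check, the rejected call's guard would clear the first caller's slot on its way out, which is why the commit message says to construct the guard right after install.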

rust/tests/session_test.rs

Lines changed: 113 additions & 0 deletions
@@ -1773,6 +1773,119 @@ async fn send_and_wait_times_out() {
     ));
 }
 
+/// Cancel-safety regression: an outer `tokio::time::timeout` around
+/// `send_and_wait` must NOT leak the `idle_waiter` slot. After the outer
+/// timeout fires and drops the future, subsequent `send` and
+/// `send_and_wait` calls must succeed without `SendWhileWaiting`.
+///
+/// Closes RFD-400 review finding #2.
+#[tokio::test]
+async fn send_and_wait_outer_cancellation_clears_waiter() {
+    let (session, mut server) = create_session_pair(Arc::new(NoopHandler)).await;
+    let session = Arc::new(session);
+
+    // First call: wrap in outer timeout much shorter than the inner
+    // wait_timeout. The outer timeout expires, dropping the
+    // send_and_wait future before the idle/error event arrives.
+    let handle = tokio::spawn({
+        let session = session.clone();
+        async move {
+            tokio::time::timeout(
+                Duration::from_millis(50),
+                session.send_and_wait(
+                    MessageOptions::new("first").with_wait_timeout(Duration::from_secs(60)),
+                ),
+            )
+            .await
+        }
+    });
+
+    let request = server.read_request().await;
+    server.respond(&request, serde_json::json!({})).await;
+
+    // Outer timeout fires → Err(Elapsed) returned, future is dropped.
+    let outer_result = timeout(Duration::from_secs(2), handle)
+        .await
+        .unwrap()
+        .unwrap();
+    assert!(outer_result.is_err(), "outer timeout should have elapsed");
+
+    // The WaiterGuard's Drop should have cleared the slot. A subsequent
+    // `send` must NOT return SendWhileWaiting.
+    let send_handle = tokio::spawn({
+        let session = session.clone();
+        async move { session.send("second").await }
+    });
+
+    let request = server.read_request().await;
+    assert_eq!(request["method"], "session.send");
+    assert_eq!(request["params"]["prompt"], "second");
+    server
+        .respond(
+            &request,
+            serde_json::json!({ "messageId": "msg-after-cancel" }),
+        )
+        .await;
+
+    let result = timeout(TIMEOUT, send_handle).await.unwrap().unwrap();
+    assert_eq!(result.unwrap(), "msg-after-cancel");
+}
+
+/// Cancel-safety regression: aborting the task that runs an in-flight
+/// `send_and_wait` (`JoinHandle::abort`) must clear the waiter slot via
+/// WaiterGuard's Drop. The next `send` must succeed.
+///
+/// Closes RFD-400 review finding #2.
+#[tokio::test]
+async fn send_and_wait_drop_clears_waiter() {
+    let (session, mut server) = create_session_pair(Arc::new(NoopHandler)).await;
+    let session = Arc::new(session);
+
+    // Start a send_and_wait, let it install the waiter, then abort the
+    // task before any idle/error event arrives.
+    let handle = tokio::spawn({
+        let session = session.clone();
+        async move {
+            session
+                .send_and_wait(
+                    MessageOptions::new("aborted").with_wait_timeout(Duration::from_secs(60)),
+                )
+                .await
+        }
+    });
+
+    // Drain the session.send RPC so we know the waiter is installed.
+    let request = server.read_request().await;
+    server.respond(&request, serde_json::json!({})).await;
+
+    // Now abort the in-flight send_and_wait. The WaiterGuard drops as
+    // the future unwinds, clearing the slot.
+    handle.abort();
+    let _ = handle.await;
+
+    // Give the runtime a moment to run the drop.
+    tokio::task::yield_now().await;
+
+    // Next `send` must succeed: no SendWhileWaiting.
+    let send_handle = tokio::spawn({
+        let session = session.clone();
+        async move { session.send("after-abort").await }
+    });
+
+    let request = server.read_request().await;
+    assert_eq!(request["method"], "session.send");
+    assert_eq!(request["params"]["prompt"], "after-abort");
+    server
+        .respond(
+            &request,
+            serde_json::json!({ "messageId": "msg-after-abort" }),
+        )
+        .await;
+
+    let result = timeout(TIMEOUT, send_handle).await.unwrap().unwrap();
+    assert_eq!(result.unwrap(), "msg-after-abort");
+}
+
 #[tokio::test]
 async fn elicitation_requested_dispatches_to_handler_and_responds() {
     use github_copilot_sdk::types::ElicitationResult;
