Rust의 커스텀 퓨처를 사용한 폴링 이해하기
Takashi Yamamoto
Infrastructure Engineer · Leapcell

소개
네트워킹, I/O 바운드 작업, 고 동시성 시스템과 같은 분야에서 고성능 및 반응형 애플리케이션을 구축하기 위해 비동기 프로그래밍은 필수적인 패러다임이 되었습니다. Rust는 강력한 타입 시스템과 소유권 모델을 통해 Future 트레잇을 기반으로 강력하고 안전한 비동기 프로그래밍 방식을 제공합니다. async/await 구문을 통해 퓨처와 상호작용하는 경우가 많지만, 이러한 추상화가 내부적으로 어떻게 작동하는지 진정으로 이해하는 것은 디버깅, 최적화, 심지어는 커스텀 비동기 컴포넌트 설계에도 중요합니다. 커스텀 Future를 작성하는 이 심층 분석은 폴링 메커니즘을 명확히 밝히고, 비동기 작업과 익스큐터 간의 근본적인 상호 작용을 보여주어 궁극적으로 Rust의 비동기 기능을 더 큰 자신감과 정밀도로 활용할 수 있도록 할 것입니다.
비동기 실행의 핵심: 폴링
커스텀 퓨처를 구성하기 전에 관련 핵심 개념에 대한 명확한 이해를 확립해 봅시다:
- Future 트레잇: Rust에서
Future는 아직 완료되었을 수도 있고 그렇지 않을 수도 있는 비동기 계산을 나타내는 트레잇입니다. 실행기가 퓨처의 진행 상황을 확인하기 위해 반복적으로 호출하는poll이라는 단일 메서드를 가지고 있습니다. - 익스큐터 (Executor): 익스큐터는
Future를 가져와poll메서드를 반복적으로 호출하여 완료될 때까지 진행시키는 역할을 합니다. 퓨처의 생명 주기를 관리하고, 작업을 예약하며, 작업이 진행될 준비가 되었을 때 작업을 깨우는 것을 처리합니다. 인기 있는 익스큐터로는tokio와async-std가 있습니다. - 폴링 (Polling): 미완료된
Future에 대해 익스큐터가poll메서드를 호출하는 행위입니다.poll이 호출되면 퓨처는 진행을 시도합니다. Poll열거형:poll메서드는Poll열거형을 반환하며, 두 가지 변형이 있습니다:Poll::Ready(T): 퓨처가 성공적으로 완료되었음을 나타내며,T는 계산의 결과입니다.Poll::Pending: 퓨처가 아직 완료되지 않았음을 나타냅니다.Pending이 반환될 때, 퓨처는 진행할 준비가 되었을 때 (Waker를 통해) 깨우도록 배열되어야 합니다.
Waker:Waker는 익스큐터가 제공하는 저수준 메커니즘으로, 퓨처가 다시 폴링될 준비가 되었음을 신호할 수 있도록 합니다. 퓨처가Poll::Pending을 반환할 때,Context에서Waker를 캡처하고 복제합니다. 나중에, 소켓에 데이터가 도착하거나 타이머가 만료되는 등 퓨처를 차단 해제할 준비가 된 이벤트가 발생하면, 퓨처는waker.wake_by_ref()를 호출하여 익스큐터에 다시 폴링하도록 알립니다.Context:poll메서드에 전달되는Context는Waker와 퓨처가 익스큐터와 상호 작용하는 데 유용한 기타 정보를 포함합니다.
커스텀 퓨처 구축: 간단한 지연
비차단 지연을 도입하는 커스텀 Future를 만들어 보겠습니다. 이를 통해 폴링 메커니즘을 직접 관찰할 수 있습니다.
마감일 (완료될 시점)과 깨우기 위한 선택적 Waker를 보유하는 Delay 구조체를 정의할 것입니다.
use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll, Waker}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use std::thread; // 우리의 지연 퓨처의 상태를 나타냅니다. struct Delay { deadline: Instant, // 마감일이 지나면 퓨처를 깨우기 위해 Waker를 저장해야 합니다. // Arc<Mutex<Option<Waker>>>를 사용하면 스레드 간에 Waker를 공유하고 안전하게 수정할 수 있습니다. waker_storage: Arc<Mutex<Option<Waker>>>, // 타이머 스레드를 한 번만 스폰했는지 확인하는 플래그입니다. timer_thread_spawned: bool, } impl Delay { fn new(duration: Duration) -> Self { Delay { deadline: Instant::now() + duration, waker_storage: Arc::new(Mutex::new(None)), timer_thread_spawned: false, } } } // 우리의 Delay 구조체에 대한 Future 트레잇을 구현합니다. impl Future for Delay { // 우리 퓨처의 출력 타입은 단순히 지연을 나타내므로 unit입니다. type Output = (); // 퓨처의 핵심: poll 메서드 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // 마감일이 이미 지났으면 퓨처는 준비된 상태입니다. if Instant::now() >= self.deadline { println!("Delay future: Deadline reached. Returning Ready."); return Poll::Ready(()); } // --- Waker 저장 및 타이머 설정 (한 번만) --- // 타이머 스레드가 아직 스폰되지 않았다면 설정합니다. if !self.timer_thread_spawned { println!("Delay future: First poll. Storing waker and spawning timer thread."); // 컨텍스트에서 현재 Waker를 저장합니다. // 이 Waker는 타이머 스레드에서 이 작업을 깨우는 데 사용될 것입니다. let mut waker_guard = self.waker_storage.lock().unwrap(); *waker_guard = Some(cx.waker().clone()); drop(waker_guard); // 잠금 해제 // Arc를 복제하여 새 스레드에 전달합니다. let waker_storage_clone = self.waker_storage.clone(); let duration_until_deadline = self.deadline.duration_since(Instant::now()); // 마감일 때까지 잠자고 그 후 원본 작업을 깨울 새 스레드를 스폰합니다. thread::spawn(move || { thread::sleep(duration_until_deadline); println!("Delay timer thread: Deadline passed. Waking up the task."); // Waker를 검색하고 작업을 깨웁니다. if let Some(waker) = waker_storage_clone.lock().unwrap().take() { waker.wake(); } }); // 타임아웃 스레드가 스폰되었음을 표시하여 재스폰을 방지합니다. self.timer_thread_spawned = true; } else { // 타이머 스레드가 이미 실행 중인 경우 후속 폴을 처리합니다. // 익스큐터가 작업을 이동하거나 다시 스케줄링하기로 결정하면 Waker를 업데이트하는 것이 중요합니다. // Waker가 업데이트되지 않으면 이전 Waker가 오래되어 작업이 깨우지 못하게 될 수 있습니다. let mut waker_guard = self.waker_storage.lock().unwrap(); if waker_guard.as_ref().map_or(true, |w| !w.will_wake(cx.waker())) { println!("Delay future: Waker changed. Updating."); *waker_guard = Some(cx.waker().clone()); } } // 마감일이 아직 지나지 않았다면 퓨처는 보류 중입니다. // 타이머 스레드가 `waker.wake()`를 호출하면 다시 폴링됩니다. println!("Delay future: Deadline not yet reached. Returning Pending."); Poll::Pending } } #[tokio::main] async fn main() { println!("Main: Starting program."); let delay_future = Delay::new(Duration::from_secs(2)); // 2초 지연 생성 println!("Main: Awaiting delay future..."); delay_future.await; // 커스텀 퓨처를 기다립니다. println!("Main: Delay completed. Program finished."); }
Delay 퓨처 설명:
-
struct Delay:deadline: 지연이 완료되어야 하는 시점(Instant)을 나타냅니다.waker_storage:Arc<Mutex<Option<Waker>>>는 필수적입니다.Waker는 퓨처 (self.waker_storage를 소유)와wake를 호출할 별도의thread::spawn간에 공유되어야 합니다.Arc는 공유 소유권을 가능하게 하고Mutex는Waker를 저장하고 검색하기 위한 안전한 내부 가변성을 제공합니다.Option은Waker가 저장되기 전에 첫 번째poll에서 사용 가능하지 않을 수 있기 때문에 사용됩니다.timer_thread_spawned: 동일한Delay퓨처에 대해 타이머 스레드를 한 번만 생성하도록 하는 간단한 불리언 플래그입니다.
-
impl Future for Delay:type Output = ();: 우리의 지연 퓨처는 단순히 완료되며 의미 있는 값은 생성하지 않습니다.poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>: 이것이 핵심입니다.if Instant::now() >= self.deadline: 모든 폴링마다 마감일이 지났는지 확인합니다. 그렇다면Ready이며Poll::Ready(())를 반환합니다.if !self.timer_thread_spawned: 이 조건 블록은 실제 타이머 (thread::spawn부분)가 한 번만 설정되도록 보장합니다.let mut waker_guard = self.waker_storage.lock().unwrap(); *waker_guard = Some(cx.waker().clone());:waker_storage에 대한 잠금을 획득하고, 현재Context에서Waker를 복제하여 저장합니다. 이Waker는 현재 폴링 중인 이 특정 작업을 가리킵니다.thread::spawn(...): 표준 Rust 스레드를 시작합니다. 이 스레드는 남은 시간 동안sleep()할 것입니다. 이것은 헬퍼 스레드 관점에서는 블로킹sleep이지만, 별도의 OS 스레드에 있기 때문에 익스큐터 스레드를 차단하지 않습니다.- 스폰된 스레드 내에서, 잠이 끝난 후 저장된
Waker를 검색하고waker.wake()를 호출합니다. 이wake()호출은 비동기 런타임 (우리main의 Tokio)에 이Waker와 관련된 작업이 다시 폴링될 준비가 되었음을 알립니다. self.timer_thread_spawned = true;: 타이머 스레드 복제를 방지하기 위해 플래그를 true로 설정합니다.
else { ... }: 타이머 스레드가 이미 스폰된 경우 (즉, 이미 보류 중인 퓨처에 대한 후속 폴드),Context의Waker가 변경되었는지 (!w.will_wake(cx.waker())) 여전히 확인해야 합니다. 변경되었다면 저장된Waker를 업데이트합니다. 이는 익스큐터가 작업을 이동하거나 다시 예약할 수 있기 때문에, 작업을 올바르게 알리기 위해 새로운Waker가 필요한 경우 중요합니다.Poll::Pending: 마감일이 지나지 않았고 타이머가 설정되어 있다면 퓨처는 여전히 기다리는 중입니다.Poll::Pending을 반환합니다. 익스큐터는waker.wake()가 호출될 때까지 이 퓨처를 폴링하는 것을 중지합니다.
tokio::main 및 await와 함께 작동하는 방식:
Delay::new(Duration::from_secs(2)):Delay인스턴스가 생성됩니다.delay_future.await: 여기서 마법이 일어납니다.- Tokio의 익스큐터가
delay_future를 받습니다. - 첫 번째 폴: 익스큐터가
delay_future.poll(cx)를 호출합니다.- 마감일이 충족되지 않습니다.
timer_thread_spawned는false입니다.cx의Waker가 복제되어delay_future.waker_storage에 저장됩니다.- 새로운
thread::spawn이 생성됩니다. 이 스레드는 2초 동안 잠자기 시작합니다. timer_thread_spawned는true로 설정됩니다.poll은Poll::Pending을 반환합니다.
Poll::Pending후 익스큐터의 행동: 익스큐터는 이제delay_future가 준비되지 않았음을 압니다. 작업을 제쳐두고 다른 준비된 작업을 폴링하거나waker.wake()호출을 기다립니다. 중요한 것은, Tokio 런타임 스레드는thread::spawn의thread::sleep에 의해 차단되지 않는다는 것입니다.- 2초 후:
thread::spawn이thread::sleep을 완료합니다.- 저장된
Waker를 검색하고waker.wake()를 호출합니다.
- 저장된
waker.wake()후 익스큐터의 행동: 익스큐터는delay_future와 관련된 작업에 대한 깨우기 신호를 받습니다.delay_future를 다시 폴링하도록 예약합니다.- 두 번째 (또는 이후) 폴: 익스큐터가
delay_future.poll(cx)를 다시 호출합니다.- 이제
Instant::now() >= self.deadline이 참입니다. poll은Poll::Ready(())를 반환합니다.
- 이제
- 완료:
delay_future.await표현식이 마침내 완료되고main함수가 계속 진행됩니다.
- Tokio의 익스큐터가
결론
커스텀 Delay 퓨처를 구현함으로써 Rust의 비동기 폴링 메커니즘에 대한 실질적인 이해를 얻었습니다. 익스큐터가 Future::poll을 반복적으로 호출하는 방법, Poll::Pending이 완료되지 않은 상태를 신호하는 방법, 그리고 결정적으로 Waker가 진행될 수 있을 때 익스큐터에 다시 폴링을 재개하도록 신호하는 퓨처를 허용하는 다리가 되는 방법을 확인했습니다. Waker를 통한 Future와 Executor 간의 이러한 명시적인 상호 작용은 Rust의 효율적이고 비차단적인 비동기 프로그래밍의 기반이며, 블로킹 스레드의 오버헤드 없이 고성능 및 확장 가능한 애플리케이션을 가능하게 합니다. 커스텀 Future 구현을 마스터하는 것은 Rust의 강력한 비동기 생태계에 대한 더 깊은 통찰력을 열어주는 고급 기술입니다.