Go 동시성 패턴 - 프로듀서-소비자, 팬아웃/팬인, 파이프라인 심층 분석
Wenhao Wang
Dev Intern · Leapcell

Go의 내장 동시성 기능, 주로 고루틴과 채널은 고도로 동시적이고 병렬적인 애플리케이션을 작성할 수 있는 강력하면서도 우아한 방법을 제공합니다. 전통적인 스레드 기반 모델과 달리 Go의 접근 방식은 동시성 프로그래밍을 단순화하여 데드락 및 경쟁 상태와 같은 일반적인 함정에 빠질 가능성을 줄입니다. 이 문서는 Go의 몇 가지 핵심 동시성 패턴인 프로듀서-소비자, 팬아웃/팬인, 파이프라인을 심층적으로 살펴보고 실용적인 예제를 통해 구현과 이점을 설명합니다.
기초: 고루틴 및 채널
패턴을 자세히 살펴보기 전에 빌딩 블록을 간략하게 복습해 보겠습니다.
- 고루틴: 가볍고 독립적으로 실행되는 함수입니다. OS 스레드 수가 적게 다중화되어 매우 효율적입니다. 함수 호출 앞에 go를 붙여 고루틴을 시작합니다:go myFunction().
- 채널: 고루틴이 통신하고 동기화할 수 있는 타입화된 통로입니다. 메모리를 공유하여 통신하는 'Go 방식'이며, 메모리를 공유하지 않고 통신하는 방식보다 낫습니다. 동시 컴포넌트를 연결하는 파이프라고 생각하면 됩니다. make(chan Type)대신 채널을 만들고,ch <- value로 보내고,value := <-ch로 받습니다. 채널은 버퍼링(make(chan Type, capacity)) 또는 버퍼링되지 않은(make(chan Type)) 상태일 수 있습니다.
패턴 1: 프로듀서-소비자
프로듀서-소비자 패턴은 하나 이상의 '프로듀서'가 데이터를 생성하여 공유 버퍼에 넣고, 하나 이상의 '소비자'가 버퍼에서 데이터를 가져와 처리하는 클래식 동시성 디자인입니다. Go에서는 채널이 자연스럽게 이 공유 버퍼 역할을 합니다.
사용하는 이유
- 디커플링: 프로듀서는 데이터가 어떻게 소비되는지 알 필요가 없고, 소비자는 데이터가 어떻게 생산되는지 알 필요가 없습니다.
- 로드 스무딩: 프로듀서가 불규칙한 속도로 데이터를 생성하는 경우 버퍼가 소비자에게 흐름을 부드럽게 만들 수 있습니다.
- 동시성: 프로듀서와 소비자는 동시적으로 작동하여 전반적인 처리 속도를 높일 수 있습니다.
예제: 바운드 버퍼를 사용한 파일 처리
대용량 파일에서 줄을 읽고(프로듀서) 각 줄을 처리(소비자)해야 하는 시나리오를 상상해 봅시다.
package main import ( "bufio" "fmt" "os" "strconv" "strings" "sync" "time" ) // LineProducer는 파일에서 줄을 읽어 채널로 보냅니다. func LineProducer(filePath string, lines chan<- string, wg *sync.WaitGroup) { defer wg.Done() file, err := os.Open(filePath) if err != nil { fmt.Printf("Error opening file: %v\n", err) close(lines) // 오류 발생 시 채널이 닫히도록 보장 return } defer file.Close() scanner := bufio.NewScanner(file) for scanner.Scan() { lines <- scanner.Text() // 줄을 채널로 보냅니다. } if err := scanner.Err(); err != nil { fmt.Printf("Error reading file: %v\n", err) } close(lines) // 중요: 더 이상 데이터가 없음을 알리기 위해 채널을 닫습니다. } // LineConsumer는 채널에서 받은 줄을 처리합니다. func LineConsumer(id int, lines <-chan string, processedCount *int64, wg *sync.WaitGroup) { defer wg.Done() for line := range lines { // CPU 집약적인 처리 시뮬레이션 time.Sleep(time.Millisecond * 10) num, err := strconv.Atoi(strings.TrimSpace(line)) if err == nil { // fmt.Printf("Consumer %d processed: %d (squared: %d)\n", id, num, num*num) } else { // fmt.Printf("Consumer %d skipped non-integer line: %s\n", id, line) } // 처리된 카운트를 안전하게 증가시키려면 뮤텍스 또는 원자적 연산을 사용합니다. // 편의를 위해 main에서 atomic.AddInt64를 사용합니다. } fmt.Printf("Consumer %d finished.\n", id) } func main() { const ( numConsumers = 5 bufferSize = 100 // 프로듀서/소비자 속도를 부드럽게 하기 위한 버퍼링된 채널 filePath = "data.txt" ) // 데모를 위해 더미 data.txt 생성 createDummyFile(filePath, 1000) linesChannel := make(chan string, bufferSize) // 버퍼링된 채널 var wg sync.WaitGroup var processed int64 // 실제 앱에서는 공유 카운터에 원자적 사용 // 프로듀서 시작 wg.Add(1) go LineProducer(filePath, linesChannel, &wg) // 소비자 시작 for i := 0; i < numConsumers; i++ { wg.Add(1) go LineConsumer(i+1, linesChannel, &processed, &wg) } // 모든 고루틴이 완료될 때까지 기다립니다. wg.Wait() fmt.Printf("All producers and consumers finished.\n") } // 더미 파일을 생성하는 헬퍼 함수 func createDummyFile(filePath string, numLines int) { file, err := os.Create(filePath) if err != nil { panic(err) } defer file.Close() writer := bufio.NewWriter(file) for i := 0; i < numLines; i++ { fmt.Fprintf(writer, "%d\n", i) } writer.Flush() fmt.Printf("Created dummy file: %s with %d lines.\n", filePath, numLines) }
이 예제에서:
- LineProducer는 프로듀서로, 줄을 읽어- linesChannel로 보냅니다.
- LineConsumer인스턴스는 소비자이며,- linesChannel에서 줄을 받아 처리합니다.
- linesChannel은 바운드 버퍼 역할을 합니다.- bufferSize는 프로듀서가 소비자를 너무 앞서나가지 못하게 하여 메모리를 고갈시킬 가능성을 방지합니다.
- sync.WaitGroup은 메인 프로그램이 종료되기 전에 모든 프로듀서와 소비자가 작업을 완료하기를 기다리는 데 중요합니다.
- LineProducer에서- linesChannel을 닫는 것은 필수적입니다. 이는 소비자에게 더 이상 데이터가 전송되지 않음을 알리고,- for line := range lines루프가 정상적으로 종료될 수 있도록 합니다.
패턴 2: 팬아웃 / 팬인
팬아웃 / 팬인 패턴은 작업 집합을 여러 작업자 고루틴(팬아웃)에 분배한 다음 결과를 단일 채널로 다시 수집(팬인)하는 것입니다. 이 패턴은 계산을 병렬화하는 데 탁월합니다.
사용하는 이유
- 병렬성: 여러 CPU 코어를 활용하거나 네트워크를 통해 작업을 분산합니다.
- 확장성: 증가하는 부하를 처리하기 위해 더 많은 작업자를 쉽게 추가할 수 있습니다.
- 작업 분배: 큰 문제를 더 작고 독립적인 하위 문제로 분해합니다.
예제: 숫자의 병렬 제곱
숫자 목록이 있고 이를 병렬로 제곱하고 싶다고 가정해 봅시다.
package main import ( "fmt" "sync" "time" ) // worker는 'in' 채널에서 숫자를 받아 처리하고 'out' 채널로 보냅니다. func worker(id int, in <-chan int, out chan<- int, wg *sync.WaitGroup) { defer wg.Done() for n := range in { squared := n * n // fmt.Printf("Worker %d: processing %d -> %d\n", id, n, squared) time.Sleep(time.Millisecond * 50) // 작업 시뮬레이션 out <- squared } fmt.Printf("Worker %d finished.\n", id) } func main() { const ( numJobs = 20 numWorkers = 3 ) // 팬아웃: 여러 작업자에게 작업을 보냅니다. jobs := make(chan int, numJobs) results := make(chan int, numJobs) // 팬인을 위한 결과 채널 버퍼링 var workerWG sync.WaitGroup // 작업자 시작 (팬아웃) for w := 1; w <= numWorkers; w++ { workerWG.Add(1) go worker(w, jobs, results, &workerWG) } // jobs 채널에 작업을 보냅니다. for j := 1; j <= numJobs; j++ { jobs <- j } close(jobs) // 더 이상 보낼 작업 없음 // 모든 작업자가 현재 작업을 완료할 때까지 기다립니다. // 이것은 모든 결과가 'results' 채널로 전송되도록 합니다. workerWG.Wait() close(results) // 중요: 모든 작업자가 완료된 후 결과 채널을 닫습니다. // 팬인 수집기에게 더 이상 결과가 생성되지 않음을 알립니다. // 팬인: 결과 수집 fmt.Println("\nCollecting results:") for r := range results { fmt.Printf("Collected result: %d\n", r) } fmt.Println("All done!") }
설명:
- jobs채널: 초기 작업(제곱할 숫자)이 전송되는 곳입니다.
- results채널: 모든 작업자로부터 제곱된 숫자가 수집되는 곳입니다.
- 팬아웃: numWorkers개의 고루틴(worker함수)을 실행하며, 모두jobs채널에서 읽습니다.
- 작업 분배: 메인 고루틴은 숫자를 jobs채널로 보냅니다. Go 런타임은 자동으로 이러한 숫자를 사용 가능한worker고루틴에 분배합니다.
- 팬인: 메인 고루틴은 results채널에서 읽습니다.results는 모든 작업자가 완료되고 마지막 결과를 보낼 기회를 가진 후에만 닫히기 때문에,main의for r := range results루프는 모든 생성된 결과를 올바르게 수신한 다음 종료됩니다.workerWG는 모든 작업자가 완료될 때까지 기다리도록 합니다.
패턴 3: 파이프라인
A 파이프라인은 한 단계의 출력이 다음 단계의 입력이 되는 일련의 단계입니다. 각 단계는 일반적으로 동시적으로 작동합니다. Go에서는 채널을 사용하여 단계를 연결하여 파이프라인을 우아하게 구성합니다.
사용하는 이유
- 모듈성: 복잡한 작업을 더 작고 관리하기 쉬우며 재사용 가능한 구성 요소로 분해합니다.
- 동시성: 각 단계는 이전 단계에서 사용 가능한 데이터로 작동하여 동시적으로 실행될 수 있습니다.
- 처리량: 데이터가 파이프라인을 통해 흐르므로 종종 순차 처리보다 높은 처리량이 발생합니다.
예제: 텍스트 처리 파이프라인
다음과 같은 파이프라인을 구축해 보겠습니다.
- 숫자 시퀀스 생성(프로듀서).
- 짝수 필터링.
- 남은 홀수 제곱.
- 최종 결과 인쇄.
package main import ( "fmt" "sync" "time" ) // Generator 단계: 숫자 생성 func generate(done <-chan struct{}, nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { select { case out <- n: case <-done: return } } }() return out } // Filter 단계: 짝수 필터링 func filterOdd(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { if n%2 != 0 { // 홀수만 유지 select { case out <- n: case <-done: return } } } }() return out } // Square 단계: 숫자 제곱 func square(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { squared := n * n select { case out <- squared: case <-done: return } } }() return out } func main() { // 모든 고루틴의 정상 종료를 위한 done 채널 done := make(chan struct{}) defer close(done) // main이 종료될 때 done 채널이 닫히도록 보장 // 단계 1: 숫자 생성 numbers := generate(done, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) // 단계 2: 짝수 필터링 oddNumbers := filterOdd(done, numbers) // 단계 3: 홀수 제곱 squaredOddNumbers := square(done, oddNumbers) // 최종 단계: 결과 소비 및 인쇄 fmt.Println("Pipeline results:") for result := range squaredOddNumbers { fmt.Printf("Result: %d\n", result) time.Sleep(time.Millisecond * 10) // 최종 처리 시뮬레이션 } fmt.Println("Pipeline finished.") }
이 파이프라인의 주요 측면:
- 연결된 채널: generate는 채널로 보내고, 이 채널은filterOdd의 입력으로 전달됩니다.filterOdd의 출력 채널은square의 입력으로 전달됩니다.
- <-chan int및- chan<- int: 이러한 방향 채널 유형을 사용하면 안전성과 가독성이 향상되며, 함수가 채널에서 보내는지 또는 받는지 명확하게 나타냅니다.
- 정상 종료 (done채널):done채널은 파이프라인의 모든 고루틴에게 처리를 중지하고 종료하도록 신호를 보내는 일반적인 패턴입니다.main이 종료될 때defer close(done)은done채널을 수신하는 모든 고루틴이 정상적으로 반환되도록 하여 고루틴 누수를 방지합니다. 이는 장기 실행 파이프라인이나 파이프라인의 초기 단계에서 오류가 발생하는 경우 특히 중요합니다.
- 각 단계는 독립적인 고루틴이며 동시적으로 작동합니다. generate가 숫자를 생성하는 즉시filterOdd는 이를 처리할 수 있고, 그런 다음square는 전체 입력이 생성될 때까지 기다리지 않고 숫자를 제곱할 수 있습니다.
패턴 결합 및 모범 사례
이러한 패턴은 상호 배타적이지 않으며, 정교한 동시 시스템을 구축하기 위해 결합될 수 있습니다. 예를 들어, 파이프라인의 한 단계는 하위 작업을 병렬화하기 위한 팬아웃/팬인 작업 자체가 될 수 있습니다.
Go 동시성을 위한 일반적인 모범 사례:
- 메모리를 공유하여 통신하고, 통신하여 메모리를 공유하지 마십시오: 이것은 Go의 좌우명입니다. 통신 및 동기화를 위해 채널을 사용하십시오.
- 고루틴은 저렴하므로 아낌없이 사용하십시오: 많은 고루틴을 실행하는 것을 두려워하지 마십시오.
- 완료 신호를 위해 채널을 닫으십시오: 더 이상 데이터가 전송되지 않을 때는 항상 채널을 닫으십시오. 이렇게 하면 수신 측의 for ... range루프가 차단되지 않습니다.
- sync.WaitGroup을 사용하여 고루틴 대기: 모든 고루틴이 메인 프로그램 종료 전에 완료되도록 보장하는 데 필수적입니다.
- 오류 처리 및 정상 종료: 취소 작업을 위한 done채널 또는 컨텍스트와 같은 메커니즘을 구현하고 모든 고루틴이 정리되도록 합니다.
- 전역 상태는 가능한 한 피하십시오: 공유 상태를 피할 수 없는 경우 sync.Mutex또는sync.RWMutex로 보호하거나, 단일 고루틴(예: '모니터' 고루틴)을 통해 액세스를 직렬화하는 것이 좋습니다.
- 취소 및 마감 시간에 context패키지 고려: 시간 초과, 마감 시간 또는 계단식 취소가 관련된 더 복잡한 시나리오의 경우context패키지가 필수적입니다.
- 채널을 적절하게 버퍼링하십시오: 버퍼링된 채널을 사용하여 버스트를 부드럽게 하거나 프로듀서가 차단 없이 앞서 나갈 수 있도록 하지만, 메모리 사용량에 주의십시오. 버퍼링되지 않은 채널은 엄격한 동기화(랑데부)를 강제합니다.
- 동시성 철저히 테스트하십시오: 동시성 버그는 미묘할 수 있습니다. Go 경쟁 탐지기를 위해 -race플래그를 사용합니다 (go run -race filename.go또는go test -race ./...).
결론
고루틴과 채널을 기반으로 하는 Go의 동시성 모델은 동시성 애플리케이션을 설계하는 직관적이고 강력한 방법을 제공합니다. 프로듀서-소비자, 팬아웃/팬인, 파이프라인과 같은 패턴을 이해하고 적용함으로써 개발자는 최신 멀티코어 프로세서를 효과적으로 활용하는 강력하고 확장 가능하며 효율적인 시스템을 구축할 수 있습니다. 이러한 패턴은 모듈성과 유지 관리성을 장려하여 Go에서의 동시성 프로그래밍을 훨씬 더 즐겁고 오류가 적은 경험으로 만듭니다. 이러한 패턴을 채택하면 Go 애플리케이션은 자연스럽게 더 동시적이고 성능이 향상될 것입니다.