再び Go の学習を再開していて、以前理解が曖昧だった点を重点的に学び直しています。そのうちの1つ goroutine, channel について記事にしょうと思います。

Go の並行処理は goroutine と channel という2つの仕組みを中心に設計されています。channel を通じてデータをやり取りすることで安全な並行処理を実現できます。この記事ではサンプルコードを交えて解説します。

コードは以下のレポジトリにあります。

https://github.com/jedipunkz/go-tips

goroutine とは

goroutine は Go のランタイムが管理する軽量なスレッドです。go キーワードをつけて関数を呼び出すだけで並行実行できます。

通常の関数呼び出しは呼び出し元が処理完了を待ちますが、go をつけると呼び出し元はすぐに次の処理へ進み、関数は別の goroutine で並行して動きます。

channel とは

channel は goroutine 間でデータを安全にやり取りするためのパイプです。make(chan T) で作成し、<- 演算子で送受信します。

  • 送信: ch <- value
  • 受信: value := <-ch

channel を使うと goroutine 間で双方向にデータをやり取りできます。

channel にはバッファなしとバッファありの2種類があります。バッファなし channel は送受信が揃うまでブロックするため、goroutine 間の同期点として機能します。バッファあり channel はバッファに空きがある限り送信側はブロックせず先へ進めます。

パターン1: 基本的な channel の使い方

まずは最もシンプルなパターンです。producer goroutine が値を送信し、main goroutine が受信します。

使い所

Web から複数のファイルをダウンロードして DB に保存するような処理を考えると、ネットワーク待ちや DB への書き込み待ちがほとんどで CPU はほぼ遊んでいます。こういった I/O バウンドな処理では、1件ずつ順番に処理するより goroutine と channel でパイプライン化するほうが効率的です。たとえば「URL リストを読み込む goroutine」と「その URL から HTTP GET する goroutine」を分離し、channel でつなぐことで取得と後続処理を並行して進められます。

package main

import "fmt"

func producer(ch chan<- int) {
	for i := 1; i <= 5; i++ {
		fmt.Printf("送信: %d\n", i)
		ch <- i
	}
	close(ch)
}

func main() {
	ch := make(chan int)

	go producer(ch)

	for v := range ch {
		fmt.Printf("受信: %d\n", v)
	}
}

chan<- int は送信専用 channel を表す型です。関数の引数でこのように型を絞ることで、誤った方向での操作をコンパイル時に防げます。close(ch) を呼ぶと for range ループが終了します。

実行結果

送信: 1
送信: 2
受信: 1
受信: 2
送信: 3
送信: 4
受信: 3
受信: 4
送信: 5
受信: 5

送信と受信が交互に出力されているのは、バッファなし channel のため送信側が受信側の準備を待つためです。

パターン2: 並行処理

複数の goroutine で並列にタスクを実行し、結果を収集するパターンです。sync.WaitGroup を使って全 worker の完了を待ちます。

使い所

Web から N 件のファイルをダウンロードして DB に保存する処理は、1件あたりの時間のほとんどがネットワーク待ちと DB 書き込み待ちです。CPU はほぼ使わないため、goroutine で並列化しても CPU の奪い合いにならず、単純にスループットが上がります。このパターンでは各 URL をワーカーに割り当て、全件完了を WaitGroup で待ちつつ結果を channel で収集することで、順次処理に比べて大幅に処理時間を短縮できます。

package main

import (
	"fmt"
	"sync"
)

func worker(id int, wg *sync.WaitGroup, results chan<- string) {
	defer wg.Done()
	result := fmt.Sprintf("worker %d の処理完了", id)
	results <- result
}

func main() {
	const numWorkers = 5
	results := make(chan string, numWorkers)

	var wg sync.WaitGroup
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go worker(i, &wg, results)
	}

	// 全 worker 完了後にチャンネルをクローズ
	go func() {
		wg.Wait()
		close(results)
	}()

	for r := range results {
		fmt.Println(r)
	}
}

results をバッファあり channel (make(chan string, numWorkers)) にすることで、受信側の準備が整う前に全 worker が送信を完了できます。wg.Wait() の後に close する goroutine を別途起動することで、for range による結果の収集と並行して待機処理が走ります。

実行結果

worker 1 の処理完了
worker 2 の処理完了
worker 3 の処理完了
worker 4 の処理完了
worker 5 の処理完了

goroutine のスケジューリングにより順序は実行ごとに変わる可能性があります。

パターン3: 複数 channel の合流

複数の channel からのデータを1つの channel に合流させるパターンです。

使い所

複数の外部 API に問い合わせて結果をまとめて返す処理に向いています。たとえばユーザー情報サービス・注文サービス・在庫サービスに並列リクエストを送り、全ての応答が揃ったらレスポンスを組み立てるケースです。各サービスへのリクエストはネットワーク待ちなので CPU は使わず、直列に呼ぶと 3 サービス分の待ち時間が積み重なりますが、並列化すれば最も遅いサービスの応答時間だけで済みます。

package main

import (
	"fmt"
	"sync"
)

func generate(id int, vals ...int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, v := range vals {
			fmt.Printf("source %d: 送信 %d\n", id, v)
			ch <- v
		}
	}()
	return ch
}

func merge(channels ...<-chan int) <-chan int {
	merged := make(chan int)
	var wg sync.WaitGroup

	forward := func(ch <-chan int) {
		defer wg.Done()
		for v := range ch {
			merged <- v
		}
	}

	wg.Add(len(channels))
	for _, ch := range channels {
		go forward(ch)
	}

	go func() {
		wg.Wait()
		close(merged)
	}()

	return merged
}

func main() {
	ch1 := generate(1, 1, 2, 3)
	ch2 := generate(2, 4, 5, 6)
	ch3 := generate(3, 7, 8, 9)

	for v := range merge(ch1, ch2, ch3) {
		fmt.Printf("受信: %d\n", v)
	}
}

<-chan int は受信専用 channel を表します。generate 関数は channel を返し、呼び出し側は送信を気にせず受信だけに集中できます。merge では各 source channel に対して goroutine を起動し、全て完了したら merged channel を close します。

実行結果

source 1: 送信 1
source 2: 送信 4
source 1: 送信 2
source 3: 送信 7
source 3: 送信 8
source 1: 送信 3
受信: 1
受信: 7
受信: 2
受信: 3
受信: 8
source 3: 送信 9
受信: 9
受信: 4
source 2: 送信 5
source 2: 送信 6
受信: 5
受信: 6

複数の source が並列に動いているため、送信順序と受信順序が一致しない点に注目してください。受信側ではソースを意識せず全データを順次処理できます。

パターン4: スロットリング

同時実行数を制限したい場合、バッファあり channel をセマフォとして使うパターンが有効です。

使い所

Web から大量のファイルをダウンロードして DB に保存する処理を goroutine で並列化した場合、制限なく goroutine を起動すると DB コネクションを使い果たしたり、接続先サーバーのレート制限に引っかかったりします。このパターンではセマフォで同時実行数に上限を設けることで、並列化によるスループット向上と、リソース枯渇や外部制限への抵触を防ぐ安全性を両立できます。golang.org/x/sync/semaphore パッケージを使う方法もありますが、channel でシンプルに実装できます。

package main

import (
	"fmt"
	"time"
)

func task(id int, sem chan struct{}, done chan<- struct{}) {
	sem <- struct{}{} // セマフォ取得(空きがなければブロック)
	defer func() {
		<-sem // セマフォ解放
		done <- struct{}{}
	}()

	fmt.Printf("タスク %d 開始\n", id)
	time.Sleep(500 * time.Millisecond) // 処理時間をシミュレート
	fmt.Printf("タスク %d 完了\n", id)
}

func main() {
	const (
		numTasks    = 10
		concurrency = 3 // 同時実行数の上限
	)

	sem := make(chan struct{}, concurrency)
	done := make(chan struct{}, numTasks)

	for i := 1; i <= numTasks; i++ {
		go task(i, sem, done)
	}

	for i := 0; i < numTasks; i++ {
		<-done
	}
	fmt.Println("全タスク完了")
}

make(chan struct{}, concurrency) でバッファサイズ=同時実行数の上限となるセマフォを作ります。struct{} はサイズ0の型なのでメモリ効率が良いです。sem <- struct{}{} でセマフォを取得し、バッファが満杯なら空きができるまでブロックします。

実行結果

タスク 10 開始
タスク 1 開始
タスク 5 開始
タスク 1 完了
タスク 3 開始
タスク 10 完了
タスク 2 開始
タスク 5 完了
タスク 7 開始
タスク 2 完了
タスク 6 開始
タスク 3 完了
タスク 4 開始
タスク 7 完了
タスク 8 開始
タスク 4 完了
タスク 8 完了
タスク 9 開始
タスク 6 完了
タスク 9 完了
全タスク完了

10タスクあるにもかかわらず、同時に「開始」が表示されるのは最大3つまでに制限されています。あるタスクが「完了」した直後に次のタスクが「開始」するという動きが確認できます。

パターン5: タイムアウト処理

context.WithTimeoutselect を組み合わせることで、処理のタイムアウトをシンプルに実装できます。

使い所

Web アプリのハンドラーが外部 API や DB に問い合わせる処理は、相手の応答を待つだけで CPU はほぼ使いません。しかし相手が遅延や障害を起こすと、そのままではレスポンスが返らずユーザーを長時間待たせてしまいます。context.WithTimeout を使うことで「N 秒以内に応答がなければエラーを返す」という SLA をコードで表現できます。net/http のリクエストハンドラーでは r.Context() を downstream の呼び出しにそのまま渡すのが定石で、クライアントが接続を切った瞬間に進行中の DB クエリや HTTP リクエストも連鎖してキャンセルされます。

package main

import (
	"context"
	"fmt"
	"time"
)

func slowTask(ctx context.Context, result chan<- string) {
	select {
	case <-time.After(2 * time.Second): // 重い処理をシミュレート
		result <- "処理完了"
	case <-ctx.Done():
		fmt.Println("タスクがキャンセルされました:", ctx.Err())
		return
	}
}

func run(timeout time.Duration) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	result := make(chan string, 1)
	go slowTask(ctx, result)

	select {
	case r := <-result:
		fmt.Println("成功:", r)
	case <-ctx.Done():
		fmt.Println("タイムアウト:", ctx.Err())
	}
}

func main() {
	fmt.Println("=== タイムアウトなし (3秒) ===")
	run(3 * time.Second)

	fmt.Println("=== タイムアウトあり (1秒) ===")
	run(1 * time.Second)
}

select は複数の channel 操作を同時に待機し、最初に準備できたものを実行します。ctx.Done() は context がキャンセルまたはタイムアウトした時に close される channel です。defer cancel() を忘れずに呼ぶことで、タイムアウト前に処理が完了した場合もリソースが解放されます。

実行結果

=== タイムアウトなし (3秒) ===
成功: 処理完了
=== タイムアウトあり (1秒) ===
タスクがキャンセルされました: context deadline exceeded
タイムアウト: context deadline exceeded

タイムアウトが 3 秒の場合は処理(2秒)が完了して「成功」となります。タイムアウトが 1 秒の場合は処理完了前に deadline を超えるため、goroutine 内でもキャンセルを検知して早期リターンし、呼び出し元でもタイムアウトとして扱われています。