Goroutine과 Sync

Go 하면 어떤 특징이 떠오르시나요? 구글의 언어, 빠르다 등 전반적인 개념에서 goroutine, 채널 등 디테일한 부분까지 많은 것들이 있을텐데요, 저는 제가 처음 고를 접했을 때 가장 재미있게 봤던 것이 goroutine과 채널이였습니다.

오늘은 goroutine과 이를 잘 보조해주는 sync 패키지에 대한 이야기를 조금 하려고 합니다.

Goroutine

goroutine은 go에서 사용하는 경량 쓰레드입니다. 그래서 goroutine과 채널 등을 이용해서 동시성 프로그래밍이 가능하게 해주며, 타 언의 쓰레드와 다르게 system call이 아닌 go 자체에서 관리하기 때문에 다수 goroutine을 생성해도 go가 허용한 범위 안에서만 영향을 끼치게 됩니다. 덕분에 go로 만들어진 도구들은 다른 도구들보다 구동 속도가 월등히 빠릅니다.

package main

import "fmt"

func main(){
	go func (){
		fmt.Println("첫번째 goroutine이 실행되었습니다.")
	}
	go func (){
		fmt.Println("두번째 goroutine이 실행되었습니다.")
	}
	go func (){
		fmt.Println("세번째 goroutine이 실행되었습니다.")
	}
}

Sync (standard library)

sync는 go에서 동기화를 위한 스탠다드 라이브러리입니다. 여러 케이스에서 사용할 수 있겠지만, 멀티 쓰레드처럼 여러 갈래로 분기되어 처리되는 goroutine의 동기화를 위해선 꼭 사용되는 패키지입니다.

https://pkg.go.dev/sync

WaitGroup

그냥 goroutine만 사용한다면 main이 종료될 시 대기하지 않고 같이 종료되게 되는데요, 이를 방지 하기 위해 sync의 WaitGroup을 사용하여 지정한 개수의 goroutine이 완료될 때 까지 대기하게 할 수 있습니다.

package main

import (
	"fmt"
	"strconv"
	"sync"
)

func main() {
	fmt.Println("vim-go")

	wordlists := make(chan string)

	// Scanning
	concurrency := 10
	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			for word := range wordlists {
				fmt.Println(word)
			}
			wg.Done()
		}()
	}

	for j := 0; j < 100; j++ {
		wordlists <- strconv.Itoa(j)
	}
	close(wordlists)
	wg.Wait()
}

https://gist.github.com/hahwul/e723ac6847c33be0c5a59e04b60fb35c

저 또한 도구를 만들 때 이런 형태로 많이 사용하곤 합니다.

Mutex

mutex도 자주 사용되는 것 중 하나인데요, OS 공부할 떄 배운 mutex와 동일하게 동시 접근을 막기 위해 사용되는 타입입니다.

https://pkg.go.dev/sync#Mutex

mutex := &sync.Mutex{}
mutex.Lock()
// run ...
mutex.Unlock()

golang.org/x/sync

스탠다드 라이브러리는 아니지만, 하나의 sync 라이브러리가 더 존재합니다. golang.org/x/sync는 스탠다드 sync와 동일하게 동기화를 위한 라이브러리로 여기에는 errgroup semaphore singleflight syncmap 이렇게 4개의 하위 패키지가 존재합니다.

https://pkg.go.dev/golang.org/x/sync

errgroup

errgroup은 goroutine 그룹에 대한 동기화를 지원하며 이를 통해서 에러를 전파하고 context를 취소할 수 있도록 제공해주는 패키지입니다. 스탠다드 sync와 동일하게 Wait() 함수를 통해 대기하는 기능을 지원하며 에러가 발생하는 경우 내부 goroutine에게 에러를 전파해서 취소될 수 있도록 처리해 줍니다.

https://pkg.go.dev/golang.org/x/sync/errgroup

import (
	"fmt"
	"net/http"

	"golang.org/x/sync/errgroup"
)

func main() {
	g := new(errgroup.Group)
	var urls = []string{
		"http://www.golang.org/",
		"http://www.google.com/",
		"http://www.somestupidname.com/",
	}
	for _, url := range urls {
		url := url
		g.Go(func() error {
			resp, err := http.Get(url)
			if err == nil {
				resp.Body.Close()
			}
			return err
		})
	}
	if err := g.Wait(); err == nil {
		fmt.Println("Successfully fetched all URLs.")
	}
}

semaphore

semaphore는 weighted semaphore를 패키지단에서 제공합니다. semaphore를 사용하면 병렬 작업에서 goroutine의 갯수를 제한할 수 있습니다.

https://pkg.go.dev/golang.org/x/sync/semaphore

package main

import (
	"context"
	"fmt"
	"log"
	"runtime"

	"golang.org/x/sync/semaphore"
)

func main() {
	ctx := context.TODO()

	var (
		maxWorkers = runtime.GOMAXPROCS(0)
		sem        = semaphore.NewWeighted(int64(maxWorkers))
		out        = make([]int, 32)
	)

	for i := range out {
		if err := sem.Acquire(ctx, 1); err != nil {
			log.Printf("Failed to acquire semaphore: %v", err)
			break
		}

		go func(i int) {
			defer sem.Release(1)
			out[i] = collatzSteps(i + 1)
		}(i)
	}

	if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
		log.Printf("Failed to acquire semaphore: %v", err)
	}

	fmt.Println(out)

}

func collatzSteps(n int) (steps int) {
	if n <= 0 {
		panic("nonpositive input")
	}

	for ; n > 1; steps++ {
		if steps < 0 {
			panic("too many steps")
		}

		if n%2 == 0 {
			n /= 2
			continue
		}

		const maxInt = int(^uint(0) >> 1)
		if n > (maxInt-1)/3 {
			panic("overflow")
		}
		n = 3*n + 1
	}

	return steps
}

singleflight

singleflight는 중복함수 호출을 억제하기 위한 메커니즘을 제공하는 패키지입니다. 이를 활용하면 여러 분기의 goroutine에서 중복으로 동일 함수를 호출하는 것을 제한할 수 있습니다.

정확하겐 제한한다기 보단 대기하게 되는데요, 어떤 특정 함수가 호출되어 실행중일 때 동일한 키(arguments)로 함수가 실행되면 함수를 새로 처리하는 것이 아닌 기존 함수의 처리를 기다립니다.

만약 아래와 같이 runApp 함수에 singleflight가 적용되었을 때 key에 따라서 처리 여부가 달라집니다.

var g singleflight.Group
ctx := context.Background()

runApp := func(key string) {
	g.Do(key, func() (interface{}, error) {
		fmt.Println(key)
	}
}

go runApp("test")
go runApp("test")
go runApp("hahwul")

https://pkg.go.dev/golang.org/x/sync/singleflight

syncmap

syncmap은 concurrent map 즉, 동기화된 map을 만들 수 있습니다. 해당 기능은 스탠다드 sync에도 존재하며 아래와 같은 형태로 동기화된 map을 생성할 수 있습니다.

https://pkg.go.dev/golang.org/x/sync/syncmap

syncMap := sync.Map

Conclusion

어디선가 들은 이야기인데, 1인 개발도 팀으로 개발하는 것과 같이 진행 하라는 말이 있습니다. 이는 과거의 나와 미래의 내가 같을 수가 없기 때문인데요. 그래서 정보는 명확하게, 코드는 누가봐도 잘 알 수 있도록 작성 하라는 말 인 것 같습니다.

예전에 goroutine과 channel 등을 사용하면서 기능을 구현할 때 WaitGroup, 잘써야 Mutex 정도만을 사용했던 제 자신이 부끄러워 지네요. 이제라도 적절한 상황에 잘 쓸 수 있도록 숙지 해야겠습니다. :D