Featured image of post Goroutine과 Sync

Goroutine과 Sync

Go 하면 어떤 특징이 떠오르시나요? 구글의 언어, 빠르다 등 전반적인 개념에서 goroutine, 채널 등 디테일한 부분까지 많은 것들이 있을텐데요, 저는 제가 처음 고를 접했을 때 가장 재미있게 봤던 것이 goroutine과 채널이였습니다.

오늘은 goroutine과 이를 잘 보조해주는 sync 패키지에 대한 이야기를 조금 하려고 합니다.

Goroutine

goroutine은 go에서 사용하는 경량 쓰레드입니다. 그래서 goroutine과 채널 등을 이용해서 동시성 프로그래밍이 가능하게 해주며, 타 언의 쓰레드와 다르게 system call이 아닌 go 자체에서 관리하기 때문에 다수 goroutine을 생성해도 go가 허용한 범위 안에서만 영향을 끼치게 됩니다. 덕분에 go로 만들어진 도구들은 다른 도구들보다 구동 속도가 월등히 빠릅니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
package main

import "fmt"

func main(){
	go func (){
		fmt.Println("첫번째 goroutine이 실행되었습니다.")
	}
	go func (){
		fmt.Println("두번째 goroutine이 실행되었습니다.")
	}
	go func (){
		fmt.Println("세번째 goroutine이 실행되었습니다.")
	}
}

Sync (standard library)

sync는 go에서 동기화를 위한 스탠다드 라이브러리입니다. 여러 케이스에서 사용할 수 있겠지만, 멀티 쓰레드처럼 여러 갈래로 분기되어 처리되는 goroutine의 동기화를 위해선 꼭 사용되는 패키지입니다.

https://pkg.go.dev/sync

WaitGroup

그냥 goroutine만 사용한다면 main이 종료될 시 대기하지 않고 같이 종료되게 되는데요, 이를 방지 하기 위해 sync의 WaitGroup을 사용하여 지정한 개수의 goroutine이 완료될 때 까지 대기하게 할 수 있습니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
	"fmt"
	"strconv"
	"sync"
)

func main() {
	fmt.Println("vim-go")

	wordlists := make(chan string)

	// Scanning
	concurrency := 10
	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			for word := range wordlists {
				fmt.Println(word)
			}
			wg.Done()
		}()
	}

	for j := 0; j < 100; j++ {
		wordlists <- strconv.Itoa(j)
	}
	close(wordlists)
	wg.Wait()
}

https://gist.github.com/hahwul/e723ac6847c33be0c5a59e04b60fb35c

저 또한 도구를 만들 때 이런 형태로 많이 사용하곤 합니다.

Mutex

mutex도 자주 사용되는 것 중 하나인데요, OS 공부할 떄 배운 mutex와 동일하게 동시 접근을 막기 위해 사용되는 타입입니다.

https://pkg.go.dev/sync#Mutex

1
2
3
4
mutex := &sync.Mutex{}
mutex.Lock()
// run ...
mutex.Unlock()

golang.org/x/sync

스탠다드 라이브러리는 아니지만, 하나의 sync 라이브러리가 더 존재합니다. golang.org/x/sync는 스탠다드 sync와 동일하게 동기화를 위한 라이브러리로 여기에는 errgroup semaphore singleflight syncmap 이렇게 4개의 하위 패키지가 존재합니다.

https://pkg.go.dev/golang.org/x/sync

errgroup

errgroup은 goroutine 그룹에 대한 동기화를 지원하며 이를 통해서 에러를 전파하고 context를 취소할 수 있도록 제공해주는 패키지입니다. 스탠다드 sync와 동일하게 Wait() 함수를 통해 대기하는 기능을 지원하며 에러가 발생하는 경우 내부 goroutine에게 에러를 전파해서 취소될 수 있도록 처리해 줍니다.

https://pkg.go.dev/golang.org/x/sync/errgroup

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import (
	"fmt"
	"net/http"

	"golang.org/x/sync/errgroup"
)

func main() {
	g := new(errgroup.Group)
	var urls = []string{
		"http://www.golang.org/",
		"http://www.google.com/",
		"http://www.somestupidname.com/",
	}
	for _, url := range urls {
		url := url
		g.Go(func() error {
			resp, err := http.Get(url)
			if err == nil {
				resp.Body.Close()
			}
			return err
		})
	}
	if err := g.Wait(); err == nil {
		fmt.Println("Successfully fetched all URLs.")
	}
}

semaphore

semaphore는 weighted semaphore를 패키지단에서 제공합니다. semaphore를 사용하면 병렬 작업에서 goroutine의 갯수를 제한할 수 있습니다.

https://pkg.go.dev/golang.org/x/sync/semaphore

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package main

import (
	"context"
	"fmt"
	"log"
	"runtime"

	"golang.org/x/sync/semaphore"
)

func main() {
	ctx := context.TODO()

	var (
		maxWorkers = runtime.GOMAXPROCS(0)
		sem        = semaphore.NewWeighted(int64(maxWorkers))
		out        = make([]int, 32)
	)

	for i := range out {
		if err := sem.Acquire(ctx, 1); err != nil {
			log.Printf("Failed to acquire semaphore: %v", err)
			break
		}

		go func(i int) {
			defer sem.Release(1)
			out[i] = collatzSteps(i + 1)
		}(i)
	}

	if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
		log.Printf("Failed to acquire semaphore: %v", err)
	}

	fmt.Println(out)

}

func collatzSteps(n int) (steps int) {
	if n <= 0 {
		panic("nonpositive input")
	}

	for ; n > 1; steps++ {
		if steps < 0 {
			panic("too many steps")
		}

		if n%2 == 0 {
			n /= 2
			continue
		}

		const maxInt = int(^uint(0) >> 1)
		if n > (maxInt-1)/3 {
			panic("overflow")
		}
		n = 3*n + 1
	}

	return steps
}

singleflight

singleflight는 중복함수 호출을 억제하기 위한 메커니즘을 제공하는 패키지입니다. 이를 활용하면 여러 분기의 goroutine에서 중복으로 동일 함수를 호출하는 것을 제한할 수 있습니다.

정확하겐 제한한다기 보단 대기하게 되는데요, 어떤 특정 함수가 호출되어 실행중일 때 동일한 키(arguments)로 함수가 실행되면 함수를 새로 처리하는 것이 아닌 기존 함수의 처리를 기다립니다.

만약 아래와 같이 runApp 함수에 singleflight가 적용되었을 때 key에 따라서 처리 여부가 달라집니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
var g singleflight.Group
ctx := context.Background()

runApp := func(key string) {
	g.Do(key, func() (interface{}, error) {
		fmt.Println(key)
	}
}

go runApp("test")
go runApp("test")
go runApp("hahwul")

https://pkg.go.dev/golang.org/x/sync/singleflight

syncmap

syncmap은 concurrent map 즉, 동기화된 map을 만들 수 있습니다. 해당 기능은 스탠다드 sync에도 존재하며 아래와 같은 형태로 동기화된 map을 생성할 수 있습니다.

https://pkg.go.dev/golang.org/x/sync/syncmap

1
syncMap := sync.Map

Conclusion

어디선가 들은 이야기인데, 1인 개발도 팀으로 개발하는 것과 같이 진행 하라는 말이 있습니다. 이는 과거의 나와 미래의 내가 같을 수가 없기 때문인데요. 그래서 정보는 명확하게, 코드는 누가봐도 잘 알 수 있도록 작성 하라는 말 인 것 같습니다.

예전에 goroutine과 channel 등을 사용하면서 기능을 구현할 때 WaitGroup, 잘써야 Mutex 정도만을 사용했던 제 자신이 부끄러워 지네요. 이제라도 적절한 상황에 잘 쓸 수 있도록 숙지 해야겠습니다. :D

Licensed under CC BY-NC-SA 4.0