C++ Chapter 19.6 : 멀티 스레딩 예제 (벡터 내적)
카테고리: Cpp
태그: Cpp Programming
인프런에 있는 홍정모 교수님의 홍정모의 따라 하며 배우는 C++ 강의를 듣고 정리한 필기입니다. 😀  
🌜 [홍정모의 따라 하며 배우는 C++]강의 들으러 가기!
chapter 19. 모던 C++ 필수 요소들
멀티 스레딩 예제 (벡터 내적)
🔔 벡터 내적(Dot Product)이란
\[\vec{A}\cdot\vec{B} = x_A{\cdot}x_B + y_A{\cdot}y_B \]
- 두 벡터의 성분끼리 곱해서 더한 스칼라 값
- 수학적으로는 A dot B 으로 표현한다.
- 벡터 내적의 원리와 개념 참고
🔔 벡터 내적을 계산 하는 코드
7 가지 방법으로 벡터 내적을 계산해보고 각각의 방법이 얼마나 효율적인지 실행 시간을 재볼 것이다.
전체 코드와 비교
#include <iostream>
#include <chrono>
#include <mutex>	
#include <utility>
#include <vector>	
#include <atomic>
#include <numeric>		// std::inner_product
#include <random>
#include <execution>	// parallel execution
#include <future>
#include <thread>
using namespace std;
mutex mtx;
void dotProductNaive(const vector<int>& v0, const vector<int>& v1,
	const unsigned int start, const unsigned int end, unsigned long long& sum)
{
	for (unsigned int i = start; i < end; ++i)
	{
		sum += (v0[i] * v1[i]);
	}
}
void dotProductLock(const vector<int>& v0, const vector<int>& v1,
	const unsigned int start, const unsigned int end, unsigned long long& sum)
{
	//cout << "Thread Start" << endl;
	for (unsigned int i = start; i < end; ++i)
	{
		std::scoped_lock lock(mtx);
		sum += (v0[i] * v1[i]);
	}
	//cout << "Thread End" << endl;
}
void dotProductAtomic(const vector<int>& v0, const vector<int>& v1,
	const unsigned int start, const unsigned int end, atomic<unsigned long long> & sum)
{
	for (unsigned int i = start; i < end; ++i)
	{
		sum += (v0[i] * v1[i]);
	}
}
auto dotProductFuture(const vector<int>& v0, const vector<int>& v1,
	const unsigned int start, const unsigned int end)
{
	int sum = 0;
	for (unsigned int i = start; i < end; ++i)
		sum += (v0[i] * v1[i]);
	return sum;
}
int main()
{
	const long long numData = 100'000'000;
	const unsigned int numThread = 4;
	// thread 개수 늘린다고 degree of multithreading이 높아지지는 않음
	vector<int> vec0, vec1;
	vec0.reserve(numData);
	vec1.reserve(numData);
	std::random_device seed;
	std::mt19937_64 makerand(seed());
	std::uniform_int_distribution<> range(1, 10);
	for (long long i = 0; i < numData; ++i)
	{
		vec0.push_back(range(makerand));
		vec1.push_back(range(makerand));
	}
	cout << "실험 1번 - std::inner_product\n";
	{
		const auto sta = std::chrono::steady_clock::now();		// 시간 측정 시작
		const auto sum = std::inner_product(vec0.begin(), vec0.end(), vec1.begin(), 0ull);
		// 두 벡터의 개수가 같다고 가정하므로 vec1은 begin만 있어도 되며 ull은 unsigned long long의 약자
		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;		
		cout << "연산에 소요된 시간 = " << dur.count() << '\n';
		cout << "결과값 = " << sum << '\n';
		cout << '\n';
	}
	cout << "실험 2번 - Naive\n";
	{
		const auto sta = std::chrono::steady_clock::now();
		
		unsigned long long sum = 0;
		vector<std::thread> threads;
		threads.resize(numThread);
		const unsigned int numPerThread = numData / numThread;
		for (unsigned int t = 0; t < numThread; ++t)
			threads[t] = std::thread(dotProductNaive, std::ref(vec0), std::ref(vec1),
				t * numPerThread, (t + 1) * numPerThread, std::ref(sum));
		for (unsigned int t = 0; t < numThread; ++t)
			threads[t].join();
		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
		cout << "연산에 소요된 시간 = " << dur.count() << '\n';
		cout << "결과값 = " << sum << '\n';
		cout << '\n';
	}
	cout << "실험 3번 - Lockguard\n";
	{
		const auto sta = std::chrono::steady_clock::now();
		
		unsigned long long sum = 0;
		vector<std::thread> threads;
		threads.resize(numThread);
		const unsigned int numPerThread = numData / numThread;
		for (unsigned int t = 0; t < numThread; ++t)
			threads[t] = std::thread(dotProductLock, std::ref(vec0), std::ref(vec1),
				t * numPerThread, (t + 1) * numPerThread, std::ref(sum));
		for (unsigned int t = 0; t < numThread; ++t)
			threads[t].join();
		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
		cout << "연산에 소요된 시간 = " << dur.count() << '\n';
		cout << "결과값 = " << sum << '\n';
		cout << '\n';
	}
	cout << "실험 4번 - Atomic\n";
	{
		const auto sta = std::chrono::steady_clock::now();
		
		atomic<unsigned long long> sum = 0;
		vector<std::thread> threads;
		threads.resize(numThread);
		const unsigned int numPerThread = numData / numThread;
		for (unsigned int t = 0; t < numThread; ++t)
			threads[t] = std::thread(dotProductAtomic, std::ref(vec0), std::ref(vec1),
				t * numPerThread, (t + 1) * numPerThread, std::ref(sum));
		for (unsigned int t = 0; t < numThread; ++t)
			threads[t].join();
		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
		cout << "연산에 소요된 시간 = " << dur.count() << '\n';
		cout << "결과값 = " << sum << '\n';
		cout << '\n';
	}
	cout << "실험 5번 - Future\n";
	{
		const auto sta = std::chrono::steady_clock::now();
		unsigned long long sum = 0;
		vector<std::future<int>> futures;
		futures.resize(numThread);
		const unsigned int numPerThread = numData / numThread;
		for (unsigned int t = 0; t < numThread; ++t)
			futures[t] = std::async(dotProductFuture, std::ref(vec0), std::ref(vec1),
				t * numPerThread, (t + 1) * numPerThread);
		for (unsigned int t = 0; t < numThread; ++t)
			sum += futures[t].get();
		
		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
		cout << "연산에 소요된 시간 = " << dur.count() << '\n';
		cout << "결과값 = " << sum << '\n';
		cout << '\n';
	}
	// TODO : Future의 divide and conquer 방식은 thread에서도 구현해보자.
	// async 대신에 thread와 promise를 사용해서 future을 사용해보자.
	
  cout << "실험 6번 - promise\n";
	{
		const auto sta = std::chrono::steady_clock::now();
		
		vector<std::promise<int>> prom;
		vector<std::future<int>> future;
		vector<std::thread> threads;
		prom.resize(numThread);
		future.resize(numThread);
		threads.resize(numThread);
		
		unsigned long long sum(0);
		unsigned numPerThread = numData / numThread;
		for (unsigned i = 0; i < numThread; ++i)
		{
			future[i] = prom[i].get_future();
			unsigned long long tempSum(0);
			threads[i] = std::thread([&](std::promise<int>&& prom)
				{
					std::scoped_lock lock2(mtx);
					
					for (unsigned int j = 0; j < numPerThread; ++j)
						tempSum += (vec0[j] * vec1[j]);
					prom.set_value(tempSum);
				}, std::move(prom[i]));
			threads[i].join();
		}
		for (unsigned int t = 0; t < numThread; ++t)
			sum += future[t].get();
		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
		cout << "연산에 소요된 시간 = " << dur.count() << '\n';
		cout << "결과값 = " << sum << '\n';
		cout << '\n';
	}
	cout << "실험 7번 - std::transform_reduce\n";
	{
		const auto sta = std::chrono::steady_clock::now();
		const auto sum = std::transform_reduce(std::execution::par, vec0.begin(), vec0.end(), vec1.begin(), 0ull);
		const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
		cout << "연산에 소요된 시간 = " << dur.count() << '\n';
		cout << "결과값 = " << sum << '\n';
		cout << '\n';
	}
}
💎출력💎
실험 1번 - std::inner_product
연산에 소요된 시간 = 0.068767
결과값 = 3025113384
실험 2번 - Naive
연산에 소요된 시간 = 0.04685
결과값 = 889569211
실험 3번 - Lockguard
연산에 소요된 시간 = 2.15617
결과값 = 3025113384
실험 4번 - Atomic
연산에 소요된 시간 = 1.31186
결과값 = 3025113384
실험 5번 - Future
연산에 소요된 시간 = 0.0189238
결과값 = 3025113384
실험 6번 - promise
연산에 소요된 시간 = 0.0189238
결과값 = 3025113384
실험 7번 - std::transform_reduce
연산에 소요된 시간 = 0.014745
결과값 = 3025113384
- 1️⃣std::inner_product ⭕, 빠름
- 2️⃣ 순진한 멀티 쓰레딩 ❌, 빠름
    - 레이스 컨디션으로 값이 부정확하게 나온다.
 
- 3️⃣ Lock guard ⭕, 느림
    - lock,- unlock과정 때문에 느리다.
 
- 4️⃣ atomic ⭕, 느림
    - atomic의 연산은 느린 편이다.
 
- 5️⃣ Task-based parallelism (std::async) ⭕, 빠름
- 6️⃣ Task-based parallelism (std::thread, std::promise) ⭕, 빠름
- 7️⃣ std::transform_reduce ⭕, 빠름
main 함수
using namespace std;
mutex mtx;
int main()
{
	const long long numData = 100'000'000;
	const unsigned int numThread = 4;
	vector<int> vec0, vec1;
	vec0.reserve(numData);
	vec1.reserve(numData);
	std::random_device seed;
	std::mt19937_64 makerand(seed());
	std::uniform_int_distribution<> range(1, 10);
	for (long long i = 0; i < numData; ++i)
	{
		vec0.push_back(range(makerand));
		vec1.push_back(range(makerand));
	}
- mtx- mutex 객체로 전역 변수로 선언 되었다.
 
- numData- 벡터 크기
- 1억개 !
 
- numThread- 스레드 개수를 4 개로 지정했다.
- cf) 스레드를 많이 쓴다고 멀티스레딩 효율이 높아지는건 아니다
 
- vec0,- vec1- 두 벡터 컨테이너
- 각각 reserve 함수를 통해 numData(1억) 크기를 가진 벡터로 만들었다.
 
- seed- 랜덤넘버 생성 시드
 
- std::mt19937_64 makerand(seed());
    - 랜덤 넘버 생성기
 
- std::uniform_int_distribution<> range(1, 10);
    - 1~10 범위 중 랜덤 넘버
 
- for문
    - 벡터 크기안 1억번 만큼 for문을 돌며 두 벡터에 각각 1~10 중 랜덤한 숫자를 원소로 삽입한다.
 
방법 1️⃣ std::inner_product
#include <numeric>
- std::inner_product은 두 벡터의 내적값을 리턴해주는 함수다.
  const auto sum = std::inner_product(vec0.begin(), vec0.end(), vec1.begin(), 0ull);
- sum에 벡터 내적의 결과를 담음
- vec1은- end()없이- begin()만 있어도 된다.- 벡터 내적을 구할 땐 두 벡터의 성분 개수가 같다고 가정하므로.
 
- 0ull- 0을 unsigned long long 으로 표기
        - 마치 16진수면 0x라 하는것과 같음
 
- sum이 unsigned long long int 타입이 되게끔.- 그냥 0이라고 하면 sum은 unsigned long long int 가 아닌 그냥 int가 됨.
 
 
- 0을 unsigned long long 으로 표기
        
방법 2️⃣ 순진한 멀티 쓰레딩
void dotProductNaive(const vector<int>& v0, const vector<int>& v1,
	const unsigned int start, const unsigned int end, unsigned long long& sum)
{
	for (unsigned int i = start; i < end; ++i)
	{
		sum += (v0[i] * v1[i]);
	}
}
...
unsigned long long sum = 0;
vector<std::thread> threads;
threads.resize(numThread);
const unsigned int numPerThread = numData / numThread;
for (unsigned int t = 0; t < numThread; ++t)
	threads[t] = std::thread(dotProductNaive, std::ref(vec0), std::ref(vec1), t * numPerThread, (t + 1) * numPerThread, std::ref(sum));
for (unsigned int t = 0; t < numThread; ++t)
	threads[t].join();
- threads- 스레드들을 담는 vector
- 벡터의 사이즈를 numThread, 즉 4 개 만큼 늘렸다.
 
- numPerThread- 1 개의 스레드 당 작업할 데이터 개수
- 모든 스레드의 작업 수가 모두 같다고 가정. (나머지가 0인 경우를 가정)
        - numData값이 1 억이니 한 스레드 당 2500 만개씩의 데이터를 병렬 처리 하는 셈이다.
 
 
- 첫번째 for문 👉 4 번 돈다.
    - 스레드 마다 벡터 내적을 구하는 작업을 부여
        - 하나의 스레드마다 2500만개(numPerThread)의 데이터의 벡터 내적을 구하고 결과를sum에 담는다.- 첫번째 인수는 함수 doProductNaive. 나머지 인자들은 함수에 필요한 인자들.- 시작 범위 t * numPerThread, 끝 범위 (t + 1) * numPerThread
 
 
- 첫번째 인수는 함수 
 
- 하나의 스레드마다 2500만개(
 
- 스레드 마다 벡터 내적을 구하는 작업을 부여
        
- 두번째 for문
    - 벡터의 스레드들이 작업이 끝나기를 대기
 
이 방법은 레이스 컨디션이 발생하기 때문에 벡터 내적의 결과가 부정확하게 나온다.
- 모든 스레드가 sum메모리를 공유하기 때문에 발생한다.
방법 3️⃣ Lock guard
void dotProductLock(const vector<int>& v0, const vector<int>& v1,
	const unsigned int start, const unsigned int end, unsigned long long& sum)
{
	for (unsigned int i = start; i < end; ++i)
	{
		std::scoped_lock lock(mtx);
		sum += (v0[i] * v1[i]);
	}
}
...
		unsigned long long sum = 0;
		vector<std::thread> threads;
		threads.resize(numThread);
		const unsigned int numPerThread = numData / numThread;
		for (unsigned int t = 0; t < numThread; ++t)
			threads[t] = std::thread(dotProductLock, std::ref(vec0), std::ref(vec1), t * numPerThread, (t + 1) * numPerThread, std::ref(sum));
		for (unsigned int t = 0; t < numThread; ++t)
			threads[t].join();
sum값을 가지고 연산을 하고 그 결과를sum에 덮어 씌우는 과정을 한 스레드가 작업 중일땐 다른 스레드가 접근하지 못하도록 lock 하는 방법으로 레이스 컨디션을 해결할 수 있다.
std::scoped_lock lock(mtx);  // scoped_lock 은 unlock 이 필요 없다. C++ 17 부터 가능
sum += (v0[i] * v1[i]);
- 이제 벡터 내적의 값은 정확하게 나오지만 굉장히 느리다. 무려 2 초를 넘긴다 !
    - lock 을 1 억번 걸어주어야 하고 한 스레드가 작업을 하고 있을 땐 다른 스레드들이 기다려야 하기 때문에..
- 너무 느려서 멀티 스레딩 하는 보람이 없다.
        - 물론 경우에 따라 다르긴 하지만 이 경우엔 그렇다.
 
 
- scoped_lock을 for문 바깥에다 놓으면 빨라지지만 이건 비동기 작업이 아니라 그냥 동기적으로 순차적으로 실행되는거나 마찬가지다. 병렬 처리가 아님.
방법 4️⃣ atomic
void dotProductAtomic(const vector<int>& v0, const vector<int>& v1,
	const unsigned int start, const unsigned int end, atomic<unsigned long long> & sum)
{
	for (unsigned int i = start; i < end; ++i)
	{
		sum += (v0[i] * v1[i]);
	}
}
...
		const unsigned int numPerThread = numData / numThread;
		for (unsigned int t = 0; t < numThread; ++t)
			threads[t] = std::thread(dotProductLock, std::ref(vec0), std::ref(vec1), t * numPerThread, (t + 1) * numPerThread, std::ref(sum));
		for (unsigned int t = 0; t < numThread; ++t)
			threads[t].join();
- 벡터 내적 결과를 담을 sum을atomic<unsigned long long>타입으로 받고 있다.- 메모리 값을 CPU로 가져오고, CPU에서 연산을 하고, 다시 연산 결과를 메모리에 덮어 씌우는 이 3 단계의 과정을 하나로 묶어준다.
- 이 3단계의 과정 도중엔 다른 스레드가 접근하지 못하도록!
 
- 결과는 정확하나 Lock Guard 처럼 느리다.
    - atomic연산이 일반 연산보다 느리기 때문에.
 
방법 5️⃣ Task-based parallelism (std::async)
auto dotProductFuture(const vector<int>& v0, const vector<int>& v1,
	const unsigned int start, const unsigned int end)
{
	int sum = 0;
	for (unsigned int i = start; i < end; ++i)
		sum += (v0[i] * v1[i]);
	return sum;
}
...
		unsigned long long sum = 0;
		vector<std::future<int>> futures;
		futures.resize(numThread);
		const unsigned int numPerThread = numData / numThread;
		for (unsigned int t = 0; t < numThread; ++t)
			futures[t] = std::async(dotProductFuture, std::ref(vec0), std::ref(vec1), t * numPerThread, (t + 1) * numPerThread);
		for (unsigned int t = 0; t < numThread; ++t)
			sum += futures[t].get();
- futures- std::future객체들이 들어있는 벡터
 
- std::async을 통해- doProductFuture연산 결과를- futures[t]원소에 받는다.- doProductFuture함수- 위 예제들과 다르게 sum이doProductFuture의 인자로 들어가지 않는다.
- 위 예제의 다른 함수들과 다르게 void가 아닌 리턴 타입이 있는 함수로 설정했다다. 이를async에서 받아future객체에 리턴할 수 있도록! 로컬sum을 리턴한다.- 스레드는 연산 결과를 직접 받지 못하기 때문에 void 함수와 더불어 또 조작할 변수도 레퍼런스로 함께 넘겨 주었어야 했음
 
 
- 위 예제들과 다르게 
 
- futures[t]에 연산 결과를 받기 전까진- futures[t].get출력이 실행되지 않는다.- sum += futures[t].get();
        - thread는 레퍼런스로 sum을 넘겨서 함수 내부에서- +=했으니 최종적으로 또 취합할 일이 없었다. 대신- join으로 각 스레드가 일이 끝나길 기다리는 과정이 필요했음- thread는 다루는 sum이 global 한 변수였기 때문에 이를 레퍼런스로 넘겼다.
- 그러니 스레드들이 sum을 동시에 건드려 레이스 컨디션이 일어날 우려가 있었음
 
- async은 로컬 sum을 리턴하기 때문에 futures.get() 을 통해 리턴 받은 값을- sum +=취합해주어야 한다.- 로컬 sum이기에 함수 실행마다 각각 다른 별개의 sum이니까 서로 영향을 줄 일이 없다. 즉 레이스 컨디션이 일어날 일이 없음!
 
 
 
- sum += futures[t].get();
        
- 오히려 Naive 하게 멀티 스레딩 했던 2️⃣ 방법보다 연산이 더 빨랐다.
방법 6️⃣ Task-based parallelism (std::thread, std::promise)
		vector<std::promise<int>> prom;
		vector<std::future<int>> future;
		vector<std::thread> threads;
		prom.resize(numThread);
		future.resize(numThread);
		threads.resize(numThread);
		
		unsigned long long sum(0);
		unsigned numPerThread = numData / numThread;
		for (unsigned i = 0; i < numThread; ++i)
		{
			future[i] = prom[i].get_future();  // prom 에서 future 가져와 저장
			unsigned long long tempSum(0);
			threads[i] = std::thread([&](std::promise<int>&& prom)
			{
					std::scoped_lock lock2(mtx);
					
					for (unsigned int j = 0; j < numPerThread; ++j)
						tempSum += (vec0[j] * vec1[j]);
					prom.set_value(tempSum);
			}, std::move(prom[i]));
			threads[i].join();
		}
		for (unsigned int t = 0; t < numThread; ++t)
			sum += future[t].get();
5️⃣와 같다. 다만 async을 thread로 바꾸고 promise로 future에 연산 결과를 받도록 설정한 것 뿐!
- vector 3개 필요
    vector<std::promise<int>> prom; vector<std::future<int>> future; vector<std::thread> threads;
- 첫 번째 for 문
    - future 객체에 약속한 promise들을 넘기기로 약속한다.
        - future[i] = prom[i].get_future();
 
- 아래와 같은 기능을 하는 람다함수를 스레드에게 부여하고 매개 변수인 prom이prom[i]을 R-value로서 참조하기 때문에 이 연산 결과를future로 저장하게 한다.std::scoped_lock lock2(mtx); for (unsigned int j = 0; j < numPerThread; ++j) tempSum += (vec0[j] * vec1[j]); prom.set_value(tempSum);
 
- future 객체에 약속한 promise들을 넘기기로 약속한다.
        
- 두 번째 for 문
    - futures[t]에 연산 결과를 받기 전까진- futures[t].get출력이 실행되지 않는다.
 
방법 7️⃣ std::transform_reduce
#include <numeric>
const auto sum = std::transform_reduce(std::execution::par, vec0.begin(), vec0.end(), vec1.begin(), 0ull);
- std::transform_reduce- 병렬 처리를 지원해주는 C++ 표준 함수이다.
- inner_product의 발전형으로 병렬 처리로 벡터 내적을 계산한다.
 
- inner_product보다 인수가 하나 더 필요하다.- std::execution::par- 병렬처리로 돌려달라.
- 마치 스레드를 사용하는 것 처럼
 
- std::execution::seq- 병렬처리 말고 순차적으로, 동기적으로 돌려달라.
 
 
🔔 참고
🌜 개인 공부 기록용 블로그입니다. 오류나 틀린 부분이 있을 경우 
언제든지 댓글 혹은 메일로 지적해주시면 감사하겠습니다! 😄
 
      
    
댓글남기기