C++ Chapter 19.6 : 멀티 스레딩 예제 (벡터 내적)
카테고리: Cpp
태그: Cpp Programming
인프런에 있는 홍정모 교수님의 홍정모의 따라 하며 배우는 C++ 강의를 듣고 정리한 필기입니다. 😀
🌜 [홍정모의 따라 하며 배우는 C++]강의 들으러 가기!
chapter 19. 모던 C++ 필수 요소들
멀티 스레딩 예제 (벡터 내적)
🔔 벡터 내적(Dot Product)이란
\[\vec{A}\cdot\vec{B} = x_A{\cdot}x_B + y_A{\cdot}y_B \]
- 두 벡터의 성분끼리 곱해서 더한 스칼라 값
- 수학적으로는 A dot B 으로 표현한다.
벡터 내적
의 원리와 개념 참고
🔔 벡터 내적을 계산 하는 코드
7 가지 방법으로 벡터 내적을 계산해보고 각각의 방법이 얼마나 효율적인지 실행 시간을 재볼 것이다.
전체 코드와 비교
#include <iostream>
#include <chrono>
#include <mutex>
#include <utility>
#include <vector>
#include <atomic>
#include <numeric> // std::inner_product
#include <random>
#include <execution> // parallel execution
#include <future>
#include <thread>
using namespace std;
mutex mtx;
void dotProductNaive(const vector<int>& v0, const vector<int>& v1,
const unsigned int start, const unsigned int end, unsigned long long& sum)
{
for (unsigned int i = start; i < end; ++i)
{
sum += (v0[i] * v1[i]);
}
}
void dotProductLock(const vector<int>& v0, const vector<int>& v1,
const unsigned int start, const unsigned int end, unsigned long long& sum)
{
//cout << "Thread Start" << endl;
for (unsigned int i = start; i < end; ++i)
{
std::scoped_lock lock(mtx);
sum += (v0[i] * v1[i]);
}
//cout << "Thread End" << endl;
}
void dotProductAtomic(const vector<int>& v0, const vector<int>& v1,
const unsigned int start, const unsigned int end, atomic<unsigned long long> & sum)
{
for (unsigned int i = start; i < end; ++i)
{
sum += (v0[i] * v1[i]);
}
}
auto dotProductFuture(const vector<int>& v0, const vector<int>& v1,
const unsigned int start, const unsigned int end)
{
int sum = 0;
for (unsigned int i = start; i < end; ++i)
sum += (v0[i] * v1[i]);
return sum;
}
int main()
{
const long long numData = 100'000'000;
const unsigned int numThread = 4;
// thread 개수 늘린다고 degree of multithreading이 높아지지는 않음
vector<int> vec0, vec1;
vec0.reserve(numData);
vec1.reserve(numData);
std::random_device seed;
std::mt19937_64 makerand(seed());
std::uniform_int_distribution<> range(1, 10);
for (long long i = 0; i < numData; ++i)
{
vec0.push_back(range(makerand));
vec1.push_back(range(makerand));
}
cout << "실험 1번 - std::inner_product\n";
{
const auto sta = std::chrono::steady_clock::now(); // 시간 측정 시작
const auto sum = std::inner_product(vec0.begin(), vec0.end(), vec1.begin(), 0ull);
// 두 벡터의 개수가 같다고 가정하므로 vec1은 begin만 있어도 되며 ull은 unsigned long long의 약자
const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
cout << "연산에 소요된 시간 = " << dur.count() << '\n';
cout << "결과값 = " << sum << '\n';
cout << '\n';
}
cout << "실험 2번 - Naive\n";
{
const auto sta = std::chrono::steady_clock::now();
unsigned long long sum = 0;
vector<std::thread> threads;
threads.resize(numThread);
const unsigned int numPerThread = numData / numThread;
for (unsigned int t = 0; t < numThread; ++t)
threads[t] = std::thread(dotProductNaive, std::ref(vec0), std::ref(vec1),
t * numPerThread, (t + 1) * numPerThread, std::ref(sum));
for (unsigned int t = 0; t < numThread; ++t)
threads[t].join();
const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
cout << "연산에 소요된 시간 = " << dur.count() << '\n';
cout << "결과값 = " << sum << '\n';
cout << '\n';
}
cout << "실험 3번 - Lockguard\n";
{
const auto sta = std::chrono::steady_clock::now();
unsigned long long sum = 0;
vector<std::thread> threads;
threads.resize(numThread);
const unsigned int numPerThread = numData / numThread;
for (unsigned int t = 0; t < numThread; ++t)
threads[t] = std::thread(dotProductLock, std::ref(vec0), std::ref(vec1),
t * numPerThread, (t + 1) * numPerThread, std::ref(sum));
for (unsigned int t = 0; t < numThread; ++t)
threads[t].join();
const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
cout << "연산에 소요된 시간 = " << dur.count() << '\n';
cout << "결과값 = " << sum << '\n';
cout << '\n';
}
cout << "실험 4번 - Atomic\n";
{
const auto sta = std::chrono::steady_clock::now();
atomic<unsigned long long> sum = 0;
vector<std::thread> threads;
threads.resize(numThread);
const unsigned int numPerThread = numData / numThread;
for (unsigned int t = 0; t < numThread; ++t)
threads[t] = std::thread(dotProductAtomic, std::ref(vec0), std::ref(vec1),
t * numPerThread, (t + 1) * numPerThread, std::ref(sum));
for (unsigned int t = 0; t < numThread; ++t)
threads[t].join();
const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
cout << "연산에 소요된 시간 = " << dur.count() << '\n';
cout << "결과값 = " << sum << '\n';
cout << '\n';
}
cout << "실험 5번 - Future\n";
{
const auto sta = std::chrono::steady_clock::now();
unsigned long long sum = 0;
vector<std::future<int>> futures;
futures.resize(numThread);
const unsigned int numPerThread = numData / numThread;
for (unsigned int t = 0; t < numThread; ++t)
futures[t] = std::async(dotProductFuture, std::ref(vec0), std::ref(vec1),
t * numPerThread, (t + 1) * numPerThread);
for (unsigned int t = 0; t < numThread; ++t)
sum += futures[t].get();
const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
cout << "연산에 소요된 시간 = " << dur.count() << '\n';
cout << "결과값 = " << sum << '\n';
cout << '\n';
}
// TODO : Future의 divide and conquer 방식은 thread에서도 구현해보자.
// async 대신에 thread와 promise를 사용해서 future을 사용해보자.
cout << "실험 6번 - promise\n";
{
const auto sta = std::chrono::steady_clock::now();
vector<std::promise<int>> prom;
vector<std::future<int>> future;
vector<std::thread> threads;
prom.resize(numThread);
future.resize(numThread);
threads.resize(numThread);
unsigned long long sum(0);
unsigned numPerThread = numData / numThread;
for (unsigned i = 0; i < numThread; ++i)
{
future[i] = prom[i].get_future();
unsigned long long tempSum(0);
threads[i] = std::thread([&](std::promise<int>&& prom)
{
std::scoped_lock lock2(mtx);
for (unsigned int j = 0; j < numPerThread; ++j)
tempSum += (vec0[j] * vec1[j]);
prom.set_value(tempSum);
}, std::move(prom[i]));
threads[i].join();
}
for (unsigned int t = 0; t < numThread; ++t)
sum += future[t].get();
const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
cout << "연산에 소요된 시간 = " << dur.count() << '\n';
cout << "결과값 = " << sum << '\n';
cout << '\n';
}
cout << "실험 7번 - std::transform_reduce\n";
{
const auto sta = std::chrono::steady_clock::now();
const auto sum = std::transform_reduce(std::execution::par, vec0.begin(), vec0.end(), vec1.begin(), 0ull);
const chrono::duration<double> dur = chrono::steady_clock::now() - sta;
cout << "연산에 소요된 시간 = " << dur.count() << '\n';
cout << "결과값 = " << sum << '\n';
cout << '\n';
}
}
💎출력💎
실험 1번 - std::inner_product
연산에 소요된 시간 = 0.068767
결과값 = 3025113384
실험 2번 - Naive
연산에 소요된 시간 = 0.04685
결과값 = 889569211
실험 3번 - Lockguard
연산에 소요된 시간 = 2.15617
결과값 = 3025113384
실험 4번 - Atomic
연산에 소요된 시간 = 1.31186
결과값 = 3025113384
실험 5번 - Future
연산에 소요된 시간 = 0.0189238
결과값 = 3025113384
실험 6번 - promise
연산에 소요된 시간 = 0.0189238
결과값 = 3025113384
실험 7번 - std::transform_reduce
연산에 소요된 시간 = 0.014745
결과값 = 3025113384
- 1️⃣std::inner_product ⭕, 빠름
- 2️⃣ 순진한 멀티 쓰레딩 ❌, 빠름
- 레이스 컨디션으로 값이 부정확하게 나온다.
- 3️⃣ Lock guard ⭕, 느림
lock
,unlock
과정 때문에 느리다.
- 4️⃣ atomic ⭕, 느림
atomic
의 연산은 느린 편이다.
- 5️⃣ Task-based parallelism (std::async) ⭕, 빠름
- 6️⃣ Task-based parallelism (std::thread, std::promise) ⭕, 빠름
- 7️⃣ std::transform_reduce ⭕, 빠름
main 함수
using namespace std;
mutex mtx;
int main()
{
const long long numData = 100'000'000;
const unsigned int numThread = 4;
vector<int> vec0, vec1;
vec0.reserve(numData);
vec1.reserve(numData);
std::random_device seed;
std::mt19937_64 makerand(seed());
std::uniform_int_distribution<> range(1, 10);
for (long long i = 0; i < numData; ++i)
{
vec0.push_back(range(makerand));
vec1.push_back(range(makerand));
}
mtx
- mutex 객체로 전역 변수로 선언 되었다.
numData
- 벡터 크기
- 1억개 !
numThread
- 스레드 개수를 4 개로 지정했다.
- cf) 스레드를 많이 쓴다고 멀티스레딩 효율이 높아지는건 아니다
vec0
,vec1
- 두 벡터 컨테이너
- 각각 reserve 함수를 통해
numData
(1억) 크기를 가진 벡터로 만들었다.
seed
- 랜덤넘버 생성 시드
- std::mt19937_64 makerand(seed());
- 랜덤 넘버 생성기
- std::uniform_int_distribution<> range(1, 10);
- 1~10 범위 중 랜덤 넘버
- for문
- 벡터 크기안 1억번 만큼 for문을 돌며 두 벡터에 각각 1~10 중 랜덤한 숫자를 원소로 삽입한다.
방법 1️⃣ std::inner_product
#include <numeric>
std::inner_product
은 두 벡터의 내적값을 리턴해주는 함수다.
const auto sum = std::inner_product(vec0.begin(), vec0.end(), vec1.begin(), 0ull);
sum
에 벡터 내적의 결과를 담음vec1
은end()
없이begin()
만 있어도 된다.- 벡터 내적을 구할 땐 두 벡터의 성분 개수가 같다고 가정하므로.
0ull
- 0을 unsigned long long 으로 표기
- 마치 16진수면 0x라 하는것과 같음
sum
이 unsigned long long int 타입이 되게끔.- 그냥 0이라고 하면 sum은 unsigned long long int 가 아닌 그냥 int가 됨.
- 0을 unsigned long long 으로 표기
방법 2️⃣ 순진한 멀티 쓰레딩
void dotProductNaive(const vector<int>& v0, const vector<int>& v1,
const unsigned int start, const unsigned int end, unsigned long long& sum)
{
for (unsigned int i = start; i < end; ++i)
{
sum += (v0[i] * v1[i]);
}
}
...
unsigned long long sum = 0;
vector<std::thread> threads;
threads.resize(numThread);
const unsigned int numPerThread = numData / numThread;
for (unsigned int t = 0; t < numThread; ++t)
threads[t] = std::thread(dotProductNaive, std::ref(vec0), std::ref(vec1), t * numPerThread, (t + 1) * numPerThread, std::ref(sum));
for (unsigned int t = 0; t < numThread; ++t)
threads[t].join();
threads
- 스레드들을 담는 vector
- 벡터의 사이즈를
numThread
, 즉 4 개 만큼 늘렸다.
numPerThread
- 1 개의 스레드 당 작업할 데이터 개수
- 모든 스레드의 작업 수가 모두 같다고 가정. (나머지가 0인 경우를 가정)
numData
값이 1 억이니 한 스레드 당 2500 만개씩의 데이터를 병렬 처리 하는 셈이다.
- 첫번째 for문 👉 4 번 돈다.
- 스레드 마다 벡터 내적을 구하는 작업을 부여
- 하나의 스레드마다 2500만개(
numPerThread
)의 데이터의 벡터 내적을 구하고 결과를sum
에 담는다.- 첫번째 인수는 함수
doProductNaive
. 나머지 인자들은 함수에 필요한 인자들.- 시작 범위 t * numPerThread, 끝 범위 (t + 1) * numPerThread
- 첫번째 인수는 함수
- 하나의 스레드마다 2500만개(
- 스레드 마다 벡터 내적을 구하는 작업을 부여
- 두번째 for문
- 벡터의 스레드들이 작업이 끝나기를 대기
이 방법은 레이스 컨디션이 발생하기 때문에 벡터 내적의 결과가 부정확하게 나온다.
- 모든 스레드가
sum
메모리를 공유하기 때문에 발생한다.
방법 3️⃣ Lock guard
void dotProductLock(const vector<int>& v0, const vector<int>& v1,
const unsigned int start, const unsigned int end, unsigned long long& sum)
{
for (unsigned int i = start; i < end; ++i)
{
std::scoped_lock lock(mtx);
sum += (v0[i] * v1[i]);
}
}
...
unsigned long long sum = 0;
vector<std::thread> threads;
threads.resize(numThread);
const unsigned int numPerThread = numData / numThread;
for (unsigned int t = 0; t < numThread; ++t)
threads[t] = std::thread(dotProductLock, std::ref(vec0), std::ref(vec1), t * numPerThread, (t + 1) * numPerThread, std::ref(sum));
for (unsigned int t = 0; t < numThread; ++t)
threads[t].join();
sum
값을 가지고 연산을 하고 그 결과를sum
에 덮어 씌우는 과정을 한 스레드가 작업 중일땐 다른 스레드가 접근하지 못하도록 lock 하는 방법으로 레이스 컨디션을 해결할 수 있다.
std::scoped_lock lock(mtx); // scoped_lock 은 unlock 이 필요 없다. C++ 17 부터 가능
sum += (v0[i] * v1[i]);
- 이제 벡터 내적의 값은 정확하게 나오지만 굉장히 느리다. 무려 2 초를 넘긴다 !
- lock 을 1 억번 걸어주어야 하고 한 스레드가 작업을 하고 있을 땐 다른 스레드들이 기다려야 하기 때문에..
- 너무 느려서 멀티 스레딩 하는 보람이 없다.
- 물론 경우에 따라 다르긴 하지만 이 경우엔 그렇다.
scoped_lock
을 for문 바깥에다 놓으면 빨라지지만 이건 비동기 작업이 아니라 그냥 동기적으로 순차적으로 실행되는거나 마찬가지다. 병렬 처리가 아님.
방법 4️⃣ atomic
void dotProductAtomic(const vector<int>& v0, const vector<int>& v1,
const unsigned int start, const unsigned int end, atomic<unsigned long long> & sum)
{
for (unsigned int i = start; i < end; ++i)
{
sum += (v0[i] * v1[i]);
}
}
...
const unsigned int numPerThread = numData / numThread;
for (unsigned int t = 0; t < numThread; ++t)
threads[t] = std::thread(dotProductLock, std::ref(vec0), std::ref(vec1), t * numPerThread, (t + 1) * numPerThread, std::ref(sum));
for (unsigned int t = 0; t < numThread; ++t)
threads[t].join();
- 벡터 내적 결과를 담을
sum
을atomic<unsigned long long>
타입으로 받고 있다.- 메모리 값을 CPU로 가져오고, CPU에서 연산을 하고, 다시 연산 결과를 메모리에 덮어 씌우는 이 3 단계의 과정을 하나로 묶어준다.
- 이 3단계의 과정 도중엔 다른 스레드가 접근하지 못하도록!
- 결과는 정확하나 Lock Guard 처럼 느리다.
atomic
연산이 일반 연산보다 느리기 때문에.
방법 5️⃣ Task-based parallelism (std::async)
auto dotProductFuture(const vector<int>& v0, const vector<int>& v1,
const unsigned int start, const unsigned int end)
{
int sum = 0;
for (unsigned int i = start; i < end; ++i)
sum += (v0[i] * v1[i]);
return sum;
}
...
unsigned long long sum = 0;
vector<std::future<int>> futures;
futures.resize(numThread);
const unsigned int numPerThread = numData / numThread;
for (unsigned int t = 0; t < numThread; ++t)
futures[t] = std::async(dotProductFuture, std::ref(vec0), std::ref(vec1), t * numPerThread, (t + 1) * numPerThread);
for (unsigned int t = 0; t < numThread; ++t)
sum += futures[t].get();
futures
std::future
객체들이 들어있는 벡터
std::async
을 통해doProductFuture
연산 결과를futures[t]
원소에 받는다.doProductFuture
함수- 위 예제들과 다르게
sum
이doProductFuture
의 인자로 들어가지 않는다. - 위 예제의 다른 함수들과 다르게
void
가 아닌 리턴 타입이 있는 함수로 설정했다다. 이를async
에서 받아future
객체에 리턴할 수 있도록! 로컬sum
을 리턴한다.- 스레드는 연산 결과를 직접 받지 못하기 때문에 void 함수와 더불어 또 조작할 변수도 레퍼런스로 함께 넘겨 주었어야 했음
- 위 예제들과 다르게
futures[t]
에 연산 결과를 받기 전까진futures[t].get
출력이 실행되지 않는다.- sum += futures[t].get();
thread
는 레퍼런스로 sum을 넘겨서 함수 내부에서+=
했으니 최종적으로 또 취합할 일이 없었다. 대신join
으로 각 스레드가 일이 끝나길 기다리는 과정이 필요했음- thread는 다루는 sum이 global 한 변수였기 때문에 이를 레퍼런스로 넘겼다.
- 그러니 스레드들이 sum을 동시에 건드려 레이스 컨디션이 일어날 우려가 있었음
async
은 로컬 sum을 리턴하기 때문에 futures.get() 을 통해 리턴 받은 값을sum +=
취합해주어야 한다.- 로컬 sum이기에 함수 실행마다 각각 다른 별개의 sum이니까 서로 영향을 줄 일이 없다. 즉 레이스 컨디션이 일어날 일이 없음!
- sum += futures[t].get();
- 오히려 Naive 하게 멀티 스레딩 했던 2️⃣ 방법보다 연산이 더 빨랐다.
방법 6️⃣ Task-based parallelism (std::thread, std::promise)
vector<std::promise<int>> prom;
vector<std::future<int>> future;
vector<std::thread> threads;
prom.resize(numThread);
future.resize(numThread);
threads.resize(numThread);
unsigned long long sum(0);
unsigned numPerThread = numData / numThread;
for (unsigned i = 0; i < numThread; ++i)
{
future[i] = prom[i].get_future(); // prom 에서 future 가져와 저장
unsigned long long tempSum(0);
threads[i] = std::thread([&](std::promise<int>&& prom)
{
std::scoped_lock lock2(mtx);
for (unsigned int j = 0; j < numPerThread; ++j)
tempSum += (vec0[j] * vec1[j]);
prom.set_value(tempSum);
}, std::move(prom[i]));
threads[i].join();
}
for (unsigned int t = 0; t < numThread; ++t)
sum += future[t].get();
5️⃣와 같다. 다만 async
을 thread
로 바꾸고 promise
로 future
에 연산 결과를 받도록 설정한 것 뿐!
- vector 3개 필요
vector<std::promise<int>> prom; vector<std::future<int>> future; vector<std::thread> threads;
- 첫 번째 for 문
- future 객체에 약속한 promise들을 넘기기로 약속한다.
future[i] = prom[i].get_future();
- 아래와 같은 기능을 하는 람다함수를 스레드에게 부여하고 매개 변수인
prom
이prom[i]
을 R-value로서 참조하기 때문에 이 연산 결과를future
로 저장하게 한다.std::scoped_lock lock2(mtx); for (unsigned int j = 0; j < numPerThread; ++j) tempSum += (vec0[j] * vec1[j]); prom.set_value(tempSum);
- future 객체에 약속한 promise들을 넘기기로 약속한다.
- 두 번째 for 문
futures[t]
에 연산 결과를 받기 전까진futures[t].get
출력이 실행되지 않는다.
방법 7️⃣ std::transform_reduce
#include <numeric>
const auto sum = std::transform_reduce(std::execution::par, vec0.begin(), vec0.end(), vec1.begin(), 0ull);
std::transform_reduce
- 병렬 처리를 지원해주는 C++ 표준 함수이다.
inner_product
의 발전형으로 병렬 처리로 벡터 내적을 계산한다.
inner_product
보다 인수가 하나 더 필요하다.std::execution::par
- 병렬처리로 돌려달라.
- 마치 스레드를 사용하는 것 처럼
std::execution::seq
- 병렬처리 말고 순차적으로, 동기적으로 돌려달라.
🔔 참고
🌜 개인 공부 기록용 블로그입니다. 오류나 틀린 부분이 있을 경우
언제든지 댓글 혹은 메일로 지적해주시면 감사하겠습니다! 😄
댓글남기기