[Effective Modern C++] Chapter 7. 동시성 API [항목 35~40]

Chapter 7. 동시성 API

항목 35. 스레드 기반 프로그래밍보다 과제 기반 프로그래밍을 선호하라.

확실히 과제 기반 프로그래밍은 스레드 기반 프로그래밍보다 더 쉽고 가독성이나 설계 측면에서 더 우월하다. 하지만, 과하게 마구잡이로 사용하는 것을 주의해야 한다고 느꼈다. 스레드를 쓰면 간단하게 되는 것을 과제 기반으로 하려고 억지로 짜 맞추는 것은 좋지 않다.
그리고 과제 기반 프로그래밍은 스레드 기반 보다 더 높은 추상화로서, 내부적으로 스레드 고갈, over subscription, load balancing등의 문제를 해결해줄 수 도 있다. 실제로 현재에는 std::async가 visual studio에서는 PPL을 바탕으로 위와 같은 것을 어느 정도 해결해주고 있다. 그러나, libc++ (llvm의 std 구현)과 libstdc++ (gcc의 std구현)의 경우 비효율적인 방법으로 구현되어 있다. (관련 내용 : http://rpgmakerxp.tistory.com/63)
정말 비동기 작업을 효율적으로 진행해야 하고, 더 많은 부분들(thread priority, affinity, thread pool등)을 직접 컨트롤 해야하는 경우에는 std::async가 적합하지 않다. 이런 경우는 std::thread 혹은 Windows API / pthreads 등을 사용해야 한다.
그리고, 아직 C++11/14의 과제 기반 프로그래밍은 빈약한 편이므로, 좀 더 본격적인 과제 기반 프로그래밍이 필요한 경우에는 TBB, PPL, HPX등을 사용하는 것이 좋아 보인다. (그러나, C++의 향후 표준들에서 점차 강력해질 것이다.)

[기억해 둘 사항들]
 - std::thread API에서는 비동기적으로 실행된 함수의 반환값을 직접 얻을 수 없으며, 만일 그런 함수가 예외를 던지면 프로그램이 종료된다.
 - 스레드 기반 프로그래밍에서는 스레드 고갈, 과다구독, 부하 균형화, 새 플랫폼으로의 적응을 독자가 직접 처리해야 한다.
 - std::async와 기본 시동 방침을 이용한 과제 기반 프로그래밍은 그런 대부분의 문제를 알아서 처리해준다.

항목 36. 비동기성이 필수일 때에는 std::launch::async를 지정하라.

그렇다.

[기억해 둘 사항들]
 - std::async의 기본 시동 방침은 과제의 비동기적 실행과 동기적 실행을 모두 허용한다.
 - 그러나 이러한 유연성 때문에 thread_local 접근의 불확실성이 발생하고, 과제가 절대로 실행되지 않을 수도 있고, 시간 만료 기반 wait 호출에 대한 프로그램 논리에도 영향이 미친다.
 - 과제를 반드시 비동기적으로 실행해야 한다면 std::launch::async를 지정하라.

항목 37. std::thread들을 모든 경로에서 합류 불가능하게 만들어라.

이 것은 되게 중요한 문제이다. 핵심은 단순하다. std::thread들을 모든 경로에서 합류 불가능 (unjoinable)하게 만들어야 한다. 만약 joinable한 상태에서 std::thread 객체의 소멸자가 호출되면 프로그램은 그냥 종료 되어버린다. 표준 위원회가 이런 선택을 한 것은 이유가 있다. 만약 그렇게 하지 않으려면, thread가 암묵적으로 join이 되거나 detach가 되는 방법을 채택해야 하는데, 이 것은 더 심각한 문제들을 가져올 수 있기 때문이다. 따라서 std::thread 객체는 항상 unjoinable한 상태에서 소멸되도록 표준을 제정한 것이 옳다.
#include <thread>
#include <condition_variable>
#include <iostream>
#include <atomic>
#include <vector>
#include <chrono>
#include <numeric>

class BackgroundWorkerPool
{
public:
    BackgroundWorkerPool()
    {
        std::cout << "[Main Thread] BackgroundWorkerPool() \n";
        m_worker[0] = std::thread(&BackgroundWorkerPool::BackgroundWorkerMain, this, 1);
        m_worker[1] = std::thread(&BackgroundWorkerPool::BackgroundWorkerMain, this, 2);
    }

    ~BackgroundWorkerPool()
    {
        std::cout << "[Main Thread] ~BackgroundWorkerPool() \n";
        m_workerTerminated = true;
        m_cvData.notify_all();
        m_worker[0].join();
        m_worker[1].join();
        std::cout << "[Main Thread] After joining worker threads. \n";
    }

    void FeedData(const std::vector<int>& data)
    {
        {
            std::lock_guard<decltype(m_mutex)> lk(m_mutex);
            std::cout << "[Main Thread] Feed data. \n";
            m_data.insert(std::end(m_data), std::begin(data), std::end(data));
            m_dataExist = true;
        }
        m_cvData.notify_one();
    }

private:
    void BackgroundWorkerMain(int id)
    {
        while (true)
        {
            std::unique_lock<decltype(m_mutex)> lk(m_mutex);
            m_cvData.wait_for(lk, std::chrono::milliseconds(3000), [this]() {
                return m_dataExist || m_workerTerminated;
            });

            std::cout << "[Worker Thread " << id << "] Wake up! \n";

            if (m_workerTerminated)
                break;

            std::cout << "[Worker Thread " << id << "] WORK! \n";
            auto result = std::accumulate(std::begin(m_data), std::end(m_data), 0);
            std::cout << "[Worker Thread " << id << "] Result = " << result << " \n";
            m_data.clear();
            m_dataExist = false;
        }

        std::cout << "[Worker Thread " << id << "] I'm terminated. \n";
    }

    std::mutex m_mutex;
    std::atomic<bool> m_workerTerminated{ false };
    std::vector<int> m_data;
    bool m_dataExist{ false };
    std::condition_variable m_cvData;
    std::thread m_worker[2];
};

int main()
{
    BackgroundWorkerPool workerPool;
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    workerPool.FeedData({ 1,2,3,4,5,6,7,8 });
    workerPool.FeedData({ 1 });
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    workerPool.FeedData({ 1,2,3,4,5 });
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    workerPool.FeedData({ 1,2 });
    workerPool.FeedData({ 1,2,3,4,5,6,7,8,9 });
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    workerPool.FeedData({ 1,2,3,4,5,6,7,8,9,10,11,12 });
    std::this_thread::sleep_for(std::chrono::milliseconds(500));

    /* Execution Result
        [Main Thread] BackgroundWorkerPool()
        [Main Thread] Feed data.
        [Main Thread] Feed data.
        [Worker Thread 2] Wake up!
        [Worker Thread 2] WORK!
        [Worker Thread 2] Result = 37
        [Main Thread] Feed data.
        [Worker Thread 2] Wake up!
        [Worker Thread 2] WORK!
        [Worker Thread 2] Result = 15
        [Main Thread] Feed data.
        [Main Thread] Feed data.
        [Worker Thread 1] Wake up!
        [Worker Thread 1] WORK!
        [Worker Thread 1] Result = 48
        [Main Thread] Feed data.
        [Worker Thread 1] Wake up!
        [Worker Thread 1] WORK!
        [Worker Thread 1] Result = 78
        [Main Thread] ~BackgroundWorkerPool()
        [Worker Thread 2] Wake up!
        [Worker Thread 2] I'm terminated.
        [Worker Thread 1] Wake up!
        [Worker Thread 1] I'm terminated.
        [Main Thread] After joining worker threads.
    */
}

    Thread를 합류 불가능하게 만들려면, thread를 detach하거나 join해야 한다. 근데, detach를 하는 것은 깔끔하지 못한 일종의 꼼수책이다. 따라서 join을 해서 끝내는 것이 깔끔하고 더 일반적인 방법인데 이 방법을 사용하려면, thread를 외부에서 종료 시킬 수 있어야 한다. Windows API의 TerminatedThread 같은 것을 이용해서 강제적으로 종료 시키면 각종 많은 문제들이 발생하기 때문에 graceful stop을 구현해야만 한다. 나는 보통 위 코드와 같이 구현해서 사용한다. 이 때 blocking I/O같이 infinite하게 blocking되는 함수 호출이나, 오랜 시간이 걸리는 연산을 수행하는 코드를 주의해야 한다. 즉, 수시로 termination 여부를 polling 할 수 있어야 하고, 이러한 polling의 간격이 곧 thread termination 요청을 하고 나서 실제 join이 될 때 까지의 응답 시간이 된다. 위 코드의 BackgroundWorkerPool의 소멸자에서 m_cvData.notify_all(); 를 한 것과 같이 thread 내의 blocking operation을 직접 깨울 수 있는 방법이 있을 때는 그러한 응답 시간을 더 줄일 수 있다.
    아무튼 나는 위 코드와 같은 방법을 사용하는 데, 더 좋은 방법이 있는 지는 잘 모르겠다. 혹시 이 글을 보는 분들 중 더 좋은 방법을 아는 분이 계시면 알려주시면 감사하겠다. (C++ Concurrency in action 책에 interruptible thread 라는 개념이 등장한다던데 결국 근본적으로는 내가 쓰는 방법과 같은 것으로 보인다.)

[기억해 둘 사항들]
 - 모든 경로에서 std::thread를 합류 불가능으로 만들어라.
 - 소멸 시 join 방식은 디버깅하기 어려운 성능 이상으로 이어질 수 있다.
 - 소멸 시 detach 방식은 디버깅하기 어려운 미정의 행동으로 이어질 수 있다.
 - 자료 멤버 목록에서 std::thread 객체를 마지막에 선언하라.

항목 38. 스레드 핸들 소멸자들의 다양한 행동 방식을 주의하라.

std::thread와 비지연 과제에 대한 미래 객체는 시스템 스레드에 대응된다는 점에서 모두 '시스템 스레드 핸들' 이라고 말할 수 있다. 이러한 스레드 핸들의 소말자에 행동 방식에 대해 다루는 것이 이 항목이다. 일단 std::thread의 소멸자에 대한 부분은 항목 37에서 다루고 있다. 그러므로 이 항목에서는 미래 객체 소멸자의 행동 방식들에 대해 다룬다.
행동 방식은 제각각인 것처럼 보일 수 있지만, 사실 핵심은 간단하다.
"시스템 스레드과 대응되는 유일한 미래객체일 경우에만, 시스템 스레드에 대해 암묵적 join을 수행한다. 그 외의 경우에는 그냥 바로 객체가 소멸된다."
즉, 어떤 미래 객체가 소멸자에서 암묵적 join을 수행할 지, 아니면 그냥 바로 객체가 소멸되고 끝날 지 판단하는 것은 이 미래 객체가 시스템 스레드와 대응되는 유일한 미래 객체인지만 생각해보면 된다.
몇 가지 케이스를 통해 살펴보자.
1. 지연된 과제에 대한 미래 객체
    - 대응되는 시스템 스레드가 없으므로 바로 객체가 소멸된다.
2. std::async (with std::launch::async) 호출에 의해 생성된 공유 상태를 참조하는 std::future
    - 비동기적으로 실행된 과제의 공유 상태에 유일하게 대응되는 미래 객체이므로 암묵적 join이 수행된 뒤 객체가 소멸된다.
3. std::async (with std::launch::async) 호출에 의해 생성된 공유 상태를 참조하는 std::shared_future
    - 여러 개의 std::shared_future가 공유 상태를 참조하고 있을 것이다.
    - 따라서 공유 상태를 참조하고 있는 마지막 std::shared_future 만 암묵적 join이 수행된다.
4. std::packaged_task가 std::thread에 의해 실행되고 있을 때, std::packaged_task으로 부터 얻어진 std::future
    - 시스템 스레드가 std::thread에 대응되므로, std::future는 시스템 스레드에 대한 책임이 없다. 따라서 바로 객체가 소멸된다.

즉, 결과적으로만 보면 "std::async를 통해서 시동된 비지연(지연되지 않은) 과제에 대한 공유 상태를 참조하는 마지막 미래 객체의 소멸자"의 경우에만 과제가 완료될 때까지 차단 (즉, 암묵적 join) 되는 것이다.
하지만, 이 것의 근본적인 이유는 위에서 말한 '핵심' 때문이다.
그리고 더 나아가면 사실 std::thread 소멸자의 행동 방식과도 일맥상통하는 원리를 얻을 수 있다.
결국에는 시스템 스레드 (바탕 스레드)에 대응되는 마지막 핸들이 시스템 스레드를 책임져야하는 것인데, 시스템 스레드가 종료되기 전에 핸들(객체)가 소멸될 경우, 바로 객체가 소멸되지 않고 특별한 행동이 일어나는 것이다.
그리고 그 특별한 행동은 std::thread의 경우에는 프로그램 종료, 미래 객체의 경우에는 암묵적 join인 것이다.
다만, 표준위원회가 왜 std::thread와 미래 객체에 대해 특별한 행동으로서 서로 다른 것을 선택했는지는 잘 모르겠다.
아무튼, 나는 그래서 std::thread는 항상 unjoinable한 상태에서만 소멸할 수 있도록 코드를 짜고, 미래 객체가 소멸하면서 암시적 join이 일어날 수 있는 곳에는 주석으로서 그 사실을 명시한다.

[기억해 둘 사항들]
 - 미래 객체의 소멸자는 그냥 미래 객체의 자료 멤버들을 파괴할 뿐이다.
 - std::async를 통해 시동된 비지연 과제에 대한 공유 상태를 참조하는 마지막 미래 객체의 소멸자는 그 과제가 완료될 때까지 차단된다.

항목 39. 단발성 사건 통신에는 void 미래 객체를 고려하라.

일반적으로 thread 사이에서 어떤 사건(이벤트)를 통지하거나 흐름을 통제하고 싶을 경우, condition variable과 flag 변수가 조합되어 사용된다. 그러나 단발성 사건 통신의 경우에는 void 미래 객체를 활용하면 훨씬 간단하고 깔끔하게 설계가 가능하다.

[기억해 둘 사항들]
 - 간단한 사건 통신을 수행할 때, 조건 변수 기반 설계에는 여분의 뮤텍스가 필요하고, 검출 과제와 반응 과제의 진행 순서에 제약이 있으며, 사건이 실제로 발생했는지를 반응 과제가 다시 확인해야 한다.
 - 플래그 기반 설계를 사용하면 그런 단점들이 없지만, 대신 차단이 아니라 폴링이 일어난다는 단점이 있다.
 - 조건 변수와 플래그를 조합할 수 도 있으나, 그런 조합을 이용한 통신 메커니즘은 필요 이상으로 복잡하다.
 - std::promise와 미래 객체를 사용하면 이러한 문제점들을 피할 수 있지만, 그런 접근방식은 공유 상태에 힙 메모리를 사용하며, 단발성 통신만 가능하다.

항목 40. 동시성에는 std::atomic을 사용하고, volatile은 특별한 메모리에 사용하라.

동시성에 있어서 그냥 일반적인 변수를 사용할 때 데이터 레이스등의 문제가 발생하는 이유는 다음과 같다.
1. 변수가 레지스터에 할당(캐시) 될 수 있다.
2. Compiler instruction reordering에 의해 memory access의 순서가 바뀔 수 있다.
    - A; B; 와 같은 코드가 compiler에 의해 B; A; 와 같이 순서가 바뀔 수 있다.
    - http://preshing.com/20120625/memory-ordering-at-compile-time/
3. Memory visibility가 보장되지 않을 수 있다.
    - 프로세서의 out of order execution등에 의해 memory visibility가 보장되지 않을 수 있다.
    - 프로세서마다 memory consistency model이 다르다.
    - http://preshing.com/20120930/weak-vs-strong-memory-models/
    - http://www.kandroid.org/board/board.php?board=AndroidBeginner&command=body&no=102
    - http://egloos.zum.com/studyfoss/v/5141402
    - http://stackoverflow.com/questions/7346893/out-of-order-execution-and-memory-fences (x86)

    이 3가지 이유중에 '표준' volatile'은 오직 1번 밖에 해결하지 못한다. 그러나 몇몇 컴파일러들을 volatile keyword에 대해 추가적으로 다른 문제점들을 해결해주기도 한다. 하지만, 표준적으로는 volatile을 동시성에 사용하는 것은 매우 위험한 행위이고, 위의 3가지 문제를 모두 해결하려면 C++11이후의 memory fense, std::atomic 등의 기능을 사용해야만 한다.
    그렇다고 std::atomic이 volatile의 기능을 포함하고 있다는 식으로 착각하면 안된다. 예를 들면, x = 1; x = 2; 와 같은 코드의 경우, x가 volatile로 선언되있을 경우에는 최적화가 발생하지 않지만, x가 std::atomic인 경우 x = 2; 와 같은 식으로 최적화가 진행 될 수 있다. 따라서, volatile과 std::atomic은 포함 관계가 아닌 아예 서로 다른 역할과 기능을 가지고 있는 것으로 인식 하는 것이 옳다.
    또 특별히 우리가 많이 사용하는 x86 의 경우에는 강력한 memory consistency model을 가지고 있다. 따라서 read-after-write의 경우를 제외하면 sequentially consistency를 가지고 있다고 할 수 있다. (자세한 건 위 링크들 참조) 따라서, 그냥 동시성을 위해 volatile을 써도 평소에 큰 문제가 생기지 않았을 가능성이 크고, 이에 따라 많은 사람들이 동시성에 volatile을 사용하는 실수와 착각을 하고 있다고 생각한다. (나도 과거에 그런 착각을 하던 시절이 있었다.)

[기억해 둘 사항들]
 - std::atomic은 뮤텍스 보호 없이 여러 스레드가 접근하는 자료를 위한 것으로, 동시적 소프트웨어의 작성을 위한 도구이다.
 - volatile은 읽기와 기록을 최적화로 제거하지 말아야 하는 메모리를 위한 것으로, 특별한 메모리를 다룰 때 필요한 도구이다.



Concurrency 너무 좋아!

댓글