Files
GdCpp12/include/CWorkThread.h

349 lines
10 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#pragma once
#include "GdCPP_Exports.h"
// 不依赖 MFC 的工作线程实现
// 提供Start/ Quit/ Wait / PostMessage / SetTimer / KillTimer / PostToThread
#include <windows.h>
#include <process.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <unordered_map>
#include <chrono>
#include <functional>
#include <memory>
#include <atomic>
#ifndef WM_QUIT_THREAD // 防止重复定义
#define WM_QUIT_THREAD (WM_USER +0)
#endif
class CWorkThread
{
public:
using clock_t = std::chrono::steady_clock;
using ms_t = std::chrono::milliseconds;
struct Message {
UINT message;
WPARAM wParam;
LPARAM lParam;
};
CWorkThread(std::function<int()> func)
: _Run(func)
, m_running(false)
, m_hThread(nullptr)
, m_threadId(0)
{
}
CWorkThread(int(*func)())
: _Run(std::function<int()>(func))
, m_running(false)
, m_hThread(nullptr)
, m_threadId(0)
{
}
virtual ~CWorkThread()
{
// 尝试优雅停止(上层最好先调用 Quit() + Wait()
if (m_running) {
Quit();
Wait(200);
}
if (m_hThread) {
CloseHandle(m_hThread);
m_hThread = nullptr;
}
unregisterThread();
}
// 启动线程。返回 true 表示启动成功
bool Start(int prio = THREAD_PRIORITY_NORMAL)
{
if (m_running) return false;
m_running = true;
// 创建 native 线程threadProc 会在内部设置线程局部 current pointer
unsigned tid = 0;
uintptr_t h = _beginthreadex(nullptr, 0, &CWorkThread::threadProc, this, 0, &tid);
if (h == 0) {
m_running = false;
return false;
}
m_hThread = reinterpret_cast<HANDLE>(h);
m_osThreadId = tid;
if (prio != THREAD_PRIORITY_NORMAL) {
::SetThreadPriority(m_hThread, prio);
}
// 等待线程内部注册完成(确保 PostToThread 等可用)
std::unique_lock<std::mutex> lk(m_regMutex);
m_regCv.wait_for(lk, std::chrono::milliseconds(200), [this]() { return m_threadId != 0; });
return true;
}
// 向线程投递消息(线程安全)
void PostMessage(UINT msg, WPARAM w = 0, LPARAM l = 0)
{
{
std::lock_guard<std::mutex> lk(m_mutex);
m_queue.push(Message{ msg, w, l });
}
m_cv.notify_one();
}
// 等待并弹出一条消息;返回 false 表示线程正在退出
bool GetMessage(Message& outMsg)
{
std::unique_lock<std::mutex> lk(m_mutex);
// 主循环:只在需要时返回消息/定时器,避免递归锁定 m_mutex
while (true) {
// 如果线程正在退出且没有可处理的项,则返回 false
if (!m_running && m_queue.empty() && timersEmpty()) {
return false;
}
// 当既没有消息又没有定时器时,等待新的事件
if (m_queue.empty() && timersEmpty()) {
m_cv.wait(lk);
continue; // 重新判断条件
}
auto now = clock_t::now();
// 有定时器存在
if (!m_timers.empty()) {
auto it = std::min_element(m_timers.begin(), m_timers.end(),
[](auto const& a, auto const& b) { return a.second.next < b.second.next; });
if (it != m_timers.end() && it->second.next <= now) {
UINT_PTR timerId = it->first;
// 计划下一次触发
if (it->second.period.count() > 0) {
it->second.next = now + it->second.period;
}
else {
// 单次定时器:移除
m_timers.erase(it);
}
outMsg = Message{ WM_TIMER, static_cast<WPARAM>(timerId), 0 };
return true;
}
// 优先处理队列消息(如果有)
if (!m_queue.empty()) {
outMsg = m_queue.front(); m_queue.pop();
return true;
}
// 无消息且最近的定时器尚未到期,等待到期或有新消息到来
auto nextDueIt = std::min_element(m_timers.begin(), m_timers.end(),
[](auto const& a, auto const& b) { return a.second.next < b.second.next; });
if (nextDueIt != m_timers.end()) {
auto nextDue = nextDueIt->second.next;
m_cv.wait_until(lk, nextDue);
continue; // 重新循环,可能定时器到期或有消息
}
else {
// 防御性处理(理论上不应到这里)
m_cv.wait(lk);
continue;
}
}
else {
// 只有消息队列
if (!m_queue.empty()) {
outMsg = m_queue.front(); m_queue.pop();
return true;
}
else {
// 没有定时器但也没有消息,等待读取新消息
m_cv.wait(lk);
continue;
}
}
}
}
// 停止线程(发送 Quit 信号)
void Quit(int /*ret*/ = 0)
{
// 将特殊消息插入队列以通知退出
PostMessage(WM_QUIT_THREAD, 0, 0);
}
// 等待线程退出ms 毫秒ms == INFINITE 表示无限等待
bool Wait(int ms = INFINITE)
{
if (!m_hThread) return true;
DWORD r = ::WaitForSingleObject(m_hThread, (DWORD)ms);
if (r == WAIT_OBJECT_0) {
CloseHandle(m_hThread);
m_hThread = nullptr;
return true;
}
return false;
}
// 定时器 APItimerId 可以自定义(与原 SetTimer 行为类似)
// intervalMs !=0
UINT_PTR SetTimer(UINT_PTR timerId, unsigned intervalMs)
{
if(intervalMs == 0) {//
return 0;
}
std::lock_guard<std::mutex> lk(m_mutex);
// 如果timerId为0自动生成一个唯一的timerId
if(timerId == 0) {
static UINT_PTR nextTimerId = 1;
timerId = nextTimerId++;
}
m_timers[timerId] = TimerInfo{ ms_t(intervalMs), clock_t::now() + ms_t(intervalMs) };
m_cv.notify_one();
return timerId;
}
void KillTimer(UINT_PTR timerId)
{
std::lock_guard<std::mutex> lk(m_mutex);
m_timers.erase(timerId);
m_cv.notify_one();
}
// 将消息投递给指定的工作线程 id线程需要已注册
static bool PostToThread(DWORD threadId, UINT msg, WPARAM w = 0, LPARAM l = 0)
{
std::lock_guard<std::mutex> lk(s_registryMutex());
auto & reg = s_registry();
auto it = reg.find(threadId);
if (it == reg.end()) return false;
it->second->PostMessage(msg, w, l);
return true;
}
// 返回当前 CWorkThread 实例(仅在该线程内部有效)
static CWorkThread* GetCurrent()
{
return t_current;
}
// 访问线程注册 id用于外部注册/路由)
DWORD GetId() const { return m_threadId; }
DWORD GetOsThreadId() const { return m_osThreadId; }
protected:
struct TimerInfo {
ms_t period;
clock_t::time_point next;
};
bool timersEmpty() const { return m_timers.empty(); }
// 线程入口
static unsigned __stdcall threadProc(void* pv)
{
CWorkThread* pThis = static_cast<CWorkThread*>(pv);
if (!pThis) return 0;
// 标记当前线程实例thread-local便于 Run 中调用 GetCurrent()
t_current = pThis;
// 生成并注册线程 id简单自增
pThis->m_threadId = ++s_nextThreadId();
pThis->registerThread();
// 通知 Start() 可以继续
{
std::lock_guard<std::mutex> lk(pThis->m_regMutex);
pThis->m_regCv.notify_all();
}
int ret = 0;
try {
if (pThis->_Run) ret = pThis->Run();
} catch (...) {
// swallow
}
// 清理并注销
pThis->m_running = false;
pThis->unregisterThread();
t_current = nullptr;
return static_cast<unsigned>(ret);
}
// 允许子类覆写 Run默认调用 _Run
virtual int Run()
{
if (_Run) return _Run();
return 0;
}
// 注册/注销到静态 registry以便 PostToThread 路由
void registerThread()
{
std::lock_guard<std::mutex> lk(s_registryMutex());
s_registry()[m_threadId] = this;
}
void unregisterThread()
{
std::lock_guard<std::mutex> lk(s_registryMutex());
auto & reg = s_registry();
if (m_threadId && reg.find(m_threadId) != reg.end()) {
reg.erase(m_threadId);
}
}
private:
std::function<int()> _Run;
std::atomic<bool> m_running;
// 消息队列
std::queue<Message> m_queue;
std::mutex m_mutex;
std::condition_variable m_cv;
// 定时器
std::unordered_map<UINT_PTR, TimerInfo> m_timers;
// native handle / ids
HANDLE m_hThread;
DWORD m_osThreadId = 0;
DWORD m_threadId; // synthetic id for registry
// 注册等待
std::mutex m_regMutex;
std::condition_variable m_regCv;
// 静态 registry
static std::unordered_map<DWORD, CWorkThread*>& s_registry()
{
static std::unordered_map<DWORD, CWorkThread*> reg;
return reg;
}
static std::mutex& s_registryMutex()
{
static std::mutex m;
return m;
}
static DWORD& s_nextThreadId()
{
static DWORD id = 0;
return id;
}
void registerThreadNoLock()
{
s_registry()[m_threadId] = this;
}
// thread local current pointer
static inline thread_local CWorkThread* t_current = nullptr;
};