cOMS/thread/ThreadPool.h
Dennis Eichhorn 2059cc6e77
Some checks failed
CodeQL / Analyze (${{ matrix.language }}) (autobuild, c-cpp) (push) Has been cancelled
Microsoft C++ Code Analysis / Analyze (push) Has been cancelled
update
2025-06-14 19:10:16 +00:00

290 lines
9.4 KiB
C
Executable File

/**
* Jingga
*
* @copyright Jingga
* @license OMS License 2.0
* @version 1.0.0
* @link https://jingga.app
*/
#ifndef COMS_THREADS_THREAD_POOL_H
#define COMS_THREADS_THREAD_POOL_H
#include "../stdlib/Types.h"
#include "../memory/Queue.h"
#include "../memory/BufferMemory.h"
#include "../log/DebugMemory.h"
#include "Thread.h"
#include "Atomic.h"
#include "ThreadJob.h"
#include "../log/DebugContainer.h"
/**
 * Shared state of a thread pool: a job queue, the mutex/condition variables
 * that coordinate producers and workers, and a few atomic counters.
 *
 * All workers created by thread_pool_alloc()/thread_pool_create() receive a pointer to this struct.
 */
struct ThreadPool {
// This is not a threaded queue since we want to handle the mutex in here, not in the queue for finer control
Queue work_queue;
// Guards work_queue and is the mutex paired with both condition variables below
mutex work_mutex;
// Workers wait on this; it is broadcast when work is added and when the pool is destroyed
mutex_cond work_cond;
// Signaled by workers when working_cnt drops to 0 and when a worker exits; thread_pool_wait() waits on it
mutex_cond working_cond;
// By design the working_cnt is <= thread_cnt
// Number of workers currently executing a job
alignas(4) atomic_32 int32 working_cnt;
// Set to the requested thread count on creation; each worker decrements it when it shuts down
alignas(4) atomic_32 int32 thread_cnt;
// Used as the loop counter while spawning threads; afterwards equals the number of spawn iterations
int32 size;
// Size in bytes of one queue element (copied per job by thread_pool_add_work)
int32 element_size;
// 1 = waiting for run, 2 = running, 0 = completed, -1 = canceling
// As used in this file: alloc/create set 2, destroy sets 1 (shutdown requested) and finally 0 (down);
// workers leave their loop as soon as state < 2. The value -1 is not set anywhere in this file.
alignas(4) atomic_32 int32 state;
// Source of job ids handed out by thread_pool_add_work()/thread_pool_add_work_start()
alignas(4) atomic_32 int32 id_counter;
// Optional; when non-NULL every worker copies its logging/profiling pointers at startup
DebugContainer* debug_container;
};
// @performance Can we optimize this? This is a critical function.
//      If we have a small worker the "spinup"/"re-activation" time is of utmost importance
/**
 * Entry point of every pool thread.
 *
 * Sleeps on work_cond until the queue is non-empty or the pool leaves the running state (state < 2),
 * then dequeues one PoolWorker, runs its func and optional callback and repeats.
 * When the pool leaves the running state the worker decrements thread_cnt, signals working_cond and returns.
 *
 * working_cond is only ever signaled while work_mutex is held. thread_pool_wait() evaluates its
 * predicate under that mutex, so signaling without it could slip in between the predicate check
 * and the wait and be lost (the waiter would then sleep forever).
 *
 * @param arg Pointer to the owning ThreadPool
 *
 * @return Always NULL (cast to THREAD_RETURN)
 */
static
THREAD_RETURN thread_pool_worker(void* arg)
{
    ThreadPool* pool = (ThreadPool *) arg;

    if (pool->debug_container) {
        // Redirect the logging/profiling globals to the pool's debug container
        _log_fp = pool->debug_container->log_fp;
        _log_memory = pool->debug_container->log_memory;
        _dmc = pool->debug_container->dmc;
        _perf_stats = pool->debug_container->perf_stats;
        _perf_active = pool->debug_container->perf_active;
        _stats_counter = pool->debug_container->stats_counter;
        // No value copy needed: _perf_active now aliases the container's flag,
        // copying the value through it would only be an unsynchronized self-assignment
    }

    // @bug Why doesn't this work? There must be some threading issue
    LOG_2("[INFO] Thread pool worker starting up");
    LOG_INCREMENT(DEBUG_COUNTER_THREAD);

    PoolWorker* work;
    while (true) {
        mutex_lock(&pool->work_mutex);

        // Sleep while the pool is running but has nothing to do
        while (pool->state > 1 && queue_is_empty(&pool->work_queue)) {
            coms_pthread_cond_wait(&pool->work_cond, &pool->work_mutex);
        }

        // Anything below "running" means the pool is shutting down
        if (pool->state < 2) {
            mutex_unlock(&pool->work_mutex);
            break;
        }

        // We define a queue element as free based on its id
        // So even if we "keep" it in the queue the pool will not overwrite it as long as the id > 0 (see pool_add)
        // This is only a ThreadPool specific queue behavior to avoid additional memory copy
        work = (PoolWorker *) queue_dequeue_keep(&pool->work_queue);
        mutex_unlock(&pool->work_mutex);

        // When the worker functions of the thread pool get woken up it is possible that the work is already dequeued
        // by another thread -> we need to check if the work is actually valid
        if (work->state <= POOL_WORKER_STATE_COMPLETED || work->id <= 0) {
            atomic_set_release((volatile int32*) &work->state, POOL_WORKER_STATE_COMPLETED);
            continue;
        }

        atomic_increment_release(&pool->working_cnt);
        atomic_set_release((volatile int32*) &work->state, POOL_WORKER_STATE_RUNNING);

        LOG_3("ThreadPool worker started");
        work->func(work);
        LOG_3("ThreadPool worker ended");

        // @question Do I really need state and id both? seems like setting one should be sufficient
        //      Obviously we would also have to change thread_pool_add_work to check for state instead of id
        // NOTE(review): the slot is marked COMPLETED before the callback runs and the add_work functions
        //      treat COMPLETED slots as reusable, so a producer may overwrite the slot while the callback
        //      is still using it - confirm this ordering is intended
        atomic_set_release((volatile int32*) &work->state, POOL_WORKER_STATE_COMPLETED);
        if (work->callback) {
            work->callback(work);
        }

        atomic_decrement_release(&pool->working_cnt);

        // Signal that we ran out of work (maybe the main thread needs this info)
        // This is not required for the thread pool itself but maybe some other part of the main thread wants to know
        // The mutex is only taken on the idle transition to keep the hot path cheap
        if (atomic_get_relaxed(&pool->working_cnt) == 0) {
            mutex_lock(&pool->work_mutex);
            coms_pthread_cond_signal(&pool->working_cond);
            mutex_unlock(&pool->work_mutex);
        }
    }

    // We tell the thread pool that this worker thread is shutting down
    // Decrement and signal happen under the mutex so that thread_pool_wait() can neither miss the wakeup
    // nor observe thread_cnt == 0 (and let the pool be destroyed) before this signal has been sent
    mutex_lock(&pool->work_mutex);
    atomic_decrement_release(&pool->thread_cnt);
    coms_pthread_cond_signal(&pool->working_cond);
    mutex_unlock(&pool->work_mutex);

    LOG_2("[INFO] Thread pool worker shutting down");
    LOG_DECREMENT(DEBUG_COUNTER_THREAD);

    return (THREAD_RETURN) NULL;
}
/**
 * Initializes a thread pool whose job queue is allocated via queue_alloc() and starts its workers.
 *
 * pool->debug_container must already hold its final value (or NULL) because every worker reads it
 * as soon as it starts. pool->id_counter is left untouched.
 *
 * @param pool          Pool to initialize
 * @param element_size  Size in bytes of one job element in the queue
 * @param thread_count  Number of worker threads to start
 * @param worker_count  Number of job elements the queue can hold
 * @param alignment     Alignment forwarded to queue_alloc()
 */
void thread_pool_alloc(
    ThreadPool* pool,
    int32 element_size,
    int32 thread_count,
    int32 worker_count,
    int32 alignment = 64
) {
    PROFILE(PROFILE_THREAD_POOL_ALLOC);
    LOG_1(
        "[INFO] Allocating thread pool with %d threads and %d queue length",
        {
            {LOG_DATA_INT32, &thread_count},
            {LOG_DATA_INT32, &worker_count}
        }
    );

    queue_alloc(&pool->work_queue, worker_count, element_size, alignment);

    pool->element_size = element_size;
    pool->thread_cnt = thread_count;

    // No job can be running yet; thread_pool_wait() blocks until this counter is 0,
    // so it must not start out with whatever the pool memory happened to contain
    pool->working_cnt = 0;

    // @todo switch from pool mutex and pool cond to threadjob mutex/cond
    //      thread_pool_wait etc. should just iterate over all mutexes
    mutex_init(&pool->work_mutex, NULL);
    coms_pthread_cond_init(&pool->work_cond, NULL);
    coms_pthread_cond_init(&pool->working_cond, NULL);

    // 2 = running, has to be set before the first worker checks it
    pool->state = 2;

    // The handle is not kept, the workers are detached and report their exit through thread_cnt
    coms_pthread_t thread;
    for (pool->size = 0; pool->size < thread_count; ++pool->size) {
        coms_pthread_create(&thread, NULL, thread_pool_worker, pool);
        coms_pthread_detach(thread);
    }

    LOG_2("[INFO] %d threads running", {{LOG_DATA_INT64, (void *) &_stats_counter[DEBUG_COUNTER_THREAD]}});
}
/**
 * Initializes a thread pool whose job queue lives in the provided buffer memory and starts its workers.
 *
 * pool->debug_container must already hold its final value (or NULL) because every worker reads it
 * as soon as it starts. pool->id_counter is left untouched.
 *
 * @param pool          Pool to initialize
 * @param buf           Buffer memory handed to queue_init() for the job queue
 * @param element_size  Size in bytes of one job element in the queue
 * @param thread_count  Number of worker threads to start
 * @param worker_count  Number of job elements the queue can hold
 * @param alignment     Alignment forwarded to queue_init()
 */
void thread_pool_create(
    ThreadPool* pool,
    BufferMemory* buf,
    int32 element_size,
    int32 thread_count,
    int32 worker_count,
    int32 alignment = 64
) {
    PROFILE(PROFILE_THREAD_POOL_ALLOC);
    LOG_1(
        "Creating thread pool with %d threads and %d queue length",
        {
            {LOG_DATA_INT32, &thread_count},
            {LOG_DATA_INT32, &worker_count}
        }
    );

    queue_init(&pool->work_queue, buf, worker_count, element_size, alignment);

    pool->element_size = element_size;
    pool->thread_cnt = thread_count;

    // No job can be running yet; thread_pool_wait() blocks until this counter is 0,
    // so it must not start out with whatever the pool memory happened to contain
    pool->working_cnt = 0;

    // @todo switch from pool mutex and pool cond to threadjob mutex/cond
    //      thread_pool_wait etc. should just iterate over all mutexes
    mutex_init(&pool->work_mutex, NULL);
    coms_pthread_cond_init(&pool->work_cond, NULL);
    coms_pthread_cond_init(&pool->working_cond, NULL);

    // 2 = running, has to be set before the first worker checks it
    pool->state = 2;

    // The handle is not kept, the workers are detached and report their exit through thread_cnt
    coms_pthread_t thread;
    for (pool->size = 0; pool->size < thread_count; ++pool->size) {
        coms_pthread_create(&thread, NULL, thread_pool_worker, pool);
        coms_pthread_detach(thread);
    }

    LOG_2("[INFO] %d threads running", {{LOG_DATA_INT64, (void *) &_stats_counter[DEBUG_COUNTER_THREAD]}});
}
/**
 * Blocks the calling thread until no job is executing and every worker thread has exited.
 *
 * thread_cnt is only decremented by a worker that leaves its loop, which in this file
 * happens once the pool state drops below "running".
 *
 * @param pool Pool to wait for
 */
void thread_pool_wait(ThreadPool* pool)
{
    mutex_lock(&pool->work_mutex);

    // @question We removed some state checks here, not sure if they were really necessary
    //      remove this comment once we are sure everything works as expected
    for (;;) {
        // Done once nothing is running and no worker thread is left
        if (pool->working_cnt == 0 && pool->thread_cnt == 0) {
            break;
        }

        // Releases work_mutex while sleeping and re-acquires it before returning
        coms_pthread_cond_wait(&pool->working_cond, &pool->work_mutex);
    }

    mutex_unlock(&pool->work_mutex);
}
/**
 * Shuts the pool down: discards all queued jobs, asks the workers to exit, waits for them
 * and then destroys the mutex and the condition variables.
 *
 * The queue reset, the state change and the wakeup are performed while holding work_mutex.
 * Workers evaluate "state > 1 && queue is empty" under that mutex right before they go to sleep;
 * changing the state and broadcasting without the mutex could happen in exactly that gap,
 * the worker would then miss the broadcast, sleep forever and thread_pool_wait() would never return.
 *
 * NOTE(review): the queue storage itself is not released here - confirm the owner frees it.
 *
 * @param pool Pool to destroy
 */
void thread_pool_destroy(ThreadPool* pool)
{
    mutex_lock(&pool->work_mutex);

    // This sets the queue to empty
    atomic_set_release((void **) &pool->work_queue.tail, pool->work_queue.head);

    // This sets the state to "shutdown"
    atomic_set_release(&pool->state, 1);

    // Wake up every sleeping worker so it can observe the new state
    coms_pthread_cond_broadcast(&pool->work_cond);

    // thread_pool_wait() acquires the mutex itself
    mutex_unlock(&pool->work_mutex);

    thread_pool_wait(pool);

    mutex_destroy(&pool->work_mutex);
    coms_pthread_cond_destroy(&pool->work_cond);
    coms_pthread_cond_destroy(&pool->working_cond);

    // This sets the state to "down"
    atomic_set_release(&pool->state, 0);
}
/**
 * Copies a job into the next queue slot and wakes up the workers.
 *
 * If the copied job has id == 0 a fresh id is generated from the pool's id_counter.
 *
 * @param pool Pool to add the job to
 * @param job  Job to copy; pool->element_size bytes are read from it
 *
 * @return Pointer to the job inside the queue, or NULL if the next slot is still occupied
 *         by a job that has not completed (queue full)
 */
PoolWorker* thread_pool_add_work(ThreadPool* pool, const PoolWorker* job)
{
    mutex_lock(&pool->work_mutex);

    PoolWorker* temp_job = (PoolWorker *) ring_get_memory_nomove((RingMemory *) &pool->work_queue, pool->element_size, 8);

    // A state above COMPLETED means the slot still holds a waiting/running job -> we must not overwrite it
    if (atomic_get_relaxed((volatile int32*) &temp_job->state) > POOL_WORKER_STATE_COMPLETED) {
        mutex_unlock(&pool->work_mutex);

        // Always fails on this path, it flags the full queue in builds with assertions enabled
        ASSERT_SIMPLE(temp_job->state <= POOL_WORKER_STATE_COMPLETED);

        return NULL;
    }

    memcpy(temp_job, job, pool->element_size);

    if (temp_job->id == 0) {
        // +1 because otherwise the very first job would be id = 0 which is not a valid id
        // (the worker skips every job with id <= 0), same as in thread_pool_add_work_start
        temp_job->id = atomic_fetch_add_acquire(&pool->id_counter, 1) + 1;
    }

    temp_job->state = POOL_WORKER_STATE_WAITING;

    // The job is fully set up, now make it visible in the queue
    ring_move_pointer((RingMemory *) &pool->work_queue, &pool->work_queue.head, pool->element_size, 8);

    coms_pthread_cond_broadcast(&pool->work_cond);
    mutex_unlock(&pool->work_mutex);

    return temp_job;
}
// This is basically the same as thread_pool_add_work but allows us to directly write into the memory in the caller
// This makes it faster, since we can avoid a memcpy
/**
 * Reserves the next queue slot so the caller can fill in the job in place.
 *
 * On success the function returns with pool->work_mutex still locked;
 * thread_pool_add_work_end() is the function in this file that releases it.
 * On failure the mutex has already been released.
 *
 * @param pool Pool to add the job to
 *
 * @return The slot to fill (state set to WAITING, id assigned if it was 0),
 *         or NULL if the slot still holds a job that has not completed
 */
PoolWorker* thread_pool_add_work_start(ThreadPool* pool)
{
    mutex_lock(&pool->work_mutex);

    PoolWorker* temp_job = (PoolWorker *) queue_enqueue_start(&pool->work_queue);
    const bool is_reusable = atomic_get_relaxed((volatile int32*) &temp_job->state) <= POOL_WORKER_STATE_COMPLETED;

    if (is_reusable) {
        if (temp_job->id == 0) {
            // +1 because otherwise the very first job would be id = 0 which is not a valid id
            temp_job->id = atomic_fetch_add_acquire(&pool->id_counter, 1) + 1;
        }

        temp_job->state = POOL_WORKER_STATE_WAITING;

        // The mutex intentionally stays locked while the caller writes the job
        return temp_job;
    }

    // The slot is still in use -> give up the lock and report the full queue
    mutex_unlock(&pool->work_mutex);
    ASSERT_SIMPLE(temp_job->state <= POOL_WORKER_STATE_COMPLETED);

    return NULL;
}
/**
 * Finishes an enqueue started with thread_pool_add_work_start().
 *
 * Unlocks pool->work_mutex without locking it first, so it must only be called
 * after thread_pool_add_work_start() returned a non-NULL slot (that call leaves the mutex locked).
 *
 * @param pool Pool the job was added to
 */
void thread_pool_add_work_end(ThreadPool* pool)
{
// Completes the enqueue of the slot (presumably advances the queue head - see queue_enqueue_end)
queue_enqueue_end(&pool->work_queue);
// Wake up all workers waiting for work
coms_pthread_cond_broadcast(&pool->work_cond);
// Releases the lock taken in thread_pool_add_work_start()
mutex_unlock(&pool->work_mutex);
}
#endif