threaded audio working, impl. queue for threads and atomics

This commit is contained in:
Dennis Eichhorn 2024-11-23 17:36:13 +01:00
parent 2420efab8a
commit 4c7026c698
9 changed files with 269 additions and 49 deletions

118
memory/Queue.h Normal file
View File

@ -0,0 +1,118 @@
/**
* Jingga
*
* @copyright Jingga
* @license OMS License 2.0
* @version 1.0.0
* @link https://jingga.app
*/
#ifndef TOS_MEMORY_QUEUE_H
#define TOS_MEMORY_QUEUE_H
#include "RingMemory.h"
typedef RingMemory Queue;
/**
 * Allocates the ring storage of a queue and creates its synchronization primitives.
 *
 * @param ring      Queue to set up
 * @param size      Size forwarded to ring_alloc
 * @param alignment Alignment forwarded to ring_alloc
 */
inline
void queue_alloc(Queue* ring, uint64 size, int32 alignment = 64)
{
    ring_alloc(ring, size, alignment);

    // Both primitives are created with default attributes
    pthread_cond_init(&ring->cond, NULL);
    pthread_mutex_init(&ring->mutex, NULL);
}
/**
 * Initializes a queue on top of a BufferMemory and creates its synchronization primitives.
 *
 * @param ring      Queue to set up
 * @param buf       Buffer forwarded to ring_init
 * @param size      Size forwarded to ring_init
 * @param alignment Alignment forwarded to ring_init
 */
inline
void queue_init(Queue* ring, BufferMemory* buf, uint64 size, int32 alignment = 64)
{
    ring_init(ring, buf, size, alignment);

    // Both primitives are created with default attributes
    pthread_cond_init(&ring->cond, NULL);
    pthread_mutex_init(&ring->mutex, NULL);
}
/**
 * Releases a queue: destroys its synchronization primitives and frees the ring.
 *
 * The mutex and condition variable created by queue_alloc/queue_init were
 * previously never destroyed, which leaked them on every queue teardown.
 *
 * No thread may be blocked on or holding the queue's mutex/condition when
 * this is called.
 *
 * @param buf Queue to release
 */
inline
void queue_free(Queue* buf)
{
    // Tear the primitives down first, while the ring structure is still untouched
    pthread_cond_destroy(&buf->cond);
    pthread_mutex_destroy(&buf->mutex);

    ring_free(buf);
}
/**
 * Initializes a queue on top of a raw byte buffer and creates its synchronization primitives.
 *
 * Like the other queue initializers this now also initializes the mutex and
 * condition variable; without that, every enqueue/dequeue call would lock an
 * uninitialized mutex.
 *
 * @param ring      Queue to set up
 * @param buf       Raw memory forwarded to ring_init
 * @param size      Size forwarded to ring_init
 * @param alignment Alignment forwarded to ring_init
 */
inline
void queue_init(Queue* ring, byte* buf, uint64 size, int32 alignment = 64)
{
    ring_init(ring, buf, size, alignment);

    pthread_mutex_init(&ring->mutex, NULL);
    pthread_cond_init(&ring->cond, NULL);
}
/**
 * Copies an element into the queue, waiting while ring_commit_safe reports
 * that the element cannot be committed.
 *
 * @param ring Queue to write to
 * @param data Source bytes
 * @param size Number of bytes to copy from data
 */
inline
void ring_enqueue(Queue* ring, byte* data, uint64 size)
{
    pthread_mutex_lock(&ring->mutex);

    // Sleep on the condition until the ring accepts an element of this size
    for (;;) {
        if (ring_commit_safe(ring, size)) {
            break;
        }

        pthread_cond_wait(&ring->cond, &ring->mutex);
    }

    byte* slot = ring_get_memory(ring, size);
    memcpy(slot, data, size);

    // Wake one waiter, then release the queue
    pthread_cond_signal(&ring->cond);
    pthread_mutex_unlock(&ring->mutex);
}
/**
 * Begins an in-place enqueue: locks the queue, waits until ring_commit_safe
 * accepts the requested size and returns the memory obtained from ring_get_memory.
 *
 * The mutex is still locked when this function returns; ring_enqueue_end
 * is the function that unlocks it.
 *
 * @param ring    Queue to write to
 * @param size    Number of bytes requested
 * @param aligned Alignment forwarded to ring_commit_safe/ring_get_memory
 *
 * @return Memory returned by ring_get_memory
 */
inline
byte* ring_enqueue_start(Queue* ring, uint64 size, byte aligned = 0)
{
    pthread_mutex_lock(&ring->mutex);

    for (;;) {
        if (ring_commit_safe(ring, size, aligned)) {
            break;
        }

        pthread_cond_wait(&ring->cond, &ring->mutex);
    }

    byte* slot = ring_get_memory(ring, size, aligned);

    return slot;
}
/**
 * Finishes an in-place enqueue: signals the queue's condition variable and
 * unlocks the queue's mutex.
 *
 * @param ring Queue whose mutex is currently held by the caller
 */
inline
void ring_enqueue_end(Queue* ring)
{
    pthread_cond_t* wake = &ring->cond;
    pthread_mutex_t* lock = &ring->mutex;

    pthread_cond_signal(wake);
    pthread_mutex_unlock(lock);
}
/**
 * Copies the element at the tail of the queue into data and advances the tail,
 * waiting while the queue is empty (head == tail).
 *
 * The function is declared to return byte* but previously had no return
 * statement (undefined behavior); it now returns the destination buffer.
 *
 * @param ring    Queue to read from
 * @param data    Destination buffer, must hold at least size bytes
 * @param size    Number of bytes to copy
 * @param aligned Alignment forwarded to ring_move_pointer
 *
 * @return data
 */
inline
byte* ring_dequeue(Queue* ring, byte* data, uint64 size, byte aligned = 0)
{
    pthread_mutex_lock(&ring->mutex);

    while (ring->head == ring->tail) {
        pthread_cond_wait(&ring->cond, &ring->mutex);
    }

    // NOTE(review): the copy reads from the tail as-is, before ring_move_pointer
    // applies alignment/wrap-around - confirm this matches where the producer wrote
    memcpy(data, ring->tail, size);
    ring_move_pointer(ring, &ring->tail, size, aligned);

    pthread_cond_signal(&ring->cond);
    pthread_mutex_unlock(&ring->mutex);

    return data;
}
/**
 * Begins an in-place dequeue: locks the queue, waits while it is empty
 * (head == tail) and returns the current tail pointer.
 *
 * The mutex is still locked when this function returns; ring_dequeue_end
 * is the function that unlocks it.
 *
 * @param ring Queue to read from
 *
 * @return Current tail pointer of the ring
 */
inline
byte* ring_dequeue_start(Queue* ring)
{
    pthread_mutex_lock(&ring->mutex);

    for (;;) {
        if (ring->head != ring->tail) {
            break;
        }

        pthread_cond_wait(&ring->cond, &ring->mutex);
    }

    return ring->tail;
}
/**
 * Finishes an in-place dequeue: advances the tail past the consumed element,
 * signals the queue's condition variable and unlocks the queue's mutex.
 *
 * @param ring    Queue whose mutex is currently held by the caller
 * @param size    Number of bytes consumed
 * @param aligned Alignment forwarded to ring_move_pointer
 */
inline
void ring_dequeue_end(Queue* ring, uint64 size, byte aligned = 0)
{
    byte** tail = &ring->tail;
    ring_move_pointer(ring, tail, size, aligned);

    pthread_cond_signal(&ring->cond);
    pthread_mutex_unlock(&ring->mutex);
}
#endif

View File

@ -22,8 +22,10 @@
#if _WIN32
#include "../platform/win32/Allocation.h"
#include "../platform/win32/Thread.h"
#elif __linux__
#include "../platform/linux/Allocation.h"
#include "../platform/linux/Thread.h"
#endif
struct RingMemory {
@ -41,8 +43,13 @@ struct RingMemory {
uint64 size;
int32 alignment;
int32 element_alignment;
pthread_mutex_t mutex;
pthread_cond_t cond;
};
// @bug alignment should also include the end point, not just the start
inline
void ring_alloc(RingMemory* ring, uint64 size, int32 alignment = 64)
{
@ -147,6 +154,33 @@ void ring_reset(RingMemory* ring)
ring->head = ring->memory;
}
// Advances *pos past an element of the given size (new position = after consuming size).
// aligned == 0 selects the ring's element_alignment (at least 1).
// The position is aligned first; if the rounded size would then run past ring->end,
// the position restarts at ring->memory (aligned again) before being advanced.
void ring_move_pointer(RingMemory* ring, byte** pos, uint64 size, byte aligned = 0)
{
    ASSERT_SIMPLE(size <= ring->size);

    if (!aligned) {
        aligned = (byte) OMS_MAX(ring->element_alignment, 1);
    }

    const bool needs_alignment = aligned > 1;

    if (needs_alignment) {
        const uintptr_t misalignment = ((uintptr_t) *pos) & (aligned - 1);
        *pos += (aligned - misalignment) % aligned;
    }

    size = ROUND_TO_NEAREST(size, aligned);

    // Not enough room up to the end of the ring -> wrap around to the start
    if (*pos + size > ring->end) {
        *pos = ring->memory;

        if (needs_alignment) {
            const uintptr_t misalignment = ((uintptr_t) *pos) & (aligned - 1);
            *pos += (aligned - misalignment) % aligned;
        }
    }

    *pos += size;
}
byte* ring_get_memory(RingMemory* ring, uint64 size, byte aligned = 0, bool zeroed = false)
{
ASSERT_SIMPLE(size <= ring->size);
@ -203,6 +237,8 @@ inline
bool ring_commit_safe(const RingMemory* ring, uint64 size, byte aligned = 0)
{
// aligned * 2 since that should be the maximum overhead for an element
// @bug could this result in a case where the ring is considered empty/full (false positive/negative)?
// The "correct" version would probably be to use ring_move_pointer in some form
uint64 max_mem_required = size + aligned * 2;
if (ring->tail < ring->head) {
@ -211,7 +247,6 @@ bool ring_commit_safe(const RingMemory* ring, uint64 size, byte aligned = 0)
} else if (ring->tail > ring->head) {
return ((uint64) (ring->tail - ring->head)) > max_mem_required;
} else {
// @question Is this really the case? What if it is completely filled?
return true;
}
}
@ -231,7 +266,7 @@ void ring_force_tail_update(const RingMemory* ring)
inline
int64 ring_dump(const RingMemory* ring, byte* data)
{
byte* tail = data;
byte* start = data;
// Size
*((uint64 *) data) = SWAP_ENDIAN_LITTLE(ring->size);
@ -259,7 +294,7 @@ int64 ring_dump(const RingMemory* ring, byte* data)
memcpy(data, ring->memory, ring->size);
data += ring->size;
return data - tail;
return data - start;
}
#endif

View File

@ -36,6 +36,9 @@ struct SIMDInfo {
f32 sse;
int32 avx256;
int32 avx512;
int32 sve;
int32 neon;
bool abm;
};
struct CpuInfo {
@ -43,6 +46,7 @@ struct CpuInfo {
char brand[49];
int32 model;
int32 family;
int32 thread_count;
int32 mhz;
CpuCacheInfo cache[4];
int32 page_size;

View File

@ -11,8 +11,43 @@
#include <pthread.h>
#include <unistd.h>
#include "../../stdlib/Types.h"
typedef void* (*ThreadJobFunc)(void*);
#define THREAD_RETURN void*
/**
 * Atomically stores new_value into *value with sequentially consistent ordering.
 *
 * @param value     Target variable
 * @param new_value Value to store
 */
inline
void atomic_set(volatile int32* value, int32 new_value)
{
    __atomic_store(value, &new_value, __ATOMIC_SEQ_CST);
}
/**
 * Atomically loads *value with sequentially consistent ordering.
 *
 * @param value Variable to read
 *
 * @return The loaded value
 */
inline
int32 atomic_get(volatile int32* value)
{
    const int32 current = __atomic_load_n((int32 *) value, __ATOMIC_SEQ_CST);

    return current;
}
/**
 * Atomically adds 1 to *value with sequentially consistent ordering.
 *
 * @param value Variable to increment
 */
inline
void atomic_increment(volatile int32* value)
{
    // Result is intentionally unused
    (void) __atomic_add_fetch(value, 1, __ATOMIC_SEQ_CST);
}
/**
 * Atomically subtracts 1 from *value with sequentially consistent ordering.
 *
 * @param value Variable to decrement
 */
inline
void atomic_decrement(volatile int32* value)
{
    // Result is intentionally unused
    (void) __atomic_sub_fetch(value, 1, __ATOMIC_SEQ_CST);
}
/**
 * Atomically adds increment to *value with sequentially consistent ordering.
 *
 * @param value     Variable to modify
 * @param increment Amount to add
 *
 * @return Result of __atomic_fetch_add, i.e. the value held before the addition
 */
inline
int32 atomic_add(volatile int32* value, int32 increment)
{
    const int32 previous = __atomic_fetch_add(value, increment, __ATOMIC_SEQ_CST);

    return previous;
}
/**
 * Atomically subtracts decrement from *value with sequentially consistent ordering.
 *
 * @param value     Variable to modify
 * @param decrement Amount to subtract
 *
 * @return Result of __atomic_fetch_sub, i.e. the value held before the subtraction
 */
inline
int32 atomic_subtract(volatile int32* value, int32 decrement)
{
    const int32 previous = __atomic_fetch_sub(value, decrement, __ATOMIC_SEQ_CST);

    return previous;
}
#endif

View File

@ -27,16 +27,14 @@
#include <comdef.h>
#include <winnls.h>
#ifdef _MSC_VER
// @performance Do we really need all these libs, can't we simplify that?!
#include <intrin.h>
#pragma comment(lib, "Advapi32.lib")
#pragma comment(lib, "wbemuuid.lib")
#pragma comment(lib, "iphlpapi.lib")
#pragma comment(lib, "d3d12.lib")
#pragma comment(lib, "dxgi.lib")
#pragma comment(lib, "Ws2_32.lib")
#endif
// @performance Do we really need all these libs, can't we simplify that?!
#include <intrin.h>
#pragma comment(lib, "Advapi32.lib")
#pragma comment(lib, "wbemuuid.lib")
#pragma comment(lib, "iphlpapi.lib")
#pragma comment(lib, "d3d12.lib")
#pragma comment(lib, "dxgi.lib")
#pragma comment(lib, "Ws2_32.lib")
// @todo implement for arm?
@ -318,6 +316,9 @@ void cpu_info_get(CpuInfo* info) {
info->simd.sse = (temp = max_sse_supported()) > 9 ? temp / 10.0f : temp;
info->simd.avx256 = max_avx256_supported();
info->simd.avx512 = max_avx512_supported();
info->simd.sve = max_sve_supported();
info->simd.neon = max_neon_supported();
info->simd.abm = supports_abm();
cache_info_get(1, &info->cache[0]);
cache_info_get(2, &info->cache[1]);
@ -326,15 +327,16 @@ void cpu_info_get(CpuInfo* info) {
SYSTEM_INFO sys_info;
GetSystemInfo(&sys_info);
info->thread_count = sys_info.dwNumberOfProcessors;
info->page_size = sys_info.dwPageSize;
int32 cpuInfo[4] = { 0 };
__cpuid(cpuInfo, 0);
memset(info->vendor, 0, sizeof(info->vendor));
*((int*)info->vendor) = cpuInfo[1];
*((int*)(info->vendor + 4)) = cpuInfo[3];
*((int*)(info->vendor + 8)) = cpuInfo[2];
*((int32 *) info->vendor) = cpuInfo[1];
*((int32 *) (info->vendor + 4)) = cpuInfo[3];
*((int32 *) (info->vendor + 8)) = cpuInfo[2];
info->vendor[12] = '\0';
__cpuid(cpuInfo, 0x80000002);
@ -513,7 +515,7 @@ void system_info_render(char* buf, const SystemInfo* info) {
"\n"
"CPU:\n"
"==============\n"
"Hardware\n" "Vendor: %s\n" "Brand: %s\n" "Model: %d\n" "Family: %d\n" "Mhz: %d\n" "Page Size: %d\n"
"Hardware\n" "Vendor: %s\n" "Brand: %s\n" "Model: %d\n" "Family: %d\n" "Mhz: %d\n" "Thread Count: %d\n" "Page Size: %d\n"
"\n"
"Cache:\n"
"L1: Size %d Line %d\n"
@ -521,7 +523,7 @@ void system_info_render(char* buf, const SystemInfo* info) {
"L3: Size %d Line %d\n"
"L4: Size %d Line %d\n"
"\n"
"SIMD:\n" "SSE: %.1f\n" "AVX256: %d\n" "AVX512: %s\n"
"SIMD:\n" "SSE: %.1f\n" "AVX256: %d\n" "AVX512: %s\n" "SVE: %d\n" "NEON: %d\n" "ABM: %d\n"
"\n"
"GPU:\n"
"==============\n"
@ -546,12 +548,12 @@ void system_info_render(char* buf, const SystemInfo* info) {
info->network_count < 2 ? "" : info->network[1].slot, info->network_count < 2 ? 0 : info->network[1].mac[0], info->network_count < 2 ? 0 : info->network[1].mac[1], info->network_count < 2 ? 0 : info->network[1].mac[2], info->network_count < 2 ? 0 : info->network[1].mac[3], info->network_count < 2 ? 0 : info->network[1].mac[4], info->network_count < 2 ? 0 : info->network[1].mac[5], info->network_count < 2 ? 0 : info->network[1].mac[6], info->network_count < 2 ? 0 : info->network[1].mac[7],
info->network_count < 3 ? "" : info->network[2].slot, info->network_count < 3 ? 0 : info->network[2].mac[0], info->network_count < 3 ? 0 : info->network[2].mac[1], info->network_count < 3 ? 0 : info->network[2].mac[2], info->network_count < 3 ? 0 : info->network[2].mac[3], info->network_count < 3 ? 0 : info->network[2].mac[4], info->network_count < 3 ? 0 : info->network[2].mac[5], info->network_count < 3 ? 0 : info->network[2].mac[6], info->network_count < 3 ? 0 : info->network[2].mac[7],
info->network_count < 4 ? "" : info->network[3].slot, info->network_count < 4 ? 0 : info->network[3].mac[0], info->network_count < 4 ? 0 : info->network[3].mac[1], info->network_count < 4 ? 0 : info->network[3].mac[2], info->network_count < 4 ? 0 : info->network[3].mac[3], info->network_count < 4 ? 0 : info->network[3].mac[4], info->network_count < 4 ? 0 : info->network[3].mac[5], info->network_count < 4 ? 0 : info->network[3].mac[6], info->network_count < 4 ? 0 : info->network[3].mac[7],
info->cpu.vendor, info->cpu.brand, info->cpu.model, info->cpu.family, info->cpu.mhz, info->cpu.page_size,
info->cpu.vendor, info->cpu.brand, info->cpu.model, info->cpu.family, info->cpu.mhz, info->cpu.thread_count, info->cpu.page_size,
info->cpu.cache[0].size, info->cpu.cache[0].line_size,
info->cpu.cache[1].size, info->cpu.cache[1].line_size,
info->cpu.cache[2].size, info->cpu.cache[2].line_size,
info->cpu.cache[3].size, info->cpu.cache[3].line_size,
info->cpu.simd.sse, info->cpu.simd.avx256, info->cpu.simd.avx512 > 0 ? avx512[info->cpu.simd.avx512 - 1] : "0",
info->cpu.simd.sse, info->cpu.simd.avx256, info->cpu.simd.avx512 > 0 ? avx512[info->cpu.simd.avx512 - 1] : "0", info->cpu.simd.sve, info->cpu.simd.neon, (int32) info->cpu.simd.abm,
info->gpu[0].name, info->gpu[0].vram,
info->gpu_count < 2 ? "" : info->gpu[1].name, info->gpu_count < 2 ? 0 : info->gpu[1].vram,
info->display[0].name, info->display[0].width, info->display[0].height, info->display[0].hz,

View File

@ -9,8 +9,11 @@
#ifndef TOS_PLATFORM_WIN32_THREAD_DEFINES_H
#define TOS_PLATFORM_WIN32_THREAD_DEFINES_H
#include <stdio.h>
#include <windows.h>
#include "../../stdlib/Types.h"
typedef DWORD (WINAPI *ThreadJobFunc)(void*);
typedef CRITICAL_SECTION pthread_mutex_t;
typedef void pthread_mutexattr_t;
@ -26,4 +29,36 @@ struct pthread_rwlock_t {
#define THREAD_RETURN DWORD WINAPI
/**
 * Atomically stores new_value into *value via InterlockedExchange.
 *
 * @param value     Target variable
 * @param new_value Value to store
 */
inline
void atomic_set(volatile int32* value, int32 new_value)
{
    // The previous value returned by the exchange is not needed
    (void) InterlockedExchange((long *) value, (long) new_value);
}
/**
 * Atomically reads *value.
 *
 * Implemented as a compare-exchange of 0 with 0: the stored value is either
 * left as it is or replaced by the identical value 0, and the call yields
 * the current content.
 *
 * @param value Variable to read
 *
 * @return The value read
 */
inline
int32 atomic_get(volatile int32* value)
{
    const long current = InterlockedCompareExchange((long *) value, 0, 0);

    return (int32) current;
}
/**
 * Atomically increments *value via InterlockedIncrement.
 *
 * @param value Variable to increment
 */
inline
void atomic_increment(volatile int32* value)
{
    // Result is intentionally unused
    (void) InterlockedIncrement((long *) value);
}
/**
 * Atomically decrements *value via InterlockedDecrement.
 *
 * @param value Variable to decrement
 */
inline
void atomic_decrement(volatile int32* value)
{
    // Result is intentionally unused
    (void) InterlockedDecrement((long *) value);
}
/**
 * Atomically adds increment to *value via InterlockedExchangeAdd.
 *
 * @param value     Variable to modify
 * @param increment Amount to add
 *
 * @return Result of InterlockedExchangeAdd, i.e. the value held before the addition
 */
inline
int32 atomic_add(volatile int32* value, int32 increment)
{
    const int32 previous = InterlockedExchangeAdd((long *) value, increment);

    return previous;
}
/**
 * Atomically subtracts decrement from *value by adding its negation via
 * InterlockedExchangeAdd.
 *
 * @param value     Variable to modify
 * @param decrement Amount to subtract
 *
 * @return Result of InterlockedExchangeAdd, i.e. the value held before the subtraction
 */
inline
int32 atomic_subtract(volatile int32* value, int32 decrement)
{
    const int32 negated = -decrement;
    const int32 previous = InterlockedExchangeAdd((long *) value, negated);

    return previous;
}
#endif

View File

@ -12,6 +12,8 @@
#include <stdio.h>
#include <stdlib.h>
#include "../stdlib/Types.h"
#if _WIN32
#include "../platform/win32/ThreadDefines.h"
#include "../platform/win32/Thread.h"
@ -25,7 +27,7 @@
void thread_create(Worker* worker, ThreadJobFunc routine, void* arg)
{
for (int i = 0; i < worker->mutex_size; ++i) {
for (int32 i = 0; i < worker->mutex_size; ++i) {
pthread_mutex_init(&worker->mutex[i], NULL);
}
@ -35,10 +37,11 @@ void thread_create(Worker* worker, ThreadJobFunc routine, void* arg)
void thread_stop(Worker* worker)
{
atomic_set(&worker->state, 0);
pthread_join(worker->thread, NULL);
pthread_cond_destroy(&worker->condition);
for (int i = 0; i < worker->mutex_size; ++i) {
for (int32 i = 0; i < worker->mutex_size; ++i) {
pthread_mutex_destroy(&worker->mutex[i]);
}
}

View File

@ -12,28 +12,29 @@
#include <stdio.h>
#include <stdlib.h>
#include "../stdlib/Types.h"
#if _WIN32
#include "../platform/win32/ThreadDefines.h"
#elif __linux__
#include "../platform/linux/ThreadDefines.h"
#endif
struct job_t {
struct PoolWorker {
ThreadJobFunc func;
void *arg;
int state;
job_t *next;
volatile int32 state;
PoolWorker *next;
};
typedef job_t ThreadJob;
typedef PoolWorker ThreadJob;
struct Worker {
int index; // @todo When are we using this?
int state;
volatile int32 state;
pthread_t thread;
pthread_cond_t condition;
int mutex_size;
int32 mutex_size;
pthread_mutex_t* mutex;
};

View File

@ -12,21 +12,16 @@
#include <stdio.h>
#include <stdlib.h>
#include "../stdlib/Types.h"
#ifdef _WIN32
#include "../platform/win32/Thread.h"
#else
#include "../platform/linux/Thread.h"
#endif
#include "../stdlib/Types.h"
#include "ThreadJob.h"
#if _WIN32
#elif __linux__
#include "../platform/linux/Thread.h"
#endif
#include "ThreadJob.h"
struct ThreadPool {
ThreadJob *work_first;
ThreadJob *work_last;
@ -63,11 +58,7 @@ ThreadJob *thread_pool_work_poll(ThreadPool *pool)
return work;
}
#ifdef _WIN32
static DWORD WINAPI thread_pool_worker(void* arg)
#else
static void* thread_pool_worker(void *arg)
#endif
static THREAD_RETURN thread_pool_worker(void* arg)
{
ThreadPool *pool = (ThreadPool *) arg;
ThreadJob *work;
@ -128,7 +119,7 @@ ThreadPool *thread_pool_create(size_t num, ThreadPool* pool)
pool->work_first = NULL;
pool->work_last = NULL;
for (i = 0; i < num; i++) {
for (i = 0; i < num; ++i) {
pthread_create(&thread, NULL, thread_pool_worker, pool);
++(pool->size);
@ -183,11 +174,7 @@ void thread_pool_destroy(ThreadPool *pool)
ThreadJob* thread_pool_add_work(ThreadPool *pool, ThreadJob* job)
{
if (pool == NULL) {
return NULL;
}
if (job == NULL) {
if (pool == NULL || job == NULL) {
return NULL;
}