diff --git a/Threads/Job.h b/Threads/Job.h new file mode 100644 index 0000000..c9a610b --- /dev/null +++ b/Threads/Job.h @@ -0,0 +1,27 @@ +#ifndef THREADS_JOB_H +#define THREADS_JOB_H + +#include +#include + +#ifdef _WIN32 + #include +#endif + +namespace Threads { + #ifdef _WIN32 + typedef DWORD WINAPI(*JobFunc)(void *); + #else + typedef void (*JobFunc)(void *); + #endif + + struct job_t { + JobFunc func; + void *arg; + job_t *next; + }; + + typedef job_t Job; +} + +#endif \ No newline at end of file diff --git a/Threads/OSWrapper.h b/Threads/OSWrapper.h new file mode 100644 index 0000000..80b850f --- /dev/null +++ b/Threads/OSWrapper.h @@ -0,0 +1,313 @@ +#ifndef THREADS_OS_WRAPPER_H +#define THREADS_OS_WRAPPER_H + +#ifdef _WIN32 + #include + #include +#else + #include + #include +#endif + +#ifdef _WIN32 + typedef CRITICAL_SECTION pthread_mutex_t; + typedef void pthread_mutexattr_t; + typedef void pthread_condattr_t; + typedef void pthread_rwlockattr_t; + typedef HANDLE pthread_t; + typedef CONDITION_VARIABLE pthread_cond_t; + + typedef struct { + SRWLock lock; + bool exclusive; + } pthread_rwlock_t; + + struct timespec { + long tv_sec; + long tv_nsec; + }; +#endif + +void ms_to_timespec(struct timespec *ts, unsigned int ms) +{ + if (ts == NULL) { + return; + } + + ts->tv_sec = (ms / 1000) + time(NULL); + ts->tv_nsec = (ms % 1000) * 1000000; +} + +#ifdef _WIN32 + typedef struct { + void *(*start_routine)(void *); + void *start_arg; + } win_thread_start_t; + + static DWORD WINAPI win_thread_start(void *arg) + { + win_thread_start_t *data = arg; + void *(*start_routine)(void *) = arg->start_routine; + void *start_arg = arg->start_arg; + + free(data); + + start_routine(start_arg); + + return 0; + } + + int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine)(void *), void *arg) + { + win_thread_start_t *data; + + void (attr); + + if (thread == NULL || start_routine == NULL) { + return 1; + } + + data = mcalloc(sizeof(*data)); + data->start_routine = start_routine; + data->start_arg = arg; + + *thread = CreateThread(NULL, 0, win_thread_start, data, 0, NULL); + if (*thread == NULL) { + return 1; + } + + return 0; + } + + int pthread_join(pthread_t thread, void **value_ptr) + { + (void)value_ptr; + WaitForSingleObject(thread, INFINITE); + CloseHandle(thread); + + return 0; + } + + int pthread_detach(pthread_t thread) + { + CloseHandle(thread); + } + + int pthread_mutex_init(pthread_mutex_t *mutex, pthread_mutexattr_t *attr) + { + (void) attr; + + if (mutex == NULL) { + return 1; + } + + InitializeCriticalSection(mutex); + + return 0; + } + + int pthread_mutex_destroy(pthread_mutex_t *mutex) + { + if (mutex == NULL) { + return 1; + } + + DeleteCriticalSection(mutex); + + return 0; + } + + int pthread_mutex_lock(pthread_mutex_t *mutex) + { + if (mutex == NULL) { + return 1; + } + + EnterCriticalSection(mutex); + + return 0; + } + + int pthread_mutex_unlock(pthread_mutex_t *mutex) + { + if (mutex == NULL) { + return 1; + } + + LeaveCriticalSection(mutex); + + return 0; + } + + int pthread_cond_init(thread_cond_t *cond, pthread_condattr_t *attr) + { + (void) attr; + if (cond == NULL) { + return 1; + } + + InitializeConditionVariable(cond); + + return 0; + } + + int pthread_cond_destroy(thread_cond_t *cond) + { + /* Windows does not have a destroy for conditionals */ + (void)cond; + + return 0; + } + + int pthread_cond_wait(thread_cond_t *cond, pthread_mutex_t *mutex) + { + if (cond == NULL || mutex == NULL) { + return 1; + } + + return pthread_cond_timedwait(cond, mutex, NULL) + } + + int pthread_cond_timedwait(thread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime) + { + if (cond == NULL || mutex == NULL) { + return 1; + } + + if (!SleepConditionVariableCS(cond, mutex, timespec_to_ms(abstime))) { + return 1; + } + + return 0; + } + + int pthread_cond_signal(thread_cond_t *cond) + { + if (cond == NULL) { + return 1; + } + + WakeConditionVariable(cond); + + return 0; + } + + int pthread_cond_broadcast(thread_cond_t *cond) + { + if (cond == NULL) { + return 1; + } + + WakeAllConditionVariable(cond); + + return 0; + } + + static DWORD timespec_to_ms(const struct timespec *abstime) + { + DWORD t; + + if (abstime == NULL) { + return INFINITE; + } + + t = ((abstime->tv_sec - time(NULL)) * 1000) + (abstime->tv_nsec / 1000000); + if (t < 0) { + t = 1; + } + + return t; + } + + int pthread_rwlock_init(pthread_rwlock_t *rwlock, const pthread_rwlockattr_t *attr) + { + (void) attr; + if (rwlock == NULL) { + return 1; + } + + InitializeSRWLock(&(rwlock->lock)); + rwlock->exclusive = false; + + return 0; + } + + int pthread_rwlock_destroy(pthread_rwlock_t *rwlock) + { + (void) rwlock; + } + + int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock) + { + if (rwlock == NULL) { + return 1; + } + + AcquireSRWLockShared(&(rwlock->lock)); + } + + int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock) + { + if (rwlock == NULL) { + return 1; + } + + return !TryAcquireSRWLockShared(&(rwlock->lock)); + } + + int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock) + { + if (rwlock == NULL) { + return 1; + } + + AcquireSRWLockExclusive(&(rwlock->lock)); + rwlock->exclusive = true; + } + + int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock) + { + BOOLEAN ret; + + if (rwlock == NULL) { + return 1; + } + + ret = TryAcquireSRWLockExclusive(&(rwlock->lock)); + if (ret) { + rwlock->exclusive = true; + } + + return ret; + } + + int pthread_rwlock_unlock(pthread_rwlock_t *rwlock) + { + if (rwlock == NULL) { + return 1; + } + + if (rwlock->exclusive) { + rwlock->exclusive = false; + ReleaseSRWLockExclusive(&(rwlock->lock)); + } else { + ReleaseSRWLockShared(&(rwlock->lock)); + } + } + + unsigned int pcthread_get_num_procs() + { + SYSTEM_INFO sysinfo; + + GetSystemInfo(&sysinfo); + + return sysinfo.dwNumberOfProcessors; + } +#else + unsigned int pcthread_get_num_procs() + { + return (unsigned int)sysconf(_SC_NPROCESSORS_ONLN); + } +#endif + +#endif \ No newline at end of file diff --git a/Threads/Thread.h b/Threads/Thread.h new file mode 100644 index 0000000..65c47fa --- /dev/null +++ b/Threads/Thread.h @@ -0,0 +1,215 @@ +#ifndef THREADS_THREAD_H +#define THREADS_THREAD_H + +#include +#include + +#ifdef _WIN32 + #include +#else + #include + #include +#endif + +#include "OSWrapper.h" +#include "Job.h" +#include "ThreadPool.h" + +namespace Threads { + Job *pool_work_create(JobFunc func, void *arg) + { + Job *work; + + if (func == NULL) { + return NULL; + } + + work = (Job *) malloc(sizeof(*work)); + work->func = func; + work->arg = arg; + work->next = NULL; + + return work; + } + + void pool_work_destroy(Job *work) + { + if (work == NULL) { + return; + } + + free(work); + } + + Job *pool_work_get(Threads::ThreadPool *pool) + { + Job *work; + + if (pool == NULL) { + return NULL; + } + + work = pool->work_first; + if (work == NULL) { + return NULL; + } + + if (work->next == NULL) { + pool->work_first = NULL; + pool->work_last = NULL; + } else { + pool->work_first = work->next; + } + + return work; + } + + static void *pool_worker(void *arg) + { + Threads::ThreadPool *pool = (Threads::ThreadPool *) arg; + Threads::Job *work; + + while (1) { + pthread_mutex_lock(&(pool->work_mutex)); + + while (pool->work_first == NULL && !pool->stop) { + pthread_cond_wait(&(pool->work_cond), &(pool->work_mutex)); + } + + if (pool->stop) { + break; + } + + work = Threads::pool_work_get(pool); + ++(pool->working_cnt); + pthread_mutex_unlock(&(pool->work_mutex)); + + if (work != NULL) { + work->func(work->arg); + pool_work_destroy(work); + } + + pthread_mutex_lock(&(pool->work_mutex)); + --(pool->working_cnt); + + if (!pool->stop && pool->working_cnt == 0 && pool->work_first == NULL) { + pthread_cond_signal(&(pool->working_cond)); + } + + pthread_mutex_unlock(&(pool->work_mutex)); + } + + --(pool->thread_cnt); + pthread_cond_signal(&(pool->working_cond)); + pthread_mutex_unlock(&(pool->work_mutex)); + + return NULL; + } + + Threads::ThreadPool *pool_create(size_t num) + { + pthread_t thread; + size_t i; + + if (num == 0) { + num = 2; + } + + Threads::ThreadPool *pool = (Threads::ThreadPool *) malloc(sizeof(Threads::ThreadPool)); + pool->thread_cnt = num; + + pthread_mutex_init(&(pool->work_mutex), NULL); + pthread_cond_init(&(pool->work_cond), NULL); + pthread_cond_init(&(pool->working_cond), NULL); + + pool->work_first = NULL; + pool->work_last = NULL; + + for (i=0; iwork_mutex)); + + while (true) { + if ((!pool->stop && pool->working_cnt != 0) || (pool->stop && pool->thread_cnt != 0)) { + pthread_cond_wait(&(pool->working_cond), &(pool->work_mutex)); + } else { + break; + } + } + + pthread_mutex_unlock(&(pool->work_mutex)); + } + + void pool_destroy(Threads::ThreadPool *pool) + { + Threads::Job *work; + Threads::Job *work2; + + if (pool == NULL) { + return; + } + + pthread_mutex_lock(&(pool->work_mutex)); + work = pool->work_first; + + while (work != NULL) { + work2 = work->next; + pool_work_destroy(work); + work = work2; + } + + pool->stop = true; + pthread_cond_broadcast(&(pool->work_cond)); + pthread_mutex_unlock(&(pool->work_mutex)); + + pool_wait(pool); + + pthread_mutex_destroy(&(pool->work_mutex)); + pthread_cond_destroy(&(pool->work_cond)); + pthread_cond_destroy(&(pool->working_cond)); + + free(pool); + } + + bool pool_add_work(Threads::ThreadPool *pool, JobFunc func, void *arg) + { + Threads::Job *work; + + if (pool == NULL) { + return false; + } + + work = Threads::pool_work_create(func, arg); + if (work == NULL) { + return false; + } + + pthread_mutex_lock(&(pool->work_mutex)); + if (pool->work_first == NULL) { + pool->work_first = work; + pool->work_last = pool->work_first; + } else { + pool->work_last->next = work; + pool->work_last = work; + } + + pthread_cond_broadcast(&(pool->work_cond)); + pthread_mutex_unlock(&(pool->work_mutex)); + + return true; + } +} + +#endif \ No newline at end of file diff --git a/Threads/ThreadPool.h b/Threads/ThreadPool.h new file mode 100644 index 0000000..f6771bc --- /dev/null +++ b/Threads/ThreadPool.h @@ -0,0 +1,30 @@ +#ifndef THREADS_THREAD_POOL_H +#define THREADS_THREAD_POOL_H + +#include +#include + +#ifdef _WIN32 + #include +#else + #include + #include +#endif + +#include "OSWrapper.h" +#include "Job.h" + +namespace Threads { + typedef struct { + Job *work_first; + Job *work_last; + pthread_mutex_t work_mutex; + pthread_cond_t work_cond; + pthread_cond_t working_cond; + size_t working_cnt; + size_t thread_cnt; + bool stop; + } ThreadPool; +} + +#endif \ No newline at end of file diff --git a/Utils/FileUtils.h b/Utils/FileUtils.h index 7b0806a..ad7632c 100755 --- a/Utils/FileUtils.h +++ b/Utils/FileUtils.h @@ -107,7 +107,7 @@ namespace Utils { typedef struct { char *content; - int size; + int size = 0; // doesn't include null termination (same as strlen) } file_body; file_body read_file (const char *filename) diff --git a/Utils/TestUtils.h b/Utils/TestUtils.h index 203f40a..3e5f320 100755 --- a/Utils/TestUtils.h +++ b/Utils/TestUtils.h @@ -33,4 +33,34 @@ return 0; } \ }) +#define ASSERT_CONTAINS(a, b) ({\ + if (strstr((a), (b)) != NULL) { \ + printf("."); \ + } else { \ + printf("[F]"); \ + printf("\n\n%s - %i: ", __FILE__, __LINE__); \ + printf("%s", (a)); printf(" !contains "); printf("%s", (b)); printf("\n"); \ + return 0; } \ + }) + +#define ASSERT_TRUE(a) ({\ + if ((a) == true) { \ + printf("."); \ + } else { \ + printf("[F]"); \ + printf("\n\n%s - %i: ", __FILE__, __LINE__); \ + printf("%d", (a)); printf(" != "); printf("1"); printf("\n"); \ + return 0; } \ + }) + +#define ASSERT_FALSE(a) ({\ + if ((a) == false) { \ + printf("."); \ + } else { \ + printf("[F]"); \ + printf("\n\n%s - %i: ", __FILE__, __LINE__); \ + printf("%d", (a)); printf(" != "); printf("1"); printf("\n"); \ + return 0; } \ + }) + #endif diff --git a/Utils/WebUtils.h b/Utils/WebUtils.h index 51a949f..8483e45 100755 --- a/Utils/WebUtils.h +++ b/Utils/WebUtils.h @@ -19,23 +19,54 @@ namespace Utils { namespace WebUtils { + static bool CURL_SETUP = false; + + inline + void setup() + { + curl_global_init(CURL_GLOBAL_DEFAULT); + Utils::WebUtils::CURL_SETUP = true; + } + + inline + void clean() + { + curl_global_cleanup(); + Utils::WebUtils::CURL_SETUP = false; + } + int write_download_data (void *ptr, size_t size, size_t nmeb, void *stream) { Utils::FileUtils::file_body *out = (Utils::FileUtils::file_body *) stream; size_t outSize = size * nmeb; - out->content = (char *) malloc(outSize + 1); - if (!out->content) { - fprintf(stderr, "CRITICAL: malloc failed"); + if (out->size == 0) { + // first time this function is called for a specific resource + out->content = (char *) malloc((outSize + 1) * sizeof(char)); + if (!out->content) { + fprintf(stderr, "CRITICAL: malloc failed"); - return 0; - } + return 0; + } - if (out->content) { - memcpy(out->content, ptr, outSize); + if (out->content) { + memcpy(out->content, ptr, outSize * sizeof(char)); - out->size = (int) outSize; - out->content[out->size] = 0; + out->size = (int) outSize; + out->content[out->size] = 0; + } + } else { + // the max buffer (16384 = 16k) is exceeded, then this is called again and needs to get extended + char *temp = (char *) malloc((outSize + out->size + 1) * sizeof(char)); + + memcpy(temp, out->content, out->size * sizeof(char)); + memcpy(temp + out->size * sizeof(char), ptr, outSize * sizeof(char)); + + free(out->content); + + out->content = temp; + out->size += outSize; + out->content[out->size] = 0; } return out->size; @@ -46,9 +77,15 @@ namespace Utils { { Utils::FileUtils::file_body page = {0}; + if (!Utils::WebUtils::CURL_SETUP) { + Utils::WebUtils::setup(); + } + CURL *h = curl_easy_init(); curl_easy_setopt(h, CURLOPT_URL, url); - + curl_easy_setopt(h, CURLOPT_PRIVATE, url); + curl_easy_setopt(h, CURLOPT_SSL_VERIFYPEER, 0L); + curl_easy_setopt(h, CURLOPT_SSL_VERIFYHOST, 0L); curl_easy_setopt(h, CURLOPT_WRITEFUNCTION, write_download_data); curl_easy_setopt(h, CURLOPT_WRITEDATA, &page); curl_easy_perform(h); @@ -56,6 +93,76 @@ namespace Utils { return page; } + + inline + void add_transfer(CURLM *cm, char *url, Utils::FileUtils::file_body *page, int *left) + { + CURL *h = curl_easy_init(); + curl_easy_setopt(h, CURLOPT_URL, url); + curl_easy_setopt(h, CURLOPT_PRIVATE, url); + curl_easy_setopt(h, CURLOPT_SSL_VERIFYPEER, 0L); + curl_easy_setopt(h, CURLOPT_SSL_VERIFYHOST, 0L); + curl_easy_setopt(h, CURLOPT_WRITEFUNCTION, write_download_data); + curl_easy_setopt(h, CURLOPT_WRITEDATA, page); + + curl_multi_add_handle(cm, h); + + ++(*left); + } + + Utils::FileUtils::file_body *multi_download (char **urls, int count, int max_parrallel) + { + Utils::FileUtils::file_body *pages = (Utils::FileUtils::file_body *) malloc(count * sizeof(Utils::FileUtils::file_body)); + + if (!Utils::WebUtils::CURL_SETUP) { + Utils::WebUtils::setup(); + } + + CURLM *cm = curl_multi_init(); + curl_multi_setopt(cm, CURLMOPT_MAXCONNECTS, max_parrallel); + + int downloads; + int left = 0; + + for(downloads = 0; downloads < max_parrallel && downloads < count; ++downloads) { + Utils::WebUtils::add_transfer(cm, urls[downloads], &pages[downloads], &left); + } + + CURLMsg *msg; + int msgs_left = -1; + + do { + int alive = 1; + curl_multi_perform(cm, &alive); + + while((msg = curl_multi_info_read(cm, &msgs_left))) { + if(msg->msg == CURLMSG_DONE) { + char *url; + CURL *e = msg->easy_handle; + curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &url); + curl_multi_remove_handle(cm, e); + curl_easy_cleanup(e); + --left; + } else { + fprintf(stderr, "E: CURLMsg (%d)\n", msg->msg); + } + + if(downloads < count) { + ++downloads; + add_transfer(cm, urls[downloads], &pages[downloads], &left); + } + } + + if(left > 0) { + curl_multi_wait(cm, NULL, 0, 1000, NULL); + } + + } while(left > 0); + + curl_multi_cleanup(cm); + + return pages; + } } } diff --git a/tests/Threads/ThreadPoolTest.cpp b/tests/Threads/ThreadPoolTest.cpp new file mode 100644 index 0000000..ae3c708 --- /dev/null +++ b/tests/Threads/ThreadPoolTest.cpp @@ -0,0 +1,63 @@ +/** + * Karaka + * + * @package Test + * @copyright Dennis Eichhorn + * @license OMS License 1.0 + * @version 1.0.0 + * @link https://karaka.app + */ +#include + +#include "../../Threads/Thread.h" +#include "../../Utils/TestUtils.h" + +static const size_t num_threads = 4; +static const size_t num_items = 10; + +void worker(void *arg) +{ + int *val = (int *) arg; + int old = *val; + + *val += 100; + // printf("tid=%p, old=%d, val=%d\n", (void *) pthread_self(), old, *val); + + if (*val % 2) { + sleep(1); + } +} + +int main(int argc, char** argv) +{ + printf("Threads:\n\n"); + printf("ThreadPool:\n"); + + int i; + Threads::ThreadPool *pool = Threads::pool_create(num_threads); + int *vals = (int *) calloc(num_items, sizeof(int)); + + for (i = 0; i < num_items; ++i) { + vals[i] = i; + Threads::pool_add_work(pool, worker, vals + i); + } + + Threads::pool_wait(pool); + sleep(1); + + bool test = true; + + for (i = 0; i < num_items; ++i) { + // printf("%d\n", vals[i]); + test = test && 100 + i == vals[i]; + } + + ASSERT_EQUALS(test, true, "%d", "%d"); + + free(vals); + Threads::pool_destroy(pool); + + printf("\n\n"); + + return 0; +} diff --git a/tests/Utils/WebUtilsTest.cpp b/tests/Utils/WebUtilsTest.cpp new file mode 100644 index 0000000..f3629bf --- /dev/null +++ b/tests/Utils/WebUtilsTest.cpp @@ -0,0 +1,52 @@ +/** + * Karaka + * + * @package Test + * @copyright Dennis Eichhorn + * @license OMS License 1.0 + * @version 1.0.0 + * @link https://karaka.app + */ +#include + +#include "../../Utils/WebUtils.h" +#include "../../Utils/TestUtils.h" + +int main(int argc, char** argv) +{ + printf("Utils:\n\n"); + printf("WebUtils:\n"); + Utils::WebUtils::setup(); + + /* Single download */ + Utils::FileUtils::file_body single = Utils::WebUtils::download((char *) "https://jingga.app"); + + ASSERT_CONTAINS(single.content, "Jingga"); + free(single.content); + + /* Multi download */ + const char *urls[] = { + "https://jingga.app/terms", + "https://jingga.app/imprint", + "https://jingga.app/privacy" + }; + + const char *contains[] = { + "Terms of Service", + "Commercial register", + "What is personal data?" + }; + + Utils::FileUtils::file_body *multi = Utils::WebUtils::multi_download((char **) urls, 3, 3); + for (int i = 0; i < 3; ++i) { + ASSERT_CONTAINS(multi[i].content, contains[i]); + free(multi[i].content); + } + + free(multi); + + Utils::WebUtils::clean(); + printf("\n\n"); + + return 0; +} diff --git a/tests/test.sh b/tests/test.sh index 7f213b1..a22cf33 100755 --- a/tests/test.sh +++ b/tests/test.sh @@ -1,5 +1,11 @@ #!/bin/bash +# Info: For debugging use -g and comment the && rm part + BASEDIR=$(dirname "$(readlink -f "$0")") -g++ $BASEDIR/Image/ImageUtilsTest.cpp -o $BASEDIR/Image/ImageUtilsTest && $BASEDIR/Image/ImageUtilsTest && rm $BASEDIR/Image/ImageUtilsTest \ No newline at end of file +g++ $BASEDIR/Image/ImageUtilsTest.cpp -o $BASEDIR/Image/ImageUtilsTest && $BASEDIR/Image/ImageUtilsTest && rm $BASEDIR/Image/ImageUtilsTest + +g++ $BASEDIR/Threads/ThreadPoolTest.cpp -o $BASEDIR/Threads/ThreadPoolTest && $BASEDIR/Threads/ThreadPoolTest && rm $BASEDIR/Threads/ThreadPoolTest + +g++ $BASEDIR/Utils/WebUtilsTest.cpp -o $BASEDIR/Utils/WebUtilsTest -l curl && $BASEDIR/Utils/WebUtilsTest && rm $BASEDIR/Utils/WebUtilsTest \ No newline at end of file