add thread pool and multi page download

This commit is contained in:
Dennis Eichhorn 2022-12-06 23:00:01 +01:00
parent 8f6d9f213e
commit 09d77b359b
10 changed files with 855 additions and 12 deletions

27
Threads/Job.h Normal file
View File

@ -0,0 +1,27 @@
#ifndef THREADS_JOB_H
#define THREADS_JOB_H
#include <stdio.h>
#include <stdlib.h>
#ifdef _WIN32
#include <windows.h>
#endif
namespace Threads {
    /**
     * Signature of a unit of work that can be queued on a thread pool.
     *
     * The callback receives the opaque argument stored next to it in the Job.
     * On Windows the calling convention has to sit inside the declarator
     * parentheses; placed in front of them MSVC ignores it.
     */
    #ifdef _WIN32
        typedef DWORD (WINAPI *JobFunc)(void *);
    #else
        typedef void (*JobFunc)(void *);
    #endif

    /**
     * One queued work item; items are chained into a singly linked list.
     */
    struct job_t {
        JobFunc func; /* callback to run */
        void *arg;    /* opaque argument handed to func */
        job_t *next;  /* next queued job or NULL */
    };

    typedef job_t Job;
}
#endif

313
Threads/OSWrapper.h Normal file
View File

@ -0,0 +1,313 @@
#ifndef THREADS_OS_WRAPPER_H
#define THREADS_OS_WRAPPER_H
#include <stdlib.h>
#include <time.h>
#ifdef _WIN32
#include <stdbool.h>
#include <windows.h>
#else
#include <pthread.h>
#include <unistd.h>
#endif
#ifdef _WIN32
typedef CRITICAL_SECTION pthread_mutex_t;
typedef void pthread_mutexattr_t;
typedef void pthread_condattr_t;
typedef void pthread_rwlockattr_t;
typedef HANDLE pthread_t;
typedef CONDITION_VARIABLE pthread_cond_t;
typedef struct {
SRWLock lock;
bool exclusive;
} pthread_rwlock_t;
struct timespec {
long tv_sec;
long tv_nsec;
};
#endif
/**
 * Fill `ts` with an absolute point in time that lies `ms` milliseconds after
 * the current wall-clock second.
 *
 * @param ts Output structure; the call is a no-op when it is NULL
 * @param ms Offset from now in milliseconds
 */
void ms_to_timespec(struct timespec *ts, unsigned int ms)
{
    if (ts != NULL) {
        const unsigned int whole_seconds = ms / 1000;
        const unsigned int leftover_ms   = ms % 1000;

        ts->tv_sec  = whole_seconds + time(NULL);
        ts->tv_nsec = leftover_ms * 1000000;
    }
}
#ifdef _WIN32
/* Heap-allocated bundle that carries a pthread-style entry point into a Win32 thread. */
typedef struct {
    void *(*start_routine)(void *);
    void *start_arg;
} win_thread_start_t;

/**
 * Win32 thread entry point that forwards to the pthread-style routine.
 *
 * Takes ownership of the win_thread_start_t passed in `arg` and frees it
 * before invoking the routine. The routine's return value is discarded.
 */
static DWORD WINAPI win_thread_start(void *arg)
{
    win_thread_start_t *data = (win_thread_start_t *) arg;
    void *(*start_routine)(void *) = data->start_routine;
    void *start_arg = data->start_arg;

    free(data);
    start_routine(start_arg);

    return 0;
}
int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
{
win_thread_start_t *data;
void (attr);
if (thread == NULL || start_routine == NULL) {
return 1;
}
data = mcalloc(sizeof(*data));
data->start_routine = start_routine;
data->start_arg = arg;
*thread = CreateThread(NULL, 0, win_thread_start, data, 0, NULL);
if (*thread == NULL) {
return 1;
}
return 0;
}
/**
 * Block until `thread` has finished and release its handle.
 *
 * @param thread    Handle returned by pthread_create
 * @param value_ptr Ignored; the thread's return value is not propagated
 *
 * @return 0 on success, 1 when waiting on the handle failed
 */
int pthread_join(pthread_t thread, void **value_ptr)
{
    (void) value_ptr;

    if (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0) {
        return 1;
    }

    CloseHandle(thread);

    return 0;
}
/**
 * Detach `thread` by closing its handle; the thread itself keeps running.
 *
 * @return 0 on success, 1 when the handle could not be closed
 */
int pthread_detach(pthread_t thread)
{
    return CloseHandle(thread) ? 0 : 1;
}
/**
 * Initialise `mutex` as a critical section.
 *
 * @param mutex Mutex to initialise
 * @param attr  Ignored
 *
 * @return 0 on success, 1 when `mutex` is NULL
 */
int pthread_mutex_init(pthread_mutex_t *mutex, pthread_mutexattr_t *attr)
{
    (void) attr;

    if (mutex != NULL) {
        InitializeCriticalSection(mutex);

        return 0;
    }

    return 1;
}
/**
 * Release the critical section behind `mutex`.
 *
 * @return 0 on success, 1 when `mutex` is NULL
 */
int pthread_mutex_destroy(pthread_mutex_t *mutex)
{
    if (mutex != NULL) {
        DeleteCriticalSection(mutex);

        return 0;
    }

    return 1;
}
/**
 * Enter the critical section behind `mutex`, blocking until it is available.
 *
 * @return 0 on success, 1 when `mutex` is NULL
 */
int pthread_mutex_lock(pthread_mutex_t *mutex)
{
    if (mutex != NULL) {
        EnterCriticalSection(mutex);

        return 0;
    }

    return 1;
}
/**
 * Leave the critical section behind `mutex`.
 *
 * @return 0 on success, 1 when `mutex` is NULL
 */
int pthread_mutex_unlock(pthread_mutex_t *mutex)
{
    if (mutex != NULL) {
        LeaveCriticalSection(mutex);

        return 0;
    }

    return 1;
}
int pthread_cond_init(thread_cond_t *cond, pthread_condattr_t *attr)
{
(void) attr;
if (cond == NULL) {
return 1;
}
InitializeConditionVariable(cond);
return 0;
}
int pthread_cond_destroy(thread_cond_t *cond)
{
/* Windows does not have a destroy for conditionals */
(void)cond;
return 0;
}
int pthread_cond_wait(thread_cond_t *cond, pthread_mutex_t *mutex)
{
if (cond == NULL || mutex == NULL) {
return 1;
}
return pthread_cond_timedwait(cond, mutex, NULL)
}
int pthread_cond_timedwait(thread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)
{
if (cond == NULL || mutex == NULL) {
return 1;
}
if (!SleepConditionVariableCS(cond, mutex, timespec_to_ms(abstime))) {
return 1;
}
return 0;
}
int pthread_cond_signal(thread_cond_t *cond)
{
if (cond == NULL) {
return 1;
}
WakeConditionVariable(cond);
return 0;
}
int pthread_cond_broadcast(thread_cond_t *cond)
{
if (cond == NULL) {
return 1;
}
WakeAllConditionVariable(cond);
return 0;
}
/**
 * Convert an absolute deadline into a relative Win32 timeout in milliseconds.
 *
 * @param abstime Absolute deadline, or NULL for "wait forever"
 *
 * @return INFINITE for a NULL deadline, otherwise the remaining milliseconds;
 *         a deadline that already passed yields the minimal timeout of 1 ms
 */
static DWORD timespec_to_ms(const struct timespec *abstime)
{
    long long remaining;

    if (abstime == NULL) {
        return INFINITE;
    }

    /* Signed arithmetic: DWORD is unsigned, so a past deadline would wrap around. */
    remaining = ((long long) abstime->tv_sec - (long long) time(NULL)) * 1000LL
        + (long long) (abstime->tv_nsec / 1000000);

    if (remaining < 0) {
        remaining = 1;
    }

    /* Keep finite deadlines distinct from the INFINITE sentinel. */
    if (remaining >= (long long) INFINITE) {
        remaining = (long long) INFINITE - 1;
    }

    return (DWORD) remaining;
}
/**
 * Initialise `rwlock` in the unlocked, non-exclusive state.
 *
 * @param rwlock Lock to initialise
 * @param attr   Ignored
 *
 * @return 0 on success, 1 when `rwlock` is NULL
 */
int pthread_rwlock_init(pthread_rwlock_t *rwlock, const pthread_rwlockattr_t *attr)
{
    (void) attr;

    if (rwlock != NULL) {
        InitializeSRWLock(&(rwlock->lock));
        rwlock->exclusive = false;

        return 0;
    }

    return 1;
}
/**
 * No-op counterpart of pthread_rwlock_init; nothing is released here.
 *
 * @return Always 0
 */
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock)
{
    (void) rwlock;

    return 0;
}
/**
 * Acquire `rwlock` in shared (reader) mode, blocking until available.
 *
 * @return 0 on success, 1 when `rwlock` is NULL
 */
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock)
{
    if (rwlock == NULL) {
        return 1;
    }

    AcquireSRWLockShared(&(rwlock->lock));

    return 0;
}
/**
 * Try to acquire `rwlock` in shared (reader) mode without blocking.
 *
 * @return 0 when the lock was acquired, 1 when it was not or `rwlock` is NULL
 */
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock)
{
    if (rwlock != NULL) {
        return !TryAcquireSRWLockShared(&(rwlock->lock));
    }

    return 1;
}
/**
 * Acquire `rwlock` in exclusive (writer) mode, blocking until available.
 *
 * @return 0 on success, 1 when `rwlock` is NULL
 */
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock)
{
    if (rwlock == NULL) {
        return 1;
    }

    AcquireSRWLockExclusive(&(rwlock->lock));
    /* Remembered so unlock can call the matching release function. */
    rwlock->exclusive = true;

    return 0;
}
/**
 * Try to acquire `rwlock` in exclusive (writer) mode without blocking.
 *
 * @return 0 when the lock was acquired, 1 when it was not or `rwlock` is NULL
 *         (same convention as pthread_rwlock_tryrdlock)
 */
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock)
{
    if (rwlock == NULL) {
        return 1;
    }

    if (!TryAcquireSRWLockExclusive(&(rwlock->lock))) {
        return 1;
    }

    rwlock->exclusive = true;

    return 0;
}
/**
 * Release `rwlock`, using the release call that matches how it was acquired.
 *
 * @return 0 on success, 1 when `rwlock` is NULL
 */
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock)
{
    if (rwlock == NULL) {
        return 1;
    }

    if (rwlock->exclusive) {
        /* Clear the flag while the writer lock is still held. */
        rwlock->exclusive = false;
        ReleaseSRWLockExclusive(&(rwlock->lock));
    } else {
        ReleaseSRWLockShared(&(rwlock->lock));
    }

    return 0;
}
unsigned int pcthread_get_num_procs()
{
SYSTEM_INFO sysinfo;
GetSystemInfo(&sysinfo);
return sysinfo.dwNumberOfProcessors;
}
#else
/**
 * Number of processors currently online.
 *
 * @return The value reported by sysconf, or 1 when the query fails
 *         (sysconf returns -1, which would otherwise be cast to a huge count)
 */
unsigned int pcthread_get_num_procs()
{
    const long online = sysconf(_SC_NPROCESSORS_ONLN);

    return online < 1 ? 1u : (unsigned int) online;
}
#endif
#endif

215
Threads/Thread.h Normal file
View File

@ -0,0 +1,215 @@
#ifndef THREADS_THREAD_H
#define THREADS_THREAD_H
#include <stdio.h>
#include <stdlib.h>
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#include <pthread.h>
#endif
#include "OSWrapper.h"
#include "Job.h"
#include "ThreadPool.h"
namespace Threads {
/**
 * Allocate a queue entry for `func(arg)`.
 *
 * @param func Callback to run; must not be NULL
 * @param arg  Opaque argument handed to the callback
 *
 * @return The new, unlinked job, or NULL when `func` is NULL or the
 *         allocation failed
 */
Job *pool_work_create(JobFunc func, void *arg)
{
    if (func == NULL) {
        return NULL;
    }

    Job *work = (Job *) malloc(sizeof(*work));
    if (work == NULL) {
        return NULL;
    }

    work->func = func;
    work->arg  = arg;
    work->next = NULL;

    return work;
}
/**
 * Release a job created by pool_work_create. Passing NULL is allowed.
 */
void pool_work_destroy(Job *work)
{
    /* free() ignores NULL, so no separate guard is required. */
    free(work);
}
/**
 * Detach and return the oldest queued job of `pool`.
 *
 * The function does no locking itself; pool_worker calls it while holding
 * the pool's work_mutex.
 *
 * @return The first job in the queue, or NULL when `pool` is NULL or the
 *         queue is empty
 */
Job *pool_work_get(Threads::ThreadPool *pool)
{
    if (pool == NULL || pool->work_first == NULL) {
        return NULL;
    }

    Job *const head = pool->work_first;

    pool->work_first = head->next;
    if (pool->work_first == NULL) {
        /* The queue just became empty, so the tail pointer is reset too. */
        pool->work_last = NULL;
    }

    return head;
}
static void *pool_worker(void *arg)
{
Threads::ThreadPool *pool = (Threads::ThreadPool *) arg;
Threads::Job *work;
while (1) {
pthread_mutex_lock(&(pool->work_mutex));
while (pool->work_first == NULL && !pool->stop) {
pthread_cond_wait(&(pool->work_cond), &(pool->work_mutex));
}
if (pool->stop) {
break;
}
work = Threads::pool_work_get(pool);
++(pool->working_cnt);
pthread_mutex_unlock(&(pool->work_mutex));
if (work != NULL) {
work->func(work->arg);
pool_work_destroy(work);
}
pthread_mutex_lock(&(pool->work_mutex));
--(pool->working_cnt);
if (!pool->stop && pool->working_cnt == 0 && pool->work_first == NULL) {
pthread_cond_signal(&(pool->working_cond));
}
pthread_mutex_unlock(&(pool->work_mutex));
}
--(pool->thread_cnt);
pthread_cond_signal(&(pool->working_cond));
pthread_mutex_unlock(&(pool->work_mutex));
return NULL;
}
/**
 * Create a thread pool with `num` detached worker threads.
 *
 * @param num Number of workers; 0 selects the default of 2
 *
 * @return The new pool, or NULL when the pool could not be allocated or no
 *         worker thread could be started
 */
Threads::ThreadPool *pool_create(size_t num)
{
    if (num == 0) {
        num = 2;
    }

    /* calloc so that stop, working_cnt and the queue pointers start zeroed. */
    Threads::ThreadPool *pool = (Threads::ThreadPool *) calloc(1, sizeof(Threads::ThreadPool));
    if (pool == NULL) {
        return NULL;
    }

    pool->thread_cnt  = num;
    pool->working_cnt = 0;
    pool->stop        = false;
    pool->work_first  = NULL;
    pool->work_last   = NULL;

    pthread_mutex_init(&(pool->work_mutex), NULL);
    pthread_cond_init(&(pool->work_cond), NULL);
    pthread_cond_init(&(pool->working_cond), NULL);

    size_t started = 0;
    for (size_t i = 0; i < num; ++i) {
        pthread_t thread;

        if (pthread_create(&thread, NULL, pool_worker, pool) != 0) {
            /* Keep thread_cnt equal to the number of workers that really exist. */
            pthread_mutex_lock(&(pool->work_mutex));
            --(pool->thread_cnt);
            pthread_mutex_unlock(&(pool->work_mutex));

            continue;
        }

        pthread_detach(thread);
        ++started;
    }

    if (started == 0) {
        /* A pool without workers would never run a job. */
        pthread_mutex_destroy(&(pool->work_mutex));
        pthread_cond_destroy(&(pool->work_cond));
        pthread_cond_destroy(&(pool->working_cond));
        free(pool);

        return NULL;
    }

    return pool;
}
/**
 * Block until the pool has finished its work.
 *
 * While the pool is running this waits until no job is executing and the
 * queue is empty. Checking the queue as well matters because a job that was
 * just added may not have been picked up by a worker yet. Once the pool is
 * stopping it waits until every worker thread has exited.
 *
 * @param pool Pool to wait for; NULL is ignored
 */
void pool_wait(Threads::ThreadPool *pool)
{
    if (pool == NULL) {
        return;
    }

    pthread_mutex_lock(&(pool->work_mutex));

    for (;;) {
        const bool busy     = !pool->stop && (pool->working_cnt != 0 || pool->work_first != NULL);
        const bool stopping = pool->stop && pool->thread_cnt != 0;

        if (!busy && !stopping) {
            break;
        }

        pthread_cond_wait(&(pool->working_cond), &(pool->work_mutex));
    }

    pthread_mutex_unlock(&(pool->work_mutex));
}
/**
 * Stop the pool, discard all jobs that have not started yet, wait for the
 * workers to exit and release the pool.
 *
 * @param pool Pool to destroy; NULL is ignored
 */
void pool_destroy(Threads::ThreadPool *pool)
{
    if (pool == NULL) {
        return;
    }

    pthread_mutex_lock(&(pool->work_mutex));

    Threads::Job *job = pool->work_first;
    while (job != NULL) {
        Threads::Job *next = job->next;
        pool_work_destroy(job);
        job = next;
    }

    /* Do not leave pointers to freed jobs behind in the pool. */
    pool->work_first = NULL;
    pool->work_last  = NULL;

    pool->stop = true;
    pthread_cond_broadcast(&(pool->work_cond));
    pthread_mutex_unlock(&(pool->work_mutex));

    /* With stop set this returns once thread_cnt has dropped to zero. */
    pool_wait(pool);

    pthread_mutex_destroy(&(pool->work_mutex));
    pthread_cond_destroy(&(pool->work_cond));
    pthread_cond_destroy(&(pool->working_cond));

    free(pool);
}
/**
 * Append a job to the end of the pool's queue and wake the workers.
 *
 * @param pool Pool that should run the job
 * @param func Callback to run
 * @param arg  Opaque argument handed to the callback
 *
 * @return true when the job was queued, false when `pool` is NULL or the job
 *         could not be created
 */
bool pool_add_work(Threads::ThreadPool *pool, JobFunc func, void *arg)
{
    if (pool == NULL) {
        return false;
    }

    Threads::Job *const job = Threads::pool_work_create(func, arg);
    if (job == NULL) {
        return false;
    }

    pthread_mutex_lock(&(pool->work_mutex));

    if (pool->work_first != NULL) {
        pool->work_last->next = job;
    } else {
        pool->work_first = job;
    }
    pool->work_last = job;

    pthread_cond_broadcast(&(pool->work_cond));
    pthread_mutex_unlock(&(pool->work_mutex));

    return true;
}
}
#endif

30
Threads/ThreadPool.h Normal file
View File

@ -0,0 +1,30 @@
#ifndef THREADS_THREAD_POOL_H
#define THREADS_THREAD_POOL_H
#include <stdio.h>
#include <stdlib.h>
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#include <pthread.h>
#endif
#include "OSWrapper.h"
#include "Job.h"
namespace Threads {
    /**
     * State shared between a thread pool and its worker threads.
     */
    struct ThreadPool {
        Job *work_first;              /* head of the job queue, NULL when empty */
        Job *work_last;               /* tail of the job queue, NULL when empty */
        pthread_mutex_t work_mutex;   /* locked by the pool functions around queue and counter access */
        pthread_cond_t work_cond;     /* signalled when work arrives or the pool stops */
        pthread_cond_t working_cond;  /* signalled when the pool goes idle or a worker exits */
        size_t working_cnt;           /* number of jobs currently executing */
        size_t thread_cnt;            /* number of worker threads still alive */
        bool stop;                    /* set to request worker shutdown */
    };
}
#endif

View File

@ -107,7 +107,7 @@ namespace Utils {
typedef struct {
char *content;
int size;
int size = 0; // doesn't include null termination (same as strlen)
} file_body;
file_body read_file (const char *filename)

View File

@ -33,4 +33,34 @@
return 0; } \
})
/*
 * Passes (prints ".") when string b occurs in string a. Otherwise prints the
 * failure location and both strings, then returns 0 from the enclosing
 * function. A NULL argument counts as a failure instead of crashing, and each
 * argument is evaluated exactly once.
 */
#define ASSERT_CONTAINS(a, b) ({\
    const char *assert_haystack_ = (a); \
    const char *assert_needle_ = (b); \
    if (assert_haystack_ != NULL && assert_needle_ != NULL \
        && strstr(assert_haystack_, assert_needle_) != NULL) { \
        printf("."); \
    } else { \
        printf("[F]"); \
        printf("\n\n%s - %i: ", __FILE__, __LINE__); \
        printf("%s", assert_haystack_ != NULL ? assert_haystack_ : "(null)"); printf(" !contains "); printf("%s", assert_needle_ != NULL ? assert_needle_ : "(null)"); printf("\n"); \
        return 0; } \
})
/*
 * Passes (prints ".") when a compares equal to true. Otherwise prints the
 * failure location and the value, then returns 0 from the enclosing function.
 */
#define ASSERT_TRUE(a) ({\
    if (!((a) == true)) { \
        printf("[F]"); \
        printf("\n\n%s - %i: ", __FILE__, __LINE__); \
        printf("%d", (a)); printf(" != "); printf("1"); printf("\n"); \
        return 0; \
    } \
    printf("."); \
})
/*
 * Passes (prints ".") when a compares equal to false. Otherwise prints the
 * failure location and the value next to the expected 0, then returns 0 from
 * the enclosing function.
 */
#define ASSERT_FALSE(a) ({\
    if ((a) == false) { \
        printf("."); \
    } else { \
        printf("[F]"); \
        printf("\n\n%s - %i: ", __FILE__, __LINE__); \
        printf("%d", (a)); printf(" != "); printf("0"); printf("\n"); \
        return 0; } \
})
#endif

View File

@ -19,23 +19,54 @@
namespace Utils {
namespace WebUtils {
/* Set to true by setup() and back to false by clean(). Declared static, so every translation unit that includes this header has its own copy. */
static bool CURL_SETUP = false;
inline
void setup()
{
curl_global_init(CURL_GLOBAL_DEFAULT);
Utils::WebUtils::CURL_SETUP = true;
}
inline
void clean()
{
curl_global_cleanup();
Utils::WebUtils::CURL_SETUP = false;
}
int write_download_data (void *ptr, size_t size, size_t nmeb, void *stream)
{
Utils::FileUtils::file_body *out = (Utils::FileUtils::file_body *) stream;
size_t outSize = size * nmeb;
out->content = (char *) malloc(outSize + 1);
if (!out->content) {
fprintf(stderr, "CRITICAL: malloc failed");
if (out->size == 0) {
// first time this function is called for a specific resource
out->content = (char *) malloc((outSize + 1) * sizeof(char));
if (!out->content) {
fprintf(stderr, "CRITICAL: malloc failed");
return 0;
}
return 0;
}
if (out->content) {
memcpy(out->content, ptr, outSize);
if (out->content) {
memcpy(out->content, ptr, outSize * sizeof(char));
out->size = (int) outSize;
out->content[out->size] = 0;
out->size = (int) outSize;
out->content[out->size] = 0;
}
} else {
// the max buffer (16384 = 16k) is exceeded, then this is called again and needs to get extended
char *temp = (char *) malloc((outSize + out->size + 1) * sizeof(char));
memcpy(temp, out->content, out->size * sizeof(char));
memcpy(temp + out->size * sizeof(char), ptr, outSize * sizeof(char));
free(out->content);
out->content = temp;
out->size += outSize;
out->content[out->size] = 0;
}
return out->size;
@ -46,9 +77,15 @@ namespace Utils {
{
Utils::FileUtils::file_body page = {0};
if (!Utils::WebUtils::CURL_SETUP) {
Utils::WebUtils::setup();
}
CURL *h = curl_easy_init();
curl_easy_setopt(h, CURLOPT_URL, url);
curl_easy_setopt(h, CURLOPT_PRIVATE, url);
curl_easy_setopt(h, CURLOPT_SSL_VERIFYPEER, 0L);
curl_easy_setopt(h, CURLOPT_SSL_VERIFYHOST, 0L);
curl_easy_setopt(h, CURLOPT_WRITEFUNCTION, write_download_data);
curl_easy_setopt(h, CURLOPT_WRITEDATA, &page);
curl_easy_perform(h);
@ -56,6 +93,76 @@ namespace Utils {
return page;
}
/**
 * Create an easy handle that downloads `url` into `page` and attach it to the
 * multi handle `cm`.
 *
 * @param cm   Multi handle that drives the transfer
 * @param url  URL to fetch; also stored as the handle's private pointer
 * @param page Receives the body; reset to empty before the transfer starts
 * @param left Counter of running transfers; incremented only when the handle
 *             was actually added
 */
inline
void add_transfer(CURLM *cm, char *url, Utils::FileUtils::file_body *page, int *left)
{
    /* The write callback uses size == 0 to detect its first call. */
    page->content = NULL;
    page->size    = 0;

    CURL *h = curl_easy_init();
    if (h == NULL) {
        fprintf(stderr, "E: curl_easy_init failed (%s)\n", url);

        return;
    }

    curl_easy_setopt(h, CURLOPT_URL, url);
    curl_easy_setopt(h, CURLOPT_PRIVATE, url);
    /* NOTE(review): TLS peer and host verification are disabled here; confirm this is intended. */
    curl_easy_setopt(h, CURLOPT_SSL_VERIFYPEER, 0L);
    curl_easy_setopt(h, CURLOPT_SSL_VERIFYHOST, 0L);
    curl_easy_setopt(h, CURLOPT_WRITEFUNCTION, write_download_data);
    curl_easy_setopt(h, CURLOPT_WRITEDATA, page);

    if (curl_multi_add_handle(cm, h) != CURLM_OK) {
        fprintf(stderr, "E: curl_multi_add_handle failed (%s)\n", url);
        curl_easy_cleanup(h);

        return;
    }

    ++(*left);
}
/**
 * Download several URLs concurrently.
 *
 * @param urls          Array of `count` URLs
 * @param count         Number of URLs
 * @param max_parrallel Maximum number of simultaneous transfers; values
 *                      below 1 are treated as 1
 *
 * @return Array of `count` bodies in the order of `urls`; the caller frees
 *         every `content` and the array itself. An entry whose transfer
 *         failed before any data arrived stays empty (content NULL, size 0).
 *         NULL is returned for invalid arguments or when setup failed.
 */
Utils::FileUtils::file_body *multi_download (char **urls, int count, int max_parrallel)
{
    if (urls == NULL || count <= 0) {
        return NULL;
    }

    /* Zero-initialised because the write callback relies on size starting at 0. */
    Utils::FileUtils::file_body *pages = (Utils::FileUtils::file_body *) calloc(count, sizeof(Utils::FileUtils::file_body));
    if (pages == NULL) {
        fprintf(stderr, "CRITICAL: calloc failed");

        return NULL;
    }

    if (max_parrallel < 1) {
        max_parrallel = 1;
    }

    if (!Utils::WebUtils::CURL_SETUP) {
        Utils::WebUtils::setup();
    }

    CURLM *cm = curl_multi_init();
    if (cm == NULL) {
        free(pages);

        return NULL;
    }

    /* The option takes a long; passing an int through varargs is not portable. */
    curl_multi_setopt(cm, CURLMOPT_MAXCONNECTS, (long) max_parrallel);

    int next = 0; /* index of the next URL that has not been started yet */
    int left = 0; /* transfers currently attached to the multi handle */

    do {
        /* Start new transfers until the limit is reached or no URL is left. */
        while (left < max_parrallel && next < count) {
            Utils::WebUtils::add_transfer(cm, urls[next], &pages[next], &left);
            ++next;
        }

        if (left == 0) {
            /* Nothing is running, e.g. because every remaining transfer failed to start. */
            break;
        }

        int alive = 0;
        curl_multi_perform(cm, &alive);

        CURLMsg *msg;
        int msgs_left = 0;
        while ((msg = curl_multi_info_read(cm, &msgs_left)) != NULL) {
            if (msg->msg == CURLMSG_DONE) {
                CURL *e = msg->easy_handle;

                if (msg->data.result != CURLE_OK) {
                    char *url = NULL;
                    curl_easy_getinfo(e, CURLINFO_PRIVATE, &url);
                    fprintf(stderr, "E: %s (%s)\n", curl_easy_strerror(msg->data.result), url != NULL ? url : "?");
                }

                curl_multi_remove_handle(cm, e);
                curl_easy_cleanup(e);
                --left;
            } else {
                fprintf(stderr, "E: CURLMsg (%d)\n", msg->msg);
            }
        }

        if (left > 0) {
            curl_multi_wait(cm, NULL, 0, 1000, NULL);
        }
    } while (left > 0 || next < count);

    curl_multi_cleanup(cm);

    return pages;
}
}
}

View File

@ -0,0 +1,63 @@
/**
* Karaka
*
* @package Test
* @copyright Dennis Eichhorn
* @license OMS License 1.0
* @version 1.0.0
* @link https://karaka.app
*/
#include <stdio.h>
#include "../../Threads/Thread.h"
#include "../../Utils/TestUtils.h"
static const size_t num_threads = 4;
static const size_t num_items = 10;
/**
 * Test job: adds 100 to the int behind `arg` and sleeps one second when the
 * result is odd.
 */
void worker(void *arg)
{
    int *const slot = (int *) arg;

    *slot += 100;

    const bool is_odd = (*slot % 2) != 0;
    if (is_odd) {
        sleep(1);
    }
}
/**
 * Queues num_items jobs on a pool of num_threads workers and checks that
 * every job added 100 to its own slot.
 */
int main(int argc, char** argv)
{
    (void) argc;
    (void) argv;

    printf("Threads:\n\n");
    printf("ThreadPool:\n");

    Threads::ThreadPool *pool = Threads::pool_create(num_threads);
    int *vals = (int *) calloc(num_items, sizeof(int));

    if (pool == NULL || vals == NULL) {
        fprintf(stderr, "CRITICAL: test setup failed\n");
        free(vals);
        Threads::pool_destroy(pool);

        return 1;
    }

    for (size_t i = 0; i < num_items; ++i) {
        vals[i] = (int) i;
        Threads::pool_add_work(pool, worker, vals + i);
    }

    Threads::pool_wait(pool);
    sleep(1);

    bool test = true;
    for (size_t i = 0; i < num_items; ++i) {
        // printf("%d\n", vals[i]);
        test = test && 100 + (int) i == vals[i];
    }

    /* Clean up first: the assertion macro returns from main on failure. */
    free(vals);
    Threads::pool_destroy(pool);

    ASSERT_EQUALS(test, true, "%d", "%d");

    printf("\n\n");

    return 0;
}

View File

@ -0,0 +1,52 @@
/**
* Karaka
*
* @package Test
* @copyright Dennis Eichhorn
* @license OMS License 1.0
* @version 1.0.0
* @link https://karaka.app
*/
#include <stdio.h>
#include "../../Utils/WebUtils.h"
#include "../../Utils/TestUtils.h"
/**
 * Downloads one page with download() and three pages with multi_download(),
 * checking each body for an expected text fragment.
 */
int main(int argc, char** argv)
{
    printf("Utils:\n\n");
    printf("WebUtils:\n");

    Utils::WebUtils::setup();

    /* Single download */
    Utils::FileUtils::file_body single = Utils::WebUtils::download((char *) "https://jingga.app");
    ASSERT_CONTAINS(single.content, "Jingga");
    free(single.content);

    /* Multi download */
    const char *urls[] = {
        "https://jingga.app/terms",
        "https://jingga.app/imprint",
        "https://jingga.app/privacy"
    };

    const char *contains[] = {
        "Terms of Service",
        "Commercial register",
        "What is personal data?"
    };

    /* Three entries; used as both the URL count and the parallel limit. */
    const int url_count = (int) (sizeof(urls) / sizeof(urls[0]));

    Utils::FileUtils::file_body *multi = Utils::WebUtils::multi_download((char **) urls, url_count, url_count);

    int idx = 0;
    while (idx < url_count) {
        ASSERT_CONTAINS(multi[idx].content, contains[idx]);
        free(multi[idx].content);
        ++idx;
    }

    free(multi);

    Utils::WebUtils::clean();

    printf("\n\n");

    return 0;
}

View File

@ -1,5 +1,11 @@
#!/bin/bash

# Builds each test binary, runs it and removes it again when the run succeeded.
# Info: For debugging use -g and comment the && rm part

BASEDIR=$(dirname "$(readlink -f "$0")")

# Paths are quoted so a checkout location containing spaces still works.
g++ "$BASEDIR/Image/ImageUtilsTest.cpp" -o "$BASEDIR/Image/ImageUtilsTest" && "$BASEDIR/Image/ImageUtilsTest" && rm "$BASEDIR/Image/ImageUtilsTest"

# The thread pool uses pthreads, so compile and link with -pthread.
g++ -pthread "$BASEDIR/Threads/ThreadPoolTest.cpp" -o "$BASEDIR/Threads/ThreadPoolTest" && "$BASEDIR/Threads/ThreadPoolTest" && rm "$BASEDIR/Threads/ThreadPoolTest"

g++ "$BASEDIR/Utils/WebUtilsTest.cpp" -o "$BASEDIR/Utils/WebUtilsTest" -lcurl && "$BASEDIR/Utils/WebUtilsTest" && rm "$BASEDIR/Utils/WebUtilsTest"