diff --git a/Stdlib/Queue/PriorityQueue.php b/Stdlib/Queue/PriorityQueue.php index 866cd1813..0ad22a04c 100644 --- a/Stdlib/Queue/PriorityQueue.php +++ b/Stdlib/Queue/PriorityQueue.php @@ -66,14 +66,14 @@ class PriorityQueue implements \Countable, \Serializable * @since 1.0.0 * @author Dennis Eichhorn */ - public function insert($data, float $priority = 0.0) : int + public function insert($data, float $priority = 1.0) : int { do { $key = rand(); } while (array_key_exists($key, $this->queue)); if ($this->count === 0) { - $this->queue[$key] = ['key' => $key, 'data' => $data, 'priority' => $priority]; + $this->queue[$key] = ['key' => $key, 'job' => $job, 'priority' => $priority]; } else { $pos = 0; foreach ($this->queue as $ele) { @@ -84,7 +84,7 @@ class PriorityQueue implements \Countable, \Serializable $pos++; } - array_splice($original, $pos, 0, [$key => ['key' => $key, 'data' => $data, 'priority' => $priority]]); + array_splice($original, $pos, 0, [$key => ['key' => $key, 'job' => $job, 'priority' => $priority]]); } return $key; @@ -100,7 +100,7 @@ class PriorityQueue implements \Countable, \Serializable * @since 1.0.0 * @author Dennis Eichhorn */ - public function increaseAll(float $increase) + public function increaseAll(float $increase = 0.1) { foreach ($this->queue as $key => &$ele) { $ele['priority'] += $increase; @@ -115,10 +115,9 @@ class PriorityQueue implements \Countable, \Serializable * @since 1.0.0 * @author Dennis Eichhorn */ - public function get() + public function pop() { - $ele = array_pop($this->queue); - $this->queue[$ele['key']] = $ele; + return array_pop($this->queue); } /** diff --git a/Utils/JobQueue/Job.php b/Utils/JobQueue/Job.php new file mode 100644 index 000000000..b97f49560 --- /dev/null +++ b/Utils/JobQueue/Job.php @@ -0,0 +1,51 @@ + + * @author Dennis Eichhorn + * @copyright 2013 Dennis Eichhorn + * @license OMS License 1.0 + * @version 1.0.0 + * @link http://orange-management.com + */ +namespace phpOMS\Utils\JobQueue; + +/** + * Array utils. + * + * @category Framework + * @package phpOMS\Utils + * @author OMS Development Team + * @author Dennis Eichhorn + * @license OMS License 1.0 + * @link http://orange-management.com + * @since 1.0.0 + */ +class Job +{ + private $priority = 0.0; + private $callback = null; + + public function __construct($callback, float $priority = 0.0) { + $this->priority = $priority; + $this->callback = $callback; + } + + public function execute() { + $this->callback(); + } + + public function getPriority() : float { + return $this->priority; + } + + public function setPriority(float $priority) { + $this->priority = $priority; + } +} \ No newline at end of file diff --git a/Utils/JobQueue/JobQueue.php b/Utils/JobQueue/JobQueue.php new file mode 100644 index 000000000..f768d939b --- /dev/null +++ b/Utils/JobQueue/JobQueue.php @@ -0,0 +1,135 @@ + + * @author Dennis Eichhorn + * @copyright 2013 Dennis Eichhorn + * @license OMS License 1.0 + * @version 1.0.0 + * @link http://orange-management.com + */ +namespace phpOMS\Utils\JobQueue; + +use \phpOMS\Stdlib\Queue\PriorityQueue; + +/** + * Array utils. + * + * @category Framework + * @package phpOMS\Utils + * @author OMS Development Team + * @author Dennis Eichhorn + * @license OMS License 1.0 + * @link http://orange-management.com + * @since 1.0.0 + */ +class JobQueue +{ + private $queue = null; + private $run = true; + private $suspended = false; + private $isTerminating = true; + private $isDeamonized; + + public function __construct() { + $this->queue = new PriorityQueue(); + } + + public function dispatch(Job $job) { + $this->queue->insert($job, $job->getPriority()); + } + + public function run() { + $this->run = true; + $this->suspended = false; + + if($this->isDeamonized) { + if($pid = pcntl_fork()) { + return $pid; + } + + $this->runAsDeamon(); + + if (posix_setsid() < 0 || $pid = pcntl_fork()) { + return; + } + } + + while($this->run) { + while(!$this->suspended) + if($this->deamonized) { + // todo: see if still unsuspended and still running (db, file etc) + } + + $job = $this->queue->pop(); + + $this->queue->increaseAll(); + $job['job']->execute(); + + if($this->isTerminating && $this->queue->count() < 1) { + $this->suspended = true; + $this->run = false; + } + + sleep(1); + } + + sleep(1); + } + } + + public function setRunning(bool $run = true) { + $this->run = $run; + $this->suspended = $run; + } + + public function isRunning() : bool { + return $this->run; + } + + public function setSuspended(bool $suspended = true) { + $this->suspended = $suspended; + } + + public function isSuspended() : bool { + return $this->suspended; + } + + public function isTerminating() : bool { + return $this->isTerminating; + } + + public function setTerminating(bool $terminating = true) { + $this->isTerminating = $terminating; + } + + public function isDeamonized() : bool { + return $this->isDeamonized; + } + + public function setDeamonized(bool $deamonized) { + $this->isDeamonized = $deamonized; + } + + private function runAsDeamon() { + ob_end_clean(); + fclose(STDIN); + fclose(STDOUT); + fclose(STDERR); + + function shutdown() { + posix_kill(posix_getpid(), SIGHUP); + } + + register_shutdown_function('shutdown'); + } + + private function savePid() { + // todo: save pid somewhere for kill + } +} \ No newline at end of file