Implements JobQueue draft. Maybe some minor bugs existing and the
callback may need some reworking.
This commit is contained in:
Dennis Eichhorn 2016-07-14 20:57:41 +02:00
parent 1ad61d92fa
commit d4623539e6
3 changed files with 192 additions and 7 deletions

View File

@ -66,14 +66,14 @@ class PriorityQueue implements \Countable, \Serializable
* @since 1.0.0
* @author Dennis Eichhorn <d.eichhorn@oms.com>
*/
public function insert($data, float $priority = 0.0) : int
public function insert($data, float $priority = 1.0) : int
{
do {
$key = rand();
} while (array_key_exists($key, $this->queue));
if ($this->count === 0) {
$this->queue[$key] = ['key' => $key, 'data' => $data, 'priority' => $priority];
$this->queue[$key] = ['key' => $key, 'job' => $job, 'priority' => $priority];
} else {
$pos = 0;
foreach ($this->queue as $ele) {
@ -84,7 +84,7 @@ class PriorityQueue implements \Countable, \Serializable
$pos++;
}
array_splice($original, $pos, 0, [$key => ['key' => $key, 'data' => $data, 'priority' => $priority]]);
array_splice($original, $pos, 0, [$key => ['key' => $key, 'job' => $job, 'priority' => $priority]]);
}
return $key;
@ -100,7 +100,7 @@ class PriorityQueue implements \Countable, \Serializable
* @since 1.0.0
* @author Dennis Eichhorn <d.eichhorn@oms.com>
*/
public function increaseAll(float $increase)
public function increaseAll(float $increase = 0.1)
{
foreach ($this->queue as $key => &$ele) {
$ele['priority'] += $increase;
@ -115,10 +115,9 @@ class PriorityQueue implements \Countable, \Serializable
* @since 1.0.0
* @author Dennis Eichhorn <d.eichhorn@oms.com>
*/
public function get()
public function pop()
{
$ele = array_pop($this->queue);
$this->queue[$ele['key']] = $ele;
return array_pop($this->queue);
}
/**

51
Utils/JobQueue/Job.php Normal file
View File

@ -0,0 +1,51 @@
<?php
/**
* Orange Management
*
* PHP Version 7.0
*
* @category TBD
}
* @package TBD
* @author OMS Development Team <dev@oms.com>
* @author Dennis Eichhorn <d.eichhorn@oms.com>
* @copyright 2013 Dennis Eichhorn
* @license OMS License 1.0
* @version 1.0.0
* @link http://orange-management.com
*/
namespace phpOMS\Utils\JobQueue;
/**
* Array utils.
*
* @category Framework
* @package phpOMS\Utils
* @author OMS Development Team <dev@oms.com>
* @author Dennis Eichhorn <d.eichhorn@oms.com>
* @license OMS License 1.0
* @link http://orange-management.com
* @since 1.0.0
*/
class Job
{
private $priority = 0.0;
private $callback = null;
public function __construct($callback, float $priority = 0.0) {
$this->priority = $priority;
$this->callback = $callback;
}
public function execute() {
$this->callback();
}
public function getPriority() : float {
return $this->priority;
}
public function setPriority(float $priority) {
$this->priority = $priority;
}
}

135
Utils/JobQueue/JobQueue.php Normal file
View File

@ -0,0 +1,135 @@
<?php
/**
* Orange Management
*
* PHP Version 7.0
*
* @category TBD
* @package TBD
* @author OMS Development Team <dev@oms.com>
* @author Dennis Eichhorn <d.eichhorn@oms.com>
* @copyright 2013 Dennis Eichhorn
* @license OMS License 1.0
* @version 1.0.0
* @link http://orange-management.com
*/
namespace phpOMS\Utils\JobQueue;
use \phpOMS\Stdlib\Queue\PriorityQueue;
/**
* Array utils.
*
* @category Framework
* @package phpOMS\Utils
* @author OMS Development Team <dev@oms.com>
* @author Dennis Eichhorn <d.eichhorn@oms.com>
* @license OMS License 1.0
* @link http://orange-management.com
* @since 1.0.0
*/
class JobQueue
{
private $queue = null;
private $run = true;
private $suspended = false;
private $isTerminating = true;
private $isDeamonized;
public function __construct() {
$this->queue = new PriorityQueue();
}
public function dispatch(Job $job) {
$this->queue->insert($job, $job->getPriority());
}
public function run() {
$this->run = true;
$this->suspended = false;
if($this->isDeamonized) {
if($pid = pcntl_fork()) {
return $pid;
}
$this->runAsDeamon();
if (posix_setsid() < 0 || $pid = pcntl_fork()) {
return;
}
}
while($this->run) {
while(!$this->suspended)
if($this->deamonized) {
// todo: see if still unsuspended and still running (db, file etc)
}
$job = $this->queue->pop();
$this->queue->increaseAll();
$job['job']->execute();
if($this->isTerminating && $this->queue->count() < 1) {
$this->suspended = true;
$this->run = false;
}
sleep(1);
}
sleep(1);
}
}
public function setRunning(bool $run = true) {
$this->run = $run;
$this->suspended = $run;
}
public function isRunning() : bool {
return $this->run;
}
public function setSuspended(bool $suspended = true) {
$this->suspended = $suspended;
}
public function isSuspended() : bool {
return $this->suspended;
}
public function isTerminating() : bool {
return $this->isTerminating;
}
public function setTerminating(bool $terminating = true) {
$this->isTerminating = $terminating;
}
public function isDeamonized() : bool {
return $this->isDeamonized;
}
public function setDeamonized(bool $deamonized) {
$this->isDeamonized = $deamonized;
}
private function runAsDeamon() {
ob_end_clean();
fclose(STDIN);
fclose(STDOUT);
fclose(STDERR);
function shutdown() {
posix_kill(posix_getpid(), SIGHUP);
}
register_shutdown_function('shutdown');
}
private function savePid() {
// todo: save pid somewhere for kill
}
}