From 7f1eab2b3c80331fdcff10f3d6b4aa2cf36c5f0a Mon Sep 17 00:00:00 2001 From: Sebastian Wassen Date: Tue, 11 Oct 2016 15:00:59 +0200 Subject: [PATCH] initial commit --- .gitignore | 1 + composer.json | 18 +++++ src/Daemon.php | 59 ++++++++++++++ src/MainThread.php | 13 ++++ src/Service.php | 59 ++++++++++++++ src/Thread.php | 187 +++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 337 insertions(+) create mode 100644 .gitignore create mode 100644 composer.json create mode 100644 src/Daemon.php create mode 100644 src/MainThread.php create mode 100644 src/Service.php create mode 100644 src/Thread.php diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..48b8bf9 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +vendor/ diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..a570d23 --- /dev/null +++ b/composer.json @@ -0,0 +1,18 @@ +{ + "name": "graphit/concurrent", + "type": "library", + "authors": [ + { + "name": "Graph-IT", + "email": "info@graph-it.com" + } + ], + "require": { + "php": ">=5.6.0" + }, + "autoload": { + "psr-4": { + "Graphit\\Concurrent\\": "src/" + } + } +} diff --git a/src/Daemon.php b/src/Daemon.php new file mode 100644 index 0000000..3bfd7a9 --- /dev/null +++ b/src/Daemon.php @@ -0,0 +1,59 @@ +isRunning()) { + return false; + } + + $fifo_file = '/tmp/thread_fifo_'.posix_getpid(); + posix_mkfifo($fifo_file, 0600); + + $starterpid = pcntl_fork(); + if ($starterpid == -1) { // Unable to start Starter-Thread + unlink($fifo_file); + } else if ($starterpid != 0) { // Pull Child-PID from Starter-Thread + + $fifo = fopen($fifo_file, 'r'); + $data = fread($fifo, 2); + fclose($fifo); + unlink($fifo_file); + + $childpid = ord($data[0]) * 256 + ord($data[1]); + if ($childpid === 256*256-1) { + return false; + } + + $this->pid = $childpid; + return true; + } else { // Execute Starter-Thread + + $childpid = pcntl_fork(); + if ($childpid == -1) { // Unable to start Child-Thread + $childpid = 256*256-1; + } + if ($childpid != 0) { // Push Child-PID from Starter-Thread + $data = chr(floor($childpid / 256)).chr($childpid % 256); + + $fifo = fopen($fifo_file, 'w'); + fwrite($fifo, $data); + fclose($fifo); + + } else { // execute Child-Thread + Thread::makeCurrent($this); + + $this->setup(); + $this->run(); + $this->teardown(); + + exit(); + } + + exit(); + } + } +} \ No newline at end of file diff --git a/src/MainThread.php b/src/MainThread.php new file mode 100644 index 0000000..e60e803 --- /dev/null +++ b/src/MainThread.php @@ -0,0 +1,13 @@ +pidfile = $pidfile; + if (file_exists($this->pidfile)) { + $this->pid = trim(file_get_contents($this->pidfile)); + } + } + + public function isRunning() + { + if ( !$this->pid){ + if (file_exists($this->pidfile)) { + $this->pid = trim(file_get_contents($this->pidfile)); + } + } + $running = parent::isRunning(); + if ( !$running) { + if (file_exists($this->pidfile)) { + @unlink($this->pidfile); + } + } + return $running; + } + + public function stop() + { + while ($this->isRunning()) { + parent::stop(); + usleep(30000); + } + } + + public function setup() + { + parent::setup(); + + if ($this->isRunning()) { + @file_put_contents($this->pidfile, $this->pid); + } + } + + public function teardown() + { + if ($this->isRunning()) { + @unlink($this->pidfile); + } + + parent::teardown(); + } +} diff --git a/src/Thread.php b/src/Thread.php new file mode 100644 index 0000000..eed9e4f --- /dev/null +++ b/src/Thread.php @@ -0,0 +1,187 @@ +pid !== posix_getpid()) { + throw new \Exception('pid mismatch!'); + } + return Thread::$current; + } + + protected static function makeCurrent(Thread $current) + { + Thread::$current = $current; + Thread::$current->pid = posix_getpid(); + + pcntl_signal(SIGTERM, array(Thread::$current, 'termHandler')); + pcntl_signal(SIGCHLD, array(Thread::$current, 'chldHandler')); + } + + protected function termHandler() + { + $this->stopped = true; + } + + protected function chldHandler() + { + while (($childpid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) { + if (isset($this->children[$childpid])) { + $child = $this->children[$childpid]; + $child->isRunning(); + + unset($this->children[$childpid]); + } + } + } + + /** + * Gibt die Prozess-Id des Threads zurück. Gibt nur den richtigen Wert zurück, + * solange $this->isRunning() === true. + * + * @return integer + */ + public function getPid() + { + return $this->pid; + } + + /** + * Gibt die noch laufenden Child-Threads zurück. Gibt nur den richtigen Wert + * zurück wenn Thread::getCurrent() === $this ist. + * + * @return \Graphit\Concurrent\Thread[] + */ + protected function getChildren() + { + if (function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } else { + declare(ticks=1) { + $true = true; + } + } + + return $this->children; + } + + public function isDaemon() + { + return $this->_daemon; + } + + public function setDaemon($daemon) + { + $this->_daemon = true == $daemon; + } + + /** + * Gibt zurück ob der Thread aufgefordert wurde sich zu beenden. Gibt nur den + * richtigen Wert zurück wenn Thread::getCurrent() === $this ist. + * + * @return boolean aufgefordert anzuhalten? + */ + public function isStopped() + { + if (function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } else { + declare(ticks=1) { + $true = true; + } + } + + return $this->stopped; + } + + /** + * Fordert den Thread auf sich zu beenden. + * + * @return void + */ + public function stop() + { + if (null !== $this->pid) { + posix_kill($this->pid, 15); + } + } + + /** + * Gibt zurück ob der Thread läuft. + * + * @return boolean + */ + public function isRunning() + { + if (null === $this->pid) { + return false; + } + $running = posix_kill($this->pid, 0); + if ( !$running) { + $this->pid = null; + } + return $running; + } + + /** + * Started den Thread + * + * @return boolean + */ + public function start() + { + if ($this->isRunning()) { + return false; + } + + $childpid = pcntl_fork(); + if ($childpid == -1) { // unable to start Child-Thread + return false; + } else if ($childpid != 0) { // register Child-Thread in the Parent-Thread + $this->pid = $childpid; + + $current = $this->getCurrent(); + $current->children[$childpid] = $this; + + return true; + } else { // execute Child-Thread + Thread::makeCurrent($this); + + $this->setup(); + $this->run(); + $this->teardown(); + + exit(); + } + } + + public function setup() { } + + abstract public function run(); + + public function teardown() { } +} + -- 2.34.1