--- /dev/null
+<?php
+
+namespace Graphit\Concurrent;
+
+abstract class Daemon extends Thread
+{
+ public function start()
+ {
+ if ($this->isRunning()) {
+ return false;
+ }
+
+ $fifo_file = '/tmp/thread_fifo_'.posix_getpid();
+ posix_mkfifo($fifo_file, 0600);
+
+ $starterpid = pcntl_fork();
+ if ($starterpid == -1) { // Unable to start Starter-Thread
+ unlink($fifo_file);
+ } else if ($starterpid != 0) { // Pull Child-PID from Starter-Thread
+
+ $fifo = fopen($fifo_file, 'r');
+ $data = fread($fifo, 2);
+ fclose($fifo);
+ unlink($fifo_file);
+
+ $childpid = ord($data[0]) * 256 + ord($data[1]);
+ if ($childpid === 256*256-1) {
+ return false;
+ }
+
+ $this->pid = $childpid;
+ return true;
+ } else { // Execute Starter-Thread
+
+ $childpid = pcntl_fork();
+ if ($childpid == -1) { // Unable to start Child-Thread
+ $childpid = 256*256-1;
+ }
+ if ($childpid != 0) { // Push Child-PID from Starter-Thread
+ $data = chr(floor($childpid / 256)).chr($childpid % 256);
+
+ $fifo = fopen($fifo_file, 'w');
+ fwrite($fifo, $data);
+ fclose($fifo);
+
+ } else { // execute Child-Thread
+ Thread::makeCurrent($this);
+
+ $this->setup();
+ $this->run();
+ $this->teardown();
+
+ exit();
+ }
+
+ exit();
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+<?php
+
+namespace Graphit\Concurrent;
+
+abstract class Service extends Daemon
+{
+ /** @var string */
+ protected $pidfile;
+
+ public function __construct($pidfile)
+ {
+ $this->pidfile = $pidfile;
+ if (file_exists($this->pidfile)) {
+ $this->pid = trim(file_get_contents($this->pidfile));
+ }
+ }
+
+ public function isRunning()
+ {
+ if ( !$this->pid){
+ if (file_exists($this->pidfile)) {
+ $this->pid = trim(file_get_contents($this->pidfile));
+ }
+ }
+ $running = parent::isRunning();
+ if ( !$running) {
+ if (file_exists($this->pidfile)) {
+ @unlink($this->pidfile);
+ }
+ }
+ return $running;
+ }
+
+ public function stop()
+ {
+ while ($this->isRunning()) {
+ parent::stop();
+ usleep(30000);
+ }
+ }
+
+ public function setup()
+ {
+ parent::setup();
+
+ if ($this->isRunning()) {
+ @file_put_contents($this->pidfile, $this->pid);
+ }
+ }
+
+ public function teardown()
+ {
+ if ($this->isRunning()) {
+ @unlink($this->pidfile);
+ }
+
+ parent::teardown();
+ }
+}
--- /dev/null
+<?php
+
+namespace Graphit\Concurrent;
+
+abstract class Thread
+{
+ /** @var integer */
+ protected $pid;
+
+ /** @var boolean */
+ protected $stopped = false;
+
+ /** @var \Graphit\Concurrent\Thread[] */
+ protected $children = array();
+
+ /** @var \Graphit\Concurrent\Thread */
+ protected static $current;
+
+ /**
+ * Gibt den aktuellen Thread zurück.
+ *
+ * @return \Graphit\Concurrent\Thread der aktuelle Thread
+ */
+ public static function getCurrent()
+ {
+ if ( !isset(Thread::$current)) {
+ Thread::makeCurrent(new MainThread);
+ }
+ if (Thread::$current->pid !== posix_getpid()) {
+ throw new \Exception('pid mismatch!');
+ }
+ return Thread::$current;
+ }
+
+ protected static function makeCurrent(Thread $current)
+ {
+ Thread::$current = $current;
+ Thread::$current->pid = posix_getpid();
+
+ pcntl_signal(SIGTERM, array(Thread::$current, 'termHandler'));
+ pcntl_signal(SIGCHLD, array(Thread::$current, 'chldHandler'));
+ }
+
+ protected function termHandler()
+ {
+ $this->stopped = true;
+ }
+
+ protected function chldHandler()
+ {
+ while (($childpid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
+ if (isset($this->children[$childpid])) {
+ $child = $this->children[$childpid];
+ $child->isRunning();
+
+ unset($this->children[$childpid]);
+ }
+ }
+ }
+
+ /**
+ * Gibt die Prozess-Id des Threads zurück. Gibt nur den richtigen Wert zurück,
+ * solange $this->isRunning() === true.
+ *
+ * @return integer
+ */
+ public function getPid()
+ {
+ return $this->pid;
+ }
+
+ /**
+ * Gibt die noch laufenden Child-Threads zurück. Gibt nur den richtigen Wert
+ * zurück wenn Thread::getCurrent() === $this ist.
+ *
+ * @return \Graphit\Concurrent\Thread[]
+ */
+ protected function getChildren()
+ {
+ if (function_exists('pcntl_signal_dispatch')) {
+ pcntl_signal_dispatch();
+ } else {
+ declare(ticks=1) {
+ $true = true;
+ }
+ }
+
+ return $this->children;
+ }
+
+ public function isDaemon()
+ {
+ return $this->_daemon;
+ }
+
+ public function setDaemon($daemon)
+ {
+ $this->_daemon = true == $daemon;
+ }
+
+ /**
+ * Gibt zurück ob der Thread aufgefordert wurde sich zu beenden. Gibt nur den
+ * richtigen Wert zurück wenn Thread::getCurrent() === $this ist.
+ *
+ * @return boolean aufgefordert anzuhalten?
+ */
+ public function isStopped()
+ {
+ if (function_exists('pcntl_signal_dispatch')) {
+ pcntl_signal_dispatch();
+ } else {
+ declare(ticks=1) {
+ $true = true;
+ }
+ }
+
+ return $this->stopped;
+ }
+
+ /**
+ * Fordert den Thread auf sich zu beenden.
+ *
+ * @return void
+ */
+ public function stop()
+ {
+ if (null !== $this->pid) {
+ posix_kill($this->pid, 15);
+ }
+ }
+
+ /**
+ * Gibt zurück ob der Thread läuft.
+ *
+ * @return boolean
+ */
+ public function isRunning()
+ {
+ if (null === $this->pid) {
+ return false;
+ }
+ $running = posix_kill($this->pid, 0);
+ if ( !$running) {
+ $this->pid = null;
+ }
+ return $running;
+ }
+
+ /**
+ * Started den Thread
+ *
+ * @return boolean
+ */
+ public function start()
+ {
+ if ($this->isRunning()) {
+ return false;
+ }
+
+ $childpid = pcntl_fork();
+ if ($childpid == -1) { // unable to start Child-Thread
+ return false;
+ } else if ($childpid != 0) { // register Child-Thread in the Parent-Thread
+ $this->pid = $childpid;
+
+ $current = $this->getCurrent();
+ $current->children[$childpid] = $this;
+
+ return true;
+ } else { // execute Child-Thread
+ Thread::makeCurrent($this);
+
+ $this->setup();
+ $this->run();
+ $this->teardown();
+
+ exit();
+ }
+ }
+
+ public function setup() { }
+
+ abstract public function run();
+
+ public function teardown() { }
+}
+