initial commit v0.0.1
authorSebastian Wassen <sebastian.wassen@graph-it.org>
Tue, 11 Oct 2016 13:00:59 +0000 (15:00 +0200)
committerSebastian Wassen <sebastian.wassen@graph-it.org>
Tue, 11 Oct 2016 13:00:59 +0000 (15:00 +0200)
.gitignore [new file with mode: 0644]
composer.json [new file with mode: 0644]
src/Daemon.php [new file with mode: 0644]
src/MainThread.php [new file with mode: 0644]
src/Service.php [new file with mode: 0644]
src/Thread.php [new file with mode: 0644]

diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..48b8bf9
--- /dev/null
@@ -0,0 +1 @@
+vendor/
diff --git a/composer.json b/composer.json
new file mode 100644 (file)
index 0000000..a570d23
--- /dev/null
@@ -0,0 +1,18 @@
+{
+    "name": "graphit/concurrent",
+    "type": "library",
+    "authors": [
+        {
+            "name": "Graph-IT",
+            "email": "info@graph-it.com"
+        }
+    ],
+    "require": {
+        "php": ">=5.6.0"
+    },
+    "autoload": {
+        "psr-4": {
+            "Graphit\\Concurrent\\": "src/"
+        }
+    }
+}
diff --git a/src/Daemon.php b/src/Daemon.php
new file mode 100644 (file)
index 0000000..3bfd7a9
--- /dev/null
@@ -0,0 +1,59 @@
+<?php
+
+namespace Graphit\Concurrent;
+
+abstract class Daemon extends Thread
+{
+  public function start()
+  {
+    if ($this->isRunning()) {
+      return false;
+    }
+
+    $fifo_file = '/tmp/thread_fifo_'.posix_getpid();
+    posix_mkfifo($fifo_file, 0600);
+
+    $starterpid = pcntl_fork();
+    if ($starterpid == -1) { // Unable to start Starter-Thread
+      unlink($fifo_file);
+    } else if ($starterpid != 0) { // Pull Child-PID from Starter-Thread
+
+      $fifo = fopen($fifo_file, 'r');
+      $data = fread($fifo, 2);
+      fclose($fifo);
+      unlink($fifo_file);
+
+      $childpid = ord($data[0]) * 256 + ord($data[1]);
+      if ($childpid === 256*256-1) {
+        return false;
+      }
+
+      $this->pid = $childpid;
+      return true;
+    } else { // Execute Starter-Thread
+
+      $childpid = pcntl_fork();
+      if ($childpid == -1) { // Unable to start Child-Thread
+        $childpid = 256*256-1;
+      }
+      if ($childpid != 0) { // Push Child-PID from Starter-Thread
+        $data = chr(floor($childpid / 256)).chr($childpid % 256);
+
+        $fifo = fopen($fifo_file, 'w');
+        fwrite($fifo, $data);
+        fclose($fifo);
+
+      } else { // execute Child-Thread
+        Thread::makeCurrent($this);
+
+        $this->setup();
+        $this->run();
+        $this->teardown();
+
+        exit();
+      }
+
+      exit();
+    }
+  }
+}
\ No newline at end of file
diff --git a/src/MainThread.php b/src/MainThread.php
new file mode 100644 (file)
index 0000000..e60e803
--- /dev/null
@@ -0,0 +1,13 @@
+<?php
+
+namespace Graphit\Concurrent;
+
+class MainThread extends Thread
+{
+  public function getChildren()
+  {
+    return parent::getChildren();
+  }
+
+  abstract function run();
+}
\ No newline at end of file
diff --git a/src/Service.php b/src/Service.php
new file mode 100644 (file)
index 0000000..5f849cd
--- /dev/null
@@ -0,0 +1,59 @@
+<?php
+
+namespace Graphit\Concurrent;
+
+abstract class Service extends Daemon
+{
+  /** @var string */
+  protected $pidfile;
+
+  public function __construct($pidfile)
+  {
+    $this->pidfile = $pidfile;
+    if (file_exists($this->pidfile)) {
+      $this->pid = trim(file_get_contents($this->pidfile));
+    }
+  }
+
+  public function isRunning()
+  {
+    if ( !$this->pid){
+      if (file_exists($this->pidfile)) {
+        $this->pid = trim(file_get_contents($this->pidfile));
+      }
+    }
+    $running = parent::isRunning();
+    if ( !$running) {
+      if (file_exists($this->pidfile)) {
+        @unlink($this->pidfile);
+      }
+    }
+    return $running;
+  }
+
+  public function stop()
+  {
+    while ($this->isRunning()) {
+      parent::stop();
+      usleep(30000);
+    }
+  }
+
+  public function setup()
+  {
+    parent::setup();
+
+    if ($this->isRunning()) {
+      @file_put_contents($this->pidfile, $this->pid);
+    }
+  }
+
+  public function teardown()
+  {
+    if ($this->isRunning()) {
+      @unlink($this->pidfile);
+    }
+
+    parent::teardown();
+  }
+}
diff --git a/src/Thread.php b/src/Thread.php
new file mode 100644 (file)
index 0000000..eed9e4f
--- /dev/null
@@ -0,0 +1,187 @@
+<?php
+
+namespace Graphit\Concurrent;
+
+abstract class Thread
+{
+  /** @var integer */
+  protected $pid;
+
+  /** @var boolean */
+  protected $stopped = false;
+
+  /** @var \Graphit\Concurrent\Thread[] */
+  protected $children = array();
+
+  /** @var \Graphit\Concurrent\Thread */
+  protected static $current;
+
+  /**
+   * Gibt den aktuellen Thread zurück.
+   *
+   * @return \Graphit\Concurrent\Thread der aktuelle Thread
+   */
+  public static function getCurrent()
+  {
+    if ( !isset(Thread::$current)) {
+      Thread::makeCurrent(new MainThread);
+    }
+    if (Thread::$current->pid !== posix_getpid()) {
+      throw new \Exception('pid mismatch!');
+    }
+    return Thread::$current;
+  }
+
+  protected static function makeCurrent(Thread $current)
+  {
+    Thread::$current = $current;
+    Thread::$current->pid = posix_getpid();
+
+    pcntl_signal(SIGTERM, array(Thread::$current, 'termHandler'));
+    pcntl_signal(SIGCHLD, array(Thread::$current, 'chldHandler'));
+  }
+
+  protected function termHandler()
+  {
+    $this->stopped = true;
+  }
+
+  protected function chldHandler()
+  {
+    while (($childpid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
+      if (isset($this->children[$childpid])) {
+        $child = $this->children[$childpid];
+        $child->isRunning();
+
+        unset($this->children[$childpid]);
+      }
+    }
+  }
+
+  /**
+   * Gibt die Prozess-Id des Threads zurück. Gibt nur den richtigen Wert zurück,
+   * solange $this->isRunning() === true.
+   *
+   * @return integer
+   */
+  public function getPid()
+  {
+    return $this->pid;
+  }
+
+  /**
+   * Gibt die noch laufenden Child-Threads zurück. Gibt nur den richtigen Wert
+   * zurück wenn Thread::getCurrent() === $this ist.
+   *
+   * @return \Graphit\Concurrent\Thread[]
+   */
+  protected function getChildren()
+  {
+    if (function_exists('pcntl_signal_dispatch')) {
+      pcntl_signal_dispatch();
+    } else {
+      declare(ticks=1) {
+        $true = true;
+      }
+    }
+
+    return $this->children;
+  }
+
+  public function isDaemon()
+  {
+    return $this->_daemon;
+  }
+
+  public function setDaemon($daemon)
+  {
+    $this->_daemon = true == $daemon;
+  }
+
+  /**
+   * Gibt zurück ob der Thread aufgefordert wurde sich zu beenden. Gibt nur den
+   * richtigen Wert zurück wenn Thread::getCurrent() === $this ist.
+   *
+   * @return boolean aufgefordert anzuhalten?
+   */
+  public function isStopped()
+  {
+    if (function_exists('pcntl_signal_dispatch')) {
+      pcntl_signal_dispatch();
+    } else {
+      declare(ticks=1) {
+        $true = true;
+      }
+    }
+
+    return $this->stopped;
+  }
+
+  /**
+   * Fordert den Thread auf sich zu beenden.
+   *
+   * @return void
+   */
+  public function stop()
+  {
+    if (null !== $this->pid) {
+      posix_kill($this->pid, 15);
+    }
+  }
+
+  /**
+   * Gibt zurück ob der Thread läuft.
+   *
+   * @return boolean
+   */
+  public function isRunning()
+  {
+    if (null === $this->pid) {
+      return false;
+    }
+    $running = posix_kill($this->pid, 0);
+    if ( !$running) {
+      $this->pid = null;
+    }
+    return $running;
+  }
+
+  /**
+   * Started den Thread
+   *
+   * @return boolean
+   */
+  public function start()
+  {
+    if ($this->isRunning()) {
+      return false;
+    }
+
+    $childpid = pcntl_fork();
+    if ($childpid == -1) { // unable to start Child-Thread
+      return false;
+    } else if ($childpid != 0) { // register Child-Thread in the Parent-Thread
+      $this->pid = $childpid;
+
+      $current = $this->getCurrent();
+      $current->children[$childpid] = $this;
+
+      return true;
+    } else { // execute Child-Thread
+      Thread::makeCurrent($this);
+
+      $this->setup();
+      $this->run();
+      $this->teardown();
+
+      exit();
+    }
+  }
+
+  public function setup() { }
+
+  abstract public function run();
+
+  public function teardown() { }
+}
+