- Added stream_set_blocking for non-blocking mode. v0.1.2
authorOe.Salim Duran <salim.duran@graph-it.com>
Thu, 9 Mar 2023 11:35:14 +0000 (12:35 +0100)
committerOe.Salim Duran <salim.duran@graph-it.com>
Thu, 9 Mar 2023 11:35:14 +0000 (12:35 +0100)
- Added stream_select for _readBytes and _writeBytes methods.

psalm.xml
src/Connection.php

index 32408861d98fb019f6e5662eb88af56e5a20fa21..8e3aa414eadde435075a5a917dd87463ef48400e 100644 (file)
--- a/psalm.xml
+++ b/psalm.xml
@@ -1,10 +1,12 @@
 <?xml version="1.0"?>
 <psalm
-    errorLevel="1"
-    resolveFromConfigFile="true"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xmlns="https://getpsalm.org/schema/config"
-    xsi:schemaLocation="https://getpsalm.org/schema/config vendor/vimeo/psalm/config.xsd"
+  errorLevel="1"
+  findUnusedCode="false"
+  findUnusedBaselineEntry="true"
+  resolveFromConfigFile="true"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xmlns="https://getpsalm.org/schema/config"
+  xsi:schemaLocation="https://getpsalm.org/schema/config vendor/vimeo/psalm/config.xsd"
 >
     <projectFiles>
         <directory name="src" />
index 608f508a8c2b592cbf21535376565b117fa4a3f9..8fd7a908683ece5e80ba1b61373de5a0d08dd33b 100644 (file)
@@ -2,9 +2,12 @@
 
 namespace Graphit\Graph\Client;
 
+use Graphit\Graph\Common\RemoteConnection;
 use Graphit\Graph\Common\ConnectionInterface;
 
 class Connection extends RemoteConnection implements ConnectionInterface {
+  const STREAM_SELECT_TIMEOUT = 2;
 
   /** @var string */
   private $url;
@@ -75,8 +78,13 @@ class Connection extends RemoteConnection implements ConnectionInterface {
     $timeout = (float)ini_get('default_socket_timeout');
     $this->socket = stream_socket_client($this->url, $errno, $errstr, $timeout, STREAM_CLIENT_CONNECT, $context);
     if ($this->socket === false) {
-      throw new \Exception("stream_socket_server() failed: $errstr\n");
+      throw new \Exception("stream_socket_server() failed: $errstr");
     }
+    
+    if ( !stream_set_blocking($this->socket, false)) {
+      throw new \Exception("stream_set_blocking(false) failed!");
+    }
+
     $this->_readMessage();
   }
 
@@ -91,15 +99,27 @@ class Connection extends RemoteConnection implements ConnectionInterface {
 
     $bytes = '';
     while (strlen($bytes) < $size) {
-      $fread = fread($this->socket, $size - strlen($bytes));
-      if ($fread === false) {
-        break;
+
+      $rs = [$this->socket]; 
+      $ws = NULL; 
+      $es = NULL;
+      if (@stream_select($rs, $ws, $es, self::STREAM_SELECT_TIMEOUT)) {
+        if (feof($this->socket)) {
+          break;
+        }
+
+        $data = fread($this->socket, $size - strlen($bytes));
+        if ($data === false) {
+          break;
+        }
+        $bytes .= $data;
       }
-      $bytes.= $fread;
     }
+    
     if (strlen($bytes) != $size) {
       throw new \Exception("Unable to read $size Bytes from the socket");
     }
+
     return $bytes;
   }
 
@@ -124,11 +144,22 @@ class Connection extends RemoteConnection implements ConnectionInterface {
 
     $size = strlen($bytes);
     for ($written = 0; $written < strlen($bytes); $written += $fwrite) {
+      $rs = null;
+      $ws = [$this->socket];
+      $es = null;
+      if (@stream_select($rs, $ws, $es, self::STREAM_SELECT_TIMEOUT) === false) {
+        break;
+      }
+      if (feof($this->socket)) {
+        break;
+      }
+      
       $fwrite = fwrite($this->socket, substr($bytes, $written));
       if ($fwrite === false) {
         break;
       }
     }
+
     if ($written != $size) {
       throw new \Exception("Unable to write $size Bytes to socket");
     }