From 15f2de3c39b0a9c344ad07fa3c004a30280ecad6 Mon Sep 17 00:00:00 2001 From: "Oe.Salim Duran" Date: Thu, 9 Mar 2023 12:35:14 +0100 Subject: [PATCH] - Added stream_set_blocking for non-blocking mode. - Added stream_select for _readBytes and _writeBytes methods. --- psalm.xml | 12 +++++++----- src/Connection.php | 41 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 43 insertions(+), 10 deletions(-) diff --git a/psalm.xml b/psalm.xml index 3240886..8e3aa41 100644 --- a/psalm.xml +++ b/psalm.xml @@ -1,10 +1,12 @@ diff --git a/src/Connection.php b/src/Connection.php index 608f508..8fd7a90 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -2,9 +2,12 @@ namespace Graphit\Graph\Client; +use Graphit\Graph\Common\RemoteConnection; use Graphit\Graph\Common\ConnectionInterface; class Connection extends RemoteConnection implements ConnectionInterface { + + const STREAM_SELECT_TIMEOUT = 2; /** @var string */ private $url; @@ -75,8 +78,13 @@ class Connection extends RemoteConnection implements ConnectionInterface { $timeout = (float)ini_get('default_socket_timeout'); $this->socket = stream_socket_client($this->url, $errno, $errstr, $timeout, STREAM_CLIENT_CONNECT, $context); if ($this->socket === false) { - throw new \Exception("stream_socket_server() failed: $errstr\n"); + throw new \Exception("stream_socket_server() failed: $errstr"); } + + if ( !stream_set_blocking($this->socket, false)) { + throw new \Exception("stream_set_blocking(false) failed!"); + } + $this->_readMessage(); } @@ -91,15 +99,27 @@ class Connection extends RemoteConnection implements ConnectionInterface { $bytes = ''; while (strlen($bytes) < $size) { - $fread = fread($this->socket, $size - strlen($bytes)); - if ($fread === false) { - break; + + $rs = [$this->socket]; + $ws = NULL; + $es = NULL; + if (@stream_select($rs, $ws, $es, self::STREAM_SELECT_TIMEOUT)) { + if (feof($this->socket)) { + break; + } + + $data = fread($this->socket, $size - strlen($bytes)); + if ($data === false) { + break; + } + $bytes .= $data; } - $bytes.= $fread; } + if (strlen($bytes) != $size) { throw new \Exception("Unable to read $size Bytes from the socket"); } + return $bytes; } @@ -124,11 +144,22 @@ class Connection extends RemoteConnection implements ConnectionInterface { $size = strlen($bytes); for ($written = 0; $written < strlen($bytes); $written += $fwrite) { + $rs = null; + $ws = [$this->socket]; + $es = null; + if (@stream_select($rs, $ws, $es, self::STREAM_SELECT_TIMEOUT) === false) { + break; + } + if (feof($this->socket)) { + break; + } + $fwrite = fwrite($this->socket, substr($bytes, $written)); if ($fwrite === false) { break; } } + if ($written != $size) { throw new \Exception("Unable to write $size Bytes to socket"); } -- 2.34.1