<?xml version="1.0"?>
<psalm
- errorLevel="1"
- resolveFromConfigFile="true"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="https://getpsalm.org/schema/config"
- xsi:schemaLocation="https://getpsalm.org/schema/config vendor/vimeo/psalm/config.xsd"
+ errorLevel="1"
+ findUnusedCode="false"
+ findUnusedBaselineEntry="true"
+ resolveFromConfigFile="true"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="https://getpsalm.org/schema/config"
+ xsi:schemaLocation="https://getpsalm.org/schema/config vendor/vimeo/psalm/config.xsd"
>
<projectFiles>
<directory name="src" />
namespace Graphit\Graph\Client;
+use Graphit\Graph\Common\RemoteConnection;
use Graphit\Graph\Common\ConnectionInterface;
class Connection extends RemoteConnection implements ConnectionInterface {
+
+ const STREAM_SELECT_TIMEOUT = 2;
/** @var string */
private $url;
$timeout = (float)ini_get('default_socket_timeout');
$this->socket = stream_socket_client($this->url, $errno, $errstr, $timeout, STREAM_CLIENT_CONNECT, $context);
if ($this->socket === false) {
- throw new \Exception("stream_socket_server() failed: $errstr\n");
+ throw new \Exception("stream_socket_server() failed: $errstr");
}
+
+ if ( !stream_set_blocking($this->socket, false)) {
+ throw new \Exception("stream_set_blocking(false) failed!");
+ }
+
$this->_readMessage();
}
$bytes = '';
while (strlen($bytes) < $size) {
- $fread = fread($this->socket, $size - strlen($bytes));
- if ($fread === false) {
- break;
+
+ $rs = [$this->socket];
+ $ws = NULL;
+ $es = NULL;
+ if (@stream_select($rs, $ws, $es, self::STREAM_SELECT_TIMEOUT)) {
+ if (feof($this->socket)) {
+ break;
+ }
+
+ $data = fread($this->socket, $size - strlen($bytes));
+ if ($data === false) {
+ break;
+ }
+ $bytes .= $data;
}
- $bytes.= $fread;
}
+
if (strlen($bytes) != $size) {
throw new \Exception("Unable to read $size Bytes from the socket");
}
+
return $bytes;
}
$size = strlen($bytes);
for ($written = 0; $written < strlen($bytes); $written += $fwrite) {
+ $rs = null;
+ $ws = [$this->socket];
+ $es = null;
+ if (@stream_select($rs, $ws, $es, self::STREAM_SELECT_TIMEOUT) === false) {
+ break;
+ }
+ if (feof($this->socket)) {
+ break;
+ }
+
$fwrite = fwrite($this->socket, substr($bytes, $written));
if ($fwrite === false) {
break;
}
}
+
if ($written != $size) {
throw new \Exception("Unable to write $size Bytes to socket");
}