Improve debugging and code optimisations for Socket operations
This commit is contained in:
parent
ad36da0bb1
commit
f9f9fb5345
@ -29,36 +29,27 @@ final class SocketClient {
|
|||||||
private array $session = [];
|
private array $session = [];
|
||||||
|
|
||||||
private const OK = 0;
|
private const OK = 0;
|
||||||
private const EOF = -1;
|
|
||||||
private const TIMEOUT = -2;
|
private const TIMEOUT = -2;
|
||||||
private const RCDO = -3;
|
|
||||||
private const GCOUNT = -4;
|
|
||||||
private const ERROR = -5;
|
private const ERROR = -5;
|
||||||
|
|
||||||
private const TTY_SUCCESS = self::OK;
|
/** @var string Size of our TX buffer */
|
||||||
private const TTY_TIMEOUT = self::TIMEOUT;
|
private const TX_BUF_SIZE = 0xFFFF;
|
||||||
private const TTY_HANGUP = self::RCDO;
|
/** @var string Maximum amount of data to send at a time */
|
||||||
private const TTY_ERROR = self::ERROR;
|
private const TX_SIZE = 0xFFFF;
|
||||||
|
/** @var string Data in the TX buffer */
|
||||||
public const TCP_SPEED = 115200;
|
|
||||||
|
|
||||||
// Buffer for sending
|
|
||||||
private const TX_BUF_SIZE = (0x8100);
|
|
||||||
private int $tx_ptr = 0;
|
|
||||||
private int $tx_free = self::TX_BUF_SIZE;
|
|
||||||
private int $tty_status = 0;
|
|
||||||
private string $tx_buf = '';
|
private string $tx_buf = '';
|
||||||
|
|
||||||
// Buffer for receiving
|
/** @var string Size of our RX buffer */
|
||||||
private const RX_BUF_SIZE = (0x8100);
|
private const RX_BUF_SIZE = 0xFFFF;
|
||||||
private int $rx_ptr = 0;
|
/** @var string Maximum amount of data to received at a time */
|
||||||
private int $rx_left = 0;
|
private const RX_SIZE = 0xFFFF;
|
||||||
|
/** @var string Data in the RX buffer */
|
||||||
private string $rx_buf = '';
|
private string $rx_buf = '';
|
||||||
|
|
||||||
public function __construct (\Socket $connection) {
|
public function __construct (\Socket $connection) {
|
||||||
$this->connection = $connection;
|
$this->connection = $connection;
|
||||||
|
|
||||||
if ($this->type === 'TCP') {
|
if ($this->type === SOCK_STREAM) {
|
||||||
socket_getsockname($connection,$this->address_local,$this->port_local);
|
socket_getsockname($connection,$this->address_local,$this->port_local);
|
||||||
socket_getpeername($connection,$this->address_remote,$this->port_remote);
|
socket_getpeername($connection,$this->address_remote,$this->port_remote);
|
||||||
Log::info(sprintf('%s:+ Connection host [%s] on port [%d] (%s)',self::LOGKEY,$this->address_remote,$this->port_remote,$this->type));
|
Log::info(sprintf('%s:+ Connection host [%s] on port [%d] (%s)',self::LOGKEY,$this->address_remote,$this->port_remote,$this->type));
|
||||||
@ -75,16 +66,17 @@ final class SocketClient {
|
|||||||
case 'speed':
|
case 'speed':
|
||||||
return Arr::get($this->session,$key);
|
return Arr::get($this->session,$key);
|
||||||
|
|
||||||
case 'type':
|
case 'rx_free':
|
||||||
switch ($x=socket_get_option($this->connection,SOL_SOCKET,SO_TYPE)) {
|
return self::RX_BUF_SIZE-$this->rx_left;
|
||||||
case SOCK_STREAM:
|
|
||||||
return 'TCP';
|
|
||||||
case SOCK_DGRAM:
|
|
||||||
return 'UDP';
|
|
||||||
|
|
||||||
default:
|
case 'rx_left':
|
||||||
return sprintf('UNKNOWN [%d]',$x);
|
return strlen($this->rx_buf);
|
||||||
}
|
|
||||||
|
case 'tx_free':
|
||||||
|
return self::TX_BUF_SIZE-strlen($this->tx_buf);
|
||||||
|
|
||||||
|
case 'type':
|
||||||
|
return socket_get_option($this->connection,SOL_SOCKET,SO_TYPE);
|
||||||
|
|
||||||
default:
|
default:
|
||||||
throw new \Exception(sprintf('%s:! Unknown key [%s]:',self::LOGKEY,$key));
|
throw new \Exception(sprintf('%s:! Unknown key [%s]:',self::LOGKEY,$key));
|
||||||
@ -102,150 +94,6 @@ final class SocketClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* We'll add to our transmit buffer and if doesnt have space, we'll empty it first
|
|
||||||
*
|
|
||||||
* @param string $data
|
|
||||||
* @return void
|
|
||||||
* @throws \Exception
|
|
||||||
*/
|
|
||||||
public function buffer_add(string $data): void
|
|
||||||
{
|
|
||||||
if ($this->DEBUG)
|
|
||||||
Log::debug(sprintf('%s:+ Start [%s] (%d)',self::LOGKEY,$data,strlen($data)));
|
|
||||||
|
|
||||||
//$rc = self::OK;
|
|
||||||
//$tx_ptr = self::TX_BUF_SIZE-$this->tx_free;
|
|
||||||
$ptr = 0;
|
|
||||||
$num_bytes = strlen($data);
|
|
||||||
$this->tty_status = self::TTY_SUCCESS;
|
|
||||||
|
|
||||||
while ($num_bytes) {
|
|
||||||
if ($this->DEBUG)
|
|
||||||
Log::debug(sprintf('%s: - Num Bytes [%d]: TX Free [%d]',self::LOGKEY,$num_bytes,$this->tx_free));
|
|
||||||
|
|
||||||
if ($num_bytes > $this->tx_free) {
|
|
||||||
do {
|
|
||||||
$this->buffer_flush(5);
|
|
||||||
|
|
||||||
if ($this->tty_status === self::TTY_SUCCESS) {
|
|
||||||
$n = min($this->tx_free,$num_bytes);
|
|
||||||
$this->tx_buf = substr($data,$ptr,$n);
|
|
||||||
$this->tx_free -= $n;
|
|
||||||
$num_bytes -= $n;
|
|
||||||
$ptr += $n;
|
|
||||||
}
|
|
||||||
|
|
||||||
} while ($this->tty_status != self::TTY_SUCCESS);
|
|
||||||
|
|
||||||
} else {
|
|
||||||
if ($this->DEBUG)
|
|
||||||
Log::debug(sprintf('%s: - Remaining data to send [%d]',self::LOGKEY,$num_bytes));
|
|
||||||
|
|
||||||
$this->tx_buf .= substr($data,$ptr,$num_bytes);
|
|
||||||
$this->tx_free -= $num_bytes;
|
|
||||||
$num_bytes = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($this->DEBUG)
|
|
||||||
Log::debug(sprintf('%s:= End [%s]',self::LOGKEY,strlen($this->tx_buf)));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Clear our TX buffer
|
|
||||||
*/
|
|
||||||
public function buffer_clear(): void
|
|
||||||
{
|
|
||||||
$this->tx_buf = '';
|
|
||||||
$this->tx_free = self::TX_BUF_SIZE;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Empty our TX buffer
|
|
||||||
*
|
|
||||||
* @param int $timeout
|
|
||||||
* @return int
|
|
||||||
* @throws \Exception
|
|
||||||
*/
|
|
||||||
public function buffer_flush(int $timeout): int
|
|
||||||
{
|
|
||||||
if ($this->DEBUG)
|
|
||||||
Log::debug(sprintf('%s:+ Start [%d]',self::LOGKEY,$timeout));
|
|
||||||
|
|
||||||
$rc = self::OK;
|
|
||||||
$tx_ptr = 0;
|
|
||||||
$restsize = self::TX_BUF_SIZE-$this->tx_free;
|
|
||||||
|
|
||||||
$tm = $this->timer_set($timeout);
|
|
||||||
while (self::TX_BUF_SIZE != $this->tx_free) {
|
|
||||||
$tv = $this->timer_rest($tm);
|
|
||||||
|
|
||||||
if ($rc = $this->canSend($tv)>0) {
|
|
||||||
if ($this->DEBUG)
|
|
||||||
Log::debug(sprintf('%s: - Sending [%d]',self::LOGKEY,$restsize));
|
|
||||||
$rc = $this->send(substr($this->tx_buf,$tx_ptr,$restsize),0);
|
|
||||||
|
|
||||||
if ($this->DEBUG)
|
|
||||||
Log::debug(sprintf('%s: - Sent [%d] (%s)',self::LOGKEY,$rc,Str::limit($this->tx_buf,15)));
|
|
||||||
|
|
||||||
if ($rc === $restsize) {
|
|
||||||
$this->tx_buf = '';
|
|
||||||
$tx_ptr = 0;
|
|
||||||
$this->tx_free += $rc;
|
|
||||||
$this->buffer_clear();
|
|
||||||
|
|
||||||
} else if ($rc > 0) {
|
|
||||||
$tx_ptr += $rc;
|
|
||||||
$restsize -= $rc;
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
return $rc;
|
|
||||||
}
|
|
||||||
|
|
||||||
// @todo Enable a delay for slow clients
|
|
||||||
//sleep(1);
|
|
||||||
if ($this->timer_expired($tm))
|
|
||||||
return self::ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($this->DEBUG)
|
|
||||||
Log::debug(sprintf('%s:= End [%d]',self::LOGKEY,$rc));
|
|
||||||
|
|
||||||
return $rc;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param int $timeout
|
|
||||||
* @return int
|
|
||||||
* @throws \Exception
|
|
||||||
*/
|
|
||||||
public function canSend(int $timeout): int
|
|
||||||
{
|
|
||||||
$write = [$this->connection];
|
|
||||||
|
|
||||||
return $this->socketSelect(NULL,$write,NULL,$timeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Close the connection with the client
|
|
||||||
*/
|
|
||||||
public function close(): void
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
socket_shutdown($this->connection);
|
|
||||||
} catch (\ErrorException $e) {
|
|
||||||
Log::error(sprintf('%s:+ Shutting down socket [%s]',self::LOGKEY,$e->getMessage()));
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
socket_close($this->connection);
|
|
||||||
} catch (\ErrorException $e) {
|
|
||||||
Log::error(sprintf('%s:+ Closing socket [%s]',self::LOGKEY,$e->getMessage()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a client socket
|
* Create a client socket
|
||||||
*
|
*
|
||||||
@ -256,7 +104,8 @@ final class SocketClient {
|
|||||||
*/
|
*/
|
||||||
public static function create(string $address,int $port): self
|
public static function create(string $address,int $port): self
|
||||||
{
|
{
|
||||||
Log::debug(sprintf('%s:+ Creating connection to [%s:%d]',self::LOGKEY,$address,$port));
|
Log::info(sprintf('%s:+ Creating connection to [%s:%d]',self::LOGKEY,$address,$port));
|
||||||
|
|
||||||
$sort = collect(['AAAA','A']);
|
$sort = collect(['AAAA','A']);
|
||||||
|
|
||||||
// We only look at AAAA/A records
|
// We only look at AAAA/A records
|
||||||
@ -275,7 +124,7 @@ final class SocketClient {
|
|||||||
if (! $try)
|
if (! $try)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
Log::alert(sprintf('%s: - Trying [%s:%d]',self::LOGKEY,$try,$port));
|
Log::info(sprintf('%s:- Trying [%s:%d]',self::LOGKEY,$try,$port));
|
||||||
|
|
||||||
/* Create a TCP/IP socket. */
|
/* Create a TCP/IP socket. */
|
||||||
$socket = socket_create(Arr::get($address,'type') === 'AAAA' ? AF_INET6 : AF_INET,SOCK_STREAM,SOL_TCP);
|
$socket = socket_create(Arr::get($address,'type') === 'AAAA' ? AF_INET6 : AF_INET,SOCK_STREAM,SOL_TCP);
|
||||||
@ -301,9 +150,126 @@ final class SocketClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* We'll add to our transmit buffer and if doesnt have space, we'll empty it first
|
||||||
|
*
|
||||||
|
* @param string $data
|
||||||
|
* @return void
|
||||||
|
* @throws \Exception
|
||||||
|
*/
|
||||||
|
public function buffer_add(string $data): void
|
||||||
|
{
|
||||||
|
$ptr = 0;
|
||||||
|
$num_bytes = strlen($data);
|
||||||
|
|
||||||
|
while ($num_bytes) {
|
||||||
|
if ($this->DEBUG)
|
||||||
|
Log::debug(sprintf('%s:- To add [%d] to the TX buffer',self::LOGKEY,$num_bytes));
|
||||||
|
|
||||||
|
if ($num_bytes > $this->tx_free) {
|
||||||
|
if ($this->DEBUG)
|
||||||
|
Log::debug(sprintf('%s:- TX buffer will be too full, draining...',self::LOGKEY));
|
||||||
|
|
||||||
|
do {
|
||||||
|
$this->buffer_flush(5);
|
||||||
|
|
||||||
|
$n = min($this->tx_free,$num_bytes);
|
||||||
|
$this->tx_buf = substr($data,$ptr,$n);
|
||||||
|
$num_bytes -= $n;
|
||||||
|
$ptr += $n;
|
||||||
|
|
||||||
|
} while ($num_bytes);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
$this->tx_buf .= substr($data,$ptr,$num_bytes);
|
||||||
|
$num_bytes = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($this->DEBUG)
|
||||||
|
Log::debug(sprintf('%s:= TX buffer has [%d] space left',self::LOGKEY,$this->tx_free));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Empty our TX buffer
|
||||||
|
*
|
||||||
|
* @param int $timeout
|
||||||
|
* @return int
|
||||||
|
* @throws \Exception
|
||||||
|
*/
|
||||||
|
public function buffer_flush(int $timeout): int
|
||||||
|
{
|
||||||
|
if ($this->DEBUG)
|
||||||
|
Log::debug(sprintf('%s:+ Emptying TX buffer with [%d] chars, and timeout [%d]',self::LOGKEY,strlen($this->tx_buf),$timeout));
|
||||||
|
|
||||||
|
$tm = $this->timer_set($timeout);
|
||||||
|
$rc = self::OK;
|
||||||
|
|
||||||
|
while (strlen($this->tx_buf)) {
|
||||||
|
$tv = $this->timer_rest($tm);
|
||||||
|
|
||||||
|
if (($rc=$this->canSend($tv)) > 0) {
|
||||||
|
if ($this->DEBUG)
|
||||||
|
Log::debug(sprintf('%s:- Chars to send [%d]',self::LOGKEY,strlen($this->tx_buf)));
|
||||||
|
|
||||||
|
$sent = $this->send(substr($this->tx_buf,0,self::TX_SIZE),0);
|
||||||
|
|
||||||
|
if ($this->DEBUG)
|
||||||
|
Log::debug(sprintf('%s:- Sent [%d] chars [%s]',self::LOGKEY,$rc,Str::limit($this->tx_buf,15)));
|
||||||
|
|
||||||
|
$this->tx_buf = substr($this->tx_buf,$sent);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
return $rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
// @todo Enable a delay for slow clients
|
||||||
|
//sleep(1);
|
||||||
|
if ($this->timer_expired($tm))
|
||||||
|
return self::ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->tx_purge();
|
||||||
|
|
||||||
|
return $rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param int $timeout
|
||||||
|
* @return int
|
||||||
|
* @throws \Exception
|
||||||
|
*/
|
||||||
|
public function canSend(int $timeout): int
|
||||||
|
{
|
||||||
|
$write = [$this->connection];
|
||||||
|
|
||||||
|
return $this->socketSelect(NULL,$write,NULL,$timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close the connection with the client
|
||||||
|
*/
|
||||||
|
public function close(): void
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
socket_shutdown($this->connection);
|
||||||
|
} catch (\ErrorException $e) {
|
||||||
|
Log::error(sprintf('%s:! Shutting down socket [%s]',self::LOGKEY,$e->getMessage()));
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
socket_close($this->connection);
|
||||||
|
} catch (\ErrorException $e) {
|
||||||
|
Log::error(sprintf('%s:! Closing socket [%s]',self::LOGKEY,$e->getMessage()));
|
||||||
|
}
|
||||||
|
|
||||||
|
Log::info(sprintf('%s:= Connection closed with [%s]',self::LOGKEY,$this->address_remote));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We have data in the buffer or on the socket
|
||||||
|
*
|
||||||
* @param int $timeout
|
* @param int $timeout
|
||||||
* @return int
|
* @return int
|
||||||
* @note use socketSelect()
|
|
||||||
* @throws \Exception
|
* @throws \Exception
|
||||||
*/
|
*/
|
||||||
public function hasData(int $timeout): int
|
public function hasData(int $timeout): int
|
||||||
@ -314,29 +280,22 @@ final class SocketClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read data from the socket.
|
* Read data, emptying from the RX buffer first, then checking the socket.
|
||||||
*
|
*
|
||||||
* @param int $timeout
|
* @param int $timeout How long to wait for data
|
||||||
* @param int $len
|
* @param int $len The amount of data we want
|
||||||
* @param int $size
|
|
||||||
* @return string
|
* @return string
|
||||||
* @throws SocketException
|
* @throws SocketException
|
||||||
*/
|
*/
|
||||||
public function read(int $timeout,int $len=1024,int $size=1024): string
|
public function read(int $timeout,int $len=1024): string
|
||||||
{
|
{
|
||||||
if ($this->DEBUG)
|
|
||||||
Log::debug(sprintf('%s:+ Start [%d] (%d)',self::LOGKEY,$len,$timeout));
|
|
||||||
|
|
||||||
// We have data in our buffer
|
// We have data in our buffer
|
||||||
if ($this->rx_left >= $len) {
|
if ($this->rx_left >= $len) {
|
||||||
$result = substr($this->rx_buf,$this->rx_ptr,$len);
|
if ($this->DEBUG)
|
||||||
$this->rx_ptr += $len;
|
Log::debug(sprintf('%s:- Returning [%d] chars from the RX buffer',self::LOGKEY,$len));
|
||||||
$this->rx_left -= $len;
|
|
||||||
|
|
||||||
if ($this->rx_left === 0) {
|
$result = substr($this->rx_buf,0,$len);
|
||||||
$this->rx_buf = '';
|
$this->rx_buf = substr($this->rx_buf,strlen($result));
|
||||||
$this->rx_ptr = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
return $result;
|
return $result;
|
||||||
}
|
}
|
||||||
@ -345,12 +304,19 @@ final class SocketClient {
|
|||||||
return '';
|
return '';
|
||||||
|
|
||||||
$buf = '';
|
$buf = '';
|
||||||
try {
|
|
||||||
if ($this->type === 'TCP')
|
|
||||||
$rc = socket_recv($this->connection,$buf, $size,MSG_DONTWAIT);
|
|
||||||
|
|
||||||
else {
|
try {
|
||||||
$rc = socket_recvfrom($this->connection,$buf, $size,MSG_DONTWAIT,$this->address_remote,$this->port_remote);
|
switch ($this->type) {
|
||||||
|
case SOCK_STREAM:
|
||||||
|
$recv = socket_recv($this->connection,$buf,self::RX_SIZE,MSG_DONTWAIT);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case SOCK_DGRAM:
|
||||||
|
$recv = socket_recvfrom($this->connection,$buf,self::RX_SIZE,MSG_DONTWAIT,$this->address_remote,$this->port_remote);
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw new SocketException(SocketException::SOCKET_ERROR,sprintf('Unhandled socket type: %s',$this->type));
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (\Exception $e) {
|
} catch (\Exception $e) {
|
||||||
@ -359,39 +325,41 @@ final class SocketClient {
|
|||||||
throw new SocketException($x=socket_last_error($this->connection),socket_strerror($x));
|
throw new SocketException($x=socket_last_error($this->connection),socket_strerror($x));
|
||||||
}
|
}
|
||||||
|
|
||||||
if ($this->DEBUG)
|
// If we got no data, we'll send whatever is left in the buffer
|
||||||
Log::debug(sprintf('%s: - Read [%d]',self::LOGKEY,$rc));
|
if ($recv === FALSE) {
|
||||||
|
|
||||||
if ($rc === FALSE) {
|
|
||||||
// If we have something in the buffer, we'll send it
|
// If we have something in the buffer, we'll send it
|
||||||
if ($this->rx_left && $this->rx_left < $len) {
|
if ($this->rx_left) {
|
||||||
$return = substr($this->rx_buf,$this->rx_ptr);
|
if ($this->DEBUG)
|
||||||
|
Log::debug(sprintf('%s:- Network read return an error, returning final [%d] chars from the RX buffer',self::LOGKEY,strlen($this->rx_buf)));
|
||||||
|
|
||||||
$this->rx_left = 0;
|
$result = $this->rx_buf;
|
||||||
$this->rx_ptr = 0;
|
|
||||||
$this->rx_buf = '';
|
$this->rx_buf = '';
|
||||||
|
|
||||||
return $return;
|
return $result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Log::error(sprintf('%s:! Request to read [%d] chars resulted in no data',self::LOGKEY,$len));
|
||||||
throw new SocketException($x=socket_last_error($this->connection),socket_strerror($x));
|
throw new SocketException($x=socket_last_error($this->connection),socket_strerror($x));
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->rx_buf .= $buf;
|
|
||||||
$this->rx_left += strlen($buf);
|
|
||||||
|
|
||||||
// If our buffer is null, see if we have any out of band data.
|
// If our buffer is null, see if we have any out of band data.
|
||||||
// @todo We throw an errorexception when the socket is closed by the remote I think.
|
// @todo We throw an errorexception when the socket is closed by the remote I think.
|
||||||
if (($rc === 0) && is_nulL($buf) && ($this->hasData(0) > 0)) {
|
if (($recv === 0) && is_null($buf) && ($this->hasData(0) > 0) && $this->type === SOCK_STREAM) {
|
||||||
try {
|
try {
|
||||||
socket_recv($this->connection,$buf, $len,MSG_OOB);
|
socket_recv($this->connection,$buf,$len,MSG_OOB);
|
||||||
|
|
||||||
} catch (\Exception $e) {
|
} catch (\Exception $e) {
|
||||||
throw new SocketException($x=socket_last_error($this->connection),socket_strerror($x));
|
throw new SocketException($x=socket_last_error($this->connection),socket_strerror($x));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return $this->read($timeout,$len,$size);
|
$this->rx_buf .= $buf;
|
||||||
|
|
||||||
|
if ($this->DEBUG)
|
||||||
|
Log::debug(sprintf('%s:- Added [%d] chars to the RX buffer',self::LOGKEY,strlen($buf)));
|
||||||
|
|
||||||
|
// Loop again and return the data, now that it is in the RX buffer
|
||||||
|
return $this->read($timeout,$len);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -404,52 +372,35 @@ final class SocketClient {
|
|||||||
*/
|
*/
|
||||||
public function read_ch(int $timeout): int
|
public function read_ch(int $timeout): int
|
||||||
{
|
{
|
||||||
if ($this->DEBUG)
|
|
||||||
Log::debug(sprintf('%s:+ Start [%d] - rx_left[%d], rx_ptr[%d]',self::LOGKEY,$timeout,$this->rx_left,$this->rx_ptr));
|
|
||||||
|
|
||||||
// If our buffer is empty, we'll try and read from the remote
|
|
||||||
if ($this->rx_left === 0) {
|
|
||||||
if ($this->hasData($timeout) > 0) {
|
if ($this->hasData($timeout) > 0) {
|
||||||
try {
|
$ch = $this->read($timeout,1);
|
||||||
if (! strlen($this->rx_buf = $this->read(0,self::RX_BUF_SIZE))) {
|
|
||||||
if ($this->DEBUG)
|
|
||||||
Log::debug(sprintf('%s: - Nothing read',self::LOGKEY));
|
|
||||||
|
|
||||||
return self::TTY_TIMEOUT;
|
|
||||||
}
|
|
||||||
|
|
||||||
} catch (\Exception $e) {
|
|
||||||
return ($e->getCode() === 11) ? self::TTY_TIMEOUT : self::ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ($this->DEBUG)
|
|
||||||
Log::info(sprintf('%s: - Read [%d] bytes',self::LOGKEY,strlen($this->rx_buf)));
|
|
||||||
|
|
||||||
$this->rx_ptr = 0;
|
|
||||||
$this->rx_left = strlen($this->rx_buf);
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
return self::TTY_TIMEOUT;
|
return self::TIMEOUT;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$rc = ord(substr($this->rx_buf,$this->rx_ptr,1));
|
return ord($ch);
|
||||||
|
|
||||||
$this->rx_left--;
|
|
||||||
$this->rx_ptr++;
|
|
||||||
|
|
||||||
if ($this->DEBUG)
|
|
||||||
Log::debug(sprintf('%s:= Return [%x] (%c)',self::LOGKEY,$rc,$rc));
|
|
||||||
|
|
||||||
return $rc;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public function rx_purge(): void
|
public function rx_purge(): void
|
||||||
{
|
{
|
||||||
$this->rx_ptr = $this->rx_left = 0;
|
if ($this->DEBUG)
|
||||||
|
Log::debug(sprintf('%s:+ Discarding [%d] chars from the RX buffer',self::LOGKEY,strlen($this->tx_buf)));
|
||||||
|
|
||||||
$this->rx_buf = '';
|
$this->rx_buf = '';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear our TX buffer
|
||||||
|
*/
|
||||||
|
public function tx_purge(): void
|
||||||
|
{
|
||||||
|
if ($this->DEBUG)
|
||||||
|
Log::debug(sprintf('%s:+ Discarding [%d] chars from the TX buffer',self::LOGKEY,strlen($this->tx_buf)));
|
||||||
|
|
||||||
|
$this->tx_buf = '';
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send data to the client
|
* Send data to the client
|
||||||
*
|
*
|
||||||
@ -459,17 +410,24 @@ final class SocketClient {
|
|||||||
* @return false|int
|
* @return false|int
|
||||||
* @throws \Exception
|
* @throws \Exception
|
||||||
*/
|
*/
|
||||||
public function send($message,int $timeout,$length=NULL)
|
public function send($message,int $timeout)
|
||||||
{
|
{
|
||||||
if ($timeout AND (! $rc = $this->canSend($timeout)))
|
if ($timeout AND (! $rc=$this->canSend($timeout)))
|
||||||
return $rc;
|
return $rc;
|
||||||
|
|
||||||
if (is_null($length))
|
if ($this->DEBUG)
|
||||||
$length = strlen($message);
|
Log::debug(sprintf('%s:- Sending [%d] chars [%s]',self::LOGKEY,strlen($message),Str::limit($message,15)));
|
||||||
|
|
||||||
return ($this->type === 'TCP')
|
switch ($this->type) {
|
||||||
? socket_write($this->connection,$message,$length)
|
case SOCK_STREAM:
|
||||||
: socket_sendto($this->connection,$message,$length,0,$this->address_remote,$this->port_remote);
|
return socket_write($this->connection,$message,strlen($message));
|
||||||
|
|
||||||
|
case SOCK_DGRAM:
|
||||||
|
return socket_sendto($this->connection,$message,strlen($message),0,$this->address_remote,$this->port_remote);
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw new SocketException(SocketException::SOCKET_ERROR,sprintf('Unhandled socket type: %s',$this->type));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -490,24 +448,24 @@ final class SocketClient {
|
|||||||
throw new \Exception('Socket Error: '.socket_strerror(socket_last_error()));
|
throw new \Exception('Socket Error: '.socket_strerror(socket_last_error()));
|
||||||
|
|
||||||
if ($this->DEBUG)
|
if ($this->DEBUG)
|
||||||
Log::debug(sprintf('Socket select returned [%d] (%d)',$rc,$timeout),['read'=>$read,'write'=>$write,'except'=>$except]);
|
Log::debug(sprintf('%s:= Socket select returned [%d] with timeout (%d)',self::LOGKEY,$rc,$timeout),['read'=>$read,'write'=>$write,'except'=>$except]);
|
||||||
|
|
||||||
return $rc;
|
return $rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
public function timer_expired(int $timer): int
|
public function timer_expired(int $timer): int
|
||||||
{
|
{
|
||||||
return (time()>=$timer);
|
return (time() >= $timer);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function timer_rest(int $timer): int
|
public function timer_rest(int $timer): int
|
||||||
{
|
{
|
||||||
return (($timer)-time());
|
return $timer-time();
|
||||||
}
|
}
|
||||||
|
|
||||||
public function timer_set(int $expire): int
|
public function timer_set(int $expire): int
|
||||||
{
|
{
|
||||||
return (time()+$expire);
|
return time()+$expire;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -518,11 +476,16 @@ final class SocketClient {
|
|||||||
* @param int $timeout
|
* @param int $timeout
|
||||||
* @return int
|
* @return int
|
||||||
* @throws \Exception
|
* @throws \Exception
|
||||||
|
* @deprecated use canSend or hasData
|
||||||
*/
|
*/
|
||||||
public function ttySelect(bool $read,bool $write, int $timeout): int
|
public function ttySelect(bool $read,bool $write, int $timeout): int
|
||||||
{
|
{
|
||||||
if (strlen($this->rx_buf) && ($this->rx_left))
|
if ($this->rx_left) {
|
||||||
|
if ($this->DEBUG)
|
||||||
|
Log::debug(sprintf('%s:= We still have [%d] chars in the RX buffer.',self::LOGKEY,$this->rx_left));
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
$read = $read ? [$this->connection] : NULL;
|
$read = $read ? [$this->connection] : NULL;
|
||||||
$write = $write ? [$this->connection] : NULL;
|
$write = $write ? [$this->connection] : NULL;
|
||||||
|
@ -22,26 +22,38 @@ final class SocketServer {
|
|||||||
$this->port = $port;
|
$this->port = $port;
|
||||||
$this->type = $type;
|
$this->type = $type;
|
||||||
|
|
||||||
$this->_init();
|
$this->createSocket();
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Bind to our Socket
|
|
||||||
*
|
|
||||||
* @throws SocketException
|
|
||||||
*/
|
|
||||||
private function _bindSocket(): void
|
|
||||||
{
|
|
||||||
if (socket_bind($this->server,$this->bind,$this->port) === FALSE)
|
if (socket_bind($this->server,$this->bind,$this->port) === FALSE)
|
||||||
throw new SocketException(SocketException::CANT_BIND_SOCKET,socket_strerror(socket_last_error($this->server)));
|
throw new SocketException(SocketException::CANT_BIND_SOCKET,socket_strerror(socket_last_error($this->server)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function __get($key)
|
||||||
|
{
|
||||||
|
switch ($key) {
|
||||||
|
case 'handler':
|
||||||
|
return $this->handler;
|
||||||
|
default:
|
||||||
|
throw new \Exception('Unknown key: '.$key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public function __set($key,$value)
|
||||||
|
{
|
||||||
|
switch ($key) {
|
||||||
|
case 'handler':
|
||||||
|
return $this->handler = $value;
|
||||||
|
default:
|
||||||
|
throw new \Exception('Unknown key: '.$key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create our Socket
|
* Create our Socket
|
||||||
*
|
*
|
||||||
* @throws SocketException
|
* @throws SocketException
|
||||||
*/
|
*/
|
||||||
private function _createSocket(): void
|
private function createSocket(): void
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* Check dependencies
|
* Check dependencies
|
||||||
@ -71,17 +83,6 @@ final class SocketServer {
|
|||||||
socket_set_option($this->server,SOL_SOCKET,SO_REUSEADDR,1);
|
socket_set_option($this->server,SOL_SOCKET,SO_REUSEADDR,1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Setup Socket and Bind
|
|
||||||
*
|
|
||||||
* @throws SocketException
|
|
||||||
*/
|
|
||||||
private function _init(): void
|
|
||||||
{
|
|
||||||
$this->_createSocket();
|
|
||||||
$this->_bindSocket();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Our main loop where we listen for connections
|
* Our main loop where we listen for connections
|
||||||
*
|
*
|
||||||
@ -118,7 +119,7 @@ final class SocketServer {
|
|||||||
*
|
*
|
||||||
* @throws SocketException
|
* @throws SocketException
|
||||||
*/
|
*/
|
||||||
private function loop_tcp()
|
private function loop_tcp(): void
|
||||||
{
|
{
|
||||||
while (TRUE) {
|
while (TRUE) {
|
||||||
if (($accept = socket_accept($this->server)) === FALSE)
|
if (($accept = socket_accept($this->server)) === FALSE)
|
||||||
@ -136,7 +137,7 @@ final class SocketServer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private function loop_udp()
|
private function loop_udp(): void
|
||||||
{
|
{
|
||||||
while (TRUE) {
|
while (TRUE) {
|
||||||
$r = new SocketClient($this->server);
|
$r = new SocketClient($this->server);
|
||||||
@ -149,14 +150,4 @@ final class SocketServer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Set our connection handler Class and Method
|
|
||||||
*
|
|
||||||
* @param array $handler
|
|
||||||
*/
|
|
||||||
public function setConnectionHandler(array $handler): void
|
|
||||||
{
|
|
||||||
$this->handler = $handler;
|
|
||||||
}
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user