From f9f9fb5345a649582057ce6c6bbd624d5ec08f89 Mon Sep 17 00:00:00 2001 From: Deon George Date: Wed, 28 Jun 2023 19:50:24 +1000 Subject: [PATCH] Improve debugging and code optimisations for Socket operations --- app/Classes/Sock/SocketClient.php | 481 ++++++++++++++---------------- app/Classes/Sock/SocketServer.php | 57 ++-- 2 files changed, 246 insertions(+), 292 deletions(-) diff --git a/app/Classes/Sock/SocketClient.php b/app/Classes/Sock/SocketClient.php index f28ad51..c742c1c 100644 --- a/app/Classes/Sock/SocketClient.php +++ b/app/Classes/Sock/SocketClient.php @@ -29,36 +29,27 @@ final class SocketClient { private array $session = []; private const OK = 0; - private const EOF = -1; private const TIMEOUT = -2; - private const RCDO = -3; - private const GCOUNT = -4; private const ERROR = -5; - private const TTY_SUCCESS = self::OK; - private const TTY_TIMEOUT = self::TIMEOUT; - private const TTY_HANGUP = self::RCDO; - private const TTY_ERROR = self::ERROR; - - public const TCP_SPEED = 115200; - - // Buffer for sending - private const TX_BUF_SIZE = (0x8100); - private int $tx_ptr = 0; - private int $tx_free = self::TX_BUF_SIZE; - private int $tty_status = 0; + /** @var string Size of our TX buffer */ + private const TX_BUF_SIZE = 0xFFFF; + /** @var string Maximum amount of data to send at a time */ + private const TX_SIZE = 0xFFFF; + /** @var string Data in the TX buffer */ private string $tx_buf = ''; - // Buffer for receiving - private const RX_BUF_SIZE = (0x8100); - private int $rx_ptr = 0; - private int $rx_left = 0; + /** @var string Size of our RX buffer */ + private const RX_BUF_SIZE = 0xFFFF; + /** @var string Maximum amount of data to received at a time */ + private const RX_SIZE = 0xFFFF; + /** @var string Data in the RX buffer */ private string $rx_buf = ''; public function __construct (\Socket $connection) { $this->connection = $connection; - if ($this->type === 'TCP') { + if ($this->type === SOCK_STREAM) { socket_getsockname($connection,$this->address_local,$this->port_local); socket_getpeername($connection,$this->address_remote,$this->port_remote); Log::info(sprintf('%s:+ Connection host [%s] on port [%d] (%s)',self::LOGKEY,$this->address_remote,$this->port_remote,$this->type)); @@ -75,16 +66,17 @@ final class SocketClient { case 'speed': return Arr::get($this->session,$key); - case 'type': - switch ($x=socket_get_option($this->connection,SOL_SOCKET,SO_TYPE)) { - case SOCK_STREAM: - return 'TCP'; - case SOCK_DGRAM: - return 'UDP'; + case 'rx_free': + return self::RX_BUF_SIZE-$this->rx_left; - default: - return sprintf('UNKNOWN [%d]',$x); - } + case 'rx_left': + return strlen($this->rx_buf); + + case 'tx_free': + return self::TX_BUF_SIZE-strlen($this->tx_buf); + + case 'type': + return socket_get_option($this->connection,SOL_SOCKET,SO_TYPE); default: throw new \Exception(sprintf('%s:! Unknown key [%s]:',self::LOGKEY,$key)); @@ -102,150 +94,6 @@ final class SocketClient { } } - /** - * We'll add to our transmit buffer and if doesnt have space, we'll empty it first - * - * @param string $data - * @return void - * @throws \Exception - */ - public function buffer_add(string $data): void - { - if ($this->DEBUG) - Log::debug(sprintf('%s:+ Start [%s] (%d)',self::LOGKEY,$data,strlen($data))); - - //$rc = self::OK; - //$tx_ptr = self::TX_BUF_SIZE-$this->tx_free; - $ptr = 0; - $num_bytes = strlen($data); - $this->tty_status = self::TTY_SUCCESS; - - while ($num_bytes) { - if ($this->DEBUG) - Log::debug(sprintf('%s: - Num Bytes [%d]: TX Free [%d]',self::LOGKEY,$num_bytes,$this->tx_free)); - - if ($num_bytes > $this->tx_free) { - do { - $this->buffer_flush(5); - - if ($this->tty_status === self::TTY_SUCCESS) { - $n = min($this->tx_free,$num_bytes); - $this->tx_buf = substr($data,$ptr,$n); - $this->tx_free -= $n; - $num_bytes -= $n; - $ptr += $n; - } - - } while ($this->tty_status != self::TTY_SUCCESS); - - } else { - if ($this->DEBUG) - Log::debug(sprintf('%s: - Remaining data to send [%d]',self::LOGKEY,$num_bytes)); - - $this->tx_buf .= substr($data,$ptr,$num_bytes); - $this->tx_free -= $num_bytes; - $num_bytes = 0; - } - } - - if ($this->DEBUG) - Log::debug(sprintf('%s:= End [%s]',self::LOGKEY,strlen($this->tx_buf))); - } - - /** - * Clear our TX buffer - */ - public function buffer_clear(): void - { - $this->tx_buf = ''; - $this->tx_free = self::TX_BUF_SIZE; - } - - /** - * Empty our TX buffer - * - * @param int $timeout - * @return int - * @throws \Exception - */ - public function buffer_flush(int $timeout): int - { - if ($this->DEBUG) - Log::debug(sprintf('%s:+ Start [%d]',self::LOGKEY,$timeout)); - - $rc = self::OK; - $tx_ptr = 0; - $restsize = self::TX_BUF_SIZE-$this->tx_free; - - $tm = $this->timer_set($timeout); - while (self::TX_BUF_SIZE != $this->tx_free) { - $tv = $this->timer_rest($tm); - - if ($rc = $this->canSend($tv)>0) { - if ($this->DEBUG) - Log::debug(sprintf('%s: - Sending [%d]',self::LOGKEY,$restsize)); - $rc = $this->send(substr($this->tx_buf,$tx_ptr,$restsize),0); - - if ($this->DEBUG) - Log::debug(sprintf('%s: - Sent [%d] (%s)',self::LOGKEY,$rc,Str::limit($this->tx_buf,15))); - - if ($rc === $restsize) { - $this->tx_buf = ''; - $tx_ptr = 0; - $this->tx_free += $rc; - $this->buffer_clear(); - - } else if ($rc > 0) { - $tx_ptr += $rc; - $restsize -= $rc; - } - - } else { - return $rc; - } - - // @todo Enable a delay for slow clients - //sleep(1); - if ($this->timer_expired($tm)) - return self::ERROR; - } - - if ($this->DEBUG) - Log::debug(sprintf('%s:= End [%d]',self::LOGKEY,$rc)); - - return $rc; - } - - /** - * @param int $timeout - * @return int - * @throws \Exception - */ - public function canSend(int $timeout): int - { - $write = [$this->connection]; - - return $this->socketSelect(NULL,$write,NULL,$timeout); - } - - /** - * Close the connection with the client - */ - public function close(): void - { - try { - socket_shutdown($this->connection); - } catch (\ErrorException $e) { - Log::error(sprintf('%s:+ Shutting down socket [%s]',self::LOGKEY,$e->getMessage())); - } - - try { - socket_close($this->connection); - } catch (\ErrorException $e) { - Log::error(sprintf('%s:+ Closing socket [%s]',self::LOGKEY,$e->getMessage())); - } - } - /** * Create a client socket * @@ -256,7 +104,8 @@ final class SocketClient { */ public static function create(string $address,int $port): self { - Log::debug(sprintf('%s:+ Creating connection to [%s:%d]',self::LOGKEY,$address,$port)); + Log::info(sprintf('%s:+ Creating connection to [%s:%d]',self::LOGKEY,$address,$port)); + $sort = collect(['AAAA','A']); // We only look at AAAA/A records @@ -275,7 +124,7 @@ final class SocketClient { if (! $try) continue; - Log::alert(sprintf('%s: - Trying [%s:%d]',self::LOGKEY,$try,$port)); + Log::info(sprintf('%s:- Trying [%s:%d]',self::LOGKEY,$try,$port)); /* Create a TCP/IP socket. */ $socket = socket_create(Arr::get($address,'type') === 'AAAA' ? AF_INET6 : AF_INET,SOCK_STREAM,SOL_TCP); @@ -301,9 +150,126 @@ final class SocketClient { } /** + * We'll add to our transmit buffer and if doesnt have space, we'll empty it first + * + * @param string $data + * @return void + * @throws \Exception + */ + public function buffer_add(string $data): void + { + $ptr = 0; + $num_bytes = strlen($data); + + while ($num_bytes) { + if ($this->DEBUG) + Log::debug(sprintf('%s:- To add [%d] to the TX buffer',self::LOGKEY,$num_bytes)); + + if ($num_bytes > $this->tx_free) { + if ($this->DEBUG) + Log::debug(sprintf('%s:- TX buffer will be too full, draining...',self::LOGKEY)); + + do { + $this->buffer_flush(5); + + $n = min($this->tx_free,$num_bytes); + $this->tx_buf = substr($data,$ptr,$n); + $num_bytes -= $n; + $ptr += $n; + + } while ($num_bytes); + + } else { + $this->tx_buf .= substr($data,$ptr,$num_bytes); + $num_bytes = 0; + } + } + + if ($this->DEBUG) + Log::debug(sprintf('%s:= TX buffer has [%d] space left',self::LOGKEY,$this->tx_free)); + } + + /** + * Empty our TX buffer + * + * @param int $timeout + * @return int + * @throws \Exception + */ + public function buffer_flush(int $timeout): int + { + if ($this->DEBUG) + Log::debug(sprintf('%s:+ Emptying TX buffer with [%d] chars, and timeout [%d]',self::LOGKEY,strlen($this->tx_buf),$timeout)); + + $tm = $this->timer_set($timeout); + $rc = self::OK; + + while (strlen($this->tx_buf)) { + $tv = $this->timer_rest($tm); + + if (($rc=$this->canSend($tv)) > 0) { + if ($this->DEBUG) + Log::debug(sprintf('%s:- Chars to send [%d]',self::LOGKEY,strlen($this->tx_buf))); + + $sent = $this->send(substr($this->tx_buf,0,self::TX_SIZE),0); + + if ($this->DEBUG) + Log::debug(sprintf('%s:- Sent [%d] chars [%s]',self::LOGKEY,$rc,Str::limit($this->tx_buf,15))); + + $this->tx_buf = substr($this->tx_buf,$sent); + + } else { + return $rc; + } + + // @todo Enable a delay for slow clients + //sleep(1); + if ($this->timer_expired($tm)) + return self::ERROR; + } + + $this->tx_purge(); + + return $rc; + } + + /** + * @param int $timeout + * @return int + * @throws \Exception + */ + public function canSend(int $timeout): int + { + $write = [$this->connection]; + + return $this->socketSelect(NULL,$write,NULL,$timeout); + } + + /** + * Close the connection with the client + */ + public function close(): void + { + try { + socket_shutdown($this->connection); + } catch (\ErrorException $e) { + Log::error(sprintf('%s:! Shutting down socket [%s]',self::LOGKEY,$e->getMessage())); + } + + try { + socket_close($this->connection); + } catch (\ErrorException $e) { + Log::error(sprintf('%s:! Closing socket [%s]',self::LOGKEY,$e->getMessage())); + } + + Log::info(sprintf('%s:= Connection closed with [%s]',self::LOGKEY,$this->address_remote)); + } + + /** + * We have data in the buffer or on the socket + * * @param int $timeout * @return int - * @note use socketSelect() * @throws \Exception */ public function hasData(int $timeout): int @@ -314,29 +280,22 @@ final class SocketClient { } /** - * Read data from the socket. + * Read data, emptying from the RX buffer first, then checking the socket. * - * @param int $timeout - * @param int $len - * @param int $size + * @param int $timeout How long to wait for data + * @param int $len The amount of data we want * @return string * @throws SocketException */ - public function read(int $timeout,int $len=1024,int $size=1024): string + public function read(int $timeout,int $len=1024): string { - if ($this->DEBUG) - Log::debug(sprintf('%s:+ Start [%d] (%d)',self::LOGKEY,$len,$timeout)); - // We have data in our buffer if ($this->rx_left >= $len) { - $result = substr($this->rx_buf,$this->rx_ptr,$len); - $this->rx_ptr += $len; - $this->rx_left -= $len; + if ($this->DEBUG) + Log::debug(sprintf('%s:- Returning [%d] chars from the RX buffer',self::LOGKEY,$len)); - if ($this->rx_left === 0) { - $this->rx_buf = ''; - $this->rx_ptr = 0; - } + $result = substr($this->rx_buf,0,$len); + $this->rx_buf = substr($this->rx_buf,strlen($result)); return $result; } @@ -345,12 +304,19 @@ final class SocketClient { return ''; $buf = ''; - try { - if ($this->type === 'TCP') - $rc = socket_recv($this->connection,$buf, $size,MSG_DONTWAIT); - else { - $rc = socket_recvfrom($this->connection,$buf, $size,MSG_DONTWAIT,$this->address_remote,$this->port_remote); + try { + switch ($this->type) { + case SOCK_STREAM: + $recv = socket_recv($this->connection,$buf,self::RX_SIZE,MSG_DONTWAIT); + break; + + case SOCK_DGRAM: + $recv = socket_recvfrom($this->connection,$buf,self::RX_SIZE,MSG_DONTWAIT,$this->address_remote,$this->port_remote); + break; + + default: + throw new SocketException(SocketException::SOCKET_ERROR,sprintf('Unhandled socket type: %s',$this->type)); } } catch (\Exception $e) { @@ -359,39 +325,41 @@ final class SocketClient { throw new SocketException($x=socket_last_error($this->connection),socket_strerror($x)); } - if ($this->DEBUG) - Log::debug(sprintf('%s: - Read [%d]',self::LOGKEY,$rc)); - - if ($rc === FALSE) { + // If we got no data, we'll send whatever is left in the buffer + if ($recv === FALSE) { // If we have something in the buffer, we'll send it - if ($this->rx_left && $this->rx_left < $len) { - $return = substr($this->rx_buf,$this->rx_ptr); + if ($this->rx_left) { + if ($this->DEBUG) + Log::debug(sprintf('%s:- Network read return an error, returning final [%d] chars from the RX buffer',self::LOGKEY,strlen($this->rx_buf))); - $this->rx_left = 0; - $this->rx_ptr = 0; + $result = $this->rx_buf; $this->rx_buf = ''; - return $return; + return $result; } + Log::error(sprintf('%s:! Request to read [%d] chars resulted in no data',self::LOGKEY,$len)); throw new SocketException($x=socket_last_error($this->connection),socket_strerror($x)); } - $this->rx_buf .= $buf; - $this->rx_left += strlen($buf); - // If our buffer is null, see if we have any out of band data. // @todo We throw an errorexception when the socket is closed by the remote I think. - if (($rc === 0) && is_nulL($buf) && ($this->hasData(0) > 0)) { + if (($recv === 0) && is_null($buf) && ($this->hasData(0) > 0) && $this->type === SOCK_STREAM) { try { - socket_recv($this->connection,$buf, $len,MSG_OOB); + socket_recv($this->connection,$buf,$len,MSG_OOB); } catch (\Exception $e) { throw new SocketException($x=socket_last_error($this->connection),socket_strerror($x)); } } - return $this->read($timeout,$len,$size); + $this->rx_buf .= $buf; + + if ($this->DEBUG) + Log::debug(sprintf('%s:- Added [%d] chars to the RX buffer',self::LOGKEY,strlen($buf))); + + // Loop again and return the data, now that it is in the RX buffer + return $this->read($timeout,$len); } /** @@ -404,52 +372,35 @@ final class SocketClient { */ public function read_ch(int $timeout): int { - if ($this->DEBUG) - Log::debug(sprintf('%s:+ Start [%d] - rx_left[%d], rx_ptr[%d]',self::LOGKEY,$timeout,$this->rx_left,$this->rx_ptr)); + if ($this->hasData($timeout) > 0) { + $ch = $this->read($timeout,1); - // If our buffer is empty, we'll try and read from the remote - if ($this->rx_left === 0) { - if ($this->hasData($timeout) > 0) { - try { - if (! strlen($this->rx_buf = $this->read(0,self::RX_BUF_SIZE))) { - if ($this->DEBUG) - Log::debug(sprintf('%s: - Nothing read',self::LOGKEY)); - - return self::TTY_TIMEOUT; - } - - } catch (\Exception $e) { - return ($e->getCode() === 11) ? self::TTY_TIMEOUT : self::ERROR; - } - - if ($this->DEBUG) - Log::info(sprintf('%s: - Read [%d] bytes',self::LOGKEY,strlen($this->rx_buf))); - - $this->rx_ptr = 0; - $this->rx_left = strlen($this->rx_buf); - - } else { - return self::TTY_TIMEOUT; - } + } else { + return self::TIMEOUT; } - $rc = ord(substr($this->rx_buf,$this->rx_ptr,1)); - - $this->rx_left--; - $this->rx_ptr++; - - if ($this->DEBUG) - Log::debug(sprintf('%s:= Return [%x] (%c)',self::LOGKEY,$rc,$rc)); - - return $rc; + return ord($ch); } public function rx_purge(): void { - $this->rx_ptr = $this->rx_left = 0; + if ($this->DEBUG) + Log::debug(sprintf('%s:+ Discarding [%d] chars from the RX buffer',self::LOGKEY,strlen($this->tx_buf))); + $this->rx_buf = ''; } + /** + * Clear our TX buffer + */ + public function tx_purge(): void + { + if ($this->DEBUG) + Log::debug(sprintf('%s:+ Discarding [%d] chars from the TX buffer',self::LOGKEY,strlen($this->tx_buf))); + + $this->tx_buf = ''; + } + /** * Send data to the client * @@ -459,17 +410,24 @@ final class SocketClient { * @return false|int * @throws \Exception */ - public function send($message,int $timeout,$length=NULL) + public function send($message,int $timeout) { - if ($timeout AND (! $rc = $this->canSend($timeout))) + if ($timeout AND (! $rc=$this->canSend($timeout))) return $rc; - if (is_null($length)) - $length = strlen($message); + if ($this->DEBUG) + Log::debug(sprintf('%s:- Sending [%d] chars [%s]',self::LOGKEY,strlen($message),Str::limit($message,15))); - return ($this->type === 'TCP') - ? socket_write($this->connection,$message,$length) - : socket_sendto($this->connection,$message,$length,0,$this->address_remote,$this->port_remote); + switch ($this->type) { + case SOCK_STREAM: + return socket_write($this->connection,$message,strlen($message)); + + case SOCK_DGRAM: + return socket_sendto($this->connection,$message,strlen($message),0,$this->address_remote,$this->port_remote); + + default: + throw new SocketException(SocketException::SOCKET_ERROR,sprintf('Unhandled socket type: %s',$this->type)); + } } /** @@ -490,24 +448,24 @@ final class SocketClient { throw new \Exception('Socket Error: '.socket_strerror(socket_last_error())); if ($this->DEBUG) - Log::debug(sprintf('Socket select returned [%d] (%d)',$rc,$timeout),['read'=>$read,'write'=>$write,'except'=>$except]); + Log::debug(sprintf('%s:= Socket select returned [%d] with timeout (%d)',self::LOGKEY,$rc,$timeout),['read'=>$read,'write'=>$write,'except'=>$except]); return $rc; } public function timer_expired(int $timer): int { - return (time()>=$timer); + return (time() >= $timer); } public function timer_rest(int $timer): int { - return (($timer)-time()); + return $timer-time(); } public function timer_set(int $expire): int { - return (time()+$expire); + return time()+$expire; } /** @@ -518,11 +476,16 @@ final class SocketClient { * @param int $timeout * @return int * @throws \Exception + * @deprecated use canSend or hasData */ public function ttySelect(bool $read,bool $write, int $timeout): int { - if (strlen($this->rx_buf) && ($this->rx_left)) + if ($this->rx_left) { + if ($this->DEBUG) + Log::debug(sprintf('%s:= We still have [%d] chars in the RX buffer.',self::LOGKEY,$this->rx_left)); + return 1; + } $read = $read ? [$this->connection] : NULL; $write = $write ? [$this->connection] : NULL; diff --git a/app/Classes/Sock/SocketServer.php b/app/Classes/Sock/SocketServer.php index 7f7dc7b..3715a10 100644 --- a/app/Classes/Sock/SocketServer.php +++ b/app/Classes/Sock/SocketServer.php @@ -22,26 +22,38 @@ final class SocketServer { $this->port = $port; $this->type = $type; - $this->_init(); - } + $this->createSocket(); - /** - * Bind to our Socket - * - * @throws SocketException - */ - private function _bindSocket(): void - { if (socket_bind($this->server,$this->bind,$this->port) === FALSE) throw new SocketException(SocketException::CANT_BIND_SOCKET,socket_strerror(socket_last_error($this->server))); } + public function __get($key) + { + switch ($key) { + case 'handler': + return $this->handler; + default: + throw new \Exception('Unknown key: '.$key); + } + } + + public function __set($key,$value) + { + switch ($key) { + case 'handler': + return $this->handler = $value; + default: + throw new \Exception('Unknown key: '.$key); + } + } + /** * Create our Socket * * @throws SocketException */ - private function _createSocket(): void + private function createSocket(): void { /** * Check dependencies @@ -71,17 +83,6 @@ final class SocketServer { socket_set_option($this->server,SOL_SOCKET,SO_REUSEADDR,1); } - /** - * Setup Socket and Bind - * - * @throws SocketException - */ - private function _init(): void - { - $this->_createSocket(); - $this->_bindSocket(); - } - /** * Our main loop where we listen for connections * @@ -118,7 +119,7 @@ final class SocketServer { * * @throws SocketException */ - private function loop_tcp() + private function loop_tcp(): void { while (TRUE) { if (($accept = socket_accept($this->server)) === FALSE) @@ -136,7 +137,7 @@ final class SocketServer { } } - private function loop_udp() + private function loop_udp(): void { while (TRUE) { $r = new SocketClient($this->server); @@ -149,14 +150,4 @@ final class SocketServer { } } } - - /** - * Set our connection handler Class and Method - * - * @param array $handler - */ - public function setConnectionHandler(array $handler): void - { - $this->handler = $handler; - } } \ No newline at end of file