connection = $connection; if ($this->type === SOCK_STREAM) { socket_getsockname($connection,$this->address_local,$this->port_local); socket_getpeername($connection,$this->address_remote,$this->port_remote); // If HAPROXY is used, work get the clients address if ((! $originate) && config('fido.haproxy')) { Log::debug(sprintf('%s:+ HAPROXY connection host [%s] on port [%d] (%s)',self::LOGKEY,$this->address_remote,$this->port_remote,$this->type)); if (($x=$this->read(5,6)) === 'PROXY ') $vers = 1; elseif (($x === "\x0d\x0a\x0d\x0a\x00\x0d") && ($this->read('5,6') === "\x0aQUIT\x0a")) $vers = 2; else throw new HAproxyException('Failed to initialise HAPROXY connection'); switch ($vers) { case 1: // Protocol/Address Family switch ($x=$this->read(5,5)) { case 'TCP4 ': $p = 4; break; case 'TCP6 ': $p = 6; break; default: throw new HAproxyException(sprintf('HAPROXY protocol [%d] is not handled',$x)); } $read = $this->read(5,104-11); // IPv4 if (($p === 4) || ($p === 6)) { $parse = collect(sscanf($read,'%s %s %s %s')); $src = Arr::get($parse,0); $dst = Arr::get($parse,1); $src_port = (int)Arr::get($parse,2); $dst_port = (int)Arr::get($parse,3); $len = $parse->map(fn($item)=>strlen($item))->sum()+3; // The last 2 chars should be "\r\n" if (($x=substr($read,$len)) !== "\r\n") throw new HAproxyException(sprintf('HAPROXY parsing failed for version [%d] [%s] (%s)',$p,$read,hex_dump($x))); } else { throw new HAproxyException(sprintf('HAPROXY version [%d] is not handled [%s]',$p,$read)); } $this->port_remote = $src_port; break; case 2: // Version/Command $vc = $this->read_ch(5); if (($x=($vc>>4)&0x7) !== 2) throw new HAproxyException(sprintf('Unknown HAPROXY version [%d]',$x)); switch ($x=($vc&0x7)) { // HAPROXY internal case 0: throw new HAproxyException('HAPROXY internal health-check'); // PROXY connection case 1: break; default: throw new HAproxyException(sprintf('HAPROXY command [%d] is not handled',$x)); } // Protocol/Address Family $pa = $this->read_ch(5); switch ($x=($pa>>4)&0x7) { case 1: // AF_INET $p = 4; break; case 2: // AF_INET6 $p = 6; break; } switch ($x=($pa&0x7)) { case 1: // STREAM break; default: throw new HAproxyException(sprintf('HAPROXY address family [%d] is not handled',$x)); } $len = Arr::get(unpack('n',$this->read(5,2)),1); // IPv4 if (($p === 4) && ($len === 12)) { $src = inet_ntop($this->read(5,4)); $dst = inet_ntop($this->read(5,4)); } elseif (($p === 6) && ($len === 36)) { $src = inet_ntop($this->read(5,16)); $dst = inet_ntop($this->read(5,16)); } else { throw new HAproxyException(sprintf('HAPROXY address len [%d:%d] is not handled',$p,$len)); } $src_port = unpack('n',$this->read(5,2)); $dst_port = Arr::get(unpack('n',$this->read(5,2)),1); $this->port_remote = Arr::get($src_port,1); break; default: throw new HAproxyException('Failed to initialise HAPROXY connection'); } $this->address_remote = $src; Log::debug(sprintf('%s:- HAPROXY src [%s:%d] dst [%s:%d]', self::LOGKEY, $this->address_remote, $this->port_remote, $dst, $dst_port, )); } Log::debug(sprintf('%s:+ Connection host [%s] on port [%d] (%s)',self::LOGKEY,$this->address_remote,$this->port_remote,$this->type)); } } public function __get($key) { switch ($key) { case 'address_remote': case 'port_remote': return $this->{$key}; case 'cps': case 'speed': return Arr::get($this->session,$key); case 'rx_free': return self::RX_BUF_SIZE-$this->rx_left; case 'rx_left': return strlen($this->rx_buf); case 'tx_free': return self::TX_BUF_SIZE-strlen($this->tx_buf); case 'type': return socket_get_option($this->connection,SOL_SOCKET,SO_TYPE); default: throw new \Exception(sprintf('%s:! Unknown key [%s]:',self::LOGKEY,$key)); } } public function __set($key,$value) { switch ($key) { case 'cps': case 'speed': return $this->session[$key] = $value; default: throw new \Exception(sprintf('%s:! Unknown key [%s]:',self::LOGKEY,$key)); } } /** * Create a client socket * * @param string $address * @param int $port * @return static * @throws SocketException|HAproxyException */ public static function create(string $address,int $port): self { Log::info(sprintf('%s:+ Creating connection to [%s:%d]',self::LOGKEY,$address,$port)); $type = collect(config('fido.ip')) ->filter(fn($item)=>$item['enabled']); if (filter_var($address,FILTER_VALIDATE_IP)) $resolved = collect([[ (($x=filter_var($address,FILTER_VALIDATE_IP, FILTER_FLAG_IPV6)) ? 'ipv6' : 'ip')=>$address, 'type'=>$x ? 'AAAA' : 'A' ]]); else // We only look at AAAA/A records $resolved = collect(dns_get_record($address,$type->map(fn($item)=>$item['type'])->sum())) ->filter(fn($item)=>$type->has(Arr::get($item,'type'))) ->sort(fn($a,$b)=>$type->get(Arr::get($a,'type'))['order'] < $type->get(Arr::get($b,'type'))['order']); if (! $resolved->count()) throw new SocketException(SocketException::CANT_CONNECT,sprintf('%s doesnt resolved to an IPv4/IPv6 address',$address)); $result = FALSE; $socket = NULL; foreach ($resolved as $address) { try { $try = Arr::get($address,Arr::get($address,'type') === 'AAAA' ? 'ipv6' : 'ip'); if (! $try) continue; Log::info(sprintf('%s:- Trying [%s:%d]',self::LOGKEY,$try,$port)); /* Create a TCP/IP socket. */ $socket = socket_create(Arr::get($address,'type') === 'AAAA' ? AF_INET6 : AF_INET,SOCK_STREAM,SOL_TCP); if ($socket === FALSE) throw new SocketException(SocketException::CANT_CREATE_SOCKET,socket_strerror(socket_last_error($socket))); $result = socket_connect($socket,$try,$port); break; } catch (\ErrorException $e) { // If 'Cannot assign requested address' if (socket_last_error($socket) === 99) continue; throw new SocketException(SocketException::CANT_CONNECT,socket_strerror(socket_last_error($socket))); } } if ($result === FALSE) throw new SocketException(SocketException::CANT_CONNECT,socket_strerror(socket_last_error($socket))); return new self($socket,TRUE); } /** * We'll add to our transmit buffer and if doesnt have space, we'll empty it first * * @param string $data * @return void * @throws \Exception */ public function buffer_add(string $data): void { $ptr = 0; $num_bytes = strlen($data); while ($num_bytes) { if (self::DEBUG) Log::debug(sprintf('%s:- To add [%d] to the TX buffer',self::LOGKEY,$num_bytes)); if ($num_bytes > $this->tx_free) { if (self::DEBUG) Log::debug(sprintf('%s:- TX buffer will be too full, draining...',self::LOGKEY)); do { $this->buffer_flush(5); $n = min($this->tx_free,$num_bytes); $this->tx_buf = substr($data,$ptr,$n); $num_bytes -= $n; $ptr += $n; } while ($num_bytes); } else { $this->tx_buf .= substr($data,$ptr,$num_bytes); $num_bytes = 0; } } if (self::DEBUG) Log::debug(sprintf('%s:= TX buffer has [%d] space left',self::LOGKEY,$this->tx_free)); } /** * Empty our TX buffer * * @param int $timeout * @return int * @throws \Exception */ public function buffer_flush(int $timeout): int { if (self::DEBUG) Log::debug(sprintf('%s:+ Emptying TX buffer with [%d] chars, and timeout [%d]',self::LOGKEY,strlen($this->tx_buf),$timeout)); $tm = $this->timer_set($timeout); $rc = self::OK; while (strlen($this->tx_buf)) { $tv = $this->timer_rest($tm); if (($rc=$this->canSend($tv)) > 0) { if (self::DEBUG) Log::debug(sprintf('%s:- Chars to send [%d]',self::LOGKEY,strlen($this->tx_buf))); $sent = $this->send(substr($this->tx_buf,0,self::TX_SIZE),0); if (self::DEBUG) Log::debug(sprintf('%s:- Sent [%d] chars [%s]',self::LOGKEY,$sent,Str::limit($this->tx_buf,15))); $this->tx_buf = substr($this->tx_buf,$sent); } else { return $rc; } // @todo Enable a delay for slow clients //sleep(1); if ($this->timer_expired($tm)) return self::ERROR; } $this->tx_purge(); return $rc; } /** * @param int $timeout * @return int * @throws \Exception */ public function canSend(int $timeout): int { $write = [$this->connection]; return $this->socketSelect(NULL,$write,NULL,$timeout); } /** * Close the connection with the client */ public function close(): void { try { socket_shutdown($this->connection); } catch (\ErrorException $e) { Log::error(sprintf('%s:! Shutting down socket [%s]',self::LOGKEY,$e->getMessage())); } try { socket_close($this->connection); } catch (\ErrorException $e) { Log::error(sprintf('%s:! Closing socket [%s]',self::LOGKEY,$e->getMessage())); } Log::debug(sprintf('%s:= Connection closed with [%s]',self::LOGKEY,$this->address_remote)); } /** * We have data in the buffer or on the socket * * @param int $timeout * @return int * @throws \Exception */ public function hasData(int $timeout): int { $read = [$this->connection]; return $this->rx_left ?: $this->socketSelect($read,NULL,NULL,$timeout); } /** * Read data, emptying from the RX buffer first, then checking the socket. * * @param int $timeout How long to wait for data * @param int $len The amount of data we want * @return string|null * @throws SocketException */ public function read(int $timeout,int $len=1024): ?string { // We have data in our buffer if ($this->rx_left >= $len) { if (self::DEBUG) Log::debug(sprintf('%s:- Returning [%d] chars from the RX buffer',self::LOGKEY,$len)); $result = substr($this->rx_buf,0,$len); $this->rx_buf = substr($this->rx_buf,strlen($result)); return $result; } if ($timeout AND ($this->hasData($timeout) === 0)) return NULL; $buf = ''; try { switch ($this->type) { case SOCK_STREAM: $recv = socket_recv($this->connection,$buf,self::RX_SIZE,MSG_DONTWAIT); break; case SOCK_DGRAM: $recv = socket_recvfrom($this->connection,$buf,self::RX_SIZE,MSG_DONTWAIT,$this->address_remote,$this->port_remote); break; default: throw new SocketException(SocketException::SOCKET_ERROR,sprintf('Unhandled socket type: %s',$this->type)); } } catch (\Exception $e) { Log::error(sprintf('%s:! socket_recv Exception [%s]',self::LOGKEY,$e->getMessage())); throw new SocketException($x=socket_last_error($this->connection),socket_strerror($x)); } // If we got no data, we'll send whatever is left in the buffer if ($recv === FALSE) { // If we have something in the buffer, we'll send it if ($this->rx_left) { if (self::DEBUG) Log::debug(sprintf('%s:- Network read return an error, returning final [%d] chars from the RX buffer',self::LOGKEY,strlen($this->rx_buf))); $result = $this->rx_buf; $this->rx_buf = ''; return $result; } Log::debug(sprintf('%s:! Request to read [%d] chars resulted in no data',self::LOGKEY,$len)); throw new SocketException($x=socket_last_error($this->connection),socket_strerror($x)); } // If our buffer is null, see if we have any out of band data. // @todo We throw an errorexception when the socket is closed by the remote I think. if (($recv === 0) && is_null($buf) && ($this->hasData(0) > 0) && $this->type === SOCK_STREAM) { try { socket_recv($this->connection,$buf,$len,MSG_OOB); } catch (\Exception $e) { throw new SocketException($x=socket_last_error($this->connection),socket_strerror($x)); } } $this->rx_buf .= $buf; if (self::DEBUG) Log::debug(sprintf('%s:- Added [%d] chars to the RX buffer',self::LOGKEY,strlen($buf)),['rx_buf'=>hex_dump($this->rx_buf)]); // Loop again and return the data, now that it is in the RX buffer return $this->read($timeout,$len); } /** * Read a character from the remote. * We'll buffer everything received * * @param int $timeout * @return int * @throws \Exception */ public function read_ch(int $timeout): int { if ($this->hasData($timeout) > 0) { $ch = $this->read($timeout,1); } else { return self::TIMEOUT; } return ord($ch); } public function rx_purge(): void { if (self::DEBUG) Log::debug(sprintf('%s:+ Discarding [%d] chars from the RX buffer',self::LOGKEY,strlen($this->tx_buf))); $this->rx_buf = ''; } /** * Clear our TX buffer */ public function tx_purge(): void { if (self::DEBUG) Log::debug(sprintf('%s:+ Discarding [%d] chars from the TX buffer',self::LOGKEY,strlen($this->tx_buf))); $this->tx_buf = ''; } /** * Send data to the client * * @param string $message * @param int $timeout * @return int|false * @throws \Exception */ public function send(string $message,int $timeout): int|false { if ($timeout AND (! $rc=$this->canSend($timeout))) return $rc; if (self::DEBUG) Log::debug(sprintf('%s:- Sending [%d] chars [%s]',self::LOGKEY,strlen($message),Str::limit($message,15))); switch ($this->type) { case SOCK_STREAM: return socket_write($this->connection,$message,strlen($message)); case SOCK_DGRAM: return socket_sendto($this->connection,$message,strlen($message),0,$this->address_remote,$this->port_remote); default: throw new SocketException(SocketException::SOCKET_ERROR,sprintf('Unhandled socket type: %s',$this->type)); } } /** * Wait for data on a socket * * @param array|null $read * @param array|null $write * @param array|null $except * @param int $timeout * @return int * @throws \Exception */ private function socketSelect(?array $read,?array $write,?array $except,int $timeout): int { $rc = socket_select($read,$write,$except,$timeout); if ($rc === FALSE) throw new \Exception('Socket Error: '.socket_strerror(socket_last_error())); if (self::DEBUG) Log::debug(sprintf('%s:= Socket select returned [%d] with timeout (%d)',self::LOGKEY,$rc,$timeout),['read'=>$read,'write'=>$write,'except'=>$except]); return $rc; } public function timer_expired(int $timer): int { return (time() >= $timer); } public function timer_rest(int $timer): int { return $timer-time(); } public function timer_set(int $expire): int { return time()+$expire; } /** * See if there is data waiting to collect, or if we can send * * @param bool $read * @param bool $write * @param int $timeout * @return int * @throws \Exception * @deprecated use canSend or hasData */ public function ttySelect(bool $read,bool $write, int $timeout): int { if ($this->rx_left) { if (self::DEBUG) Log::debug(sprintf('%s:= We still have [%d] chars in the RX buffer.',self::LOGKEY,$this->rx_left)); return 1; } $read = $read ? [$this->connection] : NULL; $write = $write ? [$this->connection] : NULL; return $this->socketSelect($read,$write,NULL,$timeout); } }