2021-04-01 10:59:15 +00:00
< ? php
namespace App\Classes\File ;
2022-11-13 13:29:55 +00:00
use Illuminate\Support\Arr ;
2021-04-01 10:59:15 +00:00
use Illuminate\Support\Collection ;
use Illuminate\Support\Facades\Log ;
2023-07-02 13:40:08 +00:00
use Illuminate\Support\Facades\Storage ;
2021-04-01 10:59:15 +00:00
use Symfony\Component\HttpFoundation\File\Exception\FileException ;
2023-07-02 13:40:08 +00:00
use App\Classes\ { File , Protocol };
2022-11-13 13:29:55 +00:00
use App\Classes\FTN\ { InvalidPacketException , Packet };
2023-07-02 13:40:08 +00:00
use App\Exceptions\FileGrewException ;
2022-11-01 11:24:36 +00:00
use App\Jobs\ { MessageProcess , TicProcess };
2021-07-18 12:10:21 +00:00
use App\Models\Address ;
2021-04-01 10:59:15 +00:00
/**
* Object representing the files we are receiving
*
* @ property - read resource $fd
* @ property - read int total_recv
* @ property - read int total_recv_bytes
*/
final class Receive extends Item
{
2021-08-17 13:49:39 +00:00
private const LOGKEY = 'IR-' ;
2023-07-02 13:40:08 +00:00
private const compression = [
'BZ2' ,
'GZ' ,
];
2021-07-18 12:10:21 +00:00
private Address $ao ;
2021-04-01 10:59:15 +00:00
private Collection $list ;
private ? Item $receiving ;
2021-07-18 12:10:21 +00:00
private string $file ; // Local filename for file received
2023-07-02 13:40:08 +00:00
/** @var ?string The compression used by the incoming file */
private ? string $comp ;
/** @var string|null The compressed data received */
private ? string $comp_data ;
2021-04-01 10:59:15 +00:00
public function __construct ()
{
// Initialise our variables
$this -> list = collect ();
$this -> receiving = NULL ;
}
public function __get ( $key )
{
switch ( $key ) {
case 'fd' :
return is_resource ( $this -> f );
case 'filepos' :
return $this -> file_pos ;
case 'mtime' :
case 'name' :
case 'size' :
2022-11-13 13:29:55 +00:00
return $this -> receiving ? -> { 'file_' . $key };
2021-04-01 10:59:15 +00:00
2021-08-15 14:41:43 +00:00
case 'name_size_time' :
return sprintf ( '%s %lu %lu' , $this -> name , $this -> size , $this -> mtime );
2021-04-01 10:59:15 +00:00
case 'to_get' :
return $this -> list
-> filter ( function ( $item ) { return ( $item -> action & self :: I_RECV ) && $item -> received === FALSE ; })
-> count ();
case 'total_recv' :
return $this -> list
-> filter ( function ( $item ) { return ( $item -> action & self :: I_RECV ) && $item -> received === TRUE ; })
-> count ();
case 'total_recv_bytes' :
return $this -> list
-> filter ( function ( $item ) { return ( $item -> action & self :: I_RECV ) && $item -> received === TRUE ; })
2023-07-02 13:40:08 +00:00
-> sum ( function ( $item ) { return $item -> size ; });
2021-04-01 10:59:15 +00:00
default :
2023-06-27 07:39:11 +00:00
throw new \Exception ( 'Unknown key: ' . $key );
2021-04-01 10:59:15 +00:00
}
}
/**
* Close the file descriptor for our incoming file
*
2023-06-27 07:39:11 +00:00
* @ throws \Exception
2021-04-01 10:59:15 +00:00
*/
public function close () : void
{
2023-07-02 13:40:08 +00:00
if ( ! $this -> receiving )
2023-06-27 07:39:11 +00:00
throw new \Exception ( 'No file to close' );
2021-04-01 10:59:15 +00:00
2023-07-02 13:40:08 +00:00
if ( $this -> f ) {
if ( $this -> file_pos !== $this -> receiving -> size ) {
Log :: warning ( sprintf ( '%s:- Closing [%s], but missing [%d] bytes' , self :: LOGKEY , $this -> receiving -> name , $this -> receiving -> size - $this -> file_pos ));
$this -> receiving -> incomplete = TRUE ;
}
2021-04-01 10:59:15 +00:00
2023-07-02 13:40:08 +00:00
$this -> receiving -> received = TRUE ;
2021-04-01 10:59:15 +00:00
2023-07-02 13:40:08 +00:00
$end = time () - $this -> start ;
Log :: debug ( sprintf ( '%s:- Closing [%s], received in [%d]' , self :: LOGKEY , $this -> receiving -> name , $end ));
2021-04-01 10:59:15 +00:00
2023-07-02 13:40:08 +00:00
if ( $this -> comp )
Log :: info ( sprintf ( '%s:= Compressed file using [%s] was [%d] bytes (%d). Compression rate [%3.2f%%]' , self :: LOGKEY , $this -> comp , $x = strlen ( $this -> comp_data ), $this -> receiving -> size , $x / $this -> receiving -> size * 100 ));
2021-07-18 12:10:21 +00:00
2023-07-02 13:40:08 +00:00
fclose ( $this -> f );
// Set our mtime
touch ( $this -> file , $this -> mtime );
$this -> file_pos = 0 ;
$this -> f = NULL ;
2021-08-12 11:59:48 +00:00
2023-07-02 13:40:08 +00:00
// If the packet has been received but not the right size, dont process it any more.
2021-07-18 12:10:21 +00:00
2023-07-02 13:40:08 +00:00
// If we received a packet, we'll dispatch a job to process it
if ( ! $this -> receiving -> incomplete )
switch ( $this -> receiving -> type ) {
case self :: IS_ARC :
case self :: IS_PKT :
Log :: info ( sprintf ( '%s:- Processing mail %s [%s]' , self :: LOGKEY , $this -> receiving -> type === self :: IS_PKT ? 'PACKET' : 'ARCHIVE' , $this -> file ));
2022-11-13 13:29:55 +00:00
2023-07-02 13:40:08 +00:00
try {
$f = new File ( $this -> file );
$processed = FALSE ;
2022-11-13 13:29:55 +00:00
2023-07-02 13:40:08 +00:00
foreach ( $f as $packet ) {
$po = Packet :: process ( $packet , Arr :: get ( stream_get_meta_data ( $packet ), 'uri' ), $f -> itemSize (), $this -> ao -> system );
2022-12-04 10:59:06 +00:00
2023-07-02 13:40:08 +00:00
// Check the messages are from the uplink
if ( $this -> ao -> system -> addresses -> search ( function ( $item ) use ( $po ) { return $item -> id === $po -> fftn_o -> id ; }) === FALSE ) {
Log :: error ( sprintf ( '%s:! Packet [%s] is not from this link? [%d]' , self :: LOGKEY , $po -> fftn_o -> ftn , $this -> ao -> system_id ));
2022-11-13 13:29:55 +00:00
2023-07-02 13:40:08 +00:00
break ;
}
2022-12-04 10:59:06 +00:00
2023-07-02 13:40:08 +00:00
// Check the packet password
if ( $this -> ao -> session ( 'pktpass' ) != $po -> password ) {
Log :: error ( sprintf ( '%s:! Packet from [%s] with password [%s] is invalid.' , self :: LOGKEY , $this -> ao -> ftn , $po -> password ));
// @todo Generate message to system advising invalid password - that message should be sent without a packet password!
break ;
}
2022-11-13 13:29:55 +00:00
2023-07-02 13:40:08 +00:00
Log :: info ( sprintf ( '%s:- Packet has [%d] messages' , self :: LOGKEY , $po -> count ()));
2022-11-13 13:29:55 +00:00
2023-07-02 13:40:08 +00:00
// Queue messages if there are too many in the packet.
if ( $queue = ( $po -> count () > config ( 'app.queue_msgs' )))
Log :: info ( sprintf ( '%s:- Messages will be sent to the queue for processing' , self :: LOGKEY ));
2022-11-13 13:29:55 +00:00
2023-07-02 13:40:08 +00:00
$count = 0 ;
foreach ( $po as $msg ) {
Log :: info ( sprintf ( '%s:- Mail from [%s] to [%s]' , self :: LOGKEY , $msg -> fftn , $msg -> tftn ));
2022-11-13 13:29:55 +00:00
2023-07-02 13:40:08 +00:00
// @todo Quick check that the packet should be processed by us.
// @todo validate that the packet's zone is in the domain.
2022-11-13 13:29:55 +00:00
2023-07-02 13:40:08 +00:00
try {
// Dispatch job.
if ( $queue )
MessageProcess :: dispatch ( $msg , $f -> pktName ());
else
MessageProcess :: dispatchSync ( $msg , $f -> pktName ());
2022-11-13 13:29:55 +00:00
2023-07-02 13:40:08 +00:00
} catch ( \Exception $e ) {
Log :: error ( sprintf ( '%s:! Got error dispatching message [%s] (%d:%s-%s).' , self :: LOGKEY , $msg -> msgid , $e -> getLine (), $e -> getFile (), $e -> getMessage ()));
}
$count ++ ;
2022-11-13 13:29:55 +00:00
}
2023-01-24 10:41:08 +00:00
2023-07-02 13:40:08 +00:00
if ( $count === $po -> count ())
$processed = TRUE ;
2022-11-13 13:29:55 +00:00
}
2023-01-24 10:41:08 +00:00
2023-07-02 13:40:08 +00:00
if ( ! $processed ) {
Log :: alert ( sprintf ( '%s:- Not deleting packet [%s], it doesnt seem to be processed?' , self :: LOGKEY , $this -> file ));
2022-11-13 13:29:55 +00:00
2023-07-02 13:40:08 +00:00
// If we want to keep the packet, we could do that logic here
} elseif ( ! config ( 'app.packet_keep' )) {
Log :: debug ( sprintf ( '%s:- Deleting processed packet [%s]' , self :: LOGKEY , $this -> file ));
unlink ( $this -> file );
}
2021-07-30 14:35:52 +00:00
2023-07-02 13:40:08 +00:00
} catch ( InvalidPacketException $e ) {
Log :: error ( sprintf ( '%s:- Not deleting packet [%s], as it generated an InvalidPacketException' , self :: LOGKEY , $this -> file ),[ 'e' => $e -> getMessage ()]);
2023-01-11 02:15:30 +00:00
2023-07-02 13:40:08 +00:00
} catch ( \Exception $e ) {
Log :: error ( sprintf ( '%s:- Not deleting packet [%s], as it generated an uncaught exception' , self :: LOGKEY , $this -> file ),[ 'e' => $e -> getMessage ()]);
}
2021-07-19 14:26:12 +00:00
2023-07-02 13:40:08 +00:00
break ;
2021-07-18 12:10:21 +00:00
2023-07-02 13:40:08 +00:00
case self :: IS_TIC :
Log :: info ( sprintf ( '%s:- Processing TIC file [%s]' , self :: LOGKEY , $this -> file ));
2022-11-01 11:24:36 +00:00
2023-07-02 13:40:08 +00:00
// Queue the tic to be processed later, in case the referenced file hasnt been received yet
TicProcess :: dispatch ( $this -> file );
2022-11-01 11:24:36 +00:00
2023-07-02 13:40:08 +00:00
break ;
2022-11-01 11:24:36 +00:00
2023-07-02 13:40:08 +00:00
default :
Log :: debug ( sprintf ( '%s:- Leaving file [%s] in the inbound dir' , self :: LOGKEY , $this -> file ));
}
}
2021-07-18 12:10:21 +00:00
$this -> receiving = NULL ;
2021-04-01 10:59:15 +00:00
}
/**
* Open the file descriptor to receive a file
*
2021-08-18 14:20:34 +00:00
* @ param Address $ao
2021-04-01 10:59:15 +00:00
* @ param bool $check
2023-07-02 13:40:08 +00:00
* @ param string | null $comp If the incoming file will be compressed
* @ return int
2023-06-27 07:39:11 +00:00
* @ throws \Exception
2023-07-02 13:40:08 +00:00
* @ todo $comp should be parsed , in case it contains other items
2021-04-01 10:59:15 +00:00
*/
2023-07-02 13:40:08 +00:00
public function open ( Address $ao , bool $check = FALSE , string $comp = NULL ) : int
2021-04-01 10:59:15 +00:00
{
// @todo implement return 4 - SUSPEND(?) file
2023-07-02 13:40:08 +00:00
// @todo Change to use Storage::class()
2021-04-01 10:59:15 +00:00
if ( ! $this -> receiving )
2023-06-27 07:39:11 +00:00
throw new \Exception ( 'No files currently receiving' );
2021-04-01 10:59:15 +00:00
2023-07-02 13:40:08 +00:00
$this -> comp_data = '' ;
$this -> comp = $comp ;
/*
if ( $this -> receiving -> size <= self :: MAX_COMPSIZE ) {
$this -> comp = $comp ;
} else {
Log :: alert ( sprintf ( '%s:- Compression [%s] disabled for file [%s], its size is too big [%d]' , self :: LOGKEY , $comp , $this -> receiving -> name , $this -> receiving -> size ));
}
*/
if ( $this -> comp && ( ! in_array ( $this -> comp , self :: compression )))
throw new \Exception ( 'Unsupported compression:' . $this -> comp );
elseif ( $this -> comp )
Log :: debug ( sprintf ( '%s:- Receiving file with [%s] compression' , self :: LOGKEY , $this -> comp ));
2021-07-18 12:10:21 +00:00
$this -> ao = $ao ;
2021-04-01 10:59:15 +00:00
$this -> file_pos = 0 ;
$this -> start = time ();
2023-07-02 13:40:08 +00:00
$this -> file = sprintf ( 'storage/app/%s' , $this -> local_path ( $ao ));
if ( file_exists ( $this -> file )
&& ( Storage :: disk ( 'local' ) -> lastModified ( $this -> local_path ( $ao )) === $this -> mtime )
2023-07-08 08:00:23 +00:00
&& ( Storage :: disk ( 'local' ) -> size ( $this -> local_path ( $ao )) === $this -> size ))
{
2023-07-02 13:40:08 +00:00
Log :: alert ( sprintf ( '%s:- File already exists - skipping [%s]' , self :: LOGKEY , $this -> file ));
2023-07-11 07:22:31 +00:00
return Protocol :: FOP_GOT ;
2023-07-02 13:40:08 +00:00
} elseif ( file_exists ( $this -> file ) && ( Storage :: disk ( 'local' ) -> size ( $this -> local_path ( $ao )) > 0 )) {
Log :: alert ( sprintf ( '%s:- File exists with different details - skipping [%s]' , self :: LOGKEY , $this -> file ));
2023-07-11 07:22:31 +00:00
return Protocol :: FOP_SKIP ;
2023-07-02 13:40:08 +00:00
} else {
2023-07-09 01:18:57 +00:00
// @todo I dont think we are enabling resumable sessions - need to check
2023-07-02 13:40:08 +00:00
Log :: debug ( sprintf ( '%s:- Opening [%s]' , self :: LOGKEY , $this -> file ));
}
// If we are only checking, we'll return (NR mode)
if ( $check )
return Protocol :: FOP_OK ;
2021-04-01 10:59:15 +00:00
2021-07-18 12:10:21 +00:00
$this -> f = fopen ( $this -> file , 'wb' );
2021-04-01 10:59:15 +00:00
if ( ! $this -> f ) {
2023-07-02 13:40:08 +00:00
Log :: error ( sprintf ( '%s:! Unable to open file [%s] for writing' , self :: LOGKEY , $this -> receiving -> name ));
return Protocol :: FOP_ERROR ;
2021-04-01 10:59:15 +00:00
}
2023-07-02 13:40:08 +00:00
Log :: info ( sprintf ( '%s:= open - File [%s] opened for writing' , self :: LOGKEY , $this -> receiving -> name ));
return Protocol :: FOP_OK ;
2021-04-01 10:59:15 +00:00
}
/**
* Add a new file to receive
*
* @ param array $file
2023-06-27 07:39:11 +00:00
* @ throws \Exception
2021-04-01 10:59:15 +00:00
*/
public function new ( array $file ) : void
{
2021-08-17 13:49:39 +00:00
Log :: debug ( sprintf ( '%s:+ new [%s]' , self :: LOGKEY , join ( '|' , $file )));
2021-04-01 10:59:15 +00:00
if ( $this -> receiving )
2023-06-27 07:39:11 +00:00
throw new \Exception ( 'Can only have 1 file receiving at a time' );
2021-04-01 10:59:15 +00:00
$o = new Item ( $file , self :: I_RECV );
$this -> list -> push ( $o );
$this -> receiving = $o ;
}
2023-07-02 13:40:08 +00:00
private function local_path ( Address $ao ) : string
{
return sprintf ( '%s/%04X-%s' , config ( 'app.fido' ), $ao -> id , $this -> receiving -> recvas );
}
2021-04-01 10:59:15 +00:00
/**
* Write data to the file we are receiving
*
* @ param string $buf
2023-07-02 13:40:08 +00:00
* @ return bool
2023-06-27 07:39:11 +00:00
* @ throws \Exception
2021-04-01 10:59:15 +00:00
*/
2023-07-02 13:40:08 +00:00
public function write ( string $buf ) : bool
2021-04-01 10:59:15 +00:00
{
if ( ! $this -> f )
2023-07-02 13:40:08 +00:00
throw new \Exception ( 'No file open for write' );
$data = '' ;
2021-04-01 10:59:15 +00:00
2023-07-02 13:40:08 +00:00
// If we are using compression mode, then we need to buffer the right until we have everything
if ( $this -> comp ) {
$this -> comp_data .= $buf ;
2021-04-01 10:59:15 +00:00
2023-07-02 13:40:08 +00:00
// See if we can uncompress the data yet
switch ( $this -> comp ) {
case 'BZ2' :
if (( $data = bzdecompress ( $this -> comp_data , TRUE )) === FALSE )
throw new FileException ( 'BZ2 decompression failed?' );
elseif ( is_numeric ( $data ))
throw new FileException ( sprintf ( 'BZ2 decompression failed with (:%d)?' , $data ));
break ;
case 'GZ' :
if (( $data = gzdeflate ( $this -> comp_data )) === FALSE )
throw new FileException ( 'BZ2 decompression failed?' );
break ;
}
// Compressed file grew
if ( ! strlen ( $data )) {
if ( strlen ( $this -> comp_data ) > $this -> receiving -> size ) {
fclose ( $this -> f );
$this -> f = NULL ;
throw new FileGrewException ( sprintf ( 'Error compressed file grew, rejecting [%d] -> [%d]' , strlen ( $this -> comp_data ), $this -> receiving -> size ));
}
return TRUE ;
}
} else {
$data = $buf ;
}
if ( $this -> file_pos + strlen ( $data ) > $this -> receiving -> size )
throw new \Exception ( sprintf ( 'Too many bytes received [%d] (%d)?' , $this -> file_pos + strlen ( $buf ), $this -> receiving -> size ));
$rc = fwrite ( $this -> f , $data );
2021-04-01 10:59:15 +00:00
if ( $rc === FALSE )
throw new FileException ( 'Error while writing to file' );
$this -> file_pos += $rc ;
2023-07-02 13:40:08 +00:00
Log :: debug ( sprintf ( '%s:- Write [%d] bytes, file pos now [%d] of [%d] (%d)' , self :: LOGKEY , $rc , $this -> file_pos , $this -> receiving -> size , strlen ( $buf )));
if ( strlen ( $this -> comp_data ) > $this -> receiving -> size )
Log :: alert ( sprintf ( '%s:- Compression grew the file during transfer (%d->%d)' , self :: LOGKEY , $this -> receiving -> size , strlen ( $this -> comp_data )));
2021-04-01 10:59:15 +00:00
2023-07-02 13:40:08 +00:00
return TRUE ;
2021-04-01 10:59:15 +00:00
}
}