2021-04-01 21:59:15 +11:00
< ? php
namespace App\Classes\File ;
2022-11-14 00:29:55 +11:00
use Illuminate\Support\Arr ;
2021-04-01 21:59:15 +11:00
use Illuminate\Support\Collection ;
use Illuminate\Support\Facades\Log ;
2023-07-02 23:40:08 +10:00
use Illuminate\Support\Facades\Storage ;
2021-04-01 21:59:15 +11:00
use Symfony\Component\HttpFoundation\File\Exception\FileException ;
2023-07-02 23:40:08 +10:00
use App\Classes\ { File , Protocol };
2022-11-14 00:29:55 +11:00
use App\Classes\FTN\ { InvalidPacketException , Packet };
2023-07-02 23:40:08 +10:00
use App\Exceptions\FileGrewException ;
2022-11-01 22:24:36 +11:00
use App\Jobs\ { MessageProcess , TicProcess };
2021-07-18 22:10:21 +10:00
use App\Models\Address ;
2021-04-01 21:59:15 +11:00
/**
* Object representing the files we are receiving
*
* @ property - read resource $fd
* @ property - read int total_recv
* @ property - read int total_recv_bytes
*/
2023-07-17 16:36:53 +10:00
class Receive extends Base
2021-04-01 21:59:15 +11:00
{
2021-08-17 23:49:39 +10:00
private const LOGKEY = 'IR-' ;
2023-07-02 23:40:08 +10:00
private const compression = [
'BZ2' ,
'GZ' ,
];
2021-07-18 22:10:21 +10:00
private Address $ao ;
2021-04-01 21:59:15 +11:00
2023-07-02 23:40:08 +10:00
/** @var ?string The compression used by the incoming file */
private ? string $comp ;
/** @var string|null The compressed data received */
private ? string $comp_data ;
2021-04-01 21:59:15 +11:00
public function __construct ()
{
// Initialise our variables
2023-07-17 16:36:53 +10:00
if ( get_class ( $this ) === self :: class ) {
$this -> list = collect ();
$this -> f = NULL ;
}
2021-04-01 21:59:15 +11:00
}
public function __get ( $key )
{
switch ( $key ) {
2023-07-17 16:36:53 +10:00
case 'completed' :
return $this -> list
-> filter ( function ( $item ) { return $item -> complete === TRUE ; });
2021-04-01 21:59:15 +11:00
case 'fd' :
return is_resource ( $this -> f );
2023-07-19 16:34:09 +10:00
case 'recvmtime' :
case 'recvsize' :
2021-04-01 21:59:15 +11:00
case 'mtime' :
2023-07-17 16:36:53 +10:00
case 'nameas' :
2021-04-01 21:59:15 +11:00
case 'size' :
2021-08-16 00:41:43 +10:00
case 'name_size_time' :
2023-07-17 16:36:53 +10:00
return $this -> receiving -> { $key };
case 'pos' :
return $this -> pos ;
2021-08-16 00:41:43 +10:00
2023-07-17 16:36:53 +10:00
case 'receiving' :
return $this -> list -> get ( $this -> index );
case 'ready' :
return ( ! is_null ( $this -> index ));
case 'togo_count' :
2021-04-01 21:59:15 +11:00
return $this -> list
2023-07-17 16:36:53 +10:00
-> filter ( function ( $item ) { return $item -> complete === FALSE ; })
2021-04-01 21:59:15 +11:00
-> count ();
case 'total_recv' :
2023-07-17 16:36:53 +10:00
return $this -> completed -> count ();
2021-04-01 21:59:15 +11:00
case 'total_recv_bytes' :
2023-07-19 10:49:57 +10:00
return $this -> completed -> sum ( function ( $item ) { return $item -> recvsize ; });
2021-04-01 21:59:15 +11:00
default :
2023-06-27 19:39:11 +12:00
throw new \Exception ( 'Unknown key: ' . $key );
2021-04-01 21:59:15 +11:00
}
}
/**
* Close the file descriptor for our incoming file
*
2023-06-27 19:39:11 +12:00
* @ throws \Exception
2021-04-01 21:59:15 +11:00
*/
public function close () : void
{
2023-07-02 23:40:08 +10:00
if ( ! $this -> receiving )
2023-06-27 19:39:11 +12:00
throw new \Exception ( 'No file to close' );
2021-04-01 21:59:15 +11:00
2023-07-02 23:40:08 +10:00
if ( $this -> f ) {
2023-07-17 16:36:53 +10:00
if ( $this -> pos !== $this -> receiving -> recvsize )
Log :: warning ( sprintf ( '%s:- Closing [%s], but missing [%d] bytes' , self :: LOGKEY , $this -> receiving -> nameas , $this -> receiving -> recvsize - $this -> pos ));
else
$this -> receiving -> complete = TRUE ;
2021-04-01 21:59:15 +11:00
2023-07-02 23:40:08 +10:00
$end = time () - $this -> start ;
2023-07-17 16:36:53 +10:00
Log :: debug ( sprintf ( '%s:- Closing [%s], received in [%d]' , self :: LOGKEY , $this -> receiving -> nameas , $end ));
2021-04-01 21:59:15 +11:00
2023-07-02 23:40:08 +10:00
if ( $this -> comp )
2023-07-17 16:36:53 +10:00
Log :: info ( sprintf ( '%s:= Compressed file using [%s] was [%d] bytes (%d). Compression rate [%3.2f%%]' , self :: LOGKEY , $this -> comp , $x = strlen ( $this -> comp_data ), $this -> receiving -> recvsize , $x / $this -> receiving -> recvsize * 100 ));
2021-07-18 22:10:21 +10:00
2023-07-02 23:40:08 +10:00
fclose ( $this -> f );
// Set our mtime
2023-07-17 16:36:53 +10:00
touch ( $this -> receiving -> full_name , $this -> receiving -> mtime );
2023-07-02 23:40:08 +10:00
$this -> f = NULL ;
2021-08-12 21:59:48 +10:00
2023-07-17 16:36:53 +10:00
// If we received a packet, we'll dispatch a job to process it, if we got it all
if ( $this -> receiving -> complete )
switch ( $x = $this -> receiving -> whatType ()) {
2023-07-02 23:40:08 +10:00
case self :: IS_ARC :
case self :: IS_PKT :
2023-07-17 16:36:53 +10:00
Log :: info ( sprintf ( '%s:- Processing mail %s [%s]' , self :: LOGKEY , $x === self :: IS_PKT ? 'PACKET' : 'ARCHIVE' , $this -> receiving -> nameas ));
2022-11-14 00:29:55 +11:00
2023-07-02 23:40:08 +10:00
try {
2023-07-17 16:36:53 +10:00
$f = new File ( $this -> receiving -> full_name );
2023-07-02 23:40:08 +10:00
$processed = FALSE ;
2022-11-14 00:29:55 +11:00
2023-07-02 23:40:08 +10:00
foreach ( $f as $packet ) {
$po = Packet :: process ( $packet , Arr :: get ( stream_get_meta_data ( $packet ), 'uri' ), $f -> itemSize (), $this -> ao -> system );
2022-12-04 21:59:06 +11:00
2023-07-02 23:40:08 +10:00
// Check the messages are from the uplink
if ( $this -> ao -> system -> addresses -> search ( function ( $item ) use ( $po ) { return $item -> id === $po -> fftn_o -> id ; }) === FALSE ) {
Log :: error ( sprintf ( '%s:! Packet [%s] is not from this link? [%d]' , self :: LOGKEY , $po -> fftn_o -> ftn , $this -> ao -> system_id ));
2022-11-14 00:29:55 +11:00
2023-07-02 23:40:08 +10:00
break ;
}
2022-12-04 21:59:06 +11:00
2023-07-02 23:40:08 +10:00
// Check the packet password
if ( $this -> ao -> session ( 'pktpass' ) != $po -> password ) {
Log :: error ( sprintf ( '%s:! Packet from [%s] with password [%s] is invalid.' , self :: LOGKEY , $this -> ao -> ftn , $po -> password ));
// @todo Generate message to system advising invalid password - that message should be sent without a packet password!
break ;
}
2022-11-14 00:29:55 +11:00
2023-07-02 23:40:08 +10:00
Log :: info ( sprintf ( '%s:- Packet has [%d] messages' , self :: LOGKEY , $po -> count ()));
2022-11-14 00:29:55 +11:00
2023-07-02 23:40:08 +10:00
// Queue messages if there are too many in the packet.
if ( $queue = ( $po -> count () > config ( 'app.queue_msgs' )))
Log :: info ( sprintf ( '%s:- Messages will be sent to the queue for processing' , self :: LOGKEY ));
2022-11-14 00:29:55 +11:00
2023-07-02 23:40:08 +10:00
$count = 0 ;
foreach ( $po as $msg ) {
Log :: info ( sprintf ( '%s:- Mail from [%s] to [%s]' , self :: LOGKEY , $msg -> fftn , $msg -> tftn ));
2022-11-14 00:29:55 +11:00
2023-07-02 23:40:08 +10:00
// @todo Quick check that the packet should be processed by us.
// @todo validate that the packet's zone is in the domain.
2022-11-14 00:29:55 +11:00
2023-07-02 23:40:08 +10:00
try {
// Dispatch job.
if ( $queue )
2023-07-15 10:46:19 +10:00
MessageProcess :: dispatch ( $msg , $f -> pktName (), $this -> ao );
2023-07-02 23:40:08 +10:00
else
2023-07-15 10:46:19 +10:00
MessageProcess :: dispatchSync ( $msg , $f -> pktName (), $this -> ao );
2022-11-14 00:29:55 +11:00
2023-07-02 23:40:08 +10:00
} catch ( \Exception $e ) {
Log :: error ( sprintf ( '%s:! Got error dispatching message [%s] (%d:%s-%s).' , self :: LOGKEY , $msg -> msgid , $e -> getLine (), $e -> getFile (), $e -> getMessage ()));
}
$count ++ ;
2022-11-14 00:29:55 +11:00
}
2023-01-24 21:41:08 +11:00
2023-07-02 23:40:08 +10:00
if ( $count === $po -> count ())
$processed = TRUE ;
2022-11-14 00:29:55 +11:00
}
2023-01-24 21:41:08 +11:00
2023-07-02 23:40:08 +10:00
if ( ! $processed ) {
2023-07-17 16:36:53 +10:00
Log :: alert ( sprintf ( '%s:- Not deleting packet [%s], it doesnt seem to be processed?' , self :: LOGKEY , $this -> receiving -> nameas ));
2022-11-14 00:29:55 +11:00
2023-07-02 23:40:08 +10:00
// If we want to keep the packet, we could do that logic here
} elseif ( ! config ( 'app.packet_keep' )) {
2023-07-17 16:36:53 +10:00
Log :: debug ( sprintf ( '%s:- Deleting processed packet [%s]' , self :: LOGKEY , $this -> receiving -> full_name ));
unlink ( $this -> receiving -> full_name );
2023-07-02 23:40:08 +10:00
}
2021-07-31 00:35:52 +10:00
2023-07-02 23:40:08 +10:00
} catch ( InvalidPacketException $e ) {
2023-07-17 16:36:53 +10:00
Log :: error ( sprintf ( '%s:- Not deleting packet [%s], as it generated an InvalidPacketException' , self :: LOGKEY , $this -> receiving -> nameas ),[ 'e' => $e -> getMessage ()]);
2023-01-11 13:15:30 +11:00
2023-07-02 23:40:08 +10:00
} catch ( \Exception $e ) {
2023-07-17 16:36:53 +10:00
Log :: error ( sprintf ( '%s:- Not deleting packet [%s], as it generated an uncaught exception' , self :: LOGKEY , $this -> receiving -> nameas ),[ 'e' => $e -> getMessage ()]);
2023-07-02 23:40:08 +10:00
}
2021-07-20 00:26:12 +10:00
2023-07-02 23:40:08 +10:00
break ;
2021-07-18 22:10:21 +10:00
2023-07-02 23:40:08 +10:00
case self :: IS_TIC :
2023-07-17 16:36:53 +10:00
Log :: info ( sprintf ( '%s:- Processing TIC file [%s]' , self :: LOGKEY , $this -> receiving -> nameas ));
2022-11-01 22:24:36 +11:00
2023-07-02 23:40:08 +10:00
// Queue the tic to be processed later, in case the referenced file hasnt been received yet
2023-07-17 16:36:53 +10:00
TicProcess :: dispatch ( $this -> receiving -> nameas );
2022-11-01 22:24:36 +11:00
2023-07-02 23:40:08 +10:00
break ;
2022-11-01 22:24:36 +11:00
2023-07-02 23:40:08 +10:00
default :
2023-07-17 16:36:53 +10:00
Log :: debug ( sprintf ( '%s:- Leaving file [%s] in the inbound dir' , self :: LOGKEY , $this -> receiving -> nameas ));
2023-07-02 23:40:08 +10:00
}
}
2021-07-18 22:10:21 +10:00
2023-07-17 16:36:53 +10:00
$this -> index = NULL ;
2021-04-01 21:59:15 +11:00
}
/**
2023-07-17 16:36:53 +10:00
* Add a new file to receive
2021-04-01 21:59:15 +11:00
*
2023-07-17 16:36:53 +10:00
* @ param array $file
2021-08-19 00:20:34 +10:00
* @ param Address $ao
2023-07-17 16:36:53 +10:00
* @ throws \Exception
*/
public function new ( array $file , Address $ao ) : void
{
Log :: debug ( sprintf ( '%s:+ Receiving new file [%s]' , self :: LOGKEY , join ( '|' , $file )));
if ( $this -> index )
throw new \Exception ( 'Can only have 1 file receiving at a time' );
$this -> ao = $ao ;
$this -> list -> push ( new Item ( $ao , Arr :: get ( $file , 'name' ),( int ) Arr :: get ( $file , 'mtime' ),( int ) Arr :: get ( $file , 'size' )));
$this -> index = $this -> list -> count () - 1 ;
}
/**
* Open the file descriptor to receive a file
*
2021-04-01 21:59:15 +11:00
* @ param bool $check
2023-07-02 23:40:08 +10:00
* @ param string | null $comp If the incoming file will be compressed
* @ return int
2023-06-27 19:39:11 +12:00
* @ throws \Exception
2023-07-02 23:40:08 +10:00
* @ todo $comp should be parsed , in case it contains other items
2021-04-01 21:59:15 +11:00
*/
2023-07-17 16:36:53 +10:00
public function open ( bool $check = FALSE , string $comp = NULL ) : int
2021-04-01 21:59:15 +11:00
{
// @todo implement return 4 - SUSPEND(?) file
2023-07-17 16:36:53 +10:00
if ( is_null ( $this -> index ))
2023-06-27 19:39:11 +12:00
throw new \Exception ( 'No files currently receiving' );
2021-04-01 21:59:15 +11:00
2023-07-02 23:40:08 +10:00
$this -> comp_data = '' ;
$this -> comp = $comp ;
/*
if ( $this -> receiving -> size <= self :: MAX_COMPSIZE ) {
$this -> comp = $comp ;
} else {
Log :: alert ( sprintf ( '%s:- Compression [%s] disabled for file [%s], its size is too big [%d]' , self :: LOGKEY , $comp , $this -> receiving -> name , $this -> receiving -> size ));
}
*/
if ( $this -> comp && ( ! in_array ( $this -> comp , self :: compression )))
throw new \Exception ( 'Unsupported compression:' . $this -> comp );
elseif ( $this -> comp )
Log :: debug ( sprintf ( '%s:- Receiving file with [%s] compression' , self :: LOGKEY , $this -> comp ));
2023-07-17 16:36:53 +10:00
$this -> pos = 0 ;
2021-04-01 21:59:15 +11:00
$this -> start = time ();
2023-07-02 23:40:08 +10:00
2023-07-17 16:36:53 +10:00
if ( $this -> receiving -> exists && $this -> receiving -> match_mtime && $this -> receiving -> match_size ) {
Log :: alert ( sprintf ( '%s:- File already exists - skipping [%s]' , self :: LOGKEY , $this -> receiving -> nameas ));
2023-07-11 17:22:31 +10:00
return Protocol :: FOP_GOT ;
2023-07-02 23:40:08 +10:00
2023-07-17 16:36:53 +10:00
} elseif ( $this -> receiving -> exists && $this -> receiving -> size > 0 ) {
Log :: alert ( sprintf ( '%s:- File exists with different details - skipping [%s] (size: %d, mtime: %d)' , self :: LOGKEY , $this -> receiving -> nameas , $this -> receiving -> size , $this -> receiving -> mtime ));
2023-07-11 17:22:31 +10:00
return Protocol :: FOP_SKIP ;
2023-07-02 23:40:08 +10:00
} else {
2023-07-09 11:18:57 +10:00
// @todo I dont think we are enabling resumable sessions - need to check
2023-07-17 16:36:53 +10:00
Log :: debug ( sprintf ( '%s:- Opening [%s]' , self :: LOGKEY , $this -> receiving -> nameas ));
2023-07-02 23:40:08 +10:00
}
// If we are only checking, we'll return (NR mode)
if ( $check )
return Protocol :: FOP_OK ;
2021-04-01 21:59:15 +11:00
2023-07-17 16:36:53 +10:00
$this -> f = fopen ( $this -> receiving -> full_name , 'wb' );
2021-04-01 21:59:15 +11:00
if ( ! $this -> f ) {
2023-07-17 16:36:53 +10:00
Log :: error ( sprintf ( '%s:! Unable to open file [%s] for writing' , self :: LOGKEY , $this -> receiving -> nameas ));
2023-07-02 23:40:08 +10:00
return Protocol :: FOP_ERROR ;
2021-04-01 21:59:15 +11:00
}
2023-07-17 16:36:53 +10:00
Log :: info ( sprintf ( '%s:= File [%s] opened for writing' , self :: LOGKEY , $this -> receiving -> nameas ));
2023-07-02 23:40:08 +10:00
return Protocol :: FOP_OK ;
2021-04-01 21:59:15 +11:00
}
/**
* Write data to the file we are receiving
*
* @ param string $buf
2023-07-02 23:40:08 +10:00
* @ return bool
2023-06-27 19:39:11 +12:00
* @ throws \Exception
2021-04-01 21:59:15 +11:00
*/
2023-07-02 23:40:08 +10:00
public function write ( string $buf ) : bool
2021-04-01 21:59:15 +11:00
{
2023-07-17 16:36:53 +10:00
if ( ! $this -> fd )
2023-07-02 23:40:08 +10:00
throw new \Exception ( 'No file open for write' );
$data = '' ;
2021-04-01 21:59:15 +11:00
2023-07-02 23:40:08 +10:00
// If we are using compression mode, then we need to buffer the right until we have everything
if ( $this -> comp ) {
$this -> comp_data .= $buf ;
2021-04-01 21:59:15 +11:00
2023-07-02 23:40:08 +10:00
// See if we can uncompress the data yet
switch ( $this -> comp ) {
case 'BZ2' :
if (( $data = bzdecompress ( $this -> comp_data , TRUE )) === FALSE )
throw new FileException ( 'BZ2 decompression failed?' );
elseif ( is_numeric ( $data ))
throw new FileException ( sprintf ( 'BZ2 decompression failed with (:%d)?' , $data ));
break ;
case 'GZ' :
if (( $data = gzdeflate ( $this -> comp_data )) === FALSE )
throw new FileException ( 'BZ2 decompression failed?' );
break ;
}
// Compressed file grew
if ( ! strlen ( $data )) {
if ( strlen ( $this -> comp_data ) > $this -> receiving -> size ) {
fclose ( $this -> f );
$this -> f = NULL ;
throw new FileGrewException ( sprintf ( 'Error compressed file grew, rejecting [%d] -> [%d]' , strlen ( $this -> comp_data ), $this -> receiving -> size ));
}
return TRUE ;
}
} else {
$data = $buf ;
}
2023-07-17 16:36:53 +10:00
if (( $x = $this -> pos + strlen ( $data )) > $this -> receiving -> recvsize )
throw new \Exception ( sprintf ( 'Too many bytes received [%d] (%d)?' , $x , $this -> receiving -> recvsize ));
2023-07-02 23:40:08 +10:00
$rc = fwrite ( $this -> f , $data );
2021-04-01 21:59:15 +11:00
if ( $rc === FALSE )
throw new FileException ( 'Error while writing to file' );
2023-07-17 16:36:53 +10:00
$this -> pos += $rc ;
Log :: debug ( sprintf ( '%s:- Write [%d] bytes, file pos now [%d] of [%d] (%d)' , self :: LOGKEY , $rc , $this -> pos , $this -> receiving -> recvsize , strlen ( $buf )));
2023-07-02 23:40:08 +10:00
2023-07-17 16:36:53 +10:00
if ( strlen ( $this -> comp_data ) > $this -> receiving -> recvsize )
Log :: alert ( sprintf ( '%s:- Compression grew the file during transfer (%d->%d)' , self :: LOGKEY , $this -> receiving -> recvsize , strlen ( $this -> comp_data )));
2021-04-01 21:59:15 +11:00
2023-07-02 23:40:08 +10:00
return TRUE ;
2021-04-01 21:59:15 +11:00
}
}