<?php namespace App\Classes\File; use Carbon\Carbon; use Illuminate\Support\Arr; use Illuminate\Support\Facades\Log; use Symfony\Component\HttpFoundation\File\Exception\FileException; use App\Classes\Protocol; use App\Exceptions\FileGrewException; use App\Jobs\{PacketProcess,TicProcess}; use App\Models\Address; /** * Object representing the files we are receiving * * @property-read resource $fd * @property-read int total_recv * @property-read int total_recv_bytes */ class Receive extends Base { private const LOGKEY = 'IR-'; private const compression = [ 'BZ2', 'GZ', ]; private Address $ao; /** @var ?string The compression used by the incoming file */ private ?string $comp; /** @var string|null The compressed data received */ private ?string $comp_data; private $queue = FALSE; public function __construct() { // Initialise our variables if (get_class($this) === self::class) { $this->list = collect(); $this->f = NULL; } } public function __get($key) { switch ($key) { case 'completed': return $this->list ->filter(function($item) { return $item->complete === TRUE; }); case 'fd': return is_resource($this->f); case 'recvmtime': case 'recvsize': case 'mtime': case 'nameas': case 'size': case 'name_size_time': case 'pref_name': return $this->receiving->{$key}; case 'pos': return $this->pos; case 'receiving': return $this->list->get($this->index); case 'ready': return (! is_null($this->index)); case 'togo_count': return $this->list ->filter(function($item) { return $item->complete === FALSE; }) ->count(); case 'total_recv': return $this->completed->count(); case 'total_recv_bytes': return $this->completed->sum(function($item) { return $item->recvsize; }); default: throw new \Exception('Unknown key: '.$key); } } /** * Close the file descriptor for our incoming file * * @throws \Exception */ public function close(): void { if (! $this->receiving) throw new \Exception('No file to close'); if ($this->f) { $rcvd_time = Carbon::now(); if ($this->pos !== $this->receiving->recvsize) Log::warning(sprintf('%s:- Closing [%s], but missing [%d] bytes',self::LOGKEY,$this->receiving->nameas,$this->receiving->recvsize-$this->pos)); else $this->receiving->complete = TRUE; $end = time()-$this->start; Log::debug(sprintf('%s:- Closing [%s], received in [%d]',self::LOGKEY,$this->receiving->nameas,$end)); if ($this->comp) Log::info(sprintf('%s:= Compressed file using [%s] was [%d] bytes (%d). Compression rate [%3.2f%%]',self::LOGKEY,$this->comp,$x=strlen($this->comp_data),$this->receiving->recvsize,$x/$this->receiving->recvsize*100)); fclose($this->f); // Set our mtime Log::debug(sprintf('%s:= Setting file [%s] to time [%s]',self::LOGKEY,$this->receiving->full_name,$this->receiving->recvmtime)); touch($this->receiving->full_name,$this->receiving->recvmtime); $this->f = NULL; // If we received a packet, we'll dispatch a job to process it, if we got it all if ($this->receiving->complete) switch ($this->receiving->whatType()) { case self::IS_ARC: case self::IS_PKT: try { // If packet is greater than a size, lets queue it if ($this->queue || ($this->receiving->size > config('fido.queue_size',0))) { Log::info(sprintf('%s:- Packet [%s] will be sent to the queue for processing because its [%d] size, or queue forced',self::LOGKEY,$this->receiving->full_name,$this->receiving->size)); PacketProcess::dispatch($this->receiving,$this->ao,$rcvd_time); } else PacketProcess::dispatchSync($this->receiving,$this->ao,$rcvd_time); } catch (\Exception $e) { Log::error(sprintf('%s:! Got error dispatching packet [%s] (%d:%s-%s).',self::LOGKEY,$this->receiving->full_name,$e->getLine(),$e->getFile(),$e->getMessage())); } break; case self::IS_TIC: Log::info(sprintf('%s:- Processing TIC file [%s]',self::LOGKEY,$this->receiving->nameas)); // Queue the tic to be processed later, in case the referenced file hasnt been received yet TicProcess::dispatch($this->receiving->pref_name)->delay(60); break; default: Log::debug(sprintf('%s:- Leaving file [%s] in the inbound dir',self::LOGKEY,$this->receiving->nameas)); } } $this->index = NULL; } /** * Add a new file to receive * * @param array $file The name of the receiving file * @param Address $ao Sender sending a file to us * @param bool $queue Force sending received items to the queue * @throws \Exception */ public function new(array $file,Address $ao,$queue=FALSE): void { Log::debug(sprintf('%s:+ Receiving new file [%s]',self::LOGKEY,join('|',$file))); if ($this->index) throw new \Exception('Can only have 1 file receiving at a time'); $this->ao = $ao; $this->queue = $queue; $this->list->push(new Item($ao,Arr::get($file,'name'),(int)Arr::get($file,'mtime'),(int)Arr::get($file,'size'))); $this->index = $this->list->count()-1; } /** * Open the file descriptor to receive a file * * @param bool $check * @param string|null $comp If the incoming file will be compressed * @return int * @throws \Exception * @todo $comp should be parsed, in case it contains other items */ public function open(bool $check=FALSE,string $comp=NULL): int { // @todo implement return 4 - SUSPEND(?) file if (is_null($this->index)) throw new \Exception('No files currently receiving'); $this->comp_data = ''; $this->comp = $comp; /* if ($this->receiving->size <= self::MAX_COMPSIZE) { $this->comp = $comp; } else { Log::alert(sprintf('%s:- Compression [%s] disabled for file [%s], its size is too big [%d]',self::LOGKEY,$comp,$this->receiving->name,$this->receiving->size)); } */ if ($this->comp && (! in_array($this->comp,self::compression))) throw new \Exception('Unsupported compression:'.$this->comp); elseif ($this->comp) Log::debug(sprintf('%s:- Receiving file with [%s] compression',self::LOGKEY,$this->comp)); $this->pos = 0; $this->start = time(); if ($this->receiving->exists && $this->receiving->match_mtime && $this->receiving->match_size) { Log::alert(sprintf('%s:- File already exists - skipping [%s]', self::LOGKEY,$this->receiving->nameas)); return Protocol::FOP_GOT; } elseif ($this->receiving->exists && $this->receiving->size > 0) { Log::alert(sprintf('%s:- File exists with different details - skipping [%s] (size: %d, mtime: %d)',self::LOGKEY,$this->receiving->nameas,$this->receiving->size,$this->receiving->mtime)); return Protocol::FOP_SKIP; } else { // @todo I dont think we are enabling resumable sessions - need to check Log::debug(sprintf('%s:- Opening [%s]',self::LOGKEY,$this->receiving->nameas)); } // If we are only checking, we'll return (NR mode) if ($check) return Protocol::FOP_OK; $this->f = fopen($this->receiving->full_name,'wb'); if (! $this->f) { Log::error(sprintf('%s:! Unable to open file [%s] for writing',self::LOGKEY,$this->receiving->nameas)); return Protocol::FOP_ERROR; } Log::info(sprintf('%s:= File [%s] opened for writing',self::LOGKEY,$this->receiving->nameas)); return Protocol::FOP_OK; } /** * Write data to the file we are receiving * * @param string $buf * @return bool * @throws \Exception */ public function write(string $buf): bool { if (! $this->fd) throw new \Exception('No file open for write'); $data = ''; // If we are using compression mode, then we need to buffer the right until we have everything if ($this->comp) { $this->comp_data .= $buf; // See if we can uncompress the data yet switch ($this->comp) { case 'BZ2': if (($data=bzdecompress($this->comp_data,TRUE)) === FALSE) throw new FileException('BZ2 decompression failed?'); elseif (is_numeric($data)) throw new FileException(sprintf('BZ2 decompression failed with (:%d)?',$data)); break; case 'GZ': if (($data=gzdeflate($this->comp_data)) === FALSE) throw new FileException('BZ2 decompression failed?'); break; } // Compressed file grew if (! strlen($data)) { if (strlen($this->comp_data) > $this->receiving->size) { fclose($this->f); $this->f = NULL; throw new FileGrewException(sprintf('Error compressed file grew, rejecting [%d] -> [%d]', strlen($this->comp_data), $this->receiving->size)); } return TRUE; } } else { $data = $buf; } if (($x=$this->pos+strlen($data)) > $this->receiving->recvsize) throw new \Exception(sprintf('Too many bytes received [%d] (%d)?',$x,$this->receiving->recvsize)); $rc = fwrite($this->f,$data); if ($rc === FALSE) throw new FileException('Error while writing to file'); $this->pos += $rc; Log::debug(sprintf('%s:- Write [%d] bytes, file pos now [%d] of [%d] (%d)',self::LOGKEY,$rc,$this->pos,$this->receiving->recvsize,strlen($buf))); if (strlen($this->comp_data) > $this->receiving->recvsize) Log::alert(sprintf('%s:- Compression grew the file during transfer (%d->%d)',self::LOGKEY,$this->receiving->recvsize,strlen($this->comp_data))); return TRUE; } }