<?php

namespace App\Classes\File;

use Carbon\Carbon;
use Illuminate\Support\Arr;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Notification;
use Symfony\Component\HttpFoundation\File\Exception\FileException;

use App\Classes\{File,Protocol};
use App\Classes\FTN\{InvalidPacketException,Packet};
use App\Exceptions\FileGrewException;
use App\Jobs\{MessageProcess,TicProcess};
use App\Models\Address;
use App\Notifications\Netmails\PacketPasswordInvalid;

/**
 * Object representing the files we are receiving
 *
 * Files are registered with new(), opened with open(), filled with write() and finalised with close().
 * Only one file can be in flight at a time; it is tracked by $this->index into $this->list.
 *
 * @property-read bool $fd TRUE when a file handle is currently open for writing
 * @property-read int $total_recv Number of completed files
 * @property-read int $total_recv_bytes Sum of recvsize over completed files
 */
class Receive extends Base
{
    private const LOGKEY = 'IR-';

    /** Compression methods that open() will accept */
    private const compression = [
        'BZ2',
        'GZ',
    ];

    /** @var Address The address we are receiving the current file from */
    private Address $ao;

    /** @var ?string The compression used by the incoming file */
    private ?string $comp = NULL;

    /** @var string|null The compressed data received */
    private ?string $comp_data = NULL;

    public function __construct()
    {
        // Initialise our variables (only when instantiated directly, not via a child class)
        if (get_class($this) === self::class) {
            $this->list = collect();
            $this->f = NULL;
        }
    }

    /**
     * Virtual read-only attributes describing the receive state.
     *
     * @param string $key
     * @return mixed
     * @throws \Exception When the key is not known
     */
    public function __get($key)
    {
        switch ($key) {
            case 'completed':
                return $this->list
                    ->filter(fn($item) => $item->complete === TRUE);

            case 'fd':
                return is_resource($this->f);

            // These are proxied to the item currently being received
            case 'recvmtime':
            case 'recvsize':
            case 'mtime':
            case 'nameas':
            case 'size':
            case 'name_size_time':
                return $this->receiving->{$key};

            case 'pos':
                return $this->pos;

            case 'receiving':
                return $this->list->get($this->index);

            case 'ready':
                return (! is_null($this->index));

            case 'togo_count':
                return $this->list
                    ->filter(fn($item) => $item->complete === FALSE)
                    ->count();

            case 'total_recv':
                return $this->completed->count();

            case 'total_recv_bytes':
                return $this->completed->sum(fn($item) => $item->recvsize);

            default:
                throw new \Exception('Unknown key: '.$key);
        }
    }

    /**
     * Close the file descriptor for our incoming file
     *
     * If a file handle is open it is closed, the file's mtime is set, and (when all bytes arrived)
     * the file is handed off according to its type. The current index is always cleared.
     *
     * @throws \Exception When there is no current file
     */
    public function close(): void
    {
        if (! $this->receiving)
            throw new \Exception('No file to close');

        if ($this->f) {
            $rcvd_time = Carbon::now();

            if ($this->pos !== $this->receiving->recvsize)
                Log::warning(sprintf('%s:- Closing [%s], but missing [%d] bytes',self::LOGKEY,$this->receiving->nameas,$this->receiving->recvsize-$this->pos));
            else
                $this->receiving->complete = TRUE;

            $end = time()-$this->start;
            Log::debug(sprintf('%s:- Closing [%s], received in [%d]',self::LOGKEY,$this->receiving->nameas,$end));

            if ($this->comp) {
                $comp_size = strlen($this->comp_data);
                $recvsize = $this->receiving->recvsize;

                // A zero byte file would otherwise raise a DivisionByZeroError
                $rate = $recvsize ? $comp_size/$recvsize*100 : 0;

                Log::info(sprintf('%s:= Compressed file using [%s] was [%d] bytes (%d). Compression rate [%3.2f%%]',self::LOGKEY,$this->comp,$comp_size,$recvsize,$rate));
            }

            fclose($this->f);

            // Set our mtime
            touch($this->receiving->full_name,$this->receiving->mtime);
            $this->f = NULL;

            // If we received a packet, we'll dispatch a job to process it, if we got it all
            if ($this->receiving->complete)
                switch ($x=$this->receiving->whatType()) {
                    case self::IS_ARC:
                    case self::IS_PKT:
                        $this->processMail($x,$rcvd_time);

                        break;

                    case self::IS_TIC:
                        Log::info(sprintf('%s:- Processing TIC file [%s]',self::LOGKEY,$this->receiving->nameas));

                        // Queue the tic to be processed later, in case the referenced file hasnt been received yet
                        TicProcess::dispatch($this->receiving->nameas);

                        break;

                    default:
                        Log::debug(sprintf('%s:- Leaving file [%s] in the inbound dir',self::LOGKEY,$this->receiving->nameas));
                }
        }

        $this->index = NULL;
    }

    /**
     * Process a fully received mail packet or archive of packets, dispatching each message.
     *
     * The received file is only deleted when every packet in it was accepted and all its messages
     * were dispatched, and app.packet_keep is not set.
     *
     * @param mixed $type The value returned by whatType() (self::IS_PKT or self::IS_ARC)
     * @param Carbon $rcvd_time When the file was received, passed on to each message job
     */
    private function processMail($type,Carbon $rcvd_time): void
    {
        Log::info(sprintf('%s:- Processing mail %s [%s]',self::LOGKEY,$type === self::IS_PKT ? 'PACKET' : 'ARCHIVE',$this->receiving->nameas));

        try {
            $f = new File($this->receiving->full_name);
            $processed = FALSE;
            // Set when a packet fails validation, so that an earlier good packet in the same archive
            // doesnt result in the archive being deleted
            $rejected = FALSE;

            foreach ($f as $packet) {
                $po = Packet::process($packet,Arr::get(stream_get_meta_data($packet),'uri'),$f->itemSize(),$this->ao->system);

                // Check the messages are from the uplink
                if ($this->ao->system->addresses->search(function($item) use ($po) { return $item->id === $po->fftn_o->id; }) === FALSE) {
                    Log::error(sprintf('%s:! Packet [%s] is not from this link? [%d]',self::LOGKEY,$po->fftn_o->ftn,$this->ao->system_id));

                    $rejected = TRUE;
                    break;
                }

                // Check the packet password
                if ($this->ao->session('pktpass') !== $po->password) {
                    Log::error(sprintf('%s:! Packet from [%s] with password [%s] is invalid.',self::LOGKEY,$this->ao->ftn,$po->password));

                    Notification::route('netmail',$this->ao)->notify(new PacketPasswordInvalid($po->password,$this->receiving->nameas));

                    $rejected = TRUE;
                    break;
                }

                Log::info(sprintf('%s:- Packet has [%d] messages',self::LOGKEY,$po->count()));

                // Queue messages if there are too many in the packet.
                if ($queue = ($po->count() > config('app.queue_msgs')))
                    Log::info(sprintf('%s:- Messages will be sent to the queue for processing',self::LOGKEY));

                $count = 0;
                foreach ($po as $msg) {
                    Log::info(sprintf('%s:- Mail from [%s] to [%s]',self::LOGKEY,$msg->fftn,$msg->tftn));

                    // @todo Quick check that the packet should be processed by us.
                    // @todo validate that the packet's zone is in the domain.

                    /*
                     * // @todo generate exception when echomail for an area that doesnt exist
                     * // @todo generate exception when echomail for an area sender cannot post to
                     * // @todo generate exception when echomail for an area sender not subscribed to
                     * // @todo generate exception when echomail comes from a system not defined here
                     * // @todo generate exception when echomail comes from a system doesnt exist
                     *
                     * // @todo generate exception when netmail to system that doesnt exist (node/point)
                     * // @todo generate exception when netmail from system that doesnt exist (node/point)
                     * // @todo generate warning when netmail comes from a system not defined here
                     *
                     * // @todo generate exception when packet has wrong password
                     */

                    try {
                        // Dispatch job.
                        if ($queue)
                            MessageProcess::dispatch($msg,$f->pktName(),$this->ao,$po->fftn_o,$rcvd_time);
                        else
                            MessageProcess::dispatchSync($msg,$f->pktName(),$this->ao,$po->fftn_o,$rcvd_time);

                    } catch (\Exception $e) {
                        // A failed message is logged but still counted, so it doesnt stop the rest of the packet
                        Log::error(sprintf('%s:! Got error dispatching message [%s] (%d:%s-%s).',self::LOGKEY,$msg->msgid,$e->getLine(),$e->getFile(),$e->getMessage()));
                    }

                    $count++;
                }

                if ($count === $po->count())
                    $processed = TRUE;
            }

            if ($rejected || (! $processed)) {
                Log::alert(sprintf('%s:- Not deleting packet [%s], it doesnt seem to be processed?',self::LOGKEY,$this->receiving->nameas));

                // If we want to keep the packet, we could do that logic here
            } elseif (! config('app.packet_keep')) {
                Log::debug(sprintf('%s:- Deleting processed packet [%s]',self::LOGKEY,$this->receiving->full_name));
                unlink($this->receiving->full_name);
            }

        } catch (InvalidPacketException $e) {
            Log::error(sprintf('%s:- Not deleting packet [%s], as it generated an InvalidPacketException',self::LOGKEY,$this->receiving->nameas),['e'=>$e->getMessage()]);

        } catch (\Exception $e) {
            Log::error(sprintf('%s:- Not deleting packet [%s], as it generated an uncaught exception',self::LOGKEY,$this->receiving->nameas),['e'=>$e->getMessage()]);
        }
    }

    /**
     * Add a new file to receive
     *
     * @param array $file Details of the file, using the keys 'name', 'mtime' and 'size'
     * @param Address $ao The address we are receiving the file from
     * @throws \Exception When a file is already being received
     */
    public function new(array $file,Address $ao): void
    {
        Log::debug(sprintf('%s:+ Receiving new file [%s]',self::LOGKEY,join('|',$file)));

        // Index 0 is a valid (first) file, so we must test for NULL rather than truthiness
        if (! is_null($this->index))
            throw new \Exception('Can only have 1 file receiving at a time');

        $this->ao = $ao;

        $this->list->push(new Item($ao,Arr::get($file,'name'),(int)Arr::get($file,'mtime'),(int)Arr::get($file,'size')));
        $this->index = $this->list->count()-1;
    }

    /**
     * Open the file descriptor to receive a file
     *
     * @param bool $check When TRUE, only check whether the file would be accepted, dont open it
     * @param string|null $comp If the incoming file will be compressed
     * @return int One of the Protocol::FOP_* values (FOP_OK, FOP_GOT, FOP_SKIP or FOP_ERROR)
     * @throws \Exception When no file has been registered, or the compression is unsupported
     * @todo $comp should be parsed, in case it contains other items
     */
    public function open(bool $check=FALSE,?string $comp=NULL): int
    {
        // @todo implement return 4 - SUSPEND(?) file
        if (is_null($this->index))
            throw new \Exception('No files currently receiving');

        $this->comp_data = '';
        $this->comp = $comp;

        /*
        if ($this->receiving->size <= self::MAX_COMPSIZE) {
            $this->comp = $comp;

        } else {
            Log::alert(sprintf('%s:- Compression [%s] disabled for file [%s], its size is too big [%d]',self::LOGKEY,$comp,$this->receiving->name,$this->receiving->size));
        }
        */

        if ($this->comp && (! in_array($this->comp,self::compression,TRUE)))
            throw new \Exception('Unsupported compression:'.$this->comp);

        elseif ($this->comp)
            Log::debug(sprintf('%s:- Receiving file with [%s] compression',self::LOGKEY,$this->comp));

        $this->pos = 0;
        $this->start = time();

        if ($this->receiving->exists && $this->receiving->match_mtime && $this->receiving->match_size) {
            Log::alert(sprintf('%s:- File already exists - skipping [%s]', self::LOGKEY,$this->receiving->nameas));
            return Protocol::FOP_GOT;

        } elseif ($this->receiving->exists && $this->receiving->size > 0) {
            Log::alert(sprintf('%s:- File exists with different details - skipping [%s] (size: %d, mtime: %d)',self::LOGKEY,$this->receiving->nameas,$this->receiving->size,$this->receiving->mtime));
            return Protocol::FOP_SKIP;

        } else {
            // @todo I dont think we are enabling resumable sessions - need to check
            Log::debug(sprintf('%s:- Opening [%s]',self::LOGKEY,$this->receiving->nameas));
        }

        // If we are only checking, we'll return (NR mode)
        if ($check)
            return Protocol::FOP_OK;

        $this->f = fopen($this->receiving->full_name,'wb');

        if (! $this->f) {
            Log::error(sprintf('%s:! Unable to open file [%s] for writing',self::LOGKEY,$this->receiving->nameas));
            return Protocol::FOP_ERROR;
        }

        Log::info(sprintf('%s:= File [%s] opened for writing',self::LOGKEY,$this->receiving->nameas));

        return Protocol::FOP_OK;
    }

    /**
     * Write data to the file we are receiving
     *
     * When compression is in use, the incoming data is accumulated and decompressed before being written.
     *
     * @param string $buf The data received
     * @return bool TRUE when the data was written, or buffered awaiting more compressed data
     * @throws \Exception When no file is open, or more bytes arrive than the file should have
     * @throws FileException When decompression or the write fails
     * @throws FileGrewException When the compressed data exceeds the file's size without yielding data
     */
    public function write(string $buf): bool
    {
        if (! $this->fd)
            throw new \Exception('No file open for write');

        $data = '';

        // If we are using compression mode, then we need to buffer the right until we have everything
        if ($this->comp) {
            $this->comp_data .= $buf;

            // NOTE(review): comp_data is not cleared after a successful decompress, so a subsequent write()
            // would decompress (and write) the whole buffer again - confirm callers send nothing further.

            // See if we can uncompress the data yet
            switch ($this->comp) {
                case 'BZ2':
                    if (($data=bzdecompress($this->comp_data,TRUE)) === FALSE)
                        throw new FileException('BZ2 decompression failed?');

                    // bzdecompress() returns an int error code on failure. (We dont use is_numeric() here,
                    // since a valid payload like "42" would be mistaken for an error.)
                    elseif (is_int($data))
                        throw new FileException(sprintf('BZ2 decompression failed with (:%d)?',$data));

                    break;

                case 'GZ':
                    // zlib_decode() accepts raw deflate, zlib and gzip framed data.
                    // NOTE(review): the framing the remote uses isnt visible here - confirm against the protocol.
                    if (($data=zlib_decode($this->comp_data)) === FALSE)
                        throw new FileException('GZ decompression failed?');

                    break;
            }

            // Compressed file grew
            if (! strlen($data)) {
                if (strlen($this->comp_data) > $this->receiving->size) {
                    fclose($this->f);
                    $this->f = NULL;

                    throw new FileGrewException(sprintf('Error compressed file grew, rejecting [%d] -> [%d]', strlen($this->comp_data), $this->receiving->size));
                }

                // Nothing to write yet, wait for more data
                return TRUE;
            }

        } else {
            $data = $buf;
        }

        if (($x=$this->pos+strlen($data)) > $this->receiving->recvsize)
            throw new \Exception(sprintf('Too many bytes received [%d] (%d)?',$x,$this->receiving->recvsize));

        $rc = fwrite($this->f,$data);

        if ($rc === FALSE)
            throw new FileException('Error while writing to file');

        // Advance by what was actually written
        $this->pos += $rc;
        Log::debug(sprintf('%s:- Write [%d] bytes, file pos now [%d] of [%d] (%d)',self::LOGKEY,$rc,$this->pos,$this->receiving->recvsize,strlen($buf)));

        if (strlen($this->comp_data) > $this->receiving->recvsize)
            Log::alert(sprintf('%s:- Compression grew the file during transfer (%d->%d)',self::LOGKEY,$this->receiving->recvsize,strlen($this->comp_data)));

        return TRUE;
    }
}