From ff8c370d86d948630f1481205939c9cc2705299b Mon Sep 17 00:00:00 2001 From: Deon George Date: Wed, 13 Sep 2023 20:58:22 +1000 Subject: [PATCH] Move packet processing into a job --- app/Classes/File/Base.php | 4 +- app/Classes/File/Receive.php | 95 +++--------------------- app/Jobs/PacketProcess.php | 140 +++++++++++++++++++++++++++++++++++ config/app.php | 2 + 4 files changed, 154 insertions(+), 87 deletions(-) create mode 100644 app/Jobs/PacketProcess.php diff --git a/app/Classes/File/Base.php b/app/Classes/File/Base.php index 9c0facf..29a3db1 100644 --- a/app/Classes/File/Base.php +++ b/app/Classes/File/Base.php @@ -48,7 +48,7 @@ abstract class Base // 4 BITS of type protected const IS_FILE = (1<<0); - protected const IS_PKT = (1<<1); + public const IS_PKT = (1<<1); protected const IS_ARC = (1<<2); protected const IS_REQ = (1<<3); protected const IS_TIC = (1<<4); @@ -79,7 +79,7 @@ abstract class Base return ($this->ftype&0xff) & $type; } - protected function whatType(): int + public function whatType(): int { static $ext = ['su','mo','tu','we','th','fr','sa','req']; diff --git a/app/Classes/File/Receive.php b/app/Classes/File/Receive.php index eb1c3ff..bb2fa7b 100644 --- a/app/Classes/File/Receive.php +++ b/app/Classes/File/Receive.php @@ -5,15 +5,12 @@ namespace App\Classes\File; use Carbon\Carbon; use Illuminate\Support\Arr; use Illuminate\Support\Facades\Log; -use Illuminate\Support\Facades\Notification; use Symfony\Component\HttpFoundation\File\Exception\FileException; -use App\Classes\{File,Protocol}; -use App\Classes\FTN\{InvalidPacketException,Packet}; +use App\Classes\Protocol; use App\Exceptions\FileGrewException; -use App\Jobs\{MessageProcess,TicProcess}; +use App\Jobs\{PacketProcess,TicProcess}; use App\Models\Address; -use App\Notifications\Netmails\PacketPasswordInvalid; /** * Object representing the files we are receiving @@ -123,92 +120,20 @@ class Receive extends Base // If we received a packet, we'll dispatch a job to process it, if we got it all if ($this->receiving->complete) - switch ($x=$this->receiving->whatType()) { + switch ($this->receiving->whatType()) { case self::IS_ARC: case self::IS_PKT: - Log::info(sprintf('%s:- Processing mail %s [%s]',self::LOGKEY,$x === self::IS_PKT ? 'PACKET' : 'ARCHIVE',$this->receiving->nameas)); - try { - $f = new File($this->receiving->full_name); - $processed = FALSE; + // If packet is greater than a size, lets queue it + if ($this->receiving->size > config('app.queue_size',0)) { + Log::info(sprintf('%s:- Packet [%s] will be sent to the queue for processing because its [%d] size',self::LOGKEY,$this->receiving->full_name,$this->receiving->size)); + PacketProcess::dispatch($this->receiving,$this->ao,$rcvd_time); - foreach ($f as $packet) { - $po = Packet::process($packet,Arr::get(stream_get_meta_data($packet),'uri'),$f->itemSize(),$this->ao->system); - - // Check the messages are from the uplink - if ($this->ao->system->addresses->search(function($item) use ($po) { return $item->id === $po->fftn_o->id; }) === FALSE) { - Log::error(sprintf('%s:! Packet [%s] is not from this link? [%d]',self::LOGKEY,$po->fftn_o->ftn,$this->ao->system_id)); - - break; - } - - // Check the packet password - if ($this->ao->session('pktpass') !== $po->password) { - Log::error(sprintf('%s:! Packet from [%s] with password [%s] is invalid.',self::LOGKEY,$this->ao->ftn,$po->password)); - - Notification::route('netmail',$this->ao)->notify(new PacketPasswordInvalid($po->password,$this->receiving->nameas)); - break; - } - - Log::info(sprintf('%s:- Packet has [%d] messages',self::LOGKEY,$po->count())); - - // Queue messages if there are too many in the packet. - if ($queue = ($po->count() > config('app.queue_msgs'))) - Log::info(sprintf('%s:- Messages will be sent to the queue for processing',self::LOGKEY)); - - $count = 0; - foreach ($po as $msg) { - Log::info(sprintf('%s:- Mail from [%s] to [%s]',self::LOGKEY,$msg->fftn,$msg->tftn)); - - // @todo Quick check that the packet should be processed by us. - // @todo validate that the packet's zone is in the domain. - - /* - * // @todo generate exception when echomail for an area that doesnt exist - * // @todo generate exception when echomail for an area sender cannot post to - * // @todo generate exception when echomail for an area sender not subscribed to - * // @todo generate exception when echomail comes from a system not defined here - * // @todo generate exception when echomail comes from a system doesnt exist - * - * // @todo generate exception when netmail to system that doesnt exist (node/point) - * // @todo generate exception when netmail from system that doesnt exist (node/point) - * // @todo generate warning when netmail comes from a system not defined here - * - * // @todo generate exception when packet has wrong password - */ - - try { - // Dispatch job. - if ($queue) - MessageProcess::dispatch($msg,$f->pktName(),$this->ao,$po->fftn_o,$rcvd_time); - else - MessageProcess::dispatchSync($msg,$f->pktName(),$this->ao,$po->fftn_o,$rcvd_time); - - } catch (\Exception $e) { - Log::error(sprintf('%s:! Got error dispatching message [%s] (%d:%s-%s).',self::LOGKEY,$msg->msgid,$e->getLine(),$e->getFile(),$e->getMessage())); - } - - $count++; - } - - if ($count === $po->count()) - $processed = TRUE; - } - - if (! $processed) { - Log::alert(sprintf('%s:- Not deleting packet [%s], it doesnt seem to be processed?',self::LOGKEY,$this->receiving->nameas)); - - // If we want to keep the packet, we could do that logic here - } elseif (! config('app.packet_keep')) { - Log::debug(sprintf('%s:- Deleting processed packet [%s]',self::LOGKEY,$this->receiving->full_name)); - unlink($this->receiving->full_name); - } - - } catch (InvalidPacketException $e) { - Log::error(sprintf('%s:- Not deleting packet [%s], as it generated an InvalidPacketException',self::LOGKEY,$this->receiving->nameas),['e'=>$e->getMessage()]); + } else + PacketProcess::dispatchSync($this->receiving,$this->ao,$rcvd_time); } catch (\Exception $e) { - Log::error(sprintf('%s:- Not deleting packet [%s], as it generated an uncaught exception',self::LOGKEY,$this->receiving->nameas),['e'=>$e->getMessage()]); + Log::error(sprintf('%s:! Got error dispatching packet [%s] (%d:%s-%s).',self::LOGKEY,$this->receiving->full_name,$e->getLine(),$e->getFile(),$e->getMessage())); } break; diff --git a/app/Jobs/PacketProcess.php b/app/Jobs/PacketProcess.php new file mode 100644 index 0000000..a0f1eeb --- /dev/null +++ b/app/Jobs/PacketProcess.php @@ -0,0 +1,140 @@ +file = $file; + $this->ao = $ao; + $this->rcvd_time = $rcvd_time; + } + + public function __get($key): mixed + { + switch ($key) { + case 'subject': + return $this->file->name; + + default: + return NULL; + } + } + + /** + * When calling MessageProcess - we assume that the packet is from a valid source, and + * the destination (netmail/echomail) is also valid + */ + public function handle() + { + Log::info(sprintf('%s:- Processing mail %s [%s]',self::LOGKEY,$this->file->whatType() === Item::IS_PKT ? 'PACKET' : 'ARCHIVE',$this->file->nameas)); + + try { + $f = new File($this->file->full_name); + $processed = FALSE; + + foreach ($f as $packet) { + $po = Packet::process($packet,Arr::get(stream_get_meta_data($packet),'uri'),$f->itemSize(),$this->ao->system); + + // Check the messages are from the uplink + if ($this->ao->system->addresses->search(function($item) use ($po) { return $item->id === $po->fftn_o->id; }) === FALSE) { + Log::error(sprintf('%s:! Packet [%s] is not from this link? [%d]',self::LOGKEY,$po->fftn_o->ftn,$this->ao->system_id)); + + break; + } + + // Check the packet password + if ($this->ao->session('pktpass') !== $po->password) { + Log::error(sprintf('%s:! Packet from [%s] with password [%s] is invalid.',self::LOGKEY,$this->ao->ftn,$po->password)); + + Notification::route('netmail',$this->ao)->notify(new PacketPasswordInvalid($po->password,$this->file->nameas)); + break; + } + + Log::info(sprintf('%s:- Packet has [%d] messages',self::LOGKEY,$po->count())); + + // Queue messages if there are too many in the packet. + if ($queue = ($po->count() > config('app.queue_msgs'))) + Log::info(sprintf('%s:- Messages will be sent to the queue for processing',self::LOGKEY)); + + $count = 0; + foreach ($po as $msg) { + Log::info(sprintf('%s:- Mail from [%s] to [%s]',self::LOGKEY,$msg->fftn,$msg->tftn)); + + // @todo Quick check that the packet should be processed by us. + // @todo validate that the packet's zone is in the domain. + + /* + * // @todo generate exception when echomail for an area that doesnt exist + * // @todo generate exception when echomail for an area sender cannot post to + * // @todo generate exception when echomail for an area sender not subscribed to + * // @todo generate exception when echomail comes from a system not defined here + * // @todo generate exception when echomail comes from a system doesnt exist + * + * // @todo generate exception when netmail to system that doesnt exist (node/point) + * // @todo generate exception when netmail from system that doesnt exist (node/point) + * // @todo generate warning when netmail comes from a system not defined here + * + * // @todo generate exception when packet has wrong password + */ + + try { + // Dispatch job. + if ($queue) + MessageProcess::dispatch($msg,$f->pktName(),$this->ao,$po->fftn_o,$this->rcvd_time); + else + MessageProcess::dispatchSync($msg,$f->pktName(),$this->ao,$po->fftn_o,$this->rcvd_time); + + } catch (\Exception $e) { + Log::error(sprintf('%s:! Got error dispatching message [%s] (%d:%s-%s).',self::LOGKEY,$msg->msgid,$e->getLine(),$e->getFile(),$e->getMessage())); + } + + $count++; + } + + if ($count === $po->count()) + $processed = TRUE; + } + + if (! $processed) { + Log::alert(sprintf('%s:- Not deleting packet [%s], it doesnt seem to be processed?',self::LOGKEY,$this->file->nameas)); + + // If we want to keep the packet, we could do that logic here + } elseif (! config('app.packet_keep')) { + Log::debug(sprintf('%s:- Deleting processed packet [%s]',self::LOGKEY,$this->file->full_name)); + unlink($this->file->full_name); + } + + } catch (InvalidPacketException $e) { + Log::error(sprintf('%s:- Not deleting packet [%s], as it generated an InvalidPacketException',self::LOGKEY,$this->file->nameas),['e'=>$e->getMessage()]); + + } catch (\Exception $e) { + Log::error(sprintf('%s:- Not deleting packet [%s], as it generated an uncaught exception',self::LOGKEY,$this->file->nameas),['e'=>$e->getMessage()]); + } + } +} \ No newline at end of file diff --git a/config/app.php b/config/app.php index 38095dd..a9354eb 100644 --- a/config/app.php +++ b/config/app.php @@ -18,6 +18,8 @@ return [ 'fido' => env('FIDO_DIR', 'fido'), 'packet_keep' => env('FIDO_PACKET_KEEP', FALSE), + // Size of packet before we decide to queue it for processing + 'queue_size' => env('FIDO_QUEUE_SIZE', 1000000), // Number of messages in a packet that will result in them being queued for processing 'queue_msgs' => env('FIDO_QUEUE_MSGS', 50), 'default_pkt' => env('FIDO_DEFAULT_PACKET', '2+'),