Move packet processing into a job
This commit is contained in:
parent
dc86d09894
commit
ff8c370d86
@ -48,7 +48,7 @@ abstract class Base
|
||||
|
||||
// 4 BITS of type
|
||||
protected const IS_FILE = (1<<0);
|
||||
protected const IS_PKT = (1<<1);
|
||||
public const IS_PKT = (1<<1);
|
||||
protected const IS_ARC = (1<<2);
|
||||
protected const IS_REQ = (1<<3);
|
||||
protected const IS_TIC = (1<<4);
|
||||
@ -79,7 +79,7 @@ abstract class Base
|
||||
return ($this->ftype&0xff) & $type;
|
||||
}
|
||||
|
||||
protected function whatType(): int
|
||||
public function whatType(): int
|
||||
{
|
||||
static $ext = ['su','mo','tu','we','th','fr','sa','req'];
|
||||
|
||||
|
@ -5,15 +5,12 @@ namespace App\Classes\File;
|
||||
use Carbon\Carbon;
|
||||
use Illuminate\Support\Arr;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
use Illuminate\Support\Facades\Notification;
|
||||
use Symfony\Component\HttpFoundation\File\Exception\FileException;
|
||||
|
||||
use App\Classes\{File,Protocol};
|
||||
use App\Classes\FTN\{InvalidPacketException,Packet};
|
||||
use App\Classes\Protocol;
|
||||
use App\Exceptions\FileGrewException;
|
||||
use App\Jobs\{MessageProcess,TicProcess};
|
||||
use App\Jobs\{PacketProcess,TicProcess};
|
||||
use App\Models\Address;
|
||||
use App\Notifications\Netmails\PacketPasswordInvalid;
|
||||
|
||||
/**
|
||||
* Object representing the files we are receiving
|
||||
@ -123,92 +120,20 @@ class Receive extends Base
|
||||
|
||||
// If we received a packet, we'll dispatch a job to process it, if we got it all
|
||||
if ($this->receiving->complete)
|
||||
switch ($x=$this->receiving->whatType()) {
|
||||
switch ($this->receiving->whatType()) {
|
||||
case self::IS_ARC:
|
||||
case self::IS_PKT:
|
||||
Log::info(sprintf('%s:- Processing mail %s [%s]',self::LOGKEY,$x === self::IS_PKT ? 'PACKET' : 'ARCHIVE',$this->receiving->nameas));
|
||||
|
||||
try {
|
||||
$f = new File($this->receiving->full_name);
|
||||
$processed = FALSE;
|
||||
// If packet is greater than a size, lets queue it
|
||||
if ($this->receiving->size > config('app.queue_size',0)) {
|
||||
Log::info(sprintf('%s:- Packet [%s] will be sent to the queue for processing because its [%d] size',self::LOGKEY,$this->receiving->full_name,$this->receiving->size));
|
||||
PacketProcess::dispatch($this->receiving,$this->ao,$rcvd_time);
|
||||
|
||||
foreach ($f as $packet) {
|
||||
$po = Packet::process($packet,Arr::get(stream_get_meta_data($packet),'uri'),$f->itemSize(),$this->ao->system);
|
||||
|
||||
// Check the messages are from the uplink
|
||||
if ($this->ao->system->addresses->search(function($item) use ($po) { return $item->id === $po->fftn_o->id; }) === FALSE) {
|
||||
Log::error(sprintf('%s:! Packet [%s] is not from this link? [%d]',self::LOGKEY,$po->fftn_o->ftn,$this->ao->system_id));
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
// Check the packet password
|
||||
if ($this->ao->session('pktpass') !== $po->password) {
|
||||
Log::error(sprintf('%s:! Packet from [%s] with password [%s] is invalid.',self::LOGKEY,$this->ao->ftn,$po->password));
|
||||
|
||||
Notification::route('netmail',$this->ao)->notify(new PacketPasswordInvalid($po->password,$this->receiving->nameas));
|
||||
break;
|
||||
}
|
||||
|
||||
Log::info(sprintf('%s:- Packet has [%d] messages',self::LOGKEY,$po->count()));
|
||||
|
||||
// Queue messages if there are too many in the packet.
|
||||
if ($queue = ($po->count() > config('app.queue_msgs')))
|
||||
Log::info(sprintf('%s:- Messages will be sent to the queue for processing',self::LOGKEY));
|
||||
|
||||
$count = 0;
|
||||
foreach ($po as $msg) {
|
||||
Log::info(sprintf('%s:- Mail from [%s] to [%s]',self::LOGKEY,$msg->fftn,$msg->tftn));
|
||||
|
||||
// @todo Quick check that the packet should be processed by us.
|
||||
// @todo validate that the packet's zone is in the domain.
|
||||
|
||||
/*
|
||||
* // @todo generate exception when echomail for an area that doesnt exist
|
||||
* // @todo generate exception when echomail for an area sender cannot post to
|
||||
* // @todo generate exception when echomail for an area sender not subscribed to
|
||||
* // @todo generate exception when echomail comes from a system not defined here
|
||||
* // @todo generate exception when echomail comes from a system doesnt exist
|
||||
*
|
||||
* // @todo generate exception when netmail to system that doesnt exist (node/point)
|
||||
* // @todo generate exception when netmail from system that doesnt exist (node/point)
|
||||
* // @todo generate warning when netmail comes from a system not defined here
|
||||
*
|
||||
* // @todo generate exception when packet has wrong password
|
||||
*/
|
||||
|
||||
try {
|
||||
// Dispatch job.
|
||||
if ($queue)
|
||||
MessageProcess::dispatch($msg,$f->pktName(),$this->ao,$po->fftn_o,$rcvd_time);
|
||||
else
|
||||
MessageProcess::dispatchSync($msg,$f->pktName(),$this->ao,$po->fftn_o,$rcvd_time);
|
||||
|
||||
} catch (\Exception $e) {
|
||||
Log::error(sprintf('%s:! Got error dispatching message [%s] (%d:%s-%s).',self::LOGKEY,$msg->msgid,$e->getLine(),$e->getFile(),$e->getMessage()));
|
||||
}
|
||||
|
||||
$count++;
|
||||
}
|
||||
|
||||
if ($count === $po->count())
|
||||
$processed = TRUE;
|
||||
}
|
||||
|
||||
if (! $processed) {
|
||||
Log::alert(sprintf('%s:- Not deleting packet [%s], it doesnt seem to be processed?',self::LOGKEY,$this->receiving->nameas));
|
||||
|
||||
// If we want to keep the packet, we could do that logic here
|
||||
} elseif (! config('app.packet_keep')) {
|
||||
Log::debug(sprintf('%s:- Deleting processed packet [%s]',self::LOGKEY,$this->receiving->full_name));
|
||||
unlink($this->receiving->full_name);
|
||||
}
|
||||
|
||||
} catch (InvalidPacketException $e) {
|
||||
Log::error(sprintf('%s:- Not deleting packet [%s], as it generated an InvalidPacketException',self::LOGKEY,$this->receiving->nameas),['e'=>$e->getMessage()]);
|
||||
} else
|
||||
PacketProcess::dispatchSync($this->receiving,$this->ao,$rcvd_time);
|
||||
|
||||
} catch (\Exception $e) {
|
||||
Log::error(sprintf('%s:- Not deleting packet [%s], as it generated an uncaught exception',self::LOGKEY,$this->receiving->nameas),['e'=>$e->getMessage()]);
|
||||
Log::error(sprintf('%s:! Got error dispatching packet [%s] (%d:%s-%s).',self::LOGKEY,$this->receiving->full_name,$e->getLine(),$e->getFile(),$e->getMessage()));
|
||||
}
|
||||
|
||||
break;
|
||||
|
140
app/Jobs/PacketProcess.php
Normal file
140
app/Jobs/PacketProcess.php
Normal file
@ -0,0 +1,140 @@
|
||||
<?php
|
||||
|
||||
namespace App\Jobs;
|
||||
|
||||
use Carbon\Carbon;
|
||||
use Illuminate\Bus\Queueable;
|
||||
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||
use Illuminate\Foundation\Bus\Dispatchable;
|
||||
use Illuminate\Queue\InteractsWithQueue;
|
||||
use Illuminate\Queue\SerializesModels;
|
||||
use Illuminate\Support\Arr;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
use Illuminate\Support\Facades\Notification;
|
||||
|
||||
use App\Classes\File;
|
||||
use App\Classes\File\Item;
|
||||
use App\Classes\FTN\{InvalidPacketException,Packet};
|
||||
use App\Models\Address;
|
||||
use App\Notifications\Netmails\PacketPasswordInvalid;
|
||||
|
||||
class PacketProcess implements ShouldQueue
|
||||
{
|
||||
private const LOGKEY = 'JPP';
|
||||
|
||||
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
|
||||
|
||||
private Item $file;
|
||||
private Address $ao;
|
||||
private Carbon $rcvd_time;
|
||||
|
||||
public function __construct(Item $file,Address $ao,Carbon $rcvd_time)
|
||||
{
|
||||
$this->file = $file;
|
||||
$this->ao = $ao;
|
||||
$this->rcvd_time = $rcvd_time;
|
||||
}
|
||||
|
||||
public function __get($key): mixed
|
||||
{
|
||||
switch ($key) {
|
||||
case 'subject':
|
||||
return $this->file->name;
|
||||
|
||||
default:
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* When calling MessageProcess - we assume that the packet is from a valid source, and
|
||||
* the destination (netmail/echomail) is also valid
|
||||
*/
|
||||
public function handle()
|
||||
{
|
||||
Log::info(sprintf('%s:- Processing mail %s [%s]',self::LOGKEY,$this->file->whatType() === Item::IS_PKT ? 'PACKET' : 'ARCHIVE',$this->file->nameas));
|
||||
|
||||
try {
|
||||
$f = new File($this->file->full_name);
|
||||
$processed = FALSE;
|
||||
|
||||
foreach ($f as $packet) {
|
||||
$po = Packet::process($packet,Arr::get(stream_get_meta_data($packet),'uri'),$f->itemSize(),$this->ao->system);
|
||||
|
||||
// Check the messages are from the uplink
|
||||
if ($this->ao->system->addresses->search(function($item) use ($po) { return $item->id === $po->fftn_o->id; }) === FALSE) {
|
||||
Log::error(sprintf('%s:! Packet [%s] is not from this link? [%d]',self::LOGKEY,$po->fftn_o->ftn,$this->ao->system_id));
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
// Check the packet password
|
||||
if ($this->ao->session('pktpass') !== $po->password) {
|
||||
Log::error(sprintf('%s:! Packet from [%s] with password [%s] is invalid.',self::LOGKEY,$this->ao->ftn,$po->password));
|
||||
|
||||
Notification::route('netmail',$this->ao)->notify(new PacketPasswordInvalid($po->password,$this->file->nameas));
|
||||
break;
|
||||
}
|
||||
|
||||
Log::info(sprintf('%s:- Packet has [%d] messages',self::LOGKEY,$po->count()));
|
||||
|
||||
// Queue messages if there are too many in the packet.
|
||||
if ($queue = ($po->count() > config('app.queue_msgs')))
|
||||
Log::info(sprintf('%s:- Messages will be sent to the queue for processing',self::LOGKEY));
|
||||
|
||||
$count = 0;
|
||||
foreach ($po as $msg) {
|
||||
Log::info(sprintf('%s:- Mail from [%s] to [%s]',self::LOGKEY,$msg->fftn,$msg->tftn));
|
||||
|
||||
// @todo Quick check that the packet should be processed by us.
|
||||
// @todo validate that the packet's zone is in the domain.
|
||||
|
||||
/*
|
||||
* // @todo generate exception when echomail for an area that doesnt exist
|
||||
* // @todo generate exception when echomail for an area sender cannot post to
|
||||
* // @todo generate exception when echomail for an area sender not subscribed to
|
||||
* // @todo generate exception when echomail comes from a system not defined here
|
||||
* // @todo generate exception when echomail comes from a system doesnt exist
|
||||
*
|
||||
* // @todo generate exception when netmail to system that doesnt exist (node/point)
|
||||
* // @todo generate exception when netmail from system that doesnt exist (node/point)
|
||||
* // @todo generate warning when netmail comes from a system not defined here
|
||||
*
|
||||
* // @todo generate exception when packet has wrong password
|
||||
*/
|
||||
|
||||
try {
|
||||
// Dispatch job.
|
||||
if ($queue)
|
||||
MessageProcess::dispatch($msg,$f->pktName(),$this->ao,$po->fftn_o,$this->rcvd_time);
|
||||
else
|
||||
MessageProcess::dispatchSync($msg,$f->pktName(),$this->ao,$po->fftn_o,$this->rcvd_time);
|
||||
|
||||
} catch (\Exception $e) {
|
||||
Log::error(sprintf('%s:! Got error dispatching message [%s] (%d:%s-%s).',self::LOGKEY,$msg->msgid,$e->getLine(),$e->getFile(),$e->getMessage()));
|
||||
}
|
||||
|
||||
$count++;
|
||||
}
|
||||
|
||||
if ($count === $po->count())
|
||||
$processed = TRUE;
|
||||
}
|
||||
|
||||
if (! $processed) {
|
||||
Log::alert(sprintf('%s:- Not deleting packet [%s], it doesnt seem to be processed?',self::LOGKEY,$this->file->nameas));
|
||||
|
||||
// If we want to keep the packet, we could do that logic here
|
||||
} elseif (! config('app.packet_keep')) {
|
||||
Log::debug(sprintf('%s:- Deleting processed packet [%s]',self::LOGKEY,$this->file->full_name));
|
||||
unlink($this->file->full_name);
|
||||
}
|
||||
|
||||
} catch (InvalidPacketException $e) {
|
||||
Log::error(sprintf('%s:- Not deleting packet [%s], as it generated an InvalidPacketException',self::LOGKEY,$this->file->nameas),['e'=>$e->getMessage()]);
|
||||
|
||||
} catch (\Exception $e) {
|
||||
Log::error(sprintf('%s:- Not deleting packet [%s], as it generated an uncaught exception',self::LOGKEY,$this->file->nameas),['e'=>$e->getMessage()]);
|
||||
}
|
||||
}
|
||||
}
|
@ -18,6 +18,8 @@ return [
|
||||
'fido' => env('FIDO_DIR', 'fido'),
|
||||
'packet_keep' => env('FIDO_PACKET_KEEP', FALSE),
|
||||
|
||||
// Size of packet before we decide to queue it for processing
|
||||
'queue_size' => env('FIDO_QUEUE_SIZE', 1000000),
|
||||
// Number of messages in a packet that will result in them being queued for processing
|
||||
'queue_msgs' => env('FIDO_QUEUE_MSGS', 50),
|
||||
'default_pkt' => env('FIDO_DEFAULT_PACKET', '2+'),
|
||||
|
Loading…
x
Reference in New Issue
Block a user