From d86d90cdeb5ced188f214a93b9e7f69d563d24d6 Mon Sep 17 00:00:00 2001 From: Deon George Date: Tue, 24 Aug 2021 23:42:03 +1000 Subject: [PATCH] Leverage Redis and queue to handle large packets --- app/Classes/FTN/Message.php | 13 ++++- app/Classes/FTN/Packet.php | 48 +++++++++++++++++-- app/Classes/File/Receive.php | 6 +-- app/Jobs/ProcessPacket.php | 2 +- config/database.php | 2 +- .../2021_08_23_093537_create_jobs_table.php | 36 ++++++++++++++ 6 files changed, 97 insertions(+), 10 deletions(-) create mode 100644 database/migrations/2021_08_23_093537_create_jobs_table.php diff --git a/app/Classes/FTN/Message.php b/app/Classes/FTN/Message.php index 50c5dd7..f63338c 100644 --- a/app/Classes/FTN/Message.php +++ b/app/Classes/FTN/Message.php @@ -106,7 +106,7 @@ class Message extends FTNBase private array $dst; // Address the message is to private Collection $path; // FTS-0004.001 The message PATH lines - private Collection $pathaddress; // Collection of Addresses after parsing seenby + private Collection $pathaddress; // Collection of Addresses after parsing seenby private Collection $rogue_path; // Collection of FTNs in the Seen-by that are not defined private Collection $seenby; // FTS-0004.001 The message SEEN-BY lines private Collection $seenaddress; // Collection of Addresses after parsing seenby @@ -553,6 +553,11 @@ class Message extends FTNBase */ private function parseAddresses(string $type,Collection $addresses,Collection &$rogue): Collection { + static $aos = NULL; + + if (! $aos) + $aos = collect(); + $nodes = collect(); $net = NULL; @@ -567,7 +572,11 @@ class Message extends FTNBase }; $ftn = sprintf('%d:%d/%d',$this->fz,$net&0x7fff,$node&0x7fff); - $ao = Address::findFTN($ftn); + // @todo This should be enhanced to include the address at the time of the message. + if ($aos->has($ftn)) + $ao = $aos->get($ftn); + else + $aos->put($ftn,$ao=(bool)Address::findFTN($ftn)); if (! $ao) { Log::alert(sprintf('%s:! Undefined Node [%s] in %s.',self::LOGKEY,$ftn,$type)); diff --git a/app/Classes/FTN/Packet.php b/app/Classes/FTN/Packet.php index 97e12f1..af8b6e4 100644 --- a/app/Classes/FTN/Packet.php +++ b/app/Classes/FTN/Packet.php @@ -6,12 +6,13 @@ use Carbon\Carbon; use Illuminate\Support\Arr; use Illuminate\Support\Collection; use Illuminate\Support\Facades\Log; +use Illuminate\Support\Facades\Redis; use Symfony\Component\HttpFoundation\File\File; use App\Classes\FTN as FTNBase; use App\Models\{Address,Domain,Setup,Software}; -class Packet extends FTNBase +class Packet extends FTNBase implements \Iterator, \Countable { private const LOGKEY = 'PKT'; @@ -57,6 +58,41 @@ class Packet extends FTNBase public Collection $messages; // Messages in the Packet public Collection $errors; // Messages that fail validation private string $name; // Packet name + public bool $use_redis = TRUE; // Use redis for messages. + private int $index; // Our array index + + /** + * Number of messages in this packet + */ + public function count(): int + { + return $this->messages->count(); + } + + public function current() + { + return $this->use_redis ? unserialize(Redis::get($this->key())) : $this->messages->get($this->index); + } + + public function key() + { + return $this->messages->get($this->index); + } + + public function next(): void + { + $this->index++; + } + + public function rewind() + { + $this->index = 0; + } + + public function valid() + { + return $this->use_redis ? ($this->key() && Redis::exists($this->key())) : $this->key(); + } public function __construct(Address $o=NULL) { @@ -359,7 +395,7 @@ class Packet extends FTNBase * @param Domain|null $domain * @throws InvalidPacketException */ - public function parseMessage(string $message,Domain $domain=NULL): void + private function parseMessage(string $message,Domain $domain=NULL): void { $msg = Message::parseMessage($message,$domain); @@ -369,7 +405,13 @@ class Packet extends FTNBase Log::error(sprintf('%s:! %s Skipping...',self::LOGKEY,join('|',$msg->errors->messages()->get('from')))); } else { - $this->messages->push($msg); + if ($this->use_redis) { + Redis::set($msg->msgid,serialize($msg)); + $this->messages->push($msg->msgid); + + } else { + $this->messages->push($msg); + } } } } \ No newline at end of file diff --git a/app/Classes/File/Receive.php b/app/Classes/File/Receive.php index 855679e..6664000 100644 --- a/app/Classes/File/Receive.php +++ b/app/Classes/File/Receive.php @@ -122,7 +122,7 @@ final class Receive extends Item // Check the messages are from the uplink if ($this->ao->system->addresses->search(function($item) use ($po) { return $item->id == $po->fftn_o->id; }) === FALSE) { - Log::error(sprintf('%s: ! Packet [%s] is not from this link? [%d]',self::LOGKEY,$this->fftn_o->ftn,$this->ao->system_id)); + Log::error(sprintf('%s: ! Packet [%s] is not from this link? [%d]',self::LOGKEY,$po->fftn_o->ftn,$this->ao->system_id)); break; } @@ -133,14 +133,14 @@ final class Receive extends Item break; } - foreach ($po->messages as $msg) { + foreach ($po as $msg) { Log::info(sprintf('%s: - Mail from [%s] to [%s]',self::LOGKEY,$msg->fftn,$msg->tftn)); // @todo Quick check that the packet should be processed by us. // @todo validate that the packet's zone is in the domain. // Dispatch job. - ProcessPacket::dispatchSync($msg); + ProcessPacket::dispatch($msg); } if ($po->errors->count()) { diff --git a/app/Jobs/ProcessPacket.php b/app/Jobs/ProcessPacket.php index 0a2bf29..7b2a1c3 100644 --- a/app/Jobs/ProcessPacket.php +++ b/app/Jobs/ProcessPacket.php @@ -10,7 +10,7 @@ use Illuminate\Queue\SerializesModels; use Illuminate\Support\Facades\Log; use App\Classes\FTN\{Message,Process}; -use App\Models\{Echoarea,Echomail,Netmail,Setup,System}; +use App\Models\{Echoarea,Echomail,Netmail,Setup}; class ProcessPacket implements ShouldQueue { diff --git a/config/database.php b/config/database.php index d6ddb31..713634d 100644 --- a/config/database.php +++ b/config/database.php @@ -143,7 +143,7 @@ return [ 'redis' => [ - 'client' => 'predis', + 'client' => 'phpredis', 'default' => [ 'host' => env('REDIS_HOST', '127.0.0.1'), diff --git a/database/migrations/2021_08_23_093537_create_jobs_table.php b/database/migrations/2021_08_23_093537_create_jobs_table.php new file mode 100644 index 0000000..1be9e8a --- /dev/null +++ b/database/migrations/2021_08_23_093537_create_jobs_table.php @@ -0,0 +1,36 @@ +bigIncrements('id'); + $table->string('queue')->index(); + $table->longText('payload'); + $table->unsignedTinyInteger('attempts'); + $table->unsignedInteger('reserved_at')->nullable(); + $table->unsignedInteger('available_at'); + $table->unsignedInteger('created_at'); + }); + } + + /** + * Reverse the migrations. + * + * @return void + */ + public function down() + { + Schema::dropIfExists('jobs'); + } +}