Leverage Redis and queue to handle large packets

This commit is contained in:
Deon George 2021-08-24 23:42:03 +10:00
parent 1398ecff2d
commit d86d90cdeb
6 changed files with 97 additions and 10 deletions

View File

@ -106,7 +106,7 @@ class Message extends FTNBase
private array $dst; // Address the message is to private array $dst; // Address the message is to
private Collection $path; // FTS-0004.001 The message PATH lines private Collection $path; // FTS-0004.001 The message PATH lines
private Collection $pathaddress; // Collection of Addresses after parsing seenby private Collection $pathaddress; // Collection of Addresses after parsing seenby
private Collection $rogue_path; // Collection of FTNs in the Seen-by that are not defined private Collection $rogue_path; // Collection of FTNs in the Seen-by that are not defined
private Collection $seenby; // FTS-0004.001 The message SEEN-BY lines private Collection $seenby; // FTS-0004.001 The message SEEN-BY lines
private Collection $seenaddress; // Collection of Addresses after parsing seenby private Collection $seenaddress; // Collection of Addresses after parsing seenby
@ -553,6 +553,11 @@ class Message extends FTNBase
*/ */
private function parseAddresses(string $type,Collection $addresses,Collection &$rogue): Collection private function parseAddresses(string $type,Collection $addresses,Collection &$rogue): Collection
{ {
static $aos = NULL;
if (! $aos)
$aos = collect();
$nodes = collect(); $nodes = collect();
$net = NULL; $net = NULL;
@ -567,7 +572,11 @@ class Message extends FTNBase
}; };
$ftn = sprintf('%d:%d/%d',$this->fz,$net&0x7fff,$node&0x7fff); $ftn = sprintf('%d:%d/%d',$this->fz,$net&0x7fff,$node&0x7fff);
$ao = Address::findFTN($ftn); // @todo This should be enhanced to include the address at the time of the message.
if ($aos->has($ftn))
$ao = $aos->get($ftn);
else
$aos->put($ftn,$ao=(bool)Address::findFTN($ftn));
if (! $ao) { if (! $ao) {
Log::alert(sprintf('%s:! Undefined Node [%s] in %s.',self::LOGKEY,$ftn,$type)); Log::alert(sprintf('%s:! Undefined Node [%s] in %s.',self::LOGKEY,$ftn,$type));

View File

@ -6,12 +6,13 @@ use Carbon\Carbon;
use Illuminate\Support\Arr; use Illuminate\Support\Arr;
use Illuminate\Support\Collection; use Illuminate\Support\Collection;
use Illuminate\Support\Facades\Log; use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Redis;
use Symfony\Component\HttpFoundation\File\File; use Symfony\Component\HttpFoundation\File\File;
use App\Classes\FTN as FTNBase; use App\Classes\FTN as FTNBase;
use App\Models\{Address,Domain,Setup,Software}; use App\Models\{Address,Domain,Setup,Software};
class Packet extends FTNBase class Packet extends FTNBase implements \Iterator, \Countable
{ {
private const LOGKEY = 'PKT'; private const LOGKEY = 'PKT';
@ -57,6 +58,41 @@ class Packet extends FTNBase
public Collection $messages; // Messages in the Packet public Collection $messages; // Messages in the Packet
public Collection $errors; // Messages that fail validation public Collection $errors; // Messages that fail validation
private string $name; // Packet name private string $name; // Packet name
public bool $use_redis = TRUE; // Use redis for messages.
private int $index; // Our array index
/**
* Number of messages in this packet
*/
public function count(): int
{
return $this->messages->count();
}
public function current()
{
return $this->use_redis ? unserialize(Redis::get($this->key())) : $this->messages->get($this->index);
}
public function key()
{
return $this->messages->get($this->index);
}
public function next(): void
{
$this->index++;
}
public function rewind()
{
$this->index = 0;
}
public function valid()
{
return $this->use_redis ? ($this->key() && Redis::exists($this->key())) : $this->key();
}
public function __construct(Address $o=NULL) public function __construct(Address $o=NULL)
{ {
@ -359,7 +395,7 @@ class Packet extends FTNBase
* @param Domain|null $domain * @param Domain|null $domain
* @throws InvalidPacketException * @throws InvalidPacketException
*/ */
public function parseMessage(string $message,Domain $domain=NULL): void private function parseMessage(string $message,Domain $domain=NULL): void
{ {
$msg = Message::parseMessage($message,$domain); $msg = Message::parseMessage($message,$domain);
@ -369,7 +405,13 @@ class Packet extends FTNBase
Log::error(sprintf('%s:! %s Skipping...',self::LOGKEY,join('|',$msg->errors->messages()->get('from')))); Log::error(sprintf('%s:! %s Skipping...',self::LOGKEY,join('|',$msg->errors->messages()->get('from'))));
} else { } else {
$this->messages->push($msg); if ($this->use_redis) {
Redis::set($msg->msgid,serialize($msg));
$this->messages->push($msg->msgid);
} else {
$this->messages->push($msg);
}
} }
} }
} }

View File

@ -122,7 +122,7 @@ final class Receive extends Item
// Check the messages are from the uplink // Check the messages are from the uplink
if ($this->ao->system->addresses->search(function($item) use ($po) { return $item->id == $po->fftn_o->id; }) === FALSE) { if ($this->ao->system->addresses->search(function($item) use ($po) { return $item->id == $po->fftn_o->id; }) === FALSE) {
Log::error(sprintf('%s: ! Packet [%s] is not from this link? [%d]',self::LOGKEY,$this->fftn_o->ftn,$this->ao->system_id)); Log::error(sprintf('%s: ! Packet [%s] is not from this link? [%d]',self::LOGKEY,$po->fftn_o->ftn,$this->ao->system_id));
break; break;
} }
@ -133,14 +133,14 @@ final class Receive extends Item
break; break;
} }
foreach ($po->messages as $msg) { foreach ($po as $msg) {
Log::info(sprintf('%s: - Mail from [%s] to [%s]',self::LOGKEY,$msg->fftn,$msg->tftn)); Log::info(sprintf('%s: - Mail from [%s] to [%s]',self::LOGKEY,$msg->fftn,$msg->tftn));
// @todo Quick check that the packet should be processed by us. // @todo Quick check that the packet should be processed by us.
// @todo validate that the packet's zone is in the domain. // @todo validate that the packet's zone is in the domain.
// Dispatch job. // Dispatch job.
ProcessPacket::dispatchSync($msg); ProcessPacket::dispatch($msg);
} }
if ($po->errors->count()) { if ($po->errors->count()) {

View File

@ -10,7 +10,7 @@ use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log; use Illuminate\Support\Facades\Log;
use App\Classes\FTN\{Message,Process}; use App\Classes\FTN\{Message,Process};
use App\Models\{Echoarea,Echomail,Netmail,Setup,System}; use App\Models\{Echoarea,Echomail,Netmail,Setup};
class ProcessPacket implements ShouldQueue class ProcessPacket implements ShouldQueue
{ {

View File

@ -143,7 +143,7 @@ return [
'redis' => [ 'redis' => [
'client' => 'predis', 'client' => 'phpredis',
'default' => [ 'default' => [
'host' => env('REDIS_HOST', '127.0.0.1'), 'host' => env('REDIS_HOST', '127.0.0.1'),

View File

@ -0,0 +1,36 @@
<?php
use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;
class CreateJobsTable extends Migration
{
/**
* Run the migrations.
*
* @return void
*/
public function up()
{
Schema::create('jobs', function (Blueprint $table) {
$table->bigIncrements('id');
$table->string('queue')->index();
$table->longText('payload');
$table->unsignedTinyInteger('attempts');
$table->unsignedInteger('reserved_at')->nullable();
$table->unsignedInteger('available_at');
$table->unsignedInteger('created_at');
});
}
/**
* Reverse the migrations.
*
* @return void
*/
public function down()
{
Schema::dropIfExists('jobs');
}
}