When originating a session, send anything received via the queue
This commit is contained in:
parent
7847728e52
commit
e5de4970d1
@ -34,6 +34,7 @@ class Receive extends Base
|
|||||||
private ?string $comp;
|
private ?string $comp;
|
||||||
/** @var string|null The compressed data received */
|
/** @var string|null The compressed data received */
|
||||||
private ?string $comp_data;
|
private ?string $comp_data;
|
||||||
|
private $queue = FALSE;
|
||||||
|
|
||||||
public function __construct()
|
public function __construct()
|
||||||
{
|
{
|
||||||
@ -125,8 +126,8 @@ class Receive extends Base
|
|||||||
case self::IS_PKT:
|
case self::IS_PKT:
|
||||||
try {
|
try {
|
||||||
// If packet is greater than a size, lets queue it
|
// If packet is greater than a size, lets queue it
|
||||||
if ($this->receiving->size > config('fido.queue_size',0)) {
|
if ($this->queue || ($this->receiving->size > config('fido.queue_size',0))) {
|
||||||
Log::info(sprintf('%s:- Packet [%s] will be sent to the queue for processing because its [%d] size',self::LOGKEY,$this->receiving->full_name,$this->receiving->size));
|
Log::info(sprintf('%s:- Packet [%s] will be sent to the queue for processing because its [%d] size, or queue forced',self::LOGKEY,$this->receiving->full_name,$this->receiving->size));
|
||||||
PacketProcess::dispatch($this->receiving,$this->ao,$rcvd_time);
|
PacketProcess::dispatch($this->receiving,$this->ao,$rcvd_time);
|
||||||
|
|
||||||
} else
|
} else
|
||||||
@ -157,11 +158,12 @@ class Receive extends Base
|
|||||||
/**
|
/**
|
||||||
* Add a new file to receive
|
* Add a new file to receive
|
||||||
*
|
*
|
||||||
* @param array $file
|
* @param array $file The name of the receiving file
|
||||||
* @param Address $ao
|
* @param Address $ao Sender sending a file to us
|
||||||
|
* @param bool $queue Force sending received items to the queue
|
||||||
* @throws \Exception
|
* @throws \Exception
|
||||||
*/
|
*/
|
||||||
public function new(array $file,Address $ao): void
|
public function new(array $file,Address $ao,$queue=FALSE): void
|
||||||
{
|
{
|
||||||
Log::debug(sprintf('%s:+ Receiving new file [%s]',self::LOGKEY,join('|',$file)));
|
Log::debug(sprintf('%s:+ Receiving new file [%s]',self::LOGKEY,join('|',$file)));
|
||||||
|
|
||||||
@ -169,6 +171,7 @@ class Receive extends Base
|
|||||||
throw new \Exception('Can only have 1 file receiving at a time');
|
throw new \Exception('Can only have 1 file receiving at a time');
|
||||||
|
|
||||||
$this->ao = $ao;
|
$this->ao = $ao;
|
||||||
|
$this->queue = $queue;
|
||||||
|
|
||||||
$this->list->push(new Item($ao,Arr::get($file,'name'),(int)Arr::get($file,'mtime'),(int)Arr::get($file,'size')));
|
$this->list->push(new Item($ao,Arr::get($file,'name'),(int)Arr::get($file,'mtime'),(int)Arr::get($file,'size')));
|
||||||
$this->index = $this->list->count()-1;
|
$this->index = $this->list->count()-1;
|
||||||
|
@ -132,9 +132,11 @@ abstract class Protocol
|
|||||||
|
|
||||||
private array $comms;
|
private array $comms;
|
||||||
|
|
||||||
|
protected bool $force_queue = FALSE;
|
||||||
|
|
||||||
abstract protected function protocol_init(): int;
|
abstract protected function protocol_init(): int;
|
||||||
|
|
||||||
abstract protected function protocol_session(): int;
|
abstract protected function protocol_session(bool $force_queue=FALSE): int;
|
||||||
|
|
||||||
public function __construct(Setup $o=NULL)
|
public function __construct(Setup $o=NULL)
|
||||||
{
|
{
|
||||||
@ -381,13 +383,13 @@ abstract class Protocol
|
|||||||
|
|
||||||
case self::SESSION_EMSI:
|
case self::SESSION_EMSI:
|
||||||
Log::debug(sprintf('%s:- Starting EMSI',self::LOGKEY));
|
Log::debug(sprintf('%s:- Starting EMSI',self::LOGKEY));
|
||||||
$rc = $this->protocol_session();
|
$rc = $this->protocol_session($this->originate);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case self::SESSION_BINKP:
|
case self::SESSION_BINKP:
|
||||||
Log::debug(sprintf('%s:- Starting BINKP',self::LOGKEY));
|
Log::debug(sprintf('%s:- Starting BINKP',self::LOGKEY));
|
||||||
$rc = $this->protocol_session();
|
$rc = $this->protocol_session($this->originate);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
@ -396,7 +398,7 @@ abstract class Protocol
|
|||||||
$this->client->speed = self::TCP_SPEED;
|
$this->client->speed = self::TCP_SPEED;
|
||||||
$this->originate = FALSE;
|
$this->originate = FALSE;
|
||||||
|
|
||||||
return $this->protocol_session();
|
return $this->protocol_session($this->originate);
|
||||||
|
|
||||||
default:
|
default:
|
||||||
Log::error(sprintf('%s:! Unsupported session type [%d]',self::LOGKEY,$type));
|
Log::error(sprintf('%s:! Unsupported session type [%d]',self::LOGKEY,$type));
|
||||||
|
@ -897,7 +897,7 @@ final class Binkp extends BaseProtocol
|
|||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->recv->new($file['file'],$this->node->address);
|
$this->recv->new($file['file'],$this->node->address,$this->force_queue);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
switch ($this->recv->open($file['offs']<0,$file['flags'])) {
|
switch ($this->recv->open($file['offs']<0,$file['flags'])) {
|
||||||
@ -1341,14 +1341,16 @@ final class Binkp extends BaseProtocol
|
|||||||
/**
|
/**
|
||||||
* Set up our BINKP session
|
* Set up our BINKP session
|
||||||
*
|
*
|
||||||
|
* @param bool $force_queue
|
||||||
* @return int
|
* @return int
|
||||||
* @throws \Exception
|
* @throws \Exception
|
||||||
*/
|
*/
|
||||||
protected function protocol_session(): int
|
protected function protocol_session(bool $force_queue=FALSE): int
|
||||||
{
|
{
|
||||||
if ($this->binkp_init() !== self::OK)
|
if ($this->binkp_init() !== self::OK)
|
||||||
return self::S_FAILURE;
|
return self::S_FAILURE;
|
||||||
|
|
||||||
|
$this->force_queue = $force_queue;
|
||||||
$this->binkp_hs();
|
$this->binkp_hs();
|
||||||
|
|
||||||
while (TRUE) {
|
while (TRUE) {
|
||||||
|
@ -99,10 +99,11 @@ final class DNS extends BaseProtocol
|
|||||||
* TTL positive values of a signed 32 bit number.
|
* TTL positive values of a signed 32 bit number.
|
||||||
* UDP messages 512 octets or less
|
* UDP messages 512 octets or less
|
||||||
*
|
*
|
||||||
|
* @param bool $force_queue Not used here
|
||||||
* @return int
|
* @return int
|
||||||
* @throws \Exception
|
* @throws \Exception
|
||||||
*/
|
*/
|
||||||
public function protocol_session(): int
|
public function protocol_session(bool $force_queue=FALSE): int
|
||||||
{
|
{
|
||||||
Log::debug(sprintf('%s:+ DNS Query',self::LOGKEY));
|
Log::debug(sprintf('%s:+ DNS Query',self::LOGKEY));
|
||||||
|
|
||||||
|
@ -976,11 +976,12 @@ final class EMSI extends BaseProtocol implements CRCInterface,ZmodemInterface
|
|||||||
* @return int
|
* @return int
|
||||||
* @throws \Exception
|
* @throws \Exception
|
||||||
*/
|
*/
|
||||||
protected function protocol_session(): int
|
protected function protocol_session(bool $force_queue=FALSE): int
|
||||||
{
|
{
|
||||||
// @todo introduce emsi_init() to perform the job of protocol_init. Only needs to be done when we originate a session
|
// @todo introduce emsi_init() to perform the job of protocol_init. Only needs to be done when we originate a session
|
||||||
|
|
||||||
Log::debug(sprintf('%s:+ Starting EMSI Protocol SESSION',self::LOGKEY));
|
Log::debug(sprintf('%s:+ Starting EMSI Protocol SESSION',self::LOGKEY));
|
||||||
|
$this->force_queue = $force_queue;
|
||||||
|
|
||||||
$was_req = 0;
|
$was_req = 0;
|
||||||
$got_req = 0;
|
$got_req = 0;
|
||||||
@ -1189,7 +1190,7 @@ final class EMSI extends BaseProtocol implements CRCInterface,ZmodemInterface
|
|||||||
Log::debug(sprintf('%s:+ Start WAZOO Receive',self::LOGKEY));
|
Log::debug(sprintf('%s:+ Start WAZOO Receive',self::LOGKEY));
|
||||||
|
|
||||||
// @todo If the node is not defined in the DB node->address is NULL. Need to figure out how to handle those nodes.
|
// @todo If the node is not defined in the DB node->address is NULL. Need to figure out how to handle those nodes.
|
||||||
$rc = (new Zmodem)->zmodem_receive($this->client,$zap,$this->recv,$this->node->address);
|
$rc = (new Zmodem)->zmodem_receive($this->client,$zap,$this->recv,$this->node->address,$this->force_queue);
|
||||||
|
|
||||||
return ($rc === self::RCDO || $rc === self::ERROR);
|
return ($rc === self::RCDO || $rc === self::ERROR);
|
||||||
}
|
}
|
||||||
|
@ -273,10 +273,11 @@ final class Zmodem extends Protocol implements CRCInterface,ZmodemInterface
|
|||||||
/**
|
/**
|
||||||
* Setup our ZMODEM session
|
* Setup our ZMODEM session
|
||||||
*
|
*
|
||||||
|
* @param bool $force_queue Not used here
|
||||||
* @return int
|
* @return int
|
||||||
* @throws \Exception
|
* @throws \Exception
|
||||||
*/
|
*/
|
||||||
public function protocol_session(): int
|
public function protocol_session(bool $force_queue=FALSE): int
|
||||||
{
|
{
|
||||||
$proto = $this->originate ? $this->node->optionGet(self::P_MASK) : $this->optionGet(self::P_MASK);
|
$proto = $this->originate ? $this->node->optionGet(self::P_MASK) : $this->optionGet(self::P_MASK);
|
||||||
|
|
||||||
@ -301,7 +302,7 @@ final class Zmodem extends Protocol implements CRCInterface,ZmodemInterface
|
|||||||
* @param int $canzap
|
* @param int $canzap
|
||||||
* @return int
|
* @return int
|
||||||
*/
|
*/
|
||||||
public function zmodem_receive(SocketClient $client,int $canzap,Receive $recv,Address $ao): int
|
public function zmodem_receive(SocketClient $client,int $canzap,Receive $recv,Address $ao,bool $force_queue=FALSE): int
|
||||||
{
|
{
|
||||||
Log::debug(sprintf('%s:+ Starting ZModem Receive [%d]',self::LOGKEY,$canzap));
|
Log::debug(sprintf('%s:+ Starting ZModem Receive [%d]',self::LOGKEY,$canzap));
|
||||||
|
|
||||||
@ -1489,7 +1490,7 @@ final class Zmodem extends Protocol implements CRCInterface,ZmodemInterface
|
|||||||
$filesleft = -1;
|
$filesleft = -1;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
$this->recv->new($file,$ao);
|
$this->recv->new($file,$ao,$this->force_queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
return self::ZFILE;
|
return self::ZFILE;
|
||||||
|
Loading…
Reference in New Issue
Block a user