list = collect();
        $this->receiving = NULL;
        $this->file_pos = 0;
        $this->f = NULL;
    }

    /**
     * Expose computed, read-only attributes describing the receive state.
     *
     * Supported keys:
     *  - fd:               TRUE when $this->f is currently an open resource
     *  - filepos:          bytes written to the current file so far
     *  - mtime|name|size:  the matching file_* attribute of the item being received
     *                      (NULL when nothing is being received)
     *  - name_size_time:   "name size mtime" string built from the three keys above
     *  - to_get:           number of I_RECV items in the list with received === FALSE
     *  - total_recv:       number of I_RECV items in the list with received === TRUE
     *  - total_recv_bytes: sum of file_size over the I_RECV items with received === TRUE
     *
     * @param string $key
     * @return mixed
     * @throws Exception When $key is not one of the keys listed above
     */
    public function __get($key)
    {
        switch ($key) {
            case 'fd':
                return is_resource($this->f);

            case 'filepos':
                return $this->file_pos;

            case 'mtime':
            case 'name':
            case 'size':
                return $this->receiving?->{'file_'.$key};

            case 'name_size_time':
                return sprintf('%s %lu %lu',$this->name,$this->size,$this->mtime);

            case 'to_get':
                return $this->list
                    ->filter(function($item) { return ($item->action & self::I_RECV) && $item->received === FALSE; })
                    ->count();

            case 'total_recv':
                return $this->list
                    ->filter(function($item) { return ($item->action & self::I_RECV) && $item->received === TRUE; })
                    ->count();

            case 'total_recv_bytes':
                return $this->list
                    ->filter(function($item) { return ($item->action & self::I_RECV) && $item->received === TRUE; })
                    ->sum(function($item) { return $item->file_size; });

            default:
                throw new Exception('Unknown key: '.$key);
        }
    }

    /**
     * Close the file descriptor for our incoming file
     *
     * The current item is marked as received (and as incomplete when fewer bytes
     * than its file_size were written), the handle is closed, and - for complete
     * files only - the file is handed on according to its type:
     *  - IS_ARC / IS_PKT: each packet is checked (origin address, packet password) and
     *    its messages are dispatched to MessageProcess, queued or synchronously
     *    depending on config('app.queue_msgs'). The file is deleted once processed,
     *    unless config('app.packet_keep') is set.
     *  - IS_TIC: a TicProcess job is queued.
     *  - anything else is left where it was written.
     *
     * The "currently receiving" item is always cleared before returning, including
     * when the post-close processing throws.
     *
     * @throws Exception When no file is open
     */
    public function close(): void
    {
        if (! $this->f)
            throw new Exception('No file to close');

        if ($this->file_pos != $this->receiving->file_size) {
            Log::warning(sprintf('%s: - Closing [%s], but missing [%d] bytes',self::LOGKEY,$this->receiving->file_name,$this->receiving->file_size-$this->file_pos));
            $this->receiving->incomplete = TRUE;
        }

        $this->receiving->received = TRUE;

        $end = time()-$this->start;
        Log::debug(sprintf('%s: - Closing [%s], received in [%d]',self::LOGKEY,$this->receiving->file_name,$end));

        fclose($this->f);
        $this->file_pos = 0;
        $this->f = NULL;

        try {
            // If the packet has been received but not the right size, dont process it any more.

            // If we received a packet, we'll dispatch a job to process it
            if (! $this->receiving->incomplete) {
                switch ($this->receiving->file_type) {
                    case self::IS_ARC:
                    case self::IS_PKT:
                        Log::info(sprintf('%s: - Processing mail %s [%s]',self::LOGKEY,$this->receiving->file_type === self::IS_PKT ? 'PACKET' : 'ARCHIVE',$this->file));

                        try {
                            $f = new File($this->file);
                            $processed = FALSE;

                            foreach ($f as $packet) {
                                $po = Packet::process($packet,Arr::get(stream_get_meta_data($packet),'uri'),$f->itemSize(),$this->ao->system);

                                // Check the messages are from the uplink
                                if ($this->ao->system->addresses->search(function($item) use ($po) { return $item->id == $po->fftn_o->id; }) === FALSE) {
                                    Log::error(sprintf('%s: ! Packet [%s] is not from this link? [%d]',self::LOGKEY,$po->fftn_o->ftn,$this->ao->system_id));

                                    break;
                                }

                                // Check the packet password
                                if ($this->ao->session('pktpass') != $po->password) {
                                    Log::error(sprintf('%s: ! Packet from [%s] with password [%s] is invalid.',self::LOGKEY,$this->ao->ftn,$po->password));

                                    // @todo Generate message to system advising invalid password - that message should be sent without a packet password!
                                    break;
                                }

                                Log::info(sprintf('%s: - Packet has [%d] messages',self::LOGKEY,$po->count()));

                                // Queue messages if there are too many in the packet.
                                if ($queue = ($po->count() > config('app.queue_msgs')))
                                    Log::info(sprintf('%s: - Messages will be sent to the queue for processing',self::LOGKEY));

                                $count = 0;
                                foreach ($po as $msg) {
                                    Log::info(sprintf('%s: - Mail from [%s] to [%s]',self::LOGKEY,$msg->fftn,$msg->tftn));

                                    // @todo Quick check that the packet should be processed by us.
                                    // @todo validate that the packet's zone is in the domain.

                                    try {
                                        // Dispatch job.
                                        if ($queue)
                                            MessageProcess::dispatch($msg,$f->pktName());
                                        else
                                            MessageProcess::dispatchSync($msg,$f->pktName());

                                    } catch (Exception $e) {
                                        Log::error(sprintf('%s:! Got error dispatching message [%s] (%d:%s-%s).',self::LOGKEY,$msg->msgid,$e->getLine(),$e->getFile(),$e->getMessage()));
                                    }

                                    // Counted whether or not the dispatch succeeded; a failed dispatch is only logged
                                    $count++;
                                }

                                if ($count === $po->count())
                                    $processed = TRUE;
                            }

                            if (! $processed) {
                                Log::alert(sprintf('%s: - Not deleting packet [%s], it doesnt seem to be processed?',self::LOGKEY,$this->file));

                            // If we want to keep the packet, we could do that logic here
                            } elseif (! config('app.packet_keep')) {
                                Log::debug(sprintf('%s: - Deleting processed packet [%s]',self::LOGKEY,$this->file));
                                unlink($this->file);
                            }

                        } catch (InvalidPacketException $e) {
                            Log::error(sprintf('%s: - Not deleting packet [%s], as it generated an InvalidPacketException',self::LOGKEY,$this->file),['e'=>$e->getMessage()]);

                        } catch (\Exception $e) {
                            Log::error(sprintf('%s: - Not deleting packet [%s], as it generated an uncaught exception',self::LOGKEY,$this->file),['e'=>$e->getMessage()]);
                        }

                        break;

                    case self::IS_TIC:
                        Log::info(sprintf('%s: - Processing TIC file [%s]',self::LOGKEY,$this->file));

                        // Queue the tic to be processed later, in case the referenced file hasnt been received yet
                        TicProcess::dispatch($this->file);

                        break;

                    default:
                        Log::debug(sprintf('%s: - Leaving file [%s] in the inbound dir',self::LOGKEY,$this->file));
                }
            }

        } finally {
            // Always release the current item - the handle is already closed at this point, and leaving
            // it set after an exception would make the next new() fail with
            // 'Can only have 1 file receiving at a time'.
            $this->receiving = NULL;
        }
    }

    /**
     * Open the file descriptor to receive a file
     *
     * The target path is built from config('app.fido'), the address id (4 hex digits)
     * and the current item's recvas name, and is opened with mode 'wb'.
     *
     * NOTE(review): the declared return type is bool, but the body returns the integers 0 and 3.
     * Unless strict_types is enabled for this file, PHP coerces those to FALSE (0) and TRUE (3),
     * so callers currently see FALSE for "ok"/"check" and TRUE for "open failed" - confirm that is
     * what callers expect before changing either the type or the values.
     *
     * @param Address $ao Address the file is being received from
     * @param bool $check When TRUE, return immediately without opening anything (checks are not implemented yet)
     * @return bool
     * @throws Exception When there is no file currently being received
     */
    public function open(Address $ao,bool $check=FALSE): bool
    {
        Log::debug(sprintf('%s:+ open [%d]',self::LOGKEY,$check));

        // Check we can open this file
        // @todo
        // @todo implement return 2 - SKIP file
        // @todo implement return 4 - SUSPEND(?) file
        if ($check) {
            return 0;
        }

        if (! $this->receiving)
            throw new Exception('No files currently receiving');

        $this->ao = $ao;
        $this->file_pos = 0;
        $this->start = time();
        $this->file = sprintf('storage/app/%s/%04X-%s',config('app.fido'),$this->ao->id,$this->receiving->recvas);

        Log::debug(sprintf('%s: - Opening [%s]',self::LOGKEY,$this->file));
        $this->f = fopen($this->file,'wb');

        if (! $this->f) {
            Log::error(sprintf('%s:! Unable to open file [%s] for writing',self::LOGKEY,$this->receiving->file_name));

            return 3; // @todo change to const
        }

        Log::info(sprintf('%s:= open - File [%s] opened for writing',self::LOGKEY,$this->receiving->file_name));

        return 0; // @todo change to const
    }

    /**
     * Add a new file to receive
     *
     * Wraps the file details in an Item flagged I_RECV, appends it to the list and
     * makes it the item currently being received.
     *
     * @param array $file File details, passed as-is to the Item constructor
     * @throws Exception When another file is already being received
     */
    public function new(array $file): void
    {
        Log::debug(sprintf('%s:+ new [%s]',self::LOGKEY,join('|',$file)));

        if ($this->receiving)
            throw new Exception('Can only have 1 file receiving at a time');

        $o = new Item($file,self::I_RECV);
        $this->list->push($o);

        $this->receiving = $o;
    }

    /**
     * Write data to the file we are receiving
     *
     * @param string $buf Bytes to append to the open file
     * @return int Number of bytes actually written, as reported by fwrite()
     * @throws Exception When no file is open, or when the data would exceed the item's file_size
     * @throws FileException When fwrite() fails
     */
    public function write(string $buf): int
    {
        if (! $this->f)
            throw new Exception('No file open for write');

        // Refuse anything that would take us past the size announced for this file
        if ($this->file_pos+strlen($buf) > $this->receiving->file_size)
            throw new Exception(sprintf('Too many bytes received [%d] (%d)?',$this->file_pos+strlen($buf),$this->receiving->file_size));

        $rc = fwrite($this->f,$buf);

        if ($rc === FALSE)
            throw new FileException('Error while writing to file');

        // Advance by what was actually written, which may be less than strlen($buf)
        $this->file_pos += $rc;
        Log::debug(sprintf('%s:- Write [%d] bytes, file pos now [%d] of [%d] (%d)',self::LOGKEY,$rc,$this->file_pos,$this->receiving->file_size,strlen($buf)));

        return $rc;
    }
}