DigiComp.FlowSymfonyBridge..../Classes/Command/MessengerCommandController.php
2022-09-18 13:17:53 +02:00

119 lines
4 KiB
PHP

<?php
namespace DigiComp\FlowSymfonyBridge\Messenger\Command;
use DigiComp\FlowSymfonyBridge\Messenger\EventListener\StopWorkerOnRestartSignalListener;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\Cli\CommandController;
use Neos\Flow\ObjectManagement\DependencyInjection\DependencyProxy;
use Psr\Cache\CacheItemPoolInterface;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
use Symfony\Component\Messenger\RoutableMessageBus;
class MessengerCommandController extends CommandController
{
use RunSymfonyCommandTrait;
/**
* @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:RoutableMessageBus")
* @var RoutableMessageBus
*/
protected $routableBus;
/**
* @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:ReceiversContainer")
* @var ContainerInterface
*/
protected $receiverContainer;
/**
* @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher")
* @var EventDispatcherInterface
*/
protected $eventDispatcher;
/**
* @Flow\Inject(lazy=false)
* @var LoggerInterface
*/
protected LoggerInterface $logger;
/**
* @Flow\InjectConfiguration
* @var array
*/
protected array $configuration;
/**
* @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:RestartSignalCachePool")
* @var CacheItemPoolInterface
*/
protected $restartSignalCachePool;
/**
* Consumes messages and dispatches them to the message bus
*
* To receive from multiple transports, pass each name:
* <info>worker:consume receiver1 receiver2</info>
*
* Options are:
* --limit limits the number of messages received
* --failure-limit stop the worker when the given number of failed messages is reached
* --memory-limit stop the worker if it exceeds a given memory usage limit. You can use shorthand
* byte values [K, M, or G]
* --time-limit stop the worker when the gien time limit (in seconds) is reached. If a message is beeing handled,
* the worker will stop after the processing is finished
* --bus specify the message bus to dispatch received messages to instead of trying to determine it automatically.
* This is required if the messages didn't originate from Messenger
*
* Optional arguments are -q (quiet) and -v[v[v]] (verbosity)
*/
public function consumeCommand()
{
if ($this->receiverContainer instanceof DependencyProxy) {
$this->receiverContainer->_activateDependency();
}
if ($this->eventDispatcher instanceof DependencyProxy) {
$this->eventDispatcher->_activateDependency();
}
$command = new ConsumeMessagesCommand(
$this->routableBus,
$this->receiverContainer,
$this->eventDispatcher,
$this->logger,
\array_keys($this->configuration['transports'])
);
$this->run($command);
}
/**
* List all available receivers
*/
public function listReceiversCommand()
{
foreach (\array_keys($this->configuration['transports']) as $transportName) {
$this->outputLine('- ' . $transportName);
}
}
/**
* Stop workers after their current message
*
* Each worker command will finish the message they are currently processing
* and then exit. Worker commands are *not* automatically restarted: that
* should be handled by a process control system.
*/
public function stopWorkersCommand()
{
$cacheItem = $this->restartSignalCachePool->getItem(
StopWorkerOnRestartSignalListener::RESTART_REQUESTED_TIMESTAMP_KEY
);
$cacheItem->set(\microtime(true));
$this->restartSignalCachePool->save($cacheItem);
//TODO: Add the possibility to wait until all are exited
}
}