Replacing annotations with attributes
This commit is contained in:
parent
7c18b1d329
commit
04ee933830
16 changed files with 75 additions and 189 deletions
|
@ -14,16 +14,10 @@ class FailedCommandController extends CommandController
|
|||
{
|
||||
use RunSymfonyCommandTrait;
|
||||
|
||||
/**
|
||||
* @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:ReceiversContainer")
|
||||
* @var ContainerInterface
|
||||
*/
|
||||
protected $receiverContainer;
|
||||
#[Flow\Inject(name: 'DigiComp.FlowSymfonyBridge.Messenger:ReceiversContainer')]
|
||||
protected ContainerInterface $receiverContainer;
|
||||
|
||||
/**
|
||||
* @Flow\InjectConfiguration
|
||||
* @var array
|
||||
*/
|
||||
#[Flow\InjectConfiguration]
|
||||
protected array $configuration;
|
||||
|
||||
/**
|
||||
|
@ -39,7 +33,7 @@ class FailedCommandController extends CommandController
|
|||
*
|
||||
* Optional arguments are -q (quiet) -v[v[v]] (verbosity) and --force (do not ask)
|
||||
*/
|
||||
public function showCommand()
|
||||
public function showCommand(): void
|
||||
{
|
||||
$command = new FailedMessagesShowCommand(
|
||||
$this->configuration['failureTransport'],
|
||||
|
@ -59,7 +53,7 @@ class FailedCommandController extends CommandController
|
|||
*
|
||||
* Optional arguments are -q (quiet) -v[v[v]] (verbosity) and --force (do not ask)
|
||||
*/
|
||||
public function removeCommand()
|
||||
public function removeCommand(): void
|
||||
{
|
||||
$command = new FailedMessagesRemoveCommand(
|
||||
$this->configuration['failureTransport'],
|
||||
|
@ -87,7 +81,7 @@ class FailedCommandController extends CommandController
|
|||
*
|
||||
* @noinspection PhpParamsInspection
|
||||
*/
|
||||
public function retryCommand()
|
||||
public function retryCommand(): void
|
||||
{
|
||||
$command = new FailedMessagesRetryCommand(
|
||||
$this->configuration['failureTransport'],
|
||||
|
|
|
@ -17,47 +17,29 @@ class MessengerCommandController extends CommandController
|
|||
{
|
||||
use RunSymfonyCommandTrait;
|
||||
|
||||
/**
|
||||
* @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:RoutableMessageBus")
|
||||
* @var RoutableMessageBus
|
||||
*/
|
||||
protected $routableBus;
|
||||
#[Flow\Inject(name: 'DigiComp.FlowSymfonyBridge.Messenger:RoutableMessageBus')]
|
||||
protected RoutableMessageBus $routableBus;
|
||||
|
||||
/**
|
||||
* @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:ReceiversContainer")
|
||||
* @var ContainerInterface
|
||||
*/
|
||||
protected $receiverContainer;
|
||||
#[Flow\Inject(name: 'DigiComp.FlowSymfonyBridge.Messenger:ReceiversContainer')]
|
||||
protected ContainerInterface $receiverContainer;
|
||||
|
||||
/**
|
||||
* @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher")
|
||||
* @var EventDispatcherInterface
|
||||
*/
|
||||
protected $eventDispatcher;
|
||||
#[Flow\Inject(name: 'DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher')]
|
||||
protected EventDispatcherInterface $eventDispatcher;
|
||||
|
||||
/**
|
||||
* @Flow\Inject(lazy=false)
|
||||
* @var LoggerInterface
|
||||
*/
|
||||
#[Flow\Inject]
|
||||
protected LoggerInterface $logger;
|
||||
|
||||
/**
|
||||
* @Flow\InjectConfiguration
|
||||
* @var array
|
||||
*/
|
||||
#[Flow\InjectConfiguration]
|
||||
protected array $configuration;
|
||||
|
||||
/**
|
||||
* @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:RestartSignalCachePool")
|
||||
* @var CacheItemPoolInterface
|
||||
*/
|
||||
protected $restartSignalCachePool;
|
||||
#[Flow\Inject(name: 'DigiComp.FlowSymfonyBridge.Messenger:RestartSignalCachePool')]
|
||||
protected CacheItemPoolInterface $restartSignalCachePool;
|
||||
|
||||
/**
|
||||
* Consumes messages and dispatches them to the message bus
|
||||
*
|
||||
* To receive from multiple transports, pass each name:
|
||||
* <info>worker:consume receiver1 receiver2</info>
|
||||
* <info>messenger:consume receiver1 receiver2</info>
|
||||
*
|
||||
* Options are:
|
||||
* --limit limits the number of messages received
|
||||
|
@ -71,7 +53,7 @@ class MessengerCommandController extends CommandController
|
|||
*
|
||||
* Optional arguments are -q (quiet) and -v[v[v]] (verbosity)
|
||||
*/
|
||||
public function consumeCommand()
|
||||
public function consumeCommand(): void
|
||||
{
|
||||
if ($this->receiverContainer instanceof DependencyProxy) {
|
||||
$this->receiverContainer->_activateDependency();
|
||||
|
@ -92,7 +74,7 @@ class MessengerCommandController extends CommandController
|
|||
/**
|
||||
* List all available receivers
|
||||
*/
|
||||
public function listReceiversCommand()
|
||||
public function listReceiversCommand(): void
|
||||
{
|
||||
foreach (\array_keys($this->configuration['transports']) as $transportName) {
|
||||
$this->outputLine('- ' . $transportName);
|
||||
|
@ -106,7 +88,7 @@ class MessengerCommandController extends CommandController
|
|||
* and then exit. Worker commands are *not* automatically restarted: that
|
||||
* should be handled by a process control system.
|
||||
*/
|
||||
public function stopWorkersCommand()
|
||||
public function stopWorkersCommand(): void
|
||||
{
|
||||
$cacheItem = $this->restartSignalCachePool->getItem(
|
||||
StopWorkerOnRestartSignalListener::RESTART_REQUESTED_TIMESTAMP_KEY
|
||||
|
|
|
@ -10,7 +10,7 @@ use Symfony\Component\Console\Output\OutputInterface;
|
|||
|
||||
trait RunSymfonyCommandTrait
|
||||
{
|
||||
protected function run(Command $command)
|
||||
protected function run(Command $command): void
|
||||
{
|
||||
$definition = $command->getDefinition();
|
||||
$definition->setArguments(\array_merge(
|
||||
|
@ -29,7 +29,7 @@ trait RunSymfonyCommandTrait
|
|||
$command->run($input, $this->output->getOutput());
|
||||
}
|
||||
|
||||
protected function configureIO($input, $output)
|
||||
protected function configureIO($input, $output): void
|
||||
{
|
||||
switch ($shellVerbosity = (int)\getenv('SHELL_VERBOSITY')) {
|
||||
case -1:
|
||||
|
|
|
@ -11,19 +11,13 @@ use Symfony\Component\EventDispatcher\EventSubscriberInterface;
|
|||
// TODO: Maybe an own package for EntityManager bridge?
|
||||
class EventDispatcherFactory
|
||||
{
|
||||
/**
|
||||
* @Flow\Inject
|
||||
* @var ObjectManagerInterface
|
||||
*/
|
||||
protected $objectManager;
|
||||
#[Flow\Inject(lazy: false)]
|
||||
protected ObjectManagerInterface $objectManager;
|
||||
|
||||
/**
|
||||
* @Flow\InjectConfiguration
|
||||
* @var array
|
||||
*/
|
||||
protected $configuration;
|
||||
#[Flow\InjectConfiguration]
|
||||
protected array $configuration;
|
||||
|
||||
public function create()
|
||||
public function create(): EventDispatcher
|
||||
{
|
||||
$eventDispatcher = new EventDispatcher();
|
||||
|
||||
|
@ -36,7 +30,7 @@ class EventDispatcherFactory
|
|||
return $eventDispatcher;
|
||||
}
|
||||
|
||||
private function addLazySubscribers(EventDispatcherInterface $eventDispatcher, $subscriberId)
|
||||
private function addLazySubscribers(EventDispatcherInterface $eventDispatcher, $subscriberId): void
|
||||
{
|
||||
$subscriberClass = $this->objectManager->getClassNameByObjectName($subscriberId);
|
||||
if (! \is_a($subscriberClass, EventSubscriberInterface::class, true)) {
|
||||
|
|
|
@ -11,27 +11,19 @@ use Symfony\Component\Messenger\Event\WorkerStartedEvent;
|
|||
|
||||
// This is a 1 to one copy of the original event listener, with a modified RESTART_REQUESTED_TIMESTAMP_KEY to match
|
||||
// the restriction of the cache ids in flow.
|
||||
// Also the DI is simplified
|
||||
// Additionally, the DI is simplified
|
||||
|
||||
/**
|
||||
* @Flow\Scope("singleton")
|
||||
*/
|
||||
#[Flow\Scope('singleton')]
|
||||
class StopWorkerOnRestartSignalListener implements EventSubscriberInterface
|
||||
{
|
||||
public const RESTART_REQUESTED_TIMESTAMP_KEY = 'workers_restart_requested_timestamp';
|
||||
|
||||
/**
|
||||
* @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:RestartSignalCachePool")
|
||||
* @var CacheItemPoolInterface
|
||||
*/
|
||||
protected $cachePool;
|
||||
#[Flow\Inject(name: 'DigiComp.FlowSymfonyBridge.Messenger:RestartSignalCachePool', lazy: false)]
|
||||
protected CacheItemPoolInterface $cachePool;
|
||||
|
||||
/**
|
||||
* @Flow\Inject
|
||||
* @var LoggerInterface
|
||||
*/
|
||||
protected $logger;
|
||||
private $workerStartedAt;
|
||||
#[Flow\Inject(lazy: false)]
|
||||
protected LoggerInterface $logger;
|
||||
private float $workerStartedAt;
|
||||
|
||||
public function onWorkerStarted(): void
|
||||
{
|
||||
|
@ -42,13 +34,11 @@ class StopWorkerOnRestartSignalListener implements EventSubscriberInterface
|
|||
{
|
||||
if ($this->shouldRestart()) {
|
||||
$event->getWorker()->stop();
|
||||
if (null !== $this->logger) {
|
||||
$this->logger->info('Worker stopped because a restart was requested.');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static function getSubscribedEvents()
|
||||
public static function getSubscribedEvents(): array
|
||||
{
|
||||
return [
|
||||
WorkerStartedEvent::class => 'onWorkerStarted',
|
||||
|
|
|
@ -12,25 +12,16 @@ use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
|
|||
|
||||
class HandlersLocatorFactory
|
||||
{
|
||||
/**
|
||||
* @Flow\InjectConfiguration
|
||||
* @var array
|
||||
*/
|
||||
protected $configuration;
|
||||
#[Flow\InjectConfiguration]
|
||||
protected array $configuration;
|
||||
|
||||
/**
|
||||
* @Flow\Inject
|
||||
* @var ObjectManagerInterface
|
||||
*/
|
||||
protected $objectManager;
|
||||
#[Flow\Inject(lazy: false)]
|
||||
protected ObjectManagerInterface $objectManager;
|
||||
|
||||
/**
|
||||
* @Flow\Inject
|
||||
* @var ReflectionService
|
||||
*/
|
||||
protected $reflectionService;
|
||||
#[Flow\Inject(lazy: false)]
|
||||
protected ReflectionService $reflectionService;
|
||||
|
||||
public function create($busName = 'default')
|
||||
public function create($busName = 'default'): HandlersLocator
|
||||
{
|
||||
$messageHandlerClasses = $this->reflectionService
|
||||
->getAllImplementationClassNamesForInterface(MessageSubscriberInterface::class);
|
||||
|
|
|
@ -7,25 +7,17 @@ use Neos\Flow\Annotations as Flow;
|
|||
use Psr\Container\ContainerInterface;
|
||||
use Symfony\Component\Messenger\MessageBus;
|
||||
|
||||
/**
|
||||
* @Flow\Scope("singleton")
|
||||
*/
|
||||
#[Flow\Scope('singleton')]
|
||||
class MessageBusContainer implements ContainerInterface
|
||||
{
|
||||
/**
|
||||
* @Flow\InjectConfiguration(path="buses")
|
||||
* @var array
|
||||
*/
|
||||
protected $configuration;
|
||||
#[Flow\InjectConfiguration(path: 'buses')]
|
||||
protected array $configuration;
|
||||
|
||||
/**
|
||||
* @var MessageBus[]
|
||||
*/
|
||||
protected array $buses = [];
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function get(string $id)
|
||||
{
|
||||
if (! isset($this->buses[$id])) {
|
||||
|
@ -35,10 +27,7 @@ class MessageBusContainer implements ContainerInterface
|
|||
return $this->buses[$id];
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function has(string $id)
|
||||
public function has(string $id): bool
|
||||
{
|
||||
return isset($this->configuration[$id]);
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ class ChainedContainer implements ContainerInterface
|
|||
throw new \InvalidArgumentException('Service id is unknown: ' . $id);
|
||||
}
|
||||
|
||||
public function has(string $id)
|
||||
public function has(string $id): bool
|
||||
{
|
||||
foreach ($this->childContainers as $childContainer) {
|
||||
if ($childContainer->has($id)) {
|
||||
|
|
|
@ -15,11 +15,8 @@ use Psr\Log\LoggerInterface;
|
|||
*/
|
||||
class RewindableGenerator implements \IteratorAggregate, \Countable
|
||||
{
|
||||
/**
|
||||
* @Flow\Inject
|
||||
* @var ObjectManagerInterface
|
||||
*/
|
||||
protected $objectManager;
|
||||
#[Flow\Inject(lazy: false)]
|
||||
protected ObjectManagerInterface $objectManager;
|
||||
|
||||
private array $serviceIds;
|
||||
|
||||
|
@ -46,14 +43,14 @@ class RewindableGenerator implements \IteratorAggregate, \Countable
|
|||
};
|
||||
}
|
||||
|
||||
public function getIterator()
|
||||
public function getIterator(): \Traversable
|
||||
{
|
||||
$g = $this->generator;
|
||||
|
||||
return $g();
|
||||
}
|
||||
|
||||
public function count()
|
||||
public function count(): int
|
||||
{
|
||||
return \count($this->serviceIds);
|
||||
}
|
||||
|
|
|
@ -8,22 +8,14 @@ use Psr\Container\ContainerInterface;
|
|||
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
|
||||
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
|
||||
|
||||
/**
|
||||
* @Flow\Scope("singleton")
|
||||
*/
|
||||
#[Flow\Scope('singleton')]
|
||||
class RetryStrategiesContainer implements ContainerInterface
|
||||
{
|
||||
/**
|
||||
* @Flow\InjectConfiguration
|
||||
* @var array
|
||||
*/
|
||||
#[Flow\InjectConfiguration]
|
||||
protected array $configuration;
|
||||
|
||||
/**
|
||||
* @Flow\Inject
|
||||
* @var ObjectManagerInterface
|
||||
*/
|
||||
protected $objectManager;
|
||||
#[Flow\Inject]
|
||||
protected ObjectManagerInterface $objectManager;
|
||||
|
||||
/**
|
||||
* @var RetryStrategyInterface[]
|
||||
|
@ -54,7 +46,7 @@ class RetryStrategiesContainer implements ContainerInterface
|
|||
return $this->retryStrategies[$id];
|
||||
}
|
||||
|
||||
public function has(string $id)
|
||||
public function has(string $id): bool
|
||||
{
|
||||
return isset($this->configuration['transports'][$id]);
|
||||
}
|
||||
|
|
|
@ -6,9 +6,7 @@ use Neos\Flow\Annotations as Flow;
|
|||
use Psr\Container\ContainerInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
|
||||
/**
|
||||
* @Flow\Scope("singleton")
|
||||
*/
|
||||
#[Flow\Scope('singleton')]
|
||||
class FailureTransportContainer implements ContainerInterface
|
||||
{
|
||||
/**
|
||||
|
@ -21,12 +19,12 @@ class FailureTransportContainer implements ContainerInterface
|
|||
return $this->transports[$id];
|
||||
}
|
||||
|
||||
public function has(string $id)
|
||||
public function has(string $id): bool
|
||||
{
|
||||
return isset($this->transports[$id]);
|
||||
}
|
||||
|
||||
public function set(string $id, TransportInterface $transport)
|
||||
public function set(string $id, TransportInterface $transport): void
|
||||
{
|
||||
$this->transports[$id] = $transport;
|
||||
}
|
||||
|
|
|
@ -13,9 +13,7 @@ use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
|
|||
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
|
||||
/**
|
||||
* @Flow\Scope("singleton")
|
||||
*/
|
||||
#[Flow\Scope('singleton')]
|
||||
class FlowDoctrineTransportFactory implements TransportFactoryInterface
|
||||
{
|
||||
private EntityManagerInterface $entityManager;
|
||||
|
@ -51,11 +49,8 @@ class FlowDoctrineTransportFactory implements TransportFactoryInterface
|
|||
return new DoctrineTransport($connection, $serializer);
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function supports(string $dsn, array $options): bool
|
||||
{
|
||||
return 0 === \strpos($dsn, 'flow-doctrine://');
|
||||
return \str_starts_with($dsn, 'flow-doctrine://');
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,33 +7,21 @@ use Symfony\Component\Messenger\Transport\TransportInterface;
|
|||
|
||||
class NullTransport implements TransportInterface
|
||||
{
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function get(): iterable
|
||||
{
|
||||
return new \EmptyIterator();
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function ack(Envelope $envelope): void
|
||||
{
|
||||
// do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function reject(Envelope $envelope): void
|
||||
{
|
||||
// do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function send(Envelope $envelope): Envelope
|
||||
{
|
||||
return $envelope;
|
||||
|
|
|
@ -8,19 +8,13 @@ use Symfony\Component\Messenger\Transport\TransportInterface;
|
|||
|
||||
class NullTransportFactory implements TransportFactoryInterface
|
||||
{
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
|
||||
{
|
||||
return new NullTransport();
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function supports(string $dsn, array $options): bool
|
||||
{
|
||||
return 0 === \strpos($dsn, 'null://');
|
||||
return \str_starts_with($dsn, 'null://');
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,34 +8,20 @@ use Psr\Container\ContainerInterface;
|
|||
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
|
||||
use Symfony\Component\Messenger\Transport\TransportInterface;
|
||||
|
||||
/**
|
||||
* @Flow\Scope("singleton")
|
||||
*/
|
||||
#[Flow\Scope('singleton')]
|
||||
class TransportsContainer implements ContainerInterface
|
||||
{
|
||||
/**
|
||||
* @Flow\InjectConfiguration
|
||||
* @var array
|
||||
*/
|
||||
#[Flow\InjectConfiguration]
|
||||
protected array $configuration;
|
||||
|
||||
/**
|
||||
* @Flow\Inject
|
||||
* @var ObjectManagerInterface
|
||||
*/
|
||||
protected $objectManager;
|
||||
#[Flow\Inject(lazy: false)]
|
||||
protected ObjectManagerInterface $objectManager;
|
||||
|
||||
/**
|
||||
* @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:TransportFactory")
|
||||
* @var TransportFactoryInterface
|
||||
*/
|
||||
protected $transportFactory;
|
||||
#[Flow\Inject(name: 'DigiComp.FlowSymfonyBridge.Messenger:TransportFactory', lazy: false)]
|
||||
protected TransportFactoryInterface $transportFactory;
|
||||
|
||||
/**
|
||||
* @Flow\Inject
|
||||
* @var FailureTransportContainer
|
||||
*/
|
||||
protected $failureTransports;
|
||||
#[Flow\Inject(lazy: false)]
|
||||
protected FailureTransportContainer $failureTransports;
|
||||
|
||||
/**
|
||||
* @var TransportInterface[]
|
||||
|
@ -80,7 +66,7 @@ class TransportsContainer implements ContainerInterface
|
|||
return $this->transports[$id];
|
||||
}
|
||||
|
||||
public function has(string $id)
|
||||
public function has(string $id): bool
|
||||
{
|
||||
return isset($this->configuration['transports'][$id]);
|
||||
}
|
||||
|
|
|
@ -10,7 +10,6 @@ DigiComp:
|
|||
Messenger:
|
||||
defaultBusName: "default"
|
||||
defaultSerializerName: "DigiComp.FlowSymfonyBridge.Messenger:DefaultSerializer"
|
||||
# TODO: use this
|
||||
defaultRetryStrategyOptions:
|
||||
maxRetries: 3
|
||||
# milliseconds delay
|
||||
|
@ -19,8 +18,8 @@ DigiComp:
|
|||
# e.g. 1 second delay, 2 seconds, 4 seconds
|
||||
multiplier: 2
|
||||
maxDelay: 0
|
||||
# override all of this with a service that
|
||||
# implements Symfony\Component\Messenger\Retry\RetryStrategyInterface
|
||||
# override all of this with a service that implements
|
||||
# Symfony\Component\Messenger\Retry\RetryStrategyInterface
|
||||
service: null
|
||||
|
||||
eventDispatcher:
|
||||
|
@ -55,7 +54,4 @@ DigiComp:
|
|||
dsn: "null://"
|
||||
|
||||
failureTransport: "discard"
|
||||
# TODO: Receivers and Senders? (As far as I can see not possible in Symfony)
|
||||
# receivers:[]
|
||||
# senders: []
|
||||
routing: []
|
||||
|
|
Loading…
Reference in a new issue