First working version of symfony/messenger bus

This commit is contained in:
Ferdinand Kuhl 2021-04-18 22:24:02 +02:00
parent ef5d1ed04c
commit 4773325ae8
26 changed files with 1367 additions and 0 deletions

8
.phpstorm.meta.php Normal file
View file

@ -0,0 +1,8 @@
<?php
/*
* This file configures dynamic return type support for factory methods in PhpStorm
*/
namespace PHPSTORM_META {
override(\Symfony\Component\Messenger\Envelope::last(), type(0));
}

View file

@ -0,0 +1,101 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger\Command;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\Cli\CommandController;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand;
use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand;
use Symfony\Component\Messenger\Command\FailedMessagesShowCommand;
class FailedCommandController extends CommandController
{
use RunSymfonyCommandTrait;
/**
* @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:ReceiversContainer")
* @var ContainerInterface
*/
protected $receiverContainer;
/**
* @Flow\InjectConfiguration
* @var array
*/
protected array $configuration;
/**
* Show one or more messages from the failure transport
*
* The <info>%command.name%</info> shows message that are pending in the failure transport.
*
* <info>php %command.full_name%</info>
*
* Or look at a specific message by its id:
*
* <info>php %command.full_name% {id}</info>
*
* Optional arguments are -q (quiet) -v[v[v]] (verbosity) and --force (do not ask)
*/
public function showCommand()
{
$command = new FailedMessagesShowCommand(
$this->configuration['failureTransport'],
$this->receiverContainer->get($this->configuration['failureTransport'])
);
$this->run($command);
}
/**
* Remove given messages from the failure transport
*
* The <info>%command.name%</info> removes given messages that are pending in the failure transport.
*
* <info>php %command.full_name% {id1} [{id2} ...]</info>
*
* The specific ids can be found via the messenger:failed:show command.
*
* Optional arguments are -q (quiet) -v[v[v]] (verbosity) and --force (do not ask)
*/
public function removeCommand()
{
$command = new FailedMessagesRemoveCommand(
$this->configuration['failureTransport'],
$this->receiverContainer->get($this->configuration['failureTransport'])
);
$this->run($command);
}
/**
* Retry one or more messages from the failure transport
*
* The command will interactively ask if each message should be retried
* or discarded.
*
* Some transports support retrying a specific message id, which comes
* from the <info>messenger:failed:show</info> command.
*
* <info>php %command.full_name% {id}</info>
*
* Or pass multiple ids at once to process multiple messages:
*
* <info>php %command.full_name% {id1} {id2} {id3}</info>
*
* Optional arguments are -q (quiet) -v[v[v]] (verbosity) and --force (do not ask)
*
* @noinspection PhpParamsInspection
*/
public function retryCommand()
{
$command = new FailedMessagesRetryCommand(
$this->configuration['failureTransport'],
$this->receiverContainer->get($this->configuration['failureTransport']),
$this->objectManager->get('DigiComp.FlowSymfonyBridge.Messenger:RoutableMessageBus'),
$this->objectManager->get('DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher'),
$this->objectManager->get(LoggerInterface::class)
);
$this->run($command);
}
}

View file

@ -0,0 +1,120 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger\Command;
use DigiComp\FlowSymfonyBridge\Messenger\EventListener\StopWorkerOnRestartSignalListener;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\Cli\CommandController;
use Neos\Flow\ObjectManagement\DependencyInjection\DependencyProxy;
use Psr\Cache\CacheItemPoolInterface;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
use Symfony\Component\Messenger\RoutableMessageBus;
class MessengerCommandController extends CommandController
{
use RunSymfonyCommandTrait;
/**
* @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:RoutableMessageBus")
* @var RoutableMessageBus
*/
protected $routableBus;
/**
* @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:ReceiversContainer")
* @var ContainerInterface
*/
protected $receiverContainer;
/**
* @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher")
* @var EventDispatcherInterface
*/
protected $eventDispatcher;
/**
* @Flow\Inject(lazy=false)
* @var LoggerInterface
*/
protected LoggerInterface $logger;
/**
* @Flow\InjectConfiguration
* @var array
*/
protected array $configuration;
/**
* @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:RestartSignalCachePool")
* @var CacheItemPoolInterface
*/
protected $restartSignalCachePool;
/**
* Consumes messages and dispatches them to the message bus
*
* To receive from multiple transports, pass each name:
* <info>worker:consume receiver1 receiver2</info>
*
* Options are:
* --limit limits the number of messages received
* --failure-limit stop the worker when the given number of failed messages is reached
* --memory-limit stop the worker if it exceeds a given memory usage limit. You can use shorthand
* byte values [K, M, or G]
* --time-limit stop the worker when the gien time limit (in seconds) is reached. If a message is beeing handled,
* the worker will stop after the processing is finished
* --bus specify the message bus to dispatch received messages to instead of trying to determine it automatically.
* This is required if the messages didn't originate from Messenger
*
* Optional arguments are -q (quiet) and -v[v[v]] (verbosity)
*/
public function consumeCommand()
{
if ($this->receiverContainer instanceof DependencyProxy) {
$this->receiverContainer->_activateDependency();
}
if ($this->eventDispatcher instanceof DependencyProxy) {
$this->eventDispatcher->_activateDependency();
}
$command = new ConsumeMessagesCommand(
$this->routableBus,
$this->receiverContainer,
$this->eventDispatcher,
$this->logger,
array_keys($this->configuration['transports'])
);
$this->run($command);
}
/**
* List all available receivers
*/
public function listReceiversCommand()
{
foreach (array_keys($this->configuration['transports']) as $transportName) {
$this->outputLine('- ' . $transportName);
}
}
/**
* Stop workers after their current message
*
* Each worker command will finish the message they are currently processing
* and then exit. Worker commands are *not* automatically restarted: that
* should be handled by a process control system.
*/
public function stopWorkersCommand()
{
$cacheItem = $this->restartSignalCachePool->getItem(
StopWorkerOnRestartSignalListener::RESTART_REQUESTED_TIMESTAMP_KEY
);
$cacheItem->set(microtime(true));
$this->restartSignalCachePool->save($cacheItem);
//TODO: Add the possibility to wait until all are exited
}
}

View file

@ -0,0 +1,89 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger\Command;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\ArgvInput;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
trait RunSymfonyCommandTrait
{
protected function run(Command $command)
{
$definition = $command->getDefinition();
$definition->setArguments(array_merge(
[new InputArgument('command', InputArgument::REQUIRED)],
$definition->getArguments()
));
$definition->setOptions(array_merge(
[
new InputOption('--verbose', '-v|vv|vvv', InputOption::VALUE_NONE, 'Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug'),
new InputOption('--quiet', '-q', InputOption::VALUE_NONE, 'Do not output any message'),
],
$definition->getOptions()
));
$input = new ArgvInput(null, $command->getDefinition());
$this->configureIO($input, $this->output->getOutput());
$command->run($input, $this->output->getOutput());
}
protected function configureIO($input, $output)
{
switch ($shellVerbosity = (int) getenv('SHELL_VERBOSITY')) {
case -1:
$output->setVerbosity(OutputInterface::VERBOSITY_QUIET);
break;
case 1:
$output->setVerbosity(OutputInterface::VERBOSITY_VERBOSE);
break;
case 2:
$output->setVerbosity(OutputInterface::VERBOSITY_VERY_VERBOSE);
break;
case 3:
$output->setVerbosity(OutputInterface::VERBOSITY_DEBUG);
break;
default:
$shellVerbosity = 0;
break;
}
if (true === $input->hasParameterOption(['--quiet', '-q'], true)) {
$output->setVerbosity(OutputInterface::VERBOSITY_QUIET);
$shellVerbosity = -1;
} else {
if (
$input->hasParameterOption('-vvv', true)
|| $input->hasParameterOption('--verbose=3', true)
|| 3 === $input->getParameterOption('--verbose', false, true)
) {
$output->setVerbosity(OutputInterface::VERBOSITY_DEBUG);
$shellVerbosity = 3;
} elseif (
$input->hasParameterOption('-vv', true)
|| $input->hasParameterOption('--verbose=2', true)
|| 2 === $input->getParameterOption('--verbose', false, true)
) {
$output->setVerbosity(OutputInterface::VERBOSITY_VERY_VERBOSE);
$shellVerbosity = 2;
} elseif (
$input->hasParameterOption('-v', true)
|| $input->hasParameterOption('--verbose=1', true)
|| $input->hasParameterOption('--verbose', true)
|| $input->getParameterOption('--verbose', false, true)
) {
$output->setVerbosity(OutputInterface::VERBOSITY_VERBOSE);
$shellVerbosity = 1;
}
}
if (-1 === $shellVerbosity) {
$input->setInteractive(false);
}
putenv('SHELL_VERBOSITY=' . $shellVerbosity);
$_ENV['SHELL_VERBOSITY'] = $shellVerbosity;
$_SERVER['SHELL_VERBOSITY'] = $shellVerbosity;
}
}

View file

@ -0,0 +1,76 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\ObjectManagement\ObjectManagerInterface;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
// TODO: Maybe an own package for EntityManager bridge?
class EventDispatcherFactory
{
/**
* @Flow\Inject
* @var ObjectManagerInterface
*/
protected $objectManager;
/**
* @Flow\InjectConfiguration
* @var array
*/
protected $configuration;
public function create()
{
$eventDispatcher = new EventDispatcher();
foreach ($this->configuration['eventDispatcher']['subscribers'] as $subscriberId => $enabled) {
if ($subscriberId === null || ! (bool) $enabled) {
continue;
}
$this->addLazySubscribers($eventDispatcher, $subscriberId);
}
return $eventDispatcher;
}
private function addLazySubscribers(EventDispatcherInterface $eventDispatcher, $subscriberId)
{
$subscriberClass = $this->objectManager->getClassNameByObjectName($subscriberId);
if (! is_a($subscriberClass, EventSubscriberInterface::class, true)) {
throw new \RuntimeException(
'Object with name ' . $subscriberId . ' is not an EventSubscriberInterface',
1618753949
);
}
foreach ($subscriberClass::getSubscribedEvents() as $eventName => $params) {
if (\is_string($params)) {
$callClosure = function (...$arguments) use ($subscriberId, $params) {
$subscriber = $this->objectManager->get($subscriberId);
$method = $params;
return $subscriber->$method(...$arguments);
};
$eventDispatcher->addListener($eventName, $callClosure);
} elseif (\is_string($params[0])) {
$callClosure = function (...$arguments) use ($subscriberId, $params) {
$subscriber = $this->objectManager->get($subscriberId);
$method = $params[0];
return $subscriber->$method(...$arguments);
};
$eventDispatcher->addListener($eventName, $callClosure, $params[1] ?? 0);
} else {
foreach ($params as $listener) {
$callClosure = function (...$arguments) use ($subscriberId, $listener) {
$subscriber = $this->objectManager->get($subscriberId);
$method = $listener[0];
return $subscriber->$method(...$arguments);
};
$eventDispatcher->addListener($eventName, $callClosure, $listener[1] ?? 0);
}
}
}
}
}

View file

@ -0,0 +1,70 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger\EventListener;
use Neos\Flow\Annotations as Flow;
use Psr\Cache\CacheItemPoolInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
// This is a 1 to one copy of the original event listener, with a modified RESTART_REQUESTED_TIMESTAMP_KEY to match
// the restriction of the cache ids in flow.
// Also the DI is simplified
/**
* @Flow\Scope("singleton")
*/
class StopWorkerOnRestartSignalListener implements EventSubscriberInterface
{
public const RESTART_REQUESTED_TIMESTAMP_KEY = 'workers_restart_requested_timestamp';
/**
* @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:RestartSignalCachePool")
* @var CacheItemPoolInterface
*/
protected $cachePool;
/**
* @Flow\Inject
* @var LoggerInterface
*/
protected $logger;
private $workerStartedAt;
public function onWorkerStarted(): void
{
$this->workerStartedAt = microtime(true);
}
public function onWorkerRunning(WorkerRunningEvent $event): void
{
if ($this->shouldRestart()) {
$event->getWorker()->stop();
if (null !== $this->logger) {
$this->logger->info('Worker stopped because a restart was requested.');
}
}
}
public static function getSubscribedEvents()
{
return [
WorkerStartedEvent::class => 'onWorkerStarted',
WorkerRunningEvent::class => 'onWorkerRunning',
];
}
private function shouldRestart(): bool
{
$cacheItem = $this->cachePool->getItem(self::RESTART_REQUESTED_TIMESTAMP_KEY);
if (!$cacheItem->isHit()) {
// no restart has ever been scheduled
return false;
}
return $this->workerStartedAt < $cacheItem->get();
}
}

View file

@ -0,0 +1,57 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\ObjectManagement\ObjectManagerInterface;
use Neos\Flow\Reflection\ReflectionService;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
class HandlersLocatorFactory
{
/**
* @Flow\InjectConfiguration
* @var array
*/
protected $configuration;
/**
* @Flow\Inject
* @var ObjectManagerInterface
*/
protected $objectManager;
/**
* @Flow\Inject
* @var ReflectionService
*/
protected $reflectionService;
public function create($busName = 'default')
{
$messageHandlerClasses = $this->reflectionService
->getAllImplementationClassNamesForInterface(MessageSubscriberInterface::class);
$handlerDescriptors = [];
foreach ($messageHandlerClasses as $messageHandlerClass) {
foreach ($messageHandlerClass::getHandledMessages() as $messageName => $config) {
if (! is_array($config)) {
throw new \InvalidArgumentException(
'different from doctrine, we (currently) need subscribers to always have an option array'
);
}
if (isset($config['bus']) && $config['bus'] !== $busName) {
continue;
}
$handlerDescriptors[$messageName][] = new HandlerDescriptor(
$this->objectManager->get($messageHandlerClass),
$config
);
}
}
// TODO: Maybe we can allow handlers to be added to bus or globally by configuration?
return new HandlersLocator($handlerDescriptors);
}
}

View file

@ -0,0 +1,45 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger;
use DigiComp\FlowSymfonyBridge\Messenger\ObjectManagement\RewindableGenerator;
use Neos\Flow\Annotations as Flow;
use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\MessageBus;
/**
* @Flow\Scope("singleton")
*/
class MessageBusContainer implements ContainerInterface
{
/**
* @Flow\InjectConfiguration(path="buses")
* @var array
*/
protected $configuration;
/**
* @var MessageBus[]
*/
protected array $buses = [];
/**
* @inheritDoc
*/
public function get(string $id)
{
if (! isset($this->buses[$id])) {
$middlewares = new RewindableGenerator($this->configuration[$id]['middleware']);
$this->buses[$id] = new MessageBus($middlewares);
}
return $this->buses[$id];
}
/**
* @inheritDoc
*/
public function has(string $id)
{
return isset($this->configuration[$id]);
}
}

View file

@ -0,0 +1,35 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger\ObjectManagement;
use Psr\Container\ContainerInterface;
class ChainedContainer implements ContainerInterface
{
private array $childContainers;
public function __construct(ContainerInterface ...$childContainers)
{
$this->childContainers = $childContainers;
}
public function get(string $id)
{
foreach ($this->childContainers as $childContainer) {
if ($childContainer->has($id)) {
return $childContainer->get($id);
}
}
throw new \InvalidArgumentException('Service id is unknown: ' . $id);
}
public function has(string $id)
{
foreach ($this->childContainers as $childContainer) {
if ($childContainer->has($id)) {
return true;
}
}
return false;
}
}

View file

@ -0,0 +1,60 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger\ObjectManagement;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\ObjectManagement\ObjectManagerInterface;
use Neos\Utility\PositionalArraySorter;
use Psr\Log\LoggerInterface;
/**
* Helper for dependency injection. It allows to defer object construction until the list is actually iterated.
*
* It filters out service ids which have been set to NULL to make deleting services possible, without overwriting the
* complete array, allowing to use string keys.
*/
class RewindableGenerator implements \IteratorAggregate, \Countable
{
/**
* @Flow\Inject
* @var ObjectManagerInterface
*/
protected $objectManager;
private array $serviceIds;
private \Closure $generator;
public function __construct(array $serviceIds)
{
$this->serviceIds = $serviceIds;
$sortedServiceIds = array_keys(
(new PositionalArraySorter($serviceIds))->toArray()
);
$this->generator = function () use ($sortedServiceIds) {
foreach ($sortedServiceIds as $serviceId) {
if ($serviceId === null) {
continue;
}
$object = $this->objectManager->get($serviceId);
// TODO: Thats a quite poor solution to dynamically inject the logger - but it is easy
if (method_exists($object, 'setLogger')) {
$object->setLogger($this->objectManager->get(LoggerInterface::class));
}
yield $object;
}
};
}
public function getIterator()
{
$g = $this->generator;
return $g();
}
public function count()
{
return count($this->serviceIds);
}
}

14
Classes/Package.php Normal file
View file

@ -0,0 +1,14 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger;
use Neos\Flow\Core\Bootstrap;
use Neos\Flow\Package\Package as BasePackage;
class Package extends BasePackage
{
public function boot(Bootstrap $bootstrap)
{
parent::boot($bootstrap);
}
}

View file

@ -0,0 +1,61 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\ObjectManagement\ObjectManagerInterface;
use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
/**
* @Flow\Scope("singleton")
*/
class RetryStrategiesContainer implements ContainerInterface
{
/**
* @Flow\InjectConfiguration
* @var array
*/
protected array $configuration;
/**
* @Flow\Inject
* @var ObjectManagerInterface
*/
protected $objectManager;
/**
* @var RetryStrategyInterface[]
*/
protected array $retryStrategies;
public function get(string $id)
{
if (! isset($this->configuration['transports'][$id])) {
throw new \InvalidArgumentException('Unknown transport name: ' . $id);
}
if (! isset($this->retryStrategies[$id])) {
$strategyDefinition = array_merge(
$this->configuration['defaultRetryStrategyOptions'],
$this->configuration['transports'][$id]['retryStrategy'] ?? []
);
if ($strategyDefinition['service']) {
$this->retryStrategies[$id] = $this->objectManager->get($strategyDefinition['service']);
} else {
$this->retryStrategies[$id] = new MultiplierRetryStrategy(
$strategyDefinition['maxRetries'],
$strategyDefinition['delay'],
$strategyDefinition['multiplier'],
$strategyDefinition['maxDelay']
);
}
}
return $this->retryStrategies[$id];
}
public function has(string $id)
{
return isset($this->configuration['transports'][$id]);
}
}

View file

@ -0,0 +1,61 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger\Transport;
use Doctrine\DBAL\Driver\AbstractPostgreSQLDriver;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Neos\Flow\Annotations as Flow;
/**
* @Flow\Scope("singleton")
*/
class FlowDoctrineTransportFactory implements TransportFactoryInterface
{
private EntityManagerInterface $entityManager;
public function __construct(EntityManagerInterface $entityManager)
{
$this->entityManager = $entityManager;
}
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{
$useNotify = ($options['use_notify'] ?? true);
unset($options['transport_name'], $options['use_notify']);
// Always allow PostgreSQL-specific keys, to be able to transparently fallback to the native driver
// when LISTEN/NOTIFY isn't available
$configuration = PostgreSqlConnection::buildConfiguration($dsn, $options);
try {
$driverConnection = $this->entityManager->getConnection();
} catch (\InvalidArgumentException $e) {
throw new TransportException(sprintf(
'Could not find Doctrine connection from Messenger DSN "%s".',
$dsn
), 0, $e);
}
if ($useNotify && $driverConnection->getDriver() instanceof AbstractPostgreSQLDriver) {
$connection = new PostgreSqlConnection($configuration, $driverConnection);
} else {
$connection = new Connection($configuration, $driverConnection);
}
return new DoctrineTransport($connection, $serializer);
}
/**
* @inheritDoc
*/
public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'flow-doctrine://');
}
}

View file

@ -0,0 +1,41 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger\Transport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\TransportInterface;
class NullTransport implements TransportInterface
{
/**
* @inheritDoc
*/
public function get(): iterable
{
return new \EmptyIterator();
}
/**
* @inheritDoc
*/
public function ack(Envelope $envelope): void
{
// do nothing
}
/**
* @inheritDoc
*/
public function reject(Envelope $envelope): void
{
// do nothing
}
/**
* @inheritDoc
*/
public function send(Envelope $envelope): Envelope
{
return $envelope;
}
}

View file

@ -0,0 +1,26 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger\Transport;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
class NullTransportFactory implements TransportFactoryInterface
{
/**
* @inheritDoc
*/
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{
return new NullTransport();
}
/**
* @inheritDoc
*/
public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'null://');
}
}

View file

@ -0,0 +1,76 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger\Transport;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\ObjectManagement\ObjectManagerInterface;
use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
/**
* @Flow\Scope("singleton")
*/
class TransportsContainer implements ContainerInterface
{
/**
* @Flow\InjectConfiguration
* @var array
*/
protected array $configuration;
/**
* @Flow\Inject
* @var ObjectManagerInterface
*/
protected $objectManager;
/**
* @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:TransportFactory")
* @var TransportFactoryInterface
*/
protected $transportFactory;
/**
* @var TransportInterface[]
*/
protected array $transports;
public function get(string $id)
{
if (! isset($this->configuration['transports'][$id])) {
throw new \InvalidArgumentException('Unknown transport name: ' . $id);
}
if (! isset($this->transports[$id])) {
$transportDefinition = array_merge([
'dsn' => '',
'options' => [],
'serializer' => $this->configuration['defaultSerializerName'],
# TODO: Probably this has to be setup elsewhere, as the transport does not care by itself
'retry_strategy' => [ # TODO: Make the default configurable
'max_retries' => 3,
# milliseconds delay
'delay' => 1000,
# causes the delay to be higher before each retry
# e.g. 1 second delay, 2 seconds, 4 seconds
'multiplier' => 2,
'max_delay' => 0,
# override all of this with a service that
# implements Symfony\Component\Messenger\Retry\RetryStrategyInterface
'service' => null
]
], $this->configuration['transports'][$id]);
$this->transports[$id] = $this->transportFactory->createTransport(
$transportDefinition['dsn'],
$transportDefinition['options'],
$this->objectManager->get($transportDefinition['serializer'])
);
}
return $this->transports[$id];
}
public function has(string $id)
{
return isset($this->configuration['transports'][$id]);
}
}

View file

@ -0,0 +1,3 @@
DigiComp_FlowSymfony_Bridge_Messenger_RestartSignal:
frontend: 'Neos\Cache\Frontend\VariableFrontend'
backend: 'Neos\Cache\Backend\FileBackend'

142
Configuration/Objects.yaml Normal file
View file

@ -0,0 +1,142 @@
DigiComp.FlowSymfonyBridge.Messenger:RoutableMessageBus:
className: 'Symfony\Component\Messenger\RoutableMessageBus'
arguments:
1:
object: 'DigiComp\FlowSymfonyBridge\Messenger\MessageBusContainer'
2:
object: 'Symfony\Component\Messenger\MessageBusInterface'
Symfony\Component\Messenger\MessageBusInterface:
className: 'Symfony\Component\Messenger\MessageBus'
factoryObjectName: 'DigiComp\FlowSymfonyBridge\Messenger\MessageBusContainer'
factoryMethodName: 'get'
arguments:
1:
setting: 'DigiComp.FlowSymfonyBridge.Messenger.defaultBusName'
DigiComp.FlowSymfonyBridge.Messenger:DefaultBusHandlersLocator:
className: 'Symfony\Component\Messenger\Handler\HandlersLocator'
factoryObjectName: 'DigiComp\FlowSymfonyBridge\Messenger\HandlersLocatorFactory'
factoryMethodName: 'create'
arguments:
1:
setting: 'DigiComp.FlowSymfonyBridge.Messenger.defaultBusName'
DigiComp.FlowSymfonyBridge.Messenger:DefaultSendersLocator:
className: 'Symfony\Component\Messenger\Transport\Sender\SendersLocator'
arguments:
1:
# TODO: This would be the position were routes with bus specific routes could be merged
setting: 'DigiComp.FlowSymfonyBridge.Messenger.routing'
2:
object: 'DigiComp\FlowSymfonyBridge\Messenger\Transport\TransportsContainer'
DigiComp.FlowSymfonyBridge.Messenger:DefaultAddBusNameStampMiddleware:
className: 'Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware'
arguments:
1:
setting: 'DigiComp.FlowSymfonyBridge.Messenger.defaultBusName'
DigiComp.FlowSymfonyBridge.Messenger:DefaultHandleMessageMiddleware:
className: 'Symfony\Component\Messenger\Middleware\HandleMessageMiddleware'
arguments:
1:
object: 'DigiComp.FlowSymfonyBridge.Messenger:DefaultBusHandlersLocator'
DigiComp.FlowSymfonyBridge.Messenger:DefaultSendMessageMiddleware:
className: 'Symfony\Component\Messenger\Middleware\SendMessageMiddleware'
arguments:
1:
object: 'DigiComp.FlowSymfonyBridge.Messenger:DefaultSendersLocator'
2:
object: 'DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher'
DigiComp.FlowSymfonyBridge.Messenger:TransportFactory:
className: 'Symfony\Component\Messenger\Transport\TransportFactory'
scope: 'singleton'
arguments:
1:
object: 'DigiComp.FlowSymfonyBridge.Messenger:DefaultTransportFactories'
DigiComp.FlowSymfonyBridge.Messenger:DefaultTransportFactories:
className: 'DigiComp\FlowSymfonyBridge\Messenger\ObjectManagement\RewindableGenerator'
arguments:
1:
setting: 'DigiComp.FlowSymfonyBridge.Messenger.transportFactories'
DigiComp.FlowSymfonyBridge.Messenger:SyncTransportFactory:
className: 'Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory'
arguments:
1:
object: 'DigiComp.FlowSymfonyBridge.Messenger:RoutableMessageBus'
DigiComp.FlowSymfonyBridge.Messenger:DefaultSerializer:
className: 'Symfony\Component\Messenger\Transport\Serialization\PhpSerializer'
DigiComp.FlowSymfonyBridge.Messenger:SendersContainer:
className: 'DigiComp\FlowSymfonyBridge\Messenger\ObjectManagement\ChainedContainer'
scope: 'singleton'
arguments:
1:
object: 'DigiComp\FlowSymfonyBridge\Messenger\Transport\TransportsContainer'
# TODO: add own senders here, which are no transports
DigiComp.FlowSymfonyBridge.Messenger:ReceiversContainer:
className: 'DigiComp\FlowSymfonyBridge\Messenger\ObjectManagement\ChainedContainer'
scope: 'singleton'
arguments:
1:
object: 'DigiComp\FlowSymfonyBridge\Messenger\Transport\TransportsContainer'
# TODO: add own receivers here, which are no transports
DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher:
className: 'Symfony\Component\EventDispatcher\EventDispatcher'
scope: 'singleton'
factoryObjectName: 'DigiComp\FlowSymfonyBridge\Messenger\EventDispatcherFactory'
factoryMethodName: 'create'
DigiComp.FlowSymfonyBridge.Messenger:RestartSignalCache:
className: 'Neos\Cache\Frontend\FrontendInterface'
factoryObjectName: 'Neos\Flow\Cache\CacheManager'
factoryMethodName: 'getCache'
arguments:
1:
value: 'DigiComp_FlowSymfony_Bridge_Messenger_RestartSignal'
DigiComp.FlowSymfonyBridge.Messenger:RestartSignalCachePool:
className: 'Neos\Cache\Psr\Cache\CachePool'
scope: 'singleton'
arguments:
1:
value: 'DigiComp_FlowSymfony_Bridge_Messenger_RestartSignal'
2:
object:
factoryObjectName: 'DigiComp.FlowSymfonyBridge.Messenger:RestartSignalCache'
factoryMethodName: 'getBackend'
Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener:
scope: 'singleton'
arguments:
1:
object:
name: 'DigiComp.FlowSymfonyBridge.Messenger:SendersContainer'
2:
object: 'DigiComp\FlowSymfonyBridge\Messenger\RetryStrategiesContainer'
3:
object: 'Psr\Log\LoggerInterface'
4:
object:
name: 'DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher'
Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener:
scope: 'singleton'
arguments:
1:
object:
factoryObjectName: 'DigiComp.FlowSymfonyBridge.Messenger:SendersContainer'
factoryMethodName: 'get'
arguments:
1:
setting: 'DigiComp.FlowSymfonyBridge.Messenger.failureTransport'
2:
object: 'Psr\Log\LoggerInterface'

View file

@ -0,0 +1,59 @@
Neos:
Flow:
object:
includeClasses:
symfony.messenger:
- "Symfony\\\\Component\\\\Messenger\\\\EventListener\\\\.*"
DigiComp:
FlowSymfonyBridge:
Messenger:
defaultBusName: "default"
defaultSerializerName: "DigiComp.FlowSymfonyBridge.Messenger:DefaultSerializer"
# TODO: use this
defaultRetryStrategyOptions:
maxRetries: 3
# milliseconds delay
delay: 1000
# causes the delay to be higher before each retry
# e.g. 1 second delay, 2 seconds, 4 seconds
multiplier: 2
maxDelay: 0
# override all of this with a service that
# implements Symfony\Component\Messenger\Retry\RetryStrategyInterface
service: null
eventDispatcher:
subscribers:
Symfony\Component\Messenger\EventListener\AddErrorDetailsStampListener: true
Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener: true
Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener: true
DigiComp\FlowSymfonyBridge\Messenger\EventListener\StopWorkerOnRestartSignalListener: true
buses:
default:
middleware:
DigiComp.FlowSymfonyBridge.Messenger:DefaultAddBusNameStampMiddleware:
position: "start"
Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware: true
Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware: true
Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware: true
DigiComp.FlowSymfonyBridge.Messenger:DefaultSendMessageMiddleware:
position: "end"
DigiComp.FlowSymfonyBridge.Messenger:DefaultHandleMessageMiddleware:
position: "end"
transportFactories:
DigiComp\FlowSymfonyBridge\Messenger\Transport\NullTransportFactory: true
DigiComp.FlowSymfonyBridge.Messenger:SyncTransportFactory: true
Symfony\Component\Messenger\Transport\InMemoryTransportFactory: true
DigiComp\FlowSymfonyBridge\Messenger\Transport\FlowDoctrineTransportFactory: true
transports:
discard:
dsn: "null://"
failureTransport: "discard"
# TODO: Receivers and Senders? (As far as I can see not possible in Symfony)
# receivers:[]
# senders: []
routing: []

View file

@ -0,0 +1,36 @@
DigiComp:
FlowSymfonyBridge:
Messenger:
transports:
"test-in-memory-2":
dsn: "in-memory://"
"test-in-memory-1":
dsn: "in-memory://"
"test-doctrine":
dsn: "flow-doctrine://default?table_name=test_messenger_messages"
"test-sync":
dsn: "sync://"
"test-retry-doctrine":
dsn: "flow-doctrine://default?table_name=test_messenger_messages&queue_name=retry"
retryStrategy:
maxRetries: 1
# milliseconds delay
delay: 50
# causes the delay to be higher before each retry
# e.g. 1 second delay, 2 seconds, 4 seconds
multiplier: 2
maxDelay: 0
# override all of this with a service that
# implements Symfony\Component\Messenger\Retry\RetryStrategyInterface
# service: null
"test-failed-doctrine":
dsn: "flow-doctrine://default?table_name=test_messenger_messages&queue_name=failed"
failureTransport: "test-failed-doctrine"
routing:
DigiComp\FlowSymfonyBridge\Messenger\Tests\Functional\Fixtures\Message\TestMessage:
- "test-in-memory-1"
- "test-in-memory-2"
- "test-doctrine"
- "test-sync"
DigiComp\FlowSymfonyBridge\Messenger\Tests\Functional\Fixtures\Message\FailingMessage:
- "test-retry-doctrine"

View file

@ -0,0 +1,92 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger\Tests\Functional;
use DigiComp\FlowSymfonyBridge\Messenger\Tests\Functional\Fixtures\Message\FailingMessage;
use DigiComp\FlowSymfonyBridge\Messenger\Tests\Functional\Fixtures\Message\TestMessage;
use DigiComp\FlowSymfonyBridge\Messenger\Transport\TransportsContainer;
use Neos\Flow\Tests\FunctionalTestCase;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\InMemoryTransport;
use Symfony\Component\Messenger\Worker;
class BusTest extends FunctionalTestCase
{
/**
* @test
*/
public function itSendsAsyncMessage()
{
$messageBus = $this->objectManager->get(MessageBusInterface::class);
$messageBus->dispatch(new TestMessage('Hallo Welt!'));
$sendersContainer = $this->objectManager->get(TransportsContainer::class);
/* @var InMemoryTransport $transport1 */
$transport1 = $sendersContainer->get('test-in-memory-1');
/* @var InMemoryTransport $transport2 */
$transport2 = $sendersContainer->get('test-in-memory-2');
/* @var DoctrineTransport $transport3 */
$transport3 = $sendersContainer->get('test-doctrine');
$this->assertInstanceOf(InMemoryTransport::class, $transport1);
$this->assertInstanceOf(InMemoryTransport::class, $transport2);
$this->assertInstanceOf(DoctrineTransport::class, $transport3);
$this->assertCount(1, $transport1->getSent());
$this->assertCount(1, $transport2->getSent());
$this->assertCount(1, $transport3->all());
$this->assertCount(0, $transport1->getAcknowledged());
$this->assertCount(0, $transport2->getAcknowledged());
$this->assertCount(1, $transport3->all());
$eventDispatcher = $this->objectManager->get('DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher');
$eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
foreach (['test-in-memory-1', 'test-in-memory-2', 'test-doctrine'] as $transportId) {
$worker = new Worker([$transportId => $sendersContainer->get($transportId)], $messageBus, $eventDispatcher);
$worker->run();
}
// TODO: Check for success on all workers - doctrine does not seem to get executed
$this->assertCount(1, $transport1->getAcknowledged());
$this->assertCount(1, $transport2->getAcknowledged());
$this->assertCount(0, $transport3->all());
}
/**
* @test
*/
public function itRetriesFailingMessages()
{
$messageBus = $this->objectManager->get(MessageBusInterface::class);
$messageBus->dispatch(new FailingMessage());
$sendersContainer = $this->objectManager->get(TransportsContainer::class);
/* @var DoctrineTransport $transport1 */
$transport1 = $sendersContainer->get('test-retry-doctrine');
/* @var DoctrineTransport $failedTransport */
$failedTransport = $sendersContainer->get('test-failed-doctrine');
$this->assertCount(1, $transport1->all());
$this->assertCount(0, $failedTransport->all());
$eventDispatcher = $this->objectManager->get('DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher');
$eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
$worker = new Worker(
['test-retry-doctrine' => $sendersContainer->get('test-retry-doctrine')],
$messageBus,
$eventDispatcher
);
$worker->run();
$this->assertCount(1, $transport1->all());
$this->assertCount(0, $failedTransport->all());
$worker = new Worker(
['test-retry-doctrine' => $sendersContainer->get('test-retry-doctrine')],
$messageBus,
$eventDispatcher
);
$worker->run();
$this->assertCount(0, $transport1->all());
$this->assertCount(1, $failedTransport->all());
}
}

View file

@ -0,0 +1,7 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger\Tests\Functional\Fixtures\Message;
class FailingMessage
{
}

View file

@ -0,0 +1,18 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger\Tests\Functional\Fixtures\Message;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
class FailingTestMessageHandler implements MessageSubscriberInterface
{
public static function getHandledMessages(): iterable
{
yield FailingMessage::class => [];
}
public function __invoke(FailingMessage $message)
{
throw new \Exception('bang!');
}
}

View file

@ -0,0 +1,24 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger\Tests\Functional\Fixtures\Message;
class TestMessage
{
protected string $message;
/**
* @param string $message
*/
public function __construct(string $message)
{
$this->message = $message;
}
/**
* @return string
*/
public function getMessage(): string
{
return $this->message;
}
}

View file

@ -0,0 +1,18 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger\Tests\Functional\Fixtures\Message;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
class TestMessageHandler implements MessageSubscriberInterface
{
public static function getHandledMessages(): iterable
{
yield TestMessage::class => [];
}
public function __invoke(TestMessage $message)
{
//do nothing for now
}
}

28
composer.json Normal file
View file

@ -0,0 +1,28 @@
{
"name": "digicomp/flow-symfony-bridge-messenger",
"type": "neos-package",
"description": "Flow dependency injection bridge to symfony/messenger",
"require": {
"neos/flow": "^6.3",
"symfony/doctrine-messenger": "^5.2.5",
"symfony/event-dispatcher": "^4.2 | ^5.2"
},
"autoload": {
"psr-4": {
"DigiComp\\FlowSymfonyBridge\\Messenger\\": "Classes/"
}
},
"autoload-dev": {
"psr-4": {
"DigiComp\\FlowSymfonyBridge\\Messenger\\Tests\\": "Tests/"
}
},
"extra": {
"branch-alias": {
"dev-master": "0.0.x-dev"
},
"neos": {
"package-key": "DigiComp.FlowSymfonyBridge.Messenger"
}
}
}