Compare commits

..

No commits in common. "main" and "0.0.1" have entirely different histories.
main ... 0.0.1

22 changed files with 197 additions and 260 deletions

View file

@ -1,8 +0,0 @@
pipeline:
code-style:
image: composer
commands:
- composer global config repositories.repo-name vcs https://git.digital-competence.de/Packages/php-codesniffer
- composer global config --no-plugins allow-plugins.dealerdirect/phpcodesniffer-composer-installer true
- composer global require digicomp/php-codesniffer:@dev
- composer global exec -- phpcs --runtime-set ignore_warnings_on_exit 1 --standard=DigiComp Classes/ Tests/

View file

@ -1,27 +0,0 @@
workspace:
base: /woodpecker
path: package
matrix:
include:
- FLOW_VERSION: 8.2
PHP_VERSION: 8.1
pipeline:
functional-tests:
image: "thecodingmachine/php:${PHP_VERSION}-v4-cli"
environment:
# Enable the PDO_SQLITE extension
- "PHP_EXTENSION_PDO_SQLITE=1"
- "FLOW_VERSION=${FLOW_VERSION}"
- "NEOS_BUILD_DIR=/woodpecker/Build-${FLOW_VERSION}"
commands:
- "sudo mkdir $NEOS_BUILD_DIR"
- "sudo chown -R docker:docker $NEOS_BUILD_DIR"
- "cd $NEOS_BUILD_DIR"
- "composer create-project --no-install neos/flow-base-distribution:^$FLOW_VERSION ."
- "composer config repositories.repo-name path /woodpecker/package"
- "composer config --no-plugins allow-plugins.neos/composer-plugin true"
- "composer remove --dev --no-update neos/behat || composer remove --no-update neos/behat"
- "composer require digicomp/flow-symfony-bridge-messenger:@dev"
- "bin/phpunit --configuration Build/BuildEssentials/PhpUnit/FunctionalTests.xml Packages/Application/DigiComp.FlowSymfonyBridge.Messenger/Tests/Functional"

View file

@ -14,10 +14,16 @@ class FailedCommandController extends CommandController
{ {
use RunSymfonyCommandTrait; use RunSymfonyCommandTrait;
#[Flow\Inject(name: 'DigiComp.FlowSymfonyBridge.Messenger:ReceiversContainer')] /**
protected ContainerInterface $receiverContainer; * @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:ReceiversContainer")
* @var ContainerInterface
*/
protected $receiverContainer;
#[Flow\InjectConfiguration] /**
* @Flow\InjectConfiguration
* @var array
*/
protected array $configuration; protected array $configuration;
/** /**
@ -33,7 +39,7 @@ class FailedCommandController extends CommandController
* *
* Optional arguments are -q (quiet) -v[v[v]] (verbosity) and --force (do not ask) * Optional arguments are -q (quiet) -v[v[v]] (verbosity) and --force (do not ask)
*/ */
public function showCommand(): void public function showCommand()
{ {
$command = new FailedMessagesShowCommand( $command = new FailedMessagesShowCommand(
$this->configuration['failureTransport'], $this->configuration['failureTransport'],
@ -53,7 +59,7 @@ class FailedCommandController extends CommandController
* *
* Optional arguments are -q (quiet) -v[v[v]] (verbosity) and --force (do not ask) * Optional arguments are -q (quiet) -v[v[v]] (verbosity) and --force (do not ask)
*/ */
public function removeCommand(): void public function removeCommand()
{ {
$command = new FailedMessagesRemoveCommand( $command = new FailedMessagesRemoveCommand(
$this->configuration['failureTransport'], $this->configuration['failureTransport'],
@ -81,7 +87,7 @@ class FailedCommandController extends CommandController
* *
* @noinspection PhpParamsInspection * @noinspection PhpParamsInspection
*/ */
public function retryCommand(): void public function retryCommand()
{ {
$command = new FailedMessagesRetryCommand( $command = new FailedMessagesRetryCommand(
$this->configuration['failureTransport'], $this->configuration['failureTransport'],

View file

@ -17,29 +17,47 @@ class MessengerCommandController extends CommandController
{ {
use RunSymfonyCommandTrait; use RunSymfonyCommandTrait;
#[Flow\Inject(name: 'DigiComp.FlowSymfonyBridge.Messenger:RoutableMessageBus')] /**
protected RoutableMessageBus $routableBus; * @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:RoutableMessageBus")
* @var RoutableMessageBus
*/
protected $routableBus;
#[Flow\Inject(name: 'DigiComp.FlowSymfonyBridge.Messenger:ReceiversContainer')] /**
protected ContainerInterface $receiverContainer; * @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:ReceiversContainer")
* @var ContainerInterface
*/
protected $receiverContainer;
#[Flow\Inject(name: 'DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher')] /**
protected EventDispatcherInterface $eventDispatcher; * @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher")
* @var EventDispatcherInterface
*/
protected $eventDispatcher;
#[Flow\Inject] /**
* @Flow\Inject(lazy=false)
* @var LoggerInterface
*/
protected LoggerInterface $logger; protected LoggerInterface $logger;
#[Flow\InjectConfiguration] /**
* @Flow\InjectConfiguration
* @var array
*/
protected array $configuration; protected array $configuration;
#[Flow\Inject(name: 'DigiComp.FlowSymfonyBridge.Messenger:RestartSignalCachePool')] /**
protected CacheItemPoolInterface $restartSignalCachePool; * @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:RestartSignalCachePool")
* @var CacheItemPoolInterface
*/
protected $restartSignalCachePool;
/** /**
* Consumes messages and dispatches them to the message bus * Consumes messages and dispatches them to the message bus
* *
* To receive from multiple transports, pass each name: * To receive from multiple transports, pass each name:
* <info>messenger:consume receiver1 receiver2</info> * <info>worker:consume receiver1 receiver2</info>
* *
* Options are: * Options are:
* --limit limits the number of messages received * --limit limits the number of messages received
@ -53,7 +71,7 @@ class MessengerCommandController extends CommandController
* *
* Optional arguments are -q (quiet) and -v[v[v]] (verbosity) * Optional arguments are -q (quiet) and -v[v[v]] (verbosity)
*/ */
public function consumeCommand(): void public function consumeCommand()
{ {
if ($this->receiverContainer instanceof DependencyProxy) { if ($this->receiverContainer instanceof DependencyProxy) {
$this->receiverContainer->_activateDependency(); $this->receiverContainer->_activateDependency();
@ -74,7 +92,7 @@ class MessengerCommandController extends CommandController
/** /**
* List all available receivers * List all available receivers
*/ */
public function listReceiversCommand(): void public function listReceiversCommand()
{ {
foreach (\array_keys($this->configuration['transports']) as $transportName) { foreach (\array_keys($this->configuration['transports']) as $transportName) {
$this->outputLine('- ' . $transportName); $this->outputLine('- ' . $transportName);
@ -88,7 +106,7 @@ class MessengerCommandController extends CommandController
* and then exit. Worker commands are *not* automatically restarted: that * and then exit. Worker commands are *not* automatically restarted: that
* should be handled by a process control system. * should be handled by a process control system.
*/ */
public function stopWorkersCommand(): void public function stopWorkersCommand()
{ {
$cacheItem = $this->restartSignalCachePool->getItem( $cacheItem = $this->restartSignalCachePool->getItem(
StopWorkerOnRestartSignalListener::RESTART_REQUESTED_TIMESTAMP_KEY StopWorkerOnRestartSignalListener::RESTART_REQUESTED_TIMESTAMP_KEY

View file

@ -10,7 +10,7 @@ use Symfony\Component\Console\Output\OutputInterface;
trait RunSymfonyCommandTrait trait RunSymfonyCommandTrait
{ {
protected function run(Command $command): void protected function run(Command $command)
{ {
$definition = $command->getDefinition(); $definition = $command->getDefinition();
$definition->setArguments(\array_merge( $definition->setArguments(\array_merge(
@ -29,7 +29,7 @@ trait RunSymfonyCommandTrait
$command->run($input, $this->output->getOutput()); $command->run($input, $this->output->getOutput());
} }
protected function configureIO($input, $output): void protected function configureIO($input, $output)
{ {
switch ($shellVerbosity = (int)\getenv('SHELL_VERBOSITY')) { switch ($shellVerbosity = (int)\getenv('SHELL_VERBOSITY')) {
case -1: case -1:

View file

@ -11,13 +11,19 @@ use Symfony\Component\EventDispatcher\EventSubscriberInterface;
// TODO: Maybe an own package for EntityManager bridge? // TODO: Maybe an own package for EntityManager bridge?
class EventDispatcherFactory class EventDispatcherFactory
{ {
#[Flow\Inject(lazy: false)] /**
protected ObjectManagerInterface $objectManager; * @Flow\Inject
* @var ObjectManagerInterface
*/
protected $objectManager;
#[Flow\InjectConfiguration] /**
protected array $configuration; * @Flow\InjectConfiguration
* @var array
*/
protected $configuration;
public function create(): EventDispatcher public function create()
{ {
$eventDispatcher = new EventDispatcher(); $eventDispatcher = new EventDispatcher();
@ -30,7 +36,7 @@ class EventDispatcherFactory
return $eventDispatcher; return $eventDispatcher;
} }
private function addLazySubscribers(EventDispatcherInterface $eventDispatcher, $subscriberId): void private function addLazySubscribers(EventDispatcherInterface $eventDispatcher, $subscriberId)
{ {
$subscriberClass = $this->objectManager->getClassNameByObjectName($subscriberId); $subscriberClass = $this->objectManager->getClassNameByObjectName($subscriberId);
if (! \is_a($subscriberClass, EventSubscriberInterface::class, true)) { if (! \is_a($subscriberClass, EventSubscriberInterface::class, true)) {

View file

@ -11,19 +11,27 @@ use Symfony\Component\Messenger\Event\WorkerStartedEvent;
// This is a 1 to one copy of the original event listener, with a modified RESTART_REQUESTED_TIMESTAMP_KEY to match // This is a 1 to one copy of the original event listener, with a modified RESTART_REQUESTED_TIMESTAMP_KEY to match
// the restriction of the cache ids in flow. // the restriction of the cache ids in flow.
// Additionally, the DI is simplified // Also the DI is simplified
#[Flow\Scope('singleton')] /**
* @Flow\Scope("singleton")
*/
class StopWorkerOnRestartSignalListener implements EventSubscriberInterface class StopWorkerOnRestartSignalListener implements EventSubscriberInterface
{ {
public const RESTART_REQUESTED_TIMESTAMP_KEY = 'workers_restart_requested_timestamp'; public const RESTART_REQUESTED_TIMESTAMP_KEY = 'workers_restart_requested_timestamp';
#[Flow\Inject(name: 'DigiComp.FlowSymfonyBridge.Messenger:RestartSignalCachePool', lazy: false)] /**
protected CacheItemPoolInterface $cachePool; * @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:RestartSignalCachePool")
* @var CacheItemPoolInterface
*/
protected $cachePool;
#[Flow\Inject(lazy: false)] /**
protected LoggerInterface $logger; * @Flow\Inject
private float $workerStartedAt; * @var LoggerInterface
*/
protected $logger;
private $workerStartedAt;
public function onWorkerStarted(): void public function onWorkerStarted(): void
{ {
@ -34,11 +42,13 @@ class StopWorkerOnRestartSignalListener implements EventSubscriberInterface
{ {
if ($this->shouldRestart()) { if ($this->shouldRestart()) {
$event->getWorker()->stop(); $event->getWorker()->stop();
$this->logger->info('Worker stopped because a restart was requested.'); if (null !== $this->logger) {
$this->logger->info('Worker stopped because a restart was requested.');
}
} }
} }
public static function getSubscribedEvents(): array public static function getSubscribedEvents()
{ {
return [ return [
WorkerStartedEvent::class => 'onWorkerStarted', WorkerStartedEvent::class => 'onWorkerStarted',

View file

@ -5,23 +5,31 @@ namespace DigiComp\FlowSymfonyBridge\Messenger;
use Neos\Flow\Annotations as Flow; use Neos\Flow\Annotations as Flow;
use Neos\Flow\ObjectManagement\ObjectManagerInterface; use Neos\Flow\ObjectManagement\ObjectManagerInterface;
use Neos\Flow\Reflection\ReflectionService; use Neos\Flow\Reflection\ReflectionService;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Handler\HandlerDescriptor; use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator; use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\Handler\MessageSubscriberInterface; use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
class HandlersLocatorFactory class HandlersLocatorFactory
{ {
#[Flow\InjectConfiguration] /**
protected array $configuration; * @Flow\InjectConfiguration
* @var array
*/
protected $configuration;
#[Flow\Inject(lazy: false)] /**
protected ObjectManagerInterface $objectManager; * @Flow\Inject
* @var ObjectManagerInterface
*/
protected $objectManager;
#[Flow\Inject(lazy: false)] /**
protected ReflectionService $reflectionService; * @Flow\Inject
* @var ReflectionService
*/
protected $reflectionService;
public function create($busName = 'default'): HandlersLocator public function create($busName = 'default')
{ {
$messageHandlerClasses = $this->reflectionService $messageHandlerClasses = $this->reflectionService
->getAllImplementationClassNamesForInterface(MessageSubscriberInterface::class); ->getAllImplementationClassNamesForInterface(MessageSubscriberInterface::class);
@ -42,30 +50,6 @@ class HandlersLocatorFactory
); );
} }
} }
$asHandlerClasses = $this->reflectionService
->getClassNamesByAnnotation(AsMessageHandler::class);
foreach ($asHandlerClasses as $asHandlerClass) {
/** @var AsMessageHandler[] $annotations */
$annotations = $this->reflectionService->getClassAnnotations($asHandlerClass, AsMessageHandler::class);
foreach ($annotations as $annotation) {
$config['from_transport'] = $annotation->fromTransport;
$config['priority'] = $annotation->priority;
$method = $annotation->method ?? '__invoke';
$messageName = $annotation->handles;
if ($messageName === null) {
$arguments = $this->reflectionService->getMethodParameters($asHandlerClass, $method);
$messageName = $arguments[\array_key_first($arguments)]['class'];
}
if ($annotation->bus !== null && $annotation->bus !== $busName) {
continue;
}
$handler = $this->objectManager->get($asHandlerClass);
$handlerDescriptors[$messageName][] = new HandlerDescriptor(
$this->objectManager->get($asHandlerClass),
$config
);
}
}
// TODO: Maybe we can allow handlers to be added to bus or globally by configuration? // TODO: Maybe we can allow handlers to be added to bus or globally by configuration?
return new HandlersLocator($handlerDescriptors); return new HandlersLocator($handlerDescriptors);

View file

@ -7,17 +7,25 @@ use Neos\Flow\Annotations as Flow;
use Psr\Container\ContainerInterface; use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\MessageBus; use Symfony\Component\Messenger\MessageBus;
#[Flow\Scope('singleton')] /**
* @Flow\Scope("singleton")
*/
class MessageBusContainer implements ContainerInterface class MessageBusContainer implements ContainerInterface
{ {
#[Flow\InjectConfiguration(path: 'buses')] /**
protected array $configuration; * @Flow\InjectConfiguration(path="buses")
* @var array
*/
protected $configuration;
/** /**
* @var MessageBus[] * @var MessageBus[]
*/ */
protected array $buses = []; protected array $buses = [];
/**
* @inheritDoc
*/
public function get(string $id) public function get(string $id)
{ {
if (! isset($this->buses[$id])) { if (! isset($this->buses[$id])) {
@ -27,7 +35,10 @@ class MessageBusContainer implements ContainerInterface
return $this->buses[$id]; return $this->buses[$id];
} }
public function has(string $id): bool /**
* @inheritDoc
*/
public function has(string $id)
{ {
return isset($this->configuration[$id]); return isset($this->configuration[$id]);
} }

View file

@ -23,7 +23,7 @@ class ChainedContainer implements ContainerInterface
throw new \InvalidArgumentException('Service id is unknown: ' . $id); throw new \InvalidArgumentException('Service id is unknown: ' . $id);
} }
public function has(string $id): bool public function has(string $id)
{ {
foreach ($this->childContainers as $childContainer) { foreach ($this->childContainers as $childContainer) {
if ($childContainer->has($id)) { if ($childContainer->has($id)) {

View file

@ -15,8 +15,11 @@ use Psr\Log\LoggerInterface;
*/ */
class RewindableGenerator implements \IteratorAggregate, \Countable class RewindableGenerator implements \IteratorAggregate, \Countable
{ {
#[Flow\Inject(lazy: false)] /**
protected ObjectManagerInterface $objectManager; * @Flow\Inject
* @var ObjectManagerInterface
*/
protected $objectManager;
private array $serviceIds; private array $serviceIds;
@ -43,14 +46,14 @@ class RewindableGenerator implements \IteratorAggregate, \Countable
}; };
} }
public function getIterator(): \Traversable public function getIterator()
{ {
$g = $this->generator; $g = $this->generator;
return $g(); return $g();
} }
public function count(): int public function count()
{ {
return \count($this->serviceIds); return \count($this->serviceIds);
} }

View file

@ -8,14 +8,22 @@ use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy; use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface; use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
#[Flow\Scope('singleton')] /**
* @Flow\Scope("singleton")
*/
class RetryStrategiesContainer implements ContainerInterface class RetryStrategiesContainer implements ContainerInterface
{ {
#[Flow\InjectConfiguration] /**
* @Flow\InjectConfiguration
* @var array
*/
protected array $configuration; protected array $configuration;
#[Flow\Inject] /**
protected ObjectManagerInterface $objectManager; * @Flow\Inject
* @var ObjectManagerInterface
*/
protected $objectManager;
/** /**
* @var RetryStrategyInterface[] * @var RetryStrategyInterface[]
@ -46,7 +54,7 @@ class RetryStrategiesContainer implements ContainerInterface
return $this->retryStrategies[$id]; return $this->retryStrategies[$id];
} }
public function has(string $id): bool public function has(string $id)
{ {
return isset($this->configuration['transports'][$id]); return isset($this->configuration['transports'][$id]);
} }

View file

@ -1,31 +0,0 @@
<?php
namespace DigiComp\FlowSymfonyBridge\Messenger\Transport;
use Neos\Flow\Annotations as Flow;
use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
#[Flow\Scope('singleton')]
class FailureTransportContainer implements ContainerInterface
{
/**
* @var TransportInterface[]
*/
protected array $transports;
public function get(string $id)
{
return $this->transports[$id];
}
public function has(string $id): bool
{
return isset($this->transports[$id]);
}
public function set(string $id, TransportInterface $transport): void
{
$this->transports[$id] = $transport;
}
}

View file

@ -13,7 +13,9 @@ use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface; use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface; use Symfony\Component\Messenger\Transport\TransportInterface;
#[Flow\Scope('singleton')] /**
* @Flow\Scope("singleton")
*/
class FlowDoctrineTransportFactory implements TransportFactoryInterface class FlowDoctrineTransportFactory implements TransportFactoryInterface
{ {
private EntityManagerInterface $entityManager; private EntityManagerInterface $entityManager;
@ -49,8 +51,11 @@ class FlowDoctrineTransportFactory implements TransportFactoryInterface
return new DoctrineTransport($connection, $serializer); return new DoctrineTransport($connection, $serializer);
} }
/**
* @inheritDoc
*/
public function supports(string $dsn, array $options): bool public function supports(string $dsn, array $options): bool
{ {
return \str_starts_with($dsn, 'flow-doctrine://'); return 0 === \strpos($dsn, 'flow-doctrine://');
} }
} }

View file

@ -7,21 +7,33 @@ use Symfony\Component\Messenger\Transport\TransportInterface;
class NullTransport implements TransportInterface class NullTransport implements TransportInterface
{ {
/**
* @inheritDoc
*/
public function get(): iterable public function get(): iterable
{ {
return new \EmptyIterator(); return new \EmptyIterator();
} }
/**
* @inheritDoc
*/
public function ack(Envelope $envelope): void public function ack(Envelope $envelope): void
{ {
// do nothing // do nothing
} }
/**
* @inheritDoc
*/
public function reject(Envelope $envelope): void public function reject(Envelope $envelope): void
{ {
// do nothing // do nothing
} }
/**
* @inheritDoc
*/
public function send(Envelope $envelope): Envelope public function send(Envelope $envelope): Envelope
{ {
return $envelope; return $envelope;

View file

@ -8,13 +8,19 @@ use Symfony\Component\Messenger\Transport\TransportInterface;
class NullTransportFactory implements TransportFactoryInterface class NullTransportFactory implements TransportFactoryInterface
{ {
/**
* @inheritDoc
*/
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{ {
return new NullTransport(); return new NullTransport();
} }
/**
* @inheritDoc
*/
public function supports(string $dsn, array $options): bool public function supports(string $dsn, array $options): bool
{ {
return \str_starts_with($dsn, 'null://'); return 0 === \strpos($dsn, 'null://');
} }
} }

View file

@ -8,20 +8,28 @@ use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface; use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface; use Symfony\Component\Messenger\Transport\TransportInterface;
#[Flow\Scope('singleton')] /**
* @Flow\Scope("singleton")
*/
class TransportsContainer implements ContainerInterface class TransportsContainer implements ContainerInterface
{ {
#[Flow\InjectConfiguration] /**
* @Flow\InjectConfiguration
* @var array
*/
protected array $configuration; protected array $configuration;
#[Flow\Inject(lazy: false)] /**
protected ObjectManagerInterface $objectManager; * @Flow\Inject
* @var ObjectManagerInterface
*/
protected $objectManager;
#[Flow\Inject(name: 'DigiComp.FlowSymfonyBridge.Messenger:TransportFactory', lazy: false)] /**
protected TransportFactoryInterface $transportFactory; * @Flow\Inject(name="DigiComp.FlowSymfonyBridge.Messenger:TransportFactory")
* @var TransportFactoryInterface
#[Flow\Inject(lazy: false)] */
protected FailureTransportContainer $failureTransports; protected $transportFactory;
/** /**
* @var TransportInterface[] * @var TransportInterface[]
@ -57,16 +65,11 @@ class TransportsContainer implements ContainerInterface
$transportDefinition['options'], $transportDefinition['options'],
$this->objectManager->get($transportDefinition['serializer']) $this->objectManager->get($transportDefinition['serializer'])
); );
if (isset($transportDefinition['failureTransport'])) {
$this->failureTransports->set($id, $this->get($transportDefinition['failureTransport']));
} elseif (isset($this->configuration['failureTransport'])) {
$this->failureTransports->set($id, $this->get($this->configuration['failureTransport']));
}
} }
return $this->transports[$id]; return $this->transports[$id];
} }
public function has(string $id): bool public function has(string $id)
{ {
return isset($this->configuration['transports'][$id]); return isset($this->configuration['transports'][$id]);
} }

View file

@ -89,13 +89,6 @@ DigiComp.FlowSymfonyBridge.Messenger:ReceiversContainer:
object: 'DigiComp\FlowSymfonyBridge\Messenger\Transport\TransportsContainer' object: 'DigiComp\FlowSymfonyBridge\Messenger\Transport\TransportsContainer'
# TODO: add own receivers here, which are no transports # TODO: add own receivers here, which are no transports
DigiComp.FlowSymfonyBridge.Messenger:FailureSenderContainer:
className: 'DigiComp\FlowSymfonyBridge\Messenger\ObjectManagement\ChainedContainer'
scope: 'singleton'
arguments:
1:
object: 'DigiComp\FlowSymfonyBridge\Messenger\Transport\FailureTransportContainer'
DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher: DigiComp.FlowSymfonyBridge.Messenger:EventDispatcher:
className: 'Symfony\Component\EventDispatcher\EventDispatcher' className: 'Symfony\Component\EventDispatcher\EventDispatcher'
scope: 'singleton' scope: 'singleton'
@ -140,6 +133,10 @@ Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportLis
arguments: arguments:
1: 1:
object: object:
name: 'DigiComp.FlowSymfonyBridge.Messenger:FailureSenderContainer' factoryObjectName: 'DigiComp.FlowSymfonyBridge.Messenger:SendersContainer'
factoryMethodName: 'get'
arguments:
1:
setting: 'DigiComp.FlowSymfonyBridge.Messenger.failureTransport'
2: 2:
object: 'Psr\Log\LoggerInterface' object: 'Psr\Log\LoggerInterface'

View file

@ -10,6 +10,7 @@ DigiComp:
Messenger: Messenger:
defaultBusName: "default" defaultBusName: "default"
defaultSerializerName: "DigiComp.FlowSymfonyBridge.Messenger:DefaultSerializer" defaultSerializerName: "DigiComp.FlowSymfonyBridge.Messenger:DefaultSerializer"
# TODO: use this
defaultRetryStrategyOptions: defaultRetryStrategyOptions:
maxRetries: 3 maxRetries: 3
# milliseconds delay # milliseconds delay
@ -18,8 +19,8 @@ DigiComp:
# e.g. 1 second delay, 2 seconds, 4 seconds # e.g. 1 second delay, 2 seconds, 4 seconds
multiplier: 2 multiplier: 2
maxDelay: 0 maxDelay: 0
# override all of this with a service that implements # override all of this with a service that
# Symfony\Component\Messenger\Retry\RetryStrategyInterface # implements Symfony\Component\Messenger\Retry\RetryStrategyInterface
service: null service: null
eventDispatcher: eventDispatcher:
@ -54,4 +55,7 @@ DigiComp:
dsn: "null://" dsn: "null://"
failureTransport: "discard" failureTransport: "discard"
# TODO: Receivers and Senders? (As far as I can see not possible in Symfony)
# receivers:[]
# senders: []
routing: [] routing: []

View file

@ -1,58 +0,0 @@
# DigiComp.FlowSymfonyBridge.Messenger
![Build status](https://ci.digital-competence.de/api/badges/Packages/DigiComp.FlowSymfonyBridge.Messenger/status.svg)
This packages brings a DI configuration for the `symfony/messenger` component, so it can be used easily in `neos/flow` projects.
To see how to use it, you probably want to have a look at the [documentation](https://symfony.com/doc/current/messenger.html) of `symfony/messenger`.
## Getting started
To get it integrated, you all need to do is to get message bus injected:
```php
#[Flow\Inject]
protected MessageBusInterface $messageBus;
```
And later in your method:
```php
$this->messageBus->dispatch(new CustomMessage())
```
You should configure a routing, to let the messenger know, over which transport your message should be handled:
```yaml
DigiComp:
FlowSymfonyBridge:
Messenger:
transports:
"custom-messages":
dsn: "flow-doctrine://default?table_name=test_messenger_messages"
routing:
Acme\Vendor\Messenger\CustomMessage:
- "custom-messages"
```
In this example we are using a doctrine transport (the speciality "flow-transport" is a transport which uses the already existing connection to doctrine instead of creating a new one - for the rest of the DSN-Format have a look in the documentation of `symfony/messenger`)
A handler for your CustomMessage could look like this:
```php
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler]
class CustomMessageHandler
{
public function __invoke(CustomMessage $message)
{
//your code here
}
}
```
It will be automatically found by Flow // the messenger and messages arriving at the bus will be handled by your handler.
Probably you'll want to consume the messengers with long living processes or as a cronjob. The Flow command for that task is `messenger:consume` (more help available)

View file

@ -2,11 +2,15 @@
namespace DigiComp\FlowSymfonyBridge\Messenger\Tests\Functional\Fixtures\Message; namespace DigiComp\FlowSymfonyBridge\Messenger\Tests\Functional\Fixtures\Message;
use Symfony\Component\Messenger\Attribute\AsMessageHandler; use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
#[AsMessageHandler] class TestMessageHandler implements MessageSubscriberInterface
class TestMessageHandler
{ {
public static function getHandledMessages(): iterable
{
yield TestMessage::class => [];
}
public function __invoke(TestMessage $message) public function __invoke(TestMessage $message)
{ {
//do nothing for now //do nothing for now

View file

@ -1,12 +1,12 @@
{ {
"name": "digicomp/flow-symfony-bridge-messenger", "name": "digicomp/flow-symfony-bridge-messenger",
"type": "neos-package", "type": "neos-package",
"license": "MIT",
"description": "Flow dependency injection bridge to symfony/messenger", "description": "Flow dependency injection bridge to symfony/messenger",
"require": { "require": {
"php": "^8.1", "neos/flow": "^6.3",
"neos/flow": "^8.0", "symfony/doctrine-messenger": "^5.2.5",
"symfony/doctrine-messenger": "^6.2", "symfony/event-dispatcher": "^4.2 | ^5.2"
"symfony/event-dispatcher": "^4.2 | ^5.2 | ^6.2"
}, },
"autoload": { "autoload": {
"psr-4": { "psr-4": {
@ -20,26 +20,10 @@
}, },
"extra": { "extra": {
"branch-alias": { "branch-alias": {
"dev-master": "0.1.x-dev" "dev-master": "0.0.x-dev"
}, },
"neos": { "neos": {
"package-key": "DigiComp.FlowSymfonyBridge.Messenger" "package-key": "DigiComp.FlowSymfonyBridge.Messenger"
} }
}, }
"authors": [
{
"name": "Ferdinand Kuhl",
"email": "f.kuhl@digital-competence.de",
"homepage": "https://www.digital-competence.de",
"role": "Developer"
}
],
"license": "MIT",
"homepage": "https://github.com/digital-competence/DigiComp.FlowSymfonyBridge.Messenger",
"keywords": [
"Neos",
"Flow",
"symfony",
"messenger"
]
} }