DigiComp.FlowSymfonyBridge..../Classes/Transport/FlowDoctrineTransportFactory.php

57 lines
2.2 KiB
PHP
Raw Permalink Normal View History

<?php
namespace DigiComp\FlowSymfonyBridge\Messenger\Transport;
use Doctrine\DBAL\Driver\AbstractPostgreSQLDriver;
use Doctrine\ORM\EntityManagerInterface;
2022-09-18 13:17:53 +02:00
use Neos\Flow\Annotations as Flow;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
2023-02-18 22:53:05 +01:00
#[Flow\Scope('singleton')]
class FlowDoctrineTransportFactory implements TransportFactoryInterface
{
private EntityManagerInterface $entityManager;
public function __construct(EntityManagerInterface $entityManager)
{
$this->entityManager = $entityManager;
}
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{
$useNotify = ($options['use_notify'] ?? true);
unset($options['transport_name'], $options['use_notify']);
// Always allow PostgreSQL-specific keys, to be able to transparently fallback to the native driver
// when LISTEN/NOTIFY isn't available
$configuration = PostgreSqlConnection::buildConfiguration($dsn, $options);
try {
$driverConnection = $this->entityManager->getConnection();
} catch (\InvalidArgumentException $e) {
2022-09-18 13:17:53 +02:00
throw new TransportException(\sprintf(
'Could not find Doctrine connection from Messenger DSN "%s".',
$dsn
), 0, $e);
}
if ($useNotify && $driverConnection->getDriver() instanceof AbstractPostgreSQLDriver) {
$connection = new PostgreSqlConnection($configuration, $driverConnection);
} else {
$connection = new Connection($configuration, $driverConnection);
}
return new DoctrineTransport($connection, $serializer);
}
public function supports(string $dsn, array $options): bool
{
2023-02-18 22:53:05 +01:00
return \str_starts_with($dsn, 'flow-doctrine://');
}
}