vendor/php-amqplib/rabbitmq-bundle/DependencyInjection/OldSoundRabbitMqExtension.php line 74

Open in your IDE?
  1. <?php
  2. namespace OldSound\RabbitMqBundle\DependencyInjection;
  3. use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
  4. use Symfony\Component\DependencyInjection\ContainerInterface;
  5. use Symfony\Component\DependencyInjection\Extension\Extension;
  6. use Symfony\Component\DependencyInjection\ContainerBuilder;
  7. use Symfony\Component\DependencyInjection\Definition;
  8. use Symfony\Component\DependencyInjection\Reference;
  9. use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
  10. use Symfony\Component\Config\FileLocator;
  11. /**
  12.  * OldSoundRabbitMqExtension.
  13.  *
  14.  * @author Alvaro Videla
  15.  * @author Marc Weistroff <marc.weistroff@sensio.com>
  16.  */
  17. class OldSoundRabbitMqExtension extends Extension
  18. {
  19.     /**
  20.      * @var ContainerBuilder
  21.      */
  22.     private $container;
  23.     /**
  24.      * @var Boolean Whether the data collector is enabled
  25.      */
  26.     private $collectorEnabled;
  27.     private $channelIds = array();
  28.     private $config = array();
  29.     public function load(array $configsContainerBuilder $container)
  30.     {
  31.         $this->container $container;
  32.         $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ '/../Resources/config')));
  33.         $loader->load('rabbitmq.xml');
  34.         $configuration $this->getConfiguration($configs$container);
  35.         $this->config $this->processConfiguration($configuration$configs);
  36.         $this->collectorEnabled $this->config['enable_collector'];
  37.         $this->loadConnections();
  38.         $this->loadBindings();
  39.         $this->loadProducers();
  40.         $this->loadConsumers();
  41.         $this->loadMultipleConsumers();
  42.         $this->loadDynamicConsumers();
  43.         $this->loadBatchConsumers();
  44.         $this->loadAnonConsumers();
  45.         $this->loadRpcClients();
  46.         $this->loadRpcServers();
  47.         if ($this->collectorEnabled && $this->channelIds) {
  48.             $channels = array();
  49.             foreach (array_unique($this->channelIds) as $id) {
  50.                 $channels[] = new Reference($id);
  51.             }
  52.             $definition $container->getDefinition('old_sound_rabbit_mq.data_collector');
  53.             $definition->replaceArgument(0$channels);
  54.         } else {
  55.             $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
  56.         }
  57.     }
  58.     public function getConfiguration(array $configContainerBuilder $container)
  59.     {
  60.         return new Configuration($this->getAlias());
  61.     }
  62.     protected function loadConnections()
  63.     {
  64.         foreach ($this->config['connections'] as $key => $connection) {
  65.             $connectionSuffix $connection['use_socket'] ? 'socket_connection.class' 'connection.class';
  66.             $classParam =
  67.                 $connection['lazy']
  68.                     ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
  69.                     '%old_sound_rabbit_mq.'.$connectionSuffix.'%';
  70.             $definition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', array(
  71.                 $classParam$connection,
  72.             ));
  73.             if (isset($connection['connection_parameters_provider'])) {
  74.                 $definition->addArgument(new Reference($connection['connection_parameters_provider']));
  75.                 unset($connection['connection_parameters_provider']);
  76.             }
  77.             $definition->setPublic(false);
  78.             $factoryName sprintf('old_sound_rabbit_mq.connection_factory.%s'$key);
  79.             $this->container->setDefinition($factoryName$definition);
  80.             $definition = new Definition($classParam);
  81.             if (method_exists($definition'setFactory')) {
  82.                 // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
  83.                 $definition->setFactory(array(new Reference($factoryName), 'createConnection'));
  84.             } else {
  85.                 // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
  86.                 $definition->setFactoryService($factoryName);
  87.                 $definition->setFactoryMethod('createConnection');
  88.             }
  89.             $definition->addTag('old_sound_rabbit_mq.connection');
  90.             $definition->setPublic(true);
  91.             $this->container->setDefinition(sprintf('old_sound_rabbit_mq.connection.%s'$key), $definition);
  92.         }
  93.     }
  94.     protected function loadBindings()
  95.     {
  96.         if ($this->config['sandbox']) {
  97.             return;
  98.         }
  99.         foreach ($this->config['bindings'] as $binding) {
  100.             ksort($binding);
  101.             $definition = new Definition($binding['class']);
  102.             $definition->addTag('old_sound_rabbit_mq.binding');
  103.             $definition->addMethodCall('setArguments', array($binding['arguments']));
  104.             $definition->addMethodCall('setDestination', array($binding['destination']));
  105.             $definition->addMethodCall('setDestinationIsExchange', array($binding['destination_is_exchange']));
  106.             $definition->addMethodCall('setExchange', array($binding['exchange']));
  107.             $definition->addMethodCall('isNowait', array($binding['nowait']));
  108.             $definition->addMethodCall('setRoutingKey', array($binding['routing_key']));
  109.             $this->injectConnection($definition$binding['connection']);
  110.             $key md5(json_encode($binding));
  111.             if ($this->collectorEnabled) {
  112.                 // in the context of a binding, I don't thing logged channels are needed?
  113.                 $this->injectLoggedChannel($definition$key$binding['connection']);
  114.             }
  115.             $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s'$key), $definition);
  116.         }
  117.     }
  118.     protected function loadProducers()
  119.     {
  120.         if ($this->config['sandbox'] == false) {
  121.             foreach ($this->config['producers'] as $key => $producer) {
  122.                 $definition = new Definition($producer['class']);
  123.                 $definition->setPublic(true);
  124.                 $definition->addTag('old_sound_rabbit_mq.base_amqp');
  125.                 $definition->addTag('old_sound_rabbit_mq.producer');
  126.                 //this producer doesn't define an exchange -> using AMQP Default
  127.                 if (!isset($producer['exchange_options'])) {
  128.                     $producer['exchange_options'] = $this->getDefaultExchangeOptions();
  129.                 }
  130.                 $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options'])));
  131.                 //this producer doesn't define a queue -> using AMQP Default
  132.                 if (!isset($producer['queue_options'])) {
  133.                     $producer['queue_options'] = $this->getDefaultQueueOptions();
  134.                 }
  135.                 $definition->addMethodCall('setQueueOptions', array($producer['queue_options']));
  136.                 $this->injectConnection($definition$producer['connection']);
  137.                 if ($this->collectorEnabled) {
  138.                     $this->injectLoggedChannel($definition$key$producer['connection']);
  139.                 }
  140.                 if (!$producer['auto_setup_fabric']) {
  141.                     $definition->addMethodCall('disableAutoSetupFabric');
  142.                 }
  143.                 if ($producer['enable_logger']) {
  144.                     $this->injectLogger($definition);
  145.                 }
  146.                 $producerServiceName sprintf('old_sound_rabbit_mq.%s_producer'$key);
  147.                 $this->container->setDefinition($producerServiceName$definition);
  148.                 if (null !== $producer['service_alias']) {
  149.                     $this->container->setAlias($producer['service_alias'], $producerServiceName);
  150.                 }
  151.             }
  152.         } else {
  153.             foreach ($this->config['producers'] as $key => $producer) {
  154.                 $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
  155.                 $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_producer'$key), $definition);
  156.             }
  157.         }
  158.     }
  159.     protected function loadConsumers()
  160.     {
  161.         foreach ($this->config['consumers'] as $key => $consumer) {
  162.             $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
  163.             $definition->setPublic(true);
  164.             $definition->addTag('old_sound_rabbit_mq.base_amqp');
  165.             $definition->addTag('old_sound_rabbit_mq.consumer');
  166.             //this consumer doesn't define an exchange -> using AMQP Default
  167.             if (!isset($consumer['exchange_options'])) {
  168.                 $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
  169.             }
  170.             $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])));
  171.             //this consumer doesn't define a queue -> using AMQP Default
  172.             if (!isset($consumer['queue_options'])) {
  173.                 $consumer['queue_options'] = $this->getDefaultQueueOptions();
  174.             }
  175.             $definition->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])));
  176.             $definition->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
  177.             if (array_key_exists('qos_options'$consumer)) {
  178.                 $definition->addMethodCall('setQosOptions', array(
  179.                     $consumer['qos_options']['prefetch_size'],
  180.                     $consumer['qos_options']['prefetch_count'],
  181.                     $consumer['qos_options']['global']
  182.                 ));
  183.             }
  184.             if (isset($consumer['idle_timeout'])) {
  185.                 $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
  186.             }
  187.             if (isset($consumer['idle_timeout_exit_code'])) {
  188.                 $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
  189.             }
  190.             if (isset($consumer['timeout_wait'])) {
  191.                 $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
  192.             }
  193.             if (isset($consumer['graceful_max_execution'])) {
  194.                 $definition->addMethodCall(
  195.                     'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
  196.                     array($consumer['graceful_max_execution']['timeout'])
  197.                 );
  198.                 $definition->addMethodCall(
  199.                     'setGracefulMaxExecutionTimeoutExitCode',
  200.                     array($consumer['graceful_max_execution']['exit_code'])
  201.                 );
  202.             }
  203.             if (!$consumer['auto_setup_fabric']) {
  204.                 $definition->addMethodCall('disableAutoSetupFabric');
  205.             }
  206.             $this->injectConnection($definition$consumer['connection']);
  207.             if ($this->collectorEnabled) {
  208.                 $this->injectLoggedChannel($definition$key$consumer['connection']);
  209.             }
  210.             if ($consumer['enable_logger']) {
  211.                 $this->injectLogger($definition);
  212.             }
  213.             $name sprintf('old_sound_rabbit_mq.%s_consumer'$key);
  214.             $this->container->setDefinition($name$definition);
  215.             $this->addDequeuerAwareCall($consumer['callback'], $name);
  216.         }
  217.     }
  218.     protected function loadMultipleConsumers()
  219.     {
  220.         foreach ($this->config['multiple_consumers'] as $key => $consumer) {
  221.             $queues = array();
  222.             $callbacks = array();
  223.             if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
  224.                 throw new InvalidConfigurationException(
  225.                     "Error on loading $key multiple consumer. " .
  226.                     "Either 'queues' or 'queues_provider' parameters should be defined."
  227.                 );
  228.             }
  229.             foreach ($consumer['queues'] as $queueName => $queueOptions) {
  230.                 $queues[$queueOptions['name']] = $queueOptions;
  231.                 $queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
  232.                 $callbacks[] = $queueOptions['callback'];
  233.             }
  234.             $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
  235.             $definition
  236.                 ->setPublic(true)
  237.                 ->addTag('old_sound_rabbit_mq.base_amqp')
  238.                 ->addTag('old_sound_rabbit_mq.multi_consumer')
  239.                 ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
  240.                 ->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));
  241.             if ($consumer['queues_provider']) {
  242.                 $definition->addMethodCall(
  243.                     'setQueuesProvider',
  244.                     array(new Reference($consumer['queues_provider']))
  245.                 );
  246.             }
  247.             if (array_key_exists('qos_options'$consumer)) {
  248.                 $definition->addMethodCall('setQosOptions', array(
  249.                     $consumer['qos_options']['prefetch_size'],
  250.                     $consumer['qos_options']['prefetch_count'],
  251.                     $consumer['qos_options']['global']
  252.                 ));
  253.             }
  254.             if (isset($consumer['idle_timeout'])) {
  255.                 $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
  256.             }
  257.             if (isset($consumer['idle_timeout_exit_code'])) {
  258.                 $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
  259.             }
  260.             if (isset($consumer['timeout_wait'])) {
  261.                 $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
  262.             }
  263.             if (isset($consumer['graceful_max_execution'])) {
  264.                 $definition->addMethodCall(
  265.                     'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
  266.                     array($consumer['graceful_max_execution']['timeout'])
  267.                 );
  268.                 $definition->addMethodCall(
  269.                     'setGracefulMaxExecutionTimeoutExitCode',
  270.                     array($consumer['graceful_max_execution']['exit_code'])
  271.                 );
  272.             }
  273.             if (!$consumer['auto_setup_fabric']) {
  274.                 $definition->addMethodCall('disableAutoSetupFabric');
  275.             }
  276.             $this->injectConnection($definition$consumer['connection']);
  277.             if ($this->collectorEnabled) {
  278.                 $this->injectLoggedChannel($definition$key$consumer['connection']);
  279.             }
  280.             if ($consumer['enable_logger']) {
  281.                 $this->injectLogger($definition);
  282.             }
  283.             $name sprintf('old_sound_rabbit_mq.%s_multiple'$key);
  284.             $this->container->setDefinition($name$definition);
  285.             if ($consumer['queues_provider']) {
  286.                 $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
  287.             }
  288.             foreach ($callbacks as $callback) {
  289.                 $this->addDequeuerAwareCall($callback$name);
  290.             }
  291.         }
  292.     }
  293.     protected function loadDynamicConsumers()
  294.     {
  295.         foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
  296.             if (empty($consumer['queue_options_provider'])) {
  297.                 throw new InvalidConfigurationException(
  298.                     "Error on loading $key dynamic consumer. " .
  299.                     "'queue_provider' parameter should be defined."
  300.                 );
  301.             }
  302.             $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
  303.             $definition
  304.                 ->setPublic(true)
  305.                 ->addTag('old_sound_rabbit_mq.base_amqp')
  306.                 ->addTag('old_sound_rabbit_mq.consumer')
  307.                 ->addTag('old_sound_rabbit_mq.dynamic_consumer')
  308.                 ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
  309.                 ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
  310.             if (array_key_exists('qos_options'$consumer)) {
  311.                 $definition->addMethodCall('setQosOptions', array(
  312.                     $consumer['qos_options']['prefetch_size'],
  313.                     $consumer['qos_options']['prefetch_count'],
  314.                     $consumer['qos_options']['global']
  315.                 ));
  316.             }
  317.             $definition->addMethodCall(
  318.                 'setQueueOptionsProvider',
  319.                 array(new Reference($consumer['queue_options_provider']))
  320.             );
  321.             if (isset($consumer['idle_timeout'])) {
  322.                 $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
  323.             }
  324.             if (isset($consumer['idle_timeout_exit_code'])) {
  325.                 $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
  326.             }
  327.             if (isset($consumer['timeout_wait'])) {
  328.                 $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
  329.             }
  330.             if (isset($consumer['graceful_max_execution'])) {
  331.                 $definition->addMethodCall(
  332.                     'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
  333.                     array($consumer['graceful_max_execution']['timeout'])
  334.                 );
  335.                 $definition->addMethodCall(
  336.                     'setGracefulMaxExecutionTimeoutExitCode',
  337.                     array($consumer['graceful_max_execution']['exit_code'])
  338.                 );
  339.             }
  340.             if (!$consumer['auto_setup_fabric']) {
  341.                 $definition->addMethodCall('disableAutoSetupFabric');
  342.             }
  343.             $this->injectConnection($definition$consumer['connection']);
  344.             if ($this->collectorEnabled) {
  345.                 $this->injectLoggedChannel($definition$key$consumer['connection']);
  346.             }
  347.             if ($consumer['enable_logger']) {
  348.                 $this->injectLogger($definition);
  349.             }
  350.             $name sprintf('old_sound_rabbit_mq.%s_dynamic'$key);
  351.             $this->container->setDefinition($name$definition);
  352.             $this->addDequeuerAwareCall($consumer['callback'], $name);
  353.             $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
  354.         }
  355.     }
  356.     protected function loadBatchConsumers()
  357.     {
  358.         foreach ($this->config['batch_consumers'] as $key => $consumer) {
  359.             $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
  360.             if (!isset($consumer['exchange_options'])) {
  361.                 $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
  362.             }
  363.             $definition
  364.                 ->setPublic(true)
  365.                 ->addTag('old_sound_rabbit_mq.base_amqp')
  366.                 ->addTag('old_sound_rabbit_mq.batch_consumer')
  367.                 ->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']))
  368.                 ->addMethodCall('setPrefetchCount', array($consumer['qos_options']['prefetch_count']))
  369.                 ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'batchExecute')))
  370.                 ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
  371.                 ->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])))
  372.                 ->addMethodCall('setQosOptions', array(
  373.                     $consumer['qos_options']['prefetch_size'],
  374.                     $consumer['qos_options']['prefetch_count'],
  375.                     $consumer['qos_options']['global']
  376.                 ))
  377.             ;
  378.             if (isset($consumer['idle_timeout_exit_code'])) {
  379.                 $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
  380.             }
  381.             if (isset($consumer['idle_timeout'])) {
  382.                 $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
  383.             }
  384.             if (isset($consumer['graceful_max_execution'])) {
  385.                 $definition->addMethodCall(
  386.                     'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
  387.                     array($consumer['graceful_max_execution']['timeout'])
  388.                 );
  389.             }
  390.             if (!$consumer['auto_setup_fabric']) {
  391.                 $definition->addMethodCall('disableAutoSetupFabric');
  392.             }
  393.             if ($consumer['keep_alive']) {
  394.                 $definition->addMethodCall('keepAlive');
  395.             }
  396.             $this->injectConnection($definition$consumer['connection']);
  397.             if ($this->collectorEnabled) {
  398.                 $this->injectLoggedChannel($definition$key$consumer['connection']);
  399.             }
  400.             if ($consumer['enable_logger']) {
  401.                 $this->injectLogger($definition);
  402.             }
  403.             $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch'$key), $definition);
  404.         }
  405.     }
  406.     protected function loadAnonConsumers()
  407.     {
  408.         foreach ($this->config['anon_consumers'] as $key => $anon) {
  409.             $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
  410.             $definition
  411.                 ->setPublic(true)
  412.                 ->addTag('old_sound_rabbit_mq.base_amqp')
  413.                 ->addTag('old_sound_rabbit_mq.anon_consumer')
  414.                 ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($anon['exchange_options'])))
  415.                 ->addMethodCall('setCallback', array(array(new Reference($anon['callback']), 'execute')));
  416.             $this->injectConnection($definition$anon['connection']);
  417.             if ($this->collectorEnabled) {
  418.                 $this->injectLoggedChannel($definition$key$anon['connection']);
  419.             }
  420.             $name sprintf('old_sound_rabbit_mq.%s_anon'$key);
  421.             $this->container->setDefinition($name$definition);
  422.             $this->addDequeuerAwareCall($anon['callback'], $name);
  423.         }
  424.     }
  425.     /**
  426.      * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
  427.      * parameter. So we revert the change for right configurations.
  428.      *
  429.      * @param array $config
  430.      *
  431.      * @return array
  432.      */
  433.     private function normalizeArgumentKeys(array $config)
  434.     {
  435.         if (isset($config['arguments'])) {
  436.             $arguments $config['arguments'];
  437.             // support for old configuration
  438.             if (is_string($arguments)) {
  439.                 $arguments $this->argumentsStringAsArray($arguments);
  440.             }
  441.             $newArguments = array();
  442.             foreach ($arguments as $key => $value) {
  443.                 if (strstr($key'_')) {
  444.                     $key str_replace('_''-'$key);
  445.                 }
  446.                 $newArguments[$key] = $value;
  447.             }
  448.             $config['arguments'] = $newArguments;
  449.         }
  450.         return $config;
  451.     }
  452.     /**
  453.      * Support for arguments provided as string. Support for old configuration files.
  454.      *
  455.      * @deprecated
  456.      * @param string $arguments
  457.      * @return array
  458.      */
  459.     private function argumentsStringAsArray($arguments)
  460.     {
  461.         $argumentsArray = array();
  462.         $argumentPairs explode(','$arguments);
  463.         foreach ($argumentPairs as $argument) {
  464.             $argumentPair explode(':'$argument);
  465.             $type 'S';
  466.             if (isset($argumentPair[2])) {
  467.                 $type $argumentPair[2];
  468.             }
  469.             $argumentsArray[$argumentPair[0]] = array($type$argumentPair[1]);
  470.         }
  471.         return $argumentsArray;
  472.     }
  473.     protected function loadRpcClients()
  474.     {
  475.         foreach ($this->config['rpc_clients'] as $key => $client) {
  476.             $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
  477.             $definition->setLazy($client['lazy']);
  478.             $definition
  479.                 ->addTag('old_sound_rabbit_mq.rpc_client')
  480.                 ->addMethodCall('initClient', array($client['expect_serialized_response']));
  481.             $this->injectConnection($definition$client['connection']);
  482.             if ($this->collectorEnabled) {
  483.                 $this->injectLoggedChannel($definition$key$client['connection']);
  484.             }
  485.             if (array_key_exists('unserializer'$client)) {
  486.                 $definition->addMethodCall('setUnserializer', array($client['unserializer']));
  487.             }
  488.             if (array_key_exists('direct_reply_to'$client)) {
  489.                 $definition->addMethodCall('setDirectReplyTo', array($client['direct_reply_to']));
  490.             }
  491.             $definition->setPublic(true);
  492.             $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc'$key), $definition);
  493.         }
  494.     }
  495.     protected function loadRpcServers()
  496.     {
  497.         foreach ($this->config['rpc_servers'] as $key => $server) {
  498.             $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
  499.             $definition
  500.                 ->setPublic(true)
  501.                 ->addTag('old_sound_rabbit_mq.base_amqp')
  502.                 ->addTag('old_sound_rabbit_mq.rpc_server')
  503.                 ->addMethodCall('initServer', array($key))
  504.                 ->addMethodCall('setCallback', array(array(new Reference($server['callback']), 'execute')));
  505.             $this->injectConnection($definition$server['connection']);
  506.             if ($this->collectorEnabled) {
  507.                 $this->injectLoggedChannel($definition$key$server['connection']);
  508.             }
  509.             if (array_key_exists('qos_options'$server)) {
  510.                 $definition->addMethodCall('setQosOptions', array(
  511.                     $server['qos_options']['prefetch_size'],
  512.                     $server['qos_options']['prefetch_count'],
  513.                     $server['qos_options']['global']
  514.                 ));
  515.             }
  516.             if (array_key_exists('exchange_options'$server)) {
  517.                 $definition->addMethodCall('setExchangeOptions', array($server['exchange_options']));
  518.             }
  519.             if (array_key_exists('queue_options'$server)) {
  520.                 $definition->addMethodCall('setQueueOptions', array($server['queue_options']));
  521.             }
  522.             if (array_key_exists('serializer'$server)) {
  523.                 $definition->addMethodCall('setSerializer', array($server['serializer']));
  524.             }
  525.             $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server'$key), $definition);
  526.         }
  527.     }
  528.     protected function injectLoggedChannel(Definition $definition$name$connectionName)
  529.     {
  530.         $id sprintf('old_sound_rabbit_mq.channel.%s'$name);
  531.         $channel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
  532.         $channel
  533.             ->setPublic(false)
  534.             ->addTag('old_sound_rabbit_mq.logged_channel');
  535.         $this->injectConnection($channel$connectionName);
  536.         $this->container->setDefinition($id$channel);
  537.         $this->channelIds[] = $id;
  538.         $definition->addArgument(new Reference($id));
  539.     }
  540.     protected function injectConnection(Definition $definition$connectionName)
  541.     {
  542.         $definition->addArgument(new Reference(sprintf('old_sound_rabbit_mq.connection.%s'$connectionName)));
  543.     }
  544.     public function getAlias()
  545.     {
  546.         return 'old_sound_rabbit_mq';
  547.     }
  548.     /**
  549.      * Add proper dequeuer aware call
  550.      *
  551.      * @param string $callback
  552.      * @param string $name
  553.      */
  554.     protected function addDequeuerAwareCall($callback$name)
  555.     {
  556.         if (!$this->container->has($callback)) {
  557.             return;
  558.         }
  559.         $callbackDefinition $this->container->findDefinition($callback);
  560.         $refClass = new \ReflectionClass($callbackDefinition->getClass());
  561.         if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
  562.             $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
  563.         }
  564.     }
  565.     private function injectLogger(Definition $definition)
  566.     {
  567.         $definition->addTag('monolog.logger', array(
  568.             'channel' => 'phpamqplib'
  569.         ));
  570.         $definition->addMethodCall('setLogger', array(new Reference('logger'ContainerInterface::IGNORE_ON_INVALID_REFERENCE)));
  571.     }
  572.     /**
  573.      * Get default AMQP exchange options
  574.      *
  575.      * @return array
  576.      */
  577.     protected function getDefaultExchangeOptions()
  578.     {
  579.         return array(
  580.             'name' => '',
  581.             'type' => 'direct',
  582.             'passive' => true,
  583.             'declare' => false
  584.         );
  585.     }
  586.     /**
  587.      * Get default AMQP queue options
  588.      *
  589.      * @return array
  590.      */
  591.     protected function getDefaultQueueOptions()
  592.     {
  593.         return array(
  594.             'name' => '',
  595.             'declare' => false
  596.         );
  597.     }
  598. }