I'm trying to call a task and create a queue for that task if it doesn't exist then immediately insert to that queue the called task. I have the following code:

def greet(name):
    return "Hello %s!" % name

def run():
    result = greet.delay(args=['marc'], queue='greet.1',
    print result.ready()


then I have a custom router:

class MyRouter(object):

    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'tasks.greet':
            return {'queue': kwargs['queue'],
                    'exchange': 'greet',
                    'exchange_type': 'direct',
                    'routing_key': kwargs['routing_key']}
        return None

this creates an exchange called greet.1 and a queue called greet.1 but the queue is empty. The exchange should be just called greet which knows how to route a routing key like greet.1 to the queue called greet.1.




task.apply_async(queue='foo', routing_key='foobar')

Then Celery will take default values from the 'foo' queue in CELERY_QUEUES, or if it does not exist then automatically create it using (queue=foo, exchange=foo, routing_key=foo)

So if 'foo' does not exist in CELERY_QUEUES you will end up with:

queues['foo'] = Queue('foo', exchange=Exchange('foo'), routing_key='foo')

The producer will then declare that queue, but since you override the routing_key, actually send the message using routing_key = 'foobar'


This may seem strange but the behavior is actually useful for topic exchanges, where you publish to different topics.

It's harder to do what you want though, you can create the queue yourself and declare it, but that won't work well with automatic message publish retries. It would be better if the queue argument to apply_async could support a custom kombu.Queue instead that will be both declared and used as the destination. Maybe you could open an issue for that at http://github.com/celery/celery/issues

