PDA

View Full Version : More Queue Problems


keenan
08-26-2008, 04:46 PM
Can you please take a look at the following and let me know if you see anything obviously wrong. I'm getting some strange behavior with the NmsTemplate.

Here is my configuration:


<object id="mqConnFactory" type="Apache.NMS.ActiveMQ.ConnectionFactory, Apache.NMS.ActiveMQ">
<constructor-arg index="0" value="tcp://localhost:61616"/>
</object>
<object id="nmsTemplate" type="Spring.Messaging.Nms.Core.NmsTemplate, Spring.Messaging.Nms">
<constructor-arg name="connectionFactory" ref="mqConnFactory"/>
<property name="DefaultDestinationName" value="application.default"/>
<property name="Persistent" value="true"/>
<property name="ReceiveTimeout" value="3000"/>
</object>
<object id="msgProducer1" type="MyMessageDemo.Producer, MyMessageDemo" init-method="Init">
<property name="NmsTemplate" ref="nmsTemplate"/>
<property name="QueueName" value="sample.9"/>
<property name="Delay" value="250"/>
<property name="MaxMessages" value="500"/>
</object>
<object id="threadSyncConsumer" type="MyMessageDemo.ThreadSyncConsumer, MyMessageDemo" init-method="Init">
<property name="NmsTemplate" ref="nmsTemplate"/>
<property name="QueueName" value="sample.9"/>
</object>


Producer.cs puts messages on a queue, sleeps for Delay, and quits when MaxMessages has been reached. Here is the loop:


count++;
string text = "message # " + count.ToString();
NmsTemplate.ConvertAndSend(QueueName, text);
Thread.Sleep(Delay);


The consumer read from the queue, prints to the console, and sleeps. Here is the loop:


Stopwatch sw = Stopwatch.StartNew();
while (!_done)
{
try
{
object msg = NmsTemplate.ReceiveAndConvert(QueueName);
if (msg != null)
{
Console.Write("[" + sw.Elapsed.TotalSeconds + "] ");
Console.WriteLine("msg=" + msg);
}
Thread.Sleep(2000);
}
catch (ThreadInterruptedException tie)
{
// normal when shutting down
}
catch (Exception e)
{
Console.WriteLine("Caught Error: " + e.Message);
}
}


Here is the output I received on a sample run. I changed the QUEUE name between runs so as not to pick up messages from a previous run.


Press any key to exit...
[0.3500199] msg=message # 1
[4.3684035] msg=message # 2
[6.383342] msg=message # 3
[12.4685721] msg=message # 4
[14.4795179] msg=message # 5
[22.5588252] msg=message # 6
[24.5715039] msg=message # 6
[44.7761617] msg=message # 6
[48.8023502] msg=message # 12
[87.1042657] msg=message # 14
[89.1159798] msg=message # 17
[91.1264979] msg=message # 18
[95.1451064] msg=message # 19
[97.1671625] msg=message # 20
^C


the numbers in brackets are seconds from the running StopWatch.

keenan
08-26-2008, 04:59 PM
If I had a nickel for every time I found the solution after taking the time to type up a complete problem description....

For the benefit of others who wander into this thread-- this is a "feature" of ActiveMQ that Mark pointed out already. ActiveMQ uses a prefetch policy to boost read performance.

In the original post you can see I was not using a cached connection. Therefore a new connection and session are created for each send and receive. On the receive side, the messages that are "lost" were actually auto-acknowledged (the default) but not received because that session/connection was released.

I do feel this is a bit of a trap door for AMQ. If you are using a standard connection factory with a synchronous receive from NmsTemplate and you have not specifically set consumer.prefetchSize=1 then messages will appear be lost.

Perhaps we could stress this in the documentation? I have a strong feeling this is going to bite other people. Although caching is almost essential in a production application it is not intuitive that removing the connection cache would cause message loss (due to the prefetch policy).