Aggregating messages from different sources with Mule (part 2)

This post continues the use case I described in part 1. The part left to explain is the ‘custom-correlation-aggregator-router’.
Let us start with the source code of this routing component.
The class ‘MyAggregator’

package nl.redstream.routing.inbound;
 
...
import nl.redstream.routing.MyEventCorrelator;
...
/**
 *
 * @author redstream
 */
public class MyAggregator extends AbstractCorrelationAggregator {
 
    // Custom EventCorrelator implementation to accommodate special requirements for XML file handling
    @Override
    public void initialise() throws InitialisationException
    {
        eventCorrelator = new MyEventCorrelator(getCorrelatorCallback(), getMessageInfoMapping(), muleContext);
        if (getTimeout() != 0)
        {
            eventCorrelator.setTimeout(getTimeout());
            eventCorrelator.setFailOnTimeout(isFailOnTimeout());
            try
            {
                eventCorrelator.enableTimeoutMonitor();
            }
            catch (WorkException e)
            {
                throw new InitialisationException(e, this);
            }
        }
    }
 
    @Override
    protected EventCorrelatorCallback getCorrelatorCallback()
    {
        return new DelegateCorrelatorCallback();
    }
 
    @Override
    public MuleMessage aggregateEvents(EventGroup events) throws AggregationException {
 
       ...
    }
 
    private class DelegateCorrelatorCallback extends CollectionCorrelatorCallback
    {
        @Override
        public MuleMessage aggregateEvents(EventGroup events) throws AggregationException
        {
            return MyAggregator.this.aggregateEvents(events);
        }
 
        @Override
        public EventGroup createEventGroup(MuleEvent event, Object groupId)
        {
            return new MyEventGroup(groupId, event.getMessage().getCorrelationGroupSize());
        }
    }
}

The important methods in this class are the following:

  • initialise()
    This method is overridden so that we can instantiate our own EventCorrelator instead of the default one. The source of our implementation is shown further down in this post.

  • aggregateEvents(EventGroup events)
    This is where the two XML messages are aggregated. It contains a lot of XML-related code that does the real work. The method is called once the two messages from the different sources, sharing the same correlation ID, have been received.

  • private class DelegateCorrelatorCallback
    This implementation is needed to instantiate our own EventGroup class instead of the default one. The source of our EventGroup is also shown further down in this post.
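The body of aggregateEvents is elided in the listing above. Purely as an illustration of the kind of XML work such a method might do, and not the code used in this project, the sketch below merges two XML payloads under a new root element using the DOM API that ships with the JDK. The class name XmlMergeSketch, the method merge and the wrapper element name are all hypothetical.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

// Hypothetical helper, not part of the original project.
public class XmlMergeSketch {

    /** Wraps the root elements of two XML documents in a new root named rootName. */
    static String merge(String rootName, String firstXml, String secondXml) {
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            Document result = builder.newDocument();
            Element root = result.createElement(rootName);
            result.appendChild(root);

            for (String xml : new String[] {firstXml, secondXml}) {
                Document part = builder.parse(new InputSource(new StringReader(xml)));
                // importNode copies the element (deep) so it can live in the result document
                root.appendChild(result.importNode(part.getDocumentElement(), true));
            }

            Transformer transformer = TransformerFactory.newInstance().newTransformer();
            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
            StringWriter out = new StringWriter();
            transformer.transform(new DOMSource(result), new StreamResult(out));
            return out.toString();
        } catch (Exception e) {
            // Parsing and serialisation errors are wrapped to keep the sketch short
            throw new IllegalStateException("Could not merge XML payloads", e);
        }
    }
}
```

In a real aggregator the two strings would come from the payloads of the events in the EventGroup, and the merged result would become the payload of the returned message.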

The class ‘MyEventCorrelator’

    package nl.redstream.routing;
     
    import nl.redstream.mule.routing.CustomEventCorrelator;
    import org.mule.api.MuleContext;
    import org.mule.api.routing.MessageInfoMapping;
    import org.mule.routing.EventCorrelatorCallback;
     
    /**
     *
     * @author Redstream
     */
    public class MyEventCorrelator extends CustomEventCorrelator {
     
        public MyEventCorrelator(EventCorrelatorCallback callback, MessageInfoMapping messageInfoMapping, MuleContext context)
        {
            super(callback, messageInfoMapping, context);
        }
     
        @Override
        protected void addProcessedGroup(Object id)
        {
            // Do not remember already processed groups
        }
     
        @Override
        protected boolean isGroupAlreadyProcessed(Object id)
        {
            return false;
        }
    }

What we changed in this class is the way it handles correlation IDs that have already been processed. Normally, once a group of messages with a certain correlation ID has been processed, any further message with that correlation ID is rejected. In our case we wanted to be able to process such a group again, so we overrode the methods responsible for this behaviour so that processed groups are never remembered.
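The effect of these two overrides can be shown with a simplified model in plain Java. This is only a sketch under my own assumptions and not Mule's actual implementation: the classes RememberingCorrelator and ForgetfulCorrelator and the methods accept and complete are hypothetical; only the two overridden method names are taken from the listing above.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified stand-in classes, not the Mule classes themselves.
public class ProcessedGroupSketch {

    /** Models the default behaviour: completed groups are remembered and rejected later. */
    static class RememberingCorrelator {
        private final Set<Object> processedGroups = new HashSet<>();

        protected void addProcessedGroup(Object id) {
            processedGroups.add(id);
        }

        protected boolean isGroupAlreadyProcessed(Object id) {
            return processedGroups.contains(id);
        }

        /** Marks a group as completed after aggregation. */
        void complete(Object groupId) {
            addProcessedGroup(groupId);
        }

        /** Returns true if a new event for this group would still be accepted. */
        boolean accept(Object groupId) {
            return !isGroupAlreadyProcessed(groupId);
        }
    }

    /** Models the overridden behaviour: nothing is remembered, so nothing is rejected. */
    static class ForgetfulCorrelator extends RememberingCorrelator {
        @Override
        protected void addProcessedGroup(Object id) {
            // Do not remember already processed groups
        }

        @Override
        protected boolean isGroupAlreadyProcessed(Object id) {
            return false;
        }
    }
}
```

With the remembering variant, a message arriving for a completed group is rejected. With the forgetful variant the same message is accepted and can start a new round of aggregation.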
As you might have noticed, MyEventCorrelator extends another class that we wrote ourselves: nl.redstream.mule.routing.CustomEventCorrelator. We had to create this class because the original org.mule.routing.EventCorrelator did not work properly in our case once we set a timeout value for the correlator.
To fix this we extended the original class and copied the source of the method ‘enableTimeoutMonitor()’. In this copy we replaced the lines:

    // TODO which use cases would need a sync reply event returned?
    service.getComponent().invoke(newEvent);

    with:

    //REDSTREAM Some fix here
    msg = service.getComponent().invoke(newEvent);
    service.getOutboundRouter().route(msg, new DefaultMuleSession(service, context));

This made processing continue after a timeout occurred, and our tests passed.
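The idea behind the change can be modelled without any Mule classes. The sketch below is an assumption-laden simplification, not the code of enableTimeoutMonitor(): the class TimeoutForwardingSketch, the method onTimeout and its parameters are hypothetical stand-ins for the component and the outbound router.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of the timeout path, not Mule code.
public class TimeoutForwardingSketch {

    /** Handles an expired group by aggregating what has arrived and passing the result on. */
    static void onTimeout(List<String> partialGroup,
                          Function<List<String>, String> component,
                          Consumer<String> outboundRouter) {
        // Before the fix the return value of the component was ignored here,
        // so nothing reached the rest of the flow after a timeout.
        String result = component.apply(partialGroup);

        // After the fix the result is handed to the outbound side explicitly.
        outboundRouter.accept(result);
    }
}
```

The point is only that the result of the invocation has to be passed on explicitly; otherwise the flow stops at the component.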

The class ‘MyEventGroup’
This is the last class I want to show. The source looks like this:

    package nl.redstream.routing.inbound;
     
    import org.mule.api.MuleEvent;
    import org.mule.routing.inbound.EventGroup;
     
    /**
     *
     * @author Redstream
     */
    public class MyEventGroup extends EventGroup {
     
        public static final String MY_SOURCE = "RS_SOURCE";
     
        public MyEventGroup(Object groupId, int expectedSize)
        {
            super(groupId, expectedSize);
        }
     
        /**
         * Replaces an existing event in the group if a new event with the same SOURCE is
         * added.
         *
         * @param event
         */
        @Override
        public void addEvent(MuleEvent event)
        {
            MuleEvent[] eventArray = super.toArray();
            for (int i = 0; i < eventArray.length; i++) {
                MuleEvent muleEvent = eventArray[i];
                if (event.getProperty(MY_SOURCE) != null && muleEvent.getProperty(MY_SOURCE) != null
                        && event.getProperty(MY_SOURCE).toString().equalsIgnoreCase(muleEvent.getProperty(MY_SOURCE).toString())) {
                    super.removeEvent(muleEvent);
                }
            }
            super.addEvent(event);
        }
    }

The key point here is that every event added to the group carries a property (RS_SOURCE) indicating which source it originated from. If the group already holds an event from that source, the existing event is removed and the new one is added. This way we can replace a message that was already posted by resending a newer version before the aggregation has taken place.
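The replace-by-source rule can also be tried out in isolation. The following is a minimal sketch in plain Java, assuming a much simpler message type than a MuleEvent; the classes Message and Group and the method payloads are hypothetical and only mirror the logic of addEvent above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for MuleEvent and EventGroup.
public class ReplaceBySourceSketch {

    static class Message {
        final String source;   // plays the role of the RS_SOURCE property
        final String payload;

        Message(String source, String payload) {
            this.source = source;
            this.payload = payload;
        }
    }

    static class Group {
        private final List<Message> messages = new ArrayList<>();

        /** Adds a message, first dropping any earlier message from the same source. */
        void add(Message incoming) {
            if (incoming.source != null) {
                messages.removeIf(m -> m.source != null
                        && m.source.equalsIgnoreCase(incoming.source));
            }
            messages.add(incoming);
        }

        /** Returns the payloads currently held, in insertion order. */
        List<String> payloads() {
            List<String> result = new ArrayList<>();
            for (Message m : messages) {
                result.add(m.payload);
            }
            return result;
        }
    }
}
```

Adding a second message from a source that is already present leaves the group the same size, holding only the newer payload for that source.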

This concludes the rather complex example of this business case. The chance that you have exactly the same business case is rather small, but I hope you can pick out the pieces that are relevant to you.

About Pascal Alma

Pascal started as an Oracle developer in 1997 and developed numerous applications with Oracle Designer/Developer and PL/SQL. Since 2001 he has become increasingly active in software development on the Java/J2EE platform. Nowadays Pascal is a senior JEE developer/architect with a lot of experience in several open source initiatives and frameworks, especially in the Enterprise Integration area. Besides these technical skills, Pascal is a big Scrum enthusiast.

2 Responses to Aggregating messages from different sources with Mule (part 2)

  1. Mohammed says:

    Hi Pascal,

    I am new to SoapUI, could you please forward me any documentation or some material so that i could get the idea of how to use soapui.

    I appreciate your help.

    Thanks
