JDBCMessageStore error with aggregator
  • JDBCMessageStore error with aggregator

    After configuring a JDBC message store with an aggregator, I am getting the error below:


    2012-07-12 15:44:30,703 ERROR [scheduler-1] [TaskUtils$LoggingErrorHandler] Unexpected error occurred in scheduled task.
    java.lang.NullPointerException
    at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:368)
    at org.springframework.integration.store.SimpleMessageGroup.addMessage(SimpleMessageGroup.java:96)
    at org.springframework.integration.store.SimpleMessageGroup.<init>(SimpleMessageGroup.java:59)
    at org.springframework.integration.jdbc.JdbcMessageStore.getMessageGroup(JdbcMessageStore.java:390)
    at org.springframework.integration.jdbc.JdbcMessageStore$9.next(JdbcMessageStore.java:504)
    at org.springframework.integration.jdbc.JdbcMessageStore$9.next(JdbcMessageStore.java:497)
    at org.springframework.integration.store.AbstractMessageGroupStore.expireMessageGroups(AbstractMessageGroupStore.java:78)
    at org.springframework.integration.store.MessageGroupStoreReaper.run(MessageGroupStoreReaper.java:110)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:64)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:619)


    Configuration:

    <int:aggregator id="catMessageAggregator" auto-startup="true"
    message-store="messageStore" input-channel="aggregate.message"
    output-channel="catalogue.eventMessange"
    send-partial-result-on-expiry="true" correlation-strategy="aggreagatorStartegy"
    correlation-strategy-method="correlationStartegy"
    release-strategy-expression="size()>9"
    expire-groups-upon-completion="false" />
    <jee:jndi-lookup id="messageStoreDB" jndi-name="${jndi.name.messageStore}" />

    <bean id="messageStore" class="org.springframework.integration.jdbc.JdbcMessageStore">
    <constructor-arg>
    <ref bean="messageStoreDB" />
    </constructor-arg>
    <property name="timeoutOnIdle" value="true"/>
    </bean>


    <bean id="reaper"
    class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore" />
    <property name="timeout" value="${message.timeouttime}" />
    <property name="autoStartup" value="false"/>
    </bean>

    <task:scheduled-tasks scheduler="scheduler">
    <task:scheduled ref="reaper" method="run"
    fixed-rate="150000" />
    </task:scheduled-tasks>

    <task:scheduler id="scheduler" />

    I have no idea why this is happening.
    Please help.

  • #2
    Please raise a JIRA issue.
    Also, what version of SI are you using?



    • #3
      Thanks Oleg,
      I have raised a JIRA issue.
      Issue Number: https://jira.springsource.org/browse/INT-2666



      • #4
        Can you please also upgrade to 2.1.3, as I remember something along the lines of a bug related to the reaper having a race condition?
        I am investigating, but it would help if you could do this quickly.



        • #5
          Hi Oleg,
          I have tried with 2.1.3 and I am getting the same error.

          Meanwhile, as a workaround, I tried the changes below and tested them.
          It is working as expected, but I am not sure whether it has side effects elsewhere. So far it has not broken anything.


          // Imports follow the Spring Integration 2.1.x package layout.
          import java.sql.ResultSet;
          import java.sql.SQLException;
          import java.util.ArrayList;
          import java.util.Date;
          import java.util.List;
          import java.util.UUID;
          import java.util.concurrent.atomic.AtomicInteger;
          import java.util.concurrent.atomic.AtomicReference;

          import javax.sql.DataSource;

          import org.springframework.integration.Message;
          import org.springframework.integration.jdbc.JdbcMessageStore;
          import org.springframework.integration.store.MessageGroup;
          import org.springframework.integration.store.SimpleMessageGroup;
          import org.springframework.integration.util.UUIDConverter;
          import org.springframework.jdbc.core.JdbcOperations;
          import org.springframework.jdbc.core.JdbcTemplate;
          import org.springframework.jdbc.core.RowCallbackHandler;
          import org.springframework.util.Assert;

          public class JDBCMessageStoreOverride extends JdbcMessageStore {

              private final JdbcOperations jdbcTemplate;

              private static final String LIST_MESSAGES_BY_GROUP_KEY = "SELECT MESSAGE_ID, CREATED_DATE, UPDATED_DATE, GROUP_KEY, MESSAGE_BYTES, MARKED, COMPLETE, LAST_RELEASED_SEQUENCE from INT_MESSAGE_GROUP where GROUP_KEY=? and REGION=? order by UPDATED_DATE";

              public JDBCMessageStoreOverride(DataSource dataSource) {
                  super();
                  jdbcTemplate = new JdbcTemplate(dataSource);
                  setJdbcTemplate(jdbcTemplate);
              }

              private String getKey(Object input) {
                  return input == null ? null : UUIDConverter.getUUID(input).toString();
              }

              @Override
              public MessageGroup getMessageGroup(Object groupId) {
                  String key = getKey(groupId);
                  final List<Message<?>> messages = new ArrayList<Message<?>>();
                  final AtomicReference<Date> date = new AtomicReference<Date>();
                  final AtomicReference<Date> updateDate = new AtomicReference<Date>();
                  final AtomicReference<Boolean> completeFlag = new AtomicReference<Boolean>();
                  final AtomicReference<Integer> lastReleasedSequenceRef = new AtomicReference<Integer>();
                  final AtomicInteger size = new AtomicInteger();

                  jdbcTemplate.query(getQuery(LIST_MESSAGES_BY_GROUP_KEY), new Object[] { key, "DEFAULT" },
                          new RowCallbackHandler() {
                              public void processRow(ResultSet rs) throws SQLException {
                                  // Look the message up once. It may have been removed since the
                                  // row was listed, in which case the row is skipped.
                                  Message<?> message = getMessage(UUID.fromString(rs.getString("MESSAGE_ID")));
                                  if (message != null) {
                                      size.incrementAndGet();
                                      messages.add(message);
                                      date.set(rs.getTimestamp("CREATED_DATE"));
                                      updateDate.set(rs.getTimestamp("UPDATED_DATE"));
                                      completeFlag.set(rs.getInt("COMPLETE") > 0);
                                      lastReleasedSequenceRef.set(rs.getInt("LAST_RELEASED_SEQUENCE"));
                                  }
                              }
                          });

                  // No message could be loaded: return an empty group.
                  if (size.get() == 0) {
                      return new SimpleMessageGroup(groupId);
                  }
                  Assert.state(date.get() != null, "Could not locate created date for groupId=" + groupId);
                  Assert.state(updateDate.get() != null, "Could not locate updated date for groupId=" + groupId);
                  long timestamp = date.get().getTime();
                  boolean complete = completeFlag.get().booleanValue();
                  SimpleMessageGroup messageGroup = new SimpleMessageGroup(messages, groupId, timestamp, complete);
                  if (updateDate.get() != null) {
                      messageGroup.setLastModified(updateDate.get().getTime());
                  }
                  int lastReleasedSequenceNumber = lastReleasedSequenceRef.get();
                  if (lastReleasedSequenceNumber > 0) {
                      messageGroup.setLastReleasedMessageSequenceNumber(lastReleasedSequenceNumber);
                  }

                  return messageGroup;
              }
          }
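
For readers who want the core of this workaround without the Spring classes, here is a minimal JDK-only sketch of the same idea: resolve each listed id once and skip any id that no longer resolves. The class `SkipMissingSketch`, the method `resolveExisting` and the `Map` standing in for the message table are hypothetical names for illustration, not part of Spring Integration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SkipMissingSketch {

    // Resolves ids against a store and keeps only the entries that still exist.
    // Each id is looked up exactly once, so there is no gap between the
    // null check and the use of the value.
    static <K, V> List<V> resolveExisting(List<K> ids, Map<K, V> store) {
        List<V> found = new ArrayList<V>();
        for (K id : ids) {
            V value = store.get(id);
            if (value != null) {
                found.add(value);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Hypothetical store: id "b" was listed earlier but has since been removed.
        Map<String, String> store = new HashMap<String, String>();
        store.put("a", "first");
        List<String> listedIds = Arrays.asList("a", "b");
        System.out.println(resolveExisting(listedIds, store));
    }
}
```

Looking a value up twice (once to test for null and once to add it) leaves a small window in which the second lookup can still return null, so a single lookup per id is the safer shape for this kind of filter.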



          • #6
            It's already fixed and will be merged shortly, so you can try a snapshot. You'll be notified when the JIRA is resolved, and then you can switch to 2.1.0.BUILD-SNAPSHOT to test it out. We can release 2.1.4 shortly as well.

            The problem is a race condition: while the MessageGroup is being built, we end up holding message ids for messages that have just been removed by the reaper or another component (I'm not sure which until I see the full config, but I don't really need to). As a result, the SimpleMessageGroup is built from a list of messages that contains null, hence the NPE.
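
The sequence described above can be reproduced with plain JDK classes. The following is only a sketch under stated assumptions: `ReaperRaceSketch`, `STORE`, `resolve` and `buildGroup` are hypothetical stand-ins, with a `Map` playing the role of the message table and a `LinkedBlockingQueue` playing the role of the group's internal queue (the queue type that appears at the top of the stack trace). It is not the actual JdbcMessageStore code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

final class ReaperRaceSketch {

    // Hypothetical stand-in for the message table: id -> payload.
    static final Map<String, String> STORE = new HashMap<String, String>();

    // Resolves previously listed ids. An id whose entry was deleted
    // in the meantime resolves to null.
    static List<String> resolve(List<String> ids) {
        List<String> messages = new ArrayList<String>();
        for (String id : ids) {
            messages.add(STORE.get(id));
        }
        return messages;
    }

    // Mimics a group backed by a LinkedBlockingQueue, which rejects null elements.
    static LinkedBlockingQueue<String> buildGroup(List<String> messages) {
        LinkedBlockingQueue<String> group = new LinkedBlockingQueue<String>();
        for (String message : messages) {
            group.offer(message); // throws NullPointerException when message is null
        }
        return group;
    }

    // Returns true if building the group fails with a NullPointerException.
    static boolean loadFailsAfterConcurrentDelete() {
        STORE.clear();
        STORE.put("a", "first");
        STORE.put("b", "second");
        List<String> ids = new ArrayList<String>(STORE.keySet()); // ids are listed first
        STORE.remove("b"); // another component removes a message in between
        try {
            buildGroup(resolve(ids));
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("NPE reproduced: " + loadFailsAfterConcurrentDelete());
    }
}
```

The deletion is done on the same thread here only to make the interleaving deterministic. In the real application it would happen on another thread or in another transaction, which is why the failure shows up intermittently.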
