Announcement Announcement Module
Collapse
No announcement yet.
Dynamic channel creation and JMX metrics Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • Dynamic channel creation and JMX metrics

    Hi,

    I'm facing a difficulty with SI:
    I have to create (jms) channels at runtime based on a property in the payload. I've got this working already.
    I use a static array of destinations to make things somewhat easier. That destination i'm going to extract from the payload in the 'real' implementation.

    Code:
    private String[] monitorQueues = {"monitor.a", "monitor.b","monitor.c","monitor.d","monitor.e","monitor.f","monitor.g"};
    	
    public MessageChannel routeMessage(Message<?> message) {
    		
    	logger.info("received message {}", message);
    		
    	int queueIndex = new Random().nextInt(monitorQueues.length-1);
    	String destination = monitorQueues[queueIndex];
    		
    	return createChannelForDestination(destination);
    			
    }
    	
    private PollableJmsChannel createChannelForDestination(String destination) {
    		
    	JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
    		
    	jmsTemplate.setDefaultDestinationName(destination);
    		
    	PollableJmsChannel jmsChannel = new PollableJmsChannel(jmsTemplate);
    	jmsChannel.setBeanName(destination);
    		
    	ConfigurableApplicationContext ctx = new GenericApplicationContext(applicationContext);
    
    	ConfigurableListableBeanFactory beanFactory = ctx.getBeanFactory();
    		
    	beanFactory.registerSingleton(destination, jmsChannel);
    		
    	ctx.refresh();
    		
    	return jmsChannel;
    }
    The next challenge I have, is to gather metrics of the channels I have created dynamically. The metric I'm most interested in is the 'MeanSendRate'.
    I've tried to register the channels with JMX at runtime, but that failed.

    When I add
    Code:
    integrationMBeanExporter.registerManagedResource(jmsChannel);
    between ctx.refresh() and return jmsChannel; the following exception is thrown:
    Code:
    Caused by: java.lang.NullPointerException: name cannot be null
    	at javax.management.ObjectName.construct(ObjectName.java:405)
    	at javax.management.ObjectName.<init>(ObjectName.java:1403)
    	at javax.management.ObjectName.getInstance(ObjectName.java:1285)
    	at org.springframework.jmx.support.ObjectNameManager.getInstance(ObjectNameManager.java:62)
    	at org.springframework.jmx.export.naming.MetadataNamingStrategy.getObjectName(MetadataNamingStrategy.java:118)
    	at org.springframework.jmx.export.MBeanExporter.getObjectName(MBeanExporter.java:728)
    	at org.springframework.jmx.export.MBeanExporter.registerManagedResource(MBeanExporter.java:443)
    	... 70 more
    If I change that to
    Code:
    integrationMBeanExporter.registerManagedResource(jmsChannel, new ObjectName("blah:type=MessageChannel,name=" + destination));
    The following exception is thrown:
    Code:
    org.springframework.jmx.export.MBeanExportException: Could not create ModelMBean for managed resource [monitor.f] with key 'org.springframework.integration.jms.PollableJmsChannel'; nested exception is org.springframework.jmx.export.metadata.InvalidMetadataException: No ManagedResource attribute found for class: class org.springframework.integration.jms.PollableJmsChannel
    Does anybody has an suggestion how I can get an hold on those metrics?
    It is not required to achieve this via JMX, but that was my first flash of inspiration.

    Regards,
    Auke

  • #2
    You need to take a look at the IntegrationMBeanExporter - as you can see the channel stats are provided by wrapping the channels in a ...Metrics AOP interceptor.

    Comment


    • #3
      Dear Gary,

      Thanks for your swift reply.

      I've been playing around a bit, and I've managed to register a dynamically created channel into the JMX registry:
      Code:
      PollableChannelMetrics managedResource = new PollableChannelMetrics(jmsChannel, destination);
      			
      integrationMBeanExporter.registerManagedResource(managedResource, new ObjectName("blah:type=MessageChannel,name=channel_" + destination));
      But...
      I bypassed the AOP advisory because of some exceptions about dynamic proxies:
      Code:
      org.springframework.jmx.export.MBeanExportException: Could not create ModelMBean for managed resource [monitor.a] with key 'com.sun.proxy.$Proxy20'; nested exception is java.lang.IllegalArgumentException: MetadataMBeanInfoAssembler does not support JDK dynamic proxies - export the target beans directly or use CGLIB proxies instead
      I was able to resolve this error by setting a MethodNameBasedMBeanInfoAssember instance on the mbeanExporter.
      I've found this 'solution' here

      Code:
      integrationMBeanExporter.setAssembler(new MethodNameBasedMBeanInfoAssembler());
      If I use this code, the MBean gets registered properly and is visual in VisualVM, but the mbean has no operations/attributes exposed.
      This warning is also present in the logging:
      Code:
      Bean with key 'com.sun.proxy.$Proxy20' has been registered as an MBean but has no exposed attributes or operations
      Do you, or anybody else, have an idea to resolve this? eg. get the attributes exposed in the JMX registry.

      Regards,
      Auke
      Last edited by Fishbeast; Apr 23rd, 2013, 02:37 AM. Reason: styling/typo

      Comment


      • #4
        Unfortunately, it's a lot more complex than that -

        First, it has to be an AOP proxy otherwise no statistics will be collected (even if they were exposed) - you have to wrap your channel in the proxy, then when the send() etc methods are called, they are intercepted and that's how the timers/counters are maintained.

        Second, the Metrics classes rely on the Spring MBeanExporter logic (via annotations) to dynamically set up the MBean methods (see methods annotated with @ManagedMetric in the ...Metrics classes.

        If you don't use Spring to do the proxying/exporting for you, you'll have to build the model bean yourself.

        One quick way to do what you want might be to define a "mini" application context file (passing the destination in as a property); with the template, the channel and an IntegrationMbeanExporter. Instantiate the context; get a reference to the channel using getBean() (you will get the proxy and the MBean will have been registered).

        See the dynamic-ftp sample for a technique similar to this (although it's DynamicFtpChannelResolver doesn't go as far as registering the new channel with the main context.

        Comment


        • #5
          Hey Gary,

          I've seen the DynamicFtpChannelResolver example from the GitHub repo, and i've used some parts of that trick to come to the solution i have until now: Programmatically creating a applicationContext and add beans to it.
          I'm curious about how to change channel properties, like name etc.
          Is this possible until the ctx.refresh() is called?

          Thanks in advance,
          Auke

          Comment


          • #6
            You can use a BeanFactoryPostProcessor.

            See this Gist for an example of how to dynamically create JMS channels that are exposed as MBeans.

            Comment


            • #7
              Thank you for the Gist.
              I'm going to try that tomorrow.
              It gives me a nice insight about how stuff works I didn't knew.

              Regards,
              Auke

              Comment


              • #8
                It works like charm!

                Gary,

                I've implemented the concept of the gist you created yesterday and it works like expected!

                For the community sake, I want to share the solution:

                Router implementation:
                Code:
                public MessageChannel routeMessage(Message<?> message) {
                		
                	logger.info("received message {}", message);
                		
                	int queueIndex = new Random().nextInt(monitorQueues.length-1);
                	String destination = monitorQueues[queueIndex];
                		
                	if (channels != null) {
                		MessageChannel messageChannel = dynamicChannels.get(destination);
                		
                		if (messageChannel != null) {
                			return messageChannel;
                		} else {
                			PollableChannel channel = createChannelForDestination(destination);
                			dynamicChannels.put(destination, channel);				return channel;
                		}
                	}
                		
                	return null;
                }
                	
                private PollableChannel createChannelForDestination(String destination) {
                		
                	JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
                		
                	jmsTemplate.setDefaultDestinationName(destination);
                		
                	GenericApplicationContext genericApplicationContext = new GenericApplicationContext(applicationContext);
                		
                	genericApplicationContext.addBeanFactoryPostProcessor(new DynamicJmsJmxChannelFactory(destination, jmsTemplate));
                		
                	genericApplicationContext.refresh();
                		
                	PollableChannel jmsChannel = genericApplicationContext.getBean(PollableChannel.class);
                		
                	return jmsChannel;
                }
                And if I put 100 messages into ActiveMQ the result is in VisualVM is:
                Attachment
                The resulting queues in ActiveMQ:
                Attachment

                Gary, thanks for your guidence of the past few days.

                Auke
                Attached Files

                Comment

                Working...
                X