Announcement Announcement Module
Collapse
No announcement yet.
non-stop batch engine Page Title Module
Move Remove Collapse
X
Conversation Detail Module
Collapse
  • Filter
  • Time
  • Show
Clear All
new posts

  • non-stop batch engine

    Hi,

    I have a problem that a lot of ppl write about but in the other direction it seems.

    I have a ItemReader that reads items from a RabbitTemplate to get messages from RabbitMQ. Each message should be processed in parallel (but it should only take as many as it can process in one go).

    The problem is, that the messages come in bursts, do sometimes the engine can end up going for a long time, but sometimes it stops because there are no messages (ItemReader returns null). I do NOT want it to stop.

    Question 1:

    Is is possible to make the batch engine just wait until there is something to read, instead of exiting execution when ItemReader returns null?

    Question 2:

    In case I would go with a Spring Integration approach, and handle message polling from SI, is there a way to invoke/start the spring batch process from beans, and not re-queue or loose the initial message?

    Many thanks for any advice,
    pejot

  • #2
    What ItemReader are you using? I believe (I'd have to double check the code though) that the JmsItemReader provided in Spring Batch actually never returns null as long as the type it reads is the type expected. Would that meet your requirements or is there something else that is missing?

    Comment


    • #3
      Im using the standard reader as there is no AMQP reader. So I get the template injected. Problem is that when it returns null, the processor gets that and pretty much fails expecting a message.

      Comment


      • #4
        Actually....As of the 2.2.0.M1 release, there is a new AmqpItemReader: http://static.springsource.org/sprin...temReader.html

        If you can't upgrade to 2.2.0.M1, you can at least take a look at how that ItemReader works as a start.

        Comment


        • #5
          Thanks very much!

          All I need now is to make sure I get the messages in serial (will be still quite fast) and process them in parallel. Is is possible to have 1 reader for the whole program, and make spring batch distribute the work among several processors via queue ?

          Comment


          • #6
            There are a couple approaches for the type of processing you are describing, both utilize the spring-batch-integration project within Spring Batch Admin:
            1. AsyncItemProcessor/AsyncItemWriter - This approach utilizes a special ItemProcessor implementation that executes your processor on another thread via a task executor. Once the Future is created, it's passed to the ItemWriter for unpacking and writing.
            2. Remote Chunking - Here the processor is replaced with a special ItemProcessor that places the Items on a channel for consumption by slave processors. Once the processing is complete, the items are returned via another channel and written by the master job.

            That being said, before going down either of these routes, I would ask what the bottle neck is. Is it actually the execution of the ItemProcessor? If not, neither of these will be much help. Another approach would be to use a multithreaded step. If there is no ordering within the messages you are pulling off of the queue, that may get what you want done in a much more simple way.

            Comment


            • #7
              Thanks very much for your reply mminella

              Problem specific is a file conversion service:
              1) message for file gets inserted into RabbitMQ
              2) reads the message (presumably into a short queue)
              3) downloads file via url in message for local processing (think this should be a item reader or an entire step?)
              4) starts processing, as the nature of the problem is CPU bound, there is advantage in having the processing steps multit hreaded
              5) the last step is somewhat optional, as it needs to send the files via back (via provided url) and from a speed perspective does not matter if its sending files serially or not, this would still be a marginal part of the entire process

              A few things besides:
              a) the queues should not be too long, so the batch process doesn't take on much more than it can handle in a single moment, I'd rather see it polling RabbitMQ more often (which is pretty cheap) than having monster queues, I'm presuming that's throttled via commit-interval?
              b) the process needs to be continuous, so when i start it, it shouldn't just end because of a problem in a single file. Also figured out this can be mostly handled by good exception handling callbacks.
              c) system will run on multiple nodes, so multiple processes, to achieve speed + redundancy. What I could not figure out was if spring batch admin will play along when I use a global data storage (presumably something sharded like MongoDB) and use it from every batch process?

              Comment


              • #8
                A few things:
                1. We typically recommend not doing things like a file download from within your job itself. Spring Integration has some great facilities for doing this type of thing much more elegently and it doesn't impact the job itself (since file downloads can be a regular source of failures).
                2. You are correct in that keeping the job running when errors occur will be up to your job's error handling.
                3. Spring Batch Admin is intended to point to a single JobRepository. If your jobs are all pointing at the same one,then it would work. There are no facilities within Admin to handle things like MongoDB. What specific use case are you looking to accomplish here?

                Comment


                • #9
                  Ad 1
                  I also have a running SI of the entire thing running already, but seem to struggle a bit on making it work in parallel and I miss spring batch security and repeating features in it.

                  What I know is how to get messages nicely from RabbitMQ (might be helpful to others, that's why I'm putting it in)

                  Code:
                  <bean id="rabbitConnFactory"
                  		class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
                  		<constructor-arg value="${ei.messaging.amqp.servername}" />
                  		<property name="username" value="${ei.messaging.amqp.username}" />
                  		<property name="password" value="${ei.messaging.amqp.password}" />
                  </bean>
                  
                  <!-- not sure if this is needed, didn't seam to hurt -->
                  <rabbit:connection-factory id="connectionFactory" />
                  
                  <int-amqp:inbound-channel-adapter
                  		queue-names="doc" channel="input" connection-factory="rabbitConnFactory" error-channel="errorChannel"
                  		 />
                  
                  	<int:channel id="input" />

                  What I had difficulty finding out is (and believe me I searched, but maybe in wrong places):

                  a) The files are different and need different steps each, when I get a message with spring integration, how do I pass it though to spring batch, do I invoke a separate job and run it for every message? How to fire it up programmatically so it's efficient (and doesn't for instance load the entire context for every file, hence acts as a worker on stand-by). I presume there is some kind or JobLauncher involved?

                  b) once I pass the message, what does the message consist of? If it's just a file I'm loosing quite a lot of metadata then (compared for instance to the message headers in SI).

                  c) how do I get the message back from spring batch into the SI context when its all done and ready for sending out?

                  Ad 2 )

                  It's just to gain redundancy. Part of the system is already built with MongoDB with a 3 node Replica set and seams to be working brilliantly (probably just until it fails as with everything). So I'd like to have the same kind of assurance for my batch repository. Also presuming it's a question of writing new batch DAOs?

                  Comment


                  • #10
                    a) 1. Without knowing too much about the number of files, etc, launching a job per file may be ok. The alternative would be to batch the messages together via aggregation before passing them to your job. In either case, I would use the message based job launching stuff available in spring batch integration.
                    a) 2. There is a JobLaunchingMessageHandler within spring batch integration that takes a JobLaunchRequest message. I would expect you to use this to launch the jobs. Basically creating a message transformer to transform your file requests into JobLaunchRequest messages.
                    b) The message you will pass will be a JobLaunchRequest which consists of a Job and a JobParameters object.
                    c) The JobLaunchingMessageHandler will return the JobExecution via the configured channel once the execution is complete (assuming synchronous execution of the job).

                    Below is an example of launching a job via a message in action (it doesn't handle the message transformation mentioned above). The JobRepository and JobLauncher have been removed for brevity but they are standard configurations.

                    Code:
                    	
                           <int:annotation-config/>
                    	<int:channel id="job-requests"/>
                    	<int:channel id="job-responses">
                    		<int:queue/>
                    	</int:channel>
                    
                    	<int:service-activator input-channel="job-requests">
                    		<bean class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
                    			<constructor-arg ref="jobLauncher"/>
                    		</bean>
                    	</int:service-activator>
                    
                    	<bean id="taskletAdapter" class="org.springframework.batch.core.step.tasklet.MethodInvokingTaskletAdapter">
                    		<property name="targetObject">
                    			<bean class="java.lang.String">
                    				<constructor-arg value="Invoked via message"/>
                    			</bean>
                    		</property>
                    		<property name="targetMethod" value="length"/>
                    	</bean>
                    
                    	<batch:job id="simpleJob">
                    		<batch:step id="simpleJob.step1">
                    			<batch:tasklet ref="taskletAdapter"/>
                    		</batch:step>
                    	</batch:job>

                    Code:
                    import static org.junit.Assert.assertEquals;
                    
                    import org.junit.Before;
                    import org.junit.Test;
                    import org.junit.runner.RunWith;
                    import org.springframework.batch.core.BatchStatus;
                    import org.springframework.batch.core.Job;
                    import org.springframework.batch.core.JobExecution;
                    import org.springframework.batch.core.JobParameters;
                    import org.springframework.batch.integration.launch.JobLaunchRequest;
                    import org.springframework.beans.factory.annotation.Autowired;
                    import org.springframework.beans.factory.annotation.Qualifier;
                    import org.springframework.integration.Message;
                    import org.springframework.integration.MessageChannel;
                    import org.springframework.integration.MessageHeaders;
                    import org.springframework.integration.core.PollableChannel;
                    import org.springframework.integration.support.MessageBuilder;
                    import org.springframework.test.context.ContextConfiguration;
                    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
                    
                    @RunWith(SpringJUnit4ClassRunner.class)
                    @ContextConfiguration
                    public class MessageTriggerTests {
                    
                    	@Autowired
                    	private Job job;
                    
                    	@Autowired
                    	@Qualifier("job-requests")
                    	private MessageChannel jobRequestChannel;
                    
                    	@Autowired
                    	@Qualifier("job-responses")
                    	private PollableChannel jobResponseChannel;
                    
                    	@Before
                    	public void setUp() throws Exception {
                    	}
                    
                    	@SuppressWarnings("unchecked")
                    	@Test
                    	public void test() {
                    		JobLaunchRequest request = new JobLaunchRequest(job, new JobParameters());
                    		jobRequestChannel.send(MessageBuilder.withPayload(request).setHeader(MessageHeaders.REPLY_CHANNEL, jobResponseChannel).build());
                    		Message<JobExecution> results = (Message<JobExecution>) jobResponseChannel.receive(10l);
                    		assertEquals(BatchStatus.COMPLETED, results.getPayload().getStatus());
                    	}
                    }
                    With regards to the MongoDB aspect, are you implying that you want to use MongoDB for a JobRepository implementation? If so, that may be a bit problematic. MongoDB is not transactional so using it for a repository, while possible, would not be a sound option IMHO. You can take a look at the MongoItemWriter I committed today in the git repository to see what we are doing to emulate transactionality, but all it really does is push off until the last possible point the writing of the data to that repository (similar to how we handle transactions with files).

                    Comment


                    • #11
                      Fist of all thanks for your exhaustive answer

                      Ad b) Will definitely try that!

                      Ad c) I understand, will try to get the JobParameters included in the input message form JobExecution.

                      Ad a)
                      The files come in in irregular intervals, sometime bursts. The most important aspect for each file is the time it takes from submit to converting and sending back so aggregating (so waiting
                      for a queue to fill up) could cause the clients to wait needlessly in the queue. I would imagine the best way to do it, would be for the batch engine to have a queue, and try to utilize the resources concurrently and return the outcome into a blocking queue/channel to SI for sending the files back.

                      Here is an interesting thought: How to achieve a flow that does not need SI to utilize its own queue, so that it only polls the MQ when spring batch has a processing slot available? I see in your example, that the input channel is a direct channel, I presume it will block until the job is done?

                      Comment

                      Working...
                      X