Parallel execution can be accomplished in a planning application in a variety of ways. One of the most effective is to build a rule set with a parallel or serial tag: the system handles parallel execution and error handling automatically. Since no coding is required, the rule set approach would normally be my first choice.
However, rule sets are static in nature: you cannot add or remove a rule dynamically during execution. This is where I rely on REST API execution instead, because it puts more of the control on our end.
Here is how the parallelism is achieved:
- Execute all the rules without waiting for completion.
- Collect the HTTP responses for the executed rules.
- Loop through each HTTP response and await its completion.
Let’s Build The Script
Define The Essentials
Get the predefined connection details, the latest REST version, and the application name from the system, and construct the target URL.
Connection connection = operation.application.getConnection(connectionName)
String version = getVersion(connection)
String applicationName = getApplication(connection, version)
String targetUrl = "$version/applications/$applicationName/jobs"
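The getVersion and getApplication helpers are not spelled out in this post; the sketch below shows one way they could look, assuming the connection's base URL points at the EPM REST root and using the standard endpoints that list REST versions and applications. The field names (items, lifecycle, version, name) are my assumption based on the documented response shapes, so verify them in your environment.
//hypothetical helper - returns the active REST API version (e.g. "v3")
String getVersion(Connection connection) {
    HttpResponse<String> response = connection.get("/").asString()
    Map body = new JsonSlurper().parseText(response.body) as Map
    return (body.items as List<Map>).find { it.lifecycle == 'active' }?.version as String
}
//hypothetical helper - returns the name of the (usually single) application
String getApplication(Connection connection, String version) {
    HttpResponse<String> response = connection.get("$version/applications").asString()
    Map body = new JsonSlurper().parseText(response.body) as Map
    return (body.items as List<Map>)[0]?.name as String
}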
Execute Jobs And Fetch Responses
This is where all the jobs are executed concurrently; we do not wait for the jobs to complete.
//execute all jobs and get the responses - doesn't wait
List<HttpResponse<String>> jsonResponses = []
jobs.each { payLoad ->
    jsonResponses << connection.post(targetUrl).header("Content-Type", 'application/json').body(payLoad).asString()
}
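It is worth noting that asString() does wait for the HTTP response itself; what it does not wait for is the job, which keeps running on the server. The POST typically returns right away with a body along these lines (a representative example, not captured from a real run; exact fields vary by release):
{
  "jobId": 101,
  "jobName": "Groovy - Actual Currency Conversion",
  "status": -1,
  "descriptiveStatus": "Processing"
}
This is why the next step can pull the job ID and the initial status of -1 out of each response.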
Wait For Job Completion
From the HTTP responses captured in the previous step, we obtain each job ID and then wait for each job to complete. We work through the jobs in sequence, not moving on to the next one until the previous one is recorded as completed, which also lets us track each job's status along the way.
//wait for each job and fetch its status - blocks until completion
Map<String, Integer> jobStatuses = [:]
jsonResponses.eachWithIndex { jsonResponse, idx ->
    jobStatuses[jobs[idx]] = getStatusOfJob(connection, targetUrl, jsonResponse)
}
The custom method getStatusOfJob monitors each job's progress and waits for the status to change from IN PROGRESS to COMPLETED. It is nothing new; anyone who has used the REST API before will be familiar with the script below.
import groovy.json.JsonSlurper //place at the top of the rule

//check status of the current job, returns the status code once the job is complete
Integer getStatusOfJob(Connection connection, String url, HttpResponse<String> jsonResponse) {
    //check if the initial call was successful.. else stop the process
    if (!(200..299).contains(jsonResponse.status)) {
        println(jsonResponse.body)
        throwVetoException("REST trigger failed: $jsonResponse.statusText")
    }
    //get the job Id and job status from the body of the REST call
    def restResponseBody = new JsonSlurper().parseText(jsonResponse.body) as Map
    String jobId = restResponseBody?.jobId as String
    Integer jobStatus = restResponseBody?.status as Integer //Integer, not int, so a missing status stays null and fails the check below
    //poll at a growing interval while the status is -1 (Running)
    if (jobId != null && jobStatus != null) {
        for (long delay = 50; jobStatus == -1; delay = Math.min(1000, delay * 2)) {
            sleep(delay)
            HttpResponse<String> jsonRunning = connection.get("${url}/${jobId}").header("Content-Type", 'application/json').asString()
            def restRunningBody = new JsonSlurper().parseText(jsonRunning.body) as Map
            jobStatus = restRunningBody.status as Integer
        }
    } else {
        throwVetoException("Unable to get Job Id and Status")
    }
    return jobStatus
}
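A small design note: the polling delay doubles from 50 ms up to a cap of one second, so quick jobs are detected almost immediately while long-running jobs do not flood the server with status calls.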
Track The Status Of Each Job
The previous steps give us the status code for each job. It is crucial to design the main script to fail if any one of the rules fails; any status other than 0 is considered a failure.
if (jobStatuses.any { it.value != 0 }) {
    println "Error occurred while executing the below parallel job(s)..\nJobs :\n${jobStatuses.findAll { it.value != 0 }.collect { it.key }.join('\n')}"
    throwVetoException("One or more jobs failed. Please check job console for more details")
} else {
    println "All parallel jobs completed successfully..\nJobs :\n${jobs.join('\n')}"
}
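For more readable logs, the numeric codes can be translated into labels. The mapping below reflects the commonly documented EPM job status codes; treat it as an assumption and verify it against your release.
//hypothetical lookup - commonly documented EPM job status codes
Map<Integer, String> jobStatusLabels = [
    (-1): 'In Progress', 0: 'Completed', 1: 'Error',
    2: 'Cancel Pending', 3: 'Cancelled', 4: 'Invalid Parameter'
]
jobStatuses.each { jobName, status ->
    println "$jobName -> ${jobStatusLabels[status] ?: 'Unknown'} ($status)"
}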
I would wrap the script in a method so it can be called any number of times; a minimal sketch of such a wrapper is below.
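This simply assembles the snippets above into one method; connectionName is assumed to be defined elsewhere in the rule, and getVersion, getApplication, and getStatusOfJob are the helpers discussed in this post.
//sketch of a reusable wrapper around the steps above
void executeParallelJobs(List<String> jobs) {
    Connection connection = operation.application.getConnection(connectionName)
    String version = getVersion(connection)
    String applicationName = getApplication(connection, version)
    String targetUrl = "$version/applications/$applicationName/jobs"
    //fire every job without waiting for completion
    List<HttpResponse<String>> jsonResponses = []
    jobs.each { payLoad ->
        jsonResponses << connection.post(targetUrl).header("Content-Type", 'application/json').body(payLoad).asString()
    }
    //await each job and record its final status
    Map<String, Integer> jobStatuses = [:]
    jsonResponses.eachWithIndex { jsonResponse, idx ->
        jobStatuses[jobs[idx]] = getStatusOfJob(connection, targetUrl, jsonResponse)
    }
    //fail the main rule if any job did not finish with status 0
    if (jobStatuses.any { it.value != 0 }) {
        throwVetoException("One or more jobs failed. Please check job console for more details")
    }
    println "All parallel jobs completed successfully..\nJobs :\n${jobs.join('\n')}"
}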
Script Validation
To validate the script, I executed three Groovy business rules in parallel, each with a different execution time. The main rule from which we trigger these three rules waits, and moves to completed status only after all the sub-rules are complete.
List<String> parallelJobs = [
    '{ "jobType":"Rules", "jobName":"Groovy - Actual Currency Conversion", "parameters": {} }',
    '{ "jobType":"Rules", "jobName":"Groovy - Expense Adjustment Calculation", "parameters": {} }',
    '{ "jobType":"Rules", "jobName":"Groovy - Copy Data from Actual to Forecast", "parameters": {} }'
]
executeParallelJobs(parallelJobs)
CASE 1: All Rules Are Successfully Executed
Job Console:
All jobs are in progress:
A few jobs have finished, and the main rule still shows in progress:
All jobs are completed:
Main Rule Status:
CASE 2: One Or More Rules Failed
I am purposely making the second rule fail.
Job Console:
All jobs are in progress:
One of the rules has failed:
The main rule still continues tracking the status of all the rules.
All jobs are completed:
Since one of the sub-rules failed, the main rule status changed to 'Error' at the end.
Main Rule Status:
If you want flexibility and control, I would recommend the REST API route. Otherwise, if you want to accomplish parallelism in a few easy steps, build a rule set and put it to use. I hope you find this useful.