Done ticket GEPAFINBE-228
This commit is contained in:
@@ -18,10 +18,7 @@ import net.gepafin.tendermanagement.entities.ApplicationEvaluationEntity;
|
||||
import net.gepafin.tendermanagement.entities.CompanyEntity;
|
||||
import net.gepafin.tendermanagement.entities.DocumentEntity;
|
||||
import net.gepafin.tendermanagement.entities.HubEntity;
|
||||
import net.gepafin.tendermanagement.enums.ApplicationStatusTypeEnum;
|
||||
import net.gepafin.tendermanagement.enums.DocumentSourceTypeEnum;
|
||||
import net.gepafin.tendermanagement.enums.NotificationTypeEnum;
|
||||
import net.gepafin.tendermanagement.enums.VersionActionTypeEnum;
|
||||
import net.gepafin.tendermanagement.enums.*;
|
||||
import net.gepafin.tendermanagement.model.request.AppointmentCreationRequest;
|
||||
import net.gepafin.tendermanagement.model.request.AppointmentNdgRequest;
|
||||
import net.gepafin.tendermanagement.model.request.AppointmentVisuraListRequest;
|
||||
@@ -66,7 +63,10 @@ import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@@ -150,8 +150,9 @@ public class AppointmentDao {
|
||||
@Autowired
|
||||
private ApplicationEvaluationDao applicationEvaluationDao;
|
||||
|
||||
private final Map<Long, ExecutorService> executorMap = new ConcurrentHashMap<>();
|
||||
private final Map<Long, ScheduledExecutorService> executorMap = new ConcurrentHashMap<>();
|
||||
|
||||
|
||||
private final ConcurrentHashMap<Long, ExecutorService> threadForDocumentMap = new ConcurrentHashMap<>();
|
||||
|
||||
private static final ThreadLocal<Long> threadLocalHubId = new ThreadLocal<>();
|
||||
@@ -432,35 +433,89 @@ public class AppointmentDao {
|
||||
}
|
||||
|
||||
private void startAsyncNdgProcessing(Long applicationId) {
|
||||
// Check if a thread is already running for this application
|
||||
// If already polling for this applicationId, do nothing:
|
||||
if (executorMap.containsKey(applicationId)) {
|
||||
log.warn("Async processing already running for applicationId: {}", applicationId);
|
||||
return;
|
||||
}
|
||||
|
||||
// Create a dedicated thread for asynchronous processing
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
|
||||
Thread thread = new Thread(runnable);
|
||||
thread.setName("AsyncNdgProcessing-" + applicationId);
|
||||
return thread;
|
||||
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
|
||||
Thread t = new Thread(runnable);
|
||||
t.setName("AsyncNdgProcessing-" + applicationId);
|
||||
return t;
|
||||
});
|
||||
executorMap.put(applicationId, executor);
|
||||
executorMap.put(applicationId, scheduler);
|
||||
|
||||
executor.submit(() -> {
|
||||
// Record the start time so we can stop after 2 hours:
|
||||
long startTime = System.currentTimeMillis();
|
||||
long twoHoursMs = TimeUnit.HOURS.toMillis(2);
|
||||
long fifteenMin = 15; // in MINUTES
|
||||
|
||||
// We need a reference to cancel the scheduled task from inside itself when we're done:
|
||||
AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<>();
|
||||
|
||||
Runnable pollingTask = () -> {
|
||||
try {
|
||||
log.info("Starting async processing for applicationId: {}", applicationId);
|
||||
processNdgGeneration(applicationId);
|
||||
} catch (Exception e) {
|
||||
log.error("Error in async NDG processing for applicationId: {}", applicationId, e);
|
||||
} finally {
|
||||
// Cleanup resources
|
||||
ExecutorService executorToShutdown = executorMap.remove(applicationId);
|
||||
if (executorToShutdown != null) {
|
||||
executorToShutdown.shutdown();
|
||||
// 1) If 2 hours have already passed, mark as FAILED and shut down:
|
||||
if (System.currentTimeMillis() - startTime > twoHoursMs) {
|
||||
ApplicationEntity app = applicationService.validateApplication(applicationId);
|
||||
log.warn("2-hour timeout reached for applicationId {}. Marking NDG_FAILED.", applicationId);
|
||||
app.setNdgStatus(GepafinConstant.NDG_FAILED);
|
||||
applicationRepository.save(app);
|
||||
|
||||
futureRef.get().cancel(false);
|
||||
shutdownScheduler(applicationId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2) Otherwise, call processNdgGeneration once:
|
||||
processNdgGeneration(applicationId);
|
||||
|
||||
// 3) After return, check if NDG is now set or timed out. If so, cancel & shut down:
|
||||
ApplicationEntity updated = applicationService.validateApplication(applicationId);
|
||||
if (isNdgValid(updated.getNdg())) {
|
||||
log.info("NDG found for applicationId {}. Shutting down scheduler.", applicationId);
|
||||
futureRef.get().cancel(false);
|
||||
shutdownScheduler(applicationId);
|
||||
} else if (updated.getNdgStatus() != null && updated.getNdgStatus().equals(GepafinConstant.NDG_FAILED)) {
|
||||
log.info("NDG status is NDG_FAILED for applicationId {}. Shutting down scheduler.", applicationId);
|
||||
futureRef.get().cancel(false);
|
||||
shutdownScheduler(applicationId);
|
||||
}
|
||||
// Otherwise: no NDG yet, not timed out → next run happens in 15 minutes automatically.
|
||||
} catch (Exception ex) {
|
||||
log.error("Unexpected error during scheduled polling for applicationId {}: {}", applicationId, ex.getMessage(), ex);
|
||||
try {
|
||||
ApplicationEntity checkApp = applicationService.validateApplication(applicationId);
|
||||
if (System.currentTimeMillis() - startTime > twoHoursMs) {
|
||||
log.warn("After exception, 2-hour window passed for applicationId {}. Marking NDG_FAILED.", applicationId);
|
||||
checkApp.setNdgStatus(GepafinConstant.NDG_FAILED);
|
||||
applicationRepository.save(checkApp);
|
||||
|
||||
futureRef.get().cancel(false);
|
||||
shutdownScheduler(applicationId);
|
||||
}
|
||||
} catch (Exception ignore) {
|
||||
futureRef.get().cancel(false);
|
||||
shutdownScheduler(applicationId);
|
||||
}
|
||||
log.info("Async processing completed for applicationId: {}", applicationId);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
// Schedule pollingTask: run now (delay=0), then every fifteen minutes:
|
||||
ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(pollingTask, 0, // initial delay = 0 min → run immediately
|
||||
fifteenMin, // subsequent runs every 15 minutes
|
||||
TimeUnit.MINUTES);
|
||||
futureRef.set(future);
|
||||
}
|
||||
|
||||
private void shutdownScheduler(Long applicationId) {
|
||||
|
||||
ScheduledExecutorService shed = executorMap.remove(applicationId);
|
||||
if (shed != null) {
|
||||
shed.shutdownNow();
|
||||
}
|
||||
log.info("Scheduler shut down for applicationId: {}", applicationId);
|
||||
}
|
||||
|
||||
private void processNdgGeneration(Long applicationId) {
|
||||
@@ -489,97 +544,108 @@ public class AppointmentDao {
|
||||
saveNdgAndIdVisura(application, company, ndgResponse.getNdg());
|
||||
log.info("NDG successfully generated for applicationId: {}", applicationId);
|
||||
} else {
|
||||
// If NDG isn't immediately available, start polling
|
||||
log.info("Polling for NDG for applicationId: {}", applicationId);
|
||||
handleNdgPolling(application, company, hub, authorizationToken);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Exception occurred during NDG generation. ApplicationId: {}, CompanyId: {}, HubId: {}, Error: {}",
|
||||
applicationId, company.getId(), hub.getId(), e.getMessage(), e);
|
||||
log.error("Exception occurred during NDG generation. ApplicationId: {}, CompanyId: {}, HubId: {}, Error: {}", applicationId, company.getId(), hub.getId(),
|
||||
e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleNdgPolling(ApplicationEntity application, CompanyEntity company, HubEntity hub, String authorizationToken) {
|
||||
|
||||
log.info("Starting single‐shot NDG polling attempt for applicationId: {}, CompanyId: {}, HubId: {}", application.getId(), company.getId(), hub.getId());
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
long twoHoursMs = TimeUnit.HOURS.toMillis(2);
|
||||
|
||||
try {
|
||||
log.info("Starting NDG polling for applicationId: {}, CompanyId: {}, HubId: {}", application.getId(),company.getId(), hub.getId());
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
while (true) {
|
||||
if (application.getNdg() != null) {
|
||||
log.info("NDG retrieved for applicationId: {}", application.getId());
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
// Fetch Ndg via creating visura
|
||||
AppointmentLoginResponse visuraResponse = createVisura(company, authorizationToken, hub);
|
||||
if (isNdgValid(visuraResponse.getNdg())) {
|
||||
log.info("Valid NDG retrieved from create visura api response: {} | ApplicationId: {}", visuraResponse.getNdg(), application.getId());
|
||||
company.setNdg(visuraResponse.getNdg());
|
||||
application.setNdg(visuraResponse.getNdg());
|
||||
application.setNdgStatus(GepafinConstant.NDG_GENERATED);
|
||||
application.setStatus(ApplicationStatusTypeEnum.NDG.getValue());
|
||||
application.setIdVisura(visuraResponse.getIdVisura());
|
||||
applicationRepository.save(application);
|
||||
companyRepository.save(company);
|
||||
ApplicationEvaluationEntity applicationEvaluationEntity = applicationEvaluationService.validateApplicationEvaluation(
|
||||
application.getApplicationEvaluationId());
|
||||
Map<String, String> placeHolders = new HashMap<>();
|
||||
placeHolders.put("{{call_name}}", application.getCall().getName());
|
||||
placeHolders.put("{{protocol_number}}", String.valueOf(application.getProtocol().getProtocolNumber()));
|
||||
notificationDao.sendNotificationToInstructor(placeHolders, applicationEvaluationEntity, NotificationTypeEnum.NDG_GENERATION);
|
||||
notificationDao.sendNotificationToSuperUser(application, placeHolders, NotificationTypeEnum.NDG_GENERATION);
|
||||
log.info("Got NDG and saved successfully for applicationId: {}", application.getId());
|
||||
break;
|
||||
} else {
|
||||
// Fetch Visura list and attempt to parse NDG
|
||||
String visuraListJson = getVisuraList(visuraResponse.getIdVisura(), authorizationToken, application, hub);
|
||||
log.debug("Parsing NDG from visura list response | ApplicationId: {}", application.getId());
|
||||
String ndg = parseNdgFromVisuraListResponse(visuraListJson);
|
||||
if (isNdgValid(ndg)) {
|
||||
// CompanyEntity oldCompanyData = Utils.getClonedEntityForData(company);
|
||||
// ApplicationEntity oldApplicationData = Utils.getClonedEntityForData(application);
|
||||
|
||||
log.info("Valid NDG retrieved: {} | ApplicationId: {}", ndg, application.getId());
|
||||
company.setNdg(ndg);
|
||||
application.setNdg(ndg);
|
||||
application.setNdgStatus(GepafinConstant.NDG_GENERATED);
|
||||
application.setStatus(ApplicationStatusTypeEnum.NDG.getValue());
|
||||
application.setIdVisura(visuraResponse.getIdVisura());
|
||||
applicationRepository.save(application);
|
||||
companyRepository.save(company);
|
||||
ApplicationEvaluationEntity applicationEvaluationEntity = applicationEvaluationService.validateApplicationEvaluation(
|
||||
application.getApplicationEvaluationId());
|
||||
Map<String, String> placeHolders = new HashMap<>();
|
||||
placeHolders.put("{{call_name}}", application.getCall().getName());
|
||||
placeHolders.put("{{protocol_number}}", String.valueOf(application.getProtocol().getProtocolNumber()));
|
||||
notificationDao.sendNotificationToInstructor(placeHolders, applicationEvaluationEntity, NotificationTypeEnum.NDG_GENERATION);
|
||||
notificationDao.sendNotificationToSuperUser(application, placeHolders, NotificationTypeEnum.NDG_GENERATION);
|
||||
log.info("NDG saved successfully for applicationId: {}", application.getId());
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Check if polling has timed out
|
||||
if (System.currentTimeMillis() - startTime > TimeUnit.HOURS.toMillis(2)) {
|
||||
log.warn("NDG polling timed out for applicationId: {}", application.getId());
|
||||
application.setNdgStatus(GepafinConstant.NDG_FAILED);
|
||||
applicationRepository.save(application);
|
||||
break;
|
||||
}
|
||||
|
||||
// Wait before the next polling attempt
|
||||
Thread.sleep(TimeUnit.MINUTES.toMillis(15));
|
||||
} catch (InterruptedException e) {
|
||||
log.warn("NDG polling interrupted for applicationId: {}", application.getId());
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
log.error("Error during NDG polling for applicationId: {}", application.getId(), e);
|
||||
}
|
||||
// 1) If NDG was already populated (perhaps by another thread), skip polling.
|
||||
if (application.getNdg() != null) {
|
||||
log.info("NDG already present for applicationId {}. Exiting single‐shot polling.", application.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
// 2) Attempt to create Visura (this may immediately return a valid NDG)
|
||||
AppointmentLoginResponse visuraResponse = createVisura(company, authorizationToken, hub);
|
||||
|
||||
// 2a) If createVisura gave us a valid NDG, persist & exit
|
||||
String fetchedNdg = visuraResponse.getNdg();
|
||||
if (isNdgValid(fetchedNdg)) {
|
||||
log.info("Valid NDG retrieved from createVisura(): {} | applicationId: {}", fetchedNdg, application.getId());
|
||||
|
||||
company.setNdg(fetchedNdg);
|
||||
application.setNdg(fetchedNdg);
|
||||
application.setNdgStatus(GepafinConstant.NDG_GENERATED);
|
||||
application.setStatus(ApplicationStatusTypeEnum.NDG.getValue());
|
||||
application.setIdVisura(visuraResponse.getIdVisura());
|
||||
|
||||
companyRepository.save(company);
|
||||
applicationRepository.save(application);
|
||||
|
||||
ApplicationEvaluationEntity eval = applicationEvaluationService.validateApplicationEvaluation(application.getApplicationEvaluationId());
|
||||
Map<String, String> placeholders = new HashMap<>();
|
||||
placeholders.put("{{call_name}}", application.getCall().getName());
|
||||
placeholders.put("{{protocol_number}}", String.valueOf(application.getProtocol().getProtocolNumber()));
|
||||
notificationDao.sendNotificationToInstructor(placeholders, eval, NotificationTypeEnum.NDG_GENERATION);
|
||||
notificationDao.sendNotificationToSuperUser(application, placeholders, NotificationTypeEnum.NDG_GENERATION);
|
||||
|
||||
log.info("NDG saved successfully for applicationId: {}", application.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
// 2b) If no immediate NDG, fetch the Visura list JSON and parse out NDG
|
||||
String visuraListJson = getVisuraList(visuraResponse.getIdVisura(), authorizationToken, application, hub);
|
||||
log.debug("Parsing NDG from VisuraList JSON for applicationId: {}", application.getId());
|
||||
String parsedNdg = parseNdgFromVisuraListResponse(visuraListJson);
|
||||
|
||||
if (isNdgValid(parsedNdg)) {
|
||||
log.info("Valid NDG parsed from VisuraList: {} | applicationId: {}", parsedNdg, application.getId());
|
||||
|
||||
company.setNdg(parsedNdg);
|
||||
application.setNdg(parsedNdg);
|
||||
application.setNdgStatus(GepafinConstant.NDG_GENERATED);
|
||||
application.setStatus(ApplicationStatusTypeEnum.NDG.getValue());
|
||||
application.setIdVisura(visuraResponse.getIdVisura());
|
||||
|
||||
companyRepository.save(company);
|
||||
applicationRepository.save(application);
|
||||
|
||||
ApplicationEvaluationEntity eval = applicationEvaluationService.validateApplicationEvaluation(application.getApplicationEvaluationId());
|
||||
Map<String, String> placeholders = new HashMap<>();
|
||||
placeholders.put("{{call_name}}", application.getCall().getName());
|
||||
placeholders.put("{{protocol_number}}", String.valueOf(application.getProtocol().getProtocolNumber()));
|
||||
notificationDao.sendNotificationToInstructor(placeholders, eval, NotificationTypeEnum.NDG_GENERATION);
|
||||
notificationDao.sendNotificationToSuperUser(application, placeholders, NotificationTypeEnum.NDG_GENERATION);
|
||||
|
||||
log.info("NDG saved successfully for applicationId: {}", application.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
// 3) Neither direct API nor parsing yielded a valid NDG. Check timeout.
|
||||
if (System.currentTimeMillis() - startTime > twoHoursMs) {
|
||||
log.warn("NDG polling timed out after 2 hours for applicationId: {}", application.getId());
|
||||
application.setNdgStatus(GepafinConstant.NDG_FAILED);
|
||||
applicationRepository.save(application);
|
||||
return;
|
||||
}
|
||||
|
||||
// 4) No NDG yet—just return. The scheduler will retry in 15 minutes.
|
||||
log.info("No valid NDG yet for applicationId {}. Returning to scheduler—next attempt in 15 minutes.", application.getId());
|
||||
} catch (Exception e) {
|
||||
log.error("Exception during NDG polling for applicationId: {}", application.getId(), e);
|
||||
// If timeout after exception, mark NDG_FAILED
|
||||
if (System.currentTimeMillis() - startTime > twoHoursMs) {
|
||||
log.warn("Example: exiting single‐shot polling due to timeout after exception for applicationId: {}", application.getId());
|
||||
application.setNdgStatus(GepafinConstant.NDG_FAILED);
|
||||
applicationRepository.save(application);
|
||||
} else {
|
||||
log.info("Exception occurred but not timed out for applicationId {}. Returning to scheduler for next attempt in 15 minutes.", application.getId());
|
||||
}
|
||||
} finally {
|
||||
log.info("NDG polling completed for applicationId: {}", application.getId());
|
||||
}
|
||||
|
||||
log.info("NDG polling completed for applicationId: {}", application.getId());
|
||||
}
|
||||
|
||||
private static String getBearerToken(HubEntity hub) {
|
||||
@@ -594,12 +660,15 @@ public class AppointmentDao {
|
||||
|
||||
private void saveNdgAndIdVisura(ApplicationEntity application, CompanyEntity company, String ndg) {
|
||||
|
||||
ApplicationEntity oldApplication = Utils.getClonedEntityForData(application);
|
||||
application.setNdg(ndg);
|
||||
application.setNdgStatus(GepafinConstant.NDG_GENERATED);
|
||||
application.setStatus(ApplicationStatusTypeEnum.NDG.getValue());
|
||||
company.setNdg(ndg);
|
||||
companyRepository.save(company);
|
||||
applicationRepository.save(application);
|
||||
loggingUtil.addVersionHistory(
|
||||
VersionHistoryRequest.builder().request(request).actionType(VersionActionTypeEnum.UPDATE).oldData(oldApplication).newData(application).build());
|
||||
ApplicationEvaluationEntity applicationEvaluationEntity = applicationEvaluationService.validateApplicationEvaluation(application.getApplicationEvaluationId());
|
||||
// Map<String, String> placeHolders = notificationDao.sendNotificationToBeneficiary(application, NotificationTypeEnum.NDG_GENERATION);
|
||||
Map<String, String> placeHolders = new HashMap<>();
|
||||
|
||||
Reference in New Issue
Block a user