From 2a5f344ea0fdeab800e099c43b12d145dfc62632 Mon Sep 17 00:00:00 2001 From: piyushkag Date: Tue, 10 Dec 2024 12:47:11 +0530 Subject: [PATCH] Added async loading for document uploading to external system. --- .../constants/GepafinConstant.java | 2 + .../tendermanagement/dao/AppointmentDao.java | 117 +++++++++++------- .../service/AppointmentService.java | 2 +- .../service/impl/AppointmentServiceImpl.java | 5 +- .../gepafin/tendermanagement/util/Utils.java | 21 +++- .../web/rest/api/AppointmentApi.java | 3 +- .../rest/api/impl/AppointmentController.java | 4 +- src/main/resources/message_en.properties | 1 + src/main/resources/message_it.properties | 1 + 9 files changed, 100 insertions(+), 56 deletions(-) diff --git a/src/main/java/net/gepafin/tendermanagement/constants/GepafinConstant.java b/src/main/java/net/gepafin/tendermanagement/constants/GepafinConstant.java index b553df9f..14d82b91 100644 --- a/src/main/java/net/gepafin/tendermanagement/constants/GepafinConstant.java +++ b/src/main/java/net/gepafin/tendermanagement/constants/GepafinConstant.java @@ -339,5 +339,7 @@ public class GepafinConstant { public static final String HMAC_ALGO = "HmacSHA256"; public static final String ERROR_IN_GENERATING_NDG_TRY_AGAIN = "error.try.again"; public static final String POLLING_THREAD_NAME = "Ndg-Polling-Thread-"; + public static final String DOCUMENT_UPLOADING_IN_PROGRESS = "document.uploading.is.in.progress"; + public static final String ASYNC_DOCUMENT_UPLOAD_NAME = "AsyncDocumentUpload-"; } diff --git a/src/main/java/net/gepafin/tendermanagement/dao/AppointmentDao.java b/src/main/java/net/gepafin/tendermanagement/dao/AppointmentDao.java index 86a6aceb..1b644d6e 100644 --- a/src/main/java/net/gepafin/tendermanagement/dao/AppointmentDao.java +++ b/src/main/java/net/gepafin/tendermanagement/dao/AppointmentDao.java @@ -5,9 +5,11 @@ import com.amazonaws.services.s3.model.GetObjectRequest; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import feign.FeignException; +import io.jsonwebtoken.Claims; import jakarta.servlet.http.HttpServletRequest; import lombok.extern.slf4j.Slf4j; import net.gepafin.tendermanagement.config.Translator; +import net.gepafin.tendermanagement.config.jwt.TokenProvider; import net.gepafin.tendermanagement.constants.AppointmentApiConstant; import net.gepafin.tendermanagement.constants.GepafinConstant; import net.gepafin.tendermanagement.entities.ApplicationEntity; @@ -125,8 +127,15 @@ public class AppointmentDao { @Autowired private LoggingUtil loggingUtil; + @Autowired + private TokenProvider tokenProvider; + private final Map executorMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap threadForDocumentMap = new ConcurrentHashMap<>(); + + private static final ThreadLocal threadLocalHubId = new ThreadLocal<>(); + public NdgResponse checkNdgForAppointment(Long applicationId) { ApplicationEntity application = applicationService.validateApplication(applicationId); @@ -719,87 +728,101 @@ public class AppointmentDao { return appointmentCreationRequest; } - public DocumentUploadResponse uploadDocumentToExternalSystem(Long documentId, UploadDocToExternalSystemRequest docToExternalSystemRequest, Long applicationId) { - - DocumentUploadResponse response = new DocumentUploadResponse(); + public DocumentUploadResponse uploadDocumentToExternalSystem(Long documentId, UploadDocToExternalSystemRequest docToExternalSystemRequest) { + // Check if the document is already being processed DocumentEntity systemDoc = documentDao.validateDocument(documentId); - ApplicationEntity application = getApplicationEntityForDocument(applicationId, systemDoc); - - //cloned for old document data - DocumentEntity oldDocumentEntity = Utils.getClonedEntityForData(systemDoc); - - if (!docToExternalSystemRequest.getInput().getAttributes().getNdg().equalsIgnoreCase(application.getNdg())) { - throw new CustomValidationException(Status.BAD_REQUEST, Translator.toLocale(GepafinConstant.NDG_NOT_MATCHED_OR_NOT_FOUND)); + Claims claims = tokenProvider.getClaimsFromToken(tokenProvider.extractTokenFromRequest(request)); + Long hubId = Utils.extractHubIdFromPayload(claims.getSubject()); + if (systemDoc.getDocumentAttachmentId() != null) { + // If the documentAttachmentId is already set, return the response + log.info("Document already uploaded with documentAttachmentId: {}", systemDoc.getDocumentAttachmentId()); + DocumentUploadResponse response = new DocumentUploadResponse(); + response.setDocumentAttachmentId(systemDoc.getDocumentAttachmentId()); + return response; } + // Check if a thread is already running for this document upload + if (threadForDocumentMap.containsKey(documentId)) { + log.warn("Document upload already running for documentId: {}", documentId); + throw new CustomValidationException(Status.SUCCESS, Translator.toLocale(GepafinConstant.DOCUMENT_UPLOADING_IN_PROGRESS)); + } + // Start the upload process in the background + ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> { + Thread thread = new Thread(runnable); + thread.setName(GepafinConstant.ASYNC_DOCUMENT_UPLOAD_NAME + documentId); + return thread; + }); + threadForDocumentMap.put(documentId, executor); - Long hubId = application.getHubId(); + executor.submit(() -> { + threadLocalHubId.set(hubId); + try { + log.info("Starting async document upload for documentId: {}", documentId); + uploadDocumentToExternalSystemSync(documentId, docToExternalSystemRequest); + } catch (Exception e) { + log.error("Error in async document upload for documentId: {}", documentId, e); + } finally { + // Cleanup resources + ExecutorService executorToShutdown = threadForDocumentMap.remove(documentId); + if (executorToShutdown != null) { + executorToShutdown.shutdown(); + threadLocalHubId.remove(); + } + log.info("Async document upload completed for documentId: {}", documentId); + } + }); + // Return an immediate response indicating the process is in progress + throw new CustomValidationException(Status.SUCCESS, Translator.toLocale(GepafinConstant.DOCUMENT_UPLOADING_IN_PROGRESS)); + } + + private void uploadDocumentToExternalSystemSync(Long documentId, UploadDocToExternalSystemRequest docToExternalSystemRequest) { + // Synchronous upload logic + DocumentEntity systemDoc = documentDao.validateDocument(documentId); + + Long hubId = threadLocalHubId.get(); HubEntity hub = hubRepository.findByHubId(hubId); if (!hub.getUniqueUuid().equals(defaultHubUuid)) { log.info("Document cannot be uploaded for another Hub, it is default for Gepafin."); throw new CustomValidationException(Status.BAD_REQUEST, Translator.toLocale(GepafinConstant.NO_DOCUMENT_UPLOAD_FOR_ANOTHER_HUB)); } - log.info("Got Document in system {}", systemDoc); + log.info("Got Document in system: {}", systemDoc); String oldUrl = systemDoc.getFilePath(); - log.info("Processing {}", oldUrl); - String authorizationToken = getBearerToken(hub); + try { File localFile = downloadFileFromS3(oldUrl); MultipartFile multipartFile = convertFileToMultipartFile(localFile); UploadDocToExternalSystemRequest externalSystemRequest = new UploadDocToExternalSystemRequest(); - - UploadDocToExternalSystemRequest.Input input = getUploadDocumentInput(docToExternalSystemRequest); - externalSystemRequest.setInput(input); + externalSystemRequest.setInput(getUploadDocumentInput(docToExternalSystemRequest)); String uploadDocRequest = Utils.convertObjectToJson(externalSystemRequest); ResponseEntity uploadedDocumentData = appointmentApiService.uploadDocumentToExternalSystemForAppointment(authorizationToken, context, uploadDocRequest, multipartFile); - String responseData = Utils.convertObjectToJson(uploadedDocumentData.getBody()); - DocumentUploadResponse parsedDocumentUploadResponse = parseDocumentUploadResponse(responseData); - if (parsedDocumentUploadResponse == null) { + String responseData = Utils.convertObjectToJson(uploadedDocumentData.getBody()); + DocumentUploadResponse parsedResponse = parseDocumentUploadResponse(responseData); + + if (parsedResponse == null) { throw new CustomValidationException(Status.BAD_REQUEST, Translator.toLocale(GepafinConstant.ERROR_UPLOADING_DOCUMENT)); } - systemDoc.setDocumentAttachmentId(parsedDocumentUploadResponse.getDocumentAttachmentId()); + // Save the documentAttachmentId to the database + systemDoc.setDocumentAttachmentId(parsedResponse.getDocumentAttachmentId()); documentRepository.save(systemDoc); - /** This code is responsible for adding a version history log for the "Update document with document attachment id" operation. **/ - loggingUtil.addVersionHistory( - VersionHistoryRequest.builder().request(request).actionType(VersionActionTypeEnum.UPDATE).oldData(oldDocumentEntity).newData(systemDoc).build()); - - log.info("Document uploaded successfully to external system : {}", parsedDocumentUploadResponse); - response.setDocumentAttachmentId(systemDoc.getDocumentAttachmentId()); - return response; + log.info("Document uploaded successfully to external system: {}", parsedResponse); } catch (FeignException.Forbidden forbiddenException) { - log.error("403 Forbidden received while uploading document to external system. Regenerating token..."); - + log.error("403 Forbidden received while uploading document. Regenerating token..."); regenerateTokenAndSave(hub); - return uploadDocumentToExternalSystem(systemDoc.getSourceId(), docToExternalSystemRequest, applicationId); + uploadDocumentToExternalSystemSync(documentId, docToExternalSystemRequest); } catch (Exception e) { - log.error("Exception in uploading document to external system {}", e.getMessage()); + log.error("Exception during document upload: {}", e.getMessage(), e); throw new CustomValidationException(Status.BAD_REQUEST, Translator.toLocale(GepafinConstant.EXTERNAL_DOCUMENT_UPLOAD_FAILURE_MSG)); } } - private ApplicationEntity getApplicationEntityForDocument(Long applicationId, DocumentEntity systemDoc) { - - if (systemDoc.getDocumentAttachmentId() != null) { - throw new CustomValidationException(Status.BAD_REQUEST, Translator.toLocale(GepafinConstant.DOCUMENT_ALREADY_UPLOADED)); - } - - ApplicationEntity application; - - if (systemDoc.getSource().equalsIgnoreCase(DocumentSourceTypeEnum.APPLICATION.getValue()) && Objects.equals(systemDoc.getSourceId(), applicationId)) { - application = applicationService.validateApplication(systemDoc.getSourceId()); - } else { - throw new CustomValidationException(Status.BAD_REQUEST, Translator.toLocale(GepafinConstant.PROVIDE_VALID_APPLICATION_DOC_ID)); - } - return application; - } private UploadDocToExternalSystemRequest.Input getUploadDocumentInput(UploadDocToExternalSystemRequest docToExternalSystemRequest) { diff --git a/src/main/java/net/gepafin/tendermanagement/service/AppointmentService.java b/src/main/java/net/gepafin/tendermanagement/service/AppointmentService.java index 512014f3..842901a4 100644 --- a/src/main/java/net/gepafin/tendermanagement/service/AppointmentService.java +++ b/src/main/java/net/gepafin/tendermanagement/service/AppointmentService.java @@ -12,5 +12,5 @@ public interface AppointmentService { AppointmentCreationResponse createAppointmentForApplication(HttpServletRequest request, Long applicationId, CreateAppointmentRequest createAppointmentRequest); - DocumentUploadResponse uploadDocToExternalSystem(HttpServletRequest request, Long documentId, UploadDocToExternalSystemRequest docToExternalSystemRequest, Long applicationId); + DocumentUploadResponse uploadDocToExternalSystem(HttpServletRequest request, Long documentId, UploadDocToExternalSystemRequest docToExternalSystemRequest); } diff --git a/src/main/java/net/gepafin/tendermanagement/service/impl/AppointmentServiceImpl.java b/src/main/java/net/gepafin/tendermanagement/service/impl/AppointmentServiceImpl.java index f9c1ba85..2e7960df 100644 --- a/src/main/java/net/gepafin/tendermanagement/service/impl/AppointmentServiceImpl.java +++ b/src/main/java/net/gepafin/tendermanagement/service/impl/AppointmentServiceImpl.java @@ -30,9 +30,8 @@ public class AppointmentServiceImpl implements AppointmentService { } @Override - public DocumentUploadResponse uploadDocToExternalSystem(HttpServletRequest request, Long documentId, UploadDocToExternalSystemRequest docToExternalSystemRequest, - Long applicationId) { + public DocumentUploadResponse uploadDocToExternalSystem(HttpServletRequest request, Long documentId, UploadDocToExternalSystemRequest docToExternalSystemRequest) { - return appointmentDao.uploadDocumentToExternalSystem(documentId, docToExternalSystemRequest, applicationId); + return appointmentDao.uploadDocumentToExternalSystem(documentId, docToExternalSystemRequest); } } diff --git a/src/main/java/net/gepafin/tendermanagement/util/Utils.java b/src/main/java/net/gepafin/tendermanagement/util/Utils.java index a560b29d..e93c8ed9 100644 --- a/src/main/java/net/gepafin/tendermanagement/util/Utils.java +++ b/src/main/java/net/gepafin/tendermanagement/util/Utils.java @@ -19,12 +19,15 @@ import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.node.ObjectNode; +import io.jsonwebtoken.Claims; import jakarta.persistence.ManyToMany; import jakarta.persistence.ManyToOne; import jakarta.persistence.OneToMany; import jakarta.persistence.OneToOne; import jakarta.servlet.http.HttpServletRequest; +import net.gepafin.tendermanagement.config.Translator; import net.gepafin.tendermanagement.constants.GepafinConstant; +import net.gepafin.tendermanagement.web.rest.api.errors.CustomValidationException; import org.apache.commons.collections4.MapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -650,4 +653,20 @@ public class Utils { // log.info("Successfully set mock request for NDG process with URI: {}", mockRequest.getRequestURI()); // } -} + public static Long extractHubIdFromPayload(String payload) { + + Long hubId; + try { + String[] parts = payload.split(":"); + if (parts.length > 2) { + hubId = Long.valueOf(parts[2]); + return hubId; + } else { + hubId = null; + } + } catch (Exception e) { + throw new RuntimeException("No Hub id present in payload", e); + } + return null; + } +} \ No newline at end of file diff --git a/src/main/java/net/gepafin/tendermanagement/web/rest/api/AppointmentApi.java b/src/main/java/net/gepafin/tendermanagement/web/rest/api/AppointmentApi.java index 04616f92..5507492a 100644 --- a/src/main/java/net/gepafin/tendermanagement/web/rest/api/AppointmentApi.java +++ b/src/main/java/net/gepafin/tendermanagement/web/rest/api/AppointmentApi.java @@ -52,9 +52,8 @@ public interface AppointmentApi { @ExampleObject(value = ErrorConstants.UNAUTHORIZED_ERROR_EXAMPLE) })), @ApiResponse(responseCode = "400", description = "Bad Request", content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE, examples = { @ExampleObject(value = ErrorConstants.BADREQUEST_ERROR_EXAMPLE) })) }) - @PostMapping(value = "/application/{applicationId}/document/{documentId}", produces = MediaType.APPLICATION_JSON_VALUE) + @PostMapping(value = "/document/{documentId}", produces = MediaType.APPLICATION_JSON_VALUE) ResponseEntity> uploadDocumentToExternalSystem(HttpServletRequest request, @Parameter(description = "The document id", required = true) @PathVariable(value = "documentId", required = true) Long documentId, - @Parameter(description = "The application id", required = true) @PathVariable(value = "applicationId", required = true) Long applicationId, @RequestBody UploadDocToExternalSystemRequest docToExternalSystemRequest); } diff --git a/src/main/java/net/gepafin/tendermanagement/web/rest/api/impl/AppointmentController.java b/src/main/java/net/gepafin/tendermanagement/web/rest/api/impl/AppointmentController.java index a8986626..ca55e55f 100644 --- a/src/main/java/net/gepafin/tendermanagement/web/rest/api/impl/AppointmentController.java +++ b/src/main/java/net/gepafin/tendermanagement/web/rest/api/impl/AppointmentController.java @@ -61,14 +61,14 @@ public class AppointmentController implements AppointmentApi { } @Override - public ResponseEntity> uploadDocumentToExternalSystem(HttpServletRequest request, Long documentId, Long applicationId, + public ResponseEntity> uploadDocumentToExternalSystem(HttpServletRequest request, Long documentId, UploadDocToExternalSystemRequest docToExternalSystemRequest) { /** This code is responsible for creating user action logs for the "Upload document to external system" operation. **/ loggingUtil.logUserAction( UserActionRequest.builder().request(request).actionType(UserActionLogsEnum.UPLOAD).actionContext(UserActionContextEnum.UPLOAD_DOCUMENT_TO_EXTERNAL_SYSTEM).build()); - DocumentUploadResponse documentUploadResponse = appointmentService.uploadDocToExternalSystem(request, documentId, docToExternalSystemRequest, applicationId); + DocumentUploadResponse documentUploadResponse = appointmentService.uploadDocToExternalSystem(request, documentId, docToExternalSystemRequest); return ResponseEntity.status(HttpStatus.OK) .body(new Response<>(documentUploadResponse, Status.SUCCESS, Translator.toLocale(GepafinConstant.DOCUMENT_UPLOADED_SUCCESSFULLY_TO_EXTERNAL_SYSTEM))); diff --git a/src/main/resources/message_en.properties b/src/main/resources/message_en.properties index 487c8d95..24150081 100644 --- a/src/main/resources/message_en.properties +++ b/src/main/resources/message_en.properties @@ -334,3 +334,4 @@ appointment.creation.is.only.for.gepafin = Appointment creation is only allowed upload.document.is.only.for.gepafin = Document cant be uploaded, this is only available for GEPAFIN Hub. appointment.created.successfully = Appointment created successfully. error.try.again = Service call error while performing the operation. Please try again. +document.uploading.is.in.progress = Document uploading is in progress. diff --git a/src/main/resources/message_it.properties b/src/main/resources/message_it.properties index ac8b75d9..084fe6aa 100644 --- a/src/main/resources/message_it.properties +++ b/src/main/resources/message_it.properties @@ -324,3 +324,4 @@ appointment.creation.is.only.for.gepafin = La creazione degli appuntamenti ? con upload.document.is.only.for.gepafin = Il documento non pu? essere caricato, questa operazione ? disponibile solo per il Hub GEPAFIN. appointment.created.successfully = Appuntamento creato con successo. error.try.again = Errore di chiamata di servizio durante l'esecuzione dell'operazione. Riprovare. +document.uploading.is.in.progress = Il documento è in fase di caricamento.