Added async loading for document uploading to external system.

This commit is contained in:
piyushkag
2024-12-10 12:47:11 +05:30
parent 05f64af404
commit 2a5f344ea0
9 changed files with 100 additions and 56 deletions

View File

@@ -339,5 +339,7 @@ public class GepafinConstant {
public static final String HMAC_ALGO = "HmacSHA256";
public static final String ERROR_IN_GENERATING_NDG_TRY_AGAIN = "error.try.again";
public static final String POLLING_THREAD_NAME = "Ndg-Polling-Thread-";
public static final String DOCUMENT_UPLOADING_IN_PROGRESS = "document.uploading.is.in.progress";
public static final String ASYNC_DOCUMENT_UPLOAD_NAME = "AsyncDocumentUpload-";
}

View File

@@ -5,9 +5,11 @@ import com.amazonaws.services.s3.model.GetObjectRequest;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import feign.FeignException;
import io.jsonwebtoken.Claims;
import jakarta.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
import net.gepafin.tendermanagement.config.Translator;
import net.gepafin.tendermanagement.config.jwt.TokenProvider;
import net.gepafin.tendermanagement.constants.AppointmentApiConstant;
import net.gepafin.tendermanagement.constants.GepafinConstant;
import net.gepafin.tendermanagement.entities.ApplicationEntity;
@@ -125,8 +127,15 @@ public class AppointmentDao {
@Autowired
private LoggingUtil loggingUtil;
@Autowired
private TokenProvider tokenProvider;
private final Map<Long, ExecutorService> executorMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, ExecutorService> threadForDocumentMap = new ConcurrentHashMap<>();
private static final ThreadLocal<Long> threadLocalHubId = new ThreadLocal<>();
public NdgResponse checkNdgForAppointment(Long applicationId) {
ApplicationEntity application = applicationService.validateApplication(applicationId);
@@ -719,87 +728,101 @@ public class AppointmentDao {
return appointmentCreationRequest;
}
public DocumentUploadResponse uploadDocumentToExternalSystem(Long documentId, UploadDocToExternalSystemRequest docToExternalSystemRequest, Long applicationId) {
DocumentUploadResponse response = new DocumentUploadResponse();
public DocumentUploadResponse uploadDocumentToExternalSystem(Long documentId, UploadDocToExternalSystemRequest docToExternalSystemRequest) {
// Check if the document is already being processed
DocumentEntity systemDoc = documentDao.validateDocument(documentId);
ApplicationEntity application = getApplicationEntityForDocument(applicationId, systemDoc);
//cloned for old document data
DocumentEntity oldDocumentEntity = Utils.getClonedEntityForData(systemDoc);
if (!docToExternalSystemRequest.getInput().getAttributes().getNdg().equalsIgnoreCase(application.getNdg())) {
throw new CustomValidationException(Status.BAD_REQUEST, Translator.toLocale(GepafinConstant.NDG_NOT_MATCHED_OR_NOT_FOUND));
Claims claims = tokenProvider.getClaimsFromToken(tokenProvider.extractTokenFromRequest(request));
Long hubId = Utils.extractHubIdFromPayload(claims.getSubject());
if (systemDoc.getDocumentAttachmentId() != null) {
// If the documentAttachmentId is already set, return the response
log.info("Document already uploaded with documentAttachmentId: {}", systemDoc.getDocumentAttachmentId());
DocumentUploadResponse response = new DocumentUploadResponse();
response.setDocumentAttachmentId(systemDoc.getDocumentAttachmentId());
return response;
}
// Check if a thread is already running for this document upload
if (threadForDocumentMap.containsKey(documentId)) {
log.warn("Document upload already running for documentId: {}", documentId);
throw new CustomValidationException(Status.SUCCESS, Translator.toLocale(GepafinConstant.DOCUMENT_UPLOADING_IN_PROGRESS));
}
// Start the upload process in the background
ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
Thread thread = new Thread(runnable);
thread.setName(GepafinConstant.ASYNC_DOCUMENT_UPLOAD_NAME + documentId);
return thread;
});
threadForDocumentMap.put(documentId, executor);
Long hubId = application.getHubId();
executor.submit(() -> {
threadLocalHubId.set(hubId);
try {
log.info("Starting async document upload for documentId: {}", documentId);
uploadDocumentToExternalSystemSync(documentId, docToExternalSystemRequest);
} catch (Exception e) {
log.error("Error in async document upload for documentId: {}", documentId, e);
} finally {
// Cleanup resources
ExecutorService executorToShutdown = threadForDocumentMap.remove(documentId);
if (executorToShutdown != null) {
executorToShutdown.shutdown();
threadLocalHubId.remove();
}
log.info("Async document upload completed for documentId: {}", documentId);
}
});
// Return an immediate response indicating the process is in progress
throw new CustomValidationException(Status.SUCCESS, Translator.toLocale(GepafinConstant.DOCUMENT_UPLOADING_IN_PROGRESS));
}
private void uploadDocumentToExternalSystemSync(Long documentId, UploadDocToExternalSystemRequest docToExternalSystemRequest) {
// Synchronous upload logic
DocumentEntity systemDoc = documentDao.validateDocument(documentId);
Long hubId = threadLocalHubId.get();
HubEntity hub = hubRepository.findByHubId(hubId);
if (!hub.getUniqueUuid().equals(defaultHubUuid)) {
log.info("Document cannot be uploaded for another Hub, it is default for Gepafin.");
throw new CustomValidationException(Status.BAD_REQUEST, Translator.toLocale(GepafinConstant.NO_DOCUMENT_UPLOAD_FOR_ANOTHER_HUB));
}
log.info("Got Document in system {}", systemDoc);
log.info("Got Document in system: {}", systemDoc);
String oldUrl = systemDoc.getFilePath();
log.info("Processing {}", oldUrl);
String authorizationToken = getBearerToken(hub);
try {
File localFile = downloadFileFromS3(oldUrl);
MultipartFile multipartFile = convertFileToMultipartFile(localFile);
UploadDocToExternalSystemRequest externalSystemRequest = new UploadDocToExternalSystemRequest();
UploadDocToExternalSystemRequest.Input input = getUploadDocumentInput(docToExternalSystemRequest);
externalSystemRequest.setInput(input);
externalSystemRequest.setInput(getUploadDocumentInput(docToExternalSystemRequest));
String uploadDocRequest = Utils.convertObjectToJson(externalSystemRequest);
ResponseEntity<Object> uploadedDocumentData = appointmentApiService.uploadDocumentToExternalSystemForAppointment(authorizationToken, context, uploadDocRequest,
multipartFile);
String responseData = Utils.convertObjectToJson(uploadedDocumentData.getBody());
DocumentUploadResponse parsedDocumentUploadResponse = parseDocumentUploadResponse(responseData);
if (parsedDocumentUploadResponse == null) {
String responseData = Utils.convertObjectToJson(uploadedDocumentData.getBody());
DocumentUploadResponse parsedResponse = parseDocumentUploadResponse(responseData);
if (parsedResponse == null) {
throw new CustomValidationException(Status.BAD_REQUEST, Translator.toLocale(GepafinConstant.ERROR_UPLOADING_DOCUMENT));
}
systemDoc.setDocumentAttachmentId(parsedDocumentUploadResponse.getDocumentAttachmentId());
// Save the documentAttachmentId to the database
systemDoc.setDocumentAttachmentId(parsedResponse.getDocumentAttachmentId());
documentRepository.save(systemDoc);
/** This code is responsible for adding a version history log for the "Update document with document attachment id" operation. **/
loggingUtil.addVersionHistory(
VersionHistoryRequest.builder().request(request).actionType(VersionActionTypeEnum.UPDATE).oldData(oldDocumentEntity).newData(systemDoc).build());
log.info("Document uploaded successfully to external system : {}", parsedDocumentUploadResponse);
response.setDocumentAttachmentId(systemDoc.getDocumentAttachmentId());
return response;
log.info("Document uploaded successfully to external system: {}", parsedResponse);
} catch (FeignException.Forbidden forbiddenException) {
log.error("403 Forbidden received while uploading document to external system. Regenerating token...");
log.error("403 Forbidden received while uploading document. Regenerating token...");
regenerateTokenAndSave(hub);
return uploadDocumentToExternalSystem(systemDoc.getSourceId(), docToExternalSystemRequest, applicationId);
uploadDocumentToExternalSystemSync(documentId, docToExternalSystemRequest);
} catch (Exception e) {
log.error("Exception in uploading document to external system {}", e.getMessage());
log.error("Exception during document upload: {}", e.getMessage(), e);
throw new CustomValidationException(Status.BAD_REQUEST, Translator.toLocale(GepafinConstant.EXTERNAL_DOCUMENT_UPLOAD_FAILURE_MSG));
}
}
private ApplicationEntity getApplicationEntityForDocument(Long applicationId, DocumentEntity systemDoc) {
if (systemDoc.getDocumentAttachmentId() != null) {
throw new CustomValidationException(Status.BAD_REQUEST, Translator.toLocale(GepafinConstant.DOCUMENT_ALREADY_UPLOADED));
}
ApplicationEntity application;
if (systemDoc.getSource().equalsIgnoreCase(DocumentSourceTypeEnum.APPLICATION.getValue()) && Objects.equals(systemDoc.getSourceId(), applicationId)) {
application = applicationService.validateApplication(systemDoc.getSourceId());
} else {
throw new CustomValidationException(Status.BAD_REQUEST, Translator.toLocale(GepafinConstant.PROVIDE_VALID_APPLICATION_DOC_ID));
}
return application;
}
private UploadDocToExternalSystemRequest.Input getUploadDocumentInput(UploadDocToExternalSystemRequest docToExternalSystemRequest) {

View File

@@ -12,5 +12,5 @@ public interface AppointmentService {
AppointmentCreationResponse createAppointmentForApplication(HttpServletRequest request, Long applicationId, CreateAppointmentRequest createAppointmentRequest);
DocumentUploadResponse uploadDocToExternalSystem(HttpServletRequest request, Long documentId, UploadDocToExternalSystemRequest docToExternalSystemRequest, Long applicationId);
DocumentUploadResponse uploadDocToExternalSystem(HttpServletRequest request, Long documentId, UploadDocToExternalSystemRequest docToExternalSystemRequest);
}

View File

@@ -30,9 +30,8 @@ public class AppointmentServiceImpl implements AppointmentService {
}
@Override
public DocumentUploadResponse uploadDocToExternalSystem(HttpServletRequest request, Long documentId, UploadDocToExternalSystemRequest docToExternalSystemRequest,
Long applicationId) {
public DocumentUploadResponse uploadDocToExternalSystem(HttpServletRequest request, Long documentId, UploadDocToExternalSystemRequest docToExternalSystemRequest) {
return appointmentDao.uploadDocumentToExternalSystem(documentId, docToExternalSystemRequest, applicationId);
return appointmentDao.uploadDocumentToExternalSystem(documentId, docToExternalSystemRequest);
}
}

View File

@@ -19,12 +19,15 @@ import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.jsonwebtoken.Claims;
import jakarta.persistence.ManyToMany;
import jakarta.persistence.ManyToOne;
import jakarta.persistence.OneToMany;
import jakarta.persistence.OneToOne;
import jakarta.servlet.http.HttpServletRequest;
import net.gepafin.tendermanagement.config.Translator;
import net.gepafin.tendermanagement.constants.GepafinConstant;
import net.gepafin.tendermanagement.web.rest.api.errors.CustomValidationException;
import org.apache.commons.collections4.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -650,4 +653,20 @@ public class Utils {
// log.info("Successfully set mock request for NDG process with URI: {}", mockRequest.getRequestURI());
// }
}
public static Long extractHubIdFromPayload(String payload) {
Long hubId;
try {
String[] parts = payload.split(":");
if (parts.length > 2) {
hubId = Long.valueOf(parts[2]);
return hubId;
} else {
hubId = null;
}
} catch (Exception e) {
throw new RuntimeException("No Hub id present in payload", e);
}
return null;
}
}

View File

@@ -52,9 +52,8 @@ public interface AppointmentApi {
@ExampleObject(value = ErrorConstants.UNAUTHORIZED_ERROR_EXAMPLE) })),
@ApiResponse(responseCode = "400", description = "Bad Request", content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE, examples = {
@ExampleObject(value = ErrorConstants.BADREQUEST_ERROR_EXAMPLE) })) })
@PostMapping(value = "/application/{applicationId}/document/{documentId}", produces = MediaType.APPLICATION_JSON_VALUE)
@PostMapping(value = "/document/{documentId}", produces = MediaType.APPLICATION_JSON_VALUE)
ResponseEntity<Response<DocumentUploadResponse>> uploadDocumentToExternalSystem(HttpServletRequest request,
@Parameter(description = "The document id", required = true) @PathVariable(value = "documentId", required = true) Long documentId,
@Parameter(description = "The application id", required = true) @PathVariable(value = "applicationId", required = true) Long applicationId,
@RequestBody UploadDocToExternalSystemRequest docToExternalSystemRequest);
}

View File

@@ -61,14 +61,14 @@ public class AppointmentController implements AppointmentApi {
}
@Override
public ResponseEntity<Response<DocumentUploadResponse>> uploadDocumentToExternalSystem(HttpServletRequest request, Long documentId, Long applicationId,
public ResponseEntity<Response<DocumentUploadResponse>> uploadDocumentToExternalSystem(HttpServletRequest request, Long documentId,
UploadDocToExternalSystemRequest docToExternalSystemRequest) {
/** This code is responsible for creating user action logs for the "Upload document to external system" operation. **/
loggingUtil.logUserAction(
UserActionRequest.builder().request(request).actionType(UserActionLogsEnum.UPLOAD).actionContext(UserActionContextEnum.UPLOAD_DOCUMENT_TO_EXTERNAL_SYSTEM).build());
DocumentUploadResponse documentUploadResponse = appointmentService.uploadDocToExternalSystem(request, documentId, docToExternalSystemRequest, applicationId);
DocumentUploadResponse documentUploadResponse = appointmentService.uploadDocToExternalSystem(request, documentId, docToExternalSystemRequest);
return ResponseEntity.status(HttpStatus.OK)
.body(new Response<>(documentUploadResponse, Status.SUCCESS, Translator.toLocale(GepafinConstant.DOCUMENT_UPLOADED_SUCCESSFULLY_TO_EXTERNAL_SYSTEM)));

View File

@@ -334,3 +334,4 @@ appointment.creation.is.only.for.gepafin = Appointment creation is only allowed
upload.document.is.only.for.gepafin = Document cant be uploaded, this is only available for GEPAFIN Hub.
appointment.created.successfully = Appointment created successfully.
error.try.again = Service call error while performing the operation. Please try again.
document.uploading.is.in.progress = Document uploading is in progress.

View File

@@ -324,3 +324,4 @@ appointment.creation.is.only.for.gepafin = La creazione degli appuntamenti ? con
upload.document.is.only.for.gepafin = Il documento non pu? essere caricato, questa operazione ? disponibile solo per il Hub GEPAFIN.
appointment.created.successfully = Appuntamento creato con successo.
error.try.again = Errore di chiamata di servizio durante l'esecuzione dell'operazione. Riprovare.
document.uploading.is.in.progress = Il documento è in fase di caricamento.