Detailed Implementation Analysis: ImportDataFromCSV1
This document provides an exhaustive, step-by-step breakdown of the ImportDataFromCSV1.java service. It outlines exactly what happens from the moment the service is triggered until the final log is written.
High-Level Architecture Flow
Phase 1: Initialization & Configuration (Constructor)
When importDataFromCSV1 is called via the service dispatcher, it immediately instantiates the ImportDataFromCSV1 class, passing the context parameters.
- Extract Parameters: It wraps the input context into an InputParameters object (extracting configId, jobName, logId, userLogin, etc.).
- Fetch DataManagerConfig: It queries the database for the DataManagerConfig record matching the provided configId to determine custom settings (such as the delimiter).
- Configure CSV Format:
  - It determines the fieldDelimiter (e.g., a comma, \t, or |).
  - It identifies the fieldEncapsulator (usually the double quote, ").
  - It builds the Apache Commons CSVFormat object, configured to ignore empty lines and surrounding spaces.
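The delimiter and encapsulator resolution above can be sketched in plain Java. This is a minimal sketch, not the service's code: the CsvSettings class, its method names, and the idea that DataManagerConfig may store a delimiter as an alias such as "\t" or "tab" are all assumptions made for illustration.

```java
import java.util.Locale;
import java.util.Map;

/**
 * Hypothetical helper: turns raw config values into the delimiter and
 * encapsulator characters handed to the CSV parser. Assumes the config may
 * store the delimiter either literally (",", "|") or as an alias ("\\t", "tab").
 */
final class CsvSettings {
    static final char DEFAULT_DELIMITER = ',';
    static final char DEFAULT_ENCAPSULATOR = '"';

    // Aliases a config screen might store instead of a literal character (assumption).
    private static final Map<String, Character> ALIASES = Map.of(
            "\\t", '\t',
            "tab", '\t',
            "comma", ',',
            "pipe", '|');

    static char resolveDelimiter(String configured) {
        if (configured == null || configured.isEmpty()) {
            return DEFAULT_DELIMITER;
        }
        Character alias = ALIASES.get(configured.toLowerCase(Locale.ROOT));
        if (alias != null) {
            return alias;
        }
        // Otherwise treat the first character as the literal delimiter.
        return configured.charAt(0);
    }

    static char resolveEncapsulator(String configured) {
        return (configured == null || configured.isEmpty())
                ? DEFAULT_ENCAPSULATOR
                : configured.charAt(0);
    }

    public static void main(String[] args) {
        System.out.println((int) resolveDelimiter("\\t")); // 9, the tab character
        System.out.println(resolveDelimiter("|"));        // |
        System.out.println(resolveDelimiter(null));       // ,
        System.out.println(resolveEncapsulator(""));      // "
    }
}
```

The resolved characters would then feed the CSVFormat builder. With Commons CSV 1.9 or later that is CSVFormat.DEFAULT.builder().setDelimiter(d).setQuote(q).setIgnoreEmptyLines(true).setIgnoreSurroundingSpaces(true).build(), although the original class may use the older withX methods instead.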
Phase 2: Preprocessing (processImport - Part 1)
Before parsing the actual file, the system checks if a preprocessor is required.
- Check Preprocessor Flag: Queries DataManagerConfig for enablePreprocessorEvent.
- Execute Preprocessor: If enabled (Y), it calls the memberProductCSVPreprocessor service synchronously.
  - What it does: The preprocessor reads the raw file, splits obvious errors (such as completely malformed rows) out into a separate error file, and returns a new uploadFileContentId representing the "clean" CSV that is safe to process.
- Resolve File Path: Uses the uploadFileContentId to query the Content and DataResource entities and find the absolute file path on the server.
- Open Stream: Wraps the file in a BOMInputStream (to strip byte-order marks) and a BufferedReader.
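BOMInputStream is not part of the JDK (it is typically the Apache Commons IO class), so the sketch below uses only java.io to show what the stripping step does: read up to three bytes, drop them if they are the UTF-8 byte-order mark EF BB BF, and otherwise push them back. The BomStripper class is hypothetical and handles only the UTF-8 BOM.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PushbackInputStream;
import java.nio.charset.StandardCharsets;

/** Stdlib stand-in for a BOM-stripping stream: skips a leading UTF-8 BOM (EF BB BF). */
final class BomStripper {
    private static final byte[] UTF8_BOM = {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF};

    static InputStream skipUtf8Bom(InputStream in) throws IOException {
        PushbackInputStream pushback = new PushbackInputStream(in, UTF8_BOM.length);
        byte[] head = new byte[UTF8_BOM.length];
        int read = pushback.readNBytes(head, 0, head.length);
        boolean isBom = read == UTF8_BOM.length
                && head[0] == UTF8_BOM[0] && head[1] == UTF8_BOM[1] && head[2] == UTF8_BOM[2];
        if (!isBom && read > 0) {
            // Not a BOM: push the bytes back so the reader sees the file unchanged.
            pushback.unread(head, 0, read);
        }
        return pushback;
    }

    static BufferedReader openReader(InputStream raw) throws IOException {
        return new BufferedReader(new InputStreamReader(skipUtf8Bom(raw), StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        byte[] withBom = {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF, 'm', 'f', 'g'};
        try (BufferedReader reader = openReader(new ByteArrayInputStream(withBom))) {
            System.out.println(reader.readLine()); // mfg
        }
    }
}
```

Without this step, the BOM would be decoded as U+FEFF and glued onto the first header name (\uFEFFmfg instead of mfg), so a later header lookup for "mfg" would silently fail.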
Phase 3: Parsing & Grouping (createDataRows)
This is the most memory-intensive phase. The system reads the entire file into memory before executing any business logic.
- Determine Grouping Strategy: It reads import.csv.multithreading.groupByField from import.properties. If the property is missing, it defaults to "mfg" (Manufacturer).
- Process Header (processHeaderRecord):
  - Reads the first row.
  - Calls DataImportUtil.getCSVHeader, which maps the raw CSV headers to the expected service parameters of the target business logic wrapper (e.g., importMemberProductWrapper).
  - Creates arrays that track exactly which column index corresponds to which service parameter (relevantFieldsPosition).
- Iterate Rows (createDataRow):
  - For every subsequent row, it trims whitespace.
  - It creates a DataRow object.
  - It injects generic parameters: partdomain (if wdPartyId was provided), userLogin, and lastImportedDate.
- Group the Rows:
  - It checks the value of the groupByField (e.g., the Manufacturer ID) for the current row.
  - It places the DataRow into a DataRowGroup inside the dataRowGroups LinkedHashMap.
  - Why? To ensure that rows belonging to the same manufacturer are processed sequentially by a single thread, avoiding database lock contention on shared entities (such as the manufacturer's Party record).
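The header mapping and grouping steps can be sketched with standard collections. In this sketch, rows are plain maps standing in for DataRow, a List stands in for DataRowGroup, and the injected generic parameters (userLogin, lastImportedDate, and so on) are omitted; the RowGrouper class and its methods are hypothetical. Only the property key and the "mfg" default come from the description above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/** Hypothetical sketch of header mapping plus grouping rows by one column. */
final class RowGrouper {

    static String groupByField(Properties importProps) {
        // Mirrors the documented default: fall back to "mfg" when the property is absent.
        return importProps.getProperty("import.csv.multithreading.groupByField", "mfg");
    }

    /** Maps each header name to its column index (a simplified relevantFieldsPosition). */
    static Map<String, Integer> headerPositions(String[] header) {
        Map<String, Integer> positions = new HashMap<>();
        for (int i = 0; i < header.length; i++) {
            positions.put(header[i].trim(), i);
        }
        return positions;
    }

    /** Groups rows by the value of groupField, keeping groups in first-seen order. */
    static LinkedHashMap<String, List<Map<String, String>>> group(
            String[] header, List<String[]> records, String groupField) {
        Map<String, Integer> positions = headerPositions(header);
        Integer groupIndex = positions.get(groupField);
        LinkedHashMap<String, List<Map<String, String>>> groups = new LinkedHashMap<>();
        for (String[] record : records) {
            Map<String, String> row = new HashMap<>();
            for (Map.Entry<String, Integer> e : positions.entrySet()) {
                int idx = e.getValue();
                row.put(e.getKey(), idx < record.length ? record[idx].trim() : "");
            }
            // Rows without a group value share the "" group (assumption).
            String key = (groupIndex != null && groupIndex < record.length)
                    ? record[groupIndex].trim() : "";
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }
        return groups;
    }

    public static void main(String[] args) {
        String[] header = {"mfg", "partNumber"};
        List<String[]> records = List.of(
                new String[] {"ACME", "P-1"},
                new String[] {"BOLT", "P-2"},
                new String[] {"ACME", "P-3"});
        LinkedHashMap<String, List<Map<String, String>>> groups =
                group(header, records, groupByField(new Properties()));
        System.out.println(groups.keySet());           // [ACME, BOLT]
        System.out.println(groups.get("ACME").size()); // 2
    }
}
```

Because LinkedHashMap preserves insertion order, groups are later submitted in the order their first row appeared in the file, and every row for one manufacturer ends up in the same list.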
Phase 4: Multi-Threaded Execution (importDataRowsWithMultiThreading)
The core parallel processing engine.
- Initialize Thread Pool:
  - Reads import.csv.multithreading.rampUpCount (default: 20).
  - Creates a ThreadPoolExecutor with a core and max pool size of 20, a 10-minute timeout, and a custom ThreadFactory that names threads DataImport-{logId}-{increment}.
  - Wraps it in an ExecutorCompletionService so that completed tasks can be retrieved as they finish.
- Submit Tasks:
  - Iterates through each DataRowGroup in the dataRowGroups map.
  - Creates a DataRowGroupProcessor (which implements Callable).
  - Submits the task.
  - Ramp-up Delay: It explicitly calls Thread.sleep(100) between the first 20 submissions to prevent "slamming" the database with 20 simultaneous connection requests at once.
- Execution (DataRowGroupProcessor.processDataRowGroup):
  - The thread takes its DataRowGroup and iterates through the rows sequentially.
  - For each row, it calls the underlying business service (e.g., importMemberProduct) via dispatcher.runSync(...).
  - If the service fails, it catches the error, marks the DataRow as an error, and extracts flags such as mandatoryFieldMissing.
  - If the service succeeds, it extracts flags such as isNewProduct, isBrandMapExist, etc.
- Collect Results:
  - The main thread uses a for loop to call completionService.take().get(), blocking until each group completes.
  - Cancellation Check: In every loop iteration, it queries the DataManagerLog table. If the status is SERVICE_CANCELLED, it immediately shuts down the thread pool (shutdownNow) and breaks out of the loop.
  - It tallies the results (processedRecords, succeedRecords, errorSize, etc.).
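A condensed sketch of the submit, ramp-up, collect, and cancel loop using java.util.concurrent. The GroupExecutor class is hypothetical; the BooleanSupplier isCancelled stands in for the DataManagerLog status query, the Callable tasks stand in for DataRowGroupProcessor, and treating the 10-minute value as the pool's keep-alive time is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of the multithreaded import loop. */
final class GroupExecutor {

    /** Names threads DataImport-{logId}-{n}, following the naming scheme described above. */
    static ThreadFactory namedFactory(String logId) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> new Thread(runnable, "DataImport-" + logId + "-" + counter.getAndIncrement());
    }

    /** Runs one task per row group and sums the row counts they report. */
    static int runAll(List<Callable<Integer>> groupTasks, String logId, int poolSize,
                      int rampUpCount, BooleanSupplier isCancelled)
            throws InterruptedException, ExecutionException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 10, TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(), namedFactory(logId));
        CompletionService<Integer> completion = new ExecutorCompletionService<>(pool);
        int total = 0;
        try {
            for (int i = 0; i < groupTasks.size(); i++) {
                completion.submit(groupTasks.get(i));
                if (i < rampUpCount) {
                    Thread.sleep(100); // ramp-up: stagger the first submissions
                }
            }
            for (int i = 0; i < groupTasks.size(); i++) {
                if (isCancelled.getAsBoolean()) {
                    pool.shutdownNow(); // interrupt running groups, drop queued ones
                    break;
                }
                total += completion.take().get(); // blocks until the next group finishes
            }
        } finally {
            pool.shutdown();
        }
        return total;
    }

    public static void main(String[] args) throws Exception {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int g = 0; g < 5; g++) {
            final int rows = g + 1;
            tasks.add(() -> rows); // each "group" reports how many rows it processed
        }
        System.out.println(runAll(tasks, "LOG1", 3, 2, () -> false)); // 15
    }
}
```

ExecutorCompletionService.take() returns futures in completion order rather than submission order, so a slow group does not delay the tallying of groups that have already finished. Note that shutdownNow() is best-effort: a group in the middle of a row keeps running until it responds to the interrupt.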
Phase 5: Post-Processing & Result Generation (processImport - Part 2)
After all threads finish or are cancelled, the system wraps up.
- Empty File Check: If processedRecords == 0, it triggers sendMemberProductFailureNotification via email and aborts.
- Error File Generation (generateErrorFile):
  - If any rows failed (errorRecords.size() > 1), it generates a new CSV named Error_Records_CSV_{jobName}.csv.
  - It writes the failed rows, appending the specific error message as the last column.
  - It creates a new OFBiz Content and DataResource record for this file.
  - It updates the DataManagerLog with the errorRecordContentId so users can download the file from the UI.
  - It triggers an email notification.
- Unmapped File Generation (generateUnmappedRecordsFile):
  - Similar to the error file: if any rows were flagged as "unmapped", it generates Unmapped_Records_{jobName}.csv and attaches it to the log.
- Scorecard Generation (createDataManagerLogItems):
  - Calls the createDataManagerLogItems service to insert the final statistics (Total Items, Success, Error, New Parts, Missing Mandatory) into the database for UI display.
  - Updates the DataManagerLog status to FILE_SUCCESS, FILE_ERROR, or FILE_PART_SUCCESS.
- Cleanup: Clears the logging MDC context and returns the final success/error map to the caller.
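The status and error-file steps can be sketched as follows. The source names the three statuses but not the exact rule for choosing between them, so finalStatus encodes an assumed rule (no errors, no successes, or a mix). The errorRecords.size() > 1 check suggests that the error list carries a header row, so the sketch writes a header before the failed rows; the "Error" column title and the ImportOutcome class are hypothetical.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.List;

/** Hypothetical helpers for the post-processing step; the status rule is an assumption. */
final class ImportOutcome {

    static String finalStatus(int succeeded, int errors) {
        if (errors == 0) {
            return "FILE_SUCCESS";
        }
        return succeeded == 0 ? "FILE_ERROR" : "FILE_PART_SUCCESS";
    }

    static String errorFileName(String jobName) {
        return "Error_Records_CSV_" + jobName + ".csv";
    }

    /** Quotes a field when it contains the delimiter, a double quote, or a line break. */
    static String quote(String field, char delimiter) {
        boolean needsQuotes = field.indexOf(delimiter) >= 0 || field.indexOf('"') >= 0
                || field.indexOf('\n') >= 0 || field.indexOf('\r') >= 0;
        return needsQuotes ? "\"" + field.replace("\"", "\"\"") + "\"" : field;
    }

    /** Writes a header, then each failed row with its error message appended as the last column. */
    static void writeErrorRows(Writer out, List<String> header, List<List<String>> failedRows,
                               List<String> messages, char delimiter) throws IOException {
        writeLine(out, header, "Error", delimiter);
        for (int i = 0; i < failedRows.size(); i++) {
            writeLine(out, failedRows.get(i), messages.get(i), delimiter);
        }
    }

    private static void writeLine(Writer out, List<String> fields, String last, char delimiter)
            throws IOException {
        StringBuilder line = new StringBuilder();
        for (String field : fields) {
            line.append(quote(field, delimiter)).append(delimiter);
        }
        line.append(quote(last, delimiter)).append('\n');
        out.write(line.toString());
    }

    public static void main(String[] args) throws IOException {
        StringWriter out = new StringWriter();
        writeErrorRows(out, List.of("mfg", "partNumber"), List.of(List.of("ACME", "P-1")),
                List.of("Missing mandatory field: brand"), ',');
        System.out.print(out);
        // mfg,partNumber,Error
        // ACME,P-1,Missing mandatory field: brand
        System.out.println(errorFileName("job42")); // Error_Records_CSV_job42.csv
        System.out.println(finalStatus(9, 1));     // FILE_PART_SUCCESS
    }
}
```

Quoting the appended message matters because error text often contains commas, which would otherwise shift the columns when the user reopens the error file.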
Sub-Service Trace: Synchronous & Asynchronous Invocations
Throughout the execution of ImportDataFromCSV1, several sub-services are invoked to handle file processing, logging, and actual data persistence. Here is the detailed breakdown of these services:
1. memberProductCSVPreprocessor (Synchronous)
- Where it runs: Phase 2: Preprocessing (invoked by processImport()).
- Purpose: Filters out severely malformed rows before the main multithreaded engine attempts to parse them.
- Detailed Flow:
  - Reads the raw uploaded CSV file.
  - Iterates line by line, performing a basic structure check (splitting by "\t") to identify unparseable or incorrectly delimited lines.
  - Writes valid lines to a new "clean" CSV file.
  - Writes invalid lines to a new "Error" CSV file.
  - Invokes createContentFromUploadedFile (Sync) to register both the clean and error files in the system.
  - Invokes updateDataManagerLog (Sync) to attach these new files to the active DataManagerLog.
  - Returns the new uploadFileContentId representing the "clean" file to the main import process.
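A minimal sketch of the preprocessor's split, assuming the "basic structure check" compares each line's tab-separated column count against the header's. The PreprocessorSplit class, the column-count rule, dropping blank lines, and copying the header into both outputs are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the preprocessor's split: a line whose tab-separated
 * column count differs from the header's is treated as malformed (assumed rule).
 */
final class PreprocessorSplit {

    static final class Result {
        final List<String> clean = new ArrayList<>();
        final List<String> errors = new ArrayList<>();
    }

    static Result split(List<String> lines) {
        Result result = new Result();
        if (lines.isEmpty()) {
            return result;
        }
        String header = lines.get(0);
        int expectedColumns = header.split("\t", -1).length;
        // Both output files start with the header row (assumption).
        result.clean.add(header);
        result.errors.add(header);
        for (String line : lines.subList(1, lines.size())) {
            if (line.isBlank()) {
                continue; // blank lines are dropped rather than reported (assumption)
            }
            int columns = line.split("\t", -1).length; // limit -1 keeps trailing empty fields
            if (columns == expectedColumns) {
                result.clean.add(line);
            } else {
                result.errors.add(line);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("mfg\tpartNumber", "ACME\tP-1", "BROKEN ROW", "BOLT\tP-2");
        Result result = split(lines);
        System.out.println(result.clean.size());  // 3 (header + 2 rows)
        System.out.println(result.errors.size()); // 2 (header + 1 row)
    }
}
```

A plain split on \t ignores quoting, so a quoted field that contains a tab would be misclassified as malformed. A check like this only catches rows that are obviously broken, which fits the preprocessor's stated purpose of removing severely malformed rows before the real parser runs.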
2. createContentFromUploadedFile (Synchronous)
- Where it runs: Called by memberProductCSVPreprocessor, and later by ImportDataFromCSV1 when generating the final Error/Unmapped files.
- Purpose: Standardizes file tracking within the OFBiz framework.
- Detailed Flow: Creates a DataResource record (representing the physical file path/binary) and a Content record (metadata such as MIME type and file name). This allows the files to be downloaded from the AutoCore UI via a contentId.
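The Content and DataResource pairing, and the lookup Phase 2 performs to turn an uploadFileContentId into a file path, can be modeled with two in-memory maps. Field names follow the OFBiz data model as generally documented (Content.dataResourceId pointing at a DataResource whose objectInfo holds the file location); treat them as assumptions for this codebase. ContentFileResolver is hypothetical and does no database access.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** In-memory stand-in for the Content -> DataResource -> file path lookup (hypothetical). */
final class ContentFileResolver {
    // contentId -> dataResourceId (models Content.dataResourceId)
    private final Map<String, String> contentToDataResource = new HashMap<>();
    // dataResourceId -> stored file location (models DataResource.objectInfo)
    private final Map<String, String> dataResourceToObjectInfo = new HashMap<>();

    /** Registers a file the way createContentFromUploadedFile conceptually does: one DataResource, one Content. */
    void register(String contentId, String dataResourceId, String filePath) {
        dataResourceToObjectInfo.put(dataResourceId, filePath);
        contentToDataResource.put(contentId, dataResourceId);
    }

    /** Resolves a contentId (e.g. an uploadFileContentId) to an absolute path if both records exist. */
    Optional<Path> resolve(String contentId) {
        return Optional.ofNullable(contentToDataResource.get(contentId))
                .map(dataResourceToObjectInfo::get)
                .map(location -> Paths.get(location).toAbsolutePath());
    }

    public static void main(String[] args) {
        ContentFileResolver resolver = new ContentFileResolver();
        resolver.register("10000", "DR10000", "/tmp/uploads/clean.csv");
        System.out.println(resolver.resolve("10000").isPresent());   // true
        System.out.println(resolver.resolve("missing").isPresent()); // false
    }
}
```

Keeping the file location on the DataResource and exposing only the contentId to the UI is what lets the same download link work for the clean file, the error file, and the unmapped file.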
3. updateDataManagerLog (Synchronous)
- Where it runs: Called by memberProductCSVPreprocessor, and by ImportDataFromCSV1 during post-processing.
- Purpose: Updates the main DataManagerLog entry. For example, it updates the log with the new uploadFileContentId (the clean file) and exportFileContentId (the error CSV file) so that the user interface correctly links to the generated error reports.
4. Business Wrapper Service (e.g., importMemberProductWrapper) (Synchronous within Async Thread)
- Where it runs: Inside the DataRowGroupProcessor (Phase 4).
- Purpose: Performs the actual data mapping and database insertion for a single CSV row.
- Detailed Flow:
  - The ImportDataFromCSV1 main thread hands its row groups to multiple worker threads managed by a ThreadPoolExecutor.
  - Inside each worker thread, the DataRowGroupProcessor iterates over its assigned rows and calls dispatcher.runSync(this.modelService.getName(), ...) synchronously.
  - As a result, rows in different groups are processed in parallel across threads, while within a single thread's group each row waits for its database transaction to commit or roll back before the next row begins.
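The per-group behaviour can be sketched as a Callable that walks its rows one at a time. Here a Function stands in for dispatcher.runSync(serviceName, context), failure is modeled as a thrown exception, and only the isNewProduct flag is read; the GroupProcessor and Tally names are hypothetical. In OFBiz, runSync can also return an error result map without throwing, so the real processor presumably checks the returned map as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.function.Function;

/** Hypothetical sketch of a DataRowGroupProcessor-style task. */
final class GroupProcessor implements Callable<GroupProcessor.Tally> {

    static final class Tally {
        int succeeded;
        int failed;
        int newProducts;
        final List<String> errorMessages = new ArrayList<>();
    }

    private final List<Map<String, Object>> rows;
    // Stand-in for dispatcher.runSync(serviceName, context) (assumption).
    private final Function<Map<String, Object>, Map<String, Object>> service;

    GroupProcessor(List<Map<String, Object>> rows,
                   Function<Map<String, Object>, Map<String, Object>> service) {
        this.rows = rows;
        this.service = service;
    }

    @Override
    public Tally call() {
        Tally tally = new Tally();
        for (Map<String, Object> row : rows) { // strictly sequential within one group
            try {
                Map<String, Object> result = service.apply(row); // returns only after this row is done
                tally.succeeded++;
                if (Boolean.TRUE.equals(result.get("isNewProduct"))) {
                    tally.newProducts++;
                }
            } catch (RuntimeException e) {
                // A failed row is recorded and the group moves on to the next row.
                tally.failed++;
                tally.errorMessages.add(e.getMessage());
            }
        }
        return tally;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = List.of(
                Map.<String, Object>of("partNumber", "P-1"),
                Map.<String, Object>of("partNumber", ""),
                Map.<String, Object>of("partNumber", "P-3"));
        GroupProcessor processor = new GroupProcessor(rows, row -> {
            if (((String) row.get("partNumber")).isEmpty()) {
                throw new IllegalArgumentException("mandatory field missing: partNumber");
            }
            return Map.<String, Object>of("isNewProduct", Boolean.TRUE);
        });
        Tally tally = processor.call();
        System.out.println(tally.succeeded + " ok, " + tally.failed + " failed"); // 2 ok, 1 failed
    }
}
```

Catching per row rather than per group is what lets one bad row land in the error file without aborting the rest of that manufacturer's rows.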
5. sendMemberProductFailureNotification (Synchronous)
- Where it runs: Post-processing (Phase 5) or upon immediate failure (e.g., empty file).
- Purpose: Triggers an email notification to the administrator or user indicating that the import job failed completely or produced a significant number of error records.
6. createDataManagerLogItems (Synchronous)
- Where it runs: At the very end of Phase 5.
- Purpose: Scorecard generation.
- Detailed Flow: Persists the granular import statistics (totalRecordSize, succeedRecordSize, errorRecordSize, newPartProcessedCount, etc.) to the DataManagerLogItem entity, which powers the dashboard analytics for the import job.
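The scorecard is essentially a set of counters turned into a service context. A minimal sketch using the four statistic names listed above; the ImportScorecard class is hypothetical, and the real payload carries further fields (such as the missing-mandatory count) whose names are not given here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the counters behind createDataManagerLogItems. */
final class ImportScorecard {
    private int total;
    private int succeeded;
    private int errors;
    private int newParts;

    void recordSuccess(boolean isNewPart) {
        total++;
        succeeded++;
        if (isNewPart) {
            newParts++;
        }
    }

    void recordError() {
        total++;
        errors++;
    }

    /** Builds the statistics map using the field names listed above (other fields omitted). */
    Map<String, Object> toServiceContext() {
        Map<String, Object> ctx = new LinkedHashMap<>();
        ctx.put("totalRecordSize", total);
        ctx.put("succeedRecordSize", succeeded);
        ctx.put("errorRecordSize", errors);
        ctx.put("newPartProcessedCount", newParts);
        return ctx;
    }

    public static void main(String[] args) {
        ImportScorecard card = new ImportScorecard();
        card.recordSuccess(true);
        card.recordSuccess(false);
        card.recordError();
        System.out.println(card.toServiceContext());
        // {totalRecordSize=3, succeedRecordSize=2, errorRecordSize=1, newPartProcessedCount=1}
    }
}
```

Since the tallying described in Phase 4 happens on the main thread as each group completes, plain int counters suffice here; counters updated from the worker threads would need AtomicInteger or similar.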