Backend Architecture · Async Processing · Task Queue

Large-Scale Data Import/Export in Enterprise Systems: From Synchronous to Asynchronous Architecture

Sharing our practical experience in handling large-scale Excel import/export in enterprise management systems, from synchronous blocking to asynchronous queue solutions.

Author: ekent · Published on February 7, 2026

When developing management systems for small and medium enterprises, we often encounter scenarios where users need to import thousands or even tens of thousands of records, or export large reports. If not handled properly, the system will experience request timeouts and frozen pages. Today, I'll share how we solved this problem in our real projects.

The Problem: Synchronous Processing Dilemma

Imagine you're developing an enterprise management system where users need to bulk import product data:

// ❌ Traditional synchronous approach
@Post('import')
async importProducts(@UploadedFile() file: Express.Multer.File) {
  const workbook = new ExcelJS.Workbook();
  await workbook.xlsx.load(file.buffer);  // Load the Excel file (xlsx.load accepts a buffer)

  const worksheet = workbook.getWorksheet(1);
  const rows = worksheet.getRows(2, worksheet.rowCount - 1) ?? [];  // Skip the header row

  // Process all data directly in the request
  for (const row of rows) {
    await this.productService.create({...});  // Insert row by row
  }

  return { success: true, count: rows.length };
}

Problems with this approach:

  1. HTTP Timeout: Processing 5,000 records can take 2-3 minutes, far beyond typical browser and gateway timeouts
  2. Poor UX: The page freezes and users get no progress feedback
  3. Server Pressure: A single request holds a worker and database connections for minutes
  4. No Traceability: After a failure, there is no record of which rows succeeded and which failed

Solution: Asynchronous Task Queue

Our solution is to introduce Bull + Redis asynchronous task queue:

Architecture Design

User uploads file
    ↓
API uploads the file to an OSS private bucket
    ↓
API creates a task record (returns the task ID immediately)
    ↓
Task enters Redis queue
    ↓
Worker process handles asynchronously
    ↓
Results written to database + error file uploaded to OSS
    ↓
User queries progress and results via task ID
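Before any of this runs, the queue itself has to be wired up. The article assumes Bull behind NestJS; a minimal registration sketch (module name and Redis settings here are illustrative, not from our actual config) might look like:

// Minimal sketch: registering the queue with @nestjs/bull (names are illustrative)
import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';

@Module({
  imports: [
    // Redis backs the queue; host/port would normally come from configuration
    BullModule.forRoot({
      redis: { host: 'localhost', port: 6379 },
    }),
    // The 'import-export' name must match the @Processor() decorator on the worker
    BullModule.registerQueue({ name: 'import-export' }),
  ],
})
export class TaskModule {}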

Core Implementation

1. Task Creation (Responds in Seconds)

// ✅ Asynchronous approach
@Post('import')
async importProducts(
  @UploadedFile() file: Express.Multer.File,
  @Req() req: Request,  // Needed below for req.user.id
) {
  // 1. Upload file to OSS private bucket
  const fileUrl = await this.uploadService.uploadPrivate(
    file.buffer,
    'imports/products'
  );

  // 2. Create task record
  const task = await this.taskService.create({
    taskNo: this.generateTaskNo(),      // T[timestamp][random]
    taskType: TaskType.IMPORT,
    businessType: 'PRODUCT_IMPORT',
    status: TaskStatus.PENDING,
    fileUrl,
    userId: req.user.id,
    expiredAt: addDays(new Date(), 7),  // Auto-cleanup after 7 days
  });

  // 3. Add to queue (non-blocking)
  await this.importQueue.add('import', {
    taskId: task.id,
    fileUrl,
  });

  // 4. Return task ID immediately
  return {
    taskId: task.id,
    taskNo: task.taskNo,
    message: 'Import task created, processing in background'
  };
}

2. Worker Asynchronous Processing

@Processor('import-export')
export class ImportExportProcessor {

  @Process({ name: 'import', concurrency: 2 })  // Concurrency set to 2
  async handleImport(job: Job) {
    const { taskId, fileUrl } = job.data;

    try {
      // Update task status to "PROCESSING"
      await this.taskService.update(taskId, {
        status: TaskStatus.PROCESSING,
        startedAt: new Date(),
      });

      // 1. Download file
      const fileBuffer = await this.downloadFile(fileUrl);

      // 2. Parse Excel
      const workbook = new ExcelJS.Workbook();
      await workbook.xlsx.read(Readable.from(fileBuffer));  // xlsx.read accepts a stream
      const worksheet = workbook.getWorksheet(1);
      const rows = worksheet.getRows(2, worksheet.rowCount - 1) ?? [];  // Skip header

      // 3. Batch process data
      let successCount = 0;
      let failCount = 0;
      const errors = [];

      for (const [index, row] of rows.entries()) {
        try {
          await this.productService.create({
            name: row.getCell(1).value,
            price: row.getCell(2).value,
            // ...
          });
          successCount++;

          // Real-time progress update
          if (index % 100 === 0) {
            await this.taskService.updateProgress(taskId, {
              totalCount: rows.length,
              successCount,
              failCount,
            });
          }
        } catch (error) {
          failCount++;
          errors.push({
            row: index + 2,
            data: row.values,
            error: error.message,
          });
        }
      }

      // 4. Generate error report (if errors exist)
      let errorFileUrl = null;
      if (errors.length > 0) {
        errorFileUrl = await this.generateErrorReport(errors);
      }

      // 5. Update task to completed status
      await this.taskService.update(taskId, {
        status: TaskStatus.SUCCESS,
        finishedAt: new Date(),
        totalCount: rows.length,
        successCount,
        failCount,
        errorFileUrl,
      });

    } catch (error) {
      // 6. Handle failure
      await this.taskService.update(taskId, {
        status: TaskStatus.FAILED,
        finishedAt: new Date(),
        errorMessage: error.message,
      });
      throw error;
    }
  }

  @OnQueueActive()
  onActive(job: Job) {
    console.log(`Task started: ${job.id}`);
  }

  @OnQueueCompleted()
  onCompleted(job: Job) {
    console.log(`Task completed: ${job.id}`);
  }

  @OnQueueFailed()
  onFailed(job: Job, error: Error) {
    console.error(`Task failed: ${job.id}`, error);
  }
}
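The @OnQueueFailed handler above only logs failures. Bull can also retry failed jobs automatically via job options passed at enqueue time; a hedged sketch (the attempt counts and delays are illustrative choices, not values from our production config):

// Illustrative: retry a failed import job with exponential backoff (Bull job options)
await this.importQueue.add(
  'import',
  { taskId: task.id, fileUrl },
  {
    attempts: 3,                                   // Retry up to 3 times before giving up
    backoff: { type: 'exponential', delay: 5000 }, // 5s, 10s, 20s between attempts
    removeOnComplete: true,                        // Keep Redis tidy after success
  },
);

Note that retries only make sense if the handler is idempotent; with row-by-row inserts, a retry would re-insert rows that already succeeded unless deduplication is in place.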

3. Task Status Query

@Get('tasks/:taskId')
async getTaskStatus(@Param('taskId') taskId: string) {
  const task = await this.taskService.findOne(taskId);

  return {
    taskNo: task.taskNo,
    status: task.status,           // PENDING / PROCESSING / SUCCESS / FAILED
    progress: {
      totalCount: task.totalCount,
      successCount: task.successCount,
      failCount: task.failCount,
      percentage: task.totalCount > 0
        ? Math.floor((task.successCount + task.failCount) / task.totalCount * 100)
        : 0
    },
    errorFileUrl: task.errorFileUrl,  // Error details file
    startedAt: task.startedAt,
    finishedAt: task.finishedAt,
  };
}

Frontend Progress Polling

// Frontend code example
async function uploadAndTrack(file: File) {
  // 1. Upload file and create task (the file must be sent as multipart/form-data)
  const formData = new FormData();
  formData.append('file', file);
  const { taskId, taskNo } = await api.post('/import', formData);

  message.info(`Task ${taskNo} created, processing...`);

  // 2. Poll task status
  const interval = setInterval(async () => {
    const task = await api.get(`/tasks/${taskId}`);

    // Update progress bar
    setProgress(task.progress.percentage);

    if (task.status === 'SUCCESS') {
      clearInterval(interval);
      message.success(`Import completed! ${task.progress.successCount} records succeeded`);

      // Prompt to download error report if failures exist
      if (task.progress.failCount > 0) {
        Modal.confirm({
          title: `${task.progress.failCount} records failed to import`,
          content: 'Download error details?',
          onOk: () => window.open(task.errorFileUrl),
        });
      }
    } else if (task.status === 'FAILED') {
      clearInterval(interval);
      message.error('Import failed: ' + task.errorMessage);
    }
  }, 2000);  // Query every 2 seconds
}
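One weakness of the setInterval loop above: if the task never reaches a terminal state, the client polls forever. A small helper that bounds the number of attempts is one way to guard against this; the pollTask name and fetcher signature below are illustrative, not part of the original code:

```typescript
// Hypothetical helper: poll until the task reaches a terminal state or attempts run out.
type TaskStatus = 'PENDING' | 'PROCESSING' | 'SUCCESS' | 'FAILED';

interface TaskSnapshot {
  status: TaskStatus;
}

async function pollTask<T extends TaskSnapshot>(
  fetchTask: () => Promise<T>,   // e.g. () => api.get(`/tasks/${taskId}`)
  intervalMs = 2000,
  maxAttempts = 150,             // ~5 minutes at 2-second intervals
): Promise<T> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const task = await fetchTask();
    if (task.status === 'SUCCESS' || task.status === 'FAILED') {
      return task;  // Terminal state reached; caller inspects status
    }
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error('Polling timed out before the task finished');
}
```

The caller then handles SUCCESS and FAILED in one place, and a thrown timeout error surfaces stuck tasks instead of silently polling forever.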

Key Technical Points

1. Task Number Generation

To help users track tasks easily, we generate human-friendly task numbers:

generateTaskNo(): string {
  // T + Timestamp(Base36) + 4-byte random
  const timestamp = Date.now().toString(36).toUpperCase();
  const random = crypto.randomBytes(4).toString('hex').toUpperCase();
  return `T${timestamp}${random}`;
}

// Output shape: T + base36 timestamp (~8 chars) + 8 hex chars, e.g. TMF0K2XQ1A2B3C4D

2. File Lifecycle Management

@Cron(CronExpression.EVERY_DAY_AT_2AM)
async cleanupExpiredTasks() {
  // Cleanup expired tasks (default 7 days)
  const expiredTasks = await this.taskRepository.find({
    where: {
      expiredAt: LessThan(new Date()),
      status: In([TaskStatus.SUCCESS, TaskStatus.FAILED]),
    },
  });

  for (const task of expiredTasks) {
    // Delete OSS files
    if (task.fileUrl) {
      await this.uploadService.deleteFile(task.fileUrl);
    }
    if (task.errorFileUrl) {
      await this.uploadService.deleteFile(task.errorFileUrl);
    }

    // Delete task record
    await this.taskRepository.remove(task);
  }

  console.log(`Cleaned up ${expiredTasks.length} expired tasks`);
}

3. Error Feedback Mechanism

When import fails, generate detailed error reports:

async generateErrorReport(errors: ErrorItem[]): Promise<string> {
  const workbook = new ExcelJS.Workbook();
  const sheet = workbook.addWorksheet('Import Errors');

  // Add header
  sheet.columns = [
    { header: 'Row', key: 'row', width: 10 },
    { header: 'Original Data', key: 'data', width: 50 },
    { header: 'Error Reason', key: 'error', width: 30 },
  ];

  // Add error data
  errors.forEach(err => {
    sheet.addRow({
      row: err.row,
      data: JSON.stringify(err.data),
      error: err.error,
    });
  });

  // Upload to OSS
  const buffer = await workbook.xlsx.writeBuffer();
  return await this.uploadService.uploadPrivate(
    buffer,
    `errors/import-errors-${Date.now()}.xlsx`  // Unique name so concurrent tasks don't overwrite each other
  );
}

Performance Optimization Tips

1. Batch Insert

// ❌ Insert one by one (slow)
for (const row of rows) {
  await this.productRepository.insert(row);
}

// ✅ Batch insert (10x faster)
const batchSize = 500;
for (let i = 0; i < rows.length; i += batchSize) {
  const batch = rows.slice(i, i + batchSize);
  await this.productRepository.insert(batch);
}

2. Concurrency Control

@Process({ name: 'import', concurrency: 2 })  // Limit concurrency

// Reason:
// - Too high: Database connection pool exhausted
// - Too low: Resource waste
// - Recommended: Based on DB connection pool size (typically 2-4)

3. Memory Optimization

// ✅ Stream processing for large files
// Note: workbook.xlsx.read(stream) still builds the full workbook in memory.
// For truly large files, ExcelJS's streaming reader parses one row at a time:
const reader = new ExcelJS.stream.xlsx.WorkbookReader(Readable.from(fileBuffer), {});
for await (const worksheet of reader) {
  for await (const row of worksheet) {
    // Handle each row as it is parsed; memory stays flat regardless of file size
  }
}
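Streaming pairs naturally with the batch-insert tip above: consume rows one at a time, but still write them in bulk. A generic sketch (the inBatches helper is illustrative, not from the original code) that groups any row stream into fixed-size batches:

```typescript
// Generic sketch: group an (async) iterable of rows into fixed-size batches.
async function* inBatches<T>(
  rows: AsyncIterable<T> | Iterable<T>,
  batchSize: number,
): AsyncGenerator<T[]> {
  let batch: T[] = [];
  for await (const row of rows) {
    batch.push(row);
    if (batch.length >= batchSize) {
      yield batch;  // Hand off a full batch, e.g. to repository.insert(batch)
      batch = [];
    }
  }
  if (batch.length > 0) yield batch;  // Flush the final partial batch
}
```

This keeps memory bounded by batchSize rather than by file size, while preserving the bulk-insert speedup.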

Solution Comparison

Dimension        | Synchronous              | Asynchronous Queue
-----------------|--------------------------|----------------------------------
Response Speed   | Slow (2-3 min)           | Fast (<1 sec)
User Experience  | Poor (frozen page)       | Good (real-time progress)
Error Handling   | All-or-nothing           | Partial failures traceable
Server Pressure  | High (blocking requests) | Low (async processing)
Scalability      | Poor                     | Good (horizontal Worker scaling)

Summary

For large-scale import/export in enterprise management systems, asynchronous task queues are the best practice:

  1. Great UX: Responds in seconds, real-time progress, non-blocking
  2. Strong Traceability: Task numbers, detailed logs, error reports
  3. Controlled Performance: Concurrency limits, batch operations, scheduled cleanup
  4. Good Scalability: Horizontal Worker node expansion

We've applied this solution in multiple enterprise projects with excellent results. If your system has similar requirements, feel free to refer to this architecture.


About the Author: ekent, Technical Lead at ek Studio, specializing in enterprise management system development and architecture design.