Import Pipeline

The import pipeline provides a generic, reusable mechanism for uploading CSV or XLSX files and turning them into database records. Each domain entity supplies column schemas, field descriptors, and four callbacks; the pipeline handles orchestration, header validation, error aggregation, and reporting.


1. Overview — 6-Stage Flow

parse file → validate headers → load lookup data → validate rows → check duplicates → create records
| Stage | Responsibility |
| --- | --- |
| parse file | Parse the CSV or XLSX buffer into raw Record<string, string>[] rows. Rejects empty files and files exceeding 10,000 rows. |
| validate headers | Confirm that all required columns are present. Missing required columns produce a HEADERS_MISSING error immediately — no row processing occurs. |
| load lookup data | Fetch any tenant-specific reference data needed for row validation (grades, departments, email sets, etc.). |
| validate rows | Run the declarative field-descriptor pass, then entity-specific hooks. Collects all errors before reporting. |
| check duplicates | Compare incoming rows against existing records in the target academic year. Marks duplicate rows as skipped. |
| create records | Insert non-duplicate rows inside a prisma.$transaction(). |
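The stages compose linearly: each one either fails the whole import or feeds the next. A minimal sketch of the orchestration, with stub callback shapes that are illustrative assumptions rather than the real runImportPipeline internals:

```typescript
// Illustrative sketch only — the real pipeline lives in
// src/common/services/import-pipeline.ts and differs in detail.
type Row = Record<string, string>;

interface StageCallbacks<TLookup> {
  loadLookupData: (tenantId: string) => Promise<TLookup>;
  validateRows: (rows: Row[], lookup: TLookup) => Promise<string[]>; // error keys
  checkDuplicates: (rows: Row[], tenantId: string) => Promise<boolean[]>;
  createRecords: (rows: Row[], skip: boolean[], lookup: TLookup, tenantId: string) => Promise<number>;
}

async function runSketch<TLookup>(
  rows: Row[],              // stage 1 (file parsing) assumed already done
  headers: string[],
  required: string[],
  tenantId: string,
  cb: StageCallbacks<TLookup>,
): Promise<{ created: number; skipped: number }> {
  // Stage 2: header validation — fail fast, no row processing on failure
  const missing = required.filter((c) => !headers.includes(c));
  if (missing.length) throw new Error(`HEADERS_MISSING: ${missing.join(', ')}`);

  // Stage 3: tenant-scoped reference data
  const lookup = await cb.loadLookupData(tenantId);

  // Stage 4: collect ALL row errors before reporting
  const errors = await cb.validateRows(rows, lookup);
  if (errors.length) throw new Error(`IMPORT_VALIDATION_FAILED: ${errors.length} error(s)`);

  // Stage 5: duplicates are marked for skipping, not rejected
  const dupFlags = await cb.checkDuplicates(rows, tenantId);

  // Stage 6: insert non-duplicates (the real pipeline wraps this in prisma.$transaction)
  const created = await cb.createRecords(rows, dupFlags, lookup, tenantId);
  return { created, skipped: dupFlags.filter(Boolean).length };
}
```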

2. Using runImportPipeline

runImportPipeline is the sole orchestrator. Entity services call it from their importFromFile() method after performing two pre-flight checks:

  1. Resolve the target academic year (fall back to the tenant's ACTIVE year if none is supplied).
  2. Assert the year is writable — the base service does this automatically for single-record CRUD, but import pipelines must call it explicitly.
async importFromFile(
  buffer: Buffer | Uint8Array,
  mimetype: string,
  tenantId: string,
  academicYearId?: string,
): Promise<EntityImportSuccessDto> {
  // Pre-flight: resolve + guard year before entering the pipeline
  const yearId = academicYearId ?? await this.resolveActiveYear(tenantId);
  await this.assertYearWritable(tenantId, yearId);   // must call explicitly

  return runImportPipeline<ParsedRow, ImportLookupData, EntityImportSuccessDto>(
    buffer,
    mimetype,
    tenantId,
    { allColumns: ALL_COLUMNS, requiredColumns: REQUIRED_COLUMNS },
    {
      loadLookupData: (tid) => this.loadLookupData(tid, yearId),
      validateRows:   (rows, lookup, agg) => this.validateRows(rows, lookup, agg),
      checkDuplicates:(rows, tid) => checkImportDuplicates(rows, { ... }),
      createRecords:  (rows, flags, lookup, tid) =>
        this.createImportRecords(rows, flags, lookup, tid, yearId),
    },
  );
}

The three generic type parameters are:

  • TRow — the typed row shape (a Record<ColumnName, string> derived from ImportColumnName)
  • TLookup — the lookup data interface populated by loadLookupData
  • TResult — the success DTO returned to the controller

Canonical implementation: src/students/students.service.ts → importFromFile()
Pipeline source: src/common/services/import-pipeline.ts


3. Import Field Descriptors

Declarative schema

Field descriptors live in src/<domain>/constants/import-schema.ts. Each entry declares the CSV column name, an ordered list of validation rules, and optional allowed values for error messages.

// src/students/constants/import-schema.ts
export const STUDENT_IMPORT_FIELDS: ImportFieldDescriptor[] = [
  {
    column: 'first_name',
    rules: [{ type: 'required' }, { type: 'maxLength', max: 100 }],
  },
  {
    column: 'date_of_birth',
    rules: [
      { type: 'required' },
      { type: 'date' },
      {
        type: 'custom',
        validate: (v) => {
          const d = parseImportDate(v);
          return d && d >= new Date() ? 'future' : null;
        },
      },
    ],
  },
  {
    column: 'gender',
    rules: [
      { type: 'custom', validate: (v) => (parseGender(v) === null ? 'invalid' : null) },
    ],
    allowedValues: () => Object.values(Gender),  // lazy — evaluated only on error
  },
  {
    column: 'nationality',
    rules: [
      { type: 'set', values: VALID_ISO_COUNTRIES, normalize: (v) => v.toUpperCase() },
    ],
  },
  {
    column: 'school_email',
    rules: [{ type: 'pattern', regex: EMAIL_REGEX, errorKey: 'invalid_email' }],
  },
];

Six rule types

| Rule type | Fields | Behaviour |
| --- | --- | --- |
| required | | Fails if the value is absent or blank |
| maxLength | max: number | Fails if the string exceeds max characters |
| date | | Fails if the value cannot be parsed as a valid date |
| pattern | regex: RegExp, errorKey?: string | Fails if the value does not match the regex; errorKey overrides the default error key |
| set | values: Set<string>, normalize?: fn | Fails if the (optionally normalised) value is not in the set |
| custom | validate: (v: string) => string \| null | Runs arbitrary logic; return an error key string on failure, null on success |
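The table above can be captured as a discriminated union. The following sketch of a per-field evaluation loop is an assumption about how the declarative pass might work, not the actual validateImportRows implementation; the error keys (required, maxlength, invalid) follow the standard registry described below:

```typescript
// Hypothetical evaluator for one field's rule list.
type Rule =
  | { type: 'required' }
  | { type: 'maxLength'; max: number }
  | { type: 'date' }
  | { type: 'pattern'; regex: RegExp; errorKey?: string }
  | { type: 'set'; values: Set<string>; normalize?: (v: string) => string }
  | { type: 'custom'; validate: (v: string) => string | null };

// Returns the first failing rule's error key, or null when all rules pass.
function evaluateRules(value: string | undefined, rules: Rule[]): string | null {
  const v = value?.trim() ?? '';
  for (const rule of rules) {
    switch (rule.type) {
      case 'required':
        if (!v) return 'required';
        break;
      case 'maxLength':
        if (v.length > rule.max) return 'maxlength';
        break;
      case 'date':
        if (v && Number.isNaN(Date.parse(v))) return 'invalid';
        break;
      case 'pattern':
        if (v && !rule.regex.test(v)) return rule.errorKey ?? 'invalid';
        break;
      case 'set': {
        const key = rule.normalize ? rule.normalize(v) : v;
        if (v && !rule.values.has(key)) return 'invalid';
        break;
      }
      case 'custom': {
        const err = v ? rule.validate(v) : null;
        if (err) return err;
        break;
      }
    }
  }
  return null;
}
```

Note that only required fails on a blank value; every other rule type passes empty input through, leaving optional columns optional.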

Deriving column lists

ALL_COLUMNS and REQUIRED_COLUMNS are derived automatically — no hand-maintained duplicates:

const { allColumns: ALL_COLUMNS, requiredColumns: REQUIRED_COLUMNS } =
  deriveImportColumns(STUDENT_IMPORT_FIELDS);

export { ALL_COLUMNS, REQUIRED_COLUMNS };

deriveImportColumns() treats any descriptor that contains a required rule as a required column.
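That derivation is straightforward; a sketch of the likely logic (an assumption, not the shipped helper):

```typescript
// Hypothetical re-implementation of deriveImportColumns().
interface FieldDescriptor {
  column: string;
  rules: Array<{ type: string }>;
}

function deriveColumns(fields: FieldDescriptor[]) {
  const allColumns = fields.map((f) => f.column);
  // A column is required iff its descriptor contains a 'required' rule.
  const requiredColumns = fields
    .filter((f) => f.rules.some((r) => r.type === 'required'))
    .map((f) => f.column);
  return { allColumns, requiredColumns };
}
```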

Error registries

Two registries map error keys to ImportErrorCode values:

  • STANDARD_CODE_REGISTRY — ships in src/common. Covers the generic keys produced by the declarative pass: required, maxlength, invalid, invalid_email, future.
  • <ENTITY>_EXTRA_REGISTRY — defined per entity in constants/import-schema.ts. Covers hook-only keys that the standard registry does not know about.

// src/students/constants/import-schema.ts
export const STUDENT_EXTRA_REGISTRY: CodeRegistry = {
  missing_pair: { code: ImportErrorCode.FIELD_INVALID },
  ambiguous:    { code: ImportErrorCode.FIELD_INVALID },
  duplicate:    { code: ImportErrorCode.FIELD_INVALID },
  'referent:required': { code: ImportErrorCode.FIELD_REQUIRED },
};

Merge them when resolving errors:

const errors = buildValidationResult(
  agg,
  { ...STANDARD_CODE_REGISTRY, ...STUDENT_EXTRA_REGISTRY },
  STUDENT_IMPORT_FIELDS,
);
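Because the entity registry is spread last, its entries win on any key collision with the standard registry. A small sketch of key resolution through the merged registry (the names here are illustrative, not the real buildValidationResult internals):

```typescript
// Hypothetical registry lookup; unmapped keys fail loudly rather than
// silently producing an uncoded error.
enum Code {
  FIELD_REQUIRED = 'FIELD_REQUIRED',
  FIELD_INVALID = 'FIELD_INVALID',
}
type Registry = Record<string, { code: Code }>;

const standard: Registry = {
  required: { code: Code.FIELD_REQUIRED },
  invalid: { code: Code.FIELD_INVALID },
};
const extra: Registry = {
  missing_pair: { code: Code.FIELD_INVALID },
};

function resolveCode(key: string, registry: Registry): Code {
  const entry = registry[key];
  if (!entry) throw new Error(`Unmapped import error key: ${key}`);
  return entry.code;
}

// Entity entries are spread last, so they take precedence.
const merged: Registry = { ...standard, ...extra };
```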

Hooks — entity-specific validation

The declarative pass runs first via validateImportRows(rows, IMPORT_FIELDS, agg). Entity-specific logic (lookup validation, cross-column rules, within-file uniqueness) runs afterwards in the validateRows callback:

validateRows: (rows, lookup, agg) => {
  // 1. Declarative pass
  validateImportRows(rows, STUDENT_IMPORT_FIELDS, agg);

  // 2. Hook: resolve (grade, department) pair. Department is required so
  //    grade names that exist in multiple departments resolve unambiguously.
  for (let i = 0; i < rows.length; i++) {
    const gradeVal = rows[i]['grade']?.trim();
    const deptVal = rows[i]['department']?.trim();
    if (!gradeVal || !deptVal) continue;
    const entries = lookup.gradeMap.get(gradeVal.toLowerCase());
    if (!entries) {
      agg.addError('grade:invalid', i);
    } else {
      const deptKey = deptVal.toLowerCase();
      if (!entries.some((e) => e.departmentName.toLowerCase() === deptKey)) {
        agg.addError('department:invalid', i);
      }
    }
  }

  // 3. Resolve + attach allowedValues
  const errors = buildValidationResult(
    agg,
    { ...STANDARD_CODE_REGISTRY, ...STUDENT_EXTRA_REGISTRY },
    STUDENT_IMPORT_FIELDS,
  );

  // 4. Dynamic allowedValues for hook-produced errors
  for (const e of errors) {
    if (e.code !== ImportErrorCode.FIELD_INVALID) continue;
    if (e.column === 'grade') e.allowedValues = lookup.allGradeNames;
    else if (e.column === 'department') e.allowedValues = lookup.allDepartmentNames;
  }

  return errors;
}

applyFieldAllowedValues (called inside buildValidationResult) attaches static or lazy allowedValues from descriptors automatically. Dynamic values that depend on tenant data (like the grade list) are attached manually after resolution.

Conventions

  • Error key format: {column}:{rule} — e.g. email:invalid_email, grade:invalid.
  • allowedValues accepts string[] (static) or () => string[] (lazy, evaluated only when an error occurs for that column).
  • CSV column naming: snake_case — first_name, date_of_birth, institutional_email.

4. Import Error Codes

ImportErrorCode is an enum in src/common/constants/:

| Code | Meaning |
| --- | --- |
| HEADERS_MISSING | One or more required columns are absent from the file header row |
| FIELD_REQUIRED | A required field is blank in a data row |
| FIELD_MAX_LENGTH | A field value exceeds the maximum allowed length |
| FIELD_INVALID | A field value fails pattern, set, date, or custom validation |

All import failures are surfaced as ErrorCode.IMPORT_VALIDATION_FAILED (HTTP 422) with a structured data.errors[] payload using ImportValidationError objects. Each error carries code, column, rows (compressed row list), optional params, and optional allowedValues.
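For orientation, a hypothetical error payload shaped like the description above. The envelope key names and the row-list encoding are assumptions for illustration only; consult the actual ImportValidationError type for the real shape:

```typescript
// Illustrative payload only — field names and the rows encoding are assumed.
const exampleResponse = {
  errorCode: 'IMPORT_VALIDATION_FAILED',       // HTTP 422
  data: {
    errors: [
      {
        code: 'FIELD_REQUIRED',
        column: 'first_name',
        rows: [3, 7, 8, 9],                    // compressed row list (encoding assumed)
      },
      {
        code: 'FIELD_INVALID',
        column: 'gender',
        rows: [12],
        allowedValues: ['MALE', 'FEMALE'],     // values illustrative
      },
    ],
  },
};
```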


5. Wiring an Import Route

Controller route

/** Import <entities> from CSV/XLSX file */
@Post('import')
@HttpCode(HttpStatus.OK)
@RequireAction(EntityKey.<ENTITY>, 'create')
@RequireRoles('admin')
@AggregateResponse()
@UseInterceptors(
  FileInterceptor('file', { limits: { fileSize: 10_485_760 } }),
)
@ApiImport<Entity>()
async import<Entity>(
  @UploadedFile() file: Express.Multer.File,
  @TenantId() tenantId: string,
  @Query('academicYearId') academicYearId?: string,
): Promise<<Entity>ImportSummaryDto> {
  if (!file) throw new BadRequestException('No file uploaded');
  return this.service.importFromFile(
    file.buffer,
    file.mimetype,
    tenantId,
    academicYearId,
  );
}

FileInterceptor is imported from @nestjs/platform-express. The import route reuses the create action — no separate action is needed.

@AggregateResponse() is mandatory on import routes, and the summary DTO must extend AggregateResponseDto. The summary has top-level keys count and items — neither of which is a scope name. Without the marker, FieldFilterInterceptor strips every key and non-platform-admin callers receive {}. The marker is security-sensitive: re-read the rules in chapter 04 → Interceptor Contract: Scope-Grouped vs Aggregate Responses before applying it, and re-audit whenever you enrich the summary with new fields. The interceptor runs a runtime assertion that throws in dev/test if an aggregate response ever grows a scope-grouped top-level key (e.g. sensitive) — don't disable that check.

@RequireRoles('admin') is the current role gate on import routes. Imports are bulk/destructive and restricted to tenant admins today; the list will widen to additional roles (e.g. 'secretary') once the product team signs off. The decorator takes OR semantics — add roles to the call list rather than stacking multiple decorators.

Shared envelope: ImportSummaryDtoOf

All three import endpoints — POST /students/import, /teachers/import, /staff/import — return the same envelope shape, defined once in src/common/dto/import-summary.dto.ts:

interface ImportSummary<T> {
  count: number;   // total records for this entity after the import
  items: T[];      // preview of up to 5 most recent records
}

Per-entity DTOs extend the factory-generated class and only vary in the item type:

export class StudentImportItemDto {
  @ApiProperty({ format: 'uuid' }) id: string;
  @ApiProperty() firstName: string;
  @ApiProperty() lastName: string;
  @ApiPropertyOptional({ nullable: true }) identificationCode: string | null;
  @ApiProperty() departmentName: string;
  @ApiPropertyOptional({ nullable: true }) gradeName: string | null;
}

export class StudentImportSummaryDto extends ImportSummaryDtoOf(
  StudentImportItemDto,
  'StudentImportSummaryDto',
) {}

The factory renames the generated class via Object.defineProperty(..., 'name', ...) so Swagger emits a distinct schema per entity; don't remove that or the three DTOs collapse into a single anonymous class in the OpenAPI output. The factory extends AggregateResponseDto, so subclasses automatically satisfy the @AggregateResponse() contract.
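The renaming trick matters because every invocation of a class factory otherwise produces classes with the same .name, which Swagger would merge into one schema. A minimal, stand-alone sketch of the technique (not the real ImportSummaryDtoOf, which also wires up decorators and extends AggregateResponseDto):

```typescript
// Hypothetical factory demonstrating the Object.defineProperty rename.
function summaryClassOf(name: string) {
  class Summary {
    count!: number;
    items!: unknown[];
  }
  // Function/class .name is configurable, so it can be overwritten; without
  // this, every generated class would be named 'Summary'.
  Object.defineProperty(Summary, 'name', { value: name });
  return Summary;
}

const StudentSummary = summaryClassOf('StudentImportSummaryDto');
const TeacherSummary = summaryClassOf('TeacherImportSummaryDto');
```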

When adding a new import endpoint, reuse the factory — don't re-invent an envelope. The only thing you should define per-entity is the <Entity>ImportItemDto class.

Steps

  1. src/<domain>/dto/<domain>-import.dto.ts — create <Entity>ImportSuccessDto with success: boolean, created: number, skipped: number.
  2. src/<domain>/<domain>.controller.ts — add the import route above. Declare it before any @Get(':id') or @Patch(':id') routes to avoid path collisions.
  3. src/<domain>/<domain>.service.ts — add importFromFile() calling runImportPipeline<>() with the four callbacks. Call assertYearWritable() explicitly before the pipeline. Scope all checkDuplicates queries to the target academicYearId. For person entities (students, teachers, staff), call resolvePersonUuid() inside createRecords to assign or reuse personUuid.
  4. src/<domain>/<domain>.swagger.ts — add ApiImport<Entity>() using the ApiImportEndpoint factory.
  5. src/<domain>/constants/import-schema.ts — define field descriptors (see §3).
  6. Update barrel exports in src/<domain>/index.ts.

Canonical examples: src/students/students.controller.ts, src/students/students.service.ts


6. Duplicate Detection

Duplicate detection is scoped to the target academic year only. The same personUuid is valid in multiple years — only same-year duplicates are blocked.

Single-record create

assertNoPeopleDuplicate() is called in the beforeCreate hook for single-record creation. It checks both identity match (name + date-of-birth) and email uniqueness within the target year:

await assertNoPeopleDuplicate(this.prisma, tenantId, yearId, {
  firstName, lastName, dateOfBirth, schoolEmail,
});

Import batch

checkImportDuplicates() handles bulk detection. It accepts separate rule objects for identity matching and email uniqueness, and returns a boolean[] parallel to the rows array:

checkDuplicates: (rows, tid) =>
  checkImportDuplicates(rows, {
    loadExisting: () =>
      findExistingStudentsForDuplicates(this.prisma, tid, yearId),
    rules: [
      // Rule 1: identity match (firstName + lastName + dateOfBirth)
      {
        existingTuple: (s) => existingRecordTuple(s),
        rowTuple:      (row) => importRowTuple(row),
      },
      // Rule 2: school email uniqueness (vs existing DB records)
      {
        existingTuple: (s) => existingEmailTuple(s.schoolEmail),
        rowTuple:      (row) => importEmailTuple(row, 'school_email'),
      },
    ],
  }),

Duplicate rows are not rejected with an error — they are silently skipped. The skipped count in the response DTO reflects how many rows were omitted.
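The tuple-based matching behind this can be sketched as follows. This is an assumed re-implementation of the core idea, not the real checkImportDuplicates: each rule projects existing records and incoming rows onto comparable string tuples, and a hit under any rule marks the row as a duplicate:

```typescript
// Hypothetical core of tuple-based duplicate detection.
type Row = Record<string, string>;

interface DupRule<E> {
  existingTuple: (e: E) => string | null; // null = record not comparable under this rule
  rowTuple: (row: Row) => string | null;  // null = row not comparable under this rule
}

function markDuplicates<E>(rows: Row[], existing: E[], rules: DupRule<E>[]): boolean[] {
  // One tuple set per rule, so tuples from different rules never collide.
  const sets = rules.map(
    (rule) =>
      new Set(existing.map(rule.existingTuple).filter((t): t is string => t !== null)),
  );
  return rows.map((row) =>
    rules.some((rule, i) => {
      const t = rule.rowTuple(row);
      return t !== null && sets[i].has(t);
    }),
  );
}
```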


7. Cross-Year Identity

For person entities (Student, Teacher, Staff), the same physical person may be imported again in a new academic year. resolvePersonUuid() preserves identity continuity across years without requiring a manual rollover.

Location: src/common/utils/resolve-person-uuid.ts

Match cascade (most reliable first):

  1. taxCode — case-insensitive match against any record for the same tenant in a different academic year.
  2. firstName + lastName + dateOfBirth — case-insensitive name match plus exact date match.

If a match is found, the existing personUuid is reused. If no match is found, the caller generates a fresh UUID (Prisma default).

// Inside createRecords — called per non-duplicate row
const personUuid = await resolvePersonUuid(
  this.prisma,
  tenantId,
  academicYearId,
  EntityKey.STUDENTS,
  {
    taxCode:     row['tax_code']?.trim() || undefined,
    firstName:   row['first_name'].trim(),
    lastName:    row['last_name'].trim(),
    dateOfBirth: parseImportDate(row['date_of_birth'])!,
  },
);

await tx.student.create({
  data: {
    tenantId,
    academicYearId,
    ...(personUuid ? { personUuid } : {}),  // omit to let Prisma generate a fresh UUID
    // ...other fields
  },
});

resolvePersonUuid() searches records outside the target year (academicYearId: { not: yearId }) so within-year duplicates are never matched — they are caught by checkDuplicates instead.
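The match cascade itself reduces to two ordered passes over the other-year records. A sketch of that logic under the assumptions stated above (the real helper queries Prisma rather than an in-memory array, and its exact signature may differ):

```typescript
// Hypothetical in-memory version of the resolvePersonUuid() cascade.
interface PersonRecord {
  personUuid: string;
  taxCode: string | null;
  firstName: string;
  lastName: string;
  dateOfBirth: string; // ISO date
}

interface Candidate {
  taxCode?: string;
  firstName: string;
  lastName: string;
  dateOfBirth: string;
}

// otherYears: records for the same tenant OUTSIDE the target academic year.
function matchPersonUuid(otherYears: PersonRecord[], c: Candidate): string | null {
  // 1. taxCode, case-insensitive — the most reliable signal
  if (c.taxCode) {
    const tax = c.taxCode.toLowerCase();
    const byTax = otherYears.find((r) => r.taxCode?.toLowerCase() === tax);
    if (byTax) return byTax.personUuid;
  }
  // 2. name (case-insensitive) + exact date-of-birth match
  const fn = c.firstName.toLowerCase();
  const ln = c.lastName.toLowerCase();
  const byName = otherYears.find(
    (r) =>
      r.firstName.toLowerCase() === fn &&
      r.lastName.toLowerCase() === ln &&
      r.dateOfBirth === c.dateOfBirth,
  );
  // null → caller omits personUuid and lets Prisma generate a fresh UUID
  return byName ? byName.personUuid : null;
}
```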