Import Pipeline¶
The import pipeline provides a generic, reusable mechanism for uploading CSV or XLSX files and turning them into database records. Each domain entity supplies column schemas, field descriptors, and four callbacks; the pipeline handles orchestration, header validation, error aggregation, and reporting.
1. Overview — 6-Stage Flow¶
parse file → validate headers → load lookup data → validate rows → check duplicates → create records
| Stage | Responsibility |
|---|---|
| parse file | Parse CSV or XLSX buffer into raw Record<string, string>[] rows. Rejects empty files and files exceeding 10,000 rows. |
| validate headers | Confirm that all required columns are present. Missing required columns produce a HEADERS_MISSING error immediately — no row processing occurs. |
| load lookup data | Fetch any tenant-specific reference data needed for row validation (grades, departments, email sets, etc.). |
| validate rows | Run the declarative field-descriptor pass then entity-specific hooks. Collects all errors before reporting. |
| check duplicates | Compare incoming rows against existing records in the target academic year. Marks duplicate rows as skipped. |
| create records | Insert non-duplicate rows inside a prisma.$transaction(). |
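The first two stages can be sketched in isolation. The following is a hypothetical, simplified CSV-only version (the real pipeline also parses XLSX, rejects empty files, and caps rows at 10,000); the function names here are illustrative, not the pipeline's actual internals:

```typescript
// Hypothetical, simplified sketch of stages 1-2 (parse file + validate headers).
type Row = Record<string, string>;

function parseCsv(text: string): { headers: string[]; rows: Row[] } {
  const [headerLine, ...dataLines] = text.trim().split('\n');
  const headers = headerLine.split(',').map((h) => h.trim());
  const rows = dataLines.map((line) => {
    const cells = line.split(',');
    return Object.fromEntries(headers.map((h, i) => [h, cells[i]?.trim() ?? '']));
  });
  return { headers, rows };
}

// Missing required columns abort the run with HEADERS_MISSING
// before any row processing happens.
function missingHeaders(headers: string[], required: string[]): string[] {
  return required.filter((c) => !headers.includes(c));
}

const { headers } = parseCsv('first_name,last_name\nAda,Lovelace');
missingHeaders(headers, ['first_name', 'last_name', 'date_of_birth']);
// → ['date_of_birth']
```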
2. Using runImportPipeline¶
runImportPipeline is the sole orchestrator. Entity services call it from their importFromFile() method after performing two pre-flight checks:
- Resolve the target academic year (fall back to the tenant's ACTIVE year if none is supplied).
- Assert the year is writable — the base service does this automatically for single-record CRUD, but import pipelines must call it explicitly.
async importFromFile(
buffer: Buffer | Uint8Array,
mimetype: string,
tenantId: string,
academicYearId?: string,
): Promise<EntityImportSuccessDto> {
// Pre-flight: resolve + guard year before entering the pipeline
const yearId = academicYearId ?? await this.resolveActiveYear(tenantId);
await this.assertYearWritable(tenantId, yearId); // must call explicitly
return runImportPipeline<ParsedRow, ImportLookupData, EntityImportSuccessDto>(
buffer,
mimetype,
tenantId,
{ allColumns: ALL_COLUMNS, requiredColumns: REQUIRED_COLUMNS },
{
loadLookupData: (tid) => this.loadLookupData(tid, yearId),
validateRows: (rows, lookup, agg) => this.validateRows(rows, lookup, agg),
    checkDuplicates: (rows, tid) => checkImportDuplicates(rows, { ... }),
createRecords: (rows, flags, lookup, tid) =>
this.createImportRecords(rows, flags, lookup, tid, yearId),
},
);
}
The four generic type parameters are:
- TRow — the typed row shape (a Record<ColumnName, string> derived from ImportColumnName)
- TLookup — the lookup data interface populated by loadLookupData
- TResult — the success DTO returned to the controller
Canonical implementation:
src/students/students.service.ts → importFromFile(). Pipeline source: src/common/services/import-pipeline.ts
3. Import Field Descriptors¶
Declarative schema¶
Field descriptors live in src/<domain>/constants/import-schema.ts. Each entry declares the CSV column name, an ordered list of validation rules, and optional allowed values for error messages.
// src/students/constants/import-schema.ts
export const STUDENT_IMPORT_FIELDS: ImportFieldDescriptor[] = [
{
column: 'first_name',
rules: [{ type: 'required' }, { type: 'maxLength', max: 100 }],
},
{
column: 'date_of_birth',
rules: [
{ type: 'required' },
{ type: 'date' },
{
type: 'custom',
validate: (v) => {
const d = parseImportDate(v);
return d && d >= new Date() ? 'future' : null;
},
},
],
},
{
column: 'gender',
rules: [
{ type: 'custom', validate: (v) => (parseGender(v) === null ? 'invalid' : null) },
],
allowedValues: () => Object.values(Gender), // lazy — evaluated only on error
},
{
column: 'nationality',
rules: [
{ type: 'set', values: VALID_ISO_COUNTRIES, normalize: (v) => v.toUpperCase() },
],
},
{
column: 'school_email',
rules: [{ type: 'pattern', regex: EMAIL_REGEX, errorKey: 'invalid_email' }],
},
];
Six rule types¶
| Rule type | Fields | Behaviour |
|---|---|---|
| required | — | Fails if the value is absent or blank |
| maxLength | max: number | Fails if the string exceeds max characters |
| date | — | Fails if the value cannot be parsed as a valid date |
| pattern | regex: RegExp, errorKey?: string | Fails if the value does not match the regex; errorKey overrides the default error key |
| set | values: Set<string>, normalize?: fn | Fails if the (optionally normalised) value is not in the set |
| custom | validate: (v: string) => string \| null | Runs arbitrary logic; return an error key string on failure, null on success |
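A hypothetical sketch of how the declarative pass might dispatch one rule against one cell value — the union shape and error keys mirror the table above, but the real implementation in src/common may differ:

```typescript
// Illustrative rule union mirroring the six rule types; real types live in src/common.
type Rule =
  | { type: 'required' }
  | { type: 'maxLength'; max: number }
  | { type: 'date' }
  | { type: 'pattern'; regex: RegExp; errorKey?: string }
  | { type: 'set'; values: Set<string>; normalize?: (v: string) => string }
  | { type: 'custom'; validate: (v: string) => string | null };

// Returns an error key on failure, null on success (the custom-rule contract).
function evaluateRule(value: string, rule: Rule): string | null {
  switch (rule.type) {
    case 'required':
      return value.trim() === '' ? 'required' : null;
    case 'maxLength':
      return value.length > rule.max ? 'maxlength' : null;
    case 'date':
      return Number.isNaN(Date.parse(value)) ? 'invalid' : null;
    case 'pattern':
      return rule.regex.test(value) ? null : rule.errorKey ?? 'invalid';
    case 'set': {
      const v = rule.normalize ? rule.normalize(value) : value;
      return rule.values.has(v) ? null : 'invalid';
    }
    case 'custom':
      return rule.validate(value);
  }
}

evaluateRule('it', {
  type: 'set',
  values: new Set(['IT', 'FR']),
  normalize: (v) => v.toUpperCase(),
}); // → null (passes after normalisation)
```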
Deriving column lists¶
ALL_COLUMNS and REQUIRED_COLUMNS are derived automatically — no hand-maintained duplicates:
const { allColumns: ALL_COLUMNS, requiredColumns: REQUIRED_COLUMNS } =
deriveImportColumns(STUDENT_IMPORT_FIELDS);
export { ALL_COLUMNS, REQUIRED_COLUMNS };
deriveImportColumns() treats any descriptor that contains a required rule as a required column.
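A minimal sketch of that derivation, with hypothetical simplified types (the real ImportFieldDescriptor carries more fields):

```typescript
// Hypothetical simplified descriptor shape for illustration only.
interface FieldDescriptor {
  column: string;
  rules: { type: string }[];
}

function deriveImportColumns(fields: FieldDescriptor[]) {
  return {
    allColumns: fields.map((f) => f.column),
    // Any descriptor containing a 'required' rule becomes a required column.
    requiredColumns: fields
      .filter((f) => f.rules.some((r) => r.type === 'required'))
      .map((f) => f.column),
  };
}

deriveImportColumns([
  { column: 'first_name', rules: [{ type: 'required' }] },
  { column: 'gender', rules: [{ type: 'custom' }] },
]);
// → { allColumns: ['first_name', 'gender'], requiredColumns: ['first_name'] }
```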
Error registries¶
Two registries map error keys to ImportErrorCode values:
- STANDARD_CODE_REGISTRY — ships in src/common. Covers the generic keys produced by the declarative pass: required, maxlength, invalid, invalid_email, future.
- <ENTITY>_EXTRA_REGISTRY — defined per entity in constants/import-schema.ts. Covers hook-only keys that the standard registry does not know about.
// src/students/constants/import-schema.ts
export const STUDENT_EXTRA_REGISTRY: CodeRegistry = {
missing_pair: { code: ImportErrorCode.FIELD_INVALID },
ambiguous: { code: ImportErrorCode.FIELD_INVALID },
duplicate: { code: ImportErrorCode.FIELD_INVALID },
'referent:required': { code: ImportErrorCode.FIELD_REQUIRED },
};
Merge them when resolving errors:
const errors = buildValidationResult(
agg,
{ ...STANDARD_CODE_REGISTRY, ...STUDENT_EXTRA_REGISTRY },
STUDENT_IMPORT_FIELDS,
);
Hooks — entity-specific validation¶
The declarative pass runs first via validateImportRows(rows, IMPORT_FIELDS, agg). Entity-specific logic (lookup validation, cross-column rules, within-file uniqueness) runs afterwards in the validateRows callback:
validateRows: (rows, lookup, agg) => {
// 1. Declarative pass
validateImportRows(rows, STUDENT_IMPORT_FIELDS, agg);
// 2. Hook: resolve (grade, department) pair. Department is required so
// grade names that exist in multiple departments resolve unambiguously.
for (let i = 0; i < rows.length; i++) {
const gradeVal = rows[i]['grade']?.trim();
const deptVal = rows[i]['department']?.trim();
if (!gradeVal || !deptVal) continue;
const entries = lookup.gradeMap.get(gradeVal.toLowerCase());
if (!entries) {
agg.addError('grade:invalid', i);
} else {
const deptKey = deptVal.toLowerCase();
if (!entries.some((e) => e.departmentName.toLowerCase() === deptKey)) {
agg.addError('department:invalid', i);
}
}
}
// 3. Resolve + attach allowedValues
const errors = buildValidationResult(
agg,
{ ...STANDARD_CODE_REGISTRY, ...STUDENT_EXTRA_REGISTRY },
STUDENT_IMPORT_FIELDS,
);
// 4. Dynamic allowedValues for hook-produced errors
for (const e of errors) {
if (e.code !== ImportErrorCode.FIELD_INVALID) continue;
if (e.column === 'grade') e.allowedValues = lookup.allGradeNames;
else if (e.column === 'department') e.allowedValues = lookup.allDepartmentNames;
}
return errors;
}
applyFieldAllowedValues (called inside buildValidationResult) attaches static or lazy allowedValues from descriptors automatically. Dynamic values that depend on tenant data (like the grade list) are attached manually after resolution.
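A hedged sketch of that attachment step, with hypothetical shapes (the real applyFieldAllowedValues lives alongside buildValidationResult and works on richer types):

```typescript
// Illustrative shapes; real descriptors and errors carry more fields.
interface Descriptor {
  column: string;
  allowedValues?: string[] | (() => string[]); // static or lazy
}
interface ImportError {
  column: string;
  allowedValues?: string[];
}

// Attach descriptor-declared allowedValues; lazy thunks are evaluated only
// when an error actually exists for that column.
function applyFieldAllowedValues(errors: ImportError[], fields: Descriptor[]): void {
  const byColumn = new Map(fields.map((f) => [f.column, f.allowedValues]));
  for (const e of errors) {
    const av = byColumn.get(e.column);
    if (av === undefined) continue;
    e.allowedValues = typeof av === 'function' ? av() : av;
  }
}

const errs: ImportError[] = [{ column: 'gender' }];
applyFieldAllowedValues(errs, [{ column: 'gender', allowedValues: () => ['MALE', 'FEMALE'] }]);
// errs[0].allowedValues → ['MALE', 'FEMALE']
```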
Conventions¶
- Error key format: {column}:{rule} — e.g. email:invalid_email, grade:invalid.
- allowedValues accepts string[] (static) or () => string[] (lazy, evaluated only when an error occurs for that column).
- CSV column naming: snake_case — first_name, date_of_birth, institutional_email.
4. Import Error Codes¶
ImportErrorCode is an enum in src/common/constants/:
| Code | Meaning |
|---|---|
| HEADERS_MISSING | One or more required columns are absent from the file header row |
| FIELD_REQUIRED | A required field is blank in a data row |
| FIELD_MAX_LENGTH | A field value exceeds the maximum allowed length |
| FIELD_INVALID | A field value fails pattern, set, date, or custom validation |
All import failures are surfaced as ErrorCode.IMPORT_VALIDATION_FAILED (HTTP 422) with a structured data.errors[] payload using ImportValidationError objects. Each error carries code, column, rows (compressed row list), optional params, and optional allowedValues.
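As an illustration, a 422 body for a file with one blank required field and one invalid value might look like this. The values are hypothetical, the serialized error-code names are assumed, and the compressed rows encoding shown here is a guess at the format:

```typescript
// Hypothetical example payload; the authoritative shape is ImportValidationError.
const examplePayload = {
  errorCode: 'IMPORT_VALIDATION_FAILED', // assumed serialized form of ErrorCode.IMPORT_VALIDATION_FAILED
  data: {
    errors: [
      // rows is a compressed row list; '3,7-9' is an assumed encoding.
      { code: 'FIELD_REQUIRED', column: 'first_name', rows: '3,7-9' },
      {
        code: 'FIELD_INVALID',
        column: 'gender',
        rows: '5',
        allowedValues: ['MALE', 'FEMALE'], // attached from the descriptor
      },
    ],
  },
};
```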
5. Wiring an Import Route¶
Controller route¶
/** Import <entities> from CSV/XLSX file */
@Post('import')
@HttpCode(HttpStatus.OK)
@RequireAction(EntityKey.<ENTITY>, 'create')
@RequireRoles('admin')
@AggregateResponse()
@UseInterceptors(
FileInterceptor('file', { limits: { fileSize: 10_485_760 } }),
)
@ApiImport<Entity>()
async import<Entity>(
@UploadedFile() file: Express.Multer.File,
@TenantId() tenantId: string,
@Query('academicYearId') academicYearId?: string,
): Promise<<Entity>ImportSummaryDto> {
if (!file) throw new BadRequestException('No file uploaded');
return this.service.importFromFile(
file.buffer,
file.mimetype,
tenantId,
academicYearId,
);
}
FileInterceptor is imported from @nestjs/platform-express. The import route reuses the create action — no separate action is needed.
@AggregateResponse() is mandatory on import routes, and the summary DTO must extend AggregateResponseDto. The summary has top-level keys count and items — neither of which is a scope name. Without the marker, FieldFilterInterceptor strips every key and non-platform-admin callers receive {}. The marker is security-sensitive: re-read the rules in chapter 04 → Interceptor Contract: Scope-Grouped vs Aggregate Responses before applying it, and re-audit whenever you enrich the summary with new fields. The interceptor runs a runtime assertion that throws in dev/test if an aggregate response ever grows a scope-grouped top-level key (e.g. sensitive) — don't disable that check.
@RequireRoles('admin') is the current role gate on import routes. Imports are bulk and destructive, so they are restricted to tenant admins today; the list will widen to additional roles (e.g. 'secretary') once the product team signs off. The decorator uses OR semantics — add roles to the call list rather than stacking multiple decorators.
Shared envelope: ImportSummaryDtoOf¶
All three import endpoints — POST /students/import, /teachers/import, /staff/import — return the same envelope shape, defined once in src/common/dto/import-summary.dto.ts:
interface ImportSummary<T> {
count: number; // total records for this entity after the import
items: T[]; // preview of up to 5 most recent records
}
Per-entity DTOs extend the factory-generated class and only vary in the item type:
export class StudentImportItemDto {
@ApiProperty({ format: 'uuid' }) id: string;
@ApiProperty() firstName: string;
@ApiProperty() lastName: string;
@ApiPropertyOptional({ nullable: true }) identificationCode: string | null;
@ApiProperty() departmentName: string;
@ApiPropertyOptional({ nullable: true }) gradeName: string | null;
}
export class StudentImportSummaryDto extends ImportSummaryDtoOf(
StudentImportItemDto,
'StudentImportSummaryDto',
) {}
The factory renames the generated class via Object.defineProperty(..., 'name', ...) so Swagger emits a distinct schema per entity; don't remove that or the three DTOs collapse into a single anonymous class in the OpenAPI output. The factory extends AggregateResponseDto, so subclasses automatically satisfy the @AggregateResponse() contract.
When adding a new import endpoint, reuse the factory — don't re-invent an envelope. The only thing you should define per-entity is the <Entity>ImportItemDto class.
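The renaming trick can be sketched as follows. This is a minimal hypothetical version of the factory; the real one also extends AggregateResponseDto and applies Swagger decorators:

```typescript
// Minimal hypothetical sketch of ImportSummaryDtoOf's class-renaming trick.
function ImportSummaryDtoOf<TItem>(itemType: new () => TItem, name: string) {
  class Summary {
    count!: number;  // total records for this entity after the import
    items!: TItem[]; // preview of up to 5 most recent records
  }
  // Without this rename every generated class would be called 'Summary',
  // and Swagger would collapse all entities into one anonymous schema.
  Object.defineProperty(Summary, 'name', { value: name });
  return Summary;
}

class StudentImportItemDto {
  id!: string;
}
const Dto = ImportSummaryDtoOf(StudentImportItemDto, 'StudentImportSummaryDto');
// Dto.name → 'StudentImportSummaryDto'
```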
Steps¶
1. src/<domain>/dto/<domain>-import.dto.ts — create <Entity>ImportSuccessDto with success: boolean, created: number, skipped: number.
2. src/<domain>/<domain>.controller.ts — add the import route above. Declare it before any @Get(':id') or @Patch(':id') routes to avoid path collisions.
3. src/<domain>/<domain>.service.ts — add importFromFile() calling runImportPipeline<>() with the four callbacks. Call assertYearWritable() explicitly before the pipeline. Scope all checkDuplicates queries to the target academicYearId. For person entities (students, teachers, staff), call resolvePersonUuid() inside createRecords to assign or reuse personUuid.
4. src/<domain>/<domain>.swagger.ts — add ApiImport<Entity>() using the ApiImportEndpoint factory.
5. src/<domain>/constants/import-schema.ts — define field descriptors (see §3).
6. Update barrel exports — src/<domain>/index.ts.
Canonical examples:
src/students/students.controller.ts, src/students/students.service.ts
6. Duplicate Detection¶
Duplicate detection is scoped to the target academic year only. The same personUuid is valid in multiple years — only same-year duplicates are blocked.
Single-record create¶
assertNoPeopleDuplicate() is called in the beforeCreate hook for single-record creation. It checks both identity match (name + date-of-birth) and email uniqueness within the target year:
await assertNoPeopleDuplicate(this.prisma, tenantId, yearId, {
firstName, lastName, dateOfBirth, schoolEmail,
});
Import batch¶
checkImportDuplicates() handles bulk detection. It accepts separate rule objects for identity matching and email uniqueness, and returns a boolean[] parallel to the rows array:
checkDuplicates: (rows, tid) =>
checkImportDuplicates(rows, {
loadExisting: () =>
findExistingStudentsForDuplicates(this.prisma, tid, yearId),
rules: [
// Rule 1: identity match (firstName + lastName + dateOfBirth)
{
existingTuple: (s) => existingRecordTuple(s),
rowTuple: (row) => importRowTuple(row),
},
// Rule 2: school email uniqueness (vs existing DB records)
{
existingTuple: (s) => existingEmailTuple(s.schoolEmail),
rowTuple: (row) => importEmailTuple(row, 'school_email'),
},
],
}),
Duplicate rows are not rejected with an error — they are silently skipped. The skipped count in the response DTO reflects how many rows were omitted.
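The tuple-based detection can be sketched for a single rule as follows. This is a hedged, self-contained sketch: the real checkImportDuplicates takes a loadExisting callback and multiple rule objects, and its exact internals may differ:

```typescript
type Row = Record<string, string>;

// Illustrative rule shape: each rule maps an existing record and an incoming
// row to a comparable string tuple (null = tuple incomplete, never matches).
interface DuplicateRule<E> {
  existingTuple: (existing: E) => string | null;
  rowTuple: (row: Row) => string | null;
}

// Returns a boolean[] parallel to rows: true = duplicate, silently skipped.
function checkDuplicatesSketch<E>(
  rows: Row[],
  existing: E[],
  rules: DuplicateRule<E>[],
): boolean[] {
  const seen = rules.map(
    (rule) =>
      new Set(existing.map(rule.existingTuple).filter((t): t is string => t !== null)),
  );
  return rows.map((row) =>
    rules.some((rule, i) => {
      const t = rule.rowTuple(row);
      return t !== null && seen[i].has(t);
    }),
  );
}

const flags = checkDuplicatesSketch(
  [
    { first_name: 'Ada', last_name: 'Lovelace' },
    { first_name: 'Alan', last_name: 'Turing' },
  ],
  [{ firstName: 'ada', lastName: 'lovelace' }],
  [
    {
      existingTuple: (e) => `${e.firstName}|${e.lastName}`,
      rowTuple: (r) => `${r['first_name'].toLowerCase()}|${r['last_name'].toLowerCase()}`,
    },
  ],
);
// flags → [true, false]: Ada already exists in this year, Alan does not
```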
7. Cross-Year Identity¶
For person entities (Student, Teacher, Staff), the same physical person may be imported again in a new academic year. resolvePersonUuid() preserves identity continuity across years without requiring a manual rollover.
Location: src/common/utils/resolve-person-uuid.ts
Match cascade (most reliable first):
1. taxCode — case-insensitive match against any record for the same tenant in a different academic year.
2. firstName + lastName + dateOfBirth — case-insensitive name match plus exact date match.
If a match is found, the existing personUuid is reused. If no match is found, the caller generates a fresh UUID (Prisma default).
// Inside createRecords — called per non-duplicate row
const personUuid = await resolvePersonUuid(
this.prisma,
tenantId,
academicYearId,
EntityKey.STUDENTS,
{
taxCode: row['tax_code']?.trim() || undefined,
firstName: row['first_name'].trim(),
lastName: row['last_name'].trim(),
dateOfBirth: parseImportDate(row['date_of_birth'])!,
},
);
await tx.student.create({
data: {
tenantId,
academicYearId,
...(personUuid ? { personUuid } : {}), // omit to let Prisma generate a fresh UUID
// ...other fields
},
});
resolvePersonUuid() searches records outside the target year (academicYearId: { not: yearId }) so within-year duplicates are never matched — they are caught by checkDuplicates instead.
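The cascade itself might look like this in a Prisma-free sketch. Shapes and the helper name are hypothetical; the real resolvePersonUuid queries the database with the academicYearId: { not: yearId } scope described above:

```typescript
interface PersonRecord {
  personUuid: string;
  taxCode: string | null;
  firstName: string;
  lastName: string;
  dateOfBirth: Date;
}

// candidates = same-tenant records from OTHER academic years only; within-year
// duplicates never reach this point (checkDuplicates handles them).
function matchPersonUuid(
  candidates: PersonRecord[],
  input: { taxCode?: string; firstName: string; lastName: string; dateOfBirth: Date },
): string | null {
  const norm = (s: string) => s.trim().toLowerCase();
  // 1. taxCode match (most reliable), case-insensitive
  if (input.taxCode) {
    const byTax = candidates.find(
      (c) => c.taxCode !== null && norm(c.taxCode) === norm(input.taxCode!),
    );
    if (byTax) return byTax.personUuid;
  }
  // 2. firstName + lastName (case-insensitive) + exact dateOfBirth
  const byIdentity = candidates.find(
    (c) =>
      norm(c.firstName) === norm(input.firstName) &&
      norm(c.lastName) === norm(input.lastName) &&
      c.dateOfBirth.getTime() === input.dateOfBirth.getTime(),
  );
  return byIdentity?.personUuid ?? null; // null → caller lets Prisma generate a fresh UUID
}
```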