Skip to content
Prev Previous commit
Next Next commit
fix
  • Loading branch information
p-hoffmann committed May 13, 2026
commit d696f1e23a2cc2f3cc492233a9d2d71596c24a5f
21 changes: 21 additions & 0 deletions plugins/functions/demo/api/PortalAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,27 @@ export class PortalAPI {
}
}

// Fetch the cache-readiness snapshot for a dataset from the portal's
// GET /dataset/:id/cache-status endpoint. Callers poll this until
// `ready === true` before dispatching cache-dependent work.
async getCacheStatus(datasetId: string): Promise<{
  ready: boolean;
  cacheExists: boolean;
  cacheAttached: boolean;
  activeJobStatus?: string | null;
  lastJobStatus?: string | null;
  lastJobError?: string | null;
}> {
  const url = `${this.baseURL}/dataset/${encodeURIComponent(datasetId)}/cache-status`;
  try {
    const requestConfig = await this.getRequestConfig();
    const response = await this.channel.get(url, requestConfig);
    return response.data;
  } catch (error: any) {
    // Surface as much HTTP context as the error object carries, then rethrow
    // so the caller decides whether the failure is fatal.
    const status = error.status || error.response?.status;
    const responseData = error.response?.data;
    console.error(`Error while getting cache status: ${error.message}, status: ${status}, data: ${JSON.stringify(responseData)}`);
    throw error;
  }
}

private getRequestConfig() {
let options: AxiosRequestConfig = {};

Expand Down
15 changes: 15 additions & 0 deletions plugins/functions/demo/controllers/DemoController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ export class DemoController {
message: "Adding demo dataset...",
task: this.service.addDataset.bind(this.service),
},
{
code: "cache",
message: "Waiting for cache to be ready...",
task: this.service.waitForCache.bind(this.service),
},
{
code: "dqd",
message: "Running DQD on demo dataset...",
Expand Down Expand Up @@ -72,6 +77,11 @@ export class DemoController {
message: "Adding HTTP test dataset...",
task: this.service.addDataset.bind(this.service),
},
{
code: "cache",
message: "Waiting for cache to be ready...",
task: this.service.waitForCache.bind(this.service),
},
];

return await this.executeSteps(req, res, steps);
Expand All @@ -96,6 +106,11 @@ export class DemoController {
message: "Adding demo dataset...",
task: this.service.addDataset.bind(this.service),
},
{
code: "cache",
message: "Waiting for cache to be ready...",
task: this.service.waitForCache.bind(this.service),
},
{
code: "dqd",
message: "Running DQD on demo dataset...",
Expand Down
69 changes: 67 additions & 2 deletions plugins/functions/demo/services/DemoService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,72 @@ export class DemoService {

const result = await datasetAPI.createDataset(dataset);
this.logger.info(`Dataset added: ${JSON.stringify(result)}`);
return { ...dataset, ...result };

// Look the dataset back up so we always carry the server-assigned id forward,
// regardless of which fields the gateway echoes in its response.
const refreshed = await portalAPI.getDatasets();
const createdDataset =
(result?.id && refreshed.find((d) => d.id === result.id)) ||
refreshed.find(
(d) =>
d.databaseCode === env.DEMO_DB_CODE &&
d.schemaName === env.DEMO_DB_CDM_SCHEMA &&
d.vocabSchemaName === env.DEMO_DB_CDM_SCHEMA &&
d.sourceStudyId == null &&
d.visibilityStatus !== "HIDDEN"
);

if (!createdDataset?.id) {
throw new Error(
`Dataset created but not visible in portal (result=${JSON.stringify(result)})`
);
}
this.logger.info(`Dataset confirmed in portal: ${JSON.stringify(createdDataset)}`);
return createdDataset;
}

// Poll the portal's cache-status endpoint until the TrexSQL cache built for
// the "dataset" step's dataset is ready. The dataset POST returns as soon as
// the row is inserted, so without this wait the DQD/DC steps would race the
// still-building cache and fail with cache-not-ready errors.
//
// Returns the final status snapshot on success; throws when the dataset id is
// missing from progress, when the build job ends in a terminal failure state,
// or when the cache is still not ready after the poll timeout.
public async waitForCache(token: string, _input: any, progress?: IProgress) {
  this.logger.info("Waiting for cache");

  // The dataset id comes from the earlier "dataset" step's recorded result.
  const dataset = progress?.steps?.find((s) => s.code === "dataset")?.result;
  if (!dataset?.id) {
    throw new Error("Dataset not found in progress; cannot wait for cache");
  }

  const api = new PortalAPI(token);
  const pollIntervalMs = 5000;
  const pollTimeoutMs = 15 * 60 * 1000;
  const deadline = Date.now() + pollTimeoutMs;
  // Job states from which the build will never recover — fail fast on these.
  const terminalStates = new Set(["FAILED", "STOPPED", "ABANDONED"]);
  const sleep = (ms: number) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms));

  let lastStatus;
  while (Date.now() < deadline) {
    lastStatus = await api.getCacheStatus(dataset.id);

    if (lastStatus.ready) {
      this.logger.info(
        `Cache ready for dataset ${dataset.id}: ${JSON.stringify(lastStatus)}`
      );
      return lastStatus;
    }

    if (lastStatus.lastJobStatus && terminalStates.has(lastStatus.lastJobStatus)) {
      throw new Error(
        `Cache build for dataset ${dataset.id} ${lastStatus.lastJobStatus}: ${lastStatus.lastJobError ?? "no error message"}`
      );
    }

    this.logger.info(
      `Cache not ready yet for dataset ${dataset.id}: ${JSON.stringify(lastStatus)}`
    );
    await sleep(pollIntervalMs);
  }

  throw new Error(
    `Cache build for dataset ${dataset.id} did not become ready within ${pollTimeoutMs}ms (last=${JSON.stringify(lastStatus)})`
  );
}

public async runDQD(token: string, _input: IDemoInput, progress?: IProgress) {
Expand All @@ -128,7 +193,7 @@ export class DemoService {
const dqdFlowRun = await jobPluginsAPI.createDqdFlowRun({
datasetId,
releaseId: "",
vocabSchemaName: datasetId,
vocabSchemaName,
comment: "Demo setup",
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,14 @@ export class DatasetCommandService {

return result;
};
// First sync to WebAPI - fail fast if WebAPI is not accessible
// Insert the dataset row first so its id is visible to callers as soon as we return,
// even if the upstream WebAPI sync takes longer than the caller's HTTP timeout.
const result = await this.transactionRunner.run(createDatasetFn, datasetDto);

// Then register the source and kick off the TrexSQL cache build. The cache build
// is fire-and-forget inside syncDatasetToWebApi — downstream consumers (DQD, DC)
// must poll cache readiness via GET /system-portal/dataset/:id/cache-status
// before issuing queries that read the cache catalog.
await this.syncDatasetToWebApi({
id: datasetDto.id,
databaseCode: datasetDto.databaseCode,
Expand All @@ -137,9 +144,6 @@ export class DatasetCommandService {
resultSchemaName: datasetDto.resultSchemaName,
}, datasetDto.detail);

// Then create dataset in database
const result = await this.transactionRunner.run(createDatasetFn, datasetDto);

// Best-effort: notify trex to (re)attach the new dataset's cache file and source DB
// so a freshly-set cache_id becomes available without a trex restart. The cache_id
// mirrors the entity's @BeforeInsert default (sanitized dataset id) when the DTO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { DatasetController } from './dataset.controller'
import { DatasetFilterService } from './dataset-filter.service'
import { DatasetQueryService } from './query/dataset-query.service'
import { DatasetCommandService } from './command/dataset-command.service'
import { WebApiSourceService } from '../webapi/webapi-source.service'
import { RequestContextService } from '../common/request-context.service'
import {
datasetCommandServiceMockFactory,
datasetFilterServiceMockFactory,
Expand All @@ -18,7 +20,9 @@ describe('DatasetController', () => {
providers: [
{ provide: DatasetQueryService, useFactory: datasetQueryServiceMockFactory },
{ provide: DatasetFilterService, useFactory: datasetFilterServiceMockFactory },
{ provide: DatasetCommandService, useFactory: datasetCommandServiceMockFactory }
{ provide: DatasetCommandService, useFactory: datasetCommandServiceMockFactory },
{ provide: WebApiSourceService, useValue: { getCacheStatus: jest.fn() } },
{ provide: RequestContextService, useValue: { getOriginalToken: jest.fn() } }
]
}).compile()

Expand Down
13 changes: 13 additions & 0 deletions plugins/functions/portal/src/dataset/dataset.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import {
DatasetSnapshotDto,
} from "./dto/index.ts";
import { DatasetQueryService } from "./query/dataset-query.service.ts";
import { RequestContextService } from "../common/request-context.service.ts";
import { WebApiSourceService } from "../webapi/webapi-source.service.ts";

@Middleware(RequestContextMiddleware)
@Controller("system-portal/dataset")
Expand All @@ -35,6 +37,8 @@ export class DatasetController {
private readonly datasetQueryService: DatasetQueryService,
private readonly datasetCommandService: DatasetCommandService,
private readonly datasetFilterService: DatasetFilterService,
private readonly webApiSourceService: WebApiSourceService,
private readonly requestContextService: RequestContextService,
) {}

@Get()
Expand Down Expand Up @@ -239,4 +243,13 @@ export class DatasetController {
// Thin pass-through: delegates the dataset transformation to the command
// service; errors from the command service propagate to the caller as-is.
async transformToWebApi(@Param("id") id: string) {
  return await this.datasetCommandService.transformToWebApi(id);
}

// Lightweight cache-readiness poll. Consumers that need a hot cache before
// kicking off downstream work (DQD, DC, demo setup) poll this endpoint until
// `ready === true` instead of blocking inside the dataset POST.
@Get(":id/cache-status")
async getCacheStatus(@Param("id") id: string) {
  // Forward the caller's original token so the WebAPI lookup runs as them.
  const token = this.requestContextService.getOriginalToken();
  return await this.webApiSourceService.getCacheStatus(id, token);
}
}
47 changes: 42 additions & 5 deletions plugins/functions/portal/src/webapi/webapi-source.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ export class WebApiSourceService {
}
}

// Build the cache and block until bao reports lastJob.status === "COMPLETED".
// Downstream consumers (analytics-svc cdmversion, DQD, DC) query the cache
// catalog; without this wait they race against bao mid-copy.
// Kick off the TrexSQL cache build. We deliberately do NOT await
// `waitForCacheReady` here: bao's COMPLETED transition can take minutes, and
// edge-function HTTP callers (e.g. the dataset gateway) time out well before
// that. Consumers that depend on a hot cache (DQD, DC, analytics-svc
// cdmversion) must wait for readiness explicitly via `waitForCacheReady`.
private async triggerCacheCreation(
sourceKey: string,
schemaName: string,
Expand All @@ -48,14 +50,49 @@ export class WebApiSourceService {
const result = await this.webApiSourceApi.createCache(sourceKey, schemaName, authToken)
if (!result.success) {
this.logger.warn(`TrexSQL cache creation failed for ${sourceKey}: ${result.error}`)
return
}
await this.webApiSourceApi.waitForCacheReady(sourceKey, authToken)
} catch (error) {
this.logger.error(`Failed to create TrexSQL cache for ${sourceKey}: ${error}`)
}
}

// Block until the TrexSQL cache for `sourceKey` is COMPLETED. Intended for
// consumers that explicitly need a hot cache (DQD/DC kickoff). Failures are
// logged here for diagnosability and then re-raised to the caller.
async waitForCacheReady(sourceKey: string, authToken?: string): Promise<void> {
  return this.webApiSourceApi.waitForCacheReady(sourceKey, authToken).catch((error) => {
    this.logger.error(`Cache wait failed for ${sourceKey}: ${error}`)
    throw error
  })
}

// Snapshot the TrexSQL cache state for a dataset so callers can poll and
// decide when cache-catalog queries are safe. `ready` means: no build job in
// flight, the cache file exists and is attached, and the most recent build
// job finished with status COMPLETED.
async getCacheStatus(sourceKey: string, authToken?: string): Promise<{
  ready: boolean
  cacheExists: boolean
  cacheAttached: boolean
  activeJobStatus?: string | null
  lastJobStatus?: string | null
  lastJobError?: string | null
}> {
  const raw = await this.webApiSourceApi.getCacheStatus(sourceKey, authToken)

  // Normalize the upstream payload into plain booleans / nullable strings.
  const cacheExists = Boolean(raw.cacheExists)
  const cacheAttached = Boolean(raw.cacheAttached)
  const activeJobStatus = raw.activeJob?.status ?? null
  const lastJobStatus = raw.lastJob?.status ?? null
  const lastJobError = raw.lastJob?.error ?? null

  // Note: readiness keys off the presence of `activeJob` itself (any in-flight
  // job blocks readiness), not off its derived status string.
  const ready =
    !raw.activeJob && cacheExists && cacheAttached && lastJobStatus === 'COMPLETED'

  return { ready, cacheExists, cacheAttached, activeJobStatus, lastJobStatus, lastJobError }
}

async deleteSourceForDataset(datasetId: string, authToken?: string): Promise<void> {
try {
const existing = await this.webApiSourceApi.getSourceByKey(datasetId, authToken)
Expand Down
Loading