webapi-managed datasets + cache_id alignment
p-hoffmann committed May 12, 2026
commit 350819b720b57cb78fb39362de0c1cab7c841bc0
3 changes: 2 additions & 1 deletion plugins/flows/_shared_flow_utils/create_dataset_tasks.py
@@ -194,5 +194,6 @@ def is_safe_schema_name(schema: str) -> bool:
if not s:
return False

pattern = r"^[A-Za-z][A-Za-z0-9_]*(?:\.[A-Za-z][A-Za-z0-9_]*)?$"
# Each segment may start with an underscore (cache_id from sanitized UUIDs does).
pattern = r"^[A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)?$"
return match(pattern, s) is not None
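For reference, a minimal standalone sketch of the updated validator. It assumes match is re.match and that s is the stripped input, which the surrounding context suggests but does not show; the example identifiers are made up.

from re import match

def is_safe_schema_name(schema: str) -> bool:
    s = schema.strip() if schema else ""  # assumption: s is the stripped input
    if not s:
        return False
    # Each segment may start with an underscore (cache_id from sanitized UUIDs does);
    # at most one "catalog.schema" dot is allowed.
    pattern = r"^[A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)?$"
    return match(pattern, s) is not None

assert is_safe_schema_name("_9f8e7d6c.cdm")         # underscore-led cache catalog
assert is_safe_schema_name("cdmvocab")              # plain unqualified schema
assert not is_safe_schema_name("a.b.c")             # more than one qualifier
assert not is_safe_schema_name('x"; DROP TABLE y')  # quoting characters rejected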
5 changes: 3 additions & 2 deletions plugins/flows/_shared_flow_utils/dao/trexdao.py
@@ -324,8 +324,9 @@ def get_r_database_connector_connection_string(
user = self.tenant_configs.user
password = self.tenant_configs.password.get_secret_value()

# Use jdbc:postgresql for DatabaseConnector
conn_url = f"{DialectDrivers.jdbc.trex}://{host}:{port}/{self.database_code}?preferQueryMode=simple&autocommit=true"
# Match Python's _get_connection: connect on cache_id so unqualified queries resolve there.
jdbc_dbname = self.cache_id or self.database_code
conn_url = f"{DialectDrivers.jdbc.trex}://{host}:{port}/{jdbc_dbname}?preferQueryMode=simple&autocommit=true"

return f"""connectionDetails <- DatabaseConnector::createConnectionDetails(dbms = '{DialectDrivers.database_connector.trex}', connectionString = '{conn_url}', user = '{user}', password = '{password}', pathToDriver = '{self.path_to_driver}')"""
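The database name selection is a plain null-coalescing fallback; a hedged sketch of the rule this hunk encodes (the jdbc:postgresql prefix comes from the comment the hunk removes; host and catalog values below are hypothetical):

def jdbc_url(host: str, port: int, cache_id: str | None, database_code: str) -> str:
    # Prefer the populated cache catalog; fall back to the live source database.
    dbname = cache_id or database_code
    return f"jdbc:postgresql://{host}:{port}/{dbname}?preferQueryMode=simple&autocommit=true"

print(jdbc_url("trex.internal", 5432, "_9f8e7d6c", "alpdev_pg"))
# jdbc:postgresql://trex.internal:5432/_9f8e7d6c?preferQueryMode=simple&autocommit=true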

@@ -28,28 +28,52 @@ execute_achilles <- function(
createTable,
resultsDatabaseSchema,
outputFolder,
sqlOnly,
numThreads,
sqlOnly,
numThreads,
verboseMode,
excludeAnalysisIds,
createIndices) {

excludeAnalysisIds,
createIndices,
cacheId = NULL) {

# Set TREX and DB driver environment variables and create connection details
eval(parse(text = set_trex_env_string))
eval(parse(text = setDBDriverEnv))
eval(parse(text = connectionDetailsString))

# Force each pooled JDBC connection onto the cache catalog (Achilles opens many).
if (!is.null(cacheId) && nzchar(cacheId)) {
.ns <- asNamespace("DatabaseConnector")
.original_connect <- get("connect", envir = .ns)
.use_sql <- sprintf('USE "%s"', gsub('"', '""', cacheId, fixed = TRUE))
.patched_connect <- function(...) {
conn <- .original_connect(...)
tryCatch(
DatabaseConnector::executeSql(
connection = conn,
sql = .use_sql,
progressBar = FALSE,
reportOverallTime = FALSE
),
error = function(e) {
message(sprintf("[execute_achilles] USE \"%s\" failed: %s", cacheId, conditionMessage(e)))
}
)
conn
}
assignInNamespace("connect", .patched_connect, ns = "DatabaseConnector")
}

Achilles::achilles(
connectionDetail = connectionDetails,
cdmVersion = cdmVersion,
cdmDatabaseSchema = cdmDatabaseSchema,
createTable = createTable,
resultsDatabaseSchema = resultsDatabaseSchema,
outputFolder = outputFolder,
sqlOnly = sqlOnly,
numThreads = numThreads,
verboseMode = verboseMode,
excludeAnalysisIds = excludeAnalysisIds,
connectionDetail = connectionDetails,
cdmVersion = cdmVersion,
cdmDatabaseSchema = cdmDatabaseSchema,
createTable = createTable,
resultsDatabaseSchema = resultsDatabaseSchema,
outputFolder = outputFolder,
sqlOnly = sqlOnly,
numThreads = numThreads,
verboseMode = verboseMode,
excludeAnalysisIds = excludeAnalysisIds,
createIndices = createIndices
)
}
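The R hunk above monkey-patches DatabaseConnector::connect so that every pooled connection Achilles opens is pinned to the cache catalog. The same pin-at-connect technique, sketched in Python against a generic DB-API connect function; the driver and the endpoint's support for a USE statement are assumptions carried over from the diff:

import functools

def pin_catalog(connect_fn, cache_id: str):
    quoted = cache_id.replace('"', '""')  # escape embedded quotes, as the R code does

    @functools.wraps(connect_fn)
    def patched_connect(*args, **kwargs):
        conn = connect_fn(*args, **kwargs)
        try:
            cur = conn.cursor()
            cur.execute(f'USE "{quoted}"')  # switch this session onto the cache catalog
            cur.close()
        except Exception as exc:
            # Log and keep the connection usable, mirroring the R tryCatch.
            print(f'[execute_achilles] USE "{cache_id}" failed: {exc}')
        return conn

    return patched_connect

# Usage sketch, hypothetical driver module:
#   driver.connect = pin_catalog(driver.connect, cache_id)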
5 changes: 4 additions & 1 deletion plugins/flows/base/data_characterization_plugin/flow.py
@@ -77,7 +77,9 @@ def data_characterization_plugin(options: DCOptionsType):
)
# For TREX connections, set vocabSchemaName to schemaName
if dbdao.dialect != SupportedDatabaseDialects.HANA and use_trex_connection:
achilles_params.schemaName = f"{options.databaseCode}.{options.schemaName}"
# Qualify reads against the cache catalog; resultsSchema stays unprefixed so dbdao.create_schema doesn't quote "catalog.schema" as one literal.
catalog = options.cacheId or options.databaseCode
achilles_params.schemaName = f"{catalog}.{options.schemaName}"
achilles_params.vocabSchemaName = achilles_params.schemaName

dc_schema = create_results_schema(
@@ -263,6 +265,7 @@ def execute_achilles(achilles_params: AchillesParams, flow_run_id: str):
verboseMode=achilles_params.verboseMode,
excludeAnalysisIds=convert_to_int_vector(achilles_params.excludeAnalysisIds),
createIndices=achilles_params.createIndices,
cacheId=achilles_params.cacheId or "",
)

# Task might succeed if there are failed analyses so need to check for error report or failed analyses inside output folder
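This flow and the dqd_plugin hunk below apply the same fallback whenever a schema name is qualified: use the cache catalog when one exists, otherwise the live database code. A minimal sketch, with field names mirroring the params objects in the diff and hypothetical values:

def qualify(schema_name: str, cache_id: str | None, database_code: str) -> str:
    # Reads resolve against the cache catalog; the results schema stays
    # unprefixed so create_schema does not quote "catalog.schema" as one literal.
    catalog = cache_id or database_code
    return f"{catalog}.{schema_name}"

# qualify("cdmdefault", "_9f8e7d6c", "alpdev_pg") -> "_9f8e7d6c.cdmdefault"
# qualify("cdmdefault", None, "alpdev_pg")        -> "alpdev_pg.cdmdefault"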
6 changes: 4 additions & 2 deletions plugins/flows/base/data_characterization_plugin/utils.py
@@ -24,15 +24,17 @@ def failed_analysis_ids_to_str(failed_ids: list[int]) -> str:


def is_safe_schema_name(schema: str) -> bool:
return match(r"^[a-zA-Z][a-zA-Z0-9_]*$", schema) is not None
# Allow leading underscore (cache_ids from sanitized UUIDs) and a single catalog.schema pair.
return match(r"^[A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)?$", schema) is not None


def get_cdm_source(dbdao, schema: str, *, use_trex_connection: bool = False) -> str:
"""
Get the cdm_source_abbreviation from the cdm_source table.
"""
if use_trex_connection:
sql = f'SELECT cdm_source_abbreviation FROM "{dbdao.database_code}"."{schema}"."cdm_source"'
catalog = getattr(dbdao, "cache_id", None) or dbdao.database_code
sql = f'SELECT cdm_source_abbreviation FROM "{catalog}"."{schema}"."cdm_source"'
value = dbdao.execute_sql(
sql,
fetch=True,
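Because get_cdm_source interpolates identifiers rather than binding parameters, it depends on is_safe_schema_name being applied upstream. A sketch of the three-part lookup with the same getattr fallback; the _Dao stand-in and its values are hypothetical:

def cdm_source_sql(dbdao, schema: str) -> str:
    # Fall back to the live database when the DAO exposes no cache_id.
    catalog = getattr(dbdao, "cache_id", None) or dbdao.database_code
    # Identifiers are quoted, not parameterized; validate them first.
    return f'SELECT cdm_source_abbreviation FROM "{catalog}"."{schema}"."cdm_source"'

class _Dao:
    cache_id = "_9f8e7d6c"
    database_code = "alpdev_pg"

print(cdm_source_sql(_Dao(), "cdmdefault"))
# SELECT cdm_source_abbreviation FROM "_9f8e7d6c"."cdmdefault"."cdm_source"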
8 changes: 5 additions & 3 deletions plugins/flows/base/dqd_plugin/flow.py
@@ -87,9 +87,11 @@ def execute_dqd(dqd_params: DqdParams, flow_run_id: str, dialect: SupportedDatabaseDialects
vocab_database_schema = dqd_params.vocabSchemaName
cdm_source_name = dqd_params.schemaName
else:
cdm_database_schema = f"{dqd_params.databaseCode}.{dqd_params.schemaName}"
vocab_database_schema = f"{dqd_params.databaseCode}.{dqd_params.vocabSchemaName}"
cdm_source_name = f"{dqd_params.databaseCode}.{dqd_params.schemaName}"
# Qualify reads against the cache catalog (populated CDM tables) instead of the live source.
catalog = dqd_params.cacheId or dqd_params.databaseCode
cdm_database_schema = f"{catalog}.{dqd_params.schemaName}"
vocab_database_schema = f"{catalog}.{dqd_params.vocabSchemaName}"
cdm_source_name = f"{catalog}.{dqd_params.schemaName}"

r_execute_dqd(
set_trex_env_string=set_trex_env_string,
112 changes: 3 additions & 109 deletions plugins/functions/dataset/index.ts
@@ -112,7 +112,6 @@ export class DatasetRouter {

const id = uuidv4();
const {
type,
tokenStudyCode,
tenantId,
schemaOption,
@@ -124,14 +123,11 @@
dataModel,
plugin,
paConfigId,
visibilityStatus,
detail,
dashboards,
attributes,
tags,
fhirProjectId,
cacheDatasetName,
cacheDatasetType,
} = req.body;

const newCacheSchemaName = schemaName
@@ -228,7 +224,7 @@
this.logger.info("Creating new dataset in Portal");
const newDatasetInput = {
id,
type, // TODO: validate type
type: "webapi",
tokenDatasetCode: tokenStudyCode,
schemaOption,
dialect,
@@ -240,7 +236,7 @@
plugin,
tenantId,
paConfigId,
visibilityStatus,
visibilityStatus: "DEFAULT",
detail,
dashboards,
attributes,
@@ -264,111 +260,9 @@
.json(responseData || { error: createError.message });
}

this.logger.info("Creating cache dataset in Portal");

let newCacheDataset: any = {};

if (
type === SourceDatasetType.FHIR &&
cacheDatasetType === CacheDatasetType.NON_OMOP
) {
// Create FHIR project synchronously so fhir_project_id is available before triggering cache flow
let resolvedFhirProjectId = fhirProjectId;
if (!resolvedFhirProjectId) {
try {
this.logger.info(
`Creating FHIR project for dataset '${tokenStudyCode}'..`,
);
const fhirGatewayAPI = new FhirGatewayAPI(token);
resolvedFhirProjectId = await fhirGatewayAPI.createProject(
id,
detail?.name || tokenStudyCode,
);
this.logger.info(
`FHIR project created with id '${resolvedFhirProjectId}' for dataset '${tokenStudyCode}'`,
);
} catch (error) {
await portalAPI.deleteDataset(id); // Rollback dataset creation if FHIR project creation fails
this.logger.error(
`Error while creating FHIR project for dataset '${tokenStudyCode}'! ${error}`,
);
return res
.status(500)
.send("Dataset cannot be created because of FHIR project creation failure");
}
}

try {
this.logger.info(
`Creating cache of source FHIR schema '${schemaName}'. FHIR cache schema name is ${parsedNewCacheSchemaName}`,
);
const fhirCacheFlowRunDto = {
databaseCode: databaseCode,
schemaName: schemaName,
cacheSchemaName: parsedNewCacheSchemaName,
studyCode: tokenStudyCode,
fhirProjectId: resolvedFhirProjectId,
};
const fhirCacheResult = await jobpluginsAPI.createFhirCacheFlowRun(fhirCacheFlowRunDto);
flowRunId = fhirCacheResult?.flowRunId;
} catch (error) {
this.logger.error(
`Error while creating FHIR cache schema! ${error}`,
);
return res
.status(500)
.send("Error while creating FHIR cache schema");
}
}

if (cacheDatasetName && cacheDatasetType) {
const snapshotRequest = {
id: uuidv4(),
sourceDatasetId: id,
newDatasetName: cacheDatasetName,
schemaName: schemaName,
timestamp: new Date(),
type: cacheDatasetType,
};
newCacheDataset = await portalAPI.copyDataset(snapshotRequest);

// Trigger cache creation for existing schema with OMOP cache type
if (
schemaOption === CDMSchemaTypes.ExistingCDM &&
cacheDatasetType === CacheDatasetType.OMOP
) {
try {
this.logger.info(
`Creating cache for existing schema ${schemaName}. Cache schema name is ${schemaName}`,
);

const dataModels = await jobpluginsAPI.getDatamodels();
const dataModelInfo = dataModels.find(
(model) => model.datamodel === dataModel,
);

const datamartCacheResult = await jobpluginsAPI.createDatamartCacheFlowRun(
id,
newCacheDataset.id,
{},
dataModelInfo?.flowId,
`datamart-cache-${schemaName}`,
);
flowRunId = datamartCacheResult?.flowRunId;
} catch (error) {
this.logger.error(
`Error while creating cache for existing schema! ${error}`,
);
return res
.status(500)
.send("Error while creating cache for existing schema");
}
}
}

return res
.status(200)
.json({ id: newDataset.id, cacheId: newCacheDataset.id, flowRunId });
.json({ id: newDataset.id, flowRunId });
} catch (error) {
this.logger.error(
`Error while creating dataset: ${JSON.stringify(error)}`,
15 changes: 0 additions & 15 deletions plugins/functions/demo/controllers/DemoController.ts
@@ -35,11 +35,6 @@ export class DemoController {
message: "Adding demo dataset...",
task: this.service.addDataset.bind(this.service),
},
{
code: "cache",
message: "Creating cache for demo dataset...",
task: this.service.createCache.bind(this.service),
},
{
code: "dqd",
message: "Running DQD on demo dataset...",
@@ -77,11 +72,6 @@
message: "Adding HTTP test dataset...",
task: this.service.addDataset.bind(this.service),
},
{
code: "cache",
message: "Creating cache for HTTP test dataset...",
task: this.service.createCache.bind(this.service),
},
];

return await this.executeSteps(req, res, steps);
@@ -116,11 +106,6 @@
message: "Running DC on demo dataset...",
task: this.service.runDC.bind(this.service),
},
{
code: "cache",
message: "Creating cache for demo dataset...",
task: this.service.createCache.bind(this.service),
},
{
code: "metadata",
message: "Updating metadata for dataset...",
24 changes: 10 additions & 14 deletions plugins/functions/demo/services/DemoService.ts
@@ -84,22 +84,18 @@ export class DemoService {
const portalAPI = new PortalAPI(token);
const datasets = await portalAPI.getDatasets();

const sourceDataset = datasets.find(
const existingDataset = datasets.find(
(dataset) =>
dataset.databaseCode === env.DEMO_DB_CODE &&
dataset.schemaName === env.DEMO_DB_CDM_SCHEMA &&
dataset.vocabSchemaName === env.DEMO_DB_CDM_SCHEMA &&
dataset.visibilityStatus === "HIDDEN" &&
dataset.sourceStudyId == null
dataset.sourceStudyId == null &&
dataset.visibilityStatus !== "HIDDEN"
);

const cacheDataset = datasets.find(
(dataset) => dataset.sourceStudyId === sourceDataset?.id
);

if (sourceDataset && cacheDataset) {
this.logger.info(`Dataset exists: ${JSON.stringify(sourceDataset)}`);
return { ...sourceDataset, cacheId: cacheDataset.id };
if (existingDataset) {
this.logger.info(`Dataset exists: ${JSON.stringify(existingDataset)}`);
return existingDataset;
}

const datasetAPI = new DatasetAPI(token);
@@ -128,7 +124,7 @@
throw new Error("Dataset not found");
}

const { cacheId: datasetId, vocabSchemaName } = dataset;
const { id: datasetId, vocabSchemaName } = dataset;
const dqdFlowRun = await jobPluginsAPI.createDqdFlowRun({
datasetId,
releaseId: "",
@@ -186,7 +182,7 @@
throw new Error("Dataset not found");
}

const { cacheId: datasetId } = dataset;
const { id: datasetId } = dataset;
const result = await jobPluginsAPI.createDcFlowRun({
datasetId,
releaseId: "",
@@ -250,7 +246,7 @@
throw new Error("Dataset not found");
}
const portalAPI = new PortalAPI(token);
const { cacheId: datasetId } = dataset;
const { id: datasetId } = dataset;

const cacheDataset = await portalAPI.getDataset(datasetId);

@@ -332,7 +328,7 @@
const dataset = progress?.steps?.find(
(step) => step.code === "dataset"
)?.result;
const { cacheId: datasetId } = dataset;
const { id: datasetId } = dataset;

if (!dataset) {
this.logger.error("Dataset not found in progress");