Skip to content

Commit 1fc8215

Browse files
authored
Merge pull request #1701 from microbiomedata/1693-wfe-dgn-many-to-many
Update data model for multivalued `was_informed_by`
2 parents ccb5caa + 9e67c41 commit 1fc8215

File tree

13 files changed

+906
-167
lines changed

13 files changed

+906
-167
lines changed

nmdc_server/fakes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ class PipelineStepBase(SQLAlchemyModelFactory):
203203
started_at_time: datetime = Faker("date_time")
204204
ended_at_time: datetime = Faker("date_time")
205205
execution_resource: str = Faker("word")
206-
omics_processing: models.OmicsProcessing = SubFactory(OmicsProcessingFactory)
206+
was_informed_by: List[models.OmicsProcessing] = []
207207

208208

209209
class ReadsQCFactory(PipelineStepBase):

nmdc_server/filters.py

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
MetaPGeneFunction,
3636
MetaTGeneFunction,
3737
Table,
38+
_table_model_map,
3839
workflow_execution_tables,
3940
)
4041

@@ -107,7 +108,13 @@ def join(self, target_table: Table, query: Query) -> Query:
107108
raise NotImplementedError()
108109

109110
def _join_omics_processing_related_tables(self, target_table: Table, query: Query) -> Query:
110-
if target_table != Table.omics_processing:
111+
if target_table in workflow_execution_tables:
112+
association_table = models.workflow_activity_to_data_generation_map[target_table.value]
113+
query = query.join(association_table).join(
114+
models.OmicsProcessing,
115+
models.OmicsProcessing.id == association_table.c.data_generation_id,
116+
)
117+
elif target_table != Table.omics_processing:
111118
query = query.join(models.OmicsProcessing)
112119

113120
return self.join_omics_processing(query)
@@ -252,10 +259,25 @@ def join_study(self, query: Query) -> Query:
252259
)
253260

254261

255-
workflow_filter_classes: List[Type[OmicsProcessingFilter]] = []
262+
class WorkflowExecutionFilter(OmicsProcessingFilter):
263+
table = Table.reads_qc
264+
265+
def join_omics_processing(self, query: Query) -> Query:
266+
association_table = models.workflow_activity_to_data_generation_map[self.table.value]
267+
model = _table_model_map[self.table]
268+
q = query.join(
269+
association_table,
270+
association_table.c.data_generation_id == models.OmicsProcessing.id,
271+
).join(
272+
model, model.id == association_table.c[f"{self.table.value}_id"] # type: ignore
273+
)
274+
return q
275+
276+
277+
workflow_filter_classes: List[Type[WorkflowExecutionFilter]] = []
256278
for table in workflow_execution_tables:
257279
workflow_filter_classes.append(
258-
type(f"{table.value}_filter", (OmicsProcessingFilter,), {"table": table})
280+
type(f"{table.value}_filter", (WorkflowExecutionFilter,), {"table": table})
259281
)
260282

261283

@@ -274,10 +296,19 @@ def join(self, target_table: Table, query: Query) -> Query:
274296
)
275297

276298
query = super().join(target_table, query)
299+
# Use the association table to join from OmicsProcessing/DataGeneration to
300+
# MetagenomeAnnotation. Due to how the association table(s) are generated
301+
# dynamically, mypy does not know what the columns are.
302+
association_table = models.metagenome_annotation_data_generation_association
277303
return (
278304
query.join(
305+
association_table,
306+
association_table.data_generation_id == models.OmicsProcessing.id, # type: ignore
307+
)
308+
.join(
279309
models.MetagenomeAnnotation,
280-
models.MetagenomeAnnotation.omics_processing_id == models.OmicsProcessing.id,
310+
models.MetagenomeAnnotation.id
311+
== association_table.metagenome_annotation_id, # type: ignore
281312
)
282313
.join(
283314
models.MGAGeneFunctionAggregation,
@@ -325,10 +356,16 @@ def join(self, target_table: Table, query: Query) -> Query:
325356
)
326357

327358
query = super().join(target_table, query)
359+
association_table = models.metaproteomic_analysis_data_generation_association
328360
return (
329361
query.join(
362+
association_table,
363+
association_table.data_generation_id == models.OmicsProcessing.id, # type: ignore
364+
)
365+
.join(
330366
models.MetaproteomicAnalysis,
331-
models.MetaproteomicAnalysis.omics_processing_id == models.OmicsProcessing.id,
367+
models.MetaproteomicAnalysis.id
368+
== association_table.metaproteomic_analysis_id, # type: ignore
332369
)
333370
.join(
334371
models.MetaPGeneFunctionAggregation,
@@ -359,10 +396,16 @@ def join(self, target_table: Table, query: Query) -> Query:
359396
MetaTGeneFunction.id == models.MetaTGeneFunctionAggregation.gene_function_id,
360397
)
361398
query = super().join(target_table, query)
399+
association_table = models.metatranscriptome_annotation_data_generation_association
362400
return (
363401
query.join(
402+
association_table,
403+
association_table.data_generation_id == models.OmicsProcessing.id, # type: ignore
404+
)
405+
.join(
364406
models.MetatranscriptomeAnnotation,
365-
models.MetatranscriptomeAnnotation.omics_processing_id == models.OmicsProcessing.id,
407+
models.MetatranscriptomeAnnotation.id
408+
== association_table.metatranscriptome_annotation_id, # type: ignore
366409
)
367410
.join(
368411
models.MetaTGeneFunctionAggregation,
@@ -383,7 +426,14 @@ class MetaproteomicAnalysisFilter(OmicsProcessingFilter):
383426
table = Table.metaproteomic_analysis
384427

385428
def join_omics_processing(self, query: Query) -> Query:
386-
return query.join(self.table.model)
429+
association_table = models.metaproteomic_analysis_data_generation_association
430+
return query.join(
431+
association_table,
432+
association_table.c.data_generation_id == models.OmicsProcessing.id,
433+
).join(
434+
models.MetaproteomicAnalysis,
435+
models.MetaproteomicAnalysis.id == association_table.c.metaproteomic_analysis_id,
436+
)
387437

388438
def join_biosample(self, query: Query) -> Query:
389439
return (

nmdc_server/ingest/all.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,14 @@ def load(db: Session, function_limit=None, skip_annotation=False):
101101
)
102102
db.commit()
103103

104+
# Update the FK relationship from the data_object table to the
105+
# omics_processing table.
106+
logger.info("Updating foreign key relationship from Data Object to Data Generation")
107+
data_object.update_data_generation_relation(
108+
db,
109+
mongodb["data_object_set"].find(),
110+
)
111+
104112
workflow_set = "workflow_execution_set"
105113

106114
logger.info("Loading metabolomics analysis...")

nmdc_server/ingest/data_object.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from sqlalchemy.orm import Session
66

77
from nmdc_server.logger import get_logger
8-
from nmdc_server.models import DataObject
8+
from nmdc_server.models import DataObject, OmicsProcessing
99
from nmdc_server.schemas import DataObjectCreate
1010

1111
file_type_map: Dict[str, Tuple[str, str]] = {}
@@ -43,3 +43,23 @@ def load(db: Session, cursor: Cursor, file_types: List[Dict[str, Any]]):
4343

4444
if objects_without_type:
4545
logger.error(f"Encountered {objects_without_type} objects without data_object_type")
46+
47+
48+
def update_data_generation_relation(db: Session, cursor: Cursor):
49+
"""
50+
Update DataObject's omics_processing_id FK.
51+
52+
This should run after ingesting all data objects and data generations (omics processing).
53+
"""
54+
for data_object in cursor:
55+
id = data_object["id"]
56+
was_generated_by = data_object.pop("was_generated_by", None)
57+
if not was_generated_by:
58+
continue
59+
# Mypy does not like db.get, and reports that "Session" has no attribute "get."
60+
# See https://docs.sqlalchemy.org/en/14/orm/session_basics.html#get-by-primary-key
61+
data_generation = db.get(OmicsProcessing, was_generated_by) # type: ignore
62+
row = db.get(DataObject, id) # type: ignore
63+
if row and data_generation:
64+
row.omics_processing_id = was_generated_by
65+
db.add(row)

nmdc_server/ingest/pipeline.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,11 @@ def load(
238238
if reported_type != workflow_type:
239239
logger.warning(f"Unexpected type {reported_type} (expected {workflow_type})")
240240

241-
obj["omics_processing_id"] = obj.pop("was_informed_by")
241+
was_informed_by: str | list[str] = obj.pop("was_informed_by")
242+
if isinstance(was_informed_by, str):
243+
was_informed_by = [was_informed_by]
244+
obj["omics_processing_id"] = was_informed_by[0]
245+
obj["was_informed_by"] = was_informed_by
242246

243247
# TODO: pydantic should parse datetime like this... need to look into it
244248
# 2021-01-26T21:36:26.759770Z+0000
@@ -267,6 +271,7 @@ def load(
267271

268272
input_association = getattr(models, f"{table_name}_input_association")
269273
output_association = getattr(models, f"{table_name}_output_association")
274+
was_informed_by_association = getattr(models, f"{table_name}_data_generation_association")
270275

271276
# TODO: Find a different way to validate ref. integrity
272277
valid_inputs = [d for d in inputs if db.query(models.DataObject).get(d)]
@@ -296,12 +301,12 @@ def load(
296301
.values([(id_, f) for f in outputs])
297302
.on_conflict_do_nothing()
298303
)
299-
300-
db.execute(
301-
models.DataObject.__table__.update()
302-
.where(models.DataObject.id.in_(inputs + outputs))
303-
.values({"omics_processing_id": pipeline.omics_processing_id})
304-
)
304+
if was_informed_by:
305+
db.execute(
306+
insert(was_informed_by_association)
307+
.values([(id_, data_generation) for data_generation in was_informed_by])
308+
.on_conflict_do_nothing()
309+
)
305310

306311
for id_ in outputs:
307312
output = db.query(models.DataObject).get(id_)

0 commit comments

Comments
 (0)