Skip to content

Commit fdfb24b

Browse files
committed
Use existing assoc. table for OP to DO
1 parent 5ae1606 commit fdfb24b

File tree

6 files changed

+34
-32
lines changed

6 files changed

+34
-32
lines changed

nmdc_server/aggregations.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -234,10 +234,15 @@ def get_data_object_aggregation(
234234
func.count(models.DataObject.id),
235235
func.sum(func.coalesce(models.DataObject.file_size_bytes, 0)),
236236
)
237+
.join(
238+
models.omics_processing_output_association,
239+
models.omics_processing_output_association.c.data_object_id
240+
== models.DataObject.id
241+
)
237242
.filter(
238243
models.DataObject.workflow_type != None,
239244
models.DataObject.file_type != None,
240-
subquery.c.id == models.DataObject.omics_processing_id,
245+
subquery.c.id == models.omics_processing_output_association.c.omics_processing_id,
241246
models.DataObject.url != None,
242247
)
243248
.group_by(models.DataObject.workflow_type, models.DataObject.file_type)
@@ -255,9 +260,14 @@ def get_data_object_aggregation(
255260
func.count(models.DataObject.id),
256261
func.sum(func.coalesce(models.DataObject.file_size_bytes, 0)),
257262
)
263+
.join(
264+
models.omics_processing_output_association,
265+
models.omics_processing_output_association.c.data_object_id
266+
== models.DataObject.id
267+
)
258268
.filter(
259269
models.DataObject.workflow_type != None,
260-
subquery.c.id == models.DataObject.omics_processing_id,
270+
subquery.c.id == models.omics_processing_output_association.c.omics_processing_id,
261271
models.DataObject.url != None,
262272
)
263273
.group_by(models.DataObject.workflow_type)
@@ -274,10 +284,15 @@ def get_data_object_aggregation(
274284
func.count(models.DataObject.id),
275285
func.sum(func.coalesce(models.DataObject.file_size_bytes, 0)),
276286
)
287+
.join(
288+
models.omics_processing_output_association,
289+
models.omics_processing_output_association.c.data_object_id
290+
== models.DataObject.id
291+
)
277292
.filter(
278293
models.DataObject.workflow_type != None,
279294
models.DataObject.file_type != None,
280-
subquery.c.id == models.DataObject.omics_processing_id,
295+
subquery.c.id == models.omics_processing_output_association.c.omics_processing_id,
281296
models.DataObject.url != None,
282297
)
283298
.group_by(models.DataObject.workflow_type, models.DataObject.file_type)

nmdc_server/ingest/all.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,6 @@ def load(db: Session, function_limit=None, skip_annotation=False):
101101
)
102102
db.commit()
103103

104-
# Update the FK relationship from the data_object table to the
105-
# omics_processing table.
106-
logger.info("Updating foreign key relationship from Data Object to Data Generation")
107-
data_object.update_data_generation_relation(
108-
db,
109-
mongodb["data_object_set"].find(),
110-
)
111-
112104
workflow_set = "workflow_execution_set"
113105

114106
logger.info("Loading metabolomics analysis...")

nmdc_server/ingest/data_object.py

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -43,23 +43,3 @@ def load(db: Session, cursor: Cursor, file_types: List[Dict[str, Any]]):
4343

4444
if objects_without_type:
4545
logger.error(f"Encountered {objects_without_type} objects without data_object_type")
46-
47-
48-
def update_data_generation_relation(db: Session, cursor: Cursor):
49-
"""
50-
Update DataObject's omics_processing_id FK.
51-
52-
This should run after ingesting all data objects and data generations (omics processing).
53-
"""
54-
for data_object in cursor:
55-
id = data_object["id"]
56-
was_generated_by = data_object.pop("was_generated_by", None)
57-
if not was_generated_by:
58-
continue
59-
# Mypy does not like db.get, and reports that "Session" has no attribute "get."
60-
# See https://docs.sqlalchemy.org/en/14/orm/session_basics.html#get-by-primary-key
61-
data_generation = db.get(OmicsProcessing, was_generated_by) # type: ignore
62-
row = db.get(DataObject, id) # type: ignore
63-
if row and data_generation:
64-
row.omics_processing_id = was_generated_by
65-
db.add(row)

nmdc_server/ingest/pipeline.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,15 @@ def load(
308308
.on_conflict_do_nothing()
309309
)
310310

311+
312+
for data_generation in was_informed_by:
313+
data_objects = inputs + outputs
314+
db.execute(
315+
insert(models.omics_processing_output_association)
316+
.values([(data_generation, data_object) for data_object in data_objects])
317+
.on_conflict_do_nothing()
318+
)
319+
311320
for id_ in outputs:
312321
output = db.query(models.DataObject).get(id_)
313322
assert output

nmdc_server/models.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -819,6 +819,8 @@ class DataObject(Base):
819819
workflow_type = Column(String, nullable=True)
820820

821821
# denormalized relationship representing the source omics_processing
822+
# TODO: investigate whether or not these can be removed completely in
823+
# favor of the association table omics_processing_output_association
822824
omics_processing_id = Column(String, ForeignKey("omics_processing.id"), nullable=True)
823825
omics_processing = relationship(OmicsProcessing)
824826

nmdc_server/query.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -963,8 +963,12 @@ def _data_object_filter_subquery(
963963
) -> Query:
964964
"""Create a subquery that selects from a data object filter condition."""
965965
query = db.query(models.DataObject.id.label("id")).join(
966+
models.omics_processing_output_association,
967+
models.omics_processing_output_association.c.id
968+
== models.DataObject.id
969+
).join(
966970
op_cte,
967-
models.DataObject.omics_processing_id == op_cte.c.id,
971+
models.omics_processing_output_association.c.omics_processing_id == op_cte.c.id,
968972
)
969973
if filter.workflow:
970974
query = query.filter(models.DataObject.workflow_type == filter.workflow.value)

0 commit comments

Comments
 (0)