Skip to content

Commit c9bf6db

Browse files
committed
Use subset of collection for pipeline ingest
1 parent 934d30c commit c9bf6db

File tree

1 file changed

+28
-14
lines changed

1 file changed

+28
-14
lines changed

nmdc_server/ingest/all.py

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import click
44
from pymongo import MongoClient
55
from pymongo.collection import Collection
6+
from pymongo.cursor import Cursor
67
from sqlalchemy.orm import Session
78

89
from nmdc_server import models
@@ -101,10 +102,21 @@ def load(db: Session, function_limit=None, skip_annotation=False):
101102
)
102103
db.commit()
103104

105+
"""
106+
nmdc:ReadQcAnalysis
107+
nmdc:MagsAnalysis
108+
nmdc:MetabolomicsAnalysis
109+
nmdc:MetagenomeSequencing
110+
nmdc:ReadBasedTaxonomyAnalysis
111+
nmdc:MetagenomeAssembly
112+
nmdc:MetagenomeAnnotation
113+
nmdc:NomAnalysis
114+
"""
115+
104116
logger.info("Loading metabolomics analysis...")
105117
pipeline.load(
106118
db,
107-
mongodb["metabolomics_analysis_activity_set"].find(),
119+
mongodb["workflow_execution_set"].find({"type": "nmdc:MetabolomicsAnalysis"}),
108120
pipeline.load_metabolomics_analysis,
109121
WorkflowActivityTypeEnum.metabolomics_analysis.value,
110122
)
@@ -113,7 +125,7 @@ def load(db: Session, function_limit=None, skip_annotation=False):
113125
logger.info("Loading read based analysis...")
114126
pipeline.load(
115127
db,
116-
mongodb["read_based_taxonomy_analysis_activity_set"].find(),
128+
mongodb["workflow_execution_set"].find({"type": "nmdc:ReadBasedTaxonomyAnalysis"}),
117129
pipeline.load_read_based_analysis,
118130
WorkflowActivityTypeEnum.read_based_analysis.value,
119131
)
@@ -122,15 +134,15 @@ def load(db: Session, function_limit=None, skip_annotation=False):
122134
logger.info("Loading metatranscriptome activities...")
123135
pipeline.load(
124136
db,
125-
mongodb["metatranscriptome_activity_set"].find(),
137+
mongodb["workflow_execution_set"].find({"type": "nmdc:MetatranscriptomeAnalysis"}),
126138
pipeline.load_metatranscriptome,
127139
WorkflowActivityTypeEnum.metatranscriptome.value,
128140
)
129141

130142
logger.info("Loading NOM analysis...")
131143
pipeline.load(
132144
db,
133-
mongodb["nom_analysis_activity_set"].find(),
145+
mongodb["workflow_execution_set"].find({"type": "nmdc:NomAnalysis"}),
134146
pipeline.load_nom_analysis,
135147
WorkflowActivityTypeEnum.nom_analysis.value,
136148
)
@@ -139,7 +151,7 @@ def load(db: Session, function_limit=None, skip_annotation=False):
139151
logger.info("Loading MAGs...")
140152
pipeline.load(
141153
db,
142-
mongodb["mags_activity_set"].find(),
154+
mongodb["workflow_execution_set"].find({"type": "nmdc:MagsAnalysis"}),
143155
pipeline.load_mags,
144156
WorkflowActivityTypeEnum.mags_analysis.value,
145157
)
@@ -154,13 +166,14 @@ def load(db: Session, function_limit=None, skip_annotation=False):
154166

155167
# This has historically been fast, but it is only for the progress bar.
156168
# It can be removed if it becomes slow.
157-
count = mongodb["metagenome_annotation_activity_set"].estimated_document_count()
158-
iterator = paginate_cursor(
159-
mongodb["metagenome_annotation_activity_set"],
160-
page_size=1, # prevent cursor from timing out
161-
no_cursor_timeout=True,
169+
annotation_activities = list(
170+
mongodb["workflow_execution_set"].find(
171+
{"type": "nmdc:MetagenomeAnnotation"}, batch_size=100
172+
)
162173
)
163-
with click.progressbar(iterator, length=count) as bar:
174+
# TODO test this and make sure it works as expected
175+
# this undoes the pagination that existed before
176+
with click.progressbar(annotation_activities, length=len(annotation_activities)) as bar:
164177
pipeline.load(
165178
db,
166179
bar,
@@ -180,7 +193,7 @@ def load(db: Session, function_limit=None, skip_annotation=False):
180193
logger.info("Loading read qc...")
181194
pipeline.load(
182195
db,
183-
mongodb["read_qc_analysis_activity_set"].find(),
196+
mongodb["workflow_execution_set"].find({"type": "nmdc:ReadQcAnalysis"}),
184197
pipeline.load_reads_qc,
185198
WorkflowActivityTypeEnum.reads_qc.value,
186199
)
@@ -190,7 +203,8 @@ def load(db: Session, function_limit=None, skip_annotation=False):
190203
logger.info("Loading metaproteomic analysis...")
191204
pipeline.load(
192205
db,
193-
mongodb["metaproteomics_analysis_activity_set"].find(
206+
mongodb["workflow_execution_set"].find(
207+
{"type": "nmdc:MetaproteomicAnalysis"},
194208
no_cursor_timeout=True,
195209
),
196210
pipeline.load_mp_analysis,
@@ -207,7 +221,7 @@ def load(db: Session, function_limit=None, skip_annotation=False):
207221
logger.info("Loading metagenome assembly...")
208222
pipeline.load(
209223
db,
210-
mongodb["metagenome_assembly_set"].find(),
224+
mongodb["workflow_execution_set"].find({"type": "nmdc:MetagenomeAssembly"}),
211225
pipeline.load_mg_assembly,
212226
WorkflowActivityTypeEnum.metagenome_assembly.value,
213227
)

0 commit comments

Comments
 (0)