Skip to content

Commit c48efb7

Browse files
committed
wip
1 parent 4eb56fe commit c48efb7

File tree

4 files changed

+93
-8
lines changed

4 files changed

+93
-8
lines changed

pgvectorscale/src/access_method/build.rs

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,7 @@ unsafe fn insert_storage<S: Storage>(
487487
spare_vector,
488488
storage,
489489
stats,
490+
false,
490491
);
491492
}
492493

@@ -662,12 +663,9 @@ fn do_heap_scan(
662663
);
663664
}
664665

665-
finalize_index_build(
666-
&mut plain,
667-
bs.into_build_state(),
668-
index_relation,
669-
write_stats,
670-
)
666+
// In parallel mode, nodes are finalized during insertion via streaming
667+
// Just need to handle any remaining cached nodes and update meta page
668+
finalize_remaining_parallel_nodes(&mut plain, bs, index_relation, write_stats)
671669
}
672670
StorageType::SbqCompression => {
673671
let mut bq = unsafe {
@@ -701,7 +699,9 @@ fn do_heap_scan(
701699
);
702700
}
703701

704-
finalize_index_build(&mut bq, bs.into_build_state(), index_relation, write_stats)
702+
// In parallel mode, nodes are finalized during insertion via streaming
703+
// Just need to handle any remaining cached nodes and update meta page
704+
finalize_remaining_parallel_nodes(&mut bq, bs, index_relation, write_stats)
705705
}
706706
}
707707
} else {
@@ -774,6 +774,17 @@ fn do_heap_scan(
774774
}
775775
}
776776

777+
fn finalize_remaining_parallel_nodes<S: Storage>(
778+
storage: &mut S,
779+
state: BuildStateParallel,
780+
index_relation: &PgRelation,
781+
write_stats: WriteStats,
782+
) -> usize {
783+
// Convert parallel state to regular build state for final processing
784+
let build_state = state.into_build_state();
785+
finalize_index_build(storage, build_state, index_relation, write_stats)
786+
}
787+
777788
fn finalize_index_build<S: Storage>(
778789
storage: &mut S,
779790
state: BuildState,
@@ -1001,6 +1012,7 @@ fn build_callback_internal<S: Storage>(
10011012
spare_vector,
10021013
storage,
10031014
&mut state.stats,
1015+
false,
10041016
);
10051017
}
10061018

@@ -1060,14 +1072,16 @@ fn build_callback_parallel_internal<S: Storage>(
10601072
&mut state.local_stats,
10611073
);
10621074

1063-
// Insert node into graph - PostgreSQL page locking handles concurrency
1075+
// Insert node into graph with parallel build mode enabled
1076+
// PostgreSQL page locking handles concurrency when finalizing nodes
10641077
state.graph.insert(
10651078
index,
10661079
index_pointer,
10671080
vector,
10681081
spare_vector,
10691082
storage,
10701083
&mut state.local_stats,
1084+
true,
10711085
);
10721086
}
10731087

pgvectorscale/src/access_method/graph/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,7 @@ digraph G {
642642
spare_vec: LabeledVector,
643643
storage: &S,
644644
stats: &mut InsertStats,
645+
parallel_build: bool,
645646
) {
646647
self.update_start_nodes(
647648
index,
@@ -658,6 +659,13 @@ digraph G {
658659

659660
// Insert starting from default start node and avoid label filtering
660661
self.insert_internal(index_pointer, vec, true, storage, stats);
662+
663+
// In parallel build mode, periodically flush cached nodes to disk to avoid
664+
// accumulating large amounts of data in memory that would need to be
665+
// processed sequentially at the end
666+
if parallel_build {
667+
self.maybe_flush_neighbor_cache(storage, stats);
668+
}
661669
}
662670

663671
fn insert_internal<S: Storage>(
@@ -736,4 +744,18 @@ digraph G {
736744
let (_pruned, n) = self.add_neighbors(storage, from, from_labels, new.clone(), prune_stats);
737745
n.contains(&new[0])
738746
}
747+
748+
/// In parallel builds, periodically flush cached neighbor data to disk
749+
/// to prevent accumulating too much data in memory
750+
fn maybe_flush_neighbor_cache<S: Storage>(
751+
&mut self,
752+
storage: &S,
753+
stats: &mut InsertStats,
754+
) {
755+
if let GraphNeighborStore::Builder(ref cache) = self.neighbor_store {
756+
// Flush cache when it's getting full to avoid memory pressure
757+
// and ensure nodes get written to disk during parallel processing
758+
cache.flush_if_above_threshold(storage, &mut stats.prune_neighbor_stats, 0.8);
759+
}
760+
}
739761
}

pgvectorscale/src/access_method/graph/neighbor_store.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,45 @@ impl BuilderNeighborCache {
136136
storage.set_neighbors_on_disk(key, new_neighbors.as_slice(), stats);
137137
}
138138
}
139+
140+
/// Flush cached entries to disk if cache usage is above the given threshold.
141+
/// This helps prevent memory buildup during parallel builds.
142+
pub fn flush_if_above_threshold<S: Storage>(
143+
&self,
144+
storage: &S,
145+
stats: &mut PruneNeighborStats,
146+
threshold: f64,
147+
) {
148+
let mut cache = self.neighbor_map.borrow_mut();
149+
let current_size = cache.len();
150+
let capacity_val = cache.cap().get();
151+
152+
if current_size as f64 / capacity_val as f64 > threshold {
153+
let target_size = (capacity_val as f64 * 0.5) as usize;
154+
155+
// Flush least recently used entries to disk
156+
while cache.len() > target_size {
157+
if let Some((neighbors_of, entry)) = cache.pop_lru() {
158+
let pruned_neighbors = if entry.neighbors.len() > self.num_neighbors {
159+
Graph::prune_neighbors(
160+
self.max_alpha,
161+
self.num_neighbors,
162+
entry.labels.as_ref(),
163+
entry.neighbors,
164+
storage,
165+
stats,
166+
)
167+
} else {
168+
entry.neighbors
169+
};
170+
171+
storage.set_neighbors_on_disk(neighbors_of, &pruned_neighbors, stats);
172+
} else {
173+
break;
174+
}
175+
}
176+
}
177+
}
139178
}
140179

141180
pub enum GraphNeighborStore {

pgvectorscale/src/util/lru.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,14 @@ impl<K: Hash + Eq + Clone, V> LruCacheWithStats<K, V> {
110110
pub fn into_parts(self) -> (LruCache<K, V>, CacheStats) {
111111
(self.cache, self.stats)
112112
}
113+
114+
/// Remove and return the least recently used key-value pair
115+
pub fn pop_lru(&mut self) -> Option<(K, V)> {
116+
if let Some((key, value)) = self.cache.pop_lru() {
117+
self.stats.evictions += 1;
118+
Some((key, value))
119+
} else {
120+
None
121+
}
122+
}
113123
}

0 commit comments

Comments
 (0)