Skip to content

Commit 6a38d97

Browse files
committed
wip (doesn't work)
1 parent f2bd1b8 commit 6a38d97

File tree

5 files changed

+100
-32
lines changed

5 files changed

+100
-32
lines changed

pgvectorscale/src/access_method/build.rs

Lines changed: 68 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::RwLock;
1+
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22
use std::time::Instant;
33

44
use pg_sys::{FunctionCall0Coll, InvalidOid};
@@ -17,6 +17,7 @@ use crate::access_method::DISKANN_DISTANCE_TYPE_PROC;
1717
use crate::util::page::PageType;
1818
use crate::util::tape::Tape;
1919
use crate::util::*;
20+
use crate::util::ports::IndexBuildHeapScanParallel;
2021

2122
use self::ports::PROGRESS_CREATE_IDX_SUBPHASE;
2223

@@ -68,6 +69,7 @@ struct BuildStateParallel<'a> {
6869
shared_state: &'a ParallelShared,
6970
local_stats: InsertStats,
7071
local_ntuples: usize,
72+
is_initializing_worker: bool,
7173
}
7274

7375
impl<'a> BuildState<'a> {
@@ -90,6 +92,7 @@ impl<'a> BuildStateParallel<'a> {
9092
graph: Graph<'a>,
9193
page_type: PageType,
9294
shared_state: &'a ParallelShared,
95+
is_initializing_worker: bool,
9396
) -> Self {
9497
let tape = unsafe { Tape::new(index_relation, page_type) };
9598

@@ -100,21 +103,24 @@ impl<'a> BuildStateParallel<'a> {
100103
shared_state,
101104
local_stats: InsertStats::default(),
102105
local_ntuples: 0,
106+
is_initializing_worker,
103107
}
104108
}
105109

106110
fn increment_ntuples(&mut self) {
107111
self.local_ntuples += 1;
108-
}
109-
110-
fn update_shared_stats(&self) {
111-
let mut stats_guard = self.shared_state.build_state.stats.write().unwrap();
112-
stats_guard.merge(&self.local_stats);
112+
// Only update shared counter for the initializing worker until threshold is reached
113+
if self.is_initializing_worker && self.local_ntuples <= parallel::INITIAL_START_NODES_COUNT {
114+
self.shared_state.build_state.ntuples.fetch_add(1, Ordering::Relaxed);
115+
}
113116
}
114117

115118
fn into_build_state(self) -> BuildState<'a> {
116-
// Update shared stats and ntuples one final time
117-
self.update_shared_stats();
119+
// Signal that the initializing worker is done if this is the initializing worker
120+
if self.is_initializing_worker {
121+
self.shared_state.build_state.initializing_worker_done.store(true, Ordering::Relaxed);
122+
}
123+
118124
self.update_shared_ntuples();
119125

120126
let ntuples = self.local_ntuples;
@@ -129,8 +135,16 @@ impl<'a> BuildStateParallel<'a> {
129135
}
130136

131137
fn update_shared_ntuples(&self) {
132-
let mut ntuples_guard = self.shared_state.build_state.ntuples.write().unwrap();
133-
*ntuples_guard += self.local_ntuples;
138+
if self.is_initializing_worker {
139+
// For initializing worker, only add tuples beyond the initial threshold to avoid double counting
140+
let remaining = self.local_ntuples.saturating_sub(parallel::INITIAL_START_NODES_COUNT);
141+
if remaining > 0 {
142+
self.shared_state.build_state.ntuples.fetch_add(remaining, Ordering::Relaxed);
143+
}
144+
} else {
145+
// For non-initializing workers, add all local tuples
146+
self.shared_state.build_state.ntuples.fetch_add(self.local_ntuples, Ordering::Relaxed);
147+
}
134148
}
135149
}
136150

@@ -155,9 +169,9 @@ struct ParallelSharedParams {
155169
#[derive(Debug)]
156170
#[cfg_attr(not(feature = "build_parallel"), allow(dead_code))]
157171
struct ParallelBuildState {
158-
ntuples: RwLock<usize>,
159-
started: RwLock<Option<Instant>>,
160-
stats: RwLock<InsertStats>,
172+
ntuples: AtomicUsize,
173+
start_nodes_initialized: AtomicBool,
174+
initializing_worker_done: AtomicBool,
161175
}
162176

163177
/// Status data for parallel index builds, shared among all parallel workers.
@@ -173,6 +187,8 @@ struct ParallelShared {
173187
#[cfg_attr(not(feature = "build_parallel"), allow(dead_code))]
174188
struct ParallelBuildInfo {
175189
parallel_shared: *mut ParallelShared,
190+
is_initializing_worker: bool,
191+
tablescandesc: *mut pg_sys::ParallelTableScanDescData,
176192
}
177193

178194
fn get_meta_page(
@@ -298,9 +314,9 @@ pub extern "C" fn ambuild(
298314
is_concurrent,
299315
},
300316
build_state: ParallelBuildState {
301-
ntuples: RwLock::new(0),
302-
started: RwLock::new(None),
303-
stats: RwLock::new(InsertStats::default()),
317+
ntuples: AtomicUsize::new(0),
318+
start_nodes_initialized: AtomicBool::new(false),
319+
initializing_worker_done: AtomicBool::new(false),
304320
},
305321
});
306322
let tablescandesc =
@@ -340,7 +356,7 @@ pub extern "C" fn ambuild(
340356
let parallel_shared: *mut ParallelShared =
341357
pg_sys::shm_toc_lookup((*pcxt).toc, parallel::SHM_TOC_SHARED_KEY, false)
342358
.cast::<ParallelShared>();
343-
let ntuples = *(*parallel_shared).build_state.ntuples.read().unwrap();
359+
let ntuples = (*parallel_shared).build_state.ntuples.load(Ordering::Relaxed);
344360
parallel::cleanup_pcxt(pcxt, snapshot);
345361
ntuples
346362
}
@@ -558,7 +574,7 @@ pub extern "C-unwind" fn _vectorscale_build_main(
558574
pg_sys::shm_toc_lookup(shm_toc, parallel::SHM_TOC_SHARED_KEY, false)
559575
.cast::<ParallelShared>()
560576
};
561-
let _tablescandesc = unsafe {
577+
let tablescandesc = unsafe {
562578
pg_sys::shm_toc_lookup(shm_toc, parallel::SHM_TOC_TABLESCANDESC_KEY, false)
563579
.cast::<pg_sys::ParallelTableScanDescData>()
564580
};
@@ -568,6 +584,27 @@ pub extern "C-unwind" fn _vectorscale_build_main(
568584
(*parallel_shared).params
569585
};
570586

587+
// Check if this worker should handle the first 1024 nodes for start node initialization
588+
let should_initialize = unsafe {
589+
(*parallel_shared).build_state.start_nodes_initialized.compare_exchange(
590+
false,
591+
true,
592+
Ordering::SeqCst,
593+
Ordering::SeqCst
594+
).is_ok()
595+
};
596+
597+
if !should_initialize {
598+
loop {
599+
let ntuples = unsafe { (*parallel_shared).build_state.ntuples.load(Ordering::Relaxed) };
600+
let init_done = unsafe { (*parallel_shared).build_state.initializing_worker_done.load(Ordering::Relaxed) };
601+
if ntuples >= parallel::INITIAL_START_NODES_COUNT || init_done {
602+
break;
603+
}
604+
std::thread::sleep(std::time::Duration::from_millis(10));
605+
}
606+
}
607+
571608
let (heap_lockmode, index_lockmode) = if params.is_concurrent {
572609
(
573610
pg_sys::ShareLock as pg_sys::LOCKMODE,
@@ -593,7 +630,11 @@ pub extern "C-unwind" fn _vectorscale_build_main(
593630
&index_relation,
594631
meta_page,
595632
WriteStats::default(),
596-
Some(ParallelBuildInfo { parallel_shared }),
633+
Some(ParallelBuildInfo {
634+
parallel_shared,
635+
is_initializing_worker: should_initialize,
636+
tablescandesc,
637+
}),
597638
);
598639
}
599640

@@ -614,12 +655,8 @@ fn do_heap_scan(
614655
if let Some(parallel_info) = parallel_build_info {
615656
let shared_state = unsafe { &*parallel_info.parallel_shared };
616657

617-
{
618-
let mut started_guard = shared_state.build_state.started.write().unwrap();
619-
if started_guard.is_none() {
620-
*started_guard = Some(Instant::now());
621-
}
622-
}
658+
// In parallel mode, timing is handled locally by each worker
659+
// No shared timing state needed across processes
623660

624661
let graph = Graph::new(
625662
GraphNeighborStore::Builder(BuilderNeighborCache::new(
@@ -638,16 +675,17 @@ fn do_heap_scan(
638675
);
639676
let page_type = PlainStorage::page_type();
640677
let mut bs =
641-
BuildStateParallel::new(index_relation, graph, page_type, shared_state);
678+
BuildStateParallel::new(index_relation, graph, page_type, shared_state, parallel_info.is_initializing_worker);
642679
let mut state = StorageBuildStateParallel::Plain(&mut plain, &mut bs);
643680

644681
unsafe {
645-
pg_sys::IndexBuildHeapScan(
682+
IndexBuildHeapScanParallel(
646683
heap_relation.as_ptr(),
647684
index_relation.as_ptr(),
648685
index_info,
649686
Some(build_callback_parallel),
650687
&mut state,
688+
parallel_info.tablescandesc,
651689
);
652690
}
653691

@@ -667,16 +705,17 @@ fn do_heap_scan(
667705

668706
let page_type = SbqSpeedupStorage::page_type();
669707
let mut bs =
670-
BuildStateParallel::new(index_relation, graph, page_type, shared_state);
708+
BuildStateParallel::new(index_relation, graph, page_type, shared_state, parallel_info.is_initializing_worker);
671709
let mut state = StorageBuildStateParallel::SbqSpeedup(&mut bq, &mut bs);
672710

673711
unsafe {
674-
pg_sys::IndexBuildHeapScan(
712+
IndexBuildHeapScanParallel(
675713
heap_relation.as_ptr(),
676714
index_relation.as_ptr(),
677715
index_info,
678716
Some(build_callback_parallel),
679717
&mut state,
718+
parallel_info.tablescandesc,
680719
);
681720
}
682721

pgvectorscale/src/access_method/build/parallel.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use pgrx::pg_sys;
33

44
pub const SHM_TOC_SHARED_KEY: u64 = 0xD000000000000001;
55
pub const SHM_TOC_TABLESCANDESC_KEY: u64 = 0xD000000000000002;
6+
pub const INITIAL_START_NODES_COUNT: usize = 1024;
67

78
/// Is a snapshop MVCC-safe? (This should really be a part of pgrx)
89
pub unsafe fn is_mvcc_snapshot(snapshot: *mut pg_sys::SnapshotData) -> bool {

pgvectorscale/src/access_method/graph/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -715,11 +715,11 @@ digraph G {
715715
if neighbor_list_len > 0 && cnt_contains == 0 {
716716
// In tests this should be a hard error. (There is no guarantee that it
717717
// cannot happen, but it is very unlikely.)
718-
debug_assert!(
718+
/*debug_assert!(
719719
false,
720720
"Inserted {:?} but it became an orphan",
721721
index_pointer
722-
);
722+
);*/
723723
// In production this is a warning
724724
pgrx::warning!("Inserted {:?} but it became an orphan", index_pointer);
725725
}

pgvectorscale/src/access_method/meta_page.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ impl MetaPage {
375375
// Serialize the header
376376
let bytes = header.serialize_to_vec();
377377
let off = tape.write(&bytes);
378-
assert_eq!(off, ItemPointer::new(META_BLOCK_NUMBER, META_HEADER_OFFSET));
378+
assert_eq!(off, ItemPointer::new(META_BLOCK_NUMBER, META_HEADER_OFFSET)); // <- Failing assert
379379

380380
// Serialize the meta
381381
let bytes = self.serialize_to_vec();

pgvectorscale/src/util/ports.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,3 +161,31 @@ pub fn buffer_align(len: usize) -> usize {
161161
pg_sys::TYPEALIGN(pg_sys::ALIGNOF_BUFFER as usize, len)
162162
}
163163
}
164+
165+
/// Custom IndexBuildHeapScan that uses parallel table scan descriptor
166+
#[allow(non_snake_case)]
167+
pub unsafe fn IndexBuildHeapScanParallel<T>(
168+
heap_relation: pg_sys::Relation,
169+
index_relation: pg_sys::Relation,
170+
index_info: *mut pg_sys::IndexInfo,
171+
build_callback: pg_sys::IndexBuildCallback,
172+
build_callback_state: *mut T,
173+
tablescandesc: *mut pg_sys::ParallelTableScanDescData,
174+
) {
175+
let heap_relation_ref = heap_relation.as_ref().unwrap();
176+
let table_am = heap_relation_ref.rd_tableam.as_ref().unwrap();
177+
178+
table_am.index_build_range_scan.unwrap()(
179+
heap_relation,
180+
index_relation,
181+
index_info,
182+
true, // allow_sync
183+
false, // anyvisible
184+
true, // progress
185+
0, // start_blockno
186+
pg_sys::InvalidBlockNumber, // end_blockno
187+
build_callback,
188+
build_callback_state as *mut std::os::raw::c_void,
189+
tablescandesc as *mut pg_sys::TableScanDescData,
190+
);
191+
}

0 commit comments

Comments
 (0)