Skip to content

Commit c1afd83

Browse files
authored
Merge pull request #50 from duckdb/jray/result-reader
result reader
2 parents 5f86ef1 + 6456ee6 commit c1afd83

File tree

8 files changed

+335
-2
lines changed

8 files changed

+335
-2
lines changed

api/pkgs/@duckdb/node-api/README.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,36 @@ for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
146146
}
147147
```
148148

149+
### Result Reader
150+
151+
Run and read all data:
152+
```ts
153+
const reader = await connection.runAndReadAll('from test_all_types()');
154+
const rows = reader.getRows();
155+
// OR: const columns = reader.getColumns();
156+
```
157+
158+
Run and read up to (at lesat) some number of rows:
159+
```ts
160+
const reader = await connection.runAndReadUtil('from range(5000)', 1000);
161+
const rows = reader.getRows();
162+
// rows.length === 2048. (Rows are read in chunks of 2048.)
163+
```
164+
165+
Read rows incrementally:
166+
```ts
167+
const reader = await connection.runAndRead('from range(5000)');
168+
reader.readUntil(2000);
169+
// reader.currentRowCount === 2048 (Rows are read in chunks of 2048.)
170+
// reader.done === false
171+
reader.readUntil(4000);
172+
// reader.currentRowCount === 4096
173+
// reader.done === false
174+
reader.readUntil(6000);
175+
// reader.currentRowCount === 5000
176+
// reader.done === true
177+
```
178+
149179
### Inspect Data Types
150180

151181
```ts

api/src/DuckDBConnection.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { DuckDBExtractedStatements } from './DuckDBExtractedStatements';
44
import { DuckDBInstance } from './DuckDBInstance';
55
import { DuckDBPreparedStatement } from './DuckDBPreparedStatement';
66
import { DuckDBResult } from './DuckDBResult';
7+
import { DuckDBResultReader } from './DuckDBResultReader';
78

89
export class DuckDBConnection {
910
private readonly connection: duckdb.Connection;
@@ -24,6 +25,19 @@ export class DuckDBConnection {
2425
public async run(sql: string): Promise<DuckDBResult> {
2526
return new DuckDBResult(await duckdb.query(this.connection, sql));
2627
}
28+
public async runAndRead(sql: string): Promise<DuckDBResultReader> {
29+
return new DuckDBResultReader(await this.run(sql));
30+
}
31+
public async runAndReadAll(sql: string): Promise<DuckDBResultReader> {
32+
const reader = new DuckDBResultReader(await this.run(sql));
33+
await reader.readAll();
34+
return reader;
35+
}
36+
public async runAndReadUntil(sql: string, targetRowCount: number): Promise<DuckDBResultReader> {
37+
const reader = new DuckDBResultReader(await this.run(sql));
38+
await reader.readUntil(targetRowCount);
39+
return reader;
40+
}
2741
public async prepare(sql: string): Promise<DuckDBPreparedStatement> {
2842
return new DuckDBPreparedStatement(
2943
await duckdb.prepare(this.connection, sql)

api/src/DuckDBDataChunk.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { DuckDBValue } from './values';
44

55
export class DuckDBDataChunk {
66
public readonly chunk: duckdb.DataChunk;
7+
private readonly vectors: DuckDBVector[] = [];
78
constructor(chunk: duckdb.DataChunk) {
89
this.chunk = chunk;
910
}
@@ -17,11 +18,15 @@ export class DuckDBDataChunk {
1718
return duckdb.data_chunk_get_column_count(this.chunk);
1819
}
1920
public getColumnVector(columnIndex: number): DuckDBVector {
20-
// TODO: cache vectors?
21-
return DuckDBVector.create(
21+
if (this.vectors[columnIndex]) {
22+
return this.vectors[columnIndex];
23+
}
24+
const vector = DuckDBVector.create(
2225
duckdb.data_chunk_get_vector(this.chunk, columnIndex),
2326
this.rowCount
2427
);
28+
this.vectors[columnIndex] = vector;
29+
return vector;
2530
}
2631
public getColumnValues(columnIndex: number): DuckDBValue[] {
2732
return this.getColumnVector(columnIndex).toArray();

api/src/DuckDBPendingResult.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import duckdb from '@duckdb/node-bindings';
22
import { DuckDBResult } from './DuckDBResult';
3+
import { DuckDBResultReader } from './DuckDBResultReader';
34

45
// Values match similar enum in C API.
56
export enum DuckDBPendingResultState {
@@ -35,4 +36,17 @@ export class DuckDBPendingResult {
3536
public async getResult(): Promise<DuckDBResult> {
3637
return new DuckDBResult(await duckdb.execute_pending(this.pending_result));
3738
}
39+
public async read(): Promise<DuckDBResultReader> {
40+
return new DuckDBResultReader(await this.getResult());
41+
}
42+
public async readAll(): Promise<DuckDBResultReader> {
43+
const reader = new DuckDBResultReader(await this.getResult());
44+
await reader.readAll();
45+
return reader;
46+
}
47+
public async readUntil(targetRowCount: number): Promise<DuckDBResultReader> {
48+
const reader = new DuckDBResultReader(await this.getResult());
49+
await reader.readUntil(targetRowCount);
50+
return reader;
51+
}
3852
}

api/src/DuckDBPreparedStatement.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import duckdb from '@duckdb/node-bindings';
22
import { DuckDBPendingResult } from './DuckDBPendingResult';
33
import { DuckDBResult } from './DuckDBResult';
4+
import { DuckDBResultReader } from './DuckDBResultReader';
45
import { DuckDBTypeId } from './DuckDBTypeId';
56
import { StatementType } from './enums';
67
import {
@@ -115,6 +116,19 @@ export class DuckDBPreparedStatement {
115116
await duckdb.execute_prepared(this.prepared_statement)
116117
);
117118
}
119+
public async runAndRead(): Promise<DuckDBResultReader> {
120+
return new DuckDBResultReader(await this.run());
121+
}
122+
public async runAndReadAll(): Promise<DuckDBResultReader> {
123+
const reader = new DuckDBResultReader(await this.run());
124+
await reader.readAll();
125+
return reader;
126+
}
127+
public async runAndReadUntil(targetRowCount: number): Promise<DuckDBResultReader> {
128+
const reader = new DuckDBResultReader(await this.run());
129+
await reader.readUntil(targetRowCount);
130+
return reader;
131+
}
118132
public start(): DuckDBPendingResult {
119133
return new DuckDBPendingResult(
120134
duckdb.pending_prepared(this.prepared_statement)

api/src/DuckDBResult.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ import { DuckDBDataChunk } from './DuckDBDataChunk';
33
import { DuckDBLogicalType } from './DuckDBLogicalType';
44
import { DuckDBType } from './DuckDBType';
55
import { DuckDBTypeId } from './DuckDBTypeId';
6+
import { DuckDBVector } from './DuckDBVector';
67
import { ResultReturnType, StatementType } from './enums';
8+
import { DuckDBValue } from './values';
79

810
export class DuckDBResult {
911
private readonly result: duckdb.Result;
@@ -70,4 +72,45 @@ export class DuckDBResult {
7072
chunks.push(chunk);
7173
}
7274
}
75+
public async getColumns(): Promise<DuckDBValue[][]> {
76+
const chunks = await this.fetchAllChunks();
77+
if (chunks.length === 0) {
78+
return [];
79+
}
80+
const firstChunk = chunks[0];
81+
const columns: DuckDBValue[][] = [];
82+
const columnCount = this.columnCount;
83+
for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
84+
columns.push(firstChunk.getColumnValues(columnIndex));
85+
}
86+
for (let chunkIndex = 1; chunkIndex < chunks.length; chunkIndex++) {
87+
for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
88+
const vector = chunks[chunkIndex].getColumnVector(columnIndex);
89+
for (let itemIndex = 0; itemIndex < vector.itemCount; itemIndex++) {
90+
columns[columnIndex].push(vector.getItem(itemIndex));
91+
}
92+
}
93+
}
94+
return columns;
95+
}
96+
public async getRows(): Promise<DuckDBValue[][]> {
97+
const chunks = await this.fetchAllChunks();
98+
const rows: DuckDBValue[][] = [];
99+
for (const chunk of chunks) {
100+
const chunkVectors: DuckDBVector[] = [];
101+
const columnCount = chunk.columnCount;
102+
for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
103+
chunkVectors.push(chunk.getColumnVector(columnIndex));
104+
}
105+
const rowCount = chunk.rowCount;
106+
for (let rowIndex = 0; rowIndex < rowCount; rowIndex++) {
107+
const row: DuckDBValue[] = [];
108+
for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
109+
row.push(chunkVectors[columnIndex].getItem(rowIndex));
110+
}
111+
rows.push(row);
112+
}
113+
}
114+
return rows;
115+
}
73116
}

api/src/DuckDBResultReader.ts

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
import { DuckDBDataChunk } from './DuckDBDataChunk';
2+
import { DuckDBLogicalType } from './DuckDBLogicalType';
3+
import { DuckDBResult } from './DuckDBResult';
4+
import { DuckDBType } from './DuckDBType';
5+
import { DuckDBTypeId } from './DuckDBTypeId';
6+
import { DuckDBVector } from './DuckDBVector';
7+
import { ResultReturnType, StatementType } from './enums';
8+
import { DuckDBValue } from './values';
9+
10+
interface ChunkSizeRun {
11+
chunkCount: number;
12+
chunkSize: number;
13+
rowCount: number; // Equal to chunkCount * chunkSize; precalculated for efficiency.
14+
}
15+
16+
export class DuckDBResultReader {
17+
private readonly result: DuckDBResult;
18+
private readonly chunks: DuckDBDataChunk[];
19+
private readonly chunkSizeRuns: ChunkSizeRun[];
20+
private currentRowCount_: number;
21+
private done_: boolean;
22+
23+
constructor(result: DuckDBResult) {
24+
this.result = result;
25+
this.chunks = [];
26+
this.chunkSizeRuns = [];
27+
this.currentRowCount_ = 0;
28+
this.done_ = false;
29+
}
30+
31+
public get returnType(): ResultReturnType {
32+
return this.result.returnType;
33+
}
34+
public get statementType(): StatementType {
35+
return this.result.statementType;
36+
}
37+
public get columnCount(): number {
38+
return this.result.columnCount;
39+
}
40+
public columnName(columnIndex: number): string {
41+
return this.result.columnName(columnIndex);
42+
}
43+
public columnNames(): string[] {
44+
return this.result.columnNames();
45+
}
46+
public columnTypeId(columnIndex: number): DuckDBTypeId {
47+
return this.result.columnTypeId(columnIndex);
48+
}
49+
public columnLogicalType(columnIndex: number): DuckDBLogicalType {
50+
return this.result.columnLogicalType(columnIndex);
51+
}
52+
public columnType(columnIndex: number): DuckDBType {
53+
return this.result.columnType(columnIndex);
54+
}
55+
public columnTypes(): DuckDBType[] {
56+
return this.result.columnTypes();
57+
}
58+
public get rowsChanged(): number {
59+
return this.result.rowsChanged;
60+
}
61+
62+
/** Total number of rows read so far. Call `readAll` or `readUntil` to read rows. */
63+
public get currentRowCount() {
64+
return this.currentRowCount_;
65+
}
66+
67+
/** Whether reading is done, that is, there are no more rows to read. */
68+
public get done() {
69+
return this.done_;
70+
}
71+
72+
/**
73+
* Returns the value for the given column and row. Both are zero-indexed.
74+
*
75+
* Will return an error if `rowIndex` is greater than `currentRowCount`.
76+
*/
77+
public value(columnIndex: number, rowIndex: number): DuckDBValue {
78+
if (this.currentRowCount_ === 0) {
79+
throw Error(`No rows have been read`);
80+
}
81+
let chunkIndex = 0;
82+
let currentRowIndex = rowIndex;
83+
// Find which run of chunks our row is in.
84+
// Since chunkSizeRuns shouldn't ever be longer than 2, this should be O(1).
85+
for (const run of this.chunkSizeRuns) {
86+
if (currentRowIndex < run.rowCount) {
87+
// The row we're looking for is in this run.
88+
// Calculate the chunk index and the row index in that chunk.
89+
chunkIndex += Math.floor(currentRowIndex / run.chunkSize);
90+
const rowIndexInChunk = currentRowIndex % run.chunkSize;
91+
const chunk = this.chunks[chunkIndex];
92+
return chunk.getColumnVector(columnIndex).getItem(rowIndexInChunk);
93+
}
94+
// The row we're looking for is not in this run.
95+
// Update our counts for this run and move to the next one.
96+
chunkIndex += run.chunkCount;
97+
currentRowIndex -= run.rowCount;
98+
}
99+
// We didn't find our row. It must have been out of range.
100+
throw Error(
101+
`Row index ${rowIndex} requested, but only ${this.currentRowCount_} row have been read so far.`,
102+
);
103+
}
104+
105+
/** Read all rows. */
106+
public async readAll(): Promise<void> {
107+
return this.fetchChunks();
108+
}
109+
110+
/**
111+
* Read rows until at least the given target row count has been met.
112+
*
113+
* Note that the resulting row count could be greater than the target, since rows are read in chunks, typically of 2048 rows each.
114+
*/
115+
public async readUntil(targetRowCount: number): Promise<void> {
116+
return this.fetchChunks(targetRowCount);
117+
}
118+
119+
private async fetchChunks(targetRowCount?: number): Promise<void> {
120+
while (
121+
!(
122+
this.done_ ||
123+
(targetRowCount !== undefined && this.currentRowCount_ >= targetRowCount)
124+
)
125+
) {
126+
const chunk = await this.result.fetchChunk();
127+
if (chunk.rowCount > 0) {
128+
this.updateChunkSizeRuns(chunk);
129+
this.chunks.push(chunk);
130+
this.currentRowCount_ += chunk.rowCount;
131+
} else {
132+
this.done_ = true;
133+
}
134+
}
135+
}
136+
137+
private updateChunkSizeRuns(chunk: DuckDBDataChunk) {
138+
if (this.chunkSizeRuns.length > 0) {
139+
const lastRun = this.chunkSizeRuns[this.chunkSizeRuns.length - 1];
140+
if (lastRun.chunkSize === chunk.rowCount) {
141+
// If the new batch is the same size as the last one, just update our last run.
142+
lastRun.chunkCount += 1;
143+
lastRun.rowCount += lastRun.chunkSize;
144+
return;
145+
}
146+
}
147+
// If this is our first batch, or it's a different size, create a new run.
148+
this.chunkSizeRuns.push({
149+
chunkCount: 1,
150+
chunkSize: chunk.rowCount,
151+
rowCount: chunk.rowCount,
152+
});
153+
}
154+
155+
public getColumns(): DuckDBValue[][] {
156+
if (this.chunks.length === 0) {
157+
return [];
158+
}
159+
const firstChunk = this.chunks[0];
160+
const columns: DuckDBValue[][] = [];
161+
const columnCount = this.columnCount;
162+
for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
163+
columns.push(firstChunk.getColumnValues(columnIndex));
164+
}
165+
for (let chunkIndex = 1; chunkIndex < this.chunks.length; chunkIndex++) {
166+
for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
167+
const vector = this.chunks[chunkIndex].getColumnVector(columnIndex);
168+
for (let itemIndex = 0; itemIndex < vector.itemCount; itemIndex++) {
169+
columns[columnIndex].push(vector.getItem(itemIndex));
170+
}
171+
}
172+
}
173+
return columns;
174+
}
175+
176+
public getRows(): DuckDBValue[][] {
177+
const rows: DuckDBValue[][] = [];
178+
for (const chunk of this.chunks) {
179+
const chunkVectors: DuckDBVector[] = [];
180+
const columnCount = chunk.columnCount;
181+
for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
182+
chunkVectors.push(chunk.getColumnVector(columnIndex));
183+
}
184+
const rowCount = chunk.rowCount;
185+
for (let rowIndex = 0; rowIndex < rowCount; rowIndex++) {
186+
const row: DuckDBValue[] = [];
187+
for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
188+
row.push(chunkVectors[columnIndex].getItem(rowIndex));
189+
}
190+
rows.push(row);
191+
}
192+
}
193+
return rows;
194+
}
195+
196+
}

0 commit comments

Comments
 (0)