Skip to content

Add data export methods for select query_type; #97

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pydruid/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ def export_tsv(self, dest_path):
header = list(self.result[0]['event'].keys())
header.append('timestamp')
header.append('version')
elif self.query_type == "select":
header = list(self.result[0]['result']['events'][0]['event'].keys())
else:
raise NotImplementedError('TSV export not implemented for query type: {0}'.format(self.query_type))

Expand All @@ -120,6 +122,11 @@ def export_tsv(self, dest_path):
version = item['version']
w.writerow(
list(item['event'].values()) + [timestamp] + [version])
elif self.query_type == "select":
for item in self.result:
result = item['result']['events']
for line in result:
w.writerow(list(line['event'].values()))

f.close()

Expand Down Expand Up @@ -172,6 +179,13 @@ def export_pandas(self):
nres = [list(v['event'].items()) + [('timestamp', v['timestamp'])]
for v in self.result]
nres = [dict(v) for v in nres]
elif self.query_type == "select":
nres = []
for item in self.result:
results = item['result']
tres = [dict(list(res['event'].items()))
for res in results['events']]
nres += tres
else:
raise NotImplementedError('Pandas export not implemented for query type: {0}'.format(self.query_type))

Expand Down
60 changes: 60 additions & 0 deletions tests/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,33 @@ def create_query_with_results():
return query


def create_query_with_select_results():
query = Query({}, 'select')
query.result = [
{
"timestamp": "2017-01-01T00:00:00.000Z",
"result": {
"pagingIdentifiers": { "test_2017-01-01T08:00:00.000+08:00_2017-01-02T08:00:00.000+08:00_2017-07-04T16:37:03.563+08:00": 4 },
"dimensions": ["value1", "value2"],
"metrics": [],
"events": [
{
"segmentId": "test_2017-01-01T08:00:00.000+08:00_2017-01-02T08:00:00.000+08:00_2017-07-04T16:37:03.563+08:00",
"offset": 0,
"event": {"timestamp": "2017-01-01T08:00:00.000+08:00", "value1": 1, "value2": 2}
},
{
"segmentId": "test_2017-01-01T08:00:00.000+08:00_2017-01-02T08:00:00.000+08:00_2017-07-04T16:37:03.563+08:00",
"offset": 1,
"event": {"timestamp": "2017-01-01T08:00:01.000+08:00", "value1": 11, "value2": 12}
}
]
}
}
]
return query


EXPECTED_RESULTS_PANDAS = [{
'timestamp': '2015-01-01T00:00:00.000-05:00',
'value1': 1,
Expand All @@ -54,6 +81,12 @@ def create_query_with_results():
}]


EXPECTED_SELECT_RESULTS_PANDAS = [
{"timestamp": "2017-01-01T08:00:00.000+08:00", "value1": 1, "value2": 2},
{"timestamp": "2017-01-01T08:00:01.000+08:00", "value1": 11, "value2": 12}
]


def expected_results_csv_reader():
# csv.DictReader does not perform promotion to int64
expected_results = []
Expand All @@ -64,6 +97,16 @@ def expected_results_csv_reader():
return expected_results


def expected_results_select_csv_reader():
expected_results = []
for element in EXPECTED_SELECT_RESULTS_PANDAS:
modified_elem = element.copy()
modified_elem.update({'value1': str(modified_elem['value1'])})
modified_elem.update({'value2': str(modified_elem['value2'])})
expected_results.append(modified_elem)
return expected_results


class TestQueryBuilder:
def test_build_query(self):
# given
Expand Down Expand Up @@ -197,13 +240,30 @@ def test_export_tsv(self, tmpdir):
actual = [line for line in reader]
assert actual == expected_results_csv_reader()

def test_export_select_tsv(self, tmpdir):
query = create_query_with_select_results()
file_path = tmpdir.join('out.tsv')
query.export_tsv(str(file_path))

with open(str(file_path)) as tsv_file:
reader = csv.DictReader(tsv_file, delimiter="\t")
actual = [line for line in reader]
assert actual == expected_results_select_csv_reader()

@pytest.mark.skipif(pandas is None, reason="requires pandas")
def test_export_pandas(self):
query = create_query_with_results()
df = query.export_pandas()
expected_df = pandas.DataFrame(EXPECTED_RESULTS_PANDAS)
assert_frame_equal(df, expected_df)

@pytest.mark.skipif(pandas is None, reason="requires pandas")
def test_export_select_pandas(self):
query = create_query_with_select_results()
df = query.export_pandas()
expected_df = pandas.DataFrame(EXPECTED_SELECT_RESULTS_PANDAS)
assert_frame_equal(df, expected_df)

def test_query_acts_as_a_wrapper_for_raw_result(self):
# given
query = create_query_with_results()
Expand Down