diff --git a/pydruid/query.py b/pydruid/query.py index fb00f795..02f8d14d 100644 --- a/pydruid/query.py +++ b/pydruid/query.py @@ -99,6 +99,8 @@ def export_tsv(self, dest_path): header = list(self.result[0]['event'].keys()) header.append('timestamp') header.append('version') + elif self.query_type == "select": + header = list(self.result[0]['result']['events'][0]['event'].keys()) else: raise NotImplementedError('TSV export not implemented for query type: {0}'.format(self.query_type)) @@ -120,6 +122,11 @@ def export_tsv(self, dest_path): version = item['version'] w.writerow( list(item['event'].values()) + [timestamp] + [version]) + elif self.query_type == "select": + for item in self.result: + result = item['result']['events'] + for line in result: + w.writerow(list(line['event'].values())) f.close() @@ -172,6 +179,13 @@ def export_pandas(self): nres = [list(v['event'].items()) + [('timestamp', v['timestamp'])] for v in self.result] nres = [dict(v) for v in nres] + elif self.query_type == "select": + nres = [] + for item in self.result: + results = item['result'] + tres = [dict(list(res['event'].items())) + for res in results['events']] + nres += tres else: raise NotImplementedError('Pandas export not implemented for query type: {0}'.format(self.query_type)) diff --git a/tests/test_query.py b/tests/test_query.py index 83b91617..3c8e8a74 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -43,6 +43,33 @@ def create_query_with_results(): return query +def create_query_with_select_results(): + query = Query({}, 'select') + query.result = [ + { + "timestamp": "2017-01-01T00:00:00.000Z", + "result": { + "pagingIdentifiers": { "test_2017-01-01T08:00:00.000+08:00_2017-01-02T08:00:00.000+08:00_2017-07-04T16:37:03.563+08:00": 4 }, + "dimensions": ["value1", "value2"], + "metrics": [], + "events": [ + { + "segmentId": "test_2017-01-01T08:00:00.000+08:00_2017-01-02T08:00:00.000+08:00_2017-07-04T16:37:03.563+08:00", + "offset": 0, + "event": {"timestamp": "2017-01-01T08:00:00.000+08:00", "value1": 1, "value2": 2} + }, + { + "segmentId": "test_2017-01-01T08:00:00.000+08:00_2017-01-02T08:00:00.000+08:00_2017-07-04T16:37:03.563+08:00", + "offset": 1, + "event": {"timestamp": "2017-01-01T08:00:01.000+08:00", "value1": 11, "value2": 12} + } + ] + } + } + ] + return query + + EXPECTED_RESULTS_PANDAS = [{ 'timestamp': '2015-01-01T00:00:00.000-05:00', 'value1': 1, @@ -54,6 +81,12 @@ def create_query_with_results(): }] +EXPECTED_SELECT_RESULTS_PANDAS = [ + {"timestamp": "2017-01-01T08:00:00.000+08:00", "value1": 1, "value2": 2}, + {"timestamp": "2017-01-01T08:00:01.000+08:00", "value1": 11, "value2": 12} +] + + def expected_results_csv_reader(): # csv.DictReader does not perform promotion to int64 expected_results = [] @@ -64,6 +97,16 @@ def expected_results_csv_reader(): return expected_results +def expected_results_select_csv_reader(): + expected_results = [] + for element in EXPECTED_SELECT_RESULTS_PANDAS: + modified_elem = element.copy() + modified_elem.update({'value1': str(modified_elem['value1'])}) + modified_elem.update({'value2': str(modified_elem['value2'])}) + expected_results.append(modified_elem) + return expected_results + + class TestQueryBuilder: def test_build_query(self): # given @@ -197,6 +240,16 @@ def test_export_tsv(self, tmpdir): actual = [line for line in reader] assert actual == expected_results_csv_reader() + def test_export_select_tsv(self, tmpdir): + query = create_query_with_select_results() + file_path = tmpdir.join('out.tsv') + query.export_tsv(str(file_path)) + + with open(str(file_path)) as tsv_file: + reader = csv.DictReader(tsv_file, delimiter="\t") + actual = [line for line in reader] + assert actual == expected_results_select_csv_reader() + @pytest.mark.skipif(pandas is None, reason="requires pandas") def test_export_pandas(self): query = create_query_with_results() @@ -204,6 +257,13 @@ def test_export_pandas(self): expected_df = pandas.DataFrame(EXPECTED_RESULTS_PANDAS) assert_frame_equal(df, expected_df) + @pytest.mark.skipif(pandas is None, reason="requires pandas") + def test_export_select_pandas(self): + query = create_query_with_select_results() + df = query.export_pandas() + expected_df = pandas.DataFrame(EXPECTED_SELECT_RESULTS_PANDAS) + assert_frame_equal(df, expected_df) + def test_query_acts_as_a_wrapper_for_raw_result(self): # given query = create_query_with_results()