
Added table freshness overview #43

Draft: wants to merge 1 commit into master
24 changes: 24 additions & 0 deletions discoverx/dx.py
@@ -137,6 +137,30 @@ def scan_result(self):

return self.scanner.scan_result.df

def table_freshness(
self,
from_tables="*.*.*",
what_if: bool = False,
):
"""Scans the lakehouse for columns matching the given rules

Args:
from_tables (str, optional): The tables to be scanned in format "catalog.schema.table", use "*" as a wildcard. Defaults to "*.*.*".
what_if (bool, optional): Whether to run the scan in what-if mode and print the SQL commands instead of executing them. Defaults to False.
"""
catalogs, schemas, tables = Msql.validate_from_components(from_tables)

self.scanner = Scanner(
self.spark,
self.rules,
catalogs=catalogs,
schemas=schemas,
tables=tables,
what_if=what_if,
)

return self.scanner.scan_history()

def _classify(self, classification_threshold: float):
"""Classifies the columns in the lakehouse

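For orientation, a minimal usage sketch of the new method (the import and constructor follow the project's usual notebook setup; treat the exact names and column selection as illustrative rather than part of this PR):

from discoverx import DX

dx = DX()  # picks up the active Spark session on Databricks

# Returns a ScanResult; .df is a pandas DataFrame with one row per table and daily window,
# carrying the pivoted Delta operation metrics (numOutputRows, numAddedFiles, ...)
freshness = dx.table_freshness(from_tables="discoverx*.*.*")
freshness.df[["table_catalog", "table_schema", "table_name", "end", "numOutputRows"]].head()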
108 changes: 106 additions & 2 deletions discoverx/scanner.py
@@ -191,13 +191,117 @@ def scan(self):

logger.debug("Launching lakehouse scanning task\n")

self.scan_result = self.do_scan(self.scan_table)


def scan_table_history(self, table):
    """Aggregates the Delta operation metrics of a single table from DESCRIBE HISTORY into daily windows."""

try:
if self.what_if:
logger.friendly(f"SQL that would be executed for '{table.catalog}.{table.schema}.{table.table}'")
else:
logger.friendly(f"Checking history for table '{table.catalog}.{table.schema}.{table.table}'")

frequency = "1 day"
# Build the Delta history metrics SQL
# TODO: Add more metrics https://docs.delta.io/latest/delta-utility.html#operation-metrics-keys
sql = f"""
WITH

all_dates AS (
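  -- Enumerates the last 90 days; note this CTE is not referenced by the final SELECT below
  -- (the notebook query left-joins it so that days without table activity still appear)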
SELECT explode(sequence(to_date(date_sub(now(), 90)), to_date(now()), interval {frequency})) as date
),

table_metrics AS (
SELECT w.start, w.end, *
FROM
(
SELECT *
FROM
(
SELECT window(timestamp, '{frequency}') AS w, metric, sum(value) AS value
FROM (
SELECT timestamp, explode(operationMetrics) AS (metric, value)
FROM (
DESCRIBE HISTORY {table.catalog}.{table.schema}.{table.table}
)
)
GROUP BY window(timestamp, '{frequency}'), metric
)
PIVOT (
SUM(value) AS val
FOR (metric) IN (

'numAddedBytes' AS numAddedBytes,
'numOutputBytes' AS numOutputBytes,
'numRemovedBytes' AS numRemovedBytes,

'numFiles' AS numFiles,
'numAddedFiles' AS numAddedFiles,
'numAddedChangeFiles' AS numAddedChangeFiles,
'numRemovedFiles' AS numRemovedFiles,

'numOutputRows' AS numOutputRows,
'numCopiedRows' AS numCopiedRows,
'numDeletedRows' AS numDeletedRows,

'numDeletionVectorsAdded' AS numDeletionVectorsAdded,
'numDeletionVectorsRemoved' AS numDeletionVectorsRemoved,

'executionTimeMs' AS executionTimeMs,
'scanTimeMs' AS scanTimeMs,
'rewriteTimeMs' AS rewriteTimeMs
)
)
)
)


SELECT
'{table.catalog}' as table_catalog,
'{table.schema}' as table_schema,
'{table.table}' as table_name,
*
FROM table_metrics
"""

if self.what_if:
logger.friendly(sql)
else:
# Execute SQL and return the result
return self.spark.sql(sql).toPandas()
except Exception as e:
logger.error(f"Error while scanning table history '{table.catalog}.{table.schema}.{table.table}': {e}")
return None

def scan_history(self):
    """Scans the Delta history of all matching tables and returns a ScanResult with per-day operation metrics."""

logger.friendly("""Ok, I'm going to scan your lakehouse data history.""")
text = f"""
This is what you asked for:

catalogs ({self.content.n_catalogs}) = {self.catalogs}
schemas ({self.content.n_schemas}) = {self.schemas}
tables ({self.content.n_tables}) = {self.tables}

This may take a while, so please be patient. I'll let you know when I'm done.
...
"""

logger.friendly(strip_margin(text))

logger.debug("Launching lakehouse history scanning task\n")

return self.do_scan(self.scan_table_history)

def do_scan(self, f):
    """Runs the per-table function f over all matching tables in parallel and concatenates the results into a ScanResult."""
if len(self.content.table_list) == 0:
raise Exception("No tables found matching your filters")

dfs = []
with concurrent.futures.ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor:
# Submit tasks to the thread pool
futures = [executor.submit(self.scan_table, table) for table in self.content.table_list]
futures = [executor.submit(f, table) for table in self.content.table_list]

# Process completed tasks
for future in concurrent.futures.as_completed(futures):
@@ -208,7 +312,7 @@ def scan(self):
logger.debug("Finished lakehouse scanning task")

if dfs:
self.scan_result = ScanResult(df=pd.concat(dfs))
return ScanResult(df=pd.concat(dfs))
else:
raise Exception("No tables were scanned successfully.")

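The central refactor in scanner.py is that do_scan now receives the per-table function as an argument, so scan and scan_history share the same parallel fan-out and result concatenation. A self-contained sketch of that pattern (plain Python with illustrative names, not the project's API):

import concurrent.futures

import pandas as pd

MAX_WORKERS = 10  # assumption for the sketch; the Scanner class defines its own limit

def fan_out(per_table_fn, tables):
    # Run per_table_fn on every table in parallel and concatenate the pandas results
    dfs = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [executor.submit(per_table_fn, table) for table in tables]
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            if result is not None:  # failed or what-if scans return None and are skipped
                dfs.append(result)
    if not dfs:
        raise Exception("No tables were scanned successfully.")
    return pd.concat(dfs)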
84 changes: 84 additions & 0 deletions notebooks/interaction_ui.py
@@ -44,6 +44,14 @@

# COMMAND ----------

df = dx.table_freshness(from_tables="discoverx*.*.*", what_if=False)

# COMMAND ----------

display(df.df)

# COMMAND ----------

# MAGIC %md
# MAGIC ### Scan
# MAGIC This section demonstrates a typical DiscoverX workflow which consists of the following steps:
@@ -174,3 +182,79 @@
help(DX)

# COMMAND ----------

# MAGIC %sql
# MAGIC
# MAGIC

# COMMAND ----------

# MAGIC %sql
# MAGIC
# MAGIC
# MAGIC
# MAGIC WITH
# MAGIC
# MAGIC all_dates AS (
# MAGIC SELECT explode(sequence(to_date(date_sub(now(), 90)), to_date(now()), interval 1 day)) as date
# MAGIC ),
# MAGIC
# MAGIC table_metrics AS (
# MAGIC SELECT w.start, w.end, *
# MAGIC FROM
# MAGIC (
# MAGIC SELECT *
# MAGIC FROM
# MAGIC (
# MAGIC SELECT window(timestamp, '1 day') AS w, metric, sum(value) AS value
# MAGIC FROM (
# MAGIC SELECT timestamp, explode(operationMetrics) AS (metric, value)
# MAGIC FROM (
# MAGIC DESCRIBE HISTORY discoverx_sample.sample_datasets.cyber_data
# MAGIC )
# MAGIC )
# MAGIC GROUP BY window(timestamp, '1 day'), metric
# MAGIC )
# MAGIC PIVOT (
# MAGIC SUM(value) AS val
# MAGIC FOR (metric) IN (
# MAGIC -- TODO: add metrics https://docs.delta.io/latest/delta-utility.html#operation-metrics-keys
# MAGIC 'numAddedBytes' AS numAddedBytes,
# MAGIC 'numOutputBytes' AS numOutputBytes,
# MAGIC 'numRemovedBytes' AS numRemovedBytes,
# MAGIC
# MAGIC 'numFiles' AS numFiles,
# MAGIC 'numAddedFiles' AS numAddedFiles,
# MAGIC 'numAddedChangeFiles' AS numAddedChangeFiles,
# MAGIC 'numRemovedFiles' AS numRemovedFiles,
# MAGIC
# MAGIC 'numOutputRows' AS numOutputRows,
# MAGIC 'numCopiedRows' AS numCopiedRows,
# MAGIC 'numDeletedRows' AS numDeletedRows,
# MAGIC
# MAGIC 'numDeletionVectorsAdded' AS numDeletionVectorsAdded,
# MAGIC 'numDeletionVectorsRemoved' AS numDeletionVectorsRemoved,
# MAGIC
# MAGIC 'executionTimeMs' AS executionTimeMs,
# MAGIC 'scanTimeMs' AS scanTimeMs,
# MAGIC 'rewriteTimeMs' AS rewriteTimeMs
# MAGIC )
# MAGIC )
# MAGIC )
# MAGIC )
# MAGIC
# MAGIC
# MAGIC SELECT 'cyber_data' AS table_name, *
# MAGIC FROM all_dates
# MAGIC LEFT OUTER JOIN table_metrics
# MAGIC ON date = to_date(w.end)

# COMMAND ----------

dx._msql(
"SELECT * FROM (DESCRIBE HISTORY *.*.*)",
what_if=True)

# COMMAND ----------
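To turn the per-day metrics into an actual freshness figure, one option is to post-process the pandas result. A sketch assuming the columns produced by the SQL above (table_catalog, table_schema, table_name, the window start/end, and numOutputRows among the pivoted metrics):

import pandas as pd

def days_since_last_write(history_df: pd.DataFrame) -> pd.DataFrame:
    # Keep only windows where rows were written, then take the latest window end per table
    wrote = history_df[history_df["numOutputRows"].fillna(0) > 0]
    last_write = (
        wrote.groupby(["table_catalog", "table_schema", "table_name"])["end"]
        .max()
        .reset_index(name="last_write")
    )
    last_write["days_since_last_write"] = (
        pd.Timestamp.now() - pd.to_datetime(last_write["last_write"])
    ).dt.days
    return last_write

# freshness = dx.table_freshness(from_tables="discoverx*.*.*")
# display(days_since_last_write(freshness.df))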