Commit ec54390

Create text_analysis_ai_use_case_discovery.py

1 parent 849c17d

1 file changed: +252 −0 lines changed
# Databricks notebook source
# MAGIC %md
# MAGIC # Text analysis GenAI use case discovery
# MAGIC
# MAGIC This notebook uses [DiscoverX](https://github.com/databrickslabs/discoverx) to analyze text with [AI Functions](https://docs.databricks.com/aws/en/large-language-models/ai-functions) over a set of tables in Unity Catalog.
# MAGIC
# MAGIC The notebook will:
# MAGIC 1. Use DiscoverX to find free-text columns across multiple scanned tables
# MAGIC 2. Provide a set of possible GenAI use cases for that text, with cost estimates and example queries
# COMMAND ----------

# MAGIC %md
# MAGIC ## Install dependencies

# COMMAND ----------

# MAGIC %pip install dbl-discoverx

# COMMAND ----------

# Restart Python in a separate cell so the freshly installed library is picked up
dbutils.library.restartPython()
# COMMAND ----------

# MAGIC %md
# MAGIC ## Set up widgets

# COMMAND ----------

# Pattern of tables to scan, in the form catalog.schema.table; wildcards are allowed
dbutils.widgets.text("from_tables", "your_catalog.*.*", "from tables")
# COMMAND ----------

# MAGIC %md
# MAGIC ## Import required libraries and initialize variables

# COMMAND ----------

from pyspark.sql.functions import (
    avg,
    col,
    collect_set,
    explode,
    expr,
    from_json,
    length,
    stddev,
)
from pyspark.sql.types import ArrayType, FloatType, StringType, StructField, StructType

# COMMAND ----------

from_tables = dbutils.widgets.get("from_tables")

# Number of rows to sample from each table
sample_size = 3
# COMMAND ----------

# MAGIC %md
# MAGIC ## Initialize DiscoverX

# COMMAND ----------

from discoverx import DX

dx = DX()
# COMMAND ----------

# MAGIC %md
# MAGIC ## Extract samples from all string values

# COMMAND ----------

unpivoted_df = (
    dx.from_tables(from_tables)
    .unpivot_string_columns(sample_size=sample_size)
    .apply()
)

# COMMAND ----------

display(unpivoted_df)
# COMMAND ----------

# MAGIC %md
# MAGIC ## Empirically find free-text columns

# COMMAND ----------

from pyspark.sql.functions import length, col, expr, stddev

free_text_columns = (unpivoted_df
    .withColumn("str_length", length(col("string_value")))
    .withColumn("space_count", expr("LENGTH(string_value) - LENGTH(REPLACE(string_value, ' ', ''))"))
    # Truncate long values so the collected samples stay small enough to embed in a prompt
    .withColumn("string_value", expr("IF(LENGTH(string_value) > 1000, SUBSTRING(string_value, 1, 1000), string_value)"))
    .groupBy("table_catalog", "table_schema", "table_name", "column_name")
    .agg(
        avg("str_length").alias("avg_str_length"),
        stddev("str_length").alias("stddev_str_length"),
        avg("space_count").alias("avg_space_count"),
        stddev("space_count").alias("stddev_space_count"),
        collect_set("string_value").alias("sample_values"),
    )
    # Heuristic for free text: long, space-rich values that vary across rows
    # (fixed-format codes and IDs have few spaces and near-zero stddev)
    .filter(
        (col("avg_str_length") > 40) &
        (col("avg_space_count") > 5) &
        (col("stddev_str_length") > 0) &
        (col("stddev_space_count") > 0))
)
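
# COMMAND ----------

# MAGIC %md
# MAGIC As a quick intuition check for the thresholds above (a minimal sketch with made-up strings, not data from any table): free text tends to be long and space-rich, while IDs and codes are short and space-free. The stddev filters additionally require some variation across the sampled rows.

# COMMAND ----------

# Hypothetical sample values, for illustration only
samples = {
    "free_text": "The delivery arrived two days late and the box was damaged, but support resolved it quickly.",
    "id_like": "ORD-2024-000187",
}
for name, value in samples.items():
    str_length = len(value)
    space_count = value.count(" ")
    passes = str_length > 40 and space_count > 5
    print(f"{name}: length={str_length}, spaces={space_count}, passes length/space thresholds={passes}")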
# COMMAND ----------

# MAGIC %md
# MAGIC ### GenAI use cases

# COMMAND ----------

free_text_columns.display()
# COMMAND ----------

expression = """ai_query(
    "databricks-meta-llama-3-3-70b-instruct",
    concat('Provide 2-3 useful, interesting and creative GenAI use cases for batch processing a column named ', column_name, ' for a table named ', table_catalog, '.', table_schema, '.', table_name, '. Provide the use cases as a JSON array of objects with the following properties: title, description, type, example_prompt, output_json_schema, expected_average_output_tokens. The example_prompt should be a prompt that can be used to process the use case; the row content will be appended to the example_prompt. Sample data: ', string(sample_values)),
    responseFormat => '{
        "type": "json_schema",
        "json_schema": {
            "name": "response",
            "schema": {
                "type": "object",
                "properties": {
                    "use_cases": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "title": {"type": "string"},
                                "description": {"type": "string"},
                                "type": {"type": "string", "enum": ["classification", "information extraction", "question answering", "summarization", "translation", "sentiment analysis", "other"]},
                                "example_prompt": {"type": "string"},
                                "output_json_schema": {"type": "string", "description": "A JSON schema that could be used to process the output of the AI query executed with example_prompt."},
                                "expected_average_output_tokens": {"type": "number"}
                            },
                            "required": ["title", "description", "type", "example_prompt", "output_json_schema", "expected_average_output_tokens"]
                        }
                    }
                }
            }
        }
    }'
)"""
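
# COMMAND ----------

# MAGIC %md
# MAGIC The `responseFormat` argument constrains the model to return JSON matching the schema above, so the response can be parsed deterministically with `from_json` in the next cell rather than with brittle string handling.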
# COMMAND ----------

from pyspark.sql.functions import from_json, explode, col
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType

schema = StructType([
    StructField("use_cases", ArrayType(StructType([
        StructField("title", StringType(), True),
        StructField("description", StringType(), True),
        StructField("type", StringType(), True),
        StructField("example_prompt", StringType(), True),
        StructField("output_json_schema", StringType(), True),
        StructField("expected_average_output_tokens", FloatType(), True)
    ])), True)
])

use_cases = (free_text_columns
    .withColumn("use_case", expr(expression))
    .withColumn("use_case", from_json(col("use_case"), schema))
    .withColumn("use_case", explode(col("use_case.use_cases")))
)
use_cases = spark.createDataFrame(use_cases.toPandas())  # Cache the result in pandas to avoid re-running the AI query
display(use_cases)
# COMMAND ----------

# MAGIC %md
# MAGIC ## Count the number of rows per table

# COMMAND ----------

row_count = dx.from_tables(from_tables).with_sql("SELECT COUNT(*) AS row_count FROM {full_table_name}").apply()
row_count = spark.createDataFrame(row_count.toPandas())  # Cache to avoid re-scanning the tables
display(row_count)
# COMMAND ----------

# MAGIC %md
# MAGIC ## Estimate cost and provide SQL examples

# COMMAND ----------

# Check current rates at https://www.databricks.com/product/pricing/foundation-model-serving
cost_per_M_input_tokens = 0.5
cost_per_M_output_tokens = 1.5

result = (use_cases
    # Rough token estimate: ~4 characters per token
    .withColumn("estimated_token_count_per_row", expr("ROUND((LENGTH(use_case.example_prompt) + avg_str_length) / 4)"))
    .join(row_count, ["table_catalog", "table_schema", "table_name"])
    .withColumn("estimated_total_table_token_count", expr("estimated_token_count_per_row * row_count"))
    .withColumn("estimated_total_table_input_processing_cost", col("estimated_total_table_token_count") * cost_per_M_input_tokens / 1000000)
    .withColumn("estimated_total_table_output_processing_cost", col("row_count") * col("use_case.expected_average_output_tokens") * cost_per_M_output_tokens / 1000000)
    .withColumn("example_query", expr("""
        "SELECT ai_query('databricks-meta-llama-3-3-70b-instruct', concat('" ||
        use_case.example_prompt ||
        "', " ||
        column_name ||
        "), responseFormat => '{\\\"type\\\": \\\"json_schema\\\", \\\"json_schema\\\": {\\\"name\\\": \\\"response\\\", \\\"schema\\\": " || use_case.output_json_schema || "}}'" ||
        ") AS ai_output, * FROM " ||
        table_catalog || "." || table_schema || "." || table_name || " LIMIT 100;"
    """))
    .withColumn("estimated_total_table_processing_cost", col("estimated_total_table_input_processing_cost") + col("estimated_total_table_output_processing_cost"))
    .select("table_catalog", "table_schema", "table_name", "column_name", "use_case", "estimated_total_table_processing_cost", "example_query")
)

display(result)
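
# COMMAND ----------

# MAGIC %md
# MAGIC As a rough sanity check of the estimate above (a worked example with hypothetical numbers, not taken from a real table): at ~4 characters per token, a 200-character example prompt plus a 400-character average text is about 150 input tokens per row.

# COMMAND ----------

# Hypothetical worked example of the cost formula, for illustration only
example_prompt_length = 200  # characters
avg_text_length = 400        # characters
rows = 1_000_000
avg_output_tokens = 50

input_tokens_per_row = round((example_prompt_length + avg_text_length) / 4)  # ~4 chars per token
input_cost = rows * input_tokens_per_row * cost_per_M_input_tokens / 1_000_000
output_cost = rows * avg_output_tokens * cost_per_M_output_tokens / 1_000_000
print(f"~{input_tokens_per_row} input tokens/row; estimated total cost: ${input_cost + output_cost:,.2f}")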
# COMMAND ----------

# MAGIC %md
# MAGIC ## Try out some sample queries from above

# COMMAND ----------

# MAGIC %sql
# MAGIC
# MAGIC -- TODO: Copy-paste a query from the result above. See the documentation at https://docs.databricks.com/aws/en/large-language-models/ai-functions
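
# COMMAND ----------

# MAGIC %md
# MAGIC For illustration only, a generated `example_query` typically looks like the sketch below. The table `main.default.reviews`, the column `review_text`, and the prompt are hypothetical placeholders, not output from this notebook; the query is left commented out so the notebook runs without edits.

# COMMAND ----------

# MAGIC %sql
# MAGIC -- SELECT ai_query(
# MAGIC --   'databricks-meta-llama-3-3-70b-instruct',
# MAGIC --   concat('Classify the sentiment of this review as positive, negative, or neutral: ', review_text),
# MAGIC --   responseFormat => '{"type": "json_schema", "json_schema": {"name": "response", "schema": {"type": "object", "properties": {"sentiment": {"type": "string"}}}}}'
# MAGIC -- ) AS ai_output, * FROM main.default.reviews LIMIT 100;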
