
radektomasek

Description

Add Keboola Storage API integration tool enabling CrewAI agents to access and extract structured data directly from Keboola projects.

This tool is the first in a planned series of Keboola-native tools intended to simplify AI-powered workflows and analytics inside and outside of Keboola. It's already in use for internal use-cases within Keboola and contributes toward broader enterprise adoption of CrewAI.

Tool Added

  • KeboolaTableExtractTool - Downloads a Keboola table using asynchronous export (multi-cloud supported: AWS, Azure, GCP) and returns its content as a CSV string.

Features

  • Supports Keboola’s async table export across all major cloud platforms.
  • Auto-detects backend (S3, GCS, Azure Blob) from manifest.
  • Downloads and merges sliced CSV data into a single result.
  • Designed to work seamlessly in CrewAI agent flows.
  • Includes args_schema with proper field descriptions.
  • Production-tested in Keboola AI workflows.
  • Easily extensible with additional Keboola API endpoints.
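The slice-merging step can be pictured with a minimal sketch. It assumes each slice arrives as headerless CSV text and that the column names are known separately; `merge_slices` and its parameters are hypothetical names for illustration, not the tool's actual API.

```python
import csv
import io


def merge_slices(columns: list[str], slices: list[str]) -> str:
    """Join headerless CSV slices into one CSV string with a single header row."""
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(columns)
    for chunk in slices:
        # Parse rather than concatenate, so quoting and a missing trailing
        # newline in any slice cannot corrupt row boundaries.
        for row in csv.reader(io.StringIO(chunk)):
            if row:  # skip blank lines
                writer.writerow(row)
    return out.getvalue()


merged = merge_slices(["id", "name"], ['1,"Doe, Jane"\n', "2,Bob"])
print(merged)
```

Parsing each slice costs more than plain string concatenation, but it keeps quoted fields intact when a slice does not end with a newline.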

Testing

  • All existing tests pass
  • Type checking passes
  • Tools follow CrewAI BaseTool patterns

For integration testing: Please email radek.tomasek@keboola.com to request access to a test Keboola project where you can validate the tool end-to-end.

Dependencies

Uses existing requests, boto3, pandas, and google-auth libraries.

Breaking Changes

None – this is a purely additive contribution.

…ocumentation. The tool follows the Storage API flows and downloads the table based on the underlying stack (S3, GCP, Azure).
@joaomdmoura
Collaborator

Disclaimer: This review was made by a crew of AI Agents.

Code Review Comment for Keboola Storage API Tool

Overall Impression

The implementation of the Keboola Storage API Tool is a solid foundation, allowing for the extraction of data from the Keboola Storage API with support across multiple cloud providers. While it demonstrates a structured approach, there are opportunities for improvement in documentation, error handling, testing, and configuration management that can enhance overall usability and reliability.

Documentation (README.md)

Strengths:

  • The documentation provides a clear overview of features and installation instructions, supporting users in implementing the tool effectively.

Suggestions for Improvement:

  • Error Handling Examples: Including examples of how to handle various errors will help users anticipate issues.
  • Version Compatibility Information: It is crucial to specify which API versions the tool is compatible with.

Main Implementation (keboola_table_extract_tool.py)

A. Input Validation

Current Implementation:

class ExtractInput(BaseModel):
    table_id: str = Field(..., description="Full table ID like 'in.c-usage.usage_data'")
    api_token: str = Field(..., description="Keboola Storage API token")
    base_url: str = Field(..., description="Keboola base API URL")

Suggested Improvements:

  • Include regex validations for more robust input verification. Consider modifying the table_id, api_token, and base_url fields to ensure they meet expected formats:
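from pydantic import BaseModel, Field

class ExtractInput(BaseModel):
    # Pydantic v2 uses `pattern` (v1 used `regex`); raw strings keep the backslashes literal.
    table_id: str = Field(..., pattern=r"^(in|out)\.c-[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+$")
    api_token: str = Field(..., min_length=32)
    # The region segment is optional so that https://connection.keboola.com also matches.
    base_url: str = Field(..., pattern=r"^https://connection\.([a-z0-9-]+\.)*keboola\.com/?$")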
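Patterns like these are easy to sanity-check with the standard library before wiring them into a model. The sketch below is illustrative only: the exact formats Keboola accepts are an assumption, the helper names are hypothetical, and the base URL pattern makes the region segment optional so that a bare `https://connection.keboola.com` host is accepted too.

```python
import re

# Illustrative patterns; the exact formats Keboola accepts are assumptions here.
TABLE_ID = re.compile(r"(in|out)\.c-[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+")
# Zero or more region/cloud labels may sit between "connection." and "keboola.com".
BASE_URL = re.compile(r"https://connection\.([a-z0-9-]+\.)*keboola\.com/?")


def is_valid_table_id(value: str) -> bool:
    """Return True if the whole string looks like '<stage>.c-<bucket>.<table>'."""
    return TABLE_ID.fullmatch(value) is not None


def is_valid_base_url(value: str) -> bool:
    """Return True if the whole string looks like a Keboola connection URL."""
    return BASE_URL.fullmatch(value) is not None


print(is_valid_table_id("in.c-usage.usage_data"))
print(is_valid_base_url("https://connection.eu-central-1.keboola.com"))
```

Using `fullmatch` avoids having to remember the `^` and `$` anchors in each pattern.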

B. Error Handling

Current Implementation Lacks Specificity:

  • Instead of generic error messages, create specific exceptions for errors encountered during API interactions:
class KeboolaAPIError(Exception):
    """Custom exception for Keboola API related errors"""
    pass
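One way such an exception could be put to use is sketched below. The subclass, the `status_code` attribute, and the `raise_for_status` helper are hypothetical and only illustrate mapping HTTP statuses to specific error types; they are not taken from the PR.

```python
from typing import Optional


class KeboolaAPIError(Exception):
    """Base class for Keboola API related errors."""

    def __init__(self, message: str, status_code: Optional[int] = None):
        super().__init__(message)
        self.status_code = status_code


class KeboolaAuthError(KeboolaAPIError):
    """Hypothetical subclass: the API token was rejected."""


def raise_for_status(status_code: int, body: str) -> None:
    """Map an HTTP status to a specific exception (illustrative mapping)."""
    if status_code in (401, 403):
        raise KeboolaAuthError(f"Authentication failed: {body}", status_code)
    if status_code >= 400:
        raise KeboolaAPIError(f"Storage API returned {status_code}: {body}", status_code)


try:
    raise_for_status(401, "invalid token")
except KeboolaAPIError as exc:
    # Catching the base class also catches the more specific subclass.
    print(type(exc).__name__, exc.status_code)
```

Callers that only care whether the API failed can catch the base class, while callers that want to prompt for a new token can catch the subclass.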

C. Resource Management

  • Utilizing context managers can enhance resource handling, particularly while managing temporary files in data extraction processes:
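import os
import tempfile
from contextlib import contextmanager

@contextmanager
def temporary_file():
    # delete=False keeps the path valid after the handle is closed
    tmp = tempfile.NamedTemporaryFile(delete=False)
    tmp.close()  # release the handle; callers reopen the file by path
    try:
        yield tmp.name
    finally:
        # Clean up even if the body of the with-block raised
        if os.path.exists(tmp.name):
            os.remove(tmp.name)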
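A short usage sketch of this pattern follows. The helper is repeated so the example runs on its own, with the handle closed before yielding so the path can be reopened on any platform; the CSV payload is made up.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def temporary_file():
    """Yield the path of a temporary file and remove it on exit."""
    tmp = tempfile.NamedTemporaryFile(delete=False)
    tmp.close()
    try:
        yield tmp.name
    finally:
        if os.path.exists(tmp.name):
            os.remove(tmp.name)


with temporary_file() as path:
    with open(path, "w", encoding="utf-8") as fh:
        fh.write("id,name\n1,Alice\n")
    with open(path, encoding="utf-8") as fh:
        content = fh.read()

# The file is removed once the block exits, even if the body had raised.
file_removed = not os.path.exists(path)
print(content, file_removed)
```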

D. Configuration Management

  • Use Pydantic settings to validate configuration values and manage them in one place:
from pydantic_settings import BaseSettings

class KeboolaConfig(BaseSettings):
    max_retries: int = 30
    # Additional configurations

E. Testing Improvements

  • While testing is underway, consider expanding the coverage to include more diverse scenarios. For example, test how the tool interfaces with each cloud provider:
import pytest

@pytest.mark.parametrize("cloud_backend", ["s3", "gcp", "azure"])
def test_backend_detection(cloud_backend, tool):
    # Implementation of the test logic
    ...
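To make the idea concrete, here is a minimal sketch of the kind of logic such a test might exercise. `detect_backend` is a hypothetical helper, and the URL schemes and host suffixes it checks are assumptions about what a manifest entry could look like, not the tool's actual detection rules. Plain assertions stand in for the pytest parametrization so the snippet runs without extra dependencies.

```python
from urllib.parse import urlparse


def detect_backend(url: str) -> str:
    """Guess the storage backend from a manifest entry URL (hypothetical helper)."""
    parsed = urlparse(url)
    host = parsed.netloc.lower()
    if parsed.scheme == "s3" or host.endswith("amazonaws.com"):
        return "s3"
    if parsed.scheme == "gs" or host.endswith("storage.googleapis.com"):
        return "gcp"
    if parsed.scheme == "azure" or host.endswith("blob.core.windows.net"):
        return "azure"
    raise ValueError(f"Unrecognized storage URL: {url}")


# Each pair plays the role of one parametrized test case.
CASES = {
    "s3://bucket/exports/table.csv.gzmanifest": "s3",
    "gs://bucket/exports/slice-0": "gcp",
    "azure://account.blob.core.windows.net/container/slice-0": "azure",
}
for url, expected in CASES.items():
    assert detect_backend(url) == expected
```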

Recommendations for Future Improvements

  1. Retry Mechanism: Implement a retry mechanism for transient failures when making API calls.
  2. Logging: Add logging as the first level of debugging and monitoring throughout the extraction process.
  3. Metrics Collection: Introduce basic metrics to monitor performance and extract insights from operations.
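The first two recommendations can be combined in a small sketch: a retry wrapper with exponential backoff that logs each failed attempt. The function name, the logger name, the defaults, and the choice of which exceptions count as transient are all assumptions made for illustration.

```python
import logging
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")
logger = logging.getLogger("keboola_tool")  # hypothetical logger name


def with_retries(
    call: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run `call`, retrying transient failures with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return call()
        except retry_on as exc:
            if attempt == attempts:
                logger.error("Giving up after %d attempts: %s", attempt, exc)
                raise
            # Delay doubles after every failed attempt: 0.5s, 1s, 2s, ...
            delay = base_delay * 2 ** (attempt - 1)
            logger.warning("Attempt %d failed (%s); retrying in %.1fs", attempt, exc, delay)
            sleep(delay)
    raise ValueError("attempts must be at least 1")
```

Passing `sleep` as a parameter keeps unit tests fast, since a test can record the requested delays instead of actually waiting.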

Conclusion:
In summary, while the Keboola Storage API Tool is off to a commendable start, incorporating the outlined suggestions will significantly enhance its robustness, usability, and maintainability. Keeping an eye on the testing and documentation aspects will ensure a smoother integration and user experience for future updates.

Contributor

@tonykipkemboi left a comment


Also, please add the tool to this init file:
crewai_tools/__init__.py

### Usage Example (Manual)

```python
from keboola_storage_api_tool.keboola_table_extract_tool import KeboolaTableExtractTool
```
Contributor


Change this to import from crewai_tools, like so:

from crewai_tools import KeboolaTableExtractTool


```python
from crewai import Agent, Task, Crew
from keboola_storage_api_tool.keboola_table_extract_tool import KeboolaTableExtractTool
```
Contributor


same with this one. see above comment

…l test coverage

Summary of changes:

- Implemented KeboolaTableExtractTool for asynchronous table export via Keboola Storage API.
- Added support for auto-detection and download from AWS S3, GCP, and Azure based on manifest URLs.
- Split cloud-specific logic into modular utility files:
   - s3_slice_download.py
   - gcp_slice_download.py
   - azure_slice_download.py

- Introduced utils.py with reusable polling and metadata helpers.
- Added config.py using pydantic-settings for polling configuration.
- Defined and raised consistent custom exceptions via exceptions.py.
- Added full unit test suite:
   - Tool behavior tests (success, empty table, failure, backend detection, timeout)
   - Separate tests for each cloud downloader with mocked credentials and I/O
…stency with YAML task action

- Updated `KeboolaTableExtractTool.name` to "download_keboola_table_tool"
@radektomasek
Author

Hello @tonykipkemboi 👋,

Thank you very much for your initial review. I spent quality time yesterday addressing both your comments and the automated feedback I had received earlier.

Summary of Changes

  • Enhanced the original implementation of KeboolaTableExtractTool for asynchronous table export via the Keboola Storage API.

  • Modularized cloud-specific logic into dedicated utility files:

    • s3_slice_download.py (for downloading data from AWS S3 Storage Based Projects).
    • gcp_slice_download.py (for downloading data from GCP Cloud Storage Based Projects).
    • azure_slice_download.py (for downloading data from Azure Cloud Storage Projects).
  • Introduced utils.py for reusable polling and metadata helper functions.

  • Added config.py using pydantic-settings to configure polling behavior.

  • Defined consistent custom exceptions in exceptions.py.

  • Added full unit test coverage:

    • Tool behavior tests (success, empty table, failure, backend detection, timeout).
    • Independent tests for each cloud downloader with mocked credentials and I/O.
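For readers unfamiliar with the polling step, a rough sketch of the general shape is given here. It is not the code from utils.py or config.py: a plain dataclass stands in for the pydantic-settings class, and the field names, status strings, and exception name are all assumed for illustration.

```python
import time
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class PollingConfig:
    """Stand-in for a settings class; field names are assumed."""
    max_retries: int = 30
    interval_seconds: float = 2.0


class ExportTimeoutError(Exception):
    """Hypothetical exception raised when the export job never finishes."""


def wait_for_job(
    fetch_status: Callable[[], str],
    config: PollingConfig = PollingConfig(),
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Poll until the job succeeds, fail fast on error, or time out."""
    for _ in range(config.max_retries):
        status = fetch_status()  # assumed to return a status string
        if status == "success":
            return
        if status == "error":
            raise RuntimeError("Export job failed")
        sleep(config.interval_seconds)
    raise ExportTimeoutError(f"Job not finished after {config.max_retries} polls")
```

Injecting `fetch_status` and `sleep` makes the timeout and failure paths testable without a live project.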

Additional Notes

I've also thoroughly tested the tool manually by integrating it into a test CrewAI project and validating it against live exports from AWS, GCP, and Azure. Everything seems to be working as expected.

I'd really appreciate it if you could take another look when you have a moment. Let me know if you'd like me to share anything specific. I’d be happy to invite you to a Keboola project and provide you with additional credits on top of the free tier for hands-on testing if that helps.

Thanks again, and have a wonderful day!

Radek


3 participants