Metadata-Version: 2.1
Name: cf_data_tracker
Version: 0.3.19
Summary: A package for managing raw and clean data tracker operations
Author: Rami, R. K
Author-email: ramireddykowaluru@gmail.com
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: boto3
Requires-Dist: python-dotenv
Requires-Dist: beautifulsoup4
Requires-Dist: botocore
Requires-Dist: bs4
Requires-Dist: certifi
Requires-Dist: charset-normalizer
Requires-Dist: idna
Requires-Dist: jmespath
Requires-Dist: python-dateutil
Requires-Dist: requests
Requires-Dist: s3transfer==0.10.0
Requires-Dist: six
Requires-Dist: soupsieve
Requires-Dist: urllib3

# CF Data Tracker
## Purpose
This package bundles all the functions required to manage the raw and clean files loaded by the raw and clean pipelines at CF.

## Set up
To run the package, make sure the following environment variables are set. You can either load them with `python-dotenv` by putting them in a `.env` file, or export them directly from the terminal.

```
AWS_DEST_BUCKET_RAW=<S3 bucket for the raw data JSON tracker>
AWS_REGION_NAME=<AWS region to connect to S3>
AWS_ACCESS_KEY_ID=<your AWS access key ID>
AWS_SECRET_ACCESS_KEY=<your AWS secret access key>
```
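Before running a pipeline, it can help to fail fast if any of these variables are missing. The helper below is a small sketch for that check, not part of the package API:

```python
import os

# The four environment variables the package expects (see the block above).
REQUIRED_VARS = (
    "AWS_DEST_BUCKET_RAW",
    "AWS_REGION_NAME",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
)

def missing_env_vars(required=REQUIRED_VARS):
    """Return the names of required variables that are absent or empty."""
    return [name for name in required if not os.getenv(name)]
```

Calling `missing_env_vars()` at startup and raising if the result is non-empty gives a clearer error than a failed S3 call deep inside the pipeline.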


# Raw Version Tracker Documentation

The version tracker is a system designed to maintain a consistent and accurate record of file versions stored in an Amazon S3 bucket. It ensures that each file is uniquely identified within a specific table and time period, avoiding duplicate entries and maintaining a clear version history.

## Schema Structure

The version tracker organizes file versions into a hierarchical structure:

1. **Schema**: The top-level grouping, typically representing a data source or domain.
2. **Table**: A subset within a schema, often corresponding to a specific type of data.
3. **File Name**: Represents a particular time period (e.g., fiscal year or month) within a table.
4. **Versions**: Individual file entries for a specific file name, each representing a unique upload.

## Key Concepts

### File Name and File

- **File Name**: Represents a specific time period (e.g., fiscal year or month) within a table.
- **File**: The actual filename of the uploaded file, which should be unique for each version within a file name entry.

### Version Tracking

The tracker maintains version information for each file, including:

- Version number
- Timestamp
- File size
- S3 location
- Fiscal year
- Upload date
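Putting the hierarchy and the version fields together, a raw tracker entry might look like the following. This is an illustrative sketch; the exact key names used by the package may differ:

```python
# Hypothetical raw tracker layout: schema -> table -> file name -> versions.
raw_tracker = {
    "scmd": {                                   # schema (data source/domain)
        "medicines": {                          # table
            "FY2023-24": {                      # file name (time period)
                "versions": [
                    {
                        "version": 1,
                        "file": "scmd_2023_04.csv",   # actual uploaded filename
                        "timestamp": "2024-01-15T10:30:00",
                        "file_size": 1048576,
                        "s3_location": "s3://raw-bucket/scmd/medicines/scmd_2023_04.csv",
                        "fiscal_year": "FY2023-24",
                        "upload_date": "2024-01-15",
                    }
                ]
            }
        }
    }
}

# The most recent version of a file name entry is the last item in the list.
latest = raw_tracker["scmd"]["medicines"]["FY2023-24"]["versions"][-1]
```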

## Core Functions

### 1. Checking Existing Entries

- `check_file_name_entry`: Checks if a file name entry exists in a table.
- `check_file_exists`: Verifies if a specific file already exists within a file name entry.

### 2. Creating and Updating Entries

- `create_file_name_entry`: Creates a new file name entry with the first version of a file.
- `add_file_version`: Adds a new version to an existing file name entry.
- `update_table_entry`: Updates the table entry with new or updated file information.

### 3. Main Update Process

- `update_file_info`: Coordinates the process of updating file information in the version tracker.
- `update_version_tracker`: The main function to update the version tracker, handling the entire process from reading existing information to writing updated data.

## Update Process

When a new file is processed:

1. The system checks if a file name entry exists for the given time period.
2. If it exists, it checks if the specific file (actual filename) already exists in the versions.
3. If the file doesn't exist:
   - For a new file name entry: A new entry is created with version 1.
   - For an existing file name entry: A new version is added, incrementing the version number.
4. If the file already exists: The upload is skipped to avoid duplicates.
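The decision logic above can be sketched as a single function (the helper name and dict shape here are hypothetical; the real package exposes the functions listed under Core Functions):

```python
def decide_action(table_entry: dict, file_name: str, file: str) -> str:
    """Return which tracker action the update process would take.

    Mirrors the four steps: check the file name entry, then the specific
    file, then create / add a version / skip accordingly.
    """
    entry = table_entry.get(file_name)
    if entry is None:
        return "create_file_name_entry"      # new entry, starts at version 1
    existing_files = [v["file"] for v in entry["versions"]]
    if file in existing_files:
        return "skip"                        # duplicate upload avoided
    return "add_file_version"                # increments the version number
```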

## Usage in Pipelines

To use the version tracker in data pipelines:

1. Import necessary functions from the `raw_tracker` module.
2. Before uploading a file:
   - Check if the file name entry exists.
   - If it exists, check if the specific file already exists in its versions.
   - Only proceed with the upload if the file doesn't exist.
3. If uploading, call `update_version_tracker` with the required information.

Example:

```python
file_entry = check_file_name_entry(table_entry, file_name)
if file_entry and check_file_exists(file_entry, file):
    print(f"File {file} already exists in {file_name}. Skipping upload.")
else:
    # Proceed with file upload
    upload_to_s3(file_path, bucket_name, s3_key)
    # Then update the version tracker
    update_version_tracker(schema_name, table_name, file_name, file, file_size, s3_location)
```

## Benefits

- Prevents duplicate uploads and entries
- Maintains clear version history for each time period
- Allows for efficient processing and reduced S3 costs
- Provides flexibility for different data schemas and tables
- Enables easy tracking and management of file versions in the S3 bucket

By adhering to this structure and utilizing the version tracker effectively, you can ensure the integrity and consistency of file versions stored in the S3 bucket, avoiding duplicate entries and maintaining a clear version history across different schemas and tables.

# CF Data Tracker - Clean Tracker Documentation

## Purpose
The Clean Tracker is an essential component of the CF Data Tracker package, designed to manage the clean data pipeline. It works in conjunction with the Raw Tracker to ensure data consistency, version control, and efficient processing of cleaned data.

## Key Concepts

### Clean Version Tracker
- Maintains information about cleaned data files
- Tracks the relationship between raw and clean versions
- Manages the lifecycle of clean parquet files

### Version Comparison
- Compares raw and clean versions to determine if processing is needed
- Ensures that only outdated files are reprocessed

### Parquet File Management
- Handles the creation and deletion of parquet files
- Organizes clean data for efficient querying and storage

## Core Functions

### 1. Reading and Writing Clean Version Tracker

```python
def read_clean_version_tracker(clean_bucket: str, tracker_path: str) -> Dict:
    """Read the clean version tracker from S3."""

def write_clean_version_tracker(clean_bucket: str, tracker_path: str, clean_tracker: Dict) -> None:
    """Write the clean version tracker to S3."""
```

These functions manage the clean version tracker file in S3, allowing for consistent reading and writing of tracker information.

### 2. Managing Parquet Files

```python
def delete_outdated_parquets(clean_bucket: str, schema: str, table_name: str, file_name: str) -> None:
    """Delete outdated parquet files from S3."""

def process_and_upload_data(df: pd.DataFrame, clean_bucket: str, schema: str, table_name: str, file_name: str, group_by_column: str = None) -> List[str]:
    """Process and upload data to S3 in parquet format."""
```

These functions handle the lifecycle of parquet files, including deleting outdated files and creating new ones based on the processed data.

### 3. Updating Clean Version Information

```python
def update_and_write_clean_version_info(schema: str, clean_bucket: str, clean_version: Dict, file: Dict, table_name: str, latest_version_info: Dict, new_parquet_files: List[str]) -> Dict:
    """Update and write the clean version info."""
```

This function updates the clean version tracker with new information after processing a file, including new parquet file locations.

## Clean Pipeline Process

The clean pipeline follows these general steps:

1. **Read Raw and Clean Trackers**: Fetch the latest information from both raw and clean trackers.
2. **Compare Versions**: Check if the clean version is outdated compared to the raw version.
3. **Process Data**: If an update is needed, download and process the raw data.
4. **Manage Parquet Files**: Delete outdated parquet files and create new ones with the processed data.
5. **Update Clean Tracker**: Update the clean version tracker with new file information and parquet locations.

## Sample Clean Pipeline Implementation

```python
def clean_scmd_pipeline(schema: str, table_name: str, file_name: str, raw_file_path: str, clean_bucket: str):
    # Read raw and clean trackers
    raw_tracker = read_version_file(schema)
    clean_tracker = read_clean_version_tracker(clean_bucket, f"{schema}/{schema}_clean_version_info.json")
    
    # Compare versions
    raw_version = get_raw_version(raw_tracker, schema, table_name, file_name)
    clean_version = get_clean_version(clean_tracker, table_name, file_name)
    
    if clean_version >= raw_version:
        logger.info(f"File {file_name} is already up to date. Skipping.")
        return
    
    # Process data
    df = download_and_process_raw_data(raw_file_path)
    df = process_scmd_data(df)
    
    # Manage parquet files
    delete_outdated_parquets(clean_bucket, schema, table_name, file_name)
    new_parquet_files = process_and_upload_data(df, clean_bucket, schema, table_name, file_name, group_by_column='YEAR_MONTH')
    
    # Update clean tracker
    # NOTE: file_info (the raw tracker entry for this file) and file_size
    # (its recorded size) must be extracted from raw_tracker before this step.
    latest_version_info = create_version_info(raw_version, file_size, raw_file_path)
    clean_tracker = update_and_write_clean_version_info(
        schema, clean_bucket, clean_tracker, file_info, table_name, latest_version_info, new_parquet_files
    )
    
    write_clean_version_tracker(clean_bucket, f"{schema}/{schema}_clean_version_info.json", clean_tracker)
```

## Best Practices

1. **Error Handling**: Implement robust error handling to manage S3 operations, data processing, and tracker updates.
2. **Logging**: Use detailed logging to track the progress of the clean pipeline and aid in debugging.
3. **Modular Design**: Break down the pipeline into modular functions for easier maintenance and testing.
4. **Configuration Management**: Use environment variables or configuration files to manage bucket names and other settings.
5. **Data Validation**: Implement data validation steps to ensure the quality of processed data before creating parquet files.

## Integration with Raw Tracker

The Clean Tracker works closely with the Raw Tracker:

- It uses raw version information to determine if processing is needed.
- It maintains a link between raw file versions and corresponding clean parquet files.
- The clean version is updated to match the raw version after successful processing.

## Benefits

- Ensures data consistency between raw and clean pipelines
- Optimizes processing by only updating outdated files
- Manages the lifecycle of parquet files efficiently
- Provides a clear audit trail of data processing and versions
- Enables easy tracking of clean data versions and locations

By leveraging the Clean Tracker in conjunction with the Raw Tracker, you can create a robust and efficient data processing pipeline that maintains version control and data integrity across your raw and clean data stores.


## Publishing the Package to PyPI

To publish the CF Data Tracker package to PyPI, follow these steps:

1. Ensure you have the latest versions of `setuptools`, `wheel`, and `twine` installed:
   ```
   pip install --upgrade setuptools wheel twine
   ```

2. In the root directory of the project, run the following command to build the distribution files:
   ```
   python setup.py sdist bdist_wheel
   ```
   This command creates a `dist` directory containing both a source archive (`.tar.gz`) and a wheel (`.whl`). The more modern `python -m build` produces the same artifacts.

3. Once the distribution files are created, use `twine` to upload the package to PyPI:
   ```
   twine upload dist/*
   ```
   You will be prompted for credentials. PyPI now requires API tokens for uploads: use `__token__` as the username and your token (including the `pypi-` prefix) as the password.

After successful upload, the package will be available for installation via pip:
```
pip install cf-data-tracker
```

Remember to update the version number in your `setup.py` file before creating a new distribution, to ensure users can access the latest version of the package.

## CF Data Tracker - Email Alerter Documentation

### Purpose
The Email Alerter is a component of the CF Data Tracker package designed to send consolidated email reports for different stages of data pipelines. It provides a flexible and customizable way to notify stakeholders about the progress and status of data ingestion and processing tasks.

### Key Features

- Supports multiple pipeline stages (e.g., Raw, Clean)
- Sends HTML and plain text email reports
- Customizable pipeline name and recipient list
- Uses Amazon SES for reliable email delivery
- Configurable via environment variables

### Core Class: PipelineAlert

The `PipelineAlert` class is the main component of the Email Alerter system.

### Initialization

```python
from cf_data_tracker import PipelineAlert

pipeline_alert = PipelineAlert()
```


### Key Methods

#### `add_report(pipeline_stage, report_type, message)`

Adds a report message for a specific pipeline stage and report type.

```python
pipeline_alert.add_report("Raw", "ingestion", "Processed 5 files successfully")
pipeline_alert.add_report("Clean", "error", "Failed to process 1 file")
```

#### `send_email_alert()`

Compiles and sends the email alert with all collected reports.

### Configuration
The Email Alerter uses the following environment variables:

```
AWS_REGION: AWS region for SES client (default: 'eu-west-2')
SENDER_EMAIL: Email address to send alerts from
RECIPIENT_EMAILS: JSON-formatted list of email addresses to receive alerts
PIPELINE_NAME: Name of the pipeline for reporting
```

Example configuration in .env file:

```
AWS_REGION=eu-west-2
SENDER_EMAIL=alerts@example.com
RECIPIENT_EMAILS=["user1@example.com", "user2@example.com"]
PIPELINE_NAME="Secondary Care Medicines Data Ingestion Pipeline"
```
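Because `RECIPIENT_EMAILS` is stored as a JSON-formatted string, it needs to be decoded when read from the environment. A minimal sketch:

```python
import json
import os

# RECIPIENT_EMAILS holds a JSON array of addresses, so it must be decoded
# before use. Defaults to an empty list when the variable is not set.
recipients = json.loads(os.getenv("RECIPIENT_EMAILS", "[]"))
```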

## Usage in Pipelines
To use the Email Alerter in your data pipelines:

1. Initialize `PipelineAlert` at the beginning of your pipeline script.
2. Add reports at key points in your pipeline using the `add_report` method.
3. Call `send_email_alert` at the end of your pipeline to send the consolidated report.
Example:
```python
from cf_data_tracker import PipelineAlert

def main():
    pipeline_alert = PipelineAlert()

    try:
        # Raw data processing
        pipeline_alert.add_report("Raw", "ingestion", "Started raw data processing")
        # ... raw data processing logic ...
        pipeline_alert.add_report("Raw", "ingestion", "Completed raw data processing")

        # Clean data processing
        pipeline_alert.add_report("Clean", "ingestion", "Started clean data processing")
        # ... clean data processing logic ...
        pipeline_alert.add_report("Clean", "ingestion", "Completed clean data processing")

    except Exception as e:
        pipeline_alert.add_report("Error", "error", f"Pipeline failed: {str(e)}")

    finally:
        pipeline_alert.send_email_alert()

if __name__ == "__main__":
    main()
```

### Benefits

- Provides real-time updates on pipeline progress
- Centralizes reporting for multiple pipeline stages
- Customizable to fit specific pipeline needs
- Integrates seamlessly with AWS infrastructure
- Enhances visibility and monitoring of data processing tasks

By incorporating the Email Alerter into your data pipelines, you can ensure that all relevant stakeholders are kept informed about the status and progress of your data processing tasks, enabling quick responses to any issues that may arise during pipeline execution.


