#!/usr/bin/env python
#
# Copyright 2018-2023 contributors to the OpenLineage project
# SPDX-License-Identifier: Apache-2.0

import logging
import os
import subprocess
import sys
import time
import uuid
from datetime import datetime, timezone
from typing import Optional

from openlineage.client.client import OpenLineageClient
from openlineage.client.facet import JobTypeJobFacet
from openlineage.client.run import Job, Run, RunEvent, RunState
from openlineage.common.provider.dbt import (
    DbtLocalArtifactProcessor,
    ParentRunMetadata,
    UnsupportedDbtCommand,
)
from tqdm import tqdm

__version__ = "1.9.1"

from openlineage.common.utils import parse_single_arg

PRODUCER = f"https://github.com/OpenLineage/OpenLineage/tree/{__version__}/integration/dbt"
JOB_TYPE_FACET = JobTypeJobFacet(
    jobType="JOB",
    integration="DBT",
    processingType="BATCH",
)


def dbt_run_event(
    state: RunState,
    job_name: str,
    job_namespace: str,
    run_id: str = str(uuid.uuid4()),
    parent: Optional[ParentRunMetadata] = None,
) -> RunEvent:
    return RunEvent(
        eventType=state,
        eventTime=datetime.now(timezone.utc).isoformat(),
        run=Run(runId=run_id, facets={"parent": parent.to_openlineage()} if parent else {}),
        job=Job(
            namespace=parent.job_namespace if parent else job_namespace,
            name=job_name,
            facets={"jobType": JOB_TYPE_FACET},
        ),
        producer=PRODUCER,
    )


def dbt_run_event_start(
    job_name: str, job_namespace: str, parent_run_metadata: ParentRunMetadata
) -> RunEvent:
    return dbt_run_event(
        state=RunState.START,
        job_name=job_name,
        job_namespace=job_namespace,
        parent=parent_run_metadata,
    )


def dbt_run_event_end(
    run_id: str,
    job_namespace: str,
    job_name: str,
    parent_run_metadata: Optional[ParentRunMetadata],
) -> RunEvent:
    return dbt_run_event(
        state=RunState.COMPLETE,
        job_namespace=job_namespace,
        job_name=job_name,
        run_id=run_id,
        parent=parent_run_metadata,
    )


def dbt_run_event_failed(
    run_id: str,
    job_namespace: str,
    job_name: str,
    parent_run_metadata: Optional[ParentRunMetadata],
) -> RunEvent:
    return dbt_run_event(
        state=RunState.FAIL,
        job_namespace=job_namespace,
        job_name=job_name,
        run_id=run_id,
        parent=parent_run_metadata,
    )


openlineage_logger = logging.getLogger("openlineage.dbt")
openlineage_logger.setLevel(os.getenv("OPENLINEAGE_DBT_LOGGING", "INFO"))
openlineage_logger.addHandler(logging.StreamHandler(sys.stdout))
# deprecated dbtol logger
logger = logging.getLogger("dbtol")
for handler in openlineage_logger.handlers:
    logger.addHandler(handler)
    logger.setLevel(openlineage_logger.level)


def main():
    logger.info(f"Running OpenLineage dbt wrapper version {__version__}")
    logger.info("This wrapper will send OpenLineage events at the end of dbt execution.")

    args = sys.argv[1:]
    target = parse_single_arg(args, ["-t", "--target"])
    project_dir = parse_single_arg(args, ["--project-dir"], default="./")
    profile_name = parse_single_arg(args, ["--profile"])

    # We can get this if we have been orchestrated by an external system like airflow
    parent_id = os.getenv("OPENLINEAGE_PARENT_ID")
    parent_run_metadata = None
    job_namespace = os.environ.get("OPENLINEAGE_NAMESPACE", "dbt")

    client = OpenLineageClient()

    if parent_id:
        parent_namespace, parent_job_name, parent_run_id = parent_id.split("/")
        parent_run_metadata = ParentRunMetadata(
            run_id=parent_run_id,
            job_name=parent_job_name,
            job_namespace=parent_namespace,
        )

    processor = DbtLocalArtifactProcessor(
        producer=PRODUCER,
        target=target,
        job_namespace=job_namespace,
        project_dir=project_dir,
        profile_name=profile_name,
        logger=logger,
    )

    # Always emit "wrapping event" around dbt run. This indicates start of dbt execution, since
    # both the start and complete events for dbt models won't get emitted until end of execution.
    start_event = dbt_run_event_start(
        job_name=f"dbt-run-{processor.project_name}",
        job_namespace=job_namespace,
        parent_run_metadata=parent_run_metadata,
    )
    dbt_run_metadata = ParentRunMetadata(
        run_id=start_event.run.runId,
        job_name=start_event.job.name,
        job_namespace=start_event.job.namespace,
    )

    # Failed start event emit should not stop dbt command from running.
    emitted_events = 0
    try:
        client.emit(start_event)
        emitted_events += 1
    except Exception as e:
        logger.warning("OpenLineage client failed to emit start event. Exception: %s", e)

    # Set parent run metadata to use it as parent run facet
    processor.dbt_run_metadata = dbt_run_metadata

    pre_run_time = time.time()
    # Execute dbt in external process

    force_send_events = len(sys.argv) > 1 and sys.argv[1] == "send-events"
    if not force_send_events:
        process = subprocess.Popen(["dbt"] + sys.argv[1:], stdout=sys.stdout, stderr=sys.stderr)
        return_code = process.wait()
    else:
        logger.warning("Sending events for the last run without running the job")
        return_code = 0

    # If run_result has modification time before dbt command
    # or does not exist, do not emit dbt events.
    try:
        if os.stat(processor.run_result_path).st_mtime < pre_run_time and not force_send_events:
            logger.info(
                f"OpenLineage events not emitted: run_result file "
                f"({processor.run_result_path}) was not modified by dbt"
            )
            return
    except FileNotFoundError:
        logger.info(
            f"OpenLineage events not emitted:" f"did not find run_result file ({processor.run_result_path})"
        )
        return

    try:
        events = processor.parse().events()
    except UnsupportedDbtCommand as e:
        # log exception message
        logger.info(e)
        events = []

    if return_code == 0:
        last_event = dbt_run_event_end(
            run_id=dbt_run_metadata.run_id,
            job_namespace=dbt_run_metadata.job_namespace,
            job_name=dbt_run_metadata.job_name,
            parent_run_metadata=parent_run_metadata,
        )
    else:
        last_event = dbt_run_event_failed(
            run_id=dbt_run_metadata.run_id,
            job_namespace=dbt_run_metadata.job_namespace,
            job_name=dbt_run_metadata.job_name,
            parent_run_metadata=parent_run_metadata,
        )
    for event in tqdm(
        events + [last_event],
        desc="Emitting OpenLineage events",
        initial=1,
        total=len(events) + 2,
    ):
        try:
            client.emit(event)
            emitted_events += 1
        except Exception as e:
            logger.warning(
                "OpenLineage client failed to emit event %s runId %s. " "Exception: %s",
                event.eventType.value,
                event.run.runId,
                e,
                exc_info=True,
            )
    logger.info(f"Emitted {emitted_events} openlineage events")


if __name__ == "__main__":
    main()
