#!python
import logging
import subprocess
import sys
import time
import os
import uuid
from typing import Optional

from tqdm import tqdm
from datetime import datetime

from openlineage.client.run import RunEvent, RunState, Run, Job
from openlineage.client.client import OpenLineageClient, OpenLineageClientOptions
from openlineage.common.provider.dbt import DbtArtifactProcessor, ParentRunMetadata

# Version of this dbt wrapper; logged at startup and embedded in PRODUCER below.
__version__ = "0.5.1"

from openlineage.common.utils import parse_single_arg

# Producer URI attached to the wrapping run events built in this module and
# handed to DbtArtifactProcessor as its `producer` argument.
PRODUCER = f'https://github.com/OpenLineage/OpenLineage/tree/{__version__}/integration/dbt'


def setup_client() -> Optional[OpenLineageClient]:
    """Create an OpenLineage client configured from environment variables.

    Reads ``OPENLINEAGE_URL`` for the server address and
    ``OPENLINEAGE_API_KEY`` for the (optional) API key.

    Returns:
        A configured ``OpenLineageClient``, or ``None`` when
        ``OPENLINEAGE_URL`` is unset or empty.
    """
    server_url = os.getenv("OPENLINEAGE_URL")
    if server_url:
        # The API key is optional; os.getenv yields None when it is absent.
        client_options = OpenLineageClientOptions(
            api_key=os.getenv("OPENLINEAGE_API_KEY")
        )
        return OpenLineageClient(url=server_url, options=client_options)
    return None


def dbt_run_event(
    state: RunState,
    run_id: Optional[str] = None,
    job_namespace: str = 'dbt',
    job_name: Optional[str] = None,
    parent: Optional[ParentRunMetadata] = None
) -> RunEvent:
    """Build the "wrapping" OpenLineage run event for a dbt invocation.

    Args:
        state: Event type to record (e.g. ``RunState.START``).
        run_id: Run identifier. When omitted, a new UUID4 string is
            generated for this call.
        job_namespace: Namespace of the job. Ignored when ``parent`` is
            given, in which case the parent's job namespace is used.
        job_name: Name of the job. When omitted, a unique
            ``dbt-run-<uuid4>`` name is generated for this call.
        parent: Optional metadata of the run that triggered dbt; when
            present it is attached as the ``parent`` run facet.

    Returns:
        A ``RunEvent`` stamped with the current time and this module's
        ``PRODUCER``.
    """
    # Defaults are resolved per call: a default written in the signature is
    # evaluated only once, at definition time, which would make every caller
    # that omits these arguments share the same "random" values.
    if run_id is None:
        run_id = str(uuid.uuid4())
    if job_name is None:
        job_name = f"dbt-run-{uuid.uuid4()}"

    return RunEvent(
        eventType=state,
        # NOTE(review): datetime.now() is naive local time, so the ISO string
        # carries no UTC offset — confirm this is what consumers expect.
        eventTime=datetime.now().isoformat(),
        run=Run(
            runId=run_id,
            facets={
                "parent": parent.to_openlineage()
            } if parent else {}
        ),
        job=Job(
            namespace=parent.job_namespace if parent else job_namespace,
            name=job_name
        ),
        producer=PRODUCER
    )


def dbt_run_event_start(
    job_name: str,
    parent_run_metadata: Optional[ParentRunMetadata]
) -> RunEvent:
    """Build the START event that marks the beginning of a dbt invocation.

    Args:
        job_name: Name of the wrapping dbt job.
        parent_run_metadata: Metadata of the orchestrating run, or ``None``
            when dbt was not started by an external system.

    Returns:
        A ``RunEvent`` with ``RunState.START`` and a newly generated run id.
    """
    return dbt_run_event(
        state=RunState.START,
        # Pass an explicit, freshly generated id so that every START event
        # gets its own run id rather than depending on the callee's default.
        run_id=str(uuid.uuid4()),
        job_name=job_name,
        parent=parent_run_metadata
    )


def dbt_run_event_end(
    run_id: str,
    job_namespace: str,
    job_name: str,
    parent_run_metadata: Optional[ParentRunMetadata]
) -> RunEvent:
    """Build the COMPLETE event that closes the wrapping dbt run.

    Args:
        run_id: Run id of the wrapping run being completed.
        job_namespace: Namespace of the wrapping job.
        job_name: Name of the wrapping job.
        parent_run_metadata: Metadata of the orchestrating run, if any.

    Returns:
        A ``RunEvent`` with ``RunState.COMPLETE`` for the given run and job.
    """
    event_fields = {
        "state": RunState.COMPLETE,
        "run_id": run_id,
        "job_namespace": job_namespace,
        "job_name": job_name,
        "parent": parent_run_metadata,
    }
    return dbt_run_event(**event_fields)


# Module-level logger for the wrapper: records at INFO and above are written
# to standard output through a dedicated stream handler.
logger = logging.getLogger("dbtol")
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
logger.setLevel(logging.INFO)


def main() -> int:
    """Run dbt as a subprocess and emit OpenLineage events for the run.

    The dbt command line is taken from ``sys.argv[1:]`` and passed to the
    ``dbt`` executable unchanged. A START "wrapping" event is emitted before
    dbt runs. For the ``run``, ``test`` and ``build`` commands, the events
    produced by ``DbtArtifactProcessor`` and a closing COMPLETE event are
    emitted afterwards, provided dbt wrote a fresh run_result file.

    Exits the interpreter with status 1 when ``OPENLINEAGE_URL`` is not set.

    Returns:
        The exit code of the dbt process, so the caller can propagate it.
    """
    logger.info(f"Running OpenLineage dbt wrapper version {__version__}")
    logger.info("This wrapper will send OpenLineage events at the end of dbt execution.")

    # sys.argv[1] is the dbt sub-command; the options follow it.
    args = sys.argv[2:]
    target = parse_single_arg(args, ['-t', '--target'])
    project_dir = parse_single_arg(args, ['--project-dir'], default='./')
    profile_name = parse_single_arg(args, ['--profile'])

    # setup_client() returns None only when OPENLINEAGE_URL is unset or empty;
    # it is used here purely as that check.
    if setup_client() is None:
        logger.info("OPENLINEAGE_URL is not set: stopping execution")
        sys.exit(1)

    # The client actually used for emitting is the environment-configured one.
    client = OpenLineageClient.from_environment()

    # We can get this if we have been orchestrated by an external system like airflow
    parent_id = os.getenv("OPENLINEAGE_PARENT_ID")
    parent_run_metadata = None
    if parent_id:
        # Expected format: "<namespace>/<job name>/<run id>".
        parent_namespace, parent_job_name, parent_run_id = parent_id.split('/')
        parent_run_metadata = ParentRunMetadata(
            run_id=parent_run_id,
            job_name=parent_job_name,
            job_namespace=parent_namespace
        )

    processor = DbtArtifactProcessor(
        producer=PRODUCER,
        target=target,
        project_dir=project_dir,
        profile_name=profile_name,
        logger=logger
    )

    # Always emit "wrapping event" around dbt run. This indicates start of dbt execution, since
    # both the start and complete events for dbt models won't get emitted until end of execution.
    start_event = dbt_run_event_start(
        job_name=f"dbt-run-{processor.project['name']}",
        parent_run_metadata=parent_run_metadata
    )
    dbt_run_metadata = ParentRunMetadata(
        run_id=start_event.run.runId,
        job_name=start_event.job.name,
        job_namespace=start_event.job.namespace
    )

    # A failed start-event emit should not stop the dbt command from running.
    start_event_emitted = True
    try:
        client.emit(start_event)
    except Exception as e:
        start_event_emitted = False
        logger.warning(f"OpenLineage client failed to emit start event. Exception: {e}")

    # Set parent run metadata to use it as parent run facet
    processor.dbt_run_metadata = dbt_run_metadata

    pre_run_time = time.time()
    # Execute dbt in external process, sharing this process's output streams.
    with subprocess.Popen(
        ["dbt"] + sys.argv[1:],
        stdout=sys.stdout,
        stderr=sys.stderr
    ) as process:
        return_code = process.wait()

    # Only these dbt commands have their results turned into events.
    if len(sys.argv) < 2 or sys.argv[1] not in ['run', 'test', 'build']:
        return return_code

    # If run_result has modification time before dbt command
    # or does not exist, do not emit dbt events.
    try:
        if os.stat(processor.run_result_path).st_mtime < pre_run_time:
            logger.info(f"OpenLineage events not emitted: run_result file "
                        f"({processor.run_result_path}) was not modified by dbt")
            return return_code
    except FileNotFoundError:
        logger.info(f"OpenLineage events not emitted: "
                    f"did not find run_result file ({processor.run_result_path})")
        return return_code

    events = processor.parse().events()

    for event in tqdm(events, desc="Emitting OpenLineage events"):
        client.emit(event)

    # NOTE(review): the wrapping run is closed as COMPLETE regardless of the
    # dbt exit code — confirm whether a failed dbt run should be reported
    # differently.
    client.emit(dbt_run_event_end(
        run_id=dbt_run_metadata.run_id,
        job_namespace=dbt_run_metadata.job_namespace,
        job_name=dbt_run_metadata.job_name,
        parent_run_metadata=parent_run_metadata
    ))

    # Model events + the end event, plus the start event when it went out.
    emitted_count = len(events) + 1 + (1 if start_event_emitted else 0)
    logger.info(f"Emitted {emitted_count} openlineage events")
    return return_code


if __name__ == '__main__':
    # Propagate dbt's exit code so callers can detect a failed dbt run.
    sys.exit(main())
