Metadata-Version: 2.1
Name: sop_deutils
Version: 0.1.2
Summary: A utils package for Yes4All SOP
Author-email: liuliukiki aka nclong <longnc@yes4all.com>
Project-URL: Author_Github, https://github.com/dribblewithclong
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.7
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: openpyxl>=3.1.2
Requires-Dist: pytz>=2023.3
Requires-Dist: pandas>=2.1.1
Requires-Dist: python-telegram-bot>=20.6
Requires-Dist: SQLAlchemy>=2.0.21
Requires-Dist: lxml>=4.9.3
Requires-Dist: gspread>=5.11.3
Requires-Dist: psycopg2-binary>=2.9.9
Requires-Dist: minio>=7.1.17
Requires-Dist: pyarrow>=13.0.0
Requires-Dist: requests>=2.31.0
Requires-Dist: aiofiles>=23.2.1
Requires-Dist: fastparquet>=2023.8.0
Requires-Dist: mysql-connector-python>=8.0.33

# Yes4All SOP Utils Packages

This is a utils package served for SOP Data Analytics team at **Yes4All**. It contains various modules to work with **PostgreSQL, MySQL, MinIO, Google API, Airflow, Telegram...**

<img src="https://lh3.googleusercontent.com/drive-viewer/AK7aPaCxUGMFDqsjfE2pz3I5DdEupNwPVjPkoLd_6ifiMPt5harMEBNBUuJTh812_KmI_p1cWZpEMQRtSZ8O8-uyhdGmIc7Wxg=s1600" alt="liuliukiki" width="200" />

## User Guide Documentation

### Install this package

```bash
$ pip install --upgrade sop-deutils
```

### Modules usage

#### Airflow

***Use case:*** when having a new scheduled task file on Airflow.

***Functional:***

Auto naming DAG ID and alerting failed DAG to Telegram:
- Sample code of base config Airflow  ```dag``` file: 

	```python
	from airflow import DAG
	from airflow.decorators import task
	from sop_deutils.y4a_airflow import auto_dag_id, telegram_alert

	default_args = {
	    "retries":  20,			# number times to retry when the task is failed
	    "retry_delay": timedelta(minutes=7),			# time delay among retries
	    "start_date": datetime(2023, 7, 14, 0, 0, 0),			# date that the DAG start to run 
	    "owner": 'liuliukiki',			# telegram user name of DAG owner
	    "on_failure_callback": telegram_alert,			# this contains function to alert to Telegram when the DAG/task is failed
	    "execution_timeout": timedelta(hours=4),			# limit time the DAG run
	}

	dag = DAG(
	    dag_id=auto_dag_id(),			# this contains function to name the DAG based on the file directory
	    description='Sample DAG',			# description about the DAG
	    default_args=default_args,			# default arguments contains dictionary of predefined params above
	    catchup=False,			# If True, the DAG will backfill tasks from the start_date to current date
	)

	with dag:
	    @task
	    def function_1():
	        ...

	    @task
	    def function_2():
	        ...

	    function_1() >> function_2()
	```

#### GoogleSheet

***(to be developed)***
#### MinIO

MinIO is an object storage, it is API compatible with the Amazon S3 cloud storage service. MinIO can be used as a **datalake** to store unstructured data (photos, videos, log files, backups, and container images) and structured data.

***Use case:*** when need to store raw data or get raw data from datalake. Notes that the stored data extension must be ```.parquet``` .

***Notes about how to determine the*** ```file_path``` ***parameter in minIO when using this module:***

![minIO file path](https://lh3.googleusercontent.com/drive-viewer/AK7aPaCoN6qQ0K5BuEHT_7c0CznHRpJu2LpxyqTpIY9_lNVOk7f_eB9kAVx_wl6iiOB9ia9vbiSJ6WtmvRXX6FDb8g7VU8Sy=s1600)

> For example, if the directory to the data file in minIO is as above, then the ```file_path``` is ```"/scraping/amazon_vendor/avc_bulk_buy_request/2023/9/24/batch_1695525619"``` (after removing bucket name, data storage mode, and data file extension).


***Functional:*** 

Firstly, import minIO utils module class. This class requires one parameters:
- ```account_name```: the client account name to minio storage. Only ```'sop'``` available now for ```account_name```. ***(str)***

	```python
	from sop_deutils.datalake.y4a_minio import MinioUtils

	minio_utils = MinioUtils(
	    account_name='sop',
	)
	```

To check whether data exists in a storage directory, using ```data_exist``` method, it has three parameters:
- ```mode``` (required): the data storage mode, the value must be either ```'prod'``` or ```'stag'```. ***(str)***
- ```file_path``` (required): the data directory to check. ***(str)***
- ```bucket_name``` (optional): the name of the bucket to check. The default value is ```'sc-bucket'```. ***(str)***
	
	The method will return ```True``` if data exists otherwise ```False```.

	```python
	minio_utils.data_exist(
	    mode='stag',
	    file_path='your-data-path',
	    bucket_name='sc-bucket',
	)
	```
	Output:
	```bash
	True
	```

To get the distinct values of a specified column of data in a data directory, using ```get_data_value_exist``` method, it has four parameters:
- ```mode``` (required): the data storage mode, the value must be either ```'prod'``` or ```'stag'```. ***(str)***
- ```file_path``` (required): the data directory to get distinct values. ***(str)***
- ```column_key``` (required): the column name to get distinct values. ***(str)***
- ```bucket_name``` (optional): the name of the bucket to get distinct values.  The default value is ```'sc-bucket'```. ***(str)***

	The method will return list of distinct values.

	```python
	minio_utils.get_data_value_exist(
	    mode='stag',
	    file_path='your-data-path',
	    column_key='your-chosen-column',
	    bucket_name='sc-bucket',
	)
	```
	Output:
	```bash
	['value_1', 'value_2']
	```

To load data from dataframe to storage, using ```load_data``` method, it has four parameters:
- ```data``` (required): dataframe contains data to load. ***(pd.DataFrame)***
- ```mode``` (required): the data storage mode, the value must be either ```'prod'``` or ```'stag'```. ***(str)***
- ```file_path``` (required): the directory to load the data. ***(str)***
- ```bucket_name``` (optional): the name of the bucket to load the data.  The default value is ```'sc-bucket'```. ***(str)***

	```python
	minio_utils.load_data(
	    data=df,
	    mode='stag',
	    file_path='your-data-path',
	    bucket_name='sc-bucket',
	)
	```

To get data from a single file of directory of storage, using ```get_data``` method, it has three parameters:
- ```mode``` (required): the data storage mode, the value must be either ```'prod'``` or ```'stag'```. ***(str)***
- ```file_path``` (required): the data directory to get data. ***(str)***
- ```bucket_name``` (optional): the name of the bucket to get data.  The default value is ```'sc-bucket'```. ***(str)***

	The method will return dataframe contains data to get.

	```python
	df = minio_utils.get_data(
	    mode='stag',
	    file_path='your-data-path',
	    bucket_name='sc-bucket',
	)

	print(df)
	```
	Output:
	```bash
	| Column1 Header | Column2 Header | Column3 Header |
	| ---------------| ---------------| ---------------|
	| Row1 Value1    | Row1 Value2    | Row1 Value3    |
	| Row2 Value1    | Row2 Value2    | Row2 Value3    |
	| Row3 Value1    | Row3 Value2    | Row3 Value3    |
	```

To get data from multiple files of directories of storage,  using ```get_data_wildcard``` method, it has three parameters:
- ```mode``` (required): the data storage mode, the value must be either ```'prod'``` or ```'stag'```. ***(str)***
- ```file_path``` (required): the parent data directory to get the data. ***(str)***
- ```bucket_name``` (optional): the name of the bucket to get data.  The default value is ```'sc-bucket'```. ***(str)***

	The method will return dataframe contains data to get.

	```python
	df = minio_utils.get_data_wildcard(
	    mode='stag',
	    file_path='your-parent-data-path',
	    bucket_name='sc-bucket',
	)

	print(df)
	```
	Output:
	```bash
	| Column1 Header | Column2 Header | Column3 Header |
	| ---------------| ---------------| ---------------|
	| Row1 Value1    | Row1 Value2    | Row1 Value3    |
	| Row2 Value1    | Row2 Value2    | Row2 Value3    |
	| Row3 Value1    | Row3 Value2    | Row3 Value3    |
	```

#### MySQL

***(no docs available now)***

#### PostgreSQL

***Use case:*** when interacting with Postgres database.

***Functional:***

Firstly, import PostgreSQL utils module class. This class requires three parameters:
- ```account_name```: the client account name to connect to PostgreSQL. Only ```'da'``` value is available now. In the future, the value can be used as DA member name such as ```'linhvk'```, ```'trieuna'```... ***(str)***
- ```db_host```: host db name to connect. Available values are ```'raw_master'```, ```'raw_repl'```, ```'serving_master'```, ```'serving_repl'```.  ***(str)***
- ```db```: database to connect. The default value is ```'serving'```. ***(str)***

	```python
	from sop_deutils.sql.y4a_postgresql import PostgreSQLUtils

	pg_utils = PostgreSQLUtils(
	    account_name='da',
	    db_host='serving_master',
	    db='serving',
	)
	```

To create a new PostgreSQL connection pool, using ```create_pool_conn``` method, it has one parameter:
- ```pool_size``` (optional): number of connections in the pool. The default value is ```1```, it means there is only a connection in pool. ***(int)***

	The method will return connection pool contains connections to the database.

	```python
	pool = pg_utils.create_pool_conn(
	    pool_size=1,
	)
	```

To close and remove the PostgreSQL connection pool after being used, using ```close_pool_conn``` method, it has one parameter:
- ```db_pool_conn``` (required): connection pool created by ```create_pool_conn``` method  ***(callable)***

	```python
	pg_utils.close_pool_conn(
	    db_pool_conn=pool,
	)
	```

To get the SQL query given by SQL file, using ```read_sql_file``` method, it has one parameter:
- ```sql_file_path``` (required): the located path of SQL file. ***(str)***

	The method will return the string of SQL query.

	```python
	sql = pg_utils.read_sql_file(
	    sql_file_path: 'your-path/select_all.sql',
	)

	print(sql)
	```
	Output:
	```bash
	"SELECT * FROM your_schema.your_table"
	```	

To insert data to PostgreSQL table, using ```insert_data``` method, it has five parameters:
- ```data``` (required): a dataframe contains data to insert. ***(pd.DataFrame)***
- ```schema``` (required): schema contains table to insert. ***(str)***
- ```table``` (required): table name to insert. ***(str)***
- ```commit_every``` (optional): number rows of data to commit each time. The default value is ```1000```. ***(int)***
- ```db_pool_conn``` (optional): connection pool to connect to database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used.  ***(callable)***

	```python
	pg_utils.insert_data(
	    data=your_df,
	    schema='your-schema',
	    table='your-table',
	    commit_every=1000,
	    db_pool_conn=pool,
	)
	```

To insert large data to PostgreSQL table, using ```bulk_insert_data``` method, it has five parameters:
- ```data``` (required): a dataframe contains data to insert. ***(pd.DataFrame)***
- ```schema``` (required): schema contains table to insert. ***(str)***
- ```table``` (required): table name to insert. ***(str)***
- ```commit_every``` (optional): number rows of data to commit each time. The default value is ```1000```. ***(int)***
- ```db_pool_conn``` (optional): connection pool to connect to database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used.  ***(callable)***

	```python
	pg_utils.bulk_insert_data(
	    data=your_df,
	    schema='your-schema',
	    table='your-table',
	    commit_every=1000,
	    db_pool_conn=pool,
	)
	```

To upsert data to PostgreSQL table, using ```upsert_data``` method, it has six parameters:
- ```data``` (required): a dataframe contains data to upsert. ***(pd.DataFrame)***
- ```schema``` (required): schema contains table to upsert. ***(str)***
- ```table``` (required): table name to upsert. ***(str)***
- ```primary_keys``` (required): list of primary keys of the table. ***(list)***
- ```commit_every``` (optional): number rows of data to commit each time. The default value is ```1000```. ***(int)***
- ```db_pool_conn``` (optional): connection pool to connect to database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used.  ***(callable)***

	```python
	pg_utils.upsert_data(
	    data=your_df,
	    schema='your-schema',
	    table='your-table',
	    primary_keys=['pk1', 'pk2', 'pk3'],
	    commit_every=1000,
	    db_pool_conn=pool,
	)
	```

To upsert large data to PostgreSQL table, using ```bulk_upsert_data``` method, it has six parameters:
- ```data``` (required): a dataframe contains data to upsert. ***(pd.DataFrame)***
- ```schema``` (required): schema contains table to upsert. ***(str)***
- ```table``` (required): table name to upsert. ***(str)***
- ```primary_keys``` (required): list of primary keys of the table. ***(list)***
- ```commit_every``` (optional): number rows of data to commit each time. The default value is ```1000```. ***(int)***
- ```db_pool_conn``` (optional): connection pool to connect to database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used.  ***(callable)***

	```python
	pg_utils.bulk_upsert_data(
	    data=your_df,
	    schema='your-schema',
	    table='your-table',
	    primary_keys=['pk1', 'pk2', 'pk3'],
	    commit_every=1000,
	    db_pool_conn=pool,
	)
	```

To update new data of specific columns in the table based on primary keys, using ```update_table``` method, it has seven parameters:
- ```data``` (required): a dataframe contains data to update, including primary keys and columns to update. ***(pd.DataFrame)***
- ```schema``` (required): schema contains table to update data. ***(str)***
- ```table``` (required): table to update data. ***(str)***
- ```columns``` (required): list of column names to update data. ***(list)***
- ```primary_keys``` (required): list of primary keys of table to update data. ***(list)***
- ```commit_every``` (optional): number rows of data to commit each time. The default value is ```1000```. ***(int)***
- ```db_pool_conn``` (optional): connection pool to connect to database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used.  ***(callable)***

	```python
	pg_utils.update_table(
	    data=your_df,
	    schema='your-schema',
	    table='your-table',
	    columns=['col1', 'col2'],
	    primary_keys=['pk1', 'pk2', 'pk3'],
	    commit_every=1000,
	    db_pool_conn=pool,
	)
	```

To get data from PostgreSQL database given by a SQL query, using ```get_data``` method, it has two parameters:
- ```sql``` (required): SQL query to get data. ***(str)***
- ```db_pool_conn``` (optional): connection pool to connect to database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used.  ***(callable)***

	The method will return dataframe contains data extracted by the given SQL query.

	```python
	df = pg_utils.get_data(
	    sql='your-query',
	    db_pool_conn=pool,
	)

	print(df)
	```
	Output:
	```bash
	| Column1 Header | Column2 Header | Column3 Header |
	| ---------------| ---------------| ---------------|
	| Row1 Value1    | Row1 Value2    | Row1 Value3    |
	| Row2 Value1    | Row2 Value2    | Row2 Value3    |
	| Row3 Value1    | Row3 Value2    | Row3 Value3    |
	```


To get the distinct values of a specified column in a PostgreSQL table, using ```select_distinct``` method, it has four parameters:
- ```col``` (required): column name to get the distinct data. ***(str)***
- ```schema``` (required): schema contains table to get data. ***(str)***
- ```table``` (required): table to get data. ***(str)***
- ```db_pool_conn``` (optional): connection pool to connect to database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used.  ***(callable)***

	The method will return list of distinct values.

	```python
	distinct_values = pg_utils.select_distinct(
	    col='chosen-column',
	    schema='your-schema',
	    table='your-table',
	    db_pool_conn=pool,
	)

	print(distinct_values)
	```
	Output:
	```bash
	['val1', 'val2', 'val3']
	```

To get list of columns name of a specific PostgreSQL table, using ```show_columns``` method, it has three parameters:
- ```schema``` (required): schema contains table to get columns. ***(str)***
- ```table``` (required): table to get columns. ***(str)***
- ```db_pool_conn``` (optional): connection pool to connect to database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used.  ***(callable)***

	The method will return list of column names of the table.

	```python
	col_names = pg_utils.show_columns(
	    schema='your-schema',
	    table='your-table',
	    db_pool_conn=pool,
	)

	print(col_names)
	```
	Output:
	```bash
	['col1', 'col2', 'col3']
	```

To execute the given SQL query, using ```execute``` method, it has three parameters:
- ```sql``` (required): SQL query to execute. ***(str)***
- ```fetch_output``` (optional): whether to fetch the results of the query. The default value is ```False```. ***(bool)***
- ```db_pool_conn``` (optional): connection pool to connect to database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used.  ***(callable)***

	The method will return list of query output if ```fetch_output``` is ```True```, otherwise ```None```.
	
	```python
	sql = """
	    UPDATE
	        sales_order_avc_di o,
	        (
	            SELECT
	                DISTINCT po_name, 
	                asin,
	                CASE
	                    WHEN o.status LIKE '%cancel%' AND a.status IS NULL THEN ''
	                    WHEN o.status LIKE '%cancel%' THEN CONCAT(a.status,' ',cancel_date) 
	                    ELSE o.status END po_asin_amazon_status
	            FROM
	                sales_order_avc_order_status o
	                LEFT JOIN
	                    sales_order_avc_order_asin_status a USING (updated_at, po_name)
	            WHERE updated_at > DATE_SUB(NOW(), INTERVAL 1 DAY)
	        ) s
	    SET
	        o.po_asin_amazon_status = s.po_asin_amazon_status
	    WHERE
	        o.po_name = s.po_name
	        AND o.asin = s.asin
	"""

	pg_utils.execute(
	    sql=sql,
	    fetch_output=False,
	    db_pool_conn=pool,
	)
	```

To create new column for a specific PostgreSQL table, using ```add_column``` method, it has six parameters:
- ```schema``` (required): schema contains table to create column. ***(str)***
- ```table``` (required): table to create column. ***(str)***
- ```column_name``` (optional): name of the column to create available when creating single column. The default value is ```None``` ***(str)***
- ```dtype``` (optional): data type of the column to create available when creating single column. The default value is ```None``` ***(str)***
- ```muliple_columns``` (optional): dictionary contains columns name as key and data type of columns as value respectively. The default value is ```{}``` ***(dict)***
- ```db_pool_conn``` (optional): connection pool to connect to database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used.  ***(callable)***

	```python
	pg_utils.add_column(
	    schema='my-schema',
	    table='my-table',
	    muliple_columns={
	        'col1': 'int',
	        'col2': 'varchar(50)',
	    },
	    db_pool_conn=pool,
	)
	```

To create new table in PostgreSQL database, using ```create_table``` method, it has seven parameters:
- ```schema``` (required): schema contains table to create. ***(str)***
- ```table``` (required): table name to create. ***(str)***
- ```columns_with_dtype``` (required): dictionary contains column names as key and the data type of column as value respectively. ***(dict)***
- ```columns_primary_key``` (optional): list of columns to set primary keys. The default value is ```[]```. ***(list)***
- ```columns_not_null``` (optional): list of columns to set constraints not null. The default value is ```[]```.  ***(list)***
- ```columns_with_default``` (optional): dictionary contains column names as key and the default value of column as value respectively. The default value is ```{}```. ***(dict)***
- ```db_pool_conn``` (optional): connection pool to connect to database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used.  ***(callable)***

	```python
	pg_utils.create_table(
	    schema='my-schema',
	    table='my-new-table',
	    columns_with_dtype={
	        'col1': 'int',
	        'col2': 'varchar(50)',
	        'col3': 'varchar(10)',
	    },
	    columns_primary_key=[
	        'col1',
	    ],
	    columns_not_null=[
	        'col2',
	    ],
	    columns_with_default={
	        'col3': 'USA',
	    },
	    db_pool_conn=pool,
	)
	```

To grant table privileges to users in PostgreSQL, using ```grant_table``` method, it has five parameters:
- ```schema``` (required): schema contains table to grant. ***(str)***
- ```table``` (required): table name to grant. ***(str)***
- ```list_users``` (required): list of users to grant. ***(list)***
- ```privileges``` (optional): list of privileges to grant. The default value is ```['SELECT']```. The accepted values in privileges list are ```'SELECT'```, ```'INSERT'```, ```'UPDATE'```, ```'DELETE'```, ```'TRUNCATE'```, ```'REFERENCES'```, ```'TRIGGER'```. ***(list)***
- ```all_privileges``` (optional): whether to grant all privileges. The default value is ```False```.  ***(bool)***

	```python
	pg_utils.grant_table(
	    schema='my-schema',
	    table='my-new-table',
	    list_users=[
	        'linhvk',
			'trieuna',
	    ],
	    privileges=[
	        'SELECT',
			'INSERT',
			'UPDATE',
	    ],
	)
	```

To remove all the data of PostgreSQL table, using ```truncate_table``` method, it has four parameters:
- ```schema``` (required): schema contains table to truncate. ***(str)***
- ```table``` (required): table name to truncate. ***(str)***
- ```reset_identity``` (optional): whether to reset identity of the table. The defaults value is ```False```. ***(bool)***
- ```db_pool_conn``` (optional): connection pool to connect to database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used.  ***(callable)***

	```python
	pg_utils.truncate_table(
	    schema='my-schema',
	    table='my-table',
	    db_pool_conn=pool,
	)
	```

To check if the PostgreSQL table exists in database, using ```table_exists``` method, it has three parameters:
- ```schema``` (required): schema contains table to check. ***(str)***
- ```table``` (required): table name to check. ***(str)***
- ```db_pool_conn``` (optional): connection pool to connect to database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used.  ***(callable)***

	The method will return ```True``` if table exists and ```False``` if not.

	```python
	pg_utils.table_exists(
	    schema='my-schema',
	    table='my-exists-table',
	    db_pool_conn=pool,
	)
	```
	Output:
	```bash
	True
	```

***Best practices:*** Remember the **Trade-Off**

Pre-define connection and reuse it for multiple tasks instead of each tasks create a new connection.
- ***Should:***

	```python
	pool = pg_utils.create_pool_conn()          # Create a new connection

	pg_utils.create_table(
	    schema='my-schema',
	    table='my-new-table',
	    columns_with_dtype={
	        'col1': 'int',
	        'col2': 'varchar(50)',
	        'col3': 'varchar(10)',
	    },
	    columns_primary_key=[
	        'col1',
	    ],
	    columns_not_null=[
	        'col2',
	    ],
	    columns_with_default={
	        'col3': 'USA',
	    },
	    db_pool_conn=pool,
	)           # Task 1

	pg_utils.insert_data(
	    data=your_df,
	    schema='my-schema',
	    table='my-new-table',
	    commit_every=1000,
	    db_pool_conn=pool,
	)           # Task 2

	pg_utils.truncate_table(
	    schema='my-schema',
	    table='my-new-table',
	    db_pool_conn=pool,
	)           # Task 3

	pg_utils.close_pool_conn(pool)          # Close connection

	# All the process will used only one connection
	```

- ***Shouldn't:***

	```python
	pg_utils.create_table(
	    schema='my-schema',
	    table='my-new-table',
	    columns_with_dtype={
	        'col1': 'int',
	        'col2': 'varchar(50)',
	        'col3': 'varchar(10)',
	    },
	    columns_primary_key=[
	        'col1',
	    ],
	    columns_not_null=[
	        'col2',
	    ],
	    columns_with_default={
	        'col3': 'USA',
	    },
	)           # This will create a new connection

	pg_utils.insert_data(
	    data=your_df,
	    schema='my-schema',
	    table='my-new-table',
	    commit_every=1000,
	)           # This will create a new connection

	pg_utils.truncate_table(
	    schema='my-schema',
	    table='my-new-table',
	)           # This will create a new connection

	# All the process will used three connection
	```

Config SQL query file outside the code file and import it instead of let query inside the code file.
- ***Should:***

	```python
	sql = pg_utils.read_sql_file(
	    sql_file_path: 'your-path/your-query.sql',
	)

	pg_utils.execute(
	    sql=sql,
	    db_pool_conn=pool,
	)
	```

- ***Shouldn't:***

	```python
	sql = """
	    UPDATE
	        sales_order_avc_di o,
	        (
	            SELECT
	                DISTINCT po_name, 
	                asin,
	                CASE
	                    WHEN o.status LIKE '%cancel%' AND a.status IS NULL THEN ''
	                    WHEN o.status LIKE '%cancel%' THEN CONCAT(a.status,' ',cancel_date) 
	                    ELSE o.status END po_asin_amazon_status
	            FROM
	                sales_order_avc_order_status o
	                LEFT JOIN
	                    sales_order_avc_order_asin_status a USING (updated_at, po_name)
	            WHERE updated_at > DATE_SUB(NOW(), INTERVAL 1 DAY)
	        ) s
	    SET
	        o.po_asin_amazon_status = s.po_asin_amazon_status
	    WHERE
	        o.po_name = s.po_name
	        AND o.asin = s.asin
	"""

	pg_utils.execute(
	    sql=sql,
	    db_pool_conn=pool,
	)
	```

#### Telegram

***Use case:*** when need to send messages to Telegram by using bot

***Functional:***

To send messages to Telegram, using ```send_message``` method, it has three parameters:
- ```text``` (required): message to send. ***(str)***
- ```bot_token``` (optional): token of the bot which send the message. The default value is ```None```. If the value is ```None```, the bot ```sleep at 9pm``` will be used to send messages. ***(str)***
- ```chat_id``` (optional): id of group chat where the message is sent. The default value is ```None```. If the value is ```None```,the group chat ```Airflow Status Alert``` will be used.  ***(str)***

	```python
	from sop_deutils.y4a_telegram import send_message

	send_message(
	    text='Hello liuliukiki'
	)
	```
