Metadata-Version: 2.1
Name: sop-deutils
Version: 0.2.3
Summary: A utils package for Yes4All SOP
Author-email: liuliukiki aka clong <longnc@yes4all.com>
Project-URL: Author_Github, https://github.com/dribblewithclong
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.7
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: openpyxl>=3.1.2
Requires-Dist: pytz>=2023.3
Requires-Dist: pandas>=2.1.1
Requires-Dist: python-telegram-bot>=20.6
Requires-Dist: SQLAlchemy>=2.0.21
Requires-Dist: lxml>=4.9.3
Requires-Dist: gspread>=5.11.3
Requires-Dist: psycopg2-binary>=2.9.9
Requires-Dist: minio>=7.1.17
Requires-Dist: pyarrow>=13.0.0
Requires-Dist: requests>=2.31.0
Requires-Dist: aiofiles>=23.2.1
Requires-Dist: fastparquet>=2023.8.0
Requires-Dist: mysql-connector-python>=8.0.33

# Yes4All SOP Utils Packages

This is a utils package serving the SOP Data Analytics team at **Yes4All**. It contains various modules for working with **PostgreSQL, MinIO, Google API, Airflow, Telegram**, and more.

---
<img  src="https://lh3.googleusercontent.com/drive-viewer/AK7aPaCxUGMFDqsjfE2pz3I5DdEupNwPVjPkoLd_6ifiMPt5harMEBNBUuJTh812_KmI_p1cWZpEMQRtSZ8O8-uyhdGmIc7Wxg=s1600"  alt="liuliukiki"  width="200"  />


## User Guide Documentation

### Install this package

```bash
$ pip install --upgrade sop-deutils
```

---

### Modules usage

#### Airflow

***Use case:*** when creating a new scheduled DAG file on Airflow.

***Functional:***

Auto-name the DAG ID and alert failed DAGs to Telegram:
- Sample code for a base-config Airflow ```dag``` file:

	```python
	from datetime import datetime, timedelta

	from airflow import DAG
	from airflow.decorators import task
	from sop_deutils.y4a_airflow import auto_dag_id, telegram_alert

	default_args = {
	    "retries": 20,  # number of times to retry when the task fails
	    "retry_delay": timedelta(minutes=7),  # delay between retries
	    "start_date": datetime(2023, 7, 14, 0, 0, 0),  # date the DAG starts to run
	    "owner": 'liuliukiki',  # Telegram username of the DAG owner
	    "on_failure_callback": telegram_alert,  # function that alerts Telegram when the DAG/task fails
	    "execution_timeout": timedelta(hours=4),  # time limit for the DAG run
	}

	dag = DAG(
	    dag_id=auto_dag_id(),  # names the DAG based on the file directory
	    description='Sample DAG',  # description of the DAG
	    default_args=default_args,  # the dictionary of predefined params above
	    catchup=False,  # if True, the DAG will backfill runs from start_date to the current date
	)

	with dag:
	    @task
	    def function_1():
	        ...

	    @task
	    def function_2():
	        ...

	    function_1() >> function_2()
	```

---

#### GoogleSheet

***Use case:*** when interacting with Google Sheets.

***Functional:***

Firstly, import the Google Sheets utils class. This class requires one parameter:
- ```account_name```: the client account name to connect to Google Sheets. Only the ```'da'``` value is available now.

	```python
	from sop_deutils.gg_api.y4a_sheet import GGSheetUtils

	sheet_utils = GGSheetUtils(
	    account_name='da',
	)
	```

To create a new spreadsheet, use the ```create_spread_sheet``` method. It has three parameters:
- ```sheet_name``` (required): name of the sheet to create. ***(str)***
- ```folder_id``` (optional): id of the folder that contains the spreadsheet. The default value is ```None```. ***(str)***
- ```share_to``` (optional): list of emails to share the spreadsheet with. The default value is ```[]```. ***(list)***

	The method will return the created spreadsheet id.

	```python
	spread_sheet_id = sheet_utils.create_spread_sheet(
	    sheet_name='my-sheet-name',
	    folder_id='my-folder-id',
	    share_to=['longnc@yes4all.com'],
	)

	print(spread_sheet_id)
	```
	Output:
	```bash
	1vTjZOcRfd5eiF5Qo8DCha29Vdt0zvYP11XPbq54eCMg
	```

To get all available worksheets of a spreadsheet, use the ```list_all_work_sheets``` method. It has one parameter:
- ```sheet_id``` (required): spreadsheet id. ***(str)***

	The method will return a list of all worksheets of the spreadsheet.

	```python
	work_sheets = sheet_utils.list_all_work_sheets(
	    sheet_id='my-sheet-id',
	)

	print(work_sheets)
	```
	Output:
	```bash
	['Sheet1']
	```

To delete a specific worksheet of a spreadsheet, use the ```delete_work_sheet``` method. It has two parameters:
- ```sheet_id``` (required): spreadsheet id. ***(str)***
- ```sheet_name``` (optional): worksheet name. The default value is ```'Sheet1'```. ***(str)***

	```python
	sheet_utils.delete_work_sheet(
	    sheet_id='my-sheet-id',
	    sheet_name='my-sheet-name',
	)
	```

To clear all data of a specific worksheet of a spreadsheet, use the ```clear_work_sheet``` method. It has two parameters:
- ```sheet_id``` (required): spreadsheet id. ***(str)***
- ```sheet_name``` (optional): worksheet name. The default value is ```'Sheet1'```. ***(str)***

	```python
	sheet_utils.clear_work_sheet(
	    sheet_id='my-sheet-id',
	    sheet_name='my-sheet-name',
	)
	```

To get data from the given sheet, use the ```get_data``` method. It has five parameters:
- ```sheet_id``` (required): spreadsheet id. ***(str)***
- ```sheet_name``` (optional): worksheet name. The default value is ```'Sheet1'```. ***(str)***
- ```range_from``` (optional): the beginning of the range of data to get. The default value is ```'A'```. ***(str)***
- ```range_to``` (optional): the end of the range of data to get. The default value is ```'Z'```. ***(str)***
- ```columns_first_row``` (optional): whether to use the first row as the column names. The default value is ```False```. ***(bool)***

	```python
	df = sheet_utils.get_data(
	    sheet_id='my-sheet-id',
	    columns_first_row=True,
	)

	print(df)
	```
	Output:
	```bash
	| Column1 Header | Column2 Header | Column3 Header |
	| ---------------| ---------------| ---------------|
	| Row1 Value1    | Row1 Value2    | Row1 Value3    |
	| Row2 Value1    | Row2 Value2    | Row2 Value3    |
	| Row3 Value1    | Row3 Value2    | Row3 Value3    |
	```

To insert data to the given sheet, use the ```insert_data``` method. It has five parameters:
- ```data``` (required): dataframe containing the data to insert. ***(pd.DataFrame)***
- ```sheet_id``` (required): spreadsheet id. ***(str)***
- ```sheet_name``` (optional): worksheet name. The default value is ```'Sheet1'```. ***(str)***
- ```from_row_index``` (optional): the index of the row to begin inserting at. The default value is ```1```. ***(int)***
- ```insert_column_names``` (optional): whether to insert the column names. The default value is ```False```. ***(bool)***

	```python
	sheet_utils.insert_data(
	    data=df,
	    sheet_id='my-sheet-id',
	    from_row_index=2,
	    insert_column_names=False,
	)
	```

To update data of the given sheet, use the ```update_data``` method. It has five parameters:
- ```data``` (required): dataframe containing the data to update. ***(pd.DataFrame)***
- ```sheet_id``` (required): spreadsheet id. ***(str)***
- ```sheet_name``` (optional): worksheet name. The default value is ```'Sheet1'```. ***(str)***
- ```range_from``` (optional): the beginning of the range of data to update. The default value is ```'A'```. ***(str)***
- ```range_to``` (optional): the end of the range of data to update. The default value is ```'Z'```. ***(str)***

	```python
	sheet_utils.update_data(
	    data=new_df,
	    sheet_id='my-sheet-id',
	    range_from='A4',
	    range_to='E7',
	)
	```

To remove data from specific ranges of the given sheet, use the ```remove_data``` method. It has three parameters:
- ```sheet_id``` (required): spreadsheet id. ***(str)***
- ```sheet_name``` (optional): worksheet name. The default value is ```'Sheet1'```. ***(str)***
- ```list_range``` (optional): list of data ranges to remove. The default value is ```['A1:Z1', 'A4:Z4']```. ***(list)***

	```python
	sheet_utils.remove_data(
	    sheet_id='my-sheet-id',
	    list_range=[
	        'A2:D5',
	        'E5:G6',
	    ],
	)
	```

---

#### MinIO

MinIO is an object storage service with an API compatible with the Amazon S3 cloud storage service. MinIO can be used as a **data lake** to store both unstructured data (photos, videos, log files, backups, and container images) and structured data.

***Use case:*** when you need to store raw data in, or get raw data from, the data lake. Note that the stored data extension must be ```.parquet```.

***Notes on how to determine the*** ```file_path``` ***parameter when using this module:***

![minIO file path](https://lh3.googleusercontent.com/drive-viewer/AK7aPaCoN6qQ0K5BuEHT_7c0CznHRpJu2LpxyqTpIY9_lNVOk7f_eB9kAVx_wl6iiOB9ia9vbiSJ6WtmvRXX6FDb8g7VU8Sy=s1600)

> For example, if the directory of the data file in MinIO is as above, then the ```file_path``` is ```"/scraping/amazon_vendor/avc_bulk_buy_request/2023/9/24/batch_1695525619"``` (after removing the bucket name, data storage mode, and data file extension).
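As a quick illustration, the ```file_path``` can be derived from a full object key with plain string handling. The key below and the ```<bucket>/<mode>/...``` layout are assumptions based on the note above, not values read from MinIO:

```python
# Assumed object key layout: <bucket_name>/<mode>/<file_path>.parquet
object_key = (
    "sc-bucket/stag/scraping/amazon_vendor/"
    "avc_bulk_buy_request/2023/9/24/batch_1695525619.parquet"
)

# Split off the bucket name and storage mode, keep the rest
bucket_name, mode, remainder = object_key.split("/", 2)

# Drop the file extension and restore the leading slash (Python 3.9+)
file_path = "/" + remainder.removesuffix(".parquet")

print(file_path)
# /scraping/amazon_vendor/avc_bulk_buy_request/2023/9/24/batch_1695525619
```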


***Functional:*** 

Firstly, import the MinIO utils class. This class requires one parameter:
- ```account_name```: the client account name to connect to MinIO storage. Only ```'sop'``` is available now for ```account_name```. ***(str)***

	```python
	from sop_deutils.datalake.y4a_minio import MinioUtils

	minio_utils = MinioUtils(
	    account_name='sop',
	)
	```

To check whether data exists in a storage directory, use the ```data_exist``` method. It has three parameters:
- ```mode``` (required): the data storage mode; the value must be either ```'prod'``` or ```'stag'```. ***(str)***
- ```file_path``` (required): the data directory to check. ***(str)***
- ```bucket_name``` (optional): the name of the bucket to check. The default value is ```'sc-bucket'```. ***(str)***

	The method will return ```True``` if data exists, otherwise ```False```.

	```python
	minio_utils.data_exist(
	    mode='stag',
	    file_path='your-data-path',
	    bucket_name='sc-bucket',
	)
	```
	Output:
	```bash
	True
	```

To get the distinct values of a specified column of data in a data directory, use the ```get_data_value_exist``` method. It has four parameters:
- ```mode``` (required): the data storage mode; the value must be either ```'prod'``` or ```'stag'```. ***(str)***
- ```file_path``` (required): the data directory to get distinct values from. ***(str)***
- ```column_key``` (required): the column name to get distinct values of. ***(str)***
- ```bucket_name``` (optional): the name of the bucket to get distinct values from. The default value is ```'sc-bucket'```. ***(str)***

	The method will return a list of distinct values.

	```python
	minio_utils.get_data_value_exist(
	    mode='stag',
	    file_path='your-data-path',
	    column_key='your-chosen-column',
	    bucket_name='sc-bucket',
	)
	```
	Output:
	```bash
	['value_1', 'value_2']
	```

To load data from a dataframe to storage, use the ```load_data``` method. It has four parameters:
- ```data``` (required): dataframe containing the data to load. ***(pd.DataFrame)***
- ```mode``` (required): the data storage mode; the value must be either ```'prod'``` or ```'stag'```. ***(str)***
- ```file_path``` (required): the directory to load the data to. ***(str)***
- ```bucket_name``` (optional): the name of the bucket to load the data to. The default value is ```'sc-bucket'```. ***(str)***

	```python
	minio_utils.load_data(
	    data=df,
	    mode='stag',
	    file_path='your-data-path',
	    bucket_name='sc-bucket',
	)
	```

To get data from a single file in a storage directory, use the ```get_data``` method. It has three parameters:
- ```mode``` (required): the data storage mode; the value must be either ```'prod'``` or ```'stag'```. ***(str)***
- ```file_path``` (required): the data directory to get data from. ***(str)***
- ```bucket_name``` (optional): the name of the bucket to get data from. The default value is ```'sc-bucket'```. ***(str)***

	The method will return a dataframe containing the data.

	```python
	df = minio_utils.get_data(
	    mode='stag',
	    file_path='your-data-path',
	    bucket_name='sc-bucket',
	)

	print(df)
	```
	Output:
	```bash
	| Column1 Header | Column2 Header | Column3 Header |
	| ---------------| ---------------| ---------------|
	| Row1 Value1    | Row1 Value2    | Row1 Value3    |
	| Row2 Value1    | Row2 Value2    | Row2 Value3    |
	| Row3 Value1    | Row3 Value2    | Row3 Value3    |
	```

To get data from multiple files across storage directories, use the ```get_data_wildcard``` method. It has three parameters:
- ```mode``` (required): the data storage mode; the value must be either ```'prod'``` or ```'stag'```. ***(str)***
- ```file_path``` (required): the parent data directory to get the data from. ***(str)***
- ```bucket_name``` (optional): the name of the bucket to get data from. The default value is ```'sc-bucket'```. ***(str)***

	The method will return a dataframe containing the data.

	```python
	df = minio_utils.get_data_wildcard(
	    mode='stag',
	    file_path='your-parent-data-path',
	    bucket_name='sc-bucket',
	)

	print(df)
	```
	Output:
	```bash
	| Column1 Header | Column2 Header | Column3 Header |
	| ---------------| ---------------| ---------------|
	| Row1 Value1    | Row1 Value2    | Row1 Value3    |
	| Row2 Value1    | Row2 Value2    | Row2 Value3    |
	| Row3 Value1    | Row3 Value2    | Row3 Value3    |
	```

---

#### PostgreSQL

***Use case:*** when interacting with the Postgres database.

***Functional:***

Firstly, import the PostgreSQL utils class. This class requires three parameters:
- ```account_name```: the client account name to connect to PostgreSQL. The value can be a DA member name such as ```'linhvk'```, ```'giangpt'```, ```'hieulm'```, ```'huongtmt'```, ```'pi'```, ```'toannh'```, ```'trieuna'```, ```'hoaidtm'```, ```'duikha'```, ```'longgh'```, ```'huypv'```, ```'huytln'```, ```'quandm'```, ```'nguyentnt'```, ```'clong'```. ***(str)***
- ```db_host```: the database host to connect to. Available values are ```'raw_master'```, ```'raw_repl'```, ```'serving_master'```, ```'serving_repl'```. ***(str)***
- ```db```: the database to connect to. The default value is ```'serving'```. ***(str)***

	```python
	from sop_deutils.sql.y4a_postgresql import PostgreSQLUtils

	pg_utils = PostgreSQLUtils(
	    account_name='clong',
	    db_host='serving_master',
	    db='serving',
	)
	```

To create a new PostgreSQL connection pool, use the ```create_pool_conn``` method. It has one parameter:
- ```pool_size``` (optional): number of connections in the pool. The default value is ```1```, which means there is only one connection in the pool. ***(int)***

	The method will return a connection pool containing connections to the database.

	```python
	pool = pg_utils.create_pool_conn(
	    pool_size=1,
	)
	```

To close and remove the PostgreSQL connection pool after use, use the ```close_pool_conn``` method. It has one parameter:
- ```db_pool_conn``` (required): the connection pool created by the ```create_pool_conn``` method. ***(callable)***

	```python
	pg_utils.close_pool_conn(
	    db_pool_conn=pool,
	)
	```

To get a SQL query from a SQL file, use the ```read_sql_file``` method. It has one parameter:
- ```sql_file_path``` (required): the path of the SQL file. ***(str)***

	The method will return the SQL query as a string.

	```python
	sql = pg_utils.read_sql_file(
	    sql_file_path='your-path/select_all.sql',
	)

	print(sql)
	```
	Output:
	```bash
	"SELECT * FROM your_schema.your_table"
	```	

To insert data to a PostgreSQL table, use the ```insert_data``` method. It has six parameters:
- ```data``` (required): a dataframe containing the data to insert. ***(pd.DataFrame)***
- ```schema``` (required): the schema containing the table to insert. ***(str)***
- ```table``` (required): the table name to insert. ***(str)***
- ```ignore_errors``` (optional): whether to ignore errors when inserting data. The default value is ```False```. ***(bool)***
- ```commit_every``` (optional): number of rows to commit each time. The default value is ```1000```. ***(int)***
- ```db_pool_conn``` (optional): the connection pool to connect to the database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used. ***(callable)***

	```python
	pg_utils.insert_data(
	    data=your_df,
	    schema='your-schema',
	    table='your-table',
	    ignore_errors=True,
	    commit_every=1000,
	    db_pool_conn=pool,
	)
	```
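Conceptually, ```commit_every``` batches the dataframe into fixed-size row chunks that are committed one at a time. The sketch below is a simplified illustration of that batching idea, not the library's actual implementation:

```python
import pandas as pd

def iter_chunks(df: pd.DataFrame, commit_every: int):
    # Yield successive row chunks of at most `commit_every` rows
    for start in range(0, len(df), commit_every):
        yield df.iloc[start:start + commit_every]

# 2500 rows committed in batches of 1000 -> chunks of 1000, 1000, 500
df = pd.DataFrame({'a': range(2500)})
chunk_sizes = [len(chunk) for chunk in iter_chunks(df, 1000)]
print(chunk_sizes)
# [1000, 1000, 500]
```

Larger values of ```commit_every``` mean fewer round trips but more rows lost to a rollback if a batch fails.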

To insert large data to a PostgreSQL table, use the ```bulk_insert_data``` method. It has five parameters:
- ```data``` (required): a dataframe containing the data to insert. ***(pd.DataFrame)***
- ```schema``` (required): the schema containing the table to insert. ***(str)***
- ```table``` (required): the table name to insert. ***(str)***
- ```commit_every``` (optional): number of rows to commit each time. The default value is ```1000```. ***(int)***
- ```db_pool_conn``` (optional): the connection pool to connect to the database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used. ***(callable)***

	```python
	pg_utils.bulk_insert_data(
	    data=your_df,
	    schema='your-schema',
	    table='your-table',
	    commit_every=1000,
	    db_pool_conn=pool,
	)
	```

To upsert data to a PostgreSQL table, use the ```upsert_data``` method. It has five parameters:
- ```data``` (required): a dataframe containing the data to upsert. Note that duplicated rows in the dataframe will be dropped. ***(pd.DataFrame)***
- ```schema``` (required): the schema containing the table to upsert. ***(str)***
- ```table``` (required): the table name to upsert. ***(str)***
- ```commit_every``` (optional): number of rows to commit each time. The default value is ```1000```. ***(int)***
- ```db_pool_conn``` (optional): the connection pool to connect to the database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used. ***(callable)***

	```python
	pg_utils.upsert_data(
	    data=your_df,
	    schema='your-schema',
	    table='your-table',
	    commit_every=1000,
	    db_pool_conn=pool,
	)
	```

To upsert large data to a PostgreSQL table, use the ```bulk_upsert_data``` method. It has five parameters:
- ```data``` (required): a dataframe containing the data to upsert. Note that duplicated rows in the dataframe will be dropped. ***(pd.DataFrame)***
- ```schema``` (required): the schema containing the table to upsert. ***(str)***
- ```table``` (required): the table name to upsert. ***(str)***
- ```commit_every``` (optional): number of rows to commit each time. The default value is ```1000```. ***(int)***
- ```db_pool_conn``` (optional): the connection pool to connect to the database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used. ***(callable)***

	```python
	pg_utils.bulk_upsert_data(
	    data=your_df,
	    schema='your-schema',
	    table='your-table',
	    commit_every=1000,
	    db_pool_conn=pool,
	)
	```

To update new data of specific columns in a table based on primary keys, use the ```update_table``` method. It has seven parameters:
- ```data``` (required): a dataframe containing the data to update, including the primary keys and the columns to update. ***(pd.DataFrame)***
- ```schema``` (required): the schema containing the table to update. ***(str)***
- ```table``` (required): the table to update. ***(str)***
- ```columns``` (required): list of column names to update. ***(list)***
- ```primary_keys``` (required): list of primary keys of the table. ***(list)***
- ```commit_every``` (optional): number of rows to commit each time. The default value is ```1000```. ***(int)***
- ```db_pool_conn``` (optional): the connection pool to connect to the database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used. ***(callable)***

	```python
	pg_utils.update_table(
	    data=your_df,
	    schema='your-schema',
	    table='your-table',
	    columns=['col1', 'col2'],
	    primary_keys=['pk1', 'pk2', 'pk3'],
	    commit_every=1000,
	    db_pool_conn=pool,
	)
	```

To get data from the PostgreSQL database given a SQL query, use the ```get_data``` method. It has two parameters:
- ```sql``` (required): the SQL query to get data. ***(str)***
- ```db_pool_conn``` (optional): the connection pool to connect to the database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used. ***(callable)***

	The method will return a dataframe containing the data extracted by the given SQL query.

	```python
	df = pg_utils.get_data(
	    sql='your-query',
	    db_pool_conn=pool,
	)

	print(df)
	```
	Output:
	```bash
	| Column1 Header | Column2 Header | Column3 Header |
	| ---------------| ---------------| ---------------|
	| Row1 Value1    | Row1 Value2    | Row1 Value3    |
	| Row2 Value1    | Row2 Value2    | Row2 Value3    |
	| Row3 Value1    | Row3 Value2    | Row3 Value3    |
	```


To get the distinct values of a specified column in a PostgreSQL table, use the ```select_distinct``` method. It has four parameters:
- ```col``` (required): the column name to get distinct data from. ***(str)***
- ```schema``` (required): the schema containing the table to get data from. ***(str)***
- ```table``` (required): the table to get data from. ***(str)***
- ```db_pool_conn``` (optional): the connection pool to connect to the database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used. ***(callable)***

	The method will return a list of distinct values.

	```python
	distinct_values = pg_utils.select_distinct(
	    col='chosen-column',
	    schema='your-schema',
	    table='your-table',
	    db_pool_conn=pool,
	)

	print(distinct_values)
	```
	Output:
	```bash
	['val1', 'val2', 'val3']
	```

To get the list of column names of a specific PostgreSQL table, use the ```show_columns``` method. It has three parameters:
- ```schema``` (required): the schema containing the table. ***(str)***
- ```table``` (required): the table to get columns from. ***(str)***
- ```db_pool_conn``` (optional): the connection pool to connect to the database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used. ***(callable)***

	The method will return a list of column names of the table.

	```python
	col_names = pg_utils.show_columns(
	    schema='your-schema',
	    table='your-table',
	    db_pool_conn=pool,
	)

	print(col_names)
	```
	Output:
	```bash
	['col1', 'col2', 'col3']
	```

To execute a given SQL query, use the ```execute``` method. It has three parameters:
- ```sql``` (required): the SQL query to execute. ***(str)***
- ```fetch_output``` (optional): whether to fetch the results of the query. The default value is ```False```. ***(bool)***
- ```db_pool_conn``` (optional): the connection pool to connect to the database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used. ***(callable)***

	The method will return a list of query output rows if ```fetch_output``` is ```True```, otherwise ```None```.
	
	```python
	sql = """
	    UPDATE sales_order_avc_di o
	    SET po_asin_amazon_status = s.po_asin_amazon_status
	    FROM (
	        SELECT DISTINCT
	            po_name,
	            asin,
	            CASE
	                WHEN o.status LIKE '%cancel%' AND a.status IS NULL THEN ''
	                WHEN o.status LIKE '%cancel%' THEN CONCAT(a.status, ' ', cancel_date)
	                ELSE o.status
	            END AS po_asin_amazon_status
	        FROM sales_order_avc_order_status o
	        LEFT JOIN sales_order_avc_order_asin_status a USING (updated_at, po_name)
	        WHERE updated_at > NOW() - INTERVAL '1 day'
	    ) s
	    WHERE
	        o.po_name = s.po_name
	        AND o.asin = s.asin
	"""

	pg_utils.execute(
	    sql=sql,
	    fetch_output=False,
	    db_pool_conn=pool,
	)
	```

To create a new column for a specific PostgreSQL table, use the ```add_column``` method. It has six parameters:
- ```schema``` (required): the schema containing the table. ***(str)***
- ```table``` (required): the table to create the column in. ***(str)***
- ```column_name``` (optional): name of the column to create; used when creating a single column. The default value is ```None```. ***(str)***
- ```dtype``` (optional): data type of the column to create; used when creating a single column. The default value is ```None```. ***(str)***
- ```muliple_columns``` (optional): dictionary with column names as keys and their data types as values; used when creating multiple columns. The default value is ```{}```. ***(dict)***
- ```db_pool_conn``` (optional): connection pool to connect to database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used.  ***(callable)***

	```python
	pg_utils.add_column(
	    schema='my-schema',
	    table='my-table',
	    muliple_columns={
	        'col1': 'int',
	        'col2': 'varchar(50)',
	    },
	    db_pool_conn=pool,
	)
	```

To create a new table in the PostgreSQL database, use the ```create_table``` method. It has seven parameters:
- ```schema``` (required): the schema containing the table to create. ***(str)***
- ```table``` (required): the table name to create. ***(str)***
- ```columns_with_dtype``` (required): dictionary with column names as keys and their data types as values. ***(dict)***
- ```columns_primary_key``` (optional): list of columns to set as primary keys. The default value is ```[]```. ***(list)***
- ```columns_not_null``` (optional): list of columns to set NOT NULL constraints on. The default value is ```[]```. ***(list)***
- ```columns_with_default``` (optional): dictionary with column names as keys and their default values as values. The default value is ```{}```. ***(dict)***
- ```db_pool_conn``` (optional): connection pool to connect to database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used.  ***(callable)***

	```python
	pg_utils.create_table(
	    schema='my-schema',
	    table='my-new-table',
	    columns_with_dtype={
	        'col1': 'int',
	        'col2': 'varchar(50)',
	        'col3': 'varchar(10)',
	    },
	    columns_primary_key=[
	        'col1',
	    ],
	    columns_not_null=[
	        'col2',
	    ],
	    columns_with_default={
	        'col3': 'USA',
	    },
	    db_pool_conn=pool,
	)
	```

To grant table privileges to users in PostgreSQL, use the ```grant_table``` method. It has five parameters:
- ```schema``` (required): the schema containing the table to grant. ***(str)***
- ```table``` (required): the table name to grant. ***(str)***
- ```list_users``` (required): list of users to grant. To grant to all members of the DA team, provide ```['da']```. ***(list)***
- ```privileges``` (optional): list of privileges to grant. The default value is ```['SELECT']```. The accepted values are ```'SELECT'```, ```'INSERT'```, ```'UPDATE'```, ```'DELETE'```, ```'TRUNCATE'```, ```'REFERENCES'```, ```'TRIGGER'```. ***(list)***
- ```all_privileges``` (optional): whether to grant all privileges. The default value is ```False```. ***(bool)***

	```python
	pg_utils.grant_table(
	    schema='my-schema',
	    table='my-new-table',
	    list_users=[
	        'linhvk',
	        'trieuna',
	    ],
	    privileges=[
	        'SELECT',
	        'INSERT',
	        'UPDATE',
	    ],
	)
	```

To remove all the data of a PostgreSQL table, use the ```truncate_table``` method. It has four parameters:
- ```schema``` (required): the schema containing the table to truncate. ***(str)***
- ```table``` (required): the table name to truncate. ***(str)***
- ```reset_identity``` (optional): whether to reset the identity of the table. The default value is ```False```. ***(bool)***
- ```db_pool_conn``` (optional): connection pool to connect to database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used.  ***(callable)***

	```python
	pg_utils.truncate_table(
	    schema='my-schema',
	    table='my-table',
	    db_pool_conn=pool,
	)
	```

To check if a PostgreSQL table exists in the database, use the ```table_exists``` method. It has three parameters:
- ```schema``` (required): the schema containing the table to check. ***(str)***
- ```table``` (required): the table name to check. ***(str)***
- ```db_pool_conn``` (optional): the connection pool to connect to the database. The default value is ```None```. If the value is ```None```, a new connection will be created and automatically closed after being used. ***(callable)***

	The method will return ```True``` if the table exists and ```False``` if not.

	```python
	pg_utils.table_exists(
	    schema='my-schema',
	    table='my-exists-table',
	    db_pool_conn=pool,
	)
	```
	Output:
	```bash
	True
	```

***Best practices:*** Remember the **Trade-Off**

Pre-define a connection and reuse it for multiple tasks instead of creating a new connection for each task.
- ***Should:***

	```python
	pool = pg_utils.create_pool_conn()          # Create a new connection

	pg_utils.create_table(
	    schema='my-schema',
	    table='my-new-table',
	    columns_with_dtype={
	        'col1': 'int',
	        'col2': 'varchar(50)',
	        'col3': 'varchar(10)',
	    },
	    columns_primary_key=[
	        'col1',
	    ],
	    columns_not_null=[
	        'col2',
	    ],
	    columns_with_default={
	        'col3': 'USA',
	    },
	    db_pool_conn=pool,
	)           # Task 1

	pg_utils.insert_data(
	    data=your_df,
	    schema='my-schema',
	    table='my-new-table',
	    commit_every=1000,
	    db_pool_conn=pool,
	)           # Task 2

	pg_utils.truncate_table(
	    schema='my-schema',
	    table='my-new-table',
	    db_pool_conn=pool,
	)           # Task 3

	pg_utils.close_pool_conn(pool)          # Close connection

	# The whole process uses only one connection
	```

- ***Shouldn't:***

	```python
	pg_utils.create_table(
	    schema='my-schema',
	    table='my-new-table',
	    columns_with_dtype={
	        'col1': 'int',
	        'col2': 'varchar(50)',
	        'col3': 'varchar(10)',
	    },
	    columns_primary_key=[
	        'col1',
	    ],
	    columns_not_null=[
	        'col2',
	    ],
	    columns_with_default={
	        'col3': 'USA',
	    },
	)           # This will create a new connection

	pg_utils.insert_data(
	    data=your_df,
	    schema='my-schema',
	    table='my-new-table',
	    commit_every=1000,
	)           # This will create a new connection

	pg_utils.truncate_table(
	    schema='my-schema',
	    table='my-new-table',
	)           # This will create a new connection

	# The whole process uses three connections
	```

Keep the SQL query in a separate file and read it in, instead of embedding the query in the code file.
- ***Should:***

	```python
	sql = pg_utils.read_sql_file(
	    sql_file_path='your-path/your-query.sql',
	)

	pg_utils.execute(
	    sql=sql,
	    db_pool_conn=pool,
	)
	```

- ***Shouldn't:***

	```python
	sql = """
	    UPDATE sales_order_avc_di o
	    SET po_asin_amazon_status = s.po_asin_amazon_status
	    FROM (
	        SELECT DISTINCT
	            po_name,
	            asin,
	            CASE
	                WHEN o.status LIKE '%cancel%' AND a.status IS NULL THEN ''
	                WHEN o.status LIKE '%cancel%' THEN CONCAT(a.status, ' ', cancel_date)
	                ELSE o.status
	            END AS po_asin_amazon_status
	        FROM sales_order_avc_order_status o
	        LEFT JOIN sales_order_avc_order_asin_status a USING (updated_at, po_name)
	        WHERE updated_at > NOW() - INTERVAL '1 day'
	    ) s
	    WHERE
	        o.po_name = s.po_name
	        AND o.asin = s.asin
	"""

	pg_utils.execute(
	    sql=sql,
	    db_pool_conn=pool,
	)
	```
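Reading a ```.sql``` file is plain file I/O; a minimal sketch (the file name and query are hypothetical, and this is not the package's actual ```read_sql_file``` implementation):

```python
from pathlib import Path

# Hypothetical query file created just for this example
Path('my_query.sql').write_text(
    'SELECT col1, col2\n'
    'FROM my_schema.my_table\n'
    'WHERE col1 > 10\n'
)

# Reading it back gives the raw SQL string to pass to execute()
sql = Path('my_query.sql').read_text()
print(sql)
```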

---

#### Telegram

***Use case:*** when you need to send messages to Telegram via a bot.

***Functional:***

To send messages to Telegram, use the ```send_message``` method. It has three parameters:
- ```text``` (required): message to send. ***(str)***
- ```bot_token``` (optional): token of the bot that sends the message. The default value is ```None```. If the value is ```None```, the bot ```sleep at 9pm``` will be used to send messages. ***(str)***
- ```chat_id``` (optional): ID of the group chat where the message is sent. The default value is ```None```. If the value is ```None```, the group chat ```Airflow Status Alert``` will be used. ***(str)***

	```python
	from sop_deutils.y4a_telegram import send_message

	send_message(
	    text='Hello liuliukiki'
	)
	```
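For clarity, the fallback behavior of the optional parameters can be sketched as follows (the ```DEFAULT_*``` values are placeholder labels for this illustration, not the real token or chat ID):

```python
# Placeholder labels standing in for the documented defaults
DEFAULT_BOT_TOKEN = '<sleep-at-9pm-bot-token>'
DEFAULT_CHAT_ID = '<airflow-status-alert-chat-id>'


def resolve_target(bot_token=None, chat_id=None):
    """Return the (bot_token, chat_id) pair that would actually be used."""
    return (
        bot_token if bot_token is not None else DEFAULT_BOT_TOKEN,
        chat_id if chat_id is not None else DEFAULT_CHAT_ID,
    )


print(resolve_target())                    # both fall back to defaults
print(resolve_target(bot_token='my-bot'))  # custom bot, default chat
```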

---

#### All in one

***Use case:*** many platforms need to be accessed frequently; instead of importing lots of separate modules, users can access all of the above modules in one place.

***Functional:***

First, import the ```DAConfig``` class. This class requires one parameter:
- ```account_name```: the client account name used to access the platforms. Accepted values are the DA member names: ```'linhvk'```, ```'giangpt'```, ```'hieulm'```, ```'huongtmt'```, ```'pi'```, ```'toannh'```, ```'trieuna'```, ```'hoaidtm'```, ```'duikha'```, ```'longgh'```, ```'huypv'```, ```'huytln'```, ```'quandm'```, ```'nguyentnt'```, ```'clong'```. ***(str)***

	```python
	from sop_deutils.base.y4a_da_cfg import DAConfig

	da_cfg = DAConfig(
	    account_name='quandm',
	)
	```

This class exposes all of the above modules (PostgreSQL, MinIO, Google API, Airflow, Telegram) as attributes, so users don't need to import and configure a connection to each platform individually. Each attribute provides the methods listed above. The attributes are:
- ```minio_utils``` 
- ```pg_raw_r_utils``` (connected to PostgreSQL raw read - repl)
- ```pg_raw_w_utils``` (connected to PostgreSQL raw write - master)
- ```pg_serving_r_utils``` (connected to PostgreSQL serving read - repl)
- ```pg_serving_w_utils``` (connected to PostgreSQL serving write - master)
- ```sheet_utils```

	```python
	print(da_cfg.minio_utils)
	print(da_cfg.pg_raw_r_utils)
	print(da_cfg.pg_raw_w_utils)
	print(da_cfg.pg_serving_r_utils)
	print(da_cfg.pg_serving_w_utils)
	print(da_cfg.sheet_utils)
	```

	Output:
	```bash
	<sop_deutils.datalake.y4a_minio.MinioUtils object at 0x7fe6e704d6f0>
	<sop_deutils.sql.y4a_postgresql.PostgreSQLUtils object at 0x7fe6e704d9f0>
	<sop_deutils.sql.y4a_postgresql.PostgreSQLUtils object at 0x7fe6e704dae0>
	<sop_deutils.sql.y4a_postgresql.PostgreSQLUtils object at 0x7fe6e704e170>
	<sop_deutils.sql.y4a_postgresql.PostgreSQLUtils object at 0x7fe6e704e0b0>
	<sop_deutils.gg_api.y4a_sheet.GGSheetUtils object at 0x7fe72c65e1d0>
	```

---

### Workflow example

Without using the ```DAConfig``` class:

```python
# Beginning of workflow

from sop_deutils.gg_api.y4a_sheet import GGSheetUtils
from sop_deutils.datalake.y4a_minio import MinioUtils
from sop_deutils.sql.y4a_postgresql import PostgreSQLUtils
import pandas as pd


minio_utils = MinioUtils(
    account_name='sop',
)
sheet_utils = GGSheetUtils(
    account_name='da',
)
pg_serving_r_utils = PostgreSQLUtils(
    account_name='pi',
    db_host='serving_repl'
)
pg_serving_w_utils = PostgreSQLUtils(
    account_name='pi',
    db_host='serving_master'
)

# Create new spreadsheet id
spread_sheet_id = sheet_utils.create_spread_sheet(
    sheet_name='test_sheet_20231015',
    share_to=['longnc@yes4all.com'],
)

# Have a predefined dataframe
df = pd.DataFrame(
    [[1, 2, 3, 4]]*20,
    columns=['col1', 'col2', 'col3', 'col4']
)

# Insert dataframe to spreadsheet
sheet_utils.insert_data(
    data=df,
    sheet_id=spread_sheet_id,
    from_row_index=1,
    insert_column_names=True,
)

# Process data in the spreadsheet
sheet_utils.remove_data(
    sheet_id=spread_sheet_id,
    list_range=[
        'A3:D3',
        'A15:D15',
    ],
)

# Get data from spreadsheet
df_from_sheet = sheet_utils.get_data(
    sheet_id=spread_sheet_id,
    columns_first_row=True,
)

# Load data to MinIO storage
minio_utils.load_data(
    data=df_from_sheet,
    mode='stag',
    file_path='/test_flow/20131015',
    bucket_name='sc-bucket',
)

# Get data from MinIO
df_from_lake = minio_utils.get_data(
    mode='stag',
    file_path='/test_flow/20131015',
    bucket_name='sc-bucket',
)

# Process data
df_from_lake['total'] = df_from_lake['col1'] + df_from_lake['col2']\
    + df_from_lake['col3'] + df_from_lake['col4']
df_from_lake.dropna(inplace=True)
for col in df_from_lake.columns:
    df_from_lake[col] = df_from_lake[col].astype('int')

# Create new table and load processed data to database
pool_serving_w = pg_serving_w_utils.create_pool_conn()

pg_serving_w_utils.create_table(
    schema='sop_da_tmp',
    table='test_20131015',
    columns_with_dtype={
        'col1': 'int',
        'col2': 'int',
        'col3': 'int',
        'col4': 'int',
        'total': 'int',
    },
    db_pool_conn=pool_serving_w,
)

pg_serving_w_utils.insert_data(
    data=df_from_lake,
    schema='sop_da_tmp',
    table='test_20131015',
    db_pool_conn=pool_serving_w,
)

pg_serving_w_utils.close_pool_conn(pool_serving_w)

# Get data from database
pool_serving_r = pg_serving_r_utils.create_pool_conn()

df_from_db = pg_serving_r_utils.get_data(
    sql='SELECT * FROM sop_da_tmp.test_20131015'
)

pg_serving_r_utils.close_pool_conn(pool_serving_r)

# Update data from database to spreadsheet
sheet_utils.update_data(
    data=df_from_db,
    sheet_id=spread_sheet_id,
    range_from='A2',
    range_to='E22',
)

# End of workflow
```

Using the ```DAConfig``` class:

```python
# Beginning of the workflow

from sop_deutils.base.y4a_da_cfg import DAConfig
import pandas as pd

da_cfg = DAConfig(
    account_name='pi',
)

# Create new spreadsheet id
spread_sheet_id = da_cfg.sheet_utils.create_spread_sheet(
    sheet_name='test_sheet_20231015_new',
    share_to=['longnc@yes4all.com'],
)

# Have a predefined dataframe
df = pd.DataFrame(
    [[1, 2, 3, 4]]*20,
    columns=['col1', 'col2', 'col3', 'col4']
)

# Insert dataframe to spreadsheet
da_cfg.sheet_utils.insert_data(
    data=df,
    sheet_id=spread_sheet_id,
    from_row_index=1,
    insert_column_names=True,
)

# Process data in the spreadsheet
da_cfg.sheet_utils.remove_data(
    sheet_id=spread_sheet_id,
    list_range=[
        'A3:D3',
        'A15:D15',
    ],
)

# Get data from spreadsheet
df_from_sheet = da_cfg.sheet_utils.get_data(
    sheet_id=spread_sheet_id,
    columns_first_row=True,
)

# Load data to MinIO storage
da_cfg.minio_utils.load_data(
    data=df_from_sheet,
    mode='stag',
    file_path='/test_flow/20131015_new',
    bucket_name='sc-bucket',
)

# Get data from MinIO
df_from_lake = da_cfg.minio_utils.get_data(
    mode='stag',
    file_path='/test_flow/20131015_new',
    bucket_name='sc-bucket',
)

# Process data
df_from_lake['total'] = df_from_lake['col1'] + df_from_lake['col2']\
    + df_from_lake['col3'] + df_from_lake['col4']
df_from_lake.dropna(inplace=True)
for col in df_from_lake.columns:
    df_from_lake[col] = df_from_lake[col].astype('int')

# Create new table and load processed data to database
pool_serving_w = da_cfg.pg_serving_w_utils.create_pool_conn()

da_cfg.pg_serving_w_utils.create_table(
    schema='sop_da_tmp',
    table='test_20131015_new',
    columns_with_dtype={
        'col1': 'int',
        'col2': 'int',
        'col3': 'int',
        'col4': 'int',
        'total': 'int',
    },
    db_pool_conn=pool_serving_w,
)

da_cfg.pg_serving_w_utils.insert_data(
    data=df_from_lake,
    schema='sop_da_tmp',
    table='test_20131015_new',
    db_pool_conn=pool_serving_w,
)

da_cfg.pg_serving_w_utils.close_pool_conn(pool_serving_w)

# Get data from database
pool_serving_r = da_cfg.pg_serving_r_utils.create_pool_conn()

df_from_db = da_cfg.pg_serving_r_utils.get_data(
    sql='SELECT * FROM sop_da_tmp.test_20131015_new'
)

da_cfg.pg_serving_r_utils.close_pool_conn(pool_serving_r)

# Update data from database to spreadsheet
da_cfg.sheet_utils.update_data(
    data=df_from_db,
    sheet_id=spread_sheet_id,
    range_from='A2',
    range_to='E22',
)

# End of the workflow
```

---

> provided by ```liuliukiki```

> and special thanks to ```duiikha``` for contributing the API method to get and secure account credentials.

---
