Metadata-Version: 2.1
Name: rscylladb
Version: 0.0.5
Summary: Bulk records add into Cassandra or ScyllaDB.
Home-page: UNKNOWN
Author: Reeya Patel and Meet Rathod
Author-email: 
License: UNKNOWN
Platform: UNKNOWN
Description-Content-Type: text/markdown


<p align="center">
  <a href="https://skillicons.dev">
    <img src="https://skillicons.dev/icons?i=git,cassandra,python" />
  </a>
</p>

## Bulk Installer ScyllaDB/Cassandra ![my badge](https://badgen.net/badge/version/0.0.1/red?icon=git)

Insert tens of millions of records in bulk (20 million rows in about 66 minutes on the test machine described under Result).

**Special thanks 🤝 to @[Reeya Patel](https://github.com/ReeyaPatel06)**

### Installation
```shell
pip install rscylladb
```
### Usage
```python
from rscylla.cql import Cql

hostslist = ["127.0.0.1"]           # list of contact points
port = 9042                         # CQL native transport port
table_name = "keyspace.table_name"  # required, must be in keyspace.table_name form
username = "cassandra"
password = "cassandra"
obj = Cql(hostslist, port, table_name, username, password)

# chuncks: rows per chunk (default 10,000); workers: threads (default 4)
obj.insert("data.csv", chuncks=10000, workers=4)  # file name or file path
```
#### Example:
```python
from rscylla.cql import Cql
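# "my_keyspace.my_table" is a placeholder for your own keyspace and table
obj = Cql(["localhost"], 9042, "my_keyspace.my_table", "cassandra", "cassandra")
obj.insert("data.csv")  # chuncks and workers are optional
# way 2: set both explicitly (the keyword is spelled chuncks)
obj.insert("data.csv", chuncks=1000, workers=2)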
```
### Result
- Tested on a machine with 4 GB RAM and an Intel Core i3 processor.

| Rows | Duration |
|:-----|:---------|
|10,000|0.03 s|
|300,000|2 min|
|20,000,000|66 min|

**Note: actual speed depends on your hardware.**
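
To put the durations in perspective, the sketch below converts each row of the table into an average insert rate. It only does arithmetic on the figures reported above; `rows_per_second` is a hypothetical helper, not part of the package.

```python
# Average insert rate derived from the benchmark table above.
results = {
    10_000: 0.03,         # rows: seconds
    300_000: 2 * 60,      # 2 min
    20_000_000: 66 * 60,  # 66 min
}


def rows_per_second(rows: int, seconds: float) -> float:
    """Average throughput for one benchmark run."""
    return rows / seconds


for rows, seconds in results.items():
    print(f"{rows:>12,} rows: {rows_per_second(rows, seconds):>12,.0f} rows/s")
```

The two larger runs work out to roughly 2,500 and 5,050 rows per second.
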
## Workflow
1. Import packages
``` python
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider # authenticate user
import pandas as pd
import numpy as np
import threading
import time
```

2. The **Cql** class constructor accepts:
    - hosts: list of strings (contact points)
    - port: integer
    - table_name: string in `keyspace.table_name` form
    - user: string (username)
    - password: string
```python
class Cql:
    def __init__(self, hosts: list, port: int, table_name: str, user: str, password: str) -> None:
        ...  # body shown in step 3
```

3. The constructor (`__init__`) initializes the cluster and the authenticator used to connect to the ScyllaDB server.
```python
self.start_time = time.time()  # used to report the total elapsed time on completion
# the table name must be given as keyspace.table_name, otherwise raise an exception
if len(table_name.split(".")) != 2:
    raise Exception("Table name must be in the form keyspace.table_name")
# table_name.split(".") returns [keyspace, table_name]
self.keyspace_name, self.table_name = table_name.split(".")
# authenticator with username & password
auth = PlainTextAuthProvider(username=user, password=password)
# cluster with contact points, port and authenticator
self.cluster = Cluster(contact_points=hosts, port=port, auth_provider=auth)
# connect and use the keyspace as the session's default
self.session = self.cluster.connect(self.keyspace_name)
```
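
The `keyspace.table_name` check can be isolated into a small, testable helper. The sketch below is hypothetical (`split_table_name` is not part of rscylla); it raises `ValueError` instead of a bare `Exception` and additionally rejects empty parts such as `"ks."`.

```python
from typing import Tuple


def split_table_name(table_name: str) -> Tuple[str, str]:
    """Split 'keyspace.table_name' into (keyspace, table).

    Hypothetical helper mirroring the constructor's validation.
    """
    parts = table_name.split(".")
    # exactly one dot, and neither side may be empty
    if len(parts) != 2 or not all(parts):
        raise ValueError("Table name must be in the form keyspace.table_name")
    keyspace, table = parts
    return keyspace, table


print(split_table_name("school.student"))  # ('school', 'student')
```
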
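4. The *insert* method accepts a file name (or path) and two optional arguments:
    - `chuncks`: number of rows per chunk (default 10,000)
    - `workers`: number of threads inserting in parallel (default 4)
```python
# chuncks default = 10000 (the parameter is spelled "chuncks")
# workers default = 4
def insert(self, file_name: str, chuncks=10000, workers=4) -> None:
    ...  # body described in the following steps
```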

5. Fetch all column names and data types of the target table, to avoid data type conflicts.
    - The columns and their types are read from the `system_schema.columns` table.
```python
# bind keyspace and table as query parameters instead of formatting them into the string
table_schema = self.session.execute(
    "SELECT column_name, type FROM system_schema.columns "
    "WHERE keyspace_name = %s AND table_name = %s",
    (self.keyspace_name, self.table_name),
)
```

6. A dictionary that maps CQL data types to Python types:
    - the CQL type name is the key and the Python type is the value.
```python
self.cql_to_python_type = {
    'NULL': None,
    'boolean': bool,
    'float': float,
    'double': float,
    'int': int,
    'smallint': int,
    'tinyint': int,
    'counter': int,
    'varint': int,
    'bigint': int,
    'decimal': float,
    'ascii': str,
    'varchar': str,
    'text': str,
    'blob': str,
    'date': str,
    'timestamp': str,
    'time': str,
    'list': list,
    'set': set,
    'map': dict,
    'timeuuid': None,
    'uuid': None,
}
```
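
One caveat: `system_schema.columns` reports collection and frozen types with their parameters (for example `list<text>`, `frozen<set<int>>` or `map<text, int>`), so a direct lookup on `row.type` raises `KeyError` for those columns. A minimal sketch of a lookup that strips the parameters first; `base_cql_type` and `python_type_for` are hypothetical helpers and the mapping is a trimmed excerpt.

```python
from typing import Optional

# Trimmed excerpt of the mapping above (hypothetical, for this sketch only).
CQL_TO_PYTHON_TYPE = {
    "int": int, "bigint": int, "text": str, "varchar": str,
    "double": float, "boolean": bool,
    "list": list, "set": set, "map": dict,
}


def base_cql_type(cql_type: str) -> str:
    """Return the outer type name, unwrapping frozen<...>.

    'list<text>' -> 'list', 'frozen<set<int>>' -> 'set', 'int' -> 'int'
    """
    cql_type = cql_type.strip()
    while cql_type.startswith("frozen<") and cql_type.endswith(">"):
        cql_type = cql_type[len("frozen<"):-1].strip()
    return cql_type.split("<", 1)[0].strip()


def python_type_for(cql_type: str) -> Optional[type]:
    """Look up the Python type, or None if the CQL type is unmapped."""
    return CQL_TO_PYTHON_TYPE.get(base_cql_type(cql_type))
```

Returning `None` for unmapped types (such as user-defined types) avoids the `KeyError`, at the cost of not distinguishing them from entries that map to `None` on purpose, like `uuid`.
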

7. Build the **self.cql_types** dictionary (column name → Python type) for the **pandas DataFrame**, and collect every column name of the target table (read from `system_schema` in step 5) into **self.cql_columns**.

```python
self.cql_types = {}
self.cql_columns = []
# build the dictionary for the DataFrame: column_name -> python_datatype
for row in table_schema:  # iterate over the extracted column names & data types
    # each row holds column_name & type; map the CQL type to a Python type
    self.cql_types[row.column_name] = self.cql_to_python_type[row.type]
    # keep the column name for the INSERT statement
    self.cql_columns.append(row.column_name)
```
Example:
```
# if the student table contains two columns:
1. id : bigint
2. name : text

# they are converted to
cql_types = {
    "id": int,
    "name": str
}
```
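
Step 7 can be reproduced without a cluster. The DataStax Python driver's default row factory returns named tuples, so the sketch below fakes `table_schema` with a `namedtuple` carrying the `column_name` and `type` fields queried in step 5; the rows and the `build_column_types` name are hypothetical.

```python
from collections import namedtuple

# Stand-in for the driver's named-tuple rows from system_schema.columns.
Row = namedtuple("Row", ["column_name", "type"])
table_schema = [Row("id", "bigint"), Row("name", "text")]

cql_to_python_type = {"bigint": int, "text": str}  # excerpt of step 6


def build_column_types(rows, type_map):
    """Return (cql_types, cql_columns), mirroring the loop in step 7."""
    cql_types, cql_columns = {}, []
    for row in rows:
        cql_types[row.column_name] = type_map[row.type]
        cql_columns.append(row.column_name)
    return cql_types, cql_columns


cql_types, cql_columns = build_column_types(table_schema, cql_to_python_type)
print(cql_types)    # {'id': <class 'int'>, 'name': <class 'str'>}
print(cql_columns)  # ['id', 'name']
```
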
8. Build comma-separated strings for the INSERT statement
``` python
param_keys = ",".join(self.cql_columns) 
param_values = ",".join([ '%('+k+')s' for k in self.cql_columns])
self.insert__statement = "INSERT INTO "+self.table_name+" ("+param_keys+") VALUES ("+param_values+")"
```
Example:
```
if cql_columns = ['id','name']
then
param_keys = 'id,name'
# placeholders for binding values from a dictionary (JSON-like object)
param_values = '%(id)s,%(name)s'
# session.execute / execute_async bind each row dictionary to these placeholders
```
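
The statement construction from step 8 as a standalone function, using the `student` columns from the step 7 example; `build_insert_statement` is a hypothetical name. The `%(name)s` placeholders are the named-parameter style the DataStax Python driver accepts for non-prepared statements when the parameters are passed as a dict.

```python
def build_insert_statement(table_name: str, columns: list) -> str:
    """Build 'INSERT INTO t (a,b) VALUES (%(a)s,%(b)s)' for dict binding."""
    param_keys = ",".join(columns)
    param_values = ",".join("%(" + c + ")s" for c in columns)
    return "INSERT INTO " + table_name + " (" + param_keys + ") VALUES (" + param_values + ")"


statement = build_insert_statement("student", ["id", "name"])
print(statement)
# INSERT INTO student (id,name) VALUES (%(id)s,%(name)s)
```

For very large loads, a prepared statement (`session.prepare`) built from the same column list with `?` markers is a common alternative, since the server then parses the query only once.
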

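9. Read the file according to its extension.
    - `pd.read_csv` accepts a file path and a `chunksize`, and returns an iterator that yields the file as DataFrames of at most `chunksize` rows, so a huge file is never loaded at once.
    - Supported file types: .csv, .json (JSON Lines, one object per line), .xls, .xlsx
    - `pd.read_excel` has no `chunksize` parameter, so Excel files are loaded whole and then sliced into chunks.
``` python
# get the file extension
file_ext = file_name.split(".")[-1].lower()
if file_ext == "csv":  # read CSV lazily in chunks
    data_frames = pd.read_csv(file_name, chunksize=chuncks)
elif file_ext == "json":  # chunksize requires JSON Lines (lines=True)
    data_frames = pd.read_json(file_name, chunksize=chuncks, lines=True)
elif file_ext in ["xls", "xlsx"]:
    # read_excel does not support chunksize: load the sheet, then slice it
    sheet = pd.read_excel(file_name)
    data_frames = (sheet.iloc[i:i + chuncks] for i in range(0, len(sheet), chuncks))
else:
    raise Exception("%s is not supported!" % (file_name))
```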
Example:
```
For a CSV file, data_frames holds a pandas TextFileReader object; each next(data_frames) call returns a DataFrame of up to chuncks rows
```
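
The chunked readers hand back an iterator of chunks. For sources without a `chunksize` option, the same interface can be produced by slicing; the stdlib sketch below does this for any iterable of records with `itertools.islice`. `iter_chunks` is a hypothetical helper and works on plain lists rather than DataFrames.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def iter_chunks(records: Iterable, size: int) -> Iterator[List]:
    """Yield lists of at most `size` records, like pandas' chunksize readers."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(records)
    while True:
        chunk = list(islice(it, size))
        if not chunk:  # source exhausted
            return
        yield chunk


chunks = iter_chunks(range(7), 3)
print(next(chunks))  # [0, 1, 2]
print(list(chunks))  # [[3, 4, 5], [6]]
```

Like `TextFileReader`, the generator raises `StopIteration` from `next()` once the source is exhausted, which is the signal the insert loop relies on.
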
10. Create a list with one slot per worker thread
``` python
threads = [None] * workers
# e.g. workers = 4
# threads = [None, None, None, None]
```
11. Loop over `data_frames` in rounds of `workers` chunks:
 - `df = next(data_frames)` fetches the next DataFrame from the reader; every call returns a new chunk, and `StopIteration` signals that the file is exhausted.
 - The first `for` loop stores a new thread object in `threads[i]` for each chunk and starts it.
 - Each thread runs the **execute_insert** method with the chunk as its only argument (`args=(df,)`).
 - The second `for` loop waits for every thread started in the round to finish, including the last, partial round.
```python
done = False
while not done:
    started = 0  # number of threads launched in this round
    for i in range(workers):
        try:
            df = next(data_frames)  # fetch the next chunk
        except StopIteration:
            done = True  # no chunks left
            break
        threads[i] = threading.Thread(target=self.execute_insert, args=(df,))
        threads[i].start()
        started += 1
    # wait until every thread of this round has completed
    for i in range(started):
        threads[i].join()
```
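
The round-based loop waits for the slowest chunk of each round before starting the next round. A common alternative (a swapped-in technique, not what the library does) is `concurrent.futures.ThreadPoolExecutor` with a bounded number of chunks in flight, which keeps all workers busy without reading the whole file ahead. The sketch runs without a database: `fake_execute_insert` is a hypothetical stand-in for `execute_insert` that only counts rows.

```python
import threading
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

inserted = 0
lock = threading.Lock()


def fake_execute_insert(chunk):
    """Hypothetical stand-in for execute_insert: count the rows."""
    global inserted
    with lock:
        inserted += len(chunk)


def run_bounded(chunks, worker_fn, workers=4):
    """Run worker_fn on each chunk with at most `workers` chunks in flight."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        pending = set()
        for chunk in chunks:
            if len(pending) >= workers:
                # wait for a running chunk to finish before submitting this one
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                for f in done:
                    f.result()  # re-raise any exception from the worker
            pending.add(pool.submit(worker_fn, chunk))
        for f in pending:
            f.result()


chunks = ([0] * 1000 for _ in range(10))  # ten 1,000-row chunks
run_bounded(chunks, fake_execute_insert, workers=4)
print(inserted)  # 10000
```

Unlike a bare `threading.Thread`, `f.result()` surfaces exceptions raised inside a worker in the calling thread.
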

12. The **execute_insert** method
 - accepts a single pandas DataFrame (one chunk).
``` python
def execute_insert(self, df: pd.DataFrame) -> None:
    ...  # body shown in step 13
```

13. Inside the method:
 - Convert the column names to lower case (unquoted CQL identifiers are stored in lower case).
 - Handle null values:
   - **df.select_dtypes(include=np.number).columns** returns the names of the numeric (int, float) columns.
   - **df.select_dtypes(include=np.number).fillna(0)** fills nulls in those columns with 0.
   - **df.select_dtypes(exclude=np.number).fillna('')** fills nulls in the non-numeric columns with an empty string.
 - Iterate over the DataFrame rows and call **session.execute_async** with the insert statement and the row (a dict, i.e. a JSON-like object).
   - **session.execute_async** returns a future (`ResponseFuture`), so `result()` must be called to wait for the query to complete.
   - Each future is appended to the **futures** list.
 - A second loop calls `result()` on every future, which waits for all **session.execute_async** calls to complete and raises an exception if an insert failed.

**NOTE: pandas stores missing values as NaN, which the insert rejects for columns such as int or text, so nulls are replaced with 0 and empty strings. As a result, missing values are written as 0 or '' rather than as CQL null.**
```python
# lower-case the column names to match CQL
df.columns = df.columns.str.lower()
# fill nulls in numeric columns with 0
num_cols = df.select_dtypes(include=np.number).columns
df[num_cols] = df[num_cols].fillna(0)
# fill nulls in object and string columns with an empty string
other_cols = df.select_dtypes(exclude=np.number).columns
df[other_cols] = df[other_cols].fillna('')
# execute every insert asynchronously and keep the futures
futures = []
for row in df.to_dict(orient='records'):  # one dict per record
    futures.append(self.session.execute_async(self.insert__statement, row))
# wait for all async inserts to complete
for future in futures:
    future.result()  # blocks until this insert finishes; raises on failure
```
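
Filling with 0 and `''` makes missing values indistinguishable from real zeros and empty strings. If nulls should be stored as CQL null instead, the Python driver binds `None` as null, so one alternative (not what the library does) is to replace NaN with `None` in each record dict before calling `execute_async`. A stdlib sketch on plain dicts; `nan_to_none` is a hypothetical helper. `numpy.float64` is a subclass of Python `float`, so the `isinstance` check also covers values coming from a DataFrame.

```python
import math


def nan_to_none(record: dict) -> dict:
    """Replace float NaN values with None so the driver binds CQL null."""
    return {
        key: None if isinstance(value, float) and math.isnan(value) else value
        for key, value in record.items()
    }


row = {"id": 1, "name": float("nan"), "score": float("nan"), "age": 20}
print(nan_to_none(row))  # {'id': 1, 'name': None, 'score': None, 'age': 20}
```

Keep in mind that explicitly writing null creates a tombstone in Cassandra and ScyllaDB, so very sparse data has its own cost with this approach.
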

# Authors
- ### 🙋‍♀️ [Reeya Patel](https://github.com/ReeyaPatel06)
- ### 🙋‍♂️ [Meet Rathod](https://github.com/MeetRathod0)




