Commit 596f9ca6 by arun.uday

v3

parent 4530f940
# main application for database task
from scripts.services.access_functions import db_access_op
from scripts.core.engine.postgres_connect import cur, postgres_client
from scripts.services.access_functions import mongo_db_access_op, postgres_db_access_op
# mongodb
db_choice = input("Enter mongo or postgres")
if db_choice == 'mongo':
print("Mongo DB")
mongo_db_access_op()
else:
print("PostgresSQL")
result = postgres_db_access_op()
if not result:
print("Lost in atoms")
else:
print("Successful")
print("Mongo DB")
db_access_op()
# database servers
[connection]
mongodb = mongodb://localhost:27017
[postgres]
hostname = localhost
port = 5432
dbname = storemanager
user = postgres
password = admin
table_name = products
# postgres connect
import configparser
conf_postgres = configparser.ConfigParser()
conf_postgres.read("conf/db.conf")
# read values
host_name = conf_postgres.get("postgres", "hostname")
port = conf_postgres.get("postgres", "port")
dbname = conf_postgres.get("postgres", "dbname")
user = conf_postgres.get("postgres", "user")
password = conf_postgres.get("postgres", "password")
products = conf_postgres.get("postgres", "table_name")
# postgres db connection
import psycopg2 as py
from scripts.config.postgres_connect_config import host_name, port, dbname, user, password
from scripts.constants.file_constants import exception_msg
try:
postgres_client = py.connect(host=host_name, port=port, user=user, password=password)
postgres_client.set_session(autocommit=True)
# creating cursor for the postgres
cur = postgres_client.cursor()
cur.execute("SELECT datname FROM pg_database WHERE datistemplate = false;")
# Fetch the result
result = cur.fetchall()
# check if database exist
if (dbname,) in result:
postgres_client = py.connect(host=host_name, port=port, dbname=dbname, user=user, password=password)
postgres_client.set_session(autocommit=True)
cur = postgres_client.cursor()
else:
# end current
postgres_client.commit()
cur.execute(f'CREATE DATABASE {dbname}')
except Exception as e:
print(f'{exception_msg},{e}')
# deleting data
# getting user inputs
from scripts.config.postgres_connect_config import products
# user input
def delete_db_post():
field_name = input("Enter the field condition: ")
del_data = int(input("Enter the data: "))
return field_name, del_data
# deleting the record
def db_delete_post(cur, field_name, del_data):
return cur.execute(f"DELETE FROM {products} WHERE {field_name} = {del_data}")
# reading user inputs
from scripts.config.postgres_connect_config import products
from scripts.constants.file_constants import exception_msg
# user inputs
def read_inputs_post():
p_id = int(input("Enter the product id: "))
p_name = input("Enter the product name: ")
p_category = input("Enter the category: ")
retail_price = int(input("Enter the retail price: "))
discount_price = int(input("Enter the discounted price: "))
description = input("Enter the product description: ")
over_rating = input("Enter the overall rating")
return p_id, p_name, p_category, retail_price, discount_price, description, over_rating
# insert in to table
def insert_data_post(cur, p_name, p_category, retail_price, discount_price, description, over_rating):
try:
val_data = [p_name, p_category, retail_price, discount_price, description, over_rating]
cur.execute(f"INSERT INTO {products} (Product_Name, Product_Category, Retail_Price, Discounted_Price, "
f"Description, Overall_Rating) VALUES (%s, %s, %s, %s, %s, %s)", val_data)
except Exception as e:
print(f'{exception_msg},{e}')
return False
else:
return True
# updating the db
from scripts.config.postgres_connect_config import products
# user data
def input_update_post():
field_name = input("Enter the field condition: ")
up_data = int(input("Enter the data: "))
up_field_name = input("Enter the field to update: ")
new_data = input("Enter the new data: ")
return field_name, up_data, up_field_name, new_data
# updating the database
def db_update_post(cur, field_name, up_data, up_field_name, new_data):
return cur.execute(f"UPDATE {products} SET {up_field_name} = '{new_data}' WHERE {field_name} = '{up_data}'")
# view db data
# display conditions
import pandas as pd
from scripts.config.postgres_connect_config import products
# view the data
def view_fields_post(cur):
cur.execute(f"SELECT * FROM {products}")
cursor_data = cur.fetchall()
if cursor_data is not None:
print("Data collected....")
view_data = pd.DataFrame((view_data_record for view_data_record in cursor_data), columns=['Product Id',
'Product_Name',
'Product_Category',
'Retail_Price',
'Discounted_Price',
'Description',
'Overall_Rating'])
print(view_data)
else:
print("Data fetching failed")
# creating a table
from scripts.config.postgres_connect_config import products
from scripts.constants.file_constants import exception_msg
# create table if exist or not
def create_table(cur):
try:
cursor_db = cur
cursor_db.execute(f"SELECT * FROM information_schema.tables WHERE table_name = '{products}'")
if cursor_db.fetchone():
print("Table exist")
else:
# Create the table if it does not exist
cursor_db.execute(f"CREATE TABLE {products} "
f"(Product_Id SERIAL PRIMARY KEY, Product_Name VARCHAR(255),"
f"Product_Category VARCHAR(255),"
f"Retail_Price INT,"
f"Discounted_Price INT,"
f"Description VARCHAR(255),"
f"Overall_Rating INT)")
except Exception as e:
print(f'{exception_msg},{e}')
return False
else:
return cursor_db
# access class functions
from scripts.services.mongo_db_operations import MongoDbStart
from scripts.services.postgres_db_operations import PostgresStart
def db_access_op():
# mongo access
def mongo_db_access_op():
db_class = MongoDbStart()
db_created = db_class.mongo_create()
while True:
......@@ -28,3 +30,36 @@ def db_access_op():
else:
print("Thank you....")
break
# postgres access
def postgres_db_access_op():
db_postgres_class = PostgresStart()
create_result = db_postgres_class.post_create()
if not create_result:
print("Cannot create table")
return False
while True:
try:
user_query = input("Enter insert/ update/ delete/ view/ quit : ")
if user_query == 'insert':
db_postgres_class.post_insert()
elif user_query == 'update':
db_postgres_class.post_update()
elif user_query == 'delete':
db_postgres_class.post_delete()
elif user_query == 'view':
db_postgres_class.post_view()
else:
db_postgres_class.post_quit()
print("Exiting....")
break
except Exception as e:
print("Exception occurred: ", e)
else:
choice = input("Do you want to continue (yes/no): ")
if choice == 'yes':
continue
else:
print("Thank you....")
return True
......@@ -8,6 +8,7 @@ from scripts.core.handlers.view_data_db import view_fields, view_cond
class MongoDbStart:
# create mongo table
@staticmethod
def mongo_create():
try:
......@@ -19,6 +20,7 @@ class MongoDbStart:
store_manager = db['products']
return store_manager
# mongo data insert
@staticmethod
# getting input and insert into db
def mongo_insert(db):
......@@ -36,6 +38,7 @@ class MongoDbStart:
except Exception as e:
print("Insertion failed", e)
# mongo update table
@staticmethod
def mongo_update(db):
try:
......@@ -48,6 +51,7 @@ class MongoDbStart:
except Exception as e:
print(f'{exception_msg}{e}')
# mongo delete data
@staticmethod
def mongo_delete(db):
try:
......@@ -60,6 +64,7 @@ class MongoDbStart:
except Exception as e:
print(f'{exception_msg}{e}')
# mongo view data
@staticmethod
def mongo_view(db):
try:
......
# class for postgresSql
from scripts.constants.file_constants import exception_msg
from scripts.core.engine.postgres_connect import cur, postgres_client
from scripts.core.handlers.postgres_data_del import delete_db_post, db_delete_post
from scripts.core.handlers.postgres_data_insert import read_inputs_post, insert_data_post
from scripts.core.handlers.postgres_data_update import input_update_post, db_update_post
from scripts.core.handlers.postgres_data_view import view_fields_post
from scripts.core.handlers.postgres_table_create import create_table
class PostgresStart:
@staticmethod
def post_create():
try:
client_cursor = cur
check_result = create_table(client_cursor)
if not check_result:
return False
else:
return check_result
except Exception as e:
print(f'{exception_msg}{e}')
@staticmethod
def post_insert():
try:
num_prod = int(input("Enter the number of products: "))
while num_prod > 0:
p_id, p_name, p_category, retail_price, discount_price, description, over_rating = read_inputs_post()
insert_check = insert_data_post(cur, p_name,
p_category, retail_price, discount_price, description, over_rating)
if insert_check:
print("Data inserted....")
num_prod -= 1
else:
print("Something went wrong data cannot be inserted....")
except Exception as e:
print("Insertion failed", e)
@staticmethod
def post_update():
try:
field_name, up_data, up_field_name, new_data = input_update_post()
update_check = db_update_post(cur, field_name, up_data, up_field_name, new_data)
if update_check is None:
print("Data updated")
else:
print("Data not updated")
except Exception as e:
print(f'{exception_msg},{e}')
@staticmethod
def post_delete():
try:
field_name, del_data = delete_db_post()
del_check = db_delete_post(cur, field_name, del_data)
if del_check is None:
print("Data deleted")
else:
print("Data not deleted")
except Exception as e:
print(f'{exception_msg}{e}')
@staticmethod
def post_view():
try:
view_fields_post(cur)
except Exception as e:
print(f'{exception_msg}{e}')
@staticmethod
def post_quit():
# Close the cursor and connection
cur.close()
postgres_client.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment