Commit ebb97daf by rakipv

changed

parent 782e3867
from scripts.core.services.app import crud_operation_mongodb, crud_operations_postgresql
print("Welcome to Python Crud operations \nchoose your db\n"
"1.MongoDB\n"
"2.PostgreSQL\n")
print(
"1.postgres\n 2.x+exit\n")
choice = int(input("Enter your choice: "))
choice = int(input("Enter"))
if choice == 1:
crud_operation_mongodb()
if choice == 2:
crud_operations_postgresql()
exit()
from pymongo import MongoClient
client = MongoClient('localhost', 27017) # 27017 is the default port number for mongodb
......@@ -7,7 +7,7 @@ def connect_db():
password="root",
host="127.0.0.1",
port="5432",
database="student")
database="my_data")
return connection
except (Exception, psycopg2.Error) as error:
......
from scripts.core.config.mongodb_connection import client
import pymongo
def db_connect():
# create a connection
try:
connection = client
# create a database
db = connection['students']
except Exception as e:
print("Error ", e)
return db
import psycopg2 as py
from scripts.config.postgres_connect_config import host_name, port, dbname, user, password
from scripts.constants.proj_constants import ConstantsValues
from scripts.core.database.postgres.queries.post_queries import QueriesPost
try:
postgres_client = py.connect(host=host_name, port=port, user=user, password=password)
postgres_client.set_session(autocommit=True)
# creating cursor for the postgres
cur = postgres_client.cursor()
cur.execute(QueriesPost().databases())
# Fetch the result
result = cur.fetchall()
# check if database exist
if (dbname,) in result:
postgres_client = py.connect(host=host_name, port=port, dbname=dbname, user=user, password=password)
postgres_client.set_session(autocommit=True)
cur = postgres_client.cursor()
else:
# end current
postgres_client.commit()
cur.execute(f'CREATE DATABASE {dbname}')
except Exception as e:
print(f'{ConstantsValues().exception_msg},{e}')
def delete_many_document(collection):
try:
# delete more than one document
deleted_many = collection.delete_many({"pat_change": 33.3})
if deleted_many.acknowledged:
print("Documents deleted successfully")
else:
print("Documents deletion failed!")
except Exception as e:
print("Error ", e)
def delete_one_document(collection):
try:
# delete one document
deleted_one = collection.delete_one({"student name": "pv"})
if deleted_one.acknowledged:
print("One document deleted")
else:
print("Document deletion failed!")
except Exception as e:
print("Error ", e)
from scripts.core.handlers.mongo_db.insert_one_mongodb import data_from_user
def inserting_many(collection):
try:
# insert more than one document
print("------Inserting using insert_many------")
number_of_documents = int(input("Enter the number of documents to be inserted: "))
document_list = []
for i in range(0, number_of_documents):
print("----- Enter the details" + str(i + 1) + " -----\n")
document_list.append(data_from_user())
# inserting more than one document
inserted_many = collection.insert_many(document_list)
if inserted_many.acknowledged:
print(str(number_of_documents) + " documents inserted successfully!")
except Exception as e:
print("Error ", e)
def data_from_user():
full_name = input("Enter full name: ")
email= input("Enter email: ")
course_of_study = input("Enter course: ")
year = float(input("enter year: "))
gpa = float(input("Enter gap: "))
company_document = {
"full_name": full_name,
"company_name": email,
"course_of_study": course_of_study,
"year": year,
"gpa": gpa
}
return company_document
def inserting_one(collection):
try:
# insert one document
print("------Inserting using insert_one------")
# get data from user
data = data_from_user()
# inserting one document
inserted_one = collection.insert_one(data)
if inserted_one.acknowledged:
print("One document inserted successfully!")
else:
print("One document insertion failed!")
except Exception as e:
print("Error ", e)
def finding_one(collection):
try:
# find one document
print(list(collection.find({}, {"fullname": 1, "_id": 0, "email": "jdoe@x.edu.ng"})))
except Exception as e:
print("Error ", e)
def finding_many(collection):
try:
# find one document
print(collection.find_one())
except Exception as e:
print("Error ", e)
def update_one_document(collection):
try:
# update one
updated_one = collection.update_one({'email': "123@gmail,com."},
)
if updated_one.matched_count == 1:
print("One document updated!")
else:
print("Updating one document failed!")
except Exception as e:
print("Error ", e)
def update_many_document(collection):
try:
# update many
updated_many = collection.update_many({}, {"$set": {"pat_change": 3.6}})
if updated_many.matched_count != 0:
print("Documents updated Successfully!")
else:
print("Updating documents failed!")
except Exception as e:
print("Error ", e)
def delete_row(connection):
try:
cursor = connection.cursor()
print("------DELETE OPERATION------\n")
license_no = input("Enter the license no")
delete_data = f'DELETE FROM tbl_company WHERE license_no={license_no}'
student_id = input("Enter the id no")
delete_data = f'DELETE FROM my_data WHERE id={student_id}'
cursor.execute(delete_data)
connection.commit()
print("Row deleted successfully!")
......
def data_from_user():
company_license_no = input("Enter the company license no: ")
company_name = input("Enter the company name: ")
market_cap = input("Enter the market capital: ")
revenue_change = input("Enter the revenue change: ")
pat_change = input("Enter the pat change: ")
company_document = (company_license_no, company_name, market_cap, revenue_change, pat_change)
student_id = input("Enter student id no: ")
student_name = input("Enter the student name: ")
cgpa = input("Enter student cgpa: ")
percentage = input("Enter student percengtage: ")
company_document = (student_id, student_name, cgpa, percentage)
return company_document
......@@ -3,11 +3,11 @@ from scripts.core.handlers.get_data_postgresql import data_from_user
def insert(connection):
cursor = connection.cursor()
print("------INSERT OPERATION------\n")
insert_table = "INSERT INTO tbl_company(license_no, company_name, market_cap,revenue_change, pat_change) VALUES(" \
insert_table = "INSERT INTO my_data(student_id, student_name, cgpa,percentag) VALUES(" \
"%s,%s,%s,%s,%s)"
row_value = data_from_user()
cursor.execute(insert_table, row_value)
connection.commit()
count = cursor.rowcount
print(count, "Record inserted successfully into tbl_company table")
print(count, "done")
def read_data(connection):
try:
cursor = connection.cursor()
print("------SELECT OPERATION------\n")
view_data = "SELECT * FROM tbl_company"
view_data = "SELECT * FROM my_data"
cursor.execute(view_data)
company_records = cursor.fetchall()
print("Each row and it's columns values")
for row in company_records:
print("license_no = ", row[0])
print("company_name = ", row[1])
print("market_cap = ", row[2])
print("revenue_change = ", row[3])
print("pat_change = ", row[4], "\n")
print("student_id = ", row[0])
print("student_name = ", row[1])
print("cgpa = ", row[2])
print("percentage = ", row[3])
except Exception as e:
print("Error ", e)
def update_row(connection):
try:
cursor = connection.cursor()
print("------UPDATE OPERATION------\n")
license_no = input("Enter the license no")
print("Choose the field you want to update\n"
"1.company_name\n"
"2.market_cap\n"
"3.revenue_change\n"
"4.pat_change\n")
student_id = input("Enter the student id")
print("change n 1.student_name\n2.cgpa\n3.percentage\n")
field_choice = int(input("Enter your choice: "))
if field_choice == 1:
update_data = input("Enter the data: ")
update_query = f'update tbl_company SET company_name ={update_data} where license_no={license_no}'
new_stu_data = input("type name")
update_query = f'update my_data SET student_name ={new_stu_data} where student_id={student_id}'
else:
if field_choice == 2:
field_to_update = "market_cap"
new_data = "cgpa"
elif field_choice == 3:
field_to_update = "revenue_change"
new_data = "percentage"
elif field_choice == 4:
field_to_update = "pat_change"
update_data = input("Enter the data: ")
update_query = f'UPDATE tbl_company SET {field_to_update}={update_data} WHERE license_no={license_no}'
new_data = "pat_change"
new_stu_data = input("Enter the data: ")
update_query = f'UPDATE my_data SET {new_data}={new_stu_data} WHERE student_id={student_id}'
cursor.execute(update_query)
connection.commit()
print("Row updated successfully!")
......
from scripts.core.config.postgres_connection import connect_db
from scripts.core.database.mongodb import db_connect
from scripts.core.handlers.mongo_db.delete_many_mongodb import delete_many_document
from scripts.core.handlers.mongo_db.delete_one_mongodb import delete_one_document
from scripts.core.handlers.mongo_db.read_one_mongodb import finding_one, finding_many
from scripts.core.handlers.mongo_db.insert_one_mongodb import inserting_one
from scripts.core.handlers.mongo_db.insert_many_mongodb import inserting_many
from scripts.core.handlers.mongo_db.update_one_mongodb import update_one_document
from scripts.core.handlers.mongo_db.updatw_many_mongodb import update_many_document
from scripts.core.handlers.posgres_db.delete_postgresql import delete_row
from scripts.core.handlers.posgres_db.insert_postgresql import insert
from scripts.core.handlers.posgres_db.read_data_postgresql import read_data
from scripts.core.handlers.posgres_db.update_postgresql import update_row
def crud_operation_mongodb():
try:
choice = int(input(
" enter which operation to perform\n0.display many\n1.display one\2.insert one\n3.insert many\n4.delete one\n5.delete many\n6.update one \n7. update many"))
# create a db connection
db = db_connect()
# create a collection
collection = db['students_collection']
if choice == 0:
finding_many(collection)
if choice == 1:
finding_one(collection)
if choice == 2:
inserting_one(collection)
if choice == 3:
inserting_many(collection)
if choice == 4:
delete_one_document(collection)
if choice == 5:
delete_many_document(collection)
if choice == 6:
update_one_document(collection)
if choice == 7:
update_many_document(collection)
except Exception as e:
print("Error-", e)
finally:
print("end")
def crud_operations_postgresql():
try:
postgresql_connection = connect_db()
cursor = postgresql_connection.cursor()
# checking if the table exists
cursor.execute(f"SELECT * FROM information_schema.tables WHERE table_name = 'tbl_company'")
if cursor.fetchone():
print("Table Already Exists!")
else:
postgres_create_query = """CREATE TABLE tbl_company (license_no INT,
company_name CHAR(50) NOT NULL,
market_cap INT NOT NULL,
revenue_change INT NOT NULL,
pat_change INT NOT NULL,
PRIMARY KEY( license_no ))"""
cursor.execute(postgres_create_query)
postgresql_connection.commit()
print("Table Created!")
# inserting records in the table
insert(postgresql_connection)
# read or selecting the data in the table
read_data(postgresql_connection)
# updating data in the table based on the license number user entered
update_row(postgresql_connection)
# deleting data in the table based on the license number user entered
delete_row(postgresql_connection)
db_postgres_class = PostgresStart()
create_result = db_postgres_class.post_create()
if not create_result:
print("Cannot create table")
return False
while True:
try:
user_query = input("Enter 1 insert/ 2 update/ 3 delete/ 4view/ quit : ")
if user_query == '1':
insert()
elif user_query == '2':
update_row()
elif user_query == '3':
delete_row()
elif user_query == '4':
read_data()
else:
db_postgres_class.post_quit()
break
except Exception as e:
print("Exception occurred: ", e)
else:
choice = input("Do you want to continue (yes/no): ")
if choice == 'yes':
continue
else:
return True
except Exception as e:
print("Error-", e)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment