Commit 86dcae83 by ajil.k

updated

parent 3c76b831
MONGO_URI = mongodb://localhost:27017/
DATA_PATH = temp/data.csv
\ No newline at end of file
# importing libraries
import uvicorn
from scripts.constants.app_configuration import port_no
from scripts.constants.app_configuration import fastapi_port_no
if __name__ == "__main__":
try:
uvicorn.run("scripts.services.main:app", port=int(port_no), reload=True)
uvicorn.run("scripts.services.api_call:app", port=int(fastapi_port_no), reload=True)
except Exception as e:
print("Error-", e)
[MongoDB]
mongo_uri = mongodb://localhost:27017/
port = 8000
[DataPath]
csv_path = temp/data.csv
\ No newline at end of file
csv_path = temp/data.csv
[fastapi_connection]
fastapi_port_no = 8000
\ No newline at end of file
......@@ -5,6 +5,7 @@ config = ConfigParser()
config.read(f"conf/application.conf")
uri = config.get("MongoDB", "mongo_uri")
port_no = config.get("MongoDB", "port")
csv_path = config.get("DataPath", "csv_path")
fastapi_port_no = config.get("fastapi_connection", "fastapi_port_no")
......@@ -8,12 +8,12 @@ def db_connect():
try:
# Connect to MongoDB
connection = MongoClient(uri)
if connection:
print("Database Connected Successfully !")
# Create a database
db_name = 'db_person'
db = connection[db_name]
return db
collection_name = 'personal_details'
collection = db[collection_name]
print("Database Connected Successfully !")
return collection
except Exception as e:
print("Error ", e)
class APIEndpoints:
# FastAPI endpoints
root = "/"
insert_all_documents = "/insert-all-documents-into-db/"
get_details = "/get_details"
create_new_document = "/create-new-document"
update_document = "/update-document/{document_id}"
delete_document = "/delete-document/{document_id}"
import json
from mongoengine import connect
from scripts.constants.app_configuration import csv_path
from scripts.constants.db_connection import db_connect
from scripts.core.handlers.insert_data_into_db import insert_into_db
from scripts.core.handlers.read_file import extract_data
from scripts.database.mongodb.models import personal_details, PersonalDetails
connect(db='db_person', host='localhost', port=27017)
class operations_on_api:
def __init__(self):
self.collection = db_connect()
def insert_all_data(self):
try:
data = extract_data(csv_path)
insert_status = insert_into_db(self.collection, data)
if insert_status:
print("Data inserted from csv to database successfully!")
else:
print("Data insertion to db failed!")
except Exception as e:
print("Data insertion error-", e)
@staticmethod
def get_data_from_db():
try:
documents = personal_details.objects().to_json()
json_data = json.loads(documents)
print(json_data)
return json_data
except Exception as e:
print("Data Fetching error-", e)
def insert_one_document(self, details):
try:
person_details = PersonalDetails(id=details.id,
first_name=details.first_name,
last_name=details.last_name,
email=details.email,
phone_no=details.phone_no)
result = self.collection.insert_one(dict(person_details))
if result:
print("Data inserted to the document", dict(person_details))
return result
except Exception as e:
print("Data insertion error-", e)
def update_one_document(self, document_id, update_details):
try:
condition_data = {"id": document_id}
set_data = {"$set": {}}
if update_details.first_name is not None:
set_data["$set"]["first_name"] = update_details.first_name
if update_details.last_name is not None:
set_data["$set"]["last_name"] = update_details.last_name
if update_details.email is not None:
set_data["$set"]["email"] = update_details.email
if update_details.phone_no is not None:
set_data["$set"]["phone_no"] = update_details.phone_no
result = self.collection.update_one(condition_data, set_data)
return result
except Exception as e:
print("Data updation error-", e)
def delete_one_document(self, document_id):
try:
condition_data = {"id": document_id}
result = self.collection.delete_one(condition_data)
return result
except Exception as e:
print("Data deletion error-", e)
from scripts.constants.app_configuration import csv_path
from scripts.core.handlers.read_file import extract_data
def insert_into_db(collection, data):
status = collection.insert_many(data.to_dict('records'))
status = collection.insert_many(data)
if status:
return True
else:
return False
# importing libraries
from typing import Optional
from mongoengine import connect
from pydantic import BaseModel
from scripts.core.engine.model import personal_details
employees = {
1: {
"name": "Varun",
"gender": "Male",
"dob": "01-01-1999",
"mobile": 9876543210
}
}
class Employee(BaseModel):
name: str
gender: str
dob: str
mobile: int
class PersonalDetails(BaseModel):
_id: Optional[str] = None
id: int
first_name: str
last_name: str
email: str
phone_no: int
class UpdatePersonalDetails(BaseModel):
_id: Optional[str] = None
id: Optional[int] = None
first_name: Optional[str] = None
last_name: Optional[str] = None
email: Optional[str] = None
phone_no: Optional[int] = None
class UpdateEmployee(BaseModel):
name: Optional[str] = None
gender: Optional[str] = None
dob: Optional[str] = None
mobile: Optional[int] = None
connect(
db='db_person',
host='localhost',
port=27017
)
def start_operations(app_api):
@app_api.post("/add-employee/{employee_id}", tags=["add employee"])
def add_employee(employee_id: int, employee: Employee):
if employee_id in employees:
return {"Error": "Employee already Exists"}
employees[employee_id] = employee
return employees
@app_api.get("/view-employee/{employee_id}", tags=["view employee"])
def view_employee(employee_id: int):
if employee_id in employees:
return employees[employee_id]
else:
return {"Error": "Employee doesn't Exist"}
@app_api.put("/update-employee/{employee_id}", tags=["update employee"])
def update_employee(employee_id: int, employee: UpdateEmployee):
if employee_id not in employees:
return {"Error": "Employee not Exists!"}
if employee.name is not None:
employees[employee_id].name = employee.name
if employee.gender is not None:
employees[employee_id].gender = employee.gender
if employee.dob is not None:
employees[employee_id].dob = employee.dob
if employee.mobile is not None:
employees[employee_id].mobile = employee.mobile
return employees[employee_id]
@app_api.delete("/delete-employee/{employee_id}", tags=["delete employee"])
def delete_employee(employee_id: int):
if employee_id not in employees:
return {"Error": "Employee not Exists!"}
del employees[employee_id]
return {"Message": "Employee deleted successfully!"}
def view_documents(app_api):
@app_api.get("/get_details", tags=["read documents"])
def get_all_details():
documents = personal_details.objects().to_json()
print(documents)
return {"data": documents}
def insert_document(app_api, collection):
@app_api.post("/create-new-document", tags=["create document"])
async def create_new_document(details: PersonalDetails):
p_details = PersonalDetails(id=details.id,
first_name=details.first_name,
last_name=details.last_name,
email=details.email,
phone_no=details.phone_no)
result = collection.insert_one(dict(p_details))
return {"inserted document id": str(result.inserted_id)}
def update_documents(app_api, collection):
@app_api.put("/update-document{document_id}", tags=["update document"])
async def update_document(document_id: int, update_details: UpdatePersonalDetails):
condition_data = {"id": document_id}
set_data = {"$set": {}}
if update_details.first_name is not None:
set_data["$set"]["first_name"] = update_details.first_name
if update_details.last_name is not None:
set_data["$set"]["last_name"] = update_details.last_name
if update_details.email is not None:
set_data["$set"]["email"] = update_details.email
if update_details.phone_no is not None:
set_data["$set"]["phone_no"] = update_details.phone_no
updated = collection.update_one(condition_data, set_data)
return {"Message": "Document updated"}
def delete_document(app_api, collection):
@app_api.delete("/delete-document/{document_id}", tags=["delete document"])
async def delete_documents(document_id: int):
condition_data = {"id": document_id}
deleted_one = collection.delete_one(condition_data)
return {"Message": "Document deleted Successfully!"}
......@@ -4,6 +4,6 @@ import pandas as pd
def extract_data(path):
try:
data_from_csv = pd.read_csv(path)
return data_from_csv
return data_from_csv.to_dict('records')
except Exception as e:
print("Error-", e)
from typing import Optional
from mongoengine import Document, StringField, ObjectIdField, IntField
from pydantic import BaseModel
class PersonalDetails(BaseModel):
_id: Optional[str] = None
id: int
first_name: str
last_name: str
email: str
phone_no: int
class UpdatePersonalDetails(BaseModel):
_id: Optional[str] = None
id: Optional[int] = None
first_name: Optional[str] = None
last_name: Optional[str] = None
email: Optional[str] = None
phone_no: Optional[int] = None
class personal_details(Document):
......
# importing libraries
from fastapi import FastAPI
from scripts.core.handlers.api_functions import operations_on_api
from scripts.database.mongodb.models import PersonalDetails, UpdatePersonalDetails
from scripts.constants.endpoints import APIEndpoints
# Create FastAPI instance
app = FastAPI()
# Create Endpoint instance
api_endpoint = APIEndpoints()
# Root
@app.get(api_endpoint.root)
async def root():
return {"Message": "It's Working!"}
# Insert data from csv to database
@app.post(api_endpoint.insert_all_documents, tags=["insert_all_documents_into_db"])
async def insert_all_documents_into_db():
operations_on_api().insert_all_data()
return {"Message": "Data inserted from csv to database successfully!"}
# Display all documents
@app.get(api_endpoint.get_details, tags=["read documents"])
def get_all_details():
documents = operations_on_api().get_data_from_db()
return {"data": documents}
# Insert one document to db
@app.post(api_endpoint.create_new_document, tags=["create document"])
async def create_new_document(details: PersonalDetails):
result = operations_on_api().insert_one_document(details)
return {"inserted document id": str(result.inserted_id)}
# Update one Document
@app.put(api_endpoint.update_document, tags=["update document"])
async def update_document(document_id: int, update_details: UpdatePersonalDetails):
operations_on_api().update_one_document(document_id, update_details)
return {"Message": "Document updated"}
# Delete one Document
@app.delete(api_endpoint.delete_document, tags=["delete document"])
async def delete_documents(document_id: int):
operations_on_api().delete_one_document(document_id)
return {"Message": "Document deleted Successfully!"}
# importing libraries
from fastapi import FastAPI
from scripts.constants.app_configuration import csv_path
from scripts.constants.db_connection import db_connect
from scripts.core.handlers.insert_data_into_db import insert_into_db
from scripts.core.handlers.operations import start_operations, view_documents, insert_document, update_documents, \
delete_document
from scripts.core.handlers.read_file import extract_data
from scripts.utils.mongo_util import create_collection
app = FastAPI()
@app.get("/")
async def root():
return "It's Working!"
# Operations on api
start_operations(app)
# Insert data from csv to database
data = extract_data(csv_path)
db = db_connect()
collection = create_collection(db)
insert_status = insert_into_db(collection, data)
if insert_status:
print("Data inserted from csv to database successfully!")
# Display all documents
view_documents(app)
# Insert one document to db
insert_document(app, collection)
# Update one Document
update_documents(app, collection)
# Delete one Document
delete_document(app, collection)
# function for creating collection
def create_collection(db):
# Create Collection
collection_name = 'personal_details'
collection = db[collection_name]
return collection
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment