Commit b50337e9 by arjun.b

fastapi crud ops

follows the project structure
parents
# Default ignored files
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N801" />
</list>
</option>
</inspection_tool>
</profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9 (fastapi)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/fastapi.iml" filepath="$PROJECT_DIR$/.idea/fastapi.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
import uvicorn
from scripts.config.application_config import server_path, port_no
if __name__ == "__main__":
try:
uvicorn.run(server_path, port=int(port_no), reload=True)
except Exception as e:
print(str(e))
[server]
server_path=scripts.services.fastapi_services:app
port=8000
[mongo]
uri=mongodb://localhost:27017
port=27017
\ No newline at end of file
import configparser
try:
config = configparser.ConfigParser()
config.read('conf/application.conf')
# # path of csv file
# file_path = config.get("path", "base_path")
# # mongo connect
mongo_uri = config.get("mongo", "uri")
port_connect = config.get("mongo", "port")
# # server
server_path = config.get("server", "server_path")
port_no = config.get("server", "port")
except configparser.NoOptionError:
print("could not read configuration file")
from pymongo import MongoClient
from scripts.config.application_config import mongo_uri
from scripts.utils.mongo_utils import create_collection
def db_connect():
try:
conn = MongoClient(mongo_uri)
database = conn.employees
# print(conn.list_database_names())
dblist = conn.list_database_names()
if database in dblist:
print("database exist")
print("mongo connect successfully")
return database
except Exception as e:
print(str(e))
# create database
db = db_connect()
print("database created")
# create collection
collection = create_collection(db)
print("collection created")
class EndPoints:
upload="/upload"
send_data="/send_data"
update="/update_data/{doc_id}"
delete="/delete_data/{doc_id}"
from scripts.database.model import employee_insert
from scripts.services.fastapi_services import collection
class fastAPIHandler:
@staticmethod
def file_upload(file_content):
try:
collection.insert_many(file_content.to_dict("records"))
except Exception as e:
print(e)
@staticmethod
def insert_data(emp):
try:
new_employee = employee_insert(id=emp.id,
first_name=emp.first_name,
last_name=emp.last_name,
gender=emp.gender,
phone=emp.phone)
collection.insert_one(dict(new_employee))
except Exception as e:
print(e)
@staticmethod
def update_data(uid, details):
try:
update_obj = {}
if details.first_name is not None:
update_obj['first_name'] = details.first_name
if details.last_name is not None:
update_obj['last_name'] = details.last_name
if details.gender is not None:
update_obj['gender'] = details.gender
if details.phone is not None:
update_obj['phone'] = details.phone
# update the document in mongodb
collection.update_one({"id": uid}, {"$set": update_obj})
print("data updated")
except Exception as e:
print(e)
@staticmethod
def delete_data(uid):
try:
collection.delete_one({"id": uid})
except Exception as e:
print(e)
from typing import Optional
from mongoengine import Document, StringField, ObjectIdField, IntField
from pydantic import BaseModel
class Employee(Document):
_id: ObjectIdField()
id: IntField()
first_name: StringField()
last_name: StringField()
gender: StringField()
phone: StringField()
class employee_insert(BaseModel):
_id: str
id: int
first_name: str
last_name: str
gender: str
phone: str
class employee_update(BaseModel):
first_name: Optional['str'] = None
last_name: Optional['str'] = None
gender: Optional['str'] = None
phone: Optional['str'] = None
class employee_delete(BaseModel):
first_name: Optional['str'] = None
last_name: Optional['str'] = None
gender: Optional['str'] = None
phone: Optional['str'] = None
import logging
import os
# from logging import StreamHandler
from logging.handlers import RotatingFileHandler, SocketHandler
import yaml
# from scripts.constants.app_configuration import PathToStorage
# this method is to read the configuration from backup.conf
def read_configuration(file_name):
"""
:param file_name:
:return: all the configuration constants
"""
with open(file_name, 'r') as stream:
try:
return yaml.safe_load(stream)
except Exception as e:
print(f"Failed to load Configuration. Error: {e}")
config = read_configuration("scripts/logging/logger_conf.yml")
logging_config = config["logger"]
def get_logger():
"""
Creates a rotating log
"""
__logger__ = logging.getLogger('')
__logger__.setLevel(logging_config["level"].upper())
log_formatter = '%(asctime)s - %(levelname)-6s - [%(threadName)5s:%(funcName)5s():''' \
'%(lineno)s] - %(message)s'
time_format = "%Y-%m-%d %H:%M:%S"
file_path = PathToStorage.LOGS_MODULE_PATH
formatter = logging.Formatter(log_formatter, time_format)
for each_handler in logging_config["handlers"]:
if each_handler["type"] in ["RotatingFileHandler"]:
if not os.path.exists(file_path):
os.makedirs(file_path)
log_file = os.path.join(f"{file_path}{logging_config['name']}.log")
temp_handler = RotatingFileHandler(log_file,
maxBytes=each_handler["max_bytes"],
backupCount=each_handler["back_up_count"])
temp_handler.setFormatter(formatter)
elif each_handler["type"] in ["SocketHandler"]:
temp_handler = SocketHandler(each_handler["host"], each_handler["port"])
elif each_handler["type"] in ["StreamHandler"]:
temp_handler = StreamHandler()
temp_handler.setFormatter(formatter)
else:
temp_handler = None
__logger__.addHandler(temp_handler)
return __logger__
logger = get_logger()
import logging
from fastapi import FastAPI, UploadFile
from mongoengine import connect
from scripts.config.application_config import port_connect
from scripts.constants.db_connect import db_connect, collection
from scripts.constants.end_points import EndPoints
from scripts.core.handlers.fastapi_handler import fastAPIHandler
from scripts.database.model import employee_insert, employee_update
import pandas as pd
app = FastAPI()
connect(db="employees", host="localhost", port=int(port_connect))
@app.get("/")
async def root():
return {"data": "FastAPI CRUD Operations"}
# upload and insert into database
@app.post(EndPoints.upload, tags=["upload file"])
async def upload_file(file: UploadFile):
try:
file_content = pd.read_csv(file.file)
fastAPIHandler.file_upload(file_content)
return {"data": "CSV data uploaded successfully"}
except Exception as e:
logging.error(f'upload file failed:{e}')
print(e)
# insert data using post method
@app.post(EndPoints.send_data, tags=['send data'])
def send_data(emp: employee_insert):
try:
fastAPIHandler.insert_data(emp)
return {"message": "new data has been inserted"}
except Exception as e:
logging.error(f'inserting a document failed {e}')
# update data
@app.put(EndPoints.update, tags=["update database"])
def update_data(doc_id: int, details: employee_update):
try:
fastAPIHandler.update_data(doc_id, details)
return {"data": "data updated"}
except Exception as e:
logging.error(f'updating the data failed {e}')
# delete data
@app.delete(EndPoints.delete, tags=["delete data"])
def delete_data(doc_id: int):
try:
fastAPIHandler.delete_data(doc_id)
return {"data": "data deleted"}
except Exception as e:
logging.error(f'deleting a document failed {e}')
def create_collection(db):
try:
collection_name = "employee"
collection = db[collection_name]
return collection
except Exception as e:
print(str(e))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment