Commit 31e69847 by ajil.k

updated

parent 150d4aa5
[fastapi_connection]
link_to_fastapi = scripts.services.main:app
link_to_fastapi = scripts.services.api_call:app
port_no = 8000
[MQTT_Connection]
broker = broker.emqx.io
broker = 192.168.0.220
port = 1883
time_out = 60
......
class ApiEndpoints:
root = "/"
no_of_doctors = "/no_of_doctors/{no_of_doctors_today}"
patient_details = "/get_and_send_patient_details/{patient_id}"
from scripts.constants.mqtt_connection import mqtt_client_connection
from scripts.constants.redis_connection import establish_redis_connection
from scripts.logging.logger import logger
# Initialize the Redis cache
redis_connection = establish_redis_connection()
......@@ -10,42 +11,47 @@ mqtt_client = mqtt_client_connection()
# Counter to keep track of the next doctor to assign
counter = 0
# No. of doctors
n = int(input("Enter the number of doctors: "))
def assign_doctor(patient_id):
try:
global counter
# Increment the counter and wrap around if it exceeds n
counter = (counter + 1) % n
doctor_topic = "doctor" + str(counter)
# Store the doctor's topic in the Redis cache for the patient
redis_connection.set(patient_id, doctor_topic)
print(patient_id, " assigned to ", doctor_topic)
except Exception as e:
print("Exception occurred due to \"", e, "\"")
def handle_new_patient(patient_id):
try:
# Check if the patient has visited before
all_patient_id = redis_connection.keys()
ids = [each.decode() for each in all_patient_id]
if patient_id in ids:
doctor_topic = redis_connection.get(patient_id)
print(patient_id, " consulted ", doctor_topic.decode(), " before.")
else:
# If the patient is new, assign a doctor using round-robin
assign_doctor(patient_id)
except Exception as e:
print("Exception occurred due to \"", e, "\"")
def send_patient_details(patient_id, patient_details):
try:
# Publish the patient information to the doctor's topic
mqtt_client.publish(patient_id, patient_details)
except Exception as e:
print("Exception occurred due to \"", e, "\"")
class Patient:
def __init__(self):
self.no_of_doctors = 0
@staticmethod
def assign_doctor(patient_id, no_of_doctors):
try:
global counter
# Increment the counter and wrap around if it exceeds n
counter = (counter + 1) % no_of_doctors
doctor_topic = "doctor" + str(counter)
# Store the doctor's topic in the Redis cache for the patient
redis_connection.set(patient_id, doctor_topic)
print(patient_id, " assigned to ", doctor_topic)
except Exception as e:
logger.exception(e)
print("Exception occurred due to \"", e, "\"")
def handle_new_patient(self, patient_id, no_of_doctors):
try:
# Check if the patient has visited before
all_patient_id = redis_connection.keys()
ids = [each.decode() for each in all_patient_id]
if patient_id in ids:
doctor_topic = redis_connection.get(patient_id)
print(patient_id, " consulted ", doctor_topic.decode(), " before.")
else:
# If the patient is new, assign a doctor using round-robin
self.assign_doctor(patient_id, no_of_doctors)
except Exception as e:
logger.exception(e)
print("Exception occurred handling patients due to \"", e, "\"")
def send_patient_details(self, patient_id, patient_details):
try:
# Publish the patient information to the doctor's topic
topic = "hospital/"+str(patient_id)
mqtt_client.publish(topic, patient_details)
except Exception as e:
from scripts.logging.logger import logger
logger.exception(e)
print("Exception occurred due to \"", e, "\"")
import json
from scripts.constants.handle_patients import handle_new_patient, send_patient_details
def start_service(app_api):
@app_api.post("/consultation/", tags="Consultation")
def consultation():
i = 1
while i:
print("------Select one Option------\n"
"1.Consult Doctor\n"
"2.Exit")
choice = int(input("Enter your choice:"))
if choice == 1:
print("------Enter patient information------")
patient_id = input("Enter patient id: ")
description = input("Enter the description: ")
age = int(input("Enter the age: "))
name = input("Enter the name: ")
patient_details = {
"description": description,
"age": age,
"name": name
}
# Send patient details for Publishing
send_patient_details(patient_id, json.dumps(patient_details))
# Handle patients using Round Robin Method
handle_new_patient(patient_id)
elif choice == 2:
i = 0
import logging
import os
from logging.handlers import RotatingFileHandler
def get_logger():
"""
Creates a rotating log
"""
__logger__ = logging.getLogger('')
__logger__.setLevel(logging.INFO)
log_formatter = '%(asctime)s - %(levelname)-6s - %(message)s'
time_format = "%Y-%m-%d %H:%M:%S"
file_path = "temp/"
file_name = "logs"
formatter = logging.Formatter(log_formatter, time_format)
if not os.path.exists(file_path):
os.makedirs(file_path)
log_file = os.path.join(f"{file_path}{file_name}.log")
temp_handler = RotatingFileHandler(log_file, maxBytes=1)
temp_handler.setFormatter(formatter)
__logger__.addHandler(temp_handler)
return __logger__
logger = get_logger()
\ No newline at end of file
import json
from fastapi import FastAPI
from scripts.constants.endpoints import ApiEndpoints
from scripts.core.handlers.handle_patients import Patient
from scripts.database.models import PatientDetails
# Create FastAPI instance
app = FastAPI()
# No of doctors
no_of_doctor = 0
# Setting Root
@app.get(ApiEndpoints.root)
async def root():
return {"INFO": "It's Working!"}
@app.post(ApiEndpoints().no_of_doctors, tags=["No of doctors"])
async def post_no_of_doctors(no_of_doctors_today: int):
global no_of_doctor
no_of_doctor = no_of_doctors_today
return {"Info": str(no_of_doctor)+" doctors are available."}
@app.post(ApiEndpoints().patient_details, tags=["get and send patient details"])
def consultation(patient_id: str, details: PatientDetails):
global no_of_doctor
# Send patient details for Publishing
patient_details = {
"description": details.description,
"age": details.age,
"name": details.name
}
Patient().send_patient_details(patient_id, json.dumps(patient_details))
# Handle patients using Round Robin Method
Patient().handle_new_patient(patient_id, no_of_doctor)
return {"Message": "Details Send"}
from fastapi import FastAPI
from scripts.core.handlers.patient_information import start_service
# Create FastAPI instance
app = FastAPI()
# Setting Root
@app.get("/")
async def root():
return {"INFO": "It's Working!"}
start_service(app)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment