Commit 288b7333 by arun.uday

first commit 09-02-2023

parents
MONGO_URI = mongodb://localhost:27017
\ No newline at end of file
# Default ignored files
/shelf/
/workspace.xml
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.9 (MQTT_FastApi)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
</module>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredIdentifiers">
<list>
<option value="scripts.config.application_config" />
</list>
</option>
</inspection_tool>
</profile>
</component>
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9 (MQTT_FastApi)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/MQTT_FastApi.iml" filepath="$PROJECT_DIR$/.idea/MQTT_FastApi.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
import uvicorn
from fastapi import FastAPI
from scripts.config.applications_conf_read import uvicorn_receiver_port
from scripts.logging.loggers import logger
from scripts.services.app_service_run import receiver_api
app = FastAPI()
app.include_router(receiver_api)
# starting the application
if __name__ == "__main__":
try:
print("MQTT receiver task")
uvicorn.run(app, port=int(uvicorn_receiver_port))
except Exception as e:
logger.error("Interruption occurred: ", e)
[path]
base_path = scripts/
sub_path = external/
[file]
csv_file = user_data.csv
file_mode = wb
[uvicorn]
uvicorn_port = 8000
receiver_port = 8001
# database servers
[connection]
db_name = user_data
collection = user
[api_routes]
route_index = /
route_insert_csv = /insert-csv/
route_insert_one = /insert-one/
route_view_all = /view-all/
route_view_one = /view-one/
route_update = /update/
route_delete = /delete/
route_receiver = /receiver/
[log]
formatter_time = asctime
formatter_level = levelname
[mqtt]
topic = topic
mqtt_host = 192.168.0.220
port = 1883
requests = 60
[db]
db_table = details
[encode]
encode = utf-8
\ No newline at end of file
import uvicorn
from fastapi import FastAPI
from scripts.config.applications_conf_read import uvicorn_port
from scripts.logging.loggers import logger
from scripts.services.app_service_run import mongo_api
app = FastAPI()
app.include_router(mongo_api)
# starting the application
if __name__ == "__main__":
try:
print("MQTT sender task")
uvicorn.run(app, port=int(uvicorn_port))
except Exception as e:
logger.error("Interruption occurred: ", e)
# reading from the applications.conf
import configparser
import os
import sys
from dotenv import load_dotenv
load_dotenv()
try:
config = configparser.RawConfigParser()
config.read("conf/applications.conf")
# path
base_path = config.get("path", 'base_path')
sub_path = config.get("path", "sub_path")
full_path = base_path + sub_path
# file
file_csv = config.get("file", "csv_file")
csv_path = full_path + file_csv
file_mode = config.get("file", "file_mode")
# mongo server
uri = os.getenv("MONGO_URI")
# log
formatter_time = config.get("log", "formatter_time")
formatter_level = config.get("log", "formatter_level")
# uvicorn
uvicorn_port = config.get("uvicorn", "uvicorn_port")
uvicorn_receiver_port = config.get("uvicorn", "receiver_port")
# mongodb conf
db_name = config.get("connection", "db_name")
collection = config.get("connection", "collection")
# mqtt
topic_name = config.get("mqtt", "topic")
mqtt_host = config.get("mqtt", "mqtt_host")
mqtt_port = config.get("mqtt", "port")
request_no = config.get("mqtt", "requests")
# db name
db_table = config.get("db", "db_table")
# encode
utf_encode = config.get("encode", "encode")
except Exception as e:
print(e)
sys.stdout.flush()
sys.exit()
# reading conf file
import configparser
import sys
try:
config = configparser.RawConfigParser()
config.read("conf/applications.conf")
# api paths
route_index = config.get("api_routes", "route_index")
route_insert_csv = config.get("api_routes", "route_insert_csv")
route_insert_one = config.get("api_routes", "route_insert_one")
route_view_all = config.get("api_routes", "route_view_all")
route_view_one = config.get("api_routes", "route_view_one")
route_update = config.get("api_routes", "route_update")
route_delete = config.get("api_routes", "route_delete")
route_receiver = config.get("api_routes", "route_receiver")
except Exception as e:
print(e)
sys.stdout.flush()
sys.exit()
from scripts.config.applications_conf_read import topic_name
from scripts.logging.loggers import logger
def connect_mqtt(client_name, userdata, flags, rc):
try:
print(userdata, " ", flags)
print("Connected with result code " + str(rc))
# subscribing to the topic
client_name.subscribe(topic_name)
except Exception as e:
logger.error("Exception occurred while connecting to mqtt: ", e)
from scripts.logging.loggers import logger
def display_fetched_data(cursor):
try:
# Fetch all the rows as a list of tuples
rows = cursor.fetchall()
# Iterate through the rows and print the data
for row in rows:
print(row)
except Exception as e:
logger.error("Exception occurred while fetching the data from table: ", e)
import json
from scripts.config.applications_conf_read import utf_encode
from scripts.core.handlers.receiver_mqtt.operations_call_functions import operations_call_
from scripts.logging.loggers import logger
def message_mqtt(client_name, userdata, msg):
try:
print(client_name, " ", userdata)
# decoding the msg to string
payload_decoded = msg.payload.decode(utf_encode)
# payload to json
json_data = json.loads(payload_decoded)
# if type is list then data will get enter else message will be printed
operations_call_(json_data)
except Exception as e:
logger.error("Exception occurred while decoding the message: ", e)
from paho.mqtt.client import Client
from scripts.core.handlers.receiver_mqtt.connection_mqtt_receiver import connect_mqtt
from scripts.core.handlers.receiver_mqtt.message_mqtt_receive import message_mqtt
from scripts.logging.loggers import logger
def mqtt_receiver_call(mqtt_host, port, request_no):
try:
# client object creation
client = Client(userdata="Receiver")
# connecting to broker
client.connect(mqtt_host, int(port), int(request_no))
client.on_connect = connect_mqtt
# listening to the topic
client.on_message = message_mqtt
client.loop_forever()
except Exception as e:
logger.error("Exception occurred while calling the mqtt: ", e)
from scripts.core.handlers.receiver_mqtt.display_data import display_fetched_data
from scripts.core.handlers.receiver_mqtt.operations_db import SqliteOperations
from scripts.logging.loggers import logger
# database operations
def operations_call_(json_data):
try:
# creating a copy of the data
data_list = json_data
# calling the sqlite operations class
obj_db = SqliteOperations()
# dictionary for multiple functions of the sqlite operations class
operations = {
"insert": obj_db.insert_db,
"insert_one": obj_db.insert_one_db,
"fetch": obj_db.fetch_data,
"fetch_one": obj_db.fetch_one,
"update": obj_db.update_db,
"delete": obj_db.delete_db
}
# selecting the operation based on the message from the sender
operation_type = data_list[0]
operation = operations.get(operation_type)
# determining the operation
if operation:
if operation_type in ["insert", "insert_one", "delete", "fetch_one"]:
data_fetched = operation(data_list[1])
elif operation_type == "update":
data_fetched = obj_db.update_db(data_list[1], data_list[2])
else:
data_fetched = operation()
else:
data_fetched = None
# fetching the data from the db
display_fetched_data(data_fetched) if data_fetched else print("")
except Exception as e:
logger.error("Exception occurred while converting to data frame: ", e)
import sqlite3
import pandas as pd
from scripts.config.applications_conf_read import full_path, db_name, db_table
from scripts.core.handlers.receiver_mqtt.updating_conditions import conditions_string, updates_string
from scripts.logging.loggers import logger
from scripts.utils.mongo.db_queries import DbQueries
class SqliteOperations:
def __init__(self):
# connecting to the database
self.conn = sqlite3.connect(full_path + db_name + ".db")
def create_db(self):
try:
# creating the table
self.conn.execute(DbQueries().create_table)
self.conn.close()
except Exception as e:
logger.error("Exception occurred while creating table: ", e)
def insert_db(self, data):
try:
# inserting data to table
frames = pd.DataFrame(data, index=None)
frames.to_sql(db_table, self.conn, if_exists='replace')
self.conn.commit()
except Exception as e:
logger.error("Exception occurred while inserting data: ", e)
else:
print("Insertion successful")
def insert_one_db(self, data):
try:
# inserting data to table one by one
frames = pd.DataFrame(data, index=None)
frames.to_sql(db_table, self.conn, if_exists='append')
self.conn.commit()
except Exception as e:
logger.error("Exception occurred while inserting one: ", e)
else:
print("Insertion of one successful")
def fetch_data(self):
try:
# Create a cursor
cursor = self.conn.cursor()
# Execute a SELECT statement to fetch data from the table
cursor.execute(DbQueries().select_query())
except Exception as e:
logger.error("Exception occurred while fetching data: ", e)
else:
print("Fetching successful")
return cursor
def fetch_one(self, conditions):
try:
# Create a cursor
cursor = self.conn.cursor()
str_conditions = conditions_string(conditions)
# Execute a SELECT statement to fetch data from the table
cursor.execute(DbQueries().select_one(str_conditions))
except Exception as e:
logger.error("Exception occurred while fetching data one: ", e)
else:
print("Fetching one successful")
return cursor
def update_db(self, conditions, updates):
try:
# changing the conditions and updates for the query
str_conditions = conditions_string(conditions)
str_updates = updates_string(updates)
# running the query
self.conn.execute(DbQueries().update_query(str_conditions, str_updates))
self.conn.commit()
except Exception as e:
logger.error("Exception occurred while updating data: ", e)
else:
print("Updating successful")
def delete_db(self, conditions):
try:
# changing the conditions for the query
str_conditions = conditions_string(conditions)
# running the query
self.conn.execute(DbQueries().delete_query(str_conditions))
self.conn.commit()
except Exception as e:
logger.error("Exception occurred while deleting data: ", e)
else:
print("Deletion successful")
# update the conditions for updating and deletion
from scripts.logging.loggers import logger
# updating the conditions for the query
def conditions_string(conditions):
try:
str_ = " and ".join(f"{key} = '{value}'" for key, value in conditions.items())
return str_
except Exception as e:
logger.error("Exception occurred while making conditions to string: ", e)
# updating the updates for the query
def updates_string(updates):
try:
str_ = ",".join(f"{key} = '{value}'" for key, value in updates.items())
return str_
except Exception as e:
logger.error("Exception occurred while making conditions to string: ", e)
# conditions to dict
from scripts.logging.loggers import logger
def remove_none(condition):
# removing the none and string values from the user inputs
try:
dict_ = {key: value for key, value in condition if value is not None and value != 'string'}
return dict_
except Exception as e:
logger.error("Exception occurred while removing none and string: ", e)
# updating conditions and update conditions
from scripts.logging.loggers import logger
def remove_none_updates(condition, update):
# removing the none and string values from the user inputs
# conditions to dictionary
try:
dict_ = {key: value for key, value in condition if value is not None and value != 'string'}
# update conditions to dictionary
update_ = {key: value for key, value in update if value is not None and value != 'string'}
return dict_, update_
except Exception as e:
logger.error("Exception occurred while removing none and string in update: ", e)
# convert csv to json
import pandas as pd
from scripts.logging.loggers import logger
# convert csv to json
def file_convert_json(csv_path):
try:
# converting the data to csv
csv_file = pd.read_csv(csv_path)
dict_data = csv_file.to_dict(orient="records")
except Exception as e:
logger.error("Some exception occurred while reading csv: ", e)
else:
return dict_data
# creating the message for the receiver
from scripts.logging.loggers import logger
def create_message_one(msg, dict_data):
try:
# creating message for insertion
list_data = [msg, dict_data]
return list_data
except Exception as e:
logger.error("Exception occurred while creating insertion message: ", e)
def create_message_two(msg, condition, update):
try:
# create message for the update
list_ = [msg, condition, update]
return list_
except Exception as e:
logger.error("Exception occurred while creating message for update: ", e)
def create_message_three(msg, condition):
try:
# create message for fetch one and delete
list_ = [msg, condition]
return list_
except Exception as e:
logger.error("Exception occurred while creating message for fetch: ", e)
# cursor handler
from scripts.logging.loggers import logger
def cursor_to_dict(cursor):
try:
dict_data = {}
# converting to the dict
for i, dicts in enumerate(cursor):
dict_data.update({i: dicts})
# returning the data to api
return dict_data
except Exception as e:
logger.error("Some exception occurred while trying to run the cursor: ", e)
import json
from paho.mqtt.client import Client
from scripts.config import applications_conf_read
from scripts.logging.loggers import logger
def extract_send_data(dict_):
try:
# dumps the list to json
json_data = json.dumps(dict_)
# create the paho client
client = Client()
# connect to the mqtt client
client.connect(applications_conf_read.mqtt_host, int(applications_conf_read.mqtt_port))
# publish the topic
client.publish(applications_conf_read.topic_name, json_data)
# disconnecting from the mqtt
client.disconnect()
except Exception as e:
logger.error("Some exception occurred while sending file: ", e)
# file upload to csv
import os
from scripts.config import applications_conf_read
from scripts.config.applications_conf_read import file_mode
from scripts.core.handlers.sender_mqtt.convert_to_json import file_convert_json
from scripts.core.handlers.sender_mqtt.create_message import create_message_one
from scripts.core.handlers.sender_mqtt.sending_mqtt import extract_send_data
from scripts.logging.loggers import logger
def extract_uploaded_data(full_path_csv, file_data, db_obj, collection):
try:
# splitting the file to get the extension
filename, file_extension = os.path.splitext(file_data.filename)
# checking if the file is csv
if file_extension == ".csv":
# reading from the file
with open(full_path_csv, file_mode) as file_object:
file_object.write(file_data.file.read())
# converting the data to csv
csv_dict_data = file_convert_json(applications_conf_read.csv_path)
for_list = file_convert_json(applications_conf_read.csv_path)
# convert to list
list_ = create_message_one("insert", for_list)
# inserting to the collection
db_obj.insert_data_many(collection, csv_dict_data)
# extract and send the data
extract_send_data(list_)
else:
logger.error("Invalid File format")
except Exception as e:
logger.error("Some exception occurred while extracting data: ", e)
from pydantic import BaseModel
from pydantic.class_validators import Optional
# model for the insertion data
class User(BaseModel):
id: int
first_name: str
last_name: str
email: str
address: str
# model for updating and getting conditions
class UpdateUser(BaseModel):
first_name: Optional[str] = None
last_name: Optional[str] = None
email: Optional[str] = None
address: Optional[str] = None
# connect to mongodb
from pymongo import MongoClient
from scripts.config import applications_conf_read
from scripts.logging.loggers import logger
def connection_mongo_db():
try:
# connecting to the mongo
mongo_client = MongoClient(applications_conf_read.uri)
# creating the db
db = mongo_client[applications_conf_read.db_name]
# creating the collection
collection = db[applications_conf_read.collection]
return collection
except Exception as e:
logger.exception("Some exception occurred while connecting to mongo: ", e)
id,first_name,last_name,email,Address
1,Maurita,Helks,mhelks0@parallels.com,PO Box 3203
2,Prisca,Baxstar,pbaxstar1@whitehouse.gov,20th Floor
3,Camellia,Gallop,cgallop2@comsenz.com,5th Floor
4,Kingston,Keightley,kkeightley3@rediff.com,Suite 35
5,Suzie,Tredgold,stredgold4@myspace.com,PO Box 76402
6,Ade,Coonihan,acoonihan5@stanford.edu,Apt 376
7,Candida,Kobiela,ckobiela6@jiathis.com,10th Floor
8,Karee,Marty,kmarty7@auda.org.au,Suite 75
9,Granville,Weathey,gweathey8@bloglovin.com,2nd Floor
10,Letty,Brecher,lbrecher9@posterous.com,3rd Floor
11,Tam,Graal,tgraala@dmoz.org,Apt 525
12,Dusty,Causbey,dcausbeyb@drupal.org,Apt 1003
13,Kath,Chattey,kchatteyc@smugmug.com,Apt 1074
14,Rutter,Wattins,rwattinsd@hhs.gov,Room 1920
15,Simonette,Gledstane,sgledstanee@nytimes.com,20th Floor
16,Cassey,Gerrelt,cgerreltf@scribd.com,12th Floor
17,Paige,O'Henery,poheneryg@nature.com,Apt 1111
18,Tabbie,Trever,ttreverh@ask.com,20th Floor
19,Hoebart,Valder,hvalderi@tripod.com,5th Floor
20,Margie,Stollenbecker,mstollenbeckerj@fema.gov,Apt 531
21,Bobby,Beet,bbeetk@odnoklassniki.ru,20th Floor
22,Obie,Le Blanc,oleblancl@bravesites.com,Room 832
23,Alyosha,Ruffli,arufflim@bandcamp.com,Suite 14
24,Syman,Granger,sgrangern@bbb.org,PO Box 19521
25,Slade,Weller,swellero@livejournal.com,Room 1911
26,Robina,Rate,rratep@ask.com,Apt 917
27,Karon,Liveley,kliveleyq@tinypic.com,3rd Floor
28,Patty,McGeouch,pmcgeouchr@paginegialle.it,11th Floor
29,Osborn,Ferriday,oferridays@slashdot.org,Room 1474
30,Ellsworth,Dundin,edundint@wikispaces.com,PO Box 17827
31,Mitch,Stetson,mstetsonu@linkedin.com,Apt 603
32,Hardy,Swansbury,hswansburyv@google.it,Apt 1802
33,Nikolas,Telling,ntellingw@ocn.ne.jp,Room 148
34,Gladi,Florey,gfloreyx@dropbox.com,Suite 55
35,Adela,Overbury,aoverburyy@tinyurl.com,4th Floor
36,Evie,McFfaden,emcffadenz@scribd.com,Suite 88
37,Baldwin,Reaman,breaman10@booking.com,Suite 71
38,Beckie,Leadley,bleadley11@blogs.com,Apt 1592
39,Bendick,Kubes,bkubes12@latimes.com,Room 769
40,Huntlee,Mazzeo,hmazzeo13@columbia.edu,Apt 1564
41,Vally,Dzenisenka,vdzenisenka14@themeforest.net,PO Box 96195
42,Lethia,Bolden,lbolden15@yandex.ru,Suite 80
43,Odille,Chataignier,ochataignier16@slate.com,PO Box 73001
44,Cy,Wescott,cwescott17@reuters.com,17th Floor
45,Darnell,Jays,djays18@yellowbook.com,Room 1722
46,Mikael,Wickwar,mwickwar19@bloglines.com,Room 1780
47,Marguerite,McGahey,mmcgahey1a@1688.com,Room 243
48,Jessie,Gaskill,jgaskill1b@angelfire.com,Suite 98
49,Gardiner,Bestall,gbestall1c@cloudflare.com,17th Floor
50,Jo-ann,Rickett,jrickett1d@live.com,Room 903
51,Gavra,Moquin,gmoquin1e@quantcast.com,Room 1139
52,Dylan,Slewcock,dslewcock1f@trellian.com,Apt 1256
53,Danyette,Coolson,dcoolson1g@biglobe.ne.jp,Apt 452
54,Roderich,Ashworth,rashworth1h@163.com,Apt 1416
55,Hildagarde,Haucke,hhaucke1i@ox.ac.uk,Suite 7
56,Darlleen,Bearsmore,dbearsmore1j@google.com,Suite 43
57,Wilmette,Bedwell,wbedwell1k@weebly.com,Suite 65
58,Neville,Swapp,nswapp1l@ocn.ne.jp,Apt 1054
59,Norene,Kopacek,nkopacek1m@domainmarket.com,Suite 38
60,Aubert,Streeting,astreeting1n@cocolog-nifty.com,Suite 82
61,Katrine,Easson,keasson1o@bizjournals.com,Room 1306
62,Celle,Elgie,celgie1p@sciencedirect.com,16th Floor
63,Alvan,Curtin,acurtin1q@booking.com,16th Floor
64,Cornela,Roder,croder1r@google.pl,2nd Floor
65,Wanids,Dansie,wdansie1s@fc2.com,18th Floor
66,Kip,Fillon,kfillon1t@imgur.com,Suite 82
67,Holmes,Kidstoun,hkidstoun1u@usnews.com,Apt 17
68,Freddy,Featherstone,ffeatherstone1v@parallels.com,13th Floor
69,Gusella,Jodlkowski,gjodlkowski1w@chronoengine.com,Apt 515
70,Marietta,Circuit,mcircuit1x@nydailynews.com,18th Floor
71,Brander,Hackley,bhackley1y@drupal.org,Suite 91
72,Nikola,Hughes,nhughes1z@elegantthemes.com,Suite 34
73,Krisha,Aberkirder,kaberkirder20@blogs.com,Suite 70
74,Shina,Meriott,smeriott21@rakuten.co.jp,18th Floor
75,Debra,Sproat,dsproat22@wikia.com,Apt 1650
76,Dunc,Wallworke,dwallworke23@devhub.com,PO Box 58145
77,Rivalee,Gonnet,rgonnet24@oracle.com,Suite 97
78,Regine,Keling,rkeling25@edublogs.org,Room 1519
79,Ashla,Fair,afair26@ucla.edu,Apt 1213
80,Arabela,Theakston,atheakston27@nsw.gov.au,Apt 1277
81,Barney,Grimsdale,bgrimsdale28@reddit.com,PO Box 144
82,Lucho,Braunlein,lbraunlein29@cisco.com,PO Box 26380
83,Jania,Eldredge,jeldredge2a@bloglovin.com,Suite 3
84,Hector,Remer,hremer2b@va.gov,Suite 51
85,Hans,Cantrell,hcantrell2c@bbb.org,Apt 1834
86,Jacquelynn,Kuhne,jkuhne2d@unc.edu,PO Box 70594
87,Way,Moogan,wmoogan2e@chronoengine.com,PO Box 51805
88,Janenna,Costa,jcosta2f@symantec.com,Room 1115
89,Hermine,Bridywater,hbridywater2g@weibo.com,13th Floor
90,Claudette,Highman,chighman2h@ftc.gov,Room 1359
91,Reinhold,Bromehed,rbromehed2i@cargocollective.com,9th Floor
92,Katleen,Dohmer,kdohmer2j@naver.com,PO Box 73746
93,Kellen,Hull,khull2k@eepurl.com,Apt 1569
94,Chrotoem,Lorincz,clorincz2l@behance.net,PO Box 8905
95,Worth,Putton,wputton2m@qq.com,PO Box 91027
96,Luce,Lyford,llyford2n@nps.gov,Room 739
97,Van,Warre,vwarre2o@zimbio.com,Room 1771
98,Kikelia,Benardette,kbenardette2p@vkontakte.ru,PO Box 74859
99,Fabio,Grigoli,fgrigoli2q@plala.or.jp,Apt 1785
100,Rhodia,Batch,rbatch2r@alibaba.com,Room 1597
import logging
import os
from logging.handlers import RotatingFileHandler
from scripts.config import applications_conf_read
from scripts.config.applications_conf_read import formatter_time, formatter_level
def get_logger():
"""
Creates a rotating log
"""
__logger__ = logging.getLogger('')
# setting the logger level
__logger__.setLevel(logging.INFO)
# creating the format for the log
log_formatter = f'%({formatter_time})s - %({formatter_level})-6s - %(message)s'
time_format = "%Y-%m-%d %H:%M:%S"
# getting the path for the logger
file_path = applications_conf_read.full_path
# setting the format
formatter = logging.Formatter(log_formatter, time_format)
# creating the folder if not exist
if not os.path.exists(file_path):
os.makedirs(file_path)
# joining the path
log_file = os.path.join(f"{file_path}{applications_conf_read.db_name}.log")
# creating rotating file handler with max byte as 1
temp_handler = RotatingFileHandler(log_file, maxBytes=1)
# setting the formatter
temp_handler.setFormatter(formatter)
# setting the handler
__logger__.addHandler(temp_handler)
return __logger__
logger = get_logger()
# api for convert, read, insert, update and delete
from fastapi import UploadFile, File, APIRouter
from scripts.config import applications_conf_read
from scripts.config.applications_conf_read import mqtt_host, request_no, mqtt_port
from scripts.constants import api_path_config
from scripts.core.handlers.receiver_mqtt.mqtt_receive_data import mqtt_receiver_call
from scripts.core.handlers.sender_mqtt.create_message import create_message_one
from scripts.core.handlers.sender_mqtt.cursor_handle import cursor_to_dict
from scripts.core.handlers.sender_mqtt.sending_mqtt import extract_send_data
from scripts.core.handlers.sender_mqtt.uploaded_file_ import extract_uploaded_data
from scripts.databases.models.db_model_mongo import User, UpdateUser
from scripts.databases.mongo.connect_mongo import connection_mongo_db
from scripts.logging.loggers import logger
from scripts.utils.mongo.mongo_utils import MongoDB
# mongo app api
mongo_api = APIRouter()
# receiver app api
receiver_api = APIRouter()
# collections
collection = connection_mongo_db()
# mongo class object
db_obj = MongoDB()
# index page
@mongo_api.get(api_path_config.route_index)
def start_app():
return "Starting the app"
@mongo_api.post(api_path_config.route_insert_csv)
def csv_file(file_data: UploadFile = File(...)):
try:
# extracting and inserting the csv to mongo and sending mqtt messages
extract_uploaded_data(applications_conf_read.csv_path, file_data, db_obj, collection)
except Exception as e:
logger.error("Some exception occurred while running csv insertion: ", e)
else:
return {"msg": "Data added"}
@mongo_api.post(api_path_config.route_insert_one)
def insert_one(user_data: User):
try:
# inserting one data and sending mqtt messages
db_obj.insert_data_one(collection, user_data)
# convert the data to dictionary
dict_ = dict(user_data)
# creating the message for the receiver
list_ = create_message_one("insert_one", [dict_])
extract_send_data(list_)
except Exception as e:
logger.error("Some exception occurred while running insertion: ", e)
else:
return {"msg": "Data inserted"}
@mongo_api.post(api_path_config.route_view_all)
def view_data_all():
try:
# fetching data from the mongo and sending messages to mqtt
cursor = db_obj.find_data_all(collection)
# creating the data from the cursor
data_fetch = cursor_to_dict(cursor)
return data_fetch
except Exception as e:
logger.error("Some exception occurred while running view all: ", e)
@mongo_api.post(api_path_config.route_view_one)
def view_data_one(condition: UpdateUser):
try:
# fetching data from the mongo and sending messages to mqtt
cursor = db_obj.find_data_one(collection, condition)
# creating the data from the cursor
data_fetch = cursor_to_dict(cursor)
return data_fetch
except Exception as e:
logger.error("Some exception occurred while running view all: ", e)
@mongo_api.put(api_path_config.route_update)
def update_data(condition: UpdateUser, update: UpdateUser):
try:
# updating data from the mongo and sending messages to mqtt
list_ = db_obj.update_data(collection, condition, update)
extract_send_data(list_)
except Exception as e:
logger.error("Some exception occurred while running update: ", e)
else:
return {"msg": "Data Updated"}
@mongo_api.delete(api_path_config.route_delete)
def delete_data(condition: UpdateUser):
try:
list_ = db_obj.delete_data(collection, condition)
# creating the message
extract_send_data(list_)
except Exception as e:
logger.error("Some exception occurred while running delete: ", e)
else:
return {"msg": "Data deleted"}
@receiver_api.post(api_path_config.route_receiver)
def receiver():
try:
# fetching data from the topic
mqtt_receiver_call(mqtt_host, mqtt_port, request_no)
except Exception as e:
logger.error("Exception occurred: ", e)
# database queries
from scripts.config.applications_conf_read import db_table
class DbQueries:
def __init__(self):
# table creation
self.create_table = '''CREATE TABLE IF NOT EXISTS details
(ID INT PRIMARY KEY NOT NULL,
FIRSTNAME TEXT NOT NULL,
LASTNAME TEXT NOT NULL,
EMAIL TEXT NOT NULL,
ADDRESS TEXT NOT NULL);'''
# selection query
@staticmethod
def select_query():
query = "SELECT * FROM "+db_table
return query
# selection query for one
# table select one
@staticmethod
def select_one(str_conditions):
query = "SELECT * FROM " + db_table + " WHERE " + str_conditions
return query
# update query
@staticmethod
def update_query(str_conditions, str_updates):
query = "UPDATE " + db_table + " SET " + str_updates + " WHERE " + str_conditions
return query
# delete query
@staticmethod
def delete_query(str_conditions):
query = "DELETE FROM " + db_table + " WHERE " + str_conditions
return query
# mongo operations
from scripts.core.handlers.sender_mqtt.conditions_to_dictionary import remove_none
from scripts.core.handlers.sender_mqtt.conditions_updates import remove_none_updates
from scripts.core.handlers.sender_mqtt.create_message import create_message_two, create_message_three
from scripts.core.handlers.sender_mqtt.sending_mqtt import extract_send_data
from scripts.logging.loggers import logger
class MongoDB:
@staticmethod
def insert_data_many(collections, csv_dict_data):
try:
# inserting csv data to mongo
collections.insert_many(csv_dict_data)
except Exception as e:
logger.error("Some exception occurred while running insertion of csv: ", e)
@staticmethod
def insert_data_one(collections, user_data):
try:
# inserting single data to mongo
collections.insert_one(dict(user_data))
except Exception as e:
logger.error("Some exception occurred while running insertion: ", e)
@staticmethod
def find_data_all(collections):
try:
# finding all the data from mongo
cursor = collections.find({}, {"_id": 0})
# creating a message for the receiver
list_ = create_message_two("fetch", "", "")
extract_send_data(list_)
return cursor
except Exception as e:
logger.error("Some exception occurred while fetching: ", e)
@staticmethod
def find_data_one(collections, condition):
try:
# making conditions to a form that can be used for fetching
new_conditions = remove_none(condition)
# fetching the data based on the query
cursor = collections.find(new_conditions, {"_id": 0})
# creating a message for the receiver
list_ = create_message_three("fetch_one", new_conditions)
extract_send_data(list_)
return cursor
except Exception as e:
logger.error("Some exception occurred while fetching one : ", e)
@staticmethod
def update_data(collections, condition, update):
try:
# making conditions and updates to a form that can be used for updating
new_conditions, update_ = remove_none_updates(condition, update)
# creating a message for the receiver
list_ = create_message_two("update", new_conditions, update_)
collections.update_one(new_conditions, {"$set": update_})
return list_
except Exception as e:
logger.error("Some exception occurred while updating: ", e)
@staticmethod
def delete_data(collections, condition):
try:
# making conditions to a form that can be used for fetching
new_conditions = remove_none(condition)
# creating a message for the receiver
list_ = create_message_three("delete", new_conditions)
collections.delete_one(new_conditions)
return list_
except Exception as e:
logger.error("Some exception occurred while deleting: ", e)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment