Commit d1c15cbd by arun.uday

commit5

parent ef07dc7b
import uvicorn
from scripts.config.application_config import uvicorn_host, uvicorn_port, uvicorn_app
from scripts.config.application_config import uvicorn_host, uvicorn_port
from scripts.logging.loggers import logger
# starting the application
if __name__ == "__main__":
print("MQTT task")
uvicorn.run(uvicorn_app, host=uvicorn_host, port=int(uvicorn_port))
try:
print("MQTT task receiver")
uvicorn.run("main:app", host=uvicorn_host, port=int(uvicorn_port))
except Exception as e:
logger.error("Interruption occurred: ", e)
......@@ -22,4 +22,7 @@ db_name = user_details.db
db_table = details
[encode]
encode = utf-8
\ No newline at end of file
encode = utf-8
[api_routes]
api_index = /index/
\ No newline at end of file
import uvicorn
from fastapi import FastAPI
from scripts.config.application_config import uvicorn_host, uvicorn_port, uvicorn_app
from scripts.logging.loggers import logger
from scripts.services.receiver_app_services_run import mqtt_receive
app = FastAPI()
app.include_router(mqtt_receive)
# starting the application
if __name__ == "__main__":
try:
print("MQTT task receiver")
uvicorn.run(app, host=uvicorn_host, port=int(uvicorn_port))
except Exception as e:
logger.error("Interruption occurred: ", e)
import configparser
config = configparser.RawConfigParser()
config.read("conf/applications.conf")
# path
api_index = config.get("api_routes", "api_index")
from scripts.config.application_config import topic_name
from scripts.logging.loggers import logger
def connect_mqtt(client_name, userdata, flags, rc):
try:
print(userdata, " ", flags)
print("Connected with result code " + str(rc))
client_name.subscribe(topic_name)
except Exception as e:
logger.error("Exception occurred while connecting to mqtt: ", e)
import pandas as pd
from scripts.core.handlers.display_data import display_fetched_data
from scripts.core.handlers.operations_db import SqliteOperations
from scripts.logging.loggers import logger
def json_to_dictionary(json_data):
try:
data_list = list(json_data.values())
# list to dataframe
frames = pd.DataFrame(data_list, index=None)
obj_db = SqliteOperations()
# inserting the data to table
obj_db.insert_db(frames)
# fetching the data
data_fetched = obj_db.fetch_data()
display_fetched_data(data_fetched)
except Exception as e:
logger.error("Exception occurred while converting to data frame: ", e)
from scripts.logging.loggers import logger
def display_fetched_data(cursor):
# Fetch all the rows as a list of tuples
rows = cursor.fetchall()
try:
# Fetch all the rows as a list of tuples
rows = cursor.fetchall()
# Iterate through the rows and print the data
for row in rows:
print(row)
# Iterate through the rows and print the data
for row in rows:
print(row)
except Exception as e:
logger.error("Exception occurred while fetching the data from table: ", e)
import json
from scripts.config.application_config import utf_encode
from scripts.core.handlers.convert_to_dictionary import json_to_dictionary
from scripts.logging.loggers import logger
def message_mqtt(client_name, userdata, msg):
try:
print(client_name, " ", userdata)
# decoding the msg to string
payload_decoded = msg.payload.decode(utf_encode)
json_data = json.loads(payload_decoded)
# decoded msg to list
json_to_dictionary(json_data)
except Exception as e:
logger.error("Exception occurred while decoding the message: ", e)
from scripts.core.handlers.connection_mqtt_receiver import connect_mqtt
from scripts.core.handlers.message_mqtt_receive import message_mqtt
from scripts.logging.loggers import logger
def mqtt_receiver_call(mqtt, mqtt_host, port, request_no):
try:
client = mqtt.Client()
client.connect(mqtt_host, int(port), int(request_no))
client.on_connect = connect_mqtt
client.on_message = message_mqtt
client.loop_forever()
except Exception as e:
logger.error("Exception occurred while calling the mqtt: ", e)
import sqlite3
from scripts.config.application_config import db_name, db_table, full_path
from scripts.logging.loggers import logger
from scripts.utilis.db_queries import DbQueries
......@@ -15,7 +16,7 @@ class SqliteOperations:
self.conn.execute(DbQueries().create_table)
self.conn.close()
except Exception as e:
print(e)
logger.error("Exception occurred while creating table: ", e)
def insert_db(self, frames):
try:
......@@ -23,7 +24,7 @@ class SqliteOperations:
frames.to_sql(db_table, self.conn, if_exists='replace')
self.conn.commit()
except Exception as e:
print(e)
logger.error("Exception occurred while inserting data: ", e)
def fetch_data(self):
try:
......@@ -34,4 +35,4 @@ class SqliteOperations:
cursor.execute(DbQueries().select)
return cursor
except Exception as e:
print(e)
logger.error("Exception occurred while fetching data: ", e)
import logging
import os
from logging.handlers import RotatingFileHandler
from scripts.config import application_config
def get_logger():
"""
Creates a rotating log
"""
__logger__ = logging.getLogger('')
__logger__.setLevel(logging.INFO)
log_formatter = '%(asctime)s - %(levelname)-6s - %(message)s'
time_format = "%Y-%m-%d %H:%M:%S"
file_path = application_config.full_path
formatter = logging.Formatter(log_formatter, time_format)
if not os.path.exists(file_path):
os.makedirs(file_path)
log_file = os.path.join(f"{file_path}{application_config.topic_name}.log")
temp_handler = RotatingFileHandler(log_file, maxBytes=1)
temp_handler.setFormatter(formatter)
__logger__.addHandler(temp_handler)
return __logger__
logger = get_logger()
import paho.mqtt.client as mqtt
from fastapi import FastAPI
import json
import pandas as pd
from fastapi import APIRouter
from scripts.config.application_config import topic_name, mqtt_host, port, request_no, utf_encode
from scripts.core.handlers.display_data import display_fetched_data
from scripts.core.handlers.operations_db import SqliteOperations
from scripts.config.api_routes_config import api_index
from scripts.config.application_config import mqtt_host, port, request_no
from scripts.core.handlers.mqtt_receive_data import mqtt_receiver_call
from scripts.logging.loggers import logger
# This is the Subscriber
app = FastAPI()
mqtt_receive = APIRouter()
@app.get("/")
@mqtt_receive.get(api_index)
def receiver():
try:
def on_connect(client_name, userdata, flags, rc):
print(userdata, " ", flags)
print("Connected with result code " + str(rc))
client_name.subscribe(topic_name)
def on_message(client_name, userdata, msg):
print(client_name, " ", userdata)
# decoding the msg to string
payload_decoded = msg.payload.decode(utf_encode)
json_data = json.loads(payload_decoded)
# decoded msg to list
data_list = list(json_data.values())
# list to dataframe
frames = pd.DataFrame(data_list, index=None)
obj_db = SqliteOperations()
# inserting the data to table
obj_db.insert_db(frames)
# fetching the data
data_fetched = obj_db.fetch_data()
display_fetched_data(data_fetched)
client = mqtt.Client()
client.connect(mqtt_host, int(port), int(request_no))
client.on_connect = on_connect
client.on_message = on_message
client.loop_forever()
mqtt_receiver_call(mqtt, mqtt_host, port, request_no)
except Exception as e:
print(e)
logger.error("Exception occurred: ", e)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment