Commit 4748db26 by mohammed.shibili

final commit

parent f6b1d2fe
MONGO_URI = mongodb://192.168.0.220:2717
MONGO_USER = admin
MONGO_PASS = iLensDevMongo783
DB_NAME = ilens_configuration
COLLECTION_NAME = tag_hierarchy
HIERARCHY_ID = site_107
PROJECT_ID = project_id
PORT_API = 8000
START_TIME = 2022-11-08 00:00:00.000
END_TIME = 2023-02-07 00:00:00.000
kairos_api = http://192.168.0.220:8080/api/v1/datapoints/query
MONGO_URI = 192.168.0.220:2717
tags = c3
name = ilens.live_data.raw
USER = admin
PASSWORD = iLensDevMongo783
HIERARCHY_ID = site_114
PROJECT_ID = project_099
import pandas as pd
from pymongo import MongoClient
from script.config.appconfig import mongo_uri, mongo_user, mongo_pass, db_name, collection_name
client = MongoClient(mongo_uri,
username=mongo_user,
password=mongo_pass)
database = client[db_name]
collection = database[collection_name]
projected = [
{
'$lookup': {
'from': 'tags',
'localField': 'parameter_id',
'foreignField': 'id',
'as': 'tag_details'
}
}, {
'$unwind': '$tag_details'
}, {
'$project': {
'site_id': '$site_id',
'line_id': '$line_id',
'equipment_id': '$equipment_id',
'site_name': '$site_name',
'line_name': '$line_name',
'equipment_name': '$equipment_name',
'id': '$id',
'tag_name': '$tag_details.tag_name',
'_id': 0
}
}
]
datas = list(collection.aggregate(projected))
df = pd.DataFrame(datas)
print(df)
from script.config.appconfig import mongo_uri, db_name, collection_name, mongo_user, mongo_pass, hierarchy_id
from pymongo import MongoClient
from scripts.services.app_run_service import main_service_run
client = MongoClient(mongo_uri,
username=mongo_user,
password=mongo_pass)
database = client[db_name]
collection = database[collection_name]
class extract_data:
@staticmethod
def fetch_data():
try:
query = {}
hierarchies = hierarchy_id.split('$')
for each_hierarchy in hierarchies:
if "site_" in each_hierarchy:
query["site_id"] = each_hierarchy
elif "line_" in each_hierarchy:
query["line_id"] = each_hierarchy
elif "equipment_" in each_hierarchy:
query["equipment_id"] = each_hierarchy
elif "parameter_" in each_hierarchy:
query["parameter_id"] = each_hierarchy
filter_dict = {"_id": 0, "site_id": 1, "line_id": 1, "equipment_id": 1,
"site_name": 1, "line_name": 1, "equipment_name": 1, "id": 1, "tag_name": 1}
documents = collection.find(query, filter_dict)
for document in documents:
print(document)
except Exception as e:
print(e)
mongo_obj = extract_data()
mongo_obj.fetch_data()
main_service_run()
import os
from dotenv import load_dotenv
load_dotenv()
mongo_uri = os.getenv("MONGO_URI")
mongo_user = os.getenv("MONGO_USER")
mongo_pass = os.getenv("MONGO_PASS")
db_name = os.getenv("DB_NAME")
collection_name = os.getenv("COLLECTION_NAME")
hierarchy_id = os.getenv("HIERARCHY_ID")
project_id = os.getenv("PROJECT_ID")
api_port = os.getenv("PORT_API")
mqtt_broker = os.getenv("MQTT_BROKER")
mqtt_port = os.getenv("MQTT_PORT")
mqtt_time = os.getenv("MQTT_TIME")
import os
from dotenv import load_dotenv
load_dotenv()
from_date = os.getenv("START_TIME")
end_date = os.getenv("END_TIME")
kairos_api = os.getenv("kairos_api")
mongodb_uri = os.getenv("MONGO_URI")
port = os.getenv("PORT")
server_path = os.getenv("SERVER_PATH")
name = os.getenv("name")
project_id = os.getenv("PROJECT_ID")
hierarchy_id = os.getenv("HIERARCHY_ID")
username = os.getenv("USER")
password = os.getenv("PASSWORD")
from datetime import datetime
def time_get_epoch(time_string):
# Convert the timestamp string into a datetime object
timestamp_dt = datetime.strptime(time_string, "%Y-%m-%d %H:%M:%S.%f")
# Calculate the difference between the timestamp and Unix reference time in seconds
epoch = int(timestamp_dt.timestamp()) * 1000
return epoch
import pandas as pd
from scripts.core.handlers.epoch_to_date_time import epoch_date_time
def create_csv(key_, data_):
da = pd.DataFrame(columns=['Timestamp'])
for key, value in data_.items():
dp = pd.DataFrame()
if len(value) != 0:
dp[['Timestamp', key_[key]]] = [[epoch_date_time(time), data] for time, data in value]
da = pd.concat([da, dp])
new_da = da.fillna("NA")
new_da.to_csv("scripts/temp/ilens.csv", index=False)
import json
def key_value_dict(query_data):
resp = json.loads(query_data.text)
temp_json = {}
if resp["queries"][0]["sample_size"] > 0:
for i in range(0, len(resp["queries"][0]["results"])):
c3 = resp["queries"][0]["results"][i]["tags"]["c3"][0]
master_key = c3
if master_key not in temp_json:
temp_json[master_key] = resp["queries"][0]["results"][i]["values"]
else:
temp_json[master_key].extend(resp["queries"][0]["results"][i]["values"])
return temp_json
from datetime import datetime
def epoch_date_time(epoch):
# Convert the datetime object into a timestamp string
timestamp_dt = datetime.fromtimestamp(epoch/1000).strftime("%Y-%m-%d %H:%M:%S")
return timestamp_dt
import json
from scripts.utils.query_kairos import query_kairos_get
def get_query(tag_query, tag_name, start_time, end_time):
query_ = query_kairos_get(tag_query, tag_name, start_time, end_time)
query = json.dumps(query_)
return query
from pymongo import MongoClient
import pandas as pd
from scripts.config.appconfig import mongodb_uri, hierarchy_id, username, password
client = MongoClient(mongodb_uri,
username=username,
password=password)
database = client['ilens_configuration']
collection = database['tag_hierarchy']
class SendData:
@staticmethod
def extract_data():
try:
tag_hierarchy = collection
query = {}
hierarchies = hierarchy_id.split('$')
for each_hierarchy in hierarchies:
if "site_" in each_hierarchy:
query["site_id"] = each_hierarchy
elif "line_" in each_hierarchy:
query["line_id"] = each_hierarchy
elif "equipment_" in each_hierarchy:
query["equipment_id"] = each_hierarchy
pipeline = [
{
"$match": query
},
{
'$lookup': {
'from': 'tags',
'localField': 'parameter_id',
'foreignField': 'id',
'as': 'tag_details'
}
}, {
'$unwind': '$tag_details'
}, {
'$group': {
'_id': '$id',
'first_document': {
'$first': '$$ROOT'
}
}
}, {
'$replaceRoot': {
'newRoot': '$first_document'
}
}, {
'$project': {
'site_id': '$site_id',
'line_id': '$line_id',
'equipment_id': '$equipment_id',
'site_name': '$site_name',
'dept_name': '$dept_name',
'line_name': '$line_name',
'equipment_name': '$equipment_name',
'id': '$id',
'tag_name': '$tag_details.tag_name',
'_id': 0
}
}
]
tag_names = tag_hierarchy.aggregate(pipeline)
data = list(tag_names)
dataset = pd.DataFrame(data)
return dataset
except Exception as e:
print(e)
@staticmethod
def final_dict(dataframe):
try:
final_data = {}
for index, row in dataframe.iterrows():
hierarchy_name = ""
if row["site_name"] != "":
hierarchy_name = row["site_name"]
if row["dept_name"] != "":
hierarchy_name = hierarchy_name + ">" + row["dept_name"]
if row["line_name"] != "":
hierarchy_name = hierarchy_name + ">" + row["line_name"]
if row["equipment_name"] != "":
hierarchy_name = hierarchy_name + ">" + row["equipment_name"]
if row["tag_name"] != "":
hierarchy_name = hierarchy_name + ":" + row["tag_name"]
new_dict = {row["id"]: hierarchy_name}
final_data.update(new_dict)
return final_data
except Exception as e:
print(e)
import requests
from scripts.config import appconfig
from scripts.core.handlers.convert_time_epoch import time_get_epoch
from scripts.core.handlers.create_csv_data import create_csv
from scripts.core.handlers.dictionary_values_tags import key_value_dict
from scripts.core.handlers.query_to_json import get_query
from scripts.core.handlers.send_data import SendData
def main_service_run():
# Define the base URL for the KairosDB API
base_url = appconfig.kairos_api
start_time = time_get_epoch(appconfig.from_date)
end_time = time_get_epoch(appconfig.end_date)
mongo_obj = SendData()
df = mongo_obj.extract_data()
key_ = mongo_obj.final_dict(df)
dict_ = []
for key, value in key_.items():
dict_.append(key)
query_data = get_query(dict_, appconfig.name, start_time, end_time)
# Send the query to the KairosDB API
response = requests.post(
base_url,
data=query_data
)
data_ = key_value_dict(response)
create_csv(key_, data_)
def query_kairos_get(tag_query, tag_name, start_time, end_time):
query = {
"metrics": [
{
"tags": {
"c3":
tag_query
},
"name": f'{tag_name}',
"group_by": [
{
"name": "tag",
"tags": [
"c3"
]
}
],
"aggregators": [
{
"name": "sum",
"sampling": {
"value": "1",
"unit": "milliseconds"
},
"align_sampling": "true"
}
]
}
],
"plugins": [],
"cache_time": 0,
"time_zone": "Asia/Calcutta",
"start_absolute": f'{int(start_time)}',
"end_absolute": f'{int(end_time)}'
}
return query
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment