659 lines
19 KiB
Python
659 lines
19 KiB
Python
#!/usr/bin/env python3
|
|
import os
|
|
import time
|
|
import json
|
|
import re
|
|
import requests
|
|
import redis
|
|
|
|
from flask import Flask, jsonify, request, send_from_directory, Blueprint
|
|
from dotenv import load_dotenv
|
|
|
|
# NOTE(review): this blueprint is not registered or used anywhere in this file.
bp = Blueprint("api_extra", __name__)

# Load .env first so every os.getenv() below can see file-provided values.
load_dotenv()

# Upstream API credentials and base URL. The literal defaults keep the previous
# behavior; set RESENSYS_API_KEY / RESENSYS_API_URL to override them.
# SECURITY: a key committed to source control should be rotated and this
# default removed once the environment variable is deployed.
API_KEY = os.getenv(
    "RESENSYS_API_KEY",
    "f1bd537847b87739d5f27b54c2b38cd3fb8aa6e1d19592e6363878805e47f6cd",
)
API_URL = os.getenv("RESENSYS_API_URL", "https://rai.resensys.cloud:8443/api/v3")

SITE_ID_RE = re.compile(r"^[A-Za-z0-9\-_.:]+$")  # simple safety gate for keys

# Device type codes reported as "non_recharge" by /api/devices.
cr123_types = ["MRT", "STRED", "STRAIN", "4CHSTR", "DISP", "2DHRT", "HUT", "HPA", "WETNESS"]
# Device type codes reported as "recharge" by /api/devices.
rechargables = ["SNMX", "WLS_S"]

# Redis connection settings (overridable through the environment / .env).
REDIS_HOST = os.getenv("REDIS_HOST", "172.31.25.139")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6380"))
REDIS_DB = int(os.getenv("REDIS_DB", "0"))

# Age thresholds (seconds since last contact) used to derive a site's state.
ONLINE_SECS = 90 * 60
RECENT_SECS = 24 * 3600

app = Flask(__name__, static_folder="static")
# decode_responses=True: Redis replies arrive as str rather than bytes.
r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, decode_responses=True)
|
|
|
|
# Map of device type code -> list of integer data-format ids, read from the
# DF_BY_TYPE_JSON environment variable (noted in the original as the same
# variable the Lambda uses). Any parsing or shape problem falls back to the
# built-in default mapping.
try:
    DF_BY_TYPE = {
        type_code: list(map(int, df_ids))
        for type_code, df_ids in json.loads(
            os.getenv("DF_BY_TYPE_JSON", '{"STRED":[10002]}')
        ).items()
    }
except Exception:
    DF_BY_TYPE = {"STRED": [10002]}
|
|
|
|
|
|
def _hget_int(h, key, default=0):
|
|
try:
|
|
return int(h.get(key) or default)
|
|
except Exception:
|
|
return default
|
|
|
|
|
|
def _ok_site_id(sid: str) -> bool:
    """Return True when *sid* is a non-empty string made only of the
    characters allowed by ``SITE_ID_RE``.

    Non-string input returns False instead of raising.
    """
    # fullmatch rather than match: with match(), the pattern's trailing ``$``
    # also accepts a string ending in a newline.
    return isinstance(sid, str) and SITE_ID_RE.fullmatch(sid) is not None
|
|
|
|
|
|
def _meta_key(sid: str) -> str:
|
|
return f"site:{sid}:meta"
|
|
|
|
|
|
def _site_tags_key(sid: str) -> str:
|
|
return f"site:{sid}:tags"
|
|
|
|
|
|
def _tag_sites_key(tag: str) -> str:
|
|
return f"tag:{tag}:sites"
|
|
|
|
|
|
def _derive_site_state(last_ts: int, now_ts: int | None = None) -> str:
|
|
"""
|
|
Derive state from last contact time.
|
|
|
|
Online : within last 90 minutes
|
|
recently offline : older than 90 minutes but within last 24 hours
|
|
Offline : older than 24 hours, or no timestamp
|
|
"""
|
|
now_ts = now_ts or int(time.time())
|
|
|
|
if not last_ts or last_ts <= 0:
|
|
return "offline"
|
|
|
|
age = now_ts - last_ts
|
|
if age < 0:
|
|
# clock skew / future timestamp
|
|
age = 0
|
|
|
|
if age <= ONLINE_SECS:
|
|
return "online"
|
|
elif age <= RECENT_SECS:
|
|
return "recent"
|
|
else:
|
|
return "offline"
|
|
|
|
|
|
def _safe_json_post(endpoint: str, payload: dict, timeout: int = 10):
    """POST *payload* as JSON to ``{API_URL}/{endpoint}`` with the API key header.

    Returns a 2-tuple:
        ``(upstream_dict, None)`` on HTTP 200, or
        ``(None, (flask_response, 502))`` when the request fails or the
        upstream answers with a non-200 status.

    The first element is always a dict on success: a body that is not valid
    JSON, or that decodes to something other than a JSON object, becomes ``{}``
    so callers can use ``.get`` safely.
    """
    url = f"{API_URL}/{endpoint}"
    headers = {
        "X-API-Key": API_KEY,
        "Content-Type": "application/json",
    }

    try:
        resp = requests.post(url, headers=headers, json=payload, timeout=timeout)
    except requests.RequestException as e:
        return None, (jsonify({"ok": False, "error": f"upstream error: {e}"}), 502)

    if resp.status_code != 200:
        return None, (jsonify({
            "ok": False,
            "status_code": resp.status_code,
            "body": resp.text
        }), 502)

    try:
        upstream = resp.json()
    except ValueError:
        upstream = {}

    # Every caller does upstream.get(...); a JSON list/null/scalar reply would
    # otherwise surface as an unhandled AttributeError (HTTP 500).
    if not isinstance(upstream, dict):
        upstream = {}

    return upstream, None
|
|
|
|
|
|
def _infer_passed(upstream: dict, default: bool = True) -> bool:
|
|
if isinstance(upstream.get("ok"), bool):
|
|
return upstream["ok"]
|
|
|
|
status = str(upstream.get("status", "")).lower()
|
|
if status:
|
|
return ("ok" in status) or ("pass" in status) or ("good" in status) or ("within" in status)
|
|
|
|
return default
|
|
|
|
|
|
@app.get("/")
def root():
    """Serve ``index.html`` from the ``templates`` directory."""
    directory, filename = "templates", "index.html"
    return send_from_directory(directory, filename)
|
|
|
|
|
|
@app.get("/api/site-grid")
def site_grid():
    """Return a JSON list of site summaries, newest contact first.

    Query parameters (all optional):
        q      case-insensitive substring matched against site id and name
        state  keep only sites whose *derived* state equals this value
        port   keep only sites whose stored ``port`` equals this value
               (case-insensitive)
        tag    keep only sites that carry exactly this tag
        limit  maximum number of rows; defaults to 2000, capped at 5000.
               A non-numeric value falls back to the default instead of
               failing the request.

    The limit is applied while scanning (in Redis scan order); sorting by
    ``last_contact_ts`` happens afterwards on the collected rows.
    """
    q = (request.args.get("q") or "").strip().lower()
    requested_state = (request.args.get("state") or "").strip().lower()
    port = (request.args.get("port") or "").strip().lower()
    tag = (request.args.get("tag") or "").strip()

    try:
        limit = int(request.args.get("limit", 2000))
    except (TypeError, ValueError):
        limit = 2000
    # Lower bound of 1 mirrors the previous behavior, where a limit <= 0
    # still produced the first matching row.
    limit = max(1, min(limit, 5000))

    now_ts = int(time.time())
    items = []

    # Use scan_iter instead of keys for better Redis behavior at scale
    for k in r.scan_iter("site:*:summary"):
        parts = k.split(":")
        if len(parts) < 3:
            continue

        sid = parts[1]
        data = r.hgetall(k) or {}

        # Filters that only need the summary hash run first, so rejected
        # sites cost no further Redis round trips.
        if port and (data.get("port") or "").lower() != port:
            continue

        last_ts = _hget_int(data, "last_contact_ts", 0)
        derived_state = _derive_site_state(last_ts, now_ts)

        # Filter using derived state, not stored redis state
        if requested_state and derived_state != requested_state:
            continue

        meta = r.hgetall(_meta_key(sid)) or {}
        site_name = meta.get("name", "")

        # Search site_id and site_name
        if q and q not in sid.lower() and q not in site_name.lower():
            continue

        tags = sorted(r.smembers(_site_tags_key(sid)))
        if tag and tag not in tags:
            continue

        age_seconds = None if last_ts <= 0 else max(0, now_ts - last_ts)

        items.append({
            "site_id": sid,
            "site_name": site_name,
            "site_tags": tags,
            "state": derived_state,
            "port": data.get("port"),
            "last_contact_ts": last_ts if last_ts > 0 else None,
            "last_contact_age_sec": age_seconds,
            "last_contact_age_min": (age_seconds // 60) if age_seconds is not None else None,
            "num_gateways": _hget_int(data, "num_gateways", 0),
            "num_devices": _hget_int(data, "num_devices", 0),
            "num_inactive_24h": _hget_int(data, "num_inactive_24h", 0),
        })

        if len(items) >= limit:
            break

    items.sort(key=lambda x: x["last_contact_ts"] or 0, reverse=True)
    return jsonify(items)
|
|
|
|
|
|
@app.get("/api/devices")
def devices():
    """List the devices of one site.

    Query parameter ``site_id`` is required (400 otherwise). Returns a JSON
    list with one entry per member of ``site:{site_id}:devices``, combining
    the ``device:{id}`` hash with ``rssi`` / ``voltage`` from
    ``device:{id}:latest``.
    """
    sid = request.args.get("site_id")
    if not sid:
        return jsonify({"error": "site_id required"}), 400

    # Sets are unordered; make a stable order for the UI
    dids = sorted(r.smembers(f"site:{sid}:devices"))

    # Fetch both hashes of every device in a single round-trip. Replies come
    # back in queue order: [dev0, latest0, dev1, latest1, ...].
    pipe = r.pipeline()
    for d in dids:
        pipe.hgetall(f"device:{d}")
        pipe.hgetall(f"device:{d}:latest")
    replies = pipe.execute()

    out = []
    for d, row, lat_h in zip(dids, replies[0::2], replies[1::2]):
        row = row or {}
        lat_h = lat_h or {}

        type_code = row.get("device_type_code")
        if type_code in cr123_types:
            device_class = "non_recharge"
        elif type_code in rechargables:
            device_class = "recharge"
        else:
            device_class = ""

        out.append({
            "device_id": d,
            "site_id": row.get("site_id"),
            "local_address": row.get("local_address"),
            # _hget_int tolerates a malformed stored value instead of raising.
            "last_voltage_ts": _hget_int(row, "last_voltage_ts", 0),
            "last_gateway_id": row.get("last_gateway_id"),
            "device_type": row.get("device_type") or "",
            "device_class": device_class,
            "device_type_code": type_code or "",
            "rssi": lat_h.get("rssi") or "",
            "voltage": lat_h.get("voltage") or ""
        })

    return jsonify(out)
|
|
|
|
|
|
@app.route("/api/device-cache", methods=["POST"])
def device_cache():
    """
    Body: {"device_ids": ["AA-BB-CC-DD", ...]}

    Returns core device info + cached DF values based on device type code:
    ``{"ok": True, "devices": [...], "df_map": {device_id: {df: {...}}}}``.

    Responds 400 when the body is not a JSON object, ``device_ids`` is
    missing/empty/not a list, or any id is not a non-empty string.
    """
    data = request.get_json(silent=True)
    dids = data.get("device_ids") if isinstance(data, dict) else None
    if not isinstance(dids, list) or not dids:
        return jsonify({"ok": False, "error": "device_ids required"}), 400
    # Ids become Redis key fragments and dict keys of df_map; a nested
    # list/object would otherwise fail later with an unhandled TypeError.
    if not all(isinstance(d, str) and d for d in dids):
        return jsonify({"ok": False, "error": "device_ids must be non-empty strings"}), 400

    devices_out = []
    df_map = {}

    for did in dids:
        dev_h = r.hgetall(f"device:{did}")
        lat_h = r.hgetall(f"device:{did}:latest")

        app.logger.debug("lat_h: %s", lat_h)

        dev_type_code = dev_h.get("device_type_code") or ""

        devices_out.append({
            "device_id": did,
            "site_id": dev_h.get("site_id") or lat_h.get("siteID") or "",
            "device_type": dev_h.get("device_type") or "",
            "device_type_code": dev_type_code,
            "local_address": dev_h.get("local_address") or lat_h.get("localAddress") or "",
            # Prefer the device hash; fall back to the latest-reading timestamp.
            "latest_ts": _hget_int(dev_h, "last_voltage_ts", _hget_int(lat_h, "timestamp", 0)),
            "rssi": lat_h.get("rssi") or "",
            "voltage": lat_h.get("voltage") or ""
        })

        dfs_cfg = [int(x) for x in DF_BY_TYPE.get(dev_type_code, [])]
        per_df = {}

        # 1) Try configured DFs first
        for df in dfs_cfg:
            h = r.hgetall(f"device:{did}:df:{df}")
            if h:
                per_df[str(df)] = {
                    "value": h.get("value", ""),
                    "optional": h.get("optional", ""),
                    "ts": h.get("ts", ""),
                    "type_code": h.get("type_code", dev_type_code),
                    "cached": True,
                }

        # 2) Fall back to any seen DFs
        if not per_df:
            seen = sorted(
                r.smembers(f"device:{did}:dataformats"),
                key=lambda x: int(x) if str(x).isdigit() else 0
            )
            for df in seen:
                h = r.hgetall(f"device:{did}:df:{df}")
                # NOTE(review): the "10" placeholders for uncached entries are
                # kept as-is; they look like leftover debug values - confirm
                # with the frontend before changing.
                per_df[str(df)] = {
                    "value": h.get("value", "") if h else "10",
                    "optional": h.get("optional", "") if h else "10",
                    "ts": h.get("ts", "") if h else "10",
                    "type_code": h.get("type_code", dev_type_code),
                    "cached": bool(h),
                }

        df_map[did] = per_df

    return jsonify({"ok": True, "devices": devices_out, "df_map": df_map})
|
|
|
|
|
|
@app.get("/api/site-meta")
def get_site_meta():
    """Return the stored name and sorted tags of the site given by the
    ``site_id`` query parameter; 400 when the id is missing or invalid."""
    sid = (request.args.get("site_id") or "").strip()
    if _ok_site_id(sid):
        stored_meta = r.hgetall(_meta_key(sid)) or {}
        tag_list = list(r.smembers(_site_tags_key(sid)))
        tag_list.sort()
        body = {
            "site_id": sid,
            "name": stored_meta.get("name", ""),
            "tags": tag_list,
        }
        return jsonify(body)

    return jsonify({"error": "invalid or missing site_id"}), 400
|
|
|
|
|
|
@app.post("/api/site-meta")
def post_site_meta():
    """Set a site's display name and replace its tag set.

    Body: ``{"site_id": str, "name": str (optional), "tags": [str, ...] (optional)}``

    The name is always written (an omitted name is stored as ""). Tags are
    stripped and de-duplicated in order; the site's tag set and the reverse
    ``tag:{tag}:sites`` sets are updated so they match the submitted list.

    Responds 400 for invalid JSON, a non-object body, a missing/invalid
    ``site_id``, a non-string ``name``, or ``tags`` that is not a list of
    non-empty strings.
    """
    try:
        payload = request.get_json(force=True, silent=False) or {}
    except Exception:
        return jsonify({"error": "invalid JSON"}), 400
    # Valid JSON that is not an object (e.g. a list) has no fields to read.
    if not isinstance(payload, dict):
        return jsonify({"error": "invalid JSON"}), 400

    sid = payload.get("site_id") or ""
    name = payload.get("name") or ""
    tags = payload.get("tags") or []

    # Type checks come before .strip() so wrong JSON types give 400, not 500.
    if not isinstance(sid, str):
        return jsonify({"error": "invalid or missing site_id"}), 400
    if not isinstance(name, str):
        return jsonify({"error": "name must be a string"}), 400
    sid = sid.strip()
    name = name.strip()

    print(f"Trying to add tags {tags} for site {sid}.")

    if not _ok_site_id(sid):
        return jsonify({"error": "invalid or missing site_id"}), 400
    if not isinstance(tags, list) or not all(isinstance(t, str) and t.strip() for t in tags):
        return jsonify({"error": "tags must be a list of non-empty strings"}), 400

    # Strip and de-duplicate while keeping first-seen order.
    norm_tags = list(dict.fromkeys(t.strip() for t in tags))

    site_meta_key = _meta_key(sid)
    site_tags_key = _site_tags_key(sid)

    old_tags = set(r.smembers(site_tags_key))
    new_tags = set(norm_tags)

    pipe = r.pipeline()
    pipe.hset(site_meta_key, mapping={"name": name})

    # Keep the per-site tag set and the per-tag site index in step.
    for t in new_tags - old_tags:
        pipe.sadd(site_tags_key, t)
        pipe.sadd(_tag_sites_key(t), sid)

    for t in old_tags - new_tags:
        pipe.srem(site_tags_key, t)
        pipe.srem(_tag_sites_key(t), sid)

    pipe.execute()

    return jsonify({"site_id": sid, "name": name, "tags": norm_tags})
|
|
|
|
|
|
@app.get("/api/site-bundle")
def site_bundle():
    """Return the ``site:{site_id}:bundle`` hash of one site as JSON.

    Query parameter ``site_id`` is required (400 otherwise). Missing string
    fields come back as "", and ``bundle_updated_at`` as an integer (0 when
    absent or not parseable).
    """
    sid = (request.args.get("site_id") or "").strip()
    if not sid:
        return jsonify({"error": "site_id required"}), 400

    data = r.hgetall(f"site:{sid}:bundle") or {}

    text_fields = (
        "firmware_version",
        "error_code",
        "iccid",
        "imei",
        "iccid_str",
        "imei_str",
    )
    out = {"site_id": sid}
    out.update((field, data.get(field, "")) for field in text_fields)
    # _hget_int instead of a bare int(): a malformed stored value must not
    # turn this endpoint into a 500.
    out["bundle_updated_at"] = _hget_int(data, "bundle_updated_at", 0)
    return jsonify(out)
|
|
|
|
|
|
@app.route("/internal/check-signal", methods=["POST"])
def check_signal():
    """Run the upstream ``rssi_status`` check for one device.

    Body: ``{"did": "<device id>"}``. Responds 400 when ``did`` is missing
    (including when the body is not a JSON object), or with the error
    response produced by ``_safe_json_post`` when the upstream call fails.
    On success returns ``{"ok", "raw", "explain", "metrics"}``.
    """
    data = request.get_json(silent=True)
    # A JSON array/scalar body carries no "did"; treat it as missing rather
    # than crashing on .get().
    did = data.get("did") if isinstance(data, dict) else None
    if not did:
        return jsonify({"ok": False, "error": "did required"}), 400

    upstream, err = _safe_json_post("rssi_status", {"did": did})
    if err:
        return err

    print("RSSI backend response: ", upstream)

    latest = upstream.get("latest", "unknown")
    status = str(upstream.get("status", "")).lower()

    # The wording follows an exact "good" status; "ok" is inferred separately.
    if status == "good":
        explain = f"RSSI and device signal are GOOD with latest reading being {latest}."
    else:
        explain = f"RSSI and device signal are WEAK with latest reading being {latest}."

    passed = _infer_passed(upstream, default=True)

    return jsonify({
        "ok": passed,
        "raw": upstream,
        "explain": explain,
        "metrics": {"RSSI": latest}
    })
|
|
|
|
|
|
@app.route("/internal/check-strain", methods=["POST"])
def check_strain():
    """Run the upstream ``strain_status`` check for one device.

    Body: ``{"did": "<device id>"}``. Responds 400 when ``did`` is missing
    (including when the body is not a JSON object), or with the error
    response produced by ``_safe_json_post`` when the upstream call fails.
    On success returns ``{"ok", "raw", "explain", "metrics"}``.
    """
    data = request.get_json(silent=True)
    # A JSON array/scalar body carries no "did"; treat it as missing rather
    # than crashing on .get().
    did = data.get("did") if isinstance(data, dict) else None
    if not did:
        return jsonify({"ok": False, "error": "did required"}), 400

    upstream, err = _safe_json_post("strain_status", {"did": did})
    if err:
        return err

    print("Strain backend response: ", upstream)

    latest = upstream.get("latest", "unknown")
    i2_median = upstream.get("i2_median", "unknown")
    status = str(upstream.get("status", "")).lower()

    # The wording follows an exact "good" status; "ok" is inferred separately.
    if status == "good":
        explain = (
            f"Strain reading quality is GOOD with latest raw strain reading being {latest} "
            f"and infofield2 within range -90 +/- 20 counts at {i2_median}."
        )
    else:
        explain = (
            f"Strain quality is not good and the median infofield2 value is out of range "
            f"of -90 +/- 20 counts at {i2_median}."
        )

    passed = _infer_passed(upstream, default=True)

    return jsonify({
        "ok": passed,
        "raw": upstream,
        "explain": explain,
        "metrics": {"Strain": latest, "Infofield2": i2_median}
    })
|
|
|
|
|
|
@app.route("/internal/check-hpa", methods=["POST"])
def check_hpa():
    """Run the upstream ``hpa_status`` (accelerometer) check for one device.

    Body: ``{"did": "<device id>"}``. Responds 400 when ``did`` is missing
    (including when the body is not a JSON object), or with the error
    response produced by ``_safe_json_post`` when the upstream call fails.
    On success returns ``{"ok", "raw", "explain", "metrics"}``. The trigger
    threshold is read from upstream field ``i2`` when ``firmware_version``
    is at least 13, otherwise from ``i1``.
    """
    data = request.get_json(silent=True)
    # A JSON array/scalar body carries no "did"; treat it as missing rather
    # than crashing on .get().
    did = data.get("did") if isinstance(data, dict) else None
    if not did:
        return jsonify({"ok": False, "error": "did required"}), 400

    upstream, err = _safe_json_post("hpa_status", {"did": did})
    if err:
        return err

    print("HPA backend response: ", upstream)

    latest = upstream.get("latest", "unknown")
    status = str(upstream.get("status", "")).lower()

    if status == "good":
        explain = f"Accelerometer readings are GOOD with latest reading for HPA-Z being {latest}."
    else:
        explain = f"Accelerometer readings are POOR with latest reading for HPA-Z being {latest}."

    # Unparseable or missing values degrade to 0 rather than failing the check.
    try:
        firmware_version = float(upstream.get("firmware_version", 0) or 0)
    except Exception:
        firmware_version = 0.0

    threshold_field = "i2" if firmware_version >= 13 else "i1"
    try:
        accel_threshold = float(upstream.get(threshold_field, 0) or 0)
    except Exception:
        accel_threshold = 0.0

    # A low threshold replaces the reading-quality message.
    # NOTE(review): a missing threshold field also ends up here (it reads as
    # 0), and the 50 cut-off differs from the 180 mg advice - confirm both
    # are intended.
    if accel_threshold < 50:
        explain = "Trigger threshold is too low. Consider raising to at least 180 mg."

    passed = _infer_passed(upstream, default=True)

    return jsonify({
        "ok": passed,
        "raw": upstream,
        "explain": explain,
        "metrics": {
            "HPA-Z (latest)": latest,
            "Threshold": f"{accel_threshold} mg"
        }
    })
|
|
|
|
|
|
@app.route("/internal/check-wls", methods=["POST"])
def check_wls():
    """Run the upstream ``wls_status`` (water level) check for one device.

    Body: ``{"did": "<device id>"}``. Responds 400 when ``did`` is missing
    (including when the body is not a JSON object), or with the error
    response produced by ``_safe_json_post`` when the upstream call fails.
    On success returns ``{"ok", "raw", "explain", "metrics"}``.
    """
    data = request.get_json(silent=True)
    # A JSON array/scalar body carries no "did"; treat it as missing rather
    # than crashing on .get().
    did = data.get("did") if isinstance(data, dict) else None
    if not did:
        return jsonify({"ok": False, "error": "did required"}), 400

    upstream, err = _safe_json_post("wls_status", {"did": did})
    if err:
        return err

    print("WLS backend response: ", upstream)

    latest = upstream.get("latest", "unknown")
    status = str(upstream.get("status", "")).lower()

    # The wording follows an exact "good" status; "ok" is inferred separately.
    if status == "good":
        explain = f"Water level readings are GOOD with latest reading for level being {latest}."
    else:
        explain = f"Water level readings are POOR with latest reading for level being {latest}."

    passed = _infer_passed(upstream, default=True)

    return jsonify({
        "ok": passed,
        "raw": upstream,
        "explain": explain,
        "metrics": {"Level": latest}
    })
|
|
|
|
|
|
@app.route("/internal/check-battery", methods=["POST"])
def check_battery():
    """Run the upstream ``battery_status`` check for one device.

    Body: ``{"did": "<device id>"}``. Responds 400 when ``did`` is missing
    (including when the body is not a JSON object), or with the error
    response produced by ``_safe_json_post`` when the upstream call fails.
    On success returns ``{"ok", "raw", "explain", "metrics"}``; when the
    upstream gives no verdict, ``ok`` defaults to False.
    """
    data = request.get_json(silent=True)
    # A JSON array/scalar body carries no "did"; treat it as missing rather
    # than crashing on .get().
    did = data.get("did") if isinstance(data, dict) else None
    if not did:
        return jsonify({"ok": False, "error": "did required"}), 400

    upstream, err = _safe_json_post("battery_status", {"did": did})
    if err:
        return err

    print("Battery backend response: ", upstream)

    latest = upstream.get("latest", "unknown")
    status = str(upstream.get("status", "")).lower()

    # The wording follows an exact "good" status; "ok" is inferred separately.
    if status == "good":
        explain = f"Battery is GOOD with latest reading being {latest}."
    else:
        explain = f"Battery is LOW with latest reading being {latest}."

    passed = _infer_passed(upstream, default=False)

    return jsonify({
        "ok": passed,
        "raw": upstream,
        "explain": explain,
        "metrics": {"Battery Voltage": latest}
    })
|
|
|
|
|
|
@app.route("/internal/check-charge", methods=["POST"])
def check_charge():
    """Run the upstream ``charge_status`` check for one device.

    Body: ``{"did": "<device id>"}``. Responds 400 when ``did`` is missing
    (including when the body is not a JSON object), or with the error
    response produced by ``_safe_json_post`` when the upstream call fails.
    On success returns ``{"ok", "raw", "explain", "metrics"}``; when the
    upstream gives no verdict, ``ok`` defaults to False.
    """
    data = request.get_json(silent=True)
    # A JSON array/scalar body carries no "did"; treat it as missing rather
    # than crashing on .get().
    did = data.get("did") if isinstance(data, dict) else None
    if not did:
        return jsonify({"ok": False, "error": "did required"}), 400

    upstream, err = _safe_json_post("charge_status", {"did": did})
    if err:
        return err

    print("Charge backend response: ", upstream)

    max_current = upstream.get("max", "unknown")
    status = str(upstream.get("status", "")).lower()

    # The wording follows an exact "good" status; "ok" is inferred separately.
    if status == "good":
        explain = f"Charging is GOOD with maximum reading being {max_current} mA."
    else:
        explain = (
            f"Charging is LOW with maximum reading being {max_current} mA. "
            f"Please try adjusting the solar panel toward south sky."
        )

    passed = _infer_passed(upstream, default=False)

    return jsonify({
        "ok": passed,
        "raw": upstream,
        "explain": explain,
        "metrics": {"Charging Current": f"{max_current} mA"}
    })
|
|
|
|
|
|
@app.route("/api/device-info", methods=["GET"])
def api_device_info():
    """
    Server-side proxy that looks up a device's type.

    The frontend requests /api/device-info?did=15-03-77-44; this handler
    forwards the id to the upstream ``device_type`` endpoint (adding the
    API key) and returns the device type, code name and raw reply.
    """
    did = request.args.get("did")
    if did:
        upstream, err = _safe_json_post("device_type", {"did": did})
        if err:
            return err

        code_name = upstream.get("code_name")
        # Upstream may report the type under either key.
        device_type = upstream.get("device_type") or upstream.get("type")

        print(f"Device {did} is {code_name}.")

        result = {
            "ok": True,
            "device_id": did,
            "device_type": device_type,
            "code_name": code_name,
            "raw": upstream
        }
        return jsonify(result)

    return jsonify({"ok": False, "error": "Query parameter 'did' is required"}), 400
|
|
|
|
|
|
if __name__ == "__main__":
    # run: python api.py
    # SECURITY: debug mode enables the Werkzeug interactive debugger, which
    # lets anyone who can reach the port execute code. Because the server
    # listens on all interfaces, debug is now opt-in: set FLASK_DEBUG=1
    # (or true/yes/on) for local development.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(host="0.0.0.0", port=5556, debug=debug_enabled)
|