#!/usr/bin/env python3
"""Flask service exposing site and device data cached in Redis.

The helpers at the bottom of the configuration section also support
proxying requests to the upstream API at ``API_URL``.
"""
import json
import os
import re
import time

import redis
import requests
from dotenv import load_dotenv
from flask import Blueprint, Flask, jsonify, request, send_from_directory

# Load .env first so every os.getenv() below can see file-provided values.
load_dotenv()

bp = Blueprint("api_extra", __name__)

# NOTE(review): the fallback literal is a credential committed to source.
# Rotate it and supply RESENSYS_API_KEY through the environment instead.
API_KEY = os.getenv(
    "RESENSYS_API_KEY",
    "f1bd537847b87739d5f27b54c2b38cd3fb8aa6e1d19592e6363878805e47f6cd",
)
API_URL = "https://rai.resensys.cloud:8443/api/v3"
SITE_ID_RE = re.compile(r"^[A-Za-z0-9\-_.:]+$")  # simple safety gate for keys

# Device type codes mapped to "non_recharge" / "recharge" by /api/devices.
cr123_types = ["MRT", "STRED", "STRAIN", "4CHSTR", "DISP", "2DHRT", "HUT", "HPA", "WETNESS"]
rechargables = ["SNMX", "WLS_S"]

# For local development set REDIS_HOST=127.0.0.1 in the environment or .env.
REDIS_HOST = os.getenv("REDIS_HOST", "172.31.25.139")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6380"))
REDIS_DB = int(os.getenv("REDIS_DB", "0"))

ONLINE_SECS = 90 * 60
RECENT_SECS = 24 * 3600

DEFAULT_GRID_LIMIT = 2000
MAX_GRID_LIMIT = 5000

app = Flask(__name__, static_folder="static")
r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, decode_responses=True)

# Parse DF_BY_TYPE from the same env var the Lambda uses (optional but convenient)
try:
    DF_BY_TYPE = json.loads(os.getenv("DF_BY_TYPE_JSON", '{"STRED":[10002]}'))
    DF_BY_TYPE = {k: [int(x) for x in v] for k, v in DF_BY_TYPE.items()}
except (ValueError, TypeError, AttributeError):
    # Malformed JSON, a non-object top level, or non-integer entries.
    DF_BY_TYPE = {"STRED": [10002]}

# Matches "ok"/"okay" only when not embedded in a longer word ("broken").
_OK_WORD_RE = re.compile(r"(?<![a-z])ok(?:ay)?(?![a-z])")


def _hget_int(h, key, default=0):
    """Return ``h[key]`` as an int, or ``default`` when missing or unparsable."""
    try:
        return int(h.get(key) or default)
    except (TypeError, ValueError, AttributeError):
        return default


def _ok_site_id(sid: str) -> bool:
    """Return True when ``sid`` is a non-empty string matching ``SITE_ID_RE``."""
    return isinstance(sid, str) and bool(sid) and bool(SITE_ID_RE.match(sid))


def _meta_key(sid: str) -> str:
    """Redis key of the hash holding a site's metadata."""
    return f"site:{sid}:meta"


def _site_tags_key(sid: str) -> str:
    """Redis key of the set holding a site's tags."""
    return f"site:{sid}:tags"


def _tag_sites_key(tag: str) -> str:
    """Redis key of the set holding the site ids carrying ``tag``."""
    return f"tag:{tag}:sites"


def _hgetall_many(keys: list[str]) -> list[dict]:
    """Fetch several hashes in one round-trip.

    Returns one dict per key, in order; a missing key yields an empty dict.
    """
    if not keys:
        return []
    pipe = r.pipeline(transaction=False)
    for key in keys:
        pipe.hgetall(key)
    return [h or {} for h in pipe.execute()]


def _derive_site_state(last_ts: int, now_ts: int | None = None) -> str:
    """Derive a site state from its last contact time.

    Returns:
        "online"  : last contact within the last 90 minutes
        "recent"  : older than 90 minutes but within the last 24 hours
        "offline" : older than 24 hours, or no timestamp
    """
    if now_ts is None:
        now_ts = int(time.time())

    if not last_ts or last_ts <= 0:
        return "offline"

    # Clock skew / future timestamps count as "just now".
    age = max(0, now_ts - last_ts)
    if age <= ONLINE_SECS:
        return "online"
    if age <= RECENT_SECS:
        return "recent"
    return "offline"


def _safe_json_post(endpoint: str, payload: dict, timeout: int = 10):
    """POST ``payload`` as JSON to ``API_URL/endpoint``.

    Returns ``(upstream, None)`` on success, where ``upstream`` is always a
    dict (empty when the body is not a JSON object), or ``(None, error)``
    where ``error`` is a ``(response, 502)`` tuple ready to be returned from
    a Flask view.
    """
    try:
        resp = requests.post(
            f"{API_URL}/{endpoint}",
            headers={
                "X-API-Key": API_KEY,
                "Content-Type": "application/json",
            },
            json=payload,
            timeout=timeout,
        )
    except requests.RequestException as e:
        return None, (jsonify({"ok": False, "error": f"upstream error: {e}"}), 502)

    if resp.status_code != 200:
        return None, (jsonify({
            "ok": False,
            "status_code": resp.status_code,
            "body": resp.text,
        }), 502)

    try:
        upstream = resp.json()
    except ValueError:
        upstream = {}
    # Callers use .get(); a JSON list/string/number would crash them.
    if not isinstance(upstream, dict):
        upstream = {}
    return upstream, None


def _infer_passed(upstream: dict, default: bool = True) -> bool:
    """Decide whether an upstream check passed.

    A boolean ``ok`` field wins. Otherwise ``status`` is searched for the
    word "ok"/"okay" or the substrings "pass", "good", "within". With no
    status at all, ``default`` is returned.
    """
    if isinstance(upstream.get("ok"), bool):
        return upstream["ok"]

    status = str(upstream.get("status", "")).lower()
    if not status:
        return default
    if _OK_WORD_RE.search(status):
        return True
    return any(word in status for word in ("pass", "good", "within"))


def _parse_limit(raw, default: int = DEFAULT_GRID_LIMIT, maximum: int = MAX_GRID_LIMIT) -> int:
    """Parse a ``limit`` query value, clamped to ``1..maximum``.

    Missing or non-numeric input falls back to ``default`` instead of raising.
    """
    try:
        value = int(raw)
    except (TypeError, ValueError):
        return default
    return max(1, min(value, maximum))


@app.get("/")
def root():
    """Serve the single-page UI from the ``templates`` directory."""
    return send_from_directory("templates", "index.html")


@app.get("/api/site-grid")
def site_grid():
    """List site summaries, optionally filtered.

    Query parameters: ``q`` (substring of site id or name), ``state``
    (compared against the state derived from last contact), ``port``,
    ``tag`` and ``limit`` (default 2000, capped at 5000).
    """
    q = (request.args.get("q") or "").strip().lower()
    requested_state = (request.args.get("state") or "").strip().lower()
    port = (request.args.get("port") or "").strip().lower()
    tag = (request.args.get("tag") or "").strip()
    limit = _parse_limit(request.args.get("limit"))

    now_ts = int(time.time())
    items = []

    # Use scan_iter instead of keys for better Redis behavior at scale
    for k in r.scan_iter("site:*:summary"):
        parts = k.split(":")
        if len(parts) < 3:
            continue
        sid = parts[1]

        # One round-trip per site for summary, metadata and tags.
        pipe = r.pipeline(transaction=False)
        pipe.hgetall(k)
        pipe.hgetall(_meta_key(sid))
        pipe.smembers(_site_tags_key(sid))
        data, meta, tag_set = pipe.execute()
        data = data or {}
        meta = meta or {}

        site_name = meta.get("name", "")
        tags = sorted(tag_set or ())

        # Search site_id and site_name
        if q and q not in sid.lower() and q not in site_name.lower():
            continue
        if port and (data.get("port") or "").lower() != port:
            continue
        if tag and tag not in tags:
            continue

        last_ts = _hget_int(data, "last_contact_ts", 0)
        derived_state = _derive_site_state(last_ts, now_ts)

        # Filter using derived state, not stored redis state
        if requested_state and derived_state != requested_state:
            continue

        age_seconds = None if last_ts <= 0 else max(0, now_ts - last_ts)

        items.append({
            "site_id": sid,
            "site_name": site_name,
            "site_tags": tags,
            "state": derived_state,
            "port": data.get("port"),
            "last_contact_ts": last_ts if last_ts > 0 else None,
            "last_contact_age_sec": age_seconds,
            "last_contact_age_min": (age_seconds // 60) if age_seconds is not None else None,
            "num_gateways": _hget_int(data, "num_gateways", 0),
            "num_devices": _hget_int(data, "num_devices", 0),
            "num_inactive_24h": _hget_int(data, "num_inactive_24h", 0),
        })

        # NOTE(review): the cap is applied in scan order, before sorting, so
        # a truncated result is not necessarily the most recent sites.
        if len(items) >= limit:
            break

    items.sort(key=lambda x: x["last_contact_ts"] or 0, reverse=True)
    return jsonify(items)


@app.get("/api/devices")
def devices():
    """Optional: list devices for a given site_id."""
    sid = request.args.get("site_id")
    if not sid:
        return jsonify({"error": "site_id required"}), 400

    # Sets are unordered; make a stable order for the UI
    dids = sorted(r.smembers(f"site:{sid}:devices"))

    # Fetch every device hash and its ":latest" hash in one round-trip;
    # results come back interleaved as [dev0, latest0, dev1, latest1, ...].
    hashes = _hgetall_many(
        [key for d in dids for key in (f"device:{d}", f"device:{d}:latest")]
    )

    out = []
    for d, row, lat_h in zip(dids, hashes[0::2], hashes[1::2]):
        type_code = row.get("device_type_code")
        if type_code in cr123_types:
            device_class = "non_recharge"
        elif type_code in rechargables:
            device_class = "recharge"
        else:
            device_class = ""

        out.append({
            "device_id": d,
            "site_id": row.get("site_id"),
            "local_address": row.get("local_address"),
            "last_voltage_ts": _hget_int(row, "last_voltage_ts", 0),
            "last_gateway_id": row.get("last_gateway_id"),
            "device_type": row.get("device_type") or "",
            "device_class": device_class,
            "device_type_code": type_code or "",
            "rssi": lat_h.get("rssi") or "",
            "voltage": lat_h.get("voltage") or "",
        })
    return jsonify(out)


def _cached_dataformats(did: str, dev_type_code: str) -> dict:
    """Collect cached data-format (DF) values for one device.

    DFs configured for the device type in ``DF_BY_TYPE`` are tried first;
    only when none of them is cached does it fall back to every DF listed in
    the device's ``dataformats`` set.
    """
    # 1) Try configured DFs first
    dfs_cfg = [int(x) for x in DF_BY_TYPE.get(dev_type_code, [])]
    cfg_hashes = _hgetall_many([f"device:{did}:df:{df}" for df in dfs_cfg])
    per_df = {
        str(df): {
            "value": h.get("value", ""),
            "optional": h.get("optional", ""),
            "ts": h.get("ts", ""),
            "type_code": h.get("type_code", dev_type_code),
            "cached": True,
        }
        for df, h in zip(dfs_cfg, cfg_hashes)
        if h
    }
    if per_df:
        return per_df

    # 2) Fall back to any seen DFs
    seen = sorted(
        r.smembers(f"device:{did}:dataformats"),
        key=lambda x: int(x) if str(x).isdigit() else 0,
    )
    seen_hashes = _hgetall_many([f"device:{did}:df:{df}" for df in seen])
    for df, h in zip(seen, seen_hashes):
        # NOTE(review): "10" is reported when the DF hash is missing; this
        # looks like a debugging placeholder - confirm what the UI expects.
        per_df[str(df)] = {
            "value": h.get("value", "") if h else "10",
            "optional": h.get("optional", "") if h else "10",
            "ts": h.get("ts", "") if h else "10",
            "type_code": h.get("type_code", dev_type_code),
            "cached": bool(h),
        }
    return per_df


@app.route("/api/device-cache", methods=["POST"])
def device_cache():
    """
    Body: {"device_ids": ["AA-BB-CC-DD", ...]}
    Returns core device info + cached DF values based on device type code.
    """
    data = request.get_json(silent=True)
    dids = data.get("device_ids") if isinstance(data, dict) else None
    if not isinstance(dids, list) or not dids:
        return jsonify({"ok": False, "error": "device_ids required"}), 400
    # Ids become Redis key parts and dict keys, so they must be plain strings.
    if not all(isinstance(d, str) and d for d in dids):
        return jsonify({"ok": False, "error": "device_ids must be non-empty strings"}), 400

    # Device hash and ":latest" hash for every id, in one round-trip.
    hashes = _hgetall_many(
        [key for did in dids for key in (f"device:{did}", f"device:{did}:latest")]
    )

    devices_out = []
    df_map = {}
    for did, dev_h, lat_h in zip(dids, hashes[0::2], hashes[1::2]):
        app.logger.debug("device %s latest hash: %s", did, lat_h)

        dev_type_code = dev_h.get("device_type_code") or ""
        devices_out.append({
            "device_id": did,
            "site_id": dev_h.get("site_id") or lat_h.get("siteID") or "",
            "device_type": dev_h.get("device_type") or "",
            "device_type_code": dev_type_code,
            "local_address": dev_h.get("local_address") or lat_h.get("localAddress") or "",
            "latest_ts": _hget_int(dev_h, "last_voltage_ts", _hget_int(lat_h, "timestamp", 0)),
            "rssi": lat_h.get("rssi") or "",
            "voltage": lat_h.get("voltage") or "",
        })
        df_map[did] = _cached_dataformats(did, dev_type_code)

    return jsonify({"ok": True, "devices": devices_out, "df_map": df_map})


@app.get("/api/site-meta")
def get_site_meta():
    """Return the stored name and sorted tags for ``site_id``."""
    sid = (request.args.get("site_id") or "").strip()
    if not _ok_site_id(sid):
        return jsonify({"error": "invalid or missing site_id"}), 400

    meta = r.hgetall(_meta_key(sid)) or {}
    tags = sorted(r.smembers(_site_tags_key(sid)))
    return jsonify({
        "site_id": sid,
        "name": meta.get("name", ""),
        "tags": tags,
    })


@app.post("/api/site-meta")
def post_site_meta():
    """Set a site's name and replace its tag set.

    Body: {"site_id": str, "name": str, "tags": [str, ...]}. Tags are
    stripped and de-duplicated; the per-tag reverse index sets are updated
    for tags added and removed.
    """
    try:
        payload = request.get_json(force=True, silent=False) or {}
    except Exception:
        return jsonify({"error": "invalid JSON"}), 400
    if not isinstance(payload, dict):
        return jsonify({"error": "invalid JSON"}), 400

    raw_sid = payload.get("site_id")
    raw_name = payload.get("name")
    sid = raw_sid.strip() if isinstance(raw_sid, str) else ""
    tags = payload.get("tags") or []

    print(f"Trying to add tags {tags} for site {sid}.")

    if not _ok_site_id(sid):
        return jsonify({"error": "invalid or missing site_id"}), 400
    if raw_name is not None and not isinstance(raw_name, str):
        return jsonify({"error": "name must be a string"}), 400
    if not isinstance(tags, list) or not all(isinstance(t, str) and t.strip() for t in tags):
        return jsonify({"error": "tags must be a list of non-empty strings"}), 400

    name = (raw_name or "").strip()
    # Strip and de-duplicate while preserving the caller's order.
    norm_tags = list(dict.fromkeys(t.strip() for t in tags))

    site_meta_key = _meta_key(sid)
    site_tags_key = _site_tags_key(sid)

    old_tags = set(r.smembers(site_tags_key))
    new_tags = set(norm_tags)

    pipe = r.pipeline()
    pipe.hset(site_meta_key, mapping={"name": name})
    for t in new_tags - old_tags:
        pipe.sadd(site_tags_key, t)
        pipe.sadd(_tag_sites_key(t), sid)
    for t in old_tags - new_tags:
        pipe.srem(site_tags_key, t)
        pipe.srem(_tag_sites_key(t), sid)
    pipe.execute()

    return jsonify({"site_id": sid, "name": name, "tags": norm_tags})
@app.get("/api/site-bundle")
def site_bundle():
    """Return the cached bundle fields (firmware, ICCID, IMEI, ...) for a site."""
    sid = (request.args.get("site_id") or "").strip()
    if not sid:
        return jsonify({"error": "site_id required"}), 400

    data = r.hgetall(f"site:{sid}:bundle") or {}
    return jsonify({
        "site_id": sid,
        "firmware_version": data.get("firmware_version", ""),
        "error_code": data.get("error_code", ""),
        "iccid": data.get("iccid", ""),
        "imei": data.get("imei", ""),
        "iccid_str": data.get("iccid_str", ""),
        "imei_str": data.get("imei_str", ""),
        # Tolerates a non-numeric stored value instead of raising.
        "bundle_updated_at": _hget_int(data, "bundle_updated_at", 0),
    })


def _run_check(endpoint: str, label: str):
    """Shared preamble of the ``/internal/check-*`` views.

    Reads ``did`` from the JSON body and POSTs ``{"did": did}`` to the
    upstream ``endpoint``. Returns ``(upstream_dict, None)`` on success or
    ``(None, error)`` where ``error`` is a ``(response, status)`` tuple to
    return from the view as-is.
    """
    data = request.get_json(silent=True)
    # A JSON list/string body has no .get(); treat it as "did missing".
    did = data.get("did") if isinstance(data, dict) else None
    if not did:
        return None, (jsonify({"ok": False, "error": "did required"}), 400)

    upstream, err = _safe_json_post(endpoint, {"did": did})
    if err:
        return None, err
    if not isinstance(upstream, dict):
        upstream = {}

    print(f"{label} backend response: ", upstream)
    return upstream, None


def _status_is_good(upstream: dict) -> bool:
    """True when the upstream ``status`` field is exactly "good" (any case)."""
    return str(upstream.get("status", "")).lower() == "good"


@app.route("/internal/check-signal", methods=["POST"])
def check_signal():
    """Proxy the upstream ``rssi_status`` check for the device in the body."""
    upstream, err = _run_check("rssi_status", "RSSI")
    if err:
        return err

    latest = upstream.get("latest", "unknown")
    if _status_is_good(upstream):
        explain = f"RSSI and device signal are GOOD with latest reading being {latest}."
    else:
        explain = f"RSSI and device signal are WEAK with latest reading being {latest}."

    return jsonify({
        "ok": _infer_passed(upstream, default=True),
        "raw": upstream,
        "explain": explain,
        "metrics": {"RSSI": latest},
    })


@app.route("/internal/check-strain", methods=["POST"])
def check_strain():
    """Proxy the upstream ``strain_status`` check for the device in the body."""
    upstream, err = _run_check("strain_status", "Strain")
    if err:
        return err

    latest = upstream.get("latest", "unknown")
    i2_median = upstream.get("i2_median", "unknown")
    if _status_is_good(upstream):
        explain = (
            f"Strain reading quality is GOOD with latest raw strain reading being {latest} "
            f"and infofield2 within range -90 +/- 20 counts at {i2_median}."
        )
    else:
        explain = (
            f"Strain quality is not good and the median infofield2 value is out of range "
            f"of -90 +/- 20 counts at {i2_median}."
        )

    return jsonify({
        "ok": _infer_passed(upstream, default=True),
        "raw": upstream,
        "explain": explain,
        "metrics": {"Strain": latest, "Infofield2": i2_median},
    })


@app.route("/internal/check-hpa", methods=["POST"])
def check_hpa():
    """Proxy the upstream ``hpa_status`` check and report the trigger threshold.

    The threshold is read from upstream field ``i2`` when
    ``firmware_version`` is at least 13, otherwise from ``i1``.
    """
    upstream, err = _run_check("hpa_status", "HPA")
    if err:
        return err

    latest = upstream.get("latest", "unknown")
    if _status_is_good(upstream):
        explain = f"Accelerometer readings are GOOD with latest reading for HPA-Z being {latest}."
    else:
        explain = f"Accelerometer readings are POOR with latest reading for HPA-Z being {latest}."

    try:
        firmware_version = float(upstream.get("firmware_version", 0) or 0)
    except (TypeError, ValueError):
        firmware_version = 0.0

    threshold_field = "i2" if firmware_version >= 13 else "i1"
    try:
        accel_threshold = float(upstream.get(threshold_field, 0) or 0)
    except (TypeError, ValueError):
        accel_threshold = 0.0

    # NOTE(review): a missing/unparsable threshold becomes 0.0 and therefore
    # also triggers this message; the 50 vs 180 mg figures look inconsistent -
    # confirm the intended limits.
    if accel_threshold < 50:
        explain = "Trigger threshold is too low. Consider raising to at least 180 mg."

    return jsonify({
        "ok": _infer_passed(upstream, default=True),
        "raw": upstream,
        "explain": explain,
        "metrics": {
            "HPA-Z (latest)": latest,
            "Threshold": f"{accel_threshold} mg",
        },
    })


@app.route("/internal/check-wls", methods=["POST"])
def check_wls():
    """Proxy the upstream ``wls_status`` check for the device in the body."""
    upstream, err = _run_check("wls_status", "WLS")
    if err:
        return err

    latest = upstream.get("latest", "unknown")
    if _status_is_good(upstream):
        explain = f"Water level readings are GOOD with latest reading for level being {latest}."
    else:
        explain = f"Water level readings are POOR with latest reading for level being {latest}."

    return jsonify({
        "ok": _infer_passed(upstream, default=True),
        "raw": upstream,
        "explain": explain,
        "metrics": {"Level": latest},
    })


@app.route("/internal/check-battery", methods=["POST"])
def check_battery():
    """Proxy the upstream ``battery_status`` check; fails when status is absent."""
    upstream, err = _run_check("battery_status", "Battery")
    if err:
        return err

    latest = upstream.get("latest", "unknown")
    if _status_is_good(upstream):
        explain = f"Battery is GOOD with latest reading being {latest}."
    else:
        explain = f"Battery is LOW with latest reading being {latest}."

    return jsonify({
        "ok": _infer_passed(upstream, default=False),
        "raw": upstream,
        "explain": explain,
        "metrics": {"Battery Voltage": latest},
    })


@app.route("/internal/check-charge", methods=["POST"])
def check_charge():
    """Proxy the upstream ``charge_status`` check; fails when status is absent."""
    upstream, err = _run_check("charge_status", "Charge")
    if err:
        return err

    max_current = upstream.get("max", "unknown")
    if _status_is_good(upstream):
        explain = f"Charging is GOOD with maximum reading being {max_current} mA."
    else:
        explain = (
            f"Charging is LOW with maximum reading being {max_current} mA. "
            f"Please try adjusting the solar panel toward south sky."
        )

    return jsonify({
        "ok": _infer_passed(upstream, default=False),
        "raw": upstream,
        "explain": explain,
        "metrics": {"Charging Current": f"{max_current} mA"},
    })


@app.route("/api/device-info", methods=["GET"])
def api_device_info():
    """
    Internal proxy for retrieving a device's type information.
    Frontend calls /api/device-info?did=15-03-77-44
    This server then POSTs to the upstream Resensys API with the API key.
    """
    did = request.args.get("did")
    if not did:
        return jsonify({"ok": False, "error": "Query parameter 'did' is required"}), 400

    upstream, err = _safe_json_post("device_type", {"did": did})
    if err:
        return err
    # A non-object JSON body has no .get(); treat it as empty.
    if not isinstance(upstream, dict):
        upstream = {}

    device_type = upstream.get("device_type") or upstream.get("type")
    code_name = upstream.get("code_name")

    print(f"Device {did} is {code_name}.")

    return jsonify({
        "ok": True,
        "device_id": did,
        "device_type": device_type,
        "code_name": code_name,
        "raw": upstream,
    })


if __name__ == "__main__":
    # run: python api.py
    # The Werkzeug debugger allows arbitrary code execution, so it must never
    # be on by default while listening on all interfaces. Opt in explicitly
    # with FLASK_DEBUG=1 for local development.
    debug = os.getenv("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=5556, debug=debug)