Improved Update:
#!/usr/bin/env python3
"""
Copyright as of 2025-12-09 Iwan Serrano
m2mfmt_server.py — Local UDP server emitting GPS records in the requested CSV datastructure (STRICT FORMAT)
CSV schema (13 fields):
RouterID,gps_date,gps_time,gps_use,gps_latitude,gps_NS,gps_longitude,gps_EW,gps_speed,gps_degrees,gps_FS,gps_HDOP,gps_MSL
Strict formatting rules (per spec):
- RouterID: (PRODUCT_ID default = Ro287)
- gps_date: yymmdd (e.g., 251205)
- gps_time: hhmmss.s (one decimal)
- gps_use: two-digit, zero-padded satellites used (00..12)
- gps_latitude: ddmm.mmmmmm (six decimals for minutes)
- gps_NS: 'N' or 'S'
- gps_longitude: dddmm.mmmmmm (six decimals for minutes; 3-digit degrees)
- gps_EW: 'E' or 'W'
- gps_speed: one decimal, km/h
- gps_degrees: one decimal, degrees
- gps_FS: single digit fix-status
- gps_HDOP: one decimal
- gps_MSL: one decimal, meters
Source data:
- $GPRMC provides date, time, latitude/longitude (ddmm.mmmm + NS/EW), speed (knots) and course.
- $GPGGA provides fix-status, satellites used, HDOP and MSL altitude.
Setup on RutOS:
- Save as: /root/m2mfmt/m2mfmt_server.py
mkdir -p /root/m2mfmt
vi /root/m2mfmt/m2mfmt_server.py
chmod +x /root/m2mfmt/m2mfmt_server.py
- WebUI → Services → GPS → NMEA forwarding:
Enabled: ON
Host: 127.0.0.1
Protocol: UDP
Port: 8501
Starup script (System/Maintenance/Custom Scripts):
sleep 5
python3 /root/m2mfmt/m2mfmt_server.py >> /root/m2mfmt.log 2>&1 &
exit 0
Update .py file:
- DELETE: rm /root/m2mfmt/m2mfmt_server.py
- CREATE: vi /root/m2mfmt/m2mfmt_server.py
- MAKE IT EXECUTABLE: chmod +x /root/m2mfmt/m2mfmt_server.py
- EXECUTE: python3 /root/m2mfmt/m2mfmt_server.py
- CHECK IF RUNNING: ps | grep m2mfmt_server.py
- STOP SCRIPT: kill <id>
"""
import os
import socket
import time
import threading
import signal
import sys
from typing import Tuple, List
# =========================
# Configuration parameters
# =========================
BIND_ADDR = "0.0.0.0"
BIND_PORT = 144
NMEA_HOST = "127.0.0.1"
NMEA_PORT = 8501
SOCKET_TIMEOUT = 0.10
HB_INTERVAL_S = 5
MAX_PEERS = 16
LOG_EVERY_SEC = 15
# Router/product identification
PRODUCT_ID = "Ro287_" # default RouterID prefix
IFACE_PREFERENCE = ["eth0","br-lan","wan","wwan0","lan0"]
# Framing that matches the user's working capture.
# If your environment uses a different product token, override via env vars.
FRAME_PREFIX = os.environ.get("FRAME_PREFIX", "Ro287_")
FRAME_SUFFIX = os.environ.get("FRAME_SUFFIX", "")
LINE_ENDING = os.environ.get("LINE_ENDING", "\r\n") # set to "\n" if your client expects LF only
# =========================
# Runtime GPS state
# =========================
gps_state = {
"date_yymmdd": "",
"time_hhmmss": "",
"lat_dm": "",
"NS": "",
"lon_dm": "",
"EW": "",
"speed_kmh": 0.0,
"course_deg": 0.0,
"fix": 0,
"sats": 0,
"hdop": None,
"msl": None,
"last_nmea_ts": 0.0,
}
gps_lock = threading.Lock()
# =========================
# Utility functions
# =========================
def normalize_time_hhmmss_one_decimal(raw: str) -> str:
"""Return hhmmss.s (one decimal) from raw NMEA time string."""
if not raw:
return ""
try:
if "." in raw:
base, frac = raw.split(".", 1)
return f"{base}.{frac[:1]}"
else:
return f"{raw}.0"
except Exception:
return ""
def normalize_dm(dm_str: str, is_lat: bool) -> str:
"""Normalize NMEA ddmm.mmmm to ddmm.mmmmmm (six decimals for minutes).
- Lat: 2-digit degrees, Lon: 3-digit degrees
"""
if not dm_str:
return ""
try:
dm = float(dm_str)
deg = int(dm // 100)
mins = dm - deg * 100
minutes_str = f"{mins:.6f}"
if is_lat:
return f"{deg:02d}{minutes_str}"
else:
return f"{deg:03d}{minutes_str}"
except Exception:
return ""
def pick_iface_mac_last8() -> str:
"""Pick a reasonable interface MAC and return its last 8 hex characters (no colons)."""
candidates = []
for name in IFACE_PREFERENCE:
p = f"/sys/class/net/{name}/address"
if os.path.exists(p):
candidates.append(p)
for name in os.listdir("/sys/class/net"):
if name == "lo":
continue
p = f"/sys/class/net/{name}/address"
if os.path.exists(p) and p not in candidates:
candidates.append(p)
mac_hex = "00000000"
for p in candidates:
try:
with open(p, "r") as f:
mac = f.read().strip()
mac_compact = mac.replace(":", "").lower()
if len(mac_compact) >= 8:
mac_hex = mac_compact[-8:]
break
except Exception:
continue
return mac_hex
def make_router_id() -> str:
return f"R{pick_iface_mac_last8()}"
# Cached RouterID computed at startup
ROUTER_ID = None
# =========================
# NMEA parsing
# =========================
def parse_gprmc(line: str) -> None:
"""$GPRMC,hhmmss.sss,A,lat_dm,NS,lon_dm,EW,spd_knots,course,ddmmyy,...*CS"""
try:
if not line.startswith("$") or "RMC" not in line:
return
body = line.split("*")[0]
f = body.split(",")
time_str = f[1] if len(f) > 1 else ""
status_A = (len(f) > 2 and f[2] == "A")
lat_dm = f[3] if len(f) > 3 else ""
NS = f[4] if len(f) > 4 else ""
lon_dm = f[5] if len(f) > 5 else ""
EW = f[6] if len(f) > 6 else ""
spd_knots = float(f[7]) if len(f) > 7 and f[7] else 0.0
course = float(f[8]) if len(f) > 8 and f[8] else 0.0
date_ddmmyy = f[9] if len(f) > 9 else ""
hhmmss = normalize_time_hhmmss_one_decimal(time_str)
yymmdd = ""
if date_ddmmyy and len(date_ddmmyy) == 6:
dd = date_ddmmyy[0:2]
mm = date_ddmmyy[2:4]
yy = date_ddmmyy[4:6]
yymmdd = f"{yy}{mm}{dd}"
with gps_lock:
if hhmmss:
gps_state["time_hhmmss"] = hhmmss
if yymmdd:
gps_state["date_yymmdd"] = yymmdd
if lat_dm:
gps_state["lat_dm"] = lat_dm
if NS:
gps_state["NS"] = NS
if lon_dm:
gps_state["lon_dm"] = lon_dm
if EW:
gps_state["EW"] = EW
gps_state["speed_kmh"] = spd_knots * 1.852
gps_state["course_deg"] = course
if status_A and gps_state["fix"] == 0:
gps_state["fix"] = 1
gps_state["last_nmea_ts"] = time.time()
except Exception:
pass
def parse_gpgga(line: str) -> None:
"""$GPGGA,hhmmss,lat,N,lon,E,fix,sats,hdop,alt,...*CS"""
try:
if not line.startswith("$") or "GGA" not in line:
return
body = line.split("*")[0]
f = body.split(",")
fix = int(f[6]) if len(f) > 6 and f[6] else 0
sats = int(f[7]) if len(f) > 7 and f[7] else 0
hdop = float(f[8]) if len(f) > 8 and f[8] else None
alt_m = float(f[9]) if len(f) > 9 and f[9] else None
lat_dm = f[2] if len(f) > 2 else ""
NS = f[3] if len(f) > 3 else ""
lon_dm = f[4] if len(f) > 4 else ""
EW = f[5] if len(f) > 5 else ""
with gps_lock:
gps_state["fix"] = fix
gps_state["sats"] = sats
gps_state["hdop"] = hdop
gps_state["msl"] = alt_m
if lat_dm:
gps_state["lat_dm"] = lat_dm
if NS:
gps_state["NS"] = NS
if lon_dm:
gps_state["lon_dm"] = lon_dm
if EW:
gps_state["EW"] = EW
gps_state["last_nmea_ts"] = time.time()
except Exception:
pass
def nmea_listener(stop_event: threading.Event) -> None:
print(f"[NMEA] Binding listener on {NMEA_HOST}:{NMEA_PORT}")
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.bind((NMEA_HOST, NMEA_PORT))
s.settimeout(1.0)
print(f"[NMEA] Listener bound OK on {NMEA_HOST}:{NMEA_PORT}")
except Exception as e:
print(f"[NMEA] Bind failed ({NMEA_HOST}:{NMEA_PORT}): {e}")
print("[NMEA] Server will run with zeroed GPS until forwarding is fixed.")
return
while not stop_event.is_set():
try:
data, _ = s.recvfrom(4096)
text = data.decode(errors="ignore")
for line in text.splitlines():
line = line.strip()
if not line:
continue
if "RMC" in line:
parse_gprmc(line)
elif "GGA" in line:
parse_gpgga(line)
except socket.timeout:
continue
except Exception:
continue
try:
s.close()
except Exception:
pass
print("[NMEA] Listener stopped")
# =========================
# CSV encoder (strict per spec)
# =========================
def build_csv_line() -> str:
"""Return one CSV line with 13 fields strictly formatted per GPSdatastructure."""
with gps_lock:
lat_dm_raw = gps_state["lat_dm"]
lon_dm_raw = gps_state["lon_dm"]
lat_dm = normalize_dm(lat_dm_raw, is_lat=True) if lat_dm_raw else ""
lon_dm = normalize_dm(lon_dm_raw, is_lat=False) if lon_dm_raw else ""
NS = gps_state["NS"]
EW = gps_state["EW"]
date_yymmdd = gps_state["date_yymmdd"]
time_hhmmss = gps_state["time_hhmmss"]
sats = gps_state["sats"]
spd = gps_state["speed_kmh"]
crs = gps_state["course_deg"]
fix = gps_state["fix"]
hdop = gps_state["hdop"]
msl = gps_state["msl"]
router_id = ROUTER_ID or make_router_id()
gps_use = f"{sats:02d}"
gps_speed = f"{spd:.1f}"
gps_degrees = f"{crs:.1f}"
gps_FS = f"{fix:d}"
gps_HDOP = f"{hdop:.1f}" if hdop is not None else ""
gps_MSL = f"{msl:.1f}" if msl is not None else ""
fields = [
router_id,
date_yymmdd or "",
time_hhmmss or "",
gps_use,
lat_dm or "",
NS or "",
lon_dm or "",
EW or "",
gps_speed,
gps_degrees,
gps_FS,
gps_HDOP,
gps_MSL,
]
return ",".join(fields)
# Build framed bytes to send in one UDP datagram
def build_framed_bytes() -> bytes:
payload = build_csv_line().translate({ord("\r"): None, ord("\n"): None})
frame = f"{FRAME_PREFIX}{payload}{FRAME_SUFFIX}{LINE_ENDING}"
b = frame.encode("ascii", errors="ignore")
if not b.startswith(b"Ro287_"):
b = b.lstrip(b"\r\n\t \x00")
if not b.startswith(b"Ro287_"):
b = b"Ro287_" + b
return b
# =========================
# UDP server (request/response + HB)
# =========================
def udp_server(stop_event: threading.Event) -> None:
print(f"[SRV] Binding UDP server on {BIND_ADDR}:{BIND_PORT}")
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
s.bind((BIND_ADDR, BIND_PORT))
s.setblocking(False)
print(f"[SRV] Server bound OK on {BIND_ADDR}:{BIND_PORT}")
except Exception as e:
print(f"[SRV] Bind failed ({BIND_ADDR}:{BIND_PORT}): {e}")
print("[SRV] Tip: switch BIND_PORT or free the port, then restart.")
return
recent_peers: List[Tuple[str, int]] = []
last_hb = 0.0
last_log = 0.0
while not stop_event.is_set():
try:
data, peer = s.recvfrom(2048)
if peer not in recent_peers:
recent_peers.append(peer)
if len(recent_peers) > MAX_PEERS:
recent_peers = recent_peers[-MAX_PEERS:]
# Send one framed line per request
s.sendto(build_framed_bytes(), peer)
except BlockingIOError:
pass
except Exception:
pass
now = time.time()
# Heartbeat: framed line, same as request reply
if HB_INTERVAL_S > 0 and recent_peers and (now - last_hb >= HB_INTERVAL_S):
framed = build_framed_bytes()
for p in list(recent_peers):
try:
s.sendto(framed, p)
except Exception:
try:
recent_peers.remove(p)
except Exception:
pass
last_hb = now
if LOG_EVERY_SEC > 0 and (now - last_log >= LOG_EVERY_SEC):
with gps_lock:
print(
f"[SRV] RID={ROUTER_ID or ''} "
f"peers={len(recent_peers)} "
f"date={gps_state['date_yymmdd'] or ''} time={gps_state['time_hhmmss'] or ''} "
f"lat_dm={normalize_dm(gps_state['lat_dm'], True) or ''} {gps_state['NS'] or ''} "
f"lon_dm={normalize_dm(gps_state['lon_dm'], False) or ''} {gps_state['EW'] or ''} "
f"spd={gps_state['speed_kmh']:.1f}km/h crs={gps_state['course_deg']:.1f} "
f"fix={gps_state['fix']} sats={gps_state['sats']:02d} hdop={gps_state['hdop']} msl={gps_state['msl']}"
)
last_log = now
time.sleep(SOCKET_TIMEOUT)
try:
s.close()
except Exception:
pass
print("[SRV] Server stopped")
# =========================
# Main & graceful shutdown
# =========================
def main() -> None:
stop_event = threading.Event()
# Compute RouterID once at startup
global ROUTER_ID
ROUTER_ID = make_router_id()
print(f"[SYS] RouterID={ROUTER_ID}")
def _sig_handler(sig, _frm):
print(f"[SYS] Signal {sig} received; stopping...")
stop_event.set()
signal.signal(signal.SIGTERM, _sig_handler)
signal.signal(signal.SIGINT, _sig_handler)
# Start NMEA listener first
t_nmea = threading.Thread(target=nmea_listener, args=(stop_event,), daemon=True)
t_nmea.start()
# Initial placeholder console line (before NMEA data is in)
with gps_lock:
print("[SRV] peers=0 date= time= lat_dm= lon_dm= spd=0.0km/h crs=0.0 fix=0 sats=00 hdop=None msl=None")
# Show a sample of how the reply line will look once data flows (uses current state)
try:
sample = build_csv_line()
print("With real data flowing, your replies will look like:")
print(f"{FRAME_PREFIX}{sample}{FRAME_SUFFIX}")
except Exception:
pass
# Server loop
udp_server(stop_event)
stop_event.set()
t_nmea.join(timeout=2.0)
print("[SYS] Exit complete")
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"[SYS] Fatal error: {e}")
sys.exit(1)
Enjoy guys 