forked from NomaDamas/k-skill
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfine_dust.py
More file actions
executable file
·542 lines (448 loc) · 17.5 KB
/
Copy pathfine_dust.py
File metadata and controls
executable file
·542 lines (448 loc) · 17.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import pathlib
import sys
import urllib.error
import urllib.parse
import urllib.request
from math import atan2, cos, radians, sin, sqrt, tan
STATION_SERVICE_URL = "http://apis.data.go.kr/B552584/MsrstnInfoInqireSvc"
MEASUREMENT_SERVICE_URL = "http://apis.data.go.kr/B552584/ArpltnInforInqireSvc"
SECRET_NAME = "AIR_KOREA_OPEN_API_KEY"
PROXY_BASE_URL_NAME = "KSKILL_PROXY_BASE_URL"
DEFAULT_PROXY_BASE_URL = "https://k-skill-proxy.nomadamas.org"
WGS84_A = 6378137.0
WGS84_F = 1 / 298.257223563
BESSEL_A = 6377397.155
BESSEL_F = 1 / 299.1528128
AIR_KOREA_TM_LAT0 = radians(38.0)
AIR_KOREA_TM_LON0 = radians(127.0)
AIR_KOREA_TM_FALSE_EASTING = 200000.0
AIR_KOREA_TM_FALSE_NORTHING = 500000.0
AIR_KOREA_TM_SCALE = 1.0
AIR_KOREA_WGS84_TO_BESSEL = (146.43, -507.89, -681.46)
GRADE_LABELS = {
"1": "좋음",
"2": "보통",
"3": "나쁨",
"4": "매우나쁨",
}
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Summarize Air Korea PM10/PM2.5 data from location or fallback hints.",
)
subparsers = parser.add_subparsers(dest="command", required=True)
report = subparsers.add_parser("report", help="build a PM10/PM2.5 report")
report.add_argument("--lat", type=float, help="WGS84 latitude")
report.add_argument("--lon", type=float, help="WGS84 longitude")
report.add_argument("--region-hint", help="fallback region/administrative-area hint")
report.add_argument("--station-name", help="explicit station name fallback")
report.add_argument("--station-file", help="offline station JSON fixture")
report.add_argument("--measurement-file", help="offline measurement JSON fixture")
report.add_argument("--json", action="store_true", help="print JSON instead of text")
return parser.parse_args(argv)
def load_json_file(path: str | os.PathLike[str]) -> dict:
return json.loads(pathlib.Path(path).read_text(encoding="utf-8"))
def extract_items(payload: dict | list) -> list[dict]:
if isinstance(payload, list):
return payload
response = payload.get("response", {})
body = response.get("body", {})
items = body.get("items", [])
if isinstance(items, dict):
return [items]
if isinstance(items, list):
return items
return []
def to_float(raw: object) -> float | None:
if raw in (None, "", "-"):
return None
try:
return float(str(raw))
except ValueError:
return None
def squared_distance(lat_a: float, lon_a: float, lat_b: float, lon_b: float) -> float:
return (lat_a - lat_b) ** 2 + (lon_a - lon_b) ** 2
def meridional_arc(phi: float, *, semi_major_axis: float, eccentricity_squared: float) -> float:
e2 = eccentricity_squared
return semi_major_axis * (
(1 - e2 / 4 - 3 * e2**2 / 64 - 5 * e2**3 / 256) * phi
- (3 * e2 / 8 + 3 * e2**2 / 32 + 45 * e2**3 / 1024) * sin(2 * phi)
+ (15 * e2**2 / 256 + 45 * e2**3 / 1024) * sin(4 * phi)
- (35 * e2**3 / 3072) * sin(6 * phi)
)
def wgs84_to_bessel(lat: float, lon: float) -> tuple[float, float]:
dx, dy, dz = AIR_KOREA_WGS84_TO_BESSEL
source_e2 = 2 * WGS84_F - WGS84_F**2
target_e2 = 2 * BESSEL_F - BESSEL_F**2
lat_rad = radians(lat)
lon_rad = radians(lon)
sin_lat = sin(lat_rad)
cos_lat = cos(lat_rad)
prime_vertical_radius = WGS84_A / sqrt(1 - source_e2 * sin_lat * sin_lat)
x = prime_vertical_radius * cos_lat * cos(lon_rad) + dx
y = prime_vertical_radius * cos_lat * sin(lon_rad) + dy
z = prime_vertical_radius * (1 - source_e2) * sin_lat + dz
lon_bessel = atan2(y, x)
horizontal = sqrt(x * x + y * y)
lat_bessel = atan2(z, horizontal * (1 - target_e2))
for _ in range(8):
sin_lat_bessel = sin(lat_bessel)
bessel_radius = BESSEL_A / sqrt(1 - target_e2 * sin_lat_bessel * sin_lat_bessel)
next_lat = atan2(z + target_e2 * bessel_radius * sin_lat_bessel, horizontal)
if abs(next_lat - lat_bessel) < 1e-14:
lat_bessel = next_lat
break
lat_bessel = next_lat
return lat_bessel, lon_bessel
def wgs84_to_air_korea_tm(lat: float, lon: float) -> tuple[float, float]:
lat_rad, lon_rad = wgs84_to_bessel(lat, lon)
bessel_e2 = 2 * BESSEL_F - BESSEL_F**2
second_eccentricity_squared = bessel_e2 / (1 - bessel_e2)
sin_lat = sin(lat_rad)
cos_lat = cos(lat_rad)
tan_lat = tan(lat_rad)
prime_vertical_radius = BESSEL_A / sqrt(1 - bessel_e2 * sin_lat * sin_lat)
tan_squared = tan_lat * tan_lat
curvature = second_eccentricity_squared * cos_lat * cos_lat
A = (lon_rad - AIR_KOREA_TM_LON0) * cos_lat
meridional = meridional_arc(lat_rad, semi_major_axis=BESSEL_A, eccentricity_squared=bessel_e2)
meridional_origin = meridional_arc(
AIR_KOREA_TM_LAT0,
semi_major_axis=BESSEL_A,
eccentricity_squared=bessel_e2,
)
tm_x = AIR_KOREA_TM_FALSE_EASTING + AIR_KOREA_TM_SCALE * prime_vertical_radius * (
A
+ (1 - tan_squared + curvature) * A**3 / 6
+ (5 - 18 * tan_squared + tan_squared**2 + 72 * curvature - 58 * second_eccentricity_squared) * A**5 / 120
)
tm_y = AIR_KOREA_TM_FALSE_NORTHING + AIR_KOREA_TM_SCALE * (
meridional
- meridional_origin
+ prime_vertical_radius
* tan_lat
* (
A**2 / 2
+ (5 - tan_squared + 9 * curvature + 4 * curvature**2) * A**4 / 24
+ (61 - 58 * tan_squared + tan_squared**2 + 600 * curvature - 330 * second_eccentricity_squared)
* A**6
/ 720
)
)
return tm_x, tm_y
def pick_station(
station_items: list[dict],
*,
lat: float | None = None,
lon: float | None = None,
region_hint: str | None = None,
station_name: str | None = None,
) -> dict:
if not station_items:
raise SystemExit("측정소 후보가 없습니다.")
if station_name:
exact_match = next((item for item in station_items if item.get("stationName") == station_name), None)
if exact_match:
return exact_match
partial_match = next(
(
item
for item in station_items
if station_name in str(item.get("stationName", "")) or station_name in str(item.get("addr", ""))
),
None,
)
if partial_match:
return partial_match
if lat is not None and lon is not None:
candidates = []
for item in station_items:
item_lat = to_float(item.get("dmX"))
item_lon = to_float(item.get("dmY"))
if item_lat is None or item_lon is None:
continue
candidates.append((squared_distance(lat, lon, item_lat, item_lon), item))
if candidates:
candidates.sort(key=lambda pair: pair[0])
return candidates[0][1]
if region_hint:
tokens = sorted({token for token in region_hint.split() if token}, key=len, reverse=True)
for token in tokens:
station_name_match = next(
(item for item in station_items if token in str(item.get("stationName", ""))),
None,
)
if station_name_match:
return station_name_match
address_match = next(
(item for item in station_items if token in str(item.get("addr", ""))),
None,
)
if address_match:
return address_match
return station_items[0]
def resolve_station(
station_items: list[dict],
*,
lat: float | None = None,
lon: float | None = None,
region_hint: str | None = None,
station_name: str | None = None,
) -> dict:
if station_items:
return pick_station(
station_items,
lat=lat,
lon=lon,
region_hint=region_hint,
station_name=station_name,
)
if station_name:
return {"stationName": station_name, "addr": None}
raise SystemExit("측정소 후보가 없습니다.")
def find_measurement(measurement_items: list[dict], station_name: str) -> dict:
exact_match = next((item for item in measurement_items if item.get("stationName") == station_name), None)
if exact_match:
return exact_match
partial_match = next(
(item for item in measurement_items if station_name in str(item.get("stationName", ""))),
None,
)
if partial_match:
return partial_match
raise SystemExit(f"측정값 응답에서 측정소 '{station_name}' 를 찾지 못했습니다.")
def grade_to_label(raw_grade: object, *, pollutant: str, value: object) -> str:
raw_text = str(raw_grade) if raw_grade not in (None, "") else ""
if raw_text in GRADE_LABELS:
return GRADE_LABELS[raw_text]
numeric_value = to_float(value)
if numeric_value is None:
return "정보없음"
thresholds = {
"pm10": [(30, "좋음"), (80, "보통"), (150, "나쁨")],
"pm25": [(15, "좋음"), (35, "보통"), (75, "나쁨")],
}[pollutant]
for threshold, label in thresholds:
if numeric_value <= threshold:
return label
return "매우나쁨"
def build_report(
*,
station_items: list[dict],
measurement_items: list[dict],
lat: float | None = None,
lon: float | None = None,
region_hint: str | None = None,
station_name: str | None = None,
lookup_mode: str | None = None,
selected_station: dict | None = None,
) -> dict:
station = selected_station or resolve_station(
station_items,
lat=lat,
lon=lon,
region_hint=region_hint,
station_name=station_name,
)
measurement = find_measurement(measurement_items, station["stationName"])
resolved_lookup_mode = lookup_mode or ("coordinates" if lat is not None and lon is not None else "fallback")
return {
"station_name": station["stationName"],
"station_address": station.get("addr"),
"lookup_mode": resolved_lookup_mode,
"measured_at": measurement.get("dataTime"),
"pm10": {
"value": str(measurement.get("pm10Value", "-")),
"grade": grade_to_label(
measurement.get("pm10Grade"),
pollutant="pm10",
value=measurement.get("pm10Value"),
),
},
"pm25": {
"value": str(measurement.get("pm25Value", "-")),
"grade": grade_to_label(
measurement.get("pm25Grade"),
pollutant="pm25",
value=measurement.get("pm25Value"),
),
},
"khai_grade": "정보없음"
if measurement.get("khaiGrade") in (None, "")
else grade_to_label(
measurement.get("khaiGrade"),
pollutant="pm10",
value=measurement.get("pm10Value"),
),
}
def build_missing_secret_message() -> str:
return (
f"이 작업에는 {SECRET_NAME} 환경변수가 필요합니다.\n"
"환경변수가 설정되어 있지 않으면 ~/.config/k-skill/secrets.env 에 추가하거나\n"
"에이전트의 secret vault에서 주입해 주세요."
)
def get_required_secret() -> str:
value = os.environ.get(SECRET_NAME)
if not value or value == "replace-me":
raise SystemExit(build_missing_secret_message())
return value
def get_proxy_base_url() -> str | None:
value = os.environ.get(PROXY_BASE_URL_NAME)
if value and value.lower() in {"off", "false", "0", "disable", "disabled", "none"}:
return None
if value and value != "replace-me":
return value.rstrip("/")
return DEFAULT_PROXY_BASE_URL
def read_json_response(request: urllib.request.Request | str) -> dict:
try:
with urllib.request.urlopen(request, timeout=20) as response:
return json.load(response)
except urllib.error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
try:
payload = json.loads(body)
except json.JSONDecodeError:
payload = None
message = payload.get("message") if isinstance(payload, dict) else None
if isinstance(payload, dict) and payload.get("error") == "ambiguous_location":
candidates = payload.get("candidate_stations") or []
sido_name = payload.get("sido_name")
detail = [message or "단일 측정소를 확정하지 못했습니다."]
if sido_name:
detail.append(f"시도: {sido_name}")
if candidates:
detail.append(f"후보 측정소: {', '.join(candidates)}")
detail.append("위 후보 중 정확한 측정소명으로 --station-name 재조회하세요.")
raise SystemExit("\n".join(detail)) from exc
raise SystemExit(message or f"요청이 실패했습니다: HTTP {exc.code}") from exc
def fetch_json(url: str, params: dict[str, object]) -> dict:
query = urllib.parse.urlencode({key: value for key, value in params.items() if value is not None})
request_url = f"{url}?{query}"
return read_json_response(request_url)
def fetch_proxy_report(args: argparse.Namespace) -> dict | None:
base_url = get_proxy_base_url()
if not base_url or args.station_file or args.measurement_file:
return None
params: dict[str, object] = {}
if args.lat is not None:
params["lat"] = args.lat
if args.lon is not None:
params["lon"] = args.lon
if args.region_hint:
params["regionHint"] = args.region_hint
if args.station_name:
params["stationName"] = args.station_name
query = urllib.parse.urlencode(params)
request = urllib.request.Request(f"{base_url}/v1/fine-dust/report?{query}")
return read_json_response(request)
def fetch_station_lookup(args: argparse.Namespace) -> tuple[dict, str]:
if args.station_file:
return load_json_file(args.station_file), "coordinates" if args.lat is not None and args.lon is not None else "fallback"
service_key = get_required_secret()
common = {
"serviceKey": service_key,
"returnType": "json",
"numOfRows": 50,
"pageNo": 1,
}
if args.lat is not None and args.lon is not None:
tm_x, tm_y = wgs84_to_air_korea_tm(args.lat, args.lon)
nearby_payload = fetch_json(
f"{STATION_SERVICE_URL}/getNearbyMsrstnList",
{
**common,
"numOfRows": 10,
"tmX": tm_x,
"tmY": tm_y,
},
)
if extract_items(nearby_payload):
return nearby_payload, "coordinates"
if args.region_hint or args.station_name:
return (
fetch_json(
f"{STATION_SERVICE_URL}/getMsrstnList",
{
**common,
"addr": args.region_hint,
"stationName": args.station_name,
},
),
"fallback",
)
raise SystemExit("위도/경도 또는 region fallback 이 필요합니다.")
def fetch_station_payload(args: argparse.Namespace) -> dict:
payload, _ = fetch_station_lookup(args)
return payload
def fetch_measurement_payload(args: argparse.Namespace, station_name: str) -> dict:
if args.measurement_file:
return load_json_file(args.measurement_file)
service_key = get_required_secret()
return fetch_json(
f"{MEASUREMENT_SERVICE_URL}/getMsrstnAcctoRltmMesureDnsty",
{
"serviceKey": service_key,
"returnType": "json",
"numOfRows": 100,
"pageNo": 1,
"stationName": station_name,
"dataTerm": "DAILY",
"ver": "1.4",
},
)
def render_text(report: dict) -> str:
return "\n".join(
[
f"측정소: {report['station_name']}",
f"주소: {report['station_address'] or '-'}",
f"조회 시각: {report['measured_at']}",
f"조회 방식: {report['lookup_mode']}",
f"PM10: {report['pm10']['value']} ({report['pm10']['grade']})",
f"PM2.5: {report['pm25']['value']} ({report['pm25']['grade']})",
f"통합대기등급: {report['khai_grade']}",
],
)
def command_report(args: argparse.Namespace) -> None:
proxy_report = fetch_proxy_report(args)
if proxy_report is not None:
if args.json:
print(json.dumps(proxy_report, ensure_ascii=False, indent=2))
return
print(render_text(proxy_report))
return
station_payload, lookup_mode = fetch_station_lookup(args)
station_items = extract_items(station_payload)
station = resolve_station(
station_items,
lat=args.lat,
lon=args.lon,
region_hint=args.region_hint,
station_name=args.station_name,
)
measurement_payload = fetch_measurement_payload(args, station["stationName"])
report = build_report(
station_items=station_items,
measurement_items=extract_items(measurement_payload),
lat=args.lat,
lon=args.lon,
region_hint=args.region_hint,
station_name=station["stationName"],
lookup_mode=lookup_mode,
selected_station=station,
)
if args.json:
print(json.dumps(report, ensure_ascii=False, indent=2))
return
print(render_text(report))
def main(argv: list[str] | None = None) -> int:
args = parse_args(argv)
if args.command == "report":
command_report(args)
return 0
raise SystemExit(f"unsupported command: {args.command}")
if __name__ == "__main__":
sys.exit(main())