-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmain.py
More file actions
4276 lines (3923 loc) · 189 KB
/
main.py
File metadata and controls
4276 lines (3923 loc) · 189 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
astrbot-plugin-tmp-bot
欧卡2TMP查询插件 (版本 1.7.6)
"""
import re
import asyncio
import aiohttp
import json
import os
import re as _re_local
import base64
import socket
import hashlib
import random
import time
from typing import Optional, List, Dict, Tuple, Any
from datetime import datetime, timedelta
# 引入 AstrBot 核心 API
try:
    from astrbot.api.event import filter, AstrMessageEvent, MessageEventResult
    from astrbot.api.star import Context, Star, register, StarTools
    from astrbot.api import logger
    from astrbot.api.message_components import Image, Plain
    # Force INFO level so the bans log lines are visible.
    import logging
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
except ImportError:
    # Minimal stand-ins so this module can still be imported when the AstrBot
    # framework is not installed. They only mirror the names used in this file.
    class _DummyFilter:
        """No-op replacement for the AstrBot ``filter`` decorator factory."""

        class EventMessageType:
            ALL = "ALL"

        def command(self, pattern, **kwargs):
            def decorator(func):
                return func
            return decorator

        def event_message_type(self, _type, **kwargs):
            def decorator(func):
                return func
            return decorator

    filter = _DummyFilter()

    class AstrMessageEvent:
        """Bare-bones message event carrying text, sender and group id."""

        def __init__(self, message_str: str = "", sender_id: str = "0", match=None, group_id: str = None):
            self.message_str = message_str
            self._sender_id = sender_id
            self._match = match
            self._group_id = group_id

        def get_sender_id(self) -> str:
            return self._sender_id

        def get_group_id(self) -> str:
            return self._group_id or ""

        def is_group_message(self) -> bool:
            return bool(self._group_id)

        def is_private_message(self) -> bool:
            return not self._group_id

        async def plain_result(self, msg):
            return msg

        async def chain_result(self, components):
            return components

    MessageEventResult = Any

    class Context:
        pass

    class Star:
        def __init__(self, context: Context = None):
            pass

    def register(*args, **kwargs):
        """Return a decorator that leaves the decorated class unchanged."""
        def deco(cls):
            return cls
        return deco

    class StarTools:
        @staticmethod
        def get_data_dir(name: str):
            return os.path.join(os.getcwd(), name)

    class _Logger:
        """Print-based stand-in covering the logger methods this file calls.

        Accepts ``logging``-style lazy ``%`` arguments, because the plugin
        calls e.g. ``logger.exception("...%s", e)``.
        """

        @staticmethod
        def _emit(level, msg, args):
            if args:
                try:
                    msg = str(msg) % args
                except (TypeError, ValueError):
                    # Arguments do not fit the format string: print them raw.
                    msg = " ".join(str(part) for part in (msg, *args))
            print(f"[{level}]", msg)

        @staticmethod
        def debug(msg, *args, **kwargs):
            _Logger._emit("DEBUG", msg, args)

        @staticmethod
        def info(msg, *args, **kwargs):
            _Logger._emit("INFO", msg, args)

        @staticmethod
        def warning(msg, *args, **kwargs):
            _Logger._emit("WARNING", msg, args)

        @staticmethod
        def error(msg, *args, exc_info=False, **kwargs):
            _Logger._emit("ERROR", msg, args)
            if exc_info:
                import traceback
                traceback.print_exc()

        @staticmethod
        def exception(msg, *args, **kwargs):
            # Same as error() but always prints the active traceback.
            _Logger.error(msg, *args, exc_info=True)

    logger = _Logger()

    # Placeholder message components used when the runtime is missing.
    class Image:
        @staticmethod
        def fromBytes(b: bytes):
            return b

        @staticmethod
        def fromURL(url: str):
            return url

    class Plain:
        def __init__(self, text: str):
            self.text = text
USER_GROUP_MAP = {
'Player': '玩家',
'Retired Legend': '退役',
'Game Developer': '游戏开发者',
'Retired Team Member': '退休团队成员',
'Add-On Team': '附加组件团队',
'Game Moderator': '游戏管理员'
}
PROMODS_SERVER_IDS = {50, 51}
def _translate_user_groups(groups: List[Any]) -> List[str]:
translated: List[str] = []
for g in groups:
if g is None:
continue
key = str(g)
translated.append(USER_GROUP_MAP.get(key, key))
return translated
# --- 辅助函数:格式化时间戳 ---
def _format_timestamp_to_readable(timestamp_str: Optional[str]) -> str:
"""将 TruckersMP API 返回的 UTC 时间戳转换为可读格式 (ISO 8601)。"""
if not timestamp_str:
return "未知"
try:
# TruckersMP V2 返回 ISO 8601 (e.g., "2024-05-28T14:30:00.000Z")
clean_str = timestamp_str.replace('T', ' ').split('.')[0].replace('Z', '')
dt_utc = datetime.strptime(clean_str, '%Y-%m-%d %H:%M:%S')
# 直接显示 UTC 时间,并标注时区
return dt_utc.strftime('%Y-%m-%d %H:%M:%S')
except Exception:
# 兼容性回退
return timestamp_str.split('T')[0] if 'T' in timestamp_str else timestamp_str
# -----------------------------
def _format_timestamp_to_beijing(timestamp_str: Optional[str]) -> str:
"""将 UTC 时间戳转换为北京时间 (UTC+8)。兼容 ISO 8601 和简单格式。"""
if not timestamp_str:
return "未知"
s = str(timestamp_str).strip()
if s.lower().startswith('never'):
return "永久封禁"
try:
clean_str = s.replace('T', ' ').split('.')[0].replace('Z', '')
dt_utc = datetime.strptime(clean_str, '%Y-%m-%d %H:%M:%S')
dt_bj = dt_utc + timedelta(hours=8)
return dt_bj.strftime('%Y-%m-%d %H:%M:%S')
except Exception:
try:
# ISO 8601 with timezone offset, e.g. 2025-12-01T07:55:00+00:00
iso = s.replace('Z', '+00:00')
dt = datetime.fromisoformat(iso)
dt_bj = dt + timedelta(hours=8)
return dt_bj.strftime('%Y-%m-%d %H:%M:%S')
except Exception:
return s
def _cleanup_cn_location_text(text: str) -> str:
s = str(text or "").strip()
if not s:
return s
try:
s = _re_local.sub(r"\s+", " ", s).strip()
s = _re_local.sub(r"^(?:[\[[][^\]]]+[\]]]\s*)+", "", s)
s = _re_local.sub(r"^<[^>]+>\s*", "", s)
s = _re_local.sub(r"^(?:&\s*)?(?:n|v|adj|adv|vt|vi|prep|pron|conj|abbr)[\..]\s*", "", s, flags=_re_local.IGNORECASE)
s = _re_local.sub(r"^(?:\s*(?:&\s*)?(?:n|v|adj|adv|vt|vi|prep|pron|conj|abbr)[\..]\s*)+", "", s, flags=_re_local.IGNORECASE)
s = _re_local.sub(r"^(?:\s*(?:名|动|形|副|介|代|连|数|量|叹|助|冠)(?:词)?[\..::]\s*)+", "", s)
s = _re_local.sub(r"([^)]*)", "", s)
s = _re_local.sub(r"\([^)]*\)", "", s)
for sep in [";", ";", ","]:
if sep in s:
s = s.split(sep, 1)[0]
s = s.strip(" 、,。.;;")
if _re_local.search(r"\s", s):
head = _re_local.split(r"\s+", s, 1)[0]
if _re_local.search(r"[\u4e00-\u9fff]", head) and not _re_local.fullmatch(r"(?:名|动|形|副|介|代|连|数|量|叹|助|冠)(?:词)?[\..::]?", head):
s = head
return s or text
except Exception:
return text
# -----------------------------
# 自定义异常类
class TmpApiException(Exception):
    """Base class for TMP-related errors."""


class PlayerNotFoundException(TmpApiException):
    """The requested player does not exist."""


class SteamIdNotFoundException(TmpApiException):
    """The Steam ID is not linked to a TMP account."""


class NetworkException(Exception):
    """A network request failed (not part of the ``TmpApiException`` tree)."""


class ApiResponseException(TmpApiException):
    """The API answered with an error or an unexpected response."""
@register("tmp-bot", "BGYdook", "欧卡2TMP查询插件", "1.7.4", "https://github.com/BGYdook/astrbot-plugin-tmp-bot")
class TmpBotPlugin(Star):
def __init__(self, context, config=None):  # receives context and config
    """Set up plugin state; performs no network I/O.

    Initialises caches and locks, loads the location translation tables and
    resolves the bindings file path (``config['bind_file']`` or
    ``<cwd>/data/tmp_bindings.json``), creating its directory if needed.
    The HTTP session is created later by ``initialize``.
    """
    super().__init__(context)  # pass context on to the parent class
    self.widget_list = []
    # In a real environment the framework injects session/context etc.
    self.session = None
    self._ready = False
    self.config = config or {}
    self._translate_cache: Dict[str, str] = {}
    self._location_maps_loaded: bool = False
    self._fullmap_cache: Optional[Dict[str, Any]] = None
    self._fullmap_cache_ts: float = 0.0
    self._fullmap_last_fetch_ts: float = 0.0
    self._fullmap_next_fetch_ts: float = 0.0
    self._fullmap_task: Optional[asyncio.Task] = None
    self._fullmap_lock = asyncio.Lock()
    self._fullmap_fetch_lock = asyncio.Lock()
    # Must run after _location_maps_loaded is set: the loader checks it.
    self._load_location_maps()
    try:
        bind_path = self.config.get('bind_file')
        if not bind_path:
            root = os.getcwd()
            bind_path = os.path.join(root, 'data', 'tmp_bindings.json')
        d = os.path.dirname(bind_path)
        if d:
            os.makedirs(d, exist_ok=True)
        self.bind_file = bind_path
    except Exception:
        # Fall back to a file directly in the working directory.
        self.bind_file = os.path.join(os.getcwd(), 'tmp_bindings.json')
    try:
        logger.info("TMP Bot 插件初始化开始")
        # Lightweight init only: avoid network/blocking work at import time.
        # At runtime the framework injects session etc. in on_load/on_start.
        self._ready = True
        logger.info("TMP Bot 插件初始化完成(就绪)")
    except Exception as e:
        self._ready = False
        logger.exception("TMP Bot 插件初始化发生异常,标记为未就绪:%s", e)
# --- 配置读取辅助 ---
def _cfg_bool(self, key: str, default: bool) -> bool:
v = self.config.get(key, default)
return bool(v) if isinstance(v, (bool, int, str)) else default
def _cfg_int(self, key: str, default: int) -> int:
try:
v = self.config.get(key, default)
return int(v)
except Exception:
return default
def _cfg_str(self, key: str, default: str) -> str:
v = self.config.get(key, default)
if v is None:
return default
return str(v)
async def initialize(self):
    """Create the shared aiohttp session used by the HTTP helpers.

    The total request timeout comes from ``api_timeout_seconds`` (default
    10). Also resets the fullmap background task handle to ``None``.
    """
    timeout_sec = self._cfg_int('api_timeout_seconds', 10)
    session_options = {
        # One User-Agent for every request made by the plugin.
        'headers': {'User-Agent': 'astrBot-TMP-Plugin/1.3.59'},
        'timeout': aiohttp.ClientTimeout(total=timeout_sec),
        # Connector restricted to the IPv4 address family.
        'connector': aiohttp.TCPConnector(family=socket.AF_INET),
        # Honour proxy settings from the environment.
        'trust_env': True,
    }
    self.session = aiohttp.ClientSession(**session_options)
    logger.info(f"TMP Bot 插件HTTP会话已创建,超时 {timeout_sec}s")
    self._fullmap_task = None
def _get_fullmap_interval(self) -> int:
v = self._cfg_int('ets2map_fullmap_interval_seconds', 60)
return 60 if v < 60 else v
def _start_fullmap_task(self) -> None:
if self._fullmap_task and not self._fullmap_task.done():
return
self._fullmap_task = asyncio.create_task(self._fullmap_loop())
async def _fullmap_loop(self) -> None:
await asyncio.sleep(self._get_fullmap_interval())
while True:
await self._fetch_fullmap()
await asyncio.sleep(self._get_fullmap_interval())
async def _fetch_fullmap(self) -> None:
    """Download the ets2map fullmap JSON into ``self._fullmap_cache``.

    Does nothing without a session. Calls are rate limited to one attempt
    per polling interval, checked against both the wall clock and the
    monotonic clock. Failures are logged and leave the existing cache
    untouched.
    """
    if not self.session:
        return
    interval = self._get_fullmap_interval()
    async with self._fullmap_fetch_lock:
        now_wall = time.time()
        if now_wall - self._fullmap_last_fetch_ts < interval:
            # Only log the skip while there is no cached data to serve.
            if not self._fullmap_cache:
                logger.info(f"fullmap 拉取跳过(限频): interval={interval}s")
            return
        now_mono = time.monotonic()
        if now_mono < self._fullmap_next_fetch_ts:
            if not self._fullmap_cache:
                logger.info(f"fullmap 拉取跳过(限频): interval={interval}s")
            return
        # Stamp the attempt before the request, so a failed fetch also
        # consumes the interval.
        self._fullmap_next_fetch_ts = now_mono + interval
        self._fullmap_last_fetch_ts = time.time()
        url = "https://tracker.ets2map.com/v3/fullmap"
        try:
            async with self.session.get(url, timeout=self._cfg_int('api_timeout_seconds', 10)) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    # Only a JSON object is accepted as cache content.
                    if isinstance(data, dict):
                        async with self._fullmap_lock:
                            self._fullmap_cache = data
                            self._fullmap_cache_ts = time.time()
                        logger.info("fullmap 拉取成功")
                        return
                # Reached for non-200 responses and for non-dict payloads.
                logger.info(f"fullmap 拉取失败 status={resp.status}")
        except Exception as e:
            logger.error(f"fullmap 拉取异常: {e}")
def _get_fullmap_tile_url(self, map_type: str) -> Optional[str]:
data = self._fullmap_cache or {}
candidates: List[str] = []
def walk(v: Any) -> None:
if isinstance(v, dict):
for val in v.values():
walk(val)
return
if isinstance(v, list):
for val in v:
walk(val)
return
if isinstance(v, str):
s = v.strip()
if s.startswith("http") and "{z}" in s and "{x}" in s and "{y}" in s:
candidates.append(s)
if isinstance(data, dict):
if data.get('Data'):
walk(data.get('Data'))
if data.get('data'):
walk(data.get('data'))
walk(data)
else:
walk(data)
if not candidates:
return None
seen = set()
uniq = []
for c in candidates:
if c in seen:
continue
seen.add(c)
uniq.append(c)
candidates = uniq
if map_type == "promods":
for c in candidates:
if "promods" in c.lower():
return c
for c in candidates:
lc = c.lower()
if "ets" in lc and "promods" not in lc:
return c
return candidates[0]
# --- 工具:头像处理 ---
def _normalize_avatar_url(self, url: Optional[str]) -> Optional[str]:
if not url:
return None
# 去除日志可能引入的反引号、括号、引号,以及误传入的 CQ 片段前缀
u = str(url).strip()
# 清理包装字符
for ch in ('`', '"', "'", '(', ')'):
u = u.strip(ch)
# 如果误传了完整片段,剥离前缀
if u.startswith('[CQ:image,file='):
u = u[len('[CQ:image,file='):]
# 去掉结尾的右括号
if u.endswith(']'):
u = u[:-1]
u = u.strip()
return u or None
async def _get_avatar_base64(self, url: str) -> Optional[str]:
if not self.session:
return None
try:
timeout_sec = self._cfg_int('api_timeout_seconds', 10)
async with self.session.get(url, timeout=timeout_sec) as resp:
if resp.status == 200:
content = await resp.read()
if content:
return base64.b64encode(content).decode('ascii')
return None
except Exception:
return None
async def _get_avatar_bytes(self, url: str) -> Optional[bytes]:
    """Download ``url`` (following redirects) and return the raw body.

    Returns ``None`` without a session, on a non-200 status, on an empty
    body, or when the request raises; each failure is logged.
    """
    if not self.session:
        return None
    try:
        timeout_sec = self._cfg_int('api_timeout_seconds', 10)
        async with self.session.get(url, timeout=timeout_sec, allow_redirects=True) as resp:
            if resp.status != 200:
                logger.info(f"头像下载失败: status={resp.status} url={url}")
                return None
            payload = await resp.read()
            if not payload:
                logger.info(f"头像下载失败: 空内容 status=200 url={url}")
                return None
            return payload
    except Exception as e:
        logger.error(f"头像下载异常: url={url} err={e}", exc_info=False)
        return None
async def _translate_text(self, content: str, cache: bool = True) -> str:
s = (content or "").strip()
if not s:
return content
if not self._cfg_bool('baidu_translate_enable', True):
return content
use_cache = self._cfg_bool('baidu_translate_cache_enable', False)
cache_key = hashlib.md5(s.encode('utf-8')).hexdigest()
if cache and use_cache:
cached = self._translate_cache.get(cache_key)
if cached:
return cached
app_id = self._cfg_str('baidu_translate_app_id', '').strip()
app_key = self._cfg_str('baidu_translate_key', '').strip()
if not app_id or not app_key or not self.session:
return content
try:
salt = str(random.randint(1000, 9999))
sign = hashlib.md5((app_id + s + salt + app_key).encode('utf-8')).hexdigest()
url = "https://fanyi-api.baidu.com/api/trans/vip/translate"
params = {
'q': s,
'from': 'auto',
'to': 'zh',
'appid': app_id,
'salt': salt,
'sign': sign
}
async with self.session.get(url, params=params, timeout=self._cfg_int('api_timeout_seconds', 10)) as resp:
if resp.status == 200:
data = await resp.json()
if isinstance(data, dict) and data.get('trans_result'):
dst = data['trans_result'][0].get('dst')
if isinstance(dst, str) and dst.strip():
translated = dst.strip()
if cache and use_cache:
self._translate_cache[cache_key] = translated
return translated
except Exception:
return content
return content
async def _get_avatar_bytes_with_fallback(self, url: str, tmp_id: Optional[str]) -> Optional[bytes]:
    """Try several TruckersMP avatar URL variants and return the first body.

    Candidates, in order: the normalised ``url``; the same URL with jpg/png
    swapped; for static.truckersmp.com avatar URLs the timestamp-free forms
    in both the ``avatars`` and ``avatarsN`` folders with both extensions;
    and finally the direct URLs built from ``tmp_id``. Duplicates are tried
    once. Returns ``None`` when every candidate fails.
    """
    candidates: List[str] = []
    base = self._normalize_avatar_url(url)
    if base:
        candidates.append(base)
        # Swap jpg <-> png.
        lowered = base.lower()
        if lowered.endswith('.jpg'):
            candidates.append(base[:-4] + '.png')
        elif lowered.endswith('.png'):
            candidates.append(base[:-4] + '.jpg')
        # Parse avatarsN/{id}.{stamp}.{ext} and derive the combinations.
        m = _re_local.search(r"https?://static\.truckersmp\.com/(avatarsN|avatars)/(\d+)(?:\.\d+)?\.(jpg|png)", base, _re_local.IGNORECASE)
        if m:
            folder, pid = m.group(1), m.group(2)
            ext = m.group(3).lower()
            alt_ext = 'png' if ext == 'jpg' else 'jpg'
            other_folder = 'avatars' if folder.lower() == 'avatarsn' else 'avatarsN'
            # Same folder first (timestamp removed), then the other folder.
            for directory in (folder, other_folder):
                for extension in (ext, alt_ext):
                    candidates.append(f"https://static.truckersmp.com/{directory}/{pid}.{extension}")
    # Common direct addresses derived from the TMP id.
    if tmp_id:
        for extension in ('jpg', 'png'):
            for directory in ('avatars', 'avatarsN'):
                candidates.append(f"https://static.truckersmp.com/{directory}/{tmp_id}.{extension}")
    # De-duplicate while keeping order; empty entries are dropped.
    for candidate in dict.fromkeys(c for c in candidates if c):
        b = await self._get_avatar_bytes(candidate)
        logger.info(f"头像下载尝试: url={candidate} -> {'成功' if b else '失败'}")
        if b:
            return b
    return None
# --- 内部工具方法 (保持不变) ---
def _load_bindings(self) -> Dict[str, Any]:
try:
if os.path.exists(self.bind_file):
with open(self.bind_file, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
except Exception as e:
logger.error(f"加载绑定数据失败: {e}")
return {}
def _save_bindings(self, bindings: dict) -> bool:
try:
with open(self.bind_file, 'w', encoding='utf-8') as f:
json.dump(bindings, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
logger.error(f"保存绑定数据失败: {e}")
return False
def _get_bound_tmp_id(self, user_id: str) -> Optional[str]:
bindings = self._load_bindings()
user_binding = bindings.get(user_id)
if isinstance(user_binding, dict):
return user_binding.get('tmp_id')
return user_binding
def _bind_tmp_id(self, user_id: str, tmp_id: str, player_name: str) -> bool:
bindings = self._load_bindings()
bindings[user_id] = {
'tmp_id': tmp_id,
'player_name': player_name,
'bind_time': asyncio.get_event_loop().time()
}
return self._save_bindings(bindings)
def _unbind_tmp_id(self, user_id: str) -> bool:
bindings = self._load_bindings()
if user_id in bindings:
del bindings[user_id]
return self._save_bindings(bindings)
return False
COUNTRY_MAP_EN_TO_CN = {
"germany": "德国",
"de": "德国",
"france": "法国",
"fr": "法国",
"united kingdom": "英国",
"uk": "英国",
"gb": "英国",
"netherlands": "荷兰",
"nl": "荷兰",
"belgium": "比利时",
"be": "比利时",
"poland": "波兰",
"pl": "波兰",
"czech republic": "捷克",
"czechia": "捷克",
"cz": "捷克",
"slovakia": "斯洛伐克",
"sk": "斯洛伐克",
"italy": "意大利",
"it": "意大利",
"spain": "西班牙",
"es": "西班牙",
"portugal": "葡萄牙",
"pt": "葡萄牙",
"switzerland": "瑞士",
"ch": "瑞士",
"austria": "奥地利",
"at": "奥地利",
"hungary": "匈牙利",
"hu": "匈牙利",
"denmark": "丹麦",
"dk": "丹麦",
"sweden": "瑞典",
"se": "瑞典",
"norway": "挪威",
"no": "挪威",
"finland": "芬兰",
"fi": "芬兰",
"estonia": "爱沙尼亚",
"ee": "爱沙尼亚",
"latvia": "拉脱维亚",
"lv": "拉脱维亚",
"lithuania": "立陶宛",
"lt": "立陶宛",
"russia": "俄罗斯",
"ru": "俄罗斯",
"turkey": "土耳其",
"tr": "土耳其",
"romania": "罗马尼亚",
"ro": "罗马尼亚",
"bulgaria": "保加利亚",
"bg": "保加利亚",
"greece": "希腊",
"gr": "希腊",
"united states": "美国",
"usa": "美国",
"us": "美国",
"iceland": "冰岛",
"is": "冰岛",
"svalbard": "斯瓦尔巴群岛",
}
CITY_MAP_EN_TO_CN = {
"calais": "加来",
"duisburg": "杜伊斯堡",
"berlin": "柏林",
"paris": "巴黎",
"london": "伦敦",
"cambridge": "剑桥",
"milano": "米兰",
"milan": "米兰",
"rome": "罗马",
"madrid": "马德里",
"barcelona": "巴塞罗那",
"lisbon": "里斯本",
"rotterdam": "鹿特丹",
"amsterdam": "阿姆斯特丹",
"brussels": "布鲁塞尔",
"prague": "布拉格",
"vienna": "维也纳",
"budapest": "布达佩斯",
"warsaw": "华沙",
"krakow": "克拉科夫",
"akureyri": "阿克雷里",
"burgos": "布尔戈斯",
"praha": "布拉格",
"steinkjer": "斯泰恩谢尔",
"valmiera": "瓦尔米耶拉",
"umeå": "于默奥",
"umea": "于默奥",
"longyearbyen": "朗伊尔城",
"napoli": "那不勒斯",
"sundsvall": "松兹瓦尔",
}
LOCATION_FIX_MAP = {
"kirkenes": "希尔克内斯",
"kirkenes quarry": "希尔克内斯 采石场",
"c-d road": "加莱-杜伊斯堡",
"cd road": "加莱-杜伊斯堡",
"calais-duisburg road": "加莱-杜伊斯堡",
"calais - duisburg": "加莱-杜伊斯堡",
"calais–duisburg": "加莱-杜伊斯堡",
"calais-duisburg": "加莱-杜伊斯堡",
"calais intersection": "加来 交叉口",
"dortmund": "多特蒙德",
"hannover": "汉诺威",
"hamburg": "汉堡",
"strasbourg": "斯特拉斯堡",
"dijon": "第戎",
"reims": "兰斯",
"brussel": "布鲁塞尔",
"aalborg": "奥尔堡",
"kiruna": "基律纳",
"skellefteå": "谢莱夫特奥",
"skelleftea": "谢莱夫特奥",
"ljubjana": "卢布尔雅那",
"ljubljana": "卢布尔雅那",
"nikel": "尼克尔",
"travemünde": "特拉弗明德",
"travemunde": "特拉弗明德",
"zürich": "苏黎世",
"zurich": "苏黎世",
}
def _load_location_maps(self) -> None:
    """Extend the translation maps from the bundled markdown tables.

    Reads ``s1-cities.md`` and ``promods-cities.md`` from the
    ``TruckersMP-citties-name`` directory next to this file and adds each
    ``| English | Chinese |`` row to ``CITY_MAP_EN_TO_CN`` /
    ``COUNTRY_MAP_EN_TO_CN`` and ``LOCATION_FIX_MAP``. Runs once per
    instance; missing or unreadable files are skipped silently.
    """
    if getattr(self, "_location_maps_loaded", False):
        return

    def _strip_cn_city_suffix(cn: str) -> str:
        # Drop a trailing "(城市)" marker (4 characters) from the Chinese name.
        t = (cn or "").strip()
        if t.endswith("(城市)"):
            t = t[:-4]
        return t.strip()

    def _parse_table(file_path: str) -> List[Tuple[str, str]]:
        # Return (english, chinese) pairs from a markdown table; header and
        # separator rows and rows with an empty cell are skipped.
        try:
            if not os.path.exists(file_path):
                return []
            rows: List[Tuple[str, str]] = []
            with open(file_path, "r", encoding="utf-8") as f:
                for raw in f:
                    line = raw.strip()
                    if not line.startswith("|"):
                        continue
                    if line.startswith("| English |"):
                        continue
                    if line.startswith("|---"):
                        continue
                    parts = [p.strip() for p in line.strip("|").split("|")]
                    if len(parts) < 2:
                        continue
                    en = parts[0].strip()
                    cn = parts[1].strip()
                    if not en or not cn:
                        continue
                    rows.append((en, cn))
            return rows
        except Exception:
            return []

    def _add_mapping(en: str, cn: str) -> None:
        # Register one row. Rows whose Chinese text equals the English text
        # (i.e. untranslated rows) are ignored.
        en_raw = (en or "").strip()
        cn_raw = (cn or "").strip()
        if not en_raw or not cn_raw:
            return
        if cn_raw == en_raw:
            return
        en_key = en_raw.lower()
        cn_clean = _cleanup_cn_location_text(cn_raw)
        if not cn_clean:
            return
        # Strip a trailing " - Word (123)" suffix from the English name.
        status_m = _re_local.search(r"\s*-\s*(?P<status>[A-Za-z]+)\s*\((?P<num>\d+)\)\s*$", en_raw)
        en_base = en_raw
        if status_m:
            en_base = en_raw[: status_m.start()].strip()
        if cn_clean.lower() == en_base.lower():
            return
        # Names ending in "(City)" go to the city map; the rest to the country map.
        city_m = _re_local.search(r"\s*\(City\)\s*$", en_base, flags=_re_local.IGNORECASE)
        if city_m:
            city_en_base = en_base[: city_m.start()].strip()
            city_cn_base = _strip_cn_city_suffix(cn_clean)
            if city_en_base and (city_cn_base or cn_clean).lower() == city_en_base.lower():
                return
            if city_en_base:
                self.CITY_MAP_EN_TO_CN[city_en_base.lower()] = city_cn_base or cn_clean
                self.LOCATION_FIX_MAP[city_en_base.lower()] = city_cn_base or cn_clean
            self.LOCATION_FIX_MAP[en_base.lower()] = city_cn_base or cn_clean
            self.LOCATION_FIX_MAP[en_key] = city_cn_base or cn_clean
            return
        self.COUNTRY_MAP_EN_TO_CN[en_base.lower()] = cn_clean
        self.LOCATION_FIX_MAP[en_base.lower()] = cn_clean
        self.LOCATION_FIX_MAP[en_key] = cn_clean

    try:
        root = os.path.dirname(__file__)
    except Exception:
        root = os.getcwd()
    data_dir = os.path.join(root, "TruckersMP-citties-name")
    for name in ("s1-cities.md", "promods-cities.md"):
        path = os.path.join(data_dir, name)
        for en, cn in _parse_table(path):
            _add_mapping(en, cn)
    self._location_maps_loaded = True
async def _translate_country_city(self, country: Optional[str], city: Optional[str]) -> Tuple[str, str]:
country_en = (country or "").strip()
city_en = (city or "").strip()
def _has_cjk(t: str) -> bool:
return bool(_re_local.search(r"[\u4e00-\u9fff]", t or ""))
def _clean_raw_text(raw: str) -> str:
t = (raw or "").strip()
if not t or _has_cjk(t):
return t
t = _re_local.sub(r"\s*\([^)]*\)\s*", " ", t)
t = _re_local.sub(r"\s*([^)]*)\s*", " ", t)
t = _re_local.sub(r"\s*\[[^\]]*\]\s*", " ", t)
t = _re_local.sub(r"[^A-Za-z\s\-]", " ", t)
t = _re_local.sub(r"\s+", " ", t).strip()
return t
def _ensure_cn_text(text: Optional[str], en_fallback: str, is_city: bool) -> str:
t = (text or "").strip()
if _has_cjk(t):
return t
key = (en_fallback or "").strip().lower()
mapped = self.CITY_MAP_EN_TO_CN.get(key) if is_city else self.COUNTRY_MAP_EN_TO_CN.get(key)
if mapped and _has_cjk(mapped):
return mapped
fixed = self.LOCATION_FIX_MAP.get(key)
if fixed and _has_cjk(fixed):
return fixed
return ""
country_en = _clean_raw_text(country_en)
city_en = _clean_raw_text(city_en)
def _normalize_city_input(raw_city: str, raw_country: str) -> str:
s = (raw_city or "").strip()
if not s:
return s
s = _re_local.sub(r"\s+", " ", s).strip()
c = (raw_country or "").strip()
if c:
c_norm = _re_local.sub(r"\s+", " ", c).strip()
if s.lower().startswith((c_norm + " - ").lower()):
s = s[len(c_norm) + 3 :].strip()
elif s.lower().startswith((c_norm + " ").lower()):
s = s[len(c_norm) + 1 :].strip()
if " - " in s:
left, right = s.split(" - ", 1)
left_k = left.strip().lower()
if left_k in self.COUNTRY_MAP_EN_TO_CN:
s = right.strip()
low = s.lower()
for k in sorted(self.COUNTRY_MAP_EN_TO_CN.keys(), key=len, reverse=True):
if not k:
continue
if low.startswith(k + " - "):
s = s[len(k) + 3 :].strip()
break
if low.startswith(k + " "):
s = s[len(k) + 1 :].strip()
break
return s
city_en = _normalize_city_input(city_en, country_en)
country_key = country_en.lower()
city_key = city_en.lower()
country_cn = self.COUNTRY_MAP_EN_TO_CN.get(country_key)
city_cn = self.CITY_MAP_EN_TO_CN.get(city_key)
if country_en and not country_cn:
translated_country = await self._translate_text(country_en, cache=True)
if translated_country:
country_cn = translated_country
if city_en and not city_cn:
translated_city = await self._translate_text(city_en, cache=True)
if translated_city:
city_cn = translated_city
fix_country = self.LOCATION_FIX_MAP.get(country_key)
fix_city = self.LOCATION_FIX_MAP.get(city_key)
if fix_country:
country_cn = fix_country
if fix_city:
city_cn = fix_city
country_cn = _ensure_cn_text(country_cn, country_en, False)
city_cn = _ensure_cn_text(city_cn, city_en, True)
return country_cn, city_cn
async def _translate_traffic_name(self, name: Optional[str]) -> str:
s = (name or "").strip()
if not s:
return s
s = _re_local.sub(r"\s+", " ", s).strip()
key = s.lower()
# 1. 查修正表
fix = self.LOCATION_FIX_MAP.get(key)
if fix:
return fix
# 2. 查城市表 (路况里的 name 经常是城市名)
city_fix = self.CITY_MAP_EN_TO_CN.get(key)
if city_fix:
return city_fix
m_suffix = _re_local.search(r"^(?P<base>.+?)\s+(?P<suffix>intersection|quarry)\s*$", s, flags=_re_local.IGNORECASE)
if m_suffix:
base = (m_suffix.group("base") or "").strip()
suffix = (m_suffix.group("suffix") or "").strip().lower()
base_cn = await self._translate_traffic_name(base)
suffix_cn = "交叉口" if suffix == "intersection" else "采石场"
merged_key = f"{base} {suffix}".strip().lower()
merged_fix = self.LOCATION_FIX_MAP.get(merged_key)
if merged_fix:
return merged_fix
if base_cn and base_cn != base:
return f"{base_cn} {suffix_cn}".strip()
for sep in (" - ", "–", "-", "/"):
if sep in s:
parts = [p.strip() for p in s.split(sep) if p.strip()]
if len(parts) >= 2:
translated_parts: List[str] = []
for p in parts:
pk = p.lower()
translated_parts.append(
self.LOCATION_FIX_MAP.get(pk)
or self.CITY_MAP_EN_TO_CN.get(pk)
or p
)
joiner = " - " if sep.strip() in ("-", "–") else sep
return joiner.join(translated_parts)
# 3. 百度翻译
translated = await self._translate_text(s, cache=True)
if translated:
return translated
return s
# --- API请求方法 ---
async def _get_tmp_id_from_steam_id(self, steam_id: str) -> str:
    """Resolve a Steam ID to a TMP player id via the TruckersMP v2 API.

    Returns:
        The TMP id as a string.

    Raises:
        SteamIdNotFoundException: the Steam ID is not registered (HTTP 404,
            a "not found" descriptor, or a response without an id).
        ApiResponseException: the API reported another error or returned an
            unexpected status code.
        NetworkException: no session, transport failure, timeout, or any
            other unexpected error.
    """
    if not self.session:
        raise NetworkException("插件未初始化,HTTP会话不可用")
    not_registered = f"Steam ID {steam_id} 未在 TruckersMP 中注册。"
    try:
        # Official TruckersMP API: look the player up directly by SteamID.
        url = f"https://api.truckersmp.com/v2/player/{steam_id}"
        async with self.session.get(url, timeout=10) as response:
            if response.status == 404:
                raise SteamIdNotFoundException(not_registered)
            if response.status != 200:
                raise ApiResponseException(f"Steam ID查询API返回错误状态码: {response.status}")
            data = await response.json()
            if data.get('error') is False and data.get('response'):
                player_data = data.get('response', {})
                tmp_id = player_data.get('id')
                if not tmp_id:
                    raise SteamIdNotFoundException(not_registered)
                logger.info(f"成功通过 SteamID {steam_id} 获取到 TMP ID: {tmp_id}")
                return str(tmp_id)
            # A missing or null descriptor must not break the .lower() below.
            error_msg = str(data.get('descriptor') or '未知错误')
            lowered = error_msg.lower()
            if 'not found' in lowered or 'unable to find' in lowered:
                raise SteamIdNotFoundException(not_registered)
            raise ApiResponseException(f"API 返回错误: {error_msg}")
    except TmpApiException:
        # Domain errors raised above must reach the caller unchanged instead
        # of being rewritten by the generic handler below.
        raise
    except aiohttp.ClientError as e:
        raise NetworkException("Steam ID查询服务网络请求失败") from e
    except asyncio.TimeoutError as e:
        raise NetworkException("请求 Steam ID 查询服务超时") from e
    except Exception as e:
        logger.error(f"通过 SteamID 查询 TMP ID 失败: {e}")
        raise NetworkException("查询失败") from e
def _get_steam_id_from_player_info(self, player_info: Dict) -> Optional[str]:
steam_id = player_info.get('steamID64')
return str(steam_id) if steam_id else None
async def _get_player_info(self, tmp_id: str) -> Dict:
    """Fetch a player's profile from the TruckersMP v2 API.

    Returns:
        The ``response`` object of the API reply.

    Raises:
        PlayerNotFoundException: HTTP 404, or a reply without a usable
            ``response`` object.
        ApiResponseException: any other unexpected status code.
        NetworkException: no session, transport failure, timeout, or any
            other unexpected error.
    """
    if not self.session:
        raise NetworkException("插件未初始化,HTTP会话不可用")
    try:
        # Official TMP v2 endpoint.
        url = f"https://api.truckersmp.com/v2/player/{tmp_id}"
        async with self.session.get(url, timeout=10) as response:
            if response.status == 404:
                raise PlayerNotFoundException(f"玩家 {tmp_id} 不存在")
            if response.status != 200:
                raise ApiResponseException(f"API返回错误状态码: {response.status}")
            data = await response.json()
            response_data = data.get('response')
            if response_data and isinstance(response_data, dict):
                return response_data
            raise PlayerNotFoundException(f"玩家 {tmp_id} 不存在")
    except TmpApiException:
        # Without this, the generic handler below would turn "player not
        # found" into an anonymous NetworkException.
        raise
    except aiohttp.ClientError as e:
        raise NetworkException("TruckersMP API 网络请求失败") from e
    except asyncio.TimeoutError as e:
        raise NetworkException("请求TruckersMP API超时") from e
    except Exception as e:
        logger.error(f"查询玩家信息失败: {e}")
        raise NetworkException("查询失败") from e
async def _get_player_bans(self, tmp_id: str) -> List[Dict]:
if not self.session: return []
try:
url = f"https://api.truckersmp.com/v2/bans/{tmp_id}"
async with self.session.get(url, timeout=10) as response:
if response.status == 200:
data = await response.json()
# 兼容:优先取 response,其次直接取 data(防止结构变化)
bans = data.get('response') or data.get('data') or []
if not isinstance(bans, list):
bans = []
# 额外打印完整返回,方便一次性定位
logger.info(f"Bans API 原始返回: {data}")
logger.info(f"Bans API 提取后: keys={list(data.keys())}, count={len(bans)}")
return bans