1
+ import ast
2
+ import json
1
3
import os
2
- import subprocess
4
+ import re
3
5
import signal
4
6
import socket
5
- import requests
7
+ import subprocess
8
+ import sys
6
9
import time
10
+
11
+ import requests
7
12
import yaml
8
- import ast
9
- import re
10
- import json
11
- import sys
12
- from flask import Flask , jsonify , request , Response
13
+ from flask import Flask , Response , jsonify , request
13
14
14
15
app = Flask (__name__ )
15
16
@@ -20,7 +21,7 @@ def get_base_port():
20
21
if not nv_visible_devices or nv_visible_devices .lower () == "all" :
21
22
return 8000
22
23
# 提取第一个数字
23
- match = re .search (r' \d+' , nv_visible_devices )
24
+ match = re .search (r" \d+" , nv_visible_devices )
24
25
if match :
25
26
return int (match .group (0 )) * 100 + 8000
26
27
return 8000
@@ -29,7 +30,7 @@ def get_base_port():
29
30
def is_port_in_use (port ):
30
31
"""检查端口是否被占用"""
31
32
with socket .socket (socket .AF_INET , socket .SOCK_STREAM ) as s :
32
- return s .connect_ex ((' localhost' , port )) == 0
33
+ return s .connect_ex ((" localhost" , port )) == 0
33
34
34
35
35
36
def get_available_port (env_key : str , default_start : int ):
@@ -69,7 +70,9 @@ def build_command(config):
69
70
"""根据配置构建启动命令"""
70
71
# 基础命令
71
72
cmd = [
72
- "python" , "-m" , "fastdeploy.entrypoints.openai.api_server" ,
73
+ "python" ,
74
+ "-m" ,
75
+ "fastdeploy.entrypoints.openai.api_server" ,
73
76
]
74
77
75
78
# 添加配置参数
@@ -97,7 +100,7 @@ def merge_configs(base_config, override_config):
97
100
def get_server_pid ():
98
101
"""获取服务进程ID PORT"""
99
102
if os .path .exists (PID_FILE ):
100
- with open (PID_FILE , 'r' ) as f :
103
+ with open (PID_FILE , "r" ) as f :
101
104
data = yaml .safe_load (f )
102
105
return data
103
106
return None
@@ -109,21 +112,19 @@ def is_server_running():
109
112
if pid_port is None :
110
113
return False , {"status" : "Server not running..." }
111
114
112
- server_pid , port = pid_port ["PID" ], pid_port ["PORT" ]
115
+ _ , port = pid_port ["PID" ], pid_port ["PORT" ]
113
116
health_check_endpoint = f"http://0.0.0.0:{ port } /health"
114
117
115
118
if os .path .exists (LOG_FILE ):
116
- with open (LOG_FILE , 'r' ) as f :
119
+ with open (LOG_FILE , "r" ) as f :
117
120
msg = f .readlines ()
118
121
result = parse_tqdm_progress (msg )
119
122
120
123
try :
121
- response = requests .get (
122
- health_check_endpoint ,
123
- timeout = 2
124
- )
124
+ response = requests .get (health_check_endpoint , timeout = 2 )
125
125
return response .status_code == 200 , result
126
126
except requests .exceptions .RequestException as e :
127
+ print (f"Failed to check server health: { e } " )
127
128
return False , result
128
129
129
130
@@ -147,15 +148,11 @@ def parse_tqdm_progress(log_lines):
147
148
"speed" : data ["speed" ],
148
149
"eta" : data ["eta" ],
149
150
"elapsed" : data ["elapsed" ],
150
- "bar" : data ["bar" ].strip ()
151
+ "bar" : data ["bar" ].strip (),
151
152
},
152
- "raw_line" : line .strip ()
153
+ "raw_line" : line .strip (),
153
154
}
154
- return {
155
- "status" : "服务启动中" ,
156
- "progress" : {},
157
- "raw_line" : log_lines [- 1 ] if log_lines else "server.log为空"
158
- }
155
+ return {"status" : "服务启动中" , "progress" : {}, "raw_line" : log_lines [- 1 ] if log_lines else "server.log为空" }
159
156
160
157
161
158
def stop_server (signum = None , frame = None ):
@@ -189,8 +186,8 @@ def stop_server(signum=None, frame=None):
189
186
except Exception as e :
190
187
print (f"Failed to killed process on port: { e } " )
191
188
# 若log目录存在,则重命名为log_timestamp
192
- if os .path .isdir (' ./log' ):
193
- os .rename (' ./log' , ' ./log_{}' .format (time .strftime ("%Y%m%d%H%M%S" )))
189
+ if os .path .isdir (" ./log" ):
190
+ os .rename (" ./log" , " ./log_{}" .format (time .strftime ("%Y%m%d%H%M%S" )))
194
191
195
192
if signum :
196
193
sys .exit (0 )
@@ -203,15 +200,15 @@ def stop_server(signum=None, frame=None):
203
200
signal .signal (signal .SIGTERM , stop_server )
204
201
205
202
206
- @app .route (' /start' , methods = [' POST' ])
203
+ @app .route (" /start" , methods = [" POST" ])
207
204
def start_service ():
208
205
"""启动大模型推理服务"""
209
206
# 检查服务是否已在运行
210
207
if is_server_running ()[0 ]:
211
208
return Response (
212
209
json .dumps ({"status" : "error" , "message" : "服务已启动,无需start" }, ensure_ascii = False ),
213
210
status = 400 ,
214
- content_type = ' application/json'
211
+ content_type = " application/json" ,
215
212
)
216
213
217
214
try :
@@ -235,7 +232,7 @@ def start_service():
235
232
return Response (
236
233
json .dumps ({"status" : "error" , "message" : str (e )}, ensure_ascii = False ),
237
234
status = 500 ,
238
- content_type = ' application/json'
235
+ content_type = " application/json" ,
239
236
)
240
237
241
238
print ("cmd" , cmd )
@@ -244,17 +241,11 @@ def start_service():
244
241
# 设置环境变量并启动进程
245
242
env = os .environ .copy ()
246
243
247
- with open (LOG_FILE , 'w' ) as log :
248
- process = subprocess .Popen (
249
- cmd ,
250
- stdout = log ,
251
- stderr = log ,
252
- env = env ,
253
- start_new_session = True
254
- )
244
+ with open (LOG_FILE , "w" ) as log :
245
+ process = subprocess .Popen (cmd , stdout = log , stderr = log , env = env , start_new_session = True )
255
246
256
247
# 保存进程ID,port到yaml文件
257
- with open (PID_FILE , 'w' ) as f :
248
+ with open (PID_FILE , "w" ) as f :
258
249
yaml .dump ({"PID" : process .pid , "PORT" : final_config ["--port" ]}, f )
259
250
260
251
json_data = {
@@ -267,24 +258,20 @@ def start_service():
267
258
"port_info" : {
268
259
"api_port" : FD_API_PORT ,
269
260
"queue_port" : FD_ENGINE_QUEUE_PORT ,
270
- "metrics_port" : FD_METRICS_PORT
271
- }
261
+ "metrics_port" : FD_METRICS_PORT ,
262
+ },
272
263
}
273
264
274
- return Response (
275
- json .dumps (json_data , ensure_ascii = False ),
276
- status = 200 ,
277
- content_type = 'application/json'
278
- )
265
+ return Response (json .dumps (json_data , ensure_ascii = False ), status = 200 , content_type = "application/json" )
279
266
except Exception as e :
280
267
return Response (
281
268
json .dumps ({"status" : "error" , "message" : str (e )}, ensure_ascii = False ),
282
269
status = 500 ,
283
- content_type = ' application/json'
270
+ content_type = " application/json" ,
284
271
)
285
272
286
273
287
- @app .route (' /switch' , methods = [' POST' ])
274
+ @app .route (" /switch" , methods = [" POST" ])
288
275
def switch_service ():
289
276
"""切换模型服务"""
290
277
# kill掉已有服务
@@ -311,7 +298,7 @@ def switch_service():
311
298
return Response (
312
299
json .dumps ({"status" : "error" , "message" : str (e )}, ensure_ascii = False ),
313
300
status = 500 ,
314
- content_type = ' application/json'
301
+ content_type = " application/json" ,
315
302
)
316
303
317
304
print ("cmd" , cmd )
@@ -320,17 +307,11 @@ def switch_service():
320
307
# 设置环境变量并启动进程
321
308
env = os .environ .copy ()
322
309
323
- with open (LOG_FILE , 'w' ) as log :
324
- process = subprocess .Popen (
325
- cmd ,
326
- stdout = log ,
327
- stderr = log ,
328
- env = env ,
329
- start_new_session = True
330
- )
310
+ with open (LOG_FILE , "w" ) as log :
311
+ process = subprocess .Popen (cmd , stdout = log , stderr = log , env = env , start_new_session = True )
331
312
332
313
# 保存进程ID,port到yaml文件
333
- with open (PID_FILE , 'w' ) as f :
314
+ with open (PID_FILE , "w" ) as f :
334
315
yaml .dump ({"PID" : process .pid , "PORT" : final_config ["--port" ]}, f )
335
316
336
317
json_data = {
@@ -343,116 +324,98 @@ def switch_service():
343
324
"port_info" : {
344
325
"api_port" : FD_API_PORT ,
345
326
"queue_port" : FD_ENGINE_QUEUE_PORT ,
346
- "metrics_port" : FD_METRICS_PORT
347
- }
327
+ "metrics_port" : FD_METRICS_PORT ,
328
+ },
348
329
}
349
330
350
- return Response (
351
- json .dumps (json_data , ensure_ascii = False ),
352
- status = 200 ,
353
- content_type = 'application/json'
354
- )
331
+ return Response (json .dumps (json_data , ensure_ascii = False ), status = 200 , content_type = "application/json" )
355
332
except Exception as e :
356
333
return Response (
357
334
json .dumps ({"status" : "error" , "message" : str (e )}, ensure_ascii = False ),
358
335
status = 500 ,
359
- content_type = ' application/json'
336
+ content_type = " application/json" ,
360
337
)
361
338
362
339
363
- @app .route (' /status' , methods = [' GET' , ' POST' ])
340
+ @app .route (" /status" , methods = [" GET" , " POST" ])
364
341
def service_status ():
365
342
"""检查服务状态"""
366
343
health , msg = is_server_running ()
367
344
368
345
if not health :
369
- return Response (
370
- json .dumps (msg , ensure_ascii = False ),
371
- status = 500 ,
372
- content_type = 'application/json'
373
- )
346
+ return Response (json .dumps (msg , ensure_ascii = False ), status = 500 , content_type = "application/json" )
374
347
375
348
# 检查端口是否监听
376
349
ports_status = {
377
350
"api_port" : FD_API_PORT if is_port_in_use (FD_API_PORT ) else None ,
378
351
"queue_port" : FD_ENGINE_QUEUE_PORT if is_port_in_use (FD_ENGINE_QUEUE_PORT ) else None ,
379
- "metrics_port" : FD_METRICS_PORT if is_port_in_use (FD_METRICS_PORT ) else None
352
+ "metrics_port" : FD_METRICS_PORT if is_port_in_use (FD_METRICS_PORT ) else None ,
380
353
}
381
354
382
355
msg ["status" ] = "服务启动完成"
383
356
msg ["ports_status" ] = ports_status
384
357
385
- return Response (
386
- json .dumps (msg , ensure_ascii = False ),
387
- status = 200 ,
388
- content_type = 'application/json'
389
- )
358
+ return Response (json .dumps (msg , ensure_ascii = False ), status = 200 , content_type = "application/json" )
390
359
391
360
392
- @app .route (' /stop' , methods = [' POST' ])
361
+ @app .route (" /stop" , methods = [" POST" ])
393
362
def stop_service ():
394
363
"""停止大模型推理服务"""
395
364
res , status_code = stop_server ()
396
365
397
366
return res , status_code
398
367
399
368
400
- @app .route (' /config' , methods = [' GET' ])
369
+ @app .route (" /config" , methods = [" GET" ])
401
370
def get_config ():
402
371
"""获取当前server配置"""
403
372
health , msg = is_server_running ()
404
373
405
374
if not health :
406
- return Response (
407
- json .dumps (msg , ensure_ascii = False ),
408
- status = 500 ,
409
- content_type = 'application/json'
410
- )
375
+ return Response (json .dumps (msg , ensure_ascii = False ), status = 500 , content_type = "application/json" )
411
376
412
377
if not os .path .exists ("log/api_server.log" ):
413
378
return Response (
414
379
json .dumps ({"message" : "api_server.log不存在" }, ensure_ascii = False ),
415
380
status = 500 ,
416
- content_type = ' application/json'
381
+ content_type = " application/json" ,
417
382
)
418
383
419
384
try :
420
385
# 筛选出包含"args:"的行
421
- with open ("log/api_server.log" , 'r' ) as f :
386
+ with open ("log/api_server.log" , "r" ) as f :
422
387
lines = [line for line in f .readlines () if "args:" in line ]
423
388
424
389
last_line = lines [- 1 ] if lines else ""
425
390
426
391
# 使用正则表达式提取JSON格式的配置
427
- match = re .search (r' args\s*[::]\s*(.*)' , last_line )
392
+ match = re .search (r" args\s*[::]\s*(.*)" , last_line )
428
393
if not match :
429
394
return Response (
430
395
json .dumps ({"message" : "api_server.log中没有args信息,请检查log" }, ensure_ascii = False ),
431
396
status = 500 ,
432
- content_type = ' application/json'
397
+ content_type = " application/json" ,
433
398
)
434
399
435
400
# 尝试解析JSON
436
401
config_json = match .group (1 ).strip ()
437
402
config_data = ast .literal_eval (config_json )
438
403
print ("config_data" , config_data , type (config_data ))
439
404
return Response (
440
- json .dumps ({"server_config" : config_data }, ensure_ascii = False ),
441
- status = 200 ,
442
- content_type = 'application/json'
405
+ json .dumps ({"server_config" : config_data }, ensure_ascii = False ), status = 200 , content_type = "application/json"
443
406
)
444
407
445
408
except Exception as e :
446
409
return Response (
447
410
json .dumps ({"message" : "api_server.log解析失败,请检查log" , "error" : str (e )}, ensure_ascii = False ),
448
411
status = 500 ,
449
- content_type = ' application/json'
412
+ content_type = " application/json" ,
450
413
)
451
414
452
415
453
- @app .route (' /wait_for_infer' , methods = [' POST' ])
416
+ @app .route (" /wait_for_infer" , methods = [" POST" ])
454
417
def wait_for_infer ():
455
- timeout = int (request .args .get (' timeout' , 120 )) # 可选超时时间,默认120秒
418
+ timeout = int (request .args .get (" timeout" , 120 )) # 可选超时时间,默认120秒
456
419
interval = 2
457
420
response_interval = 10
458
421
start_time = time .time ()
@@ -470,18 +433,19 @@ def generate():
470
433
ports_status = {
471
434
"api_port" : FD_API_PORT if is_port_in_use (FD_API_PORT ) else None ,
472
435
"queue_port" : FD_ENGINE_QUEUE_PORT if is_port_in_use (FD_ENGINE_QUEUE_PORT ) else None ,
473
- "metrics_port" : FD_METRICS_PORT if is_port_in_use (FD_METRICS_PORT ) else None
436
+ "metrics_port" : FD_METRICS_PORT if is_port_in_use (FD_METRICS_PORT ) else None ,
474
437
}
475
438
msg ["status" ] = "服务启动完成"
476
439
msg ["ports_status" ] = ports_status
477
440
yield json .dumps (msg , ensure_ascii = False ) + "\n "
478
441
break
479
442
480
443
if elapsed >= timeout :
444
+
481
445
def tail_file (path , lines = 50 ):
482
446
try :
483
447
with open (path , "r" , encoding = "utf-8" , errors = "ignore" ) as f :
484
- return '' .join (f .readlines ()[- lines :])
448
+ return "" .join (f .readlines ()[- lines :])
485
449
except Exception as e :
486
450
return f"[无法读取 { path } ]: { e } \n "
487
451
@@ -501,12 +465,12 @@ def tail_file(path, lines=50):
501
465
502
466
time .sleep (interval )
503
467
504
- return Response (generate (), status = 200 , content_type = ' text/plain' )
468
+ return Response (generate (), status = 200 , content_type = " text/plain" )
505
469
506
470
507
- if __name__ == ' __main__' :
471
+ if __name__ == " __main__" :
508
472
print (f"FLASK_PORT: { FLASK_PORT } " )
509
473
print (f"FD_API_PORT: { FD_API_PORT } " )
510
474
print (f"FD_ENGINE_QUEUE_PORT: { FD_ENGINE_QUEUE_PORT } " )
511
475
print (f"FD_METRICS_PORT: { FD_METRICS_PORT } " )
512
- app .run (host = ' 0.0.0.0' , port = FLASK_PORT , debug = False )
476
+ app .run (host = " 0.0.0.0" , port = FLASK_PORT , debug = False )
0 commit comments