Skip to content

Commit 4f37b0c

Browse files
author
Lukas Wingerberg
committed
add some prometheus metrics
1 parent e6694d8 commit 4f37b0c

File tree

1 file changed

+54
-86
lines changed

1 file changed

+54
-86
lines changed

server/grpc-ffmpeg.py

Lines changed: 54 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@
1414
import ffmpeg_pb2
1515
import ffmpeg_pb2_grpc
1616

17-
from prometheus_client import Gauge, Counter, make_asgi_app
18-
from prometheus_client import start_http_server as start_prometheus_server
17+
from prometheus_client import Gauge, Counter, generate_latest, CONTENT_TYPE_LATEST
1918

2019
# Required for MediaInfo and ffmpeg health check
2120
import aiofiles
@@ -41,9 +40,9 @@
4140
health_status = {'healthy': False}
4241

4342
# Prometheus metrics
44-
ffmpeg_gauge = Gauge('ffmpeg_processes', 'Number of running ffmpeg processes')
45-
mediainfo_counter = Counter('mediainfo_commands', 'Number of mediainfo commands executed')
46-
ffprobe_counter = Counter('ffprobe_commands', 'Number of ffprobe commands executed')
43+
ffmpeg_process_gauge = Gauge("ffmpeg_process_count", "Number of running ffmpeg processes")
44+
mediainfo_counter = Counter("mediainfo_commands", "Number of mediainfo commands executed")
45+
ffprobe_counter = Counter("ffprobe_commands", "Number of ffprobe commands executed")
4746

4847
class TokenAuthValidator(grpc.AuthMetadataPlugin):
4948
def __call__(self, context, callback):
@@ -76,46 +75,42 @@ async def ExecuteCommand(self, request, context):
7675

7776
# Reconstruct the command
7877
command = shlex.join(tokens)
78+
if "ffmpeg" in tokens[0] and HEALTHCHECK_FILE not in tokens:
79+
ffmpeg_process_gauge.inc()
7980

80-
# Track metrics for this command but dont include health check commands
81-
binary = tokens[0].split('/')[-1]
82-
exclude_health_check = binary == 'ffmpeg' and HEALTHCHECK_FILE in command
83-
84-
if binary == 'ffmpeg' and not exclude_health_check:
85-
ffmpeg_gauge.inc()
86-
elif binary == 'mediainfo':
87-
mediainfo_counter.inc()
88-
elif binary == 'ffprobe':
89-
ffprobe_counter.inc()
90-
91-
process = await asyncio.create_subprocess_shell(
92-
command,
93-
stdout=asyncio.subprocess.PIPE,
94-
stderr=asyncio.subprocess.PIPE
95-
)
96-
97-
async def read_stream(stream, response_type, stream_name):
98-
while True:
99-
line = await stream.readline()
100-
if not line:
101-
break
102-
logger.info(f'{stream_name}: {line.decode("utf-8").strip()}')
103-
yield response_type(output=line.decode('utf-8'), stream=stream_name)
104-
105-
async for response in read_stream(process.stdout, ffmpeg_pb2.CommandResponse, "stdout"):
106-
yield response
107-
108-
async for response in read_stream(process.stderr, ffmpeg_pb2.CommandResponse, "stderr"):
109-
yield response
110-
111-
await process.wait()
112-
113-
# decrease counter whenever ffmpeg processes complete
114-
if binary == 'ffmpeg' and not exclude_health_check:
115-
ffmpeg_gauge.dec()
116-
117-
exit_code = process.returncode
118-
yield ffmpeg_pb2.CommandResponse(exit_code=exit_code, stream="exit_code")
81+
try:
82+
process = await asyncio.create_subprocess_shell(
83+
command,
84+
stdout=asyncio.subprocess.PIPE,
85+
stderr=asyncio.subprocess.PIPE
86+
)
87+
88+
async def read_stream(stream, response_type, stream_name):
89+
while True:
90+
line = await stream.readline()
91+
if not line:
92+
break
93+
logger.info(f'{stream_name}: {line.decode("utf-8").strip()}')
94+
yield response_type(output=line.decode('utf-8'), stream=stream_name)
95+
96+
async for response in read_stream(process.stdout, ffmpeg_pb2.CommandResponse, "stdout"):
97+
yield response
98+
99+
async for response in read_stream(process.stderr, ffmpeg_pb2.CommandResponse, "stderr"):
100+
yield response
101+
102+
await process.wait()
103+
exit_code = process.returncode
104+
yield ffmpeg_pb2.CommandResponse(exit_code=exit_code, stream="exit_code")
105+
106+
# Update metrics
107+
if "mediainfo" in tokens[0]:
108+
mediainfo_counter.inc()
109+
elif "ffprobe" in tokens[0]:
110+
ffprobe_counter.inc()
111+
finally:
112+
if "ffmpeg" in tokens[0] and HEALTHCHECK_FILE not in tokens:
113+
ffmpeg_process_gauge.dec()
119114

120115
async def health_check(self):
121116
logger.info("Running initial health check...")
@@ -164,47 +159,22 @@ def update_health_status(self, is_healthy):
164159
global health_status
165160
health_status['healthy'] = is_healthy
166161

167-
async def run_command(self, command, exclude_health_check=False):
168-
"""
169-
Executes a shell command asynchronously, ensuring proper cleanup to prevent zombie processes.
170-
"""
171-
binary = command.split()[0].split('/')[-1]
172-
173-
# Track Prometheus metrics
174-
if binary == 'ffmpeg' and not exclude_health_check:
175-
ffmpeg_gauge.inc()
176-
elif binary == 'mediainfo':
177-
mediainfo_counter.inc()
178-
elif binary == 'ffprobe':
179-
ffprobe_counter.inc()
180-
181-
logger.debug(f"Executing command: {command}")
162+
async def run_command(self, command):
182163
process = await asyncio.create_subprocess_shell(
183164
command,
184165
stdout=asyncio.subprocess.PIPE,
185166
stderr=asyncio.subprocess.PIPE
186167
)
187168

188-
try:
189-
stdout, stderr = await process.communicate()
190-
if process.returncode != 0:
191-
logger.error(f"Command failed with exit code {process.returncode}: {stderr.decode().strip()}")
192-
raise RuntimeError(stderr.decode().strip())
169+
stdout, stderr = await process.communicate()
170+
output = stdout.decode().strip() if stdout else ""
171+
error = stderr.decode().strip() if stderr else ""
193172

194-
logger.debug(f"Command output: {stdout.decode().strip()}")
195-
return stdout.decode().strip()
196-
finally:
197-
if binary == 'ffmpeg' and not exclude_health_check:
198-
ffmpeg_gauge.dec()
173+
if process.returncode != 0:
174+
logger.error(f"Command '{command}' failed with error: {error}")
175+
return f"Command '{command}' failed with error: {error}"
199176

200-
output = stdout.decode().strip() if stdout else ""
201-
error = stderr.decode().strip() if stderr else ""
202-
203-
if process.returncode != 0:
204-
logger.error(f"Command '{command}' failed with error: {error}")
205-
return f"Command '{command}' failed with error: {error}"
206-
207-
return output
177+
return output
208178

209179
async def is_file_valid(self, filename):
210180
try:
@@ -239,27 +209,25 @@ async def start_grpc_server():
239209
finally:
240210
await server.stop(0)
241211

242-
async def health_check_runner():
243-
service = FFmpegService()
244-
await service.health_check()
245-
246212
async def start_http_server():
247213
async def health_check(request):
248214
if health_status['healthy']:
249215
return web.Response(text="OK")
250216
else:
251217
return web.Response(text="Health check failed", status=500)
252-
253-
prometheus_app = make_asgi_app()
218+
219+
async def metrics(request):
220+
return web.Response(body=generate_latest(), content_type=CONTENT_TYPE_LATEST)
221+
254222
app = web.Application()
255223
app.router.add_get('/health', health_check)
256-
app.router.add_route('*', '/metrics', web._run_app(prometheus_app))
224+
app.router.add_get('/metrics', metrics) # Prometheus metrics endpoint
257225

258226
runner = web.AppRunner(app)
259227
await runner.setup()
260228
site = web.TCPSite(runner, '0.0.0.0', 8080)
261229
await site.start()
262-
logger.info('Health check server started on http://localhost:8080/health')
230+
logger.info('http endpoint server started on http://localhost:8080 /health and /metrics')
263231

264232
# Keep the server alive until shutdown
265233
try:
@@ -272,7 +240,7 @@ async def health_check(request):
272240

273241
async def ffmpeg_server():
274242
grpc_task = asyncio.create_task(start_grpc_server())
275-
health_task = asyncio.create_task(health_check_runner())
243+
health_task = asyncio.create_task(FFmpegService().health_check())
276244
http_task = asyncio.create_task(start_http_server()) # Treat HTTP server as a task
277245

278246
try:

0 commit comments

Comments
 (0)