Spaces:
				
			
			
	
			
			
		Runtime error
		
	
	
	
			
			
	
	
	
	
		
		
		Runtime error
		
	| import asyncio | |
| import ray | |
| from ray.util.queue import Queue | |
class FFMpegConverterActor:
    """Convert a stream of audio through an ``ffmpeg`` subprocess.

    Input audio is written to ffmpeg's stdin with :meth:`push_chunk`. The
    converted audio is read from ffmpeg's stdout by :meth:`run` and put on
    ``output_queue`` in chunks of ``buffer_size`` bytes.

    Expected call order: ``await start_process()``, keep ``run()`` running
    concurrently, feed data with ``push_chunk()``, and finally ``close()``.
    """

    def __init__(
        self,
        output_queue: "Queue",
        buffer_size: int = 1920,
        output_format: str = 's16le',
        sample_rate: int = 48000,
        channels: int = 1,
    ):
        """Store the configuration; no subprocess is started here.

        Args:
            output_queue: ``ray.util.queue.Queue`` that receives the converted
                chunks (via ``put_async``).
            buffer_size: Number of bytes read from ffmpeg per queued chunk.
            output_format: Value passed to ffmpeg's ``-f`` output option.
            sample_rate: Value passed to ffmpeg's ``-ar`` output option.
            channels: Value passed to ffmpeg's ``-ac`` output option.
        """
        self.output_queue = output_queue
        self.buffer_size = buffer_size
        self.output_format = output_format
        self.sample_rate = sample_rate
        self.channels = channels
        # All three are populated by start_process().
        self.input_pipe = None
        self.output_pipe = None
        self.process = None

    async def run(self):
        """Forward ffmpeg's output to ``output_queue`` until its stdout ends.

        Reads exactly ``buffer_size`` bytes at a time. When stdout reaches
        EOF, any shorter trailing chunk is still forwarded and the method
        returns, unless ``start_process()`` has replaced the pipe in the
        meantime, in which case reading continues on the new pipe.

        Raises:
            RuntimeError: If ``start_process()`` has not been called yet.
        """
        if self.output_pipe is None:
            raise RuntimeError("start_process() must be called before run()")
        while True:
            # Re-read the attribute every iteration so a restarted process
            # is picked up.
            reader = self.output_pipe
            try:
                chunk = await reader.readexactly(self.buffer_size)
            except asyncio.IncompleteReadError as eof:
                # EOF before a full chunk: keep whatever was already read.
                if eof.partial:
                    await self.output_queue.put_async(eof.partial)
                if self.output_pipe is reader:
                    return
                continue
            await self.output_queue.put_async(chunk)

    async def start_process(self):
        """Spawn ``ffmpeg`` and attach to its stdin and stdout pipes.

        Replaces ``process``, ``input_pipe`` and ``output_pipe`` with the
        handles of the newly started subprocess.
        """
        cmd = [
            'ffmpeg',
            '-i', 'pipe:0',  # read from stdin
            '-f', self.output_format,
            '-ar', str(self.sample_rate),
            '-ac', str(self.channels),
            'pipe:1',  # write to stdout
        ]
        self.process = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            # Nothing in this class reads stderr. With a PIPE, ffmpeg's log
            # output would eventually fill it and stall the conversion.
            stderr=asyncio.subprocess.DEVNULL,
        )
        self.input_pipe = self.process.stdin
        self.output_pipe = self.process.stdout
        assert self.input_pipe is not None, "input_pipe was not initialized"

    async def push_chunk(self, chunk):
        """Write ``chunk`` to ffmpeg's stdin, restarting ffmpeg if its pipe broke.

        ``drain()`` is awaited after each write: that is where asyncio reports
        a lost pipe, and it applies backpressure instead of buffering without
        bound. Because of that, ``run()`` should be consuming ffmpeg's output
        concurrently, otherwise this call can wait once the pipes are full.

        Args:
            chunk: Bytes of input audio to feed to ffmpeg.

        Raises:
            RuntimeError: If ``start_process()`` has not been called yet.
        """
        if self.input_pipe is None:
            raise RuntimeError("start_process() must be called before push_chunk()")
        try:
            self.input_pipe.write(chunk)
            await self.input_pipe.drain()
        except (BrokenPipeError, ConnectionResetError):
            # If the pipe is broken, restart the process and resend the chunk.
            await self.start_process()
            self.input_pipe.write(chunk)
            await self.input_pipe.drain()

    def flush_output_queue(self):
        """Discard every item currently waiting in ``output_queue``."""
        # NOTE(review): get() is called without a timeout; if another consumer
        # empties the queue between empty() and get() this may block -- confirm
        # this is the only place the queue is drained while flushing.
        while not self.output_queue.empty():
            self.output_queue.get()

    def close(self):
        """Close ffmpeg's stdin to signal the end of the input stream.

        Safe to call before ``start_process()``; it then does nothing.
        """
        if self.input_pipe is not None:
            self.input_pipe.close()
        # The stdout StreamReader has no close(); it reaches EOF on its own
        # once ffmpeg exits, which is what ends run(). Process.wait() is a
        # coroutine and cannot be awaited from this synchronous method;
        # asyncio's subprocess transport tracks the child's exit by itself.
