From 9b6e170b5d3c655c3d46ac63464e8568f9bb0a66 Mon Sep 17 00:00:00 2001 From: Snorky Date: Wed, 18 Jun 2025 21:46:20 +0200 Subject: [PATCH] first commit --- recorder.py | 392 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 392 insertions(+) create mode 100755 recorder.py diff --git a/recorder.py b/recorder.py new file mode 100755 index 0000000..18c2f39 --- /dev/null +++ b/recorder.py @@ -0,0 +1,392 @@ +#!/usr/bin/env python + + +from pvrecorder import PvRecorder +from pydub import AudioSegment +from scipy.io import wavfile +import numpy as np +import soundfile as sf +import argparse +import sys, os +import struct +import wave + +VERSION = 0.1 + +def detect_frequency(file_path, duration=60): + """ + Detect the dominant frequency in an audio file. + + Parameters: + file_path (str): Path to the audio file. + duration (int, optional): Duration in seconds to analyze. Defaults to 60. + + Returns: + float: The dominant frequency in Hz. + + Src: https://github.com/Michael-Sebero/Audio-Frequency-Tools/blob/main/tools/frequency-detector.py + """ + try: + # Read audio file with soundfile (supports many formats) + audio_data, sample_rate = sf.read(file_path, always_2d=True) + + # Extract the first channel if the audio is stereo + audio_data = audio_data[:, 0] + + # Truncate or pad the audio to the desired duration + max_samples = int(sample_rate * duration) + audio_data = audio_data[:max_samples] if len(audio_data) > max_samples else np.pad( + audio_data, (0, max_samples - len(audio_data)), 'constant' + ) + + + if len(audio_data) == 0: + raise ValueError("Audio file contains no data or is too short for analysis.") + + # Perform FFT on the audio data + fft_result = np.fft.rfft(audio_data) + freqs = np.fft.rfftfreq(len(audio_data), 1.0 / sample_rate) + + # Find the dominant frequency + dominant_freq = freqs[np.argmax(np.abs(fft_result))] + #print(f"The dominant frequency is {dominant_freq:.2f} Hz.") + return dominant_freq + + except FileNotFoundError: + print("Error: The specified audio file was not found.") + except ValueError as e: + print(f"Error: {e}") + except RuntimeError as e: + print(f"Error: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + + +def getDevices(): + """ + Detect and return enum of devices + + Return: + enumerate of devices. + """ + return enumerate(PvRecorder.get_available_devices()) + + +def getArgs(parser): + """ + Parsing Args and return them + + Argmunents: + parser: Namespace. + + return: + parser parser. + """ + + parser.add_argument( + "-s","--show_devices", + help="List of audio devices currently available for use.", + action="store_true") + + parser.add_argument( + "-i","--device_index", + help="Index of input audio device. (Default: -1)", + type=int, + default=-1) + + parser.add_argument( + "-r","--rate", + help="Rate config. (Default: 16000)", + type=int, + default=16000) + + parser.add_argument( + "-l","--length_frame", + help="Frame size. (Default: 512)", + type=int, + default=512) + + parser.add_argument( + "-p","--output_name", + help="Name to file to store raw audio. (Default: recorder.wav)", + type=str, + default="recorder.wav") + + parser.add_argument( + "-v","--verbosity", + help="Increase logging. (Default: False)", + default=False, + action="store_true") + + return parser.parse_args() + + +def saving(audio, output_name, rate, len_frame, wave_dir_path): + """ + Saving audio data in file outut_path. + + Arguments: + audio: file containing data audio + output_name: str + rate: int. Audio rate. + len_frame: int. Length of frame + + """ + + print(f"\nSaving wav file: {wave_dir_path}/{output_name}.") + + try: + with wave.open(wave_dir_path + "/" + output_name, 'w') as f: + f.setparams((1, 2, rate, len_frame, "NONE", "NONE")) + f.writeframes(struct.pack("h" * len(audio), *audio)) + except ValueError as e: + print(f"Error: {e}") + except RuntimeError as e: + print(f"Error: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + + +def recording(dev_index,output_name,rate,len_frame, wave_dir_path): + """ + Create recorder, start listener. + If ctrl + c are pressed: + Stop listening + Close PvRecorder instance. + Call saving function. + + Arguments: + dev_index: int. Number of device. (can show it with -s option) + output_name: str. + rate: int. Audio rate. + len_frame: int. Length of frame + + Return: + data and length (in sec) of them. + """ + + # Create object for recording + recorder = PvRecorder(device_index=dev_index, frame_length=len_frame) + + # Var containing audio + audio = [] + + try: + # Start recorder + recorder.start() + print("Listener is running.") + + while True: + # Read and append data in audio buffer + frame = recorder.read() + audio.extend(frame) + + except KeyboardInterrupt: + # If ctrl +c stop recoding, delete object and save audio + recorder.stop() + saving(audio,output_name, rate, len_frame,wave_dir_path) + + except ValueError as e: + print(f"Error: {e}") + except RuntimeError as e: + print(f"Error: {e}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + finally: + recorder.delete() + + rate, data = wavfile.read(wave_dir_path + "/" + output_name) + length = data.shape[0] / rate + return data,length + + +def showInfos(data, rate,length, output_name,dev_index): + print(" ######## INFOS ########") + print(f" Value recording = {data}") + print(f" Data shape: {data.shape[0]}") + print(f" Rate: {rate}") + print(f" length: {length}s") + print(f" Path file: {output_name}") + print(f" Device Index: {dev_index}") + for index, device in getDevices(): + if index == dev_index: + print(f" Device name: {device}") + + +def cutWaveInBits(wave_dir_path, tmp_dir_path, output_name, length): + """ + Cut original wave in part. Actually support only 8bits + The time duration for isolate data is 0.125s + + Function trunc file in many file with time duration. + + Arguments: + output_name: str + length: Duration of original wave. + + Return: + List of all files. + """ + bits_audio = AudioSegment.from_wav(os.path.join(wave_dir_path, output_name)) + begin = 0 + bits_time = 125 # milli = 0.125s. Its the time duration for bits information + file_number = (length * 1000 ) / bits_time + count = 0 + tmp_tab = [] + while count < file_number: + new_audio = bits_audio[begin:begin + bits_time] + filepath = str(f"{tmp_dir_path}/{count}-bits-{begin}-{begin + bits_time}.wav") + new_audio.export(filepath, format="wav") + tmp_tab.append(filepath) + begin += bits_time + count += 1 + + return tmp_tab + + +def readBits(bits_tab_wave): + """ + Read all files cutted and isolate freq. + + Values: + if freq is > at 1500 => 1 + if freq is < 1500 and > 500 => 0 + else pass. + + Arguments: + List containing all files cutted + + Return: + Message in binary + + """ + + bits_message = [] + print("Extracting data from wav file:") + for waves in bits_tab_wave: + freq = detect_frequency(waves) + if freq < 500: + sys.stdout.write('.') + elif freq < 1500 and freq > 500: + sys.stdout.write("0") + bits_message.append(0) + elif freq >= 1500: + sys.stdout.write("1") + bits_message.append(1) + + sys.stdout.flush() + + return bits_message + + +def showArgs(parser): + print("Args value:") + for arg,opt in parser.parse_args().__dict__.items(): + print(' %s: %s' % (arg, opt)) + + +def decodeMessage(message): + """ + Function decoding message. + + Argument: + Message: str. String containing message in binary + + Return: + Message decoded. + """ + + print("\nDecoding message:") + + i = 0 + j = 0 + final_message = [] + + while i