#!/usr/bin/env python from pvrecorder import PvRecorder from pydub import AudioSegment from scipy.io import wavfile import numpy as np import soundfile as sf import argparse import sys, os import struct import wave AUTHOR = "Snorky" VERSION = 0.1 def detect_frequency(file_path, duration=60): """ Detect the dominant frequency in an audio file. Parameters: file_path (str): Path to the audio file. duration (int, optional): Duration in seconds to analyze. Defaults to 60. Returns: float: The dominant frequency in Hz. Src: https://github.com/Michael-Sebero/Audio-Frequency-Tools/blob/main/tools/frequency-detector.py """ try: # Read audio file with soundfile (supports many formats) audio_data, sample_rate = sf.read(file_path, always_2d=True) # Extract the first channel if the audio is stereo audio_data = audio_data[:, 0] # Truncate or pad the audio to the desired duration max_samples = int(sample_rate * duration) audio_data = audio_data[:max_samples] if len(audio_data) > max_samples else np.pad( audio_data, (0, max_samples - len(audio_data)), 'constant' ) if len(audio_data) == 0: raise ValueError("Audio file contains no data or is too short for analysis.") # Perform FFT on the audio data fft_result = np.fft.rfft(audio_data) freqs = np.fft.rfftfreq(len(audio_data), 1.0 / sample_rate) # Find the dominant frequency dominant_freq = freqs[np.argmax(np.abs(fft_result))] #print(f"The dominant frequency is {dominant_freq:.2f} Hz.") return dominant_freq except FileNotFoundError: print("Error: The specified audio file was not found.") except ValueError as e: print(f"Error: {e}") except RuntimeError as e: print(f"Error: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") def getDevices(): """ Detect and return enum of devices Return: enumerate of devices. """ return enumerate(PvRecorder.get_available_devices()) def getArgs(parser): """ Parsing Args and return them Argmunents: parser: Namespace. return: parser parser. """ parser.add_argument( "-s","--show_devices", help="List of audio devices currently available for use.", action="store_true") parser.add_argument( "-i","--device_index", help="Index of input audio device. (Default: -1)", type=int, default=-1) parser.add_argument( "-r","--rate", help="Rate config. (Default: 16000)", type=int, default=16000) parser.add_argument( "-l","--length_frame", help="Frame size. (Default: 512)", type=int, default=512) parser.add_argument( "-p","--output_name", help="Name to file to store raw audio. (Default: recorder.wav)", type=str, default="recorder.wav") parser.add_argument( "-v","--verbosity", help="Increase logging. (Default: False)", default=False, action="store_true") return parser.parse_args() def saving(audio, output_name, rate, len_frame, wave_dir_path): """ Saving audio data in file outut_path. Arguments: audio: file containing data audio output_name: str rate: int. Audio rate. len_frame: int. Length of frame """ print(f"\nSaving wav file: {wave_dir_path}/{output_name}.") try: with wave.open(wave_dir_path + "/" + output_name, 'w') as f: f.setparams((1, 2, rate, len_frame, "NONE", "NONE")) f.writeframes(struct.pack("h" * len(audio), *audio)) except ValueError as e: print(f"Error: {e}") except RuntimeError as e: print(f"Error: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") def recording(dev_index,output_name,rate,len_frame, wave_dir_path): """ Create recorder, start listener. If ctrl + c are pressed: Stop listening Close PvRecorder instance. Call saving function. Arguments: dev_index: int. Number of device. (can show it with -s option) output_name: str. rate: int. Audio rate. len_frame: int. Length of frame Return: data and length (in sec) of them. """ # Create object for recording recorder = PvRecorder(device_index=dev_index, frame_length=len_frame) # Var containing audio audio = [] try: # Start recorder recorder.start() print("Listener is running.") print("Ctrl + c for quit.") while True: # Read and append data in audio buffer frame = recorder.read() audio.extend(frame) except KeyboardInterrupt: # If ctrl +c stop recoding, delete object and save audio recorder.stop() saving(audio,output_name, rate, len_frame,wave_dir_path) except ValueError as e: print(f"Error: {e}") except RuntimeError as e: print(f"Error: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") finally: recorder.delete() rate, data = wavfile.read(wave_dir_path + "/" + output_name) length = data.shape[0] / rate return data,length def showInfos(data, rate,length, output_name,dev_index): print(" ######## INFOS ########") print(f" Value recording = {data}") print(f" Data shape: {data.shape[0]}") print(f" Rate: {rate}") print(f" length: {length}s") print(f" Path file: {output_name}") print(f" Device Index: {dev_index}") for index, device in getDevices(): if index == dev_index: print(f" Device name: {device}") def cutWaveInBits(wave_dir_path, tmp_dir_path, output_name, length): """ Cut original wave in part. Actually support only 8bits The time duration for isolate data is 0.125s Function trunc file in many file with time duration. Arguments: output_name: str length: Duration of original wave. Return: List of all files. """ bits_audio = AudioSegment.from_wav(os.path.join(wave_dir_path, output_name)) begin = 0 bits_time = 125 # milli = 0.125s. Its the time duration for bits information file_number = (length * 1000 ) / bits_time count = 0 tmp_tab = [] while count < file_number: new_audio = bits_audio[begin:begin + bits_time] filepath = str(f"{tmp_dir_path}/{count}-bits-{begin}-{begin + bits_time}.wav") new_audio.export(filepath, format="wav") tmp_tab.append(filepath) begin += bits_time count += 1 return tmp_tab def readBits(bits_tab_wave): """ Read all files cutted and isolate freq. Values: if freq is > at 1500 => 1 if freq is < 1500 and > 500 => 0 else pass. Arguments: List containing all files cutted Return: Message in binary """ bits_message = [] print("Extracting data from wav file:") for waves in bits_tab_wave: freq = detect_frequency(waves) if freq < 500: sys.stdout.write('.') elif freq < 1500 and freq > 500: sys.stdout.write("0") bits_message.append(0) elif freq >= 1500: sys.stdout.write("1") bits_message.append(1) sys.stdout.flush() return bits_message def showArgs(parser): print("Args value:") for arg,opt in parser.parse_args().__dict__.items(): print(' %s: %s' % (arg, opt)) def decodeMessage(message): """ Function decoding message. Argument: Message: str. String containing message in binary Return: Message decoded. """ print("\nDecoding message:") i = 0 j = 0 final_message = [] while i