first commit
This commit is contained in:
		
						commit
						9b6e170b5d
					
				|  | @ -0,0 +1,392 @@ | |||
| #!/usr/bin/env python | ||||
| 
 | ||||
| 
 | ||||
| from pvrecorder import PvRecorder | ||||
| from pydub import AudioSegment | ||||
| from scipy.io import wavfile | ||||
| import numpy as np | ||||
| import soundfile as sf | ||||
| import argparse | ||||
| import sys, os | ||||
| import struct | ||||
| import wave | ||||
| 
 | ||||
| VERSION = 0.1 | ||||
| 
 | ||||
| def detect_frequency(file_path, duration=60): | ||||
|     """ | ||||
|     Detect the dominant frequency in an audio file. | ||||
| 
 | ||||
|     Parameters: | ||||
|         file_path (str): Path to the audio file. | ||||
|         duration (int, optional): Duration in seconds to analyze. Defaults to 60. | ||||
| 
 | ||||
|     Returns: | ||||
|         float: The dominant frequency in Hz. | ||||
|      | ||||
|     Src: https://github.com/Michael-Sebero/Audio-Frequency-Tools/blob/main/tools/frequency-detector.py | ||||
|     """ | ||||
|     try: | ||||
|         # Read audio file with soundfile (supports many formats) | ||||
|         audio_data, sample_rate = sf.read(file_path, always_2d=True) | ||||
|          | ||||
|         # Extract the first channel if the audio is stereo | ||||
|         audio_data = audio_data[:, 0] | ||||
| 
 | ||||
|         # Truncate or pad the audio to the desired duration | ||||
|         max_samples = int(sample_rate * duration) | ||||
|         audio_data = audio_data[:max_samples] if len(audio_data) > max_samples else np.pad( | ||||
|             audio_data, (0, max_samples - len(audio_data)), 'constant' | ||||
|         ) | ||||
| 
 | ||||
|      | ||||
|         if len(audio_data) == 0: | ||||
|             raise ValueError("Audio file contains no data or is too short for analysis.") | ||||
| 
 | ||||
|         # Perform FFT on the audio data | ||||
|         fft_result = np.fft.rfft(audio_data) | ||||
|         freqs = np.fft.rfftfreq(len(audio_data), 1.0 / sample_rate) | ||||
| 
 | ||||
|         # Find the dominant frequency | ||||
|         dominant_freq = freqs[np.argmax(np.abs(fft_result))] | ||||
|         #print(f"The dominant frequency is {dominant_freq:.2f} Hz.") | ||||
|         return dominant_freq | ||||
| 
 | ||||
|     except FileNotFoundError: | ||||
|         print("Error: The specified audio file was not found.") | ||||
|     except ValueError as e: | ||||
|         print(f"Error: {e}") | ||||
|     except RuntimeError as e: | ||||
|         print(f"Error: {e}") | ||||
|     except Exception as e: | ||||
|         print(f"An unexpected error occurred: {e}") | ||||
| 
 | ||||
| 
 | ||||
| def getDevices(): | ||||
|     """ | ||||
|     Detect and return enum of devices | ||||
| 
 | ||||
|     Return: | ||||
|         enumerate of devices. | ||||
|     """ | ||||
|     return enumerate(PvRecorder.get_available_devices()) | ||||
| 
 | ||||
| 
 | ||||
| def getArgs(parser): | ||||
|     """ | ||||
|     Parsing Args and return them | ||||
| 
 | ||||
|     Argmunents: | ||||
|         parser: Namespace.  | ||||
|      | ||||
|     return: | ||||
|         parser parser. | ||||
|     """ | ||||
| 
 | ||||
|     parser.add_argument( | ||||
|         "-s","--show_devices", | ||||
|         help="List of audio devices currently available for use.", | ||||
|         action="store_true") | ||||
| 
 | ||||
|     parser.add_argument( | ||||
|         "-i","--device_index", | ||||
|         help="Index of input audio device. (Default: -1)", | ||||
|         type=int, | ||||
|         default=-1) | ||||
|      | ||||
|     parser.add_argument( | ||||
|         "-r","--rate", | ||||
|         help="Rate config. (Default: 16000)", | ||||
|         type=int, | ||||
|         default=16000) | ||||
| 
 | ||||
|     parser.add_argument( | ||||
|         "-l","--length_frame", | ||||
|         help="Frame size. (Default: 512)", | ||||
|         type=int, | ||||
|         default=512) | ||||
| 
 | ||||
|     parser.add_argument( | ||||
|         "-p","--output_name", | ||||
|         help="Name to file to store raw audio. (Default: recorder.wav)", | ||||
|         type=str, | ||||
|         default="recorder.wav") | ||||
|      | ||||
|     parser.add_argument( | ||||
|         "-v","--verbosity", | ||||
|         help="Increase logging. (Default: False)", | ||||
|         default=False, | ||||
|         action="store_true") | ||||
|      | ||||
|     return parser.parse_args() | ||||
| 
 | ||||
| 
 | ||||
| def saving(audio, output_name, rate, len_frame, wave_dir_path): | ||||
|     """ | ||||
|     Saving audio data in file outut_path. | ||||
| 
 | ||||
|     Arguments: | ||||
|         audio: file containing data audio | ||||
|         output_name: str | ||||
|         rate: int. Audio rate. | ||||
|         len_frame: int. Length of frame | ||||
| 
 | ||||
|     """ | ||||
| 
 | ||||
|     print(f"\nSaving wav file: {wave_dir_path}/{output_name}.") | ||||
| 
 | ||||
|     try:  | ||||
|         with wave.open(wave_dir_path + "/" + output_name, 'w') as f: | ||||
|             f.setparams((1, 2, rate, len_frame, "NONE", "NONE")) | ||||
|             f.writeframes(struct.pack("h" * len(audio), *audio)) | ||||
|     except ValueError as e: | ||||
|         print(f"Error: {e}") | ||||
|     except RuntimeError as e: | ||||
|         print(f"Error: {e}") | ||||
|     except Exception as e: | ||||
|         print(f"An unexpected error occurred: {e}") | ||||
| 
 | ||||
| 
 | ||||
| def recording(dev_index,output_name,rate,len_frame, wave_dir_path): | ||||
|     """ | ||||
|     Create recorder, start listener. | ||||
|     If ctrl + c are pressed: | ||||
|         Stop listening | ||||
|         Close PvRecorder instance. | ||||
|         Call saving function. | ||||
| 
 | ||||
|     Arguments: | ||||
|         dev_index: int. Number of device. (can show it with -s option) | ||||
|         output_name: str. | ||||
|         rate: int. Audio rate. | ||||
|         len_frame: int. Length of frame | ||||
|      | ||||
|     Return: | ||||
|         data and length (in sec) of them. | ||||
|     """ | ||||
| 
 | ||||
|     # Create object for recording | ||||
|     recorder = PvRecorder(device_index=dev_index, frame_length=len_frame) | ||||
|      | ||||
|     # Var containing audio | ||||
|     audio = [] | ||||
| 
 | ||||
|     try: | ||||
|         # Start recorder | ||||
|         recorder.start() | ||||
|         print("Listener is running.") | ||||
| 
 | ||||
|         while True: | ||||
|             # Read and append data in audio buffer | ||||
|             frame = recorder.read() | ||||
|             audio.extend(frame) | ||||
| 
 | ||||
|     except KeyboardInterrupt: | ||||
|         # If ctrl +c stop recoding, delete object and save audio | ||||
|         recorder.stop() | ||||
|         saving(audio,output_name, rate, len_frame,wave_dir_path) | ||||
| 
 | ||||
|     except ValueError as e: | ||||
|         print(f"Error: {e}") | ||||
|     except RuntimeError as e: | ||||
|         print(f"Error: {e}") | ||||
|     except Exception as e: | ||||
|         print(f"An unexpected error occurred: {e}") | ||||
|     finally: | ||||
|         recorder.delete()  | ||||
| 
 | ||||
|     rate, data = wavfile.read(wave_dir_path + "/" + output_name) | ||||
|     length = data.shape[0] / rate | ||||
|     return data,length | ||||
| 
 | ||||
| 
 | ||||
| def showInfos(data, rate,length, output_name,dev_index): | ||||
|     print("  ######## INFOS ########") | ||||
|     print(f"  Value recording = {data}") | ||||
|     print(f"  Data shape: {data.shape[0]}") | ||||
|     print(f"  Rate: {rate}") | ||||
|     print(f"  length: {length}s") | ||||
|     print(f"  Path file: {output_name}") | ||||
|     print(f"  Device Index: {dev_index}") | ||||
|     for index, device in getDevices(): | ||||
|         if index == dev_index: | ||||
|             print(f"  Device name: {device}") | ||||
| 
 | ||||
| 
 | ||||
| def cutWaveInBits(wave_dir_path, tmp_dir_path, output_name, length): | ||||
|     """ | ||||
|     Cut original wave in part. Actually support only 8bits | ||||
|     The time duration for isolate data is 0.125s | ||||
| 
 | ||||
|     Function trunc file in many file with time duration.  | ||||
| 
 | ||||
|     Arguments: | ||||
|         output_name: str | ||||
|         length: Duration of original wave. | ||||
| 
 | ||||
|     Return:  | ||||
|         List of all files. | ||||
|     """ | ||||
|     bits_audio = AudioSegment.from_wav(os.path.join(wave_dir_path, output_name)) | ||||
|     begin = 0 | ||||
|     bits_time = 125 # milli = 0.125s. Its the time duration for bits information | ||||
|     file_number = (length * 1000 ) / bits_time | ||||
|     count = 0 | ||||
|     tmp_tab = [] | ||||
|     while count < file_number:  | ||||
|         new_audio = bits_audio[begin:begin + bits_time] | ||||
|         filepath = str(f"{tmp_dir_path}/{count}-bits-{begin}-{begin + bits_time}.wav") | ||||
|         new_audio.export(filepath, format="wav") | ||||
|         tmp_tab.append(filepath) | ||||
|         begin += bits_time | ||||
|         count += 1 | ||||
| 
 | ||||
|     return tmp_tab | ||||
| 
 | ||||
| 
 | ||||
| def readBits(bits_tab_wave): | ||||
|     """ | ||||
|     Read all files cutted and isolate freq. | ||||
|      | ||||
|     Values:  | ||||
|         if freq is > at 1500 => 1 | ||||
|         if freq is < 1500 and > 500 => 0 | ||||
|         else pass. | ||||
| 
 | ||||
|     Arguments:  | ||||
|         List containing all files cutted | ||||
| 
 | ||||
|     Return: | ||||
|         Message in binary | ||||
|      | ||||
|     """ | ||||
| 
 | ||||
|     bits_message = [] | ||||
|     print("Extracting data from wav file:") | ||||
|     for waves in bits_tab_wave: | ||||
|         freq = detect_frequency(waves) | ||||
|         if freq < 500: | ||||
|             sys.stdout.write('.') | ||||
|         elif freq < 1500 and freq > 500: | ||||
|             sys.stdout.write("0") | ||||
|             bits_message.append(0) | ||||
|         elif freq >= 1500: | ||||
|             sys.stdout.write("1") | ||||
|             bits_message.append(1) | ||||
|          | ||||
|         sys.stdout.flush() | ||||
| 
 | ||||
|     return bits_message | ||||
|   | ||||
| 
 | ||||
| def showArgs(parser): | ||||
|     print("Args value:") | ||||
|     for arg,opt in parser.parse_args().__dict__.items(): | ||||
|         print('  %s: %s' % (arg, opt)) | ||||
| 
 | ||||
| 
 | ||||
| def decodeMessage(message): | ||||
|     """ | ||||
|     Function decoding message.  | ||||
| 
 | ||||
|     Argument: | ||||
|         Message: str. String containing message in binary | ||||
| 
 | ||||
|     Return: | ||||
|         Message decoded. | ||||
|     """ | ||||
| 
 | ||||
|     print("\nDecoding message:") | ||||
|      | ||||
|     i = 0 | ||||
|     j = 0 | ||||
|     final_message = [] | ||||
| 
 | ||||
|     while i <len(message): | ||||
|         if message[i] in (0, 1): | ||||
| 
 | ||||
|             tmp_msg = message[i:i+8] | ||||
|              | ||||
|             tmp_msg.reverse() | ||||
|             binary_tab=''.join(str(x) for x in tmp_msg) | ||||
|              | ||||
|             c=bytes("0b" + binary_tab, "utf-8") | ||||
|             binary_str= int(c,2) | ||||
|             try: | ||||
|                 final_message.append(binary_str.to_bytes((binary_str.bit_length() + 7) // 8, 'big').decode('utf-8'))  | ||||
|             except: | ||||
|                 final_message.append("?") | ||||
| 
 | ||||
|             # Jump to next bits | ||||
|             i += 8 | ||||
|         else: | ||||
|             i += 1 | ||||
|         print(".", end='') | ||||
| 
 | ||||
|     return final_message | ||||
| 
 | ||||
| 
 | ||||
| def checkEnv(root_path, tmp_dir_path, wave_dir_path): | ||||
| 
 | ||||
|     for dir in (root_path, tmp_dir_path, wave_dir_path): | ||||
|         if not os.path.exists(dir): | ||||
|             try: | ||||
|                 os.mkdir(dir) | ||||
|             except Exception as e: | ||||
|                 print(f"An error occurred: {e}") | ||||
| 
 | ||||
| 
 | ||||
| def welcome(): | ||||
|     print(f"""\ | ||||
|     ____                           __          | ||||
|    / __ \___  _________  _________/ /__  _____ | ||||
|   / /_/ / _ \/ ___/ __ \/ ___/ __  / _ \/ ___/ | ||||
|  / _, _/  __/ /__/ /_/ / /  / /_/ /  __/ /     | ||||
| /_/ |_|\___/\___/\____/_/   \__,_/\___/_/      | ||||
| 
 | ||||
|                 Author: Snorky | ||||
|                 Version : {VERSION}                                 | ||||
| """) | ||||
| 
 | ||||
| def main(): | ||||
| 
 | ||||
|     welcome() | ||||
|     parser = argparse.ArgumentParser() | ||||
|     args = getArgs(parser)   | ||||
|     showArgs(parser) | ||||
| 
 | ||||
|     verbosity = False | ||||
|     root_path = "/home/" + os.getlogin() + "/.cache/recorder" | ||||
|     tmp_dir_path = root_path + "/tmp" | ||||
|     wave_dir_path = root_path + "/wave" | ||||
|     checkEnv(root_path, tmp_dir_path, wave_dir_path) | ||||
|   | ||||
|     if args.show_devices: | ||||
|         devices = getDevices() | ||||
|         for index, device in devices: | ||||
|             print(f"[{index}] {device}") | ||||
|         exit(2) | ||||
|      | ||||
|     if args.verbosity: | ||||
|         verbosity=True | ||||
|          | ||||
|     output_name = args.output_name | ||||
|     dev_index = args.device_index | ||||
|     length_frame = args.length_frame | ||||
|     rate = args.rate | ||||
|      | ||||
|     data, length = recording(dev_index, output_name, rate, length_frame, wave_dir_path) | ||||
| 
 | ||||
|     if verbosity: | ||||
|         showInfos(data, rate,length,output_name,dev_index) | ||||
| 
 | ||||
|     bits_tab_wave = cutWaveInBits(wave_dir_path, tmp_dir_path, output_name, length) | ||||
| 
 | ||||
|     message = readBits(bits_tab_wave) | ||||
|     final_message = decodeMessage(message) | ||||
|         | ||||
|     print(f"\nMessage transmitted is: {''.join(final_message)}") | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|     main() | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user
	 Snorky
						Snorky