recorder/recorder.py

385 lines
10 KiB
Python
Raw Permalink Normal View History

2025-06-18 21:46:20 +02:00
#!/usr/bin/env python
from pvrecorder import PvRecorder
from pydub import AudioSegment
from scipy.io import wavfile
import numpy as np
import soundfile as sf
import argparse
import sys, os
import struct
import wave
2025-06-19 15:28:27 +02:00
AUTHOR = "Snorky"
2025-06-18 21:46:20 +02:00
VERSION = 0.1
def detect_frequency(file_path, duration=60):
"""
Detect the dominant frequency in an audio file.
Parameters:
file_path (str): Path to the audio file.
duration (int, optional): Duration in seconds to analyze. Defaults to 60.
Returns:
float: The dominant frequency in Hz.
Src: https://github.com/Michael-Sebero/Audio-Frequency-Tools/blob/main/tools/frequency-detector.py
"""
try:
# Read audio file with soundfile (supports many formats)
audio_data, sample_rate = sf.read(file_path, always_2d=True)
# Extract the first channel if the audio is stereo
audio_data = audio_data[:, 0]
# Truncate or pad the audio to the desired duration
max_samples = int(sample_rate * duration)
audio_data = audio_data[:max_samples] if len(audio_data) > max_samples else np.pad(
audio_data, (0, max_samples - len(audio_data)), 'constant'
)
if len(audio_data) == 0:
raise ValueError("Audio file contains no data or is too short for analysis.")
# Perform FFT on the audio data
fft_result = np.fft.rfft(audio_data)
freqs = np.fft.rfftfreq(len(audio_data), 1.0 / sample_rate)
# Find the dominant frequency
dominant_freq = freqs[np.argmax(np.abs(fft_result))]
#print(f"The dominant frequency is {dominant_freq:.2f} Hz.")
return dominant_freq
except FileNotFoundError:
print("Error: The specified audio file was not found.")
except ValueError as e:
print(f"Error: {e}")
except RuntimeError as e:
print(f"Error: {e}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
def getDevices():
"""
Detect and return enum of devices
Return:
enumerate of devices.
"""
return enumerate(PvRecorder.get_available_devices())
def getArgs(parser):
"""
Parsing Args and return them
Argmunents:
parser: Namespace.
return:
parser parser.
"""
parser.add_argument(
"-s","--show_devices",
help="List of audio devices currently available for use.",
action="store_true")
parser.add_argument(
"-i","--device_index",
help="Index of input audio device. (Default: -1)",
type=int,
default=-1)
parser.add_argument(
"-r","--rate",
help="Rate config. (Default: 16000)",
type=int,
default=16000)
parser.add_argument(
"-l","--length_frame",
help="Frame size. (Default: 512)",
type=int,
default=512)
parser.add_argument(
"-p","--output_name",
help="Name to file to store raw audio. (Default: recorder.wav)",
type=str,
default="recorder.wav")
parser.add_argument(
"-v","--verbosity",
help="Increase logging. (Default: False)",
default=False,
action="store_true")
return parser.parse_args()
def saving(audio, output_name, rate, len_frame, wave_dir_path):
"""
Saving audio data in file outut_path.
Arguments:
audio: file containing data audio
output_name: str
rate: int. Audio rate.
len_frame: int. Length of frame
"""
print(f"\nSaving wav file: {wave_dir_path}/{output_name}.")
try:
with wave.open(wave_dir_path + "/" + output_name, 'w') as f:
f.setparams((1, 2, rate, len_frame, "NONE", "NONE"))
f.writeframes(struct.pack("h" * len(audio), *audio))
except ValueError as e:
print(f"Error: {e}")
except RuntimeError as e:
print(f"Error: {e}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
def recording(dev_index,output_name,rate,len_frame, wave_dir_path):
"""
Create recorder, start listener.
If ctrl + c are pressed:
Stop listening
Close PvRecorder instance.
Call saving function.
Arguments:
dev_index: int. Number of device. (can show it with -s option)
output_name: str.
rate: int. Audio rate.
len_frame: int. Length of frame
Return:
data and length (in sec) of them.
"""
# Create object for recording
recorder = PvRecorder(device_index=dev_index, frame_length=len_frame)
# Var containing audio
audio = []
try:
# Start recorder
recorder.start()
print("Listener is running.")
2025-06-18 22:21:07 +02:00
print("Ctrl + c for quit.")
2025-06-18 21:46:20 +02:00
while True:
# Read and append data in audio buffer
frame = recorder.read()
audio.extend(frame)
except KeyboardInterrupt:
# If ctrl +c stop recoding, delete object and save audio
recorder.stop()
saving(audio,output_name, rate, len_frame,wave_dir_path)
except ValueError as e:
print(f"Error: {e}")
except RuntimeError as e:
print(f"Error: {e}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
finally:
recorder.delete()
rate, data = wavfile.read(wave_dir_path + "/" + output_name)
length = data.shape[0] / rate
return data,length
def showInfos(data, rate,length, output_name,dev_index):
print(" ######## INFOS ########")
print(f" Value recording = {data}")
print(f" Data shape: {data.shape[0]}")
print(f" Rate: {rate}")
print(f" length: {length}s")
print(f" Path file: {output_name}")
print(f" Device Index: {dev_index}")
for index, device in getDevices():
if index == dev_index:
print(f" Device name: {device}")
def cutWaveInBits(wave_dir_path, tmp_dir_path, output_name, length):
"""
Cut original wave in part. Actually support only 8bits
The time duration for isolate data is 0.125s
Function trunc file in many file with time duration.
Arguments:
output_name: str
length: Duration of original wave.
Return:
List of all files.
"""
bits_audio = AudioSegment.from_wav(os.path.join(wave_dir_path, output_name))
begin = 0
bits_time = 125 # milli = 0.125s. Its the time duration for bits information
file_number = (length * 1000 ) / bits_time
count = 0
tmp_tab = []
while count < file_number:
new_audio = bits_audio[begin:begin + bits_time]
filepath = str(f"{tmp_dir_path}/{count}-bits-{begin}-{begin + bits_time}.wav")
new_audio.export(filepath, format="wav")
tmp_tab.append(filepath)
begin += bits_time
count += 1
return tmp_tab
def readBits(bits_tab_wave):
"""
Read all files cutted and isolate freq.
Values:
if freq is > at 1500 => 1
if freq is < 1500 and > 500 => 0
else pass.
Arguments:
List containing all files cutted
Return:
Message in binary
2025-06-18 22:00:37 +02:00
2025-06-18 21:46:20 +02:00
"""
bits_message = []
print("Extracting data from wav file:")
for waves in bits_tab_wave:
freq = detect_frequency(waves)
if freq < 500:
sys.stdout.write('.')
elif freq < 1500 and freq > 500:
sys.stdout.write("0")
bits_message.append(0)
elif freq >= 1500:
sys.stdout.write("1")
bits_message.append(1)
sys.stdout.flush()
return bits_message
def showArgs(parser):
print("Args value:")
for arg,opt in parser.parse_args().__dict__.items():
print(' %s: %s' % (arg, opt))
def decodeMessage(message):
"""
Function decoding message.
Argument:
Message: str. String containing message in binary
Return:
Message decoded.
"""
print("\nDecoding message:")
i = 0
j = 0
final_message = []
while i <len(message):
if message[i] in (0, 1):
tmp_msg = message[i:i+8]
tmp_msg.reverse()
binary_tab=''.join(str(x) for x in tmp_msg)
c=bytes("0b" + binary_tab, "utf-8")
binary_str= int(c,2)
try:
final_message.append(binary_str.to_bytes((binary_str.bit_length() + 7) // 8, 'big').decode('utf-8'))
except:
final_message.append("?")
# Jump to next bits
i += 8
else:
i += 1
print(".", end='')
return final_message
def checkEnv(root_path, tmp_dir_path, wave_dir_path):
for dir in (root_path, tmp_dir_path, wave_dir_path):
if not os.path.exists(dir):
try:
os.mkdir(dir)
except Exception as e:
print(f"An error occurred: {e}")
def welcome():
print(f"""\
____ __
/ __ \___ _________ _________/ /__ _____
/ /_/ / _ \/ ___/ __ \/ ___/ __ / _ \/ ___/
/ _, _/ __/ /__/ /_/ / / / /_/ / __/ /
/_/ |_|\___/\___/\____/_/ \__,_/\___/_/
2025-06-19 15:28:27 +02:00
Author: {AUTHOR}
2025-06-18 21:46:20 +02:00
Version : {VERSION}
""")
2025-06-19 15:28:27 +02:00
2025-06-18 21:46:20 +02:00
def main():
welcome()
parser = argparse.ArgumentParser()
args = getArgs(parser)
showArgs(parser)
verbosity = False
2025-06-19 15:28:27 +02:00
root_path = os.path.join(os.path.expanduser('~'), ".cache/recorder")
tmp_dir_path = os.path.join(root_path, "tmp")
wave_dir_path = os.path.join(root_path, "wave")
2025-06-18 21:46:20 +02:00
checkEnv(root_path, tmp_dir_path, wave_dir_path)
if args.show_devices:
devices = getDevices()
for index, device in devices:
print(f"[{index}] {device}")
exit(2)
if args.verbosity:
verbosity=True
output_name = args.output_name
dev_index = args.device_index
length_frame = args.length_frame
rate = args.rate
data, length = recording(dev_index, output_name, rate, length_frame, wave_dir_path)
if verbosity:
showInfos(data, rate,length,output_name,dev_index)
bits_tab_wave = cutWaveInBits(wave_dir_path, tmp_dir_path, output_name, length)
message = readBits(bits_tab_wave)
final_message = decodeMessage(message)
print(f"\nMessage transmitted is: {''.join(final_message)}")
if __name__ == "__main__":
main()