2025-06-18 21:46:20 +02:00
|
|
|
#!/usr/bin/env python
|
|
|
|
|
|
|
|
from pvrecorder import PvRecorder
|
|
|
|
from pydub import AudioSegment
|
|
|
|
from scipy.io import wavfile
|
|
|
|
import numpy as np
|
|
|
|
import soundfile as sf
|
|
|
|
import argparse
|
|
|
|
import sys, os
|
|
|
|
import struct
|
|
|
|
import wave
|
|
|
|
|
2025-06-19 15:28:27 +02:00
|
|
|
# Script metadata; both values are interpolated into the banner printed by welcome().
AUTHOR = "Snorky"
VERSION = 0.1
|
|
|
|
|
|
|
|
def detect_frequency(file_path, duration=60):
    """
    Detect the dominant frequency in an audio file.

    Only the first channel is analysed. The signal is truncated or
    zero-padded to `duration` seconds before a real FFT is taken, and the
    frequency of the bin with the largest magnitude is returned.

    Parameters:
        file_path (str): Path to the audio file.
        duration (int, optional): Duration in seconds to analyze. Defaults to 60.

    Returns:
        float: The dominant frequency in Hz, or None when the file could not
        be analysed (the error is printed, not raised).

    Src: https://github.com/Michael-Sebero/Audio-Frequency-Tools/blob/main/tools/frequency-detector.py
    """
    try:
        # Read audio file with soundfile (supports many formats)
        audio_data, sample_rate = sf.read(file_path, always_2d=True)

        # always_2d=True guarantees a channel axis; keep the first channel only
        audio_data = audio_data[:, 0]

        # Truncate to the desired duration
        max_samples = int(sample_rate * duration)
        if len(audio_data) > max_samples:
            audio_data = audio_data[:max_samples]

        # Must run BEFORE padding: once padded, the buffer is never empty and
        # an empty recording would be reported as a 0 Hz signal.
        if len(audio_data) == 0:
            raise ValueError("Audio file contains no data or is too short for analysis.")

        # Zero-pad short recordings up to the desired duration
        if len(audio_data) < max_samples:
            audio_data = np.pad(
                audio_data, (0, max_samples - len(audio_data)), 'constant'
            )

        # Perform FFT on the audio data
        fft_result = np.fft.rfft(audio_data)
        freqs = np.fft.rfftfreq(len(audio_data), 1.0 / sample_rate)

        # Find the dominant frequency
        return freqs[np.argmax(np.abs(fft_result))]

    except FileNotFoundError:
        print("Error: The specified audio file was not found.")
    except (ValueError, RuntimeError) as e:
        print(f"Error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

    # Every error path ends here; make the "no result" value explicit.
    return None
|
|
|
|
|
|
|
|
|
|
|
|
def getDevices():
    """
    List the audio devices reported by PvRecorder.

    Return:
        An `enumerate` iterator over PvRecorder.get_available_devices(),
        yielding (index, device) pairs.
    """
    available = PvRecorder.get_available_devices()
    return enumerate(available)
|
|
|
|
|
|
|
|
|
|
|
|
def getArgs(parser):
    """
    Register the command-line options on `parser` and parse the command line.

    Arguments:
        parser: argparse.ArgumentParser to populate.

    Return:
        The argparse Namespace produced by parser.parse_args().
    """
    # One (flags, settings) entry per option, registered in this order.
    options = (
        (("-s", "--show_devices"), {
            "help": "List of audio devices currently available for use.",
            "action": "store_true",
        }),
        (("-i", "--device_index"), {
            "help": "Index of input audio device. (Default: -1)",
            "type": int,
            "default": -1,
        }),
        (("-r", "--rate"), {
            "help": "Rate config. (Default: 16000)",
            "type": int,
            "default": 16000,
        }),
        (("-l", "--length_frame"), {
            "help": "Frame size. (Default: 512)",
            "type": int,
            "default": 512,
        }),
        (("-p", "--output_name"), {
            "help": "Name to file to store raw audio. (Default: recorder.wav)",
            "type": str,
            "default": "recorder.wav",
        }),
        (("-v", "--verbosity"), {
            "help": "Increase logging. (Default: False)",
            "default": False,
            "action": "store_true",
        }),
    )

    for flags, settings in options:
        parser.add_argument(*flags, **settings)

    return parser.parse_args()
|
|
|
|
|
|
|
|
|
|
|
|
def saving(audio, output_name, rate, len_frame, wave_dir_path):
    """
    Save the recorded samples as a mono, 16-bit PCM WAV file.

    Errors are printed, not raised.

    Arguments:
        audio: sequence of int samples, each within the signed 16-bit range.
        output_name: str. File name of the WAV file.
        rate: int. Audio rate written to the WAV header.
        len_frame: int. Length of a capture frame. Kept for interface
            compatibility; it is not needed to write the file.
        wave_dir_path: str. Directory in which the file is created.
    """
    print(f"\nSaving wav file: {wave_dir_path}/{output_name}.")

    try:
        with wave.open(os.path.join(wave_dir_path, output_name), 'wb') as f:
            # nframes is the number of samples, not the capture frame length.
            f.setparams((1, 2, rate, len(audio), "NONE", "NONE"))
            # WAV data is little-endian; "<" makes that independent of the host.
            f.writeframes(struct.pack(f"<{len(audio)}h", *audio))
    except (ValueError, RuntimeError) as e:
        print(f"Error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
def recording(dev_index, output_name, rate, len_frame, wave_dir_path):
    """
    Record from an input device until interrupted, then save and reload the audio.

    Creates a PvRecorder and reads frames in a loop. Ctrl + c stops the
    recorder. Whatever was captured is then written with saving() -- also
    after a capture error, so the file read back below always belongs to
    this session and never to a previous run.

    Arguments:
        dev_index: int. Number of device. (can show it with -s option)
        output_name: str. File name of the WAV file.
        rate: int. Audio rate written to the WAV header.
        len_frame: int. Length of frame requested from PvRecorder.
        wave_dir_path: str. Directory holding the WAV file.

    Return:
        data and length (in sec) of them, as read back from the WAV file.
    """
    # NOTE(review): `rate` is only passed on to saving(); it is not given to
    # PvRecorder -- confirm it matches the rate the device captures at.
    recorder = PvRecorder(device_index=dev_index, frame_length=len_frame)

    # Var containing audio
    audio = []

    try:
        recorder.start()
        print("Listener is running.")
        print("Ctrl + c for quit.")

        while True:
            # Read and append data in audio buffer
            audio.extend(recorder.read())

    except KeyboardInterrupt:
        # Normal way to end the capture
        recorder.stop()
    except (ValueError, RuntimeError) as e:
        print(f"Error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        recorder.delete()

    saving(audio, output_name, rate, len_frame, wave_dir_path)

    rate, data = wavfile.read(os.path.join(wave_dir_path, output_name))
    length = data.shape[0] / rate
    return data, length
|
|
|
|
|
|
|
|
|
|
|
|
def showInfos(data, rate, length, output_name, dev_index):
    """
    Print a summary of the recording and of the device it came from.

    Arguments:
        data: recorded samples; must expose `.shape`.
        rate: audio rate.
        length: duration of the recording in seconds.
        output_name: name of the WAV file.
        dev_index: index of the recording device.
    """
    print(" ######## INFOS ########")
    print(f" Value recording = {data}")
    sample_count = data.shape[0]
    print(f" Data shape: {sample_count}")
    print(f" Rate: {rate}")
    print(f" length: {length}s")
    print(f" Path file: {output_name}")
    print(f" Device Index: {dev_index}")

    # Print the name of every listed device whose index matches
    matching = (name for position, name in getDevices() if position == dev_index)
    for name in matching:
        print(f" Device name: {name}")
|
|
|
|
|
|
|
|
|
|
|
|
def cutWaveInBits(wave_dir_path, tmp_dir_path, output_name, length):
    """
    Split the recorded WAV file into consecutive 125 ms slices.

    Each slice is exported as its own WAV file in `tmp_dir_path`, named
    "<n>-bits-<start>-<stop>.wav" with start/stop in milliseconds.

    Arguments:
        wave_dir_path: str. Directory holding the recorded WAV file.
        tmp_dir_path: str. Directory receiving the slices.
        output_name: str. File name of the recorded WAV file.
        length: Duration of original wave, in seconds.

    Return:
        List of the paths of all exported slices, in order.
    """
    source_path = os.path.join(wave_dir_path, output_name)
    full_audio = AudioSegment.from_wav(source_path)

    slice_ms = 125  # 0.125 s: duration carrying one bit of information
    slice_total = (length * 1000) / slice_ms

    exported = []
    index = 0
    while index < slice_total:
        start = index * slice_ms
        stop = start + slice_ms
        target = f"{tmp_dir_path}/{index}-bits-{start}-{stop}.wav"
        full_audio[start:stop].export(target, format="wav")
        exported.append(target)
        index += 1

    return exported
|
|
|
|
|
|
|
|
|
|
|
|
def readBits(bits_tab_wave):
    """
    Read every slice file and turn its dominant frequency into a bit.

    Values:
        freq >= 1500            => 1
        500 < freq < 1500       => 0
        freq <= 500             => no bit ('.' is shown)
        slice could not be read => no bit ('?' is shown)

    A progress character is written to stdout for every slice.

    Arguments:
        bits_tab_wave: list of paths of the slice files.

    Return:
        Message in binary, as a list of 0/1 ints.
    """
    bits_message = []
    print("Extracting data from wav file:")

    for wave_path in bits_tab_wave:
        freq = detect_frequency(wave_path)

        if freq is None:
            # detect_frequency() already reported the error; comparing None
            # with a number would raise TypeError.
            symbol = "?"
        elif freq <= 500:
            # Includes exactly 500 Hz, which used to match no branch at all.
            symbol = "."
        elif freq < 1500:
            symbol = "0"
            bits_message.append(0)
        else:
            symbol = "1"
            bits_message.append(1)

        sys.stdout.write(symbol)
        sys.stdout.flush()

    return bits_message
|
|
|
|
|
|
|
|
|
|
|
|
def showArgs(parser):
    """
    Print every command-line option and its parsed value.

    The command line is parsed again through `parser` to obtain the values.
    """
    print("Args value:")
    parsed = vars(parser.parse_args())
    for name in parsed:
        print(f" {name!s}: {parsed[name]!s}")
|
|
|
|
|
|
|
|
|
|
|
|
def decodeMessage(message):
    """
    Decode a bit sequence into characters.

    The bits are consumed in groups of 8, least-significant bit first. Each
    group is converted to an integer and decoded as UTF-8. A group that
    cannot be decoded yields "?", and an all-zero group yields an empty
    string. A trailing group shorter than 8 bits is decoded as-is. Elements
    that are neither 0 nor 1 at the start of a group are skipped one at a
    time, printing "." for each.

    Argument:
        message: sequence (list, tuple, ...) of 0/1 ints.

    Return:
        List with one decoded string per group.
    """
    print("\nDecoding message:")

    final_message = []
    i = 0

    while i < len(message):
        if message[i] not in (0, 1):
            i += 1
            print(".", end='')
            continue

        # reversed() works on any sequence and leaves the caller's data intact
        binary_str = ''.join(str(x) for x in reversed(message[i:i + 8]))

        try:
            value = int(binary_str, 2)
            final_message.append(
                value.to_bytes((value.bit_length() + 7) // 8, 'big').decode('utf-8')
            )
        except (ValueError, OverflowError, UnicodeDecodeError):
            # Not binary digits, or not a valid UTF-8 byte
            final_message.append("?")

        # Jump to next bits
        i += 8

    return final_message
|
|
|
|
|
|
|
|
|
|
|
|
def checkEnv(root_path, tmp_dir_path, wave_dir_path):
    """
    Make sure the working directories exist, creating them when needed.

    Missing parent directories are created as well. A failure (for example
    a permission error, or a path that exists but is not a directory) is
    printed, not raised.

    Arguments:
        root_path: str. Root working directory.
        tmp_dir_path: str. Directory for the temporary slice files.
        wave_dir_path: str. Directory for the recorded WAV file.
    """
    for directory in (root_path, tmp_dir_path, wave_dir_path):
        try:
            # exist_ok avoids the race between an existence check and the creation
            os.makedirs(directory, exist_ok=True)
        except OSError as e:
            print(f"An error occurred: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
def welcome():
    """
    Print the start-up banner followed by the author and version.

    Reads the module-level AUTHOR and VERSION constants.
    """
    # Raw strings: the art is full of backslashes that do not form valid
    # escape sequences in a normal string literal and make recent Python
    # versions emit a warning at compile time.
    # NOTE(review): the column alignment was rebuilt to the usual "slant"
    # ASCII-art layout of the word "Recorder" -- confirm against the intended
    # banner.
    art = (
        r"    ____                           __",
        r"   / __ \___  _________  _________/ /__  _____",
        r"  / /_/ / _ \/ ___/ __ \/ ___/ __  / _ \/ ___/",
        r" / _, _/  __/ /__/ /_/ / /  / /_/ /  __/ /",
        r"/_/ |_|\___/\___/\____/_/   \__,_/\___/_/",
    )
    print("\n".join(art))
    print()
    print(f"Author: {AUTHOR}")
    print(f"Version : {VERSION}")
    print()
|
|
|
|
|
2025-06-19 15:28:27 +02:00
|
|
|
|
2025-06-18 21:46:20 +02:00
|
|
|
def main():
    """
    Run the whole pipeline: record, slice, read the bits and decode them.

    Steps:
        1. Print the banner and parse/show the command-line options.
        2. Make sure the working directories under ~/.cache/recorder exist.
        3. With --show_devices: list the audio devices and exit with status 2.
        4. Record until Ctrl + c, cut the recording into slices, convert the
           slices to bits and print the decoded message.
    """
    welcome()
    parser = argparse.ArgumentParser()
    args = getArgs(parser)
    showArgs(parser)

    root_path = os.path.join(os.path.expanduser('~'), ".cache/recorder")
    tmp_dir_path = os.path.join(root_path, "tmp")
    wave_dir_path = os.path.join(root_path, "wave")
    checkEnv(root_path, tmp_dir_path, wave_dir_path)

    if args.show_devices:
        for index, device in getDevices():
            print(f"[{index}] {device}")
        # sys.exit instead of exit(): the latter is injected by the site
        # module and is not guaranteed to exist.
        sys.exit(2)

    output_name = args.output_name
    dev_index = args.device_index
    length_frame = args.length_frame
    rate = args.rate

    data, length = recording(dev_index, output_name, rate, length_frame, wave_dir_path)

    if args.verbosity:
        showInfos(data, rate, length, output_name, dev_index)

    bits_tab_wave = cutWaveInBits(wave_dir_path, tmp_dir_path, output_name, length)

    message = readBits(bits_tab_wave)
    final_message = decodeMessage(message)

    print(f"\nMessage transmitted is: {''.join(final_message)}")
|
|
|
|
|
|
|
|
|
|
|
|
# Run the pipeline only when the file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
|