안녕하세요 민윤홍 입니다.
이번에는 TinyML을 사용해서 WakeWord Detection을 진행한 포스팅입니다.
당시 회사에서 회사제품을 이용한 프로젝트를 잘 만들면 순금을 주는 제도가 있었는데, 금을 받고싶기도 했고, 저 스스로를 증명하고 싶어서 이를 갈고 제작했던 프로젝트입니다.
당시 회사에서 압도적인 최고점을 받고 1등을 하여 순금 2돈을 포상으로 받았던 기억이 있었던 만큼, 좋았던 기억도 있지만 허점도 있었던 만큼 여러모로 저한테는 감회가 색달랐던 프로젝트입니다.
https://maker.wiznet.io/Acorn_/projects/tinyml%2Dwakeword%2Ddetection/
TinyML - WakeWord Detection
Let’s do wakeword detection with W5500-evb-pico.
maker.wiznet.io
PROJECT DESCRIPTION
Summary
이 게시물은 음성 인식 시스템에 대해 다룹니다. 여기서는 Fourier Transform과 STFT(Short Time Fourier Transform)를 포함한 음성 인식의 원리를 설명하고, Arduino와 Python을 사용하여 음성 데이터를 수집하고 전처리하는 코드 예시를 제공합니다. 또한 모델을 훈련하고 이를 TensorFlow Lite로 변환하는 방법도 포함됩니다. 시스템 아키텍처와 작동 원리를 자세히 설명하며, Arduino와 Python 코드를 활용한 구현 단계도 설명합니다.
Overview
리소스가 제한된 IoT 장치에서의 Wake Word Detection 기술을 더 잘 이해하기 위해, iPhone과 같은 고성능 장치보다 훨씬 덜 강력한 Arduino에서 조사를 진행할 것입니다.
기존 데이터 세트를 사용하여 모델을 훈련하고, 이를 TensorFlow Lite 모델로 변환한 후 Arduino에 배포합니다. 그런 다음 Arduino IDE 메시지를 통해 출력 정보를 얻을 것입니다.
Prior Knowledge
Principles of voice recognition - sound
음원은 다음과 같은 파형으로 나타낼 수 있습니다.
음원 자체에서 특징 값을 추출하는 것은 어렵고, 특징 값의 수가 지수적으로 증가하므로 센서 값만으로 특징을 추출하는 것은 불가능합니다.
Fourier transform and spectrogram
- 음원에 대해 푸리에 변환을 수행하여 [시간 축]의 파형을 [주파수 축]의 파형으로 변환합니다.
여기서 특징을 추출할 수 있는 것은 주파수에서 이득이 큰 부분입니다. - 그러나 음성은 시간적인 특성을 가지므로 주파수 특성만으로 분류하는 것은 어렵습니다.
- 예를 들어, 주파수 특성만을 본다면 "Banana"와 "naBana"를 구분할 수 없습니다.
STFT(Short Time Fourier Transform)
- 음원의 시간을 짧은 구간으로 나누어 푸리에 변환을 시도합니다.
- 푸리에 변환된 그래프를 반시계 방향으로 회전하여 누적된 형태를 STFT(Short-Time Fourier Transform)이라고 합니다.
- 주파수 범위의 크기는 0과 255 사이의 값으로 표현됩니다.
- 이 코드에서는 (257X4) 텐서로 변환하여 푸리에 변환을 시도했습니다.
System Diagram
- 데이터셋을 PC에서 스펙트로그램으로 변환한 후 훈련을 진행합니다.
- CNN(Convolutional Neural Network)을 사용하여 학습을 진행하고, 학습된 데이터를 TFLite로 변환한 후 이를 .cpp 형식으로 이진화하여 Arduino 모델에 맞게 변환합니다.
- 이후 Arduino는 해당 모델을 로드하고 센서 값을 모델에 주입하여 추론을 수행합니다.
Code
Get_sounds.ino
#include <fix_fft.h>
const int analogSensorPin = 26;
const int ledPin = LED_BUILTIN;
const int sampleRate = 1000;
const int sampleTime = 1;
const int totalSamples = sampleRate * sampleTime;
int16_t vReal[totalSamples];
int16_t vImag[totalSamples];
unsigned long startTime;
void fft_wrapper(int16_t* vReal, int16_t* vImag, int n, int inverse) {
fix_fft((char*)vReal, (char*)vImag, n, inverse);
}
void setup() {
Serial.begin(9600);
pinMode(ledPin, OUTPUT);
}
void loop() {
//Serial.print("fft calculate");
digitalWrite(ledPin, HIGH);
startTime = millis();
for (int i = 0; i < totalSamples; i++) {
vReal[i] = analogRead(analogSensorPin);
vImag[i] = 0; // 허수부는 0으로 초기화
while (millis() < startTime + (i * 1000.0 / sampleRate));
}
digitalWrite(ledPin, LOW);
// FFT 계산
fft_wrapper(vReal, vImag, 10, 0);
// FFT 결과 출력
for (int i = 0; i < totalSamples / 2; i++) {
double frequency = (i * 1.0 * sampleRate) / totalSamples;
double magnitude = sqrt(vReal[i] * vReal[i] + vImag[i] * vImag[i]);
//Serial.print(frequency);
//Serial.print(",");
Serial.println(magnitude);
}
delay(2000);
}
get_voice_data.py
import serial
import numpy as np
import librosa
import csv
# 아두이노에서 데이터 읽어오기
ser = serial.Serial('COM13', 9600)
sampleRate = 1000
sampleTime = 1
totalSamples = sampleRate * sampleTime
while True:
try:
data = []
while len(data) < totalSamples:
if ser.in_waiting:
value = ser.readline().decode().strip()
if value:
data.append(float(value))
# 데이터 전처리
data = np.array(data)
data = data / 1023.0 # 정규화
# 오디오 데이터 변환
sr = sampleRate # 샘플링 레이트
audio = librosa.resample(data, orig_sr=sr, target_sr=sr) # 리샘플링
stft = librosa.stft(audio, n_fft=512, hop_length=256) # STFT 적용
spectrogram = librosa.amplitude_to_db(np.abs(stft), ref=np.max) # 스펙트로그램 변환
# 스펙트로그램 데이터를 CSV 파일로 저장
with open('you.csv', 'a', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerows(spectrogram)
except KeyboardInterrupt:
break
ser.close()
Train_voice_data.ipynb
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
# CSV 파일 읽어오기
nothing_data = pd.read_csv('nothing.csv', header=None)
wiznet_data = pd.read_csv('wiznet.csv', header=None)
you_data = pd.read_csv('you.csv', header=None)
# 데이터 병합 및 레이블 할당
data = np.vstack((nothing_data, wiznet_data, you_data))
labels = np.concatenate((np.zeros(len(nothing_data)), np.ones(len(wiznet_data)), np.ones(len(you_data))*2))
# 학습 데이터와 테스트 데이터 분리
X_train, X_test, y_train, y_test = train_test_split(data, labels, test_size=0.2, random_state=42)
# 입력 데이터 크기 확인
input_shape = X_train.shape[1]
# 모델 구성
model = tf.keras.Sequential([
tf.keras.layers.Dense(128, activation='relu', input_shape=(input_shape,)),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(3, activation='softmax')
])
# 모델 컴파일
model.compile(optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
# 모델 학습
model.fit(X_train, y_train, epochs=50, batch_size=32, validation_data=(X_test, y_test))
# 모델 평가
test_loss, test_acc = model.evaluate(X_test, y_test)
print('Test accuracy:', test_acc)
# TensorFlow Lite 변환
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
# TensorFlow Lite 모델 저장
with open('voice_recognition_model.tflite', 'wb') as f:
f.write(tflite_model)
def representative_dataset_gen():
for i in range(len(X_train)):
yield [X_train[i].astype(np.float32)]
# TensorFlow Lite 변환 및 완전 정수 양자화
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset_gen
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8
tflite_quant_model = converter.convert()
# 완전 정수 양자화된 TensorFlow Lite 모델 저장
with open('voice_recognition_model_quant.tflite', 'wb') as f:
f.write(tflite_quant_model)
이것은 훈련 결과입니다.
데이터셋 자체가 충분하지 않기 때문에 학습 상황이 불안정하게 나타납니다. 손실(loss)의 경우 갑작스러운 급증으로 인해 정확도가 떨어지지만, 전체적으로는 학습이 진행되는 것으로 보입니다.
세 가지 분류 유형이 있고 데이터가 부족하다는 점을 고려했을 때, 예상보다 더 높은 정확도로 만족할 수 있습니다.
WakeWord_Detection.ino
#include <TensorFlowLite.h>
#define ARDUINO_EXCLUDE_CODE
#include "tensorflow/lite/micro/kernels/all_ops_resolver.h"
#include "tensorflow/lite/micro/micro_error_reporter.h"
#include "tensorflow/lite/micro/micro_interpreter.h"
#include "tensorflow/lite/schema/schema_generated.h"
#include "tensorflow/lite/version.h"
#undef ARDUINO_EXCLUDE_CODE
#include "voice_recognition_model.h" // 변환된 모델 파일 포함
#include <fix_fft.h>
// 모델 관련 상수 정의
const int kTensorArenaSize = 4 * 1024; // 텐서 아레나 크기 증가
const int kNumInputs = 1;
const int kNumOutputs = 1;
const int kInputFrames = 4;
const int kInputShape[4] = {1, 257, kInputFrames, 1};
const int kOutputSize = 3;
const int analogSensorPin = 26;
const int ledPin = LED_BUILTIN;
const int sampleRate = 1000;
const int sampleTime = 1;
const int totalSamples = sampleRate * sampleTime;
int16_t vReal[totalSamples];
int16_t vImag[totalSamples];
unsigned long startTime;
// 텐서 아레나 메모리 할당
uint8_t tensor_arena[kTensorArenaSize];
// 오디오 입력 버퍼
float audio_buffer[257 * kInputFrames];
int audio_buffer_index = 0;
// 모델 추론 함수
String inference(float* input_data) {
// 에러 리포터 설정
tflite::MicroErrorReporter micro_error_reporter;
tflite::ErrorReporter* error_reporter = µ_error_reporter;
// 플랫버퍼 모델 포인터 설정
const tflite::Model* model = ::tflite::GetModel(voice_recognition_model_tflite);
if (model->version() != TFLITE_SCHEMA_VERSION) {
return "Model schema mismatch";
}
// 모델 연산자 설정
tflite::ops::micro::AllOpsResolver resolver;
// 인터프리터 생성
tflite::MicroInterpreter interpreter(model, resolver, tensor_arena, kTensorArenaSize, error_reporter);
// 텐서 할당
interpreter.AllocateTensors();
// 입력 텐서 포인터 얻기
TfLiteTensor* input = interpreter.input(0);
// 입력 데이터 복사
for (int i = 0; i < 257 * kInputFrames; i++) {
input->data.f[i] = input_data[i];
}
// 추론 실행
TfLiteStatus invoke_status = interpreter.Invoke();
if (invoke_status != kTfLiteOk) {
return "Invoke failed";
}
// 출력 텐서 포인터 얻기
TfLiteTensor* output = interpreter.output(0);
// 출력 결과 처리
int predicted_class = 0;
float max_probability = output->data.f[0];
for (int i = 1; i < kOutputSize; i++) {
if (output->data.f[i] > max_probability) {
predicted_class = i;
max_probability = output->data.f[i];
}
}
// 결과 반환
if (predicted_class == 0) {
return "Nothing";
} else if (predicted_class == 1) {
return "WIZnet";
} else if (predicted_class == 2) {
return "You";
}
return "Unknown";
}
void fft_wrapper(int16_t* vReal, int16_t* vImag, int n, int inverse) {
fix_fft((char*)vReal, (char*)vImag, n, inverse);
}
void setup() {
// 시리얼 통신 초기화
Serial.begin(9600);
pinMode(ledPin, OUTPUT);
}
void loop() {
Serial.print("PLEASE Recognized word: ");
// 오디오 입력 받기
if (audio_buffer_index < 257 * kInputFrames) {
digitalWrite(ledPin, HIGH);
startTime = millis();
for (int i = 0; i < totalSamples; i++) {
vReal[i] = analogRead(analogSensorPin);
vImag[i] = 0; // 허수부는 0으로 초기화
while (millis() < startTime + (i * 1000.0 / sampleRate));
}
digitalWrite(ledPin, LOW);
// FFT 계산
fft_wrapper(vReal, vImag, 10, 0);
// FFT 결과를 audio_buffer에 저장
for (int i = 0; i < totalSamples / 2; i++) {
double magnitude = sqrt(vReal[i] * vReal[i] + vImag[i] * vImag[i]);
audio_buffer[audio_buffer_index++] = magnitude;
}
}
// 오디오 버퍼가 가득 찼을 때 추론 수행
if (audio_buffer_index >= 257 * kInputFrames) {
// 추론 실행
String result = inference(audio_buffer);
// 결과 출력
Serial.print("Recognized word: ");
Serial.println(result);
// 오디오 버퍼 초기화
audio_buffer_index = 0;
}
delay(500); // 500밀리초 대기
}
Result
https://www.youtube.com/shorts/CfJKs5EapFk
'AIoT' 카테고리의 다른 글
TinyML(2) - using low-sensitivity sensor to predict high-performance sensor (0) | 2024.12.24 |
---|---|
TinyML(1) - Beginning TinyML with W5500-evb-pico (0) | 2024.12.23 |
W5X00-EVB-Pico with SSL/TLS (0) | 2024.12.19 |
L2P - LLM to Pico (1) | 2024.12.18 |