Supported hardware: N150, N300, T3K, P100, P150, P300C, Galaxy | Estimated time: 30 min | Status: Validated

Overview

Build a real-time audio processing pipeline using TTNN for signal processing operations. This project demonstrates practical DSP (Digital Signal Processing) on TT hardware.

Features:

Why This Project:

Time: 45 minutes | Difficulty: Intermediate


Example Output

Mel-Spectrogram Visualization

Mel-spectrogram of a music sample processed on TT hardware. Shows frequency components over time, similar to what Whisper uses for speech recognition.


Deploy the Project

📦 Deploy All Cookbook Projects

This creates the project in ~/tt-scratchpad/cookbook/audio_processor/.


Implementation

Step 1: Core Audio Processor (processor.py)

"""
Audio signal processing using TTNN
Implements mel-spectrogram, MFCC, and real-time effects
"""

import ttnn
import torch
import numpy as np
import librosa
from scipy import signal

class AudioProcessor:
    def __init__(self, device, sample_rate=44100, n_fft=2048, hop_length=512, n_mels=128):
        """
        Store the analysis settings and stage the constant tensors on the device.

        Args:
            device: TTNN device handle
            sample_rate: Sampling rate of the audio (Hz)
            n_fft: STFT frame length in samples (must be power of 2)
            hop_length: Stride between successive STFT frames, in samples
            n_mels: Number of bands in the mel filterbank
        """
        # The helper calls below read these attributes, so they are assigned first.
        self.device = device
        self.sample_rate, self.n_fft = sample_rate, n_fft
        self.hop_length, self.n_mels = hop_length, n_mels

        # Constant tensors are built once here and reused by later calls:
        # the mel filterbank matrix and the Hann analysis window.
        self.mel_filterbank = self._create_mel_filterbank()
        self.window = self._create_window()

    def _create_mel_filterbank(self):
        """
        Build the mel filterbank matrix with librosa and upload it to the device.

        The filterbank maps linear FFT frequency bins onto mel bands covering
        0 Hz up to half the sample rate.
        """
        filterbank_args = {
            "sr": self.sample_rate,
            "n_fft": self.n_fft,
            "n_mels": self.n_mels,
            "fmin": 0,
            "fmax": self.sample_rate // 2,
        }
        weights = torch.from_numpy(librosa.filters.mel(**filterbank_args)).float()

        # Stored in tile layout on the device
        return ttnn.from_torch(weights, device=self.device, layout=ttnn.TILE_LAYOUT)

    def _create_window(self):
        """Build the periodic Hann window (n_fft samples) for the STFT and upload it to the device."""
        hann = torch.hann_window(self.n_fft, periodic=True)
        device_window = ttnn.from_torch(hann, device=self.device, layout=ttnn.TILE_LAYOUT)
        return device_window

    def load_audio(self, file_path, duration=None, offset=0.0):
        """
        Read an audio file through librosa at the processor's sample rate.

        Args:
            file_path: Path to audio file (WAV, MP3, etc.)
            duration: Optional amount of audio to read (seconds)
            offset: Position in the file to start reading from (seconds)

        Returns:
            Float torch tensor of audio samples
        """
        # librosa also returns the sample rate; it was requested explicitly
        # via sr=, so the returned value is not needed.
        samples, _ = librosa.load(
            file_path, sr=self.sample_rate, duration=duration, offset=offset
        )
        return torch.from_numpy(samples).float()

    def compute_stft(self, audio):
        """
        Compute Short-Time Fourier Transform.

        The signal is cut into overlapping frames of ``n_fft`` samples spaced
        ``hop_length`` samples apart. Each frame is windowed and transformed
        on the device, and the per-frame spectra are stacked along the last
        axis.

        Args:
            audio: 1D audio tensor

        Returns:
            Complex STFT tensor (freq_bins, time_frames)

        Raises:
            ValueError: If ``audio`` contains no samples.
        """
        num_samples = len(audio)
        if num_samples == 0:
            raise ValueError("Cannot compute the STFT of empty audio")

        # A clip shorter than one frame would otherwise produce zero frames
        # (and an empty stack); zero-pad it so it yields exactly one frame.
        if num_samples < self.n_fft:
            audio = torch.nn.functional.pad(audio, (0, self.n_fft - num_samples))
            num_samples = self.n_fft

        # Only complete frames are produced, so every slice below already has
        # exactly n_fft samples and needs no per-frame padding.
        num_frames = 1 + (num_samples - self.n_fft) // self.hop_length

        spectra = []
        for frame_idx in range(num_frames):
            start = frame_idx * self.hop_length
            frame = audio[start:start + self.n_fft]

            # Move the frame to the device and apply the analysis window
            frame_tt = ttnn.from_torch(frame, device=self.device)
            windowed = ttnn.multiply(frame_tt, self.window)

            # NOTE(review): relies on ttnn.fft.rfft being available in the
            # installed TTNN build — confirm.
            spectra.append(ttnn.fft.rfft(windowed))

        # Stack frames so time runs along the last axis
        return ttnn.stack(spectra, dim=-1)

    def compute_mel_spectrogram(self, audio):
        """
        Compute mel-spectrogram from audio.

        Pipeline:
        1. STFT (time domain → frequency domain)
        2. Power spectrum (magnitude squared)
        3. Mel filterbank (linear freq → mel scale)
        4. Log scale in decibels (perceptual compression)

        Args:
            audio: 1D audio tensor or file path

        Returns:
            Mel-spectrogram in dB (n_mels, time_frames) as a NumPy array on CPU
        """
        # Load if file path given
        if isinstance(audio, str):
            audio = self.load_audio(audio)

        stft = self.compute_stft(audio)

        # Power spectrum: |STFT|^2
        power_spec = ttnn.square(ttnn.abs(stft))

        # Project the linear-frequency power onto the mel bands
        mel_spec = ttnn.matmul(self.mel_filterbank, power_spec)

        # Natural log of the mel power; the small epsilon avoids log(0)
        log_mel = ttnn.log(ttnn.add(mel_spec, 1e-10))

        # Convert the natural log to decibels:
        #   10 * log10(x) = (10 / ln 10) * ln(x) ≈ 4.34 * ln(x)
        # Multiplying by a plain 10 would overstate every level by ~2.3x.
        log_mel = ttnn.multiply(log_mel, float(10.0 / np.log(10.0)))

        # Convert to CPU for visualization/analysis
        return ttnn.to_torch(log_mel).cpu().numpy()

    def compute_mfcc(self, audio, n_mfcc=13):
        """
        Compute Mel-Frequency Cepstral Coefficients (MFCCs).
        MFCCs are a common feature for speech recognition.

        Args:
            audio: 1D audio tensor or file path
            n_mfcc: How many coefficients to keep

        Returns:
            MFCC features (n_mfcc, time_frames)
        """
        log_mel = self.compute_mel_spectrogram(audio)

        # librosa takes the DCT (Discrete Cosine Transform) of the supplied
        # mel-spectrogram, which decorrelates the mel-frequency components.
        return librosa.feature.mfcc(S=log_mel, n_mfcc=n_mfcc)

    def detect_beats(self, audio):
        """
        Detect beats/onsets in audio.
        Uses spectral flux and peak picking.

        Args:
            audio: 1D audio tensor, NumPy array, or file path

        Returns:
            Array of beat times (seconds)
        """
        if isinstance(audio, str):
            audio = self.load_audio(audio)
        elif isinstance(audio, np.ndarray):
            audio = torch.from_numpy(audio).float()

        # Hand the tensor itself to the spectrogram: the STFT path applies
        # torch/TTNN operations to its input, which a NumPy array does not
        # support.
        mel_spec = self.compute_mel_spectrogram(audio)

        # Onset strength envelope derived from the mel-spectrogram
        onset_env = librosa.onset.onset_strength(
            S=mel_spec,
            sr=self.sample_rate,
            hop_length=self.hop_length
        )

        # Pick peaks of the envelope and report them in seconds
        peaks = librosa.onset.onset_detect(
            onset_envelope=onset_env,
            sr=self.sample_rate,
            hop_length=self.hop_length,
            units='time'
        )

        return peaks

    def extract_pitch(self, audio):
        """
        Extract a pitch estimate for every analysis frame.

        Uses librosa's ``piptrack`` and, for each frame, keeps the pitch
        candidate with the largest magnitude. Frames where that candidate is
        0 have no detected pitch.

        Args:
            audio: 1D audio tensor or file path

        Returns:
            (times, frequencies) arrays, one entry per frame
        """
        if isinstance(audio, str):
            audio = self.load_audio(audio)

        # Both outputs are indexed as [frequency_bin, frame]
        pitches, magnitudes = librosa.core.piptrack(
            y=audio.numpy(),
            sr=self.sample_rate,
            hop_length=self.hop_length
        )

        # For every frame, pick the bin with the highest magnitude and read
        # the pitch stored at that bin (vectorized over all frames).
        frame_indices = np.arange(pitches.shape[1])
        strongest_bin = magnitudes.argmax(axis=0)
        pitch_track = pitches[strongest_bin, frame_indices]

        times = librosa.frames_to_time(
            frame_indices,
            sr=self.sample_rate,
            hop_length=self.hop_length
        )

        return times, pitch_track

# Example usage
if __name__ == "__main__":
    import sys

    from visualizer import SpectrogramVisualizer

    # An optional first command-line argument selects the input file, e.g.
    #   python processor.py examples/sample.wav
    audio_file = sys.argv[1] if len(sys.argv) > 1 else "examples/sample.wav"

    # Initialize device
    device = ttnn.open_device(device_id=0)
    try:
        # Create processor
        processor = AudioProcessor(device, sample_rate=22050)

        # Load the first 10 seconds of the file
        audio = processor.load_audio(audio_file, duration=10.0)

        # Compute mel-spectrogram
        mel_spec = processor.compute_mel_spectrogram(audio)

        # Visualize
        viz = SpectrogramVisualizer(processor)
        viz.plot_spectrogram(mel_spec, title="Mel-Spectrogram")

        # Detect beats
        beats = processor.detect_beats(audio)
        print(f"Detected {len(beats)} beats at times: {beats}")

        # Extract pitch
        times, pitches = processor.extract_pitch(audio)
        viz.plot_pitch(times, pitches)
    finally:
        # Release the device even if any step above fails
        ttnn.close_device(device)

Step 2: Audio Effects (effects.py)

"""
Real-time audio effects using TTNN
"""

import ttnn
import torch
import numpy as np
from scipy import signal

class AudioEffects:
    def __init__(self, processor):
        """
        Initialize audio effects processor.

        Args:
            processor: AudioProcessor instance
        """
        self.processor = processor
        self.device = processor.device
        self.sample_rate = processor.sample_rate

    def reverb(self, audio, room_size=0.5, damping=0.5, wet=0.3):
        """
        Add reverb effect using convolution with a synthetic impulse response.

        The impulse response is an exponential decay whose length grows with
        ``room_size``; ``damping`` low-pass filters it so the tail loses high
        frequencies. The convolution is FFT-based and runs on the host; the
        wet/dry mix runs on the device.

        Args:
            audio: Input audio tensor (1D)
            room_size: Room size (0-1, larger = longer reverb tail)
            damping: High-frequency damping (0-1, larger = darker tail)
            wet: Wet/dry mix (0=dry, 1=wet)

        Returns:
            Audio with reverb applied, as a CPU torch tensor. It is
            ``len(impulse_response) - 1`` samples longer than the input so
            the reverb tail is kept.
        """
        # Generate simple impulse response (exponential decay), up to 2 seconds.
        # At least one tap, so room_size=0 degrades to a pass-through.
        reverb_time = max(1, int(self.sample_rate * room_size * 2))
        decay = np.exp(-3 * np.arange(reverb_time) / reverb_time)

        # Apply damping (low-pass filter). More damping means a lower cutoff.
        # The cutoff is a fraction of Nyquist and must lie strictly inside
        # (0, 1), otherwise scipy.signal.butter raises ValueError.
        if damping > 0:
            cutoff = float(np.clip(1.0 - damping, 0.01, 0.99))
            b, a = signal.butter(2, cutoff, btype='low')
            decay = signal.lfilter(b, a, decay)

        # Normalize to unit absolute sum so the wet signal can never exceed
        # the input's peak level (peak normalization would give the long tail
        # a gain in the thousands).
        impulse_response = decay / np.sum(np.abs(decay))

        # Full convolution keeps the tail: output length = N + M - 1
        dry = audio.detach().cpu().float()
        wet_signal = torch.from_numpy(
            signal.fftconvolve(dry.numpy(), impulse_response, mode='full')
        ).float()

        # Zero-pad the dry signal to the same length so the two can be mixed
        dry_padded = torch.nn.functional.pad(dry, (0, len(impulse_response) - 1))

        # Mix wet/dry on the device
        dry_tt = ttnn.from_torch(dry_padded, device=self.device)
        wet_tt = ttnn.from_torch(wet_signal, device=self.device)
        mixed = ttnn.add(
            ttnn.multiply(dry_tt, (1 - wet)),
            ttnn.multiply(wet_tt, wet)
        )

        return ttnn.to_torch(mixed).cpu()

    def pitch_shift(self, audio, semitones):
        """
        Shift pitch without changing duration (phase vocoder).

        Args:
            audio: Input audio tensor
            semitones: Pitch shift in semitones (+12 = up one octave)

        Returns:
            Pitch-shifted audio as a float torch tensor
        """
        # Imported here because this module's top-level imports do not
        # include librosa; without it the call below raises NameError.
        import librosa

        # Use librosa for phase vocoder
        shifted = librosa.effects.pitch_shift(
            y=audio.numpy(),
            sr=self.sample_rate,
            n_steps=semitones
        )
        return torch.from_numpy(shifted).float()

    def time_stretch(self, audio, rate):
        """
        Change duration without changing pitch.

        Args:
            audio: Input audio tensor
            rate: Stretch factor (0.5 = half speed, 2.0 = double speed)

        Returns:
            Time-stretched audio as a float torch tensor
        """
        # Imported here because this module's top-level imports do not
        # include librosa; without it the call below raises NameError.
        import librosa

        stretched = librosa.effects.time_stretch(
            y=audio.numpy(),
            rate=rate
        )
        return torch.from_numpy(stretched).float()

    def echo(self, audio, delay_ms=500, decay=0.5):
        """
        Mix one delayed, attenuated copy of the signal back into itself.

        Args:
            audio: Input audio
            delay_ms: How far the copy lags behind the original (milliseconds)
            decay: Amplitude factor applied to the delayed copy

        Returns:
            Audio with echo, as a CPU torch tensor
        """
        offset = int(self.sample_rate * delay_ms / 1000)
        dry_tt = ttnn.from_torch(audio, device=self.device)

        # Prepend `offset` zeros to push the copy later in time, then cut it
        # back to the input length.
        # NOTE(review): assumes ttnn.pad accepts a (left, right) tuple and
        # that device tensors support slicing like this — confirm against
        # the TTNN documentation.
        shifted = ttnn.pad(dry_tt, (offset, 0))
        shifted = shifted[:len(audio)]

        # Original plus the attenuated, delayed copy
        attenuated = ttnn.multiply(shifted, decay)
        mixed = ttnn.add(dry_tt, attenuated)

        return ttnn.to_torch(mixed).cpu()

    def chorus(self, audio, rate=1.5, depth=0.002):
        """
        Add chorus effect (slightly detuned copies).

        Args:
            audio: Input audio
            rate: LFO rate (Hz)
            depth: Modulation depth (seconds)

        Returns:
            Audio with chorus effect
        """
        # Implement as time-varying delay with LFO
        num_samples = len(audio)
        t = np.arange(num_samples) / self.sample_rate

        # Low-frequency oscillator
        lfo = np.sin(2 * np.pi * rate * t)
        delay_samples = (depth * self.sample_rate * lfo).astype(int)

        # Apply variable delay (simplified version)
        # In production, use interpolation for smooth delay changes
        output = audio.clone()
        for i in range(num_samples):
            delay_idx = max(0, min(num_samples - 1, i + delay_samples[i]))
            output[i] += 0.5 * audio[delay_idx]

        return output

Step 3: Visualization (visualizer.py)

"""
Real-time audio visualization
"""

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import sounddevice as sd

class SpectrogramVisualizer:
    def __init__(self, processor):
        """
        Initialize visualizer.

        Args:
            processor: AudioProcessor instance
        """
        self.processor = processor

    def plot_spectrogram(self, mel_spec, title="Mel-Spectrogram", save_path=None):
        """
        Show a mel-spectrogram as an image with a time axis in seconds.

        Args:
            mel_spec: 2D array (n_mels, time_frames)
            title: Plot title
            save_path: Optional path; when given, the figure is also saved there
        """
        proc = self.processor

        # Start time of every frame, used for the horizontal extent
        frame_times = np.arange(mel_spec.shape[1]) * proc.hop_length / proc.sample_rate

        _, axis = plt.subplots(figsize=(12, 4))
        image = axis.imshow(
            mel_spec,
            aspect='auto',
            origin='lower',
            extent=[frame_times.min(), frame_times.max(), 0, proc.n_mels],
            cmap='viridis'
        )

        axis.set_title(title)
        axis.set_xlabel('Time (seconds)')
        axis.set_ylabel('Mel Frequency Bin')

        plt.colorbar(image, ax=axis, format='%+2.0f dB')
        plt.tight_layout()

        # Save before show()
        if save_path:
            plt.savefig(save_path, dpi=150)

        plt.show()

    def plot_waveform(self, audio, title="Waveform"):
        """Draw the amplitude of a signal against time in seconds."""
        sample_times = np.arange(len(audio)) / self.processor.sample_rate

        _, axis = plt.subplots(figsize=(12, 3))
        axis.plot(sample_times, audio, linewidth=0.5)
        axis.set_title(title)
        axis.set_xlabel('Time (seconds)')
        axis.set_ylabel('Amplitude')
        axis.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

    def plot_pitch(self, times, pitches):
        """Draw a pitch track, skipping frames whose pitch is not positive."""
        # Frames with a pitch of zero are treated as unvoiced and left out
        has_pitch = pitches > 0
        voiced_times, voiced_pitches = times[has_pitch], pitches[has_pitch]

        _, axis = plt.subplots(figsize=(12, 4))
        axis.plot(voiced_times, voiced_pitches, 'o-', markersize=2)

        axis.set_title('Pitch Track')
        axis.set_xlabel('Time (seconds)')
        axis.set_ylabel('Frequency (Hz)')
        axis.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

    def real_time_spectrogram(self, duration=10, window_size=2.0):
        """
        Real-time spectrogram from microphone.

        Audio blocks from the input stream are pushed into a rolling buffer
        of ``window_size`` seconds. After every block the buffer's
        mel-spectrogram is recomputed and the most recent 100 columns are
        shown.

        Args:
            duration: Total duration (seconds)
            window_size: Spectrogram window size (seconds)

        Raises:
            ValueError: If ``window_size`` is too small to hold one sample.
        """
        # Imported here because this module's top-level imports do not
        # include them; torch is needed by the stream callback below.
        from collections import deque

        import torch

        # Rolling buffer for the most recent audio samples
        buffer_size = int(self.processor.sample_rate * window_size)
        if buffer_size <= 0:
            raise ValueError("window_size must cover at least one sample")
        audio_buffer = np.zeros(buffer_size)

        # Setup plot
        fig, ax = plt.subplots(figsize=(12, 4))
        spec_img = ax.imshow(
            np.zeros((self.processor.n_mels, 100)),
            aspect='auto',
            origin='lower',
            cmap='viridis',
            vmin=-80,
            vmax=0
        )
        ax.set_xlabel('Time (frames)')
        ax.set_ylabel('Mel Frequency')
        ax.set_title('Real-Time Spectrogram')
        plt.colorbar(spec_img, ax=ax)

        # Bounded history: the oldest spectrogram is dropped automatically
        spec_history = deque(maxlen=100)

        def audio_callback(indata, frames, time, status):
            nonlocal audio_buffer

            # Report stream problems instead of ignoring them
            if status:
                print(status)

            # Keep only the newest samples if one block is larger than the
            # whole buffer, then shift the buffer and append them.
            samples = indata[-buffer_size:, 0]
            count = len(samples)
            if count:
                audio_buffer = np.roll(audio_buffer, -count)
                audio_buffer[-count:] = samples

            # Compute mel-spectrogram of the current buffer contents
            audio_torch = torch.from_numpy(audio_buffer).float()
            mel_spec = self.processor.compute_mel_spectrogram(audio_torch)
            spec_history.append(mel_spec)

            # Update plot with the latest 100 spectrogram columns
            spec_concat = np.concatenate(list(spec_history), axis=1)
            spec_img.set_data(spec_concat[:, -100:])
            fig.canvas.draw_idle()

        # Start audio stream; it is closed when the with-block exits
        with sd.InputStream(callback=audio_callback,
                            channels=1,
                            samplerate=self.processor.sample_rate):
            print(f"Recording for {duration} seconds...")
            plt.show(block=False)
            plt.pause(duration)

        print("Done!")

    def animate_spectrogram_with_audio(self, audio_file):
        """
        Play an audio file while a cursor sweeps its waveform and spectrogram.

        Args:
            audio_file: Path to audio file
        """
        proc = self.processor
        fps = 30  # animation frame rate

        # Load the audio and compute its full mel-spectrogram up front
        audio = proc.load_audio(audio_file)
        mel_spec = proc.compute_mel_spectrogram(audio)

        fig, (wave_ax, spec_ax) = plt.subplots(2, 1, figsize=(12, 6))

        # Top panel: waveform with a red marker for the current time
        sample_times = np.arange(len(audio)) / proc.sample_rate
        wave_ax.plot(sample_times, audio, linewidth=0.5, color='blue')
        wave_ax.set_xlabel('Time (seconds)')
        wave_ax.set_ylabel('Amplitude')
        wave_ax.set_title('Waveform')
        wave_ax.grid(True, alpha=0.3)
        wave_cursor = wave_ax.axvline(x=0, color='red', linewidth=2)

        # Bottom panel: mel-spectrogram with its own marker
        frame_times = np.arange(mel_spec.shape[1]) * proc.hop_length / proc.sample_rate
        spec_ax.imshow(
            mel_spec,
            aspect='auto',
            origin='lower',
            extent=[frame_times.min(), frame_times.max(), 0, proc.n_mels],
            cmap='viridis'
        )
        spec_ax.set_xlabel('Time (seconds)')
        spec_ax.set_ylabel('Mel Frequency')
        spec_ax.set_title('Mel-Spectrogram')
        spec_cursor = spec_ax.axvline(x=0, color='red', linewidth=2)

        plt.tight_layout()

        def update(frame):
            # Animation frame index -> position in seconds
            position = frame / fps
            for cursor in (wave_cursor, spec_cursor):
                cursor.set_xdata([position, position])
            return [wave_cursor, spec_cursor]

        # Start playback in the background
        sd.play(audio.numpy(), proc.sample_rate)

        # One animation frame per 1/fps seconds of audio. The animation
        # object is bound to a name so it stays referenced through show().
        total_frames = int(len(audio) / proc.sample_rate * fps)
        anim = FuncAnimation(fig, update, frames=total_frames, interval=1000 / fps, blit=True)

        plt.show()

Running the Project

Quick Start - Click to Run:

🎵 Run Audio Processor Demo
cd ~/tt-scratchpad/cookbook/audio_processor && export PYTHONPATH=~/tt-metal:$PYTHONPATH && python3 processor.py

Manual Commands:

cd ~/tt-scratchpad/cookbook/audio_processor

# Install dependencies
pip install -r requirements.txt

# Create a test audio file if you don't have one
# (2 seconds of a 440 Hz sine tone, 16-bit, 22050 Hz):
mkdir -p examples
python3 -c "
import numpy as np; from scipy.io.wavfile import write
# Sample instants spaced exactly 1/sr apart
sr = 22050; t = np.arange(sr * 2) / sr
write('examples/sample.wav', sr, (0.5 * np.sin(2 * np.pi * 440 * t) * 32767).astype('int16'))
print('Created examples/sample.wav')
"

# Process an audio file
python3 processor.py examples/sample.wav

Note: processor.py is a starter template. The mel-spectrogram uses librosa (CPU). The template opens the TT device and is ready for TTNN acceleration — see the compute_mel_spectrogram method to add TTNN ops.

Try audio effects:

python3 -c "
from processor import AudioProcessor
from effects import AudioEffects
import ttnn
import sounddevice as sd

device = ttnn.open_device(device_id=0)
try:
    processor = AudioProcessor(device)
    effects = AudioEffects(processor)

    # Load audio
    audio = processor.load_audio('examples/sample.wav')

    # Apply reverb
    reverb_audio = effects.reverb(audio, room_size=0.7, wet=0.5)

    # Play original vs reverb
    print('Playing original...')
    sd.play(audio.numpy(), processor.sample_rate)
    sd.wait()

    print('Playing with reverb...')
    sd.play(reverb_audio.numpy(), processor.sample_rate)
    sd.wait()
finally:
    # Release the device even if loading, processing or playback fails
    ttnn.close_device(device)
"

Real-time spectrogram from microphone:

python3 -c "
from processor import AudioProcessor
from visualizer import SpectrogramVisualizer
import ttnn

device = ttnn.open_device(device_id=0)
try:
    processor = AudioProcessor(device, sample_rate=22050)
    viz = SpectrogramVisualizer(processor)

    # Show the microphone spectrogram for 10 seconds
    viz.real_time_spectrogram(duration=10)
finally:
    # Release the device even if the audio stream or plotting fails
    ttnn.close_device(device)
"

Extensions for Audio Engineers

1. Voice Activity Detection (VAD)

Detect speech vs silence:

def voice_activity_detection(self, audio, threshold_db=-40):
    """
    Detect speech segments using energy thresholding.

    The signal is cut into frames of ``self.n_fft`` samples spaced
    ``self.hop_length`` apart. A frame counts as speech when its RMS level
    in dB exceeds ``threshold_db``.

    Args:
        audio: 1D audio samples (NumPy array or CPU torch tensor)
        threshold_db: Level above which a frame is marked as speech

    Returns:
        (times, is_speech): frame start times in seconds and a boolean
        array with one entry per frame
    """
    frame_length = self.n_fft
    hop_length = self.hop_length

    # Work on a NumPy array so tensors and arrays are handled alike
    samples = np.asarray(audio, dtype=np.float64)

    # "+ 1" keeps the last complete frame, so the frame count matches
    # 1 + (len - frame_length) // hop_length.
    frame_starts = range(0, len(samples) - frame_length + 1, hop_length)

    # Short-time energy: RMS of each frame in dB (epsilon avoids log(0))
    energy = np.array([
        20 * np.log10(np.sqrt(np.mean(samples[start:start + frame_length] ** 2)) + 1e-10)
        for start in frame_starts
    ])

    # Threshold
    is_speech = energy > threshold_db

    # Convert frame indices to start times
    times = np.arange(len(energy)) * hop_length / self.sample_rate
    return times, is_speech

2. Automatic Gain Control (AGC)

Normalize volume dynamically:

def auto_gain_control(self, audio, target_db=-20, attack_ms=50, release_ms=200, max_gain_db=30.0):
    """
    Dynamic range compression: steer the signal level toward ``target_db``.

    A level envelope (in dB) follows the signal with separate attack and
    release time constants, and each sample is scaled by the gain that
    moves the envelope to the target.

    Args:
        audio: 1D audio tensor
        target_db: Desired output level in dB
        attack_ms: Time constant used while the level is rising
        release_ms: Time constant used while the level is falling
        max_gain_db: Upper bound on the applied gain, so near-silent
            passages are not amplified without limit

    Returns:
        Gain-adjusted audio tensor (same shape as the input)
    """
    if audio.numel() == 0:
        return audio.clone()

    # Convert to dB (epsilon avoids log(0))
    audio_db = 20 * torch.log10(torch.abs(audio) + 1e-10)

    # One-pole smoothing coefficients; a zero time constant means "track
    # instantly" rather than a division by zero.
    attack_coef = np.exp(-1000 / (attack_ms * self.sample_rate)) if attack_ms > 0 else 0.0
    release_coef = np.exp(-1000 / (release_ms * self.sample_rate)) if release_ms > 0 else 0.0

    # Envelope follower on plain Python floats (much faster than indexing a
    # tensor per sample). It is seeded with the first sample's level instead
    # of 0 dB, so the start of the signal is not treated as full scale.
    levels = audio_db.tolist()
    level = levels[0]
    envelope = []
    for sample_db in levels:
        coef = attack_coef if sample_db > level else release_coef
        level = coef * level + (1 - coef) * sample_db
        envelope.append(level)

    # Apply gain, capped at max_gain_db
    envelope_db = torch.tensor(envelope, dtype=audio_db.dtype, device=audio.device)
    gain_db = torch.clamp(target_db - envelope_db, max=max_gain_db)
    gain_linear = 10 ** (gain_db / 20)

    return audio * gain_linear

3. Noise Gate

Remove background noise:

def noise_gate(self, audio, threshold_db=-50, attack_ms=10, release_ms=100):
    """
    Suppress audio below threshold.

    Args:
        audio: 1D audio tensor
        threshold_db: Samples whose level is at or below this are gated
        attack_ms: Time constant for the gate opening
        release_ms: Time constant for the gate closing

    Returns:
        Audio multiplied by the smoothed gate gain (0 = closed, 1 = open)
    """
    # Per-sample level in dB (epsilon avoids log(0))
    audio_db = 20 * torch.log10(torch.abs(audio) + 1e-10)

    # Gate on/off
    gate_open = audio_db > threshold_db

    # Smooth transitions so the gate does not switch abruptly
    gate_smooth = self._smooth_gate(gate_open, attack_ms, release_ms)

    return audio * gate_smooth


def _smooth_gate(self, gate_open, attack_ms, release_ms):
    """
    Turn a boolean open/closed gate into a gain curve between 0 and 1.

    A one-pole smoother moves the gain toward 1 with the attack time
    constant while the gate is open and toward 0 with the release time
    constant while it is closed. The gain starts at 0 (closed).
    """
    # A zero time constant means "switch instantly"
    attack_coef = np.exp(-1000 / (attack_ms * self.sample_rate)) if attack_ms > 0 else 0.0
    release_coef = np.exp(-1000 / (release_ms * self.sample_rate)) if release_ms > 0 else 0.0

    gain = 0.0
    smoothed = []
    for target in gate_open.to(torch.float32).tolist():
        coef = attack_coef if target > gain else release_coef
        gain = coef * gain + (1 - coef) * target
        smoothed.append(gain)

    return torch.tensor(smoothed, dtype=torch.float32, device=gate_open.device)

4. Parametric EQ

Frequency-specific gain:

def parametric_eq(self, audio, center_freq, gain_db, q_factor=1.0):
    """
    Apply parametric EQ filter (single peaking band).

    Boosts or cuts a band around ``center_freq`` by ``gain_db`` while
    leaving frequencies far from the band essentially unchanged. With
    ``gain_db=0`` the filter is an exact pass-through.

    Args:
        audio: 1D audio tensor
        center_freq: Center of the band (Hz), between 0 and half the sample rate
        gain_db: Gain at the center frequency (positive = boost, negative = cut)
        q_factor: Quality factor; larger values give a narrower band

    Returns:
        Filtered audio as a float torch tensor

    Raises:
        ValueError: If ``center_freq`` is outside (0, sample_rate / 2) or
            ``q_factor`` is not positive.
    """
    nyquist = self.sample_rate / 2
    if not 0 < center_freq < nyquist:
        raise ValueError(
            f"center_freq must be between 0 and {nyquist} Hz, got {center_freq}"
        )
    if q_factor <= 0:
        raise ValueError(f"q_factor must be positive, got {q_factor}")

    # Peaking-EQ biquad (Audio EQ Cookbook form). A band-pass filter scaled
    # by the gain would remove everything outside the band; this filter has
    # unity gain away from center_freq and gain A^2 = 10^(gain_db/20) at it.
    amplitude = 10 ** (gain_db / 40)
    omega = 2 * np.pi * center_freq / self.sample_rate
    alpha = np.sin(omega) / (2 * q_factor)
    cos_omega = np.cos(omega)

    b = np.array([1 + alpha * amplitude, -2 * cos_omega, 1 - alpha * amplitude])
    a = np.array([1 + alpha / amplitude, -2 * cos_omega, 1 - alpha / amplitude])

    # Filter audio
    filtered = signal.lfilter(b, a, audio.numpy())
    return torch.from_numpy(filtered).float()

5. VST Plugin Interface

Integrate with DAWs:

# This would require python-vst or similar library
def process_block(self, audio_block):
    """Run one block of audio through the effects chain (VST-style callback)."""
    # Upload the incoming block to the device
    device_block = ttnn.from_torch(audio_block, device=self.device)

    # NOTE(review): apply_effects_chain is not defined anywhere in this
    # document — presumably supplied by the class this method is added to;
    # confirm before use.
    result = self.apply_effects_chain(device_block)

    # Bring the result back to the host as a NumPy array
    return ttnn.to_torch(result).cpu().numpy()

What You Learned

Return to Cookbook Overview