Skip to content

Audio

Audio entrainment pipeline: adaptive session engine, entrainment verification scoring (EVS), SSGF-based geometry-to-audio mapping, and per-user profile persistence.

Adaptive Engine

sc_neurocore.audio.adaptive_engine

AdaptiveSessionReport dataclass

Summary of a completed adaptive audio session.

Source code in src/sc_neurocore/audio/adaptive_engine.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
@dataclass
class AdaptiveSessionReport:
    """Aggregate statistics describing one finished adaptive audio session."""

    total_ticks: int = 0
    avg_evs: float = 0.0
    peak_evs: float = 0.0
    verified_pct: float = 0.0
    grade: str = "F"
    adaptations: int = 0
    phase_durations: Dict[str, int] = field(default_factory=dict)
    final_audio: Dict[str, float] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialise the report; float summary fields are rounded to 2 dp."""
        # Field order is part of the serialised contract -- keep it stable.
        ordered_names = (
            "total_ticks",
            "avg_evs",
            "peak_evs",
            "verified_pct",
            "grade",
            "adaptations",
            "phase_durations",
            "final_audio",
        )
        rounded_names = {"avg_evs", "peak_evs", "verified_pct"}
        payload: dict[str, Any] = {}
        for name in ordered_names:
            value = getattr(self, name)
            payload[name] = round(value, 2) if name in rounded_names else value
        return payload

AdaptiveAudioEngine

Closed-loop adaptive audio controller coupling SSGF with EVS.

Parameters

ssgf (SSGFEngine): the geometry solver producing audio mappings.
evs (EVSEngine): the entrainment verification scorer.
profile (UserProfile, optional): user preferences for chronotype-aware adaptation.

Source code in src/sc_neurocore/audio/adaptive_engine.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
class AdaptiveAudioEngine:
    """Closed-loop adaptive audio controller coupling SSGF with EVS.

    Each EVS snapshot drives one tick: phase bookkeeping, trend-based
    tuning of the SSGF config, one SSGF outer step, and a fresh audio
    mapping returned to the caller.

    Parameters
    ----------
    ssgf : SSGFEngine
        The geometry solver producing audio mappings.
    evs : EVSEngine
        The entrainment verification scorer.
    profile : UserProfile, optional
        User preferences for chronotype-aware adaptation.
    """

    def __init__(
        self,
        ssgf: SSGFEngine,
        evs: EVSEngine,
        profile: Optional[UserProfile] = None,
    ):
        self.ssgf = ssgf
        self.evs = evs
        self.profile = profile

        # Session state
        self._tick: int = 0
        self._phase: SessionPhase = SessionPhase.DISCOVERY
        # Tick at which the current phase began; recorded on transitions
        # but not read anywhere within this class.
        self._phase_start_tick: int = 0

        # EVS tracking
        self._evs_scores: List[float] = []
        self._verified_count: int = 0

        # Trend detection
        self._recent_evs: List[float] = []
        # Number of most-recent scores fed into the slope estimate.
        self._trend_window: int = 10

        # Adaptation log
        self._adaptations: List[_AdaptationRecord] = []

        # Discovery sweep state
        self._sweep_direction: float = 1.0
        # Sweep starts at the profile's best-known target when available.
        self._sweep_hz: float = 10.0 if profile is None else profile.get_best_target_hz()

    # ── Phase Management ─────────────────────────────────────────────

    def _update_phase(self) -> None:
        """Transition between session phases based on tick count.

        _DISCOVERY_TICKS and _LOCKON_TICKS are module constants (defined
        outside this excerpt) and are compared as absolute tick counts,
        not per-phase durations.
        """
        if self._phase == SessionPhase.DISCOVERY and self._tick >= _DISCOVERY_TICKS:
            self._phase = SessionPhase.LOCK_ON
            self._phase_start_tick = self._tick
            logger.info("Session phase -> LOCK_ON at tick %d", self._tick)
        elif self._phase == SessionPhase.LOCK_ON and self._tick >= _LOCKON_TICKS:
            self._phase = SessionPhase.DEEPENING
            self._phase_start_tick = self._tick
            logger.info("Session phase -> DEEPENING at tick %d", self._tick)

    # ── Trend Analysis ───────────────────────────────────────────────

    def _evs_trend(self) -> float:
        """Return recent EVS trend: positive = improving, negative = declining."""
        # Need at least 3 points for a meaningful slope.
        if len(self._recent_evs) < 3:
            return 0.0
        recent = np.array(self._recent_evs[-self._trend_window :])
        if len(recent) < 3:
            return 0.0
        # Simple linear slope (ordinary least-squares over sample index)
        x = np.arange(len(recent), dtype=np.float64)
        x_mean = x.mean()
        y_mean = recent.mean()
        denom = np.sum((x - x_mean) ** 2)
        if denom < 1e-12:
            return 0.0
        slope = np.sum((x - x_mean) * (recent - y_mean)) / denom
        return float(slope)

    # ── Core Tick ────────────────────────────────────────────────────

    def on_evs_update(self, snapshot: EVSSnapshot) -> Dict[str, float]:
        """Process one EVS update and return adapted audio parameters.

        This is the main feedback loop entry point.  Call it each time
        a new EVSSnapshot is available (~every 500 ms).

        Returns
        -------
        dict
            Audio parameters from SSGF, possibly adjusted by adaptation.
        """
        self._tick += 1
        self._update_phase()

        # Track EVS
        score = snapshot.evs_score
        self._evs_scores.append(score)
        self._recent_evs.append(score)
        # Bound memory: keep at most 2x the trend window of recent scores.
        if len(self._recent_evs) > self._trend_window * 2:
            self._recent_evs = self._recent_evs[-self._trend_window * 2 :]
        if snapshot.is_verified:
            self._verified_count += 1

        trend = self._evs_trend()

        # Phase-specific adaptation (mutates self.ssgf.cfg in place)
        if self._phase == SessionPhase.DISCOVERY:
            self._adapt_discovery(snapshot, trend)
        elif self._phase == SessionPhase.LOCK_ON:
            self._adapt_lock_on(snapshot, trend)
        else:
            self._adapt_deepening(snapshot, trend)

        # Run one SSGF outer step to update geometry
        self.ssgf.outer_step()

        return self.ssgf.get_audio_mapping()

    # ── Phase-Specific Adaptation ────────────────────────────────────

    def _adapt_discovery(self, snap: EVSSnapshot, trend: float) -> None:
        """Discovery phase: gentle frequency sweep, widen geometry."""
        cfg = self.ssgf.cfg

        # Triangle sweep of the target between 5 and 15 Hz in 0.1 Hz steps.
        self._sweep_hz += self._sweep_direction * 0.1
        if self._sweep_hz > 15.0:
            self._sweep_direction = -1.0
        elif self._sweep_hz < 5.0:
            self._sweep_direction = 1.0
        self.evs.set_target(self._sweep_hz)

        # Keep sigma_g moderate for exploration
        old_sg = cfg.sigma_g
        cfg.sigma_g = float(np.clip(cfg.sigma_g, 0.15, 0.35))
        if cfg.sigma_g != old_sg:
            self._log_adaptation("sigma_g", old_sg, cfg.sigma_g, "discovery bounds")

        # Higher learning rate for faster geometry search
        old_lr = cfg.lr_z
        cfg.lr_z = 0.015
        if cfg.lr_z != old_lr:
            self._log_adaptation("lr_z", old_lr, cfg.lr_z, "discovery exploration")

    def _adapt_lock_on(self, snap: EVSSnapshot, trend: float) -> None:
        """Lock-On phase: responsive frequency tracking, tighten geometry."""
        cfg = self.ssgf.cfg

        # If EVS is declining (slope below -0.5), increase geometry feedback
        if trend < -0.5:
            old_sg = cfg.sigma_g
            new_sg = float(np.clip(cfg.sigma_g + 0.02, 0.1, 0.6))
            if new_sg != old_sg:
                cfg.sigma_g = new_sg
                self._log_adaptation("sigma_g", old_sg, new_sg, "EVS declining, boost coupling")

        # If EVS is improving (slope above +0.5), reduce learning rate to stabilise
        if trend > 0.5:
            old_lr = cfg.lr_z
            new_lr = float(np.clip(cfg.lr_z * 0.95, 0.002, 0.02))
            if new_lr != old_lr:
                cfg.lr_z = new_lr
                self._log_adaptation("lr_z", old_lr, new_lr, "EVS improving, stabilise")

        # Responsive target adjustment based on peak alignment
        if snap.peak_alignment < 0.5 and snap.peak_hz > 0.5:
            # Nudge target toward actual brain peak (10% of the gap per tick)
            delta = (snap.peak_hz - snap.target_hz) * 0.1
            new_target = float(np.clip(snap.target_hz + delta, 0.5, 40.0))
            self.evs.set_target(new_target)

    def _adapt_deepening(self, snap: EVSSnapshot, trend: float) -> None:
        """Deepening phase: push toward theurgic coherence."""
        cfg = self.ssgf.cfg

        # Increase field pressure to encourage synchrony
        old_fp = cfg.field_pressure
        new_fp = float(np.clip(cfg.field_pressure + 0.005, 0.05, 0.4))
        if new_fp != old_fp:
            cfg.field_pressure = new_fp
            self._log_adaptation("field_pressure", old_fp, new_fp, "deepening push")

        # Increase sigma_g gradually
        old_sg = cfg.sigma_g
        new_sg = float(np.clip(cfg.sigma_g + 0.005, 0.2, 0.8))
        if new_sg != old_sg:
            cfg.sigma_g = new_sg
            self._log_adaptation("sigma_g", old_sg, new_sg, "deepening geometry boost")

        # Lower learning rate for stability
        old_lr = cfg.lr_z
        new_lr = float(np.clip(cfg.lr_z * 0.98, 0.001, 0.01))
        if new_lr != old_lr:
            cfg.lr_z = new_lr
            self._log_adaptation("lr_z", old_lr, new_lr, "deepening stabilise")

        # If R > 0.9, we're close to theurgic -- fine-tune
        # (may apply a second field_pressure bump in the same tick, with a wider cap)
        if self.ssgf.R_global > 0.9:
            old_fp2 = cfg.field_pressure
            new_fp2 = float(np.clip(cfg.field_pressure + 0.01, 0.1, 0.5))
            if new_fp2 != old_fp2:
                cfg.field_pressure = new_fp2
                self._log_adaptation("field_pressure", old_fp2, new_fp2, "near-theurgic push")

    # ── Logging ──────────────────────────────────────────────────────

    def _log_adaptation(
        self,
        param: str,
        old: float,
        new: float,
        reason: str,
    ) -> None:
        """Append one parameter change to the adaptation log and debug-log it."""
        record = _AdaptationRecord(
            tick=self._tick,
            phase=self._phase.value,
            param=param,
            old_value=old,
            new_value=new,
            reason=reason,
        )
        self._adaptations.append(record)
        logger.debug(
            "Tick %d [%s] %s: %.4f -> %.4f (%s)",
            self._tick,
            self._phase.value,
            param,
            old,
            new,
            reason,
        )

    # ── Session Report ───────────────────────────────────────────────

    def get_session_report(self) -> AdaptiveSessionReport:
        """Generate summary report of the current session."""
        total = len(self._evs_scores)
        avg_evs = float(np.mean(self._evs_scores)) if self._evs_scores else 0.0
        peak_evs = float(np.max(self._evs_scores)) if self._evs_scores else 0.0
        verified_pct = (self._verified_count / total * 100.0) if total > 0 else 0.0

        # Phase durations
        # NOTE(review): _update_phase transitions on >= threshold, but the
        # buckets below use <=, so at exactly tick == _DISCOVERY_TICKS (or
        # _LOCKON_TICKS) the session is already in the next phase while the
        # report still attributes the tick to the previous one -- confirm
        # whether this off-by-one is intended.
        phase_durations: Dict[str, int] = {}
        if self._tick > 0:
            if self._tick <= _DISCOVERY_TICKS:
                phase_durations["discovery"] = self._tick
            elif self._tick <= _LOCKON_TICKS:
                phase_durations["discovery"] = _DISCOVERY_TICKS
                phase_durations["lock_on"] = self._tick - _DISCOVERY_TICKS
            else:
                phase_durations["discovery"] = _DISCOVERY_TICKS
                phase_durations["lock_on"] = _LOCKON_TICKS - _DISCOVERY_TICKS
                phase_durations["deepening"] = self._tick - _LOCKON_TICKS

        return AdaptiveSessionReport(
            total_ticks=total,
            avg_evs=avg_evs,
            peak_evs=peak_evs,
            verified_pct=verified_pct,
            grade=_compute_grade(verified_pct),
            adaptations=len(self._adaptations),
            phase_durations=phase_durations,
            final_audio=self.ssgf.get_audio_mapping(),
        )

    # ── Utilities ────────────────────────────────────────────────────

    @property
    def current_phase(self) -> SessionPhase:
        return self._phase

    @property
    def tick(self) -> int:
        return self._tick

    def reset(self) -> None:
        """Reset session state (does not reset SSGF or EVS)."""
        self._tick = 0
        self._phase = SessionPhase.DISCOVERY
        self._phase_start_tick = 0
        self._evs_scores.clear()
        self._verified_count = 0
        self._recent_evs.clear()
        self._adaptations.clear()
        self._sweep_direction = 1.0
        # Re-seed the sweep from the profile, mirroring __init__.
        self._sweep_hz = 10.0 if self.profile is None else self.profile.get_best_target_hz()

on_evs_update(snapshot)

Process one EVS update and return adapted audio parameters.

This is the main feedback loop entry point. Call it each time a new EVSSnapshot is available (~every 500 ms).

Returns

dict: Audio parameters from SSGF, possibly adjusted by adaptation.

Source code in src/sc_neurocore/audio/adaptive_engine.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
def on_evs_update(self, snapshot: EVSSnapshot) -> Dict[str, float]:
    """Process one EVS update and return adapted audio parameters.

    This is the main feedback loop entry point.  Call it each time
    a new EVSSnapshot is available (~every 500 ms).

    Returns
    -------
    dict
        Audio parameters from SSGF, possibly adjusted by adaptation.
    """
    self._tick += 1
    self._update_phase()

    # Track EVS
    score = snapshot.evs_score
    self._evs_scores.append(score)
    self._recent_evs.append(score)
    # Bound memory: keep at most 2x the trend window of recent scores.
    if len(self._recent_evs) > self._trend_window * 2:
        self._recent_evs = self._recent_evs[-self._trend_window * 2 :]
    if snapshot.is_verified:
        self._verified_count += 1

    trend = self._evs_trend()

    # Phase-specific adaptation (mutates self.ssgf.cfg in place)
    if self._phase == SessionPhase.DISCOVERY:
        self._adapt_discovery(snapshot, trend)
    elif self._phase == SessionPhase.LOCK_ON:
        self._adapt_lock_on(snapshot, trend)
    else:
        self._adapt_deepening(snapshot, trend)

    # Run one SSGF outer step to update geometry
    self.ssgf.outer_step()

    return self.ssgf.get_audio_mapping()

get_session_report()

Generate summary report of the current session.

Source code in src/sc_neurocore/audio/adaptive_engine.py
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
def get_session_report(self) -> AdaptiveSessionReport:
    """Generate summary report of the current session."""
    total = len(self._evs_scores)
    avg_evs = float(np.mean(self._evs_scores)) if self._evs_scores else 0.0
    peak_evs = float(np.max(self._evs_scores)) if self._evs_scores else 0.0
    verified_pct = (self._verified_count / total * 100.0) if total > 0 else 0.0

    # Phase durations
    # NOTE(review): phase transitions fire on >= threshold, but these
    # buckets use <= -- at exactly the threshold tick the report credits
    # the previous phase; confirm this off-by-one is intended.
    phase_durations: Dict[str, int] = {}
    if self._tick > 0:
        if self._tick <= _DISCOVERY_TICKS:
            phase_durations["discovery"] = self._tick
        elif self._tick <= _LOCKON_TICKS:
            phase_durations["discovery"] = _DISCOVERY_TICKS
            phase_durations["lock_on"] = self._tick - _DISCOVERY_TICKS
        else:
            phase_durations["discovery"] = _DISCOVERY_TICKS
            phase_durations["lock_on"] = _LOCKON_TICKS - _DISCOVERY_TICKS
            phase_durations["deepening"] = self._tick - _LOCKON_TICKS

    return AdaptiveSessionReport(
        total_ticks=total,
        avg_evs=avg_evs,
        peak_evs=peak_evs,
        verified_pct=verified_pct,
        grade=_compute_grade(verified_pct),
        adaptations=len(self._adaptations),
        phase_durations=phase_durations,
        final_audio=self.ssgf.get_audio_mapping(),
    )

reset()

Reset session state (does not reset SSGF or EVS).

Source code in src/sc_neurocore/audio/adaptive_engine.py
386
387
388
389
390
391
392
393
394
395
396
def reset(self) -> None:
    """Reset session state (does not reset SSGF or EVS)."""
    self._tick = 0
    self._phase = SessionPhase.DISCOVERY
    self._phase_start_tick = 0
    self._evs_scores.clear()
    self._verified_count = 0
    self._recent_evs.clear()
    self._adaptations.clear()
    self._sweep_direction = 1.0
    # Re-seed the discovery sweep from the profile, mirroring __init__.
    self._sweep_hz = 10.0 if self.profile is None else self.profile.get_best_target_hz()

EVS Engine

sc_neurocore.audio.evs_engine

EVSConfig dataclass

Tuneable parameters for the EVS engine.

Source code in src/sc_neurocore/audio/evs_engine.py
64
65
66
67
68
69
70
71
@dataclass
class EVSConfig:
    """Tuneable parameters for the EVS engine."""

    # EEG sampling rate in Hz (also used as the FFT frequency scale).
    sample_rate: int = 256
    # Ring-buffer / FFT length in samples (512 @ 256 Hz = 2 s window).
    fft_window: int = 512
    # Seconds of EEG collected before the baseline is finalised.
    baseline_duration_s: float = 30.0
    # Declared update cadence in samples; not read within this excerpt --
    # presumably consumed by the caller's compute() scheduling. TODO confirm.
    update_interval_samples: int = 128

EVSSnapshot dataclass

Single-tick EVS observation.

Source code in src/sc_neurocore/audio/evs_engine.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
@dataclass
class EVSSnapshot:
    """One EVS observation produced for a single compute tick."""

    evs_score: float = 0.0
    relative_increase: float = 0.0
    peak_alignment: float = 0.0
    band_dominance: float = 0.0
    temporal_consistency: float = 0.0
    is_verified: bool = False
    confidence: float = 0.0
    target_hz: float = 10.0
    peak_hz: float = 0.0
    band_powers: Dict[str, float] = field(default_factory=dict)
    timestamp: float = 0.0

    def to_dict(self) -> dict[str, Any]:
        """Serialise the snapshot with per-field rounding precision."""
        # Rounding digits per float field; keys absent here pass through
        # unchanged (is_verified, timestamp).  Order below is the contract.
        precision = {
            "evs_score": 2,
            "relative_increase": 4,
            "peak_alignment": 4,
            "band_dominance": 4,
            "temporal_consistency": 4,
            "confidence": 4,
            "target_hz": 2,
            "peak_hz": 2,
        }
        ordered_names = (
            "evs_score",
            "relative_increase",
            "peak_alignment",
            "band_dominance",
            "temporal_consistency",
            "is_verified",
            "confidence",
            "target_hz",
            "peak_hz",
            "band_powers",
            "timestamp",
        )
        payload: dict[str, Any] = {}
        for name in ordered_names:
            value = getattr(self, name)
            if name == "band_powers":
                value = {band: round(power, 6) for band, power in value.items()}
            elif name in precision:
                value = round(value, precision[name])
            payload[name] = value
        return payload

EVSEngine

FFT-based Entrainment Verification Score engine.

Workflow
  1. start_baseline() -- begin collecting baseline EEG
  2. add_sample(voltage) -- feed raw EEG samples one at a time
  3. After baseline_duration_s, baseline finalises automatically
  4. set_target(hz) -- set the entrainment target frequency
  5. compute() returns EVSSnapshot every update_interval_samples
Source code in src/sc_neurocore/audio/evs_engine.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
class EVSEngine:
    """FFT-based Entrainment Verification Score engine.

    Workflow
    --------
    1. ``start_baseline()`` -- begin collecting baseline EEG
    2. ``add_sample(voltage)`` -- feed raw EEG samples one at a time
    3. After baseline_duration_s, baseline finalises automatically
    4. ``set_target(hz)`` -- set the entrainment target frequency
    5. ``compute()`` returns ``EVSSnapshot`` every *update_interval_samples*

    Relies on module-level ``BANDS`` (band name -> (lo, hi) Hz) and
    ``_hz_to_band`` defined outside this excerpt.
    """

    def __init__(self, cfg: Optional[EVSConfig] = None):
        self.cfg = cfg or EVSConfig()
        c = self.cfg

        # Sample buffer (ring)
        self._buf = np.zeros(c.fft_window, dtype=np.float64)
        self._buf_idx: int = 0
        self._buf_full: bool = False
        self._total_samples: int = 0

        # Baseline
        self._baseline_active: bool = False
        self._baseline_done: bool = False
        self._baseline_samples: list[float] = []
        self._baseline_powers: Dict[str, float] = {}

        # Target
        self._target_hz: float = 10.0

        # Score history for temporal consistency
        self._score_history: list[float] = []

    # ── Baseline ─────────────────────────────────────────────────────

    def start_baseline(self) -> None:
        """Begin baseline EEG collection."""
        self._baseline_active = True
        self._baseline_done = False
        self._baseline_samples.clear()
        self._baseline_powers.clear()
        logger.info("EVS baseline recording started")

    def _finalise_baseline(self) -> None:
        """Compute baseline band powers from collected samples."""
        # Only the most recent fft_window baseline samples contribute.
        arr = np.array(self._baseline_samples[-self.cfg.fft_window :])
        if len(arr) < 32:
            # Not enough samples; use flat baseline
            self._baseline_powers = {name: 1.0 for name in BANDS}
        else:
            self._baseline_powers = self._band_powers(arr)
        self._baseline_active = False
        self._baseline_done = True
        logger.info("EVS baseline finalised: %s", self._baseline_powers)

    # ── Sample Ingestion ─────────────────────────────────────────────

    def add_sample(self, voltage: float) -> None:
        """Feed one raw EEG voltage sample."""
        # Ring buffer: index wraps to 0 once the window is full.
        self._buf[self._buf_idx] = voltage
        self._buf_idx = (self._buf_idx + 1) % self.cfg.fft_window
        if self._buf_idx == 0:
            self._buf_full = True
        self._total_samples += 1

        # Baseline collection
        if self._baseline_active:
            self._baseline_samples.append(voltage)
            needed = int(self.cfg.baseline_duration_s * self.cfg.sample_rate)
            if len(self._baseline_samples) >= needed:
                self._finalise_baseline()

    def set_target(self, hz: float) -> None:
        """Set the entrainment target frequency (clamped to 0.5-45 Hz)."""
        self._target_hz = float(np.clip(hz, 0.5, 45.0))

    # ── FFT Helpers ──────────────────────────────────────────────────

    def _ordered_buf(self) -> np.ndarray[Any, Any]:
        """Return the ring buffer in time-order."""
        if not self._buf_full:
            # Not yet wrapped: valid data is the prefix up to the write index.
            return self._buf[: self._buf_idx].copy()
        # Wrapped: oldest samples start at the write index.
        return np.concatenate([self._buf[self._buf_idx :], self._buf[: self._buf_idx]])

    def _band_powers(self, signal: np.ndarray[Any, Any]) -> Dict[str, float]:
        """Compute power in each canonical EEG band via FFT.

        Returns the *mean* power per frequency bin within each band
        (so bands of different widths are comparable), 0.0 for bands
        with no bins in range.
        """
        n = len(signal)
        if n < 4:
            return {name: 0.0 for name in BANDS}

        # Hanning window
        windowed = signal * np.hanning(n)
        spectrum = np.abs(np.fft.rfft(windowed)) ** 2
        freqs = np.fft.rfftfreq(n, d=1.0 / self.cfg.sample_rate)

        powers: Dict[str, float] = {}
        for name, (lo, hi) in BANDS.items():
            # Half-open interval [lo, hi) so adjacent bands don't double-count.
            mask = (freqs >= lo) & (freqs < hi)
            powers[name] = float(np.mean(spectrum[mask])) if mask.any() else 0.0

        return powers

    def _peak_frequency(self, signal: np.ndarray[Any, Any]) -> float:
        """Dominant frequency in the signal."""
        n = len(signal)
        if n < 4:
            return 0.0
        windowed = signal * np.hanning(n)
        spectrum = np.abs(np.fft.rfft(windowed))
        freqs = np.fft.rfftfreq(n, d=1.0 / self.cfg.sample_rate)
        # Ignore DC
        spectrum[0] = 0.0
        idx = int(np.argmax(spectrum))
        return float(freqs[idx])

    # ── Compute EVS ──────────────────────────────────────────────────

    def compute(self) -> Optional[EVSSnapshot]:
        """Compute current EVS snapshot.

        Returns None if insufficient data or baseline not done.
        """
        if not self._baseline_done:
            return None
        # Require at least 32 samples before the first wrap.
        if not self._buf_full and self._buf_idx < 32:
            return None

        signal = self._ordered_buf()
        current_powers = self._band_powers(signal)
        peak_hz = self._peak_frequency(signal)

        target_band = _hz_to_band(self._target_hz)
        target_power = current_powers.get(target_band, 0.0)
        baseline_power = self._baseline_powers.get(target_band, 1.0)
        # `or 1.0` guards the division below when all band powers are zero.
        total_power = sum(current_powers.values()) or 1.0

        # -- Component scores (each 0-1) --

        # 1. Relative increase (40%)
        if baseline_power > 1e-12:
            ri = (target_power - baseline_power) / baseline_power
        else:
            ri = 0.0
        relative_increase = float(np.clip(ri, 0.0, 1.0))

        # 2. Peak alignment (30%): 1.0 at exact match, decaying linearly
        # over one band-width of deviation.
        band_lo, band_hi = BANDS[target_band]
        band_width = band_hi - band_lo
        if band_width > 0:
            alignment = 1.0 - abs(peak_hz - self._target_hz) / band_width
        else:
            alignment = 0.0
        peak_alignment = float(np.clip(alignment, 0.0, 1.0))

        # 3. Band dominance (20%)
        band_dominance = float(np.clip(target_power / total_power, 0.0, 1.0))

        # 4. Temporal consistency (10%): low std of the last 10 scores
        # maps to high consistency; neutral 0.5 until 3 scores exist.
        if len(self._score_history) >= 3:
            recent_std = float(np.std(self._score_history[-10:]))
            temporal_consistency = float(np.clip(1.0 - recent_std / 50.0, 0.0, 1.0))
        else:
            temporal_consistency = 0.5

        # Composite score 0-100
        score = (
            40.0 * relative_increase
            + 30.0 * peak_alignment
            + 20.0 * band_dominance
            + 10.0 * temporal_consistency
        )
        score = float(np.clip(score, 0.0, 100.0))
        self._score_history.append(score)

        # Confidence (grows with samples, capped at 1.0 after 20 updates)
        n_updates = len(self._score_history)
        confidence = float(np.clip(n_updates / 20.0, 0.0, 1.0))

        # Verification needs score >= 50 AND at least 12 updates (0.6 * 20).
        is_verified = (score >= 50.0) and (confidence >= 0.6)

        snap = EVSSnapshot(
            evs_score=score,
            relative_increase=relative_increase,
            peak_alignment=peak_alignment,
            band_dominance=band_dominance,
            temporal_consistency=temporal_consistency,
            is_verified=is_verified,
            confidence=confidence,
            target_hz=self._target_hz,
            peak_hz=peak_hz,
            band_powers=current_powers,
            timestamp=time.time(),
        )
        return snap

    # ── Utilities ────────────────────────────────────────────────────

    @property
    def baseline_done(self) -> bool:
        return self._baseline_done

    @property
    def score_history(self) -> List[float]:
        # Defensive copy so callers cannot mutate internal state.
        return list(self._score_history)

    def reset(self) -> None:
        """Full reset."""
        self._buf[:] = 0.0
        self._buf_idx = 0
        self._buf_full = False
        self._total_samples = 0
        self._baseline_active = False
        self._baseline_done = False
        self._baseline_samples.clear()
        self._baseline_powers.clear()
        self._score_history.clear()
        # NOTE(review): _target_hz is NOT reset here and keeps its last
        # value -- confirm whether that is intentional.

start_baseline()

Begin baseline EEG collection.

Source code in src/sc_neurocore/audio/evs_engine.py
148
149
150
151
152
153
154
def start_baseline(self) -> None:
    """Begin baseline EEG collection.

    Clears any previous baseline data; the baseline finalises
    automatically once enough samples have been fed via add_sample().
    """
    self._baseline_active = True
    self._baseline_done = False
    self._baseline_samples.clear()
    self._baseline_powers.clear()
    logger.info("EVS baseline recording started")

add_sample(voltage)

Feed one raw EEG voltage sample.

Source code in src/sc_neurocore/audio/evs_engine.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def add_sample(self, voltage: float) -> None:
    """Feed one raw EEG voltage sample."""
    # Ring buffer: index wraps to 0 once the window is full.
    self._buf[self._buf_idx] = voltage
    self._buf_idx = (self._buf_idx + 1) % self.cfg.fft_window
    if self._buf_idx == 0:
        self._buf_full = True
    self._total_samples += 1

    # Baseline collection
    if self._baseline_active:
        self._baseline_samples.append(voltage)
        needed = int(self.cfg.baseline_duration_s * self.cfg.sample_rate)
        if len(self._baseline_samples) >= needed:
            self._finalise_baseline()

set_target(hz)

Set the entrainment target frequency.

Source code in src/sc_neurocore/audio/evs_engine.py
186
187
188
def set_target(self, hz: float) -> None:
    """Set the entrainment target frequency (clamped to 0.5-45 Hz)."""
    self._target_hz = float(np.clip(hz, 0.5, 45.0))

compute()

Compute current EVS snapshot.

Returns None if insufficient data or baseline not done.

Source code in src/sc_neurocore/audio/evs_engine.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
def compute(self) -> Optional[EVSSnapshot]:
    """Compute current EVS snapshot.

    Returns None if insufficient data or baseline not done.
    """
    if not self._baseline_done:
        return None
    # Require at least 32 samples before the first wrap.
    if not self._buf_full and self._buf_idx < 32:
        return None

    signal = self._ordered_buf()
    current_powers = self._band_powers(signal)
    peak_hz = self._peak_frequency(signal)

    target_band = _hz_to_band(self._target_hz)
    target_power = current_powers.get(target_band, 0.0)
    baseline_power = self._baseline_powers.get(target_band, 1.0)
    # `or 1.0` guards the division below when all band powers are zero.
    total_power = sum(current_powers.values()) or 1.0

    # -- Component scores (each 0-1) --

    # 1. Relative increase (40%)
    if baseline_power > 1e-12:
        ri = (target_power - baseline_power) / baseline_power
    else:
        ri = 0.0
    relative_increase = float(np.clip(ri, 0.0, 1.0))

    # 2. Peak alignment (30%): 1.0 at exact match, decaying linearly
    # over one band-width of deviation.
    band_lo, band_hi = BANDS[target_band]
    band_width = band_hi - band_lo
    if band_width > 0:
        alignment = 1.0 - abs(peak_hz - self._target_hz) / band_width
    else:
        alignment = 0.0
    peak_alignment = float(np.clip(alignment, 0.0, 1.0))

    # 3. Band dominance (20%)
    band_dominance = float(np.clip(target_power / total_power, 0.0, 1.0))

    # 4. Temporal consistency (10%): neutral 0.5 until 3 scores exist.
    if len(self._score_history) >= 3:
        recent_std = float(np.std(self._score_history[-10:]))
        temporal_consistency = float(np.clip(1.0 - recent_std / 50.0, 0.0, 1.0))
    else:
        temporal_consistency = 0.5

    # Composite score 0-100
    score = (
        40.0 * relative_increase
        + 30.0 * peak_alignment
        + 20.0 * band_dominance
        + 10.0 * temporal_consistency
    )
    score = float(np.clip(score, 0.0, 100.0))
    self._score_history.append(score)

    # Confidence (grows with samples, capped at 1.0 after 20 updates)
    n_updates = len(self._score_history)
    confidence = float(np.clip(n_updates / 20.0, 0.0, 1.0))

    is_verified = (score >= 50.0) and (confidence >= 0.6)

    snap = EVSSnapshot(
        evs_score=score,
        relative_increase=relative_increase,
        peak_alignment=peak_alignment,
        band_dominance=band_dominance,
        temporal_consistency=temporal_consistency,
        is_verified=is_verified,
        confidence=confidence,
        target_hz=self._target_hz,
        peak_hz=peak_hz,
        band_powers=current_powers,
        timestamp=time.time(),
    )
    return snap

reset()

Full reset.

Source code in src/sc_neurocore/audio/evs_engine.py
319
320
321
322
323
324
325
326
327
328
329
def reset(self) -> None:
    """Return the scorer to a factory-fresh state.

    Empties the accumulated collections (baseline samples/powers,
    score history), rewinds the baseline flags, and zeroes the sample
    ring buffer together with its cursors.
    """
    # Drop accumulated collections first.
    self._score_history.clear()
    self._baseline_samples.clear()
    self._baseline_powers.clear()
    # Rewind baseline capture flags.
    self._baseline_done = False
    self._baseline_active = False
    # Empty the ring buffer and its cursors.
    self._total_samples = 0
    self._buf_full = False
    self._buf_idx = 0
    self._buf[:] = 0.0

SSGF Engine

sc_neurocore.audio.ssgf_engine

SSGFConfig dataclass

All tuneable knobs for SSGFEngine.

Source code in src/sc_neurocore/audio/ssgf_engine.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@dataclass
class SSGFConfig:
    """All tuneable knobs for SSGFEngine."""

    N: int = 16  # number of oscillators (theta has this length)
    z_dim: int = 120  # latent vector dimensionality
    lr_z: float = 0.01  # step size for the finite-difference update of z
    sigma_g: float = 0.3  # weight of the geometry (W) coupling term
    micro_steps: int = 10  # Kuramoto micro-steps per outer cycle
    dt: float = 0.001  # micro-cycle integration timestep
    noise: float = 0.2  # phase-noise amplitude in the micro-step
    # NOTE(review): K_base / K_alpha are not referenced by the visible engine
    # code -- presumably consumed by build_knm_matrix; confirm before tuning.
    K_base: float = 0.45
    K_alpha: float = 0.3
    field_pressure: float = 0.1  # strength of the cos(theta) field term
    seed: int = 42  # RNG seed for reproducible runs

SSGFEngine

Lightweight SSGF geometry-coupled Kuramoto solver.

Maintains a latent vector z whose decoded geometry matrix W(t) feeds back into the micro-cycle, steering oscillators toward higher global coherence R. Audio-mapping observables are derived from the resulting phase dynamics and spectral properties of W.

Source code in src/sc_neurocore/audio/ssgf_engine.py
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
class SSGFEngine:
    """Lightweight SSGF geometry-coupled Kuramoto solver.

    Maintains a latent vector *z* whose decoded geometry matrix W(t)
    feeds back into the micro-cycle, steering oscillators toward
    higher global coherence R.  Audio-mapping observables are derived
    from the resulting phase dynamics and spectral properties of W.
    """

    def __init__(self, cfg: Optional[SSGFConfig] = None):
        """Initialise phases, coupling, latent geometry and caches.

        Parameters
        ----------
        cfg : SSGFConfig, optional
            Engine configuration; defaults to ``SSGFConfig()``.
        """
        self.cfg = cfg or SSGFConfig()
        c = self.cfg
        # Seeded legacy RNG so runs are reproducible for a given cfg.seed.
        self._rng = np.random.RandomState(c.seed)

        # Phase state
        self.N = c.N
        # Natural frequencies: canonical OMEGA_N table, tiled when N > 16.
        self.omega = (
            OMEGA_N[: c.N].copy()
            if c.N <= 16
            else np.tile(
                OMEGA_N,
                (c.N // 16 + 1),
            )[: c.N].copy()
        )
        self.theta = self._rng.uniform(0, 2 * np.pi, c.N)

        # Coupling
        self.K = build_knm_matrix(c.N)

        # Latent geometry
        self.z = self._rng.randn(c.z_dim).astype(np.float64) * 0.1
        self.W = self._decode(self.z)

        # Spectral cache (refreshed by _spectral each outer step)
        self._eigvals = np.zeros(c.N)
        self._eigvecs = np.eye(c.N)

        # History for phase-velocity estimate
        self._prev_theta = self.theta.copy()

        # Running stats
        self.outer_step_count: int = 0
        self.R_global: float = 0.0
        self._cost_history: list[float] = []

    # ── Decoder: z -> W ──────────────────────────────────────────────

    def _decode(self, z: np.ndarray[Any, Any]) -> np.ndarray[Any, Any]:
        """Decode latent vector into a symmetric, non-negative weight
        matrix with zero diagonal via softplus on a symmetric shell."""
        N = self.N
        # Number of unique off-diagonal upper-triangle entries
        n_upper = N * (N - 1) // 2
        # Tile z to fill if z_dim < n_upper, or truncate
        flat = np.tile(z, (n_upper // len(z) + 1))[:n_upper]

        A = np.zeros((N, N))
        idx_upper = np.triu_indices(N, k=1)
        A[idx_upper] = flat
        A = A + A.T  # symmetric

        # Softplus: log(1 + exp(x)), numerically stable
        # (for x > 20, softplus(x) ~= x; np.exp still evaluates every entry,
        # so very large inputs may emit an overflow warning that np.where masks)
        W = np.where(A > 20, A, np.log1p(np.exp(A)))
        np.fill_diagonal(W, 0.0)
        return W

    # ── Micro-Cycle ──────────────────────────────────────────────────

    def _micro_step(self) -> None:
        """One Kuramoto + geometry-feedback timestep (vectorised)."""
        c = self.cfg
        N = self.N
        theta = self.theta

        # Phase differences: diff[n, m] = theta[m] - theta[n]
        diff = theta[np.newaxis, :] - theta[:, np.newaxis]
        sin_diff = np.sin(diff)

        # dtheta = omega + K coupling + geometry coupling + field + noise
        coupling_k = np.sum(self.K * sin_diff, axis=1)
        coupling_w = c.sigma_g * np.sum(self.W * sin_diff, axis=1)
        field_term = c.field_pressure * np.cos(theta)
        # NOTE(review): the noise term is scaled by dt along with the drift
        # (plain Euler), not by sqrt(dt) as in Euler-Maruyama -- confirm
        # this is the intended discretisation.
        noise_term = c.noise * self._rng.randn(N)

        dtheta = self.omega + coupling_k + coupling_w + field_term + noise_term
        self.theta = (theta + dtheta * c.dt) % (2 * np.pi)

    # ── Spectral Bridge ──────────────────────────────────────────────

    def _spectral(self) -> None:
        """Compute eigendecomposition of the normalised Laplacian of W."""
        W = self.W
        d = W.sum(axis=1)
        # Guard against isolated nodes (zero degree) before the inverse sqrt.
        d_safe = np.where(d > 1e-12, d, 1e-12)
        d_inv_sqrt = 1.0 / np.sqrt(d_safe)

        # L_sym = I - D^{-1/2} W D^{-1/2}
        L_sym = np.eye(self.N) - (d_inv_sqrt[:, None] * W * d_inv_sqrt[None, :])
        # Force exact symmetry
        L_sym = 0.5 * (L_sym + L_sym.T)

        # eigh returns eigenvalues in ascending order.
        eigvals, eigvecs = np.linalg.eigh(L_sym)
        self._eigvals = eigvals
        self._eigvecs = eigvecs

    # ── Cost ─────────────────────────────────────────────────────────

    def _compute_R(self) -> float:
        """Kuramoto order parameter R = |<exp(i*theta)>|."""
        z_complex = np.mean(np.exp(1j * self.theta))
        return float(np.abs(z_complex))

    def _cost(self) -> float:
        """Composite cost: minimise negative coherence + regularise W."""
        R = self._compute_R()
        c_micro = 1.0 - R  # coherence deficit: 0 when fully synchronised
        c_reg = 0.01 * np.sum(self.W**2) / (self.N * self.N)  # mean-square W penalty
        return c_micro + c_reg

    # ── Outer Cycle ──────────────────────────────────────────────────

    def outer_step(self) -> float:
        """One outer-cycle step: micro-cycle -> spectral -> grad update on z.

        Returns the cost after the step.  Note: the finite-difference
        loop re-evaluates the cost once per latent dimension, so each
        outer step performs O(z_dim) cost evaluations.
        """
        c = self.cfg

        # Save state
        self._prev_theta = self.theta.copy()

        # Run micro-cycle
        for _ in range(c.micro_steps):
            self._micro_step()

        # Spectral bridge
        self._spectral()

        # Update R
        self.R_global = self._compute_R()

        # Finite-difference gradient descent on z
        # (forward differences; theta is frozen during the loop, so only
        # the W-dependent part of the cost varies)
        base_cost = self._cost()
        eps = 1e-4
        grad = np.zeros_like(self.z)

        for i in range(len(self.z)):
            z_plus = self.z.copy()
            z_plus[i] += eps
            W_backup = self.W
            self.W = self._decode(z_plus)
            cost_plus = self._cost()
            self.W = W_backup
            grad[i] = (cost_plus - base_cost) / eps

        self.z -= c.lr_z * grad
        self.W = self._decode(self.z)

        self.outer_step_count += 1
        self._cost_history.append(base_cost)
        return base_cost

    # ── Audio Mapping ────────────────────────────────────────────────

    def get_audio_mapping(self) -> Dict[str, float]:
        """Derive CCW audio parameters from current SSGF state.

        Returns
        -------
        dict with keys:
            binaural_hz      -- 0.5-40 Hz (from layer-2 phase velocity)
            pulse_rate        -- isochronic pulse rate (layer-4 coherence)
            spatial_angle     -- 0-360 degrees (layer-7 phase)
            intensity         -- 0-1 (from R_global)
            fiedler           -- algebraic connectivity of W
            spectral_gap      -- lambda_1 / lambda_2
            theurgic_mode     -- bool, True when R > 0.95
        """
        R = self.R_global

        # Layer 2 phase velocity -> binaural Hz (0.5 - 40)
        # NOTE(review): _prev_theta is micro_steps ticks old but the delta is
        # divided by a single dt, and theta wraps mod 2*pi without unwrapping;
        # presumably a deliberate coarse heuristic -- confirm.
        if self.N > 2:
            dphase_2 = (self.theta[1] - self._prev_theta[1]) / self.cfg.dt
            binaural_hz = float(np.clip(0.5 + abs(dphase_2) * 2.0, 0.5, 40.0))
        else:
            binaural_hz = 10.0

        # Layer 4 coherence -> pulse rate (local R over oscillators 3..4)
        if self.N > 4:
            local_r = float(np.abs(np.mean(np.exp(1j * self.theta[3:5]))))
            pulse_rate = float(np.clip(2.0 + local_r * 18.0, 2.0, 20.0))
        else:
            pulse_rate = 8.0

        # Layer 7 phase -> spatial angle
        if self.N > 7:
            spatial_angle = float((self.theta[6] % (2 * np.pi)) / (2 * np.pi) * 360.0)
        else:
            spatial_angle = 0.0

        # R_global -> intensity
        intensity = float(np.clip(R, 0.0, 1.0))

        # Spectral properties (eigvals ascending; index 1 is the Fiedler value)
        fiedler = float(self._eigvals[1]) if len(self._eigvals) > 1 else 0.0
        spectral_gap = 0.0
        if len(self._eigvals) > 2 and abs(self._eigvals[2]) > 1e-12:
            spectral_gap = float(self._eigvals[1] / self._eigvals[2])

        theurgic = bool(R > 0.95)

        return {
            "binaural_hz": round(binaural_hz, 3),
            "pulse_rate": round(pulse_rate, 3),
            "spatial_angle": round(spatial_angle, 2),
            "intensity": round(intensity, 4),
            "fiedler": round(fiedler, 6),
            "spectral_gap": round(spectral_gap, 6),
            "theurgic_mode": theurgic,
        }

    # ── State ────────────────────────────────────────────────────────

    def get_state(self) -> Dict[str, Any]:
        """Full engine state snapshot (plain types, JSON-serialisable)."""
        return {
            "outer_step": self.outer_step_count,
            "R_global": round(self.R_global, 6),
            "theta": self.theta.tolist(),
            "z_norm": round(float(np.linalg.norm(self.z)), 6),
            "W_density": round(float(np.mean(self.W > 0.01)), 4),
            "W_mean": round(float(np.mean(self.W)), 6),
            "eigvals": [round(float(v), 6) for v in self._eigvals[:4]],
            "cost": round(self._cost_history[-1], 6) if self._cost_history else None,
            "audio": self.get_audio_mapping(),
        }

outer_step()

One outer-cycle step: micro-cycle -> spectral -> grad update on z.

Returns the cost after the step.

Source code in src/sc_neurocore/audio/ssgf_engine.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
def outer_step(self) -> float:
    """Advance the engine by one outer cycle.

    Runs the micro-cycle, refreshes the spectral cache and global
    coherence, then applies one forward finite-difference gradient
    step to the latent vector z.  Returns the cost measured before
    the latent update.
    """
    cfg = self.cfg

    # Remember phases so downstream consumers can estimate velocity.
    self._prev_theta = self.theta.copy()

    # Micro-cycle.
    for _ in range(cfg.micro_steps):
        self._micro_step()

    # Spectral bridge and global coherence.
    self._spectral()
    self.R_global = self._compute_R()

    # Forward finite-difference gradient of the cost w.r.t. z.  Each
    # perturbation only swaps self.W, so one restore at the end suffices.
    base_cost = self._cost()
    eps = 1e-4
    grad = np.zeros_like(self.z)
    saved_W = self.W
    for idx, _ in enumerate(self.z):
        perturbed = self.z.copy()
        perturbed[idx] += eps
        self.W = self._decode(perturbed)
        grad[idx] = (self._cost() - base_cost) / eps
    self.W = saved_W

    # Gradient descent on z, then refresh the decoded geometry.
    self.z = self.z - cfg.lr_z * grad
    self.W = self._decode(self.z)

    self.outer_step_count += 1
    self._cost_history.append(base_cost)
    return base_cost

get_audio_mapping()

Derive CCW audio parameters from current SSGF state.

Returns

dict with keys:

- `binaural_hz` — 0.5–40 Hz (from layer-2 phase velocity)
- `pulse_rate` — isochronic pulse rate (layer-4 coherence)
- `spatial_angle` — 0–360 degrees (layer-7 phase)
- `intensity` — 0–1 (from `R_global`)
- `fiedler` — algebraic connectivity of W
- `spectral_gap` — lambda_1 / lambda_2
- `theurgic_mode` — bool, True when R > 0.95

Source code in src/sc_neurocore/audio/ssgf_engine.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def get_audio_mapping(self) -> Dict[str, float]:
    """Translate the current SSGF state into CCW audio parameters.

    Returns
    -------
    dict with keys:
        binaural_hz      -- 0.5-40 Hz (from layer-2 phase velocity)
        pulse_rate        -- isochronic pulse rate (layer-4 coherence)
        spatial_angle     -- 0-360 degrees (layer-7 phase)
        intensity         -- 0-1 (from R_global)
        fiedler           -- algebraic connectivity of W
        spectral_gap      -- lambda_1 / lambda_2
        theurgic_mode     -- bool, True when R > 0.95
    """
    coherence = self.R_global
    n_eigs = len(self._eigvals)

    # Layer-2 phase velocity drives the binaural beat (0.5 - 40 Hz);
    # small systems fall back to a fixed 10 Hz.
    binaural_hz = 10.0
    if self.N > 2:
        velocity = (self.theta[1] - self._prev_theta[1]) / self.cfg.dt
        binaural_hz = float(np.clip(0.5 + abs(velocity) * 2.0, 0.5, 40.0))

    # Local coherence of oscillators 3..4 sets the isochronic pulse rate.
    pulse_rate = 8.0
    if self.N > 4:
        local_coherence = float(np.abs(np.mean(np.exp(1j * self.theta[3:5]))))
        pulse_rate = float(np.clip(2.0 + local_coherence * 18.0, 2.0, 20.0))

    # Layer-7 phase sweeps the spatial angle across a full circle.
    spatial_angle = 0.0
    if self.N > 7:
        spatial_angle = float((self.theta[6] % (2 * np.pi)) / (2 * np.pi) * 360.0)

    # Global coherence maps directly onto intensity.
    intensity = float(np.clip(coherence, 0.0, 1.0))

    # Spectral observables from the cached Laplacian eigenvalues.
    fiedler = float(self._eigvals[1]) if n_eigs > 1 else 0.0
    spectral_gap = 0.0
    if n_eigs > 2 and abs(self._eigvals[2]) > 1e-12:
        spectral_gap = float(self._eigvals[1] / self._eigvals[2])

    return {
        "binaural_hz": round(binaural_hz, 3),
        "pulse_rate": round(pulse_rate, 3),
        "spatial_angle": round(spatial_angle, 2),
        "intensity": round(intensity, 4),
        "fiedler": round(fiedler, 6),
        "spectral_gap": round(spectral_gap, 6),
        "theurgic_mode": bool(coherence > 0.95),
    }

get_state()

Full engine state snapshot.

Source code in src/sc_neurocore/audio/ssgf_engine.py
281
282
283
284
285
286
287
288
289
290
291
292
293
def get_state(self) -> Dict[str, Any]:
    """Snapshot the full engine state as plain, JSON-friendly types."""
    snapshot: Dict[str, Any] = {"outer_step": self.outer_step_count}
    snapshot["R_global"] = round(self.R_global, 6)
    snapshot["theta"] = self.theta.tolist()
    snapshot["z_norm"] = round(float(np.linalg.norm(self.z)), 6)
    snapshot["W_density"] = round(float(np.mean(self.W > 0.01)), 4)
    snapshot["W_mean"] = round(float(np.mean(self.W)), 6)
    snapshot["eigvals"] = [round(float(ev), 6) for ev in self._eigvals[:4]]
    history = self._cost_history
    snapshot["cost"] = round(history[-1], 6) if history else None
    snapshot["audio"] = self.get_audio_mapping()
    return snapshot

User Profile

sc_neurocore.audio.user_profile

User profile: chronotype and session preferences.

Chronotype

Bases: str, Enum

Sleep chronotype model (after Dr. Michael Breus).

Each chronotype has a preferred entrainment frequency range and optimal session timing.

Source code in src/sc_neurocore/audio/user_profile.py
22
23
24
25
26
27
28
29
30
31
32
class Chronotype(str, Enum):
    """Sleep chronotype model (after Dr. Michael Breus).

    Each chronotype has a preferred entrainment frequency range and
    optimal session timing.  Mixing in ``str`` means each member *is*
    its string value, so members serialise directly (e.g. to JSON).
    """

    LION = "lion"  # Early riser, alpha-dominant mornings
    BEAR = "bear"  # Solar schedule, balanced spectrum
    WOLF = "wolf"  # Night owl, theta-rich evenings
    DOLPHIN = "dolphin"  # Light sleeper, high beta baseline

UserProfile dataclass

Per-user preference and adaptation model.

Parameters

- `user_id` (str): unique user identifier.
- `chronotype` (Chronotype): sleep chronotype.
- `baseline_band_powers` (dict): resting-state EEG band powers (populated after first baseline).
- `preferred_cost_weights` (dict): SSGF cost weights tuned to this user.
- `sensitivity_map` (dict): per-band sensitivity multipliers (e.g. `{"alpha": 1.2}`).
- `session_count` (int): total completed sessions.
- `preferred_target_hz` (float, optional): explicitly set target frequency (overrides chronotype default).

Source code in src/sc_neurocore/audio/user_profile.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
@dataclass
class UserProfile:
    """Per-user preference and adaptation model.

    Parameters
    ----------
    user_id : str
        Unique user identifier.
    chronotype : Chronotype
        Sleep chronotype.
    baseline_band_powers : dict
        Resting-state EEG band powers (populated after first baseline).
    preferred_cost_weights : dict
        SSGF cost weights tuned to this user.
    sensitivity_map : dict
        Per-band sensitivity multipliers (e.g. {"alpha": 1.2}).
    session_count : int
        Total completed sessions.
    preferred_target_hz : float, optional
        Explicitly set target frequency (overrides chronotype default).
    """

    user_id: str = "anonymous"
    chronotype: Chronotype = Chronotype.BEAR
    baseline_band_powers: Dict[str, float] = field(default_factory=dict)
    preferred_cost_weights: Dict[str, float] = field(default_factory=dict)
    sensitivity_map: Dict[str, float] = field(default_factory=dict)
    session_count: int = 0
    preferred_target_hz: Optional[float] = None

    def __post_init__(self) -> None:
        # Any empty preference map is seeded from chronotype defaults.
        if not self.preferred_cost_weights:
            defaults = _CHRONOTYPE_WEIGHTS.get(
                self.chronotype, _CHRONOTYPE_WEIGHTS[Chronotype.BEAR]
            )
            self.preferred_cost_weights = dict(defaults)
        if not self.sensitivity_map:
            self.sensitivity_map = {
                band: 1.0 for band in ("delta", "theta", "alpha", "beta", "gamma")
            }

    # ── Target Hz ────────────────────────────────────────────────────

    def get_best_target_hz(self) -> float:
        """Return the entrainment target (Hz) best suited to this user.

        An explicitly stored preference wins; otherwise the chronotype
        default is used (10 Hz for an unknown chronotype).
        """
        explicit = self.preferred_target_hz
        if explicit is None:
            return _CHRONOTYPE_TARGET_HZ.get(self.chronotype, 10.0)
        return explicit

    # ── Session Update ───────────────────────────────────────────────

    def update_from_session(
        self,
        avg_evs: float,
        peak_evs: float,
        best_target_hz: Optional[float] = None,
        band_powers: Optional[Dict[str, float]] = None,
    ) -> None:
        """Fold the outcome of a finished session back into the profile.

        Parameters
        ----------
        avg_evs : float
            Average EVS score over the session.
        peak_evs : float
            Peak EVS score (accepted for API symmetry; not used by the
            current implementation).
        best_target_hz : float, optional
            Better-performing target found by the adaptive engine.
        band_powers : dict, optional
            Session-derived baseline band powers to blend in.
        """
        self.session_count += 1

        # A good session (avg EVS above 50) may shift the preferred target.
        if best_target_hz is not None and avg_evs > 50.0:
            current = self.preferred_target_hz
            if current is None:
                self.preferred_target_hz = best_target_hz
            else:
                blend = 0.3  # EMA weight toward the newly found target
                self.preferred_target_hz = (1 - blend) * current + blend * best_target_hz

        # Blend this session's band powers into the stored baseline.
        if band_powers:
            if not self.baseline_band_powers:
                self.baseline_band_powers = dict(band_powers)
            else:
                blend = 0.2
                for name, value in band_powers.items():
                    previous = self.baseline_band_powers.get(name, value)
                    self.baseline_band_powers[name] = (1 - blend) * previous + blend * value

        logger.info(
            "Profile updated: session #%d, avg_evs=%.1f, target=%.2f Hz",
            self.session_count,
            avg_evs,
            self.preferred_target_hz or self.get_best_target_hz(),
        )

    # ── Serialisation ────────────────────────────────────────────────

    def to_dict(self) -> dict[str, Any]:
        """Serialise the profile to a plain dict (JSON-friendly)."""
        return {
            "user_id": self.user_id,
            "chronotype": self.chronotype.value,
            "baseline_band_powers": dict(self.baseline_band_powers),
            "preferred_cost_weights": dict(self.preferred_cost_weights),
            "sensitivity_map": dict(self.sensitivity_map),
            "session_count": self.session_count,
            "preferred_target_hz": self.preferred_target_hz,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> UserProfile:
        """Rehydrate a profile from a ``to_dict``-style mapping."""
        return cls(
            user_id=data.get("user_id", "anonymous"),
            chronotype=Chronotype(data.get("chronotype", "bear")),
            baseline_band_powers=data.get("baseline_band_powers", {}),
            preferred_cost_weights=data.get("preferred_cost_weights", {}),
            sensitivity_map=data.get("sensitivity_map", {}),
            session_count=data.get("session_count", 0),
            preferred_target_hz=data.get("preferred_target_hz"),
        )

get_best_target_hz()

Return the best entrainment target for this user.

Uses explicit preference if set, otherwise chronotype default.

Source code in src/sc_neurocore/audio/user_profile.py
118
119
120
121
122
123
124
125
def get_best_target_hz(self) -> float:
    """Return the entrainment target (Hz) best suited to this user.

    An explicitly stored preference wins; otherwise the chronotype
    default is used (10 Hz for an unknown chronotype).
    """
    explicit = self.preferred_target_hz
    if explicit is None:
        return _CHRONOTYPE_TARGET_HZ.get(self.chronotype, 10.0)
    return explicit

update_from_session(avg_evs, peak_evs, best_target_hz=None, band_powers=None)

Update profile after a completed session.

Parameters

- `avg_evs` (float): average EVS score over the session.
- `peak_evs` (float): peak EVS score.
- `best_target_hz` (float, optional): if the adaptive engine found a better target, adopt it.
- `band_powers` (dict, optional): updated baseline band powers from this session.

Source code in src/sc_neurocore/audio/user_profile.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def update_from_session(
    self,
    avg_evs: float,
    peak_evs: float,
    best_target_hz: Optional[float] = None,
    band_powers: Optional[Dict[str, float]] = None,
) -> None:
    """Fold the outcome of a finished session back into the profile.

    Parameters
    ----------
    avg_evs : float
        Average EVS score over the session.
    peak_evs : float
        Peak EVS score (accepted for API symmetry; not used by the
        current implementation).
    best_target_hz : float, optional
        Better-performing target found by the adaptive engine.
    band_powers : dict, optional
        Session-derived baseline band powers to blend in.
    """
    self.session_count += 1

    # A good session (avg EVS above 50) may shift the preferred target.
    if best_target_hz is not None and avg_evs > 50.0:
        current = self.preferred_target_hz
        if current is None:
            self.preferred_target_hz = best_target_hz
        else:
            blend = 0.3  # EMA weight toward the newly found target
            self.preferred_target_hz = (1 - blend) * current + blend * best_target_hz

    # Blend this session's band powers into the stored baseline.
    if band_powers:
        if not self.baseline_band_powers:
            self.baseline_band_powers = dict(band_powers)
        else:
            blend = 0.2
            for name, value in band_powers.items():
                previous = self.baseline_band_powers.get(name, value)
                self.baseline_band_powers[name] = (1 - blend) * previous + blend * value

    logger.info(
        "Profile updated: session #%d, avg_evs=%.1f, target=%.2f Hz",
        self.session_count,
        avg_evs,
        self.preferred_target_hz or self.get_best_target_hz(),
    )