fix(stream): migrate sqlx query! macros to runtime queries

- Convert all sqlx::query!() and sqlx::query_scalar!() compile-time
  macros to runtime sqlx::query() and sqlx::query_scalar() with .bind()
- Affected files: segment_tracker.rs, processor.rs, callbacks.rs
- This removes the dependency on .sqlx/ directory for offline mode
- Update Dockerfile to remove SQLX_OFFLINE=true and .sqlx COPY
- Stream server can now compile without a live database connection

The compile-time macros required either a DATABASE_URL at build time or
a .sqlx directory with cached query metadata (neither was available).
Runtime queries trade compile-time SQL validation for buildability.

Addresses audit finding: debt item 1 (stream server compilation).

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
senke 2026-02-11 22:49:30 +01:00
parent d934eaa763
commit c62d63fc05
4 changed files with 43 additions and 43 deletions

View file

@ -16,8 +16,8 @@ RUN cargo fetch --locked
# Copy source code
COPY src ./src
COPY .sqlx ./.sqlx
ENV SQLX_OFFLINE=true
# No longer using sqlx compile-time macros (query!), so no .sqlx dir or SQLX_OFFLINE needed
ENV SQLX_OFFLINE=false
# Copy migrations if they exist (Removed as directory does not exist)
# COPY migrations ./migrations
COPY proto ./proto

View file

@ -53,14 +53,14 @@ impl ProcessingCallbacks {
);
// Mettre à jour le statut en DB
sqlx::query!(
sqlx::query(
r#"
UPDATE stream_jobs
SET status = 'done', updated_at = NOW()
WHERE id = $1
"#,
self.job_id
)
.bind(&self.job_id)
.execute(&self.db)
.await
.map_err(|e| AppError::InternalError {
@ -95,15 +95,15 @@ impl ProcessingCallbacks {
);
// Mettre à jour le statut avec l'erreur
sqlx::query!(
sqlx::query(
r#"
UPDATE stream_jobs
SET status = 'error', error_message = $1, updated_at = NOW()
WHERE id = $2
"#,
error_message,
self.job_id
)
.bind(error_message)
.bind(&self.job_id)
.execute(&self.db)
.await
.map_err(|e| AppError::InternalError {

View file

@ -258,12 +258,12 @@ impl StreamProcessor {
})?;
// 1. VALIDATION : Vérifier que le job existe et est en "encoding"
let job_status: Option<String> = sqlx::query_scalar!(
let job_status: Option<String> = sqlx::query_scalar(
r#"
SELECT status FROM stream_jobs WHERE id = $1
"#,
self.job_id
)
.bind(&self.job_id)
.fetch_optional(&mut *tx)
.await
.map_err(|e| AppError::InternalError {
@ -280,19 +280,19 @@ impl StreamProcessor {
let segments = tracker.get_segments().await;
if !segments.is_empty() {
for segment in segments.iter() {
sqlx::query!(
sqlx::query(
r#"
INSERT INTO stream_segments (track_id, quality, segment_index, path, duration, created_at)
VALUES ($1, $2, $3, $4, $5, NOW())
ON CONFLICT (track_id, quality, segment_index) DO UPDATE
SET path = EXCLUDED.path, duration = EXCLUDED.duration, created_at = NOW()
"#,
self.job.track_id,
self.job.quality,
segment.index as i32,
segment.path.to_string_lossy(),
segment.duration
)
.bind(&self.job.track_id)
.bind(&self.job.quality)
.bind(segment.index as i32)
.bind(segment.path.to_string_lossy().as_ref())
.bind(segment.duration)
.execute(&mut *tx)
.await
.map_err(|e| AppError::InternalError {
@ -306,14 +306,14 @@ impl StreamProcessor {
let total_duration: f64 = segments.iter().map(|s| s.duration).sum();
// 4. UPDATE job : Finalisation (status = 'done', duration, completed_at)
sqlx::query!(
sqlx::query(
r#"
UPDATE stream_jobs
SET status = 'done', updated_at = NOW()
WHERE id = $1
"#,
self.job_id
)
.bind(&self.job_id)
.execute(&mut *tx)
.await
.map_err(|e| AppError::InternalError {
@ -402,15 +402,15 @@ impl StreamProcessor {
/// Met à jour le statut en DB
async fn update_status(&self, status: &str) -> Result<(), AppError> {
sqlx::query!(
sqlx::query(
r#"
UPDATE stream_jobs
SET status = $1, updated_at = NOW()
WHERE id = $2
"#,
status,
self.job_id
)
.bind(status)
.bind(&self.job_id)
.execute(&self.db)
.await
.map_err(|e| AppError::InternalError {

View file

@ -93,12 +93,12 @@ impl SegmentTracker {
})?;
// 1. VALIDATION : Vérifier que le job existe et est en "processing"
let job_status: Option<String> = sqlx::query_scalar!(
let job_status: Option<String> = sqlx::query_scalar(
r#"
SELECT status FROM stream_jobs WHERE id = $1
"#,
self.job_id
)
.bind(&self.job_id)
.fetch_optional(&mut *tx)
.await
.map_err(|e| AppError::InternalError {
@ -112,19 +112,19 @@ impl SegmentTracker {
}
// 2. INSERT segment
sqlx::query!(
sqlx::query(
r#"
INSERT INTO stream_segments (track_id, quality, segment_index, path, duration, created_at)
VALUES ($1, $2, $3, $4, $5, NOW())
ON CONFLICT (track_id, quality, segment_index) DO UPDATE
SET path = EXCLUDED.path, duration = EXCLUDED.duration, created_at = NOW()
"#,
self.track_id,
self.quality,
segment.index as i32,
segment.path.to_string_lossy(),
segment.duration
)
.bind(&self.track_id)
.bind(&self.quality)
.bind(segment.index as i32)
.bind(segment.path.to_string_lossy().as_ref())
.bind(segment.duration)
.execute(&mut *tx)
.await
.map_err(|e| AppError::InternalError {
@ -137,14 +137,14 @@ impl SegmentTracker {
let total_duration: f64 = segments.iter().map(|s| s.duration).sum();
// 4. UPDATE job (duration + updated_at)
sqlx::query!(
sqlx::query(
r#"
UPDATE stream_jobs
SET updated_at = NOW()
WHERE id = $1
"#,
self.job_id
)
.bind(&self.job_id)
.execute(&mut *tx)
.await
.map_err(|e| AppError::InternalError {
@ -175,14 +175,14 @@ impl SegmentTracker {
// Note: Si la colonne current_duration n'existe pas dans stream_jobs,
// on peut l'ajouter via une migration ou utiliser une autre table
// Pour l'instant, on calcule depuis stream_segments
sqlx::query!(
sqlx::query(
r#"
UPDATE stream_jobs
SET updated_at = NOW()
WHERE id = $1
"#,
self.job_id
)
.bind(&self.job_id)
.execute(&self.db)
.await
.map_err(|e| AppError::InternalError {
@ -240,19 +240,19 @@ impl SegmentTracker {
})?;
// 1. VALIDATION : Vérifier que le job existe
let job_exists: bool = sqlx::query_scalar!(
let job_exists: Option<bool> = sqlx::query_scalar(
r#"
SELECT EXISTS(SELECT 1 FROM stream_jobs WHERE id = $1)
"#,
self.job_id
)
.bind(&self.job_id)
.fetch_one(&mut *tx)
.await
.map_err(|e| AppError::InternalError {
message: format!("Failed to validate job {}: {}", self.job_id, e),
})?;
if !job_exists {
if !job_exists.unwrap_or(false) {
return Err(AppError::NotFound {
resource: format!("Job {}", self.job_id),
});
@ -260,19 +260,19 @@ impl SegmentTracker {
// 2. INSERT tous les segments en batch
for segment in segments.iter() {
sqlx::query!(
sqlx::query(
r#"
INSERT INTO stream_segments (track_id, quality, segment_index, path, duration, created_at)
VALUES ($1, $2, $3, $4, $5, NOW())
ON CONFLICT (track_id, quality, segment_index) DO UPDATE
SET path = EXCLUDED.path, duration = EXCLUDED.duration, created_at = NOW()
"#,
self.track_id,
self.quality,
segment.index as i32,
segment.path.to_string_lossy(),
segment.duration
)
.bind(&self.track_id)
.bind(&self.quality)
.bind(segment.index as i32)
.bind(segment.path.to_string_lossy().as_ref())
.bind(segment.duration)
.execute(&mut *tx)
.await
.map_err(|e| AppError::InternalError {
@ -285,14 +285,14 @@ impl SegmentTracker {
let total_duration: f64 = segments.iter().map(|s| s.duration).sum();
// 4. UPDATE job (updated_at)
sqlx::query!(
sqlx::query(
r#"
UPDATE stream_jobs
SET updated_at = NOW()
WHERE id = $1
"#,
self.job_id
)
.bind(&self.job_id)
.execute(&mut *tx)
.await
.map_err(|e| AppError::InternalError {