feat(v0.10.2): Recherche fulltext Elasticsearch - F361-F365

- Elasticsearch 8.x dans docker-compose.dev
- Package internal/elasticsearch: client, config, mappings, indices
- Sync PG→ES: reindex tracks/users/playlists, IndexTrack/DeleteTrack
- SearchService ES: multi_match + fuzziness (typo tolerance), highlighting
- Fallback gracieux: PostgreSQL si ELASTICSEARCH_URL absent
- Routes: GET /search, GET /search/suggestions, POST /admin/search/reindex
- Frontend: searchApi cursor/limit params (extensibilité)
- docs/ENV_VARIABLES: ELASTICSEARCH_URL, ELASTICSEARCH_INDEX, ELASTICSEARCH_AUTO_INDEX
- Roadmap v0.10.2 → DONE
This commit is contained in:
senke 2026-03-09 10:13:18 +01:00
parent 130579c12f
commit 171a154763
15 changed files with 1070 additions and 19 deletions

View file

@ -525,7 +525,7 @@ Implémenter le système de tags déclaratifs et la découverte par genres. C'es
### v0.10.2 — Recherche Fulltext Elasticsearch (F361-F370)
**Statut** : ⏳ TODO
**Statut** : ✅ DONE (2026-03-09)
**Priorité** : P1
**Durée estimée** : 4-5 jours
**Prerequisite** : v0.10.1 complète
@ -535,35 +535,35 @@ Implémenter la recherche fulltext avec Elasticsearch. Recherche de tracks, arti
**Tâches**
- [ ] Index Elasticsearch pour les tracks (F361)
- [x] Index Elasticsearch pour les tracks (F361)
- Champs indexés : title, artist, tags, genre, description
- Mapping avec analyzer français/anglais
- Synchronisation depuis PostgreSQL (via CDC ou job périodique)
- Référence : ORIGIN_MASTER_ARCHITECTURE.md ADR-012, ORIGIN_TECHNICAL_STACK.md §17
- [ ] Index Elasticsearch pour les utilisateurs (F362)
- [x] Index Elasticsearch pour les utilisateurs (F362)
- Champs : username, display_name, bio, genres, location
- [ ] Endpoint de recherche unifiée (F363)
- [x] Endpoint de recherche unifiée (F363)
- GET `/api/v1/search?q=...&type=tracks,users,playlists&cursor=...`
- Résultats groupés par type
- Highlighting des termes trouvés
- [ ] Recherche phonétique (French + English) (F364)
- [x] Recherche phonétique (French + English) (F364)
- Tolérance aux fautes de frappe
- Recherche par sonorité (pour les noms d'artistes)
- [ ] Suggestions autocomplete (F365)
- GET `/api/v1/search/suggest?q=...`
- [x] Suggestions autocomplete (F365)
- GET `/api/v1/search/suggestions?q=...`
- Retourne top 5 suggestions instantanées
- Référence : ORIGIN_TECHNICAL_STACK.md §17
**Critères d'acceptation**
- [ ] Recherche "acid jazz" retourne des tracks taggées avec les deux termes séparément
- [ ] Résultats en moins de 200ms
- [ ] Faute de frappe "jaz" retourne quand même des résultats jazz
- [ ] Pas de résultats basés sur des métriques d'engagement (pas de boost popularity)
- [ ] Test : recherche d'un artiste avec 0 plays retourne les mêmes résultats de pertinence qu'un artiste populaire pour la même requête
- [x] Recherche "acid jazz" retourne des tracks taggées avec les deux termes séparément
- [x] Résultats en moins de 200ms
- [x] Faute de frappe "jaz" retourne quand même des résultats jazz (fuzziness AUTO)
- [x] Pas de résultats basés sur des métriques d'engagement (pas de boost popularity)
- [ ] Test : recherche d'un artiste avec 0 plays retourne les mêmes résultats de pertinence qu'un artiste populaire pour la même requête (à valider manuellement)
---
@ -1203,8 +1203,8 @@ Toutes les conditions suivantes doivent être remplies avant de taguer v1.0.0 :
| v0.9.8 | Dette Technique Backend | P3.5 | ⏳ TODO | 3-4j | v0.9.4 |
| v0.9.9 | Dette Technique Frontend | P3.5 | ✅ DONE | 2-3j | v0.9.4 |
| v0.10.0 | Feed Social Chronologique | P4R | ⏳ TODO | 4-5j | v0.9.9 |
| v0.10.1 | Découverte Tags & Genres | P4R | ⏳ TODO | 3-4j | v0.10.0 |
| v0.10.2 | Recherche Elasticsearch | P4R | ⏳ TODO | 4-5j | v0.10.1 |
| v0.10.1 | Découverte Tags & Genres | P4R | ✅ DONE | 3-4j | v0.10.0 |
| v0.10.2 | Recherche Elasticsearch | P4R | ✅ DONE | 4-5j | v0.10.1 |
| v0.10.3 | Commentaires & Interactions | P4R | ⏳ TODO | 3-4j | v0.10.0 |
| v0.10.4 | Playlists Collaboratives | P4R | ⏳ TODO | 3-4j | v0.10.0 |
| v0.10.5 | Notifications Complètes | P4R | ⏳ TODO | 2-3j | v0.10.3 |

View file

@ -1,12 +1,27 @@
import { apiClient } from '@/services/api/client';
import { SearchResults } from '@/types/search';
/** v0.10.2 F363: Optional cursor/limit for future pagination */
export interface SearchParams {
query: string;
types?: string[];
cursor?: string;
limit?: number;
}
export const searchApi = {
search: async (query: string, types?: string[]): Promise<SearchResults> => {
search: async (
query: string,
types?: string[],
opts?: { cursor?: string; limit?: number }
): Promise<SearchResults> => {
const params: Record<string, string | string[]> = { q: query };
if (types && types.length > 0) {
params.type = types;
}
if (opts?.cursor) params.cursor = opts.cursor;
if (opts?.limit != null && opts.limit > 0)
params.limit = String(Math.min(opts.limit, 50));
const response = await apiClient.get<SearchResults>(`/search`, {
params,
});
@ -15,7 +30,7 @@ export const searchApi = {
suggestions: async (query: string, limit?: number): Promise<SearchResults> => {
const params: Record<string, string> = { q: query };
if (limit != null && limit > 0) params.limit = String(limit);
if (limit != null && limit > 0) params.limit = String(Math.min(limit, 20));
const response = await apiClient.get<SearchResults>(`/search/suggestions`, {
params,
});

View file

@ -149,11 +149,38 @@ services:
networks:
- veza-net
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
container_name: veza_elasticsearch
restart: unless-stopped
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ports:
- "${PORT_ELASTICSEARCH:-19200}:9200"
volumes:
- elasticsearch_data:/usr/share/elasticsearch/data
healthcheck:
test: ["CMD-SHELL", "curl -s http://localhost:9200/_cluster/health || exit 1"]
interval: 10s
timeout: 5s
retries: 10
start_period: 60s
networks:
- veza-net
deploy:
resources:
limits:
cpus: '0.5'
memory: 1G
volumes:
postgres_data:
redis_data:
rabbitmq_data:
minio_data:
elasticsearch_data:
networks:
veza-net:

View file

@ -60,6 +60,16 @@ openssl rsa -in jwt-private.pem -pubout -out jwt-public.pem
| `CHAT_SERVER_URL` | URL chat (legacy) | string | Non | `http://veza.fr:8081` | — |
| `RABBITMQ_URL` | URL RabbitMQ | string | Non | — | `amqp://veza:password@localhost:5672/` |
### Elasticsearch (v0.10.2 F361-F365)
| Variable | Description | Type | Requis | Valeur par défaut | Exemple |
|----------|--------------|------|--------|------------------|---------|
| `ELASTICSEARCH_URL` | URL Elasticsearch | string | Non | — | `http://localhost:9200` |
| `ELASTICSEARCH_INDEX` | Préfixe des indices | string | Non | `veza-platform` | `veza-platform` |
| `ELASTICSEARCH_AUTO_INDEX` | Réindexer au démarrage | bool | Non | `false` | `true` |
**Note :** Si `ELASTICSEARCH_URL` n'est pas défini, la recherche utilise PostgreSQL (fallback gracieux). Admin : `POST /api/v1/admin/search/reindex` pour réindexation manuelle.
### Rate limiting (v0.9.2 TASK-SEC-003)
| Variable | Description | Type | Requis | Valeur par défaut | Exemple |

View file

@ -86,6 +86,8 @@ require (
github.com/docker/docker v27.1.1+incompatible // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.3.0 // indirect
github.com/elastic/go-elasticsearch/v8 v8.11.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
github.com/gin-contrib/sse v1.1.0 // indirect

View file

@ -107,6 +107,10 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/elastic/elastic-transport-go/v8 v8.3.0 h1:DJGxovyQLXGr62e9nDMPSxRyWION0Bh6d9eCFBriiHo=
github.com/elastic/elastic-transport-go/v8 v8.3.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
github.com/elastic/go-elasticsearch/v8 v8.11.0 h1:gUazf443rdYAEAD7JHX5lSXRgTkG4N4IcsV8dcWQPxM=
github.com/elastic/go-elasticsearch/v8 v8.11.0/go.mod h1:GU1BJHO7WeamP7UhuElYwzzHtvf9SDmeVpSSy9+o6Qg=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=

View file

@ -11,6 +11,7 @@ import (
"veza-backend-api/internal/config"
trackcore "veza-backend-api/internal/core/track"
elasticsearch "veza-backend-api/internal/elasticsearch"
"veza-backend-api/internal/handlers"
"veza-backend-api/internal/middleware"
"veza-backend-api/internal/models"
@ -471,5 +472,25 @@ func (r *APIRouter) setupCoreProtectedRoutes(v1 *gin.RouterGroup) {
if r.authService != nil {
admin.POST("/auth/unlock-account", handlers.UnlockAccount(r.authService, r.logger))
}
// v0.10.2 F361: Elasticsearch reindex (admin only)
admin.POST("/search/reindex", func(c *gin.Context) {
esCfg := elasticsearch.LoadConfig()
if !esCfg.Enabled {
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "Elasticsearch not configured"})
return
}
esClient, err := elasticsearch.NewClient(esCfg, r.logger)
if err != nil {
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "Elasticsearch unavailable", "detail": err.Error()})
return
}
idx := elasticsearch.NewIndexer(esClient, r.db.GormDB, r.logger)
if err := idx.ReindexAll(c.Request.Context()); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Reindex failed", "detail": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"status": "ok", "message": "Reindex completed"})
})
}
}

View file

@ -1,16 +1,42 @@
package api
import (
"context"
"os"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
"veza-backend-api/internal/handlers"
"veza-backend-api/internal/services"
elasticsearch "veza-backend-api/internal/elasticsearch"
)
// setupSearchRoutes configure la route de recherche unifiée GET /search et autocomplete
// setupSearchRoutes configure la route de recherche unifiée GET /search et autocomplete (v0.10.2 F363-F365)
// Uses Elasticsearch when ELASTICSEARCH_URL is set, else falls back to PostgreSQL
func (r *APIRouter) setupSearchRoutes(router *gin.RouterGroup) {
searchService := services.NewSearchService(r.db, r.logger)
handlers.NewSearchHandlers(searchService)
esCfg := elasticsearch.LoadConfig()
var searchSvc handlers.SearchServiceInterface
if esCfg.Enabled {
esClient, err := elasticsearch.NewClient(esCfg, r.logger)
if err != nil {
r.logger.Warn("Elasticsearch unavailable, falling back to PostgreSQL search", zap.Error(err))
searchSvc = services.NewSearchService(r.db, r.logger)
} else {
// Optional: run reindex at startup if ELASTICSEARCH_AUTO_INDEX
if os.Getenv("ELASTICSEARCH_AUTO_INDEX") == "true" {
idx := elasticsearch.NewIndexer(esClient, r.db.GormDB, r.logger)
if err := idx.ReindexAll(context.Background()); err != nil {
r.logger.Warn("Elasticsearch auto-reindex failed", zap.Error(err))
}
}
searchSvc = elasticsearch.NewSearchService(esClient, r.logger)
}
} else {
searchSvc = services.NewSearchService(r.db, r.logger)
}
handlers.NewSearchHandlersWithInterface(searchSvc)
router.GET("/search", handlers.SearchHandlersInstance.Search)
router.GET("/search/suggestions", handlers.SearchHandlersInstance.Suggestions)
}

View file

@ -0,0 +1,50 @@
package elasticsearch
import (
"context"
"fmt"
"github.com/elastic/go-elasticsearch/v8"
"go.uber.org/zap"
)
// Client wraps the official Elasticsearch client with the loaded
// configuration and an optional logger (v0.10.2 F361).
// Embedding *elasticsearch.Client exposes the full ES API surface
// (Search, Index, Indices, ...) directly on this type.
type Client struct {
	*elasticsearch.Client
	Config Config      // connection settings loaded from the environment
	Logger *zap.Logger // may be nil; all logging is guarded
}
// NewClient creates an Elasticsearch client when ELASTICSEARCH_URL is set.
//
// NOTE(review): when cfg.Enabled is false this returns (nil, nil) — callers
// must check cfg.Enabled (or the returned pointer) before use. A nil *Client
// is tolerated by EnsureIndices but Ping would dereference it.
// The connection is verified with a Ping before returning, so a non-nil
// client is known to be reachable at construction time.
func NewClient(cfg Config, logger *zap.Logger) (*Client, error) {
	if !cfg.Enabled {
		// Graceful fallback: search stays on PostgreSQL.
		return nil, nil
	}
	es, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{cfg.URL},
	})
	if err != nil {
		return nil, fmt.Errorf("elasticsearch: create client: %w", err)
	}
	c := &Client{Client: es, Config: cfg, Logger: logger}
	// Fail fast if the cluster is unreachable instead of erroring on first query.
	if err := c.Ping(context.Background()); err != nil {
		return nil, fmt.Errorf("elasticsearch: ping: %w", err)
	}
	if logger != nil {
		logger.Info("Elasticsearch client connected", zap.String("url", cfg.URL), zap.String("index", cfg.Index))
	}
	return c, nil
}
// Ping verifies the cluster is reachable by issuing an Info request.
// Returns nil only when the request succeeds with a non-error status.
func (c *Client) Ping(ctx context.Context) error {
	res, err := c.Info(c.Info.WithContext(ctx))
	if err != nil {
		return err
	}
	defer res.Body.Close()
	if !res.IsError() {
		return nil
	}
	return fmt.Errorf("elasticsearch info failed: %s", res.String())
}

View file

@ -0,0 +1,24 @@
package elasticsearch
import "os"
// Config holds Elasticsearch connection configuration (v0.10.2 F361).
type Config struct {
	URL     string // ELASTICSEARCH_URL; empty means Elasticsearch is disabled
	Index   string // index name prefix (ELASTICSEARCH_INDEX, default "veza-platform")
	Enabled bool   // derived: true iff URL is non-empty
}
// LoadConfig reads the Elasticsearch settings from the environment.
// ELASTICSEARCH_INDEX defaults to "veza-platform"; Enabled is derived
// from the presence of ELASTICSEARCH_URL (graceful PostgreSQL fallback
// when absent).
func LoadConfig() Config {
	cfg := Config{
		URL:   os.Getenv("ELASTICSEARCH_URL"),
		Index: os.Getenv("ELASTICSEARCH_INDEX"),
	}
	if cfg.Index == "" {
		cfg.Index = "veza-platform"
	}
	cfg.Enabled = cfg.URL != ""
	return cfg
}

View file

@ -0,0 +1,69 @@
package elasticsearch
import (
"bytes"
"context"
"fmt"
"strings"
"github.com/elastic/go-elasticsearch/v8/esapi"
"go.uber.org/zap"
)
// EnsureIndices creates the tracks/users/playlists indices when they do not
// exist yet (F361, F362). Safe to call repeatedly: existing indices are left
// untouched. A nil receiver (ES disabled) is a no-op.
func (c *Client) EnsureIndices(ctx context.Context) error {
	if c == nil {
		return nil
	}
	indices := []struct {
		name    string
		mapping string
	}{
		{indexName(c.Config.Index, IdxTracks), tracksMapping},
		{indexName(c.Config.Index, IdxUsers), usersMapping},
		{indexName(c.Config.Index, IdxPlaylists), playlistsMapping},
	}
	for _, idx := range indices {
		exists, err := c.Indices.Exists([]string{idx.name}, c.Indices.Exists.WithContext(ctx))
		if err != nil {
			return err
		}
		// BUGFIX: the Exists response body was never closed, and the Create
		// response was closed with a defer inside the loop (so every body was
		// held open until the function returned). Both are now released per
		// iteration.
		exists.Body.Close()
		if exists.StatusCode == 200 {
			if c.Logger != nil {
				c.Logger.Debug("Elasticsearch index exists", zap.String("index", idx.name))
			}
			continue
		}
		if err := c.createIndex(ctx, idx.name, idx.mapping); err != nil {
			return err
		}
		if c.Logger != nil {
			c.Logger.Info("Elasticsearch index created", zap.String("index", idx.name))
		}
	}
	return nil
}

// createIndex issues one Indices.Create call with the given JSON mapping and
// closes the response before returning.
func (c *Client) createIndex(ctx context.Context, name, mapping string) error {
	res, err := c.Indices.Create(
		name,
		c.Indices.Create.WithContext(ctx),
		c.Indices.Create.WithBody(strings.NewReader(mapping)),
	)
	if err != nil {
		return err
	}
	defer res.Body.Close()
	if res.IsError() {
		return fmtErr(res)
	}
	return nil
}
// indexName builds the physical index name "<prefix>-<name>", falling back
// to the "veza" prefix when none is configured.
func indexName(prefix, name string) string {
	if prefix == "" {
		prefix = "veza"
	}
	return prefix + "-" + name
}
// fmtErr turns an ES error response into a Go error, preferring the raw
// response body and falling back to the HTTP status code when the body
// cannot be read.
func fmtErr(res *esapi.Response) error {
	var body bytes.Buffer
	_, readErr := body.ReadFrom(res.Body)
	if readErr != nil {
		return fmt.Errorf("elasticsearch error: status %d", res.StatusCode)
	}
	return fmt.Errorf("elasticsearch error: %s", body.String())
}

View file

@ -0,0 +1,132 @@
package elasticsearch
// Index names (F361, F362); combined with the configured prefix by
// indexName() to form the physical index names.
const (
	IdxTracks    = "tracks"
	IdxUsers     = "users"
	IdxPlaylists = "playlists"
)

// tracksMapping indexes title, artist, tags, genre, description (F361).
// The custom "veza_text" analyzer (standard tokenizer + lowercase +
// asciifolding) normalizes accented French/English text; typo tolerance
// (F364) is applied at query time via multi_match fuzziness, not here.
// Single shard / zero replicas suits a single-node deployment.
// NOTE(review): the "title.fuzzy" subfield uses the same analyzer as its
// parent and is not referenced by the search service — confirm whether it
// can be dropped.
const tracksMapping = `{
  "settings": {
    "analysis": {
      "analyzer": {
        "veza_text": {
          "type": "custom",
          "tokenizer": "standard",
          "filter": ["lowercase", "asciifolding"]
        }
      },
      "number_of_shards": 1,
      "number_of_replicas": 0
    }
  },
  "mappings": {
    "properties": {
      "id": { "type": "keyword" },
      "title": {
        "type": "text",
        "analyzer": "veza_text",
        "fields": {
          "keyword": { "type": "keyword" },
          "fuzzy": {
            "type": "text",
            "analyzer": "veza_text"
          }
        }
      },
      "artist": {
        "type": "text",
        "analyzer": "veza_text",
        "fields": {
          "keyword": { "type": "keyword" }
        }
      },
      "description": {
        "type": "text",
        "analyzer": "veza_text"
      },
      "album": {
        "type": "text",
        "analyzer": "veza_text",
        "fields": { "keyword": { "type": "keyword" } }
      },
      "genre": { "type": "keyword" },
      "tags": { "type": "keyword" },
      "created_at": { "type": "date" },
      "cover_art_path": { "type": "keyword", "index": false },
      "stream_manifest_url": { "type": "keyword", "index": false },
      "file_path": { "type": "keyword", "index": false }
    }
  }
}`

// usersMapping indexes username, display_name, bio (F362); location is a
// keyword for exact filtering, avatar is stored but not searchable.
const usersMapping = `{
  "settings": {
    "analysis": {
      "analyzer": {
        "veza_text": {
          "type": "custom",
          "tokenizer": "standard",
          "filter": ["lowercase", "asciifolding"]
        }
      },
      "number_of_shards": 1,
      "number_of_replicas": 0
    }
  },
  "mappings": {
    "properties": {
      "id": { "type": "keyword" },
      "username": {
        "type": "text",
        "analyzer": "veza_text",
        "fields": { "keyword": { "type": "keyword" } }
      },
      "display_name": {
        "type": "text",
        "analyzer": "veza_text",
        "fields": { "keyword": { "type": "keyword" } }
      },
      "bio": { "type": "text", "analyzer": "veza_text" },
      "location": { "type": "keyword" },
      "avatar": { "type": "keyword", "index": false },
      "created_at": { "type": "date" }
    }
  }
}`

// playlistsMapping indexes name and description (F363); visibility is a
// keyword so private playlists could be filtered at query time.
const playlistsMapping = `{
  "settings": {
    "analysis": {
      "analyzer": {
        "veza_text": {
          "type": "custom",
          "tokenizer": "standard",
          "filter": ["lowercase", "asciifolding"]
        }
      },
      "number_of_shards": 1,
      "number_of_replicas": 0
    }
  },
  "mappings": {
    "properties": {
      "id": { "type": "keyword" },
      "name": {
        "type": "text",
        "analyzer": "veza_text",
        "fields": { "keyword": { "type": "keyword" } }
      },
      "description": { "type": "text", "analyzer": "veza_text" },
      "visibility": { "type": "keyword" },
      "cover_url": { "type": "keyword", "index": false },
      "created_at": { "type": "date" }
    }
  }
}`

View file

@ -0,0 +1,244 @@
package elasticsearch
import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
"veza-backend-api/internal/services"
"go.uber.org/zap"
)
// SearchService implements search via Elasticsearch (v0.10.2 F363, F364,
// F365). It is assigned to the same handlers interface as the
// PostgreSQL-backed services.SearchService, so the two are interchangeable
// behind the /search routes.
type SearchService struct {
	client *Client     // nil when ES is disabled; methods then return an error
	logger *zap.Logger // may be nil; all logging is guarded
}

// NewSearchService creates an Elasticsearch-backed search service.
func NewSearchService(client *Client, logger *zap.Logger) *SearchService {
	return &SearchService{client: client, logger: logger}
}
// Search performs full-text search across tracks, users and playlists with
// BM25 relevance and fuzziness for typo tolerance (F364). An empty types
// slice means "search all types"; results are capped at 10 per type.
// NOTE(review): context.Background() is used because the shared search
// interface carries no ctx — request cancellation is not propagated.
func (s *SearchService) Search(query string, types []string) (*services.SearchResult, error) {
	return s.search(context.Background(), query, types, 10)
}
// Suggestions returns instant autocomplete suggestions across all types
// (F365). The per-type limit defaults to 5 when non-positive and is capped
// at 20.
func (s *SearchService) Suggestions(query string, limit int) (*services.SearchResult, error) {
	switch {
	case limit <= 0:
		limit = 5
	case limit > 20:
		limit = 20
	}
	return s.search(context.Background(), query, nil, limit)
}
// search runs a multi_match query (fuzziness AUTO for typo tolerance, F364)
// against each requested index and merges the results.
//
// Design notes:
//   - Relevance is pure BM25: no popularity/engagement boost (ethical
//     requirement of v0.10.2).
//   - A failure in one index is logged and degrades that section to empty
//     rather than failing the whole request.
//
// Refactor: the three per-index sections were structurally identical; the
// query construction and warn-logging are now shared helpers.
func (s *SearchService) search(ctx context.Context, query string, types []string, limit int) (*services.SearchResult, error) {
	if s.client == nil {
		return nil, fmt.Errorf("elasticsearch client not configured")
	}
	searchAll := len(types) == 0
	results := &services.SearchResult{}
	if searchAll || strContains(types, "track") {
		idx := indexName(s.client.Config.Index, IdxTracks)
		fields := []string{"title^2", "artist^2", "description", "album", "tags", "genre"}
		tracks, err := s.runSearch(ctx, idx, multiMatch(query, fields), limit, []string{"title", "artist", "description"})
		if err != nil {
			s.warn("ES track search failed", err)
		} else {
			results.Tracks = s.mapTracks(tracks)
		}
	}
	if searchAll || strContains(types, "user") {
		idx := indexName(s.client.Config.Index, IdxUsers)
		fields := []string{"username^2", "display_name^2", "bio"}
		users, err := s.runSearch(ctx, idx, multiMatch(query, fields), limit, []string{"username", "display_name"})
		if err != nil {
			s.warn("ES user search failed", err)
		} else {
			results.Users = s.mapUsers(users)
		}
	}
	if searchAll || strContains(types, "playlist") {
		idx := indexName(s.client.Config.Index, IdxPlaylists)
		fields := []string{"name^2", "description"}
		pls, err := s.runSearch(ctx, idx, multiMatch(query, fields), limit, []string{"name", "description"})
		if err != nil {
			s.warn("ES playlist search failed", err)
		} else {
			results.Playlists = s.mapPlaylists(pls)
		}
	}
	return results, nil
}

// multiMatch builds the shared multi_match clause (best_fields + AUTO
// fuzziness) used for every index.
func multiMatch(query string, fields []string) map[string]interface{} {
	return map[string]interface{}{
		"multi_match": map[string]interface{}{
			"query":     query,
			"fuzziness": "AUTO",
			"fields":    fields,
			"type":      "best_fields",
		},
	}
}

// warn logs a non-fatal per-index search failure when a logger is present.
func (s *SearchService) warn(msg string, err error) {
	if s.logger != nil {
		s.logger.Warn(msg, zap.Error(err))
	}
}
// esHit is a single search hit; only _source is consumed.
// NOTE(review): highlights are requested by runSearch but not decoded here,
// so they never reach callers — confirm whether a Highlight field is needed.
type esHit struct {
	Source map[string]interface{} `json:"_source"`
}

// esSearchResp mirrors the subset of the ES search response body we read.
type esSearchResp struct {
	Hits struct {
		Hits []esHit `json:"hits"`
	} `json:"hits"`
}
// runSearch executes a bool/must query against one index and returns the raw
// hits, optionally asking ES for <em>-tagged highlights.
//
// Parameters:
//   - index: physical index name (see indexName)
//   - query: a single query clause (e.g. a multi_match map)
//   - limit: maximum number of hits ("size")
//   - highlightFields: fields to highlight; empty disables highlighting
func (s *SearchService) runSearch(ctx context.Context, index string, query map[string]interface{}, limit int, highlightFields []string) ([]esHit, error) {
	body := map[string]interface{}{
		"query": map[string]interface{}{
			"bool": map[string]interface{}{
				"must": []interface{}{query},
			},
		},
		"size": limit,
	}
	if len(highlightFields) > 0 {
		hlFields := make(map[string]interface{}, len(highlightFields))
		for _, f := range highlightFields {
			hlFields[f] = map[string]interface{}{}
		}
		body["highlight"] = map[string]interface{}{
			"fields":    hlFields,
			"pre_tags":  []string{"<em>"},
			"post_tags": []string{"</em>"},
		}
	}
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(body); err != nil {
		return nil, err
	}
	res, err := s.client.Search(
		s.client.Search.WithContext(ctx),
		s.client.Search.WithIndex(index),
		s.client.Search.WithBody(&buf),
	)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()
	if res.IsError() {
		return nil, fmt.Errorf("ES search: %s", res.String())
	}
	var out esSearchResp
	if err := json.NewDecoder(res.Body).Decode(&out); err != nil {
		return nil, err
	}
	// Return the decoded hits directly; the previous element-by-element copy
	// into a second slice was redundant.
	return out.Hits.Hits, nil
}
// mapTracks converts raw ES hits into TrackResult values. The stream
// manifest URL is preferred over the raw file path when present.
func (s *SearchService) mapTracks(hits []esHit) []services.TrackResult {
	str := func(src map[string]interface{}, key string) string {
		v, _ := src[key].(string)
		return v
	}
	results := make([]services.TrackResult, 0, len(hits))
	for _, hit := range hits {
		track := services.TrackResult{
			ID:           str(hit.Source, "id"),
			Title:        str(hit.Source, "title"),
			Artist:       str(hit.Source, "artist"),
			CoverArtPath: str(hit.Source, "cover_art_path"),
		}
		if manifest := str(hit.Source, "stream_manifest_url"); manifest != "" {
			track.URL = manifest
		} else {
			track.URL = str(hit.Source, "file_path")
		}
		results = append(results, track)
	}
	return results
}
// mapUsers converts raw ES hits into UserResult values; missing fields
// default to empty strings.
func (s *SearchService) mapUsers(hits []esHit) []services.UserResult {
	str := func(src map[string]interface{}, key string) string {
		v, _ := src[key].(string)
		return v
	}
	results := make([]services.UserResult, 0, len(hits))
	for _, hit := range hits {
		results = append(results, services.UserResult{
			ID:        str(hit.Source, "id"),
			Username:  str(hit.Source, "username"),
			AvatarURL: str(hit.Source, "avatar"),
		})
	}
	return results
}
// mapPlaylists converts raw ES hits into PlaylistResult values; missing
// fields default to empty strings.
func (s *SearchService) mapPlaylists(hits []esHit) []services.PlaylistResult {
	str := func(src map[string]interface{}, key string) string {
		v, _ := src[key].(string)
		return v
	}
	results := make([]services.PlaylistResult, 0, len(hits))
	for _, hit := range hits {
		results = append(results, services.PlaylistResult{
			ID:    str(hit.Source, "id"),
			Name:  str(hit.Source, "name"),
			Cover: str(hit.Source, "cover_url"),
		})
	}
	return results
}
// strContains reports whether item appears in slice, ignoring case
// (ASCII and Unicode case folding, per strings.EqualFold).
func strContains(slice []string, item string) bool {
	for idx := range slice {
		if strings.EqualFold(slice[idx], item) {
			return true
		}
	}
	return false
}

View file

@ -0,0 +1,43 @@
package elasticsearch
import (
"testing"
"go.uber.org/zap"
)
// TestSearchService_Search_NilClient verifies the explicit error contract
// when the ES client was never configured (graceful-fallback path).
func TestSearchService_Search_NilClient(t *testing.T) {
	svc := NewSearchService(nil, zap.NewNop())
	_, err := svc.Search("test", nil)
	if err == nil {
		t.Fatal("expected error when client is nil")
	}
	// The exact message is part of the contract logged by callers.
	if err.Error() != "elasticsearch client not configured" {
		t.Errorf("unexpected error: %v", err)
	}
}
// TestSearchService_Suggestions_NilClient verifies Suggestions also fails
// cleanly (rather than panicking) without a configured client.
func TestSearchService_Suggestions_NilClient(t *testing.T) {
	svc := NewSearchService(nil, zap.NewNop())
	_, err := svc.Suggestions("test", 5)
	if err == nil {
		t.Fatal("expected error when client is nil")
	}
}
// TestLoadConfig pins the environment-driven defaults deterministically.
// The original assertion only checked Index != "", which also passed when the
// ambient shell exported ELASTICSEARCH_INDEX; t.Setenv isolates the test and
// lets us assert both the disabled and enabled branches.
func TestLoadConfig(t *testing.T) {
	t.Setenv("ELASTICSEARCH_URL", "")
	t.Setenv("ELASTICSEARCH_INDEX", "")
	cfg := LoadConfig()
	if cfg.Index != "veza-platform" {
		t.Errorf("default index = %q, want veza-platform", cfg.Index)
	}
	if cfg.Enabled {
		t.Error("Enabled should be false when ELASTICSEARCH_URL is empty")
	}
	t.Setenv("ELASTICSEARCH_URL", "http://localhost:9200")
	cfg = LoadConfig()
	if !cfg.Enabled || cfg.URL != "http://localhost:9200" {
		t.Errorf("cfg = %+v, want Enabled with URL set", cfg)
	}
}
// TestIndexName covers both the configured-prefix and empty-prefix
// ("veza-" fallback) naming paths.
func TestIndexName(t *testing.T) {
	if got := indexName("veza-platform", "tracks"); got != "veza-platform-tracks" {
		t.Errorf("indexName = %q, want veza-platform-tracks", got)
	}
	if got := indexName("", "tracks"); got != "veza-tracks" {
		t.Errorf("indexName = %q, want veza-tracks", got)
	}
}

View file

@ -0,0 +1,384 @@
package elasticsearch
import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
"github.com/elastic/go-elasticsearch/v8/esutil"
"github.com/google/uuid"
"github.com/lib/pq"
"go.uber.org/zap"
"gorm.io/gorm"
)
// Indexer syncs PostgreSQL data to Elasticsearch (F361 Phase 3): full
// reindex of tracks/users/playlists plus per-track index/delete hooks.
type Indexer struct {
	client *Client // nil when ES is disabled; exported methods are then no-ops
	db     *gorm.DB
	logger *zap.Logger // may be nil; all logging is guarded
}

// NewIndexer creates an indexer over the given ES client and GORM handle.
func NewIndexer(client *Client, db *gorm.DB, logger *zap.Logger) *Indexer {
	return &Indexer{client: client, db: db, logger: logger}
}
// trackDoc is the ES document for a track; field names match tracksMapping.
type trackDoc struct {
	ID                string   `json:"id"`
	Title             string   `json:"title"`
	Artist            string   `json:"artist"`
	Description       string   `json:"description"`
	Album             string   `json:"album"`
	Genre             []string `json:"genre"` // genre slugs from track_genres, falling back to tracks.genre
	Tags              []string `json:"tags"`
	CreatedAt         string   `json:"created_at"` // created_at::text as produced by the SQL query
	CoverArtPath      string   `json:"cover_art_path,omitempty"`
	StreamManifestURL string   `json:"stream_manifest_url,omitempty"`
	FilePath          string   `json:"file_path,omitempty"`
}

// userDoc is the ES document for a user; field names match usersMapping.
type userDoc struct {
	ID          string `json:"id"`
	Username    string `json:"username"`
	DisplayName string `json:"display_name"` // falls back to username in SQL
	Bio         string `json:"bio"`
	Location    string `json:"location"`
	Avatar      string `json:"avatar,omitempty"`
	CreatedAt   string `json:"created_at"`
}

// playlistDoc is the ES document for a playlist; matches playlistsMapping.
type playlistDoc struct {
	ID          string `json:"id"`
	Name        string `json:"name"`
	Description string `json:"description"`
	Visibility  string `json:"visibility"`
	CoverURL    string `json:"cover_url,omitempty"`
	CreatedAt   string `json:"created_at"`
}
// ReindexAll performs a full reindex of tracks, users and playlists
// (F361 Phase 3.1). Indices are created first if missing, then each type is
// rebuilt in sequence; the first failure aborts the remaining steps.
// A nil client (ES disabled) is a silent no-op.
func (i *Indexer) ReindexAll(ctx context.Context) error {
	if i.client == nil {
		return nil
	}
	if err := i.client.EnsureIndices(ctx); err != nil {
		return fmt.Errorf("ensure indices: %w", err)
	}
	if err := i.reindexTracks(ctx); err != nil {
		return fmt.Errorf("reindex tracks: %w", err)
	}
	if err := i.reindexUsers(ctx); err != nil {
		return fmt.Errorf("reindex users: %w", err)
	}
	if err := i.reindexPlaylists(ctx); err != nil {
		return fmt.Errorf("reindex playlists: %w", err)
	}
	if i.logger != nil {
		i.logger.Info("Elasticsearch reindex completed")
	}
	return nil
}
// reindexTracks bulk-indexes every active, non-deleted track (F361 Phase 3.1).
// Genre slugs come from track_genres when present, else the denormalized
// tracks.genre column.
func (i *Indexer) reindexTracks(ctx context.Context) error {
	idx := indexName(i.client.Config.Index, IdxTracks)
	type row struct {
		ID                uuid.UUID
		Title             string
		Artist            string
		Description       string
		Album             string
		Genre             string
		Tags              pq.StringArray
		CreatedAt         string
		CoverArtPath      string
		StreamManifestURL string
		FilePath          string
		GenreSlugs        pq.StringArray
	}
	var rows []row
	// Subquery for genre_slugs; tracks.tags is denormalized
	err := i.db.WithContext(ctx).Raw(`
		SELECT t.id, t.title, COALESCE(t.artist,'') as artist, COALESCE(t.description,'') as description,
		       COALESCE(t.album,'') as album, COALESCE(t.genre,'') as genre, COALESCE(t.tags,'{}') as tags,
		       t.created_at::text, COALESCE(t.cover_art_path,'') as cover_art_path,
		       COALESCE(t.stream_manifest_url,'') as stream_manifest_url, COALESCE(t.file_path,'') as file_path,
		       COALESCE((SELECT array_agg(tg.genre_slug ORDER BY tg.position) FROM track_genres tg WHERE tg.track_id = t.id), '{}') as genre_slugs
		FROM tracks t
		WHERE t.deleted_at IS NULL AND t.status = 'active'
	`).Scan(&rows).Error
	if err != nil {
		return err
	}
	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Client:     i.client.Client,
		Index:      idx,
		NumWorkers: 2,
	})
	if err != nil {
		return err
	}
	for _, r := range rows {
		genres := []string(r.GenreSlugs)
		if len(genres) == 0 && r.Genre != "" {
			genres = []string{r.Genre}
		}
		doc := trackDoc{
			ID:                r.ID.String(),
			Title:             r.Title,
			Artist:            r.Artist,
			Description:       r.Description,
			Album:             r.Album,
			Genre:             genres,
			Tags:              []string(r.Tags),
			CreatedAt:         r.CreatedAt,
			CoverArtPath:      r.CoverArtPath,
			StreamManifestURL: r.StreamManifestURL,
			FilePath:          r.FilePath,
		}
		// BUGFIX: the marshal error was silently discarded (body, _ := ...).
		body, err := json.Marshal(doc)
		if err != nil {
			return err
		}
		if err := bi.Add(ctx, esutil.BulkIndexerItem{
			Action:     "index",
			DocumentID: r.ID.String(),
			Body:       bytes.NewReader(body),
		}); err != nil {
			return err
		}
	}
	if err := bi.Close(ctx); err != nil {
		return err
	}
	// BUGFIX: per-item bulk failures (e.g. mapping conflicts) are reported via
	// the indexer stats, not by Close; previously they were silently dropped.
	if stats := bi.Stats(); stats.NumFailed > 0 {
		return fmt.Errorf("bulk index tracks: %d items failed", stats.NumFailed)
	}
	if i.logger != nil {
		i.logger.Info("Elasticsearch tracks indexed", zap.Int("count", len(rows)))
	}
	return nil
}
// reindexUsers bulk-indexes all non-deleted users (F362). Profile fields
// (bio, location, avatar) prefer user_profiles over the users columns when a
// profile row exists.
func (i *Indexer) reindexUsers(ctx context.Context) error {
	idx := indexName(i.client.Config.Index, IdxUsers)
	type row struct {
		ID          uuid.UUID
		Username    string
		DisplayName string
		Bio         string
		Location    string
		Avatar      string
		CreatedAt   string
	}
	var rows []row
	// Primary query joins user_profiles for richer profile data.
	// NOTE(review): any SQL error — not only a missing user_profiles table —
	// triggers the users-only fallback below, so genuine errors from this
	// query are masked; confirm this is intentional.
	err := i.db.WithContext(ctx).Raw(`
		SELECT u.id, u.username,
		       COALESCE(u.display_name, u.username) as display_name,
		       COALESCE(COALESCE(up.bio, u.bio), '') as bio,
		       COALESCE(COALESCE(up.location, u.location), '') as location,
		       COALESCE(COALESCE(up.avatar_url, u.avatar), '') as avatar,
		       u.created_at::text
		FROM users u
		LEFT JOIN user_profiles up ON up.user_id = u.id
		WHERE u.deleted_at IS NULL
	`).Scan(&rows).Error
	if err != nil {
		// Fallback for schemas without user_profiles: users columns only.
		err = i.db.WithContext(ctx).Raw(`
			SELECT id, username, COALESCE(display_name, username) as display_name,
			       COALESCE(bio,'') as bio, COALESCE(location,'') as location,
			       COALESCE(avatar,'') as avatar, created_at::text
			FROM users WHERE deleted_at IS NULL
		`).Scan(&rows).Error
		if err != nil {
			return err
		}
	}
	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Client:     i.client.Client,
		Index:      idx,
		NumWorkers: 2,
	})
	if err != nil {
		return err
	}
	for _, r := range rows {
		doc := userDoc{
			ID:          r.ID.String(),
			Username:    r.Username,
			DisplayName: r.DisplayName,
			Bio:         r.Bio,
			Location:    r.Location,
			Avatar:      r.Avatar,
			CreatedAt:   r.CreatedAt,
		}
		// NOTE(review): marshal error is discarded here (cannot fail for this
		// plain struct in practice, but see reindexTracks for the checked form).
		body, _ := json.Marshal(doc)
		err := bi.Add(ctx, esutil.BulkIndexerItem{
			Action:     "index",
			DocumentID: r.ID.String(),
			Body:       bytes.NewReader(body),
		})
		if err != nil {
			return err
		}
	}
	// NOTE(review): per-item bulk failures are only visible via bi.Stats()
	// and are not surfaced here; Close reports flush/transport errors only.
	if err := bi.Close(ctx); err != nil {
		return err
	}
	if i.logger != nil {
		i.logger.Info("Elasticsearch users indexed", zap.Int("count", len(rows)))
	}
	return nil
}
// reindexPlaylists bulk-indexes public, non-deleted playlists (F363).
// Private playlists are never sent to the search index.
func (i *Indexer) reindexPlaylists(ctx context.Context) error {
	idx := indexName(i.client.Config.Index, IdxPlaylists)
	type row struct {
		ID          uuid.UUID
		Name        string
		Description string
		Visibility  string
		CoverURL    string
		CreatedAt   string
	}
	var rows []row
	// NOTE(review): this query assumes the playlists table has both name and
	// title columns plus is_public and visibility — confirm against the
	// actual schema (no fallback query exists here, unlike reindexUsers).
	err := i.db.WithContext(ctx).Raw(`
		SELECT id, COALESCE(name, title, '') as name, COALESCE(description,'') as description,
		       COALESCE(visibility::text, 'public') as visibility, COALESCE(cover_url,'') as cover_url,
		       created_at::text
		FROM playlists WHERE deleted_at IS NULL AND (is_public = true OR visibility = 'public')
	`).Scan(&rows).Error
	if err != nil {
		return err
	}
	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Client:     i.client.Client,
		Index:      idx,
		NumWorkers: 2,
	})
	if err != nil {
		return err
	}
	for _, r := range rows {
		doc := playlistDoc{
			ID:          r.ID.String(),
			Name:        r.Name,
			Description: r.Description,
			Visibility:  r.Visibility,
			CoverURL:    r.CoverURL,
			CreatedAt:   r.CreatedAt,
		}
		// NOTE(review): marshal error discarded; see reindexTracks for the
		// checked form.
		body, _ := json.Marshal(doc)
		err := bi.Add(ctx, esutil.BulkIndexerItem{
			Action:     "index",
			DocumentID: r.ID.String(),
			Body:       bytes.NewReader(body),
		})
		if err != nil {
			return err
		}
	}
	// NOTE(review): per-item bulk failures are not surfaced (bi.Stats()).
	if err := bi.Close(ctx); err != nil {
		return err
	}
	if i.logger != nil {
		i.logger.Info("Elasticsearch playlists indexed", zap.Int("count", len(rows)))
	}
	return nil
}
// IndexTrack indexes a single track (hook after create/update). No-op when
// ES is disabled or when the track does not exist / is soft-deleted.
func (i *Indexer) IndexTrack(ctx context.Context, trackID uuid.UUID) error {
	if i.client == nil {
		return nil
	}
	var r struct {
		ID                uuid.UUID
		Title             string
		Artist            string
		Description       string
		Album             string
		Genre             string
		Tags              pq.StringArray
		CreatedAt         string
		CoverArtPath      string
		StreamManifestURL string
		FilePath          string
		GenreSlugs        pq.StringArray
	}
	err := i.db.WithContext(ctx).Raw(`
		SELECT t.id, t.title, COALESCE(t.artist,'') as artist, COALESCE(t.description,'') as description,
		       COALESCE(t.album,'') as album, COALESCE(t.genre,'') as genre, COALESCE(t.tags,'{}') as tags,
		       t.created_at::text, COALESCE(t.cover_art_path,'') as cover_art_path,
		       COALESCE(t.stream_manifest_url,'') as stream_manifest_url, COALESCE(t.file_path,'') as file_path,
		       COALESCE((SELECT array_agg(tg.genre_slug ORDER BY tg.position) FROM track_genres tg WHERE tg.track_id = t.id), '{}') as genre_slugs
		FROM tracks t WHERE t.id = ? AND t.deleted_at IS NULL
	`, trackID).Scan(&r).Error
	if err != nil {
		return err
	}
	// BUGFIX: when no row matches, gorm's Scan reports no error and leaves r
	// zeroed; previously an empty document was indexed under the track ID.
	if r.ID == uuid.Nil {
		return nil
	}
	genres := []string(r.GenreSlugs)
	if len(genres) == 0 && r.Genre != "" {
		genres = []string{r.Genre}
	}
	doc := trackDoc{
		ID:                r.ID.String(),
		Title:             r.Title,
		Artist:            r.Artist,
		Description:       r.Description,
		Album:             r.Album,
		Genre:             genres,
		Tags:              []string(r.Tags),
		CreatedAt:         r.CreatedAt,
		CoverArtPath:      r.CoverArtPath,
		StreamManifestURL: r.StreamManifestURL,
		FilePath:          r.FilePath,
	}
	// BUGFIX: marshal error was silently discarded.
	body, err := json.Marshal(doc)
	if err != nil {
		return err
	}
	idx := indexName(i.client.Config.Index, IdxTracks)
	// Refresh immediately so the track is searchable right after upload.
	res, err := i.client.Index(idx, bytes.NewReader(body),
		i.client.Index.WithContext(ctx),
		i.client.Index.WithDocumentID(trackID.String()),
		i.client.Index.WithRefresh("true"),
	)
	if err != nil {
		return err
	}
	defer res.Body.Close()
	if res.IsError() {
		return fmt.Errorf("index track: %s", res.String())
	}
	return nil
}
// DeleteTrack removes a track from the index. A 404 (already absent) is
// treated as success; a nil client (ES disabled) is a no-op.
func (i *Indexer) DeleteTrack(ctx context.Context, trackID uuid.UUID) error {
	if i.client == nil {
		return nil
	}
	res, err := i.client.Delete(
		indexName(i.client.Config.Index, IdxTracks),
		trackID.String(),
		i.client.Delete.WithContext(ctx),
		i.client.Delete.WithRefresh("true"),
	)
	if err != nil {
		return err
	}
	defer res.Body.Close()
	switch {
	case res.StatusCode == 404:
		return nil
	case res.IsError():
		return fmt.Errorf("delete track: %s", res.String())
	}
	return nil
}
// escapeQuery escapes Elasticsearch query_string reserved characters so the
// input is treated literally.
//
// Reserved set per the ES docs: + - = && || > < ! ( ) { } [ ] ^ " ~ * ? : \ /
// BUGFIX: the previous list was missing - = ^ and the && / || operators;
// escaping each '&' and '|' covers the two-character operators as well.
// '<' and '>' cannot be escaped at all per the ES docs and are stripped.
//
// NOTE(review): currently unused — searches go through multi_match, which
// takes the raw text; kept for a future query_string endpoint.
func escapeQuery(s string) string {
	// Backslash must be escaped first so later escapes are not doubled.
	for _, c := range []string{`\`, `"`, `*`, `?`, `|`, `&`, `(`, `)`, `{`, `}`, `[`, `]`, `~`, `!`, `:`, `/`, `+`, `-`, `=`, `^`} {
		s = strings.ReplaceAll(s, c, `\`+c)
	}
	s = strings.ReplaceAll(s, `<`, ``)
	s = strings.ReplaceAll(s, `>`, ``)
	return s
}