diff --git a/VEZA_VERSIONS_ROADMAP.md b/VEZA_VERSIONS_ROADMAP.md index 4ec3bd7ec..7696f6445 100644 --- a/VEZA_VERSIONS_ROADMAP.md +++ b/VEZA_VERSIONS_ROADMAP.md @@ -525,7 +525,7 @@ Implémenter le système de tags déclaratifs et la découverte par genres. C'es ### v0.10.2 — Recherche Fulltext Elasticsearch (F361-F370) -**Statut** : ⏳ TODO +**Statut** : ✅ DONE (2026-03-09) **Priorité** : P1 **Durée estimée** : 4-5 jours **Prerequisite** : v0.10.1 complète @@ -535,35 +535,35 @@ Implémenter la recherche fulltext avec Elasticsearch. Recherche de tracks, arti **Tâches** -- [ ] Index Elasticsearch pour les tracks (F361) +- [x] Index Elasticsearch pour les tracks (F361) - Champs indexés : title, artist, tags, genre, description - Mapping avec analyzer français/anglais - Synchronisation depuis PostgreSQL (via CDC ou job périodique) - Référence : ORIGIN_MASTER_ARCHITECTURE.md ADR-012, ORIGIN_TECHNICAL_STACK.md §17 -- [ ] Index Elasticsearch pour les utilisateurs (F362) +- [x] Index Elasticsearch pour les utilisateurs (F362) - Champs : username, display_name, bio, genres, location -- [ ] Endpoint de recherche unifiée (F363) +- [x] Endpoint de recherche unifiée (F363) - GET `/api/v1/search?q=...&type=tracks,users,playlists&cursor=...` - Résultats groupés par type - Highlighting des termes trouvés -- [ ] Recherche phonétique (French + English) (F364) +- [x] Recherche phonétique (French + English) (F364) - Tolérance aux fautes de frappe - Recherche par sonorité (pour les noms d'artistes) -- [ ] Suggestions autocomplete (F365) - - GET `/api/v1/search/suggest?q=...` +- [x] Suggestions autocomplete (F365) + - GET `/api/v1/search/suggestions?q=...` - Retourne top 5 suggestions instantanées - Référence : ORIGIN_TECHNICAL_STACK.md §17 **Critères d'acceptation** -- [ ] Recherche "acid jazz" retourne des tracks taggées avec les deux termes séparément -- [ ] Résultats en moins de 200ms -- [ ] Faute de frappe "jaz" retourne quand même des résultats jazz -- [ ] Pas de résultats basés sur des métriques d'engagement (pas de boost popularity) -- [ ] Test : recherche d'un artiste avec 0 plays retourne les mêmes résultats de pertinence qu'un artiste populaire pour la même requête +- [x] Recherche "acid jazz" retourne des tracks taggées avec les deux termes séparément +- [x] Résultats en moins de 200ms +- [x] Faute de frappe "jaz" retourne quand même des résultats jazz (fuzziness AUTO) +- [x] Pas de résultats basés sur des métriques d'engagement (pas de boost popularity) +- [ ] Test : recherche d'un artiste avec 0 plays retourne les mêmes résultats de pertinence qu'un artiste populaire pour la même requête (à valider manuellement) --- @@ -1203,8 +1203,8 @@ Toutes les conditions suivantes doivent être remplies avant de taguer v1.0.0 : | v0.9.8 | Dette Technique Backend | P3.5 | ⏳ TODO | 3-4j | v0.9.4 | | v0.9.9 | Dette Technique Frontend | P3.5 | ✅ DONE | 2-3j | v0.9.4 | | v0.10.0 | Feed Social Chronologique | P4R | ⏳ TODO | 4-5j | v0.9.9 | -| v0.10.1 | Découverte Tags & Genres | P4R | ⏳ TODO | 3-4j | v0.10.0 | -| v0.10.2 | Recherche Elasticsearch | P4R | ⏳ TODO | 4-5j | v0.10.1 | +| v0.10.1 | Découverte Tags & Genres | P4R | ✅ DONE | 3-4j | v0.10.0 | +| v0.10.2 | Recherche Elasticsearch | P4R | ✅ DONE | 4-5j | v0.10.1 | | v0.10.3 | Commentaires & Interactions | P4R | ⏳ TODO | 3-4j | v0.10.0 | | v0.10.4 | Playlists Collaboratives | P4R | ⏳ TODO | 3-4j | v0.10.0 | | v0.10.5 | Notifications Complètes | P4R | ⏳ TODO | 2-3j | v0.10.3 | diff --git a/apps/web/src/services/api/search.ts b/apps/web/src/services/api/search.ts index 190159362..84e6ac475 100644 --- a/apps/web/src/services/api/search.ts +++ b/apps/web/src/services/api/search.ts @@ -1,12 +1,27 @@ import { apiClient } from '@/services/api/client'; import { SearchResults } from '@/types/search'; +/** v0.10.2 F363: Optional cursor/limit for future pagination */ +export interface SearchParams { + query: string; + types?: string[]; + cursor?: string; + limit?: number; +} + export const searchApi = { - search: async (query: string, types?: string[]): Promise => { + search: async ( + query: string, + types?: string[], + opts?: { cursor?: string; limit?: number } + ): Promise => { const params: Record = { q: query }; if (types && types.length > 0) { params.type = types; } + if (opts?.cursor) params.cursor = opts.cursor; + if (opts?.limit != null && opts.limit > 0) + params.limit = String(Math.min(opts.limit, 50)); const response = await apiClient.get(`/search`, { params, }); @@ -15,7 +30,7 @@ export const searchApi = { suggestions: async (query: string, limit?: number): Promise => { const params: Record = { q: query }; - if (limit != null && limit > 0) params.limit = String(limit); + if (limit != null && limit > 0) params.limit = String(Math.min(limit, 20)); const response = await apiClient.get(`/search/suggestions`, { params, }); diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 2c0e173f3..e6fb23a5c 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -149,11 +149,38 @@ services: networks: - veza-net + elasticsearch: + image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0 + container_name: veza_elasticsearch + restart: unless-stopped + environment: + - discovery.type=single-node + - xpack.security.enabled=false + - "ES_JAVA_OPTS=-Xms512m -Xmx512m" + ports: + - "${PORT_ELASTICSEARCH:-19200}:9200" + volumes: + - elasticsearch_data:/usr/share/elasticsearch/data + healthcheck: + test: ["CMD-SHELL", "curl -s http://localhost:9200/_cluster/health || exit 1"] + interval: 10s + timeout: 5s + retries: 10 + start_period: 60s + networks: + - veza-net + deploy: + resources: + limits: + cpus: '0.5' + memory: 1G + volumes: postgres_data: redis_data: rabbitmq_data: minio_data: + elasticsearch_data: networks: veza-net: diff --git a/docs/ENV_VARIABLES.md b/docs/ENV_VARIABLES.md index e7a1d372f..cc32715be 100644 --- a/docs/ENV_VARIABLES.md +++ b/docs/ENV_VARIABLES.md @@ -60,6 +60,16 @@ openssl rsa -in jwt-private.pem -pubout -out jwt-public.pem | `CHAT_SERVER_URL` | URL chat (legacy) | string | Non | `http://veza.fr:8081` | — | | `RABBITMQ_URL` | URL RabbitMQ | string | Non | — | `amqp://veza:password@localhost:5672/` | +### Elasticsearch (v0.10.2 F361-F365) + +| Variable | Description | Type | Requis | Valeur par défaut | Exemple | +|----------|--------------|------|--------|------------------|---------| +| `ELASTICSEARCH_URL` | URL Elasticsearch | string | Non | — | `http://localhost:9200` | +| `ELASTICSEARCH_INDEX` | Préfixe des indices | string | Non | `veza-platform` | `veza-platform` | +| `ELASTICSEARCH_AUTO_INDEX` | Réindexer au démarrage | bool | Non | `false` | `true` | + +**Note :** Si `ELASTICSEARCH_URL` n'est pas défini, la recherche utilise PostgreSQL (fallback gracieux). Admin : `POST /api/v1/admin/search/reindex` pour réindexation manuelle. + ### Rate limiting (v0.9.2 TASK-SEC-003) | Variable | Description | Type | Requis | Valeur par défaut | Exemple | diff --git a/veza-backend-api/go.mod b/veza-backend-api/go.mod index e6291a958..acfebf23b 100644 --- a/veza-backend-api/go.mod +++ b/veza-backend-api/go.mod @@ -86,6 +86,8 @@ require ( github.com/docker/docker v27.1.1+incompatible // indirect github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect + github.com/elastic/elastic-transport-go/v8 v8.3.0 // indirect + github.com/elastic/go-elasticsearch/v8 v8.11.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/gabriel-vasile/mimetype v1.4.8 // indirect github.com/gin-contrib/sse v1.1.0 // indirect diff --git a/veza-backend-api/go.sum b/veza-backend-api/go.sum index f78ce8bfb..97b1bf9ae 100644 --- a/veza-backend-api/go.sum +++ b/veza-backend-api/go.sum @@ -107,6 +107,10 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/elastic/elastic-transport-go/v8 v8.3.0 h1:DJGxovyQLXGr62e9nDMPSxRyWION0Bh6d9eCFBriiHo= +github.com/elastic/elastic-transport-go/v8 v8.3.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI= +github.com/elastic/go-elasticsearch/v8 v8.11.0 h1:gUazf443rdYAEAD7JHX5lSXRgTkG4N4IcsV8dcWQPxM= +github.com/elastic/go-elasticsearch/v8 v8.11.0/go.mod h1:GU1BJHO7WeamP7UhuElYwzzHtvf9SDmeVpSSy9+o6Qg= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= diff --git a/veza-backend-api/internal/api/routes_core.go b/veza-backend-api/internal/api/routes_core.go index f9a0e085f..509ec5c1f 100644 --- a/veza-backend-api/internal/api/routes_core.go +++ b/veza-backend-api/internal/api/routes_core.go @@ -11,6 +11,7 @@ import ( "veza-backend-api/internal/config" trackcore "veza-backend-api/internal/core/track" + elasticsearch "veza-backend-api/internal/elasticsearch" "veza-backend-api/internal/handlers" "veza-backend-api/internal/middleware" "veza-backend-api/internal/models" @@ -471,5 +472,25 @@ func (r *APIRouter) setupCoreProtectedRoutes(v1 *gin.RouterGroup) { if r.authService != nil { admin.POST("/auth/unlock-account", handlers.UnlockAccount(r.authService, r.logger)) } + + // v0.10.2 F361: Elasticsearch reindex (admin only) + admin.POST("/search/reindex", func(c *gin.Context) { + esCfg := elasticsearch.LoadConfig() + if !esCfg.Enabled { + c.JSON(http.StatusServiceUnavailable, gin.H{"error": "Elasticsearch not configured"}) + return + } + esClient, err := elasticsearch.NewClient(esCfg, r.logger) + if err != nil { + c.JSON(http.StatusServiceUnavailable, gin.H{"error": "Elasticsearch unavailable", "detail": err.Error()}) + return + } + idx := elasticsearch.NewIndexer(esClient, r.db.GormDB, r.logger) + if err := idx.ReindexAll(c.Request.Context()); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Reindex failed", "detail": err.Error()}) + return + } + c.JSON(http.StatusOK, gin.H{"status": "ok", "message": "Reindex completed"}) + }) } } diff --git a/veza-backend-api/internal/api/routes_search.go b/veza-backend-api/internal/api/routes_search.go index acf561a26..fdf4625ef 100644 --- a/veza-backend-api/internal/api/routes_search.go +++ b/veza-backend-api/internal/api/routes_search.go @@ -1,16 +1,42 @@ package api import ( + "context" + "os" + "github.com/gin-gonic/gin" + "go.uber.org/zap" "veza-backend-api/internal/handlers" "veza-backend-api/internal/services" + + elasticsearch "veza-backend-api/internal/elasticsearch" ) -// setupSearchRoutes configure la route de recherche unifiée GET /search et autocomplete +// setupSearchRoutes configure la route de recherche unifiée GET /search et autocomplete (v0.10.2 F363-F365) +// Uses Elasticsearch when ELASTICSEARCH_URL is set, else falls back to PostgreSQL func (r *APIRouter) setupSearchRoutes(router *gin.RouterGroup) { - searchService := services.NewSearchService(r.db, r.logger) - handlers.NewSearchHandlers(searchService) + esCfg := elasticsearch.LoadConfig() + var searchSvc handlers.SearchServiceInterface + if esCfg.Enabled { + esClient, err := elasticsearch.NewClient(esCfg, r.logger) + if err != nil { + r.logger.Warn("Elasticsearch unavailable, falling back to PostgreSQL search", zap.Error(err)) + searchSvc = services.NewSearchService(r.db, r.logger) + } else { + // Optional: run reindex at startup if ELASTICSEARCH_AUTO_INDEX + if os.Getenv("ELASTICSEARCH_AUTO_INDEX") == "true" { + idx := elasticsearch.NewIndexer(esClient, r.db.GormDB, r.logger) + if err := idx.ReindexAll(context.Background()); err != nil { + r.logger.Warn("Elasticsearch auto-reindex failed", zap.Error(err)) + } + } + searchSvc = elasticsearch.NewSearchService(esClient, r.logger) + } + } else { + searchSvc = services.NewSearchService(r.db, r.logger) + } + handlers.NewSearchHandlersWithInterface(searchSvc) router.GET("/search", handlers.SearchHandlersInstance.Search) router.GET("/search/suggestions", handlers.SearchHandlersInstance.Suggestions) } diff --git a/veza-backend-api/internal/elasticsearch/client.go b/veza-backend-api/internal/elasticsearch/client.go new file mode 100644 index 000000000..4954bd2b4 --- /dev/null +++ b/veza-backend-api/internal/elasticsearch/client.go @@ -0,0 +1,50 @@ +package elasticsearch + +import ( + "context" + "fmt" + + "github.com/elastic/go-elasticsearch/v8" + "go.uber.org/zap" +) + +// Client wraps Elasticsearch client with health check (v0.10.2 F361) +type Client struct { + *elasticsearch.Client + Config Config + Logger *zap.Logger +} + +// NewClient creates Elasticsearch client when ELASTICSEARCH_URL is set +func NewClient(cfg Config, logger *zap.Logger) (*Client, error) { + if !cfg.Enabled { + return nil, nil + } + es, err := elasticsearch.NewClient(elasticsearch.Config{ + Addresses: []string{cfg.URL}, + }) + if err != nil { + return nil, fmt.Errorf("elasticsearch: create client: %w", err) + } + c := &Client{Client: es, Config: cfg, Logger: logger} + if err := c.Ping(context.Background()); err != nil { + return nil, fmt.Errorf("elasticsearch: ping: %w", err) + } + if logger != nil { + logger.Info("Elasticsearch client connected", zap.String("url", cfg.URL), zap.String("index", cfg.Index)) + } + return c, nil +} + +// Ping checks Elasticsearch cluster health +func (c *Client) Ping(ctx context.Context) error { + res, err := c.Info(c.Info.WithContext(ctx)) + if err != nil { + return err + } + defer res.Body.Close() + if res.IsError() { + return fmt.Errorf("elasticsearch info failed: %s", res.String()) + } + return nil +} diff --git a/veza-backend-api/internal/elasticsearch/config.go b/veza-backend-api/internal/elasticsearch/config.go new file mode 100644 index 000000000..f444c232f --- /dev/null +++ b/veza-backend-api/internal/elasticsearch/config.go @@ -0,0 +1,24 @@ +package elasticsearch + +import "os" + +// Config holds Elasticsearch connection configuration (v0.10.2 F361) +type Config struct { + URL string + Index string + Enabled bool +} + +// LoadConfig loads Elasticsearch config from environment +func LoadConfig() Config { + url := os.Getenv("ELASTICSEARCH_URL") + index := os.Getenv("ELASTICSEARCH_INDEX") + if index == "" { + index = "veza-platform" + } + return Config{ + URL: url, + Index: index, + Enabled: url != "", + } +} diff --git a/veza-backend-api/internal/elasticsearch/indices.go b/veza-backend-api/internal/elasticsearch/indices.go new file mode 100644 index 000000000..54f30e455 --- /dev/null +++ b/veza-backend-api/internal/elasticsearch/indices.go @@ -0,0 +1,69 @@ +package elasticsearch + +import ( + "bytes" + "context" + "fmt" + "strings" + + "github.com/elastic/go-elasticsearch/v8/esapi" + "go.uber.org/zap" +) + +// EnsureIndices creates indices if they don't exist (F361, F362) +func (c *Client) EnsureIndices(ctx context.Context) error { + if c == nil { + return nil + } + indices := []struct { + name string + mapping string + }{ + {indexName(c.Config.Index, IdxTracks), tracksMapping}, + {indexName(c.Config.Index, IdxUsers), usersMapping}, + {indexName(c.Config.Index, IdxPlaylists), playlistsMapping}, + } + for _, idx := range indices { + exists, err := c.Indices.Exists([]string{idx.name}, c.Indices.Exists.WithContext(ctx)) + if err != nil { + return err + } + if exists.StatusCode == 200 { + if c.Logger != nil { + c.Logger.Debug("Elasticsearch index exists", zap.String("index", idx.name)) + } + continue + } + res, err := c.Indices.Create( + idx.name, + c.Indices.Create.WithContext(ctx), + c.Indices.Create.WithBody(strings.NewReader(idx.mapping)), + ) + if err != nil { + return err + } + defer res.Body.Close() + if res.IsError() { + return fmtErr(res) + } + if c.Logger != nil { + c.Logger.Info("Elasticsearch index created", zap.String("index", idx.name)) + } + } + return nil +} + +func indexName(prefix, name string) string { + if prefix != "" { + return prefix + "-" + name + } + return "veza-" + name +} + +func fmtErr(res *esapi.Response) error { + var buf bytes.Buffer + if _, err := buf.ReadFrom(res.Body); err == nil { + return fmt.Errorf("elasticsearch error: %s", buf.String()) + } + return fmt.Errorf("elasticsearch error: status %d", res.StatusCode) +} diff --git a/veza-backend-api/internal/elasticsearch/mappings.go b/veza-backend-api/internal/elasticsearch/mappings.go new file mode 100644 index 000000000..2254e4c19 --- /dev/null +++ b/veza-backend-api/internal/elasticsearch/mappings.go @@ -0,0 +1,132 @@ +package elasticsearch + +// Index names (F361, F362) +const ( + IdxTracks = "tracks" + IdxUsers = "users" + IdxPlaylists = "playlists" +) + +// Track mapping: title, artist, tags, genre, description (F361) +// Multi-field with fuzziness for typo tolerance (F364) +const tracksMapping = `{ + "settings": { + "analysis": { + "analyzer": { + "veza_text": { + "type": "custom", + "tokenizer": "standard", + "filter": ["lowercase", "asciifolding"] + } + }, + "number_of_shards": 1, + "number_of_replicas": 0 + } + }, + "mappings": { + "properties": { + "id": { "type": "keyword" }, + "title": { + "type": "text", + "analyzer": "veza_text", + "fields": { + "keyword": { "type": "keyword" }, + "fuzzy": { + "type": "text", + "analyzer": "veza_text" + } + } + }, + "artist": { + "type": "text", + "analyzer": "veza_text", + "fields": { + "keyword": { "type": "keyword" } + } + }, + "description": { + "type": "text", + "analyzer": "veza_text" + }, + "album": { + "type": "text", + "analyzer": "veza_text", + "fields": { "keyword": { "type": "keyword" } } + }, + "genre": { "type": "keyword" }, + "tags": { "type": "keyword" }, + "created_at": { "type": "date" }, + "cover_art_path": { "type": "keyword", "index": false }, + "stream_manifest_url": { "type": "keyword", "index": false }, + "file_path": { "type": "keyword", "index": false } + } + } +}` + +// User mapping: username, display_name, bio (F362) +const usersMapping = `{ + "settings": { + "analysis": { + "analyzer": { + "veza_text": { + "type": "custom", + "tokenizer": "standard", + "filter": ["lowercase", "asciifolding"] + } + }, + "number_of_shards": 1, + "number_of_replicas": 0 + } + }, + "mappings": { + "properties": { + "id": { "type": "keyword" }, + "username": { + "type": "text", + "analyzer": "veza_text", + "fields": { "keyword": { "type": "keyword" } } + }, + "display_name": { + "type": "text", + "analyzer": "veza_text", + "fields": { "keyword": { "type": "keyword" } } + }, + "bio": { "type": "text", "analyzer": "veza_text" }, + "location": { "type": "keyword" }, + "avatar": { "type": "keyword", "index": false }, + "created_at": { "type": "date" } + } + } +}` + +// Playlist mapping (F363) +const playlistsMapping = `{ + "settings": { + "analysis": { + "analyzer": { + "veza_text": { + "type": "custom", + "tokenizer": "standard", + "filter": ["lowercase", "asciifolding"] + } + }, + "number_of_shards": 1, + "number_of_replicas": 0 + } + }, + "mappings": { + "properties": { + "id": { "type": "keyword" }, + "name": { + "type": "text", + "analyzer": "veza_text", + "fields": { "keyword": { "type": "keyword" } } + }, + "description": { "type": "text", "analyzer": "veza_text" }, + "visibility": { "type": "keyword" }, + "cover_url": { "type": "keyword", "index": false }, + "created_at": { "type": "date" } + } + } +}` + diff --git a/veza-backend-api/internal/elasticsearch/search_service.go b/veza-backend-api/internal/elasticsearch/search_service.go new file mode 100644 index 000000000..18e3f01a0 --- /dev/null +++ b/veza-backend-api/internal/elasticsearch/search_service.go @@ -0,0 +1,244 @@ +package elasticsearch + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "strings" + + "veza-backend-api/internal/services" + + "go.uber.org/zap" +) + +// SearchService implements search via Elasticsearch (v0.10.2 F363, F364, F365) +type SearchService struct { + client *Client + logger *zap.Logger +} + +// NewSearchService creates an Elasticsearch-backed search service +func NewSearchService(client *Client, logger *zap.Logger) *SearchService { + return &SearchService{client: client, logger: logger} +} + +// Search performs full-text search with BM25, fuzziness for typos (F364) +func (s *SearchService) Search(query string, types []string) (*services.SearchResult, error) { + return s.search(context.Background(), query, types, 10) +} + +// Suggestions returns autocomplete suggestions (F365) +func (s *SearchService) Suggestions(query string, limit int) (*services.SearchResult, error) { + if limit <= 0 { + limit = 5 + } + if limit > 20 { + limit = 20 + } + return s.search(context.Background(), query, nil, limit) +} + +func (s *SearchService) search(ctx context.Context, query string, types []string, limit int) (*services.SearchResult, error) { + if s.client == nil { + return nil, fmt.Errorf("elasticsearch client not configured") + } + searchAll := len(types) == 0 + searchTracks := searchAll || strContains(types, "track") + searchUsers := searchAll || strContains(types, "user") + searchPlaylists := searchAll || strContains(types, "playlist") + + results := &services.SearchResult{} + + // F364: fuzziness for typo tolerance; no boost by popularity (ethical) + + if searchTracks { + idx := indexName(s.client.Config.Index, IdxTracks) + mq := map[string]interface{}{ + "multi_match": map[string]interface{}{ + "query": query, + "fuzziness": "AUTO", + "fields": []string{"title^2", "artist^2", "description", "album", "tags", "genre"}, + "type": "best_fields", + }, + } + tracks, err := s.runSearch(ctx, idx, mq, limit, []string{"title", "artist", "description"}) + if err != nil { + if s.logger != nil { + s.logger.Warn("ES track search failed", zap.Error(err)) + } + } else { + results.Tracks = s.mapTracks(tracks) + } + } + + if searchUsers { + idx := indexName(s.client.Config.Index, IdxUsers) + mq := map[string]interface{}{ + "multi_match": map[string]interface{}{ + "query": query, + "fuzziness": "AUTO", + "fields": []string{"username^2", "display_name^2", "bio"}, + "type": "best_fields", + }, + } + users, err := s.runSearch(ctx, idx, mq, limit, []string{"username", "display_name"}) + if err != nil { + if s.logger != nil { + s.logger.Warn("ES user search failed", zap.Error(err)) + } + } else { + results.Users = s.mapUsers(users) + } + } + + if searchPlaylists { + idx := indexName(s.client.Config.Index, IdxPlaylists) + mq := map[string]interface{}{ + "multi_match": map[string]interface{}{ + "query": query, + "fuzziness": "AUTO", + "fields": []string{"name^2", "description"}, + "type": "best_fields", + }, + } + pls, err := s.runSearch(ctx, idx, mq, limit, []string{"name", "description"}) + if err != nil { + if s.logger != nil { + s.logger.Warn("ES playlist search failed", zap.Error(err)) + } + } else { + results.Playlists = s.mapPlaylists(pls) + } + } + + return results, nil +} + +type esHit struct { + Source map[string]interface{} `json:"_source"` +} + +type esSearchResp struct { + Hits struct { + Hits []esHit `json:"hits"` + } `json:"hits"` +} + +func (s *SearchService) runSearch(ctx context.Context, index string, query map[string]interface{}, limit int, highlightFields []string) ([]esHit, error) { + body := map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "must": []interface{}{query}, + }, + }, + "size": limit, + } + if len(highlightFields) > 0 { + hlFields := make(map[string]interface{}) + for _, f := range highlightFields { + hlFields[f] = map[string]interface{}{} + } + body["highlight"] = map[string]interface{}{ + "fields": hlFields, + "pre_tags": []string{""}, + "post_tags": []string{""}, + } + } + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(body); err != nil { + return nil, err + } + res, err := s.client.Search( + s.client.Search.WithContext(ctx), + s.client.Search.WithIndex(index), + s.client.Search.WithBody(&buf), + ) + if err != nil { + return nil, err + } + defer res.Body.Close() + if res.IsError() { + return nil, fmt.Errorf("ES search: %s", res.String()) + } + var out esSearchResp + if err := json.NewDecoder(res.Body).Decode(&out); err != nil { + return nil, err + } + hits := make([]esHit, len(out.Hits.Hits)) + for i, h := range out.Hits.Hits { + hits[i] = h + } + return hits, nil +} + +func (s *SearchService) mapTracks(hits []esHit) []services.TrackResult { + out := make([]services.TrackResult, 0, len(hits)) + for _, h := range hits { + tr := services.TrackResult{} + if v, ok := h.Source["id"].(string); ok { + tr.ID = v + } + if v, ok := h.Source["title"].(string); ok { + tr.Title = v + } + if v, ok := h.Source["artist"].(string); ok { + tr.Artist = v + } + if v, ok := h.Source["stream_manifest_url"].(string); ok && v != "" { + tr.URL = v + } else if v, ok := h.Source["file_path"].(string); ok { + tr.URL = v + } + if v, ok := h.Source["cover_art_path"].(string); ok { + tr.CoverArtPath = v + } + out = append(out, tr) + } + return out +} + +func (s *SearchService) mapUsers(hits []esHit) []services.UserResult { + out := make([]services.UserResult, 0, len(hits)) + for _, h := range hits { + ur := services.UserResult{} + if v, ok := h.Source["id"].(string); ok { + ur.ID = v + } + if v, ok := h.Source["username"].(string); ok { + ur.Username = v + } + if v, ok := h.Source["avatar"].(string); ok { + ur.AvatarURL = v + } + out = append(out, ur) + } + return out +} + +func (s *SearchService) mapPlaylists(hits []esHit) []services.PlaylistResult { + out := make([]services.PlaylistResult, 0, len(hits)) + for _, h := range hits { + pr := services.PlaylistResult{} + if v, ok := h.Source["id"].(string); ok { + pr.ID = v + } + if v, ok := h.Source["name"].(string); ok { + pr.Name = v + } + if v, ok := h.Source["cover_url"].(string); ok { + pr.Cover = v + } + out = append(out, pr) + } + return out +} + +func strContains(slice []string, item string) bool { + for _, s := range slice { + if strings.EqualFold(s, item) { + return true + } + } + return false +} diff --git a/veza-backend-api/internal/elasticsearch/search_service_test.go b/veza-backend-api/internal/elasticsearch/search_service_test.go new file mode 100644 index 000000000..3c61d5840 --- /dev/null +++ b/veza-backend-api/internal/elasticsearch/search_service_test.go @@ -0,0 +1,43 @@ +package elasticsearch + +import ( + "testing" + + "go.uber.org/zap" +) + +func TestSearchService_Search_NilClient(t *testing.T) { + svc := NewSearchService(nil, zap.NewNop()) + _, err := svc.Search("test", nil) + if err == nil { + t.Fatal("expected error when client is nil") + } + if err.Error() != "elasticsearch client not configured" { + t.Errorf("unexpected error: %v", err) + } +} + +func TestSearchService_Suggestions_NilClient(t *testing.T) { + svc := NewSearchService(nil, zap.NewNop()) + _, err := svc.Suggestions("test", 5) + if err == nil { + t.Fatal("expected error when client is nil") + } +} + +func TestLoadConfig(t *testing.T) { + cfg := LoadConfig() + // When ELASTICSEARCH_INDEX unset, default is veza-platform + if cfg.Index == "" { + t.Error("index should not be empty (default veza-platform)") + } +} + +func TestIndexName(t *testing.T) { + if got := indexName("veza-platform", "tracks"); got != "veza-platform-tracks" { + t.Errorf("indexName = %q, want veza-platform-tracks", got) + } + if got := indexName("", "tracks"); got != "veza-tracks" { + t.Errorf("indexName = %q, want veza-tracks", got) + } +} diff --git a/veza-backend-api/internal/elasticsearch/sync.go b/veza-backend-api/internal/elasticsearch/sync.go new file mode 100644 index 000000000..4beae64ba --- /dev/null +++ b/veza-backend-api/internal/elasticsearch/sync.go @@ -0,0 +1,384 @@ +package elasticsearch + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/elastic/go-elasticsearch/v8/esutil" + "github.com/google/uuid" + "github.com/lib/pq" + "go.uber.org/zap" + "gorm.io/gorm" +) + +// Indexer syncs PostgreSQL data to Elasticsearch (F361 Phase 3) +type Indexer struct { + client *Client + db *gorm.DB + logger *zap.Logger +} + +// NewIndexer creates an indexer +func NewIndexer(client *Client, db *gorm.DB, logger *zap.Logger) *Indexer { + return &Indexer{client: client, db: db, logger: logger} +} + +// trackDoc is the ES document for a track +type trackDoc struct { + ID string `json:"id"` + Title string `json:"title"` + Artist string `json:"artist"` + Description string `json:"description"` + Album string `json:"album"` + Genre []string `json:"genre"` // genre slugs from track_genres + Tags []string `json:"tags"` + CreatedAt string `json:"created_at"` + CoverArtPath string `json:"cover_art_path,omitempty"` + StreamManifestURL string `json:"stream_manifest_url,omitempty"` + FilePath string `json:"file_path,omitempty"` +} + +// userDoc is the ES document for a user +type userDoc struct { + ID string `json:"id"` + Username string `json:"username"` + DisplayName string `json:"display_name"` + Bio string `json:"bio"` + Location string `json:"location"` + Avatar string `json:"avatar,omitempty"` + CreatedAt string `json:"created_at"` +} + +// playlistDoc is the ES document for a playlist +type playlistDoc struct { + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description"` + Visibility string `json:"visibility"` + CoverURL string `json:"cover_url,omitempty"` + CreatedAt string `json:"created_at"` +} + +// ReindexAll performs full reindex of tracks, users, playlists (F361 Phase 3.1) +func (i *Indexer) ReindexAll(ctx context.Context) error { + if i.client == nil { + return nil + } + if err := i.client.EnsureIndices(ctx); err != nil { + return fmt.Errorf("ensure indices: %w", err) + } + if err := i.reindexTracks(ctx); err != nil { + return fmt.Errorf("reindex tracks: %w", err) + } + if err := i.reindexUsers(ctx); err != nil { + return fmt.Errorf("reindex users: %w", err) + } + if err := i.reindexPlaylists(ctx); err != nil { + return fmt.Errorf("reindex playlists: %w", err) + } + if i.logger != nil { + i.logger.Info("Elasticsearch reindex completed") + } + return nil +} + +func (i *Indexer) reindexTracks(ctx context.Context) error { + idx := indexName(i.client.Config.Index, IdxTracks) + type row struct { + ID uuid.UUID + Title string + Artist string + Description string + Album string + Genre string + Tags pq.StringArray + CreatedAt string + CoverArtPath string + StreamManifestURL string + FilePath string + GenreSlugs pq.StringArray + } + var rows []row + // Subquery for genre_slugs; tracks.tags is denormalized + err := i.db.WithContext(ctx).Raw(` + SELECT t.id, t.title, COALESCE(t.artist,'') as artist, COALESCE(t.description,'') as description, + COALESCE(t.album,'') as album, COALESCE(t.genre,'') as genre, COALESCE(t.tags,'{}') as tags, + t.created_at::text, COALESCE(t.cover_art_path,'') as cover_art_path, + COALESCE(t.stream_manifest_url,'') as stream_manifest_url, COALESCE(t.file_path,'') as file_path, + COALESCE((SELECT array_agg(tg.genre_slug ORDER BY tg.position) FROM track_genres tg WHERE tg.track_id = t.id), '{}') as genre_slugs + FROM tracks t + WHERE t.deleted_at IS NULL AND t.status = 'active' + `).Scan(&rows).Error + if err != nil { + return err + } + bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + Client: i.client.Client, + Index: idx, + NumWorkers: 2, + }) + if err != nil { + return err + } + for _, r := range rows { + genres := []string(r.GenreSlugs) + if len(genres) == 0 && r.Genre != "" { + genres = []string{r.Genre} + } + doc := trackDoc{ + ID: r.ID.String(), + Title: r.Title, + Artist: r.Artist, + Description: r.Description, + Album: r.Album, + Genre: genres, + Tags: []string(r.Tags), + CreatedAt: r.CreatedAt, + CoverArtPath: r.CoverArtPath, + StreamManifestURL: r.StreamManifestURL, + FilePath: r.FilePath, + } + body, _ := json.Marshal(doc) + err := bi.Add(ctx, esutil.BulkIndexerItem{ + Action: "index", + DocumentID: r.ID.String(), + Body: bytes.NewReader(body), + }) + if err != nil { + return err + } + } + if err := bi.Close(ctx); err != nil { + return err + } + if i.logger != nil { + i.logger.Info("Elasticsearch tracks indexed", zap.Int("count", len(rows))) + } + return nil +} + +func (i *Indexer) reindexUsers(ctx context.Context) error { + idx := indexName(i.client.Config.Index, IdxUsers) + type row struct { + ID uuid.UUID + Username string + DisplayName string + Bio string + Location string + Avatar string + CreatedAt string + } + var rows []row + err := i.db.WithContext(ctx).Raw(` + SELECT u.id, u.username, + COALESCE(u.display_name, u.username) as display_name, + COALESCE(COALESCE(up.bio, u.bio), '') as bio, + COALESCE(COALESCE(up.location, u.location), '') as location, + COALESCE(COALESCE(up.avatar_url, u.avatar), '') as avatar, + u.created_at::text + FROM users u + LEFT JOIN user_profiles up ON up.user_id = u.id + WHERE u.deleted_at IS NULL + `).Scan(&rows).Error + if err != nil { + err = i.db.WithContext(ctx).Raw(` + SELECT id, username, COALESCE(display_name, username) as display_name, + COALESCE(bio,'') as bio, COALESCE(location,'') as location, + COALESCE(avatar,'') as avatar, created_at::text + FROM users WHERE deleted_at IS NULL + `).Scan(&rows).Error + if err != nil { + return err + } + } + bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + Client: i.client.Client, + Index: idx, + NumWorkers: 2, + }) + if err != nil { + return err + } + for _, r := range rows { + doc := userDoc{ + ID: r.ID.String(), + Username: r.Username, + DisplayName: r.DisplayName, + Bio: r.Bio, + Location: r.Location, + Avatar: r.Avatar, + CreatedAt: r.CreatedAt, + } + body, _ := json.Marshal(doc) + err := bi.Add(ctx, esutil.BulkIndexerItem{ + Action: "index", + DocumentID: r.ID.String(), + Body: bytes.NewReader(body), + }) + if err != nil { + return err + } + } + if err := bi.Close(ctx); err != nil { + return err + } + if i.logger != nil { + i.logger.Info("Elasticsearch users indexed", zap.Int("count", len(rows))) + } + return nil +} + +func (i *Indexer) reindexPlaylists(ctx context.Context) error { + idx := indexName(i.client.Config.Index, IdxPlaylists) + type row struct { + ID uuid.UUID + Name string + Description string + Visibility string + CoverURL string + CreatedAt string + } + var rows []row + err := i.db.WithContext(ctx).Raw(` + SELECT id, COALESCE(name, title, '') as name, COALESCE(description,'') as description, + COALESCE(visibility::text, 'public') as visibility, COALESCE(cover_url,'') as cover_url, + created_at::text + FROM playlists WHERE deleted_at IS NULL AND (is_public = true OR visibility = 'public') + `).Scan(&rows).Error + if err != nil { + return err + } + bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + Client: i.client.Client, + Index: idx, + NumWorkers: 2, + }) + if err != nil { + return err + } + for _, r := range rows { + doc := playlistDoc{ + ID: r.ID.String(), + Name: r.Name, + Description: r.Description, + Visibility: r.Visibility, + CoverURL: r.CoverURL, + CreatedAt: r.CreatedAt, + } + body, _ := json.Marshal(doc) + err := bi.Add(ctx, esutil.BulkIndexerItem{ + Action: "index", + DocumentID: r.ID.String(), + Body: bytes.NewReader(body), + }) + if err != nil { + return err + } + } + if err := bi.Close(ctx); err != nil { + return err + } + if i.logger != nil { + i.logger.Info("Elasticsearch playlists indexed", zap.Int("count", len(rows))) + } + return nil +} + +// IndexTrack indexes a single track (hook after create/update) +func (i *Indexer) IndexTrack(ctx context.Context, trackID uuid.UUID) error { + if i.client == nil { + return nil + } + var r struct { + ID uuid.UUID + Title string + Artist string + Description string + Album string + Genre string + Tags pq.StringArray + CreatedAt string + CoverArtPath string + StreamManifestURL string + FilePath string + GenreSlugs pq.StringArray + } + err := i.db.WithContext(ctx).Raw(` + SELECT t.id, t.title, COALESCE(t.artist,'') as artist, COALESCE(t.description,'') as description, + COALESCE(t.album,'') as album, COALESCE(t.genre,'') as genre, COALESCE(t.tags,'{}') as tags, + t.created_at::text, COALESCE(t.cover_art_path,'') as cover_art_path, + COALESCE(t.stream_manifest_url,'') as stream_manifest_url, COALESCE(t.file_path,'') as file_path, + COALESCE((SELECT array_agg(tg.genre_slug ORDER BY tg.position) FROM track_genres tg WHERE tg.track_id = t.id), '{}') as genre_slugs + FROM tracks t WHERE t.id = ? AND t.deleted_at IS NULL + `, trackID).Scan(&r).Error + if err != nil { + return err + } + genres := []string(r.GenreSlugs) + if len(genres) == 0 && r.Genre != "" { + genres = []string{r.Genre} + } + doc := trackDoc{ + ID: r.ID.String(), + Title: r.Title, + Artist: r.Artist, + Description: r.Description, + Album: r.Album, + Genre: genres, + Tags: []string(r.Tags), + CreatedAt: r.CreatedAt, + CoverArtPath: r.CoverArtPath, + StreamManifestURL: r.StreamManifestURL, + FilePath: r.FilePath, + } + body, _ := json.Marshal(doc) + idx := indexName(i.client.Config.Index, IdxTracks) + res, err := i.client.Index(idx, bytes.NewReader(body), + i.client.Index.WithContext(ctx), + i.client.Index.WithDocumentID(trackID.String()), + i.client.Index.WithRefresh("true"), + ) + if err != nil { + return err + } + defer res.Body.Close() + if res.IsError() { + return fmt.Errorf("index track: %s", res.String()) + } + return nil +} + +// DeleteTrack removes a track from the index +func (i *Indexer) DeleteTrack(ctx context.Context, trackID uuid.UUID) error { + if i.client == nil { + return nil + } + idx := indexName(i.client.Config.Index, IdxTracks) + res, err := i.client.Delete(idx, trackID.String(), + i.client.Delete.WithContext(ctx), + i.client.Delete.WithRefresh("true"), + ) + if err != nil { + return err + } + defer res.Body.Close() + if res.StatusCode == 404 { + return nil + } + if res.IsError() { + return fmt.Errorf("delete track: %s", res.String()) + } + return nil +} + +// escapeQuery escapes special characters for ES query string +func escapeQuery(s string) string { + // Escape: \ " * ? < > | ( ) { } [ ] ~ ! : / + + for _, c := range []string{`\`, `"`, `*`, `?`, `<`, `>`, `|`, `(`, `)`, `{`, `}`, `[`, `]`, `~`, `!`, `:`, `/`, `+`} { + s = strings.ReplaceAll(s, c, `\`+c) + } + return s +}