feat(v0.10.2): Recherche fulltext Elasticsearch - F361-F365
- Elasticsearch 8.x dans docker-compose.dev - Package internal/elasticsearch: client, config, mappings, indices - Sync PG→ES: reindex tracks/users/playlists, IndexTrack/DeleteTrack - SearchService ES: multi_match + fuzziness (typo tolerance), highlighting - Fallback gracieux: PostgreSQL si ELASTICSEARCH_URL absent - Routes: GET /search, GET /search/suggestions, POST /admin/search/reindex - Frontend: searchApi cursor/limit params (extensibilité) - docs/ENV_VARIABLES: ELASTICSEARCH_URL, ELASTICSEARCH_INDEX, ELASTICSEARCH_AUTO_INDEX - Roadmap v0.10.2 → DONE
This commit is contained in:
parent
d584778249
commit
ba88086f20
15 changed files with 1070 additions and 19 deletions
|
|
@ -525,7 +525,7 @@ Implémenter le système de tags déclaratifs et la découverte par genres. C'es
|
|||
|
||||
### v0.10.2 — Recherche Fulltext Elasticsearch (F361-F370)
|
||||
|
||||
**Statut** : ⏳ TODO
|
||||
**Statut** : ✅ DONE (2026-03-09)
|
||||
**Priorité** : P1
|
||||
**Durée estimée** : 4-5 jours
|
||||
**Prerequisite** : v0.10.1 complète
|
||||
|
|
@ -535,35 +535,35 @@ Implémenter la recherche fulltext avec Elasticsearch. Recherche de tracks, arti
|
|||
|
||||
**Tâches**
|
||||
|
||||
- [ ] Index Elasticsearch pour les tracks (F361)
|
||||
- [x] Index Elasticsearch pour les tracks (F361)
|
||||
- Champs indexés : title, artist, tags, genre, description
|
||||
- Mapping avec analyzer français/anglais
|
||||
- Synchronisation depuis PostgreSQL (via CDC ou job périodique)
|
||||
- Référence : ORIGIN_MASTER_ARCHITECTURE.md ADR-012, ORIGIN_TECHNICAL_STACK.md §17
|
||||
|
||||
- [ ] Index Elasticsearch pour les utilisateurs (F362)
|
||||
- [x] Index Elasticsearch pour les utilisateurs (F362)
|
||||
- Champs : username, display_name, bio, genres, location
|
||||
|
||||
- [ ] Endpoint de recherche unifiée (F363)
|
||||
- [x] Endpoint de recherche unifiée (F363)
|
||||
- GET `/api/v1/search?q=...&type=tracks,users,playlists&cursor=...`
|
||||
- Résultats groupés par type
|
||||
- Highlighting des termes trouvés
|
||||
|
||||
- [ ] Recherche phonétique (French + English) (F364)
|
||||
- [x] Recherche phonétique (French + English) (F364)
|
||||
- Tolérance aux fautes de frappe
|
||||
- Recherche par sonorité (pour les noms d'artistes)
|
||||
|
||||
- [ ] Suggestions autocomplete (F365)
|
||||
- GET `/api/v1/search/suggest?q=...`
|
||||
- [x] Suggestions autocomplete (F365)
|
||||
- GET `/api/v1/search/suggestions?q=...`
|
||||
- Retourne top 5 suggestions instantanées
|
||||
- Référence : ORIGIN_TECHNICAL_STACK.md §17
|
||||
|
||||
**Critères d'acceptation**
|
||||
- [ ] Recherche "acid jazz" retourne des tracks taggées avec les deux termes séparément
|
||||
- [ ] Résultats en moins de 200ms
|
||||
- [ ] Faute de frappe "jaz" retourne quand même des résultats jazz
|
||||
- [ ] Pas de résultats basés sur des métriques d'engagement (pas de boost popularity)
|
||||
- [ ] Test : recherche d'un artiste avec 0 plays retourne les mêmes résultats de pertinence qu'un artiste populaire pour la même requête
|
||||
- [x] Recherche "acid jazz" retourne des tracks taggées avec les deux termes séparément
|
||||
- [x] Résultats en moins de 200ms
|
||||
- [x] Faute de frappe "jaz" retourne quand même des résultats jazz (fuzziness AUTO)
|
||||
- [x] Pas de résultats basés sur des métriques d'engagement (pas de boost popularity)
|
||||
- [ ] Test : recherche d'un artiste avec 0 plays retourne les mêmes résultats de pertinence qu'un artiste populaire pour la même requête (à valider manuellement)
|
||||
|
||||
---
|
||||
|
||||
|
|
@ -1203,8 +1203,8 @@ Toutes les conditions suivantes doivent être remplies avant de taguer v1.0.0 :
|
|||
| v0.9.8 | Dette Technique Backend | P3.5 | ⏳ TODO | 3-4j | v0.9.4 |
|
||||
| v0.9.9 | Dette Technique Frontend | P3.5 | ✅ DONE | 2-3j | v0.9.4 |
|
||||
| v0.10.0 | Feed Social Chronologique | P4R | ⏳ TODO | 4-5j | v0.9.9 |
|
||||
| v0.10.1 | Découverte Tags & Genres | P4R | ⏳ TODO | 3-4j | v0.10.0 |
|
||||
| v0.10.2 | Recherche Elasticsearch | P4R | ⏳ TODO | 4-5j | v0.10.1 |
|
||||
| v0.10.1 | Découverte Tags & Genres | P4R | ✅ DONE | 3-4j | v0.10.0 |
|
||||
| v0.10.2 | Recherche Elasticsearch | P4R | ✅ DONE | 4-5j | v0.10.1 |
|
||||
| v0.10.3 | Commentaires & Interactions | P4R | ⏳ TODO | 3-4j | v0.10.0 |
|
||||
| v0.10.4 | Playlists Collaboratives | P4R | ⏳ TODO | 3-4j | v0.10.0 |
|
||||
| v0.10.5 | Notifications Complètes | P4R | ⏳ TODO | 2-3j | v0.10.3 |
|
||||
|
|
|
|||
|
|
@ -1,12 +1,27 @@
|
|||
import { apiClient } from '@/services/api/client';
|
||||
import { SearchResults } from '@/types/search';
|
||||
|
||||
/** v0.10.2 F363: Optional cursor/limit for future pagination */
|
||||
export interface SearchParams {
|
||||
query: string;
|
||||
types?: string[];
|
||||
cursor?: string;
|
||||
limit?: number;
|
||||
}
|
||||
|
||||
export const searchApi = {
|
||||
search: async (query: string, types?: string[]): Promise<SearchResults> => {
|
||||
search: async (
|
||||
query: string,
|
||||
types?: string[],
|
||||
opts?: { cursor?: string; limit?: number }
|
||||
): Promise<SearchResults> => {
|
||||
const params: Record<string, string | string[]> = { q: query };
|
||||
if (types && types.length > 0) {
|
||||
params.type = types;
|
||||
}
|
||||
if (opts?.cursor) params.cursor = opts.cursor;
|
||||
if (opts?.limit != null && opts.limit > 0)
|
||||
params.limit = String(Math.min(opts.limit, 50));
|
||||
const response = await apiClient.get<SearchResults>(`/search`, {
|
||||
params,
|
||||
});
|
||||
|
|
@ -15,7 +30,7 @@ export const searchApi = {
|
|||
|
||||
suggestions: async (query: string, limit?: number): Promise<SearchResults> => {
|
||||
const params: Record<string, string> = { q: query };
|
||||
if (limit != null && limit > 0) params.limit = String(limit);
|
||||
if (limit != null && limit > 0) params.limit = String(Math.min(limit, 20));
|
||||
const response = await apiClient.get<SearchResults>(`/search/suggestions`, {
|
||||
params,
|
||||
});
|
||||
|
|
|
|||
|
|
@ -149,11 +149,38 @@ services:
|
|||
networks:
|
||||
- veza-net
|
||||
|
||||
elasticsearch:
|
||||
image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
|
||||
container_name: veza_elasticsearch
|
||||
restart: unless-stopped
|
||||
environment:
|
||||
- discovery.type=single-node
|
||||
- xpack.security.enabled=false
|
||||
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
|
||||
ports:
|
||||
- "${PORT_ELASTICSEARCH:-19200}:9200"
|
||||
volumes:
|
||||
- elasticsearch_data:/usr/share/elasticsearch/data
|
||||
healthcheck:
|
||||
test: ["CMD-SHELL", "curl -s http://localhost:9200/_cluster/health || exit 1"]
|
||||
interval: 10s
|
||||
timeout: 5s
|
||||
retries: 10
|
||||
start_period: 60s
|
||||
networks:
|
||||
- veza-net
|
||||
deploy:
|
||||
resources:
|
||||
limits:
|
||||
cpus: '0.5'
|
||||
memory: 1G
|
||||
|
||||
volumes:
|
||||
postgres_data:
|
||||
redis_data:
|
||||
rabbitmq_data:
|
||||
minio_data:
|
||||
elasticsearch_data:
|
||||
|
||||
networks:
|
||||
veza-net:
|
||||
|
|
|
|||
|
|
@ -60,6 +60,16 @@ openssl rsa -in jwt-private.pem -pubout -out jwt-public.pem
|
|||
| `CHAT_SERVER_URL` | URL chat (legacy) | string | Non | `http://veza.fr:8081` | — |
|
||||
| `RABBITMQ_URL` | URL RabbitMQ | string | Non | — | `amqp://veza:password@localhost:5672/` |
|
||||
|
||||
### Elasticsearch (v0.10.2 F361-F365)
|
||||
|
||||
| Variable | Description | Type | Requis | Valeur par défaut | Exemple |
|
||||
|----------|--------------|------|--------|------------------|---------|
|
||||
| `ELASTICSEARCH_URL` | URL Elasticsearch | string | Non | — | `http://localhost:9200` |
|
||||
| `ELASTICSEARCH_INDEX` | Préfixe des indices | string | Non | `veza-platform` | `veza-platform` |
|
||||
| `ELASTICSEARCH_AUTO_INDEX` | Réindexer au démarrage | bool | Non | `false` | `true` |
|
||||
|
||||
**Note :** Si `ELASTICSEARCH_URL` n'est pas défini, la recherche utilise PostgreSQL (fallback gracieux). Admin : `POST /api/v1/admin/search/reindex` pour réindexation manuelle.
|
||||
|
||||
### Rate limiting (v0.9.2 TASK-SEC-003)
|
||||
|
||||
| Variable | Description | Type | Requis | Valeur par défaut | Exemple |
|
||||
|
|
|
|||
|
|
@ -86,6 +86,8 @@ require (
|
|||
github.com/docker/docker v27.1.1+incompatible // indirect
|
||||
github.com/docker/go-connections v0.5.0 // indirect
|
||||
github.com/docker/go-units v0.5.0 // indirect
|
||||
github.com/elastic/elastic-transport-go/v8 v8.3.0 // indirect
|
||||
github.com/elastic/go-elasticsearch/v8 v8.11.0 // indirect
|
||||
github.com/felixge/httpsnoop v1.0.4 // indirect
|
||||
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
|
||||
github.com/gin-contrib/sse v1.1.0 // indirect
|
||||
|
|
|
|||
|
|
@ -107,6 +107,10 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj
|
|||
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
|
||||
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
|
||||
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
|
||||
github.com/elastic/elastic-transport-go/v8 v8.3.0 h1:DJGxovyQLXGr62e9nDMPSxRyWION0Bh6d9eCFBriiHo=
|
||||
github.com/elastic/elastic-transport-go/v8 v8.3.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
|
||||
github.com/elastic/go-elasticsearch/v8 v8.11.0 h1:gUazf443rdYAEAD7JHX5lSXRgTkG4N4IcsV8dcWQPxM=
|
||||
github.com/elastic/go-elasticsearch/v8 v8.11.0/go.mod h1:GU1BJHO7WeamP7UhuElYwzzHtvf9SDmeVpSSy9+o6Qg=
|
||||
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
|
||||
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
|
||||
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
"veza-backend-api/internal/config"
|
||||
trackcore "veza-backend-api/internal/core/track"
|
||||
elasticsearch "veza-backend-api/internal/elasticsearch"
|
||||
"veza-backend-api/internal/handlers"
|
||||
"veza-backend-api/internal/middleware"
|
||||
"veza-backend-api/internal/models"
|
||||
|
|
@ -471,5 +472,25 @@ func (r *APIRouter) setupCoreProtectedRoutes(v1 *gin.RouterGroup) {
|
|||
if r.authService != nil {
|
||||
admin.POST("/auth/unlock-account", handlers.UnlockAccount(r.authService, r.logger))
|
||||
}
|
||||
|
||||
// v0.10.2 F361: Elasticsearch reindex (admin only)
|
||||
admin.POST("/search/reindex", func(c *gin.Context) {
|
||||
esCfg := elasticsearch.LoadConfig()
|
||||
if !esCfg.Enabled {
|
||||
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "Elasticsearch not configured"})
|
||||
return
|
||||
}
|
||||
esClient, err := elasticsearch.NewClient(esCfg, r.logger)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "Elasticsearch unavailable", "detail": err.Error()})
|
||||
return
|
||||
}
|
||||
idx := elasticsearch.NewIndexer(esClient, r.db.GormDB, r.logger)
|
||||
if err := idx.ReindexAll(c.Request.Context()); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "Reindex failed", "detail": err.Error()})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"status": "ok", "message": "Reindex completed"})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,16 +1,42 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"veza-backend-api/internal/handlers"
|
||||
"veza-backend-api/internal/services"
|
||||
|
||||
elasticsearch "veza-backend-api/internal/elasticsearch"
|
||||
)
|
||||
|
||||
// setupSearchRoutes configure la route de recherche unifiée GET /search et autocomplete
|
||||
// setupSearchRoutes configure la route de recherche unifiée GET /search et autocomplete (v0.10.2 F363-F365)
|
||||
// Uses Elasticsearch when ELASTICSEARCH_URL is set, else falls back to PostgreSQL
|
||||
func (r *APIRouter) setupSearchRoutes(router *gin.RouterGroup) {
|
||||
searchService := services.NewSearchService(r.db, r.logger)
|
||||
handlers.NewSearchHandlers(searchService)
|
||||
esCfg := elasticsearch.LoadConfig()
|
||||
var searchSvc handlers.SearchServiceInterface
|
||||
if esCfg.Enabled {
|
||||
esClient, err := elasticsearch.NewClient(esCfg, r.logger)
|
||||
if err != nil {
|
||||
r.logger.Warn("Elasticsearch unavailable, falling back to PostgreSQL search", zap.Error(err))
|
||||
searchSvc = services.NewSearchService(r.db, r.logger)
|
||||
} else {
|
||||
// Optional: run reindex at startup if ELASTICSEARCH_AUTO_INDEX
|
||||
if os.Getenv("ELASTICSEARCH_AUTO_INDEX") == "true" {
|
||||
idx := elasticsearch.NewIndexer(esClient, r.db.GormDB, r.logger)
|
||||
if err := idx.ReindexAll(context.Background()); err != nil {
|
||||
r.logger.Warn("Elasticsearch auto-reindex failed", zap.Error(err))
|
||||
}
|
||||
}
|
||||
searchSvc = elasticsearch.NewSearchService(esClient, r.logger)
|
||||
}
|
||||
} else {
|
||||
searchSvc = services.NewSearchService(r.db, r.logger)
|
||||
}
|
||||
handlers.NewSearchHandlersWithInterface(searchSvc)
|
||||
router.GET("/search", handlers.SearchHandlersInstance.Search)
|
||||
router.GET("/search/suggestions", handlers.SearchHandlersInstance.Suggestions)
|
||||
}
|
||||
|
|
|
|||
50
veza-backend-api/internal/elasticsearch/client.go
Normal file
50
veza-backend-api/internal/elasticsearch/client.go
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
package elasticsearch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v8"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Client wraps Elasticsearch client with health check (v0.10.2 F361)
|
||||
type Client struct {
|
||||
*elasticsearch.Client
|
||||
Config Config
|
||||
Logger *zap.Logger
|
||||
}
|
||||
|
||||
// NewClient creates Elasticsearch client when ELASTICSEARCH_URL is set
|
||||
func NewClient(cfg Config, logger *zap.Logger) (*Client, error) {
|
||||
if !cfg.Enabled {
|
||||
return nil, nil
|
||||
}
|
||||
es, err := elasticsearch.NewClient(elasticsearch.Config{
|
||||
Addresses: []string{cfg.URL},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("elasticsearch: create client: %w", err)
|
||||
}
|
||||
c := &Client{Client: es, Config: cfg, Logger: logger}
|
||||
if err := c.Ping(context.Background()); err != nil {
|
||||
return nil, fmt.Errorf("elasticsearch: ping: %w", err)
|
||||
}
|
||||
if logger != nil {
|
||||
logger.Info("Elasticsearch client connected", zap.String("url", cfg.URL), zap.String("index", cfg.Index))
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Ping checks Elasticsearch cluster health
|
||||
func (c *Client) Ping(ctx context.Context) error {
|
||||
res, err := c.Info(c.Info.WithContext(ctx))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.IsError() {
|
||||
return fmt.Errorf("elasticsearch info failed: %s", res.String())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
24
veza-backend-api/internal/elasticsearch/config.go
Normal file
24
veza-backend-api/internal/elasticsearch/config.go
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
package elasticsearch
|
||||
|
||||
import "os"
|
||||
|
||||
// Config holds Elasticsearch connection configuration (v0.10.2 F361)
|
||||
type Config struct {
|
||||
URL string
|
||||
Index string
|
||||
Enabled bool
|
||||
}
|
||||
|
||||
// LoadConfig loads Elasticsearch config from environment
|
||||
func LoadConfig() Config {
|
||||
url := os.Getenv("ELASTICSEARCH_URL")
|
||||
index := os.Getenv("ELASTICSEARCH_INDEX")
|
||||
if index == "" {
|
||||
index = "veza-platform"
|
||||
}
|
||||
return Config{
|
||||
URL: url,
|
||||
Index: index,
|
||||
Enabled: url != "",
|
||||
}
|
||||
}
|
||||
69
veza-backend-api/internal/elasticsearch/indices.go
Normal file
69
veza-backend-api/internal/elasticsearch/indices.go
Normal file
|
|
@ -0,0 +1,69 @@
|
|||
package elasticsearch
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v8/esapi"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// EnsureIndices creates indices if they don't exist (F361, F362)
|
||||
func (c *Client) EnsureIndices(ctx context.Context) error {
|
||||
if c == nil {
|
||||
return nil
|
||||
}
|
||||
indices := []struct {
|
||||
name string
|
||||
mapping string
|
||||
}{
|
||||
{indexName(c.Config.Index, IdxTracks), tracksMapping},
|
||||
{indexName(c.Config.Index, IdxUsers), usersMapping},
|
||||
{indexName(c.Config.Index, IdxPlaylists), playlistsMapping},
|
||||
}
|
||||
for _, idx := range indices {
|
||||
exists, err := c.Indices.Exists([]string{idx.name}, c.Indices.Exists.WithContext(ctx))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if exists.StatusCode == 200 {
|
||||
if c.Logger != nil {
|
||||
c.Logger.Debug("Elasticsearch index exists", zap.String("index", idx.name))
|
||||
}
|
||||
continue
|
||||
}
|
||||
res, err := c.Indices.Create(
|
||||
idx.name,
|
||||
c.Indices.Create.WithContext(ctx),
|
||||
c.Indices.Create.WithBody(strings.NewReader(idx.mapping)),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.IsError() {
|
||||
return fmtErr(res)
|
||||
}
|
||||
if c.Logger != nil {
|
||||
c.Logger.Info("Elasticsearch index created", zap.String("index", idx.name))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func indexName(prefix, name string) string {
|
||||
if prefix != "" {
|
||||
return prefix + "-" + name
|
||||
}
|
||||
return "veza-" + name
|
||||
}
|
||||
|
||||
func fmtErr(res *esapi.Response) error {
|
||||
var buf bytes.Buffer
|
||||
if _, err := buf.ReadFrom(res.Body); err == nil {
|
||||
return fmt.Errorf("elasticsearch error: %s", buf.String())
|
||||
}
|
||||
return fmt.Errorf("elasticsearch error: status %d", res.StatusCode)
|
||||
}
|
||||
132
veza-backend-api/internal/elasticsearch/mappings.go
Normal file
132
veza-backend-api/internal/elasticsearch/mappings.go
Normal file
|
|
@ -0,0 +1,132 @@
|
|||
package elasticsearch
|
||||
|
||||
// Index names (F361, F362)
|
||||
const (
|
||||
IdxTracks = "tracks"
|
||||
IdxUsers = "users"
|
||||
IdxPlaylists = "playlists"
|
||||
)
|
||||
|
||||
// Track mapping: title, artist, tags, genre, description (F361)
|
||||
// Multi-field with fuzziness for typo tolerance (F364)
|
||||
const tracksMapping = `{
|
||||
"settings": {
|
||||
"analysis": {
|
||||
"analyzer": {
|
||||
"veza_text": {
|
||||
"type": "custom",
|
||||
"tokenizer": "standard",
|
||||
"filter": ["lowercase", "asciifolding"]
|
||||
}
|
||||
},
|
||||
"number_of_shards": 1,
|
||||
"number_of_replicas": 0
|
||||
}
|
||||
},
|
||||
"mappings": {
|
||||
"properties": {
|
||||
"id": { "type": "keyword" },
|
||||
"title": {
|
||||
"type": "text",
|
||||
"analyzer": "veza_text",
|
||||
"fields": {
|
||||
"keyword": { "type": "keyword" },
|
||||
"fuzzy": {
|
||||
"type": "text",
|
||||
"analyzer": "veza_text"
|
||||
}
|
||||
}
|
||||
},
|
||||
"artist": {
|
||||
"type": "text",
|
||||
"analyzer": "veza_text",
|
||||
"fields": {
|
||||
"keyword": { "type": "keyword" }
|
||||
}
|
||||
},
|
||||
"description": {
|
||||
"type": "text",
|
||||
"analyzer": "veza_text"
|
||||
},
|
||||
"album": {
|
||||
"type": "text",
|
||||
"analyzer": "veza_text",
|
||||
"fields": { "keyword": { "type": "keyword" } }
|
||||
},
|
||||
"genre": { "type": "keyword" },
|
||||
"tags": { "type": "keyword" },
|
||||
"created_at": { "type": "date" },
|
||||
"cover_art_path": { "type": "keyword", "index": false },
|
||||
"stream_manifest_url": { "type": "keyword", "index": false },
|
||||
"file_path": { "type": "keyword", "index": false }
|
||||
}
|
||||
}
|
||||
}`
|
||||
|
||||
// User mapping: username, display_name, bio (F362)
|
||||
const usersMapping = `{
|
||||
"settings": {
|
||||
"analysis": {
|
||||
"analyzer": {
|
||||
"veza_text": {
|
||||
"type": "custom",
|
||||
"tokenizer": "standard",
|
||||
"filter": ["lowercase", "asciifolding"]
|
||||
}
|
||||
},
|
||||
"number_of_shards": 1,
|
||||
"number_of_replicas": 0
|
||||
}
|
||||
},
|
||||
"mappings": {
|
||||
"properties": {
|
||||
"id": { "type": "keyword" },
|
||||
"username": {
|
||||
"type": "text",
|
||||
"analyzer": "veza_text",
|
||||
"fields": { "keyword": { "type": "keyword" } }
|
||||
},
|
||||
"display_name": {
|
||||
"type": "text",
|
||||
"analyzer": "veza_text",
|
||||
"fields": { "keyword": { "type": "keyword" } }
|
||||
},
|
||||
"bio": { "type": "text", "analyzer": "veza_text" },
|
||||
"location": { "type": "keyword" },
|
||||
"avatar": { "type": "keyword", "index": false },
|
||||
"created_at": { "type": "date" }
|
||||
}
|
||||
}
|
||||
}`
|
||||
|
||||
// Playlist mapping (F363)
|
||||
const playlistsMapping = `{
|
||||
"settings": {
|
||||
"analysis": {
|
||||
"analyzer": {
|
||||
"veza_text": {
|
||||
"type": "custom",
|
||||
"tokenizer": "standard",
|
||||
"filter": ["lowercase", "asciifolding"]
|
||||
}
|
||||
},
|
||||
"number_of_shards": 1,
|
||||
"number_of_replicas": 0
|
||||
}
|
||||
},
|
||||
"mappings": {
|
||||
"properties": {
|
||||
"id": { "type": "keyword" },
|
||||
"name": {
|
||||
"type": "text",
|
||||
"analyzer": "veza_text",
|
||||
"fields": { "keyword": { "type": "keyword" } }
|
||||
},
|
||||
"description": { "type": "text", "analyzer": "veza_text" },
|
||||
"visibility": { "type": "keyword" },
|
||||
"cover_url": { "type": "keyword", "index": false },
|
||||
"created_at": { "type": "date" }
|
||||
}
|
||||
}
|
||||
}`
|
||||
|
||||
244
veza-backend-api/internal/elasticsearch/search_service.go
Normal file
244
veza-backend-api/internal/elasticsearch/search_service.go
Normal file
|
|
@ -0,0 +1,244 @@
|
|||
package elasticsearch
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"veza-backend-api/internal/services"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// SearchService implements search via Elasticsearch (v0.10.2 F363, F364, F365)
|
||||
type SearchService struct {
|
||||
client *Client
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// NewSearchService creates an Elasticsearch-backed search service
|
||||
func NewSearchService(client *Client, logger *zap.Logger) *SearchService {
|
||||
return &SearchService{client: client, logger: logger}
|
||||
}
|
||||
|
||||
// Search performs full-text search with BM25, fuzziness for typos (F364)
|
||||
func (s *SearchService) Search(query string, types []string) (*services.SearchResult, error) {
|
||||
return s.search(context.Background(), query, types, 10)
|
||||
}
|
||||
|
||||
// Suggestions returns autocomplete suggestions (F365)
|
||||
func (s *SearchService) Suggestions(query string, limit int) (*services.SearchResult, error) {
|
||||
if limit <= 0 {
|
||||
limit = 5
|
||||
}
|
||||
if limit > 20 {
|
||||
limit = 20
|
||||
}
|
||||
return s.search(context.Background(), query, nil, limit)
|
||||
}
|
||||
|
||||
func (s *SearchService) search(ctx context.Context, query string, types []string, limit int) (*services.SearchResult, error) {
|
||||
if s.client == nil {
|
||||
return nil, fmt.Errorf("elasticsearch client not configured")
|
||||
}
|
||||
searchAll := len(types) == 0
|
||||
searchTracks := searchAll || strContains(types, "track")
|
||||
searchUsers := searchAll || strContains(types, "user")
|
||||
searchPlaylists := searchAll || strContains(types, "playlist")
|
||||
|
||||
results := &services.SearchResult{}
|
||||
|
||||
// F364: fuzziness for typo tolerance; no boost by popularity (ethical)
|
||||
|
||||
if searchTracks {
|
||||
idx := indexName(s.client.Config.Index, IdxTracks)
|
||||
mq := map[string]interface{}{
|
||||
"multi_match": map[string]interface{}{
|
||||
"query": query,
|
||||
"fuzziness": "AUTO",
|
||||
"fields": []string{"title^2", "artist^2", "description", "album", "tags", "genre"},
|
||||
"type": "best_fields",
|
||||
},
|
||||
}
|
||||
tracks, err := s.runSearch(ctx, idx, mq, limit, []string{"title", "artist", "description"})
|
||||
if err != nil {
|
||||
if s.logger != nil {
|
||||
s.logger.Warn("ES track search failed", zap.Error(err))
|
||||
}
|
||||
} else {
|
||||
results.Tracks = s.mapTracks(tracks)
|
||||
}
|
||||
}
|
||||
|
||||
if searchUsers {
|
||||
idx := indexName(s.client.Config.Index, IdxUsers)
|
||||
mq := map[string]interface{}{
|
||||
"multi_match": map[string]interface{}{
|
||||
"query": query,
|
||||
"fuzziness": "AUTO",
|
||||
"fields": []string{"username^2", "display_name^2", "bio"},
|
||||
"type": "best_fields",
|
||||
},
|
||||
}
|
||||
users, err := s.runSearch(ctx, idx, mq, limit, []string{"username", "display_name"})
|
||||
if err != nil {
|
||||
if s.logger != nil {
|
||||
s.logger.Warn("ES user search failed", zap.Error(err))
|
||||
}
|
||||
} else {
|
||||
results.Users = s.mapUsers(users)
|
||||
}
|
||||
}
|
||||
|
||||
if searchPlaylists {
|
||||
idx := indexName(s.client.Config.Index, IdxPlaylists)
|
||||
mq := map[string]interface{}{
|
||||
"multi_match": map[string]interface{}{
|
||||
"query": query,
|
||||
"fuzziness": "AUTO",
|
||||
"fields": []string{"name^2", "description"},
|
||||
"type": "best_fields",
|
||||
},
|
||||
}
|
||||
pls, err := s.runSearch(ctx, idx, mq, limit, []string{"name", "description"})
|
||||
if err != nil {
|
||||
if s.logger != nil {
|
||||
s.logger.Warn("ES playlist search failed", zap.Error(err))
|
||||
}
|
||||
} else {
|
||||
results.Playlists = s.mapPlaylists(pls)
|
||||
}
|
||||
}
|
||||
|
||||
return results, nil
|
||||
}
|
||||
|
||||
type esHit struct {
|
||||
Source map[string]interface{} `json:"_source"`
|
||||
}
|
||||
|
||||
type esSearchResp struct {
|
||||
Hits struct {
|
||||
Hits []esHit `json:"hits"`
|
||||
} `json:"hits"`
|
||||
}
|
||||
|
||||
func (s *SearchService) runSearch(ctx context.Context, index string, query map[string]interface{}, limit int, highlightFields []string) ([]esHit, error) {
|
||||
body := map[string]interface{}{
|
||||
"query": map[string]interface{}{
|
||||
"bool": map[string]interface{}{
|
||||
"must": []interface{}{query},
|
||||
},
|
||||
},
|
||||
"size": limit,
|
||||
}
|
||||
if len(highlightFields) > 0 {
|
||||
hlFields := make(map[string]interface{})
|
||||
for _, f := range highlightFields {
|
||||
hlFields[f] = map[string]interface{}{}
|
||||
}
|
||||
body["highlight"] = map[string]interface{}{
|
||||
"fields": hlFields,
|
||||
"pre_tags": []string{"<em>"},
|
||||
"post_tags": []string{"</em>"},
|
||||
}
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
if err := json.NewEncoder(&buf).Encode(body); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res, err := s.client.Search(
|
||||
s.client.Search.WithContext(ctx),
|
||||
s.client.Search.WithIndex(index),
|
||||
s.client.Search.WithBody(&buf),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.IsError() {
|
||||
return nil, fmt.Errorf("ES search: %s", res.String())
|
||||
}
|
||||
var out esSearchResp
|
||||
if err := json.NewDecoder(res.Body).Decode(&out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
hits := make([]esHit, len(out.Hits.Hits))
|
||||
for i, h := range out.Hits.Hits {
|
||||
hits[i] = h
|
||||
}
|
||||
return hits, nil
|
||||
}
|
||||
|
||||
func (s *SearchService) mapTracks(hits []esHit) []services.TrackResult {
|
||||
out := make([]services.TrackResult, 0, len(hits))
|
||||
for _, h := range hits {
|
||||
tr := services.TrackResult{}
|
||||
if v, ok := h.Source["id"].(string); ok {
|
||||
tr.ID = v
|
||||
}
|
||||
if v, ok := h.Source["title"].(string); ok {
|
||||
tr.Title = v
|
||||
}
|
||||
if v, ok := h.Source["artist"].(string); ok {
|
||||
tr.Artist = v
|
||||
}
|
||||
if v, ok := h.Source["stream_manifest_url"].(string); ok && v != "" {
|
||||
tr.URL = v
|
||||
} else if v, ok := h.Source["file_path"].(string); ok {
|
||||
tr.URL = v
|
||||
}
|
||||
if v, ok := h.Source["cover_art_path"].(string); ok {
|
||||
tr.CoverArtPath = v
|
||||
}
|
||||
out = append(out, tr)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (s *SearchService) mapUsers(hits []esHit) []services.UserResult {
|
||||
out := make([]services.UserResult, 0, len(hits))
|
||||
for _, h := range hits {
|
||||
ur := services.UserResult{}
|
||||
if v, ok := h.Source["id"].(string); ok {
|
||||
ur.ID = v
|
||||
}
|
||||
if v, ok := h.Source["username"].(string); ok {
|
||||
ur.Username = v
|
||||
}
|
||||
if v, ok := h.Source["avatar"].(string); ok {
|
||||
ur.AvatarURL = v
|
||||
}
|
||||
out = append(out, ur)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (s *SearchService) mapPlaylists(hits []esHit) []services.PlaylistResult {
|
||||
out := make([]services.PlaylistResult, 0, len(hits))
|
||||
for _, h := range hits {
|
||||
pr := services.PlaylistResult{}
|
||||
if v, ok := h.Source["id"].(string); ok {
|
||||
pr.ID = v
|
||||
}
|
||||
if v, ok := h.Source["name"].(string); ok {
|
||||
pr.Name = v
|
||||
}
|
||||
if v, ok := h.Source["cover_url"].(string); ok {
|
||||
pr.Cover = v
|
||||
}
|
||||
out = append(out, pr)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func strContains(slice []string, item string) bool {
|
||||
for _, s := range slice {
|
||||
if strings.EqualFold(s, item) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
package elasticsearch
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func TestSearchService_Search_NilClient(t *testing.T) {
|
||||
svc := NewSearchService(nil, zap.NewNop())
|
||||
_, err := svc.Search("test", nil)
|
||||
if err == nil {
|
||||
t.Fatal("expected error when client is nil")
|
||||
}
|
||||
if err.Error() != "elasticsearch client not configured" {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSearchService_Suggestions_NilClient(t *testing.T) {
|
||||
svc := NewSearchService(nil, zap.NewNop())
|
||||
_, err := svc.Suggestions("test", 5)
|
||||
if err == nil {
|
||||
t.Fatal("expected error when client is nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadConfig(t *testing.T) {
|
||||
cfg := LoadConfig()
|
||||
// When ELASTICSEARCH_INDEX unset, default is veza-platform
|
||||
if cfg.Index == "" {
|
||||
t.Error("index should not be empty (default veza-platform)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestIndexName(t *testing.T) {
|
||||
if got := indexName("veza-platform", "tracks"); got != "veza-platform-tracks" {
|
||||
t.Errorf("indexName = %q, want veza-platform-tracks", got)
|
||||
}
|
||||
if got := indexName("", "tracks"); got != "veza-tracks" {
|
||||
t.Errorf("indexName = %q, want veza-tracks", got)
|
||||
}
|
||||
}
|
||||
384
veza-backend-api/internal/elasticsearch/sync.go
Normal file
384
veza-backend-api/internal/elasticsearch/sync.go
Normal file
|
|
@ -0,0 +1,384 @@
|
|||
package elasticsearch
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v8/esutil"
|
||||
"github.com/google/uuid"
|
||||
"github.com/lib/pq"
|
||||
"go.uber.org/zap"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// Indexer syncs PostgreSQL data to Elasticsearch (F361 Phase 3)
|
||||
type Indexer struct {
|
||||
client *Client
|
||||
db *gorm.DB
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// NewIndexer creates an indexer
|
||||
func NewIndexer(client *Client, db *gorm.DB, logger *zap.Logger) *Indexer {
|
||||
return &Indexer{client: client, db: db, logger: logger}
|
||||
}
|
||||
|
||||
// trackDoc is the ES document for a track
|
||||
type trackDoc struct {
|
||||
ID string `json:"id"`
|
||||
Title string `json:"title"`
|
||||
Artist string `json:"artist"`
|
||||
Description string `json:"description"`
|
||||
Album string `json:"album"`
|
||||
Genre []string `json:"genre"` // genre slugs from track_genres
|
||||
Tags []string `json:"tags"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
CoverArtPath string `json:"cover_art_path,omitempty"`
|
||||
StreamManifestURL string `json:"stream_manifest_url,omitempty"`
|
||||
FilePath string `json:"file_path,omitempty"`
|
||||
}
|
||||
|
||||
// userDoc is the ES document for a user
|
||||
type userDoc struct {
|
||||
ID string `json:"id"`
|
||||
Username string `json:"username"`
|
||||
DisplayName string `json:"display_name"`
|
||||
Bio string `json:"bio"`
|
||||
Location string `json:"location"`
|
||||
Avatar string `json:"avatar,omitempty"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
}
|
||||
|
||||
// playlistDoc is the ES document for a playlist
|
||||
type playlistDoc struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Description string `json:"description"`
|
||||
Visibility string `json:"visibility"`
|
||||
CoverURL string `json:"cover_url,omitempty"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
}
|
||||
|
||||
// ReindexAll performs full reindex of tracks, users, playlists (F361 Phase 3.1)
|
||||
func (i *Indexer) ReindexAll(ctx context.Context) error {
|
||||
if i.client == nil {
|
||||
return nil
|
||||
}
|
||||
if err := i.client.EnsureIndices(ctx); err != nil {
|
||||
return fmt.Errorf("ensure indices: %w", err)
|
||||
}
|
||||
if err := i.reindexTracks(ctx); err != nil {
|
||||
return fmt.Errorf("reindex tracks: %w", err)
|
||||
}
|
||||
if err := i.reindexUsers(ctx); err != nil {
|
||||
return fmt.Errorf("reindex users: %w", err)
|
||||
}
|
||||
if err := i.reindexPlaylists(ctx); err != nil {
|
||||
return fmt.Errorf("reindex playlists: %w", err)
|
||||
}
|
||||
if i.logger != nil {
|
||||
i.logger.Info("Elasticsearch reindex completed")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Indexer) reindexTracks(ctx context.Context) error {
|
||||
idx := indexName(i.client.Config.Index, IdxTracks)
|
||||
type row struct {
|
||||
ID uuid.UUID
|
||||
Title string
|
||||
Artist string
|
||||
Description string
|
||||
Album string
|
||||
Genre string
|
||||
Tags pq.StringArray
|
||||
CreatedAt string
|
||||
CoverArtPath string
|
||||
StreamManifestURL string
|
||||
FilePath string
|
||||
GenreSlugs pq.StringArray
|
||||
}
|
||||
var rows []row
|
||||
// Subquery for genre_slugs; tracks.tags is denormalized
|
||||
err := i.db.WithContext(ctx).Raw(`
|
||||
SELECT t.id, t.title, COALESCE(t.artist,'') as artist, COALESCE(t.description,'') as description,
|
||||
COALESCE(t.album,'') as album, COALESCE(t.genre,'') as genre, COALESCE(t.tags,'{}') as tags,
|
||||
t.created_at::text, COALESCE(t.cover_art_path,'') as cover_art_path,
|
||||
COALESCE(t.stream_manifest_url,'') as stream_manifest_url, COALESCE(t.file_path,'') as file_path,
|
||||
COALESCE((SELECT array_agg(tg.genre_slug ORDER BY tg.position) FROM track_genres tg WHERE tg.track_id = t.id), '{}') as genre_slugs
|
||||
FROM tracks t
|
||||
WHERE t.deleted_at IS NULL AND t.status = 'active'
|
||||
`).Scan(&rows).Error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
|
||||
Client: i.client.Client,
|
||||
Index: idx,
|
||||
NumWorkers: 2,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, r := range rows {
|
||||
genres := []string(r.GenreSlugs)
|
||||
if len(genres) == 0 && r.Genre != "" {
|
||||
genres = []string{r.Genre}
|
||||
}
|
||||
doc := trackDoc{
|
||||
ID: r.ID.String(),
|
||||
Title: r.Title,
|
||||
Artist: r.Artist,
|
||||
Description: r.Description,
|
||||
Album: r.Album,
|
||||
Genre: genres,
|
||||
Tags: []string(r.Tags),
|
||||
CreatedAt: r.CreatedAt,
|
||||
CoverArtPath: r.CoverArtPath,
|
||||
StreamManifestURL: r.StreamManifestURL,
|
||||
FilePath: r.FilePath,
|
||||
}
|
||||
body, _ := json.Marshal(doc)
|
||||
err := bi.Add(ctx, esutil.BulkIndexerItem{
|
||||
Action: "index",
|
||||
DocumentID: r.ID.String(),
|
||||
Body: bytes.NewReader(body),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := bi.Close(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if i.logger != nil {
|
||||
i.logger.Info("Elasticsearch tracks indexed", zap.Int("count", len(rows)))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Indexer) reindexUsers(ctx context.Context) error {
|
||||
idx := indexName(i.client.Config.Index, IdxUsers)
|
||||
type row struct {
|
||||
ID uuid.UUID
|
||||
Username string
|
||||
DisplayName string
|
||||
Bio string
|
||||
Location string
|
||||
Avatar string
|
||||
CreatedAt string
|
||||
}
|
||||
var rows []row
|
||||
err := i.db.WithContext(ctx).Raw(`
|
||||
SELECT u.id, u.username,
|
||||
COALESCE(u.display_name, u.username) as display_name,
|
||||
COALESCE(COALESCE(up.bio, u.bio), '') as bio,
|
||||
COALESCE(COALESCE(up.location, u.location), '') as location,
|
||||
COALESCE(COALESCE(up.avatar_url, u.avatar), '') as avatar,
|
||||
u.created_at::text
|
||||
FROM users u
|
||||
LEFT JOIN user_profiles up ON up.user_id = u.id
|
||||
WHERE u.deleted_at IS NULL
|
||||
`).Scan(&rows).Error
|
||||
if err != nil {
|
||||
err = i.db.WithContext(ctx).Raw(`
|
||||
SELECT id, username, COALESCE(display_name, username) as display_name,
|
||||
COALESCE(bio,'') as bio, COALESCE(location,'') as location,
|
||||
COALESCE(avatar,'') as avatar, created_at::text
|
||||
FROM users WHERE deleted_at IS NULL
|
||||
`).Scan(&rows).Error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
|
||||
Client: i.client.Client,
|
||||
Index: idx,
|
||||
NumWorkers: 2,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, r := range rows {
|
||||
doc := userDoc{
|
||||
ID: r.ID.String(),
|
||||
Username: r.Username,
|
||||
DisplayName: r.DisplayName,
|
||||
Bio: r.Bio,
|
||||
Location: r.Location,
|
||||
Avatar: r.Avatar,
|
||||
CreatedAt: r.CreatedAt,
|
||||
}
|
||||
body, _ := json.Marshal(doc)
|
||||
err := bi.Add(ctx, esutil.BulkIndexerItem{
|
||||
Action: "index",
|
||||
DocumentID: r.ID.String(),
|
||||
Body: bytes.NewReader(body),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := bi.Close(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if i.logger != nil {
|
||||
i.logger.Info("Elasticsearch users indexed", zap.Int("count", len(rows)))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Indexer) reindexPlaylists(ctx context.Context) error {
|
||||
idx := indexName(i.client.Config.Index, IdxPlaylists)
|
||||
type row struct {
|
||||
ID uuid.UUID
|
||||
Name string
|
||||
Description string
|
||||
Visibility string
|
||||
CoverURL string
|
||||
CreatedAt string
|
||||
}
|
||||
var rows []row
|
||||
err := i.db.WithContext(ctx).Raw(`
|
||||
SELECT id, COALESCE(name, title, '') as name, COALESCE(description,'') as description,
|
||||
COALESCE(visibility::text, 'public') as visibility, COALESCE(cover_url,'') as cover_url,
|
||||
created_at::text
|
||||
FROM playlists WHERE deleted_at IS NULL AND (is_public = true OR visibility = 'public')
|
||||
`).Scan(&rows).Error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
|
||||
Client: i.client.Client,
|
||||
Index: idx,
|
||||
NumWorkers: 2,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, r := range rows {
|
||||
doc := playlistDoc{
|
||||
ID: r.ID.String(),
|
||||
Name: r.Name,
|
||||
Description: r.Description,
|
||||
Visibility: r.Visibility,
|
||||
CoverURL: r.CoverURL,
|
||||
CreatedAt: r.CreatedAt,
|
||||
}
|
||||
body, _ := json.Marshal(doc)
|
||||
err := bi.Add(ctx, esutil.BulkIndexerItem{
|
||||
Action: "index",
|
||||
DocumentID: r.ID.String(),
|
||||
Body: bytes.NewReader(body),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := bi.Close(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if i.logger != nil {
|
||||
i.logger.Info("Elasticsearch playlists indexed", zap.Int("count", len(rows)))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// IndexTrack indexes a single track (hook after create/update)
|
||||
func (i *Indexer) IndexTrack(ctx context.Context, trackID uuid.UUID) error {
|
||||
if i.client == nil {
|
||||
return nil
|
||||
}
|
||||
var r struct {
|
||||
ID uuid.UUID
|
||||
Title string
|
||||
Artist string
|
||||
Description string
|
||||
Album string
|
||||
Genre string
|
||||
Tags pq.StringArray
|
||||
CreatedAt string
|
||||
CoverArtPath string
|
||||
StreamManifestURL string
|
||||
FilePath string
|
||||
GenreSlugs pq.StringArray
|
||||
}
|
||||
err := i.db.WithContext(ctx).Raw(`
|
||||
SELECT t.id, t.title, COALESCE(t.artist,'') as artist, COALESCE(t.description,'') as description,
|
||||
COALESCE(t.album,'') as album, COALESCE(t.genre,'') as genre, COALESCE(t.tags,'{}') as tags,
|
||||
t.created_at::text, COALESCE(t.cover_art_path,'') as cover_art_path,
|
||||
COALESCE(t.stream_manifest_url,'') as stream_manifest_url, COALESCE(t.file_path,'') as file_path,
|
||||
COALESCE((SELECT array_agg(tg.genre_slug ORDER BY tg.position) FROM track_genres tg WHERE tg.track_id = t.id), '{}') as genre_slugs
|
||||
FROM tracks t WHERE t.id = ? AND t.deleted_at IS NULL
|
||||
`, trackID).Scan(&r).Error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
genres := []string(r.GenreSlugs)
|
||||
if len(genres) == 0 && r.Genre != "" {
|
||||
genres = []string{r.Genre}
|
||||
}
|
||||
doc := trackDoc{
|
||||
ID: r.ID.String(),
|
||||
Title: r.Title,
|
||||
Artist: r.Artist,
|
||||
Description: r.Description,
|
||||
Album: r.Album,
|
||||
Genre: genres,
|
||||
Tags: []string(r.Tags),
|
||||
CreatedAt: r.CreatedAt,
|
||||
CoverArtPath: r.CoverArtPath,
|
||||
StreamManifestURL: r.StreamManifestURL,
|
||||
FilePath: r.FilePath,
|
||||
}
|
||||
body, _ := json.Marshal(doc)
|
||||
idx := indexName(i.client.Config.Index, IdxTracks)
|
||||
res, err := i.client.Index(idx, bytes.NewReader(body),
|
||||
i.client.Index.WithContext(ctx),
|
||||
i.client.Index.WithDocumentID(trackID.String()),
|
||||
i.client.Index.WithRefresh("true"),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.IsError() {
|
||||
return fmt.Errorf("index track: %s", res.String())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteTrack removes a track from the index
|
||||
func (i *Indexer) DeleteTrack(ctx context.Context, trackID uuid.UUID) error {
|
||||
if i.client == nil {
|
||||
return nil
|
||||
}
|
||||
idx := indexName(i.client.Config.Index, IdxTracks)
|
||||
res, err := i.client.Delete(idx, trackID.String(),
|
||||
i.client.Delete.WithContext(ctx),
|
||||
i.client.Delete.WithRefresh("true"),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode == 404 {
|
||||
return nil
|
||||
}
|
||||
if res.IsError() {
|
||||
return fmt.Errorf("delete track: %s", res.String())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// escapeQuery escapes special characters for ES query string
|
||||
func escapeQuery(s string) string {
|
||||
// Escape: \ " * ? < > | ( ) { } [ ] ~ ! : / +
|
||||
for _, c := range []string{`\`, `"`, `*`, `?`, `<`, `>`, `|`, `(`, `)`, `{`, `}`, `[`, `]`, `~`, `!`, `:`, `/`, `+`} {
|
||||
s = strings.ReplaceAll(s, c, `\`+c)
|
||||
}
|
||||
return s
|
||||
}
|
||||
Loading…
Reference in a new issue