package elasticsearch import ( "bytes" "context" "encoding/json" "fmt" "strings" "github.com/elastic/go-elasticsearch/v8/esutil" "github.com/google/uuid" "github.com/lib/pq" "go.uber.org/zap" "gorm.io/gorm" ) // Indexer syncs PostgreSQL data to Elasticsearch (F361 Phase 3) type Indexer struct { client *Client db *gorm.DB logger *zap.Logger } // NewIndexer creates an indexer func NewIndexer(client *Client, db *gorm.DB, logger *zap.Logger) *Indexer { return &Indexer{client: client, db: db, logger: logger} } // trackDoc is the ES document for a track type trackDoc struct { ID string `json:"id"` Title string `json:"title"` Artist string `json:"artist"` Description string `json:"description"` Album string `json:"album"` Genre []string `json:"genre"` // genre slugs from track_genres Tags []string `json:"tags"` CreatedAt string `json:"created_at"` CoverArtPath string `json:"cover_art_path,omitempty"` StreamManifestURL string `json:"stream_manifest_url,omitempty"` FilePath string `json:"file_path,omitempty"` } // userDoc is the ES document for a user type userDoc struct { ID string `json:"id"` Username string `json:"username"` DisplayName string `json:"display_name"` Bio string `json:"bio"` Location string `json:"location"` Avatar string `json:"avatar,omitempty"` CreatedAt string `json:"created_at"` } // playlistDoc is the ES document for a playlist type playlistDoc struct { ID string `json:"id"` Name string `json:"name"` Description string `json:"description"` Visibility string `json:"visibility"` CoverURL string `json:"cover_url,omitempty"` CreatedAt string `json:"created_at"` } // ReindexAll performs full reindex of tracks, users, playlists (F361 Phase 3.1) func (i *Indexer) ReindexAll(ctx context.Context) error { if i.client == nil { return nil } if err := i.client.EnsureIndices(ctx); err != nil { return fmt.Errorf("ensure indices: %w", err) } if err := i.reindexTracks(ctx); err != nil { return fmt.Errorf("reindex tracks: %w", err) } if err := i.reindexUsers(ctx); err != nil { return fmt.Errorf("reindex users: %w", err) } if err := i.reindexPlaylists(ctx); err != nil { return fmt.Errorf("reindex playlists: %w", err) } if i.logger != nil { i.logger.Info("Elasticsearch reindex completed") } return nil } func (i *Indexer) reindexTracks(ctx context.Context) error { idx := indexName(i.client.Config.Index, IdxTracks) type row struct { ID uuid.UUID Title string Artist string Description string Album string Genre string Tags pq.StringArray CreatedAt string CoverArtPath string StreamManifestURL string FilePath string GenreSlugs pq.StringArray } var rows []row // Subquery for genre_slugs; tracks.tags is denormalized err := i.db.WithContext(ctx).Raw(` SELECT t.id, t.title, COALESCE(t.artist,'') as artist, COALESCE(t.description,'') as description, COALESCE(t.album,'') as album, COALESCE(t.genre,'') as genre, COALESCE(t.tags,'{}') as tags, t.created_at::text, COALESCE(t.cover_art_path,'') as cover_art_path, COALESCE(t.stream_manifest_url,'') as stream_manifest_url, COALESCE(t.file_path,'') as file_path, COALESCE((SELECT array_agg(tg.genre_slug ORDER BY tg.position) FROM track_genres tg WHERE tg.track_id = t.id), '{}') as genre_slugs FROM tracks t WHERE t.deleted_at IS NULL AND t.status = 'active' `).Scan(&rows).Error if err != nil { return err } bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Client: i.client.Client, Index: idx, NumWorkers: 2, }) if err != nil { return err } for _, r := range rows { genres := []string(r.GenreSlugs) if len(genres) == 0 && r.Genre != "" { genres = []string{r.Genre} } doc := trackDoc{ ID: r.ID.String(), Title: r.Title, Artist: r.Artist, Description: r.Description, Album: r.Album, Genre: genres, Tags: []string(r.Tags), CreatedAt: r.CreatedAt, CoverArtPath: r.CoverArtPath, StreamManifestURL: r.StreamManifestURL, FilePath: r.FilePath, } body, _ := json.Marshal(doc) err := bi.Add(ctx, esutil.BulkIndexerItem{ Action: "index", DocumentID: r.ID.String(), Body: bytes.NewReader(body), }) if err != nil { return err } } if err := bi.Close(ctx); err != nil { return err } if i.logger != nil { i.logger.Info("Elasticsearch tracks indexed", zap.Int("count", len(rows))) } return nil } func (i *Indexer) reindexUsers(ctx context.Context) error { idx := indexName(i.client.Config.Index, IdxUsers) type row struct { ID uuid.UUID Username string DisplayName string Bio string Location string Avatar string CreatedAt string } var rows []row err := i.db.WithContext(ctx).Raw(` SELECT u.id, u.username, COALESCE(u.display_name, u.username) as display_name, COALESCE(COALESCE(up.bio, u.bio), '') as bio, COALESCE(COALESCE(up.location, u.location), '') as location, COALESCE(COALESCE(up.avatar_url, u.avatar), '') as avatar, u.created_at::text FROM users u LEFT JOIN user_profiles up ON up.user_id = u.id WHERE u.deleted_at IS NULL `).Scan(&rows).Error if err != nil { err = i.db.WithContext(ctx).Raw(` SELECT id, username, COALESCE(display_name, username) as display_name, COALESCE(bio,'') as bio, COALESCE(location,'') as location, COALESCE(avatar,'') as avatar, created_at::text FROM users WHERE deleted_at IS NULL `).Scan(&rows).Error if err != nil { return err } } bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Client: i.client.Client, Index: idx, NumWorkers: 2, }) if err != nil { return err } for _, r := range rows { doc := userDoc{ ID: r.ID.String(), Username: r.Username, DisplayName: r.DisplayName, Bio: r.Bio, Location: r.Location, Avatar: r.Avatar, CreatedAt: r.CreatedAt, } body, _ := json.Marshal(doc) err := bi.Add(ctx, esutil.BulkIndexerItem{ Action: "index", DocumentID: r.ID.String(), Body: bytes.NewReader(body), }) if err != nil { return err } } if err := bi.Close(ctx); err != nil { return err } if i.logger != nil { i.logger.Info("Elasticsearch users indexed", zap.Int("count", len(rows))) } return nil } func (i *Indexer) reindexPlaylists(ctx context.Context) error { idx := indexName(i.client.Config.Index, IdxPlaylists) type row struct { ID uuid.UUID Name string Description string Visibility string CoverURL string CreatedAt string } var rows []row err := i.db.WithContext(ctx).Raw(` SELECT id, COALESCE(name, title, '') as name, COALESCE(description,'') as description, COALESCE(visibility::text, 'public') as visibility, COALESCE(cover_url,'') as cover_url, created_at::text FROM playlists WHERE deleted_at IS NULL AND (is_public = true OR visibility = 'public') `).Scan(&rows).Error if err != nil { return err } bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Client: i.client.Client, Index: idx, NumWorkers: 2, }) if err != nil { return err } for _, r := range rows { doc := playlistDoc{ ID: r.ID.String(), Name: r.Name, Description: r.Description, Visibility: r.Visibility, CoverURL: r.CoverURL, CreatedAt: r.CreatedAt, } body, _ := json.Marshal(doc) err := bi.Add(ctx, esutil.BulkIndexerItem{ Action: "index", DocumentID: r.ID.String(), Body: bytes.NewReader(body), }) if err != nil { return err } } if err := bi.Close(ctx); err != nil { return err } if i.logger != nil { i.logger.Info("Elasticsearch playlists indexed", zap.Int("count", len(rows))) } return nil } // IndexTrack indexes a single track (hook after create/update) func (i *Indexer) IndexTrack(ctx context.Context, trackID uuid.UUID) error { if i.client == nil { return nil } var r struct { ID uuid.UUID Title string Artist string Description string Album string Genre string Tags pq.StringArray CreatedAt string CoverArtPath string StreamManifestURL string FilePath string GenreSlugs pq.StringArray } err := i.db.WithContext(ctx).Raw(` SELECT t.id, t.title, COALESCE(t.artist,'') as artist, COALESCE(t.description,'') as description, COALESCE(t.album,'') as album, COALESCE(t.genre,'') as genre, COALESCE(t.tags,'{}') as tags, t.created_at::text, COALESCE(t.cover_art_path,'') as cover_art_path, COALESCE(t.stream_manifest_url,'') as stream_manifest_url, COALESCE(t.file_path,'') as file_path, COALESCE((SELECT array_agg(tg.genre_slug ORDER BY tg.position) FROM track_genres tg WHERE tg.track_id = t.id), '{}') as genre_slugs FROM tracks t WHERE t.id = ? AND t.deleted_at IS NULL `, trackID).Scan(&r).Error if err != nil { return err } genres := []string(r.GenreSlugs) if len(genres) == 0 && r.Genre != "" { genres = []string{r.Genre} } doc := trackDoc{ ID: r.ID.String(), Title: r.Title, Artist: r.Artist, Description: r.Description, Album: r.Album, Genre: genres, Tags: []string(r.Tags), CreatedAt: r.CreatedAt, CoverArtPath: r.CoverArtPath, StreamManifestURL: r.StreamManifestURL, FilePath: r.FilePath, } body, _ := json.Marshal(doc) idx := indexName(i.client.Config.Index, IdxTracks) res, err := i.client.Index(idx, bytes.NewReader(body), i.client.Index.WithContext(ctx), i.client.Index.WithDocumentID(trackID.String()), i.client.Index.WithRefresh("true"), ) if err != nil { return err } defer res.Body.Close() if res.IsError() { return fmt.Errorf("index track: %s", res.String()) } return nil } // DeleteTrack removes a track from the index func (i *Indexer) DeleteTrack(ctx context.Context, trackID uuid.UUID) error { if i.client == nil { return nil } idx := indexName(i.client.Config.Index, IdxTracks) res, err := i.client.Delete(idx, trackID.String(), i.client.Delete.WithContext(ctx), i.client.Delete.WithRefresh("true"), ) if err != nil { return err } defer res.Body.Close() if res.StatusCode == 404 { return nil } if res.IsError() { return fmt.Errorf("delete track: %s", res.String()) } return nil } // escapeQuery escapes special characters for ES query string func escapeQuery(s string) string { // Escape: \ " * ? < > | ( ) { } [ ] ~ ! : / + for _, c := range []string{`\`, `"`, `*`, `?`, `<`, `>`, `|`, `(`, `)`, `{`, `}`, `[`, `]`, `~`, `!`, `:`, `/`, `+`} { s = strings.ReplaceAll(s, c, `\`+c) } return s }