Module: Familia::Features::Relationships::Indexing::RebuildStrategies

Defined in:
lib/familia/features/relationships/indexing/rebuild_strategies.rb

Overview

RebuildStrategies provides atomic index rebuild operations with zero downtime.

All rebuild strategies follow a consistent pattern:

  1. Build the index in a temporary key
  2. Process in batches, with one transaction per batch (not one for the entire rebuild)
  3. Swap the temporary key into the live key at completion (DEL, then RENAME)
  4. Report progress via callbacks throughout

This ensures:

  • Zero downtime during rebuild (live index remains available)
  • Memory efficiency (batch processing)
  • Consistent progress reporting
  • Safe failure handling (temp key abandoned on error)
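
The pattern above can be sketched without Redis or Familia. The following is a minimal illustration only: FakeStore, sketch_rebuild, the record shape, and the key names are all hypothetical stand-ins, not part of the module. It shows the stale live entry staying in place until the final rename.

```ruby
# In-memory stand-in for a key/value store; NOT the Familia or Redis API.
class FakeStore
  def initialize
    @data = {}
  end

  def hset(key, field, value)
    (@data[key] ||= {})[field] = value
  end

  def hgetall(key)
    @data.fetch(key, {}).dup
  end

  # Replaces whatever is stored at `to` with the contents of `from`.
  def rename(from, to)
    raise KeyError, "no such key: #{from}" unless @data.key?(from)

    @data[to] = @data.delete(from)
  end
end

# Rebuilds `final_key` from `records` (hashes with :id and :email).
# Assumes at least one record, so the temp key exists before the rename.
def sketch_rebuild(store, final_key, records, batch_size: 2)
  temp_key = "#{final_key}:rebuild:tmp"
  completed = 0
  records.each_slice(batch_size) do |batch|
    batch.each { |r| store.hset(temp_key, r[:email], r[:id]) }
    completed += batch.size
    yield(completed: completed, total: records.size) if block_given?
  end
  store.rename(temp_key, final_key)
  completed
end

store = FakeStore.new
store.hset("user:email_index", "stale@example.com", "u0")
records = [
  { id: "u1", email: "a@example.com" },
  { id: "u2", email: "b@example.com" },
  { id: "u3", email: "c@example.com" }
]
progress_log = []
rebuilt = sketch_rebuild(store, "user:email_index", records) do |p|
  progress_log << p[:completed]
end
```

The stale entry disappears only when the rename runs; an error raised mid-loop would leave the live key untouched and the temp key behind.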

Examples:

Via instances collection

RebuildStrategies.rebuild_via_instances(
  User,
  :email,
  :add_to_email_index,
  User.email_index,
  batch_size: 100
) { |progress| puts "Processed: #{progress[:completed]}/#{progress[:total]}" }

Via participation relationship

RebuildStrategies.rebuild_via_participation(
  company,
  Employee,
  :department,
  :add_to_company_dept_index,
  company.employees_collection,
  :unique,
  company.dept_index,
  batch_size: 100
)

Via SCAN (fallback for complex scenarios)

RebuildStrategies.rebuild_via_scan(
  User,
  :email,
  :add_to_email_index,
  User.email_index,
  batch_size: 100
)

Class Method Details

.atomic_swap(temp_key, final_key, redis) ⇒ Object

Performs atomic swap of temp key to final key.

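The swap runs two commands in order:

  1. DEL final_key (remove old index)
  2. RENAME temp_key final_key (move the rebuilt index into place)

RENAME itself is atomic, so readers never observe a half-written index. This is intended to guard against:

  • Partial updates
  • Race conditions
  • Stale data visibility

DEL and RENAME are issued as two separate commands, so the live key is briefly absent between them. If the temp key does not exist (empty result set), only the DEL runs and the live index is cleared.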

Parameters:

  • temp_key (String)

    The temporary key containing rebuilt index

  • final_key (String)

    The live index key

  • redis (Redis)

    The Redis connection
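
The two code paths described above (swap when the temp key exists, clear when it does not) can be traced with a small in-memory stand-in. This is a sketch under stated assumptions: TinyStore and sketch_swap are hypothetical names that only mimic the exists, del, and rename calls; they are not the Redis client.

```ruby
# Hypothetical in-memory store mimicking the three commands used by the swap.
class TinyStore
  def initialize(data = {})
    @data = data
  end

  def exists(key)
    @data.key?(key) ? 1 : 0
  end

  def del(key)
    return 0 unless @data.key?(key)

    @data.delete(key)
    1
  end

  def rename(from, to)
    raise ArgumentError, "no such key" unless @data.key?(from)

    @data[to] = @data.delete(from)
    "OK"
  end

  def get(key)
    @data[key]
  end
end

# Mirrors the documented flow: clear the live key, then rename if possible.
def sketch_swap(store, temp_key, final_key)
  if store.exists(temp_key).zero?
    store.del(final_key) # empty rebuild: the live index is simply cleared
    return :cleared
  end

  store.del(final_key)
  store.rename(temp_key, final_key)
  :swapped
end

store = TinyStore.new(
  "idx" => { "old" => "1" },
  "idx:rebuild:1" => { "new" => "2" }
)
first = sketch_swap(store, "idx:rebuild:1", "idx")
after_first = store.get("idx")
second = sketch_swap(store, "idx:rebuild:2", "idx")
after_second = store.get("idx")
```

Note that a rebuild producing no entries clears the live index rather than leaving the previous contents in place.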



# File 'lib/familia/features/relationships/indexing/rebuild_strategies.rb', line 450

def atomic_swap(temp_key, final_key, redis)
  # Check if temp key exists first - RENAME fails on non-existent keys
  unless redis.exists(temp_key) > 0
    Familia.info "[Rebuild] No temp key to swap (empty result set)"
    # Just ensure final key is cleared
    redis.del(final_key)
    return
  end

  # Atomic swap: DEL final key, then RENAME temp -> final
  # RENAME is already atomic, so we just need to clear the final key first
  redis.del(final_key)
  redis.rename(temp_key, final_key)
  Familia.info "[Rebuild] Atomic swap completed: #{temp_key} -> #{final_key}"
rescue Redis::CommandError => e
  # If temp key doesn't exist, just log and return (already handled above)
  if e.message.include?("no such key")
    Familia.info "[Rebuild] Temp key vanished during swap (concurrent operation?)"
    return
  end

  # For other errors, preserve temp key for debugging
  Familia.warn "[Rebuild] Atomic swap failed: #{e.message}"
  Familia.warn "[Rebuild] Temp key preserved for debugging: #{temp_key}"
  raise
end

.build_temp_key(base_key) ⇒ String

Builds a temporary key name for atomic swaps

Parameters:

  • base_key (String)

    The final index key

Returns:

  • (String)

    Temporary key of the form base_key:rebuild:<timestamp>, where the timestamp is in whole seconds
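
A minimal sketch of the naming scheme, assuming the clock value behaves like a Ruby Time (sketch_temp_key and the sample key are hypothetical names used for illustration). Because the suffix is truncated to whole seconds, two rebuilds of the same index started within one second would share a temp key.

```ruby
# Hypothetical stand-in for build_temp_key that takes the clock as an argument.
def sketch_temp_key(base_key, now = Time.now)
  "#{base_key}:rebuild:#{now.to_i}"
end

fixed = Time.at(1_700_000_000)
key_a = sketch_temp_key("user:email_index", fixed)
key_b = sketch_temp_key("user:email_index", fixed + 0.4) # same second
key_c = sketch_temp_key("user:email_index", fixed + 1)   # next second
```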



# File 'lib/familia/features/relationships/indexing/rebuild_strategies.rb', line 430

def build_temp_key(base_key)
  timestamp = Familia.now.to_i
  "#{base_key}:rebuild:#{timestamp}"
end

.process_scan_batch(keys, indexed_class, field, temp_key, index_hashkey, scope_instance) ⇒ Integer

Processes a batch of keys from SCAN (module_function helper)

Parameters:

  • keys (Array<String>)

    Array of Redis keys

  • indexed_class (Class)

    The model class

  • field (Symbol)

    The field to index

  • temp_key (String)

    The temporary index key

  • index_hashkey (Object)

    The index hash key; its serialize_value method produces the value stored for each object

  • scope_instance (Object, nil)

    Optional scope instance. If provided, only objects belonging to this scope will be indexed.

Returns:

  • (Integer)

    Number of objects indexed in this batch (0 if the batch raised an error)
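
The per-object rule used when filling the temp index (skip nil and blank field values, store everything else under the stringified value) can be sketched in isolation. Record and sketch_index_batch are hypothetical; a plain Hash stands in for the temp key, and the object id stands in for the serialized value.

```ruby
Record = Struct.new(:id, :email)

# Builds a field-value => id mapping, skipping nil and whitespace-only values.
def sketch_index_batch(objects, field)
  index = {}
  objects.each do |obj|
    value = obj.public_send(field)
    next unless value && !value.to_s.strip.empty?

    index[value.to_s] = obj.id
  end
  index
end

batch = [
  Record.new("u1", "a@example.com"),
  Record.new("u2", nil),
  Record.new("u3", "   "),
  Record.new("u4", "d@example.com")
]
index = sketch_index_batch(batch, :email)
```

Only u1 and u4 end up in the mapping, which is why the returned count can be lower than the number of keys in the batch.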



# File 'lib/familia/features/relationships/indexing/rebuild_strategies.rb', line 389

def process_scan_batch(keys, indexed_class, field, temp_key, index_hashkey, scope_instance)
  # Load objects by keys
  objects = indexed_class.load_multi_by_keys(keys).compact

  # For instance-scoped indexes, filter objects by scope
  if scope_instance
    # Get the participation collection for this scope
    participation = indexed_class.participation_relationships.find do |rel|
      rel.target_class == scope_instance.class
    end

    if participation
      collection_name = participation.collection_name
      scope_collection = scope_instance.send(collection_name)
      # Filter to only objects that belong to this scope
      objects = objects.select { |obj| scope_collection.member?(obj) }
    end
  end

  # Transaction per batch
  batch_indexed = 0
  indexed_class.transaction do |tx|
    objects.each do |obj|
      value = obj.send(field)
      next unless value && !value.to_s.strip.empty?

      tx.hset(temp_key, value.to_s, index_hashkey.serialize_value(obj))
      batch_indexed += 1
    end
  end
  batch_indexed
rescue StandardError => e
  Familia.warn "[Rebuild] Error processing batch: #{e.message}"
  0
end

.rebuild_via_instances(indexed_class, field, add_method, index_hashkey, batch_size: 100) {|Hash| ... } ⇒ Integer

Rebuilds index by loading objects from ModelClass.instances sorted set.

This is the preferred strategy for models with class-level indexes that maintain an instances collection. It's efficient because:

  • Direct access to all object identifiers via ZRANGE
  • Bulk loading via load_multi
  • No key pattern matching required

Process:

  1. Enumerate identifiers from ModelClass.instances.members
  2. Load objects in batches via load_multi(identifiers).compact
  3. Build temp index via transactions (one per batch)
  4. Swap temp -> final key (DEL, then RENAME)

Examples:

Rebuild user email index

count = RebuildStrategies.rebuild_via_instances(
  User,
  :email,
  :add_to_email_index,
  User.email_index,
  batch_size: 100
) { |p| puts "#{p[:completed]}/#{p[:total]} (#{p[:rate]}/s)" }

Parameters:

  • indexed_class (Class)

    The model class being indexed (e.g., User)

  • field (Symbol)

    The field to index (e.g., :email)

  • add_method (Symbol)

    The mutation method to call (e.g., :add_to_email_index)

  • index_hashkey (Object)

    The class-level index (e.g., User.email_index); the method re-derives it from add_method before use

  • batch_size (Integer) (defaults to: 100)

    Number of objects per batch (default: 100)

Yields:

  • (Hash)

    Progress info: completed:, total:, rate:, elapsed:

Returns:

  • (Integer)

    Number of objects indexed (objects with a nil or blank field value are skipped)
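
The index accessor name is derived from add_method by stripping a known prefix. The sketch below applies the same regular expression that appears in the method source to a few method names; the constant and helper names are hypothetical.

```ruby
# Same pattern as in the method source; the constant name is illustrative.
CLASS_LEVEL_PREFIX = /^(add_to|update_in|remove_from)_(class_)?/

def sketch_index_name(add_method)
  add_method.to_s.gsub(CLASS_LEVEL_PREFIX, '')
end

plain   = sketch_index_name(:add_to_email_index)
classed = sketch_index_name(:add_to_class_email_index)
removal = sketch_index_name(:remove_from_email_index)
```

All three resolve to the same accessor name, so the class must respond to email_index or the rebuild raises ArgumentError.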



# File 'lib/familia/features/relationships/indexing/rebuild_strategies.rb', line 79

def rebuild_via_instances(indexed_class, field, add_method, index_hashkey, batch_size: 100, &progress)
  unless indexed_class.respond_to?(:instances)
    raise ArgumentError, "#{indexed_class.name} does not have an instances collection"
  end

  instances = indexed_class.instances
  total = instances.size
  start_time = Familia.now

  Familia.info "[Rebuild] Starting via_instances for #{indexed_class.name}.#{field} (#{total} objects)"

  # Determine the final index key by examining the class-level index
  # Extract index name from add_method (e.g., add_to_email_index -> email_index)
  # or add_to_class_email_index -> email_index
  index_name = add_method.to_s.gsub(/^(add_to|update_in|remove_from)_(class_)?/, '')

  # Access the class-level index directly
  unless indexed_class.respond_to?(index_name)
    raise ArgumentError, "#{indexed_class.name} does not have index accessor: #{index_name}"
  end

  index_hashkey = indexed_class.send(index_name)
  final_key = index_hashkey.dbkey
  temp_key = RebuildStrategies.build_temp_key(final_key)

  processed = 0
  indexed_count = 0

  # Process in batches - use members to get deserialized identifiers
  instances.members.each_slice(batch_size) do |identifiers|
    # Bulk load objects, filtering out nils (deleted/missing objects)
    objects = indexed_class.load_multi(identifiers).compact

    # Transaction per batch (NOT entire rebuild)
    batch_indexed = 0
    indexed_class.transaction do |tx|
      objects.each do |obj|
        value = obj.send(field)
        # Skip nil/empty field values gracefully
        next unless value && !value.to_s.strip.empty?

        # For class-level indexes, use HSET with serialized value for consistency
        tx.hset(temp_key, value.to_s, index_hashkey.serialize_value(obj))
        batch_indexed += 1
      end
    end

    processed += identifiers.size
    indexed_count += batch_indexed
    elapsed = Familia.now - start_time
    rate = processed / elapsed

    progress&.call(
      completed: processed,
      total: total,
      rate: rate.round(2),
      elapsed: elapsed.round(2)
    )
  end

  # Atomic swap: temp -> final (ZERO DOWNTIME)
  RebuildStrategies.atomic_swap(temp_key, final_key, indexed_class.dbclient)

  elapsed = Familia.now - start_time
  Familia.info "[Rebuild] Completed via_instances: #{indexed_count} indexed (#{processed} total) in #{elapsed.round(2)}s"

  indexed_count
end

.rebuild_via_participation(scope_instance, indexed_class, field, add_method, collection, cardinality, index_hashkey, batch_size: 100) {|Hash| ... } ⇒ Integer

Rebuilds index by loading objects from a participation collection.

This strategy is for instance-scoped indexes where objects participate in a parent's collection (e.g., employees in company.employees_collection).

Process:

  1. Enumerate members from collection (SortedSet, UnsortedSet, or ListKey)
  2. Load objects in batches via load_multi(identifiers).compact
  3. Build temp index via transactions (one per batch)
  4. Swap temp -> final key (DEL, then RENAME)

Examples:

Rebuild company badge index

count = RebuildStrategies.rebuild_via_participation(
  company,
  Employee,
  :badge_number,
  :add_to_company_badge_index,
  company.employees_collection,
  :unique,
  company.badge_index,
  batch_size: 100
)

Parameters:

  • scope_instance (Object)

    The parent instance providing scope (e.g., company)

  • indexed_class (Class)

    The model class being indexed (e.g., Employee)

  • field (Symbol)

    The field to index (e.g., :badge_number)

  • add_method (Symbol)

    The mutation method (e.g., :add_to_company_badge_index)

  • collection (DataType)

    The collection containing members (SortedSet/UnsortedSet/ListKey)

  • cardinality (Symbol)

    The index cardinality (:unique or :multi) - must be :unique

  • index_hashkey (Object)

    The instance-scoped index (e.g., company.badge_index); its serialize_value method produces the value stored for each object

  • batch_size (Integer) (defaults to: 100)

    Number of objects per batch (default: 100)

Yields:

  • (Hash)

    Progress info: completed:, total:, rate:, elapsed:

Returns:

  • (Integer)

    Number of objects indexed (objects with a nil or blank field value are skipped)
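
Two preconditions shape this strategy: the cardinality must be :unique, and the index accessor name is found by removing the add_to_{scope config name}_ prefix from add_method. The sketch below illustrates both with hypothetical helper names and assumed config names ("company", "test_company").

```ruby
# Strips "add_to_<scope config name>_" from the front of the method name.
def sketch_scoped_index_name(add_method, scope_config_name)
  prefix = "add_to_#{scope_config_name}_"
  add_method.to_s.sub(/\A#{Regexp.escape(prefix)}/, '')
end

# Rejects anything other than a unique index, as the strategy does.
def sketch_check_cardinality!(cardinality)
  return if cardinality == :unique

  raise ArgumentError, "only unique indexes are supported, got #{cardinality.inspect}"
end

short_name = sketch_scoped_index_name(:add_to_company_badge_index, "company")
long_name  = sketch_scoped_index_name(:add_to_test_company_badge_index, "test_company")

multi_rejected =
  begin
    sketch_check_cardinality!(:multi)
    false
  rescue ArgumentError
    true
  end
```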



# File 'lib/familia/features/relationships/indexing/rebuild_strategies.rb', line 180

def rebuild_via_participation(scope_instance, indexed_class, field, add_method, collection, cardinality, index_hashkey, batch_size: 100, &progress)
  total = collection.size
  start_time = Familia.now

  scope_class = scope_instance.class.name
  Familia.info "[Rebuild] Starting via_participation for #{scope_class}##{indexed_class.name}.#{field} (#{total} objects)"

  # Guard: This method only supports unique indexes
  if cardinality != :unique
    raise ArgumentError, <<~ERROR.strip
      rebuild_via_participation only supports unique indexes (cardinality: :unique)
      Received cardinality: #{cardinality.inspect} for field: #{field}

      Multi-indexes require field-value-specific keys and use specialized 4-phase rebuild logic.
      Use the dedicated rebuild method generated on the scope instance instead.
    ERROR
  end

  # Build temp key for the unique index.
  #
  # Extract index name from add_method. The add_method follows the pattern:
  #   add_to_{scope_class_config}_{index_name}
  #
  # For example:
  #   add_to_test_company_badge_index -> badge_index
  #   add_to_company_badge_index -> badge_index
  #
  # We need to remove the "add_to_{scope_class_config}_" prefix.
  scope_class_config = scope_instance.class.config_name
  prefix = "add_to_#{scope_class_config}_"
  index_name = add_method.to_s.gsub(/^#{Regexp.escape(prefix)}/, '')

  # Get the actual index accessor from the scope instance to derive the correct key.
  # This ensures we use the same dbkey as the actual index DataType.
  unless scope_instance.respond_to?(index_name)
    raise ArgumentError, "#{scope_instance.class} does not have index accessor: #{index_name}"
  end

  index_datatype = scope_instance.send(index_name)
  final_key = index_datatype.dbkey
  temp_key = RebuildStrategies.build_temp_key(final_key)

  processed = 0
  indexed_count = 0

  # Process in batches - use members to get deserialized identifiers
  collection.members.each_slice(batch_size) do |identifiers|
    objects = indexed_class.load_multi(identifiers).compact

    # Transaction per batch
    batch_indexed = 0
    scope_instance.transaction do |tx|
      objects.each do |obj|
        value = obj.send(field)
        next unless value && !value.to_s.strip.empty?

        # For unique index: HSET temp_key field_value serialized_identifier
        # For multi-index: SADD temp_key:field_value identifier
        tx.hset(temp_key, value.to_s, index_hashkey.serialize_value(obj))
        batch_indexed += 1
      end
    end

    processed += identifiers.size
    indexed_count += batch_indexed
    elapsed = Familia.now - start_time
    rate = processed / elapsed

    progress&.call(
      completed: processed,
      total: total,
      rate: rate.round(2),
      elapsed: elapsed.round(2)
    )
  end

  # Atomic swap
  RebuildStrategies.atomic_swap(temp_key, final_key, scope_instance.dbclient)

  elapsed = Familia.now - start_time
  Familia.info "[Rebuild] Completed via_participation: #{indexed_count} indexed (#{processed} total) in #{elapsed.round(2)}s"

  indexed_count
end

.rebuild_via_scan(indexed_class, field, add_method, index_hashkey, scope_instance: nil, batch_size: 100) {|Hash| ... } ⇒ Integer

Rebuilds index by scanning all keys matching a pattern.

This is the fallback strategy when:

  • No instances collection available
  • No participation relationship
  • Need to rebuild from raw keys

Uses SCAN (not KEYS) for memory-efficient iteration. Filters by scope if scope_instance provided.

Process:

  1. Iterate keys with redis.scan_each(match: pattern, count: batch_size)
  2. Load objects in batches via load_multi_by_keys
  3. Filter loaded objects by scope_instance if provided
  4. Build temp index via transactions (one per batch)
  5. Swap temp -> final key (DEL, then RENAME)

Examples:

Rebuild without instances collection

count = RebuildStrategies.rebuild_via_scan(
  User,
  :email,
  :add_to_email_index,
  User.email_index,
  batch_size: 100
)

Parameters:

  • indexed_class (Class)

    The model class being indexed

  • field (Symbol)

    The field to index

  • add_method (Symbol)

    The mutation method

  • index_hashkey (Object)

    The index hash key; the method re-derives it from add_method (and scope_instance, if given) before use

  • scope_instance (Object, nil) (defaults to: nil)

    Optional scope for filtering

  • batch_size (Integer) (defaults to: 100)

    SCAN COUNT hint and number of keys per processing batch (default: 100)

Yields:

  • (Hash)

    Progress info: completed:, scanned:, rate:, elapsed: (called after each full batch; not called for the final partial batch)

Returns:

  • (Integer)

    Number of objects indexed (objects with a nil or blank field value are skipped)
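
SCAN yields keys one at a time, so the strategy accumulates them into fixed-size batches and flushes whatever remains at the end. A minimal sketch of that accumulation, with an Array standing in for the scan_each enumerator; sketch_each_batch and the key format are hypothetical.

```ruby
# Groups a stream of keys into batches of `batch_size`, plus a final remainder.
def sketch_each_batch(keys, batch_size)
  batches = []
  batch = []
  keys.each do |key|
    batch << key
    next if batch.size < batch_size

    batches << batch.dup
    batch.clear
  end
  batches << batch.dup unless batch.empty?
  batches
end

keys = (1..5).map { |i| "user:#{i}:object" }
batches = sketch_each_batch(keys, 2)
batch_sizes = batches.map(&:size)
```

With five keys and a batch size of two, the last batch holds a single key; in the real method that remainder is processed after the scan loop ends.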



# File 'lib/familia/features/relationships/indexing/rebuild_strategies.rb', line 298

def rebuild_via_scan(indexed_class, field, add_method, index_hashkey, scope_instance: nil, batch_size: 100, &progress)
  start_time = Familia.now

  # Build key pattern for SCAN
  # For instance-scoped indexes, we still scan all objects of indexed_class
  # (not scoped under parent), then filter by scope during processing
  # Use centralized scan_pattern method for consistent key generation
  pattern = indexed_class.scan_pattern

  Familia.info "[Rebuild] Starting via_scan for #{indexed_class.name}.#{field} (pattern: #{pattern})"
  Familia.warn "[Rebuild] Using SCAN fallback - consider adding instances collection for better performance"

  # Determine final key by examining the index
  # Extract index name from add_method (e.g., add_to_class_email_index -> email_index)
  # For instance-scoped: add_to_rebuild_test_company_badge_index -> badge_index
  index_name = add_method.to_s.gsub(/^(add_to|update_in|remove_from)_(class_)?/, '')

  # Strip scope class config prefix if present (e.g., rebuild_test_company_badge_index -> badge_index)
  # For instance-scoped indexes, the index lives on scope_instance, not indexed_class
  if scope_instance
    scope_config = scope_instance.class.config_name
    index_name = index_name.gsub(/^#{scope_config}_/, '')
  end

  # For instance-scoped indexes, check scope_instance for accessor
  # For class-level indexes, check indexed_class
  index_owner = scope_instance || indexed_class
  unless index_owner.respond_to?(index_name)
    raise ArgumentError, "#{index_owner.class.name} does not have index accessor: #{index_name}"
  end

  index_hashkey = index_owner.send(index_name)
  final_key = index_hashkey.dbkey
  temp_key = RebuildStrategies.build_temp_key(final_key)

  processed = 0
  indexed_count = 0
  scanned = 0
  redis = indexed_class.dbclient

  # Use SCAN (not KEYS) for memory efficiency
  batch = []
  redis.scan_each(match: pattern, count: batch_size) do |key|
    batch << key
    scanned += 1

    # Process in batches
    if batch.size >= batch_size
      batch_indexed = RebuildStrategies.process_scan_batch(batch, indexed_class, field, temp_key, index_hashkey, scope_instance)
      processed += batch.size
      indexed_count += batch_indexed

      elapsed = Familia.now - start_time
      rate = processed / elapsed

      progress&.call(
        completed: processed,
        scanned: scanned,
        rate: rate.round(2),
        elapsed: elapsed.round(2)
      )

      batch.clear
    end
  end

  # Process remaining batch
  unless batch.empty?
    batch_indexed = RebuildStrategies.process_scan_batch(batch, indexed_class, field, temp_key, index_hashkey, scope_instance)
    processed += batch.size
    indexed_count += batch_indexed
  end

  # Atomic swap
  RebuildStrategies.atomic_swap(temp_key, final_key, redis)

  elapsed = Familia.now - start_time
  Familia.info "[Rebuild] Completed via_scan: #{indexed_count} indexed (#{processed} total) in #{elapsed.round(2)}s (scanned: #{scanned})"

  indexed_count
end