Module: Familia::Features::Relationships::Indexing::RebuildStrategies
Defined in: lib/familia/features/relationships/indexing/rebuild_strategies.rb
Overview
RebuildStrategies provides atomic index rebuild operations with zero downtime.
All rebuild strategies follow a consistent pattern:
- Build index in temporary key
- Batch processing with transactions per batch (not entire rebuild)
- Swap of the temp key into place at completion (DEL of the final key, then RENAME, via .atomic_swap)
- Progress callbacks throughout
This ensures:
- Zero downtime during rebuild (live index remains available)
- Memory efficiency (batch processing)
- Consistent progress reporting
- Safe failure handling (temp key abandoned on error)
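The pattern above can be sketched without Redis. This is a minimal illustration, assuming a plain Hash (`store`) as a stand-in for the database and a hypothetical `rebuild_index` helper; neither is part of Familia, and the "transaction" is only simulated by staging each batch before merging it.

```ruby
# In-memory sketch of: build in temp key -> process per batch -> swap.
# `store` is a plain Hash standing in for Redis (an assumption for illustration).
def rebuild_index(store, final_key, records, batch_size: 2)
  temp_key = "#{final_key}:rebuild:#{Time.now.to_i}"
  processed = 0

  records.each_slice(batch_size) do |batch|
    # One simulated "transaction" per batch: stage, then merge into the temp key.
    staged = {}
    batch.each do |record|
      value = record[:email]
      next if value.nil? || value.to_s.strip.empty?

      staged[value.to_s] = record[:id]
    end
    (store[temp_key] ||= {}).merge!(staged)

    processed += batch.size
    yield(completed: processed, total: records.size) if block_given?
  end

  # The live key is only touched here, once the temp index is complete.
  rebuilt = store.delete(temp_key)
  store.delete(final_key)
  store[final_key] = rebuilt if rebuilt
  processed
end

store = { "user:email_index" => { "stale@example.com" => "u0" } }
records = [
  { id: "u1", email: "a@example.com" },
  { id: "u2", email: "" },
  { id: "u3", email: "c@example.com" }
]
progress = []
rebuild_index(store, "user:email_index", records) { |p| progress << p[:completed] }
```

If an error is raised part-way through, the final key has not been modified; only the temp key holds partial data.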
Class Method Summary
- .atomic_swap(temp_key, final_key, redis) ⇒ Object
  Performs atomic swap of temp key to final key.
- .build_temp_key(base_key) ⇒ String
  Builds a temporary key name for atomic swaps.
- .process_scan_batch(keys, indexed_class, field, temp_key, index_hashkey, scope_instance) ⇒ Integer
  Processes a batch of keys from SCAN (module_function helper).
- .rebuild_via_instances(indexed_class, field, add_method, index_hashkey, batch_size: 100) {|Hash| ... } ⇒ Integer
  Rebuilds index by loading objects from ModelClass.instances sorted set.
- .rebuild_via_participation(scope_instance, indexed_class, field, add_method, collection, cardinality, index_hashkey, batch_size: 100) {|Hash| ... } ⇒ Integer
  Rebuilds index by loading objects from a participation collection.
- .rebuild_via_scan(indexed_class, field, add_method, index_hashkey, scope_instance: nil, batch_size: 100) {|Hash| ... } ⇒ Integer
  Rebuilds index by scanning all keys matching a pattern.
Class Method Details
.atomic_swap(temp_key, final_key, redis) ⇒ Object
Performs atomic swap of temp key to final key.
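This is the final step of every rebuild strategy:
- DEL final_key (remove old index)
- RENAME temp_key final_key (move the rebuilt index into place)
The old index stays queryable while the temp key is being built, and RENAME itself is atomic, so readers never see a half-built index. This guards against:
- Partial updates
- Race conditions
- Stale data visibility
DEL and RENAME are issued as two separate commands, so final_key is briefly absent between them. If temp_key does not exist (nothing was indexed), final_key is deleted and no RENAME is attempted.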
# File 'lib/familia/features/relationships/indexing/rebuild_strategies.rb', line 450

def atomic_swap(temp_key, final_key, redis)
  # Check if temp key exists first - RENAME fails on non-existent keys
  unless redis.exists(temp_key) > 0
    Familia.info "[Rebuild] No temp key to swap (empty result set)"
    # Just ensure final key is cleared
    redis.del(final_key)
    return
  end

  # Atomic swap: DEL final key, then RENAME temp -> final
  # RENAME is already atomic, so we just need to clear the final key first
  redis.del(final_key)
  redis.rename(temp_key, final_key)
  Familia.info "[Rebuild] Atomic swap completed: #{temp_key} -> #{final_key}"
rescue Redis::CommandError => e
  # If temp key doesn't exist, just log and return (already handled above)
  if e.message.include?("no such key")
    Familia.info "[Rebuild] Temp key vanished during swap (concurrent operation?)"
    return
  end

  # For other errors, preserve temp key for debugging
  Familia.warn "[Rebuild] Atomic swap failed: #{e.message}"
  Familia.warn "[Rebuild] Temp key preserved for debugging: #{temp_key}"
  raise
end
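The two branches of the swap (temp key present, temp key absent) can be exercised against an in-memory stand-in. `FakeRedis` and `swap` below are hypothetical names written for this sketch; `FakeRedis` implements only `exists`, `del`, and `rename`, and it raises a plain RuntimeError where a real client would raise Redis::CommandError.

```ruby
# Hypothetical in-memory stand-in; not part of redis-rb or Familia.
class FakeRedis
  attr_reader :data

  def initialize(data = {})
    @data = data
  end

  def exists(key)
    @data.key?(key) ? 1 : 0
  end

  def del(key)
    return 0 unless @data.key?(key)

    @data.delete(key)
    1
  end

  def rename(from, to)
    raise "ERR no such key" unless @data.key?(from)

    @data[to] = @data.delete(from)
    "OK"
  end
end

# Same control flow as the listing above, minus logging and error handling.
def swap(temp_key, final_key, redis)
  unless redis.exists(temp_key) > 0
    redis.del(final_key) # empty rebuild: the final index is cleared
    return :cleared
  end

  redis.del(final_key)
  redis.rename(temp_key, final_key)
  :swapped
end

redis = FakeRedis.new({ "idx" => { "old" => "1" }, "idx:rebuild:1" => { "new" => "2" } })
result = swap("idx:rebuild:1", "idx", redis)

empty = FakeRedis.new({ "idx" => { "old" => "1" } })
empty_result = swap("idx:rebuild:2", "idx", empty)
```

In Redis itself, RENAME overwrites an existing destination key, so the preceding DEL mainly matters for the empty-result branch.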
.build_temp_key(base_key) ⇒ String
Builds a temporary key name for atomic swaps
# File 'lib/familia/features/relationships/indexing/rebuild_strategies.rb', line 430

def build_temp_key(base_key)
  timestamp = Familia.now.to_i
  "#{base_key}:rebuild:#{timestamp}"
end
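A short sketch of the resulting key format. `temp_key_for` is a hypothetical stand-in that takes the clock as an argument so the output is deterministic; the real method reads Familia.now.

```ruby
# Hypothetical stand-in for build_temp_key with an injectable clock.
def temp_key_for(base_key, now: Time.now)
  "#{base_key}:rebuild:#{now.to_i}"
end

t = Time.at(1_700_000_000)
first  = temp_key_for("user:email_index", now: t)
second = temp_key_for("user:email_index", now: t + 0.4)
# first and second are identical: the suffix has one-second resolution.
```

Because the suffix is truncated to whole seconds, two rebuilds of the same index started within the same second would write to the same temp key.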
.process_scan_batch(keys, indexed_class, field, temp_key, index_hashkey, scope_instance) ⇒ Integer
Processes a batch of keys from SCAN (module_function helper)
# File 'lib/familia/features/relationships/indexing/rebuild_strategies.rb', line 389

def process_scan_batch(keys, indexed_class, field, temp_key, index_hashkey, scope_instance)
  # Load objects by keys
  objects = indexed_class.load_multi_by_keys(keys).compact

  # For instance-scoped indexes, filter objects by scope
  if scope_instance
    # Get the participation collection for this scope
    participation = indexed_class.participation_relationships.find do |rel|
      rel.target_class == scope_instance.class
    end

    if participation
      collection_name = participation.collection_name
      scope_collection = scope_instance.send(collection_name)

      # Filter to only objects that belong to this scope
      objects = objects.select { |obj| scope_collection.member?(obj) }
    end
  end

  # Transaction per batch
  batch_indexed = 0
  indexed_class.transaction do |tx|
    objects.each do |obj|
      value = obj.send(field)
      next unless value && !value.to_s.strip.empty?

      tx.hset(temp_key, value.to_s, index_hashkey.serialize_value(obj))
      batch_indexed += 1
    end
  end

  batch_indexed
rescue StandardError => e
  Familia.warn "[Rebuild] Error processing batch: #{e.message}"
  0
end
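The guard that decides whether a field value is indexed appears in every strategy. As a rough illustration, it can be isolated into a predicate; `indexable?` is a hypothetical name used only here.

```ruby
# Mirrors the guard `next unless value && !value.to_s.strip.empty?`.
def indexable?(value)
  !!(value && !value.to_s.strip.empty?)
end

results = {
  nil_value:  indexable?(nil),
  empty:      indexable?(""),
  whitespace: indexable?("   "),
  false_val:  indexable?(false),
  zero:       indexable?(0),
  email:      indexable?("a@example.com")
}
```

Note also that the listing rescues StandardError per batch and returns 0, so a failed batch is logged and skipped rather than aborting the rebuild.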
.rebuild_via_instances(indexed_class, field, add_method, index_hashkey, batch_size: 100) {|Hash| ... } ⇒ Integer
Rebuilds index by loading objects from ModelClass.instances sorted set.
This is the preferred strategy for models with class-level indexes that maintain an instances collection. It's efficient because:
- Direct access to all object identifiers via ZRANGE
- Bulk loading via load_multi
- No key pattern matching required
Process:
- Enumerate identifiers from ModelClass.instances.members
- Load objects in batches via load_multi(identifiers).compact
- Build temp index via transactions (one per batch)
- Swap temp -> final key via RebuildStrategies.atomic_swap (DEL, then RENAME)
# File 'lib/familia/features/relationships/indexing/rebuild_strategies.rb', line 79

def rebuild_via_instances(indexed_class, field, add_method, index_hashkey, batch_size: 100, &progress)
  unless indexed_class.respond_to?(:instances)
    raise ArgumentError, "#{indexed_class.name} does not have an instances collection"
  end

  instances = indexed_class.instances
  total = instances.size
  start_time = Familia.now

  Familia.info "[Rebuild] Starting via_instances for #{indexed_class.name}.#{field} (#{total} objects)"

  # Determine the final index key by examining the class-level index
  # Extract index name from add_method (e.g., add_to_email_index -> email_index)
  # or add_to_class_email_index -> email_index
  index_name = add_method.to_s.gsub(/^(add_to|update_in|remove_from)_(class_)?/, '')

  # Access the class-level index directly
  unless indexed_class.respond_to?(index_name)
    raise ArgumentError, "#{indexed_class.name} does not have index accessor: #{index_name}"
  end

  index_hashkey = indexed_class.send(index_name)
  final_key = index_hashkey.dbkey
  temp_key = RebuildStrategies.build_temp_key(final_key)

  processed = 0
  indexed_count = 0

  # Process in batches - use members to get deserialized identifiers
  instances.members.each_slice(batch_size) do |identifiers|
    # Bulk load objects, filtering out nils (deleted/missing objects)
    objects = indexed_class.load_multi(identifiers).compact

    # Transaction per batch (NOT entire rebuild)
    batch_indexed = 0
    indexed_class.transaction do |tx|
      objects.each do |obj|
        value = obj.send(field)
        # Skip nil/empty field values gracefully
        next unless value && !value.to_s.strip.empty?

        # For class-level indexes, use HSET with serialized value for consistency
        tx.hset(temp_key, value.to_s, index_hashkey.serialize_value(obj))
        batch_indexed += 1
      end
    end

    processed += identifiers.size
    indexed_count += batch_indexed

    elapsed = Familia.now - start_time
    rate = processed / elapsed
    progress&.call(
      completed: processed,
      total: total,
      rate: rate.round(2),
      elapsed: elapsed.round(2)
    )
  end

  # Atomic swap: temp -> final (ZERO DOWNTIME)
  RebuildStrategies.atomic_swap(temp_key, final_key, indexed_class.dbclient)

  elapsed = Familia.now - start_time
  Familia.info "[Rebuild] Completed via_instances: #{indexed_count} indexed (#{processed} total) in #{elapsed.round(2)}s"

  indexed_count
end
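The block passed to a rebuild method receives a Hash after each batch. Below is a sketch of one way to render it; the keys (:completed, :total, :rate, :elapsed) come from the listing above, while `format_progress` is a hypothetical helper and not part of Familia.

```ruby
# Formats one progress Hash as a single log line.
# :total may be missing (rebuild_via_scan reports :scanned instead).
def format_progress(progress)
  total = progress[:total]
  pct = total && total > 0 ? (progress[:completed] * 100.0 / total).round(1) : nil

  parts = ["#{progress[:completed]}/#{total || '?'}"]
  parts << "#{pct}%" if pct
  parts << "#{progress[:rate]}/s"
  parts << "#{progress[:elapsed]}s"
  parts.join(' ')
end

with_total = format_progress({ completed: 200, total: 800, rate: 412.37, elapsed: 0.49 })
scan_style = format_progress({ completed: 200, scanned: 200, rate: 412.37, elapsed: 0.49 })
```

A caller would typically invoke such a helper from inside the block given to the rebuild method, for example to print or log each line.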
.rebuild_via_participation(scope_instance, indexed_class, field, add_method, collection, cardinality, index_hashkey, batch_size: 100) {|Hash| ... } ⇒ Integer
Rebuilds index by loading objects from a participation collection.
This strategy is for instance-scoped indexes where objects participate in a parent's collection (e.g., employees in company.employees_collection).
Process:
- Enumerate members from collection (SortedSet, UnsortedSet, or ListKey)
- Load objects in batches via load_multi(identifiers).compact
- Build temp index via transactions (one per batch)
- Swap temp -> final key via RebuildStrategies.atomic_swap (DEL, then RENAME)
# File 'lib/familia/features/relationships/indexing/rebuild_strategies.rb', line 180

def rebuild_via_participation(scope_instance, indexed_class, field, add_method, collection, cardinality, index_hashkey, batch_size: 100, &progress)
  total = collection.size
  start_time = Familia.now

  scope_class = scope_instance.class.name
  Familia.info "[Rebuild] Starting via_participation for #{scope_class}##{indexed_class.name}.#{field} (#{total} objects)"

  # Guard: This method only supports unique indexes
  if cardinality != :unique
    raise ArgumentError, <<~ERROR.strip
      rebuild_via_participation only supports unique indexes (cardinality: :unique)
      Received cardinality: #{cardinality.inspect} for field: #{field}

      Multi-indexes require field-value-specific keys and use specialized 4-phase rebuild logic.
      Use the dedicated rebuild method generated on the scope instance instead.
    ERROR
  end

  # Build temp key for the unique index.
  #
  # Extract index name from add_method. The add_method follows the pattern:
  #   add_to_{scope_class_config}_{index_name}
  #
  # For example:
  #   add_to_test_company_badge_index -> badge_index
  #   add_to_company_badge_index -> badge_index
  #
  # We need to remove the "add_to_{scope_class_config}_" prefix.
  scope_class_config = scope_instance.class.config_name
  prefix = "add_to_#{scope_class_config}_"
  index_name = add_method.to_s.gsub(/^#{Regexp.escape(prefix)}/, '')

  # Get the actual index accessor from the scope instance to derive the correct key.
  # This ensures we use the same dbkey as the actual index DataType.
  unless scope_instance.respond_to?(index_name)
    raise ArgumentError, "#{scope_instance.class} does not have index accessor: #{index_name}"
  end

  index_datatype = scope_instance.send(index_name)
  final_key = index_datatype.dbkey
  temp_key = RebuildStrategies.build_temp_key(final_key)

  processed = 0
  indexed_count = 0

  # Process in batches - use members to get deserialized identifiers
  collection.members.each_slice(batch_size) do |identifiers|
    objects = indexed_class.load_multi(identifiers).compact

    # Transaction per batch
    batch_indexed = 0
    scope_instance.transaction do |tx|
      objects.each do |obj|
        value = obj.send(field)
        next unless value && !value.to_s.strip.empty?

        # For unique index: HSET temp_key field_value serialized_identifier
        # For multi-index: SADD temp_key:field_value identifier
        tx.hset(temp_key, value.to_s, index_hashkey.serialize_value(obj))
        batch_indexed += 1
      end
    end

    processed += identifiers.size
    indexed_count += batch_indexed

    elapsed = Familia.now - start_time
    rate = processed / elapsed
    progress&.call(
      completed: processed,
      total: total,
      rate: rate.round(2),
      elapsed: elapsed.round(2)
    )
  end

  # Atomic swap
  RebuildStrategies.atomic_swap(temp_key, final_key, scope_instance.dbclient)

  elapsed = Familia.now - start_time
  Familia.info "[Rebuild] Completed via_participation: #{indexed_count} indexed (#{processed} total) in #{elapsed.round(2)}s"

  indexed_count
end
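The index name is derived from the add_method name in two different ways, depending on the strategy. The sketch below isolates both derivations as standalone helpers; `class_index_name` and `scoped_index_name` are hypothetical names, and the regular expressions are copied from the listings.

```ruby
# Class-level form (used by rebuild_via_instances): strip the verb prefix
# and an optional "class_".
def class_index_name(add_method)
  add_method.to_s.gsub(/^(add_to|update_in|remove_from)_(class_)?/, '')
end

# Instance-scoped form (used by rebuild_via_participation): strip
# "add_to_<scope config name>_", escaping the config name for the regex.
def scoped_index_name(add_method, scope_config_name)
  prefix = "add_to_#{scope_config_name}_"
  add_method.to_s.gsub(/^#{Regexp.escape(prefix)}/, '')
end

class_level = class_index_name(:add_to_class_email_index)
scoped      = scoped_index_name(:add_to_test_company_badge_index, "test_company")
mismatch    = scoped_index_name(:add_to_company_badge_index, "test_company")
```

When the config name does not match the method name, the string is returned unchanged; in the listing that case is caught by the respond_to? check, which raises ArgumentError.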
.rebuild_via_scan(indexed_class, field, add_method, index_hashkey, scope_instance: nil, batch_size: 100) {|Hash| ... } ⇒ Integer
Rebuilds index by scanning all keys matching a pattern.
This is the fallback strategy when:
- No instances collection available
- No participation relationship
- Need to rebuild from raw keys
Uses SCAN (not KEYS) for memory-efficient iteration. Filters by scope if scope_instance provided.
Process:
- Use redis.scan_each(match: pattern, count: batch_size)
- Filter by scope_instance if provided
- Load objects in batches via load_multi_by_keys
- Build temp index via transactions (one per batch)
- Swap temp -> final key via RebuildStrategies.atomic_swap (DEL, then RENAME)
# File 'lib/familia/features/relationships/indexing/rebuild_strategies.rb', line 298

def rebuild_via_scan(indexed_class, field, add_method, index_hashkey, scope_instance: nil, batch_size: 100, &progress)
  start_time = Familia.now

  # Build key pattern for SCAN
  # For instance-scoped indexes, we still scan all objects of indexed_class
  # (not scoped under parent), then filter by scope during processing
  # Use centralized scan_pattern method for consistent key generation
  pattern = indexed_class.scan_pattern

  Familia.info "[Rebuild] Starting via_scan for #{indexed_class.name}.#{field} (pattern: #{pattern})"
  Familia.warn "[Rebuild] Using SCAN fallback - consider adding instances collection for better performance"

  # Determine final key by examining the index
  # Extract index name from add_method (e.g., add_to_class_email_index -> email_index)
  # For instance-scoped: add_to_rebuild_test_company_badge_index -> badge_index
  index_name = add_method.to_s.gsub(/^(add_to|update_in|remove_from)_(class_)?/, '')

  # Strip scope class config prefix if present (e.g., rebuild_test_company_badge_index -> badge_index)
  # For instance-scoped indexes, the index lives on scope_instance, not indexed_class
  if scope_instance
    scope_config = scope_instance.class.config_name
    index_name = index_name.gsub(/^#{scope_config}_/, '')
  end

  # For instance-scoped indexes, check scope_instance for accessor
  # For class-level indexes, check indexed_class
  index_owner = scope_instance || indexed_class

  unless index_owner.respond_to?(index_name)
    raise ArgumentError, "#{index_owner.class.name} does not have index accessor: #{index_name}"
  end

  index_hashkey = index_owner.send(index_name)
  final_key = index_hashkey.dbkey
  temp_key = RebuildStrategies.build_temp_key(final_key)

  processed = 0
  indexed_count = 0
  scanned = 0

  redis = indexed_class.dbclient

  # Use SCAN (not KEYS) for memory efficiency
  batch = []
  redis.scan_each(match: pattern, count: batch_size) do |key|
    batch << key
    scanned += 1

    # Process in batches
    if batch.size >= batch_size
      batch_indexed = RebuildStrategies.process_scan_batch(batch, indexed_class, field, temp_key, index_hashkey, scope_instance)
      processed += batch.size
      indexed_count += batch_indexed

      elapsed = Familia.now - start_time
      rate = processed / elapsed
      progress&.call(
        completed: processed,
        scanned: scanned,
        rate: rate.round(2),
        elapsed: elapsed.round(2)
      )

      batch.clear
    end
  end

  # Process remaining batch
  unless batch.empty?
    batch_indexed = RebuildStrategies.process_scan_batch(batch, indexed_class, field, temp_key, index_hashkey, scope_instance)
    processed += batch.size
    indexed_count += batch_indexed
  end

  # Atomic swap
  RebuildStrategies.atomic_swap(temp_key, final_key, redis)

  elapsed = Familia.now - start_time
  Familia.info "[Rebuild] Completed via_scan: #{indexed_count} indexed (#{processed} total) in #{elapsed.round(2)}s (scanned: #{scanned})"

  indexed_count
end
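The accumulate-and-flush loop in the listing can be illustrated on its own. In this sketch `each_key_batch` is a hypothetical helper and `keys` is any Enumerable standing in for redis.scan_each; each batch is copied before being yielded because the buffer is reused.

```ruby
# Groups a stream of keys into fixed-size batches and flushes the remainder.
def each_key_batch(keys, batch_size)
  batch = []
  keys.each do |key|
    batch << key
    next if batch.size < batch_size

    yield batch.dup
    batch.clear
  end

  # Flush the final partial batch, as the listing does after the SCAN loop.
  yield batch.dup unless batch.empty?
end

batches = []
each_key_batch(%w[user:1 user:2 user:3 user:4 user:5], 2) { |b| batches << b }
```

The explicit buffer is needed because the COUNT option of SCAN is only a hint to the server, so keys arrive one at a time from scan_each rather than in exact batch_size groups. In the listing, the progress callback is not invoked for the final partial batch.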