Module: Familia::Horreum::RepairMethods
Included in: ManagementMethods
Defined in: lib/familia/horreum/management/repair.rb
Overview
RepairMethods provides repair and rebuild operations for Horreum models.
The module is included in ManagementMethods, so every Horreum subclass gets these as class methods (for example, Customer.repair_instances! and Customer.rebuild_instances).
Instance Method Summary
- #rebuild_instances(batch_size: 100) {|Hash| ... } ⇒ Integer
  Full SCAN-based rebuild of the instances timeline with atomic swap.
- #repair_all!(batch_size: 100) {|Hash| ... } ⇒ Hash
  Runs health_check then all repair methods.
- #repair_indexes!(audit_results = nil) ⇒ Hash
  Repairs indexes by running existing rebuild methods for stale indexes.
- #repair_instances!(audit_result = nil) ⇒ Hash
  Repairs the instances timeline by removing phantoms and adding missing entries.
- #repair_participations!(audit_results = nil) ⇒ Hash
  Repairs participation collections by removing stale members.
- #scan_keys(filter = '*', batch_size: 100) {|String| ... } ⇒ Enumerator
  SCAN helper for enumerating keys matching a pattern.
Instance Method Details
#rebuild_instances(batch_size: 100) {|Hash| ... } ⇒ Integer
Full SCAN-based rebuild of the instances timeline with atomic swap.
Scans all hash keys matching this class's pattern, extracts identifiers, and rebuilds the sorted set with timestamps from the objects.
    # File 'lib/familia/horreum/management/repair.rb', line 67
    def rebuild_instances(batch_size: 100, &progress)
      pattern = scan_pattern
      final_key = instances.dbkey
      temp_key = "#{final_key}:rebuild:#{Familia.now.to_i}"

      count = 0
      cursor = "0"
      batch = []

      loop do
        cursor, keys = dbclient.scan(cursor, match: pattern, count: batch_size)

        keys.each do |key|
          identifier = extract_identifier_from_key(key)
          next if identifier.nil? || identifier.empty?

          batch << { key: key, identifier: identifier }
        end

        # Process batch when it reaches threshold
        if batch.size >= batch_size
          count += process_rebuild_batch(batch, temp_key)
          progress&.call(phase: :rebuilding, current: count, total: nil)
          batch.clear
        end

        break if cursor == "0"
      end

      # Process remaining batch
      unless batch.empty?
        count += process_rebuild_batch(batch, temp_key)
        progress&.call(phase: :rebuilding, current: count, total: nil)
      end

      # Atomic swap
      Familia::Features::Relationships::Indexing::RebuildStrategies.atomic_swap(
        temp_key, final_key, dbclient
      )

      progress&.call(phase: :completed, current: count, total: count)
      count
    end
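The rebuild flow in rebuild_instances (scan in batches, write into a temporary key, then swap it over the live key) can be sketched without a database. This is a minimal illustration under stated assumptions: FakeClient and rebuild_timeline are hypothetical names that are not part of Familia, the store is a plain Hash, the key layout customer:<id>:object is invented for the example, and the swap is modeled as a simple rename rather than the library's atomic_swap.

```ruby
# Hypothetical in-memory stand-in for a Redis client (not part of Familia).
class FakeClient
  attr_reader :store

  def initialize(store)
    @store = store
  end

  # Mimics SCAN: returns [next_cursor, keys]; a "0" cursor means the scan is done.
  def scan(cursor, match:, count:)
    keys = @store.keys.select { |k| File.fnmatch(match, k) }
    start = cursor.to_i
    page = keys[start, count] || []
    next_pos = start + count
    [next_pos >= keys.size ? '0' : next_pos.to_s, page]
  end

  # Mimics RENAME: the temp key replaces the final key in one step.
  def rename(from, to)
    @store[to] = @store.delete(from)
  end
end

# Hypothetical helper following the same phases as rebuild_instances.
def rebuild_timeline(client, pattern:, final_key:, batch_size: 2)
  temp_key = "#{final_key}:rebuild:tmp"
  client.store[temp_key] = {}
  count = 0
  cursor = '0'

  loop do
    cursor, keys = client.scan(cursor, match: pattern, count: batch_size)
    keys.each do |key|
      identifier = key.split(':')[1] # assumed layout: customer:<id>:object
      next if identifier.nil? || identifier.empty?

      client.store[temp_key][identifier] = client.store[key][:created]
      count += 1
    end
    yield({ phase: :rebuilding, current: count, total: nil }) if block_given?
    break if cursor == '0'
  end

  client.rename(temp_key, final_key) # readers see the old timeline until this point
  yield({ phase: :completed, current: count, total: count }) if block_given?
  count
end

store = {
  'customer:a1:object' => { created: 100.0 },
  'customer:b2:object' => { created: 200.0 },
  'customer:c3:object' => { created: 300.0 },
  'customer:instances' => { 'ghost' => 1.0 } # stale timeline with a phantom entry
}
client = FakeClient.new(store)
phases = []
count = rebuild_timeline(client, pattern: 'customer:*:object',
                                 final_key: 'customer:instances') { |p| phases << p[:phase] }
```

Writing into a temporary key first means the live timeline is never observed half-built; the phantom entry only disappears when the swap happens.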
#repair_all!(batch_size: 100) {|Hash| ... } ⇒ Hash
Runs health_check then all repair methods.
    # File 'lib/familia/horreum/management/repair.rb', line 190
    def repair_all!(batch_size: 100, &progress)
      report = health_check(batch_size: batch_size, &progress)

      instances_result = repair_instances!(report.instances)
      indexes_result = repair_indexes!(report.unique_indexes)
      participations_result = repair_participations!(report.participations)

      {
        report: report,
        instances: instances_result,
        indexes: indexes_result,
        participations: participations_result,
      }
    end
#repair_indexes!(audit_results = nil) ⇒ Hash
Repairs indexes by running existing rebuild methods for stale indexes.
    # File 'lib/familia/horreum/management/repair.rb', line 116
    def repair_indexes!(audit_results = nil)
      audit_results ||= audit_unique_indexes

      rebuilt = []
      audit_results.each do |idx_result|
        index_name = idx_result[:index_name]
        next if idx_result[:stale].empty? && idx_result[:missing].empty?

        rebuild_method = :"rebuild_#{index_name}"
        if respond_to?(rebuild_method)
          send(rebuild_method)
          rebuilt << index_name
        end
      end

      { rebuilt: rebuilt }
    end
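The dispatch rule in repair_indexes! (skip healthy indexes, and rebuild an unhealthy one only when a matching rebuild_<index_name> method exists) can be illustrated in isolation. This is a sketch, not the library's code: DemoModel, its repair method, and the index names are hypothetical, and the audit entries are hand-written Hashes using the :index_name, :stale, and :missing keys seen in the source.

```ruby
# Hypothetical model; rebuild_email_lookup stands in for a generated rebuilder.
class DemoModel
  def self.rebuilt_log
    @rebuilt_log ||= []
  end

  def self.rebuild_email_lookup
    rebuilt_log << :email_lookup
  end

  # Same decision logic as repair_indexes!, applied to a supplied audit.
  def self.repair(audit_results)
    rebuilt = []
    audit_results.each do |idx|
      next if idx[:stale].empty? && idx[:missing].empty? # healthy index

      method_name = :"rebuild_#{idx[:index_name]}"
      next unless respond_to?(method_name) # no rebuilder defined

      public_send(method_name)
      rebuilt << idx[:index_name]
    end
    { rebuilt: rebuilt }
  end
end

audit = [
  { index_name: :email_lookup, stale: ['old@example.com'], missing: [] },
  { index_name: :phone_lookup, stale: [], missing: [] },       # healthy, skipped
  { index_name: :sku_lookup, stale: [], missing: ['sku-9'] }   # unhealthy, but no rebuilder
]
result = DemoModel.repair(audit)
```

Note that an unhealthy index without a rebuild method is silently left out of the :rebuilt list, so callers that need to detect that case have to compare the audit against the result themselves.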
#repair_instances!(audit_result = nil) ⇒ Hash
Repairs the instances timeline by removing phantoms and adding missing entries.
    # File 'lib/familia/horreum/management/repair.rb', line 18
    def repair_instances!(audit_result = nil)
      audit_result ||= audit_instances

      phantoms_removed = 0
      missing_added = 0

      # Remove phantoms (in timeline but key expired/deleted).
      # Batch all ZREMs in a single pipeline to avoid N round-trips.
      phantoms = audit_result[:phantoms]
      unless phantoms.empty?
        instances_key = instances.dbkey
        pipelined do |pipe|
          phantoms.each do |identifier|
            pipe.zrem(instances_key, identifier)
          end
        end
        phantoms_removed = phantoms.size
      end

      # Add missing (key exists but not in timeline).
      # Batch-load all objects via load_multi, then batch ZADDs in a pipeline.
      missing = audit_result[:missing]
      unless missing.empty?
        objects = load_multi(missing)
        instances_key = instances.dbkey
        pipelined do |pipe|
          missing.each_with_index do |identifier, idx|
            obj = objects[idx]
            next unless obj # Key expired between SCAN and load

            score = (obj)
            pipe.zadd(instances_key, score, identifier)
            missing_added += 1
          end
        end
      end

      { phantoms_removed: phantoms_removed, missing_added: missing_added }
    end
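The two repair passes above (drop phantoms, then add missing entries while tolerating objects that vanished after the audit) can be sketched with plain Hashes. Everything here is an assumption made for illustration: repair_timeline is a hypothetical helper, the timeline is modeled as an identifier-to-score Hash instead of a sorted set, and the score is taken from a stored creation timestamp. Only the :phantoms and :missing audit keys and the shape of the returned Hash come from the source.

```ruby
# Hypothetical helper mirroring the two passes of repair_instances!.
# timeline:     identifier => score (stands in for the sorted set)
# live_objects: identifier => created timestamp (stands in for loaded objects)
def repair_timeline(timeline, live_objects, audit)
  # Pass 1: phantoms are in the timeline but their object key is gone.
  audit[:phantoms].each { |id| timeline.delete(id) }

  # Pass 2: missing entries have an object key but no timeline entry.
  missing_added = 0
  audit[:missing].each do |id|
    created = live_objects[id]
    next unless created # object disappeared between audit and repair

    timeline[id] = created
    missing_added += 1
  end

  { phantoms_removed: audit[:phantoms].size, missing_added: missing_added }
end

timeline = { 'a1' => 100.0, 'ghost' => 50.0 }
live_objects = { 'a1' => 100.0, 'b2' => 200.0 }
audit = { phantoms: ['ghost'], missing: %w[b2 c3] } # 'c3' expired after the audit ran

result = repair_timeline(timeline, live_objects, audit)
```

As in the source, the phantom count is the number of removals attempted, while the missing count only includes entries that could actually be loaded.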
#repair_participations!(audit_results = nil) ⇒ Hash
Repairs participation collections by removing stale members.
Removes identifiers from the actual participation collections (not the instances timeline). Each stale entry from the audit carries a collection_key identifying the exact Redis key to remove from, plus the raw identifier string to remove.
Uses raw Redis commands (ZREM/SREM/LREM) because the stored member values are raw identifier strings (not JSON-encoded), and the DataType#remove method would JSON-encode string args.
    # File 'lib/familia/horreum/management/repair.rb', line 149
    def repair_participations!(audit_results = nil)
      audit_results ||= audit_participations

      # Build a lookup from collection_name to the target class's dbclient.
      # The audit reads collections using target_class.dbclient, so repairs
      # must use the same client (critical in multi-database setups).
      client_by_name = {}
      if respond_to?(:participation_relationships)
        participation_relationships.each do |rel|
          client_by_name[rel.collection_name.to_s] = rel.target_class.dbclient
        end
      end

      # Collect all valid stale entries and group by collection_key
      # so we can batch Redis operations instead of 2N round-trips.
      grouped = Hash.new { |h, k| h[k] = [] }
      key_clients = {}

      audit_results.each do |part_result|
        part_result[:stale_members].each do |entry|
          identifier = entry[:identifier]
          collection_key = entry[:collection_key]
          collection_name = entry[:collection_name]
          next unless collection_key && identifier

          grouped[collection_key] << identifier
          key_clients[collection_key] ||= client_by_name[collection_name.to_s]
        end
      end

      return { stale_removed: 0 } if grouped.empty?

      stale_removed = batch_remove_stale_members(grouped, key_clients)

      { stale_removed: stale_removed }
    end
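Two points from this method lend themselves to a small standalone illustration: grouping stale entries by collection_key so each collection is touched once, and why a JSON-encoded identifier would not match a raw stored member. The sketch below uses only the standard library. The audit data and key names are invented, and String#to_json is used as a stand-in for the encoding the description attributes to DataType#remove; it is an assumption that the two produce the same bytes.

```ruby
require 'json'
require 'set'

# Invented audit output, shaped like the entries repair_participations! reads.
audit_results = [
  { stale_members: [
    { identifier: 'c1', collection_key: 'team:t1:members', collection_name: :members },
    { identifier: 'c2', collection_key: 'team:t1:members', collection_name: :members },
    { identifier: nil,  collection_key: 'team:t2:members', collection_name: :members }
  ] },
  { stale_members: [
    { identifier: 'c3', collection_key: 'team:t2:members', collection_name: :members }
  ] }
]

# Group identifiers per collection key; entries lacking either field are skipped.
grouped = Hash.new { |h, k| h[k] = [] }
audit_results.each do |part|
  part[:stale_members].each do |entry|
    next unless entry[:collection_key] && entry[:identifier]

    grouped[entry[:collection_key]] << entry[:identifier]
  end
end

# A collection holding raw identifier strings, modeled as a Set.
stored_members = Set['c1', 'c2']
raw = 'c1'
encoded = raw.to_json # adds surrounding double quotes

raw_matches = stored_members.include?(raw)
encoded_matches = stored_members.include?(encoded)
```

Because the encoded form carries literal quote characters, a removal issued with it would target a member that was never stored, which is consistent with the method's choice to send raw identifiers.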
#scan_keys(filter = '*', batch_size: 100) {|String| ... } ⇒ Enumerator
SCAN helper for enumerating keys matching a pattern.
    # File 'lib/familia/horreum/management/repair.rb', line 212
    def scan_keys(filter = '*', batch_size: 100, &block)
      pattern = dbkey(filter)
      return enum_for(:scan_keys, filter, batch_size: batch_size) unless block_given?

      cursor = "0"
      loop do
        cursor, keys = dbclient.scan(cursor, match: pattern, count: batch_size)
        keys.each(&block)
        break if cursor == "0"
      end
    end
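scan_keys follows a common Ruby idiom: yield each key when a block is given, otherwise return an Enumerator built with enum_for so callers can chain methods lazily. A minimal sketch of that idiom, assuming a hypothetical KeyScanner class that pages through an in-memory Array instead of calling SCAN on a server:

```ruby
# Hypothetical class; pages through an Array the way a SCAN cursor pages keys.
class KeyScanner
  def initialize(keys)
    @keys = keys
  end

  # Mimics SCAN: returns [next_cursor, keys]; "0" signals the final page.
  def scan(cursor, count:)
    start = cursor.to_i
    page = @keys[start, count] || []
    next_pos = start + count
    [next_pos >= @keys.size ? '0' : next_pos.to_s, page]
  end

  # Yields every key, or returns an Enumerator when called without a block.
  def each_key(batch_size: 2, &block)
    return enum_for(:each_key, batch_size: batch_size) unless block_given?

    cursor = '0'
    loop do
      cursor, keys = scan(cursor, count: batch_size)
      keys.each(&block)
      break if cursor == '0'
    end
  end
end

scanner = KeyScanner.new(%w[customer:a1:object customer:b2:object customer:c3:object])

collected = []
scanner.each_key { |key| collected << key }   # block form

first_two = scanner.each_key.first(2)         # enumerator form, stops early
```

The enumerator form is useful when only part of the keyspace is needed, since iteration stops as soon as the consumer has enough keys.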