Module: Familia::Horreum::RepairMethods

Included in:
ManagementMethods
Defined in:
lib/familia/horreum/management/repair.rb

Overview

RepairMethods provides repair and rebuild operations for Horreum models.

Included in ManagementMethods so every Horreum subclass gets these as class methods (e.g. Customer.repair_instances!, Customer.rebuild_instances).

Instance Method Details

#rebuild_instances(batch_size: 100) {|Hash| ... } ⇒ Integer

Full SCAN-based rebuild of the instances timeline with atomic swap.

Scans all hash keys matching this class's pattern, extracts identifiers, and rebuilds the sorted set with timestamps from the objects.

Parameters:

  • batch_size (Integer) (defaults to: 100)

    SCAN cursor count hint (default: 100)

Yields:

  • (Hash)

    Progress: phase:, current:, total: (total is nil until the final :completed call)

Returns:

  • (Integer)

    Number of instances rebuilt



# File 'lib/familia/horreum/management/repair.rb', line 67

def rebuild_instances(batch_size: 100, &progress)
  pattern = scan_pattern
  final_key = instances.dbkey
  temp_key = "#{final_key}:rebuild:#{Familia.now.to_i}"

  count = 0
  cursor = "0"
  batch = []

  loop do
    cursor, keys = dbclient.scan(cursor, match: pattern, count: batch_size)

    keys.each do |key|
      identifier = extract_identifier_from_key(key)
      next if identifier.nil? || identifier.empty?

      batch << { key: key, identifier: identifier }
    end

    # Process batch when it reaches threshold
    if batch.size >= batch_size
      count += process_rebuild_batch(batch, temp_key)
      progress&.call(phase: :rebuilding, current: count, total: nil)
      batch.clear
    end

    break if cursor == "0"
  end

  # Process remaining batch
  unless batch.empty?
    count += process_rebuild_batch(batch, temp_key)
    progress&.call(phase: :rebuilding, current: count, total: nil)
  end

  # Atomic swap
  Familia::Features::Relationships::Indexing::RebuildStrategies.atomic_swap(
    temp_key, final_key, dbclient
  )

  progress&.call(phase: :completed, current: count, total: count)
  count
end
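
The batching behavior above can be tried without a Redis server. The following is a minimal sketch, not Familia code: FakeScanClient and rebuild_sketch are hypothetical names, the fake client only imitates the [next_cursor, keys] return shape of SCAN, and the sketch counts keys instead of writing to a temporary sorted set and swapping it in.

```ruby
# Hypothetical in-memory stand-in for a Redis client (not part of Familia).
class FakeScanClient
  def initialize(keys, page_size: 2)
    @keys = keys
    @page_size = page_size
  end

  # Imitates SCAN: returns [next_cursor, matching_keys]; cursor "0" ends the scan.
  # The count hint is accepted but ignored; pages are always @page_size keys wide.
  def scan(cursor, match: "*", count: 10)
    start = cursor.to_i
    page = @keys[start, @page_size] || []
    next_start = start + @page_size
    next_cursor = next_start >= @keys.size ? "0" : next_start.to_s
    [next_cursor, page.select { |key| File.fnmatch(match, key) }]
  end
end

# Same loop shape as rebuild_instances: accumulate keys, flush when the batch
# is full, flush the remainder, then report completion.
def rebuild_sketch(client, pattern, batch_size: 2)
  count = 0
  cursor = "0"
  batch = []

  loop do
    cursor, keys = client.scan(cursor, match: pattern, count: batch_size)
    batch.concat(keys)

    if batch.size >= batch_size
      count += batch.size
      yield(phase: :rebuilding, current: count, total: nil) if block_given?
      batch.clear
    end

    break if cursor == "0"
  end

  unless batch.empty?
    count += batch.size
    yield(phase: :rebuilding, current: count, total: nil) if block_given?
  end

  yield(phase: :completed, current: count, total: count) if block_given?
  count
end

client = FakeScanClient.new(
  %w[customer:a:object customer:b:object session:x:object customer:c:object customer:d:object]
)
rebuild_sketch(client, "customer:*:object") { |progress| puts progress.inspect }
```

Note that total stays nil while scanning, because SCAN cannot know the final count in advance; only the :completed call carries a total.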

#repair_all!(batch_size: 100) {|Hash| ... } ⇒ Hash

Runs health_check, then passes each section of the resulting report to repair_instances!, repair_indexes!, and repair_participations!.

Parameters:

  • batch_size (Integer) (defaults to: 100)

    SCAN batch size

Yields:

  • (Hash)

    Progress callbacks

Returns:

  • (Hash)

    Combined repair results plus the AuditReport



# File 'lib/familia/horreum/management/repair.rb', line 190

def repair_all!(batch_size: 100, &progress)
  report = health_check(batch_size: batch_size, &progress)

  instances_result = repair_instances!(report.instances)
  indexes_result = repair_indexes!(report.unique_indexes)
  participations_result = repair_participations!(report.participations)

  {
    report: report,
    instances: instances_result,
    indexes: indexes_result,
    participations: participations_result,
  }
end
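
As a rough illustration of consuming the combined result, the sketch below formats a Hash shaped like the one repair_all! returns. summarize_repair is a hypothetical helper, and the sample values (including the index name) are made up for the example; the :report entry is left as a placeholder because the AuditReport interface is not shown here.

```ruby
# Hypothetical helper: turns a repair_all!-shaped Hash into a one-line summary.
def summarize_repair(result)
  instances = result.fetch(:instances)
  format(
    "phantoms removed: %d, missing added: %d, indexes rebuilt: %d, stale members removed: %d",
    instances.fetch(:phantoms_removed),
    instances.fetch(:missing_added),
    result.fetch(:indexes).fetch(:rebuilt).size,
    result.fetch(:participations).fetch(:stale_removed)
  )
end

# Illustrative values only; a real call would be e.g. Customer.repair_all!.
sample = {
  report: nil, # placeholder for the AuditReport
  instances: { phantoms_removed: 2, missing_added: 1 },
  indexes: { rebuilt: [:email_lookup] },
  participations: { stale_removed: 3 },
}
puts summarize_repair(sample)
```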

#repair_indexes!(audit_results = nil) ⇒ Hash

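Repairs unique indexes by calling the existing rebuild_<index_name> method for each index whose audit reports stale or missing entries. Indexes without a matching rebuild method are skipped.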

Parameters:

  • audit_results (Array<Hash>, nil) (defaults to: nil)

    Results from audit_unique_indexes (runs audit if nil)

Returns:

  • (Hash)

    { rebuilt: [index_names] }



# File 'lib/familia/horreum/management/repair.rb', line 116

def repair_indexes!(audit_results = nil)
  audit_results ||= audit_unique_indexes

  rebuilt = []

  audit_results.each do |idx_result|
    index_name = idx_result[:index_name]
    next if idx_result[:stale].empty? && idx_result[:missing].empty?

    rebuild_method = :"rebuild_#{index_name}"
    if respond_to?(rebuild_method)
      send(rebuild_method)
      rebuilt << index_name
    end
  end

  { rebuilt: rebuilt }
end
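
The dispatch rule used above (skip clean indexes, skip indexes with no rebuild method, call the rest) can be sketched in isolation. IndexRepairSketch and the index names are hypothetical and exist only for this example; the audit entries mirror the :index_name, :stale, and :missing keys read by repair_indexes!.

```ruby
# Hypothetical stand-in for a model class with two rebuild methods.
class IndexRepairSketch
  attr_reader :calls

  def initialize
    @calls = []
  end

  def rebuild_email_lookup
    @calls << :email_lookup
  end

  def rebuild_api_key_lookup
    @calls << :api_key_lookup
  end

  # Same decision logic as repair_indexes!, minus the default audit.
  def repair(audit_results)
    rebuilt = []
    audit_results.each do |entry|
      next if entry[:stale].empty? && entry[:missing].empty? # index is healthy

      rebuild_method = :"rebuild_#{entry[:index_name]}"
      next unless respond_to?(rebuild_method) # no rebuild method defined

      public_send(rebuild_method)
      rebuilt << entry[:index_name]
    end
    { rebuilt: rebuilt }
  end
end

audit = [
  { index_name: :email_lookup, stale: ["old@example.com"], missing: [] },
  { index_name: :api_key_lookup, stale: [], missing: [] },       # clean, skipped
  { index_name: :username_lookup, stale: [], missing: ["ada"] }, # no rebuild method
]
p IndexRepairSketch.new.repair(audit)
```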

#repair_instances!(audit_result = nil) ⇒ Hash

Repairs the instances timeline by removing phantoms and adding missing entries.

Parameters:

  • audit_result (Hash, nil) (defaults to: nil)

    Result from audit_instances (runs audit if nil)

Returns:

  • (Hash)

    { phantoms_removed: N, missing_added: N }



# File 'lib/familia/horreum/management/repair.rb', line 18

def repair_instances!(audit_result = nil)
  audit_result ||= audit_instances

  phantoms_removed = 0
  missing_added = 0

  # Remove phantoms (in timeline but key expired/deleted).
  # Batch all ZREMs in a single pipeline to avoid N round-trips.
  phantoms = audit_result[:phantoms]
  unless phantoms.empty?
    instances_key = instances.dbkey
    pipelined do |pipe|
      phantoms.each do |identifier|
        pipe.zrem(instances_key, identifier)
      end
    end
    phantoms_removed = phantoms.size
  end

  # Add missing (key exists but not in timeline).
  # Batch-load all objects via load_multi, then batch ZADDs in a pipeline.
  missing = audit_result[:missing]
  unless missing.empty?
    objects = load_multi(missing)
    instances_key = instances.dbkey
    pipelined do |pipe|
      missing.each_with_index do |identifier, idx|
        obj = objects[idx]
        next unless obj # Key expired between SCAN and load

        score = extract_timestamp_score(obj)
        pipe.zadd(instances_key, score, identifier)
        missing_added += 1
      end
    end
  end

  { phantoms_removed: phantoms_removed, missing_added: missing_added }
end
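
The reconciliation itself can be pictured with plain Ruby hashes. In this sketch, which is an illustration rather than Familia code, reconcile_timeline is a hypothetical helper, the timeline Hash stands in for the sorted set (identifier to score), and the objects Hash stands in for stored records (identifier to creation timestamp).

```ruby
# Hypothetical in-memory version of the phantom/missing reconciliation.
def reconcile_timeline(timeline, objects, audit)
  # Phantoms: listed in the timeline, but the underlying object is gone.
  phantoms = audit.fetch(:phantoms)
  phantoms.each { |identifier| timeline.delete(identifier) }

  # Missing: the object exists, but the timeline has no entry for it.
  missing_added = 0
  audit.fetch(:missing).each do |identifier|
    score = objects[identifier]
    next unless score # object disappeared between the audit and the repair

    timeline[identifier] = score
    missing_added += 1
  end

  { phantoms_removed: phantoms.size, missing_added: missing_added }
end

timeline = { "a" => 100.0, "ghost" => 50.0 }
objects = { "a" => 100.0, "b" => 200.0 }
audit = { phantoms: ["ghost"], missing: ["b", "gone"] }

p reconcile_timeline(timeline, objects, audit)
p timeline
```

As in the method above, an identifier that was reported missing but can no longer be loaded is skipped and not counted.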

#repair_participations!(audit_results = nil) ⇒ Hash

Repairs participation collections by removing stale members.

Removes identifiers from the actual participation collections (not the instances timeline). Each stale entry from the audit carries a collection_key identifying the exact Redis key to remove from, plus the raw identifier string to remove.

Uses raw Redis commands (ZREM/SREM/LREM) because the stored member values are raw identifier strings (not JSON-encoded), and the DataType#remove method would JSON-encode string args.

Parameters:

  • audit_results (Array<Hash>, nil) (defaults to: nil)

    Results from audit_participations (runs audit if nil)

Returns:

  • (Hash)

    { stale_removed: N }



# File 'lib/familia/horreum/management/repair.rb', line 149

def repair_participations!(audit_results = nil)
  audit_results ||= audit_participations

  # Build a lookup from collection_name to the target class's dbclient.
  # The audit reads collections using target_class.dbclient, so repairs
  # must use the same client (critical in multi-database setups).
  client_by_name = {}
  if respond_to?(:participation_relationships)
    participation_relationships.each do |rel|
      client_by_name[rel.collection_name.to_s] = rel.target_class.dbclient
    end
  end

  # Collect all valid stale entries and group by collection_key
  # so we can batch Redis operations instead of 2N round-trips.
  grouped = Hash.new { |h, k| h[k] = [] }
  key_clients = {}
  audit_results.each do |part_result|
    part_result[:stale_members].each do |entry|
      identifier = entry[:identifier]
      collection_key = entry[:collection_key]
      collection_name = entry[:collection_name]
      next unless collection_key && identifier

      grouped[collection_key] << identifier
      key_clients[collection_key] ||= client_by_name[collection_name.to_s]
    end
  end

  return { stale_removed: 0 } if grouped.empty?

  stale_removed = batch_remove_stale_members(grouped, key_clients)
  { stale_removed: stale_removed }
end
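
The note about raw commands comes down to an encoding mismatch, which the following sketch illustrates with a plain Ruby Set standing in for a Redis collection. It assumes the JSON encoding in question behaves like JSON.generate on a String (wrapping it in double quotes); the member names are made up.

```ruby
require "json"
require "set"

# Members stored as raw identifier strings, as the participation collections do.
members = Set["cust_1", "cust_2"]

raw_identifier = "cust_1"
encoded_identifier = JSON.generate(raw_identifier) # a String with literal quotes around cust_1

# Removing the JSON-encoded form finds nothing, because no member has quotes.
removed_encoded = !members.delete?(encoded_identifier).nil?

# Removing the raw form matches the stored member.
removed_raw = !members.delete?(raw_identifier).nil?

puts "encoded form removed a member: #{removed_encoded}"
puts "raw form removed a member: #{removed_raw}"
puts "remaining members: #{members.to_a.inspect}"
```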

#scan_keys(filter = '*', batch_size: 100) {|String| ... } ⇒ Enumerator

SCAN helper for enumerating keys matching a pattern.

Parameters:

  • filter (String) (defaults to: '*')

    Glob filter appended to class prefix (default: '*')

  • batch_size (Integer) (defaults to: 100)

    SCAN cursor count hint (default: 100)

Yields:

  • (String)

    Each matching key

Returns:

  • (Enumerator)

    An Enumerator over the matching keys, returned when no block is given



# File 'lib/familia/horreum/management/repair.rb', line 212

def scan_keys(filter = '*', batch_size: 100, &block)
  pattern = dbkey(filter)
  return enum_for(:scan_keys, filter, batch_size: batch_size) unless block_given?

  cursor = "0"
  loop do
    cursor, keys = dbclient.scan(cursor, match: pattern, count: batch_size)
    keys.each(&block)
    break if cursor == "0"
  end
end
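
The block-or-Enumerator pattern used by scan_keys can be sketched without Redis. KeyScanSketch is a hypothetical class for illustration: it walks a fixed table of cursor pages instead of calling SCAN, and the cursor values and key names are invented.

```ruby
# Hypothetical stand-in that replays canned SCAN pages.
class KeyScanSketch
  # pages maps a cursor to [next_cursor, keys]; cursor "0" starts and ends the scan.
  def initialize(pages)
    @pages = pages
  end

  def each_key(&block)
    # Without a block, hand back an Enumerator that re-enters this method lazily.
    return enum_for(:each_key) unless block_given?

    cursor = "0"
    loop do
      cursor, keys = @pages.fetch(cursor)
      keys.each(&block)
      break if cursor == "0"
    end
  end
end

pages = {
  "0" => ["7", ["customer:a", "customer:b"]],
  "7" => ["0", ["customer:c"]],
}
scanner = KeyScanSketch.new(pages)

scanner.each_key { |key| puts key } # block form
p scanner.each_key.first(1)         # Enumerator form, stops after one key
```

With the Enumerator form, callers can chain methods such as first, each_slice, or lazy, which is convenient when only part of the keyspace is needed.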