Class: Familia::ThreadSafety::Monitor

Inherits:
Object
  • Object
show all
Defined in:
lib/familia/thread_safety/monitor.rb

Overview

Thread safety monitoring for production observability

Tracks mutex contention, race conditions, and synchronization metrics to provide insights into thread safety behavior in production.

Examples:

Basic usage

Familia::ThreadSafety::Monitor.start!
# ... application runs ...
report = Familia::ThreadSafety::Monitor.report
puts report[:summary]

Custom instrumentation

Familia::ThreadSafety::Monitor.record_contention('connection_chain')
Familia::ThreadSafety::Monitor.time_critical_section('field_registration') do
  # ... critical code ...
end

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Monitor

Returns a new instance of Monitor.



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/familia/thread_safety/monitor.rb', line 47

# Builds a monitor in its disabled state: no start time, every event
# counter at zero and every lookup table empty.
def initialize
  # Lifecycle flags
  @enabled = false
  @started_at = nil

  # Global event counters, each an independent atomic integer starting at 0
  @mutex_contentions, @race_detections, @critical_sections, @deadlock_checks =
    Array.new(4) { Concurrent::AtomicFixnum.new(0) }

  # Number of contention events seen per location
  @contention_points = Concurrent::Map.new

  # Running wait-time sum and sample count per location; only aggregates are
  # kept (no per-sample list), so storage does not grow with sample volume
  @wait_time_totals, @wait_time_counts = Array.new(2) { Concurrent::Map.new }

  # Thread-local bookkeeping intended for nested monitoring
  @thread_state = Concurrent::Map.new

  # How many executions of each named operation are currently in flight
  @concurrent_operations = Concurrent::Map.new

  # Accumulated duration and call count per critical section
  @section_timings, @section_counts = Array.new(2) { Concurrent::Map.new }
end

Instance Attribute Details

#enabled ⇒ Object (readonly)

Returns the value of attribute enabled.



45
46
47
# File 'lib/familia/thread_safety/monitor.rb', line 45

# Reader for @enabled: false after construction, set to true by #start!
# and back to false by #stop!.
def enabled
  @enabled
end

#started_at ⇒ Object (readonly)

Returns the value of attribute started_at.



45
46
47
# File 'lib/familia/thread_safety/monitor.rb', line 45

# Reader for @started_at: the Time captured by #start!, or nil before the
# first #start! and again after #stop!.
def started_at
  @started_at
end

Class Method Details

.instance ⇒ Object



27
28
29
# File 'lib/familia/thread_safety/monitor.rb', line 27

# Returns the shared Monitor, creating it on the first call and caching it
# in the class-level @instance variable for every later call.
#
# NOTE(review): creation is not guarded by a lock, so two threads racing
# through the very first call could each build an instance — confirm
# whether that matters for callers.
def instance
  return @instance if @instance

  @instance = new
end

.method_missing(method, *args, &block) ⇒ Object

Delegate all methods to singleton instance



32
33
34
35
36
37
38
# File 'lib/familia/thread_safety/monitor.rb', line 32

# Forwards any message the class itself does not implement to the object
# returned by +instance+, as long as that object publicly responds to it.
# Anything else falls through to the default handling via +super+.
#
# @param method [Symbol] name of the missing method
# @param args [Array] positional arguments to forward
# @param block [Proc, nil] block to forward
# @return [Object] whatever the delegated method returns
def method_missing(method, *args, &block)
  return super unless instance.respond_to?(method)

  instance.send(method, *args, &block)
end

.respond_to_missing?(method, include_private = false) ⇒ Boolean

Returns:

  • (Boolean)


40
41
42
# File 'lib/familia/thread_safety/monitor.rb', line 40

# Companion to method_missing: reports a method as available when the
# object returned by +instance+ responds to it, otherwise defers to +super+.
# Only the public interface of the singleton is consulted; +include_private+
# is passed on to +super+ but not to the singleton check.
#
# @param method [Symbol] method name being queried
# @param include_private [Boolean] forwarded to +super+
# @return [Boolean]
def respond_to_missing?(method, include_private = false)
  handled_by_singleton = instance.respond_to?(method)
  return handled_by_singleton if handled_by_singleton

  super
end

Instance Method Details

#apm_transaction(name, &block) ⇒ Object

Hook for APM integration



320
321
322
323
324
325
# File 'lib/familia/thread_safety/monitor.rb', line 320

# Extension point for APM integration (NewRelic, DataDog, ...). While
# monitoring is enabled the block is run through time_critical_section under
# +name+; while disabled the block is simply yielded to.
#
# @param name [Object] label passed to time_critical_section
# @yield the work to run
# @return [Object] the block's return value when disabled, otherwise
#   whatever time_critical_section returns
def apm_transaction(name, &block)
  if @enabled
    time_critical_section(name, &block)
  else
    yield
  end
end

#calculate_health_score ⇒ Object

Calculate a health score (0-100)



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/familia/thread_safety/monitor.rb', line 254

# Computes a 0-100 health score from hourly contention and race rates.
#
# Returns 100 when monitoring has no start time or has been running for
# less than 60 seconds. Otherwise it starts from 100 and subtracts:
# * 1 point per 10 contentions/hour (10 per 100), capped at 30
# * 10 points per race detection/hour, capped at 50
# The result is floored at 0 and rounded to an Integer.
#
# @return [Integer]
def calculate_health_score
  return 100 unless @started_at

  elapsed = Time.now - @started_at
  return 100 if elapsed < 60 # too little data to judge

  # Scale a raw counter to an events-per-hour rate.
  hourly_rate = ->(counter) { (counter.value / elapsed) * 3600 }

  contention_penalty = [hourly_rate.call(@mutex_contentions) / 10.0, 30].min
  race_penalty = [hourly_rate.call(@race_detections) * 10, 50].min

  [100 - contention_penalty - race_penalty, 0].max.round
end

#check_deadlock ⇒ Object

Check for potential deadlocks



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/familia/thread_safety/monitor.rb', line 180

# Runs two coarse heuristics against the live thread list and logs a
# warning through Familia.warn when either trips:
# * more than 100 threads exist
# * more than 80% of the threads report status "sleep"
#
# Does nothing when monitoring is disabled. Each enabled call increments
# the deadlock-check counter. This is a simple heuristic, not real deadlock
# detection: any sleeping thread counts, whatever it is waiting on.
#
# @return [Object, nil] nil when disabled or when the sleeping-thread
#   threshold is not exceeded; otherwise the result of Familia.warn
def check_deadlock
  return unless @enabled

  @deadlock_checks.increment

  # Take a single snapshot so both heuristics (and the ratio below) describe
  # the same set of threads.
  threads = Thread.list
  thread_count = threads.size

  Familia.warn("[ThreadSafety] High thread count: #{thread_count}") if thread_count > 100

  # Simplified check for threads that may be blocked on a lock
  sleeping_count = threads.count { |t| t.status == "sleep" }
  if sleeping_count > thread_count * 0.8
    Familia.warn("[ThreadSafety] Potential deadlock: #{sleeping_count}/#{thread_count} threads sleeping")
  end
end

#export_metrics ⇒ Object

Export metrics in a format suitable for APM tools



309
310
311
312
313
314
315
316
317
# File 'lib/familia/thread_safety/monitor.rb', line 309

# Flattens the monitor's counters into a string-keyed hash in a shape
# suitable for APM tools. The four event counters are read first, then the
# current health score is appended.
#
# @return [Hash{String => Integer}]
def export_metrics
  counters = {
    'familia.thread_safety.mutex_contentions' => @mutex_contentions,
    'familia.thread_safety.race_detections' => @race_detections,
    'familia.thread_safety.critical_sections' => @critical_sections,
    'familia.thread_safety.deadlock_checks' => @deadlock_checks
  }

  snapshot = counters.transform_values(&:value)
  snapshot['familia.thread_safety.health_score'] = calculate_health_score
  snapshot
end

#generate_recommendations(hot_spots) ⇒ Object

Generate recommendations based on metrics



271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# File 'lib/familia/thread_safety/monitor.rb', line 271

# Turns the current metrics into a list of human-readable recommendations.
#
# Rules, in output order:
# * critical - at least one race detection has been recorded
# * warning  - any hot spot with more than 100 contentions
# * warning  - any hot spot whose average wait exceeds 100_000 μs (100 ms)
# * info     - deadlock checks have run and more than 50 threads are alive
#
# @param hot_spots [Array<Hash>] entries carrying :location, :contentions
#   and :avg_wait_μs
# @return [Array<Hash>] zero or more { severity:, message: } hashes
def generate_recommendations(hot_spots)
  advice = []

  race_total = @race_detections.value
  if race_total > 0
    advice << {
      severity: 'critical',
      message: "#{race_total} potential race conditions detected - investigate immediately"
    }
  end

  contended = hot_spots.select { |spot| spot[:contentions] > 100 }
  unless contended.empty?
    names = contended.map { |spot| spot[:location] }.join(', ')
    advice << { severity: 'warning', message: "High contention detected at: #{names}" }
  end

  sluggish = hot_spots.select { |spot| spot[:avg_wait_μs] > 100_000 } # > 100ms in microseconds
  unless sluggish.empty?
    details = sluggish.map do |spot|
      "#{spot[:location]} (#{(spot[:avg_wait_μs] / 1000.0).round(1)}ms)"
    end
    advice << { severity: 'warning', message: "Long wait times at: #{details.join(', ')}" }
  end

  if @deadlock_checks.value > 0 && Thread.list.count > 50
    advice << {
      severity: 'info',
      message: "Consider connection pooling - high thread count detected"
    }
  end

  advice
end

#record_contention(location, wait_time = nil) ⇒ Object

Record a mutex contention event



108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/familia/thread_safety/monitor.rb', line 108

# Records one mutex contention event at +location+.
#
# Increments the global contention counter and the per-location count, and
# forwards +wait_time+ to record_wait_time when one is supplied. A trace
# line is emitted through Familia.trace. Does nothing while monitoring is
# disabled.
#
# The per-location count is bumped with Concurrent::Map#compute so that the
# read-modify-write happens as one atomic step; a separate fetch followed by
# an assignment can lose increments when several threads report contention
# at the same location at once.
#
# NOTE(review): the trace message formats wait_time as seconds, while the
# report labels the aggregated value as microseconds — confirm which unit
# callers actually pass.
#
# @param location [Object] identifier of the contention point
# @param wait_time [Numeric, nil] optional time spent waiting
# @return [Object, nil] nil when disabled, otherwise the result of
#   Familia.trace
def record_contention(location, wait_time = nil)
  return unless @enabled

  @mutex_contentions.increment
  @contention_points.compute(location) { |count| (count || 0) + 1 }

  record_wait_time(location, wait_time) if wait_time

  Familia.trace(:THREAD_CONTENTION, nil, "Contention at #{location} (wait: #{wait_time&.round(4)}s)")
end

#record_race_condition(location, details = nil) ⇒ Object

Record a potential race condition detection



130
131
132
133
134
135
136
137
# File 'lib/familia/thread_safety/monitor.rb', line 130

# Records a suspected race condition at +location+: bumps the race counter
# and logs a warning through Familia.warn, appending +details+ when given.
# Does nothing while monitoring is disabled.
#
# @param location [Object] where the race was observed
# @param details [Object, nil] optional extra context for the log line
# @return [Object, nil] nil when disabled, otherwise the result of
#   Familia.warn
def record_race_condition(location, details = nil)
  return unless @enabled

  @race_detections.increment
  suffix = details ? ": #{details}" : ''
  Familia.warn("[ThreadSafety] Potential race condition at #{location}#{suffix}")
end

#record_wait_time(location, wait_time) ⇒ Object

Record wait time for a location



122
123
124
125
126
127
# File 'lib/familia/thread_safety/monitor.rb', line 122

# Adds one wait-time sample for +location+ to the running aggregates.
#
# Only a per-location sum and sample count are kept (no list of individual
# samples), so storage stays constant per location. Both aggregates are
# updated with Concurrent::Map#compute so each read-modify-write is a single
# atomic step; a fetch followed by an assignment can drop samples when
# several threads record for the same location at once.
#
# Unlike the other record_* methods this one does not check @enabled.
#
# @param location [Object] identifier of the contention point
# @param wait_time [Numeric] time spent waiting
# @return [Integer] the updated sample count for +location+
def record_wait_time(location, wait_time)
  @wait_time_totals.compute(location) { |total| (total || 0.0) + wait_time }
  @wait_time_counts.compute(location) { |count| (count || 0) + 1 }
end

#report ⇒ Object

Generate a comprehensive report



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/familia/thread_safety/monitor.rb', line 199

# Builds a full report of everything recorded since monitoring started.
#
# When no start time is set (never started, or stopped) a small
# "not enabled" hash is returned instead. Otherwise the report contains:
# * :summary             - elapsed seconds plus the four global counters
# * :hot_spots           - the 10 most contended locations, busiest first,
#                          each with its contention count and average wait
# * :section_performance - per critical section call count, average and
#                          total time, ordered by total time descending
# * :health              - result of calculate_health_score
# * :recommendations     - result of generate_recommendations(hot_spots)
#
# @return [Hash]
def report
  # Work from a local copy so a concurrent stop! cannot clear the start time
  # between the guard and the subtraction.
  started = @started_at
  return { enabled: false, message: "Monitoring not enabled" } unless started

  duration = Time.now - started

  # Hot spots: copy the pairs out first, then rank them
  contention_pairs = []
  @contention_points.each_pair { |location, count| contention_pairs << [location, count] }

  hot_spots = contention_pairs
    .sort_by { |_, count| -count }
    .first(10)
    .map { |location, count|
      samples = @wait_time_counts[location]
      total = @wait_time_totals[location]
      avg_wait_μs = if samples && total && samples > 0
        (total / samples).round(0)
      else
        0
      end
      {
        location: location,
        contentions: count,
        avg_wait_μs: avg_wait_μs
      }
    }

  # Critical section performance
  section_performance = []
  @section_counts.each_pair do |name, count|
    total_time_μs = @section_timings[name]
    # Skip entries whose timing is missing (e.g. cleared by a concurrent reset)
    next if total_time_μs.nil? || count.zero?

    section_performance << {
      section: name,
      calls: count,
      # Float division so the average is rounded to nearest instead of being
      # truncated by integer division before the round
      avg_time_μs: (total_time_μs.to_f / count).round,
      total_time_μs: total_time_μs
    }
  end
  section_performance.sort_by! { |s| -s[:total_time_μs] }

  {
    summary: {
      monitoring_duration_s: duration.round(2),
      mutex_contentions: @mutex_contentions.value,
      race_detections: @race_detections.value,
      critical_sections: @critical_sections.value,
      deadlock_checks: @deadlock_checks.value
    },
    hot_spots: hot_spots,
    section_performance: section_performance,
    health: calculate_health_score,
    recommendations: generate_recommendations(hot_spots)
  }
end

#reset_metrics ⇒ Object

Reset all metrics



92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/familia/thread_safety/monitor.rb', line 92

# Zeroes every event counter and empties every lookup table, discarding all
# data collected so far. The enabled flag and start time are left untouched.
#
# @return [Object] the result of clearing @section_counts, the last table
#   cleared
def reset_metrics
  event_counters = [@mutex_contentions, @race_detections, @critical_sections, @deadlock_checks]
  event_counters.each { |counter| counter.value = 0 }

  tracked_tables = [
    @contention_points,
    @wait_time_totals,
    @wait_time_counts,
    @thread_state,
    @concurrent_operations,
    @section_timings,
    @section_counts
  ]
  tracked_tables.map(&:clear).last
end

#start! ⇒ Object

Start monitoring



74
75
76
77
78
79
80
# File 'lib/familia/thread_safety/monitor.rb', line 74

# Switches monitoring on: sets the enabled flag, stamps the start time,
# wipes all previously collected metrics and logs an info line.
# Calling it again while already running restarts the clock and discards
# the data gathered so far.
#
# NOTE(review): the flag is raised before reset_metrics runs, so an event
# recorded by another thread in between is counted and then immediately
# cleared — confirm that this ordering is intended.
#
# @return [true]
def start!
  @enabled = true
  @started_at = Time.now
  reset_metrics
  Familia.info("[ThreadSafety] Monitoring started")
  true
end

#stop! ⇒ Object

Stop monitoring



83
84
85
86
87
88
89
# File 'lib/familia/thread_safety/monitor.rb', line 83

# Switches monitoring off, logs how long it ran (0 when it was never
# started) and clears the start time. Collected metrics are not reset here.
#
# @return [true]
def stop!
  @enabled = false

  elapsed = 0
  elapsed = Time.now - @started_at if @started_at

  Familia.info("[ThreadSafety] Monitoring stopped after #{elapsed.round(2)}s")
  @started_at = nil
  true
end

#time_critical_section(name) ⇒ Object

Time a critical section with contention tracking



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/familia/thread_safety/monitor.rb', line 140

# Runs the given block as the critical section +name+, timing it and
# tracking how many executions of the same section overlap.
#
# While monitoring is disabled the block is simply yielded to. Otherwise:
# * the in-flight count for +name+ is incremented; if another execution is
#   already in flight, a contention event is recorded for +name+
# * the global critical-section counter is incremented
# * after the block finishes (normally or by raising) its duration is added
#   to the section's total, the call count is bumped, the in-flight count is
#   decremented, and a warning is logged if it took more than 100_000 μs
#
# All per-section tables are updated with Concurrent::Map#compute so each
# read-modify-write is one atomic step; a fetch followed by an assignment
# can lose updates when threads run the same section concurrently, which is
# exactly the situation this method is meant to observe.
#
# @param name [Object] label for the critical section
# @yield the critical code
# @return [Object] the block's return value
def time_critical_section(name)
  return yield unless @enabled

  start_time = Familia.now_in_μs

  # Register this execution; more than one in flight means overlap
  in_flight = @concurrent_operations.compute(name) { |n| (n || 0) + 1 }
  record_contention(name) if in_flight > 1

  @critical_sections.increment

  begin
    yield
  ensure
    duration_μs = Familia.now_in_μs - start_time

    # Record timing in microseconds
    @section_timings.compute(name) { |total| (total || 0) + duration_μs }
    @section_counts.compute(name) { |calls| (calls || 0) + 1 }

    # Unregister this execution. A missing entry (e.g. metrics were reset
    # while the block ran) is treated as 1 so the count settles at 0.
    @concurrent_operations.compute(name) { |n| (n || 1) - 1 }

    if duration_μs > 100_000 # Log slow critical sections (> 100ms = 100,000μs)
      Familia.warn("[ThreadSafety] Slow critical section '#{name}': #{(duration_μs / 1000.0).round(2)}ms")
    end
  end
end