Class: Familia::ThreadSafety::Monitor
- Inherits: Object
  - Object
  - Familia::ThreadSafety::Monitor
- Defined in: lib/familia/thread_safety/monitor.rb
Overview
Thread safety monitoring for production observability
Tracks mutex contention, race conditions, and synchronization metrics to provide insights into thread safety behavior in production.
Instance Attribute Summary
- #enabled ⇒ Object (readonly)
  Returns whether monitoring is currently enabled.
- #started_at ⇒ Object (readonly)
  Returns the time monitoring was started, or nil when monitoring is not running.
Class Method Summary
- .instance ⇒ Object
- .method_missing(method, *args, &block) ⇒ Object
  Delegate all methods to singleton instance.
- .respond_to_missing?(method, include_private = false) ⇒ Boolean
Instance Method Summary
- #apm_transaction(name, &block) ⇒ Object
  Hook for APM integration.
- #calculate_health_score ⇒ Object
  Calculate a health score (0-100).
- #check_deadlock ⇒ Object
  Check for potential deadlocks.
- #export_metrics ⇒ Object
  Export metrics in a format suitable for APM tools.
- #generate_recommendations(hot_spots) ⇒ Object
  Generate recommendations based on metrics.
- #initialize ⇒ Monitor (constructor)
  A new instance of Monitor.
- #record_contention(location, wait_time = nil) ⇒ Object
  Record a mutex contention event.
- #record_race_condition(location, details = nil) ⇒ Object
  Record a potential race condition detection.
- #record_wait_time(location, wait_time) ⇒ Object
  Record wait time for a location.
- #report ⇒ Object
  Generate a comprehensive report.
- #reset_metrics ⇒ Object
  Reset all metrics.
- #start! ⇒ Object
  Start monitoring.
- #stop! ⇒ Object
  Stop monitoring.
- #time_critical_section(name) ⇒ Object
  Time a critical section with contention tracking.
Constructor Details
#initialize ⇒ Monitor
Returns a new instance of Monitor.
# File 'lib/familia/thread_safety/monitor.rb', line 47

def initialize
  @enabled = false
  @started_at = nil
  @mutex_contentions = Concurrent::AtomicFixnum.new(0)
  @race_detections = Concurrent::AtomicFixnum.new(0)
  @critical_sections = Concurrent::AtomicFixnum.new(0)
  @deadlock_checks = Concurrent::AtomicFixnum.new(0)

  # Track contention points with counts
  @contention_points = Concurrent::Map.new

  # Track wait time aggregates for critical sections (removed @wait_times to prevent memory leak)
  @wait_time_totals = Concurrent::Map.new
  @wait_time_counts = Concurrent::Map.new

  # Track thread-local state for nested monitoring
  @thread_state = Concurrent::Map.new

  # Track concurrent operation counts
  @concurrent_operations = Concurrent::Map.new

  # Performance metrics
  @section_timings = Concurrent::Map.new
  @section_counts = Concurrent::Map.new
end
Instance Attribute Details
#enabled ⇒ Object (readonly)
Returns whether monitoring is currently enabled (set by #start! and cleared by #stop!).
# File 'lib/familia/thread_safety/monitor.rb', line 45

def enabled
  @enabled
end
#started_at ⇒ Object (readonly)
Returns the time monitoring was started, or nil when monitoring is not running.
# File 'lib/familia/thread_safety/monitor.rb', line 45

def started_at
  @started_at
end
Class Method Details
.instance ⇒ Object
# File 'lib/familia/thread_safety/monitor.rb', line 27

def instance
  @instance ||= new
end
.method_missing(method, *args, &block) ⇒ Object
Delegate all methods to singleton instance
# File 'lib/familia/thread_safety/monitor.rb', line 32

def method_missing(method, *args, &block)
  if instance.respond_to?(method)
    instance.send(method, *args, &block)
  else
    super
  end
end
.respond_to_missing?(method, include_private = false) ⇒ Boolean
# File 'lib/familia/thread_safety/monitor.rb', line 40

def respond_to_missing?(method, include_private = false)
  instance.respond_to?(method) || super
end
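The class-level delegation shown above can be tried in isolation. The following is a minimal sketch only: `TinyMonitor`, `record`, and `events` are hypothetical names invented for illustration and are not part of Familia. It uses `public_send` rather than `send`, so that only methods the instance publicly responds to are forwarded.

```ruby
# Hypothetical stand-in class; only the delegation pattern mirrors the listing above.
class TinyMonitor
  class << self
    # Lazily create the shared instance (not synchronized in this sketch).
    def instance
      @instance ||= new
    end

    # Forward unknown class-level calls to the shared instance.
    def method_missing(method, *args, &block)
      if instance.respond_to?(method)
        instance.public_send(method, *args, &block)
      else
        super
      end
    end

    # Keep respond_to? consistent with method_missing.
    def respond_to_missing?(method, include_private = false)
      instance.respond_to?(method) || super
    end
  end

  attr_reader :events

  def initialize
    @events = []
  end

  # Store an event and return how many have been recorded so far.
  def record(event)
    @events << event
    @events.size
  end
end

TinyMonitor.record(:contention)  # forwarded to TinyMonitor.instance
TinyMonitor.respond_to?(:record) # true, answered by respond_to_missing?
```

Calls the instance does not understand still fall through to `super` and raise `NoMethodError`, which keeps typos visible instead of silently swallowing them.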
Instance Method Details
#apm_transaction(name, &block) ⇒ Object
Hook for APM integration
# File 'lib/familia/thread_safety/monitor.rb', line 320

def apm_transaction(name, &block)
  return yield unless @enabled

  # This is where you'd integrate with NewRelic, DataDog, etc.
  time_critical_section(name, &block)
end
#calculate_health_score ⇒ Object
Calculate a health score (0-100)
# File 'lib/familia/thread_safety/monitor.rb', line 254

def calculate_health_score
  return 100 unless @started_at

  duration = Time.now - @started_at
  return 100 if duration < 60 # Need at least 1 minute of data

  contentions_per_hour = (@mutex_contentions.value / duration) * 3600
  races_per_hour = (@race_detections.value / duration) * 3600

  score = 100
  score -= [contentions_per_hour / 10.0, 30].min # -1 point per 10 contentions/hour, max -30
  score -= [races_per_hour * 10, 50].min # -10 points per race/hour, max -50

  [score, 0].max.round
end
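To make the scoring arithmetic concrete, here is a standalone restatement of the same formula with a few worked inputs. `health_score` and its keyword arguments are hypothetical names for this sketch; the real method reads its counters from the monitor's instance state.

```ruby
# Hypothetical helper that restates the scoring arithmetic; not a Familia API.
def health_score(contentions:, races:, duration_s:)
  return 100 if duration_s < 60 # too little data to score

  contentions_per_hour = (contentions / duration_s.to_f) * 3600
  races_per_hour = (races / duration_s.to_f) * 3600

  score = 100
  score -= [contentions_per_hour / 10.0, 30].min # 1 point per 10 contentions/hour, capped at 30
  score -= [races_per_hour * 10, 50].min         # 10 points per race/hour, capped at 50
  [score, 0].max.round
end

health_score(contentions: 200, races: 0, duration_s: 3600) # 200/hour: 100 - 20 = 80
health_score(contentions: 600, races: 2, duration_s: 7200) # 300/hour and 1/hour: 100 - 30 - 10 = 60
health_score(contentions: 5, races: 9, duration_s: 30)     # under 60 s of data: 100
```

Because the two penalties are capped at 30 and 50 points, the lowest score this formula can produce is 20; the final clamp to 0 never takes effect.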
#check_deadlock ⇒ Object
Check for potential deadlocks
# File 'lib/familia/thread_safety/monitor.rb', line 180

def check_deadlock
  return unless @enabled

  @deadlock_checks.increment

  # This is a simple check - in production you might want more sophisticated detection
  thread_count = Thread.list.count
  if thread_count > 100
    Familia.warn("[ThreadSafety] High thread count: #{thread_count}")
  end

  # Check for threads waiting on mutexes (simplified)
  waiting_threads = Thread.list.select { |t| t.status == "sleep" }
  if waiting_threads.size > thread_count * 0.8
    Familia.warn("[ThreadSafety] Potential deadlock: #{waiting_threads.size}/#{thread_count} threads sleeping")
  end
end
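The sleeping-thread heuristic can be isolated as a pure function over thread statuses, which makes the 80% threshold easy to reason about. `mostly_sleeping?` is a hypothetical helper written for this sketch, not something the library provides.

```ruby
# Hypothetical helper: applies the "more than 80% sleeping" heuristic to a list
# of thread statuses (the values returned by Thread#status).
def mostly_sleeping?(statuses, threshold: 0.8)
  sleeping = statuses.count { |status| status == "sleep" }
  sleeping > statuses.size * threshold
end

mostly_sleeping?(%w[run sleep sleep sleep sleep]) # 4 of 5 is exactly 80%, so false
mostly_sleeping?(%w[run] + %w[sleep] * 9)         # 9 of 10 exceeds 80%, so true

# Against the live process:
mostly_sleeping?(Thread.list.map(&:status))
```

A thread also reports "sleep" while it is blocked on I/O or waiting on an empty queue, so a positive result is a hint worth investigating rather than proof of a deadlock.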
#export_metrics ⇒ Object
Export metrics in a format suitable for APM tools
# File 'lib/familia/thread_safety/monitor.rb', line 309

def export_metrics
  {
    'familia.thread_safety.mutex_contentions' => @mutex_contentions.value,
    'familia.thread_safety.race_detections' => @race_detections.value,
    'familia.thread_safety.critical_sections' => @critical_sections.value,
    'familia.thread_safety.deadlock_checks' => @deadlock_checks.value,
    'familia.thread_safety.health_score' => calculate_health_score
  }
end
#generate_recommendations(hot_spots) ⇒ Object
Generate recommendations based on metrics
# File 'lib/familia/thread_safety/monitor.rb', line 271

def generate_recommendations(hot_spots)
  recommendations = []

  if @race_detections.value > 0
    recommendations << {
      severity: 'critical',
      message: "#{@race_detections.value} potential race conditions detected - investigate immediately"
    }
  end

  if hot_spots.any? { |h| h[:contentions] > 100 }
    high_contention = hot_spots.select { |h| h[:contentions] > 100 }
    locations = high_contention.map { |h| h[:location] }.join(', ')
    recommendations << {
      severity: 'warning',
      message: "High contention detected at: #{locations}"
    }
  end

  if hot_spots.any? { |h| h[:avg_wait_μs] > 100_000 } # > 100ms in microseconds
    slow_spots = hot_spots.select { |h| h[:avg_wait_μs] > 100_000 }
    recommendations << {
      severity: 'warning',
      message: "Long wait times at: #{slow_spots.map { |h| "#{h[:location]} (#{(h[:avg_wait_μs] / 1000.0).round(1)}ms)" }.join(', ')}"
    }
  end

  if @deadlock_checks.value > 0 && Thread.list.count > 50
    recommendations << {
      severity: 'info',
      message: "Consider connection pooling - high thread count detected"
    }
  end

  recommendations
end
#record_contention(location, wait_time = nil) ⇒ Object
Record a mutex contention event
# File 'lib/familia/thread_safety/monitor.rb', line 108

def record_contention(location, wait_time = nil)
  return unless @enabled

  @mutex_contentions.increment
  @contention_points[location] = @contention_points.fetch(location, 0) + 1

  if wait_time
    record_wait_time(location, wait_time)
  end

  Familia.trace(:THREAD_CONTENTION, nil, "Contention at #{location} (wait: #{wait_time&.round(4)}s)")
end
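The per-location count in the listing above is updated with a `fetch` followed by an assignment, which are two separate map operations, so two threads updating the same location at the same moment could lose an increment. One possible way to make the update a single atomic step is sketched below using only the standard library. `LocationCounter` is a hypothetical class for illustration; concurrent-ruby also documents atomic update helpers on `Concurrent::Map` (such as `compute`) that could serve the same purpose.

```ruby
# Hypothetical counter keyed by location; the Mutex turns the
# read-modify-write into one atomic step.
class LocationCounter
  def initialize
    @lock = Mutex.new
    @counts = Hash.new(0)
  end

  # Add one to the count for this location and return the new value.
  def increment(location)
    @lock.synchronize { @counts[location] += 1 }
  end

  # Current count for this location (0 if never incremented).
  def [](location)
    @lock.synchronize { @counts[location] }
  end
end

counter = LocationCounter.new
threads = 8.times.map do
  Thread.new { 1_000.times { counter.increment("User#save") } }
end
threads.each(&:join)

counter["User#save"] # 8000: no increments are lost
```

The trade-off is that the monitor itself now takes a lock on every event, which is usually acceptable for counters but worth measuring on very hot paths.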
#record_race_condition(location, details = nil) ⇒ Object
Record a potential race condition detection
# File 'lib/familia/thread_safety/monitor.rb', line 130

def record_race_condition(location, details = nil)
  return unless @enabled

  @race_detections.increment
  msg = "Potential race condition at #{location}"
  msg += ": #{details}" if details
  Familia.warn("[ThreadSafety] #{msg}")
end
#record_wait_time(location, wait_time) ⇒ Object
Record wait time for a location
# File 'lib/familia/thread_safety/monitor.rb', line 122

def record_wait_time(location, wait_time)
  # Note: @wait_times was removed to prevent memory leak from unbounded array growth
  # We only need the aggregated totals and counts for calculations
  @wait_time_totals[location] = @wait_time_totals.fetch(location, 0.0) + wait_time
  @wait_time_counts[location] = @wait_time_counts.fetch(location, 0) + 1
end
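The comment in the listing above explains why only totals and counts are kept. The idea can be sketched in a few lines: a mean needs just a running sum and a sample count, so memory grows with the number of locations rather than the number of samples. `WaitStats` is a hypothetical class for this illustration and is not synchronized.

```ruby
# Hypothetical aggregate: keeps a running total and count per location
# instead of storing every sample.
class WaitStats
  def initialize
    @totals = Hash.new(0.0)
    @counts = Hash.new(0)
  end

  # Fold one sample into the running total and count.
  def record(location, wait_time)
    @totals[location] += wait_time
    @counts[location] += 1
  end

  # Mean wait for a location, or 0 when nothing was recorded.
  def average(location)
    count = @counts[location]
    count.zero? ? 0 : @totals[location] / count
  end
end

stats = WaitStats.new
[150.0, 250.0, 200.0].each { |wait| stats.record("pool.checkout", wait) }

stats.average("pool.checkout") # 200.0
stats.average("never.seen")    # 0
```

The cost of this design is that percentiles and maxima cannot be recovered afterwards; only the mean survives.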
#report ⇒ Object
Generate a comprehensive report
# File 'lib/familia/thread_safety/monitor.rb', line 199

def report
  return { enabled: false, message: "Monitoring not enabled" } unless @started_at

  duration = Time.now - @started_at

  # Calculate hot spots
  hot_spots = []
  @contention_points.each_pair do |location, count|
    hot_spots << [location, count]
  end
  hot_spots = hot_spots
    .sort_by { |_, count| -count }
    .first(10)
    .map { |location, count|
      avg_wait_μs = if @wait_time_counts[location] && @wait_time_counts[location] > 0
        (@wait_time_totals[location] / @wait_time_counts[location]).round(0)
      else
        0
      end

      {
        location: location,
        contentions: count,
        avg_wait_μs: avg_wait_μs
      }
    }

  # Calculate critical section performance
  section_performance = []
  @section_counts.each_pair do |name, count|
    avg_time_μs = (@section_timings[name] / count).round(0)
    section_performance << {
      section: name,
      calls: count,
      avg_time_μs: avg_time_μs,
      total_time_μs: @section_timings[name]
    }
  end
  section_performance.sort_by! { |s| -s[:total_time_μs] }

  {
    summary: {
      monitoring_duration_s: duration.round(2),
      mutex_contentions: @mutex_contentions.value,
      race_detections: @race_detections.value,
      critical_sections: @critical_sections.value,
      deadlock_checks: @deadlock_checks.value
    },
    hot_spots: hot_spots,
    section_performance: section_performance,
    health: calculate_health_score,
    recommendations: generate_recommendations(hot_spots)
  }
end
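The hot-spot selection in the report is a sort-descending, take-N, reshape pipeline. A small standalone version, using made-up sample data and a hypothetical `top_hot_spots` helper, shows the shape of the result:

```ruby
# Hypothetical input: contention counts keyed by location.
contention_points = { "cache.fetch" => 12, "pool.checkout" => 340, "User#save" => 57 }

# Sort descending by count, keep the top N, and shape each entry as a hash.
def top_hot_spots(points, limit: 10)
  points
    .sort_by { |_, count| -count }
    .first(limit)
    .map { |location, count| { location: location, contentions: count } }
end

top_hot_spots(contention_points, limit: 2)
# => [{ location: "pool.checkout", contentions: 340 },
#     { location: "User#save", contentions: 57 }]
```

Sorting on the negated count avoids a separate `reverse` call and keeps the pipeline to a single pass over the sorted pairs.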
#reset_metrics ⇒ Object
Reset all metrics
# File 'lib/familia/thread_safety/monitor.rb', line 92

def reset_metrics
  @mutex_contentions.value = 0
  @race_detections.value = 0
  @critical_sections.value = 0
  @deadlock_checks.value = 0
  @contention_points.clear
  # @wait_times.clear - removed to prevent memory leak
  @wait_time_totals.clear
  @wait_time_counts.clear
  @thread_state.clear
  @concurrent_operations.clear
  @section_timings.clear
  @section_counts.clear
end
#start! ⇒ Object
Start monitoring
# File 'lib/familia/thread_safety/monitor.rb', line 74

def start!
  @enabled = true
  @started_at = Time.now
  reset_metrics
  Familia.info("[ThreadSafety] Monitoring started")
  true
end
#stop! ⇒ Object
Stop monitoring
# File 'lib/familia/thread_safety/monitor.rb', line 83

def stop!
  @enabled = false
  duration = @started_at ? Time.now - @started_at : 0
  Familia.info("[ThreadSafety] Monitoring stopped after #{duration.round(2)}s")
  @started_at = nil
  true
end
#time_critical_section(name) ⇒ Object
Time a critical section with contention tracking
# File 'lib/familia/thread_safety/monitor.rb', line 140

def time_critical_section(name)
  return yield unless @enabled

  thread_id = Thread.current.object_id
  start_time = Familia.now_in_μs

  # Check for concurrent execution
  concurrent_count = @concurrent_operations[name] = @concurrent_operations.fetch(name, 0) + 1

  if concurrent_count > 1
    record_contention(name)
  end

  @critical_sections.increment

  begin
    result = yield
  ensure
    end_time = Familia.now_in_μs
    duration_μs = end_time - start_time

    # Record timing in microseconds
    @section_timings[name] = @section_timings.fetch(name, 0) + duration_μs
    @section_counts[name] = @section_counts.fetch(name, 0) + 1

    # Decrement concurrent count
    @concurrent_operations[name] = @concurrent_operations.fetch(name, 1) - 1

    if duration_μs > 100_000 # Log slow critical sections (> 100ms = 100,000μs)
      Familia.warn("[ThreadSafety] Slow critical section '#{name}': #{(duration_μs / 1000.0).round(2)}ms")
    end
  end

  result
end
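The core of the listing above is timing a block and recording the duration in an `ensure` clause, so the measurement is kept even when the block raises. The sketch below reproduces just that part with the standard library. How `Familia.now_in_μs` is implemented is not shown on this page; the sketch assumes a monotonic clock read in microseconds, and `measure_us` is a hypothetical helper.

```ruby
# Hypothetical helper: times a block in whole microseconds and accumulates
# the result under `name`, even if the block raises.
def measure_us(timings, name)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond)
  yield
ensure
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond) - start
  timings[name] = timings.fetch(name, 0) + elapsed
end

timings = {}
value = measure_us(timings, "sleepy") do
  sleep 0.01
  :done
end

value             # :done (the block's result passes through unchanged)
timings["sleepy"] # elapsed microseconds for the block, roughly 10_000 or more
```

A monotonic clock is used here because wall-clock time can jump when the system clock is adjusted, which would distort or even negate a measured duration.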