Class: Familia::Migration::Pipeline Abstract
Defined in: lib/familia/migration/pipeline.rb
Overview
This class is abstract: subclass it and implement #should_process? and #build_update_fields.
Pipeline-based migration for batch Redis operations with improved performance
Inherits all Model functionality but processes records in batches using Redis pipelining instead of individual operations. This provides significant performance improvements for large datasets with simple updates.
When to Use Pipeline vs Model
Use Pipeline when:
- Processing thousands of records or more with simple field updates
- All records get similar field modifications
- Performance is more important than per-record error handling
- Updates can be expressed as Hash field assignments
Use Model when:
- Complex logic needed per record
- Individual error handling is important
- Records need different processing logic
- Updates involve method calls beyond simple field assignment
Subclassing Requirements
Subclasses must implement:
- Model#prepare - Set @model_class and @batch_size (inherited)
- #should_process? - Return true/false for each record
- #build_update_fields - Return Hash of field updates
Subclasses may override:
- #execute_update - Customize the pipeline update operation
Usage Example
class CustomerObjidMigration < Familia::Migration::Pipeline
  def prepare
    @model_class = Customer
    @batch_size = 100 # Smaller batches for pipelines
  end

  def should_process?(obj)
    return track_stat(:skipped_empty_custid) && false if obj.custid.empty?

    true
  end

  def build_update_fields(obj)
    {
      objid: obj.objid || SecureRandom.uuid_v7_from(obj.created),
      user_type: 'authenticated'
    }
  end
end
Performance Notes
- Use smaller batch sizes (50-200) compared to Model
- Commands are sent to Redis once per batch rather than once per record; pipelining alone does not make a batch atomic the way a MULTI/EXEC transaction does
- Error handling is less granular than Model
Instance Method Summary
- #build_update_fields(obj) ⇒ Hash (protected, abstract): Build fields hash for Redis HMSET operation.
- #execute_update(pipe, obj, fields, original_key = nil) ⇒ void (protected): Execute pipeline update operation.
- #process_batch(objects) ⇒ void: Main batch processor - executes Redis operations in pipeline.
- #process_record(obj, key) ⇒ Object (protected): Not used in Pipeline - batch processing instead.
- #should_process?(obj) ⇒ Boolean (protected, abstract): Determine if object should be processed in this batch.
Constructor Details
This class inherits a constructor from Familia::Migration::Model
Instance Method Details
#build_update_fields(obj) ⇒ Hash (protected)
Subclasses must implement this method
Build fields hash for Redis HMSET operation
Required for subclasses - return a hash of field names to values that will be applied via Redis HMSET in the pipeline. Return an empty hash or nil to skip the default HMSET operation.
# File 'lib/familia/migration/pipeline.rb', line 180
def build_update_fields(obj)
  raise NotImplementedError, "#{self.class} must implement #build_update_fields"
end
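As a minimal sketch of the "return an empty hash to skip" behavior described above, the following standalone method builds only the fields that are actually missing. The `Customer` struct and its attributes are hypothetical stand-ins, and `SecureRandom.uuid` is swapped in for the `SecureRandom.uuid_v7_from` call used in the usage example, which does not appear to be part of Ruby's standard library.

```ruby
require 'securerandom'

# Hypothetical record type, for illustration only.
Customer = Struct.new(:objid, :user_type)

# Returns only the fields that need to change. An empty hash signals
# that the default HMSET step has nothing to write for this record.
def build_update_fields(obj)
  fields = {}
  fields[:objid] = SecureRandom.uuid if obj.objid.nil?
  fields[:user_type] = 'authenticated' if obj.user_type.nil?
  fields
end

complete = build_update_fields(Customer.new('existing-id', 'authenticated'))
partial  = build_update_fields(Customer.new(nil, 'authenticated'))
```

Here `complete` is an empty hash, while `partial` contains only an `:objid` entry.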
#execute_update(pipe, obj, fields, original_key = nil) ⇒ void (protected)
This method returns an undefined value.
Execute pipeline update operation
Override this method to customize pipeline operations beyond simple HMSET field updates. The default implementation handles HMSET with dry-run support.
Important: Use the provided pipe parameter, not the regular Redis connection, to ensure operations are pipelined.
NOTE: Do not call track_stat(:records_updated) here, or anywhere else in a pipeline migration; the pipeline migration framework already calls it for each processed record.
# File 'lib/familia/migration/pipeline.rb', line 202
def execute_update(pipe, obj, fields, original_key = nil)
  klass_name = obj.class.name.split('::').last

  unless fields&.any?
    return debug("Would skip #{klass_name} b/c empty fields (#{original_key})")
  end

  # Use original_key for records that can't generate valid keys
  dbkey = original_key || obj.dbkey

  # USE THE PIPELINE AND NOT THE regular redis connection.
  pipe.hmset(dbkey, *fields.flatten)

  dry_run_only? do
    debug("Would update #{klass_name}: #{fields}")
  end
end
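One way an override might look is sketched below: it keeps the default field update via `super` and queues one extra command on the same pipe. To stay runnable without Familia or a Redis server, it uses hypothetical stand-ins: `SketchPipelineBase` mimics only the HMSET step of the documented default, `RecordingPipe` records commands instead of sending them, and `ExpiringSessionMigration`, `Session`, and the TTL value are invented for illustration.

```ruby
# Hypothetical stand-in for the documented base class. It mimics only the
# default HMSET step; dry-run handling and logging are left out.
class SketchPipelineBase
  def execute_update(pipe, obj, fields, original_key = nil)
    return unless fields&.any?

    pipe.hmset(original_key || obj.dbkey, *fields.flatten)
  end
end

# Stand-in for the pipeline handle: records queued commands in order.
class RecordingPipe
  attr_reader :commands

  def initialize
    @commands = []
  end

  def hmset(key, *args)
    @commands << [:hmset, key, *args]
  end

  def expire(key, seconds)
    @commands << [:expire, key, seconds]
  end
end

class ExpiringSessionMigration < SketchPipelineBase
  TTL_SECONDS = 86_400 # illustrative value, not taken from the source

  def execute_update(pipe, obj, fields, original_key = nil)
    super # keep the default field update
    # Queue the extra command on the same pipe so it is pipelined too.
    pipe.expire(original_key || obj.dbkey, TTL_SECONDS)
  end
end

Session = Struct.new(:dbkey) # hypothetical record type
pipe = RecordingPipe.new
ExpiringSessionMigration.new.execute_update(
  pipe, Session.new('session:abc:object'), { migrated: 'true' }
)
```

After the call, `pipe.commands` holds the HMSET followed by the EXPIRE, both addressed to the same key. Note that the override never calls `track_stat(:records_updated)`, in line with the note above.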
#process_batch(objects) ⇒ void
This method returns an undefined value.
Main batch processor - executes Redis operations in pipeline
Processes an array of objects using Redis pipelining for improved performance. Each object is checked via #should_process? and updated via #execute_update if processing is needed.
# File 'lib/familia/migration/pipeline.rb', line 80
def process_batch(objects)
  dbclient.pipelined do |pipe|
    objects.each do |obj, original_key|
      next unless should_process?(obj)

      fields = build_update_fields(obj)

      # Previously we skipped here when the migration returned no fields
      # to update. We're not always here to update though. Sometimes we
      # delete or update expirations or do other stuff. If we skip ahead
      # here, we never get to the execute_update method which migrations
      # can override to do whatever they want.
      #
      # Now, we simply return inside the default execute_update. The end
      # result is the same but it gives us the opportunity to perform
      # additional operations on the record.
      execute_update(pipe, obj, fields, original_key)

      track_stat(:records_updated)
    end
  end
end
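The control flow of the batch processor can be traced without a Redis server using a rough simulation like the one below. It assumes, as the listing suggests, that each batch entry is an `[obj, original_key]` pair. The `RecordingPipe` class, the `Record` struct, the hard-coded filter and fields, and the plain `stats` hash are all hypothetical stand-ins for the real pipe, model objects, subclass hooks, and `track_stat`.

```ruby
# Stand-in for the pipeline handle: records commands instead of sending them.
class RecordingPipe
  attr_reader :commands

  def initialize
    @commands = []
  end

  def hmset(key, *args)
    @commands << [:hmset, key, *args]
  end
end

# Hypothetical record type, for illustration only.
Record = Struct.new(:custid, :dbkey)

# Mirrors the documented loop: filter, build fields, queue the update,
# then count the record. Filter and fields are hard-coded for brevity.
def sketch_process_batch(pipe, objects, stats)
  objects.each do |obj, original_key|
    if obj.custid.empty?
      stats[:skipped_empty_custid] += 1
      next
    end

    fields = { user_type: 'authenticated' }
    # Prefer original_key when given, as the default execute_update does.
    pipe.hmset(original_key || obj.dbkey, *fields.flatten)
    stats[:records_updated] += 1
  end
end

pipe = RecordingPipe.new
stats = Hash.new(0)
batch = [
  [Record.new('alice', 'customer:alice:object'), nil],
  [Record.new('', 'customer::object'), 'customer:legacy:object'],
  [Record.new('bob', nil), 'customer:bob-old:object']
]
sketch_process_batch(pipe, batch, stats)
```

In this run the second record is skipped by the filter, and the third is written under its `original_key` because the object itself cannot supply a key.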
#process_record(obj, key) ⇒ Object (protected)
Not used in Pipeline - batch processing instead
# File 'lib/familia/migration/pipeline.rb', line 221
def process_record(obj, key)
  # No-op: Pipeline uses batch processing
end
#should_process?(obj) ⇒ Boolean (protected)
Subclasses must implement this method
Determine if object should be processed in this batch
Required for subclasses - implement filtering logic to determine which records should be included in the pipeline update. Use Model#track_stat to count skipped records.
# File 'lib/familia/migration/pipeline.rb', line 166
def should_process?(obj)
  raise NotImplementedError, "#{self.class} must implement #should_process?"
end
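A filter that counts what it skips could be written along these lines. This is only a sketch: `SketchMigration` is a hypothetical stand-in rather than a real subclass, its `track_stat` is a simplified counter whose signature may differ from Model#track_stat, and `Visitor` is an invented record type. The explicit `return false` avoids depending on what `track_stat` returns.

```ruby
# Hypothetical stand-in so the filter can run without Familia.
class SketchMigration
  attr_reader :stats

  def initialize
    @stats = Hash.new(0)
  end

  # Simplified counter; the real Model#track_stat may behave differently.
  def track_stat(name)
    @stats[name] += 1
  end

  # Returns false for records that should be left out of the batch,
  # counting each skip under its own stat name.
  def should_process?(obj)
    if obj.custid.to_s.empty?
      track_stat(:skipped_empty_custid)
      return false
    end

    true
  end
end

Visitor = Struct.new(:custid) # hypothetical record type
migration = SketchMigration.new
results = [Visitor.new('alice'), Visitor.new(''), Visitor.new(nil)].map do |v|
  migration.should_process?(v)
end
```

Only the first record passes the filter; the empty and nil `custid` values are both counted as skipped.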