Class: Familia::Migration::Pipeline Abstract

Inherits:
Model
  • Object
Defined in:
lib/familia/migration/pipeline.rb

Overview

This class is abstract.

Subclass and implement #should_process? and #build_update_fields

Pipeline-based migration for batch Redis operations with improved performance

Inherits all Model functionality but processes records in batches using Redis pipelining instead of issuing individual operations. Pipelining sends a batch of commands without waiting for each reply, which reduces network round trips and can significantly speed up simple updates across large datasets.

When to Use Pipeline vs Model

Use Pipeline when:

  • Processing thousands of records or more with simple field updates
  • All records get similar field modifications
  • Performance is more important than per-record error handling
  • Updates can be expressed as Hash field assignments

Use Model when:

  • Complex logic needed per record
  • Individual error handling is important
  • Records need different processing logic
  • Updates involve method calls beyond simple field assignment

Subclassing Requirements

Subclasses must implement:

  • #should_process?
  • #build_update_fields

Subclasses may override:

  • #execute_update

Usage Example

class CustomerObjidMigration < Familia::Migration::Pipeline
  def prepare
    @model_class = Customer
    @batch_size = 100 # Smaller batches for pipelines
  end

  def should_process?(obj)
    return track_stat(:skipped_empty_custid) && false if obj.custid.empty?
    true
  end

  def build_update_fields(obj)
    {
      objid: obj.objid || SecureRandom.uuid_v7_from(obj.created),
      user_type: 'authenticated'
    }
  end
end

Performance Notes

  • Use smaller batch sizes (50-200) compared to Model
  • Pipeline operations are sent per batch, not per record (pipelining alone does not make a batch transactional)
  • Error handling is less granular than Model
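
To make the batch-size trade-off concrete, the arithmetic below estimates how many pipelined calls a migration would issue for a given record count. It is a rough sketch that assumes one pipelined call per batch; the `pipelined_calls` helper and the record count are hypothetical and not part of Familia.

```ruby
# Hypothetical helper (not part of Familia): estimate how many pipelined
# calls are needed, assuming one pipelined call per batch.
def pipelined_calls(record_count, batch_size)
  (record_count.to_f / batch_size).ceil
end

record_count = 10_000 # assumed dataset size, for illustration only

[50, 100, 200].each do |batch_size|
  calls = pipelined_calls(record_count, batch_size)
  puts "batch_size=#{batch_size}: #{calls} pipelined calls"
end
```

Smaller batches mean more calls but less work lost or repeated if one batch fails, which is one reason to stay within the suggested range.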

Instance Method Summary

Constructor Details

This class inherits a constructor from Familia::Migration::Model

Instance Method Details

#build_update_fields(obj) ⇒ Hash (protected)

This method is abstract.

Subclasses must implement this method

Build fields hash for Redis HMSET operation

Required for subclasses. Return a hash of field names to values that will be applied via Redis HMSET in the pipeline. Return an empty hash or nil to skip the default HMSET operation.

Parameters:

  • obj (Familia::Horreum)

    object to build field updates for

Returns:

  • (Hash)

    field_name => value pairs for Redis HMSET

Raises:

  • (NotImplementedError)

    if not implemented



# File 'lib/familia/migration/pipeline.rb', line 180

def build_update_fields(obj)
  raise NotImplementedError, "#{self.class} must implement #build_update_fields"
end
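
As an illustration of the contract described above, here is a minimal sketch of a #build_update_fields implementation that returns only the fields that need changing. The `Record` struct stands in for a real Familia::Horreum object, and the field names are assumptions borrowed from the usage example; in a real migration the method is defined inside a Pipeline subclass.

```ruby
require 'securerandom'

# Stand-in for a model object; a real migration receives a Familia::Horreum.
Record = Struct.new(:objid, :user_type)

# Sketch of a subclass implementation: only return fields that need changing.
def build_update_fields(obj)
  fields = {}
  fields[:objid] = SecureRandom.uuid if obj.objid.nil?
  fields[:user_type] = 'authenticated' if obj.user_type.nil?
  fields
end

complete = Record.new('abc123', 'authenticated')
partial  = Record.new(nil, 'authenticated')

p build_update_fields(complete)     # empty hash, so the default HMSET is skipped
p build_update_fields(partial).keys # only the missing field is returned
```

Returning an empty hash for already-complete records keeps the default #execute_update from issuing a no-op HMSET.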

#execute_update(pipe, obj, fields, original_key = nil) ⇒ void (protected)

This method returns an undefined value.

Execute pipeline update operation

Override this method to customize pipeline operations beyond simple HMSET field updates. The default implementation handles HMSET with dry-run support.

Important: Use the provided pipe parameter, not the regular Redis connection, to ensure operations are pipelined.

NOTE: Do not call track_stat(:records_updated) here, or anywhere else in a pipeline migration. The pipeline migration framework calls it itself after each #execute_update.

Parameters:

  • pipe (Redis::Pipeline)

    Redis pipeline instance

  • obj (Familia::Horreum)

    object being updated

  • fields (Hash)

    field updates from #build_update_fields

  • original_key (String) (defaults to: nil)

    original database key from SCAN



# File 'lib/familia/migration/pipeline.rb', line 202

def execute_update(pipe, obj, fields, original_key = nil)
  klass_name = obj.class.name.split('::').last

  unless fields&.any?
    return debug("Would skip #{klass_name} b/c empty fields (#{original_key})")
  end

  # Use original_key for records that can't generate valid keys
  dbkey = original_key || obj.dbkey

  # USE THE PIPELINE AND NOT THE regular redis connection.
  pipe.hmset(dbkey, *fields.flatten)

  dry_run_only? do
    debug("Would update #{klass_name}: #{fields}")
  end
end
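
The following sketch shows what an override might look like when a migration needs more than HMSET, here also refreshing a key's expiration. `RecordingPipe` and `Session` are hypothetical stand-ins so the example runs without Redis, and the one-day TTL is an assumption; in a real migration the method lives in a Pipeline subclass and receives the pipe from the framework. Dry-run handling is omitted for brevity, so a real override would likely want to guard its writes the way the default implementation does.

```ruby
# Stand-in for the pipeline object: records each command instead of
# sending it to Redis. Real code receives the pipe from the framework.
class RecordingPipe
  attr_reader :commands

  def initialize
    @commands = []
  end

  def hmset(key, *attrs)
    @commands << [:hmset, key, *attrs]
  end

  def expire(key, seconds)
    @commands << [:expire, key, seconds]
  end
end

# Hypothetical override: write the fields, then refresh the key's TTL.
# Every command goes through the pipe, never a regular connection.
def execute_update(pipe, obj, fields, original_key = nil)
  dbkey = original_key || obj.dbkey
  pipe.hmset(dbkey, *fields.flatten) if fields&.any?
  pipe.expire(dbkey, 86_400) # assumed TTL of one day
end

Session = Struct.new(:dbkey) # stand-in for a Familia::Horreum object
pipe = RecordingPipe.new
execute_update(pipe, Session.new('session:42:object'), { status: 'migrated' })
pipe.commands.each { |command| p command }
```

Note that the override does not call track_stat(:records_updated), since the framework records that itself.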

#process_batch(objects) ⇒ void

This method returns an undefined value.

Main batch processor - executes Redis operations in pipeline

Processes an array of objects using Redis pipelining for improved performance. Each object is checked via #should_process? and updated via #execute_update if processing is needed.

Parameters:

  • objects (Array<Array>)

    Array of tuples: [obj, original_dbkey]. The original database key is preserved because records with missing or empty identifier fields cannot reconstitute their database key via obj.dbkey. Only the original key from SCAN guarantees we can operate on the record.



# File 'lib/familia/migration/pipeline.rb', line 80

def process_batch(objects)
  dbclient.pipelined do |pipe|
    objects.each do |obj, original_key|
      next unless should_process?(obj)

      fields = build_update_fields(obj)

      # Previously we skipped here when the migration returned no fields
      # to update. We're not always here to update though. Sometimes we
      # delete or update expirations or do other stuff. If we skip ahead
      # here, we never get to the execute_update method which migrations
      # can override to do whatever they want.
      #
      # Now, we simply return inside the default execute_update. The end
      # result is the same but it gives us the opportunity to perform
      # additional operations on the record.

      execute_update(pipe, obj, fields, original_key)

      track_stat(:records_updated)
    end
  end
end
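
To illustrate why each tuple carries the original key, the sketch below uses a stand-in `Customer` struct whose dbkey is derived from its identifier. The key format and the sample keys are made up for the example and are not Familia's actual key scheme.

```ruby
# Stand-in record whose identifier may be empty, so dbkey can be unusable.
# The key format here is invented for illustration.
Customer = Struct.new(:custid) do
  def dbkey
    "customer:#{custid}:object"
  end
end

# Tuples as described above: [obj, original_dbkey].
batch = [
  [Customer.new('alice'), 'customer:alice:object'],
  [Customer.new(''),      'customer:legacy-7f3a:object']
]

batch.each do |obj, original_key|
  # Same fallback the default execute_update uses.
  target = original_key || obj.dbkey
  puts "derived=#{obj.dbkey.inspect} target=#{target.inspect}"
end
```

For the second record the derived key no longer matches the key that SCAN returned, so only the original key reaches the right record.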

#process_record(obj, key) ⇒ Object (protected)

Not used in Pipeline; records are processed in batches by #process_batch instead.



# File 'lib/familia/migration/pipeline.rb', line 221

def process_record(obj, key)
  # No-op: Pipeline uses batch processing
end

#should_process?(obj) ⇒ Boolean (protected)

This method is abstract.

Subclasses must implement this method

Determine if object should be processed in this batch

Required for subclasses. Implement filtering logic that determines which records are included in the pipeline update. Use Model#track_stat to count skipped records.

Parameters:

  • obj (Familia::Horreum)

    object to check

Returns:

  • (Boolean)

    true to process, false to skip

Raises:

  • (NotImplementedError)

    if not implemented



# File 'lib/familia/migration/pipeline.rb', line 166

def should_process?(obj)
  raise NotImplementedError, "#{self.class} must implement #should_process?"
end
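
A filter along these lines could be sketched as follows. The `track_stat` method here is a simple stand-in for Model#track_stat, and the `Account` struct and stat names are hypothetical; the point is only to show one explicit way of counting skips while returning a plain Boolean.

```ruby
# Stand-in for Model#track_stat: counts named events in a Hash.
STATS = Hash.new(0)

def track_stat(name)
  STATS[name] += 1
end

# Stand-in for a model object with the two fields the filter inspects.
Account = Struct.new(:custid, :objid)

# Sketch of a filter: skip records that cannot or need not be migrated,
# counting each reason separately.
def should_process?(obj)
  if obj.custid.to_s.empty?
    track_stat(:skipped_empty_custid)
    return false
  end

  if obj.objid
    track_stat(:skipped_already_migrated)
    return false
  end

  true
end

accounts = [Account.new('alice', nil), Account.new('', nil), Account.new('bob', 'x1')]
selected = accounts.select { |account| should_process?(account) }
p selected.map(&:custid)
p STATS
```

Separate stat names per skip reason make the final migration summary easier to interpret than a single skipped counter.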