# 269 lines, 9.2 KiB, Ruby
module JamRuby
|
|
class QuickMix < ActiveRecord::Base
|
|
include S3ManagerMixin

# 2 hours, in seconds.
# NOTE(review): no method visible in this class references this constant
# (mix_timeout? uses its own 30-minute literal) -- confirm it is still needed.
MAX_MIX_TIME = 7200

# Only the :admin role may mass-assign these attributes.
attr_accessible :ogg_url, :should_retry, as: :admin
# Transient, non-persisted flags:
#   marking_complete       - read by validate_fully_uploaded
#   is_skip_mount_uploader - condition of the skip_callback below; toggled by QuickMix.create
attr_accessor :marking_complete, :is_skip_mount_uploader
attr_writer :current_user

belongs_to :user, :class_name => "JamRuby::User", :inverse_of => :quick_mixes
belongs_to :recording, :class_name => "JamRuby::Recording", :inverse_of => :quick_mixes, :foreign_key => 'recording_id'

# The md5/length checks only apply on the save that starts an upload (see upload_starting?).
validates :ogg_md5, :presence => true, :if => :upload_starting?
# 256 megs max. is this reasonable? surely...
# NOTE(review): `length:` checks the size of the value rather than its magnitude;
# if ogg_length is an integer byte count, a numericality range was presumably
# intended -- verify before relying on the 256 MB cap.
validates :ogg_length, length: {minimum: 1, maximum: 1024 * 1024 * 256 }, if: :upload_starting?
validates :user, presence: true
validate :validate_fully_uploaded
validate :validate_part_complete
validate :validate_too_many_upload_failures

before_destroy :delete_s3_files

# NOTE(review): :store_picture! is not defined in this class; presumably it comes
# from an uploader mount elsewhere -- confirm the callback name is correct.
skip_callback :save, :before, :store_picture!, if: :is_skip_mount_uploader
|
|
|
|
# True once upload_failures has reached the configured
# APP_CONFIG.max_track_upload_failures limit.
def too_many_upload_failures?
  upload_failures >= APP_CONFIG.max_track_upload_failures
end
|
|
|
|
# True when the pending change moves next_part_to_upload from 0 to 1,
# which is the transition performed by upload_start.
def upload_starting?
  next_part_to_upload_was == 0 && next_part_to_upload == 1
end
|
|
|
|
# Validation: adds an error on :upload_failures once the failure limit is reached.
# Delegates the threshold check to too_many_upload_failures? so the rule lives in one place.
def validate_too_many_upload_failures
  return unless too_many_upload_failures?

  errors.add(:upload_failures, ValidationMessages::UPLOAD_FAILURES_EXCEEDED)
end
|
|
|
|
# Validation: rejects a second "mark complete" on a mix that was already
# fully uploaded before this save (fully_uploaded was true and still is).
# Only applies while the transient marking_complete flag is set.
def validate_fully_uploaded
  return unless marking_complete && fully_uploaded && fully_uploaded_was

  errors.add(:fully_uploaded, ValidationMessages::ALREADY_UPLOADED)
end
|
|
|
|
# Validation for part-upload bookkeeping.
#
# * When is_part_uploading goes from true to false (a part was just completed):
#   next_part_to_upload must have advanced by exactly one, and file_offset must
#   not exceed ogg_length.
# * Otherwise, when the part number advanced by one from a non-zero value
#   (i.e. not the 0 -> 1 upload_start transition) while is_part_uploading was
#   and is false, the part was never started.
def validate_part_complete
  part_advanced = next_part_to_upload_was + 1 == next_part_to_upload

  if is_part_uploading_was && !is_part_uploading
    # part completion: check the counter and the reported offset
    errors.add(:next_part_to_upload, ValidationMessages::INVALID_PART_NUMBER_SPECIFIED) unless part_advanced
    errors.add(:file_offset, ValidationMessages::FILE_OFFSET_EXCEEDS_LENGTH) if file_offset > ogg_length
  elsif part_advanced && next_part_to_upload_was != 0
    # the part number was ticked -- but was is_part_uploading ever set to true?
    if !is_part_uploading_was && !is_part_uploading
      errors.add(:next_part_to_upload, ValidationMessages::PART_NOT_STARTED)
    end
  end
end
|
|
|
|
# Normalizes a blank-string user_id to nil; any other value is left untouched.
def sanitize_active_admin
  self.user_id = nil if user_id == ''
end
|
|
|
|
# Begins an upload: records the OGG length and md5, moves next_part_to_upload
# to 1 and saves.
#
# @param length the value stored in ogg_length
# @param md5 the value stored in ogg_md5
# @return the result of save
def upload_start(length, md5)
  # self.upload_id set by the observer
  self.next_part_to_upload = 1
  self.ogg_length = length
  self.ogg_md5 = md5
  save
end
|
|
|
|
# If for some reason the server thinks the client can't carry on with the upload,
# this resets everything to the initial state. The reset itself counts as one
# more upload failure.
#
# @return the result of save (validations are skipped)
def reset_upload
  self.upload_failures += 1
  self.part_failures = 0
  self.file_offset = 0
  self.next_part_to_upload = 0
  self.upload_id = nil
  self.ogg_md5 = nil
  self.ogg_length = 0
  self.fully_uploaded = false
  self.is_part_uploading = false
  save :validate => false # skip validation because we need this to always work
end
|
|
|
|
# Marks the next part as in flight. If no part has been requested yet
# (next_part_to_upload == 0) the upload is first started via upload_start.
#
# @param length forwarded to upload_start when the upload has not begun
# @param md5 forwarded to upload_start when the upload has not begun
# @return the result of the final save
def upload_next_part(length, md5)
  self.marking_complete = true
  upload_start(length, md5) if next_part_to_upload == 0
  self.is_part_uploading = true
  save
end
|
|
|
|
# Asks s3_manager to sign the upload of the current part of the OGG file.
#
# @param content_md5 forwarded to s3_manager.upload_sign
# @return whatever s3_manager.upload_sign returns
def upload_sign(content_md5)
  s3_manager.upload_sign(self[:ogg_url], content_md5, next_part_to_upload, upload_id)
end
|
|
|
|
# Records that the in-flight part finished: clears is_part_uploading, advances
# next_part_to_upload by one, stores the new file offset and resets part_failures.
# The transition is checked by validate_part_complete on save.
#
# @param part not used by this method; kept for interface compatibility
# @param offset new file offset; coerced with to_i
# @return the result of save
def upload_part_complete(part, offset)
  self.marking_complete = true
  self.is_part_uploading = false
  self.next_part_to_upload += 1
  self.file_offset = offset.to_i
  self.part_failures = 0
  save
end
|
|
|
|
# Marks the mix as fully uploaded, queues the mix job and saves.
# A second call on an already-complete mix is rejected by validate_fully_uploaded.
# Note that enqueue runs before save, and its return value is not checked.
#
# @return the result of save
def upload_complete
  self.fully_uploaded = true
  self.marking_complete = true
  enqueue
  save
end
|
|
|
|
# Sets part_failures to one more than the given count and writes it straight to
# the database with update_all, without going through save.
#
# Uses hash conditions rather than interpolating values into SQL strings, which
# matches the other update_all calls in this class.
#
# @param part_failure_before_error the failure count before the current error
# @return the result of update_all
def increment_part_failures(part_failure_before_error)
  self.part_failures = part_failure_before_error + 1
  QuickMix.where(:id => id).update_all(:part_failures => part_failures)
end
|
|
|
|
# Builds and persists a QuickMix for the given recording and user.
#
# The record is saved twice: the first save gives it the id and created_at that
# the S3 paths are built from, the second save stores those paths.
# is_skip_mount_uploader is true while the saves run and reset afterwards.
#
# If the first save fails, the unsaved mix is returned so the caller can inspect
# its errors, instead of failing later with "unknown ID" when the filename is built.
#
# NOTE(review): this replaces ActiveRecord's own create(attributes) signature
# for this class.
#
# @param recording the recording the mix belongs to; must not be nil
# @param user the owner of the mix
# @return the QuickMix instance (check its errors / persisted state)
# @raise [RuntimeError] when recording is nil
def self.create(recording, user)
  raise 'recording is required' if recording.nil?

  mix = QuickMix.new
  mix.is_skip_mount_uploader = true
  mix.recording = recording
  mix.user = user

  if mix.save
    mix[:ogg_url] = construct_filename(mix.created_at, recording.id, mix.id, 'ogg')
    mix[:mp3_url] = construct_filename(mix.created_at, recording.id, mix.id, 'mp3')
    mix.save
  end

  mix.is_skip_mount_uploader = false
  mix
end
|
|
|
|
# Queues a QuickMixer job for this mix and, on success, stamps started_at and
# clears should_retry directly in the database (bypassing validations).
#
# When queueing fails (which implies redis is down) the method bails out and
# started_at is left untouched. Previously the `false` in the rescue clause was
# not returned, so started_at was updated and true returned even on failure.
#
# @return [Boolean] true when the job was queued, false otherwise
def enqueue
  begin
    Resque.enqueue(QuickMixer, id, sign_put(3600 * 24, 'mp3'))
  rescue StandardError
    # implies redis is down. we don't update started_at by bailing out here
    return false
  end

  # avoid db validations
  QuickMix.where(:id => id).update_all(:started_at => Time.now, :should_retry => false)

  true
end
|
|
|
|
# True when the mix was started more than 30 minutes ago.
# A mix that has not started (started_at is nil) cannot have timed out, so this
# returns false instead of raising on the nil subtraction.
def mix_timeout?
  return false if started_at.nil?

  Time.now - started_at > 60 * 30 # 30 minutes to mix is more than enough
end
|
|
|
|
# Summarizes the mix lifecycle as a string. Checks are made in this order:
#   'mixed'          - completed is set
#   'waiting-to-mix' - started_at is nil
#   'error'          - error_count > 0, or the mix timed out
#   'mixing'         - otherwise
def state
  if completed
    'mixed'
  elsif started_at.nil?
    'waiting-to-mix'
  elsif error_count > 0 || mix_timeout?
    'error'
  else
    'mixing'
  end
end
|
|
|
|
# Describes the current error, or nil when state is not 'error'.
#
# @return [Hash, nil] a hash with :error_count, :error_reason and :error_detail.
#   Recorded errors take precedence over a timeout; the 'unknown' hash is a
#   defensive fallback.
def error
  return nil if state != 'error'

  if error_count > 0
    {error_count: error_count, error_reason: error_reason, error_detail: error_detail}
  elsif mix_timeout?
    {error_count: 1, error_reason: 'mix-timeout', error_detail: started_at}
  else
    {error_count: 1, error_reason: 'unknown', error_detail: 'unknown'}
  end
end
|
|
|
|
# Records a failed mix attempt: clears started_at, stores the reason and detail,
# bumps error_count and flags the mix for retry while error_count is at most 3.
# should_retry is left as it was once the count exceeds 3.
#
# @param reason stored in error_reason
# @param detail stored in error_detail
# @return the result of save
def errored(reason, detail)
  self.started_at = nil
  self.error_reason = reason
  self.error_detail = detail
  self.error_count += 1
  self.should_retry = true if error_count <= 3
  save
end
|
|
|
|
# Marks the mix as completed and updates the owning recording.
#
# The recording row is updated directly (no validations or callbacks):
# has_stream_mix is set, and first_quick_mix_id is pointed at this mix only if
# this is the first quick mix to complete for the recording. Both columns are
# written in a single UPDATE.
#
# @param mp3_length stored in mp3_length
# @param mp3_md5 stored in mp3_md5
# @return the result of update_all on the recording
# @raise whatever save! raises when the record is invalid
def finish(mp3_length, mp3_md5)
  self.completed_at = Time.now
  self.mp3_length = mp3_length
  self.mp3_md5 = mp3_md5
  self.completed = true
  save!

  recording_updates = {:has_stream_mix => true}
  # only update first_quick_mix_id pointer if this is the 1st quick mix to complete for this recording
  recording_updates[:first_quick_mix_id] = id if recording.first_quick_mix_id.nil?
  Recording.where(:id => recording.id).update_all(recording_updates)
end
|
|
|
|
# Returns s3_manager's URL for the stored OGG path, or for the MP3 path when
# type is anything other than 'ogg'.
#
# @param type 'ogg' (default) selects ogg_url; any other value selects mp3_url
def s3_url(type='ogg')
  url_column = type == 'ogg' ? :ogg_url : :mp3_url
  s3_manager.s3_url(self[url_column])
end
|
|
|
|
# Alias-style reader that returns the completed attribute unchanged.
def is_completed
  completed
end
|
|
|
|
# If the url starts with http, just return it because it's in some other store.
# Otherwise it's a relative path in s3 and needs to be signed.
#
# @param url_field attribute name holding the url (e.g. :ogg_url)
# @param mime_type passed to the signer as :response_content_type
# @param expiration_time passed to the signer as :expires
# @return the stored url, or the result of s3_manager.sign_url
def resolve_url(url_field, mime_type, expiration_time)
  stored_url = self[url_field]
  return stored_url if stored_url.start_with?('http')

  s3_manager.sign_url(stored_url, {:expires => expiration_time, :response_content_type => mime_type, :secure => false})
end
|
|
|
|
# Returns a download URL for the OGG or MP3 file via resolve_url.
#
# The default expiry is short (120) because the expectation is that a client
# follows the link immediately.
#
# @param expiration_time forwarded to resolve_url (default 120)
# @param type 'ogg' (default, also used when nil) or any other value for MP3
def sign_url(expiration_time = 120, type='ogg')
  type ||= 'ogg'

  if type == 'ogg'
    resolve_url(:ogg_url, 'audio/ogg', expiration_time)
  else
    resolve_url(:mp3_url, 'audio/mpeg', expiration_time)
  end
end
|
|
|
|
# Returns a signed PUT URL from s3_manager for the OGG or MP3 path.
#
# @param expiration_time passed to the signer as :expires (default 3600 * 24)
# @param type 'ogg' (default, also used when nil) or any other value for MP3
def sign_put(expiration_time = 3600 * 24, type='ogg')
  type ||= 'ogg'
  url_column, content_type = type == 'ogg' ? [:ogg_url, 'audio/ogg'] : [:mp3_url, 'audio/mpeg']

  s3_manager.sign_url(self[url_column], {:expires => expiration_time, :content_type => content_type, :secure => false}, :put)
end
|
|
|
|
# Deletes the S3 files of up to 1000 quick mixes selected by the SQL below:
# not yet cleaned, completed more than 7 days ago, and either the recording has
# a final mix, or it has a stream mix and this quick mix is not the recording's
# first_quick_mix_id.
#
# Each selected mix is flagged cleaned via update_all (no validations). If the
# mix was the recording's first quick mix, the recording's has_stream_mix and
# first_quick_mix_id are cleared as well.
def self.cleanup_excessive_storage
  stale_mixes = QuickMix
    .joins(:recording)
    .includes(:recording)
    .where("cleaned = FALSE AND completed = TRUE AND NOW() - completed_at > '7 days'::INTERVAL AND (has_final_mix = TRUE OR (has_stream_mix = TRUE AND quick_mixes.id IN (SELECT qm.id FROM quick_mixes qm WHERE qm.recording_id = recordings.id AND (recordings.first_quick_mix_id IS NULL OR recordings.first_quick_mix_id != qm.id))))")
    .limit(1000)

  stale_mixes.each do |quick_mix|
    quick_mix.delete_s3_files

    QuickMix.where(:id => quick_mix.id).update_all(:cleaned => true)

    # the recording no longer has a stream mix once its first quick mix is gone
    if quick_mix.recording.first_quick_mix_id == quick_mix.id
      Recording.where(:id => quick_mix.recording.id).update_all(:has_stream_mix => false, :first_quick_mix_id => nil)
    end
  end
end
|
|
|
|
# Builds the S3 path for this mix from created_at, recording_id and id.
#
# @param type file extension to use (default 'ogg')
# @return the result of QuickMix.construct_filename
def filename(type='ogg')
  QuickMix.construct_filename(created_at, recording_id, id, type)
end
|
|
|
|
# Removes the OGG and MP3 objects for this mix from S3.
# A file is only deleted when its url column is set and s3_manager reports that
# the object exists. Registered as a before_destroy callback in this class.
def delete_s3_files
  s3_manager.delete(filename('ogg')) if self[:ogg_url] && s3_manager.exists?(filename('ogg'))
  s3_manager.delete(filename('mp3')) if self[:mp3_url] && s3_manager.exists?(filename('mp3'))
end
|
|
|
|
# Builds the S3 path of a stream mix:
#   recordings/<MM-DD-YYYY of created_at>/<recording_id>/stream-mix-<id>.<type>
#
# @param created_at object responding to strftime
# @param recording_id id of the owning recording
# @param id id of the quick mix; must be present
# @param type file extension (default 'ogg')
# @return [String] the path
# @raise [RuntimeError] "unknown ID" when id is nil or false
def self.construct_filename(created_at, recording_id, id, type='ogg')
  raise "unknown ID" unless id

  date_folder = created_at.strftime('%m-%d-%Y')
  "recordings/#{date_folder}/#{recording_id}/stream-mix-#{id}.#{type}"
end
|
|
end
|
|
end |