-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathaudio.py
1792 lines (1572 loc) · 66.5 KB
/
audio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
The main `Echo Nest`_ `Remix API`_ module for manipulating audio files and
their associated `Echo Nest`_ `Analyze API`_ analyses.
AudioData, and getpieces by Robert Ochshorn on 2008-06-06.
Some refactoring and everything else by Joshua Lifton 2008-09-07.
Refactoring by Ben Lacker 2009-02-11.
Other contributions by Adam Lindsay.
Additional functions and cleanup by Peter Sobot on 2012-11-01.
:group Base Classes: AudioAnalysis, AudioRenderable, AudioData, AudioData32
:group Audio-plus-Analysis Classes: AudioFile, LocalAudioFile, LocalAnalysis
:group Building Blocks: AudioQuantum, AudioSegment, AudioQuantumList, ModifiedRenderable
:group Effects: AudioEffect, LevelDB, AmplitudeFactor, TimeTruncateFactor, TimeTruncateLength, Simultaneous
:group Exception Classes: FileTypeError, EchoNestRemixError
:group Audio helper functions: getpieces, mix, assemble, megamix
:group Utility functions: _dataParser, _attributeParser, _segmentsParser
.. _Analyze API: http://developer.echonest.com/
.. _Remix API: https://github.com/echonest/remix
.. _Echo Nest: http://the.echonest.com/
"""
__version__ = "$Revision: 0 $"
# $Source$
import hashlib
import numpy
import os
import sys
import errno
import cPickle
import shutil
import struct
import tempfile
import logging
import wave
import time
import traceback
import cStringIO
import xml.etree.ElementTree as etree
import xml.dom.minidom as minidom
import weakref
from pyechonest import track
from pyechonest.util import EchoNestAPIError
import pyechonest.util
import pyechonest.config as config
from ffmpeg import ffmpeg, ffmpeg_downconvert
MP3_BITRATE = 128
log = logging.getLogger(__name__)
class AudioAnalysis(object):
    """
    This class uses (but does not wrap) `pyechonest.track` to allow
    transparent caching of the audio analysis of an audio file.
    For example, the following script will display the bars of a track
    twice::
        from echonest import *
        a = audio.AudioAnalysis('YOUR_TRACK_ID_HERE')
        a.bars
        a.bars
    The first time `a.bars` is called, a network request is made of the
    `Echo Nest`_ `Analyze API`_. The second time `a.bars` is called, the
    cached value is returned immediately.
    An `AudioAnalysis` object can be created using an existing ID, as in
    the example above, or by specifying the audio file to upload in
    order to create the ID, as in::
        a = audio.AudioAnalysis('FULL_PATH_TO_AUDIO_FILE')
    .. _Analyze API: http://developer.echonest.com/pages/overview?version=2
    .. _Echo Nest: http://the.echonest.com/
    """

    @classmethod
    def __get_cache_path(cls, identifier):
        # On-disk pickle cache, keyed by the 32-character MD5 identifier.
        return "cache/%s.pickle" % identifier

    def __new__(cls, *args, **kwargs):
        # Short-circuit construction: if the first argument looks like an
        # MD5 (a 32-character str) and a cached pickle exists for it,
        # return the unpickled instance instead of building a new one.
        if len(args):
            initializer = args[0]
            if type(initializer) is str and len(initializer) == 32:
                path = cls.__get_cache_path(initializer)
                if os.path.exists(path):
                    return cPickle.load(open(path, 'r'))
        return object.__new__(cls, *args, **kwargs)

    def __init__(self, initializer, filetype = None, lastTry = False):
        """
        Constructor. If the argument is a valid local path or a URL,
        the track ID is generated by uploading the file to the `Echo Nest`_
        `Analyze API`_\. Otherwise, the argument is assumed to be
        the track ID.
        :param initializer:
            A string representing either a path to a local
            file, or the ID of a file that has already
            been uploaded for analysis, or an open file-like object.
        :param filetype: file extension of the data in `initializer`;
            required when `initializer` is a file-like object, ignored
            otherwise.
        :param lastTry: internal flag used by the retry logic below; when
            True, failures are re-raised instead of retried.
        .. _Analyze API: http://developer.echonest.com/docs/v4/track.html
        .. _Echo Nest: http://the.echonest.com/
        """
        if type(initializer) not in [str, unicode] and not hasattr(initializer, 'read'):
            # Argument is invalid.
            raise TypeError("Argument 'initializer' must be a string \
representing either a filename, track ID, or MD5, or \
instead, a file-like object.")
        # Only analyses looked up by MD5 are written to the pickle cache.
        __save_to_cache = False
        try:
            if isinstance(initializer, basestring):
                # see if path_or_identifier is a path or an ID
                if os.path.isfile(initializer):
                    # it's a filename
                    self.pyechonest_track = track.track_from_filename(initializer)
                    self.pyechonest_track.get_analysis()
                else:
                    if initializer.startswith('music://') or \
                       (initializer.startswith('TR') and
                        len(initializer) == 18):
                        # it's an id
                        self.pyechonest_track = track.track_from_id(initializer)
                        self.pyechonest_track.get_analysis()
                    elif len(initializer) == 32:
                        # it's an md5
                        self.pyechonest_track = track.track_from_md5(initializer)
                        self.pyechonest_track.get_analysis()
                        __save_to_cache = True
            else:
                # File-like object: upload its contents for analysis.
                assert(filetype is not None)
                initializer.seek(0)
                try:
                    self.pyechonest_track = track.track_from_file(initializer, filetype)
                    self.pyechonest_track.get_analysis()
                except (IOError, pyechonest.util.EchoNestAPIError) as e:
                    if lastTry:
                        raise
                    # Broken pipe / connection reset, or EN errors -1, 5, 6:
                    # transcode down to MP3 and retry exactly once.
                    if (isinstance(e, IOError)
                        and (e.errno in [errno.EPIPE, errno.ECONNRESET]))\
                        or (isinstance(e, pyechonest.util.EchoNestAPIError)
                        and any([("Error %s" % x) in str(e) for x in [-1, 5, 6]])):
                        logging.getLogger(__name__).warning("Upload to EN failed - transcoding and reattempting.")
                        self.__init__(ffmpeg_downconvert(initializer, filetype), 'mp3', lastTry=True)
                        return
                    # EN error 3 is the API rate limit: wait, retry once.
                    elif (isinstance(e, pyechonest.util.EchoNestAPIError)
                          and any([("Error %s" % x) in str(e) for x in [3]])):
                        logging.getLogger(__name__).warning("EN API limit hit. Waiting 10 seconds.")
                        time.sleep(10)
                        self.__init__(initializer, filetype, lastTry=True)
                        return
                    else:
                        logging.getLogger(__name__).warning("Got unhandlable EN exception. Raising:\n%s",
                                                            traceback.format_exc())
                        raise
        except Exception as e:
            if lastTry or type(initializer) is str:
                raise
            # Some uploads fail because of a trailing garbage byte; chop
            # the last byte off the stream and retry exactly once.
            if "the track is still being analyzed" in str(e)\
               or "there was an error analyzing the track" in str(e):
                logging.getLogger(__name__).warning("Could not analyze track - truncating last byte and trying again.")
                try:
                    initializer.seek(-1, os.SEEK_END)
                    initializer.truncate()
                    initializer.seek(0)
                except IOError:
                    # Stream is not truncatable in place: copy everything
                    # except the last byte into an in-memory buffer.
                    initializer.seek(-1, os.SEEK_END)
                    new_len = initializer.tell()
                    initializer.seek(0)
                    initializer = cStringIO.StringIO(initializer.read(new_len))
                self.__init__(initializer, filetype, lastTry=True)
                return
            else:
                logging.getLogger(__name__).warning("Got a further unhandlable EN exception. Raising:\n%s",
                                                    traceback.format_exc())
                raise
        if self.pyechonest_track is None:
            # This is an EN-side error that will *not* be solved by repeated calls
            if type(initializer) is str:
                raise EchoNestRemixError('Could not find track %s' % initializer)
            else:
                raise EchoNestRemixError('Could not find analysis for track!')
        self.source = None
        # Lazily-parsed quantum lists; see the properties below.
        self._bars = None
        self._beats = None
        self._tatums = None
        self._sections = None
        self._segments = None
        self.identifier = self.pyechonest_track.id
        # Patching around the fact that sometimes pyechonest doesn't give back metadata
        # As of 11/2012, metadata is not used by remix
        try:
            self.metadata = self.pyechonest_track.meta
        except AttributeError:
            self.metadata = None
            print >> sys.stderr, "Warning: no metadata returned for track."
        # Attributes that come with a confidence value are stored as dicts.
        for attribute in ('time_signature', 'mode', 'tempo', 'key'):
            d = {'value': getattr(self.pyechonest_track, attribute),
                 'confidence': getattr(self.pyechonest_track, attribute + '_confidence')}
            setattr(self, attribute, d)
        # Plain scalar attributes copied straight through.
        for attribute in ('end_of_fade_in', 'start_of_fade_out', 'duration', 'loudness'):
            setattr(self, attribute, getattr(self.pyechonest_track, attribute))
        if __save_to_cache:
            # Cache only if a "cache" directory exists and no copy is
            # present yet; `initializer` is the MD5 here.
            path = self.__get_cache_path(initializer)
            if not os.path.isfile(path) and os.path.isdir(os.path.dirname(path)):
                cPickle.dump(self, open(path, 'w'), 2)

    @property
    def bars(self):
        # Parsed lazily on first access, then memoized on the instance.
        if self._bars is None:
            self._bars = _dataParser('bar', self.pyechonest_track.bars)
            self._bars.attach(self)
        return self._bars

    @property
    def beats(self):
        if self._beats is None:
            self._beats = _dataParser('beat', self.pyechonest_track.beats)
            self._beats.attach(self)
        return self._beats

    @property
    def tatums(self):
        if self._tatums is None:
            self._tatums = _dataParser('tatum', self.pyechonest_track.tatums)
            self._tatums.attach(self)
        return self._tatums

    @property
    def sections(self):
        if self._sections is None:
            self._sections = _attributeParser('section', self.pyechonest_track.sections)
            self._sections.attach(self)
        return self._sections

    @property
    def segments(self):
        if self._segments is None:
            self._segments = _segmentsParser(self.pyechonest_track.segments)
            self._segments.attach(self)
        return self._segments

    def __getstate__(self):
        """
        Eliminates the circular reference for pickling.
        """
        dictclone = self.__dict__.copy()
        del dictclone['source']
        return dictclone

    def __setstate__(self, state):
        """
        Recreates circular references after unpickling.
        """
        self.__dict__.update(state)
        if hasattr(AudioAnalysis, 'CACHED_VARIABLES'):
            for cached_var in AudioAnalysis.CACHED_VARIABLES:
                if type(object.__getattribute__(self, cached_var)) == AudioQuantumList:
                    object.__getattribute__(self, cached_var).attach(self)
class AudioRenderable(object):
    """
    An object that gives an `AudioData` in response to a call to its `render`\()
    method.
    Intended to be an abstract class that helps enforce the `AudioRenderable`
    protocol. Picked up a couple of convenience methods common to many descendants.
    Every `AudioRenderable` must provide three things:
    render()
        A method returning the `AudioData` for the object. The rhythmic duration (point
        at which any following audio is appended) is signified by the `endindex` accessor,
        measured in samples.
    source
        An accessor pointing to the `AudioData` that contains the original sample data of
        (a superset of) this audio object.
    duration
        An accessor returning the rhythmic duration (in seconds) of the audio object.
    """

    def resolve_source(self, alt):
        """
        Given an alternative, fallback `alt` source, return either `self`'s
        source or the alternative. Throw an informative error if no source
        is found.
        Utility code that ended up being replicated in several places, so
        it ended up here. Not necessary for use in the RenderableAudioObject
        protocol.
        """
        if hasattr(self, 'source'):
            source = self.source
        else:
            # Fall back to `alt`, but only if it really is an AudioData.
            if isinstance(alt, AudioData):
                source = alt
            else:
                print >> sys.stderr, self.__repr__()
                raise EchoNestRemixError("%s has no implicit or explicit source \
during rendering." %
                                         (self.__class__.__name__, ))
        return source

    @staticmethod
    def init_audio_data(source, num_samples):
        """
        Convenience function for rendering: return a pre-allocated, zeroed
        `AudioData`.
        """
        # Mirror the channel layout of `source` in the new buffer.
        if source.numChannels > 1:
            newchans = source.numChannels
            newshape = (num_samples, newchans)
        else:
            newchans = 1
            newshape = (num_samples,)
        # 32-bit accumulator gives headroom when summing during rendering.
        return AudioData32(shape=newshape, sampleRate=source.sampleRate,
                           numChannels=newchans, defer=False)

    def sources(self):
        # Default: a renderable has exactly one backing source.
        return set([self.source])

    def encode(self, filename):
        """
        Shortcut function that takes care of the need to obtain an `AudioData`
        object first, through `render`.
        """
        self.render().encode(filename)
class AudioData(AudioRenderable):
    """
    Handles audio data transparently. A smart audio container
    with accessors that include:
    sampleRate
        samples per second
    numChannels
        number of channels
    data
        a `numpy.array`_
    .. _numpy.array: http://docs.scipy.org/doc/numpy/reference/generated/numpy.array.html
    """

    def __init__(self, filename=None, ndarray = None, shape=None, sampleRate=None, numChannels=None, defer=False, verbose=True):
        """
        Given an input `ndarray`, import the sample values and shape
        (if none is specified) of the input `numpy.array`.
        Given a `filename` (and an input ndarray), use ffmpeg to convert
        the file to wave, then load the file into the data,
        auto-detecting the sample rate, and number of channels.
        :param filename: a path to an audio file for loading its sample
            data into the AudioData.data
        :param ndarray: a `numpy.array`_ instance with sample data
        :param shape: a tuple of array dimensions
        :param sampleRate: sample rate, in Hz
        :param numChannels: number of channels
        :param defer: when True, postpone decoding until `load` is called
        :param verbose: when True, status messages are printed to stderr
        .. _numpy.array: http://docs.scipy.org/doc/numpy/reference/generated/numpy.array.html
        """
        self.verbose = verbose
        self.defer = defer
        self.filename = filename
        self.sampleRate = sampleRate
        self.numChannels = numChannels
        # Path of the temporary .wav produced by ffmpeg, if one is made.
        self.convertedfile = None
        # Index one past the last written sample (rhythmic length).
        self.endindex = 0
        # Allocate (or load) the 16-bit buffer unless loading is deferred.
        if shape is None and isinstance(ndarray, numpy.ndarray) and not self.defer:
            self.data = numpy.zeros(ndarray.shape, dtype=numpy.int16)
        elif shape is not None and not self.defer:
            self.data = numpy.zeros(shape, dtype=numpy.int16)
        elif not self.defer and self.filename:
            self.data = None
            self.load()
        else:
            self.data = None
        # Copy any provided samples into the freshly-zeroed buffer.
        if ndarray is not None and self.data is not None:
            self.endindex = len(ndarray)
            self.data[0:self.endindex] = ndarray

    def load(self):
        """Decode `self.filename` (converting via ffmpeg when needed) into
        `self.data` as 16-bit samples."""
        if isinstance(self.data, numpy.ndarray):
            # Already loaded.
            return
        temp_file_handle = None
        # A 44.1kHz stereo .wav can be read directly; anything else is
        # first converted to a temporary .wav by ffmpeg.
        if self.filename.lower().endswith(".wav") and (self.sampleRate, self.numChannels) == (44100, 2):
            file_to_read = self.filename
        elif self.convertedfile:
            file_to_read = self.convertedfile
        else:
            temp_file_handle, self.convertedfile = tempfile.mkstemp(".wav")
            self.sampleRate, self.numChannels = ffmpeg(self.filename, self.convertedfile, overwrite=True,
                                                       numChannels=self.numChannels, sampleRate=self.sampleRate, verbose=self.verbose)
            file_to_read = self.convertedfile
        w = wave.open(file_to_read, 'r')
        numFrames = w.getnframes()
        raw = w.readframes(numFrames)
        # "<h" = little-endian signed 16-bit, interleaved across channels.
        sampleSize = numFrames * self.numChannels
        data = numpy.frombuffer(raw, dtype="<h", count=sampleSize)
        ndarray = numpy.array(data, dtype=numpy.int16)
        if self.numChannels > 1:
            # Reshape the interleaved stream into (frames, channels).
            ndarray.resize((numFrames, self.numChannels))
        self.data = numpy.zeros(ndarray.shape, dtype=numpy.int16)
        self.endindex = 0
        if ndarray is not None:
            self.endindex = len(ndarray)
            self.data = ndarray
        if temp_file_handle is not None:
            os.close(temp_file_handle)
        w.close()

    def __getitem__(self, index):
        """
        Fetches a frame or slice. Returns an individual frame (if the index
        is a time offset float or an integer sample number) or a slice if
        the index is an `AudioQuantum` (or quacks like one).
        """
        if not isinstance(self.data, numpy.ndarray) and self.defer:
            self.load()
        if isinstance(index, float):
            # Seconds -> sample index.
            index = int(index * self.sampleRate)
        elif hasattr(index, "start") and hasattr(index, "duration"):
            # AudioQuantum-like object -> time slice in seconds.
            index = slice(float(index.start), index.start + index.duration)
        if isinstance(index, slice):
            # A slice whose endpoints are AudioQuantum-like -> span from the
            # first quantum's start to the end of the last quantum.
            if (hasattr(index.start, "start") and
                hasattr(index.stop, "duration") and
                hasattr(index.stop, "start")):
                index = slice(index.start.start, index.stop.start + index.stop.duration)
        if isinstance(index, slice):
            return self.getslice(index)
        else:
            return self.getsample(index)

    def getslice(self, index):
        "Help `__getitem__` return a new AudioData for a given slice"
        if not isinstance(self.data, numpy.ndarray) and self.defer:
            self.load()
        if isinstance(index.start, float):
            # Seconds -> sample indices.
            index = slice(int(index.start * self.sampleRate),
                          int(index.stop * self.sampleRate), index.step)
        return AudioData(None, self.data[index], sampleRate=self.sampleRate,
                         numChannels=self.numChannels, defer=False)

    def getsample(self, index):
        """
        Help `__getitem__` return a frame (all channels for a given
        sample index)
        """
        if not isinstance(self.data, numpy.ndarray) and self.defer:
            self.load()
        if isinstance(index, int):
            return self.data[index]
        else:
            #let the numpy array interface be clever
            return AudioData(None, self.data[index], defer=False)

    def pad_with_zeros(self, num_samples):
        # Grow the buffer by `num_samples` zeroed frames (no-op if <= 0).
        if num_samples > 0:
            if self.numChannels == 1:
                extra_shape = (num_samples,)
            else:
                extra_shape = (num_samples, self.numChannels)
            self.data = numpy.append(self.data,
                                     numpy.zeros(extra_shape, dtype=numpy.int16), axis=0)

    def append(self, another_audio_data):
        "Appends the input to the end of this `AudioData`."
        # Grow only by what won't fit in the remaining headroom.
        extra = len(another_audio_data.data) - (len(self.data) - self.endindex)
        self.pad_with_zeros(extra)
        self.data[self.endindex : self.endindex + len(another_audio_data)] += another_audio_data.data
        self.endindex += another_audio_data.endindex

    def sum(self, another_audio_data):
        # Mix `another_audio_data` into this buffer starting at sample 0.
        extra = len(another_audio_data.data) - len(self.data)
        self.pad_with_zeros(extra)
        compare_limit = min(len(another_audio_data.data), len(self.data)) - 1
        self.data[: compare_limit] += another_audio_data.data[: compare_limit]

    def add_at(self, time, another_audio_data):
        """
        Adds the input `another_audio_data` to this `AudioData`
        at the `time` specified in seconds. If `another_audio_data` has fewer channels than
        this `AudioData`, the `another_audio_data` will be resampled to match.
        In this case, this method will modify `another_audio_data`.
        """
        offset = int(time * self.sampleRate)
        extra = offset + len(another_audio_data.data) - len(self.data)
        self.pad_with_zeros(extra)
        if another_audio_data.numChannels < self.numChannels:
            # Resample another_audio_data
            another_audio_data.data = numpy.repeat(another_audio_data.data, self.numChannels).reshape(len(another_audio_data), self.numChannels)
            another_audio_data.numChannels = self.numChannels
        self.data[offset : offset + len(another_audio_data.data)] += another_audio_data.data

    def __len__(self):
        # Allocated length in samples (may exceed endindex).
        if self.data is not None:
            return len(self.data)
        else:
            return 0

    def __add__(self, other):
        """Supports stuff like this: sound3 = sound1 + sound2"""
        return assemble([self, other], numChannels=self.numChannels,
                        sampleRate=self.sampleRate)

    def encode(self, filename=None, mp3=None):
        """
        Outputs an MP3 or WAVE file to `filename`.
        Format is determined by `mp3` parameter.
        """
        # When `mp3` is not explicitly set, a ".wav" target means WAVE;
        # everything else defaults to MP3.
        if not mp3 and filename.lower().endswith('.wav'):
            mp3 = False
        else:
            mp3 = True
        if mp3:
            foo, tempfilename = tempfile.mkstemp(".wav")
            os.close(foo)
        else:
            tempfilename = filename
        fid = open(tempfilename, 'wb')
        # Based on Scipy svn
        # http://projects.scipy.org/pipermail/scipy-svn/2007-August/001189.html
        fid.write('RIFF')
        fid.write(struct.pack('<i', 0)) # write a 0 for length now, we'll go back and add it later
        fid.write('WAVE')
        # fmt chunk
        fid.write('fmt ')
        if self.data.ndim == 1:
            noc = 1
        else:
            noc = self.data.shape[1]
        bits = self.data.dtype.itemsize * 8
        # Byte rate and block align for the fmt chunk.
        sbytes = self.sampleRate * (bits / 8) * noc
        ba = noc * (bits / 8)
        fid.write(struct.pack('<ihHiiHH', 16, 1, noc, self.sampleRate, sbytes, ba, bits))
        # data chunk
        fid.write('data')
        fid.write(struct.pack('<i', self.data.nbytes))
        self.data.tofile(fid)
        # Determine file size and place it in correct
        # position at start of the file.
        size = fid.tell()
        fid.seek(4)
        fid.write(struct.pack('<i', size - 8))
        fid.close()
        if not mp3:
            return tempfilename
        # now convert it to mp3
        if not filename.lower().endswith('.mp3'):
            filename = filename + '.mp3'
        try:
            bitRate = MP3_BITRATE
        except NameError:
            bitRate = 128
        ffmpeg(tempfilename, filename, bitRate=bitRate, verbose=self.verbose)
        if tempfilename != filename:
            if self.verbose:
                print >> sys.stderr, "Deleting: %s" % tempfilename
            os.remove(tempfilename)
        return filename

    def unload(self):
        # Drop the sample buffer and delete any temporary converted .wav.
        self.data = None
        if self.convertedfile:
            if self.verbose:
                print >> sys.stderr, "Deleting: %s" % self.convertedfile
            os.remove(self.convertedfile)
            self.convertedfile = None

    def render(self, start=0.0, to_audio=None, with_source=None):
        # AudioData renders as itself; when given a target buffer, mix
        # this object into it at `start` seconds instead.
        if not to_audio:
            return self
        if with_source != self:
            return
        to_audio.add_at(start, self)
        return

    @property
    def duration(self):
        # Rhythmic length in seconds, derived from endindex.
        return float(self.endindex) / self.sampleRate

    @property
    def source(self):
        # An AudioData is its own source.
        return self
class AudioData32(AudioData):
    """A 32-bit variant of AudioData, intended for data collection on
    audio rendering with headroom."""

    def __init__(self, filename=None, ndarray = None, shape=None, sampleRate=None, numChannels=None, defer=False, verbose=True):
        """
        Special form of AudioData to allow for headroom when collecting samples.
        Same parameters as `AudioData.__init__`, but the backing buffer is
        int32 instead of int16 so summed samples do not clip.
        """
        self.verbose = verbose
        self.defer = defer
        self.filename = filename
        self.sampleRate = sampleRate
        self.numChannels = numChannels
        self.convertedfile = None
        # int32 buffer: extra headroom for accumulating renders.
        if shape is None and isinstance(ndarray, numpy.ndarray) and not self.defer:
            self.data = numpy.zeros(ndarray.shape, dtype=numpy.int32)
        elif shape is not None and not self.defer:
            self.data = numpy.zeros(shape, dtype=numpy.int32)
        elif not self.defer and self.filename:
            self.load()
        else:
            self.data = None
        self.endindex = 0
        if ndarray is not None and self.data is not None:
            self.endindex = len(ndarray)
            self.data[0:self.endindex] = ndarray

    def load(self):
        """Decode `self.filename` into a 32-bit `self.data` buffer,
        converting via ffmpeg when the source is not 44.1kHz stereo wav."""
        if isinstance(self.data, numpy.ndarray):
            # Already loaded.
            return
        temp_file_handle = None
        if self.filename.lower().endswith(".wav") and (self.sampleRate, self.numChannels) == (44100, 2):
            file_to_read = self.filename
        elif self.convertedfile:
            file_to_read = self.convertedfile
        else:
            temp_file_handle, self.convertedfile = tempfile.mkstemp(".wav")
            self.sampleRate, self.numChannels = ffmpeg(self.filename, self.convertedfile, overwrite=True,
                                                       numChannels=self.numChannels, sampleRate=self.sampleRate, verbose=self.verbose)
            file_to_read = self.convertedfile
        w = wave.open(file_to_read, 'r')
        numFrames = w.getnframes()
        raw = w.readframes(numFrames)
        sampleSize = numFrames * self.numChannels
        data = numpy.frombuffer(raw, dtype="<h", count=sampleSize)
        ndarray = numpy.array(data, dtype=numpy.int16)
        if self.numChannels > 1:
            ndarray.resize((numFrames, self.numChannels))
        # Unlike AudioData.load, the 16-bit samples are copied into a
        # zeroed 32-bit buffer (widening them) rather than adopted as-is.
        self.data = numpy.zeros(ndarray.shape, dtype=numpy.int32)
        self.endindex = 0
        if ndarray is not None:
            self.endindex = len(ndarray)
            self.data[0:self.endindex] = ndarray
        if temp_file_handle is not None:
            os.close(temp_file_handle)
        w.close()

    def encode(self, filename=None, mp3=None):
        """
        Outputs an MP3 or WAVE file to `filename`.
        Format is determined by `mp3` parameter.
        The 32-bit buffer is first scaled back to 16-bit via `normalized`.
        """
        normalized = self.normalized()
        temp_file_handle = None
        if not mp3 and filename.lower().endswith('.wav'):
            mp3 = False
        else:
            mp3 = True
        if mp3:
            temp_file_handle, tempfilename = tempfile.mkstemp(".wav")
        else:
            tempfilename = filename
        fid = open(tempfilename, 'wb')
        # Based on Scipy svn
        # http://projects.scipy.org/pipermail/scipy-svn/2007-August/001189.html
        fid.write('RIFF')
        fid.write(struct.pack('<i', 0)) # write a 0 for length now, we'll go back and add it later
        fid.write('WAVE')
        # fmt chunk
        fid.write('fmt ')
        if normalized.ndim == 1:
            noc = 1
        else:
            noc = normalized.shape[1]
        bits = normalized.dtype.itemsize * 8
        # Byte rate and block align for the fmt chunk.
        sbytes = self.sampleRate * (bits / 8) * noc
        ba = noc * (bits / 8)
        fid.write(struct.pack('<ihHiiHH', 16, 1, noc, self.sampleRate, sbytes, ba, bits))
        # data chunk
        fid.write('data')
        fid.write(struct.pack('<i', normalized.nbytes))
        normalized.tofile(fid)
        # Determine file size and place it in correct
        # position at start of the file.
        size = fid.tell()
        fid.seek(4)
        fid.write(struct.pack('<i', size - 8))
        fid.close()
        if not mp3:
            return tempfilename
        # now convert it to mp3
        if not filename.lower().endswith('.mp3'):
            filename = filename + '.mp3'
        try:
            bitRate = MP3_BITRATE
        except NameError:
            bitRate = 128
        ffmpeg(tempfilename, filename, bitRate=bitRate, verbose=self.verbose)
        if tempfilename != filename:
            if self.verbose:
                print >> sys.stderr, "Deleting: %s" % tempfilename
            os.remove(tempfilename)
        if temp_file_handle is not None:
            os.close(temp_file_handle)
        return filename

    def normalized(self):
        """Return to 16-bit for encoding."""
        # Scale so the loudest sample maps to 32767.
        factor = 32767.0 / numpy.max(numpy.absolute(self.data.flatten()))
        # If the max was 32768, don't bother scaling:
        if factor < 1.000031:
            return (self.data * factor).astype(numpy.int16)
        else:
            return self.data.astype(numpy.int16)

    def pad_with_zeros(self, num_samples):
        # Same as AudioData.pad_with_zeros but keeps the int32 dtype.
        if num_samples > 0:
            if self.numChannels == 1:
                extra_shape = (num_samples,)
            else:
                extra_shape = (num_samples, self.numChannels)
            self.data = numpy.append(self.data,
                                     numpy.zeros(extra_shape, dtype=numpy.int32), axis=0)
def getpieces(audioData, segs):
    """
    Collects audio samples for output.
    Returns a new `AudioData` where the new sample data is assembled
    from the input audioData according to the time offsets in each
    of the elements of the input segs (commonly an `AudioQuantumList`).
    :param audioData: an `AudioData` object
    :param segs: an iterable containing objects that may be accessed
        as slices or indices for an `AudioData`
    :return: a freshly-allocated `AudioData` containing the concatenated
        slices
    """
    # Ensure that we have data.
    # Fix: `audioData.data == None` is an *elementwise* comparison when
    # data is a numpy array, and its truth value is ambiguous (numpy
    # raises ValueError); identity with `is None` is the correct test.
    if audioData.data is None or audioData.defer:
        audioData.data = None
        audioData.load()
    # Total output length, in samples.
    dur = 0
    for s in segs:
        dur += int(s.duration * audioData.sampleRate)
    # if I wanted to add some padding to the length, I'd do it here
    #determine shape of new array
    if len(audioData.data.shape) > 1:
        newshape = (dur, audioData.data.shape[1])
        newchans = audioData.data.shape[1]
    else:
        newshape = (dur,)
        newchans = 1
    #make accumulator segment
    newAD = AudioData(shape=newshape, sampleRate=audioData.sampleRate,
                      numChannels=newchans, defer=False, verbose=audioData.verbose)
    #concatenate segs to the new segment
    for s in segs:
        newAD.append(audioData[s])
    # audioData.unload()
    return newAD
def assemble(audioDataList, numChannels=1, sampleRate=44100, verbose=True):
    """
    Concatenate several `AudioData` objects into one.
    The sample buffers of every element of `audioDataList` are joined
    end-to-end and wrapped in a freshly-constructed `AudioData`.
    :param audioDataList: a list of `AudioData` objects
    :param numChannels: channel count recorded on the result
    :param sampleRate: sample rate (Hz) recorded on the result
    :param verbose: passed through to the new `AudioData`
    :return: a new `AudioData` holding the concatenated samples
    """
    buffers = [segment.data for segment in audioDataList]
    combined = numpy.concatenate(buffers)
    return AudioData(ndarray=combined,
                     numChannels=numChannels,
                     sampleRate=sampleRate, defer=False, verbose=verbose)
def mix(dataA, dataB, mix=0.5):
    """
    Mixes two `AudioData` objects. Assumes they have the same sample rate
    and number of channels.
    `mix` is a float in [0, 1] giving the relative level of `dataA` in the
    result; e.g. mix=0.9 yields greater presence of dataA in the final mix.
    The output is as long as the longer of the two inputs.
    """
    level_a = float(mix)
    # Copy the longer input into a fresh buffer, scale it by its own
    # gain, then add the scaled shorter input on top.
    if dataA.endindex > dataB.endindex:
        base, overlay = dataA, dataB
        base_gain, overlay_gain = level_a, 1 - level_a
    else:
        base, overlay = dataB, dataA
        base_gain, overlay_gain = 1 - level_a, level_a
    newdata = AudioData(ndarray=base.data, sampleRate=base.sampleRate,
                        numChannels=base.numChannels, defer=False)
    newdata.data *= base_gain
    newdata.data[:overlay.endindex] += overlay.data[:] * overlay_gain
    return newdata
def normalize(audio):
    """
    Thin wrapper that delegates to `audio.normalized()`.
    Kept for compatibility with some legacy Wub Machine calls.
    :param audio: any object exposing a `normalized()` method
        (typically an `AudioData32`)
    :return: whatever `audio.normalized()` returns
    """
    return audio.normalized()
def __genFade(fadeLength, dimensions=1):
    """
    Internal helper for fadeEdges().
    Builds a squared-linear fade-out envelope running from 1.0 down to
    0.0 over `fadeLength` samples. When `dimensions` is 2 the envelope
    gains a trailing axis so it broadcasts against (frames, channels)
    sample arrays.
    """
    envelope = numpy.linspace(1.0, 0.0, fadeLength) ** 2
    if dimensions == 2:
        envelope = envelope[:, numpy.newaxis]
    return envelope
def fadeEdges(input_, fadeLength=50):
    """
    Fade in/out the ends of an audioData to prevent clicks/pops at edges.
    Operates in place on the underlying sample buffer and also returns
    `input_` for convenience.
    :param input_: an `AudioData` or a bare `numpy.ndarray` of samples
    :param fadeLength: number of samples to fade in/out (clamped to the
        buffer length)
    :raises Exception: if `input_` is neither an AudioData nor an ndarray
    """
    if isinstance(input_, AudioData):
        ad = input_.data
    elif isinstance(input_, numpy.ndarray):
        ad = input_
    else:
        raise Exception("Cannot fade edges of unknown datatype.")
    # Clamp the fade to the available samples and actually use the
    # clamped value in the slices below (the original computed the min
    # but still sliced with the unclamped fadeLength).
    fadeLength = min(fadeLength, len(ad))
    # Fix: pass the array's rank. `ad.shape[1]` raised IndexError for
    # mono, 1-D buffers; __genFade expects 1 or 2 "dimensions".
    fadeOut = __genFade(fadeLength, ad.ndim)
    ad[0:fadeLength] *= fadeOut[::-1]    # fade in: reversed fade-out ramp
    ad[-1 * fadeLength:] *= fadeOut      # fade out
    return input_
def truncatemix(dataA, dataB, mix=0.5):
    """
    Mixes two `AudioData` objects. Assumes they have the same sample rate
    and number of channels.
    `mix` is a float in [0, 1] giving the relative level of `dataA` in the
    result; e.g. mix=0.9 yields greater presence of dataA in the final mix.
    If dataB is longer than dataA, dataB is truncated to dataA's length.
    Note that if dataA is longer than dataB, dataA will not be truncated.
    """
    dry_gain = float(mix)
    wet_gain = 1 - dry_gain
    # Start from a copy of dataA scaled by its own gain.
    newdata = AudioData(ndarray=dataA.data, sampleRate=dataA.sampleRate,
                        numChannels=dataA.numChannels, verbose=False)
    newdata.data *= dry_gain
    # Layer in dataB, truncated to dataA's rhythmic length if necessary.
    if dataB.endindex > dataA.endindex:
        newdata.data[:] += dataB.data[:dataA.endindex] * wet_gain
    else:
        newdata.data[:dataB.endindex] += dataB.data[:] * wet_gain
    return newdata
def megamix(dataList):
    """
    Mix together any number of `AudioData` objects. Keep the shape of
    the first one in the list. Assume they all have the same sample rate
    and number of channels.
    :param dataList: a list of `AudioData` objects
    :return: a new `AudioData` containing the equal-weight mix
    :raises TypeError: if `dataList` (or any element) is not AudioData
    """
    if not isinstance(dataList, list):
        raise TypeError('input must be a list of AudioData objects')
    # Accumulator shaped like the first input.
    newdata = AudioData(shape=dataList[0].data.shape, sampleRate=dataList[0].sampleRate,
                        numChannels=dataList[0].numChannels, defer=False)
    for adata in dataList:
        if not isinstance(adata, AudioData):
            raise TypeError('input must be a list of AudioData objects')
        # Inputs longer than the accumulator are truncated to its length;
        # shorter ones are used whole.
        if len(adata) > len(newdata):
            newseg = AudioData(ndarray=adata[:newdata.endindex].data,
                               numChannels=newdata.numChannels,
                               sampleRate=newdata.sampleRate, defer=False)
            newseg.endindex = newdata.endindex
        else:
            newseg = AudioData(ndarray=adata.data,
                               numChannels=newdata.numChannels,
                               sampleRate=newdata.sampleRate, defer=False)
            newseg.endindex = adata.endindex
        # Equal weighting: each input contributes 1/len(dataList).
        newdata.data[:newseg.endindex] += (newseg.data / float(len(dataList))).astype(newdata.data.dtype)
    newdata.endindex = len(newdata)
    return newdata
class LocalAudioFile(AudioData):
    """
    The basic do-everything class for remixing. Acts as an `AudioData`
    object, but with an added `analysis` selector which is an
    `AudioAnalysis` object. It conditionally uploads the file
    it was initialized with. If the file is already known to the
    Analyze API, then it does not bother uploading the file.
    """

    def __new__(cls, filename, verbose=True, defer=False, sampleRate=None, numChannels=None):
        # There must be a better way to avoid collisions between analysis files and .wav files
        if filename is not None and '.analysis.en' in filename:
            # A previously-pickled LocalAudioFile: unpickle it and return
            # it directly (its __init__ then detects the same suffix and
            # skips re-initialization).
            print >> sys.stderr, "Reading analysis from local file " + filename
            f = open(filename, 'rb')
            audiofile = cPickle.load(f)
            f.close()
            return audiofile
        else:
            # This just creates the object and goes straight on to initializing it
            # NOTE(review): numChannels is not forwarded here, only in
            # __init__ — confirm whether that is intentional.
            return AudioData.__new__(cls, filename=filename, verbose=verbose, defer=defer, sampleRate=sampleRate)

    def __init__(self, filename, verbose=True, defer=False, sampleRate=None, numChannels=None):
        """
        :param filename: path to a local MP3 file, or to a pickled
            ".analysis.en" file previously written by `save`
        :param verbose: when True, progress messages go to stderr
        :param defer: passed to `AudioData.__init__` (defer decoding)
        :param sampleRate: optional sample rate hint for decoding
        :param numChannels: optional channel-count hint for decoding
        """
        # We have to skip the initialization here as the local file is already a complete object
        if '.analysis.en' in filename:
            self.is_local = True
        else:
            AudioData.__init__(self, filename=filename, verbose=verbose, defer=defer,
                               sampleRate=sampleRate, numChannels=numChannels)
            # Probe the Analyze API by MD5 first to avoid re-uploading a
            # file it has already seen.
            track_md5 = hashlib.md5(file(self.filename, 'rb').read()).hexdigest()
            if verbose:
                print >> sys.stderr, "Computed MD5 of file is " + track_md5
            try:
                if verbose:
                    print >> sys.stderr, "Probing for existing analysis"
                tempanalysis = AudioAnalysis(track_md5)
            except Exception:
                if verbose:
                    print >> sys.stderr, "Analysis not found. Uploading..."
                tempanalysis = AudioAnalysis(filename)
            self.analysis = tempanalysis
            self.analysis.source = self
            self.is_local = False

    # Save out as a pickled file.
    def save(self):
        """Persist this object (and its converted .wav) next to the input
        file so later runs can skip both decoding and analysis."""
        # If we loaded from a local file, there's no need to save
        if self.is_local is True:
            print >> sys.stderr, "Analysis was loaded from local file, not saving"
        else:
            input_path = os.path.split(self.filename)[0]
            input_file = os.path.split(self.filename)[1]
            path_to_wave = self.convertedfile
            wav_filename = input_file + '.wav'
            new_path = os.path.abspath(input_path) + os.path.sep
            wav_path = new_path + wav_filename
            try:
                # Keep the decoded .wav alongside the pickle so the pickle
                # remains loadable after the temp file is gone.
                shutil.copyfile(path_to_wave, wav_path)
            except shutil.Error:
                print >> sys.stderr, "Error when moving .wav file: the same file may already exist in this folder"
                return
            self.convertedfile = wav_path
            analysis_filename = input_file + '.analysis.en'
            analysis_path = new_path + analysis_filename
            print >> sys.stderr, "Saving analysis to local file " + analysis_path
            f = open(analysis_path, 'wb')
            cPickle.dump(self, f)
            f.close()

    def toxml(self, context=None):
        # XML serialization is not supported for whole files.
        raise NotImplementedError

    @property
    def duration(self):
        """
        Since we consider `AudioFile` to be an evolved version of
        `AudioData`, we return the measured duration from the analysis.
        """
        return self.analysis.duration

    def __setstate__(self, state):
        """
        Recreates circular reference after unpickling.
        """
        self.__dict__.update(state)
        # weakref avoids a reference cycle between the file and analysis.
        self.analysis.source = weakref.proxy(self)
class LocalAnalysis(object):
"""
Like `LocalAudioFile`, it conditionally uploads the file with which
it was initialized. Unlike `LocalAudioFile`, it is not a subclass of
`AudioData`, so contains no sample data.
"""
def __init__(self, filename, verbose=True):
"""
:param filename: path to a local MP3 file
"""