ホームページ >バックエンド開発 >Python チュートリアル >Pythonはミュージックレトリバーの機能を実装します
曲を聞いて曲を識別します。その名が示すように、デバイスを使用して曲を「聞く」と、それが何の曲であるかを教えてくれます。そして十中八九、その曲を再生しなければなりません。このような機能は、QQ Music などのアプリケーションで以前から登場していました。今日は、曲を聞いて独自の曲認識をしてみます
私たちが設計した全体的なフローチャートは非常にシンプルです:
-----
録音部分
-----
us 「聴きたい」場合は、まず録音プロセスが必要です。私たちの実験では、音楽ライブラリも録音コードを使用して録音し、特徴を抽出してデータベースに保存します。私たちは次のような考え方で録音しています
# coding=utf8 import wave import pyaudio class recode(): def recode(self, CHUNK=44100, FORMAT=pyaudio.paInt16, CHANNELS=2, RATE=44100, RECORD_SECONDS=200, WAVE_OUTPUT_FILENAME="record.wav"): ''' :param CHUNK: 缓冲区大小 :param FORMAT: 采样大小 :param CHANNELS:通道数 :param RATE:采样率 :param RECORD_SECONDS:录的时间 :param WAVE_OUTPUT_FILENAME:输出文件路径 :return: ''' p = pyaudio.PyAudio() stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK) frames = [] for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)): data = stream.read(CHUNK) frames.append(data) stream.stop_stream() stream.close() p.terminate() wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb') wf.setnchannels(CHANNELS) wf.setsampwidth(p.get_sample_size(FORMAT)) wf.setframerate(RATE) wf.writeframes(''.join(frames)) wf.close() if __name__ == '__main__': a = recode() a.recode(RECORD_SECONDS=30, WAVE_OUTPUT_FILENAME='record_pianai.wav')
私たちが録音した曲はどのような形式ですか?
1チャンネルだけ見るとこんな感じの一次元配列です
横軸のインデックス値に従って描画する、よく見るオーディオの形です。
オーディオ処理部分
ここにコアコードを記述します。肝心の「曲の見分け方」。私たち人間がどのようにして曲を区別しているか考えてみてはいかがでしょうか? 上記のような1次元配列を考えることでしょうか?曲の大きさによるのでしょうか?どちらでもない。
私たちは耳から聞こえる固有の周波数のシーケンスを通じて曲を記憶するため、曲を聞いて認識することについて書きたい場合は、オーディオの周波数シーケンスに焦点を当てる必要があります。
フーリエ変換とは何かを復習してください。ブロガーの「信号とシステム」の授業はとても人気がありましたが、授業では具体的な変換形式をメモしなかったものの、感覚的には理解できました。
フーリエ変換の本質は、時間領域の信号を周波数領域の信号に変換することです。つまり、元の X 軸と Y 軸はそれぞれ配列の添字と配列要素であり、現在は周波数 (これは正確ではありませんが、ここでは正しく理解されています) とこの周波数でのコンポーネントのサイズになっています。
周波数領域を理解するにはどうすればよいですか?信号処理についてあまり詳しくない私たちにとって、最も重要なことは、オーディオの構成についての理解を変えることです。私たちは当初、オーディオは最初に与えた波形のようなもので、各時刻に振幅を持ち、異なる振幅シーケンスが特定のサウンドを構成すると考えていました。さて、音は異なる周波数信号の混合物であり、それぞれの信号が最初から最後まで存在していると考えられます。そして、彼らは、予測されたコンポーネントに従って貢献します。
曲を周波数ドメインに変換するとどうなるか見てみましょう?
これらの周波数の成分は平均的ではなく、その差が非常に大きいことが観察できます。画像内の明らかに盛り上がったピークは、出力エネルギーが大きい周波数信号であるとある程度考えることができます。これは、この信号がこの可聴周波数内で高い位置を占めていることを意味します。そこで、曲の特徴を抽出するためにこのような信号を選択しました。
しかし、前に話したのは周波数シーケンスのことであり、一連のフーリエ変換では、曲全体の周波数情報しか知ることができず、その後、時間の関係、つまり「シーケンス」が失われます。それについて話す方法はありません。そこで、時間に応じてオーディオを小さなチャンクに分割するという、より妥協的な方法を採用しました。ここでは、1 秒あたり 40 のチャンクに分割しました。
ここに質問を残してください: なぜこのような 1 秒あたり 1 つの大きなブロックではなく、小さなブロックを使用するのですか?
各ブロックに対してフーリエ変換を実行し、それを剰余演算して配列を取得します。添え字値 (0,40)、(40,80)、(80,120)、(120,180) を持つ 4 つの区間でモジュール長が最大の添え字を取得し、4 つのタプルを合成します。コアオーディオの「フィンガープリント」。
抽出した「指紋」は次のようなものです
(39, 65, 110, 131), (15, 66, 108, 161), (3, 63, 118, 146), (11, 62, 82, 158), (15, 41, 95, 140), (2, 71, 106, 143), (15, 44, 80, 133), (36, 43, 80, 135), (22, 58, 80, 120), (29, 52, 89, 126), (15, 59, 89, 126), (37, 59, 89, 126), (37, 59, 89, 126), (37, 67, 119, 126)
音频处理的类有三个方法:载入数据,傅里叶变换,播放音乐。
如下:
# coding=utf8 import os import re import wave import numpy as np import pyaudio class voice(): def loaddata(self, filepath): ''' :param filepath: 文件路径,为wav文件 :return: 如果无异常则返回True,如果有异常退出并返回False self.wave_data内储存着多通道的音频数据,其中self.wave_data[0]代表第一通道 具体有几通道,看self.nchannels ''' if type(filepath) != str: print 'the type of filepath must be string' return False p1 = re.compile('\.wav') if p1.findall(filepath) is None: print 'the suffix of file must be .wav' return False try: f = wave.open(filepath, 'rb') params = f.getparams() self.nchannels, self.sampwidth, self.framerate, self.nframes = params[:4] str_data = f.readframes(self.nframes) self.wave_data = np.fromstring(str_data, dtype=np.short) self.wave_data.shape = -1, self.sampwidth self.wave_data = self.wave_data.T f.close() self.name = os.path.basename(filepath) # 记录下文件名 return True except: print 'File Error!' def fft(self, frames=40): ''' :param frames: frames是指定每秒钟分块数 :return: ''' block = [] fft_blocks = [] self.high_point = [] blocks_size = self.framerate / frames # block_size为每一块的frame数量 blocks_num = self.nframes / blocks_size # 将音频分块的数量 for i in xrange(0, len(self.wave_data[0]) - blocks_size, blocks_size): block.append(self.wave_data[0][i:i + blocks_size]) fft_blocks.append(np.abs(np.fft.fft(self.wave_data[0][i:i + blocks_size]))) self.high_point.append((np.argmax(fft_blocks[-1][:40]), np.argmax(fft_blocks[-1][40:80]) + 40, np.argmax(fft_blocks[-1][80:120]) + 80, np.argmax(fft_blocks[-1][120:180]) + 120, # np.argmax(fft_blocks[-1][180:300]) + 180, )) # 提取指纹的关键步骤,没有取最后一个,但是保留了这一项,可以想想为什么去掉了? def play(self, filepath): ''' 用来做音频播放的方法 :param filepath:文件路径 :return: ''' chunk = 1024 wf = wave.open(filepath, 'rb') p = pyaudio.PyAudio() # 打开声音输出流 stream = p.open(format=p.get_format_from_width(wf.getsampwidth()), channels=wf.getnchannels(), rate=wf.getframerate(), output=True) # 写声音输出流进行播放 while True: data = wf.readframes(chunk) if data == "": break stream.write(data) stream.close() p.terminate() if __name__ == '__main__': p = voice() p.loaddata('record_beiyiwang.wav') p.fft()
这里面的self.high_point是未来应用的核心数据。列表类型,里面的元素都是上面所解释过的指纹的形式。
数据存储和检索部分
因为我们是事先做好了曲库来等待检索,所以必须要有相应的持久化方法。我采用的是直接用mysql数据库来存储我们的歌曲对应的指纹,这样有一个好处:省写代码的时间
我们将指纹和歌曲存成这样的形式:
顺便一说:为什么各个歌曲前几个的指纹都一样?(当然,后面肯定是千差万别的)其实是音乐开始之前的时间段中没有什么能量较强的点,而由于我们44100的采样率比较高,就会导致开头会有很多重复,别担心。
我们怎么来进行匹配呢?我们可以直接搜索音频指纹相同的数量,不过这样又损失了我们之前说的序列,我们必须要把时间序列用上。否则一首歌曲越长就越容易被匹配到,这种歌曲像野草一样疯狂的占据了所有搜索音频的结果排行榜中的第一名。而且从理论上说,音频所包含的信息就是在序列中体现,就像一句话是靠各个短语和词汇按照一定顺序才能表达出它自己的意思。单纯的看两个句子里的词汇重叠数是完全不能判定两句话是否相似的。我们采用的是下面的算法,不过我们这只是实验性的代码,算法设计的很简单,效率不高。建议想要做更好的结果的同学可以使用改进的DTW算法。
我们在匹配过程中滑动指纹序列,每次比对模式串和源串的对应子串,如果对应位置的指纹相同,则这次的比对相似值加一,我们把滑动过程中得到的最大相似值作为这两首歌的相似度。
举例:
曲库中的一首曲子的指纹序列:[fp13, fp20, fp10, fp29, fp14, fp25, fp13, fp13, fp20, fp33, fp14]
检索音乐的指纹序列: [fp14, fp25, fp13, fp17]
比对过程:
最终的匹配相似值为3
存储检索部分的实现代码
# coding=utf-8 import os import MySQLdb import my_audio class memory(): def __init__(self, host, port, user, passwd, db): ''' 初始化存储类 :param host:主机位置 :param port:端口 :param user:用户名 :param passwd:密码 :param db:数据库名 ''' self.host = host self.port = port self.user = user self.passwd = passwd self.db = db def addsong(self, path): ''' 添加歌曲方法,将指定路径的歌曲提取指纹后放到数据库 :param path:路径 :return: ''' if type(path) != str: print 'path need string' return None basename = os.path.basename(path) try: conn = MySQLdb.connect(host=self.host, port=self.port, user=self.user, passwd=self.passwd, db=self.db, charset='utf8') # 创建与数据库的连接 except: print 'DataBase error' return None cur = conn.cursor() namecount = cur.execute("select * from fingerprint.musicdata WHERE song_name = '%s'" % basename) # 查询新添加的歌曲是否已经在曲库中了 if namecount > 0: print 'the song has been record!' return None v = my_audio.voice() v.loaddata(path) v.fft() cur.execute("insert into fingerprint.musicdata VALUES('%s','%s')" % (basename, v.high_point.__str__())) # 将新歌曲的名字和指纹存到数据库中 conn.commit() cur.close() conn.close() def fp_compare(self, search_fp, match_fp): ''' 指纹比对方法。 :param search_fp: 查询指纹 :param match_fp: 库中指纹 :return:最大相似值 float ''' if len(search_fp) > len(match_fp): return 0 max_similar = 0 search_fp_len = len(search_fp) match_fp_len = len(match_fp) for i in range(match_fp_len - search_fp_len): temp = 0 for j in range(search_fp_len): if match_fp[i + j] == search_fp[j]: temp += 1 if temp > max_similar: max_similar = temp return max_similar def search(self, path): ''' 从数据库检索出 :param path: 需要检索的音频的路径 :return:返回列表,元素是二元组,第一项是匹配的相似值,第二项是歌曲名 ''' v = my_audio.voice() v.loaddata(path) v.fft() try: conn = MySQLdb.connect(host=self.host, port=self.port, user=self.user, passwd=self.passwd, db=self.db, charset='utf8') except: print 'DataBase error' return None cur = conn.cursor() cur.execute("SELECT * FROM fingerprint.musicdata") result = cur.fetchall() compare_res = [] for i in result: compare_res.append((self.fp_compare(v.high_point[:-1], eval(i[1])), i[0])) compare_res.sort(reverse=True) cur.close() conn.close() print compare_res return compare_res def search_and_play(self, path): ''' 跟上个方法一样,不过增加了将搜索出的最优结果直接播放的功能 :param path: 带检索歌曲路径 :return: ''' v = my_audio.voice() v.loaddata(path) v.fft() # print v.high_point try: conn = MySQLdb.connect(host=self.host, port=self.port, user=self.user, passwd=self.passwd, db=self.db, charset='utf8') except: print 'DataBase error' return None cur = conn.cursor() cur.execute("SELECT * FROM fingerprint.musicdata") result = cur.fetchall() compare_res = [] for i in result: compare_res.append((self.fp_compare(v.high_point[:-1], eval(i[1])), i[0])) compare_res.sort(reverse=True) cur.close() conn.close() print compare_res v.play(compare_res[0][1]) return compare_res if __name__ == '__main__': sss = memory('localhost', 3306, 'root', 'root', 'fingerprint') sss.addsong('taiyangzhaochangshengqi.wav') sss.addsong('beiyiwangdeshiguang.wav') sss.addsong('xiaozezhenger.wav') sss.addsong('nverqing.wav') sss.addsong('the_mess.wav') sss.addsong('windmill.wav') sss.addsong('end_of_world.wav') sss.addsong('pianai.wav') sss.search_and_play('record_beiyiwang.wav')
总结
我们这个实验很多地方都很粗糙,核心的算法是从shazam公司提出的算法吸取的“指纹”的思想。希望读者可以提出宝贵建议。
更多Pythonはミュージックレトリバーの機能を実装します相关文章请关注PHP中文网!