Skip to content

Commit

Permalink
Multi Agent Beat Tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
SebastianPoell committed Sep 6, 2017
1 parent ceb5c74 commit b4d5381
Showing 1 changed file with 373 additions and 1 deletion.
374 changes: 373 additions & 1 deletion madmom/features/beats.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from ..audio.signal import smooth as smooth_signal
from ..ml.nn import average_predictions
from ..processors import (OnlineProcessor, ParallelProcessor, Processor,
SequentialProcessor, )
SequentialProcessor, BufferProcessor)


# classes for tracking (down-)beats with RNNs
Expand Down Expand Up @@ -1238,6 +1238,378 @@ def add_arguments(parser, min_bpm=MIN_BPM, max_bpm=MAX_BPM,
return g


class MultiAgentBeatTrackingProcessor(OnlineProcessor):
"""
Beat Tracking via Multiple Agents. Oriented towards the paper "Beat
Tracking for multiple applications: A multi-agent system architecture with
state recovery" by Lobate Oliveira et al., 2012
Parameters
----------
max_agents : int
Max number of agents.
num_tempi : int
Number of tempi to be considered.
induction_time : float
Window length in seconds used for inducting the agents.
"""
MAX_AGENTS = 30
NUM_TEMPI = 3
INDUCTION_TIME = 2.

class Agent(object):
"""
Agent class for tracking the beats. Each agent has a tempo, a score,
a current prediction where the next beat is and a complete history
of all previously detected beats.
Parameters
----------
inner_window : int
Inner window in frames to each side where a beat is accepted.
outer_window : float
outer window factor to each side depending on the interval.
threshold : float
Threshold value for accepting a beat.
correction_factor : float
Allow agent to adapt to errors.
inherit_score_factor : float
Child agents will inherit a percentage of their parents.
"""
# TODO: Should we use @classmethod to set all values from outside?
INNER_WINDOW = 5
OUTER_WINDOW = 0.4
THRESHOLD = 0.05
CORRECTION_FACTOR = 0.25
INHERIT_SCORE_FACTOR = 0.9

# used for normalizing the score, set from outside
_MAX_INTERVAL = None
_MIN_INTERVAL = None

def __init__(self, score=0, interval=0, prediction=0, beats=[]):
self.score = score
self.interval = interval
self.prediction = prediction
self.beats = beats

def __hash__(self):
return hash(self.prediction + self.interval)

def __eq__(self, other):
# TODO: Maybe allow for slight variations here
eq_prediction = self.prediction == other.prediction
eq_interval = self.interval == other.interval
return eq_prediction and eq_interval

def fork(self, error):
"""
Return child agents based on the given error. The childs
inherit a part of the parents score and all of the parents
detections. The interval and the prediction for the next beat
are adjusted to the error of the last prediction.
"""
# TODO: Check whether all childs are really needed
agents = [
# create agent with same tempo but adjusted prediction
self.__class__(score=self.score * self.INHERIT_SCORE_FACTOR,
interval=self.interval,
prediction=self.prediction + error,
beats=self.beats),
# create agent with adjusted tempo and prediction
self.__class__(score=self.score * self.INHERIT_SCORE_FACTOR,
interval=self.interval + error,
prediction=self.prediction + error,
beats=self.beats),
# create agent with adjusted tempo and prediction by half
self.__class__(score=self.score * self.INHERIT_SCORE_FACTOR,
interval=self.interval + int(error * 0.5),
prediction=self.prediction + int(error * 0.5),
beats=self.beats)
]
# only return agents which are within tempo range
return [agent for agent in agents if
self._MIN_INTERVAL <= agent.interval <= self._MAX_INTERVAL]

def accept(self, activation, idx):
"""
Accept beat at global frame position idx if the
activation exceeds the threshold.
"""
if activation > self.THRESHOLD:
self.beats = self.beats + [idx]

def process(self, activations, idx):
"""
Set the next prediction, score the agent and create
new child agents if necessary. This method is called after
the outer window has passed for offline and online.
Parameters
----------
activations : list
Activation window which surrounds the prediction by
outer window length on both sides. For online mode we wait
until all of that information is available before calling
this method.
idx : int
Global frame counter index of the last activation. Since we
calculate everything in absolute frame times.
Returns
-------
agents : list
New child agent objects
"""
# calculate frames to look around
frames = int(self.interval * self.OUTER_WINDOW)
# get predicted activation
act = activations[int(idx - self.prediction)]
# get max index within outer window
max_idx = idx - len(activations) + np.argmax(activations)
# distance between max activation and predicted position
error = max_idx - self.prediction
# faster agents should not get a better score
normalization = self.interval / self._MIN_INTERVAL
# if max activation was in inner window
if abs(error) <= self.INNER_WINDOW:
# if no beat has been accepted yet, accept max (for offline)
if self.beats[-1] < idx - len(activations):
self.accept(max(activations), max_idx)
# update prediction
self.prediction = max_idx + self.interval
# reward agent for detecting the beat
self.score += (1 - abs(error) / frames) * normalization * act
# adapt agent to error
self.interval += int(error * self.CORRECTION_FACTOR)
self.prediction += int(error * self.CORRECTION_FACTOR)
# return no new child agents
return []
# if max activation was in outer window
else:
# if no beat has been accepted yet, accept act (for offline)
if self.beats[-1] < idx - len(activations):
self.accept(act, self.prediction)
# update prediction
self.prediction += self.interval
# create child agents
new_agents = self.fork(error)
# penalize agent for not detecting the beat
self.score -= (abs(error) / frames) * normalization * act
# return new child agents
return new_agents

def __init__(self, fps=None, tempo_estimator=None, online=False, **kwargs):
# pylint: disable=unused-argument
super(MultiAgentBeatTrackingProcessor, self).__init__(online=online)
# save variables
self.fps = fps
# tempo estimator
if tempo_estimator is None:
# import the TempoEstimation here otherwise we have a loop
from .tempo import TempoEstimationProcessor
# create default tempo estimator
tempo_estimator = TempoEstimationProcessor(fps=fps, **kwargs)
self.tempo_estimator = tempo_estimator
self.agents = []
# TODO: Not sure if thats nice?
self.Agent._MIN_INTERVAL = tempo_estimator.min_interval
self.Agent._MAX_INTERVAL = tempo_estimator.max_interval
if self.online:
self.visualize = kwargs.get('verbose', False)
self.buffer = BufferProcessor(int(self.INDUCTION_TIME * self.fps))
self.last_beat = 0
self.counter = 0

def reset(self):
"""Reset the MultiAgentBeatTrackingProcessor."""
self.buffer.reset()
self.agents = []
self.last_beat = 0
self.counter = 0

def process_offline(self, activations, **kwargs):
"""
Detect the beats in the given activation function.
Parameters
----------
activations : numpy array
Beat activation function.
Returns
-------
beats : numpy array
Detected beat positions [seconds].
"""
# smooth activations
act_smooth = int(self.fps * self.tempo_estimator.act_smooth)
activations = smooth_signal(activations, act_smooth)
# create an interval histogram over the induction time window
induction_window = activations[:int(self.INDUCTION_TIME * self.fps)]
histogram = self.tempo_estimator.interval_histogram(induction_window)
# get N most likely tempi
from .tempo import detect_tempo
tempi = detect_tempo(histogram, self.fps)[:self.NUM_TEMPI, 0]
# convert tempi to intervals
intervals = 60.0 * self.fps / tempi
# induct agents for each interval
for interval in intervals.astype(int):
self.induct_agents(activations[:interval], interval)
# iterate through all activations
for idx, activation in enumerate(activations):
# process activation for each agent
new_agents = []
for agent in self.agents:
# calculate number of frames to look around the prediction
# TODO: Should this be calculated as a @property in agent?
outer_frames = int(agent.interval * agent.OUTER_WINDOW)
# skip if not the time yet for this agent
if agent.prediction + outer_frames != idx:
continue
# get activations of outer windows surrounding the prediction
context = activations[max(0, idx - outer_frames * 2):idx]
# process agent and extend new agents
new_agents.extend(agent.process(context, idx))
# TODO: Those lines are the same for offline/online: refactor?
# append new agents to agents list
self.agents.extend(new_agents)
# sort all agents by score
self.agents.sort(key=lambda a: a.score, reverse=True)
# remove duplicates by using the agents __eq__ method
self.agents = list(dict.fromkeys(self.agents))
# kill worst agents if too many
self.agents = self.agents[:self.MAX_AGENTS]
# return beats of best agent
return np.array(self.agents[0].beats) / self.fps

def process_online(self, activations, reset=True, **kwargs):
"""
Detect the beats in the given activation function for online mode.
Parameters
----------
activations : numpy array
Beat activation function.
reset : bool, optional
Reset the BeatTrackingProcessor to its initial state before
processing.
Returns
-------
beats : numpy array
Detected beat positions [seconds].
"""
# reset to initial state
if reset:
self.reset()
beats_ = []
for activation in activations:
# shift buffer and put new activation at end of buffer
buffer = self.buffer(activation)
# induct agents after induction time has passed
if self.counter == self.INDUCTION_TIME * self.fps:
# create histogram of induction window
histogram = self.tempo_estimator.interval_histogram(buffer)
# get N most likely tempi
from .tempo import detect_tempo
tempi = detect_tempo(histogram, self.fps)[:self.NUM_TEMPI, 0]
# convert tempi to intervals
intervals = 60.0 * self.fps / tempi
# induct agents on past interval frames
for interval in intervals.astype(int):
act = buffer[-interval:]
self.induct_agents(act, interval, self.counter - interval)
# guess beat if possible for each agent
for agent in self.agents:
# skip if beat was already detected inside this inner window
if agent.beats[-1] > self.counter - agent.INNER_WINDOW * 2:
continue
# skip if not the time yet for this agent to guess
if self.counter < agent.prediction or \
self.counter > agent.prediction + agent.INNER_WINDOW:
continue
# get max activation of the past inner window
max_act = max(buffer[-agent.INNER_WINDOW:])
# accept the current frame as a beat
agent.accept(max_act, self.counter)
# set score and predictions deferred after outer window has passed
# this way we get a little peek into the future
new_agents = []
for agent in self.agents:
# calculate number of frames to look around the prediction
outer_frames = int(agent.interval * agent.OUTER_WINDOW)
# skip if not the time yet for this agent
if agent.prediction + outer_frames != self.counter:
continue
# get activations of outer windows surrounding the prediction
context = buffer[-outer_frames * 2:]
# process agent and extend new agents
new_agents.extend(agent.process(context, self.counter))
# append new agents to agents list
self.agents.extend(new_agents)
# sort all agents by score
self.agents.sort(key=lambda a: a.score, reverse=True)
# remove duplicates by using the agents __eq__ method
self.agents = list(dict.fromkeys(self.agents))
# kill worst agents if too many
self.agents = self.agents[:self.MAX_AGENTS]
# if best agent found a beat this frame
is_beat = self.agents and self.agents[0].beats[-1] == self.counter
# beats have to lie apart at least min_interval
beat_distance = self.counter - self.tempo_estimator.min_interval
# if current frame is considered a beat return it as result
if is_beat and self.last_beat < beat_distance:
beats_.append(self.counter)
self.last_beat = self.counter
# increase frame counter
self.counter += 1
# return beat(s)
return np.array(beats_) / self.fps

def induct_agents(self, activations, interval, start=0):
"""
Introduce agents with a given interval by letting them start at
the biggest N maxima inside the given activations.
Parameters
----------
activations : list
Activation function window where agents should be introduced.
interval : int
Time interval which the introduced agents should have.
start : int, optional
Global frame number where agents start.
"""
from scipy.signal import argrelextrema
# get all maxima within activations window
maxima = argrelextrema(activations, np.greater)[0]
# if no maxima could be found just use max value
if len(maxima) == 0:
maxima = np.array([activations.argmax()])
# pick N maxima indices where activation is highest
best_idx = activations[maxima].argsort(axis=0)[::-1][:self.MAX_AGENTS]
# pick best maxima
best_maxima = maxima[best_idx]
# for each best maxima init an agent
for max_idx in best_maxima:
new_agent = self.Agent(score=activations[max_idx],
interval=interval,
prediction=start + max_idx + interval,
beats=[start + max_idx])
# append new agent to agents list
# TODO: Should this method return the agents instead of appending?
self.agents.append(new_agent)


class DBNDownBeatTrackingProcessor(Processor):
"""
Downbeat tracking with RNNs and a dynamic Bayesian network (DBN)
Expand Down

0 comments on commit b4d5381

Please sign in to comment.