Пристыдили меня сегодня, что мол не пишу технических статей, а все больше про Agile глупости и злой ужасный ООП. Поэтому я решил поделиться маленьким элегантным решением к одной интересной задаче, которая возникла у нас на днях. Если вы занимаетесь разработкой кода на для многопоточного окружения, то статья может вас заинтересовать.
Итак, начнем с задачи. Она звучит очень просто. Есть некий сервис, к которому не рекомендуется делать больше чем X обращений в секунду. Понятное дело, что от небольшой погрешности никто не умрет и задача не имеет супер-критичной формулировки. Но надо приложить максимум возможных усилий, чтобы избежать перегрузки сервиса. Обращения к сервису идут с многих потоков (для примера 50), поэтому механизм контроля должен максимально избегать тяжелой синхронизации.
Звучит действительно несложно. В первую очередь на ум тем, кто не сачковал на курсе по теории алгоритмов или уже не первый год пишет код для многопоточного доступа, приходит Semaphore. В Java есть его реализация в пакете java.util.concurrent. Он отлично подходит для синхронизации доступа к ресурсу – можно задать количество разрешенных заходов, а когда они закончатся, остальные потоки мирно уснут в ожидании освободившихся мест. Проблема заключается в том, что в отличие от классического использования, поток не может сам по завершению задачи вернуть свой заход назад в семафор. Ведь на каждую секунду количество заходов должно быть лимитировано. То есть, нам нужен некий временной семафор.
Прежде чем идти дальше, любой опытный разработчик постарается поискать, не решал ли кто-то уже подобную задачу. Поиск привел нас к TimedSemaphore из commons-lang и RateLimiter из google-guava. Первый имеет сразу две проблемы: слишком прямолинейная реализация с синхронизированным медленным методом aquire и добавление ресурсов в семафор с помощью ScheduledThreadPoolExecutor, что совершенно не гарантирует добавление в начале секунды и отсутствие простоев по несколько секунд. Второй использует несколько странное решение для поставленной задачи – при необходимости обеспечить лимит в N запросов в секунду он ждет после каждого из них по 1/N части секунды. Получается, что наши запросы будут распределяться по секунде вместо того, чтобы заниматься обработкой результатов. Это явно не наш кандидат.
Как же обновлять в семафоре количество возможных заходов? Первой решили опробовать все тот же выделенный поток с расписанием “раз в секунду”. Поток ни с кем не конкурирует, поэтому синхронизировать ничего не нужно. Он просто возвращает значение семафора в равное лимиту на секунду. Данному потоку для пущего устрашения был дан максимальный приоритет. Реализация заведомо ненадежная, но интересовало насколько именно в наших условиях. Оказалось, что немного раз дистанция между обновлениями составляла несколько секунд (потоку просто не давали времени на выполнение) и очень много раз она сбивалась на сотни миллисекунд. А для нас это сулило две проблемы: простой работающих потоков или двойная норма запросов в одну секунду. Обе проблемы совершенно не радовали.
В запасе была оптимизация – семафор пополнялся не только по расписанию, но и по запросу любого работающего потока, который пришел узнать можно ли ему сделать запрос и обнаружил, что секунда с прошлого обновления уже прошла. Реализация требует синхронизации, но она может быть облегчена за счет использования AtomicInteger и его метода compareAndSet. Каждый поток проверяет время последнего обновления по отношению к текущему. Если секунда прошла, то пытается установить свое время вместо старого. Это удастся только одному потоку – именно он и будет отвечать за добавления запросов в семафор. Но получается из одной легковесной структуры для синхронизации мы уже используем две, причем в большой части случаев делая проверки по времени совершенно напрасными. Но в среднем случае ситуация улучшилась. Зато, если все потоки успели заснуть на семафоре в ожидании свободных заходов, то ничего не изменилось и проблемы с обновлением по расписанию все те же.
И тут на помощь пришло альтернативное решение, которое рассматривалось с самого начала – работа через счетчики. Заводим AtomicInteger для хранения количества осуществленных запросов в секунду и дополнительно храним текущую секунду (мы потом придумали как кодировать информацию о текущей секунде сразу в счетчик, чтобы операции были атомарными, но это не так важно). Теперь любой поток проверяет не изменилась ли секунда. Пробует ее обновить и если у него получается, то он сбрасывает счетчик все тем же методом compareAndSet. Иначе, он просто увеличивает счетчик и смотрит, не превышен ли лимит на данную секунду. Если превышен, то засыпает на время до конца секунды. Код такого решения явно сложнее и каждый поток теперь должен сам засыпать вместо ожидания на объекте синхронизации. Вот пример реализации такого подхода:
private static final int LIMIT = 20;
private final AtomicInteger second = new AtomicInteger(getCurrentSecond());
private final AtomicInteger counter = new AtomicInteger(0);
public void accessResource() {
int currentSecond = checkCurrentSecondIsCorrect();
int requestsCount = counter.incrementAndGet();
if (requestsCount > LIMIT) {
sleepTillNextSecond(currentSecond);
}
}
private int checkCurrentSecondIsCorrect() {
while (true) {
int currentSecond = getCurrentSecond();
int lastSecond = second.get();
if (currentSecond == lastSecond || tryToChangeSecond(currentSecond, lastSecond)) {
return currentSecond;
}
}
}
private boolean tryToChangeSecond(int currentSecond, int lastSecond) {
boolean changed = second.compareAndSet(lastSecond, currentSecond);
if (changed) {
counter.set(0);
}
return changed;
}
Это решение можно улучшить, если завести не один AtomicInteger, а N. Представьте для простоты их в виде массива и пусть N = 60 (количество секунд в минуте). Тогда каждый поток приходит и первым делом определяет какая сейчас секунда. После этого берет из массива счетчик по индексу равному секунде и дальше работает с ним. Получается, что все потоки работают просто со счетчиками и нет необходимости им их сбрасывать. А кто же тогда будет их обнулять? Эту задачу как раз и можно повесить на отдельный поток, который будет раз в 5 секунд просыпаться и обнулять счетчики в “безопасной зоне” (скажем для примера, все кроме пары секунд до и после текущей секунды). Ему тоже не нужно ни с кем синхронизироваться, а размер массива и “безопасной зоны” могут быть подобраны так, что даже опоздание этого потока на 30-40 секунд ни на что не влияет.
И тут пришла идея скомбинировать данный подход с семафорами. Алгоритм получился очень простой. Вначале заводится массив из N семафоров и все они сразу имеют установленное количество заходов равное лимиту в секунду. Когда поток приходит, то он высчитывает секунду, берет по ней из массива семафор и пытается получить заход с его помощью tryAcquire, передавая туда время до конца текущей секунды. Если метод вернет TRUE, то можно смело делать запрос. Если он вернул FALSE (не получилось захватить заход за заданное время), то надо просто вернуться на первый шаг и взять другой семафор. Реализация алгоритма представляет простейший цикл. Параллельно действует восстанавливающий семафоры поток. Он представляет массив в виде замкнутого кольца (так как после 59 секунды снова идет 0 секунда) и раз в 5 секунд восстанавливает значения семафоров в “безопасной зоне” (все кроме 5 секунд до и после по кольцу от текущей секунды). Пример реализации:
private static final int RING_SIZE = 60;
private static final int LIMIT = 20;
private final Semaphore[] ring = new Semaphore[RING_SIZE];
public AccessManager() {
for (int i = 0; i < ring.length; i++) {
ring[i] = new Semaphore(LIMIT);
}
}
public void accessResource() {
while (!acquire()) {}
}
private boolean acquire() {
try {
return tryToAcquire(getActiveRingCell());
} catch (InterruptedException e) {
return false;
}
}
private boolean tryToAcquire(int cell) throws InterruptedException {
Semaphore semaphore = ring[cell];
long timeoutMs = calculateDelay();
return semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
}
private int getActiveRingCell() {
return (int) ((Clock.getTimeMillis() / 1000) % RING_SIZE);
}
Пока на тестах в реальном окружении алгоритм показывает себя очень хорошо, но дальнейшую его судьбу определят большая нагрузка и мониторинг... Надеюсь, описанные подходы помогут кому-то реализовать элегантное решения в своем проекте.
Не хочешь пропускать ничего интересного? Подпишись на ленту RSS или следи за нами в Twitter!