Как подружить Amplitude и хранилище данных (DWH)

13 мая 2021

В рамках спецпроекта Product Analytics Champions команда Лиги Ставок рассказывает, как отправлять данные из Amplitude в корпоративное хранилище (DWH) для in-house анализа.
История DWH в Лиге Ставок
Корпоративное хранилище в Лиге Ставок было создано задолго до внедрения Amplitude.

Преимущественно им пользуются аналитики и исследователи. Продакты и маркетологи для получения аналитических данных из хранилища обращались к аналитикам, потому что это требует навыков программирование. Сейчас же данные для большинства задач собираются в Amplitude.

При этом часть информации хранится только в DWH, поэтому аналитики Лиги Ставок видели большой потенциал в объединении данных Amplitude и DWH. И сделали инструкцию по настройке передачи данных из Amplitude в DWH.
Как используют объединенные данные
Давайте разберем на примере Лиги Ставок, чем может помочь объединение данных в DWH:

1. Финансы. Финансовые данные хранятся только в DWH, поэтому при добавлении к ним информации о пользовательском пути возможно рассчитать расходы и доходы по любой когорте пользователей.

2. Формирование персональных рекомендаций. Общие данные по отдельному пользователю служат основой для системы персональных рекомендаций в продукте. Об этом еще подробно расскажем в будущих кейсах.

3. Профилактика фрода. Выявление клиентов, которые проявляют активность напрямую в API и не отображаются в разметке на фронтенде. Поиск роботов и подозрительных сервисов для дальнейшего анализа их поведения.
Инструкция по передаче данных из Amplitude в DWH
У Amplitude есть свой API выдачи сырых данных по проекту. Подробную документацию по нему можно найти по ссылке.
Важно отметить логику выдачи данных за час. Если час еще не закрыт в системе, то данные за часть часа не выгружаются. Это предотвращает пробелы в данных
Важно отметить логику выдачи данных за час. Если час еще не закрыт в системе, то данные за часть часа не выгружаются. Это предотвращает пробелы в данных
Механика. Агент доставки данных в Лиге Ставок написан на Python, работает в связке с SQL базой и вдохновлен этим туториалом. В компании отправляют в Amplitude события с сайта и мобильных приложений, поэтому даже за один час собирается большой объем данных.

При этом важно получать данные оперативно, поэтому была реализована очередь — при окончании каждого часа импортируют данные из Amplitude и сразу заливают в хранилище. Без промежуточных сохранений, к примеру в CSV, и отложенных ETL обработок.
Код. Стоит отметить, что в Лиге Ставок намеренно пишут простые агенты, понятные даже начинающим аналитикам. Поэтому никаких супер фич, только код нацеленный на выполнение поставленной задачи.

В коде используется Python 3.7 и выше. Например, если у вас нет крутых flow-систем с очередями работы агентов (или, как их вернее назвать, dag), то можно сделать простое расписание, даже на Windows.

В таком случае скрипт выполняется с штатным расписанием в планировщике через .bat файл (вызов питона с указанием пути к вашему скрипту).

Это позволяет быстро добиться результата и проще администрировать, даже начинающим аналитикам.

Шаг 1. Импорт библиотек

# Библиотеки
import os
import requests
import pandas as pd
import zipfile
import gzip
import time
from tqdm import tqdm
import pymssql
import datetime
import pyodbc
from urllib.parse import quote_plus
from sqlalchemy import create_engine, event
Шаг 2. Блок переменных

В процессе выполнения принтуется каждый шаг для понимания, где просело время обработки, поэтому нужны таймеры. Каталог объявляется для того, чтобы архив полученных от сервиса данных физически складывался в нужной нам папке.

# Переменные и константы
os.chdir("C:\Agents\AMPL\TEMP") # Каталог хранения архива Amplitude
dts1 = time.time() # Общий Таймер для подсчета времени работы шагов
a = time.time() # финальный таймер
now = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") # формат для имени архива
Шаг 3. Подключение API Amplitude

Для этого достаточно указать два ключа, которые вы найдете в настройках проекта (Settings => Project = > General).

# Указываем ключики к API Amplitude
api_key = 'ключик'
secret_key = 'еще один ключик'
Шаг 4. Подключение к базе данных (БД) и получение отсечки

Как было написано раннее, в Лиге Ставок реализована очередь, то есть данные забираются за каждый час, от последней имеющейся. Очередь ведется в SQL базе, где команда фиксирует последнюю полученную временную отсечку от Amplitude.

Дальше как раз указан блок подключения к БД и получения этой самой отсечки в формате yyyymmddThh (т. е. какой час даты планируется сейчас забрать). Дальше, при обращении к API, так и передается одна дата и время как начало и конец периода.

# Переменные для подключения к DWH (SQL)
server = "Имя сервера"
user = "логин"
password = "пароль"

# Подключаемся и получаем отсечку очереди
conn = pymssql.connect(server, user, password, "Имя БД")
cursor = conn.cursor()
cursor.execute("Запрос на получение отсечки. Процедура или простой select")
Шаг 5. Преобразование отсечки в переменную

На этом этапе берется полученная от БД отсечка и оборачивается в переменную для дальнейшей передачи в API Amplitude. После этого закрывается коннект к БД.

# Берем в переменную дату и время отсечки очереди
for row in cursor: 
    dt = row[0]
conn.close()   
Шаг 6. Настройка архива

Далее заранее указывается имя архива из Amplitude в нужном формате, который потребуется. В Лиге Ставок, например, пишут проект, добавляя дату и час, который забрали и время выполнения запроса.

# Генерируем имя архива, закидываем временные метки в название
filename = 'AMPL_PROD_'+ dt + '_' + now

# Существующая папка, путь должен оканчиваться на \\ для WIN
# В рабочую папку будут сохраняться файлы, мы ее объявили выше как os.chdir
working_dir = os.getcwd() + '\\'
Шаг 7. Дублирование подключения к SQL

Для Лиги Ставок это просто разные точки входа, поэтому они повторяют подключения к нужной БД, в которую в последствии будут импортироваться данные из Amplitude.

# Настройки подключения к DWH (SQL). Дублируем на случай, если точка заливки отличается от точки получения отсечки очереди
server = 'имя сервера'
database = 'База данных'
schema = 'схема таблицы'
table_to_export = 'Название таблицы'

# Параметры подключения к DWH (SQL)
params = 'DRIVER= {SQL Server};SERVER='+server+';DATABASE='+database+';User='+user+';Password='+password+';'
quoted = quote_plus(params)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
Шаг 8. Получение данных от Amplitude

Здесь начинается большой блок получения данных от Amplitude в виде архива, его сохранения, распаковки и парсинга json в датафрейм.

# Блок получения данных от API Amplitude, сохранения архива и парсинга json
class GetFile():
    
    def __init__(self, now, dt, api_key, secret_key, filename, working_dir):
        
        self.now = now
        self.dt = dt
        self.api_key = api_key
        self.secret_key = secret_key
        self.filename = filename
        self.working_dir = working_dir
        
    def response(self):
        """
        Запрос файла с сервера
        """
        print('Отправка запроса на сервер!', end='\n')
        count = 0
        while True:
            count += 1
            print(f'Попытка {count}.....', end='')
            try:
                response = requests.get('https://amplitude.com/api/2/export?start='+self.dt+'&end='+self.dt,
                                        auth=(self.api_key, self.secret_key),
                                        timeout=10)
                print('успешно', end='\n') 
                print('1. Ответ от сервера получен', end='\n')
                break
            except:
                print('неудачно', end='\n')
                time.sleep(1)
      
        return response
        
    def download(self):
        '''
        Скачивание архива с данными
        '''
        with open(working_dir + self.filename + '.zip', "wb") as code:
            file = self.response()
            print('2. Скачивание архива с файлами.....', end='')           
            code.write(file.content)
        print('OK', end='\n')

    def extraction(self):
        '''
        Извлечение файлов в папку на компьютере
        '''
        z = zipfile.ZipFile(self.working_dir + self.filename + '.zip', 'r')
        z.extractall(path=self.working_dir + self.filename)
        print('3. Архив с файлами извлечен и записан в папку ' + self.filename)
        
    def getfilename(self):
        '''
        Информация о файле и пути
        '''
        return 'Файл: {} \nПолный путь: {}'.format(self.filename, self.working_dir + self.filename + '.zip')
        

def unzip_and_create_df(working_dir, filename):
        '''
        Распаковка JSON.gz и преобразование json к обычному табличному формату (конкатенация файлов по индексу)
        Индексы дублируются, но они не нужны.
        '''
        directory = working_dir + filename + '\\274440'
        files = os.listdir(directory)
        df = pd.DataFrame()
        print('Прогресс обработки файлов:')
        time.sleep(1)
        for i in tqdm(files):
            with gzip.open(directory + '\\' + i) as f:
                add = pd.read_json(f, lines=True)
            df = pd.concat([df, add], axis=0)
        time.sleep(1)    
        print('4. JSON файлы из архива успешно преобразованы и записаны в dataframe')
        return df
                                   
# Создание класса загрузки файла
file = GetFile(now, dt, api_key, secret_key, filename, working_dir)

# Загрузка файла (уже включает в себя обращение на сервер)
file.download()

# Извлечение gz-файлов в одноимённую папку
file.extraction()

# Создание общего DataFrame на базе распакованных json.gz
adf = unzip_and_create_df(working_dir, filename)
Шаг 9. Маппинг нужных столбцов (опционально)

Итак, датафрейм заполнен. Ниже идет маппинг определенных столбцов, так как в Лиге Ставок собирают не все данные. Этот шаг в целом можно пропускать и забирать все.

# Фиксируем этап и статус
print('5. Считывание данных с БД, настройка связей, чистка, обработка.....', end='')

# Запрос к DWH для получения списка нужных столбцов и их типов
# Если нужно работать со всем массивом, пропускайте этот пункт
sql_query_columns = ("""
                        'Тут запрос какие поля нам нужны и как их переименовать при импорте в БД'
                    """)

settdf = pd.read_sql_query(sql_query_columns, new_con)

# Приведение к lower() строк (=названий столбцов) из столбца SAVE_COLUMN_NAME из dwh
# Переименование название столбцов, lower() для последующего мерджа с таблицей хранилища 
settdf['SAVE_COLUMN_NAME'] = settdf['SAVE_COLUMN_NAME'].apply(lambda x: x.lower())
adf.columns = [''.join(j.title() for j in i.split('_')).lower() for i in adf.columns]

# Получение нужных столбцов
needed_columns = [i for i in settdf['SAVE_COLUMN_NAME'].to_list()]

# Добавление столбца в список необходимых
needed_columns.append('DOWNLOAD_FILE_NAME')

# Добавление столбца в DataFrame c названием файла
adf['DOWNLOAD_FILE_NAME'] = filename

# Сброс индекса (для красоты, адекватности, поможет дебажить)
adf.reset_index(inplace=True)

# Перевод в юникод (текстовый формат) всех непустых значений, пустые остаются пустыми
adf = adf.astype('unicode_').where(pd.notnull(adf), None)

# Срез DataFrame для загрузки в базу
df_to_sql = adf[needed_columns]

# Дописываем статус в сточку принта
print('OK', end='\n')
Шаг 10. Отсчет лога и вставка датафрейма

На данном шаге все готово для передачи данных в таблицу хранилища. Для начала важно настроить отсчет лога и вставить курсором весь датафрейм.

# Блок заливки в DWH
# Начало лога блока
dts2 = time.time()
print('6. Заливка данных в БД...', end='')

# Подключаемся по объявленным параметрам к DWH
connection = pyodbc.connect(params)
engine = create_engine(new_con)

# Вставка курсором (батчами) сета в таблицу DWH (в нашем случае - нужных столбцов)
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True
     
# При None раздувает RAM
df_to_sql.to_sql(table_to_export, engine, schema=schema, if_exists='append', chunksize=100000, index=False)

# Закрываем логи и коннект
connection.close() 
print('OK', end='\n')

dtf = time.time()
diff1, diff2 = [str(int((dtf - i) // 60)) + ' мин ' + str(int((dtf - i) % 60)) + ' сек' for i in (dts1, dts2)]
print(f'Общее время: {diff1},   Заливка: {diff2}')
print('Загрузка завершена, файл выполнил работу')
Шаг 11. Фиксирование успешной передачи

Поздравляем! Данные успешно переданы в таблицу. Теперь нужно зафиксировать успешную заливку сдвигом отсечки очереди для следующего запуска.

# Фиксируем успешную обработку очереди
# Открываем коннект и формируем запрос
conn2 = pymssql.connect(server, user, password, "Имя базы данных")
cursor2 = conn2.cursor()
query = "Инсерт лога в очередь или запуск процедуры, как удобно")

# выполняем запрос
cursor2.execute(query)

# Закрываем логи и коннект
conn2.commit()
conn2.close()
Шаг 12. Запуск процесса обработки связанных витрин

Все, очередь зафиксирована. Следующий запуск зальет данные с новой временной отсечкой.

Но помните, что в Лиге Ставок важно забирать данные относительно оперативно? Чтобы не тратить время, их команда настроила сообщение всему остальному ETL о том, что данные получены, и запуск процесса обработки других различных связанных витрин.

print('Мердж в ДВХ')

# В нашем случае мы еще запускаем ETL обработчик новых данных для оперативного использования витрины
conn3 = pymssql.connect(server, user, password, "Имя базы данных")
cursor3 = conn3.cursor()
query = "Запуск ETL процедур дальнейшего обновления. Например EXEC dbo.SP"

cursor3.execute(query)

conn3.commit()
conn3.close()
Шаг 13. Фиксация времени исполнения

Последний шаг, на котором работу можно считать законченной.

# Фиксируем общий лог и время работы
b = time.time()
diff = b-a
minutes = diff//60
print('Выполнение кода заняло: {:.0f} минут(ы)'.format( minutes))
Следующий запуск агента возьмет новый часовой срез и выполнит очередную загрузку. Если при работе агента произойдет сбой — очередь не фиксируется, и весь процесс повторится заново.

Также в последнем шаге, при вызове ETL, имеются проверки на дублирование данных и их полноту. При необходимости автоматика сама вернет отсечку до сбоя, даже если частично что-то залилось, и проверит наличие дублей.

И конечно же, помимо получения данных из Amplitude, их можно обогащать непосредственно в сервисе продуктовой аналитики. В Лиге Ставок реализована межсерверная s2s доставка данных, о который мы расскажем в будущих обновлениях спецпроекта. 
Впереди новые кейсы, истории и гайды по работе с продуктовой аналитикой.
Подписывайтесь, чтобы получать обновления и полезные материалы!
Спецпроект о компаниях, которые ускорили рост продукта
с помощью продуктовой аналитики
Product Analytics Champions: Лига Ставок