Как подружить Amplitude и хранилище данных (DWH)
13 мая 2021
# Библиотеки
import os
import requests
import pandas as pd
import zipfile
import gzip
import time
from tqdm import tqdm
import pymssql
import datetime
import pyodbc
from urllib.parse import quote_plus
from sqlalchemy import create_engine, event
# Переменные и константы
os.chdir("C:\Agents\AMPL\TEMP") # Каталог хранения архива Amplitude
dts1 = time.time() # Общий Таймер для подсчета времени работы шагов
a = time.time() # финальный таймер
now = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") # формат для имени архива
# Указываем ключики к API Amplitude
api_key = 'ключик'
secret_key = 'еще один ключик'
# Переменные для подключения к DWH (SQL)
server = "Имя сервера"
user = "логин"
password = "пароль"
# Подключаемся и получаем отсечку очереди
conn = pymssql.connect(server, user, password, "Имя БД")
cursor = conn.cursor()
cursor.execute("Запрос на получение отсечки. Процедура или простой select")
# Берем в переменную дату и время отсечки очереди
for row in cursor:
dt = row[0]
conn.close()
# Генерируем имя архива, закидываем временные метки в название
filename = 'AMPL_PROD_'+ dt + '_' + now
# Существующая папка, путь должен оканчиваться на \\ для WIN
# В рабочую папку будут сохраняться файлы, мы ее объявили выше как os.chdir
working_dir = os.getcwd() + '\\'
# Настройки подключения к DWH (SQL). Дублируем на случай, если точка заливки отличается от точки получения отсечки очереди
server = 'имя сервера'
database = 'База данных'
schema = 'схема таблицы'
table_to_export = 'Название таблицы'
# Параметры подключения к DWH (SQL)
params = 'DRIVER= {SQL Server};SERVER='+server+';DATABASE='+database+';User='+user+';Password='+password+';'
quoted = quote_plus(params)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
# Блок получения данных от API Amplitude, сохранения архива и парсинга json
class GetFile():
def __init__(self, now, dt, api_key, secret_key, filename, working_dir):
self.now = now
self.dt = dt
self.api_key = api_key
self.secret_key = secret_key
self.filename = filename
self.working_dir = working_dir
def response(self):
"""
Запрос файла с сервера
"""
print('Отправка запроса на сервер!', end='\n')
count = 0
while True:
count += 1
print(f'Попытка {count}.....', end='')
try:
response = requests.get('https://amplitude.com/api/2/export?start='+self.dt+'&end='+self.dt,
auth=(self.api_key, self.secret_key),
timeout=10)
print('успешно', end='\n')
print('1. Ответ от сервера получен', end='\n')
break
except:
print('неудачно', end='\n')
time.sleep(1)
return response
def download(self):
'''
Скачивание архива с данными
'''
with open(working_dir + self.filename + '.zip', "wb") as code:
file = self.response()
print('2. Скачивание архива с файлами.....', end='')
code.write(file.content)
print('OK', end='\n')
def extraction(self):
'''
Извлечение файлов в папку на компьютере
'''
z = zipfile.ZipFile(self.working_dir + self.filename + '.zip', 'r')
z.extractall(path=self.working_dir + self.filename)
print('3. Архив с файлами извлечен и записан в папку ' + self.filename)
def getfilename(self):
'''
Информация о файле и пути
'''
return 'Файл: {} \nПолный путь: {}'.format(self.filename, self.working_dir + self.filename + '.zip')
def unzip_and_create_df(working_dir, filename):
'''
Распаковка JSON.gz и преобразование json к обычному табличному формату (конкатенация файлов по индексу)
Индексы дублируются, но они не нужны.
'''
directory = working_dir + filename + '\\274440'
files = os.listdir(directory)
df = pd.DataFrame()
print('Прогресс обработки файлов:')
time.sleep(1)
for i in tqdm(files):
with gzip.open(directory + '\\' + i) as f:
add = pd.read_json(f, lines=True)
df = pd.concat([df, add], axis=0)
time.sleep(1)
print('4. JSON файлы из архива успешно преобразованы и записаны в dataframe')
return df
# Создание класса загрузки файла
file = GetFile(now, dt, api_key, secret_key, filename, working_dir)
# Загрузка файла (уже включает в себя обращение на сервер)
file.download()
# Извлечение gz-файлов в одноимённую папку
file.extraction()
# Создание общего DataFrame на базе распакованных json.gz
adf = unzip_and_create_df(working_dir, filename)
# Фиксируем этап и статус
print('5. Считывание данных с БД, настройка связей, чистка, обработка.....', end='')
# Запрос к DWH для получения списка нужных столбцов и их типов
# Если нужно работать со всем массивом, пропускайте этот пункт
sql_query_columns = ("""
'Тут запрос какие поля нам нужны и как их переименовать при импорте в БД'
""")
settdf = pd.read_sql_query(sql_query_columns, new_con)
# Приведение к lower() строк (=названий столбцов) из столбца SAVE_COLUMN_NAME из dwh
# Переименование название столбцов, lower() для последующего мерджа с таблицей хранилища
settdf['SAVE_COLUMN_NAME'] = settdf['SAVE_COLUMN_NAME'].apply(lambda x: x.lower())
adf.columns = [''.join(j.title() for j in i.split('_')).lower() for i in adf.columns]
# Получение нужных столбцов
needed_columns = [i for i in settdf['SAVE_COLUMN_NAME'].to_list()]
# Добавление столбца в список необходимых
needed_columns.append('DOWNLOAD_FILE_NAME')
# Добавление столбца в DataFrame c названием файла
adf['DOWNLOAD_FILE_NAME'] = filename
# Сброс индекса (для красоты, адекватности, поможет дебажить)
adf.reset_index(inplace=True)
# Перевод в юникод (текстовый формат) всех непустых значений, пустые остаются пустыми
adf = adf.astype('unicode_').where(pd.notnull(adf), None)
# Срез DataFrame для загрузки в базу
df_to_sql = adf[needed_columns]
# Дописываем статус в сточку принта
print('OK', end='\n')
# Блок заливки в DWH
# Начало лога блока
dts2 = time.time()
print('6. Заливка данных в БД...', end='')
# Подключаемся по объявленным параметрам к DWH
connection = pyodbc.connect(params)
engine = create_engine(new_con)
# Вставка курсором (батчами) сета в таблицу DWH (в нашем случае - нужных столбцов)
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
if executemany:
cursor.fast_executemany = True
# При None раздувает RAM
df_to_sql.to_sql(table_to_export, engine, schema=schema, if_exists='append', chunksize=100000, index=False)
# Закрываем логи и коннект
connection.close()
print('OK', end='\n')
dtf = time.time()
diff1, diff2 = [str(int((dtf - i) // 60)) + ' мин ' + str(int((dtf - i) % 60)) + ' сек' for i in (dts1, dts2)]
print(f'Общее время: {diff1}, Заливка: {diff2}')
print('Загрузка завершена, файл выполнил работу')
# Фиксируем успешную обработку очереди
# Открываем коннект и формируем запрос
conn2 = pymssql.connect(server, user, password, "Имя базы данных")
cursor2 = conn2.cursor()
query = "Инсерт лога в очередь или запуск процедуры, как удобно")
# выполняем запрос
cursor2.execute(query)
# Закрываем логи и коннект
conn2.commit()
conn2.close()
print('Мердж в ДВХ')
# В нашем случае мы еще запускаем ETL обработчик новых данных для оперативного использования витрины
conn3 = pymssql.connect(server, user, password, "Имя базы данных")
cursor3 = conn3.cursor()
query = "Запуск ETL процедур дальнейшего обновления. Например EXEC dbo.SP"
cursor3.execute(query)
conn3.commit()
conn3.close()
# Фиксируем общий лог и время работы
b = time.time()
diff = b-a
minutes = diff//60
print('Выполнение кода заняло: {:.0f} минут(ы)'.format( minutes))