조금 더 솔직한 API 이야기
카카오모먼트의 경우 대행사 권한에 따른 광고주 사용 허가 등으로 많이 헤맸기 때문에, 따로 포스팅하여 언젠가 쓸 누군가에게 도움 되게 문서화하였습니다.
Airflow DAGs 작성한 전체 코드
코드를 보완한 후 블로그에 업데이트 예정입니다. 보안 이슈상 필요한 부분은 블라인드 처리하였습니다.
•
airflow : Docker 환경에서 진행하려 하였으나, selenium 모듈 호환 및 slack 연동 시 CPU 과부하 이슈가 종종 있었으며, 제 로컬이 아닌 전사적으로 필요한 시스템이었기에 개발자 분과 함께 ubuntu 기반으로 별도의 AWS 상의 서버를 생성하여 작업 진행하였습니다.
•
kakao > logslack > print_day 로 진행되며, kakao에서 API 활용 및 DB insert를 다루고 있습니다.
•
전처리의 경우 dataframe을 사용하였습니다. 효율적인 방법 보다는 빠르게 전처리하고자 했기에 다소 코드가 긴 편입니다.
•
스프레드시트로 export 하는 파트는 gspread 호환성 이슈로 airflow 내에 error가 생겨, 윈도우 작업 스케줄러로 별도 생성 하였습니다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from slack_sdk import WebClient
from datetime import datetime, timedelta
import json
import requests
import pandas as pd
import time
import pymysql.cursors
import re
## -------------------------------------------------------기본 설정
channel = ""
sales = ""
cost = ""
inflow = ""
## -------------------------------------------------------매개변수 정의
# dict pass to airflow objects containing meta datas
default_args = {
# owner name of the DAG
'owner': 'airflow',
# whether to rely on previous task status
'depends_on_past': False,
# start date of task instance
'start_date': datetime(#시작일#),
# email address to get notifications
'email': ['ms_je.kang@cnccompany.net'],
# retry the task once, if it fails
'retries': 1,
# after waiting for 10 min
'retry_delay': timedelta(minutes=5),
}
## -------------------------------------------------------DAG 인스턴스 생성
# configure schedule & set DAG settings
dag = DAG(
dag_id='임의의 dag id',
# prior tasks not executed
catchup = False,
default_args=default_args,
# how log DagRun should be up before timing out(failing)
# dagrun_timeout = timedelta(minutes=30),
# continue to run DAG once per hour
schedule_interval = timedelta(minutes=5),
# schedule_interval = '*/1 * * * *', => 'M H D/M M D/W'
# '@daily' = '0 0 * * *'
)
## -------------------------------------------------------
def kakao():
conn = pymysql.connect(
## DB 정보 작성 영역이므로 블라인드 ##
)
curs = conn.cursor()
# 특정 path에 저장
file_path = '/var/www/python/kakao/kakao_code.json'
with open(file_path) as json_file:
json_data = json.load(json_file)
access_token = json_data['access_token']
base_url = "https://apis.moment.kakao.com"
api_uri = "/openapi/v4/campaigns"
request_url = base_url + api_uri
#캠페인 가져오기
header = {
'Authorization': "bearer {}".format(access_token),
'adAccountId': "광고 계정 id"
}
response = requests.get(request_url, headers=header)
data = response.json()
df_campaigns_raw = pd.json_normalize(data)
df_campaigns_raw = pd.DataFrame(df_campaigns_raw['content'][0])
#캠페인 타입 가져오기
campaigns_id_list = df_campaigns_raw['id'].map(str).tolist()
df_campaign_type_raw = pd.DataFrame()
for id in campaigns_id_list:
campaign_url = request_url + "/" + id
response = requests.get(campaign_url, headers=header)
campaign_data = pd.DataFrame(pd.json_normalize(response.json()))
df_campaign_type_raw = pd.concat([df_campaign_type_raw,campaign_data])
time.sleep(2)
api_uri = "/openapi/v4/adAccounts/report"
request_url = base_url + api_uri
params = {
'datePreset' : 'YESTERDAY',
'level' : 'CAMPAIGN',
'metricsGroup' : ['BASIC','PIXEL_SDK_CONVERSION']
}
response = requests.get(request_url, params=params,headers=header)
data = response.json()
df_report_raw = pd.json_normalize(data)
df_report = pd.DataFrame(df_report_raw['data'][0])
# print(df_report)
df_report = df_report.drop(columns='dimensions').assign(**pd.DataFrame(df_report.dimensions.values.tolist()))
df_report = df_report.drop(columns='metrics').assign(**pd.DataFrame(df_report.metrics.values.tolist()))
campaigns_raw = df_campaigns_raw[['id','name','userConfig']]
campaign_type_raw = df_campaign_type_raw[['id','campaignTypeGoal.campaignType']]
report = df_report[['start', 'campaign_id', 'imp', 'click', 'cost', 'conv_purchase_p_1d', 'conv_purchase_p_7d']].rename(columns = {'campaign_id':'id'})
report['id'] = report['id'].astype(str).astype(int)
df_merge = pd.merge(campaigns_raw, campaign_type_raw, on='id',how='left')
df_merge = pd.merge(df_merge, report, on='id',how='left')
df_merge = df_merge[['id','name','userConfig','imp','click','cost','conv_purchase_p_7d']].fillna(0)
convert_dict = {'imp': int,
'click': int,
'cost':int
,'conv_purchase_p_7d':int}
df_merge = df_merge.astype(convert_dict)
df_merge['채널'] = '모먼트'
df_merge['name'][0]
if df_merge['name'][0]=='[PRO] 슬림9_플러스친구DM':
df_merge=df_merge[1:]
df = df_merge[['채널','conv_purchase_p_7d','cost','click']].groupby(by=['채널']).sum().reset_index().rename(columns = {'click':'유입','cost':'비용','conv_purchase_p_7d':'매출'})
else:
df = df_merge[['채널','conv_purchase_p_7d','cost','click']].groupby(by=['채널']).sum().reset_index().rename(columns = {'click':'유입','cost':'비용','conv_purchase_p_7d':'매출'})
#데이터 프레임을 문자열로 변환후 특수문자 제거
channel = re.sub(r'[^\w\s]', '',str(df['채널'].values))
sales = re.sub(r'[^\w\s]', '',str(df['매출'].values))
cost = re.sub(r'[^\w\s]', '',str(df['비용'].values))
inflow = re.sub(r'[^\w\s]', '',str(df['유입'].values))
#변환 데이터 추가
sql = "INSERT INTO ad_measurement (channel, sales, cost, inflow) VALUES('"+channel+"',"+sales+","+cost+","+inflow+");"
curs.execute(sql)
conn.commit()
curs.close()
conn.close()
def print_day():
print("complete")
def logSlack():
get_title = "success"
client = WebClient(token='토큰 블라인드')
client.chat_postMessage(channel='#slackbottest', text=get_title)
## ------------------------------------------------------
t1 = PythonOperator(
task_id = 'kakao',
python_callable = kakao,
execution_timeout=timedelta(seconds=60),
dag = dag,
)
t2 = PythonOperator(
task_id = 'logSlack',
python_callable = logSlack,
execution_timeout=timedelta(seconds=60),
dag = dag,
)
t3 = PythonOperator(
task_id = 'print_complete',
python_callable = print_day,
execution_timeout=timedelta(seconds=60),
dag = dag,
)
## ------------------------------------------------------
# t2 will depend on t1
t1 >> t2 >> t3
Python
복사