Search
🔸

광고 API > DB > 스프레드시트 자동화

조금 더 솔직한 API 이야기

카카오모먼트의 경우 대행사 권한에 따른 광고주 사용 허가 등으로 많이 헤맸기 때문에, 따로 포스팅하여 언젠가 쓸 누군가에게 도움 되게 문서화하였습니다.

Airflow DAGs 작성한 전체 코드

코드를 보완한 후 블로그에 업데이트 예정입니다. 보안 이슈상 필요한 부분은 블라인드 처리하였습니다.
airflow : Docker 환경에서 진행하려 하였으나, selenium 모듈 호환 및 slack 연동 시 CPU 과부하 이슈가 종종 있었으며, 제 로컬이 아닌 전사적으로 필요한 시스템이었기에 개발자 분과 함께 ubuntu 기반으로 별도의 AWS 상의 서버를 생성하여 작업 진행하였습니다.
kakao > logslack > print_day 로 진행되며, kakao에서 API 활용 및 DB insert를 다루고 있습니다.
전처리의 경우 dataframe을 사용하였습니다. 효율적인 방법 보다는 빠르게 전처리하고자 했기에 다소 코드가 긴 편입니다.
스프레드시트로 export 하는 파트는 gspread 호환성 이슈로 airflow 내에 error가 생겨, 윈도우 작업 스케줄러로 별도 생성 하였습니다.
from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator from slack_sdk import WebClient from datetime import datetime, timedelta import json import requests import pandas as pd import time import pymysql.cursors import re ## -------------------------------------------------------기본 설정 channel = "" sales = "" cost = "" inflow = "" ## -------------------------------------------------------매개변수 정의 # dict pass to airflow objects containing meta datas default_args = { # owner name of the DAG 'owner': 'airflow', # whether to rely on previous task status 'depends_on_past': False, # start date of task instance 'start_date': datetime(#시작일#), # email address to get notifications 'email': ['ms_je.kang@cnccompany.net'], # retry the task once, if it fails 'retries': 1, # after waiting for 10 min 'retry_delay': timedelta(minutes=5), } ## -------------------------------------------------------DAG 인스턴스 생성 # configure schedule & set DAG settings dag = DAG( dag_id='임의의 dag id', # prior tasks not executed catchup = False, default_args=default_args, # how log DagRun should be up before timing out(failing) # dagrun_timeout = timedelta(minutes=30), # continue to run DAG once per hour schedule_interval = timedelta(minutes=5), # schedule_interval = '*/1 * * * *', => 'M H D/M M D/W' # '@daily' = '0 0 * * *' ) ## ------------------------------------------------------- def kakao(): conn = pymysql.connect( ## DB 정보 작성 영역이므로 블라인드 ## ) curs = conn.cursor() # 특정 path에 저장 file_path = '/var/www/python/kakao/kakao_code.json' with open(file_path) as json_file: json_data = json.load(json_file) access_token = json_data['access_token'] base_url = "https://apis.moment.kakao.com" api_uri = "/openapi/v4/campaigns" request_url = base_url + api_uri #캠페인 가져오기 header = { 'Authorization': "bearer {}".format(access_token), 'adAccountId': "광고 계정 id" } response = requests.get(request_url, headers=header) data = response.json() df_campaigns_raw = pd.json_normalize(data) df_campaigns_raw = pd.DataFrame(df_campaigns_raw['content'][0]) #캠페인 타입 가져오기 campaigns_id_list = df_campaigns_raw['id'].map(str).tolist() df_campaign_type_raw = pd.DataFrame() for id in campaigns_id_list: campaign_url = request_url + "/" + id response = requests.get(campaign_url, headers=header) campaign_data = pd.DataFrame(pd.json_normalize(response.json())) df_campaign_type_raw = pd.concat([df_campaign_type_raw,campaign_data]) time.sleep(2) api_uri = "/openapi/v4/adAccounts/report" request_url = base_url + api_uri params = { 'datePreset' : 'YESTERDAY', 'level' : 'CAMPAIGN', 'metricsGroup' : ['BASIC','PIXEL_SDK_CONVERSION'] } response = requests.get(request_url, params=params,headers=header) data = response.json() df_report_raw = pd.json_normalize(data) df_report = pd.DataFrame(df_report_raw['data'][0]) # print(df_report) df_report = df_report.drop(columns='dimensions').assign(**pd.DataFrame(df_report.dimensions.values.tolist())) df_report = df_report.drop(columns='metrics').assign(**pd.DataFrame(df_report.metrics.values.tolist())) campaigns_raw = df_campaigns_raw[['id','name','userConfig']] campaign_type_raw = df_campaign_type_raw[['id','campaignTypeGoal.campaignType']] report = df_report[['start', 'campaign_id', 'imp', 'click', 'cost', 'conv_purchase_p_1d', 'conv_purchase_p_7d']].rename(columns = {'campaign_id':'id'}) report['id'] = report['id'].astype(str).astype(int) df_merge = pd.merge(campaigns_raw, campaign_type_raw, on='id',how='left') df_merge = pd.merge(df_merge, report, on='id',how='left') df_merge = df_merge[['id','name','userConfig','imp','click','cost','conv_purchase_p_7d']].fillna(0) convert_dict = {'imp': int, 'click': int, 'cost':int ,'conv_purchase_p_7d':int} df_merge = df_merge.astype(convert_dict) df_merge['채널'] = '모먼트' df_merge['name'][0] if df_merge['name'][0]=='[PRO] 슬림9_플러스친구DM': df_merge=df_merge[1:] df = df_merge[['채널','conv_purchase_p_7d','cost','click']].groupby(by=['채널']).sum().reset_index().rename(columns = {'click':'유입','cost':'비용','conv_purchase_p_7d':'매출'}) else: df = df_merge[['채널','conv_purchase_p_7d','cost','click']].groupby(by=['채널']).sum().reset_index().rename(columns = {'click':'유입','cost':'비용','conv_purchase_p_7d':'매출'}) #데이터 프레임을 문자열로 변환후 특수문자 제거 channel = re.sub(r'[^\w\s]', '',str(df['채널'].values)) sales = re.sub(r'[^\w\s]', '',str(df['매출'].values)) cost = re.sub(r'[^\w\s]', '',str(df['비용'].values)) inflow = re.sub(r'[^\w\s]', '',str(df['유입'].values)) #변환 데이터 추가 sql = "INSERT INTO ad_measurement (channel, sales, cost, inflow) VALUES('"+channel+"',"+sales+","+cost+","+inflow+");" curs.execute(sql) conn.commit() curs.close() conn.close() def print_day(): print("complete") def logSlack(): get_title = "success" client = WebClient(token='토큰 블라인드') client.chat_postMessage(channel='#slackbottest', text=get_title) ## ------------------------------------------------------ t1 = PythonOperator( task_id = 'kakao', python_callable = kakao, execution_timeout=timedelta(seconds=60), dag = dag, ) t2 = PythonOperator( task_id = 'logSlack', python_callable = logSlack, execution_timeout=timedelta(seconds=60), dag = dag, ) t3 = PythonOperator( task_id = 'print_complete', python_callable = print_day, execution_timeout=timedelta(seconds=60), dag = dag, ) ## ------------------------------------------------------ # t2 will depend on t1 t1 >> t2 >> t3
Python
복사