قصة كيف روضنا BigQuery

مهمة


في الواقع ، المهمة التي أريد أن أتحدث عنها بسيطة للتشويش في صياغتها: كان من الضروري تصور بيانات المبيعات لقسم التجارة الإلكترونية مع القليل من الدم ، أي القراءة ، من أجل لا شيء تقريبًا.


ما هو المقصود من هذا؟ تولد سلات متاجرنا تدفقًا مستمرًا من البيانات حول المبيعات عبر الإنترنت في مناطق مختلفة من العالم مع جميع العواقب: العملات المختلفة ، والمناطق الزمنية ، والضرائب ، وأنواع العملاء ، وأنواع العناصر ، والطلبات ، وما إلى ذلك. في الواقع ، يتم إنشاء الشيء نفسه من قبل أي متجر عبر الإنترنت ، ربما ، فقط ، خيارات معلمات الطلبات مختلفة قليلاً.


يلزم ، في هذا الدفق ، إصدار لوحة تحكم مجمعة معينة ، والتي يمكن للشركات من خلالها مشاهدة المؤشرات الرئيسية عبر الإنترنت للحظة الحالية وللفترات الطويلة عبر الإنترنت (أو بالأحرى ، بانتظام). علاوة على ذلك ، من المستحسن أن تكون لوحات التحكم هذه مختلفة إلى حد ما بدرجات متفاوتة من التفاصيل والتخصص.


ابحث عن حل


وبطبيعة الحال ، بدأنا بالبحث عن حل جاهز. نظرنا إلى مصوري البيانات مثل ، على سبيل المثال ، حلول Owox و BI من Microsoft وبعض حلول المؤسسات وما إلى ذلك
كل هذا يخدمه الإعلان. لكن:


  • ليست رخيصة
  • شيء في حد ذاته (ليس من السهل التحكم في آليات معالجة البيانات ، وأحيانًا ما عليك فقط الاعتماد على كفاءات علماء البيانات في شركة التطوير)

بحلول ذلك الوقت (2019) ، كان لدي بالفعل إسقاط تصور بيانات جيب صغير في Google Datastudio ، والذي كان موجودًا بالفعل لمدة عام على الأقل واختزل إلى حقيقة أنني قمت بتحميل التقرير يدويًا من 1C ، وتحميله على Google Storage Cloud (لا يتم الخلط بينه وبين GoogleDrive) ، من هناك - إلى جدول BigQuery ، ثم في DataStudio ، قمت بعمل شرائح بيانات وكل شيء آخر.


— , , , , .


, , , , . Data Studio , BigQuery , , , , .


, , SQL BigQuery, , .


BigQuery (https://console.cloud.google.com/bigquery), , .


, : 300 . , 2 BigQuery, , .



. , , BigQuery , , . , !


, SELECT, , BQ.


, BQ , . , , , .


: SQL ==> Python ==> BigQuery.


, — .


, SELECT csv- GoogleStorage BQ , Google Functions, Python ( ) . , 6 ( ) .


, ?



, , , SQL . , Orders, , , , , , , id, id .. , , .


, , SELECT pandas dataframe. :


import pandas as pd
import MySQLdb
import MySQLdb.cursors

def sql_query(date0, date1):
    connection = MySQLdb.connect(host=”server”, user=”user”, passwd=”pass”, db=”orders”, charset=”cp1251”, cursorclass = MySQLdb.cursors.SSCursor)
    query = “SELECT * FROM orders WHERE date>=’”+date0+”’ and date<’”+date1+”’ ”
    data =  pd.read_sql_query(query, connection=connection)
#  query -   SELECT   ,     date0  date1,   , ,   5 .          -       .          ,      BQ
# , ,  connection      ,        ..       .

#     -  . ,        :

    data.payment_date = data.payment_date.apply(lambda x: x.tz_localize(‘Europe/Moscow’)).astype(str)

#     str -  .  ,      ,       .     BQ         .

# ,     pandas     BQ,           ,      .

    return data

, 5- , (90 , .. ) , sql_query(date) , BQ.


, , pandas dataframe .



: , BQ MERGE . BQ 1-2 . 20 70 , STRING FLOAT. 70000 .


, . BQ, .


from google.cloud import bigquery
from google.oauth2 import service_account

project_id = “MyBQproject”  #    BQ
dataset_id = “MyDataSet”  #     

#       BQ.       - ( ,    ).       BQ.  ,   ,    ‘Service accounts’  Google Cloud Platform.

credentials = service_account.Credentials.from_service_account_file(“acc.json”)
client = bigquery.Client(project_id, credentials=credentials)

def upload_temp_table(df, table_id, schema):
    job_config = bigquery.LoadJobConfig()
    job_config.schema = schema
   # job_config.autodetect = True  #     
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE  #  
    job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
    job.result()   #  

def merge_tables(temp_table, base_table, schema, head, merge_param):
    merge_query = “MERGE into “+base_table+” as T USING “+temp_table+” as S \
         ON T.”+merge_param+” = S.”+merge_param+ \
         “ WHEN MATCHED then UPDATE SET “+ \
         ‘,’.join([‘T.’+item+’=S.’+item for item in head])+ \
         “ WHEN NOT MATCHED then INSERT \
         (“+’,’.join(head)+”) VALUES (“+’,’.join([‘S.’+item for item in head])+”);”

#  ,         merge_param (,  id ,        )

#  -  

    job_config = bigquery.LoadJobConfig()
    job_config.schema = schema
    job = client.query(merge_query, job_config=job_config)
    job.result()

, — .


import datetime

for period in range(18):
    date0 = (datetime.datetime.now() - datetime.timedelta(days = period*5 + 5)).strftime(‘%Y%m%d’)
    date1 = (datetime.datetime.now() - datetime.timedelta(days = period*5)).strftime(‘%Y%m%d’)
    df = sql_query(date0,date1)   #    sql   >=date0, <date1
    upload_temp_table(df=df, table_id=”temp_table”, schema)  #    
    merge_tables(temp_table=’temp_table’, base_table = ‘orders’, schema=schema, head=[,,,], merge_param=’id’)

head — . . SQL , . MERGE- . BQ . - , Python , BQ.



.


import sys

log_file = open(“log.txt”, ‘a’)
old_stdout = sys.stdout
sys.stdout = log_file

, print
:


sys.stdout = old_stdout
log_file.close()

print -. , .



, BigQuery , DataStudio (https://datastudio.google.com/) .


, DataStudio , BigQuery Python. , , , apply.


, DataStudio (, ), , , , Datastudio .


, Datastudio Blended Tables, LEFT JOIN - . , , , BigQuery. , , , SQL-, BigQuery . , , 6 , , 6 . 20 70 3-4 BigQuery. .


, . , . - BigQuery, - Python. , merge_tables — BQ . , .. — . , .


, BigQuery , Google Spreadsheets. - id . :


  • Python pandas dataframe
  • BQ JOIN BQ (-, ).

, . - , , , .


. - -, . , , ( , F5). : Data Studio Auto-Refresh. : , View, Data Studio Auto-Refresh, , SQL=>BQ.


, , , :-)


All Articles