ホームページ  >  記事  >  バックエンド開発  >  Python で ClickHouse を使用する方法

Python で ClickHouse を使用する方法

WBOY
WBOY転載
2023-05-17 08:19:282957ブラウズ

    ClickHouse は、主にデータオンライン分析 (OLAP) の分野で使用され、近年注目を集めているオープンソースのカラム型データベース (DBMS) です。 2016 年にオープンソース化されました。現在、国内コミュニティが活況を呈しており、大手メーカーも追随して大規模に採用しています。

    • 今日のヘッドラインは、ユーザー行動分析のために内部で ClickHouse を使用しています。内部には数千の ClickHouse ノードがあり、単一クラスターには最大 1,200 のノードがあります。総データ量は数十 PB です。元のデータは毎日 300TB ずつ増加します。

    • Tencent は、社内でゲーム データ分析に ClickHouse を使用しており、その完全な監視および運用システムを確立しています。

    • 2018 年 7 月にトライアルが開始されて以来、Ctrip は社内ビジネスの 80% を ClickHouse データベースに移行しました。データは毎日 10 億以上増加し、100 万近くのクエリ リクエストが行われます。

    • Kuaishou も社内で ClickHouse を使用しており、総ストレージ容量は約 10PB、毎日 200TB 追加され、クエリの 90% は 3S 未満です。

    海外では、Yandex はユーザーのクリック行動を分析するために使用される数百のノードを備えており、CloudFlare や Spotify などの大手企業もそれを使用しています。

    ClickHouse はもともと、世界 2 番目に大きい Web 分析プラットフォームである YandexMetrica を開発するために開発されました。長年にわたりシステムの中核コンポーネントとして継続的に使用されています。

    1. ClickHouse の使用方法について

    まず、いくつかの基本的な概念を確認しましょう:

    • OLTP: これは従来のリレーションシップ 銀行システムや電子商取引システムなど、トランザクションの一貫性を重視し、主に追加、削除、変更、問い合わせなどの操作を行うデータベース。

    • OLAP: 主にデータを読み取り、複雑なデータ分析を実行し、技術的な意思決定のサポートに重点を置き、直感的でシンプルな結果を提供するウェアハウス タイプのデータベースです。

    1.1. ClickHouse はデータ ウェアハウス シナリオに適用されます

    ClickHouse は列指向データベースであり、OLAP シナリオにより適しています。OLAP シナリオの主な機能は次のとおりです:

    • 大部分は読み取りリクエストです

    • データは単一行の更新ではなく、かなり大きなバッチ (>1000 行) で更新されます。まったくリニューアルします。

    • データベースに追加されたデータは変更できません。

    • 読み取りの場合、かなりの数の行がデータベースからフェッチされますが、列の小さなサブセットのみがフェッチされます。

    • 広いテーブル、つまり各テーブルに多数の列が含まれている

    • クエリが比較的少ない (通常は 1 秒あたり数百のクエリ)サーバーあたり) 回以下)

    • 単純なクエリの場合、約 50 ミリ秒の遅延を考慮してください

    • 列内のデータは比較的small: 数字と短い文字列 (例: URL あたり 60 バイト)

    • #単一クエリの処理時に高いスループットが必要 (サーバーごとに 1 秒あたり最大数十億行)

    • トランザクションは必要ありません

    • データ整合性要件が低い

    • 各クエリには大きなテーブルがあります。彼を除いて、他の人は皆小さいです。

    • クエリ結果はソース データよりも大幅に小さくなります。言い換えると、結果が単一サーバーの RAM 内に収まるようにデータがフィルタリングまたは集約されます

    1.2. クライアント ツール DBeaver

    Clickhouse クライアント ツールは dbeaver です。 , 公式サイトはhttps://dbeaver.io/です。

    • dbeaver は、開発者およびデータベース管理者向けの無料のオープン ソース (GPL) 汎用データベース ツールです。 [Baidu Encyclopedia]

    • このプロジェクトの主な目標は使いやすさを向上させることであり、データベース管理ツールを特別に設計および開発しました。無料のクロスプラットフォームで、オープンソース フレームワークに基づいており、さまざまな拡張機能 (プラグイン) を作成できます。

    • JDBC ドライバーを備えたあらゆるデータベースをサポートします。

    • あらゆる外部データ ソースを処理できます。

    下図に示すように、操作インターフェース メニューの「データベース」を通じて新しい接続を作成および構成し、ClickHouse ドライバーを選択してダウンロードします (デフォルトはドライバーなし)。

    Python で ClickHouse を使用する方法

    DBeaver の設定は Jdbc に基づいており、通常、デフォルトの URL とポートは次のとおりです。

    DBeaver を使用してクエリのために Clickhouse に接続すると、接続またはクエリがタイムアウトになることがあります。このとき、接続パラメータにソケットタイムアウト パラメータを追加して設定すると、問題を解決できます。

    jdbc:clickhouse://192.168.17.61:8123

    1.3. ビッグデータ アプリケーションの実践Python で ClickHouse を使用する方法

    #環境の簡単な説明:
    • ハードウェア リソースは限られており、メモリは 16G しかなく、トランザクション データは数億件に及びます。
    • このアプリケーションは、主にトランザクションマスターテーブル、関連する顧客情報、材料情報、過去の価格、割引およびポイント情報などを含む特定のトランザクションビッグデータです。メイントランザクションテーブル自己関連付けツリーであるテーブル構造。
    顧客の取引行動を分析するために、限られたリソースの条件下で、次の図に示すように、取引の詳細を日別および取引時点別に抽出して取引記録にまとめます。

    其中,在ClickHouse上,交易数据结构由60个列(字段)组成,截取部分如下所示:

    Python で ClickHouse を使用する方法

    针对频繁出现“would use 10.20 GiB , maximum: 9.31 GiB”等内存不足的情况,基于ClickHouse的SQL,编写了提取聚合数据集SQL语句,如下所示。

    Python で ClickHouse を使用する方法

    大约60s返回结果,如下所示:

    Python で ClickHouse を使用する方法

    2. Python使用ClickHouse实践

    2.1. ClickHouse第三方Python驱动clickhouse_driver

    ClickHouse没有提供官方Python接口驱动,常用第三方驱动接口为clickhouse_driver,可以使用pip方式安装,如下所示:

    pip install clickhouse_driver
    Collecting clickhouse_driver
      Downloading https://files.pythonhosted.org/packages/88/59/c570218bfca84bd0ece896c0f9ac0bf1e11543f3c01d8409f5e4f801f992/clickhouse_driver-0.2.1-cp36-cp36m-win_amd64.whl (173kB)
        100% |████████████████████████████████| 174kB 27kB/s
    Collecting tzlocal<3.0 (from clickhouse_driver)
      Downloading https://files.pythonhosted.org/packages/5d/94/d47b0fd5988e6b7059de05720a646a2930920fff247a826f61674d436ba4/tzlocal-2.1-py2.py3-none-any.whl
    Requirement already satisfied: pytz in d:\python\python36\lib\site-packages (from clickhouse_driver) (2020.4)
    Installing collected packages: tzlocal, clickhouse-driver
    Successfully installed clickhouse-driver-0.2.1 tzlocal-2.1

    使用的client api不能用了,报错如下:

      File "clickhouse_driver\varint.pyx", line 62, in clickhouse_driver.varint.read_varint

      File "clickhouse_driver\bufferedreader.pyx", line 55, in clickhouse_driver.bufferedreader.BufferedReader.read_one

      File "clickhouse_driver\bufferedreader.pyx", line 240, in clickhouse_driver.bufferedreader.BufferedSocketReader.read_into_buffer

    EOFError: Unexpected EOF while reading bytes

    Python驱动使用ClickHouse端口9000

    ClickHouse服务器和客户端之间的通信有两种协议:http(端口8123)和本机(端口9000)。DBeaver驱动配置使用jdbc驱动方式,端口为8123。

    ClickHouse接口返回数据类型为元组,也可以返回Pandas的DataFrame,本文代码使用的为返回DataFrame。

    collection = self.client.query_dataframe(self.query_sql)

    2.2. 实践程序代码

    由于我本机最初资源为8G内存(现扩到16G),以及实际可操作性,分批次取数据保存到多个文件中,每个文件大约为1G。

    # -*- coding: utf-8 -*-
    &#39;&#39;&#39;
    Created on 2021年3月1日
    @author: xiaoyw
    &#39;&#39;&#39;
    import pandas as pd
    import json
    import numpy as np
    import datetime
    from clickhouse_driver import Client
    #from clickhouse_driver import connect
    # 基于Clickhouse数据库基础数据对象类
    class DB_Obj(object):
        &#39;&#39;&#39;
        192.168.17.61:9000
        ebd_all_b04.card_tbl_trade_m_orc
        &#39;&#39;&#39;
        def __init__(self, db_name):
            self.db_name = db_name
            host=&#39;192.168.17.61&#39; #服务器地址
            port =&#39;9000&#39; #&#39;8123&#39; #端口
            user=&#39;***&#39; #用户名
            password=&#39;***&#39; #密码
            database=db_name #数据库
            send_receive_timeout = 25 #超时时间
            self.client = Client(host=host, port=port, database=database) #, send_receive_timeout=send_receive_timeout)
            #self.conn = connect(host=host, port=port, database=database) #, send_receive_timeout=send_receive_timeout)
            
        def setPriceTable(self,df):
            self.pricetable = df
        def get_trade(self,df_trade,filename):          
            print(&#39;Trade join price!&#39;)
            df_trade = pd.merge(left=df_trade,right=self.pricetable[[&#39;occurday&#39;,&#39;DIM_DATE&#39;,&#39;END_DATE&#39;,&#39;V_0&#39;,&#39;V_92&#39;,&#39;V_95&#39;,&#39;ZDE_0&#39;,&#39;ZDE_92&#39;,
                                  &#39;ZDE_95&#39;]],how="left",on=[&#39;occurday&#39;])
            df_trade.to_csv(filename,mode=&#39;a&#39;,encoding=&#39;utf-8&#39;,index=False)
        def get_datas(self,query_sql):          
            n = 0 # 累计处理卡客户数据
            k = 0 # 取每次DataFrame数据量
            batch = 100000 #100000 # 分批次处理
            i = 0 # 文件标题顺序累加
            flag=True # 数据处理解释标志
            filename = &#39;card_trade_all_{}.csv&#39;
            while flag:
                self.query_sql = query_sql.format(n, n+batch) 
                print(&#39;query started&#39;)
                collection = self.client.query_dataframe(self.query_sql)
                print(&#39;return query result&#39;)
                df_trade = collection #pd.DataFrame(collection)
                
                i=i+1
                k = len(df_trade) 
                if k > 0:
                    self.get_trade(df_trade, filename.format(i))
                
                n = n + batch
                if k == 0:
                    flag=False        
                print(&#39;Completed &#39; + str(k) + &#39;trade details!&#39;)
                print(&#39;Usercard count &#39; + str(n) )    
                   
            return n                
    # 价格变动数据集
    class Price_Table(object):
        def __init__(self, cityname, startdate):
            self.cityname = cityname
            self.startdate = startdate
            self.filename = &#39;price20210531.csv&#39;
            
        def get_price(self):
            df_price = pd.read_csv(self.filename)
            ......
                self.price_table=self.price_table.append(data_dict, ignore_index=True)    
                
            print(&#39;generate price table!&#39;)   
    class CardTradeDB(object):
        def __init__(self,db_obj): 
            self.db_obj = db_obj
            
        def insertDatasByCSV(self,filename):
            # 存在数据混合类型
            df = pd.read_csv(filename,low_memory=False)
            
        # 获取交易记录    
        def getTradeDatasByID(self,ID_list=None):
            # 字符串过长,需要使用&#39;&#39;&#39;
            query_sql = &#39;&#39;&#39;select C.carduser_id,C.org_id,C.cardasn,C.occurday as 
            		......
                    limit {},{})
                    group by C.carduser_id,C.org_id,C.cardasn,C.occurday
                    order by C.carduser_id,C.occurday&#39;&#39;&#39;
            
            
            n = self.db_obj.get_datas(query_sql)
            
            return n
                        
    if __name__ == &#39;__main__&#39;:
        PTable = Price_Table(&#39;湖北&#39;,&#39;2015-12-01&#39;)   
        PTable.get_price()  
        
        db_obj = DB_Obj(&#39;ebd_all_b04&#39;)
        db_obj.setPriceTable(PTable.price_table)
        CTD = CardTradeDB(db_obj)
        df = CTD.getTradeDatasByID()

    返回本地文件为:

    Python で ClickHouse を使用する方法

    3. 小结一下

    ClickHouse运用于OLAP场景时,拥有出色的查询速度,但需要具备大内存支持。Python第三方clickhouse-driver 驱动基本满足数据处理需求,如果能返回Pandas DataFrame最好。

    ClickHouse和Pandas聚合都是非常快的,ClickHouse聚合函数也较为丰富(例如文中anyLast(x)返回最后遇到的值),如果能通过SQL聚合的,还是在ClickHouse中完成比较理想,把更小的结果集反馈给Python进行机器学习。

    操作ClickHouse删除指定数据

    def info_del2(i):
        client = click_client(host=&#39;地址&#39;, port=端口, user=&#39;用户名&#39;, password=&#39;密码&#39;,
                              database=&#39;数据库&#39;)
        sql_detail=&#39;alter table SS_GOODS_ORDER_ALL delete where order_id=&#39;+str(i)+&#39;;&#39;
        try:
            client.execute(sql_detail)
        except Exception as e:
            print(e,&#39;删除商品数据失败&#39;)

    在进行数据删除的时候,python操作clickhou和mysql的方式不太一样,这里不能使用以往常用的%s然后添加数据的方式,必须完整的编辑一条语句,如同上面方法所写的一样,传进去的参数统一使用str类型

    以上がPython で ClickHouse を使用する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明:
    この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。