Home >Database >Mysql Tutorial >使用mysql_udf与curl库完成http_post通信模块示例_MySQL

使用mysql_udf与curl库完成http_post通信模块示例_MySQL

WBOY
WBOYOriginal
2016-06-01 13:18:191056browse

bitsCN.com

使用mysql_udf与curl库完成http_post通信模块(mysql_udf,multi_curl,http,post)

这个模块其目前主要用于xoyo江湖的sns与kingsoft_xoyo自主研发的TCSQL数据库做数据同步,当有feed插入sns数据库,使用触 发器调用该模块,向tcsql数据库发送同步数据。也可以使用该模块与其它使用socket接口的数据库或程序做转发与同步。
http_post模块主要使用mysql_udf接口,与curl库两部分技术。
mysql_udf是mysql为c语言提供的一个接口,通过这个接口,用户可以自定义mysql的函数,通过调用这些mysql函数,调用相应的c语言 模块来执行特定功能,实现mysql数据与外部应用的交互。curl库是一个比较常用的应用层网络协议库,主要用到的是其中的curl_multi异步通 信api,用来进行网络传输。
首先参考mysql官方提供的udf_example.c文件,建立3个主要的接口函数,分别是初始化函数,执行函数与析构函数。


//args是sql语句传回的参数,message是返回出错信息使用这些都是规定好的。
my_bool http_post_init(UDF_INIT *initid, UDF_ARGS *args, char *message);
//主函数体
longlong http_post(UDF_INIT *initid, UDF_ARGS *args, char *is_null,char *error);
//析构函数体
void http_post_deinit(UDF_INIT *initid);
//args是sql语句传回的参数,message是返回出错信息,使用这些都是规定好的。
//初始化函数体 my_bool http_post_init(UDF_INIT *initid, UDF_ARGS *args, char *message);
//主函数体 longlong http_post(UDF_INIT *initid, UDF_ARGS *args, char *is_null,char *error);
//析构函数体 void http_post_deinit(UDF_INIT *initid);

在mysql_udf接口中,主函数体中是不允许使用new或malloc动态分配内存,所以如果需要申请内存空间,必须用xxxx_init()函数申 请并将申请的地址赋给initid->ptr指针,然后在主函数体中使用,并在xxxx_deinit析构函数体中释放。另外对于 mysql_udf接口的调用好像当并发量超过一定程度,如果是使用动态分配内存,会出现double free的错误,为了避免这个错误,所以在我的程序里使用静态空间与动态申请空间相结合的方式,这样如果数据较小,并发量较大,不会出现double free错误。对于静态申请空间,最大约在160000~170000byte左右,我这里使用的160000,当mysql传送的数据大于这个数的时 候,才动态申请内存。初始化函数体如下:


my_bool http_post_init(UDF_INIT *initid, UDF_ARGS *args, char *message)
    {
      if (args->arg_count != 2)
      {
        strcpy(message,"Wrong arguments to http_post; ");
        return 1;
      } 

      if(args->arg_count == 2 && args->args[1]!=NULL)
      {
            int flexibleLength = strlen(args->args[1]); 

        if(flexibleLength > 160000)
        {
            int allocLength = 200 + flexibleLength;
            if (!(initid->ptr=(char*) malloc(allocLength) ) )
            {
                    strcpy(message,"Couldn't allocate memory in http_post_init");
                    return 1;
            }
            return 0;
        }
        else
        {
            initid->ptr=NULL;
        } 

      }
       return 0; 

    }

其中http_post_init需要返回my_bool型。这个函数目的是给用户提供一个方式,检验由mysql参数传进来的数据是否正确,如果正确则 返回0,则mysql会自动调用定义的主函数,如果返回1,则mysql打印message信息退出,不会调用主函数。所以在设定返回值的时候一定注意。

主函数如下:


longlong http_post( UDF_INIT *initid, UDF_ARGS *args,
                char *is_null __attribute__((unused)),
                char *error __attribute__((unused)))
{
    char* sendBuffer=NULL;
    CURL *curl;
    CURLM *multi_handle;
    int still_running;
    int times=0;//try times if select false
        int TRY_TIMES=25;
    struct timeval timeout;//set a suitable timeout to play around with
    timeout.tv_sec = 0;
    timeout.tv_usec = 100000; 

    char sendArray[160000] = "/0";//can not move this into the if
    if(initid->ptr == NULL)
    {
        //char sendArray[160000] = "/0";//error
        sendBuffer=sendArray;
    }
    else
    {
        sendBuffer = initid->ptr;
        TRY_TIMES=100;
    } 

    strcpy(sendBuffer,args->args[1]);
    curl = curl_easy_init();
    multi_handle = curl_multi_init();
    if(curl && multi_handle)
    {
        /* what URL that receives this POST */
        curl_easy_setopt(curl, CURLOPT_URL,args->args[0]);
        curl_easy_setopt(curl, CURLOPT_HTTPPOST, 1);
        curl_easy_setopt(curl,CURLOPT_POSTFIELDS,sendBuffer);
        curl_multi_add_handle(multi_handle, curl);
        while(CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi_handle,/ &still_running));
        while(still_running && times         {
              int rc;      //select() return code
              int maxfd;
              fd_set fdread;
              fd_set fdwrite;
              fd_set fdexcep;
              FD_ZERO(&fdread);
              FD_ZERO(&fdwrite);
              FD_ZERO(&fdexcep);   //get file descriptors from the transfers
             curl_multi_fdset(multi_handle, &fdread, &fdwrite, &fdexcep,/ &maxfd);
             rc = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
             switch(rc)
            {
                case -1://select error
                      break;
                case 0:
                default:        // timeout
                     while(CURLM_CALL_MULTI_PERFORM !== curl_multi_perform(multi_handle, &still_running));
                     break;
             }
                times++;
         }//end while
       curl_multi_remove_handle(multi_handle,curl);
       curl_multi_cleanup(multi_handle);//always cleanup
       curl_easy_cleanup(curl);
       if(times>=TRY_TIMES)
       {
            return 1;
       }
        return 0;
  }//end if
  return 1;
}

在主函数中,主要使用curl库进行通信,curl库分成3部分,easy是同步模式,multi是异步模式,share模式是多线程共享数据的模式。

对于easy发送完数据后,会阻塞等待服务器的response,如果没 有返回,就会一直阻塞,当然可以设置一个timeout,但如果这个时间设小了,easy发送大数据的时候就会中断,设太大了影响时间效率,另外当接收端 不发送response的时候,easy库即使发送完了数据,也会阻塞等待,有些时候对于发送端来讲不需要等待接收端的respons,当发送完毕就可以 结束了,这个时候easy就不适用。所以最后选择multi库。

如程序所示,首先得初始化,并设置easy句柄为post模式,指定需要post的数据,如下:

curl = curl_easy_init();

multi_handle = curl_multi_init();

curl_easy_setopt(curl, CURLOPT_URL,args->args[0]);

curl_easy_setopt(curl, CURLOPT_HTTPPOST, 1);

curl_easy_setopt(curl,CURLOPT_POSTFIELDS,sendBuffer);

由于要使用multi模式,必须也要初始化一个easy模式,并将这个easy模式的句柄放入所谓的multi函数执行栈:

curl_multi_add_handle(multi_handle, curl);

使用curl_multi_perform(multi_handle, &still_running),来进行异步传输,但如果该函数返回的不是CURLM_CALL_MULTI_PERFORM,则需要重新执行。直到循环while(CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi_handle, &still_running));结束。此时如果刚才函数体中的still_running被置为1,表明连接建立,正在发送数据。需要配合select机制来进行数据发送。

函数 curl_multi_fdset(multi_handle, &fdread, &fdwrite, &fdexcep, &maxfd);会将最大的描述符写入maxfd,

然后用select进行等待:rc = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);

最后如果select返回值不为-1(error)0(timeout)时候再次进行异步传输,即执行curl_multi_perform函数,直到

still_running为0,程序结束退出。

这里设置了一个最大执行次数的限制,如果服务器出现了问题,不能发送response,则still_running不会变为0,程序会死循环,

所以,设置一个最大循环次数TRY_TIMES,防止这种情况发生。但是这个次数设小了,数据可能没有发送完,就退出了,如设置太大了,程序发送完了,服务器没有response就会多执行多余循环。所以这个TRY_TIMES需要根据数据的大小和网络状况来设置,比正常

传输数据的次数略长。这里我小数据的时候循环设次数25,大数据循环设为100.

最后是析构函数体:


void http_post_deinit(UDF_INIT *initid)
{
     if (initid!=NULL && initid->ptr!=NULL)
        {
            free(initid->ptr);
            initid->ptr = NULL;
        }
}

将初始化函数设置的内存释放。

编译执行过程如下:

将程序保存为http_post.c编译如下(请根据机器上的mysql路径进行调整):

gcc -wall -I/usr/local/webserver/mysql/include/mysql/ -shared http_post.c -o http_post.so -fPIC
//使用mysql提供的头文件生成动态链接库
cp -f http_post.so /usr/local/webserver/mysql/lib/mysql/plugin/http_post.so
//将生成的.so文件放入mysql的plugin文件夹下
//进入mysql对动态链接库中的函数进行安装
cd /usr/local/webserver/mysql/bin/mysql
./mysql
//在mysql命令行下输入如下命令:
mysql> DROP FUNCTION IF EXISTS http_post;
//其目的是如果系统内安装了同名函数先进性drop。
mysql> CREATE FUNCTION http_post RETURNS INTEGER SONAME ‘http_post.so';
//生成http_post函数,并指明调用来源是http_post.so。
//最后调用函数,其目的是向指定ip和端口发送post数据。调用前先打开指定ip主机上的网络调试助手,并监听3888端。
mysql> select http_post(‘testpost.com/index.php','sfasfa');

在网络助手中可以看到如下结果:

bitsCN.com
Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn