/*
   Unix SMB/CIFS implementation.
   Main metadata server / Spotlight routines / ES backend

   Copyright (C) Ralph Boehme 2019

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "includes.h"
#include "system/filesys.h"
#include "lib/util/time_basic.h"
#include "lib/tls/tls.h"
#include "lib/util/tevent_ntstatus.h"
#include "libcli/http/http.h"
#include "lib/util/tevent_unix.h"
#include "credentials.h"
#include "mdssvc.h"
#include "mdssvc_es.h"
#include "rpc_server/mdssvc/es_parser.tab.h"

#include <jansson.h>

#undef DBGC_CLASS
#define DBGC_CLASS DBGC_RPC_SRV

/*
 * JSON body of an Elasticsearch "_search" request. The printf arguments
 * are, in order: result offset ("from"), page size ("size"), the list of
 * source fields to return and the already translated query string.
 */
#define MDSSVC_ELASTIC_QUERY_TEMPLATE \
	"{" \
	" \"from\": %zu," \
	" \"size\": %zu," \
	" \"_source\": [%s]," \
	" \"query\": {" \
	" \"query_string\": {" \
	" \"query\": \"%s\"" \
	" }" \
	" }" \
	"}"

/* The only document field the result parser below reads. */
#define MDSSVC_ELASTIC_SOURCES \
	"\"path.real\""

/**
 * Backend "init" hook: set up the global Elasticsearch backend context.
 *
 * Allocates the context as a talloc child of mdssvc_ctx, creates anonymous
 * credentials and loads the attribute mappings JSON file. The file path
 * comes from the parametric option "elasticsearch:mappings" and defaults
 * to "<SAMBA_DATADIR>/mdssvc/elasticsearch_mappings.json".
 *
 * @return true on success (mdssvc_ctx->backend_private is set), false if
 *         an allocation failed or the mappings file could not be loaded.
 */
static bool mdssvc_es_init(struct mdssvc_ctx *mdssvc_ctx)
{
	struct mdssvc_es_ctx *mdssvc_es_ctx = NULL;
	json_error_t json_error;
	char *default_path = NULL;
	char *path = NULL;

	mdssvc_es_ctx = talloc_zero(mdssvc_ctx, struct mdssvc_es_ctx);
	if (mdssvc_es_ctx == NULL) {
		return false;
	}
	mdssvc_es_ctx->mdssvc_ctx = mdssvc_ctx;

	mdssvc_es_ctx->creds = cli_credentials_init_anon(mdssvc_es_ctx);
	if (mdssvc_es_ctx->creds == NULL) {
		TALLOC_FREE(mdssvc_es_ctx);
		return false;
	}

	default_path = talloc_asprintf(
		mdssvc_es_ctx,
		"%s/mdssvc/elasticsearch_mappings.json",
		get_dyn_SAMBA_DATADIR());
	if (default_path == NULL) {
		TALLOC_FREE(mdssvc_es_ctx);
		return false;
	}

	path = lp_parm_talloc_string(mdssvc_es_ctx,
				     GLOBAL_SECTION_SNUM,
				     "elasticsearch",
				     "mappings",
				     default_path);
	TALLOC_FREE(default_path);
	if (path == NULL) {
		TALLOC_FREE(mdssvc_es_ctx);
		return false;
	}

	mdssvc_es_ctx->mappings = json_load_file(path, 0, &json_error);
	if (mdssvc_es_ctx->mappings == NULL) {
		DBG_ERR("Opening mapping file [%s] failed: %s\n",
			path, json_error.text);
		TALLOC_FREE(path);
		TALLOC_FREE(mdssvc_es_ctx);
		return false;
	}
	TALLOC_FREE(path);

	mdssvc_ctx->backend_private = mdssvc_es_ctx;
	return true;
}

/**
 * Backend "shutdown" hook. Nothing is torn down here; always returns true.
 */
static bool mdssvc_es_shutdown(struct mdssvc_ctx *mdssvc_ctx)
{
	return true;
}

static struct tevent_req *mds_es_connect_send(
	TALLOC_CTX *mem_ctx,
	struct tevent_context *ev,
	struct mds_es_ctx *mds_es_ctx);
static int mds_es_connect_recv(struct tevent_req *req);
static void mds_es_connected(struct tevent_req *subreq);
static bool mds_es_next_search_trigger(struct mds_es_ctx *mds_es_ctx);

/**
 * Backend "connect" hook: create the per-mds_ctx backend context and start
 * an asynchronous HTTP connect to the Elasticsearch server.
 *
 * The function returns as soon as the connect request has been created;
 * mds_es_connected() runs when it finishes.
 *
 * @return true if the connect request was started, false on allocation
 *         failure.
 */
static bool mds_es_connect(struct mds_ctx *mds_ctx)
{
	struct mdssvc_es_ctx *mdssvc_es_ctx = talloc_get_type_abort(
		mds_ctx->mdssvc_ctx->backend_private, struct mdssvc_es_ctx);
	struct mds_es_ctx *mds_es_ctx = NULL;
	struct tevent_req *subreq = NULL;

	mds_es_ctx = talloc_zero(mds_ctx, struct mds_es_ctx);
	if (mds_es_ctx == NULL) {
		return false;
	}
	*mds_es_ctx = (struct mds_es_ctx) {
		.mdssvc_es_ctx = mdssvc_es_ctx,
		.mds_ctx = mds_ctx,
	};

	mds_ctx->backend_private = mds_es_ctx;

	subreq = mds_es_connect_send(
		mds_es_ctx,
		mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
		mds_es_ctx);
	if (subreq == NULL) {
		TALLOC_FREE(mds_es_ctx);
		return false;
	}
	tevent_req_set_callback(subreq, mds_es_connected, mds_es_ctx);
	return true;
}

/**
 * Completion callback of a connect request: on success, kick the search
 * queue so searches that were queued while disconnected get dispatched.
 */
static void mds_es_connected(struct tevent_req *subreq)
{
	struct mds_es_ctx *mds_es_ctx = tevent_req_callback_data(
		subreq, struct mds_es_ctx);
	int ret;
	bool ok;

	ret = mds_es_connect_recv(subreq);
	TALLOC_FREE(subreq);
	if (ret != 0) {
		DBG_ERR("HTTP connect failed\n");
		return;
	}

	ok = mds_es_next_search_trigger(mds_es_ctx);
	if (!ok) {
		DBG_ERR("mds_es_next_search_trigger failed\n");
	}
	return;
}

/* State of one mds_es_connect_send() request. */
struct mds_es_connect_state {
	struct tevent_context *ev;
	struct mds_es_ctx *mds_es_ctx;
	struct tevent_queue_entry *qe;
	const char *server_addr;
	uint16_t server_port;
	struct tstream_tls_params *tls_params; /* NULL unless TLS is used */
};

static void mds_es_http_connect_done(struct tevent_req *subreq);
static void mds_es_http_waited(struct tevent_req *subreq);

/**
 * Start an asynchronous HTTP(S) connect to the Elasticsearch server.
 *
 * Server address, port and TLS usage are read from the parametric options
 * "elasticsearch:address" (default "localhost"), "elasticsearch:port"
 * (default 9200) and "elasticsearch:use tls" (default false) of the share
 * the mds_ctx belongs to.
 *
 * @return the new request, or NULL if it could not be allocated. Other
 *         failures are reported through the request itself.
 */
static struct tevent_req *mds_es_connect_send(
	TALLOC_CTX *mem_ctx,
	struct tevent_context *ev,
	struct mds_es_ctx *mds_es_ctx)
{
	struct tevent_req *req = NULL;
	struct tevent_req *subreq = NULL;
	struct mds_es_connect_state *state = NULL;
	bool use_tls;
	NTSTATUS status;

	req = tevent_req_create(mem_ctx, &state, struct mds_es_connect_state);
	if (req == NULL) {
		return NULL;
	}
	*state = (struct mds_es_connect_state) {
		.ev = ev,
		.mds_es_ctx = mds_es_ctx,
	};

	state->server_addr = lp_parm_talloc_string(
		state,
		mds_es_ctx->mds_ctx->snum,
		"elasticsearch",
		"address",
		"localhost");
	/* The address is logged with %s and handed to the HTTP client below */
	if (tevent_req_nomem(state->server_addr, req)) {
		return tevent_req_post(req, ev);
	}

	state->server_port = lp_parm_int(
		mds_es_ctx->mds_ctx->snum,
		"elasticsearch",
		"port",
		9200);

	use_tls = lp_parm_bool(
		mds_es_ctx->mds_ctx->snum,
		"elasticsearch",
		"use tls",
		false);

	DBG_DEBUG("Connecting to HTTP%s [%s] port [%"PRIu16"]\n",
		  use_tls ? "S" : "", state->server_addr, state->server_port);

	if (use_tls) {
		const char *ca_file = lp__tls_cafile();
		const char *crl_file = lp__tls_crlfile();
		const char *tls_priority = lp_tls_priority();
		enum tls_verify_peer_state verify_peer = lp_tls_verify_peer();

		status = tstream_tls_params_client(state,
						   ca_file,
						   crl_file,
						   tls_priority,
						   verify_peer,
						   state->server_addr,
						   &state->tls_params);
		if (!NT_STATUS_IS_OK(status)) {
			DBG_ERR("Failed tstream_tls_params_client - %s\n",
				nt_errstr(status));
			tevent_req_nterror(req, status);
			return tevent_req_post(req, ev);
		}
	}

	subreq = http_connect_send(state,
				   state->ev,
				   state->server_addr,
				   state->server_port,
				   mds_es_ctx->mdssvc_es_ctx->creds,
				   state->tls_params);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, mds_es_http_connect_done, req);
	return req;
}

/**
 * The HTTP connect attempt finished. On success the connection is stored
 * in mds_es_ctx->http_conn and the request completes; on failure a retry
 * is scheduled after a 10 second wait.
 */
static void mds_es_http_connect_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct mds_es_connect_state *state = tevent_req_data(
		req, struct mds_es_connect_state);
	int error;

	error = http_connect_recv(subreq,
				  state->mds_es_ctx,
				  &state->mds_es_ctx->http_conn);
	TALLOC_FREE(subreq);
	if (error != 0) {
		DBG_ERR("HTTP connect failed, retrying...\n");

		subreq = tevent_wakeup_send(
			state->mds_es_ctx,
			state->mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
			tevent_timeval_current_ofs(10, 0));
		if (tevent_req_nomem(subreq, req)) {
			return;
		}
		tevent_req_set_callback(subreq, mds_es_http_waited, req);
		return;
	}

	DBG_DEBUG("Connected to HTTP%s [%s] port [%"PRIu16"]\n",
		  state->tls_params ? "S" : "",
		  state->server_addr, state->server_port);

	tevent_req_done(req);
	return;
}

/**
 * The retry delay elapsed: start a fresh connect request.
 *
 * NOTE(review): the new request reports directly to mds_es_connected();
 * the outer request "req" is not completed on this path, only on the
 * error paths. Confirm this is intended.
 */
static void mds_es_http_waited(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct mds_es_connect_state *state = tevent_req_data(
		req, struct mds_es_connect_state);
	bool ok;

	ok = tevent_wakeup_recv(subreq);
	TALLOC_FREE(subreq);
	if (!ok) {
		tevent_req_error(req, ETIMEDOUT);
		return;
	}

	subreq = mds_es_connect_send(
		state->mds_es_ctx,
		state->mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
		state->mds_es_ctx);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, mds_es_connected, state->mds_es_ctx);
}

/**
 * Collect the result of a connect request.
 *
 * @return 0 on success, otherwise the errno value the request failed with.
 */
static int mds_es_connect_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}

/**
 * A search failed: flag the query (if it still exists) as failed, drop the
 * current HTTP connection and start a new connect request.
 */
static void mds_es_reconnect_on_error(struct sl_es_search *s)
{
	struct mds_es_ctx *mds_es_ctx = s->mds_es_ctx;
	struct tevent_req *subreq = NULL;

	if (s->slq != NULL) {
		s->slq->state = SLQ_STATE_ERROR;
	}

	DBG_WARNING("Reconnecting HTTP...\n");
	TALLOC_FREE(mds_es_ctx->http_conn);

	subreq = mds_es_connect_send(
		mds_es_ctx,
		mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
		mds_es_ctx);
	if (subreq == NULL) {
		DBG_ERR("mds_es_connect_send failed\n");
		return;
	}
	tevent_req_set_callback(subreq, mds_es_connected, mds_es_ctx);
}

/**
 * talloc destructor of a search that is not pending: unlink it from the
 * list of active searches so no stale pointer is left behind.
 */
static int search_destructor(struct sl_es_search *s)
{
	DLIST_REMOVE(s->mds_es_ctx->searches, s);
	return 0;
}

static struct tevent_req *mds_es_search_send(TALLOC_CTX *mem_ctx,
					     struct tevent_context *ev,
					     struct sl_es_search *s);
static int mds_es_search_recv(struct tevent_req *req);
static void mds_es_search_done(struct tevent_req *subreq);

/**
 * Backend "search_start" hook: translate a Spotlight query into an
 * Elasticsearch query, queue it and trigger the search queue.
 *
 * The search object is a talloc child of slq. The result limit is read
 * from the parametric option "elasticsearch:max results".
 *
 * @return false on allocation failure, if the query could not be
 *         translated or if dispatching failed; true otherwise.
 */
static bool mds_es_search(struct sl_query *slq)
{
	struct mds_es_ctx *mds_es_ctx = talloc_get_type_abort(
		slq->mds_ctx->backend_private, struct mds_es_ctx);
	struct sl_es_search *s = NULL;
	bool ok;

	s = talloc_zero(slq, struct sl_es_search);
	if (s == NULL) {
		return false;
	}
	*s = (struct sl_es_search) {
		.ev = mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
		.mds_es_ctx = mds_es_ctx,
		.slq = slq,
		.size = MAX_SL_RESULTS,
	};

	/* 0 would mean no limit */
	s->max = lp_parm_ulonglong(s->slq->mds_ctx->snum,
				   "elasticsearch",
				   "max results",
				   MAX_SL_RESULTS);

	DBG_DEBUG("Spotlight query: '%s'\n", slq->query_string);

	ok = map_spotlight_to_es_query(
		s,
		mds_es_ctx->mdssvc_es_ctx->mappings,
		slq->path_scope,
		slq->query_string,
		&s->es_query);
	if (!ok) {
		TALLOC_FREE(s);
		return false;
	}
	DBG_DEBUG("Elasticsearch query: '%s'\n", s->es_query);

	slq->backend_private = s;
	slq->state = SLQ_STATE_RUNNING;
	DLIST_ADD_END(mds_es_ctx->searches, s);
	talloc_set_destructor(s, search_destructor);

	return mds_es_next_search_trigger(mds_es_ctx);
}

/**
 * Dispatch the search at the head of the queue, if possible.
 *
 * Nothing is sent while there is no HTTP connection, while the queue is
 * empty or while the head of the queue is still pending.
 *
 * @return false only if creating the search request failed.
 */
static bool mds_es_next_search_trigger(struct mds_es_ctx *mds_es_ctx)
{
	struct tevent_req *subreq = NULL;
	struct sl_es_search *s = mds_es_ctx->searches;

	if (mds_es_ctx->http_conn == NULL) {
		DBG_DEBUG("Waiting for HTTP connection...\n");
		return true;
	}
	if (s == NULL) {
		DBG_DEBUG("No pending searches, idling...\n");
		return true;
	}
	if (s->pending) {
		DBG_DEBUG("Search pending [%p]\n", s);
		return true;
	}

	subreq = mds_es_search_send(s, s->ev, s);
	if (subreq == NULL) {
		return false;
	}
	tevent_req_set_callback(subreq, mds_es_search_done, s);
	return true;
}

/**
 * Completion callback of one search request: update the query state,
 * requeue the search if more results are wanted and trigger the queue.
 */
static void mds_es_search_done(struct tevent_req *subreq)
{
	struct sl_es_search *s = tevent_req_callback_data(
		subreq, struct sl_es_search);
	struct mds_es_ctx *mds_es_ctx = s->mds_es_ctx;
	struct sl_query *slq = s->slq;
	int ret;
	bool ok;

	DBG_DEBUG("Search done for search [%p]\n", s);

	DLIST_REMOVE(mds_es_ctx->searches, s);

	ret = mds_es_search_recv(subreq);
	TALLOC_FREE(subreq);
	if (ret != 0) {
		mds_es_reconnect_on_error(s);
		return;
	}

	if (slq == NULL) {
		/*
		 * Closed by the user. This is the only place where we free "s"
		 * explicitly because the talloc parent slq is already gone.
		 * Everywhere else we rely on the destructor of slq to free
		 * "s".
		 */
		TALLOC_FREE(s);
		goto trigger;
	}

	SLQ_DEBUG(10, slq, "search done");

	if (s->total == 0 || s->from >= s->max) {
		slq->state = SLQ_STATE_DONE;
		goto trigger;
	}

	if (slq->query_results->num_results >= MAX_SL_RESULTS) {
		slq->state = SLQ_STATE_FULL;
		goto trigger;
	}

	/*
	 * Reschedule this query as there are more results waiting in the
	 * Elasticsearch server and the client result queue has room as
	 * well. But put it at the end of the list of active queries as a simple
	 * heuristic that should ensure all client queries are dispatched to the
	 * server.
	 */
	DLIST_ADD_END(mds_es_ctx->searches, s);

trigger:
	ok = mds_es_next_search_trigger(mds_es_ctx);
	if (!ok) {
		DBG_ERR("mds_es_next_search_trigger failed\n");
	}
}

static void mds_es_search_http_send_done(struct tevent_req *subreq);
static void mds_es_search_http_read_done(struct tevent_req *subreq);

/* State of one mds_es_search_send() request. */
struct mds_es_search_state {
	struct tevent_context *ev;
	struct sl_es_search *s;
	struct tevent_queue_entry *qe;
	struct http_request http_request;
	struct http_request *http_response;
};

/**
 * talloc destructor of a pending search: refuse the free (return -1) and
 * only detach the search from its query.
 */
static int mds_es_search_pending_destructor(struct sl_es_search *s)
{
	/*
	 * s is a child of slq which may get freed when a user closes a
	 * query. To maintain the HTTP request/response sequence on the HTTP
	 * channel, we keep processing pending requests and free s when we
	 * receive the HTTP response for pending requests.
	 */
	DBG_DEBUG("Preserving pending search [%p]\n", s);
	s->slq = NULL;
	return -1;
}

/**
 * Mark a search as having an HTTP request in flight and protect it from
 * being freed together with its query.
 */
static void mds_es_search_set_pending(struct sl_es_search *s)
{
	DBG_DEBUG("Set pending [%p]\n", s);
	SLQ_DEBUG(10, s->slq, "pending");

	s->pending = true;
	talloc_set_destructor(s, mds_es_search_pending_destructor);
}

/**
 * Clear the pending mark of a search.
 *
 * The regular search_destructor is put back in place (instead of no
 * destructor at all), so that a search which is requeued later and then
 * freed together with its query is still unlinked from the list of active
 * searches.
 */
static void mds_es_search_unset_pending(struct sl_es_search *s)
{
	DBG_DEBUG("Unset pending [%p]\n", s);
	if (s->slq != NULL) {
		SLQ_DEBUG(10, s->slq, "unset pending");
	}

	s->pending = false;
	talloc_set_destructor(s, search_destructor);
}

/**
 * Send one page of a search to Elasticsearch.
 *
 * Builds a POST request for "/<index>/_search" whose body is
 * MDSSVC_ELASTIC_QUERY_TEMPLATE filled in with the current offset, the
 * page size and the translated query. The index is read from the
 * parametric option "elasticsearch:index" (default "_all"). The request
 * gets a 60 second end time. Once the HTTP request has been queued the
 * search is marked pending.
 *
 * @return the new request, or NULL if it could not be allocated.
 */
static struct tevent_req *mds_es_search_send(TALLOC_CTX *mem_ctx,
					     struct tevent_context *ev,
					     struct sl_es_search *s)
{
	struct tevent_req *req = NULL;
	struct tevent_req *subreq = NULL;
	struct mds_es_search_state *state = NULL;
	const char *index = NULL;
	char *elastic_query = NULL;
	char *uri = NULL;
	size_t elastic_query_len;
	char *elastic_query_len_str = NULL;
	char *hostname = NULL;
	bool pretty = false;

	req = tevent_req_create(mem_ctx, &state, struct mds_es_search_state);
	if (req == NULL) {
		return NULL;
	}
	*state = (struct mds_es_search_state) {
		.ev = ev,
		.s = s,
	};

	if (!tevent_req_set_endtime(req, ev, timeval_current_ofs(60, 0))) {
		return tevent_req_post(req, s->ev);
	}

	index = lp_parm_const_string(s->slq->mds_ctx->snum,
				     "elasticsearch",
				     "index",
				     "_all");
	if (tevent_req_nomem(index, req)) {
		return tevent_req_post(req, ev);
	}

	/* Ask for a human readable response when debugging at level 10 */
	if (DEBUGLVL(10)) {
		pretty = true;
	}

	uri = talloc_asprintf(state,
			      "/%s/_search%s",
			      index,
			      pretty ? "?pretty" : "");
	if (tevent_req_nomem(uri, req)) {
		return tevent_req_post(req, ev);
	}

	elastic_query = talloc_asprintf(state,
					MDSSVC_ELASTIC_QUERY_TEMPLATE,
					s->from,
					s->size,
					MDSSVC_ELASTIC_SOURCES,
					s->es_query);
	if (tevent_req_nomem(elastic_query, req)) {
		return tevent_req_post(req, ev);
	}
	DBG_DEBUG("Elastic query: '%s'\n", elastic_query);

	elastic_query_len = strlen(elastic_query);

	state->http_request = (struct http_request) {
		.type = HTTP_REQ_POST,
		.uri = uri,
		.body = data_blob_const(elastic_query, elastic_query_len),
		.major = '1',
		.minor = '1',
	};

	elastic_query_len_str = talloc_asprintf(state, "%zu", elastic_query_len);
	if (tevent_req_nomem(elastic_query_len_str, req)) {
		return tevent_req_post(req, ev);
	}

	hostname = get_myname(state);
	if (tevent_req_nomem(hostname, req)) {
		return tevent_req_post(req, ev);
	}

	http_add_header(state, &state->http_request.headers,
			"Content-Type", "application/json");
	http_add_header(state, &state->http_request.headers,
			"Accept", "application/json");
	http_add_header(state, &state->http_request.headers,
			"User-Agent", "Samba/mdssvc");
	http_add_header(state, &state->http_request.headers,
			"Host", hostname);
	http_add_header(state, &state->http_request.headers,
			"Content-Length", elastic_query_len_str);

	subreq = http_send_request_send(state,
					ev,
					s->mds_es_ctx->http_conn,
					&state->http_request);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	mds_es_search_set_pending(s);
	tevent_req_set_callback(subreq, mds_es_search_http_send_done, req);
	return req;
}

/**
 * The HTTP request was sent: start reading the HTTP response, limited to
 * MAX_SL_RESULTS * 8192 bytes.
 */
static void mds_es_search_http_send_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct mds_es_search_state *state = tevent_req_data(
		req, struct mds_es_search_state);
	NTSTATUS status;

	DBG_DEBUG("Sent out search [%p]\n", state->s);

	status = http_send_request_recv(subreq);
	TALLOC_FREE(subreq);
	if (!NT_STATUS_IS_OK(status)) {
		tevent_req_error(req, map_errno_from_nt_status(status));
		return;
	}

	if (state->s->mds_es_ctx->mds_ctx == NULL) {
		mds_es_search_unset_pending(state->s);
		tevent_req_error(req, ECANCELED);
		return;
	}

	subreq = http_read_response_send(state,
					 state->ev,
					 state->s->mds_es_ctx->http_conn,
					 MAX_SL_RESULTS * 8192);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, mds_es_search_http_read_done, req);
}

/**
 * The HTTP response arrived: parse the JSON body and add every hit's
 * "path.real" to the query results.
 *
 * On the first response of a search the total number of matches is read
 * and used to cap s->max. On success s->from is advanced by the number of
 * hits and the query state becomes SLQ_STATE_RESULTS. Any parse failure
 * sets SLQ_STATE_ERROR and fails the request with EINVAL.
 */
static void mds_es_search_http_read_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct mds_es_search_state *state = tevent_req_data(
		req, struct mds_es_search_state);
	struct sl_es_search *s = state->s;
	struct sl_query *slq = s->slq;
	json_t *root = NULL;
	json_t *matches = NULL;
	json_t *match = NULL;
	size_t i;
	json_error_t error;
	size_t hits;
	NTSTATUS status;
	int ret;
	bool ok;

	DBG_DEBUG("Got response for search [%p]\n", s);

	mds_es_search_unset_pending(s);

	status = http_read_response_recv(subreq, state, &state->http_response);
	TALLOC_FREE(subreq);
	if (!NT_STATUS_IS_OK(status)) {
		DBG_DEBUG("HTTP response failed: %s\n", nt_errstr(status));
		tevent_req_error(req, map_errno_from_nt_status(status));
		return;
	}

	/* The query was closed while the request was in flight */
	if (slq == NULL) {
		tevent_req_done(req);
		return;
	}
	if (s->mds_es_ctx->mds_ctx == NULL) {
		tevent_req_error(req, ECANCELED);
		return;
	}

	switch (state->http_response->response_code) {
	case 200:
		break;
	default:
		DBG_ERR("HTTP server response: %u\n",
			state->http_response->response_code);
		goto fail;
	}

	DBG_DEBUG("JSON response:\n%s\n",
		  talloc_strndup(talloc_tos(),
				 (char *)state->http_response->body.data,
				 state->http_response->body.length));

	root = json_loadb((char *)state->http_response->body.data,
			  state->http_response->body.length,
			  0,
			  &error);
	if (root == NULL) {
		DBG_ERR("json_loadb failed\n");
		goto fail;
	}

	if (s->total == 0) {
		/*
		 * Unpack into a json_int_t ("I") and convert afterwards:
		 * s->total is not an int, so it must not be handed to
		 * json_unpack() for an "i" conversion directly.
		 */
		json_int_t total = 0;

		/*
		 * Get the total number of results the first time, format
		 * used by Elasticsearch 7.0 or newer
		 */
		ret = json_unpack(root,
				  "{s: {s: {s: I}}}",
				  "hits", "total", "value", &total);
		if (ret != 0) {
			/* Format used before 7.0 */
			ret = json_unpack(root,
					  "{s: {s: I}}",
					  "hits", "total", &total);
			if (ret != 0) {
				DBG_ERR("json_unpack failed\n");
				goto fail;
			}
		}
		if (total < 0) {
			DBG_ERR("Invalid negative total in JSON result\n");
			goto fail;
		}
		s->total = (size_t)total;

		DBG_DEBUG("Total: %zu\n", s->total);

		if (s->total == 0) {
			json_decref(root);
			tevent_req_done(req);
			return;
		}
	}

	/* Never ask for more than the server has */
	if (s->max == 0 || s->max > s->total) {
		s->max = s->total;
	}

	ret = json_unpack(root, "{s: {s:o}}", "hits", "hits", &matches);
	if (ret != 0 || matches == NULL) {
		DBG_ERR("json_unpack hits failed\n");
		goto fail;
	}

	hits = json_array_size(matches);
	if (hits == 0) {
		DBG_ERR("Hu?! No results?\n");
		goto fail;
	}
	DBG_DEBUG("Hits: %zu\n", hits);

	for (i = 0; i < hits; i++) {
		const char *path = NULL;

		match = json_array_get(matches, i);
		if (match == NULL) {
			DBG_ERR("Hu?! No value for index %zu\n", i);
			goto fail;
		}
		ret = json_unpack(match,
				  "{s: {s: {s: s}}}",
				  "_source", "path", "real", &path);
		if (ret != 0) {
			DBG_ERR("Missing path.real in JSON result\n");
			goto fail;
		}

		ok = mds_add_result(slq, path);
		if (!ok) {
			DBG_ERR("error adding result for path: %s\n", path);
			goto fail;
		}
	}
	json_decref(root);

	s->from += hits;
	slq->state = SLQ_STATE_RESULTS;
	tevent_req_done(req);
	return;

fail:
	if (root != NULL) {
		json_decref(root);
	}
	slq->state = SLQ_STATE_ERROR;
	tevent_req_error(req, EINVAL);
	return;
}

/**
 * Collect the result of a search request.
 *
 * @return 0 on success, otherwise the errno value the request failed with.
 */
static int mds_es_search_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}

/**
 * Backend "search_cont" hook: put the search of a query back at the end
 * of the queue and trigger the queue.
 */
static bool mds_es_search_cont(struct sl_query *slq)
{
	struct sl_es_search *s = talloc_get_type_abort(
		slq->backend_private, struct sl_es_search);

	SLQ_DEBUG(10, slq, "continue");
	DLIST_ADD_END(s->mds_es_ctx->searches, s);
	return mds_es_next_search_trigger(s->mds_es_ctx);
}

/* Backend operations table of the Elasticsearch backend. */
struct mdssvc_backend mdsscv_backend_es = {
	.init = mdssvc_es_init,
	.shutdown = mdssvc_es_shutdown,
	.connect = mds_es_connect,
	.search_start = mds_es_search,
	.search_cont = mds_es_search_cont,
};