Skip to content

Commit

Permalink
Merge pull request #1 from nicelulu/update_22
Browse files Browse the repository at this point in the history
Update 22   to click_update
  • Loading branch information
nicelulu authored Nov 20, 2018
2 parents f96111b + 7457913 commit b8edb95
Show file tree
Hide file tree
Showing 12 changed files with 144 additions and 105 deletions.
2 changes: 1 addition & 1 deletion contrib/poco
Submodule poco updated from d7a438 to 566162
1 change: 0 additions & 1 deletion dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ list (APPEND dbms_sources
src/AggregateFunctions/AggregateFunctionFactory.cpp
src/AggregateFunctions/AggregateFunctionCombinatorFactory.cpp
src/AggregateFunctions/AggregateFunctionState.cpp
src/AggregateFunctions/FactoryHelpers.cpp
src/AggregateFunctions/parseAggregateFunctionParameters.cpp)

list (APPEND dbms_headers
Expand Down
48 changes: 21 additions & 27 deletions dbms/src/AggregateFunctions/AggregateFunctionQuantile.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>

#include <AggregateFunctions/AggregateFunctionQuantile.h>

#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/Helpers.h>


namespace DB
Expand Down Expand Up @@ -52,13 +48,11 @@ static constexpr bool SupportDecimal()
}


template <template <typename> class Function, bool have_second_arg>
template <template <typename> class Function>
AggregateFunctionPtr createAggregateFunctionQuantile(const std::string & name, const DataTypes & argument_types, const Array & params)
{
if constexpr (have_second_arg)
assertBinary(name, argument_types);
else
assertUnary(name, argument_types);
/// Second argument type check doesn't depend on the type of the first one.
Function<void>::assertSecondArg(argument_types);

const DataTypePtr & argument_type = argument_types[0];
WhichDataType which(argument_type);
Expand Down Expand Up @@ -86,29 +80,29 @@ AggregateFunctionPtr createAggregateFunctionQuantile(const std::string & name, c

void registerAggregateFunctionsQuantile(AggregateFunctionFactory & factory)
{
factory.registerFunction(NameQuantile::name, createAggregateFunctionQuantile<FuncQuantile, false>);
factory.registerFunction(NameQuantiles::name, createAggregateFunctionQuantile<FuncQuantiles, false>);
factory.registerFunction(NameQuantile::name, createAggregateFunctionQuantile<FuncQuantile>);
factory.registerFunction(NameQuantiles::name, createAggregateFunctionQuantile<FuncQuantiles>);

factory.registerFunction(NameQuantileDeterministic::name, createAggregateFunctionQuantile<FuncQuantileDeterministic, true>);
factory.registerFunction(NameQuantilesDeterministic::name, createAggregateFunctionQuantile<FuncQuantilesDeterministic, true>);
factory.registerFunction(NameQuantileDeterministic::name, createAggregateFunctionQuantile<FuncQuantileDeterministic>);
factory.registerFunction(NameQuantilesDeterministic::name, createAggregateFunctionQuantile<FuncQuantilesDeterministic>);

factory.registerFunction(NameQuantileExact::name, createAggregateFunctionQuantile<FuncQuantileExact, false>);
factory.registerFunction(NameQuantilesExact::name, createAggregateFunctionQuantile<FuncQuantilesExact, false>);
factory.registerFunction(NameQuantileExact::name, createAggregateFunctionQuantile<FuncQuantileExact>);
factory.registerFunction(NameQuantilesExact::name, createAggregateFunctionQuantile<FuncQuantilesExact>);

factory.registerFunction(NameQuantileExactWeighted::name, createAggregateFunctionQuantile<FuncQuantileExactWeighted, true>);
factory.registerFunction(NameQuantilesExactWeighted::name, createAggregateFunctionQuantile<FuncQuantilesExactWeighted, true>);
factory.registerFunction(NameQuantileExactWeighted::name, createAggregateFunctionQuantile<FuncQuantileExactWeighted>);
factory.registerFunction(NameQuantilesExactWeighted::name, createAggregateFunctionQuantile<FuncQuantilesExactWeighted>);

factory.registerFunction(NameQuantileTiming::name, createAggregateFunctionQuantile<FuncQuantileTiming, false>);
factory.registerFunction(NameQuantilesTiming::name, createAggregateFunctionQuantile<FuncQuantilesTiming, false>);
factory.registerFunction(NameQuantileTiming::name, createAggregateFunctionQuantile<FuncQuantileTiming>);
factory.registerFunction(NameQuantilesTiming::name, createAggregateFunctionQuantile<FuncQuantilesTiming>);

factory.registerFunction(NameQuantileTimingWeighted::name, createAggregateFunctionQuantile<FuncQuantileTimingWeighted, true>);
factory.registerFunction(NameQuantilesTimingWeighted::name, createAggregateFunctionQuantile<FuncQuantilesTimingWeighted, true>);
factory.registerFunction(NameQuantileTimingWeighted::name, createAggregateFunctionQuantile<FuncQuantileTimingWeighted>);
factory.registerFunction(NameQuantilesTimingWeighted::name, createAggregateFunctionQuantile<FuncQuantilesTimingWeighted>);

factory.registerFunction(NameQuantileTDigest::name, createAggregateFunctionQuantile<FuncQuantileTDigest, false>);
factory.registerFunction(NameQuantilesTDigest::name, createAggregateFunctionQuantile<FuncQuantilesTDigest, false>);
factory.registerFunction(NameQuantileTDigest::name, createAggregateFunctionQuantile<FuncQuantileTDigest>);
factory.registerFunction(NameQuantilesTDigest::name, createAggregateFunctionQuantile<FuncQuantilesTDigest>);

factory.registerFunction(NameQuantileTDigestWeighted::name, createAggregateFunctionQuantile<FuncQuantileTDigestWeighted, true>);
factory.registerFunction(NameQuantilesTDigestWeighted::name, createAggregateFunctionQuantile<FuncQuantilesTDigestWeighted, true>);
factory.registerFunction(NameQuantileTDigestWeighted::name, createAggregateFunctionQuantile<FuncQuantileTDigestWeighted>);
factory.registerFunction(NameQuantilesTDigestWeighted::name, createAggregateFunctionQuantile<FuncQuantilesTDigestWeighted>);

/// 'median' is an alias for 'quantile'
factory.registerAlias("median", NameQuantile::name);
Expand Down
60 changes: 33 additions & 27 deletions dbms/src/AggregateFunctions/AggregateFunctionQuantile.h
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
#pragma once

#include <type_traits>
#include <AggregateFunctions/FactoryHelpers.h>

#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>

#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
/// These must be exposed in header for the purpose of dynamic compilation.
#include <AggregateFunctions/QuantileReservoirSampler.h>
#include <AggregateFunctions/QuantileReservoirSamplerDeterministic.h>
#include <AggregateFunctions/QuantileExact.h>
#include <AggregateFunctions/QuantileExactWeighted.h>
#include <AggregateFunctions/QuantileTiming.h>
#include <AggregateFunctions/QuantileTDigest.h>

#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/QuantilesCommon.h>

#include <Columns/ColumnArray.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>

#include <type_traits>


namespace DB
Expand All @@ -36,10 +45,10 @@ template <
typename Data,
/// Structure with static member "name", containing the name of the aggregate function.
typename Name,
/// If true, the function accept second argument
/// If true, the function accepts the second argument
/// (it can be "weight" to calculate quantiles or "determinator" that is used instead of PRNG).
/// Second argument is always obtained through 'getUInt' method.
bool have_second_arg,
bool has_second_arg,
/// If non-void, the function will return float of specified type with possibly interpolated results and NaN if there were no values.
/// Otherwise it will return Value type and default value if there were no values.
/// As an example, the function cannot return floats, if the SQL type of argument is Date or DateTime.
Expand All @@ -49,12 +58,14 @@ template <
bool returns_many
>
class AggregateFunctionQuantile final : public IAggregateFunctionDataHelper<Data,
AggregateFunctionQuantile<Value, Data, Name, have_second_arg, FloatReturnType, returns_many>>
AggregateFunctionQuantile<Value, Data, Name, has_second_arg, FloatReturnType, returns_many>>
{
private:
using ColVecType = std::conditional_t<IsDecimalNumber<Value>, ColumnDecimal<Value>, ColumnVector<Value>>;

static constexpr bool returns_float = !std::is_same_v<FloatReturnType, void>;
static constexpr bool returns_float = !(std::is_same_v<FloatReturnType, void>)
&& (!(std::is_same_v<Value, DataTypeDate::FieldType> || std::is_same_v<Value, DataTypeDateTime::FieldType>)
|| std::is_same_v<Data, QuantileTiming<Value>>);
static_assert(!IsDecimalNumber<Value> || !returns_float);

QuantileLevels<Float64> levels;
Expand Down Expand Up @@ -92,7 +103,7 @@ class AggregateFunctionQuantile final : public IAggregateFunctionDataHelper<Data
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
const auto & column = static_cast<const ColVecType &>(*columns[0]);
if constexpr (have_second_arg)
if constexpr (has_second_arg)
this->data(place).add(
column.getData()[row_num],
columns[1]->getUInt(row_num));
Expand Down Expand Up @@ -159,21 +170,16 @@ class AggregateFunctionQuantile final : public IAggregateFunctionDataHelper<Data
}

/// Reports the path of this header, i.e. whatever __FILE__ expands to here.
const char * getHeaderFilePath() const override
{
    return __FILE__;
}
};

}


/// These must be exposed in header for the purpose of dynamic compilation.
#include <AggregateFunctions/QuantileReservoirSampler.h>
#include <AggregateFunctions/QuantileReservoirSamplerDeterministic.h>
#include <AggregateFunctions/QuantileExact.h>
#include <AggregateFunctions/QuantileExactWeighted.h>
#include <AggregateFunctions/QuantileTiming.h>
#include <AggregateFunctions/QuantileTDigest.h>

namespace DB
{
/// Validates the number of arguments for this function:
/// exactly two when has_second_arg is set, exactly one otherwise.
/// The check is delegated to assertBinary / assertUnary, passing Name::name as the function name.
static void assertSecondArg(const DataTypes & argument_types)
{
    if constexpr (!has_second_arg)
    {
        assertUnary(Name::name, argument_types);
    }
    else
    {
        /// TODO: check that second argument is of numerical type.
        assertBinary(Name::name, argument_types);
    }
}
};

/// Holds the name of the 'quantile' aggregate function in the static member `name`.
struct NameQuantile
{
    static constexpr const char * name = "quantile";
};
/// Holds the name of the 'quantiles' aggregate function in the static member `name`.
struct NameQuantiles
{
    static constexpr const char * name = "quantiles";
};
Expand Down
31 changes: 0 additions & 31 deletions dbms/src/AggregateFunctions/FactoryHelpers.cpp

This file was deleted.

26 changes: 23 additions & 3 deletions dbms/src/AggregateFunctions/FactoryHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,28 @@
namespace DB
{

void assertNoParameters(const std::string & name, const Array & parameters);
void assertUnary(const std::string & name, const DataTypes & argument_types);
void assertBinary(const std::string & name, const DataTypes & argument_types);
/// Error codes used by the assertion helpers in this header.
/// They are only declared here (extern); their definitions are not part of this header.
namespace ErrorCodes
{
extern const int AGGREGATE_FUNCTION_DOESNT_ALLOW_PARAMETERS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}

/// Ensures that an aggregate function was given no parameters.
/// Throws Exception with code AGGREGATE_FUNCTION_DOESNT_ALLOW_PARAMETERS if `parameters` is not empty;
/// `name` is used only to build the error message.
inline void assertNoParameters(const std::string & name, const Array & parameters)
{
    if (parameters.empty())
        return;

    throw Exception("Aggregate function " + name + " cannot have parameters", ErrorCodes::AGGREGATE_FUNCTION_DOESNT_ALLOW_PARAMETERS);
}

/// Ensures that an aggregate function was given exactly one argument.
/// Throws Exception with code NUMBER_OF_ARGUMENTS_DOESNT_MATCH otherwise;
/// `name` is used only to build the error message.
inline void assertUnary(const std::string & name, const DataTypes & argument_types)
{
    const bool has_single_argument = (argument_types.size() == 1);
    if (has_single_argument)
        return;

    throw Exception("Aggregate function " + name + " require single argument", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}

/// Ensures that an aggregate function was given exactly two arguments.
/// Throws Exception with code NUMBER_OF_ARGUMENTS_DOESNT_MATCH otherwise;
/// `name` is used only to build the error message.
inline void assertBinary(const std::string & name, const DataTypes & argument_types)
{
    const bool has_two_arguments = (argument_types.size() == 2);
    if (has_two_arguments)
        return;

    throw Exception("Aggregate function " + name + " require two arguments", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}

}
18 changes: 12 additions & 6 deletions dbms/src/DataTypes/DataTypeString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ void DataTypeString::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr,
ColumnString::Chars_t & data = column_string.getChars();
ColumnString::Offsets & offsets = column_string.getOffsets();

double avg_chars_size;
double avg_chars_size = 1; /// By default reserve only for empty strings.

if (avg_value_size_hint && avg_value_size_hint > sizeof(offsets[0]))
{
Expand All @@ -186,13 +186,19 @@ void DataTypeString::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr,

avg_chars_size = (avg_value_size_hint - sizeof(offsets[0])) * avg_value_size_hint_reserve_multiplier;
}
else

try
{
/// By default reserve only for empty strings.
avg_chars_size = 1;
data.reserve(data.size() + std::ceil(limit * avg_chars_size));
}
catch (Exception & e)
{
e.addMessage(
"avg_value_size_hint = " + toString(avg_value_size_hint)
+ ", avg_chars_size = " + toString(avg_chars_size)
+ ", limit = " + toString(limit));
throw;
}

data.reserve(data.size() + std::ceil(limit * avg_chars_size));

offsets.reserve(offsets.size() + limit);

Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Interpreters/DDLWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,11 @@ bool DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason)
for (const HostID & host : task->entry.hosts)
{
auto maybe_secure_port = context.getTCPPortSecure();
bool is_local_port = maybe_secure_port ? host.isLocalAddress(*maybe_secure_port) : host.isLocalAddress(context.getTCPPort());

/// The port is considered local if it matches the TCP or TCP secure port that the server is listening on.
bool is_local_port = (maybe_secure_port && host.isLocalAddress(*maybe_secure_port))
|| host.isLocalAddress(context.getTCPPort());

if (!is_local_port)
continue;

Expand Down
12 changes: 5 additions & 7 deletions dbms/src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3886,14 +3886,12 @@ void StorageReplicatedMergeTree::sendRequestToLeaderReplica(const ASTPtr & query
return;
}

const Cluster::Address & StorageReplicatedMergeTree::findClusterAddress(ReplicatedMergeTreeAddress & leader_address) const
{
const auto & clusters = context.getClusters();
const auto & clusterPtrs = clusters.getContainer();

for(auto iter = clusterPtrs.begin(); iter != clusterPtrs.end(); ++iter)
const Cluster::Address & StorageReplicatedMergeTree::findClusterAddress(ReplicatedMergeTreeAddress & leader_address)
{
for(auto & iter : context.getClusters().getContainer())
{
const auto & shards = iter->second->getShardsAddresses();
const auto & shards = iter.second->getShardsAddresses();

for (size_t shard_num = 0; shard_num < shards.size(); ++shard_num)
{
Expand All @@ -3908,7 +3906,7 @@ const Cluster::Address & StorageReplicatedMergeTree::findClusterAddress(Replicat
}
}
}
throw Exception("Not found host " + leader_address.host, ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
throw Exception("Not found replicate leader host " + leader_address.host + ":" + leader_address.queries_port, ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
}

void StorageReplicatedMergeTree::getQueue(LogEntriesData & res, String & replica_name_)
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/StorageReplicatedMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include <Core/BackgroundSchedulePool.h>
#include <Interpreters/Cluster.h>


namespace DB
{

Expand Down Expand Up @@ -512,6 +511,8 @@ class StorageReplicatedMergeTree : public ext::shared_ptr_helper<StorageReplicat

bool dropPartsInPartition(zkutil::ZooKeeper & zookeeper, String & partition_id,
StorageReplicatedMergeTree::LogEntry & entry, bool detach);

const Cluster::Address & findClusterAddress(ReplicatedMergeTreeAddress & leader_address);

const Cluster::Address & findClusterAddress(ReplicatedMergeTreeAddress & leader_address) const;

Expand Down
16 changes: 16 additions & 0 deletions dbms/tests/queries/0_stateless/00753_quantile_format.reference
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
2016-06-15 23:00:00
['2016-06-15 23:00:00']
2016-06-15 23:00:00
['2016-06-15 23:00:00']
2016-06-15 23:00:00
['2016-06-15 23:00:00']
2016-06-15 23:00:00
['2016-06-15 23:00:00']
30000
[30000]
30000
[30000]
2016-06-15 23:01:04
['2016-06-15 23:01:04']
2016-06-15 23:01:04
['2016-06-15 23:01:04']
26 changes: 26 additions & 0 deletions dbms/tests/queries/0_stateless/00753_quantile_format.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-- Runs every quantile* aggregate function (single-level and multi-level variants,
-- with and without a second argument) over a one-row DateTime table.

-- Start from a clean state so that a leftover table from a previous run cannot break CREATE.
DROP TABLE IF EXISTS test.datetime;

CREATE TABLE test.datetime (d DateTime) ENGINE = Memory;
INSERT INTO test.datetime(d) VALUES(toDateTime('2016-06-15 23:00:00'));

SELECT quantile(0.2)(d) FROM test.datetime;
SELECT quantiles(0.2)(d) FROM test.datetime;

SELECT quantileDeterministic(0.2)(d, 1) FROM test.datetime;
SELECT quantilesDeterministic(0.2)(d, 1) FROM test.datetime;

SELECT quantileExact(0.2)(d) FROM test.datetime;
SELECT quantilesExact(0.2)(d) FROM test.datetime;

SELECT quantileExactWeighted(0.2)(d, 1) FROM test.datetime;
SELECT quantilesExactWeighted(0.2)(d, 1) FROM test.datetime;

SELECT quantileTiming(0.2)(d) FROM test.datetime;
SELECT quantilesTiming(0.2)(d) FROM test.datetime;

SELECT quantileTimingWeighted(0.2)(d, 1) FROM test.datetime;
SELECT quantilesTimingWeighted(0.2)(d, 1) FROM test.datetime;

SELECT quantileTDigest(0.2)(d) FROM test.datetime;
SELECT quantilesTDigest(0.2)(d) FROM test.datetime;

SELECT quantileTDigestWeighted(0.2)(d, 1) FROM test.datetime;
SELECT quantilesTDigestWeighted(0.2)(d, 1) FROM test.datetime;

-- Do not leave the table behind for subsequent tests.
DROP TABLE test.datetime;

0 comments on commit b8edb95

Please sign in to comment.